Skip to content
Published on

AutoML完全ガイド:AutoGluon、FLAML、Optunaで自動化されたMLパイプライン

Authors

1. AutoML概要

AutoMLとは?

AutoML(Automated Machine Learning)は機械学習パイプラインのさまざまな段階を自動化します。データサイエンティストがかつて手動で行っていた作業、データの前処理、特徴量エンジニアリング、モデル選択、ハイパーパラメータ最適化、アンサンブルなどをアルゴリズムが自動的に処理します。

AutoMLが自動化すること:

  1. データ前処理の自動化

    • 欠損値補完戦略の選択
    • スケーリングと正規化手法の選択
    • 外れ値処理
  2. 特徴量エンジニアリングの自動化

    • 特徴量変換(対数、二乗、相互作用)
    • カテゴリエンコーディング手法の選択
    • 特徴量選択と生成
  3. モデル選択(アルゴリズム選択)

    • 多様なアルゴリズムの探索
    • メタ学習(過去のタスクからの経験を活用)
  4. ハイパーパラメータ最適化(HPO)

    • グリッド/ランダム探索
    • ベイズ最適化
    • 進化的アルゴリズム
  5. アンサンブルの自動化

    • 最適なアンサンブル設定の探索
    • 自動スタッキングとブレンディング
  6. ニューラルアーキテクチャ探索(NAS)

    • 最適なニューラルネットワークアーキテクチャの自動設計

AutoMLの応用分野

産業への応用:

  • 金融:信用リスクモデル、自動不正検出
  • 医療:診断支援システムの迅速なプロトタイピング
  • 小売:自動需要予測モデルの更新
  • 製造:品質管理モデルの自動化

主要なオープンソースAutoMLツール:

ツール開発者強み
AutoGluonAmazonマルチモーダル、表形式、画像、テキスト
FLAMLMicrosoftコスト効率、高速
OptunaPreferred NetworksHPO、可視化
H2O AutoMLH2O.aiエンタープライズ、解釈可能性
Auto-sklearnAutoML Groupscikit-learn互換
Ray TuneAnyscale分散HPO
NNIMicrosoftNAS、HPO

AutoMLの長所と短所

長所:

  • 非専門家でも高品質なモデルを構築できる
  • 繰り返し実験の自動化により時間を節約
  • 人間が見逃すかもしれないハイパーパラメータの組み合わせを発見
  • 再現可能なパイプラインを提供

短所:

  • 計算コストが非常に高くなる可能性がある
  • ドメイン知識を組み込む能力が限られている
  • ブラックボックスの性質(内部動作が理解しにくい)
  • 特化した問題ではカスタムソリューションがより効果的
  • データ漏洩のリスク

2. ハイパーパラメータ最適化(HPO)

グリッド探索

最も単純なHPO手法 — 探索空間のすべての組み合わせを網羅的に試みます。

from sklearn.model_selection import GridSearchCV
import xgboost as xgb

def grid_search_example(X_train, y_train):
    """グリッド探索:網羅的(小さな探索空間でのみ実用的)"""
    param_grid = {
        'max_depth': [3, 5, 7],
        'learning_rate': [0.01, 0.1, 0.3],
        'n_estimators': [100, 300, 500],
        'subsample': [0.7, 0.9],
    }
    # 組み合わせ総数: 3 * 3 * 3 * 2 = 54 * CVフォールド

    model = xgb.XGBClassifier(random_state=42, n_jobs=-1)
    grid_search = GridSearchCV(
        model, param_grid,
        cv=5, scoring='roc_auc', n_jobs=-1, verbose=1, refit=True
    )
    grid_search.fit(X_train, y_train)

    print(f"Best params: {grid_search.best_params_}")
    print(f"Best CV score: {grid_search.best_score_:.4f}")

    results = pd.DataFrame(grid_search.cv_results_)
    print(results.sort_values('mean_test_score', ascending=False)[
        ['params', 'mean_test_score', 'std_test_score']
    ].head(10))

    return grid_search.best_estimator_

ランダム探索

Bergstra & Bengio(2012)が提案 — パラメータ分布からランダムにサンプリングし、グリッド探索よりもはるかに効率的な場合が多いです。

from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform, randint, loguniform

def random_search_example(X_train, y_train, n_iter=100):
    """ランダム探索:連続分布からサンプリング"""
    param_distributions = {
        'max_depth': randint(3, 10),
        'learning_rate': loguniform(1e-3, 0.5),   # [0.001, 0.5]の対数一様分布
        'n_estimators': randint(100, 1000),
        'subsample': uniform(0.6, 0.4),           # [0.6, 1.0]の一様分布
        'colsample_bytree': uniform(0.6, 0.4),
        'reg_alpha': loguniform(1e-4, 10),
        'reg_lambda': loguniform(1e-4, 10),
        'min_child_weight': randint(1, 10),
        'gamma': uniform(0, 0.5),
    }

    model = xgb.XGBClassifier(random_state=42, n_jobs=-1)
    random_search = RandomizedSearchCV(
        model, param_distributions,
        n_iter=n_iter, cv=5, scoring='roc_auc',
        n_jobs=-1, verbose=1, random_state=42, refit=True
    )
    random_search.fit(X_train, y_train)

    print(f"Best params: {random_search.best_params_}")
    print(f"Best CV score: {random_search.best_score_:.4f}")
    return random_search.best_estimator_

ベイズ最適化

ベイズ最適化は過去の評価結果を使用して、次に評価する点をインテリジェントに選択します。

コアコンポーネント:

  1. 代理モデル:目的関数の確率的近似(通常はガウス過程)
  2. 獲得関数:次の評価点を決定
    • EI(Expected Improvement):現在のベストに対する期待改善量
    • UCB(Upper Confidence Bound):探索と活用のバランス
    • PI(Probability of Improvement):現在のベストを改善する確率

TPE(Tree-structured Parzen Estimator):

  • Optunaが使用するデフォルトアルゴリズム
  • p(x|y)の代わりに2つの密度関数l(x)とg(x)をモデル化
  • l(x)は良い結果(上位gamma%)につながったパラメータの分布をモデル化
  • g(x)は残りをモデル化
  • 次の点はl(x)/g(x)の比を最大化するように選択

3. Optuna

コアコンセプト

Preferred NetworksによってPythonネイティブHPOフレームワークとして開発されたOptunaは、シンプルさと柔軟性で知られています。

主要コンセプト:

  • Study:最適化実験全体(Trialsのコレクション)
  • Trial:単一のハイパーパラメータ設定の試み
  • Objective Function:最適化する関数(最小化または最大化)
  • Sampler:パラメータ提案アルゴリズム(TPE、CMA-ES、Randomなど)
  • Pruner:有望でないTrialの早期終了
pip install optuna optuna-dashboard
import optuna
from optuna.samplers import TPESampler, CmaEsSampler, RandomSampler
from optuna.pruners import MedianPruner, HyperbandPruner
import lightgbm as lgb
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import roc_auc_score
import numpy as np

optuna.logging.set_verbosity(optuna.logging.WARNING)

def objective_lgbm(trial, X, y):
    """LightGBM最適化のためのOptuna目的関数"""
    params = {
        'objective': 'binary',
        'metric': 'auc',
        'verbosity': -1,
        'boosting_type': trial.suggest_categorical('boosting_type', ['gbdt', 'dart']),
        'num_leaves': trial.suggest_int('num_leaves', 20, 300),
        'max_depth': trial.suggest_int('max_depth', 3, 12),
        'min_child_samples': trial.suggest_int('min_child_samples', 5, 100),
        'learning_rate': trial.suggest_float('learning_rate', 1e-4, 0.3, log=True),
        'n_estimators': trial.suggest_int('n_estimators', 100, 2000),
        'subsample': trial.suggest_float('subsample', 0.5, 1.0),
        'subsample_freq': trial.suggest_int('subsample_freq', 1, 7),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.5, 1.0),
        'reg_alpha': trial.suggest_float('reg_alpha', 1e-8, 10.0, log=True),
        'reg_lambda': trial.suggest_float('reg_lambda', 1e-8, 10.0, log=True),
        'min_split_gain': trial.suggest_float('min_split_gain', 0, 1),
        'n_jobs': -1,
    }

    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    cv_scores = []

    for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
        X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
        y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]

        train_data = lgb.Dataset(X_train, y_train)
        val_data = lgb.Dataset(X_val, y_val, reference=train_data)

        model = lgb.train(
            params, train_data,
            num_boost_round=params['n_estimators'],
            valid_sets=[val_data],
            callbacks=[
                lgb.early_stopping(stopping_rounds=50, verbose=False),
                lgb.log_evaluation(-1),
            ],
        )

        preds = model.predict(X_val)
        fold_score = roc_auc_score(y_val, preds)
        cv_scores.append(fold_score)

        # プルーニングのための中間結果の報告
        trial.report(fold_score, fold)
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

    return np.mean(cv_scores)

def run_optuna_study(X, y, n_trials=100, n_jobs=1):
    """TPEサンプラーとメジアンプルーニングでOptunaスタディを実行"""
    sampler = TPESampler(
        n_startup_trials=20,
        n_ei_candidates=24,
        multivariate=True,
        seed=42
    )

    pruner = MedianPruner(
        n_startup_trials=5,
        n_warmup_steps=10,
        interval_steps=1
    )

    study = optuna.create_study(
        direction='maximize',
        sampler=sampler,
        pruner=pruner,
        study_name='lgbm_optimization',
        # storage='sqlite:///optuna.db',  # 結果を永続化
        # load_if_exists=True,            # 既存のスタディを再開
    )

    study.optimize(
        lambda trial: objective_lgbm(trial, X, y),
        n_trials=n_trials,
        n_jobs=n_jobs,
        show_progress_bar=True,
    )

    print(f"\nBest params:")
    for key, value in study.best_params.items():
        print(f"  {key}: {value}")
    print(f"Best AUC: {study.best_value:.4f}")
    print(f"Completed trials: {len(study.trials)}")
    pruned = [t for t in study.trials if t.state == optuna.trial.TrialState.PRUNED]
    print(f"Pruned trials: {len(pruned)}")

    return study

def visualize_optuna_study(study):
    """Optuna最適化結果の可視化"""
    import optuna.visualization as vis

    vis.plot_optimization_history(study).show()
    vis.plot_param_importances(study).show()
    vis.plot_parallel_coordinate(study).show()
    vis.plot_slice(study).show()
    vis.plot_contour(study, params=['learning_rate', 'num_leaves']).show()

PyTorch + Optuna完全例

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import optuna

def create_model(trial, input_dim):
    """Optunaトライアルパラメータからニューラルネットワークを動的に構築"""
    n_layers = trial.suggest_int('n_layers', 1, 4)
    dropout = trial.suggest_float('dropout', 0.1, 0.5)
    activation_name = trial.suggest_categorical('activation', ['relu', 'tanh', 'elu'])

    activation_map = {'relu': nn.ReLU(), 'tanh': nn.Tanh(), 'elu': nn.ELU()}

    layers = []
    in_features = input_dim

    for i in range(n_layers):
        out_features = trial.suggest_int(f'n_units_l{i}', 32, 512)
        layers.extend([
            nn.Linear(in_features, out_features),
            nn.BatchNorm1d(out_features),
            activation_map[activation_name],
            nn.Dropout(dropout),
        ])
        in_features = out_features

    layers.extend([nn.Linear(in_features, 1), nn.Sigmoid()])
    return nn.Sequential(*layers)

def objective_pytorch(trial, X_train_t, y_train_t, X_val_t, y_val_t, input_dim):
    """PyTorchニューラルネットワーク用Optuna目的関数"""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = create_model(trial, input_dim).to(device)

    lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)
    optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'RMSprop', 'SGD'])
    batch_size = trial.suggest_categorical('batch_size', [32, 64, 128, 256])
    weight_decay = trial.suggest_float('weight_decay', 1e-8, 1e-2, log=True)

    if optimizer_name == 'Adam':
        optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    elif optimizer_name == 'RMSprop':
        optimizer = optim.RMSprop(model.parameters(), lr=lr, weight_decay=weight_decay)
    else:
        optimizer = optim.SGD(model.parameters(), lr=lr, weight_decay=weight_decay,
                              momentum=0.9)

    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)
    criterion = nn.BCELoss()

    train_dataset = TensorDataset(
        X_train_t.to(device),
        y_train_t.to(device).float().unsqueeze(1)
    )
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    best_val_loss = float('inf')
    patience_counter = 0
    patience = 10

    for epoch in range(100):
        model.train()
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            loss = criterion(model(X_batch), y_batch)
            loss.backward()
            optimizer.step()
        scheduler.step()

        model.eval()
        with torch.no_grad():
            val_loss = criterion(
                model(X_val_t.to(device)),
                y_val_t.to(device).float().unsqueeze(1)
            ).item()

        trial.report(val_loss, epoch)
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                break

    return best_val_loss

def run_pytorch_optuna(X_train, y_train, X_val, y_val, n_trials=50):
    X_train_t = torch.FloatTensor(X_train.values if hasattr(X_train, 'values') else X_train)
    y_train_t = torch.FloatTensor(y_train.values if hasattr(y_train, 'values') else y_train)
    X_val_t = torch.FloatTensor(X_val.values if hasattr(X_val, 'values') else X_val)
    y_val_t = torch.FloatTensor(y_val.values if hasattr(y_val, 'values') else y_val)
    input_dim = X_train_t.shape[1]

    study = optuna.create_study(
        direction='minimize',
        pruner=optuna.pruners.HyperbandPruner(
            min_resource=5, max_resource=100, reduction_factor=3
        ),
        sampler=TPESampler(seed=42)
    )
    study.optimize(
        lambda trial: objective_pytorch(
            trial, X_train_t, y_train_t, X_val_t, y_val_t, input_dim
        ),
        n_trials=n_trials,
        show_progress_bar=True,
    )
    print(f"Best val loss: {study.best_value:.4f}")
    print(f"Best params: {study.best_params}")
    return study

CMA-ESサンプラー

# CMA-ESは連続ハイパーパラメータ空間でより効率的
study_cmaes = optuna.create_study(
    direction='maximize',
    sampler=CmaEsSampler(
        n_startup_trials=10,
        restart_strategy='ipop',  # 局所最適からの脱出のための再起動戦略
        seed=42
    )
)

4. Ray Tune

Ray Tuneによる分散HPO

AnyscaleのRay Tuneは、複数のGPUとノード間での並列トレーニングを自動的に処理します。

pip install ray[tune] ray[air]
import ray
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler, PopulationBasedTraining
from ray.tune.search.optuna import OptunaSearch
import torch
import torch.nn as nn
import torch.nn.functional as F

ray.init(ignore_reinit_error=True)

def train_with_tune(config, data=None):
    """Ray Tuneが呼び出すトレーニング関数"""
    X_train, y_train, X_val, y_val = data

    model = nn.Sequential(
        nn.Linear(X_train.shape[1], config['hidden_size']),
        nn.ReLU(),
        nn.Dropout(config['dropout']),
        nn.Linear(config['hidden_size'], config['hidden_size'] // 2),
        nn.ReLU(),
        nn.Linear(config['hidden_size'] // 2, 1),
        nn.Sigmoid()
    )

    optimizer = torch.optim.Adam(
        model.parameters(), lr=config['lr'], weight_decay=config['weight_decay']
    )
    criterion = nn.BCELoss()

    X_train_t = torch.FloatTensor(X_train)
    y_train_t = torch.FloatTensor(y_train).unsqueeze(1)
    X_val_t = torch.FloatTensor(X_val)
    y_val_t = torch.FloatTensor(y_val).unsqueeze(1)

    for epoch in range(config['max_epochs']):
        model.train()
        optimizer.zero_grad()
        loss = criterion(model(X_train_t), y_train_t)
        loss.backward()
        optimizer.step()

        if epoch % 5 == 0:
            model.eval()
            with torch.no_grad():
                val_loss = criterion(model(X_val_t), y_val_t).item()
            tune.report(val_loss=val_loss, training_iteration=epoch)

def run_ray_tune(X_train, y_train, X_val, y_val, num_samples=50):
    """Ray Tuneで分散HPOを実行"""
    config = {
        'hidden_size': tune.choice([64, 128, 256, 512]),
        'dropout': tune.uniform(0.1, 0.5),
        'lr': tune.loguniform(1e-5, 1e-1),
        'weight_decay': tune.loguniform(1e-8, 1e-3),
        'max_epochs': tune.choice([50, 100, 200]),
    }

    # ASHA:非同期逐次分割法
    scheduler = ASHAScheduler(
        metric='val_loss',
        mode='min',
        max_t=200,            # 最大エポック
        grace_period=10,      # プルーニング前の最小エポック
        reduction_factor=3,
    )

    search_alg = OptunaSearch(metric='val_loss', mode='min')

    result = tune.run(
        tune.with_parameters(
            train_with_tune,
            data=(X_train, y_train, X_val, y_val)
        ),
        config=config,
        num_samples=num_samples,
        scheduler=scheduler,
        search_alg=search_alg,
        progress_reporter=CLIReporter(
            metric_columns=['val_loss', 'training_iteration'],
            max_progress_rows=10
        ),
        verbose=1,
        resources_per_trial={'cpu': 2, 'gpu': 0},
    )

    best_trial = result.get_best_trial('val_loss', 'min', 'last')
    print(f"Best val loss: {best_trial.last_result['val_loss']:.4f}")
    print(f"Best config: {best_trial.config}")
    return result

def run_pbt(X_train, y_train, X_val, y_val):
    """Population Based Training:トレーニング中にハイパーパラメータを動的に変化させる"""
    pbt_scheduler = PopulationBasedTraining(
        time_attr='training_iteration',
        metric='val_loss',
        mode='min',
        perturbation_interval=20,
        hyperparam_mutations={
            'lr': tune.loguniform(1e-5, 1e-1),
            'dropout': tune.uniform(0.1, 0.5),
        },
        quantile_fraction=0.25,   # 下位25%を上位25%に置き換える
    )

    result = tune.run(
        tune.with_parameters(
            train_with_tune,
            data=(X_train, y_train, X_val, y_val)
        ),
        config={
            'hidden_size': 256,
            'dropout': tune.uniform(0.1, 0.5),
            'lr': tune.loguniform(1e-4, 1e-1),
            'weight_decay': 1e-5,
            'max_epochs': 200,
        },
        num_samples=8,
        scheduler=pbt_scheduler,
        verbose=1,
    )
    return result

5. AutoGluon

AutoGluon概要

AmazonのAutoGluonは、最小限のコード(時には3行)でKaggleレベルのパフォーマンスを達成します。

pip install autogluon

表形式データ(TabularPredictor)

from autogluon.tabular import TabularPredictor
import pandas as pd

def autogluon_tabular_example(train_df, test_df, target_col, eval_metric='roc_auc'):
    """AutoGluon表形式トレーニング"""
    predictor = TabularPredictor(
        label=target_col,
        eval_metric=eval_metric,
        path='autogluon_models/',
        problem_type='binary',   # 'binary'、'multiclass'、'regression'、'softclass'
    )

    predictor.fit(
        train_data=train_df,
        time_limit=3600,
        presets='best_quality',  # 'best_quality'、'good_quality'、'medium_quality'、
                                 # 'optimize_for_deployment'
        excluded_model_types=['KNN'],
        verbosity=2,
    )

    leaderboard = predictor.leaderboard(test_df, silent=True)
    print(leaderboard[['model', 'score_test', 'score_val', 'pred_time_test']].head(10))

    predictions = predictor.predict(test_df)
    pred_proba = predictor.predict_proba(test_df)
    feature_importance = predictor.feature_importance(test_df)
    print(feature_importance.head(20))

    return predictor, predictions, pred_proba

def autogluon_advanced(train_df, test_df, target_col):
    """カスタムハイパーパラメータを使ったAutoGluon"""
    hyperparameters = {
        'GBM': [
            {'num_boost_round': 300, 'ag_args': {'name_suffix': 'fast'}},
            {'num_boost_round': 1000, 'learning_rate': 0.03,
             'ag_args': {'name_suffix': 'slow', 'priority': 0}},
        ],
        'XGB': [{'n_estimators': 300, 'max_depth': 6}],
        'CAT': [{'iterations': 500, 'depth': 6}],
        'NN_TORCH': [{'num_epochs': 50, 'learning_rate': 1e-3, 'dropout_prob': 0.1}],
        'RF': [{'n_estimators': 300}],
    }

    predictor = TabularPredictor(
        label=target_col, eval_metric='roc_auc', path='autogluon_advanced/'
    )

    predictor.fit(
        train_data=train_df,
        hyperparameters=hyperparameters,
        time_limit=7200,
        num_stack_levels=1,    # スタッキングレベル数
        num_bag_folds=5,       # バギングのCVフォールド数
        num_bag_sets=1,        # バギングセット数
        verbosity=3,
    )
    return predictor

マルチモーダル学習

from autogluon.multimodal import MultiModalPredictor

def autogluon_image_classification(train_df, test_df, label_col):
    """AutoGluon画像分類"""
    predictor = MultiModalPredictor(label=label_col)

    predictor.fit(
        train_data=train_df,
        time_limit=3600,
        hyperparameters={
            'model.timm_image.checkpoint_name': 'efficientnet_b4',
            'optimization.learning_rate': 1e-4,
            'optimization.max_epochs': 20,
        }
    )
    return predictor

def autogluon_multimodal(train_df, test_df, target_col):
    """AutoGluonマルチモーダル:テキスト + 表形式特徴量を組み合わせる"""
    predictor = MultiModalPredictor(label=target_col, problem_type='binary')

    predictor.fit(
        train_data=train_df,
        time_limit=3600,
        hyperparameters={
            'model.hf_text.checkpoint_name': 'bert-base-uncased',
        }
    )
    return predictor

6. FLAML

Microsoft FLAML

Microsoft ResearchのFLAML(Fast and Lightweight AutoML)はコスト効率の高い自動化に特化しています。

pip install flaml
from flaml import AutoML
import pandas as pd
import numpy as np

def flaml_basic_example(X_train, y_train, X_test, task='classification'):
    """FLAMLの基本的な使用法"""
    automl = AutoML()

    automl_settings = {
        'time_budget': 300,
        'metric': 'roc_auc',
        'task': task,          # 'classification'、'regression'、'ranking'
        'estimator_list': [
            'lgbm', 'xgboost', 'catboost',
            'rf', 'extra_tree', 'lrl1', 'lrl2', 'kneighbor'
        ],
        'log_file_name': 'flaml_log.log',
        'seed': 42,
        'n_jobs': -1,
        'verbose': 1,
        'retrain_full': True,  # 最終モデルを全データで再トレーニング
        'max_iter': 100,
        'ensemble': True,
        'eval_method': 'cv',
        'n_splits': 5,
    }

    automl.fit(X_train, y_train, **automl_settings)

    print(f"Best estimator: {automl.best_estimator}")
    print(f"Best loss: {automl.best_loss:.4f}")
    print(f"Best config: {automl.best_config}")
    print(f"Time to find best model: {automl.time_to_find_best_model:.1f}s")

    predictions = automl.predict(X_test)
    pred_proba = automl.predict_proba(X_test)
    return automl, predictions, pred_proba

def flaml_sklearn_pipeline(X_train, y_train, X_test):
    """FLAMLをscikit-learnパイプラインに統合"""
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import StandardScaler

    automl = AutoML()
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('automl', automl),
    ])

    pipeline.fit(
        X_train, y_train,
        automl__time_budget=120,
        automl__metric='roc_auc',
        automl__task='classification',
    )
    return pipeline

def flaml_custom_objective(X_train, y_train):
    """カスタム評価指標を使ったFLAML"""
    def custom_metric(
        X_val, y_val, estimator, labels, X_train, y_train,
        weight_val=None, weight_train=None, *args
    ):
        """F-betaスコアを最適化"""
        from sklearn.metrics import fbeta_score
        y_pred = estimator.predict(X_val)
        score = fbeta_score(y_val, y_pred, beta=2, average='weighted')
        return -score, {'f2_score': score}  # (loss、metrics_dict)

    automl = AutoML()
    automl.fit(
        X_train, y_train,
        metric=custom_metric,
        task='classification',
        time_budget=120,
    )
    return automl

7. H2O AutoML

H2Oクラスター

H2O AutoMLは幅広い解釈可能性ツールを備えたエンタープライズグレードのAutoMLプラットフォームです。

pip install h2o
import h2o
from h2o.automl import H2OAutoML
import pandas as pd

def h2o_automl_example(train_df, test_df, target_col, max_models=20):
    """H2O AutoMLのエンドツーエンド例"""

    h2o.init(nthreads=-1, max_mem_size='8G', port=54321)

    train_h2o = h2o.H2OFrame(train_df)
    test_h2o = h2o.H2OFrame(test_df)

    # 分類のためにターゲットをfactorとしてマーク
    train_h2o[target_col] = train_h2o[target_col].asfactor()

    feature_cols = [col for col in train_df.columns if col != target_col]

    aml = H2OAutoML(
        max_models=max_models,
        max_runtime_secs=3600,
        seed=42,
        sort_metric='AUC',
        balance_classes=False,
        include_algos=[
            'GBM', 'GLM', 'DRF', 'DeepLearning',
            'StackedEnsemble', 'XGBoost'
        ],
        keep_cross_validation_predictions=True,
        keep_cross_validation_models=True,
        nfolds=5,
        verbosity='info',
    )

    aml.fit(
        x=feature_cols, y=target_col,
        training_frame=train_h2o,
        leaderboard_frame=test_h2o,
    )

    lb = aml.leaderboard
    print("H2O AutoML Leaderboard:")
    print(lb.head(20))

    best_model = aml.leader
    print(f"\nBest model: {best_model.model_id}")

    predictions = best_model.predict(test_h2o).as_data_frame()

    # モデルの保存
    model_path = h2o.save_model(model=best_model, path='h2o_models/', force=True)
    print(f"Model saved to: {model_path}")

    return aml, best_model, predictions

def cleanup_h2o():
    h2o.cluster().shutdown()

8. ニューラルアーキテクチャ探索(NAS)

NAS概要

ニューラルアーキテクチャ探索(NAS)は最適なニューラルネットワークアーキテクチャを自動的に見つけます。

NASの3つのコンポーネント:

  1. 探索空間:可能なアーキテクチャのセット
  2. 探索戦略:空間の探索方法(ランダム、進化的、RL、勾配ベース)
  3. 性能評価:候補アーキテクチャの評価方法

DARTSは(Liu et al., 2019)は離散的な選択の連続緩和によってアーキテクチャ探索を微分可能にします。

import torch
import torch.nn as nn
import torch.nn.functional as F

class MixedOperation(nn.Module):
    """DARTS混合演算:候補演算の重み付き和"""
    def __init__(self, operations):
        super().__init__()
        self.ops = nn.ModuleList(operations)
        self.alphas = nn.Parameter(torch.randn(len(operations)))

    def forward(self, x):
        weights = F.softmax(self.alphas, dim=0)
        return sum(w * op(x) for w, op in zip(weights, self.ops))

class DARTSCell(nn.Module):
    """単一のDARTSセル"""
    def __init__(self, in_channels, out_channels):
        super().__init__()
        operations = [
            nn.Conv2d(in_channels, out_channels, 3, padding=1),
            nn.Conv2d(in_channels, out_channels, 5, padding=2),
            nn.MaxPool2d(3, stride=1, padding=1),
            nn.AvgPool2d(3, stride=1, padding=1),
            nn.Identity() if in_channels == out_channels
                else nn.Conv2d(in_channels, out_channels, 1),
        ]
        self.mixed_op = MixedOperation(operations)
        self.bn = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        return F.relu(self.bn(self.mixed_op(x)))

class SimpleDARTS(nn.Module):
    """簡略化されたDARTSネットワーク"""
    def __init__(self, num_classes=10, num_cells=6):
        super().__init__()
        self.stem = nn.Conv2d(3, 64, 3, padding=1)
        self.cells = nn.ModuleList([DARTSCell(64, 64) for _ in range(num_cells)])
        self.classifier = nn.Linear(64, num_classes)

    def forward(self, x):
        x = self.stem(x)
        for cell in self.cells:
            x = cell(x)
        x = x.mean([2, 3])  # グローバル平均プーリング
        return self.classifier(x)

    def arch_parameters(self):
        return [p for n, p in self.named_parameters() if 'alphas' in n]

    def model_parameters(self):
        return [p for n, p in self.named_parameters() if 'alphas' not in n]

def train_darts(model, train_loader, val_loader, epochs=50):
    """DARTSの二値最適化"""
    w_optimizer = torch.optim.SGD(
        model.model_parameters(), lr=0.025, momentum=0.9, weight_decay=3e-4
    )
    a_optimizer = torch.optim.Adam(
        model.arch_parameters(), lr=3e-4, betas=(0.5, 0.999), weight_decay=1e-3
    )
    w_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(w_optimizer, T_max=epochs)

    for epoch in range(epochs):
        model.train()
        train_iter = iter(train_loader)
        val_iter = iter(val_loader)

        for step in range(min(len(train_loader), len(val_loader))):
            # ステップ1:検証データを使ってアーキテクチャパラメータを更新
            try:
                X_val, y_val = next(val_iter)
            except StopIteration:
                val_iter = iter(val_loader)
                X_val, y_val = next(val_iter)

            a_optimizer.zero_grad()
            val_loss = F.cross_entropy(model(X_val), y_val)
            val_loss.backward()
            a_optimizer.step()

            # ステップ2:トレーニングデータを使って重みパラメータを更新
            X_train, y_train = next(train_iter)
            w_optimizer.zero_grad()
            train_loss = F.cross_entropy(model(X_train), y_train)
            train_loss.backward()
            nn.utils.clip_grad_norm_(model.model_parameters(), 5.0)
            w_optimizer.step()

        w_scheduler.step()

        if epoch % 10 == 0:
            print(f"Epoch {epoch}: Train Loss = {train_loss.item():.4f}")

    # 発見されたアーキテクチャの抽出
    for i, cell in enumerate(model.cells):
        weights = F.softmax(cell.mixed_op.alphas, dim=0).detach()
        best_op = weights.argmax().item()
        print(f"Cell {i}: Best op index = {best_op}, weights = {weights.numpy()}")

    return model

ワンショットNAS

class SuperNetwork(nn.Module):
    """ワンショットNAS:単一のスーパーネットワークからサブネットワークをサンプリング"""

    def __init__(self, num_classes=10, max_channels=256):
        super().__init__()
        self.max_channels = max_channels
        self.channel_options = [64, 128, 256]

        self.conv1 = nn.Conv2d(3, max_channels, 3, padding=1)
        self.conv2 = nn.Conv2d(max_channels, max_channels, 3, padding=1)
        self.conv3 = nn.Conv2d(max_channels, max_channels, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(max_channels)
        self.bn2 = nn.BatchNorm2d(max_channels)
        self.bn3 = nn.BatchNorm2d(max_channels)
        self.classifier = nn.Linear(max_channels, num_classes)

    def forward(self, x, arch_config=None):
        if arch_config is None:
            arch_config = {
                'conv1_out': torch.randint(0, len(self.channel_options), (1,)).item(),
                'conv2_out': torch.randint(0, len(self.channel_options), (1,)).item(),
            }

        c1 = self.channel_options[arch_config['conv1_out']]
        c2 = self.channel_options[arch_config['conv2_out']]

        x = F.relu(self.bn1(self.conv1(x)[:, :c1]))
        x = F.relu(self.bn2(self.conv2(
            F.pad(x, (0, 0, 0, 0, 0, self.max_channels - c1))
        )[:, :c2]))
        x = F.relu(self.bn3(self.conv3(
            F.pad(x, (0, 0, 0, 0, 0, self.max_channels - c2))
        )))
        x = x.mean([2, 3])
        return self.classifier(x)

9. パイプライン自動化

Auto-sklearn

pip install auto-sklearn
import autosklearn.classification
import autosklearn.regression
from autosklearn.metrics import roc_auc, mean_squared_error

def auto_sklearn_example(X_train, y_train, X_test, task='classification'):
    """Auto-sklearn:scikit-learn互換AutoML"""

    if task == 'classification':
        automl = autosklearn.classification.AutoSklearnClassifier(
            time_left_for_this_task=3600,
            per_run_time_limit=360,
            n_jobs=-1,
            memory_limit=8192,
            ensemble_size=50,
            ensemble_nbest=50,
            max_models_on_disc=50,
            include={
                'classifier': [
                    'random_forest', 'gradient_boosting',
                    'extra_trees', 'liblinear_svc'
                ]
            },
            metric=roc_auc,
            resampling_strategy='cv',
            resampling_strategy_arguments={'folds': 5},
            seed=42,
        )
    else:
        automl = autosklearn.regression.AutoSklearnRegressor(
            time_left_for_this_task=3600,
            per_run_time_limit=360,
            n_jobs=-1,
            metric=mean_squared_error,
            seed=42,
        )

    automl.fit(X_train, y_train)

    print(automl.sprint_statistics())
    print(automl.leaderboard())

    predictions = automl.predict(X_test)
    return automl, predictions

10. LLM時代のAutoML

AutoMLへのLLMの活用

大規模言語モデル(LLM)はAutoMLに新しい可能性を開いています:

  1. ハイパーパラメータ提案:LLMがデータセットの特性に基づいて初期設定を推薦
  2. 特徴量エンジニアリング:LLMがドメイン知識を使用して新しい特徴量のアイデアを提案
  3. コード生成:前処理とトレーニングコードを自動生成
  4. エラーデバッグ:トレーニングの失敗を診断し、解決策を提案
# LLMガイドのハイパーパラメータ最適化(概念的コード)
from openai import OpenAI

def llm_hyperparameter_suggestion(dataset_description, model_type, previous_results=None):
    """LLMを使ってハイパーパラメータを提案"""
    client = OpenAI()

    prompt = f"""
    Dataset characteristics:
    {dataset_description}

    Model type: {model_type}

    Previous results:
    {previous_results if previous_results else 'None (first attempt)'}

    Based on this information, suggest optimal hyperparameters for {model_type} in JSON format.
    """

    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system",
             "content": "You are a machine learning expert. Help optimize hyperparameters."},
            {"role": "user", "content": prompt}
        ],
        response_format={"type": "json_object"}
    )
    return response.choices[0].message.content

# AutoMLエージェント(実験的)
class AutoMLAgent:
    """LLMガイドのAutoMLエージェント"""

    def __init__(self, llm_client, X_train, y_train, X_val, y_val, max_iterations=10):
        self.client = llm_client
        self.X_train = X_train
        self.y_train = y_train
        self.X_val = X_val
        self.y_val = y_val
        self.max_iterations = max_iterations
        self.history = []
        self.best_score = 0
        self.best_params = None

    def get_next_config(self):
        """LLMに次の設定を尋ねる"""
        history_str = "\n".join([
            f"Iteration {i+1}: params={h['params']}, score={h['score']:.4f}"
            for i, h in enumerate(self.history[-5:])
        ])

        prompt = f"""
        LightGBM parameter attempts so far:
        {history_str if history_str else 'None (first attempt)'}

        Suggest the next parameter combination to try in JSON format.
        Valid ranges: num_leaves(10-300), learning_rate(0.001-0.3),
        n_estimators(100-2000), subsample(0.5-1.0), colsample_bytree(0.5-1.0)
        """

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are an HPO expert."},
                {"role": "user", "content": prompt}
            ],
            response_format={"type": "json_object"}
        )

        import json
        return json.loads(response.choices[0].message.content)

    def evaluate(self, params):
        """パラメータ設定を評価"""
        import lightgbm as lgb
        from sklearn.metrics import roc_auc_score

        model = lgb.LGBMClassifier(**params, random_state=42, verbose=-1)
        model.fit(self.X_train, self.y_train)
        preds = model.predict_proba(self.X_val)[:, 1]
        return roc_auc_score(self.y_val, preds)

    def run(self):
        """AutoMLエージェントループを実行"""
        for i in range(self.max_iterations):
            config = self.get_next_config()
            score = self.evaluate(config)
            self.history.append({'params': config, 'score': score})

            if score > self.best_score:
                self.best_score = score
                self.best_params = config
                print(f"Iteration {i+1}: New best score {score:.4f}")

        print(f"\nBest score: {self.best_score:.4f}")
        print(f"Best params: {self.best_params}")
        return self.best_params

まとめ

このガイドでAutoMLエコシステム全体を網羅しました:

  1. ハイパーパラメータ最適化:グリッド探索からベイズ最適化まで、体系的な直感を構築
  2. Optuna:最も柔軟なPythonネイティブHPOフレームワーク、プルーニングと可視化
  3. Ray Tune:複数のGPUとノード間での大規模分散HPO
  4. AutoGluon:Amazonの強力なマルチモーダルAutoML(表形式、画像、テキスト)
  5. FLAML:MicrosoftのコスコートEfficient AutoML、最小限のオーバーヘッド
  6. H2O AutoML:解釈可能性ツールを備えたエンタープライズグレードのAutoML
  7. NAS:DARTSとワンショット手法による最適なニューラルアーキテクチャの自動設計
  8. LLM + AutoML:インテリジェントで言語ガイドの自動化の次のフロンティア

主な推奨事項:

  • 時間制約がある場合:FLAMLまたはAutoGluonをgood_qualityプリセットで使用
  • 特定モデルのチューニング:Optunaを使用
  • 大規模または分散実験:Ray Tuneを使用
  • エンタープライズ環境:解釈可能性ツールのためにH2O AutoMLを活用
  • LLMベースのAutoMLはまだ研究段階ですが、注目に値します

AutoMLはツールであり、魔法ではありません。ドメイン知識、データ品質、正しい評価フレームワークが成功の最も重要な要素であり続けます。

参考文献