- Authors

- Name
- Youngju Kim
- @fjvbn20031
1. AutoML概要
AutoMLとは?
AutoML(Automated Machine Learning)は機械学習パイプラインのさまざまな段階を自動化します。データサイエンティストがかつて手動で行っていた作業、データの前処理、特徴量エンジニアリング、モデル選択、ハイパーパラメータ最適化、アンサンブルなどをアルゴリズムが自動的に処理します。
AutoMLが自動化すること:
-
データ前処理の自動化
- 欠損値補完戦略の選択
- スケーリングと正規化手法の選択
- 外れ値処理
-
特徴量エンジニアリングの自動化
- 特徴量変換(対数、二乗、相互作用)
- カテゴリエンコーディング手法の選択
- 特徴量選択と生成
-
モデル選択(アルゴリズム選択)
- 多様なアルゴリズムの探索
- メタ学習(過去のタスクからの経験を活用)
-
ハイパーパラメータ最適化(HPO)
- グリッド/ランダム探索
- ベイズ最適化
- 進化的アルゴリズム
-
アンサンブルの自動化
- 最適なアンサンブル設定の探索
- 自動スタッキングとブレンディング
-
ニューラルアーキテクチャ探索(NAS)
- 最適なニューラルネットワークアーキテクチャの自動設計
AutoMLの応用分野
産業への応用:
- 金融:信用リスクモデル、自動不正検出
- 医療:診断支援システムの迅速なプロトタイピング
- 小売:自動需要予測モデルの更新
- 製造:品質管理モデルの自動化
主要なオープンソースAutoMLツール:
| ツール | 開発者 | 強み |
|---|---|---|
| AutoGluon | Amazon | マルチモーダル、表形式、画像、テキスト |
| FLAML | Microsoft | コスト効率、高速 |
| Optuna | Preferred Networks | HPO、可視化 |
| H2O AutoML | H2O.ai | エンタープライズ、解釈可能性 |
| Auto-sklearn | AutoML Group | scikit-learn互換 |
| Ray Tune | Anyscale | 分散HPO |
| NNI | Microsoft | NAS、HPO |
AutoMLの長所と短所
長所:
- 非専門家でも高品質なモデルを構築できる
- 繰り返し実験の自動化により時間を節約
- 人間が見逃すかもしれないハイパーパラメータの組み合わせを発見
- 再現可能なパイプラインを提供
短所:
- 計算コストが非常に高くなる可能性がある
- ドメイン知識を組み込む能力が限られている
- ブラックボックスの性質(内部動作が理解しにくい)
- 特化した問題ではカスタムソリューションがより効果的
- データ漏洩のリスク
2. ハイパーパラメータ最適化(HPO)
グリッド探索
最も単純なHPO手法 — 探索空間のすべての組み合わせを網羅的に試みます。
from sklearn.model_selection import GridSearchCV
import xgboost as xgb
def grid_search_example(X_train, y_train):
"""グリッド探索:網羅的(小さな探索空間でのみ実用的)"""
param_grid = {
'max_depth': [3, 5, 7],
'learning_rate': [0.01, 0.1, 0.3],
'n_estimators': [100, 300, 500],
'subsample': [0.7, 0.9],
}
# 組み合わせ総数: 3 * 3 * 3 * 2 = 54 * CVフォールド
model = xgb.XGBClassifier(random_state=42, n_jobs=-1)
grid_search = GridSearchCV(
model, param_grid,
cv=5, scoring='roc_auc', n_jobs=-1, verbose=1, refit=True
)
grid_search.fit(X_train, y_train)
print(f"Best params: {grid_search.best_params_}")
print(f"Best CV score: {grid_search.best_score_:.4f}")
results = pd.DataFrame(grid_search.cv_results_)
print(results.sort_values('mean_test_score', ascending=False)[
['params', 'mean_test_score', 'std_test_score']
].head(10))
return grid_search.best_estimator_
ランダム探索
Bergstra & Bengio(2012)が提案 — パラメータ分布からランダムにサンプリングし、グリッド探索よりもはるかに効率的な場合が多いです。
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform, randint, loguniform
def random_search_example(X_train, y_train, n_iter=100):
"""ランダム探索:連続分布からサンプリング"""
param_distributions = {
'max_depth': randint(3, 10),
'learning_rate': loguniform(1e-3, 0.5), # [0.001, 0.5]の対数一様分布
'n_estimators': randint(100, 1000),
'subsample': uniform(0.6, 0.4), # [0.6, 1.0]の一様分布
'colsample_bytree': uniform(0.6, 0.4),
'reg_alpha': loguniform(1e-4, 10),
'reg_lambda': loguniform(1e-4, 10),
'min_child_weight': randint(1, 10),
'gamma': uniform(0, 0.5),
}
model = xgb.XGBClassifier(random_state=42, n_jobs=-1)
random_search = RandomizedSearchCV(
model, param_distributions,
n_iter=n_iter, cv=5, scoring='roc_auc',
n_jobs=-1, verbose=1, random_state=42, refit=True
)
random_search.fit(X_train, y_train)
print(f"Best params: {random_search.best_params_}")
print(f"Best CV score: {random_search.best_score_:.4f}")
return random_search.best_estimator_
ベイズ最適化
ベイズ最適化は過去の評価結果を使用して、次に評価する点をインテリジェントに選択します。
コアコンポーネント:
- 代理モデル:目的関数の確率的近似(通常はガウス過程)
- 獲得関数:次の評価点を決定
- EI(Expected Improvement):現在のベストに対する期待改善量
- UCB(Upper Confidence Bound):探索と活用のバランス
- PI(Probability of Improvement):現在のベストを改善する確率
TPE(Tree-structured Parzen Estimator):
- Optunaが使用するデフォルトアルゴリズム
- p(x|y)の代わりに2つの密度関数l(x)とg(x)をモデル化
- l(x)は良い結果(上位gamma%)につながったパラメータの分布をモデル化
- g(x)は残りをモデル化
- 次の点はl(x)/g(x)の比を最大化するように選択
3. Optuna
コアコンセプト
Preferred NetworksによってPythonネイティブHPOフレームワークとして開発されたOptunaは、シンプルさと柔軟性で知られています。
主要コンセプト:
- Study:最適化実験全体(Trialsのコレクション)
- Trial:単一のハイパーパラメータ設定の試み
- Objective Function:最適化する関数(最小化または最大化)
- Sampler:パラメータ提案アルゴリズム(TPE、CMA-ES、Randomなど)
- Pruner:有望でないTrialの早期終了
pip install optuna optuna-dashboard
import optuna
from optuna.samplers import TPESampler, CmaEsSampler, RandomSampler
from optuna.pruners import MedianPruner, HyperbandPruner
import lightgbm as lgb
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import roc_auc_score
import numpy as np
optuna.logging.set_verbosity(optuna.logging.WARNING)
def objective_lgbm(trial, X, y):
"""LightGBM最適化のためのOptuna目的関数"""
params = {
'objective': 'binary',
'metric': 'auc',
'verbosity': -1,
'boosting_type': trial.suggest_categorical('boosting_type', ['gbdt', 'dart']),
'num_leaves': trial.suggest_int('num_leaves', 20, 300),
'max_depth': trial.suggest_int('max_depth', 3, 12),
'min_child_samples': trial.suggest_int('min_child_samples', 5, 100),
'learning_rate': trial.suggest_float('learning_rate', 1e-4, 0.3, log=True),
'n_estimators': trial.suggest_int('n_estimators', 100, 2000),
'subsample': trial.suggest_float('subsample', 0.5, 1.0),
'subsample_freq': trial.suggest_int('subsample_freq', 1, 7),
'colsample_bytree': trial.suggest_float('colsample_bytree', 0.5, 1.0),
'reg_alpha': trial.suggest_float('reg_alpha', 1e-8, 10.0, log=True),
'reg_lambda': trial.suggest_float('reg_lambda', 1e-8, 10.0, log=True),
'min_split_gain': trial.suggest_float('min_split_gain', 0, 1),
'n_jobs': -1,
}
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
cv_scores = []
for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
train_data = lgb.Dataset(X_train, y_train)
val_data = lgb.Dataset(X_val, y_val, reference=train_data)
model = lgb.train(
params, train_data,
num_boost_round=params['n_estimators'],
valid_sets=[val_data],
callbacks=[
lgb.early_stopping(stopping_rounds=50, verbose=False),
lgb.log_evaluation(-1),
],
)
preds = model.predict(X_val)
fold_score = roc_auc_score(y_val, preds)
cv_scores.append(fold_score)
# プルーニングのための中間結果の報告
trial.report(fold_score, fold)
if trial.should_prune():
raise optuna.exceptions.TrialPruned()
return np.mean(cv_scores)
def run_optuna_study(X, y, n_trials=100, n_jobs=1):
"""TPEサンプラーとメジアンプルーニングでOptunaスタディを実行"""
sampler = TPESampler(
n_startup_trials=20,
n_ei_candidates=24,
multivariate=True,
seed=42
)
pruner = MedianPruner(
n_startup_trials=5,
n_warmup_steps=10,
interval_steps=1
)
study = optuna.create_study(
direction='maximize',
sampler=sampler,
pruner=pruner,
study_name='lgbm_optimization',
# storage='sqlite:///optuna.db', # 結果を永続化
# load_if_exists=True, # 既存のスタディを再開
)
study.optimize(
lambda trial: objective_lgbm(trial, X, y),
n_trials=n_trials,
n_jobs=n_jobs,
show_progress_bar=True,
)
print(f"\nBest params:")
for key, value in study.best_params.items():
print(f" {key}: {value}")
print(f"Best AUC: {study.best_value:.4f}")
print(f"Completed trials: {len(study.trials)}")
pruned = [t for t in study.trials if t.state == optuna.trial.TrialState.PRUNED]
print(f"Pruned trials: {len(pruned)}")
return study
def visualize_optuna_study(study):
"""Optuna最適化結果の可視化"""
import optuna.visualization as vis
vis.plot_optimization_history(study).show()
vis.plot_param_importances(study).show()
vis.plot_parallel_coordinate(study).show()
vis.plot_slice(study).show()
vis.plot_contour(study, params=['learning_rate', 'num_leaves']).show()
PyTorch + Optuna完全例
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import optuna
def create_model(trial, input_dim):
"""Optunaトライアルパラメータからニューラルネットワークを動的に構築"""
n_layers = trial.suggest_int('n_layers', 1, 4)
dropout = trial.suggest_float('dropout', 0.1, 0.5)
activation_name = trial.suggest_categorical('activation', ['relu', 'tanh', 'elu'])
activation_map = {'relu': nn.ReLU(), 'tanh': nn.Tanh(), 'elu': nn.ELU()}
layers = []
in_features = input_dim
for i in range(n_layers):
out_features = trial.suggest_int(f'n_units_l{i}', 32, 512)
layers.extend([
nn.Linear(in_features, out_features),
nn.BatchNorm1d(out_features),
activation_map[activation_name],
nn.Dropout(dropout),
])
in_features = out_features
layers.extend([nn.Linear(in_features, 1), nn.Sigmoid()])
return nn.Sequential(*layers)
def objective_pytorch(trial, X_train_t, y_train_t, X_val_t, y_val_t, input_dim):
"""PyTorchニューラルネットワーク用Optuna目的関数"""
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = create_model(trial, input_dim).to(device)
lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)
optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'RMSprop', 'SGD'])
batch_size = trial.suggest_categorical('batch_size', [32, 64, 128, 256])
weight_decay = trial.suggest_float('weight_decay', 1e-8, 1e-2, log=True)
if optimizer_name == 'Adam':
optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
elif optimizer_name == 'RMSprop':
optimizer = optim.RMSprop(model.parameters(), lr=lr, weight_decay=weight_decay)
else:
optimizer = optim.SGD(model.parameters(), lr=lr, weight_decay=weight_decay,
momentum=0.9)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)
criterion = nn.BCELoss()
train_dataset = TensorDataset(
X_train_t.to(device),
y_train_t.to(device).float().unsqueeze(1)
)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
best_val_loss = float('inf')
patience_counter = 0
patience = 10
for epoch in range(100):
model.train()
for X_batch, y_batch in train_loader:
optimizer.zero_grad()
loss = criterion(model(X_batch), y_batch)
loss.backward()
optimizer.step()
scheduler.step()
model.eval()
with torch.no_grad():
val_loss = criterion(
model(X_val_t.to(device)),
y_val_t.to(device).float().unsqueeze(1)
).item()
trial.report(val_loss, epoch)
if trial.should_prune():
raise optuna.exceptions.TrialPruned()
if val_loss < best_val_loss:
best_val_loss = val_loss
patience_counter = 0
else:
patience_counter += 1
if patience_counter >= patience:
break
return best_val_loss
def run_pytorch_optuna(X_train, y_train, X_val, y_val, n_trials=50):
X_train_t = torch.FloatTensor(X_train.values if hasattr(X_train, 'values') else X_train)
y_train_t = torch.FloatTensor(y_train.values if hasattr(y_train, 'values') else y_train)
X_val_t = torch.FloatTensor(X_val.values if hasattr(X_val, 'values') else X_val)
y_val_t = torch.FloatTensor(y_val.values if hasattr(y_val, 'values') else y_val)
input_dim = X_train_t.shape[1]
study = optuna.create_study(
direction='minimize',
pruner=optuna.pruners.HyperbandPruner(
min_resource=5, max_resource=100, reduction_factor=3
),
sampler=TPESampler(seed=42)
)
study.optimize(
lambda trial: objective_pytorch(
trial, X_train_t, y_train_t, X_val_t, y_val_t, input_dim
),
n_trials=n_trials,
show_progress_bar=True,
)
print(f"Best val loss: {study.best_value:.4f}")
print(f"Best params: {study.best_params}")
return study
CMA-ESサンプラー
# CMA-ESは連続ハイパーパラメータ空間でより効率的
study_cmaes = optuna.create_study(
direction='maximize',
sampler=CmaEsSampler(
n_startup_trials=10,
restart_strategy='ipop', # 局所最適からの脱出のための再起動戦略
seed=42
)
)
4. Ray Tune
Ray Tuneによる分散HPO
AnyscaleのRay Tuneは、複数のGPUとノード間での並列トレーニングを自動的に処理します。
pip install ray[tune] ray[air]
import ray
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler, PopulationBasedTraining
from ray.tune.search.optuna import OptunaSearch
import torch
import torch.nn as nn
import torch.nn.functional as F
ray.init(ignore_reinit_error=True)
def train_with_tune(config, data=None):
"""Ray Tuneが呼び出すトレーニング関数"""
X_train, y_train, X_val, y_val = data
model = nn.Sequential(
nn.Linear(X_train.shape[1], config['hidden_size']),
nn.ReLU(),
nn.Dropout(config['dropout']),
nn.Linear(config['hidden_size'], config['hidden_size'] // 2),
nn.ReLU(),
nn.Linear(config['hidden_size'] // 2, 1),
nn.Sigmoid()
)
optimizer = torch.optim.Adam(
model.parameters(), lr=config['lr'], weight_decay=config['weight_decay']
)
criterion = nn.BCELoss()
X_train_t = torch.FloatTensor(X_train)
y_train_t = torch.FloatTensor(y_train).unsqueeze(1)
X_val_t = torch.FloatTensor(X_val)
y_val_t = torch.FloatTensor(y_val).unsqueeze(1)
for epoch in range(config['max_epochs']):
model.train()
optimizer.zero_grad()
loss = criterion(model(X_train_t), y_train_t)
loss.backward()
optimizer.step()
if epoch % 5 == 0:
model.eval()
with torch.no_grad():
val_loss = criterion(model(X_val_t), y_val_t).item()
tune.report(val_loss=val_loss, training_iteration=epoch)
def run_ray_tune(X_train, y_train, X_val, y_val, num_samples=50):
"""Ray Tuneで分散HPOを実行"""
config = {
'hidden_size': tune.choice([64, 128, 256, 512]),
'dropout': tune.uniform(0.1, 0.5),
'lr': tune.loguniform(1e-5, 1e-1),
'weight_decay': tune.loguniform(1e-8, 1e-3),
'max_epochs': tune.choice([50, 100, 200]),
}
# ASHA:非同期逐次分割法
scheduler = ASHAScheduler(
metric='val_loss',
mode='min',
max_t=200, # 最大エポック
grace_period=10, # プルーニング前の最小エポック
reduction_factor=3,
)
search_alg = OptunaSearch(metric='val_loss', mode='min')
result = tune.run(
tune.with_parameters(
train_with_tune,
data=(X_train, y_train, X_val, y_val)
),
config=config,
num_samples=num_samples,
scheduler=scheduler,
search_alg=search_alg,
progress_reporter=CLIReporter(
metric_columns=['val_loss', 'training_iteration'],
max_progress_rows=10
),
verbose=1,
resources_per_trial={'cpu': 2, 'gpu': 0},
)
best_trial = result.get_best_trial('val_loss', 'min', 'last')
print(f"Best val loss: {best_trial.last_result['val_loss']:.4f}")
print(f"Best config: {best_trial.config}")
return result
def run_pbt(X_train, y_train, X_val, y_val):
"""Population Based Training:トレーニング中にハイパーパラメータを動的に変化させる"""
pbt_scheduler = PopulationBasedTraining(
time_attr='training_iteration',
metric='val_loss',
mode='min',
perturbation_interval=20,
hyperparam_mutations={
'lr': tune.loguniform(1e-5, 1e-1),
'dropout': tune.uniform(0.1, 0.5),
},
quantile_fraction=0.25, # 下位25%を上位25%に置き換える
)
result = tune.run(
tune.with_parameters(
train_with_tune,
data=(X_train, y_train, X_val, y_val)
),
config={
'hidden_size': 256,
'dropout': tune.uniform(0.1, 0.5),
'lr': tune.loguniform(1e-4, 1e-1),
'weight_decay': 1e-5,
'max_epochs': 200,
},
num_samples=8,
scheduler=pbt_scheduler,
verbose=1,
)
return result
5. AutoGluon
AutoGluon概要
AmazonのAutoGluonは、最小限のコード(時には3行)でKaggleレベルのパフォーマンスを達成します。
pip install autogluon
表形式データ(TabularPredictor)
from autogluon.tabular import TabularPredictor
import pandas as pd
def autogluon_tabular_example(train_df, test_df, target_col, eval_metric='roc_auc'):
"""AutoGluon表形式トレーニング"""
predictor = TabularPredictor(
label=target_col,
eval_metric=eval_metric,
path='autogluon_models/',
problem_type='binary', # 'binary'、'multiclass'、'regression'、'softclass'
)
predictor.fit(
train_data=train_df,
time_limit=3600,
presets='best_quality', # 'best_quality'、'good_quality'、'medium_quality'、
# 'optimize_for_deployment'
excluded_model_types=['KNN'],
verbosity=2,
)
leaderboard = predictor.leaderboard(test_df, silent=True)
print(leaderboard[['model', 'score_test', 'score_val', 'pred_time_test']].head(10))
predictions = predictor.predict(test_df)
pred_proba = predictor.predict_proba(test_df)
feature_importance = predictor.feature_importance(test_df)
print(feature_importance.head(20))
return predictor, predictions, pred_proba
def autogluon_advanced(train_df, test_df, target_col):
"""カスタムハイパーパラメータを使ったAutoGluon"""
hyperparameters = {
'GBM': [
{'num_boost_round': 300, 'ag_args': {'name_suffix': 'fast'}},
{'num_boost_round': 1000, 'learning_rate': 0.03,
'ag_args': {'name_suffix': 'slow', 'priority': 0}},
],
'XGB': [{'n_estimators': 300, 'max_depth': 6}],
'CAT': [{'iterations': 500, 'depth': 6}],
'NN_TORCH': [{'num_epochs': 50, 'learning_rate': 1e-3, 'dropout_prob': 0.1}],
'RF': [{'n_estimators': 300}],
}
predictor = TabularPredictor(
label=target_col, eval_metric='roc_auc', path='autogluon_advanced/'
)
predictor.fit(
train_data=train_df,
hyperparameters=hyperparameters,
time_limit=7200,
num_stack_levels=1, # スタッキングレベル数
num_bag_folds=5, # バギングのCVフォールド数
num_bag_sets=1, # バギングセット数
verbosity=3,
)
return predictor
マルチモーダル学習
from autogluon.multimodal import MultiModalPredictor
def autogluon_image_classification(train_df, test_df, label_col):
"""AutoGluon画像分類"""
predictor = MultiModalPredictor(label=label_col)
predictor.fit(
train_data=train_df,
time_limit=3600,
hyperparameters={
'model.timm_image.checkpoint_name': 'efficientnet_b4',
'optimization.learning_rate': 1e-4,
'optimization.max_epochs': 20,
}
)
return predictor
def autogluon_multimodal(train_df, test_df, target_col):
"""AutoGluonマルチモーダル:テキスト + 表形式特徴量を組み合わせる"""
predictor = MultiModalPredictor(label=target_col, problem_type='binary')
predictor.fit(
train_data=train_df,
time_limit=3600,
hyperparameters={
'model.hf_text.checkpoint_name': 'bert-base-uncased',
}
)
return predictor
6. FLAML
Microsoft FLAML
Microsoft ResearchのFLAML(Fast and Lightweight AutoML)はコスト効率の高い自動化に特化しています。
pip install flaml
from flaml import AutoML
import pandas as pd
import numpy as np
def flaml_basic_example(X_train, y_train, X_test, task='classification'):
"""FLAMLの基本的な使用法"""
automl = AutoML()
automl_settings = {
'time_budget': 300,
'metric': 'roc_auc',
'task': task, # 'classification'、'regression'、'ranking'
'estimator_list': [
'lgbm', 'xgboost', 'catboost',
'rf', 'extra_tree', 'lrl1', 'lrl2', 'kneighbor'
],
'log_file_name': 'flaml_log.log',
'seed': 42,
'n_jobs': -1,
'verbose': 1,
'retrain_full': True, # 最終モデルを全データで再トレーニング
'max_iter': 100,
'ensemble': True,
'eval_method': 'cv',
'n_splits': 5,
}
automl.fit(X_train, y_train, **automl_settings)
print(f"Best estimator: {automl.best_estimator}")
print(f"Best loss: {automl.best_loss:.4f}")
print(f"Best config: {automl.best_config}")
print(f"Time to find best model: {automl.time_to_find_best_model:.1f}s")
predictions = automl.predict(X_test)
pred_proba = automl.predict_proba(X_test)
return automl, predictions, pred_proba
def flaml_sklearn_pipeline(X_train, y_train, X_test):
"""FLAMLをscikit-learnパイプラインに統合"""
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
automl = AutoML()
pipeline = Pipeline([
('scaler', StandardScaler()),
('automl', automl),
])
pipeline.fit(
X_train, y_train,
automl__time_budget=120,
automl__metric='roc_auc',
automl__task='classification',
)
return pipeline
def flaml_custom_objective(X_train, y_train):
"""カスタム評価指標を使ったFLAML"""
def custom_metric(
X_val, y_val, estimator, labels, X_train, y_train,
weight_val=None, weight_train=None, *args
):
"""F-betaスコアを最適化"""
from sklearn.metrics import fbeta_score
y_pred = estimator.predict(X_val)
score = fbeta_score(y_val, y_pred, beta=2, average='weighted')
return -score, {'f2_score': score} # (loss、metrics_dict)
automl = AutoML()
automl.fit(
X_train, y_train,
metric=custom_metric,
task='classification',
time_budget=120,
)
return automl
7. H2O AutoML
H2Oクラスター
H2O AutoMLは幅広い解釈可能性ツールを備えたエンタープライズグレードのAutoMLプラットフォームです。
pip install h2o
import h2o
from h2o.automl import H2OAutoML
import pandas as pd
def h2o_automl_example(train_df, test_df, target_col, max_models=20):
"""H2O AutoMLのエンドツーエンド例"""
h2o.init(nthreads=-1, max_mem_size='8G', port=54321)
train_h2o = h2o.H2OFrame(train_df)
test_h2o = h2o.H2OFrame(test_df)
# 分類のためにターゲットをfactorとしてマーク
train_h2o[target_col] = train_h2o[target_col].asfactor()
feature_cols = [col for col in train_df.columns if col != target_col]
aml = H2OAutoML(
max_models=max_models,
max_runtime_secs=3600,
seed=42,
sort_metric='AUC',
balance_classes=False,
include_algos=[
'GBM', 'GLM', 'DRF', 'DeepLearning',
'StackedEnsemble', 'XGBoost'
],
keep_cross_validation_predictions=True,
keep_cross_validation_models=True,
nfolds=5,
verbosity='info',
)
aml.fit(
x=feature_cols, y=target_col,
training_frame=train_h2o,
leaderboard_frame=test_h2o,
)
lb = aml.leaderboard
print("H2O AutoML Leaderboard:")
print(lb.head(20))
best_model = aml.leader
print(f"\nBest model: {best_model.model_id}")
predictions = best_model.predict(test_h2o).as_data_frame()
# モデルの保存
model_path = h2o.save_model(model=best_model, path='h2o_models/', force=True)
print(f"Model saved to: {model_path}")
return aml, best_model, predictions
def cleanup_h2o():
h2o.cluster().shutdown()
8. ニューラルアーキテクチャ探索(NAS)
NAS概要
ニューラルアーキテクチャ探索(NAS)は最適なニューラルネットワークアーキテクチャを自動的に見つけます。
NASの3つのコンポーネント:
- 探索空間:可能なアーキテクチャのセット
- 探索戦略:空間の探索方法(ランダム、進化的、RL、勾配ベース)
- 性能評価:候補アーキテクチャの評価方法
DARTS(Differentiable Architecture Search)
DARTSは(Liu et al., 2019)は離散的な選択の連続緩和によってアーキテクチャ探索を微分可能にします。
import torch
import torch.nn as nn
import torch.nn.functional as F
class MixedOperation(nn.Module):
"""DARTS混合演算:候補演算の重み付き和"""
def __init__(self, operations):
super().__init__()
self.ops = nn.ModuleList(operations)
self.alphas = nn.Parameter(torch.randn(len(operations)))
def forward(self, x):
weights = F.softmax(self.alphas, dim=0)
return sum(w * op(x) for w, op in zip(weights, self.ops))
class DARTSCell(nn.Module):
"""単一のDARTSセル"""
def __init__(self, in_channels, out_channels):
super().__init__()
operations = [
nn.Conv2d(in_channels, out_channels, 3, padding=1),
nn.Conv2d(in_channels, out_channels, 5, padding=2),
nn.MaxPool2d(3, stride=1, padding=1),
nn.AvgPool2d(3, stride=1, padding=1),
nn.Identity() if in_channels == out_channels
else nn.Conv2d(in_channels, out_channels, 1),
]
self.mixed_op = MixedOperation(operations)
self.bn = nn.BatchNorm2d(out_channels)
def forward(self, x):
return F.relu(self.bn(self.mixed_op(x)))
class SimpleDARTS(nn.Module):
"""簡略化されたDARTSネットワーク"""
def __init__(self, num_classes=10, num_cells=6):
super().__init__()
self.stem = nn.Conv2d(3, 64, 3, padding=1)
self.cells = nn.ModuleList([DARTSCell(64, 64) for _ in range(num_cells)])
self.classifier = nn.Linear(64, num_classes)
def forward(self, x):
x = self.stem(x)
for cell in self.cells:
x = cell(x)
x = x.mean([2, 3]) # グローバル平均プーリング
return self.classifier(x)
def arch_parameters(self):
return [p for n, p in self.named_parameters() if 'alphas' in n]
def model_parameters(self):
return [p for n, p in self.named_parameters() if 'alphas' not in n]
def train_darts(model, train_loader, val_loader, epochs=50):
"""DARTSの二値最適化"""
w_optimizer = torch.optim.SGD(
model.model_parameters(), lr=0.025, momentum=0.9, weight_decay=3e-4
)
a_optimizer = torch.optim.Adam(
model.arch_parameters(), lr=3e-4, betas=(0.5, 0.999), weight_decay=1e-3
)
w_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(w_optimizer, T_max=epochs)
for epoch in range(epochs):
model.train()
train_iter = iter(train_loader)
val_iter = iter(val_loader)
for step in range(min(len(train_loader), len(val_loader))):
# ステップ1:検証データを使ってアーキテクチャパラメータを更新
try:
X_val, y_val = next(val_iter)
except StopIteration:
val_iter = iter(val_loader)
X_val, y_val = next(val_iter)
a_optimizer.zero_grad()
val_loss = F.cross_entropy(model(X_val), y_val)
val_loss.backward()
a_optimizer.step()
# ステップ2:トレーニングデータを使って重みパラメータを更新
X_train, y_train = next(train_iter)
w_optimizer.zero_grad()
train_loss = F.cross_entropy(model(X_train), y_train)
train_loss.backward()
nn.utils.clip_grad_norm_(model.model_parameters(), 5.0)
w_optimizer.step()
w_scheduler.step()
if epoch % 10 == 0:
print(f"Epoch {epoch}: Train Loss = {train_loss.item():.4f}")
# 発見されたアーキテクチャの抽出
for i, cell in enumerate(model.cells):
weights = F.softmax(cell.mixed_op.alphas, dim=0).detach()
best_op = weights.argmax().item()
print(f"Cell {i}: Best op index = {best_op}, weights = {weights.numpy()}")
return model
ワンショットNAS
class SuperNetwork(nn.Module):
"""ワンショットNAS:単一のスーパーネットワークからサブネットワークをサンプリング"""
def __init__(self, num_classes=10, max_channels=256):
super().__init__()
self.max_channels = max_channels
self.channel_options = [64, 128, 256]
self.conv1 = nn.Conv2d(3, max_channels, 3, padding=1)
self.conv2 = nn.Conv2d(max_channels, max_channels, 3, padding=1)
self.conv3 = nn.Conv2d(max_channels, max_channels, 3, padding=1)
self.bn1 = nn.BatchNorm2d(max_channels)
self.bn2 = nn.BatchNorm2d(max_channels)
self.bn3 = nn.BatchNorm2d(max_channels)
self.classifier = nn.Linear(max_channels, num_classes)
def forward(self, x, arch_config=None):
if arch_config is None:
arch_config = {
'conv1_out': torch.randint(0, len(self.channel_options), (1,)).item(),
'conv2_out': torch.randint(0, len(self.channel_options), (1,)).item(),
}
c1 = self.channel_options[arch_config['conv1_out']]
c2 = self.channel_options[arch_config['conv2_out']]
x = F.relu(self.bn1(self.conv1(x)[:, :c1]))
x = F.relu(self.bn2(self.conv2(
F.pad(x, (0, 0, 0, 0, 0, self.max_channels - c1))
)[:, :c2]))
x = F.relu(self.bn3(self.conv3(
F.pad(x, (0, 0, 0, 0, 0, self.max_channels - c2))
)))
x = x.mean([2, 3])
return self.classifier(x)
9. パイプライン自動化
Auto-sklearn
pip install auto-sklearn
import autosklearn.classification
import autosklearn.regression
from autosklearn.metrics import roc_auc, mean_squared_error
def auto_sklearn_example(X_train, y_train, X_test, task='classification'):
"""Auto-sklearn:scikit-learn互換AutoML"""
if task == 'classification':
automl = autosklearn.classification.AutoSklearnClassifier(
time_left_for_this_task=3600,
per_run_time_limit=360,
n_jobs=-1,
memory_limit=8192,
ensemble_size=50,
ensemble_nbest=50,
max_models_on_disc=50,
include={
'classifier': [
'random_forest', 'gradient_boosting',
'extra_trees', 'liblinear_svc'
]
},
metric=roc_auc,
resampling_strategy='cv',
resampling_strategy_arguments={'folds': 5},
seed=42,
)
else:
automl = autosklearn.regression.AutoSklearnRegressor(
time_left_for_this_task=3600,
per_run_time_limit=360,
n_jobs=-1,
metric=mean_squared_error,
seed=42,
)
automl.fit(X_train, y_train)
print(automl.sprint_statistics())
print(automl.leaderboard())
predictions = automl.predict(X_test)
return automl, predictions
10. LLM時代のAutoML
AutoMLへのLLMの活用
大規模言語モデル(LLM)はAutoMLに新しい可能性を開いています:
- ハイパーパラメータ提案:LLMがデータセットの特性に基づいて初期設定を推薦
- 特徴量エンジニアリング:LLMがドメイン知識を使用して新しい特徴量のアイデアを提案
- コード生成:前処理とトレーニングコードを自動生成
- エラーデバッグ:トレーニングの失敗を診断し、解決策を提案
# LLMガイドのハイパーパラメータ最適化(概念的コード)
from openai import OpenAI
def llm_hyperparameter_suggestion(dataset_description, model_type, previous_results=None):
"""LLMを使ってハイパーパラメータを提案"""
client = OpenAI()
prompt = f"""
Dataset characteristics:
{dataset_description}
Model type: {model_type}
Previous results:
{previous_results if previous_results else 'None (first attempt)'}
Based on this information, suggest optimal hyperparameters for {model_type} in JSON format.
"""
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system",
"content": "You are a machine learning expert. Help optimize hyperparameters."},
{"role": "user", "content": prompt}
],
response_format={"type": "json_object"}
)
return response.choices[0].message.content
# AutoMLエージェント(実験的)
class AutoMLAgent:
"""LLMガイドのAutoMLエージェント"""
def __init__(self, llm_client, X_train, y_train, X_val, y_val, max_iterations=10):
self.client = llm_client
self.X_train = X_train
self.y_train = y_train
self.X_val = X_val
self.y_val = y_val
self.max_iterations = max_iterations
self.history = []
self.best_score = 0
self.best_params = None
def get_next_config(self):
"""LLMに次の設定を尋ねる"""
history_str = "\n".join([
f"Iteration {i+1}: params={h['params']}, score={h['score']:.4f}"
for i, h in enumerate(self.history[-5:])
])
prompt = f"""
LightGBM parameter attempts so far:
{history_str if history_str else 'None (first attempt)'}
Suggest the next parameter combination to try in JSON format.
Valid ranges: num_leaves(10-300), learning_rate(0.001-0.3),
n_estimators(100-2000), subsample(0.5-1.0), colsample_bytree(0.5-1.0)
"""
response = self.client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "You are an HPO expert."},
{"role": "user", "content": prompt}
],
response_format={"type": "json_object"}
)
import json
return json.loads(response.choices[0].message.content)
def evaluate(self, params):
"""パラメータ設定を評価"""
import lightgbm as lgb
from sklearn.metrics import roc_auc_score
model = lgb.LGBMClassifier(**params, random_state=42, verbose=-1)
model.fit(self.X_train, self.y_train)
preds = model.predict_proba(self.X_val)[:, 1]
return roc_auc_score(self.y_val, preds)
def run(self):
"""AutoMLエージェントループを実行"""
for i in range(self.max_iterations):
config = self.get_next_config()
score = self.evaluate(config)
self.history.append({'params': config, 'score': score})
if score > self.best_score:
self.best_score = score
self.best_params = config
print(f"Iteration {i+1}: New best score {score:.4f}")
print(f"\nBest score: {self.best_score:.4f}")
print(f"Best params: {self.best_params}")
return self.best_params
まとめ
このガイドでAutoMLエコシステム全体を網羅しました:
- ハイパーパラメータ最適化:グリッド探索からベイズ最適化まで、体系的な直感を構築
- Optuna:最も柔軟なPythonネイティブHPOフレームワーク、プルーニングと可視化
- Ray Tune:複数のGPUとノード間での大規模分散HPO
- AutoGluon:Amazonの強力なマルチモーダルAutoML(表形式、画像、テキスト)
- FLAML:MicrosoftのコスコートEfficient AutoML、最小限のオーバーヘッド
- H2O AutoML:解釈可能性ツールを備えたエンタープライズグレードのAutoML
- NAS:DARTSとワンショット手法による最適なニューラルアーキテクチャの自動設計
- LLM + AutoML:インテリジェントで言語ガイドの自動化の次のフロンティア
主な推奨事項:
- 時間制約がある場合:FLAMLまたはAutoGluonを
good_qualityプリセットで使用 - 特定モデルのチューニング:Optunaを使用
- 大規模または分散実験:Ray Tuneを使用
- エンタープライズ環境:解釈可能性ツールのためにH2O AutoMLを活用
- LLMベースのAutoMLはまだ研究段階ですが、注目に値します
AutoMLはツールであり、魔法ではありません。ドメイン知識、データ品質、正しい評価フレームワークが成功の最も重要な要素であり続けます。
参考文献
- Optunaドキュメント
- AutoGluonドキュメント
- FLAMLドキュメント
- H2O AutoML
- Ray Tuneドキュメント
- DARTS: Differentiable Architecture Search
- Bergstra, J., & Bengio, Y. (2012). Random search for hyper-parameter optimization.
- Feurer, M., et al. (2015). Efficient and Robust Automated Machine Learning (Auto-sklearn).
- He, X., et al. (2021). AutoML: A Survey of the State-of-the-Art.