Skip to content

필사 모드: AutoML完全ガイド:AutoGluon、FLAML、Optunaで自動化されたMLパイプライン

日本語
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

1. AutoML概要

AutoMLとは?

AutoML(Automated Machine Learning)は機械学習パイプラインのさまざまな段階を自動化します。データサイエンティストがかつて手動で行っていた作業、データの前処理、特徴量エンジニアリング、モデル選択、ハイパーパラメータ最適化、アンサンブルなどをアルゴリズムが自動的に処理します。

**AutoMLが自動化すること:**

1. **データ前処理の自動化**

- 欠損値補完戦略の選択

- スケーリングと正規化手法の選択

- 外れ値処理

2. **特徴量エンジニアリングの自動化**

- 特徴量変換(対数、二乗、相互作用)

- カテゴリエンコーディング手法の選択

- 特徴量選択と生成

3. **モデル選択(アルゴリズム選択)**

- 多様なアルゴリズムの探索

- メタ学習(過去のタスクからの経験を活用)

4. **ハイパーパラメータ最適化(HPO)**

- グリッド/ランダム探索

- ベイズ最適化

- 進化的アルゴリズム

5. **アンサンブルの自動化**

- 最適なアンサンブル設定の探索

- 自動スタッキングとブレンディング

6. **ニューラルアーキテクチャ探索(NAS)**

- 最適なニューラルネットワークアーキテクチャの自動設計

AutoMLの応用分野

**産業への応用:**

- **金融**:信用リスクモデル、自動不正検出

- **医療**:診断支援システムの迅速なプロトタイピング

- **小売**:自動需要予測モデルの更新

- **製造**:品質管理モデルの自動化

**主要なオープンソースAutoMLツール:**

| ツール | 開発者 | 強み |

| ------------ | ------------------ | -------------------------------------- |

| AutoGluon | Amazon | マルチモーダル、表形式、画像、テキスト |

| FLAML | Microsoft | コスト効率、高速 |

| Optuna | Preferred Networks | HPO、可視化 |

| H2O AutoML | H2O.ai | エンタープライズ、解釈可能性 |

| Auto-sklearn | AutoML Group | scikit-learn互換 |

| Ray Tune | Anyscale | 分散HPO |

| NNI | Microsoft | NAS、HPO |

AutoMLの長所と短所

**長所:**

- 非専門家でも高品質なモデルを構築できる

- 繰り返し実験の自動化により時間を節約

- 人間が見逃すかもしれないハイパーパラメータの組み合わせを発見

- 再現可能なパイプラインを提供

**短所:**

- 計算コストが非常に高くなる可能性がある

- ドメイン知識を組み込む能力が限られている

- ブラックボックスの性質(内部動作が理解しにくい)

- 特化した問題ではカスタムソリューションがより効果的

- データ漏洩のリスク

2. ハイパーパラメータ最適化(HPO)

グリッド探索

最も単純なHPO手法 — 探索空間のすべての組み合わせを網羅的に試みます。

from sklearn.model_selection import GridSearchCV

def grid_search_example(X_train, y_train):

"""グリッド探索:網羅的(小さな探索空間でのみ実用的)"""

param_grid = {

'max_depth': [3, 5, 7],

'learning_rate': [0.01, 0.1, 0.3],

'n_estimators': [100, 300, 500],

'subsample': [0.7, 0.9],

}

組み合わせ総数: 3 * 3 * 3 * 2 = 54 * CVフォールド

model = xgb.XGBClassifier(random_state=42, n_jobs=-1)

grid_search = GridSearchCV(

model, param_grid,

cv=5, scoring='roc_auc', n_jobs=-1, verbose=1, refit=True

)

grid_search.fit(X_train, y_train)

print(f"Best params: {grid_search.best_params_}")

print(f"Best CV score: {grid_search.best_score_:.4f}")

results = pd.DataFrame(grid_search.cv_results_)

print(results.sort_values('mean_test_score', ascending=False)[

['params', 'mean_test_score', 'std_test_score']

].head(10))

return grid_search.best_estimator_

ランダム探索

Bergstra & Bengio(2012)が提案 — パラメータ分布からランダムにサンプリングし、グリッド探索よりもはるかに効率的な場合が多いです。

from sklearn.model_selection import RandomizedSearchCV

from scipy.stats import uniform, randint, loguniform

def random_search_example(X_train, y_train, n_iter=100):

"""ランダム探索:連続分布からサンプリング"""

param_distributions = {

'max_depth': randint(3, 10),

'learning_rate': loguniform(1e-3, 0.5), # [0.001, 0.5]の対数一様分布

'n_estimators': randint(100, 1000),

'subsample': uniform(0.6, 0.4), # [0.6, 1.0]の一様分布

'colsample_bytree': uniform(0.6, 0.4),

'reg_alpha': loguniform(1e-4, 10),

'reg_lambda': loguniform(1e-4, 10),

'min_child_weight': randint(1, 10),

'gamma': uniform(0, 0.5),

}

model = xgb.XGBClassifier(random_state=42, n_jobs=-1)

random_search = RandomizedSearchCV(

model, param_distributions,

n_iter=n_iter, cv=5, scoring='roc_auc',

n_jobs=-1, verbose=1, random_state=42, refit=True

)

random_search.fit(X_train, y_train)

print(f"Best params: {random_search.best_params_}")

print(f"Best CV score: {random_search.best_score_:.4f}")

return random_search.best_estimator_

ベイズ最適化

ベイズ最適化は過去の評価結果を使用して、次に評価する点をインテリジェントに選択します。

**コアコンポーネント:**

1. **代理モデル**:目的関数の確率的近似(通常はガウス過程)

2. **獲得関数**:次の評価点を決定

- EI(Expected Improvement):現在のベストに対する期待改善量

- UCB(Upper Confidence Bound):探索と活用のバランス

- PI(Probability of Improvement):現在のベストを改善する確率

**TPE(Tree-structured Parzen Estimator):**

- Optunaが使用するデフォルトアルゴリズム

- p(x|y)の代わりに2つの密度関数l(x)とg(x)をモデル化

- l(x)は良い結果(上位gamma%)につながったパラメータの分布をモデル化

- g(x)は残りをモデル化

- 次の点はl(x)/g(x)の比を最大化するように選択

3. Optuna

コアコンセプト

Preferred NetworksによってPythonネイティブHPOフレームワークとして開発されたOptunaは、シンプルさと柔軟性で知られています。

**主要コンセプト:**

- **Study**:最適化実験全体(Trialsのコレクション)

- **Trial**:単一のハイパーパラメータ設定の試み

- **Objective Function**:最適化する関数(最小化または最大化)

- **Sampler**:パラメータ提案アルゴリズム(TPE、CMA-ES、Randomなど)

- **Pruner**:有望でないTrialの早期終了

pip install optuna optuna-dashboard

from optuna.samplers import TPESampler, CmaEsSampler, RandomSampler

from optuna.pruners import MedianPruner, HyperbandPruner

from sklearn.model_selection import StratifiedKFold

from sklearn.metrics import roc_auc_score

optuna.logging.set_verbosity(optuna.logging.WARNING)

def objective_lgbm(trial, X, y):

"""LightGBM最適化のためのOptuna目的関数"""

params = {

'objective': 'binary',

'metric': 'auc',

'verbosity': -1,

'boosting_type': trial.suggest_categorical('boosting_type', ['gbdt', 'dart']),

'num_leaves': trial.suggest_int('num_leaves', 20, 300),

'max_depth': trial.suggest_int('max_depth', 3, 12),

'min_child_samples': trial.suggest_int('min_child_samples', 5, 100),

'learning_rate': trial.suggest_float('learning_rate', 1e-4, 0.3, log=True),

'n_estimators': trial.suggest_int('n_estimators', 100, 2000),

'subsample': trial.suggest_float('subsample', 0.5, 1.0),

'subsample_freq': trial.suggest_int('subsample_freq', 1, 7),

'colsample_bytree': trial.suggest_float('colsample_bytree', 0.5, 1.0),

'reg_alpha': trial.suggest_float('reg_alpha', 1e-8, 10.0, log=True),

'reg_lambda': trial.suggest_float('reg_lambda', 1e-8, 10.0, log=True),

'min_split_gain': trial.suggest_float('min_split_gain', 0, 1),

'n_jobs': -1,

}

skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

cv_scores = []

for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):

X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]

y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]

train_data = lgb.Dataset(X_train, y_train)

val_data = lgb.Dataset(X_val, y_val, reference=train_data)

model = lgb.train(

params, train_data,

num_boost_round=params['n_estimators'],

valid_sets=[val_data],

callbacks=[

lgb.early_stopping(stopping_rounds=50, verbose=False),

lgb.log_evaluation(-1),

],

)

preds = model.predict(X_val)

fold_score = roc_auc_score(y_val, preds)

cv_scores.append(fold_score)

プルーニングのための中間結果の報告

trial.report(fold_score, fold)

if trial.should_prune():

raise optuna.exceptions.TrialPruned()

return np.mean(cv_scores)

def run_optuna_study(X, y, n_trials=100, n_jobs=1):

"""TPEサンプラーとメジアンプルーニングでOptunaスタディを実行"""

sampler = TPESampler(

n_startup_trials=20,

n_ei_candidates=24,

multivariate=True,

seed=42

)

pruner = MedianPruner(

n_startup_trials=5,

n_warmup_steps=10,

interval_steps=1

)

study = optuna.create_study(

direction='maximize',

sampler=sampler,

pruner=pruner,

study_name='lgbm_optimization',

storage='sqlite:///optuna.db', # 結果を永続化

load_if_exists=True, # 既存のスタディを再開

)

study.optimize(

lambda trial: objective_lgbm(trial, X, y),

n_trials=n_trials,

n_jobs=n_jobs,

show_progress_bar=True,

)

print(f"\nBest params:")

for key, value in study.best_params.items():

print(f" {key}: {value}")

print(f"Best AUC: {study.best_value:.4f}")

print(f"Completed trials: {len(study.trials)}")

pruned = [t for t in study.trials if t.state == optuna.trial.TrialState.PRUNED]

print(f"Pruned trials: {len(pruned)}")

return study

def visualize_optuna_study(study):

"""Optuna最適化結果の可視化"""

vis.plot_optimization_history(study).show()

vis.plot_param_importances(study).show()

vis.plot_parallel_coordinate(study).show()

vis.plot_slice(study).show()

vis.plot_contour(study, params=['learning_rate', 'num_leaves']).show()

PyTorch + Optuna完全例

from torch.utils.data import DataLoader, TensorDataset

def create_model(trial, input_dim):

"""Optunaトライアルパラメータからニューラルネットワークを動的に構築"""

n_layers = trial.suggest_int('n_layers', 1, 4)

dropout = trial.suggest_float('dropout', 0.1, 0.5)

activation_name = trial.suggest_categorical('activation', ['relu', 'tanh', 'elu'])

activation_map = {'relu': nn.ReLU(), 'tanh': nn.Tanh(), 'elu': nn.ELU()}

layers = []

in_features = input_dim

for i in range(n_layers):

out_features = trial.suggest_int(f'n_units_l{i}', 32, 512)

layers.extend([

nn.Linear(in_features, out_features),

nn.BatchNorm1d(out_features),

activation_map[activation_name],

nn.Dropout(dropout),

])

in_features = out_features

layers.extend([nn.Linear(in_features, 1), nn.Sigmoid()])

return nn.Sequential(*layers)

def objective_pytorch(trial, X_train_t, y_train_t, X_val_t, y_val_t, input_dim):

"""PyTorchニューラルネットワーク用Optuna目的関数"""

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = create_model(trial, input_dim).to(device)

lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)

optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'RMSprop', 'SGD'])

batch_size = trial.suggest_categorical('batch_size', [32, 64, 128, 256])

weight_decay = trial.suggest_float('weight_decay', 1e-8, 1e-2, log=True)

if optimizer_name == 'Adam':

optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

elif optimizer_name == 'RMSprop':

optimizer = optim.RMSprop(model.parameters(), lr=lr, weight_decay=weight_decay)

else:

optimizer = optim.SGD(model.parameters(), lr=lr, weight_decay=weight_decay,

momentum=0.9)

scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)

criterion = nn.BCELoss()

train_dataset = TensorDataset(

X_train_t.to(device),

y_train_t.to(device).float().unsqueeze(1)

)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

best_val_loss = float('inf')

patience_counter = 0

patience = 10

for epoch in range(100):

model.train()

for X_batch, y_batch in train_loader:

optimizer.zero_grad()

loss = criterion(model(X_batch), y_batch)

loss.backward()

optimizer.step()

scheduler.step()

model.eval()

with torch.no_grad():

val_loss = criterion(

model(X_val_t.to(device)),

y_val_t.to(device).float().unsqueeze(1)

).item()

trial.report(val_loss, epoch)

if trial.should_prune():

raise optuna.exceptions.TrialPruned()

if val_loss < best_val_loss:

best_val_loss = val_loss

patience_counter = 0

else:

patience_counter += 1

if patience_counter >= patience:

break

return best_val_loss

def run_pytorch_optuna(X_train, y_train, X_val, y_val, n_trials=50):

X_train_t = torch.FloatTensor(X_train.values if hasattr(X_train, 'values') else X_train)

y_train_t = torch.FloatTensor(y_train.values if hasattr(y_train, 'values') else y_train)

X_val_t = torch.FloatTensor(X_val.values if hasattr(X_val, 'values') else X_val)

y_val_t = torch.FloatTensor(y_val.values if hasattr(y_val, 'values') else y_val)

input_dim = X_train_t.shape[1]

study = optuna.create_study(

direction='minimize',

pruner=optuna.pruners.HyperbandPruner(

min_resource=5, max_resource=100, reduction_factor=3

),

sampler=TPESampler(seed=42)

)

study.optimize(

lambda trial: objective_pytorch(

trial, X_train_t, y_train_t, X_val_t, y_val_t, input_dim

),

n_trials=n_trials,

show_progress_bar=True,

)

print(f"Best val loss: {study.best_value:.4f}")

print(f"Best params: {study.best_params}")

return study

CMA-ESサンプラー

CMA-ESは連続ハイパーパラメータ空間でより効率的

study_cmaes = optuna.create_study(

direction='maximize',

sampler=CmaEsSampler(

n_startup_trials=10,

restart_strategy='ipop', # 局所最適からの脱出のための再起動戦略

seed=42

)

)

4. Ray Tune

Ray Tuneによる分散HPO

AnyscaleのRay Tuneは、複数のGPUとノード間での並列トレーニングを自動的に処理します。

pip install ray[tune] ray[air]

from ray import tune

from ray.tune import CLIReporter

from ray.tune.schedulers import ASHAScheduler, PopulationBasedTraining

from ray.tune.search.optuna import OptunaSearch

ray.init(ignore_reinit_error=True)

def train_with_tune(config, data=None):

"""Ray Tuneが呼び出すトレーニング関数"""

X_train, y_train, X_val, y_val = data

model = nn.Sequential(

nn.Linear(X_train.shape[1], config['hidden_size']),

nn.ReLU(),

nn.Dropout(config['dropout']),

nn.Linear(config['hidden_size'], config['hidden_size'] // 2),

nn.ReLU(),

nn.Linear(config['hidden_size'] // 2, 1),

nn.Sigmoid()

)

optimizer = torch.optim.Adam(

model.parameters(), lr=config['lr'], weight_decay=config['weight_decay']

)

criterion = nn.BCELoss()

X_train_t = torch.FloatTensor(X_train)

y_train_t = torch.FloatTensor(y_train).unsqueeze(1)

X_val_t = torch.FloatTensor(X_val)

y_val_t = torch.FloatTensor(y_val).unsqueeze(1)

for epoch in range(config['max_epochs']):

model.train()

optimizer.zero_grad()

loss = criterion(model(X_train_t), y_train_t)

loss.backward()

optimizer.step()

if epoch % 5 == 0:

model.eval()

with torch.no_grad():

val_loss = criterion(model(X_val_t), y_val_t).item()

tune.report(val_loss=val_loss, training_iteration=epoch)

def run_ray_tune(X_train, y_train, X_val, y_val, num_samples=50):

"""Ray Tuneで分散HPOを実行"""

config = {

'hidden_size': tune.choice([64, 128, 256, 512]),

'dropout': tune.uniform(0.1, 0.5),

'lr': tune.loguniform(1e-5, 1e-1),

'weight_decay': tune.loguniform(1e-8, 1e-3),

'max_epochs': tune.choice([50, 100, 200]),

}

ASHA:非同期逐次分割法

scheduler = ASHAScheduler(

metric='val_loss',

mode='min',

max_t=200, # 最大エポック

grace_period=10, # プルーニング前の最小エポック

reduction_factor=3,

)

search_alg = OptunaSearch(metric='val_loss', mode='min')

result = tune.run(

tune.with_parameters(

train_with_tune,

data=(X_train, y_train, X_val, y_val)

),

config=config,

num_samples=num_samples,

scheduler=scheduler,

search_alg=search_alg,

progress_reporter=CLIReporter(

metric_columns=['val_loss', 'training_iteration'],

max_progress_rows=10

),

verbose=1,

resources_per_trial={'cpu': 2, 'gpu': 0},

)

best_trial = result.get_best_trial('val_loss', 'min', 'last')

print(f"Best val loss: {best_trial.last_result['val_loss']:.4f}")

print(f"Best config: {best_trial.config}")

return result

def run_pbt(X_train, y_train, X_val, y_val):

"""Population Based Training:トレーニング中にハイパーパラメータを動的に変化させる"""

pbt_scheduler = PopulationBasedTraining(

time_attr='training_iteration',

metric='val_loss',

mode='min',

perturbation_interval=20,

hyperparam_mutations={

'lr': tune.loguniform(1e-5, 1e-1),

'dropout': tune.uniform(0.1, 0.5),

},

quantile_fraction=0.25, # 下位25%を上位25%に置き換える

)

result = tune.run(

tune.with_parameters(

train_with_tune,

data=(X_train, y_train, X_val, y_val)

),

config={

'hidden_size': 256,

'dropout': tune.uniform(0.1, 0.5),

'lr': tune.loguniform(1e-4, 1e-1),

'weight_decay': 1e-5,

'max_epochs': 200,

},

num_samples=8,

scheduler=pbt_scheduler,

verbose=1,

)

return result

5. AutoGluon

AutoGluon概要

AmazonのAutoGluonは、最小限のコード(時には3行)でKaggleレベルのパフォーマンスを達成します。

pip install autogluon

表形式データ(TabularPredictor)

from autogluon.tabular import TabularPredictor

def autogluon_tabular_example(train_df, test_df, target_col, eval_metric='roc_auc'):

"""AutoGluon表形式トレーニング"""

predictor = TabularPredictor(

label=target_col,

eval_metric=eval_metric,

path='autogluon_models/',

problem_type='binary', # 'binary'、'multiclass'、'regression'、'softclass'

)

predictor.fit(

train_data=train_df,

time_limit=3600,

presets='best_quality', # 'best_quality'、'good_quality'、'medium_quality'、

'optimize_for_deployment'

excluded_model_types=['KNN'],

verbosity=2,

)

leaderboard = predictor.leaderboard(test_df, silent=True)

print(leaderboard[['model', 'score_test', 'score_val', 'pred_time_test']].head(10))

predictions = predictor.predict(test_df)

pred_proba = predictor.predict_proba(test_df)

feature_importance = predictor.feature_importance(test_df)

print(feature_importance.head(20))

return predictor, predictions, pred_proba

def autogluon_advanced(train_df, test_df, target_col):

"""カスタムハイパーパラメータを使ったAutoGluon"""

hyperparameters = {

'GBM': [

{'num_boost_round': 300, 'ag_args': {'name_suffix': 'fast'}},

{'num_boost_round': 1000, 'learning_rate': 0.03,

'ag_args': {'name_suffix': 'slow', 'priority': 0}},

],

'XGB': [{'n_estimators': 300, 'max_depth': 6}],

'CAT': [{'iterations': 500, 'depth': 6}],

'NN_TORCH': [{'num_epochs': 50, 'learning_rate': 1e-3, 'dropout_prob': 0.1}],

'RF': [{'n_estimators': 300}],

}

predictor = TabularPredictor(

label=target_col, eval_metric='roc_auc', path='autogluon_advanced/'

)

predictor.fit(

train_data=train_df,

hyperparameters=hyperparameters,

time_limit=7200,

num_stack_levels=1, # スタッキングレベル数

num_bag_folds=5, # バギングのCVフォールド数

num_bag_sets=1, # バギングセット数

verbosity=3,

)

return predictor

マルチモーダル学習

from autogluon.multimodal import MultiModalPredictor

def autogluon_image_classification(train_df, test_df, label_col):

"""AutoGluon画像分類"""

predictor = MultiModalPredictor(label=label_col)

predictor.fit(

train_data=train_df,

time_limit=3600,

hyperparameters={

'model.timm_image.checkpoint_name': 'efficientnet_b4',

'optimization.learning_rate': 1e-4,

'optimization.max_epochs': 20,

}

)

return predictor

def autogluon_multimodal(train_df, test_df, target_col):

"""AutoGluonマルチモーダル:テキスト + 表形式特徴量を組み合わせる"""

predictor = MultiModalPredictor(label=target_col, problem_type='binary')

predictor.fit(

train_data=train_df,

time_limit=3600,

hyperparameters={

'model.hf_text.checkpoint_name': 'bert-base-uncased',

}

)

return predictor

6. FLAML

Microsoft FLAML

Microsoft ResearchのFLAML(Fast and Lightweight AutoML)はコスト効率の高い自動化に特化しています。

pip install flaml

from flaml import AutoML

def flaml_basic_example(X_train, y_train, X_test, task='classification'):

"""FLAMLの基本的な使用法"""

automl = AutoML()

automl_settings = {

'time_budget': 300,

'metric': 'roc_auc',

'task': task, # 'classification'、'regression'、'ranking'

'estimator_list': [

'lgbm', 'xgboost', 'catboost',

'rf', 'extra_tree', 'lrl1', 'lrl2', 'kneighbor'

],

'log_file_name': 'flaml_log.log',

'seed': 42,

'n_jobs': -1,

'verbose': 1,

'retrain_full': True, # 最終モデルを全データで再トレーニング

'max_iter': 100,

'ensemble': True,

'eval_method': 'cv',

'n_splits': 5,

}

automl.fit(X_train, y_train, **automl_settings)

print(f"Best estimator: {automl.best_estimator}")

print(f"Best loss: {automl.best_loss:.4f}")

print(f"Best config: {automl.best_config}")

print(f"Time to find best model: {automl.time_to_find_best_model:.1f}s")

predictions = automl.predict(X_test)

pred_proba = automl.predict_proba(X_test)

return automl, predictions, pred_proba

def flaml_sklearn_pipeline(X_train, y_train, X_test):

"""FLAMLをscikit-learnパイプラインに統合"""

from sklearn.pipeline import Pipeline

from sklearn.preprocessing import StandardScaler

automl = AutoML()

pipeline = Pipeline([

('scaler', StandardScaler()),

('automl', automl),

])

pipeline.fit(

X_train, y_train,

automl__time_budget=120,

automl__metric='roc_auc',

automl__task='classification',

)

return pipeline

def flaml_custom_objective(X_train, y_train):

"""カスタム評価指標を使ったFLAML"""

def custom_metric(

X_val, y_val, estimator, labels, X_train, y_train,

weight_val=None, weight_train=None, *args

):

"""F-betaスコアを最適化"""

from sklearn.metrics import fbeta_score

y_pred = estimator.predict(X_val)

score = fbeta_score(y_val, y_pred, beta=2, average='weighted')

return -score, {'f2_score': score} # (loss、metrics_dict)

automl = AutoML()

automl.fit(

X_train, y_train,

metric=custom_metric,

task='classification',

time_budget=120,

)

return automl

7. H2O AutoML

H2Oクラスター

H2O AutoMLは幅広い解釈可能性ツールを備えたエンタープライズグレードのAutoMLプラットフォームです。

pip install h2o

from h2o.automl import H2OAutoML

def h2o_automl_example(train_df, test_df, target_col, max_models=20):

"""H2O AutoMLのエンドツーエンド例"""

h2o.init(nthreads=-1, max_mem_size='8G', port=54321)

train_h2o = h2o.H2OFrame(train_df)

test_h2o = h2o.H2OFrame(test_df)

分類のためにターゲットをfactorとしてマーク

train_h2o[target_col] = train_h2o[target_col].asfactor()

feature_cols = [col for col in train_df.columns if col != target_col]

aml = H2OAutoML(

max_models=max_models,

max_runtime_secs=3600,

seed=42,

sort_metric='AUC',

balance_classes=False,

include_algos=[

'GBM', 'GLM', 'DRF', 'DeepLearning',

'StackedEnsemble', 'XGBoost'

],

keep_cross_validation_predictions=True,

keep_cross_validation_models=True,

nfolds=5,

verbosity='info',

)

aml.fit(

x=feature_cols, y=target_col,

training_frame=train_h2o,

leaderboard_frame=test_h2o,

)

lb = aml.leaderboard

print("H2O AutoML Leaderboard:")

print(lb.head(20))

best_model = aml.leader

print(f"\nBest model: {best_model.model_id}")

predictions = best_model.predict(test_h2o).as_data_frame()

モデルの保存

model_path = h2o.save_model(model=best_model, path='h2o_models/', force=True)

print(f"Model saved to: {model_path}")

return aml, best_model, predictions

def cleanup_h2o():

h2o.cluster().shutdown()

8. ニューラルアーキテクチャ探索(NAS)

NAS概要

ニューラルアーキテクチャ探索(NAS)は最適なニューラルネットワークアーキテクチャを自動的に見つけます。

**NASの3つのコンポーネント:**

1. **探索空間**:可能なアーキテクチャのセット

2. **探索戦略**:空間の探索方法(ランダム、進化的、RL、勾配ベース)

3. **性能評価**:候補アーキテクチャの評価方法

DARTS(Differentiable Architecture Search)

DARTSは(Liu et al., 2019)は離散的な選択の連続緩和によってアーキテクチャ探索を微分可能にします。

class MixedOperation(nn.Module):

"""DARTS混合演算:候補演算の重み付き和"""

def __init__(self, operations):

super().__init__()

self.ops = nn.ModuleList(operations)

self.alphas = nn.Parameter(torch.randn(len(operations)))

def forward(self, x):

weights = F.softmax(self.alphas, dim=0)

return sum(w * op(x) for w, op in zip(weights, self.ops))

class DARTSCell(nn.Module):

"""単一のDARTSセル"""

def __init__(self, in_channels, out_channels):

super().__init__()

operations = [

nn.Conv2d(in_channels, out_channels, 3, padding=1),

nn.Conv2d(in_channels, out_channels, 5, padding=2),

nn.MaxPool2d(3, stride=1, padding=1),

nn.AvgPool2d(3, stride=1, padding=1),

nn.Identity() if in_channels == out_channels

else nn.Conv2d(in_channels, out_channels, 1),

]

self.mixed_op = MixedOperation(operations)

self.bn = nn.BatchNorm2d(out_channels)

def forward(self, x):

return F.relu(self.bn(self.mixed_op(x)))

class SimpleDARTS(nn.Module):

"""簡略化されたDARTSネットワーク"""

def __init__(self, num_classes=10, num_cells=6):

super().__init__()

self.stem = nn.Conv2d(3, 64, 3, padding=1)

self.cells = nn.ModuleList([DARTSCell(64, 64) for _ in range(num_cells)])

self.classifier = nn.Linear(64, num_classes)

def forward(self, x):

x = self.stem(x)

for cell in self.cells:

x = cell(x)

x = x.mean([2, 3]) # グローバル平均プーリング

return self.classifier(x)

def arch_parameters(self):

return [p for n, p in self.named_parameters() if 'alphas' in n]

def model_parameters(self):

return [p for n, p in self.named_parameters() if 'alphas' not in n]

def train_darts(model, train_loader, val_loader, epochs=50):

"""DARTSの二値最適化"""

w_optimizer = torch.optim.SGD(

model.model_parameters(), lr=0.025, momentum=0.9, weight_decay=3e-4

)

a_optimizer = torch.optim.Adam(

model.arch_parameters(), lr=3e-4, betas=(0.5, 0.999), weight_decay=1e-3

)

w_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(w_optimizer, T_max=epochs)

for epoch in range(epochs):

model.train()

train_iter = iter(train_loader)

val_iter = iter(val_loader)

for step in range(min(len(train_loader), len(val_loader))):

ステップ1:検証データを使ってアーキテクチャパラメータを更新

try:

X_val, y_val = next(val_iter)

except StopIteration:

val_iter = iter(val_loader)

X_val, y_val = next(val_iter)

a_optimizer.zero_grad()

val_loss = F.cross_entropy(model(X_val), y_val)

val_loss.backward()

a_optimizer.step()

ステップ2:トレーニングデータを使って重みパラメータを更新

X_train, y_train = next(train_iter)

w_optimizer.zero_grad()

train_loss = F.cross_entropy(model(X_train), y_train)

train_loss.backward()

nn.utils.clip_grad_norm_(model.model_parameters(), 5.0)

w_optimizer.step()

w_scheduler.step()

if epoch % 10 == 0:

print(f"Epoch {epoch}: Train Loss = {train_loss.item():.4f}")

発見されたアーキテクチャの抽出

for i, cell in enumerate(model.cells):

weights = F.softmax(cell.mixed_op.alphas, dim=0).detach()

best_op = weights.argmax().item()

print(f"Cell {i}: Best op index = {best_op}, weights = {weights.numpy()}")

return model

ワンショットNAS

class SuperNetwork(nn.Module):

"""ワンショットNAS:単一のスーパーネットワークからサブネットワークをサンプリング"""

def __init__(self, num_classes=10, max_channels=256):

super().__init__()

self.max_channels = max_channels

self.channel_options = [64, 128, 256]

self.conv1 = nn.Conv2d(3, max_channels, 3, padding=1)

self.conv2 = nn.Conv2d(max_channels, max_channels, 3, padding=1)

self.conv3 = nn.Conv2d(max_channels, max_channels, 3, padding=1)

self.bn1 = nn.BatchNorm2d(max_channels)

self.bn2 = nn.BatchNorm2d(max_channels)

self.bn3 = nn.BatchNorm2d(max_channels)

self.classifier = nn.Linear(max_channels, num_classes)

def forward(self, x, arch_config=None):

if arch_config is None:

arch_config = {

'conv1_out': torch.randint(0, len(self.channel_options), (1,)).item(),

'conv2_out': torch.randint(0, len(self.channel_options), (1,)).item(),

}

c1 = self.channel_options[arch_config['conv1_out']]

c2 = self.channel_options[arch_config['conv2_out']]

x = F.relu(self.bn1(self.conv1(x)[:, :c1]))

x = F.relu(self.bn2(self.conv2(

F.pad(x, (0, 0, 0, 0, 0, self.max_channels - c1))

)[:, :c2]))

x = F.relu(self.bn3(self.conv3(

F.pad(x, (0, 0, 0, 0, 0, self.max_channels - c2))

)))

x = x.mean([2, 3])

return self.classifier(x)

9. パイプライン自動化

Auto-sklearn

pip install auto-sklearn

from autosklearn.metrics import roc_auc, mean_squared_error

def auto_sklearn_example(X_train, y_train, X_test, task='classification'):

"""Auto-sklearn:scikit-learn互換AutoML"""

if task == 'classification':

automl = autosklearn.classification.AutoSklearnClassifier(

time_left_for_this_task=3600,

per_run_time_limit=360,

n_jobs=-1,

memory_limit=8192,

ensemble_size=50,

ensemble_nbest=50,

max_models_on_disc=50,

include={

'classifier': [

'random_forest', 'gradient_boosting',

'extra_trees', 'liblinear_svc'

]

},

metric=roc_auc,

resampling_strategy='cv',

resampling_strategy_arguments={'folds': 5},

seed=42,

)

else:

automl = autosklearn.regression.AutoSklearnRegressor(

time_left_for_this_task=3600,

per_run_time_limit=360,

n_jobs=-1,

metric=mean_squared_error,

seed=42,

)

automl.fit(X_train, y_train)

print(automl.sprint_statistics())

print(automl.leaderboard())

predictions = automl.predict(X_test)

return automl, predictions

10. LLM時代のAutoML

AutoMLへのLLMの活用

大規模言語モデル(LLM)はAutoMLに新しい可能性を開いています:

1. **ハイパーパラメータ提案**:LLMがデータセットの特性に基づいて初期設定を推薦

2. **特徴量エンジニアリング**:LLMがドメイン知識を使用して新しい特徴量のアイデアを提案

3. **コード生成**:前処理とトレーニングコードを自動生成

4. **エラーデバッグ**:トレーニングの失敗を診断し、解決策を提案

LLMガイドのハイパーパラメータ最適化(概念的コード)

from openai import OpenAI

def llm_hyperparameter_suggestion(dataset_description, model_type, previous_results=None):

"""LLMを使ってハイパーパラメータを提案"""

client = OpenAI()

prompt = f"""

Dataset characteristics:

{dataset_description}

Model type: {model_type}

Previous results:

{previous_results if previous_results else 'None (first attempt)'}

Based on this information, suggest optimal hyperparameters for {model_type} in JSON format.

"""

response = client.chat.completions.create(

model="gpt-4",

messages=[

{"role": "system",

"content": "You are a machine learning expert. Help optimize hyperparameters."},

{"role": "user", "content": prompt}

],

response_format={"type": "json_object"}

)

return response.choices[0].message.content

AutoMLエージェント(実験的)

class AutoMLAgent:

"""LLMガイドのAutoMLエージェント"""

def __init__(self, llm_client, X_train, y_train, X_val, y_val, max_iterations=10):

self.client = llm_client

self.X_train = X_train

self.y_train = y_train

self.X_val = X_val

self.y_val = y_val

self.max_iterations = max_iterations

self.history = []

self.best_score = 0

self.best_params = None

def get_next_config(self):

"""LLMに次の設定を尋ねる"""

history_str = "\n".join([

f"Iteration {i+1}: params={h['params']}, score={h['score']:.4f}"

for i, h in enumerate(self.history[-5:])

])

prompt = f"""

LightGBM parameter attempts so far:

{history_str if history_str else 'None (first attempt)'}

Suggest the next parameter combination to try in JSON format.

Valid ranges: num_leaves(10-300), learning_rate(0.001-0.3),

n_estimators(100-2000), subsample(0.5-1.0), colsample_bytree(0.5-1.0)

"""

response = self.client.chat.completions.create(

model="gpt-4",

messages=[

{"role": "system", "content": "You are an HPO expert."},

{"role": "user", "content": prompt}

],

response_format={"type": "json_object"}

)

return json.loads(response.choices[0].message.content)

def evaluate(self, params):

"""パラメータ設定を評価"""

from sklearn.metrics import roc_auc_score

model = lgb.LGBMClassifier(**params, random_state=42, verbose=-1)

model.fit(self.X_train, self.y_train)

preds = model.predict_proba(self.X_val)[:, 1]

return roc_auc_score(self.y_val, preds)

def run(self):

"""AutoMLエージェントループを実行"""

for i in range(self.max_iterations):

config = self.get_next_config()

score = self.evaluate(config)

self.history.append({'params': config, 'score': score})

if score > self.best_score:

self.best_score = score

self.best_params = config

print(f"Iteration {i+1}: New best score {score:.4f}")

print(f"\nBest score: {self.best_score:.4f}")

print(f"Best params: {self.best_params}")

return self.best_params

まとめ

このガイドでAutoMLエコシステム全体を網羅しました:

1. **ハイパーパラメータ最適化**:グリッド探索からベイズ最適化まで、体系的な直感を構築

2. **Optuna**:最も柔軟なPythonネイティブHPOフレームワーク、プルーニングと可視化

3. **Ray Tune**:複数のGPUとノード間での大規模分散HPO

4. **AutoGluon**:Amazonの強力なマルチモーダルAutoML(表形式、画像、テキスト)

5. **FLAML**:MicrosoftのコスコートEfficient AutoML、最小限のオーバーヘッド

6. **H2O AutoML**:解釈可能性ツールを備えたエンタープライズグレードのAutoML

7. **NAS**:DARTSとワンショット手法による最適なニューラルアーキテクチャの自動設計

8. **LLM + AutoML**:インテリジェントで言語ガイドの自動化の次のフロンティア

**主な推奨事項:**

- 時間制約がある場合:FLAMLまたはAutoGluonを`good_quality`プリセットで使用

- 特定モデルのチューニング:Optunaを使用

- 大規模または分散実験:Ray Tuneを使用

- エンタープライズ環境:解釈可能性ツールのためにH2O AutoMLを活用

- LLMベースのAutoMLはまだ研究段階ですが、注目に値します

AutoMLはツールであり、魔法ではありません。ドメイン知識、データ品質、正しい評価フレームワークが成功の最も重要な要素であり続けます。

参考文献

- [Optunaドキュメント](https://optuna.org/)

- [AutoGluonドキュメント](https://auto.gluon.ai/)

- [FLAMLドキュメント](https://microsoft.github.io/FLAML/)

- [H2O AutoML](https://h2o.ai/products/h2o-automl/)

- [Ray Tuneドキュメント](https://docs.ray.io/en/latest/tune/index.html)

- [DARTS: Differentiable Architecture Search](https://arxiv.org/abs/1806.09055)

- Bergstra, J., & Bengio, Y. (2012). Random search for hyper-parameter optimization.

- Feurer, M., et al. (2015). Efficient and Robust Automated Machine Learning (Auto-sklearn).

- He, X., et al. (2021). AutoML: A Survey of the State-of-the-Art.

현재 단락 (1/854)

AutoML(Automated Machine Learning)は機械学習パイプラインのさまざまな段階を自動化します。データサイエンティストがかつて手動で行っていた作業、データの前処理、特徴量エン...

작성 글자: 0원문 글자: 26,873작성 단락: 0/854