AutoML Complete Guide: Automated ML Pipelines with AutoGluon, FLAML, and Optuna
1. AutoML Overview
What is AutoML?
AutoML (Automated Machine Learning) automates various stages of the machine learning pipeline. Tasks that data scientists previously performed manually — data preprocessing, feature engineering, model selection, hyperparameter optimization, and ensembling — are handled automatically by algorithms.
What AutoML Automates:
- Data preprocessing
  - Missing-value imputation strategy selection
  - Scaling/normalization method selection
  - Outlier handling
- Feature engineering
  - Feature transformations (log, square, interactions)
  - Categorical encoding method selection
  - Feature selection and generation
- Model selection (algorithm selection)
  - Searching over diverse algorithms
  - Meta-learning (leveraging experience from prior tasks)
- Hyperparameter optimization (HPO)
  - Grid/random search
  - Bayesian optimization
  - Evolutionary algorithms
- Ensemble automation
  - Searching for the optimal ensemble configuration
  - Automated stacking and blending
- Neural architecture search (NAS)
  - Automated design of optimal neural network architectures
AutoML Application Domains
Industry applications:
- Finance: credit risk models, automated fraud detection
- Healthcare: rapid prototyping of diagnostic support systems
- Retail: automatic refresh of demand forecasting models
- Manufacturing: quality control model automation
Major open-source AutoML tools:
| Tool | Developer | Strengths |
|---|---|---|
| AutoGluon | Amazon | Multimodal: tabular, image, text |
| FLAML | Microsoft | Cost-efficient, fast |
| Optuna | Preferred Networks | HPO, visualization |
| H2O AutoML | H2O.ai | Enterprise-grade, interpretable |
| Auto-sklearn | AutoML Group | Built on scikit-learn |
| Ray Tune | Anyscale | Distributed HPO |
| NNI | Microsoft | NAS, HPO |
Pros and Cons of AutoML
Pros:
- Enables non-experts to build high-quality models
- Saves time by automating repetitive experiments
- Can discover hyperparameter combinations humans would miss
- Provides reproducible pipelines
Cons:
- Computational cost can be very high
- Limited ability to exploit domain knowledge
- Black-box behavior (internals are hard to understand)
- Custom solutions are often more effective for specialized problems
- Risk of data leakage
2. Hyperparameter Optimization (HPO)
Grid Search
The simplest HPO method: it exhaustively evaluates every combination of hyperparameter values, so it is only practical for small parameter spaces.
```python
import pandas as pd
import xgboost as xgb
from sklearn.model_selection import GridSearchCV

def grid_search_example(X_train, y_train):
    """Grid search: exhaustive search (only suitable for small parameter spaces)."""
    param_grid = {
        'max_depth': [3, 5, 7],
        'learning_rate': [0.01, 0.1, 0.3],
        'n_estimators': [100, 300, 500],
        'subsample': [0.7, 0.9],
    }
    # Total combinations: 3 * 3 * 3 * 2 = 54, times the number of CV folds
    model = xgb.XGBClassifier(random_state=42, n_jobs=-1)
    grid_search = GridSearchCV(
        model, param_grid,
        cv=5,
        scoring='roc_auc',
        n_jobs=-1,
        verbose=1,
        refit=True  # refit on the full data with the best parameters
    )
    grid_search.fit(X_train, y_train)
    print(f"Best parameters: {grid_search.best_params_}")
    print(f"Best CV score: {grid_search.best_score_:.4f}")
    # Analyze the results as a DataFrame
    results = pd.DataFrame(grid_search.cv_results_)
    results_sorted = results.sort_values('mean_test_score', ascending=False)
    print(results_sorted[['params', 'mean_test_score', 'std_test_score']].head(10))
    return grid_search.best_estimator_
```
Random Search
Proposed by Bergstra & Bengio (2012), random search samples configurations at random from the parameter space; with the same budget it typically covers far more distinct values per dimension than a grid.
```python
import xgboost as xgb
from scipy.stats import uniform, randint, loguniform
from sklearn.model_selection import RandomizedSearchCV

def random_search_example(X_train, y_train, n_iter=100):
    """Random search: sample from continuous distributions for efficient exploration."""
    param_distributions = {
        'max_depth': randint(3, 10),
        'learning_rate': loguniform(1e-3, 0.5),  # log-uniform distribution
        'n_estimators': randint(100, 1000),
        'subsample': uniform(0.6, 0.4),  # [0.6, 1.0]
        'colsample_bytree': uniform(0.6, 0.4),
        'reg_alpha': loguniform(1e-4, 10),
        'reg_lambda': loguniform(1e-4, 10),
        'min_child_weight': randint(1, 10),
        'gamma': uniform(0, 0.5),
    }
    model = xgb.XGBClassifier(random_state=42, n_jobs=-1)
    random_search = RandomizedSearchCV(
        model, param_distributions,
        n_iter=n_iter,  # number of configurations to try
        cv=5,
        scoring='roc_auc',
        n_jobs=-1,
        verbose=1,
        random_state=42,
        refit=True
    )
    random_search.fit(X_train, y_train)
    print(f"Best parameters: {random_search.best_params_}")
    print(f"Best CV score: {random_search.best_score_:.4f}")
    return random_search.best_estimator_
```
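The core argument of Bergstra & Bengio can be checked numerically: when only one of two hyperparameters actually matters, a 3×3 grid spends its 9 trials on just 3 distinct values of the important axis, while 9 random points probe 9 distinct values. This is a standalone sketch, not tied to any of the estimators above:

```python
import numpy as np

rng = np.random.default_rng(0)

# A 3x3 grid over two hyperparameters vs. 9 random points in the same square
grid = np.array([(a, b) for a in (0.1, 0.5, 0.9) for b in (0.1, 0.5, 0.9)])
random_pts = rng.uniform(0, 1, size=(9, 2))

# If only the first hyperparameter matters, count the distinct values tried on it
distinct_grid = len(np.unique(grid[:, 0]))      # grid reuses the same 3 values
distinct_random = len(np.unique(random_pts[:, 0]))  # random tries 9 different ones
```

With equal budgets, random search therefore gives each individual dimension far better coverage, which is why it dominates grid search when most hyperparameters have low impact.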
Bayesian Optimization
Bayesian optimization uses the results of previous evaluations to choose the next point to explore intelligently.
Core components:
- Surrogate model: a probabilistic approximation of the objective function (typically a Gaussian process)
- Acquisition function: decides the next point to evaluate
  - EI (Expected Improvement): expected improvement over the current best value
  - UCB (Upper Confidence Bound): balances exploration and exploitation
  - PI (Probability of Improvement): probability of improving on the best value
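The surrogate-plus-acquisition loop can be sketched in a few lines with scikit-learn. The toy 1-D `objective` and the helper names below are illustrative stand-ins, assuming a minimization problem; a Gaussian process serves as the surrogate and EI as the acquisition function:

```python
import numpy as np
from scipy.stats import norm
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Matern

# Toy 1-D objective that we pretend is expensive to evaluate
def objective(x):
    return np.sin(3 * x) + 0.5 * x

rng = np.random.default_rng(42)
X_obs = rng.uniform(0, 3, size=(6, 1))   # points evaluated so far
y_obs = objective(X_obs).ravel()

# Surrogate model: a Gaussian process fit to the observations
gp = GaussianProcessRegressor(kernel=Matern(nu=2.5), normalize_y=True)
gp.fit(X_obs, y_obs)

def expected_improvement(X_cand, f_best, xi=0.01):
    """EI for minimization: (f* - mu - xi) * Phi(z) + sigma * phi(z)."""
    mu, sigma = gp.predict(X_cand, return_std=True)
    sigma = np.maximum(sigma, 1e-12)  # avoid division by zero
    imp = f_best - mu - xi            # predicted improvement over the best so far
    z = imp / sigma
    return imp * norm.cdf(z) + sigma * norm.pdf(z)

# The acquisition function picks the next point to evaluate
X_cand = np.linspace(0, 3, 200).reshape(-1, 1)
ei = expected_improvement(X_cand, f_best=y_obs.min())
x_next = X_cand[np.argmax(ei)]
```

Evaluating `objective(x_next)`, appending it to the observations, and refitting the GP closes one iteration of the loop.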
TPE (Tree-structured Parzen Estimator):
- The default algorithm in Optuna
- Instead of modeling p(y|x) directly, TPE models p(x|y) with two density models l(x) and g(x)
- l(x) models the distribution of parameters that produced good results (the top gamma%), g(x) models the rest
- The parameters that maximize EI are those where the ratio l(x)/g(x) is largest
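The l(x)/g(x) idea can be illustrated with kernel density estimates on a toy search history. The synthetic learning-rate/loss data below is a hypothetical stand-in (the true optimum is planted at log10(lr) = -2), and `gaussian_kde` plays the role of TPE's Parzen estimators:

```python
import numpy as np
from scipy.stats import gaussian_kde

np.random.seed(0)

# Toy history: learning rates tried so far (log10 scale) and their losses;
# the loss is lowest near log_lr = -2 (i.e. lr = 1e-2)
log_lr = np.random.uniform(-4, 0, size=200)
loss = (log_lr + 2.0) ** 2 + np.random.normal(0, 0.1, size=200)

# Split observations into "good" (best gamma fraction) and the rest
gamma = 0.25
threshold = np.quantile(loss, gamma)
good = log_lr[loss <= threshold]
bad = log_lr[loss > threshold]

# Fit the two density models: l(x) over good trials, g(x) over the rest
l, g = gaussian_kde(good), gaussian_kde(bad)

# TPE samples candidates from l(x) and keeps the one maximizing l(x)/g(x)
candidates = l.resample(64).ravel()
ratio = l(candidates) / np.maximum(g(candidates), 1e-12)
best_candidate = candidates[np.argmax(ratio)]  # lands near -2
```

Because good trials concentrate around the optimum while bad trials are depleted there, the ratio l(x)/g(x) peaks near the best region, which is exactly where TPE proposes the next trial.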
3. Optuna
Optuna Fundamentals
Optuna is a hyperparameter optimization framework developed by Preferred Networks; it is Python-native and very easy to use.
Core concepts:
- Study: an entire optimization experiment (a collection of Trials)
- Trial: a single attempt with one hyperparameter configuration
- Objective function: the function to optimize (minimize or maximize)
- Sampler: the algorithm that proposes parameters (TPE, CMA-ES, random, ...)
- Pruner: terminates unpromising Trials early

```bash
pip install optuna optuna-dashboard
```
```python
import numpy as np
import optuna
from optuna.samplers import TPESampler
from optuna.pruners import MedianPruner
import lightgbm as lgb
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import roc_auc_score

# Set the log level
optuna.logging.set_verbosity(optuna.logging.WARNING)

def objective_lgbm(trial, X, y):
    """Optuna objective function for LightGBM optimization."""
    # Define the hyperparameter search space
    params = {
        'objective': 'binary',
        'metric': 'auc',
        'verbosity': -1,
        'boosting_type': trial.suggest_categorical('boosting_type', ['gbdt', 'dart']),
        'num_leaves': trial.suggest_int('num_leaves', 20, 300),
        'max_depth': trial.suggest_int('max_depth', 3, 12),
        'min_child_samples': trial.suggest_int('min_child_samples', 5, 100),
        'learning_rate': trial.suggest_float('learning_rate', 1e-4, 0.3, log=True),
        'subsample': trial.suggest_float('subsample', 0.5, 1.0),
        'subsample_freq': trial.suggest_int('subsample_freq', 1, 7),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.5, 1.0),
        'reg_alpha': trial.suggest_float('reg_alpha', 1e-8, 10.0, log=True),
        'reg_lambda': trial.suggest_float('reg_lambda', 1e-8, 10.0, log=True),
        'min_split_gain': trial.suggest_float('min_split_gain', 0, 1),
        'cat_smooth': trial.suggest_int('cat_smooth', 1, 100),
        'n_jobs': -1,
    }
    # Suggest the boosting-round budget separately so it does not
    # conflict with the num_boost_round argument passed to lgb.train below
    n_estimators = trial.suggest_int('n_estimators', 100, 2000)
    # Evaluate with 5-fold CV
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    cv_scores = []
    for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
        X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
        y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
        train_data = lgb.Dataset(X_train, y_train)
        val_data = lgb.Dataset(X_val, y_val, reference=train_data)
        callbacks = [
            # Note: early stopping has no effect with boosting_type='dart'
            lgb.early_stopping(stopping_rounds=50, verbose=False),
            lgb.log_evaluation(period=0),
        ]
        model = lgb.train(
            params, train_data,
            num_boost_round=n_estimators,
            valid_sets=[val_data],
            callbacks=callbacks,
        )
        preds = model.predict(X_val)
        fold_score = roc_auc_score(y_val, preds)
        cv_scores.append(fold_score)
        # Pruning
        trial.report(fold_score, fold)
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()
    return np.mean(cv_scores)

def run_optuna_study(X, y, n_trials=100, n_jobs=1):
    """Run an Optuna Study."""
    # Choose a sampler
    sampler = TPESampler(
        n_startup_trials=20,  # number of initial random trials
        n_ei_candidates=24,   # number of EI candidates
        multivariate=True,    # multivariate TPE
        seed=42
    )
    # Configure a pruner
    pruner = MedianPruner(
        n_startup_trials=5,   # minimum trials before pruning starts
        n_warmup_steps=10,    # warm-up steps before pruning
        interval_steps=1
    )
    study = optuna.create_study(
        direction='maximize',  # maximize AUC
        sampler=sampler,
        pruner=pruner,
        study_name='lgbm_optimization',
        # storage='sqlite:///optuna.db',  # persist results
        # load_if_exists=True,            # resume an existing study
    )
    # Run the optimization
    study.optimize(
        lambda trial: objective_lgbm(trial, X, y),
        n_trials=n_trials,
        n_jobs=n_jobs,  # parallel execution (note: requires DB storage)
        show_progress_bar=True,
        callbacks=[
            lambda study, trial: print(
                f"Trial {trial.number}: {trial.value:.4f} "
                f"(Best: {study.best_value:.4f})"
            ) if trial.value is not None else None
        ],
    )
    print("\nBest parameters:")
    for key, value in study.best_params.items():
        print(f"  {key}: {value}")
    print(f"Best AUC: {study.best_value:.4f}")
    print(f"Completed trials: {len(study.trials)}")
    pruned = [t for t in study.trials if t.state == optuna.trial.TrialState.PRUNED]
    print(f"Pruned trials: {len(pruned)}")
    return study

# Optuna visualization
def visualize_optuna_study(study):
    """Visualize Optuna optimization results."""
    import optuna.visualization as vis
    # Optimization history
    vis.plot_optimization_history(study).show()
    # Parameter importances
    vis.plot_param_importances(study).show()
    # Parameter relationships
    vis.plot_parallel_coordinate(study).show()
    # Parameter slices
    vis.plot_slice(study).show()
    # Contour plot
    vis.plot_contour(study, params=['learning_rate', 'num_leaves']).show()
```
Complete PyTorch + Optuna Example
```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import optuna
from optuna.samplers import TPESampler

def create_model(trial, input_dim):
    """Dynamically build a neural network architecture from an Optuna trial."""
    n_layers = trial.suggest_int('n_layers', 1, 4)
    dropout = trial.suggest_float('dropout', 0.1, 0.5)
    activation_name = trial.suggest_categorical('activation', ['relu', 'tanh', 'elu'])
    activation_map = {
        'relu': nn.ReLU(),
        'tanh': nn.Tanh(),
        'elu': nn.ELU()
    }
    layers = []
    in_features = input_dim
    for i in range(n_layers):
        out_features = trial.suggest_int(f'n_units_l{i}', 32, 512)
        layers.extend([
            nn.Linear(in_features, out_features),
            nn.BatchNorm1d(out_features),
            activation_map[activation_name],
            nn.Dropout(dropout),
        ])
        in_features = out_features
    layers.append(nn.Linear(in_features, 1))
    layers.append(nn.Sigmoid())
    return nn.Sequential(*layers)

def objective_pytorch(trial, X_train_t, y_train_t, X_val_t, y_val_t, input_dim):
    """Optuna objective function for a PyTorch neural network."""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = create_model(trial, input_dim).to(device)
    # Optimization hyperparameters
    lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)
    optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'RMSprop', 'SGD'])
    batch_size = trial.suggest_categorical('batch_size', [32, 64, 128, 256])
    weight_decay = trial.suggest_float('weight_decay', 1e-8, 1e-2, log=True)
    if optimizer_name == 'Adam':
        optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    elif optimizer_name == 'RMSprop':
        optimizer = optim.RMSprop(model.parameters(), lr=lr, weight_decay=weight_decay)
    else:
        optimizer = optim.SGD(model.parameters(), lr=lr, weight_decay=weight_decay,
                              momentum=0.9)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)
    criterion = nn.BCELoss()
    train_dataset = TensorDataset(
        X_train_t.to(device), y_train_t.to(device).float().unsqueeze(1)
    )
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    best_val_loss = float('inf')
    patience = 10
    patience_counter = 0
    for epoch in range(100):
        model.train()
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            output = model(X_batch)
            loss = criterion(output, y_batch)
            loss.backward()
            optimizer.step()
        scheduler.step()
        # Validation
        model.eval()
        with torch.no_grad():
            val_output = model(X_val_t.to(device))
            val_loss = criterion(
                val_output, y_val_t.to(device).float().unsqueeze(1)
            ).item()
        # Report for early pruning
        trial.report(val_loss, epoch)
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()
        # Early stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                break
    return best_val_loss

def run_pytorch_optuna(X_train, y_train, X_val, y_val, n_trials=50):
    """Run the combined PyTorch + Optuna workflow."""
    X_train_t = torch.FloatTensor(X_train.values if hasattr(X_train, 'values') else X_train)
    y_train_t = torch.FloatTensor(y_train.values if hasattr(y_train, 'values') else y_train)
    X_val_t = torch.FloatTensor(X_val.values if hasattr(X_val, 'values') else X_val)
    y_val_t = torch.FloatTensor(y_val.values if hasattr(y_val, 'values') else y_val)
    input_dim = X_train_t.shape[1]
    study = optuna.create_study(
        direction='minimize',
        pruner=optuna.pruners.HyperbandPruner(
            min_resource=5, max_resource=100, reduction_factor=3
        ),
        sampler=TPESampler(seed=42)
    )
    study.optimize(
        lambda trial: objective_pytorch(
            trial, X_train_t, y_train_t, X_val_t, y_val_t, input_dim
        ),
        n_trials=n_trials,
        show_progress_bar=True,
    )
    print(f"Best validation loss: {study.best_value:.4f}")
    print(f"Best parameters: {study.best_params}")
    return study
```
CMA-ES Sampler

```python
import optuna
from optuna.samplers import CmaEsSampler

# CMA-ES is often more efficient in continuous search spaces
study_cmaes = optuna.create_study(
    direction='maximize',
    sampler=CmaEsSampler(
        n_startup_trials=10,      # initial random trials
        restart_strategy='ipop',  # restart strategy
        seed=42
    )
)
```
4. Ray Tune
Ray Tune and Distributed HPO
Ray Tune is a distributed hyperparameter optimization library developed by Anyscale. It automatically handles parallel training across multiple GPUs and nodes.

```bash
pip install "ray[tune]" "ray[air]"
```
```python
import ray
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler, PopulationBasedTraining
from ray.tune.search.optuna import OptunaSearch
import torch
import torch.nn as nn

ray.init(ignore_reinit_error=True)

def train_with_tune(config, data=None):
    """Ray Tune trainable function."""
    X_train, y_train, X_val, y_val = data
    # Build the model
    model = nn.Sequential(
        nn.Linear(X_train.shape[1], config['hidden_size']),
        nn.ReLU(),
        nn.Dropout(config['dropout']),
        nn.Linear(config['hidden_size'], config['hidden_size'] // 2),
        nn.ReLU(),
        nn.Linear(config['hidden_size'] // 2, 1),
        nn.Sigmoid()
    )
    optimizer = torch.optim.Adam(
        model.parameters(),
        lr=config['lr'],
        weight_decay=config['weight_decay']
    )
    criterion = nn.BCELoss()
    X_train_t = torch.FloatTensor(X_train)
    y_train_t = torch.FloatTensor(y_train).unsqueeze(1)
    X_val_t = torch.FloatTensor(X_val)
    y_val_t = torch.FloatTensor(y_val).unsqueeze(1)
    for epoch in range(config['max_epochs']):
        model.train()
        optimizer.zero_grad()
        output = model(X_train_t)
        loss = criterion(output, y_train_t)
        loss.backward()
        optimizer.step()
        if epoch % 5 == 0:
            model.eval()
            with torch.no_grad():
                val_output = model(X_val_t)
                val_loss = criterion(val_output, y_val_t).item()
            # Report intermediate results to Ray Tune
            # (legacy API; Ray >= 2.x replaces this with ray.train.report)
            tune.report(
                val_loss=val_loss,
                training_iteration=epoch,
            )

def run_ray_tune(X_train, y_train, X_val, y_val, num_samples=50):
    """Run distributed HPO with Ray Tune."""
    config = {
        'hidden_size': tune.choice([64, 128, 256, 512]),
        'dropout': tune.uniform(0.1, 0.5),
        'lr': tune.loguniform(1e-5, 1e-1),
        'weight_decay': tune.loguniform(1e-8, 1e-3),
        'max_epochs': tune.choice([50, 100, 200]),
    }
    # ASHA (Asynchronous Successive Halving Algorithm) scheduler
    scheduler = ASHAScheduler(
        metric='val_loss',
        mode='min',
        max_t=200,           # maximum epochs
        grace_period=10,     # minimum epochs before a trial can be stopped
        reduction_factor=3,  # halving rate
    )
    reporter = CLIReporter(
        metric_columns=['val_loss', 'training_iteration'],
        max_progress_rows=10
    )
    # OptunaSearch integration
    search_alg = OptunaSearch(metric='val_loss', mode='min')
    result = tune.run(
        tune.with_parameters(
            train_with_tune,
            data=(X_train, y_train, X_val, y_val)
        ),
        config=config,
        num_samples=num_samples,
        scheduler=scheduler,
        search_alg=search_alg,
        progress_reporter=reporter,
        verbose=1,
        resources_per_trial={'cpu': 2, 'gpu': 0},
    )
    best_trial = result.get_best_trial('val_loss', 'min', 'last')
    print(f"Best validation loss: {best_trial.last_result['val_loss']:.4f}")
    print(f"Best parameters: {best_trial.config}")
    return result

# Population Based Training (PBT)
def run_pbt(X_train, y_train, X_val, y_val):
    """PBT: mutate hyperparameters dynamically during training."""
    pbt_scheduler = PopulationBasedTraining(
        time_attr='training_iteration',
        metric='val_loss',
        mode='min',
        perturbation_interval=20,  # mutate every N steps
        hyperparam_mutations={
            'lr': tune.loguniform(1e-5, 1e-1),
            'dropout': tune.uniform(0.1, 0.5),
        },
        quantile_fraction=0.25,  # bottom 25% copies parameters from the top 25%
    )
    result = tune.run(
        tune.with_parameters(
            train_with_tune,
            data=(X_train, y_train, X_val, y_val)
        ),
        config={
            'hidden_size': 256,
            'dropout': tune.uniform(0.1, 0.5),
            'lr': tune.loguniform(1e-4, 1e-1),
            'weight_decay': 1e-5,
            'max_epochs': 200,
        },
        num_samples=8,
        scheduler=pbt_scheduler,
        verbose=1,
    )
    return result
```
5. AutoGluon
AutoGluon Overview
AutoGluon is an open-source AutoML library developed by Amazon that achieves Kaggle-competition-level performance with minimal code.

```bash
pip install autogluon
```

Tabular Data (TabularPredictor)
```python
from autogluon.tabular import TabularPredictor
import pandas as pd

def autogluon_tabular_example(train_df, test_df, target_col, eval_metric='roc_auc'):
    """AutoGluon training on tabular data."""
    # Basic usage (training in just a few lines!)
    predictor = TabularPredictor(
        label=target_col,
        eval_metric=eval_metric,
        path='autogluon_models/',  # model save path
        problem_type='binary',     # 'binary', 'multiclass', 'regression', 'softclass'
    )
    predictor.fit(
        train_data=train_df,
        time_limit=3600,         # max training time in seconds
        presets='best_quality',  # quality preset:
        # 'best_quality': best accuracy (slow)
        # 'good_quality': good accuracy/speed trade-off
        # 'medium_quality': fast training
        # 'optimize_for_deployment': optimized for fast inference
        excluded_model_types=['KNN'],  # model types to exclude
        verbosity=2,
    )
    # Print the leaderboard
    leaderboard = predictor.leaderboard(test_df, silent=True)
    print(leaderboard[['model', 'score_test', 'score_val', 'pred_time_test']].head(10))
    # Predict
    predictions = predictor.predict(test_df)
    pred_proba = predictor.predict_proba(test_df)
    # Feature importance
    feature_importance = predictor.feature_importance(test_df)
    print(feature_importance.head(20))
    return predictor, predictions, pred_proba

# Advanced configuration: customizing hyperparameters
def autogluon_advanced(train_df, test_df, target_col):
    """Advanced AutoGluon configuration."""
    hyperparameters = {
        'GBM': [
            {'num_boost_round': 300, 'ag_args': {'name_suffix': 'fast'}},
            {'num_boost_round': 1000, 'learning_rate': 0.03,
             'ag_args': {'name_suffix': 'slow', 'priority': 0}},
        ],
        'XGB': [
            {'n_estimators': 300, 'max_depth': 6},
        ],
        'CAT': [
            {'iterations': 500, 'depth': 6},
        ],
        'NN_TORCH': [
            {'num_epochs': 50, 'learning_rate': 1e-3,
             'dropout_prob': 0.1},
        ],
        'RF': [
            {'n_estimators': 300},
        ],
    }
    predictor = TabularPredictor(
        label=target_col,
        eval_metric='roc_auc',
        path='autogluon_advanced/',
    )
    predictor.fit(
        train_data=train_df,
        hyperparameters=hyperparameters,
        time_limit=7200,
        num_stack_levels=1,  # number of stacking levels
        num_bag_folds=5,     # number of bagging folds (0 disables bagging)
        num_bag_sets=1,      # number of bagging sets
        verbosity=3,
    )
    return predictor
```
Image Classification
```python
from autogluon.multimodal import MultiModalPredictor

def autogluon_image_classification(train_df, test_df, label_col, image_col):
    """AutoGluon image classification."""
    predictor = MultiModalPredictor(label=label_col)
    predictor.fit(
        train_data=train_df,
        time_limit=3600,
        hyperparameters={
            'model.timm_image.checkpoint_name': 'efficientnet_b4',
            'optimization.learning_rate': 1e-4,
            'optimization.max_epochs': 20,
        }
    )
    predictions = predictor.predict(test_df)
    return predictor, predictions

# Text + tabular multimodal
def autogluon_multimodal(train_df, test_df, target_col):
    """AutoGluon multimodal training (text + numeric features)."""
    predictor = MultiModalPredictor(
        label=target_col,
        problem_type='binary',
    )
    predictor.fit(
        train_data=train_df,
        time_limit=3600,
        hyperparameters={
            'model.hf_text.checkpoint_name': 'bert-base-uncased',
        }
    )
    return predictor
```
6. FLAML
Microsoft FLAML
FLAML (Fast and Lightweight AutoML) is an AutoML library from Microsoft Research that specializes in cost-efficient automation.

```bash
pip install flaml
```
```python
from flaml import AutoML

def flaml_basic_example(X_train, y_train, X_test, task='classification'):
    """Basic FLAML usage."""
    automl = AutoML()
    automl_settings = {
        'time_budget': 300,   # max training time in seconds
        'metric': 'roc_auc',  # metric to optimize
        'task': task,         # 'classification', 'regression', 'rank', 'ts_forecast'
        'estimator_list': [   # models to try (for classification)
            'lgbm', 'xgboost', 'catboost',
            'rf', 'extra_tree', 'lrl1', 'lrl2',
            'kneighbor',
        ],
        'log_file_name': 'flaml_log.log',
        'seed': 42,
        'n_jobs': -1,
        'verbose': 1,
        # Cost-efficient search settings
        'retrain_full': True,  # retrain the final model on all data
        'max_iter': 100,       # max number of iterations
        'ensemble': True,      # whether to build an ensemble
        'eval_method': 'cv',   # 'cv' or 'holdout'
        'n_splits': 5,         # number of CV folds
    }
    automl.fit(X_train, y_train, **automl_settings)
    print(f"Best model: {automl.best_estimator}")
    print(f"Best loss: {automl.best_loss:.4f}")
    print(f"Best config: {automl.best_config}")
    print(f"Time to find best model: {automl.time_to_find_best_model:.1f}s")
    predictions = automl.predict(X_test)
    pred_proba = automl.predict_proba(X_test)
    return automl, predictions, pred_proba

# FLAML inside a scikit-learn pipeline
def flaml_sklearn_pipeline(X_train, y_train, X_test):
    """Integrate FLAML into a scikit-learn pipeline."""
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import StandardScaler
    automl = AutoML()
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('automl', automl),
    ])
    pipeline.fit(
        X_train, y_train,
        automl__time_budget=120,
        automl__metric='roc_auc',
        automl__task='classification',
    )
    return pipeline

# FLAML with a custom objective
def flaml_custom_objective(X_train, y_train):
    """Custom evaluation metric for FLAML."""
    def custom_metric(
        X_val, y_val, estimator, labels, X_train, y_train,
        weight_val=None, weight_train=None, *args
    ):
        """Custom metric optimizing the F-beta score."""
        from sklearn.metrics import fbeta_score
        y_pred = estimator.predict(X_val)
        score = fbeta_score(y_val, y_pred, beta=2, average='weighted')
        return -score, {'f2_score': score}  # (loss, metrics dict)
    automl = AutoML()
    automl.fit(
        X_train, y_train,
        metric=custom_metric,
        task='classification',
        time_budget=120,
    )
    return automl
```
7. H2O AutoML
The H2O Cluster
H2O AutoML is an enterprise-grade AutoML platform that offers a wide range of algorithms and interpretability tools.

```bash
pip install h2o
```
```python
import h2o
from h2o.automl import H2OAutoML
import pandas as pd

def h2o_automl_example(train_df, test_df, target_col, max_models=20):
    """Using H2O AutoML."""
    # Start the H2O cluster
    h2o.init(
        nthreads=-1,        # use all CPUs
        max_mem_size='8G',  # maximum memory
        port=54321,
    )
    # Convert to H2O Frames
    train_h2o = h2o.H2OFrame(train_df)
    test_h2o = h2o.H2OFrame(test_df)
    # Convert the target column to categorical (for classification)
    train_h2o[target_col] = train_h2o[target_col].asfactor()
    feature_cols = [col for col in train_df.columns if col != target_col]
    # Run AutoML
    aml = H2OAutoML(
        max_models=max_models,  # max number of models to try
        max_runtime_secs=3600,  # max runtime
        seed=42,
        sort_metric='AUC',      # leaderboard sort metric
        balance_classes=False,  # class-imbalance handling
        # Algorithms to include/exclude
        include_algos=[
            'GBM', 'GLM', 'DRF', 'DeepLearning',
            'StackedEnsemble', 'XGBoost'
        ],
        # exclude_algos=['DeepLearning'],
        # Stacking settings
        keep_cross_validation_predictions=True,
        keep_cross_validation_models=True,
        nfolds=5,
        verbosity='info',
    )
    aml.train(
        x=feature_cols,
        y=target_col,
        training_frame=train_h2o,
        validation_frame=None,  # None means CV is used
        leaderboard_frame=test_h2o,
    )
    # Print the leaderboard
    lb = aml.leaderboard
    print("H2O AutoML leaderboard:")
    print(lb.head(20))
    # Best model
    best_model = aml.leader
    print(f"\nBest model: {best_model.model_id}")
    # Predict
    predictions = best_model.predict(test_h2o)
    pred_df = predictions.as_data_frame()
    # Model explanations
    explainability = aml.explain(test_h2o)
    # SHAP values (for supported models)
    if hasattr(best_model, 'shap_values'):
        shap_vals = best_model.shap_values(test_h2o)
    # Save the model
    model_path = h2o.save_model(model=best_model, path='h2o_models/', force=True)
    print(f"Model saved: {model_path}")
    return aml, best_model, pred_df

# Shut down H2O
def cleanup_h2o():
    h2o.cluster().shutdown()
```
8. Neural Architecture Search (NAS)
NAS Overview
Neural Architecture Search (NAS) automatically searches for an optimal neural network architecture.
The three components of NAS:
- Search space: the range of possible architectures
- Search strategy: how the space is explored (random, evolutionary, RL, gradient-based)
- Performance estimation: how candidate architectures are evaluated
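The three components map directly onto code. The sketch below is a deliberately tiny, hypothetical setup: the search space, the `proxy_score` stand-in for performance estimation, and the trial budget are all illustrative, with plain random sampling as the search strategy:

```python
import itertools
import random

random.seed(0)

# 1) Search space: depth x width x activation (a toy, hypothetical space)
search_space = list(itertools.product([2, 3, 4], [64, 128, 256], ['relu', 'tanh']))

# 2) Performance estimation: a cheap proxy score standing in for full training
def proxy_score(depth, width, act):
    # hypothetical stand-in: favors moderate depth/width with relu
    return -abs(depth - 3) - abs(width - 128) / 64 + (0.5 if act == 'relu' else 0.0)

# 3) Search strategy: random sampling under a trial budget
budget = 8
candidates = random.sample(search_space, budget)
best_arch = max(candidates, key=lambda arch: proxy_score(*arch))
```

Real NAS systems replace `proxy_score` with (partial) training or weight-sharing evaluation, and the random sampler with evolutionary, RL-based, or gradient-based strategies such as DARTS below.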
DARTS (Differentiable NAS)
DARTS (Differentiable Architecture Search, Liu et al., ICLR 2019) makes architecture search differentiable through a continuous relaxation of the search space.
```python
import torch
import torch.nn as nn
import torch.nn.functional as F

class MixedOperation(nn.Module):
    """DARTS mixed operation (weighted sum of candidate ops)."""
    def __init__(self, operations):
        super().__init__()
        self.ops = nn.ModuleList(operations)
        # Architecture parameters (alpha)
        self.alphas = nn.Parameter(torch.randn(len(operations)))

    def forward(self, x):
        weights = F.softmax(self.alphas, dim=0)
        return sum(w * op(x) for w, op in zip(weights, self.ops))

class DARTSCell(nn.Module):
    """A DARTS cell."""
    def __init__(self, in_channels, out_channels):
        super().__init__()
        # Define the candidate operations
        operations = [
            nn.Conv2d(in_channels, out_channels, 3, padding=1),
            nn.Conv2d(in_channels, out_channels, 5, padding=2),
            nn.MaxPool2d(3, stride=1, padding=1),
            nn.AvgPool2d(3, stride=1, padding=1),
            nn.Identity() if in_channels == out_channels else
            nn.Conv2d(in_channels, out_channels, 1),
        ]
        self.mixed_op = MixedOperation(operations)
        self.bn = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        return F.relu(self.bn(self.mixed_op(x)))

class SimpleDARTS(nn.Module):
    """A simplified DARTS architecture."""
    def __init__(self, num_classes=10, num_cells=6):
        super().__init__()
        self.stem = nn.Conv2d(3, 64, 3, padding=1)
        self.cells = nn.ModuleList([
            DARTSCell(64, 64) for _ in range(num_cells)
        ])
        self.classifier = nn.Linear(64, num_classes)

    def forward(self, x):
        x = self.stem(x)
        for cell in self.cells:
            x = cell(x)
        x = x.mean([2, 3])  # global average pooling
        return self.classifier(x)

    def arch_parameters(self):
        """Return only the architecture parameters."""
        return [p for n, p in self.named_parameters() if 'alphas' in n]

    def model_parameters(self):
        """Return only the weight parameters."""
        return [p for n, p in self.named_parameters() if 'alphas' not in n]

def train_darts(model, train_loader, val_loader, epochs=50):
    """DARTS bi-level optimization."""
    # Optimizer for the network weights
    w_optimizer = torch.optim.SGD(
        model.model_parameters(), lr=0.025, momentum=0.9, weight_decay=3e-4
    )
    # Optimizer for the architecture parameters
    a_optimizer = torch.optim.Adam(
        model.arch_parameters(), lr=3e-4, betas=(0.5, 0.999), weight_decay=1e-3
    )
    w_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
        w_optimizer, T_max=epochs
    )
    for epoch in range(epochs):
        model.train()
        train_iter = iter(train_loader)
        val_iter = iter(val_loader)
        for step in range(min(len(train_loader), len(val_loader))):
            # 1. Update the architecture parameters (on validation data)
            try:
                X_val, y_val = next(val_iter)
            except StopIteration:
                val_iter = iter(val_loader)
                X_val, y_val = next(val_iter)
            a_optimizer.zero_grad()
            val_logits = model(X_val)
            val_loss = F.cross_entropy(val_logits, y_val)
            val_loss.backward()
            a_optimizer.step()
            # 2. Update the network weights (on training data)
            X_train, y_train = next(train_iter)
            w_optimizer.zero_grad()
            train_logits = model(X_train)
            train_loss = F.cross_entropy(train_logits, y_train)
            train_loss.backward()
            nn.utils.clip_grad_norm_(model.model_parameters(), 5.0)
            w_optimizer.step()
        w_scheduler.step()
        if epoch % 10 == 0:
            print(f"Epoch {epoch}: Train Loss = {train_loss.item():.4f}")
    # Extract the best architecture
    for i, cell in enumerate(model.cells):
        weights = F.softmax(cell.mixed_op.alphas, dim=0).detach()
        best_op = weights.argmax().item()
        print(f"Cell {i}: best operation index = {best_op}, weights = {weights}")
    return model
```
One-Shot NAS

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

class SuperNetwork(nn.Module):
    """One-shot NAS: sample sub-networks from a single supernetwork."""
    def __init__(self, num_classes=10, max_channels=256):
        super().__init__()
        self.max_channels = max_channels
        # Candidate channel counts
        self.channel_options = [64, 128, 256]
        # Define layers with the full parameter budget
        self.conv1 = nn.Conv2d(3, max_channels, 3, padding=1)
        self.conv2 = nn.Conv2d(max_channels, max_channels, 3, padding=1)
        self.conv3 = nn.Conv2d(max_channels, max_channels, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(max_channels)
        self.bn2 = nn.BatchNorm2d(max_channels)
        self.bn3 = nn.BatchNorm2d(max_channels)
        self.classifier = nn.Linear(max_channels, num_classes)

    def forward(self, x, arch_config=None):
        """arch_config: channel count per layer."""
        if arch_config is None:
            # Sample a random sub-network
            arch_config = {
                'conv1_out': torch.randint(0, len(self.channel_options), (1,)).item(),
                'conv2_out': torch.randint(0, len(self.channel_options), (1,)).item(),
            }
        # Forward pass with sliced channels: normalize at the full width,
        # then slice down to the sampled channel count
        c1 = self.channel_options[arch_config['conv1_out']]
        c2 = self.channel_options[arch_config['conv2_out']]
        x = F.relu(self.bn1(self.conv1(x))[:, :c1])
        x = F.relu(self.bn2(self.conv2(
            F.pad(x, (0, 0, 0, 0, 0, self.max_channels - c1))
        ))[:, :c2])
        x = F.relu(self.bn3(self.conv3(
            F.pad(x, (0, 0, 0, 0, 0, self.max_channels - c2))
        )))
        x = x.mean([2, 3])
        return self.classifier(x)
```
9. Pipeline Automation
Auto-sklearn

```bash
pip install auto-sklearn
```
```python
import autosklearn.classification
import autosklearn.regression
from autosklearn.metrics import roc_auc, mean_squared_error

def auto_sklearn_example(X_train, y_train, X_test, task='classification'):
    """Auto-sklearn: AutoML built on scikit-learn."""
    if task == 'classification':
        automl = autosklearn.classification.AutoSklearnClassifier(
            time_left_for_this_task=3600,  # total time limit (seconds)
            per_run_time_limit=360,        # time limit per model
            n_jobs=-1,
            memory_limit=8192,             # memory limit (MB)
            ensemble_size=50,              # ensemble size
            ensemble_nbest=50,             # number of best models for the ensemble
            max_models_on_disc=50,
            # Algorithms to include/exclude
            include={
                'classifier': [
                    'random_forest', 'gradient_boosting',
                    'extra_trees', 'liblinear_svc'
                ]
            },
            # exclude={'classifier': ['k_nearest_neighbors']},
            # Metric
            metric=roc_auc,
            resampling_strategy='cv',
            resampling_strategy_arguments={'folds': 5},
            seed=42,
        )
    else:
        automl = autosklearn.regression.AutoSklearnRegressor(
            time_left_for_this_task=3600,
            per_run_time_limit=360,
            n_jobs=-1,
            metric=mean_squared_error,
            seed=42,
        )
    automl.fit(X_train, y_train)
    # Print statistics
    print(automl.sprint_statistics())
    print(automl.leaderboard())
    predictions = automl.predict(X_test)
    if task == 'classification':
        pred_proba = automl.predict_proba(X_test)
    return automl, predictions
```
10. AutoML in the LLM Era
Using LLMs for AutoML
Large language models (LLMs) are opening new possibilities for AutoML:
- Hyperparameter suggestion: an LLM recommends initial hyperparameters suited to the dataset's characteristics
- Feature engineering: an LLM proposes new feature ideas using domain knowledge
- Code generation: automatic generation of preprocessing and training code
- Error debugging: analyzing training failures and suggesting fixes
```python
# LLM-based hyperparameter optimization (conceptual code)
from openai import OpenAI

def llm_hyperparameter_suggestion(dataset_description, model_type, previous_results=None):
    """Hyperparameter suggestions via an LLM."""
    client = OpenAI()
    prompt = f"""
Dataset characteristics:
{dataset_description}
Model type: {model_type}
Previous attempts:
{previous_results if previous_results else 'None'}
Based on the information above, suggest optimal hyperparameters for {model_type} in JSON format.
"""
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system",
             "content": "You are a machine learning expert. Help with hyperparameter optimization."},
            {"role": "user", "content": prompt}
        ],
        response_format={"type": "json_object"}
    )
    return response.choices[0].message.content

# Few-shot feature engineering with an LLM
def llm_feature_engineering(df_description, target_description):
    """Feature engineering ideas proposed by an LLM."""
    client = OpenAI()
    prompt = f"""
DataFrame columns:
{df_description}
Prediction target:
{target_description}
Suggest potentially useful derived features along with the Python code to create them.
"""
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system",
             "content": "As a machine learning feature engineering expert, suggest useful derived features."},
            {"role": "user", "content": prompt}
        ]
    )
    return response.choices[0].message.content

# An AutoML agent (experimental)
class AutoMLAgent:
    """An LLM-driven AutoML agent (conceptual implementation)."""
    def __init__(self, llm_client, X_train, y_train, X_val, y_val, max_iterations=10):
        self.client = llm_client
        self.X_train = X_train
        self.y_train = y_train
        self.X_val = X_val
        self.y_val = y_val
        self.max_iterations = max_iterations
        self.history = []
        self.best_score = 0
        self.best_params = None

    def get_next_config(self):
        """Ask the LLM for the next configuration to try."""
        history_str = "\n".join([
            f"Iteration {i+1}: params={h['params']}, score={h['score']:.4f}"
            for i, h in enumerate(self.history[-5:])  # last 5
        ])
        prompt = f"""
LightGBM parameters tried so far and their results:
{history_str if history_str else 'None (first attempt)'}
Suggest the next parameter combination as JSON.
Valid ranges: num_leaves (10-300), learning_rate (0.001-0.3),
n_estimators (100-2000), subsample (0.5-1.0), colsample_bytree (0.5-1.0)
"""
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are an HPO expert."},
                {"role": "user", "content": prompt}
            ],
            response_format={"type": "json_object"}
        )
        import json
        return json.loads(response.choices[0].message.content)

    def evaluate(self, params):
        """Evaluate a parameter configuration."""
        import lightgbm as lgb
        from sklearn.metrics import roc_auc_score
        model = lgb.LGBMClassifier(**params, random_state=42, verbose=-1)
        model.fit(self.X_train, self.y_train)
        preds = model.predict_proba(self.X_val)[:, 1]
        return roc_auc_score(self.y_val, preds)

    def run(self):
        """Run the agent loop."""
        for i in range(self.max_iterations):
            config = self.get_next_config()
            score = self.evaluate(config)
            self.history.append({'params': config, 'score': score})
            if score > self.best_score:
                self.best_score = score
                self.best_params = config
                print(f"Iteration {i+1}: new best score {score:.4f}")
        print(f"\nBest score: {self.best_score:.4f}")
        print(f"Best parameters: {self.best_params}")
        return self.best_params
```
Wrapping Up
This guide covered the full AutoML ecosystem:
- Hyperparameter optimization: a systematic path from grid search to Bayesian optimization
- Optuna: the most flexible Python-native HPO framework
- Ray Tune: large-scale HPO in distributed environments
- AutoGluon: Amazon's powerful multimodal AutoML
- FLAML: Microsoft's cost-efficient AutoML
- H2O AutoML: an enterprise AutoML platform
- NAS: automated search for optimal neural architectures
- LLM + AutoML: the next generation of intelligent AutoML
Key recommendations:
- Under tight time constraints, use FLAML or AutoGluon (good_quality preset)
- To optimize a specific model, use Optuna
- For distributed environments or large-scale experiments, use Ray Tune
- In enterprise settings, leverage H2O AutoML's interpretability tools
- LLM-based AutoML is still at the research stage, but a direction worth watching
AutoML is a tool, not magic. Domain knowledge, data quality, and a sound evaluation framework remain the most important ingredients.
References
- Optuna official documentation
- AutoGluon official documentation
- FLAML official documentation
- H2O AutoML documentation
- Ray Tune documentation
- Liu, H., Simonyan, K., & Yang, Y. (2019). DARTS: Differentiable Architecture Search. ICLR.
- Bergstra, J., & Bengio, Y. (2012). Random Search for Hyper-Parameter Optimization. JMLR.
- Feurer, M., et al. (2015). Efficient and Robust Automated Machine Learning (Auto-sklearn). NeurIPS.
- He, X., Zhao, K., & Chu, X. (2021). AutoML: A Survey of the State-of-the-Art. Knowledge-Based Systems.
AutoML Complete Guide: Automated ML Pipelines with AutoGluon, FLAML, and Optuna
1. AutoML Overview
What is AutoML?
AutoML (Automated Machine Learning) automates various stages of the machine learning pipeline. Tasks that data scientists previously performed manually — data preprocessing, feature engineering, model selection, hyperparameter optimization, and ensembling — are handled automatically by algorithms.
What AutoML Automates:
- Data Preprocessing Automation
  - Missing value imputation strategy selection
  - Scaling and normalization method selection
  - Outlier handling
- Feature Engineering Automation
  - Feature transformations (log, square, interactions)
  - Categorical encoding method selection
  - Feature selection and generation
- Model Selection (Algorithm Selection)
  - Searching over diverse algorithms
  - Meta-learning (leveraging experience from prior tasks)
- Hyperparameter Optimization (HPO)
  - Grid/random search
  - Bayesian optimization
  - Evolutionary algorithms
- Ensemble Automation
  - Searching for the optimal ensemble configuration
  - Automated stacking and blending
- Neural Architecture Search (NAS)
  - Automated design of optimal neural network architectures
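In miniature, several of these stages can be combined with nothing more than a scikit-learn Pipeline and a joint search over preprocessing and model choices. This is a toy illustration of the idea, not a real AutoML system; the dataset and search space are made up for demonstration:

```python
from sklearn.datasets import make_classification
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

X, y = make_classification(n_samples=300, n_features=10, random_state=42)

pipe = Pipeline([
    ('impute', SimpleImputer()),   # preprocessing choices are searched too
    ('scale', StandardScaler()),
    ('model', LogisticRegression(max_iter=1000)),
])

# A list of grids acts as crude "model selection": each dict swaps in a model family
param_grid = [
    {'impute__strategy': ['mean', 'median'],
     'scale': [StandardScaler(), MinMaxScaler()],
     'model': [LogisticRegression(max_iter=1000)],
     'model__C': [0.1, 1.0, 10.0]},
    {'impute__strategy': ['mean'],
     'model': [RandomForestClassifier(random_state=42)],
     'model__n_estimators': [50, 100]},
]

search = GridSearchCV(pipe, param_grid, cv=3, scoring='roc_auc')
search.fit(X, y)
print(search.best_params_, round(search.best_score_, 3))
```

Full AutoML systems differ mainly in scale and search strategy, not in kind: they automate exactly this loop over far larger spaces.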
AutoML Application Domains
Industry Applications:
- Finance: Credit risk models, automated fraud detection
- Healthcare: Rapid prototyping of diagnostic support systems
- Retail: Automated demand forecasting model refresh
- Manufacturing: Quality control model automation
Major Open-Source AutoML Tools:
| Tool | Developer | Strengths |
|---|---|---|
| AutoGluon | Amazon | Multimodal, tabular, image, text |
| FLAML | Microsoft | Cost-efficient, fast |
| Optuna | Preferred Networks | HPO, visualization |
| H2O AutoML | H2O.ai | Enterprise, interpretable |
| Auto-sklearn | AutoML Group | scikit-learn compatible |
| Ray Tune | Anyscale | Distributed HPO |
| NNI | Microsoft | NAS, HPO |
Pros and Cons of AutoML
Pros:
- Enables non-experts to build high-quality models
- Saves time by automating repetitive experiments
- Discovers hyperparameter combinations humans might miss
- Provides reproducible pipelines
Cons:
- Computational costs can be very high
- Limited ability to incorporate domain knowledge
- Black-box nature (internal workings difficult to understand)
- Custom solutions are more effective for specialized problems
- Risk of data leakage
2. Hyperparameter Optimization (HPO)
Grid Search
The simplest HPO method — exhaustively tries every combination in the search space.
from sklearn.model_selection import GridSearchCV
import pandas as pd
import xgboost as xgb
def grid_search_example(X_train, y_train):
"""Grid Search: exhaustive (only practical for small search spaces)"""
param_grid = {
'max_depth': [3, 5, 7],
'learning_rate': [0.01, 0.1, 0.3],
'n_estimators': [100, 300, 500],
'subsample': [0.7, 0.9],
}
# Total combinations: 3 * 3 * 3 * 2 = 54 * CV folds
model = xgb.XGBClassifier(random_state=42, n_jobs=-1)
grid_search = GridSearchCV(
model, param_grid,
cv=5, scoring='roc_auc', n_jobs=-1, verbose=1, refit=True
)
grid_search.fit(X_train, y_train)
print(f"Best params: {grid_search.best_params_}")
print(f"Best CV score: {grid_search.best_score_:.4f}")
results = pd.DataFrame(grid_search.cv_results_)
print(results.sort_values('mean_test_score', ascending=False)[
['params', 'mean_test_score', 'std_test_score']
].head(10))
return grid_search.best_estimator_
Random Search
Proposed by Bergstra & Bengio (2012) — samples randomly from parameter distributions, which is often far more efficient than grid search.
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform, randint, loguniform
def random_search_example(X_train, y_train, n_iter=100):
"""Random Search: sample from continuous distributions"""
param_distributions = {
'max_depth': randint(3, 10),
'learning_rate': loguniform(1e-3, 0.5), # log-uniform over [0.001, 0.5]
'n_estimators': randint(100, 1000),
'subsample': uniform(0.6, 0.4), # uniform over [0.6, 1.0]
'colsample_bytree': uniform(0.6, 0.4),
'reg_alpha': loguniform(1e-4, 10),
'reg_lambda': loguniform(1e-4, 10),
'min_child_weight': randint(1, 10),
'gamma': uniform(0, 0.5),
}
model = xgb.XGBClassifier(random_state=42, n_jobs=-1)
random_search = RandomizedSearchCV(
model, param_distributions,
n_iter=n_iter, cv=5, scoring='roc_auc',
n_jobs=-1, verbose=1, random_state=42, refit=True
)
random_search.fit(X_train, y_train)
print(f"Best params: {random_search.best_params_}")
print(f"Best CV score: {random_search.best_score_:.4f}")
return random_search.best_estimator_
Bayesian Optimization
Bayesian optimization uses the results of previous evaluations to intelligently select the next point to evaluate.
Core Components:
- Surrogate Model: Probabilistic approximation of the objective function (typically Gaussian Process)
- Acquisition Function: Determines the next evaluation point
- EI (Expected Improvement): Expected improvement over the current best
- UCB (Upper Confidence Bound): Balance between exploration and exploitation
- PI (Probability of Improvement): Probability of improving over the current best
TPE (Tree-structured Parzen Estimator):
- Default algorithm used by Optuna
- Instead of modeling p(y|x) directly, models p(x|y) through two density functions l(x) and g(x)
- l(x) models the distribution of parameters that led to good results (top gamma fraction)
- g(x) models the rest
- The next point maximizes the l(x)/g(x) ratio
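The l(x)/g(x) idea can be demonstrated without any library: fit a Parzen (kernel density) estimator to the best gamma fraction of past trials, another to the rest, and propose the candidate with the highest density ratio. This is a deliberately simplified 1-D sketch; real TPE adds priors, tree-structured search spaces, and adaptive bandwidths:

```python
import math
import random

def kde(samples, x, bandwidth=0.5):
    """Parzen estimator: average of Gaussian kernels centered on past samples."""
    return sum(
        math.exp(-0.5 * ((x - s) / bandwidth) ** 2) for s in samples
    ) / (len(samples) * bandwidth * math.sqrt(2 * math.pi))

def tpe_minimize(objective, low, high, n_trials=60, n_startup=15, gamma=0.25):
    rng = random.Random(0)
    trials = []  # list of (x, y) observations
    for _ in range(n_trials):
        if len(trials) < n_startup:
            x = rng.uniform(low, high)  # random warm-up phase
        else:
            trials.sort(key=lambda t: t[1])
            n_good = max(1, int(gamma * len(trials)))
            good = [t[0] for t in trials[:n_good]]  # l(x): best gamma fraction
            bad = [t[0] for t in trials[n_good:]]   # g(x): everything else
            # Sample candidates and keep the one maximizing l(x)/g(x)
            cands = [rng.uniform(low, high) for _ in range(64)]
            x = max(cands, key=lambda c: kde(good, c) / (kde(bad, c) + 1e-12))
        trials.append((x, objective(x)))
    return min(trials, key=lambda t: t[1])

best_x, best_y = tpe_minimize(lambda x: (x - 2.0) ** 2, 0.0, 5.0)
print(f"best x = {best_x:.3f}, f(x) = {best_y:.4f}")
```

After the random warm-up, proposals concentrate where good trials cluster, which is the whole point of the ratio criterion.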
3. Optuna
Core Concepts
Optuna, developed by Preferred Networks, is a Python-native HPO framework known for its simplicity and flexibility.
Key concepts:
- Study: The entire optimization experiment (a collection of Trials)
- Trial: A single hyperparameter configuration attempt
- Objective Function: The function to optimize (minimize or maximize)
- Sampler: The parameter suggestion algorithm (TPE, CMA-ES, Random, etc.)
- Pruner: Early termination of unpromising Trials
pip install optuna optuna-dashboard
import optuna
from optuna.samplers import TPESampler, CmaEsSampler, RandomSampler
from optuna.pruners import MedianPruner, HyperbandPruner
import lightgbm as lgb
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import roc_auc_score
import numpy as np
optuna.logging.set_verbosity(optuna.logging.WARNING)
def objective_lgbm(trial, X, y):
"""Optuna objective function for LightGBM optimization"""
params = {
'objective': 'binary',
'metric': 'auc',
'verbosity': -1,
'boosting_type': trial.suggest_categorical('boosting_type', ['gbdt', 'dart']),
'num_leaves': trial.suggest_int('num_leaves', 20, 300),
'max_depth': trial.suggest_int('max_depth', 3, 12),
'min_child_samples': trial.suggest_int('min_child_samples', 5, 100),
'learning_rate': trial.suggest_float('learning_rate', 1e-4, 0.3, log=True),
'n_estimators': trial.suggest_int('n_estimators', 100, 2000),
'subsample': trial.suggest_float('subsample', 0.5, 1.0),
'subsample_freq': trial.suggest_int('subsample_freq', 1, 7),
'colsample_bytree': trial.suggest_float('colsample_bytree', 0.5, 1.0),
'reg_alpha': trial.suggest_float('reg_alpha', 1e-8, 10.0, log=True),
'reg_lambda': trial.suggest_float('reg_lambda', 1e-8, 10.0, log=True),
'min_split_gain': trial.suggest_float('min_split_gain', 0, 1),
'n_jobs': -1,
}
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
cv_scores = []
for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
train_data = lgb.Dataset(X_train, y_train)
val_data = lgb.Dataset(X_val, y_val, reference=train_data)
model = lgb.train(
params, train_data,
num_boost_round=params['n_estimators'],
valid_sets=[val_data],
callbacks=[
lgb.early_stopping(stopping_rounds=50, verbose=False),
lgb.log_evaluation(-1),
],
)
preds = model.predict(X_val)
fold_score = roc_auc_score(y_val, preds)
cv_scores.append(fold_score)
# Report intermediate results for pruning
trial.report(fold_score, fold)
if trial.should_prune():
raise optuna.exceptions.TrialPruned()
return np.mean(cv_scores)
def run_optuna_study(X, y, n_trials=100, n_jobs=1):
"""Run an Optuna study with TPE sampler and median pruning"""
sampler = TPESampler(
n_startup_trials=20,
n_ei_candidates=24,
multivariate=True,
seed=42
)
pruner = MedianPruner(
n_startup_trials=5,
n_warmup_steps=10,
interval_steps=1
)
study = optuna.create_study(
direction='maximize',
sampler=sampler,
pruner=pruner,
study_name='lgbm_optimization',
# storage='sqlite:///optuna.db', # persist results
# load_if_exists=True, # resume existing study
)
study.optimize(
lambda trial: objective_lgbm(trial, X, y),
n_trials=n_trials,
n_jobs=n_jobs,
show_progress_bar=True,
)
print(f"\nBest params:")
for key, value in study.best_params.items():
print(f" {key}: {value}")
print(f"Best AUC: {study.best_value:.4f}")
print(f"Completed trials: {len(study.trials)}")
pruned = [t for t in study.trials if t.state == optuna.trial.TrialState.PRUNED]
print(f"Pruned trials: {len(pruned)}")
return study
def visualize_optuna_study(study):
"""Visualize Optuna optimization results"""
import optuna.visualization as vis
vis.plot_optimization_history(study).show()
vis.plot_param_importances(study).show()
vis.plot_parallel_coordinate(study).show()
vis.plot_slice(study).show()
vis.plot_contour(study, params=['learning_rate', 'num_leaves']).show()
Complete PyTorch + Optuna Example
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import optuna
def create_model(trial, input_dim):
"""Dynamically build a neural network from Optuna trial parameters"""
n_layers = trial.suggest_int('n_layers', 1, 4)
dropout = trial.suggest_float('dropout', 0.1, 0.5)
activation_name = trial.suggest_categorical('activation', ['relu', 'tanh', 'elu'])
activation_map = {'relu': nn.ReLU(), 'tanh': nn.Tanh(), 'elu': nn.ELU()}
layers = []
in_features = input_dim
for i in range(n_layers):
out_features = trial.suggest_int(f'n_units_l{i}', 32, 512)
layers.extend([
nn.Linear(in_features, out_features),
nn.BatchNorm1d(out_features),
activation_map[activation_name],
nn.Dropout(dropout),
])
in_features = out_features
layers.extend([nn.Linear(in_features, 1), nn.Sigmoid()])
return nn.Sequential(*layers)
def objective_pytorch(trial, X_train_t, y_train_t, X_val_t, y_val_t, input_dim):
"""Optuna objective function for PyTorch neural network"""
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = create_model(trial, input_dim).to(device)
lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)
optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'RMSprop', 'SGD'])
batch_size = trial.suggest_categorical('batch_size', [32, 64, 128, 256])
weight_decay = trial.suggest_float('weight_decay', 1e-8, 1e-2, log=True)
if optimizer_name == 'Adam':
optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
elif optimizer_name == 'RMSprop':
optimizer = optim.RMSprop(model.parameters(), lr=lr, weight_decay=weight_decay)
else:
optimizer = optim.SGD(model.parameters(), lr=lr, weight_decay=weight_decay,
momentum=0.9)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)
criterion = nn.BCELoss()
train_dataset = TensorDataset(
X_train_t.to(device),
y_train_t.to(device).float().unsqueeze(1)
)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
best_val_loss = float('inf')
patience_counter = 0
patience = 10
for epoch in range(100):
model.train()
for X_batch, y_batch in train_loader:
optimizer.zero_grad()
loss = criterion(model(X_batch), y_batch)
loss.backward()
optimizer.step()
scheduler.step()
model.eval()
with torch.no_grad():
val_loss = criterion(
model(X_val_t.to(device)),
y_val_t.to(device).float().unsqueeze(1)
).item()
trial.report(val_loss, epoch)
if trial.should_prune():
raise optuna.exceptions.TrialPruned()
if val_loss < best_val_loss:
best_val_loss = val_loss
patience_counter = 0
else:
patience_counter += 1
if patience_counter >= patience:
break
return best_val_loss
def run_pytorch_optuna(X_train, y_train, X_val, y_val, n_trials=50):
X_train_t = torch.FloatTensor(X_train.values if hasattr(X_train, 'values') else X_train)
y_train_t = torch.FloatTensor(y_train.values if hasattr(y_train, 'values') else y_train)
X_val_t = torch.FloatTensor(X_val.values if hasattr(X_val, 'values') else X_val)
y_val_t = torch.FloatTensor(y_val.values if hasattr(y_val, 'values') else y_val)
input_dim = X_train_t.shape[1]
study = optuna.create_study(
direction='minimize',
pruner=optuna.pruners.HyperbandPruner(
min_resource=5, max_resource=100, reduction_factor=3
),
sampler=optuna.samplers.TPESampler(seed=42)
)
study.optimize(
lambda trial: objective_pytorch(
trial, X_train_t, y_train_t, X_val_t, y_val_t, input_dim
),
n_trials=n_trials,
show_progress_bar=True,
)
print(f"Best val loss: {study.best_value:.4f}")
print(f"Best params: {study.best_params}")
return study
CMA-ES Sampler
# CMA-ES is more efficient for continuous hyperparameter spaces
study_cmaes = optuna.create_study(
direction='maximize',
sampler=CmaEsSampler(
n_startup_trials=10,
restart_strategy='ipop', # restart strategy for escaping local optima
seed=42
)
)
4. Ray Tune
Distributed HPO with Ray Tune
Ray Tune, developed by Anyscale, handles parallel training across multiple GPUs and nodes automatically.
pip install ray[tune] ray[air]
import ray
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler, PopulationBasedTraining
from ray.tune.search.optuna import OptunaSearch
import torch
import torch.nn as nn
import torch.nn.functional as F
ray.init(ignore_reinit_error=True)
def train_with_tune(config, data=None):
"""Training function called by Ray Tune"""
X_train, y_train, X_val, y_val = data
model = nn.Sequential(
nn.Linear(X_train.shape[1], config['hidden_size']),
nn.ReLU(),
nn.Dropout(config['dropout']),
nn.Linear(config['hidden_size'], config['hidden_size'] // 2),
nn.ReLU(),
nn.Linear(config['hidden_size'] // 2, 1),
nn.Sigmoid()
)
optimizer = torch.optim.Adam(
model.parameters(), lr=config['lr'], weight_decay=config['weight_decay']
)
criterion = nn.BCELoss()
X_train_t = torch.FloatTensor(X_train)
y_train_t = torch.FloatTensor(y_train).unsqueeze(1)
X_val_t = torch.FloatTensor(X_val)
y_val_t = torch.FloatTensor(y_val).unsqueeze(1)
for epoch in range(config['max_epochs']):
model.train()
optimizer.zero_grad()
loss = criterion(model(X_train_t), y_train_t)
loss.backward()
optimizer.step()
if epoch % 5 == 0:
model.eval()
with torch.no_grad():
val_loss = criterion(model(X_val_t), y_val_t).item()
tune.report(val_loss=val_loss, training_iteration=epoch)  # legacy API; Ray >= 2.x uses ray.train.report({...})
def run_ray_tune(X_train, y_train, X_val, y_val, num_samples=50):
"""Run distributed HPO with Ray Tune"""
config = {
'hidden_size': tune.choice([64, 128, 256, 512]),
'dropout': tune.uniform(0.1, 0.5),
'lr': tune.loguniform(1e-5, 1e-1),
'weight_decay': tune.loguniform(1e-8, 1e-3),
'max_epochs': tune.choice([50, 100, 200]),
}
# ASHA: Asynchronous Successive Halving Algorithm
scheduler = ASHAScheduler(
metric='val_loss',
mode='min',
max_t=200, # Max epochs
grace_period=10, # Min epochs before pruning
reduction_factor=3,
)
search_alg = OptunaSearch(metric='val_loss', mode='min')
result = tune.run(
tune.with_parameters(
train_with_tune,
data=(X_train, y_train, X_val, y_val)
),
config=config,
num_samples=num_samples,
scheduler=scheduler,
search_alg=search_alg,
progress_reporter=CLIReporter(
metric_columns=['val_loss', 'training_iteration'],
max_progress_rows=10
),
verbose=1,
resources_per_trial={'cpu': 2, 'gpu': 0},
)
best_trial = result.get_best_trial('val_loss', 'min', 'last')
print(f"Best val loss: {best_trial.last_result['val_loss']:.4f}")
print(f"Best config: {best_trial.config}")
return result
def run_pbt(X_train, y_train, X_val, y_val):
"""Population Based Training: dynamically mutate hyperparameters during training"""
pbt_scheduler = PopulationBasedTraining(
time_attr='training_iteration',
metric='val_loss',
mode='min',
perturbation_interval=20,
hyperparam_mutations={
'lr': tune.loguniform(1e-5, 1e-1),
'dropout': tune.uniform(0.1, 0.5),
},
quantile_fraction=0.25, # Replace bottom 25% with top 25%
)
result = tune.run(
tune.with_parameters(
train_with_tune,
data=(X_train, y_train, X_val, y_val)
),
config={
'hidden_size': 256,
'dropout': tune.uniform(0.1, 0.5),
'lr': tune.loguniform(1e-4, 1e-1),
'weight_decay': 1e-5,
'max_epochs': 200,
},
num_samples=8,
scheduler=pbt_scheduler,
verbose=1,
)
return result
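ASHA, used above, is the asynchronous variant of successive halving. The synchronous idea fits in a few lines of plain Python: evaluate many configurations at a small budget, keep the top 1/eta fraction, multiply the budget by eta, and repeat. This toy sketch uses a fake learning curve and needs no Ray:

```python
import random

def fake_train(config, budget):
    """Stand-in for a learning curve: loss decays toward a config-specific floor."""
    return config['floor'] + 1.0 / (budget + 1)

def successive_halving(configs, min_budget=1, eta=3, rounds=3):
    budget = min_budget
    survivors = list(configs)
    for _ in range(rounds):
        scored = sorted(survivors, key=lambda c: fake_train(c, budget))
        keep = max(1, len(scored) // eta)  # keep the top 1/eta fraction
        survivors = scored[:keep]
        budget *= eta                      # surviving configs earn more budget
    return survivors[0]

rng = random.Random(0)
configs = [{'id': i, 'floor': rng.uniform(0.0, 1.0)} for i in range(27)]
best = successive_halving(configs)
print(best)
```

ASHA removes the synchronization barrier between rounds: a trial is promoted as soon as enough peers at its rung have finished, which keeps all workers busy.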
5. AutoGluon
AutoGluon Overview
AutoGluon, developed by Amazon, achieves Kaggle-level performance with minimal code — sometimes just 3 lines.
pip install autogluon
Tabular Data (TabularPredictor)
from autogluon.tabular import TabularPredictor
import pandas as pd
def autogluon_tabular_example(train_df, test_df, target_col, eval_metric='roc_auc'):
"""AutoGluon tabular training"""
predictor = TabularPredictor(
label=target_col,
eval_metric=eval_metric,
path='autogluon_models/',
problem_type='binary', # 'binary', 'multiclass', 'regression', 'softclass'
)
predictor.fit(
train_data=train_df,
time_limit=3600,
presets='best_quality', # 'best_quality', 'good_quality', 'medium_quality',
# 'optimize_for_deployment'
excluded_model_types=['KNN'],
verbosity=2,
)
leaderboard = predictor.leaderboard(test_df, silent=True)
print(leaderboard[['model', 'score_test', 'score_val', 'pred_time_test']].head(10))
predictions = predictor.predict(test_df)
pred_proba = predictor.predict_proba(test_df)
feature_importance = predictor.feature_importance(test_df)
print(feature_importance.head(20))
return predictor, predictions, pred_proba
def autogluon_advanced(train_df, test_df, target_col):
"""AutoGluon with custom hyperparameters"""
hyperparameters = {
'GBM': [
{'num_boost_round': 300, 'ag_args': {'name_suffix': 'fast'}},
{'num_boost_round': 1000, 'learning_rate': 0.03,
'ag_args': {'name_suffix': 'slow', 'priority': 0}},
],
'XGB': [{'n_estimators': 300, 'max_depth': 6}],
'CAT': [{'iterations': 500, 'depth': 6}],
'NN_TORCH': [{'num_epochs': 50, 'learning_rate': 1e-3, 'dropout_prob': 0.1}],
'RF': [{'n_estimators': 300}],
}
predictor = TabularPredictor(
label=target_col, eval_metric='roc_auc', path='autogluon_advanced/'
)
predictor.fit(
train_data=train_df,
hyperparameters=hyperparameters,
time_limit=7200,
num_stack_levels=1, # Number of stacking levels
num_bag_folds=5, # Number of CV folds for bagging
num_bag_sets=1, # Number of bagging sets
verbosity=3,
)
return predictor
Multimodal Learning
from autogluon.multimodal import MultiModalPredictor
def autogluon_image_classification(train_df, test_df, label_col):
"""AutoGluon image classification"""
predictor = MultiModalPredictor(label=label_col)
predictor.fit(
train_data=train_df,
time_limit=3600,
hyperparameters={
'model.timm_image.checkpoint_name': 'efficientnet_b4',
'optimization.learning_rate': 1e-4,
'optimization.max_epochs': 20,
}
)
return predictor
def autogluon_multimodal(train_df, test_df, target_col):
"""AutoGluon multimodal: text + tabular features together"""
predictor = MultiModalPredictor(label=target_col, problem_type='binary')
predictor.fit(
train_data=train_df,
time_limit=3600,
hyperparameters={
'model.hf_text.checkpoint_name': 'bert-base-uncased',
}
)
return predictor
6. FLAML
Microsoft FLAML
FLAML (Fast and Lightweight AutoML), developed by Microsoft Research, specializes in cost-efficient automation.
pip install flaml
from flaml import AutoML
import pandas as pd
import numpy as np
def flaml_basic_example(X_train, y_train, X_test, task='classification'):
"""FLAML basic usage"""
automl = AutoML()
automl_settings = {
'time_budget': 300,
'metric': 'roc_auc',
'task': task, # 'classification', 'regression', 'ranking'
'estimator_list': [
'lgbm', 'xgboost', 'catboost',
'rf', 'extra_tree', 'lrl1', 'lrl2', 'kneighbor'
],
'log_file_name': 'flaml_log.log',
'seed': 42,
'n_jobs': -1,
'verbose': 1,
'retrain_full': True, # Retrain final model on all data
'max_iter': 100,
'ensemble': True,
'eval_method': 'cv',
'n_splits': 5,
}
automl.fit(X_train, y_train, **automl_settings)
print(f"Best estimator: {automl.best_estimator}")
print(f"Best loss: {automl.best_loss:.4f}")
print(f"Best config: {automl.best_config}")
print(f"Time to find best model: {automl.time_to_find_best_model:.1f}s")
predictions = automl.predict(X_test)
pred_proba = automl.predict_proba(X_test)
return automl, predictions, pred_proba
def flaml_sklearn_pipeline(X_train, y_train, X_test):
"""Integrate FLAML into a scikit-learn Pipeline"""
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
automl = AutoML()
pipeline = Pipeline([
('scaler', StandardScaler()),
('automl', automl),
])
pipeline.fit(
X_train, y_train,
automl__time_budget=120,
automl__metric='roc_auc',
automl__task='classification',
)
return pipeline
def flaml_custom_objective(X_train, y_train):
"""FLAML with a custom evaluation metric"""
def custom_metric(
X_val, y_val, estimator, labels, X_train, y_train,
weight_val=None, weight_train=None, *args
):
"""Optimize F-beta score"""
from sklearn.metrics import fbeta_score
y_pred = estimator.predict(X_val)
score = fbeta_score(y_val, y_pred, beta=2, average='weighted')
return -score, {'f2_score': score} # (loss, metrics_dict)
automl = AutoML()
automl.fit(
X_train, y_train,
metric=custom_metric,
task='classification',
time_budget=120,
)
return automl
7. H2O AutoML
H2O Cluster
H2O AutoML is an enterprise-grade AutoML platform with extensive interpretability tools.
pip install h2o
import h2o
from h2o.automl import H2OAutoML
import pandas as pd
def h2o_automl_example(train_df, test_df, target_col, max_models=20):
"""H2O AutoML end-to-end example"""
h2o.init(nthreads=-1, max_mem_size='8G', port=54321)
train_h2o = h2o.H2OFrame(train_df)
test_h2o = h2o.H2OFrame(test_df)
# Mark target as factor for classification
train_h2o[target_col] = train_h2o[target_col].asfactor()
feature_cols = [col for col in train_df.columns if col != target_col]
aml = H2OAutoML(
max_models=max_models,
max_runtime_secs=3600,
seed=42,
sort_metric='AUC',
balance_classes=False,
include_algos=[
'GBM', 'GLM', 'DRF', 'DeepLearning',
'StackedEnsemble', 'XGBoost'
],
keep_cross_validation_predictions=True,
keep_cross_validation_models=True,
nfolds=5,
verbosity='info',
)
aml.train(
x=feature_cols, y=target_col,
training_frame=train_h2o,
leaderboard_frame=test_h2o,
)
lb = aml.leaderboard
print("H2O AutoML Leaderboard:")
print(lb.head(20))
best_model = aml.leader
print(f"\nBest model: {best_model.model_id}")
predictions = best_model.predict(test_h2o).as_data_frame()
# Save model
model_path = h2o.save_model(model=best_model, path='h2o_models/', force=True)
print(f"Model saved to: {model_path}")
return aml, best_model, predictions
def cleanup_h2o():
h2o.cluster().shutdown()
8. Neural Architecture Search (NAS)
NAS Overview
Neural Architecture Search (NAS) automatically finds optimal neural network architectures.
Three components of NAS:
- Search Space: The set of possible architectures
- Search Strategy: How to explore the space (random, evolutionary, RL, gradient-based)
- Performance Estimation: How to evaluate candidate architectures
DARTS (Differentiable Architecture Search)
DARTS (Liu et al., 2019) makes architecture search differentiable via continuous relaxation of discrete choices.
import torch
import torch.nn as nn
import torch.nn.functional as F
class MixedOperation(nn.Module):
"""DARTS mixed operation: weighted sum of candidate ops"""
def __init__(self, operations):
super().__init__()
self.ops = nn.ModuleList(operations)
self.alphas = nn.Parameter(torch.randn(len(operations)))
def forward(self, x):
weights = F.softmax(self.alphas, dim=0)
return sum(w * op(x) for w, op in zip(weights, self.ops))
class DARTSCell(nn.Module):
"""A single DARTS cell"""
def __init__(self, in_channels, out_channels):
super().__init__()
operations = [
nn.Conv2d(in_channels, out_channels, 3, padding=1),
nn.Conv2d(in_channels, out_channels, 5, padding=2),
nn.MaxPool2d(3, stride=1, padding=1),
nn.AvgPool2d(3, stride=1, padding=1),
nn.Identity() if in_channels == out_channels
else nn.Conv2d(in_channels, out_channels, 1),
]
self.mixed_op = MixedOperation(operations)
self.bn = nn.BatchNorm2d(out_channels)
def forward(self, x):
return F.relu(self.bn(self.mixed_op(x)))
class SimpleDARTS(nn.Module):
"""Simplified DARTS network"""
def __init__(self, num_classes=10, num_cells=6):
super().__init__()
self.stem = nn.Conv2d(3, 64, 3, padding=1)
self.cells = nn.ModuleList([DARTSCell(64, 64) for _ in range(num_cells)])
self.classifier = nn.Linear(64, num_classes)
def forward(self, x):
x = self.stem(x)
for cell in self.cells:
x = cell(x)
x = x.mean([2, 3]) # Global average pooling
return self.classifier(x)
def arch_parameters(self):
return [p for n, p in self.named_parameters() if 'alphas' in n]
def model_parameters(self):
return [p for n, p in self.named_parameters() if 'alphas' not in n]
def train_darts(model, train_loader, val_loader, epochs=50):
"""Bilevel optimization for DARTS"""
w_optimizer = torch.optim.SGD(
model.model_parameters(), lr=0.025, momentum=0.9, weight_decay=3e-4
)
a_optimizer = torch.optim.Adam(
model.arch_parameters(), lr=3e-4, betas=(0.5, 0.999), weight_decay=1e-3
)
w_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(w_optimizer, T_max=epochs)
for epoch in range(epochs):
model.train()
train_iter = iter(train_loader)
val_iter = iter(val_loader)
for step in range(min(len(train_loader), len(val_loader))):
# Step 1: Update architecture parameters using validation data
try:
X_val, y_val = next(val_iter)
except StopIteration:
val_iter = iter(val_loader)
X_val, y_val = next(val_iter)
a_optimizer.zero_grad()
val_loss = F.cross_entropy(model(X_val), y_val)
val_loss.backward()
a_optimizer.step()
# Step 2: Update weight parameters using training data
X_train, y_train = next(train_iter)
w_optimizer.zero_grad()
train_loss = F.cross_entropy(model(X_train), y_train)
train_loss.backward()
nn.utils.clip_grad_norm_(model.model_parameters(), 5.0)
w_optimizer.step()
w_scheduler.step()
if epoch % 10 == 0:
print(f"Epoch {epoch}: Train Loss = {train_loss.item():.4f}")
# Extract discovered architecture
for i, cell in enumerate(model.cells):
weights = F.softmax(cell.mixed_op.alphas, dim=0).detach()
best_op = weights.argmax().item()
print(f"Cell {i}: Best op index = {best_op}, weights = {weights.numpy()}")
return model
One-Shot NAS
class SuperNetwork(nn.Module):
"""One-Shot NAS: sample sub-networks from a single super-network"""
def __init__(self, num_classes=10, max_channels=256):
super().__init__()
self.max_channels = max_channels
self.channel_options = [64, 128, 256]
self.conv1 = nn.Conv2d(3, max_channels, 3, padding=1)
self.conv2 = nn.Conv2d(max_channels, max_channels, 3, padding=1)
self.conv3 = nn.Conv2d(max_channels, max_channels, 3, padding=1)
self.bn1 = nn.BatchNorm2d(max_channels)
self.bn2 = nn.BatchNorm2d(max_channels)
self.bn3 = nn.BatchNorm2d(max_channels)
self.classifier = nn.Linear(max_channels, num_classes)
def forward(self, x, arch_config=None):
if arch_config is None:
arch_config = {
'conv1_out': torch.randint(0, len(self.channel_options), (1,)).item(),
'conv2_out': torch.randint(0, len(self.channel_options), (1,)).item(),
}
c1 = self.channel_options[arch_config['conv1_out']]
c2 = self.channel_options[arch_config['conv2_out']]
# Apply BatchNorm over the full channel width, then slice to the sampled width
x = F.relu(self.bn1(self.conv1(x)))[:, :c1]
x = F.relu(self.bn2(self.conv2(
F.pad(x, (0, 0, 0, 0, 0, self.max_channels - c1))
)))[:, :c2]
x = F.relu(self.bn3(self.conv3(
F.pad(x, (0, 0, 0, 0, 0, self.max_channels - c2))
)))
x = x.mean([2, 3])
return self.classifier(x)
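Once a super-network is trained, a separate search (random or evolutionary) picks the best sub-network using the shared weights. Below is a library-free sketch of that evolutionary step; the scoring function is synthetic, standing in for validation accuracy of a sub-network evaluated with supernet weights:

```python
import random

CHANNEL_OPTIONS = [64, 128, 256]

def evaluate(arch):
    """Stand-in for evaluating a sub-network with shared supernet weights.
    Synthetic fitness: pretend mid-sized layers generalize best."""
    return -sum((c - 128) ** 2 for c in arch)

def evolve(pop_size=16, generations=10, seed=0):
    rng = random.Random(seed)
    # Each architecture picks a channel width for two searchable layers
    pop = [[rng.choice(CHANNEL_OPTIONS) for _ in range(2)] for _ in range(pop_size)]
    for _ in range(generations):
        pop.sort(key=evaluate, reverse=True)
        parents = pop[: pop_size // 2]        # selection: keep the top half
        children = []
        for p in parents:
            child = list(p)
            i = rng.randrange(len(child))     # mutation: resample one layer
            child[i] = rng.choice(CHANNEL_OPTIONS)
            children.append(child)
        pop = parents + children
    return max(pop, key=evaluate)

best = evolve()
print(best)
```

Because evaluation reuses shared weights, thousands of candidate sub-networks can be scored without retraining, which is what makes one-shot NAS cheap.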
9. Pipeline Automation
Auto-sklearn
pip install auto-sklearn
import autosklearn.classification
import autosklearn.regression
from autosklearn.metrics import roc_auc, mean_squared_error
def auto_sklearn_example(X_train, y_train, X_test, task='classification'):
"""Auto-sklearn: scikit-learn-compatible AutoML"""
if task == 'classification':
automl = autosklearn.classification.AutoSklearnClassifier(
time_left_for_this_task=3600,
per_run_time_limit=360,
n_jobs=-1,
memory_limit=8192,
ensemble_size=50,
ensemble_nbest=50,
max_models_on_disc=50,
include={
'classifier': [
'random_forest', 'gradient_boosting',
'extra_trees', 'liblinear_svc'
]
},
metric=roc_auc,
resampling_strategy='cv',
resampling_strategy_arguments={'folds': 5},
seed=42,
)
else:
automl = autosklearn.regression.AutoSklearnRegressor(
time_left_for_this_task=3600,
per_run_time_limit=360,
n_jobs=-1,
metric=mean_squared_error,
seed=42,
)
automl.fit(X_train, y_train)
print(automl.sprint_statistics())
print(automl.leaderboard())
predictions = automl.predict(X_test)
return automl, predictions
10. AutoML in the LLM Era
Leveraging LLMs for AutoML
Large Language Models (LLMs) are opening new possibilities for AutoML:
- Hyperparameter suggestion: LLMs recommend starting configurations based on dataset characteristics
- Feature engineering: LLMs use domain knowledge to suggest new feature ideas
- Code generation: Automatically generate preprocessing and training code
- Error debugging: Diagnose training failures and suggest solutions
# LLM-guided hyperparameter optimization (conceptual code)
from openai import OpenAI
def llm_hyperparameter_suggestion(dataset_description, model_type, previous_results=None):
"""Use LLM to suggest hyperparameters"""
client = OpenAI()
prompt = f"""
Dataset characteristics:
{dataset_description}
Model type: {model_type}
Previous results:
{previous_results if previous_results else 'None (first attempt)'}
Based on this information, suggest optimal hyperparameters for {model_type} in JSON format.
"""
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system",
"content": "You are a machine learning expert. Help optimize hyperparameters."},
{"role": "user", "content": prompt}
],
response_format={"type": "json_object"}
)
return response.choices[0].message.content
# AutoML Agent (experimental)
class AutoMLAgent:
"""LLM-guided AutoML agent"""
def __init__(self, llm_client, X_train, y_train, X_val, y_val, max_iterations=10):
self.client = llm_client
self.X_train = X_train
self.y_train = y_train
self.X_val = X_val
self.y_val = y_val
self.max_iterations = max_iterations
self.history = []
self.best_score = 0
self.best_params = None
def get_next_config(self):
"""Ask the LLM for the next configuration to try"""
history_str = "\n".join([
f"Iteration {i+1}: params={h['params']}, score={h['score']:.4f}"
for i, h in enumerate(self.history[-5:])
])
prompt = f"""
LightGBM parameter attempts so far:
{history_str if history_str else 'None (first attempt)'}
Suggest the next parameter combination to try in JSON format.
Valid ranges: num_leaves(10-300), learning_rate(0.001-0.3),
n_estimators(100-2000), subsample(0.5-1.0), colsample_bytree(0.5-1.0)
"""
response = self.client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "You are an HPO expert."},
{"role": "user", "content": prompt}
],
response_format={"type": "json_object"}
)
import json
return json.loads(response.choices[0].message.content)
def evaluate(self, params):
"""Evaluate a parameter configuration"""
import lightgbm as lgb
from sklearn.metrics import roc_auc_score
model = lgb.LGBMClassifier(**params, random_state=42, verbose=-1)
model.fit(self.X_train, self.y_train)
preds = model.predict_proba(self.X_val)[:, 1]
return roc_auc_score(self.y_val, preds)
def run(self):
"""Run the AutoML agent loop"""
for i in range(self.max_iterations):
config = self.get_next_config()
score = self.evaluate(config)
self.history.append({'params': config, 'score': score})
if score > self.best_score:
self.best_score = score
self.best_params = config
print(f"Iteration {i+1}: New best score {score:.4f}")
print(f"\nBest score: {self.best_score:.4f}")
print(f"Best params: {self.best_params}")
return self.best_params
Conclusion
This guide covered the complete AutoML ecosystem:
- Hyperparameter Optimization: From grid search to Bayesian optimization, building systematic intuition
- Optuna: The most flexible Python-native HPO framework, with pruning and visualization
- Ray Tune: Large-scale distributed HPO across multiple GPUs and nodes
- AutoGluon: Amazon's powerful multimodal AutoML for tabular, image, and text data
- FLAML: Microsoft's cost-efficient AutoML with minimal overhead
- H2O AutoML: Enterprise-grade AutoML with interpretability tooling
- NAS: Automated design of optimal neural architectures with DARTS and one-shot methods
- LLM + AutoML: The next frontier of intelligent, language-guided automation
Key Recommendations:
- Under time constraints: use FLAML or AutoGluon with the good_quality preset
- Tuning a specific model: use Optuna
- Large-scale or distributed experiments: use Ray Tune
- Enterprise environments: leverage H2O AutoML for its interpretability tools
- LLM-based AutoML is still research-stage but is worth watching closely
AutoML is a tool, not magic. Domain knowledge, data quality, and a correct evaluation framework remain the most critical ingredients for success.
References
- Optuna Documentation
- AutoGluon Documentation
- FLAML Documentation
- H2O AutoML
- Ray Tune Documentation
- DARTS: Differentiable Architecture Search
- Bergstra, J., & Bengio, Y. (2012). Random search for hyper-parameter optimization.
- Feurer, M., et al. (2015). Efficient and Robust Automated Machine Learning (Auto-sklearn).
- He, X., et al. (2021). AutoML: A Survey of the State-of-the-Art.