1. AutoML 개요
AutoML이란?
AutoML(Automated Machine Learning)은 머신러닝 파이프라인의 다양한 단계를 자동화하는 기술입니다. 데이터 과학자가 수동으로 수행하던 작업들 — 데이터 전처리, 특성 엔지니어링, 모델 선택, 하이퍼파라미터 최적화, 앙상블 — 을 알고리즘이 자동으로 수행합니다.
**AutoML이 자동화하는 영역:**
1. **데이터 전처리 자동화**
- 결측치 대체 전략 선택
- 스케일링/정규화 방법 선택
- 이상치 처리
2. **피처 엔지니어링 자동화**
- 특성 변환 (log, 제곱, 상호작용)
- 범주형 인코딩 방법 선택
- 특성 선택 및 생성
3. **모델 선택 (Algorithm Selection)**
- 다양한 알고리즘 탐색
- 메타 학습 (이전 경험 활용)
4. **하이퍼파라미터 최적화 (HPO)**
- 그리드/랜덤 서치
- 베이지안 최적화
- 진화 알고리즘
5. **앙상블 자동화**
- 최적 앙상블 구성 탐색
- 스태킹, 블렌딩 자동화
6. **신경망 구조 탐색 (NAS)**
- 최적 신경망 아키텍처 자동 설계
AutoML의 응용 분야
**산업 응용:**
- **금융**: 신용 위험 모델, 사기 탐지 자동화
- **의료**: 진단 보조 시스템의 빠른 프로토타입
- **유통**: 수요 예측 모델 자동 갱신
- **제조**: 품질 관리 모델 자동화
**주요 오픈소스 AutoML 도구:**
| 도구 | 개발사 | 강점 |
| ------------ | ------------------ | --------------------------------- |
| AutoGluon | Amazon | 멀티모달, 표 형식, 이미지, 텍스트 |
| FLAML | Microsoft | 비용 효율적, 빠름 |
| Optuna | Preferred Networks | HPO, 시각화 |
| H2O AutoML | H2O.ai | 기업용, 해석 가능 |
| Auto-sklearn | AutoML Group | scikit-learn 기반 |
| Ray Tune | Anyscale | 분산 HPO |
| NNI | Microsoft | NAS, HPO |
AutoML의 장단점
**장점:**
- 비전문가도 고품질 모델 구축 가능
- 반복적인 실험을 자동화하여 시간 절약
- 인간이 놓치는 하이퍼파라미터 조합 발견 가능
- 재현 가능한 파이프라인 제공
**단점:**
- 계산 비용이 매우 높을 수 있음
- 도메인 지식 활용에 한계
- 블랙박스 특성 (내부 동작 이해 어려움)
- 특수한 문제에는 맞춤형 솔루션이 더 효과적
- 데이터 리키지 위험성
2. 하이퍼파라미터 최적화 (HPO)
그리드 서치 (Grid Search)
가장 단순한 HPO 방법으로, 모든 하이퍼파라미터 조합을 완전 탐색합니다.
from sklearn.model_selection import GridSearchCV
def grid_search_example(X_train, y_train):
"""그리드 서치: 완전 탐색 (작은 파라미터 공간에만 적합)"""
param_grid = {
'max_depth': [3, 5, 7],
'learning_rate': [0.01, 0.1, 0.3],
'n_estimators': [100, 300, 500],
'subsample': [0.7, 0.9],
}
총 조합 수: 3 * 3 * 3 * 2 = 54가지 x CV 폴드 수
model = xgb.XGBClassifier(random_state=42, n_jobs=-1)
grid_search = GridSearchCV(
model, param_grid,
cv=5,
scoring='roc_auc',
n_jobs=-1,
verbose=1,
refit=True # 최적 파라미터로 전체 데이터 재학습
)
grid_search.fit(X_train, y_train)
print(f"최적 파라미터: {grid_search.best_params_}")
print(f"최적 CV 점수: {grid_search.best_score_:.4f}")
결과 DataFrame으로 분석
results = pd.DataFrame(grid_search.cv_results_)
results_sorted = results.sort_values('mean_test_score', ascending=False)
print(results_sorted[['params', 'mean_test_score', 'std_test_score']].head(10))
return grid_search.best_estimator_
랜덤 서치 (Random Search)
Bergstra & Bengio(2012)가 제안한 방법으로, 파라미터 공간에서 무작위로 샘플링합니다.
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform, randint, loguniform
def random_search_example(X_train, y_train, n_iter=100):
"""랜덤 서치: 연속 분포에서 샘플링하여 효율적 탐색"""
param_distributions = {
'max_depth': randint(3, 10),
'learning_rate': loguniform(1e-3, 0.5), # 로그 균등 분포
'n_estimators': randint(100, 1000),
'subsample': uniform(0.6, 0.4), # [0.6, 1.0]
'colsample_bytree': uniform(0.6, 0.4),
'reg_alpha': loguniform(1e-4, 10),
'reg_lambda': loguniform(1e-4, 10),
'min_child_weight': randint(1, 10),
'gamma': uniform(0, 0.5),
}
model = xgb.XGBClassifier(random_state=42, n_jobs=-1)
random_search = RandomizedSearchCV(
model, param_distributions,
n_iter=n_iter, # 시도할 조합 수
cv=5,
scoring='roc_auc',
n_jobs=-1,
verbose=1,
random_state=42,
refit=True
)
random_search.fit(X_train, y_train)
print(f"최적 파라미터: {random_search.best_params_}")
print(f"최적 CV 점수: {random_search.best_score_:.4f}")
return random_search.best_estimator_
베이지안 최적화 (Bayesian Optimization)
베이지안 최적화는 이전 평가 결과를 활용하여 다음 탐색 지점을 지능적으로 선택합니다.
**핵심 구성요소:**
1. **대리 모델 (Surrogate Model)**: 목적 함수의 확률적 근사 (주로 가우시안 프로세스)
2. **획득 함수 (Acquisition Function)**: 다음 탐색 지점 결정
- EI (Expected Improvement): 현재 최솟값 대비 기댓값 개선
- UCB (Upper Confidence Bound): 탐색/활용 균형
- PI (Probability of Improvement): 개선 확률
**TPE (Tree-structured Parzen Estimator):**
- Optuna에서 기본으로 사용하는 알고리즘
- p(x|y) 대신 두 개의 밀도 모델 l(x), g(x)를 학습
- 좋은 결과(상위 gamma%)를 낸 파라미터의 분포 l(x)와 나머지 g(x) 모델링
- EI를 극대화하는 파라미터는 l(x)/g(x) 비율이 큰 지점
3. Optuna
Optuna 기본 개념
Optuna는 Preferred Networks에서 개발한 하이퍼파라미터 최적화 프레임워크로, Python-native이고 사용하기 매우 쉽습니다.
**핵심 개념:**
- **Study**: 최적화 실험 전체 (여러 Trial의 집합)
- **Trial**: 단일 하이퍼파라미터 조합 시도
- **Objective Function**: 최적화할 목적 함수 (minimize 또는 maximize)
- **Sampler**: 파라미터 제안 알고리즘 (TPE, CMA-ES, 랜덤 등)
- **Pruner**: 유망하지 않은 Trial 조기 종료
pip install optuna optuna-dashboard
from optuna.samplers import TPESampler, CmaEsSampler, RandomSampler
from optuna.pruners import MedianPruner, HyperbandPruner
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import roc_auc_score
로그 레벨 설정
optuna.logging.set_verbosity(optuna.logging.WARNING)
def objective_lgbm(trial, X, y):
"""LightGBM 최적화를 위한 Optuna 목적 함수"""
하이퍼파라미터 탐색 공간 정의
params = {
'objective': 'binary',
'metric': 'auc',
'verbosity': -1,
'boosting_type': trial.suggest_categorical('boosting_type', ['gbdt', 'dart']),
'num_leaves': trial.suggest_int('num_leaves', 20, 300),
'max_depth': trial.suggest_int('max_depth', 3, 12),
'min_child_samples': trial.suggest_int('min_child_samples', 5, 100),
'learning_rate': trial.suggest_float('learning_rate', 1e-4, 0.3, log=True),
'n_estimators': trial.suggest_int('n_estimators', 100, 2000),
'subsample': trial.suggest_float('subsample', 0.5, 1.0),
'subsample_freq': trial.suggest_int('subsample_freq', 1, 7),
'colsample_bytree': trial.suggest_float('colsample_bytree', 0.5, 1.0),
'reg_alpha': trial.suggest_float('reg_alpha', 1e-8, 10.0, log=True),
'reg_lambda': trial.suggest_float('reg_lambda', 1e-8, 10.0, log=True),
'min_split_gain': trial.suggest_float('min_split_gain', 0, 1),
'cat_smooth': trial.suggest_int('cat_smooth', 1, 100),
'n_jobs': -1,
}
5-Fold CV로 평가
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
cv_scores = []
for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
train_data = lgb.Dataset(X_train, y_train)
val_data = lgb.Dataset(X_val, y_val, reference=train_data)
callbacks = [
lgb.early_stopping(stopping_rounds=50, verbose=False),
lgb.log_evaluation(-1),
]
model = lgb.train(
params, train_data,
num_boost_round=params['n_estimators'],
valid_sets=[val_data],
callbacks=callbacks,
)
preds = model.predict(X_val)
fold_score = roc_auc_score(y_val, preds)
cv_scores.append(fold_score)
가지치기 (pruning)
trial.report(fold_score, fold)
if trial.should_prune():
raise optuna.exceptions.TrialPruned()
return np.mean(cv_scores)
def run_optuna_study(X, y, n_trials=100, n_jobs=1):
"""Optuna Study 실행"""
Sampler 선택
sampler = TPESampler(
n_startup_trials=20, # 초기 랜덤 탐색 횟수
n_ei_candidates=24, # EI 후보 수
multivariate=True, # 다변량 TPE
seed=42
)
Pruner 설정
pruner = MedianPruner(
n_startup_trials=5, # 가지치기 시작 전 최소 Trial 수
n_warmup_steps=10, # 가지치기 전 warm-up 스텝
interval_steps=1
)
study = optuna.create_study(
direction='maximize', # AUC 최대화
sampler=sampler,
pruner=pruner,
study_name='lgbm_optimization',
storage='sqlite:///optuna.db', # 결과 영구 저장
load_if_exists=True, # 기존 Study 재개
)
최적화 실행
study.optimize(
lambda trial: objective_lgbm(trial, X, y),
n_trials=n_trials,
n_jobs=n_jobs, # 병렬 실행 (주의: DB storage 필요)
show_progress_bar=True,
callbacks=[
lambda study, trial: print(
f"Trial {trial.number}: {trial.value:.4f} "
f"(Best: {study.best_value:.4f})"
) if trial.value is not None else None
],
)
print(f"\n최적 파라미터:")
for key, value in study.best_params.items():
print(f" {key}: {value}")
print(f"최적 AUC: {study.best_value:.4f}")
print(f"완료된 Trial: {len(study.trials)}")
print(f"가지치기된 Trial: {len([t for t in study.trials if t.state == optuna.trial.TrialState.PRUNED])}")
return study
Optuna 시각화
def visualize_optuna_study(study):
"""Optuna 최적화 결과 시각화"""
최적화 히스토리
fig1 = vis.plot_optimization_history(study)
fig1.show()
파라미터 중요도
fig2 = vis.plot_param_importances(study)
fig2.show()
파라미터 관계
fig3 = vis.plot_parallel_coordinate(study)
fig3.show()
파라미터 슬라이스
fig4 = vis.plot_slice(study)
fig4.show()
등고선 플롯
fig5 = vis.plot_contour(study, params=['learning_rate', 'num_leaves'])
fig5.show()
PyTorch + Optuna 완전한 예제
from torch.utils.data import DataLoader, TensorDataset
def create_model(trial, input_dim):
"""Optuna Trial에서 신경망 아키텍처 동적 생성"""
n_layers = trial.suggest_int('n_layers', 1, 4)
dropout = trial.suggest_float('dropout', 0.1, 0.5)
activation_name = trial.suggest_categorical('activation', ['relu', 'tanh', 'elu'])
activation_map = {
'relu': nn.ReLU(),
'tanh': nn.Tanh(),
'elu': nn.ELU()
}
layers = []
in_features = input_dim
for i in range(n_layers):
out_features = trial.suggest_int(f'n_units_l{i}', 32, 512)
layers.extend([
nn.Linear(in_features, out_features),
nn.BatchNorm1d(out_features),
activation_map[activation_name],
nn.Dropout(dropout),
])
in_features = out_features
layers.append(nn.Linear(in_features, 1))
layers.append(nn.Sigmoid())
return nn.Sequential(*layers)
def objective_pytorch(trial, X_train_t, y_train_t, X_val_t, y_val_t, input_dim):
"""PyTorch 신경망 Optuna 목적 함수"""
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = create_model(trial, input_dim).to(device)
최적화 파라미터
lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)
optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'RMSprop', 'SGD'])
batch_size = trial.suggest_categorical('batch_size', [32, 64, 128, 256])
weight_decay = trial.suggest_float('weight_decay', 1e-8, 1e-2, log=True)
if optimizer_name == 'Adam':
optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
elif optimizer_name == 'RMSprop':
optimizer = optim.RMSprop(model.parameters(), lr=lr, weight_decay=weight_decay)
else:
optimizer = optim.SGD(model.parameters(), lr=lr, weight_decay=weight_decay,
momentum=0.9)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)
criterion = nn.BCELoss()
train_dataset = TensorDataset(
X_train_t.to(device), y_train_t.to(device).float().unsqueeze(1)
)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
best_val_loss = float('inf')
patience = 10
patience_counter = 0
for epoch in range(100):
model.train()
for X_batch, y_batch in train_loader:
optimizer.zero_grad()
output = model(X_batch)
loss = criterion(output, y_batch)
loss.backward()
optimizer.step()
scheduler.step()
검증
model.eval()
with torch.no_grad():
val_output = model(X_val_t.to(device))
val_loss = criterion(
val_output, y_val_t.to(device).float().unsqueeze(1)
).item()
조기 가지치기 보고
trial.report(val_loss, epoch)
if trial.should_prune():
raise optuna.exceptions.TrialPruned()
Early stopping
if val_loss < best_val_loss:
best_val_loss = val_loss
patience_counter = 0
else:
patience_counter += 1
if patience_counter >= patience:
break
return best_val_loss
def run_pytorch_optuna(X_train, y_train, X_val, y_val, n_trials=50):
"""PyTorch + Optuna 통합 실행"""
X_train_t = torch.FloatTensor(X_train.values if hasattr(X_train, 'values') else X_train)
y_train_t = torch.FloatTensor(y_train.values if hasattr(y_train, 'values') else y_train)
X_val_t = torch.FloatTensor(X_val.values if hasattr(X_val, 'values') else X_val)
y_val_t = torch.FloatTensor(y_val.values if hasattr(y_val, 'values') else y_val)
input_dim = X_train_t.shape[1]
study = optuna.create_study(
direction='minimize',
pruner=optuna.pruners.HyperbandPruner(
min_resource=5, max_resource=100, reduction_factor=3
),
sampler=TPESampler(seed=42)
)
study.optimize(
lambda trial: objective_pytorch(
trial, X_train_t, y_train_t, X_val_t, y_val_t, input_dim
),
n_trials=n_trials,
show_progress_bar=True,
)
print(f"최적 검증 손실: {study.best_value:.4f}")
print(f"최적 파라미터: {study.best_params}")
return study
CMA-ES Sampler
연속 공간에서 더 효율적인 CMA-ES
study_cmaes = optuna.create_study(
direction='maximize',
sampler=CmaEsSampler(
n_startup_trials=10, # 초기 랜덤 탐색 수
restart_strategy='ipop', # 재시작 전략
seed=42
)
)
4. Ray Tune
Ray Tune과 분산 HPO
Ray Tune은 Anyscale이 개발한 분산 하이퍼파라미터 최적화 라이브러리입니다. 여러 GPU/노드에 걸쳐 병렬 학습을 자동으로 처리합니다.
pip install ray[tune] ray[air]
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler, HyperBandScheduler
from ray.tune.search.optuna import OptunaSearch
from ray.tune.search.bayesopt import BayesOptSearch
ray.init(ignore_reinit_error=True)
def train_with_tune(config, data=None):
"""Ray Tune 학습 함수"""
X_train, y_train, X_val, y_val = data
모델 생성
model = nn.Sequential(
nn.Linear(X_train.shape[1], config['hidden_size']),
nn.ReLU(),
nn.Dropout(config['dropout']),
nn.Linear(config['hidden_size'], config['hidden_size'] // 2),
nn.ReLU(),
nn.Linear(config['hidden_size'] // 2, 1),
nn.Sigmoid()
)
optimizer = torch.optim.Adam(
model.parameters(),
lr=config['lr'],
weight_decay=config['weight_decay']
)
criterion = nn.BCELoss()
X_train_t = torch.FloatTensor(X_train)
y_train_t = torch.FloatTensor(y_train).unsqueeze(1)
X_val_t = torch.FloatTensor(X_val)
y_val_t = torch.FloatTensor(y_val).unsqueeze(1)
for epoch in range(config['max_epochs']):
model.train()
optimizer.zero_grad()
output = model(X_train_t)
loss = criterion(output, y_train_t)
loss.backward()
optimizer.step()
if epoch % 5 == 0:
model.eval()
with torch.no_grad():
val_output = model(X_val_t)
val_loss = criterion(val_output, y_val_t).item()
Ray Tune에 중간 결과 보고
tune.report(
val_loss=val_loss,
training_iteration=epoch,
)
def run_ray_tune(X_train, y_train, X_val, y_val, num_samples=50):
"""Ray Tune 분산 HPO 실행"""
config = {
'hidden_size': tune.choice([64, 128, 256, 512]),
'dropout': tune.uniform(0.1, 0.5),
'lr': tune.loguniform(1e-5, 1e-1),
'weight_decay': tune.loguniform(1e-8, 1e-3),
'max_epochs': tune.choice([50, 100, 200]),
}
ASHA (Asynchronous Successive Halving Algorithm) 스케줄러
scheduler = ASHAScheduler(
metric='val_loss',
mode='min',
max_t=200, # 최대 에폭
grace_period=10, # 최소 실행 에폭
reduction_factor=3, # 제거 비율
)
reporter = CLIReporter(
metric_columns=['val_loss', 'training_iteration'],
max_progress_rows=10
)
OptunaSearch 통합
search_alg = OptunaSearch(metric='val_loss', mode='min')
result = tune.run(
tune.with_parameters(
train_with_tune,
data=(X_train, y_train, X_val, y_val)
),
config=config,
num_samples=num_samples,
scheduler=scheduler,
search_alg=search_alg,
progress_reporter=reporter,
verbose=1,
resources_per_trial={'cpu': 2, 'gpu': 0},
)
best_trial = result.get_best_trial('val_loss', 'min', 'last')
print(f"최적 검증 손실: {best_trial.last_result['val_loss']:.4f}")
print(f"최적 파라미터: {best_trial.config}")
return result
Population Based Training (PBT)
from ray.tune.schedulers import PopulationBasedTraining
def run_pbt(X_train, y_train, X_val, y_val):
"""PBT: 학습 중 하이퍼파라미터를 동적으로 변경"""
pbt_scheduler = PopulationBasedTraining(
time_attr='training_iteration',
metric='val_loss',
mode='min',
perturbation_interval=20, # N 스텝마다 변이
hyperparam_mutations={
'lr': tune.loguniform(1e-5, 1e-1),
'dropout': tune.uniform(0.1, 0.5),
},
quantile_fraction=0.25, # 상위 25%의 파라미터로 하위 25% 교체
)
result = tune.run(
tune.with_parameters(
train_with_tune,
data=(X_train, y_train, X_val, y_val)
),
config={
'hidden_size': 256,
'dropout': tune.uniform(0.1, 0.5),
'lr': tune.loguniform(1e-4, 1e-1),
'weight_decay': 1e-5,
'max_epochs': 200,
},
num_samples=8,
scheduler=pbt_scheduler,
verbose=1,
)
return result
5. AutoGluon
AutoGluon 개요
AutoGluon은 Amazon이 개발한 오픈소스 AutoML 라이브러리로, 최소한의 코드로 Kaggle 수준의 성능을 달성합니다.
pip install autogluon
표 형식 데이터 (TabularPredictor)
from autogluon.tabular import TabularPredictor
def autogluon_tabular_example(train_df, test_df, target_col, eval_metric='roc_auc'):
"""AutoGluon 표 형식 데이터 학습"""
기본 사용법 (3줄로 학습!)
predictor = TabularPredictor(
label=target_col,
eval_metric=eval_metric,
path='autogluon_models/', # 모델 저장 경로
problem_type='binary', # 'binary', 'multiclass', 'regression', 'softclass'
)
predictor.fit(
train_data=train_df,
time_limit=3600, # 최대 학습 시간 (초)
presets='best_quality', # 품질 프리셋
'best_quality': 최고 성능 (느림)
'good_quality': 좋은 성능/속도 균형
'medium_quality': 빠른 학습
'optimize_for_deployment': 빠른 예측에 최적화
excluded_model_types=['KNN'], # 제외할 모델 타입
verbosity=2,
)
리더보드 출력
leaderboard = predictor.leaderboard(test_df, silent=True)
print(leaderboard[['model', 'score_test', 'score_val', 'pred_time_test']].head(10))
예측
predictions = predictor.predict(test_df)
pred_proba = predictor.predict_proba(test_df)
특성 중요도
feature_importance = predictor.feature_importance(test_df)
print(feature_importance.head(20))
return predictor, predictions, pred_proba
고급 설정: 하이퍼파라미터 커스터마이징
def autogluon_advanced(train_df, test_df, target_col):
"""AutoGluon 고급 설정"""
hyperparameters = {
'GBM': [
{'num_boost_round': 300, 'ag_args': {'name_suffix': 'fast'}},
{'num_boost_round': 1000, 'learning_rate': 0.03,
'ag_args': {'name_suffix': 'slow', 'priority': 0}},
],
'XGB': [
{'n_estimators': 300, 'max_depth': 6},
],
'CAT': [
{'iterations': 500, 'depth': 6},
],
'NN_TORCH': [
{'num_epochs': 50, 'learning_rate': 1e-3,
'dropout_prob': 0.1},
],
'RF': [
{'n_estimators': 300},
],
}
predictor = TabularPredictor(
label=target_col,
eval_metric='roc_auc',
path='autogluon_advanced/',
)
predictor.fit(
train_data=train_df,
hyperparameters=hyperparameters,
time_limit=7200,
num_stack_levels=1, # 스태킹 레벨 수
num_bag_folds=5, # 배깅 폴드 수 (0이면 배깅 없음)
num_bag_sets=1, # 배깅 세트 수
verbosity=3,
)
return predictor
이미지 분류
from autogluon.multimodal import MultiModalPredictor
def autogluon_image_classification(train_df, test_df, label_col, image_col):
"""AutoGluon 이미지 분류"""
predictor = MultiModalPredictor(label=label_col)
predictor.fit(
train_data=train_df,
time_limit=3600,
hyperparameters={
'model.timm_image.checkpoint_name': 'efficientnet_b4',
'optimization.learning_rate': 1e-4,
'optimization.max_epochs': 20,
}
)
predictions = predictor.predict(test_df)
return predictor, predictions
텍스트 + 표 형식 멀티모달
def autogluon_multimodal(train_df, test_df, target_col):
"""AutoGluon 멀티모달 학습 (텍스트 + 수치형 특성)"""
predictor = MultiModalPredictor(
label=target_col,
problem_type='binary',
)
predictor.fit(
train_data=train_df,
time_limit=3600,
hyperparameters={
'model.hf_text.checkpoint_name': 'bert-base-uncased',
}
)
return predictor
6. FLAML
Microsoft FLAML
FLAML(Fast and Lightweight AutoML)은 Microsoft Research에서 개발한 AutoML 라이브러리로, 비용 효율적인 자동화에 특화되어 있습니다.
pip install flaml
from flaml import AutoML
def flaml_basic_example(X_train, y_train, X_test, task='classification'):
"""FLAML 기본 사용"""
automl = AutoML()
automl_settings = {
'time_budget': 300, # 최대 학습 시간 (초)
'metric': 'roc_auc', # 최적화 지표
'task': task, # 'classification', 'regression', 'ranking'
'estimator_list': [ # 시도할 모델 목록
'lgbm', 'xgboost', 'catboost',
'rf', 'extra_tree', 'lrl1', 'lrl2',
'kneighbor', 'prophet', 'arima'
],
'log_file_name': 'flaml_log.log',
'seed': 42,
'n_jobs': -1,
'verbose': 1,
비용 효율적 탐색 설정
'retrain_full': True, # 최종 모델을 전체 데이터로 재학습
'max_iter': 100, # 최대 반복 수
'ensemble': True, # 앙상블 사용 여부
'eval_method': 'cv', # 'cv' 또는 'holdout'
'n_splits': 5, # CV 폴드 수
}
automl.fit(X_train, y_train, **automl_settings)
print(f"최적 모델: {automl.best_estimator}")
print(f"최적 손실: {automl.best_loss:.4f}")
print(f"최적 설정: {automl.best_config}")
print(f"총 학습 시간: {automl.time_to_find_best_model:.1f}초")
predictions = automl.predict(X_test)
pred_proba = automl.predict_proba(X_test)
return automl, predictions, pred_proba
FLAML + scikit-learn 파이프라인
def flaml_sklearn_pipeline(X_train, y_train, X_test):
"""FLAML을 scikit-learn 파이프라인에 통합"""
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
automl = AutoML()
pipeline = Pipeline([
('scaler', StandardScaler()),
('automl', automl),
])
pipeline.fit(
X_train, y_train,
automl__time_budget=120,
automl__metric='roc_auc',
automl__task='classification',
)
return pipeline
FLAML 커스텀 목적 함수
def flaml_custom_objective(X_train, y_train):
"""FLAML 커스텀 평가 지표"""
from flaml.automl.data import get_output_from_log
def custom_metric(
X_val, y_val, estimator, labels, X_train, y_train,
weight_val=None, weight_train=None, *args
):
"""F-beta 점수를 최적화하는 커스텀 지표"""
from sklearn.metrics import fbeta_score
y_pred = estimator.predict(X_val)
score = fbeta_score(y_val, y_pred, beta=2, average='weighted')
return -score, {'f2_score': score} # (손실, 메트릭 딕셔너리)
automl = AutoML()
automl.fit(
X_train, y_train,
metric=custom_metric,
task='classification',
time_budget=120,
)
return automl
7. H2O AutoML
H2O 클러스터
H2O AutoML은 엔터프라이즈 수준의 AutoML 플랫폼으로, 다양한 알고리즘과 해석 가능성 도구를 제공합니다.
pip install h2o
from h2o.automl import H2OAutoML
def h2o_automl_example(train_df, test_df, target_col, max_models=20):
"""H2O AutoML 사용"""
H2O 클러스터 시작
h2o.init(
nthreads=-1, # 모든 CPU 사용
max_mem_size='8G', # 최대 메모리
port=54321,
)
H2O Frame으로 변환
train_h2o = h2o.H2OFrame(train_df)
test_h2o = h2o.H2OFrame(test_df)
타겟 컬럼을 카테고리형으로 변환 (분류)
train_h2o[target_col] = train_h2o[target_col].asfactor()
feature_cols = [col for col in train_df.columns if col != target_col]
AutoML 실행
aml = H2OAutoML(
max_models=max_models, # 시도할 최대 모델 수
max_runtime_secs=3600, # 최대 실행 시간
seed=42,
sort_metric='AUC', # 정렬 기준 지표
balance_classes=False, # 클래스 불균형 처리
포함/제외할 알고리즘
include_algos=[
'GBM', 'GLM', 'DRF', 'DeepLearning',
'StackedEnsemble', 'XGBoost'
],
exclude_algos=['DeepLearning'],
스태킹 설정
keep_cross_validation_predictions=True,
keep_cross_validation_models=True,
nfolds=5,
verbosity='info',
)
aml.fit(
x=feature_cols,
y=target_col,
training_frame=train_h2o,
validation_frame=None, # None이면 CV 사용
leaderboard_frame=test_h2o,
)
리더보드 출력
lb = aml.leaderboard
print("H2O AutoML 리더보드:")
print(lb.head(20))
최적 모델
best_model = aml.leader
print(f"\n최적 모델: {best_model.model_id}")
예측
predictions = best_model.predict(test_h2o)
pred_df = predictions.as_data_frame()
모델 설명
explainability = aml.explain(test_h2o)
SHAP 값 (지원하는 모델)
if hasattr(best_model, 'shap_values'):
shap_vals = best_model.shap_values(test_h2o)
모델 저장
model_path = h2o.save_model(model=best_model, path='h2o_models/', force=True)
print(f"모델 저장됨: {model_path}")
return aml, best_model, pred_df
H2O 종료
def cleanup_h2o():
h2o.cluster().shutdown()
8. 신경망 구조 탐색 (NAS)
NAS 개요
Neural Architecture Search(NAS)는 최적의 신경망 아키텍처를 자동으로 탐색하는 기술입니다.
**NAS의 세 가지 구성요소:**
1. **탐색 공간 (Search Space)**: 가능한 아키텍처의 범위
2. **탐색 전략 (Search Strategy)**: 공간 탐색 방법 (랜덤, 진화, RL, 그래디언트)
3. **성능 평가 (Performance Estimation)**: 아키텍처 평가 방법
DARTS (미분 가능한 NAS)
DARTS(Differentiable Architecture Search, Liu et al. 2019)는 아키텍처 탐색을 연속 완화(continuous relaxation)를 통해 미분 가능하게 만듭니다.
class MixedOperation(nn.Module):
"""DARTS의 혼합 연산 (가중 합)"""
def __init__(self, operations):
super().__init__()
self.ops = nn.ModuleList(operations)
아키텍처 파라미터 (alpha)
self.alphas = nn.Parameter(torch.randn(len(operations)))
def forward(self, x):
weights = F.softmax(self.alphas, dim=0)
return sum(w * op(x) for w, op in zip(weights, self.ops))
class DARTSCell(nn.Module):
"""DARTS 셀 구조"""
def __init__(self, in_channels, out_channels):
super().__init__()
사용 가능한 연산 정의
operations = [
nn.Conv2d(in_channels, out_channels, 3, padding=1),
nn.Conv2d(in_channels, out_channels, 5, padding=2),
nn.MaxPool2d(3, stride=1, padding=1),
nn.AvgPool2d(3, stride=1, padding=1),
nn.Identity() if in_channels == out_channels else
nn.Conv2d(in_channels, out_channels, 1),
]
self.mixed_op = MixedOperation(operations)
self.bn = nn.BatchNorm2d(out_channels)
def forward(self, x):
return F.relu(self.bn(self.mixed_op(x)))
class SimpleDARTS(nn.Module):
"""간단한 DARTS 아키텍처"""
def __init__(self, num_classes=10, num_cells=6):
super().__init__()
self.stem = nn.Conv2d(3, 64, 3, padding=1)
self.cells = nn.ModuleList([
DARTSCell(64, 64) for _ in range(num_cells)
])
self.classifier = nn.Linear(64, num_classes)
def forward(self, x):
x = self.stem(x)
for cell in self.cells:
x = cell(x)
x = x.mean([2, 3]) # Global Average Pooling
return self.classifier(x)
def arch_parameters(self):
"""아키텍처 파라미터만 반환"""
return [p for n, p in self.named_parameters() if 'alphas' in n]
def model_parameters(self):
"""가중치 파라미터만 반환"""
return [p for n, p in self.named_parameters() if 'alphas' not in n]
def train_darts(model, train_loader, val_loader, epochs=50):
"""DARTS 이중 최적화 학습"""
가중치 최적화기
w_optimizer = torch.optim.SGD(
model.model_parameters(), lr=0.025, momentum=0.9, weight_decay=3e-4
)
아키텍처 파라미터 최적화기
a_optimizer = torch.optim.Adam(
model.arch_parameters(), lr=3e-4, betas=(0.5, 0.999), weight_decay=1e-3
)
w_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
w_optimizer, T_max=epochs
)
for epoch in range(epochs):
model.train()
train_iter = iter(train_loader)
val_iter = iter(val_loader)
for step in range(min(len(train_loader), len(val_loader))):
1. 아키텍처 파라미터 업데이트 (검증 데이터)
try:
X_val, y_val = next(val_iter)
except StopIteration:
val_iter = iter(val_loader)
X_val, y_val = next(val_iter)
a_optimizer.zero_grad()
val_logits = model(X_val)
val_loss = F.cross_entropy(val_logits, y_val)
val_loss.backward()
a_optimizer.step()
2. 가중치 파라미터 업데이트 (훈련 데이터)
X_train, y_train = next(train_iter)
w_optimizer.zero_grad()
train_logits = model(X_train)
train_loss = F.cross_entropy(train_logits, y_train)
train_loss.backward()
nn.utils.clip_grad_norm_(model.model_parameters(), 5.0)
w_optimizer.step()
w_scheduler.step()
if epoch % 10 == 0:
print(f"Epoch {epoch}: Train Loss = {train_loss.item():.4f}")
최적 아키텍처 추출
for i, cell in enumerate(model.cells):
weights = F.softmax(cell.mixed_op.alphas, dim=0).detach()
best_op = weights.argmax().item()
print(f"Cell {i}: 최적 연산 인덱스 = {best_op}, 가중치 = {weights}")
return model
One-Shot NAS
class SuperNetwork(nn.Module):
"""One-Shot NAS: 단일 슈퍼네트워크에서 서브네트워크 샘플링"""
def __init__(self, num_classes=10, max_channels=256):
super().__init__()
self.max_channels = max_channels
가능한 채널 수 옵션
self.channel_options = [64, 128, 256]
전체 파라미터로 레이어 정의
self.conv1 = nn.Conv2d(3, max_channels, 3, padding=1)
self.conv2 = nn.Conv2d(max_channels, max_channels, 3, padding=1)
self.conv3 = nn.Conv2d(max_channels, max_channels, 3, padding=1)
self.bn1 = nn.BatchNorm2d(max_channels)
self.bn2 = nn.BatchNorm2d(max_channels)
self.bn3 = nn.BatchNorm2d(max_channels)
self.classifier = nn.Linear(max_channels, num_classes)
def forward(self, x, arch_config=None):
"""arch_config: 각 레이어의 채널 수"""
if arch_config is None:
무작위 서브네트워크 샘플링
arch_config = {
'conv1_out': torch.randint(0, len(self.channel_options), (1,)).item(),
'conv2_out': torch.randint(0, len(self.channel_options), (1,)).item(),
}
슬라이스된 채널로 순전파
c1 = self.channel_options[arch_config['conv1_out']]
c2 = self.channel_options[arch_config['conv2_out']]
x = F.relu(self.bn1(self.conv1(x)[:, :c1]))
x = F.relu(self.bn2(self.conv2(
F.pad(x, (0, 0, 0, 0, 0, self.max_channels - c1))
)[:, :c2]))
x = F.relu(self.bn3(self.conv3(
F.pad(x, (0, 0, 0, 0, 0, self.max_channels - c2))
)))
x = x.mean([2, 3])
return self.classifier(x)
9. 파이프라인 자동화
Auto-sklearn
pip install auto-sklearn
from autosklearn.metrics import roc_auc, mean_squared_error
def auto_sklearn_example(X_train, y_train, X_test, task='classification'):
"""Auto-sklearn: scikit-learn 기반 AutoML"""
if task == 'classification':
automl = autosklearn.classification.AutoSklearnClassifier(
time_left_for_this_task=3600, # 전체 시간 제한 (초)
per_run_time_limit=360, # 단일 모델 시간 제한
n_jobs=-1,
memory_limit=8192, # 메모리 제한 (MB)
ensemble_size=50, # 앙상블 크기
ensemble_nbest=50, # 앙상블에 사용할 최적 모델 수
max_models_on_disc=50,
포함/제외 알고리즘
include={
'classifier': [
'random_forest', 'gradient_boosting',
'extra_trees', 'liblinear_svc'
]
},
exclude={'classifier': ['k_nearest_neighbors']},
메트릭
metric=roc_auc,
resampling_strategy='cv',
resampling_strategy_arguments={'folds': 5},
seed=42,
)
else:
automl = autosklearn.regression.AutoSklearnRegressor(
time_left_for_this_task=3600,
per_run_time_limit=360,
n_jobs=-1,
metric=mean_squared_error,
seed=42,
)
automl.fit(X_train, y_train)
통계 출력
print(automl.sprint_statistics())
print(automl.leaderboard())
predictions = automl.predict(X_test)
if task == 'classification':
pred_proba = automl.predict_proba(X_test)
return automl, predictions
10. LLM 시대의 AutoML
LLM을 AutoML에 활용
대규모 언어 모델(LLM)은 AutoML에 새로운 가능성을 열고 있습니다:
1. **하이퍼파라미터 제안**: LLM이 데이터셋 특성에 맞는 초기 하이퍼파라미터 추천
2. **특성 엔지니어링**: LLM이 도메인 지식을 활용하여 새로운 특성 생성 아이디어 제안
3. **코드 생성**: 전처리, 모델 학습 코드 자동 생성
4. **에러 디버깅**: 학습 실패 원인 분석 및 해결책 제안
LLM 기반 하이퍼파라미터 최적화 (개념 코드)
from openai import OpenAI
def llm_hyperparameter_suggestion(dataset_description, model_type, previous_results=None):
"""LLM을 활용한 하이퍼파라미터 제안"""
client = OpenAI()
prompt = f"""
데이터셋 특성:
{dataset_description}
모델 타입: {model_type}
이전 시도 결과:
{previous_results if previous_results else '없음'}
위 정보를 바탕으로 {model_type}의 최적 하이퍼파라미터를 JSON 형식으로 제안해주세요.
"""
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system",
"content": "당신은 머신러닝 전문가입니다. 하이퍼파라미터 최적화를 도와주세요."},
{"role": "user", "content": prompt}
],
response_format={"type": "json_object"}
)
return response.choices[0].message.content
Few-shot learning을 활용한 AutoML
def llm_feature_engineering(df_description, target_description):
"""LLM이 제안하는 피처 엔지니어링"""
client = OpenAI()
prompt = f"""
데이터프레임 컬럼:
{df_description}
예측 목표:
{target_description}
유용할 수 있는 파생 특성과 해당 Python 코드를 제안해주세요.
"""
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system",
"content": "머신러닝 특성 엔지니어링 전문가로서 유용한 파생 특성을 제안하세요."},
{"role": "user", "content": prompt}
]
)
return response.choices[0].message.content
AutoML 에이전트 (실험적)
class AutoMLAgent:
"""LLM 기반 AutoML 에이전트 (개념 구현)"""
def __init__(self, llm_client, X_train, y_train, X_val, y_val, max_iterations=10):
self.client = llm_client
self.X_train = X_train
self.y_train = y_train
self.X_val = X_val
self.y_val = y_val
self.max_iterations = max_iterations
self.history = []
self.best_score = 0
self.best_params = None
def get_next_config(self):
"""LLM에게 다음 시도할 설정 요청"""
history_str = "\n".join([
f"Iteration {i+1}: params={h['params']}, score={h['score']:.4f}"
for i, h in enumerate(self.history[-5:]) # 최근 5개
])
prompt = f"""
현재까지 시도한 LightGBM 파라미터와 결과:
{history_str if history_str else '없음 (첫 번째 시도)'}
다음에 시도할 파라미터 조합을 JSON으로 제안하세요.
가능한 범위: num_leaves(10-300), learning_rate(0.001-0.3),
n_estimators(100-2000), subsample(0.5-1.0), colsample_bytree(0.5-1.0)
"""
response = self.client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "당신은 HPO 전문가입니다."},
{"role": "user", "content": prompt}
],
response_format={"type": "json_object"}
)
return json.loads(response.choices[0].message.content)
def evaluate(self, params):
"""파라미터 평가"""
from sklearn.metrics import roc_auc_score
model = lgb.LGBMClassifier(**params, random_state=42, verbose=-1)
model.fit(self.X_train, self.y_train)
preds = model.predict_proba(self.X_val)[:, 1]
return roc_auc_score(self.y_val, preds)
def run(self):
"""에이전트 실행"""
for i in range(self.max_iterations):
config = self.get_next_config()
score = self.evaluate(config)
self.history.append({'params': config, 'score': score})
if score > self.best_score:
self.best_score = score
self.best_params = config
print(f"Iteration {i+1}: 새로운 최적 점수 {score:.4f}")
print(f"\n최적 점수: {self.best_score:.4f}")
print(f"최적 파라미터: {self.best_params}")
return self.best_params
마무리
이 가이드에서 AutoML의 전체 생태계를 다루었습니다:
1. **하이퍼파라미터 최적화**: 그리드 서치에서 베이지안 최적화까지 체계적 이해
2. **Optuna**: 가장 유연한 Python-native HPO 프레임워크
3. **Ray Tune**: 분산 환경에서의 대규모 HPO
4. **AutoGluon**: Amazon의 강력한 멀티모달 AutoML
5. **FLAML**: Microsoft의 비용 효율적 AutoML
6. **H2O AutoML**: 기업용 AutoML 플랫폼
7. **NAS**: 최적 신경망 아키텍처 자동 탐색
8. **LLM + AutoML**: 차세대 지능형 AutoML
**핵심 권장 사항:**
- 시간 제약이 있다면 FLAML 또는 AutoGluon (good_quality 프리셋) 사용
- 특정 모델을 최적화하려면 Optuna 사용
- 분산 환경이나 대규모 실험에는 Ray Tune 사용
- 기업 환경에서는 H2O AutoML의 해석 가능성 도구 활용
- LLM 기반 AutoML은 아직 연구 단계지만 주목해야 할 방향
AutoML은 도구이지 마법이 아닙니다. 도메인 지식, 데이터 품질, 올바른 평가 프레임워크가 여전히 가장 중요합니다.
참고자료
- [Optuna 공식 문서](https://optuna.org/)
- [AutoGluon 공식 문서](https://auto.gluon.ai/)
- [FLAML 공식 문서](https://microsoft.github.io/FLAML/)
- [H2O AutoML](https://h2o.ai/products/h2o-automl/)
- [Ray Tune 문서](https://docs.ray.io/en/latest/tune/index.html)
- [DARTS: Differentiable Architecture Search](https://arxiv.org/abs/1806.09055)
- Bergstra, J., & Bengio, Y. (2012). Random search for hyper-parameter optimization.
- Feurer, M., et al. (2015). Efficient and Robust Automated Machine Learning (Auto-sklearn).
- He, X., et al. (2021). AutoML: A Survey of the State-of-the-Art.
현재 단락 (1/946)
AutoML(Automated Machine Learning)은 머신러닝 파이프라인의 다양한 단계를 자동화하는 기술입니다. 데이터 과학자가 수동으로 수행하던 작업들 — 데이터 전처...