Skip to content
Published on

AutoML 완전 가이드: AutoGluon, FLAML, Optuna로 자동화된 ML 파이프라인

Authors

1. AutoML 개요

AutoML이란?

AutoML(Automated Machine Learning)은 머신러닝 파이프라인의 다양한 단계를 자동화하는 기술입니다. 데이터 과학자가 수동으로 수행하던 작업들 — 데이터 전처리, 특성 엔지니어링, 모델 선택, 하이퍼파라미터 최적화, 앙상블 — 을 알고리즘이 자동으로 수행합니다.

AutoML이 자동화하는 영역:

  1. 데이터 전처리 자동화

    • 결측치 대체 전략 선택
    • 스케일링/정규화 방법 선택
    • 이상치 처리
  2. 피처 엔지니어링 자동화

    • 특성 변환 (log, 제곱, 상호작용)
    • 범주형 인코딩 방법 선택
    • 특성 선택 및 생성
  3. 모델 선택 (Algorithm Selection)

    • 다양한 알고리즘 탐색
    • 메타 학습 (이전 경험 활용)
  4. 하이퍼파라미터 최적화 (HPO)

    • 그리드/랜덤 서치
    • 베이지안 최적화
    • 진화 알고리즘
  5. 앙상블 자동화

    • 최적 앙상블 구성 탐색
    • 스태킹, 블렌딩 자동화
  6. 신경망 구조 탐색 (NAS)

    • 최적 신경망 아키텍처 자동 설계

AutoML의 응용 분야

산업 응용:

  • 금융: 신용 위험 모델, 사기 탐지 자동화
  • 의료: 진단 보조 시스템의 빠른 프로토타입
  • 유통: 수요 예측 모델 자동 갱신
  • 제조: 품질 관리 모델 자동화

주요 오픈소스 AutoML 도구:

도구개발사강점
AutoGluonAmazon멀티모달, 표 형식, 이미지, 텍스트
FLAMLMicrosoft비용 효율적, 빠름
OptunaPreferred NetworksHPO, 시각화
H2O AutoMLH2O.ai기업용, 해석 가능
Auto-sklearnAutoML Groupscikit-learn 기반
Ray TuneAnyscale분산 HPO
NNIMicrosoftNAS, HPO

AutoML의 장단점

장점:

  • 비전문가도 고품질 모델 구축 가능
  • 반복적인 실험을 자동화하여 시간 절약
  • 인간이 놓치는 하이퍼파라미터 조합 발견 가능
  • 재현 가능한 파이프라인 제공

단점:

  • 계산 비용이 매우 높을 수 있음
  • 도메인 지식 활용에 한계
  • 블랙박스 특성 (내부 동작 이해 어려움)
  • 특수한 문제에는 맞춤형 솔루션이 더 효과적
  • 데이터 리키지 위험성

2. 하이퍼파라미터 최적화 (HPO)

가장 단순한 HPO 방법으로, 모든 하이퍼파라미터 조합을 완전 탐색합니다.

from sklearn.model_selection import GridSearchCV
import xgboost as xgb

def grid_search_example(X_train, y_train):
    """그리드 서치: 완전 탐색 (작은 파라미터 공간에만 적합)"""
    param_grid = {
        'max_depth': [3, 5, 7],
        'learning_rate': [0.01, 0.1, 0.3],
        'n_estimators': [100, 300, 500],
        'subsample': [0.7, 0.9],
    }
    # 총 조합 수: 3 * 3 * 3 * 2 = 54가지 x CV 폴드 수

    model = xgb.XGBClassifier(random_state=42, n_jobs=-1)
    grid_search = GridSearchCV(
        model, param_grid,
        cv=5,
        scoring='roc_auc',
        n_jobs=-1,
        verbose=1,
        refit=True  # 최적 파라미터로 전체 데이터 재학습
    )
    grid_search.fit(X_train, y_train)

    print(f"최적 파라미터: {grid_search.best_params_}")
    print(f"최적 CV 점수: {grid_search.best_score_:.4f}")

    # 결과 DataFrame으로 분석
    results = pd.DataFrame(grid_search.cv_results_)
    results_sorted = results.sort_values('mean_test_score', ascending=False)
    print(results_sorted[['params', 'mean_test_score', 'std_test_score']].head(10))

    return grid_search.best_estimator_

Bergstra & Bengio(2012)가 제안한 방법으로, 파라미터 공간에서 무작위로 샘플링합니다.

from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform, randint, loguniform

def random_search_example(X_train, y_train, n_iter=100):
    """랜덤 서치: 연속 분포에서 샘플링하여 효율적 탐색"""
    param_distributions = {
        'max_depth': randint(3, 10),
        'learning_rate': loguniform(1e-3, 0.5),   # 로그 균등 분포
        'n_estimators': randint(100, 1000),
        'subsample': uniform(0.6, 0.4),           # [0.6, 1.0]
        'colsample_bytree': uniform(0.6, 0.4),
        'reg_alpha': loguniform(1e-4, 10),
        'reg_lambda': loguniform(1e-4, 10),
        'min_child_weight': randint(1, 10),
        'gamma': uniform(0, 0.5),
    }

    model = xgb.XGBClassifier(random_state=42, n_jobs=-1)
    random_search = RandomizedSearchCV(
        model, param_distributions,
        n_iter=n_iter,   # 시도할 조합 수
        cv=5,
        scoring='roc_auc',
        n_jobs=-1,
        verbose=1,
        random_state=42,
        refit=True
    )
    random_search.fit(X_train, y_train)

    print(f"최적 파라미터: {random_search.best_params_}")
    print(f"최적 CV 점수: {random_search.best_score_:.4f}")

    return random_search.best_estimator_

베이지안 최적화 (Bayesian Optimization)

베이지안 최적화는 이전 평가 결과를 활용하여 다음 탐색 지점을 지능적으로 선택합니다.

핵심 구성요소:

  1. 대리 모델 (Surrogate Model): 목적 함수의 확률적 근사 (주로 가우시안 프로세스)
  2. 획득 함수 (Acquisition Function): 다음 탐색 지점 결정
    • EI (Expected Improvement): 현재 최솟값 대비 기댓값 개선
    • UCB (Upper Confidence Bound): 탐색/활용 균형
    • PI (Probability of Improvement): 개선 확률

TPE (Tree-structured Parzen Estimator):

  • Optuna에서 기본으로 사용하는 알고리즘
  • p(x|y) 대신 두 개의 밀도 모델 l(x), g(x)를 학습
  • 좋은 결과(상위 gamma%)를 낸 파라미터의 분포 l(x)와 나머지 g(x) 모델링
  • EI를 극대화하는 파라미터는 l(x)/g(x) 비율이 큰 지점

3. Optuna

Optuna 기본 개념

Optuna는 Preferred Networks에서 개발한 하이퍼파라미터 최적화 프레임워크로, Python-native이고 사용하기 매우 쉽습니다.

핵심 개념:

  • Study: 최적화 실험 전체 (여러 Trial의 집합)
  • Trial: 단일 하이퍼파라미터 조합 시도
  • Objective Function: 최적화할 목적 함수 (minimize 또는 maximize)
  • Sampler: 파라미터 제안 알고리즘 (TPE, CMA-ES, 랜덤 등)
  • Pruner: 유망하지 않은 Trial 조기 종료
pip install optuna optuna-dashboard
import optuna
from optuna.samplers import TPESampler, CmaEsSampler, RandomSampler
from optuna.pruners import MedianPruner, HyperbandPruner
import lightgbm as lgb
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import roc_auc_score
import numpy as np

# 로그 레벨 설정
optuna.logging.set_verbosity(optuna.logging.WARNING)

def objective_lgbm(trial, X, y):
    """LightGBM 최적화를 위한 Optuna 목적 함수"""

    # 하이퍼파라미터 탐색 공간 정의
    params = {
        'objective': 'binary',
        'metric': 'auc',
        'verbosity': -1,
        'boosting_type': trial.suggest_categorical('boosting_type', ['gbdt', 'dart']),
        'num_leaves': trial.suggest_int('num_leaves', 20, 300),
        'max_depth': trial.suggest_int('max_depth', 3, 12),
        'min_child_samples': trial.suggest_int('min_child_samples', 5, 100),
        'learning_rate': trial.suggest_float('learning_rate', 1e-4, 0.3, log=True),
        'n_estimators': trial.suggest_int('n_estimators', 100, 2000),
        'subsample': trial.suggest_float('subsample', 0.5, 1.0),
        'subsample_freq': trial.suggest_int('subsample_freq', 1, 7),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.5, 1.0),
        'reg_alpha': trial.suggest_float('reg_alpha', 1e-8, 10.0, log=True),
        'reg_lambda': trial.suggest_float('reg_lambda', 1e-8, 10.0, log=True),
        'min_split_gain': trial.suggest_float('min_split_gain', 0, 1),
        'cat_smooth': trial.suggest_int('cat_smooth', 1, 100),
        'n_jobs': -1,
    }

    # 5-Fold CV로 평가
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    cv_scores = []

    for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
        X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
        y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]

        train_data = lgb.Dataset(X_train, y_train)
        val_data = lgb.Dataset(X_val, y_val, reference=train_data)

        callbacks = [
            lgb.early_stopping(stopping_rounds=50, verbose=False),
            lgb.log_evaluation(-1),
        ]

        model = lgb.train(
            params, train_data,
            num_boost_round=params['n_estimators'],
            valid_sets=[val_data],
            callbacks=callbacks,
        )

        preds = model.predict(X_val)
        fold_score = roc_auc_score(y_val, preds)
        cv_scores.append(fold_score)

        # 가지치기 (pruning)
        trial.report(fold_score, fold)
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

    return np.mean(cv_scores)

def run_optuna_study(X, y, n_trials=100, n_jobs=1):
    """Optuna Study 실행"""

    # Sampler 선택
    sampler = TPESampler(
        n_startup_trials=20,       # 초기 랜덤 탐색 횟수
        n_ei_candidates=24,        # EI 후보 수
        multivariate=True,         # 다변량 TPE
        seed=42
    )

    # Pruner 설정
    pruner = MedianPruner(
        n_startup_trials=5,        # 가지치기 시작 전 최소 Trial 수
        n_warmup_steps=10,         # 가지치기 전 warm-up 스텝
        interval_steps=1
    )

    study = optuna.create_study(
        direction='maximize',      # AUC 최대화
        sampler=sampler,
        pruner=pruner,
        study_name='lgbm_optimization',
        # storage='sqlite:///optuna.db',   # 결과 영구 저장
        # load_if_exists=True,             # 기존 Study 재개
    )

    # 최적화 실행
    study.optimize(
        lambda trial: objective_lgbm(trial, X, y),
        n_trials=n_trials,
        n_jobs=n_jobs,             # 병렬 실행 (주의: DB storage 필요)
        show_progress_bar=True,
        callbacks=[
            lambda study, trial: print(
                f"Trial {trial.number}: {trial.value:.4f} "
                f"(Best: {study.best_value:.4f})"
            ) if trial.value is not None else None
        ],
    )

    print(f"\n최적 파라미터:")
    for key, value in study.best_params.items():
        print(f"  {key}: {value}")
    print(f"최적 AUC: {study.best_value:.4f}")
    print(f"완료된 Trial: {len(study.trials)}")
    print(f"가지치기된 Trial: {len([t for t in study.trials if t.state == optuna.trial.TrialState.PRUNED])}")

    return study

# Optuna 시각화
def visualize_optuna_study(study):
    """Optuna 최적화 결과 시각화"""
    import optuna.visualization as vis

    # 최적화 히스토리
    fig1 = vis.plot_optimization_history(study)
    fig1.show()

    # 파라미터 중요도
    fig2 = vis.plot_param_importances(study)
    fig2.show()

    # 파라미터 관계
    fig3 = vis.plot_parallel_coordinate(study)
    fig3.show()

    # 파라미터 슬라이스
    fig4 = vis.plot_slice(study)
    fig4.show()

    # 등고선 플롯
    fig5 = vis.plot_contour(study, params=['learning_rate', 'num_leaves'])
    fig5.show()

PyTorch + Optuna 완전한 예제

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import optuna

def create_model(trial, input_dim):
    """Optuna Trial에서 신경망 아키텍처 동적 생성"""
    n_layers = trial.suggest_int('n_layers', 1, 4)
    dropout = trial.suggest_float('dropout', 0.1, 0.5)
    activation_name = trial.suggest_categorical('activation', ['relu', 'tanh', 'elu'])

    activation_map = {
        'relu': nn.ReLU(),
        'tanh': nn.Tanh(),
        'elu': nn.ELU()
    }

    layers = []
    in_features = input_dim

    for i in range(n_layers):
        out_features = trial.suggest_int(f'n_units_l{i}', 32, 512)
        layers.extend([
            nn.Linear(in_features, out_features),
            nn.BatchNorm1d(out_features),
            activation_map[activation_name],
            nn.Dropout(dropout),
        ])
        in_features = out_features

    layers.append(nn.Linear(in_features, 1))
    layers.append(nn.Sigmoid())

    return nn.Sequential(*layers)

def objective_pytorch(trial, X_train_t, y_train_t, X_val_t, y_val_t, input_dim):
    """PyTorch 신경망 Optuna 목적 함수"""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    model = create_model(trial, input_dim).to(device)

    # 최적화 파라미터
    lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)
    optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'RMSprop', 'SGD'])
    batch_size = trial.suggest_categorical('batch_size', [32, 64, 128, 256])
    weight_decay = trial.suggest_float('weight_decay', 1e-8, 1e-2, log=True)

    if optimizer_name == 'Adam':
        optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    elif optimizer_name == 'RMSprop':
        optimizer = optim.RMSprop(model.parameters(), lr=lr, weight_decay=weight_decay)
    else:
        optimizer = optim.SGD(model.parameters(), lr=lr, weight_decay=weight_decay,
                              momentum=0.9)

    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)
    criterion = nn.BCELoss()

    train_dataset = TensorDataset(
        X_train_t.to(device), y_train_t.to(device).float().unsqueeze(1)
    )
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    best_val_loss = float('inf')
    patience = 10
    patience_counter = 0

    for epoch in range(100):
        model.train()
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            output = model(X_batch)
            loss = criterion(output, y_batch)
            loss.backward()
            optimizer.step()
        scheduler.step()

        # 검증
        model.eval()
        with torch.no_grad():
            val_output = model(X_val_t.to(device))
            val_loss = criterion(
                val_output, y_val_t.to(device).float().unsqueeze(1)
            ).item()

        # 조기 가지치기 보고
        trial.report(val_loss, epoch)
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

        # Early stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                break

    return best_val_loss

def run_pytorch_optuna(X_train, y_train, X_val, y_val, n_trials=50):
    """PyTorch + Optuna 통합 실행"""
    X_train_t = torch.FloatTensor(X_train.values if hasattr(X_train, 'values') else X_train)
    y_train_t = torch.FloatTensor(y_train.values if hasattr(y_train, 'values') else y_train)
    X_val_t = torch.FloatTensor(X_val.values if hasattr(X_val, 'values') else X_val)
    y_val_t = torch.FloatTensor(y_val.values if hasattr(y_val, 'values') else y_val)

    input_dim = X_train_t.shape[1]

    study = optuna.create_study(
        direction='minimize',
        pruner=optuna.pruners.HyperbandPruner(
            min_resource=5, max_resource=100, reduction_factor=3
        ),
        sampler=TPESampler(seed=42)
    )

    study.optimize(
        lambda trial: objective_pytorch(
            trial, X_train_t, y_train_t, X_val_t, y_val_t, input_dim
        ),
        n_trials=n_trials,
        show_progress_bar=True,
    )

    print(f"최적 검증 손실: {study.best_value:.4f}")
    print(f"최적 파라미터: {study.best_params}")

    return study

CMA-ES Sampler

# 연속 공간에서 더 효율적인 CMA-ES
study_cmaes = optuna.create_study(
    direction='maximize',
    sampler=CmaEsSampler(
        n_startup_trials=10,    # 초기 랜덤 탐색 수
        restart_strategy='ipop', # 재시작 전략
        seed=42
    )
)

4. Ray Tune

Ray Tune과 분산 HPO

Ray Tune은 Anyscale이 개발한 분산 하이퍼파라미터 최적화 라이브러리입니다. 여러 GPU/노드에 걸쳐 병렬 학습을 자동으로 처리합니다.

pip install ray[tune] ray[air]
import ray
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler, HyperBandScheduler
from ray.tune.search.optuna import OptunaSearch
from ray.tune.search.bayesopt import BayesOptSearch
import torch
import torch.nn as nn

ray.init(ignore_reinit_error=True)

def train_with_tune(config, data=None):
    """Ray Tune 학습 함수"""
    X_train, y_train, X_val, y_val = data

    # 모델 생성
    model = nn.Sequential(
        nn.Linear(X_train.shape[1], config['hidden_size']),
        nn.ReLU(),
        nn.Dropout(config['dropout']),
        nn.Linear(config['hidden_size'], config['hidden_size'] // 2),
        nn.ReLU(),
        nn.Linear(config['hidden_size'] // 2, 1),
        nn.Sigmoid()
    )

    optimizer = torch.optim.Adam(
        model.parameters(),
        lr=config['lr'],
        weight_decay=config['weight_decay']
    )
    criterion = nn.BCELoss()

    X_train_t = torch.FloatTensor(X_train)
    y_train_t = torch.FloatTensor(y_train).unsqueeze(1)
    X_val_t = torch.FloatTensor(X_val)
    y_val_t = torch.FloatTensor(y_val).unsqueeze(1)

    for epoch in range(config['max_epochs']):
        model.train()
        optimizer.zero_grad()
        output = model(X_train_t)
        loss = criterion(output, y_train_t)
        loss.backward()
        optimizer.step()

        if epoch % 5 == 0:
            model.eval()
            with torch.no_grad():
                val_output = model(X_val_t)
                val_loss = criterion(val_output, y_val_t).item()

            # Ray Tune에 중간 결과 보고
            tune.report(
                val_loss=val_loss,
                training_iteration=epoch,
            )

def run_ray_tune(X_train, y_train, X_val, y_val, num_samples=50):
    """Ray Tune 분산 HPO 실행"""

    config = {
        'hidden_size': tune.choice([64, 128, 256, 512]),
        'dropout': tune.uniform(0.1, 0.5),
        'lr': tune.loguniform(1e-5, 1e-1),
        'weight_decay': tune.loguniform(1e-8, 1e-3),
        'max_epochs': tune.choice([50, 100, 200]),
    }

    # ASHA (Asynchronous Successive Halving Algorithm) 스케줄러
    scheduler = ASHAScheduler(
        metric='val_loss',
        mode='min',
        max_t=200,              # 최대 에폭
        grace_period=10,        # 최소 실행 에폭
        reduction_factor=3,     # 제거 비율
    )

    reporter = CLIReporter(
        metric_columns=['val_loss', 'training_iteration'],
        max_progress_rows=10
    )

    # OptunaSearch 통합
    search_alg = OptunaSearch(metric='val_loss', mode='min')

    result = tune.run(
        tune.with_parameters(
            train_with_tune,
            data=(X_train, y_train, X_val, y_val)
        ),
        config=config,
        num_samples=num_samples,
        scheduler=scheduler,
        search_alg=search_alg,
        progress_reporter=reporter,
        verbose=1,
        resources_per_trial={'cpu': 2, 'gpu': 0},
    )

    best_trial = result.get_best_trial('val_loss', 'min', 'last')
    print(f"최적 검증 손실: {best_trial.last_result['val_loss']:.4f}")
    print(f"최적 파라미터: {best_trial.config}")

    return result

# Population Based Training (PBT)
from ray.tune.schedulers import PopulationBasedTraining

def run_pbt(X_train, y_train, X_val, y_val):
    """PBT: 학습 중 하이퍼파라미터를 동적으로 변경"""

    pbt_scheduler = PopulationBasedTraining(
        time_attr='training_iteration',
        metric='val_loss',
        mode='min',
        perturbation_interval=20,    # N 스텝마다 변이
        hyperparam_mutations={
            'lr': tune.loguniform(1e-5, 1e-1),
            'dropout': tune.uniform(0.1, 0.5),
        },
        quantile_fraction=0.25,      # 상위 25%의 파라미터로 하위 25% 교체
    )

    result = tune.run(
        tune.with_parameters(
            train_with_tune,
            data=(X_train, y_train, X_val, y_val)
        ),
        config={
            'hidden_size': 256,
            'dropout': tune.uniform(0.1, 0.5),
            'lr': tune.loguniform(1e-4, 1e-1),
            'weight_decay': 1e-5,
            'max_epochs': 200,
        },
        num_samples=8,
        scheduler=pbt_scheduler,
        verbose=1,
    )

    return result

5. AutoGluon

AutoGluon 개요

AutoGluon은 Amazon이 개발한 오픈소스 AutoML 라이브러리로, 최소한의 코드로 Kaggle 수준의 성능을 달성합니다.

pip install autogluon

표 형식 데이터 (TabularPredictor)

from autogluon.tabular import TabularPredictor
import pandas as pd

def autogluon_tabular_example(train_df, test_df, target_col, eval_metric='roc_auc'):
    """AutoGluon 표 형식 데이터 학습"""

    # 기본 사용법 (3줄로 학습!)
    predictor = TabularPredictor(
        label=target_col,
        eval_metric=eval_metric,
        path='autogluon_models/',     # 모델 저장 경로
        problem_type='binary',        # 'binary', 'multiclass', 'regression', 'softclass'
    )

    predictor.fit(
        train_data=train_df,
        time_limit=3600,              # 최대 학습 시간 (초)
        presets='best_quality',       # 품질 프리셋
        # 'best_quality': 최고 성능 (느림)
        # 'good_quality': 좋은 성능/속도 균형
        # 'medium_quality': 빠른 학습
        # 'optimize_for_deployment': 빠른 예측에 최적화
        excluded_model_types=['KNN'], # 제외할 모델 타입
        verbosity=2,
    )

    # 리더보드 출력
    leaderboard = predictor.leaderboard(test_df, silent=True)
    print(leaderboard[['model', 'score_test', 'score_val', 'pred_time_test']].head(10))

    # 예측
    predictions = predictor.predict(test_df)
    pred_proba = predictor.predict_proba(test_df)

    # 특성 중요도
    feature_importance = predictor.feature_importance(test_df)
    print(feature_importance.head(20))

    return predictor, predictions, pred_proba

# 고급 설정: 하이퍼파라미터 커스터마이징
def autogluon_advanced(train_df, test_df, target_col):
    """AutoGluon 고급 설정"""
    import lightgbm as lgb
    import xgboost as xgb

    hyperparameters = {
        'GBM': [
            {'num_boost_round': 300, 'ag_args': {'name_suffix': 'fast'}},
            {'num_boost_round': 1000, 'learning_rate': 0.03,
             'ag_args': {'name_suffix': 'slow', 'priority': 0}},
        ],
        'XGB': [
            {'n_estimators': 300, 'max_depth': 6},
        ],
        'CAT': [
            {'iterations': 500, 'depth': 6},
        ],
        'NN_TORCH': [
            {'num_epochs': 50, 'learning_rate': 1e-3,
             'dropout_prob': 0.1},
        ],
        'RF': [
            {'n_estimators': 300},
        ],
    }

    predictor = TabularPredictor(
        label=target_col,
        eval_metric='roc_auc',
        path='autogluon_advanced/',
    )

    predictor.fit(
        train_data=train_df,
        hyperparameters=hyperparameters,
        time_limit=7200,
        num_stack_levels=1,       # 스태킹 레벨 수
        num_bag_folds=5,          # 배깅 폴드 수 (0이면 배깅 없음)
        num_bag_sets=1,           # 배깅 세트 수
        verbosity=3,
    )

    return predictor

이미지 분류

from autogluon.multimodal import MultiModalPredictor

def autogluon_image_classification(train_df, test_df, label_col, image_col):
    """AutoGluon 이미지 분류"""
    predictor = MultiModalPredictor(label=label_col)

    predictor.fit(
        train_data=train_df,
        time_limit=3600,
        hyperparameters={
            'model.timm_image.checkpoint_name': 'efficientnet_b4',
            'optimization.learning_rate': 1e-4,
            'optimization.max_epochs': 20,
        }
    )

    predictions = predictor.predict(test_df)
    return predictor, predictions

# 텍스트 + 표 형식 멀티모달
def autogluon_multimodal(train_df, test_df, target_col):
    """AutoGluon 멀티모달 학습 (텍스트 + 수치형 특성)"""
    predictor = MultiModalPredictor(
        label=target_col,
        problem_type='binary',
    )

    predictor.fit(
        train_data=train_df,
        time_limit=3600,
        hyperparameters={
            'model.hf_text.checkpoint_name': 'bert-base-uncased',
        }
    )

    return predictor

6. FLAML

Microsoft FLAML

FLAML(Fast and Lightweight AutoML)은 Microsoft Research에서 개발한 AutoML 라이브러리로, 비용 효율적인 자동화에 특화되어 있습니다.

pip install flaml
from flaml import AutoML
import pandas as pd
import numpy as np

def flaml_basic_example(X_train, y_train, X_test, task='classification'):
    """FLAML 기본 사용"""
    automl = AutoML()

    automl_settings = {
        'time_budget': 300,           # 최대 학습 시간 (초)
        'metric': 'roc_auc',          # 최적화 지표
        'task': task,                 # 'classification', 'regression', 'ranking'
        'estimator_list': [           # 시도할 모델 목록
            'lgbm', 'xgboost', 'catboost',
            'rf', 'extra_tree', 'lrl1', 'lrl2',
            'kneighbor', 'prophet', 'arima'
        ],
        'log_file_name': 'flaml_log.log',
        'seed': 42,
        'n_jobs': -1,
        'verbose': 1,

        # 비용 효율적 탐색 설정
        'retrain_full': True,         # 최종 모델을 전체 데이터로 재학습
        'max_iter': 100,              # 최대 반복 수
        'ensemble': True,             # 앙상블 사용 여부
        'eval_method': 'cv',          # 'cv' 또는 'holdout'
        'n_splits': 5,                # CV 폴드 수
    }

    automl.fit(X_train, y_train, **automl_settings)

    print(f"최적 모델: {automl.best_estimator}")
    print(f"최적 손실: {automl.best_loss:.4f}")
    print(f"최적 설정: {automl.best_config}")
    print(f"총 학습 시간: {automl.time_to_find_best_model:.1f}초")

    predictions = automl.predict(X_test)
    pred_proba = automl.predict_proba(X_test)

    return automl, predictions, pred_proba

# FLAML + scikit-learn 파이프라인
def flaml_sklearn_pipeline(X_train, y_train, X_test):
    """FLAML을 scikit-learn 파이프라인에 통합"""
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import StandardScaler

    automl = AutoML()

    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('automl', automl),
    ])

    pipeline.fit(
        X_train, y_train,
        automl__time_budget=120,
        automl__metric='roc_auc',
        automl__task='classification',
    )

    return pipeline

# FLAML 커스텀 목적 함수
def flaml_custom_objective(X_train, y_train):
    """FLAML 커스텀 평가 지표"""
    from flaml.automl.data import get_output_from_log

    def custom_metric(
        X_val, y_val, estimator, labels, X_train, y_train,
        weight_val=None, weight_train=None, *args
    ):
        """F-beta 점수를 최적화하는 커스텀 지표"""
        from sklearn.metrics import fbeta_score
        y_pred = estimator.predict(X_val)
        score = fbeta_score(y_val, y_pred, beta=2, average='weighted')
        return -score, {'f2_score': score}  # (손실, 메트릭 딕셔너리)

    automl = AutoML()
    automl.fit(
        X_train, y_train,
        metric=custom_metric,
        task='classification',
        time_budget=120,
    )

    return automl

7. H2O AutoML

H2O 클러스터

H2O AutoML은 엔터프라이즈 수준의 AutoML 플랫폼으로, 다양한 알고리즘과 해석 가능성 도구를 제공합니다.

pip install h2o
import h2o
from h2o.automl import H2OAutoML
import pandas as pd

def h2o_automl_example(train_df, test_df, target_col, max_models=20):
    """H2O AutoML 사용"""

    # H2O 클러스터 시작
    h2o.init(
        nthreads=-1,              # 모든 CPU 사용
        max_mem_size='8G',        # 최대 메모리
        port=54321,
    )

    # H2O Frame으로 변환
    train_h2o = h2o.H2OFrame(train_df)
    test_h2o = h2o.H2OFrame(test_df)

    # 타겟 컬럼을 카테고리형으로 변환 (분류)
    train_h2o[target_col] = train_h2o[target_col].asfactor()

    feature_cols = [col for col in train_df.columns if col != target_col]

    # AutoML 실행
    aml = H2OAutoML(
        max_models=max_models,     # 시도할 최대 모델 수
        max_runtime_secs=3600,     # 최대 실행 시간
        seed=42,
        sort_metric='AUC',         # 정렬 기준 지표
        balance_classes=False,     # 클래스 불균형 처리

        # 포함/제외할 알고리즘
        include_algos=[
            'GBM', 'GLM', 'DRF', 'DeepLearning',
            'StackedEnsemble', 'XGBoost'
        ],
        # exclude_algos=['DeepLearning'],

        # 스태킹 설정
        keep_cross_validation_predictions=True,
        keep_cross_validation_models=True,
        nfolds=5,

        verbosity='info',
    )

    aml.fit(
        x=feature_cols,
        y=target_col,
        training_frame=train_h2o,
        validation_frame=None,     # None이면 CV 사용
        leaderboard_frame=test_h2o,
    )

    # 리더보드 출력
    lb = aml.leaderboard
    print("H2O AutoML 리더보드:")
    print(lb.head(20))

    # 최적 모델
    best_model = aml.leader
    print(f"\n최적 모델: {best_model.model_id}")

    # 예측
    predictions = best_model.predict(test_h2o)
    pred_df = predictions.as_data_frame()

    # 모델 설명
    explainability = aml.explain(test_h2o)

    # SHAP 값 (지원하는 모델)
    if hasattr(best_model, 'shap_values'):
        shap_vals = best_model.shap_values(test_h2o)

    # 모델 저장
    model_path = h2o.save_model(model=best_model, path='h2o_models/', force=True)
    print(f"모델 저장됨: {model_path}")

    return aml, best_model, pred_df

# H2O 종료
def cleanup_h2o():
    h2o.cluster().shutdown()

8. 신경망 구조 탐색 (NAS)

NAS 개요

Neural Architecture Search(NAS)는 최적의 신경망 아키텍처를 자동으로 탐색하는 기술입니다.

NAS의 세 가지 구성요소:

  1. 탐색 공간 (Search Space): 가능한 아키텍처의 범위
  2. 탐색 전략 (Search Strategy): 공간 탐색 방법 (랜덤, 진화, RL, 그래디언트)
  3. 성능 평가 (Performance Estimation): 아키텍처 평가 방법

DARTS (미분 가능한 NAS)

DARTS(Differentiable Architecture Search, Liu et al. 2019)는 아키텍처 탐색을 연속 완화(continuous relaxation)를 통해 미분 가능하게 만듭니다.

import torch
import torch.nn as nn
import torch.nn.functional as F

class MixedOperation(nn.Module):
    """DARTS의 혼합 연산 (가중 합)"""
    def __init__(self, operations):
        super().__init__()
        self.ops = nn.ModuleList(operations)
        # 아키텍처 파라미터 (alpha)
        self.alphas = nn.Parameter(torch.randn(len(operations)))

    def forward(self, x):
        weights = F.softmax(self.alphas, dim=0)
        return sum(w * op(x) for w, op in zip(weights, self.ops))

class DARTSCell(nn.Module):
    """DARTS 셀 구조"""
    def __init__(self, in_channels, out_channels):
        super().__init__()

        # 사용 가능한 연산 정의
        operations = [
            nn.Conv2d(in_channels, out_channels, 3, padding=1),
            nn.Conv2d(in_channels, out_channels, 5, padding=2),
            nn.MaxPool2d(3, stride=1, padding=1),
            nn.AvgPool2d(3, stride=1, padding=1),
            nn.Identity() if in_channels == out_channels else
                nn.Conv2d(in_channels, out_channels, 1),
        ]

        self.mixed_op = MixedOperation(operations)
        self.bn = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        return F.relu(self.bn(self.mixed_op(x)))

class SimpleDARTS(nn.Module):
    """간단한 DARTS 아키텍처"""
    def __init__(self, num_classes=10, num_cells=6):
        super().__init__()
        self.stem = nn.Conv2d(3, 64, 3, padding=1)

        self.cells = nn.ModuleList([
            DARTSCell(64, 64) for _ in range(num_cells)
        ])

        self.classifier = nn.Linear(64, num_classes)

    def forward(self, x):
        x = self.stem(x)
        for cell in self.cells:
            x = cell(x)
        x = x.mean([2, 3])  # Global Average Pooling
        return self.classifier(x)

    def arch_parameters(self):
        """아키텍처 파라미터만 반환"""
        return [p for n, p in self.named_parameters() if 'alphas' in n]

    def model_parameters(self):
        """가중치 파라미터만 반환"""
        return [p for n, p in self.named_parameters() if 'alphas' not in n]

def train_darts(model, train_loader, val_loader, epochs=50):
    """DARTS 이중 최적화 학습"""
    # 가중치 최적화기
    w_optimizer = torch.optim.SGD(
        model.model_parameters(), lr=0.025, momentum=0.9, weight_decay=3e-4
    )
    # 아키텍처 파라미터 최적화기
    a_optimizer = torch.optim.Adam(
        model.arch_parameters(), lr=3e-4, betas=(0.5, 0.999), weight_decay=1e-3
    )

    w_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
        w_optimizer, T_max=epochs
    )

    for epoch in range(epochs):
        model.train()
        train_iter = iter(train_loader)
        val_iter = iter(val_loader)

        for step in range(min(len(train_loader), len(val_loader))):
            # 1. 아키텍처 파라미터 업데이트 (검증 데이터)
            try:
                X_val, y_val = next(val_iter)
            except StopIteration:
                val_iter = iter(val_loader)
                X_val, y_val = next(val_iter)

            a_optimizer.zero_grad()
            val_logits = model(X_val)
            val_loss = F.cross_entropy(val_logits, y_val)
            val_loss.backward()
            a_optimizer.step()

            # 2. 가중치 파라미터 업데이트 (훈련 데이터)
            X_train, y_train = next(train_iter)

            w_optimizer.zero_grad()
            train_logits = model(X_train)
            train_loss = F.cross_entropy(train_logits, y_train)
            train_loss.backward()
            nn.utils.clip_grad_norm_(model.model_parameters(), 5.0)
            w_optimizer.step()

        w_scheduler.step()

        if epoch % 10 == 0:
            print(f"Epoch {epoch}: Train Loss = {train_loss.item():.4f}")

    # 최적 아키텍처 추출
    for i, cell in enumerate(model.cells):
        weights = F.softmax(cell.mixed_op.alphas, dim=0).detach()
        best_op = weights.argmax().item()
        print(f"Cell {i}: 최적 연산 인덱스 = {best_op}, 가중치 = {weights}")

    return model

One-Shot NAS

class SuperNetwork(nn.Module):
    """One-Shot NAS: 단일 슈퍼네트워크에서 서브네트워크 샘플링"""

    def __init__(self, num_classes=10, max_channels=256):
        super().__init__()
        self.max_channels = max_channels

        # 가능한 채널 수 옵션
        self.channel_options = [64, 128, 256]

        # 전체 파라미터로 레이어 정의
        self.conv1 = nn.Conv2d(3, max_channels, 3, padding=1)
        self.conv2 = nn.Conv2d(max_channels, max_channels, 3, padding=1)
        self.conv3 = nn.Conv2d(max_channels, max_channels, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(max_channels)
        self.bn2 = nn.BatchNorm2d(max_channels)
        self.bn3 = nn.BatchNorm2d(max_channels)
        self.classifier = nn.Linear(max_channels, num_classes)

    def forward(self, x, arch_config=None):
        """arch_config: 각 레이어의 채널 수"""
        if arch_config is None:
            # 무작위 서브네트워크 샘플링
            arch_config = {
                'conv1_out': torch.randint(0, len(self.channel_options), (1,)).item(),
                'conv2_out': torch.randint(0, len(self.channel_options), (1,)).item(),
            }

        # 슬라이스된 채널로 순전파
        c1 = self.channel_options[arch_config['conv1_out']]
        c2 = self.channel_options[arch_config['conv2_out']]

        x = F.relu(self.bn1(self.conv1(x)[:, :c1]))
        x = F.relu(self.bn2(self.conv2(
            F.pad(x, (0, 0, 0, 0, 0, self.max_channels - c1))
        )[:, :c2]))
        x = F.relu(self.bn3(self.conv3(
            F.pad(x, (0, 0, 0, 0, 0, self.max_channels - c2))
        )))
        x = x.mean([2, 3])
        return self.classifier(x)

9. 파이프라인 자동화

Auto-sklearn

pip install auto-sklearn
import autosklearn.classification
import autosklearn.regression
from autosklearn.metrics import roc_auc, mean_squared_error

def auto_sklearn_example(X_train, y_train, X_test, task='classification'):
    """Auto-sklearn: scikit-learn 기반 AutoML"""

    if task == 'classification':
        automl = autosklearn.classification.AutoSklearnClassifier(
            time_left_for_this_task=3600,    # 전체 시간 제한 (초)
            per_run_time_limit=360,          # 단일 모델 시간 제한
            n_jobs=-1,
            memory_limit=8192,               # 메모리 제한 (MB)
            ensemble_size=50,                # 앙상블 크기
            ensemble_nbest=50,               # 앙상블에 사용할 최적 모델 수
            max_models_on_disc=50,

            # 포함/제외 알고리즘
            include={
                'classifier': [
                    'random_forest', 'gradient_boosting',
                    'extra_trees', 'liblinear_svc'
                ]
            },
            # exclude={'classifier': ['k_nearest_neighbors']},

            # 메트릭
            metric=roc_auc,
            resampling_strategy='cv',
            resampling_strategy_arguments={'folds': 5},

            seed=42,
        )
    else:
        automl = autosklearn.regression.AutoSklearnRegressor(
            time_left_for_this_task=3600,
            per_run_time_limit=360,
            n_jobs=-1,
            metric=mean_squared_error,
            seed=42,
        )

    automl.fit(X_train, y_train)

    # 통계 출력
    print(automl.sprint_statistics())
    print(automl.leaderboard())

    predictions = automl.predict(X_test)
    if task == 'classification':
        pred_proba = automl.predict_proba(X_test)

    return automl, predictions

10. LLM 시대의 AutoML

LLM을 AutoML에 활용

대규모 언어 모델(LLM)은 AutoML에 새로운 가능성을 열고 있습니다:

  1. 하이퍼파라미터 제안: LLM이 데이터셋 특성에 맞는 초기 하이퍼파라미터 추천
  2. 특성 엔지니어링: LLM이 도메인 지식을 활용하여 새로운 특성 생성 아이디어 제안
  3. 코드 생성: 전처리, 모델 학습 코드 자동 생성
  4. 에러 디버깅: 학습 실패 원인 분석 및 해결책 제안
# LLM 기반 하이퍼파라미터 최적화 (개념 코드)
from openai import OpenAI

def llm_hyperparameter_suggestion(dataset_description, model_type, previous_results=None):
    """LLM을 활용한 하이퍼파라미터 제안"""
    client = OpenAI()

    prompt = f"""
    데이터셋 특성:
    {dataset_description}

    모델 타입: {model_type}

    이전 시도 결과:
    {previous_results if previous_results else '없음'}

    위 정보를 바탕으로 {model_type}의 최적 하이퍼파라미터를 JSON 형식으로 제안해주세요.
    """

    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system",
             "content": "당신은 머신러닝 전문가입니다. 하이퍼파라미터 최적화를 도와주세요."},
            {"role": "user", "content": prompt}
        ],
        response_format={"type": "json_object"}
    )

    return response.choices[0].message.content

# Few-shot learning을 활용한 AutoML
def llm_feature_engineering(df_description, target_description):
    """LLM이 제안하는 피처 엔지니어링"""
    client = OpenAI()

    prompt = f"""
    데이터프레임 컬럼:
    {df_description}

    예측 목표:
    {target_description}

    유용할 수 있는 파생 특성과 해당 Python 코드를 제안해주세요.
    """

    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system",
             "content": "머신러닝 특성 엔지니어링 전문가로서 유용한 파생 특성을 제안하세요."},
            {"role": "user", "content": prompt}
        ]
    )

    return response.choices[0].message.content

# AutoML 에이전트 (실험적)
class AutoMLAgent:
    """LLM 기반 AutoML 에이전트 (개념 구현)"""

    def __init__(self, llm_client, X_train, y_train, X_val, y_val, max_iterations=10):
        self.client = llm_client
        self.X_train = X_train
        self.y_train = y_train
        self.X_val = X_val
        self.y_val = y_val
        self.max_iterations = max_iterations
        self.history = []
        self.best_score = 0
        self.best_params = None

    def get_next_config(self):
        """LLM에게 다음 시도할 설정 요청"""
        history_str = "\n".join([
            f"Iteration {i+1}: params={h['params']}, score={h['score']:.4f}"
            for i, h in enumerate(self.history[-5:])  # 최근 5개
        ])

        prompt = f"""
        현재까지 시도한 LightGBM 파라미터와 결과:
        {history_str if history_str else '없음 (첫 번째 시도)'}

        다음에 시도할 파라미터 조합을 JSON으로 제안하세요.
        가능한 범위: num_leaves(10-300), learning_rate(0.001-0.3),
        n_estimators(100-2000), subsample(0.5-1.0), colsample_bytree(0.5-1.0)
        """

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "당신은 HPO 전문가입니다."},
                {"role": "user", "content": prompt}
            ],
            response_format={"type": "json_object"}
        )

        import json
        return json.loads(response.choices[0].message.content)

    def evaluate(self, params):
        """파라미터 평가"""
        import lightgbm as lgb
        from sklearn.metrics import roc_auc_score

        model = lgb.LGBMClassifier(**params, random_state=42, verbose=-1)
        model.fit(self.X_train, self.y_train)
        preds = model.predict_proba(self.X_val)[:, 1]
        return roc_auc_score(self.y_val, preds)

    def run(self):
        """에이전트 실행"""
        for i in range(self.max_iterations):
            config = self.get_next_config()
            score = self.evaluate(config)
            self.history.append({'params': config, 'score': score})

            if score > self.best_score:
                self.best_score = score
                self.best_params = config
                print(f"Iteration {i+1}: 새로운 최적 점수 {score:.4f}")

        print(f"\n최적 점수: {self.best_score:.4f}")
        print(f"최적 파라미터: {self.best_params}")
        return self.best_params

마무리

이 가이드에서 AutoML의 전체 생태계를 다루었습니다:

  1. 하이퍼파라미터 최적화: 그리드 서치에서 베이지안 최적화까지 체계적 이해
  2. Optuna: 가장 유연한 Python-native HPO 프레임워크
  3. Ray Tune: 분산 환경에서의 대규모 HPO
  4. AutoGluon: Amazon의 강력한 멀티모달 AutoML
  5. FLAML: Microsoft의 비용 효율적 AutoML
  6. H2O AutoML: 기업용 AutoML 플랫폼
  7. NAS: 최적 신경망 아키텍처 자동 탐색
  8. LLM + AutoML: 차세대 지능형 AutoML

핵심 권장 사항:

  • 시간 제약이 있다면 FLAML 또는 AutoGluon (good_quality 프리셋) 사용
  • 특정 모델을 최적화하려면 Optuna 사용
  • 분산 환경이나 대규모 실험에는 Ray Tune 사용
  • 기업 환경에서는 H2O AutoML의 해석 가능성 도구 활용
  • LLM 기반 AutoML은 아직 연구 단계지만 주목해야 할 방향

AutoML은 도구이지 마법이 아닙니다. 도메인 지식, 데이터 품질, 올바른 평가 프레임워크가 여전히 가장 중요합니다.

참고자료