Tabular Data ML Complete Guide: Master XGBoost, LightGBM, CatBoost, and TabNet

1. Overview of Tabular Data Machine Learning

Why Do Tree-Based Models Excel at Tabular Data?

Tabular data, the traditional format of rows and columns, accounts for more than 80% of real-world business problems. It appears across domains such as financial fraud detection, customer churn prediction, real estate pricing, and medical diagnosis.

Deep learning has revolutionized images, text, and audio, yet Gradient Boosted Tree models still dominate tabular data. Here is why:

Strengths of tree-based models:

  1. Irregular decision boundaries: trees partition the feature space with axis-aligned splits, naturally capturing complex non-linear relationships.
  2. Scale invariance: they work well without feature normalization or standardization, because gradient boosting uses only the rank ordering of the data (see the sketch after this list).
  3. Missing value handling: XGBoost, LightGBM, and CatBoost all handle missing values internally.
  4. Categorical features: label encoding alone is often enough for good performance.
  5. Interpretability: feature importance and SHAP values let you explain the model.
  6. Overfitting resistance: ensembles are more robust to overfitting than single models.
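
Point 2 is easy to check empirically. A minimal sketch (ours, with synthetic data): because a tree's splits depend only on the rank ordering of each feature, a monotonic transform such as log1p leaves its fit unchanged.

import numpy as np
from sklearn.tree import DecisionTreeClassifier

rng = np.random.default_rng(42)
X = rng.exponential(scale=10.0, size=(500, 3))   # positive, heavily skewed features
y = (X[:, 0] + X[:, 1] > 15).astype(int)

tree_raw = DecisionTreeClassifier(max_depth=4, random_state=0).fit(X, y)
tree_log = DecisionTreeClassifier(max_depth=4, random_state=0).fit(np.log1p(X), y)

# log1p preserves the rank order of every feature, so the fits agree exactly
print(tree_raw.score(X, y), tree_log.score(np.log1p(X), y))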

EDA (Exploratory Data Analysis) Strategy

It is important to understand the data deeply before serious model training.

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats

# Basic overview
def eda_overview(df):
    print("=== Data Overview ===")
    print(f"Shape: {df.shape}")
    print(f"\nData types:\n{df.dtypes}")
    print(f"\nMissing counts:\n{df.isnull().sum()}")
    print(f"\nMissing ratios (%):\n{(df.isnull().mean() * 100).round(2)}")
    print(f"\nNumeric summary:\n{df.describe()}")
    print("\nCategorical feature cardinality:")
    for col in df.select_dtypes(include='object').columns:
        print(f"  {col}: {df[col].nunique()} unique values")

# Inspect the target distribution
def plot_target_distribution(df, target_col):
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))

    # Histogram of the distribution
    axes[0].hist(df[target_col], bins=50, color='steelblue', edgecolor='white')
    axes[0].set_title(f'{target_col} Distribution')
    axes[0].set_xlabel(target_col)

    # Q-Q plot (normality check)
    stats.probplot(df[target_col].dropna(), dist="norm", plot=axes[1])
    axes[1].set_title('Q-Q Plot (Normality Check)')

    plt.tight_layout()
    plt.show()

# Correlations among numeric features
def plot_correlation_matrix(df, target_col, top_n=20):
    numeric_df = df.select_dtypes(include=[np.number])

    # Keep the top-N features by absolute correlation with the target
    corr_with_target = abs(numeric_df.corr()[target_col]).sort_values(ascending=False)
    top_features = corr_with_target.head(top_n + 1).index.tolist()

    corr_matrix = numeric_df[top_features].corr()

    plt.figure(figsize=(12, 10))
    mask = np.triu(np.ones_like(corr_matrix, dtype=bool))
    sns.heatmap(
        corr_matrix, mask=mask, annot=True, fmt='.2f',
        cmap='RdYlBu_r', center=0, square=True, linewidths=0.5
    )
    plt.title(f'Top {top_n} Feature Correlations')
    plt.tight_layout()
    plt.show()

# Outlier detection (IQR method)
def detect_outliers_iqr(df, columns):
    outlier_info = {}
    for col in columns:
        Q1 = df[col].quantile(0.25)
        Q3 = df[col].quantile(0.75)
        IQR = Q3 - Q1
        lower = Q1 - 1.5 * IQR
        upper = Q3 + 1.5 * IQR
        outliers = df[(df[col] < lower) | (df[col] > upper)]
        outlier_info[col] = {
            'count': len(outliers),
            'ratio': len(outliers) / len(df),
            'lower': lower,
            'upper': upper
        }
    return pd.DataFrame(outlier_info).T

Missing Value Handling Strategies

from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer

# 1. Simple imputation
def simple_imputation(df):
    # Numeric: impute with the median
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    num_imputer = SimpleImputer(strategy='median')
    df[numeric_cols] = num_imputer.fit_transform(df[numeric_cols])

    # Categorical: impute with the mode
    cat_cols = df.select_dtypes(include='object').columns
    cat_imputer = SimpleImputer(strategy='most_frequent')
    df[cat_cols] = cat_imputer.fit_transform(df[cat_cols])

    return df

# 2. KNN imputation (effective on small datasets)
def knn_imputation(df, n_neighbors=5):
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    imputer = KNNImputer(n_neighbors=n_neighbors)
    df[numeric_cols] = imputer.fit_transform(df[numeric_cols])
    return df

# 3. MICE (Multiple Imputation by Chained Equations)
def mice_imputation(df, max_iter=10):
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    imputer = IterativeImputer(max_iter=max_iter, random_state=42)
    df[numeric_cols] = imputer.fit_transform(df[numeric_cols])
    return df

# 4. Missing-indicator features (when the missingness pattern itself is informative)
def add_missing_indicators(df):
    cols_with_missing = df.columns[df.isnull().any()].tolist()
    for col in cols_with_missing:
        df[f'{col}_missing'] = df[col].isnull().astype(int)
    return df

2. Decision Trees

ID3 and CART Algorithms

A decision tree recursively partitions the data to build a tree structure. There are two major algorithms:

ID3 (Iterative Dichotomiser 3):

  • Splits on Information Gain
  • Handles only categorical features (multi-way splits)
  • Works only with categorical targets

CART (Classification and Regression Trees):

  • Classification: Gini impurity
  • Regression: mean squared error (MSE)
  • Performs only binary splits (always two child nodes)
  • The algorithm scikit-learn implements

Information Gain and Gini Impurity

Entropy and information gain:

Entropy measures the impurity of a dataset. With k classes:

H(S) = -sum(p_i * log2(p_i))

Information gain is the reduction in entropy from a split:

IG(S, A) = H(S) - sum(|S_v|/|S| * H(S_v))

Gini impurity:

Gini(S) = 1 - sum(p_i^2)
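
To make these formulas concrete, here is a small sketch (ours, not from the original text) that computes entropy, Gini impurity, and the information gain of a candidate split on a toy label array:

import numpy as np

def entropy(labels):
    # H(S) = -sum(p_i * log2(p_i)) over observed class proportions
    _, counts = np.unique(labels, return_counts=True)
    p = counts / counts.sum()
    return -np.sum(p * np.log2(p))

def gini(labels):
    # Gini(S) = 1 - sum(p_i^2)
    _, counts = np.unique(labels, return_counts=True)
    p = counts / counts.sum()
    return 1.0 - np.sum(p ** 2)

def information_gain(parent, left, right):
    # IG(S, A) = H(S) - weighted average entropy of the child nodes
    n = len(parent)
    return entropy(parent) - (len(left) / n) * entropy(left) \
                           - (len(right) / n) * entropy(right)

y = np.array([0, 0, 0, 0, 1, 1, 1, 1])
print(entropy(y))                         # 1.0 (perfectly mixed)
print(gini(y))                            # 0.5
print(information_gain(y, y[:4], y[4:]))  # 1.0 (a perfect split)
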
import numpy as np
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.tree import export_text, plot_tree
import matplotlib.pyplot as plt

# Train a decision tree
def train_decision_tree(X_train, y_train, task='classification'):
    if task == 'classification':
        model = DecisionTreeClassifier(
            criterion='gini',      # 'gini' or 'entropy'
            max_depth=5,           # maximum tree depth
            min_samples_split=10,  # minimum samples required to split
            min_samples_leaf=5,    # minimum samples per leaf node
            max_features=None,     # maximum number of features to consider
            random_state=42
        )
    else:
        model = DecisionTreeRegressor(
            criterion='squared_error',
            max_depth=5,
            min_samples_split=10,
            min_samples_leaf=5,
            random_state=42
        )

    model.fit(X_train, y_train)
    return model

# Tree visualization
def visualize_tree(model, feature_names, class_names=None, max_depth=3):
    plt.figure(figsize=(20, 10))
    plot_tree(
        model,
        feature_names=feature_names,
        class_names=class_names,
        filled=True,
        rounded=True,
        max_depth=max_depth,
        fontsize=10
    )
    plt.title('Decision Tree Visualization')
    plt.tight_layout()
    plt.show()

    # Print a text dump of the tree
    print(export_text(model, feature_names=feature_names, max_depth=3))

3. Random Forest

Bagging and Feature Randomization

Random Forest is an ensemble method that combines bagging (Bootstrap Aggregating) with feature randomization.

Core ideas (a from-scratch sketch follows this list):

  1. Generate bootstrap samples (sampling with replacement) from the training data
  2. Train an independent decision tree on each sample
  3. At each split, randomly consider only sqrt(n_features) of the features
  4. Aggregate predictions by averaging (regression) or majority vote (classification)
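
Before reaching for the library, here is a toy sketch of the four steps above (illustrative only; note that a real random forest re-draws the feature subset at every split, while this version samples once per tree):

import numpy as np
from sklearn.tree import DecisionTreeClassifier

def manual_random_forest(X, y, n_trees=50, seed=42):
    """Toy forest over numpy arrays: bootstrap rows + a random feature subset."""
    rng = np.random.default_rng(seed)
    n, d = X.shape
    k = max(1, int(np.sqrt(d)))                      # sqrt(n_features), step 3
    trees = []
    for _ in range(n_trees):
        rows = rng.integers(0, n, size=n)            # step 1: bootstrap sample
        cols = rng.choice(d, size=k, replace=False)  # feature randomization
        tree = DecisionTreeClassifier(random_state=0).fit(X[np.ix_(rows, cols)], y[rows])
        trees.append((tree, cols))                   # step 2: one independent tree
    return trees

def manual_rf_predict(trees, X):
    """Step 4: majority vote across trees (binary 0/1 labels assumed)."""
    votes = np.stack([tree.predict(X[:, cols]) for tree, cols in trees])
    return (votes.mean(axis=0) >= 0.5).astype(int)
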
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor

def train_random_forest(X_train, y_train, task='classification'):
    if task == 'classification':
        model = RandomForestClassifier(
            n_estimators=300,         # number of trees
            max_depth=None,           # unlimited depth (watch for overfitting)
            min_samples_split=2,
            min_samples_leaf=1,
            max_features='sqrt',      # features considered at each split
            bootstrap=True,           # use bootstrap sampling
            oob_score=True,           # compute the out-of-bag score
            n_jobs=-1,                # parallel training
            random_state=42
        )
    else:
        model = RandomForestRegressor(
            n_estimators=300,
            max_features='sqrt',
            oob_score=True,
            n_jobs=-1,
            random_state=42
        )

    model.fit(X_train, y_train)

    print(f"OOB Score: {model.oob_score_:.4f}")
    return model

# Feature importance analysis
def plot_feature_importance(model, feature_names, top_n=20):
    importances = pd.Series(
        model.feature_importances_,
        index=feature_names
    ).sort_values(ascending=False)

    plt.figure(figsize=(10, 8))
    importances.head(top_n).plot(kind='barh', color='steelblue')
    plt.title(f'Random Forest Feature Importance (Top {top_n})')
    plt.xlabel('Importance')
    plt.gca().invert_yaxis()
    plt.tight_layout()
    plt.show()

    return importances

# Permutation importance (a more reliable importance measure)
from sklearn.inspection import permutation_importance

def compute_permutation_importance(model, X_val, y_val, feature_names):
    result = permutation_importance(
        model, X_val, y_val,
        n_repeats=10,
        random_state=42,
        n_jobs=-1
    )

    perm_imp = pd.DataFrame({
        'feature': feature_names,
        'importance_mean': result.importances_mean,
        'importance_std': result.importances_std
    }).sort_values('importance_mean', ascending=False)

    return perm_imp

4. Gradient Boosting

AdaBoost

AdaBoost, the original boosting algorithm, sequentially combines weak learners, re-weighting the training samples so that each new learner focuses on the samples its predecessors got wrong. A short sketch follows.
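
A minimal scikit-learn sketch (ours; assumes scikit-learn >= 1.2, where the weak learner is passed as estimator rather than the older base_estimator):

from sklearn.datasets import make_classification
from sklearn.ensemble import AdaBoostClassifier
from sklearn.tree import DecisionTreeClassifier

X, y = make_classification(n_samples=2000, n_features=20, random_state=42)

ada = AdaBoostClassifier(
    estimator=DecisionTreeClassifier(max_depth=1),  # a decision stump as the weak learner
    n_estimators=200,
    learning_rate=0.5,
    random_state=42,
)
ada.fit(X, y)
print(f"Train accuracy: {ada.score(X, y):.3f}")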

Mathematical Principles of Gradient Boosting

Gradient boosting adds trees sequentially in the direction that minimizes the gradient of the loss function.

At each stage, the new tree is added to the previous ensemble, scaled by the learning rate:

F_m(x) = F_{m-1}(x) + learning_rate * h_m(x)

Here the stage-m tree h_m is fitted to the negative gradient of the loss, the pseudo-residuals:

r_i = -[dL(y_i, F(x_i)) / dF(x_i)]

With MSE loss in regression, the pseudo-residual is the target minus the current ensemble prediction, i.e. the ordinary residual. With log loss in binary classification, it is the target minus the sigmoid of the current ensemble score.
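
The loop below is a from-scratch sketch of these equations for MSE regression (illustrative, not a library implementation): each new tree fits the current residuals, and the ensemble grows by learning-rate-scaled steps.

import numpy as np
from sklearn.tree import DecisionTreeRegressor

def fit_gbm_mse(X, y, n_trees=100, learning_rate=0.1, max_depth=3):
    """Gradient boosting for MSE loss: every tree fits the current residuals."""
    f0 = y.mean()                       # initial constant prediction F_0
    pred = np.full(len(y), f0)
    trees = []
    for _ in range(n_trees):
        residuals = y - pred            # pseudo-residuals (= plain residuals for MSE)
        tree = DecisionTreeRegressor(max_depth=max_depth).fit(X, residuals)
        pred += learning_rate * tree.predict(X)    # F_m = F_{m-1} + lr * h_m
        trees.append(tree)
    return f0, trees

def predict_gbm(f0, trees, X, learning_rate=0.1):
    return f0 + learning_rate * sum(tree.predict(X) for tree in trees)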

from sklearn.ensemble import GradientBoostingClassifier, GradientBoostingRegressor

def train_sklearn_gbm(X_train, y_train, task='classification'):
    if task == 'classification':
        model = GradientBoostingClassifier(
            n_estimators=200,
            learning_rate=0.1,
            max_depth=3,
            subsample=0.8,        # fraction of samples used per tree
            max_features='sqrt',
            random_state=42
        )
    else:
        model = GradientBoostingRegressor(
            n_estimators=200,
            learning_rate=0.1,
            max_depth=3,
            subsample=0.8,
            random_state=42
        )

    model.fit(X_train, y_train)
    return model

5. XGBoost

XGBoost's Innovations

XGBoost (eXtreme Gradient Boosting) is a high-performance gradient boosting library developed by Chen & Guestrin (2016). It introduced major improvements over vanilla GBM:

  1. Regularization terms: L1 (Lasso) and L2 (Ridge) penalties in the objective to curb overfitting
  2. Second-order Taylor expansion: approximates the loss to second order for a more accurate tree-structure search (see the custom-objective sketch after this list)
  3. Missing value handling: learns whether to route missing samples to the left or right child
  4. Parallelism: parallel search over feature split points (the tree-structure search itself is sequential)
  5. Cache optimization: optimized memory access patterns for speed
  6. Block structure: compressed column storage for sparse data
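
Point 2 shows up directly in XGBoost's custom-objective API, which expects both the first derivative (grad) and the second derivative (hess) of the loss. A minimal sketch for binary log loss (hand-rolling what the built-in 'binary:logistic' already provides, purely for illustration):

import numpy as np
import xgboost as xgb
from sklearn.datasets import make_classification

def logistic_obj(y_true, y_pred):
    """Custom binary log-loss objective: XGBoost consumes the first derivative
    (grad) AND the second derivative (hess) of the loss."""
    p = 1.0 / (1.0 + np.exp(-y_pred))  # raw margin -> probability
    grad = p - y_true                   # dL/dF
    hess = p * (1.0 - p)                # d2L/dF2
    return grad, hess

X, y = make_classification(n_samples=1000, n_features=10, random_state=42)
model = xgb.XGBClassifier(n_estimators=50, objective=logistic_obj)
model.fit(X, y)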

Complete Hyperparameter Reference

import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score

# XGBoost hyperparameter guide
xgb_params = {
    # === Learning task ===
    'objective': 'binary:logistic',   # objective function
    # classification: 'binary:logistic', 'multi:softprob'
    # regression: 'reg:squarederror', 'reg:absoluteerror', 'reg:tweedie'
    # ranking: 'rank:pairwise'

    'eval_metric': 'auc',             # evaluation metric
    # 'logloss', 'rmse', 'mae', 'auc', 'aucpr', 'merror', 'mlogloss'

    # === Tree parameters ===
    'n_estimators': 1000,             # number of trees (use with early stopping)
    'max_depth': 6,                   # maximum tree depth (default: 6)
    'min_child_weight': 1,            # minimum sum of instance weight per leaf (guards against overfitting)
    'gamma': 0,                       # minimum loss reduction to split (0 = always split)
    'max_delta_step': 0,              # max delta step per tree weight (useful for imbalanced data)

    # === Sampling ===
    'subsample': 0.8,                 # row sampling ratio per tree (0.5-0.9)
    'colsample_bytree': 0.8,          # column sampling ratio per tree
    'colsample_bylevel': 1.0,         # column sampling per level
    'colsample_bynode': 1.0,          # column sampling per split node

    # === Regularization ===
    'reg_alpha': 0,                   # L1 regularization (feature-selection effect)
    'reg_lambda': 1,                  # L2 regularization (default: 1)

    # === Learning rate ===
    'learning_rate': 0.01,            # step size shrinkage (eta); lower = less overfitting

    # === Other ===
    'scale_pos_weight': 1,            # for imbalanced data: negative/positive ratio
    'tree_method': 'hist',            # 'hist' (fast), 'exact', 'approx' ('gpu_hist' pre-2.0)
    'seed': 42,
    'n_jobs': -1,
}

def train_xgboost_full(X, y, task='binary'):
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y if task=='binary' else None
    )

    if task == 'binary':
        objective = 'binary:logistic'
        eval_metric = 'auc'
    elif task == 'multiclass':
        objective = 'multi:softprob'
        eval_metric = 'mlogloss'
    else:  # regression
        objective = 'reg:squarederror'
        eval_metric = 'rmse'

    model = xgb.XGBClassifier(        # for regression, use xgb.XGBRegressor instead
        n_estimators=1000,
        max_depth=6,
        min_child_weight=1,
        gamma=0,
        subsample=0.8,
        colsample_bytree=0.8,
        reg_alpha=0.1,
        reg_lambda=1.0,
        learning_rate=0.01,
        objective=objective,
        eval_metric=eval_metric,
        tree_method='hist',
        early_stopping_rounds=50,     # xgboost >= 2.0 takes this in the constructor, not fit()
        random_state=42,
        n_jobs=-1,
    )

    # Early stopping against the validation set
    model.fit(
        X_train, y_train,
        eval_set=[(X_train, y_train), (X_val, y_val)],
        verbose=100,
    )

    print(f"Best iteration: {model.best_iteration}")
    print(f"Best score: {model.best_score:.4f}")

    return model, X_val, y_val

# Model interpretation with SHAP values
def explain_with_shap(model, X_val, feature_names):
    import shap

    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_val)

    # Summary plot
    plt.figure(figsize=(10, 8))
    shap.summary_plot(shap_values, X_val, feature_names=feature_names, show=False)
    plt.tight_layout()
    plt.show()

    # Feature importance (mean absolute SHAP)
    mean_abs_shap = pd.DataFrame({
        'feature': feature_names,
        'importance': np.abs(shap_values).mean(axis=0)
    }).sort_values('importance', ascending=False)

    return mean_abs_shap

GPU-Accelerated XGBoost

# XGBoost on GPU (requires an NVIDIA GPU)
# XGBoost 2.0+ style: tree_method='hist' plus device='cuda'
# (tree_method='gpu_hist' / predictor='gpu_predictor' are the deprecated pre-2.0 spellings)
model_gpu = xgb.XGBClassifier(
    n_estimators=1000,
    tree_method='hist',
    device='cuda',
    random_state=42
)

6. LightGBM

Leaf-wise vs Level-wise Tree Growth

LightGBM is a fast gradient boosting framework developed by Microsoft. Its most important innovation is leaf-wise (best-first) tree growth.

Level-wise (traditional):

  • Splits every leaf at the current depth simultaneously
  • Produces balanced trees
  • Wastes computation (splits nodes even when the loss improvement is small)

Leaf-wise (LightGBM):

  • Splits only the single leaf with the largest loss reduction
  • Can produce unbalanced trees
  • Achieves lower loss for the same number of leaves
  • Watch for overfitting (setting max_depth is recommended)

GOSS and EFB

GOSS (Gradient-based One-Side Sampling), configured in the sketch after this list:

  • Keeps every sample with a large absolute gradient (high information content)
  • Randomly subsamples the small-gradient samples (already well learned)
  • Shrinks the dataset while preserving accuracy

EFB (Exclusive Feature Bundling):

  • Bundles sparse features that are never non-zero at the same time
  • Reduces the feature count for speed (especially effective on sparse data)
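
Enabling GOSS is a configuration choice rather than extra code. A minimal sketch, with the caveat that the spelling is version-dependent (LightGBM 4.x selects it via data_sample_strategy; 3.x used boosting_type='goss'):

import lightgbm as lgb

# LightGBM >= 4.0: select GOSS via data_sample_strategy
goss_params = {
    'objective': 'binary',
    'boosting_type': 'gbdt',
    'data_sample_strategy': 'goss',  # keep big-gradient rows, subsample the rest
    'top_rate': 0.2,                 # fraction of largest-gradient samples always kept
    'other_rate': 0.1,               # fraction sampled from the remaining rows
    'verbose': -1,
}
# On LightGBM 3.x, use {'boosting_type': 'goss'} instead.
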
import lightgbm as lgb
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import roc_auc_score
import numpy as np

# LightGBM hyperparameter guide
lgb_params = {
    # === Core ===
    'objective': 'binary',           # objective function
    # 'binary', 'multiclass', 'regression', 'regression_l1', 'huber'
    # 'cross_entropy', 'mape', 'gamma', 'tweedie'

    'metric': 'auc',                 # evaluation metric
    # 'auc', 'binary_logloss', 'rmse', 'mae', 'mse', 'multi_logloss'

    # === Tree parameters ===
    'n_estimators': 1000,
    'num_leaves': 31,                # number of leaves (keep below 2^max_depth)
    'max_depth': -1,                 # -1 = unlimited (control complexity via num_leaves)
    'min_child_samples': 20,         # minimum samples per leaf (guards against overfitting)
    'min_child_weight': 0.001,       # minimum sum of hessian per leaf
    'max_bin': 255,                  # maximum histogram bins

    # === Sampling ===
    'subsample': 0.8,                # row sampling ratio
    'subsample_freq': 1,             # how often to resample rows
    'colsample_bytree': 0.8,         # column (feature) sampling ratio

    # === Regularization ===
    'reg_alpha': 0.1,                # L1 regularization
    'reg_lambda': 0.1,               # L2 regularization
    'min_split_gain': 0.0,           # minimum gain to split

    # === Learning rate ===
    'learning_rate': 0.01,

    # === Categorical features ===
    'cat_smooth': 10,                # smoothing for categorical features

    # === Advanced ===
    'boosting_type': 'gbdt',         # 'gbdt', 'rf', 'dart', 'goss'
    'is_unbalance': False,           # imbalanced data
    'scale_pos_weight': 1,

    # === System ===
    'device': 'cpu',                 # 'cpu', 'gpu', 'cuda'
    'n_jobs': -1,
    'random_state': 42,
    'verbose': -1,
}

def train_lightgbm_cv(X, y, n_folds=5):
    """Stratified K-Fold CV로 LightGBM 학습"""
    skf = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=42)

    oof_preds = np.zeros(len(X))
    models = []

    for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
        print(f"\n--- Fold {fold + 1}/{n_folds} ---")

        X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
        y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]

        model = lgb.LGBMClassifier(
            n_estimators=2000,
            num_leaves=31,
            learning_rate=0.05,
            subsample=0.8,
            colsample_bytree=0.8,
            reg_alpha=0.1,
            reg_lambda=0.1,
            min_child_samples=20,
            random_state=42,
            n_jobs=-1,
            verbose=-1,
        )

        model.fit(
            X_train, y_train,
            eval_set=[(X_val, y_val)],
            callbacks=[
                lgb.early_stopping(stopping_rounds=100, verbose=True),
                lgb.log_evaluation(period=200),
            ],
        )

        oof_preds[val_idx] = model.predict_proba(X_val)[:, 1]
        models.append(model)

        fold_auc = roc_auc_score(y_val, oof_preds[val_idx])
        print(f"Fold {fold+1} AUC: {fold_auc:.4f}")

    overall_auc = roc_auc_score(y, oof_preds)
    print(f"\nOverall OOF AUC: {overall_auc:.4f}")

    return models, oof_preds

# Categorical feature handling
def lgbm_with_categorical(X_train, y_train, X_val, y_val, cat_features):
    """Use LightGBM's built-in categorical feature handling"""

    # Encode the categorical features as integer codes
    for col in cat_features:
        X_train[col] = X_train[col].astype('category').cat.codes
        X_val[col] = X_val[col].astype('category').cat.codes

    train_data = lgb.Dataset(
        X_train, y_train,
        categorical_feature=cat_features,
        free_raw_data=False
    )
    val_data = lgb.Dataset(
        X_val, y_val,
        reference=train_data,
        categorical_feature=cat_features
    )

    params = {
        'objective': 'binary',
        'metric': 'auc',
        'num_leaves': 31,
        'learning_rate': 0.05,
        'verbose': -1,
    }

    model = lgb.train(
        params, train_data,
        num_boost_round=1000,
        valid_sets=[val_data],
        callbacks=[lgb.early_stopping(100), lgb.log_evaluation(100)],
    )

    return model

7. CatBoost

Automatic Categorical Feature Handling

CatBoost (Categorical Boosting) is a gradient boosting library developed by Yandex, with a standout strength in categorical feature handling.

How CatBoost encodes categorical features:

  1. Target Statistics (TS): encode each category by the mean of the target
  2. Ordered Target Statistics: leakage-free TS that exploits an ordering of the data
  3. One-Hot Encoding: applied automatically when the cardinality is low

Ordered Boosting

Ordered Boosting, CatBoost's key innovation, fixes the prediction shift that arises when target statistics are computed naively:

  • Arrange the data in a random order
  • Compute the statistic for sample i from samples 0 through i-1 only
  • No leakage, because each sample learns only from samples seen before it (a pandas sketch follows)
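
Here is a small pandas sketch of ordered target statistics (ours, illustrative rather than CatBoost's exact implementation): each row is encoded with the smoothed mean of the targets of earlier rows in the same category, so no row ever sees its own label.

import pandas as pd

def ordered_target_stats(cat: pd.Series, target: pd.Series, prior: float, smoothing: float = 1.0):
    """Leakage-free encoding: expanding mean of PREVIOUS targets per category."""
    df = pd.DataFrame({'cat': cat, 'y': target})
    grp = df.groupby('cat')['y']
    prev_sum = grp.cumsum() - df['y']   # sum of earlier targets in this category
    prev_cnt = grp.cumcount()           # number of earlier rows in this category
    return (prev_sum + smoothing * prior) / (prev_cnt + smoothing)

cats = pd.Series(['a', 'a', 'b', 'a', 'b'])
y = pd.Series([1, 0, 1, 1, 0])
print(ordered_target_stats(cats, y, prior=y.mean()))
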
from catboost import CatBoostClassifier, CatBoostRegressor, Pool
import pandas as pd
import numpy as np

def train_catboost(X_train, y_train, X_val, y_val, cat_features=None):
    """Train CatBoost - categorical features can be passed directly as strings"""

    # Build CatBoost Pools (efficient data containers);
    # declaring cat_features here means the model does not need them again
    train_pool = Pool(
        data=X_train,
        label=y_train,
        cat_features=cat_features  # list of categorical feature indices or names
    )
    val_pool = Pool(
        data=X_val,
        label=y_val,
        cat_features=cat_features
    )

    model = CatBoostClassifier(
        # === Basics ===
        iterations=1000,              # plays the role of n_estimators
        learning_rate=0.05,
        depth=6,                      # tree depth (default: 6, max: 16)

        # === Regularization ===
        l2_leaf_reg=3.0,              # L2 regularization
        min_data_in_leaf=1,           # minimum samples per leaf

        # === Sampling ===
        # note: subsample requires a Bernoulli/MVS bootstrap;
        # it is incompatible with bootstrap_type='Bayesian'
        subsample=0.8,                # row sampling ratio
        colsample_bylevel=0.8,        # column sampling per level
        bootstrap_type='Bernoulli',   # 'Bayesian', 'Bernoulli', 'MVS', 'No'

        # === Categorical handling ===
        one_hot_max_size=2,           # max cardinality for one-hot encoding

        # === Boosting scheme ===
        boosting_type='Ordered',      # 'Ordered' or 'Plain'

        # === Loss ===
        loss_function='Logloss',      # classification: 'Logloss', 'CrossEntropy'
        eval_metric='AUC',            # 'AUC', 'Accuracy', 'F1', 'RMSE'

        # === System ===
        task_type='CPU',              # 'CPU' or 'GPU'
        devices='0',                  # GPU device ID (used when task_type='GPU')
        random_seed=42,
        verbose=100,
        use_best_model=True,          # restore the best iteration
        early_stopping_rounds=50,
    )

    model.fit(
        train_pool,
        eval_set=val_pool,
        plot=False,                   # True draws interactive learning curves in notebooks
    )

    print(f"Best iteration: {model.get_best_iteration()}")
    print(f"Best score: {model.get_best_score()}")

    return model

# XGBoost vs LightGBM vs CatBoost comparison
def compare_gbm_models(X, y, cat_features=None):
    """Compare accuracy and training speed of the three models"""
    import time
    from sklearn.model_selection import cross_val_score

    results = {}

    # XGBoost (categorical features must be label-encoded first)
    start = time.time()
    xgb_model = xgb.XGBClassifier(
        n_estimators=300, max_depth=6, learning_rate=0.1,
        subsample=0.8, colsample_bytree=0.8, random_state=42, n_jobs=-1
    )
    xgb_scores = cross_val_score(xgb_model, X, y, cv=5, scoring='roc_auc')
    results['XGBoost'] = {
        'mean_auc': xgb_scores.mean(),
        'std_auc': xgb_scores.std(),
        'time': time.time() - start
    }

    # LightGBM
    start = time.time()
    lgb_model = lgb.LGBMClassifier(
        n_estimators=300, num_leaves=31, learning_rate=0.1,
        subsample=0.8, colsample_bytree=0.8, random_state=42, n_jobs=-1, verbose=-1
    )
    lgb_scores = cross_val_score(lgb_model, X, y, cv=5, scoring='roc_auc')
    results['LightGBM'] = {
        'mean_auc': lgb_scores.mean(),
        'std_auc': lgb_scores.std(),
        'time': time.time() - start
    }

    # CatBoost
    start = time.time()
    cb_model = CatBoostClassifier(
        iterations=300, depth=6, learning_rate=0.1,
        random_seed=42, verbose=0
    )
    cb_scores = cross_val_score(cb_model, X, y, cv=5, scoring='roc_auc')
    results['CatBoost'] = {
        'mean_auc': cb_scores.mean(),
        'std_auc': cb_scores.std(),
        'time': time.time() - start
    }

    # Print the results
    result_df = pd.DataFrame(results).T
    print("=== GBM Model Comparison ===")
    print(result_df.to_string())

    return result_df

XGBoost vs LightGBM vs CatBoost at a glance:

Aspect               | XGBoost          | LightGBM           | CatBoost
Tree growth          | Level-wise       | Leaf-wise          | Symmetric
Speed                | Medium           | Fast               | Medium-fast
Memory               | Medium           | Low                | Medium
Categorical handling | Manual           | Built-in (limited) | Automatic (best)
GPU support          | Yes              | Yes                | Yes
Hyperparameter count | Many             | Many               | Few
Strongest on         | Numeric features | Large datasets     | Categorical features

8. Feature Engineering

Numeric Feature Transformations

from sklearn.preprocessing import PowerTransformer, QuantileTransformer
from scipy.stats import boxcox
import numpy as np

def transform_numeric_features(df, columns):
    """Numeric feature transformation strategies"""
    transformed = df.copy()

    for col in columns:
        # 1. Log transform (for positive skew / right tail)
        if (df[col] > 0).all():
            transformed[f'{col}_log'] = np.log1p(df[col])

        # 2. Square-root transform
        if (df[col] >= 0).all():
            transformed[f'{col}_sqrt'] = np.sqrt(df[col])

        # 3. Box-Cox transform (strictly positive values only)
        if (df[col] > 0).all():
            transformed[f'{col}_boxcox'], _ = boxcox(df[col])

        # 4. Yeo-Johnson transform (also works with negative values)
        pt = PowerTransformer(method='yeo-johnson')
        transformed[f'{col}_yeojohnson'] = pt.fit_transform(df[[col]])

        # 5. Quantile transform (map to a normal distribution)
        qt = QuantileTransformer(output_distribution='normal',
                                 n_quantiles=min(1000, len(df)))
        transformed[f'{col}_quantile'] = qt.fit_transform(df[[col]])

    return transformed

# Binning
def create_bins(df, col, n_bins=10, strategy='quantile'):
    """Discretize a continuous variable into bins"""
    from sklearn.preprocessing import KBinsDiscretizer

    kbd = KBinsDiscretizer(n_bins=n_bins, encode='ordinal', strategy=strategy)
    # strategy: 'uniform', 'quantile', 'kmeans'
    df[f'{col}_bin'] = kbd.fit_transform(df[[col]]).astype(int)

    return df

Categorical Encoding

from category_encoders import (
    TargetEncoder, LeaveOneOutEncoder,
    CatBoostEncoder, BinaryEncoder, HashingEncoder
)

def encode_categorical_features(X_train, X_val, y_train, cat_features):
    """A tour of categorical encoding methods"""
    results = {}

    # 1. One-Hot Encoding (low cardinality)
    from sklearn.preprocessing import OneHotEncoder
    ohe = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
    # Recommended only for features with cardinality of about 10 or less

    # 2. Label Encoding (ordinal categories)
    from sklearn.preprocessing import LabelEncoder
    le = LabelEncoder()

    # 3. Target Encoding (medium-to-high cardinality)
    # Caution: can leak the target from training data -> fit only inside CV folds
    te = TargetEncoder(cols=cat_features, smoothing=1.0)
    X_train_te = te.fit_transform(X_train, y_train)
    X_val_te = te.transform(X_val)
    results['target_enc'] = (X_train_te, X_val_te)

    # 4. Leave-One-Out Encoding (reduces target leakage)
    loo = LeaveOneOutEncoder(cols=cat_features, sigma=0.05)
    X_train_loo = loo.fit_transform(X_train, y_train)
    X_val_loo = loo.transform(X_val)
    results['loo_enc'] = (X_train_loo, X_val_loo)

    # 5. CatBoost Encoding (ordered target statistics)
    cbe = CatBoostEncoder(cols=cat_features)
    X_train_cbe = cbe.fit_transform(X_train, y_train)
    X_val_cbe = cbe.transform(X_val)
    results['catboost_enc'] = (X_train_cbe, X_val_cbe)

    return results
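
Expanding on the caution in step 3: a minimal out-of-fold sketch (the helper name is ours) in which each fold is encoded by a TargetEncoder fitted only on the other folds, so no row's encoding uses its own target.

from sklearn.model_selection import KFold
from category_encoders import TargetEncoder

def oof_target_encode(X, y, cat_cols, n_splits=5, seed=42):
    """Each row is encoded by an encoder that never saw that row's target."""
    X_enc = X.copy()
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=seed)
    for tr_idx, val_idx in kf.split(X):
        enc = TargetEncoder(cols=cat_cols, smoothing=1.0)
        enc.fit(X.iloc[tr_idx], y.iloc[tr_idx])   # fit on the other folds only
        X_enc.loc[X.index[val_idx], cat_cols] = enc.transform(X.iloc[val_idx])[cat_cols]
    return X_enc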

# Date/time feature engineering
def extract_datetime_features(df, date_col):
    """Extract calendar and cyclical features from a datetime column"""
    df[date_col] = pd.to_datetime(df[date_col])

    df[f'{date_col}_year'] = df[date_col].dt.year
    df[f'{date_col}_month'] = df[date_col].dt.month
    df[f'{date_col}_day'] = df[date_col].dt.day
    df[f'{date_col}_dayofweek'] = df[date_col].dt.dayofweek
    df[f'{date_col}_dayofyear'] = df[date_col].dt.dayofyear
    df[f'{date_col}_weekofyear'] = df[date_col].dt.isocalendar().week.astype(int)
    df[f'{date_col}_quarter'] = df[date_col].dt.quarter
    df[f'{date_col}_hour'] = df[date_col].dt.hour
    df[f'{date_col}_is_weekend'] = (df[date_col].dt.dayofweek >= 5).astype(int)
    df[f'{date_col}_is_month_start'] = df[date_col].dt.is_month_start.astype(int)
    df[f'{date_col}_is_month_end'] = df[date_col].dt.is_month_end.astype(int)

    # Cyclical encoding (sin/cos transform)
    df[f'{date_col}_month_sin'] = np.sin(2 * np.pi * df[f'{date_col}_month'] / 12)
    df[f'{date_col}_month_cos'] = np.cos(2 * np.pi * df[f'{date_col}_month'] / 12)
    df[f'{date_col}_hour_sin'] = np.sin(2 * np.pi * df[f'{date_col}_hour'] / 24)
    df[f'{date_col}_hour_cos'] = np.cos(2 * np.pi * df[f'{date_col}_hour'] / 24)

    return df

# Aggregate (GroupBy) features
def create_aggregate_features(df, group_cols, agg_cols):
    """Create per-group aggregate statistics as features"""
    for group_col in group_cols:
        for agg_col in agg_cols:
            prefix = f'{agg_col}_by_{group_col}'
            agg = df.groupby(group_col)[agg_col].agg([
                'mean', 'std', 'min', 'max', 'median',
                lambda x: x.quantile(0.25),
                lambda x: x.quantile(0.75)
            ])
            agg.columns = [
                f'{prefix}_mean', f'{prefix}_std',
                f'{prefix}_min', f'{prefix}_max',
                f'{prefix}_median', f'{prefix}_q25', f'{prefix}_q75'
            ]
            df = df.join(agg, on=group_col)

    return df

9. Ensemble Techniques

Stacking (Meta-Learner)

Stacking trains a meta-model that takes the predictions of several base models as its input features.

from sklearn.model_selection import cross_val_predict, StratifiedKFold
from sklearn.linear_model import LogisticRegression, Ridge
from sklearn.metrics import roc_auc_score
import numpy as np
import pandas as pd

class StackingEnsemble:
    """Stacking 앙상블 구현"""

    def __init__(self, base_models, meta_model, n_folds=5):
        self.base_models = base_models
        self.meta_model = meta_model
        self.n_folds = n_folds
        self.trained_base_models = []

    def fit(self, X, y):
        """베이스 모델 학습 및 메타 특성 생성"""
        meta_features_train = np.zeros((len(X), len(self.base_models)))
        skf = StratifiedKFold(n_splits=self.n_folds, shuffle=True, random_state=42)

        for i, (name, model) in enumerate(self.base_models):
            print(f"Training base model: {name}")
            fold_models = []

            for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
                if hasattr(X, 'iloc'):
                    X_tr, X_val = X.iloc[train_idx], X.iloc[val_idx]
                    y_tr, y_val = y.iloc[train_idx], y.iloc[val_idx]
                else:
                    X_tr, X_val = X[train_idx], X[val_idx]
                    y_tr, y_val = y[train_idx], y[val_idx]

                model_clone = type(model)(**model.get_params())
                model_clone.fit(X_tr, y_tr)
                fold_models.append(model_clone)

                if hasattr(model_clone, 'predict_proba'):
                    meta_features_train[val_idx, i] = model_clone.predict_proba(X_val)[:, 1]
                else:
                    meta_features_train[val_idx, i] = model_clone.predict(X_val)

            self.trained_base_models.append((name, fold_models))

        # Train the meta-model
        self.meta_model.fit(meta_features_train, y)

        meta_auc = roc_auc_score(y, meta_features_train.mean(axis=1))
        print(f"Meta features mean AUC: {meta_auc:.4f}")

        return self

    def predict_proba(self, X):
        """테스트 데이터 예측"""
        meta_features_test = np.zeros((len(X), len(self.base_models)))

        for i, (name, fold_models) in enumerate(self.trained_base_models):
            fold_preds = []
            for model in fold_models:
                if hasattr(model, 'predict_proba'):
                    fold_preds.append(model.predict_proba(X)[:, 1])
                else:
                    fold_preds.append(model.predict(X))
            meta_features_test[:, i] = np.mean(fold_preds, axis=0)

        return self.meta_model.predict_proba(meta_features_test)

# Putting the ensemble together
def build_ensemble(X_train, y_train, X_test):
    base_models = [
        ('xgb', xgb.XGBClassifier(
            n_estimators=500, max_depth=6, learning_rate=0.05,
            subsample=0.8, colsample_bytree=0.8, random_state=42
        )),
        ('lgb', lgb.LGBMClassifier(
            n_estimators=500, num_leaves=31, learning_rate=0.05,
            subsample=0.8, colsample_bytree=0.8, random_state=42, verbose=-1
        )),
        ('cb', CatBoostClassifier(
            iterations=500, depth=6, learning_rate=0.05,
            random_seed=42, verbose=0
        )),
    ]

    meta_model = LogisticRegression(C=1.0, max_iter=1000)

    stacker = StackingEnsemble(base_models, meta_model, n_folds=5)
    stacker.fit(X_train, y_train)

    return stacker.predict_proba(X_test)[:, 1]

# Weighted blending
def weighted_blend(predictions, weights):
    """
    predictions: list of arrays (each model's predicted probabilities)
    weights: list of floats (weights are normalized to sum to 1.0)
    """
    weights = np.array(weights) / sum(weights)
    blended = sum(w * p for w, p in zip(weights, predictions))
    return blended
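
A quick, self-contained usage sketch with stand-in predictions (in practice the inputs would be the OOF probability arrays of the models above, and the weights are often tuned on OOF AUC):

import numpy as np
from sklearn.metrics import roc_auc_score

rng = np.random.default_rng(0)
y_true = rng.integers(0, 2, size=1000)
# Stand-ins for three models' OOF probability arrays (illustrative only)
preds = [np.clip(y_true + rng.normal(0, s, 1000), 0, 1) for s in (0.40, 0.45, 0.50)]

blended = weighted_blend(preds, weights=[0.4, 0.35, 0.25])  # weights often tuned on OOF AUC
print(f"Blended AUC: {roc_auc_score(y_true, blended):.4f}")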

10. TabNet (Deep Learning for Tabular Data)

TabNet Architecture

TabNet is a deep learning architecture for tabular data published by Google in 2019. Its core is a sequential attention mechanism that dynamically chooses which features to focus on at each prediction step.

Main components:

  1. Feature Transformer: shared and step-specific layers that process the selected features
  2. Attentive Transformer: generates the feature mask for the next step
  3. Sequential Steps: feature selection proceeds over several sequential steps
  4. Sparse Attention: entropy regularization encourages sparse feature selection
import torch
from pytorch_tabnet.tab_model import TabNetClassifier, TabNetRegressor
import numpy as np

def train_tabnet(X_train, y_train, X_val, y_val, cat_idxs=None, cat_dims=None):
    """Train TabNet. cat_idxs: column indices of the categorical features;
    cat_dims: number of distinct values per categorical feature."""

    if cat_idxs is None:
        cat_idxs = []
        cat_dims = []

    model = TabNetClassifier(
        # === Architecture ===
        n_d=64,                      # decision layer width (keep equal to n_a)
        n_a=64,                      # attention embedding width
        n_steps=5,                   # number of sequential attention steps
        gamma=1.3,                   # feature reuse coefficient (1.0-2.0)
        n_independent=2,             # independent GLU layers
        n_shared=2,                  # shared GLU layers

        # === Categorical embeddings ===
        cat_idxs=cat_idxs,
        cat_dims=cat_dims,
        cat_emb_dim=1,               # embedding dimension per categorical feature

        # === Regularization ===
        lambda_sparse=1e-3,          # sparsity regularization coefficient
        momentum=0.02,               # BatchNorm momentum
        epsilon=1e-15,               # numerical stability

        # === Optimization ===
        optimizer_fn=torch.optim.Adam,
        optimizer_params=dict(lr=2e-2),
        scheduler_params=dict(
            mode='min', patience=5, min_lr=1e-5, factor=0.9
        ),
        scheduler_fn=torch.optim.lr_scheduler.ReduceLROnPlateau,
        mask_type='entmax',          # 'sparsemax' or 'entmax'

        # === Misc ===
        verbose=10,
        seed=42,
        device_name='auto',          # 'cpu', 'cuda', 'auto'
    )

    model.fit(
        X_train=X_train.values if hasattr(X_train, 'values') else X_train,
        y_train=y_train.values if hasattr(y_train, 'values') else y_train,
        eval_set=[(
            X_val.values if hasattr(X_val, 'values') else X_val,
            y_val.values if hasattr(y_val, 'values') else y_val
        )],
        eval_name=['val'],
        eval_metric=['auc'],
        max_epochs=200,
        patience=20,                 # early stopping patience
        batch_size=1024,
        virtual_batch_size=128,      # ghost batch normalization
        num_workers=0,
        drop_last=False,
        # note: pretraining_ratio belongs to TabNetPretrainer, not the classifier
    )

    return model

# TabNet feature importance
def tabnet_feature_importance(model, feature_names):
    importances = model.feature_importances_
    imp_df = pd.DataFrame({
        'feature': feature_names,
        'importance': importances
    }).sort_values('importance', ascending=False)

    plt.figure(figsize=(10, 8))
    imp_df.head(20).plot(x='feature', y='importance', kind='barh', color='coral')
    plt.title('TabNet Feature Importance')
    plt.gca().invert_yaxis()
    plt.tight_layout()
    plt.show()

    return imp_df

11. A Kaggle-Grade Pipeline

Cross-Validation Strategies

from sklearn.model_selection import (
    StratifiedKFold, KFold, GroupKFold,
    StratifiedGroupKFold, TimeSeriesSplit
)
from sklearn.metrics import roc_auc_score
import numpy as np

def kaggle_cv_pipeline(X, y, model, groups=None, time_col=None, n_folds=5):
    """A Kaggle-grade CV pipeline"""

    # Choose the CV strategy
    if time_col is not None:
        # Time-series data
        cv = TimeSeriesSplit(n_splits=n_folds)
        print("Using TimeSeriesSplit")
    elif groups is not None:
        # Grouped data (each group stays within a single fold)
        cv = StratifiedGroupKFold(n_splits=n_folds, shuffle=True, random_state=42)
        print("Using StratifiedGroupKFold")
    else:
        # Standard classification
        cv = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=42)
        print("Using StratifiedKFold")

    oof_predictions = np.zeros(len(X))
    feature_importances = []

    for fold, (train_idx, val_idx) in enumerate(
        cv.split(X, y, groups) if groups is not None else cv.split(X, y)
    ):
        print(f"\n{'='*50}")
        print(f"Fold {fold + 1}/{n_folds}")
        print(f"Train size: {len(train_idx)}, Val size: {len(val_idx)}")

        if hasattr(X, 'iloc'):
            X_tr, X_val = X.iloc[train_idx], X.iloc[val_idx]
            y_tr, y_val = y.iloc[train_idx], y.iloc[val_idx]
        else:
            X_tr, X_val = X[train_idx], X[val_idx]
            y_tr, y_val = y[train_idx], y[val_idx]

        model.fit(X_tr, y_tr)

        if hasattr(model, 'predict_proba'):
            oof_predictions[val_idx] = model.predict_proba(X_val)[:, 1]
        else:
            oof_predictions[val_idx] = model.predict(X_val)

        # Collect feature importances
        if hasattr(model, 'feature_importances_'):
            feature_importances.append(model.feature_importances_)

        fold_score = roc_auc_score(y_val, oof_predictions[val_idx])
        print(f"Fold {fold+1} AUC: {fold_score:.5f}")

    overall_score = roc_auc_score(y, oof_predictions)
    print(f"\n{'='*50}")
    print(f"Overall OOF AUC: {overall_score:.5f}")

    # Average feature importance
    if feature_importances:
        mean_importance = np.mean(feature_importances, axis=0)
    else:
        mean_importance = None

    return oof_predictions, mean_importance, overall_score

# Preventing leakage
def prevent_leakage_pipeline(X_train, X_val, y_train):
    """
    A preprocessing pipeline that avoids data leakage.
    Key rule: fit on the training data only; transform both train and validation.
    """
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import StandardScaler
    from sklearn.impute import SimpleImputer

    # Wrong approach (leaks information):
    # scaler = StandardScaler()
    # X_all_scaled = scaler.fit_transform(pd.concat([X_train, X_val]))  # fit on ALL data

    # Correct approach (use a pipeline):
    pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
    ])

    # fit only on the training data
    X_train_processed = pipeline.fit_transform(X_train)
    # transform the validation/test data
    X_val_processed = pipeline.transform(X_val)

    return X_train_processed, X_val_processed, pipeline

12. Feature Selection Techniques

The Boruta Algorithm

Boruta is a powerful random-forest-based feature selection algorithm. It works by creating shuffled copies of the original features (shadow features) and keeping only the features that beat them, as the sketch below illustrates.
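
A minimal sketch of that shadow-feature principle (ours, not BorutaPy itself): shuffle each column to break its relation to the target, then keep only the real features whose importance beats the best shadow.

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

def shadow_feature_screen(X, y, seed=42):
    """Keep only features whose importance beats the best shuffled 'shadow' copy."""
    rng = np.random.default_rng(seed)
    shadows = X.apply(lambda col: rng.permutation(col.values))
    shadows.columns = [f'shadow_{c}' for c in X.columns]
    both = pd.concat([X, shadows], axis=1)

    rf = RandomForestClassifier(n_estimators=300, n_jobs=-1, random_state=seed)
    rf.fit(both, y)

    imp = pd.Series(rf.feature_importances_, index=both.columns)
    threshold = imp[shadows.columns].max()   # best shadow importance
    return imp[X.columns][imp[X.columns] > threshold].index.tolist()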

# pip install boruta
from boruta import BorutaPy
from sklearn.ensemble import RandomForestClassifier

def boruta_feature_selection(X, y, max_iter=100):
    """Select features with the Boruta algorithm"""
    rf = RandomForestClassifier(
        n_estimators=200,
        max_depth=7,
        random_state=42,
        n_jobs=-1
    )

    boruta = BorutaPy(
        estimator=rf,
        n_estimators='auto',
        perc=100,           # percentile of shadow importances (100 = compare to the max)
        alpha=0.05,         # significance level
        max_iter=max_iter,
        random_state=42,
        verbose=1
    )

    boruta.fit(X.values, y.values)

    # Inspect the results
    feature_ranking = pd.DataFrame({
        'feature': X.columns,
        'ranking': boruta.ranking_,
        'selected': boruta.support_,
        'tentative': boruta.support_weak_
    }).sort_values('ranking')

    selected_features = X.columns[boruta.support_].tolist()
    tentative_features = X.columns[boruta.support_weak_].tolist()

    print(f"선택된 특성: {len(selected_features)}개")
    print(f"잠정 특성: {len(tentative_features)}개")
    print(f"제거된 특성: {len(X.columns) - len(selected_features) - len(tentative_features)}개")

    return selected_features, tentative_features, feature_ranking

# SHAP-based feature selection
def shap_feature_selection(model, X_val, threshold=0.01):
    """Select features by their SHAP values"""
    import shap

    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_val)

    if isinstance(shap_values, list):
        shap_abs = np.abs(shap_values[1])
    else:
        shap_abs = np.abs(shap_values)

    mean_shap = shap_abs.mean(axis=0)
    total_shap = mean_shap.sum()

    feature_importance = pd.DataFrame({
        'feature': X_val.columns,
        'mean_abs_shap': mean_shap,
        'shap_ratio': mean_shap / total_shap
    }).sort_values('mean_abs_shap', ascending=False)

    # Cumulative SHAP ratio (select features by coverage)
    feature_importance['cumulative_ratio'] = feature_importance['shap_ratio'].cumsum()
    selected = feature_importance[feature_importance['mean_abs_shap'] > threshold * total_shap]

    print(f"SHAP 기반 선택: {len(selected)}개 / 전체 {len(X_val.columns)}개")

    return selected['feature'].tolist(), feature_importance

A Complete Kaggle Pipeline Example

def complete_kaggle_pipeline(train_df, test_df, target_col, cat_features=None):
    """
    A complete Kaggle ML pipeline:
    EDA -> preprocessing -> feature engineering -> model training -> ensembling
    """
    from sklearn.metrics import roc_auc_score

    # 1. Split features and target
    y = train_df[target_col]
    X = train_df.drop(columns=[target_col])

    # 2. Missing value handling (compute each median once, on the training data only)
    for col in X.select_dtypes(include=[np.number]).columns:
        med = X[col].median()
        X[col] = X[col].fillna(med)
        test_df[col] = test_df[col].fillna(med)

    # 3. Categorical encoding
    if cat_features:
        for col in cat_features:
            X[col] = X[col].astype('category').cat.codes
            test_df[col] = test_df[col].astype('category').cat.codes

    # 4. Train the ensemble members
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

    xgb_oof = np.zeros(len(X))
    lgb_oof = np.zeros(len(X))
    cb_oof = np.zeros(len(X))

    xgb_test_preds = np.zeros(len(test_df))
    lgb_test_preds = np.zeros(len(test_df))
    cb_test_preds = np.zeros(len(test_df))

    for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
        X_tr, X_val = X.iloc[train_idx], X.iloc[val_idx]
        y_tr, y_val = y.iloc[train_idx], y.iloc[val_idx]

        # XGBoost
        xgb_model = xgb.XGBClassifier(
            n_estimators=1000, max_depth=6, learning_rate=0.05,
            subsample=0.8, colsample_bytree=0.8, reg_alpha=0.1,
            random_state=42, n_jobs=-1, eval_metric='auc',
            early_stopping_rounds=50,  # constructor argument in xgboost >= 2.0
        )
        xgb_model.fit(X_tr, y_tr, eval_set=[(X_val, y_val)], verbose=False)
        xgb_oof[val_idx] = xgb_model.predict_proba(X_val)[:, 1]
        xgb_test_preds += xgb_model.predict_proba(test_df)[:, 1] / 5

        # LightGBM
        lgb_model = lgb.LGBMClassifier(
            n_estimators=1000, num_leaves=31, learning_rate=0.05,
            subsample=0.8, colsample_bytree=0.8, random_state=42,
            n_jobs=-1, verbose=-1
        )
        lgb_model.fit(X_tr, y_tr, eval_set=[(X_val, y_val)],
                      callbacks=[lgb.early_stopping(50, verbose=False),
                                 lgb.log_evaluation(period=0)])
        lgb_oof[val_idx] = lgb_model.predict_proba(X_val)[:, 1]
        lgb_test_preds += lgb_model.predict_proba(test_df)[:, 1] / 5

        # CatBoost
        cb_model = CatBoostClassifier(
            iterations=1000, depth=6, learning_rate=0.05,
            random_seed=42, verbose=0, early_stopping_rounds=50
        )
        cb_model.fit(X_tr, y_tr, eval_set=(X_val, y_val))
        cb_oof[val_idx] = cb_model.predict_proba(X_val)[:, 1]
        cb_test_preds += cb_model.predict_proba(test_df)[:, 1] / 5

    print(f"XGBoost OOF AUC: {roc_auc_score(y, xgb_oof):.5f}")
    print(f"LightGBM OOF AUC: {roc_auc_score(y, lgb_oof):.5f}")
    print(f"CatBoost OOF AUC: {roc_auc_score(y, cb_oof):.5f}")

    # Ensemble (simple average)
    ensemble_oof = (xgb_oof + lgb_oof + cb_oof) / 3
    ensemble_test = (xgb_test_preds + lgb_test_preds + cb_test_preds) / 3

    print(f"Ensemble OOF AUC: {roc_auc_score(y, ensemble_oof):.5f}")

    return ensemble_test, ensemble_oof

# Usage example
if __name__ == "__main__":
    # Generate example data
    from sklearn.datasets import make_classification
    from sklearn.model_selection import train_test_split

    X, y = make_classification(
        n_samples=10000, n_features=30, n_informative=20,
        n_redundant=5, random_state=42
    )
    X = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(30)])
    y = pd.Series(y)

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # Train LightGBM with CV
    models, oof_preds = train_lightgbm_cv(X_train, y_train)
    print(f"Final OOF AUC: {roc_auc_score(y_train, oof_preds):.4f}")

Wrapping Up

This guide covered the full tabular-data ML pipeline:

  1. EDA and preprocessing: a systematic approach to understanding the data and handling missing values and outliers
  2. Tree-based models: a step-by-step path from decision trees through random forests to gradient boosting
  3. Modern GBMs: the characteristics, strengths, and weaknesses of XGBoost, LightGBM, and CatBoost
  4. Feature engineering: techniques for turning domain knowledge into code
  5. Ensembling: combining multiple models to maximize performance
  6. TabNet: a deep learning approach to tabular data
  7. Kaggle pipeline: a complete, practical workflow
  8. Feature selection: removing unneeded features for better performance and interpretability

Key advice:

  • Always understand the data through EDA first
  • Design your CV with zero leakage, without exception
  • An ensemble usually outperforms any single model
  • Interpret models with SHAP values and combine the findings with domain knowledge
  • LightGBM excels at speed, CatBoost at categorical features, XGBoost at stability

참고자료

Tabular Data ML Complete Guide: Master XGBoost, LightGBM, CatBoost, and TabNet

1. Overview of Tabular Data Machine Learning

Why Do Tree-Based Models Excel at Tabular Data?

Tabular data — structured as rows and columns — represents over 80% of real-world business problems. It appears across domains including financial fraud detection, customer churn prediction, real estate pricing, and medical diagnosis.

While deep learning has revolutionized images, text, and audio, Gradient Boosted Trees still dominate tabular data competitions. Here is why:

Strengths of Tree-Based Models:

  1. Irregular decision boundaries: Trees partition feature space with axis-aligned splits, naturally capturing complex non-linear relationships.
  2. Scale invariance: Works well without feature normalization since gradient boosting only uses the rank ordering of data.
  3. Missing value handling: XGBoost, LightGBM, and CatBoost all handle missing values internally.
  4. Categorical features: Label encoding alone is often sufficient for good performance.
  5. Interpretability: Feature importance and SHAP values allow explaining model decisions.
  6. Resistance to overfitting: Ensemble methods are more robust than single models.

EDA (Exploratory Data Analysis) Strategy

Understanding your data deeply before model training is crucial.

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats

def eda_overview(df):
    """Print comprehensive overview of a dataframe"""
    print("=== Data Overview ===")
    print(f"Shape: {df.shape}")
    print(f"\nData Types:\n{df.dtypes}")
    print(f"\nMissing Values:\n{df.isnull().sum()}")
    print(f"\nMissing Ratios:\n{(df.isnull().mean() * 100).round(2)}")
    print(f"\nNumeric Summary:\n{df.describe()}")
    print(f"\nCategorical Feature Cardinality:")
    for col in df.select_dtypes(include='object').columns:
        print(f"  {col}: {df[col].nunique()} unique values")

def plot_target_distribution(df, target_col):
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    axes[0].hist(df[target_col], bins=50, color='steelblue', edgecolor='white')
    axes[0].set_title(f'{target_col} Distribution')
    stats.probplot(df[target_col].dropna(), dist="norm", plot=axes[1])
    axes[1].set_title('Q-Q Plot (Normality Check)')
    plt.tight_layout()
    plt.show()

def detect_outliers_iqr(df, columns):
    """Detect outliers using IQR method"""
    outlier_info = {}
    for col in columns:
        Q1 = df[col].quantile(0.25)
        Q3 = df[col].quantile(0.75)
        IQR = Q3 - Q1
        lower = Q1 - 1.5 * IQR
        upper = Q3 + 1.5 * IQR
        outliers = df[(df[col] < lower) | (df[col] > upper)]
        outlier_info[col] = {
            'count': len(outliers),
            'ratio': len(outliers) / len(df),
            'lower_bound': lower,
            'upper_bound': upper
        }
    return pd.DataFrame(outlier_info).T

Missing Value Handling Strategies

from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer

def simple_imputation(df):
    """Median imputation for numeric, mode for categorical"""
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    num_imputer = SimpleImputer(strategy='median')
    df[numeric_cols] = num_imputer.fit_transform(df[numeric_cols])

    cat_cols = df.select_dtypes(include='object').columns
    cat_imputer = SimpleImputer(strategy='most_frequent')
    df[cat_cols] = cat_imputer.fit_transform(df[cat_cols])
    return df

def knn_imputation(df, n_neighbors=5):
    """KNN-based imputation (effective for small datasets)"""
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    imputer = KNNImputer(n_neighbors=n_neighbors)
    df[numeric_cols] = imputer.fit_transform(df[numeric_cols])
    return df

def mice_imputation(df, max_iter=10):
    """MICE: Multiple Imputation by Chained Equations"""
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    imputer = IterativeImputer(max_iter=max_iter, random_state=42)
    df[numeric_cols] = imputer.fit_transform(df[numeric_cols])
    return df

def add_missing_indicators(df):
    """Add binary columns indicating where values were missing"""
    cols_with_missing = df.columns[df.isnull().any()].tolist()
    for col in cols_with_missing:
        df[f'{col}_missing'] = df[col].isnull().astype(int)
    return df

2. Decision Trees

ID3 and CART Algorithms

Decision trees recursively partition data to build a tree structure. Two major algorithms exist:

ID3 (Iterative Dichotomiser 3):

  • Uses Information Gain as the splitting criterion
  • Only handles categorical features (multi-way splits)
  • Only for categorical targets

CART (Classification and Regression Trees):

  • Classification: Gini Impurity
  • Regression: Mean Squared Error
  • Always binary splits (two child nodes)
  • Algorithm used by scikit-learn

Information Gain and Gini Impurity

Entropy and Information Gain:

Entropy measures impurity of a dataset. For k classes:

H(S) = -sum(p_i * log2(p_i))

Information Gain is the entropy reduction after a split:

IG(S, A) = H(S) - sum(|S_v|/|S| * H(S_v))

Gini Impurity:

Gini(S) = 1 - sum(p_i^2)
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.tree import export_text, plot_tree
import matplotlib.pyplot as plt

def train_decision_tree(X_train, y_train, task='classification'):
    if task == 'classification':
        model = DecisionTreeClassifier(
            criterion='gini',        # 'gini' or 'entropy'
            max_depth=5,
            min_samples_split=10,
            min_samples_leaf=5,
            max_features=None,
            random_state=42
        )
    else:
        model = DecisionTreeRegressor(
            criterion='squared_error',
            max_depth=5,
            min_samples_split=10,
            min_samples_leaf=5,
            random_state=42
        )
    model.fit(X_train, y_train)
    return model

def visualize_tree(model, feature_names, class_names=None, max_depth=3):
    plt.figure(figsize=(20, 10))
    plot_tree(
        model, feature_names=feature_names, class_names=class_names,
        filled=True, rounded=True, max_depth=max_depth, fontsize=10
    )
    plt.title('Decision Tree Visualization')
    plt.tight_layout()
    plt.show()
    print(export_text(model, feature_names=feature_names, max_depth=3))

3. Random Forest

Bagging and Feature Randomization

Random Forest combines Bootstrap Aggregating (Bagging) with Feature Randomization:

  1. Generate bootstrap samples (sampling with replacement) from training data
  2. Train an independent decision tree on each sample
  3. At each split, randomly select only sqrt(n_features) features to consider
  4. Aggregate predictions via averaging (regression) or majority voting (classification)
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.inspection import permutation_importance

def train_random_forest(X_train, y_train, task='classification'):
    if task == 'classification':
        model = RandomForestClassifier(
            n_estimators=300,
            max_depth=None,
            min_samples_split=2,
            min_samples_leaf=1,
            max_features='sqrt',
            bootstrap=True,
            oob_score=True,
            n_jobs=-1,
            random_state=42
        )
    else:
        model = RandomForestRegressor(
            n_estimators=300,
            max_features='sqrt',
            oob_score=True,
            n_jobs=-1,
            random_state=42
        )
    model.fit(X_train, y_train)
    print(f"OOB Score: {model.oob_score_:.4f}")
    return model

def plot_feature_importance(model, feature_names, top_n=20):
    importances = pd.Series(
        model.feature_importances_, index=feature_names
    ).sort_values(ascending=False)

    plt.figure(figsize=(10, 8))
    importances.head(top_n).plot(kind='barh', color='steelblue')
    plt.title(f'Random Forest Feature Importance (Top {top_n})')
    plt.xlabel('Importance')
    plt.gca().invert_yaxis()
    plt.tight_layout()
    plt.show()
    return importances

def compute_permutation_importance(model, X_val, y_val, feature_names):
    result = permutation_importance(
        model, X_val, y_val, n_repeats=10, random_state=42, n_jobs=-1
    )
    return pd.DataFrame({
        'feature': feature_names,
        'importance_mean': result.importances_mean,
        'importance_std': result.importances_std
    }).sort_values('importance_mean', ascending=False)

4. Gradient Boosting

Mathematical Principles

Gradient Boosting minimizes a loss function by sequentially adding trees in the direction of the negative gradient:

F_m(x) = F_{m-1}(x) + learning_rate * h_m(x)

Here, the stage-m tree is fitted to the pseudo-residuals:

r_i = -[dL(y_i, F(x_i)) / dF(x_i)]

For regression with MSE loss, the pseudo-residual is the target minus the current ensemble prediction, which is the ordinary residual. For binary classification with log loss, the pseudo-residual is the target minus the sigmoid of the current ensemble score.

from sklearn.ensemble import GradientBoostingClassifier, GradientBoostingRegressor

def train_sklearn_gbm(X_train, y_train, task='classification'):
    if task == 'classification':
        model = GradientBoostingClassifier(
            n_estimators=200, learning_rate=0.1, max_depth=3,
            subsample=0.8, max_features='sqrt', random_state=42
        )
    else:
        model = GradientBoostingRegressor(
            n_estimators=200, learning_rate=0.1, max_depth=3,
            subsample=0.8, random_state=42
        )
    model.fit(X_train, y_train)
    return model

5. XGBoost

XGBoost's Innovations

XGBoost (eXtreme Gradient Boosting), developed by Chen & Guestrin (2016), delivers major improvements over vanilla GBM:

  1. Regularization: Adds L1 and L2 regularization to the objective function to prevent overfitting
  2. Second-order Taylor expansion: More accurate tree structure search using both first and second derivatives
  3. Missing value learning: Learns the optimal default direction for missing samples at each split
  4. Parallel processing: Parallel feature split search (tree construction itself is sequential)
  5. Cache optimization: Optimized memory access patterns for speed
  6. Block structure: Compressed column storage for sparse data

Complete Hyperparameter Reference

import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score

def train_xgboost_full(X, y, task='binary'):
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42,
        stratify=y if task == 'binary' else None
    )

    if task == 'binary':
        objective = 'binary:logistic'
        eval_metric = 'auc'
    elif task == 'multiclass':
        objective = 'multi:softprob'
        eval_metric = 'mlogloss'
    else:
        objective = 'reg:squarederror'
        eval_metric = 'rmse'

    model = xgb.XGBClassifier(
        # Tree parameters
        n_estimators=1000,
        max_depth=6,            # Default: 6. Deeper = more complex
        min_child_weight=1,     # Minimum sum of instance weight in a leaf
        gamma=0,                # Minimum loss reduction for a split
        max_delta_step=0,       # Max delta step per tree weight (useful for imbalance)

        # Sampling
        subsample=0.8,          # Row sampling ratio per tree
        colsample_bytree=0.8,   # Column sampling ratio per tree
        colsample_bylevel=1.0,  # Column sampling per level
        colsample_bynode=1.0,   # Column sampling per node

        # Regularization
        reg_alpha=0.1,          # L1 regularization
        reg_lambda=1.0,         # L2 regularization (default: 1)

        # Learning
        learning_rate=0.01,     # Step size shrinkage (eta)
        scale_pos_weight=1,     # For imbalanced: neg/pos ratio

        # System
        objective=objective,
        eval_metric=eval_metric,
        tree_method='hist',     # 'hist' (fast), 'exact', 'approx', 'gpu_hist'
        random_state=42,
        n_jobs=-1,
    )

    model.fit(
        X_train, y_train,
        eval_set=[(X_train, y_train), (X_val, y_val)],
        early_stopping_rounds=50,
        verbose=100,
    )

    print(f"Best iteration: {model.best_iteration}")
    print(f"Best score: {model.best_score:.4f}")
    return model, X_val, y_val

def explain_with_shap(model, X_val, feature_names):
    """SHAP-based model explanation"""
    import shap

    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_val)

    plt.figure(figsize=(10, 8))
    shap.summary_plot(shap_values, X_val, feature_names=feature_names, show=False)
    plt.tight_layout()
    plt.show()

    mean_abs_shap = pd.DataFrame({
        'feature': feature_names,
        'importance': np.abs(shap_values).mean(axis=0)
    }).sort_values('importance', ascending=False)

    return mean_abs_shap

GPU-Accelerated XGBoost

# GPU training (requires NVIDIA GPU)
# XGBoost >= 2.0: combine tree_method='hist' with device='cuda'
# (older releases used tree_method='gpu_hist' and predictor='gpu_predictor' instead)
model_gpu = xgb.XGBClassifier(
    n_estimators=1000,
    tree_method='hist',
    device='cuda',
    random_state=42
)

6. LightGBM

Leaf-wise vs Level-wise Tree Growth

LightGBM (Light Gradient Boosting Machine), developed by Microsoft, introduces critical innovations for speed and memory efficiency.

Level-wise (traditional):

  • Splits all leaves simultaneously by depth level
  • Produces balanced trees
  • Wastes computation on nodes with little gain

Leaf-wise (LightGBM):

  • Always splits the single leaf with the greatest loss reduction
  • May produce highly unbalanced trees
  • Achieves lower loss with the same number of leaves
  • Requires a max_depth cap to prevent overfitting (illustrated below)
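
A practical consequence: in LightGBM you control complexity mainly through num_leaves, keeping max_depth as a safety net. A minimal illustration (the parameter values are just common starting points, not recommendations):

import lightgbm as lgb

# Leaf-wise growth: num_leaves is the primary complexity knob.
# A level-wise tree of depth d has up to 2**d leaves, so keeping
# num_leaves well below 2**max_depth reins in unbalanced trees.
model = lgb.LGBMClassifier(
    num_leaves=31,   # roughly a balanced depth-5 tree (2**5 = 32)
    max_depth=7,     # cap: 31 < 2**7 = 128, headroom without runaway depth
)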

GOSS and EFB

GOSS (Gradient-based One-Side Sampling):

  • Keeps all samples with large gradients (high information content)
  • Randomly drops samples with small gradients (already well-learned)
  • Reduces data size while preserving learning accuracy (see the sketch after this list)

EFB (Exclusive Feature Bundling):

  • Bundles mutually exclusive sparse features (never both non-zero simultaneously)
  • Reduces feature count, speeding up training (especially effective for sparse data)
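
A minimal numpy sketch of the GOSS sampling step, for intuition only — this is not LightGBM's internal implementation, and the a/b parameters stand in for the paper's top_rate/other_rate:

import numpy as np

def goss_sample(gradients, a=0.2, b=0.1, rng=None):
    """Return row indices and sample weights following the GOSS scheme."""
    rng = rng or np.random.default_rng(42)
    n = len(gradients)
    order = np.argsort(-np.abs(gradients))   # rows sorted by |gradient|, descending
    n_top = int(a * n)
    top_idx = order[:n_top]                  # large-gradient rows: always kept
    rest = order[n_top:]
    sampled = rng.choice(rest, size=int(b * n), replace=False)
    idx = np.concatenate([top_idx, sampled])
    weights = np.ones(len(idx))
    weights[n_top:] = (1 - a) / b            # re-weight under-sampled small-gradient rows
    return idx, weights

Training then proceeds on the rows in idx with the returned sample weights, so the retained small-gradient subset still contributes an approximately unbiased estimate of the information gain.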
Cross-Validated LightGBM Training

import lightgbm as lgb
from sklearn.metrics import roc_auc_score  # used to score each fold below
from sklearn.model_selection import StratifiedKFold
import numpy as np

def train_lightgbm_cv(X, y, n_folds=5):
    """Train LightGBM with Stratified K-Fold CV"""
    skf = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=42)
    oof_preds = np.zeros(len(X))
    models = []

    for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
        print(f"\n--- Fold {fold + 1}/{n_folds} ---")
        X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
        y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]

        model = lgb.LGBMClassifier(
            n_estimators=2000,
            num_leaves=31,       # Main complexity parameter
            max_depth=-1,        # -1 means unlimited (control via num_leaves)
            min_child_samples=20,
            learning_rate=0.05,
            subsample=0.8,
            subsample_freq=1,
            colsample_bytree=0.8,
            reg_alpha=0.1,
            reg_lambda=0.1,
            boosting_type='gbdt',  # 'gbdt', 'dart', 'goss', 'rf'
            random_state=42,
            n_jobs=-1,
            verbose=-1,
        )

        model.fit(
            X_train, y_train,
            eval_set=[(X_val, y_val)],
            callbacks=[
                lgb.early_stopping(stopping_rounds=100, verbose=True),
                lgb.log_evaluation(period=200),
            ],
        )

        oof_preds[val_idx] = model.predict_proba(X_val)[:, 1]
        models.append(model)

        fold_auc = roc_auc_score(y_val, oof_preds[val_idx])
        print(f"Fold {fold+1} AUC: {fold_auc:.4f}")

    overall_auc = roc_auc_score(y, oof_preds)
    print(f"\nOverall OOF AUC: {overall_auc:.4f}")
    return models, oof_preds

def lgbm_with_categorical(X_train, y_train, X_val, y_val, cat_features):
    """LightGBM with native categorical feature handling"""
    for col in cat_features:
        # Align codes across train/val: derive the category set from train so the
        # same string maps to the same integer; unseen validation categories
        # become -1, which LightGBM treats as missing
        cats = X_train[col].astype('category').cat.categories
        X_train[col] = pd.Categorical(X_train[col], categories=cats).codes
        X_val[col] = pd.Categorical(X_val[col], categories=cats).codes

    train_data = lgb.Dataset(
        X_train, y_train,
        categorical_feature=cat_features,
        free_raw_data=False
    )
    val_data = lgb.Dataset(X_val, y_val, reference=train_data)

    params = {
        'objective': 'binary', 'metric': 'auc',
        'num_leaves': 31, 'learning_rate': 0.05, 'verbose': -1,
    }

    model = lgb.train(
        params, train_data, num_boost_round=1000,
        valid_sets=[val_data],
        callbacks=[lgb.early_stopping(100), lgb.log_evaluation(100)],
    )
    return model

7. CatBoost

Automatic Categorical Feature Handling

CatBoost (Categorical Boosting), developed by Yandex, excels at handling categorical features natively.

CatBoost's categorical encoding:

  1. Target Statistics (TS): Encode each category using the target mean
  2. Ordered Target Statistics: Leak-free TS using data ordering
  3. One-Hot Encoding: Automatically applied for low-cardinality features

Ordered Boosting

CatBoost's key innovation, Ordered Boosting, removes the prediction shift that arises when the same samples are used both to compute target statistics (or gradient estimates) and to score them (a pandas sketch of the idea follows this list):

  • Arrange the data in a random order
  • Statistics for sample i are computed using only samples 0 through i-1
  • Only previously seen samples are used, eliminating target leakage
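
A pandas sketch of ordered target statistics under two stated assumptions — the rows are already in a random order, and prior is the global target mean with smoothing strength a. This mirrors the formula CatBoost describes, not its internal code:

import pandas as pd

def ordered_target_stats(cat, target, prior, a=1.0):
    """Encode each row using only the target values of EARLIER same-category rows."""
    df = pd.DataFrame({'cat': cat, 'y': target})
    grp = df.groupby('cat')['y']
    prev_sum = grp.cumsum() - df['y']   # target sum over earlier rows in the category
    prev_cnt = grp.cumcount()           # number of earlier rows in the category
    return (prev_sum + a * prior) / (prev_cnt + a)

# Illustrative usage: encoded = ordered_target_stats(train['city'], train['target'],
#                                                    prior=train['target'].mean())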
CatBoost Training Code

from catboost import CatBoostClassifier, CatBoostRegressor, Pool

def train_catboost(X_train, y_train, X_val, y_val, cat_features=None):
    """CatBoost training — can pass raw string categories directly"""

    train_pool = Pool(data=X_train, label=y_train, cat_features=cat_features)
    val_pool = Pool(data=X_val, label=y_val, cat_features=cat_features)

    model = CatBoostClassifier(
        iterations=1000,
        learning_rate=0.05,
        depth=6,                    # symmetric trees: depth 6 -> 64 leaves
        l2_leaf_reg=3.0,            # L2 regularization on leaf values
        subsample=0.8,              # row sampling; requires Bernoulli/MVS bootstrap
        colsample_bylevel=0.8,      # column sampling per split level
        cat_features=cat_features,
        one_hot_max_size=2,         # one-hot encode categories with <= 2 levels
        boosting_type='Ordered',    # leak-free ordered boosting (slower than 'Plain')
        bootstrap_type='Bernoulli', # 'Bayesian' (with bagging_temperature) disallows subsample
        loss_function='Logloss',
        eval_metric='AUC',
        task_type='CPU',            # 'GPU' to train on GPU
        random_seed=42,
        verbose=100,
        use_best_model=True,
        early_stopping_rounds=50,
    )

    model.fit(train_pool, eval_set=val_pool, plot=False)
    print(f"Best iteration: {model.get_best_iteration()}")
    print(f"Best score: {model.get_best_score()}")
    return model

def compare_gbm_models(X, y):
    """Compare performance and speed across the three GBM frameworks"""
    import time
    from sklearn.model_selection import cross_val_score

    results = {}

    start = time.time()
    xgb_model = xgb.XGBClassifier(
        n_estimators=300, max_depth=6, learning_rate=0.1,
        subsample=0.8, colsample_bytree=0.8, random_state=42, n_jobs=-1
    )
    xgb_scores = cross_val_score(xgb_model, X, y, cv=5, scoring='roc_auc')
    results['XGBoost'] = {
        'mean_auc': xgb_scores.mean(), 'std_auc': xgb_scores.std(),
        'time_sec': time.time() - start
    }

    start = time.time()
    lgb_model = lgb.LGBMClassifier(
        n_estimators=300, num_leaves=31, learning_rate=0.1,
        subsample=0.8, colsample_bytree=0.8, random_state=42,
        n_jobs=-1, verbose=-1
    )
    lgb_scores = cross_val_score(lgb_model, X, y, cv=5, scoring='roc_auc')
    results['LightGBM'] = {
        'mean_auc': lgb_scores.mean(), 'std_auc': lgb_scores.std(),
        'time_sec': time.time() - start
    }

    start = time.time()
    cb_model = CatBoostClassifier(
        iterations=300, depth=6, learning_rate=0.1, random_seed=42, verbose=0
    )
    cb_scores = cross_val_score(cb_model, X, y, cv=5, scoring='roc_auc')
    results['CatBoost'] = {
        'mean_auc': cb_scores.mean(), 'std_auc': cb_scores.std(),
        'time_sec': time.time() - start
    }

    result_df = pd.DataFrame(results).T
    print("=== GBM Model Comparison ===")
    print(result_df.to_string())
    return result_df

XGBoost vs LightGBM vs CatBoost Comparison:

Aspect             XGBoost           LightGBM             CatBoost
Tree growth        Level-wise        Leaf-wise            Symmetric
Speed              Medium            Fast                 Medium-Fast
Memory             Medium            Low                  Medium
Categorical        Manual            Built-in (limited)   Automatic (best)
GPU support        Yes               Yes                  Yes
Hyperparameters    Many              Many                 Fewer
Best for           Numeric features  Large datasets       Categorical features

8. Feature Engineering

Numeric Feature Transformations

from sklearn.preprocessing import (
    StandardScaler, MinMaxScaler, RobustScaler,
    PowerTransformer, QuantileTransformer, KBinsDiscretizer
)
from scipy.stats import boxcox
import numpy as np

def transform_numeric_features(df, columns):
    """Apply various numeric transformations"""
    transformed = df.copy()

    for col in columns:
        # 1. Log transform (right-skewed, positive values)
        if (df[col] > 0).all():
            transformed[f'{col}_log'] = np.log1p(df[col])

        # 2. Square root transform
        if (df[col] >= 0).all():
            transformed[f'{col}_sqrt'] = np.sqrt(df[col])

        # 3. Box-Cox transform (positive values only)
        if (df[col] > 0).all():
            transformed[f'{col}_boxcox'], _ = boxcox(df[col])  # values already strictly positive

        # 4. Yeo-Johnson transform (handles negative values)
        pt = PowerTransformer(method='yeo-johnson')
        transformed[f'{col}_yeojohnson'] = pt.fit_transform(df[[col]]).ravel()

        # 5. Quantile transform (maps to normal distribution)
        qt = QuantileTransformer(
            output_distribution='normal', n_quantiles=min(1000, len(df))
        )
        transformed[f'{col}_quantile'] = qt.fit_transform(df[[col]]).ravel()

    return transformed

def create_bins(df, col, n_bins=10, strategy='quantile'):
    """Discretize continuous variable into bins"""
    kbd = KBinsDiscretizer(n_bins=n_bins, encode='ordinal', strategy=strategy)
    df[f'{col}_bin'] = kbd.fit_transform(df[[col]]).astype(int)
    return df

Categorical Encoding

from category_encoders import (
    TargetEncoder, LeaveOneOutEncoder,
    CatBoostEncoder, BinaryEncoder
)

def encode_categorical_features(X_train, X_val, y_train, cat_features):
    """Multiple categorical encoding strategies"""
    results = {}

    # Target Encoding (medium-high cardinality)
    # Warning: can cause target leakage — only use inside CV folds
    te = TargetEncoder(cols=cat_features, smoothing=1.0)
    results['target_enc'] = (
        te.fit_transform(X_train, y_train),
        te.transform(X_val)
    )

    # Leave-One-Out Encoding (prevents target leakage)
    loo = LeaveOneOutEncoder(cols=cat_features, sigma=0.05)
    results['loo_enc'] = (
        loo.fit_transform(X_train, y_train),
        loo.transform(X_val)
    )

    # CatBoost Encoding (Ordered Target Statistics)
    cbe = CatBoostEncoder(cols=cat_features)
    results['catboost_enc'] = (
        cbe.fit_transform(X_train, y_train),
        cbe.transform(X_val)
    )

    return results
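
One way to honor the leakage warning above is to fit the target encoder fold-by-fold, so each row is encoded by an encoder that never saw its own target. A hedged sketch — the function name and defaults are illustrative:

from sklearn.model_selection import StratifiedKFold
from category_encoders import TargetEncoder
import pandas as pd

def oof_target_encode(X, y, cat_features, n_folds=5):
    """Out-of-fold target encoding: every row encoded without its own target."""
    oof = pd.DataFrame(index=X.index, columns=cat_features, dtype=float)
    skf = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=42)
    for train_idx, val_idx in skf.split(X, y):
        te = TargetEncoder(cols=cat_features, smoothing=1.0)
        te.fit(X.iloc[train_idx], y.iloc[train_idx])
        oof.iloc[val_idx] = te.transform(X.iloc[val_idx])[cat_features].values
    return oof  # join these columns back onto X in place of the raw categories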

def extract_datetime_features(df, date_col):
    """Extract rich datetime features"""
    df[date_col] = pd.to_datetime(df[date_col])
    df[f'{date_col}_year'] = df[date_col].dt.year
    df[f'{date_col}_month'] = df[date_col].dt.month
    df[f'{date_col}_day'] = df[date_col].dt.day
    df[f'{date_col}_dayofweek'] = df[date_col].dt.dayofweek
    df[f'{date_col}_dayofyear'] = df[date_col].dt.dayofyear
    df[f'{date_col}_weekofyear'] = df[date_col].dt.isocalendar().week.astype(int)
    df[f'{date_col}_quarter'] = df[date_col].dt.quarter
    df[f'{date_col}_hour'] = df[date_col].dt.hour
    df[f'{date_col}_is_weekend'] = (df[date_col].dt.dayofweek >= 5).astype(int)
    df[f'{date_col}_is_month_end'] = df[date_col].dt.is_month_end.astype(int)

    # Cyclical encoding (sin/cos for periodic features)
    df[f'{date_col}_month_sin'] = np.sin(2 * np.pi * df[f'{date_col}_month'] / 12)
    df[f'{date_col}_month_cos'] = np.cos(2 * np.pi * df[f'{date_col}_month'] / 12)
    df[f'{date_col}_hour_sin'] = np.sin(2 * np.pi * df[f'{date_col}_hour'] / 24)
    df[f'{date_col}_hour_cos'] = np.cos(2 * np.pi * df[f'{date_col}_hour'] / 24)
    return df

def create_aggregate_features(df, group_cols, agg_cols):
    """Create group-level aggregation features"""
    for group_col in group_cols:
        for agg_col in agg_cols:
            prefix = f'{agg_col}_by_{group_col}'
            agg = df.groupby(group_col)[agg_col].agg([
                'mean', 'std', 'min', 'max', 'median'
            ])
            agg.columns = [
                f'{prefix}_mean', f'{prefix}_std',
                f'{prefix}_min', f'{prefix}_max', f'{prefix}_median'
            ]
            df = df.join(agg, on=group_col)
    return df

9. Ensemble Methods

Stacking (Meta-Learning)

Stacking uses predictions from base models as features to train a meta-model.

from sklearn.model_selection import StratifiedKFold
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
import numpy as np

class StackingEnsemble:
    """Full stacking ensemble implementation"""

    def __init__(self, base_models, meta_model, n_folds=5):
        self.base_models = base_models
        self.meta_model = meta_model
        self.n_folds = n_folds
        self.trained_base_models = []

    def fit(self, X, y):
        meta_features_train = np.zeros((len(X), len(self.base_models)))
        skf = StratifiedKFold(n_splits=self.n_folds, shuffle=True, random_state=42)

        for i, (name, model) in enumerate(self.base_models):
            print(f"Training base model: {name}")
            fold_models = []

            for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
                if hasattr(X, 'iloc'):
                    X_tr, X_val = X.iloc[train_idx], X.iloc[val_idx]
                    y_tr, y_val = y.iloc[train_idx], y.iloc[val_idx]
                else:
                    X_tr, X_val = X[train_idx], X[val_idx]
                    y_tr, y_val = y[train_idx], y[val_idx]

                model_clone = type(model)(**model.get_params())
                model_clone.fit(X_tr, y_tr)
                fold_models.append(model_clone)

                if hasattr(model_clone, 'predict_proba'):
                    meta_features_train[val_idx, i] = model_clone.predict_proba(X_val)[:, 1]
                else:
                    meta_features_train[val_idx, i] = model_clone.predict(X_val)

            self.trained_base_models.append((name, fold_models))

        self.meta_model.fit(meta_features_train, y)
        return self

    def predict_proba(self, X):
        meta_features_test = np.zeros((len(X), len(self.base_models)))
        for i, (name, fold_models) in enumerate(self.trained_base_models):
            fold_preds = []
            for model in fold_models:
                if hasattr(model, 'predict_proba'):
                    fold_preds.append(model.predict_proba(X)[:, 1])
                else:
                    fold_preds.append(model.predict(X))
            meta_features_test[:, i] = np.mean(fold_preds, axis=0)
        return self.meta_model.predict_proba(meta_features_test)

def weighted_blend(predictions, weights):
    """Weighted average blend of multiple model predictions"""
    weights = np.array(weights) / sum(weights)
    return sum(w * p for w, p in zip(weights, predictions))
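
A hypothetical usage sketch — the model choices, blend weights, and the X_train/X_test/*_pred names are illustrative:

stack = StackingEnsemble(
    base_models=[
        ('xgb', xgb.XGBClassifier(n_estimators=300, random_state=42)),
        ('lgb', lgb.LGBMClassifier(n_estimators=300, random_state=42, verbose=-1)),
    ],
    meta_model=LogisticRegression(max_iter=1000),
)
stack.fit(X_train, y_train)
stack_pred = stack.predict_proba(X_test)[:, 1]

# Or simply blend pre-computed probability vectors, weighting stronger models higher
blend_pred = weighted_blend([xgb_pred, lgb_pred, cb_pred], weights=[2, 2, 1])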

10. TabNet (Deep Learning for Tabular Data)

TabNet Architecture

TabNet, published by Google in 2019, applies deep learning to tabular data using a Sequential Attention mechanism that dynamically selects which features to focus on at each step.

Key components:

  1. Feature Transformer: Shared and step-specific layers processing selected features
  2. Attentive Transformer: Generates sparse feature masks for each step
  3. Sequential Steps: Multiple steps of sequential feature selection
  4. Sparse Attention: Entropy regularization encourages sparse feature selection
Training TabNet

from pytorch_tabnet.tab_model import TabNetClassifier, TabNetRegressor
import torch

def train_tabnet(X_train, y_train, X_val, y_val, cat_idxs=None, cat_dims=None):
    """Train TabNet with optional categorical embeddings"""

    if cat_idxs is None:
        cat_idxs = []
        cat_dims = []

    model = TabNetClassifier(
        # Architecture
        n_d=64,                   # Prediction layer dimension
        n_a=64,                   # Attention embedding dimension
        n_steps=5,                # Number of sequential attention steps
        gamma=1.3,                # Feature reusage coefficient (1.0-2.0)
        n_independent=2,          # Number of independent GLU layers
        n_shared=2,               # Number of shared GLU layers

        # Categorical embeddings
        cat_idxs=cat_idxs,
        cat_dims=cat_dims,
        cat_emb_dim=1,

        # Regularization
        lambda_sparse=1e-3,       # Sparsity regularization coefficient
        momentum=0.02,
        epsilon=1e-15,

        # Optimizer
        optimizer_fn=torch.optim.Adam,
        optimizer_params=dict(lr=2e-2),
        scheduler_params=dict(mode='min', patience=5, min_lr=1e-5, factor=0.9),
        scheduler_fn=torch.optim.lr_scheduler.ReduceLROnPlateau,
        mask_type='entmax',       # 'sparsemax' or 'entmax'

        verbose=10,
        seed=42,
        device_name='auto',
    )

    X_tr = X_train.values if hasattr(X_train, 'values') else X_train
    y_tr = y_train.values if hasattr(y_train, 'values') else y_train
    X_v = X_val.values if hasattr(X_val, 'values') else X_val
    y_v = y_val.values if hasattr(y_val, 'values') else y_val

    model.fit(
        X_train=X_tr, y_train=y_tr,
        eval_set=[(X_v, y_v)],
        eval_name=['val'],
        eval_metric=['auc'],
        max_epochs=200,
        patience=20,
        batch_size=1024,
        virtual_batch_size=128,  # Ghost batch normalization size
        num_workers=0,
        drop_last=False,
    )
    return model
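
After training, predictions and TabNet's built-in attention-mask explanations look roughly like this (a sketch; X_v is the validation array prepared above):

# Probability predictions on the validation set
val_preds = model.predict_proba(X_v)[:, 1]

# Per-sample feature attributions: explain() returns an aggregate
# importance matrix plus the per-step attention masks
explain_matrix, masks = model.explain(X_v)

# Normalized global feature importances are also exposed after fit
global_importance = model.feature_importances_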

11. Kaggle-Level Pipeline

Cross-Validation Strategies

from sklearn.model_selection import (
    StratifiedKFold, GroupKFold, StratifiedGroupKFold, TimeSeriesSplit
)

def kaggle_cv_pipeline(X, y, model, groups=None, n_folds=5):
    """Production-grade CV pipeline with leakage prevention"""

    if groups is not None:
        cv = StratifiedGroupKFold(n_splits=n_folds, shuffle=True, random_state=42)
        split_args = (X, y, groups)
    else:
        cv = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=42)
        split_args = (X, y)

    oof_predictions = np.zeros(len(X))
    feature_importances = []

    for fold, (train_idx, val_idx) in enumerate(cv.split(*split_args)):
        if hasattr(X, 'iloc'):
            X_tr, X_val = X.iloc[train_idx], X.iloc[val_idx]
            y_tr, y_val = y.iloc[train_idx], y.iloc[val_idx]
        else:
            X_tr, X_val = X[train_idx], X[val_idx]
            y_tr, y_val = y[train_idx], y[val_idx]

        model.fit(X_tr, y_tr)

        if hasattr(model, 'predict_proba'):
            oof_predictions[val_idx] = model.predict_proba(X_val)[:, 1]
        else:
            oof_predictions[val_idx] = model.predict(X_val)

        if hasattr(model, 'feature_importances_'):
            feature_importances.append(model.feature_importances_)

        fold_score = roc_auc_score(y_val, oof_predictions[val_idx])
        print(f"Fold {fold+1} AUC: {fold_score:.5f}")

    overall_score = roc_auc_score(y, oof_predictions)
    print(f"Overall OOF AUC: {overall_score:.5f}")

    mean_importance = np.mean(feature_importances, axis=0) if feature_importances else None
    return oof_predictions, mean_importance, overall_score

def prevent_leakage_pipeline(X_train, X_val):
    """
    Leakage-free preprocessing pipeline.
    Critical rule: fit only on training data, transform both train and val.
    """
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import StandardScaler
    from sklearn.impute import SimpleImputer

    pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
    ])

    X_train_processed = pipeline.fit_transform(X_train)
    X_val_processed = pipeline.transform(X_val)

    return X_train_processed, X_val_processed, pipeline
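
The imports above also include TimeSeriesSplit: for temporal data the same pipeline should use an expanding-window split instead of a shuffled one, so no fold ever trains on the future. A minimal sketch:

def time_series_cv(X, y, model, n_splits=5):
    """Expanding-window CV: each fold validates strictly after its training window."""
    tscv = TimeSeriesSplit(n_splits=n_splits)
    scores = []
    for train_idx, val_idx in tscv.split(X):
        X_tr, X_val = X.iloc[train_idx], X.iloc[val_idx]
        y_tr, y_val = y.iloc[train_idx], y.iloc[val_idx]
        model.fit(X_tr, y_tr)
        scores.append(roc_auc_score(y_val, model.predict_proba(X_val)[:, 1]))
    return np.mean(scores)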

12. Feature Selection

Boruta Algorithm

Boruta is a robust Random Forest-based feature selection algorithm. It creates shuffled "shadow" copies of every feature and keeps only the real features whose importance significantly exceeds that of the best shadow feature, iterating until each feature is confirmed or rejected.

from boruta import BorutaPy
from sklearn.ensemble import RandomForestClassifier

def boruta_feature_selection(X, y, max_iter=100):
    rf = RandomForestClassifier(
        n_estimators=200, max_depth=7, random_state=42, n_jobs=-1
    )

    boruta = BorutaPy(
        estimator=rf, n_estimators='auto',
        perc=100, alpha=0.05, max_iter=max_iter,
        random_state=42, verbose=1
    )
    boruta.fit(X.values, y.values)

    feature_ranking = pd.DataFrame({
        'feature': X.columns,
        'ranking': boruta.ranking_,
        'selected': boruta.support_,
        'tentative': boruta.support_weak_
    }).sort_values('ranking')

    selected = X.columns[boruta.support_].tolist()
    tentative = X.columns[boruta.support_weak_].tolist()

    print(f"Selected: {len(selected)}, Tentative: {len(tentative)}, "
          f"Rejected: {len(X.columns) - len(selected) - len(tentative)}")

    return selected, tentative, feature_ranking

def shap_feature_selection(model, X_val, threshold=0.01):
    """Select features based on mean absolute SHAP values"""
    import shap

    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_val)
    shap_abs = np.abs(shap_values[1] if isinstance(shap_values, list) else shap_values)
    mean_shap = shap_abs.mean(axis=0)
    total_shap = mean_shap.sum()

    feature_importance = pd.DataFrame({
        'feature': X_val.columns,
        'mean_abs_shap': mean_shap,
        'shap_ratio': mean_shap / total_shap
    }).sort_values('mean_abs_shap', ascending=False)

    selected = feature_importance[feature_importance['mean_abs_shap'] > threshold * total_shap]
    print(f"SHAP selection: {len(selected)} / {len(X_val.columns)} features")
    return selected['feature'].tolist(), feature_importance

Complete Kaggle Pipeline Example

def complete_kaggle_pipeline(train_df, test_df, target_col, cat_features=None):
    """End-to-end Kaggle ML pipeline with ensemble"""

    y = train_df[target_col]
    X = train_df.drop(columns=[target_col])

    # Impute missing values
    for col in X.select_dtypes(include=[np.number]).columns:
        col_median = X[col].median()
        X[col] = X[col].fillna(col_median)
        test_df[col] = test_df[col].fillna(col_median)

    if cat_features:
        for col in cat_features:
            # Shared category set so train/test integer codes stay aligned
            cats = pd.Categorical(pd.concat([X[col], test_df[col]])).categories
            X[col] = pd.Categorical(X[col], categories=cats).codes
            test_df[col] = pd.Categorical(test_df[col], categories=cats).codes

    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

    xgb_oof = np.zeros(len(X))
    lgb_oof = np.zeros(len(X))
    cb_oof = np.zeros(len(X))
    xgb_test = np.zeros(len(test_df))
    lgb_test = np.zeros(len(test_df))
    cb_test = np.zeros(len(test_df))

    for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
        X_tr, X_val = X.iloc[train_idx], X.iloc[val_idx]
        y_tr, y_val = y.iloc[train_idx], y.iloc[val_idx]

        # XGBoost
        xgb_m = xgb.XGBClassifier(
            n_estimators=1000, max_depth=6, learning_rate=0.05,
            subsample=0.8, colsample_bytree=0.8, reg_alpha=0.1,
            random_state=42, n_jobs=-1, eval_metric='auc',
            early_stopping_rounds=50,  # XGBoost >= 2.0: set on the estimator
        )
        xgb_m.fit(X_tr, y_tr, eval_set=[(X_val, y_val)], verbose=False)
        xgb_oof[val_idx] = xgb_m.predict_proba(X_val)[:, 1]
        xgb_test += xgb_m.predict_proba(test_df)[:, 1] / 5

        # LightGBM
        lgb_m = lgb.LGBMClassifier(
            n_estimators=1000, num_leaves=31, learning_rate=0.05,
            subsample=0.8, colsample_bytree=0.8, random_state=42,
            n_jobs=-1, verbose=-1
        )
        lgb_m.fit(X_tr, y_tr, eval_set=[(X_val, y_val)],
                  callbacks=[lgb.early_stopping(50, verbose=False)])
        lgb_oof[val_idx] = lgb_m.predict_proba(X_val)[:, 1]
        lgb_test += lgb_m.predict_proba(test_df)[:, 1] / 5

        # CatBoost
        cb_m = CatBoostClassifier(
            iterations=1000, depth=6, learning_rate=0.05,
            random_seed=42, verbose=0, early_stopping_rounds=50
        )
        cb_m.fit(X_tr, y_tr, eval_set=(X_val, y_val))
        cb_oof[val_idx] = cb_m.predict_proba(X_val)[:, 1]
        cb_test += cb_m.predict_proba(test_df)[:, 1] / 5

    print(f"XGBoost OOF AUC: {roc_auc_score(y, xgb_oof):.5f}")
    print(f"LightGBM OOF AUC: {roc_auc_score(y, lgb_oof):.5f}")
    print(f"CatBoost OOF AUC: {roc_auc_score(y, cb_oof):.5f}")

    ensemble_oof = (xgb_oof + lgb_oof + cb_oof) / 3
    ensemble_test = (xgb_test + lgb_test + cb_test) / 3
    print(f"Ensemble OOF AUC: {roc_auc_score(y, ensemble_oof):.5f}")

    return ensemble_test, ensemble_oof

Conclusion

This guide covered the complete pipeline for tabular data machine learning:

  1. EDA and preprocessing: Systematic approach to understanding data and handling missing values and outliers
  2. Tree-based models: Stepwise progression from decision trees to random forests and gradient boosting
  3. Modern GBM frameworks: Strengths, weaknesses, and use cases for XGBoost, LightGBM, and CatBoost
  4. Feature engineering: Translating domain knowledge into code with diverse techniques
  5. Ensembling: Maximizing performance by combining multiple models
  6. TabNet: Deep learning approach for tabular data
  7. Kaggle pipeline: A complete production-level workflow
  8. Feature selection: Removing irrelevant features to improve performance and interpretability

Key takeaways:

  • Always start with thorough EDA to understand your data
  • Design CV carefully to prevent leakage
  • Ensembles almost always outperform single models
  • Use SHAP values to interpret your models and combine with domain knowledge
  • LightGBM excels at speed, CatBoost at categorical handling, XGBoost at stability
