Skip to content
Published on

表形式データML完全ガイド: XGBoost、LightGBM、CatBoost、TabNetをマスター

Authors

1. 表形式データ機械学習の概要

なぜツリーベースモデルは表形式データに強いのか?

行と列で構造化された表形式データは、実世界のビジネス問題の80%以上を占めます。金融詐欺検出、顧客離脱予測、不動産価格設定、医療診断など、あらゆるドメインで登場します。

ディープラーニングが画像、テキスト、音声に革命をもたらした一方で、勾配ブーストツリーは依然として表形式データのコンペティションを支配しています。その理由を見ていきましょう:

ツリーベースモデルの強み:

  1. 不規則な決定境界:ツリーは軸に平行な分割で特徴空間を分割し、複雑な非線形関係を自然に捉えます。
  2. スケール不変性:勾配ブースティングはデータのランク順序のみを使用するため、特徴量の正規化なしに良好に動作します。
  3. 欠損値の処理:XGBoost、LightGBM、CatBoostはいずれも欠損値を内部的に処理します。
  4. カテゴリカル特徴量:ラベルエンコーディングだけで良好なパフォーマンスが得られることが多いです。
  5. 解釈可能性:特徴重要度とSHAP値によりモデルの意思決定を説明できます。
  6. 過学習への耐性:アンサンブル手法は単一モデルよりもロバストです。

EDA(探索的データ分析)戦略

モデルトレーニング前にデータを深く理解することが重要です。

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats

def eda_overview(df):
    """Print comprehensive overview of a dataframe"""
    print("=== Data Overview ===")
    print(f"Shape: {df.shape}")
    print(f"\nData Types:\n{df.dtypes}")
    print(f"\nMissing Values:\n{df.isnull().sum()}")
    print(f"\nMissing Ratios:\n{(df.isnull().mean() * 100).round(2)}")
    print(f"\nNumeric Summary:\n{df.describe()}")
    print(f"\nCategorical Feature Cardinality:")
    for col in df.select_dtypes(include='object').columns:
        print(f"  {col}: {df[col].nunique()} unique values")

def plot_target_distribution(df, target_col):
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    axes[0].hist(df[target_col], bins=50, color='steelblue', edgecolor='white')
    axes[0].set_title(f'{target_col} Distribution')
    stats.probplot(df[target_col].dropna(), dist="norm", plot=axes[1])
    axes[1].set_title('Q-Q Plot (Normality Check)')
    plt.tight_layout()
    plt.show()

def detect_outliers_iqr(df, columns):
    """Detect outliers using IQR method"""
    outlier_info = {}
    for col in columns:
        Q1 = df[col].quantile(0.25)
        Q3 = df[col].quantile(0.75)
        IQR = Q3 - Q1
        lower = Q1 - 1.5 * IQR
        upper = Q3 + 1.5 * IQR
        outliers = df[(df[col] < lower) | (df[col] > upper)]
        outlier_info[col] = {
            'count': len(outliers),
            'ratio': len(outliers) / len(df),
            'lower_bound': lower,
            'upper_bound': upper
        }
    return pd.DataFrame(outlier_info).T

欠損値処理戦略

from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer

def simple_imputation(df):
    """Median imputation for numeric, mode for categorical"""
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    num_imputer = SimpleImputer(strategy='median')
    df[numeric_cols] = num_imputer.fit_transform(df[numeric_cols])

    cat_cols = df.select_dtypes(include='object').columns
    cat_imputer = SimpleImputer(strategy='most_frequent')
    df[cat_cols] = cat_imputer.fit_transform(df[cat_cols])
    return df

def knn_imputation(df, n_neighbors=5):
    """KNN-based imputation (effective for small datasets)"""
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    imputer = KNNImputer(n_neighbors=n_neighbors)
    df[numeric_cols] = imputer.fit_transform(df[numeric_cols])
    return df

def mice_imputation(df, max_iter=10):
    """MICE: Multiple Imputation by Chained Equations"""
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    imputer = IterativeImputer(max_iter=max_iter, random_state=42)
    df[numeric_cols] = imputer.fit_transform(df[numeric_cols])
    return df

def add_missing_indicators(df):
    """Add binary columns indicating where values were missing"""
    cols_with_missing = df.columns[df.isnull().any()].tolist()
    for col in cols_with_missing:
        df[f'{col}_missing'] = df[col].isnull().astype(int)
    return df

2. 決定木

ID3とCARTアルゴリズム

決定木はデータを再帰的に分割してツリー構造を構築します。2つの主要なアルゴリズムがあります:

ID3 (Iterative Dichotomiser 3):

  • 分割基準として情報利得を使用
  • カテゴリカル特徴量のみを処理(多方向分割)
  • カテゴリカルターゲットのみ対応

CART (Classification and Regression Trees):

  • 分類:ジニ不純度
  • 回帰:平均二乗誤差
  • 常に二項分割(2つの子ノード)
  • scikit-learnで使用されるアルゴリズム

情報利得とジニ不純度

エントロピーと情報利得:

エントロピーはデータセットの不純度を測定します。k個のクラスの場合:

H(S) = -sum(p_i * log2(p_i))

情報利得は分割後のエントロピー減少量です:

IG(S, A) = H(S) - sum(|S_v|/|S| * H(S_v))

ジニ不純度:

Gini(S) = 1 - sum(p_i^2)
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.tree import export_text, plot_tree
import matplotlib.pyplot as plt

def train_decision_tree(X_train, y_train, task='classification'):
    if task == 'classification':
        model = DecisionTreeClassifier(
            criterion='gini',        # 'gini' or 'entropy'
            max_depth=5,
            min_samples_split=10,
            min_samples_leaf=5,
            max_features=None,
            random_state=42
        )
    else:
        model = DecisionTreeRegressor(
            criterion='squared_error',
            max_depth=5,
            min_samples_split=10,
            min_samples_leaf=5,
            random_state=42
        )
    model.fit(X_train, y_train)
    return model

def visualize_tree(model, feature_names, class_names=None, max_depth=3):
    plt.figure(figsize=(20, 10))
    plot_tree(
        model, feature_names=feature_names, class_names=class_names,
        filled=True, rounded=True, max_depth=max_depth, fontsize=10
    )
    plt.title('Decision Tree Visualization')
    plt.tight_layout()
    plt.show()
    print(export_text(model, feature_names=feature_names, max_depth=3))

3. ランダムフォレスト

バギングと特徴量ランダム化

ランダムフォレストはブートストラップ集約(バギング)特徴量ランダム化を組み合わせます:

  1. トレーニングデータからブートストラップサンプル(復元抽出)を生成
  2. 各サンプルで独立した決定木を訓練
  3. 各分割で sqrt(n_features) 個の特徴量のみをランダムに選択して検討
  4. 予測を平均(回帰)または多数決(分類)で集約
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.inspection import permutation_importance

def train_random_forest(X_train, y_train, task='classification'):
    if task == 'classification':
        model = RandomForestClassifier(
            n_estimators=300,
            max_depth=None,
            min_samples_split=2,
            min_samples_leaf=1,
            max_features='sqrt',
            bootstrap=True,
            oob_score=True,
            n_jobs=-1,
            random_state=42
        )
    else:
        model = RandomForestRegressor(
            n_estimators=300,
            max_features='sqrt',
            oob_score=True,
            n_jobs=-1,
            random_state=42
        )
    model.fit(X_train, y_train)
    print(f"OOB Score: {model.oob_score_:.4f}")
    return model

def plot_feature_importance(model, feature_names, top_n=20):
    importances = pd.Series(
        model.feature_importances_, index=feature_names
    ).sort_values(ascending=False)

    plt.figure(figsize=(10, 8))
    importances.head(top_n).plot(kind='barh', color='steelblue')
    plt.title(f'Random Forest Feature Importance (Top {top_n})')
    plt.xlabel('Importance')
    plt.gca().invert_yaxis()
    plt.tight_layout()
    plt.show()
    return importances

def compute_permutation_importance(model, X_val, y_val, feature_names):
    result = permutation_importance(
        model, X_val, y_val, n_repeats=10, random_state=42, n_jobs=-1
    )
    return pd.DataFrame({
        'feature': feature_names,
        'importance_mean': result.importances_mean,
        'importance_std': result.importances_std
    }).sort_values('importance_mean', ascending=False)

4. 勾配ブースティング

数学的原理

勾配ブースティングは負の勾配の方向にツリーを順番に追加して損失関数を最小化します:

F_m(x) = F_{m-1}(x) + learning_rate * h_m(x)

ここでは、ステージmのツリーが疑似残差にフィットされます:

r_i = -[dL(y_i, F(x_i)) / dF(x_i)]

MSE損失を使った回帰では、疑似残差はターゲット値から現在のアンサンブル予測を引いた通常の残差になります。 対数損失を使ったバイナリ分類では、疑似残差はターゲット値から現在のアンサンブルスコアにシグモイドを適用した値を引いたものになります。

from sklearn.ensemble import GradientBoostingClassifier, GradientBoostingRegressor

def train_sklearn_gbm(X_train, y_train, task='classification'):
    if task == 'classification':
        model = GradientBoostingClassifier(
            n_estimators=200, learning_rate=0.1, max_depth=3,
            subsample=0.8, max_features='sqrt', random_state=42
        )
    else:
        model = GradientBoostingRegressor(
            n_estimators=200, learning_rate=0.1, max_depth=3,
            subsample=0.8, random_state=42
        )
    model.fit(X_train, y_train)
    return model

5. XGBoost

XGBoostのイノベーション

XGBoost(eXtreme Gradient Boosting)はChen & Guestrin(2016年)が開発し、バニラGBMに対して大幅な改善をもたらします:

  1. 正則化:過学習を防ぐためL1とL2正則化を目的関数に追加
  2. 2次テイラー展開:1次と2次の導関数を使ったより正確なツリー構造探索
  3. 欠損値学習:各分割で欠損サンプルの最適なデフォルト方向を学習
  4. 並列処理:並列特徴分割探索(ツリー構築自体は逐次的)
  5. キャッシュ最適化:速度のためのメモリアクセスパターン最適化
  6. ブロック構造:スパースデータのための圧縮列ストレージ

完全ハイパーパラメータリファレンス

import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score

def train_xgboost_full(X, y, task='binary'):
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42,
        stratify=y if task == 'binary' else None
    )

    if task == 'binary':
        objective = 'binary:logistic'
        eval_metric = 'auc'
    elif task == 'multiclass':
        objective = 'multi:softprob'
        eval_metric = 'mlogloss'
    else:
        objective = 'reg:squarederror'
        eval_metric = 'rmse'

    model = xgb.XGBClassifier(
        # Tree parameters
        n_estimators=1000,
        max_depth=6,            # Default: 6. Deeper = more complex
        min_child_weight=1,     # Minimum sum of instance weight in a leaf
        gamma=0,                # Minimum loss reduction for a split
        max_delta_step=0,       # Max delta step per tree weight (useful for imbalance)

        # Sampling
        subsample=0.8,          # Row sampling ratio per tree
        colsample_bytree=0.8,   # Column sampling ratio per tree
        colsample_bylevel=1.0,  # Column sampling per level
        colsample_bynode=1.0,   # Column sampling per node

        # Regularization
        reg_alpha=0.1,          # L1 regularization
        reg_lambda=1.0,         # L2 regularization (default: 1)

        # Learning
        learning_rate=0.01,     # Step size shrinkage (eta)
        scale_pos_weight=1,     # For imbalanced: neg/pos ratio

        # System
        objective=objective,
        eval_metric=eval_metric,
        tree_method='hist',     # 'hist' (fast), 'exact', 'approx', 'gpu_hist'
        random_state=42,
        n_jobs=-1,
    )

    model.fit(
        X_train, y_train,
        eval_set=[(X_train, y_train), (X_val, y_val)],
        early_stopping_rounds=50,
        verbose=100,
    )

    print(f"Best iteration: {model.best_iteration}")
    print(f"Best score: {model.best_score:.4f}")
    return model, X_val, y_val

def explain_with_shap(model, X_val, feature_names):
    """SHAP-based model explanation"""
    import shap

    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_val)

    plt.figure(figsize=(10, 8))
    shap.summary_plot(shap_values, X_val, feature_names=feature_names, show=False)
    plt.tight_layout()
    plt.show()

    mean_abs_shap = pd.DataFrame({
        'feature': feature_names,
        'importance': np.abs(shap_values).mean(axis=0)
    }).sort_values('importance', ascending=False)

    return mean_abs_shap

GPU加速XGBoost

# GPU training (requires NVIDIA GPU)
model_gpu = xgb.XGBClassifier(
    n_estimators=1000,
    tree_method='gpu_hist',     # GPU histogram method
    predictor='gpu_predictor',  # GPU prediction
    device='cuda',              # XGBoost 2.0+: use device='cuda'
    random_state=42
)

6. LightGBM

リーフワイズとレベルワイズのツリー成長

LightGBM(Light Gradient Boosting Machine)はMicrosoftが開発し、速度とメモリ効率のための重要なイノベーションを導入します。

レベルワイズ(従来手法):

  • 深さレベルごとに全リーフを同時に分割
  • バランスの取れたツリーを生成
  • 利得の少ないノードの計算を無駄にする

リーフワイズ(LightGBM):

  • 常に最大の損失削減を持つ単一のリーフを分割
  • 高度に不均衡なツリーが生成される可能性あり
  • 同じ数のリーフでより低い損失を達成
  • 過学習を防ぐために max_depth が必要

GOSSとEFB

GOSS (Gradient-based One-Side Sampling):

  • 大きな勾配を持つサンプル(高情報量)を全て保持
  • 小さな勾配のサンプル(既によく学習済み)をランダムに削除
  • 学習精度を保ちながらデータサイズを削減

EFB (Exclusive Feature Bundling):

  • 相互に排他的なスパース特徴量をバンドル(同時に非ゼロにならないもの)
  • 特徴量数を削減し、トレーニングを高速化(スパースデータに特に効果的)
import lightgbm as lgb
from sklearn.model_selection import StratifiedKFold
import numpy as np

def train_lightgbm_cv(X, y, n_folds=5):
    """Train LightGBM with Stratified K-Fold CV"""
    skf = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=42)
    oof_preds = np.zeros(len(X))
    models = []

    for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
        print(f"\n--- Fold {fold + 1}/{n_folds} ---")
        X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
        y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]

        model = lgb.LGBMClassifier(
            n_estimators=2000,
            num_leaves=31,       # Main complexity parameter
            max_depth=-1,        # -1 means unlimited (control via num_leaves)
            min_child_samples=20,
            learning_rate=0.05,
            subsample=0.8,
            subsample_freq=1,
            colsample_bytree=0.8,
            reg_alpha=0.1,
            reg_lambda=0.1,
            boosting_type='gbdt',  # 'gbdt', 'dart', 'goss', 'rf'
            random_state=42,
            n_jobs=-1,
            verbose=-1,
        )

        model.fit(
            X_train, y_train,
            eval_set=[(X_val, y_val)],
            callbacks=[
                lgb.early_stopping(stopping_rounds=100, verbose=True),
                lgb.log_evaluation(period=200),
            ],
        )

        oof_preds[val_idx] = model.predict_proba(X_val)[:, 1]
        models.append(model)

        fold_auc = roc_auc_score(y_val, oof_preds[val_idx])
        print(f"Fold {fold+1} AUC: {fold_auc:.4f}")

    overall_auc = roc_auc_score(y, oof_preds)
    print(f"\nOverall OOF AUC: {overall_auc:.4f}")
    return models, oof_preds

def lgbm_with_categorical(X_train, y_train, X_val, y_val, cat_features):
    """LightGBM with native categorical feature handling"""
    for col in cat_features:
        X_train[col] = X_train[col].astype('category').cat.codes
        X_val[col] = X_val[col].astype('category').cat.codes

    train_data = lgb.Dataset(
        X_train, y_train,
        categorical_feature=cat_features,
        free_raw_data=False
    )
    val_data = lgb.Dataset(X_val, y_val, reference=train_data)

    params = {
        'objective': 'binary', 'metric': 'auc',
        'num_leaves': 31, 'learning_rate': 0.05, 'verbose': -1,
    }

    model = lgb.train(
        params, train_data, num_boost_round=1000,
        valid_sets=[val_data],
        callbacks=[lgb.early_stopping(100), lgb.log_evaluation(100)],
    )
    return model

7. CatBoost

カテゴリカル特徴量の自動処理

CatBoost(Categorical Boosting)はYandexが開発し、カテゴリカル特徴量をネイティブに処理することに優れています。

CatBoostのカテゴリカルエンコーディング:

  1. ターゲット統計量(TS):ターゲット平均を使って各カテゴリをエンコード
  2. 順序付きターゲット統計量:データ順序を使ったリーク不要のTS
  3. ワンホットエンコーディング:低カーディナリティ特徴量に自動適用

順序付きブースティング

CatBoostのキーイノベーション、順序付きブースティングは、ターゲット統計量の計算時に生じる予測シフトバイアスを解決します:

  • データをランダムな順序で並べる
  • サンプル i の統計量はサンプル 0 から i-1 のみを使って計算
  • 以前に見たサンプルのみを使用し、リークを排除
from catboost import CatBoostClassifier, CatBoostRegressor, Pool

def train_catboost(X_train, y_train, X_val, y_val, cat_features=None):
    """CatBoost training — can pass raw string categories directly"""

    train_pool = Pool(data=X_train, label=y_train, cat_features=cat_features)
    val_pool = Pool(data=X_val, label=y_val, cat_features=cat_features)

    model = CatBoostClassifier(
        iterations=1000,
        learning_rate=0.05,
        depth=6,
        l2_leaf_reg=3.0,
        subsample=0.8,
        colsample_bylevel=0.8,
        cat_features=cat_features,
        one_hot_max_size=2,
        boosting_type='Ordered',
        bootstrap_type='Bayesian',
        bagging_temperature=1.0,
        loss_function='Logloss',
        eval_metric='AUC',
        task_type='CPU',
        random_seed=42,
        verbose=100,
        use_best_model=True,
        early_stopping_rounds=50,
    )

    model.fit(train_pool, eval_set=val_pool, plot=False)
    print(f"Best iteration: {model.get_best_iteration()}")
    print(f"Best score: {model.get_best_score()}")
    return model

def compare_gbm_models(X, y):
    """Compare performance and speed across the three GBM frameworks"""
    import time
    from sklearn.model_selection import cross_val_score

    results = {}

    start = time.time()
    xgb_model = xgb.XGBClassifier(
        n_estimators=300, max_depth=6, learning_rate=0.1,
        subsample=0.8, colsample_bytree=0.8, random_state=42, n_jobs=-1
    )
    xgb_scores = cross_val_score(xgb_model, X, y, cv=5, scoring='roc_auc')
    results['XGBoost'] = {
        'mean_auc': xgb_scores.mean(), 'std_auc': xgb_scores.std(),
        'time_sec': time.time() - start
    }

    start = time.time()
    lgb_model = lgb.LGBMClassifier(
        n_estimators=300, num_leaves=31, learning_rate=0.1,
        subsample=0.8, colsample_bytree=0.8, random_state=42,
        n_jobs=-1, verbose=-1
    )
    lgb_scores = cross_val_score(lgb_model, X, y, cv=5, scoring='roc_auc')
    results['LightGBM'] = {
        'mean_auc': lgb_scores.mean(), 'std_auc': lgb_scores.std(),
        'time_sec': time.time() - start
    }

    start = time.time()
    cb_model = CatBoostClassifier(
        iterations=300, depth=6, learning_rate=0.1, random_seed=42, verbose=0
    )
    cb_scores = cross_val_score(cb_model, X, y, cv=5, scoring='roc_auc')
    results['CatBoost'] = {
        'mean_auc': cb_scores.mean(), 'std_auc': cb_scores.std(),
        'time_sec': time.time() - start
    }

    result_df = pd.DataFrame(results).T
    print("=== GBM Model Comparison ===")
    print(result_df.to_string())
    return result_df

XGBoost vs LightGBM vs CatBoost 比較:

側面XGBoostLightGBMCatBoost
ツリー成長レベルワイズリーフワイズ対称
速度中程度速い中程度-速い
メモリ中程度低い中程度
カテゴリカル手動組み込み(限定)自動(最良)
GPUサポートありありあり
ハイパーパラメータ多い多い少ない
最適な用途数値特徴量大規模データセットカテゴリカル特徴量

8. 特徴エンジニアリング

数値特徴量の変換

from sklearn.preprocessing import (
    StandardScaler, MinMaxScaler, RobustScaler,
    PowerTransformer, QuantileTransformer, KBinsDiscretizer
)
from scipy.stats import boxcox
import numpy as np

def transform_numeric_features(df, columns):
    """Apply various numeric transformations"""
    transformed = df.copy()

    for col in columns:
        # 1. Log transform (right-skewed, positive values)
        if (df[col] > 0).all():
            transformed[f'{col}_log'] = np.log1p(df[col])

        # 2. Square root transform
        if (df[col] >= 0).all():
            transformed[f'{col}_sqrt'] = np.sqrt(df[col])

        # 3. Box-Cox transform (positive values only)
        if (df[col] > 0).all():
            transformed[f'{col}_boxcox'], _ = boxcox(df[col] + 1)

        # 4. Yeo-Johnson transform (handles negative values)
        pt = PowerTransformer(method='yeo-johnson')
        transformed[f'{col}_yeojohnson'] = pt.fit_transform(df[[col]])

        # 5. Quantile transform (maps to normal distribution)
        qt = QuantileTransformer(output_distribution='normal', n_quantiles=1000)
        transformed[f'{col}_quantile'] = qt.fit_transform(df[[col]])

    return transformed

def create_bins(df, col, n_bins=10, strategy='quantile'):
    """Discretize continuous variable into bins"""
    kbd = KBinsDiscretizer(n_bins=n_bins, encode='ordinal', strategy=strategy)
    df[f'{col}_bin'] = kbd.fit_transform(df[[col]]).astype(int)
    return df

カテゴリカルエンコーディング

from category_encoders import (
    TargetEncoder, LeaveOneOutEncoder,
    CatBoostEncoder, BinaryEncoder
)

def encode_categorical_features(X_train, X_val, y_train, cat_features):
    """Multiple categorical encoding strategies"""
    results = {}

    # Target Encoding (medium-high cardinality)
    # Warning: can cause target leakage — only use inside CV folds
    te = TargetEncoder(cols=cat_features, smoothing=1.0)
    results['target_enc'] = (
        te.fit_transform(X_train, y_train),
        te.transform(X_val)
    )

    # Leave-One-Out Encoding (prevents target leakage)
    loo = LeaveOneOutEncoder(cols=cat_features, sigma=0.05)
    results['loo_enc'] = (
        loo.fit_transform(X_train, y_train),
        loo.transform(X_val)
    )

    # CatBoost Encoding (Ordered Target Statistics)
    cbe = CatBoostEncoder(cols=cat_features)
    results['catboost_enc'] = (
        cbe.fit_transform(X_train, y_train),
        cbe.transform(X_val)
    )

    return results

def extract_datetime_features(df, date_col):
    """Extract rich datetime features"""
    df[date_col] = pd.to_datetime(df[date_col])
    df[f'{date_col}_year'] = df[date_col].dt.year
    df[f'{date_col}_month'] = df[date_col].dt.month
    df[f'{date_col}_day'] = df[date_col].dt.day
    df[f'{date_col}_dayofweek'] = df[date_col].dt.dayofweek
    df[f'{date_col}_dayofyear'] = df[date_col].dt.dayofyear
    df[f'{date_col}_weekofyear'] = df[date_col].dt.isocalendar().week.astype(int)
    df[f'{date_col}_quarter'] = df[date_col].dt.quarter
    df[f'{date_col}_hour'] = df[date_col].dt.hour
    df[f'{date_col}_is_weekend'] = (df[date_col].dt.dayofweek >= 5).astype(int)
    df[f'{date_col}_is_month_end'] = df[date_col].dt.is_month_end.astype(int)

    # Cyclical encoding (sin/cos for periodic features)
    df[f'{date_col}_month_sin'] = np.sin(2 * np.pi * df[f'{date_col}_month'] / 12)
    df[f'{date_col}_month_cos'] = np.cos(2 * np.pi * df[f'{date_col}_month'] / 12)
    df[f'{date_col}_hour_sin'] = np.sin(2 * np.pi * df[f'{date_col}_hour'] / 24)
    df[f'{date_col}_hour_cos'] = np.cos(2 * np.pi * df[f'{date_col}_hour'] / 24)
    return df

def create_aggregate_features(df, group_cols, agg_cols):
    """Create group-level aggregation features"""
    for group_col in group_cols:
        for agg_col in agg_cols:
            prefix = f'{agg_col}_by_{group_col}'
            agg = df.groupby(group_col)[agg_col].agg([
                'mean', 'std', 'min', 'max', 'median'
            ])
            agg.columns = [
                f'{prefix}_mean', f'{prefix}_std',
                f'{prefix}_min', f'{prefix}_max', f'{prefix}_median'
            ]
            df = df.join(agg, on=group_col)
    return df

9. アンサンブル手法

スタッキング(メタ学習)

スタッキングはベースモデルからの予測を特徴量として使用し、メタモデルを訓練します。

from sklearn.model_selection import StratifiedKFold
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
import numpy as np

class StackingEnsemble:
    """Full stacking ensemble implementation"""

    def __init__(self, base_models, meta_model, n_folds=5):
        self.base_models = base_models
        self.meta_model = meta_model
        self.n_folds = n_folds
        self.trained_base_models = []

    def fit(self, X, y):
        meta_features_train = np.zeros((len(X), len(self.base_models)))
        skf = StratifiedKFold(n_splits=self.n_folds, shuffle=True, random_state=42)

        for i, (name, model) in enumerate(self.base_models):
            print(f"Training base model: {name}")
            fold_models = []

            for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
                if hasattr(X, 'iloc'):
                    X_tr, X_val = X.iloc[train_idx], X.iloc[val_idx]
                    y_tr, y_val = y.iloc[train_idx], y.iloc[val_idx]
                else:
                    X_tr, X_val = X[train_idx], X[val_idx]
                    y_tr, y_val = y[train_idx], y[val_idx]

                model_clone = type(model)(**model.get_params())
                model_clone.fit(X_tr, y_tr)
                fold_models.append(model_clone)

                if hasattr(model_clone, 'predict_proba'):
                    meta_features_train[val_idx, i] = model_clone.predict_proba(X_val)[:, 1]
                else:
                    meta_features_train[val_idx, i] = model_clone.predict(X_val)

            self.trained_base_models.append((name, fold_models))

        self.meta_model.fit(meta_features_train, y)
        return self

    def predict_proba(self, X):
        meta_features_test = np.zeros((len(X), len(self.base_models)))
        for i, (name, fold_models) in enumerate(self.trained_base_models):
            fold_preds = []
            for model in fold_models:
                if hasattr(model, 'predict_proba'):
                    fold_preds.append(model.predict_proba(X)[:, 1])
                else:
                    fold_preds.append(model.predict(X))
            meta_features_test[:, i] = np.mean(fold_preds, axis=0)
        return self.meta_model.predict_proba(meta_features_test)

def weighted_blend(predictions, weights):
    """Weighted average blend of multiple model predictions"""
    weights = np.array(weights) / sum(weights)
    return sum(w * p for w, p in zip(weights, predictions))

10. TabNet(表形式データ向けディープラーニング)

TabNetアーキテクチャ

TabNetは2019年にGoogleが発表し、各ステップでどの特徴量に注目するかを動的に選択する逐次的アテンションメカニズムを使って表形式データにディープラーニングを適用します。

主要コンポーネント:

  1. 特徴量トランスフォーマー:選択された特徴量を処理する共有とステップ固有のレイヤー
  2. アテンション付きトランスフォーマー:各ステップのスパース特徴量マスクを生成
  3. 逐次ステップ:逐次特徴量選択の複数ステップ
  4. スパースアテンション:エントロピー正則化がスパースな特徴量選択を促進
from pytorch_tabnet.tab_model import TabNetClassifier, TabNetRegressor
import torch

def train_tabnet(X_train, y_train, X_val, y_val, cat_idxs=None, cat_dims=None):
    """Train TabNet with optional categorical embeddings"""

    if cat_idxs is None:
        cat_idxs = []
        cat_dims = []

    model = TabNetClassifier(
        # Architecture
        n_d=64,                   # Prediction layer dimension
        n_a=64,                   # Attention embedding dimension
        n_steps=5,                # Number of sequential attention steps
        gamma=1.3,                # Feature reusage coefficient (1.0-2.0)
        n_independent=2,          # Number of independent GLU layers
        n_shared=2,               # Number of shared GLU layers

        # Categorical embeddings
        cat_idxs=cat_idxs,
        cat_dims=cat_dims,
        cat_emb_dim=1,

        # Regularization
        lambda_sparse=1e-3,       # Sparsity regularization coefficient
        momentum=0.02,
        epsilon=1e-15,

        # Optimizer
        optimizer_fn=torch.optim.Adam,
        optimizer_params=dict(lr=2e-2),
        scheduler_params=dict(mode='min', patience=5, min_lr=1e-5, factor=0.9),
        scheduler_fn=torch.optim.lr_scheduler.ReduceLROnPlateau,
        mask_type='entmax',       # 'sparsemax' or 'entmax'

        verbose=10,
        seed=42,
        device_name='auto',
    )

    X_tr = X_train.values if hasattr(X_train, 'values') else X_train
    y_tr = y_train.values if hasattr(y_train, 'values') else y_train
    X_v = X_val.values if hasattr(X_val, 'values') else X_val
    y_v = y_val.values if hasattr(y_val, 'values') else y_val

    model.fit(
        X_train=X_tr, y_train=y_tr,
        eval_set=[(X_v, y_v)],
        eval_name=['val'],
        eval_metric=['auc'],
        max_epochs=200,
        patience=20,
        batch_size=1024,
        virtual_batch_size=128,  # Ghost batch normalization size
        num_workers=0,
        drop_last=False,
    )
    return model

11. Kaggleレベルパイプライン

クロスバリデーション戦略

from sklearn.model_selection import (
    StratifiedKFold, GroupKFold, StratifiedGroupKFold, TimeSeriesSplit
)

def kaggle_cv_pipeline(X, y, model, groups=None, n_folds=5):
    """Production-grade CV pipeline with leakage prevention"""

    if groups is not None:
        cv = StratifiedGroupKFold(n_splits=n_folds, shuffle=True, random_state=42)
        split_args = (X, y, groups)
    else:
        cv = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=42)
        split_args = (X, y)

    oof_predictions = np.zeros(len(X))
    feature_importances = []

    for fold, (train_idx, val_idx) in enumerate(cv.split(*split_args)):
        if hasattr(X, 'iloc'):
            X_tr, X_val = X.iloc[train_idx], X.iloc[val_idx]
            y_tr, y_val = y.iloc[train_idx], y.iloc[val_idx]
        else:
            X_tr, X_val = X[train_idx], X[val_idx]
            y_tr, y_val = y[train_idx], y[val_idx]

        model.fit(X_tr, y_tr)

        if hasattr(model, 'predict_proba'):
            oof_predictions[val_idx] = model.predict_proba(X_val)[:, 1]
        else:
            oof_predictions[val_idx] = model.predict(X_val)

        if hasattr(model, 'feature_importances_'):
            feature_importances.append(model.feature_importances_)

        fold_score = roc_auc_score(y_val, oof_predictions[val_idx])
        print(f"Fold {fold+1} AUC: {fold_score:.5f}")

    overall_score = roc_auc_score(y, oof_predictions)
    print(f"Overall OOF AUC: {overall_score:.5f}")

    mean_importance = np.mean(feature_importances, axis=0) if feature_importances else None
    return oof_predictions, mean_importance, overall_score

def prevent_leakage_pipeline(X_train, X_val):
    """
    Leakage-free preprocessing pipeline.
    Critical rule: fit only on training data, transform both train and val.
    """
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import StandardScaler
    from sklearn.impute import SimpleImputer

    pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
    ])

    X_train_processed = pipeline.fit_transform(X_train)
    X_val_processed = pipeline.transform(X_val)

    return X_train_processed, X_val_processed, pipeline

12. 特徴量選択

Borutaアルゴリズム

BorutaはロバストなランダムフォレストベースのF特徴量選択アルゴリズムです。全特徴量のシャドウコピーを作成し、実際の特徴重要度を最良のシャドウ特徴と比較します。

from boruta import BorutaPy
from sklearn.ensemble import RandomForestClassifier

def boruta_feature_selection(X, y, max_iter=100):
    rf = RandomForestClassifier(
        n_estimators=200, max_depth=7, random_state=42, n_jobs=-1
    )

    boruta = BorutaPy(
        estimator=rf, n_estimators='auto',
        perc=100, alpha=0.05, max_iter=max_iter,
        random_state=42, verbose=1
    )
    boruta.fit(X.values, y.values)

    feature_ranking = pd.DataFrame({
        'feature': X.columns,
        'ranking': boruta.ranking_,
        'selected': boruta.support_,
        'tentative': boruta.support_weak_
    }).sort_values('ranking')

    selected = X.columns[boruta.support_].tolist()
    tentative = X.columns[boruta.support_weak_].tolist()

    print(f"Selected: {len(selected)}, Tentative: {len(tentative)}, "
          f"Rejected: {len(X.columns) - len(selected) - len(tentative)}")

    return selected, tentative, feature_ranking

def shap_feature_selection(model, X_val, threshold=0.01):
    """Select features based on mean absolute SHAP values"""
    import shap

    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_val)
    shap_abs = np.abs(shap_values[1] if isinstance(shap_values, list) else shap_values)
    mean_shap = shap_abs.mean(axis=0)
    total_shap = mean_shap.sum()

    feature_importance = pd.DataFrame({
        'feature': X_val.columns,
        'mean_abs_shap': mean_shap,
        'shap_ratio': mean_shap / total_shap
    }).sort_values('mean_abs_shap', ascending=False)

    selected = feature_importance[feature_importance['mean_abs_shap'] > threshold * total_shap]
    print(f"SHAP selection: {len(selected)} / {len(X_val.columns)} features")
    return selected['feature'].tolist(), feature_importance

完全なKaggleパイプライン例

def complete_kaggle_pipeline(train_df, test_df, target_col, cat_features=None):
    """End-to-end Kaggle ML pipeline with ensemble"""

    y = train_df[target_col]
    X = train_df.drop(columns=[target_col])

    # Impute missing values
    for col in X.select_dtypes(include=[np.number]).columns:
        col_median = X[col].median()
        X[col] = X[col].fillna(col_median)
        test_df[col] = test_df[col].fillna(col_median)

    if cat_features:
        for col in cat_features:
            X[col] = X[col].astype('category').cat.codes
            test_df[col] = test_df[col].astype('category').cat.codes

    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

    xgb_oof = np.zeros(len(X))
    lgb_oof = np.zeros(len(X))
    cb_oof = np.zeros(len(X))
    xgb_test = np.zeros(len(test_df))
    lgb_test = np.zeros(len(test_df))
    cb_test = np.zeros(len(test_df))

    for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
        X_tr, X_val = X.iloc[train_idx], X.iloc[val_idx]
        y_tr, y_val = y.iloc[train_idx], y.iloc[val_idx]

        # XGBoost
        xgb_m = xgb.XGBClassifier(
            n_estimators=1000, max_depth=6, learning_rate=0.05,
            subsample=0.8, colsample_bytree=0.8, reg_alpha=0.1,
            random_state=42, n_jobs=-1, eval_metric='auc'
        )
        xgb_m.fit(X_tr, y_tr, eval_set=[(X_val, y_val)],
                  early_stopping_rounds=50, verbose=False)
        xgb_oof[val_idx] = xgb_m.predict_proba(X_val)[:, 1]
        xgb_test += xgb_m.predict_proba(test_df)[:, 1] / 5

        # LightGBM
        lgb_m = lgb.LGBMClassifier(
            n_estimators=1000, num_leaves=31, learning_rate=0.05,
            subsample=0.8, colsample_bytree=0.8, random_state=42,
            n_jobs=-1, verbose=-1
        )
        lgb_m.fit(X_tr, y_tr, eval_set=[(X_val, y_val)],
                  callbacks=[lgb.early_stopping(50, verbose=False),
                             lgb.log_evaluation(-1)])
        lgb_oof[val_idx] = lgb_m.predict_proba(X_val)[:, 1]
        lgb_test += lgb_m.predict_proba(test_df)[:, 1] / 5

        # CatBoost
        cb_m = CatBoostClassifier(
            iterations=1000, depth=6, learning_rate=0.05,
            random_seed=42, verbose=0, early_stopping_rounds=50
        )
        cb_m.fit(X_tr, y_tr, eval_set=(X_val, y_val))
        cb_oof[val_idx] = cb_m.predict_proba(X_val)[:, 1]
        cb_test += cb_m.predict_proba(test_df)[:, 1] / 5

    print(f"XGBoost OOF AUC: {roc_auc_score(y, xgb_oof):.5f}")
    print(f"LightGBM OOF AUC: {roc_auc_score(y, lgb_oof):.5f}")
    print(f"CatBoost OOF AUC: {roc_auc_score(y, cb_oof):.5f}")

    ensemble_oof = (xgb_oof + lgb_oof + cb_oof) / 3
    ensemble_test = (xgb_test + lgb_test + cb_test) / 3
    print(f"Ensemble OOF AUC: {roc_auc_score(y, ensemble_oof):.5f}")

    return ensemble_test, ensemble_oof

まとめ

このガイドでは表形式データ機械学習の完全なパイプラインを解説しました:

  1. EDAと前処理:データを理解し、欠損値や外れ値を処理するための体系的なアプローチ
  2. ツリーベースモデル:決定木からランダムフォレスト、勾配ブースティングへの段階的な発展
  3. モダンなGBMフレームワーク:XGBoost、LightGBM、CatBoostの強み、弱み、ユースケース
  4. 特徴エンジニアリング:多様なテクニックでドメイン知識をコードに変換する
  5. アンサンブリング:複数のモデルを組み合わせてパフォーマンスを最大化する
  6. TabNet:表形式データ向けディープラーニングアプローチ
  7. Kaggleパイプライン:完全なプロダクションレベルのワークフロー
  8. 特徴量選択:無関係な特徴量を削除してパフォーマンスと解釈可能性を改善する

重要なポイント:

  • 常に徹底的なEDAからスタートしてデータを理解する
  • リークを防ぐためにCVを慎重に設計する
  • アンサンブルはほぼ常に単一モデルを上回る
  • SHAP値を使ってモデルを解釈し、ドメイン知識と組み合わせる
  • LightGBMは速度に優れ、CatBoostはカテゴリカル処理に優れ、XGBoostは安定性に優れる

参考文献