- Authors

- Name
- Youngju Kim
- @fjvbn20031
1. 表形式データ機械学習の概要
なぜツリーベースモデルは表形式データに強いのか?
行と列で構造化された表形式データは、実世界のビジネス問題の80%以上を占めます。金融詐欺検出、顧客離脱予測、不動産価格設定、医療診断など、あらゆるドメインで登場します。
ディープラーニングが画像、テキスト、音声に革命をもたらした一方で、勾配ブーストツリーは依然として表形式データのコンペティションを支配しています。その理由を見ていきましょう:
ツリーベースモデルの強み:
- 不規則な決定境界:ツリーは軸に平行な分割で特徴空間を分割し、複雑な非線形関係を自然に捉えます。
- スケール不変性:勾配ブースティングはデータのランク順序のみを使用するため、特徴量の正規化なしに良好に動作します。
- 欠損値の処理:XGBoost、LightGBM、CatBoostはいずれも欠損値を内部的に処理します。
- カテゴリカル特徴量:ラベルエンコーディングだけで良好なパフォーマンスが得られることが多いです。
- 解釈可能性:特徴重要度とSHAP値によりモデルの意思決定を説明できます。
- 過学習への耐性:アンサンブル手法は単一モデルよりもロバストです。
EDA(探索的データ分析)戦略
モデルトレーニング前にデータを深く理解することが重要です。
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
def eda_overview(df):
"""Print comprehensive overview of a dataframe"""
print("=== Data Overview ===")
print(f"Shape: {df.shape}")
print(f"\nData Types:\n{df.dtypes}")
print(f"\nMissing Values:\n{df.isnull().sum()}")
print(f"\nMissing Ratios:\n{(df.isnull().mean() * 100).round(2)}")
print(f"\nNumeric Summary:\n{df.describe()}")
print(f"\nCategorical Feature Cardinality:")
for col in df.select_dtypes(include='object').columns:
print(f" {col}: {df[col].nunique()} unique values")
def plot_target_distribution(df, target_col):
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
axes[0].hist(df[target_col], bins=50, color='steelblue', edgecolor='white')
axes[0].set_title(f'{target_col} Distribution')
stats.probplot(df[target_col].dropna(), dist="norm", plot=axes[1])
axes[1].set_title('Q-Q Plot (Normality Check)')
plt.tight_layout()
plt.show()
def detect_outliers_iqr(df, columns):
"""Detect outliers using IQR method"""
outlier_info = {}
for col in columns:
Q1 = df[col].quantile(0.25)
Q3 = df[col].quantile(0.75)
IQR = Q3 - Q1
lower = Q1 - 1.5 * IQR
upper = Q3 + 1.5 * IQR
outliers = df[(df[col] < lower) | (df[col] > upper)]
outlier_info[col] = {
'count': len(outliers),
'ratio': len(outliers) / len(df),
'lower_bound': lower,
'upper_bound': upper
}
return pd.DataFrame(outlier_info).T
欠損値処理戦略
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
def simple_imputation(df):
"""Median imputation for numeric, mode for categorical"""
numeric_cols = df.select_dtypes(include=[np.number]).columns
num_imputer = SimpleImputer(strategy='median')
df[numeric_cols] = num_imputer.fit_transform(df[numeric_cols])
cat_cols = df.select_dtypes(include='object').columns
cat_imputer = SimpleImputer(strategy='most_frequent')
df[cat_cols] = cat_imputer.fit_transform(df[cat_cols])
return df
def knn_imputation(df, n_neighbors=5):
"""KNN-based imputation (effective for small datasets)"""
numeric_cols = df.select_dtypes(include=[np.number]).columns
imputer = KNNImputer(n_neighbors=n_neighbors)
df[numeric_cols] = imputer.fit_transform(df[numeric_cols])
return df
def mice_imputation(df, max_iter=10):
"""MICE: Multiple Imputation by Chained Equations"""
numeric_cols = df.select_dtypes(include=[np.number]).columns
imputer = IterativeImputer(max_iter=max_iter, random_state=42)
df[numeric_cols] = imputer.fit_transform(df[numeric_cols])
return df
def add_missing_indicators(df):
"""Add binary columns indicating where values were missing"""
cols_with_missing = df.columns[df.isnull().any()].tolist()
for col in cols_with_missing:
df[f'{col}_missing'] = df[col].isnull().astype(int)
return df
2. 決定木
ID3とCARTアルゴリズム
決定木はデータを再帰的に分割してツリー構造を構築します。2つの主要なアルゴリズムがあります:
ID3 (Iterative Dichotomiser 3):
- 分割基準として情報利得を使用
- カテゴリカル特徴量のみを処理(多方向分割)
- カテゴリカルターゲットのみ対応
CART (Classification and Regression Trees):
- 分類:ジニ不純度
- 回帰:平均二乗誤差
- 常に二項分割(2つの子ノード)
- scikit-learnで使用されるアルゴリズム
情報利得とジニ不純度
エントロピーと情報利得:
エントロピーはデータセットの不純度を測定します。k個のクラスの場合:
H(S) = -sum(p_i * log2(p_i))
情報利得は分割後のエントロピー減少量です:
IG(S, A) = H(S) - sum(|S_v|/|S| * H(S_v))
ジニ不純度:
Gini(S) = 1 - sum(p_i^2)
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.tree import export_text, plot_tree
import matplotlib.pyplot as plt
def train_decision_tree(X_train, y_train, task='classification'):
if task == 'classification':
model = DecisionTreeClassifier(
criterion='gini', # 'gini' or 'entropy'
max_depth=5,
min_samples_split=10,
min_samples_leaf=5,
max_features=None,
random_state=42
)
else:
model = DecisionTreeRegressor(
criterion='squared_error',
max_depth=5,
min_samples_split=10,
min_samples_leaf=5,
random_state=42
)
model.fit(X_train, y_train)
return model
def visualize_tree(model, feature_names, class_names=None, max_depth=3):
plt.figure(figsize=(20, 10))
plot_tree(
model, feature_names=feature_names, class_names=class_names,
filled=True, rounded=True, max_depth=max_depth, fontsize=10
)
plt.title('Decision Tree Visualization')
plt.tight_layout()
plt.show()
print(export_text(model, feature_names=feature_names, max_depth=3))
3. ランダムフォレスト
バギングと特徴量ランダム化
ランダムフォレストはブートストラップ集約(バギング)と特徴量ランダム化を組み合わせます:
- トレーニングデータからブートストラップサンプル(復元抽出)を生成
- 各サンプルで独立した決定木を訓練
- 各分割で sqrt(n_features) 個の特徴量のみをランダムに選択して検討
- 予測を平均(回帰)または多数決(分類)で集約
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.inspection import permutation_importance
def train_random_forest(X_train, y_train, task='classification'):
if task == 'classification':
model = RandomForestClassifier(
n_estimators=300,
max_depth=None,
min_samples_split=2,
min_samples_leaf=1,
max_features='sqrt',
bootstrap=True,
oob_score=True,
n_jobs=-1,
random_state=42
)
else:
model = RandomForestRegressor(
n_estimators=300,
max_features='sqrt',
oob_score=True,
n_jobs=-1,
random_state=42
)
model.fit(X_train, y_train)
print(f"OOB Score: {model.oob_score_:.4f}")
return model
def plot_feature_importance(model, feature_names, top_n=20):
importances = pd.Series(
model.feature_importances_, index=feature_names
).sort_values(ascending=False)
plt.figure(figsize=(10, 8))
importances.head(top_n).plot(kind='barh', color='steelblue')
plt.title(f'Random Forest Feature Importance (Top {top_n})')
plt.xlabel('Importance')
plt.gca().invert_yaxis()
plt.tight_layout()
plt.show()
return importances
def compute_permutation_importance(model, X_val, y_val, feature_names):
result = permutation_importance(
model, X_val, y_val, n_repeats=10, random_state=42, n_jobs=-1
)
return pd.DataFrame({
'feature': feature_names,
'importance_mean': result.importances_mean,
'importance_std': result.importances_std
}).sort_values('importance_mean', ascending=False)
4. 勾配ブースティング
数学的原理
勾配ブースティングは負の勾配の方向にツリーを順番に追加して損失関数を最小化します:
F_m(x) = F_{m-1}(x) + learning_rate * h_m(x)
ここでは、ステージmのツリーが疑似残差にフィットされます:
r_i = -[dL(y_i, F(x_i)) / dF(x_i)]
MSE損失を使った回帰では、疑似残差はターゲット値から現在のアンサンブル予測を引いた通常の残差になります。 対数損失を使ったバイナリ分類では、疑似残差はターゲット値から現在のアンサンブルスコアにシグモイドを適用した値を引いたものになります。
from sklearn.ensemble import GradientBoostingClassifier, GradientBoostingRegressor
def train_sklearn_gbm(X_train, y_train, task='classification'):
if task == 'classification':
model = GradientBoostingClassifier(
n_estimators=200, learning_rate=0.1, max_depth=3,
subsample=0.8, max_features='sqrt', random_state=42
)
else:
model = GradientBoostingRegressor(
n_estimators=200, learning_rate=0.1, max_depth=3,
subsample=0.8, random_state=42
)
model.fit(X_train, y_train)
return model
5. XGBoost
XGBoostのイノベーション
XGBoost(eXtreme Gradient Boosting)はChen & Guestrin(2016年)が開発し、バニラGBMに対して大幅な改善をもたらします:
- 正則化:過学習を防ぐためL1とL2正則化を目的関数に追加
- 2次テイラー展開:1次と2次の導関数を使ったより正確なツリー構造探索
- 欠損値学習:各分割で欠損サンプルの最適なデフォルト方向を学習
- 並列処理:並列特徴分割探索(ツリー構築自体は逐次的)
- キャッシュ最適化:速度のためのメモリアクセスパターン最適化
- ブロック構造:スパースデータのための圧縮列ストレージ
完全ハイパーパラメータリファレンス
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
def train_xgboost_full(X, y, task='binary'):
X_train, X_val, y_train, y_val = train_test_split(
X, y, test_size=0.2, random_state=42,
stratify=y if task == 'binary' else None
)
if task == 'binary':
objective = 'binary:logistic'
eval_metric = 'auc'
elif task == 'multiclass':
objective = 'multi:softprob'
eval_metric = 'mlogloss'
else:
objective = 'reg:squarederror'
eval_metric = 'rmse'
model = xgb.XGBClassifier(
# Tree parameters
n_estimators=1000,
max_depth=6, # Default: 6. Deeper = more complex
min_child_weight=1, # Minimum sum of instance weight in a leaf
gamma=0, # Minimum loss reduction for a split
max_delta_step=0, # Max delta step per tree weight (useful for imbalance)
# Sampling
subsample=0.8, # Row sampling ratio per tree
colsample_bytree=0.8, # Column sampling ratio per tree
colsample_bylevel=1.0, # Column sampling per level
colsample_bynode=1.0, # Column sampling per node
# Regularization
reg_alpha=0.1, # L1 regularization
reg_lambda=1.0, # L2 regularization (default: 1)
# Learning
learning_rate=0.01, # Step size shrinkage (eta)
scale_pos_weight=1, # For imbalanced: neg/pos ratio
# System
objective=objective,
eval_metric=eval_metric,
tree_method='hist', # 'hist' (fast), 'exact', 'approx', 'gpu_hist'
random_state=42,
n_jobs=-1,
)
model.fit(
X_train, y_train,
eval_set=[(X_train, y_train), (X_val, y_val)],
early_stopping_rounds=50,
verbose=100,
)
print(f"Best iteration: {model.best_iteration}")
print(f"Best score: {model.best_score:.4f}")
return model, X_val, y_val
def explain_with_shap(model, X_val, feature_names):
"""SHAP-based model explanation"""
import shap
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_val)
plt.figure(figsize=(10, 8))
shap.summary_plot(shap_values, X_val, feature_names=feature_names, show=False)
plt.tight_layout()
plt.show()
mean_abs_shap = pd.DataFrame({
'feature': feature_names,
'importance': np.abs(shap_values).mean(axis=0)
}).sort_values('importance', ascending=False)
return mean_abs_shap
GPU加速XGBoost
# GPU training (requires NVIDIA GPU)
model_gpu = xgb.XGBClassifier(
n_estimators=1000,
tree_method='gpu_hist', # GPU histogram method
predictor='gpu_predictor', # GPU prediction
device='cuda', # XGBoost 2.0+: use device='cuda'
random_state=42
)
6. LightGBM
リーフワイズとレベルワイズのツリー成長
LightGBM(Light Gradient Boosting Machine)はMicrosoftが開発し、速度とメモリ効率のための重要なイノベーションを導入します。
レベルワイズ(従来手法):
- 深さレベルごとに全リーフを同時に分割
- バランスの取れたツリーを生成
- 利得の少ないノードの計算を無駄にする
リーフワイズ(LightGBM):
- 常に最大の損失削減を持つ単一のリーフを分割
- 高度に不均衡なツリーが生成される可能性あり
- 同じ数のリーフでより低い損失を達成
- 過学習を防ぐために
max_depthが必要
GOSSとEFB
GOSS (Gradient-based One-Side Sampling):
- 大きな勾配を持つサンプル(高情報量)を全て保持
- 小さな勾配のサンプル(既によく学習済み)をランダムに削除
- 学習精度を保ちながらデータサイズを削減
EFB (Exclusive Feature Bundling):
- 相互に排他的なスパース特徴量をバンドル(同時に非ゼロにならないもの)
- 特徴量数を削減し、トレーニングを高速化(スパースデータに特に効果的)
import lightgbm as lgb
from sklearn.model_selection import StratifiedKFold
import numpy as np
def train_lightgbm_cv(X, y, n_folds=5):
"""Train LightGBM with Stratified K-Fold CV"""
skf = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=42)
oof_preds = np.zeros(len(X))
models = []
for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
print(f"\n--- Fold {fold + 1}/{n_folds} ---")
X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
model = lgb.LGBMClassifier(
n_estimators=2000,
num_leaves=31, # Main complexity parameter
max_depth=-1, # -1 means unlimited (control via num_leaves)
min_child_samples=20,
learning_rate=0.05,
subsample=0.8,
subsample_freq=1,
colsample_bytree=0.8,
reg_alpha=0.1,
reg_lambda=0.1,
boosting_type='gbdt', # 'gbdt', 'dart', 'goss', 'rf'
random_state=42,
n_jobs=-1,
verbose=-1,
)
model.fit(
X_train, y_train,
eval_set=[(X_val, y_val)],
callbacks=[
lgb.early_stopping(stopping_rounds=100, verbose=True),
lgb.log_evaluation(period=200),
],
)
oof_preds[val_idx] = model.predict_proba(X_val)[:, 1]
models.append(model)
fold_auc = roc_auc_score(y_val, oof_preds[val_idx])
print(f"Fold {fold+1} AUC: {fold_auc:.4f}")
overall_auc = roc_auc_score(y, oof_preds)
print(f"\nOverall OOF AUC: {overall_auc:.4f}")
return models, oof_preds
def lgbm_with_categorical(X_train, y_train, X_val, y_val, cat_features):
"""LightGBM with native categorical feature handling"""
for col in cat_features:
X_train[col] = X_train[col].astype('category').cat.codes
X_val[col] = X_val[col].astype('category').cat.codes
train_data = lgb.Dataset(
X_train, y_train,
categorical_feature=cat_features,
free_raw_data=False
)
val_data = lgb.Dataset(X_val, y_val, reference=train_data)
params = {
'objective': 'binary', 'metric': 'auc',
'num_leaves': 31, 'learning_rate': 0.05, 'verbose': -1,
}
model = lgb.train(
params, train_data, num_boost_round=1000,
valid_sets=[val_data],
callbacks=[lgb.early_stopping(100), lgb.log_evaluation(100)],
)
return model
7. CatBoost
カテゴリカル特徴量の自動処理
CatBoost(Categorical Boosting)はYandexが開発し、カテゴリカル特徴量をネイティブに処理することに優れています。
CatBoostのカテゴリカルエンコーディング:
- ターゲット統計量(TS):ターゲット平均を使って各カテゴリをエンコード
- 順序付きターゲット統計量:データ順序を使ったリーク不要のTS
- ワンホットエンコーディング:低カーディナリティ特徴量に自動適用
順序付きブースティング
CatBoostのキーイノベーション、順序付きブースティングは、ターゲット統計量の計算時に生じる予測シフトバイアスを解決します:
- データをランダムな順序で並べる
- サンプル i の統計量はサンプル 0 から i-1 のみを使って計算
- 以前に見たサンプルのみを使用し、リークを排除
from catboost import CatBoostClassifier, CatBoostRegressor, Pool
def train_catboost(X_train, y_train, X_val, y_val, cat_features=None):
"""CatBoost training — can pass raw string categories directly"""
train_pool = Pool(data=X_train, label=y_train, cat_features=cat_features)
val_pool = Pool(data=X_val, label=y_val, cat_features=cat_features)
model = CatBoostClassifier(
iterations=1000,
learning_rate=0.05,
depth=6,
l2_leaf_reg=3.0,
subsample=0.8,
colsample_bylevel=0.8,
cat_features=cat_features,
one_hot_max_size=2,
boosting_type='Ordered',
bootstrap_type='Bayesian',
bagging_temperature=1.0,
loss_function='Logloss',
eval_metric='AUC',
task_type='CPU',
random_seed=42,
verbose=100,
use_best_model=True,
early_stopping_rounds=50,
)
model.fit(train_pool, eval_set=val_pool, plot=False)
print(f"Best iteration: {model.get_best_iteration()}")
print(f"Best score: {model.get_best_score()}")
return model
def compare_gbm_models(X, y):
"""Compare performance and speed across the three GBM frameworks"""
import time
from sklearn.model_selection import cross_val_score
results = {}
start = time.time()
xgb_model = xgb.XGBClassifier(
n_estimators=300, max_depth=6, learning_rate=0.1,
subsample=0.8, colsample_bytree=0.8, random_state=42, n_jobs=-1
)
xgb_scores = cross_val_score(xgb_model, X, y, cv=5, scoring='roc_auc')
results['XGBoost'] = {
'mean_auc': xgb_scores.mean(), 'std_auc': xgb_scores.std(),
'time_sec': time.time() - start
}
start = time.time()
lgb_model = lgb.LGBMClassifier(
n_estimators=300, num_leaves=31, learning_rate=0.1,
subsample=0.8, colsample_bytree=0.8, random_state=42,
n_jobs=-1, verbose=-1
)
lgb_scores = cross_val_score(lgb_model, X, y, cv=5, scoring='roc_auc')
results['LightGBM'] = {
'mean_auc': lgb_scores.mean(), 'std_auc': lgb_scores.std(),
'time_sec': time.time() - start
}
start = time.time()
cb_model = CatBoostClassifier(
iterations=300, depth=6, learning_rate=0.1, random_seed=42, verbose=0
)
cb_scores = cross_val_score(cb_model, X, y, cv=5, scoring='roc_auc')
results['CatBoost'] = {
'mean_auc': cb_scores.mean(), 'std_auc': cb_scores.std(),
'time_sec': time.time() - start
}
result_df = pd.DataFrame(results).T
print("=== GBM Model Comparison ===")
print(result_df.to_string())
return result_df
XGBoost vs LightGBM vs CatBoost 比較:
| 側面 | XGBoost | LightGBM | CatBoost |
|---|---|---|---|
| ツリー成長 | レベルワイズ | リーフワイズ | 対称 |
| 速度 | 中程度 | 速い | 中程度-速い |
| メモリ | 中程度 | 低い | 中程度 |
| カテゴリカル | 手動 | 組み込み(限定) | 自動(最良) |
| GPUサポート | あり | あり | あり |
| ハイパーパラメータ | 多い | 多い | 少ない |
| 最適な用途 | 数値特徴量 | 大規模データセット | カテゴリカル特徴量 |
8. 特徴エンジニアリング
数値特徴量の変換
from sklearn.preprocessing import (
StandardScaler, MinMaxScaler, RobustScaler,
PowerTransformer, QuantileTransformer, KBinsDiscretizer
)
from scipy.stats import boxcox
import numpy as np
def transform_numeric_features(df, columns):
"""Apply various numeric transformations"""
transformed = df.copy()
for col in columns:
# 1. Log transform (right-skewed, positive values)
if (df[col] > 0).all():
transformed[f'{col}_log'] = np.log1p(df[col])
# 2. Square root transform
if (df[col] >= 0).all():
transformed[f'{col}_sqrt'] = np.sqrt(df[col])
# 3. Box-Cox transform (positive values only)
if (df[col] > 0).all():
transformed[f'{col}_boxcox'], _ = boxcox(df[col] + 1)
# 4. Yeo-Johnson transform (handles negative values)
pt = PowerTransformer(method='yeo-johnson')
transformed[f'{col}_yeojohnson'] = pt.fit_transform(df[[col]])
# 5. Quantile transform (maps to normal distribution)
qt = QuantileTransformer(output_distribution='normal', n_quantiles=1000)
transformed[f'{col}_quantile'] = qt.fit_transform(df[[col]])
return transformed
def create_bins(df, col, n_bins=10, strategy='quantile'):
"""Discretize continuous variable into bins"""
kbd = KBinsDiscretizer(n_bins=n_bins, encode='ordinal', strategy=strategy)
df[f'{col}_bin'] = kbd.fit_transform(df[[col]]).astype(int)
return df
カテゴリカルエンコーディング
from category_encoders import (
TargetEncoder, LeaveOneOutEncoder,
CatBoostEncoder, BinaryEncoder
)
def encode_categorical_features(X_train, X_val, y_train, cat_features):
"""Multiple categorical encoding strategies"""
results = {}
# Target Encoding (medium-high cardinality)
# Warning: can cause target leakage — only use inside CV folds
te = TargetEncoder(cols=cat_features, smoothing=1.0)
results['target_enc'] = (
te.fit_transform(X_train, y_train),
te.transform(X_val)
)
# Leave-One-Out Encoding (prevents target leakage)
loo = LeaveOneOutEncoder(cols=cat_features, sigma=0.05)
results['loo_enc'] = (
loo.fit_transform(X_train, y_train),
loo.transform(X_val)
)
# CatBoost Encoding (Ordered Target Statistics)
cbe = CatBoostEncoder(cols=cat_features)
results['catboost_enc'] = (
cbe.fit_transform(X_train, y_train),
cbe.transform(X_val)
)
return results
def extract_datetime_features(df, date_col):
"""Extract rich datetime features"""
df[date_col] = pd.to_datetime(df[date_col])
df[f'{date_col}_year'] = df[date_col].dt.year
df[f'{date_col}_month'] = df[date_col].dt.month
df[f'{date_col}_day'] = df[date_col].dt.day
df[f'{date_col}_dayofweek'] = df[date_col].dt.dayofweek
df[f'{date_col}_dayofyear'] = df[date_col].dt.dayofyear
df[f'{date_col}_weekofyear'] = df[date_col].dt.isocalendar().week.astype(int)
df[f'{date_col}_quarter'] = df[date_col].dt.quarter
df[f'{date_col}_hour'] = df[date_col].dt.hour
df[f'{date_col}_is_weekend'] = (df[date_col].dt.dayofweek >= 5).astype(int)
df[f'{date_col}_is_month_end'] = df[date_col].dt.is_month_end.astype(int)
# Cyclical encoding (sin/cos for periodic features)
df[f'{date_col}_month_sin'] = np.sin(2 * np.pi * df[f'{date_col}_month'] / 12)
df[f'{date_col}_month_cos'] = np.cos(2 * np.pi * df[f'{date_col}_month'] / 12)
df[f'{date_col}_hour_sin'] = np.sin(2 * np.pi * df[f'{date_col}_hour'] / 24)
df[f'{date_col}_hour_cos'] = np.cos(2 * np.pi * df[f'{date_col}_hour'] / 24)
return df
def create_aggregate_features(df, group_cols, agg_cols):
"""Create group-level aggregation features"""
for group_col in group_cols:
for agg_col in agg_cols:
prefix = f'{agg_col}_by_{group_col}'
agg = df.groupby(group_col)[agg_col].agg([
'mean', 'std', 'min', 'max', 'median'
])
agg.columns = [
f'{prefix}_mean', f'{prefix}_std',
f'{prefix}_min', f'{prefix}_max', f'{prefix}_median'
]
df = df.join(agg, on=group_col)
return df
9. アンサンブル手法
スタッキング(メタ学習)
スタッキングはベースモデルからの予測を特徴量として使用し、メタモデルを訓練します。
from sklearn.model_selection import StratifiedKFold
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
import numpy as np
class StackingEnsemble:
"""Full stacking ensemble implementation"""
def __init__(self, base_models, meta_model, n_folds=5):
self.base_models = base_models
self.meta_model = meta_model
self.n_folds = n_folds
self.trained_base_models = []
def fit(self, X, y):
meta_features_train = np.zeros((len(X), len(self.base_models)))
skf = StratifiedKFold(n_splits=self.n_folds, shuffle=True, random_state=42)
for i, (name, model) in enumerate(self.base_models):
print(f"Training base model: {name}")
fold_models = []
for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
if hasattr(X, 'iloc'):
X_tr, X_val = X.iloc[train_idx], X.iloc[val_idx]
y_tr, y_val = y.iloc[train_idx], y.iloc[val_idx]
else:
X_tr, X_val = X[train_idx], X[val_idx]
y_tr, y_val = y[train_idx], y[val_idx]
model_clone = type(model)(**model.get_params())
model_clone.fit(X_tr, y_tr)
fold_models.append(model_clone)
if hasattr(model_clone, 'predict_proba'):
meta_features_train[val_idx, i] = model_clone.predict_proba(X_val)[:, 1]
else:
meta_features_train[val_idx, i] = model_clone.predict(X_val)
self.trained_base_models.append((name, fold_models))
self.meta_model.fit(meta_features_train, y)
return self
def predict_proba(self, X):
meta_features_test = np.zeros((len(X), len(self.base_models)))
for i, (name, fold_models) in enumerate(self.trained_base_models):
fold_preds = []
for model in fold_models:
if hasattr(model, 'predict_proba'):
fold_preds.append(model.predict_proba(X)[:, 1])
else:
fold_preds.append(model.predict(X))
meta_features_test[:, i] = np.mean(fold_preds, axis=0)
return self.meta_model.predict_proba(meta_features_test)
def weighted_blend(predictions, weights):
"""Weighted average blend of multiple model predictions"""
weights = np.array(weights) / sum(weights)
return sum(w * p for w, p in zip(weights, predictions))
10. TabNet(表形式データ向けディープラーニング)
TabNetアーキテクチャ
TabNetは2019年にGoogleが発表し、各ステップでどの特徴量に注目するかを動的に選択する逐次的アテンションメカニズムを使って表形式データにディープラーニングを適用します。
主要コンポーネント:
- 特徴量トランスフォーマー:選択された特徴量を処理する共有とステップ固有のレイヤー
- アテンション付きトランスフォーマー:各ステップのスパース特徴量マスクを生成
- 逐次ステップ:逐次特徴量選択の複数ステップ
- スパースアテンション:エントロピー正則化がスパースな特徴量選択を促進
from pytorch_tabnet.tab_model import TabNetClassifier, TabNetRegressor
import torch
def train_tabnet(X_train, y_train, X_val, y_val, cat_idxs=None, cat_dims=None):
"""Train TabNet with optional categorical embeddings"""
if cat_idxs is None:
cat_idxs = []
cat_dims = []
model = TabNetClassifier(
# Architecture
n_d=64, # Prediction layer dimension
n_a=64, # Attention embedding dimension
n_steps=5, # Number of sequential attention steps
gamma=1.3, # Feature reusage coefficient (1.0-2.0)
n_independent=2, # Number of independent GLU layers
n_shared=2, # Number of shared GLU layers
# Categorical embeddings
cat_idxs=cat_idxs,
cat_dims=cat_dims,
cat_emb_dim=1,
# Regularization
lambda_sparse=1e-3, # Sparsity regularization coefficient
momentum=0.02,
epsilon=1e-15,
# Optimizer
optimizer_fn=torch.optim.Adam,
optimizer_params=dict(lr=2e-2),
scheduler_params=dict(mode='min', patience=5, min_lr=1e-5, factor=0.9),
scheduler_fn=torch.optim.lr_scheduler.ReduceLROnPlateau,
mask_type='entmax', # 'sparsemax' or 'entmax'
verbose=10,
seed=42,
device_name='auto',
)
X_tr = X_train.values if hasattr(X_train, 'values') else X_train
y_tr = y_train.values if hasattr(y_train, 'values') else y_train
X_v = X_val.values if hasattr(X_val, 'values') else X_val
y_v = y_val.values if hasattr(y_val, 'values') else y_val
model.fit(
X_train=X_tr, y_train=y_tr,
eval_set=[(X_v, y_v)],
eval_name=['val'],
eval_metric=['auc'],
max_epochs=200,
patience=20,
batch_size=1024,
virtual_batch_size=128, # Ghost batch normalization size
num_workers=0,
drop_last=False,
)
return model
11. Kaggleレベルパイプライン
クロスバリデーション戦略
from sklearn.model_selection import (
StratifiedKFold, GroupKFold, StratifiedGroupKFold, TimeSeriesSplit
)
def kaggle_cv_pipeline(X, y, model, groups=None, n_folds=5):
"""Production-grade CV pipeline with leakage prevention"""
if groups is not None:
cv = StratifiedGroupKFold(n_splits=n_folds, shuffle=True, random_state=42)
split_args = (X, y, groups)
else:
cv = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=42)
split_args = (X, y)
oof_predictions = np.zeros(len(X))
feature_importances = []
for fold, (train_idx, val_idx) in enumerate(cv.split(*split_args)):
if hasattr(X, 'iloc'):
X_tr, X_val = X.iloc[train_idx], X.iloc[val_idx]
y_tr, y_val = y.iloc[train_idx], y.iloc[val_idx]
else:
X_tr, X_val = X[train_idx], X[val_idx]
y_tr, y_val = y[train_idx], y[val_idx]
model.fit(X_tr, y_tr)
if hasattr(model, 'predict_proba'):
oof_predictions[val_idx] = model.predict_proba(X_val)[:, 1]
else:
oof_predictions[val_idx] = model.predict(X_val)
if hasattr(model, 'feature_importances_'):
feature_importances.append(model.feature_importances_)
fold_score = roc_auc_score(y_val, oof_predictions[val_idx])
print(f"Fold {fold+1} AUC: {fold_score:.5f}")
overall_score = roc_auc_score(y, oof_predictions)
print(f"Overall OOF AUC: {overall_score:.5f}")
mean_importance = np.mean(feature_importances, axis=0) if feature_importances else None
return oof_predictions, mean_importance, overall_score
def prevent_leakage_pipeline(X_train, X_val):
"""
Leakage-free preprocessing pipeline.
Critical rule: fit only on training data, transform both train and val.
"""
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
pipeline = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler()),
])
X_train_processed = pipeline.fit_transform(X_train)
X_val_processed = pipeline.transform(X_val)
return X_train_processed, X_val_processed, pipeline
12. 特徴量選択
Borutaアルゴリズム
BorutaはロバストなランダムフォレストベースのF特徴量選択アルゴリズムです。全特徴量のシャドウコピーを作成し、実際の特徴重要度を最良のシャドウ特徴と比較します。
from boruta import BorutaPy
from sklearn.ensemble import RandomForestClassifier
def boruta_feature_selection(X, y, max_iter=100):
rf = RandomForestClassifier(
n_estimators=200, max_depth=7, random_state=42, n_jobs=-1
)
boruta = BorutaPy(
estimator=rf, n_estimators='auto',
perc=100, alpha=0.05, max_iter=max_iter,
random_state=42, verbose=1
)
boruta.fit(X.values, y.values)
feature_ranking = pd.DataFrame({
'feature': X.columns,
'ranking': boruta.ranking_,
'selected': boruta.support_,
'tentative': boruta.support_weak_
}).sort_values('ranking')
selected = X.columns[boruta.support_].tolist()
tentative = X.columns[boruta.support_weak_].tolist()
print(f"Selected: {len(selected)}, Tentative: {len(tentative)}, "
f"Rejected: {len(X.columns) - len(selected) - len(tentative)}")
return selected, tentative, feature_ranking
def shap_feature_selection(model, X_val, threshold=0.01):
"""Select features based on mean absolute SHAP values"""
import shap
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_val)
shap_abs = np.abs(shap_values[1] if isinstance(shap_values, list) else shap_values)
mean_shap = shap_abs.mean(axis=0)
total_shap = mean_shap.sum()
feature_importance = pd.DataFrame({
'feature': X_val.columns,
'mean_abs_shap': mean_shap,
'shap_ratio': mean_shap / total_shap
}).sort_values('mean_abs_shap', ascending=False)
selected = feature_importance[feature_importance['mean_abs_shap'] > threshold * total_shap]
print(f"SHAP selection: {len(selected)} / {len(X_val.columns)} features")
return selected['feature'].tolist(), feature_importance
完全なKaggleパイプライン例
def complete_kaggle_pipeline(train_df, test_df, target_col, cat_features=None):
"""End-to-end Kaggle ML pipeline with ensemble"""
y = train_df[target_col]
X = train_df.drop(columns=[target_col])
# Impute missing values
for col in X.select_dtypes(include=[np.number]).columns:
col_median = X[col].median()
X[col] = X[col].fillna(col_median)
test_df[col] = test_df[col].fillna(col_median)
if cat_features:
for col in cat_features:
X[col] = X[col].astype('category').cat.codes
test_df[col] = test_df[col].astype('category').cat.codes
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
xgb_oof = np.zeros(len(X))
lgb_oof = np.zeros(len(X))
cb_oof = np.zeros(len(X))
xgb_test = np.zeros(len(test_df))
lgb_test = np.zeros(len(test_df))
cb_test = np.zeros(len(test_df))
for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
X_tr, X_val = X.iloc[train_idx], X.iloc[val_idx]
y_tr, y_val = y.iloc[train_idx], y.iloc[val_idx]
# XGBoost
xgb_m = xgb.XGBClassifier(
n_estimators=1000, max_depth=6, learning_rate=0.05,
subsample=0.8, colsample_bytree=0.8, reg_alpha=0.1,
random_state=42, n_jobs=-1, eval_metric='auc'
)
xgb_m.fit(X_tr, y_tr, eval_set=[(X_val, y_val)],
early_stopping_rounds=50, verbose=False)
xgb_oof[val_idx] = xgb_m.predict_proba(X_val)[:, 1]
xgb_test += xgb_m.predict_proba(test_df)[:, 1] / 5
# LightGBM
lgb_m = lgb.LGBMClassifier(
n_estimators=1000, num_leaves=31, learning_rate=0.05,
subsample=0.8, colsample_bytree=0.8, random_state=42,
n_jobs=-1, verbose=-1
)
lgb_m.fit(X_tr, y_tr, eval_set=[(X_val, y_val)],
callbacks=[lgb.early_stopping(50, verbose=False),
lgb.log_evaluation(-1)])
lgb_oof[val_idx] = lgb_m.predict_proba(X_val)[:, 1]
lgb_test += lgb_m.predict_proba(test_df)[:, 1] / 5
# CatBoost
cb_m = CatBoostClassifier(
iterations=1000, depth=6, learning_rate=0.05,
random_seed=42, verbose=0, early_stopping_rounds=50
)
cb_m.fit(X_tr, y_tr, eval_set=(X_val, y_val))
cb_oof[val_idx] = cb_m.predict_proba(X_val)[:, 1]
cb_test += cb_m.predict_proba(test_df)[:, 1] / 5
print(f"XGBoost OOF AUC: {roc_auc_score(y, xgb_oof):.5f}")
print(f"LightGBM OOF AUC: {roc_auc_score(y, lgb_oof):.5f}")
print(f"CatBoost OOF AUC: {roc_auc_score(y, cb_oof):.5f}")
ensemble_oof = (xgb_oof + lgb_oof + cb_oof) / 3
ensemble_test = (xgb_test + lgb_test + cb_test) / 3
print(f"Ensemble OOF AUC: {roc_auc_score(y, ensemble_oof):.5f}")
return ensemble_test, ensemble_oof
まとめ
このガイドでは表形式データ機械学習の完全なパイプラインを解説しました:
- EDAと前処理:データを理解し、欠損値や外れ値を処理するための体系的なアプローチ
- ツリーベースモデル:決定木からランダムフォレスト、勾配ブースティングへの段階的な発展
- モダンなGBMフレームワーク:XGBoost、LightGBM、CatBoostの強み、弱み、ユースケース
- 特徴エンジニアリング:多様なテクニックでドメイン知識をコードに変換する
- アンサンブリング:複数のモデルを組み合わせてパフォーマンスを最大化する
- TabNet:表形式データ向けディープラーニングアプローチ
- Kaggleパイプライン:完全なプロダクションレベルのワークフロー
- 特徴量選択:無関係な特徴量を削除してパフォーマンスと解釈可能性を改善する
重要なポイント:
- 常に徹底的なEDAからスタートしてデータを理解する
- リークを防ぐためにCVを慎重に設計する
- アンサンブルはほぼ常に単一モデルを上回る
- SHAP値を使ってモデルを解釈し、ドメイン知識と組み合わせる
- LightGBMは速度に優れ、CatBoostはカテゴリカル処理に優れ、XGBoostは安定性に優れる
参考文献
- XGBoostドキュメント
- LightGBMドキュメント
- CatBoostドキュメント
- SHAP GitHub
- pytorch-tabnet
- Chen, T., & Guestrin, C. (2016). XGBoost: A scalable tree boosting system.
- Ke, G., et al. (2017). LightGBM: A highly efficient gradient boosting decision tree.
- Prokhorenkova, L., et al. (2018). CatBoost: unbiased boosting with categorical features.
- Arik, S. O., & Pfister, T. (2021). TabNet: Attentive Interpretable Tabular Learning.