- Authors
- Name
- 1. はじめに:本番モデルは静かに劣化する
- 2. ドリフトの種類:何が変わるのか
- 3. Evidently AI アーキテクチャと主要機能
- 4. Evidently AI 実践的な使い方
- 5. MLflowモデルレジストリとモニタリング連携
- 6. 自動再学習パイプラインの構築
- 7. モニタリングツール比較:Evidently vs NannyML vs WhyLabs vs Alibi Detect
- 8. Grafana/Prometheusダッシュボード構成
- 9. 運用上の注意事項
- 10. 障害事例と復旧手順
- 11. 本番モニタリングチェックリスト
- 12. 参考資料

1. はじめに:本番モデルは静かに劣化する
MLモデルの精度はデプロイの瞬間がピークである。その後は現実世界の変化に伴い、予測品質が段階的に低下していく。問題は、この劣化が明示的なエラーなしに進行することである。HTTP 500は発生せず、ログにCRITICALが出力されず、サービスは正常に応答する。ただレコメンドが徐々に的外れになり、不正検知が新しいパターンを見逃し、需要予測が現実と乖離し始めるだけである。
Googleの研究によると、本番MLシステムで発生する障害の60%以上がモデルコードではなくデータ関連の問題に起因する。モデル自体が壊れるのではなく、モデルが学習した世界と現実世界との間の乖離が拡大していくことが根本原因である。
本記事では、オープンソースモニタリングツールであるEvidently AIと実験/モデル管理プラットフォームMLflowを組み合わせて、本番環境でMLモデルの健全性を継続的に監視し、ドリフトを検知し、自動再学習をトリガーするパイプラインを構築する方法を解説する。
2. ドリフトの種類:何が変わるのか
ドリフト(Drift)とは、モデルが学習したデータ分布と実際のサービング時点のデータ分布との間の不一致を指す。ドリフトは発生箇所と性質によって大きく3つに分類される。
データドリフト(Data Drift、Covariate Shift)
入力特徴量の分布が変化する現象である。モデルの入力空間P(X)が時間とともに移動する。例えば、ECサイトのレコメンドモデルでユーザーの年齢層分布が変わったり、季節によって購入カテゴリの比率が変化するケースがこれに該当する。ターゲット変数Yと特徴量Xの関係P(Y|X)はそのまま維持された状態で、入力自体の統計的特性が変わるものである。
コンセプトドリフト(Concept Drift)
特徴量とターゲットの関係そのものが変化する現象である。P(Y|X)が変わる。データドリフトより深刻な問題であり、同一の入力に対して正解自体が変わるためである。コロナ禍において需要予測モデルが完全に無効化された事例、金融不正検知において犯罪者の手口が進化し既存パターンが有効でなくなった事例が代表的である。
予測ドリフト(Prediction Drift)
モデル出力P(Y_pred)の分布が変化する現象である。入力ドリフトの結果として現れることもあれば、モデル内部の問題により独立して発生することもある。分類モデルで特定クラスの予測比率が急に偏ったり、回帰モデルで予測値の平均や分散が大きく変化するケースが含まれる。
| ドリフト種類 | 変化対象 | 検知難易度 | 代表的な検知方法 | 再学習緊急度 |
|---|---|---|---|---|
| データドリフト | P(X) 入力分布 | 中程度 | PSI、KS検定、Wasserstein | 中程度 |
| コンセプトドリフト | P(Y|X) 関係 | 高い | 性能指標モニタリング、ADWIN | 高い |
| 予測ドリフト | P(Y_pred) 出力 | 低い | 出力分布統計、Chi-squared | 状況次第 |
| ラベルドリフト | P(Y) ターゲット分布 | 中程度 | ラベル分布比較 | 高い |
3. Evidently AI アーキテクチャと主要機能
Evidently AIはMLモデルモニタリングとデータ品質検証のためのオープンソースライブラリである。Pythonネイティブ環境で動作し、20種類以上の統計的ドリフト検知手法を内蔵している。
コアコンポーネント
- Report: 一回限りのデータ分析レポート。HTML、JSON、Python辞書形式で出力可能。探索的分析やデバッグに適している。
- Test Suite: 事前定義された条件に対する自動化された検証。CI/CDパイプラインに統合してデータ品質ゲートとして使用する。
- Metric: 個別の測定項目。DataDriftTable、DatasetSummaryMetric、ColumnCorrelationsMetricなど数十種類のメトリクスが標準提供される。
- Collector/Workspace: Evidentlyサーバーモード。モニタリング結果を時系列で保存し、ダッシュボードで閲覧する。
主要ドリフト検知アルゴリズム
Evidentlyは特徴量のタイプ(数値型/カテゴリ型)とデータセットサイズに応じて、最適な検知アルゴリズムを自動選択する。
| アルゴリズム | 対象 | 原理 | 長所 | 限界 |
|---|---|---|---|---|
| Kolmogorov-Smirnov (KS) | 数値型、小規模 | 累積分布関数の最大差 | 分布の仮定が不要 | 大規模データで過敏 |
| Population Stability Index (PSI) | 数値型/カテゴリ型 | 2つの分布のログ比率の加重和 | 業界標準、解釈が容易 | ビン(bin)設定に敏感 |
| Wasserstein Distance | 数値型 | 2つの分布間の最小移動コスト | 分布形状の差異を反映 | 計算コストが高い |
| Jensen-Shannon Divergence | 数値型/カテゴリ型 | KL Divergenceの対称版 | 常に有限値、対称的 | テール分布の変化に鈍感 |
| Chi-squared Test | カテゴリ型 | 観測/期待頻度の差 | カテゴリ型に直感的 | 低頻度カテゴリで不安定 |
| Z検定(比率検定) | カテゴリ型、大規模 | 比率差の標準化 | 大規模データに効率的 | 正規近似の前提 |
4. Evidently AI 実践的な使い方
インストールと基本設定
# Evidently AI インストール(MLflow連携含む)
# pip install evidently mlflow scikit-learn pandas
import pandas as pd
import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
from evidently.metrics import (
DatasetDriftMetric,
DataDriftTable,
ColumnDriftMetric,
)
# リファレンス / 現在データの準備
data = load_iris(as_frame=True)
df = data.frame
df.columns = ["sepal_length", "sepal_width", "petal_length", "petal_width", "target"]
reference_data = df.sample(frac=0.5, random_state=42)
current_data = df.drop(reference_data.index)
# データドリフトのあるシミュレーションデータ生成
current_drifted = current_data.copy()
current_drifted["sepal_length"] = current_drifted["sepal_length"] + np.random.normal(2.0, 0.5, len(current_drifted))
current_drifted["petal_width"] = current_drifted["petal_width"] * 1.8
# ドリフトレポート生成
drift_report = Report(metrics=[
DatasetDriftMetric(),
DataDriftTable(),
])
drift_report.run(
reference_data=reference_data,
current_data=current_drifted,
)
# 結果を辞書として抽出(プログラム的活用)
result = drift_report.as_dict()
dataset_drift = result["metrics"][0]["result"]["dataset_drift"]
drift_share = result["metrics"][0]["result"]["share_of_drifted_columns"]
print(f"データセットドリフト検知: {dataset_drift}")
print(f"ドリフトカラム比率: {drift_share:.2%}")
# HTMLレポートとして保存
drift_report.save_html("drift_report.html")
Test Suiteを活用した自動化データ品質検証
from evidently.test_suite import TestSuite
from evidently.test_preset import DataDriftTestPreset, DataQualityTestPreset
from evidently.tests import (
TestColumnDrift,
TestShareOfDriftedColumns,
TestNumberOfMissingValues,
TestShareOfOutRangeValues,
TestMeanInNSigmas,
)
# データドリフト + 品質テストスイートの構成
monitoring_suite = TestSuite(tests=[
# ドリフトテスト:全カラムの30%以上がドリフトした場合に失敗
TestShareOfDriftedColumns(lt=0.3),
# 個別の重要特徴量ドリフト検証
TestColumnDrift(column_name="sepal_length"),
TestColumnDrift(column_name="petal_width"),
# データ品質テスト
TestNumberOfMissingValues(eq=0),
# 値範囲検証:sepal_lengthがリファレンスデータ基準±3シグマ以内
TestMeanInNSigmas(column_name="sepal_length", n=3),
])
monitoring_suite.run(
reference_data=reference_data,
current_data=current_drifted,
)
# テスト結果をプログラム的に確認
suite_result = monitoring_suite.as_dict()
all_passed = all(
test["status"] == "SUCCESS"
for test in suite_result["tests"]
)
print(f"全テスト通過: {all_passed}")
for test in suite_result["tests"]:
status_icon = "PASS" if test["status"] == "SUCCESS" else "FAIL"
print(f" [{status_icon}] {test['name']}: {test['status']}")
# CI/CDパイプラインでexit codeとして活用
if not all_passed:
print("ALERT: データドリフトまたは品質異常を検知。再学習パイプラインのトリガーが必要。")
# sys.exit(1) # CIでビルド失敗処理
5. MLflowモデルレジストリとモニタリング連携
MLflowは実験追跡、モデルパッケージング、モデルレジストリ機能を提供する。Evidentlyのドリフト検知結果をMLflowに記録することで、モデルバージョンごとの性能履歴とドリフト状態を1つのプラットフォームで追跡できる。
ドリフトメトリクスをMLflowに記録する
import mlflow
from evidently.report import Report
from evidently.metrics import (
DatasetDriftMetric,
DataDriftTable,
ColumnDriftMetric,
)
import json
from datetime import datetime
# MLflowトラッキングサーバー設定
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("model-monitoring/fraud-detection-v2")
def log_drift_to_mlflow(
reference_data,
current_data,
model_name: str,
model_version: str,
batch_id: str,
):
"""ドリフト分析結果をMLflowに記録する関数"""
# Evidentlyドリフトレポート生成
drift_report = Report(metrics=[
DatasetDriftMetric(),
DataDriftTable(),
])
drift_report.run(
reference_data=reference_data,
current_data=current_data,
)
result = drift_report.as_dict()
drift_result = result["metrics"][0]["result"]
# MLflow Runとして記録
with mlflow.start_run(run_name=f"drift-check-{batch_id}") as run:
# 基本ドリフトメトリクス
mlflow.log_metric("dataset_drift_detected", int(drift_result["dataset_drift"]))
mlflow.log_metric("drifted_columns_share", drift_result["share_of_drifted_columns"])
mlflow.log_metric("number_of_drifted_columns", drift_result["number_of_drifted_columns"])
mlflow.log_metric("total_columns", drift_result["number_of_columns"])
# 個別カラムドリフトスコアの記録
column_drift = result["metrics"][1]["result"]["drift_by_columns"]
for col_name, col_info in column_drift.items():
safe_col_name = col_name.replace(" ", "_").replace("/", "_")
mlflow.log_metric(
f"drift_score_{safe_col_name}",
col_info.get("drift_score", 0.0),
)
mlflow.log_metric(
f"drift_detected_{safe_col_name}",
int(col_info.get("column_drift", False)),
)
# タグとしてメタデータを記録
mlflow.set_tags({
"monitoring.type": "drift_detection",
"monitoring.model_name": model_name,
"monitoring.model_version": model_version,
"monitoring.batch_id": batch_id,
"monitoring.timestamp": datetime.utcnow().isoformat(),
"monitoring.reference_size": str(len(reference_data)),
"monitoring.current_size": str(len(current_data)),
})
# HTMLレポートをアーティファクトとして保存
report_path = f"/tmp/drift_report_{batch_id}.html"
drift_report.save_html(report_path)
mlflow.log_artifact(report_path, artifact_path="drift_reports")
# JSON結果もアーティファクトとして保存
json_path = f"/tmp/drift_result_{batch_id}.json"
with open(json_path, "w") as f:
json.dump(result, f, indent=2, default=str)
mlflow.log_artifact(json_path, artifact_path="drift_reports")
print(f"ドリフト結果をMLflowに記録完了。Run ID: {run.info.run_id}")
return drift_result["dataset_drift"], drift_result["share_of_drifted_columns"]
# 使用例
is_drifted, drift_share = log_drift_to_mlflow(
reference_data=reference_data,
current_data=current_drifted,
model_name="fraud-detector",
model_version="3",
batch_id="2026-03-06-batch-001",
)
エイリアス(Alias)ベースのモデルレジストリ管理
MLflow 2.x以降、従来のStage(Staging/Production/Archived)の代わりにエイリアスベースのモデル管理が推奨されている。ドリフト検知結果に基づいてモデルのエイリアスを自動的に切り替える戦略を適用できる。
from mlflow import MlflowClient
client = MlflowClient(tracking_uri="http://mlflow.internal:5000")
MODEL_NAME = "fraud-detector"
def handle_drift_detection(
is_drifted: bool,
drift_share: float,
model_name: str = MODEL_NAME,
drift_threshold_warn: float = 0.2,
drift_threshold_critical: float = 0.5,
):
"""ドリフト検知結果に基づくモデルレジストリアクションの実行"""
# 現在の本番モデルバージョンの確認
try:
prod_version = client.get_model_version_by_alias(model_name, "production")
current_version = prod_version.version
print(f"現在の本番モデルバージョン: {current_version}")
except Exception as e:
print(f"本番モデルエイリアスの取得に失敗: {e}")
return
if not is_drifted:
print("ドリフト未検知。現在のモデルを維持。")
client.set_model_version_tag(
model_name, current_version,
key="last_drift_check",
value="passed",
)
return
if drift_share >= drift_threshold_critical:
# 臨界ドリフト:即座にフォールバックモデルに切り替え + 再学習トリガー
print(f"CRITICAL: ドリフト比率 {drift_share:.1%} - フォールバックモデルへの切り替えと再学習トリガー")
client.set_model_version_tag(
model_name, current_version,
key="drift_status", value="critical",
)
# フォールバックモデルがあれば切り替え
try:
fallback = client.get_model_version_by_alias(model_name, "fallback")
client.set_registered_model_alias(model_name, "production", fallback.version)
print(f"フォールバックモデルバージョン {fallback.version} に切り替え完了")
except Exception:
print("WARNING: フォールバックモデルなし。現在のモデルを維持しつつ緊急再学習が必要。")
# 再学習トリガー(外部システム呼び出し)
trigger_retraining(model_name, reason="critical_drift")
elif drift_share >= drift_threshold_warn:
# 警告レベルドリフト:タグ記録 + 通知
print(f"WARNING: ドリフト比率 {drift_share:.1%} - モニタリング強化と再学習予約")
client.set_model_version_tag(
model_name, current_version,
key="drift_status", value="warning",
)
# スケジュール再学習キューに追加
schedule_retraining(model_name, priority="normal")
def trigger_retraining(model_name: str, reason: str):
"""緊急再学習トリガー(Airflow DAG、Kubeflow Pipelineなどの呼び出し)"""
print(f"再学習トリガー: model={model_name}, reason={reason}")
# requests.post("http://airflow.internal/api/v1/dags/retrain/dagRuns", ...)
def schedule_retraining(model_name: str, priority: str):
"""スケジュール再学習キューへの登録"""
print(f"再学習スケジュール登録: model={model_name}, priority={priority}")
# 実行
handle_drift_detection(
is_drifted=True,
drift_share=0.55,
model_name=MODEL_NAME,
)
6. 自動再学習パイプラインの構築
ドリフト検知から再学習までの自動化パイプラインは以下のステージで構成される。
パイプライン全体フロー
- スケジューラー: バッチ推論後または一定周期(日次/週次)でドリフトチェックをトリガー
- ドリフト分析器: Evidentlyでリファレンスデータと現在データを比較分析
- 判断エンジン: ドリフト閾値に基づいて再学習の必要性を判定
- 再学習オーケストレーター: Airflow/Kubeflowで学習ジョブを実行
- チャンピオン/チャレンジャー評価: 新モデルを既存モデルと比較評価
- デプロイゲート: 性能基準通過時に自動デプロイ、失敗時にロールバック
Airflow DAGとの連携パターン
# Airflow DAG 例:ドリフトチェック + 条件付き再学習
# dag_drift_monitor.py
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import pandas as pd
default_args = {
"owner": "ml-platform",
"retries": 2,
"retry_delay": timedelta(minutes=5),
"execution_timeout": timedelta(minutes=30),
}
dag = DAG(
dag_id="ml_drift_monitor_fraud_detection",
default_args=default_args,
description="日次ドリフトモニタリングと条件付き再学習",
schedule_interval="0 6 * * *", # 毎日午前6時
start_date=days_ago(1),
catchup=False,
tags=["ml-monitoring", "drift-detection"],
)
def fetch_data(**context):
"""リファレンスデータと直近24時間のサービングデータをロード"""
from sqlalchemy import create_engine
engine = create_engine("postgresql://reader:password@db.internal/features")
reference = pd.read_sql(
"SELECT * FROM fraud_features_reference", engine
)
current = pd.read_sql(
"""SELECT * FROM fraud_features_serving
WHERE created_at >= NOW() - INTERVAL '24 hours'""",
engine,
)
# XComでパスを転送(大容量はS3に保存)
ref_path = "/tmp/reference_data.parquet"
cur_path = "/tmp/current_data.parquet"
reference.to_parquet(ref_path)
current.to_parquet(cur_path)
context["ti"].xcom_push(key="reference_path", value=ref_path)
context["ti"].xcom_push(key="current_path", value=cur_path)
context["ti"].xcom_push(key="current_size", value=len(current))
def run_drift_check(**context):
"""Evidentlyドリフト分析の実行とMLflowへの記録"""
from evidently.report import Report
from evidently.metrics import DatasetDriftMetric, DataDriftTable
import mlflow
ti = context["ti"]
ref_path = ti.xcom_pull(key="reference_path")
cur_path = ti.xcom_pull(key="current_path")
reference = pd.read_parquet(ref_path)
current = pd.read_parquet(cur_path)
# 最小サンプル数の検証
if len(current) < 100:
print(f"現在データのサンプル数不足: {len(current)}。ドリフトチェックをスキップ。")
ti.xcom_push(key="drift_action", value="skip")
return "skip_retraining"
report = Report(metrics=[DatasetDriftMetric(), DataDriftTable()])
report.run(reference_data=reference, current_data=current)
result = report.as_dict()
drift_detected = result["metrics"][0]["result"]["dataset_drift"]
drift_share = result["metrics"][0]["result"]["share_of_drifted_columns"]
# MLflowに記録
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("monitoring/fraud-detection")
with mlflow.start_run(run_name=f"drift-{context['ds']}"):
mlflow.log_metric("drift_detected", int(drift_detected))
mlflow.log_metric("drift_share", drift_share)
ti.xcom_push(key="drift_detected", value=drift_detected)
ti.xcom_push(key="drift_share", value=drift_share)
def decide_action(**context):
"""ドリフトレベルに応じた再学習の判断"""
ti = context["ti"]
drift_detected = ti.xcom_pull(key="drift_detected")
drift_share = ti.xcom_pull(key="drift_share")
if drift_share is None or drift_share < 0.2:
return "skip_retraining"
elif drift_share >= 0.5:
return "trigger_emergency_retrain"
else:
return "trigger_scheduled_retrain"
fetch_task = PythonOperator(
task_id="fetch_data", python_callable=fetch_data, dag=dag,
)
drift_task = PythonOperator(
task_id="run_drift_check", python_callable=run_drift_check, dag=dag,
)
branch_task = BranchPythonOperator(
task_id="decide_action", python_callable=decide_action, dag=dag,
)
skip_task = EmptyOperator(task_id="skip_retraining", dag=dag)
scheduled_retrain = EmptyOperator(task_id="trigger_scheduled_retrain", dag=dag)
emergency_retrain = EmptyOperator(task_id="trigger_emergency_retrain", dag=dag)
fetch_task >> drift_task >> branch_task >> [skip_task, scheduled_retrain, emergency_retrain]
再学習トリガー閾値ガイドライン
| ドリフトレベル | drift_share範囲 | 推奨アクション | 対応時間 |
|---|---|---|---|
| 正常 | 0% ~ 15% | モニタリング維持 | - |
| 注意 | 15% ~ 30% | アラート送信、原因分析開始 | 48時間以内 |
| 警告 | 30% ~ 50% | スケジュール再学習キューに登録 | 24時間以内 |
| 危険 | 50%以上 | 即時再学習 + フォールバックモデル切替 | 即時 |
注意: 閾値はドメインとモデルの特性に応じて調整が必要である。金融不正検知のように見逃しコストが高いドメインではより低い閾値(10
20%)を、レコメンドシステムのように許容範囲が広いドメインでは高い閾値(3050%)を適用するのが適切である。
7. モニタリングツール比較:Evidently vs NannyML vs WhyLabs vs Alibi Detect
本番MLモニタリングツールには複数の選択肢がある。各ツールの強みと弱みを比較する。
| 基準 | Evidently AI | NannyML | WhyLabs | Alibi Detect |
|---|---|---|---|---|
| ライセンス | Apache 2.0(OSS) | BSD-3(OSS) | SaaS + 無料ティア | BSD-3(OSS) |
| コア強み | 汎用データ/モデルモニタリング | ラベルなし性能推定(CBPE) | リアルタイムストリーミングプロファイリング | 高度なドリフト検知アルゴリズム |
| ドリフト検知手法数 | 20+ | 10+ | 15+ | 15+ |
| ラベルなし性能推定 | 限定的 | コア機能(CBPE、DLE) | 非対応 | 非対応 |
| リアルタイムモニタリング | Collectorモード | 非対応(バッチ) | ネイティブサポート | 非対応(バッチ) |
| 可視化 | 内蔵HTML/ダッシュボード | 内蔵HTML | Webダッシュボード(SaaS) | 基本的な可視化 |
| CI/CD統合 | Test Suite(ネイティブ) | 限定的 | APIベース | 手動設定が必要 |
| Prometheus連携 | 公式サポート | カスタム構築が必要 | 内蔵 | カスタム構築が必要 |
| MLflow連携 | 容易(Pythonネイティブ) | 手動設定 | API連携 | 手動設定 |
| 学習コスト | 低い | 中程度 | 低い(SaaS) | 高い |
| 本番ユースケース | 汎用 | ラベル遅延環境 | 大規模リアルタイム | 研究/高度な検知 |
選択ガイド:
- ラベルを即座に取得できない環境(例:金融不正検知でラベル確定まで数か月かかる場合):NannyMLのCBPE(Confidence-Based Performance Estimation)ベースの性能推定が唯一の選択肢。
- オープンソース優先 + 迅速な導入:Evidently AIが最も幅広い機能範囲と低い導入難易度を提供。
- 大規模リアルタイムストリーミング:WhyLabsのデータプロファイリングが毎秒数万件の処理に最適化。
- 高度な統計的検知が必要な研究環境:Alibi Detectの深層カーネルMMD、Learned Kernelドリフト検知が適している。
8. Grafana/Prometheusダッシュボード構成
Evidentlyのモニタリング結果をPrometheusメトリクスとしてエクスポートし、Grafanaダッシュボードで時系列として可視化する構成を見ていく。
Prometheusメトリクスエクスポート
# prometheus_drift_exporter.py
from prometheus_client import start_http_server, Gauge, Counter, Histogram
from evidently.report import Report
from evidently.metrics import DatasetDriftMetric, DataDriftTable
import pandas as pd
import time
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Prometheusメトリクス定義
DRIFT_DETECTED = Gauge(
"ml_model_drift_detected",
"データセットドリフト検知ステータス (0/1)",
["model_name", "model_version"],
)
DRIFT_SHARE = Gauge(
"ml_model_drift_column_share",
"ドリフト検知カラムの比率",
["model_name", "model_version"],
)
COLUMN_DRIFT_SCORE = Gauge(
"ml_model_column_drift_score",
"個別カラムドリフトスコア",
["model_name", "model_version", "column_name"],
)
DRIFT_CHECK_TOTAL = Counter(
"ml_model_drift_checks_total",
"ドリフトチェック実行回数",
["model_name"],
)
DRIFT_CHECK_DURATION = Histogram(
"ml_model_drift_check_duration_seconds",
"ドリフトチェック所要時間",
["model_name"],
buckets=[0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0],
)
MODEL_NAME = "fraud-detector"
MODEL_VERSION = "3"
def run_periodic_drift_check(
reference_path: str,
current_query_fn,
interval_seconds: int = 300,
):
"""定期ドリフトチェックとPrometheusメトリクス更新"""
reference = pd.read_parquet(reference_path)
while True:
try:
start_time = time.time()
# 最新データのロード
current = current_query_fn()
if current is None or len(current) < 50:
logger.warning(f"現在データ不足: {len(current) if current is not None else 0}件")
time.sleep(interval_seconds)
continue
# 特徴量カラムのみフィルタリング(ターゲット、メタデータカラムを除外)
feature_cols = [c for c in reference.columns if c not in ["target", "id", "timestamp"]]
ref_features = reference[feature_cols]
cur_features = current[feature_cols]
# ドリフト分析
report = Report(metrics=[DatasetDriftMetric(), DataDriftTable()])
report.run(reference_data=ref_features, current_data=cur_features)
result = report.as_dict()
drift_result = result["metrics"][0]["result"]
column_results = result["metrics"][1]["result"]["drift_by_columns"]
# Prometheusメトリクス更新
DRIFT_DETECTED.labels(MODEL_NAME, MODEL_VERSION).set(
int(drift_result["dataset_drift"])
)
DRIFT_SHARE.labels(MODEL_NAME, MODEL_VERSION).set(
drift_result["share_of_drifted_columns"]
)
for col_name, col_info in column_results.items():
COLUMN_DRIFT_SCORE.labels(MODEL_NAME, MODEL_VERSION, col_name).set(
col_info.get("drift_score", 0.0)
)
DRIFT_CHECK_TOTAL.labels(MODEL_NAME).inc()
duration = time.time() - start_time
DRIFT_CHECK_DURATION.labels(MODEL_NAME).observe(duration)
logger.info(
f"ドリフトチェック完了: drift={drift_result['dataset_drift']}, "
f"share={drift_result['share_of_drifted_columns']:.2%}, "
f"duration={duration:.1f}s"
)
except Exception as e:
logger.error(f"ドリフトチェック失敗: {e}", exc_info=True)
time.sleep(interval_seconds)
if __name__ == "__main__":
# PrometheusメトリクスHTTPサーバー起動(ポート8000)
start_http_server(8000)
logger.info("Prometheusメトリクスエクスポーター起動(ポート8000)")
# 定期ドリフトチェック開始(5分間隔)
run_periodic_drift_check(
reference_path="/data/reference/fraud_features_v3.parquet",
current_query_fn=lambda: pd.read_parquet("/data/serving/latest_batch.parquet"),
interval_seconds=300,
)
Grafanaダッシュボード構成要素
Grafanaで以下のパネルを構成し、MLモデルの健全性を総合的にモニタリングする。
| パネル | メトリクス | 可視化タイプ | アラートルール |
|---|---|---|---|
| ドリフト状態 | ml_model_drift_detected | Stat(最新値) | 値が1の場合Criticalアラート |
| ドリフトカラム比率推移 | ml_model_drift_column_share | Time Series | 30%超過時にWarning |
| カラム別ドリフトスコア | ml_model_column_drift_score | Heatmap | 閾値超過カラムを強調 |
| チェック所要時間 | ml_model_drift_check_duration_seconds | Histogram | 60秒超過時にWarning |
| チェック実行回数 | rate(ml_model_drift_checks_total[1h]) | Time Series | 0の場合チェック停止アラート |
Alertmanagerアラートルール例
# prometheus-alerts.yaml
groups:
- name: ml_model_drift_alerts
rules:
- alert: MLModelDriftDetected
expr: ml_model_drift_detected == 1
for: 5m
labels:
severity: warning
team: ml-platform
annotations:
summary: 'MLモデルデータドリフト検知'
description: 'モデル {{ $labels.model_name }} v{{ $labels.model_version }} でデータドリフトが検知されました。ドリフトカラム比率: {{ $value }}'
- alert: MLModelCriticalDrift
expr: ml_model_drift_column_share > 0.5
for: 0m
labels:
severity: critical
team: ml-platform
annotations:
summary: 'MLモデル臨界ドリフト - 即時対応が必要'
description: 'モデル {{ $labels.model_name }} のドリフトカラム比率が {{ $value | humanizePercentage }} です。即時再学習またはフォールバック切替が必要です。'
- alert: MLDriftCheckStalled
expr: rate(ml_model_drift_checks_total[1h]) == 0
for: 30m
labels:
severity: warning
team: ml-platform
annotations:
summary: 'MLドリフトチェック停止'
description: 'モデル {{ $labels.model_name }} のドリフトチェックが30分以上実行されていません。モニタリングパイプラインの点検が必要です。'
9. 運用上の注意事項
偽陽性(False Positive)ドリフト管理
統計的ドリフト検知の最も一般的な落とし穴は偽陽性である。特に以下の状況では、実際に問題がないにもかかわらずドリフトとして誤検知される可能性がある。
サンプルサイズ効果: 現在データのサンプル数が非常に大きい場合、KS検定やChi-squared検定は統計的には有意だが実質的には無意味な差もドリフトとして検知する。PSIやWasserstein距離のような効果量(effect size)ベースの指標を併用して、実質的な有意性を検証する必要がある。
季節性(Seasonality): ECサイトにおけるブラックフライデー期間の購買パターンは通常とは明らかに異なる。これをドリフトとして検知すると、毎年同じ時期に不要なアラートが大量発生する。リファレンスデータを同時期の過去データに設定するか、季節調整ロジックを適用する必要がある。
特徴量間の相関: 個別特徴量単位のドリフト検知だけでは多変量分布の変化を捉えられない。特徴量AとBそれぞれの分布は類似しているが、A-B間の相関関係が変わっているケースがある。EvidentlyのDatasetDriftMetricはデータセット全体レベルの判定を提供するが、明示的な多変量検知が必要な場合はAlibi DetectのMMD(Maximum Mean Discrepancy)手法を検討すべきである。
リファレンスデータ管理戦略
リファレンスデータはドリフト検知の基準線である。不適切なリファレンスデータはすべての検知結果を無効化する。
| 戦略 | 説明 | 適した状況 | 注意点 |
|---|---|---|---|
| 学習データ固定 | モデル学習に使用したデータをリファレンスとして固定 | 安定したドメイン、変化が少ない環境 | 時間が経つとリファレンス自体が陳腐化 |
| スライディングウィンドウ | 直近N日/N週のデータでリファレンスを更新 | 漸進的変化が正常な環境 | 漸進的ドリフトを見逃すリスク |
| 再学習時点で更新 | モデル再学習の度にリファレンスを更新 | 定期再学習があるパイプライン | 再学習周期に依存 |
| 二重基準線 | 学習データ + 直近安定期間データの両方と比較 | 高精度が求められる環境 | 管理の複雑度が増加 |
要点: リファレンスデータをバージョン管理し、モデルバージョンと1:1で紐づけて追跡可能にすることが重要である。MLflowアーティファクトとしてリファレンスデータのスナップショットを保存することを推奨する。
フィーチャーストアとの連携
オフライン学習時点とオンラインサービング時点の特徴量計算ロジックが異なると(Training-Serving Skew)、実際のドリフトではなく実装の不一致による偽ドリフトが発生する。Feastのようなフィーチャーストアを使用して学習/サービング間の特徴量一貫性を保証することが根本的な解決策である。
10. 障害事例と復旧手順
事例1:サイレントモデル性能劣化(Silent Model Degradation)
状況: ECサイトのレコメンドモデルが3か月間にわたり段階的に性能劣化。CTRが12%から7%に低下したが、ドリフトモニタリングが個別特徴量単位でのみ設定されていたため検知できず。
原因: ユーザー行動パターンの多変量変化。個別特徴量(閲覧数、滞在時間、カテゴリ比率)それぞれの分布は大きく変わらなかったが、特徴量間の相関関係が変化。特に「滞在時間-購入コンバージョン」の関係がショートフォームコンテンツ消費パターンの変化により弱体化。
復旧手順:
- 多変量ドリフト検知を追加(特徴量相関行列の比較)
- ビジネスKPI(CTR、コンバージョン率)を直接モニタリングするコンセプトドリフト監視レイヤーを追加
- 直近2週間のデータでモデルを再学習し、A/Bテストでデプロイ
- 再学習周期を月1回から週1回に短縮
教訓: データドリフトだけではコンセプトドリフトを捉えることは困難。ビジネスメトリクスのモニタリングを必ず並行して実施すべきである。
事例2:データパイプライン障害による偽ドリフト
状況: 金曜夜の深夜にドリフトCriticalアラートが大量発生。3つのモデルで同時に80%以上のカラムドリフトを検知。
原因: 上流データパイプラインのETLジョブが失敗し、サービング特徴量テーブルの一部カラムがデフォルト値(0またはnull)で埋められた。データ品質の問題がドリフトとして誤検知されたケース。
復旧手順:
- EvidentlyのTestSuiteに
TestNumberOfMissingValuesとTestShareOfOutRangeValuesをドリフトチェックの前段階に配置 - データ品質の検証失敗時にドリフトチェックをスキップし、別途データパイプラインアラートを送信
- 上流ETLにデータ完全性検証ゲートを追加
- ドリフトアラートに「直近のデータ品質チェック結果」の情報を含める
教訓: ドリフト検知パイプラインの前にデータ品質検証ステップを必ず配置すべきである。データ品質の問題と実際の分布変化を区別することが運用の要である。
事例3:リファレンスデータの汚染
状況: モデル再学習後、新しいリファレンスデータに更新。その後ドリフトがまったく検知されなくなり、モニタリングシステムが無意味に。
原因: 再学習に使用したデータ自体にすでにドリフトが含まれており、この汚染されたデータが新しいリファレンスとなった。結果としてドリフトが「正常」としてベースラインがリセットされた。
復旧手順:
- リファレンスデータ更新時に前回リファレンスとのドリフト比較を自動化
- ドリフト比率が一定水準以上の場合にリファレンス更新をブロックするゲートを追加
- リファレンスデータの変更履歴をMLflowアーティファクトとしてバージョン管理
- ゴールデンデータセット(手動検証済みの高品質データ)との比較を定期的に実施
教訓: リファレンスデータはモニタリングシステムの基準線であるため、変更時には必ず検証プロセスを経る必要がある。
11. 本番モニタリングチェックリスト
デプロイ前チェックリスト
- リファレンスデータがモデルバージョンとともにバージョン管理されているか
- Evidently Report/TestSuiteがデプロイパイプラインに統合されているか
- ドリフト閾値がドメイン特性に合わせて調整されているか
- データ品質検証がドリフト検知の前段階に配置されているか
- フォールバックモデルがレジストリに登録されているか
- Grafanaダッシュボードとアラートルールが設定されているか
運用中チェックリスト
- ドリフトチェックが正常な周期で実行されているか(モニターのモニター)
- 偽陽性率が管理可能なレベルか(月5件以内推奨)
- リファレンスデータが適切なタイミングで更新されているか
- 再学習トリガーが正常に動作し、チャンピオン/チャレンジャー評価が実施されているか
- ビジネスKPIとモデル性能指標が連動して追跡されているか
- アラート受信後の平均対応時間(MTTR)がSLA内にあるか
コンセプトドリフト対応チェックリスト
- ラベル取得パイプラインが構築されているか(遅延ラベルを含む)
- モデル性能指標(Accuracy、F1、AUC)の時系列推移をモニタリングしているか
- ラベルがない期間に対する代理指標(proxy metric)が定義されているか
- A/Bテストインフラが準備されているか
12. 参考資料
- Evidently AI - Data Drift 公式ガイド - データドリフトの概念、検知手法、実際の事例を含む総合ガイド。
- Evidently AI GitHub リポジトリ - オープンソースコード、サンプルノートブック、コミュニティディスカッション。
- MLflow Model Registry 公式ドキュメント - モデルレジストリAPI、エイリアスシステム、デプロイワークフローガイド。
- Evidently AI 公式ドキュメント - Report、TestSuite、Metricの完全なAPIリファレンスとチュートリアル。
- Advanced ML Model Monitoring: Drift Detection, Explainability, and Automated Retraining - ドリフト検知と自動再学習パイプラインの高度なパターン。
- Google - ML Technical Debt (Hidden Technical Debt in Machine Learning Systems) - MLシステムの技術的負債とモニタリングの必要性に関する基礎論文。
- NannyML - Estimating Model Performance without Ground Truth - ラベルなしでモデル性能を推定するCBPE手法。