Split View: ML 모델 모니터링과 드리프트 탐지: Evidently AI + MLflow 프로덕션 운영 가이드
ML 모델 모니터링과 드리프트 탐지: Evidently AI + MLflow 프로덕션 운영 가이드
- 1. 들어가며: 프로덕션 모델은 조용히 망가진다
- 2. 드리프트의 종류: 무엇이 변하는가
- 3. Evidently AI 아키텍처와 핵심 기능
- 4. Evidently AI 실전 사용법
- 5. MLflow 모델 레지스트리와 모니터링 연동
- 6. 자동 재학습 파이프라인 구축
- 7. 모니터링 도구 비교: Evidently vs NannyML vs WhyLabs vs Alibi Detect
- 8. Grafana/Prometheus 대시보드 구성
- 9. 운영 시 주의사항
- 10. 실패 사례와 복구 절차
- 11. 프로덕션 모니터링 체크리스트
- 12. 참고자료

1. 들어가며: 프로덕션 모델은 조용히 망가진다
ML 모델의 정확도는 배포 순간이 최고점이다. 그 이후로는 현실 세계의 변화에 따라 예측 품질이 점진적으로 하락한다. 문제는 이 열화가 명시적 에러 없이 진행된다는 것이다. HTTP 500이 발생하지 않고, 로그에 CRITICAL이 찍히지 않으며, 서비스는 정상적으로 응답한다. 단지 추천이 점점 엉뚱해지고, 사기 탐지가 새로운 패턴을 놓치고, 수요 예측이 현실과 괴리되기 시작한다.
Google의 연구에 따르면, 프로덕션 ML 시스템에서 발생하는 장애의 60% 이상이 모델 코드가 아닌 데이터 관련 이슈에서 기인한다. 모델 자체가 고장 나는 것이 아니라, 모델이 학습한 세계와 현실 세계 사이의 괴리가 커지는 것이 핵심 원인이다.
이 글에서는 오픈소스 모니터링 도구인 Evidently AI와 실험/모델 관리 플랫폼 MLflow를 조합하여, 프로덕션 환경에서 ML 모델의 건강 상태를 지속적으로 감시하고, 드리프트를 탐지하며, 자동 재학습을 트리거하는 파이프라인을 구축하는 방법을 다룬다.
2. 드리프트의 종류: 무엇이 변하는가
드리프트(Drift)는 모델이 학습한 데이터 분포와 실제 서빙 시점의 데이터 분포 사이의 불일치를 의미한다. 드리프트는 발생 위치와 성격에 따라 크게 세 가지로 분류된다.
데이터 드리프트 (Data Drift, Covariate Shift)
입력 피처의 분포가 변하는 현상이다. 모델의 입력 공간 P(X)가 시간에 따라 이동한다. 예를 들어, 전자상거래 추천 모델에서 사용자 연령대 분포가 바뀌거나, 계절에 따라 구매 카테고리 비중이 변하는 경우가 이에 해당한다. 타겟 변수 Y와 피처 X 사이의 관계 P(Y|X)는 그대로인 상태에서, 입력 자체의 통계적 특성이 달라지는 것이다.
컨셉 드리프트 (Concept Drift)
피처와 타겟 사이의 관계 자체가 변하는 현상이다. P(Y|X)가 변한다. 데이터 드리프트보다 심각한 문제인데, 동일한 입력에 대해 정답 자체가 달라지기 때문이다. 코로나19 팬데믹 시기에 수요 예측 모델이 완전히 무효화된 사례, 금융 사기 탐지에서 사기범의 수법이 진화하면서 기존 패턴이 더 이상 유효하지 않게 된 사례가 대표적이다.
예측 드리프트 (Prediction Drift)
모델 출력 P(Y_pred)의 분포가 변하는 현상이다. 입력 드리프트의 결과로 나타나기도 하고, 모델 내부 문제로 인해 독립적으로 발생하기도 한다. 분류 모델에서 특정 클래스의 예측 비율이 갑자기 치우치거나, 회귀 모델에서 예측값의 평균이나 분산이 크게 변하는 경우를 포함한다.
| 드리프트 유형 | 변화 대상 | 탐지 난이도 | 대표 탐지 방법 | 재학습 긴급도 |
|---|---|---|---|---|
| 데이터 드리프트 | P(X) 입력 분포 | 중간 | PSI, KS test, Wasserstein | 중간 |
| 컨셉 드리프트 | P(Y|X) 관계 | 높음 | 성능 지표 모니터링, ADWIN | 높음 |
| 예측 드리프트 | P(Y_pred) 출력 | 낮음 | 출력 분포 통계, Chi-squared | 상황별 |
| 라벨 드리프트 | P(Y) 타겟 분포 | 중간 | 라벨 분포 비교 | 높음 |
3. Evidently AI 아키텍처와 핵심 기능
Evidently AI는 ML 모델 모니터링과 데이터 품질 검증을 위한 오픈소스 라이브러리다. Python 네이티브 환경에서 동작하며, 20가지 이상의 통계적 드리프트 탐지 방법을 내장하고 있다.
핵심 구성 요소
- Report: 일회성 데이터 분석 보고서. HTML, JSON, Python 딕셔너리 형태로 출력 가능. 탐색적 분석과 디버깅에 적합하다.
- Test Suite: 사전 정의된 조건에 대한 자동화된 검증. CI/CD 파이프라인에 통합하여 데이터 품질 게이트로 사용한다.
- Metric: 개별 측정 항목. DataDriftTable, DatasetSummaryMetric, ColumnCorrelationsMetric 등 수십 가지 메트릭이 기본 제공된다.
- Collector/Workspace: Evidently 서버 모드. 모니터링 결과를 시계열로 저장하고 대시보드에서 조회한다.
주요 드리프트 탐지 알고리즘
Evidently는 피처 타입(수치형/범주형)과 데이터셋 크기에 따라 자동으로 최적의 탐지 알고리즘을 선택한다.
| 알고리즘 | 적용 대상 | 원리 | 장점 | 한계 |
|---|---|---|---|---|
| Kolmogorov-Smirnov (KS) | 수치형, 소규모 | 누적분포함수 최대 차이 | 분포 가정 불필요 | 대규모 데이터에서 과민 |
| Population Stability Index (PSI) | 수치형/범주형 | 두 분포의 로그 비율 가중합 | 업계 표준, 해석 용이 | 빈(bin) 설정에 민감 |
| Wasserstein Distance | 수치형 | 두 분포 간 최소 이동 비용 | 분포 형태 차이 반영 | 계산 비용 높음 |
| Jensen-Shannon Divergence | 수치형/범주형 | KL Divergence의 대칭 버전 | 항상 유한값, 대칭적 | 꼬리 분포 변화에 둔감 |
| Chi-squared Test | 범주형 | 관측/기대 빈도 차이 | 범주형에 직관적 | 저빈도 범주에 불안정 |
| Z-test (비율 검정) | 범주형, 대규모 | 비율 차이의 표준화 | 대규모 데이터에 효율적 | 정규 근사 전제 |
4. Evidently AI 실전 사용법
설치 및 기본 설정
# Evidently AI 설치 (MLflow 연동 포함)
# pip install evidently mlflow scikit-learn pandas
import pandas as pd
import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
from evidently.metrics import (
DatasetDriftMetric,
DataDriftTable,
ColumnDriftMetric,
)
# 레퍼런스 / 현재 데이터 준비
data = load_iris(as_frame=True)
df = data.frame
df.columns = ["sepal_length", "sepal_width", "petal_length", "petal_width", "target"]
reference_data = df.sample(frac=0.5, random_state=42)
current_data = df.drop(reference_data.index)
# 데이터 드리프트가 있는 시뮬레이션 데이터 생성
current_drifted = current_data.copy()
current_drifted["sepal_length"] = current_drifted["sepal_length"] + np.random.normal(2.0, 0.5, len(current_drifted))
current_drifted["petal_width"] = current_drifted["petal_width"] * 1.8
# 드리프트 리포트 생성
drift_report = Report(metrics=[
DatasetDriftMetric(),
DataDriftTable(),
])
drift_report.run(
reference_data=reference_data,
current_data=current_drifted,
)
# 결과를 딕셔너리로 추출 (프로그래밍적 활용)
result = drift_report.as_dict()
dataset_drift = result["metrics"][0]["result"]["dataset_drift"]
drift_share = result["metrics"][0]["result"]["share_of_drifted_columns"]
print(f"데이터셋 드리프트 감지: {dataset_drift}")
print(f"드리프트 컬럼 비율: {drift_share:.2%}")
# HTML 보고서로 저장
drift_report.save_html("drift_report.html")
Test Suite를 활용한 자동화된 데이터 품질 검증
from evidently.test_suite import TestSuite
from evidently.test_preset import DataDriftTestPreset, DataQualityTestPreset
from evidently.tests import (
TestColumnDrift,
TestShareOfDriftedColumns,
TestNumberOfMissingValues,
TestShareOfOutRangeValues,
TestMeanInNSigmas,
)
# 데이터 드리프트 + 품질 테스트 스위트 구성
monitoring_suite = TestSuite(tests=[
# 드리프트 테스트: 전체 컬럼 중 30% 이상 드리프트 시 실패
TestShareOfDriftedColumns(lt=0.3),
# 개별 핵심 피처 드리프트 검증
TestColumnDrift(column_name="sepal_length"),
TestColumnDrift(column_name="petal_width"),
# 데이터 품질 테스트
TestNumberOfMissingValues(eq=0),
# 값 범위 검증: sepal_length가 참조 데이터 기준 ±3 시그마 이내
TestMeanInNSigmas(column_name="sepal_length", n=3),
])
monitoring_suite.run(
reference_data=reference_data,
current_data=current_drifted,
)
# 테스트 결과 프로그래밍적으로 확인
suite_result = monitoring_suite.as_dict()
all_passed = all(
test["status"] == "SUCCESS"
for test in suite_result["tests"]
)
print(f"전체 테스트 통과 여부: {all_passed}")
for test in suite_result["tests"]:
status_icon = "PASS" if test["status"] == "SUCCESS" else "FAIL"
print(f" [{status_icon}] {test['name']}: {test['status']}")
# CI/CD 파이프라인에서 exit code로 활용
if not all_passed:
print("ALERT: 데이터 드리프트 또는 품질 이상 감지. 재학습 파이프라인 트리거 필요.")
# sys.exit(1) # CI에서 빌드 실패 처리
5. MLflow 모델 레지스트리와 모니터링 연동
MLflow는 실험 추적, 모델 패키징, 모델 레지스트리 기능을 제공한다. Evidently의 드리프트 탐지 결과를 MLflow에 기록하면, 모델 버전별 성능 이력과 드리프트 상태를 하나의 플랫폼에서 추적할 수 있다.
드리프트 메트릭을 MLflow에 기록하기
import mlflow
from evidently.report import Report
from evidently.metrics import (
DatasetDriftMetric,
DataDriftTable,
ColumnDriftMetric,
)
import json
from datetime import datetime
# MLflow 추적 서버 설정
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("model-monitoring/fraud-detection-v2")
def log_drift_to_mlflow(
reference_data,
current_data,
model_name: str,
model_version: str,
batch_id: str,
):
"""드리프트 분석 결과를 MLflow에 기록하는 함수"""
# Evidently 드리프트 리포트 생성
drift_report = Report(metrics=[
DatasetDriftMetric(),
DataDriftTable(),
])
drift_report.run(
reference_data=reference_data,
current_data=current_data,
)
result = drift_report.as_dict()
drift_result = result["metrics"][0]["result"]
# MLflow Run으로 기록
with mlflow.start_run(run_name=f"drift-check-{batch_id}") as run:
# 기본 드리프트 메트릭
mlflow.log_metric("dataset_drift_detected", int(drift_result["dataset_drift"]))
mlflow.log_metric("drifted_columns_share", drift_result["share_of_drifted_columns"])
mlflow.log_metric("number_of_drifted_columns", drift_result["number_of_drifted_columns"])
mlflow.log_metric("total_columns", drift_result["number_of_columns"])
# 개별 컬럼 드리프트 점수 기록
column_drift = result["metrics"][1]["result"]["drift_by_columns"]
for col_name, col_info in column_drift.items():
safe_col_name = col_name.replace(" ", "_").replace("/", "_")
mlflow.log_metric(
f"drift_score_{safe_col_name}",
col_info.get("drift_score", 0.0),
)
mlflow.log_metric(
f"drift_detected_{safe_col_name}",
int(col_info.get("column_drift", False)),
)
# 태그로 메타데이터 기록
mlflow.set_tags({
"monitoring.type": "drift_detection",
"monitoring.model_name": model_name,
"monitoring.model_version": model_version,
"monitoring.batch_id": batch_id,
"monitoring.timestamp": datetime.utcnow().isoformat(),
"monitoring.reference_size": str(len(reference_data)),
"monitoring.current_size": str(len(current_data)),
})
# HTML 리포트를 아티팩트로 저장
report_path = f"/tmp/drift_report_{batch_id}.html"
drift_report.save_html(report_path)
mlflow.log_artifact(report_path, artifact_path="drift_reports")
# JSON 결과도 아티팩트로 저장
json_path = f"/tmp/drift_result_{batch_id}.json"
with open(json_path, "w") as f:
json.dump(result, f, indent=2, default=str)
mlflow.log_artifact(json_path, artifact_path="drift_reports")
print(f"드리프트 결과 MLflow에 기록 완료. Run ID: {run.info.run_id}")
return drift_result["dataset_drift"], drift_result["share_of_drifted_columns"]
# 사용 예시
is_drifted, drift_share = log_drift_to_mlflow(
reference_data=reference_data,
current_data=current_drifted,
model_name="fraud-detector",
model_version="3",
batch_id="2026-03-06-batch-001",
)
모델 레지스트리 별칭(Alias) 기반 관리
MLflow 2.x부터 기존의 Stage(Staging/Production/Archived) 대신 Alias 기반 모델 관리를 권장한다. 드리프트 탐지 결과에 따라 모델 별칭을 자동으로 전환하는 전략을 적용할 수 있다.
from mlflow import MlflowClient
client = MlflowClient(tracking_uri="http://mlflow.internal:5000")
MODEL_NAME = "fraud-detector"
def handle_drift_detection(
is_drifted: bool,
drift_share: float,
model_name: str = MODEL_NAME,
drift_threshold_warn: float = 0.2,
drift_threshold_critical: float = 0.5,
):
"""드리프트 탐지 결과에 따른 모델 레지스트리 액션 수행"""
# 현재 프로덕션 모델 버전 확인
try:
prod_version = client.get_model_version_by_alias(model_name, "production")
current_version = prod_version.version
print(f"현재 프로덕션 모델 버전: {current_version}")
except Exception as e:
print(f"프로덕션 모델 별칭 조회 실패: {e}")
return
if not is_drifted:
print("드리프트 미감지. 현재 모델 유지.")
client.set_model_version_tag(
model_name, current_version,
key="last_drift_check",
value="passed",
)
return
if drift_share >= drift_threshold_critical:
# 임계 드리프트: 즉시 폴백 모델로 전환 + 재학습 트리거
print(f"CRITICAL: 드리프트 비율 {drift_share:.1%} - 폴백 모델 전환 및 재학습 트리거")
client.set_model_version_tag(
model_name, current_version,
key="drift_status", value="critical",
)
# 폴백 모델이 있으면 전환
try:
fallback = client.get_model_version_by_alias(model_name, "fallback")
client.set_registered_model_alias(model_name, "production", fallback.version)
print(f"폴백 모델 버전 {fallback.version}으로 전환 완료")
except Exception:
print("WARNING: 폴백 모델이 없음. 현재 모델 유지하면서 긴급 재학습 필요.")
# 재학습 트리거 (외부 시스템 호출)
trigger_retraining(model_name, reason="critical_drift")
elif drift_share >= drift_threshold_warn:
# 경고 수준 드리프트: 태그 기록 + 알림
print(f"WARNING: 드리프트 비율 {drift_share:.1%} - 모니터링 강화 및 재학습 예약")
client.set_model_version_tag(
model_name, current_version,
key="drift_status", value="warning",
)
# 스케줄된 재학습 큐에 추가
schedule_retraining(model_name, priority="normal")
def trigger_retraining(model_name: str, reason: str):
"""긴급 재학습 트리거 (Airflow DAG, Kubeflow Pipeline 등 호출)"""
print(f"재학습 트리거: model={model_name}, reason={reason}")
# requests.post("http://airflow.internal/api/v1/dags/retrain/dagRuns", ...)
def schedule_retraining(model_name: str, priority: str):
"""스케줄된 재학습 큐에 등록"""
print(f"재학습 스케줄 등록: model={model_name}, priority={priority}")
# 실행
handle_drift_detection(
is_drifted=True,
drift_share=0.55,
model_name=MODEL_NAME,
)
6. 자동 재학습 파이프라인 구축
드리프트 탐지에서 재학습까지의 자동화된 파이프라인은 다음 단계로 구성된다.
전체 파이프라인 흐름
- 스케줄러: 배치 추론 후 또는 일정 주기(일/주)로 드리프트 체크 트리거
- 드리프트 분석기: Evidently로 레퍼런스 데이터 대비 현재 데이터 분석
- 판단 엔진: 드리프트 임계치 기반으로 재학습 필요 여부 결정
- 재학습 오케스트레이터: Airflow/Kubeflow에서 학습 Job 실행
- 챔피언/챌린저 평가: 신규 모델을 기존 모델과 비교 평가
- 배포 게이트: 성능 기준 통과 시 자동 배포, 실패 시 롤백
Airflow DAG와의 연동 패턴
# Airflow DAG 예시: 드리프트 체크 + 조건부 재학습
# dag_drift_monitor.py
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import pandas as pd
default_args = {
"owner": "ml-platform",
"retries": 2,
"retry_delay": timedelta(minutes=5),
"execution_timeout": timedelta(minutes=30),
}
dag = DAG(
dag_id="ml_drift_monitor_fraud_detection",
default_args=default_args,
description="일일 드리프트 모니터링 및 조건부 재학습",
schedule_interval="0 6 * * *", # 매일 오전 6시
start_date=days_ago(1),
catchup=False,
tags=["ml-monitoring", "drift-detection"],
)
def fetch_data(**context):
"""레퍼런스 데이터와 최근 24시간 서빙 데이터 로드"""
from sqlalchemy import create_engine
engine = create_engine("postgresql://reader:password@db.internal/features")
reference = pd.read_sql(
"SELECT * FROM fraud_features_reference", engine
)
current = pd.read_sql(
"""SELECT * FROM fraud_features_serving
WHERE created_at >= NOW() - INTERVAL '24 hours'""",
engine,
)
# XCom으로 경로 전달 (대용량은 S3에 저장)
ref_path = "/tmp/reference_data.parquet"
cur_path = "/tmp/current_data.parquet"
reference.to_parquet(ref_path)
current.to_parquet(cur_path)
context["ti"].xcom_push(key="reference_path", value=ref_path)
context["ti"].xcom_push(key="current_path", value=cur_path)
context["ti"].xcom_push(key="current_size", value=len(current))
def run_drift_check(**context):
"""Evidently 드리프트 분석 실행 및 MLflow 기록"""
from evidently.report import Report
from evidently.metrics import DatasetDriftMetric, DataDriftTable
import mlflow
ti = context["ti"]
ref_path = ti.xcom_pull(key="reference_path")
cur_path = ti.xcom_pull(key="current_path")
reference = pd.read_parquet(ref_path)
current = pd.read_parquet(cur_path)
# 최소 샘플 수 검증
if len(current) < 100:
print(f"현재 데이터 샘플 수 부족: {len(current)}. 드리프트 체크 건너뜀.")
ti.xcom_push(key="drift_action", value="skip")
return "skip_retraining"
report = Report(metrics=[DatasetDriftMetric(), DataDriftTable()])
report.run(reference_data=reference, current_data=current)
result = report.as_dict()
drift_detected = result["metrics"][0]["result"]["dataset_drift"]
drift_share = result["metrics"][0]["result"]["share_of_drifted_columns"]
# MLflow에 기록
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("monitoring/fraud-detection")
with mlflow.start_run(run_name=f"drift-{context['ds']}"):
mlflow.log_metric("drift_detected", int(drift_detected))
mlflow.log_metric("drift_share", drift_share)
ti.xcom_push(key="drift_detected", value=drift_detected)
ti.xcom_push(key="drift_share", value=drift_share)
def decide_action(**context):
"""드리프트 수준에 따라 재학습 여부 결정"""
ti = context["ti"]
drift_detected = ti.xcom_pull(key="drift_detected")
drift_share = ti.xcom_pull(key="drift_share")
if drift_share is None or drift_share < 0.2:
return "skip_retraining"
elif drift_share >= 0.5:
return "trigger_emergency_retrain"
else:
return "trigger_scheduled_retrain"
fetch_task = PythonOperator(
task_id="fetch_data", python_callable=fetch_data, dag=dag,
)
drift_task = PythonOperator(
task_id="run_drift_check", python_callable=run_drift_check, dag=dag,
)
branch_task = BranchPythonOperator(
task_id="decide_action", python_callable=decide_action, dag=dag,
)
skip_task = EmptyOperator(task_id="skip_retraining", dag=dag)
scheduled_retrain = EmptyOperator(task_id="trigger_scheduled_retrain", dag=dag)
emergency_retrain = EmptyOperator(task_id="trigger_emergency_retrain", dag=dag)
fetch_task >> drift_task >> branch_task >> [skip_task, scheduled_retrain, emergency_retrain]
재학습 트리거 임계치 가이드라인
| 드리프트 수준 | drift_share 범위 | 권장 액션 | 대응 시간 |
|---|---|---|---|
| 정상 | 0% ~ 15% | 모니터링 유지 | - |
| 주의 | 15% ~ 30% | 알림 발송, 원인 분석 시작 | 48시간 내 |
| 경고 | 30% ~ 50% | 스케줄 재학습 큐 등록 | 24시간 내 |
| 위험 | 50% 이상 | 즉시 재학습 + 폴백 모델 전환 | 즉시 |
주의: 임계치는 도메인과 모델 특성에 따라 조정해야 한다. 금융 사기 탐지처럼 미탐지 비용이 높은 도메인은 더 낮은 임계치(10
20%)를, 추천 시스템처럼 허용 범위가 넓은 도메인은 높은 임계치(3050%)를 적용하는 것이 적절하다.
7. 모니터링 도구 비교: Evidently vs NannyML vs WhyLabs vs Alibi Detect
프로덕션 ML 모니터링 도구는 여러 선택지가 있다. 각 도구의 강점과 약점을 비교한다.
| 기준 | Evidently AI | NannyML | WhyLabs | Alibi Detect |
|---|---|---|---|---|
| 라이선스 | Apache 2.0 (OSS) | BSD-3 (OSS) | SaaS + 무료 티어 | BSD-3 (OSS) |
| 핵심 강점 | 범용 데이터/모델 모니터링 | 라벨 없이 성능 추정 (CBPE) | 실시간 스트리밍 프로파일링 | 고급 드리프트 탐지 알고리즘 |
| 드리프트 탐지 방법 수 | 20+ | 10+ | 15+ | 15+ |
| 라벨 없는 성능 추정 | 제한적 | 핵심 기능 (CBPE, DLE) | 미지원 | 미지원 |
| 실시간 모니터링 | Collector 모드 | 미지원 (배치) | 네이티브 지원 | 미지원 (배치) |
| 시각화 | 내장 HTML/대시보드 | 내장 HTML | 웹 대시보드 (SaaS) | 기본 시각화 |
| CI/CD 통합 | Test Suite (네이티브) | 제한적 | API 기반 | 수동 구성 필요 |
| Prometheus 연동 | 공식 지원 | 커스텀 필요 | 내장 | 커스텀 필요 |
| MLflow 연동 | 쉬움 (Python 네이티브) | 수동 구성 | API 연동 | 수동 구성 |
| 학습 곡선 | 낮음 | 중간 | 낮음 (SaaS) | 높음 |
| 프로덕션 사용 사례 | 범용 | 라벨 지연 환경 | 대규모 실시간 | 연구/고급 탐지 |
선택 가이드:
- 라벨을 즉시 획득할 수 없는 환경 (예: 금융 사기 탐지에서 라벨 확정까지 수개월 소요): NannyML의 CBPE(Confidence-Based Performance Estimation) 기반 성능 추정이 유일한 선택지다.
- 오픈소스 우선 + 빠른 도입: Evidently AI가 가장 넓은 기능 범위와 낮은 도입 난이도를 제공한다.
- 대규모 실시간 스트리밍: WhyLabs의 데이터 프로파일링이 초당 수만 건 처리에 최적화되어 있다.
- 고급 통계 탐지가 필요한 연구 환경: Alibi Detect의 심층 커널 MMD, Learned Kernel 드리프트 탐지가 적합하다.
8. Grafana/Prometheus 대시보드 구성
Evidently의 모니터링 결과를 Prometheus 메트릭으로 노출하고, Grafana 대시보드에서 시계열로 시각화하는 구성을 살펴본다.
Prometheus 메트릭 익스포트
# prometheus_drift_exporter.py
from prometheus_client import start_http_server, Gauge, Counter, Histogram
from evidently.report import Report
from evidently.metrics import DatasetDriftMetric, DataDriftTable
import pandas as pd
import time
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Prometheus 메트릭 정의
DRIFT_DETECTED = Gauge(
"ml_model_drift_detected",
"데이터셋 드리프트 탐지 여부 (0/1)",
["model_name", "model_version"],
)
DRIFT_SHARE = Gauge(
"ml_model_drift_column_share",
"드리프트 감지된 컬럼 비율",
["model_name", "model_version"],
)
COLUMN_DRIFT_SCORE = Gauge(
"ml_model_column_drift_score",
"개별 컬럼 드리프트 점수",
["model_name", "model_version", "column_name"],
)
DRIFT_CHECK_TOTAL = Counter(
"ml_model_drift_checks_total",
"드리프트 체크 실행 횟수",
["model_name"],
)
DRIFT_CHECK_DURATION = Histogram(
"ml_model_drift_check_duration_seconds",
"드리프트 체크 소요 시간",
["model_name"],
buckets=[0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0],
)
MODEL_NAME = "fraud-detector"
MODEL_VERSION = "3"
def run_periodic_drift_check(
reference_path: str,
current_query_fn,
interval_seconds: int = 300,
):
"""주기적 드리프트 체크 및 Prometheus 메트릭 업데이트"""
reference = pd.read_parquet(reference_path)
while True:
try:
start_time = time.time()
# 최근 데이터 로드
current = current_query_fn()
if current is None or len(current) < 50:
logger.warning(f"현재 데이터 부족: {len(current) if current is not None else 0}건")
time.sleep(interval_seconds)
continue
# 피처 컬럼만 필터링 (타겟, 메타데이터 컬럼 제외)
feature_cols = [c for c in reference.columns if c not in ["target", "id", "timestamp"]]
ref_features = reference[feature_cols]
cur_features = current[feature_cols]
# 드리프트 분석
report = Report(metrics=[DatasetDriftMetric(), DataDriftTable()])
report.run(reference_data=ref_features, current_data=cur_features)
result = report.as_dict()
drift_result = result["metrics"][0]["result"]
column_results = result["metrics"][1]["result"]["drift_by_columns"]
# Prometheus 메트릭 업데이트
DRIFT_DETECTED.labels(MODEL_NAME, MODEL_VERSION).set(
int(drift_result["dataset_drift"])
)
DRIFT_SHARE.labels(MODEL_NAME, MODEL_VERSION).set(
drift_result["share_of_drifted_columns"]
)
for col_name, col_info in column_results.items():
COLUMN_DRIFT_SCORE.labels(MODEL_NAME, MODEL_VERSION, col_name).set(
col_info.get("drift_score", 0.0)
)
DRIFT_CHECK_TOTAL.labels(MODEL_NAME).inc()
duration = time.time() - start_time
DRIFT_CHECK_DURATION.labels(MODEL_NAME).observe(duration)
logger.info(
f"드리프트 체크 완료: drift={drift_result['dataset_drift']}, "
f"share={drift_result['share_of_drifted_columns']:.2%}, "
f"duration={duration:.1f}s"
)
except Exception as e:
logger.error(f"드리프트 체크 실패: {e}", exc_info=True)
time.sleep(interval_seconds)
if __name__ == "__main__":
# Prometheus 메트릭 HTTP 서버 시작 (포트 8000)
start_http_server(8000)
logger.info("Prometheus 메트릭 익스포터 시작 (포트 8000)")
# 주기적 드리프트 체크 시작 (5분 간격)
run_periodic_drift_check(
reference_path="/data/reference/fraud_features_v3.parquet",
current_query_fn=lambda: pd.read_parquet("/data/serving/latest_batch.parquet"),
interval_seconds=300,
)
Grafana 대시보드 구성 요소
Grafana에서 다음 패널들을 구성하여 ML 모델 건강 상태를 종합적으로 모니터링한다.
| 패널 | 메트릭 | 시각화 타입 | 알림 규칙 |
|---|---|---|---|
| 드리프트 상태 | ml_model_drift_detected | Stat (최신값) | 값이 1이면 Critical 알림 |
| 드리프트 컬럼 비율 추이 | ml_model_drift_column_share | Time Series | 30% 초과 시 Warning |
| 컬럼별 드리프트 점수 | ml_model_column_drift_score | Heatmap | 임계치 초과 컬럼 강조 |
| 체크 소요 시간 | ml_model_drift_check_duration_seconds | Histogram | 60초 초과 시 Warning |
| 체크 실행 횟수 | rate(ml_model_drift_checks_total[1h]) | Time Series | 0이면 체크 중단 알림 |
Alertmanager 알림 규칙 예시
# prometheus-alerts.yaml
groups:
- name: ml_model_drift_alerts
rules:
- alert: MLModelDriftDetected
expr: ml_model_drift_detected == 1
for: 5m
labels:
severity: warning
team: ml-platform
annotations:
summary: 'ML 모델 데이터 드리프트 감지'
description: '모델 {{ $labels.model_name }} v{{ $labels.model_version }}에서 데이터 드리프트가 감지되었습니다. 드리프트 컬럼 비율: {{ $value }}'
- alert: MLModelCriticalDrift
expr: ml_model_drift_column_share > 0.5
for: 0m
labels:
severity: critical
team: ml-platform
annotations:
summary: 'ML 모델 임계 드리프트 - 즉시 대응 필요'
description: '모델 {{ $labels.model_name }}의 드리프트 컬럼 비율이 {{ $value | humanizePercentage }}입니다. 즉시 재학습 또는 폴백 전환이 필요합니다.'
- alert: MLDriftCheckStalled
expr: rate(ml_model_drift_checks_total[1h]) == 0
for: 30m
labels:
severity: warning
team: ml-platform
annotations:
summary: 'ML 드리프트 체크 중단됨'
description: '모델 {{ $labels.model_name }}의 드리프트 체크가 30분 이상 실행되지 않았습니다. 모니터링 파이프라인 점검이 필요합니다.'
9. 운영 시 주의사항
거짓 양성(False Positive) 드리프트 관리
통계적 드리프트 탐지의 가장 흔한 함정은 거짓 양성이다. 특히 다음 상황에서 실제 문제가 없음에도 드리프트로 오탐될 수 있다.
샘플 크기 효과: 현재 데이터의 샘플 수가 매우 클 때, KS 검정이나 Chi-squared 검정은 통계적으로 유의미하지만 실질적으로 무의미한 차이도 드리프트로 탐지한다. PSI나 Wasserstein 거리처럼 효과 크기(effect size) 기반 지표를 병행하여 실질적 유의미성을 검증해야 한다.
계절성(Seasonality): 전자상거래에서 블랙프라이데이 기간의 구매 패턴은 평소와 확연히 다르다. 이를 드리프트로 탐지하면 매년 같은 시기에 불필요한 알림이 폭주한다. 레퍼런스 데이터를 동일 시기의 과거 데이터로 설정하거나, 계절 조정 로직을 적용해야 한다.
피처 간 상관관계: 개별 피처 단위의 드리프트 탐지만으로는 다변량 분포 변화를 포착하지 못한다. 피처 A와 B 각각의 분포는 유사하지만, A-B 사이의 상관관계가 바뀐 경우가 있다. Evidently의 DatasetDriftMetric은 전체 데이터셋 수준의 판단을 제공하지만, 명시적인 다변량 탐지가 필요하면 Alibi Detect의 MMD(Maximum Mean Discrepancy) 방법을 고려해야 한다.
레퍼런스 데이터 관리 전략
레퍼런스 데이터는 드리프트 탐지의 기준선이다. 잘못된 레퍼런스 데이터는 모든 탐지 결과를 무효화한다.
| 전략 | 설명 | 적합한 상황 | 주의점 |
|---|---|---|---|
| 학습 데이터 고정 | 모델 학습에 사용된 데이터를 레퍼런스로 고정 | 안정적 도메인, 변화가 적은 환경 | 시간이 지나면 레퍼런스 자체가 구식화 |
| 슬라이딩 윈도우 | 최근 N일/N주 데이터를 레퍼런스로 갱신 | 점진적 변화가 정상인 환경 | 점진적 드리프트를 놓칠 위험 |
| 재학습 시점 갱신 | 모델 재학습 시마다 레퍼런스 갱신 | 정기 재학습이 있는 파이프라인 | 재학습 주기에 종속 |
| 이중 기준선 | 학습 데이터 + 최근 안정 기간 데이터를 모두 비교 | 높은 정확도가 필요한 환경 | 관리 복잡도 증가 |
핵심: 레퍼런스 데이터를 버전 관리하고, 모델 버전과 1:1로 연결하여 추적 가능하게 유지해야 한다. MLflow 아티팩트로 레퍼런스 데이터 스냅샷을 저장하는 것을 권장한다.
피처 스토어와의 연동
오프라인 학습 시점과 온라인 서빙 시점의 피처 계산 로직이 다르면(Training-Serving Skew), 실제 드리프트가 아닌 구현 불일치로 인한 가짜 드리프트가 발생한다. Feast 같은 피처 스토어를 사용하여 학습/서빙 간 피처 일관성을 보장하는 것이 근본적 해결책이다.
10. 실패 사례와 복구 절차
사례 1: 조용한 모델 성능 저하 (Silent Model Degradation)
상황: 전자상거래 추천 모델이 3개월간 점진적으로 성능 저하. CTR이 12%에서 7%로 하락했지만, 드리프트 모니터링이 개별 피처 단위로만 설정되어 있어 탐지하지 못함.
원인: 사용자 행동 패턴의 다변량 변화. 개별 피처(조회 수, 체류 시간, 카테고리 비율) 각각의 분포는 크게 변하지 않았으나, 피처 간 상관관계가 변경됨. 특히 "체류 시간-구매 전환" 관계가 숏폼 콘텐츠 소비 패턴 변화로 약화됨.
복구 절차:
- 다변량 드리프트 탐지 추가 (피처 상관관계 매트릭스 비교)
- 비즈니스 KPI(CTR, 전환율)를 직접 모니터링하는 컨셉 드리프트 감시 레이어 추가
- 최근 2주 데이터로 모델 재학습 후 A/B 테스트 배포
- 재학습 주기를 월 1회에서 주 1회로 단축
교훈: 데이터 드리프트만으로는 컨셉 드리프트를 포착하기 어렵다. 비즈니스 메트릭 모니터링을 반드시 병행해야 한다.
사례 2: 데이터 파이프라인 장애로 인한 가짜 드리프트
상황: 금요일 밤 새벽에 드리프트 Critical 알림이 폭주. 3개 모델에서 동시에 80% 이상의 컬럼 드리프트 탐지.
원인: 업스트림 데이터 파이프라인의 ETL 작업이 실패하여, 서빙 피처 테이블의 일부 컬럼이 기본값(0 또는 null)으로 채워짐. 데이터 품질 이슈가 드리프트로 오탐된 사례.
복구 절차:
- Evidently TestSuite에
TestNumberOfMissingValues와TestShareOfOutRangeValues를 드리프트 체크 이전 단계에 배치 - 데이터 품질 실패 시 드리프트 체크를 스킵하고, 별도의 데이터 파이프라인 알림 발송
- 업스트림 ETL에 데이터 완전성 검증 게이트 추가
- 드리프트 알림에 "최근 데이터 품질 체크 결과" 정보를 포함
교훈: 드리프트 탐지 파이프라인 앞에 데이터 품질 검증 단계를 반드시 배치해야 한다. 데이터 품질 이슈와 실제 분포 변화를 구분하는 것이 운영의 핵심이다.
사례 3: 레퍼런스 데이터 오염
상황: 모델 재학습 후 새 레퍼런스 데이터로 갱신. 이후 드리프트가 전혀 탐지되지 않아 모니터링이 무용지물이 됨.
원인: 재학습에 사용된 데이터 자체에 이미 드리프트가 포함되어 있었고, 이 오염된 데이터가 새 레퍼런스가 됨. 결과적으로 드리프트가 "정상"으로 베이스라인이 재설정됨.
복구 절차:
- 레퍼런스 데이터 갱신 시 이전 레퍼런스와의 드리프트 비교 자동화
- 드리프트 비율이 일정 수준 이상이면 레퍼런스 갱신을 차단하는 게이트 추가
- 레퍼런스 데이터의 변경 이력을 MLflow 아티팩트로 버전 관리
- 골든 데이터셋(수동 검증된 고품질 데이터)과의 비교를 주기적으로 수행
교훈: 레퍼런스 데이터는 모니터링 시스템의 기준선이므로, 변경 시 반드시 검증 절차를 거쳐야 한다.
11. 프로덕션 모니터링 체크리스트
배포 전 체크리스트
- 레퍼런스 데이터가 모델 버전과 함께 버전 관리되고 있는가
- Evidently Report/TestSuite가 배포 파이프라인에 통합되어 있는가
- 드리프트 임계치가 도메인 특성에 맞게 조정되었는가
- 데이터 품질 검증이 드리프트 탐지 이전 단계에 배치되었는가
- 폴백 모델이 레지스트리에 등록되어 있는가
- Grafana 대시보드와 알림 규칙이 설정되어 있는가
운영 중 체크리스트
- 드리프트 체크가 정상 주기로 실행되고 있는가 (모니터의 모니터)
- 거짓 양성 비율이 관리 가능한 수준인가 (월 5건 이내 권장)
- 레퍼런스 데이터가 적절한 시점에 갱신되고 있는가
- 재학습 트리거가 정상 동작하고, 챔피언/챌린저 평가가 수행되는가
- 비즈니스 KPI와 모델 성능 지표가 함께 추적되고 있는가
- 알림 수신 후 평균 대응 시간(MTTR)이 SLA 내에 있는가
컨셉 드리프트 대응 체크리스트
- 라벨 획득 파이프라인이 구축되어 있는가 (지연 라벨 포함)
- 모델 성능 지표(Accuracy, F1, AUC)의 시계열 추이를 모니터링하고 있는가
- 라벨이 없는 구간에 대한 대리 지표(proxy metric)가 정의되어 있는가
- A/B 테스트 인프라가 준비되어 있는가
12. 참고자료
- Evidently AI - Data Drift 공식 가이드 - 데이터 드리프트의 개념, 탐지 방법론, 실제 사례를 포함한 종합 가이드.
- Evidently AI GitHub 저장소 - 오픈소스 코드, 예제 노트북, 커뮤니티 디스커션.
- MLflow Model Registry 공식 문서 - 모델 레지스트리 API, 별칭 시스템, 배포 워크플로우 가이드.
- Evidently AI 공식 문서 - Report, TestSuite, Metric 전체 API 레퍼런스와 튜토리얼.
- Advanced ML Model Monitoring: Drift Detection, Explainability, and Automated Retraining - 드리프트 탐지와 자동 재학습 파이프라인의 고급 패턴.
- Google - ML Technical Debt (Hidden Technical Debt in Machine Learning Systems) - ML 시스템의 기술 부채와 모니터링 필요성에 대한 원론적 논문.
- NannyML - Estimating Model Performance without Ground Truth - 라벨 없이 모델 성능을 추정하는 CBPE 방법론.
ML Model Monitoring and Drift Detection: Evidently AI + MLflow Production Operations Guide
- 1. Introduction: Production Models Silently Degrade
- 2. Types of Drift: What Changes
- 3. Evidently AI Architecture and Core Features
- 4. Evidently AI Practical Usage
- 5. MLflow Model Registry and Monitoring Integration
- 6. Building an Automatic Retraining Pipeline
- 7. Monitoring Tool Comparison: Evidently vs NannyML vs WhyLabs vs Alibi Detect
- 8. Grafana/Prometheus Dashboard Configuration
- 9. Operational Considerations
- 10. Failure Cases and Recovery Procedures
- 11. Production Monitoring Checklist
- 12. References
- Quiz

1. Introduction: Production Models Silently Degrade
An ML model's accuracy peaks at the moment of deployment. After that, prediction quality gradually declines as the real world changes. The problem is that this degradation progresses without explicit errors. No HTTP 500s are thrown, no CRITICAL logs appear, and the service responds normally. It's just that recommendations become increasingly irrelevant, fraud detection misses new patterns, and demand forecasts start diverging from reality.
According to Google's research, over 60% of failures in production ML systems originate from data-related issues, not model code. The model itself doesn't break -- rather, the gap between the world the model learned and the real world keeps growing.
This article covers how to combine the open-source monitoring tool Evidently AI with the experiment/model management platform MLflow to continuously monitor the health of ML models in production, detect drift, and trigger automatic retraining pipelines.
2. Types of Drift: What Changes
Drift refers to the discrepancy between the data distribution the model was trained on and the data distribution at the time of serving. Drift is broadly classified into three categories based on where and how it occurs.
Data Drift (Covariate Shift)
This is the phenomenon where the distribution of input features changes. The model's input space P(X) shifts over time. For example, in an e-commerce recommendation model, the age distribution of users changes, or the proportion of purchase categories shifts with seasons. The relationship P(Y|X) between the target variable Y and features X remains the same, but the statistical characteristics of the inputs themselves change.
Concept Drift
This is the phenomenon where the relationship between features and the target itself changes. P(Y|X) changes. This is a more serious problem than data drift because the correct answer itself changes for the same input. Representative examples include demand forecasting models becoming completely invalidated during the COVID-19 pandemic, and financial fraud detection where fraudsters' methods evolve, making existing patterns no longer valid.
Prediction Drift
This is the phenomenon where the distribution of model output P(Y_pred) changes. It can appear as a result of input drift or occur independently due to internal model issues. It includes cases where the prediction ratio for a specific class in a classification model suddenly skews, or the mean or variance of predicted values in a regression model changes significantly.
| Drift Type | What Changes | Detection Difficulty | Representative Detection Methods | Retraining Urgency |
|---|---|---|---|---|
| Data Drift | P(X) input distribution | Medium | PSI, KS test, Wasserstein | Medium |
| Concept Drift | P(Y|X) relationship | High | Performance metric monitoring, ADWIN | High |
| Prediction Drift | P(Y_pred) output | Low | Output distribution statistics, Chi-squared | Situational |
| Label Drift | P(Y) target distribution | Medium | Label distribution comparison | High |
3. Evidently AI Architecture and Core Features
Evidently AI is an open-source library for ML model monitoring and data quality validation. It operates in a Python-native environment and has over 20 built-in statistical drift detection methods.
Core Components
- Report: One-time data analysis report. Can be output as HTML, JSON, or Python dictionary format. Suitable for exploratory analysis and debugging.
- Test Suite: Automated validation against predefined conditions. Integrated into CI/CD pipelines as data quality gates.
- Metric: Individual measurement items. Dozens of metrics such as DataDriftTable, DatasetSummaryMetric, and ColumnCorrelationsMetric are provided out of the box.
- Collector/Workspace: Evidently server mode. Stores monitoring results as time series and queries them on dashboards.
Key Drift Detection Algorithms
Evidently automatically selects the optimal detection algorithm based on feature type (numerical/categorical) and dataset size.
| Algorithm | Target Type | Principle | Pros | Limitations |
|---|---|---|---|---|
| Kolmogorov-Smirnov (KS) | Numerical, small | Maximum difference in cumulative distribution | No distribution assumptions | Oversensitive with large data |
| Population Stability Index (PSI) | Numerical/Categorical | Weighted sum of log ratios of two distributions | Industry standard, easy to interpret | Sensitive to bin settings |
| Wasserstein Distance | Numerical | Minimum transport cost between two distributions | Reflects distribution shape differences | High computational cost |
| Jensen-Shannon Divergence | Numerical/Categorical | Symmetric version of KL Divergence | Always finite, symmetric | Insensitive to tail changes |
| Chi-squared Test | Categorical | Difference between observed/expected frequencies | Intuitive for categorical | Unstable with low-frequency categories |
| Z-test (Proportion test) | Categorical, large | Standardization of proportion differences | Efficient for large data | Assumes normal approximation |
4. Evidently AI Practical Usage
Installation and Basic Setup
# Evidently AI installation (including MLflow integration)
# pip install evidently mlflow scikit-learn pandas
import pandas as pd
import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
from evidently.metrics import (
DatasetDriftMetric,
DataDriftTable,
ColumnDriftMetric,
)
# Prepare reference / current data
data = load_iris(as_frame=True)
df = data.frame
df.columns = ["sepal_length", "sepal_width", "petal_length", "petal_width", "target"]
reference_data = df.sample(frac=0.5, random_state=42)
current_data = df.drop(reference_data.index)
# Create simulated data with data drift
current_drifted = current_data.copy()
current_drifted["sepal_length"] = current_drifted["sepal_length"] + np.random.normal(2.0, 0.5, len(current_drifted))
current_drifted["petal_width"] = current_drifted["petal_width"] * 1.8
# Generate drift report
drift_report = Report(metrics=[
DatasetDriftMetric(),
DataDriftTable(),
])
drift_report.run(
reference_data=reference_data,
current_data=current_drifted,
)
# Extract results as dictionary (for programmatic use)
result = drift_report.as_dict()
dataset_drift = result["metrics"][0]["result"]["dataset_drift"]
drift_share = result["metrics"][0]["result"]["share_of_drifted_columns"]
print(f"Dataset drift detected: {dataset_drift}")
print(f"Drifted column ratio: {drift_share:.2%}")
# Save as HTML report
drift_report.save_html("drift_report.html")
Automated Data Quality Validation with Test Suite
from evidently.test_suite import TestSuite
from evidently.test_preset import DataDriftTestPreset, DataQualityTestPreset
from evidently.tests import (
TestColumnDrift,
TestShareOfDriftedColumns,
TestNumberOfMissingValues,
TestShareOfOutRangeValues,
TestMeanInNSigmas,
)
# Configure data drift + quality test suite
monitoring_suite = TestSuite(tests=[
# Drift test: fail if 30% or more of columns drift
TestShareOfDriftedColumns(lt=0.3),
# Individual key feature drift validation
TestColumnDrift(column_name="sepal_length"),
TestColumnDrift(column_name="petal_width"),
# Data quality tests
TestNumberOfMissingValues(eq=0),
# Value range validation: sepal_length within +/- 3 sigma of reference data
TestMeanInNSigmas(column_name="sepal_length", n=3),
])
monitoring_suite.run(
reference_data=reference_data,
current_data=current_drifted,
)
# Programmatically check test results
suite_result = monitoring_suite.as_dict()
all_passed = all(
test["status"] == "SUCCESS"
for test in suite_result["tests"]
)
print(f"All tests passed: {all_passed}")
for test in suite_result["tests"]:
status_icon = "PASS" if test["status"] == "SUCCESS" else "FAIL"
print(f" [{status_icon}] {test['name']}: {test['status']}")
# Use as exit code in CI/CD pipelines
if not all_passed:
print("ALERT: Data drift or quality anomaly detected. Retraining pipeline trigger required.")
# sys.exit(1) # Fail the build in CI
5. MLflow Model Registry and Monitoring Integration
MLflow provides experiment tracking, model packaging, and model registry functionality. By recording Evidently's drift detection results in MLflow, you can track performance history and drift status per model version on a single platform.
Logging Drift Metrics to MLflow
import mlflow
from evidently.report import Report
from evidently.metrics import (
DatasetDriftMetric,
DataDriftTable,
ColumnDriftMetric,
)
import json
from datetime import datetime
# MLflow tracking server configuration
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("model-monitoring/fraud-detection-v2")
def log_drift_to_mlflow(
reference_data,
current_data,
model_name: str,
model_version: str,
batch_id: str,
):
"""Log drift analysis results to MLflow"""
# Generate Evidently drift report
drift_report = Report(metrics=[
DatasetDriftMetric(),
DataDriftTable(),
])
drift_report.run(
reference_data=reference_data,
current_data=current_data,
)
result = drift_report.as_dict()
drift_result = result["metrics"][0]["result"]
# Record as MLflow Run
with mlflow.start_run(run_name=f"drift-check-{batch_id}") as run:
# Basic drift metrics
mlflow.log_metric("dataset_drift_detected", int(drift_result["dataset_drift"]))
mlflow.log_metric("drifted_columns_share", drift_result["share_of_drifted_columns"])
mlflow.log_metric("number_of_drifted_columns", drift_result["number_of_drifted_columns"])
mlflow.log_metric("total_columns", drift_result["number_of_columns"])
# Log individual column drift scores
column_drift = result["metrics"][1]["result"]["drift_by_columns"]
for col_name, col_info in column_drift.items():
safe_col_name = col_name.replace(" ", "_").replace("/", "_")
mlflow.log_metric(
f"drift_score_{safe_col_name}",
col_info.get("drift_score", 0.0),
)
mlflow.log_metric(
f"drift_detected_{safe_col_name}",
int(col_info.get("column_drift", False)),
)
# Record metadata as tags
mlflow.set_tags({
"monitoring.type": "drift_detection",
"monitoring.model_name": model_name,
"monitoring.model_version": model_version,
"monitoring.batch_id": batch_id,
"monitoring.timestamp": datetime.utcnow().isoformat(),
"monitoring.reference_size": str(len(reference_data)),
"monitoring.current_size": str(len(current_data)),
})
# Save HTML report as artifact
report_path = f"/tmp/drift_report_{batch_id}.html"
drift_report.save_html(report_path)
mlflow.log_artifact(report_path, artifact_path="drift_reports")
# Save JSON results as artifact
json_path = f"/tmp/drift_result_{batch_id}.json"
with open(json_path, "w") as f:
json.dump(result, f, indent=2, default=str)
mlflow.log_artifact(json_path, artifact_path="drift_reports")
print(f"Drift results logged to MLflow. Run ID: {run.info.run_id}")
return drift_result["dataset_drift"], drift_result["share_of_drifted_columns"]
# Usage example
is_drifted, drift_share = log_drift_to_mlflow(
reference_data=reference_data,
current_data=current_drifted,
model_name="fraud-detector",
model_version="3",
batch_id="2026-03-06-batch-001",
)
Alias-Based Model Registry Management
Starting with MLflow 2.x, alias-based model management is recommended over the traditional Stage system (Staging/Production/Archived). You can apply a strategy that automatically switches model aliases based on drift detection results.
from mlflow import MlflowClient
client = MlflowClient(tracking_uri="http://mlflow.internal:5000")
MODEL_NAME = "fraud-detector"
def handle_drift_detection(
is_drifted: bool,
drift_share: float,
model_name: str = MODEL_NAME,
drift_threshold_warn: float = 0.2,
drift_threshold_critical: float = 0.5,
):
"""Perform model registry actions based on drift detection results"""
# Check current production model version
try:
prod_version = client.get_model_version_by_alias(model_name, "production")
current_version = prod_version.version
print(f"Current production model version: {current_version}")
except Exception as e:
print(f"Failed to retrieve production model alias: {e}")
return
if not is_drifted:
print("No drift detected. Maintaining current model.")
client.set_model_version_tag(
model_name, current_version,
key="last_drift_check",
value="passed",
)
return
if drift_share >= drift_threshold_critical:
# Critical drift: immediately switch to fallback model + trigger retraining
print(f"CRITICAL: Drift ratio {drift_share:.1%} - Switching to fallback model and triggering retraining")
client.set_model_version_tag(
model_name, current_version,
key="drift_status", value="critical",
)
# Switch to fallback model if available
try:
fallback = client.get_model_version_by_alias(model_name, "fallback")
client.set_registered_model_alias(model_name, "production", fallback.version)
print(f"Switched to fallback model version {fallback.version}")
except Exception:
print("WARNING: No fallback model available. Maintaining current model while emergency retraining is needed.")
# Trigger retraining (external system call)
trigger_retraining(model_name, reason="critical_drift")
elif drift_share >= drift_threshold_warn:
# Warning level drift: record tag + notification
print(f"WARNING: Drift ratio {drift_share:.1%} - Enhanced monitoring and scheduling retraining")
client.set_model_version_tag(
model_name, current_version,
key="drift_status", value="warning",
)
# Add to scheduled retraining queue
schedule_retraining(model_name, priority="normal")
def trigger_retraining(model_name: str, reason: str):
"""Trigger emergency retraining (call Airflow DAG, Kubeflow Pipeline, etc.)"""
print(f"Retraining triggered: model={model_name}, reason={reason}")
# requests.post("http://airflow.internal/api/v1/dags/retrain/dagRuns", ...)
def schedule_retraining(model_name: str, priority: str):
"""Register in scheduled retraining queue"""
print(f"Retraining scheduled: model={model_name}, priority={priority}")
# Execute
handle_drift_detection(
is_drifted=True,
drift_share=0.55,
model_name=MODEL_NAME,
)
6. Building an Automatic Retraining Pipeline
The automated pipeline from drift detection to retraining consists of the following stages.
Overall Pipeline Flow
- Scheduler: Trigger drift check after batch inference or at regular intervals (daily/weekly)
- Drift Analyzer: Analyze current data against reference data with Evidently
- Decision Engine: Determine whether retraining is needed based on drift thresholds
- Retraining Orchestrator: Execute training jobs in Airflow/Kubeflow
- Champion/Challenger Evaluation: Compare and evaluate the new model against the existing model
- Deployment Gate: Auto-deploy if performance criteria are met, rollback on failure
Airflow DAG Integration Pattern
# Airflow DAG example: drift check + conditional retraining
# dag_drift_monitor.py
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import pandas as pd
default_args = {
"owner": "ml-platform",
"retries": 2,
"retry_delay": timedelta(minutes=5),
"execution_timeout": timedelta(minutes=30),
}
dag = DAG(
dag_id="ml_drift_monitor_fraud_detection",
default_args=default_args,
description="Daily drift monitoring and conditional retraining",
schedule_interval="0 6 * * *", # Daily at 6 AM
start_date=days_ago(1),
catchup=False,
tags=["ml-monitoring", "drift-detection"],
)
def fetch_data(**context):
"""Load reference data and last 24 hours of serving data"""
from sqlalchemy import create_engine
engine = create_engine("postgresql://reader:password@db.internal/features")
reference = pd.read_sql(
"SELECT * FROM fraud_features_reference", engine
)
current = pd.read_sql(
"""SELECT * FROM fraud_features_serving
WHERE created_at >= NOW() - INTERVAL '24 hours'""",
engine,
)
# Pass paths via XCom (store large data in S3)
ref_path = "/tmp/reference_data.parquet"
cur_path = "/tmp/current_data.parquet"
reference.to_parquet(ref_path)
current.to_parquet(cur_path)
context["ti"].xcom_push(key="reference_path", value=ref_path)
context["ti"].xcom_push(key="current_path", value=cur_path)
context["ti"].xcom_push(key="current_size", value=len(current))
def run_drift_check(**context):
"""Run Evidently drift analysis and log to MLflow"""
from evidently.report import Report
from evidently.metrics import DatasetDriftMetric, DataDriftTable
import mlflow
ti = context["ti"]
ref_path = ti.xcom_pull(key="reference_path")
cur_path = ti.xcom_pull(key="current_path")
reference = pd.read_parquet(ref_path)
current = pd.read_parquet(cur_path)
# Validate minimum sample count
if len(current) < 100:
print(f"Insufficient current data samples: {len(current)}. Skipping drift check.")
ti.xcom_push(key="drift_action", value="skip")
return "skip_retraining"
report = Report(metrics=[DatasetDriftMetric(), DataDriftTable()])
report.run(reference_data=reference, current_data=current)
result = report.as_dict()
drift_detected = result["metrics"][0]["result"]["dataset_drift"]
drift_share = result["metrics"][0]["result"]["share_of_drifted_columns"]
# Log to MLflow
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("monitoring/fraud-detection")
with mlflow.start_run(run_name=f"drift-{context['ds']}"):
mlflow.log_metric("drift_detected", int(drift_detected))
mlflow.log_metric("drift_share", drift_share)
ti.xcom_push(key="drift_detected", value=drift_detected)
ti.xcom_push(key="drift_share", value=drift_share)
def decide_action(**context):
"""Decide whether to retrain based on drift level"""
ti = context["ti"]
drift_detected = ti.xcom_pull(key="drift_detected")
drift_share = ti.xcom_pull(key="drift_share")
if drift_share is None or drift_share < 0.2:
return "skip_retraining"
elif drift_share >= 0.5:
return "trigger_emergency_retrain"
else:
return "trigger_scheduled_retrain"
fetch_task = PythonOperator(
task_id="fetch_data", python_callable=fetch_data, dag=dag,
)
drift_task = PythonOperator(
task_id="run_drift_check", python_callable=run_drift_check, dag=dag,
)
branch_task = BranchPythonOperator(
task_id="decide_action", python_callable=decide_action, dag=dag,
)
skip_task = EmptyOperator(task_id="skip_retraining", dag=dag)
scheduled_retrain = EmptyOperator(task_id="trigger_scheduled_retrain", dag=dag)
emergency_retrain = EmptyOperator(task_id="trigger_emergency_retrain", dag=dag)
fetch_task >> drift_task >> branch_task >> [skip_task, scheduled_retrain, emergency_retrain]
Retraining Trigger Threshold Guidelines
| Drift Level | drift_share Range | Recommended Action | Response Time |
|---|---|---|---|
| Normal | 0% ~ 15% | Maintain monitoring | - |
| Caution | 15% ~ 30% | Send alert, begin root cause analysis | Within 48 hrs |
| Warning | 30% ~ 50% | Register in scheduled retraining queue | Within 24 hrs |
| Critical | 50% or above | Immediate retraining + fallback model switch | Immediately |
Note: Thresholds should be adjusted based on domain and model characteristics. Domains with high missed detection costs like financial fraud detection should use lower thresholds (10-20%), while domains with wider tolerance like recommendation systems should apply higher thresholds (30-50%).
7. Monitoring Tool Comparison: Evidently vs NannyML vs WhyLabs vs Alibi Detect
There are several options for production ML monitoring tools. Let's compare the strengths and weaknesses of each.
| Criteria | Evidently AI | NannyML | WhyLabs | Alibi Detect |
|---|---|---|---|---|
| License | Apache 2.0 (OSS) | BSD-3 (OSS) | SaaS + Free tier | BSD-3 (OSS) |
| Core Strength | General data/model monitoring | Label-free performance estimation (CBPE) | Real-time streaming profiling | Advanced drift detection algorithms |
| Number of Drift Methods | 20+ | 10+ | 15+ | 15+ |
| Label-free Performance Estimation | Limited | Core feature (CBPE, DLE) | Not supported | Not supported |
| Real-time Monitoring | Collector mode | Not supported (batch) | Native support | Not supported (batch) |
| Visualization | Built-in HTML/dashboard | Built-in HTML | Web dashboard (SaaS) | Basic visualization |
| CI/CD Integration | Test Suite (native) | Limited | API-based | Manual configuration required |
| Prometheus Integration | Officially supported | Custom required | Built-in | Custom required |
| MLflow Integration | Easy (Python native) | Manual configuration | API integration | Manual configuration |
| Learning Curve | Low | Medium | Low (SaaS) | High |
| Production Use Cases | General purpose | Label-delayed environments | Large-scale real-time | Research/advanced detection |
Selection Guide:
- Environments where labels cannot be obtained immediately (e.g., financial fraud detection where label confirmation takes months): NannyML's CBPE (Confidence-Based Performance Estimation) is the only option.
- Open-source first + rapid adoption: Evidently AI provides the widest feature range with the lowest adoption barrier.
- Large-scale real-time streaming: WhyLabs' data profiling is optimized for processing tens of thousands of records per second.
- Research environments needing advanced statistical detection: Alibi Detect's deep kernel MMD and Learned Kernel drift detection are well-suited.
8. Grafana/Prometheus Dashboard Configuration
Let's look at how to expose Evidently's monitoring results as Prometheus metrics and visualize them as time series on Grafana dashboards.
Prometheus Metrics Export
# prometheus_drift_exporter.py
from prometheus_client import start_http_server, Gauge, Counter, Histogram
from evidently.report import Report
from evidently.metrics import DatasetDriftMetric, DataDriftTable
import pandas as pd
import time
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Prometheus metric definitions
DRIFT_DETECTED = Gauge(
"ml_model_drift_detected",
"Dataset drift detection status (0/1)",
["model_name", "model_version"],
)
DRIFT_SHARE = Gauge(
"ml_model_drift_column_share",
"Share of drifted columns",
["model_name", "model_version"],
)
COLUMN_DRIFT_SCORE = Gauge(
"ml_model_column_drift_score",
"Individual column drift score",
["model_name", "model_version", "column_name"],
)
DRIFT_CHECK_TOTAL = Counter(
"ml_model_drift_checks_total",
"Total drift check executions",
["model_name"],
)
DRIFT_CHECK_DURATION = Histogram(
"ml_model_drift_check_duration_seconds",
"Drift check execution duration",
["model_name"],
buckets=[0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0],
)
MODEL_NAME = "fraud-detector"
MODEL_VERSION = "3"
def run_periodic_drift_check(
reference_path: str,
current_query_fn,
interval_seconds: int = 300,
):
"""Periodic drift check and Prometheus metric update"""
reference = pd.read_parquet(reference_path)
while True:
try:
start_time = time.time()
# Load recent data
current = current_query_fn()
if current is None or len(current) < 50:
logger.warning(f"Insufficient current data: {len(current) if current is not None else 0} records")
time.sleep(interval_seconds)
continue
# Filter to feature columns only (exclude target and metadata columns)
feature_cols = [c for c in reference.columns if c not in ["target", "id", "timestamp"]]
ref_features = reference[feature_cols]
cur_features = current[feature_cols]
# Drift analysis
report = Report(metrics=[DatasetDriftMetric(), DataDriftTable()])
report.run(reference_data=ref_features, current_data=cur_features)
result = report.as_dict()
drift_result = result["metrics"][0]["result"]
column_results = result["metrics"][1]["result"]["drift_by_columns"]
# Update Prometheus metrics
DRIFT_DETECTED.labels(MODEL_NAME, MODEL_VERSION).set(
int(drift_result["dataset_drift"])
)
DRIFT_SHARE.labels(MODEL_NAME, MODEL_VERSION).set(
drift_result["share_of_drifted_columns"]
)
for col_name, col_info in column_results.items():
COLUMN_DRIFT_SCORE.labels(MODEL_NAME, MODEL_VERSION, col_name).set(
col_info.get("drift_score", 0.0)
)
DRIFT_CHECK_TOTAL.labels(MODEL_NAME).inc()
duration = time.time() - start_time
DRIFT_CHECK_DURATION.labels(MODEL_NAME).observe(duration)
logger.info(
f"Drift check complete: drift={drift_result['dataset_drift']}, "
f"share={drift_result['share_of_drifted_columns']:.2%}, "
f"duration={duration:.1f}s"
)
except Exception as e:
logger.error(f"Drift check failed: {e}", exc_info=True)
time.sleep(interval_seconds)
if __name__ == "__main__":
# Start Prometheus metrics HTTP server (port 8000)
start_http_server(8000)
logger.info("Prometheus metrics exporter started (port 8000)")
# Start periodic drift check (5-minute intervals)
run_periodic_drift_check(
reference_path="/data/reference/fraud_features_v3.parquet",
current_query_fn=lambda: pd.read_parquet("/data/serving/latest_batch.parquet"),
interval_seconds=300,
)
Grafana Dashboard Components
Configure the following panels in Grafana to comprehensively monitor ML model health status.
| Panel | Metric | Visualization Type | Alert Rule |
|---|---|---|---|
| Drift Status | ml_model_drift_detected | Stat (latest) | Critical alert when value is 1 |
| Drifted Column Ratio Trend | ml_model_drift_column_share | Time Series | Warning when exceeding 30% |
| Per-Column Drift Score | ml_model_column_drift_score | Heatmap | Highlight columns exceeding threshold |
| Check Duration | ml_model_drift_check_duration_seconds | Histogram | Warning when exceeding 60s |
| Check Execution Count | rate(ml_model_drift_checks_total[1h]) | Time Series | Alert when 0 (check stalled) |
Alertmanager Alert Rules Example
# prometheus-alerts.yaml
groups:
- name: ml_model_drift_alerts
rules:
- alert: MLModelDriftDetected
expr: ml_model_drift_detected == 1
for: 5m
labels:
severity: warning
team: ml-platform
annotations:
summary: 'ML model data drift detected'
description: 'Data drift detected in model {{ $labels.model_name }} v{{ $labels.model_version }}. Drifted column ratio: {{ $value }}'
- alert: MLModelCriticalDrift
expr: ml_model_drift_column_share > 0.5
for: 0m
labels:
severity: critical
team: ml-platform
annotations:
summary: 'ML model critical drift - Immediate action required'
description: 'Model {{ $labels.model_name }} drift column ratio is {{ $value | humanizePercentage }}. Immediate retraining or fallback switch is required.'
- alert: MLDriftCheckStalled
expr: rate(ml_model_drift_checks_total[1h]) == 0
for: 30m
labels:
severity: warning
team: ml-platform
annotations:
summary: 'ML drift check stalled'
description: 'Drift check for model {{ $labels.model_name }} has not run for over 30 minutes. Monitoring pipeline inspection is needed.'
9. Operational Considerations
False Positive Drift Management
The most common pitfall of statistical drift detection is false positives. Drift may be falsely detected in the following situations even when there is no actual problem.
Sample Size Effect: When the current data sample size is very large, KS tests and Chi-squared tests detect statistically significant but practically meaningless differences as drift. Complement with effect-size-based metrics such as PSI or Wasserstein distance to verify practical significance.
Seasonality: Purchase patterns during Black Friday in e-commerce are distinctly different from normal periods. If detected as drift, unnecessary alerts flood in at the same time every year. Set reference data to historical data from the same period, or apply seasonal adjustment logic.
Inter-feature Correlations: Drift detection on individual features alone cannot capture multivariate distribution changes. There are cases where features A and B each have similar distributions, but the correlation between A and B has changed. Evidently's DatasetDriftMetric provides dataset-level judgment, but if explicit multivariate detection is needed, consider Alibi Detect's MMD (Maximum Mean Discrepancy) method.
Reference Data Management Strategies
Reference data is the baseline for drift detection. Incorrect reference data invalidates all detection results.
| Strategy | Description | Suitable For | Caution |
|---|---|---|---|
| Fixed Training Data | Fix data used for model training as reference | Stable domains with little change | Reference itself becomes outdated over time |
| Sliding Window | Update reference with data from recent N days/weeks | Environments where gradual change is normal | Risk of missing gradual drift |
| Update at Retraining | Update reference each time model is retrained | Pipelines with regular retraining | Dependent on retraining cycle |
| Dual Baseline | Compare against both training data and recent stable period data | Environments requiring high accuracy | Increased management complexity |
Key Point: Reference data should be version-controlled and tracked with a 1:1 mapping to model versions. Storing reference data snapshots as MLflow artifacts is recommended.
Feature Store Integration
If feature computation logic differs between offline training and online serving (Training-Serving Skew), fake drift caused by implementation inconsistency rather than actual drift will occur. Using a feature store like Feast to ensure feature consistency between training and serving is the fundamental solution.
10. Failure Cases and Recovery Procedures
Case 1: Silent Model Degradation
Situation: An e-commerce recommendation model gradually degraded over 3 months. CTR dropped from 12% to 7%, but drift monitoring was configured only at the individual feature level and failed to detect it.
Root Cause: Multivariate change in user behavior patterns. Individual feature distributions (views, dwell time, category ratio) didn't change significantly, but the correlations between features changed. Specifically, the "dwell time - purchase conversion" relationship weakened due to changes in short-form content consumption patterns.
Recovery Procedure:
- Added multivariate drift detection (feature correlation matrix comparison)
- Added a concept drift monitoring layer that directly monitors business KPIs (CTR, conversion rate)
- Retrained model with last 2 weeks of data and deployed via A/B testing
- Shortened retraining cycle from monthly to weekly
Lesson: Data drift alone is insufficient to capture concept drift. Business metric monitoring must always be conducted in parallel.
Case 2: Fake Drift from Data Pipeline Failure
Situation: Late Friday night, a flood of Critical drift alerts. Over 80% column drift detected across 3 models simultaneously.
Root Cause: An upstream data pipeline ETL job failed, causing some columns in the serving feature table to be filled with default values (0 or null). A data quality issue was falsely detected as drift.
Recovery Procedure:
- Placed
TestNumberOfMissingValuesandTestShareOfOutRangeValuesin the Evidently TestSuite before the drift check stage - Skip drift check on data quality failure and send a separate data pipeline alert
- Added data completeness validation gate to upstream ETL
- Included "recent data quality check results" information in drift alerts
Lesson: A data quality validation step must always be placed before the drift detection pipeline. Distinguishing between data quality issues and actual distribution changes is the key to operations.
Case 3: Reference Data Contamination
Situation: After model retraining, reference data was updated with the new dataset. Afterwards, no drift was detected at all, rendering the monitoring system useless.
Root Cause: The data used for retraining already contained drift, and this contaminated data became the new reference. As a result, the drift was "normalized" and the baseline was reset.
Recovery Procedure:
- Automated drift comparison between new and previous reference data during reference update
- Added a gate to block reference updates when drift ratio exceeds a certain level
- Version-controlled reference data change history as MLflow artifacts
- Periodically compared against a golden dataset (manually verified high-quality data)
Lesson: Reference data is the baseline of the monitoring system, so any changes must go through a validation process.
11. Production Monitoring Checklist
Pre-deployment Checklist
- Is reference data version-controlled alongside model versions
- Are Evidently Report/TestSuite integrated into the deployment pipeline
- Are drift thresholds adjusted for domain characteristics
- Is data quality validation placed before the drift detection stage
- Is a fallback model registered in the registry
- Are Grafana dashboards and alert rules configured
Operational Checklist
- Are drift checks running on a normal schedule (monitoring the monitor)
- Is the false positive rate at a manageable level (recommended under 5 per month)
- Is reference data being updated at appropriate intervals
- Are retraining triggers working properly, and is champion/challenger evaluation being performed
- Are business KPIs and model performance metrics being tracked together
- Is the mean time to respond (MTTR) after receiving alerts within SLA
Concept Drift Response Checklist
- Is a label acquisition pipeline built (including delayed labels)
- Are model performance metrics (Accuracy, F1, AUC) time-series trends being monitored
- Are proxy metrics defined for periods without labels
- Is A/B testing infrastructure ready
12. References
- Evidently AI - Data Drift Official Guide - A comprehensive guide covering data drift concepts, detection methodologies, and real-world cases.
- Evidently AI GitHub Repository - Open-source code, example notebooks, and community discussions.
- MLflow Model Registry Official Documentation - Model registry API, alias system, and deployment workflow guide.
- Evidently AI Official Documentation - Complete API reference and tutorials for Report, TestSuite, and Metric.
- Advanced ML Model Monitoring: Drift Detection, Explainability, and Automated Retraining - Advanced patterns for drift detection and automatic retraining pipelines.
- Google - ML Technical Debt (Hidden Technical Debt in Machine Learning Systems) - Foundational paper on technical debt and monitoring needs in ML systems.
- NannyML - Estimating Model Performance without Ground Truth - CBPE methodology for estimating model performance without labels.
Quiz
Q1: What is the main topic covered in "ML Model Monitoring and Drift Detection: Evidently AI +
MLflow Production Operations Guide"?
A comprehensive guide covering production monitoring pipeline construction with Evidently AI and MLflow, data/concept drift detection, automatic retraining triggers, and operational troubleshooting.
Q2: What is Types of Drift: What Changes?
Drift refers to the discrepancy between the data distribution the model was trained on and the
data distribution at the time of serving. Drift is broadly classified into three categories based
on where and how it occurs.
Q3: Describe the Evidently AI Architecture and Core Features.
Evidently AI is an open-source library for ML model monitoring and data quality validation. It
operates in a Python-native environment and has over 20 built-in statistical drift detection
methods. Core Components Report: One-time data analysis report.
Q4: What are the key aspects of Evidently AI Practical Usage?
Installation and Basic Setup Automated Data Quality Validation with Test Suite
Q5: How does MLflow Model Registry and Monitoring Integration work?
MLflow provides experiment tracking, model packaging, and model registry functionality. By
recording Evidently's drift detection results in MLflow, you can track performance history and
drift status per model version on a single platform.