- 세 도구가 하나의 스택이 되어야 하는 이유
- 각 도구의 역할 경계와 통합 지점
- Kubeflow Pipeline에서 MLflow 실험 추적하기
- MLflow에서 KServe로: 모델 배포 자동화
- Canary 배포와 자동 롤백
- 세 도구의 버전 호환성 매트릭스
- 장애 시나리오별 대응
- 통합 운영 대시보드 구성
- 참고 자료

세 도구가 하나의 스택이 되어야 하는 이유
ML 프로젝트가 노트북에서 프로덕션으로 넘어가는 순간, 팀은 세 가지 독립적인 문제와 마주한다.
- 파이프라인 오케스트레이션: 데이터 전처리, 학습, 평가를 재현 가능한 워크플로로 실행해야 한다 -- Kubeflow Pipelines.
- 실험/모델 추적: 하이퍼파라미터, 메트릭, artifact를 중앙에서 관리하고 모델 레지스트리로 승격해야 한다 -- MLflow.
- 모델 서빙: 학습된 모델을 auto-scaling, canary, A/B 테스트가 가능한 inference endpoint로 배포해야 한다 -- KServe.
이 세 도구를 각각 운영하면 "학습 artifact 경로가 서빙 매니페스트와 안 맞는다", "파이프라인에서 등록한 모델 버전이 서빙에서 참조하는 버전과 다르다" 같은 접합부 장애가 반복된다. 이 글에서는 Kubeflow Pipelines v2 (2.3+), MLflow 2.17+, KServe 0.14+를 기준으로 세 도구의 통합 지점을 설계한다.
각 도구의 역할 경계와 통합 지점
| 구간 | 담당 도구 | 입력 | 출력 | 통합 인터페이스 |
|---|---|---|---|---|
| 데이터 검증 + 전처리 | Kubeflow Pipeline | Raw data (GCS/S3) | 전처리된 데이터셋 | Pipeline parameter로 데이터 경로 전달 |
| 학습 + 하이퍼파라미터 탐색 | Kubeflow Pipeline + MLflow | 전처리 데이터 | MLflow Run (메트릭, artifact) | MLflow tracking URI를 Pipeline 환경변수로 주입 |
| 모델 등록 + 승격 | MLflow Model Registry | 학습 완료 artifact | Registered Model Version | MLflow의 model URI를 KServe storageUri에 매핑 |
| 모델 서빙 + 트래픽 관리 | KServe | Model URI (GCS/S3) | Inference endpoint | KServe InferenceService가 MLflow model URI 참조 |
| 모니터링 + 롤백 | KServe + Prometheus | Inference metrics | 알림 / 자동 롤백 | Prometheus 메트릭 기반 canary 판정 |
Kubeflow Pipeline에서 MLflow 실험 추적하기
Kubeflow Pipeline의 각 component에서 MLflow로 메트릭과 artifact를 기록하는 것이 통합의 첫 단계다.
Pipeline Component 정의
"""
Kubeflow Pipelines v2 component로 학습을 수행하면서
MLflow에 실험 결과를 기록하는 예시.
"""
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics
@dsl.component(
base_image="python:3.11-slim",
packages_to_install=[
"mlflow==2.17.2",
"scikit-learn==1.5.2",
"pandas==2.2.3",
"boto3==1.35.0",
],
)
def train_model(
training_data: Input[Dataset],
model_output: Output[Model],
metrics_output: Output[Metrics],
mlflow_tracking_uri: str,
mlflow_experiment_name: str,
n_estimators: int = 200,
max_depth: int = 10,
learning_rate: float = 0.1,
):
import mlflow
import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import cross_val_score
import json
import os
# MLflow 연결
mlflow.set_tracking_uri(mlflow_tracking_uri)
mlflow.set_experiment(mlflow_experiment_name)
# 데이터 로드
df = pd.read_parquet(training_data.path)
X = df.drop(columns=["label"])
y = df["label"]
with mlflow.start_run() as run:
# 하이퍼파라미터 기록
mlflow.log_params({
"n_estimators": n_estimators,
"max_depth": max_depth,
"learning_rate": learning_rate,
"feature_count": X.shape[1],
"training_rows": X.shape[0],
})
# 학습
clf = GradientBoostingClassifier(
n_estimators=n_estimators,
max_depth=max_depth,
learning_rate=learning_rate,
random_state=42,
)
cv_scores = cross_val_score(clf, X, y, cv=5, scoring="f1_weighted")
clf.fit(X, y)
# 메트릭 기록
mlflow.log_metrics({
"cv_f1_mean": float(cv_scores.mean()),
"cv_f1_std": float(cv_scores.std()),
})
# 모델 저장 (MLflow Model Registry 형식)
mlflow.sklearn.log_model(
clf,
artifact_path="model",
registered_model_name="recommendation-classifier",
)
# Kubeflow Metrics에도 기록 (UI 표시용)
metrics_output.log_metric("cv_f1_mean", float(cv_scores.mean()))
metrics_output.log_metric("cv_f1_std", float(cv_scores.std()))
metrics_output.log_metric("mlflow_run_id", run.info.run_id)
# 모델 경로를 output으로 전달 (다음 component에서 사용)
model_output.uri = f"runs:/{run.info.run_id}/model"
model_output.metadata["mlflow_run_id"] = run.info.run_id
Pipeline 전체 구성
from kfp import dsl, compiler
@dsl.pipeline(
name="recommendation-training-pipeline",
description="데이터 검증 -> 학습 -> 모델 등록 -> 서빙 배포",
)
def training_pipeline(
data_path: str = "gs://ml-data/recommendation/2026-03-04/",
mlflow_tracking_uri: str = "http://mlflow.ml-platform.svc:5000",
mlflow_experiment: str = "recommendation-v3",
serving_namespace: str = "model-serving",
):
# Step 1: 데이터 검증
validate_task = validate_data(data_path=data_path)
# Step 2: 전처리
preprocess_task = preprocess_data(
raw_data=validate_task.outputs["validated_data"],
)
# Step 3: 학습 + MLflow 추적
train_task = train_model(
training_data=preprocess_task.outputs["processed_data"],
mlflow_tracking_uri=mlflow_tracking_uri,
mlflow_experiment_name=mlflow_experiment,
n_estimators=300,
max_depth=8,
learning_rate=0.05,
)
train_task.set_cpu_limit("4").set_memory_limit("16Gi")
train_task.set_accelerator_type("nvidia.com/gpu").set_accelerator_limit(1)
# Step 4: 모델 품질 게이트
gate_task = quality_gate(
metrics=train_task.outputs["metrics_output"],
f1_threshold=0.80,
)
# Step 5: KServe 배포 (품질 게이트 통과 시)
with dsl.Condition(gate_task.outputs["passed"] == "true"):
deploy_task = deploy_to_kserve(
model=train_task.outputs["model_output"],
serving_namespace=serving_namespace,
)
compiler.Compiler().compile(
pipeline_func=training_pipeline,
package_path="recommendation_pipeline.yaml",
)
MLflow에서 KServe로: 모델 배포 자동화
MLflow Model Registry에서 모델을 "Production" 스테이지로 승격하면, KServe InferenceService를 자동 생성하는 컴포넌트다.
@dsl.component(
base_image="python:3.11-slim",
packages_to_install=["kubernetes==31.0.0", "mlflow==2.17.2"],
)
def deploy_to_kserve(
model: Input[Model],
serving_namespace: str,
canary_traffic_percent: int = 20,
):
"""
MLflow에 등록된 모델을 KServe InferenceService로 배포한다.
canary 방식으로 신규 버전에 트래픽 일부를 할당한다.
"""
from kubernetes import client, config
import json
import mlflow
config.load_incluster_config()
api = client.CustomObjectsApi()
mlflow_run_id = model.metadata.get("mlflow_run_id")
model_uri = model.uri # runs:/<run_id>/model
# MLflow에서 GCS 경로 추출
mlflow_client = mlflow.tracking.MlflowClient()
run = mlflow_client.get_run(mlflow_run_id)
artifact_uri = run.info.artifact_uri # gs://mlflow-artifacts/<exp>/<run>/artifacts
storage_uri = f"{artifact_uri}/model"
inference_service = {
"apiVersion": "serving.kserve.io/v1beta1",
"kind": "InferenceService",
"metadata": {
"name": "recommendation-classifier",
"namespace": serving_namespace,
"labels": {
"mlflow-run-id": mlflow_run_id,
"pipeline": "recommendation-training",
},
"annotations": {
"serving.kserve.io/deploymentMode": "Serverless",
"serving.kserve.io/autoscalerClass": "hpa",
"serving.kserve.io/metrics": "cpu",
"serving.kserve.io/targetUtilizationPercentage": "70",
},
},
"spec": {
"predictor": {
"canaryTrafficPercent": canary_traffic_percent,
"minReplicas": 2,
"maxReplicas": 10,
"model": {
"modelFormat": {"name": "mlflow"},
"storageUri": storage_uri,
"resources": {
"requests": {"cpu": "1", "memory": "2Gi"},
"limits": {"cpu": "2", "memory": "4Gi"},
},
},
},
},
}
try:
api.patch_namespaced_custom_object(
group="serving.kserve.io",
version="v1beta1",
namespace=serving_namespace,
plural="inferenceservices",
name="recommendation-classifier",
body=inference_service,
)
print(f"Updated InferenceService with canary {canary_traffic_percent}%")
except client.exceptions.ApiException as e:
if e.status == 404:
api.create_namespaced_custom_object(
group="serving.kserve.io",
version="v1beta1",
namespace=serving_namespace,
plural="inferenceservices",
body=inference_service,
)
print("Created new InferenceService")
else:
raise
Canary 배포와 자동 롤백
KServe의 canary 기능과 Prometheus 메트릭을 결합하여 자동 롤백 판정을 수행한다.
Canary 승격/롤백 판정 스크립트
"""
Canary 배포 후 일정 시간 동안 메트릭을 관찰하여
자동으로 승격하거나 롤백하는 판정 스크립트.
CronJob 또는 Argo Workflow step으로 실행한다.
"""
import requests
import time
from dataclasses import dataclass
from typing import Optional
@dataclass
class CanaryConfig:
service_name: str
namespace: str
prometheus_url: str
observation_minutes: int = 15
check_interval_seconds: int = 60
error_rate_threshold: float = 0.02 # 2%
p99_latency_threshold_ms: float = 500 # 500ms
min_request_count: int = 100 # 최소 관찰 요청 수
def query_prometheus(config: CanaryConfig, query: str) -> Optional[float]:
"""Prometheus에서 메트릭 값을 조회한다."""
resp = requests.get(
f"{config.prometheus_url}/api/v1/query",
params={"query": query},
timeout=10,
)
results = resp.json().get("data", {}).get("result", [])
if results:
return float(results[0]["value"][1])
return None
def evaluate_canary(config: CanaryConfig) -> dict:
"""Canary 버전의 error rate와 latency를 평가한다."""
error_rate_query = (
f'sum(rate(kserve_request_total{{service_name="{config.service_name}",'
f'namespace="{config.namespace}",response_code=~"5.."}}[5m])) / '
f'sum(rate(kserve_request_total{{service_name="{config.service_name}",'
f'namespace="{config.namespace}"}}[5m]))'
)
p99_query = (
f'histogram_quantile(0.99, sum(rate(kserve_request_duration_seconds_bucket'
f'{{service_name="{config.service_name}",namespace="{config.namespace}"}}'
f'[5m])) by (le)) * 1000'
)
request_count_query = (
f'sum(increase(kserve_request_total{{service_name="{config.service_name}",'
f'namespace="{config.namespace}"}}[{config.observation_minutes}m]))'
)
error_rate = query_prometheus(config, error_rate_query) or 0.0
p99_latency = query_prometheus(config, p99_query) or 0.0
request_count = query_prometheus(config, request_count_query) or 0
return {
"error_rate": error_rate,
"p99_latency_ms": p99_latency,
"request_count": request_count,
"error_rate_ok": error_rate < config.error_rate_threshold,
"latency_ok": p99_latency < config.p99_latency_threshold_ms,
"sufficient_traffic": request_count >= config.min_request_count,
}
def run_canary_judgment(config: CanaryConfig) -> str:
"""
observation_minutes 동안 메트릭을 관찰하고 승격/롤백을 결정한다.
Returns: "promote" or "rollback"
"""
checks_passed = 0
total_checks = config.observation_minutes * 60 // config.check_interval_seconds
for i in range(total_checks):
result = evaluate_canary(config)
print(f"Check {i+1}/{total_checks}: {result}")
if not result["sufficient_traffic"]:
print("Insufficient traffic, waiting...")
time.sleep(config.check_interval_seconds)
continue
if result["error_rate_ok"] and result["latency_ok"]:
checks_passed += 1
else:
# 즉시 롤백 조건: error rate가 임계치의 3배 초과
if result["error_rate"] > config.error_rate_threshold * 3:
print(f"Immediate rollback: error_rate={result['error_rate']}")
return "rollback"
time.sleep(config.check_interval_seconds)
pass_ratio = checks_passed / max(total_checks, 1)
decision = "promote" if pass_ratio >= 0.9 else "rollback"
print(f"Decision: {decision} (pass_ratio={pass_ratio:.2f})")
return decision
세 도구의 버전 호환성 매트릭스
실제 운영에서 버전 조합 문제가 빈번하게 발생한다. 검증된 조합을 정리한다.
| Kubeflow Pipelines | MLflow | KServe | Kubernetes | Python | 비고 |
|---|---|---|---|---|---|
| v2.3.0 | 2.17.x | 0.14.x | 1.28-1.30 | 3.11 | 2026년 3월 기준 안정 조합 |
| v2.2.0 | 2.15.x | 0.13.x | 1.27-1.29 | 3.10-3.11 | 이전 안정 버전 |
| v2.1.0 | 2.12.x | 0.12.x | 1.26-1.28 | 3.10 | LTS 유지보수 중 |
주의사항: MLflow 2.16에서 model signature 검증이 기본값으로 활성화되었다. KServe의 MLflow 서버가 signature가 없는 구 모델을 로드하면 MlflowException: Model signature is missing 에러가 발생한다. 기존 모델에 signature를 추가하거나 MLFLOW_MODEL_SIGNATURE_ENFORCEMENT=disabled 환경변수를 설정해야 한다.
장애 시나리오별 대응
시나리오 1: Kubeflow Pipeline 완료 후 MLflow에 Run이 기록되지 않음
증상: Pipeline이 성공(green)으로 표시되지만 MLflow UI에 해당 Run이 없음
에러 로그 (Pipeline pod):
requests.exceptions.ConnectionError: HTTPConnectionPool(host='mlflow.ml-platform.svc', port=5000):
Max retries exceeded with url: /api/2.0/mlflow/runs/create
원인: Kubeflow Pipeline pod의 ServiceAccount에 MLflow service에 대한
NetworkPolicy 접근이 허용되지 않음
해결:
1. NetworkPolicy에서 pipeline runner namespace -> mlflow namespace 허용
2. MLflow tracking URI가 cluster-internal DNS인지 확인
3. Pipeline component에 retry 로직 추가
시나리오 2: KServe가 MLflow 모델을 로드하지 못함
증상: InferenceService가 "FailedCreate" 상태로 멈춤
에러 로그 (kserve-container):
mlflow.exceptions.MlflowException: Could not find an "MLmodel" configuration file
at "gs://mlflow-artifacts/3/abc123def/artifacts/model"
원인: MLflow log_model의 artifact_path와 KServe storageUri의 경로 불일치
(artifact_path="model" 이지만 storageUri에 "/model" 이 빠짐)
해결:
storageUri를 artifact_uri + "/model" 형태로 조합해야 한다.
잘못된 예: gs://mlflow-artifacts/3/abc123def/artifacts
올바른 예: gs://mlflow-artifacts/3/abc123def/artifacts/model
시나리오 3: Canary 배포 중 이전 버전으로 롤백 불가
증상: canaryTrafficPercent을 0으로 설정해도 이전 버전으로 안 돌아감
에러 로그 (kserve-controller):
RevisionFailed: Revision "recommendation-classifier-predictor-prev"
has no ready pods
원인: KServe가 이전 Revision의 pod를 scale-to-zero한 상태에서
cold start 시간이 readiness probe timeout을 초과
해결:
1. 서빙 서비스에 minReplicas >= 1 설정 (중요 서비스는 scale-to-zero 비활성화)
2. readinessProbe timeout을 모델 로딩 시간 기준으로 늘림
3. 롤백 시에는 canary 비율 조정이 아닌 storageUri를 이전 버전으로 교체
# 롤백 시 적용할 KServe 매니페스트
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
name: recommendation-classifier
namespace: model-serving
spec:
predictor:
canaryTrafficPercent: 0 # canary 트래픽 제거
minReplicas: 2 # scale-to-zero 방지
model:
modelFormat:
name: mlflow
# 이전 안정 버전의 model URI로 교체
storageUri: gs://mlflow-artifacts/3/previous-stable-run/artifacts/model
resources:
requests:
cpu: '1'
memory: '2Gi'
limits:
cpu: '2'
memory: '4Gi'
containerSpec:
readinessProbe:
initialDelaySeconds: 30
timeoutSeconds: 10
periodSeconds: 5
통합 운영 대시보드 구성
세 도구에서 수집해야 할 핵심 메트릭을 Grafana 대시보드 하나에 통합한다.
| 영역 | 메트릭 | 소스 | 알림 임계치 |
|---|---|---|---|
| Pipeline | 실행 성공률, 평균 소요 시간 | Kubeflow Pipeline API | 성공률 < 95% |
| 학습 | cv_f1_mean 추이, 학습 시간 | MLflow Tracking | f1 < 이전 버전 - 0.02 |
| Registry | Pending 모델 수, 승격 대기 시간 | MLflow Model Registry | 대기 > 24h |
| 서빙 | error rate, p99 latency, RPS | KServe + Prometheus | error > 1%, p99 > 500ms |
| Canary | canary vs stable 메트릭 비교 | Prometheus | canary error > stable * 2 |
| 인프라 | GPU 사용률, Pod OOM 횟수 | Kubernetes metrics | OOM > 0/day |
퀴즈
Q1. Kubeflow Pipeline에서 MLflow tracking URI를 환경변수로 주입하는 이유는?
||Pipeline component가 Kubernetes Pod로 실행되므로 cluster-internal DNS를 통해 MLflow 서버에 접근해야 하고, 환경별(dev/staging/prod) URI가 다르기 때문이다.||
Q2. MLflow의 model artifact_path와 KServe storageUri 연결 시 가장 흔한 실수는?
||MLflow log_model에서 artifact_path="model"로 설정했을 때 storageUri에 "/model" suffix를 빠뜨려서 "MLmodel file not found" 에러가 발생하는 것이다.||
Q3. Canary 배포에서 자동 롤백 판정 시 error rate만 보면 안 되는 이유는?
||Error rate가 낮더라도 p99 latency가 급증하면 사용자 경험이 나빠지고, 또한 최소 요청 수(traffic volume) 없이 판정하면 통계적으로 무의미한 결론을 내릴 수 있다.||
Q4. KServe에서 scale-to-zero를 중요 서비스에 비활성화해야 하는 상황은?
||모델 로딩 시간이 길어서 cold start가 readiness probe timeout을 초과하는 경우, 또는 롤백 시 이전 버전의 pod이 즉시 필요한 경우에는 minReplicas >= 1로 설정해야 한다.||
Q5. MLflow 2.16+에서 model signature enforcement가 기존 모델에 미치는 영향은?
||Signature가 없는 구 모델을 KServe가 로드할 때 MlflowException이 발생하여 서빙이 실패한다. 기존 모델에 signature를 추가하거나 enforcement를 비활성화해야 한다.||
Q6. Pipeline의 quality gate에서 F1 threshold를 절대값(0.80)으로만 설정하면 생기는 문제는?
||이전 버전 대비 성능 하락을 감지하지 못한다. 예를 들어 기존 0.92에서 0.81로 급락해도 절대 threshold만 통과하면 배포된다. 상대 비교(이전 버전 대비 delta) 조건을 함께 걸어야 한다.||
Q7. 세 도구 통합 시 가장 먼저 표준화해야 할 인터페이스는?
||모델 artifact 경로 규칙이다. MLflow가 저장하는 artifact URI 형식과 KServe가 참조하는 storageUri 형식이 일관되어야 파이프라인에서 서빙까지 자동화된다.||
참고 자료
현재 단락 (1/366)
ML 프로젝트가 노트북에서 프로덕션으로 넘어가는 순간, 팀은 세 가지 독립적인 문제와 마주한다.