Skip to content
Published on

AI 플랫폼 스택 설계: Kubeflow, MLflow, KServe 통합 운영

Authors
AI 플랫폼 스택 설계: Kubeflow, MLflow, KServe 통합 운영

세 도구가 하나의 스택이 되어야 하는 이유

ML 프로젝트가 노트북에서 프로덕션으로 넘어가는 순간, 팀은 세 가지 독립적인 문제와 마주한다.

  1. 파이프라인 오케스트레이션: 데이터 전처리, 학습, 평가를 재현 가능한 워크플로로 실행해야 한다 -- Kubeflow Pipelines.
  2. 실험/모델 추적: 하이퍼파라미터, 메트릭, artifact를 중앙에서 관리하고 모델 레지스트리로 승격해야 한다 -- MLflow.
  3. 모델 서빙: 학습된 모델을 auto-scaling, canary, A/B 테스트가 가능한 inference endpoint로 배포해야 한다 -- KServe.

이 세 도구를 각각 운영하면 "학습 artifact 경로가 서빙 매니페스트와 안 맞는다", "파이프라인에서 등록한 모델 버전이 서빙에서 참조하는 버전과 다르다" 같은 접합부 장애가 반복된다. 이 글에서는 Kubeflow Pipelines v2 (2.3+), MLflow 2.17+, KServe 0.14+를 기준으로 세 도구의 통합 지점을 설계한다.

각 도구의 역할 경계와 통합 지점

구간담당 도구입력출력통합 인터페이스
데이터 검증 + 전처리Kubeflow PipelineRaw data (GCS/S3)전처리된 데이터셋Pipeline parameter로 데이터 경로 전달
학습 + 하이퍼파라미터 탐색Kubeflow Pipeline + MLflow전처리 데이터MLflow Run (메트릭, artifact)MLflow tracking URI를 Pipeline 환경변수로 주입
모델 등록 + 승격MLflow Model Registry학습 완료 artifactRegistered Model VersionMLflow의 model URI를 KServe storageUri에 매핑
모델 서빙 + 트래픽 관리KServeModel URI (GCS/S3)Inference endpointKServe InferenceService가 MLflow model URI 참조
모니터링 + 롤백KServe + PrometheusInference metrics알림 / 자동 롤백Prometheus 메트릭 기반 canary 판정

Kubeflow Pipeline에서 MLflow 실험 추적하기

Kubeflow Pipeline의 각 component에서 MLflow로 메트릭과 artifact를 기록하는 것이 통합의 첫 단계다.

Pipeline Component 정의

"""
Kubeflow Pipelines v2 component로 학습을 수행하면서
MLflow에 실험 결과를 기록하는 예시.
"""
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=[
        "mlflow==2.17.2",
        "scikit-learn==1.5.2",
        "pandas==2.2.3",
        "boto3==1.35.0",
    ],
)
def train_model(
    training_data: Input[Dataset],
    model_output: Output[Model],
    metrics_output: Output[Metrics],
    mlflow_tracking_uri: str,
    mlflow_experiment_name: str,
    n_estimators: int = 200,
    max_depth: int = 10,
    learning_rate: float = 0.1,
):
    import mlflow
    import pandas as pd
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.model_selection import cross_val_score
    import json
    import os

    # MLflow 연결
    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment(mlflow_experiment_name)

    # 데이터 로드
    df = pd.read_parquet(training_data.path)
    X = df.drop(columns=["label"])
    y = df["label"]

    with mlflow.start_run() as run:
        # 하이퍼파라미터 기록
        mlflow.log_params({
            "n_estimators": n_estimators,
            "max_depth": max_depth,
            "learning_rate": learning_rate,
            "feature_count": X.shape[1],
            "training_rows": X.shape[0],
        })

        # 학습
        clf = GradientBoostingClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth,
            learning_rate=learning_rate,
            random_state=42,
        )
        cv_scores = cross_val_score(clf, X, y, cv=5, scoring="f1_weighted")
        clf.fit(X, y)

        # 메트릭 기록
        mlflow.log_metrics({
            "cv_f1_mean": float(cv_scores.mean()),
            "cv_f1_std": float(cv_scores.std()),
        })

        # 모델 저장 (MLflow Model Registry 형식)
        mlflow.sklearn.log_model(
            clf,
            artifact_path="model",
            registered_model_name="recommendation-classifier",
        )

        # Kubeflow Metrics에도 기록 (UI 표시용)
        metrics_output.log_metric("cv_f1_mean", float(cv_scores.mean()))
        metrics_output.log_metric("cv_f1_std", float(cv_scores.std()))
        metrics_output.log_metric("mlflow_run_id", run.info.run_id)

        # 모델 경로를 output으로 전달 (다음 component에서 사용)
        model_output.uri = f"runs:/{run.info.run_id}/model"
        model_output.metadata["mlflow_run_id"] = run.info.run_id

Pipeline 전체 구성

from kfp import dsl, compiler

@dsl.pipeline(
    name="recommendation-training-pipeline",
    description="데이터 검증 -> 학습 -> 모델 등록 -> 서빙 배포",
)
def training_pipeline(
    data_path: str = "gs://ml-data/recommendation/2026-03-04/",
    mlflow_tracking_uri: str = "http://mlflow.ml-platform.svc:5000",
    mlflow_experiment: str = "recommendation-v3",
    serving_namespace: str = "model-serving",
):
    # Step 1: 데이터 검증
    validate_task = validate_data(data_path=data_path)

    # Step 2: 전처리
    preprocess_task = preprocess_data(
        raw_data=validate_task.outputs["validated_data"],
    )

    # Step 3: 학습 + MLflow 추적
    train_task = train_model(
        training_data=preprocess_task.outputs["processed_data"],
        mlflow_tracking_uri=mlflow_tracking_uri,
        mlflow_experiment_name=mlflow_experiment,
        n_estimators=300,
        max_depth=8,
        learning_rate=0.05,
    )
    train_task.set_cpu_limit("4").set_memory_limit("16Gi")
    train_task.set_accelerator_type("nvidia.com/gpu").set_accelerator_limit(1)

    # Step 4: 모델 품질 게이트
    gate_task = quality_gate(
        metrics=train_task.outputs["metrics_output"],
        f1_threshold=0.80,
    )

    # Step 5: KServe 배포 (품질 게이트 통과 시)
    with dsl.Condition(gate_task.outputs["passed"] == "true"):
        deploy_task = deploy_to_kserve(
            model=train_task.outputs["model_output"],
            serving_namespace=serving_namespace,
        )

compiler.Compiler().compile(
    pipeline_func=training_pipeline,
    package_path="recommendation_pipeline.yaml",
)

MLflow에서 KServe로: 모델 배포 자동화

MLflow Model Registry에서 모델을 "Production" 스테이지로 승격하면, KServe InferenceService를 자동 생성하는 컴포넌트다.

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["kubernetes==31.0.0", "mlflow==2.17.2"],
)
def deploy_to_kserve(
    model: Input[Model],
    serving_namespace: str,
    canary_traffic_percent: int = 20,
):
    """
    MLflow에 등록된 모델을 KServe InferenceService로 배포한다.
    canary 방식으로 신규 버전에 트래픽 일부를 할당한다.
    """
    from kubernetes import client, config
    import json
    import mlflow

    config.load_incluster_config()
    api = client.CustomObjectsApi()

    mlflow_run_id = model.metadata.get("mlflow_run_id")
    model_uri = model.uri  # runs:/<run_id>/model

    # MLflow에서 GCS 경로 추출
    mlflow_client = mlflow.tracking.MlflowClient()
    run = mlflow_client.get_run(mlflow_run_id)
    artifact_uri = run.info.artifact_uri  # gs://mlflow-artifacts/<exp>/<run>/artifacts

    storage_uri = f"{artifact_uri}/model"

    inference_service = {
        "apiVersion": "serving.kserve.io/v1beta1",
        "kind": "InferenceService",
        "metadata": {
            "name": "recommendation-classifier",
            "namespace": serving_namespace,
            "labels": {
                "mlflow-run-id": mlflow_run_id,
                "pipeline": "recommendation-training",
            },
            "annotations": {
                "serving.kserve.io/deploymentMode": "Serverless",
                "serving.kserve.io/autoscalerClass": "hpa",
                "serving.kserve.io/metrics": "cpu",
                "serving.kserve.io/targetUtilizationPercentage": "70",
            },
        },
        "spec": {
            "predictor": {
                "canaryTrafficPercent": canary_traffic_percent,
                "minReplicas": 2,
                "maxReplicas": 10,
                "model": {
                    "modelFormat": {"name": "mlflow"},
                    "storageUri": storage_uri,
                    "resources": {
                        "requests": {"cpu": "1", "memory": "2Gi"},
                        "limits": {"cpu": "2", "memory": "4Gi"},
                    },
                },
            },
        },
    }

    try:
        api.patch_namespaced_custom_object(
            group="serving.kserve.io",
            version="v1beta1",
            namespace=serving_namespace,
            plural="inferenceservices",
            name="recommendation-classifier",
            body=inference_service,
        )
        print(f"Updated InferenceService with canary {canary_traffic_percent}%")
    except client.exceptions.ApiException as e:
        if e.status == 404:
            api.create_namespaced_custom_object(
                group="serving.kserve.io",
                version="v1beta1",
                namespace=serving_namespace,
                plural="inferenceservices",
                body=inference_service,
            )
            print("Created new InferenceService")
        else:
            raise

Canary 배포와 자동 롤백

KServe의 canary 기능과 Prometheus 메트릭을 결합하여 자동 롤백 판정을 수행한다.

Canary 승격/롤백 판정 스크립트

"""
Canary 배포 후 일정 시간 동안 메트릭을 관찰하여
자동으로 승격하거나 롤백하는 판정 스크립트.
CronJob 또는 Argo Workflow step으로 실행한다.
"""
import requests
import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class CanaryConfig:
    service_name: str
    namespace: str
    prometheus_url: str
    observation_minutes: int = 15
    check_interval_seconds: int = 60
    error_rate_threshold: float = 0.02    # 2%
    p99_latency_threshold_ms: float = 500  # 500ms
    min_request_count: int = 100           # 최소 관찰 요청 수

def query_prometheus(config: CanaryConfig, query: str) -> Optional[float]:
    """Prometheus에서 메트릭 값을 조회한다."""
    resp = requests.get(
        f"{config.prometheus_url}/api/v1/query",
        params={"query": query},
        timeout=10,
    )
    results = resp.json().get("data", {}).get("result", [])
    if results:
        return float(results[0]["value"][1])
    return None

def evaluate_canary(config: CanaryConfig) -> dict:
    """Canary 버전의 error rate와 latency를 평가한다."""
    error_rate_query = (
        f'sum(rate(kserve_request_total{{service_name="{config.service_name}",'
        f'namespace="{config.namespace}",response_code=~"5.."}}[5m])) / '
        f'sum(rate(kserve_request_total{{service_name="{config.service_name}",'
        f'namespace="{config.namespace}"}}[5m]))'
    )
    p99_query = (
        f'histogram_quantile(0.99, sum(rate(kserve_request_duration_seconds_bucket'
        f'{{service_name="{config.service_name}",namespace="{config.namespace}"}}'
        f'[5m])) by (le)) * 1000'
    )
    request_count_query = (
        f'sum(increase(kserve_request_total{{service_name="{config.service_name}",'
        f'namespace="{config.namespace}"}}[{config.observation_minutes}m]))'
    )

    error_rate = query_prometheus(config, error_rate_query) or 0.0
    p99_latency = query_prometheus(config, p99_query) or 0.0
    request_count = query_prometheus(config, request_count_query) or 0

    return {
        "error_rate": error_rate,
        "p99_latency_ms": p99_latency,
        "request_count": request_count,
        "error_rate_ok": error_rate < config.error_rate_threshold,
        "latency_ok": p99_latency < config.p99_latency_threshold_ms,
        "sufficient_traffic": request_count >= config.min_request_count,
    }

def run_canary_judgment(config: CanaryConfig) -> str:
    """
    observation_minutes 동안 메트릭을 관찰하고 승격/롤백을 결정한다.
    Returns: "promote" or "rollback"
    """
    checks_passed = 0
    total_checks = config.observation_minutes * 60 // config.check_interval_seconds

    for i in range(total_checks):
        result = evaluate_canary(config)
        print(f"Check {i+1}/{total_checks}: {result}")

        if not result["sufficient_traffic"]:
            print("Insufficient traffic, waiting...")
            time.sleep(config.check_interval_seconds)
            continue

        if result["error_rate_ok"] and result["latency_ok"]:
            checks_passed += 1
        else:
            # 즉시 롤백 조건: error rate가 임계치의 3배 초과
            if result["error_rate"] > config.error_rate_threshold * 3:
                print(f"Immediate rollback: error_rate={result['error_rate']}")
                return "rollback"

        time.sleep(config.check_interval_seconds)

    pass_ratio = checks_passed / max(total_checks, 1)
    decision = "promote" if pass_ratio >= 0.9 else "rollback"
    print(f"Decision: {decision} (pass_ratio={pass_ratio:.2f})")
    return decision

세 도구의 버전 호환성 매트릭스

실제 운영에서 버전 조합 문제가 빈번하게 발생한다. 검증된 조합을 정리한다.

Kubeflow PipelinesMLflowKServeKubernetesPython비고
v2.3.02.17.x0.14.x1.28-1.303.112026년 3월 기준 안정 조합
v2.2.02.15.x0.13.x1.27-1.293.10-3.11이전 안정 버전
v2.1.02.12.x0.12.x1.26-1.283.10LTS 유지보수 중

주의사항: MLflow 2.16에서 model signature 검증이 기본값으로 활성화되었다. KServe의 MLflow 서버가 signature가 없는 구 모델을 로드하면 MlflowException: Model signature is missing 에러가 발생한다. 기존 모델에 signature를 추가하거나 MLFLOW_MODEL_SIGNATURE_ENFORCEMENT=disabled 환경변수를 설정해야 한다.

장애 시나리오별 대응

시나리오 1: Kubeflow Pipeline 완료 후 MLflow에 Run이 기록되지 않음

증상: Pipeline이 성공(green)으로 표시되지만 MLflow UI에 해당 Run이 없음
에러 로그 (Pipeline pod):
  requests.exceptions.ConnectionError: HTTPConnectionPool(host='mlflow.ml-platform.svc', port=5000):
  Max retries exceeded with url: /api/2.0/mlflow/runs/create

원인: Kubeflow Pipeline pod의 ServiceAccount에 MLflow service에 대한
      NetworkPolicy 접근이 허용되지 않음

해결:
  1. NetworkPolicy에서 pipeline runner namespace -> mlflow namespace 허용
  2. MLflow tracking URI가 cluster-internal DNS인지 확인
  3. Pipeline component에 retry 로직 추가

시나리오 2: KServe가 MLflow 모델을 로드하지 못함

증상: InferenceService가 "FailedCreate" 상태로 멈춤
에러 로그 (kserve-container):
  mlflow.exceptions.MlflowException: Could not find an "MLmodel" configuration file
  at "gs://mlflow-artifacts/3/abc123def/artifacts/model"

원인: MLflow log_model의 artifact_path와 KServe storageUri의 경로 불일치
      (artifact_path="model" 이지만 storageUri에 "/model" 이 빠짐)

해결:
  storageUri를 artifact_uri + "/model" 형태로 조합해야 한다.
  잘못된: gs://mlflow-artifacts/3/abc123def/artifacts
  올바른 예: gs://mlflow-artifacts/3/abc123def/artifacts/model

시나리오 3: Canary 배포 중 이전 버전으로 롤백 불가

증상: canaryTrafficPercent을 0으로 설정해도 이전 버전으로 안 돌아감
에러 로그 (kserve-controller):
  RevisionFailed: Revision "recommendation-classifier-predictor-prev"
  has no ready pods

원인: KServe가 이전 Revision의 pod를 scale-to-zero한 상태에서
      cold start 시간이 readiness probe timeout을 초과

해결:
  1. 서빙 서비스에 minReplicas >= 1 설정 (중요 서비스는 scale-to-zero 비활성화)
  2. readinessProbe timeout을 모델 로딩 시간 기준으로 늘림
  3. 롤백 시에는 canary 비율 조정이 아닌 storageUri를 이전 버전으로 교체
# 롤백 시 적용할 KServe 매니페스트
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: recommendation-classifier
  namespace: model-serving
spec:
  predictor:
    canaryTrafficPercent: 0 # canary 트래픽 제거
    minReplicas: 2 # scale-to-zero 방지
    model:
      modelFormat:
        name: mlflow
      # 이전 안정 버전의 model URI로 교체
      storageUri: gs://mlflow-artifacts/3/previous-stable-run/artifacts/model
      resources:
        requests:
          cpu: '1'
          memory: '2Gi'
        limits:
          cpu: '2'
          memory: '4Gi'
    containerSpec:
      readinessProbe:
        initialDelaySeconds: 30
        timeoutSeconds: 10
        periodSeconds: 5

통합 운영 대시보드 구성

세 도구에서 수집해야 할 핵심 메트릭을 Grafana 대시보드 하나에 통합한다.

영역메트릭소스알림 임계치
Pipeline실행 성공률, 평균 소요 시간Kubeflow Pipeline API성공률 < 95%
학습cv_f1_mean 추이, 학습 시간MLflow Trackingf1 < 이전 버전 - 0.02
RegistryPending 모델 수, 승격 대기 시간MLflow Model Registry대기 > 24h
서빙error rate, p99 latency, RPSKServe + Prometheuserror > 1%, p99 > 500ms
Canarycanary vs stable 메트릭 비교Prometheuscanary error > stable * 2
인프라GPU 사용률, Pod OOM 횟수Kubernetes metricsOOM > 0/day
퀴즈

Q1. Kubeflow Pipeline에서 MLflow tracking URI를 환경변수로 주입하는 이유는?

||Pipeline component가 Kubernetes Pod로 실행되므로 cluster-internal DNS를 통해 MLflow 서버에 접근해야 하고, 환경별(dev/staging/prod) URI가 다르기 때문이다.||

Q2. MLflow의 model artifact_path와 KServe storageUri 연결 시 가장 흔한 실수는?

||MLflow log_model에서 artifact_path="model"로 설정했을 때 storageUri에 "/model" suffix를 빠뜨려서 "MLmodel file not found" 에러가 발생하는 것이다.||

Q3. Canary 배포에서 자동 롤백 판정 시 error rate만 보면 안 되는 이유는?

||Error rate가 낮더라도 p99 latency가 급증하면 사용자 경험이 나빠지고, 또한 최소 요청 수(traffic volume) 없이 판정하면 통계적으로 무의미한 결론을 내릴 수 있다.||

Q4. KServe에서 scale-to-zero를 중요 서비스에 비활성화해야 하는 상황은?

||모델 로딩 시간이 길어서 cold start가 readiness probe timeout을 초과하는 경우, 또는 롤백 시 이전 버전의 pod이 즉시 필요한 경우에는 minReplicas >= 1로 설정해야 한다.||

Q5. MLflow 2.16+에서 model signature enforcement가 기존 모델에 미치는 영향은?

||Signature가 없는 구 모델을 KServe가 로드할 때 MlflowException이 발생하여 서빙이 실패한다. 기존 모델에 signature를 추가하거나 enforcement를 비활성화해야 한다.||

Q6. Pipeline의 quality gate에서 F1 threshold를 절대값(0.80)으로만 설정하면 생기는 문제는?

||이전 버전 대비 성능 하락을 감지하지 못한다. 예를 들어 기존 0.92에서 0.81로 급락해도 절대 threshold만 통과하면 배포된다. 상대 비교(이전 버전 대비 delta) 조건을 함께 걸어야 한다.||

Q7. 세 도구 통합 시 가장 먼저 표준화해야 할 인터페이스는?

||모델 artifact 경로 규칙이다. MLflow가 저장하는 artifact URI 형식과 KServe가 참조하는 storageUri 형식이 일관되어야 파이프라인에서 서빙까지 자동화된다.||

참고 자료