Skip to content

필사 모드: AI 플랫폼 스택 설계: Kubeflow, MLflow, KServe 통합 운영

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

세 도구가 하나의 스택이 되어야 하는 이유

ML 프로젝트가 노트북에서 프로덕션으로 넘어가는 순간, 팀은 세 가지 독립적인 문제와 마주한다.

1. **파이프라인 오케스트레이션**: 데이터 전처리, 학습, 평가를 재현 가능한 워크플로로 실행해야 한다 -- Kubeflow Pipelines.

2. **실험/모델 추적**: 하이퍼파라미터, 메트릭, artifact를 중앙에서 관리하고 모델 레지스트리로 승격해야 한다 -- MLflow.

3. **모델 서빙**: 학습된 모델을 auto-scaling, canary, A/B 테스트가 가능한 inference endpoint로 배포해야 한다 -- KServe.

이 세 도구를 각각 운영하면 "학습 artifact 경로가 서빙 매니페스트와 안 맞는다", "파이프라인에서 등록한 모델 버전이 서빙에서 참조하는 버전과 다르다" 같은 접합부 장애가 반복된다. 이 글에서는 Kubeflow Pipelines v2 (2.3+), MLflow 2.17+, KServe 0.14+를 기준으로 세 도구의 통합 지점을 설계한다.

각 도구의 역할 경계와 통합 지점

| 구간 | 담당 도구 | 입력 | 출력 | 통합 인터페이스 |

| -------------------------- | -------------------------- | ------------------ | ----------------------------- | ----------------------------------------------- |

| 데이터 검증 + 전처리 | Kubeflow Pipeline | Raw data (GCS/S3) | 전처리된 데이터셋 | Pipeline parameter로 데이터 경로 전달 |

| 학습 + 하이퍼파라미터 탐색 | Kubeflow Pipeline + MLflow | 전처리 데이터 | MLflow Run (메트릭, artifact) | MLflow tracking URI를 Pipeline 환경변수로 주입 |

| 모델 등록 + 승격 | MLflow Model Registry | 학습 완료 artifact | Registered Model Version | MLflow의 model URI를 KServe storageUri에 매핑 |

| 모델 서빙 + 트래픽 관리 | KServe | Model URI (GCS/S3) | Inference endpoint | KServe InferenceService가 MLflow model URI 참조 |

| 모니터링 + 롤백 | KServe + Prometheus | Inference metrics | 알림 / 자동 롤백 | Prometheus 메트릭 기반 canary 판정 |

Kubeflow Pipeline에서 MLflow 실험 추적하기

Kubeflow Pipeline의 각 component에서 MLflow로 메트릭과 artifact를 기록하는 것이 통합의 첫 단계다.

Pipeline Component 정의

"""

Kubeflow Pipelines v2 component로 학습을 수행하면서

MLflow에 실험 결과를 기록하는 예시.

"""

from kfp import dsl

from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(

base_image="python:3.11-slim",

packages_to_install=[

"mlflow==2.17.2",

"scikit-learn==1.5.2",

"pandas==2.2.3",

"boto3==1.35.0",

],

)

def train_model(

training_data: Input[Dataset],

model_output: Output[Model],

metrics_output: Output[Metrics],

mlflow_tracking_uri: str,

mlflow_experiment_name: str,

n_estimators: int = 200,

max_depth: int = 10,

learning_rate: float = 0.1,

):

from sklearn.ensemble import GradientBoostingClassifier

from sklearn.model_selection import cross_val_score

MLflow 연결

mlflow.set_tracking_uri(mlflow_tracking_uri)

mlflow.set_experiment(mlflow_experiment_name)

데이터 로드

df = pd.read_parquet(training_data.path)

X = df.drop(columns=["label"])

y = df["label"]

with mlflow.start_run() as run:

하이퍼파라미터 기록

mlflow.log_params({

"n_estimators": n_estimators,

"max_depth": max_depth,

"learning_rate": learning_rate,

"feature_count": X.shape[1],

"training_rows": X.shape[0],

})

학습

clf = GradientBoostingClassifier(

n_estimators=n_estimators,

max_depth=max_depth,

learning_rate=learning_rate,

random_state=42,

)

cv_scores = cross_val_score(clf, X, y, cv=5, scoring="f1_weighted")

clf.fit(X, y)

메트릭 기록

mlflow.log_metrics({

"cv_f1_mean": float(cv_scores.mean()),

"cv_f1_std": float(cv_scores.std()),

})

모델 저장 (MLflow Model Registry 형식)

mlflow.sklearn.log_model(

clf,

artifact_path="model",

registered_model_name="recommendation-classifier",

)

Kubeflow Metrics에도 기록 (UI 표시용)

metrics_output.log_metric("cv_f1_mean", float(cv_scores.mean()))

metrics_output.log_metric("cv_f1_std", float(cv_scores.std()))

metrics_output.log_metric("mlflow_run_id", run.info.run_id)

모델 경로를 output으로 전달 (다음 component에서 사용)

model_output.uri = f"runs:/{run.info.run_id}/model"

model_output.metadata["mlflow_run_id"] = run.info.run_id

Pipeline 전체 구성

from kfp import dsl, compiler

@dsl.pipeline(

name="recommendation-training-pipeline",

description="데이터 검증 -> 학습 -> 모델 등록 -> 서빙 배포",

)

def training_pipeline(

data_path: str = "gs://ml-data/recommendation/2026-03-04/",

mlflow_tracking_uri: str = "http://mlflow.ml-platform.svc:5000",

mlflow_experiment: str = "recommendation-v3",

serving_namespace: str = "model-serving",

):

Step 1: 데이터 검증

validate_task = validate_data(data_path=data_path)

Step 2: 전처리

preprocess_task = preprocess_data(

raw_data=validate_task.outputs["validated_data"],

)

Step 3: 학습 + MLflow 추적

train_task = train_model(

training_data=preprocess_task.outputs["processed_data"],

mlflow_tracking_uri=mlflow_tracking_uri,

mlflow_experiment_name=mlflow_experiment,

n_estimators=300,

max_depth=8,

learning_rate=0.05,

)

train_task.set_cpu_limit("4").set_memory_limit("16Gi")

train_task.set_accelerator_type("nvidia.com/gpu").set_accelerator_limit(1)

Step 4: 모델 품질 게이트

gate_task = quality_gate(

metrics=train_task.outputs["metrics_output"],

f1_threshold=0.80,

)

Step 5: KServe 배포 (품질 게이트 통과 시)

with dsl.Condition(gate_task.outputs["passed"] == "true"):

deploy_task = deploy_to_kserve(

model=train_task.outputs["model_output"],

serving_namespace=serving_namespace,

)

compiler.Compiler().compile(

pipeline_func=training_pipeline,

package_path="recommendation_pipeline.yaml",

)

MLflow에서 KServe로: 모델 배포 자동화

MLflow Model Registry에서 모델을 "Production" 스테이지로 승격하면, KServe InferenceService를 자동 생성하는 컴포넌트다.

@dsl.component(

base_image="python:3.11-slim",

packages_to_install=["kubernetes==31.0.0", "mlflow==2.17.2"],

)

def deploy_to_kserve(

model: Input[Model],

serving_namespace: str,

canary_traffic_percent: int = 20,

):

"""

MLflow에 등록된 모델을 KServe InferenceService로 배포한다.

canary 방식으로 신규 버전에 트래픽 일부를 할당한다.

"""

from kubernetes import client, config

config.load_incluster_config()

api = client.CustomObjectsApi()

mlflow_run_id = model.metadata.get("mlflow_run_id")

model_uri = model.uri # runs:/<run_id>/model

MLflow에서 GCS 경로 추출

mlflow_client = mlflow.tracking.MlflowClient()

run = mlflow_client.get_run(mlflow_run_id)

artifact_uri = run.info.artifact_uri # gs://mlflow-artifacts/<exp>/<run>/artifacts

storage_uri = f"{artifact_uri}/model"

inference_service = {

"apiVersion": "serving.kserve.io/v1beta1",

"kind": "InferenceService",

"metadata": {

"name": "recommendation-classifier",

"namespace": serving_namespace,

"labels": {

"mlflow-run-id": mlflow_run_id,

"pipeline": "recommendation-training",

},

"annotations": {

"serving.kserve.io/deploymentMode": "Serverless",

"serving.kserve.io/autoscalerClass": "hpa",

"serving.kserve.io/metrics": "cpu",

"serving.kserve.io/targetUtilizationPercentage": "70",

},

},

"spec": {

"predictor": {

"canaryTrafficPercent": canary_traffic_percent,

"minReplicas": 2,

"maxReplicas": 10,

"model": {

"modelFormat": {"name": "mlflow"},

"storageUri": storage_uri,

"resources": {

"requests": {"cpu": "1", "memory": "2Gi"},

"limits": {"cpu": "2", "memory": "4Gi"},

},

},

},

},

}

try:

api.patch_namespaced_custom_object(

group="serving.kserve.io",

version="v1beta1",

namespace=serving_namespace,

plural="inferenceservices",

name="recommendation-classifier",

body=inference_service,

)

print(f"Updated InferenceService with canary {canary_traffic_percent}%")

except client.exceptions.ApiException as e:

if e.status == 404:

api.create_namespaced_custom_object(

group="serving.kserve.io",

version="v1beta1",

namespace=serving_namespace,

plural="inferenceservices",

body=inference_service,

)

print("Created new InferenceService")

else:

raise

Canary 배포와 자동 롤백

KServe의 canary 기능과 Prometheus 메트릭을 결합하여 자동 롤백 판정을 수행한다.

Canary 승격/롤백 판정 스크립트

"""

Canary 배포 후 일정 시간 동안 메트릭을 관찰하여

자동으로 승격하거나 롤백하는 판정 스크립트.

CronJob 또는 Argo Workflow step으로 실행한다.

"""

from dataclasses import dataclass

from typing import Optional

@dataclass

class CanaryConfig:

service_name: str

namespace: str

prometheus_url: str

observation_minutes: int = 15

check_interval_seconds: int = 60

error_rate_threshold: float = 0.02 # 2%

p99_latency_threshold_ms: float = 500 # 500ms

min_request_count: int = 100 # 최소 관찰 요청 수

def query_prometheus(config: CanaryConfig, query: str) -> Optional[float]:

"""Prometheus에서 메트릭 값을 조회한다."""

resp = requests.get(

f"{config.prometheus_url}/api/v1/query",

params={"query": query},

timeout=10,

)

results = resp.json().get("data", {}).get("result", [])

if results:

return float(results[0]["value"][1])

return None

def evaluate_canary(config: CanaryConfig) -> dict:

"""Canary 버전의 error rate와 latency를 평가한다."""

error_rate_query = (

f'sum(rate(kserve_request_total{{service_name="{config.service_name}",'

f'namespace="{config.namespace}",response_code=~"5.."}}[5m])) / '

f'sum(rate(kserve_request_total{{service_name="{config.service_name}",'

f'namespace="{config.namespace}"}}[5m]))'

)

p99_query = (

f'histogram_quantile(0.99, sum(rate(kserve_request_duration_seconds_bucket'

f'{{service_name="{config.service_name}",namespace="{config.namespace}"}}'

f'[5m])) by (le)) * 1000'

)

request_count_query = (

f'sum(increase(kserve_request_total{{service_name="{config.service_name}",'

f'namespace="{config.namespace}"}}[{config.observation_minutes}m]))'

)

error_rate = query_prometheus(config, error_rate_query) or 0.0

p99_latency = query_prometheus(config, p99_query) or 0.0

request_count = query_prometheus(config, request_count_query) or 0

return {

"error_rate": error_rate,

"p99_latency_ms": p99_latency,

"request_count": request_count,

"error_rate_ok": error_rate < config.error_rate_threshold,

"latency_ok": p99_latency < config.p99_latency_threshold_ms,

"sufficient_traffic": request_count >= config.min_request_count,

}

def run_canary_judgment(config: CanaryConfig) -> str:

"""

observation_minutes 동안 메트릭을 관찰하고 승격/롤백을 결정한다.

Returns: "promote" or "rollback"

"""

checks_passed = 0

total_checks = config.observation_minutes * 60 // config.check_interval_seconds

for i in range(total_checks):

result = evaluate_canary(config)

print(f"Check {i+1}/{total_checks}: {result}")

if not result["sufficient_traffic"]:

print("Insufficient traffic, waiting...")

time.sleep(config.check_interval_seconds)

continue

if result["error_rate_ok"] and result["latency_ok"]:

checks_passed += 1

else:

즉시 롤백 조건: error rate가 임계치의 3배 초과

if result["error_rate"] > config.error_rate_threshold * 3:

print(f"Immediate rollback: error_rate={result['error_rate']}")

return "rollback"

time.sleep(config.check_interval_seconds)

pass_ratio = checks_passed / max(total_checks, 1)

decision = "promote" if pass_ratio >= 0.9 else "rollback"

print(f"Decision: {decision} (pass_ratio={pass_ratio:.2f})")

return decision

세 도구의 버전 호환성 매트릭스

실제 운영에서 버전 조합 문제가 빈번하게 발생한다. 검증된 조합을 정리한다.

| Kubeflow Pipelines | MLflow | KServe | Kubernetes | Python | 비고 |

| ------------------ | ------ | ------ | ---------- | --------- | ------------------------- |

| v2.3.0 | 2.17.x | 0.14.x | 1.28-1.30 | 3.11 | 2026년 3월 기준 안정 조합 |

| v2.2.0 | 2.15.x | 0.13.x | 1.27-1.29 | 3.10-3.11 | 이전 안정 버전 |

| v2.1.0 | 2.12.x | 0.12.x | 1.26-1.28 | 3.10 | LTS 유지보수 중 |

**주의사항**: MLflow 2.16에서 model signature 검증이 기본값으로 활성화되었다. KServe의 MLflow 서버가 signature가 없는 구 모델을 로드하면 `MlflowException: Model signature is missing` 에러가 발생한다. 기존 모델에 signature를 추가하거나 `MLFLOW_MODEL_SIGNATURE_ENFORCEMENT=disabled` 환경변수를 설정해야 한다.

장애 시나리오별 대응

시나리오 1: Kubeflow Pipeline 완료 후 MLflow에 Run이 기록되지 않음

증상: Pipeline이 성공(green)으로 표시되지만 MLflow UI에 해당 Run이 없음

에러 로그 (Pipeline pod):

requests.exceptions.ConnectionError: HTTPConnectionPool(host='mlflow.ml-platform.svc', port=5000):

Max retries exceeded with url: /api/2.0/mlflow/runs/create

원인: Kubeflow Pipeline pod의 ServiceAccount에 MLflow service에 대한

NetworkPolicy 접근이 허용되지 않음

해결:

1. NetworkPolicy에서 pipeline runner namespace -> mlflow namespace 허용

2. MLflow tracking URI가 cluster-internal DNS인지 확인

3. Pipeline component에 retry 로직 추가

시나리오 2: KServe가 MLflow 모델을 로드하지 못함

증상: InferenceService가 "FailedCreate" 상태로 멈춤

에러 로그 (kserve-container):

mlflow.exceptions.MlflowException: Could not find an "MLmodel" configuration file

at "gs://mlflow-artifacts/3/abc123def/artifacts/model"

원인: MLflow log_model의 artifact_path와 KServe storageUri의 경로 불일치

(artifact_path="model" 이지만 storageUri에 "/model" 이 빠짐)

해결:

storageUri를 artifact_uri + "/model" 형태로 조합해야 한다.

잘못된 예: gs://mlflow-artifacts/3/abc123def/artifacts

올바른 예: gs://mlflow-artifacts/3/abc123def/artifacts/model

시나리오 3: Canary 배포 중 이전 버전으로 롤백 불가

증상: canaryTrafficPercent을 0으로 설정해도 이전 버전으로 안 돌아감

에러 로그 (kserve-controller):

RevisionFailed: Revision "recommendation-classifier-predictor-prev"

has no ready pods

원인: KServe가 이전 Revision의 pod를 scale-to-zero한 상태에서

cold start 시간이 readiness probe timeout을 초과

해결:

1. 서빙 서비스에 minReplicas >= 1 설정 (중요 서비스는 scale-to-zero 비활성화)

2. readinessProbe timeout을 모델 로딩 시간 기준으로 늘림

3. 롤백 시에는 canary 비율 조정이 아닌 storageUri를 이전 버전으로 교체

롤백 시 적용할 KServe 매니페스트

apiVersion: serving.kserve.io/v1beta1

kind: InferenceService

metadata:

name: recommendation-classifier

namespace: model-serving

spec:

predictor:

canaryTrafficPercent: 0 # canary 트래픽 제거

minReplicas: 2 # scale-to-zero 방지

model:

modelFormat:

name: mlflow

이전 안정 버전의 model URI로 교체

storageUri: gs://mlflow-artifacts/3/previous-stable-run/artifacts/model

resources:

requests:

cpu: '1'

memory: '2Gi'

limits:

cpu: '2'

memory: '4Gi'

containerSpec:

readinessProbe:

initialDelaySeconds: 30

timeoutSeconds: 10

periodSeconds: 5

통합 운영 대시보드 구성

세 도구에서 수집해야 할 핵심 메트릭을 Grafana 대시보드 하나에 통합한다.

| 영역 | 메트릭 | 소스 | 알림 임계치 |

| -------- | ------------------------------- | --------------------- | -------------------------- |

| Pipeline | 실행 성공률, 평균 소요 시간 | Kubeflow Pipeline API | 성공률 < 95% |

| 학습 | cv_f1_mean 추이, 학습 시간 | MLflow Tracking | f1 < 이전 버전 - 0.02 |

| Registry | Pending 모델 수, 승격 대기 시간 | MLflow Model Registry | 대기 > 24h |

| 서빙 | error rate, p99 latency, RPS | KServe + Prometheus | error > 1%, p99 > 500ms |

| Canary | canary vs stable 메트릭 비교 | Prometheus | canary error > stable \* 2 |

| 인프라 | GPU 사용률, Pod OOM 횟수 | Kubernetes metrics | OOM > 0/day |

**Q1. Kubeflow Pipeline에서 MLflow tracking URI를 환경변수로 주입하는 이유는?**

||Pipeline component가 Kubernetes Pod로 실행되므로 cluster-internal DNS를 통해 MLflow 서버에 접근해야 하고, 환경별(dev/staging/prod) URI가 다르기 때문이다.||

**Q2. MLflow의 model artifact_path와 KServe storageUri 연결 시 가장 흔한 실수는?**

||MLflow log_model에서 artifact_path="model"로 설정했을 때 storageUri에 "/model" suffix를 빠뜨려서 "MLmodel file not found" 에러가 발생하는 것이다.||

**Q3. Canary 배포에서 자동 롤백 판정 시 error rate만 보면 안 되는 이유는?**

||Error rate가 낮더라도 p99 latency가 급증하면 사용자 경험이 나빠지고, 또한 최소 요청 수(traffic volume) 없이 판정하면 통계적으로 무의미한 결론을 내릴 수 있다.||

**Q4. KServe에서 scale-to-zero를 중요 서비스에 비활성화해야 하는 상황은?**

||모델 로딩 시간이 길어서 cold start가 readiness probe timeout을 초과하는 경우, 또는 롤백 시 이전 버전의 pod이 즉시 필요한 경우에는 minReplicas >= 1로 설정해야 한다.||

**Q5. MLflow 2.16+에서 model signature enforcement가 기존 모델에 미치는 영향은?**

||Signature가 없는 구 모델을 KServe가 로드할 때 MlflowException이 발생하여 서빙이 실패한다. 기존 모델에 signature를 추가하거나 enforcement를 비활성화해야 한다.||

**Q6. Pipeline의 quality gate에서 F1 threshold를 절대값(0.80)으로만 설정하면 생기는 문제는?**

||이전 버전 대비 성능 하락을 감지하지 못한다. 예를 들어 기존 0.92에서 0.81로 급락해도 절대 threshold만 통과하면 배포된다. 상대 비교(이전 버전 대비 delta) 조건을 함께 걸어야 한다.||

**Q7. 세 도구 통합 시 가장 먼저 표준화해야 할 인터페이스는?**

||모델 artifact 경로 규칙이다. MLflow가 저장하는 artifact URI 형식과 KServe가 참조하는 storageUri 형식이 일관되어야 파이프라인에서 서빙까지 자동화된다.||

참고 자료

- [Kubeflow Pipelines v2 공식 문서](https://www.kubeflow.org/docs/components/pipelines/)

- [MLflow 2.17 Release Notes](https://mlflow.org/docs/latest/index.html)

- [KServe 0.14 공식 문서](https://kserve.github.io/website/)

- [KServe + MLflow Integration Guide](https://kserve.github.io/website/latest/modelserving/v1beta1/mlflow/v2/)

- [Kubeflow Pipelines SDK v2 Reference](https://kubeflow-pipelines.readthedocs.io/)

현재 단락 (1/363)

ML 프로젝트가 노트북에서 프로덕션으로 넘어가는 순간, 팀은 세 가지 독립적인 문제와 마주한다.

작성 글자: 0원문 글자: 13,265작성 단락: 0/363