- Published on
Kubeflow Pipelines ML 워크플로우 오케스트레이션 실전 가이드: KFP v2 SDK부터 프로덕션 배포까지
- Authors
- Name
- 들어가며
- Kubeflow Pipelines 아키텍처
- KFP v2 SDK 기본 사용법
- 고급 파이프라인 패턴
- 파이프라인 캐싱과 아티팩트 관리
- 워크플로우 오케스트레이션 도구 비교
- Multi-Step ML 파이프라인 실전 예제
- Kubernetes 리소스 관리
- 운영 시 주의사항
- 장애 사례와 복구 절차
- 프로덕션 체크리스트
- 참고자료

들어가며
ML 프로젝트가 프로덕션 규모로 성장하면, 데이터 전처리에서 모델 학습, 평가, 배포까지의 전체 워크플로우를 안정적으로 관리하는 것이 핵심 과제가 된다. Jupyter Notebook에서의 실험은 재현성이 떨어지고, 수동 스크립트 실행은 오류가 발생하기 쉽다. 이러한 문제를 해결하기 위해 ML 파이프라인 오케스트레이션 도구가 필요하다.
Kubeflow Pipelines(KFP) 는 Google이 주도하는 오픈소스 프로젝트로, Kubernetes 위에서 ML 워크플로우를 정의하고 실행하는 플랫폼이다. 각 단계를 독립적인 컨테이너로 실행하여 재현성을 보장하고, 파이프라인 버전 관리와 실험 추적을 지원한다. 이 글에서는 KFP v2 SDK의 아키텍처부터 실전 파이프라인 구축, 프로덕션 운영 전략까지를 상세히 다룬다.
Kubeflow Pipelines 아키텍처
핵심 컴포넌트 구조
Kubeflow Pipelines는 여러 마이크로서비스로 구성된다.
| 컴포넌트 | 역할 | 기술 스택 |
|---|---|---|
| Pipeline Service | 파이프라인 CRUD, 실행 관리 | gRPC/REST API |
| Metadata Service | 아티팩트 및 실행 메타데이터 저장 | ML Metadata (MLMD) |
| Persistence Agent | 워크플로우 상태를 DB에 동기화 | Kubernetes Controller |
| Scheduler | 반복 실행(Recurring Run) 관리 | CronJob 기반 |
| UI Server | 웹 대시보드 | React 기반 SPA |
| Artifact Store | 파이프라인 산출물 저장 | MinIO / S3 / GCS |
KFP v2 아키텍처 변경사항
KFP v2는 v1 대비 근본적인 아키텍처 변경이 이루어졌다. 기존 Argo Workflows 의존성을 제거하고, 자체 워크플로우 엔진을 도입했다.
# KFP v2 vs v1 주요 차이점 비교
"""
KFP v1:
- Argo Workflows 기반 실행
- kfp.dsl.ContainerOp 사용
- YAML 기반 파이프라인 정의 가능
KFP v2:
- 자체 워크플로우 엔진 (또는 Argo 선택 가능)
- kfp.dsl.component 데코레이터 사용
- IR (Intermediate Representation) YAML 도입
- ML Metadata 네이티브 통합
- 타입 안전한 컴포넌트 인터페이스
"""
# v2 아키텍처 레이어
ARCHITECTURE_LAYERS = {
"SDK Layer": "Python DSL로 파이프라인 정의 (kfp.dsl)",
"IR Layer": "플랫폼 독립적 중간 표현 (PipelineSpec YAML)",
"Backend Layer": "파이프라인 실행 및 관리 (API Server)",
"Runtime Layer": "컨테이너 오케스트레이션 (K8s Pod)",
"Metadata Layer": "실행 이력 및 아티팩트 추적 (MLMD)",
}
KFP v2 SDK 기본 사용법
컴포넌트 작성
KFP v2에서 컴포넌트는 @component 데코레이터로 정의한다. 각 컴포넌트는 독립적인 컨테이너에서 실행된다.
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics
# 경량 Python 컴포넌트 (의존성이 적은 경우)
@dsl.component(
base_image="python:3.11-slim",
packages_to_install=["pandas==2.1.4", "scikit-learn==1.4.0"],
)
def preprocess_data(
raw_data_path: str,
test_size: float,
train_dataset: Output[Dataset],
test_dataset: Output[Dataset],
metrics: Output[Metrics],
):
"""데이터 전처리 컴포넌트"""
import pandas as pd
from sklearn.model_selection import train_test_split
df = pd.read_csv(raw_data_path)
# 결측값 처리
df = df.dropna(subset=["target"])
df = df.fillna(df.median(numeric_only=True))
# 학습/테스트 분리
train_df, test_df = train_test_split(
df, test_size=test_size, random_state=42, stratify=df["target"]
)
# 아티팩트로 저장
train_df.to_csv(train_dataset.path, index=False)
test_df.to_csv(test_dataset.path, index=False)
# 메트릭 로깅
metrics.log_metric("total_samples", len(df))
metrics.log_metric("train_samples", len(train_df))
metrics.log_metric("test_samples", len(test_df))
metrics.log_metric("feature_count", len(df.columns) - 1)
커스텀 컨테이너 컴포넌트
무거운 의존성이 필요한 경우 커스텀 컨테이너 이미지를 사용한다.
# 커스텀 이미지 기반 컴포넌트
@dsl.component(
base_image="gcr.io/my-project/ml-training:v2.1",
)
def train_model(
train_dataset: Input[Dataset],
model_type: str,
hyperparameters: dict,
trained_model: Output[Model],
metrics: Output[Metrics],
):
"""모델 학습 컴포넌트"""
import pandas as pd
import joblib
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import accuracy_score, f1_score
train_df = pd.read_csv(train_dataset.path)
X_train = train_df.drop("target", axis=1)
y_train = train_df["target"]
# 모델 선택
model_map = {
"random_forest": RandomForestClassifier,
"gradient_boosting": GradientBoostingClassifier,
}
model_cls = model_map[model_type]
model = model_cls(**hyperparameters)
model.fit(X_train, y_train)
# 모델 저장
joblib.dump(model, trained_model.path)
# 학습 메트릭
y_pred = model.predict(X_train)
metrics.log_metric("train_accuracy", accuracy_score(y_train, y_pred))
metrics.log_metric("train_f1", f1_score(y_train, y_pred, average="weighted"))
metrics.log_metric("model_type", model_type)
# 모델 메타데이터
trained_model.metadata["framework"] = "scikit-learn"
trained_model.metadata["model_type"] = model_type
파이프라인 정의
컴포넌트를 조합하여 전체 파이프라인을 정의한다.
from kfp import dsl, compiler
@dsl.pipeline(
name="ml-training-pipeline",
description="End-to-end ML training pipeline with evaluation and deployment",
)
def ml_training_pipeline(
raw_data_path: str = "gs://my-bucket/data/raw.csv",
test_size: float = 0.2,
model_type: str = "random_forest",
accuracy_threshold: float = 0.85,
):
# Step 1: 데이터 전처리
preprocess_task = preprocess_data(
raw_data_path=raw_data_path,
test_size=test_size,
)
preprocess_task.set_cpu_limit("2")
preprocess_task.set_memory_limit("4Gi")
# Step 2: 모델 학습
train_task = train_model(
train_dataset=preprocess_task.outputs["train_dataset"],
model_type=model_type,
hyperparameters={
"n_estimators": 200,
"max_depth": 10,
"min_samples_split": 5,
},
)
train_task.set_cpu_limit("4")
train_task.set_memory_limit("8Gi")
train_task.set_accelerator_type("nvidia.com/gpu")
train_task.set_accelerator_limit(1)
# Step 3: 모델 평가
eval_task = evaluate_model(
test_dataset=preprocess_task.outputs["test_dataset"],
trained_model=train_task.outputs["trained_model"],
accuracy_threshold=accuracy_threshold,
)
# Step 4: 조건부 배포 (정확도 임계값 초과 시)
with dsl.Condition(
eval_task.outputs["deploy_decision"] == "approved",
name="check-accuracy",
):
deploy_task = deploy_model(
model=train_task.outputs["trained_model"],
serving_endpoint="ml-model-serving",
)
# 파이프라인 컴파일
compiler.Compiler().compile(
pipeline_func=ml_training_pipeline,
package_path="ml_pipeline.yaml",
)
고급 파이프라인 패턴
병렬 실행과 조건 분기
@dsl.pipeline(name="parallel-training-pipeline")
def parallel_training_pipeline(
raw_data_path: str,
accuracy_threshold: float = 0.85,
):
# 데이터 전처리 (공통)
preprocess_task = preprocess_data(
raw_data_path=raw_data_path,
test_size=0.2,
)
# 여러 모델 병렬 학습
models = ["random_forest", "gradient_boosting", "xgboost"]
train_tasks = []
for model_type in models:
train_task = train_model(
train_dataset=preprocess_task.outputs["train_dataset"],
model_type=model_type,
hyperparameters={"n_estimators": 200, "max_depth": 10},
)
train_task.set_display_name(f"Train {model_type}")
train_tasks.append(train_task)
# 최적 모델 선택
select_task = select_best_model(
models=[t.outputs["trained_model"] for t in train_tasks],
metrics=[t.outputs["metrics"] for t in train_tasks],
)
# 챔피언 모델 배포
with dsl.Condition(
select_task.outputs["best_accuracy"] >= accuracy_threshold,
name="accuracy-gate",
):
deploy_model(
model=select_task.outputs["best_model"],
serving_endpoint="champion-model",
)
반복 실행과 Exit Handler
@dsl.pipeline(name="robust-ml-pipeline")
def robust_ml_pipeline(raw_data_path: str):
# Exit Handler: 파이프라인 완료/실패 시 알림 전송
notify_task = send_notification(
pipeline_name="robust-ml-pipeline",
notification_channel="slack",
)
with dsl.ExitHandler(exit_task=notify_task):
# 메인 파이프라인 로직
preprocess_task = preprocess_data(
raw_data_path=raw_data_path,
test_size=0.2,
)
train_task = train_model(
train_dataset=preprocess_task.outputs["train_dataset"],
model_type="gradient_boosting",
hyperparameters={"n_estimators": 300, "max_depth": 12},
)
eval_task = evaluate_model(
test_dataset=preprocess_task.outputs["test_dataset"],
trained_model=train_task.outputs["trained_model"],
accuracy_threshold=0.85,
)
# Recurring Run 설정 (KFP 클라이언트)
from kfp.client import Client
client = Client(host="https://kubeflow.example.com/pipeline")
# 매일 오전 2시에 파이프라인 실행
client.create_recurring_run(
experiment_id="daily-training-exp",
job_name="daily-model-retraining",
pipeline_id="robust-ml-pipeline-v2",
cron_expression="0 2 * * *",
max_concurrency=1,
params={
"raw_data_path": "gs://my-bucket/data/daily/latest.csv",
},
)
파이프라인 캐싱과 아티팩트 관리
캐싱 전략
KFP는 컴포넌트 입력이 동일하면 이전 실행 결과를 재사용하는 캐싱 기능을 지원한다.
# 캐싱 설정
@dsl.pipeline(name="cached-pipeline")
def cached_pipeline(raw_data_path: str):
# 캐싱 활성화 (기본값: True)
preprocess_task = preprocess_data(
raw_data_path=raw_data_path,
test_size=0.2,
)
preprocess_task.set_caching_options(enable_caching=True)
# 학습 단계는 캐싱 비활성화 (최신 데이터로 항상 재학습)
train_task = train_model(
train_dataset=preprocess_task.outputs["train_dataset"],
model_type="random_forest",
hyperparameters={"n_estimators": 200},
)
train_task.set_caching_options(enable_caching=False)
아티팩트 타입과 관리
from kfp.dsl import (
Input, Output,
Dataset, Model, Metrics,
ClassificationMetrics, SlicedClassifications,
Artifact, HTML, Markdown,
)
@dsl.component(base_image="python:3.11-slim")
def generate_evaluation_report(
test_dataset: Input[Dataset],
trained_model: Input[Model],
classification_metrics: Output[ClassificationMetrics],
html_report: Output[HTML],
eval_metrics: Output[Metrics],
):
"""평가 보고서 생성 컴포넌트"""
import json
# ClassificationMetrics: 혼동 행렬 시각화
classification_metrics.log_confusion_matrix(
categories=["negative", "positive"],
matrix=[[850, 50], [30, 270]],
)
# ROC 커브 로깅
classification_metrics.log_roc_curve(
fpr=[0.0, 0.1, 0.2, 0.5, 1.0],
tpr=[0.0, 0.6, 0.8, 0.95, 1.0],
threshold=[1.0, 0.8, 0.5, 0.2, 0.0],
)
# HTML 리포트 생성
report_content = "<h1>Model Evaluation Report</h1>"
report_content += "<p>Accuracy: 0.933</p>"
report_content += "<p>F1 Score: 0.891</p>"
with open(html_report.path, "w") as f:
f.write(report_content)
# 수치 메트릭
eval_metrics.log_metric("accuracy", 0.933)
eval_metrics.log_metric("f1_score", 0.891)
eval_metrics.log_metric("precision", 0.844)
eval_metrics.log_metric("recall", 0.900)
워크플로우 오케스트레이션 도구 비교
ML 워크플로우 오케스트레이션에 사용할 수 있는 도구는 여러 가지가 있다. 프로젝트 요구사항에 따라 적절한 도구를 선택해야 한다.
| 특성 | Kubeflow Pipelines | Apache Airflow | Argo Workflows | Prefect |
|---|---|---|---|---|
| 주요 용도 | ML 파이프라인 전용 | 범용 데이터 파이프라인 | 범용 워크플로우 | 범용 데이터 파이프라인 |
| 실행 환경 | Kubernetes 필수 | 다양한 Executor | Kubernetes 필수 | 하이브리드 (서버/클라우드) |
| ML 네이티브 | 높음 (MLMD, 아티팩트) | 낮음 (플러그인 필요) | 중간 | 중간 |
| UI/시각화 | ML 실험 대시보드 | DAG 모니터링 | 워크플로우 시각화 | 플로우 대시보드 |
| 캐싱 | 컴포넌트 레벨 캐싱 | 태스크 레벨 캐싱 | Memoization | 태스크 레벨 캐싱 |
| 스케일링 | Kubernetes 네이티브 | Celery/K8s Executor | Kubernetes 네이티브 | Dask/Ray 통합 |
| 학습 곡선 | 높음 | 중간 | 높음 | 낮음 |
| 커뮤니티 | 활발 (CNCF) | 매우 활발 (Apache) | 활발 (CNCF) | 성장 중 |
| GPU 지원 | 네이티브 | 제한적 | 네이티브 | 외부 통합 필요 |
선택 기준
- Kubeflow Pipelines: Kubernetes 인프라가 있고 ML 전용 파이프라인이 필요한 경우
- Airflow: 데이터 엔지니어링과 ML을 함께 관리하고 성숙한 에코시스템이 필요한 경우
- Argo Workflows: Kubernetes 네이티브 범용 워크플로우가 필요하고 ML 특화 기능은 직접 구현 가능한 경우
- Prefect: 빠른 시작과 유연한 배포 환경이 필요한 경우
Multi-Step ML 파이프라인 실전 예제
전체 파이프라인: 데이터 준비에서 배포까지
from kfp import dsl, compiler
from kfp.dsl import Input, Output, Dataset, Model, Metrics
@dsl.component(
base_image="python:3.11-slim",
packages_to_install=["pandas==2.1.4", "great-expectations==0.18.8"],
)
def validate_data(
raw_data_path: str,
validated_data: Output[Dataset],
validation_metrics: Output[Metrics],
) -> str:
"""데이터 품질 검증"""
import pandas as pd
df = pd.read_csv(raw_data_path)
# 기본 데이터 품질 검사
checks = {
"row_count_check": len(df) > 100,
"null_ratio_check": df.isnull().mean().max() < 0.3,
"duplicate_check": df.duplicated().mean() < 0.05,
"target_balance_check": df["target"].value_counts(normalize=True).min() > 0.1,
}
all_passed = all(checks.values())
for check_name, passed in checks.items():
validation_metrics.log_metric(check_name, int(passed))
validation_metrics.log_metric("total_rows", len(df))
validation_metrics.log_metric("all_checks_passed", int(all_passed))
if all_passed:
df.to_csv(validated_data.path, index=False)
return "passed"
else:
failed = [k for k, v in checks.items() if not v]
raise ValueError(f"Data validation failed: {failed}")
@dsl.component(
base_image="python:3.11-slim",
packages_to_install=["pandas==2.1.4", "scikit-learn==1.4.0"],
)
def feature_engineering(
validated_data: Input[Dataset],
feature_config: dict,
features_dataset: Output[Dataset],
feature_metrics: Output[Metrics],
):
"""피처 엔지니어링"""
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, LabelEncoder
df = pd.read_csv(validated_data.path)
# 수치형 피처 스케일링
numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
numeric_cols = [c for c in numeric_cols if c != "target"]
scaler = StandardScaler()
df[numeric_cols] = scaler.fit_transform(df[numeric_cols])
# 범주형 피처 인코딩
cat_cols = df.select_dtypes(include=["object"]).columns.tolist()
for col in cat_cols:
le = LabelEncoder()
df[col] = le.fit_transform(df[col].astype(str))
df.to_csv(features_dataset.path, index=False)
feature_metrics.log_metric("numeric_features", len(numeric_cols))
feature_metrics.log_metric("categorical_features", len(cat_cols))
feature_metrics.log_metric("total_features", len(df.columns) - 1)
@dsl.component(
base_image="gcr.io/my-project/ml-serving:v1.0",
)
def deploy_to_kserve(
model: Input[Model],
serving_endpoint: str,
namespace: str,
) -> str:
"""KServe에 모델 배포"""
import subprocess
import json
import yaml
inference_service = {
"apiVersion": "serving.kserve.io/v1beta1",
"kind": "InferenceService",
"metadata": {
"name": serving_endpoint,
"namespace": namespace,
},
"spec": {
"predictor": {
"model": {
"modelFormat": {"name": "sklearn"},
"storageUri": model.uri,
"resources": {
"requests": {"cpu": "1", "memory": "2Gi"},
"limits": {"cpu": "2", "memory": "4Gi"},
},
}
}
},
}
manifest_path = "/tmp/isvc.yaml"
with open(manifest_path, "w") as f:
yaml.dump(inference_service, f)
result = subprocess.run(
["kubectl", "apply", "-f", manifest_path],
capture_output=True, text=True,
)
if result.returncode != 0:
raise RuntimeError(f"Deploy failed: {result.stderr}")
return f"Deployed to {namespace}/{serving_endpoint}"
@dsl.pipeline(
name="e2e-ml-pipeline",
description="데이터 검증부터 모델 배포까지 전체 ML 파이프라인",
)
def e2e_ml_pipeline(
raw_data_path: str = "gs://ml-data/raw/dataset.csv",
model_type: str = "gradient_boosting",
accuracy_threshold: float = 0.85,
serving_endpoint: str = "fraud-detector",
namespace: str = "ml-serving",
):
# 알림을 위한 Exit Handler
notify = send_notification(
pipeline_name="e2e-ml-pipeline",
notification_channel="slack",
)
with dsl.ExitHandler(exit_task=notify):
# 1. 데이터 검증
validate_task = validate_data(raw_data_path=raw_data_path)
# 2. 피처 엔지니어링
feature_task = feature_engineering(
validated_data=validate_task.outputs["validated_data"],
feature_config={"scaling": "standard", "encoding": "label"},
)
# 3. 데이터 분할
split_task = preprocess_data(
raw_data_path=feature_task.outputs["features_dataset"].uri,
test_size=0.2,
)
# 4. 모델 학습
train_task = train_model(
train_dataset=split_task.outputs["train_dataset"],
model_type=model_type,
hyperparameters={"n_estimators": 300, "max_depth": 12},
)
train_task.set_cpu_limit("4")
train_task.set_memory_limit("16Gi")
# 5. 모델 평가
eval_task = evaluate_model(
test_dataset=split_task.outputs["test_dataset"],
trained_model=train_task.outputs["trained_model"],
accuracy_threshold=accuracy_threshold,
)
# 6. 조건부 배포
with dsl.Condition(
eval_task.outputs["deploy_decision"] == "approved",
name="deploy-gate",
):
deploy_to_kserve(
model=train_task.outputs["trained_model"],
serving_endpoint=serving_endpoint,
namespace=namespace,
)
# 컴파일 및 제출
compiler.Compiler().compile(
pipeline_func=e2e_ml_pipeline,
package_path="e2e_ml_pipeline.yaml",
)
Kubernetes 리소스 관리
Pod 리소스 및 노드 어피니티 설정
@dsl.pipeline(name="resource-managed-pipeline")
def resource_managed_pipeline():
train_task = train_model(
train_dataset=preprocess_task.outputs["train_dataset"],
model_type="xgboost",
hyperparameters={"n_estimators": 500},
)
# 리소스 제한
train_task.set_cpu_limit("8")
train_task.set_memory_limit("32Gi")
train_task.set_accelerator_type("nvidia.com/gpu")
train_task.set_accelerator_limit(2)
# 노드 셀렉터 (GPU 노드에서 실행)
train_task.add_node_selector_constraint(
label_name="cloud.google.com/gke-accelerator",
value="nvidia-tesla-v100",
)
# Toleration 설정
train_task.set_gpu_limit(2).add_toleration(
key="nvidia.com/gpu",
operator="Exists",
effect="NoSchedule",
)
# PVC 마운트 (대용량 데이터)
train_task.add_pvolumes({
"/mnt/data": dsl.PipelineVolume(
pvc="ml-data-pvc",
volume_name="data-volume",
),
})
# 타임아웃 설정 (초 단위)
train_task.set_timeout(3600) # 1시간
# 재시도 설정
train_task.set_retry(
num_retries=3,
policy="Always",
backoff_duration="30s",
backoff_factor=2.0,
backoff_max_duration="600s",
)
운영 시 주의사항
리소스 관련 주의점
- 메모리 OOM: 대용량 데이터셋을 처리하는 컴포넌트는 충분한 메모리를 할당해야 한다. Pandas의
read_csv는 데이터 크기의 3-5배 메모리를 소비한다. - GPU 리소스 경합: 여러 파이프라인이 동시에 GPU를 요청하면 Pending 상태가 길어진다. ResourceQuota와 PriorityClass를 설정하라.
- PVC 동시 접근: ReadWriteOnce PVC는 하나의 Pod만 마운트 가능하다. 병렬 컴포넌트가 동일 PVC에 접근하면 실패한다.
보안 관련 주의점
- 시크릿 관리: 파이프라인 파라미터에 API 키나 비밀번호를 직접 전달하지 마라. Kubernetes Secret을 환경 변수로 마운트하라.
- 이미지 취약점: 베이스 이미지의 보안 취약점을 정기적으로 스캔하라.
python:3.11-slim대신distroless이미지 사용을 고려하라. - RBAC 설정: 파이프라인 서비스 계정에 최소 권한 원칙을 적용하라.
장애 사례와 복구 절차
사례 1: Pod OOMKilled
증상: 컴포넌트 Pod이 OOMKilled 상태로 실패
# Pod 상태 확인
kubectl get pods -n kubeflow -l pipeline/runid=run-abc123
kubectl describe pod train-model-xxxxx -n kubeflow
# 이벤트에서 OOMKilled 확인
# Last State: Terminated
# Reason: OOMKilled
# Exit Code: 137
복구 절차:
# 1. 메모리 제한 증가
train_task.set_memory_limit("64Gi")
# 2. 데이터를 청크 단위로 처리하도록 컴포넌트 수정
@dsl.component(base_image="python:3.11-slim")
def train_with_chunks(
train_dataset: Input[Dataset],
chunk_size: int,
trained_model: Output[Model],
):
import pandas as pd
from sklearn.linear_model import SGDClassifier
model = SGDClassifier(loss="log_loss")
chunks = pd.read_csv(train_dataset.path, chunksize=chunk_size)
for chunk in chunks:
X = chunk.drop("target", axis=1)
y = chunk["target"]
model.partial_fit(X, y, classes=[0, 1])
import joblib
joblib.dump(model, trained_model.path)
사례 2: 파이프라인 버전 충돌
증상: 파이프라인 업데이트 후 기존 Recurring Run이 실패
복구 절차:
from kfp.client import Client
client = Client(host="https://kubeflow.example.com/pipeline")
# 1. 기존 Recurring Run 비활성화
client.disable_recurring_run(recurring_run_id="run-xxx")
# 2. 새 파이프라인 버전 업로드
pipeline_version = client.upload_pipeline_version(
pipeline_package_path="ml_pipeline_v3.yaml",
pipeline_version_name="v3.0",
pipeline_id="ml-training-pipeline",
)
# 3. 새 Recurring Run 생성
client.create_recurring_run(
experiment_id="daily-training-exp",
job_name="daily-model-retraining-v3",
version_id=pipeline_version.pipeline_version_id,
cron_expression="0 2 * * *",
max_concurrency=1,
)
사례 3: 메타데이터 DB 연결 실패
증상: ML Metadata Service 연결 오류로 아티팩트 추적 실패
# MLMD 서비스 상태 확인
kubectl get pods -n kubeflow -l app=metadata-grpc-server
kubectl logs metadata-grpc-server-xxxxx -n kubeflow
# MySQL/PostgreSQL 연결 확인
kubectl exec -it metadata-grpc-server-xxxxx -n kubeflow -- \
mysql -h metadata-db -u root -p -e "SHOW DATABASES;"
# MLMD 서비스 재시작
kubectl rollout restart deployment metadata-grpc-server -n kubeflow
프로덕션 체크리스트
인프라 설정
- Kubernetes 클러스터에 Kubeflow Pipelines 설치 및 버전 확인 (KFP v2 권장)
- Artifact Store (MinIO/S3/GCS) 설정 및 접근 권한 확인
- Metadata DB (MySQL/PostgreSQL) 고가용성 구성
- RBAC 및 네임스페이스 격리 설정
- GPU 노드풀 및 오토스케일링 구성
파이프라인 개발
- 모든 컴포넌트에 리소스 제한(CPU/Memory/GPU) 설정
- 재시도 정책 및 타임아웃 설정
- 캐싱 전략 수립 (어떤 스텝을 캐싱할지 결정)
- 파이프라인 파라미터 기본값 설정
- 데이터 검증 컴포넌트 포함
운영 및 모니터링
- Recurring Run 설정 및 동시성 제한
- 파이프라인 실패 알림 (Slack/PagerDuty) 설정
- 아티팩트 스토리지 용량 모니터링
- MLMD 백업 스케줄 설정
- 파이프라인 실행 이력 정리 정책 (retention policy)
보안
- 컨테이너 이미지 취약점 스캔 자동화
- Kubernetes Secret으로 민감 정보 관리
- 네트워크 정책으로 Pod 간 통신 제한
- 서비스 계정에 최소 권한 원칙 적용