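Kubeflow Pipelines v2 ML Workflow Automation and Operations Guide
- Overview
- KFP v2 Architecture
- KFP SDK v2 Pipeline Definition
- Component Types and Usage
- Artifact Management and Caching Strategies
- CI/CD Integration
- ML Workflow Orchestration Tool Comparison
- Kubernetes Extension Features
- Monitoring and Troubleshooting
- Production Operations Checklist
- References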

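Overview
Developing an ML model and operating it reliably in production are entirely different problems. Manually managing a pipeline that spans data preprocessing, training, evaluation, and deployment leads to recurring issues such as poor reproducibility, failed experiment tracking, and deployment delays. Kubeflow Pipelines (KFP) v2 is a framework for declaratively defining and automating ML workflows on Kubernetes, and it lets you build complex ML pipelines with nothing more than Python decorators.
KFP v2 is a major improvement over v1. Pipelines now compile to IR (Intermediate Representation) YAML instead of Argo Workflow YAML, which allows multiple execution backends. The artifact system has been strengthened, and type safety has improved. This article covers KFP v2's architecture, SDK usage, caching strategies, CI/CD integration, and production troubleshooting from a practical perspective.
KFP v2 Architecture
Core Components
The KFP v2 architecture consists of the following core layers.
| Component | Role | Tech Stack |
|---|---|---|
| KFP SDK | Pipeline/component definition, compilation | Python (kfp package) |
| IR Compiler | Converts Python DSL to IR YAML | Protocol Buffers based |
| KFP Backend | Pipeline execution management, API server | Go, gRPC/REST |
| Workflow Engine | Actual workflow orchestration | Argo Workflows / Tekton |
| Metadata Store | Execution metadata, artifact tracking | ML Metadata (MLMD) |
| Artifact Store | Stores models, datasets, and other artifacts | MinIO / GCS / S3 |
| UI Dashboard | Pipeline visualization, execution monitoring | React-based web UI |
The biggest change in KFP v2 is the introduction of IR YAML. In v1, pipelines were compiled directly into Argo Workflow YAML, which coupled them tightly to Argo. In v2, pipelines are first compiled into an intermediate representation (IR), which each backend driver then interprets and executes. As a result, the same pipeline definition can run not only on a Kubeflow cluster but also on Google Vertex AI Pipelines.
Installation and Cluster Setup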
# Install the KFP SDK v2
pip install kfp==2.7.0
# Kubernetes extension library (GPU, volumes, and other K8s-specific features)
pip install kfp-kubernetes==1.2.0
# Deploy the Kubeflow Pipelines backend to the Kubernetes cluster
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=2.2.0"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=2.2.0"
# Access the UI through port forwarding
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80
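KFP SDK v2 Pipeline Definition
Basic Components and Pipelines
In KFP v2, pipelines are defined with the @dsl.component and @dsl.pipeline decorators. A component is the smallest execution unit in a pipeline, and each component runs in its own isolated container.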
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"]
)
def preprocess_data(
    raw_data_path: str,
    test_size: float,
    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    data_stats: Output[Metrics]
):
    """Load data and split it into train/test sets."""
    import pandas as pd
    from sklearn.model_selection import train_test_split

    # Note: reading a gs:// path with pandas additionally requires the gcsfs package
    df = pd.read_csv(raw_data_path)
    # Handle missing values: drop unlabeled rows, fill numeric gaps with the median
    df = df.dropna(subset=["target"])
    df = df.fillna(df.median(numeric_only=True))
    train_df, test_df = train_test_split(
        df, test_size=test_size, random_state=42, stratify=df["target"]
    )
    train_df.to_csv(train_dataset.path, index=False)
    test_df.to_csv(test_dataset.path, index=False)
    # Log metrics
    data_stats.log_metric("total_rows", len(df))
    data_stats.log_metric("train_rows", len(train_df))
    data_stats.log_metric("test_rows", len(test_df))
    data_stats.log_metric("feature_count", len(df.columns) - 1)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "joblib==1.3.0"]
)
def train_model(
    train_dataset: Input[Dataset],
    n_estimators: int,
    max_depth: int,
    trained_model: Output[Model],
    training_metrics: Output[Metrics]
):
    """Train a Random Forest model."""
    import pandas as pd
    import joblib
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score, f1_score

    train_df = pd.read_csv(train_dataset.path)
    X_train = train_df.drop(columns=["target"])
    y_train = train_df["target"]
    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42,
        n_jobs=-1
    )
    model.fit(X_train, y_train)
    # Log training accuracy
    train_pred = model.predict(X_train)
    training_metrics.log_metric("train_accuracy", accuracy_score(y_train, train_pred))
    training_metrics.log_metric("train_f1", f1_score(y_train, train_pred, average="weighted"))
    # Save the model and attach descriptive metadata to the artifact
    joblib.dump(model, trained_model.path)
    trained_model.metadata["framework"] = "sklearn"
    trained_model.metadata["model_type"] = "RandomForestClassifier"


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "joblib==1.3.0"]
)
def evaluate_model(
    test_dataset: Input[Dataset],
    trained_model: Input[Model],
    eval_metrics: Output[Metrics],
    accuracy_threshold: float = 0.85
) -> bool:
    """Evaluate the model on test data and check whether it passes the threshold."""
    import pandas as pd
    import joblib
    from sklearn.metrics import accuracy_score, f1_score

    test_df = pd.read_csv(test_dataset.path)
    X_test = test_df.drop(columns=["target"])
    y_test = test_df["target"]
    model = joblib.load(trained_model.path)
    predictions = model.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)
    f1 = f1_score(y_test, predictions, average="weighted")
    passed = bool(accuracy >= accuracy_threshold)
    eval_metrics.log_metric("test_accuracy", accuracy)
    eval_metrics.log_metric("test_f1", f1)
    # log_metric expects a numeric value, so record the flag as 1.0 / 0.0
    eval_metrics.log_metric("passed_threshold", float(passed))
    return passed
Pipeline Composition and Conditional Execution
@dsl.pipeline(
    name="ml-training-pipeline",
    description="Data preprocessing -> Training -> Evaluation -> Conditional deployment pipeline"
)
def ml_training_pipeline(
    raw_data_path: str = "gs://my-bucket/data/raw.csv",
    test_size: float = 0.2,
    n_estimators: int = 100,
    max_depth: int = 10,
    accuracy_threshold: float = 0.85
):
    # Step 1: Data preprocessing
    preprocess_task = preprocess_data(
        raw_data_path=raw_data_path,
        test_size=test_size
    )
    preprocess_task.set_display_name("Data Preprocessing")
    # Step 2: Model training
    train_task = train_model(
        train_dataset=preprocess_task.outputs["train_dataset"],
        n_estimators=n_estimators,
        max_depth=max_depth
    )
    train_task.set_display_name("Model Training")
    train_task.set_cpu_limit("4")
    train_task.set_memory_limit("8Gi")
    # Step 3: Model evaluation
    eval_task = evaluate_model(
        test_dataset=preprocess_task.outputs["test_dataset"],
        trained_model=train_task.outputs["trained_model"],
        accuracy_threshold=accuracy_threshold
    )
    eval_task.set_display_name("Model Evaluation")
    # Step 4: Conditional deployment (only if the accuracy threshold is met)
    # deploy_model is assumed to be a component defined elsewhere; it is not shown in this guide
    with dsl.If(eval_task.output == True):
        deploy_task = deploy_model(
            model=train_task.outputs["trained_model"],
            model_name="fraud-detector",
            serving_endpoint="https://serving.example.com"
        )
        deploy_task.set_display_name("Model Deployment")


# Compile the pipeline
from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=ml_training_pipeline,
    package_path="ml_training_pipeline.yaml"
)
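Component Types and Usage
KFP v2 supports three component types, each suited to different scenarios.
| Component Type | Definition Method | Best For | Pros | Cons |
|---|---|---|---|---|
| Lightweight Python | @dsl.component decorator | Pure Python logic, rapid prototyping | Code and definition in one place, fast iteration | Cannot reference external files, imports must be inside the function |
| Container | @dsl.container_component | Using existing Docker images, non-Python tasks | Language agnostic, reuses existing images | Limited artifact type support |
| Importer | dsl.importer() | Bringing external artifacts into a pipeline | Tracks existing artifacts within the pipeline | No data movement, only metadata registration |
Container Component Example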
@dsl.container_component
def run_spark_job(
    input_data: Input[Dataset],
    output_data: Output[Dataset],
    spark_config: str
):
    """Run a Spark job as a container."""
    return dsl.ContainerSpec(
        image="my-registry/spark-processor:3.5",
        command=["spark-submit"],
        args=[
            "--master", "k8s://https://kubernetes.default.svc",
            "--conf", spark_config,
            "--input", input_data.path,
            "--output", output_data.path,
            "/app/etl_job.py"
        ]
    )


# Using Importer: bring an external model into the pipeline
@dsl.pipeline(name="model-comparison-pipeline")
def comparison_pipeline():
    existing_model = dsl.importer(
        artifact_uri="gs://models/production/v2.1/model.pkl",
        artifact_class=Model,
        reimport=False,
        metadata={"version": "2.1", "framework": "sklearn"}
    )
    # Compare the existing model with a new one.
    # compare_models and train_task are assumed to be defined elsewhere
    # (a comparison component and an upstream training task in this pipeline).
    compare_task = compare_models(
        baseline_model=existing_model.output,
        candidate_model=train_task.outputs["trained_model"]
    )
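Artifact Management and Caching Strategies
Artifact System
KFP v2's artifact system uses ML Metadata (MLMD) to track every input and output. The key artifact types are as follows.
| Artifact Type | Purpose | Examples |
|---|---|---|
| Dataset | Datasets | CSV, Parquet files |
| Model | Trained models | pickle, ONNX, SavedModel |
| Metrics | Numeric metrics | accuracy, loss, f1-score |
| ClassificationMetrics | Classification metrics | confusion matrix, ROC curve |
| HTML | HTML reports | Visualization reports |
| Markdown | Markdown reports | Text-based reports |
| Artifact | General artifacts | Other files |
Caching Strategies
KFP v2 supports automatic caching at the component level. Reusing the results of components that already ran with identical inputs and code can significantly reduce execution time.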
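def build_caching_pipeline(retrain: bool = False):
    """Build the pipeline. Caching options are fixed when the pipeline is compiled."""

    @dsl.pipeline(name="caching-example")
    def caching_pipeline(data_path: str, test_size: float = 0.2):
        # Enable caching for data preprocessing (reuse if the data is the same)
        preprocess_task = preprocess_data(raw_data_path=data_path, test_size=test_size)
        preprocess_task.set_caching_options(True)
        # Training: set_caching_options() is evaluated at compile time, so `retrain`
        # is a plain Python flag here rather than a runtime pipeline parameter
        train_task = train_model(
            train_dataset=preprocess_task.outputs["train_dataset"],
            n_estimators=200,
            max_depth=15
        )
        train_task.set_caching_options(not retrain)  # retrain=True forces retraining
        # Disable caching for components that call external APIs
        deploy_task = deploy_model(model=train_task.outputs["trained_model"])
        deploy_task.set_caching_options(False)  # Always execute fresh

    return caching_pipeline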
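There are a few things to watch for when operating with caching. First, caching is only meaningful when the component code is a pure function (the same input produces the same output). Second, the KFP v2 SDK does not support a cache expiration time, so components that handle time-sensitive data should have caching disabled. Third, setting the environment variable KFP_DISABLE_EXECUTION_CACHING_BY_DEFAULT to true disables caching by default for all pipelines.
CI/CD Integration
Automated Pipeline Deployment with GitHub Actions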
# .github/workflows/kfp-deploy.yaml
name: KFP Pipeline CI/CD
on:
  push:
    branches: [main]
    paths:
      - 'pipelines/**'
      - 'components/**'
env:
  KFP_HOST: ${{ secrets.KFP_HOST }}
  KFP_NAMESPACE: kubeflow
jobs:
  validate-and-compile:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'
      - name: Install dependencies
        run: |
          pip install kfp==2.7.0 kfp-kubernetes==1.2.0
          pip install pytest
      - name: Lint pipeline code
        run: |
          pip install ruff
          ruff check pipelines/ components/
      - name: Run unit tests
        run: pytest tests/unit/ -v
      - name: Compile pipeline
        run: |
          python -c "
          from pipelines.training_pipeline import ml_training_pipeline
          from kfp import compiler
          compiler.Compiler().compile(
              pipeline_func=ml_training_pipeline,
              package_path='compiled_pipeline.yaml'
          )
          print('Pipeline compiled successfully')
          "
      - name: Upload compiled pipeline
        uses: actions/upload-artifact@v4
        with:
          name: compiled-pipeline
          path: compiled_pipeline.yaml
  deploy-pipeline:
    needs: validate-and-compile
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - name: Download compiled pipeline
        uses: actions/download-artifact@v4
        with:
          name: compiled-pipeline
      - name: Deploy to KFP
        run: |
          pip install kfp==2.7.0
          python -c "
          from kfp.client import Client
          client = Client(host='${KFP_HOST}')
          # Upload or update the pipeline
          try:
              pipeline = client.upload_pipeline(
                  pipeline_package_path='compiled_pipeline.yaml',
                  pipeline_name='ml-training-pipeline',
                  description='Automated ML training pipeline'
              )
              print(f'Pipeline uploaded: {pipeline.pipeline_id}')
          except Exception:
              # If it already exists, upload as a new version
              pipeline = client.upload_pipeline_version(
                  pipeline_package_path='compiled_pipeline.yaml',
                  pipeline_name='ml-training-pipeline',
                  pipeline_version_name='v${GITHUB_SHA::7}'
              )
              print(f'Pipeline version uploaded: {pipeline.pipeline_version_id}')
          "
Programmatic Pipeline Execution
from kfp.client import Client


def trigger_pipeline_run(
    host: str,
    pipeline_name: str,
    experiment_name: str,
    params: dict
) -> str:
    """Trigger a pipeline run programmatically."""
    client = Client(host=host)
    # Create or retrieve the experiment
    experiment = client.create_experiment(
        name=experiment_name,
        namespace="kubeflow"
    )
    # Look up the pipeline ID by name (list_pipelines expects a JSON-serialized
    # filter, so a plain name="..." string does not work there)
    pipeline_id = client.get_pipeline_id(pipeline_name)
    if pipeline_id is None:
        raise ValueError(f"Pipeline '{pipeline_name}' not found")
    # Create the run
    run = client.run_pipeline(
        experiment_id=experiment.experiment_id,
        job_name=f"{pipeline_name}-{params.get('run_tag', 'manual')}",
        pipeline_id=pipeline_id,
        params=params
    )
    print(f"Run created: {run.run_id}")
    print(f"Monitor at: {host}/#/runs/details/{run.run_id}")
    return run.run_id


# Set up a recurring schedule
def create_recurring_run(client: Client, pipeline_id: str, experiment_id: str):
    """Create a schedule that runs the pipeline daily at midnight."""
    recurring_run = client.create_recurring_run(
        experiment_id=experiment_id,
        job_name="daily-training",
        pipeline_id=pipeline_id,
        params={"raw_data_path": "gs://data/daily/latest.csv"},
        # KFP cron expressions use six fields with seconds first:
        # second minute hour day-of-month month day-of-week
        cron_expression="0 0 0 * * *",
        max_concurrency=1,
        enabled=True
    )
    return recurring_run
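ML Workflow Orchestration Tool Comparison
| Criteria | KFP v2 | Apache Airflow | Prefect | Vertex AI Pipelines |
|---|---|---|---|---|
| Primary Use | Dedicated ML pipelines | General data workflows | General workflows | Managed ML pipelines |
| Infrastructure | Kubernetes required | Standalone possible | Standalone / Cloud | Google Cloud managed |
| Pipeline Definition | Python decorators | Python (DAG classes) | Python decorators | KFP SDK (same) |
| Artifact Tracking | ML Metadata (built-in) | XCom (limited) | External integration needed | Vertex ML Metadata |
| Experiment Management | Built-in | Not supported (MLflow integration) | Not supported | Built-in |
| Caching | Component-level automatic | Task-level manual | Task-level built-in | Component-level automatic |
| GPU Support | Kubernetes native | K8s Executor required | Kubernetes integration | Automatic |
| UI | Pipeline visualization built-in | Web UI built-in | Prefect Cloud UI | Google Cloud Console |
| Scalability | Kubernetes scaling | Celery/K8s Executor | Dask/Ray integration | Auto scaling |
| Learning Curve | High (K8s knowledge required) | Medium | Low | Medium (GCP dependency) |
| Cost | Self-managed infrastructure | Self-managed infrastructure | Prefect Cloud paid | Usage-based billing |
Selection criteria summary: if you need a Kubernetes-based pipeline dedicated to ML, choose KFP v2. To integrate data engineering and ML, choose Airflow. To get started quickly, choose Prefect. If you are all-in on Google Cloud, Vertex AI Pipelines is the best fit.
Kubernetes Extension Features
KFP v2 supports Kubernetes-specific features through the kfp-kubernetes extension library.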
from kfp import dsl
from kfp import kubernetes


@dsl.pipeline(name="gpu-training-pipeline")
def gpu_training_pipeline():
    # train_deep_learning_model is assumed to be a component defined elsewhere
    train_task = train_deep_learning_model(
        dataset_path="gs://data/training",
        epochs=50,
        batch_size=64
    )
    # GPU resource allocation
    kubernetes.add_node_selector(
        train_task,
        label_key="accelerator",
        label_value="nvidia-a100"
    )
    kubernetes.add_toleration(
        train_task,
        key="nvidia.com/gpu",
        operator="Exists",
        effect="NoSchedule"
    )
    train_task.set_accelerator_type("nvidia.com/gpu")
    train_task.set_accelerator_limit(2)
    train_task.set_cpu_limit("16")
    train_task.set_memory_limit("64Gi")
    # Expose a Secret as environment variables (model registry credentials)
    kubernetes.use_secret_as_env(
        train_task,
        secret_name="model-registry-credentials",
        secret_key_to_env={
            "username": "REGISTRY_USERNAME",
            "password": "REGISTRY_PASSWORD"
        }
    )
    # Mount a PVC (shared data volume)
    kubernetes.mount_pvc(
        train_task,
        pvc_name="shared-data-pvc",
        mount_path="/mnt/shared-data"
    )
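Monitoring and Troubleshooting
Common Issues and Solutions
Issue 1: Pipeline compiles successfully but fails at runtime
The most common cause is a package that the component uses but that is missing from packages_to_install. Lightweight Python components defined with the @dsl.component decorator run in an isolated environment, so every import the function needs must be written inside the function body, and every required package must be declared explicitly.
Issue 2: Caching does not work as expected
The cache key is derived from the component's input parameters, the component code, and the base image. If the cache still hits after you change the code, either the component code did not actually change or you changed only parts that are not included in the cache key. Check that you recompiled and re-uploaded the pipeline.
Issue 3: Pod is terminated due to OOM (Out of Memory)
This happens frequently in components that process large datasets. Set both set_memory_limit() and set_memory_request(), with the request at roughly 80% of the limit. Processing data in chunks or explicitly deleting variables you no longer need with del also helps.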
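The 80% rule of thumb above can be turned into a small helper so that requests stay in step with limits. The following is a minimal sketch; the function name memory_request_for and the choice to round down to whole Mi are assumptions for illustration, and only binary suffixes (Ki, Mi, Gi, Ti) are handled.

```python
import math

# Binary suffixes used in Kubernetes memory quantities, mapped to bytes.
_UNITS = {"Ki": 1024, "Mi": 1024**2, "Gi": 1024**3, "Ti": 1024**4}


def memory_request_for(limit: str, ratio: float = 0.8) -> str:
    """Return `ratio` of a binary-suffixed memory limit, rounded down to whole Mi."""
    for suffix, factor in _UNITS.items():
        if limit.endswith(suffix):
            limit_bytes = float(limit[: -len(suffix)]) * factor
            break
    else:
        raise ValueError(f"unsupported memory quantity: {limit!r}")
    request_mi = math.floor(limit_bytes * ratio / _UNITS["Mi"])
    return f"{request_mi}Mi"


print(memory_request_for("8Gi"))   # 6553Mi
print(memory_request_for("64Gi"))  # 52428Mi
```

The returned string could then be passed to set_memory_request() next to the matching set_memory_limit() call.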
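Issue 4: Pipeline run is stuck in the Pending state
The cause is often insufficient resources on the Kubernetes nodes. Check the events with kubectl describe pod and review the node autoscaler settings. For GPU nodes, check the node pool's maximum size.
Issue 5: Artifacts inside ParallelFor are overwritten
This is a known issue in KFP v2 (GitHub Issue #10186): artifacts from components that run concurrently inside ParallelFor or a sub-DAG can collide. Work around it by including a unique identifier in the artifact path or by generating a unique file name inside the component.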
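One way to generate a unique file name inside a component is to combine the loop item with a random token. This is a minimal sketch, not part of KFP; the helper name unique_artifact_name, the directory /mnt/shared-data/outputs, and the eight-character token length are all assumptions.

```python
import uuid


def unique_artifact_name(base_dir: str, stem: str, suffix: str, item_id: str) -> str:
    """Build a path that differs per loop item and per execution."""
    token = uuid.uuid4().hex[:8]  # random part, so retries of the same item do not collide
    return f"{base_dir.rstrip('/')}/{stem}-{item_id}-{token}{suffix}"


first_path = unique_artifact_name("/mnt/shared-data/outputs", "predictions", ".csv", "fold-0")
second_path = unique_artifact_name("/mnt/shared-data/outputs", "predictions", ".csv", "fold-1")
print(first_path)
print(second_path)
```

Because the token is random, the component should record the generated path (for example in artifact metadata) so that downstream steps can find the file.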
Checking Logs and Debugging
# View all logs for a pipeline run
kubectl logs -n kubeflow -l pipeline/runid=<run-id> --all-containers
# View the logs of a specific component
kubectl logs -n kubeflow <pod-name> -c main
# Check Argo Workflow status
kubectl get workflows -n kubeflow
kubectl describe workflow <workflow-name> -n kubeflow
# Query ML Metadata directly (for debugging)
kubectl port-forward -n kubeflow svc/metadata-grpc-service 8080:8080
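Production Operations Checklist
Check the following items when operating KFP v2 in a production environment.
- Resource management: Set CPU/memory requests and limits on every component. For GPU components, specify tolerations and nodeSelector explicitly.
- Retry policy: Configure task.set_retry(num_retries=3, backoff_duration="60s") to handle transient errors.
- Timeout: Set a timeout on long-running pipelines to prevent wasted resources.
- Namespace separation: Separate development, staging, and production pipelines into different namespaces.
- Artifact cleanup: Set up a CronJob that periodically cleans up artifacts from old runs, and use the Lifecycle Policy of MinIO/S3.
- Monitoring integration: Monitor pipeline execution time, success rate, and resource usage with Prometheus + Grafana.
- Alerting: Integrate a webhook that sends Slack/PagerDuty notifications when a pipeline fails.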
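To reason about how long a retry policy can delay a failing task, it helps to list the waits it implies. The sketch below assumes a plain exponential backoff with a factor of 2 and a one-hour cap; the function retry_delays and those defaults are illustrative assumptions, and the workflow engine's actual timing may differ.

```python
def retry_delays(
    num_retries: int,
    backoff_seconds: float,
    backoff_factor: float = 2.0,
    max_seconds: float = 3600.0,
) -> list[float]:
    """Return the wait before each retry under capped exponential backoff."""
    return [
        min(backoff_seconds * backoff_factor**attempt, max_seconds)
        for attempt in range(num_retries)
    ]


delays = retry_delays(num_retries=3, backoff_seconds=60)
print(delays)       # [60.0, 120.0, 240.0]
print(sum(delays))  # 420.0 seconds of waiting in the worst case
```

Under these assumptions, three retries starting at 60 seconds add up to seven minutes of waiting on top of the task's own run time, which is worth factoring into any pipeline timeout.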
References
Kubeflow Pipelines v2 ML Workflow Automation and Operations Guide
- Overview
- KFP v2 Architecture
- KFP SDK v2 Pipeline Definition
- Component Types and Usage
- Artifact Management and Caching Strategies
- CI/CD Integration
- ML Workflow Orchestration Tool Comparison
- Kubernetes Extension Features
- Monitoring and Troubleshooting
- Production Operations Checklist
- References
- Quiz

Overview
Developing an ML model and operating it reliably in production are entirely different problems. Manually managing a pipeline that spans data preprocessing, training, evaluation, and deployment leads to recurring issues such as lack of reproducibility, failed experiment tracking, and deployment delays. Kubeflow Pipelines (KFP) v2 is a framework that declaratively defines and automates ML workflows on Kubernetes, allowing you to build complex ML pipelines using just Python decorators.
KFP v2 is a major improvement over v1. Pipeline compilation results are now abstracted into IR (Intermediate Representation) YAML instead of Argo Workflow YAML, supporting various execution backends. The artifact system has been strengthened, and type safety has been improved. This article covers KFP v2's architecture, SDK usage, caching strategies, CI/CD integration, and production troubleshooting from a practical perspective.
KFP v2 Architecture
Core Components
The KFP v2 architecture consists of the following core layers.
| Component | Role | Tech Stack |
|---|---|---|
| KFP SDK | Pipeline/component definition, compilation | Python (kfp package) |
| IR Compiler | Converts Python DSL to IR YAML | Protocol Buffers based |
| KFP Backend | Pipeline execution management, API server | Go, gRPC/REST |
| Workflow Engine | Actual workflow orchestration | Argo Workflows / Tekton |
| Metadata Store | Execution metadata, artifact tracking | ML Metadata (MLMD) |
| Artifact Store | Stores models, datasets, and other artifacts | MinIO / GCS / S3 |
| UI Dashboard | Pipeline visualization, execution monitoring | React-based web UI |
The biggest change in KFP v2 is the introduction of IR YAML. In v1, pipelines were compiled directly into Argo Workflow YAML, creating tight coupling with Argo. In v2, pipelines are first compiled into an intermediate representation called IR, which is then interpreted and executed by each backend driver. This means the same pipeline definition can run not only on a Kubeflow cluster but also on Google Vertex AI Pipelines.
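To make the idea of a backend-neutral representation more concrete, the sketch below derives an execution order from a task graph alone. The dictionary is a hand-written, heavily simplified stand-in that loosely mirrors the root.dag.tasks and dependentTasks layout of a compiled pipeline spec; the task names are hypothetical, and a real IR file contains much more (component definitions, executors, input and output wiring).

```python
from graphlib import TopologicalSorter

# Simplified stand-in for the task graph section of a compiled pipeline spec.
ir_like_spec = {
    "root": {
        "dag": {
            "tasks": {
                "preprocess-data": {},
                "train-model": {"dependentTasks": ["preprocess-data"]},
                "evaluate-model": {"dependentTasks": ["preprocess-data", "train-model"]},
            }
        }
    }
}


def execution_order(spec: dict) -> list[str]:
    """Return task names in an order that respects their declared dependencies."""
    tasks = spec["root"]["dag"]["tasks"]
    graph = {name: set(task.get("dependentTasks", [])) for name, task in tasks.items()}
    return list(TopologicalSorter(graph).static_order())


order = execution_order(ir_like_spec)
print(order)  # ['preprocess-data', 'train-model', 'evaluate-model']
```

Nothing in this graph refers to Argo or any other engine, which is the property that lets different backends schedule the same definition.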
Installation and Cluster Setup
# KFP SDK v2 installation
pip install kfp==2.7.0
# Kubernetes extension library (GPU, Volume, and other K8s-specific features)
pip install kfp-kubernetes==1.2.0
# Kubeflow Pipelines backend deployment (on Kubernetes cluster)
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=2.2.0"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=2.2.0"
# Access UI via port forwarding
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80
KFP SDK v2 Pipeline Definition
Basic Components and Pipelines
In KFP v2, pipelines are defined using the @dsl.component and @dsl.pipeline decorators. A component is the smallest execution unit in a pipeline, and each component runs in an isolated container.
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"]
)
def preprocess_data(
    raw_data_path: str,
    test_size: float,
    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    data_stats: Output[Metrics]
):
    """Load data and split it into train/test sets."""
    import pandas as pd
    from sklearn.model_selection import train_test_split

    # Note: reading a gs:// path with pandas additionally requires the gcsfs package
    df = pd.read_csv(raw_data_path)
    # Handle missing values: drop unlabeled rows, fill numeric gaps with the median
    df = df.dropna(subset=["target"])
    df = df.fillna(df.median(numeric_only=True))
    train_df, test_df = train_test_split(
        df, test_size=test_size, random_state=42, stratify=df["target"]
    )
    train_df.to_csv(train_dataset.path, index=False)
    test_df.to_csv(test_dataset.path, index=False)
    # Log metrics
    data_stats.log_metric("total_rows", len(df))
    data_stats.log_metric("train_rows", len(train_df))
    data_stats.log_metric("test_rows", len(test_df))
    data_stats.log_metric("feature_count", len(df.columns) - 1)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "joblib==1.3.0"]
)
def train_model(
    train_dataset: Input[Dataset],
    n_estimators: int,
    max_depth: int,
    trained_model: Output[Model],
    training_metrics: Output[Metrics]
):
    """Train a Random Forest model."""
    import pandas as pd
    import joblib
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score, f1_score

    train_df = pd.read_csv(train_dataset.path)
    X_train = train_df.drop(columns=["target"])
    y_train = train_df["target"]
    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42,
        n_jobs=-1
    )
    model.fit(X_train, y_train)
    # Log training accuracy
    train_pred = model.predict(X_train)
    training_metrics.log_metric("train_accuracy", accuracy_score(y_train, train_pred))
    training_metrics.log_metric("train_f1", f1_score(y_train, train_pred, average="weighted"))
    # Save the model and attach descriptive metadata to the artifact
    joblib.dump(model, trained_model.path)
    trained_model.metadata["framework"] = "sklearn"
    trained_model.metadata["model_type"] = "RandomForestClassifier"


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "joblib==1.3.0"]
)
def evaluate_model(
    test_dataset: Input[Dataset],
    trained_model: Input[Model],
    eval_metrics: Output[Metrics],
    accuracy_threshold: float = 0.85
) -> bool:
    """Evaluate the model on test data and check whether it passes the threshold."""
    import pandas as pd
    import joblib
    from sklearn.metrics import accuracy_score, f1_score

    test_df = pd.read_csv(test_dataset.path)
    X_test = test_df.drop(columns=["target"])
    y_test = test_df["target"]
    model = joblib.load(trained_model.path)
    predictions = model.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)
    f1 = f1_score(y_test, predictions, average="weighted")
    passed = bool(accuracy >= accuracy_threshold)
    eval_metrics.log_metric("test_accuracy", accuracy)
    eval_metrics.log_metric("test_f1", f1)
    # log_metric expects a numeric value, so record the flag as 1.0 / 0.0
    eval_metrics.log_metric("passed_threshold", float(passed))
    return passed
Pipeline Composition and Conditional Execution
@dsl.pipeline(
    name="ml-training-pipeline",
    description="Data preprocessing -> Training -> Evaluation -> Conditional deployment pipeline"
)
def ml_training_pipeline(
    raw_data_path: str = "gs://my-bucket/data/raw.csv",
    test_size: float = 0.2,
    n_estimators: int = 100,
    max_depth: int = 10,
    accuracy_threshold: float = 0.85
):
    # Step 1: Data preprocessing
    preprocess_task = preprocess_data(
        raw_data_path=raw_data_path,
        test_size=test_size
    )
    preprocess_task.set_display_name("Data Preprocessing")
    # Step 2: Model training
    train_task = train_model(
        train_dataset=preprocess_task.outputs["train_dataset"],
        n_estimators=n_estimators,
        max_depth=max_depth
    )
    train_task.set_display_name("Model Training")
    train_task.set_cpu_limit("4")
    train_task.set_memory_limit("8Gi")
    # Step 3: Model evaluation
    eval_task = evaluate_model(
        test_dataset=preprocess_task.outputs["test_dataset"],
        trained_model=train_task.outputs["trained_model"],
        accuracy_threshold=accuracy_threshold
    )
    eval_task.set_display_name("Model Evaluation")
    # Step 4: Conditional deployment (only if the accuracy threshold is met)
    # deploy_model is assumed to be a component defined elsewhere; it is not shown in this guide
    with dsl.If(eval_task.output == True):
        deploy_task = deploy_model(
            model=train_task.outputs["trained_model"],
            model_name="fraud-detector",
            serving_endpoint="https://serving.example.com"
        )
        deploy_task.set_display_name("Model Deployment")


# Compile the pipeline
from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=ml_training_pipeline,
    package_path="ml_training_pipeline.yaml"
)
Component Types and Usage
KFP v2 supports three component types, each suited for different scenarios.
| Component Type | Definition Method | Best For | Pros | Cons |
|---|---|---|---|---|
| Lightweight Python | @dsl.component decorator | Pure Python logic, rapid prototyping | Code and definition in one place, fast iteration | Cannot reference external files, imports must be inside function |
| Container | @dsl.container_component | Using existing Docker images, non-Python tasks | Language agnostic, reuse existing images | Limited artifact type support |
| Importer | dsl.importer() | Bringing external artifacts into pipeline | Track existing artifacts within pipeline | No data movement, only metadata registration |
Container Component Example
@dsl.container_component
def run_spark_job(
    input_data: Input[Dataset],
    output_data: Output[Dataset],
    spark_config: str
):
    """Run a Spark job as a container."""
    return dsl.ContainerSpec(
        image="my-registry/spark-processor:3.5",
        command=["spark-submit"],
        args=[
            "--master", "k8s://https://kubernetes.default.svc",
            "--conf", spark_config,
            "--input", input_data.path,
            "--output", output_data.path,
            "/app/etl_job.py"
        ]
    )


# Using Importer: bring an external model into the pipeline
@dsl.pipeline(name="model-comparison-pipeline")
def comparison_pipeline():
    existing_model = dsl.importer(
        artifact_uri="gs://models/production/v2.1/model.pkl",
        artifact_class=Model,
        reimport=False,
        metadata={"version": "2.1", "framework": "sklearn"}
    )
    # Compare the existing model with a new one.
    # compare_models and train_task are assumed to be defined elsewhere
    # (a comparison component and an upstream training task in this pipeline).
    compare_task = compare_models(
        baseline_model=existing_model.output,
        candidate_model=train_task.outputs["trained_model"]
    )
Artifact Management and Caching Strategies
Artifact System
KFP v2's artifact system tracks all inputs and outputs based on ML Metadata (MLMD). The key artifact types are as follows.
| Artifact Type | Purpose | Examples |
|---|---|---|
| Dataset | Datasets | CSV, Parquet files |
| Model | Trained models | pickle, ONNX, SavedModel |
| Metrics | Numeric metrics | accuracy, loss, f1-score |
| ClassificationMetrics | Classification metrics | confusion matrix, ROC curve |
| HTML | HTML reports | Visualization reports |
| Markdown | Markdown reports | Text-based reports |
| Artifact | General artifacts | Other files |
Caching Strategies
KFP v2 supports automatic caching at the component level. You can significantly reduce execution time by reusing results from components that were run with identical inputs and code.
def build_caching_pipeline(retrain: bool = False):
    """Build the pipeline. Caching options are fixed when the pipeline is compiled."""

    @dsl.pipeline(name="caching-example")
    def caching_pipeline(data_path: str, test_size: float = 0.2):
        # Enable caching for data preprocessing (reuse if the data is the same)
        preprocess_task = preprocess_data(raw_data_path=data_path, test_size=test_size)
        preprocess_task.set_caching_options(True)
        # Training: set_caching_options() is evaluated at compile time, so `retrain`
        # is a plain Python flag here rather than a runtime pipeline parameter
        train_task = train_model(
            train_dataset=preprocess_task.outputs["train_dataset"],
            n_estimators=200,
            max_depth=15
        )
        train_task.set_caching_options(not retrain)  # retrain=True forces retraining
        # Disable caching for components that call external APIs
        deploy_task = deploy_model(model=train_task.outputs["trained_model"])
        deploy_task.set_caching_options(False)  # Always execute fresh

    return caching_pipeline
There are important considerations when operating with caching. First, caching is only meaningful when component code is a pure function (same input produces same output). Second, the KFP v2 SDK does not support cache expiration time settings, so components dealing with time-sensitive data should have caching disabled. Third, setting the environment variable KFP_DISABLE_EXECUTION_CACHING_BY_DEFAULT to true disables caching by default for all pipelines.
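The pure-function requirement is easier to see with a toy fingerprint. The sketch below is a conceptual illustration only, not KFP's actual cache-key algorithm; the function cache_key and the choice of SHA-256 over a JSON payload are assumptions. It hashes the component source, the base image, and the inputs, so a change to any of them yields a new key, while anything outside that payload (such as the current time or the contents behind a URL) cannot invalidate the cache.

```python
import hashlib
import json


def cache_key(component_source: str, base_image: str, inputs: dict) -> str:
    """Fingerprint everything that should invalidate a cached result when it changes."""
    payload = json.dumps(
        {"source": component_source, "image": base_image, "inputs": inputs},
        sort_keys=True,  # make the key independent of argument order
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


source_v1 = "def preprocess(path): return path.upper()"
inputs = {"path": "gs://bucket/raw.csv", "test_size": 0.2}
reordered = {"test_size": 0.2, "path": "gs://bucket/raw.csv"}

key_a = cache_key(source_v1, "python:3.11-slim", inputs)
key_b = cache_key(source_v1, "python:3.11-slim", reordered)
key_c = cache_key(source_v1 + "  # edited", "python:3.11-slim", inputs)

print(key_a == key_b)  # True: same inputs, different order
print(key_a == key_c)  # False: the source changed
```

Note that the file behind gs://bucket/raw.csv could change without the key changing, which is why components reading mutable or time-sensitive data should opt out of caching.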
CI/CD Integration
Automated Pipeline Deployment with GitHub Actions
# .github/workflows/kfp-deploy.yaml
name: KFP Pipeline CI/CD
on:
  push:
    branches: [main]
    paths:
      - 'pipelines/**'
      - 'components/**'
env:
  KFP_HOST: ${{ secrets.KFP_HOST }}
  KFP_NAMESPACE: kubeflow
jobs:
  validate-and-compile:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'
      - name: Install dependencies
        run: |
          pip install kfp==2.7.0 kfp-kubernetes==1.2.0
          pip install pytest
      - name: Lint pipeline code
        run: |
          pip install ruff
          ruff check pipelines/ components/
      - name: Run unit tests
        run: pytest tests/unit/ -v
      - name: Compile pipeline
        run: |
          python -c "
          from pipelines.training_pipeline import ml_training_pipeline
          from kfp import compiler
          compiler.Compiler().compile(
              pipeline_func=ml_training_pipeline,
              package_path='compiled_pipeline.yaml'
          )
          print('Pipeline compiled successfully')
          "
      - name: Upload compiled pipeline
        uses: actions/upload-artifact@v4
        with:
          name: compiled-pipeline
          path: compiled_pipeline.yaml
  deploy-pipeline:
    needs: validate-and-compile
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - name: Download compiled pipeline
        uses: actions/download-artifact@v4
        with:
          name: compiled-pipeline
      - name: Deploy to KFP
        run: |
          pip install kfp==2.7.0
          python -c "
          from kfp.client import Client
          client = Client(host='${KFP_HOST}')
          # Upload or update the pipeline
          try:
              pipeline = client.upload_pipeline(
                  pipeline_package_path='compiled_pipeline.yaml',
                  pipeline_name='ml-training-pipeline',
                  description='Automated ML training pipeline'
              )
              print(f'Pipeline uploaded: {pipeline.pipeline_id}')
          except Exception:
              # If it already exists, upload as a new version
              pipeline = client.upload_pipeline_version(
                  pipeline_package_path='compiled_pipeline.yaml',
                  pipeline_name='ml-training-pipeline',
                  pipeline_version_name='v${GITHUB_SHA::7}'
              )
              print(f'Pipeline version uploaded: {pipeline.pipeline_version_id}')
          "
Programmatic Pipeline Execution
from kfp.client import Client


def trigger_pipeline_run(
    host: str,
    pipeline_name: str,
    experiment_name: str,
    params: dict
) -> str:
    """Trigger a pipeline run programmatically."""
    client = Client(host=host)
    # Create or retrieve the experiment
    experiment = client.create_experiment(
        name=experiment_name,
        namespace="kubeflow"
    )
    # Look up the pipeline ID by name (list_pipelines expects a JSON-serialized
    # filter, so a plain name="..." string does not work there)
    pipeline_id = client.get_pipeline_id(pipeline_name)
    if pipeline_id is None:
        raise ValueError(f"Pipeline '{pipeline_name}' not found")
    # Create the run
    run = client.run_pipeline(
        experiment_id=experiment.experiment_id,
        job_name=f"{pipeline_name}-{params.get('run_tag', 'manual')}",
        pipeline_id=pipeline_id,
        params=params
    )
    print(f"Run created: {run.run_id}")
    print(f"Monitor at: {host}/#/runs/details/{run.run_id}")
    return run.run_id


# Set up a recurring schedule
def create_recurring_run(client: Client, pipeline_id: str, experiment_id: str):
    """Create a schedule that runs the pipeline daily at midnight."""
    recurring_run = client.create_recurring_run(
        experiment_id=experiment_id,
        job_name="daily-training",
        pipeline_id=pipeline_id,
        params={"raw_data_path": "gs://data/daily/latest.csv"},
        # KFP cron expressions use six fields with seconds first:
        # second minute hour day-of-month month day-of-week
        cron_expression="0 0 0 * * *",
        max_concurrency=1,
        enabled=True
    )
    return recurring_run
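The KFP client documentation describes cron_expression as six space-separated fields, with seconds first, which differs from the five-field crontab format many teams keep in their configuration. The helper below is a minimal sketch for normalizing such values before they are passed to create_recurring_run; the name to_six_field_cron is hypothetical, and it checks only the field count, not the validity of each field.

```python
def to_six_field_cron(expression: str) -> str:
    """Prefix a seconds field of 0 to a five-field crontab expression.

    Six-field expressions are returned unchanged; anything else raises ValueError.
    """
    fields = expression.split()
    if len(fields) == 6:
        return " ".join(fields)
    if len(fields) == 5:
        return " ".join(["0", *fields])
    raise ValueError(f"expected 5 or 6 cron fields, got {len(fields)}: {expression!r}")


print(to_six_field_cron("0 0 * * *"))    # 0 0 0 * * *  (daily at midnight)
print(to_six_field_cron("0 30 2 * * 1"))  # already six fields, returned as-is
```

Normalizing at the call site keeps a five-field value from being read with its first field as seconds, which would silently change the schedule.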
ML Workflow Orchestration Tool Comparison
| Criteria | KFP v2 | Apache Airflow | Prefect | Vertex AI Pipelines |
|---|---|---|---|---|
| Primary Use | Dedicated ML pipelines | General data workflows | General workflows | Managed ML pipelines |
| Infrastructure | Kubernetes required | Standalone possible | Standalone / Cloud | Google Cloud managed |
| Pipeline Definition | Python decorators | Python (DAG classes) | Python decorators | KFP SDK (same) |
| Artifact Tracking | ML Metadata (built-in) | XCom (limited) | External integration needed | Vertex ML Metadata |
| Experiment Management | Built-in | Not supported (MLflow integration) | Not supported | Built-in |
| Caching | Component-level automatic | Task-level manual | Task-level built-in | Component-level automatic |
| GPU Support | Kubernetes native | K8s Executor required | Kubernetes integration | Automatic |
| UI | Pipeline visualization built-in | Web UI built-in | Prefect Cloud UI | Google Cloud Console |
| Scalability | Kubernetes scaling | Celery/K8s Executor | Dask/Ray integration | Auto scaling |
| Learning Curve | High (K8s knowledge required) | Medium | Low | Medium (GCP dependency) |
| Cost | Self-managed infrastructure | Self-managed infrastructure | Prefect Cloud paid | Usage-based billing |
Selection Criteria Summary: If you need a Kubernetes-based pipeline dedicated to ML, choose KFP v2. To run data engineering and ML workloads in one orchestrator, choose Airflow. To get started quickly, choose Prefect. If you are all-in on Google Cloud, Vertex AI Pipelines is the best fit.
Kubernetes Extension Features
KFP v2 supports Kubernetes-specific features through the kfp-kubernetes extension library.
from kfp import dsl
from kfp import kubernetes

# train_deep_learning_model is assumed to be a @dsl.component defined elsewhere
@dsl.pipeline(name="gpu-training-pipeline")
def gpu_training_pipeline():
    train_task = train_deep_learning_model(
        dataset_path="gs://data/training",
        epochs=50,
        batch_size=64,
    )
    # GPU resource allocation
    kubernetes.add_node_selector(
        train_task,
        label_key="accelerator",
        label_value="nvidia-a100",
    )
    kubernetes.add_toleration(
        train_task,
        key="nvidia.com/gpu",
        operator="Exists",
        effect="NoSchedule",
    )
    train_task.set_accelerator_type("nvidia.com/gpu")
    train_task.set_accelerator_limit(2)
    train_task.set_cpu_limit("16")
    train_task.set_memory_limit("64Gi")
    # Secret mount (model registry credentials)
    kubernetes.use_secret_as_env(
        train_task,
        secret_name="model-registry-credentials",
        secret_key_to_env={
            "username": "REGISTRY_USERNAME",
            "password": "REGISTRY_PASSWORD",
        },
    )
    # PVC mount (shared data volume)
    kubernetes.mount_pvc(
        train_task,
        pvc_name="shared-data-pvc",
        mount_path="/mnt/shared-data",
    )
Monitoring and Troubleshooting
Common Issues and Solutions
Issue 1: Pipeline compiles successfully but fails at runtime
The most common cause is that a package used inside the component is missing from packages_to_install. Lightweight Python components created with the @dsl.component decorator run in isolated environments, so every import the function uses must be written inside the function body, and every required package must be declared explicitly.
Issue 2: Caching does not work as expected
KFP derives the cache key from a component's input parameters, component code, and base image. If the cache still hits after you changed the code, either the component code did not actually change or you only modified parts that are not included in the cache key. Verify that you recompiled and re-uploaded the pipeline.
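To make the behavior concrete, here is a simplified illustration of a key built from those three ingredients. It is not KFP's actual key derivation; illustrative_cache_key and the sample sources are assumptions for demonstration only.

```python
import hashlib
import json


def illustrative_cache_key(component_source: str, base_image: str, inputs: dict) -> str:
    """Hash the component code, base image, and inputs into one key.

    A simplified stand-in for the idea of a cache key, not KFP's algorithm.
    """
    payload = json.dumps(
        {"source": component_source, "image": base_image, "inputs": inputs},
        sort_keys=True,  # input ordering must not change the key
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


source_v1 = "def add(a: int, b: int) -> int:\n    return a + b\n"
source_v2 = "def add(a: int, b: int) -> int:\n    return b + a\n"

key_a = illustrative_cache_key(source_v1, "python:3.11", {"a": 1, "b": 2})
key_b = illustrative_cache_key(source_v1, "python:3.11", {"b": 2, "a": 1})
key_c = illustrative_cache_key(source_v2, "python:3.11", {"a": 1, "b": 2})

print(key_a == key_b)  # True: same code, image, and inputs
print(key_a == key_c)  # False: the component code changed
```

Under this model, editing a file that the component does not embed (for example, a script baked into an unchanged image tag) leaves the key untouched, which is one way a stale cache hit can occur.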
Issue 3: Pod terminated due to OOM (Out of Memory)
This frequently occurs in components that process large-scale data. Call set_memory_limit() and set_memory_request() on the task, with the request at roughly 80% of the limit. Processing data in chunks, or explicitly releasing large variables with del, also helps.
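The 80% rule of thumb can be computed rather than hand-edited for each task. A minimal sketch, assuming integer quantities with Mi, Gi, or Ti suffixes; memory_request_for is a hypothetical helper, not part of the KFP SDK.

```python
_UNITS_IN_MI = {"Mi": 1, "Gi": 1024, "Ti": 1024 * 1024}


def memory_request_for(limit: str, percent: int = 80) -> str:
    """Return a memory request (in Mi) that is `percent` of the given limit.

    Hypothetical helper: supports integer quantities such as "64Gi" only.
    """
    for suffix, factor in _UNITS_IN_MI.items():
        if limit.endswith(suffix):
            limit_mi = int(limit[: -len(suffix)]) * factor
            return f"{limit_mi * percent // 100}Mi"
    raise ValueError(f"unsupported memory quantity: {limit!r}")


print(memory_request_for("64Gi"))   # 52428Mi
print(memory_request_for("512Mi"))  # 409Mi
```

The result can then be passed to set_memory_request() alongside the same limit given to set_memory_limit(), so the two values stay in step when the limit changes.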
Issue 4: Pipeline execution stuck in Pending state
This is often caused by insufficient resources on Kubernetes nodes. Check events using the kubectl describe pod command and review the node autoscaler configuration. For GPU nodes, verify the maximum node pool size.
Issue 5: Artifacts overwritten inside ParallelFor
This is a known issue in KFP v2 (GitHub Issue #10186), where artifacts from concurrently executing components within ParallelFor or Sub-DAG can conflict. Work around this by including unique identifiers in artifact paths or generating unique filenames within the component.
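One way to generate unique filenames inside the component is to derive them from the loop item itself. The sketch below is an assumption about how such a workaround could look; unique_artifact_name is a hypothetical helper, and the loop item is assumed to be JSON-serializable.

```python
import hashlib
import json


def unique_artifact_name(prefix: str, loop_item: object, extension: str = "json") -> str:
    """Build a filename that is stable per loop item and distinct across items."""
    digest = hashlib.sha256(
        json.dumps(loop_item, sort_keys=True).encode("utf-8")
    ).hexdigest()[:12]
    return f"{prefix}-{digest}.{extension}"


names = [
    unique_artifact_name("metrics", {"learning_rate": lr})
    for lr in (0.1, 0.01, 0.001)
]
print(len(set(names)) == len(names))  # True: each loop item maps to its own filename
```

Hashing the item instead of using a random UUID keeps the name deterministic, so a retried iteration writes to the same file rather than leaving orphaned copies.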
Log Inspection and Debugging
# View full logs for a pipeline run
kubectl logs -n kubeflow -l pipeline/runid=<run-id> --all-containers
# View logs for a specific component
kubectl logs -n kubeflow <pod-name> -c main
# Check Argo Workflow status
kubectl get workflows -n kubeflow
kubectl describe workflow <workflow-name> -n kubeflow
# Query ML Metadata directly (for debugging)
kubectl port-forward -n kubeflow svc/metadata-grpc-service 8080:8080
Production Operations Checklist
When operating KFP v2 in production, the following items should be verified.
- Resource Management: Set CPU/memory requests and limits for all components. For GPU components, specify tolerations and nodeSelector.
- Retry Policy: Set task.set_retry(num_retries=3, backoff_duration="60s") to handle transient errors.
- Timeout: Use timeout settings for long-running pipelines to prevent resource waste.
- Namespace Separation: Separate development/staging/production pipelines into distinct namespaces.
- Artifact Cleanup: Set up a CronJob to periodically clean up artifacts from old runs. Leverage MinIO/S3 Lifecycle Policies.
- Monitoring Integration: Monitor pipeline execution time, success rate, and resource usage with Prometheus + Grafana.
- Alert Configuration: Set up Webhooks to send notifications to Slack/PagerDuty when a pipeline fails.
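When choosing retry settings, it helps to see how long a task may wait before it finally fails. The sketch below estimates the delay before each retry, assuming exponential backoff that starts at backoff_duration and multiplies by a factor of 2 after every failed attempt. The factor is an assumption here; check the set_retry documentation for the default your SDK version applies. retry_delays is a hypothetical helper.

```python
def retry_delays(
    num_retries: int, backoff_seconds: int, backoff_factor: float = 2.0
) -> list[float]:
    """Delay in seconds before each retry under plain exponential backoff."""
    return [backoff_seconds * backoff_factor ** attempt for attempt in range(num_retries)]


delays = retry_delays(num_retries=3, backoff_seconds=60)
print(delays)       # [60.0, 120.0, 240.0]
print(sum(delays))  # 420.0
```

Under these assumptions, three retries add up to seven minutes of waiting on top of the task's own runtime, which is worth factoring into any pipeline-level timeout.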
References
- Kubeflow Pipelines Official Documentation
- KFP SDK v2 API Reference
- Kubeflow Pipelines GitHub Repository
- KFP v2 Caching Guide
- KFP v2 Artifact Management
- KFP v1 to v2 Migration Guide
- KFP Python SDK DeepWiki
- Migrating to Vertex AI Pipelines