Skip to content
Published on

Kubeflow Pipelines v2 ML 워크플로우 자동화와 운영 가이드

Authors
  • Name
    Twitter
Kubeflow Pipelines v2 ML 워크플로우

개요

ML 모델을 개발하는 것과 프로덕션에서 안정적으로 운영하는 것은 완전히 다른 문제다. 데이터 전처리, 학습, 평가, 배포까지 이어지는 파이프라인을 수동으로 관리하면 재현성 부족, 실험 추적 실패, 배포 지연 등의 문제가 반복된다. Kubeflow Pipelines(KFP) v2는 Kubernetes 위에서 ML 워크플로우를 선언적으로 정의하고 자동화하는 프레임워크로, Python 데코레이터만으로 복잡한 ML 파이프라인을 구축할 수 있게 해준다.

KFP v2는 v1 대비 크게 개선되었다. 파이프라인 컴파일 결과가 Argo Workflow YAML이 아닌 IR(Intermediate Representation) YAML로 추상화되어 다양한 실행 백엔드를 지원하고, 아티팩트 시스템이 강화되었으며, 타입 안전성이 높아졌다. 이 글에서는 KFP v2의 아키텍처, SDK 활용법, 캐싱 전략, CI/CD 통합, 그리고 프로덕션 운영 트러블슈팅까지 실전 관점에서 다룬다.

KFP v2 아키텍처

핵심 구성 요소

KFP v2의 아키텍처는 다음과 같은 핵심 레이어로 구성된다.

구성 요소역할기술 스택
KFP SDK파이프라인/컴포넌트 정의, 컴파일Python (kfp 패키지)
IR CompilerPython DSL을 IR YAML로 변환Protocol Buffers 기반
KFP Backend파이프라인 실행 관리, API 서버Go, gRPC/REST
Workflow Engine실제 워크플로우 오케스트레이션Argo Workflows / Tekton
Metadata Store실행 메타데이터, 아티팩트 추적ML Metadata (MLMD)
Artifact Store모델, 데이터셋 등 아티팩트 저장MinIO / GCS / S3
UI Dashboard파이프라인 시각화, 실행 모니터링React 기반 웹 UI

KFP v2에서 가장 큰 변화는 IR YAML 도입이다. v1에서는 파이프라인을 Argo Workflow YAML로 직접 컴파일했기 때문에 Argo에 강하게 결합되어 있었다. v2에서는 IR이라는 중간 표현으로 먼저 컴파일한 후, 각 백엔드 드라이버가 이를 해석하여 실행한다. 이 덕분에 동일한 파이프라인 정의를 Kubeflow 클러스터뿐 아니라 Google Vertex AI Pipelines에서도 실행할 수 있다.

설치와 클러스터 구성

# KFP SDK v2 설치
pip install kfp==2.7.0

# Kubernetes 확장 라이브러리 (GPU, Volume 등 K8s 특화 기능)
pip install kfp-kubernetes==1.2.0

# Kubeflow Pipelines 백엔드 배포 (Kubernetes 클러스터에)
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=2.2.0"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=2.2.0"

# 포트 포워딩으로 UI 접근
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80

KFP SDK v2 파이프라인 정의

기본 컴포넌트와 파이프라인

KFP v2에서는 @dsl.component@dsl.pipeline 데코레이터로 파이프라인을 정의한다. 컴포넌트는 파이프라인의 최소 실행 단위이며, 각 컴포넌트는 독립된 컨테이너에서 실행된다.

from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"]
)
def preprocess_data(
    raw_data_path: str,
    test_size: float,
    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    data_stats: Output[Metrics]
):
    """데이터를 로드하고 학습/테스트로 분할한다."""
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(raw_data_path)

    # 결측치 처리
    df = df.dropna(subset=["target"])
    df = df.fillna(df.median(numeric_only=True))

    train_df, test_df = train_test_split(
        df, test_size=test_size, random_state=42, stratify=df["target"]
    )

    train_df.to_csv(train_dataset.path, index=False)
    test_df.to_csv(test_dataset.path, index=False)

    # 메트릭 기록
    data_stats.log_metric("total_rows", len(df))
    data_stats.log_metric("train_rows", len(train_df))
    data_stats.log_metric("test_rows", len(test_df))
    data_stats.log_metric("feature_count", len(df.columns) - 1)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "joblib==1.3.0"]
)
def train_model(
    train_dataset: Input[Dataset],
    n_estimators: int,
    max_depth: int,
    trained_model: Output[Model],
    training_metrics: Output[Metrics]
):
    """Random Forest 모델을 학습한다."""
    import pandas as pd
    import joblib
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score, f1_score

    train_df = pd.read_csv(train_dataset.path)
    X_train = train_df.drop(columns=["target"])
    y_train = train_df["target"]

    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42,
        n_jobs=-1
    )
    model.fit(X_train, y_train)

    # 학습 정확도 기록
    train_pred = model.predict(X_train)
    training_metrics.log_metric("train_accuracy", accuracy_score(y_train, train_pred))
    training_metrics.log_metric("train_f1", f1_score(y_train, train_pred, average="weighted"))

    # 모델 저장
    joblib.dump(model, trained_model.path)
    trained_model.metadata["framework"] = "sklearn"
    trained_model.metadata["model_type"] = "RandomForestClassifier"


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "joblib==1.3.0"]
)
def evaluate_model(
    test_dataset: Input[Dataset],
    trained_model: Input[Model],
    eval_metrics: Output[Metrics],
    accuracy_threshold: float = 0.85
) -> bool:
    """테스트 데이터로 모델을 평가하고 임계값을 넘는지 확인한다."""
    import pandas as pd
    import joblib
    from sklearn.metrics import accuracy_score, f1_score, classification_report

    test_df = pd.read_csv(test_dataset.path)
    X_test = test_df.drop(columns=["target"])
    y_test = test_df["target"]

    model = joblib.load(trained_model.path)
    predictions = model.predict(X_test)

    accuracy = accuracy_score(y_test, predictions)
    f1 = f1_score(y_test, predictions, average="weighted")

    eval_metrics.log_metric("test_accuracy", accuracy)
    eval_metrics.log_metric("test_f1", f1)
    eval_metrics.log_metric("passed_threshold", accuracy >= accuracy_threshold)

    return accuracy >= accuracy_threshold

파이프라인 조합과 조건부 실행

@dsl.pipeline(
    name="ml-training-pipeline",
    description="데이터 전처리 → 학습 → 평가 → 조건부 배포 파이프라인"
)
def ml_training_pipeline(
    raw_data_path: str = "gs://my-bucket/data/raw.csv",
    test_size: float = 0.2,
    n_estimators: int = 100,
    max_depth: int = 10,
    accuracy_threshold: float = 0.85
):
    # 1단계: 데이터 전처리
    preprocess_task = preprocess_data(
        raw_data_path=raw_data_path,
        test_size=test_size
    )
    preprocess_task.set_display_name("Data Preprocessing")

    # 2단계: 모델 학습
    train_task = train_model(
        train_dataset=preprocess_task.outputs["train_dataset"],
        n_estimators=n_estimators,
        max_depth=max_depth
    )
    train_task.set_display_name("Model Training")
    train_task.set_cpu_limit("4")
    train_task.set_memory_limit("8Gi")

    # 3단계: 모델 평가
    eval_task = evaluate_model(
        test_dataset=preprocess_task.outputs["test_dataset"],
        trained_model=train_task.outputs["trained_model"],
        accuracy_threshold=accuracy_threshold
    )
    eval_task.set_display_name("Model Evaluation")

    # 4단계: 조건부 배포 (정확도 임계값 통과 시)
    with dsl.If(eval_task.output == True):
        deploy_task = deploy_model(
            model=train_task.outputs["trained_model"],
            model_name="fraud-detector",
            serving_endpoint="https://serving.example.com"
        )
        deploy_task.set_display_name("Model Deployment")


# 파이프라인 컴파일
from kfp import compiler
compiler.Compiler().compile(
    pipeline_func=ml_training_pipeline,
    package_path="ml_training_pipeline.yaml"
)

컴포넌트 타입별 활용

KFP v2는 세 가지 컴포넌트 타입을 지원하며, 각각의 사용 시나리오가 다르다.

컴포넌트 타입정의 방식적합한 상황장점단점
Lightweight Python@dsl.component 데코레이터순수 Python 로직, 빠른 프로토타이핑코드와 정의가 한곳에, 빠른 반복외부 파일 참조 불가, 함수 내 import 필수
Container@dsl.container_component기존 Docker 이미지 활용, 비-Python 작업언어 무관, 기존 이미지 재사용아티팩트 타입 제한적
Importerdsl.importer()외부 아티팩트를 파이프라인에 도입기존 아티팩트를 파이프라인 안에서 추적데이터 이동 없음, 메타데이터만 등록

Container Component 예시

@dsl.container_component
def run_spark_job(
    input_data: Input[Dataset],
    output_data: Output[Dataset],
    spark_config: str
):
    """Spark 작업을 컨테이너로 실행한다."""
    return dsl.ContainerSpec(
        image="my-registry/spark-processor:3.5",
        command=["spark-submit"],
        args=[
            "--master", "k8s://https://kubernetes.default.svc",
            "--conf", spark_config,
            "--input", input_data.path,
            "--output", output_data.path,
            "/app/etl_job.py"
        ]
    )


# Importer 활용: 외부 모델을 파이프라인에 도입
@dsl.pipeline(name="model-comparison-pipeline")
def comparison_pipeline():
    existing_model = dsl.importer(
        artifact_uri="gs://models/production/v2.1/model.pkl",
        artifact_class=Model,
        reimport=False,
        metadata={"version": "2.1", "framework": "sklearn"}
    )

    # 기존 모델과 새 모델을 비교
    compare_task = compare_models(
        baseline_model=existing_model.output,
        candidate_model=train_task.outputs["trained_model"]
    )

아티팩트 관리와 캐싱 전략

아티팩트 시스템

KFP v2의 아티팩트 시스템은 ML Metadata(MLMD)를 기반으로 모든 입출력을 추적한다. 주요 아티팩트 타입은 다음과 같다.

아티팩트 타입용도예시
Dataset데이터셋CSV, Parquet 파일
Model학습된 모델pickle, ONNX, SavedModel
Metrics수치 메트릭accuracy, loss, f1-score
ClassificationMetrics분류 메트릭confusion matrix, ROC curve
HTMLHTML 리포트시각화 리포트
Markdown마크다운 리포트텍스트 기반 리포트
Artifact범용 아티팩트기타 파일

캐싱 전략

KFP v2는 컴포넌트 수준에서 자동 캐싱을 지원한다. 동일한 입력과 코드로 실행된 컴포넌트의 결과를 재사용하여 실행 시간을 크게 절약할 수 있다.

@dsl.pipeline(name="caching-example")
def caching_pipeline(data_path: str, retrain: bool = False):
    # 데이터 전처리는 캐싱 활성화 (동일 데이터면 재사용)
    preprocess_task = preprocess_data(raw_data_path=data_path)
    preprocess_task.set_caching_options(True)

    # 학습은 retrain 플래그에 따라 캐싱 제어
    train_task = train_model(
        train_dataset=preprocess_task.outputs["train_dataset"],
        n_estimators=200,
        max_depth=15
    )
    if retrain:
        train_task.set_caching_options(False)  # 강제 재학습

    # 외부 API 호출 컴포넌트는 캐싱 비활성화
    deploy_task = deploy_model(model=train_task.outputs["trained_model"])
    deploy_task.set_caching_options(False)  # 항상 새로 실행

캐싱 운영 시 주의할 점이 있다. 첫째, 컴포넌트 코드가 순수 함수(같은 입력이면 같은 출력)여야 캐싱이 의미 있다. 둘째, KFP v2 SDK에서는 캐시 만료 시간 설정이 지원되지 않으므로, 시간에 민감한 데이터를 다루는 컴포넌트는 캐싱을 비활성화해야 한다. 셋째, 환경 변수 KFP_DISABLE_EXECUTION_CACHING_BY_DEFAULTtrue로 설정하면 기본적으로 모든 파이프라인에서 캐싱이 비활성화된다.

CI/CD 통합

GitHub Actions로 파이프라인 자동 배포

# .github/workflows/kfp-deploy.yaml
name: KFP Pipeline CI/CD

on:
  push:
    branches: [main]
    paths:
      - 'pipelines/**'
      - 'components/**'

env:
  KFP_HOST: ${{ secrets.KFP_HOST }}
  KFP_NAMESPACE: kubeflow

jobs:
  validate-and-compile:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'

      - name: Install dependencies
        run: |
          pip install kfp==2.7.0 kfp-kubernetes==1.2.0
          pip install pytest

      - name: Lint pipeline code
        run: |
          pip install ruff
          ruff check pipelines/ components/

      - name: Run unit tests
        run: pytest tests/unit/ -v

      - name: Compile pipeline
        run: |
          python -c "
          from pipelines.training_pipeline import ml_training_pipeline
          from kfp import compiler
          compiler.Compiler().compile(
              pipeline_func=ml_training_pipeline,
              package_path='compiled_pipeline.yaml'
          )
          print('Pipeline compiled successfully')
          "

      - name: Upload compiled pipeline
        uses: actions/upload-artifact@v4
        with:
          name: compiled-pipeline
          path: compiled_pipeline.yaml

  deploy-pipeline:
    needs: validate-and-compile
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4

      - name: Download compiled pipeline
        uses: actions/download-artifact@v4
        with:
          name: compiled-pipeline

      - name: Deploy to KFP
        run: |
          pip install kfp==2.7.0
          python -c "
          from kfp.client import Client

          client = Client(host='${KFP_HOST}')

          # 파이프라인 업로드 또는 업데이트
          try:
              pipeline = client.upload_pipeline(
                  pipeline_package_path='compiled_pipeline.yaml',
                  pipeline_name='ml-training-pipeline',
                  description='Automated ML training pipeline'
              )
              print(f'Pipeline uploaded: {pipeline.pipeline_id}')
          except Exception:
              # 이미 존재하면 새 버전으로 업로드
              pipeline = client.upload_pipeline_version(
                  pipeline_package_path='compiled_pipeline.yaml',
                  pipeline_name='ml-training-pipeline',
                  pipeline_version_name='v${GITHUB_SHA::7}'
              )
              print(f'Pipeline version uploaded: {pipeline.pipeline_version_id}')
          "

프로그래밍 방식 파이프라인 실행

from kfp.client import Client

def trigger_pipeline_run(
    host: str,
    pipeline_name: str,
    experiment_name: str,
    params: dict
) -> str:
    """프로그래밍 방식으로 파이프라인 실행을 트리거한다."""
    client = Client(host=host)

    # 실험 생성 또는 조회
    experiment = client.create_experiment(
        name=experiment_name,
        namespace="kubeflow"
    )

    # 파이프라인 조회
    pipelines = client.list_pipelines(filter=f'name="{pipeline_name}"')
    if not pipelines.pipelines:
        raise ValueError(f"Pipeline '{pipeline_name}' not found")

    pipeline_id = pipelines.pipelines[0].pipeline_id

    # 실행 생성
    run = client.run_pipeline(
        experiment_id=experiment.experiment_id,
        job_name=f"{pipeline_name}-{params.get('run_tag', 'manual')}",
        pipeline_id=pipeline_id,
        params=params
    )

    print(f"Run created: {run.run_id}")
    print(f"Monitor at: {host}/#/runs/details/{run.run_id}")
    return run.run_id


# 반복 스케줄 실행 설정
def create_recurring_run(client: Client, pipeline_id: str, experiment_id: str):
    """매일 자정에 파이프라인을 실행하는 스케줄을 생성한다."""
    recurring_run = client.create_recurring_run(
        experiment_id=experiment_id,
        job_name="daily-training",
        pipeline_id=pipeline_id,
        params={"raw_data_path": "gs://data/daily/latest.csv"},
        cron_expression="0 0 * * *",
        max_concurrency=1,
        enabled=True
    )
    return recurring_run

ML 워크플로우 오케스트레이션 도구 비교

항목KFP v2Apache AirflowPrefectVertex AI Pipelines
주 용도ML 파이프라인 전용범용 데이터 워크플로우범용 워크플로우관리형 ML 파이프라인
인프라Kubernetes 필수독립 실행 가능독립 실행 / 클라우드Google Cloud 관리형
파이프라인 정의Python 데코레이터Python (DAG 클래스)Python 데코레이터KFP SDK (동일)
아티팩트 추적ML Metadata (내장)XCom (제한적)외부 연동 필요Vertex ML Metadata
실험 관리내장미지원 (MLflow 연동)미지원내장
캐싱컴포넌트 수준 자동태스크 수준 수동태스크 수준 내장컴포넌트 수준 자동
GPU 지원Kubernetes 네이티브K8s Executor 필요Kubernetes 연동자동
UI파이프라인 시각화 내장웹 UI 내장Prefect Cloud UIGoogle Cloud Console
확장성Kubernetes 스케일링Celery/K8s ExecutorDask/Ray 연동자동 스케일링
러닝 커브높음 (K8s 지식 필요)중간낮음중간 (GCP 종속)
비용인프라 자체 운영인프라 자체 운영Prefect Cloud 유료사용량 기반 과금

선택 기준 요약: Kubernetes 기반 ML 전용 파이프라인이 필요하면 KFP v2, 데이터 엔지니어링과 ML을 통합하려면 Airflow, 빠르게 시작하고 싶다면 Prefect, Google Cloud에 올인했다면 Vertex AI Pipelines가 적합하다.

Kubernetes 확장 기능

KFP v2는 kfp-kubernetes 확장 라이브러리를 통해 Kubernetes 특화 기능을 지원한다.

from kfp import dsl
from kfp import kubernetes

@dsl.pipeline(name="gpu-training-pipeline")
def gpu_training_pipeline():
    train_task = train_deep_learning_model(
        dataset_path="gs://data/training",
        epochs=50,
        batch_size=64
    )

    # GPU 리소스 할당
    kubernetes.add_node_selector(
        train_task,
        label_key="accelerator",
        label_value="nvidia-a100"
    )
    kubernetes.add_toleration(
        train_task,
        key="nvidia.com/gpu",
        operator="Exists",
        effect="NoSchedule"
    )
    train_task.set_accelerator_type("nvidia.com/gpu")
    train_task.set_accelerator_limit(2)
    train_task.set_cpu_limit("16")
    train_task.set_memory_limit("64Gi")

    # Secret 마운트 (모델 레지스트리 인증 정보)
    kubernetes.use_secret_as_env(
        train_task,
        secret_name="model-registry-credentials",
        secret_key_to_env={
            "username": "REGISTRY_USERNAME",
            "password": "REGISTRY_PASSWORD"
        }
    )

    # PVC 마운트 (공유 데이터 볼륨)
    kubernetes.mount_pvc(
        train_task,
        pvc_name="shared-data-pvc",
        mount_path="/mnt/shared-data"
    )

모니터링과 트러블슈팅

일반적인 문제와 해결 방법

문제 1: 파이프라인 컴파일은 성공하지만 실행이 실패하는 경우

가장 흔한 원인은 컴포넌트 내부에서 사용하는 패키지가 packages_to_install에 누락된 것이다. @dsl.component 데코레이터의 Lightweight Python 컴포넌트는 격리된 환경에서 실행되므로, 함수 내부에서 사용하는 모든 import를 함수 본문 안에 작성하고 필요한 패키지를 명시해야 한다.

문제 2: 캐싱이 예상대로 동작하지 않는 경우

캐싱은 컴포넌트의 입력 파라미터, 컴포넌트 코드, 베이스 이미지를 기반으로 캐시 키를 생성한다. 코드를 변경했는데도 캐시가 히트되면, 컴포넌트 코드가 실제로 변경되지 않았거나 캐시 키에 포함되지 않는 부분만 변경한 것이다. 파이프라인을 다시 컴파일하고 업로드했는지 확인하자.

문제 3: OOM(Out of Memory)으로 Pod가 종료되는 경우

대규모 데이터를 처리하는 컴포넌트에서 자주 발생한다. set_memory_limit()set_memory_request()를 설정하되, request는 limit의 80% 정도로 설정하는 것이 좋다. 데이터를 청크 단위로 처리하거나, 필요 없는 변수를 명시적으로 del하는 것도 도움이 된다.

문제 4: 파이프라인 실행이 Pending 상태에서 멈추는 경우

Kubernetes 노드의 리소스 부족이 원인인 경우가 많다. kubectl describe pod 명령으로 이벤트를 확인하고, 노드 오토스케일러 설정을 점검하자. GPU 노드의 경우 노드 풀 최대 크기를 확인해야 한다.

문제 5: ParallelFor 내부 아티팩트가 덮어씌워지는 경우

KFP v2의 알려진 이슈(GitHub Issue #10186)로, ParallelFor나 Sub-DAG 내부에서 동시에 실행되는 컴포넌트의 아티팩트가 충돌할 수 있다. 아티팩트 경로에 고유 식별자를 포함시키거나, 컴포넌트 내부에서 고유한 파일명을 생성하여 우회한다.

로그 확인과 디버깅

# 파이프라인 실행의 전체 로그 확인
kubectl logs -n kubeflow -l pipeline/runid=<run-id> --all-containers

# 특정 컴포넌트의 로그 확인
kubectl logs -n kubeflow <pod-name> -c main

# Argo Workflow 상태 확인
kubectl get workflows -n kubeflow
kubectl describe workflow <workflow-name> -n kubeflow

# ML Metadata 직접 조회 (디버깅용)
kubectl port-forward -n kubeflow svc/metadata-grpc-service 8080:8080

프로덕션 운영 체크리스트

프로덕션 환경에서 KFP v2를 운영할 때 다음 사항들을 점검해야 한다.

  • 리소스 관리: 모든 컴포넌트에 CPU/메모리 request와 limit를 설정한다. GPU 컴포넌트는 tolerationsnodeSelector를 명시한다.
  • 재시도 정책: 일시적 오류에 대비해 task.set_retry(num_retries=3, backoff_duration="60s")를 설정한다.
  • 타임아웃: 장시간 실행되는 파이프라인에 timeout 설정으로 리소스 낭비를 방지한다.
  • 네임스페이스 분리: 개발/스테이징/프로덕션 파이프라인을 별도 네임스페이스로 분리한다.
  • 아티팩트 정리: 오래된 실행의 아티팩트를 정기적으로 정리하는 CronJob을 설정한다. MinIO/S3의 Lifecycle Policy를 활용한다.
  • 모니터링 연동: Prometheus + Grafana로 파이프라인 실행 시간, 성공률, 리소스 사용량을 모니터링한다.
  • 알림 설정: 파이프라인 실패 시 Slack/PagerDuty로 알림을 보내는 Webhook을 연동한다.

참고자료

  1. Kubeflow Pipelines 공식 문서
  2. KFP SDK v2 API Reference
  3. Kubeflow Pipelines GitHub 저장소
  4. KFP v2 캐싱 가이드
  5. KFP v2 아티팩트 관리
  6. KFP v1에서 v2로 마이그레이션 가이드
  7. KFP Python SDK DeepWiki
  8. Vertex AI Pipelines로 마이그레이션