Kubeflow Pipelines v2 ML Workflow Automation and Operations Guide

Overview

Developing an ML model and operating it reliably in production are entirely different problems. Manually managing a pipeline that spans data preprocessing, training, evaluation, and deployment leads to recurring issues such as lack of reproducibility, failed experiment tracking, and deployment delays. **Kubeflow Pipelines (KFP) v2** is a framework that declaratively defines and automates ML workflows on Kubernetes, allowing you to build complex ML pipelines using just Python decorators.

KFP v2 is a major improvement over v1. Pipeline compilation results are now abstracted into **IR (Intermediate Representation) YAML** instead of Argo Workflow YAML, supporting various execution backends. The artifact system has been strengthened, and type safety has been improved. This article covers KFP v2's architecture, SDK usage, caching strategies, CI/CD integration, and production troubleshooting from a practical perspective.

KFP v2 Architecture

Core Components

The KFP v2 architecture consists of the following core layers.

| Component | Role | Tech Stack |
| ------------------- | -------------------------------------------- | ----------------------- |
| **KFP SDK** | Pipeline/component definition, compilation | Python (`kfp` package) |
| **IR Compiler** | Converts Python DSL to IR YAML | Protocol Buffers based |
| **KFP Backend** | Pipeline execution management, API server | Go, gRPC/REST |
| **Workflow Engine** | Actual workflow orchestration | Argo Workflows / Tekton |
| **Metadata Store** | Execution metadata, artifact tracking | ML Metadata (MLMD) |
| **Artifact Store** | Stores models, datasets, and other artifacts | MinIO / GCS / S3 |
| **UI Dashboard** | Pipeline visualization, execution monitoring | React-based web UI |

The biggest change in KFP v2 is the introduction of **IR YAML**. In v1, pipelines were compiled directly into Argo Workflow YAML, creating tight coupling with Argo. In v2, pipelines are first compiled into an intermediate representation called IR, which is then interpreted and executed by each backend driver. This means the same pipeline definition can run not only on a Kubeflow cluster but also on Google Vertex AI Pipelines.
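To make the IR idea more concrete, the sketch below walks a hand-written, heavily abbreviated dictionary shaped like the `root.dag.tasks` section of a compiled IR YAML file and derives an execution order from each task's `dependentTasks`. The task names and the trimmed structure are illustrative assumptions, not real compiler output, and an actual backend driver does far more than this.

```python
from graphlib import TopologicalSorter

# Hand-written, abbreviated stand-in for a compiled pipeline spec.
# Real IR YAML carries many more fields (components, deploymentSpec, inputs, ...).
ir_spec = {
    "pipelineInfo": {"name": "ml-training-pipeline"},
    "root": {
        "dag": {
            "tasks": {
                "preprocess-data": {
                    "componentRef": {"name": "comp-preprocess-data"},
                },
                "train-model": {
                    "componentRef": {"name": "comp-train-model"},
                    "dependentTasks": ["preprocess-data"],
                },
                "evaluate-model": {
                    "componentRef": {"name": "comp-evaluate-model"},
                    "dependentTasks": ["preprocess-data", "train-model"],
                },
            }
        }
    },
}


def execution_order(spec: dict) -> list[str]:
    """Return task names in an order that respects dependentTasks."""
    tasks = spec["root"]["dag"]["tasks"]
    graph = {name: set(task.get("dependentTasks", [])) for name, task in tasks.items()}
    return list(TopologicalSorter(graph).static_order())


order = execution_order(ir_spec)
print(order)
```

Because the ordering comes only from the declarative task graph, any backend that understands the same spec could schedule these tasks without knowing how the Python DSL produced them.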

Installation and Cluster Setup

# KFP SDK v2 installation
pip install kfp==2.7.0

# Kubernetes extension library (GPU, Volume, and other K8s-specific features)
pip install kfp-kubernetes==1.2.0

# Kubeflow Pipelines backend deployment (on Kubernetes cluster)
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=2.2.0"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=2.2.0"

# Access UI via port forwarding
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80

KFP SDK v2 Pipeline Definition

Basic Components and Pipelines

In KFP v2, pipelines are defined using the `@dsl.component` and `@dsl.pipeline` decorators. A component is the smallest execution unit in a pipeline, and each component runs in an isolated container.

from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"]
)
def preprocess_data(
    raw_data_path: str,
    test_size: float,
    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    data_stats: Output[Metrics]
):
    """Load data and split into train/test sets."""
    # Lightweight components must import everything inside the function body
    import pandas as pd
    from sklearn.model_selection import train_test_split

    # Note: reading gs:// paths with pandas also requires gcsfs in packages_to_install
    df = pd.read_csv(raw_data_path)

    # Handle missing values
    df = df.dropna(subset=["target"])
    df = df.fillna(df.median(numeric_only=True))

    train_df, test_df = train_test_split(
        df, test_size=test_size, random_state=42, stratify=df["target"]
    )

    train_df.to_csv(train_dataset.path, index=False)
    test_df.to_csv(test_dataset.path, index=False)

    # Log metrics
    data_stats.log_metric("total_rows", len(df))
    data_stats.log_metric("train_rows", len(train_df))
    data_stats.log_metric("test_rows", len(test_df))
    data_stats.log_metric("feature_count", len(df.columns) - 1)

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "joblib==1.3.0"]
)
def train_model(
    train_dataset: Input[Dataset],
    n_estimators: int,
    max_depth: int,
    trained_model: Output[Model],
    training_metrics: Output[Metrics]
):
    """Train a Random Forest model."""
    import joblib
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score, f1_score

    train_df = pd.read_csv(train_dataset.path)
    X_train = train_df.drop(columns=["target"])
    y_train = train_df["target"]

    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42,
        n_jobs=-1
    )
    model.fit(X_train, y_train)

    # Log training accuracy
    train_pred = model.predict(X_train)
    training_metrics.log_metric("train_accuracy", accuracy_score(y_train, train_pred))
    training_metrics.log_metric("train_f1", f1_score(y_train, train_pred, average="weighted"))

    # Save model
    joblib.dump(model, trained_model.path)
    trained_model.metadata["framework"] = "sklearn"
    trained_model.metadata["model_type"] = "RandomForestClassifier"

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "joblib==1.3.0"]
)
def evaluate_model(
    test_dataset: Input[Dataset],
    trained_model: Input[Model],
    eval_metrics: Output[Metrics],
    accuracy_threshold: float = 0.85
) -> bool:
    """Evaluate the model on test data and check if it passes the threshold."""
    import joblib
    import pandas as pd
    from sklearn.metrics import accuracy_score, f1_score

    test_df = pd.read_csv(test_dataset.path)
    X_test = test_df.drop(columns=["target"])
    y_test = test_df["target"]

    model = joblib.load(trained_model.path)
    predictions = model.predict(X_test)

    accuracy = accuracy_score(y_test, predictions)
    f1 = f1_score(y_test, predictions, average="weighted")

    # Convert to a plain Python bool so the return value serializes cleanly
    passed = bool(accuracy >= accuracy_threshold)

    eval_metrics.log_metric("test_accuracy", accuracy)
    eval_metrics.log_metric("test_f1", f1)
    eval_metrics.log_metric("passed_threshold", float(passed))

    return passed

Pipeline Composition and Conditional Execution

@dsl.pipeline(
    name="ml-training-pipeline",
    description="Data preprocessing -> Training -> Evaluation -> Conditional deployment pipeline"
)
def ml_training_pipeline(
    raw_data_path: str = "gs://my-bucket/data/raw.csv",
    test_size: float = 0.2,
    n_estimators: int = 100,
    max_depth: int = 10,
    accuracy_threshold: float = 0.85
):
    # Step 1: Data preprocessing
    preprocess_task = preprocess_data(
        raw_data_path=raw_data_path,
        test_size=test_size
    )
    preprocess_task.set_display_name("Data Preprocessing")

    # Step 2: Model training
    train_task = train_model(
        train_dataset=preprocess_task.outputs["train_dataset"],
        n_estimators=n_estimators,
        max_depth=max_depth
    )
    train_task.set_display_name("Model Training")
    train_task.set_cpu_limit("4")
    train_task.set_memory_limit("8Gi")

    # Step 3: Model evaluation
    eval_task = evaluate_model(
        test_dataset=preprocess_task.outputs["test_dataset"],
        trained_model=train_task.outputs["trained_model"],
        accuracy_threshold=accuracy_threshold
    )
    eval_task.set_display_name("Model Evaluation")

    # Step 4: Conditional deployment (if accuracy threshold is met)
    # evaluate_model has more than one output, so the return value is addressed
    # by name ("Output") rather than through the single-output shortcut .output
    with dsl.If(eval_task.outputs["Output"] == True):
        # deploy_model is a component assumed to be defined elsewhere
        deploy_task = deploy_model(
            model=train_task.outputs["trained_model"],
            model_name="fraud-detector",
            serving_endpoint="https://serving.example.com"
        )
        deploy_task.set_display_name("Model Deployment")


# Compile the pipeline
from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=ml_training_pipeline,
    package_path="ml_training_pipeline.yaml"
)

Component Types and Usage

KFP v2 supports three component types, each suited for different scenarios.

| Component Type | Definition Method | Best For | Pros | Cons |
| ---------------------- | -------------------------- | ---------------------------------------------- | ------------------------------------------------ | ---------------------------------------------------------------- |
| **Lightweight Python** | `@dsl.component` decorator | Pure Python logic, rapid prototyping | Code and definition in one place, fast iteration | Cannot reference external files, imports must be inside function |
| **Container** | `@dsl.container_component` | Using existing Docker images, non-Python tasks | Language agnostic, reuse existing images | Limited artifact type support |
| **Importer** | `dsl.importer()` | Bringing external artifacts into pipeline | Track existing artifacts within pipeline | No data movement, only metadata registration |

Container Component Example

@dsl.container_component
def run_spark_job(
    input_data: Input[Dataset],
    output_data: Output[Dataset],
    spark_config: str
):
    """Run a Spark job as a container."""
    return dsl.ContainerSpec(
        image="my-registry/spark-processor:3.5",
        command=["spark-submit"],
        args=[
            "--master", "k8s://https://kubernetes.default.svc",
            "--conf", spark_config,
            "--input", input_data.path,
            "--output", output_data.path,
            "/app/etl_job.py"
        ]
    )

# Using Importer: bring an external model into the pipeline
@dsl.pipeline(name="model-comparison-pipeline")
def comparison_pipeline():
    existing_model = dsl.importer(
        artifact_uri="gs://models/production/v2.1/model.pkl",
        artifact_class=Model,
        reimport=False,
        metadata={"version": "2.1", "framework": "sklearn"}
    )

    # Compare existing model with new model.
    # train_task is assumed to come from an upstream training step in this
    # pipeline, and compare_models is a component assumed to be defined elsewhere.
    compare_task = compare_models(
        baseline_model=existing_model.output,
        candidate_model=train_task.outputs["trained_model"]
    )

Artifact Management and Caching Strategies

Artifact System

KFP v2's artifact system tracks all inputs and outputs based on ML Metadata (MLMD). The key artifact types are as follows.

| Artifact Type | Purpose | Examples |
| ----------------------- | ---------------------- | --------------------------- |
| `Dataset` | Datasets | CSV, Parquet files |
| `Model` | Trained models | pickle, ONNX, SavedModel |
| `Metrics` | Numeric metrics | accuracy, loss, f1-score |
| `ClassificationMetrics` | Classification metrics | confusion matrix, ROC curve |
| `HTML` | HTML reports | Visualization reports |
| `Markdown` | Markdown reports | Text-based reports |
| `Artifact` | General artifacts | Other files |
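The components earlier in this guide read and write through `artifact.path` even though artifacts are stored under remote URIs. The sketch below illustrates one way that translation can work, using a prefix mapping that, as far as I understand the SDK's artifact types, mirrors its behavior. Treat the mapping and the helper name `uri_to_local_path` as assumptions and confirm against the SDK version you run.

```python
# Assumed prefix mapping from remote URI schemes to local mount points.
_LOCAL_MOUNT_PREFIXES = {
    "gs://": "/gcs/",
    "s3://": "/s3/",
    "minio://": "/minio/",
}


def uri_to_local_path(uri: str) -> str:
    """Translate an artifact URI into the local path a component would use."""
    for scheme, mount in _LOCAL_MOUNT_PREFIXES.items():
        if uri.startswith(scheme):
            return mount + uri[len(scheme):]
    # Unknown scheme: this sketch simply returns the URI unchanged.
    return uri


local_path = uri_to_local_path("gs://my-bucket/data/raw.csv")
print(local_path)
```

The practical consequence is that component code can use ordinary file I/O on `.path`, while the metadata store keeps tracking the artifact by its URI.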

Caching Strategies

KFP v2 supports automatic caching at the component level. You can significantly reduce execution time by reusing results from components that were run with identical inputs and code.

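# Caching options are fixed when the pipeline is compiled. A Python `if` on a
# pipeline parameter is evaluated at compile time, not at run time, so the
# retrain flag is passed as a plain Python argument to a pipeline factory.
def build_caching_pipeline(retrain: bool = False):

    @dsl.pipeline(name="caching-example")
    def caching_pipeline(data_path: str):
        # Enable caching for data preprocessing (reuse if same data)
        preprocess_task = preprocess_data(raw_data_path=data_path, test_size=0.2)
        preprocess_task.set_caching_options(True)

        # Control caching for training based on the retrain flag
        train_task = train_model(
            train_dataset=preprocess_task.outputs["train_dataset"],
            n_estimators=200,
            max_depth=15
        )
        train_task.set_caching_options(not retrain)  # retrain=True forces retraining

        # Disable caching for external API call components
        # (deploy_model is a component assumed to be defined elsewhere)
        deploy_task = deploy_model(model=train_task.outputs["trained_model"])
        deploy_task.set_caching_options(False)  # Always execute fresh

    return caching_pipeline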

There are important considerations when operating with caching. First, caching is only meaningful when component code is a pure function (same input produces same output). Second, the KFP v2 SDK does not support cache expiration time settings, so components dealing with time-sensitive data should have caching disabled. Third, setting the environment variable `KFP_DISABLE_EXECUTION_CACHING_BY_DEFAULT` to `true` disables caching by default for all pipelines.
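Since there is no expiration setting, one common workaround for time-sensitive data is to pass a coarse time bucket as an extra component input: the input changes when the bucket rolls over, so the cached result stops matching. The helper below is a minimal sketch of that idea. The function name `cache_bucket` and the extra component parameter it would feed are assumptions, not part of the KFP API.

```python
from datetime import datetime, timezone


def cache_bucket(now: datetime, ttl_hours: int) -> str:
    """Return a string that changes once every ttl_hours (aligned to UTC)."""
    if ttl_hours <= 0:
        raise ValueError("ttl_hours must be positive")
    epoch_hours = int(now.timestamp()) // 3600
    bucket_start = (epoch_hours // ttl_hours) * ttl_hours
    start = datetime.fromtimestamp(bucket_start * 3600, tz=timezone.utc)
    return start.strftime("%Y-%m-%dT%H:00Z")


morning = cache_bucket(datetime(2024, 5, 1, 1, 30, tzinfo=timezone.utc), ttl_hours=24)
evening = cache_bucket(datetime(2024, 5, 1, 23, 59, tzinfo=timezone.utc), ttl_hours=24)
next_day = cache_bucket(datetime(2024, 5, 2, 0, 0, tzinfo=timezone.utc), ttl_hours=24)
print(morning, evening, next_day)
```

Runs submitted within the same bucket would share a cache entry, while the first run after the rollover would execute again. The value has to be computed at submission time and passed as a run parameter. If it were computed inside the component, it would not affect the cache key.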

CI/CD Integration

Automated Pipeline Deployment with GitHub Actions

# .github/workflows/kfp-deploy.yaml
name: KFP Pipeline CI/CD

on:
  push:
    branches: [main]
    paths:
      - 'pipelines/**'
      - 'components/**'

env:
  KFP_HOST: ${{ secrets.KFP_HOST }}
  KFP_NAMESPACE: kubeflow

jobs:
  validate-and-compile:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'

      - name: Install dependencies
        run: |
          pip install kfp==2.7.0 kfp-kubernetes==1.2.0
          pip install pytest

      - name: Lint pipeline code
        run: |
          pip install ruff
          ruff check pipelines/ components/

      - name: Run unit tests
        run: pytest tests/unit/ -v

      - name: Compile pipeline
        run: |
          python -c "
          from pipelines.training_pipeline import ml_training_pipeline
          from kfp import compiler
          compiler.Compiler().compile(
              pipeline_func=ml_training_pipeline,
              package_path='compiled_pipeline.yaml'
          )
          print('Pipeline compiled successfully')
          "

      - name: Upload compiled pipeline
        uses: actions/upload-artifact@v4
        with:
          name: compiled-pipeline
          path: compiled_pipeline.yaml

  deploy-pipeline:
    needs: validate-and-compile
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4

      - name: Download compiled pipeline
        uses: actions/download-artifact@v4
        with:
          name: compiled-pipeline

      - name: Deploy to KFP
        run: |
          pip install kfp==2.7.0
          python -c "
          from kfp.client import Client

          client = Client(host='${KFP_HOST}')

          # Upload or update pipeline
          try:
              pipeline = client.upload_pipeline(
                  pipeline_package_path='compiled_pipeline.yaml',
                  pipeline_name='ml-training-pipeline',
                  description='Automated ML training pipeline'
              )
              print(f'Pipeline uploaded: {pipeline.pipeline_id}')
          except Exception:
              # Upload as new version if already exists
              pipeline = client.upload_pipeline_version(
                  pipeline_package_path='compiled_pipeline.yaml',
                  pipeline_name='ml-training-pipeline',
                  pipeline_version_name='v${GITHUB_SHA::7}'
              )
              print(f'Pipeline version uploaded: {pipeline.pipeline_version_id}')
          "

Programmatic Pipeline Execution

from kfp.client import Client


def trigger_pipeline_run(
    host: str,
    pipeline_name: str,
    experiment_name: str,
    params: dict
) -> str:
    """Trigger a pipeline run programmatically."""
    client = Client(host=host)

    # Create or retrieve experiment
    experiment = client.create_experiment(
        name=experiment_name,
        namespace="kubeflow"
    )

    # Retrieve pipeline ID by name. list_pipelines expects a JSON-encoded filter,
    # so the name lookup helper is used instead; it returns None when nothing matches.
    pipeline_id = client.get_pipeline_id(pipeline_name)
    if pipeline_id is None:
        raise ValueError(f"Pipeline '{pipeline_name}' not found")

    # Create run
    run = client.run_pipeline(
        experiment_id=experiment.experiment_id,
        job_name=f"{pipeline_name}-{params.get('run_tag', 'manual')}",
        pipeline_id=pipeline_id,
        params=params
    )

    print(f"Run created: {run.run_id}")
    print(f"Monitor at: {host}/#/runs/details/{run.run_id}")
    return run.run_id

# Set up recurring schedule
def create_recurring_run(client: Client, pipeline_id: str, experiment_id: str):
    """Create a schedule to run the pipeline daily at midnight."""
    recurring_run = client.create_recurring_run(
        experiment_id=experiment_id,
        job_name="daily-training",
        pipeline_id=pipeline_id,
        params={"raw_data_path": "gs://data/daily/latest.csv"},
        # KFP cron expressions use six fields with seconds first:
        # second minute hour day-of-month month day-of-week
        cron_expression="0 0 0 * * *",
        max_concurrency=1,
        enabled=True
    )
    return recurring_run
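The SDK documents `cron_expression` for `create_recurring_run` as six space-separated fields with seconds first, rather than the five-field crontab form many people write from habit. A five-field expression can therefore be parsed differently than intended. The helper below is a small sketch that normalizes either form. The name `to_kfp_cron` is hypothetical.

```python
def to_kfp_cron(expression: str) -> str:
    """Normalize a cron expression to six fields (seconds first)."""
    fields = expression.split()
    if len(fields) == 6:
        return " ".join(fields)
    if len(fields) == 5:
        # Classic crontab form: run at second 0 of the matching minute.
        return " ".join(["0", *fields])
    raise ValueError(f"expected 5 or 6 cron fields, got {len(fields)}: {expression!r}")


daily_midnight = to_kfp_cron("0 0 * * *")
print(daily_midnight)
```

Normalizing in one place keeps schedule definitions readable for people used to crontab while still handing the backend the format it expects.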

ML Workflow Orchestration Tool Comparison

| Criteria | KFP v2 | Apache Airflow | Prefect | Vertex AI Pipelines |
| ------------------------- | ------------------------------- | ---------------------------------- | --------------------------- | ------------------------- |
| **Primary Use** | ML pipeline dedicated | General data workflows | General workflows | Managed ML pipelines |
| **Infrastructure** | Kubernetes required | Standalone possible | Standalone / Cloud | Google Cloud managed |
| **Pipeline Definition** | Python decorators | Python (DAG classes) | Python decorators | KFP SDK (same) |
| **Artifact Tracking** | ML Metadata (built-in) | XCom (limited) | External integration needed | Vertex ML Metadata |
| **Experiment Management** | Built-in | Not supported (MLflow integration) | Not supported | Built-in |
| **Caching** | Component-level automatic | Task-level manual | Task-level built-in | Component-level automatic |
| **GPU Support** | Kubernetes native | K8s Executor required | Kubernetes integration | Automatic |
| **UI** | Pipeline visualization built-in | Web UI built-in | Prefect Cloud UI | Google Cloud Console |
| **Scalability** | Kubernetes scaling | Celery/K8s Executor | Dask/Ray integration | Auto scaling |
| **Learning Curve** | High (K8s knowledge required) | Medium | Low | Medium (GCP dependency) |
| **Cost** | Self-managed infrastructure | Self-managed infrastructure | Prefect Cloud paid | Usage-based billing |

**Selection Criteria Summary**: If you need a Kubernetes-based ML-dedicated pipeline, choose **KFP v2**. To integrate data engineering and ML, choose **Airflow**. To get started quickly, choose **Prefect**. If you are all-in on Google Cloud, **Vertex AI Pipelines** is the best fit.

Kubernetes Extension Features

KFP v2 supports Kubernetes-specific features through the `kfp-kubernetes` extension library.

from kfp import dsl
from kfp import kubernetes


@dsl.pipeline(name="gpu-training-pipeline")
def gpu_training_pipeline():
    # train_deep_learning_model is a component assumed to be defined elsewhere
    train_task = train_deep_learning_model(
        dataset_path="gs://data/training",
        epochs=50,
        batch_size=64
    )

    # GPU resource allocation
    kubernetes.add_node_selector(
        train_task,
        label_key="accelerator",
        label_value="nvidia-a100"
    )
    kubernetes.add_toleration(
        train_task,
        key="nvidia.com/gpu",
        operator="Exists",
        effect="NoSchedule"
    )
    train_task.set_accelerator_type("nvidia.com/gpu")
    train_task.set_accelerator_limit(2)
    train_task.set_cpu_limit("16")
    train_task.set_memory_limit("64Gi")

    # Secret mount (model registry credentials)
    kubernetes.use_secret_as_env(
        train_task,
        secret_name="model-registry-credentials",
        secret_key_to_env={
            "username": "REGISTRY_USERNAME",
            "password": "REGISTRY_PASSWORD"
        }
    )

    # PVC mount (shared data volume)
    kubernetes.mount_pvc(
        train_task,
        pvc_name="shared-data-pvc",
        mount_path="/mnt/shared-data"
    )

Monitoring and Troubleshooting

Common Issues and Solutions

**Issue 1: Pipeline compiles successfully but fails at runtime**

The most common cause is a package that the component uses but that is missing from `packages_to_install`. Lightweight Python components defined with `@dsl.component` run in isolated containers, so every import the function needs must appear inside the function body, and every third-party package must be declared explicitly.

**Issue 2: Caching does not work as expected**

Caching generates cache keys based on a component's input parameters, component code, and base image. If the cache hits even though you changed the code, either the component code was not actually changed, or only parts not included in the cache key were modified. Verify that you recompiled and uploaded the pipeline.
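The behavior is easier to reason about with a toy model of the key. The sketch below hashes the three ingredients named above into a fingerprint. It is not KFP's actual algorithm, only an illustration of why a change that never reaches the compiled spec cannot change the key.

```python
import hashlib
import json


def cache_fingerprint(inputs: dict, component_source: str, base_image: str) -> str:
    """Toy cache key: a hash over inputs, component code, and base image."""
    payload = json.dumps(
        {"inputs": inputs, "source": component_source, "image": base_image},
        sort_keys=True,  # stable ordering so equal inputs hash equally
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


source_v1 = "def f(x):\n    return x + 1\n"
source_v2 = "def f(x):\n    return x + 2\n"

key_a = cache_fingerprint({"x": 1}, source_v1, "python:3.11-slim")
key_b = cache_fingerprint({"x": 1}, source_v1, "python:3.11-slim")
key_c = cache_fingerprint({"x": 1}, source_v2, "python:3.11-slim")
print(key_a == key_b, key_a == key_c)
```

Under this model, editing a helper module that the component imports at run time, or rebuilding an image under the same tag, leaves every hashed ingredient unchanged, which matches the symptom of an unexpected cache hit.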

**Issue 3: Pod terminated due to OOM (Out of Memory)**

This frequently occurs in components that process large-scale data. Set `set_memory_limit()` and `set_memory_request()`, with the request set to approximately 80% of the limit. Processing data in chunks or explicitly using `del` to free unnecessary variables also helps.
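The 80% guideline is easy to apply consistently with a small helper. The sketch below derives a request string from a limit string. The function name `memory_request_for` is hypothetical, and only the `Mi` and `Gi` suffixes are handled.

```python
_UNIT_TO_MI = {"Mi": 1, "Gi": 1024}


def memory_request_for(limit: str, ratio_percent: int = 80) -> str:
    """Derive a memory request (in Mi) as a percentage of the given limit."""
    for unit, factor in _UNIT_TO_MI.items():
        if limit.endswith(unit):
            limit_mi = int(limit[: -len(unit)]) * factor
            # Integer arithmetic avoids fractional quantities in the result.
            return f"{limit_mi * ratio_percent // 100}Mi"
    raise ValueError(f"unsupported memory unit in {limit!r}")


request = memory_request_for("8Gi")
print(request)
```

The result could then be passed to `set_memory_request()` alongside the matching `set_memory_limit()` call, keeping the two values in step when limits are tuned.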

**Issue 4: Pipeline execution stuck in Pending state**

This is often caused by insufficient resources on Kubernetes nodes. Check events using the `kubectl describe pod` command and review the node autoscaler configuration. For GPU nodes, verify the maximum node pool size.

**Issue 5: Artifacts overwritten inside ParallelFor**

This is a known issue in KFP v2 (GitHub Issue #10186), where artifacts from concurrently executing components within ParallelFor or Sub-DAG can conflict. Work around this by including unique identifiers in artifact paths or generating unique filenames within the component.
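One way to generate unique filenames inside a component is to derive a short suffix from the loop item itself, so that each iteration writes to a distinct but reproducible name. The helper below is a minimal sketch. The name `unique_artifact_name` is hypothetical, and it assumes each loop item can be represented as a distinct string.

```python
import hashlib
from pathlib import PurePosixPath


def unique_artifact_name(base_name: str, loop_item: str) -> str:
    """Append a short hash of the loop item to the file stem."""
    path = PurePosixPath(base_name)
    digest = hashlib.sha1(loop_item.encode("utf-8")).hexdigest()[:8]
    return f"{path.stem}-{digest}{path.suffix}"


name_a = unique_artifact_name("model.pkl", "learning_rate=0.01")
name_b = unique_artifact_name("model.pkl", "learning_rate=0.10")
print(name_a, name_b)
```

A deterministic suffix has an advantage over a random one: rerunning the same iteration produces the same filename, which keeps outputs traceable to the item that produced them.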

Log Inspection and Debugging

# View full logs for a pipeline run
kubectl logs -n kubeflow -l pipeline/runid=<run-id> --all-containers

# View logs for a specific component
kubectl logs -n kubeflow <pod-name> -c main

# Check Argo Workflow status
kubectl get workflows -n kubeflow
kubectl describe workflow <workflow-name> -n kubeflow

# Query ML Metadata directly (for debugging)
kubectl port-forward -n kubeflow svc/metadata-grpc-service 8080:8080

Production Operations Checklist

When operating KFP v2 in production, the following items should be verified.

- **Resource Management**: Set CPU/memory requests and limits for all components. For GPU components, specify `tolerations` and `nodeSelector`.

- **Retry Policy**: Set `task.set_retry(num_retries=3, backoff_duration="60s")` to handle transient errors.

- **Timeout**: Use `timeout` settings for long-running pipelines to prevent resource waste.

- **Namespace Separation**: Separate development/staging/production pipelines into distinct namespaces.

- **Artifact Cleanup**: Set up a CronJob to periodically clean up artifacts from old runs. Leverage MinIO/S3 Lifecycle Policies.

- **Monitoring Integration**: Monitor pipeline execution time, success rate, and resource usage with Prometheus + Grafana.

- **Alert Configuration**: Set up Webhooks to send notifications to Slack/PagerDuty when a pipeline fails.
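For the retry policy in the checklist above, it helps to estimate how long a task may spend waiting between attempts before it finally fails. The sketch below computes a delay schedule under the assumption that delays grow exponentially from the initial backoff and are capped at a maximum. The growth factor and cap used here are assumptions of this sketch, so check the behavior of your backend version before relying on the numbers.

```python
def retry_delays(
    num_retries: int,
    backoff_seconds: float,
    backoff_factor: float = 2.0,
    max_seconds: float = 3600.0,
) -> list[float]:
    """Return the assumed wait before each retry, capped at max_seconds."""
    return [
        min(backoff_seconds * backoff_factor ** attempt, max_seconds)
        for attempt in range(num_retries)
    ]


delays = retry_delays(num_retries=3, backoff_seconds=60)
print(delays, sum(delays))
```

Summing the schedule gives a rough upper bound on the extra time retries add to a task, which is useful when choosing a pipeline timeout.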

References

1. [Kubeflow Pipelines Official Documentation](https://www.kubeflow.org/docs/components/pipelines/)

2. [KFP SDK v2 API Reference](https://kubeflow-pipelines.readthedocs.io/)

3. [Kubeflow Pipelines GitHub Repository](https://github.com/kubeflow/pipelines)

4. [KFP v2 Caching Guide](https://www.kubeflow.org/docs/components/pipelines/caching-v2/)

5. [KFP v2 Artifact Management](https://www.kubeflow.org/docs/components/pipelines/user-guides/data-handling/artifacts/)

6. [KFP v1 to v2 Migration Guide](https://www.kubeflow.org/docs/components/pipelines/user-guides/migration/)

7. [KFP Python SDK DeepWiki](https://deepwiki.com/kubeflow/pipelines/2-kfp-python-sdk)

8. [Migrating to Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/migrate-kfp)

