- Overview
- KFP v2 Architecture
- KFP SDK v2 Pipeline Definition
- Component Types and Usage
- Artifact Management and Caching Strategies
- CI/CD Integration
- ML Workflow Orchestration Tool Comparison
- Kubernetes Extension Features
- Monitoring and Troubleshooting
- Production Operations Checklist
- References

Overview
Developing an ML model and operating it reliably in production are entirely different problems. Manually managing a pipeline that spans data preprocessing, training, evaluation, and deployment leads to recurring issues such as lack of reproducibility, failed experiment tracking, and deployment delays. Kubeflow Pipelines (KFP) v2 is a framework that declaratively defines and automates ML workflows on Kubernetes, allowing you to build complex ML pipelines using just Python decorators.
KFP v2 is a major improvement over v1. Pipeline compilation results are now abstracted into IR (Intermediate Representation) YAML instead of Argo Workflow YAML, supporting various execution backends. The artifact system has been strengthened, and type safety has been improved. This article covers KFP v2's architecture, SDK usage, caching strategies, CI/CD integration, and production troubleshooting from a practical perspective.
KFP v2 Architecture
Core Components
The KFP v2 architecture consists of the following core layers.
| Component | Role | Tech Stack |
|---|---|---|
| KFP SDK | Pipeline/component definition, compilation | Python (kfp package) |
| IR Compiler | Converts Python DSL to IR YAML | Protocol Buffers based |
| KFP Backend | Pipeline execution management, API server | Go, gRPC/REST |
| Workflow Engine | Actual workflow orchestration | Argo Workflows / Tekton |
| Metadata Store | Execution metadata, artifact tracking | ML Metadata (MLMD) |
| Artifact Store | Stores models, datasets, and other artifacts | MinIO / GCS / S3 |
| UI Dashboard | Pipeline visualization, execution monitoring | React-based web UI |
The biggest change in KFP v2 is the introduction of IR YAML. In v1, pipelines were compiled directly into Argo Workflow YAML, creating tight coupling with Argo. In v2, pipelines are first compiled into an intermediate representation called IR, which is then interpreted and executed by each backend driver. This means the same pipeline definition can run not only on a Kubeflow cluster but also on Google Vertex AI Pipelines.
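As an illustration of what the compiler emits, an IR YAML file has roughly the following top-level shape. This is an abbreviated, hand-written sketch, not a complete runnable spec; the component names mirror the `preprocess_data` example later in this article.

```yaml
# Abbreviated sketch of IR YAML structure (illustrative, not complete)
pipelineInfo:
  name: ml-training-pipeline
components:                      # reusable component definitions
  comp-preprocess-data:
    executorLabel: exec-preprocess-data
deploymentSpec:
  executors:                     # container image/command/args per component
    exec-preprocess-data:
      container:
        image: python:3.11-slim
root:
  dag:
    tasks:                       # the task graph the backend driver executes
      preprocess-data:
        componentRef:
          name: comp-preprocess-data
schemaVersion: 2.1.0
sdkVersion: kfp-2.7.0
```

Because the backend driver consumes this representation rather than Argo-specific YAML, swapping the execution backend does not require touching the pipeline definition.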
Installation and Cluster Setup
```bash
# KFP SDK v2 installation
pip install kfp==2.7.0

# Kubernetes extension library (GPU, Volume, and other K8s-specific features)
pip install kfp-kubernetes==1.2.0

# Kubeflow Pipelines backend deployment (on a Kubernetes cluster)
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=2.2.0"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=2.2.0"

# Access the UI via port forwarding
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80
```
KFP SDK v2 Pipeline Definition
Basic Components and Pipelines
In KFP v2, pipelines are defined using the @dsl.component and @dsl.pipeline decorators. A component is the smallest execution unit in a pipeline, and each component runs in an isolated container.
```python
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"]
)
def preprocess_data(
    raw_data_path: str,
    test_size: float,
    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    data_stats: Output[Metrics]
):
    """Load data and split into train/test sets."""
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(raw_data_path)

    # Handle missing values
    df = df.dropna(subset=["target"])
    df = df.fillna(df.median(numeric_only=True))

    train_df, test_df = train_test_split(
        df, test_size=test_size, random_state=42, stratify=df["target"]
    )
    train_df.to_csv(train_dataset.path, index=False)
    test_df.to_csv(test_dataset.path, index=False)

    # Log metrics
    data_stats.log_metric("total_rows", len(df))
    data_stats.log_metric("train_rows", len(train_df))
    data_stats.log_metric("test_rows", len(test_df))
    data_stats.log_metric("feature_count", len(df.columns) - 1)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "joblib==1.3.0"]
)
def train_model(
    train_dataset: Input[Dataset],
    n_estimators: int,
    max_depth: int,
    trained_model: Output[Model],
    training_metrics: Output[Metrics]
):
    """Train a Random Forest model."""
    import pandas as pd
    import joblib
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score, f1_score

    train_df = pd.read_csv(train_dataset.path)
    X_train = train_df.drop(columns=["target"])
    y_train = train_df["target"]

    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42,
        n_jobs=-1
    )
    model.fit(X_train, y_train)

    # Log training accuracy
    train_pred = model.predict(X_train)
    training_metrics.log_metric("train_accuracy", accuracy_score(y_train, train_pred))
    training_metrics.log_metric("train_f1", f1_score(y_train, train_pred, average="weighted"))

    # Save the model and attach descriptive metadata
    joblib.dump(model, trained_model.path)
    trained_model.metadata["framework"] = "sklearn"
    trained_model.metadata["model_type"] = "RandomForestClassifier"


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "joblib==1.3.0"]
)
def evaluate_model(
    test_dataset: Input[Dataset],
    trained_model: Input[Model],
    eval_metrics: Output[Metrics],
    accuracy_threshold: float = 0.85
) -> bool:
    """Evaluate the model on test data and check if it passes the threshold."""
    import pandas as pd
    import joblib
    from sklearn.metrics import accuracy_score, f1_score

    test_df = pd.read_csv(test_dataset.path)
    X_test = test_df.drop(columns=["target"])
    y_test = test_df["target"]

    model = joblib.load(trained_model.path)
    predictions = model.predict(X_test)

    accuracy = accuracy_score(y_test, predictions)
    f1 = f1_score(y_test, predictions, average="weighted")
    eval_metrics.log_metric("test_accuracy", accuracy)
    eval_metrics.log_metric("test_f1", f1)
    # log_metric expects a float, so cast the boolean pass/fail flag
    eval_metrics.log_metric("passed_threshold", float(accuracy >= accuracy_threshold))

    return accuracy >= accuracy_threshold
```
Pipeline Composition and Conditional Execution
```python
@dsl.pipeline(
    name="ml-training-pipeline",
    description="Data preprocessing -> Training -> Evaluation -> Conditional deployment pipeline"
)
def ml_training_pipeline(
    raw_data_path: str = "gs://my-bucket/data/raw.csv",
    test_size: float = 0.2,
    n_estimators: int = 100,
    max_depth: int = 10,
    accuracy_threshold: float = 0.85
):
    # Step 1: Data preprocessing
    preprocess_task = preprocess_data(
        raw_data_path=raw_data_path,
        test_size=test_size
    )
    preprocess_task.set_display_name("Data Preprocessing")

    # Step 2: Model training
    train_task = train_model(
        train_dataset=preprocess_task.outputs["train_dataset"],
        n_estimators=n_estimators,
        max_depth=max_depth
    )
    train_task.set_display_name("Model Training")
    train_task.set_cpu_limit("4")
    train_task.set_memory_limit("8Gi")

    # Step 3: Model evaluation
    eval_task = evaluate_model(
        test_dataset=preprocess_task.outputs["test_dataset"],
        trained_model=train_task.outputs["trained_model"],
        accuracy_threshold=accuracy_threshold
    )
    eval_task.set_display_name("Model Evaluation")

    # Step 4: Conditional deployment, run only if the accuracy threshold is met.
    # deploy_model is a @dsl.component assumed to be defined elsewhere.
    with dsl.If(eval_task.output == True):
        deploy_task = deploy_model(
            model=train_task.outputs["trained_model"],
            model_name="fraud-detector",
            serving_endpoint="https://serving.example.com"
        )
        deploy_task.set_display_name("Model Deployment")


# Compile the pipeline to IR YAML
from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=ml_training_pipeline,
    package_path="ml_training_pipeline.yaml"
)
```
Component Types and Usage
KFP v2 supports three component types, each suited for different scenarios.
| Component Type | Definition Method | Best For | Pros | Cons |
|---|---|---|---|---|
| Lightweight Python | @dsl.component decorator | Pure Python logic, rapid prototyping | Code and definition in one place, fast iteration | Cannot reference external files, imports must be inside function |
| Container | @dsl.container_component | Using existing Docker images, non-Python tasks | Language agnostic, reuse existing images | Limited artifact type support |
| Importer | dsl.importer() | Bringing external artifacts into pipeline | Track existing artifacts within pipeline | No data movement, only metadata registration |
Container Component Example
```python
@dsl.container_component
def run_spark_job(
    input_data: Input[Dataset],
    output_data: Output[Dataset],
    spark_config: str
):
    """Run a Spark job as a container."""
    return dsl.ContainerSpec(
        image="my-registry/spark-processor:3.5",
        command=["spark-submit"],
        args=[
            "--master", "k8s://https://kubernetes.default.svc",
            "--conf", spark_config,
            "--input", input_data.path,
            "--output", output_data.path,
            "/app/etl_job.py"
        ]
    )


# Using Importer: bring an external model into the pipeline
@dsl.pipeline(name="model-comparison-pipeline")
def comparison_pipeline():
    existing_model = dsl.importer(
        artifact_uri="gs://models/production/v2.1/model.pkl",
        artifact_class=Model,
        reimport=False,
        metadata={"version": "2.1", "framework": "sklearn"}
    )

    # Compare the imported baseline with a newly trained candidate.
    # train_task is a training step defined earlier in this pipeline, and
    # compare_models is a @dsl.component assumed to be defined elsewhere.
    compare_task = compare_models(
        baseline_model=existing_model.output,
        candidate_model=train_task.outputs["trained_model"]
    )
```
Artifact Management and Caching Strategies
Artifact System
KFP v2's artifact system tracks all inputs and outputs based on ML Metadata (MLMD). The key artifact types are as follows.
| Artifact Type | Purpose | Examples |
|---|---|---|
| Dataset | Datasets | CSV, Parquet files |
| Model | Trained models | pickle, ONNX, SavedModel |
| Metrics | Numeric metrics | accuracy, loss, f1-score |
| ClassificationMetrics | Classification metrics | confusion matrix, ROC curve |
| HTML | HTML reports | Visualization reports |
| Markdown | Markdown reports | Text-based reports |
| Artifact | General artifacts | Other files |
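Conceptually, every artifact carries a URI, a local path visible inside the container, and a metadata dict. The URI-to-path mapping KFP performs can be sketched roughly as follows (a simplified illustration; the real logic lives inside the KFP SDK):

```python
# Simplified sketch of how an artifact URI maps to the local path that a
# component reads and writes; remote schemes are mounted under a local root.
def local_path_for(uri: str) -> str:
    for scheme, root in (("gs://", "/gcs/"), ("s3://", "/s3/"), ("minio://", "/minio/")):
        if uri.startswith(scheme):
            return root + uri[len(scheme):]
    return uri  # already a local path

print(local_path_for("gs://my-bucket/data/train.csv"))  # /gcs/my-bucket/data/train.csv
```

This is why component code can simply call `pd.read_csv(train_dataset.path)` without knowing which object store backs the artifact.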
Caching Strategies
KFP v2 supports automatic caching at the component level. You can significantly reduce execution time by reusing results from components that were run with identical inputs and code.
```python
# Caching options are baked in at compile time, so a runtime pipeline
# parameter cannot toggle them; use a compile-time flag and recompile
# when you need to force retraining.
RETRAIN = False


@dsl.pipeline(name="caching-example")
def caching_pipeline(data_path: str):
    # Enable caching for data preprocessing (results are reused for the same data)
    preprocess_task = preprocess_data(raw_data_path=data_path)
    preprocess_task.set_caching_options(True)

    # Control caching for training with the compile-time retrain flag
    train_task = train_model(
        train_dataset=preprocess_task.outputs["train_dataset"],
        n_estimators=200,
        max_depth=15
    )
    if RETRAIN:
        train_task.set_caching_options(False)  # Force retraining

    # Disable caching for components that call external APIs
    deploy_task = deploy_model(model=train_task.outputs["trained_model"])
    deploy_task.set_caching_options(False)  # Always execute fresh
```
There are important considerations when operating with caching. First, caching is only meaningful when component code is a pure function (same input produces same output). Second, the KFP v2 SDK does not support cache expiration time settings, so components dealing with time-sensitive data should have caching disabled. Third, setting the environment variable KFP_DISABLE_EXECUTION_CACHING_BY_DEFAULT to true disables caching by default for all pipelines.
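As a mental model only (the KFP backend computes its own fingerprint), a cache key behaves like a digest over the component spec and its resolved inputs, which is why either a code change or an input change busts the cache:

```python
import hashlib
import json


def cache_key(component_spec: dict, inputs: dict) -> str:
    """Illustrative only: a digest over the component spec (image, command,
    embedded code) and the resolved input values. Not the backend's actual
    algorithm, but it shows why identical spec + inputs yields a cache hit."""
    payload = json.dumps({"spec": component_spec, "inputs": inputs}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


spec = {"image": "python:3.11-slim", "command": ["python", "-c", "..."]}
k1 = cache_key(spec, {"test_size": 0.2})
k2 = cache_key(spec, {"test_size": 0.3})
assert k1 != k2  # a changed input produces a different key -> cache miss
```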
CI/CD Integration
Automated Pipeline Deployment with GitHub Actions
```yaml
# .github/workflows/kfp-deploy.yaml
name: KFP Pipeline CI/CD

on:
  push:
    branches: [main]
    paths:
      - 'pipelines/**'
      - 'components/**'

env:
  KFP_HOST: ${{ secrets.KFP_HOST }}
  KFP_NAMESPACE: kubeflow

jobs:
  validate-and-compile:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'

      - name: Install dependencies
        run: |
          pip install kfp==2.7.0 kfp-kubernetes==1.2.0
          pip install pytest

      - name: Lint pipeline code
        run: |
          pip install ruff
          ruff check pipelines/ components/

      - name: Run unit tests
        run: pytest tests/unit/ -v

      - name: Compile pipeline
        run: |
          python - <<'PY'
          from pipelines.training_pipeline import ml_training_pipeline
          from kfp import compiler
          compiler.Compiler().compile(
              pipeline_func=ml_training_pipeline,
              package_path='compiled_pipeline.yaml'
          )
          print('Pipeline compiled successfully')
          PY

      - name: Upload compiled pipeline
        uses: actions/upload-artifact@v4
        with:
          name: compiled-pipeline
          path: compiled_pipeline.yaml

  deploy-pipeline:
    needs: validate-and-compile
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4

      - name: Download compiled pipeline
        uses: actions/download-artifact@v4
        with:
          name: compiled-pipeline

      - name: Deploy to KFP
        run: |
          pip install kfp==2.7.0
          python - <<'PY'
          import os
          from kfp.client import Client

          client = Client(host=os.environ['KFP_HOST'])
          # Upload the pipeline; if it already exists, upload a new version instead
          try:
              pipeline = client.upload_pipeline(
                  pipeline_package_path='compiled_pipeline.yaml',
                  pipeline_name='ml-training-pipeline',
                  description='Automated ML training pipeline'
              )
              print(f'Pipeline uploaded: {pipeline.pipeline_id}')
          except Exception:
              version = client.upload_pipeline_version(
                  pipeline_package_path='compiled_pipeline.yaml',
                  pipeline_name='ml-training-pipeline',
                  pipeline_version_name='v' + os.environ['GITHUB_SHA'][:7]
              )
              print(f'Pipeline version uploaded: {version.pipeline_version_id}')
          PY
```

Multi-line Python is passed through a heredoc rather than `python -c` so that the indentation of `try`/`except` blocks survives the YAML block scalar.
Programmatic Pipeline Execution
```python
from kfp.client import Client


def trigger_pipeline_run(
    host: str,
    pipeline_name: str,
    experiment_name: str,
    params: dict
) -> str:
    """Trigger a pipeline run programmatically."""
    client = Client(host=host)

    # Create the experiment (returns the existing one if it already exists)
    experiment = client.create_experiment(
        name=experiment_name,
        namespace="kubeflow"
    )

    # Look up the pipeline by name
    pipeline_id = client.get_pipeline_id(name=pipeline_name)
    if pipeline_id is None:
        raise ValueError(f"Pipeline '{pipeline_name}' not found")

    # Create the run
    run = client.run_pipeline(
        experiment_id=experiment.experiment_id,
        job_name=f"{pipeline_name}-{params.get('run_tag', 'manual')}",
        pipeline_id=pipeline_id,
        params=params
    )
    print(f"Run created: {run.run_id}")
    print(f"Monitor at: {host}/#/runs/details/{run.run_id}")
    return run.run_id


# Set up a recurring schedule
def create_recurring_run(client: Client, pipeline_id: str, experiment_id: str):
    """Create a schedule that runs the pipeline daily at midnight."""
    recurring_run = client.create_recurring_run(
        experiment_id=experiment_id,
        job_name="daily-training",
        pipeline_id=pipeline_id,
        params={"raw_data_path": "gs://data/daily/latest.csv"},
        cron_expression="0 0 * * *",
        max_concurrency=1,
        enabled=True
    )
    return recurring_run
```
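The cron_expression "0 0 * * *" reads, in standard five-field cron, as minute 0, hour 0, every day (note that some KFP backends also accept a six-field variant with a leading seconds field). A tiny stdlib sketch of the next firing time for this schedule:

```python
from datetime import datetime, timedelta


def next_daily_midnight(now: datetime) -> datetime:
    """Next firing time after `now` for the cron schedule '0 0 * * *'
    (minute 0, hour 0, every day): the upcoming midnight."""
    return now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)


print(next_daily_midnight(datetime(2024, 5, 1, 14, 30)))  # 2024-05-02 00:00:00
```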
ML Workflow Orchestration Tool Comparison
| Criteria | KFP v2 | Apache Airflow | Prefect | Vertex AI Pipelines |
|---|---|---|---|---|
| Primary Use | ML pipeline dedicated | General data workflows | General workflows | Managed ML pipelines |
| Infrastructure | Kubernetes required | Standalone possible | Standalone / Cloud | Google Cloud managed |
| Pipeline Definition | Python decorators | Python (DAG classes) | Python decorators | KFP SDK (same) |
| Artifact Tracking | ML Metadata (built-in) | XCom (limited) | External integration needed | Vertex ML Metadata |
| Experiment Management | Built-in | Not supported (MLflow integration) | Not supported | Built-in |
| Caching | Component-level automatic | Task-level manual | Task-level built-in | Component-level automatic |
| GPU Support | Kubernetes native | K8s Executor required | Kubernetes integration | Automatic |
| UI | Pipeline visualization built-in | Web UI built-in | Prefect Cloud UI | Google Cloud Console |
| Scalability | Kubernetes scaling | Celery/K8s Executor | Dask/Ray integration | Auto scaling |
| Learning Curve | High (K8s knowledge required) | Medium | Low | Medium (GCP dependency) |
| Cost | Self-managed infrastructure | Self-managed infrastructure | Prefect Cloud paid | Usage-based billing |
Selection Criteria Summary: If you need a Kubernetes-based ML-dedicated pipeline, choose KFP v2. To integrate data engineering and ML, choose Airflow. To get started quickly, choose Prefect. If you are all-in on Google Cloud, Vertex AI Pipelines is the best fit.
Kubernetes Extension Features
KFP v2 supports Kubernetes-specific features through the kfp-kubernetes extension library.
```python
from kfp import dsl
from kfp import kubernetes


@dsl.pipeline(name="gpu-training-pipeline")
def gpu_training_pipeline():
    # train_deep_learning_model is a @dsl.component assumed to be defined elsewhere
    train_task = train_deep_learning_model(
        dataset_path="gs://data/training",
        epochs=50,
        batch_size=64
    )

    # GPU resource allocation: pin to A100 nodes and tolerate the GPU taint
    kubernetes.add_node_selector(
        train_task,
        label_key="accelerator",
        label_value="nvidia-a100"
    )
    kubernetes.add_toleration(
        train_task,
        key="nvidia.com/gpu",
        operator="Exists",
        effect="NoSchedule"
    )
    train_task.set_accelerator_type("nvidia.com/gpu")
    train_task.set_accelerator_limit(2)
    train_task.set_cpu_limit("16")
    train_task.set_memory_limit("64Gi")

    # Secret mount (model registry credentials)
    kubernetes.use_secret_as_env(
        train_task,
        secret_name="model-registry-credentials",
        secret_key_to_env={
            "username": "REGISTRY_USERNAME",
            "password": "REGISTRY_PASSWORD"
        }
    )

    # PVC mount (shared data volume)
    kubernetes.mount_pvc(
        train_task,
        pvc_name="shared-data-pvc",
        mount_path="/mnt/shared-data"
    )
```
Monitoring and Troubleshooting
Common Issues and Solutions
Issue 1: Pipeline compiles successfully but fails at runtime
The most common cause is a package that is used inside the component but missing from packages_to_install. Because Lightweight Python components defined with the @dsl.component decorator run in isolated container environments, every import used within the function must appear inside the function body, and every required package must be explicitly declared.
Issue 2: Caching does not work as expected
Cache keys are generated from a component's input parameters, component code, and base image. If the cache hits even though you changed the code, either the change never made it into the compiled component spec, or it touched only parts excluded from the cache key. Verify that you recompiled the pipeline and uploaded the new version.
Issue 3: Pod terminated due to OOM (Out of Memory)
This frequently occurs in components that process large-scale data. Set set_memory_limit() and set_memory_request(), with the request set to approximately 80% of the limit. Processing data in chunks or explicitly using del to free unnecessary variables also helps.
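The chunked-processing pattern can be sketched with the standard library alone (with pandas you would typically pass a chunksize argument to read_csv instead):

```python
import csv
from itertools import islice


def process_in_chunks(path: str, chunk_size: int = 50_000):
    """Stream a large CSV in fixed-size chunks instead of loading it all
    into memory; each chunk can be processed, aggregated, and released
    before the next one is read."""
    with open(path, newline="") as f:
        reader = csv.DictReader(f)
        while True:
            chunk = list(islice(reader, chunk_size))
            if not chunk:
                break
            yield chunk  # process here, then let the chunk be garbage-collected
```

Peak memory is then bounded by one chunk rather than the whole file, which keeps the component comfortably inside its memory limit.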
Issue 4: Pipeline execution stuck in Pending state
This is often caused by insufficient resources on Kubernetes nodes. Check events using the kubectl describe pod command and review the node autoscaler configuration. For GPU nodes, verify the maximum node pool size.
Issue 5: Artifacts overwritten inside ParallelFor
This is a known issue in KFP v2 (GitHub Issue #10186), where artifacts from concurrently executing components within ParallelFor or Sub-DAG can conflict. Work around this by including unique identifiers in artifact paths or generating unique filenames within the component.
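A sketch of the unique-filename workaround, using uuid inside the component body (unique_artifact_file is an illustrative helper, not a KFP API):

```python
import os
import uuid


def unique_artifact_file(base_dir: str, stem: str, ext: str = ".csv") -> str:
    """Build a collision-free file path for output written inside a
    ParallelFor branch, e.g. <base_dir>/scores-<random-hex>.csv, so that
    concurrently running branches never overwrite each other."""
    os.makedirs(base_dir, exist_ok=True)
    return os.path.join(base_dir, f"{stem}-{uuid.uuid4().hex}{ext}")


p1 = unique_artifact_file("/tmp/outputs", "scores")
p2 = unique_artifact_file("/tmp/outputs", "scores")
assert p1 != p2  # each parallel branch gets its own file
```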
Log Inspection and Debugging
```bash
# View full logs for a pipeline run
kubectl logs -n kubeflow -l pipeline/runid=<run-id> --all-containers

# View logs for a specific component
kubectl logs -n kubeflow <pod-name> -c main

# Check Argo Workflow status
kubectl get workflows -n kubeflow
kubectl describe workflow <workflow-name> -n kubeflow

# Query ML Metadata directly (for debugging)
kubectl port-forward -n kubeflow svc/metadata-grpc-service 8080:8080
```
Production Operations Checklist
When operating KFP v2 in production, the following items should be verified.
- Resource Management: Set CPU/memory requests and limits for all components. For GPU components, also specify `tolerations` and a `nodeSelector`.
- Retry Policy: Set `task.set_retry(num_retries=3, backoff_duration="60s")` to absorb transient errors.
- Timeout: Set timeouts on long-running pipelines to prevent resource waste.
- Namespace Separation: Separate development/staging/production pipelines into distinct namespaces.
- Artifact Cleanup: Set up a CronJob to periodically clean up artifacts from old runs, and leverage MinIO/S3 lifecycle policies.
- Monitoring Integration: Track pipeline execution time, success rate, and resource usage with Prometheus + Grafana.
- Alert Configuration: Send webhook notifications to Slack/PagerDuty when a pipeline fails.