- Introduction
- KFP v2 Installation and Core Concepts
- Defining Components
- Authoring Pipelines
- Advanced Patterns
- CI/CD Integration
- Conclusion

Introduction
When moving ML models from experimentation to production, reproducibility, automation, and version management are essential. Kubeflow Pipelines (KFP) v2 is a framework for defining and running ML workflows on Kubernetes; pipelines are authored in plain Python, with decorators turning ordinary functions into containerized steps.
This article covers the core features of the KFP v2 SDK and walks through hands-on pipeline construction.
KFP v2 Installation and Core Concepts
Installation
pip install kfp==2.7.0
# Install Kubeflow Pipelines backend (Kubernetes)
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic?ref=2.2.0"
# Port forwarding
kubectl port-forward svc/ml-pipeline-ui -n kubeflow 8080:80
Core Concepts
1. Component: A unit of work in the pipeline (a Python function)
2. Pipeline: A DAG (Directed Acyclic Graph) of components
3. Artifact: Typed input/output data (Dataset, Model, Metrics, etc.)
4. Run: A single execution of a pipeline
5. Experiment: A logical group of Runs
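Concepts 1 and 2 are worth making concrete: a pipeline is a DAG, and the backend starts each component only after all of its upstream components finish. A stdlib-only sketch of that ordering, using the step names of the pipeline built later in this article:

```python
from graphlib import TopologicalSorter

# Each key runs only after all of its listed predecessors complete.
dag = {
    "load_data": set(),
    "preprocess_data": {"load_data"},
    "train_model": {"preprocess_data"},
    "evaluate_model": {"preprocess_data", "train_model"},
}

# A topological order over the DAG is a valid execution order.
order = list(TopologicalSorter(dag).static_order())
print(order)  # ['load_data', 'preprocess_data', 'train_model', 'evaluate_model']
```

KFP infers exactly these edges from how artifacts are passed between components, so you never write the DAG explicitly.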
Defining Components
Lightweight Python Component
from kfp import dsl
from kfp.dsl import (
    Dataset, Input, Output, Model, Metrics,
    ClassificationMetrics, component
)

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.1.4", "scikit-learn==1.4.0"]
)
def load_data(
    dataset_url: str,
    output_dataset: Output[Dataset]
):
    """Data loading component"""
    import pandas as pd

    df = pd.read_csv(dataset_url)
    print(f"Loaded {len(df)} rows")

    # Save to the output artifact
    df.to_csv(output_dataset.path, index=False)
    output_dataset.metadata["num_rows"] = len(df)
    output_dataset.metadata["num_columns"] = len(df.columns)
@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.1.4", "scikit-learn==1.4.0"]
)
def preprocess_data(
    input_dataset: Input[Dataset],
    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    test_size: float = 0.2
):
    """Data preprocessing and splitting"""
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(input_dataset.path)

    # Preprocessing
    df = df.dropna()
    df = df.drop_duplicates()

    # Splitting
    train_df, test_df = train_test_split(df, test_size=test_size, random_state=42)
    train_df.to_csv(train_dataset.path, index=False)
    test_df.to_csv(test_dataset.path, index=False)

    train_dataset.metadata["num_rows"] = len(train_df)
    test_dataset.metadata["num_rows"] = len(test_df)
@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=[
        "pandas==2.1.4", "scikit-learn==1.4.0",
        "joblib==1.3.2", "xgboost==2.0.3"
    ]
)
def train_model(
    train_dataset: Input[Dataset],
    model_output: Output[Model],
    metrics_output: Output[Metrics],
    n_estimators: int = 100,
    max_depth: int = 6,
    learning_rate: float = 0.1
):
    """Model training"""
    import pandas as pd
    import joblib
    from xgboost import XGBClassifier
    from sklearn.model_selection import cross_val_score

    df = pd.read_csv(train_dataset.path)
    X = df.drop("target", axis=1)
    y = df["target"]

    # Training
    model = XGBClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        learning_rate=learning_rate,
        random_state=42
    )
    model.fit(X, y)

    # Cross-validation
    cv_scores = cross_val_score(model, X, y, cv=5, scoring="accuracy")

    # Save the model
    joblib.dump(model, model_output.path)
    model_output.metadata["framework"] = "xgboost"
    model_output.metadata["n_estimators"] = n_estimators

    # Log metrics
    metrics_output.log_metric("cv_accuracy_mean", float(cv_scores.mean()))
    metrics_output.log_metric("cv_accuracy_std", float(cv_scores.std()))
    metrics_output.log_metric("n_estimators", n_estimators)
@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=[
        "pandas==2.1.4", "scikit-learn==1.4.0",
        "joblib==1.3.2", "xgboost==2.0.3"
    ]
)
def evaluate_model(
    test_dataset: Input[Dataset],
    model_input: Input[Model],
    metrics_output: Output[ClassificationMetrics],
    eval_metrics: Output[Metrics]
) -> float:
    """Model evaluation"""
    import pandas as pd
    import joblib
    from sklearn.metrics import accuracy_score

    df = pd.read_csv(test_dataset.path)
    X = df.drop("target", axis=1)
    y = df["target"]

    model = joblib.load(model_input.path)
    y_pred = model.predict(X)
    accuracy = float(accuracy_score(y, y_pred))

    # Classification metrics (rendered as a confusion matrix in the UI)
    metrics_output.log_confusion_matrix(
        categories=["Class 0", "Class 1"],
        matrix=[[int(sum((y == 0) & (y_pred == 0))), int(sum((y == 0) & (y_pred == 1)))],
                [int(sum((y == 1) & (y_pred == 0))), int(sum((y == 1) & (y_pred == 1)))]]
    )
    eval_metrics.log_metric("test_accuracy", accuracy)
    return accuracy
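The sum-based confusion-matrix expressions above are easy to misread. The same 2x2 matrix (rows = actual class, columns = predicted class) can be computed in plain Python, which is handy for sanity-checking what log_confusion_matrix receives:

```python
def confusion_matrix_2x2(y_true, y_pred):
    """Rows = actual class, columns = predicted class."""
    matrix = [[0, 0], [0, 0]]
    for actual, predicted in zip(y_true, y_pred):
        matrix[actual][predicted] += 1
    return matrix

y_true = [0, 0, 1, 1, 1, 0]
y_pred = [0, 1, 1, 1, 0, 0]
cm = confusion_matrix_2x2(y_true, y_pred)
print(cm)  # [[2, 1], [1, 2]]
```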
Custom Docker Image Component
@dsl.component(
    base_image="pytorch/pytorch:2.1.0-cuda12.1-cudnn8-runtime",
    packages_to_install=["transformers==4.37.0", "datasets==2.16.0"]
)
def finetune_llm(
    model_name: str,
    train_dataset: Input[Dataset],
    output_model: Output[Model],
    epochs: int = 3,
    batch_size: int = 8
):
    """LLM fine-tuning (GPU required)"""
    from transformers import AutoModelForSequenceClassification, Trainer
    # ... training code
Authoring Pipelines
Basic Pipeline
@dsl.pipeline(
    name="ML Training Pipeline",
    description="Data Load -> Preprocess -> Train -> Evaluate Pipeline"
)
def ml_training_pipeline(
    dataset_url: str = "https://example.com/data.csv",
    test_size: float = 0.2,
    n_estimators: int = 100,
    max_depth: int = 6,
    learning_rate: float = 0.1,
    accuracy_threshold: float = 0.85
):
    # Step 1: Load data
    load_task = load_data(dataset_url=dataset_url)

    # Step 2: Preprocess (runs after load_task completes)
    preprocess_task = preprocess_data(
        input_dataset=load_task.outputs["output_dataset"],
        test_size=test_size
    )

    # Step 3: Train the model
    train_task = train_model(
        train_dataset=preprocess_task.outputs["train_dataset"],
        n_estimators=n_estimators,
        max_depth=max_depth,
        learning_rate=learning_rate
    )
    # Set resource limits
    train_task.set_cpu_limit("4")
    train_task.set_memory_limit("8Gi")

    # Step 4: Evaluate
    eval_task = evaluate_model(
        test_dataset=preprocess_task.outputs["test_dataset"],
        model_input=train_task.outputs["model_output"]
    )

    # Step 5: Conditional deployment
    with dsl.If(eval_task.output >= accuracy_threshold):
        deploy_task = deploy_model(
            model_input=train_task.outputs["model_output"],
            accuracy=eval_task.output
        )
@dsl.component(base_image="python:3.11-slim")
def deploy_model(
    model_input: Input[Model],
    accuracy: float
):
    """Deploy the model (when the accuracy threshold is met)"""
    print(f"Deploying model with accuracy: {accuracy:.4f}")
    print(f"Model path: {model_input.path}")
    # Actual deployment logic (K8s Serving, BentoML, etc.)
Pipeline Compilation and Execution
from kfp import compiler
from kfp.client import Client

# 1. Compile to YAML
compiler.Compiler().compile(
    pipeline_func=ml_training_pipeline,
    package_path="ml_pipeline.yaml"
)

# 2. Submit to the KFP server
client = Client(host="http://localhost:8080")

# Create an Experiment
experiment = client.create_experiment(name="ml-experiments")

# Launch a Run
run = client.create_run_from_pipeline_func(
    ml_training_pipeline,
    experiment_name="ml-experiments",
    run_name="training-run-001",
    arguments={
        "dataset_url": "gs://my-bucket/data.csv",
        "n_estimators": 200,
        "max_depth": 8,
        "accuracy_threshold": 0.90
    }
)
print(f"Run ID: {run.run_id}")
print(f"Run URL: http://localhost:8080/#/runs/details/{run.run_id}")
Recurring Run
# Run daily at 2 AM, using the compiled ml_pipeline.yaml
# (create_recurring_run takes a compiled package, not a pipeline function,
# and pipeline parameters are passed via `params`)
client.create_recurring_run(
    experiment_id=experiment.experiment_id,
    job_name="daily-retraining",
    pipeline_package_path="ml_pipeline.yaml",
    cron_expression="0 2 * * *",
    max_concurrency=1,
    params={
        "dataset_url": "gs://my-bucket/latest-data.csv",
        "accuracy_threshold": 0.85
    }
)
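The cron expression follows the five-field format (minute, hour, day-of-month, month, day-of-week). As an illustration only (this is not KFP's scheduler, which lives in the backend), here is a minimal matcher for the fixed-or-wildcard subset used above:

```python
from datetime import datetime

def matches_cron(expr: str, dt: datetime) -> bool:
    """Minimal cron matcher: supports only '*' and literal numbers."""
    fields = expr.split()
    values = [dt.minute, dt.hour, dt.day, dt.month, dt.isoweekday() % 7]
    return all(f == "*" or int(f) == v for f, v in zip(fields, values))

# "0 2 * * *" -> every day at 02:00
print(matches_cron("0 2 * * *", datetime(2024, 1, 1, 2, 0)))  # True
print(matches_cron("0 2 * * *", datetime(2024, 1, 1, 3, 0)))  # False
```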
Advanced Patterns
Parallel Execution (ParallelFor)
@dsl.pipeline(name="Hyperparameter Search")
def hp_search_pipeline(
    dataset_url: str = "https://example.com/data.csv"
):
    # Load data once; all parallel branches share this output
    load_task = load_data(dataset_url=dataset_url)

    # Define hyperparameter combinations
    hp_configs = [
        {"n_estimators": 100, "max_depth": 4, "lr": 0.1},
        {"n_estimators": 200, "max_depth": 6, "lr": 0.05},
        {"n_estimators": 300, "max_depth": 8, "lr": 0.01},
    ]

    # Parallel training
    with dsl.ParallelFor(hp_configs) as config:
        train_task = train_model(
            train_dataset=load_task.outputs["output_dataset"],
            n_estimators=config.n_estimators,
            max_depth=config.max_depth,
            learning_rate=config.lr
        )
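Hand-listing hp_configs gets tedious as the search space grows. The list can be generated from per-parameter value lists with plain Python before being handed to dsl.ParallelFor:

```python
from itertools import product

# Per-parameter candidate values (example grid)
param_grid = {
    "n_estimators": [100, 200, 300],
    "max_depth": [4, 6, 8],
    "lr": [0.1, 0.05],
}

# Full Cartesian product as a list of dicts, the shape ParallelFor expects
hp_configs = [
    dict(zip(param_grid, combo))
    for combo in product(*param_grid.values())
]
print(len(hp_configs))  # 18 combinations (3 * 3 * 2)
print(hp_configs[0])    # {'n_estimators': 100, 'max_depth': 4, 'lr': 0.1}
```

Note that with a full grid the number of parallel branches grows multiplicatively, so keep cluster capacity in mind.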
Caching
# Disable caching at the task level
load_task = load_data(dataset_url=dataset_url)
load_task.set_caching_options(False)  # Always re-execute

# Configure caching at the run level
run = client.create_run_from_pipeline_func(
    ml_training_pipeline,
    enable_caching=True  # Reuse cached results for identical inputs
)
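Conceptually, the cache decides hits by fingerprinting the component specification together with its inputs: same fingerprint, same cached result. The idea can be sketched with a hash over a canonical JSON encoding (illustrative only; this is not KFP's actual key format):

```python
import hashlib
import json

def cache_key(component_name: str, image: str, inputs: dict) -> str:
    """Identical (component, image, inputs) -> identical key -> cache hit."""
    payload = json.dumps(
        {"component": component_name, "image": image, "inputs": inputs},
        sort_keys=True,  # canonical ordering so equal dicts hash equally
    )
    return hashlib.sha256(payload.encode()).hexdigest()

a = cache_key("train_model", "python:3.11-slim", {"n_estimators": 100})
b = cache_key("train_model", "python:3.11-slim", {"n_estimators": 100})
c = cache_key("train_model", "python:3.11-slim", {"n_estimators": 200})
print(a == b, a == c)  # True False
```

This also explains why changing a component's base image or package list invalidates its cache entries.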
Volume Mounts
# KFP v2 replaces v1's add_pvolumes / dsl.PipelineVolume with the
# kfp-kubernetes extension package: pip install kfp-kubernetes
from kfp import kubernetes

@dsl.component(base_image="python:3.11-slim")
def process_large_data(output_data: Output[Dataset]):
    """Process large datasets"""
    pass

@dsl.pipeline(name="volume-pipeline")
def volume_pipeline():
    process_task = process_large_data()
    # Mount an existing PVC into the task's container
    kubernetes.mount_pvc(
        process_task,
        pvc_name="data-pvc",
        mount_path="/mnt/data"
    )
CI/CD Integration
GitHub Actions + KFP
# .github/workflows/ml-pipeline.yml
name: ML Pipeline CI/CD

on:
  push:
    branches: [main]
    paths:
      - 'pipelines/**'
      - 'components/**'

jobs:
  deploy-pipeline:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install kfp==2.7.0
      - name: Compile pipeline
        run: python pipelines/compile.py
      - name: Upload and run pipeline
        env:
          KFP_HOST: ${{ secrets.KFP_HOST }}
        run: |
          python -c "
          from kfp.client import Client
          client = Client(host='$KFP_HOST')
          client.upload_pipeline(
              pipeline_package_path='ml_pipeline.yaml',
              pipeline_name='ml-training-v2',
              description='Automated ML training pipeline'
          )
          "
Conclusion
Kubeflow Pipelines v2 key takeaways:
- @dsl.component: Converts Python functions into containerized components
- @dsl.pipeline: Connects components into a DAG
- Artifact System: Manages inputs/outputs with typed artifacts like Dataset, Model, and Metrics
- Conditionals/Loops: Dynamic pipelines with dsl.If and dsl.ParallelFor
- Caching: Cost reduction by skipping re-execution for identical inputs
Quiz (6 Questions)
Q1. What is the decorator used to define a component in KFP v2? @dsl.component
Q2. What is the difference between Output[Dataset] and Output[Model]? They are type hints that distinguish artifact types. Dataset is for data artifacts, and Model is for trained model artifacts.
Q3. How do you implement conditional execution in a pipeline? Use the dsl.If context manager (e.g., with dsl.If(accuracy >= threshold))
Q4. What happens when you run with identical inputs while caching is enabled? The component is skipped and the previous execution results are reused.
Q5. What is ParallelFor used for? Running the same component in parallel with different parameters (e.g., hyperparameter search)
Q6. What is the biggest change when migrating from KFP v1 to v2? Using @dsl.component decorator instead of ContainerOp, and the introduction of the Artifact type system.