Kubeflow Pipelines v2 Practical Guide — Building ML Pipelines with KFP SDK


Introduction

When moving ML models from experimentation to production, reproducibility, automation, and version management become essential. Kubeflow Pipelines (KFP) v2 is a framework for defining and running ML workflows on Kubernetes, letting you compose pipelines from plain Python functions and decorators.

This article covers the core features of the KFP v2 SDK and hands-on pipeline construction.

KFP v2 Installation and Core Concepts

Installation

pip install kfp==2.7.0

# Install Kubeflow Pipelines backend (Kubernetes)
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic?ref=2.2.0"

# Port forwarding
kubectl port-forward svc/ml-pipeline-ui -n kubeflow 8080:80

Core Concepts

  1. Component: a unit of work in the pipeline (a containerized Python function)
  2. Pipeline: a DAG (Directed Acyclic Graph) of components
  3. Artifact: typed input/output data (Dataset, Model, Metrics, etc.)
  4. Run: a single execution of a pipeline
  5. Experiment: a logical group of Runs
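To make the pipeline-as-DAG idea concrete, here is a plain-Python sketch (no KFP required, only the standard library's `graphlib`): each task declares its upstream dependencies, and a valid execution order is a topological sort of the graph.

```python
# Conceptual sketch (stdlib only): a pipeline is a DAG whose tasks run
# only after their upstream dependencies have completed.
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on.
dag = {
    "load_data": set(),
    "preprocess": {"load_data"},
    "train": {"preprocess"},
    "evaluate": {"preprocess", "train"},
}

order = list(TopologicalSorter(dag).static_order())
print(order)  # ['load_data', 'preprocess', 'train', 'evaluate']
```

KFP derives exactly this kind of graph from the data dependencies between component inputs and outputs, so you never declare edges by hand.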

Defining Components

Lightweight Python Component

from kfp import dsl
from kfp.dsl import (
    Dataset, Input, Output, Model, Metrics,
    ClassificationMetrics
)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.1.4", "scikit-learn==1.4.0"]
)
def load_data(
    dataset_url: str,
    output_dataset: Output[Dataset]
):
    """Data loading component"""
    import pandas as pd

    df = pd.read_csv(dataset_url)
    print(f"Loaded {len(df)} rows")

    # Save to output artifact
    df.to_csv(output_dataset.path, index=False)
    output_dataset.metadata["num_rows"] = len(df)
    output_dataset.metadata["num_columns"] = len(df.columns)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.1.4", "scikit-learn==1.4.0"]
)
def preprocess_data(
    input_dataset: Input[Dataset],
    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    test_size: float = 0.2
):
    """Data preprocessing and splitting"""
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(input_dataset.path)

    # Preprocessing
    df = df.dropna()
    df = df.drop_duplicates()

    # Splitting
    train_df, test_df = train_test_split(df, test_size=test_size, random_state=42)

    train_df.to_csv(train_dataset.path, index=False)
    test_df.to_csv(test_dataset.path, index=False)

    train_dataset.metadata["num_rows"] = len(train_df)
    test_dataset.metadata["num_rows"] = len(test_df)
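The reason `random_state=42` matters in `train_test_split` is reproducibility: the same seed yields the same split on every pipeline run. A stdlib-only sketch of the same guarantee (the `split` helper here is illustrative, not part of KFP or scikit-learn):

```python
# Conceptual sketch of a reproducible train/test split (stdlib only).
# train_test_split(random_state=42) gives the same guarantee in scikit-learn.
import random

def split(rows, test_size=0.2, seed=42):
    rows = list(rows)
    random.Random(seed).shuffle(rows)   # deterministic shuffle
    n_test = int(len(rows) * test_size)
    return rows[n_test:], rows[:n_test]  # (train, test)

train, test = split(range(100))
assert len(train) == 80 and len(test) == 20
# Same seed -> identical split on every run
assert split(range(100)) == (train, test)
```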


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=[
        "pandas==2.1.4", "scikit-learn==1.4.0",
        "joblib==1.3.2", "xgboost==2.0.3"
    ]
)
def train_model(
    train_dataset: Input[Dataset],
    model_output: Output[Model],
    metrics_output: Output[Metrics],
    n_estimators: int = 100,
    max_depth: int = 6,
    learning_rate: float = 0.1
):
    """Model training"""
    import pandas as pd
    import joblib
    from xgboost import XGBClassifier
    from sklearn.model_selection import cross_val_score

    df = pd.read_csv(train_dataset.path)
    X = df.drop("target", axis=1)
    y = df["target"]

    # Training
    model = XGBClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        learning_rate=learning_rate,
        random_state=42
    )
    model.fit(X, y)

    # Cross-validation
    cv_scores = cross_val_score(model, X, y, cv=5, scoring="accuracy")

    # Save model
    joblib.dump(model, model_output.path)
    model_output.metadata["framework"] = "xgboost"
    model_output.metadata["n_estimators"] = n_estimators

    # Log metrics
    metrics_output.log_metric("cv_accuracy_mean", float(cv_scores.mean()))
    metrics_output.log_metric("cv_accuracy_std", float(cv_scores.std()))
    metrics_output.log_metric("n_estimators", n_estimators)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=[
        "pandas==2.1.4", "scikit-learn==1.4.0",
        "joblib==1.3.2", "xgboost==2.0.3"
    ]
)
def evaluate_model(
    test_dataset: Input[Dataset],
    model_input: Input[Model],
    metrics_output: Output[ClassificationMetrics],
    eval_metrics: Output[Metrics]
) -> float:
    """Model evaluation"""
    import pandas as pd
    import joblib
    from sklearn.metrics import accuracy_score, confusion_matrix

    df = pd.read_csv(test_dataset.path)
    X = df.drop("target", axis=1)
    y = df["target"]

    model = joblib.load(model_input.path)
    y_pred = model.predict(X)

    accuracy = accuracy_score(y, y_pred)

    # Classification metrics (rendered as a confusion matrix in the KFP UI)
    cm = confusion_matrix(y, y_pred)
    metrics_output.log_confusion_matrix(
        categories=["Class 0", "Class 1"],
        matrix=cm.tolist()
    )

    eval_metrics.log_metric("test_accuracy", float(accuracy))

    return float(accuracy)
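The row/column convention that `log_confusion_matrix` expects (rows = true class, columns = predicted class) can be checked with a tiny stdlib sketch:

```python
# Plain-Python check of the confusion-matrix layout used above:
# rows = true class, columns = predicted class.
def confusion_matrix_2class(y_true, y_pred):
    m = [[0, 0], [0, 0]]
    for t, p in zip(y_true, y_pred):
        m[t][p] += 1
    return m

y_true = [0, 0, 1, 1, 1]
y_pred = [0, 1, 1, 1, 0]
m = confusion_matrix_2class(y_true, y_pred)
print(m)  # [[1, 1], [1, 2]]

# Accuracy is the diagonal mass over the total count
accuracy = (m[0][0] + m[1][1]) / len(y_true)
assert accuracy == 0.6
```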

Custom Docker Image Component

@dsl.component(
    base_image="pytorch/pytorch:2.1.0-cuda12.1-cudnn8-runtime",
    packages_to_install=["transformers==4.37.0", "datasets==2.16.0"]
)
def finetune_llm(
    model_name: str,
    train_dataset: Input[Dataset],
    output_model: Output[Model],
    epochs: int = 3,
    batch_size: int = 8
):
    """LLM fine-tuning (GPU required)"""
    from transformers import AutoModelForSequenceClassification, Trainer
    # ... training code
    pass

Authoring Pipelines

Basic Pipeline

# Note: deploy_model is defined before the pipeline that references it,
# because @dsl.pipeline builds the DAG as soon as the function is decorated.
@dsl.component(base_image="python:3.11-slim")
def deploy_model(
    model_input: Input[Model],
    accuracy: float
):
    """Deploy the model (when the accuracy threshold is met)"""
    print(f"Deploying model with accuracy: {accuracy:.4f}")
    print(f"Model path: {model_input.path}")
    # Actual deployment logic (K8s serving, BentoML, etc.)


@dsl.pipeline(
    name="ml-training-pipeline",
    description="Data Load -> Preprocess -> Train -> Evaluate pipeline"
)
def ml_training_pipeline(
    dataset_url: str = "https://example.com/data.csv",
    test_size: float = 0.2,
    n_estimators: int = 100,
    max_depth: int = 6,
    learning_rate: float = 0.1,
    accuracy_threshold: float = 0.85
):
    # Step 1: Load data
    load_task = load_data(dataset_url=dataset_url)

    # Step 2: Preprocess (runs after load_task completes)
    preprocess_task = preprocess_data(
        input_dataset=load_task.outputs["output_dataset"],
        test_size=test_size
    )

    # Step 3: Train model
    train_task = train_model(
        train_dataset=preprocess_task.outputs["train_dataset"],
        n_estimators=n_estimators,
        max_depth=max_depth,
        learning_rate=learning_rate
    )
    # Set resource limits
    train_task.set_cpu_limit("4")
    train_task.set_memory_limit("8Gi")

    # Step 4: Evaluate
    eval_task = evaluate_model(
        test_dataset=preprocess_task.outputs["test_dataset"],
        model_input=train_task.outputs["model_output"]
    )

    # Step 5: Conditional deployment
    with dsl.If(eval_task.output >= accuracy_threshold):
        deploy_task = deploy_model(
            model_input=train_task.outputs["model_output"],
            accuracy=eval_task.output
        )

Pipeline Compilation and Execution

from kfp import compiler
from kfp.client import Client

# 1. Compile to YAML
compiler.Compiler().compile(
    pipeline_func=ml_training_pipeline,
    package_path="ml_pipeline.yaml"
)

# 2. Submit to KFP server
client = Client(host="http://localhost:8080")

# Create Experiment
experiment = client.create_experiment(name="ml-experiments")

# Execute Run
run = client.create_run_from_pipeline_func(
    ml_training_pipeline,
    experiment_name="ml-experiments",
    run_name="training-run-001",
    arguments={
        "dataset_url": "gs://my-bucket/data.csv",
        "n_estimators": 200,
        "max_depth": 8,
        "accuracy_threshold": 0.90
    }
)

print(f"Run ID: {run.run_id}")
print(f"Run URL: http://localhost:8080/#/runs/details/{run.run_id}")

Recurring Run

# Run daily at 2 AM. create_recurring_run() takes a compiled package
# (pipeline_package_path) rather than a pipeline function, and KFP cron
# expressions include a leading seconds field (6 fields total).
client.create_recurring_run(
    experiment_id=experiment.experiment_id,
    job_name="daily-retraining",
    pipeline_package_path="ml_pipeline.yaml",
    cron_expression="0 0 2 * * *",
    max_concurrency=1,
    params={
        "dataset_url": "gs://my-bucket/latest-data.csv",
        "accuracy_threshold": 0.85
    }
)

Advanced Patterns

Parallel Execution (ParallelFor)

@dsl.pipeline(name="hp-search-pipeline")
def hp_search_pipeline(dataset_url: str = "https://example.com/data.csv"):
    # Load the data once, then fan out training
    load_task = load_data(dataset_url=dataset_url)

    # Define hyperparameter combinations
    hp_configs = [
        {"n_estimators": 100, "max_depth": 4, "lr": 0.1},
        {"n_estimators": 200, "max_depth": 6, "lr": 0.05},
        {"n_estimators": 300, "max_depth": 8, "lr": 0.01},
    ]

    # Parallel training
    with dsl.ParallelFor(hp_configs) as config:
        train_task = train_model(
            train_dataset=load_task.outputs["output_dataset"],
            n_estimators=config.n_estimators,
            max_depth=config.max_depth,
            learning_rate=config.lr
        )
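Semantically, `dsl.ParallelFor` fans the same component out over each item, and the results can be compared downstream. A stdlib-only sketch of that fan-out-and-select pattern (the `train` function here is a stand-in returning a fake score, not the real component):

```python
# Conceptual sketch of ParallelFor fan-out (stdlib only): evaluate each
# hyperparameter config concurrently, then keep the best score.
from concurrent.futures import ThreadPoolExecutor

hp_configs = [
    {"n_estimators": 100, "max_depth": 4, "lr": 0.1},
    {"n_estimators": 200, "max_depth": 6, "lr": 0.05},
    {"n_estimators": 300, "max_depth": 8, "lr": 0.01},
]

def train(config):
    # Stand-in for train_model: derive a fake "score" from the config.
    score = 0.8 + config["max_depth"] / 100
    return config, score

with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(train, hp_configs))

best_config, best_score = max(results, key=lambda r: r[1])
print(best_config["max_depth"])  # 8
```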

Caching

# Disable caching at the component level
load_task = load_data(dataset_url=dataset_url)
load_task.set_caching_options(False)  # Always re-execute

# Configure caching at the pipeline level
run = client.create_run_from_pipeline_func(
    ml_training_pipeline,
    enable_caching=True  # Use cache for identical inputs
)
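Conceptually, the cache key is derived from the component definition plus its concrete input values: identical inputs reuse the prior result, and any changed input forces re-execution. A stdlib-only sketch of that idea (not KFP's actual implementation):

```python
# Conceptual sketch of input-based caching (stdlib only): the cache key is a
# hash of the component identity plus its input values.
import hashlib
import json

_cache = {}

def cached_run(component_name, inputs, fn):
    key = hashlib.sha256(
        json.dumps({"component": component_name, "inputs": inputs},
                   sort_keys=True).encode()
    ).hexdigest()
    if key not in _cache:
        _cache[key] = fn(**inputs)   # cache miss: execute the component
    return _cache[key]

calls = []
def load(dataset_url):
    calls.append(dataset_url)
    return f"loaded:{dataset_url}"

cached_run("load_data", {"dataset_url": "a.csv"}, load)
cached_run("load_data", {"dataset_url": "a.csv"}, load)  # cache hit, skipped
cached_run("load_data", {"dataset_url": "b.csv"}, load)  # new input, re-runs
print(len(calls))  # 2
```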

Volume Mounts

# KFP v2 replaces the v1 add_pvolumes/PipelineVolume APIs with the
# kfp-kubernetes extension: pip install kfp-kubernetes
from kfp import kubernetes


@dsl.component(base_image="python:3.11-slim")
def process_large_data(output_data: Output[Dataset]):
    """Process large datasets"""
    pass


@dsl.pipeline(name="volume-pipeline")
def volume_pipeline():
    # PVC mount (must be configured inside a pipeline definition)
    process_task = process_large_data()
    kubernetes.mount_pvc(
        process_task,
        pvc_name="data-pvc",
        mount_path="/mnt/data"
    )

CI/CD Integration

GitHub Actions + KFP

# .github/workflows/ml-pipeline.yml
name: ML Pipeline CI/CD

on:
  push:
    branches: [main]
    paths:
      - 'pipelines/**'
      - 'components/**'

jobs:
  deploy-pipeline:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install kfp==2.7.0

      - name: Compile pipeline
        run: python pipelines/compile.py

      - name: Upload and run pipeline
        env:
          KFP_HOST: ${{ secrets.KFP_HOST }}
        run: |
          python -c "
          from kfp.client import Client
          client = Client(host='$KFP_HOST')
          client.upload_pipeline(
            pipeline_package_path='ml_pipeline.yaml',
            pipeline_name='ml-training-v2',
            description='Automated ML training pipeline'
          )
          "

Conclusion

Kubeflow Pipelines v2 key takeaways:

  1. @dsl.component: Converts Python functions into containerized components
  2. @dsl.pipeline: Connects components into a DAG
  3. Artifact System: Manages inputs/outputs with typed artifacts like Dataset, Model, and Metrics
  4. Conditionals/Loops: Dynamic pipelines with dsl.If and dsl.ParallelFor
  5. Caching: Cost reduction by skipping re-execution for identical inputs

Quiz (6 Questions)

Q1. What is the decorator used to define a component in KFP v2? @dsl.component

Q2. What is the difference between Output[Dataset] and Output[Model]? They are type hints that distinguish artifact types. Dataset is for data artifacts, and Model is for trained model artifacts.

Q3. How do you implement conditional execution in a pipeline? Use the dsl.If context manager (e.g., with dsl.If(accuracy >= threshold))

Q4. What happens when you run with identical inputs while caching is enabled? The component is skipped and the previous execution results are reused.

Q5. What is ParallelFor used for? Running the same component in parallel with different parameters (e.g., hyperparameter search)

Q6. What is the biggest change when migrating from KFP v1 to v2? Using @dsl.component decorator instead of ContainerOp, and the introduction of the Artifact type system.