Kubeflow Pipelines v2 ML Workflow Automation and Operations Guide

Overview

Developing an ML model and operating it reliably in production are entirely different problems. Manually managing a pipeline that spans data preprocessing, training, evaluation, and deployment leads to recurring issues: runs that cannot be reproduced, experiments that go untracked, and deployments that slip. Kubeflow Pipelines (KFP) v2 is a framework that declaratively defines and automates ML workflows on Kubernetes, allowing you to build complex ML pipelines using just Python decorators.

KFP v2 is a major improvement over v1. Pipeline compilation results are now abstracted into IR (Intermediate Representation) YAML instead of Argo Workflow YAML, supporting various execution backends. The artifact system has been strengthened, and type safety has been improved. This article covers KFP v2's architecture, SDK usage, caching strategies, CI/CD integration, and production troubleshooting from a practical perspective.

KFP v2 Architecture

Core Components

The KFP v2 architecture consists of the following core layers.

Component | Role | Tech Stack
KFP SDK | Pipeline/component definition, compilation | Python (kfp package)
IR Compiler | Converts Python DSL to IR YAML | Protocol Buffers based
KFP Backend | Pipeline execution management, API server | Go, gRPC/REST
Workflow Engine | Actual workflow orchestration | Argo Workflows / Tekton
Metadata Store | Execution metadata, artifact tracking | ML Metadata (MLMD)
Artifact Store | Stores models, datasets, and other artifacts | MinIO / GCS / S3
UI Dashboard | Pipeline visualization, execution monitoring | React-based web UI

The biggest change in KFP v2 is the introduction of IR YAML. In v1, pipelines were compiled directly into Argo Workflow YAML, creating tight coupling with Argo. In v2, pipelines are first compiled into an intermediate representation called IR, which is then interpreted and executed by each backend driver. This means the same pipeline definition can run not only on a Kubeflow cluster but also on Google Vertex AI Pipelines.
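For reference, an IR YAML file has a small, stable top-level shape. The abridged excerpt below is illustrative only (component and task names are borrowed from the training pipeline example in this article, and most fields are elided); the real file also carries full input/output definitions and the embedded component code:

```yaml
# Abridged IR YAML structure (illustrative, most fields elided)
pipelineInfo:
  name: ml-training-pipeline
components:               # backend-agnostic component interfaces
  comp-preprocess-data:
    executorLabel: exec-preprocess-data
deploymentSpec:           # how each executor actually runs (image, command)
  executors:
    exec-preprocess-data:
      container:
        image: python:3.11-slim
root:                     # the DAG wiring tasks together
  dag:
    tasks:
      preprocess-data:
        componentRef:
          name: comp-preprocess-data
schemaVersion: 2.1.0
sdkVersion: kfp-2.7.0
```

Because backends consume this representation rather than Argo-specific YAML, the same compiled file can be submitted to a Kubeflow cluster or to Vertex AI Pipelines unchanged.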

Installation and Cluster Setup

# KFP SDK v2 installation
pip install kfp==2.7.0

# Kubernetes extension library (GPU, Volume, and other K8s-specific features)
pip install kfp-kubernetes==1.2.0

# Kubeflow Pipelines backend deployment (on Kubernetes cluster)
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=2.2.0"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=2.2.0"

# Access UI via port forwarding
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80

KFP SDK v2 Pipeline Definition

Basic Components and Pipelines

In KFP v2, pipelines are defined using the @dsl.component and @dsl.pipeline decorators. A component is the smallest execution unit in a pipeline, and each component runs in an isolated container.

from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"]
)
def preprocess_data(
    raw_data_path: str,
    test_size: float,
    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    data_stats: Output[Metrics]
):
    """Load data and split into train/test sets."""
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(raw_data_path)

    # Handle missing values
    df = df.dropna(subset=["target"])
    df = df.fillna(df.median(numeric_only=True))

    train_df, test_df = train_test_split(
        df, test_size=test_size, random_state=42, stratify=df["target"]
    )

    train_df.to_csv(train_dataset.path, index=False)
    test_df.to_csv(test_dataset.path, index=False)

    # Log metrics
    data_stats.log_metric("total_rows", len(df))
    data_stats.log_metric("train_rows", len(train_df))
    data_stats.log_metric("test_rows", len(test_df))
    data_stats.log_metric("feature_count", len(df.columns) - 1)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "joblib==1.3.0"]
)
def train_model(
    train_dataset: Input[Dataset],
    n_estimators: int,
    max_depth: int,
    trained_model: Output[Model],
    training_metrics: Output[Metrics]
):
    """Train a Random Forest model."""
    import pandas as pd
    import joblib
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score, f1_score

    train_df = pd.read_csv(train_dataset.path)
    X_train = train_df.drop(columns=["target"])
    y_train = train_df["target"]

    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42,
        n_jobs=-1
    )
    model.fit(X_train, y_train)

    # Log training accuracy
    train_pred = model.predict(X_train)
    training_metrics.log_metric("train_accuracy", accuracy_score(y_train, train_pred))
    training_metrics.log_metric("train_f1", f1_score(y_train, train_pred, average="weighted"))

    # Save model
    joblib.dump(model, trained_model.path)
    trained_model.metadata["framework"] = "sklearn"
    trained_model.metadata["model_type"] = "RandomForestClassifier"


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "joblib==1.3.0"]
)
def evaluate_model(
    test_dataset: Input[Dataset],
    trained_model: Input[Model],
    eval_metrics: Output[Metrics],
    accuracy_threshold: float = 0.85
) -> bool:
    """Evaluate the model on test data and check if it passes the threshold."""
    import pandas as pd
    import joblib
    from sklearn.metrics import accuracy_score, f1_score, classification_report

    test_df = pd.read_csv(test_dataset.path)
    X_test = test_df.drop(columns=["target"])
    y_test = test_df["target"]

    model = joblib.load(trained_model.path)
    predictions = model.predict(X_test)

    accuracy = accuracy_score(y_test, predictions)
    f1 = f1_score(y_test, predictions, average="weighted")

    eval_metrics.log_metric("test_accuracy", accuracy)
    eval_metrics.log_metric("test_f1", f1)
    eval_metrics.log_metric("passed_threshold", accuracy >= accuracy_threshold)

    return accuracy >= accuracy_threshold

Pipeline Composition and Conditional Execution

@dsl.pipeline(
    name="ml-training-pipeline",
    description="Data preprocessing -> Training -> Evaluation -> Conditional deployment pipeline"
)
def ml_training_pipeline(
    raw_data_path: str = "gs://my-bucket/data/raw.csv",
    test_size: float = 0.2,
    n_estimators: int = 100,
    max_depth: int = 10,
    accuracy_threshold: float = 0.85
):
    # Step 1: Data preprocessing
    preprocess_task = preprocess_data(
        raw_data_path=raw_data_path,
        test_size=test_size
    )
    preprocess_task.set_display_name("Data Preprocessing")

    # Step 2: Model training
    train_task = train_model(
        train_dataset=preprocess_task.outputs["train_dataset"],
        n_estimators=n_estimators,
        max_depth=max_depth
    )
    train_task.set_display_name("Model Training")
    train_task.set_cpu_limit("4")
    train_task.set_memory_limit("8Gi")

    # Step 3: Model evaluation
    eval_task = evaluate_model(
        test_dataset=preprocess_task.outputs["test_dataset"],
        trained_model=train_task.outputs["trained_model"],
        accuracy_threshold=accuracy_threshold
    )
    eval_task.set_display_name("Model Evaluation")

    # Step 4: Conditional deployment (if accuracy threshold is met)
    with dsl.If(eval_task.output == True):
        deploy_task = deploy_model(
            model=train_task.outputs["trained_model"],
            model_name="fraud-detector",
            serving_endpoint="https://serving.example.com"
        )
        deploy_task.set_display_name("Model Deployment")


# Compile the pipeline
from kfp import compiler
compiler.Compiler().compile(
    pipeline_func=ml_training_pipeline,
    package_path="ml_training_pipeline.yaml"
)

Component Types and Usage

KFP v2 supports three component types, each suited for different scenarios.

Component Type | Definition Method | Best For | Pros | Cons
Lightweight Python | @dsl.component decorator | Pure Python logic, rapid prototyping | Code and definition in one place, fast iteration | Cannot reference external files; imports must live inside the function
Container | @dsl.container_component | Existing Docker images, non-Python tasks | Language agnostic, reuses existing images | Limited artifact type support
Importer | dsl.importer() | Bringing external artifacts into the pipeline | Tracks existing artifacts within the pipeline | No data movement, metadata registration only

Container Component Example

@dsl.container_component
def run_spark_job(
    input_data: Input[Dataset],
    output_data: Output[Dataset],
    spark_config: str
):
    """Run a Spark job as a container."""
    return dsl.ContainerSpec(
        image="my-registry/spark-processor:3.5",
        command=["spark-submit"],
        args=[
            "--master", "k8s://https://kubernetes.default.svc",
            "--conf", spark_config,
            "--input", input_data.path,
            "--output", output_data.path,
            "/app/etl_job.py"
        ]
    )


# Using Importer: bring an external model into the pipeline
@dsl.pipeline(name="model-comparison-pipeline")
def comparison_pipeline(train_data_uri: str = "gs://data/train.csv"):
    existing_model = dsl.importer(
        artifact_uri="gs://models/production/v2.1/model.pkl",
        artifact_class=Model,
        reimport=False,
        metadata={"version": "2.1", "framework": "sklearn"}
    )

    # Train a candidate model to compare against the imported baseline
    train_data = dsl.importer(
        artifact_uri=train_data_uri,
        artifact_class=Dataset,
        reimport=True
    )
    train_task = train_model(
        train_dataset=train_data.output,
        n_estimators=100,
        max_depth=10
    )

    # Compare the existing model with the newly trained candidate
    compare_task = compare_models(
        baseline_model=existing_model.output,
        candidate_model=train_task.outputs["trained_model"]
    )

Artifact Management and Caching Strategies

Artifact System

KFP v2's artifact system tracks all inputs and outputs based on ML Metadata (MLMD). The key artifact types are as follows.

Artifact Type | Purpose | Examples
Dataset | Datasets | CSV, Parquet files
Model | Trained models | pickle, ONNX, SavedModel
Metrics | Scalar metrics | accuracy, loss, f1-score
ClassificationMetrics | Classification metrics | confusion matrix, ROC curve
HTML | HTML reports | visualization reports
Markdown | Markdown reports | text-based reports
Artifact | Generic artifacts | other files

Caching Strategies

KFP v2 supports automatic caching at the component level. You can significantly reduce execution time by reusing results from components that were run with identical inputs and code.

@dsl.pipeline(name="caching-example")
def caching_pipeline(data_path: str, retrain: bool = False):
    # Enable caching for data preprocessing (reuse if same data)
    preprocess_task = preprocess_data(raw_data_path=data_path)
    preprocess_task.set_caching_options(True)

    # Control caching for training based on retrain flag
    train_task = train_model(
        train_dataset=preprocess_task.outputs["train_dataset"],
        n_estimators=200,
        max_depth=15
    )
    if retrain:
        train_task.set_caching_options(False)  # Force retraining

    # Disable caching for external API call components
    deploy_task = deploy_model(model=train_task.outputs["trained_model"])
    deploy_task.set_caching_options(False)  # Always execute fresh

There are important considerations when operating with caching. First, caching is only meaningful when component code is a pure function (same input produces same output). Second, the KFP v2 SDK does not support cache expiration time settings, so components dealing with time-sensitive data should have caching disabled. Third, setting the environment variable KFP_DISABLE_EXECUTION_CACHING_BY_DEFAULT to true disables caching by default for all pipelines.
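Conceptually, the cache key is a fingerprint of everything that can affect a component's output. The sketch below is illustrative only (the actual key computation happens in the KFP backend and differs in detail); it shows why changing an input parameter, the function body, or the base image produces a cache miss, while re-running with identical ingredients produces a hit:

```python
import hashlib
import json


def cache_fingerprint(func, inputs: dict, base_image: str) -> str:
    """Illustrative cache key: hash of component code, inputs, and image.

    The real KFP backend computes its key differently, but the principle
    is the same: any change to one of these ingredients invalidates the cache.
    """
    payload = json.dumps(
        {
            # Bytecode stands in for the component source here
            "code": func.__code__.co_code.hex(),
            "inputs": inputs,
            "base_image": base_image,
        },
        sort_keys=True,  # stable serialization -> stable hash
    )
    return hashlib.sha256(payload.encode()).hexdigest()


def preprocess(test_size: float) -> str:
    return f"split with test_size={test_size}"


key_a = cache_fingerprint(preprocess, {"test_size": 0.2}, "python:3.11-slim")
key_b = cache_fingerprint(preprocess, {"test_size": 0.2}, "python:3.11-slim")
key_c = cache_fingerprint(preprocess, {"test_size": 0.3}, "python:3.11-slim")
```

Identical ingredients yield `key_a == key_b` (cache hit), while the changed parameter makes `key_c` differ (cache miss).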

CI/CD Integration

Automated Pipeline Deployment with GitHub Actions

# .github/workflows/kfp-deploy.yaml
name: KFP Pipeline CI/CD

on:
  push:
    branches: [main]
    paths:
      - 'pipelines/**'
      - 'components/**'

env:
  KFP_HOST: ${{ secrets.KFP_HOST }}
  KFP_NAMESPACE: kubeflow

jobs:
  validate-and-compile:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'

      - name: Install dependencies
        run: |
          pip install kfp==2.7.0 kfp-kubernetes==1.2.0
          pip install pytest

      - name: Lint pipeline code
        run: |
          pip install ruff
          ruff check pipelines/ components/

      - name: Run unit tests
        run: pytest tests/unit/ -v

      - name: Compile pipeline
        run: |
          python -c "
          from pipelines.training_pipeline import ml_training_pipeline
          from kfp import compiler
          compiler.Compiler().compile(
              pipeline_func=ml_training_pipeline,
              package_path='compiled_pipeline.yaml'
          )
          print('Pipeline compiled successfully')
          "

      - name: Upload compiled pipeline
        uses: actions/upload-artifact@v4
        with:
          name: compiled-pipeline
          path: compiled_pipeline.yaml

  deploy-pipeline:
    needs: validate-and-compile
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4

      - name: Download compiled pipeline
        uses: actions/download-artifact@v4
        with:
          name: compiled-pipeline

      - name: Deploy to KFP
        run: |
          pip install kfp==2.7.0
          python -c "
          from kfp.client import Client

          client = Client(host='${KFP_HOST}')

          # Upload or update pipeline
          try:
              pipeline = client.upload_pipeline(
                  pipeline_package_path='compiled_pipeline.yaml',
                  pipeline_name='ml-training-pipeline',
                  description='Automated ML training pipeline'
              )
              print(f'Pipeline uploaded: {pipeline.pipeline_id}')
          except Exception:
              # Upload as new version if already exists
              pipeline = client.upload_pipeline_version(
                  pipeline_package_path='compiled_pipeline.yaml',
                  pipeline_name='ml-training-pipeline',
                  pipeline_version_name='v${GITHUB_SHA::7}'
              )
              print(f'Pipeline version uploaded: {pipeline.pipeline_version_id}')
          "

Programmatic Pipeline Execution

from kfp.client import Client

def trigger_pipeline_run(
    host: str,
    pipeline_name: str,
    experiment_name: str,
    params: dict
) -> str:
    """Trigger a pipeline run programmatically."""
    client = Client(host=host)

    # Create or retrieve experiment
    experiment = client.create_experiment(
        name=experiment_name,
        namespace="kubeflow"
    )

    # Retrieve the pipeline ID by name
    pipeline_id = client.get_pipeline_id(pipeline_name)
    if pipeline_id is None:
        raise ValueError(f"Pipeline '{pipeline_name}' not found")

    # Create run
    run = client.run_pipeline(
        experiment_id=experiment.experiment_id,
        job_name=f"{pipeline_name}-{params.get('run_tag', 'manual')}",
        pipeline_id=pipeline_id,
        params=params
    )

    print(f"Run created: {run.run_id}")
    print(f"Monitor at: {host}/#/runs/details/{run.run_id}")
    return run.run_id


# Set up recurring schedule
def create_recurring_run(client: Client, pipeline_id: str, experiment_id: str):
    """Create a schedule to run the pipeline daily at midnight."""
    recurring_run = client.create_recurring_run(
        experiment_id=experiment_id,
        job_name="daily-training",
        pipeline_id=pipeline_id,
        params={"raw_data_path": "gs://data/daily/latest.csv"},
        cron_expression="0 0 * * *",
        max_concurrency=1,
        enabled=True
    )
    return recurring_run

ML Workflow Orchestration Tool Comparison

Criteria | KFP v2 | Apache Airflow | Prefect | Vertex AI Pipelines
Primary Use | ML pipeline dedicated | General data workflows | General workflows | Managed ML pipelines
Infrastructure | Kubernetes required | Standalone possible | Standalone / Cloud | Google Cloud managed
Pipeline Definition | Python decorators | Python (DAG classes) | Python decorators | KFP SDK (same)
Artifact Tracking | ML Metadata (built-in) | XCom (limited) | External integration needed | Vertex ML Metadata
Experiment Management | Built-in | Not supported (MLflow integration) | Not supported | Built-in
Caching | Component-level, automatic | Task-level, manual | Task-level, built-in | Component-level, automatic
GPU Support | Kubernetes native | K8s Executor required | Kubernetes integration | Automatic
UI | Built-in pipeline visualization | Built-in web UI | Prefect Cloud UI | Google Cloud Console
Scalability | Kubernetes scaling | Celery/K8s Executor | Dask/Ray integration | Auto scaling
Learning Curve | High (K8s knowledge required) | Medium | Low | Medium (GCP dependency)
Cost | Self-managed infrastructure | Self-managed infrastructure | Prefect Cloud paid | Usage-based billing

Selection Criteria Summary: If you need a Kubernetes-based ML-dedicated pipeline, choose KFP v2. To integrate data engineering and ML, choose Airflow. To get started quickly, choose Prefect. If you are all-in on Google Cloud, Vertex AI Pipelines is the best fit.

Kubernetes Extension Features

KFP v2 supports Kubernetes-specific features through the kfp-kubernetes extension library.

from kfp import dsl
from kfp import kubernetes

@dsl.pipeline(name="gpu-training-pipeline")
def gpu_training_pipeline():
    train_task = train_deep_learning_model(
        dataset_path="gs://data/training",
        epochs=50,
        batch_size=64
    )

    # GPU resource allocation
    kubernetes.add_node_selector(
        train_task,
        label_key="accelerator",
        label_value="nvidia-a100"
    )
    kubernetes.add_toleration(
        train_task,
        key="nvidia.com/gpu",
        operator="Exists",
        effect="NoSchedule"
    )
    train_task.set_accelerator_type("nvidia.com/gpu")
    train_task.set_accelerator_limit(2)
    train_task.set_cpu_limit("16")
    train_task.set_memory_limit("64Gi")

    # Secret mount (model registry credentials)
    kubernetes.use_secret_as_env(
        train_task,
        secret_name="model-registry-credentials",
        secret_key_to_env={
            "username": "REGISTRY_USERNAME",
            "password": "REGISTRY_PASSWORD"
        }
    )

    # PVC mount (shared data volume)
    kubernetes.mount_pvc(
        train_task,
        pvc_name="shared-data-pvc",
        mount_path="/mnt/shared-data"
    )

Monitoring and Troubleshooting

Common Issues and Solutions

Issue 1: Pipeline compiles successfully but fails at runtime

The most common cause is a package that is used inside the component but missing from packages_to_install. Lightweight Python components defined with the @dsl.component decorator run in isolated containers, so every import used within the function must be written inside the function body, and every required package must be explicitly declared.

Issue 2: Caching does not work as expected

Caching derives cache keys from a component's input parameters, component code, and base image. If you get a cache hit even though you believe you changed the code, either the change never made it into the compiled component, or it touched only parts excluded from the cache key. Verify that you recompiled the pipeline and uploaded the new version.

Issue 3: Pod terminated due to OOM (Out of Memory)

This frequently occurs in components that process large-scale data. Set set_memory_limit() and set_memory_request(), with the request set to approximately 80% of the limit. Processing data in chunks or explicitly using del to free unnecessary variables also helps.
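Chunked processing keeps peak memory proportional to the chunk size rather than the file size. The sketch below uses only the standard library for illustration (inside a real component you would more likely use pandas' read_csv with the chunksize parameter); the function name and chunk size are arbitrary:

```python
import csv
from itertools import islice
from typing import Iterator


def iter_chunks(path: str, chunk_size: int = 50_000) -> Iterator[list]:
    """Yield the rows of a CSV in fixed-size chunks.

    Only one chunk is materialized at a time, so peak memory stays
    bounded regardless of how large the file is.
    """
    with open(path, newline="") as f:
        reader = csv.DictReader(f)
        while True:
            chunk = list(islice(reader, chunk_size))
            if not chunk:
                break
            yield chunk


def count_rows(path: str, chunk_size: int = 50_000) -> int:
    """Example aggregate computed without loading the whole file."""
    return sum(len(chunk) for chunk in iter_chunks(path, chunk_size))
```

The same pattern applies to any streaming aggregate (row counts, per-column sums, incremental model fitting with partial_fit).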

Issue 4: Pipeline execution stuck in Pending state

This is often caused by insufficient resources on Kubernetes nodes. Check events using the kubectl describe pod command and review the node autoscaler configuration. For GPU nodes, verify the maximum node pool size.

Issue 5: Artifacts overwritten inside ParallelFor

This is a known issue in KFP v2 (GitHub Issue #10186), where artifacts from concurrently executing components within ParallelFor or Sub-DAG can conflict. Work around this by including unique identifiers in artifact paths or generating unique filenames within the component.
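The workaround can be as simple as a helper that embeds a per-task identifier in every artifact filename. The sketch below is a hypothetical helper (the function name and path layout are illustrative); a ParallelFor loop index passed in as a parameter works just as well as the random UUID used here:

```python
import os
import uuid


def unique_artifact_path(base_dir: str, stem: str, task_suffix: str = "") -> str:
    """Build a collision-free path for an artifact written inside a fan-out.

    Embedding a per-task identifier (a ParallelFor loop index, or a random
    UUID as a fallback) keeps parallel writers from clobbering each other.
    """
    suffix = task_suffix or uuid.uuid4().hex[:8]
    return os.path.join(base_dir, f"{stem}-{suffix}.pkl")


# Two parallel tasks without an explicit suffix still get distinct paths
p1 = unique_artifact_path("/tmp/artifacts", "fold-model")
p2 = unique_artifact_path("/tmp/artifacts", "fold-model")
```

Inside a component, you would write the model to `unique_artifact_path(...)` under the artifact's output directory instead of writing directly to the default artifact path.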

Log Inspection and Debugging

# View full logs for a pipeline run
kubectl logs -n kubeflow -l pipeline/runid=<run-id> --all-containers

# View logs for a specific component
kubectl logs -n kubeflow <pod-name> -c main

# Check Argo Workflow status
kubectl get workflows -n kubeflow
kubectl describe workflow <workflow-name> -n kubeflow

# Query ML Metadata directly (for debugging)
kubectl port-forward -n kubeflow svc/metadata-grpc-service 8080:8080

Production Operations Checklist

When operating KFP v2 in production, the following items should be verified.

  • Resource Management: Set CPU/memory requests and limits for all components. For GPU components, specify tolerations and nodeSelector.
  • Retry Policy: Set task.set_retry(num_retries=3, backoff_duration="60s") to handle transient errors.
  • Timeout: Use timeout settings for long-running pipelines to prevent resource waste.
  • Namespace Separation: Separate development/staging/production pipelines into distinct namespaces.
  • Artifact Cleanup: Set up a CronJob to periodically clean up artifacts from old runs. Leverage MinIO/S3 Lifecycle Policies.
  • Monitoring Integration: Monitor pipeline execution time, success rate, and resource usage with Prometheus + Grafana.
  • Alert Configuration: Set up Webhooks to send notifications to Slack/PagerDuty when a pipeline fails.

References

  1. Kubeflow Pipelines Official Documentation
  2. KFP SDK v2 API Reference
  3. Kubeflow Pipelines GitHub Repository
  4. KFP v2 Caching Guide
  5. KFP v2 Artifact Management
  6. KFP v1 to v2 Migration Guide
  7. KFP Python SDK DeepWiki
  8. Migrating to Vertex AI Pipelines