Kubeflow Pipelines v2 Practical Guide: Building ML Pipelines with KFP SDK

Introduction

When moving ML models from experimentation to production, **reproducibility, automation, and version management** are essential. **Kubeflow Pipelines (KFP) v2** is a framework for defining and running ML workflows on Kubernetes. With its Python SDK, components and pipelines are written as decorated Python functions and compiled to a YAML pipeline definition.

This article covers the core features of the KFP v2 SDK and hands-on pipeline construction.

KFP v2 Installation and Core Concepts

Installation

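    pip install kfp==2.7.0

    # Install Kubeflow Pipelines backend (Kubernetes)
    # Cluster-scoped resources (CRDs) are applied first, then the environment manifests
    kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=2.2.0"
    kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
    kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic?ref=2.2.0"

    # Port forwarding
    kubectl port-forward svc/ml-pipeline-ui -n kubeflow 8080:80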

Core Concepts

1. Component: A unit of work in the pipeline (Python function)

2. Pipeline: A DAG (Directed Acyclic Graph) of Components

3. Artifact: Input/output data (Dataset, Model, Metrics, etc.)

4. Run: A single execution of a pipeline

5. Experiment: A logical group of Runs
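
To make the "pipeline as a DAG" idea concrete, here is a minimal standard-library sketch. It does not use KFP and does not reflect how the KFP backend schedules work. The step names are hypothetical stand-ins for the four components built later in this article, and `execution_order` is an illustrative helper.

```python
from graphlib import TopologicalSorter

# Hypothetical step names mirroring the article's four components.
# Each key maps to the set of steps whose outputs it consumes.
dependencies = {
    "load_data": set(),
    "preprocess_data": {"load_data"},
    "train_model": {"preprocess_data"},
    "evaluate_model": {"preprocess_data", "train_model"},
}


def execution_order(deps):
    """Return one valid execution order for a DAG of steps."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

In KFP the same dependencies are never written by hand: passing one task's output as another task's input is what creates the edge.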

Defining Components

Lightweight Python Component

    from kfp import dsl
    from kfp.dsl import (
        Dataset, Input, Output, Model, Metrics,
        ClassificationMetrics, component
    )

    @dsl.component(
        base_image="python:3.11-slim",
        packages_to_install=["pandas==2.1.4", "scikit-learn==1.4.0"]
    )
    def load_data(
        dataset_url: str,
        output_dataset: Output[Dataset]
    ):
        """Data loading component"""
        # Lightweight components run in isolation, so imports go inside the function
        import pandas as pd

        df = pd.read_csv(dataset_url)
        print(f"Loaded {len(df)} rows")

        # Save to output artifact
        df.to_csv(output_dataset.path, index=False)
        output_dataset.metadata["num_rows"] = len(df)
        output_dataset.metadata["num_columns"] = len(df.columns)

    @dsl.component(
        base_image="python:3.11-slim",
        packages_to_install=["pandas==2.1.4", "scikit-learn==1.4.0"]
    )
    def preprocess_data(
        input_dataset: Input[Dataset],
        train_dataset: Output[Dataset],
        test_dataset: Output[Dataset],
        test_size: float = 0.2
    ):
        """Data preprocessing and splitting"""
        import pandas as pd
        from sklearn.model_selection import train_test_split

        df = pd.read_csv(input_dataset.path)

        # Preprocessing
        df = df.dropna()
        df = df.drop_duplicates()

        # Splitting
        train_df, test_df = train_test_split(df, test_size=test_size, random_state=42)

        train_df.to_csv(train_dataset.path, index=False)
        test_df.to_csv(test_dataset.path, index=False)

        train_dataset.metadata["num_rows"] = len(train_df)
        test_dataset.metadata["num_rows"] = len(test_df)
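
The `num_rows` values recorded in the artifact metadata follow directly from `test_size`. As a rough sanity check, the sketch below reproduces the rounding scikit-learn applies when `test_size` is a float and `train_size` is left unset. The helper name `split_sizes` is hypothetical and not part of any library.

```python
import math


def split_sizes(n_rows: int, test_size: float) -> tuple[int, int]:
    """Return (n_train, n_test) for a fractional test_size.

    Mirrors scikit-learn's rule for a float test_size with train_size unset:
    the test set gets ceil(test_size * n_rows) rows, the rest go to training.
    """
    if not 0.0 < test_size < 1.0:
        raise ValueError("test_size must be a fraction strictly between 0 and 1")
    n_test = math.ceil(test_size * n_rows)
    return n_rows - n_test, n_test


n_train, n_test = split_sizes(1003, 0.2)
print(n_train, n_test)
```

Note that the row count fed into the split is the count after `dropna()` and `drop_duplicates()`, not the count logged by `load_data`.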

    @dsl.component(
        base_image="python:3.11-slim",
        packages_to_install=[
            "pandas==2.1.4", "scikit-learn==1.4.0",
            "joblib==1.3.2", "xgboost==2.0.3"
        ]
    )
    def train_model(
        train_dataset: Input[Dataset],
        model_output: Output[Model],
        metrics_output: Output[Metrics],
        n_estimators: int = 100,
        max_depth: int = 6,
        learning_rate: float = 0.1
    ):
        """Model training"""
        import joblib
        import pandas as pd
        from xgboost import XGBClassifier
        from sklearn.model_selection import cross_val_score

        df = pd.read_csv(train_dataset.path)
        X = df.drop("target", axis=1)
        y = df["target"]

        # Training
        model = XGBClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth,
            learning_rate=learning_rate,
            random_state=42
        )
        model.fit(X, y)

        # Cross-validation
        cv_scores = cross_val_score(model, X, y, cv=5, scoring="accuracy")

        # Save model
        joblib.dump(model, model_output.path)
        model_output.metadata["framework"] = "xgboost"
        model_output.metadata["n_estimators"] = n_estimators

        # Log metrics
        metrics_output.log_metric("cv_accuracy_mean", float(cv_scores.mean()))
        metrics_output.log_metric("cv_accuracy_std", float(cv_scores.std()))
        metrics_output.log_metric("n_estimators", n_estimators)

    @dsl.component(
        base_image="python:3.11-slim",
        packages_to_install=[
            "pandas==2.1.4", "scikit-learn==1.4.0",
            "joblib==1.3.2", "xgboost==2.0.3"
        ]
    )
    def evaluate_model(
        test_dataset: Input[Dataset],
        model_input: Input[Model],
        metrics_output: Output[ClassificationMetrics],
        eval_metrics: Output[Metrics]
    ) -> float:
        """Model evaluation"""
        import joblib
        import pandas as pd
        from sklearn.metrics import accuracy_score

        df = pd.read_csv(test_dataset.path)
        X = df.drop("target", axis=1)
        y = df["target"]

        model = joblib.load(model_input.path)
        y_pred = model.predict(X)

        # Convert to a plain float so it can be logged and returned as a parameter
        accuracy = float(accuracy_score(y, y_pred))

        # Classification metrics (Confusion Matrix visualization)
        # Rows are actual classes, columns are predicted classes
        metrics_output.log_confusion_matrix(
            categories=["Class 0", "Class 1"],
            matrix=[[int(sum((y == 0) & (y_pred == 0))), int(sum((y == 0) & (y_pred == 1)))],
                    [int(sum((y == 1) & (y_pred == 0))), int(sum((y == 1) & (y_pred == 1)))]]
        )

        eval_metrics.log_metric("test_accuracy", accuracy)

        return accuracy
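
The nested list passed to `log_confusion_matrix` is easier to read once the layout is spelled out. The sketch below builds the same 2x2 layout as the component code (rows are actual classes, columns are predicted classes) from plain Python lists. The function name and the sample labels are made up for illustration.

```python
def binary_confusion_matrix(y_true, y_pred):
    """Build a 2x2 matrix with rows = actual class, columns = predicted class."""
    matrix = [[0, 0], [0, 0]]
    for actual, predicted in zip(y_true, y_pred):
        matrix[actual][predicted] += 1
    return matrix


# Made-up labels, only to exercise the helper.
y_true = [0, 0, 1, 1, 1, 0]
y_pred = [0, 1, 1, 0, 1, 0]

matrix = binary_confusion_matrix(y_true, y_pred)
# Accuracy is the diagonal (correct predictions) over all samples.
accuracy = (matrix[0][0] + matrix[1][1]) / len(y_true)
print(matrix, accuracy)
```

The diagonal holds the correct predictions, which is why the accuracy returned by the component equals the diagonal sum divided by the number of test rows.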

Custom Docker Image Component

    @dsl.component(
        base_image="pytorch/pytorch:2.1.0-cuda12.1-cudnn8-runtime",
        packages_to_install=["transformers==4.37.0", "datasets==2.16.0"]
    )
    def finetune_llm(
        model_name: str,
        train_dataset: Input[Dataset],
        output_model: Output[Model],
        epochs: int = 3,
        batch_size: int = 8
    ):
        """LLM fine-tuning (GPU required)"""
        from transformers import AutoModelForSequenceClassification, Trainer

        # ... training code
        pass

Authoring Pipelines

Basic Pipeline

    # deploy_model is defined first: the pipeline body runs when the
    # @dsl.pipeline decorator is applied, so every component must already exist
    @dsl.component(base_image="python:3.11-slim")
    def deploy_model(
        model_input: Input[Model],
        accuracy: float
    ):
        """Deploy model (when conditions are met)"""
        print(f"Deploying model with accuracy: {accuracy:.4f}")
        print(f"Model path: {model_input.path}")
        # Actual deployment logic (K8s Serving, BentoML, etc.)

    @dsl.pipeline(
        name="ML Training Pipeline",
        description="Data Load -> Preprocess -> Train -> Evaluate Pipeline"
    )
    def ml_training_pipeline(
        dataset_url: str = "https://example.com/data.csv",
        test_size: float = 0.2,
        n_estimators: int = 100,
        max_depth: int = 6,
        learning_rate: float = 0.1,
        accuracy_threshold: float = 0.85
    ):
        # Step 1: Load data
        load_task = load_data(dataset_url=dataset_url)

        # Step 2: Preprocess (runs after load_task completes)
        preprocess_task = preprocess_data(
            input_dataset=load_task.outputs["output_dataset"],
            test_size=test_size
        )

        # Step 3: Train model
        train_task = train_model(
            train_dataset=preprocess_task.outputs["train_dataset"],
            n_estimators=n_estimators,
            max_depth=max_depth,
            learning_rate=learning_rate
        )

        # Set resource limits
        train_task.set_cpu_limit("4")
        train_task.set_memory_limit("8Gi")

        # Step 4: Evaluate
        eval_task = evaluate_model(
            test_dataset=preprocess_task.outputs["test_dataset"],
            model_input=train_task.outputs["model_output"]
        )

        # Step 5: Conditional deployment
        # evaluate_model has several outputs, so its float return value is
        # referenced by name ("Output") instead of through .output
        accuracy = eval_task.outputs["Output"]
        with dsl.If(accuracy >= accuracy_threshold):
            deploy_task = deploy_model(
                model_input=train_task.outputs["model_output"],
                accuracy=accuracy
            )

Pipeline Compilation and Execution

    from kfp import compiler
    from kfp.client import Client

    # 1. Compile to YAML
    compiler.Compiler().compile(
        pipeline_func=ml_training_pipeline,
        package_path="ml_pipeline.yaml"
    )

    # 2. Submit to KFP server
    client = Client(host="http://localhost:8080")

    # Create Experiment
    experiment = client.create_experiment(name="ml-experiments")

    # Execute Run
    run = client.create_run_from_pipeline_func(
        ml_training_pipeline,
        experiment_name="ml-experiments",
        run_name="training-run-001",
        arguments={
            # Reading gs:// paths with pandas requires gcsfs in the component image
            "dataset_url": "gs://my-bucket/data.csv",
            "n_estimators": 200,
            "max_depth": 8,
            "accuracy_threshold": 0.90
        }
    )

    print(f"Run ID: {run.run_id}")
    print(f"Run URL: http://localhost:8080/#/runs/details/{run.run_id}")

Recurring Run

    # Run daily at 2 AM
    # create_recurring_run takes a compiled package and `params`;
    # KFP cron expressions use six fields, with seconds first
    client.create_recurring_run(
        experiment_id=experiment.experiment_id,
        job_name="daily-retraining",
        pipeline_package_path="ml_pipeline.yaml",
        cron_expression="0 0 2 * * *",
        max_concurrency=1,
        params={
            "dataset_url": "gs://my-bucket/latest-data.csv",
            "accuracy_threshold": 0.85
        }
    )
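
The KFP client documents `cron_expression` as six space-separated fields, with a leading seconds field that standard five-field crontab syntax does not have. The sketch below is a small stand-alone checker, not part of the KFP SDK. `describe_cron` and `CRON_FIELDS` are hypothetical names. It labels each field and rejects an expression with the wrong number of fields.

```python
CRON_FIELDS = ("second", "minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict:
    """Label the fields of a six-field cron expression, seconds first."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(
            f"expected {len(CRON_FIELDS)} fields, got {len(parts)}: {expression!r}"
        )
    return dict(zip(CRON_FIELDS, parts))


daily_2am = describe_cron("0 0 2 * * *")
print(daily_2am)
```

A check like this catches a five-field crontab string such as `0 2 * * *` before it is submitted as a schedule.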

Advanced Patterns

Parallel Execution (ParallelFor)

    @dsl.pipeline(name="Hyperparameter Search")
    def hp_search_pipeline(dataset_url: str = "https://example.com/data.csv"):
        # Load the data once; every parallel branch reads the same artifact
        load_task = load_data(dataset_url=dataset_url)

        # Define hyperparameter combinations
        hp_configs = [
            {"n_estimators": 100, "max_depth": 4, "lr": 0.1},
            {"n_estimators": 200, "max_depth": 6, "lr": 0.05},
            {"n_estimators": 300, "max_depth": 8, "lr": 0.01},
        ]

        # Parallel training
        with dsl.ParallelFor(hp_configs) as config:
            train_task = train_model(
                train_dataset=load_task.outputs["output_dataset"],
                n_estimators=config.n_estimators,
                max_depth=config.max_depth,
                learning_rate=config.lr
            )
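
Writing the configuration list by hand works for three entries, but a full grid grows quickly. As one possible approach, the list handed to `dsl.ParallelFor` can be generated with the standard library. `build_grid` is a hypothetical helper and the value ranges are illustrative only.

```python
from itertools import product


def build_grid(search_space: dict) -> list:
    """Expand {name: [values]} into a list of one dict per combination."""
    keys = list(search_space)
    value_lists = [search_space[key] for key in keys]
    return [dict(zip(keys, combo)) for combo in product(*value_lists)]


# Illustrative ranges; the key names match those read inside the loop above.
hp_configs = build_grid({
    "n_estimators": [100, 200],
    "max_depth": [4, 6],
    "lr": [0.1, 0.05],
})
print(len(hp_configs), hp_configs[0])
```

Each combination becomes one parallel training task, so the grid size directly determines how many pods the run requests.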

Caching

    # Disable caching at the component level (inside a pipeline function)
    load_task = load_data(dataset_url=dataset_url)
    load_task.set_caching_options(False)  # Always re-execute

    # Configure caching at the pipeline level
    run = client.create_run_from_pipeline_func(
        ml_training_pipeline,
        enable_caching=True  # Use cache for identical inputs
    )
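
Conceptually, caching means deriving a key from what a step is and what it receives, then reusing a stored result when the key repeats. The sketch below illustrates that idea with a toy in-memory cache. It is not KFP's actual fingerprinting logic, and `cache_key`, `run_with_cache` and the `_cache` dict are all hypothetical.

```python
import hashlib
import json

_cache = {}  # toy in-memory store: key -> result


def cache_key(component_name: str, image: str, inputs: dict) -> str:
    """Hash the step identity and its inputs into a stable key."""
    payload = json.dumps(
        {"component": component_name, "image": image, "inputs": inputs},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def run_with_cache(component_name, image, inputs, func):
    """Run func(**inputs) unless an identical call was already recorded."""
    key = cache_key(component_name, image, inputs)
    if key in _cache:
        return _cache[key], True   # cache hit: execution skipped
    result = func(**inputs)
    _cache[key] = result
    return result, False           # cache miss: executed


first = run_with_cache("square", "python:3.11-slim", {"x": 4}, lambda x: x * x)
second = run_with_cache("square", "python:3.11-slim", {"x": 4}, lambda x: x * x)
print(first, second)
```

The same reasoning explains why `load_data` is a natural candidate for `set_caching_options(False)`: its input URL can stay identical while the data behind it changes.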

Volume Mounts

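    # Requires the extension package: pip install kfp-kubernetes
    from kfp import kubernetes

    @dsl.component(base_image="python:3.11-slim")
    def process_large_data(output_data: Output[Dataset]):
        """Process large datasets"""
        pass

    @dsl.pipeline(name="large-data-pipeline")
    def large_data_pipeline():
        process_task = process_large_data()
        # PVC mount: in KFP v2, kubernetes.mount_pvc replaces the v1
        # add_pvolumes / PipelineVolume API
        kubernetes.mount_pvc(
            process_task,
            pvc_name="data-pvc",
            mount_path="/mnt/data"
        )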

CI/CD Integration

GitHub Actions + KFP

    # .github/workflows/ml-pipeline.yml
    name: ML Pipeline CI/CD

    on:
      push:
        branches: [main]
        paths:
          - 'pipelines/**'
          - 'components/**'

    jobs:
      deploy-pipeline:
        runs-on: ubuntu-latest
        steps:
          - uses: actions/checkout@v4

          - name: Set up Python
            uses: actions/setup-python@v5
            with:
              python-version: '3.11'

          - name: Install dependencies
            run: pip install kfp==2.7.0

          - name: Compile pipeline
            run: python pipelines/compile.py

          - name: Upload pipeline
            env:
              KFP_HOST: ${{ secrets.KFP_HOST }}
            run: |
              python -c "
              from kfp.client import Client
              client = Client(host='$KFP_HOST')
              client.upload_pipeline(
                  pipeline_package_path='ml_pipeline.yaml',
                  pipeline_name='ml-training-v2',
                  description='Automated ML training pipeline'
              )
              "

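An alternative to interpolating the secret into an inline shell string is to read it from the environment inside a Python script. The sketch below is one possible helper for such a script. `resolve_kfp_host` is a hypothetical name and the validation rules are assumptions rather than KFP requirements. It returns a cleaned host string that could then be passed to `Client(host=...)`.

```python
import os
from urllib.parse import urlparse


def resolve_kfp_host(env=None, var_name: str = "KFP_HOST") -> str:
    """Read the KFP endpoint from the environment and sanity-check it."""
    env = os.environ if env is None else env
    host = env.get(var_name, "").strip()
    if not host:
        raise RuntimeError(f"{var_name} is not set")
    parsed = urlparse(host)
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        # The value is a secret, so the message does not echo it back.
        raise ValueError(f"{var_name} must be an http(s) URL")
    return host.rstrip("/")


# A plain dict stands in for os.environ here so the example runs anywhere.
host = resolve_kfp_host({"KFP_HOST": "http://localhost:8080/"})
print(host)
```

Failing fast with a clear message makes a missing or malformed secret visible in the workflow log without printing the secret itself.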
Conclusion

Kubeflow Pipelines v2 key takeaways:

1. **@dsl.component**: Converts Python functions into containerized components

2. **@dsl.pipeline**: Connects components into a DAG

3. **Artifact System**: Manages inputs/outputs with typed artifacts like Dataset, Model, and Metrics

4. **Conditionals/Loops**: Dynamic pipelines with dsl.If and dsl.ParallelFor

5. **Caching**: Cost reduction by skipping re-execution for identical inputs

**Q1. What is the decorator used to define a component in KFP v2?**

@dsl.component

**Q2. What is the difference between Output[Dataset] and Output[Model]?**

They are type hints that distinguish artifact types. Dataset is for data artifacts, and Model is for trained model artifacts.

**Q3. How do you implement conditional execution in a pipeline?**

Use the dsl.If context manager (e.g., with dsl.If(accuracy >= threshold))

**Q4. What happens when you run with identical inputs while caching is enabled?**

The component is skipped and the previous execution results are reused.

**Q5. What is ParallelFor used for?**

Running the same component in parallel with different parameters (e.g., hyperparameter search)

**Q6. What is the biggest change when migrating from KFP v1 to v2?**

Using @dsl.component decorator instead of ContainerOp, and the introduction of the Artifact type system.
