Skip to content
Published on

Kubeflow Pipelines v2 実践ガイド — KFP SDKでMLパイプラインを構築する

Authors
  • Name
    Twitter
Kubeflow Pipelines v2

はじめに

MLモデルを実験からプロダクションに移行する過程で、再現性、自動化、バージョン管理は必須です。Kubeflow Pipelines(KFP)v2はKubernetes上でMLワークフローを定義・実行するフレームワークで、Pythonデコレータだけでパイプラインを構成できます。

本記事では、KFP v2 SDKの主要機能と実践的なパイプライン構築を解説します。

KFP v2のインストールと基本概念

インストール

pip install kfp==2.7.0

# Kubeflow Pipelinesバックエンドのインストール(Kubernetes)
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic?ref=2.2.0"

# ポートフォワーディング
kubectl port-forward svc/ml-pipeline-ui -n kubeflow 8080:80

基本概念

# 1. Component: パイプラインの作業単位(Python関数)
# 2. Pipeline: ComponentのDAG(有向非巡回グラフ)
# 3. Artifact: 入出力データ(Dataset, Model, Metricsなど)
# 4. Run: パイプラインの1回の実行
# 5. Experiment: Runの論理的なグループ

コンポーネントの定義

軽量Pythonコンポーネント

from kfp import dsl
from kfp.dsl import (
    Dataset, Input, Output, Model, Metrics,
    ClassificationMetrics, component
)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.1.4", "scikit-learn==1.4.0"]
)
def load_data(
    dataset_url: str,
    output_dataset: Output[Dataset]
):
    """データ読み込みコンポーネント"""
    import pandas as pd

    df = pd.read_csv(dataset_url)
    print(f"Loaded {len(df)} rows")

    # 出力アーティファクトに保存
    df.to_csv(output_dataset.path, index=False)
    output_dataset.metadata["num_rows"] = len(df)
    output_dataset.metadata["num_columns"] = len(df.columns)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.1.4", "scikit-learn==1.4.0"]
)
def preprocess_data(
    input_dataset: Input[Dataset],
    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    test_size: float = 0.2
):
    """データの前処理と分割"""
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(input_dataset.path)

    # 前処理
    df = df.dropna()
    df = df.drop_duplicates()

    # 分割
    train_df, test_df = train_test_split(df, test_size=test_size, random_state=42)

    train_df.to_csv(train_dataset.path, index=False)
    test_df.to_csv(test_dataset.path, index=False)

    train_dataset.metadata["num_rows"] = len(train_df)
    test_dataset.metadata["num_rows"] = len(test_df)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=[
        "pandas==2.1.4", "scikit-learn==1.4.0",
        "joblib==1.3.2", "xgboost==2.0.3"
    ]
)
def train_model(
    train_dataset: Input[Dataset],
    model_output: Output[Model],
    metrics_output: Output[Metrics],
    n_estimators: int = 100,
    max_depth: int = 6,
    learning_rate: float = 0.1
):
    """モデル学習"""
    import pandas as pd
    import joblib
    from xgboost import XGBClassifier
    from sklearn.model_selection import cross_val_score

    df = pd.read_csv(train_dataset.path)
    X = df.drop("target", axis=1)
    y = df["target"]

    # 学習
    model = XGBClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        learning_rate=learning_rate,
        random_state=42
    )
    model.fit(X, y)

    # 交差検証
    cv_scores = cross_val_score(model, X, y, cv=5, scoring="accuracy")

    # モデル保存
    joblib.dump(model, model_output.path)
    model_output.metadata["framework"] = "xgboost"
    model_output.metadata["n_estimators"] = n_estimators

    # メトリクス記録
    metrics_output.log_metric("cv_accuracy_mean", float(cv_scores.mean()))
    metrics_output.log_metric("cv_accuracy_std", float(cv_scores.std()))
    metrics_output.log_metric("n_estimators", n_estimators)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=[
        "pandas==2.1.4", "scikit-learn==1.4.0",
        "joblib==1.3.2", "xgboost==2.0.3"
    ]
)
def evaluate_model(
    test_dataset: Input[Dataset],
    model_input: Input[Model],
    metrics_output: Output[ClassificationMetrics],
    eval_metrics: Output[Metrics]
) -> float:
    """モデル評価"""
    import pandas as pd
    import joblib
    from sklearn.metrics import accuracy_score, classification_report

    df = pd.read_csv(test_dataset.path)
    X = df.drop("target", axis=1)
    y = df["target"]

    model = joblib.load(model_input.path)
    y_pred = model.predict(X)
    y_prob = model.predict_proba(X)

    accuracy = accuracy_score(y, y_pred)

    # 分類メトリクス(混同行列の可視化)
    metrics_output.log_confusion_matrix(
        categories=["Class 0", "Class 1"],
        matrix=[[int(sum((y == 0) & (y_pred == 0))), int(sum((y == 0) & (y_pred == 1)))],
                [int(sum((y == 1) & (y_pred == 0))), int(sum((y == 1) & (y_pred == 1)))]]
    )

    eval_metrics.log_metric("test_accuracy", accuracy)

    return accuracy

カスタムDockerイメージコンポーネント

@dsl.component(
    base_image="pytorch/pytorch:2.1.0-cuda12.1-cudnn8-runtime",
    packages_to_install=["transformers==4.37.0", "datasets==2.16.0"]
)
def finetune_llm(
    model_name: str,
    train_dataset: Input[Dataset],
    output_model: Output[Model],
    epochs: int = 3,
    batch_size: int = 8
):
    """LLMファインチューニング(GPU使用)"""
    from transformers import AutoModelForSequenceClassification, Trainer
    # ... 学習コード
    pass

パイプラインの作成

基本パイプライン

@dsl.pipeline(
    name="ML Training Pipeline",
    description="データ読み込み → 前処理 → 学習 → 評価パイプライン"
)
def ml_training_pipeline(
    dataset_url: str = "https://example.com/data.csv",
    test_size: float = 0.2,
    n_estimators: int = 100,
    max_depth: int = 6,
    learning_rate: float = 0.1,
    accuracy_threshold: float = 0.85
):
    # Step 1: データ読み込み
    load_task = load_data(dataset_url=dataset_url)

    # Step 2: 前処理(load_task完了後に実行)
    preprocess_task = preprocess_data(
        input_dataset=load_task.outputs["output_dataset"],
        test_size=test_size
    )

    # Step 3: モデル学習
    train_task = train_model(
        train_dataset=preprocess_task.outputs["train_dataset"],
        n_estimators=n_estimators,
        max_depth=max_depth,
        learning_rate=learning_rate
    )
    # リソース制限の設定
    train_task.set_cpu_limit("4")
    train_task.set_memory_limit("8Gi")

    # Step 4: 評価
    eval_task = evaluate_model(
        test_dataset=preprocess_task.outputs["test_dataset"],
        model_input=train_task.outputs["model_output"]
    )

    # Step 5: 条件付きデプロイ
    with dsl.If(eval_task.output >= accuracy_threshold):
        deploy_task = deploy_model(
            model_input=train_task.outputs["model_output"],
            accuracy=eval_task.output
        )


@dsl.component(base_image="python:3.11-slim")
def deploy_model(
    model_input: Input[Model],
    accuracy: float
):
    """モデルデプロイ(条件充足時)"""
    print(f"Deploying model with accuracy: {accuracy:.4f}")
    print(f"Model path: {model_input.path}")
    # 実際のデプロイロジック(K8s Serving、BentoMLなど)

パイプラインのコンパイルと実行

from kfp import compiler
from kfp.client import Client

# 1. YAMLにコンパイル
compiler.Compiler().compile(
    pipeline_func=ml_training_pipeline,
    package_path="ml_pipeline.yaml"
)

# 2. KFPサーバーに送信
client = Client(host="http://localhost:8080")

# Experimentの作成
experiment = client.create_experiment(name="ml-experiments")

# Runの実行
run = client.create_run_from_pipeline_func(
    ml_training_pipeline,
    experiment_name="ml-experiments",
    run_name="training-run-001",
    arguments={
        "dataset_url": "gs://my-bucket/data.csv",
        "n_estimators": 200,
        "max_depth": 8,
        "accuracy_threshold": 0.90
    }
)

print(f"Run ID: {run.run_id}")
print(f"Run URL: http://localhost:8080/#/runs/details/{run.run_id}")

定期実行(Recurring Run)

# 毎日午前2時に実行
client.create_recurring_run(
    experiment_id=experiment.experiment_id,
    job_name="daily-retraining",
    pipeline_func=ml_training_pipeline,
    cron_expression="0 2 * * *",
    max_concurrency=1,
    arguments={
        "dataset_url": "gs://my-bucket/latest-data.csv",
        "accuracy_threshold": 0.85
    }
)

高度なパターン

並列実行(ParallelFor)

@dsl.pipeline(name="Hyperparameter Search")
def hp_search_pipeline():
    # ハイパーパラメータの組み合わせを定義
    hp_configs = [
        {"n_estimators": 100, "max_depth": 4, "lr": 0.1},
        {"n_estimators": 200, "max_depth": 6, "lr": 0.05},
        {"n_estimators": 300, "max_depth": 8, "lr": 0.01},
    ]

    # 並列学習
    with dsl.ParallelFor(hp_configs) as config:
        train_task = train_model(
            train_dataset=load_task.outputs["output_dataset"],
            n_estimators=config.n_estimators,
            max_depth=config.max_depth,
            learning_rate=config.lr
        )

キャッシング

# コンポーネントレベルでキャッシュを無効化
load_task = load_data(dataset_url=dataset_url)
load_task.set_caching_options(False)  # 常に再実行

# パイプラインレベルでキャッシュを設定
run = client.create_run_from_pipeline_func(
    ml_training_pipeline,
    enable_caching=True  # 同一入力の場合はキャッシュを使用
)

ボリュームマウント

@dsl.component(base_image="python:3.11-slim")
def process_large_data(output_data: Output[Dataset]):
    """大容量データの処理"""
    pass

# PVCマウント
process_task = process_large_data()
process_task.add_pvolumes({
    "/mnt/data": dsl.PipelineVolume(pvc="data-pvc")
})

CI/CD統合

GitHub Actions + KFP

# .github/workflows/ml-pipeline.yml
name: ML Pipeline CI/CD

on:
  push:
    branches: [main]
    paths:
      - 'pipelines/**'
      - 'components/**'

jobs:
  deploy-pipeline:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install kfp==2.7.0

      - name: Compile pipeline
        run: python pipelines/compile.py

      - name: Upload and run pipeline
        env:
          KFP_HOST: ${{ secrets.KFP_HOST }}
        run: |
          python -c "
          from kfp.client import Client
          client = Client(host='$KFP_HOST')
          client.upload_pipeline(
            pipeline_package_path='ml_pipeline.yaml',
            pipeline_name='ml-training-v2',
            description='Automated ML training pipeline'
          )
          "

まとめ

Kubeflow Pipelines v2の要点整理:

  1. @dsl.component: Python関数をコンテナ化されたコンポーネントに変換
  2. @dsl.pipeline: コンポーネントをDAGとして接続
  3. Artifactシステム: Dataset、Model、Metricsタイプで入出力を管理
  4. 条件/繰り返し: dsl.If、dsl.ParallelForで動的パイプラインを構築
  5. キャッシング: 同一入力時の再実行を防止してコストを削減

クイズ(6問)

Q1. KFP v2でコンポーネントを定義するデコレータは? @dsl.component

Q2. Output[Dataset]とOutput[Model]の違いは? 型ヒントでアーティファクトの種類を区別します。Datasetはデータ、Modelは学習済みモデルのアーティファクトです。

Q3. パイプラインで条件付き実行を実装する方法は? dsl.Ifコンテキストマネージャを使用します(例:with dsl.If(accuracy >= threshold))

Q4. キャッシュが有効な状態で同一入力で実行するとどうなる? 前回の実行結果を再利用し、コンポーネントをスキップします。

Q5. ParallelForの用途は? 同一コンポーネントを異なるパラメータで並列実行します(例:ハイパーパラメータサーチ)

Q6. KFP v1からv2への移行で最大の変更点は? ContainerOpの代わりに@dsl.componentデコレータを使用し、Artifactタイプシステムが導入されました。