Skip to content
Published on

Kubeflow Pipelines v2 MLワークフロー自動化と運用ガイド

Authors
  • Name
    Twitter
Kubeflow Pipelines v2 MLワークフロー

概要

MLモデルを開発することと、本番環境で安定的に運用することはまったく異なる問題である。データ前処理、学習、評価、デプロイまでのパイプラインを手動で管理すると、再現性の欠如、実験追跡の失敗、デプロイの遅延などの問題が繰り返し発生する。Kubeflow Pipelines(KFP)v2はKubernetes上でMLワークフローを宣言的に定義し自動化するフレームワークであり、Pythonデコレータだけで複雑なMLパイプラインを構築できる。

KFP v2はv1から大幅に改善された。パイプラインのコンパイル結果がArgo Workflow YAMLではなくIR(Intermediate Representation)YAMLに抽象化され、さまざまな実行バックエンドをサポートする。アーティファクトシステムが強化され、型安全性も向上した。本記事では、KFP v2のアーキテクチャ、SDK活用法、キャッシング戦略、CI/CD統合、そして本番運用トラブルシューティングまで、実践的な観点で解説する。

KFP v2 アーキテクチャ

コアコンポーネント

KFP v2のアーキテクチャは以下のコアレイヤーで構成される。

コンポーネント役割技術スタック
KFP SDKパイプライン/コンポーネント定義、コンパイルPython(kfpパッケージ)
IR CompilerPython DSLをIR YAMLに変換Protocol Buffers ベース
KFP Backendパイプライン実行管理、APIサーバーGo、gRPC/REST
Workflow Engine実際のワークフローオーケストレーションArgo Workflows / Tekton
Metadata Store実行メタデータ、アーティファクト追跡ML Metadata(MLMD)
Artifact Storeモデル、データセットなどのアーティファクト保存MinIO / GCS / S3
UI Dashboardパイプライン可視化、実行モニタリングReactベース Web UI

KFP v2における最大の変化はIR YAMLの導入である。v1ではパイプラインをArgo Workflow YAMLに直接コンパイルしていたため、Argoと強く結合していた。v2ではIRという中間表現にまずコンパイルし、各バックエンドドライバーがそれを解釈して実行する。これにより、同じパイプライン定義をKubeflowクラスターだけでなく、Google Vertex AI Pipelinesでも実行できるようになった。

インストールとクラスター構成

# KFP SDK v2 インストール
pip install kfp==2.7.0

# Kubernetes拡張ライブラリ(GPU、Volumeなど K8s特化機能)
pip install kfp-kubernetes==1.2.0

# Kubeflow Pipelinesバックエンドデプロイ(Kubernetesクラスターに)
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=2.2.0"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=2.2.0"

# ポートフォワーディングでUIにアクセス
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80

KFP SDK v2 パイプライン定義

基本コンポーネントとパイプライン

KFP v2では@dsl.component@dsl.pipelineデコレータでパイプラインを定義する。コンポーネントはパイプラインの最小実行単位であり、各コンポーネントは独立したコンテナで実行される。

from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"]
)
def preprocess_data(
    raw_data_path: str,
    test_size: float,
    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    data_stats: Output[Metrics]
):
    """データをロードし、学習/テストに分割する。"""
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(raw_data_path)

    # 欠損値処理
    df = df.dropna(subset=["target"])
    df = df.fillna(df.median(numeric_only=True))

    train_df, test_df = train_test_split(
        df, test_size=test_size, random_state=42, stratify=df["target"]
    )

    train_df.to_csv(train_dataset.path, index=False)
    test_df.to_csv(test_dataset.path, index=False)

    # メトリクス記録
    data_stats.log_metric("total_rows", len(df))
    data_stats.log_metric("train_rows", len(train_df))
    data_stats.log_metric("test_rows", len(test_df))
    data_stats.log_metric("feature_count", len(df.columns) - 1)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "joblib==1.3.0"]
)
def train_model(
    train_dataset: Input[Dataset],
    n_estimators: int,
    max_depth: int,
    trained_model: Output[Model],
    training_metrics: Output[Metrics]
):
    """Random Forestモデルを学習する。"""
    import pandas as pd
    import joblib
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score, f1_score

    train_df = pd.read_csv(train_dataset.path)
    X_train = train_df.drop(columns=["target"])
    y_train = train_df["target"]

    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42,
        n_jobs=-1
    )
    model.fit(X_train, y_train)

    # 学習精度の記録
    train_pred = model.predict(X_train)
    training_metrics.log_metric("train_accuracy", accuracy_score(y_train, train_pred))
    training_metrics.log_metric("train_f1", f1_score(y_train, train_pred, average="weighted"))

    # モデル保存
    joblib.dump(model, trained_model.path)
    trained_model.metadata["framework"] = "sklearn"
    trained_model.metadata["model_type"] = "RandomForestClassifier"


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "joblib==1.3.0"]
)
def evaluate_model(
    test_dataset: Input[Dataset],
    trained_model: Input[Model],
    eval_metrics: Output[Metrics],
    accuracy_threshold: float = 0.85
) -> bool:
    """テストデータでモデルを評価し、閾値を超えるか確認する。"""
    import pandas as pd
    import joblib
    from sklearn.metrics import accuracy_score, f1_score, classification_report

    test_df = pd.read_csv(test_dataset.path)
    X_test = test_df.drop(columns=["target"])
    y_test = test_df["target"]

    model = joblib.load(trained_model.path)
    predictions = model.predict(X_test)

    accuracy = accuracy_score(y_test, predictions)
    f1 = f1_score(y_test, predictions, average="weighted")

    eval_metrics.log_metric("test_accuracy", accuracy)
    eval_metrics.log_metric("test_f1", f1)
    eval_metrics.log_metric("passed_threshold", accuracy >= accuracy_threshold)

    return accuracy >= accuracy_threshold

パイプラインの組み合わせと条件付き実行

@dsl.pipeline(
    name="ml-training-pipeline",
    description="データ前処理 → 学習 → 評価 → 条件付きデプロイパイプライン"
)
def ml_training_pipeline(
    raw_data_path: str = "gs://my-bucket/data/raw.csv",
    test_size: float = 0.2,
    n_estimators: int = 100,
    max_depth: int = 10,
    accuracy_threshold: float = 0.85
):
    # ステップ1: データ前処理
    preprocess_task = preprocess_data(
        raw_data_path=raw_data_path,
        test_size=test_size
    )
    preprocess_task.set_display_name("Data Preprocessing")

    # ステップ2: モデル学習
    train_task = train_model(
        train_dataset=preprocess_task.outputs["train_dataset"],
        n_estimators=n_estimators,
        max_depth=max_depth
    )
    train_task.set_display_name("Model Training")
    train_task.set_cpu_limit("4")
    train_task.set_memory_limit("8Gi")

    # ステップ3: モデル評価
    eval_task = evaluate_model(
        test_dataset=preprocess_task.outputs["test_dataset"],
        trained_model=train_task.outputs["trained_model"],
        accuracy_threshold=accuracy_threshold
    )
    eval_task.set_display_name("Model Evaluation")

    # ステップ4: 条件付きデプロイ(精度閾値通過時)
    with dsl.If(eval_task.output == True):
        deploy_task = deploy_model(
            model=train_task.outputs["trained_model"],
            model_name="fraud-detector",
            serving_endpoint="https://serving.example.com"
        )
        deploy_task.set_display_name("Model Deployment")


# パイプラインコンパイル
from kfp import compiler
compiler.Compiler().compile(
    pipeline_func=ml_training_pipeline,
    package_path="ml_training_pipeline.yaml"
)

コンポーネントタイプ別の活用

KFP v2は3種類のコンポーネントタイプをサポートしており、それぞれ適した使用シナリオがある。

コンポーネントタイプ定義方法適した状況メリットデメリット
Lightweight Python@dsl.componentデコレータ純粋なPythonロジック、高速プロトタイピングコードと定義が一か所、高速な反復外部ファイル参照不可、関数内import必須
Container@dsl.container_component既存Dockerイメージ活用、非Python作業言語不問、既存イメージの再利用アーティファクトタイプが制限的
Importerdsl.importer()外部アーティファクトをパイプラインに導入既存アーティファクトをパイプライン内で追跡データ移動なし、メタデータ登録のみ

Container Component の例

@dsl.container_component
def run_spark_job(
    input_data: Input[Dataset],
    output_data: Output[Dataset],
    spark_config: str
):
    """Sparkジョブをコンテナとして実行する。"""
    return dsl.ContainerSpec(
        image="my-registry/spark-processor:3.5",
        command=["spark-submit"],
        args=[
            "--master", "k8s://https://kubernetes.default.svc",
            "--conf", spark_config,
            "--input", input_data.path,
            "--output", output_data.path,
            "/app/etl_job.py"
        ]
    )


# Importer活用:外部モデルをパイプラインに導入
@dsl.pipeline(name="model-comparison-pipeline")
def comparison_pipeline():
    existing_model = dsl.importer(
        artifact_uri="gs://models/production/v2.1/model.pkl",
        artifact_class=Model,
        reimport=False,
        metadata={"version": "2.1", "framework": "sklearn"}
    )

    # 既存モデルと新モデルを比較
    compare_task = compare_models(
        baseline_model=existing_model.output,
        candidate_model=train_task.outputs["trained_model"]
    )

アーティファクト管理とキャッシング戦略

アーティファクトシステム

KFP v2のアーティファクトシステムは、ML Metadata(MLMD)をベースにすべての入出力を追跡する。主要なアーティファクトタイプは以下の通りである。

アーティファクトタイプ用途
DatasetデータセットCSV、Parquetファイル
Model学習済みモデルpickle、ONNX、SavedModel
Metrics数値メトリクスaccuracy、loss、f1-score
ClassificationMetrics分類メトリクスconfusion matrix、ROC curve
HTMLHTMLレポート可視化レポート
MarkdownMarkdownレポートテキストベースレポート
Artifact汎用アーティファクトその他ファイル

キャッシング戦略

KFP v2はコンポーネントレベルでの自動キャッシングをサポートする。同一の入力とコードで実行されたコンポーネントの結果を再利用し、実行時間を大幅に短縮できる。

@dsl.pipeline(name="caching-example")
def caching_pipeline(data_path: str, retrain: bool = False):
    # データ前処理はキャッシング有効化(同一データなら再利用)
    preprocess_task = preprocess_data(raw_data_path=data_path)
    preprocess_task.set_caching_options(True)

    # 学習はretrainフラグに応じてキャッシング制御
    train_task = train_model(
        train_dataset=preprocess_task.outputs["train_dataset"],
        n_estimators=200,
        max_depth=15
    )
    if retrain:
        train_task.set_caching_options(False)  # 強制再学習

    # 外部API呼び出しコンポーネントはキャッシング無効化
    deploy_task = deploy_model(model=train_task.outputs["trained_model"])
    deploy_task.set_caching_options(False)  # 常に新規実行

キャッシング運用時の注意点がある。第一に、コンポーネントコードが純粋関数(同じ入力なら同じ出力)であることがキャッシングの意味を持つ前提である。第二に、KFP v2 SDKではキャッシュ有効期限の設定がサポートされていないため、時間に敏感なデータを扱うコンポーネントはキャッシングを無効化すべきである。第三に、環境変数KFP_DISABLE_EXECUTION_CACHING_BY_DEFAULTtrueに設定すると、デフォルトですべてのパイプラインでキャッシングが無効化される。

CI/CD統合

GitHub Actionsによるパイプライン自動デプロイ

# .github/workflows/kfp-deploy.yaml
name: KFP Pipeline CI/CD

on:
  push:
    branches: [main]
    paths:
      - 'pipelines/**'
      - 'components/**'

env:
  KFP_HOST: ${{ secrets.KFP_HOST }}
  KFP_NAMESPACE: kubeflow

jobs:
  validate-and-compile:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'

      - name: Install dependencies
        run: |
          pip install kfp==2.7.0 kfp-kubernetes==1.2.0
          pip install pytest

      - name: Lint pipeline code
        run: |
          pip install ruff
          ruff check pipelines/ components/

      - name: Run unit tests
        run: pytest tests/unit/ -v

      - name: Compile pipeline
        run: |
          python -c "
          from pipelines.training_pipeline import ml_training_pipeline
          from kfp import compiler
          compiler.Compiler().compile(
              pipeline_func=ml_training_pipeline,
              package_path='compiled_pipeline.yaml'
          )
          print('Pipeline compiled successfully')
          "

      - name: Upload compiled pipeline
        uses: actions/upload-artifact@v4
        with:
          name: compiled-pipeline
          path: compiled_pipeline.yaml

  deploy-pipeline:
    needs: validate-and-compile
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4

      - name: Download compiled pipeline
        uses: actions/download-artifact@v4
        with:
          name: compiled-pipeline

      - name: Deploy to KFP
        run: |
          pip install kfp==2.7.0
          python -c "
          from kfp.client import Client

          client = Client(host='${KFP_HOST}')

          # パイプラインのアップロードまたは更新
          try:
              pipeline = client.upload_pipeline(
                  pipeline_package_path='compiled_pipeline.yaml',
                  pipeline_name='ml-training-pipeline',
                  description='Automated ML training pipeline'
              )
              print(f'Pipeline uploaded: {pipeline.pipeline_id}')
          except Exception:
              # 既に存在する場合は新バージョンとしてアップロード
              pipeline = client.upload_pipeline_version(
                  pipeline_package_path='compiled_pipeline.yaml',
                  pipeline_name='ml-training-pipeline',
                  pipeline_version_name='v${GITHUB_SHA::7}'
              )
              print(f'Pipeline version uploaded: {pipeline.pipeline_version_id}')
          "

プログラムによるパイプライン実行

from kfp.client import Client

def trigger_pipeline_run(
    host: str,
    pipeline_name: str,
    experiment_name: str,
    params: dict
) -> str:
    """プログラムからパイプライン実行をトリガーする。"""
    client = Client(host=host)

    # 実験の作成または取得
    experiment = client.create_experiment(
        name=experiment_name,
        namespace="kubeflow"
    )

    # パイプラインの取得
    pipelines = client.list_pipelines(filter=f'name="{pipeline_name}"')
    if not pipelines.pipelines:
        raise ValueError(f"Pipeline '{pipeline_name}' not found")

    pipeline_id = pipelines.pipelines[0].pipeline_id

    # 実行の作成
    run = client.run_pipeline(
        experiment_id=experiment.experiment_id,
        job_name=f"{pipeline_name}-{params.get('run_tag', 'manual')}",
        pipeline_id=pipeline_id,
        params=params
    )

    print(f"Run created: {run.run_id}")
    print(f"Monitor at: {host}/#/runs/details/{run.run_id}")
    return run.run_id


# 定期スケジュール実行の設定
def create_recurring_run(client: Client, pipeline_id: str, experiment_id: str):
    """毎日深夜0時にパイプラインを実行するスケジュールを作成する。"""
    recurring_run = client.create_recurring_run(
        experiment_id=experiment_id,
        job_name="daily-training",
        pipeline_id=pipeline_id,
        params={"raw_data_path": "gs://data/daily/latest.csv"},
        cron_expression="0 0 * * *",
        max_concurrency=1,
        enabled=True
    )
    return recurring_run

MLワークフローオーケストレーションツール比較

項目KFP v2Apache AirflowPrefectVertex AI Pipelines
主な用途MLパイプライン専用汎用データワークフロー汎用ワークフローマネージドMLパイプライン
インフラKubernetes必須スタンドアロン実行可能スタンドアロン / クラウドGoogle Cloudマネージド
パイプライン定義PythonデコレータPython(DAGクラス)PythonデコレータKFP SDK(同一)
アーティファクト追跡ML Metadata(内蔵)XCom(限定的)外部連携が必要Vertex ML Metadata
実験管理内蔵非対応(MLflow連携)非対応内蔵
キャッシングコンポーネントレベル自動タスクレベル手動タスクレベル内蔵コンポーネントレベル自動
GPUサポートKubernetesネイティブK8s Executorが必要Kubernetes連携自動
UIパイプライン可視化内蔵Web UI内蔵Prefect Cloud UIGoogle Cloud Console
スケーラビリティKubernetesスケーリングCelery/K8s ExecutorDask/Ray連携オートスケーリング
学習コスト高い(K8s知識が必要)中程度低い中程度(GCP依存)
コストインフラ自己運用インフラ自己運用Prefect Cloud有料従量課金

選択基準まとめ: KubernetesベースのML専用パイプラインが必要ならKFP v2、データエンジニアリングとMLを統合するならAirflow、素早く始めたいならPrefect、Google Cloudに全面的にコミットしているならVertex AI Pipelinesが適している。

Kubernetes拡張機能

KFP v2はkfp-kubernetes拡張ライブラリを通じてKubernetes特化機能をサポートする。

from kfp import dsl
from kfp import kubernetes

@dsl.pipeline(name="gpu-training-pipeline")
def gpu_training_pipeline():
    train_task = train_deep_learning_model(
        dataset_path="gs://data/training",
        epochs=50,
        batch_size=64
    )

    # GPUリソース割り当て
    kubernetes.add_node_selector(
        train_task,
        label_key="accelerator",
        label_value="nvidia-a100"
    )
    kubernetes.add_toleration(
        train_task,
        key="nvidia.com/gpu",
        operator="Exists",
        effect="NoSchedule"
    )
    train_task.set_accelerator_type("nvidia.com/gpu")
    train_task.set_accelerator_limit(2)
    train_task.set_cpu_limit("16")
    train_task.set_memory_limit("64Gi")

    # Secretマウント(モデルレジストリ認証情報)
    kubernetes.use_secret_as_env(
        train_task,
        secret_name="model-registry-credentials",
        secret_key_to_env={
            "username": "REGISTRY_USERNAME",
            "password": "REGISTRY_PASSWORD"
        }
    )

    # PVCマウント(共有データボリューム)
    kubernetes.mount_pvc(
        train_task,
        pvc_name="shared-data-pvc",
        mount_path="/mnt/shared-data"
    )

モニタリングとトラブルシューティング

よくある問題と解決方法

問題1: パイプラインのコンパイルは成功するが実行が失敗する場合

最も一般的な原因は、コンポーネント内部で使用しているパッケージがpackages_to_installに記載されていないことである。@dsl.componentデコレータのLightweight Pythonコンポーネントは隔離された環境で実行されるため、関数内部で使用するすべてのimportを関数本体内に記述し、必要なパッケージを明示する必要がある。

問題2: キャッシングが期待通りに動作しない場合

キャッシングはコンポーネントの入力パラメータ、コンポーネントコード、ベースイメージに基づいてキャッシュキーを生成する。コードを変更したにもかかわらずキャッシュがヒットする場合、コンポーネントコードが実際には変更されていないか、キャッシュキーに含まれない部分のみが変更されている。パイプラインを再コンパイルしてアップロードし直したか確認すること。

問題3: OOM(Out of Memory)でPodが終了する場合

大規模データを処理するコンポーネントで頻繁に発生する。set_memory_limit()set_memory_request()を設定し、requestはlimitの約80%程度に設定するのが望ましい。データをチャンク単位で処理したり、不要な変数を明示的にdelすることも有効である。

問題4: パイプライン実行がPending状態で停止する場合

Kubernetesノードのリソース不足が原因であることが多い。kubectl describe podコマンドでイベントを確認し、ノードオートスケーラーの設定を点検すること。GPUノードの場合はノードプールの最大サイズを確認する必要がある。

問題5: ParallelFor内部でアーティファクトが上書きされる場合

KFP v2の既知の問題(GitHub Issue #10186)で、ParallelForやSub-DAG内部で同時に実行されるコンポーネントのアーティファクトが競合する可能性がある。アーティファクトパスに一意の識別子を含めるか、コンポーネント内部で一意のファイル名を生成して回避する。

ログ確認とデバッグ

# パイプライン実行の全ログ確認
kubectl logs -n kubeflow -l pipeline/runid=<run-id> --all-containers

# 特定コンポーネントのログ確認
kubectl logs -n kubeflow <pod-name> -c main

# Argo Workflowステータス確認
kubectl get workflows -n kubeflow
kubectl describe workflow <workflow-name> -n kubeflow

# ML Metadataの直接クエリ(デバッグ用)
kubectl port-forward -n kubeflow svc/metadata-grpc-service 8080:8080

本番運用チェックリスト

本番環境でKFP v2を運用する際、以下の事項を点検する必要がある。

  • リソース管理: すべてのコンポーネントにCPU/メモリのrequestとlimitを設定する。GPUコンポーネントにはtolerationsnodeSelectorを明示する。
  • リトライポリシー: 一時的なエラーに備えてtask.set_retry(num_retries=3, backoff_duration="60s")を設定する。
  • タイムアウト: 長時間実行されるパイプラインにtimeout設定でリソースの浪費を防止する。
  • ネームスペース分離: 開発/ステージング/本番パイプラインを別々のネームスペースに分離する。
  • アーティファクトクリーンアップ: 古い実行のアーティファクトを定期的にクリーンアップするCronJobを設定する。MinIO/S3のLifecycle Policyを活用する。
  • モニタリング連携: Prometheus + Grafanaでパイプラインの実行時間、成功率、リソース使用量をモニタリングする。
  • アラート設定: パイプライン失敗時にSlack/PagerDutyに通知を送るWebhookを連携する。

参考資料

  1. Kubeflow Pipelines 公式ドキュメント
  2. KFP SDK v2 API Reference
  3. Kubeflow Pipelines GitHub リポジトリ
  4. KFP v2 キャッシングガイド
  5. KFP v2 アーティファクト管理
  6. KFP v1からv2へのマイグレーションガイド
  7. KFP Python SDK DeepWiki
  8. Vertex AI Pipelinesへのマイグレーション