- Authors
- Name
- 概要
- KFP v2 アーキテクチャ
- KFP SDK v2 パイプライン定義
- コンポーネントタイプ別の活用
- アーティファクト管理とキャッシング戦略
- CI/CD統合
- MLワークフローオーケストレーションツール比較
- Kubernetes拡張機能
- モニタリングとトラブルシューティング
- 本番運用チェックリスト
- 参考資料

概要
MLモデルを開発することと、本番環境で安定的に運用することはまったく異なる問題である。データ前処理、学習、評価、デプロイまでのパイプラインを手動で管理すると、再現性の欠如、実験追跡の失敗、デプロイの遅延などの問題が繰り返し発生する。Kubeflow Pipelines(KFP)v2はKubernetes上でMLワークフローを宣言的に定義し自動化するフレームワークであり、Pythonデコレータだけで複雑なMLパイプラインを構築できる。
KFP v2はv1から大幅に改善された。パイプラインのコンパイル結果がArgo Workflow YAMLではなくIR(Intermediate Representation)YAMLに抽象化され、さまざまな実行バックエンドをサポートする。アーティファクトシステムが強化され、型安全性も向上した。本記事では、KFP v2のアーキテクチャ、SDK活用法、キャッシング戦略、CI/CD統合、そして本番運用トラブルシューティングまで、実践的な観点で解説する。
KFP v2 アーキテクチャ
コアコンポーネント
KFP v2のアーキテクチャは以下のコアレイヤーで構成される。
| コンポーネント | 役割 | 技術スタック |
|---|---|---|
| KFP SDK | パイプライン/コンポーネント定義、コンパイル | Python(kfpパッケージ) |
| IR Compiler | Python DSLをIR YAMLに変換 | Protocol Buffers ベース |
| KFP Backend | パイプライン実行管理、APIサーバー | Go、gRPC/REST |
| Workflow Engine | 実際のワークフローオーケストレーション | Argo Workflows / Tekton |
| Metadata Store | 実行メタデータ、アーティファクト追跡 | ML Metadata(MLMD) |
| Artifact Store | モデル、データセットなどのアーティファクト保存 | MinIO / GCS / S3 |
| UI Dashboard | パイプライン可視化、実行モニタリング | Reactベース Web UI |
KFP v2における最大の変化はIR YAMLの導入である。v1ではパイプラインをArgo Workflow YAMLに直接コンパイルしていたため、Argoと強く結合していた。v2ではIRという中間表現にまずコンパイルし、各バックエンドドライバーがそれを解釈して実行する。これにより、同じパイプライン定義をKubeflowクラスターだけでなく、Google Vertex AI Pipelinesでも実行できるようになった。
インストールとクラスター構成
# KFP SDK v2 インストール
pip install kfp==2.7.0
# Kubernetes拡張ライブラリ(GPU、Volumeなど K8s特化機能)
pip install kfp-kubernetes==1.2.0
# Kubeflow Pipelinesバックエンドデプロイ(Kubernetesクラスターに)
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=2.2.0"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=2.2.0"
# ポートフォワーディングでUIにアクセス
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80
KFP SDK v2 パイプライン定義
基本コンポーネントとパイプライン
KFP v2では@dsl.componentと@dsl.pipelineデコレータでパイプラインを定義する。コンポーネントはパイプラインの最小実行単位であり、各コンポーネントは独立したコンテナで実行される。
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics
@dsl.component(
base_image="python:3.11-slim",
packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"]
)
def preprocess_data(
raw_data_path: str,
test_size: float,
train_dataset: Output[Dataset],
test_dataset: Output[Dataset],
data_stats: Output[Metrics]
):
"""データをロードし、学習/テストに分割する。"""
import pandas as pd
from sklearn.model_selection import train_test_split
df = pd.read_csv(raw_data_path)
# 欠損値処理
df = df.dropna(subset=["target"])
df = df.fillna(df.median(numeric_only=True))
train_df, test_df = train_test_split(
df, test_size=test_size, random_state=42, stratify=df["target"]
)
train_df.to_csv(train_dataset.path, index=False)
test_df.to_csv(test_dataset.path, index=False)
# メトリクス記録
data_stats.log_metric("total_rows", len(df))
data_stats.log_metric("train_rows", len(train_df))
data_stats.log_metric("test_rows", len(test_df))
data_stats.log_metric("feature_count", len(df.columns) - 1)
@dsl.component(
base_image="python:3.11-slim",
packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "joblib==1.3.0"]
)
def train_model(
train_dataset: Input[Dataset],
n_estimators: int,
max_depth: int,
trained_model: Output[Model],
training_metrics: Output[Metrics]
):
"""Random Forestモデルを学習する。"""
import pandas as pd
import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
train_df = pd.read_csv(train_dataset.path)
X_train = train_df.drop(columns=["target"])
y_train = train_df["target"]
model = RandomForestClassifier(
n_estimators=n_estimators,
max_depth=max_depth,
random_state=42,
n_jobs=-1
)
model.fit(X_train, y_train)
# 学習精度の記録
train_pred = model.predict(X_train)
training_metrics.log_metric("train_accuracy", accuracy_score(y_train, train_pred))
training_metrics.log_metric("train_f1", f1_score(y_train, train_pred, average="weighted"))
# モデル保存
joblib.dump(model, trained_model.path)
trained_model.metadata["framework"] = "sklearn"
trained_model.metadata["model_type"] = "RandomForestClassifier"
@dsl.component(
base_image="python:3.11-slim",
packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "joblib==1.3.0"]
)
def evaluate_model(
test_dataset: Input[Dataset],
trained_model: Input[Model],
eval_metrics: Output[Metrics],
accuracy_threshold: float = 0.85
) -> bool:
"""テストデータでモデルを評価し、閾値を超えるか確認する。"""
import pandas as pd
import joblib
from sklearn.metrics import accuracy_score, f1_score, classification_report
test_df = pd.read_csv(test_dataset.path)
X_test = test_df.drop(columns=["target"])
y_test = test_df["target"]
model = joblib.load(trained_model.path)
predictions = model.predict(X_test)
accuracy = accuracy_score(y_test, predictions)
f1 = f1_score(y_test, predictions, average="weighted")
eval_metrics.log_metric("test_accuracy", accuracy)
eval_metrics.log_metric("test_f1", f1)
eval_metrics.log_metric("passed_threshold", accuracy >= accuracy_threshold)
return accuracy >= accuracy_threshold
パイプラインの組み合わせと条件付き実行
@dsl.pipeline(
name="ml-training-pipeline",
description="データ前処理 → 学習 → 評価 → 条件付きデプロイパイプライン"
)
def ml_training_pipeline(
raw_data_path: str = "gs://my-bucket/data/raw.csv",
test_size: float = 0.2,
n_estimators: int = 100,
max_depth: int = 10,
accuracy_threshold: float = 0.85
):
# ステップ1: データ前処理
preprocess_task = preprocess_data(
raw_data_path=raw_data_path,
test_size=test_size
)
preprocess_task.set_display_name("Data Preprocessing")
# ステップ2: モデル学習
train_task = train_model(
train_dataset=preprocess_task.outputs["train_dataset"],
n_estimators=n_estimators,
max_depth=max_depth
)
train_task.set_display_name("Model Training")
train_task.set_cpu_limit("4")
train_task.set_memory_limit("8Gi")
# ステップ3: モデル評価
eval_task = evaluate_model(
test_dataset=preprocess_task.outputs["test_dataset"],
trained_model=train_task.outputs["trained_model"],
accuracy_threshold=accuracy_threshold
)
eval_task.set_display_name("Model Evaluation")
# ステップ4: 条件付きデプロイ(精度閾値通過時)
with dsl.If(eval_task.output == True):
deploy_task = deploy_model(
model=train_task.outputs["trained_model"],
model_name="fraud-detector",
serving_endpoint="https://serving.example.com"
)
deploy_task.set_display_name("Model Deployment")
# パイプラインコンパイル
from kfp import compiler
compiler.Compiler().compile(
pipeline_func=ml_training_pipeline,
package_path="ml_training_pipeline.yaml"
)
コンポーネントタイプ別の活用
KFP v2は3種類のコンポーネントタイプをサポートしており、それぞれ適した使用シナリオがある。
| コンポーネントタイプ | 定義方法 | 適した状況 | メリット | デメリット |
|---|---|---|---|---|
| Lightweight Python | @dsl.componentデコレータ | 純粋なPythonロジック、高速プロトタイピング | コードと定義が一か所、高速な反復 | 外部ファイル参照不可、関数内import必須 |
| Container | @dsl.container_component | 既存Dockerイメージ活用、非Python作業 | 言語不問、既存イメージの再利用 | アーティファクトタイプが制限的 |
| Importer | dsl.importer() | 外部アーティファクトをパイプラインに導入 | 既存アーティファクトをパイプライン内で追跡 | データ移動なし、メタデータ登録のみ |
Container Component の例
@dsl.container_component
def run_spark_job(
input_data: Input[Dataset],
output_data: Output[Dataset],
spark_config: str
):
"""Sparkジョブをコンテナとして実行する。"""
return dsl.ContainerSpec(
image="my-registry/spark-processor:3.5",
command=["spark-submit"],
args=[
"--master", "k8s://https://kubernetes.default.svc",
"--conf", spark_config,
"--input", input_data.path,
"--output", output_data.path,
"/app/etl_job.py"
]
)
# Importer活用:外部モデルをパイプラインに導入
@dsl.pipeline(name="model-comparison-pipeline")
def comparison_pipeline():
existing_model = dsl.importer(
artifact_uri="gs://models/production/v2.1/model.pkl",
artifact_class=Model,
reimport=False,
metadata={"version": "2.1", "framework": "sklearn"}
)
# 既存モデルと新モデルを比較
compare_task = compare_models(
baseline_model=existing_model.output,
candidate_model=train_task.outputs["trained_model"]
)
アーティファクト管理とキャッシング戦略
アーティファクトシステム
KFP v2のアーティファクトシステムは、ML Metadata(MLMD)をベースにすべての入出力を追跡する。主要なアーティファクトタイプは以下の通りである。
| アーティファクトタイプ | 用途 | 例 |
|---|---|---|
Dataset | データセット | CSV、Parquetファイル |
Model | 学習済みモデル | pickle、ONNX、SavedModel |
Metrics | 数値メトリクス | accuracy、loss、f1-score |
ClassificationMetrics | 分類メトリクス | confusion matrix、ROC curve |
HTML | HTMLレポート | 可視化レポート |
Markdown | Markdownレポート | テキストベースレポート |
Artifact | 汎用アーティファクト | その他ファイル |
キャッシング戦略
KFP v2はコンポーネントレベルでの自動キャッシングをサポートする。同一の入力とコードで実行されたコンポーネントの結果を再利用し、実行時間を大幅に短縮できる。
@dsl.pipeline(name="caching-example")
def caching_pipeline(data_path: str, retrain: bool = False):
# データ前処理はキャッシング有効化(同一データなら再利用)
preprocess_task = preprocess_data(raw_data_path=data_path)
preprocess_task.set_caching_options(True)
# 学習はretrainフラグに応じてキャッシング制御
train_task = train_model(
train_dataset=preprocess_task.outputs["train_dataset"],
n_estimators=200,
max_depth=15
)
if retrain:
train_task.set_caching_options(False) # 強制再学習
# 外部API呼び出しコンポーネントはキャッシング無効化
deploy_task = deploy_model(model=train_task.outputs["trained_model"])
deploy_task.set_caching_options(False) # 常に新規実行
キャッシング運用時の注意点がある。第一に、コンポーネントコードが純粋関数(同じ入力なら同じ出力)であることがキャッシングの意味を持つ前提である。第二に、KFP v2 SDKではキャッシュ有効期限の設定がサポートされていないため、時間に敏感なデータを扱うコンポーネントはキャッシングを無効化すべきである。第三に、環境変数KFP_DISABLE_EXECUTION_CACHING_BY_DEFAULTをtrueに設定すると、デフォルトですべてのパイプラインでキャッシングが無効化される。
CI/CD統合
GitHub Actionsによるパイプライン自動デプロイ
# .github/workflows/kfp-deploy.yaml
name: KFP Pipeline CI/CD
on:
push:
branches: [main]
paths:
- 'pipelines/**'
- 'components/**'
env:
KFP_HOST: ${{ secrets.KFP_HOST }}
KFP_NAMESPACE: kubeflow
jobs:
validate-and-compile:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
cache: 'pip'
- name: Install dependencies
run: |
pip install kfp==2.7.0 kfp-kubernetes==1.2.0
pip install pytest
- name: Lint pipeline code
run: |
pip install ruff
ruff check pipelines/ components/
- name: Run unit tests
run: pytest tests/unit/ -v
- name: Compile pipeline
run: |
python -c "
from pipelines.training_pipeline import ml_training_pipeline
from kfp import compiler
compiler.Compiler().compile(
pipeline_func=ml_training_pipeline,
package_path='compiled_pipeline.yaml'
)
print('Pipeline compiled successfully')
"
- name: Upload compiled pipeline
uses: actions/upload-artifact@v4
with:
name: compiled-pipeline
path: compiled_pipeline.yaml
deploy-pipeline:
needs: validate-and-compile
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@v4
- name: Download compiled pipeline
uses: actions/download-artifact@v4
with:
name: compiled-pipeline
- name: Deploy to KFP
run: |
pip install kfp==2.7.0
python -c "
from kfp.client import Client
client = Client(host='${KFP_HOST}')
# パイプラインのアップロードまたは更新
try:
pipeline = client.upload_pipeline(
pipeline_package_path='compiled_pipeline.yaml',
pipeline_name='ml-training-pipeline',
description='Automated ML training pipeline'
)
print(f'Pipeline uploaded: {pipeline.pipeline_id}')
except Exception:
# 既に存在する場合は新バージョンとしてアップロード
pipeline = client.upload_pipeline_version(
pipeline_package_path='compiled_pipeline.yaml',
pipeline_name='ml-training-pipeline',
pipeline_version_name='v${GITHUB_SHA::7}'
)
print(f'Pipeline version uploaded: {pipeline.pipeline_version_id}')
"
プログラムによるパイプライン実行
from kfp.client import Client
def trigger_pipeline_run(
host: str,
pipeline_name: str,
experiment_name: str,
params: dict
) -> str:
"""プログラムからパイプライン実行をトリガーする。"""
client = Client(host=host)
# 実験の作成または取得
experiment = client.create_experiment(
name=experiment_name,
namespace="kubeflow"
)
# パイプラインの取得
pipelines = client.list_pipelines(filter=f'name="{pipeline_name}"')
if not pipelines.pipelines:
raise ValueError(f"Pipeline '{pipeline_name}' not found")
pipeline_id = pipelines.pipelines[0].pipeline_id
# 実行の作成
run = client.run_pipeline(
experiment_id=experiment.experiment_id,
job_name=f"{pipeline_name}-{params.get('run_tag', 'manual')}",
pipeline_id=pipeline_id,
params=params
)
print(f"Run created: {run.run_id}")
print(f"Monitor at: {host}/#/runs/details/{run.run_id}")
return run.run_id
# 定期スケジュール実行の設定
def create_recurring_run(client: Client, pipeline_id: str, experiment_id: str):
"""毎日深夜0時にパイプラインを実行するスケジュールを作成する。"""
recurring_run = client.create_recurring_run(
experiment_id=experiment_id,
job_name="daily-training",
pipeline_id=pipeline_id,
params={"raw_data_path": "gs://data/daily/latest.csv"},
cron_expression="0 0 * * *",
max_concurrency=1,
enabled=True
)
return recurring_run
MLワークフローオーケストレーションツール比較
| 項目 | KFP v2 | Apache Airflow | Prefect | Vertex AI Pipelines |
|---|---|---|---|---|
| 主な用途 | MLパイプライン専用 | 汎用データワークフロー | 汎用ワークフロー | マネージドMLパイプライン |
| インフラ | Kubernetes必須 | スタンドアロン実行可能 | スタンドアロン / クラウド | Google Cloudマネージド |
| パイプライン定義 | Pythonデコレータ | Python(DAGクラス) | Pythonデコレータ | KFP SDK(同一) |
| アーティファクト追跡 | ML Metadata(内蔵) | XCom(限定的) | 外部連携が必要 | Vertex ML Metadata |
| 実験管理 | 内蔵 | 非対応(MLflow連携) | 非対応 | 内蔵 |
| キャッシング | コンポーネントレベル自動 | タスクレベル手動 | タスクレベル内蔵 | コンポーネントレベル自動 |
| GPUサポート | Kubernetesネイティブ | K8s Executorが必要 | Kubernetes連携 | 自動 |
| UI | パイプライン可視化内蔵 | Web UI内蔵 | Prefect Cloud UI | Google Cloud Console |
| スケーラビリティ | Kubernetesスケーリング | Celery/K8s Executor | Dask/Ray連携 | オートスケーリング |
| 学習コスト | 高い(K8s知識が必要) | 中程度 | 低い | 中程度(GCP依存) |
| コスト | インフラ自己運用 | インフラ自己運用 | Prefect Cloud有料 | 従量課金 |
選択基準まとめ: KubernetesベースのML専用パイプラインが必要ならKFP v2、データエンジニアリングとMLを統合するならAirflow、素早く始めたいならPrefect、Google Cloudに全面的にコミットしているならVertex AI Pipelinesが適している。
Kubernetes拡張機能
KFP v2はkfp-kubernetes拡張ライブラリを通じてKubernetes特化機能をサポートする。
from kfp import dsl
from kfp import kubernetes
@dsl.pipeline(name="gpu-training-pipeline")
def gpu_training_pipeline():
train_task = train_deep_learning_model(
dataset_path="gs://data/training",
epochs=50,
batch_size=64
)
# GPUリソース割り当て
kubernetes.add_node_selector(
train_task,
label_key="accelerator",
label_value="nvidia-a100"
)
kubernetes.add_toleration(
train_task,
key="nvidia.com/gpu",
operator="Exists",
effect="NoSchedule"
)
train_task.set_accelerator_type("nvidia.com/gpu")
train_task.set_accelerator_limit(2)
train_task.set_cpu_limit("16")
train_task.set_memory_limit("64Gi")
# Secretマウント(モデルレジストリ認証情報)
kubernetes.use_secret_as_env(
train_task,
secret_name="model-registry-credentials",
secret_key_to_env={
"username": "REGISTRY_USERNAME",
"password": "REGISTRY_PASSWORD"
}
)
# PVCマウント(共有データボリューム)
kubernetes.mount_pvc(
train_task,
pvc_name="shared-data-pvc",
mount_path="/mnt/shared-data"
)
モニタリングとトラブルシューティング
よくある問題と解決方法
問題1: パイプラインのコンパイルは成功するが実行が失敗する場合
最も一般的な原因は、コンポーネント内部で使用しているパッケージがpackages_to_installに記載されていないことである。@dsl.componentデコレータのLightweight Pythonコンポーネントは隔離された環境で実行されるため、関数内部で使用するすべてのimportを関数本体内に記述し、必要なパッケージを明示する必要がある。
問題2: キャッシングが期待通りに動作しない場合
キャッシングはコンポーネントの入力パラメータ、コンポーネントコード、ベースイメージに基づいてキャッシュキーを生成する。コードを変更したにもかかわらずキャッシュがヒットする場合、コンポーネントコードが実際には変更されていないか、キャッシュキーに含まれない部分のみが変更されている。パイプラインを再コンパイルしてアップロードし直したか確認すること。
問題3: OOM(Out of Memory)でPodが終了する場合
大規模データを処理するコンポーネントで頻繁に発生する。set_memory_limit()とset_memory_request()を設定し、requestはlimitの約80%程度に設定するのが望ましい。データをチャンク単位で処理したり、不要な変数を明示的にdelすることも有効である。
問題4: パイプライン実行がPending状態で停止する場合
Kubernetesノードのリソース不足が原因であることが多い。kubectl describe podコマンドでイベントを確認し、ノードオートスケーラーの設定を点検すること。GPUノードの場合はノードプールの最大サイズを確認する必要がある。
問題5: ParallelFor内部でアーティファクトが上書きされる場合
KFP v2の既知の問題(GitHub Issue #10186)で、ParallelForやSub-DAG内部で同時に実行されるコンポーネントのアーティファクトが競合する可能性がある。アーティファクトパスに一意の識別子を含めるか、コンポーネント内部で一意のファイル名を生成して回避する。
ログ確認とデバッグ
# パイプライン実行の全ログ確認
kubectl logs -n kubeflow -l pipeline/runid=<run-id> --all-containers
# 特定コンポーネントのログ確認
kubectl logs -n kubeflow <pod-name> -c main
# Argo Workflowステータス確認
kubectl get workflows -n kubeflow
kubectl describe workflow <workflow-name> -n kubeflow
# ML Metadataの直接クエリ(デバッグ用)
kubectl port-forward -n kubeflow svc/metadata-grpc-service 8080:8080
本番運用チェックリスト
本番環境でKFP v2を運用する際、以下の事項を点検する必要がある。
- リソース管理: すべてのコンポーネントにCPU/メモリのrequestとlimitを設定する。GPUコンポーネントには
tolerationsとnodeSelectorを明示する。 - リトライポリシー: 一時的なエラーに備えて
task.set_retry(num_retries=3, backoff_duration="60s")を設定する。 - タイムアウト: 長時間実行されるパイプラインに
timeout設定でリソースの浪費を防止する。 - ネームスペース分離: 開発/ステージング/本番パイプラインを別々のネームスペースに分離する。
- アーティファクトクリーンアップ: 古い実行のアーティファクトを定期的にクリーンアップするCronJobを設定する。MinIO/S3のLifecycle Policyを活用する。
- モニタリング連携: Prometheus + Grafanaでパイプラインの実行時間、成功率、リソース使用量をモニタリングする。
- アラート設定: パイプライン失敗時にSlack/PagerDutyに通知を送るWebhookを連携する。