Skip to content

필사 모드: Kubeflow Pipelines v2 実践ガイド — KFP SDKでMLパイプラインを構築する

日本語
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

はじめに

MLモデルを実験からプロダクションに移行する過程で、**再現性、自動化、バージョン管理**は必須です。**Kubeflow Pipelines(KFP)v2**はKubernetes上でMLワークフローを定義・実行するフレームワークで、Pythonデコレータだけでパイプラインを構成できます。

本記事では、KFP v2 SDKの主要機能と実践的なパイプライン構築を解説します。

KFP v2のインストールと基本概念

インストール

pip install kfp==2.7.0

Kubeflow Pipelinesバックエンドのインストール(Kubernetes)

kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic?ref=2.2.0"

ポートフォワーディング

kubectl port-forward svc/ml-pipeline-ui -n kubeflow 8080:80

基本概念

1. Component: パイプラインの作業単位(Python関数)

2. Pipeline: ComponentのDAG(有向非巡回グラフ)

3. Artifact: 入出力データ(Dataset, Model, Metricsなど)

4. Run: パイプラインの1回の実行

5. Experiment: Runの論理的なグループ

コンポーネントの定義

軽量Pythonコンポーネント

from kfp import dsl

from kfp.dsl import (

Dataset, Input, Output, Model, Metrics,

ClassificationMetrics, component

)

@dsl.component(

base_image="python:3.11-slim",

packages_to_install=["pandas==2.1.4", "scikit-learn==1.4.0"]

)

def load_data(

dataset_url: str,

output_dataset: Output[Dataset]

):

"""データ読み込みコンポーネント"""

df = pd.read_csv(dataset_url)

print(f"Loaded {len(df)} rows")

出力アーティファクトに保存

df.to_csv(output_dataset.path, index=False)

output_dataset.metadata["num_rows"] = len(df)

output_dataset.metadata["num_columns"] = len(df.columns)

@dsl.component(

base_image="python:3.11-slim",

packages_to_install=["pandas==2.1.4", "scikit-learn==1.4.0"]

)

def preprocess_data(

input_dataset: Input[Dataset],

train_dataset: Output[Dataset],

test_dataset: Output[Dataset],

test_size: float = 0.2

):

"""データの前処理と分割"""

from sklearn.model_selection import train_test_split

df = pd.read_csv(input_dataset.path)

前処理

df = df.dropna()

df = df.drop_duplicates()

分割

train_df, test_df = train_test_split(df, test_size=test_size, random_state=42)

train_df.to_csv(train_dataset.path, index=False)

test_df.to_csv(test_dataset.path, index=False)

train_dataset.metadata["num_rows"] = len(train_df)

test_dataset.metadata["num_rows"] = len(test_df)

@dsl.component(

base_image="python:3.11-slim",

packages_to_install=[

"pandas==2.1.4", "scikit-learn==1.4.0",

"joblib==1.3.2", "xgboost==2.0.3"

]

)

def train_model(

train_dataset: Input[Dataset],

model_output: Output[Model],

metrics_output: Output[Metrics],

n_estimators: int = 100,

max_depth: int = 6,

learning_rate: float = 0.1

):

"""モデル学習"""

from xgboost import XGBClassifier

from sklearn.model_selection import cross_val_score

df = pd.read_csv(train_dataset.path)

X = df.drop("target", axis=1)

y = df["target"]

学習

model = XGBClassifier(

n_estimators=n_estimators,

max_depth=max_depth,

learning_rate=learning_rate,

random_state=42

)

model.fit(X, y)

交差検証

cv_scores = cross_val_score(model, X, y, cv=5, scoring="accuracy")

モデル保存

joblib.dump(model, model_output.path)

model_output.metadata["framework"] = "xgboost"

model_output.metadata["n_estimators"] = n_estimators

メトリクス記録

metrics_output.log_metric("cv_accuracy_mean", float(cv_scores.mean()))

metrics_output.log_metric("cv_accuracy_std", float(cv_scores.std()))

metrics_output.log_metric("n_estimators", n_estimators)

@dsl.component(

base_image="python:3.11-slim",

packages_to_install=[

"pandas==2.1.4", "scikit-learn==1.4.0",

"joblib==1.3.2", "xgboost==2.0.3"

]

)

def evaluate_model(

test_dataset: Input[Dataset],

model_input: Input[Model],

metrics_output: Output[ClassificationMetrics],

eval_metrics: Output[Metrics]

) -> float:

"""モデル評価"""

from sklearn.metrics import accuracy_score, classification_report

df = pd.read_csv(test_dataset.path)

X = df.drop("target", axis=1)

y = df["target"]

model = joblib.load(model_input.path)

y_pred = model.predict(X)

y_prob = model.predict_proba(X)

accuracy = accuracy_score(y, y_pred)

分類メトリクス(混同行列の可視化)

metrics_output.log_confusion_matrix(

categories=["Class 0", "Class 1"],

matrix=[[int(sum((y == 0) & (y_pred == 0))), int(sum((y == 0) & (y_pred == 1)))],

[int(sum((y == 1) & (y_pred == 0))), int(sum((y == 1) & (y_pred == 1)))]]

)

eval_metrics.log_metric("test_accuracy", accuracy)

return accuracy

カスタムDockerイメージコンポーネント

@dsl.component(

base_image="pytorch/pytorch:2.1.0-cuda12.1-cudnn8-runtime",

packages_to_install=["transformers==4.37.0", "datasets==2.16.0"]

)

def finetune_llm(

model_name: str,

train_dataset: Input[Dataset],

output_model: Output[Model],

epochs: int = 3,

batch_size: int = 8

):

"""LLMファインチューニング(GPU使用)"""

from transformers import AutoModelForSequenceClassification, Trainer

... 学習コード

pass

パイプラインの作成

基本パイプライン

@dsl.pipeline(

name="ML Training Pipeline",

description="データ読み込み → 前処理 → 学習 → 評価パイプライン"

)

def ml_training_pipeline(

dataset_url: str = "https://example.com/data.csv",

test_size: float = 0.2,

n_estimators: int = 100,

max_depth: int = 6,

learning_rate: float = 0.1,

accuracy_threshold: float = 0.85

):

Step 1: データ読み込み

load_task = load_data(dataset_url=dataset_url)

Step 2: 前処理(load_task完了後に実行)

preprocess_task = preprocess_data(

input_dataset=load_task.outputs["output_dataset"],

test_size=test_size

)

Step 3: モデル学習

train_task = train_model(

train_dataset=preprocess_task.outputs["train_dataset"],

n_estimators=n_estimators,

max_depth=max_depth,

learning_rate=learning_rate

)

リソース制限の設定

train_task.set_cpu_limit("4")

train_task.set_memory_limit("8Gi")

Step 4: 評価

eval_task = evaluate_model(

test_dataset=preprocess_task.outputs["test_dataset"],

model_input=train_task.outputs["model_output"]

)

Step 5: 条件付きデプロイ

with dsl.If(eval_task.output >= accuracy_threshold):

deploy_task = deploy_model(

model_input=train_task.outputs["model_output"],

accuracy=eval_task.output

)

@dsl.component(base_image="python:3.11-slim")

def deploy_model(

model_input: Input[Model],

accuracy: float

):

"""モデルデプロイ(条件充足時)"""

print(f"Deploying model with accuracy: {accuracy:.4f}")

print(f"Model path: {model_input.path}")

実際のデプロイロジック(K8s Serving、BentoMLなど)

パイプラインのコンパイルと実行

from kfp import compiler

from kfp.client import Client

1. YAMLにコンパイル

compiler.Compiler().compile(

pipeline_func=ml_training_pipeline,

package_path="ml_pipeline.yaml"

)

2. KFPサーバーに送信

client = Client(host="http://localhost:8080")

Experimentの作成

experiment = client.create_experiment(name="ml-experiments")

Runの実行

run = client.create_run_from_pipeline_func(

ml_training_pipeline,

experiment_name="ml-experiments",

run_name="training-run-001",

arguments={

"dataset_url": "gs://my-bucket/data.csv",

"n_estimators": 200,

"max_depth": 8,

"accuracy_threshold": 0.90

}

)

print(f"Run ID: {run.run_id}")

print(f"Run URL: http://localhost:8080/#/runs/details/{run.run_id}")

定期実行(Recurring Run)

毎日午前2時に実行

client.create_recurring_run(

experiment_id=experiment.experiment_id,

job_name="daily-retraining",

pipeline_func=ml_training_pipeline,

cron_expression="0 2 * * *",

max_concurrency=1,

arguments={

"dataset_url": "gs://my-bucket/latest-data.csv",

"accuracy_threshold": 0.85

}

)

高度なパターン

並列実行(ParallelFor)

@dsl.pipeline(name="Hyperparameter Search")

def hp_search_pipeline():

ハイパーパラメータの組み合わせを定義

hp_configs = [

{"n_estimators": 100, "max_depth": 4, "lr": 0.1},

{"n_estimators": 200, "max_depth": 6, "lr": 0.05},

{"n_estimators": 300, "max_depth": 8, "lr": 0.01},

]

並列学習

with dsl.ParallelFor(hp_configs) as config:

train_task = train_model(

train_dataset=load_task.outputs["output_dataset"],

n_estimators=config.n_estimators,

max_depth=config.max_depth,

learning_rate=config.lr

)

キャッシング

コンポーネントレベルでキャッシュを無効化

load_task = load_data(dataset_url=dataset_url)

load_task.set_caching_options(False) # 常に再実行

パイプラインレベルでキャッシュを設定

run = client.create_run_from_pipeline_func(

ml_training_pipeline,

enable_caching=True # 同一入力の場合はキャッシュを使用

)

ボリュームマウント

@dsl.component(base_image="python:3.11-slim")

def process_large_data(output_data: Output[Dataset]):

"""大容量データの処理"""

pass

PVCマウント

process_task = process_large_data()

process_task.add_pvolumes({

"/mnt/data": dsl.PipelineVolume(pvc="data-pvc")

})

CI/CD統合

GitHub Actions + KFP

.github/workflows/ml-pipeline.yml

name: ML Pipeline CI/CD

on:

push:

branches: [main]

paths:

- 'pipelines/**'

- 'components/**'

jobs:

deploy-pipeline:

runs-on: ubuntu-latest

steps:

- uses: actions/checkout@v4

- name: Set up Python

uses: actions/setup-python@v5

with:

python-version: '3.11'

- name: Install dependencies

run: pip install kfp==2.7.0

- name: Compile pipeline

run: python pipelines/compile.py

- name: Upload and run pipeline

env:

KFP_HOST: ${{ secrets.KFP_HOST }}

run: |

python -c "

from kfp.client import Client

client = Client(host='$KFP_HOST')

client.upload_pipeline(

pipeline_package_path='ml_pipeline.yaml',

pipeline_name='ml-training-v2',

description='Automated ML training pipeline'

)

"

まとめ

Kubeflow Pipelines v2の要点整理:

1. **@dsl.component**: Python関数をコンテナ化されたコンポーネントに変換

2. **@dsl.pipeline**: コンポーネントをDAGとして接続

3. **Artifactシステム**: Dataset、Model、Metricsタイプで入出力を管理

4. **条件/繰り返し**: dsl.If、dsl.ParallelForで動的パイプラインを構築

5. **キャッシング**: 同一入力時の再実行を防止してコストを削減

**Q1. KFP v2でコンポーネントを定義するデコレータは?**

@dsl.component

**Q2. Output[Dataset]とOutput[Model]の違いは?**

型ヒントでアーティファクトの種類を区別します。Datasetはデータ、Modelは学習済みモデルのアーティファクトです。

**Q3. パイプラインで条件付き実行を実装する方法は?**

dsl.Ifコンテキストマネージャを使用します(例:with dsl.If(accuracy \>= threshold))

**Q4. キャッシュが有効な状態で同一入力で実行するとどうなる?**

前回の実行結果を再利用し、コンポーネントをスキップします。

**Q5. ParallelForの用途は?**

同一コンポーネントを異なるパラメータで並列実行します(例:ハイパーパラメータサーチ)

**Q6. KFP v1からv2への移行で最大の変更点は?**

ContainerOpの代わりに@dsl.componentデコレータを使用し、Artifactタイプシステムが導入されました。

クイズ

Q1: 「Kubeflow Pipelines v2 実践ガイド — KFP

SDKでMLパイプラインを構築する」の主なトピックは何ですか?

Kubeflow Pipelines v2のKFP

SDKを使用してMLパイプラインを構築する実践ガイド。コンポーネント定義、パイプライン作成、アーティファクト管理、Kubernetesデプロイまでコード中心で解説します。

軽量Pythonコンポーネント カスタムDockerイメージコンポーネント

基本パイプライン パイプラインのコンパイルと実行 定期実行(Recurring Run)

並列実行(ParallelFor) キャッシング ボリュームマウント

현재 단락 (1/282)

MLモデルを実験からプロダクションに移行する過程で、**再現性、自動化、バージョン管理**は必須です。**Kubeflow Pipelines(KFP)v2**はKubernetes上でMLワークフロ...

작성 글자: 0원문 글자: 8,858작성 단락: 0/282