Skip to content

필사 모드: Kubeflow Pipelines v2 MLワークフロー自動化と運用ガイド

日本語
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

概要

MLモデルを開発することと、本番環境で安定的に運用することはまったく異なる問題である。データ前処理、学習、評価、デプロイまでのパイプラインを手動で管理すると、再現性の欠如、実験追跡の失敗、デプロイの遅延などの問題が繰り返し発生する。**Kubeflow Pipelines(KFP)v2**はKubernetes上でMLワークフローを宣言的に定義し自動化するフレームワークであり、Pythonデコレータだけで複雑なMLパイプラインを構築できる。

KFP v2はv1から大幅に改善された。パイプラインのコンパイル結果がArgo Workflow YAMLではなく**IR(Intermediate Representation)YAML**に抽象化され、さまざまな実行バックエンドをサポートする。アーティファクトシステムが強化され、型安全性も向上した。本記事では、KFP v2のアーキテクチャ、SDK活用法、キャッシング戦略、CI/CD統合、そして本番運用トラブルシューティングまで、実践的な観点で解説する。

KFP v2 アーキテクチャ

コアコンポーネント

KFP v2のアーキテクチャは以下のコアレイヤーで構成される。

| コンポーネント | 役割 | 技術スタック |

| ------------------- | ---------------------------------------------- | ------------------------- |

| **KFP SDK** | パイプライン/コンポーネント定義、コンパイル | Python(`kfp`パッケージ) |

| **IR Compiler** | Python DSLをIR YAMLに変換 | Protocol Buffers ベース |

| **KFP Backend** | パイプライン実行管理、APIサーバー | Go、gRPC/REST |

| **Workflow Engine** | 実際のワークフローオーケストレーション | Argo Workflows / Tekton |

| **Metadata Store** | 実行メタデータ、アーティファクト追跡 | ML Metadata(MLMD) |

| **Artifact Store** | モデル、データセットなどのアーティファクト保存 | MinIO / GCS / S3 |

| **UI Dashboard** | パイプライン可視化、実行モニタリング | Reactベース Web UI |

KFP v2における最大の変化は**IR YAML**の導入である。v1ではパイプラインをArgo Workflow YAMLに直接コンパイルしていたため、Argoと強く結合していた。v2ではIRという中間表現にまずコンパイルし、各バックエンドドライバーがそれを解釈して実行する。これにより、同じパイプライン定義をKubeflowクラスターだけでなく、Google Vertex AI Pipelinesでも実行できるようになった。

インストールとクラスター構成

KFP SDK v2 インストール

pip install kfp==2.7.0

Kubernetes拡張ライブラリ(GPU、Volumeなど K8s特化機能)

pip install kfp-kubernetes==1.2.0

Kubeflow Pipelinesバックエンドデプロイ(Kubernetesクラスターに)

kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=2.2.0"

kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io

kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=2.2.0"

ポートフォワーディングでUIにアクセス

kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80

KFP SDK v2 パイプライン定義

基本コンポーネントとパイプライン

KFP v2では`@dsl.component`と`@dsl.pipeline`デコレータでパイプラインを定義する。コンポーネントはパイプラインの最小実行単位であり、各コンポーネントは独立したコンテナで実行される。

from kfp import dsl

from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(

base_image="python:3.11-slim",

packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"]

)

def preprocess_data(

raw_data_path: str,

test_size: float,

train_dataset: Output[Dataset],

test_dataset: Output[Dataset],

data_stats: Output[Metrics]

):

"""データをロードし、学習/テストに分割する。"""

from sklearn.model_selection import train_test_split

df = pd.read_csv(raw_data_path)

欠損値処理

df = df.dropna(subset=["target"])

df = df.fillna(df.median(numeric_only=True))

train_df, test_df = train_test_split(

df, test_size=test_size, random_state=42, stratify=df["target"]

)

train_df.to_csv(train_dataset.path, index=False)

test_df.to_csv(test_dataset.path, index=False)

メトリクス記録

data_stats.log_metric("total_rows", len(df))

data_stats.log_metric("train_rows", len(train_df))

data_stats.log_metric("test_rows", len(test_df))

data_stats.log_metric("feature_count", len(df.columns) - 1)

@dsl.component(

base_image="python:3.11-slim",

packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "joblib==1.3.0"]

)

def train_model(

train_dataset: Input[Dataset],

n_estimators: int,

max_depth: int,

trained_model: Output[Model],

training_metrics: Output[Metrics]

):

"""Random Forestモデルを学習する。"""

from sklearn.ensemble import RandomForestClassifier

from sklearn.metrics import accuracy_score, f1_score

train_df = pd.read_csv(train_dataset.path)

X_train = train_df.drop(columns=["target"])

y_train = train_df["target"]

model = RandomForestClassifier(

n_estimators=n_estimators,

max_depth=max_depth,

random_state=42,

n_jobs=-1

)

model.fit(X_train, y_train)

学習精度の記録

train_pred = model.predict(X_train)

training_metrics.log_metric("train_accuracy", accuracy_score(y_train, train_pred))

training_metrics.log_metric("train_f1", f1_score(y_train, train_pred, average="weighted"))

モデル保存

joblib.dump(model, trained_model.path)

trained_model.metadata["framework"] = "sklearn"

trained_model.metadata["model_type"] = "RandomForestClassifier"

@dsl.component(

base_image="python:3.11-slim",

packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "joblib==1.3.0"]

)

def evaluate_model(

test_dataset: Input[Dataset],

trained_model: Input[Model],

eval_metrics: Output[Metrics],

accuracy_threshold: float = 0.85

) -> bool:

"""テストデータでモデルを評価し、閾値を超えるか確認する。"""

from sklearn.metrics import accuracy_score, f1_score, classification_report

test_df = pd.read_csv(test_dataset.path)

X_test = test_df.drop(columns=["target"])

y_test = test_df["target"]

model = joblib.load(trained_model.path)

predictions = model.predict(X_test)

accuracy = accuracy_score(y_test, predictions)

f1 = f1_score(y_test, predictions, average="weighted")

eval_metrics.log_metric("test_accuracy", accuracy)

eval_metrics.log_metric("test_f1", f1)

eval_metrics.log_metric("passed_threshold", accuracy >= accuracy_threshold)

return accuracy >= accuracy_threshold

パイプラインの組み合わせと条件付き実行

@dsl.pipeline(

name="ml-training-pipeline",

description="データ前処理 → 学習 → 評価 → 条件付きデプロイパイプライン"

)

def ml_training_pipeline(

raw_data_path: str = "gs://my-bucket/data/raw.csv",

test_size: float = 0.2,

n_estimators: int = 100,

max_depth: int = 10,

accuracy_threshold: float = 0.85

):

ステップ1: データ前処理

preprocess_task = preprocess_data(

raw_data_path=raw_data_path,

test_size=test_size

)

preprocess_task.set_display_name("Data Preprocessing")

ステップ2: モデル学習

train_task = train_model(

train_dataset=preprocess_task.outputs["train_dataset"],

n_estimators=n_estimators,

max_depth=max_depth

)

train_task.set_display_name("Model Training")

train_task.set_cpu_limit("4")

train_task.set_memory_limit("8Gi")

ステップ3: モデル評価

eval_task = evaluate_model(

test_dataset=preprocess_task.outputs["test_dataset"],

trained_model=train_task.outputs["trained_model"],

accuracy_threshold=accuracy_threshold

)

eval_task.set_display_name("Model Evaluation")

ステップ4: 条件付きデプロイ(精度閾値通過時)

with dsl.If(eval_task.output == True):

deploy_task = deploy_model(

model=train_task.outputs["trained_model"],

model_name="fraud-detector",

serving_endpoint="https://serving.example.com"

)

deploy_task.set_display_name("Model Deployment")

パイプラインコンパイル

from kfp import compiler

compiler.Compiler().compile(

pipeline_func=ml_training_pipeline,

package_path="ml_training_pipeline.yaml"

)

コンポーネントタイプ別の活用

KFP v2は3種類のコンポーネントタイプをサポートしており、それぞれ適した使用シナリオがある。

| コンポーネントタイプ | 定義方法 | 適した状況 | メリット | デメリット |

| ---------------------- | -------------------------- | ------------------------------------------ | ------------------------------------------ | -------------------------------------- |

| **Lightweight Python** | `@dsl.component`デコレータ | 純粋なPythonロジック、高速プロトタイピング | コードと定義が一か所、高速な反復 | 外部ファイル参照不可、関数内import必須 |

| **Container** | `@dsl.container_component` | 既存Dockerイメージ活用、非Python作業 | 言語不問、既存イメージの再利用 | アーティファクトタイプが制限的 |

| **Importer** | `dsl.importer()` | 外部アーティファクトをパイプラインに導入 | 既存アーティファクトをパイプライン内で追跡 | データ移動なし、メタデータ登録のみ |

Container Component の例

@dsl.container_component

def run_spark_job(

input_data: Input[Dataset],

output_data: Output[Dataset],

spark_config: str

):

"""Sparkジョブをコンテナとして実行する。"""

return dsl.ContainerSpec(

image="my-registry/spark-processor:3.5",

command=["spark-submit"],

args=[

"--master", "k8s://https://kubernetes.default.svc",

"--conf", spark_config,

"--input", input_data.path,

"--output", output_data.path,

"/app/etl_job.py"

]

)

Importer活用:外部モデルをパイプラインに導入

@dsl.pipeline(name="model-comparison-pipeline")

def comparison_pipeline():

existing_model = dsl.importer(

artifact_uri="gs://models/production/v2.1/model.pkl",

artifact_class=Model,

reimport=False,

metadata={"version": "2.1", "framework": "sklearn"}

)

既存モデルと新モデルを比較

compare_task = compare_models(

baseline_model=existing_model.output,

candidate_model=train_task.outputs["trained_model"]

)

アーティファクト管理とキャッシング戦略

アーティファクトシステム

KFP v2のアーティファクトシステムは、ML Metadata(MLMD)をベースにすべての入出力を追跡する。主要なアーティファクトタイプは以下の通りである。

| アーティファクトタイプ | 用途 | 例 |

| ----------------------- | -------------------- | --------------------------- |

| `Dataset` | データセット | CSV、Parquetファイル |

| `Model` | 学習済みモデル | pickle、ONNX、SavedModel |

| `Metrics` | 数値メトリクス | accuracy、loss、f1-score |

| `ClassificationMetrics` | 分類メトリクス | confusion matrix、ROC curve |

| `HTML` | HTMLレポート | 可視化レポート |

| `Markdown` | Markdownレポート | テキストベースレポート |

| `Artifact` | 汎用アーティファクト | その他ファイル |

キャッシング戦略

KFP v2はコンポーネントレベルでの自動キャッシングをサポートする。同一の入力とコードで実行されたコンポーネントの結果を再利用し、実行時間を大幅に短縮できる。

@dsl.pipeline(name="caching-example")

def caching_pipeline(data_path: str, retrain: bool = False):

データ前処理はキャッシング有効化(同一データなら再利用)

preprocess_task = preprocess_data(raw_data_path=data_path)

preprocess_task.set_caching_options(True)

学習はretrainフラグに応じてキャッシング制御

train_task = train_model(

train_dataset=preprocess_task.outputs["train_dataset"],

n_estimators=200,

max_depth=15

)

if retrain:

train_task.set_caching_options(False) # 強制再学習

外部API呼び出しコンポーネントはキャッシング無効化

deploy_task = deploy_model(model=train_task.outputs["trained_model"])

deploy_task.set_caching_options(False) # 常に新規実行

キャッシング運用時の注意点がある。第一に、コンポーネントコードが純粋関数(同じ入力なら同じ出力)であることがキャッシングの意味を持つ前提である。第二に、KFP v2 SDKではキャッシュ有効期限の設定がサポートされていないため、時間に敏感なデータを扱うコンポーネントはキャッシングを無効化すべきである。第三に、環境変数`KFP_DISABLE_EXECUTION_CACHING_BY_DEFAULT`を`true`に設定すると、デフォルトですべてのパイプラインでキャッシングが無効化される。

CI/CD統合

GitHub Actionsによるパイプライン自動デプロイ

.github/workflows/kfp-deploy.yaml

name: KFP Pipeline CI/CD

on:

push:

branches: [main]

paths:

- 'pipelines/**'

- 'components/**'

env:

KFP_HOST: ${{ secrets.KFP_HOST }}

KFP_NAMESPACE: kubeflow

jobs:

validate-and-compile:

runs-on: ubuntu-latest

steps:

- uses: actions/checkout@v4

- name: Set up Python

uses: actions/setup-python@v5

with:

python-version: '3.11'

cache: 'pip'

- name: Install dependencies

run: |

pip install kfp==2.7.0 kfp-kubernetes==1.2.0

pip install pytest

- name: Lint pipeline code

run: |

pip install ruff

ruff check pipelines/ components/

- name: Run unit tests

run: pytest tests/unit/ -v

- name: Compile pipeline

run: |

python -c "

from pipelines.training_pipeline import ml_training_pipeline

from kfp import compiler

compiler.Compiler().compile(

pipeline_func=ml_training_pipeline,

package_path='compiled_pipeline.yaml'

)

print('Pipeline compiled successfully')

"

- name: Upload compiled pipeline

uses: actions/upload-artifact@v4

with:

name: compiled-pipeline

path: compiled_pipeline.yaml

deploy-pipeline:

needs: validate-and-compile

runs-on: ubuntu-latest

if: github.ref == 'refs/heads/main'

steps:

- uses: actions/checkout@v4

- name: Download compiled pipeline

uses: actions/download-artifact@v4

with:

name: compiled-pipeline

- name: Deploy to KFP

run: |

pip install kfp==2.7.0

python -c "

from kfp.client import Client

client = Client(host='${KFP_HOST}')

パイプラインのアップロードまたは更新

try:

pipeline = client.upload_pipeline(

pipeline_package_path='compiled_pipeline.yaml',

pipeline_name='ml-training-pipeline',

description='Automated ML training pipeline'

)

print(f'Pipeline uploaded: {pipeline.pipeline_id}')

except Exception:

既に存在する場合は新バージョンとしてアップロード

pipeline = client.upload_pipeline_version(

pipeline_package_path='compiled_pipeline.yaml',

pipeline_name='ml-training-pipeline',

pipeline_version_name='v${GITHUB_SHA::7}'

)

print(f'Pipeline version uploaded: {pipeline.pipeline_version_id}')

"

プログラムによるパイプライン実行

from kfp.client import Client

def trigger_pipeline_run(

host: str,

pipeline_name: str,

experiment_name: str,

params: dict

) -> str:

"""プログラムからパイプライン実行をトリガーする。"""

client = Client(host=host)

実験の作成または取得

experiment = client.create_experiment(

name=experiment_name,

namespace="kubeflow"

)

パイプラインの取得

pipelines = client.list_pipelines(filter=f'name="{pipeline_name}"')

if not pipelines.pipelines:

raise ValueError(f"Pipeline '{pipeline_name}' not found")

pipeline_id = pipelines.pipelines[0].pipeline_id

実行の作成

run = client.run_pipeline(

experiment_id=experiment.experiment_id,

job_name=f"{pipeline_name}-{params.get('run_tag', 'manual')}",

pipeline_id=pipeline_id,

params=params

)

print(f"Run created: {run.run_id}")

print(f"Monitor at: {host}/#/runs/details/{run.run_id}")

return run.run_id

定期スケジュール実行の設定

def create_recurring_run(client: Client, pipeline_id: str, experiment_id: str):

"""毎日深夜0時にパイプラインを実行するスケジュールを作成する。"""

recurring_run = client.create_recurring_run(

experiment_id=experiment_id,

job_name="daily-training",

pipeline_id=pipeline_id,

params={"raw_data_path": "gs://data/daily/latest.csv"},

cron_expression="0 0 * * *",

max_concurrency=1,

enabled=True

)

return recurring_run

MLワークフローオーケストレーションツール比較

| 項目 | KFP v2 | Apache Airflow | Prefect | Vertex AI Pipelines |

| ------------------------ | ------------------------ | ---------------------- | ------------------------- | ------------------------ |

| **主な用途** | MLパイプライン専用 | 汎用データワークフロー | 汎用ワークフロー | マネージドMLパイプライン |

| **インフラ** | Kubernetes必須 | スタンドアロン実行可能 | スタンドアロン / クラウド | Google Cloudマネージド |

| **パイプライン定義** | Pythonデコレータ | Python(DAGクラス) | Pythonデコレータ | KFP SDK(同一) |

| **アーティファクト追跡** | ML Metadata(内蔵) | XCom(限定的) | 外部連携が必要 | Vertex ML Metadata |

| **実験管理** | 内蔵 | 非対応(MLflow連携) | 非対応 | 内蔵 |

| **キャッシング** | コンポーネントレベル自動 | タスクレベル手動 | タスクレベル内蔵 | コンポーネントレベル自動 |

| **GPUサポート** | Kubernetesネイティブ | K8s Executorが必要 | Kubernetes連携 | 自動 |

| **UI** | パイプライン可視化内蔵 | Web UI内蔵 | Prefect Cloud UI | Google Cloud Console |

| **スケーラビリティ** | Kubernetesスケーリング | Celery/K8s Executor | Dask/Ray連携 | オートスケーリング |

| **学習コスト** | 高い(K8s知識が必要) | 中程度 | 低い | 中程度(GCP依存) |

| **コスト** | インフラ自己運用 | インフラ自己運用 | Prefect Cloud有料 | 従量課金 |

**選択基準まとめ**: KubernetesベースのML専用パイプラインが必要なら**KFP v2**、データエンジニアリングとMLを統合するなら**Airflow**、素早く始めたいなら**Prefect**、Google Cloudに全面的にコミットしているなら**Vertex AI Pipelines**が適している。

Kubernetes拡張機能

KFP v2は`kfp-kubernetes`拡張ライブラリを通じてKubernetes特化機能をサポートする。

from kfp import dsl

from kfp import kubernetes

@dsl.pipeline(name="gpu-training-pipeline")

def gpu_training_pipeline():

train_task = train_deep_learning_model(

dataset_path="gs://data/training",

epochs=50,

batch_size=64

)

GPUリソース割り当て

kubernetes.add_node_selector(

train_task,

label_key="accelerator",

label_value="nvidia-a100"

)

kubernetes.add_toleration(

train_task,

key="nvidia.com/gpu",

operator="Exists",

effect="NoSchedule"

)

train_task.set_accelerator_type("nvidia.com/gpu")

train_task.set_accelerator_limit(2)

train_task.set_cpu_limit("16")

train_task.set_memory_limit("64Gi")

Secretマウント(モデルレジストリ認証情報)

kubernetes.use_secret_as_env(

train_task,

secret_name="model-registry-credentials",

secret_key_to_env={

"username": "REGISTRY_USERNAME",

"password": "REGISTRY_PASSWORD"

}

)

PVCマウント(共有データボリューム)

kubernetes.mount_pvc(

train_task,

pvc_name="shared-data-pvc",

mount_path="/mnt/shared-data"

)

モニタリングとトラブルシューティング

よくある問題と解決方法

**問題1: パイプラインのコンパイルは成功するが実行が失敗する場合**

最も一般的な原因は、コンポーネント内部で使用しているパッケージが`packages_to_install`に記載されていないことである。`@dsl.component`デコレータのLightweight Pythonコンポーネントは隔離された環境で実行されるため、関数内部で使用するすべてのimportを関数本体内に記述し、必要なパッケージを明示する必要がある。

**問題2: キャッシングが期待通りに動作しない場合**

キャッシングはコンポーネントの入力パラメータ、コンポーネントコード、ベースイメージに基づいてキャッシュキーを生成する。コードを変更したにもかかわらずキャッシュがヒットする場合、コンポーネントコードが実際には変更されていないか、キャッシュキーに含まれない部分のみが変更されている。パイプラインを再コンパイルしてアップロードし直したか確認すること。

**問題3: OOM(Out of Memory)でPodが終了する場合**

大規模データを処理するコンポーネントで頻繁に発生する。`set_memory_limit()`と`set_memory_request()`を設定し、requestはlimitの約80%程度に設定するのが望ましい。データをチャンク単位で処理したり、不要な変数を明示的に`del`することも有効である。

**問題4: パイプライン実行がPending状態で停止する場合**

Kubernetesノードのリソース不足が原因であることが多い。`kubectl describe pod`コマンドでイベントを確認し、ノードオートスケーラーの設定を点検すること。GPUノードの場合はノードプールの最大サイズを確認する必要がある。

**問題5: ParallelFor内部でアーティファクトが上書きされる場合**

KFP v2の既知の問題(GitHub Issue #10186)で、ParallelForやSub-DAG内部で同時に実行されるコンポーネントのアーティファクトが競合する可能性がある。アーティファクトパスに一意の識別子を含めるか、コンポーネント内部で一意のファイル名を生成して回避する。

ログ確認とデバッグ

パイプライン実行の全ログ確認

kubectl logs -n kubeflow -l pipeline/runid=<run-id> --all-containers

特定コンポーネントのログ確認

kubectl logs -n kubeflow <pod-name> -c main

Argo Workflowステータス確認

kubectl get workflows -n kubeflow

kubectl describe workflow <workflow-name> -n kubeflow

ML Metadataの直接クエリ(デバッグ用)

kubectl port-forward -n kubeflow svc/metadata-grpc-service 8080:8080

本番運用チェックリスト

本番環境でKFP v2を運用する際、以下の事項を点検する必要がある。

- **リソース管理**: すべてのコンポーネントにCPU/メモリのrequestとlimitを設定する。GPUコンポーネントには`tolerations`と`nodeSelector`を明示する。

- **リトライポリシー**: 一時的なエラーに備えて`task.set_retry(num_retries=3, backoff_duration="60s")`を設定する。

- **タイムアウト**: 長時間実行されるパイプラインに`timeout`設定でリソースの浪費を防止する。

- **ネームスペース分離**: 開発/ステージング/本番パイプラインを別々のネームスペースに分離する。

- **アーティファクトクリーンアップ**: 古い実行のアーティファクトを定期的にクリーンアップするCronJobを設定する。MinIO/S3のLifecycle Policyを活用する。

- **モニタリング連携**: Prometheus + Grafanaでパイプラインの実行時間、成功率、リソース使用量をモニタリングする。

- **アラート設定**: パイプライン失敗時にSlack/PagerDutyに通知を送るWebhookを連携する。

参考資料

1. [Kubeflow Pipelines 公式ドキュメント](https://www.kubeflow.org/docs/components/pipelines/)

2. [KFP SDK v2 API Reference](https://kubeflow-pipelines.readthedocs.io/)

3. [Kubeflow Pipelines GitHub リポジトリ](https://github.com/kubeflow/pipelines)

4. [KFP v2 キャッシングガイド](https://www.kubeflow.org/docs/components/pipelines/caching-v2/)

5. [KFP v2 アーティファクト管理](https://www.kubeflow.org/docs/components/pipelines/user-guides/data-handling/artifacts/)

6. [KFP v1からv2へのマイグレーションガイド](https://www.kubeflow.org/docs/components/pipelines/user-guides/migration/)

7. [KFP Python SDK DeepWiki](https://deepwiki.com/kubeflow/pipelines/2-kfp-python-sdk)

8. [Vertex AI Pipelinesへのマイグレーション](https://cloud.google.com/vertex-ai/docs/pipelines/migrate-kfp)

クイズ

Q1: 「Kubeflow Pipelines v2 MLワークフロー自動化と運用ガイド」の主なトピックは何ですか?

KFP v2のアーキテクチャからKFP

SDKによるMLパイプライン構築、キャッシング、アーティファクト管理、CI/CD統合、本番運用トラブルシューティングまで。

コアコンポーネント KFP v2のアーキテクチャは以下のコアレイヤーで構成される。 KFP

v2における最大の変化はIR YAMLの導入である。v1ではパイプラインをArgo Workflow

YAMLに直接コンパイルしていたため、Argoと強く結合していた。v2ではIRという中間表現にまずコンパイルし、各バックエンドドライバーがそれを解釈して実行する。これにより、同じパイプライン定義をKubeflowクラスターだけでなく、Google

Vertex AI Pipelinesでも実行できるようになった。 インストールとクラスター構成

基本コンポーネントとパイプライン KFP

v2では@dsl.componentと@dsl.pipelineデコレータでパイプラインを定義する。コンポーネントはパイプラインの最小実行単位であり、各コンポーネントは独立したコンテナで実行される。

パイプラインの組み合わせと条件付き実行

KFP v2は3種類のコンポーネントタイプをサポートしており、それぞれ適した使用シナリオがある。

Container Component の例

アーティファクトシステム KFP v2のアーティファクトシステムは、ML

Metadata(MLMD)をベースにすべての入出力を追跡する。主要なアーティファクトタイプは以下の通りである。

キャッシング戦略 KFP

v2はコンポーネントレベルでの自動キャッシングをサポートする。同一の入力とコードで実行されたコンポーネントの結果を再利用し、実行時間を大幅に短縮できる。

キャッシング運用時の注意点がある。第一に、コンポーネントコードが純粋関数(同じ入力なら同じ出力)であることがキャッシングの意味を持つ前提である。

현재 단락 (1/418)

MLモデルを開発することと、本番環境で安定的に運用することはまったく異なる問題である。データ前処理、学習、評価、デプロイまでのパイプラインを手動で管理すると、再現性の欠如、実験追跡の失敗、デプロイの遅...

작성 글자: 0원문 글자: 16,795작성 단락: 0/418