- Published on
Feature Store & MLOps パイプライン完全ガイド 2025:Feast、Feature Engineering、モデルサービング
- Authors

- Name
- Youngju Kim
- @fjvbn20031
目次
1. なぜFeature Storeなのか?
1.1 Training-Serving Skew問題
MLモデル開発で最も頻繁(ひんぱん)な障害原因(しょうがいげんいん)は、学習時点とサービング時点のフィーチャー不整合です。
データサイエンティストがJupyter NotebookでPandasを使ってフィーチャーを生成し、エンジニアがJava/Goで同じロジックを再実装する際、微妙(びみょう)な差異(さい)が発生します。
# 学習時:Pandasでフィーチャー生成
df['avg_purchase_7d'] = df.groupby('user_id')['amount'].transform(
lambda x: x.rolling('7D').mean()
)
# サービング時:SQLまたは別言語で再実装 → 微妙な差異が発生
# - 境界条件(inclusive/exclusive)の違い
# - タイムゾーン処理の違い
# - NULL処理方式の違い
Feature Storeは単一のフィーチャー定義を学習とサービングの両方で共有し、この問題を根本的(こんぽんてき)に解決します。
1.2 フィーチャーの再利用とチーム協業
大規模ML組織では、数十チームが類似(るいじ)のフィーチャーを独立して生成しています。
| 問題 | Feature Store導入前 | Feature Store導入後 |
|---|---|---|
| フィーチャー重複 | チームごとに類似フィーチャーを再生成 | 中央レジストリから検索・再利用 |
| 一貫性 | チームごとに異なる計算ロジック | 単一定義、バージョン管理 |
| 鮮度 | 手動更新 | 自動マテリアライゼーション |
| 発見可能性 | Slack/Wiki依存 | フィーチャーカタログ + メタデータ |
| ガバナンス | なし | オーナー、リネージ、アクセス制御 |
1.3 データパイプラインの複雑性削減
Feature Storeなしでは、各モデルが独自のデータパイプラインを維持します。
モデルA: raw data → ETL A → features A → training A
モデルB: raw data → ETL B → features B → training B
モデルC: raw data → ETL C → features C → training C
Feature Store導入後:
raw data → Feature Store(一元管理) → モデルA, B, C共有
2. Feature Storeアーキテクチャ
2.1 コアコンポーネント
Feature Storeは4つのコアコンポーネントで構成されます。
オフラインストア(Offline Store)
- 大量の履歴フィーチャーデータを保存
- 学習データ生成に使用
- BigQuery、Snowflake、Redshift、Parquetファイル
- Point-in-Time Joinをサポート
オンラインストア(Online Store)
- 最新フィーチャー値の低遅延(ていちえん)参照
- リアルタイム推論に使用
- Redis、DynamoDB、Bigtable
- P99レイテンシ10ms以下を目標
フィーチャーレジストリ(Feature Registry)
- すべてのフィーチャーのメタデータ管理
- 名前、型、オーナー、説明、タグ
- データリネージ追跡
変換エンジン(Transformation Engine)
- 生データからフィーチャーを生成
- バッチ/ストリーミング変換をサポート
- Spark、Flink、dbt統合
2.2 データフロー
[データソース] → [変換エンジン] → [オフラインストア] ←→ [オンラインストア]
↑ ↓ ↓
生データ 学習パイプライン 推論サービス
↓
[フィーチャーレジストリ]
(メタデータ管理)
2.3 主要Feature Storeソリューション比較
| ソリューション | 種類 | オフライン | オンライン | ストリーミング | コスト |
|---|---|---|---|---|---|
| Feast | OSS | Redshift/BQ/File | Redis/DynamoDB | Push型 | 無料(インフラのみ) |
| Tecton | マネージド | Spark + Delta | DynamoDB | Spark Streaming | サブスクリプション |
| Hopsworks | OSS/マネージド | Hudi | RonDB | Flink | コミュニティ無料 |
| Vertex AI FS | GCPマネージド | BigQuery | Bigtable | Dataflow | 従量課金 |
| SageMaker FS | AWSマネージド | S3 + Glue | 内蔵オンライン | Kinesis | 従量課金 |
3. Feast ディープダイブ
3.1 インストールと初期設定
# Feastインストール
pip install feast
# プロジェクト初期化
feast init my_feature_store
cd my_feature_store
# プロジェクト構成
# my_feature_store/
# feature_store.yaml # プロジェクト設定
# features.py # フィーチャー定義
# data/ # サンプルデータ
feature_store.yaml 設定:
project: my_ml_project
registry: data/registry.db
provider: local # local, gcp, aws
online_store:
type: redis
connection_string: "localhost:6379"
offline_store:
type: file # file, bigquery, redshift, snowflake
entity_key_serialization_version: 2
3.2 フィーチャー定義
# features.py
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String
# エンティティ定義
user = Entity(
name="user_id",
join_keys=["user_id"],
description="ユーザー固有ID",
)
# データソース定義
user_stats_source = FileSource(
path="data/user_stats.parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created_timestamp",
)
# フィーチャービュー定義
user_stats_fv = FeatureView(
name="user_stats",
entities=[user],
ttl=timedelta(days=1),
schema=[
Field(name="total_purchases", dtype=Int64),
Field(name="avg_purchase_amount", dtype=Float32),
Field(name="days_since_last_login", dtype=Int64),
Field(name="preferred_category", dtype=String),
],
online=True,
source=user_stats_source,
tags={"team": "recommendation", "version": "v2"},
)
3.3 マテリアライゼーション(オフライン → オンライン)
# フィーチャー定義の適用
feast apply
# オフライン → オンラインストアへのフィーチャー値マテリアライゼーション
feast materialize 2024-01-01T00:00:00 2024-12-31T23:59:59
# インクリメンタルマテリアライゼーション(前回以降のみ)
feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")
3.4 学習データ生成(Point-in-Time Join)
Point-in-Time JoinはFeature Storeの核心(かくしん)機能です。データリーケージ(Data Leakage)を防止しながら、過去の特定時点のフィーチャー値を正確に取得します。
from feast import FeatureStore
import pandas as pd
store = FeatureStore(repo_path=".")
# 学習データのエンティティ + タイムスタンプ
entity_df = pd.DataFrame({
"user_id": [1001, 1002, 1003, 1001],
"event_timestamp": pd.to_datetime([
"2024-09-01", "2024-09-02", "2024-09-03", "2024-10-01"
]),
})
# Point-in-Time Joinでフィーチャーを取得
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"user_stats:total_purchases",
"user_stats:avg_purchase_amount",
"user_stats:days_since_last_login",
],
).to_df()
print(training_df)
# 各行はその時点基準のフィーチャー値を持つ
# → 2024-09-01時点のuser 1001のフィーチャー != 2024-10-01時点のuser 1001のフィーチャー
3.5 オンラインサービング
# リアルタイム推論のためのオンラインフィーチャー取得
feature_vector = store.get_online_features(
features=[
"user_stats:total_purchases",
"user_stats:avg_purchase_amount",
"user_stats:days_since_last_login",
],
entity_rows=[
{"user_id": 1001},
{"user_id": 1002},
],
).to_dict()
# 結果:最新のフィーチャー値を返却(P99レイテンシ約5-10ms)
3.6 GCP/AWS本番設定
# GCP本番設定
project: production_ml
registry: gs://my-bucket/feast-registry/registry.db
provider: gcp
online_store:
type: datastore # または bigtable
project_id: my-gcp-project
offline_store:
type: bigquery
project_id: my-gcp-project
dataset: feast_features
# AWS本番設定
project: production_ml
registry: s3://my-bucket/feast-registry/registry.db
provider: aws
online_store:
type: dynamodb
region: ap-northeast-1
offline_store:
type: redshift
cluster_id: my-redshift-cluster
region: ap-northeast-1
database: ml_features
user: feast_user
s3_staging_location: s3://my-bucket/feast-staging/
4. Feature Engineeringパターン
4.1 時間ベースフィーチャー(Temporal Features)
import pandas as pd
def create_temporal_features(df, timestamp_col, entity_col, value_col):
"""タイムウィンドウベースの集約フィーチャー生成"""
df = df.sort_values(timestamp_col)
features = pd.DataFrame()
features[entity_col] = df[entity_col]
features[timestamp_col] = df[timestamp_col]
# 各ウィンドウの移動平均
for window in ['1D', '7D', '30D']:
features[f'{value_col}_mean_{window}'] = (
df.groupby(entity_col)[value_col]
.transform(lambda x: x.rolling(window, on=df[timestamp_col]).mean())
)
# トレンドフィーチャー:7日平均 / 30日平均
features[f'{value_col}_trend_7d_30d'] = (
features[f'{value_col}_mean_7D'] /
features[f'{value_col}_mean_30D'].replace(0, float('nan'))
)
# 曜日/時間フィーチャー
features['day_of_week'] = df[timestamp_col].dt.dayofweek
features['hour_of_day'] = df[timestamp_col].dt.hour
features['is_weekend'] = features['day_of_week'].isin([5, 6]).astype(int)
return features
4.2 集約フィーチャー(Aggregation Features)
def create_aggregation_features(events_df, entity_col, group_col):
"""エンティティごとの集約フィーチャー"""
agg_features = events_df.groupby(entity_col).agg(
event_count=pd.NamedAgg(column=group_col, aggfunc='count'),
unique_categories=pd.NamedAgg(column='category', aggfunc='nunique'),
total_amount=pd.NamedAgg(column='amount', aggfunc='sum'),
avg_amount=pd.NamedAgg(column='amount', aggfunc='mean'),
max_amount=pd.NamedAgg(column='amount', aggfunc='max'),
std_amount=pd.NamedAgg(column='amount', aggfunc='std'),
).reset_index()
# 比率フィーチャー
agg_features['high_value_ratio'] = (
events_df[events_df['amount'] > 100]
.groupby(entity_col)
.size()
.reindex(agg_features[entity_col], fill_value=0)
.values / agg_features['event_count']
)
return agg_features
4.3 エンベディングフィーチャー
from sentence_transformers import SentenceTransformer
import numpy as np
model = SentenceTransformer('all-MiniLM-L6-v2')
def create_text_embedding_features(df, text_col, prefix='emb'):
"""テキストエンベディングフィーチャー生成"""
embeddings = model.encode(df[text_col].fillna('').tolist())
emb_df = pd.DataFrame(
embeddings,
columns=[f'{prefix}_{i}' for i in range(embeddings.shape[1])],
index=df.index,
)
return pd.concat([df, emb_df], axis=1)
4.4 クロスフィーチャー(Cross Features)
def create_cross_features(df):
"""フィーチャー間のクロス/相互作用フィーチャー"""
# 比率フィーチャー
df['purchase_to_visit_ratio'] = (
df['purchase_count'] / df['visit_count'].replace(0, 1)
)
# バケット化 + クロス
df['age_bucket'] = pd.cut(
df['age'], bins=[0, 25, 35, 50, 100],
labels=['young', 'middle', 'senior', 'elder']
)
df['age_x_gender'] = df['age_bucket'].astype(str) + '_' + df['gender']
# 数値相互作用
df['income_x_age'] = df['income'] * df['age']
return df
5. MLOps成熟度レベル
Googleが定義したMLOps成熟度(せいじゅくど)モデルは4段階に分かれます。
Level 0:手動プロセス
データサイエンティストが手動で:
1. データ収集と前処理
2. Jupyterでモデル学習
3. モデルファイルをエンジニアに引き渡し
4. エンジニアがサービングコードを作成
5. 手動デプロイ
問題点:
- デプロイサイクル:数ヶ月
- 再現性なし
- モニタリングなし
Level 1:MLパイプライン自動化
自動化されたMLパイプライン:
1. データ検証 → フィーチャーエンジニアリング → 学習 → 評価 → デプロイ
2. パイプラインオーケストレーター使用(Kubeflow、Airflow)
3. CT(Continuous Training):トリガーベースの自動再学習
改善点:
- デプロイサイクル:週単位
- パイプライン再現可能
- 基本的なモニタリング
Level 2:CI/CD for ML
CI/CDパイプライン統合:
1. コード変更 → 自動テスト → パイプラインビルド → モデル学習
2. モデル検証ゲート(性能閾値)
3. Shadow Deployment → Canary → Full Rollout
4. A/Bテスト自動化
改善点:
- デプロイサイクル:日単位
- 自動ロールバック
- 体系的な実験管理
Level 3:自動再学習 + 完全自動化
完全自動化:
1. ドリフト検知 → 自動再学習トリガー
2. 自動フィーチャー選択/ハイパーパラメータ最適化
3. モデル性能の自動比較 + チャンピオン/チャレンジャー
4. 自動スケーリング + コスト最適化
改善点:
- デプロイサイクル:時間単位
- 無人運用
- プロアクティブモニタリング
6. MLパイプラインオーケストレーション
6.1 Kubeflow Pipelines
# Kubeflow Pipelines DSLでパイプライン定義
from kfp import dsl, compiler
from kfp.dsl import Input, Output, Dataset, Model, Metrics
@dsl.component(
base_image="python:3.11",
packages_to_install=["pandas", "scikit-learn"],
)
def preprocess_data(
raw_data: Input[Dataset],
processed_data: Output[Dataset],
):
import pandas as pd
from sklearn.preprocessing import StandardScaler
df = pd.read_parquet(raw_data.path)
scaler = StandardScaler()
df_scaled = pd.DataFrame(
scaler.fit_transform(df.select_dtypes(include='number')),
columns=df.select_dtypes(include='number').columns,
)
df_scaled.to_parquet(processed_data.path)
@dsl.component(
base_image="python:3.11",
packages_to_install=["scikit-learn", "pandas", "joblib"],
)
def train_model(
training_data: Input[Dataset],
model_artifact: Output[Model],
metrics: Output[Metrics],
n_estimators: int = 100,
max_depth: int = 10,
):
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
import joblib
df = pd.read_parquet(training_data.path)
X = df.drop('target', axis=1)
y = df['target']
clf = RandomForestClassifier(
n_estimators=n_estimators,
max_depth=max_depth,
random_state=42,
)
scores = cross_val_score(clf, X, y, cv=5, scoring='f1_macro')
clf.fit(X, y)
joblib.dump(clf, model_artifact.path)
metrics.log_metric("f1_mean", float(scores.mean()))
metrics.log_metric("f1_std", float(scores.std()))
@dsl.pipeline(name="ML Training Pipeline")
def ml_pipeline(n_estimators: int = 100, max_depth: int = 10):
preprocess_task = preprocess_data(
raw_data=dsl.importer(
artifact_uri="gs://my-bucket/raw-data/",
artifact_class=Dataset,
).output,
)
train_task = train_model(
training_data=preprocess_task.outputs["processed_data"],
n_estimators=n_estimators,
max_depth=max_depth,
)
# コンパイルと実行
compiler.Compiler().compile(ml_pipeline, "pipeline.yaml")
6.2 Airflow MLパイプライン
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'ml-team',
'depends_on_past': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'ml_training_pipeline',
default_args=default_args,
schedule_interval='@weekly',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['ml', 'training'],
) as dag:
validate_data = KubernetesPodOperator(
task_id='validate_data',
name='data-validation',
image='ml-pipeline:latest',
cmds=['python', 'validate.py'],
namespace='ml-pipelines',
)
extract_features = KubernetesPodOperator(
task_id='extract_features',
name='feature-extraction',
image='ml-pipeline:latest',
cmds=['python', 'extract_features.py'],
namespace='ml-pipelines',
)
train_model_task = KubernetesPodOperator(
task_id='train_model',
name='model-training',
image='ml-pipeline:latest',
cmds=['python', 'train.py'],
namespace='ml-pipelines',
)
evaluate_model = PythonOperator(
task_id='evaluate_model',
python_callable=lambda: print("Evaluating model..."),
)
validate_data >> extract_features >> train_model_task >> evaluate_model
7. 実験追跡(Experiment Tracking)
7.1 MLflow実験追跡
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import f1_score, precision_score, recall_score
# MLflowサーバー設定
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("recommendation-model-v2")
# 実験実行
with mlflow.start_run(run_name="gbm-experiment-42") as run:
# ハイパーパラメータのロギング
params = {
"n_estimators": 200,
"max_depth": 8,
"learning_rate": 0.1,
"subsample": 0.8,
}
mlflow.log_params(params)
# モデル学習
model = GradientBoostingClassifier(**params)
model.fit(X_train, y_train)
# メトリクスのロギング
y_pred = model.predict(X_test)
metrics = {
"f1": f1_score(y_test, y_pred, average='macro'),
"precision": precision_score(y_test, y_pred, average='macro'),
"recall": recall_score(y_test, y_pred, average='macro'),
}
mlflow.log_metrics(metrics)
# モデルアーティファクトのロギング
mlflow.sklearn.log_model(
model,
artifact_path="model",
registered_model_name="recommendation-gbm",
)
mlflow.log_artifact("feature_importance.png")
print(f"Run ID: {run.info.run_id}")
print(f"Metrics: {metrics}")
8. モデルレジストリ(Model Registry)
8.1 MLflow Model Registry
from mlflow.tracking import MlflowClient
client = MlflowClient()
# モデルバージョン作成
model_version = client.create_model_version(
name="recommendation-gbm",
source=f"runs:/{run_id}/model",
run_id=run_id,
description="GBMモデルv3:新規フィーチャー追加",
)
# ステージ遷移:None → Staging
client.transition_model_version_stage(
name="recommendation-gbm",
version=model_version.version,
stage="Staging",
archive_existing_versions=False,
)
# ステージング検証後、本番昇格
client.transition_model_version_stage(
name="recommendation-gbm",
version=model_version.version,
stage="Production",
archive_existing_versions=True, # 既存の本番バージョンをアーカイブ
)
8.2 モデルバージョン管理戦略
モデルライフサイクル:
None → Staging → Production → Archived
バージョン管理ポリシー:
- Staging:自動性能テスト合格時
- Production:手動承認またはA/Bテスト合格時
- Archived:新バージョンの本番昇格時に自動
- 直近3バージョンを保持、それ以前は削除
# 本番モデルのロード
import mlflow.pyfunc
model = mlflow.pyfunc.load_model(
model_uri="models:/recommendation-gbm/Production"
)
predictions = model.predict(input_df)
9. モデルサービング
9.1 BentoML
# service.py
import bentoml
import numpy as np
from bentoml.io import NumpyNdarray, JSON
# 保存済みモデルのロード
runner = bentoml.sklearn.get("recommendation_model:latest").to_runner()
svc = bentoml.Service("recommendation_service", runners=[runner])
@svc.api(input=JSON(), output=JSON())
async def predict(input_data: dict) -> dict:
features = np.array(input_data["features"]).reshape(1, -1)
prediction = await runner.predict.async_run(features)
return {
"prediction": int(prediction[0]),
"model_version": "v3.2.1",
}
# ビルドとデプロイ
bentoml build
bentoml containerize recommendation_service:latest
docker run -p 3000:3000 recommendation_service:latest
9.2 Seldon Core(Kubernetes)
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
name: recommendation-model
namespace: ml-serving
spec:
predictors:
- name: default
replicas: 3
graph:
name: classifier
implementation: SKLEARN_SERVER
modelUri: s3://models/recommendation/v3
envSecretRefName: s3-credentials
componentSpecs:
- spec:
containers:
- name: classifier
resources:
requests:
cpu: "1"
memory: "2Gi"
limits:
cpu: "2"
memory: "4Gi"
traffic: 100
labels:
version: v3
9.3 vLLM(LLMサービング)
# vLLMサーバー起動
# python -m vllm.entrypoints.openai.api_server \
# --model meta-llama/Llama-3-8B-Instruct \
# --tensor-parallel-size 2 \
# --max-model-len 8192
# クライアント呼び出し
from openai import OpenAI
client = OpenAI(
base_url="http://localhost:8000/v1",
api_key="not-needed",
)
response = client.chat.completions.create(
model="meta-llama/Llama-3-8B-Instruct",
messages=[
{"role": "user", "content": "推薦システムの利点は?"},
],
temperature=0.7,
max_tokens=512,
)
9.4 サービングソリューション比較
| ソリューション | モデル種類 | デプロイ環境 | バッチ推論 | GPU対応 | 自動スケーリング |
|---|---|---|---|---|---|
| BentoML | 汎用 | Docker/K8s | 対応 | 対応 | 対応 |
| Seldon Core | 汎用 | K8s専用 | 対応 | 対応 | HPA |
| TFServing | TFモデル | Docker/K8s | 対応 | 対応 | 手動 |
| Triton | マルチFW | Docker/K8s | 対応 | 最適化 | 対応 |
| vLLM | LLM | Docker/K8s | 非対応 | 必須 | 対応 |
10. モデルモニタリング
10.1 データドリフト検知
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
# 基準データ(学習時点)
reference_data = pd.read_parquet("training_data.parquet")
# 現在データ(本番)
current_data = pd.read_parquet("production_data_latest.parquet")
column_mapping = ColumnMapping(
target='label',
numerical_features=['age', 'income', 'purchase_count'],
categorical_features=['category', 'region'],
)
# ドリフトレポート生成
report = Report(metrics=[
DataDriftPreset(),
TargetDriftPreset(),
])
report.run(
reference_data=reference_data,
current_data=current_data,
column_mapping=column_mapping,
)
report.save_html("drift_report.html")
# プログラマティックアクセス
result = report.as_dict()
dataset_drift = result['metrics'][0]['result']['dataset_drift']
drift_share = result['metrics'][0]['result']['share_of_drifted_columns']
if dataset_drift:
print(f"ドリフト検知!ドリフトフィーチャー比率:{drift_share:.2%}")
trigger_retraining_pipeline()
10.2 予測ドリフト検知
from evidently.test_suite import TestSuite
from evidently.tests import (
TestColumnDrift,
TestShareOfDriftedColumns,
TestMeanInNSigmas,
)
test_suite = TestSuite(tests=[
TestShareOfDriftedColumns(lt=0.3), # ドリフトフィーチャー30%未満
TestColumnDrift(column_name="prediction_score"),
TestMeanInNSigmas(column_name="prediction_score", n=2),
])
test_suite.run(
reference_data=reference_data,
current_data=current_data,
column_mapping=column_mapping,
)
test_results = test_suite.as_dict()
all_passed = all(
t['status'] == 'SUCCESS' for t in test_results['tests']
)
if not all_passed:
alert_team("モデルドリフト検知 - 再学習レビューが必要です")
11. A/Bテスト
11.1 トラフィック分割
# Istio VirtualServiceでトラフィック分割
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: recommendation-ab-test
spec:
hosts:
- recommendation-service
http:
- match:
- headers:
x-experiment-group:
exact: "treatment"
route:
- destination:
host: recommendation-service
subset: v2-challenger
weight: 100
- route:
- destination:
host: recommendation-service
subset: v1-champion
weight: 80
- destination:
host: recommendation-service
subset: v2-challenger
weight: 20
11.2 統計的有意性検定
from scipy import stats
import numpy as np
def ab_test_significance(
control_conversions, control_total,
treatment_conversions, treatment_total,
alpha=0.05,
):
"""A/Bテストの統計的有意性検証"""
control_rate = control_conversions / control_total
treatment_rate = treatment_conversions / treatment_total
# 比率のZ検定
pooled_rate = (
(control_conversions + treatment_conversions) /
(control_total + treatment_total)
)
se = np.sqrt(
pooled_rate * (1 - pooled_rate) *
(1/control_total + 1/treatment_total)
)
z_stat = (treatment_rate - control_rate) / se
p_value = 2 * (1 - stats.norm.cdf(abs(z_stat)))
lift = (treatment_rate - control_rate) / control_rate
return {
"control_rate": control_rate,
"treatment_rate": treatment_rate,
"lift": f"{lift:.2%}",
"p_value": p_value,
"significant": p_value < alpha,
"recommendation": (
"チャレンジャーモデルを昇格" if p_value < alpha and lift > 0
else "現行モデルを維持"
),
}
11.3 Multi-Armed Bandit
import numpy as np
class ThompsonSampling:
"""Thompson Samplingによる適応的トラフィック分割"""
def __init__(self, n_arms):
self.n_arms = n_arms
self.successes = np.ones(n_arms)
self.failures = np.ones(n_arms)
def select_arm(self):
"""Beta分布からサンプリングして最適armを選択"""
samples = [
np.random.beta(self.successes[i], self.failures[i])
for i in range(self.n_arms)
]
return int(np.argmax(samples))
def update(self, arm, reward):
if reward:
self.successes[arm] += 1
else:
self.failures[arm] += 1
def get_allocation(self):
total = self.successes + self.failures
rates = self.successes / total
return rates / rates.sum()
12. CI/CD for ML
12.1 モデル検証ゲート
def validate_model(
model_uri: str,
test_data_path: str,
min_f1: float = 0.85,
max_latency_ms: float = 50.0,
):
"""モデルデプロイ前の検証ゲート"""
results = {"passed": True, "checks": []}
# 1. 性能検証
model = mlflow.pyfunc.load_model(model_uri)
test_data = pd.read_parquet(test_data_path)
predictions = model.predict(test_data.drop('target', axis=1))
f1 = f1_score(test_data['target'], predictions, average='macro')
results["checks"].append({
"name": "performance",
"value": f1,
"threshold": min_f1,
"passed": f1 >= min_f1,
})
# 2. レイテンシ検証
import time
latencies = []
for i in range(100):
start = time.time()
model.predict(test_data.iloc[[i]])
latencies.append((time.time() - start) * 1000)
p99_latency = np.percentile(latencies, 99)
results["checks"].append({
"name": "latency",
"value": p99_latency,
"threshold": max_latency_ms,
"passed": p99_latency <= max_latency_ms,
})
results["passed"] = all(c["passed"] for c in results["checks"])
return results
12.2 GitHub Actions ML CI/CD
name: ML CI/CD Pipeline
on:
push:
branches: [main]
paths:
- 'models/**'
- 'features/**'
jobs:
data-validation:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Validate Data Schema
run: python scripts/validate_data.py
train-and-evaluate:
needs: data-validation
runs-on: [self-hosted, gpu]
steps:
- uses: actions/checkout@v4
- name: Train Model
run: python train.py --config configs/production.yaml
model-validation:
needs: train-and-evaluate
runs-on: ubuntu-latest
steps:
- name: Run Validation Gates
run: python model_validation.py
deploy-production:
needs: model-validation
runs-on: ubuntu-latest
environment: production
steps:
- name: Canary Rollout
run: |
kubectl apply -f k8s/production/canary-10.yaml
sleep 300
python check_canary_metrics.py
kubectl apply -f k8s/production/full-rollout.yaml
13. コスト最適化
13.1 バッチ vs リアルタイム推論
| 戦略 | 期待削減率 | トレードオフ |
|---|---|---|
| バッチ推論への切り替え | 50-80% | リアルタイム性の放棄 |
| 予測キャッシング | 30-60% | キャッシュ鮮度 |
| モデル量子化 | 40-60% | 微細な精度低下 |
| スポットインスタンス | 60-90% | 可用性リスク |
| オートスケーリング | 20-40% | コールドスタート |
| モデル蒸留 | 50-70% | 開発コスト |
13.2 予測キャッシング
import redis
import hashlib
import json
class PredictionCache:
def __init__(self, redis_url="redis://localhost:6379", ttl=3600):
self.redis = redis.from_url(redis_url)
self.ttl = ttl
self.hit_count = 0
self.miss_count = 0
def _make_key(self, features: dict) -> str:
feature_str = json.dumps(features, sort_keys=True)
return f"pred:{hashlib.md5(feature_str.encode()).hexdigest()}"
def get_or_predict(self, features: dict, model) -> dict:
key = self._make_key(features)
cached = self.redis.get(key)
if cached:
self.hit_count += 1
return json.loads(cached)
self.miss_count += 1
prediction = model.predict(features)
self.redis.setex(key, self.ttl, json.dumps(prediction))
return prediction
14. クイズ
Q1:Feature Storeの核心的な価値は何ですか?
A: Feature Storeの核心的な価値はTraining-Serving Skewの防止です。学習時点とサービング時点で同一のフィーチャー定義と変換ロジックを共有し、データの不整合を根本的に排除します。加えて、フィーチャーの再利用、チーム協業、フィーチャーガバナンスも重要な価値です。
Q2:Point-in-Time Joinが必要な理由は?
A: Point-in-Time Joinはデータリーケージ(Data Leakage)を防止します。学習データ生成時、各イベント時点で実際に利用可能だったフィーチャー値のみをジョインする必要があります。未来のデータが含まれると、学習時は非現実的に高い性能を示しますが、本番では大幅に性能が低下します。
Q3:MLOps Level 2からLevel 3に移行するための核心要素は?
A: Level 2(CI/CD for ML)からLevel 3(自動再学習)への移行には、自動ドリフト検知と再学習トリガーシステムが必要です。データドリフトと予測ドリフトをリアルタイムでモニタリングし、閾値超過時に自動で再学習パイプラインを実行し、チャンピオン/チャレンジャー比較による自動昇格を行います。
Q4:BentoMLとSeldon Coreの主な違いは?
A: BentoMLはフレームワーク非依存のモデルパッケージングツールで、Dockerコンテナを生成してどこでもデプロイ可能です。Seldon CoreはKubernetesネイティブのサービングプラットフォームで、CRDベースのデプロイ、A/Bテスト、Canaryデプロイ、説明可能性(Explainer)などK8sエコシステムと深く統合されています。
Q5:モデルドリフトを検知する主要な方法3つは?
A: (1) データドリフト:入力フィーチャー分布の変化をKS検定、PSI(Population Stability Index)等で検知。(2) 予測ドリフト:モデル出力分布の変化をモニタリング。(3) 性能ドリフト:実際のラベルと比較して精度、F1等のメトリクス低下を追跡。Evidently AIとWhyLabsが代表的なツールです。
15. 参考資料
- Feast公式ドキュメント - https://docs.feast.dev/
- MLflow公式ドキュメント - https://mlflow.org/docs/latest/index.html
- Google MLOps Maturity Model - https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning
- Evidently AIドキュメント - https://docs.evidentlyai.com/
- BentoML公式ドキュメント - https://docs.bentoml.com/
- Seldon Coreドキュメント - https://docs.seldon.io/
- Kubeflow Pipelines - https://www.kubeflow.org/docs/components/pipelines/
- WhyLabsドキュメント - https://docs.whylabs.ai/
- vLLMプロジェクト - https://docs.vllm.ai/
- Weights and Biases - https://docs.wandb.ai/
- Tecton Feature Store - https://docs.tecton.ai/
- Hopsworks Feature Store - https://docs.hopsworks.ai/
- NVIDIA Triton Inference Server - https://docs.nvidia.com/deeplearning/triton-inference-server/