Skip to content

✍️ 필사 모드: Feature Store & MLOps パイプライン完全ガイド 2025:Feast、Feature Engineering、モデルサービング

日本語
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.

目次

1. なぜFeature Storeなのか?

1.1 Training-Serving Skew問題

MLモデル開発で最も頻繁(ひんぱん)な障害原因(しょうがいげんいん)は、学習時点とサービング時点のフィーチャー不整合です。

データサイエンティストがJupyter NotebookでPandasを使ってフィーチャーを生成し、エンジニアがJava/Goで同じロジックを再実装する際、微妙(びみょう)な差異(さい)が発生します。

# 学習時:Pandasでフィーチャー生成
df['avg_purchase_7d'] = df.groupby('user_id')['amount'].transform(
    lambda x: x.rolling('7D').mean()
)

# サービング時:SQLまたは別言語で再実装 → 微妙な差異が発生
# - 境界条件(inclusive/exclusive)の違い
# - タイムゾーン処理の違い
# - NULL処理方式の違い

Feature Storeは単一のフィーチャー定義を学習とサービングの両方で共有し、この問題を根本的(こんぽんてき)に解決します。

1.2 フィーチャーの再利用とチーム協業

大規模ML組織では、数十チームが類似(るいじ)のフィーチャーを独立して生成しています。

問題Feature Store導入前Feature Store導入後
フィーチャー重複チームごとに類似フィーチャーを再生成中央レジストリから検索・再利用
一貫性チームごとに異なる計算ロジック単一定義、バージョン管理
鮮度手動更新自動マテリアライゼーション
発見可能性Slack/Wiki依存フィーチャーカタログ + メタデータ
ガバナンスなしオーナー、リネージ、アクセス制御

1.3 データパイプラインの複雑性削減

Feature Storeなしでは、各モデルが独自のデータパイプラインを維持します。

モデルA: raw data → ETL A → features A → training A
モデルB: raw data → ETL B → features B → training B
モデルC: raw data → ETL C → features C → training C

Feature Store導入後:

raw data → Feature Store(一元管理) → モデルA, B, C共有

2. Feature Storeアーキテクチャ

2.1 コアコンポーネント

Feature Storeは4つのコアコンポーネントで構成されます。

オフラインストア(Offline Store)

  • 大量の履歴フィーチャーデータを保存
  • 学習データ生成に使用
  • BigQuery、Snowflake、Redshift、Parquetファイル
  • Point-in-Time Joinをサポート

オンラインストア(Online Store)

  • 最新フィーチャー値の低遅延(ていちえん)参照
  • リアルタイム推論に使用
  • Redis、DynamoDB、Bigtable
  • P99レイテンシ10ms以下を目標

フィーチャーレジストリ(Feature Registry)

  • すべてのフィーチャーのメタデータ管理
  • 名前、型、オーナー、説明、タグ
  • データリネージ追跡

変換エンジン(Transformation Engine)

  • 生データからフィーチャーを生成
  • バッチ/ストリーミング変換をサポート
  • Spark、Flink、dbt統合

2.2 データフロー

[データソース][変換エンジン][オフラインストア] ←→ [オンラインストア]
      ↑                              ↓                        ↓
  生データ                      学習パイプライン          推論サービス
                            [フィーチャーレジストリ]
                            (メタデータ管理)

2.3 主要Feature Storeソリューション比較

ソリューション種類オフラインオンラインストリーミングコスト
FeastOSSRedshift/BQ/FileRedis/DynamoDBPush型無料(インフラのみ)
TectonマネージドSpark + DeltaDynamoDBSpark Streamingサブスクリプション
HopsworksOSS/マネージドHudiRonDBFlinkコミュニティ無料
Vertex AI FSGCPマネージドBigQueryBigtableDataflow従量課金
SageMaker FSAWSマネージドS3 + Glue内蔵オンラインKinesis従量課金

3. Feast ディープダイブ

3.1 インストールと初期設定

# Feastインストール
pip install feast

# プロジェクト初期化
feast init my_feature_store
cd my_feature_store

# プロジェクト構成
# my_feature_store/
#   feature_store.yaml    # プロジェクト設定
#   features.py           # フィーチャー定義
#   data/                 # サンプルデータ

feature_store.yaml 設定:

project: my_ml_project
registry: data/registry.db
provider: local  # local, gcp, aws
online_store:
  type: redis
  connection_string: "localhost:6379"
offline_store:
  type: file  # file, bigquery, redshift, snowflake
entity_key_serialization_version: 2

3.2 フィーチャー定義

# features.py
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String

# エンティティ定義
user = Entity(
    name="user_id",
    join_keys=["user_id"],
    description="ユーザー固有ID",
)

# データソース定義
user_stats_source = FileSource(
    path="data/user_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

# フィーチャービュー定義
user_stats_fv = FeatureView(
    name="user_stats",
    entities=[user],
    ttl=timedelta(days=1),
    schema=[
        Field(name="total_purchases", dtype=Int64),
        Field(name="avg_purchase_amount", dtype=Float32),
        Field(name="days_since_last_login", dtype=Int64),
        Field(name="preferred_category", dtype=String),
    ],
    online=True,
    source=user_stats_source,
    tags={"team": "recommendation", "version": "v2"},
)

3.3 マテリアライゼーション(オフライン → オンライン)

# フィーチャー定義の適用
feast apply

# オフライン → オンラインストアへのフィーチャー値マテリアライゼーション
feast materialize 2024-01-01T00:00:00 2024-12-31T23:59:59

# インクリメンタルマテリアライゼーション(前回以降のみ)
feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")

3.4 学習データ生成(Point-in-Time Join)

Point-in-Time JoinはFeature Storeの核心(かくしん)機能です。データリーケージ(Data Leakage)を防止しながら、過去の特定時点のフィーチャー値を正確に取得します。

from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path=".")

# 学習データのエンティティ + タイムスタンプ
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003, 1001],
    "event_timestamp": pd.to_datetime([
        "2024-09-01", "2024-09-02", "2024-09-03", "2024-10-01"
    ]),
})

# Point-in-Time Joinでフィーチャーを取得
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_stats:total_purchases",
        "user_stats:avg_purchase_amount",
        "user_stats:days_since_last_login",
    ],
).to_df()

print(training_df)
# 各行はその時点基準のフィーチャー値を持つ
# → 2024-09-01時点のuser 1001のフィーチャー != 2024-10-01時点のuser 1001のフィーチャー

3.5 オンラインサービング

# リアルタイム推論のためのオンラインフィーチャー取得
feature_vector = store.get_online_features(
    features=[
        "user_stats:total_purchases",
        "user_stats:avg_purchase_amount",
        "user_stats:days_since_last_login",
    ],
    entity_rows=[
        {"user_id": 1001},
        {"user_id": 1002},
    ],
).to_dict()

# 結果:最新のフィーチャー値を返却(P99レイテンシ約5-10ms)

3.6 GCP/AWS本番設定

# GCP本番設定
project: production_ml
registry: gs://my-bucket/feast-registry/registry.db
provider: gcp
online_store:
  type: datastore  # または bigtable
  project_id: my-gcp-project
offline_store:
  type: bigquery
  project_id: my-gcp-project
  dataset: feast_features
# AWS本番設定
project: production_ml
registry: s3://my-bucket/feast-registry/registry.db
provider: aws
online_store:
  type: dynamodb
  region: ap-northeast-1
offline_store:
  type: redshift
  cluster_id: my-redshift-cluster
  region: ap-northeast-1
  database: ml_features
  user: feast_user
  s3_staging_location: s3://my-bucket/feast-staging/

4. Feature Engineeringパターン

4.1 時間ベースフィーチャー(Temporal Features)

import pandas as pd

def create_temporal_features(df, timestamp_col, entity_col, value_col):
    """タイムウィンドウベースの集約フィーチャー生成"""
    df = df.sort_values(timestamp_col)

    features = pd.DataFrame()
    features[entity_col] = df[entity_col]
    features[timestamp_col] = df[timestamp_col]

    # 各ウィンドウの移動平均
    for window in ['1D', '7D', '30D']:
        features[f'{value_col}_mean_{window}'] = (
            df.groupby(entity_col)[value_col]
            .transform(lambda x: x.rolling(window, on=df[timestamp_col]).mean())
        )

    # トレンドフィーチャー:7日平均 / 30日平均
    features[f'{value_col}_trend_7d_30d'] = (
        features[f'{value_col}_mean_7D'] /
        features[f'{value_col}_mean_30D'].replace(0, float('nan'))
    )

    # 曜日/時間フィーチャー
    features['day_of_week'] = df[timestamp_col].dt.dayofweek
    features['hour_of_day'] = df[timestamp_col].dt.hour
    features['is_weekend'] = features['day_of_week'].isin([5, 6]).astype(int)

    return features

4.2 集約フィーチャー(Aggregation Features)

def create_aggregation_features(events_df, entity_col, group_col):
    """エンティティごとの集約フィーチャー"""
    agg_features = events_df.groupby(entity_col).agg(
        event_count=pd.NamedAgg(column=group_col, aggfunc='count'),
        unique_categories=pd.NamedAgg(column='category', aggfunc='nunique'),
        total_amount=pd.NamedAgg(column='amount', aggfunc='sum'),
        avg_amount=pd.NamedAgg(column='amount', aggfunc='mean'),
        max_amount=pd.NamedAgg(column='amount', aggfunc='max'),
        std_amount=pd.NamedAgg(column='amount', aggfunc='std'),
    ).reset_index()

    # 比率フィーチャー
    agg_features['high_value_ratio'] = (
        events_df[events_df['amount'] > 100]
        .groupby(entity_col)
        .size()
        .reindex(agg_features[entity_col], fill_value=0)
        .values / agg_features['event_count']
    )

    return agg_features

4.3 エンベディングフィーチャー

from sentence_transformers import SentenceTransformer
import numpy as np

model = SentenceTransformer('all-MiniLM-L6-v2')

def create_text_embedding_features(df, text_col, prefix='emb'):
    """テキストエンベディングフィーチャー生成"""
    embeddings = model.encode(df[text_col].fillna('').tolist())
    emb_df = pd.DataFrame(
        embeddings,
        columns=[f'{prefix}_{i}' for i in range(embeddings.shape[1])],
        index=df.index,
    )
    return pd.concat([df, emb_df], axis=1)

4.4 クロスフィーチャー(Cross Features)

def create_cross_features(df):
    """フィーチャー間のクロス/相互作用フィーチャー"""
    # 比率フィーチャー
    df['purchase_to_visit_ratio'] = (
        df['purchase_count'] / df['visit_count'].replace(0, 1)
    )

    # バケット化 + クロス
    df['age_bucket'] = pd.cut(
        df['age'], bins=[0, 25, 35, 50, 100],
        labels=['young', 'middle', 'senior', 'elder']
    )
    df['age_x_gender'] = df['age_bucket'].astype(str) + '_' + df['gender']

    # 数値相互作用
    df['income_x_age'] = df['income'] * df['age']

    return df

5. MLOps成熟度レベル

Googleが定義したMLOps成熟度(せいじゅくど)モデルは4段階に分かれます。

Level 0:手動プロセス

データサイエンティストが手動で:
1. データ収集と前処理
2. Jupyterでモデル学習
3. モデルファイルをエンジニアに引き渡し
4. エンジニアがサービングコードを作成
5. 手動デプロイ

問題点:
- デプロイサイクル:数ヶ月
- 再現性なし
- モニタリングなし

Level 1:MLパイプライン自動化

自動化されたMLパイプライン:
1. データ検証 → フィーチャーエンジニアリング → 学習 → 評価 → デプロイ
2. パイプラインオーケストレーター使用(Kubeflow、Airflow)
3. CT(Continuous Training):トリガーベースの自動再学習

改善点:
- デプロイサイクル:週単位
- パイプライン再現可能
- 基本的なモニタリング

Level 2:CI/CD for ML

CI/CDパイプライン統合:
1. コード変更 → 自動テスト → パイプラインビルド → モデル学習
2. モデル検証ゲート(性能閾値)
3. Shadow DeploymentCanaryFull Rollout
4. A/Bテスト自動化

改善点:
- デプロイサイクル:日単位
- 自動ロールバック
- 体系的な実験管理

Level 3:自動再学習 + 完全自動化

完全自動化:
1. ドリフト検知 → 自動再学習トリガー
2. 自動フィーチャー選択/ハイパーパラメータ最適化
3. モデル性能の自動比較 + チャンピオン/チャレンジャー
4. 自動スケーリング + コスト最適化

改善点:
- デプロイサイクル:時間単位
- 無人運用
- プロアクティブモニタリング

6. MLパイプラインオーケストレーション

6.1 Kubeflow Pipelines

# Kubeflow Pipelines DSLでパイプライン定義
from kfp import dsl, compiler
from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(
    base_image="python:3.11",
    packages_to_install=["pandas", "scikit-learn"],
)
def preprocess_data(
    raw_data: Input[Dataset],
    processed_data: Output[Dataset],
):
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    df = pd.read_parquet(raw_data.path)
    scaler = StandardScaler()
    df_scaled = pd.DataFrame(
        scaler.fit_transform(df.select_dtypes(include='number')),
        columns=df.select_dtypes(include='number').columns,
    )
    df_scaled.to_parquet(processed_data.path)


@dsl.component(
    base_image="python:3.11",
    packages_to_install=["scikit-learn", "pandas", "joblib"],
)
def train_model(
    training_data: Input[Dataset],
    model_artifact: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 100,
    max_depth: int = 10,
):
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import cross_val_score
    import joblib

    df = pd.read_parquet(training_data.path)
    X = df.drop('target', axis=1)
    y = df['target']

    clf = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42,
    )
    scores = cross_val_score(clf, X, y, cv=5, scoring='f1_macro')

    clf.fit(X, y)
    joblib.dump(clf, model_artifact.path)

    metrics.log_metric("f1_mean", float(scores.mean()))
    metrics.log_metric("f1_std", float(scores.std()))


@dsl.pipeline(name="ML Training Pipeline")
def ml_pipeline(n_estimators: int = 100, max_depth: int = 10):
    preprocess_task = preprocess_data(
        raw_data=dsl.importer(
            artifact_uri="gs://my-bucket/raw-data/",
            artifact_class=Dataset,
        ).output,
    )

    train_task = train_model(
        training_data=preprocess_task.outputs["processed_data"],
        n_estimators=n_estimators,
        max_depth=max_depth,
    )


# コンパイルと実行
compiler.Compiler().compile(ml_pipeline, "pipeline.yaml")

6.2 Airflow MLパイプライン

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'ml_training_pipeline',
    default_args=default_args,
    schedule_interval='@weekly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['ml', 'training'],
) as dag:

    validate_data = KubernetesPodOperator(
        task_id='validate_data',
        name='data-validation',
        image='ml-pipeline:latest',
        cmds=['python', 'validate.py'],
        namespace='ml-pipelines',
    )

    extract_features = KubernetesPodOperator(
        task_id='extract_features',
        name='feature-extraction',
        image='ml-pipeline:latest',
        cmds=['python', 'extract_features.py'],
        namespace='ml-pipelines',
    )

    train_model_task = KubernetesPodOperator(
        task_id='train_model',
        name='model-training',
        image='ml-pipeline:latest',
        cmds=['python', 'train.py'],
        namespace='ml-pipelines',
    )

    evaluate_model = PythonOperator(
        task_id='evaluate_model',
        python_callable=lambda: print("Evaluating model..."),
    )

    validate_data >> extract_features >> train_model_task >> evaluate_model

7. 実験追跡(Experiment Tracking)

7.1 MLflow実験追跡

import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import f1_score, precision_score, recall_score

# MLflowサーバー設定
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("recommendation-model-v2")

# 実験実行
with mlflow.start_run(run_name="gbm-experiment-42") as run:
    # ハイパーパラメータのロギング
    params = {
        "n_estimators": 200,
        "max_depth": 8,
        "learning_rate": 0.1,
        "subsample": 0.8,
    }
    mlflow.log_params(params)

    # モデル学習
    model = GradientBoostingClassifier(**params)
    model.fit(X_train, y_train)

    # メトリクスのロギング
    y_pred = model.predict(X_test)
    metrics = {
        "f1": f1_score(y_test, y_pred, average='macro'),
        "precision": precision_score(y_test, y_pred, average='macro'),
        "recall": recall_score(y_test, y_pred, average='macro'),
    }
    mlflow.log_metrics(metrics)

    # モデルアーティファクトのロギング
    mlflow.sklearn.log_model(
        model,
        artifact_path="model",
        registered_model_name="recommendation-gbm",
    )

    mlflow.log_artifact("feature_importance.png")

    print(f"Run ID: {run.info.run_id}")
    print(f"Metrics: {metrics}")

8. モデルレジストリ(Model Registry)

8.1 MLflow Model Registry

from mlflow.tracking import MlflowClient

client = MlflowClient()

# モデルバージョン作成
model_version = client.create_model_version(
    name="recommendation-gbm",
    source=f"runs:/{run_id}/model",
    run_id=run_id,
    description="GBMモデルv3:新規フィーチャー追加",
)

# ステージ遷移:None → Staging
client.transition_model_version_stage(
    name="recommendation-gbm",
    version=model_version.version,
    stage="Staging",
    archive_existing_versions=False,
)

# ステージング検証後、本番昇格
client.transition_model_version_stage(
    name="recommendation-gbm",
    version=model_version.version,
    stage="Production",
    archive_existing_versions=True,  # 既存の本番バージョンをアーカイブ
)

8.2 モデルバージョン管理戦略

モデルライフサイクル:
NoneStagingProductionArchived

バージョン管理ポリシー:
- Staging:自動性能テスト合格時
- Production:手動承認またはA/Bテスト合格時
- Archived:新バージョンの本番昇格時に自動
- 直近3バージョンを保持、それ以前は削除
# 本番モデルのロード
import mlflow.pyfunc

model = mlflow.pyfunc.load_model(
    model_uri="models:/recommendation-gbm/Production"
)
predictions = model.predict(input_df)

9. モデルサービング

9.1 BentoML

# service.py
import bentoml
import numpy as np
from bentoml.io import NumpyNdarray, JSON

# 保存済みモデルのロード
runner = bentoml.sklearn.get("recommendation_model:latest").to_runner()

svc = bentoml.Service("recommendation_service", runners=[runner])

@svc.api(input=JSON(), output=JSON())
async def predict(input_data: dict) -> dict:
    features = np.array(input_data["features"]).reshape(1, -1)
    prediction = await runner.predict.async_run(features)
    return {
        "prediction": int(prediction[0]),
        "model_version": "v3.2.1",
    }
# ビルドとデプロイ
bentoml build
bentoml containerize recommendation_service:latest
docker run -p 3000:3000 recommendation_service:latest

9.2 Seldon Core(Kubernetes)

apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: recommendation-model
  namespace: ml-serving
spec:
  predictors:
    - name: default
      replicas: 3
      graph:
        name: classifier
        implementation: SKLEARN_SERVER
        modelUri: s3://models/recommendation/v3
        envSecretRefName: s3-credentials
      componentSpecs:
        - spec:
            containers:
              - name: classifier
                resources:
                  requests:
                    cpu: "1"
                    memory: "2Gi"
                  limits:
                    cpu: "2"
                    memory: "4Gi"
      traffic: 100
      labels:
        version: v3

9.3 vLLM(LLMサービング)

# vLLMサーバー起動
# python -m vllm.entrypoints.openai.api_server \
#   --model meta-llama/Llama-3-8B-Instruct \
#   --tensor-parallel-size 2 \
#   --max-model-len 8192

# クライアント呼び出し
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:8000/v1",
    api_key="not-needed",
)

response = client.chat.completions.create(
    model="meta-llama/Llama-3-8B-Instruct",
    messages=[
        {"role": "user", "content": "推薦システムの利点は?"},
    ],
    temperature=0.7,
    max_tokens=512,
)

9.4 サービングソリューション比較

ソリューションモデル種類デプロイ環境バッチ推論GPU対応自動スケーリング
BentoML汎用Docker/K8s対応対応対応
Seldon Core汎用K8s専用対応対応HPA
TFServingTFモデルDocker/K8s対応対応手動
TritonマルチFWDocker/K8s対応最適化対応
vLLMLLMDocker/K8s非対応必須対応

10. モデルモニタリング

10.1 データドリフト検知

from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset

# 基準データ(学習時点)
reference_data = pd.read_parquet("training_data.parquet")
# 現在データ(本番)
current_data = pd.read_parquet("production_data_latest.parquet")

column_mapping = ColumnMapping(
    target='label',
    numerical_features=['age', 'income', 'purchase_count'],
    categorical_features=['category', 'region'],
)

# ドリフトレポート生成
report = Report(metrics=[
    DataDriftPreset(),
    TargetDriftPreset(),
])

report.run(
    reference_data=reference_data,
    current_data=current_data,
    column_mapping=column_mapping,
)

report.save_html("drift_report.html")

# プログラマティックアクセス
result = report.as_dict()
dataset_drift = result['metrics'][0]['result']['dataset_drift']
drift_share = result['metrics'][0]['result']['share_of_drifted_columns']

if dataset_drift:
    print(f"ドリフト検知!ドリフトフィーチャー比率:{drift_share:.2%}")
    trigger_retraining_pipeline()

10.2 予測ドリフト検知

from evidently.test_suite import TestSuite
from evidently.tests import (
    TestColumnDrift,
    TestShareOfDriftedColumns,
    TestMeanInNSigmas,
)

test_suite = TestSuite(tests=[
    TestShareOfDriftedColumns(lt=0.3),  # ドリフトフィーチャー30%未満
    TestColumnDrift(column_name="prediction_score"),
    TestMeanInNSigmas(column_name="prediction_score", n=2),
])

test_suite.run(
    reference_data=reference_data,
    current_data=current_data,
    column_mapping=column_mapping,
)

test_results = test_suite.as_dict()
all_passed = all(
    t['status'] == 'SUCCESS' for t in test_results['tests']
)

if not all_passed:
    alert_team("モデルドリフト検知 - 再学習レビューが必要です")

11. A/Bテスト

11.1 トラフィック分割

# Istio VirtualServiceでトラフィック分割
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: recommendation-ab-test
spec:
  hosts:
    - recommendation-service
  http:
    - match:
        - headers:
            x-experiment-group:
              exact: "treatment"
      route:
        - destination:
            host: recommendation-service
            subset: v2-challenger
          weight: 100
    - route:
        - destination:
            host: recommendation-service
            subset: v1-champion
          weight: 80
        - destination:
            host: recommendation-service
            subset: v2-challenger
          weight: 20

11.2 統計的有意性検定

from scipy import stats
import numpy as np

def ab_test_significance(
    control_conversions, control_total,
    treatment_conversions, treatment_total,
    alpha=0.05,
):
    """A/Bテストの統計的有意性検証"""
    control_rate = control_conversions / control_total
    treatment_rate = treatment_conversions / treatment_total

    # 比率のZ検定
    pooled_rate = (
        (control_conversions + treatment_conversions) /
        (control_total + treatment_total)
    )
    se = np.sqrt(
        pooled_rate * (1 - pooled_rate) *
        (1/control_total + 1/treatment_total)
    )
    z_stat = (treatment_rate - control_rate) / se
    p_value = 2 * (1 - stats.norm.cdf(abs(z_stat)))

    lift = (treatment_rate - control_rate) / control_rate

    return {
        "control_rate": control_rate,
        "treatment_rate": treatment_rate,
        "lift": f"{lift:.2%}",
        "p_value": p_value,
        "significant": p_value < alpha,
        "recommendation": (
            "チャレンジャーモデルを昇格" if p_value < alpha and lift > 0
            else "現行モデルを維持"
        ),
    }

11.3 Multi-Armed Bandit

import numpy as np

class ThompsonSampling:
    """Thompson Samplingによる適応的トラフィック分割"""

    def __init__(self, n_arms):
        self.n_arms = n_arms
        self.successes = np.ones(n_arms)
        self.failures = np.ones(n_arms)

    def select_arm(self):
        """Beta分布からサンプリングして最適armを選択"""
        samples = [
            np.random.beta(self.successes[i], self.failures[i])
            for i in range(self.n_arms)
        ]
        return int(np.argmax(samples))

    def update(self, arm, reward):
        if reward:
            self.successes[arm] += 1
        else:
            self.failures[arm] += 1

    def get_allocation(self):
        total = self.successes + self.failures
        rates = self.successes / total
        return rates / rates.sum()

12. CI/CD for ML

12.1 モデル検証ゲート

def validate_model(
    model_uri: str,
    test_data_path: str,
    min_f1: float = 0.85,
    max_latency_ms: float = 50.0,
):
    """モデルデプロイ前の検証ゲート"""
    results = {"passed": True, "checks": []}

    # 1. 性能検証
    model = mlflow.pyfunc.load_model(model_uri)
    test_data = pd.read_parquet(test_data_path)
    predictions = model.predict(test_data.drop('target', axis=1))
    f1 = f1_score(test_data['target'], predictions, average='macro')

    results["checks"].append({
        "name": "performance",
        "value": f1,
        "threshold": min_f1,
        "passed": f1 >= min_f1,
    })

    # 2. レイテンシ検証
    import time
    latencies = []
    for i in range(100):
        start = time.time()
        model.predict(test_data.iloc[[i]])
        latencies.append((time.time() - start) * 1000)

    p99_latency = np.percentile(latencies, 99)
    results["checks"].append({
        "name": "latency",
        "value": p99_latency,
        "threshold": max_latency_ms,
        "passed": p99_latency <= max_latency_ms,
    })

    results["passed"] = all(c["passed"] for c in results["checks"])
    return results

12.2 GitHub Actions ML CI/CD

name: ML CI/CD Pipeline
on:
  push:
    branches: [main]
    paths:
      - 'models/**'
      - 'features/**'

jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Validate Data Schema
        run: python scripts/validate_data.py

  train-and-evaluate:
    needs: data-validation
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v4
      - name: Train Model
        run: python train.py --config configs/production.yaml

  model-validation:
    needs: train-and-evaluate
    runs-on: ubuntu-latest
    steps:
      - name: Run Validation Gates
        run: python model_validation.py

  deploy-production:
    needs: model-validation
    runs-on: ubuntu-latest
    environment: production
    steps:
      - name: Canary Rollout
        run: |
          kubectl apply -f k8s/production/canary-10.yaml
          sleep 300
          python check_canary_metrics.py
          kubectl apply -f k8s/production/full-rollout.yaml

13. コスト最適化

13.1 バッチ vs リアルタイム推論

戦略期待削減率トレードオフ
バッチ推論への切り替え50-80%リアルタイム性の放棄
予測キャッシング30-60%キャッシュ鮮度
モデル量子化40-60%微細な精度低下
スポットインスタンス60-90%可用性リスク
オートスケーリング20-40%コールドスタート
モデル蒸留50-70%開発コスト

13.2 予測キャッシング

import redis
import hashlib
import json

class PredictionCache:
    def __init__(self, redis_url="redis://localhost:6379", ttl=3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl
        self.hit_count = 0
        self.miss_count = 0

    def _make_key(self, features: dict) -> str:
        feature_str = json.dumps(features, sort_keys=True)
        return f"pred:{hashlib.md5(feature_str.encode()).hexdigest()}"

    def get_or_predict(self, features: dict, model) -> dict:
        key = self._make_key(features)
        cached = self.redis.get(key)

        if cached:
            self.hit_count += 1
            return json.loads(cached)

        self.miss_count += 1
        prediction = model.predict(features)
        self.redis.setex(key, self.ttl, json.dumps(prediction))
        return prediction

14. クイズ

Q1:Feature Storeの核心的な価値は何ですか?

A: Feature Storeの核心的な価値はTraining-Serving Skewの防止です。学習時点とサービング時点で同一のフィーチャー定義と変換ロジックを共有し、データの不整合を根本的に排除します。加えて、フィーチャーの再利用、チーム協業、フィーチャーガバナンスも重要な価値です。

Q2:Point-in-Time Joinが必要な理由は?

A: Point-in-Time Joinはデータリーケージ(Data Leakage)を防止します。学習データ生成時、各イベント時点で実際に利用可能だったフィーチャー値のみをジョインする必要があります。未来のデータが含まれると、学習時は非現実的に高い性能を示しますが、本番では大幅に性能が低下します。

Q3:MLOps Level 2からLevel 3に移行するための核心要素は?

A: Level 2(CI/CD for ML)からLevel 3(自動再学習)への移行には、自動ドリフト検知と再学習トリガーシステムが必要です。データドリフトと予測ドリフトをリアルタイムでモニタリングし、閾値超過時に自動で再学習パイプラインを実行し、チャンピオン/チャレンジャー比較による自動昇格を行います。

Q4:BentoMLとSeldon Coreの主な違いは?

A: BentoMLはフレームワーク非依存のモデルパッケージングツールで、Dockerコンテナを生成してどこでもデプロイ可能です。Seldon CoreはKubernetesネイティブのサービングプラットフォームで、CRDベースのデプロイ、A/Bテスト、Canaryデプロイ、説明可能性(Explainer)などK8sエコシステムと深く統合されています。

Q5:モデルドリフトを検知する主要な方法3つは?

A: (1) データドリフト:入力フィーチャー分布の変化をKS検定、PSI(Population Stability Index)等で検知。(2) 予測ドリフト:モデル出力分布の変化をモニタリング。(3) 性能ドリフト:実際のラベルと比較して精度、F1等のメトリクス低下を追跡。Evidently AIとWhyLabsが代表的なツールです。


15. 参考資料

  1. Feast公式ドキュメント - https://docs.feast.dev/
  2. MLflow公式ドキュメント - https://mlflow.org/docs/latest/index.html
  3. Google MLOps Maturity Model - https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning
  4. Evidently AIドキュメント - https://docs.evidentlyai.com/
  5. BentoML公式ドキュメント - https://docs.bentoml.com/
  6. Seldon Coreドキュメント - https://docs.seldon.io/
  7. Kubeflow Pipelines - https://www.kubeflow.org/docs/components/pipelines/
  8. WhyLabsドキュメント - https://docs.whylabs.ai/
  9. vLLMプロジェクト - https://docs.vllm.ai/
  10. Weights and Biases - https://docs.wandb.ai/
  11. Tecton Feature Store - https://docs.tecton.ai/
  12. Hopsworks Feature Store - https://docs.hopsworks.ai/
  13. NVIDIA Triton Inference Server - https://docs.nvidia.com/deeplearning/triton-inference-server/

현재 단락 (1/725)

MLモデル開発で最も頻繁(ひんぱん)な障害原因(しょうがいげんいん)は、**学習時点とサービング時点のフィーチャー不整合**です。

작성 글자: 0원문 글자: 22,424작성 단락: 0/725