Skip to content
Published on

MLOps完全ガイド: MLパイプラインから本番デプロイまで

Authors

はじめに

現代の機械学習プロジェクトは、Jupyter Notebookでモデルを構築したら終わりではありません。データ収集、前処理、学習、評価、デプロイ、モニタリング、再学習というライフサイクル全体を確実に管理する必要があります。これこそがMLOpsが存在する理由です。

このガイドでは、MLOpsの基礎から実践的なツールの使い方まで、体系的に解説します。


1. MLOpsとは何か?

1.1 MLOpsの定義

MLOps(Machine Learning Operations)は、MLシステムの開発と運用を統合する方法論です。DevOpsの原則をMLワークフローに適用し、モデルの継続的学習(CT)、継続的インテグレーション(CI)、継続的デプロイ(CD)を自動化します。

1.2 MLOps vs DevOps vs DataOps

カテゴリDevOpsDataOpsMLOps
焦点ソフトウェアデプロイデータパイプラインMLモデルライフサイクル
成果物アプリケーションデータ/レポートMLモデル
再現性コードバージョン管理データバージョン管理コード+データ+モデルのバージョン管理
自動化CI/CDデータテスト学習+評価+デプロイ

MLOpsとDevOpsの最大の違いはデータ依存性です。同じコードでも、異なるデータで学習すると全く異なるモデルが生成され、モデルの性能はコード品質と同様にデータ品質にも強く依存します。

1.3 MLシステムの固有の特性

MLシステムには、従来のソフトウェアとは異なるいくつかの特性があります:

  1. データ依存性: モデルの動作はコードだけでなくデータによって決まる
  2. 実験的性質: 数十〜数百回の反復実験が必要
  3. モデルの劣化: データ分布の変化に伴い、モデル性能が低下する
  4. 再現性の課題: 同じ環境で同一の結果を保証することが難しい
  5. 複数のアーティファクト: コード、データ、モデルすべてにバージョン管理が必要

1.4 MLOps成熟度モデル

GoogleのMLOps成熟度レベルに基づく:

レベル0 - 手動ML

  • すべてのプロセスが手動
  • スクリプトベースの実験
  • まれで手動のデプロイ
  • モニタリングなし

レベル1 - MLパイプライン自動化

  • 自動化された学習パイプライン
  • 継続的学習が可能
  • 実験トラッキングの開始
  • フィーチャーストアの導入

レベル2 - CI/CDパイプライン自動化

  • 完全自動化されたCI/CD
  • モデルレジストリの使用
  • 自動再学習トリガー
  • 完全なモニタリング

1.5 MLOpsツールエコシステム

データバージョン管理: DVC, LakeFS, Delta Lake
実験トラッキング: MLflow, W&B, Neptune, Comet ML
パイプライン: Airflow, Prefect, Metaflow, Kubeflow Pipelines
モデルレジストリ: MLflow Registry, W&B Artifacts, Vertex AI
コンテナ化: Docker, Podman
オーケストレーション: Kubernetes, ECS, GKE
モデルサービング: Triton, TorchServe, BentoML, KServe
モニタリング: Evidently, WhyLogs, Arize, Fiddler
フィーチャーストア: Feast, Tecton, Vertex AI Feature Store

2. データバージョン管理(DVC)

2.1 DVCの概要

DVC(Data Version Control)は、Gitと連携して動作するMLプロジェクト用のバージョン管理ツールです。大規模なデータセットやMLモデルをGitと同様にバージョン管理できます。

# DVCのインストール
pip install dvc

# S3サポート付きインストール
pip install "dvc[s3]"

# GCSサポート付きインストール
pip install "dvc[gs]"

# 全リモートサポート付きインストール
pip install "dvc[all]"

2.2 DVCの初期化と基本的な使い方

# Gitリポジトリの初期化(必要な場合)
git init

# DVCの初期化
dvc init

# 作成されたファイルの確認
ls .dvc/
# config  .gitignore  tmp/

# データのトラッキング開始
dvc add data/train.csv

# .dvcファイルが作成される(Gitでトラッキング)
git add data/train.csv.dvc data/.gitignore
git commit -m "Add training data"

2.3 リモートストレージの設定

# S3リモートストレージの設定
dvc remote add -d myremote s3://my-bucket/dvc-store

# AWS認証情報の設定(環境変数または~/.aws/credentials)
dvc remote modify myremote access_key_id YOUR_ACCESS_KEY
dvc remote modify myremote secret_access_key YOUR_SECRET_KEY

# GCSリモートストレージ
dvc remote add -d gcsstorage gs://my-bucket/dvc-store

# ローカルリモートストレージ(テスト用)
dvc remote add -d localremote /tmp/dvc-storage

# データのプッシュ
dvc push

# データのプル
dvc pull

2.4 DVCパイプライン

DVCパイプラインはステージ間の依存関係をトラッキングし、変更されたステージのみを再実行します。

# dvc.yaml
stages:
  prepare:
    cmd: python src/prepare.py
    deps:
      - src/prepare.py
      - data/raw
    outs:
      - data/prepared

  featurize:
    cmd: python src/featurize.py
    deps:
      - src/featurize.py
      - data/prepared
    outs:
      - data/features

  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/features
    params:
      - params.yaml:
          - train.lr
          - train.n_estimators
    outs:
      - models/model.pkl
    metrics:
      - metrics/scores.json:
          cache: false

  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/model.pkl
      - data/features
    metrics:
      - metrics/eval.json:
          cache: false
# パイプラインの実行
dvc repro

# 特定のステージまで実行
dvc repro train

# パイプラインDAGの可視化
dvc dag

# 実験結果の比較
dvc metrics show
dvc metrics diff HEAD~1

2.5 パラメータ管理

# params.yaml
train:
  lr: 0.001
  n_estimators: 100
  max_depth: 5
  batch_size: 32
  epochs: 10

data:
  test_size: 0.2
  random_state: 42
# src/train.py
import yaml

# パラメータの読み込み
with open("params.yaml") as f:
    params = yaml.safe_load(f)

lr = params["train"]["lr"]
n_estimators = params["train"]["n_estimators"]

# モデルの学習
from sklearn.ensemble import RandomForestClassifier
model = RandomForestClassifier(
    n_estimators=n_estimators,
    max_depth=params["train"]["max_depth"],
    random_state=params["data"]["random_state"]
)

2.6 Git + DVCワークフロー

# 新しい実験ブランチの作成
git checkout -b experiment/increase-lr

# パラメータの変更
# params.yamlで lr: 0.001 を lr: 0.01 に変更

# パイプラインの再実行
dvc repro

# 結果の確認
dvc metrics show

# コミット
git add dvc.lock params.yaml
git commit -m "Experiment: increase learning rate to 0.01"

# 実験の比較
git checkout main
dvc metrics diff experiment/increase-lr

3. 実験トラッキング

3.1 MLflowのインストールと設定

# MLflowのインストール
pip install mlflow

# MLflow UIの起動
mlflow ui

# ホストとポートの指定
mlflow ui --host 0.0.0.0 --port 5001

# リモートトラッキングサーバーの設定
export MLFLOW_TRACKING_URI=http://mlflow-server:5000

3.2 MLflowの基本的な使い方

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
import numpy as np

# トラッキングURIの設定(デフォルト: ./mlruns)
mlflow.set_tracking_uri("http://localhost:5000")

# 実験の設定
mlflow.set_experiment("my-classification-experiment")

# 実験の実行
with mlflow.start_run(run_name="random-forest-v1"):
    # パラメータのログ
    n_estimators = 100
    max_depth = 5
    lr = 0.001

    mlflow.log_param("n_estimators", n_estimators)
    mlflow.log_param("max_depth", max_depth)
    mlflow.log_param("learning_rate", lr)

    # モデルの学習
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42
    )
    model.fit(X_train, y_train)

    # メトリクスのログ
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='weighted')

    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("f1_score", f1)

    # ステップごとのメトリクス(学習曲線)
    for epoch in range(10):
        train_loss = np.random.random() * 0.5 / (epoch + 1)
        mlflow.log_metric("train_loss", train_loss, step=epoch)

    # アーティファクトのログ
    mlflow.log_artifact("data/train.csv")

    # モデルのログ
    mlflow.sklearn.log_model(
        model,
        "model",
        registered_model_name="RandomForestClassifier"
    )

    print(f"Accuracy: {accuracy:.4f}, F1: {f1:.4f}")
    print(f"Run ID: {mlflow.active_run().info.run_id}")

3.3 MLflow自動ログ

# sklearnの自動ログを有効化
mlflow.sklearn.autolog()

with mlflow.start_run():
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X_train, y_train)
    # パラメータ、メトリクス、モデルが自動的にログされる

# PyTorchの自動ログ
mlflow.pytorch.autolog()

# XGBoostの自動ログ
mlflow.xgboost.autolog()

3.4 実例:PyTorch学習のトラッキング

import torch
import torch.nn as nn
import torch.optim as optim
import mlflow
import mlflow.pytorch

class SimpleNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        return self.fc2(x)

def train_with_mlflow(
    model, train_loader, val_loader,
    epochs=10, lr=0.001, experiment_name="pytorch-training"
):
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run():
        # ハイパーパラメータのログ
        mlflow.log_params({
            "epochs": epochs,
            "learning_rate": lr,
            "model_type": "SimpleNet",
            "optimizer": "Adam",
            "batch_size": train_loader.batch_size,
        })

        optimizer = optim.Adam(model.parameters(), lr=lr)
        criterion = nn.CrossEntropyLoss()
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

        for epoch in range(epochs):
            # 学習
            model.train()
            train_loss = 0.0
            for batch_X, batch_y in train_loader:
                optimizer.zero_grad()
                outputs = model(batch_X)
                loss = criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()
                train_loss += loss.item()

            train_loss /= len(train_loader)

            # 検証
            model.eval()
            val_loss = 0.0
            correct = 0
            total = 0

            with torch.no_grad():
                for batch_X, batch_y in val_loader:
                    outputs = model(batch_X)
                    loss = criterion(outputs, batch_y)
                    val_loss += loss.item()

                    _, predicted = outputs.max(1)
                    total += batch_y.size(0)
                    correct += predicted.eq(batch_y).sum().item()

            val_loss /= len(val_loader)
            val_accuracy = correct / total

            # ステップごとのメトリクスログ
            mlflow.log_metrics({
                "train_loss": train_loss,
                "val_loss": val_loss,
                "val_accuracy": val_accuracy,
                "learning_rate": scheduler.get_last_lr()[0],
            }, step=epoch)

            scheduler.step()

            print(f"Epoch {epoch+1}/{epochs} | "
                  f"Train Loss: {train_loss:.4f} | "
                  f"Val Loss: {val_loss:.4f} | "
                  f"Val Acc: {val_accuracy:.4f}")

        # 最終モデルの保存
        mlflow.pytorch.log_model(model, "model")

        return mlflow.active_run().info.run_id

3.5 W&B(Weights & Biases)の使い方

# W&Bのインストール
pip install wandb

# ログイン
wandb login
import wandb
import torch

# W&Bの初期化
wandb.init(
    project="my-ml-project",
    name="experiment-001",
    config={
        "learning_rate": 0.001,
        "epochs": 100,
        "batch_size": 64,
        "architecture": "ResNet50",
        "dataset": "ImageNet",
    }
)

# 設定値へのアクセス
lr = wandb.config.learning_rate

# 学習ループ
for epoch in range(wandb.config.epochs):
    train_loss, train_acc = train_epoch(model, train_loader)
    val_loss, val_acc = eval_epoch(model, val_loader)

    # メトリクスのログ
    wandb.log({
        "epoch": epoch,
        "train_loss": train_loss,
        "train_accuracy": train_acc,
        "val_loss": val_loss,
        "val_accuracy": val_acc,
        "learning_rate": optimizer.param_groups[0]['lr'],
    })

# モデルの保存
wandb.save("model.pth")

# 画像のログ
wandb.log({
    "predictions": [
        wandb.Image(img, caption=f"Pred: {pred}, True: {true}")
        for img, pred, true in sample_predictions
    ]
})

# W&B Artifacts(データセットのバージョン管理)
artifact = wandb.Artifact("training-data", type="dataset")
artifact.add_dir("data/train")
wandb.log_artifact(artifact)

wandb.finish()

3.6 W&B Sweeps(ハイパーパラメータ最適化)

import wandb

# スイープ設定
sweep_config = {
    "method": "bayes",  # random, grid, bayes
    "metric": {
        "name": "val_accuracy",
        "goal": "maximize"
    },
    "parameters": {
        "learning_rate": {
            "distribution": "log_uniform_values",
            "min": 1e-5,
            "max": 1e-1,
        },
        "batch_size": {
            "values": [16, 32, 64, 128]
        },
        "hidden_dim": {
            "values": [64, 128, 256, 512]
        },
        "dropout": {
            "distribution": "uniform",
            "min": 0.1,
            "max": 0.5,
        }
    }
}

def train_sweep():
    with wandb.init() as run:
        config = wandb.config

        model = SimpleNet(
            input_dim=784,
            hidden_dim=config.hidden_dim,
            output_dim=10
        )

        optimizer = torch.optim.Adam(
            model.parameters(),
            lr=config.learning_rate
        )

        for epoch in range(10):
            train_loss, val_loss, val_acc = run_epoch(
                model, optimizer, config.batch_size
            )
            wandb.log({
                "train_loss": train_loss,
                "val_loss": val_loss,
                "val_accuracy": val_acc,
            })

# スイープの作成と実行
sweep_id = wandb.sweep(sweep_config, project="my-project")
wandb.agent(sweep_id, function=train_sweep, count=50)

4. MLパイプラインオーケストレーション

4.1 Apache AirflowによるMLパイプライン

# dags/ml_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'mlops-team',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'ml_training_pipeline',
    default_args=default_args,
    description='ML model training pipeline',
    schedule_interval='0 2 * * *',  # 毎日午前2時
    catchup=False,
)

def extract_data(**kwargs):
    """データの抽出"""
    import pandas as pd
    from sqlalchemy import create_engine

    engine = create_engine('postgresql://user:pass@db:5432/mldb')
    df = pd.read_sql(
        "SELECT * FROM features WHERE date >= CURRENT_DATE - 7",
        engine
    )

    output_path = '/tmp/raw_data.parquet'
    df.to_parquet(output_path)
    kwargs['ti'].xcom_push(key='data_path', value=output_path)
    return output_path

def preprocess_data(**kwargs):
    """データの前処理"""
    ti = kwargs['ti']
    data_path = ti.xcom_pull(task_ids='extract_data', key='data_path')

    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    df = pd.read_parquet(data_path)
    scaler = StandardScaler()
    features = df.drop('target', axis=1)
    scaled_features = scaler.fit_transform(features)

    processed_path = '/tmp/processed_data.parquet'
    pd.DataFrame(scaled_features).to_parquet(processed_path)
    ti.xcom_push(key='processed_path', value=processed_path)

def train_model(**kwargs):
    """モデルの学習"""
    ti = kwargs['ti']
    processed_path = ti.xcom_pull(
        task_ids='preprocess_data',
        key='processed_path'
    )

    import mlflow
    import pandas as pd
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.model_selection import train_test_split

    mlflow.set_experiment("production-training")

    with mlflow.start_run():
        data = pd.read_parquet(processed_path)
        X = data.drop('target', axis=1)
        y = data['target']
        X_train, X_test, y_train, y_test = train_test_split(X, y)

        model = GradientBoostingClassifier(n_estimators=200)
        model.fit(X_train, y_train)

        accuracy = model.score(X_test, y_test)
        mlflow.log_metric("accuracy", accuracy)
        mlflow.sklearn.log_model(model, "model")

        run_id = mlflow.active_run().info.run_id
        ti.xcom_push(key='run_id', value=run_id)

def evaluate_and_deploy(**kwargs):
    """モデルの評価とデプロイ"""
    ti = kwargs['ti']
    run_id = ti.xcom_pull(task_ids='train_model', key='run_id')

    import mlflow
    client = mlflow.tracking.MlflowClient()

    run = client.get_run(run_id)
    accuracy = run.data.metrics['accuracy']

    if accuracy >= 0.90:
        model_uri = f"runs:/{run_id}/model"
        mlflow.register_model(model_uri, "ProductionModel")
        print(f"Model deployed with accuracy: {accuracy:.4f}")
    else:
        raise ValueError(f"Model accuracy {accuracy:.4f} below threshold 0.90")

# タスクの定義
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
)

preprocess_task = PythonOperator(
    task_id='preprocess_data',
    python_callable=preprocess_data,
    dag=dag,
)

train_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag,
)

evaluate_task = PythonOperator(
    task_id='evaluate_and_deploy',
    python_callable=evaluate_and_deploy,
    dag=dag,
)

# 依存関係の設定
extract_task >> preprocess_task >> train_task >> evaluate_task

4.2 PrefectによるMLパイプライン

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def load_data(data_path: str) -> pd.DataFrame:
    """データの読み込み(キャッシュ付き)"""
    return pd.read_parquet(data_path)

@task(retries=3, retry_delay_seconds=10)
def preprocess(df: pd.DataFrame) -> pd.DataFrame:
    """データの前処理"""
    df = df.dropna()
    df = df.drop_duplicates()
    return df

@task
def train(df: pd.DataFrame, params: dict) -> str:
    """モデルの学習"""
    from sklearn.ensemble import RandomForestClassifier

    mlflow.set_experiment("prefect-ml-pipeline")

    with mlflow.start_run():
        model = RandomForestClassifier(**params)
        X = df.drop('target', axis=1)
        y = df['target']
        model.fit(X, y)

        mlflow.sklearn.log_model(model, "model")
        mlflow.log_params(params)

        return mlflow.active_run().info.run_id

@task
def evaluate(run_id: str) -> float:
    """モデルの評価"""
    client = mlflow.tracking.MlflowClient()
    run = client.get_run(run_id)
    return run.data.metrics.get('accuracy', 0.0)

@flow(name="ml-training-pipeline")
def ml_pipeline(data_path: str, params: dict = None):
    """メインパイプライン"""
    if params is None:
        params = {"n_estimators": 100, "max_depth": 5}

    df = load_data(data_path)
    processed_df = preprocess(df)
    run_id = train(processed_df, params)
    accuracy = evaluate(run_id)

    print(f"Pipeline complete. Accuracy: {accuracy:.4f}")
    return accuracy

if __name__ == "__main__":
    ml_pipeline(
        data_path="data/train.parquet",
        params={"n_estimators": 200, "max_depth": 7}
    )

5. モデルレジストリ

5.1 MLflowモデルレジストリ

モデルレジストリは、モデルバージョンの一元管理と、Staging/Productionの状態追跡を提供します。

import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# モデルの登録
model_uri = "runs:/abc123def456/model"
registered_model = mlflow.register_model(model_uri, "MyClassifier")

# バージョン情報の確認
print(f"Version: {registered_model.version}")
print(f"Status: {registered_model.status}")

# モデルバージョンのメタデータ更新
client.update_model_version(
    name="MyClassifier",
    version=registered_model.version,
    description="Random Forest with 200 estimators, accuracy 0.94"
)

# タグの追加
client.set_model_version_tag(
    name="MyClassifier",
    version=registered_model.version,
    key="validated_by",
    value="data-science-team"
)

5.2 StagingからProductionへの昇格

# Stagingへの移行
client.transition_model_version_stage(
    name="MyClassifier",
    version=1,
    stage="Staging",
    archive_existing_versions=False
)

# 検証後にProductionへ昇格
def promote_to_production(model_name: str, version: int, min_accuracy: float = 0.90):
    """モデルをProductionに昇格"""
    model_version = client.get_model_version(model_name, version)
    run_id = model_version.run_id
    run = client.get_run(run_id)

    accuracy = run.data.metrics.get('accuracy', 0)

    if accuracy < min_accuracy:
        raise ValueError(
            f"Model accuracy {accuracy:.4f} is below minimum {min_accuracy}"
        )

    # Productionへの昇格(既存のProductionバージョンをアーカイブ)
    client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage="Production",
        archive_existing_versions=True
    )

    print(f"Model {model_name} v{version} promoted to Production!")
    print(f"Accuracy: {accuracy:.4f}")

# Productionモデルの読み込み
production_model = mlflow.pyfunc.load_model(
    model_uri=f"models:/MyClassifier/Production"
)
predictions = production_model.predict(X_test)

5.3 モデルサービング(MLflow組み込み)

# MLflowモデルのサービング
mlflow models serve \
  -m "models:/MyClassifier/Production" \
  --host 0.0.0.0 \
  --port 5001

# 予測リクエスト
curl -X POST http://localhost:5001/invocations \
  -H "Content-Type: application/json" \
  -d '{"dataframe_split": {"columns": ["feature1", "feature2"], "data": [[1.0, 2.0]]}}'

6. コンテナ化(Docker)

6.1 ML環境のDockerization

# Dockerfile.train - 学習用イメージ
FROM python:3.11-slim

WORKDIR /app

RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    git \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src/ ./src/
COPY params.yaml .
COPY dvc.yaml .

CMD ["python", "src/train.py"]

6.2 マルチステージビルド

# Dockerfile.serve - サービング用マルチステージビルド
# ステージ1: ビルダー
FROM python:3.11 AS builder

WORKDIR /build

COPY requirements.txt .
RUN pip install --no-cache-dir --target /install -r requirements.txt

# ステージ2: 最終スリムイメージ
FROM python:3.11-slim AS runtime

WORKDIR /app

COPY --from=builder /install /usr/local/lib/python3.11/site-packages

COPY src/serve.py .
COPY models/ ./models/

EXPOSE 8080

RUN useradd -m -u 1000 mluser
USER mluser

CMD ["python", "serve.py"]

6.3 GPU Docker(nvidia-docker)

# Dockerfile.gpu - GPU対応イメージ
FROM nvidia/cuda:12.1.0-cudnn8-devel-ubuntu22.04

ENV DEBIAN_FRONTEND=noninteractive

RUN apt-get update && apt-get install -y \
    python3.11 \
    python3-pip \
    git \
    && rm -rf /var/lib/apt/lists/*

RUN pip3 install torch torchvision torchaudio \
    --index-url https://download.pytorch.org/whl/cu121

WORKDIR /app
COPY requirements-gpu.txt .
RUN pip3 install --no-cache-dir -r requirements-gpu.txt

COPY src/ ./src/

CMD ["python3", "src/train_gpu.py"]
# GPUコンテナの実行
docker run --gpus all \
  -v /data:/data \
  -v /models:/models \
  --shm-size=8gb \
  my-ml-gpu:latest

6.4 Docker ComposeによるMLスタック

# docker-compose.yml
version: '3.8'

services:
  mlflow:
    image: ghcr.io/mlflow/mlflow:latest
    command: >
      mlflow server
      --backend-store-uri postgresql://mlflow:password@postgres:5432/mlflow
      --default-artifact-root s3://my-bucket/mlflow
      --host 0.0.0.0
      --port 5000
    ports:
      - '5000:5000'
    depends_on:
      - postgres

  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: mlflow
      POSTGRES_USER: mlflow
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data

  minio:
    image: minio/minio:latest
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    ports:
      - '9000:9000'
      - '9001:9001'
    volumes:
      - minio_data:/data

  trainer:
    build:
      context: .
      dockerfile: Dockerfile.train
    environment:
      MLFLOW_TRACKING_URI: http://mlflow:5000
    volumes:
      - ./data:/app/data
      - ./models:/app/models
    depends_on:
      - mlflow
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

volumes:
  postgres_data:
  minio_data:

6.5 実例:PyTorchモデルサーバーのDockerfile

# Dockerfile.pytorch-serve
FROM python:3.11-slim

WORKDIR /app

RUN pip install --no-cache-dir \
    torch==2.2.0+cpu \
    torchvision \
    fastapi \
    uvicorn \
    pydantic \
    numpy \
    pillow \
    --index-url https://download.pytorch.org/whl/cpu

COPY src/server.py .
COPY models/model.pt .

EXPOSE 8000

CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]
# src/server.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import torch
import numpy as np
from typing import List

app = FastAPI(title="ML Model Server")

# サーバー起動時にモデルを一度読み込む
model = torch.jit.load("model.pt")
model.eval()

class PredictionRequest(BaseModel):
    features: List[List[float]]

class PredictionResponse(BaseModel):
    predictions: List[int]
    probabilities: List[List[float]]

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    try:
        tensor = torch.tensor(request.features, dtype=torch.float32)

        with torch.no_grad():
            outputs = model(tensor)
            probabilities = torch.softmax(outputs, dim=1)
            predictions = torch.argmax(probabilities, dim=1)

        return PredictionResponse(
            predictions=predictions.tolist(),
            probabilities=probabilities.tolist()
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

7. ML向けKubernetes

7.1 MLワークロード向けK8sの基礎

# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: ml-platform
  labels:
    app: ml-platform
    environment: production

7.2 学習ジョブ

# k8s/training-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: ml-training-job
  namespace: ml-platform
spec:
  backoffLimit: 3
  template:
    spec:
      restartPolicy: OnFailure

      nodeSelector:
        accelerator: nvidia-t4

      tolerations:
        - key: nvidia.com/gpu
          operator: Exists
          effect: NoSchedule

      containers:
        - name: trainer
          image: my-registry/ml-trainer:v1.2.0

          resources:
            requests:
              memory: '8Gi'
              cpu: '4'
              nvidia.com/gpu: '1'
            limits:
              memory: '16Gi'
              cpu: '8'
              nvidia.com/gpu: '1'

          env:
            - name: MLFLOW_TRACKING_URI
              value: 'http://mlflow-service.ml-platform:5000'
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                secretKeyRef:
                  name: aws-credentials
                  key: access_key_id

          volumeMounts:
            - name: training-data
              mountPath: /data
            - name: dshm
              mountPath: /dev/shm

      volumes:
        - name: training-data
          persistentVolumeClaim:
            claimName: training-data-pvc
        - name: dshm
          emptyDir:
            medium: Memory
            sizeLimit: 8Gi

7.3 定期再学習のCronJob

# k8s/training-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: daily-retraining
  namespace: ml-platform
spec:
  schedule: '0 2 * * *' # 毎日午前2時
  concurrencyPolicy: Forbid
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3

  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: OnFailure
          containers:
            - name: retrainer
              image: my-registry/ml-trainer:latest
              command: ['python', 'src/retrain.py']
              resources:
                requests:
                  memory: '4Gi'
                  cpu: '2'
                limits:
                  memory: '8Gi'
                  cpu: '4'

7.4 HPAを使用したモデルサービングDeployment

# k8s/model-serving-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: model-server
  namespace: ml-platform
spec:
  replicas: 3
  selector:
    matchLabels:
      app: model-server

  template:
    metadata:
      labels:
        app: model-server
    spec:
      containers:
        - name: model-server
          image: my-registry/model-server:v1.0.0
          ports:
            - containerPort: 8000

          resources:
            requests:
              memory: '2Gi'
              cpu: '1'
            limits:
              memory: '4Gi'
              cpu: '2'

          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10

          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 15
            periodSeconds: 5

---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: model-server-hpa
  namespace: ml-platform
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: model-server
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70

8. Kubeflow

8.1 Kubeflowの概要

KubeflowはKubernetes上でMLワークフローを管理するオープンソースプラットフォームです。以下のコンポーネントで構成されています:

  • Pipelines: MLワークフローオーケストレーション
  • Notebooks: マネージドJupyterノートブック
  • Katib: ハイパーパラメータチューニング
  • KServe: モデルサービング
  • Training Operator: 分散学習

8.2 Kubeflowパイプライン

# kubeflow_pipeline.py
import kfp
from kfp import dsl

@dsl.component(base_image="python:3.11", packages_to_install=["pandas", "scikit-learn"])
def prepare_data(data_path: str, output_path: kfp.dsl.OutputPath(str)):
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_parquet(data_path)
    train, test = train_test_split(df, test_size=0.2)
    train.to_parquet(f"{output_path}/train.parquet")
    test.to_parquet(f"{output_path}/test.parquet")

@dsl.component(
    base_image="python:3.11",
    packages_to_install=["scikit-learn", "mlflow", "pandas"]
)
def train_model(
    data_path: str,
    n_estimators: int,
    max_depth: int,
    model_output: kfp.dsl.OutputPath(str),
    mlflow_uri: str
):
    import mlflow
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier

    mlflow.set_tracking_uri(mlflow_uri)

    df = pd.read_parquet(f"{data_path}/train.parquet")
    X = df.drop('target', axis=1)
    y = df['target']

    with mlflow.start_run():
        model = RandomForestClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth
        )
        model.fit(X, y)

        run_id = mlflow.active_run().info.run_id
        mlflow.sklearn.log_model(model, "model")

    with open(model_output, 'w') as f:
        f.write(run_id)

@dsl.pipeline(
    name="ML Training Pipeline",
    description="End-to-end ML training pipeline"
)
def ml_pipeline(
    data_path: str = "gs://my-bucket/data/train.parquet",
    n_estimators: int = 100,
    max_depth: int = 5,
    mlflow_uri: str = "http://mlflow:5000"
):
    prepare_task = prepare_data(data_path=data_path)

    train_task = train_model(
        data_path=prepare_task.output,
        n_estimators=n_estimators,
        max_depth=max_depth,
        mlflow_uri=mlflow_uri
    )

if __name__ == "__main__":
    kfp.compiler.Compiler().compile(
        pipeline_func=ml_pipeline,
        package_path="ml_pipeline.yaml"
    )

    client = kfp.Client(host="http://kubeflow-host/pipeline")
    run = client.create_run_from_pipeline_package(
        pipeline_file="ml_pipeline.yaml",
        arguments={"n_estimators": 200, "max_depth": 7},
        run_name="training-run-001"
    )

9. ML向けCI/CD

9.1 ML向けGitHub Actions

# .github/workflows/ml-pipeline.yml
name: ML CI/CD Pipeline

on:
  push:
    branches: [main, develop]
    paths:
      - 'src/**'
      - 'params.yaml'
      - 'requirements.txt'
  pull_request:
    branches: [main]

env:
  MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}

jobs:
  test:
    name: Unit Tests
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov
      - name: Run unit tests
        run: pytest tests/unit/ -v --cov=src --cov-report=xml

  train-and-evaluate:
    name: Train and Evaluate Model
    runs-on: ubuntu-latest
    needs: test
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Run DVC pipeline
        run: dvc repro
      - name: Check model performance
        run: |
          python scripts/check_metrics.py \
            --min-accuracy 0.90 \
            --metrics-file metrics/scores.json
      - name: Register model
        run: python scripts/register_model.py

  build-and-push:
    name: Build Docker Image
    runs-on: ubuntu-latest
    needs: train-and-evaluate
    steps:
      - uses: actions/checkout@v4
      - name: Login to Container Registry
        uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - name: Build and push
        uses: docker/build-push-action@v5
        with:
          context: .
          file: Dockerfile.serve
          push: true
          tags: |
            ghcr.io/${{ github.repository }}/model-server:latest
            ghcr.io/${{ github.repository }}/model-server:${{ github.sha }}

  deploy:
    name: Deploy to Kubernetes
    runs-on: ubuntu-latest
    needs: build-and-push
    environment: production
    steps:
      - uses: actions/checkout@v4
      - name: Configure kubectl
        uses: azure/k8s-set-context@v3
        with:
          kubeconfig: ${{ secrets.KUBECONFIG }}
      - name: Deploy
        run: |
          kubectl set image deployment/model-server \
            model-server=ghcr.io/${{ github.repository }}/model-server:${{ github.sha }} \
            -n ml-platform
          kubectl rollout status deployment/model-server -n ml-platform

10. モデルモニタリング

10.1 Evidently AIを使用したデータドリフト検出

pip install evidently
import pandas as pd
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
from evidently.metrics import ColumnDriftMetric

# 参照データ(学習時のデータ)
reference_data = pd.read_parquet("data/reference.parquet")

# 現在のデータ(本番環境での予測データ)
current_data = pd.read_parquet("data/current.parquet")

# データドリフトレポート
drift_report = Report(metrics=[
    DataDriftPreset(),
    DataQualityPreset(),
    ColumnDriftMetric(column_name="age"),
    ColumnDriftMetric(column_name="income"),
])

drift_report.run(
    reference_data=reference_data,
    current_data=current_data
)

drift_report.save_html("drift_report.html")

result = drift_report.as_dict()
drift_detected = result['metrics'][0]['result']['dataset_drift']

if drift_detected:
    print("Data drift detected! Consider retraining.")
    send_alert("Data drift detected in production model")

10.2 モデルパフォーマンスモニタリング

from evidently.report import Report
from evidently.metric_preset import ClassificationPreset

prediction_data = pd.DataFrame({
    'target': y_true,
    'prediction': y_pred,
    'prediction_proba': y_proba,
    'feature1': X_test['feature1'],
    'feature2': X_test['feature2'],
})

performance_report = Report(metrics=[ClassificationPreset()])
performance_report.run(
    reference_data=reference_predictions,
    current_data=prediction_data
)

metrics = performance_report.as_dict()
current_accuracy = metrics['metrics'][0]['result']['current']['accuracy']
reference_accuracy = metrics['metrics'][0]['result']['reference']['accuracy']

degradation = reference_accuracy - current_accuracy
if degradation > 0.05:
    trigger_retraining(f"Model degraded by {degradation:.2%}")

10.3 Prometheusによるリアルタイムモニタリング

from prometheus_client import Counter, Histogram, Gauge, start_http_server

prediction_counter = Counter('model_predictions_total', 'Total predictions')
prediction_latency = Histogram('model_prediction_duration_seconds', 'Prediction latency')
model_accuracy_gauge = Gauge('model_accuracy', 'Current model accuracy')
drift_score_gauge = Gauge('data_drift_score', 'Data drift score')

class MonitoredModelServer:
    def __init__(self, model, reference_data):
        self.model = model
        self.reference_data = reference_data
        self.predictions_buffer = []
        start_http_server(8001)

    def predict(self, features):
        with prediction_latency.time():
            prediction = self.model.predict(features)

        prediction_counter.inc()
        self.predictions_buffer.append(prediction)

        if len(self.predictions_buffer) >= 1000:
            self._run_monitoring()

        return prediction

    def _run_monitoring(self):
        current_df = pd.DataFrame(self.predictions_buffer)
        drift_report = Report(metrics=[DataDriftPreset()])
        drift_report.run(
            reference_data=self.reference_data,
            current_data=current_df
        )

        result = drift_report.as_dict()
        drift_share = result['metrics'][0]['result']['share_of_drifted_columns']
        drift_score_gauge.set(drift_share)

        self.predictions_buffer = []

11. フィーチャーストア

11.1 フィーチャーストアの概念

フィーチャーストアは、ML特徴量の一元管理と再利用を可能にするインフラです。

主要な概念:

  • オンラインストア: リアルタイム予測のための低レイテンシな特徴量取得(Redis、DynamoDB)
  • オフラインストア: バッチ学習のための履歴特徴量(S3、BigQuery)
  • フィーチャービュー: 特徴量定義とデータソースのマッピング
  • エンティティ: 特徴量の主体(ユーザーID、商品IDなど)

11.2 Feastのセットアップと使い方

pip install feast
feast init my-feature-store
cd my-feature-store
# features.py
from feast import Entity, FeatureView, FileSource, ValueType, Field
from feast.types import Float64, Int64
from datetime import timedelta

user = Entity(
    name="user_id",
    value_type=ValueType.INT64,
    description="User ID"
)

user_stats_source = FileSource(
    path="data/user_stats.parquet",
    timestamp_field="event_timestamp",
)

user_stats_fv = FeatureView(
    name="user_statistics",
    entities=[user],
    ttl=timedelta(days=30),
    schema=[
        Field(name="purchase_count_7d", dtype=Int64),
        Field(name="purchase_amount_7d", dtype=Float64),
        Field(name="avg_session_duration", dtype=Float64),
        Field(name="last_purchase_days_ago", dtype=Int64),
    ],
    online=True,
    source=user_stats_source,
)
# feast_usage.py
from feast import FeatureStore
import pandas as pd
from datetime import datetime

store = FeatureStore(repo_path=".")

# オフライン特徴量の取得(学習用)
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003],
    "event_timestamp": pd.to_datetime(["2026-01-01", "2026-01-02", "2026-01-03"])
})

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_statistics:purchase_count_7d",
        "user_statistics:purchase_amount_7d",
        "user_statistics:avg_session_duration",
    ]
).to_df()

# オンラインストアへのマテリアライズ
store.materialize_incremental(end_date=datetime.now())

# オンライン特徴量の取得(リアルタイム推論用)
online_features = store.get_online_features(
    features=[
        "user_statistics:purchase_count_7d",
        "user_statistics:purchase_amount_7d",
    ],
    entity_rows=[
        {"user_id": 1001},
        {"user_id": 1002},
    ]
).to_dict()

12. 実世界のMLOpsプロジェクト

12.1 完全なパイプラインアーキテクチャ

データソース(DBS3APIデータ収集(Airflow DAGデータ検証(Great Expectations)
特徴量エンジニアリング(Feast)
モデル学習(MLflowトラッキング)
モデル評価(自動検証)
モデルレジストリ(MLflow Registry)
CI/CD(GitHub Actions)
コンテナビルド(Docker)
K8sデプロイ(Kubernetes)
サービング(FastAPI + Triton)
モニタリング(Evidently + Prometheus + Grafana)
アラート(ドリフト検出時に再学習をトリガー)

12.2 実例:顧客チャーン予測システム

# scripts/full_pipeline.py
import mlflow
import pandas as pd
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score, roc_auc_score
from sklearn.preprocessing import StandardScaler
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ChurnPredictionPipeline:
    """顧客チャーン予測パイプライン"""

    def __init__(self, mlflow_uri: str = "http://localhost:5000"):
        mlflow.set_tracking_uri(mlflow_uri)
        mlflow.set_experiment("churn-prediction")
        self.scaler = StandardScaler()

    def load_data(self, data_path: str) -> pd.DataFrame:
        logger.info(f"Loading data from {data_path}")
        df = pd.read_parquet(data_path)

        assert df.shape[0] > 1000, "Too few samples"
        assert 'churn' in df.columns, "Target column 'churn' missing"

        missing_rate = df.isnull().mean()
        high_missing = missing_rate[missing_rate > 0.5].index.tolist()
        if high_missing:
            logger.warning(f"Columns with >50% missing: {high_missing}")
            df = df.drop(columns=high_missing)

        return df

    def engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
        logger.info("Engineering features...")

        numeric_cols = df.select_dtypes(include=[np.number]).columns
        df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].median())

        if 'tenure_months' in df.columns and 'monthly_charges' in df.columns:
            df['total_value'] = df['tenure_months'] * df['monthly_charges']

        if 'num_support_tickets' in df.columns and 'tenure_months' in df.columns:
            df['tickets_per_month'] = (
                df['num_support_tickets'] / (df['tenure_months'] + 1)
            )

        return df

    def train(self, df: pd.DataFrame, params: dict = None) -> str:
        if params is None:
            params = {
                "n_estimators": 200,
                "max_depth": 5,
                "learning_rate": 0.05,
                "subsample": 0.8,
                "random_state": 42,
            }

        feature_cols = [c for c in df.columns if c != 'churn']
        X = df[feature_cols]
        y = df['churn']

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        with mlflow.start_run(run_name="churn-gbt"):
            mlflow.log_params(params)
            mlflow.log_param("train_size", len(X_train))
            mlflow.log_param("n_features", len(feature_cols))

            model = GradientBoostingClassifier(**params)
            model.fit(X_train_scaled, y_train)

            y_pred = model.predict(X_test_scaled)
            y_proba = model.predict_proba(X_test_scaled)[:, 1]

            accuracy = accuracy_score(y_test, y_pred)
            auc_roc = roc_auc_score(y_test, y_proba)

            mlflow.log_metrics({
                "accuracy": accuracy,
                "auc_roc": auc_roc,
            })

            cv_scores = cross_val_score(
                model, X_train_scaled, y_train, cv=5, scoring='roc_auc'
            )
            mlflow.log_metric("cv_auc_mean", cv_scores.mean())
            mlflow.log_metric("cv_auc_std", cv_scores.std())

            mlflow.sklearn.log_model(
                model,
                "model",
                registered_model_name="ChurnPredictor"
            )

            run_id = mlflow.active_run().info.run_id
            logger.info(f"Training complete. AUC-ROC: {auc_roc:.4f}")
            return run_id

    def promote_best_model(self, min_auc: float = 0.85):
        client = mlflow.tracking.MlflowClient()

        experiment = client.get_experiment_by_name("churn-prediction")
        runs = client.search_runs(
            experiment_ids=[experiment.experiment_id],
            filter_string="metrics.auc_roc > 0.80",
            order_by=["metrics.auc_roc DESC"],
            max_results=1
        )

        if not runs:
            raise ValueError("No runs found meeting criteria")

        best_run = runs[0]
        auc = best_run.data.metrics['auc_roc']

        if auc < min_auc:
            raise ValueError(f"Best AUC {auc:.4f} below minimum {min_auc}")

        model_uri = f"runs:/{best_run.info.run_id}/model"
        mv = mlflow.register_model(model_uri, "ChurnPredictor")

        client.transition_model_version_stage(
            name="ChurnPredictor",
            version=mv.version,
            stage="Production",
            archive_existing_versions=True
        )

        logger.info(f"Model v{mv.version} promoted to Production! AUC: {auc:.4f}")
        return mv.version

if __name__ == "__main__":
    pipeline = ChurnPredictionPipeline()
    df = pipeline.load_data("data/customers.parquet")
    df = pipeline.engineer_features(df)
    run_id = pipeline.train(df)
    pipeline.promote_best_model(min_auc=0.85)

まとめ

MLOpsは単にツールを学ぶことではなく、MLシステムを確実に、再現可能に、スケーラブルに運用するための文化とプロセスです。

以下のコア原則を心に留めておいてください:

  1. すべてをバージョン管理する: コード、データ、モデル、環境
  2. まず自動化する: 手動ステップはエラーの源
  3. 計測してモニタリングする: 計測できないものは改善できない
  4. 障害を素早く検出する: ドリフトやパフォーマンス劣化を即座にキャッチ
  5. 再現性を保証する: 誰でも、どこでも、いつでも同じ結果を再現できるようにする

MLOpsの旅は段階的です。レベル0から始め、組織のニーズに合ったペースで成熟させていきましょう。


参考資料