Skip to content

✍️ 필사 모드: MLOps & AIモデルデプロイ完全ガイド — 学習からサービング、モニタリングまで

日本語
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.

目次

  1. MLOpsとは
  2. 実験管理
  3. データパイプライン
  4. モデル学習インフラ
  5. モデルサービング
  6. LLMサービング特別編
  7. CI/CD for ML
  8. モニタリング
  9. A/Bテストとデプロイ戦略
  10. コスト最適化
  11. 実践:LLM RAGパイプライン構築

1. MLOpsとは

DevOpsとMLの融合

MLOpsはMachine LearningとOperationsの造語である。 ソフトウェアエンジニアリングのDevOps原則を機械学習ライフサイクルに適用したものだと考えればよい。

従来のソフトウェア開発ではコードだけ管理すればよかった。 しかしMLシステムではコード、データ、モデルの3つを同時に管理しなければならない。 データが変わればモデルが変わり、モデルが変わればサービング方式も変わる可能性がある。

MLライフサイクル

MLプロジェクトの全体フローは以下の通りである。

  1. 問題定義 - ビジネス要件をML問題に変換
  2. データ収集と前処理 - データパイプライン構築
  3. 特徴量エンジニアリング - 生データをモデルが理解できる形に変換
  4. モデル学習 - 様々なアルゴリズムとハイパーパラメータの実験
  5. モデル評価 - オフラインメトリクスで性能検証
  6. モデルデプロイ - プロダクション環境でサービング
  7. モニタリング - 性能追跡とドリフト検出
  8. 再学習 - 性能低下時にモデルをアップデート

このプロセスは一度きりではなく、継続的なサイクルである。

GoogleのMLOps成熟度モデル

GoogleはMLOpsの成熟度を3段階で定義している。

Level 0 - 手動プロセス

  • データサイエンティストがノートブックで手動学習
  • モデルデプロイが手動で不定期
  • モニタリングなし
  • ほとんどのチームがこの段階

Level 1 - MLパイプライン自動化

  • 学習パイプラインが自動化済み
  • 継続的学習(Continuous Training)を実装
  • 実験追跡システム導入
  • モデルレジストリを使用

Level 2 - CI/CDパイプライン自動化

  • パイプライン自体のCI/CDを実装
  • 自動化されたテストと検証
  • Feature Storeの活用
  • 完全なモニタリングとアラート体制

ほとんどの組織はLevel 0からLevel 1に移行するだけでも大きな価値を得られる。


2. 実験管理

MLflow - オープンソースMLプラットフォーム

MLflowはML実験管理の事実上の標準である。 4つのコアコンポーネントで構成されている。

MLflow Tracking - 実験記録

import mlflow

mlflow.set_experiment("recommendation-model-v2")

with mlflow.start_run():
    # ハイパーパラメータの記録
    mlflow.log_param("learning_rate", 0.001)
    mlflow.log_param("batch_size", 64)
    mlflow.log_param("epochs", 50)

    # 学習実行
    model = train_model(lr=0.001, batch_size=64, epochs=50)

    # メトリクスの記録
    mlflow.log_metric("accuracy", 0.923)
    mlflow.log_metric("f1_score", 0.891)
    mlflow.log_metric("auc_roc", 0.956)

    # モデル保存
    mlflow.pytorch.log_model(model, "model")

MLflow Models - モデルパッケージング

# モデルレジストリに登録
mlflow.register_model(
    "runs:/abc123/model",
    "recommendation-model"
)

# ステージ遷移
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
    name="recommendation-model",
    version=3,
    stage="Production"
)

Weights and Biases

W&Bは商用の実験管理プラットフォームで、可視化機能が優れている。

import wandb

wandb.init(project="image-classification", config={
    "learning_rate": 0.001,
    "architecture": "ResNet50",
    "dataset": "imagenet-subset",
    "epochs": 100,
})

for epoch in range(100):
    train_loss, val_loss = train_one_epoch(model)
    wandb.log({
        "train_loss": train_loss,
        "val_loss": val_loss,
        "epoch": epoch
    })

Optuna - ハイパーパラメータ最適化

手動でハイパーパラメータを調整するのは非効率的である。 Optunaはベイズ最適化ベースの自動探索を行う。

import optuna

def objective(trial):
    lr = trial.suggest_float("lr", 1e-5, 1e-1, log=True)
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64, 128])
    n_layers = trial.suggest_int("n_layers", 1, 5)
    dropout = trial.suggest_float("dropout", 0.1, 0.5)

    model = create_model(n_layers=n_layers, dropout=dropout)
    accuracy = train_and_evaluate(model, lr=lr, batch_size=batch_size)

    return accuracy

study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=100)

print(f"Best accuracy: {study.best_trial.value}")
print(f"Best params: {study.best_trial.params}")

3. データパイプライン

Feature Store

Feature Storeは、ML特徴量の中央リポジトリである。 学習とサービングで同じ特徴量を使えるようにする。

Feastの例

from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo/")

# 特徴量定義
from feast import Entity, FeatureView, Field
from feast.types import Float32, Int64

user = Entity(name="user_id", join_keys=["user_id"])

user_features = FeatureView(
    name="user_features",
    entities=[user],
    schema=[
        Field(name="total_purchases", dtype=Int64),
        Field(name="avg_order_value", dtype=Float32),
        Field(name="days_since_last_order", dtype=Int64),
    ],
    source=user_source,
)

# 学習データの取得
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:total_purchases",
        "user_features:avg_order_value",
        "user_features:days_since_last_order",
    ],
).to_df()

# リアルタイムサービング用クエリ
online_features = store.get_online_features(
    features=[
        "user_features:total_purchases",
        "user_features:avg_order_value",
    ],
    entity_rows=[{"user_id": 12345}],
).to_dict()

DVC - データバージョニング

DVC(Data Version Control)は、Gitでデータのバージョン管理を可能にする。

# DVC初期化
dvc init

# データファイルの追跡
dvc add data/training_data.csv

# リモートストレージ設定
dvc remote add -d myremote s3://my-bucket/dvc-storage

# データのプッシュ
dvc push

# 特定バージョンのデータを取得
git checkout v1.0
dvc pull

データ検証 - Great Expectations

データ品質の検証はパイプラインの中核である。

import great_expectations as gx

context = gx.get_context()

# 期待値の定義
validator = context.sources.pandas_default.read_csv(
    "data/training_data.csv"
)
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=150)
validator.expect_column_values_to_be_in_set("country", ["KR", "US", "JP", "UK"])

# 検証実行
results = validator.validate()
if not results.success:
    raise ValueError("Data validation failed!")

4. モデル学習インフラ

GPUクラスター構成

大規模モデルの学習にはGPUクラスターが必須である。

Kubernetes + GPUノードプール

apiVersion: v1
kind: Pod
metadata:
  name: training-job
spec:
  containers:
    - name: trainer
      image: training-image:latest
      resources:
        limits:
          nvidia.com/gpu: 4
      volumeMounts:
        - name: dataset
          mountPath: /data
  nodeSelector:
    gpu-type: a100
  tolerations:
    - key: "nvidia.com/gpu"
      operator: "Exists"
      effect: "NoSchedule"

分散学習

単一GPUでは対応できないモデルには分散学習が必要である。

PyTorch DDP(Distributed Data Parallel)

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def setup(rank, world_size):
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

def train(rank, world_size):
    setup(rank, world_size)

    model = MyModel().to(rank)
    model = DDP(model, device_ids=[rank])

    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(num_epochs):
        sampler.set_epoch(epoch)
        for batch in dataloader:
            loss = model(batch)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

    dist.destroy_process_group()

Kubeflow Training Operator

Kubeflowを使えば、Kubernetes上で分散学習を宣言的に管理できる。

apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: pytorch-training
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      template:
        spec:
          containers:
            - name: pytorch
              image: my-training:latest
              resources:
                limits:
                  nvidia.com/gpu: 1
    Worker:
      replicas: 3
      template:
        spec:
          containers:
            - name: pytorch
              image: my-training:latest
              resources:
                limits:
                  nvidia.com/gpu: 1

5. モデルサービング

サービングフレームワーク比較

フレームワーク長所短所適するケース
TorchServePyTorchネイティブ、管理が簡単性能に限界小規模PyTorchモデル
Tritonマルチフレームワーク、高性能設定が複雑大規模プロダクション
vLLMLLM特化、PagedAttentionLLM専用LLMサービング
FastAPI柔軟性が最高手動実装が必要カスタムロジック必要時

TorchServe

# モデルアーカイブ作成
torch-model-archiver \
  --model-name resnet50 \
  --version 1.0 \
  --model-file model.py \
  --serialized-file model.pth \
  --handler image_classifier

# サーバー起動
torchserve --start \
  --model-store model_store \
  --models resnet50=resnet50.mar

NVIDIA Triton Inference Server

Tritonは複数のフレームワークのモデルを同時にサービングできる。

# モデルリポジトリ構造
model_repository/
  resnet50/
    config.pbtxt
    1/
      model.onnx
  bert_base/
    config.pbtxt
    1/
      model.plan

config.pbtxt設定

name: "resnet50"
platform: "onnxruntime_onnx"
max_batch_size: 32
input [
  {
    name: "input"
    data_type: TYPE_FP32
    dims: [3, 224, 224]
  }
]
output [
  {
    name: "output"
    data_type: TYPE_FP32
    dims: [1000]
  }
]
dynamic_batching {
  preferred_batch_size: [8, 16, 32]
  max_queue_delay_microseconds: 100
}
instance_group [
  {
    count: 2
    kind: KIND_GPU
  }
]

FastAPIラッピング

シンプルなモデルサービングにはFastAPIが適している。

from fastapi import FastAPI
import torch
from pydantic import BaseModel

app = FastAPI()

# モデルロード(アプリ起動時に1回)
model = torch.jit.load("model.pt")
model.eval()

class PredictionRequest(BaseModel):
    features: list[float]

class PredictionResponse(BaseModel):
    prediction: float
    confidence: float

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    tensor = torch.tensor([request.features])
    with torch.no_grad():
        output = model(tensor)
    prob = torch.softmax(output, dim=1)
    prediction = torch.argmax(prob, dim=1).item()
    confidence = prob[0][prediction].item()

    return PredictionResponse(
        prediction=prediction,
        confidence=confidence
    )

@app.get("/health")
async def health():
    return {"status": "healthy"}

6. LLMサービング特別編

vLLM - 高性能LLMサービング

vLLMはPagedAttention技術を使用してLLMサービング性能を最大化する。

from vllm import LLM, SamplingParams

# モデルロード
llm = LLM(
    model="meta-llama/Llama-3.1-8B-Instruct",
    tensor_parallel_size=2,
    gpu_memory_utilization=0.9,
    max_model_len=8192,
)

# 推論
sampling_params = SamplingParams(
    temperature=0.7,
    top_p=0.9,
    max_tokens=512,
)

outputs = llm.generate(
    ["Explain MLOps in simple terms."],
    sampling_params,
)
print(outputs[0].outputs[0].text)

vLLM OpenAI互換サーバー

python -m vllm.entrypoints.openai.api_server \
  --model meta-llama/Llama-3.1-8B-Instruct \
  --tensor-parallel-size 2 \
  --port 8000
# OpenAI SDKで呼び出し可能
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy")

response = client.chat.completions.create(
    model="meta-llama/Llama-3.1-8B-Instruct",
    messages=[
        {"role": "user", "content": "What is MLOps?"}
    ],
    max_tokens=256,
)

Text Generation Inference (TGI)

HuggingFaceのTGIはRustベースで安定性が高い。

docker run --gpus all \
  -p 8080:80 \
  -v /data:/data \
  ghcr.io/huggingface/text-generation-inference:latest \
  --model-id meta-llama/Llama-3.1-8B-Instruct \
  --quantize gptq \
  --max-input-length 4096 \
  --max-total-tokens 8192

量子化技法の比較

LLMデプロイ時にモデルサイズを削減する量子化は必須である。

技法サイズ削減品質低下速度向上備考
GPTQ4倍低い2-3倍GPU最適化、精度良好
AWQ4倍非常に低い2-3倍Activation-aware、GPTQより高品質
GGUF2-8倍可変CPU可能llama.cpp用、CPU/GPUハイブリッド
BitsAndBytes2-4倍低い1.5倍適用簡単、QLoRA学習可能
# BitsAndBytes 4ビット量子化
from transformers import AutoModelForCausalLM, BitsAndBytesConfig

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
)

model = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Llama-3.1-70B-Instruct",
    quantization_config=quantization_config,
    device_map="auto",
)

Ollama - ローカルLLM

開発環境や小規模デプロイにはOllamaが便利である。

# モデルダウンロードと実行
ollama pull llama3.1
ollama run llama3.1

# APIサーバーモード
ollama serve
import requests

response = requests.post("http://localhost:11434/api/generate", json={
    "model": "llama3.1",
    "prompt": "Explain MLOps briefly.",
    "stream": False,
})
print(response.json()["response"])

7. CI/CD for ML

モデルテスト戦略

MLモデルには、一般的なソフトウェアとは異なるテストが必要である。

import pytest

class TestModelQuality:
    """モデル品質テスト"""

    def test_accuracy_threshold(self, model, test_data):
        """精度が基準値以上かを確認"""
        accuracy = evaluate(model, test_data)
        assert accuracy >= 0.90, f"Accuracy {accuracy} below threshold 0.90"

    def test_latency(self, model, sample_input):
        """推論レイテンシの確認"""
        import time
        start = time.time()
        model.predict(sample_input)
        latency = time.time() - start
        assert latency < 0.1, f"Latency {latency}s exceeds 100ms"

    def test_no_bias(self, model, fairness_data):
        """公平性検証"""
        results_group_a = model.predict(fairness_data["group_a"])
        results_group_b = model.predict(fairness_data["group_b"])
        disparity = abs(results_group_a.mean() - results_group_b.mean())
        assert disparity < 0.05, f"Bias detected: disparity={disparity}"

    def test_model_size(self, model_path):
        """モデルサイズ制限"""
        import os
        size_mb = os.path.getsize(model_path) / (1024 * 1024)
        assert size_mb < 500, f"Model size {size_mb}MB exceeds 500MB limit"

GitHub ActionsでMLパイプライン

name: ML Pipeline

on:
  push:
    paths:
      - "src/**"
      - "data/**"
      - "configs/**"

jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Validate data
        run: python scripts/validate_data.py

  train:
    needs: data-validation
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v4
      - name: Train model
        run: python scripts/train.py --config configs/production.yaml
      - name: Upload model artifact
        uses: actions/upload-artifact@v4
        with:
          name: model
          path: outputs/model/

  evaluate:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Download model
        uses: actions/download-artifact@v4
        with:
          name: model
      - name: Run evaluation
        run: python scripts/evaluate.py
      - name: Check quality gates
        run: python scripts/quality_gate.py

  deploy:
    needs: evaluate
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to staging
        run: ./scripts/deploy.sh staging
      - name: Smoke test
        run: python scripts/smoke_test.py
      - name: Deploy to production
        run: ./scripts/deploy.sh production

データ検証パイプライン

def validate_training_data(data_path: str) -> bool:
    """学習データの品質を検証するパイプライン"""

    df = pd.read_parquet(data_path)

    checks = [
        # データサイズ確認
        len(df) >= 10000,
        # 必須カラムの存在
        all(col in df.columns for col in REQUIRED_COLUMNS),
        # null率の確認
        df.isnull().mean().max() < 0.05,
        # ラベル分布の確認
        df["label"].value_counts(normalize=True).min() > 0.1,
        # 重複チェック
        df.duplicated().mean() < 0.01,
    ]

    if not all(checks):
        failed = [i for i, c in enumerate(checks) if not c]
        raise ValueError(f"Data validation failed at checks: {failed}")

    return True

8. モニタリング

モデル性能モニタリング

デプロイ後のモデル性能は時間経過とともに低下する可能性がある。

from prometheus_client import Gauge, Histogram, Counter

# メトリクス定義
prediction_latency = Histogram(
    "model_prediction_latency_seconds",
    "Prediction latency in seconds",
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)

prediction_count = Counter(
    "model_prediction_total",
    "Total number of predictions",
    ["model_version", "status"]
)

model_accuracy = Gauge(
    "model_accuracy",
    "Current model accuracy",
    ["model_name"]
)

# 使用
import time

def predict_with_monitoring(model, input_data):
    start = time.time()
    try:
        result = model.predict(input_data)
        prediction_count.labels(
            model_version="v2.1",
            status="success"
        ).inc()
        return result
    except Exception as e:
        prediction_count.labels(
            model_version="v2.1",
            status="error"
        ).inc()
        raise
    finally:
        latency = time.time() - start
        prediction_latency.observe(latency)

データドリフト検出

入力データの分布が変化すると、モデル性能が低下する。

from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset

# ドリフトレポート生成
report = Report(metrics=[
    DataDriftPreset(),
    TargetDriftPreset(),
])

column_mapping = ColumnMapping(
    target="label",
    prediction="prediction",
    numerical_features=["feature_1", "feature_2", "feature_3"],
    categorical_features=["category", "region"],
)

report.run(
    reference_data=training_data,
    current_data=production_data,
    column_mapping=column_mapping,
)

# ドリフト検出結果の確認
drift_results = report.as_dict()
if drift_results["metrics"][0]["result"]["dataset_drift"]:
    trigger_retraining_pipeline()

コンセプトドリフト

データ分布だけでなく、入力と出力の関係自体が変わることもある。 例えば、コロナ前後では消費パターンが完全に変わった。

class ConceptDriftDetector:
    """PSI(Population Stability Index)ベースのドリフト検出"""

    def __init__(self, threshold=0.2):
        self.threshold = threshold

    def calculate_psi(self, expected, actual, bins=10):
        """PSI計算"""
        breakpoints = np.percentile(expected, np.linspace(0, 100, bins + 1))
        expected_counts = np.histogram(expected, breakpoints)[0] / len(expected)
        actual_counts = np.histogram(actual, breakpoints)[0] / len(actual)

        # ゼロ除算防止
        expected_counts = np.clip(expected_counts, 1e-6, None)
        actual_counts = np.clip(actual_counts, 1e-6, None)

        psi = np.sum(
            (actual_counts - expected_counts) *
            np.log(actual_counts / expected_counts)
        )
        return psi

    def check_drift(self, reference_predictions, current_predictions):
        psi = self.calculate_psi(reference_predictions, current_predictions)

        if psi > self.threshold:
            return {"drift_detected": True, "psi": psi, "action": "retrain"}
        elif psi > self.threshold / 2:
            return {"drift_detected": False, "psi": psi, "action": "monitor"}
        else:
            return {"drift_detected": False, "psi": psi, "action": "none"}

Grafanaダッシュボード構成

モニタリングメトリクスの可視化にはGrafanaが適している。 Prometheusで収集したメトリクスをダッシュボードとして構成できる。

主要モニタリングパネル構成:

  • リクエストスループット:毎秒の予測リクエスト数
  • レイテンシ分布:p50、p95、p99レイテンシ
  • エラー率:予測失敗率
  • モデル精度:リアルタイム性能メトリクス
  • データドリフトスコア:PSI、KLダイバージェンス
  • リソース使用量:GPUメモリ、CPU、ネットワーク

9. A/Bテストとデプロイ戦略

カナリアデプロイ

新しいモデルを少量のトラフィックにのみ先に適用する戦略である。

# Istio VirtualServiceでトラフィック分割
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: model-serving
spec:
  hosts:
    - model-service
  http:
    - match:
        - headers:
            canary:
              exact: "true"
      route:
        - destination:
            host: model-service
            subset: v2
    - route:
        - destination:
            host: model-service
            subset: v1
          weight: 90
        - destination:
            host: model-service
            subset: v2
          weight: 10

シャドーモード

新しいモデルが実際のトラフィックで推論を行うが、結果は保存のみでユーザーには返さない。 既存モデルの結果と比較して安全に検証できる。

class ShadowModelRouter:
    def __init__(self, primary_model, shadow_model):
        self.primary = primary_model
        self.shadow = shadow_model

    async def predict(self, input_data):
        # プライマリモデルの結果(ユーザーに返す)
        primary_result = await self.primary.predict(input_data)

        # シャドーモデルの結果(比較用に保存)
        try:
            shadow_result = await self.shadow.predict(input_data)
            await self.log_comparison(
                input_data, primary_result, shadow_result
            )
        except Exception:
            pass  # シャドーモデルのエラーは無視

        return primary_result

    async def log_comparison(self, input_data, primary, shadow):
        comparison = {
            "timestamp": datetime.utcnow().isoformat(),
            "input_hash": hash(str(input_data)),
            "primary_prediction": primary,
            "shadow_prediction": shadow,
            "agreement": primary == shadow,
        }
        await self.metrics_store.insert(comparison)

マルチアームドバンディット

A/Bテストより効率的な動的トラフィック配分方法である。 性能の良いモデルに自動的により多くのトラフィックを配分する。

import numpy as np

class ThompsonSamplingRouter:
    """トンプソンサンプリングベースのモデルルーター"""

    def __init__(self, model_names):
        self.models = model_names
        # ベータ分布のパラメータ(alpha, beta)
        self.successes = {name: 1 for name in model_names}
        self.failures = {name: 1 for name in model_names}

    def select_model(self):
        """トンプソンサンプリングでモデルを選択"""
        samples = {}
        for name in self.models:
            samples[name] = np.random.beta(
                self.successes[name],
                self.failures[name]
            )
        return max(samples, key=samples.get)

    def update(self, model_name, success: bool):
        """結果に基づいて分布を更新"""
        if success:
            self.successes[model_name] += 1
        else:
            self.failures[model_name] += 1

    def get_allocation(self):
        """現在のトラフィック比率を確認"""
        total = sum(self.successes[m] + self.failures[m] for m in self.models)
        return {
            m: (self.successes[m] + self.failures[m]) / total
            for m in self.models
        }

10. コスト最適化

スポットインスタンスの活用

学習ジョブは中断可能であるため、スポットインスタンスで70%以上のコスト削減が可能である。

# AWS SageMakerスポット学習
import sagemaker

estimator = sagemaker.estimator.Estimator(
    image_uri="my-training-image:latest",
    role="arn:aws:iam::ACCOUNT:role/SageMakerRole",
    instance_count=4,
    instance_type="ml.p4d.24xlarge",
    use_spot_instances=True,
    max_wait=7200,  # 最大待機時間
    max_run=3600,   # 最大実行時間
    checkpoint_s3_uri="s3://bucket/checkpoints/",
)

estimator.fit({"training": "s3://bucket/data/"})

鍵はチェックポイントの保存である。スポットインスタンスが中断されてもチェックポイントから再開できなければならない。

モデル軽量化

プロダクションサービングコストを削減する方法を紹介する。

知識蒸留(Knowledge Distillation)

class DistillationTrainer:
    def __init__(self, teacher, student, temperature=3.0, alpha=0.5):
        self.teacher = teacher
        self.student = student
        self.temperature = temperature
        self.alpha = alpha

    def distillation_loss(self, student_logits, teacher_logits, labels):
        # ソフトターゲットロス(KDロス)
        soft_targets = F.softmax(teacher_logits / self.temperature, dim=1)
        soft_loss = F.kl_div(
            F.log_softmax(student_logits / self.temperature, dim=1),
            soft_targets,
            reduction="batchmean"
        ) * (self.temperature ** 2)

        # ハードターゲットロス
        hard_loss = F.cross_entropy(student_logits, labels)

        return self.alpha * soft_loss + (1 - self.alpha) * hard_loss

ONNX変換で推論最適化

import torch
import onnx

# PyTorch -> ONNX変換
dummy_input = torch.randn(1, 3, 224, 224)
torch.onnx.export(
    model,
    dummy_input,
    "model.onnx",
    input_names=["input"],
    output_names=["output"],
    dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},
    opset_version=17,
)

# ONNX Runtimeで推論
import onnxruntime as ort

session = ort.InferenceSession("model.onnx", providers=["CUDAExecutionProvider"])
result = session.run(None, {"input": input_array})

バッチ推論 vs リアルタイム推論

区分バッチ推論リアルタイム推論
レイテンシ分〜時間ミリ秒〜秒
スループット非常に高い中程度
コスト低い(スポットインスタンス)高い(常時稼働)
適するケースレコメンドシステム、レポートチャットボット、検索
# バッチ推論の例(Spark)
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = SparkSession.builder.getOrCreate()

# UDFでモデル推論を登録
@udf("float")
def predict_udf(features):
    return float(model.predict([features])[0])

# 大規模データに対してバッチ推論
df = spark.read.parquet("s3://bucket/daily-data/")
predictions = df.withColumn("prediction", predict_udf("features"))
predictions.write.parquet("s3://bucket/predictions/")

11. 実践:LLM RAGパイプライン構築

RAG(Retrieval-Augmented Generation)はLLMの限界を克服する重要なパターンである。 外部知識ベースを検索してLLMにコンテキストとして提供する。

全体アーキテクチャ

  1. ドキュメント収集 - 様々なソースからデータ収集
  2. チャンキング - ドキュメントを適切なサイズに分割
  3. エンベディング - テキストをベクトルに変換
  4. ベクトルDB格納 - ベクトルインデックスに格納
  5. 検索 - クエリに対して類似ドキュメントを検索
  6. 生成 - 検索結果をコンテキストとしてLLMに渡す

ドキュメント収集とチャンキング

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import (
    PyPDFLoader,
    WebBaseLoader,
    NotionDirectoryLoader,
)

# 様々なソースからドキュメントをロード
pdf_docs = PyPDFLoader("docs/manual.pdf").load()
web_docs = WebBaseLoader("https://docs.example.com").load()
notion_docs = NotionDirectoryLoader("notion_export/").load()

all_docs = pdf_docs + web_docs + notion_docs

# チャンキング
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", ".", " "],
)

chunks = text_splitter.split_documents(all_docs)
print(f"Total chunks: {len(chunks)}")

エンベディングとベクトルDB

from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma

# エンベディングモデル
embedding_model = HuggingFaceEmbeddings(
    model_name="BAAI/bge-large-en-v1.5",
    model_kwargs={"device": "cuda"},
    encode_kwargs={"normalize_embeddings": True},
)

# ChromaベクトルDBに格納
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embedding_model,
    persist_directory="./chroma_db",
    collection_name="knowledge_base",
)

# 類似度検索
results = vectorstore.similarity_search(
    "How to deploy a model to production?",
    k=5,
)

RAGパイプラインの組み立て

from langchain.chains import RetrievalQA
from langchain.llms import VLLM
from langchain.prompts import PromptTemplate

# LLM設定
llm = VLLM(
    model="meta-llama/Llama-3.1-8B-Instruct",
    tensor_parallel_size=1,
    max_new_tokens=512,
    temperature=0.1,
)

# プロンプトテンプレート
prompt_template = PromptTemplate(
    template="""以下のコンテキストを参考にして質問に答えてください。
コンテキストにない情報は「わかりません」と答えてください。

コンテキスト:
{context}

質問:{question}

回答:""",
    input_variables=["context", "question"],
)

# RAGチェーン構成
rag_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(
        search_type="mmr",
        search_kwargs={"k": 5, "fetch_k": 20},
    ),
    chain_type_kwargs={"prompt": prompt_template},
    return_source_documents=True,
)

# クエリ
result = rag_chain.invoke("プロダクションデプロイの手順はどうなっていますか?")
print(result["result"])
for doc in result["source_documents"]:
    print(f"  Source: {doc.metadata['source']}")

プロダクションRAGサービング

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class QueryRequest(BaseModel):
    question: str
    top_k: int = 5

class QueryResponse(BaseModel):
    answer: str
    sources: list[str]
    latency_ms: float

@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
    import time
    start = time.time()

    result = rag_chain.invoke(request.question)

    sources = list(set(
        doc.metadata.get("source", "unknown")
        for doc in result["source_documents"]
    ))

    latency = (time.time() - start) * 1000

    return QueryResponse(
        answer=result["result"],
        sources=sources,
        latency_ms=round(latency, 2),
    )

おわりに

MLOpsは、MLモデルをプロダクションで安定的に運用するための必須分野である。

重要なポイントを整理すると以下の通りである。

  1. 実験管理から始めよう - MLflowを導入するだけでも大きな変化が生まれる。
  2. 自動化に投資しよう - 手動プロセスは必ず失敗する。
  3. モニタリングは必須 - デプロイ後が本番のスタートである。
  4. 小さく始めよう - Google Level 0から1への移行が最も重要である。
  5. LLM時代に合わせて進化しよう - vLLM、RAGなどの新しいツールを積極的に活用しよう。

モデルの学習は全体の20%に過ぎない。 残りの80%はデータパイプライン、サービング、モニタリング、運用である。 MLOpsはまさにその80%を体系的に管理する方法論である。

MLOpsセルフチェックリスト

実験管理

  • すべての実験のハイパーパラメータとメトリクスを追跡しているか?
  • 実験結果を再現できるか?

データパイプライン

  • データのバージョン管理を行っているか?
  • データ品質検証が自動化されているか?

学習インフラ

  • 学習パイプラインが自動化されているか?
  • チェックポイントを保存しているか?

サービング

  • モデルサービングのレイテンシをモニタリングしているか?
  • ロールバック可能なデプロイ戦略を使っているか?

モニタリング

  • データドリフトを検出しているか?
  • モデル性能低下時にアラートが来るか?

コスト

  • スポットインスタンスを活用しているか?
  • モデル軽量化を検討したか?

현재 단락 (1/810)

1. [MLOpsとは](#1-mlopsとは)

작성 글자: 0원문 글자: 22,194작성 단락: 0/810