目次
- MLOpsとは
- 実験管理
- データパイプライン
- モデル学習インフラ
- モデルサービング
- LLMサービング特別編
- CI/CD for ML
- モニタリング
- A/Bテストとデプロイ戦略
- コスト最適化
- 実践:LLM RAGパイプライン構築
1. MLOpsとは
DevOpsとMLの融合
MLOpsはMachine LearningとOperationsの造語である。 ソフトウェアエンジニアリングのDevOps原則を機械学習ライフサイクルに適用したものだと考えればよい。
従来のソフトウェア開発ではコードだけ管理すればよかった。 しかしMLシステムではコード、データ、モデルの3つを同時に管理しなければならない。 データが変わればモデルが変わり、モデルが変わればサービング方式も変わる可能性がある。
MLライフサイクル
MLプロジェクトの全体フローは以下の通りである。
- 問題定義 - ビジネス要件をML問題に変換
- データ収集と前処理 - データパイプライン構築
- 特徴量エンジニアリング - 生データをモデルが理解できる形に変換
- モデル学習 - 様々なアルゴリズムとハイパーパラメータの実験
- モデル評価 - オフラインメトリクスで性能検証
- モデルデプロイ - プロダクション環境でサービング
- モニタリング - 性能追跡とドリフト検出
- 再学習 - 性能低下時にモデルをアップデート
このプロセスは一度きりではなく、継続的なサイクルである。
GoogleのMLOps成熟度モデル
GoogleはMLOpsの成熟度を3段階で定義している。
Level 0 - 手動プロセス
- データサイエンティストがノートブックで手動学習
- モデルデプロイが手動で不定期
- モニタリングなし
- ほとんどのチームがこの段階
Level 1 - MLパイプライン自動化
- 学習パイプラインが自動化済み
- 継続的学習(Continuous Training)を実装
- 実験追跡システム導入
- モデルレジストリを使用
Level 2 - CI/CDパイプライン自動化
- パイプライン自体のCI/CDを実装
- 自動化されたテストと検証
- Feature Storeの活用
- 完全なモニタリングとアラート体制
ほとんどの組織はLevel 0からLevel 1に移行するだけでも大きな価値を得られる。
2. 実験管理
MLflow - オープンソースMLプラットフォーム
MLflowはML実験管理の事実上の標準である。 4つのコアコンポーネントで構成されている。
MLflow Tracking - 実験記録
import mlflow
mlflow.set_experiment("recommendation-model-v2")
with mlflow.start_run():
# ハイパーパラメータの記録
mlflow.log_param("learning_rate", 0.001)
mlflow.log_param("batch_size", 64)
mlflow.log_param("epochs", 50)
# 学習実行
model = train_model(lr=0.001, batch_size=64, epochs=50)
# メトリクスの記録
mlflow.log_metric("accuracy", 0.923)
mlflow.log_metric("f1_score", 0.891)
mlflow.log_metric("auc_roc", 0.956)
# モデル保存
mlflow.pytorch.log_model(model, "model")
MLflow Models - モデルパッケージング
# モデルレジストリに登録
mlflow.register_model(
"runs:/abc123/model",
"recommendation-model"
)
# ステージ遷移
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
name="recommendation-model",
version=3,
stage="Production"
)
Weights and Biases
W&Bは商用の実験管理プラットフォームで、可視化機能が優れている。
import wandb
wandb.init(project="image-classification", config={
"learning_rate": 0.001,
"architecture": "ResNet50",
"dataset": "imagenet-subset",
"epochs": 100,
})
for epoch in range(100):
train_loss, val_loss = train_one_epoch(model)
wandb.log({
"train_loss": train_loss,
"val_loss": val_loss,
"epoch": epoch
})
Optuna - ハイパーパラメータ最適化
手動でハイパーパラメータを調整するのは非効率的である。 Optunaはベイズ最適化ベースの自動探索を行う。
import optuna
def objective(trial):
lr = trial.suggest_float("lr", 1e-5, 1e-1, log=True)
batch_size = trial.suggest_categorical("batch_size", [16, 32, 64, 128])
n_layers = trial.suggest_int("n_layers", 1, 5)
dropout = trial.suggest_float("dropout", 0.1, 0.5)
model = create_model(n_layers=n_layers, dropout=dropout)
accuracy = train_and_evaluate(model, lr=lr, batch_size=batch_size)
return accuracy
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=100)
print(f"Best accuracy: {study.best_trial.value}")
print(f"Best params: {study.best_trial.params}")
3. データパイプライン
Feature Store
Feature Storeは、ML特徴量の中央リポジトリである。 学習とサービングで同じ特徴量を使えるようにする。
Feastの例
from feast import FeatureStore
store = FeatureStore(repo_path="feature_repo/")
# 特徴量定義
from feast import Entity, FeatureView, Field
from feast.types import Float32, Int64
user = Entity(name="user_id", join_keys=["user_id"])
user_features = FeatureView(
name="user_features",
entities=[user],
schema=[
Field(name="total_purchases", dtype=Int64),
Field(name="avg_order_value", dtype=Float32),
Field(name="days_since_last_order", dtype=Int64),
],
source=user_source,
)
# 学習データの取得
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"user_features:total_purchases",
"user_features:avg_order_value",
"user_features:days_since_last_order",
],
).to_df()
# リアルタイムサービング用クエリ
online_features = store.get_online_features(
features=[
"user_features:total_purchases",
"user_features:avg_order_value",
],
entity_rows=[{"user_id": 12345}],
).to_dict()
DVC - データバージョニング
DVC(Data Version Control)は、Gitでデータのバージョン管理を可能にする。
# DVC初期化
dvc init
# データファイルの追跡
dvc add data/training_data.csv
# リモートストレージ設定
dvc remote add -d myremote s3://my-bucket/dvc-storage
# データのプッシュ
dvc push
# 特定バージョンのデータを取得
git checkout v1.0
dvc pull
データ検証 - Great Expectations
データ品質の検証はパイプラインの中核である。
import great_expectations as gx
context = gx.get_context()
# 期待値の定義
validator = context.sources.pandas_default.read_csv(
"data/training_data.csv"
)
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=150)
validator.expect_column_values_to_be_in_set("country", ["KR", "US", "JP", "UK"])
# 検証実行
results = validator.validate()
if not results.success:
raise ValueError("Data validation failed!")
4. モデル学習インフラ
GPUクラスター構成
大規模モデルの学習にはGPUクラスターが必須である。
Kubernetes + GPUノードプール
apiVersion: v1
kind: Pod
metadata:
name: training-job
spec:
containers:
- name: trainer
image: training-image:latest
resources:
limits:
nvidia.com/gpu: 4
volumeMounts:
- name: dataset
mountPath: /data
nodeSelector:
gpu-type: a100
tolerations:
- key: "nvidia.com/gpu"
operator: "Exists"
effect: "NoSchedule"
分散学習
単一GPUでは対応できないモデルには分散学習が必要である。
PyTorch DDP(Distributed Data Parallel)
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
def setup(rank, world_size):
dist.init_process_group("nccl", rank=rank, world_size=world_size)
torch.cuda.set_device(rank)
def train(rank, world_size):
setup(rank, world_size)
model = MyModel().to(rank)
model = DDP(model, device_ids=[rank])
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
for epoch in range(num_epochs):
sampler.set_epoch(epoch)
for batch in dataloader:
loss = model(batch)
loss.backward()
optimizer.step()
optimizer.zero_grad()
dist.destroy_process_group()
Kubeflow Training Operator
Kubeflowを使えば、Kubernetes上で分散学習を宣言的に管理できる。
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
name: pytorch-training
spec:
pytorchReplicaSpecs:
Master:
replicas: 1
template:
spec:
containers:
- name: pytorch
image: my-training:latest
resources:
limits:
nvidia.com/gpu: 1
Worker:
replicas: 3
template:
spec:
containers:
- name: pytorch
image: my-training:latest
resources:
limits:
nvidia.com/gpu: 1
5. モデルサービング
サービングフレームワーク比較
| フレームワーク | 長所 | 短所 | 適するケース |
|---|---|---|---|
| TorchServe | PyTorchネイティブ、管理が簡単 | 性能に限界 | 小規模PyTorchモデル |
| Triton | マルチフレームワーク、高性能 | 設定が複雑 | 大規模プロダクション |
| vLLM | LLM特化、PagedAttention | LLM専用 | LLMサービング |
| FastAPI | 柔軟性が最高 | 手動実装が必要 | カスタムロジック必要時 |
TorchServe
# モデルアーカイブ作成
torch-model-archiver \
--model-name resnet50 \
--version 1.0 \
--model-file model.py \
--serialized-file model.pth \
--handler image_classifier
# サーバー起動
torchserve --start \
--model-store model_store \
--models resnet50=resnet50.mar
NVIDIA Triton Inference Server
Tritonは複数のフレームワークのモデルを同時にサービングできる。
# モデルリポジトリ構造
model_repository/
resnet50/
config.pbtxt
1/
model.onnx
bert_base/
config.pbtxt
1/
model.plan
config.pbtxt設定
name: "resnet50"
platform: "onnxruntime_onnx"
max_batch_size: 32
input [
{
name: "input"
data_type: TYPE_FP32
dims: [3, 224, 224]
}
]
output [
{
name: "output"
data_type: TYPE_FP32
dims: [1000]
}
]
dynamic_batching {
preferred_batch_size: [8, 16, 32]
max_queue_delay_microseconds: 100
}
instance_group [
{
count: 2
kind: KIND_GPU
}
]
FastAPIラッピング
シンプルなモデルサービングにはFastAPIが適している。
from fastapi import FastAPI
import torch
from pydantic import BaseModel
app = FastAPI()
# モデルロード(アプリ起動時に1回)
model = torch.jit.load("model.pt")
model.eval()
class PredictionRequest(BaseModel):
features: list[float]
class PredictionResponse(BaseModel):
prediction: float
confidence: float
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
tensor = torch.tensor([request.features])
with torch.no_grad():
output = model(tensor)
prob = torch.softmax(output, dim=1)
prediction = torch.argmax(prob, dim=1).item()
confidence = prob[0][prediction].item()
return PredictionResponse(
prediction=prediction,
confidence=confidence
)
@app.get("/health")
async def health():
return {"status": "healthy"}
6. LLMサービング特別編
vLLM - 高性能LLMサービング
vLLMはPagedAttention技術を使用してLLMサービング性能を最大化する。
from vllm import LLM, SamplingParams
# モデルロード
llm = LLM(
model="meta-llama/Llama-3.1-8B-Instruct",
tensor_parallel_size=2,
gpu_memory_utilization=0.9,
max_model_len=8192,
)
# 推論
sampling_params = SamplingParams(
temperature=0.7,
top_p=0.9,
max_tokens=512,
)
outputs = llm.generate(
["Explain MLOps in simple terms."],
sampling_params,
)
print(outputs[0].outputs[0].text)
vLLM OpenAI互換サーバー
python -m vllm.entrypoints.openai.api_server \
--model meta-llama/Llama-3.1-8B-Instruct \
--tensor-parallel-size 2 \
--port 8000
# OpenAI SDKで呼び出し可能
from openai import OpenAI
client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy")
response = client.chat.completions.create(
model="meta-llama/Llama-3.1-8B-Instruct",
messages=[
{"role": "user", "content": "What is MLOps?"}
],
max_tokens=256,
)
Text Generation Inference (TGI)
HuggingFaceのTGIはRustベースで安定性が高い。
docker run --gpus all \
-p 8080:80 \
-v /data:/data \
ghcr.io/huggingface/text-generation-inference:latest \
--model-id meta-llama/Llama-3.1-8B-Instruct \
--quantize gptq \
--max-input-length 4096 \
--max-total-tokens 8192
量子化技法の比較
LLMデプロイ時にモデルサイズを削減する量子化は必須である。
| 技法 | サイズ削減 | 品質低下 | 速度向上 | 備考 |
|---|---|---|---|---|
| GPTQ | 4倍 | 低い | 2-3倍 | GPU最適化、精度良好 |
| AWQ | 4倍 | 非常に低い | 2-3倍 | Activation-aware、GPTQより高品質 |
| GGUF | 2-8倍 | 可変 | CPU可能 | llama.cpp用、CPU/GPUハイブリッド |
| BitsAndBytes | 2-4倍 | 低い | 1.5倍 | 適用簡単、QLoRA学習可能 |
# BitsAndBytes 4ビット量子化
from transformers import AutoModelForCausalLM, BitsAndBytesConfig
quantization_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype=torch.float16,
bnb_4bit_quant_type="nf4",
bnb_4bit_use_double_quant=True,
)
model = AutoModelForCausalLM.from_pretrained(
"meta-llama/Llama-3.1-70B-Instruct",
quantization_config=quantization_config,
device_map="auto",
)
Ollama - ローカルLLM
開発環境や小規模デプロイにはOllamaが便利である。
# モデルダウンロードと実行
ollama pull llama3.1
ollama run llama3.1
# APIサーバーモード
ollama serve
import requests
response = requests.post("http://localhost:11434/api/generate", json={
"model": "llama3.1",
"prompt": "Explain MLOps briefly.",
"stream": False,
})
print(response.json()["response"])
7. CI/CD for ML
モデルテスト戦略
MLモデルには、一般的なソフトウェアとは異なるテストが必要である。
import pytest
class TestModelQuality:
"""モデル品質テスト"""
def test_accuracy_threshold(self, model, test_data):
"""精度が基準値以上かを確認"""
accuracy = evaluate(model, test_data)
assert accuracy >= 0.90, f"Accuracy {accuracy} below threshold 0.90"
def test_latency(self, model, sample_input):
"""推論レイテンシの確認"""
import time
start = time.time()
model.predict(sample_input)
latency = time.time() - start
assert latency < 0.1, f"Latency {latency}s exceeds 100ms"
def test_no_bias(self, model, fairness_data):
"""公平性検証"""
results_group_a = model.predict(fairness_data["group_a"])
results_group_b = model.predict(fairness_data["group_b"])
disparity = abs(results_group_a.mean() - results_group_b.mean())
assert disparity < 0.05, f"Bias detected: disparity={disparity}"
def test_model_size(self, model_path):
"""モデルサイズ制限"""
import os
size_mb = os.path.getsize(model_path) / (1024 * 1024)
assert size_mb < 500, f"Model size {size_mb}MB exceeds 500MB limit"
GitHub ActionsでMLパイプライン
name: ML Pipeline
on:
push:
paths:
- "src/**"
- "data/**"
- "configs/**"
jobs:
data-validation:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Validate data
run: python scripts/validate_data.py
train:
needs: data-validation
runs-on: [self-hosted, gpu]
steps:
- uses: actions/checkout@v4
- name: Train model
run: python scripts/train.py --config configs/production.yaml
- name: Upload model artifact
uses: actions/upload-artifact@v4
with:
name: model
path: outputs/model/
evaluate:
needs: train
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Download model
uses: actions/download-artifact@v4
with:
name: model
- name: Run evaluation
run: python scripts/evaluate.py
- name: Check quality gates
run: python scripts/quality_gate.py
deploy:
needs: evaluate
if: github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
steps:
- name: Deploy to staging
run: ./scripts/deploy.sh staging
- name: Smoke test
run: python scripts/smoke_test.py
- name: Deploy to production
run: ./scripts/deploy.sh production
データ検証パイプライン
def validate_training_data(data_path: str) -> bool:
"""学習データの品質を検証するパイプライン"""
df = pd.read_parquet(data_path)
checks = [
# データサイズ確認
len(df) >= 10000,
# 必須カラムの存在
all(col in df.columns for col in REQUIRED_COLUMNS),
# null率の確認
df.isnull().mean().max() < 0.05,
# ラベル分布の確認
df["label"].value_counts(normalize=True).min() > 0.1,
# 重複チェック
df.duplicated().mean() < 0.01,
]
if not all(checks):
failed = [i for i, c in enumerate(checks) if not c]
raise ValueError(f"Data validation failed at checks: {failed}")
return True
8. モニタリング
モデル性能モニタリング
デプロイ後のモデル性能は時間経過とともに低下する可能性がある。
from prometheus_client import Gauge, Histogram, Counter
# メトリクス定義
prediction_latency = Histogram(
"model_prediction_latency_seconds",
"Prediction latency in seconds",
buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)
prediction_count = Counter(
"model_prediction_total",
"Total number of predictions",
["model_version", "status"]
)
model_accuracy = Gauge(
"model_accuracy",
"Current model accuracy",
["model_name"]
)
# 使用
import time
def predict_with_monitoring(model, input_data):
start = time.time()
try:
result = model.predict(input_data)
prediction_count.labels(
model_version="v2.1",
status="success"
).inc()
return result
except Exception as e:
prediction_count.labels(
model_version="v2.1",
status="error"
).inc()
raise
finally:
latency = time.time() - start
prediction_latency.observe(latency)
データドリフト検出
入力データの分布が変化すると、モデル性能が低下する。
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
# ドリフトレポート生成
report = Report(metrics=[
DataDriftPreset(),
TargetDriftPreset(),
])
column_mapping = ColumnMapping(
target="label",
prediction="prediction",
numerical_features=["feature_1", "feature_2", "feature_3"],
categorical_features=["category", "region"],
)
report.run(
reference_data=training_data,
current_data=production_data,
column_mapping=column_mapping,
)
# ドリフト検出結果の確認
drift_results = report.as_dict()
if drift_results["metrics"][0]["result"]["dataset_drift"]:
trigger_retraining_pipeline()
コンセプトドリフト
データ分布だけでなく、入力と出力の関係自体が変わることもある。 例えば、コロナ前後では消費パターンが完全に変わった。
class ConceptDriftDetector:
"""PSI(Population Stability Index)ベースのドリフト検出"""
def __init__(self, threshold=0.2):
self.threshold = threshold
def calculate_psi(self, expected, actual, bins=10):
"""PSI計算"""
breakpoints = np.percentile(expected, np.linspace(0, 100, bins + 1))
expected_counts = np.histogram(expected, breakpoints)[0] / len(expected)
actual_counts = np.histogram(actual, breakpoints)[0] / len(actual)
# ゼロ除算防止
expected_counts = np.clip(expected_counts, 1e-6, None)
actual_counts = np.clip(actual_counts, 1e-6, None)
psi = np.sum(
(actual_counts - expected_counts) *
np.log(actual_counts / expected_counts)
)
return psi
def check_drift(self, reference_predictions, current_predictions):
psi = self.calculate_psi(reference_predictions, current_predictions)
if psi > self.threshold:
return {"drift_detected": True, "psi": psi, "action": "retrain"}
elif psi > self.threshold / 2:
return {"drift_detected": False, "psi": psi, "action": "monitor"}
else:
return {"drift_detected": False, "psi": psi, "action": "none"}
Grafanaダッシュボード構成
モニタリングメトリクスの可視化にはGrafanaが適している。 Prometheusで収集したメトリクスをダッシュボードとして構成できる。
主要モニタリングパネル構成:
- リクエストスループット:毎秒の予測リクエスト数
- レイテンシ分布:p50、p95、p99レイテンシ
- エラー率:予測失敗率
- モデル精度:リアルタイム性能メトリクス
- データドリフトスコア:PSI、KLダイバージェンス
- リソース使用量:GPUメモリ、CPU、ネットワーク
9. A/Bテストとデプロイ戦略
カナリアデプロイ
新しいモデルを少量のトラフィックにのみ先に適用する戦略である。
# Istio VirtualServiceでトラフィック分割
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: model-serving
spec:
hosts:
- model-service
http:
- match:
- headers:
canary:
exact: "true"
route:
- destination:
host: model-service
subset: v2
- route:
- destination:
host: model-service
subset: v1
weight: 90
- destination:
host: model-service
subset: v2
weight: 10
シャドーモード
新しいモデルが実際のトラフィックで推論を行うが、結果は保存のみでユーザーには返さない。 既存モデルの結果と比較して安全に検証できる。
class ShadowModelRouter:
def __init__(self, primary_model, shadow_model):
self.primary = primary_model
self.shadow = shadow_model
async def predict(self, input_data):
# プライマリモデルの結果(ユーザーに返す)
primary_result = await self.primary.predict(input_data)
# シャドーモデルの結果(比較用に保存)
try:
shadow_result = await self.shadow.predict(input_data)
await self.log_comparison(
input_data, primary_result, shadow_result
)
except Exception:
pass # シャドーモデルのエラーは無視
return primary_result
async def log_comparison(self, input_data, primary, shadow):
comparison = {
"timestamp": datetime.utcnow().isoformat(),
"input_hash": hash(str(input_data)),
"primary_prediction": primary,
"shadow_prediction": shadow,
"agreement": primary == shadow,
}
await self.metrics_store.insert(comparison)
マルチアームドバンディット
A/Bテストより効率的な動的トラフィック配分方法である。 性能の良いモデルに自動的により多くのトラフィックを配分する。
import numpy as np
class ThompsonSamplingRouter:
"""トンプソンサンプリングベースのモデルルーター"""
def __init__(self, model_names):
self.models = model_names
# ベータ分布のパラメータ(alpha, beta)
self.successes = {name: 1 for name in model_names}
self.failures = {name: 1 for name in model_names}
def select_model(self):
"""トンプソンサンプリングでモデルを選択"""
samples = {}
for name in self.models:
samples[name] = np.random.beta(
self.successes[name],
self.failures[name]
)
return max(samples, key=samples.get)
def update(self, model_name, success: bool):
"""結果に基づいて分布を更新"""
if success:
self.successes[model_name] += 1
else:
self.failures[model_name] += 1
def get_allocation(self):
"""現在のトラフィック比率を確認"""
total = sum(self.successes[m] + self.failures[m] for m in self.models)
return {
m: (self.successes[m] + self.failures[m]) / total
for m in self.models
}
10. コスト最適化
スポットインスタンスの活用
学習ジョブは中断可能であるため、スポットインスタンスで70%以上のコスト削減が可能である。
# AWS SageMakerスポット学習
import sagemaker
estimator = sagemaker.estimator.Estimator(
image_uri="my-training-image:latest",
role="arn:aws:iam::ACCOUNT:role/SageMakerRole",
instance_count=4,
instance_type="ml.p4d.24xlarge",
use_spot_instances=True,
max_wait=7200, # 最大待機時間
max_run=3600, # 最大実行時間
checkpoint_s3_uri="s3://bucket/checkpoints/",
)
estimator.fit({"training": "s3://bucket/data/"})
鍵はチェックポイントの保存である。スポットインスタンスが中断されてもチェックポイントから再開できなければならない。
モデル軽量化
プロダクションサービングコストを削減する方法を紹介する。
知識蒸留(Knowledge Distillation)
class DistillationTrainer:
def __init__(self, teacher, student, temperature=3.0, alpha=0.5):
self.teacher = teacher
self.student = student
self.temperature = temperature
self.alpha = alpha
def distillation_loss(self, student_logits, teacher_logits, labels):
# ソフトターゲットロス(KDロス)
soft_targets = F.softmax(teacher_logits / self.temperature, dim=1)
soft_loss = F.kl_div(
F.log_softmax(student_logits / self.temperature, dim=1),
soft_targets,
reduction="batchmean"
) * (self.temperature ** 2)
# ハードターゲットロス
hard_loss = F.cross_entropy(student_logits, labels)
return self.alpha * soft_loss + (1 - self.alpha) * hard_loss
ONNX変換で推論最適化
import torch
import onnx
# PyTorch -> ONNX変換
dummy_input = torch.randn(1, 3, 224, 224)
torch.onnx.export(
model,
dummy_input,
"model.onnx",
input_names=["input"],
output_names=["output"],
dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},
opset_version=17,
)
# ONNX Runtimeで推論
import onnxruntime as ort
session = ort.InferenceSession("model.onnx", providers=["CUDAExecutionProvider"])
result = session.run(None, {"input": input_array})
バッチ推論 vs リアルタイム推論
| 区分 | バッチ推論 | リアルタイム推論 |
|---|---|---|
| レイテンシ | 分〜時間 | ミリ秒〜秒 |
| スループット | 非常に高い | 中程度 |
| コスト | 低い(スポットインスタンス) | 高い(常時稼働) |
| 適するケース | レコメンドシステム、レポート | チャットボット、検索 |
# バッチ推論の例(Spark)
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
spark = SparkSession.builder.getOrCreate()
# UDFでモデル推論を登録
@udf("float")
def predict_udf(features):
return float(model.predict([features])[0])
# 大規模データに対してバッチ推論
df = spark.read.parquet("s3://bucket/daily-data/")
predictions = df.withColumn("prediction", predict_udf("features"))
predictions.write.parquet("s3://bucket/predictions/")
11. 実践:LLM RAGパイプライン構築
RAG(Retrieval-Augmented Generation)はLLMの限界を克服する重要なパターンである。 外部知識ベースを検索してLLMにコンテキストとして提供する。
全体アーキテクチャ
- ドキュメント収集 - 様々なソースからデータ収集
- チャンキング - ドキュメントを適切なサイズに分割
- エンベディング - テキストをベクトルに変換
- ベクトルDB格納 - ベクトルインデックスに格納
- 検索 - クエリに対して類似ドキュメントを検索
- 生成 - 検索結果をコンテキストとしてLLMに渡す
ドキュメント収集とチャンキング
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import (
PyPDFLoader,
WebBaseLoader,
NotionDirectoryLoader,
)
# 様々なソースからドキュメントをロード
pdf_docs = PyPDFLoader("docs/manual.pdf").load()
web_docs = WebBaseLoader("https://docs.example.com").load()
notion_docs = NotionDirectoryLoader("notion_export/").load()
all_docs = pdf_docs + web_docs + notion_docs
# チャンキング
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", ".", " "],
)
chunks = text_splitter.split_documents(all_docs)
print(f"Total chunks: {len(chunks)}")
エンベディングとベクトルDB
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
# エンベディングモデル
embedding_model = HuggingFaceEmbeddings(
model_name="BAAI/bge-large-en-v1.5",
model_kwargs={"device": "cuda"},
encode_kwargs={"normalize_embeddings": True},
)
# ChromaベクトルDBに格納
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embedding_model,
persist_directory="./chroma_db",
collection_name="knowledge_base",
)
# 類似度検索
results = vectorstore.similarity_search(
"How to deploy a model to production?",
k=5,
)
RAGパイプラインの組み立て
from langchain.chains import RetrievalQA
from langchain.llms import VLLM
from langchain.prompts import PromptTemplate
# LLM設定
llm = VLLM(
model="meta-llama/Llama-3.1-8B-Instruct",
tensor_parallel_size=1,
max_new_tokens=512,
temperature=0.1,
)
# プロンプトテンプレート
prompt_template = PromptTemplate(
template="""以下のコンテキストを参考にして質問に答えてください。
コンテキストにない情報は「わかりません」と答えてください。
コンテキスト:
{context}
質問:{question}
回答:""",
input_variables=["context", "question"],
)
# RAGチェーン構成
rag_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=vectorstore.as_retriever(
search_type="mmr",
search_kwargs={"k": 5, "fetch_k": 20},
),
chain_type_kwargs={"prompt": prompt_template},
return_source_documents=True,
)
# クエリ
result = rag_chain.invoke("プロダクションデプロイの手順はどうなっていますか?")
print(result["result"])
for doc in result["source_documents"]:
print(f" Source: {doc.metadata['source']}")
プロダクションRAGサービング
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class QueryRequest(BaseModel):
question: str
top_k: int = 5
class QueryResponse(BaseModel):
answer: str
sources: list[str]
latency_ms: float
@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
import time
start = time.time()
result = rag_chain.invoke(request.question)
sources = list(set(
doc.metadata.get("source", "unknown")
for doc in result["source_documents"]
))
latency = (time.time() - start) * 1000
return QueryResponse(
answer=result["result"],
sources=sources,
latency_ms=round(latency, 2),
)
おわりに
MLOpsは、MLモデルをプロダクションで安定的に運用するための必須分野である。
重要なポイントを整理すると以下の通りである。
- 実験管理から始めよう - MLflowを導入するだけでも大きな変化が生まれる。
- 自動化に投資しよう - 手動プロセスは必ず失敗する。
- モニタリングは必須 - デプロイ後が本番のスタートである。
- 小さく始めよう - Google Level 0から1への移行が最も重要である。
- LLM時代に合わせて進化しよう - vLLM、RAGなどの新しいツールを積極的に活用しよう。
モデルの学習は全体の20%に過ぎない。 残りの80%はデータパイプライン、サービング、モニタリング、運用である。 MLOpsはまさにその80%を体系的に管理する方法論である。
MLOpsセルフチェックリスト
実験管理
- すべての実験のハイパーパラメータとメトリクスを追跡しているか?
- 実験結果を再現できるか?
データパイプライン
- データのバージョン管理を行っているか?
- データ品質検証が自動化されているか?
学習インフラ
- 学習パイプラインが自動化されているか?
- チェックポイントを保存しているか?
サービング
- モデルサービングのレイテンシをモニタリングしているか?
- ロールバック可能なデプロイ戦略を使っているか?
モニタリング
- データドリフトを検出しているか?
- モデル性能低下時にアラートが来るか?
コスト
- スポットインスタンスを活用しているか?
- モデル軽量化を検討したか?
현재 단락 (1/810)
1. [MLOpsとは](#1-mlopsとは)