- Authors

- Name
- Youngju Kim
- @fjvbn20031
はじめに
現代の機械学習プロジェクトは、Jupyter Notebookでモデルを構築したら終わりではありません。データ収集、前処理、学習、評価、デプロイ、モニタリング、再学習というライフサイクル全体を確実に管理する必要があります。これこそがMLOpsが存在する理由です。
このガイドでは、MLOpsの基礎から実践的なツールの使い方まで、体系的に解説します。
1. MLOpsとは何か?
1.1 MLOpsの定義
MLOps(Machine Learning Operations)は、MLシステムの開発と運用を統合する方法論です。DevOpsの原則をMLワークフローに適用し、モデルの継続的学習(CT)、継続的インテグレーション(CI)、継続的デプロイ(CD)を自動化します。
1.2 MLOps vs DevOps vs DataOps
| カテゴリ | DevOps | DataOps | MLOps |
|---|---|---|---|
| 焦点 | ソフトウェアデプロイ | データパイプライン | MLモデルライフサイクル |
| 成果物 | アプリケーション | データ/レポート | MLモデル |
| 再現性 | コードバージョン管理 | データバージョン管理 | コード+データ+モデルのバージョン管理 |
| 自動化 | CI/CD | データテスト | 学習+評価+デプロイ |
MLOpsとDevOpsの最大の違いはデータ依存性です。同じコードでも、異なるデータで学習すると全く異なるモデルが生成され、モデルの性能はコード品質と同様にデータ品質にも強く依存します。
1.3 MLシステムの固有の特性
MLシステムには、従来のソフトウェアとは異なるいくつかの特性があります:
- データ依存性: モデルの動作はコードだけでなくデータによって決まる
- 実験的性質: 数十〜数百回の反復実験が必要
- モデルの劣化: データ分布の変化に伴い、モデル性能が低下する
- 再現性の課題: 同じ環境で同一の結果を保証することが難しい
- 複数のアーティファクト: コード、データ、モデルすべてにバージョン管理が必要
1.4 MLOps成熟度モデル
GoogleのMLOps成熟度レベルに基づく:
レベル0 - 手動ML
- すべてのプロセスが手動
- スクリプトベースの実験
- まれで手動のデプロイ
- モニタリングなし
レベル1 - MLパイプライン自動化
- 自動化された学習パイプライン
- 継続的学習が可能
- 実験トラッキングの開始
- フィーチャーストアの導入
レベル2 - CI/CDパイプライン自動化
- 完全自動化されたCI/CD
- モデルレジストリの使用
- 自動再学習トリガー
- 完全なモニタリング
1.5 MLOpsツールエコシステム
データバージョン管理: DVC, LakeFS, Delta Lake
実験トラッキング: MLflow, W&B, Neptune, Comet ML
パイプライン: Airflow, Prefect, Metaflow, Kubeflow Pipelines
モデルレジストリ: MLflow Registry, W&B Artifacts, Vertex AI
コンテナ化: Docker, Podman
オーケストレーション: Kubernetes, ECS, GKE
モデルサービング: Triton, TorchServe, BentoML, KServe
モニタリング: Evidently, WhyLogs, Arize, Fiddler
フィーチャーストア: Feast, Tecton, Vertex AI Feature Store
2. データバージョン管理(DVC)
2.1 DVCの概要
DVC(Data Version Control)は、Gitと連携して動作するMLプロジェクト用のバージョン管理ツールです。大規模なデータセットやMLモデルをGitと同様にバージョン管理できます。
# DVCのインストール
pip install dvc
# S3サポート付きインストール
pip install "dvc[s3]"
# GCSサポート付きインストール
pip install "dvc[gs]"
# 全リモートサポート付きインストール
pip install "dvc[all]"
2.2 DVCの初期化と基本的な使い方
# Gitリポジトリの初期化(必要な場合)
git init
# DVCの初期化
dvc init
# 作成されたファイルの確認
ls .dvc/
# config .gitignore tmp/
# データのトラッキング開始
dvc add data/train.csv
# .dvcファイルが作成される(Gitでトラッキング)
git add data/train.csv.dvc data/.gitignore
git commit -m "Add training data"
2.3 リモートストレージの設定
# S3リモートストレージの設定
dvc remote add -d myremote s3://my-bucket/dvc-store
# AWS認証情報の設定(環境変数または~/.aws/credentials)
dvc remote modify myremote access_key_id YOUR_ACCESS_KEY
dvc remote modify myremote secret_access_key YOUR_SECRET_KEY
# GCSリモートストレージ
dvc remote add -d gcsstorage gs://my-bucket/dvc-store
# ローカルリモートストレージ(テスト用)
dvc remote add -d localremote /tmp/dvc-storage
# データのプッシュ
dvc push
# データのプル
dvc pull
2.4 DVCパイプライン
DVCパイプラインはステージ間の依存関係をトラッキングし、変更されたステージのみを再実行します。
# dvc.yaml
stages:
prepare:
cmd: python src/prepare.py
deps:
- src/prepare.py
- data/raw
outs:
- data/prepared
featurize:
cmd: python src/featurize.py
deps:
- src/featurize.py
- data/prepared
outs:
- data/features
train:
cmd: python src/train.py
deps:
- src/train.py
- data/features
params:
- params.yaml:
- train.lr
- train.n_estimators
outs:
- models/model.pkl
metrics:
- metrics/scores.json:
cache: false
evaluate:
cmd: python src/evaluate.py
deps:
- src/evaluate.py
- models/model.pkl
- data/features
metrics:
- metrics/eval.json:
cache: false
# パイプラインの実行
dvc repro
# 特定のステージまで実行
dvc repro train
# パイプラインDAGの可視化
dvc dag
# 実験結果の比較
dvc metrics show
dvc metrics diff HEAD~1
2.5 パラメータ管理
# params.yaml
train:
lr: 0.001
n_estimators: 100
max_depth: 5
batch_size: 32
epochs: 10
data:
test_size: 0.2
random_state: 42
# src/train.py
import yaml
# パラメータの読み込み
with open("params.yaml") as f:
params = yaml.safe_load(f)
lr = params["train"]["lr"]
n_estimators = params["train"]["n_estimators"]
# モデルの学習
from sklearn.ensemble import RandomForestClassifier
model = RandomForestClassifier(
n_estimators=n_estimators,
max_depth=params["train"]["max_depth"],
random_state=params["data"]["random_state"]
)
2.6 Git + DVCワークフロー
# 新しい実験ブランチの作成
git checkout -b experiment/increase-lr
# パラメータの変更
# params.yamlで lr: 0.001 を lr: 0.01 に変更
# パイプラインの再実行
dvc repro
# 結果の確認
dvc metrics show
# コミット
git add dvc.lock params.yaml
git commit -m "Experiment: increase learning rate to 0.01"
# 実験の比較
git checkout main
dvc metrics diff experiment/increase-lr
3. 実験トラッキング
3.1 MLflowのインストールと設定
# MLflowのインストール
pip install mlflow
# MLflow UIの起動
mlflow ui
# ホストとポートの指定
mlflow ui --host 0.0.0.0 --port 5001
# リモートトラッキングサーバーの設定
export MLFLOW_TRACKING_URI=http://mlflow-server:5000
3.2 MLflowの基本的な使い方
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
import numpy as np
# トラッキングURIの設定(デフォルト: ./mlruns)
mlflow.set_tracking_uri("http://localhost:5000")
# 実験の設定
mlflow.set_experiment("my-classification-experiment")
# 実験の実行
with mlflow.start_run(run_name="random-forest-v1"):
# パラメータのログ
n_estimators = 100
max_depth = 5
lr = 0.001
mlflow.log_param("n_estimators", n_estimators)
mlflow.log_param("max_depth", max_depth)
mlflow.log_param("learning_rate", lr)
# モデルの学習
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
model = RandomForestClassifier(
n_estimators=n_estimators,
max_depth=max_depth,
random_state=42
)
model.fit(X_train, y_train)
# メトリクスのログ
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred, average='weighted')
mlflow.log_metric("accuracy", accuracy)
mlflow.log_metric("f1_score", f1)
# ステップごとのメトリクス(学習曲線)
for epoch in range(10):
train_loss = np.random.random() * 0.5 / (epoch + 1)
mlflow.log_metric("train_loss", train_loss, step=epoch)
# アーティファクトのログ
mlflow.log_artifact("data/train.csv")
# モデルのログ
mlflow.sklearn.log_model(
model,
"model",
registered_model_name="RandomForestClassifier"
)
print(f"Accuracy: {accuracy:.4f}, F1: {f1:.4f}")
print(f"Run ID: {mlflow.active_run().info.run_id}")
3.3 MLflow自動ログ
# sklearnの自動ログを有効化
mlflow.sklearn.autolog()
with mlflow.start_run():
model = RandomForestClassifier(n_estimators=100)
model.fit(X_train, y_train)
# パラメータ、メトリクス、モデルが自動的にログされる
# PyTorchの自動ログ
mlflow.pytorch.autolog()
# XGBoostの自動ログ
mlflow.xgboost.autolog()
3.4 実例:PyTorch学習のトラッキング
import torch
import torch.nn as nn
import torch.optim as optim
import mlflow
import mlflow.pytorch
class SimpleNet(nn.Module):
def __init__(self, input_dim, hidden_dim, output_dim):
super().__init__()
self.fc1 = nn.Linear(input_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, output_dim)
self.relu = nn.ReLU()
self.dropout = nn.Dropout(0.3)
def forward(self, x):
x = self.relu(self.fc1(x))
x = self.dropout(x)
return self.fc2(x)
def train_with_mlflow(
model, train_loader, val_loader,
epochs=10, lr=0.001, experiment_name="pytorch-training"
):
mlflow.set_experiment(experiment_name)
with mlflow.start_run():
# ハイパーパラメータのログ
mlflow.log_params({
"epochs": epochs,
"learning_rate": lr,
"model_type": "SimpleNet",
"optimizer": "Adam",
"batch_size": train_loader.batch_size,
})
optimizer = optim.Adam(model.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss()
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)
for epoch in range(epochs):
# 学習
model.train()
train_loss = 0.0
for batch_X, batch_y in train_loader:
optimizer.zero_grad()
outputs = model(batch_X)
loss = criterion(outputs, batch_y)
loss.backward()
optimizer.step()
train_loss += loss.item()
train_loss /= len(train_loader)
# 検証
model.eval()
val_loss = 0.0
correct = 0
total = 0
with torch.no_grad():
for batch_X, batch_y in val_loader:
outputs = model(batch_X)
loss = criterion(outputs, batch_y)
val_loss += loss.item()
_, predicted = outputs.max(1)
total += batch_y.size(0)
correct += predicted.eq(batch_y).sum().item()
val_loss /= len(val_loader)
val_accuracy = correct / total
# ステップごとのメトリクスログ
mlflow.log_metrics({
"train_loss": train_loss,
"val_loss": val_loss,
"val_accuracy": val_accuracy,
"learning_rate": scheduler.get_last_lr()[0],
}, step=epoch)
scheduler.step()
print(f"Epoch {epoch+1}/{epochs} | "
f"Train Loss: {train_loss:.4f} | "
f"Val Loss: {val_loss:.4f} | "
f"Val Acc: {val_accuracy:.4f}")
# 最終モデルの保存
mlflow.pytorch.log_model(model, "model")
return mlflow.active_run().info.run_id
3.5 W&B(Weights & Biases)の使い方
# W&Bのインストール
pip install wandb
# ログイン
wandb login
import wandb
import torch
# W&Bの初期化
wandb.init(
project="my-ml-project",
name="experiment-001",
config={
"learning_rate": 0.001,
"epochs": 100,
"batch_size": 64,
"architecture": "ResNet50",
"dataset": "ImageNet",
}
)
# 設定値へのアクセス
lr = wandb.config.learning_rate
# 学習ループ
for epoch in range(wandb.config.epochs):
train_loss, train_acc = train_epoch(model, train_loader)
val_loss, val_acc = eval_epoch(model, val_loader)
# メトリクスのログ
wandb.log({
"epoch": epoch,
"train_loss": train_loss,
"train_accuracy": train_acc,
"val_loss": val_loss,
"val_accuracy": val_acc,
"learning_rate": optimizer.param_groups[0]['lr'],
})
# モデルの保存
wandb.save("model.pth")
# 画像のログ
wandb.log({
"predictions": [
wandb.Image(img, caption=f"Pred: {pred}, True: {true}")
for img, pred, true in sample_predictions
]
})
# W&B Artifacts(データセットのバージョン管理)
artifact = wandb.Artifact("training-data", type="dataset")
artifact.add_dir("data/train")
wandb.log_artifact(artifact)
wandb.finish()
3.6 W&B Sweeps(ハイパーパラメータ最適化)
import wandb
# スイープ設定
sweep_config = {
"method": "bayes", # random, grid, bayes
"metric": {
"name": "val_accuracy",
"goal": "maximize"
},
"parameters": {
"learning_rate": {
"distribution": "log_uniform_values",
"min": 1e-5,
"max": 1e-1,
},
"batch_size": {
"values": [16, 32, 64, 128]
},
"hidden_dim": {
"values": [64, 128, 256, 512]
},
"dropout": {
"distribution": "uniform",
"min": 0.1,
"max": 0.5,
}
}
}
def train_sweep():
with wandb.init() as run:
config = wandb.config
model = SimpleNet(
input_dim=784,
hidden_dim=config.hidden_dim,
output_dim=10
)
optimizer = torch.optim.Adam(
model.parameters(),
lr=config.learning_rate
)
for epoch in range(10):
train_loss, val_loss, val_acc = run_epoch(
model, optimizer, config.batch_size
)
wandb.log({
"train_loss": train_loss,
"val_loss": val_loss,
"val_accuracy": val_acc,
})
# スイープの作成と実行
sweep_id = wandb.sweep(sweep_config, project="my-project")
wandb.agent(sweep_id, function=train_sweep, count=50)
4. MLパイプラインオーケストレーション
4.1 Apache AirflowによるMLパイプライン
# dags/ml_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'mlops-team',
'depends_on_past': False,
'start_date': datetime(2026, 1, 1),
'email_on_failure': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'ml_training_pipeline',
default_args=default_args,
description='ML model training pipeline',
schedule_interval='0 2 * * *', # 毎日午前2時
catchup=False,
)
def extract_data(**kwargs):
"""データの抽出"""
import pandas as pd
from sqlalchemy import create_engine
engine = create_engine('postgresql://user:pass@db:5432/mldb')
df = pd.read_sql(
"SELECT * FROM features WHERE date >= CURRENT_DATE - 7",
engine
)
output_path = '/tmp/raw_data.parquet'
df.to_parquet(output_path)
kwargs['ti'].xcom_push(key='data_path', value=output_path)
return output_path
def preprocess_data(**kwargs):
"""データの前処理"""
ti = kwargs['ti']
data_path = ti.xcom_pull(task_ids='extract_data', key='data_path')
import pandas as pd
from sklearn.preprocessing import StandardScaler
df = pd.read_parquet(data_path)
scaler = StandardScaler()
features = df.drop('target', axis=1)
scaled_features = scaler.fit_transform(features)
processed_path = '/tmp/processed_data.parquet'
pd.DataFrame(scaled_features).to_parquet(processed_path)
ti.xcom_push(key='processed_path', value=processed_path)
def train_model(**kwargs):
"""モデルの学習"""
ti = kwargs['ti']
processed_path = ti.xcom_pull(
task_ids='preprocess_data',
key='processed_path'
)
import mlflow
import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
mlflow.set_experiment("production-training")
with mlflow.start_run():
data = pd.read_parquet(processed_path)
X = data.drop('target', axis=1)
y = data['target']
X_train, X_test, y_train, y_test = train_test_split(X, y)
model = GradientBoostingClassifier(n_estimators=200)
model.fit(X_train, y_train)
accuracy = model.score(X_test, y_test)
mlflow.log_metric("accuracy", accuracy)
mlflow.sklearn.log_model(model, "model")
run_id = mlflow.active_run().info.run_id
ti.xcom_push(key='run_id', value=run_id)
def evaluate_and_deploy(**kwargs):
"""モデルの評価とデプロイ"""
ti = kwargs['ti']
run_id = ti.xcom_pull(task_ids='train_model', key='run_id')
import mlflow
client = mlflow.tracking.MlflowClient()
run = client.get_run(run_id)
accuracy = run.data.metrics['accuracy']
if accuracy >= 0.90:
model_uri = f"runs:/{run_id}/model"
mlflow.register_model(model_uri, "ProductionModel")
print(f"Model deployed with accuracy: {accuracy:.4f}")
else:
raise ValueError(f"Model accuracy {accuracy:.4f} below threshold 0.90")
# タスクの定義
extract_task = PythonOperator(
task_id='extract_data',
python_callable=extract_data,
dag=dag,
)
preprocess_task = PythonOperator(
task_id='preprocess_data',
python_callable=preprocess_data,
dag=dag,
)
train_task = PythonOperator(
task_id='train_model',
python_callable=train_model,
dag=dag,
)
evaluate_task = PythonOperator(
task_id='evaluate_and_deploy',
python_callable=evaluate_and_deploy,
dag=dag,
)
# 依存関係の設定
extract_task >> preprocess_task >> train_task >> evaluate_task
4.2 PrefectによるMLパイプライン
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def load_data(data_path: str) -> pd.DataFrame:
"""データの読み込み(キャッシュ付き)"""
return pd.read_parquet(data_path)
@task(retries=3, retry_delay_seconds=10)
def preprocess(df: pd.DataFrame) -> pd.DataFrame:
"""データの前処理"""
df = df.dropna()
df = df.drop_duplicates()
return df
@task
def train(df: pd.DataFrame, params: dict) -> str:
"""モデルの学習"""
from sklearn.ensemble import RandomForestClassifier
mlflow.set_experiment("prefect-ml-pipeline")
with mlflow.start_run():
model = RandomForestClassifier(**params)
X = df.drop('target', axis=1)
y = df['target']
model.fit(X, y)
mlflow.sklearn.log_model(model, "model")
mlflow.log_params(params)
return mlflow.active_run().info.run_id
@task
def evaluate(run_id: str) -> float:
"""モデルの評価"""
client = mlflow.tracking.MlflowClient()
run = client.get_run(run_id)
return run.data.metrics.get('accuracy', 0.0)
@flow(name="ml-training-pipeline")
def ml_pipeline(data_path: str, params: dict = None):
"""メインパイプライン"""
if params is None:
params = {"n_estimators": 100, "max_depth": 5}
df = load_data(data_path)
processed_df = preprocess(df)
run_id = train(processed_df, params)
accuracy = evaluate(run_id)
print(f"Pipeline complete. Accuracy: {accuracy:.4f}")
return accuracy
if __name__ == "__main__":
ml_pipeline(
data_path="data/train.parquet",
params={"n_estimators": 200, "max_depth": 7}
)
5. モデルレジストリ
5.1 MLflowモデルレジストリ
モデルレジストリは、モデルバージョンの一元管理と、Staging/Productionの状態追跡を提供します。
import mlflow
from mlflow.tracking import MlflowClient
client = MlflowClient()
# モデルの登録
model_uri = "runs:/abc123def456/model"
registered_model = mlflow.register_model(model_uri, "MyClassifier")
# バージョン情報の確認
print(f"Version: {registered_model.version}")
print(f"Status: {registered_model.status}")
# モデルバージョンのメタデータ更新
client.update_model_version(
name="MyClassifier",
version=registered_model.version,
description="Random Forest with 200 estimators, accuracy 0.94"
)
# タグの追加
client.set_model_version_tag(
name="MyClassifier",
version=registered_model.version,
key="validated_by",
value="data-science-team"
)
5.2 StagingからProductionへの昇格
# Stagingへの移行
client.transition_model_version_stage(
name="MyClassifier",
version=1,
stage="Staging",
archive_existing_versions=False
)
# 検証後にProductionへ昇格
def promote_to_production(model_name: str, version: int, min_accuracy: float = 0.90):
"""モデルをProductionに昇格"""
model_version = client.get_model_version(model_name, version)
run_id = model_version.run_id
run = client.get_run(run_id)
accuracy = run.data.metrics.get('accuracy', 0)
if accuracy < min_accuracy:
raise ValueError(
f"Model accuracy {accuracy:.4f} is below minimum {min_accuracy}"
)
# Productionへの昇格(既存のProductionバージョンをアーカイブ)
client.transition_model_version_stage(
name=model_name,
version=version,
stage="Production",
archive_existing_versions=True
)
print(f"Model {model_name} v{version} promoted to Production!")
print(f"Accuracy: {accuracy:.4f}")
# Productionモデルの読み込み
production_model = mlflow.pyfunc.load_model(
model_uri=f"models:/MyClassifier/Production"
)
predictions = production_model.predict(X_test)
5.3 モデルサービング(MLflow組み込み)
# MLflowモデルのサービング
mlflow models serve \
-m "models:/MyClassifier/Production" \
--host 0.0.0.0 \
--port 5001
# 予測リクエスト
curl -X POST http://localhost:5001/invocations \
-H "Content-Type: application/json" \
-d '{"dataframe_split": {"columns": ["feature1", "feature2"], "data": [[1.0, 2.0]]}}'
6. コンテナ化(Docker)
6.1 ML環境のDockerization
# Dockerfile.train - 学習用イメージ
FROM python:3.11-slim
WORKDIR /app
RUN apt-get update && apt-get install -y \
gcc \
g++ \
git \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ ./src/
COPY params.yaml .
COPY dvc.yaml .
CMD ["python", "src/train.py"]
6.2 マルチステージビルド
# Dockerfile.serve - サービング用マルチステージビルド
# ステージ1: ビルダー
FROM python:3.11 AS builder
WORKDIR /build
COPY requirements.txt .
RUN pip install --no-cache-dir --target /install -r requirements.txt
# ステージ2: 最終スリムイメージ
FROM python:3.11-slim AS runtime
WORKDIR /app
COPY /install /usr/local/lib/python3.11/site-packages
COPY src/serve.py .
COPY models/ ./models/
EXPOSE 8080
RUN useradd -m -u 1000 mluser
USER mluser
CMD ["python", "serve.py"]
6.3 GPU Docker(nvidia-docker)
# Dockerfile.gpu - GPU対応イメージ
FROM nvidia/cuda:12.1.0-cudnn8-devel-ubuntu22.04
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y \
python3.11 \
python3-pip \
git \
&& rm -rf /var/lib/apt/lists/*
RUN pip3 install torch torchvision torchaudio \
--index-url https://download.pytorch.org/whl/cu121
WORKDIR /app
COPY requirements-gpu.txt .
RUN pip3 install --no-cache-dir -r requirements-gpu.txt
COPY src/ ./src/
CMD ["python3", "src/train_gpu.py"]
# GPUコンテナの実行
docker run --gpus all \
-v /data:/data \
-v /models:/models \
--shm-size=8gb \
my-ml-gpu:latest
6.4 Docker ComposeによるMLスタック
# docker-compose.yml
version: '3.8'
services:
mlflow:
image: ghcr.io/mlflow/mlflow:latest
command: >
mlflow server
--backend-store-uri postgresql://mlflow:password@postgres:5432/mlflow
--default-artifact-root s3://my-bucket/mlflow
--host 0.0.0.0
--port 5000
ports:
- '5000:5000'
depends_on:
- postgres
postgres:
image: postgres:15
environment:
POSTGRES_DB: mlflow
POSTGRES_USER: mlflow
POSTGRES_PASSWORD: password
volumes:
- postgres_data:/var/lib/postgresql/data
minio:
image: minio/minio:latest
command: server /data --console-address ":9001"
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
ports:
- '9000:9000'
- '9001:9001'
volumes:
- minio_data:/data
trainer:
build:
context: .
dockerfile: Dockerfile.train
environment:
MLFLOW_TRACKING_URI: http://mlflow:5000
volumes:
- ./data:/app/data
- ./models:/app/models
depends_on:
- mlflow
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
volumes:
postgres_data:
minio_data:
6.5 実例:PyTorchモデルサーバーのDockerfile
# Dockerfile.pytorch-serve
FROM python:3.11-slim
WORKDIR /app
RUN pip install --no-cache-dir \
torch==2.2.0+cpu \
torchvision \
fastapi \
uvicorn \
pydantic \
numpy \
pillow \
--index-url https://download.pytorch.org/whl/cpu
COPY src/server.py .
COPY models/model.pt .
EXPOSE 8000
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]
# src/server.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import torch
import numpy as np
from typing import List
app = FastAPI(title="ML Model Server")
# サーバー起動時にモデルを一度読み込む
model = torch.jit.load("model.pt")
model.eval()
class PredictionRequest(BaseModel):
features: List[List[float]]
class PredictionResponse(BaseModel):
predictions: List[int]
probabilities: List[List[float]]
@app.get("/health")
async def health_check():
return {"status": "healthy"}
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
try:
tensor = torch.tensor(request.features, dtype=torch.float32)
with torch.no_grad():
outputs = model(tensor)
probabilities = torch.softmax(outputs, dim=1)
predictions = torch.argmax(probabilities, dim=1)
return PredictionResponse(
predictions=predictions.tolist(),
probabilities=probabilities.tolist()
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
7. ML向けKubernetes
7.1 MLワークロード向けK8sの基礎
# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
name: ml-platform
labels:
app: ml-platform
environment: production
7.2 学習ジョブ
# k8s/training-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
name: ml-training-job
namespace: ml-platform
spec:
backoffLimit: 3
template:
spec:
restartPolicy: OnFailure
nodeSelector:
accelerator: nvidia-t4
tolerations:
- key: nvidia.com/gpu
operator: Exists
effect: NoSchedule
containers:
- name: trainer
image: my-registry/ml-trainer:v1.2.0
resources:
requests:
memory: '8Gi'
cpu: '4'
nvidia.com/gpu: '1'
limits:
memory: '16Gi'
cpu: '8'
nvidia.com/gpu: '1'
env:
- name: MLFLOW_TRACKING_URI
value: 'http://mlflow-service.ml-platform:5000'
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: aws-credentials
key: access_key_id
volumeMounts:
- name: training-data
mountPath: /data
- name: dshm
mountPath: /dev/shm
volumes:
- name: training-data
persistentVolumeClaim:
claimName: training-data-pvc
- name: dshm
emptyDir:
medium: Memory
sizeLimit: 8Gi
7.3 定期再学習のCronJob
# k8s/training-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
name: daily-retraining
namespace: ml-platform
spec:
schedule: '0 2 * * *' # 毎日午前2時
concurrencyPolicy: Forbid
successfulJobsHistoryLimit: 3
failedJobsHistoryLimit: 3
jobTemplate:
spec:
template:
spec:
restartPolicy: OnFailure
containers:
- name: retrainer
image: my-registry/ml-trainer:latest
command: ['python', 'src/retrain.py']
resources:
requests:
memory: '4Gi'
cpu: '2'
limits:
memory: '8Gi'
cpu: '4'
7.4 HPAを使用したモデルサービングDeployment
# k8s/model-serving-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: model-server
namespace: ml-platform
spec:
replicas: 3
selector:
matchLabels:
app: model-server
template:
metadata:
labels:
app: model-server
spec:
containers:
- name: model-server
image: my-registry/model-server:v1.0.0
ports:
- containerPort: 8000
resources:
requests:
memory: '2Gi'
cpu: '1'
limits:
memory: '4Gi'
cpu: '2'
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 15
periodSeconds: 5
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: model-server-hpa
namespace: ml-platform
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: model-server
minReplicas: 2
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
8. Kubeflow
8.1 Kubeflowの概要
KubeflowはKubernetes上でMLワークフローを管理するオープンソースプラットフォームです。以下のコンポーネントで構成されています:
- Pipelines: MLワークフローオーケストレーション
- Notebooks: マネージドJupyterノートブック
- Katib: ハイパーパラメータチューニング
- KServe: モデルサービング
- Training Operator: 分散学習
8.2 Kubeflowパイプライン
# kubeflow_pipeline.py
import kfp
from kfp import dsl
@dsl.component(base_image="python:3.11", packages_to_install=["pandas", "scikit-learn"])
def prepare_data(data_path: str, output_path: kfp.dsl.OutputPath(str)):
import pandas as pd
from sklearn.model_selection import train_test_split
df = pd.read_parquet(data_path)
train, test = train_test_split(df, test_size=0.2)
train.to_parquet(f"{output_path}/train.parquet")
test.to_parquet(f"{output_path}/test.parquet")
@dsl.component(
base_image="python:3.11",
packages_to_install=["scikit-learn", "mlflow", "pandas"]
)
def train_model(
data_path: str,
n_estimators: int,
max_depth: int,
model_output: kfp.dsl.OutputPath(str),
mlflow_uri: str
):
import mlflow
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
mlflow.set_tracking_uri(mlflow_uri)
df = pd.read_parquet(f"{data_path}/train.parquet")
X = df.drop('target', axis=1)
y = df['target']
with mlflow.start_run():
model = RandomForestClassifier(
n_estimators=n_estimators,
max_depth=max_depth
)
model.fit(X, y)
run_id = mlflow.active_run().info.run_id
mlflow.sklearn.log_model(model, "model")
with open(model_output, 'w') as f:
f.write(run_id)
@dsl.pipeline(
name="ML Training Pipeline",
description="End-to-end ML training pipeline"
)
def ml_pipeline(
data_path: str = "gs://my-bucket/data/train.parquet",
n_estimators: int = 100,
max_depth: int = 5,
mlflow_uri: str = "http://mlflow:5000"
):
prepare_task = prepare_data(data_path=data_path)
train_task = train_model(
data_path=prepare_task.output,
n_estimators=n_estimators,
max_depth=max_depth,
mlflow_uri=mlflow_uri
)
if __name__ == "__main__":
kfp.compiler.Compiler().compile(
pipeline_func=ml_pipeline,
package_path="ml_pipeline.yaml"
)
client = kfp.Client(host="http://kubeflow-host/pipeline")
run = client.create_run_from_pipeline_package(
pipeline_file="ml_pipeline.yaml",
arguments={"n_estimators": 200, "max_depth": 7},
run_name="training-run-001"
)
9. ML向けCI/CD
9.1 ML向けGitHub Actions
# .github/workflows/ml-pipeline.yml
name: ML CI/CD Pipeline
on:
push:
branches: [main, develop]
paths:
- 'src/**'
- 'params.yaml'
- 'requirements.txt'
pull_request:
branches: [main]
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
jobs:
test:
name: Unit Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
cache: 'pip'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest pytest-cov
- name: Run unit tests
run: pytest tests/unit/ -v --cov=src --cov-report=xml
train-and-evaluate:
name: Train and Evaluate Model
runs-on: ubuntu-latest
needs: test
if: github.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Run DVC pipeline
run: dvc repro
- name: Check model performance
run: |
python scripts/check_metrics.py \
--min-accuracy 0.90 \
--metrics-file metrics/scores.json
- name: Register model
run: python scripts/register_model.py
build-and-push:
name: Build Docker Image
runs-on: ubuntu-latest
needs: train-and-evaluate
steps:
- uses: actions/checkout@v4
- name: Login to Container Registry
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push
uses: docker/build-push-action@v5
with:
context: .
file: Dockerfile.serve
push: true
tags: |
ghcr.io/${{ github.repository }}/model-server:latest
ghcr.io/${{ github.repository }}/model-server:${{ github.sha }}
deploy:
name: Deploy to Kubernetes
runs-on: ubuntu-latest
needs: build-and-push
environment: production
steps:
- uses: actions/checkout@v4
- name: Configure kubectl
uses: azure/k8s-set-context@v3
with:
kubeconfig: ${{ secrets.KUBECONFIG }}
- name: Deploy
run: |
kubectl set image deployment/model-server \
model-server=ghcr.io/${{ github.repository }}/model-server:${{ github.sha }} \
-n ml-platform
kubectl rollout status deployment/model-server -n ml-platform
10. モデルモニタリング
10.1 Evidently AIを使用したデータドリフト検出
pip install evidently
import pandas as pd
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
from evidently.metrics import ColumnDriftMetric
# 参照データ(学習時のデータ)
reference_data = pd.read_parquet("data/reference.parquet")
# 現在のデータ(本番環境での予測データ)
current_data = pd.read_parquet("data/current.parquet")
# データドリフトレポート
drift_report = Report(metrics=[
DataDriftPreset(),
DataQualityPreset(),
ColumnDriftMetric(column_name="age"),
ColumnDriftMetric(column_name="income"),
])
drift_report.run(
reference_data=reference_data,
current_data=current_data
)
drift_report.save_html("drift_report.html")
result = drift_report.as_dict()
drift_detected = result['metrics'][0]['result']['dataset_drift']
if drift_detected:
print("Data drift detected! Consider retraining.")
send_alert("Data drift detected in production model")
10.2 モデルパフォーマンスモニタリング
from evidently.report import Report
from evidently.metric_preset import ClassificationPreset
prediction_data = pd.DataFrame({
'target': y_true,
'prediction': y_pred,
'prediction_proba': y_proba,
'feature1': X_test['feature1'],
'feature2': X_test['feature2'],
})
performance_report = Report(metrics=[ClassificationPreset()])
performance_report.run(
reference_data=reference_predictions,
current_data=prediction_data
)
metrics = performance_report.as_dict()
current_accuracy = metrics['metrics'][0]['result']['current']['accuracy']
reference_accuracy = metrics['metrics'][0]['result']['reference']['accuracy']
degradation = reference_accuracy - current_accuracy
if degradation > 0.05:
trigger_retraining(f"Model degraded by {degradation:.2%}")
10.3 Prometheusによるリアルタイムモニタリング
from prometheus_client import Counter, Histogram, Gauge, start_http_server
prediction_counter = Counter('model_predictions_total', 'Total predictions')
prediction_latency = Histogram('model_prediction_duration_seconds', 'Prediction latency')
model_accuracy_gauge = Gauge('model_accuracy', 'Current model accuracy')
drift_score_gauge = Gauge('data_drift_score', 'Data drift score')
class MonitoredModelServer:
def __init__(self, model, reference_data):
self.model = model
self.reference_data = reference_data
self.predictions_buffer = []
start_http_server(8001)
def predict(self, features):
with prediction_latency.time():
prediction = self.model.predict(features)
prediction_counter.inc()
self.predictions_buffer.append(prediction)
if len(self.predictions_buffer) >= 1000:
self._run_monitoring()
return prediction
def _run_monitoring(self):
current_df = pd.DataFrame(self.predictions_buffer)
drift_report = Report(metrics=[DataDriftPreset()])
drift_report.run(
reference_data=self.reference_data,
current_data=current_df
)
result = drift_report.as_dict()
drift_share = result['metrics'][0]['result']['share_of_drifted_columns']
drift_score_gauge.set(drift_share)
self.predictions_buffer = []
11. フィーチャーストア
11.1 フィーチャーストアの概念
フィーチャーストアは、ML特徴量の一元管理と再利用を可能にするインフラです。
主要な概念:
- オンラインストア: リアルタイム予測のための低レイテンシな特徴量取得(Redis、DynamoDB)
- オフラインストア: バッチ学習のための履歴特徴量(S3、BigQuery)
- フィーチャービュー: 特徴量定義とデータソースのマッピング
- エンティティ: 特徴量の主体(ユーザーID、商品IDなど)
11.2 Feastのセットアップと使い方
pip install feast
feast init my-feature-store
cd my-feature-store
# features.py
from feast import Entity, FeatureView, FileSource, ValueType, Field
from feast.types import Float64, Int64
from datetime import timedelta
user = Entity(
name="user_id",
value_type=ValueType.INT64,
description="User ID"
)
user_stats_source = FileSource(
path="data/user_stats.parquet",
timestamp_field="event_timestamp",
)
user_stats_fv = FeatureView(
name="user_statistics",
entities=[user],
ttl=timedelta(days=30),
schema=[
Field(name="purchase_count_7d", dtype=Int64),
Field(name="purchase_amount_7d", dtype=Float64),
Field(name="avg_session_duration", dtype=Float64),
Field(name="last_purchase_days_ago", dtype=Int64),
],
online=True,
source=user_stats_source,
)
# feast_usage.py
from feast import FeatureStore
import pandas as pd
from datetime import datetime
store = FeatureStore(repo_path=".")
# オフライン特徴量の取得(学習用)
entity_df = pd.DataFrame({
"user_id": [1001, 1002, 1003],
"event_timestamp": pd.to_datetime(["2026-01-01", "2026-01-02", "2026-01-03"])
})
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"user_statistics:purchase_count_7d",
"user_statistics:purchase_amount_7d",
"user_statistics:avg_session_duration",
]
).to_df()
# オンラインストアへのマテリアライズ
store.materialize_incremental(end_date=datetime.now())
# オンライン特徴量の取得(リアルタイム推論用)
online_features = store.get_online_features(
features=[
"user_statistics:purchase_count_7d",
"user_statistics:purchase_amount_7d",
],
entity_rows=[
{"user_id": 1001},
{"user_id": 1002},
]
).to_dict()
12. 実世界のMLOpsプロジェクト
12.1 完全なパイプラインアーキテクチャ
データソース(DB、S3、API)
↓
データ収集(Airflow DAG)
↓
データ検証(Great Expectations)
↓
特徴量エンジニアリング(Feast)
↓
モデル学習(MLflowトラッキング)
↓
モデル評価(自動検証)
↓
モデルレジストリ(MLflow Registry)
↓
CI/CD(GitHub Actions)
↓
コンテナビルド(Docker)
↓
K8sデプロイ(Kubernetes)
↓
サービング(FastAPI + Triton)
↓
モニタリング(Evidently + Prometheus + Grafana)
↓
アラート(ドリフト検出時に再学習をトリガー)
12.2 実例:顧客チャーン予測システム
# scripts/full_pipeline.py
import mlflow
import pandas as pd
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score, roc_auc_score
from sklearn.preprocessing import StandardScaler
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ChurnPredictionPipeline:
"""顧客チャーン予測パイプライン"""
def __init__(self, mlflow_uri: str = "http://localhost:5000"):
mlflow.set_tracking_uri(mlflow_uri)
mlflow.set_experiment("churn-prediction")
self.scaler = StandardScaler()
def load_data(self, data_path: str) -> pd.DataFrame:
logger.info(f"Loading data from {data_path}")
df = pd.read_parquet(data_path)
assert df.shape[0] > 1000, "Too few samples"
assert 'churn' in df.columns, "Target column 'churn' missing"
missing_rate = df.isnull().mean()
high_missing = missing_rate[missing_rate > 0.5].index.tolist()
if high_missing:
logger.warning(f"Columns with >50% missing: {high_missing}")
df = df.drop(columns=high_missing)
return df
def engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
logger.info("Engineering features...")
numeric_cols = df.select_dtypes(include=[np.number]).columns
df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].median())
if 'tenure_months' in df.columns and 'monthly_charges' in df.columns:
df['total_value'] = df['tenure_months'] * df['monthly_charges']
if 'num_support_tickets' in df.columns and 'tenure_months' in df.columns:
df['tickets_per_month'] = (
df['num_support_tickets'] / (df['tenure_months'] + 1)
)
return df
def train(self, df: pd.DataFrame, params: dict = None) -> str:
if params is None:
params = {
"n_estimators": 200,
"max_depth": 5,
"learning_rate": 0.05,
"subsample": 0.8,
"random_state": 42,
}
feature_cols = [c for c in df.columns if c != 'churn']
X = df[feature_cols]
y = df['churn']
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
X_train_scaled = self.scaler.fit_transform(X_train)
X_test_scaled = self.scaler.transform(X_test)
with mlflow.start_run(run_name="churn-gbt"):
mlflow.log_params(params)
mlflow.log_param("train_size", len(X_train))
mlflow.log_param("n_features", len(feature_cols))
model = GradientBoostingClassifier(**params)
model.fit(X_train_scaled, y_train)
y_pred = model.predict(X_test_scaled)
y_proba = model.predict_proba(X_test_scaled)[:, 1]
accuracy = accuracy_score(y_test, y_pred)
auc_roc = roc_auc_score(y_test, y_proba)
mlflow.log_metrics({
"accuracy": accuracy,
"auc_roc": auc_roc,
})
cv_scores = cross_val_score(
model, X_train_scaled, y_train, cv=5, scoring='roc_auc'
)
mlflow.log_metric("cv_auc_mean", cv_scores.mean())
mlflow.log_metric("cv_auc_std", cv_scores.std())
mlflow.sklearn.log_model(
model,
"model",
registered_model_name="ChurnPredictor"
)
run_id = mlflow.active_run().info.run_id
logger.info(f"Training complete. AUC-ROC: {auc_roc:.4f}")
return run_id
def promote_best_model(self, min_auc: float = 0.85):
client = mlflow.tracking.MlflowClient()
experiment = client.get_experiment_by_name("churn-prediction")
runs = client.search_runs(
experiment_ids=[experiment.experiment_id],
filter_string="metrics.auc_roc > 0.80",
order_by=["metrics.auc_roc DESC"],
max_results=1
)
if not runs:
raise ValueError("No runs found meeting criteria")
best_run = runs[0]
auc = best_run.data.metrics['auc_roc']
if auc < min_auc:
raise ValueError(f"Best AUC {auc:.4f} below minimum {min_auc}")
model_uri = f"runs:/{best_run.info.run_id}/model"
mv = mlflow.register_model(model_uri, "ChurnPredictor")
client.transition_model_version_stage(
name="ChurnPredictor",
version=mv.version,
stage="Production",
archive_existing_versions=True
)
logger.info(f"Model v{mv.version} promoted to Production! AUC: {auc:.4f}")
return mv.version
if __name__ == "__main__":
pipeline = ChurnPredictionPipeline()
df = pipeline.load_data("data/customers.parquet")
df = pipeline.engineer_features(df)
run_id = pipeline.train(df)
pipeline.promote_best_model(min_auc=0.85)
まとめ
MLOpsは単にツールを学ぶことではなく、MLシステムを確実に、再現可能に、スケーラブルに運用するための文化とプロセスです。
以下のコア原則を心に留めておいてください:
- すべてをバージョン管理する: コード、データ、モデル、環境
- まず自動化する: 手動ステップはエラーの源
- 計測してモニタリングする: 計測できないものは改善できない
- 障害を素早く検出する: ドリフトやパフォーマンス劣化を即座にキャッチ
- 再現性を保証する: 誰でも、どこでも、いつでも同じ結果を再現できるようにする
MLOpsの旅は段階的です。レベル0から始め、組織のニーズに合ったペースで成熟させていきましょう。