- Published on
MLflow実験管理完全ガイド:実験追跡・モデルレジストリ・デプロイパイプライン構築
- Authors
- Name
- はじめに
- MLflowアーキテクチャ
- 実験追跡(Experiment Tracking)
- Model Registry
- デプロイパイプライン
- 実験追跡プラットフォーム比較
- Transformers統合
- トラブルシューティング
- 運用ノート
- プロダクションチェックリスト
- 参考資料
はじめに
機械学習プロジェクトが規模を拡大すると、最初に直面する課題は実験管理である。数十回のハイパーパラメータチューニング、様々な特徴量の組み合わせ、複数のアルゴリズム比較実験をスプレッドシートやノートで管理するには限界がある。実験結果を再現できない、あるいはどのモデルが本番環境にデプロイされているか追跡できないという状況が頻繁に発生する。
MLflowは、これらの問題を解決するためにDatabricksで開発されたオープンソースのMLOpsプラットフォームである。Tracking、Model Registry、Model Servingという3つのコアコンポーネントを通じて、MLライフサイクル全体を管理する。本記事では、MLflowのアーキテクチャから本番デプロイまで、プロダクション環境でMLflowを効果的に運用する方法を解説する。
MLflowアーキテクチャ
コアコンポーネント構成
MLflowは大きく4つのコンポーネントで構成される。
| コンポーネント | 役割 | ストレージ |
|---|---|---|
| Tracking Server | 実験パラメータ・メトリクス・アーティファクトの記録 | Backend Store + Artifact Store |
| Model Registry | モデルバージョン管理・ステージ遷移 | Backend Store |
| Model Serving | REST APIによるモデルデプロイ | コンテナ/クラウド |
| Projects | 再現可能な実験パッケージング | Gitまたはローカル |
Tracking Serverデプロイアーキテクチャ
本番環境ではリモートTracking Serverを構成する必要がある。Backend StoreにPostgreSQL、Artifact StoreにS3を使用するのが一般的である。
# tracking_server_config.py
"""
MLflow Tracking Server 本番設定
Backend Store: PostgreSQL
Artifact Store: S3
"""
import os
TRACKING_CONFIG = {
"backend_store_uri": "postgresql://mlflow:password@db-host:5432/mlflow",
"default_artifact_root": "s3://mlflow-artifacts/experiments",
"host": "0.0.0.0",
"port": 5000,
"workers": 4,
}
# MLflow Tracking Server起動
mlflow server \
--backend-store-uri postgresql://mlflow:password@db-host:5432/mlflow \
--default-artifact-root s3://mlflow-artifacts/experiments \
--host 0.0.0.0 \
--port 5000 \
--workers 4
# Docker Composeで起動
docker compose up -d mlflow-server
# docker-compose.yaml
version: '3.8'
services:
mlflow-db:
image: postgres:15
environment:
POSTGRES_DB: mlflow
POSTGRES_USER: mlflow
POSTGRES_PASSWORD: mlflow_password
volumes:
- pgdata:/var/lib/postgresql/data
ports:
- '5432:5432'
mlflow-server:
build: ./mlflow
depends_on:
- mlflow-db
environment:
MLFLOW_BACKEND_STORE_URI: postgresql://mlflow:mlflow_password@mlflow-db:5432/mlflow
MLFLOW_DEFAULT_ARTIFACT_ROOT: s3://mlflow-artifacts/experiments
AWS_ACCESS_KEY_ID: your-access-key
AWS_SECRET_ACCESS_KEY: your-secret-key
ports:
- '5000:5000'
command: >
mlflow server
--backend-store-uri postgresql://mlflow:mlflow_password@mlflow-db:5432/mlflow
--default-artifact-root s3://mlflow-artifacts/experiments
--host 0.0.0.0
--port 5000
--workers 4
volumes:
pgdata:
実験追跡(Experiment Tracking)
基本的な実験ロギング
MLflowの実験追跡はRun単位で行われる。各Runにはパラメータ、メトリクス、アーティファクトを記録できる。
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.datasets import load_iris
# Tracking Serverへ接続
mlflow.set_tracking_uri("http://mlflow-server:5000")
# 実験の作成または既存実験の選択
mlflow.set_experiment("iris-classification")
# データ準備
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
iris.data, iris.target, test_size=0.2, random_state=42
)
# 実験実行
with mlflow.start_run(run_name="rf-baseline-v1") as run:
# ハイパーパラメータのロギング
params = {
"n_estimators": 100,
"max_depth": 5,
"min_samples_split": 2,
"random_state": 42,
}
mlflow.log_params(params)
# モデル学習
model = RandomForestClassifier(**params)
model.fit(X_train, y_train)
# 予測とメトリクス計算
y_pred = model.predict(X_test)
metrics = {
"accuracy": accuracy_score(y_test, y_pred),
"f1_macro": f1_score(y_test, y_pred, average="macro"),
"precision_macro": precision_score(y_test, y_pred, average="macro"),
"recall_macro": recall_score(y_test, y_pred, average="macro"),
}
mlflow.log_metrics(metrics)
# モデルアーティファクトのロギング
mlflow.sklearn.log_model(
model,
artifact_path="model",
registered_model_name="iris-classifier",
)
# 追加アーティファクトのロギング(例:混同行列画像)
import matplotlib.pyplot as plt
from sklearn.metrics import ConfusionMatrixDisplay
fig, ax = plt.subplots(figsize=(8, 6))
ConfusionMatrixDisplay.from_predictions(y_test, y_pred, ax=ax)
fig.savefig("/tmp/confusion_matrix.png")
mlflow.log_artifact("/tmp/confusion_matrix.png", "plots")
print(f"Run ID: {run.info.run_id}")
print(f"Metrics: {metrics}")
自動ロギング(Autologging)
MLflowはscikit-learn、PyTorch、TensorFlow、XGBoostなどの主要フレームワークに対する自動ロギングをサポートしている。1行のコードでパラメータ、メトリクス、モデルを自動的に記録できる。
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import cross_val_score
# 自動ロギングの有効化
mlflow.sklearn.autolog(
log_input_examples=True, # 入力データ例の保存
log_model_signatures=True, # モデルシグネチャの自動検出
log_models=True, # モデルアーティファクトの自動保存
log_datasets=True, # 学習データセット情報の保存
silent=False, # ロギングメッセージの表示
)
mlflow.set_experiment("iris-autolog-experiment")
with mlflow.start_run(run_name="gbc-autolog"):
model = GradientBoostingClassifier(
n_estimators=200,
max_depth=3,
learning_rate=0.1,
random_state=42,
)
# autologがfit呼び出し時に自動でパラメータ/メトリクス/モデルを記録
model.fit(X_train, y_train)
# クロスバリデーションスコアも自動記録
cv_scores = cross_val_score(model, X_train, y_train, cv=5)
mlflow.log_metric("cv_mean_accuracy", cv_scores.mean())
mlflow.log_metric("cv_std_accuracy", cv_scores.std())
PyTorchディープラーニング実験追跡
import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
mlflow.set_experiment("pytorch-classification")
class SimpleNet(nn.Module):
def __init__(self, input_dim, hidden_dim, output_dim):
super().__init__()
self.fc1 = nn.Linear(input_dim, hidden_dim)
self.relu = nn.ReLU()
self.dropout = nn.Dropout(0.3)
self.fc2 = nn.Linear(hidden_dim, output_dim)
def forward(self, x):
x = self.fc1(x)
x = self.relu(x)
x = self.dropout(x)
x = self.fc2(x)
return x
# 学習設定
config = {
"input_dim": 4,
"hidden_dim": 64,
"output_dim": 3,
"learning_rate": 0.001,
"epochs": 50,
"batch_size": 16,
}
with mlflow.start_run(run_name="pytorch-simplenet"):
mlflow.log_params(config)
model = SimpleNet(config["input_dim"], config["hidden_dim"], config["output_dim"])
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=config["learning_rate"])
X_tensor = torch.FloatTensor(X_train)
y_tensor = torch.LongTensor(y_train)
dataset = TensorDataset(X_tensor, y_tensor)
dataloader = DataLoader(dataset, batch_size=config["batch_size"], shuffle=True)
for epoch in range(config["epochs"]):
model.train()
total_loss = 0
for batch_X, batch_y in dataloader:
optimizer.zero_grad()
outputs = model(batch_X)
loss = criterion(outputs, batch_y)
loss.backward()
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(dataloader)
# エポックごとのメトリクスロギング
mlflow.log_metric("train_loss", avg_loss, step=epoch)
# 検証
model.eval()
with torch.no_grad():
X_test_tensor = torch.FloatTensor(X_test)
test_outputs = model(X_test_tensor)
_, predicted = torch.max(test_outputs, 1)
val_acc = (predicted.numpy() == y_test).mean()
mlflow.log_metric("val_accuracy", val_acc, step=epoch)
# モデル保存
mlflow.pytorch.log_model(model, "pytorch-model")
MLflow Search API
実験結果をプログラム的に検索・比較できる。
import mlflow
from mlflow.tracking import MlflowClient
client = MlflowClient(tracking_uri="http://mlflow-server:5000")
# 特定実験の全Run照会
experiment = client.get_experiment_by_name("iris-classification")
runs = client.search_runs(
experiment_ids=[experiment.experiment_id],
filter_string="metrics.accuracy > 0.9 AND params.n_estimators = '100'",
order_by=["metrics.f1_macro DESC"],
max_results=10,
)
# 結果出力
for run in runs:
print(f"Run ID: {run.info.run_id}")
print(f" Accuracy: {run.data.metrics.get('accuracy', 'N/A')}")
print(f" F1 Score: {run.data.metrics.get('f1_macro', 'N/A')}")
print(f" Params: {run.data.params}")
print("---")
# 2つのRunの比較
run1 = runs[0]
run2 = runs[1] if len(runs) > 1 else None
if run2:
print("=== Run Comparison ===")
for metric_key in run1.data.metrics:
v1 = run1.data.metrics[metric_key]
v2 = run2.data.metrics.get(metric_key, "N/A")
print(f" {metric_key}: {v1} vs {v2}")
Model Registry
モデル登録とバージョン管理
Model Registryはモデルのライフサイクルを管理する中央リポジトリである。モデルを登録すると自動的にバージョンが付与され、Staging、Production、Archivedステージ間の遷移が可能になる。
from mlflow.tracking import MlflowClient
client = MlflowClient()
# モデル登録(学習Runから直接登録)
model_name = "iris-classifier"
result = mlflow.register_model(
model_uri=f"runs:/{run.info.run_id}/model",
name=model_name,
)
print(f"Model Version: {result.version}")
# モデルバージョンに説明を追加
client.update_model_version(
name=model_name,
version=result.version,
description="RandomForest baseline model with 100 trees, accuracy 0.95",
)
# モデルバージョンにタグを追加
client.set_model_version_tag(
name=model_name,
version=result.version,
key="validation_status",
value="approved",
)
モデルAliasとステージ遷移
MLflow 2.xからはAliasを使用したモデル参照が推奨されている。従来のStage方式(Staging/Production/Archived)も引き続きサポートされている。
from mlflow.tracking import MlflowClient
client = MlflowClient()
model_name = "iris-classifier"
# Alias方式(MLflow 2.x推奨)
# champion aliasの設定
client.set_registered_model_alias(
name=model_name,
alias="champion",
version=3,
)
# challenger aliasの設定
client.set_registered_model_alias(
name=model_name,
alias="challenger",
version=4,
)
# Aliasによるモデルロード
champion_model = mlflow.pyfunc.load_model(f"models:/{model_name}@champion")
challenger_model = mlflow.pyfunc.load_model(f"models:/{model_name}@challenger")
# 予測の比較
champion_pred = champion_model.predict(X_test)
challenger_pred = challenger_model.predict(X_test)
print(f"Champion Accuracy: {accuracy_score(y_test, champion_pred)}")
print(f"Challenger Accuracy: {accuracy_score(y_test, challenger_pred)}")
# Challengerの方が良ければChampionに昇格
if accuracy_score(y_test, challenger_pred) > accuracy_score(y_test, champion_pred):
client.set_registered_model_alias(
name=model_name,
alias="champion",
version=4,
)
print("Challenger promoted to Champion!")
モデル承認ワークフロー
本番環境ではモデルデプロイ前に承認プロセスが必要である。
def model_approval_workflow(model_name, version):
"""モデル承認ワークフロー"""
client = MlflowClient()
# ステップ1: モデル検証メトリクスの確認
model_version = client.get_model_version(model_name, version)
run = client.get_run(model_version.run_id)
accuracy = run.data.metrics.get("accuracy", 0)
f1 = run.data.metrics.get("f1_macro", 0)
# ステップ2: 品質基準の確認
quality_gates = {
"accuracy >= 0.90": accuracy >= 0.90,
"f1_macro >= 0.85": f1 >= 0.85,
}
all_passed = all(quality_gates.values())
print("=== Quality Gate Results ===")
for gate, passed in quality_gates.items():
status = "PASS" if passed else "FAIL"
print(f" {gate}: {status}")
# ステップ3: 承認結果に応じてAlias設定
if all_passed:
client.set_model_version_tag(
name=model_name, version=version,
key="approval_status", value="approved"
)
# Staging Aliasの付与
client.set_registered_model_alias(
name=model_name, alias="staging", version=version
)
print(f"Model v{version} approved and moved to staging")
return True
else:
client.set_model_version_tag(
name=model_name, version=version,
key="approval_status", value="rejected"
)
print(f"Model v{version} rejected - quality gates not met")
return False
# ワークフロー実行
model_approval_workflow("iris-classifier", 5)
デプロイパイプライン
Dockerベースデプロイ
# Dockerfile.mlflow-serve
FROM python:3.11-slim
RUN pip install mlflow[extras] boto3 psycopg2-binary
ENV MLFLOW_TRACKING_URI=http://mlflow-server:5000
ENV MODEL_NAME=iris-classifier
ENV MODEL_ALIAS=champion
EXPOSE 8080
CMD mlflow models serve \
--model-uri "models:/${MODEL_NAME}@${MODEL_ALIAS}" \
--host 0.0.0.0 \
--port 8080 \
--workers 2 \
--no-conda
# Dockerイメージのビルドと実行
docker build -t mlflow-model-serve -f Dockerfile.mlflow-serve .
docker run -p 8080:8080 \
-e AWS_ACCESS_KEY_ID=your-key \
-e AWS_SECRET_ACCESS_KEY=your-secret \
mlflow-model-serve
# 予測リクエストテスト
curl -X POST http://localhost:8080/invocations \
-H "Content-Type: application/json" \
-d '{"inputs": [[5.1, 3.5, 1.4, 0.2]]}'
Kubernetesデプロイ
# k8s/mlflow-model-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: iris-classifier-serving
labels:
app: iris-classifier
spec:
replicas: 3
selector:
matchLabels:
app: iris-classifier
template:
metadata:
labels:
app: iris-classifier
spec:
containers:
- name: model-server
image: mlflow-model-serve:latest
ports:
- containerPort: 8080
env:
- name: MLFLOW_TRACKING_URI
value: 'http://mlflow-server.mlflow.svc.cluster.local:5000'
- name: MODEL_NAME
value: 'iris-classifier'
- name: MODEL_ALIAS
value: 'champion'
resources:
requests:
cpu: '500m'
memory: '512Mi'
limits:
cpu: '1000m'
memory: '1Gi'
readinessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 60
periodSeconds: 30
---
apiVersion: v1
kind: Service
metadata:
name: iris-classifier-service
spec:
selector:
app: iris-classifier
ports:
- protocol: TCP
port: 80
targetPort: 8080
type: ClusterIP
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: iris-classifier-ingress
annotations:
nginx.ingress.kubernetes.io/rewrite-target: /
spec:
rules:
- host: model.example.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: iris-classifier-service
port:
number: 80
GitHub ActionsによるCI/CD
# .github/workflows/model-deploy.yaml
name: Model Deployment Pipeline
on:
workflow_dispatch:
inputs:
model_name:
description: 'Model name in registry'
required: true
default: 'iris-classifier'
model_version:
description: 'Model version to deploy'
required: true
jobs:
validate:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: pip install mlflow boto3 scikit-learn
- name: Validate model
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
run: |
python scripts/validate_model.py \
--model-name ${{ github.event.inputs.model_name }} \
--model-version ${{ github.event.inputs.model_version }}
deploy-staging:
needs: validate
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Deploy to staging
run: |
kubectl apply -f k8s/staging/
kubectl set image deployment/model-serving \
model-server=registry.example.com/model:v${{ github.event.inputs.model_version }}
deploy-production:
needs: deploy-staging
runs-on: ubuntu-latest
environment: production
steps:
- uses: actions/checkout@v4
- name: Deploy to production
run: |
kubectl apply -f k8s/production/
kubectl set image deployment/model-serving \
model-server=registry.example.com/model:v${{ github.event.inputs.model_version }}
- name: Update MLflow alias
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
run: |
python -c "
from mlflow.tracking import MlflowClient
client = MlflowClient()
client.set_registered_model_alias(
name='${{ github.event.inputs.model_name }}',
alias='champion',
version=${{ github.event.inputs.model_version }}
)
"
実験追跡プラットフォーム比較
| 機能 | MLflow | Weights and Biases | Neptune | CometML |
|---|---|---|---|---|
| ライセンス | オープンソース(Apache 2.0) | 商用(無料枠あり) | 商用(無料枠あり) | 商用(無料枠あり) |
| セルフホスティング | 完全対応 | 制限あり | 対応 | 対応 |
| 実験追跡 | 優秀 | 非常に優秀 | 優秀 | 優秀 |
| モデルレジストリ | 標準搭載 | 外部連携が必要 | 制限あり | 制限あり |
| コラボレーション | 基本的 | 非常に優秀(レポート) | 優秀 | 優秀 |
| 可視化 | 基本的 | 非常に優秀 | 優秀 | 優秀 |
| 自動ロギング | 主要フレームワーク | 幅広く対応 | 幅広く対応 | 幅広く対応 |
| Kubernetes連携 | ネイティブ対応 | 制限あり | 制限あり | 制限あり |
| ハイパーパラメータチューニング | Optuna連携 | Sweeps内蔵 | Optuna連携 | Optimizer内蔵 |
| データバージョン管理 | 基本的 | Artifacts | 基本的 | 基本的 |
| 学習コスト | 中程度 | 低い | 中程度 | 低い |
| コミュニティ | 非常に活発 | 活発 | 中程度 | 中程度 |
プラットフォーム選択ガイド
- セルフホスティング必須、オープンソース優先: MLflow
- チームコラボレーション・実験可視化重視: Weights and Biases
- 精密なメトリクス管理: Neptune
- 迅速な導入、シンプルな設定: CometML
Transformers統合
HuggingFace TransformersとMLflow連携
import mlflow
from transformers import (
AutoModelForSequenceClassification,
AutoTokenizer,
TrainingArguments,
Trainer,
)
from datasets import load_dataset
mlflow.set_experiment("sentiment-analysis")
# データセット準備
dataset = load_dataset("imdb", split="train[:1000]")
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
def tokenize_function(examples):
return tokenizer(examples["text"], padding="max_length", truncation=True, max_length=256)
tokenized_dataset = dataset.map(tokenize_function, batched=True)
tokenized_dataset = tokenized_dataset.train_test_split(test_size=0.2)
# MLflow自動ロギングの有効化
mlflow.transformers.autolog(log_models=True)
model = AutoModelForSequenceClassification.from_pretrained(
"distilbert-base-uncased", num_labels=2
)
training_args = TrainingArguments(
output_dir="./results",
num_train_epochs=3,
per_device_train_batch_size=16,
per_device_eval_batch_size=16,
warmup_steps=100,
weight_decay=0.01,
logging_dir="./logs",
logging_steps=10,
eval_strategy="epoch",
save_strategy="epoch",
load_best_model_at_end=True,
)
trainer = Trainer(
model=model,
args=training_args,
train_dataset=tokenized_dataset["train"],
eval_dataset=tokenized_dataset["test"],
)
# 学習開始(自動的にMLflowにロギングされる)
with mlflow.start_run(run_name="distilbert-sentiment"):
trainer.train()
# 追加メトリクスの記録
eval_results = trainer.evaluate()
mlflow.log_metrics(eval_results)
トラブルシューティング
分散学習環境での実験追跡
分散学習時に複数のワーカーが同時にMLflowにロギングすると衝突が発生することがある。
import mlflow
import os
def setup_mlflow_distributed():
"""分散学習環境でのMLflow設定"""
rank = int(os.environ.get("RANK", 0))
local_rank = int(os.environ.get("LOCAL_RANK", 0))
world_size = int(os.environ.get("WORLD_SIZE", 1))
# Rank 0プロセスのみMLflowにロギング
if rank == 0:
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("distributed-training")
run = mlflow.start_run(run_name=f"dist-train-{world_size}gpu")
mlflow.log_param("world_size", world_size)
return run
else:
# 他のプロセスはロギング無効化
os.environ["MLFLOW_TRACKING_URI"] = ""
return None
def log_distributed_metrics(metrics, step, rank=0):
"""Rank 0のみメトリクスを記録"""
if rank == 0:
mlflow.log_metrics(metrics, step=step)
Registry競合の解決
複数のチームが同時にモデルを登録したりステージを変更する場合、競合が発生することがある。
from mlflow.tracking import MlflowClient
from mlflow.exceptions import MlflowException
import time
def safe_transition_model(model_name, version, target_alias, max_retries=3):
"""安全なモデルステージ遷移(リトライロジック付き)"""
client = MlflowClient()
for attempt in range(max_retries):
try:
# 現在のchampionを確認
try:
current_champion = client.get_model_version_by_alias(
model_name, target_alias
)
print(f"Current {target_alias}: v{current_champion.version}")
except MlflowException:
print(f"No current {target_alias} found")
# Alias遷移
client.set_registered_model_alias(
name=model_name,
alias=target_alias,
version=version,
)
print(f"Successfully set v{version} as {target_alias}")
return True
except MlflowException as e:
print(f"Attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # 指数バックオフ
print(f"Failed to transition model after {max_retries} attempts")
return False
Artifact Storeアクセスエラー
S3をArtifact Storeとして使用する際に頻発する認証関連の問題と解決方法を示す。
import boto3
from botocore.exceptions import ClientError
def diagnose_artifact_access(bucket_name, prefix="experiments/"):
"""S3 Artifact Storeのアクセス診断"""
s3 = boto3.client("s3")
checks = {}
# 1. バケットアクセス確認
try:
s3.head_bucket(Bucket=bucket_name)
checks["bucket_access"] = "OK"
except ClientError as e:
checks["bucket_access"] = f"FAIL: {e.response['Error']['Code']}"
# 2. オブジェクトリスト確認
try:
response = s3.list_objects_v2(
Bucket=bucket_name, Prefix=prefix, MaxKeys=5
)
count = response.get("KeyCount", 0)
checks["list_objects"] = f"OK ({count} objects found)"
except ClientError as e:
checks["list_objects"] = f"FAIL: {e.response['Error']['Code']}"
# 3. 書き込み権限確認
try:
test_key = f"{prefix}_health_check"
s3.put_object(Bucket=bucket_name, Key=test_key, Body=b"test")
s3.delete_object(Bucket=bucket_name, Key=test_key)
checks["write_access"] = "OK"
except ClientError as e:
checks["write_access"] = f"FAIL: {e.response['Error']['Code']}"
print("=== S3 Artifact Store Diagnosis ===")
for check, result in checks.items():
print(f" {check}: {result}")
return checks
運用ノート
パフォーマンス最適化のヒント
- バッチロギングの使用:
mlflow.log_metrics()で複数メトリクスを一括記録し、API呼び出し回数を削減 - 非同期ロギング: 大規模アーティファクトは学習完了後に別プロセスでアップロード
- Tracking Serverキャッシング: Nginxリバースプロキシ前段でキャッシュ設定を行い読み取りパフォーマンス向上
- PostgreSQLインデックス: 実験検索が遅い場合は
runsテーブルに適切なインデックスを追加
セキュリティ考慮事項
- Tracking Serverの前に認証プロキシ(OAuth2 Proxy、Nginx Basic Auth)を配置
- S3バケットにVPCエンドポイントを適用し外部アクセスを遮断
- モデルアーティファクトの暗号化(SSE-S3またはSSE-KMS)を有効化
- RBAC(ロールベースアクセス制御)でチーム別の実験アクセスを制御
プロダクションチェックリスト
- [ ] Tracking Serverを専用サーバー/コンテナで分離運用
- [ ] Backend StoreをPostgreSQL/MySQLで構成(SQLite使用禁止)
- [ ] Artifact StoreをS3/GCS/Azure Blobで構成
- [ ] Tracking Serverの前に認証プロキシを配置
- [ ] Model Registryに承認ワークフローを適用
- [ ] モデルデプロイ時の自動検証(Quality Gate)パイプラインを構築
- [ ] 分散学習環境でRank 0のみロギングするように構成
- [ ] Artifact Storeに適切な保持ポリシー(Lifecycle Policy)を設定
- [ ] モニタリングダッシュボード(Grafana)でTracking Serverの状態を監視
- [ ] 定期的なデータベースバックアップとリカバリテストを実施
- [ ] CI/CDパイプラインにモデルデプロイ自動化を連携
- [ ] モデルサービングエンドポイントにヘルスチェックとオートスケーリングを構成