- Authors
- Name
- はじめに
- 実験トラッキングプラットフォーム比較
- MLflowトラッキングサーバーのスケーリング
- 実験トラッキングのベストプラクティス
- モデルレジストリのライフサイクル管理
- GitHub ActionsによるCI/CD統合
- マルチチーム実験組織
- 障害ケースと運用上の警告
- 本番モニタリングとクリーンアップ
- まとめ
- 参考文献
はじめに
ローカルでML実験を行うのは簡単です。しかし、複数チームにまたがって再現性・監査可能性・自動デプロイを確保しながらスケールさせることは、まったく別の課題です。MLflowは実験トラッキングとモデルライフサイクル管理のデファクトスタンダードとなるオープンソースプラットフォームですが、多くのチュートリアルはlocalhostでのmlflow.log_metric()で終わってしまいます。
本ガイドでは、本番グレードのMLflowワークフローを取り上げます。PostgreSQLとS3によるトラッキングサーバーのスケーリング、マルチチームコラボレーション向けの実験構造設計、エイリアスを活用したモデルレジストリのライフサイクル管理、GitHub ActionsによるCI/CDパイプライン統合、そしてスケール時にのみ顕在化する障害モードへの対処法を解説します。
実験トラッキングプラットフォーム比較
MLflowの詳細に入る前に、エコシステム内の他の実験トラッキングプラットフォームとの比較を把握しておきましょう。
| 機能 | MLflow | Weights and Biases | Neptune | ClearML |
|---|---|---|---|---|
| ライセンス | Apache 2.0 (OSS) | プロプライエタリ (無料枠あり) | プロプライエタリ (無料枠あり) | Apache 2.0 (OSS) |
| セルフホスト | 完全対応 | 制限あり | 制限あり | 完全対応 |
| 実験トラッキング | 強力 | 優秀 | 優秀 | 強力 |
| モデルレジストリ | 内蔵 | 内蔵 | メタデータのみ | 内蔵 |
| ハイパーパラメータ探索 | 手動 / Optuna | 内蔵 (Sweeps) | 連携経由 | 内蔵 (HPO) |
| アーティファクトストレージ | S3/GCS/Azure/HDFS | W and Bサーバー | Neptuneサーバー | S3/GCS/Azure |
| UIの品質 | 良好 | 優秀 | 優秀 | 良好 |
| フレームワーク統合 | 主要フレームワーク全対応 | 主要フレームワーク全対応 | 主要フレームワーク全対応 | 主要フレームワーク全対応 |
| 料金 (チーム) | 無料 (セルフホスト) | 約$50/ユーザー/月 | 約$79/ユーザー/月 | 無料 (セルフホスト) |
| CI/CD統合 | 任意 (オープンAPI) | GitHub/GitLab | GitHub/GitLab | GitHub/GitLab |
| データガバナンス | 完全制御 (自社管理) | ベンダー管理 | ベンダー管理 | 完全制御 (自社管理) |
MLflowはセルフホスティングの柔軟性とベンダー独立性で優位です。Weights and Biasesは可視化とコラボレーションUXに優れています。Neptuneはメタデータクエリが高度です。ClearMLは最も包括的なオープンソースパイプライン管理を提供します。チームの主要な制約(予算、ガバナンス、UIの洗練さ)に基づいて選択してください。
MLflowトラッキングサーバーのスケーリング
アーキテクチャ概要
本番環境のMLflowデプロイでは、3つの関心事を分離します:
- トラッキングサーバー - APIとUIのプロセス
- バックエンドストア - 実験メタデータ、パラメータ、メトリクス、タグ用のPostgreSQL
- アーティファクトストア - モデルファイル、プロット、大規模バイナリアーティファクト用のS3(またはS3互換)
PostgreSQLバックエンドとS3アーティファクトストア
# docker-compose.production.yml
services:
mlflow-server:
image: ghcr.io/mlflow/mlflow:v2.20.0
ports:
- '5000:5000'
environment:
MLFLOW_BACKEND_STORE_URI: 'postgresql://mlflow:${DB_PASSWORD}@postgres:5432/mlflowdb'
MLFLOW_DEFAULT_ARTIFACT_ROOT: 's3://mlflow-artifacts-prod/'
AWS_ACCESS_KEY_ID: '${AWS_ACCESS_KEY_ID}'
AWS_SECRET_ACCESS_KEY: '${AWS_SECRET_ACCESS_KEY}'
AWS_DEFAULT_REGION: 'ap-northeast-1'
command: >
mlflow server
--backend-store-uri postgresql://mlflow:${DB_PASSWORD}@postgres:5432/mlflowdb
--default-artifact-root s3://mlflow-artifacts-prod/
--host 0.0.0.0
--port 5000
--workers 4
--app-name basic-auth
depends_on:
postgres:
condition: service_healthy
restart: unless-stopped
postgres:
image: postgres:16-alpine
environment:
POSTGRES_USER: mlflow
POSTGRES_PASSWORD: '${DB_PASSWORD}'
POSTGRES_DB: mlflowdb
volumes:
- pgdata:/var/lib/postgresql/data
healthcheck:
test: ['CMD-SHELL', 'pg_isready -U mlflow -d mlflowdb']
interval: 10s
timeout: 5s
retries: 5
restart: unless-stopped
nginx:
image: nginx:1.27-alpine
ports:
- '443:443'
- '80:80'
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
- ./certs:/etc/nginx/certs:ro
depends_on:
- mlflow-server
restart: unless-stopped
volumes:
pgdata:
警告: 認証なしでMLflowサーバーをインターネットに直接公開しないでください。--app-name basic-authフラグで組み込みのHTTPベーシック認証を有効にします。本番環境では必ずTLS対応のNginxリバースプロキシをサーバーの前段に配置してください。
MLflowワークロード向けPostgreSQLチューニング
MLflowのトラッキングワークロードは、学習中はライトヘビー(頻繁なメトリクスロギング)、分析時はリードヘビー(UIクエリ)です。それに応じてPostgreSQLをチューニングします:
# postgresql.conf - MLflowワークロード向け調整
# PostgreSQL専用RAM 8GBを想定
shared_buffers = 2GB
effective_cache_size = 6GB
work_mem = 64MB
maintenance_work_mem = 512MB
# ライトヘビー最適化
wal_buffers = 64MB
checkpoint_completion_target = 0.9
max_wal_size = 4GB
# コネクションプーリング(同時学習ジョブ50以上の場合はPgBouncerを使用)
max_connections = 200
運用上の警告: 毎ステップ頻繁にメトリクスをロギングする同時学習ジョブが50以上実行される場合、コネクションプールが枯渇します。MLflowとPostgreSQLの間にトランザクションモードのPgBouncerをデプロイしてください。これがないと、ピーク負荷時にconnection refusedエラーで学習ジョブが失敗します。
実験トラッキングのベストプラクティス
チーム向け実験の構造化
import mlflow
from mlflow.tracking import MlflowClient
# リモートトラッキングサーバーの設定
mlflow.set_tracking_uri("https://mlflow.internal.company.com")
# 命名規則:team/project/experiment-type
# これによりフィルタリングとアクセス制御がスケールで可能になる
EXPERIMENT_NAME = "recommendation-team/product-ranking/hyperparameter-search"
mlflow.set_experiment(EXPERIMENT_NAME)
client = MlflowClient()
def train_model(config: dict):
"""本番グレードの実験トラッキングと適切なエラーハンドリング。"""
with mlflow.start_run(
run_name=f"xgb-{config['max_depth']}d-{config['learning_rate']}lr",
tags={
"team": "recommendation",
"project": "product-ranking",
"environment": "staging",
"git_commit": config.get("git_sha", "unknown"),
"data_version": config.get("data_version", "v1"),
},
) as run:
# 全ハイパーパラメータをロギング
mlflow.log_params({
"model_type": "xgboost",
"max_depth": config["max_depth"],
"learning_rate": config["learning_rate"],
"n_estimators": config["n_estimators"],
"subsample": config["subsample"],
"colsample_bytree": config["colsample_bytree"],
"eval_metric": "ndcg",
"training_data_path": config["data_path"],
"feature_count": config["feature_count"],
})
# データセット情報を入力として記録
dataset = mlflow.data.from_pandas(
config["train_df"],
source=config["data_path"],
name="product_ranking_train",
)
mlflow.log_input(dataset, context="training")
# モデル学習
model = train_xgboost(config)
# 各評価ポイントでメトリクスをロギング
for epoch, metrics in enumerate(model.eval_history):
mlflow.log_metrics({
"train_ndcg": metrics["train_ndcg"],
"val_ndcg": metrics["val_ndcg"],
"train_loss": metrics["train_loss"],
"val_loss": metrics["val_loss"],
}, step=epoch)
# 最終メトリクスのロギング
final_metrics = evaluate_model(model, config["test_data"])
mlflow.log_metrics({
"test_ndcg": final_metrics["ndcg"],
"test_precision_at_10": final_metrics["precision@10"],
"test_recall_at_50": final_metrics["recall@50"],
"test_mrr": final_metrics["mrr"],
"inference_latency_p99_ms": final_metrics["latency_p99"],
})
# シグネチャ付きでモデルをロギング
signature = mlflow.models.infer_signature(
config["sample_input"],
model.predict(config["sample_input"]),
)
mlflow.xgboost.log_model(
model,
artifact_path="model",
signature=signature,
registered_model_name="product-ranking-xgb",
)
# アーティファクトのロギング
mlflow.log_artifact("feature_importance.png")
mlflow.log_artifact("confusion_matrix.png")
return run.info.run_id
パフォーマンス向上のためのバッチメトリクスロギング
警告: 学習ステップごとにmlflow.log_metric()を呼び出すと、呼び出しごとに個別のHTTPリクエストが発生します。数千ステップのディープラーニング学習では、トラッキングサーバーが飽和します。
import mlflow
def log_metrics_batched(metrics_buffer: list, batch_size: int = 100):
"""HTTPオーバーヘッドを軽減するバッチメトリクスロギング。
個々のステップを毎回ロギングする代わりに、メトリクスを蓄積し
バッチでフラッシュします。長い学習実行に対して
トラッキングサーバーの負荷を50-100倍削減します。
"""
if len(metrics_buffer) >= batch_size:
with mlflow.start_run(run_id=current_run_id):
for step, metrics in metrics_buffer:
mlflow.log_metrics(metrics, step=step)
metrics_buffer.clear()
# 学習ループでの使用例
metrics_buffer = []
for step in range(100000):
loss = train_step()
metrics_buffer.append((step, {
"train_loss": loss,
"learning_rate": scheduler.get_last_lr()[0],
}))
# 毎ステップではなく100ステップごとにフラッシュ
log_metrics_batched(metrics_buffer, batch_size=100)
# 残りのメトリクスをフラッシュ
log_metrics_batched(metrics_buffer, batch_size=1)
モデルレジストリのライフサイクル管理
モデルエイリアスの理解(ステージ非推奨後)
MLflow 2.9以降、レガシーのステージベースワークフロー(Staging、Production、Archived)は非推奨となり、モデルエイリアスに置き換わりました。エイリアスは実世界のデプロイメントパターンにより柔軟に対応します。
from mlflow.tracking import MlflowClient
client = MlflowClient()
# 新しいモデルバージョンの登録(log_modelで自動的に行われる)
# または明示的に:
result = client.create_model_version(
name="product-ranking-xgb",
source="s3://mlflow-artifacts-prod/3/abc123/artifacts/model",
run_id="abc123",
description="XGBoost v2:新ユーザー特徴量追加、NDCG@10が3.2%改善",
)
model_version = result.version
# デプロイメントワークフロー用エイリアスの設定
# Champion = 現在本番トラフィックを処理中
client.set_registered_model_alias(
name="product-ranking-xgb",
alias="champion",
version=model_version,
)
# Challenger = シャドーモードで検証中の候補
client.set_registered_model_alias(
name="product-ranking-xgb",
alias="challenger",
version=model_version + 1,
)
# サービングコードでエイリアスによるモデル読み込み
champion_model = mlflow.pyfunc.load_model("models:/product-ranking-xgb@champion")
challenger_model = mlflow.pyfunc.load_model("models:/product-ranking-xgb@challenger")
# 追加メタデータ用のタグ設定
client.set_model_version_tag(
name="product-ranking-xgb",
version=model_version,
key="validation_status",
value="passed",
)
client.set_model_version_tag(
name="product-ranking-xgb",
version=model_version,
key="approved_by",
value="ml-lead@company.com",
)
モデルプロモーションワークフロー
推奨される本番ワークフローは3つのエイリアスパターンを使用します:
- candidate - 新たに学習されたモデル、検証待ち
- challenger - 検証済みモデル、championと並行してシャドーモードで実行中
- champion - ライブの本番トラフィックを処理中
def promote_model(model_name: str, version: int, target_alias: str):
"""モデルバージョンをデプロイメントライフサイクルに沿ってプロモーション。
ワークフロー: candidate -> challenger -> champion
各プロモーションには検証ゲートの通過が必要:
- candidate -> challenger: 自動テストスイートの通過
- challenger -> champion: シャドーモードのメトリクスが許容範囲内
"""
client = MlflowClient()
# 現在のモデルバージョン情報を取得
mv = client.get_model_version(name=model_name, version=str(version))
# プロモーションが許可されているか検証
if target_alias == "challenger":
# 自動検証に合格している必要がある
tags = {t.key: t.value for t in mv.tags}
if tags.get("validation_status") != "passed":
raise ValueError(
f"モデルバージョン {version} は検証に合格していません。"
f"現在のステータス: {tags.get('validation_status', 'unknown')}"
)
elif target_alias == "champion":
# 現在challengerである必要がある
try:
current_challenger = client.get_model_version_by_alias(
name=model_name, alias="challenger"
)
if current_challenger.version != str(version):
raise ValueError(
f"バージョン {version} は現在のchallengerではありません。"
f"現在のchallengerはバージョン {current_challenger.version} です"
)
except mlflow.exceptions.MlflowException:
raise ValueError("challengerエイリアスが未設定です。先にシャドーモードを実行してください。")
# 古いchampionをアーカイブ
try:
old_champion = client.get_model_version_by_alias(
name=model_name, alias="champion"
)
client.set_model_version_tag(
name=model_name,
version=old_champion.version,
key="archived_at",
value=datetime.utcnow().isoformat(),
)
client.delete_registered_model_alias(
name=model_name, alias="champion"
)
except mlflow.exceptions.MlflowException:
pass # 既存のchampionなし
# 新しいエイリアスを設定
client.set_registered_model_alias(
name=model_name, alias=target_alias, version=version
)
# プロモーションイベントをタグに記録
client.set_model_version_tag(
name=model_name,
version=str(version),
key=f"promoted_to_{target_alias}_at",
value=datetime.utcnow().isoformat(),
)
print(f"モデル {model_name} v{version} を {target_alias} にプロモーション完了")
警告: モデルエイリアスの再割り当てはアトミックですが、複数のエイリアス間でトランザクショナルではありません。championとchallengerを同時にスワップする必要がある場合、両方が同じバージョンを指す短い時間窓が発生します。サービング層でこれを適切に処理するよう設計してください。
GitHub ActionsによるCI/CD統合
自動学習・検証パイプライン
# .github/workflows/ml-pipeline.yml
name: ML学習・モデル検証パイプライン
on:
push:
paths:
- 'ml/**'
- 'features/**'
branches: [main]
workflow_dispatch:
inputs:
experiment_name:
description: 'MLflow実験名'
required: true
default: 'recommendation-team/product-ranking/scheduled'
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
MLFLOW_TRACKING_USERNAME: ${{ secrets.MLFLOW_TRACKING_USERNAME }}
MLFLOW_TRACKING_PASSWORD: ${{ secrets.MLFLOW_TRACKING_PASSWORD }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
jobs:
train:
runs-on: [self-hosted, gpu]
outputs:
run_id: ${{ steps.train.outputs.run_id }}
model_version: ${{ steps.train.outputs.model_version }}
steps:
- uses: actions/checkout@v4
- name: Python環境セットアップ
uses: actions/setup-python@v5
with:
python-version: '3.11'
cache: 'pip'
- name: 依存関係インストール
run: pip install -r requirements.txt
- name: 学習実行
id: train
run: |
python ml/train.py \
--experiment-name "${{ github.event.inputs.experiment_name || 'recommendation-team/product-ranking/ci' }}" \
--git-sha "${{ github.sha }}" \
--data-version "$(date +%Y%m%d)"
echo "run_id=$(cat /tmp/mlflow_run_id)" >> $GITHUB_OUTPUT
echo "model_version=$(cat /tmp/mlflow_model_version)" >> $GITHUB_OUTPUT
validate:
needs: train
runs-on: [self-hosted, gpu]
steps:
- uses: actions/checkout@v4
- name: 依存関係インストール
run: pip install -r requirements.txt
- name: モデル検証実行
run: |
python ml/validate.py \
--model-uri "models:/product-ranking-xgb/${{ needs.train.outputs.model_version }}" \
--min-ndcg 0.45 \
--max-latency-p99-ms 50 \
--min-data-coverage 0.95
- name: candidateエイリアス設定
if: success()
run: |
python -c "
from mlflow.tracking import MlflowClient
client = MlflowClient()
client.set_registered_model_alias(
'product-ranking-xgb', 'candidate',
${{ needs.train.outputs.model_version }}
)
client.set_model_version_tag(
'product-ranking-xgb',
'${{ needs.train.outputs.model_version }}',
'validation_status', 'passed'
)
"
promote-to-challenger:
needs: [train, validate]
runs-on: ubuntu-latest
environment: staging
steps:
- uses: actions/checkout@v4
- name: 依存関係インストール
run: pip install mlflow
- name: challengerにプロモーション
run: |
python ml/promote.py \
--model-name "product-ranking-xgb" \
--version "${{ needs.train.outputs.model_version }}" \
--target-alias "challenger"
- name: シャドーモードにデプロイ
run: |
kubectl set image deployment/ranking-shadow \
model-server=ranking-server:v${{ needs.train.outputs.model_version }} \
--namespace ml-staging
自動Championプロモーション
# .github/workflows/promote-champion.yml
name: ChallengerをChampionにプロモーション
on:
workflow_dispatch:
inputs:
model_name:
description: '登録済みモデル名'
required: true
version:
description: 'プロモーション対象のモデルバージョン'
required: true
jobs:
promote:
runs-on: ubuntu-latest
environment: production # 手動承認が必要
steps:
- uses: actions/checkout@v4
- name: 依存関係インストール
run: pip install mlflow
- name: シャドーモードメトリクスの検証
run: |
python ml/verify_shadow_metrics.py \
--model-name "${{ github.event.inputs.model_name }}" \
--version "${{ github.event.inputs.version }}" \
--min-hours-in-shadow 24 \
--max-metric-degradation 0.02
- name: championにプロモーション
run: |
python ml/promote.py \
--model-name "${{ github.event.inputs.model_name }}" \
--version "${{ github.event.inputs.version }}" \
--target-alias "champion"
- name: 本番環境にローリングデプロイ
run: |
kubectl set image deployment/ranking-prod \
model-server=ranking-server:v${{ github.event.inputs.version }} \
--namespace ml-production
kubectl rollout status deployment/ranking-prod \
--namespace ml-production --timeout=600s
マルチチーム実験組織
アクセス制御とネームスペース戦略
"""
マルチチーム組織向けMLflow実験ネームスペース戦略。
規則:
{team}/{project}/{experiment-type}
例:
recommendation-team/product-ranking/hyperparameter-search
recommendation-team/product-ranking/feature-ablation
search-team/query-understanding/weekly-retrain
fraud-team/transaction-scoring/model-comparison
モデル命名規則:
{project}-{algorithm}
例:
product-ranking-xgb
query-understanding-bert
transaction-scoring-lgbm
"""
import mlflow
from mlflow.tracking import MlflowClient
from dataclasses import dataclass
@dataclass
class ExperimentConfig:
team: str
project: str
experiment_type: str
@property
def experiment_name(self) -> str:
return f"{self.team}/{self.project}/{self.experiment_type}"
@property
def model_name_prefix(self) -> str:
return self.project
def setup_experiment(config: ExperimentConfig) -> str:
"""適切なタグを付けて発見可能性を確保した実験の作成または取得。"""
client = MlflowClient()
experiment = client.get_experiment_by_name(config.experiment_name)
if experiment is None:
experiment_id = client.create_experiment(
name=config.experiment_name,
tags={
"team": config.team,
"project": config.project,
"type": config.experiment_type,
"owner": f"{config.team}-lead@company.com",
},
)
else:
experiment_id = experiment.experiment_id
mlflow.set_experiment(experiment_id=experiment_id)
return experiment_id
障害ケースと運用上の警告
本番環境でよくある障害
1. アーティファクトストアの権限
最も一般的な本番障害は、学習ジョブがMLflowサーバーとは異なるIAMロールで実行される場合のS3権限エラーです:
# 症状:学習は完了するがモデルが保存されない
# エラー:botocore.exceptions.ClientError: AccessDenied
# 修正:学習ジョブのIAMロールに以下の両方があることを確認:
# - アーティファクトバケットへの s3:PutObject
# - アーティファクトバケットへの s3:GetObject(モデル読み込み用)
# 権限の確認:
aws s3 cp test.txt s3://mlflow-artifacts-prod/test.txt
aws s3 ls s3://mlflow-artifacts-prod/
2. PostgreSQLコネクション枯渇
多数の同時学習ジョブを実行すると、各ジョブがデータベースコネクションを保持します。コネクションプーリングなしでは連鎖障害が発生します:
# MLflowとPostgreSQLの間にPgBouncerをデプロイ
# pgbouncer.ini
[databases]
mlflowdb = host=postgres port=5432 dbname=mlflowdb
[pgbouncer]
listen_port = 6432
listen_addr = 0.0.0.0
auth_type = md5
auth_file = /etc/pgbouncer/userlist.txt
pool_mode = transaction
max_client_conn = 500
default_pool_size = 30
min_pool_size = 10
reserve_pool_size = 5
3. 大規模アーティファクトのアップロードタイムアウト
ディープラーニングモデル(数GB)はアップロード中にタイムアウトすることがあります。クライアントのタイムアウトを設定してください:
import os
# 大規模モデルアップロードのタイムアウトを延長(デフォルトは120秒)
os.environ["MLFLOW_HTTP_REQUEST_TIMEOUT"] = "600"
# 非常に大きなアーティファクトの場合、マルチパートアップロードを使用
os.environ["MLFLOW_MULTIPART_UPLOAD_CHUNK_SIZE"] = "104857600" # 100MBチャンク
4. メトリクスロギングの競合状態
複数のプロセスが同じrunにロギングする場合(例:分散学習)、メトリクスが順序どおりに到着しないことがあります:
# 悪い例:複数のワーカーが同じrunにロギング
# ステップの順序問題やメトリクスの上書きが発生する
# 良い例:分散学習にはchild runを使用
with mlflow.start_run(run_name="distributed-training") as parent_run:
for worker_id in range(num_workers):
with mlflow.start_run(
run_name=f"worker-{worker_id}",
nested=True,
) as child_run:
# 各ワーカーは自身のchild runにロギング
train_worker(worker_id, child_run.info.run_id)
# 親runにメトリクスを集約
aggregate_and_log_metrics(parent_run.info.run_id)
5. モデルレジストリの名前衝突
チームが誤って互いの登録モデルを上書きしてしまう問題:
# ラッパーで命名規則を強制
def register_model_safe(model_uri: str, name: str, team: str):
"""チームプレフィックス検証付きのモデル登録。"""
allowed_prefixes = {
"recommendation": ["product-ranking", "user-embedding"],
"search": ["query-understanding", "document-ranking"],
"fraud": ["transaction-scoring", "account-risk"],
}
valid = any(
name.startswith(prefix)
for prefix in allowed_prefixes.get(team, [])
)
if not valid:
raise ValueError(
f"チーム '{team}' はモデル '{name}' を登録できません。"
f"許可されたプレフィックス: {allowed_prefixes.get(team, [])}"
)
return mlflow.register_model(model_uri, name)
本番モニタリングとクリーンアップ
自動実験クリーンアップ
古い実験が蓄積されるとUIが遅くなります。定期的なクリーンアップをスケジュールしてください:
from mlflow.tracking import MlflowClient
from datetime import datetime, timedelta
def cleanup_old_runs(
experiment_name: str,
max_age_days: int = 90,
keep_top_n: int = 10,
dry_run: bool = True,
):
"""上位パフォーマーを保持しながら古い実験runをクリーンアップ。
警告:これはrunとそのアーティファクトを永久に削除します。
必ず最初にdry_run=Trueで実行してください。
"""
client = MlflowClient()
experiment = client.get_experiment_by_name(experiment_name)
if experiment is None:
print(f"実験 '{experiment_name}' が見つかりません")
return
cutoff = datetime.now() - timedelta(days=max_age_days)
cutoff_ms = int(cutoff.timestamp() * 1000)
# プライマリメトリクスでソートされた全runを取得
runs = client.search_runs(
experiment_ids=[experiment.experiment_id],
order_by=["metrics.val_ndcg DESC"],
)
# 年数に関係なくトップNのrunを保護
protected_run_ids = {r.info.run_id for r in runs[:keep_top_n]}
deleted_count = 0
for run in runs:
if run.info.run_id in protected_run_ids:
continue
if run.info.end_time and run.info.end_time < cutoff_ms:
if dry_run:
print(f"削除対象のrun {run.info.run_id} "
f"(終了時刻: {datetime.fromtimestamp(run.info.end_time/1000)})")
else:
client.delete_run(run.info.run_id)
deleted_count += 1
print(f"{'削除対象' if dry_run else '削除済み'}: {deleted_count} 件のrun")
まとめ
本番スケールのMLflowは、単にmlflow.log_metric()を呼び出すだけでは不十分です。主要な原則は以下のとおりです:
- コンピュートとストレージを分離する:メタデータにはPostgreSQL、アーティファクトにはS3を使用。コネクションプーリングにはPgBouncerをデプロイ。
- チームとプロジェクト単位で実験を構造化する:組織の成長に合わせてスケールする明確な命名規則を使用。
- ステージではなくエイリアスを使用する:champion/challenger/candidateパターンとモデルエイリアスにより、柔軟なデプロイメントワークフローを実現。
- CI/CDと統合する:環境ベースの承認フローを持つGitHub Actionsを通じて、検証ゲートとデプロイメントを自動化。
- 障害に備える:コネクション枯渇、権限エラー、分散学習での競合状態が最も一般的な本番問題。
- プロアクティブにクリーンアップする:古いrunが蓄積するとUI性能が低下。上位パフォーマンスモデルの保護付きで自動クリーンアップをスケジュール。
ステージからエイリアスへの移行、モデルシグネチャの採用、データセットトラッキング(mlflow.log_input()経由)の統合は、MLflowが本番グレードのMLOpsプラットフォームへと成熟していることを示しています。適切なインフラストラクチャのスケーリングとCI/CD統合を組み合わせることで、MLflowはエンタープライズスケールでのML実験とモデル管理の堅固な基盤を提供します。