Skip to content
Published on

MLflow本番運用ガイド:実験トラッキング、モデルレジストリ、スケーラブルMLOpsワークフロー

Authors
  • Name
    Twitter
MLflow本番運用ガイド

はじめに

ローカルでML実験を行うのは簡単です。しかし、複数チームにまたがって再現性・監査可能性・自動デプロイを確保しながらスケールさせることは、まったく別の課題です。MLflowは実験トラッキングとモデルライフサイクル管理のデファクトスタンダードとなるオープンソースプラットフォームですが、多くのチュートリアルはlocalhostでのmlflow.log_metric()で終わってしまいます。

本ガイドでは、本番グレードのMLflowワークフローを取り上げます。PostgreSQLとS3によるトラッキングサーバーのスケーリング、マルチチームコラボレーション向けの実験構造設計、エイリアスを活用したモデルレジストリのライフサイクル管理、GitHub ActionsによるCI/CDパイプライン統合、そしてスケール時にのみ顕在化する障害モードへの対処法を解説します。

実験トラッキングプラットフォーム比較

MLflowの詳細に入る前に、エコシステム内の他の実験トラッキングプラットフォームとの比較を把握しておきましょう。

機能MLflowWeights and BiasesNeptuneClearML
ライセンスApache 2.0 (OSS)プロプライエタリ (無料枠あり)プロプライエタリ (無料枠あり)Apache 2.0 (OSS)
セルフホスト完全対応制限あり制限あり完全対応
実験トラッキング強力優秀優秀強力
モデルレジストリ内蔵内蔵メタデータのみ内蔵
ハイパーパラメータ探索手動 / Optuna内蔵 (Sweeps)連携経由内蔵 (HPO)
アーティファクトストレージS3/GCS/Azure/HDFSW and BサーバーNeptuneサーバーS3/GCS/Azure
UIの品質良好優秀優秀良好
フレームワーク統合主要フレームワーク全対応主要フレームワーク全対応主要フレームワーク全対応主要フレームワーク全対応
料金 (チーム)無料 (セルフホスト)約$50/ユーザー/月約$79/ユーザー/月無料 (セルフホスト)
CI/CD統合任意 (オープンAPI)GitHub/GitLabGitHub/GitLabGitHub/GitLab
データガバナンス完全制御 (自社管理)ベンダー管理ベンダー管理完全制御 (自社管理)

MLflowはセルフホスティングの柔軟性とベンダー独立性で優位です。Weights and Biasesは可視化とコラボレーションUXに優れています。Neptuneはメタデータクエリが高度です。ClearMLは最も包括的なオープンソースパイプライン管理を提供します。チームの主要な制約(予算、ガバナンス、UIの洗練さ)に基づいて選択してください。

MLflowトラッキングサーバーのスケーリング

アーキテクチャ概要

本番環境のMLflowデプロイでは、3つの関心事を分離します:

  1. トラッキングサーバー - APIとUIのプロセス
  2. バックエンドストア - 実験メタデータ、パラメータ、メトリクス、タグ用のPostgreSQL
  3. アーティファクトストア - モデルファイル、プロット、大規模バイナリアーティファクト用のS3(またはS3互換)

PostgreSQLバックエンドとS3アーティファクトストア

# docker-compose.production.yml
services:
  mlflow-server:
    image: ghcr.io/mlflow/mlflow:v2.20.0
    ports:
      - '5000:5000'
    environment:
      MLFLOW_BACKEND_STORE_URI: 'postgresql://mlflow:${DB_PASSWORD}@postgres:5432/mlflowdb'
      MLFLOW_DEFAULT_ARTIFACT_ROOT: 's3://mlflow-artifacts-prod/'
      AWS_ACCESS_KEY_ID: '${AWS_ACCESS_KEY_ID}'
      AWS_SECRET_ACCESS_KEY: '${AWS_SECRET_ACCESS_KEY}'
      AWS_DEFAULT_REGION: 'ap-northeast-1'
    command: >
      mlflow server
      --backend-store-uri postgresql://mlflow:${DB_PASSWORD}@postgres:5432/mlflowdb
      --default-artifact-root s3://mlflow-artifacts-prod/
      --host 0.0.0.0
      --port 5000
      --workers 4
      --app-name basic-auth
    depends_on:
      postgres:
        condition: service_healthy
    restart: unless-stopped

  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: mlflow
      POSTGRES_PASSWORD: '${DB_PASSWORD}'
      POSTGRES_DB: mlflowdb
    volumes:
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ['CMD-SHELL', 'pg_isready -U mlflow -d mlflowdb']
      interval: 10s
      timeout: 5s
      retries: 5
    restart: unless-stopped

  nginx:
    image: nginx:1.27-alpine
    ports:
      - '443:443'
      - '80:80'
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./certs:/etc/nginx/certs:ro
    depends_on:
      - mlflow-server
    restart: unless-stopped

volumes:
  pgdata:

警告: 認証なしでMLflowサーバーをインターネットに直接公開しないでください。--app-name basic-authフラグで組み込みのHTTPベーシック認証を有効にします。本番環境では必ずTLS対応のNginxリバースプロキシをサーバーの前段に配置してください。

MLflowワークロード向けPostgreSQLチューニング

MLflowのトラッキングワークロードは、学習中はライトヘビー(頻繁なメトリクスロギング)、分析時はリードヘビー(UIクエリ)です。それに応じてPostgreSQLをチューニングします:

# postgresql.conf - MLflowワークロード向け調整
# PostgreSQL専用RAM 8GBを想定

shared_buffers = 2GB
effective_cache_size = 6GB
work_mem = 64MB
maintenance_work_mem = 512MB

# ライトヘビー最適化
wal_buffers = 64MB
checkpoint_completion_target = 0.9
max_wal_size = 4GB

# コネクションプーリング(同時学習ジョブ50以上の場合はPgBouncerを使用)
max_connections = 200

運用上の警告: 毎ステップ頻繁にメトリクスをロギングする同時学習ジョブが50以上実行される場合、コネクションプールが枯渇します。MLflowとPostgreSQLの間にトランザクションモードのPgBouncerをデプロイしてください。これがないと、ピーク負荷時にconnection refusedエラーで学習ジョブが失敗します。

実験トラッキングのベストプラクティス

チーム向け実験の構造化

import mlflow
from mlflow.tracking import MlflowClient

# リモートトラッキングサーバーの設定
mlflow.set_tracking_uri("https://mlflow.internal.company.com")

# 命名規則:team/project/experiment-type
# これによりフィルタリングとアクセス制御がスケールで可能になる
EXPERIMENT_NAME = "recommendation-team/product-ranking/hyperparameter-search"

mlflow.set_experiment(EXPERIMENT_NAME)

client = MlflowClient()

def train_model(config: dict):
    """本番グレードの実験トラッキングと適切なエラーハンドリング。"""
    with mlflow.start_run(
        run_name=f"xgb-{config['max_depth']}d-{config['learning_rate']}lr",
        tags={
            "team": "recommendation",
            "project": "product-ranking",
            "environment": "staging",
            "git_commit": config.get("git_sha", "unknown"),
            "data_version": config.get("data_version", "v1"),
        },
    ) as run:
        # 全ハイパーパラメータをロギング
        mlflow.log_params({
            "model_type": "xgboost",
            "max_depth": config["max_depth"],
            "learning_rate": config["learning_rate"],
            "n_estimators": config["n_estimators"],
            "subsample": config["subsample"],
            "colsample_bytree": config["colsample_bytree"],
            "eval_metric": "ndcg",
            "training_data_path": config["data_path"],
            "feature_count": config["feature_count"],
        })

        # データセット情報を入力として記録
        dataset = mlflow.data.from_pandas(
            config["train_df"],
            source=config["data_path"],
            name="product_ranking_train",
        )
        mlflow.log_input(dataset, context="training")

        # モデル学習
        model = train_xgboost(config)

        # 各評価ポイントでメトリクスをロギング
        for epoch, metrics in enumerate(model.eval_history):
            mlflow.log_metrics({
                "train_ndcg": metrics["train_ndcg"],
                "val_ndcg": metrics["val_ndcg"],
                "train_loss": metrics["train_loss"],
                "val_loss": metrics["val_loss"],
            }, step=epoch)

        # 最終メトリクスのロギング
        final_metrics = evaluate_model(model, config["test_data"])
        mlflow.log_metrics({
            "test_ndcg": final_metrics["ndcg"],
            "test_precision_at_10": final_metrics["precision@10"],
            "test_recall_at_50": final_metrics["recall@50"],
            "test_mrr": final_metrics["mrr"],
            "inference_latency_p99_ms": final_metrics["latency_p99"],
        })

        # シグネチャ付きでモデルをロギング
        signature = mlflow.models.infer_signature(
            config["sample_input"],
            model.predict(config["sample_input"]),
        )
        mlflow.xgboost.log_model(
            model,
            artifact_path="model",
            signature=signature,
            registered_model_name="product-ranking-xgb",
        )

        # アーティファクトのロギング
        mlflow.log_artifact("feature_importance.png")
        mlflow.log_artifact("confusion_matrix.png")

        return run.info.run_id

パフォーマンス向上のためのバッチメトリクスロギング

警告: 学習ステップごとにmlflow.log_metric()を呼び出すと、呼び出しごとに個別のHTTPリクエストが発生します。数千ステップのディープラーニング学習では、トラッキングサーバーが飽和します。

import mlflow

def log_metrics_batched(metrics_buffer: list, batch_size: int = 100):
    """HTTPオーバーヘッドを軽減するバッチメトリクスロギング。

    個々のステップを毎回ロギングする代わりに、メトリクスを蓄積し
    バッチでフラッシュします。長い学習実行に対して
    トラッキングサーバーの負荷を50-100倍削減します。
    """
    if len(metrics_buffer) >= batch_size:
        with mlflow.start_run(run_id=current_run_id):
            for step, metrics in metrics_buffer:
                mlflow.log_metrics(metrics, step=step)
        metrics_buffer.clear()


# 学習ループでの使用例
metrics_buffer = []

for step in range(100000):
    loss = train_step()

    metrics_buffer.append((step, {
        "train_loss": loss,
        "learning_rate": scheduler.get_last_lr()[0],
    }))

    # 毎ステップではなく100ステップごとにフラッシュ
    log_metrics_batched(metrics_buffer, batch_size=100)

# 残りのメトリクスをフラッシュ
log_metrics_batched(metrics_buffer, batch_size=1)

モデルレジストリのライフサイクル管理

モデルエイリアスの理解(ステージ非推奨後)

MLflow 2.9以降、レガシーのステージベースワークフロー(Staging、Production、Archived)は非推奨となり、モデルエイリアスに置き換わりました。エイリアスは実世界のデプロイメントパターンにより柔軟に対応します。

from mlflow.tracking import MlflowClient

client = MlflowClient()

# 新しいモデルバージョンの登録(log_modelで自動的に行われる)
# または明示的に:
result = client.create_model_version(
    name="product-ranking-xgb",
    source="s3://mlflow-artifacts-prod/3/abc123/artifacts/model",
    run_id="abc123",
    description="XGBoost v2:新ユーザー特徴量追加、NDCG@10が3.2%改善",
)
model_version = result.version

# デプロイメントワークフロー用エイリアスの設定
# Champion = 現在本番トラフィックを処理中
client.set_registered_model_alias(
    name="product-ranking-xgb",
    alias="champion",
    version=model_version,
)

# Challenger = シャドーモードで検証中の候補
client.set_registered_model_alias(
    name="product-ranking-xgb",
    alias="challenger",
    version=model_version + 1,
)

# サービングコードでエイリアスによるモデル読み込み
champion_model = mlflow.pyfunc.load_model("models:/product-ranking-xgb@champion")
challenger_model = mlflow.pyfunc.load_model("models:/product-ranking-xgb@challenger")

# 追加メタデータ用のタグ設定
client.set_model_version_tag(
    name="product-ranking-xgb",
    version=model_version,
    key="validation_status",
    value="passed",
)

client.set_model_version_tag(
    name="product-ranking-xgb",
    version=model_version,
    key="approved_by",
    value="ml-lead@company.com",
)

モデルプロモーションワークフロー

推奨される本番ワークフローは3つのエイリアスパターンを使用します:

  1. candidate - 新たに学習されたモデル、検証待ち
  2. challenger - 検証済みモデル、championと並行してシャドーモードで実行中
  3. champion - ライブの本番トラフィックを処理中
def promote_model(model_name: str, version: int, target_alias: str):
    """モデルバージョンをデプロイメントライフサイクルに沿ってプロモーション。

    ワークフロー: candidate -> challenger -> champion

    各プロモーションには検証ゲートの通過が必要:
    - candidate -> challenger: 自動テストスイートの通過
    - challenger -> champion: シャドーモードのメトリクスが許容範囲内
    """
    client = MlflowClient()

    # 現在のモデルバージョン情報を取得
    mv = client.get_model_version(name=model_name, version=str(version))

    # プロモーションが許可されているか検証
    if target_alias == "challenger":
        # 自動検証に合格している必要がある
        tags = {t.key: t.value for t in mv.tags}
        if tags.get("validation_status") != "passed":
            raise ValueError(
                f"モデルバージョン {version} は検証に合格していません。"
                f"現在のステータス: {tags.get('validation_status', 'unknown')}"
            )

    elif target_alias == "champion":
        # 現在challengerである必要がある
        try:
            current_challenger = client.get_model_version_by_alias(
                name=model_name, alias="challenger"
            )
            if current_challenger.version != str(version):
                raise ValueError(
                    f"バージョン {version} は現在のchallengerではありません。"
                    f"現在のchallengerはバージョン {current_challenger.version} です"
                )
        except mlflow.exceptions.MlflowException:
            raise ValueError("challengerエイリアスが未設定です。先にシャドーモードを実行してください。")

        # 古いchampionをアーカイブ
        try:
            old_champion = client.get_model_version_by_alias(
                name=model_name, alias="champion"
            )
            client.set_model_version_tag(
                name=model_name,
                version=old_champion.version,
                key="archived_at",
                value=datetime.utcnow().isoformat(),
            )
            client.delete_registered_model_alias(
                name=model_name, alias="champion"
            )
        except mlflow.exceptions.MlflowException:
            pass  # 既存のchampionなし

    # 新しいエイリアスを設定
    client.set_registered_model_alias(
        name=model_name, alias=target_alias, version=version
    )

    # プロモーションイベントをタグに記録
    client.set_model_version_tag(
        name=model_name,
        version=str(version),
        key=f"promoted_to_{target_alias}_at",
        value=datetime.utcnow().isoformat(),
    )

    print(f"モデル {model_name} v{version}{target_alias} にプロモーション完了")

警告: モデルエイリアスの再割り当てはアトミックですが、複数のエイリアス間でトランザクショナルではありません。championとchallengerを同時にスワップする必要がある場合、両方が同じバージョンを指す短い時間窓が発生します。サービング層でこれを適切に処理するよう設計してください。

GitHub ActionsによるCI/CD統合

自動学習・検証パイプライン

# .github/workflows/ml-pipeline.yml
name: ML学習・モデル検証パイプライン

on:
  push:
    paths:
      - 'ml/**'
      - 'features/**'
    branches: [main]
  workflow_dispatch:
    inputs:
      experiment_name:
        description: 'MLflow実験名'
        required: true
        default: 'recommendation-team/product-ranking/scheduled'

env:
  MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
  MLFLOW_TRACKING_USERNAME: ${{ secrets.MLFLOW_TRACKING_USERNAME }}
  MLFLOW_TRACKING_PASSWORD: ${{ secrets.MLFLOW_TRACKING_PASSWORD }}
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

jobs:
  train:
    runs-on: [self-hosted, gpu]
    outputs:
      run_id: ${{ steps.train.outputs.run_id }}
      model_version: ${{ steps.train.outputs.model_version }}
    steps:
      - uses: actions/checkout@v4

      - name: Python環境セットアップ
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'

      - name: 依存関係インストール
        run: pip install -r requirements.txt

      - name: 学習実行
        id: train
        run: |
          python ml/train.py \
            --experiment-name "${{ github.event.inputs.experiment_name || 'recommendation-team/product-ranking/ci' }}" \
            --git-sha "${{ github.sha }}" \
            --data-version "$(date +%Y%m%d)"
          echo "run_id=$(cat /tmp/mlflow_run_id)" >> $GITHUB_OUTPUT
          echo "model_version=$(cat /tmp/mlflow_model_version)" >> $GITHUB_OUTPUT

  validate:
    needs: train
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v4

      - name: 依存関係インストール
        run: pip install -r requirements.txt

      - name: モデル検証実行
        run: |
          python ml/validate.py \
            --model-uri "models:/product-ranking-xgb/${{ needs.train.outputs.model_version }}" \
            --min-ndcg 0.45 \
            --max-latency-p99-ms 50 \
            --min-data-coverage 0.95

      - name: candidateエイリアス設定
        if: success()
        run: |
          python -c "
          from mlflow.tracking import MlflowClient
          client = MlflowClient()
          client.set_registered_model_alias(
              'product-ranking-xgb', 'candidate',
              ${{ needs.train.outputs.model_version }}
          )
          client.set_model_version_tag(
              'product-ranking-xgb',
              '${{ needs.train.outputs.model_version }}',
              'validation_status', 'passed'
          )
          "

  promote-to-challenger:
    needs: [train, validate]
    runs-on: ubuntu-latest
    environment: staging
    steps:
      - uses: actions/checkout@v4

      - name: 依存関係インストール
        run: pip install mlflow

      - name: challengerにプロモーション
        run: |
          python ml/promote.py \
            --model-name "product-ranking-xgb" \
            --version "${{ needs.train.outputs.model_version }}" \
            --target-alias "challenger"

      - name: シャドーモードにデプロイ
        run: |
          kubectl set image deployment/ranking-shadow \
            model-server=ranking-server:v${{ needs.train.outputs.model_version }} \
            --namespace ml-staging

自動Championプロモーション

# .github/workflows/promote-champion.yml
name: ChallengerをChampionにプロモーション

on:
  workflow_dispatch:
    inputs:
      model_name:
        description: '登録済みモデル名'
        required: true
      version:
        description: 'プロモーション対象のモデルバージョン'
        required: true

jobs:
  promote:
    runs-on: ubuntu-latest
    environment: production # 手動承認が必要
    steps:
      - uses: actions/checkout@v4

      - name: 依存関係インストール
        run: pip install mlflow

      - name: シャドーモードメトリクスの検証
        run: |
          python ml/verify_shadow_metrics.py \
            --model-name "${{ github.event.inputs.model_name }}" \
            --version "${{ github.event.inputs.version }}" \
            --min-hours-in-shadow 24 \
            --max-metric-degradation 0.02

      - name: championにプロモーション
        run: |
          python ml/promote.py \
            --model-name "${{ github.event.inputs.model_name }}" \
            --version "${{ github.event.inputs.version }}" \
            --target-alias "champion"

      - name: 本番環境にローリングデプロイ
        run: |
          kubectl set image deployment/ranking-prod \
            model-server=ranking-server:v${{ github.event.inputs.version }} \
            --namespace ml-production
          kubectl rollout status deployment/ranking-prod \
            --namespace ml-production --timeout=600s

マルチチーム実験組織

アクセス制御とネームスペース戦略

"""
マルチチーム組織向けMLflow実験ネームスペース戦略。

規則:
  {team}/{project}/{experiment-type}

例:
  recommendation-team/product-ranking/hyperparameter-search
  recommendation-team/product-ranking/feature-ablation
  search-team/query-understanding/weekly-retrain
  fraud-team/transaction-scoring/model-comparison

モデル命名規則:
  {project}-{algorithm}

例:
  product-ranking-xgb
  query-understanding-bert
  transaction-scoring-lgbm
"""

import mlflow
from mlflow.tracking import MlflowClient
from dataclasses import dataclass


@dataclass
class ExperimentConfig:
    team: str
    project: str
    experiment_type: str

    @property
    def experiment_name(self) -> str:
        return f"{self.team}/{self.project}/{self.experiment_type}"

    @property
    def model_name_prefix(self) -> str:
        return self.project


def setup_experiment(config: ExperimentConfig) -> str:
    """適切なタグを付けて発見可能性を確保した実験の作成または取得。"""
    client = MlflowClient()

    experiment = client.get_experiment_by_name(config.experiment_name)

    if experiment is None:
        experiment_id = client.create_experiment(
            name=config.experiment_name,
            tags={
                "team": config.team,
                "project": config.project,
                "type": config.experiment_type,
                "owner": f"{config.team}-lead@company.com",
            },
        )
    else:
        experiment_id = experiment.experiment_id

    mlflow.set_experiment(experiment_id=experiment_id)
    return experiment_id

障害ケースと運用上の警告

本番環境でよくある障害

1. アーティファクトストアの権限

最も一般的な本番障害は、学習ジョブがMLflowサーバーとは異なるIAMロールで実行される場合のS3権限エラーです:

# 症状:学習は完了するがモデルが保存されない
# エラー:botocore.exceptions.ClientError: AccessDenied

# 修正:学習ジョブのIAMロールに以下の両方があることを確認:
# - アーティファクトバケットへの s3:PutObject
# - アーティファクトバケットへの s3:GetObject(モデル読み込み用)

# 権限の確認:
aws s3 cp test.txt s3://mlflow-artifacts-prod/test.txt
aws s3 ls s3://mlflow-artifacts-prod/

2. PostgreSQLコネクション枯渇

多数の同時学習ジョブを実行すると、各ジョブがデータベースコネクションを保持します。コネクションプーリングなしでは連鎖障害が発生します:

# MLflowとPostgreSQLの間にPgBouncerをデプロイ
# pgbouncer.ini
[databases]
mlflowdb = host=postgres port=5432 dbname=mlflowdb

[pgbouncer]
listen_port = 6432
listen_addr = 0.0.0.0
auth_type = md5
auth_file = /etc/pgbouncer/userlist.txt
pool_mode = transaction
max_client_conn = 500
default_pool_size = 30
min_pool_size = 10
reserve_pool_size = 5

3. 大規模アーティファクトのアップロードタイムアウト

ディープラーニングモデル(数GB)はアップロード中にタイムアウトすることがあります。クライアントのタイムアウトを設定してください:

import os

# 大規模モデルアップロードのタイムアウトを延長(デフォルトは120秒)
os.environ["MLFLOW_HTTP_REQUEST_TIMEOUT"] = "600"

# 非常に大きなアーティファクトの場合、マルチパートアップロードを使用
os.environ["MLFLOW_MULTIPART_UPLOAD_CHUNK_SIZE"] = "104857600"  # 100MBチャンク

4. メトリクスロギングの競合状態

複数のプロセスが同じrunにロギングする場合(例:分散学習)、メトリクスが順序どおりに到着しないことがあります:

# 悪い例:複数のワーカーが同じrunにロギング
# ステップの順序問題やメトリクスの上書きが発生する

# 良い例:分散学習にはchild runを使用
with mlflow.start_run(run_name="distributed-training") as parent_run:
    for worker_id in range(num_workers):
        with mlflow.start_run(
            run_name=f"worker-{worker_id}",
            nested=True,
        ) as child_run:
            # 各ワーカーは自身のchild runにロギング
            train_worker(worker_id, child_run.info.run_id)

    # 親runにメトリクスを集約
    aggregate_and_log_metrics(parent_run.info.run_id)

5. モデルレジストリの名前衝突

チームが誤って互いの登録モデルを上書きしてしまう問題:

# ラッパーで命名規則を強制
def register_model_safe(model_uri: str, name: str, team: str):
    """チームプレフィックス検証付きのモデル登録。"""
    allowed_prefixes = {
        "recommendation": ["product-ranking", "user-embedding"],
        "search": ["query-understanding", "document-ranking"],
        "fraud": ["transaction-scoring", "account-risk"],
    }

    valid = any(
        name.startswith(prefix)
        for prefix in allowed_prefixes.get(team, [])
    )

    if not valid:
        raise ValueError(
            f"チーム '{team}' はモデル '{name}' を登録できません。"
            f"許可されたプレフィックス: {allowed_prefixes.get(team, [])}"
        )

    return mlflow.register_model(model_uri, name)

本番モニタリングとクリーンアップ

自動実験クリーンアップ

古い実験が蓄積されるとUIが遅くなります。定期的なクリーンアップをスケジュールしてください:

from mlflow.tracking import MlflowClient
from datetime import datetime, timedelta

def cleanup_old_runs(
    experiment_name: str,
    max_age_days: int = 90,
    keep_top_n: int = 10,
    dry_run: bool = True,
):
    """上位パフォーマーを保持しながら古い実験runをクリーンアップ。

    警告:これはrunとそのアーティファクトを永久に削除します。
    必ず最初にdry_run=Trueで実行してください。
    """
    client = MlflowClient()
    experiment = client.get_experiment_by_name(experiment_name)

    if experiment is None:
        print(f"実験 '{experiment_name}' が見つかりません")
        return

    cutoff = datetime.now() - timedelta(days=max_age_days)
    cutoff_ms = int(cutoff.timestamp() * 1000)

    # プライマリメトリクスでソートされた全runを取得
    runs = client.search_runs(
        experiment_ids=[experiment.experiment_id],
        order_by=["metrics.val_ndcg DESC"],
    )

    # 年数に関係なくトップNのrunを保護
    protected_run_ids = {r.info.run_id for r in runs[:keep_top_n]}

    deleted_count = 0
    for run in runs:
        if run.info.run_id in protected_run_ids:
            continue
        if run.info.end_time and run.info.end_time < cutoff_ms:
            if dry_run:
                print(f"削除対象のrun {run.info.run_id} "
                      f"(終了時刻: {datetime.fromtimestamp(run.info.end_time/1000)})")
            else:
                client.delete_run(run.info.run_id)
            deleted_count += 1

    print(f"{'削除対象' if dry_run else '削除済み'}: {deleted_count} 件のrun")

まとめ

本番スケールのMLflowは、単にmlflow.log_metric()を呼び出すだけでは不十分です。主要な原則は以下のとおりです:

  1. コンピュートとストレージを分離する:メタデータにはPostgreSQL、アーティファクトにはS3を使用。コネクションプーリングにはPgBouncerをデプロイ。
  2. チームとプロジェクト単位で実験を構造化する:組織の成長に合わせてスケールする明確な命名規則を使用。
  3. ステージではなくエイリアスを使用する:champion/challenger/candidateパターンとモデルエイリアスにより、柔軟なデプロイメントワークフローを実現。
  4. CI/CDと統合する:環境ベースの承認フローを持つGitHub Actionsを通じて、検証ゲートとデプロイメントを自動化。
  5. 障害に備える:コネクション枯渇、権限エラー、分散学習での競合状態が最も一般的な本番問題。
  6. プロアクティブにクリーンアップする:古いrunが蓄積するとUI性能が低下。上位パフォーマンスモデルの保護付きで自動クリーンアップをスケジュール。

ステージからエイリアスへの移行、モデルシグネチャの採用、データセットトラッキング(mlflow.log_input()経由)の統合は、MLflowが本番グレードのMLOpsプラットフォームへと成熟していることを示しています。適切なインフラストラクチャのスケーリングとCI/CD統合を組み合わせることで、MLflowはエンタープライズスケールでのML実験とモデル管理の堅固な基盤を提供します。

参考文献