Complete Guide to MLflow Experiment Management: Experiment Tracking, Model Registry, and Deployment Pipelines

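Introduction

Once a machine learning project grows, experiment management is usually the first problem it runs into. Spreadsheets and notes stop scaling when there are dozens of hyperparameter tuning runs, many feature combinations, and several algorithms to compare. Teams then routinely find that they cannot reproduce a result, or cannot tell which model is actually deployed to production.

MLflow is an open-source MLOps platform, started at Databricks, that addresses these problems. This guide concentrates on three of its components (Tracking, the Model Registry, and model serving), which together cover most of the ML lifecycle. It walks from architecture through deployment to show how to operate MLflow effectively in production.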

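MLflow Architecture

Core Components

MLflow is organized around four main components. Besides the three named above, Projects packages experiments so that they can be reproduced.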

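Component | Role | Storage
Tracking Server | Records experiment parameters, metrics, and artifacts | Backend Store + Artifact Store
Model Registry | Manages model versions and stage transitions | Backend Store
Model Serving | Deploys models behind a REST API | Containers / cloud
Projects | Packages experiments reproducibly | Git or local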

Tracking Server Deployment Architecture

In production, run a remote Tracking Server rather than logging to the local filesystem. A common setup uses PostgreSQL as the Backend Store and S3 as the Artifact Store.

# tracking_server_config.py
"""
MLflow Tracking Server production configuration
Backend Store: PostgreSQL
Artifact Store: S3
"""

import os

TRACKING_CONFIG = {
    "backend_store_uri": "postgresql://mlflow:password@db-host:5432/mlflow",
    "default_artifact_root": "s3://mlflow-artifacts/experiments",
    "host": "0.0.0.0",
    "port": 5000,
    "workers": 4,
}

# Start the MLflow Tracking Server
mlflow server \
  --backend-store-uri postgresql://mlflow:password@db-host:5432/mlflow \
  --default-artifact-root s3://mlflow-artifacts/experiments \
  --host 0.0.0.0 \
  --port 5000 \
  --workers 4

# Start it with Docker Compose
docker compose up -d mlflow-server

# docker-compose.yaml
version: '3.8'
services:
  mlflow-db:
    image: postgres:15
    environment:
      POSTGRES_DB: mlflow
      POSTGRES_USER: mlflow
      POSTGRES_PASSWORD: mlflow_password
    volumes:
      - pgdata:/var/lib/postgresql/data
    ports:
      - '5432:5432'

  mlflow-server:
    build: ./mlflow
    depends_on:
      - mlflow-db
    environment:
      MLFLOW_BACKEND_STORE_URI: postgresql://mlflow:mlflow_password@mlflow-db:5432/mlflow
      MLFLOW_DEFAULT_ARTIFACT_ROOT: s3://mlflow-artifacts/experiments
      AWS_ACCESS_KEY_ID: your-access-key
      AWS_SECRET_ACCESS_KEY: your-secret-key
    ports:
      - '5000:5000'
    command: >
      mlflow server
      --backend-store-uri postgresql://mlflow:mlflow_password@mlflow-db:5432/mlflow
      --default-artifact-root s3://mlflow-artifacts/experiments
      --host 0.0.0.0
      --port 5000
      --workers 4

volumes:
  pgdata:
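
The Python configuration dictionary and the `mlflow server` command above carry the same settings, so one can be derived from the other. The following is a minimal sketch of that idea; `build_server_command` is a hypothetical helper written for this guide, not part of MLflow, and the connection values are the same placeholders used above.

```python
import shlex

# Same keys as TRACKING_CONFIG above; the values are placeholders.
TRACKING_CONFIG = {
    "backend_store_uri": "postgresql://mlflow:password@db-host:5432/mlflow",
    "default_artifact_root": "s3://mlflow-artifacts/experiments",
    "host": "0.0.0.0",
    "port": 5000,
    "workers": 4,
}


def build_server_command(config):
    """Turn a config dict into an `mlflow server` argument list (hypothetical helper)."""
    command = ["mlflow", "server"]
    for key, value in config.items():
        # backend_store_uri becomes --backend-store-uri, and so on
        command += [f"--{key.replace('_', '-')}", str(value)]
    return command


command = build_server_command(TRACKING_CONFIG)
print(shlex.join(command))
```

The resulting list can be handed to `subprocess.run(command, check=True)`, which keeps the settings in one place instead of repeating them in a shell script.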

Experiment Tracking

Basic Experiment Logging

MLflow tracks experiments in units called Runs. Each Run records parameters, metrics, and artifacts.

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.datasets import load_iris

# Connect to the Tracking Server
mlflow.set_tracking_uri("http://mlflow-server:5000")

# Create the experiment, or select it if it already exists
mlflow.set_experiment("iris-classification")

# Prepare the data
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)

# Run the experiment
with mlflow.start_run(run_name="rf-baseline-v1") as run:
    # Log the hyperparameters
    params = {
        "n_estimators": 100,
        "max_depth": 5,
        "min_samples_split": 2,
        "random_state": 42,
    }
    mlflow.log_params(params)

    # Train the model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Predict and compute metrics
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_macro": f1_score(y_test, y_pred, average="macro"),
        "precision_macro": precision_score(y_test, y_pred, average="macro"),
        "recall_macro": recall_score(y_test, y_pred, average="macro"),
    }
    mlflow.log_metrics(metrics)

    # Log the model artifact and register it as a version of "iris-classifier"
    mlflow.sklearn.log_model(
        model,
        artifact_path="model",
        registered_model_name="iris-classifier",
    )

    # Log an additional artifact (here, a confusion matrix image)
    import matplotlib.pyplot as plt
    from sklearn.metrics import ConfusionMatrixDisplay

    fig, ax = plt.subplots(figsize=(8, 6))
    ConfusionMatrixDisplay.from_predictions(y_test, y_pred, ax=ax)
    fig.savefig("/tmp/confusion_matrix.png")
    mlflow.log_artifact("/tmp/confusion_matrix.png", "plots")

    print(f"Run ID: {run.info.run_id}")
    print(f"Metrics: {metrics}")

Autologging

MLflow supports autologging for major frameworks such as scikit-learn, PyTorch, TensorFlow, and XGBoost. A single call is enough to record parameters, metrics, and models automatically.

import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import cross_val_score

# Enable autologging
mlflow.sklearn.autolog(
    log_input_examples=True,      # store sample input data
    log_model_signatures=True,     # infer the model signature
    log_models=True,               # store the model artifact
    log_datasets=True,             # store training dataset metadata
    silent=False,                  # show logging messages
)

mlflow.set_experiment("iris-autolog-experiment")

with mlflow.start_run(run_name="gbc-autolog"):
    model = GradientBoostingClassifier(
        n_estimators=200,
        max_depth=3,
        learning_rate=0.1,
        random_state=42,
    )
    # With autolog enabled, fit() records the parameters, training metrics, and model
    model.fit(X_train, y_train)

    # Autolog does not record these cross-validation scores, so log them explicitly
    cv_scores = cross_val_score(model, X_train, y_train, cv=5)
    mlflow.log_metric("cv_mean_accuracy", cv_scores.mean())
    mlflow.log_metric("cv_std_accuracy", cv_scores.std())

Tracking PyTorch Deep Learning Experiments

import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

mlflow.set_experiment("pytorch-classification")

class SimpleNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# Training configuration
config = {
    "input_dim": 4,
    "hidden_dim": 64,
    "output_dim": 3,
    "learning_rate": 0.001,
    "epochs": 50,
    "batch_size": 16,
}

with mlflow.start_run(run_name="pytorch-simplenet"):
    mlflow.log_params(config)

    model = SimpleNet(config["input_dim"], config["hidden_dim"], config["output_dim"])
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=config["learning_rate"])

    X_tensor = torch.FloatTensor(X_train)
    y_tensor = torch.LongTensor(y_train)
    dataset = TensorDataset(X_tensor, y_tensor)
    dataloader = DataLoader(dataset, batch_size=config["batch_size"], shuffle=True)

    for epoch in range(config["epochs"]):
        model.train()
        total_loss = 0
        for batch_X, batch_y in dataloader:
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss / len(dataloader)
        # Log per-epoch metrics
        mlflow.log_metric("train_loss", avg_loss, step=epoch)

        # Validation
        model.eval()
        with torch.no_grad():
            X_test_tensor = torch.FloatTensor(X_test)
            test_outputs = model(X_test_tensor)
            _, predicted = torch.max(test_outputs, 1)
            val_acc = (predicted.numpy() == y_test).mean()
            mlflow.log_metric("val_accuracy", val_acc, step=epoch)

    # Log the trained model
    mlflow.pytorch.log_model(model, "pytorch-model")

MLflow Search API

Experiment results can be searched and compared programmatically.

import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient(tracking_uri="http://mlflow-server:5000")

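# Find the Runs in one experiment that match the filter
experiment = client.get_experiment_by_name("iris-classification")
if experiment is None:
    raise ValueError("Experiment 'iris-classification' does not exist")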
runs = client.search_runs(
    experiment_ids=[experiment.experiment_id],
    filter_string="metrics.accuracy > 0.9 AND params.n_estimators = '100'",
    order_by=["metrics.f1_macro DESC"],
    max_results=10,
)

# Print the results
for run in runs:
    print(f"Run ID: {run.info.run_id}")
    print(f"  Accuracy: {run.data.metrics.get('accuracy', 'N/A')}")
    print(f"  F1 Score: {run.data.metrics.get('f1_macro', 'N/A')}")
    print(f"  Params: {run.data.params}")
    print("---")

# Compare the top two Runs (only when the search returned at least two)
if len(runs) >= 2:
    run1, run2 = runs[0], runs[1]
    print("=== Run Comparison ===")
    for metric_key in run1.data.metrics:
        v1 = run1.data.metrics[metric_key]
        v2 = run2.data.metrics.get(metric_key, "N/A")
        print(f"  {metric_key}: {v1} vs {v2}")
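
Filter strings like the one passed to `search_runs` above are easy to get wrong by hand, mainly because parameter values are stored as strings and have to be quoted. The sketch below assembles such a string from dictionaries; `build_filter` is a hypothetical helper, not an MLflow API, and it only covers the `AND`-joined comparisons used in this guide.

```python
def build_filter(metric_bounds=None, params=None):
    """Compose an MLflow search filter string (hypothetical helper).

    metric_bounds maps metric name -> (comparator, number);
    params maps parameter name -> value, compared as a string.
    """
    clauses = []
    for name, (comparator, value) in (metric_bounds or {}).items():
        clauses.append(f"metrics.{name} {comparator} {value}")
    for name, value in (params or {}).items():
        # Parameters are stored as strings, so the value is quoted
        clauses.append(f"params.{name} = '{value}'")
    return " AND ".join(clauses)


filter_string = build_filter(
    metric_bounds={"accuracy": (">", 0.9)},
    params={"n_estimators": 100},
)
print(filter_string)
```

The printed string matches the `filter_string` used in the search example and can be passed to `client.search_runs` unchanged.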

Model Registry

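Model Registration and Versioning

The Model Registry is a central store for managing the model lifecycle. Each registered model receives a version number automatically, and versions can carry descriptions and tags. Versions have traditionally been moved between the Staging, Production, and Archived stages, although newer MLflow releases favor aliases over stages.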

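import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register the model logged by a training Run.
# `run` is the Run object from the basic logging example; every call creates a new version.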
model_name = "iris-classifier"
result = mlflow.register_model(
    model_uri=f"runs:/{run.info.run_id}/model",
    name=model_name,
)
print(f"Model Version: {result.version}")

# Add a description to the model version
client.update_model_version(
    name=model_name,
    version=result.version,
    description="RandomForest baseline model with 100 trees, accuracy 0.95",
)

# Add a tag to the model version
client.set_model_version_tag(
    name=model_name,
    version=result.version,
    key="validation_status",
    value="approved",
)

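Model Aliases and Stage Transitions

In MLflow 2.x, aliases are the recommended way to reference model versions. The older stage mechanism (Staging/Production/Archived) still works but is deprecated in recent 2.x releases, so new code should prefer aliases.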

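import mlflow
from mlflow.tracking import MlflowClient
from sklearn.metrics import accuracy_score

client = MlflowClient()
model_name = "iris-classifier"

# Alias-based workflow (recommended in MLflow 2.x)
# Point the "champion" alias at version 3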
client.set_registered_model_alias(
    name=model_name,
    alias="champion",
    version=3,
)

# Point the "challenger" alias at version 4
client.set_registered_model_alias(
    name=model_name,
    alias="challenger",
    version=4,
)

# Load the models by alias
champion_model = mlflow.pyfunc.load_model(f"models:/{model_name}@champion")
challenger_model = mlflow.pyfunc.load_model(f"models:/{model_name}@challenger")

# Compare predictions on the held-out test set
champion_pred = champion_model.predict(X_test)
challenger_pred = challenger_model.predict(X_test)

champion_acc = accuracy_score(y_test, champion_pred)
challenger_acc = accuracy_score(y_test, challenger_pred)
print(f"Champion Accuracy: {champion_acc}")
print(f"Challenger Accuracy: {challenger_acc}")

# Promote the challenger to champion if it performs better
if challenger_acc > champion_acc:
    client.set_registered_model_alias(
        name=model_name,
        alias="champion",
        version=4,
    )
    print("Challenger promoted to Champion!")
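
A strict "greater than" comparison promotes a challenger on any improvement, however small, which can cause churn when two models are nearly tied. One possible refinement, sketched below, is to require a minimum gain before moving the alias. Both functions are hypothetical helpers, and `min_gain` is an illustrative margin rather than an MLflow setting.

```python
def alias_uri(model_name, alias):
    """Build a models:/<name>@<alias> URI, the form used with load_model above."""
    return f"models:/{model_name}@{alias}"


def should_promote(champion_score, challenger_score, min_gain=0.0):
    """Return True when the challenger beats the champion by more than min_gain.

    min_gain is an illustrative safety margin, not an MLflow setting.
    """
    return (challenger_score - champion_score) > min_gain


print(alias_uri("iris-classifier", "champion"))
print(should_promote(0.93, 0.95, min_gain=0.01))   # gain of about 0.02
print(should_promote(0.93, 0.935, min_gain=0.01))  # gain of about 0.005
```

With `min_gain=0.0` the function behaves like the plain comparison in the example above.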

Model Approval Workflow

In production, a model should pass an approval process before it is deployed.

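from mlflow.tracking import MlflowClient


def model_approval_workflow(model_name, version):
    """Approve or reject a model version based on quality gates."""
    client = MlflowClient()

    # Step 1: fetch the validation metrics logged by the source Run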
    model_version = client.get_model_version(model_name, version)
    run = client.get_run(model_version.run_id)
    accuracy = run.data.metrics.get("accuracy", 0)
    f1 = run.data.metrics.get("f1_macro", 0)

    # Step 2: check the quality gates
    quality_gates = {
        "accuracy >= 0.90": accuracy >= 0.90,
        "f1_macro >= 0.85": f1 >= 0.85,
    }

    all_passed = all(quality_gates.values())
    print("=== Quality Gate Results ===")
    for gate, passed in quality_gates.items():
        status = "PASS" if passed else "FAIL"
        print(f"  {gate}: {status}")

    # Step 3: set the tag and alias according to the result
    if all_passed:
        client.set_model_version_tag(
            name=model_name, version=version,
            key="approval_status", value="approved"
        )
        # Assign the "staging" alias
        client.set_registered_model_alias(
            name=model_name, alias="staging", version=version
        )
        print(f"Model v{version} approved and moved to staging")
        return True
    else:
        client.set_model_version_tag(
            name=model_name, version=version,
            key="approval_status", value="rejected"
        )
        print(f"Model v{version} rejected - quality gates not met")
        return False

# Run the workflow
model_approval_workflow("iris-classifier", 5)
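
The gate check in step 2 can be separated from the MLflow calls so that the policy is testable without a Tracking Server. The following is a minimal sketch under that assumption; `evaluate_quality_gates` is a hypothetical helper, and the thresholds are the same ones used in the workflow above.

```python
def evaluate_quality_gates(metrics, thresholds):
    """Check each metric against its minimum threshold.

    A metric that is missing from `metrics` fails its gate.
    Returns (all_passed, per_gate_results).
    """
    results = {}
    for name, minimum in thresholds.items():
        value = metrics.get(name)
        results[name] = value is not None and value >= minimum
    return all(results.values()), results


thresholds = {"accuracy": 0.90, "f1_macro": 0.85}
passed, results = evaluate_quality_gates(
    {"accuracy": 0.95, "f1_macro": 0.80}, thresholds
)
print(passed, results)
```

Inside the workflow, `run.data.metrics` would be passed as `metrics`, and the returned flag would decide between the "approved" and "rejected" branches.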

Deployment Pipeline

Docker-Based Deployment

# Dockerfile.mlflow-serve
FROM python:3.11-slim

RUN pip install mlflow[extras] boto3 psycopg2-binary

ENV MLFLOW_TRACKING_URI=http://mlflow-server:5000
ENV MODEL_NAME=iris-classifier
ENV MODEL_ALIAS=champion

EXPOSE 8080

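# Shell form, so ${MODEL_NAME} and ${MODEL_ALIAS} are expanded at start-up.
# --env-manager local replaces the --no-conda flag from MLflow 1.x.
CMD mlflow models serve \
    --model-uri "models:/${MODEL_NAME}@${MODEL_ALIAS}" \
    --host 0.0.0.0 \
    --port 8080 \
    --workers 2 \
    --env-manager local

# Build and run the Docker image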
docker build -t mlflow-model-serve -f Dockerfile.mlflow-serve .
docker run -p 8080:8080 \
  -e AWS_ACCESS_KEY_ID=your-key \
  -e AWS_SECRET_ACCESS_KEY=your-secret \
  mlflow-model-serve

# Send a test prediction request
curl -X POST http://localhost:8080/invocations \
  -H "Content-Type: application/json" \
  -d '{"inputs": [[5.1, 3.5, 1.4, 0.2]]}'
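
The same request can be issued from Python, which is convenient for smoke tests in a deployment script. The sketch below uses only the standard library and mirrors the `curl` call above; `build_invocation_request` is a hypothetical helper, and the URL assumes the container from the previous step is listening on localhost:8080.

```python
import json
import urllib.request


def build_invocation_request(url, rows):
    """Build a POST request for an MLflow scoring server (hypothetical helper)."""
    body = json.dumps({"inputs": rows}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_invocation_request(
    "http://localhost:8080/invocations", [[5.1, 3.5, 1.4, 0.2]]
)
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))

# Sending the request needs the serving container to be running:
# with urllib.request.urlopen(request, timeout=10) as response:
#     print(json.load(response))
```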

Kubernetes Deployment

# k8s/mlflow-model-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: iris-classifier-serving
  labels:
    app: iris-classifier
spec:
  replicas: 3
  selector:
    matchLabels:
      app: iris-classifier
  template:
    metadata:
      labels:
        app: iris-classifier
    spec:
      containers:
        - name: model-server
          image: mlflow-model-serve:latest
          ports:
            - containerPort: 8080
          env:
            - name: MLFLOW_TRACKING_URI
              value: 'http://mlflow-server.mlflow.svc.cluster.local:5000'
            - name: MODEL_NAME
              value: 'iris-classifier'
            - name: MODEL_ALIAS
              value: 'champion'
          resources:
            requests:
              cpu: '500m'
              memory: '512Mi'
            limits:
              cpu: '1000m'
              memory: '1Gi'
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 60
            periodSeconds: 30
---
apiVersion: v1
kind: Service
metadata:
  name: iris-classifier-service
spec:
  selector:
    app: iris-classifier
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8080
  type: ClusterIP
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: iris-classifier-ingress
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /
spec:
  rules:
    - host: model.example.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: iris-classifier-service
                port:
                  number: 80

CI/CD with GitHub Actions

# .github/workflows/model-deploy.yaml
name: Model Deployment Pipeline

on:
  workflow_dispatch:
    inputs:
      model_name:
        description: 'Model name in registry'
        required: true
        default: 'iris-classifier'
      model_version:
        description: 'Model version to deploy'
        required: true

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install mlflow boto3 scikit-learn

      - name: Validate model
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        run: |
          python scripts/validate_model.py \
            --model-name ${{ github.event.inputs.model_name }} \
            --model-version ${{ github.event.inputs.model_version }}

  deploy-staging:
    needs: validate
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Deploy to staging
        run: |
          kubectl apply -f k8s/staging/
          kubectl set image deployment/model-serving \
            model-server=registry.example.com/model:v${{ github.event.inputs.model_version }}

  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v4

      - name: Deploy to production
        run: |
          kubectl apply -f k8s/production/
          kubectl set image deployment/model-serving \
            model-server=registry.example.com/model:v${{ github.event.inputs.model_version }}

      - name: Update MLflow alias
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
        run: |
          # This job starts on a fresh runner, so install the MLflow client first
          pip install mlflow
          python -c "
          from mlflow.tracking import MlflowClient
          client = MlflowClient()
          client.set_registered_model_alias(
              name='${{ github.event.inputs.model_name }}',
              alias='champion',
              version=${{ github.event.inputs.model_version }}
          )
          "

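Comparison of Experiment Tracking Platforms

Feature | MLflow | Weights and Biases | Neptune | CometML
License | Open source (Apache 2.0) | Commercial (free tier available) | Commercial (free tier available) | Commercial (free tier available)
Self-hosting | Fully supported | Limited | Supported | Supported
Experiment tracking | Strong | Very strong | Strong | Strong
Model registry | Built in | Requires external integration | Limited | Limited
Collaboration | Basic | Very strong (reports) | Strong | Strong
Visualization | Basic | Very strong | Strong | Strong
Autologging | Major frameworks | Broad support | Broad support | Broad support
Kubernetes integration | Native support | Limited | Limited | Limited
Hyperparameter tuning | Optuna integration | Built-in Sweeps | Optuna integration | Built-in Optimizer
Data versioning | Basic | Artifacts | Basic | Basic
Learning curve | Moderate | Low | Moderate | Low
Community | Very active | Active | Moderate | Moderate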

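Platform Selection Guide

  • Self-hosting required, open source preferred: MLflow
  • Focus on team collaboration and experiment visualization: Weights and Biases
  • Fine-grained metric management: Neptune
  • Fast adoption with simple setup: CometML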

Transformers Integration

Integrating Hugging Face Transformers with MLflow

import mlflow
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    TrainingArguments,
    Trainer,
)
from datasets import load_dataset

mlflow.set_experiment("sentiment-analysis")

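# Prepare the dataset. The IMDB train split is stored grouped by label, so shuffle
# before sampling; otherwise the first 1,000 rows would all share one class.
dataset = load_dataset("imdb", split="train").shuffle(seed=42).select(range(1000))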
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

def tokenize_function(examples):
    return tokenizer(examples["text"], padding="max_length", truncation=True, max_length=256)

tokenized_dataset = dataset.map(tokenize_function, batched=True)
tokenized_dataset = tokenized_dataset.train_test_split(test_size=0.2)

# Trainer sends metrics to MLflow through the Hugging Face MLflow callback
# (report_to="mlflow" below). mlflow.transformers.autolog() mainly keeps other
# autologging integrations from recording spurious sub-models.
mlflow.transformers.autolog(log_models=True)

model = AutoModelForSequenceClassification.from_pretrained(
    "distilbert-base-uncased", num_labels=2
)

training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=3,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    warmup_steps=100,
    weight_decay=0.01,
    logging_dir="./logs",
    logging_steps=10,
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    report_to="mlflow",
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["test"],
)

# Start training (the callback logs to the active MLflow run)
with mlflow.start_run(run_name="distilbert-sentiment"):
    trainer.train()

    # Log the final evaluation metrics
    eval_results = trainer.evaluate()
    mlflow.log_metrics(eval_results)

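Troubleshooting

Experiment Tracking in Distributed Training

When several workers log to MLflow at the same time during distributed training, their writes can conflict. A common remedy is to log from the rank 0 process only.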

import mlflow
import os

def setup_mlflow_distributed():
    """Configure MLflow for a distributed training job."""

    rank = int(os.environ.get("RANK", 0))
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    world_size = int(os.environ.get("WORLD_SIZE", 1))

    # Only the rank 0 process logs to MLflow
    if rank == 0:
        mlflow.set_tracking_uri("http://mlflow-server:5000")
        mlflow.set_experiment("distributed-training")
        run = mlflow.start_run(run_name=f"dist-train-{world_size}gpu")
        mlflow.log_param("world_size", world_size)
        return run
    else:
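        # Other ranks do not start a run; every logging call must be guarded by rank
        return None


def log_distributed_metrics(metrics, step, rank=None):
    """Log metrics from rank 0 only."""
    if rank is None:
        # Default to this process's own rank rather than assuming rank 0
        rank = int(os.environ.get("RANK", 0))
    if rank == 0:
        mlflow.log_metrics(metrics, step=step)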
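
Guarding every logging call by hand is easy to forget. One alternative, sketched here, is a decorator that applies the rank check once. `rank_zero_only` is a hypothetical helper written for this guide; it assumes the launcher (for example `torchrun`) sets the `RANK` environment variable, and it uses a stand-in logging function so the example runs without a Tracking Server.

```python
import functools
import os


def rank_zero_only(func):
    """Run the wrapped function only in the process whose RANK is 0."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # RANK is set by the launcher; treat an unset value as rank 0
        if int(os.environ.get("RANK", 0)) == 0:
            return func(*args, **kwargs)
        return None

    return wrapper


logged = []


@rank_zero_only
def log_metrics(metrics, step):
    # Stand-in for mlflow.log_metrics so the sketch runs without a server
    logged.append((step, metrics))


os.environ["RANK"] = "1"
log_metrics({"loss": 0.5}, step=0)  # skipped on rank 1
os.environ["RANK"] = "0"
log_metrics({"loss": 0.4}, step=1)  # recorded on rank 0
print(logged)
```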

Resolving Registry Conflicts

Conflicts can occur when several teams register models or change stages at the same time.

from mlflow.tracking import MlflowClient
from mlflow.exceptions import MlflowException
import time

def safe_transition_model(model_name, version, target_alias, max_retries=3):
    """Set a model alias safely, retrying on failure."""
    client = MlflowClient()

    for attempt in range(max_retries):
        try:
            # Check which version currently holds the alias
            try:
                current_champion = client.get_model_version_by_alias(
                    model_name, target_alias
                )
                print(f"Current {target_alias}: v{current_champion.version}")
            except MlflowException:
                print(f"No current {target_alias} found")

            # Move the alias to the new version
            client.set_registered_model_alias(
                name=model_name,
                alias=target_alias,
                version=version,
            )
            print(f"Successfully set v{version} as {target_alias}")
            return True

        except MlflowException as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff

    print(f"Failed to transition model after {max_retries} attempts")
    return False
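
The retry loop above is tied to one registry call. The same exponential backoff pattern can be factored into a reusable function, sketched below with a fake operation so it runs without MLflow. `retry_with_backoff` and its parameters are hypothetical names; the `sleep` argument exists only so the waiting can be replaced in tests.

```python
import time


def retry_with_backoff(operation, max_retries=3, base_delay=1.0,
                       retry_on=(Exception,), sleep=time.sleep):
    """Call operation(); on failure wait base_delay * 2**attempt and retry."""
    for attempt in range(max_retries):
        try:
            return operation()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)


# Demonstration with a fake operation that fails twice, then succeeds
calls = {"count": 0}
delays = []


def flaky_alias_update():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("simulated registry conflict")
    return "ok"


result = retry_with_backoff(flaky_alias_update, sleep=delays.append)
print(result, calls["count"], delays)
```

For the registry case, `operation` would be a lambda wrapping `client.set_registered_model_alias(...)`, with `retry_on=(MlflowException,)` so that unrelated errors are not retried.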

Artifact Store Access Errors

Authentication problems are the most frequent source of errors with an S3 Artifact Store. The function below helps diagnose them.

import boto3
from botocore.exceptions import ClientError

def diagnose_artifact_access(bucket_name, prefix="experiments/"):
    """Diagnose access to the S3 Artifact Store."""
    s3 = boto3.client("s3")

    checks = {}

    # 1. Check bucket access
    try:
        s3.head_bucket(Bucket=bucket_name)
        checks["bucket_access"] = "OK"
    except ClientError as e:
        checks["bucket_access"] = f"FAIL: {e.response['Error']['Code']}"

    # 2. Check that objects can be listed
    try:
        response = s3.list_objects_v2(
            Bucket=bucket_name, Prefix=prefix, MaxKeys=5
        )
        count = response.get("KeyCount", 0)
        checks["list_objects"] = f"OK ({count} objects found)"
    except ClientError as e:
        checks["list_objects"] = f"FAIL: {e.response['Error']['Code']}"

    # 3. Check write permission
    try:
        test_key = f"{prefix}_health_check"
        s3.put_object(Bucket=bucket_name, Key=test_key, Body=b"test")
        s3.delete_object(Bucket=bucket_name, Key=test_key)
        checks["write_access"] = "OK"
    except ClientError as e:
        checks["write_access"] = f"FAIL: {e.response['Error']['Code']}"

    print("=== S3 Artifact Store Diagnosis ===")
    for check, result in checks.items():
        print(f"  {check}: {result}")

    return checks

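Operational Notes

Performance Tips

  1. Use batch logging: recording several metrics in one mlflow.log_metrics() call reduces the number of API requests
  2. Asynchronous logging: upload large artifacts from a separate process after training finishes
  3. Tracking Server caching: put a caching Nginx reverse proxy in front of the server to speed up reads
  4. PostgreSQL indexes: if experiment search is slow, add appropriate indexes to the runs table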
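
The first tip can be sketched as a small buffer that collects the metrics for a step and sends them in a single call. `MetricBuffer` is a hypothetical helper, not an MLflow class; it takes the logging function as an argument so the example runs without a server, and in real training code `mlflow.log_metrics` would be passed in.

```python
class MetricBuffer:
    """Collect metrics per step and emit them in one call (hypothetical helper)."""

    def __init__(self, log_fn):
        # log_fn has the shape of mlflow.log_metrics(metrics, step=...)
        self._log_fn = log_fn
        self._pending = {}

    def add(self, name, value):
        self._pending[name] = value

    def flush(self, step):
        if self._pending:
            self._log_fn(dict(self._pending), step=step)
            self._pending.clear()


calls = []
buffer = MetricBuffer(lambda metrics, step: calls.append((step, metrics)))
buffer.add("train_loss", 0.42)
buffer.add("val_accuracy", 0.91)
buffer.flush(step=0)
buffer.flush(step=1)  # nothing pending, so no call is made
print(calls)
```

Two metrics result in one logging call here, where calling `mlflow.log_metric` for each would have made two.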

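Security Considerations

  • Place an authenticating proxy (OAuth2 Proxy, Nginx Basic Auth) in front of the Tracking Server
  • Apply a VPC endpoint to the S3 bucket to block external access
  • Enable encryption for model artifacts (SSE-S3 or SSE-KMS)
  • Use RBAC (Role-Based Access Control) to restrict experiment access per team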

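Production Checklist

  • [ ] Run the Tracking Server on its own server or container
  • [ ] Use PostgreSQL or MySQL as the Backend Store (do not use SQLite)
  • [ ] Use S3, GCS, or Azure Blob as the Artifact Store
  • [ ] Place an authenticating proxy in front of the Tracking Server
  • [ ] Apply an approval workflow to the Model Registry
  • [ ] Build an automated validation (Quality Gate) pipeline for model deployment
  • [ ] Configure distributed training so that only rank 0 logs
  • [ ] Set a retention (Lifecycle Policy) rule on the Artifact Store
  • [ ] Monitor Tracking Server health with a dashboard (Grafana)
  • [ ] Back up the database regularly and test restores
  • [ ] Connect automated model deployment to the CI/CD pipeline
  • [ ] Configure health checks and autoscaling for model serving endpoints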


Complete Guide to MLflow Experiment Management: Experiment Tracking, Model Registry, and Deployment Pipeline

Complete Guide to MLflow Experiment Management

Introduction

As machine learning projects scale, the first challenge teams face is experiment management. Managing dozens of hyperparameter tuning runs, various feature combinations, and algorithm comparisons via spreadsheets or notebooks quickly hits a wall. Being unable to reproduce experiment results or track which model is currently in production becomes a recurring issue.

MLflow is an open-source MLOps platform that originated at Databricks to solve these problems. Through its three core components -- Tracking, Model Registry, and Model Serving -- it manages the entire ML lifecycle. This guide covers everything from MLflow architecture to production deployment, providing practical strategies for running MLflow effectively in production.

MLflow Architecture

Core Component Structure

MLflow consists of four main components:

ComponentRoleStorage
Tracking ServerRecords experiment parameters, metrics, and artifactsBackend Store + Artifact Store
Model RegistryManages model versions and stage transitionsBackend Store
Model ServingDeploys models as REST APIsContainers/Cloud
ProjectsPackages reproducible experimentsGit or Local

Tracking Server Deployment Architecture

In production, you need a remote Tracking Server. The standard setup uses PostgreSQL as the Backend Store and S3 as the Artifact Store.

# tracking_server_config.py
"""
MLflow Tracking Server production configuration
Backend Store: PostgreSQL
Artifact Store: S3
"""

import os

TRACKING_CONFIG = {
    "backend_store_uri": "postgresql://mlflow:password@db-host:5432/mlflow",
    "default_artifact_root": "s3://mlflow-artifacts/experiments",
    "host": "0.0.0.0",
    "port": 5000,
    "workers": 4,
}
# Launch MLflow Tracking Server
mlflow server \
  --backend-store-uri postgresql://mlflow:password@db-host:5432/mlflow \
  --default-artifact-root s3://mlflow-artifacts/experiments \
  --host 0.0.0.0 \
  --port 5000 \
  --workers 4

# Launch with Docker Compose
docker compose up -d mlflow-server
# docker-compose.yaml
version: '3.8'
services:
  mlflow-db:
    image: postgres:15
    environment:
      POSTGRES_DB: mlflow
      POSTGRES_USER: mlflow
      POSTGRES_PASSWORD: mlflow_password
    volumes:
      - pgdata:/var/lib/postgresql/data
    ports:
      - '5432:5432'

  mlflow-server:
    build: ./mlflow
    depends_on:
      - mlflow-db
    environment:
      MLFLOW_BACKEND_STORE_URI: postgresql://mlflow:mlflow_password@mlflow-db:5432/mlflow
      MLFLOW_DEFAULT_ARTIFACT_ROOT: s3://mlflow-artifacts/experiments
      AWS_ACCESS_KEY_ID: your-access-key
      AWS_SECRET_ACCESS_KEY: your-secret-key
    ports:
      - '5000:5000'
    command: >
      mlflow server
      --backend-store-uri postgresql://mlflow:mlflow_password@mlflow-db:5432/mlflow
      --default-artifact-root s3://mlflow-artifacts/experiments
      --host 0.0.0.0
      --port 5000
      --workers 4

volumes:
  pgdata:

Experiment Tracking

Basic Experiment Logging

MLflow experiment tracking operates on a Run-by-Run basis. Each Run can record parameters, metrics, and artifacts.

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.datasets import load_iris

# Connect to Tracking Server
mlflow.set_tracking_uri("http://mlflow-server:5000")

# Create or select an experiment
mlflow.set_experiment("iris-classification")

# Prepare data
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)

# Run the experiment
with mlflow.start_run(run_name="rf-baseline-v1") as run:
    # Log hyperparameters
    params = {
        "n_estimators": 100,
        "max_depth": 5,
        "min_samples_split": 2,
        "random_state": 42,
    }
    mlflow.log_params(params)

    # Train model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Predict and compute metrics
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_macro": f1_score(y_test, y_pred, average="macro"),
        "precision_macro": precision_score(y_test, y_pred, average="macro"),
        "recall_macro": recall_score(y_test, y_pred, average="macro"),
    }
    mlflow.log_metrics(metrics)

    # Log model artifact
    mlflow.sklearn.log_model(
        model,
        artifact_path="model",
        registered_model_name="iris-classifier",
    )

    # Log additional artifacts (e.g., confusion matrix image)
    import matplotlib.pyplot as plt
    from sklearn.metrics import ConfusionMatrixDisplay

    fig, ax = plt.subplots(figsize=(8, 6))
    ConfusionMatrixDisplay.from_predictions(y_test, y_pred, ax=ax)
    fig.savefig("/tmp/confusion_matrix.png")
    mlflow.log_artifact("/tmp/confusion_matrix.png", "plots")

    print(f"Run ID: {run.info.run_id}")
    print(f"Metrics: {metrics}")

Autologging

MLflow supports autologging for major frameworks including scikit-learn, PyTorch, TensorFlow, and XGBoost. A single line of code automatically records parameters, metrics, and models.

import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import cross_val_score

# Enable autologging
mlflow.sklearn.autolog(
    log_input_examples=True,      # Save input data examples
    log_model_signatures=True,     # Auto-detect model signatures
    log_models=True,               # Auto-save model artifacts
    log_datasets=True,             # Save training dataset info
    silent=False,                  # Show logging messages
)

mlflow.set_experiment("iris-autolog-experiment")

with mlflow.start_run(run_name="gbc-autolog"):
    model = GradientBoostingClassifier(
        n_estimators=200,
        max_depth=3,
        learning_rate=0.1,
        random_state=42,
    )
    # autolog automatically records params/metrics/model on fit()
    model.fit(X_train, y_train)

    # cross-validation scores are also auto-logged
    cv_scores = cross_val_score(model, X_train, y_train, cv=5)
    mlflow.log_metric("cv_mean_accuracy", cv_scores.mean())
    mlflow.log_metric("cv_std_accuracy", cv_scores.std())

PyTorch Deep Learning Experiment Tracking

import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

mlflow.set_experiment("pytorch-classification")

class SimpleNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# Training configuration
config = {
    "input_dim": 4,
    "hidden_dim": 64,
    "output_dim": 3,
    "learning_rate": 0.001,
    "epochs": 50,
    "batch_size": 16,
}

with mlflow.start_run(run_name="pytorch-simplenet"):
    mlflow.log_params(config)

    model = SimpleNet(config["input_dim"], config["hidden_dim"], config["output_dim"])
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=config["learning_rate"])

    X_tensor = torch.FloatTensor(X_train)
    y_tensor = torch.LongTensor(y_train)
    dataset = TensorDataset(X_tensor, y_tensor)
    dataloader = DataLoader(dataset, batch_size=config["batch_size"], shuffle=True)

    for epoch in range(config["epochs"]):
        model.train()
        total_loss = 0
        for batch_X, batch_y in dataloader:
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss / len(dataloader)
        # Log per-epoch metrics
        mlflow.log_metric("train_loss", avg_loss, step=epoch)

        # Validation
        model.eval()
        with torch.no_grad():
            X_test_tensor = torch.FloatTensor(X_test)
            test_outputs = model(X_test_tensor)
            _, predicted = torch.max(test_outputs, 1)
            val_acc = (predicted.numpy() == y_test).mean()
            mlflow.log_metric("val_accuracy", val_acc, step=epoch)

    # Save model
    mlflow.pytorch.log_model(model, "pytorch-model")

MLflow Search API

You can programmatically search and compare experiment results.

import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient(tracking_uri="http://mlflow-server:5000")

# Query all Runs for a specific experiment
experiment = client.get_experiment_by_name("iris-classification")
runs = client.search_runs(
    experiment_ids=[experiment.experiment_id],
    filter_string="metrics.accuracy > 0.9 AND params.n_estimators = '100'",
    order_by=["metrics.f1_macro DESC"],
    max_results=10,
)

# Display results
for run in runs:
    print(f"Run ID: {run.info.run_id}")
    print(f"  Accuracy: {run.data.metrics.get('accuracy', 'N/A')}")
    print(f"  F1 Score: {run.data.metrics.get('f1_macro', 'N/A')}")
    print(f"  Params: {run.data.params}")
    print("---")

# Compare the top two Runs (only possible if the search returned at least two)
run1 = runs[0] if len(runs) > 0 else None
run2 = runs[1] if len(runs) > 1 else None

if run1 and run2:
    print("=== Run Comparison ===")
    for metric_key in run1.data.metrics:
        v1 = run1.data.metrics[metric_key]
        v2 = run2.data.metrics.get(metric_key, "N/A")
        print(f"  {metric_key}: {v1} vs {v2}")
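Filter strings such as the one passed to search_runs above are easy to get wrong when written by hand, mainly because parameter values are stored as strings and must be quoted. The following is a minimal sketch of a helper that assembles such a string; `build_run_filter` is a hypothetical name, not part of the MLflow API, and it only covers "metric greater than" and "parameter equals" conditions.

```python
def build_run_filter(metric_mins=None, params=None):
    """Build an MLflow search filter string from simple conditions.

    metric_mins: {"accuracy": 0.9}    -> metrics.accuracy > 0.9
    params:      {"n_estimators": 100} -> params.n_estimators = '100'
    """
    clauses = []
    for name, threshold in (metric_mins or {}).items():
        # Metric values are numeric and are compared without quotes.
        clauses.append(f"metrics.{name} > {threshold}")
    for name, value in (params or {}).items():
        # Parameters are stored as strings, so the value is always quoted.
        clauses.append(f"params.{name} = '{value}'")
    return " AND ".join(clauses)


filter_string = build_run_filter(
    metric_mins={"accuracy": 0.9},
    params={"n_estimators": 100},
)
print(filter_string)
# prints: metrics.accuracy > 0.9 AND params.n_estimators = '100'
```

The result can be passed as `filter_string=` to `client.search_runs`.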

Model Registry

Model Registration and Versioning

The Model Registry is a centralized repository for managing the model lifecycle. Each time a model is registered under an existing name, the registry creates a new version automatically. Versions can then be referenced through aliases or through the legacy Staging, Production, and Archived stages.

import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register the model logged by a training Run
# (`run` is the Run object from the training example; "model" is its artifact path)
model_name = "iris-classifier"
result = mlflow.register_model(
    model_uri=f"runs:/{run.info.run_id}/model",
    name=model_name,
)
print(f"Model Version: {result.version}")

# Add description to model version
client.update_model_version(
    name=model_name,
    version=result.version,
    description="RandomForest baseline model with 100 trees, accuracy 0.95",
)

# Add tags to model version
client.set_model_version_tag(
    name=model_name,
    version=result.version,
    key="validation_status",
    value="approved",
)

Model Aliases and Stage Transitions

Since MLflow 2.3, aliases are the recommended way to reference model versions. The stage-based approach (Staging/Production/Archived) has been deprecated since MLflow 2.9; it still works, but new code should use aliases.

import mlflow
from mlflow.tracking import MlflowClient
from sklearn.metrics import accuracy_score

client = MlflowClient()
model_name = "iris-classifier"

# Alias approach (recommended in MLflow 2.x)
# Set champion alias
client.set_registered_model_alias(
    name=model_name,
    alias="champion",
    version=3,
)

# Set challenger alias
client.set_registered_model_alias(
    name=model_name,
    alias="challenger",
    version=4,
)

# Load models by alias
champion_model = mlflow.pyfunc.load_model(f"models:/{model_name}@champion")
challenger_model = mlflow.pyfunc.load_model(f"models:/{model_name}@challenger")

# Compare predictions (X_test and y_test come from the earlier train/test split)
champion_acc = accuracy_score(y_test, champion_model.predict(X_test))
challenger_acc = accuracy_score(y_test, challenger_model.predict(X_test))

print(f"Champion Accuracy: {champion_acc}")
print(f"Challenger Accuracy: {challenger_acc}")

# Promote challenger to champion if it performs better
if challenger_acc > champion_acc:
    # Look up the challenger's version instead of hardcoding it
    challenger_version = client.get_model_version_by_alias(model_name, "challenger").version
    client.set_registered_model_alias(
        name=model_name,
        alias="champion",
        version=challenger_version,
    )
    print("Challenger promoted to Champion!")
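A strict "greater than" comparison promotes the challenger on any difference, including one caused by noise on a small test set. One common refinement is to require a minimum gain. The sketch below assumes a hypothetical `min_gain` threshold of one percentage point; the right value depends on the size and variance of your evaluation data.

```python
def should_promote(champion_score, challenger_score, min_gain=0.01):
    """Return True only if the challenger beats the champion by at least min_gain."""
    return (challenger_score - champion_score) >= min_gain


# Example: a 5-point gain is promoted, a 0.1-point gain is not.
print(should_promote(0.90, 0.95))   # True
print(should_promote(0.95, 0.951))  # False
```

The function can replace the plain comparison in the promotion step, with the two accuracy values passed in.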

Model Approval Workflow

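In production, a model version should pass an explicit approval step before it is deployed. The workflow below checks the metrics of the training Run against quality gates and records the outcome as a tag and an alias on the model version.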

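from mlflow.tracking import MlflowClient


def model_approval_workflow(model_name, version):
    """Check quality gates for a model version, then tag it and assign the staging alias."""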
    client = MlflowClient()

    # Step 1: Check model validation metrics
    model_version = client.get_model_version(model_name, version)
    run = client.get_run(model_version.run_id)
    accuracy = run.data.metrics.get("accuracy", 0)
    f1 = run.data.metrics.get("f1_macro", 0)

    # Step 2: Verify quality criteria
    quality_gates = {
        "accuracy >= 0.90": accuracy >= 0.90,
        "f1_macro >= 0.85": f1 >= 0.85,
    }

    all_passed = all(quality_gates.values())
    print("=== Quality Gate Results ===")
    for gate, passed in quality_gates.items():
        status = "PASS" if passed else "FAIL"
        print(f"  {gate}: {status}")

    # Step 3: Set alias based on approval
    if all_passed:
        client.set_model_version_tag(
            name=model_name, version=version,
            key="approval_status", value="approved"
        )
        # Assign staging alias
        client.set_registered_model_alias(
            name=model_name, alias="staging", version=version
        )
        print(f"Model v{version} approved and moved to staging")
        return True
    else:
        client.set_model_version_tag(
            name=model_name, version=version,
            key="approval_status", value="rejected"
        )
        print(f"Model v{version} rejected - quality gates not met")
        return False

# Execute workflow
model_approval_workflow("iris-classifier", 5)
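The same quality gates can also run as a command-line check, for example from a CI job that must fail when a model version is not good enough. The following is a sketch of such a script under stated assumptions: the file name `scripts/validate_model.py`, the `THRESHOLDS` values, and the helper names are illustrative, and MLflow is imported lazily so the gate logic can be tested without a tracking server.

```python
# scripts/validate_model.py (hypothetical sketch)
import argparse

# Assumed thresholds, mirroring the quality gates in the workflow above.
THRESHOLDS = {"accuracy": 0.90, "f1_macro": 0.85}


def failed_gates(metrics, thresholds=THRESHOLDS):
    """Return the names of metrics that are missing or below their threshold."""
    return [
        name
        for name, minimum in thresholds.items()
        if metrics.get(name, float("-inf")) < minimum
    ]


def fetch_metrics(model_name, model_version):
    """Read the metrics of the Run that produced the given model version."""
    # Imported lazily; requires MLFLOW_TRACKING_URI to point at the server.
    from mlflow.tracking import MlflowClient

    client = MlflowClient()
    version = client.get_model_version(model_name, model_version)
    return client.get_run(version.run_id).data.metrics


def main(argv=None):
    parser = argparse.ArgumentParser(description="Validate a registered model version")
    parser.add_argument("--model-name", required=True)
    parser.add_argument("--model-version", required=True)
    args = parser.parse_args(argv)

    failures = failed_gates(fetch_metrics(args.model_name, args.model_version))
    for name in failures:
        print(f"FAIL: {name} is below {THRESHOLDS[name]}")
    # Exit code 0 means all gates passed, 1 means at least one failed.
    return 1 if failures else 0
```

When the file is used as a script, end it with `raise SystemExit(main())` so the exit code reaches the calling shell.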

Deployment Pipeline

Docker-Based Deployment

# Dockerfile.mlflow-serve
FROM python:3.11-slim

RUN pip install mlflow[extras] boto3 psycopg2-binary

ENV MLFLOW_TRACKING_URI=http://mlflow-server:5000
ENV MODEL_NAME=iris-classifier
ENV MODEL_ALIAS=champion

EXPOSE 8080

CMD mlflow models serve \
    --model-uri "models:/${MODEL_NAME}@${MODEL_ALIAS}" \
    --host 0.0.0.0 \
    --port 8080 \
    --workers 2 \
    --env-manager local

# Build and run Docker image
docker build -t mlflow-model-serve -f Dockerfile.mlflow-serve .
docker run -p 8080:8080 \
  -e AWS_ACCESS_KEY_ID=your-key \
  -e AWS_SECRET_ACCESS_KEY=your-secret \
  mlflow-model-serve

# Test prediction request
curl -X POST http://localhost:8080/invocations \
  -H "Content-Type: application/json" \
  -d '{"inputs": [[5.1, 3.5, 1.4, 0.2]]}'
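The curl request above sends a bare array under `inputs`. For models logged with a column-based signature, the scoring server also accepts the `dataframe_split` format, which carries column names explicitly. The sketch below builds such a payload with the standard library only; the column names are assumptions and must match the signature of your model.

```python
import json
import urllib.request


def build_invocation_payload(columns, rows):
    """Build a `dataframe_split` request body for the /invocations endpoint."""
    return {
        "dataframe_split": {
            "columns": list(columns),
            "data": [list(row) for row in rows],
        }
    }


def predict(url, payload, timeout=10):
    """POST the payload as JSON and return the decoded response."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


# Hypothetical column names for the Iris features.
payload = build_invocation_payload(
    columns=["sepal_length", "sepal_width", "petal_length", "petal_width"],
    rows=[[5.1, 3.5, 1.4, 0.2]],
)
print(json.dumps(payload))
# predict("http://localhost:8080/invocations", payload)  # needs the running container
```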

Kubernetes Deployment

# k8s/mlflow-model-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: iris-classifier-serving
  labels:
    app: iris-classifier
spec:
  replicas: 3
  selector:
    matchLabels:
      app: iris-classifier
  template:
    metadata:
      labels:
        app: iris-classifier
    spec:
      containers:
        - name: model-server
          image: mlflow-model-serve:latest
          ports:
            - containerPort: 8080
          env:
            - name: MLFLOW_TRACKING_URI
              value: 'http://mlflow-server.mlflow.svc.cluster.local:5000'
            - name: MODEL_NAME
              value: 'iris-classifier'
            - name: MODEL_ALIAS
              value: 'champion'
          resources:
            requests:
              cpu: '500m'
              memory: '512Mi'
            limits:
              cpu: '1000m'
              memory: '1Gi'
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 60
            periodSeconds: 30
---
apiVersion: v1
kind: Service
metadata:
  name: iris-classifier-service
spec:
  selector:
    app: iris-classifier
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8080
  type: ClusterIP
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: iris-classifier-ingress
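  # No rewrite-target annotation: combined with `path: /` it would rewrite every
  # request (including /invocations) to "/", so paths are passed through unchanged.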
spec:
  rules:
    - host: model.example.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: iris-classifier-service
                port:
                  number: 80
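The readiness and liveness probes in the Deployment poll `/health` from inside the cluster. After a rollout it can be useful to run the same check from outside, for example as a smoke test in a deployment script. The following is a minimal sketch; the function names, retry counts, and URL are illustrative assumptions.

```python
import time
import urllib.error
import urllib.request


def http_ok(url, timeout=5):
    """Return True if a GET request to `url` answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        return False


def wait_until_healthy(check, attempts=10, delay=3.0, sleep=time.sleep):
    """Call `check()` until it returns True or the attempts run out.

    Returns the 1-based attempt number that succeeded, or None on failure.
    """
    for attempt in range(1, attempts + 1):
        if check():
            return attempt
        if attempt < attempts:
            sleep(delay)
    return None


# Usage against the Ingress host defined above (requires a live deployment):
# wait_until_healthy(lambda: http_ok("http://model.example.com/health"))
```

Passing the check as a callable keeps the retry loop independent of HTTP, so it can be tested without a running server.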

CI/CD with GitHub Actions

# .github/workflows/model-deploy.yaml
name: Model Deployment Pipeline

on:
  workflow_dispatch:
    inputs:
      model_name:
        description: 'Model name in registry'
        required: true
        default: 'iris-classifier'
      model_version:
        description: 'Model version to deploy'
        required: true

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install mlflow boto3 scikit-learn

      - name: Validate model
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        run: |
          python scripts/validate_model.py \
            --model-name ${{ github.event.inputs.model_name }} \
            --model-version ${{ github.event.inputs.model_version }}

  deploy-staging:
    needs: validate
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Deploy to staging
        run: |
          kubectl apply -f k8s/staging/
          kubectl set image deployment/iris-classifier-serving \
            model-server=registry.example.com/model:v${{ github.event.inputs.model_version }}

  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v4

      - name: Deploy to production
        run: |
          kubectl apply -f k8s/production/
          kubectl set image deployment/iris-classifier-serving \
            model-server=registry.example.com/model:v${{ github.event.inputs.model_version }}

      - name: Update MLflow alias
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
        run: |
          python -c "
          from mlflow.tracking import MlflowClient
          client = MlflowClient()
          client.set_registered_model_alias(
              name='${{ github.event.inputs.model_name }}',
              alias='champion',
              version=${{ github.event.inputs.model_version }}
          )
          "

Experiment Tracking Platform Comparison

| Feature | MLflow | Weights and Biases | Neptune | CometML |
| --- | --- | --- | --- | --- |
| License | Open Source (Apache 2.0) | Commercial (free tier) | Commercial (free tier) | Commercial (free tier) |
| Self-Hosting | Fully supported | Limited | Supported | Supported |
| Experiment Tracking | Excellent | Outstanding | Excellent | Excellent |
| Model Registry | Built-in | External integration | Limited | Limited |
| Collaboration | Basic | Outstanding (reports) | Excellent | Excellent |
| Visualization | Basic | Outstanding | Excellent | Excellent |
| Autologging | Major frameworks | Extensive | Extensive | Extensive |
| Kubernetes Integration | Native support | Limited | Limited | Limited |
| Hyperparameter Tuning | Optuna integration | Sweeps built-in | Optuna integration | Optimizer built-in |
| Data Versioning | Basic | Artifacts | Basic | Basic |
| Learning Curve | Moderate | Low | Moderate | Low |
| Community | Very active | Active | Moderate | Moderate |

Platform Selection Guide

  • Self-hosting required, open-source priority: MLflow
  • Team collaboration and experiment visualization focused: Weights and Biases
  • Fine-grained metric management: Neptune
  • Quick adoption, simple setup: CometML

Transformers Integration

HuggingFace Transformers with MLflow

import mlflow
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    TrainingArguments,
    Trainer,
)
from datasets import load_dataset

mlflow.set_experiment("sentiment-analysis")

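# Prepare dataset (shuffle before sampling: the IMDB train split is ordered by label,
# so the first 1000 rows would all belong to one class)
dataset = load_dataset("imdb", split="train").shuffle(seed=42).select(range(1000))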
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

def tokenize_function(examples):
    return tokenizer(examples["text"], padding="max_length", truncation=True, max_length=256)

tokenized_dataset = dataset.map(tokenize_function, batched=True)
tokenized_dataset = tokenized_dataset.train_test_split(test_size=0.2)

# Trainer logs to MLflow through its built-in MLflowCallback (enabled below with
# report_to="mlflow"). mlflow.transformers.autolog() does not record Trainer
# metrics, so it is not called here.

model = AutoModelForSequenceClassification.from_pretrained(
    "distilbert-base-uncased", num_labels=2
)

training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=3,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    warmup_steps=100,
    weight_decay=0.01,
    logging_dir="./logs",
    logging_steps=10,
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    report_to="mlflow",
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["test"],
)

# Start training (auto-logged to MLflow)
with mlflow.start_run(run_name="distilbert-sentiment"):
    trainer.train()

    # Log additional metrics
    eval_results = trainer.evaluate()
    mlflow.log_metrics(eval_results)

Troubleshooting

Experiment Tracking in Distributed Training

In distributed training every worker process runs the same script, so each one would open its own Run and log duplicate parameters and metrics. The usual fix is to log only from the rank 0 process.

import mlflow
import os

def setup_mlflow_distributed():
    """MLflow setup for distributed training"""

    rank = int(os.environ.get("RANK", 0))
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    world_size = int(os.environ.get("WORLD_SIZE", 1))

    # Only Rank 0 process logs to MLflow
    if rank == 0:
        mlflow.set_tracking_uri("http://mlflow-server:5000")
        mlflow.set_experiment("distributed-training")
        run = mlflow.start_run(run_name=f"dist-train-{world_size}gpu")
        mlflow.log_param("world_size", world_size)
        return run
    else:
        # Other ranks open no Run; guard every logging call with a rank check,
        # because an unguarded mlflow.log_* call would implicitly start a new Run
        return None


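def log_distributed_metrics(metrics, step, rank=None):
    """Log metrics only from Rank 0 (rank defaults to the RANK environment variable)"""
    if rank is None:
        rank = int(os.environ.get("RANK", 0))
    if rank == 0:
        mlflow.log_metrics(metrics, step=step)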

Resolving Registry Conflicts

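Conflicts can arise when multiple teams register models or reassign aliases at the same time. Retrying with exponential backoff handles transient failures, and printing the current alias holder leaves a record of what was replaced.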

from mlflow.tracking import MlflowClient
from mlflow.exceptions import MlflowException
import time

def safe_transition_model(model_name, version, target_alias, max_retries=3):
    """Reassign an alias to a model version, retrying with exponential backoff"""
    client = MlflowClient()

    for attempt in range(max_retries):
        try:
            # Look up the version that currently holds the alias
            try:
                current_champion = client.get_model_version_by_alias(
                    model_name, target_alias
                )
                print(f"Current {target_alias}: v{current_champion.version}")
            except MlflowException:
                print(f"No current {target_alias} found")

            # Transition alias
            client.set_registered_model_alias(
                name=model_name,
                alias=target_alias,
                version=version,
            )
            print(f"Successfully set v{version} as {target_alias}")
            return True

        except MlflowException as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff

    print(f"Failed to transition model after {max_retries} attempts")
    return False

Artifact Store Access Errors

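When S3 is the Artifact Store, most access errors come from missing credentials or insufficient IAM permissions. The following function checks bucket access, object listing, and write permission in turn.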

import boto3
from botocore.exceptions import ClientError

def diagnose_artifact_access(bucket_name, prefix="experiments/"):
    """Diagnose S3 Artifact Store access"""
    s3 = boto3.client("s3")

    checks = {}

    # 1. Check bucket access
    try:
        s3.head_bucket(Bucket=bucket_name)
        checks["bucket_access"] = "OK"
    except ClientError as e:
        checks["bucket_access"] = f"FAIL: {e.response['Error']['Code']}"

    # 2. Check object listing
    try:
        response = s3.list_objects_v2(
            Bucket=bucket_name, Prefix=prefix, MaxKeys=5
        )
        count = response.get("KeyCount", 0)
        checks["list_objects"] = f"OK ({count} objects found)"
    except ClientError as e:
        checks["list_objects"] = f"FAIL: {e.response['Error']['Code']}"

    # 3. Check write permission
    try:
        test_key = f"{prefix}_health_check"
        s3.put_object(Bucket=bucket_name, Key=test_key, Body=b"test")
        s3.delete_object(Bucket=bucket_name, Key=test_key)
        checks["write_access"] = "OK"
    except ClientError as e:
        checks["write_access"] = f"FAIL: {e.response['Error']['Code']}"

    print("=== S3 Artifact Store Diagnosis ===")
    for check, result in checks.items():
        print(f"  {check}: {result}")

    return checks
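The diagnosis returns raw error codes, which still have to be interpreted. A small lookup table can turn them into next steps. The sketch below is illustrative: the hint texts and the `suggest_fixes` helper are assumptions, and the table covers only a few codes that S3 commonly returns.

```python
# Hypothetical hint table; extend it with the codes you see in practice.
REMEDIATION_HINTS = {
    "403": "Access denied; check credentials, the IAM policy, and the bucket policy.",
    "AccessDenied": "Access denied; check credentials, the IAM policy, and the bucket policy.",
    "404": "Bucket not found; check the bucket name and region.",
    "NoSuchBucket": "Bucket not found; check the bucket name and region.",
    "InvalidAccessKeyId": "Unknown access key; check AWS_ACCESS_KEY_ID.",
    "SignatureDoesNotMatch": "Signature mismatch; check AWS_SECRET_ACCESS_KEY.",
    "ExpiredToken": "Temporary credentials expired; refresh the session token.",
}


def suggest_fixes(checks):
    """Map each failed check ("FAIL: <code>") to a remediation hint."""
    prefix = "FAIL: "
    suggestions = {}
    for check, result in checks.items():
        if not result.startswith(prefix):
            continue  # the check passed
        code = result[len(prefix):]
        suggestions[check] = REMEDIATION_HINTS.get(
            code, f"No hint for error code {code}"
        )
    return suggestions


example = {"bucket_access": "OK", "write_access": "FAIL: AccessDenied"}
for check, hint in suggest_fixes(example).items():
    print(f"{check}: {hint}")
```

The input has the same shape as the dictionary returned by `diagnose_artifact_access`, so the two functions can be chained.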

Operational Notes

Performance Optimization Tips

  1. Use batch logging: Log multiple metrics at once with mlflow.log_metrics() to reduce API calls
  2. Asynchronous logging: Upload large artifacts in a separate process after training completes
  3. Tracking Server caching: Improve read performance with cache settings on an Nginx reverse proxy
  4. PostgreSQL indexes: Add appropriate indexes on the runs table if experiment searches are slow
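The first tip can be sketched as a small buffer that collects metric values during a step and sends them in a single call. `MetricBuffer` is a hypothetical helper, not an MLflow class; the flush function is injected so the example runs without a tracking server.

```python
class MetricBuffer:
    """Collect metrics in memory and flush them in one call per step."""

    def __init__(self, flush_fn):
        # flush_fn must accept (metrics: dict, step: int), for example
        # lambda metrics, step: mlflow.log_metrics(metrics, step=step)
        self._flush_fn = flush_fn
        self._pending = {}

    def add(self, key, value):
        """Store the latest value for a metric until the next flush."""
        self._pending[key] = float(value)

    def flush(self, step):
        """Send all pending metrics at once; do nothing if the buffer is empty."""
        if self._pending:
            self._flush_fn(dict(self._pending), step)
            self._pending.clear()


# Stand-in for mlflow.log_metrics that records each call.
calls = []
buffer = MetricBuffer(lambda metrics, step: calls.append((metrics, step)))
buffer.add("train_loss", 0.5)
buffer.add("val_accuracy", 0.9)
buffer.flush(step=3)
print(len(calls))  # 1: two metrics were sent in a single call
```

In a training loop, `add` would be called wherever a metric is computed and `flush` once at the end of each epoch.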

Security Considerations

  • Place an authentication proxy (OAuth2 Proxy, Nginx Basic Auth) in front of the Tracking Server
  • Apply VPC endpoints to S3 buckets to block external access
  • Enable model artifact encryption (SSE-S3 or SSE-KMS)
  • Use RBAC (Role-Based Access Control) for team-level experiment access control
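When the Tracking Server sits behind a Basic Auth proxy, clients must send credentials with every request. The MLflow client reads them from the `MLFLOW_TRACKING_USERNAME` and `MLFLOW_TRACKING_PASSWORD` environment variables. The sketch below reads the same variables and builds the header by hand, which can help when testing the proxy with plain HTTP tools; the helper names are illustrative.

```python
import base64
import os


def basic_auth_header(username, password):
    """Build the value of an HTTP Basic Authorization header."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8"))
    return f"Basic {token.decode('ascii')}"


def tracking_credentials(env=os.environ):
    """Read credentials from the variables the MLflow client also uses."""
    username = env.get("MLFLOW_TRACKING_USERNAME")
    password = env.get("MLFLOW_TRACKING_PASSWORD")
    if not username or not password:
        raise RuntimeError(
            "Set MLFLOW_TRACKING_USERNAME and MLFLOW_TRACKING_PASSWORD"
        )
    return username, password


# Example with placeholder credentials.
print(basic_auth_header("user", "pass"))
# prints: Basic dXNlcjpwYXNz
```

Failing fast on missing credentials avoids a training job that runs for hours and then cannot log its results.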

Production Checklist

  • [ ] Deploy Tracking Server as a separate server/container
  • [ ] Configure Backend Store with PostgreSQL/MySQL (never use SQLite)
  • [ ] Configure Artifact Store with S3/GCS/Azure Blob
  • [ ] Place authentication proxy in front of Tracking Server
  • [ ] Apply approval workflow to Model Registry
  • [ ] Build automated validation (Quality Gate) pipeline for model deployment
  • [ ] Configure only Rank 0 logging in distributed training environments
  • [ ] Set appropriate retention policies (Lifecycle Policy) on Artifact Store
  • [ ] Monitor Tracking Server health with Grafana dashboards
  • [ ] Perform regular database backups and recovery testing
  • [ ] Integrate model deployment automation in CI/CD pipeline
  • [ ] Configure health checks and autoscaling for model serving endpoints

References