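Complete Guide to MLflow Experiment Management: Experiment Tracking, Model Registry, and Deployment Pipeline
- Introduction
- MLflow Architecture
- Experiment Tracking
- Model Registry
- Deployment Pipeline
- Experiment Tracking Platform Comparison
- Transformers Integration
- Troubleshooting
- Operational Notes
- Production Checklist
- References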
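Introduction
Once a machine learning project reaches a certain scale, experiment management is the first problem it runs into. Spreadsheets and notes cannot keep up with dozens of hyperparameter tuning runs, feature combinations, and algorithm comparisons. Teams frequently end up unable to reproduce a result or to tell which model is currently deployed in production.
MLflow is an open-source MLOps platform, started at Databricks, that addresses these problems. Its core components (Tracking, Model Registry, and Model Serving, plus Projects for packaging) cover the whole ML lifecycle. This guide walks through MLflow from architecture to deployment, focusing on how to operate it effectively in production.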
MLflow Architecture
Core Component Structure
MLflow consists of four main components.
| Component | Role | Storage |
|---|---|---|
| Tracking Server | Records experiment parameters, metrics, and artifacts | Backend Store + Artifact Store |
| Model Registry | Manages model versions and stage transitions | Backend Store |
| Model Serving | Deploys models as REST APIs | Containers/Cloud |
| Projects | Packages reproducible experiments | Git or local |
Tracking Server Deployment Architecture
A production environment needs a remote Tracking Server. A common setup uses PostgreSQL as the Backend Store and S3 as the Artifact Store.
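# tracking_server_config.py
"""
MLflow Tracking Server production configuration
Backend Store: PostgreSQL
Artifact Store: S3
"""
import os
# Read the database password from the environment instead of hardcoding it.
# MLFLOW_DB_PASSWORD is an assumed variable name, not an MLflow setting.
DB_PASSWORD = os.environ.get("MLFLOW_DB_PASSWORD", "password")
TRACKING_CONFIG = {
    "backend_store_uri": f"postgresql://mlflow:{DB_PASSWORD}@db-host:5432/mlflow",
    "default_artifact_root": "s3://mlflow-artifacts/experiments",
    "host": "0.0.0.0",
    "port": 5000,
    "workers": 4,
}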
# Launch the MLflow Tracking Server
mlflow server \
--backend-store-uri postgresql://mlflow:password@db-host:5432/mlflow \
--default-artifact-root s3://mlflow-artifacts/experiments \
--host 0.0.0.0 \
--port 5000 \
--workers 4
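The same flags can also be assembled in Python, which keeps the database password out of shell history. This is a minimal sketch under stated assumptions: `build_server_command` is a hypothetical helper (not part of MLflow), and `MLFLOW_DB_PASSWORD` is an assumed environment variable name. The host, database, and bucket names are the ones used in the command above.

```python
import os
import shlex
from urllib.parse import quote


def build_server_command(db_password: str, workers: int = 4) -> list[str]:
    """Assemble the `mlflow server` argument list used in this guide."""
    # URL-encode the password so characters such as '@' do not break the URI.
    backend_uri = (
        f"postgresql://mlflow:{quote(db_password, safe='')}@db-host:5432/mlflow"
    )
    return [
        "mlflow", "server",
        "--backend-store-uri", backend_uri,
        "--default-artifact-root", "s3://mlflow-artifacts/experiments",
        "--host", "0.0.0.0",
        "--port", "5000",
        "--workers", str(workers),
    ]


if __name__ == "__main__":
    # MLFLOW_DB_PASSWORD is an assumed variable name for illustration only.
    password = os.environ.get("MLFLOW_DB_PASSWORD", "password")
    print(shlex.join(build_server_command(password)))
```

The resulting list can be passed to `subprocess.run` as is, so no shell quoting is involved.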
# Launch with Docker Compose
docker compose up -d mlflow-server
# docker-compose.yaml
version: '3.8'
services:
  mlflow-db:
    image: postgres:15
    environment:
      POSTGRES_DB: mlflow
      POSTGRES_USER: mlflow
      POSTGRES_PASSWORD: mlflow_password
    volumes:
      - pgdata:/var/lib/postgresql/data
    ports:
      - '5432:5432'
  mlflow-server:
    build: ./mlflow
    depends_on:
      - mlflow-db
    environment:
      MLFLOW_BACKEND_STORE_URI: postgresql://mlflow:mlflow_password@mlflow-db:5432/mlflow
      MLFLOW_DEFAULT_ARTIFACT_ROOT: s3://mlflow-artifacts/experiments
      AWS_ACCESS_KEY_ID: your-access-key
      AWS_SECRET_ACCESS_KEY: your-secret-key
    ports:
      - '5000:5000'
    command: >
      mlflow server
      --backend-store-uri postgresql://mlflow:mlflow_password@mlflow-db:5432/mlflow
      --default-artifact-root s3://mlflow-artifacts/experiments
      --host 0.0.0.0
      --port 5000
      --workers 4
volumes:
  pgdata:
Experiment Tracking
Basic Experiment Logging
MLflow tracks experiments in units of Runs. Each Run can record parameters, metrics, and artifacts.
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.datasets import load_iris
# Connect to the Tracking Server
mlflow.set_tracking_uri("http://mlflow-server:5000")
# Create the experiment or select an existing one
mlflow.set_experiment("iris-classification")
# Prepare data
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)
# Run the experiment
with mlflow.start_run(run_name="rf-baseline-v1") as run:
    # Log hyperparameters
    params = {
        "n_estimators": 100,
        "max_depth": 5,
        "min_samples_split": 2,
        "random_state": 42,
    }
    mlflow.log_params(params)
    # Train the model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)
    # Predict and compute metrics
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_macro": f1_score(y_test, y_pred, average="macro"),
        "precision_macro": precision_score(y_test, y_pred, average="macro"),
        "recall_macro": recall_score(y_test, y_pred, average="macro"),
    }
    mlflow.log_metrics(metrics)
    # Log the model artifact and register it in the Model Registry
    mlflow.sklearn.log_model(
        model,
        artifact_path="model",
        registered_model_name="iris-classifier",
    )
    # Log an additional artifact (e.g., a confusion matrix image)
    import matplotlib.pyplot as plt
    from sklearn.metrics import ConfusionMatrixDisplay
    fig, ax = plt.subplots(figsize=(8, 6))
    ConfusionMatrixDisplay.from_predictions(y_test, y_pred, ax=ax)
    fig.savefig("/tmp/confusion_matrix.png")
    mlflow.log_artifact("/tmp/confusion_matrix.png", "plots")
    plt.close(fig)
print(f"Run ID: {run.info.run_id}")
print(f"Metrics: {metrics}")
Autologging
MLflow supports autologging for major frameworks such as scikit-learn, PyTorch, TensorFlow, and XGBoost. A single line of code records parameters, metrics, and models automatically.
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import cross_val_score
# Enable autologging
mlflow.sklearn.autolog(
    log_input_examples=True,    # Save input data examples
    log_model_signatures=True,  # Infer and store the model signature
    log_models=True,            # Save model artifacts
    log_datasets=True,          # Save training dataset information
    silent=False,               # Show logging messages
)
mlflow.set_experiment("iris-autolog-experiment")
with mlflow.start_run(run_name="gbc-autolog"):
    model = GradientBoostingClassifier(
        n_estimators=200,
        max_depth=3,
        learning_rate=0.1,
        random_state=42,
    )
    # autolog records parameters, training metrics, and the model when fit() is called
    model.fit(X_train, y_train)
    # The cross-validation summary statistics are logged explicitly here
    cv_scores = cross_val_score(model, X_train, y_train, cv=5)
    mlflow.log_metric("cv_mean_accuracy", cv_scores.mean())
    mlflow.log_metric("cv_std_accuracy", cv_scores.std())
PyTorch Deep Learning Experiment Tracking
import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
mlflow.set_experiment("pytorch-classification")
class SimpleNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# Training configuration
config = {
    "input_dim": 4,
    "hidden_dim": 64,
    "output_dim": 3,
    "learning_rate": 0.001,
    "epochs": 50,
    "batch_size": 16,
}
with mlflow.start_run(run_name="pytorch-simplenet"):
    mlflow.log_params(config)
    model = SimpleNet(config["input_dim"], config["hidden_dim"], config["output_dim"])
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=config["learning_rate"])
    # X_train, y_train, X_test, y_test come from the iris split in the earlier example
    X_tensor = torch.FloatTensor(X_train)
    y_tensor = torch.LongTensor(y_train)
    dataset = TensorDataset(X_tensor, y_tensor)
    dataloader = DataLoader(dataset, batch_size=config["batch_size"], shuffle=True)
    for epoch in range(config["epochs"]):
        model.train()
        total_loss = 0
        for batch_X, batch_y in dataloader:
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        avg_loss = total_loss / len(dataloader)
        # Log per-epoch metrics
        mlflow.log_metric("train_loss", avg_loss, step=epoch)
        # Validation
        model.eval()
        with torch.no_grad():
            X_test_tensor = torch.FloatTensor(X_test)
            test_outputs = model(X_test_tensor)
            _, predicted = torch.max(test_outputs, 1)
            val_acc = (predicted.numpy() == y_test).mean()
        mlflow.log_metric("val_accuracy", val_acc, step=epoch)
    # Save the model
    mlflow.pytorch.log_model(model, "pytorch-model")
MLflow Search API
Experiment results can be searched and compared programmatically.
import mlflow
from mlflow.tracking import MlflowClient
client = MlflowClient(tracking_uri="http://mlflow-server:5000")
# Query the Runs of one experiment that match the filter
experiment = client.get_experiment_by_name("iris-classification")
runs = client.search_runs(
experiment_ids=[experiment.experiment_id],
filter_string="metrics.accuracy > 0.9 AND params.n_estimators = '100'",
order_by=["metrics.f1_macro DESC"],
max_results=10,
)
# Print results
for run in runs:
    print(f"Run ID: {run.info.run_id}")
    print(f"  Accuracy: {run.data.metrics.get('accuracy', 'N/A')}")
    print(f"  F1 Score: {run.data.metrics.get('f1_macro', 'N/A')}")
    print(f"  Params: {run.data.params}")
    print("---")
# Compare the top two Runs (only when at least two matched the filter)
if len(runs) >= 2:
    run1, run2 = runs[0], runs[1]
    print("=== Run Comparison ===")
    for metric_key in run1.data.metrics:
        v1 = run1.data.metrics[metric_key]
        v2 = run2.data.metrics.get(metric_key, "N/A")
        print(f"  {metric_key}: {v1} vs {v2}")
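Filter strings are easy to get subtly wrong because MLflow stores parameters as strings, so parameter values are compared as quoted strings while metric values are compared as numbers. The sketch below builds the filter used above from two dictionaries. `build_run_filter` is a hypothetical helper for illustration, not an MLflow function.

```python
def build_run_filter(min_metrics: dict[str, float], params: dict[str, object]) -> str:
    """Build a search filter: numeric metric bounds plus quoted param equality."""
    # Metrics are numeric, so the threshold is written without quotes.
    clauses = [f"metrics.{name} > {value}" for name, value in min_metrics.items()]
    # Params are stored as strings, so the value is always quoted.
    clauses += [f"params.{name} = '{value}'" for name, value in params.items()]
    return " AND ".join(clauses)


filter_string = build_run_filter({"accuracy": 0.9}, {"n_estimators": 100})
print(filter_string)
# metrics.accuracy > 0.9 AND params.n_estimators = '100'
```

The returned string can be passed as `filter_string` to `client.search_runs`.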
Model Registry
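Model Registration and Versioning
The Model Registry is a central repository for managing the model lifecycle. Registering a model assigns it a version automatically, and versions can move between the Staging, Production, and Archived stages.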
import mlflow
from mlflow.tracking import MlflowClient
client = MlflowClient()
# Register a model directly from a training Run.
# Note: the earlier log_model call with registered_model_name already created
# a version, so calling register_model again adds another version.
model_name = "iris-classifier"
result = mlflow.register_model(
    model_uri=f"runs:/{run.info.run_id}/model",
    name=model_name,
)
print(f"Model Version: {result.version}")
# Add a description to the model version
client.update_model_version(
    name=model_name,
    version=result.version,
    description="RandomForest baseline model with 100 trees, accuracy 0.95",
)
# Add a tag to the model version
client.set_model_version_tag(
    name=model_name,
    version=result.version,
    key="validation_status",
    value="approved",
)
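Model Aliases and Stage Transitions
Model aliases, introduced in MLflow 2.3, are the recommended way to reference models. The older Stage mechanism (Staging/Production/Archived) still works in MLflow 2.x but has been deprecated since 2.9.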
import mlflow
from mlflow.tracking import MlflowClient
from sklearn.metrics import accuracy_score
client = MlflowClient()
model_name = "iris-classifier"
# Alias approach (recommended in MLflow 2.x)
# Set the champion alias
client.set_registered_model_alias(
    name=model_name,
    alias="champion",
    version=3,
)
# Set the challenger alias
client.set_registered_model_alias(
    name=model_name,
    alias="challenger",
    version=4,
)
# Load models by alias
champion_model = mlflow.pyfunc.load_model(f"models:/{model_name}@champion")
challenger_model = mlflow.pyfunc.load_model(f"models:/{model_name}@challenger")
# Compare predictions
champion_acc = accuracy_score(y_test, champion_model.predict(X_test))
challenger_acc = accuracy_score(y_test, challenger_model.predict(X_test))
print(f"Champion Accuracy: {champion_acc}")
print(f"Challenger Accuracy: {challenger_acc}")
# Promote the challenger to champion if it performs better
if challenger_acc > champion_acc:
    client.set_registered_model_alias(
        name=model_name,
        alias="champion",
        version=4,
    )
    print("Challenger promoted to Champion!")
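Registry URIs take two forms in the examples here: `models:/<name>@<alias>` for an alias and `models:/<name>/<version>` for a fixed version. A small helper keeps the two from being mixed up. This is a sketch only, and `registry_model_uri` is a hypothetical name, not an MLflow API.

```python
from typing import Optional


def registry_model_uri(
    name: str, *, alias: Optional[str] = None, version: Optional[int] = None
) -> str:
    """Return a Model Registry URI for either an alias or a fixed version."""
    # Exactly one of alias/version must be given.
    if (alias is None) == (version is None):
        raise ValueError("pass exactly one of alias or version")
    if alias is not None:
        return f"models:/{name}@{alias}"
    return f"models:/{name}/{version}"


print(registry_model_uri("iris-classifier", alias="champion"))
print(registry_model_uri("iris-classifier", version=4))
```

Serving code would normally use the alias form so that a promotion does not require a redeploy with a new URI, while audit or rollback scripts pin the version form.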
Model Approval Workflow
A production environment needs an approval process before a model is deployed.
from mlflow.tracking import MlflowClient

def model_approval_workflow(model_name, version):
    """Model approval workflow."""
    client = MlflowClient()
    # Step 1: read the validation metrics of the Run that produced this version
    model_version = client.get_model_version(model_name, version)
    run = client.get_run(model_version.run_id)
    accuracy = run.data.metrics.get("accuracy", 0)
    f1 = run.data.metrics.get("f1_macro", 0)
    # Step 2: check the quality criteria
    quality_gates = {
        "accuracy >= 0.90": accuracy >= 0.90,
        "f1_macro >= 0.85": f1 >= 0.85,
    }
    all_passed = all(quality_gates.values())
    print("=== Quality Gate Results ===")
    for gate, passed in quality_gates.items():
        status = "PASS" if passed else "FAIL"
        print(f"  {gate}: {status}")
    # Step 3: tag the version and set the alias according to the result
    if all_passed:
        client.set_model_version_tag(
            name=model_name, version=version,
            key="approval_status", value="approved"
        )
        # Assign the staging alias
        client.set_registered_model_alias(
            name=model_name, alias="staging", version=version
        )
        print(f"Model v{version} approved and moved to staging")
        return True
    else:
        client.set_model_version_tag(
            name=model_name, version=version,
            key="approval_status", value="rejected"
        )
        print(f"Model v{version} rejected - quality gates not met")
        return False

# Run the workflow
model_approval_workflow("iris-classifier", 5)
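The gate check itself does not need a Tracking Server, so it can be pulled out into a pure function and unit-tested. The sketch below assumes the same two thresholds as the workflow above. `evaluate_quality_gates` is a hypothetical helper name.

```python
def evaluate_quality_gates(
    metrics: dict[str, float], thresholds: dict[str, float]
) -> tuple[bool, dict[str, bool]]:
    """Check each metric against its minimum; a missing metric counts as 0."""
    results = {
        f"{name} >= {minimum:.2f}": metrics.get(name, 0.0) >= minimum
        for name, minimum in thresholds.items()
    }
    return all(results.values()), results


THRESHOLDS = {"accuracy": 0.90, "f1_macro": 0.85}
passed, details = evaluate_quality_gates(
    {"accuracy": 0.95, "f1_macro": 0.80}, THRESHOLDS
)
print(passed, details)
```

In the workflow, `run.data.metrics` would be passed as `metrics`, and the returned boolean decides between the approved and rejected branches.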
Deployment Pipeline
Docker-Based Deployment
# Dockerfile.mlflow-serve
FROM python:3.11-slim
# scikit-learn is installed because the served iris model depends on it
RUN pip install "mlflow[extras]" boto3 psycopg2-binary scikit-learn
ENV MLFLOW_TRACKING_URI=http://mlflow-server:5000
ENV MODEL_NAME=iris-classifier
ENV MODEL_ALIAS=champion
EXPOSE 8080
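# --no-conda was removed in MLflow 2.x; --env-manager local serves from the
# image's own Python environment, so model dependencies must be installed in it.
CMD mlflow models serve \
    --model-uri "models:/${MODEL_NAME}@${MODEL_ALIAS}" \
    --host 0.0.0.0 \
    --port 8080 \
    --workers 2 \
    --env-manager local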
# Build and run the Docker image
docker build -t mlflow-model-serve -f Dockerfile.mlflow-serve .
docker run -p 8080:8080 \
-e AWS_ACCESS_KEY_ID=your-key \
-e AWS_SECRET_ACCESS_KEY=your-secret \
mlflow-model-serve
# Test a prediction request
curl -X POST http://localhost:8080/invocations \
-H "Content-Type: application/json" \
-d '{"inputs": [[5.1, 3.5, 1.4, 0.2]]}'
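The same request can be sent from Python using only the standard library. This is a sketch that assumes the container from the previous step is listening on `localhost:8080`. `build_invocation_request` and `predict` are hypothetical helper names.

```python
import json
import urllib.request


def build_invocation_request(url: str, rows: list[list[float]]) -> urllib.request.Request:
    """Build a POST request carrying the rows under the "inputs" key."""
    body = json.dumps({"inputs": rows}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def predict(url: str, rows: list[list[float]], timeout: float = 10.0):
    """Send the request and decode the JSON response (needs a running server)."""
    with urllib.request.urlopen(build_invocation_request(url, rows), timeout=timeout) as resp:
        return json.loads(resp.read())


request = build_invocation_request(
    "http://localhost:8080/invocations", [[5.1, 3.5, 1.4, 0.2]]
)
print(request.get_method(), request.full_url, request.data.decode("utf-8"))
```

Calling `predict("http://localhost:8080/invocations", [[5.1, 3.5, 1.4, 0.2]])` performs the actual call once the model server is up.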
Kubernetes Deployment
# k8s/mlflow-model-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: iris-classifier-serving
labels:
app: iris-classifier
spec:
replicas: 3
selector:
matchLabels:
app: iris-classifier
template:
metadata:
labels:
app: iris-classifier
spec:
containers:
- name: model-server
image: mlflow-model-serve:latest
ports:
- containerPort: 8080
env:
- name: MLFLOW_TRACKING_URI
value: 'http://mlflow-server.mlflow.svc.cluster.local:5000'
- name: MODEL_NAME
value: 'iris-classifier'
- name: MODEL_ALIAS
value: 'champion'
resources:
requests:
cpu: '500m'
memory: '512Mi'
limits:
cpu: '1000m'
memory: '1Gi'
readinessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 60
periodSeconds: 30
---
apiVersion: v1
kind: Service
metadata:
name: iris-classifier-service
spec:
selector:
app: iris-classifier
ports:
- protocol: TCP
port: 80
targetPort: 8080
type: ClusterIP
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: iris-classifier-ingress
annotations:
nginx.ingress.kubernetes.io/rewrite-target: /
spec:
rules:
- host: model.example.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: iris-classifier-service
port:
number: 80
CI/CD with GitHub Actions
# .github/workflows/model-deploy.yaml
name: Model Deployment Pipeline
on:
workflow_dispatch:
inputs:
model_name:
description: 'Model name in registry'
required: true
default: 'iris-classifier'
model_version:
description: 'Model version to deploy'
required: true
jobs:
validate:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: pip install mlflow boto3 scikit-learn
- name: Validate model
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
run: |
python scripts/validate_model.py \
--model-name ${{ github.event.inputs.model_name }} \
--model-version ${{ github.event.inputs.model_version }}
deploy-staging:
needs: validate
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Deploy to staging
run: |
kubectl apply -f k8s/staging/
kubectl set image deployment/model-serving \
model-server=registry.example.com/model:v${{ github.event.inputs.model_version }}
deploy-production:
needs: deploy-staging
runs-on: ubuntu-latest
environment: production
steps:
- uses: actions/checkout@v4
- name: Deploy to production
run: |
kubectl apply -f k8s/production/
kubectl set image deployment/model-serving \
model-server=registry.example.com/model:v${{ github.event.inputs.model_version }}
- name: Update MLflow alias
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
run: |
pip install mlflow
python -c "
from mlflow.tracking import MlflowClient
client = MlflowClient()
client.set_registered_model_alias(
name='${{ github.event.inputs.model_name }}',
alias='champion',
version=${{ github.event.inputs.model_version }}
)
"
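Experiment Tracking Platform Comparison
| Feature | MLflow | Weights and Biases | Neptune | CometML |
|---|---|---|---|---|
| License | Open source (Apache 2.0) | Commercial (free tier available) | Commercial (free tier available) | Commercial (free tier available) |
| Self-hosting | Fully supported | Limited | Supported | Supported |
| Experiment tracking | Good | Excellent | Good | Good |
| Model registry | Built in | Requires external integration | Limited | Limited |
| Collaboration | Basic | Excellent (reports) | Good | Good |
| Visualization | Basic | Excellent | Good | Good |
| Autologging | Major frameworks | Broad support | Broad support | Broad support |
| Kubernetes integration | Native support | Limited | Limited | Limited |
| Hyperparameter tuning | Optuna integration | Built-in Sweeps | Optuna integration | Built-in Optimizer |
| Data versioning | Basic | Artifacts | Basic | Basic |
| Learning curve | Moderate | Low | Moderate | Low |
| Community | Very active | Active | Moderate | Moderate |
Platform Selection Guide
- Self-hosting required, open source preferred: MLflow
- Team collaboration and experiment visualization first: Weights and Biases
- Fine-grained metric management: Neptune
- Fast adoption, simple setup: CometML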
Transformers Integration
Integrating HuggingFace Transformers with MLflow
import mlflow
from transformers import (
AutoModelForSequenceClassification,
AutoTokenizer,
TrainingArguments,
Trainer,
)
from datasets import load_dataset
mlflow.set_experiment("sentiment-analysis")
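# Prepare the dataset
# The IMDB train split is ordered by label, so shuffle before taking a subset;
# otherwise the first 1,000 rows would all belong to one class.
dataset = load_dataset("imdb", split="train").shuffle(seed=42).select(range(1000))
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
def tokenize_function(examples):
    return tokenizer(examples["text"], padding="max_length", truncation=True, max_length=256)
tokenized_dataset = dataset.map(tokenize_function, batched=True)
tokenized_dataset = tokenized_dataset.train_test_split(test_size=0.2)
# Training metrics reach MLflow through the Trainer's built-in MLflow callback.
# mlflow.transformers.autolog mainly suppresses spurious autologging from other flavors.
mlflow.transformers.autolog(log_models=True)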
model = AutoModelForSequenceClassification.from_pretrained(
"distilbert-base-uncased", num_labels=2
)
training_args = TrainingArguments(
output_dir="./results",
num_train_epochs=3,
per_device_train_batch_size=16,
per_device_eval_batch_size=16,
warmup_steps=100,
weight_decay=0.01,
logging_dir="./logs",
logging_steps=10,
eval_strategy="epoch",
save_strategy="epoch",
load_best_model_at_end=True,
report_to="mlflow",  # send Trainer logs to MLflow explicitly
)
trainer = Trainer(
model=model,
args=training_args,
train_dataset=tokenized_dataset["train"],
eval_dataset=tokenized_dataset["test"],
)
# Start training (the Trainer logs to the active MLflow run)
with mlflow.start_run(run_name="distilbert-sentiment"):
    trainer.train()
    # Record the final evaluation metrics
    eval_results = trainer.evaluate()
    mlflow.log_metrics(eval_results)
Troubleshooting
Experiment Tracking in Distributed Training
In distributed training, conflicts can occur when several workers log to MLflow at the same time.
import mlflow
import os
def setup_mlflow_distributed():
    """Configure MLflow for a distributed training job."""
    rank = int(os.environ.get("RANK", 0))
    world_size = int(os.environ.get("WORLD_SIZE", 1))
    # Only the rank 0 process logs to MLflow
    if rank != 0:
        # Other ranks simply never call MLflow. Clearing MLFLOW_TRACKING_URI
        # would not disable logging; MLflow would fall back to its default local store.
        return None
    mlflow.set_tracking_uri("http://mlflow-server:5000")
    mlflow.set_experiment("distributed-training")
    run = mlflow.start_run(run_name=f"dist-train-{world_size}gpu")
    mlflow.log_param("world_size", world_size)
    return run

def log_distributed_metrics(metrics, step, rank=None):
    """Record metrics from rank 0 only."""
    if rank is None:
        rank = int(os.environ.get("RANK", 0))
    if rank == 0:
        mlflow.log_metrics(metrics, step=step)
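When many logging helpers need the same guard, a decorator avoids repeating the rank check. The sketch below assumes the launcher sets the `RANK` environment variable, as `torchrun` does. `rank_zero_only` is a hypothetical helper defined here, and the decorated function only records calls in a list so the example runs without MLflow.

```python
import functools
import os


def is_rank_zero() -> bool:
    """True when RANK is unset or 0 (single-process runs count as rank 0)."""
    return int(os.environ.get("RANK", "0")) == 0


def rank_zero_only(fn):
    """Run the wrapped function on rank 0 and do nothing on other ranks."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        if is_rank_zero():
            return fn(*args, **kwargs)
        return None
    return wrapper


logged = []


@rank_zero_only
def log_metrics(metrics, step):
    # In real training code this body would call mlflow.log_metrics(metrics, step=step).
    logged.append((step, metrics))
```

The rank is read at call time, so the same decorated function behaves correctly in every worker process.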
Resolving Registry Conflicts
Conflicts can occur when several teams register models or change aliases and stages at the same time.
from mlflow.tracking import MlflowClient
from mlflow.exceptions import MlflowException
import time
def safe_transition_model(model_name, version, target_alias, max_retries=3):
    """Move an alias to a model version safely, with retry logic."""
    client = MlflowClient()
    for attempt in range(max_retries):
        try:
            # Look up the version that currently holds the alias
            try:
                current = client.get_model_version_by_alias(
                    model_name, target_alias
                )
                print(f"Current {target_alias}: v{current.version}")
            except MlflowException:
                print(f"No current {target_alias} found")
            # Move the alias
            client.set_registered_model_alias(
                name=model_name,
                alias=target_alias,
                version=version,
            )
            print(f"Successfully set v{version} as {target_alias}")
            return True
        except MlflowException as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff
    print(f"Failed to transition model after {max_retries} attempts")
    return False
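The retry-with-backoff pattern used above can be factored into a reusable helper. This is a generic sketch, not an MLflow API: `retry_with_backoff` is a hypothetical name, and the `sleep` parameter is injectable so the delays can be checked without actually waiting.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    operation: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation(); on failure wait base_delay * 2**attempt and try again."""
    for attempt in range(max_retries):
        try:
            return operation()
        except retry_on:
            # Re-raise after the final attempt instead of hiding the error.
            if attempt == max_retries - 1:
                raise
            sleep(base_delay * 2 ** attempt)
    raise ValueError("max_retries must be at least 1")
```

With MLflow, `operation` could be a lambda wrapping `client.set_registered_model_alias(...)` and `retry_on` could be `(MlflowException,)`.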
Artifact Store Access Errors
These are the authentication problems that come up most often with an S3 Artifact Store, along with a way to diagnose them.
import boto3
from botocore.exceptions import ClientError
def diagnose_artifact_access(bucket_name, prefix="experiments/"):
    """Diagnose access to the S3 Artifact Store."""
    s3 = boto3.client("s3")
    checks = {}
    # 1. Check bucket access
    try:
        s3.head_bucket(Bucket=bucket_name)
        checks["bucket_access"] = "OK"
    except ClientError as e:
        checks["bucket_access"] = f"FAIL: {e.response['Error']['Code']}"
    # 2. Check object listing
    try:
        response = s3.list_objects_v2(
            Bucket=bucket_name, Prefix=prefix, MaxKeys=5
        )
        count = response.get("KeyCount", 0)
        checks["list_objects"] = f"OK ({count} objects found)"
    except ClientError as e:
        checks["list_objects"] = f"FAIL: {e.response['Error']['Code']}"
    # 3. Check write permission (writes and then deletes a small test object)
    try:
        test_key = f"{prefix}_health_check"
        s3.put_object(Bucket=bucket_name, Key=test_key, Body=b"test")
        s3.delete_object(Bucket=bucket_name, Key=test_key)
        checks["write_access"] = "OK"
    except ClientError as e:
        checks["write_access"] = f"FAIL: {e.response['Error']['Code']}"
    print("=== S3 Artifact Store Diagnosis ===")
    for check, result in checks.items():
        print(f"  {check}: {result}")
    return checks
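Operational Notes
Performance Optimization Tips
- Use batch logging: recording several metrics in one mlflow.log_metrics() call reduces the number of API calls
- Asynchronous logging: upload large artifacts in a separate process after training completes
- Tracking Server caching: configure a cache in the Nginx reverse proxy in front of the server to improve read performance
- PostgreSQL indexes: if experiment search is slow, add appropriate indexes to the runs table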
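The batch-logging tip can be sketched as a small buffer that collects the metrics of one step and emits them in a single call. `StepMetricBuffer` is a hypothetical class for illustration. The logging function is injected so the example runs without a Tracking Server; in training code it would typically be `mlflow.log_metrics`.

```python
from typing import Callable, Dict


class StepMetricBuffer:
    """Collect metrics for one step and emit them with a single logging call."""

    def __init__(self, log_fn: Callable[..., None]):
        self._log_fn = log_fn
        self._pending: Dict[str, float] = {}

    def add(self, name: str, value: float) -> None:
        """Stage one metric; nothing is sent yet."""
        self._pending[name] = float(value)

    def flush(self, step: int) -> None:
        """Send all staged metrics in one call, then clear the buffer."""
        if self._pending:
            self._log_fn(dict(self._pending), step=step)
            self._pending.clear()


calls = []
buffer = StepMetricBuffer(lambda metrics, step: calls.append((metrics, step)))
buffer.add("train_loss", 0.5)
buffer.add("val_accuracy", 0.9)
buffer.flush(step=3)
print(calls)
```

Two `add` calls result in one logging call here, compared with two separate `mlflow.log_metric` requests per epoch in the PyTorch example.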
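Security Considerations
- Place an authentication proxy (OAuth2 Proxy, Nginx Basic Auth) in front of the Tracking Server
- Apply a VPC endpoint to the S3 bucket to block external access
- Enable encryption for model artifacts (SSE-S3 or SSE-KMS)
- Control per-team access to experiments with RBAC (Role-Based Access Control)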
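Production Checklist
- [ ] Run the Tracking Server on a separate server or container
- [ ] Use PostgreSQL or MySQL as the Backend Store (do not use SQLite)
- [ ] Use S3, GCS, or Azure Blob as the Artifact Store
- [ ] Place an authentication proxy in front of the Tracking Server
- [ ] Apply an approval workflow to the Model Registry
- [ ] Build an automated validation (Quality Gate) pipeline for model deployment
- [ ] Configure distributed training so that only rank 0 logs
- [ ] Set an appropriate retention policy (Lifecycle Policy) on the Artifact Store
- [ ] Monitor Tracking Server health with a dashboard (Grafana)
- [ ] Back up the database regularly and test recovery
- [ ] Automate model deployment through the CI/CD pipeline
- [ ] Configure health checks and autoscaling for model serving endpoints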
References
Complete Guide to MLflow Experiment Management: Experiment Tracking, Model Registry, and Deployment Pipeline
- Introduction
- MLflow Architecture
- Experiment Tracking
- Model Registry
- Deployment Pipeline
- Experiment Tracking Platform Comparison
- Transformers Integration
- Troubleshooting
- Operational Notes
- Production Checklist
- References
Introduction
As machine learning projects scale, the first challenge teams face is experiment management. Managing dozens of hyperparameter tuning runs, various feature combinations, and algorithm comparisons via spreadsheets or notebooks quickly hits a wall. Being unable to reproduce experiment results or track which model is currently in production becomes a recurring issue.
MLflow is an open-source MLOps platform that originated at Databricks to solve these problems. Its core components (Tracking, Model Registry, and Model Serving, plus Projects for packaging) cover the entire ML lifecycle. This guide goes from MLflow architecture to production deployment, with practical strategies for running MLflow effectively in production.
MLflow Architecture
Core Component Structure
MLflow consists of four main components:
| Component | Role | Storage |
|---|---|---|
| Tracking Server | Records experiment parameters, metrics, and artifacts | Backend Store + Artifact Store |
| Model Registry | Manages model versions and stage transitions | Backend Store |
| Model Serving | Deploys models as REST APIs | Containers/Cloud |
| Projects | Packages reproducible experiments | Git or Local |
Tracking Server Deployment Architecture
In production, you need a remote Tracking Server. The standard setup uses PostgreSQL as the Backend Store and S3 as the Artifact Store.
# tracking_server_config.py
"""
MLflow Tracking Server production configuration
Backend Store: PostgreSQL
Artifact Store: S3
"""
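import os
# Read the database password from the environment instead of hardcoding it.
# MLFLOW_DB_PASSWORD is an assumed variable name, not an MLflow setting.
DB_PASSWORD = os.environ.get("MLFLOW_DB_PASSWORD", "password")
TRACKING_CONFIG = {
    "backend_store_uri": f"postgresql://mlflow:{DB_PASSWORD}@db-host:5432/mlflow",
    "default_artifact_root": "s3://mlflow-artifacts/experiments",
    "host": "0.0.0.0",
    "port": 5000,
    "workers": 4,
}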
# Launch MLflow Tracking Server
mlflow server \
--backend-store-uri postgresql://mlflow:password@db-host:5432/mlflow \
--default-artifact-root s3://mlflow-artifacts/experiments \
--host 0.0.0.0 \
--port 5000 \
--workers 4
# Launch with Docker Compose
docker compose up -d mlflow-server
# docker-compose.yaml
version: '3.8'
services:
  mlflow-db:
    image: postgres:15
    environment:
      POSTGRES_DB: mlflow
      POSTGRES_USER: mlflow
      POSTGRES_PASSWORD: mlflow_password
    volumes:
      - pgdata:/var/lib/postgresql/data
    ports:
      - '5432:5432'
  mlflow-server:
    build: ./mlflow
    depends_on:
      - mlflow-db
    environment:
      MLFLOW_BACKEND_STORE_URI: postgresql://mlflow:mlflow_password@mlflow-db:5432/mlflow
      MLFLOW_DEFAULT_ARTIFACT_ROOT: s3://mlflow-artifacts/experiments
      AWS_ACCESS_KEY_ID: your-access-key
      AWS_SECRET_ACCESS_KEY: your-secret-key
    ports:
      - '5000:5000'
    command: >
      mlflow server
      --backend-store-uri postgresql://mlflow:mlflow_password@mlflow-db:5432/mlflow
      --default-artifact-root s3://mlflow-artifacts/experiments
      --host 0.0.0.0
      --port 5000
      --workers 4
volumes:
  pgdata:
Experiment Tracking
Basic Experiment Logging
MLflow experiment tracking operates on a Run-by-Run basis. Each Run can record parameters, metrics, and artifacts.
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.datasets import load_iris
# Connect to Tracking Server
mlflow.set_tracking_uri("http://mlflow-server:5000")
# Create or select an experiment
mlflow.set_experiment("iris-classification")
# Prepare data
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
iris.data, iris.target, test_size=0.2, random_state=42
)
# Run the experiment
with mlflow.start_run(run_name="rf-baseline-v1") as run:
# Log hyperparameters
params = {
"n_estimators": 100,
"max_depth": 5,
"min_samples_split": 2,
"random_state": 42,
}
mlflow.log_params(params)
# Train model
model = RandomForestClassifier(**params)
model.fit(X_train, y_train)
# Predict and compute metrics
y_pred = model.predict(X_test)
metrics = {
"accuracy": accuracy_score(y_test, y_pred),
"f1_macro": f1_score(y_test, y_pred, average="macro"),
"precision_macro": precision_score(y_test, y_pred, average="macro"),
"recall_macro": recall_score(y_test, y_pred, average="macro"),
}
mlflow.log_metrics(metrics)
# Log model artifact
mlflow.sklearn.log_model(
model,
artifact_path="model",
registered_model_name="iris-classifier",
)
# Log additional artifacts (e.g., confusion matrix image)
import matplotlib.pyplot as plt
from sklearn.metrics import ConfusionMatrixDisplay
fig, ax = plt.subplots(figsize=(8, 6))
ConfusionMatrixDisplay.from_predictions(y_test, y_pred, ax=ax)
fig.savefig("/tmp/confusion_matrix.png")
mlflow.log_artifact("/tmp/confusion_matrix.png", "plots")
print(f"Run ID: {run.info.run_id}")
print(f"Metrics: {metrics}")
Autologging
MLflow supports autologging for major frameworks including scikit-learn, PyTorch, TensorFlow, and XGBoost. A single line of code automatically records parameters, metrics, and models.
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import cross_val_score
# Enable autologging
mlflow.sklearn.autolog(
log_input_examples=True, # Save input data examples
log_model_signatures=True, # Auto-detect model signatures
log_models=True, # Auto-save model artifacts
log_datasets=True, # Save training dataset info
silent=False, # Show logging messages
)
mlflow.set_experiment("iris-autolog-experiment")
with mlflow.start_run(run_name="gbc-autolog"):
model = GradientBoostingClassifier(
n_estimators=200,
max_depth=3,
learning_rate=0.1,
random_state=42,
)
# autolog automatically records params/metrics/model on fit()
model.fit(X_train, y_train)
# The cross-validation summary statistics are logged explicitly here
cv_scores = cross_val_score(model, X_train, y_train, cv=5)
mlflow.log_metric("cv_mean_accuracy", cv_scores.mean())
mlflow.log_metric("cv_std_accuracy", cv_scores.std())
PyTorch Deep Learning Experiment Tracking
import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
mlflow.set_experiment("pytorch-classification")
class SimpleNet(nn.Module):
def __init__(self, input_dim, hidden_dim, output_dim):
super().__init__()
self.fc1 = nn.Linear(input_dim, hidden_dim)
self.relu = nn.ReLU()
self.dropout = nn.Dropout(0.3)
self.fc2 = nn.Linear(hidden_dim, output_dim)
def forward(self, x):
x = self.fc1(x)
x = self.relu(x)
x = self.dropout(x)
x = self.fc2(x)
return x
# Training configuration
config = {
"input_dim": 4,
"hidden_dim": 64,
"output_dim": 3,
"learning_rate": 0.001,
"epochs": 50,
"batch_size": 16,
}
with mlflow.start_run(run_name="pytorch-simplenet"):
mlflow.log_params(config)
model = SimpleNet(config["input_dim"], config["hidden_dim"], config["output_dim"])
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=config["learning_rate"])
X_tensor = torch.FloatTensor(X_train)
y_tensor = torch.LongTensor(y_train)
dataset = TensorDataset(X_tensor, y_tensor)
dataloader = DataLoader(dataset, batch_size=config["batch_size"], shuffle=True)
for epoch in range(config["epochs"]):
model.train()
total_loss = 0
for batch_X, batch_y in dataloader:
optimizer.zero_grad()
outputs = model(batch_X)
loss = criterion(outputs, batch_y)
loss.backward()
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(dataloader)
# Log per-epoch metrics
mlflow.log_metric("train_loss", avg_loss, step=epoch)
# Validation
model.eval()
with torch.no_grad():
X_test_tensor = torch.FloatTensor(X_test)
test_outputs = model(X_test_tensor)
_, predicted = torch.max(test_outputs, 1)
val_acc = (predicted.numpy() == y_test).mean()
mlflow.log_metric("val_accuracy", val_acc, step=epoch)
# Save model
mlflow.pytorch.log_model(model, "pytorch-model")
MLflow Search API
You can programmatically search and compare experiment results.
import mlflow
from mlflow.tracking import MlflowClient
client = MlflowClient(tracking_uri="http://mlflow-server:5000")
# Query all Runs for a specific experiment
experiment = client.get_experiment_by_name("iris-classification")
runs = client.search_runs(
experiment_ids=[experiment.experiment_id],
filter_string="metrics.accuracy > 0.9 AND params.n_estimators = '100'",
order_by=["metrics.f1_macro DESC"],
max_results=10,
)
# Display results
for run in runs:
print(f"Run ID: {run.info.run_id}")
print(f" Accuracy: {run.data.metrics.get('accuracy', 'N/A')}")
print(f" F1 Score: {run.data.metrics.get('f1_macro', 'N/A')}")
print(f" Params: {run.data.params}")
print("---")
# Compare the top two Runs (only when at least two matched the filter)
if len(runs) >= 2:
    run1, run2 = runs[0], runs[1]
    print("=== Run Comparison ===")
    for metric_key in run1.data.metrics:
        v1 = run1.data.metrics[metric_key]
        v2 = run2.data.metrics.get(metric_key, "N/A")
        print(f"  {metric_key}: {v1} vs {v2}")
Model Registry
Model Registration and Versioning
The Model Registry is a centralized repository for managing the model lifecycle. When a model is registered, it is automatically versioned, and transitions between Staging, Production, and Archived stages are supported.
import mlflow
from mlflow.tracking import MlflowClient
client = MlflowClient()
# Register model (directly from a training Run)
model_name = "iris-classifier"
result = mlflow.register_model(
model_uri=f"runs:/{run.info.run_id}/model",
name=model_name,
)
print(f"Model Version: {result.version}")
# Add description to model version
client.update_model_version(
name=model_name,
version=result.version,
description="RandomForest baseline model with 100 trees, accuracy 0.95",
)
# Add tags to model version
client.set_model_version_tag(
name=model_name,
version=result.version,
key="validation_status",
value="approved",
)
Model Aliases and Stage Transitions
Model aliases, introduced in MLflow 2.3, are the recommended way to reference models. The legacy Stage-based approach (Staging/Production/Archived) still works in MLflow 2.x but has been deprecated since 2.9.
import mlflow
from mlflow.tracking import MlflowClient
from sklearn.metrics import accuracy_score
client = MlflowClient()
model_name = "iris-classifier"
# Alias approach (recommended in MLflow 2.x)
# Set champion alias
client.set_registered_model_alias(
name=model_name,
alias="champion",
version=3,
)
# Set challenger alias
client.set_registered_model_alias(
name=model_name,
alias="challenger",
version=4,
)
# Load models by alias
champion_model = mlflow.pyfunc.load_model(f"models:/{model_name}@champion")
challenger_model = mlflow.pyfunc.load_model(f"models:/{model_name}@challenger")
# Compare predictions
champion_pred = champion_model.predict(X_test)
challenger_pred = challenger_model.predict(X_test)
print(f"Champion Accuracy: {accuracy_score(y_test, champion_pred)}")
print(f"Challenger Accuracy: {accuracy_score(y_test, challenger_pred)}")
# Promote challenger to champion if it performs better
if accuracy_score(y_test, challenger_pred) > accuracy_score(y_test, champion_pred):
client.set_registered_model_alias(
name=model_name,
alias="champion",
version=4,
)
print("Challenger promoted to Champion!")
Model Approval Workflow
In production environments, an approval process is required before model deployment.
from mlflow.tracking import MlflowClient

def model_approval_workflow(model_name, version):
    """Model approval workflow"""
    client = MlflowClient()
    # Step 1: Check model validation metrics
    model_version = client.get_model_version(model_name, version)
    run = client.get_run(model_version.run_id)
    accuracy = run.data.metrics.get("accuracy", 0)
    f1 = run.data.metrics.get("f1_macro", 0)
    # Step 2: Verify quality criteria
    quality_gates = {
        "accuracy >= 0.90": accuracy >= 0.90,
        "f1_macro >= 0.85": f1 >= 0.85,
    }
    all_passed = all(quality_gates.values())
    print("=== Quality Gate Results ===")
    for gate, passed in quality_gates.items():
        status = "PASS" if passed else "FAIL"
        print(f"  {gate}: {status}")
    # Step 3: Set alias based on approval
    if all_passed:
        client.set_model_version_tag(
            name=model_name, version=version,
            key="approval_status", value="approved"
        )
        # Assign staging alias
        client.set_registered_model_alias(
            name=model_name, alias="staging", version=version
        )
        print(f"Model v{version} approved and moved to staging")
        return True
    else:
        client.set_model_version_tag(
            name=model_name, version=version,
            key="approval_status", value="rejected"
        )
        print(f"Model v{version} rejected - quality gates not met")
        return False

# Execute workflow
model_approval_workflow("iris-classifier", 5)
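The quality gates above are written inline, so changing a threshold means editing the function. One possible refactoring, sketched here under the assumption that every gate is a simple comparison against a threshold, keeps the gates in a table. `QUALITY_GATES` and `evaluate_gates` are hypothetical names introduced for this example.

```python
import operator

# Hypothetical gate table: metric name -> (comparison, threshold)
QUALITY_GATES = {
    "accuracy": (operator.ge, 0.90),
    "f1_macro": (operator.ge, 0.85),
}


def evaluate_gates(metrics, gates=QUALITY_GATES):
    """Return {metric_name: passed}; a metric missing from `metrics` fails its gate."""
    results = {}
    for name, (compare, threshold) in gates.items():
        value = metrics.get(name)
        results[name] = value is not None and compare(value, threshold)
    return results


if __name__ == "__main__":
    results = evaluate_gates({"accuracy": 0.95, "f1_macro": 0.80})
    print(results)
    print("approved" if all(results.values()) else "rejected")
```

In the workflow, `run.data.metrics` would be passed as `metrics`, and `all(results.values())` would replace `all_passed`.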
Deployment Pipeline
Docker-Based Deployment
# Dockerfile.mlflow-serve
FROM python:3.11-slim
RUN pip install mlflow[extras] boto3 psycopg2-binary
ENV MLFLOW_TRACKING_URI=http://mlflow-server:5000
ENV MODEL_NAME=iris-classifier
ENV MODEL_ALIAS=champion
EXPOSE 8080
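# --env-manager local serves the model from the image's own Python environment
# (the current form of the older --no-conda flag)
CMD mlflow models serve \
    --model-uri "models:/${MODEL_NAME}@${MODEL_ALIAS}" \
    --host 0.0.0.0 \
    --port 8080 \
    --workers 2 \
    --env-manager local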
# Build and run Docker image
docker build -t mlflow-model-serve -f Dockerfile.mlflow-serve .
docker run -p 8080:8080 \
-e AWS_ACCESS_KEY_ID=your-key \
-e AWS_SECRET_ACCESS_KEY=your-secret \
mlflow-model-serve
# Test prediction request
curl -X POST http://localhost:8080/invocations \
-H "Content-Type: application/json" \
-d '{"inputs": [[5.1, 3.5, 1.4, 0.2]]}'
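The same request can be sent from Python. The sketch below uses only the standard library and mirrors the curl call; the helper names `build_invocation_request` and `predict` are made up for this example, and `predict` only works while the container from the previous step is running.

```python
import json
import urllib.request


def build_invocation_request(rows, url="http://localhost:8080/invocations"):
    """Build a POST request carrying `rows` in the same {"inputs": ...} shape as the curl call."""
    body = json.dumps({"inputs": rows}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def predict(rows, url="http://localhost:8080/invocations", timeout=10):
    """Send the request and return the decoded JSON response (needs a running server)."""
    request = build_invocation_request(rows, url)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


if __name__ == "__main__":
    # Only builds the request; call predict(...) once the server is up
    req = build_invocation_request([[5.1, 3.5, 1.4, 0.2]])
    print(req.get_method(), req.full_url)
    print(req.data.decode("utf-8"))
```

Separating request construction from sending keeps the payload shape easy to check without a live endpoint.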
Kubernetes Deployment
# k8s/mlflow-model-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: iris-classifier-serving
  labels:
    app: iris-classifier
spec:
  replicas: 3
  selector:
    matchLabels:
      app: iris-classifier
  template:
    metadata:
      labels:
        app: iris-classifier
    spec:
      containers:
        - name: model-server
          image: mlflow-model-serve:latest
          ports:
            - containerPort: 8080
          env:
            - name: MLFLOW_TRACKING_URI
              value: 'http://mlflow-server.mlflow.svc.cluster.local:5000'
            - name: MODEL_NAME
              value: 'iris-classifier'
            - name: MODEL_ALIAS
              value: 'champion'
          resources:
            requests:
              cpu: '500m'
              memory: '512Mi'
            limits:
              cpu: '1000m'
              memory: '1Gi'
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 60
            periodSeconds: 30
---
apiVersion: v1
kind: Service
metadata:
  name: iris-classifier-service
spec:
  selector:
    app: iris-classifier
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8080
  type: ClusterIP
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: iris-classifier-ingress
  # No rewrite-target annotation: combined with path "/", it would rewrite
  # every request path (including /invocations) to "/"
spec:
  rules:
    - host: model.example.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: iris-classifier-service
                port:
                  number: 80
CI/CD with GitHub Actions
# .github/workflows/model-deploy.yaml
name: Model Deployment Pipeline
on:
  workflow_dispatch:
    inputs:
      model_name:
        description: 'Model name in registry'
        required: true
        default: 'iris-classifier'
      model_version:
        description: 'Model version to deploy'
        required: true
# Inputs reach the scripts through environment variables instead of being
# interpolated into the shell, which avoids script injection
env:
  MODEL_NAME: ${{ github.event.inputs.model_name }}
  MODEL_VERSION: ${{ github.event.inputs.model_version }}
jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install mlflow boto3 scikit-learn
      - name: Validate model
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        run: |
          python scripts/validate_model.py \
            --model-name "$MODEL_NAME" \
            --model-version "$MODEL_VERSION"
  deploy-staging:
    needs: validate
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      # Assumes the runner already has kubectl credentials for the target cluster
      - name: Deploy to staging
        run: |
          kubectl apply -f k8s/staging/
          kubectl set image deployment/model-serving \
            model-server="registry.example.com/model:v${MODEL_VERSION}"
  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v4
      - name: Deploy to production
        run: |
          kubectl apply -f k8s/production/
          kubectl set image deployment/model-serving \
            model-server="registry.example.com/model:v${MODEL_VERSION}"
      - name: Update MLflow alias
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
        run: |
          # This job starts on a fresh runner, so install mlflow before using it
          pip install mlflow
          python -c "
          import os
          from mlflow.tracking import MlflowClient
          client = MlflowClient()
          client.set_registered_model_alias(
              name=os.environ['MODEL_NAME'],
              alias='champion',
              version=os.environ['MODEL_VERSION'],
          )
          "
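The workflow calls scripts/validate_model.py, which this guide does not show. A minimal sketch of what such a script could look like follows, assuming it applies the same accuracy and f1_macro thresholds as the approval workflow. The `THRESHOLDS` table, the `fetch` parameter, and the function names are illustrative assumptions, not the actual script.

```python
import argparse

# Hypothetical thresholds, mirroring the quality gates used earlier in this guide
THRESHOLDS = {"accuracy": 0.90, "f1_macro": 0.85}


def fetch_metrics(model_name, model_version):
    """Read the metrics of the run that produced this model version."""
    # Imported lazily so that argument parsing can be tested without MLflow
    from mlflow.tracking import MlflowClient

    client = MlflowClient()
    model_version_info = client.get_model_version(model_name, model_version)
    return client.get_run(model_version_info.run_id).data.metrics


def main(argv=None, fetch=fetch_metrics):
    """Return 0 if every threshold is met, 1 otherwise."""
    parser = argparse.ArgumentParser(description="Validate a registered model version")
    parser.add_argument("--model-name", required=True)
    parser.add_argument("--model-version", required=True)
    args = parser.parse_args(argv)

    metrics = fetch(args.model_name, args.model_version)
    failed = [
        name for name, minimum in THRESHOLDS.items()
        if metrics.get(name, 0.0) < minimum
    ]
    for name in failed:
        print(f"FAIL {name}: {metrics.get(name)} < {THRESHOLDS[name]}")
    return 1 if failed else 0

# In the real script, finish with: raise SystemExit(main())
# A non-zero exit code fails the "Validate model" step and blocks the deploy jobs.
```

Passing `fetch` as a parameter lets the gate logic be exercised with plain dictionaries, without a Tracking Server.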
Experiment Tracking Platform Comparison
| Feature | MLflow | Weights and Biases | Neptune | CometML |
|---|---|---|---|---|
| License | Open Source (Apache 2.0) | Commercial (free tier) | Commercial (free tier) | Commercial (free tier) |
| Self-Hosting | Fully supported | Limited | Supported | Supported |
| Experiment Tracking | Excellent | Outstanding | Excellent | Excellent |
| Model Registry | Built-in | External integration | Limited | Limited |
| Collaboration | Basic | Outstanding (reports) | Excellent | Excellent |
| Visualization | Basic | Outstanding | Excellent | Excellent |
| Autologging | Major frameworks | Extensive | Extensive | Extensive |
| Kubernetes Integration | Native support | Limited | Limited | Limited |
| Hyperparameter Tuning | Optuna integration | Sweeps built-in | Optuna integration | Optimizer built-in |
| Data Versioning | Basic | Artifacts | Basic | Basic |
| Learning Curve | Moderate | Low | Moderate | Low |
| Community | Very active | Active | Moderate | Moderate |
Platform Selection Guide
- Self-hosting required, open-source priority: MLflow
- Team collaboration and experiment visualization focused: Weights and Biases
- Fine-grained metric management: Neptune
- Quick adoption, simple setup: CometML
Transformers Integration
HuggingFace Transformers with MLflow
import mlflow
from transformers import (
AutoModelForSequenceClassification,
AutoTokenizer,
TrainingArguments,
Trainer,
)
from datasets import load_dataset
mlflow.set_experiment("sentiment-analysis")
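# Prepare dataset (shuffle first: the IMDB train split is ordered by label,
# so the first 1,000 rows taken unshuffled would contain a single class)
dataset = load_dataset("imdb", split="train").shuffle(seed=42).select(range(1000))
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
def tokenize_function(examples):
    return tokenizer(examples["text"], padding="max_length", truncation=True, max_length=256)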
tokenized_dataset = dataset.map(tokenize_function, batched=True)
tokenized_dataset = tokenized_dataset.train_test_split(test_size=0.2)
# Trainer parameters and metrics reach MLflow through the Transformers MLflow
# callback, enabled below with report_to="mlflow". mlflow.transformers.autolog()
# mainly suppresses spurious autologging from other flavors during training.
mlflow.transformers.autolog(log_models=True)
model = AutoModelForSequenceClassification.from_pretrained(
    "distilbert-base-uncased", num_labels=2
)
training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=3,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    warmup_steps=100,
    weight_decay=0.01,
    logging_dir="./logs",
    logging_steps=10,
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    report_to="mlflow",
)
trainer = Trainer(
model=model,
args=training_args,
train_dataset=tokenized_dataset["train"],
eval_dataset=tokenized_dataset["test"],
)
# Start training (logged to MLflow)
with mlflow.start_run(run_name="distilbert-sentiment"):
    trainer.train()
    # Log additional metrics
    eval_results = trainer.evaluate()
    mlflow.log_metrics(eval_results)
Troubleshooting
Experiment Tracking in Distributed Training
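In distributed training every worker runs the same script. If each one logs to MLflow, the result is duplicate runs and duplicated or conflicting metric values. The usual fix is to log from the rank 0 process only.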
import mlflow
import os

def get_rank():
    """Global rank of this process (0 when no distributed launcher set RANK)"""
    return int(os.environ.get("RANK", 0))

def setup_mlflow_distributed():
    """MLflow setup for distributed training"""
    world_size = int(os.environ.get("WORLD_SIZE", 1))
    # Only the rank 0 process talks to MLflow; other ranks never start a run.
    # Clearing MLFLOW_TRACKING_URI would not disable logging, it would only
    # redirect it to the local default store.
    if get_rank() != 0:
        return None
    mlflow.set_tracking_uri("http://mlflow-server:5000")
    mlflow.set_experiment("distributed-training")
    run = mlflow.start_run(run_name=f"dist-train-{world_size}gpu")
    mlflow.log_param("world_size", world_size)
    return run

def log_distributed_metrics(metrics, step, rank=None):
    """Log metrics only from rank 0 (rank is read from RANK when not given)"""
    if rank is None:
        rank = get_rank()
    if rank == 0:
        mlflow.log_metrics(metrics, step=step)
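Repeating the rank check in every logging helper is easy to forget. A common alternative is a decorator that applies the check once. The sketch below assumes the launcher sets the `RANK` environment variable (as torchrun does); `rank_zero_only` and `log_metrics_once` are illustrative names, and the print call stands in for `mlflow.log_metrics`.

```python
import functools
import os


def rank_zero_only(fn):
    """Run `fn` only in the process whose RANK is 0; other ranks get None back."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # RANK is read at call time and defaults to 0 for single-process runs
        if int(os.environ.get("RANK", "0")) != 0:
            return None
        return fn(*args, **kwargs)
    return wrapper


@rank_zero_only
def log_metrics_once(metrics, step):
    # Stand-in for mlflow.log_metrics(metrics, step=step)
    print(f"step={step} metrics={metrics}")
    return True
```

Any function that touches the Tracking Server (parameters, artifacts, model logging) can be wrapped the same way.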
Resolving Registry Conflicts
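Conflicts can arise when multiple teams register models or reassign aliases at the same time. The helper below reports the version that currently holds an alias, then reassigns it, retrying with exponential backoff if the registry call fails.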
from mlflow.tracking import MlflowClient
from mlflow.exceptions import MlflowException
import time
def safe_transition_model(model_name, version, target_alias, max_retries=3):
    """Reassign an alias to a model version, retrying on MLflow errors"""
    client = MlflowClient()
    for attempt in range(max_retries):
        try:
            # Report which version currently holds the alias, if any
            try:
                current = client.get_model_version_by_alias(
                    model_name, target_alias
                )
                print(f"Current {target_alias}: v{current.version}")
            except MlflowException:
                print(f"No current {target_alias} found")
            # Point the alias at the new version
            client.set_registered_model_alias(
                name=model_name,
                alias=target_alias,
                version=version,
            )
            print(f"Successfully set v{version} as {target_alias}")
            return True
        except MlflowException as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
    print(f"Failed to set alias after {max_retries} attempts")
    return False
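The retry loop above is tied to one registry call. The same pattern can be factored into a reusable helper, sketched here with the standard library only. `retry_with_backoff` is a hypothetical name, and the added jitter is a design choice of this sketch (it spreads out clients that fail at the same moment), not something the original loop does.

```python
import random
import time


def retry_with_backoff(fn, max_retries=3, base_delay=1.0, retry_on=(Exception,), sleep=time.sleep):
    """Call fn(); on a listed exception wait base_delay * 2**attempt plus jitter, then retry."""
    for attempt in range(max_retries):
        try:
            return fn()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error to the caller
            delay = base_delay * (2 ** attempt)
            # Jitter of up to 50% keeps concurrent clients from retrying in lockstep
            sleep(delay + random.uniform(0, delay / 2))
```

With MLflow, the call would look like `retry_with_backoff(lambda: client.set_registered_model_alias(...), retry_on=(MlflowException,))`. Note that retrying only helps with transient failures; if two teams set the same alias, the last write still wins.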
Artifact Store Access Errors
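When S3 is the Artifact Store, most access errors trace back to credentials or permissions. The function below checks bucket access, object listing, and write permission in turn, and reports the error code for each failed check.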
import boto3
from botocore.exceptions import ClientError
def diagnose_artifact_access(bucket_name, prefix="experiments/"):
    """Diagnose S3 Artifact Store access"""
    s3 = boto3.client("s3")
    checks = {}
    # 1. Check bucket access
    try:
        s3.head_bucket(Bucket=bucket_name)
        checks["bucket_access"] = "OK"
    except ClientError as e:
        checks["bucket_access"] = f"FAIL: {e.response['Error']['Code']}"
    # 2. Check object listing
    try:
        response = s3.list_objects_v2(
            Bucket=bucket_name, Prefix=prefix, MaxKeys=5
        )
        count = response.get("KeyCount", 0)
        checks["list_objects"] = f"OK ({count} objects found)"
    except ClientError as e:
        checks["list_objects"] = f"FAIL: {e.response['Error']['Code']}"
    # 3. Check write permission
    try:
        test_key = f"{prefix}_health_check"
        s3.put_object(Bucket=bucket_name, Key=test_key, Body=b"test")
        s3.delete_object(Bucket=bucket_name, Key=test_key)
        checks["write_access"] = "OK"
    except ClientError as e:
        checks["write_access"] = f"FAIL: {e.response['Error']['Code']}"
    print("=== S3 Artifact Store Diagnosis ===")
    for check, result in checks.items():
        print(f"  {check}: {result}")
    return checks
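The diagnosis prints raw error codes, which are not always self-explanatory. A small lookup can turn them into hints, as sketched below. The `S3_ERROR_HINTS` table and `explain_check` helper are assumptions made for this example; the table covers only a few common codes and should be extended with those seen in your environment.

```python
# Hypothetical hint table keyed by the code found in e.response["Error"]["Code"].
# HEAD requests such as head_bucket report the bare HTTP status ("403", "404").
S3_ERROR_HINTS = {
    "403": "Access was denied (check credentials, IAM permissions, and bucket policy).",
    "AccessDenied": "The IAM identity lacks the permission required for this operation.",
    "404": "The bucket was not found (check the bucket name).",
    "NoSuchBucket": "The bucket was not found (check the bucket name).",
    "InvalidAccessKeyId": "The access key ID is unknown to AWS (check AWS_ACCESS_KEY_ID).",
    "SignatureDoesNotMatch": "The secret key does not match (check AWS_SECRET_ACCESS_KEY).",
    "ExpiredToken": "Temporary credentials have expired; refresh the session token.",
}


def explain_check(result):
    """Turn a 'FAIL: <code>' string from diagnose_artifact_access into a readable hint."""
    prefix = "FAIL: "
    if not result.startswith(prefix):
        return result
    code = result[len(prefix):]
    hint = S3_ERROR_HINTS.get(code, "No hint available for this code.")
    return f"{result} -> {hint}"


if __name__ == "__main__":
    print(explain_check("OK"))
    print(explain_check("FAIL: AccessDenied"))
```

Applied to the returned dictionary, `{name: explain_check(result) for name, result in checks.items()}` gives one annotated line per check.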
Operational Notes
Performance Optimization Tips
- Use batch logging: Log multiple metrics at once with mlflow.log_metrics() to reduce API calls
- Asynchronous logging: Upload large artifacts in a separate process after training completes
- Tracking Server caching: Improve read performance with cache settings on an Nginx reverse proxy
- PostgreSQL indexes: Add appropriate indexes on the runs table if experiment searches are slow
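The batch logging tip can be sketched as a small buffer that collects the metrics of one training step and hands them over in a single call. `StepMetricBuffer` is a hypothetical helper, not an MLflow class; it assumes `log_fn` accepts a dictionary and a `step` keyword, as `mlflow.log_metrics` does.

```python
class StepMetricBuffer:
    """Collect the metrics of one step and pass them to `log_fn` in a single call."""

    def __init__(self, log_fn):
        self.log_fn = log_fn  # e.g. mlflow.log_metrics
        self._step = None
        self._pending = {}

    def add(self, name, value, step):
        # Moving on to a new step flushes whatever the previous step collected
        if self._step is not None and step != self._step:
            self.flush()
        self._step = step
        self._pending[name] = value

    def flush(self):
        if self._pending:
            self.log_fn(dict(self._pending), step=self._step)
            self._pending.clear()


if __name__ == "__main__":
    buffer = StepMetricBuffer(lambda metrics, step: print(step, metrics))
    buffer.add("loss", 0.5, step=1)
    buffer.add("accuracy", 0.8, step=1)
    buffer.add("loss", 0.4, step=2)
    buffer.flush()  # flush the last step before the run ends
```

Each step then costs one request instead of one per metric; the final `flush()` must not be forgotten, or the last step is lost.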
Security Considerations
- Place an authentication proxy (OAuth2 Proxy, Nginx Basic Auth) in front of the Tracking Server
- Restrict S3 bucket access to VPC endpoints (through the bucket policy) to block external access
- Enable model artifact encryption (SSE-S3 or SSE-KMS)
- Use RBAC (Role-Based Access Control) for team-level experiment access control
Production Checklist
- [ ] Deploy Tracking Server as a separate server/container
- [ ] Configure Backend Store with PostgreSQL/MySQL (never use SQLite)
- [ ] Configure Artifact Store with S3/GCS/Azure Blob
- [ ] Place authentication proxy in front of Tracking Server
- [ ] Apply approval workflow to Model Registry
- [ ] Build automated validation (Quality Gate) pipeline for model deployment
- [ ] Configure only Rank 0 logging in distributed training environments
- [ ] Set appropriate retention policies (Lifecycle Policy) on Artifact Store
- [ ] Monitor Tracking Server health with Grafana dashboards
- [ ] Perform regular database backups and recovery testing
- [ ] Integrate model deployment automation in CI/CD pipeline
- [ ] Configure health checks and autoscaling for model serving endpoints