- Published on
MLflow 실험 관리 완벽 가이드: 실험 추적·모델 레지스트리·배포 파이프라인 구축
- Authors
- Name
- 들어가며
- MLflow 아키텍처
- 실험 추적 (Experiment Tracking)
- Model Registry
- 배포 파이프라인
- 실험 추적 플랫폼 비교
- Transformers 통합
- 트러블슈팅
- 운영 노트
- 프로덕션 체크리스트
- 참고자료
들어가며
머신러닝 프로젝트가 규모를 갖추면 가장 먼저 부딪히는 문제는 실험 관리다. 수십 번의 하이퍼파라미터 튜닝, 다양한 피처 조합, 여러 알고리즘 비교 실험을 스프레드시트나 노트로 관리하는 것은 한계가 있다. 실험 결과를 재현할 수 없거나, 어떤 모델이 프로덕션에 배포되어 있는지 추적할 수 없는 상황이 빈번하게 발생한다.
MLflow는 이러한 문제를 해결하기 위해 Databricks에서 시작한 오픈소스 MLOps 플랫폼이다. Tracking, Model Registry, Model Serving이라는 세 가지 핵심 컴포넌트를 통해 ML 라이프사이클 전반을 관리한다. 이 글에서는 MLflow의 아키텍처부터 실전 배포까지, 프로덕션 환경에서 MLflow를 효과적으로 운영하는 방법을 다룬다.
MLflow 아키텍처
핵심 컴포넌트 구조
MLflow는 크게 네 가지 컴포넌트로 구성된다.
| 컴포넌트 | 역할 | 저장소 |
|---|---|---|
| Tracking Server | 실험 파라미터·메트릭·아티팩트 기록 | Backend Store + Artifact Store |
| Model Registry | 모델 버전 관리·스테이지 전환 | Backend Store |
| Model Serving | REST API로 모델 배포 | 컨테이너/클라우드 |
| Projects | 재현 가능한 실험 패키징 | Git 또는 로컬 |
Tracking Server 배포 아키텍처
프로덕션 환경에서는 원격 Tracking Server를 구성해야 한다. Backend Store로 PostgreSQL, Artifact Store로 S3를 사용하는 것이 일반적이다.
# tracking_server_config.py
"""
MLflow Tracking Server 프로덕션 설정
Backend Store: PostgreSQL
Artifact Store: S3
"""
import os
TRACKING_CONFIG = {
"backend_store_uri": "postgresql://mlflow:password@db-host:5432/mlflow",
"default_artifact_root": "s3://mlflow-artifacts/experiments",
"host": "0.0.0.0",
"port": 5000,
"workers": 4,
}
# MLflow Tracking Server 실행
mlflow server \
--backend-store-uri postgresql://mlflow:password@db-host:5432/mlflow \
--default-artifact-root s3://mlflow-artifacts/experiments \
--host 0.0.0.0 \
--port 5000 \
--workers 4
# Docker Compose로 실행
docker compose up -d mlflow-server
# docker-compose.yaml
version: '3.8'
services:
mlflow-db:
image: postgres:15
environment:
POSTGRES_DB: mlflow
POSTGRES_USER: mlflow
POSTGRES_PASSWORD: mlflow_password
volumes:
- pgdata:/var/lib/postgresql/data
ports:
- '5432:5432'
mlflow-server:
build: ./mlflow
depends_on:
- mlflow-db
environment:
MLFLOW_BACKEND_STORE_URI: postgresql://mlflow:mlflow_password@mlflow-db:5432/mlflow
MLFLOW_DEFAULT_ARTIFACT_ROOT: s3://mlflow-artifacts/experiments
AWS_ACCESS_KEY_ID: your-access-key
AWS_SECRET_ACCESS_KEY: your-secret-key
ports:
- '5000:5000'
command: >
mlflow server
--backend-store-uri postgresql://mlflow:mlflow_password@mlflow-db:5432/mlflow
--default-artifact-root s3://mlflow-artifacts/experiments
--host 0.0.0.0
--port 5000
--workers 4
volumes:
pgdata:
실험 추적 (Experiment Tracking)
기본 실험 로깅
MLflow의 실험 추적은 Run 단위로 이루어진다. 각 Run에는 파라미터, 메트릭, 아티팩트를 기록할 수 있다.
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.datasets import load_iris
# Tracking Server 연결
mlflow.set_tracking_uri("http://mlflow-server:5000")
# 실험 생성 또는 기존 실험 선택
mlflow.set_experiment("iris-classification")
# 데이터 준비
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
iris.data, iris.target, test_size=0.2, random_state=42
)
# 실험 실행
with mlflow.start_run(run_name="rf-baseline-v1") as run:
# 하이퍼파라미터 로깅
params = {
"n_estimators": 100,
"max_depth": 5,
"min_samples_split": 2,
"random_state": 42,
}
mlflow.log_params(params)
# 모델 학습
model = RandomForestClassifier(**params)
model.fit(X_train, y_train)
# 예측 및 메트릭 계산
y_pred = model.predict(X_test)
metrics = {
"accuracy": accuracy_score(y_test, y_pred),
"f1_macro": f1_score(y_test, y_pred, average="macro"),
"precision_macro": precision_score(y_test, y_pred, average="macro"),
"recall_macro": recall_score(y_test, y_pred, average="macro"),
}
mlflow.log_metrics(metrics)
# 모델 아티팩트 로깅
mlflow.sklearn.log_model(
model,
artifact_path="model",
registered_model_name="iris-classifier",
)
# 추가 아티팩트 로깅 (예: confusion matrix 이미지)
import matplotlib.pyplot as plt
from sklearn.metrics import ConfusionMatrixDisplay
fig, ax = plt.subplots(figsize=(8, 6))
ConfusionMatrixDisplay.from_predictions(y_test, y_pred, ax=ax)
fig.savefig("/tmp/confusion_matrix.png")
mlflow.log_artifact("/tmp/confusion_matrix.png", "plots")
print(f"Run ID: {run.info.run_id}")
print(f"Metrics: {metrics}")
자동 로깅 (Autologging)
MLflow는 scikit-learn, PyTorch, TensorFlow, XGBoost 등 주요 프레임워크에 대한 자동 로깅을 지원한다. 한 줄의 코드로 파라미터, 메트릭, 모델을 자동으로 기록할 수 있다.
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import cross_val_score
# 자동 로깅 활성화
mlflow.sklearn.autolog(
log_input_examples=True, # 입력 데이터 예시 저장
log_model_signatures=True, # 모델 시그니처 자동 감지
log_models=True, # 모델 아티팩트 자동 저장
log_datasets=True, # 학습 데이터셋 정보 저장
silent=False, # 로깅 메시지 표시
)
mlflow.set_experiment("iris-autolog-experiment")
with mlflow.start_run(run_name="gbc-autolog"):
model = GradientBoostingClassifier(
n_estimators=200,
max_depth=3,
learning_rate=0.1,
random_state=42,
)
# autolog이 fit 호출 시 자동으로 파라미터/메트릭/모델을 기록
model.fit(X_train, y_train)
# cross-validation 점수도 자동 기록됨
cv_scores = cross_val_score(model, X_train, y_train, cv=5)
mlflow.log_metric("cv_mean_accuracy", cv_scores.mean())
mlflow.log_metric("cv_std_accuracy", cv_scores.std())
PyTorch 딥러닝 실험 추적
import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
mlflow.set_experiment("pytorch-classification")
class SimpleNet(nn.Module):
def __init__(self, input_dim, hidden_dim, output_dim):
super().__init__()
self.fc1 = nn.Linear(input_dim, hidden_dim)
self.relu = nn.ReLU()
self.dropout = nn.Dropout(0.3)
self.fc2 = nn.Linear(hidden_dim, output_dim)
def forward(self, x):
x = self.fc1(x)
x = self.relu(x)
x = self.dropout(x)
x = self.fc2(x)
return x
# 학습 설정
config = {
"input_dim": 4,
"hidden_dim": 64,
"output_dim": 3,
"learning_rate": 0.001,
"epochs": 50,
"batch_size": 16,
}
with mlflow.start_run(run_name="pytorch-simplenet"):
mlflow.log_params(config)
model = SimpleNet(config["input_dim"], config["hidden_dim"], config["output_dim"])
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=config["learning_rate"])
X_tensor = torch.FloatTensor(X_train)
y_tensor = torch.LongTensor(y_train)
dataset = TensorDataset(X_tensor, y_tensor)
dataloader = DataLoader(dataset, batch_size=config["batch_size"], shuffle=True)
for epoch in range(config["epochs"]):
model.train()
total_loss = 0
for batch_X, batch_y in dataloader:
optimizer.zero_grad()
outputs = model(batch_X)
loss = criterion(outputs, batch_y)
loss.backward()
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(dataloader)
# 에폭별 메트릭 로깅
mlflow.log_metric("train_loss", avg_loss, step=epoch)
# 검증
model.eval()
with torch.no_grad():
X_test_tensor = torch.FloatTensor(X_test)
test_outputs = model(X_test_tensor)
_, predicted = torch.max(test_outputs, 1)
val_acc = (predicted.numpy() == y_test).mean()
mlflow.log_metric("val_accuracy", val_acc, step=epoch)
# 모델 저장
mlflow.pytorch.log_model(model, "pytorch-model")
MLflow Search API
실험 결과를 프로그래밍 방식으로 검색하고 비교할 수 있다.
import mlflow
from mlflow.tracking import MlflowClient
client = MlflowClient(tracking_uri="http://mlflow-server:5000")
# 특정 실험의 모든 Run 조회
experiment = client.get_experiment_by_name("iris-classification")
runs = client.search_runs(
experiment_ids=[experiment.experiment_id],
filter_string="metrics.accuracy > 0.9 AND params.n_estimators = '100'",
order_by=["metrics.f1_macro DESC"],
max_results=10,
)
# 결과 출력
for run in runs:
print(f"Run ID: {run.info.run_id}")
print(f" Accuracy: {run.data.metrics.get('accuracy', 'N/A')}")
print(f" F1 Score: {run.data.metrics.get('f1_macro', 'N/A')}")
print(f" Params: {run.data.params}")
print("---")
# 두 Run 비교
run1 = runs[0]
run2 = runs[1] if len(runs) > 1 else None
if run2:
print("=== Run Comparison ===")
for metric_key in run1.data.metrics:
v1 = run1.data.metrics[metric_key]
v2 = run2.data.metrics.get(metric_key, "N/A")
print(f" {metric_key}: {v1} vs {v2}")
Model Registry
모델 등록 및 버전 관리
Model Registry는 모델의 라이프사이클을 관리하는 중앙 저장소다. 모델을 등록하면 자동으로 버전이 부여되며, Staging, Production, Archived 스테이지 간 전환이 가능하다.
from mlflow.tracking import MlflowClient
client = MlflowClient()
# 모델 등록 (학습 Run에서 직접 등록)
model_name = "iris-classifier"
result = mlflow.register_model(
model_uri=f"runs:/{run.info.run_id}/model",
name=model_name,
)
print(f"Model Version: {result.version}")
# 모델 버전에 설명 추가
client.update_model_version(
name=model_name,
version=result.version,
description="RandomForest baseline model with 100 trees, accuracy 0.95",
)
# 모델 버전에 태그 추가
client.set_model_version_tag(
name=model_name,
version=result.version,
key="validation_status",
value="approved",
)
모델 Alias와 스테이지 전환
MLflow 2.x부터는 Aliases를 사용한 모델 참조가 권장된다. 기존의 Stage(Staging/Production/Archived) 방식도 여전히 지원된다.
from mlflow.tracking import MlflowClient
client = MlflowClient()
model_name = "iris-classifier"
# Alias 방식 (MLflow 2.x 권장)
# champion alias 설정
client.set_registered_model_alias(
name=model_name,
alias="champion",
version=3,
)
# challenger alias 설정
client.set_registered_model_alias(
name=model_name,
alias="challenger",
version=4,
)
# Alias로 모델 로드
champion_model = mlflow.pyfunc.load_model(f"models:/{model_name}@champion")
challenger_model = mlflow.pyfunc.load_model(f"models:/{model_name}@challenger")
# 예측 비교
champion_pred = champion_model.predict(X_test)
challenger_pred = challenger_model.predict(X_test)
print(f"Champion Accuracy: {accuracy_score(y_test, champion_pred)}")
print(f"Challenger Accuracy: {accuracy_score(y_test, challenger_pred)}")
# Challenger가 더 좋으면 Champion으로 승격
if accuracy_score(y_test, challenger_pred) > accuracy_score(y_test, champion_pred):
client.set_registered_model_alias(
name=model_name,
alias="champion",
version=4,
)
print("Challenger promoted to Champion!")
모델 승인 워크플로우
프로덕션 환경에서는 모델 배포 전 승인 프로세스가 필요하다.
def model_approval_workflow(model_name, version):
"""모델 승인 워크플로우"""
client = MlflowClient()
# 1단계: 모델 검증 메트릭 확인
model_version = client.get_model_version(model_name, version)
run = client.get_run(model_version.run_id)
accuracy = run.data.metrics.get("accuracy", 0)
f1 = run.data.metrics.get("f1_macro", 0)
# 2단계: 품질 기준 확인
quality_gates = {
"accuracy >= 0.90": accuracy >= 0.90,
"f1_macro >= 0.85": f1 >= 0.85,
}
all_passed = all(quality_gates.values())
print("=== Quality Gate Results ===")
for gate, passed in quality_gates.items():
status = "PASS" if passed else "FAIL"
print(f" {gate}: {status}")
# 3단계: 승인 여부에 따라 Alias 설정
if all_passed:
client.set_model_version_tag(
name=model_name, version=version,
key="approval_status", value="approved"
)
# Staging Alias 부여
client.set_registered_model_alias(
name=model_name, alias="staging", version=version
)
print(f"Model v{version} approved and moved to staging")
return True
else:
client.set_model_version_tag(
name=model_name, version=version,
key="approval_status", value="rejected"
)
print(f"Model v{version} rejected - quality gates not met")
return False
# 워크플로우 실행
model_approval_workflow("iris-classifier", 5)
배포 파이프라인
Docker 기반 배포
# Dockerfile.mlflow-serve
FROM python:3.11-slim
RUN pip install mlflow[extras] boto3 psycopg2-binary
ENV MLFLOW_TRACKING_URI=http://mlflow-server:5000
ENV MODEL_NAME=iris-classifier
ENV MODEL_ALIAS=champion
EXPOSE 8080
CMD mlflow models serve \
--model-uri "models:/${MODEL_NAME}@${MODEL_ALIAS}" \
--host 0.0.0.0 \
--port 8080 \
--workers 2 \
--no-conda
# Docker 이미지 빌드 및 실행
docker build -t mlflow-model-serve -f Dockerfile.mlflow-serve .
docker run -p 8080:8080 \
-e AWS_ACCESS_KEY_ID=your-key \
-e AWS_SECRET_ACCESS_KEY=your-secret \
mlflow-model-serve
# 예측 요청 테스트
curl -X POST http://localhost:8080/invocations \
-H "Content-Type: application/json" \
-d '{"inputs": [[5.1, 3.5, 1.4, 0.2]]}'
Kubernetes 배포
# k8s/mlflow-model-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: iris-classifier-serving
labels:
app: iris-classifier
spec:
replicas: 3
selector:
matchLabels:
app: iris-classifier
template:
metadata:
labels:
app: iris-classifier
spec:
containers:
- name: model-server
image: mlflow-model-serve:latest
ports:
- containerPort: 8080
env:
- name: MLFLOW_TRACKING_URI
value: 'http://mlflow-server.mlflow.svc.cluster.local:5000'
- name: MODEL_NAME
value: 'iris-classifier'
- name: MODEL_ALIAS
value: 'champion'
resources:
requests:
cpu: '500m'
memory: '512Mi'
limits:
cpu: '1000m'
memory: '1Gi'
readinessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 60
periodSeconds: 30
---
apiVersion: v1
kind: Service
metadata:
name: iris-classifier-service
spec:
selector:
app: iris-classifier
ports:
- protocol: TCP
port: 80
targetPort: 8080
type: ClusterIP
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: iris-classifier-ingress
annotations:
nginx.ingress.kubernetes.io/rewrite-target: /
spec:
rules:
- host: model.example.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: iris-classifier-service
port:
number: 80
CI/CD with GitHub Actions
# .github/workflows/model-deploy.yaml
name: Model Deployment Pipeline
on:
workflow_dispatch:
inputs:
model_name:
description: 'Model name in registry'
required: true
default: 'iris-classifier'
model_version:
description: 'Model version to deploy'
required: true
jobs:
validate:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: pip install mlflow boto3 scikit-learn
- name: Validate model
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
run: |
python scripts/validate_model.py \
--model-name ${{ github.event.inputs.model_name }} \
--model-version ${{ github.event.inputs.model_version }}
deploy-staging:
needs: validate
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Deploy to staging
run: |
kubectl apply -f k8s/staging/
kubectl set image deployment/model-serving \
model-server=registry.example.com/model:v${{ github.event.inputs.model_version }}
deploy-production:
needs: deploy-staging
runs-on: ubuntu-latest
environment: production
steps:
- uses: actions/checkout@v4
- name: Deploy to production
run: |
kubectl apply -f k8s/production/
kubectl set image deployment/model-serving \
model-server=registry.example.com/model:v${{ github.event.inputs.model_version }}
- name: Update MLflow alias
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
run: |
python -c "
from mlflow.tracking import MlflowClient
client = MlflowClient()
client.set_registered_model_alias(
name='${{ github.event.inputs.model_name }}',
alias='champion',
version=${{ github.event.inputs.model_version }}
)
"
실험 추적 플랫폼 비교
| 기능 | MLflow | Weights and Biases | Neptune | CometML |
|---|---|---|---|---|
| 라이선스 | 오픈소스 (Apache 2.0) | 상용 (무료 티어 있음) | 상용 (무료 티어 있음) | 상용 (무료 티어 있음) |
| 셀프 호스팅 | 완전 지원 | 제한적 | 지원 | 지원 |
| 실험 추적 | 우수 | 매우 우수 | 우수 | 우수 |
| 모델 레지스트리 | 기본 내장 | 외부 연동 필요 | 제한적 | 제한적 |
| 협업 기능 | 기본적 | 매우 우수 (리포트) | 우수 | 우수 |
| 시각화 | 기본적 | 매우 우수 | 우수 | 우수 |
| 자동 로깅 | 주요 프레임워크 | 광범위 지원 | 광범위 지원 | 광범위 지원 |
| Kubernetes 연동 | 네이티브 지원 | 제한적 | 제한적 | 제한적 |
| 하이퍼파라미터 튜닝 | Optuna 연동 | Sweeps 내장 | Optuna 연동 | Optimizer 내장 |
| 데이터 버전 관리 | 기본적 | Artifacts | 기본적 | 기본적 |
| 학습 곡선 | 보통 | 낮음 | 보통 | 낮음 |
| 커뮤니티 | 매우 활발 | 활발 | 보통 | 보통 |
플랫폼 선택 가이드
- 셀프 호스팅 필수, 오픈소스 우선: MLflow
- 팀 협업·실험 시각화 중심: Weights and Biases
- 세밀한 메트릭 관리: Neptune
- 빠른 도입, 간단한 설정: CometML
Transformers 통합
HuggingFace Transformers와 MLflow 연동
import mlflow
from transformers import (
AutoModelForSequenceClassification,
AutoTokenizer,
TrainingArguments,
Trainer,
)
from datasets import load_dataset
mlflow.set_experiment("sentiment-analysis")
# 데이터셋 준비
dataset = load_dataset("imdb", split="train[:1000]")
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
def tokenize_function(examples):
return tokenizer(examples["text"], padding="max_length", truncation=True, max_length=256)
tokenized_dataset = dataset.map(tokenize_function, batched=True)
tokenized_dataset = tokenized_dataset.train_test_split(test_size=0.2)
# MLflow 자동 로깅 활성화
mlflow.transformers.autolog(log_models=True)
model = AutoModelForSequenceClassification.from_pretrained(
"distilbert-base-uncased", num_labels=2
)
training_args = TrainingArguments(
output_dir="./results",
num_train_epochs=3,
per_device_train_batch_size=16,
per_device_eval_batch_size=16,
warmup_steps=100,
weight_decay=0.01,
logging_dir="./logs",
logging_steps=10,
eval_strategy="epoch",
save_strategy="epoch",
load_best_model_at_end=True,
)
trainer = Trainer(
model=model,
args=training_args,
train_dataset=tokenized_dataset["train"],
eval_dataset=tokenized_dataset["test"],
)
# 학습 시작 (자동으로 MLflow에 로깅됨)
with mlflow.start_run(run_name="distilbert-sentiment"):
trainer.train()
# 추가 메트릭 기록
eval_results = trainer.evaluate()
mlflow.log_metrics(eval_results)
트러블슈팅
분산 학습 환경에서의 실험 추적
분산 학습 시 여러 워커가 동시에 MLflow에 로깅하면 충돌이 발생할 수 있다.
import mlflow
import os
def setup_mlflow_distributed():
"""분산 학습 환경에서 MLflow 설정"""
rank = int(os.environ.get("RANK", 0))
local_rank = int(os.environ.get("LOCAL_RANK", 0))
world_size = int(os.environ.get("WORLD_SIZE", 1))
# Rank 0 프로세스만 MLflow에 로깅
if rank == 0:
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("distributed-training")
run = mlflow.start_run(run_name=f"dist-train-{world_size}gpu")
mlflow.log_param("world_size", world_size)
return run
else:
# 다른 프로세스는 로깅 비활성화
os.environ["MLFLOW_TRACKING_URI"] = ""
return None
def log_distributed_metrics(metrics, step, rank=0):
"""Rank 0에서만 메트릭 기록"""
if rank == 0:
mlflow.log_metrics(metrics, step=step)
Registry 충돌 해결
여러 팀이 동시에 모델을 등록하거나 스테이지를 변경할 때 충돌이 발생할 수 있다.
from mlflow.tracking import MlflowClient
from mlflow.exceptions import MlflowException
import time
def safe_transition_model(model_name, version, target_alias, max_retries=3):
"""안전한 모델 스테이지 전환 (재시도 로직 포함)"""
client = MlflowClient()
for attempt in range(max_retries):
try:
# 현재 champion 확인
try:
current_champion = client.get_model_version_by_alias(
model_name, target_alias
)
print(f"Current {target_alias}: v{current_champion.version}")
except MlflowException:
print(f"No current {target_alias} found")
# Alias 전환
client.set_registered_model_alias(
name=model_name,
alias=target_alias,
version=version,
)
print(f"Successfully set v{version} as {target_alias}")
return True
except MlflowException as e:
print(f"Attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # 지수 백오프
print(f"Failed to transition model after {max_retries} attempts")
return False
Artifact Store 접근 오류
S3 Artifact Store 사용 시 빈번하게 발생하는 인증 관련 문제와 해결 방법이다.
import boto3
from botocore.exceptions import ClientError
def diagnose_artifact_access(bucket_name, prefix="experiments/"):
"""S3 Artifact Store 접근 진단"""
s3 = boto3.client("s3")
checks = {}
# 1. 버킷 접근 확인
try:
s3.head_bucket(Bucket=bucket_name)
checks["bucket_access"] = "OK"
except ClientError as e:
checks["bucket_access"] = f"FAIL: {e.response['Error']['Code']}"
# 2. 객체 리스트 확인
try:
response = s3.list_objects_v2(
Bucket=bucket_name, Prefix=prefix, MaxKeys=5
)
count = response.get("KeyCount", 0)
checks["list_objects"] = f"OK ({count} objects found)"
except ClientError as e:
checks["list_objects"] = f"FAIL: {e.response['Error']['Code']}"
# 3. 쓰기 권한 확인
try:
test_key = f"{prefix}_health_check"
s3.put_object(Bucket=bucket_name, Key=test_key, Body=b"test")
s3.delete_object(Bucket=bucket_name, Key=test_key)
checks["write_access"] = "OK"
except ClientError as e:
checks["write_access"] = f"FAIL: {e.response['Error']['Code']}"
print("=== S3 Artifact Store Diagnosis ===")
for check, result in checks.items():
print(f" {check}: {result}")
return checks
운영 노트
성능 최적화 팁
- Batch 로깅 사용:
mlflow.log_metrics()로 여러 메트릭을 한 번에 기록하면 API 호출 횟수를 줄일 수 있다 - 비동기 로깅: 대규모 아티팩트는 학습 완료 후 별도 프로세스로 업로드
- Tracking Server 캐싱: Nginx 리버스 프록시 앞단에 캐시 설정으로 읽기 성능 향상
- PostgreSQL 인덱스: 실험 검색이 느릴 경우
runs테이블에 적절한 인덱스 추가
보안 고려사항
- Tracking Server 앞에 인증 프록시(OAuth2 Proxy, Nginx Basic Auth) 배치
- S3 버킷에 VPC 엔드포인트 적용으로 외부 접근 차단
- 모델 아티팩트 암호화(SSE-S3 또는 SSE-KMS) 활성화
- RBAC(Role-Based Access Control)으로 팀별 실험 접근 제어
프로덕션 체크리스트
- [ ] Tracking Server를 별도 서버/컨테이너로 분리하여 운영
- [ ] Backend Store를 PostgreSQL/MySQL로 구성 (SQLite 사용 금지)
- [ ] Artifact Store를 S3/GCS/Azure Blob으로 구성
- [ ] Tracking Server 앞에 인증 프록시 배치
- [ ] 모델 레지스트리에 승인 워크플로우 적용
- [ ] 모델 배포 시 자동 검증(Quality Gate) 파이프라인 구축
- [ ] 분산 학습 환경에서 Rank 0만 로깅하도록 구성
- [ ] Artifact Store에 적절한 보존 정책(Lifecycle Policy) 설정
- [ ] 모니터링 대시보드(Grafana)로 Tracking Server 상태 감시
- [ ] 정기적인 데이터베이스 백업 및 복구 테스트 수행
- [ ] CI/CD 파이프라인에 모델 배포 자동화 연동
- [ ] 모델 서빙 엔드포인트에 헬스체크 및 오토스케일링 구성