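The Complete MLOps & AI Model Deployment Guide: From Training to Serving and Monitoring
Table of Contents
- What Is MLOps
- Experiment Management
- Data Pipelines
- Model Training Infrastructure
- Model Serving
- LLM Serving Special
- CI/CD for ML
- Monitoring
- A/B Testing and Deployment Strategies
- Cost Optimization
- Practical: Building an LLM RAG Pipeline
1. What Is MLOps
When DevOps Meets ML
MLOps is a portmanteau of Machine Learning and Operations. Think of it as applying the DevOps principles of software engineering to the machine learning lifecycle.
In traditional software development, code was the only artifact to manage. An ML system has three: code, data, and models. When the data changes, the model changes, and when the model changes, the serving setup may have to change too.
The ML Lifecycle
The overall flow of an ML project looks like this:
- Problem definition - Translate business requirements into an ML problem
- Data collection and preprocessing - Build data pipelines
- Feature engineering - Transform raw data into a form the model can consume
- Model training - Experiment with algorithms and hyperparameters
- Model evaluation - Validate performance with offline metrics
- Model deployment - Serve in production
- Monitoring - Track performance and detect drift
- Retraining - Update the model when performance degrades
This is not a one-time process but a continuous cycle.
Google's MLOps Maturity Model
Google defines three levels of MLOps maturity.
Level 0 - Manual Process
- Data scientists train models manually in notebooks
- Model deployment is manual and infrequent
- No monitoring
- Most teams are at this level
Level 1 - ML Pipeline Automation
- The training pipeline is automated
- Continuous Training is implemented
- An experiment tracking system is in place
- A model registry is in use
Level 2 - CI/CD Pipeline Automation
- CI/CD for the pipeline itself
- Automated testing and validation
- Feature store in use
- Full monitoring and alerting
Most organizations gain a great deal of value just by moving from Level 0 to Level 1.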
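2. Experiment Management
MLflow - The Open-Source ML Platform
MLflow is a de facto standard for ML experiment management. It has four core components: Tracking, Projects, Models, and Model Registry. Tracking and the registry are shown here.
MLflow Tracking - Experiment Logging
import mlflow
mlflow.set_experiment("recommendation-model-v2")
with mlflow.start_run():
    # Log hyperparameters
    mlflow.log_param("learning_rate", 0.001)
    mlflow.log_param("batch_size", 64)
    mlflow.log_param("epochs", 50)
    # Run training (train_model is a placeholder for your own training function)
    model = train_model(lr=0.001, batch_size=64, epochs=50)
    # Log metrics (hard-coded for illustration; log your real evaluation results)
    mlflow.log_metric("accuracy", 0.923)
    mlflow.log_metric("f1_score", 0.891)
    mlflow.log_metric("auc_roc", 0.956)
    # Save the model as a run artifact
    mlflow.pytorch.log_model(model, "model")
MLflow Models and Model Registry - Packaging and Registration
# Register a logged model in the model registry ("abc123" is a placeholder run ID)
mlflow.register_model(
    "runs:/abc123/model",
    "recommendation-model"
)
# Transition the stage.
# Note: stages are deprecated in newer MLflow releases (2.9+) in favor of model version aliases.
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
    name="recommendation-model",
    version=3,
    stage="Production"
)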
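Weights and Biases
W&B is a commercial experiment management platform with strong visualization features.
import wandb
wandb.init(project="image-classification", config={
    "learning_rate": 0.001,
    "architecture": "ResNet50",
    "dataset": "imagenet-subset",
    "epochs": 100,
})
# model and train_one_epoch are placeholders for your own model and training step
for epoch in range(100):
    train_loss, val_loss = train_one_epoch(model)
    wandb.log({
        "train_loss": train_loss,
        "val_loss": val_loss,
        "epoch": epoch
    })
wandb.finish()  # flush pending data and close the run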
Optuna - Hyperparameter Optimization
Tuning hyperparameters by hand is inefficient. Optuna automates the search; its default sampler, TPE (Tree-structured Parzen Estimator), is a Bayesian-optimization-style method.
import optuna
def objective(trial):
    lr = trial.suggest_float("lr", 1e-5, 1e-1, log=True)
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64, 128])
    n_layers = trial.suggest_int("n_layers", 1, 5)
    dropout = trial.suggest_float("dropout", 0.1, 0.5)
    # create_model and train_and_evaluate are placeholders for your own code
    model = create_model(n_layers=n_layers, dropout=dropout)
    accuracy = train_and_evaluate(model, lr=lr, batch_size=batch_size)
    return accuracy
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=100)
print(f"Best accuracy: {study.best_trial.value}")
print(f"Best params: {study.best_trial.params}")
3. Data Pipelines
Feature Store
A feature store is a centralized repository for ML features. It lets training and serving read the same feature definitions, which avoids training-serving skew.
Feast Example
from feast import FeatureStore
store = FeatureStore(repo_path="feature_repo/")
# Feature definition
from feast import Entity, FeatureView, Field
from feast.types import Float32, Int64
user = Entity(name="user_id", join_keys=["user_id"])
# user_source is a placeholder for a data source defined elsewhere in the feature repo
user_features = FeatureView(
    name="user_features",
    entities=[user],
    schema=[
        Field(name="total_purchases", dtype=Int64),
        Field(name="avg_order_value", dtype=Float32),
        Field(name="days_since_last_order", dtype=Int64),
    ],
    source=user_source,
)
# Retrieve training data
# entity_df is a placeholder DataFrame with user_id and event_timestamp columns
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:total_purchases",
        "user_features:avg_order_value",
        "user_features:days_since_last_order",
    ],
).to_df()
# Real-time lookup for serving
online_features = store.get_online_features(
    features=[
        "user_features:total_purchases",
        "user_features:avg_order_value",
    ],
    entity_rows=[{"user_id": 12345}],
).to_dict()
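DVC - Data Version Control
DVC (Data Version Control) versions data alongside Git: Git tracks small pointer files, while the data itself lives in remote storage.
# Initialize DVC
dvc init
# Track a data file (creates data/training_data.csv.dvc)
dvc add data/training_data.csv
# Commit the pointer file so Git records this data version
git add data/training_data.csv.dvc data/.gitignore
git commit -m "Track training data with DVC"
# Configure remote storage
dvc remote add -d myremote s3://my-bucket/dvc-storage
# Push data
dvc push
# Retrieve a specific version of the data
git checkout v1.0
dvc pull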
Data Validation - Great Expectations
Data quality validation is a cornerstone of the pipeline.
# Uses the fluent API of Great Expectations 0.16-0.18; the 1.x series restructured it
import great_expectations as gx
context = gx.get_context()
# Define expectations
validator = context.sources.pandas_default.read_csv(
    "data/training_data.csv"
)
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=150)
validator.expect_column_values_to_be_in_set("country", ["KR", "US", "JP", "UK"])
# Run validation
results = validator.validate()
if not results.success:
    raise ValueError("Data validation failed!")
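4. Model Training Infrastructure
GPU Cluster Setup
Training large models requires a GPU cluster.
Kubernetes + GPU Node Pool
apiVersion: v1
kind: Pod
metadata:
  name: training-job
spec:
  containers:
    - name: trainer
      image: training-image:latest
      resources:
        limits:
          nvidia.com/gpu: 4
      volumeMounts:
        - name: dataset
          mountPath: /data
  # The mount above needs a matching volume; the claim name is a placeholder
  volumes:
    - name: dataset
      persistentVolumeClaim:
        claimName: dataset-pvc
  nodeSelector:
    gpu-type: a100
  tolerations:
    - key: "nvidia.com/gpu"
      operator: "Exists"
      effect: "NoSchedule"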
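Distributed Training
When one GPU is too slow for the workload, distributed training spreads it across several. DDP replicates the full model on every GPU and splits the data, so the model must still fit on a single GPU. Models too large for one GPU need sharding approaches such as FSDP or tensor/pipeline parallelism.
PyTorch DDP (Distributed Data Parallel)
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
def setup(rank, world_size):
    # Expects MASTER_ADDR and MASTER_PORT in the environment (torchrun sets them)
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)
def train(rank, world_size, dataset, num_epochs=10):
    setup(rank, world_size)
    # MyModel is a placeholder; its forward pass is assumed to return the loss
    model = MyModel().to(rank)
    model = DDP(model, device_ids=[rank])
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    # Each rank reads a distinct shard of the dataset
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    dataloader = DataLoader(dataset, batch_size=64, sampler=sampler)
    for epoch in range(num_epochs):
        sampler.set_epoch(epoch)  # reshuffle differently each epoch
        for batch in dataloader:
            optimizer.zero_grad()
            loss = model(batch.to(rank))  # assumes each batch is a single tensor
            loss.backward()  # DDP all-reduces gradients across ranks here
            optimizer.step()
    dist.destroy_process_group()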
Kubeflow Training Operator
Kubeflow lets you manage distributed training on Kubernetes declaratively.
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: pytorch-training
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      template:
        spec:
          containers:
            - name: pytorch
              image: my-training:latest
              resources:
                limits:
                  nvidia.com/gpu: 1
    Worker:
      replicas: 3
      template:
        spec:
          containers:
            - name: pytorch
              image: my-training:latest
              resources:
                limits:
                  nvidia.com/gpu: 1
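5. Model Serving
Serving Framework Comparison
| Framework | Strengths | Weaknesses | Best For |
|---|---|---|---|
| TorchServe | Native PyTorch, easy to manage | Limited performance | Small PyTorch models |
| Triton | Multi-framework, high performance | Complex configuration | Large-scale production |
| vLLM | LLM-optimized, PagedAttention | LLM only | LLM serving |
| FastAPI | Maximum flexibility | Everything is hand-built | Custom logic |
TorchServe
# Create the model archive; --export-path puts the .mar file where the server will look
mkdir -p model_store
torch-model-archiver \
  --model-name resnet50 \
  --version 1.0 \
  --model-file model.py \
  --serialized-file model.pth \
  --handler image_classifier \
  --export-path model_store
# Start the server
torchserve --start \
  --model-store model_store \
  --models resnet50=resnet50.mar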
NVIDIA Triton Inference Server
Triton can serve models from several frameworks at the same time.
# Model repository structure
model_repository/
  resnet50/
    config.pbtxt
    1/
      model.onnx
  bert_base/
    config.pbtxt
    1/
      model.plan
config.pbtxt Configuration
name: "resnet50"
platform: "onnxruntime_onnx"
max_batch_size: 32
input [
  {
    name: "input"
    data_type: TYPE_FP32
    dims: [3, 224, 224]
  }
]
output [
  {
    name: "output"
    data_type: TYPE_FP32
    dims: [1000]
  }
]
dynamic_batching {
  preferred_batch_size: [8, 16, 32]
  max_queue_delay_microseconds: 100
}
instance_group [
  {
    count: 2
    kind: KIND_GPU
  }
]
FastAPI Wrapping
FastAPI is a good fit for simple model serving.
from fastapi import FastAPI
import torch
from pydantic import BaseModel
app = FastAPI()
# Load the model once at application startup
model = torch.jit.load("model.pt")
model.eval()
class PredictionRequest(BaseModel):
    features: list[float]
class PredictionResponse(BaseModel):
    prediction: int  # predicted class index
    confidence: float
# A plain def runs in FastAPI's thread pool, so blocking inference does not stall the event loop
@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
    tensor = torch.tensor([request.features])
    with torch.no_grad():
        output = model(tensor)
    prob = torch.softmax(output, dim=1)
    prediction = torch.argmax(prob, dim=1).item()
    confidence = prob[0][prediction].item()
    return PredictionResponse(
        prediction=prediction,
        confidence=confidence
    )
@app.get("/health")
async def health():
    return {"status": "healthy"}
6. LLM Serving Special
vLLM - High-Performance LLM Serving
vLLM uses PagedAttention, which manages the KV cache in fixed-size blocks, to raise LLM serving throughput.
from vllm import LLM, SamplingParams
# Load the model (tensor_parallel_size=2 requires two GPUs)
llm = LLM(
    model="meta-llama/Llama-3.1-8B-Instruct",
    tensor_parallel_size=2,
    gpu_memory_utilization=0.9,
    max_model_len=8192,
)
# Inference
sampling_params = SamplingParams(
    temperature=0.7,
    top_p=0.9,
    max_tokens=512,
)
outputs = llm.generate(
    ["Explain MLOps in simple terms."],
    sampling_params,
)
print(outputs[0].outputs[0].text)
vLLM OpenAI-Compatible Server
python -m vllm.entrypoints.openai.api_server \
  --model meta-llama/Llama-3.1-8B-Instruct \
  --tensor-parallel-size 2 \
  --port 8000
# Callable with the OpenAI SDK
from openai import OpenAI
# The SDK requires some api_key value even when the server does not check it
client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy")
response = client.chat.completions.create(
    model="meta-llama/Llama-3.1-8B-Instruct",
    messages=[
        {"role": "user", "content": "What is MLOps?"}
    ],
    max_tokens=256,
)
print(response.choices[0].message.content)
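Text Generation Inference (TGI)
Hugging Face's TGI is built around a Rust server and is a stable choice for production.
# Note: --quantize gptq expects a checkpoint that was already quantized with GPTQ;
# drop the flag (or point --model-id at a GPTQ checkpoint) for unquantized weights.
docker run --gpus all \
  -p 8080:80 \
  -v /data:/data \
  ghcr.io/huggingface/text-generation-inference:latest \
  --model-id meta-llama/Llama-3.1-8B-Instruct \
  --quantize gptq \
  --max-input-length 4096 \
  --max-total-tokens 8192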
Quantization Methods Compared
Quantization, which shrinks model size, is often essential when deploying LLMs. The figures in the table are rough and depend on the model, hardware, and kernels.
| Method | Size Reduction (approx.) | Quality Loss | Speedup (approx.) | Notes |
|---|---|---|---|---|
| GPTQ | 4x | Low | 2-3x | GPU-optimized, good accuracy |
| AWQ | 4x | Very low | 2-3x | Activation-aware, often better quality than GPTQ |
| GGUF | 2-8x | Varies | Runs on CPU | For llama.cpp, supports mixed CPU/GPU execution |
| BitsAndBytes | 2-4x | Low | 1.5x | Easy to apply, enables QLoRA training |
# BitsAndBytes 4-bit quantization
import torch
from transformers import AutoModelForCausalLM, BitsAndBytesConfig
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
)
model = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Llama-3.1-70B-Instruct",
    quantization_config=quantization_config,
    device_map="auto",
)
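To make the "size reduction" column concrete, the arithmetic below estimates the memory taken by the weights alone at different precisions. It is a back-of-the-envelope sketch: the function name is made up for this example, and real 4-bit formats store extra scales and zero-points, while the KV cache and activations come on top, so actual usage will be somewhat higher.

```python
def weight_memory_gib(n_params: float, bits_per_weight: float) -> float:
    """Approximate memory for model weights only (no KV cache, activations, or overhead)."""
    return n_params * bits_per_weight / 8 / 1024**3

# Parameter counts are the nominal sizes of the models named in this guide
for name, n_params in [("8B", 8e9), ("70B", 70e9)]:
    fp16 = weight_memory_gib(n_params, 16)
    int4 = weight_memory_gib(n_params, 4)
    print(f"{name}: FP16 ~{fp16:.1f} GiB, 4-bit ~{int4:.1f} GiB ({fp16 / int4:.0f}x smaller)")
```

Under these assumptions a 70B model drops from roughly 130 GiB of FP16 weights to roughly 33 GiB at 4 bits, which is the difference between needing several GPUs and fitting on one large one.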
Ollama - Local LLMs
For development environments and small deployments, Ollama is the convenient option.
# Download and run a model
ollama pull llama3.1
ollama run llama3.1
# API server mode
ollama serve
import requests
response = requests.post("http://localhost:11434/api/generate", json={
    "model": "llama3.1",
    "prompt": "Explain MLOps briefly.",
    "stream": False,
})
print(response.json()["response"])
7. CI/CD for ML
Model Testing Strategy
ML models need tests that ordinary software does not: accuracy thresholds, latency budgets, fairness checks, and size limits.
import os
import time
import pytest
# The fixtures (model, test_data, sample_input, fairness_data, model_path) and evaluate()
# are assumed to be provided by your own conftest.py
class TestModelQuality:
    """Model quality tests"""
    def test_accuracy_threshold(self, model, test_data):
        """Accuracy must meet the threshold"""
        accuracy = evaluate(model, test_data)
        assert accuracy >= 0.90, f"Accuracy {accuracy} below threshold 0.90"
    def test_latency(self, model, sample_input):
        """Check inference latency"""
        start = time.perf_counter()  # monotonic clock, suited to timing
        model.predict(sample_input)
        latency = time.perf_counter() - start
        assert latency < 0.1, f"Latency {latency}s exceeds 100ms"
    def test_no_bias(self, model, fairness_data):
        """Fairness check: mean predictions of two groups must be close"""
        results_group_a = model.predict(fairness_data["group_a"])
        results_group_b = model.predict(fairness_data["group_b"])
        disparity = abs(results_group_a.mean() - results_group_b.mean())
        assert disparity < 0.05, f"Bias detected: disparity={disparity}"
    def test_model_size(self, model_path):
        """Model size limit"""
        size_mb = os.path.getsize(model_path) / (1024 * 1024)
        assert size_mb < 500, f"Model size {size_mb}MB exceeds 500MB limit"
An ML Pipeline with GitHub Actions
name: ML Pipeline
on:
  push:
    paths:
      - "src/**"
      - "data/**"
      - "configs/**"
jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Validate data
        run: python scripts/validate_data.py
  train:
    needs: data-validation
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v4
      - name: Train model
        run: python scripts/train.py --config configs/production.yaml
      - name: Upload model artifact
        uses: actions/upload-artifact@v4
        with:
          name: model
          path: outputs/model/
  evaluate:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Download model
        uses: actions/download-artifact@v4
        with:
          name: model
      - name: Run evaluation
        run: python scripts/evaluate.py
      - name: Check quality gates
        run: python scripts/quality_gate.py
  deploy:
    needs: evaluate
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      # Check out the repo so the deploy scripts exist on the runner
      - uses: actions/checkout@v4
      - name: Deploy to staging
        run: ./scripts/deploy.sh staging
      - name: Smoke test
        run: python scripts/smoke_test.py
      - name: Deploy to production
        run: ./scripts/deploy.sh production
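The workflow calls `scripts/quality_gate.py` without showing it. One way such a gate could look is sketched below; the metric names, thresholds, and the JSON metrics file are all assumptions made for illustration, not part of the original pipeline. The idea is simply that the script returns a non-zero exit code when any metric misses its threshold, which fails the `evaluate` job and blocks `deploy`.

```python
import json

# Hypothetical thresholds: "min" means the metric must be >= the limit, "max" means <= the limit
THRESHOLDS = {
    "accuracy": ("min", 0.90),
    "p95_latency_ms": ("max", 100.0),
}

def failed_gates(metrics: dict) -> list[str]:
    """Return one human-readable message per gate that the metrics fail."""
    failures = []
    for name, (kind, limit) in THRESHOLDS.items():
        value = metrics.get(name)
        if value is None:
            failures.append(f"{name}: missing from metrics")
        elif kind == "min" and value < limit:
            failures.append(f"{name}: {value} < {limit}")
        elif kind == "max" and value > limit:
            failures.append(f"{name}: {value} > {limit}")
    return failures

def main(metrics_path: str) -> int:
    """Read a metrics JSON file and return a process exit code (0 = all gates pass)."""
    with open(metrics_path) as f:
        metrics = json.load(f)
    failures = failed_gates(metrics)
    for message in failures:
        print(f"GATE FAILED {message}")
    return 1 if failures else 0

# In CI this would end with something like: raise SystemExit(main("outputs/metrics.json"))
print(failed_gates({"accuracy": 0.87, "p95_latency_ms": 80.0}))
```

Treating a missing metric as a failure is a deliberate choice here: a gate that silently passes when evaluation produced no numbers would defeat its purpose.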
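Data Validation Pipeline
import pandas as pd
# REQUIRED_COLUMNS is project-specific; these names are placeholders
REQUIRED_COLUMNS = ["user_id", "label"]
def validate_training_data(data_path: str) -> bool:
    """Validate training data quality; raise ValueError naming the failed checks."""
    df = pd.read_parquet(data_path)
    # Check the schema first, because later checks index df["label"]
    missing = [col for col in REQUIRED_COLUMNS if col not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    checks = {
        # Enough rows
        "min_rows": len(df) >= 10000,
        # Worst per-column null ratio
        "null_ratio": df.isnull().mean().max() < 0.05,
        # Rarest label must not be too rare
        "label_balance": df["label"].value_counts(normalize=True).min() > 0.1,
        # Duplicate row ratio
        "duplicate_ratio": df.duplicated().mean() < 0.01,
    }
    failed = [name for name, ok in checks.items() if not ok]
    if failed:
        raise ValueError(f"Data validation failed at checks: {failed}")
    return True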
8. Monitoring
Model Performance Monitoring
After deployment, model performance can degrade over time.
import time
from prometheus_client import Gauge, Histogram, Counter
# Metric definitions
prediction_latency = Histogram(
    "model_prediction_latency_seconds",
    "Prediction latency in seconds",
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)
prediction_count = Counter(
    "model_prediction_total",
    "Total number of predictions",
    ["model_version", "status"]
)
# Set this from an evaluation job once ground-truth labels are available
model_accuracy = Gauge(
    "model_accuracy",
    "Current model accuracy",
    ["model_name"]
)
# Usage
def predict_with_monitoring(model, input_data):
    start = time.perf_counter()
    try:
        result = model.predict(input_data)
        prediction_count.labels(
            model_version="v2.1",
            status="success"
        ).inc()
        return result
    except Exception:
        prediction_count.labels(
            model_version="v2.1",
            status="error"
        ).inc()
        raise
    finally:
        # Record latency for both successful and failed requests
        latency = time.perf_counter() - start
        prediction_latency.observe(latency)
Data Drift Detection
When the distribution of the input data shifts, model performance tends to degrade.
# Uses the Report API of Evidently 0.4.x; newer releases changed these imports, so pin the version
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
# Build the drift report
report = Report(metrics=[
    DataDriftPreset(),
    TargetDriftPreset(),
])
column_mapping = ColumnMapping(
    target="label",
    prediction="prediction",
    numerical_features=["feature_1", "feature_2", "feature_3"],
    categorical_features=["category", "region"],
)
# training_data and production_data are placeholder DataFrames
report.run(
    reference_data=training_data,
    current_data=production_data,
    column_mapping=column_mapping,
)
# Check the drift result
drift_results = report.as_dict()
if drift_results["metrics"][0]["result"]["dataset_drift"]:
    trigger_retraining_pipeline()  # placeholder for your own retraining hook
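Concept Drift
Beyond the data distribution, the relationship between inputs and outputs can itself change. Consumer spending patterns before and after COVID-19 are an example. The detector here computes PSI (Population Stability Index) on the model's prediction distribution. That is a label-free proxy: it catches shifts in what the model outputs, while confirming true concept drift requires comparing predictions with ground-truth labels once they arrive.
import numpy as np
class ConceptDriftDetector:
    """Drift detection based on PSI (Population Stability Index)"""
    def __init__(self, threshold=0.2):
        self.threshold = threshold
    def calculate_psi(self, expected, actual, bins=10):
        """Compute PSI using bins taken from the percentiles of the reference data"""
        breakpoints = np.percentile(expected, np.linspace(0, 100, bins + 1))
        # Clip so values outside the reference range land in the edge bins instead of being dropped
        actual = np.clip(actual, breakpoints[0], breakpoints[-1])
        expected_counts = np.histogram(expected, breakpoints)[0] / len(expected)
        actual_counts = np.histogram(actual, breakpoints)[0] / len(actual)
        # Avoid division by zero and log(0)
        expected_counts = np.clip(expected_counts, 1e-6, None)
        actual_counts = np.clip(actual_counts, 1e-6, None)
        psi = np.sum(
            (actual_counts - expected_counts) *
            np.log(actual_counts / expected_counts)
        )
        return psi
    def check_drift(self, reference_predictions, current_predictions):
        psi = self.calculate_psi(reference_predictions, current_predictions)
        if psi > self.threshold:
            return {"drift_detected": True, "psi": psi, "action": "retrain"}
        elif psi > self.threshold / 2:
            return {"drift_detected": False, "psi": psi, "action": "monitor"}
        else:
            return {"drift_detected": False, "psi": psi, "action": "none"}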
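To see how the PSI formula behaves against the 0.2 threshold, here is a small worked example on pre-binned fractions. It is a dependency-free sketch of the same sum, PSI = Σ (aᵢ − eᵢ) · ln(aᵢ / eᵢ), where eᵢ and aᵢ are the expected and actual share of samples in bin i; the bin fractions are invented for illustration.

```python
import math

def psi(expected_fracs, actual_fracs, eps=1e-6):
    """Population Stability Index over pre-binned fractions (each list sums to 1)."""
    total = 0.0
    for e, a in zip(expected_fracs, actual_fracs):
        e, a = max(e, eps), max(a, eps)  # guard against log(0) and division by zero
        total += (a - e) * math.log(a / e)
    return total

reference = [0.25, 0.25, 0.25, 0.25]  # prediction shares per bin at training time
unchanged = [0.25, 0.25, 0.25, 0.25]  # production looks the same
shifted = [0.10, 0.20, 0.30, 0.40]    # production mass moved toward the upper bins

print(psi(reference, unchanged))
print(round(psi(reference, shifted), 3))
```

The unchanged distribution scores exactly zero, and the shifted one scores about 0.23, just over the 0.2 threshold that would trigger the "retrain" action. Every term in the sum is non-negative, because the difference and the log ratio always share a sign, so PSI can only grow as bins diverge.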
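Grafana Dashboards
Grafana is a good fit for visualizing monitoring metrics. Metrics collected by Prometheus can be assembled into dashboards.
Key monitoring panels:
- Request throughput: prediction requests per second
- Latency distribution: p50, p95, p99 latency
- Error rate: share of failed predictions
- Model accuracy: live performance metrics
- Data drift scores: PSI, KL divergence
- Resource usage: GPU memory, CPU, network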
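The latency panel tracks percentiles rather than the mean because a few slow requests barely move the average but dominate user experience. The sketch below computes p50/p95/p99 from raw samples with the standard library; the function name and sample data are made up, and in a Prometheus setup these values would normally be estimated from histogram buckets rather than from raw samples.

```python
import statistics

def latency_percentiles(samples_ms):
    """Return p50/p95/p99 from raw latency samples (needs at least 2 samples)."""
    # n=100 yields 99 cut points; cut point i (1-based) is the i-th percentile
    cuts = statistics.quantiles(samples_ms, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}

# 99 fast requests (1..99 ms) plus one 2-second outlier
samples = list(range(1, 100)) + [2000]
stats = latency_percentiles(samples)
mean_ms = statistics.mean(samples)

print(stats)
print(f"mean: {mean_ms} ms")
```

With this data the median stays near 50 ms while the single outlier pulls p99 well above p95, which is the kind of tail behavior a mean-only panel would hide.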
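9. A/B Testing and Deployment Strategies
Canary Deployment
A canary deployment exposes the new model to a small share of traffic first.
# Split traffic with an Istio VirtualService
# (subsets v1 and v2 must be defined in a matching DestinationRule)
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: model-serving
spec:
  hosts:
    - model-service
  http:
    # Requests carrying the header "canary: true" always go to v2
    - match:
        - headers:
            canary:
              exact: "true"
      route:
        - destination:
            host: model-service
            subset: v2
    # Everything else is split 90/10 between v1 and v2
    - route:
        - destination:
            host: model-service
            subset: v1
          weight: 90
        - destination:
            host: model-service
            subset: v2
          weight: 10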
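Weighted routes of this kind split requests, so the same user may hit different model versions on consecutive calls. When that matters, for example when comparing per-user metrics, a common alternative is to assign users to a version deterministically in the application layer. The following is a minimal sketch of that idea, not something Istio does for you; the function name and the user ID format are hypothetical.

```python
import hashlib

def route(user_id: str, canary_percent: int = 10) -> str:
    """Deterministically map a user to 'v1' or 'v2' so repeat requests see the same model."""
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % 100  # stable bucket in [0, 100)
    return "v2" if bucket < canary_percent else "v1"

# Simulate 10,000 users and measure the realized canary share
assignments = [route(f"user-{i}") for i in range(10_000)]
share_v2 = assignments.count("v2") / len(assignments)
print(f"v2 share: {share_v2:.3f}")
```

Raising `canary_percent` only moves users from v1 to v2 and never the other way, so a gradual rollout does not flip users back and forth between versions.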
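Shadow Mode
The new model runs inference on real traffic, but its results are only stored and never returned to users. Comparing them with the current model's results lets you validate the new model safely.
import asyncio
import hashlib
from datetime import datetime, timezone
class ShadowModelRouter:
    def __init__(self, primary_model, shadow_model, metrics_store):
        self.primary = primary_model
        self.shadow = shadow_model
        self.metrics_store = metrics_store  # any sink with an async insert() method
        self._tasks = set()  # hold references so pending tasks are not garbage-collected
    async def predict(self, input_data):
        # Primary model result (returned to the user)
        primary_result = await self.primary.predict(input_data)
        # Run the shadow model in the background so it adds no user-facing latency
        task = asyncio.create_task(self._run_shadow(input_data, primary_result))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return primary_result
    async def _run_shadow(self, input_data, primary_result):
        # Shadow model result (stored for comparison)
        try:
            shadow_result = await self.shadow.predict(input_data)
            await self.log_comparison(input_data, primary_result, shadow_result)
        except Exception:
            pass  # shadow failures must never affect users; log them in production
    async def log_comparison(self, input_data, primary, shadow):
        comparison = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            # sha256 is stable across processes, unlike the built-in hash()
            "input_hash": hashlib.sha256(str(input_data).encode()).hexdigest(),
            "primary_prediction": primary,
            "shadow_prediction": shadow,
            "agreement": primary == shadow,
        }
        await self.metrics_store.insert(comparison)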
Multi-Armed Bandit
A dynamic traffic-allocation method that typically wastes less traffic on weak variants than a fixed-split A/B test. It automatically sends more traffic to the better-performing model.
import numpy as np
class ThompsonSamplingRouter:
    """Model router based on Thompson sampling"""
    def __init__(self, model_names):
        self.models = model_names
        # Beta distribution parameters (alpha, beta), starting from a uniform Beta(1, 1) prior
        self.successes = {name: 1 for name in model_names}
        self.failures = {name: 1 for name in model_names}
    def select_model(self):
        """Pick a model by Thompson sampling"""
        samples = {}
        for name in self.models:
            samples[name] = np.random.beta(
                self.successes[name],
                self.failures[name]
            )
        return max(samples, key=samples.get)
    def update(self, model_name, success: bool):
        """Update the distribution with the observed outcome"""
        if success:
            self.successes[model_name] += 1
        else:
            self.failures[model_name] += 1
    def get_allocation(self):
        """Share of observations per model so far (priors included), a proxy for traffic share"""
        total = sum(self.successes[m] + self.failures[m] for m in self.models)
        return {
            m: (self.successes[m] + self.failures[m]) / total
            for m in self.models
        }
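A quick simulation shows how the sampling rule shifts traffic on its own. This is a sketch using only the standard library (`random.betavariate` in place of `np.random.beta`); the success rates, round count, and model names are invented, and "success" stands for whatever binary outcome you optimize, such as a click or a correct answer.

```python
import random

def simulate(true_rates, rounds=5000, seed=0):
    """Run Thompson sampling against fixed (hidden) success rates; return pulls per model."""
    rng = random.Random(seed)
    wins = {m: 1 for m in true_rates}    # Beta alpha, uniform prior
    losses = {m: 1 for m in true_rates}  # Beta beta, uniform prior
    pulls = {m: 0 for m in true_rates}
    for _ in range(rounds):
        # Draw one plausible success rate per model and route to the highest draw
        chosen = max(true_rates, key=lambda m: rng.betavariate(wins[m], losses[m]))
        pulls[chosen] += 1
        if rng.random() < true_rates[chosen]:
            wins[chosen] += 1
        else:
            losses[chosen] += 1
    return pulls

pulls = simulate({"model_a": 0.05, "model_b": 0.20})
print(pulls)
```

Early on both models are tried often because their Beta distributions are wide. As evidence accumulates, the weaker model's draws rarely win, so most of the remaining rounds go to the stronger one without anyone setting a split by hand.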
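10. Cost Optimization
Using Spot Instances
Training jobs can tolerate interruption, so spot instances can cut their cost substantially. Savings of 70% or more are achievable, and AWS cites up to 90% for managed spot training.
# AWS SageMaker managed spot training
import sagemaker
estimator = sagemaker.estimator.Estimator(
    image_uri="my-training-image:latest",
    role="arn:aws:iam::ACCOUNT:role/SageMakerRole",
    instance_count=4,
    instance_type="ml.p4d.24xlarge",
    use_spot_instances=True,
    max_wait=7200,  # max total seconds including waiting for capacity (must be >= max_run)
    max_run=3600,   # max training seconds
    checkpoint_s3_uri="s3://bucket/checkpoints/",
)
estimator.fit({"training": "s3://bucket/data/"})
The key is checkpointing. When a spot instance is reclaimed, training must be able to resume from the last checkpoint.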
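What "resume from the checkpoint" means inside the training script can be sketched as follows. This is a simplified illustration: the state is just an epoch counter in a JSON file, the interruption is simulated, and all function names are hypothetical. A real job would also save model and optimizer state, and on SageMaker the files would be written to the local checkpoint directory that is synced to `checkpoint_s3_uri`.

```python
import json
import os
import tempfile

def save_checkpoint(path, state):
    """Write atomically so a partially written file never replaces a good checkpoint."""
    directory = os.path.dirname(path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        json.dump(state, f)
    os.replace(tmp_path, path)  # atomic rename over the previous checkpoint

def load_checkpoint(path):
    """Return the saved state, or a fresh state if no checkpoint exists yet."""
    if not os.path.exists(path):
        return {"epoch": 0}
    with open(path) as f:
        return json.load(f)

def train(path, total_epochs, interrupt_at=None):
    """Train from the last checkpoint; return True if all epochs finished."""
    state = load_checkpoint(path)
    for epoch in range(state["epoch"], total_epochs):
        if interrupt_at is not None and epoch == interrupt_at:
            return False  # simulated spot interruption
        # ... one epoch of real training would run here ...
        save_checkpoint(path, {"epoch": epoch + 1})
    return True

ckpt = os.path.join(tempfile.mkdtemp(), "ckpt.json")
finished_first = train(ckpt, total_epochs=5, interrupt_at=3)  # interrupted mid-run
resumed_from = load_checkpoint(ckpt)["epoch"]
finished_second = train(ckpt, total_epochs=5)                 # restart picks up where it left off
print(finished_first, resumed_from, finished_second)
```

The second call repeats only the epochs that had not been checkpointed, so the cost of an interruption is bounded by the checkpoint interval rather than by the whole run.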
Model Compression
These techniques reduce production serving cost.
Knowledge Distillation
import torch
import torch.nn.functional as F
class DistillationTrainer:
    def __init__(self, teacher, student, temperature=3.0, alpha=0.5):
        self.teacher = teacher
        self.student = student
        self.temperature = temperature
        self.alpha = alpha
    def distillation_loss(self, student_logits, teacher_logits, labels):
        # Soft target loss (KD loss); detach so no gradient flows into the teacher
        soft_targets = F.softmax(teacher_logits.detach() / self.temperature, dim=1)
        soft_loss = F.kl_div(
            F.log_softmax(student_logits / self.temperature, dim=1),
            soft_targets,
            reduction="batchmean"
        ) * (self.temperature ** 2)
        # Hard target loss against the ground-truth labels
        hard_loss = F.cross_entropy(student_logits, labels)
        return self.alpha * soft_loss + (1 - self.alpha) * hard_loss
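The role of `temperature` is easier to see on concrete numbers. The dependency-free sketch below applies a temperature-scaled softmax to one made-up set of teacher logits; it illustrates the softening step only, not the full training loop.

```python
import math

def softmax(logits, temperature=1.0):
    """Temperature-scaled softmax; a higher temperature gives a flatter distribution."""
    scaled = [x / temperature for x in logits]
    peak = max(scaled)  # subtract the max for numerical stability
    exps = [math.exp(x - peak) for x in scaled]
    total = sum(exps)
    return [e / total for e in exps]

teacher_logits = [6.0, 2.0, 1.0]          # hypothetical logits for three classes
hard = softmax(teacher_logits, 1.0)       # nearly one-hot
soft = softmax(teacher_logits, 3.0)       # same ranking, much more mass on the other classes

print([round(p, 3) for p in hard])
print([round(p, 3) for p in soft])
```

At temperature 1 the teacher puts almost all probability on the first class, so the student learns little beyond the label. At temperature 3 the relative scores of the other classes become visible, which is the extra signal distillation relies on. The `temperature ** 2` factor in the loss compensates for the gradients of the soft-target term shrinking roughly as 1/T².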
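Inference Optimization with ONNX Conversion
import numpy as np
import onnx
import torch
# PyTorch -> ONNX conversion (model is a placeholder for your trained torch.nn.Module)
model.eval()
dummy_input = torch.randn(1, 3, 224, 224)
torch.onnx.export(
    model,
    dummy_input,
    "model.onnx",
    input_names=["input"],
    output_names=["output"],
    dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},
    opset_version=17,
)
# Verify that the exported graph is well-formed
onnx.checker.check_model(onnx.load("model.onnx"))
# Inference with ONNX Runtime (falls back to CPU if CUDA is unavailable)
import onnxruntime as ort
session = ort.InferenceSession(
    "model.onnx",
    providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
)
input_array = np.random.rand(1, 3, 224, 224).astype(np.float32)  # placeholder input
result = session.run(None, {"input": input_array})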
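Batch Inference vs Real-Time Inference
| Aspect | Batch Inference | Real-Time Inference |
|---|---|---|
| Latency | Minutes to hours | Milliseconds to seconds |
| Throughput | Very high | Moderate |
| Cost | Low (spot instances) | High (always on) |
| Best for | Recommendation systems, reports | Chatbots, search |
# Batch inference example (Spark)
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
spark = SparkSession.builder.getOrCreate()
# Register model inference as a UDF.
# model is a placeholder; it is pickled and shipped to the executors, so it must be serializable.
@udf("float")
def predict_udf(features):
    return float(model.predict([features])[0])
# Batch inference over a large dataset
df = spark.read.parquet("s3://bucket/daily-data/")
predictions = df.withColumn("prediction", predict_udf("features"))
predictions.write.parquet("s3://bucket/predictions/")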
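11. Practical: Building an LLM RAG Pipeline
RAG (Retrieval-Augmented Generation) is a core pattern for working around the limits of an LLM's built-in knowledge. It retrieves passages from an external knowledge base and passes them to the LLM as context.
Overall Architecture
- Document ingestion - Collect data from various sources
- Chunking - Split documents into appropriately sized pieces
- Embedding - Convert text into vectors
- Vector DB storage - Store vectors in an index
- Retrieval - Find documents similar to the query
- Generation - Pass the retrieved results to the LLM as context
Document Ingestion and Chunking
# Import paths follow the split LangChain packages (langchain-community, langchain-text-splitters);
# older releases exposed the same classes under langchain.document_loaders and langchain.text_splitter
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import (
    PyPDFLoader,
    WebBaseLoader,
    NotionDirectoryLoader,
)
# Load documents from several sources
pdf_docs = PyPDFLoader("docs/manual.pdf").load()
web_docs = WebBaseLoader("https://docs.example.com").load()
notion_docs = NotionDirectoryLoader("notion_export/").load()
all_docs = pdf_docs + web_docs + notion_docs
# Chunking
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    # The final "" lets the splitter fall back to character-level splits for very long tokens
    separators=["\n\n", "\n", ".", " ", ""],
)
chunks = text_splitter.split_documents(all_docs)
print(f"Total chunks: {len(chunks)}")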
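The interplay of `chunk_size` and `chunk_overlap` can be shown with a stripped-down chunker. This sketch cuts at fixed character offsets only; unlike `RecursiveCharacterTextSplitter` it does not try to break at separators, and the function name is made up for the example.

```python
def chunk_text(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Fixed-size character chunks where consecutive chunks share chunk_overlap characters."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this chunk already reaches the end of the text
    return chunks

text = "abcdefghijklmnopqrstuvwxyz"
chunks = chunk_text(text, chunk_size=10, chunk_overlap=4)
print(chunks)
```

Each chunk starts 6 characters after the previous one, so the last 4 characters of one chunk reappear at the start of the next. The overlap keeps a sentence that straddles a boundary retrievable from at least one chunk, at the price of storing and embedding some text twice.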
Embedding and Vector DB
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
# Embedding model
embedding_model = HuggingFaceEmbeddings(
    model_name="BAAI/bge-large-en-v1.5",
    model_kwargs={"device": "cuda"},
    encode_kwargs={"normalize_embeddings": True},
)
# Store in a Chroma vector DB
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embedding_model,
    persist_directory="./chroma_db",
    collection_name="knowledge_base",
)
# Similarity search
results = vectorstore.similarity_search(
    "How to deploy a model to production?",
    k=5,
)
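Underneath, a similarity search ranks stored vectors by how close they are to the query vector. The brute-force sketch below does this with cosine similarity over a tiny in-memory index. The three-dimensional vectors and document names are invented; a real vector DB uses learned embeddings with hundreds of dimensions and an approximate index instead of a full scan.

```python
import math

def cosine(a, b):
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def top_k(query_vec, index, k=2):
    """Return the k (score, doc_id) pairs most similar to the query, best first."""
    scored = [(cosine(query_vec, vec), doc_id) for doc_id, vec in index.items()]
    return sorted(scored, reverse=True)[:k]

# Hypothetical document embeddings
index = {
    "deploy_guide": [0.9, 0.1, 0.0],
    "billing_faq": [0.0, 0.2, 0.9],
    "rollback_runbook": [0.7, 0.3, 0.1],
}
hits = top_k([1.0, 0.0, 0.0], index, k=2)
print([doc_id for _, doc_id in hits])
```

When embeddings are normalized to unit length, as `normalize_embeddings=True` requests, cosine similarity reduces to a plain dot product, which is cheaper to compute at scale.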
Assembling the RAG Pipeline
from langchain.chains import RetrievalQA
from langchain_community.llms import VLLM
from langchain_core.prompts import PromptTemplate
# LLM setup
llm = VLLM(
    model="meta-llama/Llama-3.1-8B-Instruct",
    tensor_parallel_size=1,
    max_new_tokens=512,
    temperature=0.1,
)
# Prompt template
prompt_template = PromptTemplate(
    template="""Answer the question using the context below.
If the information is not in the context, answer "I don't know."
Context:
{context}
Question: {question}
Answer:""",
    input_variables=["context", "question"],
)
# Build the RAG chain
rag_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(
        search_type="mmr",
        search_kwargs={"k": 5, "fetch_k": 20},
    ),
    chain_type_kwargs={"prompt": prompt_template},
    return_source_documents=True,
)
# Query (RetrievalQA reads its input from the "query" key)
result = rag_chain.invoke({"query": "What is the production deployment procedure?"})
print(result["result"])
for doc in result["source_documents"]:
    print(f"  Source: {doc.metadata['source']}")
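The retriever above uses `search_type="mmr"` (maximal marginal relevance): fetch `fetch_k` candidates, then pick `k` of them while penalizing candidates that resemble ones already chosen. The sketch below shows that selection rule on toy two-dimensional vectors. It is an illustration of the general technique with invented data and names, not LangChain's code; `lambda_mult` mirrors the parameter of the same name that trades relevance against diversity.

```python
import math

def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def mmr(query_vec, candidates, k, lambda_mult=0.5):
    """Greedily pick k doc ids, scoring relevance minus similarity to already-picked docs."""
    selected = []
    remaining = dict(candidates)
    while remaining and len(selected) < k:
        def score(doc_id):
            relevance = cosine(query_vec, remaining[doc_id])
            redundancy = max(
                (cosine(remaining[doc_id], candidates[s]) for s in selected),
                default=0.0,
            )
            return lambda_mult * relevance - (1 - lambda_mult) * redundancy
        best = max(remaining, key=score)
        selected.append(best)
        del remaining[best]
    return selected

# "a" and "a_dup" are near-duplicates; "b" is less relevant but covers different content
candidates = {"a": [1.0, 0.9], "a_dup": [1.0, 0.88], "b": [0.2, 1.0]}
query = [1.0, 1.0]

by_relevance = sorted(candidates, key=lambda d: cosine(query, candidates[d]), reverse=True)[:2]
by_mmr = mmr(query, candidates, k=2)
print(by_relevance, by_mmr)
```

Plain relevance ranking returns the two near-duplicates, which wastes context window on repeated content. MMR keeps the most relevant one and then prefers the document that adds something new.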
Serving RAG in Production
import time
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class QueryRequest(BaseModel):
    question: str
    top_k: int = 5  # accepted but not yet wired into the retriever in this example
class QueryResponse(BaseModel):
    answer: str
    sources: list[str]
    latency_ms: float
# A plain def runs in FastAPI's thread pool, so the blocking chain call does not stall the event loop
@app.post("/query", response_model=QueryResponse)
def query(request: QueryRequest):
    start = time.perf_counter()
    result = rag_chain.invoke({"query": request.question})
    # Deduplicate sources while tolerating documents without a "source" field
    sources = list(set(
        doc.metadata.get("source", "unknown")
        for doc in result["source_documents"]
    ))
    latency = (time.perf_counter() - start) * 1000
    return QueryResponse(
        answer=result["result"],
        sources=sources,
        latency_ms=round(latency, 2),
    )
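Closing Thoughts
MLOps is an essential discipline for running ML models reliably in production.
The key points:
- Start with experiment management - Adopting MLflow alone already makes a big difference.
- Invest in automation - Manual processes eventually fail.
- Monitoring is mandatory - The real work begins after deployment.
- Start small - Moving from Google's Level 0 to Level 1 is the most important step.
- Evolve with the LLM era - Put new tools such as vLLM and RAG to active use.
As a rough rule of thumb, training the model is only about 20% of the work. The other 80% is data pipelines, serving, monitoring, and operations. MLOps is the methodology for managing that 80% systematically.
MLOps Self-Checklist
Experiment management
- Are the hyperparameters and metrics of every experiment tracked?
- Can experiment results be reproduced?
Data pipelines
- Is the data under version control?
- Is data quality validation automated?
Training infrastructure
- Is the training pipeline automated?
- Are checkpoints being saved?
Serving
- Is serving latency monitored?
- Does the deployment strategy allow rollback?
Monitoring
- Is data drift being detected?
- Do alerts fire when model performance degrades?
Cost
- Are spot instances being used?
- Has model compression been considered?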
The Complete MLOps & AI Model Deployment Guide — From Training to Serving and Monitoring
Table of Contents
- What Is MLOps
- Experiment Management
- Data Pipelines
- Model Training Infrastructure
- Model Serving
- LLM Serving Special
- CI/CD for ML
- Monitoring
- A/B Testing and Deployment Strategies
- Cost Optimization
- Practical: Building an LLM RAG Pipeline
1. What Is MLOps
When DevOps Meets ML
MLOps is a portmanteau of Machine Learning and Operations. Think of it as applying DevOps principles from software engineering to the machine learning lifecycle.
In traditional software development, you only had to manage code. In ML systems, however, you must manage code, data, and models simultaneously. When data changes, the model changes. When the model changes, the serving approach may need to change too.
The ML Lifecycle
The overall flow of an ML project looks like this:
- Problem Definition - Translate business requirements into an ML problem
- Data Collection and Preprocessing - Build data pipelines
- Feature Engineering - Transform raw data into model-ready features
- Model Training - Experiment with algorithms and hyperparameters
- Model Evaluation - Validate performance with offline metrics
- Model Deployment - Serve in production
- Monitoring - Track performance and detect drift
- Retraining - Update the model when performance degrades
This is not a one-time process but a continuous cycle.
Google's MLOps Maturity Model
Google defines three levels of MLOps maturity.
Level 0 - Manual Process
- Data scientists train models manually in notebooks
- Model deployment is manual and irregular
- No monitoring
- Most teams are at this level
Level 1 - ML Pipeline Automation
- Training pipeline is automated
- Continuous Training implemented
- Experiment tracking system in place
- Model registry in use
Level 2 - CI/CD Pipeline Automation
- CI/CD for the pipeline itself
- Automated testing and validation
- Feature store utilization
- Complete monitoring and alerting
Most organizations can gain tremendous value just by moving from Level 0 to Level 1.
2. Experiment Management
MLflow - The Open-Source ML Platform
MLflow is a de facto standard for ML experiment management. It has four core components: Tracking, Projects, Models, and Model Registry. Tracking and the registry are shown here.
MLflow Tracking - Experiment Logging
import mlflow
mlflow.set_experiment("recommendation-model-v2")
with mlflow.start_run():
    # Log hyperparameters
    mlflow.log_param("learning_rate", 0.001)
    mlflow.log_param("batch_size", 64)
    mlflow.log_param("epochs", 50)
    # Run training (train_model is a placeholder for your own training function)
    model = train_model(lr=0.001, batch_size=64, epochs=50)
    # Log metrics (hard-coded for illustration; log your real evaluation results)
    mlflow.log_metric("accuracy", 0.923)
    mlflow.log_metric("f1_score", 0.891)
    mlflow.log_metric("auc_roc", 0.956)
    # Save the model as a run artifact
    mlflow.pytorch.log_model(model, "model")
MLflow Models and Model Registry - Packaging and Registration
# Register a logged model in the model registry ("abc123" is a placeholder run ID)
mlflow.register_model(
    "runs:/abc123/model",
    "recommendation-model"
)
# Transition the stage.
# Note: stages are deprecated in newer MLflow releases (2.9+) in favor of model version aliases.
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
    name="recommendation-model",
    version=3,
    stage="Production"
)
Weights and Biases
W&B is a commercial experiment management platform with excellent visualization capabilities.
import wandb
wandb.init(project="image-classification", config={
    "learning_rate": 0.001,
    "architecture": "ResNet50",
    "dataset": "imagenet-subset",
    "epochs": 100,
})
# model and train_one_epoch are placeholders for your own model and training step
for epoch in range(100):
    train_loss, val_loss = train_one_epoch(model)
    wandb.log({
        "train_loss": train_loss,
        "val_loss": val_loss,
        "epoch": epoch
    })
Optuna - Hyperparameter Optimization
Tuning hyperparameters by hand is inefficient. Optuna automates the search; its default sampler, TPE (Tree-structured Parzen Estimator), is a Bayesian-optimization-style method.
import optuna
def objective(trial):
    lr = trial.suggest_float("lr", 1e-5, 1e-1, log=True)
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64, 128])
    n_layers = trial.suggest_int("n_layers", 1, 5)
    dropout = trial.suggest_float("dropout", 0.1, 0.5)
    # create_model and train_and_evaluate are placeholders for your own code
    model = create_model(n_layers=n_layers, dropout=dropout)
    accuracy = train_and_evaluate(model, lr=lr, batch_size=batch_size)
    return accuracy
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=100)
print(f"Best accuracy: {study.best_trial.value}")
print(f"Best params: {study.best_trial.params}")
3. Data Pipelines
Feature Store
A feature store is a centralized repository for ML features. It ensures the same features are used for both training and serving.
Feast Example
from feast import FeatureStore
store = FeatureStore(repo_path="feature_repo/")
# Feature definition
from feast import Entity, FeatureView, Field
from feast.types import Float32, Int64
user = Entity(name="user_id", join_keys=["user_id"])
# user_source is a placeholder for a data source defined elsewhere in the feature repo
user_features = FeatureView(
    name="user_features",
    entities=[user],
    schema=[
        Field(name="total_purchases", dtype=Int64),
        Field(name="avg_order_value", dtype=Float32),
        Field(name="days_since_last_order", dtype=Int64),
    ],
    source=user_source,
)
# Retrieve training data
# entity_df is a placeholder DataFrame with user_id and event_timestamp columns
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:total_purchases",
        "user_features:avg_order_value",
        "user_features:days_since_last_order",
    ],
).to_df()
# Real-time serving query
online_features = store.get_online_features(
    features=[
        "user_features:total_purchases",
        "user_features:avg_order_value",
    ],
    entity_rows=[{"user_id": 12345}],
).to_dict()
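The historical lookup matters because training rows must only see feature values that were known at the time of each event; otherwise the model trains on information from the future. The sketch below illustrates that point-in-time join in plain Python. It is a conceptual illustration with an invented feature log and function name, not how Feast implements it.

```python
from bisect import bisect_right

# Hypothetical feature log: per user, (timestamp, value) pairs sorted by timestamp
feature_log = {
    12345: [(1, 3), (5, 4), (9, 7)],  # total_purchases observed at t=1, t=5, t=9
}

def point_in_time_value(user_id, event_ts):
    """Return the latest feature value observed at or before event_ts, else None."""
    history = feature_log.get(user_id, [])
    timestamps = [ts for ts, _ in history]
    idx = bisect_right(timestamps, event_ts)  # count of observations with ts <= event_ts
    return history[idx - 1][1] if idx > 0 else None

# Each training row is a labeled event; it gets the feature value as of its own timestamp
training_rows = [(12345, 4), (12345, 9), (12345, 0)]
joined = [(uid, ts, point_in_time_value(uid, ts)) for uid, ts in training_rows]
print(joined)
```

The event at t=4 receives the value recorded at t=1, not the later ones, and the event at t=0 receives nothing because no feature had been observed yet. Online serving is the special case where the event time is "now", so it simply reads the latest value.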
DVC - Data Version Control
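DVC versions data alongside Git: Git tracks small pointer files, while the data itself lives in remote storage.
# Initialize DVC
dvc init
# Track a data file (creates data/training_data.csv.dvc)
dvc add data/training_data.csv
# Commit the pointer file so Git records this data version
git add data/training_data.csv.dvc data/.gitignore
git commit -m "Track training data with DVC"
# Configure remote storage
dvc remote add -d myremote s3://my-bucket/dvc-storage
# Push data
dvc push
# Retrieve a specific version
git checkout v1.0
dvc pull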
Data Validation - Great Expectations
Data quality validation is a cornerstone of the pipeline.
# Uses the fluent API of Great Expectations 0.16-0.18; the 1.x series restructured it
import great_expectations as gx
context = gx.get_context()
# Define expectations
validator = context.sources.pandas_default.read_csv(
    "data/training_data.csv"
)
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=150)
validator.expect_column_values_to_be_in_set("country", ["KR", "US", "JP", "UK"])
# Run validation
results = validator.validate()
if not results.success:
    raise ValueError("Data validation failed!")
4. Model Training Infrastructure
GPU Cluster Setup
GPU clusters are essential for training large-scale models.
Kubernetes + GPU Node Pool
apiVersion: v1
kind: Pod
metadata:
  name: training-job
spec:
  containers:
    - name: trainer
      image: training-image:latest
      resources:
        limits:
          nvidia.com/gpu: 4
      volumeMounts:
        - name: dataset
          mountPath: /data
  # The mount above needs a matching volume; the claim name is a placeholder
  volumes:
    - name: dataset
      persistentVolumeClaim:
        claimName: dataset-pvc
  nodeSelector:
    gpu-type: a100
  tolerations:
    - key: "nvidia.com/gpu"
      operator: "Exists"
      effect: "NoSchedule"
Distributed Training
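When one GPU is too slow for the workload, distributed training spreads it across several. DDP replicates the full model on every GPU and splits the data, so the model must still fit on a single GPU. Models too large for one GPU need sharding approaches such as FSDP or tensor/pipeline parallelism.
PyTorch DDP (Distributed Data Parallel)
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
def setup(rank, world_size):
    # Expects MASTER_ADDR and MASTER_PORT in the environment (torchrun sets them)
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)
def train(rank, world_size, dataset, num_epochs=10):
    setup(rank, world_size)
    # MyModel is a placeholder; its forward pass is assumed to return the loss
    model = MyModel().to(rank)
    model = DDP(model, device_ids=[rank])
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    # Each rank reads a distinct shard of the dataset
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    dataloader = DataLoader(dataset, batch_size=64, sampler=sampler)
    for epoch in range(num_epochs):
        sampler.set_epoch(epoch)  # reshuffle differently each epoch
        for batch in dataloader:
            optimizer.zero_grad()
            loss = model(batch.to(rank))  # assumes each batch is a single tensor
            loss.backward()  # DDP all-reduces gradients across ranks here
            optimizer.step()
    dist.destroy_process_group()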
Kubeflow Training Operator
Kubeflow lets you declaratively manage distributed training on Kubernetes.
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: pytorch-training
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      template:
        spec:
          containers:
            - name: pytorch
              image: my-training:latest
              resources:
                limits:
                  nvidia.com/gpu: 1
    Worker:
      replicas: 3
      template:
        spec:
          containers:
            - name: pytorch
              image: my-training:latest
              resources:
                limits:
                  nvidia.com/gpu: 1
5. Model Serving
Serving Framework Comparison
| Framework | Strengths | Weaknesses | Best For |
|---|---|---|---|
| TorchServe | Native PyTorch, easy setup | Limited performance | Small PyTorch models |
| Triton | Multi-framework, high perf | Complex config | Large-scale production |
| vLLM | LLM-optimized, PagedAttention | LLM only | LLM serving |
| FastAPI | Maximum flexibility | Manual implementation | Custom logic needed |
TorchServe
# Create model archive (written to model_store/ so the server can find it)
mkdir -p model_store
torch-model-archiver \
  --model-name resnet50 \
  --version 1.0 \
  --model-file model.py \
  --serialized-file model.pth \
  --handler image_classifier \
  --export-path model_store
# Start server
torchserve --start \
  --model-store model_store \
  --models resnet50=resnet50.mar
NVIDIA Triton Inference Server
Triton can serve models from multiple frameworks simultaneously.
# Model repository structure
model_repository/
  resnet50/
    config.pbtxt
    1/
      model.onnx
  bert_base/
    config.pbtxt
    1/
      model.plan
config.pbtxt Configuration
name: "resnet50"
platform: "onnxruntime_onnx"
max_batch_size: 32
input [
  {
    name: "input"
    data_type: TYPE_FP32
    dims: [3, 224, 224]
  }
]
output [
  {
    name: "output"
    data_type: TYPE_FP32
    dims: [1000]
  }
]
dynamic_batching {
  preferred_batch_size: [8, 16, 32]
  max_queue_delay_microseconds: 100
}
instance_group [
  {
    count: 2
    kind: KIND_GPU
  }
]
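The `dynamic_batching` settings above trade a little latency for larger batches. The following is a simplified model of that trade-off, not Triton's actual scheduler: requests are grouped until the batch is full or the oldest queued request has waited longer than the delay budget. `form_batches` and its arguments are hypothetical names for illustration.

```python
def form_batches(arrivals_us, max_batch_size=32, max_delay_us=100):
    """Group sorted request arrival times (microseconds) into batches.

    A batch is dispatched when it is full, or when a new request arrives
    after the oldest queued request has already waited max_delay_us.
    """
    batches, current = [], []
    for t in arrivals_us:
        # Flush if the oldest queued request has exceeded its delay budget
        if current and t - current[0] > max_delay_us:
            batches.append(current)
            current = []
        current.append(t)
        if len(current) == max_batch_size:
            batches.append(current)
            current = []
    if current:
        batches.append(current)
    return batches


if __name__ == "__main__":
    # Three requests arrive close together, two more arrive much later
    print(form_batches([0, 10, 20, 500, 510], max_batch_size=4))
```

A longer delay budget produces fuller batches and better GPU utilization, at the cost of added queueing time for the first request in each batch.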
FastAPI Wrapping
FastAPI is well-suited for simple model serving scenarios.
from fastapi import FastAPI
import torch
from pydantic import BaseModel

app = FastAPI()

# Load model once at startup
model = torch.jit.load("model.pt")
model.eval()


class PredictionRequest(BaseModel):
    features: list[float]


class PredictionResponse(BaseModel):
    prediction: int  # index of the predicted class
    confidence: float


# A plain `def` endpoint runs in FastAPI's thread pool, so the blocking
# forward pass does not stall the event loop
@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
    tensor = torch.tensor([request.features])
    with torch.no_grad():
        output = model(tensor)
    prob = torch.softmax(output, dim=1)
    prediction = torch.argmax(prob, dim=1).item()
    confidence = prob[0][prediction].item()
    return PredictionResponse(
        prediction=prediction,
        confidence=confidence,
    )


@app.get("/health")
async def health():
    return {"status": "healthy"}
6. Special Topic: LLM Serving
vLLM - High-Performance LLM Serving
vLLM uses PagedAttention, which manages the KV cache in fixed-size blocks much like virtual-memory pages, to cut memory waste and raise serving throughput.
from vllm import LLM, SamplingParams

# Load model (tensor_parallel_size=2 splits the model across two GPUs)
llm = LLM(
    model="meta-llama/Llama-3.1-8B-Instruct",
    tensor_parallel_size=2,
    gpu_memory_utilization=0.9,
    max_model_len=8192,
)
# Inference
sampling_params = SamplingParams(
    temperature=0.7,
    top_p=0.9,
    max_tokens=512,
)
outputs = llm.generate(
    ["Explain MLOps in simple terms."],
    sampling_params,
)
print(outputs[0].outputs[0].text)
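A back-of-the-envelope calculation shows why block-wise KV cache allocation matters. The sketch below assumes the published Llama-3.1-8B shape (32 layers, 8 KV heads, head dimension 128) with an FP16 cache and a block size of 16 tokens; the block size and both helper functions are illustrative assumptions, not values read from vLLM.

```python
import math


def kv_bytes_per_token(num_layers, num_kv_heads, head_dim, dtype_bytes=2):
    """Bytes of KV cache one token occupies: a key and a value per layer."""
    return 2 * num_layers * num_kv_heads * head_dim * dtype_bytes


def blocks_needed(num_tokens, block_size=16):
    """Number of fixed-size cache blocks a sequence of num_tokens occupies."""
    return math.ceil(num_tokens / block_size)


# Assumed Llama-3.1-8B shape: 32 layers, 8 KV heads, head dimension 128, FP16
per_token = kv_bytes_per_token(32, 8, 128)
seq_len = 1000
paged = blocks_needed(seq_len) * 16 * per_token  # allocate only the blocks in use
reserved = 8192 * per_token  # reserve max_model_len up front
print(f"{per_token / 1024:.0f} KiB per token")
print(f"paged: {paged / 2**20:.0f} MiB, reserved up front: {reserved / 2**20:.0f} MiB")
```

Under these assumptions, a 1,000-token request allocated block by block uses roughly an eighth of the memory that reserving the full 8,192-token context would, which leaves room to batch more concurrent requests.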
vLLM OpenAI-Compatible Server
python -m vllm.entrypoints.openai.api_server \
  --model meta-llama/Llama-3.1-8B-Instruct \
  --tensor-parallel-size 2 \
  --port 8000
# Call the server with the OpenAI SDK
from openai import OpenAI

# The local server does not check the key unless one is configured; any placeholder works
client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy")
response = client.chat.completions.create(
    model="meta-llama/Llama-3.1-8B-Instruct",
    messages=[
        {"role": "user", "content": "What is MLOps?"}
    ],
    max_tokens=256,
)
print(response.choices[0].message.content)
Text Generation Inference (TGI)
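Hugging Face's TGI pairs a Rust-based router with Python model servers and is widely used in production.
# --quantize gptq expects a checkpoint that already contains GPTQ weights;
# drop the flag (or point --model-id at a GPTQ repository) otherwise
docker run --gpus all \
  -p 8080:80 \
  -v /data:/data \
  ghcr.io/huggingface/text-generation-inference:latest \
  --model-id meta-llama/Llama-3.1-8B-Instruct \
  --quantize gptq \
  --max-input-length 4096 \
  --max-total-tokens 8192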
Quantization Techniques Compared
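Quantization stores weights at lower precision, which shrinks an LLM's memory footprint and often speeds up inference. The figures below are rough guides relative to FP16; actual results depend on the model, hardware, and kernel support.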
| Technique | Size Reduction | Quality Loss | Speed Gain | Notes |
|---|---|---|---|---|
| GPTQ | 4x | Low | 2-3x | GPU-optimized, good accuracy |
| AWQ | 4x | Very low | 2-3x | Activation-aware; often preserves accuracy better than GPTQ |
| GGUF | 2-8x | Variable | Varies (runs on CPU) | For llama.cpp, CPU/GPU hybrid |
| BitsAndBytes | 2-4x | Low | 1.5x | Easy to apply, QLoRA training |
# BitsAndBytes 4-bit quantization
import torch
from transformers import AutoModelForCausalLM, BitsAndBytesConfig

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
)
model = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Llama-3.1-70B-Instruct",
    quantization_config=quantization_config,
    device_map="auto",
)
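The size reductions in the table follow from simple arithmetic on bits per weight. This sketch estimates weight memory only; it ignores quantization metadata (scales and zero points), the KV cache, and activations, so real usage is somewhat higher. `weight_memory_gb` is a hypothetical helper.

```python
def weight_memory_gb(num_params: float, bits_per_weight: float) -> float:
    """Approximate memory for model weights alone, in gigabytes (10^9 bytes)."""
    return num_params * bits_per_weight / 8 / 1e9


for bits in (16, 8, 4):
    print(f"70B parameters at {bits}-bit: {weight_memory_gb(70e9, bits):.0f} GB")
```

By this estimate, 4-bit quantization brings a 70B-parameter model from about 140 GB of weights down to about 35 GB.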
Ollama - Local LLM
Ollama is convenient for development environments or small-scale deployments.
# Download and run a model
ollama pull llama3.1
ollama run llama3.1
# API server mode
ollama serve
import requests

response = requests.post("http://localhost:11434/api/generate", json={
    "model": "llama3.1",
    "prompt": "Explain MLOps briefly.",
    "stream": False,
})
print(response.json()["response"])
7. CI/CD for ML
Model Testing Strategy
ML models require different testing approaches compared to traditional software.
import os
import time

import pytest

# model, test_data, sample_input, fairness_data, and model_path are assumed to be
# pytest fixtures; evaluate() is assumed to return accuracy as a float


class TestModelQuality:
    """Model quality tests"""

    def test_accuracy_threshold(self, model, test_data):
        """Verify accuracy meets threshold"""
        accuracy = evaluate(model, test_data)
        assert accuracy >= 0.90, f"Accuracy {accuracy} below threshold 0.90"

    def test_latency(self, model, sample_input):
        """Check inference latency"""
        start = time.perf_counter()  # monotonic, high-resolution timer
        model.predict(sample_input)
        latency = time.perf_counter() - start
        assert latency < 0.1, f"Latency {latency}s exceeds 100ms"

    def test_no_bias(self, model, fairness_data):
        """Fairness verification"""
        results_group_a = model.predict(fairness_data["group_a"])
        results_group_b = model.predict(fairness_data["group_b"])
        disparity = abs(results_group_a.mean() - results_group_b.mean())
        assert disparity < 0.05, f"Bias detected: disparity={disparity}"

    def test_model_size(self, model_path):
        """Model size limit"""
        size_mb = os.path.getsize(model_path) / (1024 * 1024)
        assert size_mb < 500, f"Model size {size_mb}MB exceeds 500MB limit"
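A single timed call is noisy, so a latency test is usually more stable when it measures several calls and checks a percentile. The sketch below uses only the standard library; `fake_predict` is a hypothetical stand-in for `model.predict`, and the warmup and run counts are arbitrary choices.

```python
import statistics
import time


def measure_latencies(fn, arg, warmup=5, runs=50):
    """Call fn(arg) repeatedly and return per-call latencies in seconds."""
    for _ in range(warmup):
        fn(arg)  # warm caches and lazy initialization before timing
    latencies = []
    for _ in range(runs):
        start = time.perf_counter()
        fn(arg)
        latencies.append(time.perf_counter() - start)
    return latencies


def p95(latencies):
    """95th percentile of a list of latencies."""
    return statistics.quantiles(latencies, n=100)[94]


def fake_predict(x):
    """Hypothetical stand-in for model.predict."""
    return sum(v * v for v in x)


if __name__ == "__main__":
    lat = measure_latencies(fake_predict, list(range(1000)))
    assert p95(lat) < 0.1, f"p95 latency {p95(lat):.4f}s exceeds 100ms"
```

Asserting on p95 rather than on one sample makes the test less likely to fail because of a single slow call on a shared CI runner.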
ML Pipeline with GitHub Actions
name: ML Pipeline
on:
  push:
    paths:
      - "src/**"
      - "data/**"
      - "configs/**"
jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Validate data
        run: python scripts/validate_data.py
  train:
    needs: data-validation
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v4
      - name: Train model
        run: python scripts/train.py --config configs/production.yaml
      - name: Upload model artifact
        uses: actions/upload-artifact@v4
        with:
          name: model
          path: outputs/model/
  evaluate:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Download model
        uses: actions/download-artifact@v4
        with:
          name: model
          path: outputs/model/ # restore to the path it was uploaded from
      - name: Run evaluation
        run: python scripts/evaluate.py
      - name: Check quality gates
        run: python scripts/quality_gate.py
  deploy:
    needs: evaluate
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4 # needed so the deploy scripts exist on the runner
      - name: Deploy to staging
        run: ./scripts/deploy.sh staging
      - name: Smoke test
        run: python scripts/smoke_test.py
      - name: Deploy to production
        run: ./scripts/deploy.sh production
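The workflow calls `scripts/quality_gate.py` without showing it. One possible shape for such a script is sketched below: it reads a metrics JSON file written by the evaluation step and exits non-zero when any metric falls short, which fails the CI job. The metric names, thresholds, and file format are all assumptions for illustration.

```python
import json
import sys

# Hypothetical thresholds; tune them to your own model and metrics
THRESHOLDS = {"accuracy": 0.90, "f1_score": 0.85}


def failed_gates(metrics: dict, thresholds: dict) -> list[str]:
    """Return the names of metrics that are missing or below their threshold."""
    return [
        name
        for name, minimum in thresholds.items()
        if metrics.get(name, float("-inf")) < minimum
    ]


def main(path: str) -> int:
    """Check the metrics file at `path`; return a process exit code."""
    with open(path) as f:
        metrics = json.load(f)
    failures = failed_gates(metrics, THRESHOLDS)
    for name in failures:
        print(f"Quality gate failed: {name}={metrics.get(name)} < {THRESHOLDS[name]}")
    return 1 if failures else 0  # a non-zero exit code fails the CI job


if __name__ == "__main__" and len(sys.argv) > 1:
    sys.exit(main(sys.argv[1]))
```

Treating a missing metric as a failure is a deliberate choice here: an evaluation step that silently stops reporting a metric should block deployment rather than pass by default.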
Data Validation Pipeline
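import pandas as pd

REQUIRED_COLUMNS = ["label"]  # hypothetical; list the columns your model needs


def validate_training_data(data_path: str) -> bool:
    """Validate training data quality; raise ValueError on failure"""
    df = pd.read_parquet(data_path)
    # Check required columns first, since later checks index into them
    missing = [col for col in REQUIRED_COLUMNS if col not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    checks = {
        # Check data size
        "min_rows": len(df) >= 10000,
        # Null ratio check
        "null_ratio": df.isnull().mean().max() < 0.05,
        # Label distribution check
        "label_balance": df["label"].value_counts(normalize=True).min() > 0.1,
        # Duplicate check
        "duplicates": df.duplicated().mean() < 0.01,
    }
    failed = [name for name, passed in checks.items() if not passed]
    if failed:
        raise ValueError(f"Data validation failed at checks: {failed}")
    return True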
8. Monitoring
Model Performance Monitoring
Model performance can degrade over time after deployment.
import time

from prometheus_client import Gauge, Histogram, Counter

# Define metrics
prediction_latency = Histogram(
    "model_prediction_latency_seconds",
    "Prediction latency in seconds",
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)
prediction_count = Counter(
    "model_prediction_total",
    "Total number of predictions",
    ["model_version", "status"]
)
model_accuracy = Gauge(
    "model_accuracy",
    "Current model accuracy",
    ["model_name"]
)


# Usage
def predict_with_monitoring(model, input_data):
    start = time.perf_counter()
    try:
        result = model.predict(input_data)
        prediction_count.labels(
            model_version="v2.1",
            status="success"
        ).inc()
        return result
    except Exception:
        prediction_count.labels(
            model_version="v2.1",
            status="error"
        ).inc()
        raise
    finally:
        # Recorded for both successful and failed predictions
        latency = time.perf_counter() - start
        prediction_latency.observe(latency)
Data Drift Detection
When the distribution of input data shifts away from what the model saw during training, model performance can degrade.
# Uses the Report / ColumnMapping API of older Evidently releases;
# newer releases may organize these imports differently
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset

# Generate drift report
report = Report(metrics=[
    DataDriftPreset(),
    TargetDriftPreset(),
])
column_mapping = ColumnMapping(
    target="label",
    prediction="prediction",
    numerical_features=["feature_1", "feature_2", "feature_3"],
    categorical_features=["category", "region"],
)
# training_data and production_data are assumed to be pandas DataFrames
report.run(
    reference_data=training_data,
    current_data=production_data,
    column_mapping=column_mapping,
)
# Check drift detection results
drift_results = report.as_dict()
if drift_results["metrics"][0]["result"]["dataset_drift"]:
    trigger_retraining_pipeline()
Concept Drift
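Data distributions are not the only thing that can change: the relationship between inputs and outputs can itself shift. For example, consumer behavior patterns changed dramatically before and after COVID. Detecting concept drift directly requires ground-truth labels, which often arrive late, so the detector below uses a shift in the distribution of model predictions, measured with PSI, as an early proxy.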
import numpy as np


class ConceptDriftDetector:
    """PSI (Population Stability Index) based drift detection"""

    def __init__(self, threshold=0.2):
        self.threshold = threshold

    def calculate_psi(self, expected, actual, bins=10):
        """Calculate PSI between a reference sample and a current sample"""
        # Bin edges are the percentiles of the reference distribution
        breakpoints = np.percentile(expected, np.linspace(0, 100, bins + 1))
        # Clip so that values outside the reference range land in the edge bins
        actual = np.clip(actual, breakpoints[0], breakpoints[-1])
        expected_counts = np.histogram(expected, breakpoints)[0] / len(expected)
        actual_counts = np.histogram(actual, breakpoints)[0] / len(actual)
        # Prevent division by zero
        expected_counts = np.clip(expected_counts, 1e-6, None)
        actual_counts = np.clip(actual_counts, 1e-6, None)
        psi = np.sum(
            (actual_counts - expected_counts) *
            np.log(actual_counts / expected_counts)
        )
        return psi

    def check_drift(self, reference_predictions, current_predictions):
        psi = self.calculate_psi(reference_predictions, current_predictions)
        if psi > self.threshold:
            return {"drift_detected": True, "psi": psi, "action": "retrain"}
        elif psi > self.threshold / 2:
            return {"drift_detected": False, "psi": psi, "action": "monitor"}
        else:
            return {"drift_detected": False, "psi": psi, "action": "none"}
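A small worked example makes the PSI formula, the sum over bins of (actual - expected) * ln(actual / expected), easier to check by hand. The bin proportions below are made-up numbers, and `psi` is a standard-library restatement of the same formula for illustration.

```python
import math


def psi(expected_props, actual_props, eps=1e-6):
    """Population Stability Index between two lists of bin proportions."""
    total = 0.0
    for e, a in zip(expected_props, actual_props):
        e, a = max(e, eps), max(a, eps)  # avoid log(0) and division by zero
        total += (a - e) * math.log(a / e)
    return total


reference = [0.25, 0.25, 0.25, 0.25]  # hypothetical reference bin shares
current = [0.40, 0.30, 0.20, 0.10]  # hypothetical production bin shares
print(f"PSI = {psi(reference, current):.3f}")
```

With these numbers the PSI comes out at about 0.23, just above the 0.2 threshold used by the detector, so this shift would trigger the "retrain" action. Identical distributions give a PSI of exactly 0.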
Grafana Dashboard Setup
Grafana is ideal for visualizing monitoring metrics. You can create dashboards from metrics collected by Prometheus.
Key monitoring panels:
- Request Throughput: Prediction requests per second
- Latency Distribution: p50, p95, p99 latency
- Error Rate: Prediction failure ratio
- Model Accuracy: Real-time performance metrics
- Data Drift Score: PSI, KL Divergence
- Resource Usage: GPU memory, CPU, network
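The p50, p95, and p99 panels are typically computed from histogram buckets rather than from raw observations. The sketch below shows the underlying idea: find the bucket that contains the target rank and interpolate linearly inside it. It is a simplified illustration in the spirit of PromQL's `histogram_quantile`, without `+Inf` bucket handling; the bucket counts are made-up.

```python
def estimate_quantile(q, buckets):
    """Estimate a quantile from cumulative histogram buckets.

    buckets: list of (upper_bound_seconds, cumulative_count), sorted by bound.
    Assumes observations are spread evenly inside each bucket.
    """
    total = buckets[-1][1]
    rank = q * total
    lower_bound, lower_count = 0.0, 0
    for upper_bound, count in buckets:
        if count >= rank and count > lower_count:
            fraction = (rank - lower_count) / (count - lower_count)
            return lower_bound + (upper_bound - lower_bound) * fraction
        lower_bound, lower_count = upper_bound, count
    return buckets[-1][0]


# Hypothetical cumulative counts: 50 requests <= 0.1s, 90 <= 0.5s, 100 <= 1.0s
buckets = [(0.1, 50), (0.5, 90), (1.0, 100)]
for q in (0.5, 0.95, 0.99):
    print(f"p{int(q * 100)} ~ {estimate_quantile(q, buckets):.2f}s")
```

Because the estimate interpolates within a bucket, its accuracy depends on the bucket boundaries; placing boundaries near your latency objective gives more trustworthy percentile panels.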
9. A/B Testing and Deployment Strategies
Canary Deployment
A canary deployment first routes only a small share of traffic to the new model and widens the share once it proves stable.
# Traffic splitting with Istio VirtualService
# (subsets v1 and v2 must be defined in a matching DestinationRule)
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: model-serving
spec:
  hosts:
    - model-service
  http:
    # Requests with the header "canary: true" always go to v2
    - match:
        - headers:
            canary:
              exact: "true"
      route:
        - destination:
            host: model-service
            subset: v2
    # All other traffic is split 90/10
    - route:
        - destination:
            host: model-service
            subset: v1
          weight: 90
        - destination:
            host: model-service
            subset: v2
          weight: 10
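Weight-based routes split requests, not users, so the same user may see both versions. When consistent assignment matters, one common alternative is to hash a stable identifier at the application layer. The sketch below is an illustration of that idea, not something Istio does for you; `assign_variant` and the `user-N` IDs are hypothetical.

```python
import hashlib


def assign_variant(user_id: str, canary_percent: int = 10) -> str:
    """Deterministically map a user to 'v1' or 'v2' by hashing the user ID."""
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % 100  # bucket in 0..99
    return "v2" if bucket < canary_percent else "v1"


users = [f"user-{i}" for i in range(10_000)]
share = sum(assign_variant(u) == "v2" for u in users) / len(users)
print(f"canary share: {share:.1%}")
```

The same user ID always maps to the same variant, and raising `canary_percent` only moves users from v1 to v2, never the other way.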
Shadow Mode
The new model runs inference on real traffic, but results are only stored and not returned to users. This allows safe validation by comparing against the existing model's results.
import hashlib
import logging
from datetime import datetime, timezone

logger = logging.getLogger(__name__)


class ShadowModelRouter:
    def __init__(self, primary_model, shadow_model, metrics_store):
        self.primary = primary_model
        self.shadow = shadow_model
        # Any object with an async insert(dict) method (assumed interface)
        self.metrics_store = metrics_store

    async def predict(self, input_data):
        # Primary model result (returned to user)
        primary_result = await self.primary.predict(input_data)
        # Shadow model result (stored for comparison)
        try:
            shadow_result = await self.shadow.predict(input_data)
            await self.log_comparison(
                input_data, primary_result, shadow_result
            )
        except Exception:
            # Shadow failures must never affect the user-facing response
            logger.exception("Shadow model failed")
        return primary_result

    async def log_comparison(self, input_data, primary, shadow):
        comparison = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            # hashlib gives a hash that is stable across processes, unlike hash()
            "input_hash": hashlib.sha256(str(input_data).encode()).hexdigest(),
            "primary_prediction": primary,
            "shadow_prediction": shadow,
            "agreement": primary == shadow,
        }
        await self.metrics_store.insert(comparison)
Multi-Armed Bandit
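A multi-armed bandit allocates traffic dynamically instead of holding a fixed split for the whole experiment, as a classic A/B test does. It automatically routes more traffic to better-performing models, so fewer requests are spent on the weaker variant.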
import numpy as np


class ThompsonSamplingRouter:
    """Thompson Sampling based model router"""

    def __init__(self, model_names):
        self.models = model_names
        # Beta distribution parameters (alpha, beta); 1 and 1 give a uniform prior
        self.successes = {name: 1 for name in model_names}
        self.failures = {name: 1 for name in model_names}

    def select_model(self):
        """Select a model via Thompson Sampling"""
        samples = {}
        for name in self.models:
            samples[name] = np.random.beta(
                self.successes[name],
                self.failures[name]
            )
        return max(samples, key=samples.get)

    def update(self, model_name, success: bool):
        """Update the distribution based on results"""
        if success:
            self.successes[model_name] += 1
        else:
            self.failures[model_name] += 1

    def get_allocation(self):
        """Share of recorded outcomes per model (includes the initial prior counts)"""
        total = sum(self.successes[m] + self.failures[m] for m in self.models)
        return {
            m: (self.successes[m] + self.failures[m]) / total
            for m in self.models
        }
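To see the allocation shift in practice, the following self-contained simulation runs the same Thompson Sampling logic against two simulated models with fixed success rates. It uses the standard library's `random.betavariate` instead of NumPy; the success rates, round count, and seed are arbitrary assumptions.

```python
import random


def simulate(true_rates, rounds=2000, seed=0):
    """Run Thompson Sampling against simulated models with fixed success rates."""
    rng = random.Random(seed)
    successes = {m: 1 for m in true_rates}  # Beta(1, 1) prior
    failures = {m: 1 for m in true_rates}
    pulls = {m: 0 for m in true_rates}
    for _ in range(rounds):
        # Sample a plausible success rate per model and pick the highest
        samples = {m: rng.betavariate(successes[m], failures[m]) for m in true_rates}
        chosen = max(samples, key=samples.get)
        pulls[chosen] += 1
        # Simulate the outcome of serving this request with the chosen model
        if rng.random() < true_rates[chosen]:
            successes[chosen] += 1
        else:
            failures[chosen] += 1
    return pulls


pulls = simulate({"model_a": 0.30, "model_b": 0.60})
print(pulls)
```

Early on both models receive traffic, but as evidence accumulates most requests flow to the model with the higher success rate.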
10. Cost Optimization
Leveraging Spot Instances
Training jobs that checkpoint regularly can tolerate interruption, so running them on spot instances can often cut costs by 70% or more compared with on-demand pricing.
# AWS SageMaker spot training
import sagemaker

estimator = sagemaker.estimator.Estimator(
    image_uri="my-training-image:latest",
    role="arn:aws:iam::ACCOUNT:role/SageMakerRole",
    instance_count=4,
    instance_type="ml.p4d.24xlarge",
    use_spot_instances=True,
    max_wait=7200,  # Maximum wait time in seconds (must be >= max_run)
    max_run=3600,   # Maximum run time in seconds
    checkpoint_s3_uri="s3://bucket/checkpoints/",
)
estimator.fit({"training": "s3://bucket/data/"})
The key is checkpoint saving. You must be able to resume from a checkpoint when a spot instance is interrupted.
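The resume logic can be sketched with the standard library alone. This is a minimal illustration that stores only an epoch counter in a JSON file; a real training job would save model and optimizer state (for example with `torch.save`) to the local directory that the platform syncs to `checkpoint_s3_uri`. All function names here are hypothetical.

```python
import json
import os
import tempfile


def save_checkpoint(path, state):
    """Write the checkpoint atomically so an interruption never leaves a partial file."""
    directory = os.path.dirname(path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        json.dump(state, f)
    os.replace(tmp_path, path)  # atomic rename on the same filesystem


def load_checkpoint(path):
    """Return the saved state, or a fresh state if no checkpoint exists."""
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    return {"epoch": 0}


def train(path, num_epochs):
    """Resume from the last completed epoch and checkpoint after each one."""
    state = load_checkpoint(path)
    for epoch in range(state["epoch"], num_epochs):
        # ... one epoch of training would run here ...
        save_checkpoint(path, {"epoch": epoch + 1})
    return load_checkpoint(path)
```

Writing to a temporary file and renaming it means that a spot interruption in the middle of a save leaves the previous checkpoint intact.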
Model Compression
Methods to reduce production serving costs:
Knowledge Distillation
import torch.nn.functional as F


class DistillationTrainer:
    def __init__(self, teacher, student, temperature=3.0, alpha=0.5):
        self.teacher = teacher
        self.student = student
        self.temperature = temperature
        self.alpha = alpha

    def distillation_loss(self, student_logits, teacher_logits, labels):
        # teacher_logits should come from a forward pass under torch.no_grad()
        # Soft target loss (KD loss)
        soft_targets = F.softmax(teacher_logits / self.temperature, dim=1)
        soft_loss = F.kl_div(
            F.log_softmax(student_logits / self.temperature, dim=1),
            soft_targets,
            reduction="batchmean"
        ) * (self.temperature ** 2)  # rescale so gradients match the hard loss
        # Hard target loss
        hard_loss = F.cross_entropy(student_logits, labels)
        return self.alpha * soft_loss + (1 - self.alpha) * hard_loss
ONNX Conversion for Inference Optimization
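import torch
import onnx
import onnxruntime as ort

# PyTorch -> ONNX conversion (`model` is assumed to be a trained torch.nn.Module)
model.eval()
dummy_input = torch.randn(1, 3, 224, 224)
torch.onnx.export(
    model,
    dummy_input,
    "model.onnx",
    input_names=["input"],
    output_names=["output"],
    dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},
    opset_version=17,
)
# Verify that the exported graph is well-formed
onnx.checker.check_model("model.onnx")
# Inference with ONNX Runtime (falls back to CPU if CUDA is unavailable)
session = ort.InferenceSession(
    "model.onnx",
    providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
)
input_array = dummy_input.numpy()
result = session.run(None, {"input": input_array})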
Batch Inference vs Real-Time Inference
| Aspect | Batch Inference | Real-Time Inference |
|---|---|---|
| Latency | Minutes to hours | Milliseconds to seconds |
| Throughput | Very high | Medium |
| Cost | Low (spot instances) | High (always on) |
| Best For | Recommendation systems, reports | Chatbots, search |
# Batch inference example (Spark)
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = SparkSession.builder.getOrCreate()


# Register model inference as a UDF
# (`model` is assumed to be a picklable object loaded on the driver)
@udf("float")
def predict_udf(features):
    return float(model.predict([features])[0])


# Run batch inference on large dataset
df = spark.read.parquet("s3://bucket/daily-data/")
predictions = df.withColumn("prediction", predict_udf("features"))
predictions.write.parquet("s3://bucket/predictions/")
11. Hands-On: Building an LLM RAG Pipeline
RAG (Retrieval-Augmented Generation) is a key pattern for working around LLM limitations such as stale or missing domain knowledge. It searches an external knowledge base and passes the retrieved text to the LLM as context.
Overall Architecture
- Document Collection - Gather data from various sources
- Chunking - Split documents into appropriate sizes
- Embedding - Convert text to vectors
- Vector DB Storage - Store in a vector index
- Retrieval - Search for similar documents for a query
- Generation - Pass search results as context to the LLM
Document Collection and Chunking
# These import paths match older LangChain releases; newer ones expose the same
# classes from langchain_text_splitters and langchain_community
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import (
    PyPDFLoader,
    WebBaseLoader,
    NotionDirectoryLoader,
)

# Load documents from various sources
pdf_docs = PyPDFLoader("docs/manual.pdf").load()
web_docs = WebBaseLoader("https://docs.example.com").load()
notion_docs = NotionDirectoryLoader("notion_export/").load()
all_docs = pdf_docs + web_docs + notion_docs
# Chunking
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", ".", " "],
)
chunks = text_splitter.split_documents(all_docs)
print(f"Total chunks: {len(chunks)}")
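The effect of `chunk_size` and `chunk_overlap` is easiest to see with a stripped-down splitter. The sketch below uses fixed-size character windows only; it is a simplification and does not reproduce how `RecursiveCharacterTextSplitter` prefers to break on separators. `chunk_text` is a hypothetical helper.

```python
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> list[str]:
    """Split text into fixed-size character windows that overlap by `overlap`."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start : start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reaches the end of the text
    return chunks


if __name__ == "__main__":
    print(chunk_text("abcdefghij", chunk_size=4, overlap=2))
```

Each chunk repeats the tail of the previous one, so a sentence that straddles a boundary still appears whole in at least one chunk as long as it is shorter than the overlap.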
Embedding and Vector DB
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma

# Embedding model
embedding_model = HuggingFaceEmbeddings(
    model_name="BAAI/bge-large-en-v1.5",
    model_kwargs={"device": "cuda"},
    encode_kwargs={"normalize_embeddings": True},
)
# Store in Chroma vector DB
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embedding_model,
    persist_directory="./chroma_db",
    collection_name="knowledge_base",
)
# Similarity search
results = vectorstore.similarity_search(
    "How to deploy a model to production?",
    k=5,
)
Assembling the RAG Pipeline
from langchain.chains import RetrievalQA
from langchain.llms import VLLM
from langchain.prompts import PromptTemplate

# LLM setup
llm = VLLM(
    model="meta-llama/Llama-3.1-8B-Instruct",
    tensor_parallel_size=1,
    max_new_tokens=512,
    temperature=0.1,
)
# Prompt template
prompt_template = PromptTemplate(
    template="""Answer the question using the following context.
If the information is not in the context, say "I don't know."
Context:
{context}
Question: {question}
Answer:""",
    input_variables=["context", "question"],
)
# RAG chain
rag_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    # MMR fetches fetch_k candidates, then keeps k that are relevant but diverse
    retriever=vectorstore.as_retriever(
        search_type="mmr",
        search_kwargs={"k": 5, "fetch_k": 20},
    ),
    chain_type_kwargs={"prompt": prompt_template},
    return_source_documents=True,
)
# Query
result = rag_chain.invoke("What is the production deployment procedure?")
print(result["result"])
for doc in result["source_documents"]:
    print(f"  Source: {doc.metadata['source']}")
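The retriever above uses `search_type="mmr"` (Maximal Marginal Relevance), which selects documents that are relevant to the query but not redundant with each other. The sketch below shows the selection rule on toy 2-D vectors; it is an illustration of the technique, not LangChain's implementation, and the vectors and function names are made up.

```python
import math


def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def mmr(query, candidates, k=2, lambda_mult=0.5):
    """Pick k candidate indices, trading off relevance against redundancy."""
    selected = []
    remaining = list(range(len(candidates)))
    while remaining and len(selected) < k:
        def score(i):
            relevance = cosine(query, candidates[i])
            # Similarity to the closest document that was already selected
            redundancy = max(
                (cosine(candidates[i], candidates[j]) for j in selected),
                default=0.0,
            )
            return lambda_mult * relevance - (1 - lambda_mult) * redundancy
        best = max(remaining, key=score)
        selected.append(best)
        remaining.remove(best)
    return selected


# Toy 2-D "embeddings": candidates 0 and 1 are near-duplicates
query = [1.0, 0.2]
candidates = [[1.0, 0.0], [0.99, 0.05], [0.5, 0.8]]
print(mmr(query, candidates, k=2))
```

With `lambda_mult=0.5`, the second pick skips the near-duplicate in favor of the more distinct candidate; setting `lambda_mult=1.0` reduces the rule to plain similarity ranking.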
Production RAG Serving
import time

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class QueryRequest(BaseModel):
    question: str
    top_k: int = 5  # accepted for forward compatibility; not yet wired into the retriever


class QueryResponse(BaseModel):
    answer: str
    sources: list[str]
    latency_ms: float


# A plain `def` endpoint runs in FastAPI's thread pool, so the blocking
# chain call does not stall the event loop
@app.post("/query", response_model=QueryResponse)
def query(request: QueryRequest):
    start = time.perf_counter()
    result = rag_chain.invoke(request.question)
    # Deduplicate the sources of the retrieved chunks
    sources = list(set(
        doc.metadata.get("source", "unknown")
        for doc in result["source_documents"]
    ))
    latency = (time.perf_counter() - start) * 1000
    return QueryResponse(
        answer=result["result"],
        sources=sources,
        latency_ms=round(latency, 2),
    )
Conclusion
MLOps is an essential discipline for reliably operating ML models in production.
Here are the key takeaways:
- Start with experiment management - Even adopting MLflow alone makes a huge difference.
- Invest in automation - Manual processes will inevitably fail.
- Monitoring is non-negotiable - The real work begins after deployment.
- Start small - Going from Google Level 0 to Level 1 is the most important step.
- Evolve for the LLM era - Actively adopt new tools like vLLM and RAG.
As a rule of thumb, training a model accounts for only about 20% of the total effort. The remaining 80% goes to data pipelines, serving, monitoring, and operations. MLOps is the methodology for systematically managing that 80%.
MLOps Self-Check Checklist
Experiment Management
- Are you tracking all experiment hyperparameters and metrics?
- Can you reproduce experiment results?
Data Pipelines
- Are you version-controlling your data?
- Is data quality validation automated?
Training Infrastructure
- Is your training pipeline automated?
- Are you saving checkpoints?
Serving
- Are you monitoring model serving latency?
- Are you using a deployment strategy that supports rollback?
Monitoring
- Are you detecting data drift?
- Do you get alerts when model performance degrades?
Cost
- Are you leveraging spot instances?
- Have you considered model compression?