- Authors

- Name
- Youngju Kim
- @fjvbn20031
목차
- MLOps란 무엇인가
- 실험 관리
- 데이터 파이프라인
- 모델 학습 인프라
- 모델 서빙
- LLM 서빙 특별편
- CI/CD for ML
- 모니터링
- A/B 테스트와 배포 전략
- 비용 최적화
- 실전: LLM RAG 파이프라인 구축
1. MLOps란 무엇인가
DevOps와 ML의 만남
MLOps는 Machine Learning과 Operations의 합성어다. 소프트웨어 엔지니어링의 DevOps 원칙을 머신러닝 라이프사이클에 적용한 것이라고 보면 된다.
전통적인 소프트웨어 개발에서는 코드만 관리하면 됐다. 하지만 ML 시스템에서는 코드, 데이터, 모델 세 가지를 동시에 관리해야 한다. 데이터가 바뀌면 모델이 바뀌고, 모델이 바뀌면 서빙 방식도 바뀔 수 있다.
ML 라이프사이클
ML 프로젝트의 전체 흐름은 다음과 같다.
- 문제 정의 - 비즈니스 요구사항을 ML 문제로 변환
- 데이터 수집 및 전처리 - 데이터 파이프라인 구축
- 피처 엔지니어링 - 원시 데이터를 모델이 이해할 수 있는 형태로 변환
- 모델 학습 - 다양한 알고리즘과 하이퍼파라미터 실험
- 모델 평가 - 오프라인 메트릭으로 성능 검증
- 모델 배포 - 프로덕션 환경에 서빙
- 모니터링 - 성능 추적과 드리프트 감지
- 재학습 - 성능 저하 시 모델 업데이트
이 과정은 일회성이 아니라 지속적인 순환이다.
Google의 MLOps 성숙도 모델
Google은 MLOps 성숙도를 세 단계로 정의한다.
Level 0 - 수동 프로세스
- 데이터 과학자가 노트북에서 수동으로 학습
- 모델 배포가 수동이고 비정기적
- 모니터링 없음
- 대부분의 팀이 여기에 해당
Level 1 - ML 파이프라인 자동화
- 학습 파이프라인이 자동화됨
- 지속적 학습(Continuous Training) 구현
- 실험 추적 시스템 도입
- 모델 레지스트리 사용
Level 2 - CI/CD 파이프라인 자동화
- 파이프라인 자체의 CI/CD 구현
- 자동화된 테스트와 검증
- 피처 스토어 활용
- 완전한 모니터링과 알림 체계
대부분의 조직이 Level 0에서 Level 1으로 가는 것만으로도 큰 가치를 얻을 수 있다.
2. 실험 관리
MLflow - 오픈소스 ML 플랫폼
MLflow는 ML 실험 관리의 사실상 표준이다. 네 가지 핵심 컴포넌트로 구성된다.
MLflow Tracking - 실험 기록
import mlflow
mlflow.set_experiment("recommendation-model-v2")
with mlflow.start_run():
# 하이퍼파라미터 기록
mlflow.log_param("learning_rate", 0.001)
mlflow.log_param("batch_size", 64)
mlflow.log_param("epochs", 50)
# 학습 실행
model = train_model(lr=0.001, batch_size=64, epochs=50)
# 메트릭 기록
mlflow.log_metric("accuracy", 0.923)
mlflow.log_metric("f1_score", 0.891)
mlflow.log_metric("auc_roc", 0.956)
# 모델 저장
mlflow.pytorch.log_model(model, "model")
MLflow Models - 모델 패키징
# 모델 레지스트리에 등록
mlflow.register_model(
"runs:/abc123/model",
"recommendation-model"
)
# 스테이지 전환
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
name="recommendation-model",
version=3,
stage="Production"
)
Weights and Biases
W&B는 상용 실험 관리 플랫폼으로, 시각화 기능이 뛰어나다.
import wandb
wandb.init(project="image-classification", config={
"learning_rate": 0.001,
"architecture": "ResNet50",
"dataset": "imagenet-subset",
"epochs": 100,
})
for epoch in range(100):
train_loss, val_loss = train_one_epoch(model)
wandb.log({
"train_loss": train_loss,
"val_loss": val_loss,
"epoch": epoch
})
Optuna - 하이퍼파라미터 최적화
수동으로 하이퍼파라미터를 조정하는 것은 비효율적이다. Optuna는 베이지안 최적화 기반으로 자동 탐색을 수행한다.
import optuna
def objective(trial):
lr = trial.suggest_float("lr", 1e-5, 1e-1, log=True)
batch_size = trial.suggest_categorical("batch_size", [16, 32, 64, 128])
n_layers = trial.suggest_int("n_layers", 1, 5)
dropout = trial.suggest_float("dropout", 0.1, 0.5)
model = create_model(n_layers=n_layers, dropout=dropout)
accuracy = train_and_evaluate(model, lr=lr, batch_size=batch_size)
return accuracy
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=100)
print(f"Best accuracy: {study.best_trial.value}")
print(f"Best params: {study.best_trial.params}")
3. 데이터 파이프라인
Feature Store
피처 스토어는 ML 피처의 중앙 저장소다. 학습과 서빙에서 동일한 피처를 사용할 수 있게 해준다.
Feast 예시
from feast import FeatureStore
store = FeatureStore(repo_path="feature_repo/")
# 피처 정의
from feast import Entity, FeatureView, Field
from feast.types import Float32, Int64
user = Entity(name="user_id", join_keys=["user_id"])
user_features = FeatureView(
name="user_features",
entities=[user],
schema=[
Field(name="total_purchases", dtype=Int64),
Field(name="avg_order_value", dtype=Float32),
Field(name="days_since_last_order", dtype=Int64),
],
source=user_source,
)
# 학습 데이터 조회
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"user_features:total_purchases",
"user_features:avg_order_value",
"user_features:days_since_last_order",
],
).to_df()
# 실시간 서빙용 조회
online_features = store.get_online_features(
features=[
"user_features:total_purchases",
"user_features:avg_order_value",
],
entity_rows=[{"user_id": 12345}],
).to_dict()
DVC - 데이터 버저닝
DVC(Data Version Control)는 Git으로 데이터를 버전 관리할 수 있게 한다.
# DVC 초기화
dvc init
# 데이터 파일 추적
dvc add data/training_data.csv
# 리모트 스토리지 설정
dvc remote add -d myremote s3://my-bucket/dvc-storage
# 데이터 푸시
dvc push
# 특정 버전의 데이터 가져오기
git checkout v1.0
dvc pull
데이터 검증 - Great Expectations
데이터 품질 검증은 파이프라인의 핵심이다.
import great_expectations as gx
context = gx.get_context()
# 기대치 정의
validator = context.sources.pandas_default.read_csv(
"data/training_data.csv"
)
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=150)
validator.expect_column_values_to_be_in_set("country", ["KR", "US", "JP", "UK"])
# 검증 실행
results = validator.validate()
if not results.success:
raise ValueError("Data validation failed!")
4. 모델 학습 인프라
GPU 클러스터 구성
대규모 모델 학습에는 GPU 클러스터가 필수다.
Kubernetes + GPU 노드풀
apiVersion: v1
kind: Pod
metadata:
name: training-job
spec:
containers:
- name: trainer
image: training-image:latest
resources:
limits:
nvidia.com/gpu: 4
volumeMounts:
- name: dataset
mountPath: /data
nodeSelector:
gpu-type: a100
tolerations:
- key: "nvidia.com/gpu"
operator: "Exists"
effect: "NoSchedule"
분산 학습
단일 GPU로 감당할 수 없는 모델은 분산 학습이 필요하다.
PyTorch DDP (Distributed Data Parallel)
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
def setup(rank, world_size):
dist.init_process_group("nccl", rank=rank, world_size=world_size)
torch.cuda.set_device(rank)
def train(rank, world_size):
setup(rank, world_size)
model = MyModel().to(rank)
model = DDP(model, device_ids=[rank])
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
for epoch in range(num_epochs):
sampler.set_epoch(epoch)
for batch in dataloader:
loss = model(batch)
loss.backward()
optimizer.step()
optimizer.zero_grad()
dist.destroy_process_group()
Kubeflow Training Operator
Kubeflow를 사용하면 Kubernetes 위에서 분산 학습을 선언적으로 관리할 수 있다.
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
name: pytorch-training
spec:
pytorchReplicaSpecs:
Master:
replicas: 1
template:
spec:
containers:
- name: pytorch
image: my-training:latest
resources:
limits:
nvidia.com/gpu: 1
Worker:
replicas: 3
template:
spec:
containers:
- name: pytorch
image: my-training:latest
resources:
limits:
nvidia.com/gpu: 1
5. 모델 서빙
서빙 아키텍처 비교
| 프레임워크 | 장점 | 단점 | 적합한 경우 |
|---|---|---|---|
| TorchServe | PyTorch 네이티브, 관리 쉬움 | 성능 제한적 | 소규모 PyTorch 모델 |
| Triton | 멀티 프레임워크, 고성능 | 설정 복잡 | 대규모 프로덕션 |
| vLLM | LLM 특화, PagedAttention | LLM 전용 | LLM 서빙 |
| FastAPI | 유연성 최고 | 직접 구현 필요 | 커스텀 로직 필요 시 |
TorchServe
# 모델 아카이브 생성
torch-model-archiver \
--model-name resnet50 \
--version 1.0 \
--model-file model.py \
--serialized-file model.pth \
--handler image_classifier
# 서버 시작
torchserve --start \
--model-store model_store \
--models resnet50=resnet50.mar
NVIDIA Triton Inference Server
Triton은 다양한 프레임워크의 모델을 동시에 서빙할 수 있다.
# 모델 저장소 구조
model_repository/
resnet50/
config.pbtxt
1/
model.onnx
bert_base/
config.pbtxt
1/
model.plan
config.pbtxt 설정
name: "resnet50"
platform: "onnxruntime_onnx"
max_batch_size: 32
input [
{
name: "input"
data_type: TYPE_FP32
dims: [3, 224, 224]
}
]
output [
{
name: "output"
data_type: TYPE_FP32
dims: [1000]
}
]
dynamic_batching {
preferred_batch_size: [8, 16, 32]
max_queue_delay_microseconds: 100
}
instance_group [
{
count: 2
kind: KIND_GPU
}
]
FastAPI 래핑
간단한 모델 서빙에는 FastAPI가 적합하다.
from fastapi import FastAPI
import torch
from pydantic import BaseModel
app = FastAPI()
# 모델 로드 (앱 시작 시 1회)
model = torch.jit.load("model.pt")
model.eval()
class PredictionRequest(BaseModel):
features: list[float]
class PredictionResponse(BaseModel):
prediction: float
confidence: float
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
tensor = torch.tensor([request.features])
with torch.no_grad():
output = model(tensor)
prob = torch.softmax(output, dim=1)
prediction = torch.argmax(prob, dim=1).item()
confidence = prob[0][prediction].item()
return PredictionResponse(
prediction=prediction,
confidence=confidence
)
@app.get("/health")
async def health():
return {"status": "healthy"}
6. LLM 서빙 특별편
vLLM - 고성능 LLM 서빙
vLLM은 PagedAttention 기술을 사용하여 LLM 서빙 성능을 극대화한다.
from vllm import LLM, SamplingParams
# 모델 로드
llm = LLM(
model="meta-llama/Llama-3.1-8B-Instruct",
tensor_parallel_size=2,
gpu_memory_utilization=0.9,
max_model_len=8192,
)
# 추론
sampling_params = SamplingParams(
temperature=0.7,
top_p=0.9,
max_tokens=512,
)
outputs = llm.generate(
["Explain MLOps in simple terms."],
sampling_params,
)
print(outputs[0].outputs[0].text)
vLLM OpenAI 호환 서버
python -m vllm.entrypoints.openai.api_server \
--model meta-llama/Llama-3.1-8B-Instruct \
--tensor-parallel-size 2 \
--port 8000
# OpenAI SDK로 호출 가능
from openai import OpenAI
client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy")
response = client.chat.completions.create(
model="meta-llama/Llama-3.1-8B-Instruct",
messages=[
{"role": "user", "content": "What is MLOps?"}
],
max_tokens=256,
)
Text Generation Inference (TGI)
HuggingFace의 TGI는 Rust 기반으로 안정적이다.
docker run --gpus all \
-p 8080:80 \
-v /data:/data \
ghcr.io/huggingface/text-generation-inference:latest \
--model-id meta-llama/Llama-3.1-8B-Instruct \
--quantize gptq \
--max-input-length 4096 \
--max-total-tokens 8192
양자화 기법 비교
LLM 배포 시 모델 크기를 줄이는 양자화는 필수다.
| 기법 | 크기 감소 | 품질 손실 | 속도 향상 | 비고 |
|---|---|---|---|---|
| GPTQ | 4x | 낮음 | 2-3x | GPU 최적화, 정확도 우수 |
| AWQ | 4x | 매우 낮음 | 2-3x | Activation-aware, GPTQ보다 품질 좋음 |
| GGUF | 2-8x | 가변적 | CPU 가능 | llama.cpp용, CPU/GPU 혼합 지원 |
| BitsAndBytes | 2-4x | 낮음 | 1.5x | 간편한 적용, QLoRA 학습 가능 |
# BitsAndBytes 4비트 양자화
from transformers import AutoModelForCausalLM, BitsAndBytesConfig
quantization_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype=torch.float16,
bnb_4bit_quant_type="nf4",
bnb_4bit_use_double_quant=True,
)
model = AutoModelForCausalLM.from_pretrained(
"meta-llama/Llama-3.1-70B-Instruct",
quantization_config=quantization_config,
device_map="auto",
)
Ollama - 로컬 LLM
개발 환경이나 소규모 배포에는 Ollama가 간편하다.
# 모델 다운로드 및 실행
ollama pull llama3.1
ollama run llama3.1
# API 서버 모드
ollama serve
import requests
response = requests.post("http://localhost:11434/api/generate", json={
"model": "llama3.1",
"prompt": "Explain MLOps briefly.",
"stream": False,
})
print(response.json()["response"])
7. CI/CD for ML
모델 테스트 전략
ML 모델에는 일반 소프트웨어와 다른 테스트가 필요하다.
import pytest
class TestModelQuality:
"""모델 품질 테스트"""
def test_accuracy_threshold(self, model, test_data):
"""정확도가 기준치 이상인지 확인"""
accuracy = evaluate(model, test_data)
assert accuracy >= 0.90, f"Accuracy {accuracy} below threshold 0.90"
def test_latency(self, model, sample_input):
"""추론 지연 시간 확인"""
import time
start = time.time()
model.predict(sample_input)
latency = time.time() - start
assert latency < 0.1, f"Latency {latency}s exceeds 100ms"
def test_no_bias(self, model, fairness_data):
"""공정성 검증"""
results_group_a = model.predict(fairness_data["group_a"])
results_group_b = model.predict(fairness_data["group_b"])
disparity = abs(results_group_a.mean() - results_group_b.mean())
assert disparity < 0.05, f"Bias detected: disparity={disparity}"
def test_model_size(self, model_path):
"""모델 크기 제한"""
import os
size_mb = os.path.getsize(model_path) / (1024 * 1024)
assert size_mb < 500, f"Model size {size_mb}MB exceeds 500MB limit"
GitHub Actions로 ML 파이프라인
name: ML Pipeline
on:
push:
paths:
- "src/**"
- "data/**"
- "configs/**"
jobs:
data-validation:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Validate data
run: python scripts/validate_data.py
train:
needs: data-validation
runs-on: [self-hosted, gpu]
steps:
- uses: actions/checkout@v4
- name: Train model
run: python scripts/train.py --config configs/production.yaml
- name: Upload model artifact
uses: actions/upload-artifact@v4
with:
name: model
path: outputs/model/
evaluate:
needs: train
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Download model
uses: actions/download-artifact@v4
with:
name: model
- name: Run evaluation
run: python scripts/evaluate.py
- name: Check quality gates
run: python scripts/quality_gate.py
deploy:
needs: evaluate
if: github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
steps:
- name: Deploy to staging
run: ./scripts/deploy.sh staging
- name: Smoke test
run: python scripts/smoke_test.py
- name: Deploy to production
run: ./scripts/deploy.sh production
데이터 검증 파이프라인
def validate_training_data(data_path: str) -> bool:
"""학습 데이터의 품질을 검증하는 파이프라인"""
df = pd.read_parquet(data_path)
checks = [
# 데이터 크기 확인
len(df) >= 10000,
# 필수 컬럼 존재
all(col in df.columns for col in REQUIRED_COLUMNS),
# null 비율 확인
df.isnull().mean().max() < 0.05,
# 레이블 분포 확인
df["label"].value_counts(normalize=True).min() > 0.1,
# 중복 제거
df.duplicated().mean() < 0.01,
]
if not all(checks):
failed = [i for i, c in enumerate(checks) if not c]
raise ValueError(f"Data validation failed at checks: {failed}")
return True
8. 모니터링
모델 성능 모니터링
배포 후 모델 성능은 시간이 지나면서 저하될 수 있다.
from prometheus_client import Gauge, Histogram, Counter
# 메트릭 정의
prediction_latency = Histogram(
"model_prediction_latency_seconds",
"Prediction latency in seconds",
buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)
prediction_count = Counter(
"model_prediction_total",
"Total number of predictions",
["model_version", "status"]
)
model_accuracy = Gauge(
"model_accuracy",
"Current model accuracy",
["model_name"]
)
# 사용
import time
def predict_with_monitoring(model, input_data):
start = time.time()
try:
result = model.predict(input_data)
prediction_count.labels(
model_version="v2.1",
status="success"
).inc()
return result
except Exception as e:
prediction_count.labels(
model_version="v2.1",
status="error"
).inc()
raise
finally:
latency = time.time() - start
prediction_latency.observe(latency)
데이터 드리프트 감지
입력 데이터의 분포가 변하면 모델 성능이 저하된다.
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
# 드리프트 리포트 생성
report = Report(metrics=[
DataDriftPreset(),
TargetDriftPreset(),
])
column_mapping = ColumnMapping(
target="label",
prediction="prediction",
numerical_features=["feature_1", "feature_2", "feature_3"],
categorical_features=["category", "region"],
)
report.run(
reference_data=training_data,
current_data=production_data,
column_mapping=column_mapping,
)
# 드리프트 감지 결과 확인
drift_results = report.as_dict()
if drift_results["metrics"][0]["result"]["dataset_drift"]:
trigger_retraining_pipeline()
개념 드리프트 (Concept Drift)
데이터 분포뿐 아니라 입력과 출력의 관계 자체가 변할 수 있다. 예를 들어 코로나 이전과 이후의 소비 패턴은 완전히 달라졌다.
class ConceptDriftDetector:
"""PSI(Population Stability Index) 기반 드리프트 감지"""
def __init__(self, threshold=0.2):
self.threshold = threshold
def calculate_psi(self, expected, actual, bins=10):
"""PSI 계산"""
breakpoints = np.percentile(expected, np.linspace(0, 100, bins + 1))
expected_counts = np.histogram(expected, breakpoints)[0] / len(expected)
actual_counts = np.histogram(actual, breakpoints)[0] / len(actual)
# 0 방지
expected_counts = np.clip(expected_counts, 1e-6, None)
actual_counts = np.clip(actual_counts, 1e-6, None)
psi = np.sum(
(actual_counts - expected_counts) *
np.log(actual_counts / expected_counts)
)
return psi
def check_drift(self, reference_predictions, current_predictions):
psi = self.calculate_psi(reference_predictions, current_predictions)
if psi > self.threshold:
return {"drift_detected": True, "psi": psi, "action": "retrain"}
elif psi > self.threshold / 2:
return {"drift_detected": False, "psi": psi, "action": "monitor"}
else:
return {"drift_detected": False, "psi": psi, "action": "none"}
Grafana 대시보드 구성
모니터링 메트릭을 시각화하려면 Grafana가 적합하다. Prometheus에서 수집한 메트릭을 대시보드로 구성할 수 있다.
주요 모니터링 패널 구성:
- 요청 처리량: 초당 예측 요청 수
- 지연 시간 분포: p50, p95, p99 레이턴시
- 오류율: 예측 실패 비율
- 모델 정확도: 실시간 성능 메트릭
- 데이터 드리프트 점수: PSI, KL Divergence
- 리소스 사용량: GPU 메모리, CPU, 네트워크
9. A/B 테스트와 배포 전략
카나리 배포 (Canary Deployment)
새 모델을 소량의 트래픽에만 먼저 적용하는 전략이다.
# Istio VirtualService로 트래픽 분할
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: model-serving
spec:
hosts:
- model-service
http:
- match:
- headers:
canary:
exact: "true"
route:
- destination:
host: model-service
subset: v2
- route:
- destination:
host: model-service
subset: v1
weight: 90
- destination:
host: model-service
subset: v2
weight: 10
섀도 모드 (Shadow Mode)
새 모델이 실제 트래픽으로 추론하되, 결과는 저장만 하고 사용자에게 보내지 않는다. 기존 모델의 결과와 비교하여 안전하게 검증할 수 있다.
class ShadowModelRouter:
def __init__(self, primary_model, shadow_model):
self.primary = primary_model
self.shadow = shadow_model
async def predict(self, input_data):
# 주 모델 결과 (사용자에게 반환)
primary_result = await self.primary.predict(input_data)
# 섀도 모델 결과 (비교용 저장)
try:
shadow_result = await self.shadow.predict(input_data)
await self.log_comparison(
input_data, primary_result, shadow_result
)
except Exception:
pass # 섀도 모델 오류는 무시
return primary_result
async def log_comparison(self, input_data, primary, shadow):
comparison = {
"timestamp": datetime.utcnow().isoformat(),
"input_hash": hash(str(input_data)),
"primary_prediction": primary,
"shadow_prediction": shadow,
"agreement": primary == shadow,
}
await self.metrics_store.insert(comparison)
멀티암 밴딧 (Multi-Armed Bandit)
A/B 테스트보다 효율적인 동적 트래픽 할당 방법이다. 성능이 좋은 모델에 더 많은 트래픽을 자동으로 배분한다.
import numpy as np
class ThompsonSamplingRouter:
"""톰슨 샘플링 기반 모델 라우터"""
def __init__(self, model_names):
self.models = model_names
# Beta 분포의 파라미터 (alpha, beta)
self.successes = {name: 1 for name in model_names}
self.failures = {name: 1 for name in model_names}
def select_model(self):
"""톰슨 샘플링으로 모델 선택"""
samples = {}
for name in self.models:
samples[name] = np.random.beta(
self.successes[name],
self.failures[name]
)
return max(samples, key=samples.get)
def update(self, model_name, success: bool):
"""결과에 따라 분포 업데이트"""
if success:
self.successes[model_name] += 1
else:
self.failures[model_name] += 1
def get_allocation(self):
"""현재 트래픽 비율 확인"""
total = sum(self.successes[m] + self.failures[m] for m in self.models)
return {
m: (self.successes[m] + self.failures[m]) / total
for m in self.models
}
10. 비용 최적화
스팟 인스턴스 활용
학습 작업은 중단 가능하므로 스팟 인스턴스로 비용을 70% 이상 절약할 수 있다.
# AWS SageMaker 스팟 학습
import sagemaker
estimator = sagemaker.estimator.Estimator(
image_uri="my-training-image:latest",
role="arn:aws:iam::ACCOUNT:role/SageMakerRole",
instance_count=4,
instance_type="ml.p4d.24xlarge",
use_spot_instances=True,
max_wait=7200, # 최대 대기 시간
max_run=3600, # 최대 실행 시간
checkpoint_s3_uri="s3://bucket/checkpoints/",
)
estimator.fit({"training": "s3://bucket/data/"})
핵심은 체크포인트 저장이다. 스팟 인스턴스가 중단되어도 체크포인트에서 재개할 수 있어야 한다.
모델 경량화
프로덕션 서빙 비용을 줄이는 방법들이다.
지식 증류 (Knowledge Distillation)
class DistillationTrainer:
def __init__(self, teacher, student, temperature=3.0, alpha=0.5):
self.teacher = teacher
self.student = student
self.temperature = temperature
self.alpha = alpha
def distillation_loss(self, student_logits, teacher_logits, labels):
# Soft target loss (KD loss)
soft_targets = F.softmax(teacher_logits / self.temperature, dim=1)
soft_loss = F.kl_div(
F.log_softmax(student_logits / self.temperature, dim=1),
soft_targets,
reduction="batchmean"
) * (self.temperature ** 2)
# Hard target loss
hard_loss = F.cross_entropy(student_logits, labels)
return self.alpha * soft_loss + (1 - self.alpha) * hard_loss
ONNX 변환으로 추론 최적화
import torch
import onnx
# PyTorch -> ONNX 변환
dummy_input = torch.randn(1, 3, 224, 224)
torch.onnx.export(
model,
dummy_input,
"model.onnx",
input_names=["input"],
output_names=["output"],
dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},
opset_version=17,
)
# ONNX Runtime으로 추론
import onnxruntime as ort
session = ort.InferenceSession("model.onnx", providers=["CUDAExecutionProvider"])
result = session.run(None, {"input": input_array})
배치 추론 vs 실시간 추론
| 구분 | 배치 추론 | 실시간 추론 |
|---|---|---|
| 지연 시간 | 분~시간 | 밀리초~초 |
| 처리량 | 매우 높음 | 중간 |
| 비용 | 낮음 (스팟 인스턴스) | 높음 (상시 가동) |
| 적합한 경우 | 추천 시스템, 리포트 | 챗봇, 검색 |
# 배치 추론 예시 (Spark)
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
spark = SparkSession.builder.getOrCreate()
# UDF로 모델 추론 등록
@udf("float")
def predict_udf(features):
return float(model.predict([features])[0])
# 대규모 데이터에 대해 배치 추론
df = spark.read.parquet("s3://bucket/daily-data/")
predictions = df.withColumn("prediction", predict_udf("features"))
predictions.write.parquet("s3://bucket/predictions/")
11. 실전: LLM RAG 파이프라인 구축
RAG(Retrieval-Augmented Generation)는 LLM의 한계를 극복하는 핵심 패턴이다. 외부 지식 베이스를 검색하여 LLM에 컨텍스트로 제공한다.
전체 아키텍처
- 문서 수집 - 다양한 소스에서 데이터 수집
- 청킹 - 문서를 적절한 크기로 분할
- 임베딩 - 텍스트를 벡터로 변환
- 벡터 DB 저장 - 벡터 인덱스에 저장
- 검색 - 질의에 대해 유사한 문서 검색
- 생성 - 검색 결과를 컨텍스트로 LLM에 전달
문서 수집 및 청킹
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import (
PyPDFLoader,
WebBaseLoader,
NotionDirectoryLoader,
)
# 다양한 소스에서 문서 로드
pdf_docs = PyPDFLoader("docs/manual.pdf").load()
web_docs = WebBaseLoader("https://docs.example.com").load()
notion_docs = NotionDirectoryLoader("notion_export/").load()
all_docs = pdf_docs + web_docs + notion_docs
# 청킹
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", ".", " "],
)
chunks = text_splitter.split_documents(all_docs)
print(f"Total chunks: {len(chunks)}")
임베딩 및 벡터 DB
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
# 임베딩 모델
embedding_model = HuggingFaceEmbeddings(
model_name="BAAI/bge-large-en-v1.5",
model_kwargs={"device": "cuda"},
encode_kwargs={"normalize_embeddings": True},
)
# Chroma 벡터 DB에 저장
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embedding_model,
persist_directory="./chroma_db",
collection_name="knowledge_base",
)
# 유사도 검색
results = vectorstore.similarity_search(
"How to deploy a model to production?",
k=5,
)
RAG 파이프라인 조립
from langchain.chains import RetrievalQA
from langchain.llms import VLLM
from langchain.prompts import PromptTemplate
# LLM 설정
llm = VLLM(
model="meta-llama/Llama-3.1-8B-Instruct",
tensor_parallel_size=1,
max_new_tokens=512,
temperature=0.1,
)
# 프롬프트 템플릿
prompt_template = PromptTemplate(
template="""다음 컨텍스트를 참고하여 질문에 답변하세요.
컨텍스트에 없는 정보는 "모르겠습니다"라고 답하세요.
컨텍스트:
{context}
질문: {question}
답변:""",
input_variables=["context", "question"],
)
# RAG 체인 구성
rag_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=vectorstore.as_retriever(
search_type="mmr",
search_kwargs={"k": 5, "fetch_k": 20},
),
chain_type_kwargs={"prompt": prompt_template},
return_source_documents=True,
)
# 질의
result = rag_chain.invoke("프로덕션 배포 절차가 어떻게 되나요?")
print(result["result"])
for doc in result["source_documents"]:
print(f" Source: {doc.metadata['source']}")
프로덕션 RAG 서빙
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class QueryRequest(BaseModel):
question: str
top_k: int = 5
class QueryResponse(BaseModel):
answer: str
sources: list[str]
latency_ms: float
@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
import time
start = time.time()
result = rag_chain.invoke(request.question)
sources = list(set(
doc.metadata.get("source", "unknown")
for doc in result["source_documents"]
))
latency = (time.time() - start) * 1000
return QueryResponse(
answer=result["result"],
sources=sources,
latency_ms=round(latency, 2),
)
마치며
MLOps는 ML 모델을 프로덕션에서 안정적으로 운영하기 위한 필수 분야다.
핵심 포인트를 정리하면 다음과 같다.
- 실험 관리부터 시작하라 - MLflow 하나만 도입해도 큰 변화가 생긴다.
- 자동화에 투자하라 - 수동 프로세스는 반드시 실패한다.
- 모니터링은 필수다 - 배포 후가 진짜 시작이다.
- 작게 시작하라 - Google Level 0에서 1으로 가는 것이 가장 중요하다.
- LLM 시대에 맞춰 진화하라 - vLLM, RAG 등 새로운 도구를 적극 활용하라.
모델을 학습하는 것은 전체의 20%에 불과하다. 나머지 80%는 데이터 파이프라인, 서빙, 모니터링, 운영이다. MLOps는 바로 그 80%를 체계적으로 관리하는 방법론이다.
MLOps 셀프 체크리스트
실험 관리
- 모든 실험의 하이퍼파라미터와 메트릭을 추적하고 있는가?
- 실험 결과를 재현할 수 있는가?
데이터 파이프라인
- 데이터 버전 관리를 하고 있는가?
- 데이터 품질 검증이 자동화되어 있는가?
학습 인프라
- 학습 파이프라인이 자동화되어 있는가?
- 체크포인트를 저장하고 있는가?
서빙
- 모델 서빙의 지연 시간을 모니터링하고 있는가?
- 롤백이 가능한 배포 전략을 쓰고 있는가?
모니터링
- 데이터 드리프트를 감지하고 있는가?
- 모델 성능 저하 시 알림이 오는가?
비용
- 스팟 인스턴스를 활용하고 있는가?
- 모델 경량화를 고려했는가?