목차
1. [MLOps란 무엇인가](#1-mlops란-무엇인가)
2. [실험 관리](#2-실험-관리)
3. [데이터 파이프라인](#3-데이터-파이프라인)
4. [모델 학습 인프라](#4-모델-학습-인프라)
5. [모델 서빙](#5-모델-서빙)
6. [LLM 서빙 특별편](#6-llm-서빙-특별편)
7. [CI/CD for ML](#7-cicd-for-ml)
8. [모니터링](#8-모니터링)
9. [A/B 테스트와 배포 전략](#9-ab-테스트와-배포-전략)
10. [비용 최적화](#10-비용-최적화)
11. [실전: LLM RAG 파이프라인 구축](#11-실전-llm-rag-파이프라인-구축)
1. MLOps란 무엇인가
DevOps와 ML의 만남
MLOps는 Machine Learning과 Operations의 합성어다.
소프트웨어 엔지니어링의 DevOps 원칙을 머신러닝 라이프사이클에 적용한 것이라고 보면 된다.
전통적인 소프트웨어 개발에서는 코드만 관리하면 됐다.
하지만 ML 시스템에서는 **코드, 데이터, 모델** 세 가지를 동시에 관리해야 한다.
데이터가 바뀌면 모델이 바뀌고, 모델이 바뀌면 서빙 방식도 바뀔 수 있다.
ML 라이프사이클
ML 프로젝트의 전체 흐름은 다음과 같다.
1. **문제 정의** - 비즈니스 요구사항을 ML 문제로 변환
2. **데이터 수집 및 전처리** - 데이터 파이프라인 구축
3. **피처 엔지니어링** - 원시 데이터를 모델이 이해할 수 있는 형태로 변환
4. **모델 학습** - 다양한 알고리즘과 하이퍼파라미터 실험
5. **모델 평가** - 오프라인 메트릭으로 성능 검증
6. **모델 배포** - 프로덕션 환경에 서빙
7. **모니터링** - 성능 추적과 드리프트 감지
8. **재학습** - 성능 저하 시 모델 업데이트
이 과정은 일회성이 아니라 **지속적인 순환**이다.
Google의 MLOps 성숙도 모델
Google은 MLOps 성숙도를 세 단계로 정의한다.
**Level 0 - 수동 프로세스**
- 데이터 과학자가 노트북에서 수동으로 학습
- 모델 배포가 수동이고 비정기적
- 모니터링 없음
- 대부분의 팀이 여기에 해당
**Level 1 - ML 파이프라인 자동화**
- 학습 파이프라인이 자동화됨
- 지속적 학습(Continuous Training) 구현
- 실험 추적 시스템 도입
- 모델 레지스트리 사용
**Level 2 - CI/CD 파이프라인 자동화**
- 파이프라인 자체의 CI/CD 구현
- 자동화된 테스트와 검증
- 피처 스토어 활용
- 완전한 모니터링과 알림 체계
대부분의 조직이 Level 0에서 Level 1으로 가는 것만으로도 큰 가치를 얻을 수 있다.
2. 실험 관리
MLflow - 오픈소스 ML 플랫폼
MLflow는 ML 실험 관리의 사실상 표준이다.
네 가지 핵심 컴포넌트로 구성된다.
**MLflow Tracking** - 실험 기록
mlflow.set_experiment("recommendation-model-v2")
with mlflow.start_run():
하이퍼파라미터 기록
mlflow.log_param("learning_rate", 0.001)
mlflow.log_param("batch_size", 64)
mlflow.log_param("epochs", 50)
학습 실행
model = train_model(lr=0.001, batch_size=64, epochs=50)
메트릭 기록
mlflow.log_metric("accuracy", 0.923)
mlflow.log_metric("f1_score", 0.891)
mlflow.log_metric("auc_roc", 0.956)
모델 저장
mlflow.pytorch.log_model(model, "model")
**MLflow Models** - 모델 패키징
모델 레지스트리에 등록
mlflow.register_model(
"runs:/abc123/model",
"recommendation-model"
)
스테이지 전환
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
name="recommendation-model",
version=3,
stage="Production"
)
Weights and Biases
W&B는 상용 실험 관리 플랫폼으로, 시각화 기능이 뛰어나다.
wandb.init(project="image-classification", config={
"learning_rate": 0.001,
"architecture": "ResNet50",
"dataset": "imagenet-subset",
"epochs": 100,
})
for epoch in range(100):
train_loss, val_loss = train_one_epoch(model)
wandb.log({
"train_loss": train_loss,
"val_loss": val_loss,
"epoch": epoch
})
Optuna - 하이퍼파라미터 최적화
수동으로 하이퍼파라미터를 조정하는 것은 비효율적이다.
Optuna는 베이지안 최적화 기반으로 자동 탐색을 수행한다.
def objective(trial):
lr = trial.suggest_float("lr", 1e-5, 1e-1, log=True)
batch_size = trial.suggest_categorical("batch_size", [16, 32, 64, 128])
n_layers = trial.suggest_int("n_layers", 1, 5)
dropout = trial.suggest_float("dropout", 0.1, 0.5)
model = create_model(n_layers=n_layers, dropout=dropout)
accuracy = train_and_evaluate(model, lr=lr, batch_size=batch_size)
return accuracy
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=100)
print(f"Best accuracy: {study.best_trial.value}")
print(f"Best params: {study.best_trial.params}")
3. 데이터 파이프라인
Feature Store
피처 스토어는 ML 피처의 중앙 저장소다.
학습과 서빙에서 동일한 피처를 사용할 수 있게 해준다.
**Feast 예시**
from feast import FeatureStore
store = FeatureStore(repo_path="feature_repo/")
피처 정의
from feast import Entity, FeatureView, Field
from feast.types import Float32, Int64
user = Entity(name="user_id", join_keys=["user_id"])
user_features = FeatureView(
name="user_features",
entities=[user],
schema=[
Field(name="total_purchases", dtype=Int64),
Field(name="avg_order_value", dtype=Float32),
Field(name="days_since_last_order", dtype=Int64),
],
source=user_source,
)
학습 데이터 조회
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"user_features:total_purchases",
"user_features:avg_order_value",
"user_features:days_since_last_order",
],
).to_df()
실시간 서빙용 조회
online_features = store.get_online_features(
features=[
"user_features:total_purchases",
"user_features:avg_order_value",
],
entity_rows=[{"user_id": 12345}],
).to_dict()
DVC - 데이터 버저닝
DVC(Data Version Control)는 Git으로 데이터를 버전 관리할 수 있게 한다.
DVC 초기화
dvc init
데이터 파일 추적
dvc add data/training_data.csv
리모트 스토리지 설정
dvc remote add -d myremote s3://my-bucket/dvc-storage
데이터 푸시
dvc push
특정 버전의 데이터 가져오기
git checkout v1.0
dvc pull
데이터 검증 - Great Expectations
데이터 품질 검증은 파이프라인의 핵심이다.
context = gx.get_context()
기대치 정의
validator = context.sources.pandas_default.read_csv(
"data/training_data.csv"
)
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=150)
validator.expect_column_values_to_be_in_set("country", ["KR", "US", "JP", "UK"])
검증 실행
results = validator.validate()
if not results.success:
raise ValueError("Data validation failed!")
4. 모델 학습 인프라
GPU 클러스터 구성
대규모 모델 학습에는 GPU 클러스터가 필수다.
**Kubernetes + GPU 노드풀**
apiVersion: v1
kind: Pod
metadata:
name: training-job
spec:
containers:
- name: trainer
image: training-image:latest
resources:
limits:
nvidia.com/gpu: 4
volumeMounts:
- name: dataset
mountPath: /data
nodeSelector:
gpu-type: a100
tolerations:
- key: "nvidia.com/gpu"
operator: "Exists"
effect: "NoSchedule"
분산 학습
단일 GPU로 감당할 수 없는 모델은 분산 학습이 필요하다.
**PyTorch DDP (Distributed Data Parallel)**
from torch.nn.parallel import DistributedDataParallel as DDP
def setup(rank, world_size):
dist.init_process_group("nccl", rank=rank, world_size=world_size)
torch.cuda.set_device(rank)
def train(rank, world_size):
setup(rank, world_size)
model = MyModel().to(rank)
model = DDP(model, device_ids=[rank])
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
for epoch in range(num_epochs):
sampler.set_epoch(epoch)
for batch in dataloader:
loss = model(batch)
loss.backward()
optimizer.step()
optimizer.zero_grad()
dist.destroy_process_group()
Kubeflow Training Operator
Kubeflow를 사용하면 Kubernetes 위에서 분산 학습을 선언적으로 관리할 수 있다.
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
name: pytorch-training
spec:
pytorchReplicaSpecs:
Master:
replicas: 1
template:
spec:
containers:
- name: pytorch
image: my-training:latest
resources:
limits:
nvidia.com/gpu: 1
Worker:
replicas: 3
template:
spec:
containers:
- name: pytorch
image: my-training:latest
resources:
limits:
nvidia.com/gpu: 1
5. 모델 서빙
서빙 아키텍처 비교
| 프레임워크 | 장점 | 단점 | 적합한 경우 |
|-----------|------|------|-----------|
| TorchServe | PyTorch 네이티브, 관리 쉬움 | 성능 제한적 | 소규모 PyTorch 모델 |
| Triton | 멀티 프레임워크, 고성능 | 설정 복잡 | 대규모 프로덕션 |
| vLLM | LLM 특화, PagedAttention | LLM 전용 | LLM 서빙 |
| FastAPI | 유연성 최고 | 직접 구현 필요 | 커스텀 로직 필요 시 |
TorchServe
모델 아카이브 생성
torch-model-archiver \
--model-name resnet50 \
--version 1.0 \
--model-file model.py \
--serialized-file model.pth \
--handler image_classifier
서버 시작
torchserve --start \
--model-store model_store \
--models resnet50=resnet50.mar
NVIDIA Triton Inference Server
Triton은 다양한 프레임워크의 모델을 동시에 서빙할 수 있다.
모델 저장소 구조
model_repository/
resnet50/
config.pbtxt
1/
model.onnx
bert_base/
config.pbtxt
1/
model.plan
**config.pbtxt 설정**
name: "resnet50"
platform: "onnxruntime_onnx"
max_batch_size: 32
input [
{
name: "input"
data_type: TYPE_FP32
dims: [3, 224, 224]
}
]
output [
{
name: "output"
data_type: TYPE_FP32
dims: [1000]
}
]
dynamic_batching {
preferred_batch_size: [8, 16, 32]
max_queue_delay_microseconds: 100
}
instance_group [
{
count: 2
kind: KIND_GPU
}
]
FastAPI 래핑
간단한 모델 서빙에는 FastAPI가 적합하다.
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
모델 로드 (앱 시작 시 1회)
model = torch.jit.load("model.pt")
model.eval()
class PredictionRequest(BaseModel):
features: list[float]
class PredictionResponse(BaseModel):
prediction: float
confidence: float
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
tensor = torch.tensor([request.features])
with torch.no_grad():
output = model(tensor)
prob = torch.softmax(output, dim=1)
prediction = torch.argmax(prob, dim=1).item()
confidence = prob[0][prediction].item()
return PredictionResponse(
prediction=prediction,
confidence=confidence
)
@app.get("/health")
async def health():
return {"status": "healthy"}
6. LLM 서빙 특별편
vLLM - 고성능 LLM 서빙
vLLM은 PagedAttention 기술을 사용하여 LLM 서빙 성능을 극대화한다.
from vllm import LLM, SamplingParams
모델 로드
llm = LLM(
model="meta-llama/Llama-3.1-8B-Instruct",
tensor_parallel_size=2,
gpu_memory_utilization=0.9,
max_model_len=8192,
)
추론
sampling_params = SamplingParams(
temperature=0.7,
top_p=0.9,
max_tokens=512,
)
outputs = llm.generate(
["Explain MLOps in simple terms."],
sampling_params,
)
print(outputs[0].outputs[0].text)
**vLLM OpenAI 호환 서버**
python -m vllm.entrypoints.openai.api_server \
--model meta-llama/Llama-3.1-8B-Instruct \
--tensor-parallel-size 2 \
--port 8000
OpenAI SDK로 호출 가능
from openai import OpenAI
client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy")
response = client.chat.completions.create(
model="meta-llama/Llama-3.1-8B-Instruct",
messages=[
{"role": "user", "content": "What is MLOps?"}
],
max_tokens=256,
)
Text Generation Inference (TGI)
HuggingFace의 TGI는 Rust 기반으로 안정적이다.
docker run --gpus all \
-p 8080:80 \
-v /data:/data \
ghcr.io/huggingface/text-generation-inference:latest \
--model-id meta-llama/Llama-3.1-8B-Instruct \
--quantize gptq \
--max-input-length 4096 \
--max-total-tokens 8192
양자화 기법 비교
LLM 배포 시 모델 크기를 줄이는 양자화는 필수다.
| 기법 | 크기 감소 | 품질 손실 | 속도 향상 | 비고 |
|------|----------|----------|----------|------|
| GPTQ | 4x | 낮음 | 2-3x | GPU 최적화, 정확도 우수 |
| AWQ | 4x | 매우 낮음 | 2-3x | Activation-aware, GPTQ보다 품질 좋음 |
| GGUF | 2-8x | 가변적 | CPU 가능 | llama.cpp용, CPU/GPU 혼합 지원 |
| BitsAndBytes | 2-4x | 낮음 | 1.5x | 간편한 적용, QLoRA 학습 가능 |
BitsAndBytes 4비트 양자화
from transformers import AutoModelForCausalLM, BitsAndBytesConfig
quantization_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype=torch.float16,
bnb_4bit_quant_type="nf4",
bnb_4bit_use_double_quant=True,
)
model = AutoModelForCausalLM.from_pretrained(
"meta-llama/Llama-3.1-70B-Instruct",
quantization_config=quantization_config,
device_map="auto",
)
Ollama - 로컬 LLM
개발 환경이나 소규모 배포에는 Ollama가 간편하다.
모델 다운로드 및 실행
ollama pull llama3.1
ollama run llama3.1
API 서버 모드
ollama serve
response = requests.post("http://localhost:11434/api/generate", json={
"model": "llama3.1",
"prompt": "Explain MLOps briefly.",
"stream": False,
})
print(response.json()["response"])
7. CI/CD for ML
모델 테스트 전략
ML 모델에는 일반 소프트웨어와 다른 테스트가 필요하다.
class TestModelQuality:
"""모델 품질 테스트"""
def test_accuracy_threshold(self, model, test_data):
"""정확도가 기준치 이상인지 확인"""
accuracy = evaluate(model, test_data)
assert accuracy >= 0.90, f"Accuracy {accuracy} below threshold 0.90"
def test_latency(self, model, sample_input):
"""추론 지연 시간 확인"""
start = time.time()
model.predict(sample_input)
latency = time.time() - start
assert latency < 0.1, f"Latency {latency}s exceeds 100ms"
def test_no_bias(self, model, fairness_data):
"""공정성 검증"""
results_group_a = model.predict(fairness_data["group_a"])
results_group_b = model.predict(fairness_data["group_b"])
disparity = abs(results_group_a.mean() - results_group_b.mean())
assert disparity < 0.05, f"Bias detected: disparity={disparity}"
def test_model_size(self, model_path):
"""모델 크기 제한"""
size_mb = os.path.getsize(model_path) / (1024 * 1024)
assert size_mb < 500, f"Model size {size_mb}MB exceeds 500MB limit"
GitHub Actions로 ML 파이프라인
name: ML Pipeline
on:
push:
paths:
- "src/**"
- "data/**"
- "configs/**"
jobs:
data-validation:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Validate data
run: python scripts/validate_data.py
train:
needs: data-validation
runs-on: [self-hosted, gpu]
steps:
- uses: actions/checkout@v4
- name: Train model
run: python scripts/train.py --config configs/production.yaml
- name: Upload model artifact
uses: actions/upload-artifact@v4
with:
name: model
path: outputs/model/
evaluate:
needs: train
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Download model
uses: actions/download-artifact@v4
with:
name: model
- name: Run evaluation
run: python scripts/evaluate.py
- name: Check quality gates
run: python scripts/quality_gate.py
deploy:
needs: evaluate
if: github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
steps:
- name: Deploy to staging
run: ./scripts/deploy.sh staging
- name: Smoke test
run: python scripts/smoke_test.py
- name: Deploy to production
run: ./scripts/deploy.sh production
데이터 검증 파이프라인
def validate_training_data(data_path: str) -> bool:
"""학습 데이터의 품질을 검증하는 파이프라인"""
df = pd.read_parquet(data_path)
checks = [
데이터 크기 확인
len(df) >= 10000,
필수 컬럼 존재
all(col in df.columns for col in REQUIRED_COLUMNS),
null 비율 확인
df.isnull().mean().max() < 0.05,
레이블 분포 확인
df["label"].value_counts(normalize=True).min() > 0.1,
중복 제거
df.duplicated().mean() < 0.01,
]
if not all(checks):
failed = [i for i, c in enumerate(checks) if not c]
raise ValueError(f"Data validation failed at checks: {failed}")
return True
8. 모니터링
모델 성능 모니터링
배포 후 모델 성능은 시간이 지나면서 저하될 수 있다.
from prometheus_client import Gauge, Histogram, Counter
메트릭 정의
prediction_latency = Histogram(
"model_prediction_latency_seconds",
"Prediction latency in seconds",
buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)
prediction_count = Counter(
"model_prediction_total",
"Total number of predictions",
["model_version", "status"]
)
model_accuracy = Gauge(
"model_accuracy",
"Current model accuracy",
["model_name"]
)
사용
def predict_with_monitoring(model, input_data):
start = time.time()
try:
result = model.predict(input_data)
prediction_count.labels(
model_version="v2.1",
status="success"
).inc()
return result
except Exception as e:
prediction_count.labels(
model_version="v2.1",
status="error"
).inc()
raise
finally:
latency = time.time() - start
prediction_latency.observe(latency)
데이터 드리프트 감지
입력 데이터의 분포가 변하면 모델 성능이 저하된다.
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
드리프트 리포트 생성
report = Report(metrics=[
DataDriftPreset(),
TargetDriftPreset(),
])
column_mapping = ColumnMapping(
target="label",
prediction="prediction",
numerical_features=["feature_1", "feature_2", "feature_3"],
categorical_features=["category", "region"],
)
report.run(
reference_data=training_data,
current_data=production_data,
column_mapping=column_mapping,
)
드리프트 감지 결과 확인
drift_results = report.as_dict()
if drift_results["metrics"][0]["result"]["dataset_drift"]:
trigger_retraining_pipeline()
개념 드리프트 (Concept Drift)
데이터 분포뿐 아니라 입력과 출력의 관계 자체가 변할 수 있다.
예를 들어 코로나 이전과 이후의 소비 패턴은 완전히 달라졌다.
class ConceptDriftDetector:
"""PSI(Population Stability Index) 기반 드리프트 감지"""
def __init__(self, threshold=0.2):
self.threshold = threshold
def calculate_psi(self, expected, actual, bins=10):
"""PSI 계산"""
breakpoints = np.percentile(expected, np.linspace(0, 100, bins + 1))
expected_counts = np.histogram(expected, breakpoints)[0] / len(expected)
actual_counts = np.histogram(actual, breakpoints)[0] / len(actual)
0 방지
expected_counts = np.clip(expected_counts, 1e-6, None)
actual_counts = np.clip(actual_counts, 1e-6, None)
psi = np.sum(
(actual_counts - expected_counts) *
np.log(actual_counts / expected_counts)
)
return psi
def check_drift(self, reference_predictions, current_predictions):
psi = self.calculate_psi(reference_predictions, current_predictions)
if psi > self.threshold:
return {"drift_detected": True, "psi": psi, "action": "retrain"}
elif psi > self.threshold / 2:
return {"drift_detected": False, "psi": psi, "action": "monitor"}
else:
return {"drift_detected": False, "psi": psi, "action": "none"}
Grafana 대시보드 구성
모니터링 메트릭을 시각화하려면 Grafana가 적합하다.
Prometheus에서 수집한 메트릭을 대시보드로 구성할 수 있다.
주요 모니터링 패널 구성:
- **요청 처리량**: 초당 예측 요청 수
- **지연 시간 분포**: p50, p95, p99 레이턴시
- **오류율**: 예측 실패 비율
- **모델 정확도**: 실시간 성능 메트릭
- **데이터 드리프트 점수**: PSI, KL Divergence
- **리소스 사용량**: GPU 메모리, CPU, 네트워크
9. A/B 테스트와 배포 전략
카나리 배포 (Canary Deployment)
새 모델을 소량의 트래픽에만 먼저 적용하는 전략이다.
Istio VirtualService로 트래픽 분할
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: model-serving
spec:
hosts:
- model-service
http:
- match:
- headers:
canary:
exact: "true"
route:
- destination:
host: model-service
subset: v2
- route:
- destination:
host: model-service
subset: v1
weight: 90
- destination:
host: model-service
subset: v2
weight: 10
섀도 모드 (Shadow Mode)
새 모델이 실제 트래픽으로 추론하되, 결과는 저장만 하고 사용자에게 보내지 않는다.
기존 모델의 결과와 비교하여 안전하게 검증할 수 있다.
class ShadowModelRouter:
def __init__(self, primary_model, shadow_model):
self.primary = primary_model
self.shadow = shadow_model
async def predict(self, input_data):
주 모델 결과 (사용자에게 반환)
primary_result = await self.primary.predict(input_data)
섀도 모델 결과 (비교용 저장)
try:
shadow_result = await self.shadow.predict(input_data)
await self.log_comparison(
input_data, primary_result, shadow_result
)
except Exception:
pass # 섀도 모델 오류는 무시
return primary_result
async def log_comparison(self, input_data, primary, shadow):
comparison = {
"timestamp": datetime.utcnow().isoformat(),
"input_hash": hash(str(input_data)),
"primary_prediction": primary,
"shadow_prediction": shadow,
"agreement": primary == shadow,
}
await self.metrics_store.insert(comparison)
멀티암 밴딧 (Multi-Armed Bandit)
A/B 테스트보다 효율적인 동적 트래픽 할당 방법이다.
성능이 좋은 모델에 더 많은 트래픽을 자동으로 배분한다.
class ThompsonSamplingRouter:
"""톰슨 샘플링 기반 모델 라우터"""
def __init__(self, model_names):
self.models = model_names
Beta 분포의 파라미터 (alpha, beta)
self.successes = {name: 1 for name in model_names}
self.failures = {name: 1 for name in model_names}
def select_model(self):
"""톰슨 샘플링으로 모델 선택"""
samples = {}
for name in self.models:
samples[name] = np.random.beta(
self.successes[name],
self.failures[name]
)
return max(samples, key=samples.get)
def update(self, model_name, success: bool):
"""결과에 따라 분포 업데이트"""
if success:
self.successes[model_name] += 1
else:
self.failures[model_name] += 1
def get_allocation(self):
"""현재 트래픽 비율 확인"""
total = sum(self.successes[m] + self.failures[m] for m in self.models)
return {
m: (self.successes[m] + self.failures[m]) / total
for m in self.models
}
10. 비용 최적화
스팟 인스턴스 활용
학습 작업은 중단 가능하므로 스팟 인스턴스로 비용을 70% 이상 절약할 수 있다.
AWS SageMaker 스팟 학습
estimator = sagemaker.estimator.Estimator(
image_uri="my-training-image:latest",
role="arn:aws:iam::ACCOUNT:role/SageMakerRole",
instance_count=4,
instance_type="ml.p4d.24xlarge",
use_spot_instances=True,
max_wait=7200, # 최대 대기 시간
max_run=3600, # 최대 실행 시간
checkpoint_s3_uri="s3://bucket/checkpoints/",
)
estimator.fit({"training": "s3://bucket/data/"})
핵심은 **체크포인트 저장**이다. 스팟 인스턴스가 중단되어도 체크포인트에서 재개할 수 있어야 한다.
모델 경량화
프로덕션 서빙 비용을 줄이는 방법들이다.
**지식 증류 (Knowledge Distillation)**
class DistillationTrainer:
def __init__(self, teacher, student, temperature=3.0, alpha=0.5):
self.teacher = teacher
self.student = student
self.temperature = temperature
self.alpha = alpha
def distillation_loss(self, student_logits, teacher_logits, labels):
Soft target loss (KD loss)
soft_targets = F.softmax(teacher_logits / self.temperature, dim=1)
soft_loss = F.kl_div(
F.log_softmax(student_logits / self.temperature, dim=1),
soft_targets,
reduction="batchmean"
) * (self.temperature ** 2)
Hard target loss
hard_loss = F.cross_entropy(student_logits, labels)
return self.alpha * soft_loss + (1 - self.alpha) * hard_loss
**ONNX 변환으로 추론 최적화**
PyTorch -> ONNX 변환
dummy_input = torch.randn(1, 3, 224, 224)
torch.onnx.export(
model,
dummy_input,
"model.onnx",
input_names=["input"],
output_names=["output"],
dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},
opset_version=17,
)
ONNX Runtime으로 추론
session = ort.InferenceSession("model.onnx", providers=["CUDAExecutionProvider"])
result = session.run(None, {"input": input_array})
배치 추론 vs 실시간 추론
| 구분 | 배치 추론 | 실시간 추론 |
|------|----------|-----------|
| 지연 시간 | 분~시간 | 밀리초~초 |
| 처리량 | 매우 높음 | 중간 |
| 비용 | 낮음 (스팟 인스턴스) | 높음 (상시 가동) |
| 적합한 경우 | 추천 시스템, 리포트 | 챗봇, 검색 |
배치 추론 예시 (Spark)
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
spark = SparkSession.builder.getOrCreate()
UDF로 모델 추론 등록
@udf("float")
def predict_udf(features):
return float(model.predict([features])[0])
대규모 데이터에 대해 배치 추론
df = spark.read.parquet("s3://bucket/daily-data/")
predictions = df.withColumn("prediction", predict_udf("features"))
predictions.write.parquet("s3://bucket/predictions/")
11. 실전: LLM RAG 파이프라인 구축
RAG(Retrieval-Augmented Generation)는 LLM의 한계를 극복하는 핵심 패턴이다.
외부 지식 베이스를 검색하여 LLM에 컨텍스트로 제공한다.
전체 아키텍처
1. **문서 수집** - 다양한 소스에서 데이터 수집
2. **청킹** - 문서를 적절한 크기로 분할
3. **임베딩** - 텍스트를 벡터로 변환
4. **벡터 DB 저장** - 벡터 인덱스에 저장
5. **검색** - 질의에 대해 유사한 문서 검색
6. **생성** - 검색 결과를 컨텍스트로 LLM에 전달
문서 수집 및 청킹
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import (
PyPDFLoader,
WebBaseLoader,
NotionDirectoryLoader,
)
다양한 소스에서 문서 로드
pdf_docs = PyPDFLoader("docs/manual.pdf").load()
web_docs = WebBaseLoader("https://docs.example.com").load()
notion_docs = NotionDirectoryLoader("notion_export/").load()
all_docs = pdf_docs + web_docs + notion_docs
청킹
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", ".", " "],
)
chunks = text_splitter.split_documents(all_docs)
print(f"Total chunks: {len(chunks)}")
임베딩 및 벡터 DB
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
임베딩 모델
embedding_model = HuggingFaceEmbeddings(
model_name="BAAI/bge-large-en-v1.5",
model_kwargs={"device": "cuda"},
encode_kwargs={"normalize_embeddings": True},
)
Chroma 벡터 DB에 저장
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embedding_model,
persist_directory="./chroma_db",
collection_name="knowledge_base",
)
유사도 검색
results = vectorstore.similarity_search(
"How to deploy a model to production?",
k=5,
)
RAG 파이프라인 조립
from langchain.chains import RetrievalQA
from langchain.llms import VLLM
from langchain.prompts import PromptTemplate
LLM 설정
llm = VLLM(
model="meta-llama/Llama-3.1-8B-Instruct",
tensor_parallel_size=1,
max_new_tokens=512,
temperature=0.1,
)
프롬프트 템플릿
prompt_template = PromptTemplate(
template="""다음 컨텍스트를 참고하여 질문에 답변하세요.
컨텍스트에 없는 정보는 "모르겠습니다"라고 답하세요.
컨텍스트:
{context}
질문: {question}
답변:""",
input_variables=["context", "question"],
)
RAG 체인 구성
rag_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=vectorstore.as_retriever(
search_type="mmr",
search_kwargs={"k": 5, "fetch_k": 20},
),
chain_type_kwargs={"prompt": prompt_template},
return_source_documents=True,
)
질의
result = rag_chain.invoke("프로덕션 배포 절차가 어떻게 되나요?")
print(result["result"])
for doc in result["source_documents"]:
print(f" Source: {doc.metadata['source']}")
프로덕션 RAG 서빙
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class QueryRequest(BaseModel):
question: str
top_k: int = 5
class QueryResponse(BaseModel):
answer: str
sources: list[str]
latency_ms: float
@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
start = time.time()
result = rag_chain.invoke(request.question)
sources = list(set(
doc.metadata.get("source", "unknown")
for doc in result["source_documents"]
))
latency = (time.time() - start) * 1000
return QueryResponse(
answer=result["result"],
sources=sources,
latency_ms=round(latency, 2),
)
마치며
MLOps는 ML 모델을 프로덕션에서 안정적으로 운영하기 위한 필수 분야다.
핵심 포인트를 정리하면 다음과 같다.
1. **실험 관리부터 시작하라** - MLflow 하나만 도입해도 큰 변화가 생긴다.
2. **자동화에 투자하라** - 수동 프로세스는 반드시 실패한다.
3. **모니터링은 필수다** - 배포 후가 진짜 시작이다.
4. **작게 시작하라** - Google Level 0에서 1으로 가는 것이 가장 중요하다.
5. **LLM 시대에 맞춰 진화하라** - vLLM, RAG 등 새로운 도구를 적극 활용하라.
모델을 학습하는 것은 전체의 20%에 불과하다.
나머지 80%는 데이터 파이프라인, 서빙, 모니터링, 운영이다.
MLOps는 바로 그 80%를 체계적으로 관리하는 방법론이다.
**실험 관리**
- 모든 실험의 하이퍼파라미터와 메트릭을 추적하고 있는가?
- 실험 결과를 재현할 수 있는가?
**데이터 파이프라인**
- 데이터 버전 관리를 하고 있는가?
- 데이터 품질 검증이 자동화되어 있는가?
**학습 인프라**
- 학습 파이프라인이 자동화되어 있는가?
- 체크포인트를 저장하고 있는가?
**서빙**
- 모델 서빙의 지연 시간을 모니터링하고 있는가?
- 롤백이 가능한 배포 전략을 쓰고 있는가?
**모니터링**
- 데이터 드리프트를 감지하고 있는가?
- 모델 성능 저하 시 알림이 오는가?
**비용**
- 스팟 인스턴스를 활용하고 있는가?
- 모델 경량화를 고려했는가?
현재 단락 (1/807)
1. [MLOps란 무엇인가](#1-mlops란-무엇인가)