Skip to content
Published on

MLOps & AI 모델 배포 완전 가이드 — 학습부터 서빙, 모니터링까지

Authors

목차

  1. MLOps란 무엇인가
  2. 실험 관리
  3. 데이터 파이프라인
  4. 모델 학습 인프라
  5. 모델 서빙
  6. LLM 서빙 특별편
  7. CI/CD for ML
  8. 모니터링
  9. A/B 테스트와 배포 전략
  10. 비용 최적화
  11. 실전: LLM RAG 파이프라인 구축

1. MLOps란 무엇인가

DevOps와 ML의 만남

MLOps는 Machine Learning과 Operations의 합성어다. 소프트웨어 엔지니어링의 DevOps 원칙을 머신러닝 라이프사이클에 적용한 것이라고 보면 된다.

전통적인 소프트웨어 개발에서는 코드만 관리하면 됐다. 하지만 ML 시스템에서는 코드, 데이터, 모델 세 가지를 동시에 관리해야 한다. 데이터가 바뀌면 모델이 바뀌고, 모델이 바뀌면 서빙 방식도 바뀔 수 있다.

ML 라이프사이클

ML 프로젝트의 전체 흐름은 다음과 같다.

  1. 문제 정의 - 비즈니스 요구사항을 ML 문제로 변환
  2. 데이터 수집 및 전처리 - 데이터 파이프라인 구축
  3. 피처 엔지니어링 - 원시 데이터를 모델이 이해할 수 있는 형태로 변환
  4. 모델 학습 - 다양한 알고리즘과 하이퍼파라미터 실험
  5. 모델 평가 - 오프라인 메트릭으로 성능 검증
  6. 모델 배포 - 프로덕션 환경에 서빙
  7. 모니터링 - 성능 추적과 드리프트 감지
  8. 재학습 - 성능 저하 시 모델 업데이트

이 과정은 일회성이 아니라 지속적인 순환이다.

Google의 MLOps 성숙도 모델

Google은 MLOps 성숙도를 세 단계로 정의한다.

Level 0 - 수동 프로세스

  • 데이터 과학자가 노트북에서 수동으로 학습
  • 모델 배포가 수동이고 비정기적
  • 모니터링 없음
  • 대부분의 팀이 여기에 해당

Level 1 - ML 파이프라인 자동화

  • 학습 파이프라인이 자동화됨
  • 지속적 학습(Continuous Training) 구현
  • 실험 추적 시스템 도입
  • 모델 레지스트리 사용

Level 2 - CI/CD 파이프라인 자동화

  • 파이프라인 자체의 CI/CD 구현
  • 자동화된 테스트와 검증
  • 피처 스토어 활용
  • 완전한 모니터링과 알림 체계

대부분의 조직이 Level 0에서 Level 1으로 가는 것만으로도 큰 가치를 얻을 수 있다.


2. 실험 관리

MLflow - 오픈소스 ML 플랫폼

MLflow는 ML 실험 관리의 사실상 표준이다. 네 가지 핵심 컴포넌트로 구성된다.

MLflow Tracking - 실험 기록

import mlflow

mlflow.set_experiment("recommendation-model-v2")

with mlflow.start_run():
    # 하이퍼파라미터 기록
    mlflow.log_param("learning_rate", 0.001)
    mlflow.log_param("batch_size", 64)
    mlflow.log_param("epochs", 50)

    # 학습 실행
    model = train_model(lr=0.001, batch_size=64, epochs=50)

    # 메트릭 기록
    mlflow.log_metric("accuracy", 0.923)
    mlflow.log_metric("f1_score", 0.891)
    mlflow.log_metric("auc_roc", 0.956)

    # 모델 저장
    mlflow.pytorch.log_model(model, "model")

MLflow Models - 모델 패키징

# 모델 레지스트리에 등록
mlflow.register_model(
    "runs:/abc123/model",
    "recommendation-model"
)

# 스테이지 전환
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
    name="recommendation-model",
    version=3,
    stage="Production"
)

Weights and Biases

W&B는 상용 실험 관리 플랫폼으로, 시각화 기능이 뛰어나다.

import wandb

wandb.init(project="image-classification", config={
    "learning_rate": 0.001,
    "architecture": "ResNet50",
    "dataset": "imagenet-subset",
    "epochs": 100,
})

for epoch in range(100):
    train_loss, val_loss = train_one_epoch(model)
    wandb.log({
        "train_loss": train_loss,
        "val_loss": val_loss,
        "epoch": epoch
    })

Optuna - 하이퍼파라미터 최적화

수동으로 하이퍼파라미터를 조정하는 것은 비효율적이다. Optuna는 베이지안 최적화 기반으로 자동 탐색을 수행한다.

import optuna

def objective(trial):
    lr = trial.suggest_float("lr", 1e-5, 1e-1, log=True)
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64, 128])
    n_layers = trial.suggest_int("n_layers", 1, 5)
    dropout = trial.suggest_float("dropout", 0.1, 0.5)

    model = create_model(n_layers=n_layers, dropout=dropout)
    accuracy = train_and_evaluate(model, lr=lr, batch_size=batch_size)

    return accuracy

study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=100)

print(f"Best accuracy: {study.best_trial.value}")
print(f"Best params: {study.best_trial.params}")

3. 데이터 파이프라인

Feature Store

피처 스토어는 ML 피처의 중앙 저장소다. 학습과 서빙에서 동일한 피처를 사용할 수 있게 해준다.

Feast 예시

from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo/")

# 피처 정의
from feast import Entity, FeatureView, Field
from feast.types import Float32, Int64

user = Entity(name="user_id", join_keys=["user_id"])

user_features = FeatureView(
    name="user_features",
    entities=[user],
    schema=[
        Field(name="total_purchases", dtype=Int64),
        Field(name="avg_order_value", dtype=Float32),
        Field(name="days_since_last_order", dtype=Int64),
    ],
    source=user_source,
)

# 학습 데이터 조회
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:total_purchases",
        "user_features:avg_order_value",
        "user_features:days_since_last_order",
    ],
).to_df()

# 실시간 서빙용 조회
online_features = store.get_online_features(
    features=[
        "user_features:total_purchases",
        "user_features:avg_order_value",
    ],
    entity_rows=[{"user_id": 12345}],
).to_dict()

DVC - 데이터 버저닝

DVC(Data Version Control)는 Git으로 데이터를 버전 관리할 수 있게 한다.

# DVC 초기화
dvc init

# 데이터 파일 추적
dvc add data/training_data.csv

# 리모트 스토리지 설정
dvc remote add -d myremote s3://my-bucket/dvc-storage

# 데이터 푸시
dvc push

# 특정 버전의 데이터 가져오기
git checkout v1.0
dvc pull

데이터 검증 - Great Expectations

데이터 품질 검증은 파이프라인의 핵심이다.

import great_expectations as gx

context = gx.get_context()

# 기대치 정의
validator = context.sources.pandas_default.read_csv(
    "data/training_data.csv"
)
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=150)
validator.expect_column_values_to_be_in_set("country", ["KR", "US", "JP", "UK"])

# 검증 실행
results = validator.validate()
if not results.success:
    raise ValueError("Data validation failed!")

4. 모델 학습 인프라

GPU 클러스터 구성

대규모 모델 학습에는 GPU 클러스터가 필수다.

Kubernetes + GPU 노드풀

apiVersion: v1
kind: Pod
metadata:
  name: training-job
spec:
  containers:
    - name: trainer
      image: training-image:latest
      resources:
        limits:
          nvidia.com/gpu: 4
      volumeMounts:
        - name: dataset
          mountPath: /data
  nodeSelector:
    gpu-type: a100
  tolerations:
    - key: "nvidia.com/gpu"
      operator: "Exists"
      effect: "NoSchedule"

분산 학습

단일 GPU로 감당할 수 없는 모델은 분산 학습이 필요하다.

PyTorch DDP (Distributed Data Parallel)

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def setup(rank, world_size):
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

def train(rank, world_size):
    setup(rank, world_size)

    model = MyModel().to(rank)
    model = DDP(model, device_ids=[rank])

    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(num_epochs):
        sampler.set_epoch(epoch)
        for batch in dataloader:
            loss = model(batch)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

    dist.destroy_process_group()

Kubeflow Training Operator

Kubeflow를 사용하면 Kubernetes 위에서 분산 학습을 선언적으로 관리할 수 있다.

apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: pytorch-training
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      template:
        spec:
          containers:
            - name: pytorch
              image: my-training:latest
              resources:
                limits:
                  nvidia.com/gpu: 1
    Worker:
      replicas: 3
      template:
        spec:
          containers:
            - name: pytorch
              image: my-training:latest
              resources:
                limits:
                  nvidia.com/gpu: 1

5. 모델 서빙

서빙 아키텍처 비교

프레임워크장점단점적합한 경우
TorchServePyTorch 네이티브, 관리 쉬움성능 제한적소규모 PyTorch 모델
Triton멀티 프레임워크, 고성능설정 복잡대규모 프로덕션
vLLMLLM 특화, PagedAttentionLLM 전용LLM 서빙
FastAPI유연성 최고직접 구현 필요커스텀 로직 필요 시

TorchServe

# 모델 아카이브 생성
torch-model-archiver \
  --model-name resnet50 \
  --version 1.0 \
  --model-file model.py \
  --serialized-file model.pth \
  --handler image_classifier

# 서버 시작
torchserve --start \
  --model-store model_store \
  --models resnet50=resnet50.mar

NVIDIA Triton Inference Server

Triton은 다양한 프레임워크의 모델을 동시에 서빙할 수 있다.

# 모델 저장소 구조
model_repository/
  resnet50/
    config.pbtxt
    1/
      model.onnx
  bert_base/
    config.pbtxt
    1/
      model.plan

config.pbtxt 설정

name: "resnet50"
platform: "onnxruntime_onnx"
max_batch_size: 32
input [
  {
    name: "input"
    data_type: TYPE_FP32
    dims: [3, 224, 224]
  }
]
output [
  {
    name: "output"
    data_type: TYPE_FP32
    dims: [1000]
  }
]
dynamic_batching {
  preferred_batch_size: [8, 16, 32]
  max_queue_delay_microseconds: 100
}
instance_group [
  {
    count: 2
    kind: KIND_GPU
  }
]

FastAPI 래핑

간단한 모델 서빙에는 FastAPI가 적합하다.

from fastapi import FastAPI
import torch
from pydantic import BaseModel

app = FastAPI()

# 모델 로드 (앱 시작 시 1회)
model = torch.jit.load("model.pt")
model.eval()

class PredictionRequest(BaseModel):
    features: list[float]

class PredictionResponse(BaseModel):
    prediction: float
    confidence: float

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    tensor = torch.tensor([request.features])
    with torch.no_grad():
        output = model(tensor)
    prob = torch.softmax(output, dim=1)
    prediction = torch.argmax(prob, dim=1).item()
    confidence = prob[0][prediction].item()

    return PredictionResponse(
        prediction=prediction,
        confidence=confidence
    )

@app.get("/health")
async def health():
    return {"status": "healthy"}

6. LLM 서빙 특별편

vLLM - 고성능 LLM 서빙

vLLM은 PagedAttention 기술을 사용하여 LLM 서빙 성능을 극대화한다.

from vllm import LLM, SamplingParams

# 모델 로드
llm = LLM(
    model="meta-llama/Llama-3.1-8B-Instruct",
    tensor_parallel_size=2,
    gpu_memory_utilization=0.9,
    max_model_len=8192,
)

# 추론
sampling_params = SamplingParams(
    temperature=0.7,
    top_p=0.9,
    max_tokens=512,
)

outputs = llm.generate(
    ["Explain MLOps in simple terms."],
    sampling_params,
)
print(outputs[0].outputs[0].text)

vLLM OpenAI 호환 서버

python -m vllm.entrypoints.openai.api_server \
  --model meta-llama/Llama-3.1-8B-Instruct \
  --tensor-parallel-size 2 \
  --port 8000
# OpenAI SDK로 호출 가능
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy")

response = client.chat.completions.create(
    model="meta-llama/Llama-3.1-8B-Instruct",
    messages=[
        {"role": "user", "content": "What is MLOps?"}
    ],
    max_tokens=256,
)

Text Generation Inference (TGI)

HuggingFace의 TGI는 Rust 기반으로 안정적이다.

docker run --gpus all \
  -p 8080:80 \
  -v /data:/data \
  ghcr.io/huggingface/text-generation-inference:latest \
  --model-id meta-llama/Llama-3.1-8B-Instruct \
  --quantize gptq \
  --max-input-length 4096 \
  --max-total-tokens 8192

양자화 기법 비교

LLM 배포 시 모델 크기를 줄이는 양자화는 필수다.

기법크기 감소품질 손실속도 향상비고
GPTQ4x낮음2-3xGPU 최적화, 정확도 우수
AWQ4x매우 낮음2-3xActivation-aware, GPTQ보다 품질 좋음
GGUF2-8x가변적CPU 가능llama.cpp용, CPU/GPU 혼합 지원
BitsAndBytes2-4x낮음1.5x간편한 적용, QLoRA 학습 가능
# BitsAndBytes 4비트 양자화
from transformers import AutoModelForCausalLM, BitsAndBytesConfig

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
)

model = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Llama-3.1-70B-Instruct",
    quantization_config=quantization_config,
    device_map="auto",
)

Ollama - 로컬 LLM

개발 환경이나 소규모 배포에는 Ollama가 간편하다.

# 모델 다운로드 및 실행
ollama pull llama3.1
ollama run llama3.1

# API 서버 모드
ollama serve
import requests

response = requests.post("http://localhost:11434/api/generate", json={
    "model": "llama3.1",
    "prompt": "Explain MLOps briefly.",
    "stream": False,
})
print(response.json()["response"])

7. CI/CD for ML

모델 테스트 전략

ML 모델에는 일반 소프트웨어와 다른 테스트가 필요하다.

import pytest

class TestModelQuality:
    """모델 품질 테스트"""

    def test_accuracy_threshold(self, model, test_data):
        """정확도가 기준치 이상인지 확인"""
        accuracy = evaluate(model, test_data)
        assert accuracy >= 0.90, f"Accuracy {accuracy} below threshold 0.90"

    def test_latency(self, model, sample_input):
        """추론 지연 시간 확인"""
        import time
        start = time.time()
        model.predict(sample_input)
        latency = time.time() - start
        assert latency < 0.1, f"Latency {latency}s exceeds 100ms"

    def test_no_bias(self, model, fairness_data):
        """공정성 검증"""
        results_group_a = model.predict(fairness_data["group_a"])
        results_group_b = model.predict(fairness_data["group_b"])
        disparity = abs(results_group_a.mean() - results_group_b.mean())
        assert disparity < 0.05, f"Bias detected: disparity={disparity}"

    def test_model_size(self, model_path):
        """모델 크기 제한"""
        import os
        size_mb = os.path.getsize(model_path) / (1024 * 1024)
        assert size_mb < 500, f"Model size {size_mb}MB exceeds 500MB limit"

GitHub Actions로 ML 파이프라인

name: ML Pipeline

on:
  push:
    paths:
      - "src/**"
      - "data/**"
      - "configs/**"

jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Validate data
        run: python scripts/validate_data.py

  train:
    needs: data-validation
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v4
      - name: Train model
        run: python scripts/train.py --config configs/production.yaml
      - name: Upload model artifact
        uses: actions/upload-artifact@v4
        with:
          name: model
          path: outputs/model/

  evaluate:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Download model
        uses: actions/download-artifact@v4
        with:
          name: model
      - name: Run evaluation
        run: python scripts/evaluate.py
      - name: Check quality gates
        run: python scripts/quality_gate.py

  deploy:
    needs: evaluate
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to staging
        run: ./scripts/deploy.sh staging
      - name: Smoke test
        run: python scripts/smoke_test.py
      - name: Deploy to production
        run: ./scripts/deploy.sh production

데이터 검증 파이프라인

def validate_training_data(data_path: str) -> bool:
    """학습 데이터의 품질을 검증하는 파이프라인"""

    df = pd.read_parquet(data_path)

    checks = [
        # 데이터 크기 확인
        len(df) >= 10000,
        # 필수 컬럼 존재
        all(col in df.columns for col in REQUIRED_COLUMNS),
        # null 비율 확인
        df.isnull().mean().max() < 0.05,
        # 레이블 분포 확인
        df["label"].value_counts(normalize=True).min() > 0.1,
        # 중복 제거
        df.duplicated().mean() < 0.01,
    ]

    if not all(checks):
        failed = [i for i, c in enumerate(checks) if not c]
        raise ValueError(f"Data validation failed at checks: {failed}")

    return True

8. 모니터링

모델 성능 모니터링

배포 후 모델 성능은 시간이 지나면서 저하될 수 있다.

from prometheus_client import Gauge, Histogram, Counter

# 메트릭 정의
prediction_latency = Histogram(
    "model_prediction_latency_seconds",
    "Prediction latency in seconds",
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)

prediction_count = Counter(
    "model_prediction_total",
    "Total number of predictions",
    ["model_version", "status"]
)

model_accuracy = Gauge(
    "model_accuracy",
    "Current model accuracy",
    ["model_name"]
)

# 사용
import time

def predict_with_monitoring(model, input_data):
    start = time.time()
    try:
        result = model.predict(input_data)
        prediction_count.labels(
            model_version="v2.1",
            status="success"
        ).inc()
        return result
    except Exception as e:
        prediction_count.labels(
            model_version="v2.1",
            status="error"
        ).inc()
        raise
    finally:
        latency = time.time() - start
        prediction_latency.observe(latency)

데이터 드리프트 감지

입력 데이터의 분포가 변하면 모델 성능이 저하된다.

from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset

# 드리프트 리포트 생성
report = Report(metrics=[
    DataDriftPreset(),
    TargetDriftPreset(),
])

column_mapping = ColumnMapping(
    target="label",
    prediction="prediction",
    numerical_features=["feature_1", "feature_2", "feature_3"],
    categorical_features=["category", "region"],
)

report.run(
    reference_data=training_data,
    current_data=production_data,
    column_mapping=column_mapping,
)

# 드리프트 감지 결과 확인
drift_results = report.as_dict()
if drift_results["metrics"][0]["result"]["dataset_drift"]:
    trigger_retraining_pipeline()

개념 드리프트 (Concept Drift)

데이터 분포뿐 아니라 입력과 출력의 관계 자체가 변할 수 있다. 예를 들어 코로나 이전과 이후의 소비 패턴은 완전히 달라졌다.

class ConceptDriftDetector:
    """PSI(Population Stability Index) 기반 드리프트 감지"""

    def __init__(self, threshold=0.2):
        self.threshold = threshold

    def calculate_psi(self, expected, actual, bins=10):
        """PSI 계산"""
        breakpoints = np.percentile(expected, np.linspace(0, 100, bins + 1))
        expected_counts = np.histogram(expected, breakpoints)[0] / len(expected)
        actual_counts = np.histogram(actual, breakpoints)[0] / len(actual)

        # 0 방지
        expected_counts = np.clip(expected_counts, 1e-6, None)
        actual_counts = np.clip(actual_counts, 1e-6, None)

        psi = np.sum(
            (actual_counts - expected_counts) *
            np.log(actual_counts / expected_counts)
        )
        return psi

    def check_drift(self, reference_predictions, current_predictions):
        psi = self.calculate_psi(reference_predictions, current_predictions)

        if psi > self.threshold:
            return {"drift_detected": True, "psi": psi, "action": "retrain"}
        elif psi > self.threshold / 2:
            return {"drift_detected": False, "psi": psi, "action": "monitor"}
        else:
            return {"drift_detected": False, "psi": psi, "action": "none"}

Grafana 대시보드 구성

모니터링 메트릭을 시각화하려면 Grafana가 적합하다. Prometheus에서 수집한 메트릭을 대시보드로 구성할 수 있다.

주요 모니터링 패널 구성:

  • 요청 처리량: 초당 예측 요청 수
  • 지연 시간 분포: p50, p95, p99 레이턴시
  • 오류율: 예측 실패 비율
  • 모델 정확도: 실시간 성능 메트릭
  • 데이터 드리프트 점수: PSI, KL Divergence
  • 리소스 사용량: GPU 메모리, CPU, 네트워크

9. A/B 테스트와 배포 전략

카나리 배포 (Canary Deployment)

새 모델을 소량의 트래픽에만 먼저 적용하는 전략이다.

# Istio VirtualService로 트래픽 분할
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: model-serving
spec:
  hosts:
    - model-service
  http:
    - match:
        - headers:
            canary:
              exact: "true"
      route:
        - destination:
            host: model-service
            subset: v2
    - route:
        - destination:
            host: model-service
            subset: v1
          weight: 90
        - destination:
            host: model-service
            subset: v2
          weight: 10

섀도 모드 (Shadow Mode)

새 모델이 실제 트래픽으로 추론하되, 결과는 저장만 하고 사용자에게 보내지 않는다. 기존 모델의 결과와 비교하여 안전하게 검증할 수 있다.

class ShadowModelRouter:
    def __init__(self, primary_model, shadow_model):
        self.primary = primary_model
        self.shadow = shadow_model

    async def predict(self, input_data):
        # 주 모델 결과 (사용자에게 반환)
        primary_result = await self.primary.predict(input_data)

        # 섀도 모델 결과 (비교용 저장)
        try:
            shadow_result = await self.shadow.predict(input_data)
            await self.log_comparison(
                input_data, primary_result, shadow_result
            )
        except Exception:
            pass  # 섀도 모델 오류는 무시

        return primary_result

    async def log_comparison(self, input_data, primary, shadow):
        comparison = {
            "timestamp": datetime.utcnow().isoformat(),
            "input_hash": hash(str(input_data)),
            "primary_prediction": primary,
            "shadow_prediction": shadow,
            "agreement": primary == shadow,
        }
        await self.metrics_store.insert(comparison)

멀티암 밴딧 (Multi-Armed Bandit)

A/B 테스트보다 효율적인 동적 트래픽 할당 방법이다. 성능이 좋은 모델에 더 많은 트래픽을 자동으로 배분한다.

import numpy as np

class ThompsonSamplingRouter:
    """톰슨 샘플링 기반 모델 라우터"""

    def __init__(self, model_names):
        self.models = model_names
        # Beta 분포의 파라미터 (alpha, beta)
        self.successes = {name: 1 for name in model_names}
        self.failures = {name: 1 for name in model_names}

    def select_model(self):
        """톰슨 샘플링으로 모델 선택"""
        samples = {}
        for name in self.models:
            samples[name] = np.random.beta(
                self.successes[name],
                self.failures[name]
            )
        return max(samples, key=samples.get)

    def update(self, model_name, success: bool):
        """결과에 따라 분포 업데이트"""
        if success:
            self.successes[model_name] += 1
        else:
            self.failures[model_name] += 1

    def get_allocation(self):
        """현재 트래픽 비율 확인"""
        total = sum(self.successes[m] + self.failures[m] for m in self.models)
        return {
            m: (self.successes[m] + self.failures[m]) / total
            for m in self.models
        }

10. 비용 최적화

스팟 인스턴스 활용

학습 작업은 중단 가능하므로 스팟 인스턴스로 비용을 70% 이상 절약할 수 있다.

# AWS SageMaker 스팟 학습
import sagemaker

estimator = sagemaker.estimator.Estimator(
    image_uri="my-training-image:latest",
    role="arn:aws:iam::ACCOUNT:role/SageMakerRole",
    instance_count=4,
    instance_type="ml.p4d.24xlarge",
    use_spot_instances=True,
    max_wait=7200,  # 최대 대기 시간
    max_run=3600,   # 최대 실행 시간
    checkpoint_s3_uri="s3://bucket/checkpoints/",
)

estimator.fit({"training": "s3://bucket/data/"})

핵심은 체크포인트 저장이다. 스팟 인스턴스가 중단되어도 체크포인트에서 재개할 수 있어야 한다.

모델 경량화

프로덕션 서빙 비용을 줄이는 방법들이다.

지식 증류 (Knowledge Distillation)

class DistillationTrainer:
    def __init__(self, teacher, student, temperature=3.0, alpha=0.5):
        self.teacher = teacher
        self.student = student
        self.temperature = temperature
        self.alpha = alpha

    def distillation_loss(self, student_logits, teacher_logits, labels):
        # Soft target loss (KD loss)
        soft_targets = F.softmax(teacher_logits / self.temperature, dim=1)
        soft_loss = F.kl_div(
            F.log_softmax(student_logits / self.temperature, dim=1),
            soft_targets,
            reduction="batchmean"
        ) * (self.temperature ** 2)

        # Hard target loss
        hard_loss = F.cross_entropy(student_logits, labels)

        return self.alpha * soft_loss + (1 - self.alpha) * hard_loss

ONNX 변환으로 추론 최적화

import torch
import onnx

# PyTorch -> ONNX 변환
dummy_input = torch.randn(1, 3, 224, 224)
torch.onnx.export(
    model,
    dummy_input,
    "model.onnx",
    input_names=["input"],
    output_names=["output"],
    dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},
    opset_version=17,
)

# ONNX Runtime으로 추론
import onnxruntime as ort

session = ort.InferenceSession("model.onnx", providers=["CUDAExecutionProvider"])
result = session.run(None, {"input": input_array})

배치 추론 vs 실시간 추론

구분배치 추론실시간 추론
지연 시간분~시간밀리초~초
처리량매우 높음중간
비용낮음 (스팟 인스턴스)높음 (상시 가동)
적합한 경우추천 시스템, 리포트챗봇, 검색
# 배치 추론 예시 (Spark)
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = SparkSession.builder.getOrCreate()

# UDF로 모델 추론 등록
@udf("float")
def predict_udf(features):
    return float(model.predict([features])[0])

# 대규모 데이터에 대해 배치 추론
df = spark.read.parquet("s3://bucket/daily-data/")
predictions = df.withColumn("prediction", predict_udf("features"))
predictions.write.parquet("s3://bucket/predictions/")

11. 실전: LLM RAG 파이프라인 구축

RAG(Retrieval-Augmented Generation)는 LLM의 한계를 극복하는 핵심 패턴이다. 외부 지식 베이스를 검색하여 LLM에 컨텍스트로 제공한다.

전체 아키텍처

  1. 문서 수집 - 다양한 소스에서 데이터 수집
  2. 청킹 - 문서를 적절한 크기로 분할
  3. 임베딩 - 텍스트를 벡터로 변환
  4. 벡터 DB 저장 - 벡터 인덱스에 저장
  5. 검색 - 질의에 대해 유사한 문서 검색
  6. 생성 - 검색 결과를 컨텍스트로 LLM에 전달

문서 수집 및 청킹

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import (
    PyPDFLoader,
    WebBaseLoader,
    NotionDirectoryLoader,
)

# 다양한 소스에서 문서 로드
pdf_docs = PyPDFLoader("docs/manual.pdf").load()
web_docs = WebBaseLoader("https://docs.example.com").load()
notion_docs = NotionDirectoryLoader("notion_export/").load()

all_docs = pdf_docs + web_docs + notion_docs

# 청킹
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", ".", " "],
)

chunks = text_splitter.split_documents(all_docs)
print(f"Total chunks: {len(chunks)}")

임베딩 및 벡터 DB

from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma

# 임베딩 모델
embedding_model = HuggingFaceEmbeddings(
    model_name="BAAI/bge-large-en-v1.5",
    model_kwargs={"device": "cuda"},
    encode_kwargs={"normalize_embeddings": True},
)

# Chroma 벡터 DB에 저장
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embedding_model,
    persist_directory="./chroma_db",
    collection_name="knowledge_base",
)

# 유사도 검색
results = vectorstore.similarity_search(
    "How to deploy a model to production?",
    k=5,
)

RAG 파이프라인 조립

from langchain.chains import RetrievalQA
from langchain.llms import VLLM
from langchain.prompts import PromptTemplate

# LLM 설정
llm = VLLM(
    model="meta-llama/Llama-3.1-8B-Instruct",
    tensor_parallel_size=1,
    max_new_tokens=512,
    temperature=0.1,
)

# 프롬프트 템플릿
prompt_template = PromptTemplate(
    template="""다음 컨텍스트를 참고하여 질문에 답변하세요.
컨텍스트에 없는 정보는 "모르겠습니다"라고 답하세요.

컨텍스트:
{context}

질문: {question}

답변:""",
    input_variables=["context", "question"],
)

# RAG 체인 구성
rag_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(
        search_type="mmr",
        search_kwargs={"k": 5, "fetch_k": 20},
    ),
    chain_type_kwargs={"prompt": prompt_template},
    return_source_documents=True,
)

# 질의
result = rag_chain.invoke("프로덕션 배포 절차가 어떻게 되나요?")
print(result["result"])
for doc in result["source_documents"]:
    print(f"  Source: {doc.metadata['source']}")

프로덕션 RAG 서빙

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class QueryRequest(BaseModel):
    question: str
    top_k: int = 5

class QueryResponse(BaseModel):
    answer: str
    sources: list[str]
    latency_ms: float

@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
    import time
    start = time.time()

    result = rag_chain.invoke(request.question)

    sources = list(set(
        doc.metadata.get("source", "unknown")
        for doc in result["source_documents"]
    ))

    latency = (time.time() - start) * 1000

    return QueryResponse(
        answer=result["result"],
        sources=sources,
        latency_ms=round(latency, 2),
    )

마치며

MLOps는 ML 모델을 프로덕션에서 안정적으로 운영하기 위한 필수 분야다.

핵심 포인트를 정리하면 다음과 같다.

  1. 실험 관리부터 시작하라 - MLflow 하나만 도입해도 큰 변화가 생긴다.
  2. 자동화에 투자하라 - 수동 프로세스는 반드시 실패한다.
  3. 모니터링은 필수다 - 배포 후가 진짜 시작이다.
  4. 작게 시작하라 - Google Level 0에서 1으로 가는 것이 가장 중요하다.
  5. LLM 시대에 맞춰 진화하라 - vLLM, RAG 등 새로운 도구를 적극 활용하라.

모델을 학습하는 것은 전체의 20%에 불과하다. 나머지 80%는 데이터 파이프라인, 서빙, 모니터링, 운영이다. MLOps는 바로 그 80%를 체계적으로 관리하는 방법론이다.

MLOps 셀프 체크리스트

실험 관리

  • 모든 실험의 하이퍼파라미터와 메트릭을 추적하고 있는가?
  • 실험 결과를 재현할 수 있는가?

데이터 파이프라인

  • 데이터 버전 관리를 하고 있는가?
  • 데이터 품질 검증이 자동화되어 있는가?

학습 인프라

  • 학습 파이프라인이 자동화되어 있는가?
  • 체크포인트를 저장하고 있는가?

서빙

  • 모델 서빙의 지연 시간을 모니터링하고 있는가?
  • 롤백이 가능한 배포 전략을 쓰고 있는가?

모니터링

  • 데이터 드리프트를 감지하고 있는가?
  • 모델 성능 저하 시 알림이 오는가?

비용

  • 스팟 인스턴스를 활용하고 있는가?
  • 모델 경량화를 고려했는가?