Skip to content

필사 모드: MLOps & AI 모델 배포 완전 가이드 — 학습부터 서빙, 모니터링까지

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

목차

1. [MLOps란 무엇인가](#1-mlops란-무엇인가)

2. [실험 관리](#2-실험-관리)

3. [데이터 파이프라인](#3-데이터-파이프라인)

4. [모델 학습 인프라](#4-모델-학습-인프라)

5. [모델 서빙](#5-모델-서빙)

6. [LLM 서빙 특별편](#6-llm-서빙-특별편)

7. [CI/CD for ML](#7-cicd-for-ml)

8. [모니터링](#8-모니터링)

9. [A/B 테스트와 배포 전략](#9-ab-테스트와-배포-전략)

10. [비용 최적화](#10-비용-최적화)

11. [실전: LLM RAG 파이프라인 구축](#11-실전-llm-rag-파이프라인-구축)

1. MLOps란 무엇인가

DevOps와 ML의 만남

MLOps는 Machine Learning과 Operations의 합성어다.

소프트웨어 엔지니어링의 DevOps 원칙을 머신러닝 라이프사이클에 적용한 것이라고 보면 된다.

전통적인 소프트웨어 개발에서는 코드만 관리하면 됐다.

하지만 ML 시스템에서는 **코드, 데이터, 모델** 세 가지를 동시에 관리해야 한다.

데이터가 바뀌면 모델이 바뀌고, 모델이 바뀌면 서빙 방식도 바뀔 수 있다.

ML 라이프사이클

ML 프로젝트의 전체 흐름은 다음과 같다.

1. **문제 정의** - 비즈니스 요구사항을 ML 문제로 변환

2. **데이터 수집 및 전처리** - 데이터 파이프라인 구축

3. **피처 엔지니어링** - 원시 데이터를 모델이 이해할 수 있는 형태로 변환

4. **모델 학습** - 다양한 알고리즘과 하이퍼파라미터 실험

5. **모델 평가** - 오프라인 메트릭으로 성능 검증

6. **모델 배포** - 프로덕션 환경에 서빙

7. **모니터링** - 성능 추적과 드리프트 감지

8. **재학습** - 성능 저하 시 모델 업데이트

이 과정은 일회성이 아니라 **지속적인 순환**이다.

Google의 MLOps 성숙도 모델

Google은 MLOps 성숙도를 세 단계로 정의한다.

**Level 0 - 수동 프로세스**

- 데이터 과학자가 노트북에서 수동으로 학습

- 모델 배포가 수동이고 비정기적

- 모니터링 없음

- 대부분의 팀이 여기에 해당

**Level 1 - ML 파이프라인 자동화**

- 학습 파이프라인이 자동화됨

- 지속적 학습(Continuous Training) 구현

- 실험 추적 시스템 도입

- 모델 레지스트리 사용

**Level 2 - CI/CD 파이프라인 자동화**

- 파이프라인 자체의 CI/CD 구현

- 자동화된 테스트와 검증

- 피처 스토어 활용

- 완전한 모니터링과 알림 체계

대부분의 조직이 Level 0에서 Level 1으로 가는 것만으로도 큰 가치를 얻을 수 있다.

2. 실험 관리

MLflow - 오픈소스 ML 플랫폼

MLflow는 ML 실험 관리의 사실상 표준이다.

네 가지 핵심 컴포넌트로 구성된다.

**MLflow Tracking** - 실험 기록

mlflow.set_experiment("recommendation-model-v2")

with mlflow.start_run():

하이퍼파라미터 기록

mlflow.log_param("learning_rate", 0.001)

mlflow.log_param("batch_size", 64)

mlflow.log_param("epochs", 50)

학습 실행

model = train_model(lr=0.001, batch_size=64, epochs=50)

메트릭 기록

mlflow.log_metric("accuracy", 0.923)

mlflow.log_metric("f1_score", 0.891)

mlflow.log_metric("auc_roc", 0.956)

모델 저장

mlflow.pytorch.log_model(model, "model")

**MLflow Models** - 모델 패키징

모델 레지스트리에 등록

mlflow.register_model(

"runs:/abc123/model",

"recommendation-model"

)

스테이지 전환

client = mlflow.tracking.MlflowClient()

client.transition_model_version_stage(

name="recommendation-model",

version=3,

stage="Production"

)

Weights and Biases

W&B는 상용 실험 관리 플랫폼으로, 시각화 기능이 뛰어나다.

wandb.init(project="image-classification", config={

"learning_rate": 0.001,

"architecture": "ResNet50",

"dataset": "imagenet-subset",

"epochs": 100,

})

for epoch in range(100):

train_loss, val_loss = train_one_epoch(model)

wandb.log({

"train_loss": train_loss,

"val_loss": val_loss,

"epoch": epoch

})

Optuna - 하이퍼파라미터 최적화

수동으로 하이퍼파라미터를 조정하는 것은 비효율적이다.

Optuna는 베이지안 최적화 기반으로 자동 탐색을 수행한다.

def objective(trial):

lr = trial.suggest_float("lr", 1e-5, 1e-1, log=True)

batch_size = trial.suggest_categorical("batch_size", [16, 32, 64, 128])

n_layers = trial.suggest_int("n_layers", 1, 5)

dropout = trial.suggest_float("dropout", 0.1, 0.5)

model = create_model(n_layers=n_layers, dropout=dropout)

accuracy = train_and_evaluate(model, lr=lr, batch_size=batch_size)

return accuracy

study = optuna.create_study(direction="maximize")

study.optimize(objective, n_trials=100)

print(f"Best accuracy: {study.best_trial.value}")

print(f"Best params: {study.best_trial.params}")

3. 데이터 파이프라인

Feature Store

피처 스토어는 ML 피처의 중앙 저장소다.

학습과 서빙에서 동일한 피처를 사용할 수 있게 해준다.

**Feast 예시**

from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo/")

피처 정의

from feast import Entity, FeatureView, Field

from feast.types import Float32, Int64

user = Entity(name="user_id", join_keys=["user_id"])

user_features = FeatureView(

name="user_features",

entities=[user],

schema=[

Field(name="total_purchases", dtype=Int64),

Field(name="avg_order_value", dtype=Float32),

Field(name="days_since_last_order", dtype=Int64),

],

source=user_source,

)

학습 데이터 조회

training_df = store.get_historical_features(

entity_df=entity_df,

features=[

"user_features:total_purchases",

"user_features:avg_order_value",

"user_features:days_since_last_order",

],

).to_df()

실시간 서빙용 조회

online_features = store.get_online_features(

features=[

"user_features:total_purchases",

"user_features:avg_order_value",

],

entity_rows=[{"user_id": 12345}],

).to_dict()

DVC - 데이터 버저닝

DVC(Data Version Control)는 Git으로 데이터를 버전 관리할 수 있게 한다.

DVC 초기화

dvc init

데이터 파일 추적

dvc add data/training_data.csv

리모트 스토리지 설정

dvc remote add -d myremote s3://my-bucket/dvc-storage

데이터 푸시

dvc push

특정 버전의 데이터 가져오기

git checkout v1.0

dvc pull

데이터 검증 - Great Expectations

데이터 품질 검증은 파이프라인의 핵심이다.

context = gx.get_context()

기대치 정의

validator = context.sources.pandas_default.read_csv(

"data/training_data.csv"

)

validator.expect_column_values_to_not_be_null("user_id")

validator.expect_column_values_to_be_between("age", min_value=0, max_value=150)

validator.expect_column_values_to_be_in_set("country", ["KR", "US", "JP", "UK"])

검증 실행

results = validator.validate()

if not results.success:

raise ValueError("Data validation failed!")

4. 모델 학습 인프라

GPU 클러스터 구성

대규모 모델 학습에는 GPU 클러스터가 필수다.

**Kubernetes + GPU 노드풀**

apiVersion: v1

kind: Pod

metadata:

name: training-job

spec:

containers:

- name: trainer

image: training-image:latest

resources:

limits:

nvidia.com/gpu: 4

volumeMounts:

- name: dataset

mountPath: /data

nodeSelector:

gpu-type: a100

tolerations:

- key: "nvidia.com/gpu"

operator: "Exists"

effect: "NoSchedule"

분산 학습

단일 GPU로 감당할 수 없는 모델은 분산 학습이 필요하다.

**PyTorch DDP (Distributed Data Parallel)**

from torch.nn.parallel import DistributedDataParallel as DDP

def setup(rank, world_size):

dist.init_process_group("nccl", rank=rank, world_size=world_size)

torch.cuda.set_device(rank)

def train(rank, world_size):

setup(rank, world_size)

model = MyModel().to(rank)

model = DDP(model, device_ids=[rank])

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

for epoch in range(num_epochs):

sampler.set_epoch(epoch)

for batch in dataloader:

loss = model(batch)

loss.backward()

optimizer.step()

optimizer.zero_grad()

dist.destroy_process_group()

Kubeflow Training Operator

Kubeflow를 사용하면 Kubernetes 위에서 분산 학습을 선언적으로 관리할 수 있다.

apiVersion: kubeflow.org/v1

kind: PyTorchJob

metadata:

name: pytorch-training

spec:

pytorchReplicaSpecs:

Master:

replicas: 1

template:

spec:

containers:

- name: pytorch

image: my-training:latest

resources:

limits:

nvidia.com/gpu: 1

Worker:

replicas: 3

template:

spec:

containers:

- name: pytorch

image: my-training:latest

resources:

limits:

nvidia.com/gpu: 1

5. 모델 서빙

서빙 아키텍처 비교

| 프레임워크 | 장점 | 단점 | 적합한 경우 |

|-----------|------|------|-----------|

| TorchServe | PyTorch 네이티브, 관리 쉬움 | 성능 제한적 | 소규모 PyTorch 모델 |

| Triton | 멀티 프레임워크, 고성능 | 설정 복잡 | 대규모 프로덕션 |

| vLLM | LLM 특화, PagedAttention | LLM 전용 | LLM 서빙 |

| FastAPI | 유연성 최고 | 직접 구현 필요 | 커스텀 로직 필요 시 |

TorchServe

모델 아카이브 생성

torch-model-archiver \

--model-name resnet50 \

--version 1.0 \

--model-file model.py \

--serialized-file model.pth \

--handler image_classifier

서버 시작

torchserve --start \

--model-store model_store \

--models resnet50=resnet50.mar

NVIDIA Triton Inference Server

Triton은 다양한 프레임워크의 모델을 동시에 서빙할 수 있다.

모델 저장소 구조

model_repository/

resnet50/

config.pbtxt

1/

model.onnx

bert_base/

config.pbtxt

1/

model.plan

**config.pbtxt 설정**

name: "resnet50"

platform: "onnxruntime_onnx"

max_batch_size: 32

input [

{

name: "input"

data_type: TYPE_FP32

dims: [3, 224, 224]

}

]

output [

{

name: "output"

data_type: TYPE_FP32

dims: [1000]

}

]

dynamic_batching {

preferred_batch_size: [8, 16, 32]

max_queue_delay_microseconds: 100

}

instance_group [

{

count: 2

kind: KIND_GPU

}

]

FastAPI 래핑

간단한 모델 서빙에는 FastAPI가 적합하다.

from fastapi import FastAPI

from pydantic import BaseModel

app = FastAPI()

모델 로드 (앱 시작 시 1회)

model = torch.jit.load("model.pt")

model.eval()

class PredictionRequest(BaseModel):

features: list[float]

class PredictionResponse(BaseModel):

prediction: float

confidence: float

@app.post("/predict", response_model=PredictionResponse)

async def predict(request: PredictionRequest):

tensor = torch.tensor([request.features])

with torch.no_grad():

output = model(tensor)

prob = torch.softmax(output, dim=1)

prediction = torch.argmax(prob, dim=1).item()

confidence = prob[0][prediction].item()

return PredictionResponse(

prediction=prediction,

confidence=confidence

)

@app.get("/health")

async def health():

return {"status": "healthy"}

6. LLM 서빙 특별편

vLLM - 고성능 LLM 서빙

vLLM은 PagedAttention 기술을 사용하여 LLM 서빙 성능을 극대화한다.

from vllm import LLM, SamplingParams

모델 로드

llm = LLM(

model="meta-llama/Llama-3.1-8B-Instruct",

tensor_parallel_size=2,

gpu_memory_utilization=0.9,

max_model_len=8192,

)

추론

sampling_params = SamplingParams(

temperature=0.7,

top_p=0.9,

max_tokens=512,

)

outputs = llm.generate(

["Explain MLOps in simple terms."],

sampling_params,

)

print(outputs[0].outputs[0].text)

**vLLM OpenAI 호환 서버**

python -m vllm.entrypoints.openai.api_server \

--model meta-llama/Llama-3.1-8B-Instruct \

--tensor-parallel-size 2 \

--port 8000

OpenAI SDK로 호출 가능

from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy")

response = client.chat.completions.create(

model="meta-llama/Llama-3.1-8B-Instruct",

messages=[

{"role": "user", "content": "What is MLOps?"}

],

max_tokens=256,

)

Text Generation Inference (TGI)

HuggingFace의 TGI는 Rust 기반으로 안정적이다.

docker run --gpus all \

-p 8080:80 \

-v /data:/data \

ghcr.io/huggingface/text-generation-inference:latest \

--model-id meta-llama/Llama-3.1-8B-Instruct \

--quantize gptq \

--max-input-length 4096 \

--max-total-tokens 8192

양자화 기법 비교

LLM 배포 시 모델 크기를 줄이는 양자화는 필수다.

| 기법 | 크기 감소 | 품질 손실 | 속도 향상 | 비고 |

|------|----------|----------|----------|------|

| GPTQ | 4x | 낮음 | 2-3x | GPU 최적화, 정확도 우수 |

| AWQ | 4x | 매우 낮음 | 2-3x | Activation-aware, GPTQ보다 품질 좋음 |

| GGUF | 2-8x | 가변적 | CPU 가능 | llama.cpp용, CPU/GPU 혼합 지원 |

| BitsAndBytes | 2-4x | 낮음 | 1.5x | 간편한 적용, QLoRA 학습 가능 |

BitsAndBytes 4비트 양자화

from transformers import AutoModelForCausalLM, BitsAndBytesConfig

quantization_config = BitsAndBytesConfig(

load_in_4bit=True,

bnb_4bit_compute_dtype=torch.float16,

bnb_4bit_quant_type="nf4",

bnb_4bit_use_double_quant=True,

)

model = AutoModelForCausalLM.from_pretrained(

"meta-llama/Llama-3.1-70B-Instruct",

quantization_config=quantization_config,

device_map="auto",

)

Ollama - 로컬 LLM

개발 환경이나 소규모 배포에는 Ollama가 간편하다.

모델 다운로드 및 실행

ollama pull llama3.1

ollama run llama3.1

API 서버 모드

ollama serve

response = requests.post("http://localhost:11434/api/generate", json={

"model": "llama3.1",

"prompt": "Explain MLOps briefly.",

"stream": False,

})

print(response.json()["response"])

7. CI/CD for ML

모델 테스트 전략

ML 모델에는 일반 소프트웨어와 다른 테스트가 필요하다.

class TestModelQuality:

"""모델 품질 테스트"""

def test_accuracy_threshold(self, model, test_data):

"""정확도가 기준치 이상인지 확인"""

accuracy = evaluate(model, test_data)

assert accuracy >= 0.90, f"Accuracy {accuracy} below threshold 0.90"

def test_latency(self, model, sample_input):

"""추론 지연 시간 확인"""

start = time.time()

model.predict(sample_input)

latency = time.time() - start

assert latency < 0.1, f"Latency {latency}s exceeds 100ms"

def test_no_bias(self, model, fairness_data):

"""공정성 검증"""

results_group_a = model.predict(fairness_data["group_a"])

results_group_b = model.predict(fairness_data["group_b"])

disparity = abs(results_group_a.mean() - results_group_b.mean())

assert disparity < 0.05, f"Bias detected: disparity={disparity}"

def test_model_size(self, model_path):

"""모델 크기 제한"""

size_mb = os.path.getsize(model_path) / (1024 * 1024)

assert size_mb < 500, f"Model size {size_mb}MB exceeds 500MB limit"

GitHub Actions로 ML 파이프라인

name: ML Pipeline

on:

push:

paths:

- "src/**"

- "data/**"

- "configs/**"

jobs:

data-validation:

runs-on: ubuntu-latest

steps:

- uses: actions/checkout@v4

- name: Validate data

run: python scripts/validate_data.py

train:

needs: data-validation

runs-on: [self-hosted, gpu]

steps:

- uses: actions/checkout@v4

- name: Train model

run: python scripts/train.py --config configs/production.yaml

- name: Upload model artifact

uses: actions/upload-artifact@v4

with:

name: model

path: outputs/model/

evaluate:

needs: train

runs-on: ubuntu-latest

steps:

- uses: actions/checkout@v4

- name: Download model

uses: actions/download-artifact@v4

with:

name: model

- name: Run evaluation

run: python scripts/evaluate.py

- name: Check quality gates

run: python scripts/quality_gate.py

deploy:

needs: evaluate

if: github.ref == 'refs/heads/main'

runs-on: ubuntu-latest

steps:

- name: Deploy to staging

run: ./scripts/deploy.sh staging

- name: Smoke test

run: python scripts/smoke_test.py

- name: Deploy to production

run: ./scripts/deploy.sh production

데이터 검증 파이프라인

def validate_training_data(data_path: str) -> bool:

"""학습 데이터의 품질을 검증하는 파이프라인"""

df = pd.read_parquet(data_path)

checks = [

데이터 크기 확인

len(df) >= 10000,

필수 컬럼 존재

all(col in df.columns for col in REQUIRED_COLUMNS),

null 비율 확인

df.isnull().mean().max() < 0.05,

레이블 분포 확인

df["label"].value_counts(normalize=True).min() > 0.1,

중복 제거

df.duplicated().mean() < 0.01,

]

if not all(checks):

failed = [i for i, c in enumerate(checks) if not c]

raise ValueError(f"Data validation failed at checks: {failed}")

return True

8. 모니터링

모델 성능 모니터링

배포 후 모델 성능은 시간이 지나면서 저하될 수 있다.

from prometheus_client import Gauge, Histogram, Counter

메트릭 정의

prediction_latency = Histogram(

"model_prediction_latency_seconds",

"Prediction latency in seconds",

buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]

)

prediction_count = Counter(

"model_prediction_total",

"Total number of predictions",

["model_version", "status"]

)

model_accuracy = Gauge(

"model_accuracy",

"Current model accuracy",

["model_name"]

)

사용

def predict_with_monitoring(model, input_data):

start = time.time()

try:

result = model.predict(input_data)

prediction_count.labels(

model_version="v2.1",

status="success"

).inc()

return result

except Exception as e:

prediction_count.labels(

model_version="v2.1",

status="error"

).inc()

raise

finally:

latency = time.time() - start

prediction_latency.observe(latency)

데이터 드리프트 감지

입력 데이터의 분포가 변하면 모델 성능이 저하된다.

from evidently import ColumnMapping

from evidently.report import Report

from evidently.metric_preset import DataDriftPreset, TargetDriftPreset

드리프트 리포트 생성

report = Report(metrics=[

DataDriftPreset(),

TargetDriftPreset(),

])

column_mapping = ColumnMapping(

target="label",

prediction="prediction",

numerical_features=["feature_1", "feature_2", "feature_3"],

categorical_features=["category", "region"],

)

report.run(

reference_data=training_data,

current_data=production_data,

column_mapping=column_mapping,

)

드리프트 감지 결과 확인

drift_results = report.as_dict()

if drift_results["metrics"][0]["result"]["dataset_drift"]:

trigger_retraining_pipeline()

개념 드리프트 (Concept Drift)

데이터 분포뿐 아니라 입력과 출력의 관계 자체가 변할 수 있다.

예를 들어 코로나 이전과 이후의 소비 패턴은 완전히 달라졌다.

class ConceptDriftDetector:

"""PSI(Population Stability Index) 기반 드리프트 감지"""

def __init__(self, threshold=0.2):

self.threshold = threshold

def calculate_psi(self, expected, actual, bins=10):

"""PSI 계산"""

breakpoints = np.percentile(expected, np.linspace(0, 100, bins + 1))

expected_counts = np.histogram(expected, breakpoints)[0] / len(expected)

actual_counts = np.histogram(actual, breakpoints)[0] / len(actual)

0 방지

expected_counts = np.clip(expected_counts, 1e-6, None)

actual_counts = np.clip(actual_counts, 1e-6, None)

psi = np.sum(

(actual_counts - expected_counts) *

np.log(actual_counts / expected_counts)

)

return psi

def check_drift(self, reference_predictions, current_predictions):

psi = self.calculate_psi(reference_predictions, current_predictions)

if psi > self.threshold:

return {"drift_detected": True, "psi": psi, "action": "retrain"}

elif psi > self.threshold / 2:

return {"drift_detected": False, "psi": psi, "action": "monitor"}

else:

return {"drift_detected": False, "psi": psi, "action": "none"}

Grafana 대시보드 구성

모니터링 메트릭을 시각화하려면 Grafana가 적합하다.

Prometheus에서 수집한 메트릭을 대시보드로 구성할 수 있다.

주요 모니터링 패널 구성:

- **요청 처리량**: 초당 예측 요청 수

- **지연 시간 분포**: p50, p95, p99 레이턴시

- **오류율**: 예측 실패 비율

- **모델 정확도**: 실시간 성능 메트릭

- **데이터 드리프트 점수**: PSI, KL Divergence

- **리소스 사용량**: GPU 메모리, CPU, 네트워크

9. A/B 테스트와 배포 전략

카나리 배포 (Canary Deployment)

새 모델을 소량의 트래픽에만 먼저 적용하는 전략이다.

Istio VirtualService로 트래픽 분할

apiVersion: networking.istio.io/v1beta1

kind: VirtualService

metadata:

name: model-serving

spec:

hosts:

- model-service

http:

- match:

- headers:

canary:

exact: "true"

route:

- destination:

host: model-service

subset: v2

- route:

- destination:

host: model-service

subset: v1

weight: 90

- destination:

host: model-service

subset: v2

weight: 10

섀도 모드 (Shadow Mode)

새 모델이 실제 트래픽으로 추론하되, 결과는 저장만 하고 사용자에게 보내지 않는다.

기존 모델의 결과와 비교하여 안전하게 검증할 수 있다.

class ShadowModelRouter:

def __init__(self, primary_model, shadow_model):

self.primary = primary_model

self.shadow = shadow_model

async def predict(self, input_data):

주 모델 결과 (사용자에게 반환)

primary_result = await self.primary.predict(input_data)

섀도 모델 결과 (비교용 저장)

try:

shadow_result = await self.shadow.predict(input_data)

await self.log_comparison(

input_data, primary_result, shadow_result

)

except Exception:

pass # 섀도 모델 오류는 무시

return primary_result

async def log_comparison(self, input_data, primary, shadow):

comparison = {

"timestamp": datetime.utcnow().isoformat(),

"input_hash": hash(str(input_data)),

"primary_prediction": primary,

"shadow_prediction": shadow,

"agreement": primary == shadow,

}

await self.metrics_store.insert(comparison)

멀티암 밴딧 (Multi-Armed Bandit)

A/B 테스트보다 효율적인 동적 트래픽 할당 방법이다.

성능이 좋은 모델에 더 많은 트래픽을 자동으로 배분한다.

class ThompsonSamplingRouter:

"""톰슨 샘플링 기반 모델 라우터"""

def __init__(self, model_names):

self.models = model_names

Beta 분포의 파라미터 (alpha, beta)

self.successes = {name: 1 for name in model_names}

self.failures = {name: 1 for name in model_names}

def select_model(self):

"""톰슨 샘플링으로 모델 선택"""

samples = {}

for name in self.models:

samples[name] = np.random.beta(

self.successes[name],

self.failures[name]

)

return max(samples, key=samples.get)

def update(self, model_name, success: bool):

"""결과에 따라 분포 업데이트"""

if success:

self.successes[model_name] += 1

else:

self.failures[model_name] += 1

def get_allocation(self):

"""현재 트래픽 비율 확인"""

total = sum(self.successes[m] + self.failures[m] for m in self.models)

return {

m: (self.successes[m] + self.failures[m]) / total

for m in self.models

}

10. 비용 최적화

스팟 인스턴스 활용

학습 작업은 중단 가능하므로 스팟 인스턴스로 비용을 70% 이상 절약할 수 있다.

AWS SageMaker 스팟 학습

estimator = sagemaker.estimator.Estimator(

image_uri="my-training-image:latest",

role="arn:aws:iam::ACCOUNT:role/SageMakerRole",

instance_count=4,

instance_type="ml.p4d.24xlarge",

use_spot_instances=True,

max_wait=7200, # 최대 대기 시간

max_run=3600, # 최대 실행 시간

checkpoint_s3_uri="s3://bucket/checkpoints/",

)

estimator.fit({"training": "s3://bucket/data/"})

핵심은 **체크포인트 저장**이다. 스팟 인스턴스가 중단되어도 체크포인트에서 재개할 수 있어야 한다.

모델 경량화

프로덕션 서빙 비용을 줄이는 방법들이다.

**지식 증류 (Knowledge Distillation)**

class DistillationTrainer:

def __init__(self, teacher, student, temperature=3.0, alpha=0.5):

self.teacher = teacher

self.student = student

self.temperature = temperature

self.alpha = alpha

def distillation_loss(self, student_logits, teacher_logits, labels):

Soft target loss (KD loss)

soft_targets = F.softmax(teacher_logits / self.temperature, dim=1)

soft_loss = F.kl_div(

F.log_softmax(student_logits / self.temperature, dim=1),

soft_targets,

reduction="batchmean"

) * (self.temperature ** 2)

Hard target loss

hard_loss = F.cross_entropy(student_logits, labels)

return self.alpha * soft_loss + (1 - self.alpha) * hard_loss

**ONNX 변환으로 추론 최적화**

PyTorch -> ONNX 변환

dummy_input = torch.randn(1, 3, 224, 224)

torch.onnx.export(

model,

dummy_input,

"model.onnx",

input_names=["input"],

output_names=["output"],

dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},

opset_version=17,

)

ONNX Runtime으로 추론

session = ort.InferenceSession("model.onnx", providers=["CUDAExecutionProvider"])

result = session.run(None, {"input": input_array})

배치 추론 vs 실시간 추론

| 구분 | 배치 추론 | 실시간 추론 |

|------|----------|-----------|

| 지연 시간 | 분~시간 | 밀리초~초 |

| 처리량 | 매우 높음 | 중간 |

| 비용 | 낮음 (스팟 인스턴스) | 높음 (상시 가동) |

| 적합한 경우 | 추천 시스템, 리포트 | 챗봇, 검색 |

배치 추론 예시 (Spark)

from pyspark.sql import SparkSession

from pyspark.sql.functions import udf

spark = SparkSession.builder.getOrCreate()

UDF로 모델 추론 등록

@udf("float")

def predict_udf(features):

return float(model.predict([features])[0])

대규모 데이터에 대해 배치 추론

df = spark.read.parquet("s3://bucket/daily-data/")

predictions = df.withColumn("prediction", predict_udf("features"))

predictions.write.parquet("s3://bucket/predictions/")

11. 실전: LLM RAG 파이프라인 구축

RAG(Retrieval-Augmented Generation)는 LLM의 한계를 극복하는 핵심 패턴이다.

외부 지식 베이스를 검색하여 LLM에 컨텍스트로 제공한다.

전체 아키텍처

1. **문서 수집** - 다양한 소스에서 데이터 수집

2. **청킹** - 문서를 적절한 크기로 분할

3. **임베딩** - 텍스트를 벡터로 변환

4. **벡터 DB 저장** - 벡터 인덱스에 저장

5. **검색** - 질의에 대해 유사한 문서 검색

6. **생성** - 검색 결과를 컨텍스트로 LLM에 전달

문서 수집 및 청킹

from langchain.text_splitter import RecursiveCharacterTextSplitter

from langchain.document_loaders import (

PyPDFLoader,

WebBaseLoader,

NotionDirectoryLoader,

)

다양한 소스에서 문서 로드

pdf_docs = PyPDFLoader("docs/manual.pdf").load()

web_docs = WebBaseLoader("https://docs.example.com").load()

notion_docs = NotionDirectoryLoader("notion_export/").load()

all_docs = pdf_docs + web_docs + notion_docs

청킹

text_splitter = RecursiveCharacterTextSplitter(

chunk_size=1000,

chunk_overlap=200,

separators=["\n\n", "\n", ".", " "],

)

chunks = text_splitter.split_documents(all_docs)

print(f"Total chunks: {len(chunks)}")

임베딩 및 벡터 DB

from langchain.embeddings import HuggingFaceEmbeddings

from langchain.vectorstores import Chroma

임베딩 모델

embedding_model = HuggingFaceEmbeddings(

model_name="BAAI/bge-large-en-v1.5",

model_kwargs={"device": "cuda"},

encode_kwargs={"normalize_embeddings": True},

)

Chroma 벡터 DB에 저장

vectorstore = Chroma.from_documents(

documents=chunks,

embedding=embedding_model,

persist_directory="./chroma_db",

collection_name="knowledge_base",

)

유사도 검색

results = vectorstore.similarity_search(

"How to deploy a model to production?",

k=5,

)

RAG 파이프라인 조립

from langchain.chains import RetrievalQA

from langchain.llms import VLLM

from langchain.prompts import PromptTemplate

LLM 설정

llm = VLLM(

model="meta-llama/Llama-3.1-8B-Instruct",

tensor_parallel_size=1,

max_new_tokens=512,

temperature=0.1,

)

프롬프트 템플릿

prompt_template = PromptTemplate(

template="""다음 컨텍스트를 참고하여 질문에 답변하세요.

컨텍스트에 없는 정보는 "모르겠습니다"라고 답하세요.

컨텍스트:

{context}

질문: {question}

답변:""",

input_variables=["context", "question"],

)

RAG 체인 구성

rag_chain = RetrievalQA.from_chain_type(

llm=llm,

chain_type="stuff",

retriever=vectorstore.as_retriever(

search_type="mmr",

search_kwargs={"k": 5, "fetch_k": 20},

),

chain_type_kwargs={"prompt": prompt_template},

return_source_documents=True,

)

질의

result = rag_chain.invoke("프로덕션 배포 절차가 어떻게 되나요?")

print(result["result"])

for doc in result["source_documents"]:

print(f" Source: {doc.metadata['source']}")

프로덕션 RAG 서빙

from fastapi import FastAPI

from pydantic import BaseModel

app = FastAPI()

class QueryRequest(BaseModel):

question: str

top_k: int = 5

class QueryResponse(BaseModel):

answer: str

sources: list[str]

latency_ms: float

@app.post("/query", response_model=QueryResponse)

async def query(request: QueryRequest):

start = time.time()

result = rag_chain.invoke(request.question)

sources = list(set(

doc.metadata.get("source", "unknown")

for doc in result["source_documents"]

))

latency = (time.time() - start) * 1000

return QueryResponse(

answer=result["result"],

sources=sources,

latency_ms=round(latency, 2),

)

마치며

MLOps는 ML 모델을 프로덕션에서 안정적으로 운영하기 위한 필수 분야다.

핵심 포인트를 정리하면 다음과 같다.

1. **실험 관리부터 시작하라** - MLflow 하나만 도입해도 큰 변화가 생긴다.

2. **자동화에 투자하라** - 수동 프로세스는 반드시 실패한다.

3. **모니터링은 필수다** - 배포 후가 진짜 시작이다.

4. **작게 시작하라** - Google Level 0에서 1으로 가는 것이 가장 중요하다.

5. **LLM 시대에 맞춰 진화하라** - vLLM, RAG 등 새로운 도구를 적극 활용하라.

모델을 학습하는 것은 전체의 20%에 불과하다.

나머지 80%는 데이터 파이프라인, 서빙, 모니터링, 운영이다.

MLOps는 바로 그 80%를 체계적으로 관리하는 방법론이다.

**실험 관리**

- 모든 실험의 하이퍼파라미터와 메트릭을 추적하고 있는가?

- 실험 결과를 재현할 수 있는가?

**데이터 파이프라인**

- 데이터 버전 관리를 하고 있는가?

- 데이터 품질 검증이 자동화되어 있는가?

**학습 인프라**

- 학습 파이프라인이 자동화되어 있는가?

- 체크포인트를 저장하고 있는가?

**서빙**

- 모델 서빙의 지연 시간을 모니터링하고 있는가?

- 롤백이 가능한 배포 전략을 쓰고 있는가?

**모니터링**

- 데이터 드리프트를 감지하고 있는가?

- 모델 성능 저하 시 알림이 오는가?

**비용**

- 스팟 인스턴스를 활용하고 있는가?

- 모델 경량화를 고려했는가?

현재 단락 (1/807)

1. [MLOps란 무엇인가](#1-mlops란-무엇인가)

작성 글자: 0원문 글자: 21,444작성 단락: 0/807