- Authors
  - Name: Youngju Kim (@fjvbn20031)
## Introduction

A modern machine learning project does not end with building a model in a Jupyter Notebook. The entire lifecycle must be managed reliably: data collection, preprocessing, training, evaluation, deployment, monitoring, and retraining. This is exactly why MLOps emerged.

This guide covers MLOps from foundational concepts to hands-on use of the core tools.
## 1. What Is MLOps?

### 1.1 Definition

MLOps (Machine Learning Operations) is a methodology that unifies the development and operation of ML systems. It applies the DevOps philosophy to ML workflows, automating continuous training (CT), continuous integration (CI), and continuous deployment (CD) of models.
### 1.2 MLOps vs. DevOps vs. DataOps

| Aspect | DevOps | DataOps | MLOps |
|---|---|---|---|
| Focus | Software delivery | Data pipelines | ML model lifecycle |
| Artifact | Applications | Data/reports | ML models |
| Reproducibility | Code versions | Data versions | Code + data + model versions |
| Automation | CI/CD | Data testing | Training + evaluation + deployment |

The key difference between MLOps and DevOps is data dependency. The same code trained on different data produces an entirely different model, and model performance depends heavily on data quality, not just code quality.
### 1.3 What Makes ML Systems Different

ML systems have several characteristics that set them apart from traditional software:

- Data dependency: model behavior is determined by data, not code
- Experimental nature: tens to hundreds of experiment iterations are required
- Model decay: performance degrades over time as the data distribution shifts
- Reproducibility challenges: identical results are hard to guarantee even in identical environments
- Multiple artifacts: code, data, and models all need version control
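The last point is the crux: a training run is only reproducible when code, data, and hyperparameters are all pinned together. A minimal stdlib sketch of that idea (the function name and inputs are hypothetical, purely to illustrate why all three must be versioned):

```python
import hashlib
import json

def run_fingerprint(code: str, data: bytes, params: dict) -> str:
    """Hash code, data, and hyperparameters into one run identifier.

    If any one of the three changes, the fingerprint changes, which
    is why ML reproducibility cannot rely on code versions alone.
    """
    h = hashlib.sha256()
    h.update(code.encode())
    h.update(data)
    # Canonical JSON so key order does not affect the hash
    h.update(json.dumps(params, sort_keys=True).encode())
    return h.hexdigest()[:12]

v1 = run_fingerprint("train.py v1", b"rows...", {"lr": 0.001})
v2 = run_fingerprint("train.py v1", b"rows...", {"lr": 0.01})
print(v1 != v2)  # True: same code and data, different params, different run
```

Tools like DVC and MLflow apply this principle at scale: `dvc.lock` pins data and code hashes, and a tracking server pins the parameters.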
### 1.4 MLOps Maturity Model

Following Google's MLOps maturity levels:

Level 0 - Manual ML
- Every step is manual
- Script-driven experiments
- Deployments are rare and manual
- No monitoring

Level 1 - ML pipeline automation
- Automated training pipeline
- Continuous training is possible
- Experiment tracking begins
- Feature store introduced

Level 2 - CI/CD pipeline automation
- Fully automated CI/CD
- Model registry in use
- Automated retraining triggers
- Comprehensive monitoring
### 1.5 The MLOps Tool Ecosystem

- Data versioning: DVC, LakeFS, Delta Lake
- Experiment tracking: MLflow, W&B, Neptune, Comet ML
- Pipelines: Airflow, Prefect, Metaflow, Kubeflow Pipelines
- Model registry: MLflow Registry, W&B Artifacts, Vertex AI
- Containerization: Docker, Podman
- Orchestration: Kubernetes, ECS, GKE
- Model serving: Triton, TorchServe, BentoML, KServe
- Monitoring: Evidently, WhyLogs, Arize, Fiddler
- Feature stores: Feast, Tecton, Vertex AI Feature Store
## 2. Data Version Control (DVC)

### 2.1 Introducing DVC

DVC (Data Version Control) is a version-control tool for ML projects that works alongside Git. It lets you version large datasets and ML models the same way Git versions code.
```bash
# Install DVC
pip install dvc

# Install with S3 support
pip install "dvc[s3]"

# Install with GCS support
pip install "dvc[gs]"

# Install with every remote-storage backend
pip install "dvc[all]"
```
### 2.2 Initializing DVC and Basic Usage

```bash
# Initialize a Git repository (if you don't have one yet)
git init

# Initialize DVC
dvc init

# Inspect the generated files
ls .dvc/
# config .gitignore tmp/

# Start tracking data
dvc add data/train.csv

# A .dvc pointer file is created (tracked by Git)
git add data/train.csv.dvc data/.gitignore
git commit -m "Add training data"
```
### 2.3 Configuring Remote Storage

```bash
# Configure an S3 remote
dvc remote add -d myremote s3://my-bucket/dvc-store

# Set AWS credentials (environment variables or ~/.aws/credentials)
dvc remote modify myremote access_key_id YOUR_ACCESS_KEY
dvc remote modify myremote secret_access_key YOUR_SECRET_KEY

# A GCS remote
dvc remote add -d gcsstorage gs://my-bucket/dvc-store

# A local remote (for testing)
dvc remote add -d localremote /tmp/dvc-storage

# Push data
dvc push

# Pull data
dvc pull
```
### 2.4 DVC Pipelines

A DVC pipeline tracks the dependencies of each stage and re-runs only the stages that changed.
```yaml
# dvc.yaml
stages:
  prepare:
    cmd: python src/prepare.py
    deps:
      - src/prepare.py
      - data/raw
    outs:
      - data/prepared
  featurize:
    cmd: python src/featurize.py
    deps:
      - src/featurize.py
      - data/prepared
    outs:
      - data/features
  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/features
    params:
      - params.yaml:
          - train.lr
          - train.n_estimators
    outs:
      - models/model.pkl
    metrics:
      - metrics/scores.json:
          cache: false
  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/model.pkl
      - data/features
    metrics:
      - metrics/eval.json:
          cache: false
```
```bash
# Run the pipeline
dvc repro

# Run up to a specific stage
dvc repro train

# Visualize the pipeline DAG
dvc dag

# Compare experiment results
dvc metrics show
dvc metrics diff HEAD~1
```
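`dvc metrics diff` compares the JSON metrics files between two revisions. The same comparison is easy to sketch in plain Python, which helps when you need the deltas inside a script (the metric names below are illustrative, not from a real run):

```python
import json

def metrics_diff(old: dict, new: dict) -> dict:
    """Per-metric deltas over shared keys, like a minimal `dvc metrics diff`."""
    return {k: round(new[k] - old[k], 4) for k in old.keys() & new.keys()}

# In a real project these would be two revisions of metrics/scores.json,
# e.g. loaded with json.load(open("metrics/scores.json"))
old = {"accuracy": 0.91, "f1": 0.89}
new = {"accuracy": 0.93, "f1": 0.90}
print(metrics_diff(old, new))
```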
### 2.5 Managing Parameters

```yaml
# params.yaml
train:
  lr: 0.001
  n_estimators: 100
  max_depth: 5
  batch_size: 32
  epochs: 10
data:
  test_size: 0.2
  random_state: 42
```
```python
# src/train.py
import yaml
from sklearn.ensemble import RandomForestClassifier

# Load the parameters
with open("params.yaml") as f:
    params = yaml.safe_load(f)

lr = params["train"]["lr"]
n_estimators = params["train"]["n_estimators"]

# Train the model
model = RandomForestClassifier(
    n_estimators=n_estimators,
    max_depth=params["train"]["max_depth"],
    random_state=params["data"]["random_state"]
)
```
### 2.6 A Git + DVC Workflow

```bash
# Create a new experiment branch
git checkout -b experiment/increase-lr

# Edit the parameter:
# in params.yaml, change lr: 0.001 -> lr: 0.01

# Re-run the pipeline
dvc repro

# Inspect the results
dvc metrics show

# Commit
git add dvc.lock params.yaml
git commit -m "Experiment: increase learning rate to 0.01"

# Compare experiments
git checkout main
dvc metrics diff experiment/increase-lr
```
## 3. Experiment Tracking

### 3.1 Installing and Configuring MLflow

```bash
# Install MLflow
pip install mlflow

# Start the MLflow UI
mlflow ui

# Specify a host and port
mlflow ui --host 0.0.0.0 --port 5001

# Point at a remote tracking server
export MLFLOW_TRACKING_URI=http://mlflow-server:5000
```
### 3.2 MLflow Basics

```python
import mlflow
import mlflow.sklearn
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

# Set the tracking URI (default: ./mlruns)
mlflow.set_tracking_uri("http://localhost:5000")

# Select the experiment
mlflow.set_experiment("my-classification-experiment")

# Run the experiment
with mlflow.start_run(run_name="random-forest-v1"):
    # Log parameters
    n_estimators = 100
    max_depth = 5
    lr = 0.001
    mlflow.log_param("n_estimators", n_estimators)
    mlflow.log_param("max_depth", max_depth)
    mlflow.log_param("learning_rate", lr)

    # Train the model (X, y: your feature matrix and labels, loaded elsewhere)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42
    )
    model.fit(X_train, y_train)

    # Log metrics
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='weighted')
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("f1_score", f1)

    # Learning curve (per-step metrics)
    for epoch in range(10):
        train_loss = np.random.random() * 0.5 / (epoch + 1)
        mlflow.log_metric("train_loss", train_loss, step=epoch)

    # Log an artifact
    mlflow.log_artifact("data/train.csv")

    # Log the model
    mlflow.sklearn.log_model(
        model,
        "model",
        registered_model_name="RandomForestClassifier"
    )

    print(f"Accuracy: {accuracy:.4f}, F1: {f1:.4f}")
    print(f"Run ID: {mlflow.active_run().info.run_id}")
```
### 3.3 MLflow Autologging

```python
# Enable autologging (sklearn example)
mlflow.sklearn.autolog()

with mlflow.start_run():
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X_train, y_train)
    # Parameters, metrics, and the model are logged automatically

# PyTorch autologging
mlflow.pytorch.autolog()

# XGBoost autologging
mlflow.xgboost.autolog()
```
### 3.4 In Practice: Tracking PyTorch Training

```python
import torch
import torch.nn as nn
import torch.optim as optim
import mlflow
import mlflow.pytorch

class SimpleNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        return self.fc2(x)

def train_with_mlflow(
    model, train_loader, val_loader,
    epochs=10, lr=0.001, experiment_name="pytorch-training"
):
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run():
        # Log hyperparameters
        mlflow.log_params({
            "epochs": epochs,
            "learning_rate": lr,
            "model_type": "SimpleNet",
            "optimizer": "Adam",
            "batch_size": train_loader.batch_size,
        })

        optimizer = optim.Adam(model.parameters(), lr=lr)
        criterion = nn.CrossEntropyLoss()
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

        for epoch in range(epochs):
            # Training
            model.train()
            train_loss = 0.0
            for batch_X, batch_y in train_loader:
                optimizer.zero_grad()
                outputs = model(batch_X)
                loss = criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()
                train_loss += loss.item()
            train_loss /= len(train_loader)

            # Validation
            model.eval()
            val_loss = 0.0
            correct = 0
            total = 0
            with torch.no_grad():
                for batch_X, batch_y in val_loader:
                    outputs = model(batch_X)
                    loss = criterion(outputs, batch_y)
                    val_loss += loss.item()
                    _, predicted = outputs.max(1)
                    total += batch_y.size(0)
                    correct += predicted.eq(batch_y).sum().item()
            val_loss /= len(val_loader)
            val_accuracy = correct / total

            # Log per-step metrics
            mlflow.log_metrics({
                "train_loss": train_loss,
                "val_loss": val_loss,
                "val_accuracy": val_accuracy,
                "learning_rate": scheduler.get_last_lr()[0],
            }, step=epoch)
            scheduler.step()

            print(f"Epoch {epoch+1}/{epochs} | "
                  f"Train Loss: {train_loss:.4f} | "
                  f"Val Loss: {val_loss:.4f} | "
                  f"Val Acc: {val_accuracy:.4f}")

        # Save the final model
        mlflow.pytorch.log_model(model, "model")

        # Save the model architecture as text
        with open("model_summary.txt", "w") as f:
            f.write(str(model))
        mlflow.log_artifact("model_summary.txt")

        return mlflow.active_run().info.run_id
```
### 3.5 Weights & Biases (W&B)

```bash
# Install W&B
pip install wandb

# Log in
wandb login
```

```python
import wandb
import torch
import torch.nn as nn

# Initialize W&B
wandb.init(
    project="my-ml-project",
    name="experiment-001",
    config={
        "learning_rate": 0.001,
        "epochs": 100,
        "batch_size": 64,
        "architecture": "ResNet50",
        "dataset": "ImageNet",
    }
)

# Access the config
lr = wandb.config.learning_rate

# Example training loop (model, loaders, train_epoch/eval_epoch defined elsewhere)
for epoch in range(wandb.config.epochs):
    train_loss, train_acc = train_epoch(model, train_loader)
    val_loss, val_acc = eval_epoch(model, val_loader)

    # Log metrics
    wandb.log({
        "epoch": epoch,
        "train_loss": train_loss,
        "train_accuracy": train_acc,
        "val_loss": val_loss,
        "val_accuracy": val_acc,
        "learning_rate": optimizer.param_groups[0]['lr'],
    })

# Save the model
wandb.save("model.pth")

# Example: logging images
wandb.log({
    "predictions": [
        wandb.Image(img, caption=f"Pred: {pred}, True: {true}")
        for img, pred, true in sample_predictions
    ]
})

# W&B Artifacts (dataset versioning)
artifact = wandb.Artifact("training-data", type="dataset")
artifact.add_dir("data/train")
wandb.log_artifact(artifact)

# Finish the run
wandb.finish()
```
### 3.6 W&B Sweeps (Hyperparameter Optimization)

```python
import wandb
import torch

# Sweep configuration
sweep_config = {
    "method": "bayes",  # random, grid, bayes
    "metric": {
        "name": "val_accuracy",
        "goal": "maximize"
    },
    "parameters": {
        "learning_rate": {
            "distribution": "log_uniform_values",
            "min": 1e-5,
            "max": 1e-1,
        },
        "batch_size": {
            "values": [16, 32, 64, 128]
        },
        "hidden_dim": {
            "values": [64, 128, 256, 512]
        },
        "dropout": {
            "distribution": "uniform",
            "min": 0.1,
            "max": 0.5,
        }
    }
}

def train_sweep():
    with wandb.init() as run:
        config = wandb.config
        model = SimpleNet(
            input_dim=784,
            hidden_dim=config.hidden_dim,
            output_dim=10
        )
        optimizer = torch.optim.Adam(
            model.parameters(),
            lr=config.learning_rate
        )
        # Training loop (abridged; run_epoch defined elsewhere)
        for epoch in range(10):
            train_loss, val_loss, val_acc = run_epoch(
                model, optimizer, config.batch_size
            )
            wandb.log({
                "train_loss": train_loss,
                "val_loss": val_loss,
                "val_accuracy": val_acc,
            })

# Create and run the sweep
sweep_id = wandb.sweep(sweep_config, project="my-project")
wandb.agent(sweep_id, function=train_sweep, count=50)
```
## 4. ML Pipeline Orchestration

### 4.1 ML Pipelines with Apache Airflow

```python
# dags/ml_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'mlops-team',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'ml_training_pipeline',
    default_args=default_args,
    description='ML model training pipeline',
    schedule_interval='0 2 * * *',  # every day at 2 AM
    catchup=False,
)

def extract_data(**kwargs):
    """Extract the data."""
    import pandas as pd
    from sqlalchemy import create_engine

    engine = create_engine('postgresql://user:pass@db:5432/mldb')
    df = pd.read_sql("SELECT * FROM features WHERE date >= CURRENT_DATE - 7", engine)
    output_path = '/tmp/raw_data.parquet'
    df.to_parquet(output_path)
    # Pass the path downstream via XCom
    kwargs['ti'].xcom_push(key='data_path', value=output_path)
    return output_path

def preprocess_data(**kwargs):
    """Preprocess the data."""
    ti = kwargs['ti']
    data_path = ti.xcom_pull(task_ids='extract_data', key='data_path')

    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    df = pd.read_parquet(data_path)
    # Preprocessing
    scaler = StandardScaler()
    features = df.drop('target', axis=1)
    labels = df['target']
    scaled_features = scaler.fit_transform(features)

    # Keep the target column so the training task can find it
    processed = pd.DataFrame(scaled_features, columns=features.columns)
    processed['target'] = labels.values
    processed_path = '/tmp/processed_data.parquet'
    processed.to_parquet(processed_path)
    ti.xcom_push(key='processed_path', value=processed_path)

def train_model(**kwargs):
    """Train the model."""
    ti = kwargs['ti']
    processed_path = ti.xcom_pull(
        task_ids='preprocess_data',
        key='processed_path'
    )

    import mlflow
    import pandas as pd
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.model_selection import train_test_split

    mlflow.set_experiment("production-training")
    with mlflow.start_run():
        data = pd.read_parquet(processed_path)
        X_train, X_test, y_train, y_test = train_test_split(
            data.drop('target', axis=1),
            data['target'],
            test_size=0.2
        )
        model = GradientBoostingClassifier(n_estimators=200)
        model.fit(X_train, y_train)

        accuracy = model.score(X_test, y_test)
        mlflow.log_metric("accuracy", accuracy)
        mlflow.sklearn.log_model(model, "model")

        run_id = mlflow.active_run().info.run_id
        ti.xcom_push(key='run_id', value=run_id)

def evaluate_and_deploy(**kwargs):
    """Evaluate and deploy the model."""
    ti = kwargs['ti']
    run_id = ti.xcom_pull(task_ids='train_model', key='run_id')

    import mlflow

    client = mlflow.tracking.MlflowClient()
    run = client.get_run(run_id)
    accuracy = run.data.metrics['accuracy']

    # Deploy only if the performance bar is met
    if accuracy >= 0.90:
        model_uri = f"runs:/{run_id}/model"
        mlflow.register_model(model_uri, "ProductionModel")
        print(f"Model deployed with accuracy: {accuracy:.4f}")
    else:
        raise ValueError(f"Model accuracy {accuracy:.4f} below threshold 0.90")

# Task definitions
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
)
preprocess_task = PythonOperator(
    task_id='preprocess_data',
    python_callable=preprocess_data,
    dag=dag,
)
train_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag,
)
evaluate_task = PythonOperator(
    task_id='evaluate_and_deploy',
    python_callable=evaluate_and_deploy,
    dag=dag,
)

# Dependencies
extract_task >> preprocess_task >> train_task >> evaluate_task
```
### 4.2 ML Pipelines with Prefect

```python
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def load_data(data_path: str) -> pd.DataFrame:
    """Load the data (with caching)."""
    return pd.read_parquet(data_path)

@task(retries=3, retry_delay_seconds=10)
def preprocess(df: pd.DataFrame) -> pd.DataFrame:
    """Preprocess the data."""
    df = df.dropna()
    df = df.drop_duplicates()
    return df

@task
def train(df: pd.DataFrame, params: dict) -> str:
    """Train the model."""
    from sklearn.ensemble import RandomForestClassifier

    mlflow.set_experiment("prefect-ml-pipeline")
    with mlflow.start_run():
        model = RandomForestClassifier(**params)
        X = df.drop('target', axis=1)
        y = df['target']
        model.fit(X, y)
        mlflow.sklearn.log_model(model, "model")
        mlflow.log_params(params)
        return mlflow.active_run().info.run_id

@task
def evaluate(run_id: str) -> float:
    """Evaluate the model."""
    client = mlflow.tracking.MlflowClient()
    run = client.get_run(run_id)
    return run.data.metrics.get('accuracy', 0.0)

@flow(name="ml-training-pipeline")
def ml_pipeline(data_path: str, params: dict = None):
    """The main pipeline."""
    if params is None:
        params = {"n_estimators": 100, "max_depth": 5}
    df = load_data(data_path)
    processed_df = preprocess(df)
    run_id = train(processed_df, params)
    accuracy = evaluate(run_id)
    print(f"Pipeline complete. Accuracy: {accuracy:.4f}")
    return accuracy

# Run it
if __name__ == "__main__":
    ml_pipeline(
        data_path="data/train.parquet",
        params={"n_estimators": 200, "max_depth": 7}
    )
```
## 5. Model Registry

### 5.1 MLflow Model Registry

A model registry manages model versions centrally and tracks their Staging/Production status.

```python
import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register a model
model_uri = "runs:/abc123def456/model"
registered_model = mlflow.register_model(model_uri, "MyClassifier")

# Inspect the version
print(f"Version: {registered_model.version}")
print(f"Status: {registered_model.status}")

# Update the version's metadata
client.update_model_version(
    name="MyClassifier",
    version=registered_model.version,
    description="Random Forest with 200 estimators, accuracy 0.94"
)

# Add a tag
client.set_model_version_tag(
    name="MyClassifier",
    version=registered_model.version,
    key="validated_by",
    value="data-science-team"
)
```
### 5.2 Promoting from Staging to Production

```python
# Transition to Staging
client.transition_model_version_stage(
    name="MyClassifier",
    version=1,
    stage="Staging",
    archive_existing_versions=False
)

# After validation, promote to Production
def promote_to_production(model_name: str, version: int, min_accuracy: float = 0.90):
    """Promote a model version to Production."""
    # Check the recorded metrics
    model_version = client.get_model_version(model_name, version)
    run_id = model_version.run_id
    run = client.get_run(run_id)
    accuracy = run.data.metrics.get('accuracy', 0)

    if accuracy < min_accuracy:
        raise ValueError(
            f"Model accuracy {accuracy:.4f} is below minimum {min_accuracy}"
        )

    # Promote to Production (the existing Production version is archived)
    client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage="Production",
        archive_existing_versions=True
    )
    print(f"Model {model_name} v{version} promoted to Production!")
    print(f"Accuracy: {accuracy:.4f}")

# Load the Production model
production_model = mlflow.pyfunc.load_model(
    model_uri="models:/MyClassifier/Production"
)
predictions = production_model.predict(X_test)
```
### 5.3 Model Serving (Built into MLflow)

```bash
# Serve a model with MLflow
mlflow models serve \
  -m "models:/MyClassifier/Production" \
  --host 0.0.0.0 \
  --port 5001

# Send a prediction request
curl -X POST http://localhost:5001/invocations \
  -H "Content-Type: application/json" \
  -d '{"dataframe_split": {"columns": ["feature1", "feature2"], "data": [[1.0, 2.0]]}}'
```
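From Python, the same request is a small stdlib script. The payload shape (`dataframe_split` with `columns` and `data`) is what the MLflow scoring server expects; the host, port, and feature names simply mirror the curl example above:

```python
import json
from urllib import request

def build_payload(rows, columns):
    """The dataframe_split request body used by MLflow's scoring server."""
    return {"dataframe_split": {"columns": columns, "data": rows}}

def score(rows, columns, url="http://localhost:5001/invocations"):
    """POST a prediction request and return the parsed JSON response."""
    req = request.Request(
        url,
        data=json.dumps(build_payload(rows, columns)).encode(),
        headers={"Content-Type": "application/json"},
    )
    with request.urlopen(req) as resp:
        return json.loads(resp.read())

# With the server above running:
# score([[1.0, 2.0]], ["feature1", "feature2"])
```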
## 6. Containerization (Docker)

### 6.1 Dockerizing an ML Environment

```dockerfile
# Dockerfile.train - training image
FROM python:3.11-slim

# Working directory
WORKDIR /app

# System dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    git \
    && rm -rf /var/lib/apt/lists/*

# Python packages first (optimizes layer caching)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the source code
COPY src/ ./src/
COPY params.yaml .
COPY dvc.yaml .

# Run training
CMD ["python", "src/train.py"]
```
### 6.2 Multi-Stage Builds

```dockerfile
# Dockerfile.serve - multi-stage build for serving
# Stage 1: builder
FROM python:3.11 AS builder
WORKDIR /build
COPY requirements.txt .
RUN pip install --no-cache-dir --target /install -r requirements.txt

# Stage 2: final (slim) image
FROM python:3.11-slim AS runtime
WORKDIR /app

# Copy the packages from the builder stage
COPY --from=builder /install /usr/local/lib/python3.11/site-packages

# Copy only the serving code
COPY src/serve.py .
COPY models/ ./models/

# Expose the port
EXPOSE 8080

# Run as a non-root user (security)
RUN useradd -m -u 1000 mluser
USER mluser

CMD ["python", "serve.py"]
```
### 6.3 GPU Docker (NVIDIA Container Toolkit)

```dockerfile
# Dockerfile.gpu - GPU-enabled image
FROM nvidia/cuda:12.1.0-cudnn8-devel-ubuntu22.04

ENV DEBIAN_FRONTEND=noninteractive

RUN apt-get update && apt-get install -y \
    python3.11 \
    python3-pip \
    git \
    && rm -rf /var/lib/apt/lists/*

# Install the GPU build of PyTorch
RUN pip3 install torch torchvision torchaudio \
    --index-url https://download.pytorch.org/whl/cu121

WORKDIR /app
COPY requirements-gpu.txt .
RUN pip3 install --no-cache-dir -r requirements-gpu.txt
COPY src/ ./src/

CMD ["python3", "src/train_gpu.py"]
```

```bash
# Run a GPU container
docker run --gpus all \
  -v /data:/data \
  -v /models:/models \
  --shm-size=8gb \
  my-ml-gpu:latest
```
### 6.4 An ML Stack with Docker Compose

```yaml
# docker-compose.yml
version: '3.8'

services:
  # MLflow tracking server
  mlflow:
    image: ghcr.io/mlflow/mlflow:latest
    command: >
      mlflow server
      --backend-store-uri postgresql://mlflow:password@postgres:5432/mlflow
      --default-artifact-root s3://my-bucket/mlflow
      --host 0.0.0.0
      --port 5000
    ports:
      - '5000:5000'
    environment:
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
    depends_on:
      - postgres

  # PostgreSQL (MLflow backend store)
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: mlflow
      POSTGRES_USER: mlflow
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - '5432:5432'

  # MinIO (S3-compatible storage)
  minio:
    image: minio/minio:latest
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    ports:
      - '9000:9000'
      - '9001:9001'
    volumes:
      - minio_data:/data

  # Training worker
  trainer:
    build:
      context: .
      dockerfile: Dockerfile.train
    environment:
      MLFLOW_TRACKING_URI: http://mlflow:5000
      AWS_ACCESS_KEY_ID: minioadmin
      AWS_SECRET_ACCESS_KEY: minioadmin
      MLFLOW_S3_ENDPOINT_URL: http://minio:9000
    volumes:
      - ./data:/app/data
      - ./models:/app/models
    depends_on:
      - mlflow
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

volumes:
  postgres_data:
  minio_data:
```
### 6.5 In Practice: A PyTorch Model Server Dockerfile

```dockerfile
# Dockerfile.pytorch-serve
FROM python:3.11-slim

WORKDIR /app

# Dependencies (--extra-index-url keeps PyPI available for the non-torch packages)
RUN pip install --no-cache-dir \
    torch==2.2.0+cpu \
    torchvision \
    fastapi \
    uvicorn \
    pydantic \
    numpy \
    pillow \
    --extra-index-url https://download.pytorch.org/whl/cpu

COPY src/server.py .
COPY models/model.pt .

EXPOSE 8000
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]
```

```python
# src/server.py
from typing import List

import torch
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="ML Model Server")

# Load the model once, at server startup
model = torch.jit.load("model.pt")
model.eval()

class PredictionRequest(BaseModel):
    features: List[List[float]]

class PredictionResponse(BaseModel):
    predictions: List[int]
    probabilities: List[List[float]]

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    try:
        tensor = torch.tensor(request.features, dtype=torch.float32)
        with torch.no_grad():
            outputs = model(tensor)
            probabilities = torch.softmax(outputs, dim=1)
            predictions = torch.argmax(probabilities, dim=1)
        return PredictionResponse(
            predictions=predictions.tolist(),
            probabilities=probabilities.tolist()
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
```
## 7. Kubernetes for ML

### 7.1 K8s Basics for ML Workloads

```yaml
# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: ml-platform
  labels:
    app: ml-platform
    environment: production
```
### 7.2 Configuring a GPU Node Pool

```yaml
# k8s/gpu-node-pool.yaml (GKE example)
apiVersion: container.googleapis.com/v1
kind: NodePool
metadata:
  name: gpu-pool
spec:
  machineType: n1-standard-8
  accelerators:
    - acceleratorCount: 1
      acceleratorType: nvidia-tesla-t4
  autoscaling:
    minNodeCount: 0
    maxNodeCount: 10
  nodeConfig:
    labels:
      accelerator: nvidia-t4
    taints:
      - key: nvidia.com/gpu
        value: present
        effect: NoSchedule
```
### 7.3 A Training Job

```yaml
# k8s/training-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: ml-training-job
  namespace: ml-platform
spec:
  backoffLimit: 3
  template:
    spec:
      restartPolicy: OnFailure
      # Schedule onto GPU nodes
      nodeSelector:
        accelerator: nvidia-t4
      tolerations:
        - key: nvidia.com/gpu
          operator: Exists
          effect: NoSchedule
      containers:
        - name: trainer
          image: my-registry/ml-trainer:v1.2.0
          resources:
            requests:
              memory: '8Gi'
              cpu: '4'
              nvidia.com/gpu: '1'
            limits:
              memory: '16Gi'
              cpu: '8'
              nvidia.com/gpu: '1'
          env:
            - name: MLFLOW_TRACKING_URI
              value: 'http://mlflow-service.ml-platform:5000'
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                secretKeyRef:
                  name: aws-credentials
                  key: access_key_id
          volumeMounts:
            - name: training-data
              mountPath: /data
            - name: model-output
              mountPath: /models
            - name: dshm
              mountPath: /dev/shm
      volumes:
        - name: training-data
          persistentVolumeClaim:
            claimName: training-data-pvc
        - name: model-output
          persistentVolumeClaim:
            claimName: model-output-pvc
        - name: dshm  # shared memory for PyTorch DataLoader workers
          emptyDir:
            medium: Memory
            sizeLimit: 8Gi
```
### 7.4 A Scheduled Retraining CronJob

```yaml
# k8s/training-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: daily-retraining
  namespace: ml-platform
spec:
  schedule: '0 2 * * *'  # every day at 2 AM
  concurrencyPolicy: Forbid  # prevent overlapping runs
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: OnFailure
          containers:
            - name: retrainer
              image: my-registry/ml-trainer:latest
              command: ['python', 'src/retrain.py']
              env:
                - name: TRAINING_DATE
                  value: '$(date +%Y-%m-%d)'
              resources:
                requests:
                  memory: '4Gi'
                  cpu: '2'
                limits:
                  memory: '8Gi'
                  cpu: '4'
```
### 7.5 A Model-Serving Deployment

```yaml
# k8s/model-serving-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: model-server
  namespace: ml-platform
spec:
  replicas: 3
  selector:
    matchLabels:
      app: model-server
  template:
    metadata:
      labels:
        app: model-server
    spec:
      containers:
        - name: model-server
          image: my-registry/model-server:v1.0.0
          ports:
            - containerPort: 8000
          resources:
            requests:
              memory: '2Gi'
              cpu: '1'
            limits:
              memory: '4Gi'
              cpu: '2'
          # Health checks
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 15
            periodSeconds: 5
          # Environment variables (from a ConfigMap)
          envFrom:
            - configMapRef:
                name: model-server-config
---
apiVersion: v1
kind: Service
metadata:
  name: model-server-service
  namespace: ml-platform
spec:
  selector:
    app: model-server
  ports:
    - port: 80
      targetPort: 8000
  type: ClusterIP
---
# HPA (horizontal autoscaling)
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: model-server-hpa
  namespace: ml-platform
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: model-server
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
```
## 8. Kubeflow

### 8.1 Introducing Kubeflow

Kubeflow is an open-source platform for managing ML workflows on top of Kubernetes. Its main components are:

- Pipelines: ML workflow orchestration
- Notebooks: managed Jupyter notebooks
- Katib: hyperparameter tuning
- KServe: model serving
- Training Operator: distributed training
### 8.2 Kubeflow Pipelines

```python
# kubeflow_pipeline.py
import kfp
from kfp import dsl
from kfp.dsl import OutputPath

# Component definitions
@dsl.component(
    base_image="python:3.11",
    packages_to_install=["pandas", "scikit-learn", "pyarrow"]
)
def prepare_data(data_path: str, output_path: OutputPath(str)):
    """Data preparation component."""
    import os
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_parquet(data_path)
    train, test = train_test_split(df, test_size=0.2)
    os.makedirs(output_path, exist_ok=True)
    train.to_parquet(f"{output_path}/train.parquet")
    test.to_parquet(f"{output_path}/test.parquet")

@dsl.component(
    base_image="python:3.11",
    packages_to_install=["scikit-learn", "mlflow", "pandas", "pyarrow"]
)
def train_model(
    data_path: str,
    n_estimators: int,
    max_depth: int,
    model_output: OutputPath(str),
    mlflow_uri: str
):
    """Model training component."""
    import mlflow
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier

    mlflow.set_tracking_uri(mlflow_uri)
    df = pd.read_parquet(f"{data_path}/train.parquet")
    X = df.drop('target', axis=1)
    y = df['target']

    with mlflow.start_run():
        model = RandomForestClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth
        )
        model.fit(X, y)
        run_id = mlflow.active_run().info.run_id
        mlflow.sklearn.log_model(model, "model")

    with open(model_output, 'w') as f:
        f.write(run_id)

@dsl.component(
    base_image="python:3.11",
    packages_to_install=["mlflow", "scikit-learn", "pandas", "pyarrow"]
)
def evaluate_model(
    data_path: str,
    run_id_path: str,
    mlflow_uri: str
) -> float:
    """Model evaluation component."""
    import mlflow
    import pandas as pd

    mlflow.set_tracking_uri(mlflow_uri)
    with open(run_id_path, 'r') as f:
        run_id = f.read()

    model = mlflow.sklearn.load_model(f"runs:/{run_id}/model")
    test_df = pd.read_parquet(f"{data_path}/test.parquet")
    X_test = test_df.drop('target', axis=1)
    y_test = test_df['target']
    accuracy = model.score(X_test, y_test)
    return accuracy

# Pipeline definition
@dsl.pipeline(
    name="ML Training Pipeline",
    description="End-to-end ML training pipeline"
)
def ml_pipeline(
    data_path: str = "gs://my-bucket/data/train.parquet",
    n_estimators: int = 100,
    max_depth: int = 5,
    mlflow_uri: str = "http://mlflow:5000"
):
    # Prepare the data
    prepare_task = prepare_data(data_path=data_path)
    # Train the model
    train_task = train_model(
        data_path=prepare_task.output,
        n_estimators=n_estimators,
        max_depth=max_depth,
        mlflow_uri=mlflow_uri
    )
    # Evaluate the model
    evaluate_task = evaluate_model(
        data_path=prepare_task.output,
        run_id_path=train_task.outputs['model_output'],
        mlflow_uri=mlflow_uri
    )

# Compile and submit the pipeline
if __name__ == "__main__":
    # Compile
    kfp.compiler.Compiler().compile(
        pipeline_func=ml_pipeline,
        package_path="ml_pipeline.yaml"
    )
    # Submit to Kubeflow
    client = kfp.Client(host="http://kubeflow-host/pipeline")
    run = client.create_run_from_pipeline_package(
        pipeline_file="ml_pipeline.yaml",
        arguments={
            "n_estimators": 200,
            "max_depth": 7,
        },
        run_name="training-run-001"
    )
```
### 8.3 Hyperparameter Tuning with Katib

```yaml
# katib-experiment.yaml
apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
  name: hp-tuning-experiment
  namespace: kubeflow
spec:
  objective:
    type: maximize
    goal: 0.95
    objectiveMetricName: accuracy
    additionalMetricNames:
      - f1_score
  algorithm:
    algorithmName: bayesianoptimization
  parallelTrialCount: 3
  maxTrialCount: 20
  maxFailedTrialCount: 5
  parameters:
    - name: learning_rate
      parameterType: double
      feasibleSpace:
        min: '0.0001'
        max: '0.01'
    - name: n_estimators
      parameterType: int
      feasibleSpace:
        min: '50'
        max: '500'
    - name: max_depth
      parameterType: int
      feasibleSpace:
        min: '3'
        max: '10'
  trialTemplate:
    primaryContainerName: training-container
    trialParameters:
      - name: learning_rate
        description: Learning rate
        reference: learning_rate
      - name: n_estimators
        description: Number of estimators
        reference: n_estimators
      - name: max_depth
        description: Max tree depth
        reference: max_depth
    trialSpec:
      apiVersion: batch/v1
      kind: Job
      spec:
        template:
          spec:
            restartPolicy: Never
            containers:
              - name: training-container
                image: my-registry/trainer:latest
                command:
                  - 'python'
                  - 'train.py'
                  - '--lr=${trialParameters.learning_rate}'
                  - '--n-estimators=${trialParameters.n_estimators}'
                  - '--max-depth=${trialParameters.max_depth}'
```
## 9. CI/CD for ML

### 9.1 GitHub Actions for ML

```yaml
# .github/workflows/ml-pipeline.yml
name: ML CI/CD Pipeline

on:
  push:
    branches: [main, develop]
    paths:
      - 'src/**'
      - 'params.yaml'
      - 'requirements.txt'
  pull_request:
    branches: [main]

env:
  MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

jobs:
  test:
    name: Unit Tests
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov
      - name: Run unit tests
        run: |
          pytest tests/unit/ -v --cov=src --cov-report=xml
      - name: Upload coverage
        uses: codecov/codecov-action@v4
        with:
          file: coverage.xml

  data-validation:
    name: Data Validation
    runs-on: ubuntu-latest
    needs: test
    steps:
      - uses: actions/checkout@v4
      - name: Setup DVC
        run: pip install "dvc[s3]"
      - name: Pull data
        run: dvc pull data/
      - name: Validate data
        run: python src/validate_data.py

  train-and-evaluate:
    name: Train and Evaluate Model
    runs-on: ubuntu-latest
    needs: data-validation
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Run DVC pipeline
        run: dvc repro
      - name: Check model performance
        run: |
          python scripts/check_metrics.py \
            --min-accuracy 0.90 \
            --metrics-file metrics/scores.json
      - name: Register model if metrics pass
        run: python scripts/register_model.py
      - name: Report metrics
        uses: iterative/cml@v2
        with:
          report-type: md
          metrics: metrics/scores.json

  build-and-push:
    name: Build Docker Image
    runs-on: ubuntu-latest
    needs: train-and-evaluate
    steps:
      - uses: actions/checkout@v4
      - name: Login to Container Registry
        uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - name: Build and push
        uses: docker/build-push-action@v5
        with:
          context: .
          file: Dockerfile.serve
          push: true
          tags: |
            ghcr.io/${{ github.repository }}/model-server:latest
            ghcr.io/${{ github.repository }}/model-server:${{ github.sha }}

  deploy:
    name: Deploy to Kubernetes
    runs-on: ubuntu-latest
    needs: build-and-push
    environment: production
    steps:
      - uses: actions/checkout@v4
      - name: Configure kubectl
        uses: azure/k8s-set-context@v3
        with:
          kubeconfig: ${{ secrets.KUBECONFIG }}
      - name: Deploy
        run: |
          kubectl set image deployment/model-server \
            model-server=ghcr.io/${{ github.repository }}/model-server:${{ github.sha }} \
            -n ml-platform
          kubectl rollout status deployment/model-server -n ml-platform
```
10. 모델 모니터링
10.1 Evidently AI로 데이터 드리프트 감지
pip install evidently
import pandas as pd
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
from evidently.metrics import ColumnDriftMetric

# Reference data (the data the model was trained on)
reference_data = pd.read_parquet("data/reference.parquet")
# Current data (production prediction inputs)
current_data = pd.read_parquet("data/current.parquet")

# Data drift report
drift_report = Report(metrics=[
    DataDriftPreset(),
    DataQualityPreset(),
    ColumnDriftMetric(column_name="age"),
    ColumnDriftMetric(column_name="income"),
])
drift_report.run(
    reference_data=reference_data,
    current_data=current_data
)

# Save an HTML report
drift_report.save_html("drift_report.html")

# Extract the results as a dict
result = drift_report.as_dict()
drift_detected = result['metrics'][0]['result']['dataset_drift']
if drift_detected:
    print("Data drift detected! Consider retraining.")
    # Send an alert (Slack, email, etc.) -- send_alert is a placeholder hook
    send_alert("Data drift detected in production model")
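`send_alert` above is deliberately left abstract. A minimal sketch using a Slack incoming webhook follows; the `SLACK_WEBHOOK_URL` environment variable is a hypothetical integration point, substitute whatever alerting channel you actually use:

```python
# Minimal Slack-webhook alert sketch. SLACK_WEBHOOK_URL is hypothetical.
import json
import os
import urllib.request


def build_alert_payload(message: str, severity: str = "warning") -> dict:
    """Build the JSON body for a Slack incoming-webhook POST."""
    return {"text": f"[{severity.upper()}] {message}"}


def send_alert(message: str) -> None:
    webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
    if not webhook_url:
        # Fall back to stdout when no webhook is configured
        print(f"ALERT (no webhook configured): {message}")
        return
    req = urllib.request.Request(
        webhook_url,
        data=json.dumps(build_alert_payload(message)).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(req)  # fire-and-forget; add retries in production
```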
10.2 Model Performance Monitoring
from evidently.report import Report
from evidently.metric_preset import ClassificationPreset

# Prediction data (when ground-truth labels are available);
# y_true, y_pred, y_proba, and X_test come from your evaluation step
prediction_data = pd.DataFrame({
    'target': y_true,
    'prediction': y_pred,
    'prediction_proba': y_proba,
    'feature1': X_test['feature1'],
    'feature2': X_test['feature2'],
})

# Classification performance report (the preset already includes quality metrics)
performance_report = Report(metrics=[
    ClassificationPreset(),
])
performance_report.run(
    reference_data=reference_predictions,  # predictions captured at training time
    current_data=prediction_data
)

# Extract the results
metrics = performance_report.as_dict()
current_accuracy = metrics['metrics'][0]['result']['current']['accuracy']
reference_accuracy = metrics['metrics'][0]['result']['reference']['accuracy']
degradation = reference_accuracy - current_accuracy
if degradation > 0.05:  # more than 5 percentage points of degradation
    trigger_retraining(f"Model degraded by {degradation:.2%}")  # placeholder hook
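Like `send_alert`, the `trigger_retraining` hook is a placeholder. One common implementation dispatches the training workflow from section 9 via GitHub's `workflow_dispatch` REST endpoint. A sketch follows; the repo and workflow file names are hypothetical:

```python
# Sketch: trigger retraining by dispatching a GitHub Actions workflow.
# Repo/workflow names are hypothetical placeholders.
import json
import os
import urllib.request


def build_dispatch_request(repo: str, workflow: str,
                           reason: str, token: str) -> urllib.request.Request:
    """Build the POST request for GitHub's workflow_dispatch endpoint."""
    url = f"https://api.github.com/repos/{repo}/actions/workflows/{workflow}/dispatches"
    body = {"ref": "main", "inputs": {"reason": reason}}
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Accept": "application/vnd.github+json",
        },
        method="POST",
    )


def trigger_retraining(reason: str) -> None:
    token = os.environ.get("GITHUB_TOKEN", "")
    req = build_dispatch_request("your-org/your-ml-repo", "train.yml", reason, token)
    urllib.request.urlopen(req)  # GitHub returns 204 No Content on success
```

The dispatched workflow's `inputs.reason` can then be logged alongside the MLflow run so every retraining is traceable back to the monitoring signal that caused it.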
10.3 Real-Time Prediction Monitoring Pipeline
import logging
import pandas as pd
from prometheus_client import Counter, Histogram, Gauge, start_http_server
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

# Prometheus metric definitions
prediction_counter = Counter('model_predictions_total', 'Total predictions')
prediction_latency = Histogram('model_prediction_duration_seconds', 'Prediction latency')
model_accuracy_gauge = Gauge('model_accuracy', 'Current model accuracy')
drift_score_gauge = Gauge('data_drift_score', 'Data drift score')

class MonitoredModelServer:
    def __init__(self, model, reference_data):
        self.model = model
        self.reference_data = reference_data
        self.features_buffer = []
        # Start the Prometheus metrics endpoint
        start_http_server(8001)

    def predict(self, features):
        with prediction_latency.time():
            prediction = self.model.predict(features)
        prediction_counter.inc()
        # Buffer the input features (drift is measured on inputs, not outputs)
        self.features_buffer.append(features)
        # Run monitoring once 1,000 requests have accumulated
        if len(self.features_buffer) >= 1000:
            self._run_monitoring()
        return prediction

    def _run_monitoring(self):
        """Run periodic drift monitoring over the buffered inputs."""
        current_df = pd.DataFrame(self.features_buffer)
        # Drift detection against the training-time reference data
        drift_report = Report(metrics=[DataDriftPreset()])
        drift_report.run(
            reference_data=self.reference_data,
            current_data=current_df
        )
        result = drift_report.as_dict()
        drift_share = result['metrics'][0]['result']['share_of_drifted_columns']
        drift_score_gauge.set(drift_share)
        if drift_share > 0.3:
            logging.warning(f"High drift detected: {drift_share:.2%} of columns drifted")
        # Reset the buffer
        self.features_buffer = []
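Under the hood, column-level drift detection like `DataDriftPreset`'s boils down to a two-sample statistical test per column. The same idea in a minimal sketch using SciPy's Kolmogorov-Smirnov test (NumPy and SciPy assumed installed):

```python
# Per-column two-sample drift check: a small p-value means the reference
# and current distributions are unlikely to be the same.
import numpy as np
from scipy import stats


def drifted_columns(reference: np.ndarray, current: np.ndarray,
                    alpha: float = 0.05) -> list:
    """Return indices of columns whose KS-test p-value falls below alpha."""
    drifted = []
    for col in range(reference.shape[1]):
        statistic, p_value = stats.ks_2samp(reference[:, col], current[:, col])
        if p_value < alpha:
            drifted.append(col)
    return drifted


rng = np.random.default_rng(42)
ref = rng.normal(0.0, 1.0, size=(1000, 3))
cur = ref.copy()
cur[:, 2] += 2.0  # shift the third column to simulate drift
print(drifted_columns(ref, cur))  # the shifted column should be flagged
```

Evidently applies this kind of test (choosing the statistic per column type) and then aggregates the per-column results into `share_of_drifted_columns`, which is what the gauge above exposes.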
11. Feature Store
11.1 Feature Store Concepts
A Feature Store is infrastructure for managing ML features in one central place so they can be shared and reused consistently across training and serving.
Core concepts:
- Online store: low-latency feature lookup for real-time prediction (Redis, DynamoDB)
- Offline store: historical features for batch training (S3, BigQuery)
- Feature view: the mapping between feature definitions and a data source
- Entity: the subject a feature describes (user ID, product ID, etc.)
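The offline store's key guarantee is the point-in-time correct join: each training row only sees feature values that were already known at that row's timestamp, which prevents label leakage. The same idea can be sketched in plain pandas with `merge_asof`:

```python
import pandas as pd

# Feature values as they became available over time
features = pd.DataFrame({
    "user_id": [1, 1, 2],
    "event_timestamp": pd.to_datetime(["2026-01-01", "2026-01-05", "2026-01-02"]),
    "purchase_count_7d": [3, 7, 1],
}).sort_values("event_timestamp")

# Training entities: which feature value was known at each label's timestamp?
entities = pd.DataFrame({
    "user_id": [1, 1],
    "event_timestamp": pd.to_datetime(["2026-01-03", "2026-01-06"]),
}).sort_values("event_timestamp")

# merge_asof picks, for each entity row, the latest feature row at or
# before its timestamp -- the essence of get_historical_features()
training = pd.merge_asof(
    entities, features,
    on="event_timestamp", by="user_id", direction="backward",
)
print(training["purchase_count_7d"].tolist())  # [3, 7]
```

A real feature store does this join at scale across many feature views, which is exactly what Feast's `get_historical_features` below provides.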
11.2 Installing and Configuring Feast
# Install Feast
pip install feast
# Initialize a Feast project
feast init my-feature-store
cd my-feature-store
# feature_store.yaml
project: my_feature_store
registry: data/registry.db
provider: local
online_store:
  type: redis
  connection_string: "localhost:6379"
offline_store:
  type: file
entity_key_serialization_version: 2
# features.py - feature definitions
from feast import Entity, FeatureView, FileSource, ValueType, Field
from feast.types import Float64, Int64
from datetime import timedelta

# Entity definition
user = Entity(
    name="user_id",
    value_type=ValueType.INT64,
    description="User ID"
)

# Data source
user_stats_source = FileSource(
    path="data/user_stats.parquet",
    timestamp_field="event_timestamp",
)

# Feature view definition
user_stats_fv = FeatureView(
    name="user_statistics",
    entities=[user],
    ttl=timedelta(days=30),
    schema=[
        Field(name="purchase_count_7d", dtype=Int64),
        Field(name="purchase_amount_7d", dtype=Float64),
        Field(name="avg_session_duration", dtype=Float64),
        Field(name="last_purchase_days_ago", dtype=Int64),
    ],
    online=True,
    source=user_stats_source,
)
# feast_usage.py - using Feast
from feast import FeatureStore
from datetime import datetime
import pandas as pd

store = FeatureStore(repo_path=".")

# Offline feature retrieval (for training)
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003],
    "event_timestamp": pd.to_datetime(["2026-01-01", "2026-01-02", "2026-01-03"])
})
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_statistics:purchase_count_7d",
        "user_statistics:purchase_amount_7d",
        "user_statistics:avg_session_duration",
    ]
).to_df()
print(training_df.head())

# Online feature retrieval (for real-time prediction)
# First, materialize data into the online store
store.materialize_incremental(end_date=datetime.now())

# Real-time lookup
online_features = store.get_online_features(
    features=[
        "user_statistics:purchase_count_7d",
        "user_statistics:purchase_amount_7d",
    ],
    entity_rows=[
        {"user_id": 1001},
        {"user_id": 1002},
    ]
).to_dict()
print(online_features)
12. A Hands-On MLOps Project
12.1 End-to-End Pipeline Architecture
Data sources (DB, S3, API)
↓
Data ingestion (Airflow DAG)
↓
Data validation (Great Expectations)
↓
Feature engineering (Feast)
↓
Model training (tracked with MLflow)
↓
Model evaluation (automated validation)
↓
Model registry (MLflow Registry)
↓
CI/CD (GitHub Actions)
↓
Container build (Docker)
↓
K8s deployment (Kubernetes)
↓
Serving (FastAPI + Triton)
↓
Monitoring (Evidently + Prometheus + Grafana)
↓
Alerting (retraining triggered on drift detection)
12.2 Hands-On Project: A Customer Churn Prediction System
# scripts/full_pipeline.py
"""
End-to-end MLOps pipeline: customer churn prediction
"""
import mlflow
import pandas as pd
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score, roc_auc_score
from sklearn.preprocessing import StandardScaler
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ChurnPredictionPipeline:
    """Customer churn prediction pipeline."""

    def __init__(self, mlflow_uri: str = "http://localhost:5000"):
        mlflow.set_tracking_uri(mlflow_uri)
        mlflow.set_experiment("churn-prediction")
        self.scaler = StandardScaler()

    def load_data(self, data_path: str) -> pd.DataFrame:
        """Load the data and run basic validation."""
        logger.info(f"Loading data from {data_path}")
        df = pd.read_parquet(data_path)
        # Basic data-quality checks
        assert df.shape[0] > 1000, "Too few samples"
        assert 'churn' in df.columns, "Target column 'churn' missing"
        assert df['churn'].nunique() == 2, "Binary classification expected"
        missing_rate = df.isnull().mean()
        high_missing = missing_rate[missing_rate > 0.5].index.tolist()
        if high_missing:
            logger.warning(f"Columns with >50% missing: {high_missing}")
            df = df.drop(columns=high_missing)
        logger.info(f"Data loaded: {df.shape[0]} rows, {df.shape[1]} columns")
        return df
    def engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Feature engineering."""
        logger.info("Engineering features...")
        # Impute missing values
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].median())
        # Derived features
        if 'tenure_months' in df.columns and 'monthly_charges' in df.columns:
            df['total_value'] = df['tenure_months'] * df['monthly_charges']
        if 'num_support_tickets' in df.columns and 'tenure_months' in df.columns:
            df['tickets_per_month'] = (
                df['num_support_tickets'] / (df['tenure_months'] + 1)
            )
        return df
    def train(
        self,
        df: pd.DataFrame,
        params: dict = None
    ) -> str:
        """Train the model and track the run in MLflow."""
        if params is None:
            params = {
                "n_estimators": 200,
                "max_depth": 5,
                "learning_rate": 0.05,
                "subsample": 0.8,
                "random_state": 42,
            }
        feature_cols = [c for c in df.columns if c != 'churn']
        X = df[feature_cols]
        y = df['churn']
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )
        # Scaling
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)
        with mlflow.start_run(run_name="churn-gbt"):
            # Log parameters
            mlflow.log_params(params)
            mlflow.log_param("train_size", len(X_train))
            mlflow.log_param("test_size", len(X_test))
            mlflow.log_param("n_features", len(feature_cols))
            # Train the model
            model = GradientBoostingClassifier(**params)
            model.fit(X_train_scaled, y_train)
            # Evaluate
            y_pred = model.predict(X_test_scaled)
            y_proba = model.predict_proba(X_test_scaled)[:, 1]
            accuracy = accuracy_score(y_test, y_pred)
            auc_roc = roc_auc_score(y_test, y_proba)
            mlflow.log_metrics({
                "accuracy": accuracy,
                "auc_roc": auc_roc,
            })
            # Cross-validation
            cv_scores = cross_val_score(
                model, X_train_scaled, y_train, cv=5, scoring='roc_auc'
            )
            mlflow.log_metric("cv_auc_mean", cv_scores.mean())
            mlflow.log_metric("cv_auc_std", cv_scores.std())
            # Feature importances
            feature_importance = pd.DataFrame({
                'feature': feature_cols,
                'importance': model.feature_importances_
            }).sort_values('importance', ascending=False)
            feature_importance.to_csv('/tmp/feature_importance.csv', index=False)
            mlflow.log_artifact('/tmp/feature_importance.csv')
            # Log the model
            mlflow.sklearn.log_model(
                model,
                "model",
                registered_model_name="ChurnPredictor"
            )
            run_id = mlflow.active_run().info.run_id
        logger.info(f"Training complete. Accuracy: {accuracy:.4f}, AUC-ROC: {auc_roc:.4f}")
        logger.info(f"MLflow Run ID: {run_id}")
        return run_id
    def promote_best_model(self, min_auc: float = 0.85):
        """Promote the best-performing model to Production."""
        client = mlflow.tracking.MlflowClient()
        # Query recent runs of the experiment
        experiment = client.get_experiment_by_name("churn-prediction")
        runs = client.search_runs(
            experiment_ids=[experiment.experiment_id],
            filter_string="metrics.auc_roc > 0.80",
            order_by=["metrics.auc_roc DESC"],
            max_results=1
        )
        if not runs:
            raise ValueError("No runs found meeting criteria")
        best_run = runs[0]
        auc = best_run.data.metrics['auc_roc']
        if auc < min_auc:
            raise ValueError(f"Best AUC {auc:.4f} below minimum {min_auc}")
        # Register the model and promote it to Production
        model_uri = f"runs:/{best_run.info.run_id}/model"
        mv = mlflow.register_model(model_uri, "ChurnPredictor")
        client.transition_model_version_stage(
            name="ChurnPredictor",
            version=mv.version,
            stage="Production",
            archive_existing_versions=True
        )
        logger.info(f"Model v{mv.version} promoted to Production! AUC: {auc:.4f}")
        return mv.version
# Run the pipeline
if __name__ == "__main__":
    pipeline = ChurnPredictionPipeline()
    df = pipeline.load_data("data/customers.parquet")
    df = pipeline.engineer_features(df)
    run_id = pipeline.train(df)
    pipeline.promote_best_model(min_auc=0.85)
Wrapping Up
MLOps is not just a set of tools to learn; it is a culture and a process for operating ML systems reliably, reproducibly, and at scale.
Remember the core principles:
- Version everything: code, data, models, and environments
- Automate first: manual steps are a source of errors
- Measure and monitor: you cannot improve what you do not measure
- Fail fast: detect drift and performance degradation immediately
- Guarantee reproducibility: anyone should be able to reproduce the same result at any time
The MLOps journey is incremental. Start at level 0 and raise your maturity as your organization's needs grow.