
The Complete MLOps Guide: From ML Pipelines to Production Deployment

Introduction

Modern machine learning projects do not end with building a model in a Jupyter Notebook. The entire lifecycle must be managed reliably: data collection, preprocessing, training, evaluation, deployment, monitoring, and retraining. This is exactly why MLOps emerged.

This guide covers MLOps end to end, from foundational concepts to hands-on use of the tooling.


1. What is MLOps?

1.1 Defining MLOps

MLOps (Machine Learning Operations) is a methodology that unifies the development and operation of ML systems. It applies the DevOps philosophy to ML workflows, automating continuous training (CT), continuous integration (CI), and continuous deployment (CD) of models.

1.2 MLOps vs DevOps vs DataOps

| | DevOps | DataOps | MLOps |
| --- | --- | --- | --- |
| Focus | Software delivery | Data pipelines | ML model lifecycle |
| Artifacts | Applications | Data/reports | ML models |
| Reproducibility | Code versions | Data versions | Code + data + model versions |
| Automation | CI/CD | Data testing | Automated training, evaluation, deployment |

The key difference between MLOps and DevOps is data dependency. The same code trained on different data produces an entirely different model, and model performance depends heavily on data quality, not just code quality.
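To see that data dependency concretely, here is a toy sketch in pure Python (illustrative only, not part of any MLOps tool): the same training code yields a different model when fed different data.

```python
# Toy "training" code: fit y = w * x by least squares (w = Σxy / Σx²)
def fit_slope(xs, ys):
    return sum(x * y for x, y in zip(xs, ys)) / sum(x * x for x in xs)

data_2023 = ([1, 2, 3], [2, 4, 6])  # this vintage of data implies slope 2
data_2024 = ([1, 2, 3], [3, 6, 9])  # a newer vintage implies slope 3

w_old = fit_slope(*data_2023)
w_new = fit_slope(*data_2024)

print(w_old, w_new)  # 2.0 3.0: identical code, different models
```

This is why MLOps versions data alongside code: reproducing a model requires reproducing the exact data it was trained on.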

1.3 What Makes ML Systems Special

ML systems have several characteristics that set them apart from traditional software:

  1. Data dependency: model behavior is determined by data, not code
  2. Experimental nature: dozens to hundreds of experiment iterations are needed
  3. Model decay: performance degrades as the data distribution shifts over time
  4. Reproducibility: it is hard to guarantee identical results even in identical environments
  5. Multiple artifacts: code, data, and models all need version control
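Point 4 deserves a concrete sketch. In practice, reproducibility starts with pinning random seeds (a real stack would also pin NumPy, PyTorch, and cuDNN seeds); a stdlib-only illustration:

```python
# Illustrative sketch: without a fixed seed, a "training" step is not
# reproducible; pinning the seed makes the run repeatable.
import random

def sample_minibatch(seed=None):
    """Draw a toy minibatch of indices; seed=None uses system entropy."""
    rng = random.Random(seed)
    return [rng.randint(0, 99) for _ in range(3)]

# Unseeded runs generally differ between executions...
a, b = sample_minibatch(), sample_minibatch()

# ...while seeded runs are bit-for-bit identical
c, d = sample_minibatch(seed=42), sample_minibatch(seed=42)
assert c == d
```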

1.4 The MLOps Maturity Model

Following Google's MLOps maturity levels:

Level 0 - Manual ML

  • Every step is manual
  • Script-driven experiments
  • Deployments are rare and manual
  • No monitoring

Level 1 - ML Pipeline Automation

  • Automated training pipeline
  • Continuous training becomes possible
  • Experiment tracking begins
  • Feature store introduced

Level 2 - CI/CD Pipeline Automation

  • Fully automated CI/CD
  • Model registry in use
  • Automated retraining triggers
  • Comprehensive monitoring

1.5 The MLOps Tool Ecosystem

Data versioning: DVC, LakeFS, Delta Lake
Experiment tracking: MLflow, W&B, Neptune, Comet ML
Pipelines: Airflow, Prefect, Metaflow, Kubeflow Pipelines
Model registry: MLflow Registry, W&B Artifacts, Vertex AI
Containerization: Docker, Podman
Orchestration: Kubernetes, ECS, GKE
Model serving: Triton, TorchServe, BentoML, KServe
Monitoring: Evidently, WhyLogs, Arize, Fiddler
Feature stores: Feast, Tecton, Vertex AI Feature Store
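As a taste of what the monitoring tools above compute, here is a toy Population Stability Index (PSI), a common drift score that compares a feature's current distribution to a baseline. This is an illustrative sketch, not any specific tool's API:

```python
# Toy drift check over pre-binned distributions (fractions summing to 1)
import math

def psi(expected, actual, eps=1e-6):
    """Population Stability Index between two binned distributions."""
    return sum(
        (a - e) * math.log((a + eps) / (e + eps))
        for e, a in zip(expected, actual)
    )

baseline = [0.25, 0.25, 0.25, 0.25]
drifted  = [0.10, 0.20, 0.30, 0.40]

print(round(psi(baseline, baseline), 4))  # 0.0 (no drift)
print(round(psi(baseline, drifted), 2))   # 0.23, above the common 0.2 drift threshold
```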

2. Data Version Control (DVC)

2.1 Introducing DVC

DVC (Data Version Control) is a version control tool for ML projects that works alongside Git. It lets you version large datasets and ML models the same way Git versions code.

# Install DVC
pip install dvc

# Install with S3 support
pip install "dvc[s3]"

# Install with GCS support
pip install "dvc[gs]"

# Install with support for every remote storage backend
pip install "dvc[all]"

2.2 Initialization and Basic Usage

# Initialize a Git repository (if you don't have one)
git init

# Initialize DVC
dvc init

# Inspect the generated files
ls .dvc/
# config  .gitignore  tmp/

# Start tracking data
dvc add data/train.csv

# A .dvc pointer file is created (tracked by Git)
git add data/train.csv.dvc data/.gitignore
git commit -m "Add training data"

2.3 Configuring Remote Storage

# Configure an S3 remote
dvc remote add -d myremote s3://my-bucket/dvc-store

# Set AWS credentials (use --local so secrets stay out of Git;
# environment variables or ~/.aws/credentials also work)
dvc remote modify --local myremote access_key_id YOUR_ACCESS_KEY
dvc remote modify --local myremote secret_access_key YOUR_SECRET_KEY

# GCS remote
dvc remote add -d gcsstorage gs://my-bucket/dvc-store

# Local remote (for testing)
dvc remote add -d localremote /tmp/dvc-storage

# Push data
dvc push

# Pull data
dvc pull

2.4 DVC Pipelines

A DVC pipeline tracks the dependencies of each stage and re-runs only the stages that changed.

# dvc.yaml
stages:
  prepare:
    cmd: python src/prepare.py
    deps:
      - src/prepare.py
      - data/raw
    outs:
      - data/prepared

  featurize:
    cmd: python src/featurize.py
    deps:
      - src/featurize.py
      - data/prepared
    outs:
      - data/features

  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/features
    params:
      - params.yaml:
          - train.lr
          - train.n_estimators
    outs:
      - models/model.pkl
    metrics:
      - metrics/scores.json:
          cache: false

  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/model.pkl
      - data/features
    metrics:
      - metrics/eval.json:
          cache: false

# Run the pipeline
dvc repro

# Run up to a specific stage
dvc repro train

# Visualize the pipeline DAG
dvc dag

# Compare experiment results
dvc metrics show
dvc metrics diff HEAD~1
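Conceptually, `dvc repro` skips a stage when the content hashes of its dependencies match what was recorded in dvc.lock on the last run. A simplified sketch of that decision (not DVC's actual implementation):

```python
# Hash-based change detection, the idea behind dvc repro's stage skipping
import hashlib

def file_hash(content: bytes) -> str:
    """Content hash of a dependency (DVC uses MD5-based hashes)."""
    return hashlib.md5(content).hexdigest()

# What dvc.lock recorded after the previous run (toy example)
lock = {"data/raw": file_hash(b"v1 of the data")}

def stage_is_stale(dep: str, content: bytes) -> bool:
    """Re-run the stage only if the dependency's hash changed."""
    return lock.get(dep) != file_hash(content)

assert not stage_is_stale("data/raw", b"v1 of the data")  # unchanged: skip
assert stage_is_stale("data/raw", b"v2 of the data")      # changed: re-run
```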

2.5 Managing Parameters

# params.yaml
train:
  lr: 0.001
  n_estimators: 100
  max_depth: 5
  batch_size: 32
  epochs: 10

data:
  test_size: 0.2
  random_state: 42

# src/train.py
import yaml

# Load parameters
with open("params.yaml") as f:
    params = yaml.safe_load(f)

lr = params["train"]["lr"]
n_estimators = params["train"]["n_estimators"]

# Train the model
from sklearn.ensemble import RandomForestClassifier
model = RandomForestClassifier(
    n_estimators=n_estimators,
    max_depth=params["train"]["max_depth"],
    random_state=params["data"]["random_state"]
)

2.6 The Git + DVC Workflow

# Create a branch for the new experiment
git checkout -b experiment/increase-lr

# Edit the parameters
# In params.yaml, change lr: 0.001 → lr: 0.01

# Re-run the pipeline
dvc repro

# Check the results
dvc metrics show

# Commit
git add dvc.lock params.yaml
git commit -m "Experiment: increase learning rate to 0.01"

# Compare experiments
git checkout main
dvc metrics diff experiment/increase-lr
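`dvc metrics diff` boils down to loading two JSON metrics files (one per revision) and reporting per-metric deltas. A rough, illustrative sketch of that computation:

```python
# Toy metrics diff over two JSON documents (not DVC's implementation)
import json

def metrics_diff(old_json: str, new_json: str) -> dict:
    """Per-metric old/new/diff report; diff is None if a side is missing."""
    old, new = json.loads(old_json), json.loads(new_json)
    return {
        k: {
            "old": old.get(k),
            "new": new.get(k),
            "diff": round(new[k] - old[k], 6) if k in old and k in new else None,
        }
        for k in sorted(set(old) | set(new))
    }

main_metrics = '{"accuracy": 0.91, "f1": 0.89}'
exp_metrics  = '{"accuracy": 0.93, "f1": 0.90}'
print(metrics_diff(main_metrics, exp_metrics))
```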

3. Experiment Tracking

3.1 Installing and Configuring MLflow

# Install MLflow
pip install mlflow

# Start the MLflow UI
mlflow ui

# Specify a host and port
mlflow ui --host 0.0.0.0 --port 5001

# Point at a remote tracking server
export MLFLOW_TRACKING_URI=http://mlflow-server:5000

3.2 MLflow Basics

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
import numpy as np

# Set the tracking URI (default: ./mlruns)
mlflow.set_tracking_uri("http://localhost:5000")

# Configure the experiment
mlflow.set_experiment("my-classification-experiment")

# Example data so the snippet is self-contained
from sklearn.datasets import load_iris
X, y = load_iris(return_X_y=True)

# Run the experiment
with mlflow.start_run(run_name="random-forest-v1"):
    # Log parameters
    n_estimators = 100
    max_depth = 5
    lr = 0.001

    mlflow.log_param("n_estimators", n_estimators)
    mlflow.log_param("max_depth", max_depth)
    mlflow.log_param("learning_rate", lr)

    # Train the model
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42
    )
    model.fit(X_train, y_train)

    # Log metrics
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='weighted')

    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("f1_score", f1)

    # Learning curve (per-step metrics)
    for epoch in range(10):
        train_loss = np.random.random() * 0.5 / (epoch + 1)
        mlflow.log_metric("train_loss", train_loss, step=epoch)

    # Log artifacts
    mlflow.log_artifact("data/train.csv")

    # Log the model
    mlflow.sklearn.log_model(
        model,
        "model",
        registered_model_name="RandomForestClassifier"
    )

    print(f"Accuracy: {accuracy:.4f}, F1: {f1:.4f}")
    print(f"Run ID: {mlflow.active_run().info.run_id}")

3.3 MLflow Autologging

# Enable autologging (sklearn example)
mlflow.sklearn.autolog()

with mlflow.start_run():
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X_train, y_train)
    # Parameters, metrics, and the model are logged automatically

# PyTorch autologging
mlflow.pytorch.autolog()

# XGBoost autologging
mlflow.xgboost.autolog()

3.4 In Practice: Tracking PyTorch Training

import torch
import torch.nn as nn
import torch.optim as optim
import mlflow
import mlflow.pytorch

class SimpleNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        return self.fc2(x)

def train_with_mlflow(
    model, train_loader, val_loader,
    epochs=10, lr=0.001, experiment_name="pytorch-training"
):
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run():
        # Log hyperparameters
        mlflow.log_params({
            "epochs": epochs,
            "learning_rate": lr,
            "model_type": "SimpleNet",
            "optimizer": "Adam",
            "batch_size": train_loader.batch_size,
        })

        optimizer = optim.Adam(model.parameters(), lr=lr)
        criterion = nn.CrossEntropyLoss()
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

        for epoch in range(epochs):
            # Training
            model.train()
            train_loss = 0.0
            for batch_X, batch_y in train_loader:
                optimizer.zero_grad()
                outputs = model(batch_X)
                loss = criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()
                train_loss += loss.item()

            train_loss /= len(train_loader)

            # Validation
            model.eval()
            val_loss = 0.0
            correct = 0
            total = 0

            with torch.no_grad():
                for batch_X, batch_y in val_loader:
                    outputs = model(batch_X)
                    loss = criterion(outputs, batch_y)
                    val_loss += loss.item()

                    _, predicted = outputs.max(1)
                    total += batch_y.size(0)
                    correct += predicted.eq(batch_y).sum().item()

            val_loss /= len(val_loader)
            val_accuracy = correct / total

            # Log per-step metrics
            mlflow.log_metrics({
                "train_loss": train_loss,
                "val_loss": val_loss,
                "val_accuracy": val_accuracy,
                "learning_rate": scheduler.get_last_lr()[0],
            }, step=epoch)

            scheduler.step()

            print(f"Epoch {epoch+1}/{epochs} | "
                  f"Train Loss: {train_loss:.4f} | "
                  f"Val Loss: {val_loss:.4f} | "
                  f"Val Acc: {val_accuracy:.4f}")

        # Save the final model
        mlflow.pytorch.log_model(model, "model")

        # Save the model architecture as text
        with open("model_summary.txt", "w") as f:
            f.write(str(model))
        mlflow.log_artifact("model_summary.txt")

        return mlflow.active_run().info.run_id

3.5 Weights & Biases (W&B)

# Install W&B
pip install wandb

# Log in
wandb login

import wandb
import torch
import torch.nn as nn

# Initialize W&B
wandb.init(
    project="my-ml-project",
    name="experiment-001",
    config={
        "learning_rate": 0.001,
        "epochs": 100,
        "batch_size": 64,
        "architecture": "ResNet50",
        "dataset": "ImageNet",
    }
)

# Access config values
lr = wandb.config.learning_rate

# Example training loop
for epoch in range(wandb.config.epochs):
    train_loss, train_acc = train_epoch(model, train_loader)
    val_loss, val_acc = eval_epoch(model, val_loader)

    # Log metrics
    wandb.log({
        "epoch": epoch,
        "train_loss": train_loss,
        "train_accuracy": train_acc,
        "val_loss": val_loss,
        "val_accuracy": val_acc,
        "learning_rate": optimizer.param_groups[0]['lr'],
    })

# Save the model
wandb.save("model.pth")

# Example: logging images
wandb.log({
    "predictions": [
        wandb.Image(img, caption=f"Pred: {pred}, True: {true}")
        for img, pred, true in sample_predictions
    ]
})

# W&B Artifacts (dataset versioning)
artifact = wandb.Artifact("training-data", type="dataset")
artifact.add_dir("data/train")
wandb.log_artifact(artifact)

# Finish the run
wandb.finish()

3.6 W&B Sweeps (Hyperparameter Optimization)

import wandb

# Sweep configuration
sweep_config = {
    "method": "bayes",  # random, grid, bayes
    "metric": {
        "name": "val_accuracy",
        "goal": "maximize"
    },
    "parameters": {
        "learning_rate": {
            "distribution": "log_uniform_values",
            "min": 1e-5,
            "max": 1e-1,
        },
        "batch_size": {
            "values": [16, 32, 64, 128]
        },
        "hidden_dim": {
            "values": [64, 128, 256, 512]
        },
        "dropout": {
            "distribution": "uniform",
            "min": 0.1,
            "max": 0.5,
        }
    }
}

def train_sweep():
    with wandb.init() as run:
        config = wandb.config

        model = SimpleNet(
            input_dim=784,
            hidden_dim=config.hidden_dim,
            output_dim=10
        )

        optimizer = torch.optim.Adam(
            model.parameters(),
            lr=config.learning_rate
        )

        # Training loop (abridged)
        for epoch in range(10):
            train_loss, val_loss, val_acc = run_epoch(
                model, optimizer, config.batch_size
            )
            wandb.log({
                "train_loss": train_loss,
                "val_loss": val_loss,
                "val_accuracy": val_acc,
            })

# Create and launch the sweep
sweep_id = wandb.sweep(sweep_config, project="my-project")
wandb.agent(sweep_id, function=train_sweep, count=50)
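Under the hood, a sweep agent repeatedly samples a configuration from the search space, evaluates the objective, and keeps the best trial. A self-contained random-search sketch of that loop (real Sweeps also support grid and Bayesian search, with sampling done server-side; `objective` here is a made-up stand-in for val_accuracy):

```python
import math
import random

def sample_config(rng):
    """Sample one configuration from a space like the sweep_config above."""
    return {
        # log-uniform over [1e-5, 1e-1], like log_uniform_values
        "learning_rate": 10 ** rng.uniform(-5, -1),
        "batch_size": rng.choice([16, 32, 64, 128]),
        "dropout": rng.uniform(0.1, 0.5),
    }

def objective(cfg):
    # Stand-in for val_accuracy: peaks near lr=1e-3 and dropout=0.3
    return (1.0
            - 0.1 * abs(math.log10(cfg["learning_rate"]) + 3)
            - abs(cfg["dropout"] - 0.3))

rng = random.Random(0)
trials = [(objective(c), c) for c in (sample_config(rng) for _ in range(50))]
best_score, best_cfg = max(trials, key=lambda t: t[0])
print(best_score, best_cfg)
```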

4. ML Pipeline Orchestration

4.1 ML Pipelines with Apache Airflow

# dags/ml_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'mlops-team',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'ml_training_pipeline',
    default_args=default_args,
    description='ML model training pipeline',
    schedule_interval='0 2 * * *',  # daily at 2 AM
    catchup=False,
)

def extract_data(**kwargs):
    """Extract data"""
    import pandas as pd
    from sqlalchemy import create_engine

    engine = create_engine('postgresql://user:pass@db:5432/mldb')
    df = pd.read_sql("SELECT * FROM features WHERE date >= CURRENT_DATE - 7", engine)

    output_path = '/tmp/raw_data.parquet'
    df.to_parquet(output_path)

    # Pass the path to downstream tasks via XCom
    kwargs['ti'].xcom_push(key='data_path', value=output_path)
    return output_path

def preprocess_data(**kwargs):
    """Preprocess data"""
    ti = kwargs['ti']
    data_path = ti.xcom_pull(task_ids='extract_data', key='data_path')

    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    df = pd.read_parquet(data_path)

    # Preprocessing
    scaler = StandardScaler()
    features = df.drop('target', axis=1)
    labels = df['target']

    scaled_features = scaler.fit_transform(features)

    # Keep the target column so the training task can use it
    processed = pd.DataFrame(scaled_features, columns=features.columns)
    processed['target'] = labels.values

    processed_path = '/tmp/processed_data.parquet'
    processed.to_parquet(processed_path)

    ti.xcom_push(key='processed_path', value=processed_path)

def train_model(**kwargs):
    """Train the model"""
    ti = kwargs['ti']
    processed_path = ti.xcom_pull(
        task_ids='preprocess_data',
        key='processed_path'
    )

    import mlflow
    import pandas as pd
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.model_selection import train_test_split

    mlflow.set_experiment("production-training")

    with mlflow.start_run():
        data = pd.read_parquet(processed_path)
        X_train, X_test, y_train, y_test = train_test_split(
            data.drop('target', axis=1),
            data['target'],
            test_size=0.2
        )

        model = GradientBoostingClassifier(n_estimators=200)
        model.fit(X_train, y_train)

        accuracy = model.score(X_test, y_test)
        mlflow.log_metric("accuracy", accuracy)
        mlflow.sklearn.log_model(model, "model")

        run_id = mlflow.active_run().info.run_id
        ti.xcom_push(key='run_id', value=run_id)

def evaluate_and_deploy(**kwargs):
    """Evaluate and deploy the model"""
    ti = kwargs['ti']
    run_id = ti.xcom_pull(task_ids='train_model', key='run_id')

    import mlflow
    client = mlflow.tracking.MlflowClient()

    run = client.get_run(run_id)
    accuracy = run.data.metrics['accuracy']

    # Deploy only if the performance bar is met
    if accuracy >= 0.90:
        model_uri = f"runs:/{run_id}/model"
        mlflow.register_model(model_uri, "ProductionModel")
        print(f"Model deployed with accuracy: {accuracy:.4f}")
    else:
        raise ValueError(f"Model accuracy {accuracy:.4f} below threshold 0.90")

# Task definitions
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
)

preprocess_task = PythonOperator(
    task_id='preprocess_data',
    python_callable=preprocess_data,
    dag=dag,
)

train_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag,
)

evaluate_task = PythonOperator(
    task_id='evaluate_and_deploy',
    python_callable=evaluate_and_deploy,
    dag=dag,
)

# Wire up dependencies
extract_task >> preprocess_task >> train_task >> evaluate_task
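XCom, used above to hand file paths between tasks, is essentially a small key-value store keyed by task id. A conceptual stub (not Airflow's implementation; the real `ti.xcom_push` infers the task id from context rather than taking it as an argument):

```python
class FakeXCom:
    """Minimal stand-in for the task-instance XCom API."""
    def __init__(self):
        self._store = {}

    def xcom_push(self, task_id, key, value):
        self._store[(task_id, key)] = value

    def xcom_pull(self, task_ids, key):
        return self._store.get((task_ids, key))

ti = FakeXCom()
# extract_data pushes its output path...
ti.xcom_push("extract_data", key="data_path", value="/tmp/raw_data.parquet")
# ...and preprocess_data pulls it by task id and key
assert ti.xcom_pull(task_ids="extract_data", key="data_path") == "/tmp/raw_data.parquet"
```

Because XCom values live in Airflow's metadata database, the pattern above passes small references (paths, run IDs) rather than the data itself.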

4.2 ML Pipelines with Prefect

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def load_data(data_path: str) -> pd.DataFrame:
    """데이터 로드 (캐싱 지원)"""
    return pd.read_parquet(data_path)

@task(retries=3, retry_delay_seconds=10)
def preprocess(df: pd.DataFrame) -> pd.DataFrame:
    """데이터 전처리"""
    df = df.dropna()
    df = df.drop_duplicates()
    return df

@task
def train(df: pd.DataFrame, params: dict) -> str:
    """모델 학습"""
    from sklearn.ensemble import RandomForestClassifier

    mlflow.set_experiment("prefect-ml-pipeline")

    with mlflow.start_run():
        model = RandomForestClassifier(**params)
        X = df.drop('target', axis=1)
        y = df['target']

        model.fit(X, y)
        mlflow.sklearn.log_model(model, "model")
        mlflow.log_params(params)

        return mlflow.active_run().info.run_id

@task
def evaluate(run_id: str) -> float:
    """모델 평가"""
    client = mlflow.tracking.MlflowClient()
    run = client.get_run(run_id)
    return run.data.metrics.get('accuracy', 0.0)

@flow(name="ml-training-pipeline")
def ml_pipeline(data_path: str, params: dict = None):
    """메인 파이프라인"""
    if params is None:
        params = {"n_estimators": 100, "max_depth": 5}

    df = load_data(data_path)
    processed_df = preprocess(df)
    run_id = train(processed_df, params)
    accuracy = evaluate(run_id)

    print(f"Pipeline complete. Accuracy: {accuracy:.4f}")
    return accuracy

# Run
if __name__ == "__main__":
    ml_pipeline(
        data_path="data/train.parquet",
        params={"n_estimators": 200, "max_depth": 7}
    )

5. Model Registry

5.1 MLflow Model Registry

A model registry manages model versions centrally and tracks their Staging/Production stages.

import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register the model
model_uri = "runs:/abc123def456/model"
registered_model = mlflow.register_model(model_uri, "MyClassifier")

# Check version info
print(f"Version: {registered_model.version}")
print(f"Status: {registered_model.status}")

# Update the model version's metadata
client.update_model_version(
    name="MyClassifier",
    version=registered_model.version,
    description="Random Forest with 200 estimators, accuracy 0.94"
)

# Add a tag
client.set_model_version_tag(
    name="MyClassifier",
    version=registered_model.version,
    key="validated_by",
    value="data-science-team"
)

5.2 Promoting from Staging to Production

# Transition to Staging
client.transition_model_version_stage(
    name="MyClassifier",
    version=1,
    stage="Staging",
    archive_existing_versions=False
)

# Promote to Production after validation
def promote_to_production(model_name: str, version: int, min_accuracy: float = 0.90):
    """Promote a model to Production"""
    # Check the run's metrics
    model_version = client.get_model_version(model_name, version)
    run_id = model_version.run_id
    run = client.get_run(run_id)

    accuracy = run.data.metrics.get('accuracy', 0)

    if accuracy < min_accuracy:
        raise ValueError(
            f"Model accuracy {accuracy:.4f} is below minimum {min_accuracy}"
        )

    # Promote to Production (the existing Production version is archived)
    client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage="Production",
        archive_existing_versions=True
    )

    print(f"Model {model_name} v{version} promoted to Production!")
    print(f"Accuracy: {accuracy:.4f}")

# Load the Production model
production_model = mlflow.pyfunc.load_model(
    model_uri=f"models:/MyClassifier/Production"
)
predictions = production_model.predict(X_test)
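The stage-transition semantics above can be illustrated with a toy in-memory registry (a sketch of the behavior, not MLflow's implementation): promoting with `archive_existing` moves the previous Production version to Archived.

```python
class ToyRegistry:
    """Versions per model name, each carrying a stage label."""
    def __init__(self):
        self.models = {}  # name -> list of {"version": int, "stage": str}

    def register(self, name):
        versions = self.models.setdefault(name, [])
        versions.append({"version": len(versions) + 1, "stage": "None"})
        return versions[-1]["version"]

    def transition(self, name, version, stage, archive_existing=False):
        if archive_existing:
            # Archive any version already in the target stage
            for v in self.models[name]:
                if v["stage"] == stage:
                    v["stage"] = "Archived"
        self.models[name][version - 1]["stage"] = stage

reg = ToyRegistry()
v1 = reg.register("MyClassifier")  # version 1
v2 = reg.register("MyClassifier")  # version 2
reg.transition("MyClassifier", v1, "Production")
reg.transition("MyClassifier", v2, "Production", archive_existing=True)
print([m["stage"] for m in reg.models["MyClassifier"]])  # ['Archived', 'Production']
```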

5.3 Model Serving (Built into MLflow)

# Serve the model with MLflow
mlflow models serve \
  -m "models:/MyClassifier/Production" \
  --host 0.0.0.0 \
  --port 5001

# Send a prediction request
curl -X POST http://localhost:5001/invocations \
  -H "Content-Type: application/json" \
  -d '{"dataframe_split": {"columns": ["feature1", "feature2"], "data": [[1.0, 2.0]]}}'

6. Containerization (Docker)

6.1 Dockerizing the ML Environment

# Dockerfile.train - training image
FROM python:3.11-slim

# Set the working directory
WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    git \
    && rm -rf /var/lib/apt/lists/*

# Install Python packages (optimizes layer caching)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the source code
COPY src/ ./src/
COPY params.yaml .
COPY dvc.yaml .

# Run training
CMD ["python", "src/train.py"]

6.2 Multi-stage Builds

# Dockerfile.serve - multi-stage build for serving
# Stage 1: builder
FROM python:3.11 AS builder

WORKDIR /build

COPY requirements.txt .
RUN pip install --no-cache-dir --target /install -r requirements.txt

# Stage 2: final (slim) image
FROM python:3.11-slim AS runtime

WORKDIR /app

# Copy packages from the builder
COPY --from=builder /install /usr/local/lib/python3.11/site-packages

# Copy only the serving code
COPY src/serve.py .
COPY models/ ./models/

# Expose the port
EXPOSE 8080

# Run as a non-root user (security)
RUN useradd -m -u 1000 mluser
USER mluser

CMD ["python", "serve.py"]

6.3 GPU Docker (nvidia-docker)

# Dockerfile.gpu - GPU-enabled image
FROM nvidia/cuda:12.1.0-cudnn8-devel-ubuntu22.04

ENV DEBIAN_FRONTEND=noninteractive

RUN apt-get update && apt-get install -y \
    python3.11 \
    python3-pip \
    git \
    && rm -rf /var/lib/apt/lists/*

# Install the GPU build of PyTorch
RUN pip3 install torch torchvision torchaudio \
    --index-url https://download.pytorch.org/whl/cu121

WORKDIR /app
COPY requirements-gpu.txt .
RUN pip3 install --no-cache-dir -r requirements-gpu.txt

COPY src/ ./src/

CMD ["python3", "src/train_gpu.py"]
# GPU 컨테이너 실행
docker run --gpus all \
  -v /data:/data \
  -v /models:/models \
  --shm-size=8gb \
  my-ml-gpu:latest

6.4 An ML Stack with Docker Compose

# docker-compose.yml
version: '3.8'

services:
  # MLflow tracking server
  mlflow:
    image: ghcr.io/mlflow/mlflow:latest
    command: >
      mlflow server
      --backend-store-uri postgresql://mlflow:password@postgres:5432/mlflow
      --default-artifact-root s3://my-bucket/mlflow
      --host 0.0.0.0
      --port 5000
    ports:
      - '5000:5000'
    environment:
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
    depends_on:
      - postgres

  # PostgreSQL (MLflow backend)
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: mlflow
      POSTGRES_USER: mlflow
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - '5432:5432'

  # MinIO (S3-compatible storage)
  minio:
    image: minio/minio:latest
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    ports:
      - '9000:9000'
      - '9001:9001'
    volumes:
      - minio_data:/data

  # Training worker
  trainer:
    build:
      context: .
      dockerfile: Dockerfile.train
    environment:
      MLFLOW_TRACKING_URI: http://mlflow:5000
      AWS_ACCESS_KEY_ID: minioadmin
      AWS_SECRET_ACCESS_KEY: minioadmin
      MLFLOW_S3_ENDPOINT_URL: http://minio:9000
    volumes:
      - ./data:/app/data
      - ./models:/app/models
    depends_on:
      - mlflow
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

volumes:
  postgres_data:
  minio_data:

6.5 In Practice: A PyTorch Model Server Dockerfile

# Dockerfile.pytorch-serve
FROM python:3.11-slim

WORKDIR /app

# Dependencies
RUN pip install --no-cache-dir \
    torch==2.2.0+cpu \
    torchvision \
    fastapi \
    uvicorn \
    pydantic \
    numpy \
    pillow \
    --index-url https://download.pytorch.org/whl/cpu

COPY src/server.py .
COPY models/model.pt .

EXPOSE 8000

CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]
# src/server.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import torch
import numpy as np
from typing import List

app = FastAPI(title="ML Model Server")

# Load the model (once, at server startup)
model = torch.jit.load("model.pt")
model.eval()

class PredictionRequest(BaseModel):
    features: List[List[float]]

class PredictionResponse(BaseModel):
    predictions: List[int]
    probabilities: List[List[float]]

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    try:
        tensor = torch.tensor(request.features, dtype=torch.float32)

        with torch.no_grad():
            outputs = model(tensor)
            probabilities = torch.softmax(outputs, dim=1)
            predictions = torch.argmax(probabilities, dim=1)

        return PredictionResponse(
            predictions=predictions.tolist(),
            probabilities=probabilities.tolist()
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

7. Kubernetes for ML

7.1 K8s Basics for ML Workloads

# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: ml-platform
  labels:
    app: ml-platform
    environment: production

7.2 Setting Up a GPU Node Pool

# k8s/gpu-node-pool.yaml (illustrative GKE sketch; in practice, node pools
# are created with gcloud or Config Connector rather than kubectl)
apiVersion: container.googleapis.com/v1
kind: NodePool
metadata:
  name: gpu-pool
spec:
  machineType: n1-standard-8
  accelerators:
    - acceleratorCount: 1
      acceleratorType: nvidia-tesla-t4
  autoscaling:
    minNodeCount: 0
    maxNodeCount: 10
  nodeConfig:
    labels:
      accelerator: nvidia-t4
    taints:
      - key: nvidia.com/gpu
        value: present
        effect: NoSchedule

7.3 Training Jobs

# k8s/training-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: ml-training-job
  namespace: ml-platform
spec:
  backoffLimit: 3
  template:
    spec:
      restartPolicy: OnFailure

      # Schedule onto GPU nodes
      nodeSelector:
        accelerator: nvidia-t4

      tolerations:
        - key: nvidia.com/gpu
          operator: Exists
          effect: NoSchedule

      containers:
        - name: trainer
          image: my-registry/ml-trainer:v1.2.0

          resources:
            requests:
              memory: '8Gi'
              cpu: '4'
              nvidia.com/gpu: '1'
            limits:
              memory: '16Gi'
              cpu: '8'
              nvidia.com/gpu: '1'

          env:
            - name: MLFLOW_TRACKING_URI
              value: 'http://mlflow-service.ml-platform:5000'
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                secretKeyRef:
                  name: aws-credentials
                  key: access_key_id

          volumeMounts:
            - name: training-data
              mountPath: /data
            - name: model-output
              mountPath: /models
            - name: dshm
              mountPath: /dev/shm

      volumes:
        - name: training-data
          persistentVolumeClaim:
            claimName: training-data-pvc
        - name: model-output
          persistentVolumeClaim:
            claimName: model-output-pvc
        - name: dshm # shared memory for PyTorch DataLoader workers
          emptyDir:
            medium: Memory
            sizeLimit: 8Gi

7.4 Scheduled Retraining with a CronJob

# k8s/training-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: daily-retraining
  namespace: ml-platform
spec:
  schedule: '0 2 * * *' # daily at 2 AM
  concurrencyPolicy: Forbid # prevent concurrent runs
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3

  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: OnFailure
          containers:
            - name: retrainer
              image: my-registry/ml-trainer:latest
              # Kubernetes does not expand shell $(...) syntax in env values,
              # so compute the date inside the container instead
              command: ['sh', '-c', 'TRAINING_DATE=$(date +%Y-%m-%d) python src/retrain.py']
              resources:
                requests:
                  memory: '4Gi'
                  cpu: '2'
                limits:
                  memory: '8Gi'
                  cpu: '4'

7.5 Model Serving Deployment

# k8s/model-serving-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: model-server
  namespace: ml-platform
spec:
  replicas: 3
  selector:
    matchLabels:
      app: model-server

  template:
    metadata:
      labels:
        app: model-server
    spec:
      containers:
        - name: model-server
          image: my-registry/model-server:v1.0.0
          ports:
            - containerPort: 8000

          resources:
            requests:
              memory: '2Gi'
              cpu: '1'
            limits:
              memory: '4Gi'
              cpu: '2'

          # Health checks
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10

          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 15
            periodSeconds: 5

          # Environment variables (from a ConfigMap)
          envFrom:
            - configMapRef:
                name: model-server-config

---
apiVersion: v1
kind: Service
metadata:
  name: model-server-service
  namespace: ml-platform
spec:
  selector:
    app: model-server
  ports:
    - port: 80
      targetPort: 8000
  type: ClusterIP

---
# HPA (horizontal autoscaling)
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: model-server-hpa
  namespace: ml-platform
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: model-server
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80

8. Kubeflow

8.1 Introducing Kubeflow

Kubeflow is an open-source platform for managing ML workflows on top of Kubernetes. It consists of the following components:

  • Pipelines: ML workflow orchestration
  • Notebooks: managed Jupyter notebooks
  • Katib: hyperparameter tuning
  • KServe: model serving
  • Training Operator: distributed training

8.2 Kubeflow Pipelines

# kubeflow_pipeline.py
import kfp
from kfp import dsl

# Component definitions
@dsl.component(base_image="python:3.11", packages_to_install=["pandas", "pyarrow", "scikit-learn"])
def prepare_data(data_path: str, output_path: dsl.OutputPath(str)):
    """Data preparation component"""
    import os

    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_parquet(data_path)
    train, test = train_test_split(df, test_size=0.2)

    os.makedirs(output_path, exist_ok=True)
    train.to_parquet(f"{output_path}/train.parquet")
    test.to_parquet(f"{output_path}/test.parquet")

@dsl.component(
    base_image="python:3.11",
    packages_to_install=["scikit-learn", "mlflow", "pandas", "pyarrow"]
)
def train_model(
    data_path: str,
    n_estimators: int,
    max_depth: int,
    model_output: dsl.OutputPath(str),
    mlflow_uri: str
):
    """Model training component"""
    import mlflow
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier

    mlflow.set_tracking_uri(mlflow_uri)

    df = pd.read_parquet(f"{data_path}/train.parquet")
    X = df.drop('target', axis=1)
    y = df['target']

    with mlflow.start_run():
        model = RandomForestClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth
        )
        model.fit(X, y)

        run_id = mlflow.active_run().info.run_id
        mlflow.sklearn.log_model(model, "model")

    with open(model_output, 'w') as f:
        f.write(run_id)

@dsl.component(packages_to_install=["mlflow", "scikit-learn", "pandas"])
def evaluate_model(
    data_path: str,
    run_id_path: str,
    mlflow_uri: str
) -> float:
    """모델 평가 컴포넌트"""
    import mlflow
    import pandas as pd

    mlflow.set_tracking_uri(mlflow_uri)

    with open(run_id_path, 'r') as f:
        run_id = f.read()

    model = mlflow.sklearn.load_model(f"runs:/{run_id}/model")

    test_df = pd.read_parquet(f"{data_path}/test.parquet")
    X_test = test_df.drop('target', axis=1)
    y_test = test_df['target']

    accuracy = model.score(X_test, y_test)
    return accuracy

# Pipeline definition
@dsl.pipeline(
    name="ML Training Pipeline",
    description="End-to-end ML training pipeline"
)
def ml_pipeline(
    data_path: str = "gs://my-bucket/data/train.parquet",
    n_estimators: int = 100,
    max_depth: int = 5,
    mlflow_uri: str = "http://mlflow:5000"
):
    # Data preparation
    prepare_task = prepare_data(data_path=data_path)

    # Model training
    train_task = train_model(
        data_path=prepare_task.output,
        n_estimators=n_estimators,
        max_depth=max_depth,
        mlflow_uri=mlflow_uri
    )

    # Model evaluation
    evaluate_task = evaluate_model(
        data_path=prepare_task.output,
        run_id_path=train_task.outputs['model_output'],
        mlflow_uri=mlflow_uri
    )

# Compile and run the pipeline
if __name__ == "__main__":
    # Compile the pipeline
    kfp.compiler.Compiler().compile(
        pipeline_func=ml_pipeline,
        package_path="ml_pipeline.yaml"
    )

    # Submit to Kubeflow
    client = kfp.Client(host="http://kubeflow-host/pipeline")

    run = client.create_run_from_pipeline_package(
        pipeline_file="ml_pipeline.yaml",
        arguments={
            "n_estimators": 200,
            "max_depth": 7,
        },
        run_name="training-run-001"
    )

8.3 Katib Hyperparameter Tuning

# katib-experiment.yaml
apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
  name: hp-tuning-experiment
  namespace: kubeflow
spec:
  objective:
    type: maximize
    goal: 0.95
    objectiveMetricName: accuracy
    additionalMetricNames:
      - f1_score

  algorithm:
    algorithmName: bayesianoptimization

  parallelTrialCount: 3
  maxTrialCount: 20
  maxFailedTrialCount: 5

  parameters:
    - name: learning_rate
      parameterType: double
      feasibleSpace:
        min: '0.0001'
        max: '0.01'
    - name: n_estimators
      parameterType: int
      feasibleSpace:
        min: '50'
        max: '500'
    - name: max_depth
      parameterType: int
      feasibleSpace:
        min: '3'
        max: '10'

  trialTemplate:
    primaryContainerName: training-container
    trialParameters:
      - name: learning_rate
        description: Learning rate
        reference: learning_rate
      - name: n_estimators
        description: Number of estimators
        reference: n_estimators
      - name: max_depth
        description: Max tree depth
        reference: max_depth

    trialSpec:
      apiVersion: batch/v1
      kind: Job
      spec:
        template:
          spec:
            restartPolicy: Never
            containers:
              - name: training-container
                image: my-registry/trainer:latest
                command:
                  - 'python'
                  - 'train.py'
                  - '--lr=${trialParameters.learning_rate}'
                  - '--n-estimators=${trialParameters.n_estimators}'
                  - '--max-depth=${trialParameters.max_depth}'
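Katib's default StdOut metrics collector scrapes the trial container's log for `name=value` lines matching the objective metric names declared above. A minimal sketch of what a `train.py` entrypoint compatible with the trialSpec above could look like; the actual model training is stubbed out, and the printed metric values are placeholders:

```python
import argparse

def parse_trial_args(argv=None):
    """Parse the hyperparameters Katib injects via trialParameters."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--lr", type=float, required=True)
    parser.add_argument("--n-estimators", type=int, required=True)
    parser.add_argument("--max-depth", type=int, required=True)
    return parser.parse_args(argv)

def report_metrics(metrics: dict) -> list[str]:
    """Emit metrics as 'name=value' lines for Katib's StdOut collector."""
    lines = [f"{name}={value}" for name, value in metrics.items()]
    for line in lines:
        print(line)  # Katib reads these from the container log
    return lines

# Example: simulate the command Katib issues for one trial
args = parse_trial_args(["--lr=0.001", "--n-estimators=200", "--max-depth=7"])
# ... train a model with args.lr, args.n_estimators, args.max_depth ...
report_metrics({"accuracy": 0.93, "f1_score": 0.91})  # placeholder values
```

The metric names printed here must match `objectiveMetricName` and `additionalMetricNames` in the Experiment spec, or Katib will record the trial as having no observations.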

9. CI/CD for ML

9.1 GitHub Actions for ML

# .github/workflows/ml-pipeline.yml
name: ML CI/CD Pipeline

on:
  push:
    branches: [main, develop]
    paths:
      - 'src/**'
      - 'params.yaml'
      - 'requirements.txt'
  pull_request:
    branches: [main]

env:
  MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

jobs:
  test:
    name: Unit Tests
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov

      - name: Run unit tests
        run: |
          pytest tests/unit/ -v --cov=src --cov-report=xml

      - name: Upload coverage
        uses: codecov/codecov-action@v4
        with:
          file: coverage.xml

  data-validation:
    name: Data Validation
    runs-on: ubuntu-latest
    needs: test

    steps:
      - uses: actions/checkout@v4

      - name: Setup DVC
        run: pip install "dvc[s3]"

      - name: Pull data
        run: dvc pull data/

      - name: Validate data
        run: python src/validate_data.py

  train-and-evaluate:
    name: Train and Evaluate Model
    runs-on: ubuntu-latest
    needs: data-validation
    if: github.ref == 'refs/heads/main'

    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Pull data with DVC
        run: |
          pip install "dvc[s3]"
          dvc pull

      - name: Run DVC pipeline
        run: dvc repro

      - name: Check model performance
        run: |
          python scripts/check_metrics.py \
            --min-accuracy 0.90 \
            --metrics-file metrics/scores.json

      - name: Register model if metrics pass
        run: python scripts/register_model.py

      - name: Set up CML
        uses: iterative/setup-cml@v2

      - name: Report metrics
        env:
          REPO_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        run: |
          echo "## Model Metrics" > report.md
          cat metrics/scores.json >> report.md
          cml comment create report.md

  build-and-push:
    name: Build Docker Image
    runs-on: ubuntu-latest
    needs: train-and-evaluate

    steps:
      - uses: actions/checkout@v4

      - name: Login to Container Registry
        uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Build and push
        uses: docker/build-push-action@v5
        with:
          context: .
          file: Dockerfile.serve
          push: true
          tags: |
            ghcr.io/${{ github.repository }}/model-server:latest
            ghcr.io/${{ github.repository }}/model-server:${{ github.sha }}

  deploy:
    name: Deploy to Kubernetes
    runs-on: ubuntu-latest
    needs: build-and-push
    environment: production

    steps:
      - uses: actions/checkout@v4

      - name: Configure kubectl
        uses: azure/k8s-set-context@v3
        with:
          kubeconfig: ${{ secrets.KUBECONFIG }}

      - name: Deploy
        run: |
          kubectl set image deployment/model-server \
            model-server=ghcr.io/${{ github.repository }}/model-server:${{ github.sha }} \
            -n ml-platform

          kubectl rollout status deployment/model-server -n ml-platform
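The `check_metrics.py` step in the workflow above gates deployment on a minimum accuracy, but the script itself is not shown. A minimal stdlib sketch of what it could look like; the flag names mirror the workflow step, while the JSON schema (`{"accuracy": ...}`) is an assumption about what the DVC pipeline writes:

```python
import argparse
import json

def metrics_pass(metrics_file: str, min_accuracy: float) -> bool:
    """Return True if the recorded accuracy meets the deployment threshold."""
    with open(metrics_file) as f:
        metrics = json.load(f)
    accuracy = metrics["accuracy"]
    print(f"accuracy={accuracy:.4f} (minimum required: {min_accuracy:.4f})")
    return accuracy >= min_accuracy

def main(argv=None) -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--min-accuracy", type=float, default=0.90)
    parser.add_argument("--metrics-file", default="metrics/scores.json")
    args = parser.parse_args(argv)
    # In CI, return this via sys.exit(): a non-zero exit code fails the
    # GitHub Actions step and stops the pipeline before registration.
    return 0 if metrics_pass(args.metrics_file, args.min_accuracy) else 1

# Demo: a passing and a failing gate against a sample metrics file
with open("scores.json", "w") as f:
    json.dump({"accuracy": 0.93}, f)
print(metrics_pass("scores.json", 0.90))  # True
print(metrics_pass("scores.json", 0.95))  # False
```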

10. Model Monitoring

10.1 Detecting Data Drift with Evidently AI

pip install evidently

import pandas as pd
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
from evidently.metrics import ColumnDriftMetric

# Reference data (the data the model was trained on)
reference_data = pd.read_parquet("data/reference.parquet")

# Current data (production prediction data)
current_data = pd.read_parquet("data/current.parquet")

# Data drift report
drift_report = Report(metrics=[
    DataDriftPreset(),
    DataQualityPreset(),
    ColumnDriftMetric(column_name="age"),
    ColumnDriftMetric(column_name="income"),
])

drift_report.run(
    reference_data=reference_data,
    current_data=current_data
)

# Save an HTML report
drift_report.save_html("drift_report.html")

# Extract results as JSON
result = drift_report.as_dict()
drift_detected = result['metrics'][0]['result']['dataset_drift']

if drift_detected:
    print("Data drift detected! Consider retraining.")
    # Send an alert (Slack, email, etc.); send_alert is a project-specific helper
    send_alert("Data drift detected in production model")
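Under the hood, drift detectors like the preset above compare the distribution of each column between the reference and current windows. A commonly hand-rolled score for the same purpose is the Population Stability Index (PSI); the sketch below is a minimal pure-Python illustration of the idea, not Evidently's internals, and the 0.1/0.2 thresholds are conventional rules of thumb:

```python
import math

def psi(reference: list[float], current: list[float], n_bins: int = 10) -> float:
    """Population Stability Index between two numeric samples.

    Bins are derived from the reference range; a small epsilon avoids
    log(0) when a bin is empty in one of the samples.
    """
    lo, hi = min(reference), max(reference)
    width = (hi - lo) / n_bins or 1.0
    eps = 1e-6

    def bin_fractions(values):
        counts = [0] * n_bins
        for v in values:
            idx = min(int((v - lo) / width), n_bins - 1)  # clamp above the range
            idx = max(idx, 0)                             # clamp below the range
            counts[idx] += 1
        return [c / len(values) + eps for c in counts]

    ref_frac = bin_fractions(reference)
    cur_frac = bin_fractions(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref_frac, cur_frac))

# Conventional reading: PSI < 0.1 stable, 0.1-0.2 moderate, > 0.2 significant drift
reference = [i / 100 for i in range(1000)]  # uniform on [0, 10)
shifted = [v + 5 for v in reference]        # distribution shifted to the right
print(f"no drift: {psi(reference, reference):.4f}")
print(f"shifted:  {psi(reference, shifted):.4f}")
```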

10.2 Model Performance Monitoring

from evidently.report import Report
from evidently.metric_preset import ClassificationPreset

# Prepare prediction data (when ground-truth labels are available)
prediction_data = pd.DataFrame({
    'target': y_true,
    'prediction': y_pred,
    'prediction_proba': y_proba,
    'feature1': X_test['feature1'],
    'feature2': X_test['feature2'],
})

# Classification performance report
# (ClassificationPreset already includes the classification quality metrics)
performance_report = Report(metrics=[
    ClassificationPreset(),
])

performance_report.run(
    reference_data=reference_predictions,
    current_data=prediction_data
)

# Extract results
metrics = performance_report.as_dict()
current_accuracy = metrics['metrics'][0]['result']['current']['accuracy']
reference_accuracy = metrics['metrics'][0]['result']['reference']['accuracy']

degradation = reference_accuracy - current_accuracy
if degradation > 0.05:  # accuracy dropped by more than 5 percentage points
    trigger_retraining(f"Model degraded by {degradation:.2%}")  # project-specific helper

10.3 Real-Time Prediction Monitoring Pipeline

import logging

import pandas as pd
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Prometheus metric definitions
prediction_counter = Counter('model_predictions_total', 'Total predictions')
prediction_latency = Histogram('model_prediction_duration_seconds', 'Prediction latency')
model_accuracy_gauge = Gauge('model_accuracy', 'Current model accuracy')
drift_score_gauge = Gauge('data_drift_score', 'Data drift score')

class MonitoredModelServer:
    def __init__(self, model, reference_data):
        self.model = model
        self.reference_data = reference_data
        # Buffer the input features, since drift is measured against the
        # reference feature data, not against the model's outputs
        self.features_buffer = []

        # Start the Prometheus metrics endpoint
        start_http_server(8001)

    def predict(self, features):
        with prediction_latency.time():
            prediction = self.model.predict(features)

        prediction_counter.inc()
        self.features_buffer.append(features)

        # Run monitoring once 1,000 requests have been buffered
        if len(self.features_buffer) >= 1000:
            self._run_monitoring()

        return prediction

    def _run_monitoring(self):
        """Periodic monitoring pass over the buffered inputs"""
        current_df = pd.DataFrame(self.features_buffer)

        # Drift detection against the training-time reference data
        drift_report = Report(metrics=[DataDriftPreset()])
        drift_report.run(
            reference_data=self.reference_data,
            current_data=current_df
        )

        result = drift_report.as_dict()
        drift_share = result['metrics'][0]['result']['share_of_drifted_columns']
        drift_score_gauge.set(drift_share)

        if drift_share > 0.3:
            logging.warning(f"High drift detected: {drift_share:.2%} of columns drifted")

        # Reset the buffer
        self.features_buffer = []

11. Feature Store

11.1 Feature Store Concepts

A feature store is infrastructure that lets you manage ML features centrally and reuse them across models.

Core concepts:

  • Online store: low-latency feature lookups for real-time inference (Redis, DynamoDB)
  • Offline store: historical features for batch training (S3, BigQuery)
  • Feature view: a mapping between feature definitions and their data source
  • Entity: the subject a feature describes (user ID, product ID, etc.)
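The key semantic the offline store must provide is point-in-time correctness: for each (entity, event timestamp) pair it returns the latest feature values known at that moment, never values from the future, so training data matches what would have been available at serving time. A minimal stdlib sketch of that lookup; the in-memory row layout is purely illustrative, not Feast's storage format:

```python
from datetime import datetime

# (user_id, feature_timestamp, purchase_count_7d) rows,
# as an offline store might hold them
feature_rows = [
    (1001, datetime(2026, 1, 1), 3),
    (1001, datetime(2026, 1, 5), 7),
    (1002, datetime(2026, 1, 2), 1),
]

def point_in_time_lookup(user_id: int, event_time: datetime):
    """Return the most recent feature value at or before event_time (None if absent)."""
    candidates = [
        (ts, value)
        for uid, ts, value in feature_rows
        if uid == user_id and ts <= event_time  # never leak future feature values
    ]
    return max(candidates)[1] if candidates else None

# A label observed on Jan 3 must join against the Jan 1 features, not Jan 5's
print(point_in_time_lookup(1001, datetime(2026, 1, 3)))  # -> 3
print(point_in_time_lookup(1001, datetime(2026, 1, 6)))  # -> 7
```

This is exactly what `get_historical_features` automates at scale in the Feast example below.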

11.2 Installing and Configuring Feast

# Install Feast
pip install feast

# Initialize a Feast project
feast init my-feature-store
cd my-feature-store

# feature_store.yaml
project: my_feature_store
registry: data/registry.db
provider: local
online_store:
  type: redis
  connection_string: "localhost:6379"
offline_store:
  type: file
entity_key_serialization_version: 2

# features.py - feature definitions
from feast import (
    Entity, FeatureView,
    FileSource, ValueType, Field
)
from feast.types import Float64, Int64
from datetime import timedelta

# Entity definition
user = Entity(
    name="user_id",
    value_type=ValueType.INT64,
    description="User ID"
)

# Data source
user_stats_source = FileSource(
    path="data/user_stats.parquet",
    timestamp_field="event_timestamp",
)

# Feature view definition
user_stats_fv = FeatureView(
    name="user_statistics",
    entities=[user],
    ttl=timedelta(days=30),
    schema=[
        Field(name="purchase_count_7d", dtype=Int64),
        Field(name="purchase_amount_7d", dtype=Float64),
        Field(name="avg_session_duration", dtype=Float64),
        Field(name="last_purchase_days_ago", dtype=Int64),
    ],
    online=True,
    source=user_stats_source,
)

# feast_usage.py - using Feast
from feast import FeatureStore
from datetime import datetime
import pandas as pd

store = FeatureStore(repo_path=".")

# Offline feature retrieval (for training)
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003],
    "event_timestamp": pd.to_datetime(["2026-01-01", "2026-01-02", "2026-01-03"])
})

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_statistics:purchase_count_7d",
        "user_statistics:purchase_amount_7d",
        "user_statistics:avg_session_duration",
    ]
).to_df()

print(training_df.head())

# Online feature retrieval (for real-time inference)
# First, load data into the online store
store.materialize_incremental(end_date=datetime.now())

# Real-time lookup
online_features = store.get_online_features(
    features=[
        "user_statistics:purchase_count_7d",
        "user_statistics:purchase_amount_7d",
    ],
    entity_rows=[
        {"user_id": 1001},
        {"user_id": 1002},
    ]
).to_dict()

print(online_features)

12. Hands-On MLOps Project

12.1 End-to-End Pipeline Architecture

Data sources (DB, S3, API)
→ Data ingestion (Airflow DAG)
→ Data validation (Great Expectations)
→ Feature engineering (Feast)
→ Model training (tracked with MLflow)
→ Model evaluation (automated validation)
→ Model registry (MLflow Registry)
→ CI/CD (GitHub Actions)
→ Container build (Docker)
→ K8s deployment (Kubernetes)
→ Serving (FastAPI + Triton)
→ Monitoring (Evidently + Prometheus + Grafana)
→ Alerting (retraining triggered on drift detection)
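The final alerting stage boils down to a policy decision: given the monitoring signals, should a retraining run be triggered? A minimal sketch of such a policy function; the threshold values mirror the ones used earlier in this guide but are otherwise illustrative:

```python
def should_retrain(drift_share: float, accuracy_drop: float,
                   drift_threshold: float = 0.3,
                   degradation_threshold: float = 0.05) -> bool:
    """Trigger retraining when either monitoring signal crosses its threshold."""
    if drift_share > drift_threshold:
        return True  # too many columns drifted from the reference data
    if accuracy_drop > degradation_threshold:
        return True  # live accuracy fell too far below the reference accuracy
    return False

print(should_retrain(drift_share=0.4, accuracy_drop=0.01))  # True: drift alone triggers
print(should_retrain(drift_share=0.1, accuracy_drop=0.02))  # False: both within bounds
```

In practice this function would sit in the monitoring service and, on True, kick off the training pipeline (e.g. via an Airflow DAG trigger or a Kubeflow run submission).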

12.2 Hands-On Project: Customer Churn Prediction System

# scripts/full_pipeline.py
"""
전체 MLOps 파이프라인: 고객 이탈 예측
"""
import mlflow
import pandas as pd
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import (
    accuracy_score, roc_auc_score,
    classification_report, confusion_matrix
)
from sklearn.preprocessing import StandardScaler
import joblib
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ChurnPredictionPipeline:
    """고객 이탈 예측 파이프라인"""

    def __init__(self, mlflow_uri: str = "http://localhost:5000"):
        mlflow.set_tracking_uri(mlflow_uri)
        mlflow.set_experiment("churn-prediction")
        self.scaler = StandardScaler()

    def load_data(self, data_path: str) -> pd.DataFrame:
        """데이터 로드 및 기본 검증"""
        logger.info(f"Loading data from {data_path}")
        df = pd.read_parquet(data_path)

        # Basic data quality checks
        assert df.shape[0] > 1000, "Too few samples"
        assert 'churn' in df.columns, "Target column 'churn' missing"
        assert df['churn'].nunique() == 2, "Binary classification expected"

        missing_rate = df.isnull().mean()
        high_missing = missing_rate[missing_rate > 0.5].index.tolist()
        if high_missing:
            logger.warning(f"Columns with >50% missing: {high_missing}")
            df = df.drop(columns=high_missing)

        logger.info(f"Data loaded: {df.shape[0]} rows, {df.shape[1]} columns")
        return df

    def engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """피처 엔지니어링"""
        logger.info("Engineering features...")

        # Handle missing values
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].median())

        # Create derived features
        if 'tenure_months' in df.columns and 'monthly_charges' in df.columns:
            df['total_value'] = df['tenure_months'] * df['monthly_charges']

        if 'num_support_tickets' in df.columns and 'tenure_months' in df.columns:
            df['tickets_per_month'] = (
                df['num_support_tickets'] / (df['tenure_months'] + 1)
            )

        return df

    def train(
        self,
        df: pd.DataFrame,
        params: dict = None
    ) -> str:
        """모델 학습 및 MLflow 추적"""
        if params is None:
            params = {
                "n_estimators": 200,
                "max_depth": 5,
                "learning_rate": 0.05,
                "subsample": 0.8,
                "random_state": 42,
            }

        feature_cols = [c for c in df.columns if c != 'churn']
        X = df[feature_cols]
        y = df['churn']

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        # Scaling
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        with mlflow.start_run(run_name="churn-gbt"):
            # Log parameters
            mlflow.log_params(params)
            mlflow.log_param("train_size", len(X_train))
            mlflow.log_param("test_size", len(X_test))
            mlflow.log_param("n_features", len(feature_cols))

            # Train the model
            model = GradientBoostingClassifier(**params)
            model.fit(X_train_scaled, y_train)

            # Evaluate
            y_pred = model.predict(X_test_scaled)
            y_proba = model.predict_proba(X_test_scaled)[:, 1]

            accuracy = accuracy_score(y_test, y_pred)
            auc_roc = roc_auc_score(y_test, y_proba)

            mlflow.log_metrics({
                "accuracy": accuracy,
                "auc_roc": auc_roc,
            })

            # Cross-validation
            cv_scores = cross_val_score(
                model, X_train_scaled, y_train, cv=5, scoring='roc_auc'
            )
            mlflow.log_metric("cv_auc_mean", cv_scores.mean())
            mlflow.log_metric("cv_auc_std", cv_scores.std())

            # Feature importance
            feature_importance = pd.DataFrame({
                'feature': feature_cols,
                'importance': model.feature_importances_
            }).sort_values('importance', ascending=False)

            feature_importance.to_csv('/tmp/feature_importance.csv', index=False)
            mlflow.log_artifact('/tmp/feature_importance.csv')

            # Save the model
            mlflow.sklearn.log_model(
                model,
                "model",
                registered_model_name="ChurnPredictor"
            )

            run_id = mlflow.active_run().info.run_id

            logger.info(f"Training complete. Accuracy: {accuracy:.4f}, AUC-ROC: {auc_roc:.4f}")
            logger.info(f"MLflow Run ID: {run_id}")

            return run_id

    def promote_best_model(self, min_auc: float = 0.85):
        """최고 성능 모델을 Production으로 승격"""
        client = mlflow.tracking.MlflowClient()

        # Query recent experiment runs
        experiment = client.get_experiment_by_name("churn-prediction")
        runs = client.search_runs(
            experiment_ids=[experiment.experiment_id],
            filter_string="metrics.auc_roc > 0.80",
            order_by=["metrics.auc_roc DESC"],
            max_results=1
        )

        if not runs:
            raise ValueError("No runs found meeting criteria")

        best_run = runs[0]
        auc = best_run.data.metrics['auc_roc']

        if auc < min_auc:
            raise ValueError(f"Best AUC {auc:.4f} below minimum {min_auc}")

        # Register the model and promote it to Production
        model_uri = f"runs:/{best_run.info.run_id}/model"

        mv = mlflow.register_model(model_uri, "ChurnPredictor")

        client.transition_model_version_stage(
            name="ChurnPredictor",
            version=mv.version,
            stage="Production",
            archive_existing_versions=True
        )

        logger.info(f"Model v{mv.version} promoted to Production! AUC: {auc:.4f}")
        return mv.version

# Run
if __name__ == "__main__":
    pipeline = ChurnPredictionPipeline()

    df = pipeline.load_data("data/customers.parquet")
    df = pipeline.engineer_features(df)

    run_id = pipeline.train(df)
    pipeline.promote_best_model(min_auc=0.85)

Wrapping Up

MLOps is not just about learning tools; it is a culture and a set of processes for operating ML systems reliably, reproducibly, and at scale.

Remember the core principles:

  1. Version everything: code, data, models, and environments
  2. Automate first: manual steps are a source of errors
  3. Measure and monitor: you cannot improve what you do not measure
  4. Detect failures fast: catch drift and performance degradation immediately
  5. Guarantee reproducibility: anyone should be able to reproduce the same result at any time

The MLOps journey is incremental. Start at level 0 and raise your maturity as your organization's needs grow.


References