Skip to content
Published on

MLOps & 모델 라이프사이클 완전 정복: MLflow, DVC, LLMOps까지

Authors

목차

  1. MLOps 개요 및 성숙도 모델
  2. 실험 추적: MLflow & Weights and Biases
  3. 데이터 버전 관리: DVC
  4. 피처 스토어
  5. 모델 레지스트리
  6. ML을 위한 CI/CD
  7. 모델 모니터링 & 드리프트 감지
  8. LLMOps
  9. 퀴즈

MLOps 개요 및 성숙도 모델

MLOps(Machine Learning Operations)는 ML 시스템을 프로덕션에서 안정적으로 운영하기 위한 방법론, 도구, 문화의 집합입니다. DevOps 원칙을 ML 워크플로우에 적용하여 모델 개발부터 배포, 모니터링, 재학습까지의 전체 라이프사이클을 자동화합니다.

MLOps가 필요한 이유

ML 프로젝트의 95% 이상이 프로덕션 배포에 실패한다는 통계가 있습니다. 그 원인은 다음과 같습니다:

  • 재현 불가능한 실험: 코드, 데이터, 환경이 버전 관리되지 않음
  • 수동 배포 프로세스: 느리고 오류 발생이 잦음
  • 모니터링 부재: 모델 성능 저하를 늦게 발견
  • 팀 사일로: 데이터 과학팀과 엔지니어링팀 간 단절

MLOps 성숙도 레벨

Google의 MLOps 성숙도 모델은 세 단계로 구분됩니다.

Level 0: 수동 프로세스

모든 과정이 수동입니다. 데이터 과학자가 Jupyter Notebook에서 실험하고 결과를 수동으로 배포합니다.

특징설명
배포 주기수개월에 한 번
자동화 수준없음
재현성낮음
모니터링없거나 수동

한계점: 실험 추적 부재, 코드와 데이터 버전 불일치, 배포 오류, 모델 성능 저하 감지 불가

Level 1: ML 파이프라인 자동화

CT(Continuous Training)가 도입됩니다. 데이터 파이프라인과 모델 학습이 자동화되지만 CI/CD는 아직 수동입니다.

핵심 구성 요소:

  • 자동화된 데이터 검증 파이프라인
  • 피처 엔지니어링 파이프라인
  • 모델 학습 파이프라인 (Kubeflow Pipelines, Apache Airflow 등)
  • 모델 성능 평가 자동화
  • 피처 스토어 도입
# Kubeflow Pipeline 예시 - Level 1 CT 파이프라인
import kfp
from kfp import dsl

@dsl.component
def data_validation_op(data_path: str) -> bool:
    import great_expectations as ge
    ds = ge.read_csv(data_path)
    results = ds.expect_column_values_to_not_be_null("target")
    return results["success"]

@dsl.component
def train_model_op(data_path: str, model_output: str):
    import mlflow
    import sklearn
    # 모델 학습 로직
    pass

@dsl.pipeline(name="CT Pipeline")
def ct_pipeline(data_path: str):
    validation = data_validation_op(data_path=data_path)
    with dsl.Condition(validation.output == True):
        train_model_op(data_path=data_path, model_output="/models/")

Level 2: CI/CD 파이프라인 자동화

완전한 MLOps 자동화 단계입니다. 코드, 데이터, 모델이 모두 버전 관리되며 CI/CD/CT가 완전히 자동화됩니다.

자동화 트리거 조건:

  • 새로운 학습 데이터 도달 (스케줄 또는 데이터 임계값)
  • 모델 성능 지표 저하 감지
  • 데이터 드리프트 감지
  • 코드 변경 (새 피처, 알고리즘 개선)

Level 2 아키텍처:

소스 코드 변경 또는 데이터 트리거
CI 파이프라인 (테스트, 빌드)
CD 파이프라인 (파이프라인 배포)
CT 파이프라인 (자동 재학습)
모델 평가 → 통과/실패
모델 레지스트리 등록
StagingProduction 프로모션
모니터링 & 알림

실험 추적: MLflow & Weights and Biases

MLflow 완전 가이드

MLflow는 ML 라이프사이클 관리를 위한 오픈소스 플랫폼입니다. 4가지 핵심 컴포넌트로 구성됩니다.

MLflow Tracking

실험 파라미터, 메트릭, 아티팩트를 추적합니다.

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
import numpy as np

# MLflow Tracking 서버 설정
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("fraud-detection-v2")

with mlflow.start_run(run_name="rf-baseline") as run:
    # 하이퍼파라미터 로깅
    params = {
        "n_estimators": 100,
        "max_depth": 10,
        "min_samples_split": 5,
        "random_state": 42
    }
    mlflow.log_params(params)

    # 모델 학습
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # 메트릭 로깅
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred, average="weighted"),
    }
    mlflow.log_metrics(metrics)

    # 모델 자동 로깅 (sklearn autolog 사용 시)
    # mlflow.sklearn.autolog()

    # 모델 저장
    mlflow.sklearn.log_model(
        sk_model=model,
        artifact_path="model",
        registered_model_name="fraud-detection",
        input_example=X_test[:5],
        signature=mlflow.models.infer_signature(X_train, y_pred)
    )

    # 커스텀 아티팩트
    import matplotlib.pyplot as plt
    from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
    cm = confusion_matrix(y_test, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm)
    disp.plot()
    plt.savefig("confusion_matrix.png")
    mlflow.log_artifact("confusion_matrix.png")

    print(f"Run ID: {run.info.run_id}")
    print(f"Accuracy: {metrics['accuracy']:.4f}")

MLflow Autolog

프레임워크별 자동 로깅으로 코드 최소화:

import mlflow

# 프레임워크 자동 감지 및 로깅
mlflow.autolog()

# PyTorch 전용 autolog
mlflow.pytorch.autolog(
    log_every_n_epoch=1,
    log_models=True,
    disable=False,
    exclusive=False,
    log_datasets=True
)

# XGBoost 전용 autolog
mlflow.xgboost.autolog(
    log_input_examples=True,
    log_model_signatures=True,
    log_models=True,
    log_datasets=True
)

MLflow Projects

재현 가능한 ML 프로젝트 패키징:

# MLproject 파일
name: fraud-detection

conda_env: conda.yaml

entry_points:
  main:
    parameters:
      n_estimators: { type: int, default: 100 }
      max_depth: { type: int, default: 10 }
      data_path: { type: str, default: 'data/train.csv' }
    command: 'python train.py --n_estimators {n_estimators} --max_depth {max_depth} --data_path {data_path}'
  evaluate:
    parameters:
      model_uri: { type: str }
      test_data: { type: str }
    command: 'python evaluate.py --model_uri {model_uri} --test_data {test_data}'

Weights & Biases (W&B)

W&B는 실험 추적, 시각화, 하이퍼파라미터 최적화를 제공하는 MLOps 플랫폼입니다.

import wandb
from wandb.integration.keras import WandbCallback

# W&B 초기화
run = wandb.init(
    project="image-classification",
    config={
        "learning_rate": 0.001,
        "epochs": 50,
        "batch_size": 32,
        "architecture": "ResNet50"
    }
)

# W&B Sweep으로 하이퍼파라미터 최적화
sweep_config = {
    "method": "bayes",
    "metric": {"name": "val_accuracy", "goal": "maximize"},
    "parameters": {
        "learning_rate": {"min": 1e-5, "max": 1e-2},
        "batch_size": {"values": [16, 32, 64]},
        "dropout": {"min": 0.1, "max": 0.5}
    }
}
sweep_id = wandb.sweep(sweep_config, project="image-classification")
wandb.agent(sweep_id, function=train_fn, count=50)

데이터 버전 관리: DVC

DVC(Data Version Control)는 Git과 연동하여 대용량 데이터셋과 ML 파이프라인을 버전 관리하는 도구입니다.

DVC 작동 방식

DVC는 대용량 파일을 직접 Git에 저장하는 대신 .dvc 메타데이터 파일(포인터)만 Git에 커밋합니다. 실제 데이터는 S3, GCS, Azure Blob, SSH 등 원격 스토리지에 저장됩니다.

# DVC 초기화
git init
dvc init

# 원격 스토리지 설정 (S3)
dvc remote add -d myremote s3://my-bucket/dvc-store
dvc remote modify myremote region us-east-1

# 데이터 추가
dvc add data/train.csv
git add data/train.csv.dvc .gitignore
git commit -m "Add training data v1"
dvc push

# 다른 환경에서 데이터 가져오기
git pull
dvc pull

DVC Pipeline (dvc.yaml)

재현 가능한 ML 파이프라인을 선언적으로 정의합니다:

# dvc.yaml
stages:
  prepare:
    cmd: python src/prepare.py --input data/raw.csv --output data/processed/
    deps:
      - src/prepare.py
      - data/raw.csv
    outs:
      - data/processed/train.csv
      - data/processed/test.csv
    params:
      - prepare:
          - split_ratio
          - random_seed

  featurize:
    cmd: python src/featurize.py
    deps:
      - src/featurize.py
      - data/processed/train.csv
    outs:
      - data/features/train_features.pkl
    params:
      - featurize:
          - max_features
          - ngrams

  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/features/train_features.pkl
    outs:
      - models/model.pkl
    metrics:
      - reports/metrics.json:
          cache: false
    params:
      - train:
          - n_estimators
          - max_depth
          - random_seed

  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/model.pkl
      - data/processed/test.csv
    metrics:
      - reports/eval_metrics.json:
          cache: false
    plots:
      - reports/plots/confusion_matrix.csv:
          cache: false
# params.yaml
prepare:
  split_ratio: 0.8
  random_seed: 42
featurize:
  max_features: 1000
  ngrams: 2
train:
  n_estimators: 100
  max_depth: 10
  random_seed: 42

DVC 실험 관리

# 파이프라인 실행
dvc repro

# 실험 브랜치 생성
dvc exp run --set-param train.n_estimators=200 --name exp-200-trees

# 실험 비교
dvc exp show

# 실험 결과 표 출력
dvc metrics show
dvc metrics diff

피처 스토어

피처 스토어(Feature Store)는 ML 피처를 중앙 집중식으로 저장, 공유, 서빙하는 데이터 레이어입니다.

피처 스토어가 필요한 이유

  • 학습/서빙 스큐 제거: 학습과 추론에서 동일한 피처 변환 보장
  • 피처 재사용: 팀 간 피처 공유로 중복 작업 제거
  • 저지연 서빙: 온라인 예측을 위한 실시간 피처 조회
  • 피처 일관성: 배치와 실시간 파이프라인 일관성 유지

온라인 vs 오프라인 저장소

구분온라인 저장소오프라인 저장소
목적실시간 추론 서빙모델 학습
지연시간수 밀리초수 초 ~ 분
저장소Redis, DynamoDB, CassandraS3, BigQuery, Hive
데이터 양최신 상태 (현재 값)전체 이력
쿼리 패턴단건 조회 (키 기반)배치 스캔

Feast 피처 스토어

# feature_repo/feature_store.yaml
project: fraud_detection
registry: data/registry.db
provider: local
online_store:
  type: redis
  connection_string: "localhost:6379"
offline_store:
  type: bigquery
  dataset: feast_dev
# feature_repo/features.py
from datetime import timedelta
from feast import Entity, Feature, FeatureView, FileSource, ValueType
from feast.types import Float32, Int64, String

# 엔티티 정의
user = Entity(
    name="user_id",
    value_type=ValueType.INT64,
    description="사용자 ID"
)

# 데이터 소스 정의
user_stats_source = FileSource(
    path="data/user_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created"
)

# 피처 뷰 정의
user_stats_fv = FeatureView(
    name="user_stats",
    entities=["user_id"],
    ttl=timedelta(days=7),
    features=[
        Feature(name="transaction_count_7d", dtype=Float32),
        Feature(name="avg_transaction_amount", dtype=Float32),
        Feature(name="days_since_last_login", dtype=Int64),
        Feature(name="account_age_days", dtype=Int64),
    ],
    online=True,
    source=user_stats_source,
    tags={"team": "fraud", "version": "v2"},
)
# 피처 스토어 사용 - 학습
from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path="feature_repo/")

# 학습 데이터 조회 (오프라인)
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003],
    "event_timestamp": pd.to_datetime(["2026-03-01", "2026-03-01", "2026-03-01"])
})

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_stats:transaction_count_7d",
        "user_stats:avg_transaction_amount",
        "user_stats:days_since_last_login",
    ]
).to_df()

# 온라인 서빙 - 실시간 피처 조회
feature_vector = store.get_online_features(
    features=[
        "user_stats:transaction_count_7d",
        "user_stats:avg_transaction_amount",
    ],
    entity_rows=[{"user_id": 1001}]
).to_dict()

피처 드리프트 감지

from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

# 피처 드리프트 리포트 생성
report = Report(metrics=[DataDriftPreset()])
report.run(
    reference_data=reference_features,
    current_data=current_features,
    column_mapping=ColumnMapping(target="label")
)
report.save_html("feature_drift_report.html")

# 드리프트 결과 확인
results = report.as_dict()
drifted_features = [
    col for col, info in results["metrics"][0]["result"]["drift_by_columns"].items()
    if info["drift_detected"]
]
print(f"드리프트 감지된 피처: {drifted_features}")

모델 레지스트리

MLflow Model Registry

MLflow Model Registry는 모델 버전 관리, 스테이지 전환, 협업을 위한 중앙 저장소입니다.

import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# 새 모델 등록
model_uri = f"runs:/{run_id}/model"
model_version = mlflow.register_model(
    model_uri=model_uri,
    name="fraud-detection"
)

# 모델 설명 추가
client.update_registered_model(
    name="fraud-detection",
    description="결제 사기 탐지 모델 - RandomForest 기반"
)

client.update_model_version(
    name="fraud-detection",
    version=model_version.version,
    description=f"Accuracy: 0.956, F1: 0.943 on test set"
)

# Staging으로 전환
client.transition_model_version_stage(
    name="fraud-detection",
    version=model_version.version,
    stage="Staging",
    archive_existing_versions=False
)

# Staging 모델 로드 및 검증
staging_model = mlflow.pyfunc.load_model(
    model_uri="models:/fraud-detection/Staging"
)
staging_preds = staging_model.predict(X_val)
staging_accuracy = accuracy_score(y_val, staging_preds)

# 검증 통과 시 Production 승격
if staging_accuracy > 0.95:
    client.transition_model_version_stage(
        name="fraud-detection",
        version=model_version.version,
        stage="Production",
        archive_existing_versions=True  # 이전 Production 버전 아카이브
    )
    print(f"모델 v{model_version.version} Production 승격 완료")

Hugging Face Hub 모델 레지스트리

from huggingface_hub import HfApi, Repository
from transformers import AutoModelForSequenceClassification, AutoTokenizer

api = HfApi()

# 모델 업로드
api.upload_folder(
    folder_path="./fine-tuned-model",
    repo_id="myorg/sentiment-classifier-v2",
    repo_type="model",
)

# 모델 카드 업데이트
api.upload_file(
    path_or_fileobj="README.md",
    path_in_repo="README.md",
    repo_id="myorg/sentiment-classifier-v2",
)

# 특정 버전 태깅
api.create_tag(
    repo_id="myorg/sentiment-classifier-v2",
    tag="v2.1.0",
    tag_message="Improved accuracy on edge cases"
)

ML을 위한 CI/CD

GitHub Actions ML 파이프라인

# .github/workflows/ml-cicd.yml
name: ML CI/CD Pipeline

on:
  push:
    branches: [main, develop]
    paths:
      - 'src/**'
      - 'params.yaml'
      - 'dvc.yaml'
  schedule:
    - cron: '0 2 * * 1' # 매주 월요일 오전 2시 자동 재학습

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Run unit tests
        run: pytest tests/ -v --cov=src
      - name: Data validation
        run: python src/validate_data.py

  train-and-evaluate:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Configure DVC remote
        run: |
          dvc remote modify myremote access_key_id ${{ secrets.AWS_ACCESS_KEY_ID }}
          dvc remote modify myremote secret_access_key ${{ secrets.AWS_SECRET_ACCESS_KEY }}
      - name: Pull data
        run: dvc pull
      - name: Run DVC pipeline
        run: dvc repro
      - name: Log metrics to MLflow
        run: python src/log_results.py
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
      - name: Check model performance gate
        run: |
          python src/check_performance_gate.py \
            --min-accuracy 0.95 \
            --min-f1 0.93
      - name: Push results
        run: |
          dvc push
          git add reports/metrics.json dvc.lock
          git commit -m "chore: update metrics [skip ci]"
          git push

  deploy-staging:
    needs: train-and-evaluate
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Promote model to Staging
        run: python src/promote_model.py --stage Staging
      - name: Run integration tests
        run: pytest tests/integration/ -v
      - name: Deploy to staging endpoint
        run: kubectl apply -f k8s/staging/

  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - name: Promote model to Production
        run: python src/promote_model.py --stage Production
      - name: Blue/Green deployment
        run: ./scripts/blue_green_deploy.sh
      - name: Smoke tests
        run: pytest tests/smoke/ -v

자동 재학습 트리거

# src/check_retrain_trigger.py
import mlflow
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

def should_retrain(
    current_data,
    reference_data,
    performance_threshold=0.92,
    drift_threshold=0.3
) -> tuple[bool, str]:
    """재학습 필요 여부 판단"""

    # 1. 성능 기반 트리거
    current_metrics = get_current_metrics()
    if current_metrics["accuracy"] < performance_threshold:
        return True, f"성능 저하: accuracy={current_metrics['accuracy']:.3f}"

    # 2. 데이터 드리프트 트리거
    report = Report(metrics=[DataDriftPreset()])
    report.run(reference_data=reference_data, current_data=current_data)
    results = report.as_dict()
    drift_share = results["metrics"][0]["result"]["share_of_drifted_columns"]

    if drift_share > drift_threshold:
        return True, f"데이터 드리프트: {drift_share:.1%} 피처 드리프트 감지"

    return False, "재학습 불필요"

def get_current_metrics():
    client = mlflow.tracking.MlflowClient()
    prod_model = client.get_latest_versions("fraud-detection", stages=["Production"])[0]
    run = client.get_run(prod_model.run_id)
    return {
        "accuracy": float(run.data.metrics.get("accuracy", 0)),
        "f1_score": float(run.data.metrics.get("f1_score", 0))
    }

모델 모니터링 & 드리프트 감지

데이터 드리프트 vs 컨셉 드리프트

데이터 드리프트(Data Drift): 입력 피처의 분포가 변화합니다. P(X)가 변하지만 P(Y|X)는 유지됩니다. 예: 사용자 연령 분포 변화, 거래 금액 분포 변화.

컨셉 드리프트(Concept Drift): 입력과 출력 간 관계가 변화합니다. P(Y|X)가 변합니다. 예: 사기 패턴 변화, 사용자 선호도 변화.

Evidently 드리프트 모니터링

import pandas as pd
from evidently.report import Report
from evidently.test_suite import TestSuite
from evidently import ColumnMapping
from evidently.metric_preset import (
    DataDriftPreset,
    DataQualityPreset,
    TargetDriftPreset,
    ClassificationPreset
)
from evidently.tests import (
    TestNumberOfDriftedColumns,
    TestShareOfDriftedColumns,
    TestColumnDrift
)

# 컬럼 매핑 설정
column_mapping = ColumnMapping(
    target="fraud_label",
    prediction="fraud_score",
    numerical_features=["amount", "transaction_count_7d", "avg_amount"],
    categorical_features=["merchant_category", "payment_method"]
)

# 종합 드리프트 리포트
report = Report(metrics=[
    DataDriftPreset(),
    DataQualityPreset(),
    TargetDriftPreset(),
    ClassificationPreset()
])
report.run(
    reference_data=reference_df,
    current_data=production_df,
    column_mapping=column_mapping
)
report.save_html("monitoring/report.html")

# 알림 테스트 스위트
test_suite = TestSuite(tests=[
    TestNumberOfDriftedColumns(lt=3),
    TestShareOfDriftedColumns(lt=0.3),
    TestColumnDrift(column_name="amount"),
    TestColumnDrift(column_name="transaction_count_7d"),
])
test_suite.run(
    reference_data=reference_df,
    current_data=production_df
)

# 테스트 실패 시 알림
results = test_suite.as_dict()
failed_tests = [t for t in results["tests"] if t["status"] == "FAIL"]
if failed_tests:
    send_alert(f"모니터링 경보: {len(failed_tests)}개 테스트 실패")

Prometheus + Grafana 메트릭 모니터링

# src/monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time

# 메트릭 정의
prediction_counter = Counter(
    "model_predictions_total",
    "총 예측 수",
    ["model_version", "result"]
)
prediction_latency = Histogram(
    "model_prediction_latency_seconds",
    "예측 지연 시간",
    buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0]
)
model_accuracy = Gauge(
    "model_accuracy_current",
    "현재 모델 정확도"
)
drift_score = Gauge(
    "feature_drift_score",
    "피처 드리프트 점수",
    ["feature_name"]
)

# 서빙 코드에서 메트릭 업데이트
def predict_with_monitoring(features, model_version="v2.1"):
    start_time = time.time()
    prediction = model.predict(features)
    latency = time.time() - start_time

    prediction_latency.observe(latency)
    prediction_counter.labels(
        model_version=model_version,
        result="fraud" if prediction[0] == 1 else "normal"
    ).inc()

    return prediction

LLMOps

LLMOps는 대규모 언어 모델의 개발, 배포, 운영을 위한 MLOps 확장입니다.

LLM 파이프라인 특수 과제

  • 비결정적 출력: 동일 입력에 다른 출력 → 평가 복잡
  • 프롬프트 민감성: 작은 변경이 큰 성능 차이 유발
  • 고비용 Fine-tuning: 대용량 GPU 자원 필요
  • 환각(Hallucination): 사실과 다른 정보 생성
  • 맥락 길이 관리: 긴 컨텍스트 처리

LangSmith를 이용한 LLM 추적

from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langsmith import Client
import os

# LangSmith 설정
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "production-chatbot"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"

# LangChain 체인 정의 (자동으로 LangSmith에 트레이스 기록)
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
prompt = ChatPromptTemplate.from_template(
    "당신은 친절한 고객 서비스 에이전트입니다.\n\n질문: {question}\n\n답변:"
)
chain = prompt | llm

# 실행 - 자동 트레이싱
response = chain.invoke({"question": "환불 정책이 어떻게 되나요?"})

# LangSmith 클라이언트로 평가
langsmith_client = Client()

# 데이터셋 생성
dataset = langsmith_client.create_dataset(
    dataset_name="customer-service-eval",
    description="고객 서비스 챗봇 평가 데이터셋"
)

# 평가 예제 추가
langsmith_client.create_examples(
    inputs=[{"question": "환불 정책을 알려주세요"}],
    outputs=[{"answer": "구매 후 30일 이내 환불 가능합니다."}],
    dataset_id=dataset.id
)

# 자동 평가 실행
from langsmith.evaluation import evaluate, LangChainStringEvaluator

evaluators = [
    LangChainStringEvaluator("cot_qa"),
    LangChainStringEvaluator("labeled_criteria", config={
        "criteria": "correctness"
    })
]
results = evaluate(
    lambda x: chain.invoke(x),
    data=dataset.name,
    evaluators=evaluators,
    experiment_prefix="gpt4o-baseline"
)

프롬프트 버전 관리

# prompt_registry.py
import mlflow
from dataclasses import dataclass
from typing import Optional

@dataclass
class PromptVersion:
    template: str
    version: str
    description: str
    metrics: Optional[dict] = None

class PromptRegistry:
    def __init__(self, mlflow_uri: str):
        mlflow.set_tracking_uri(mlflow_uri)
        self.experiment_name = "prompt-versions"
        mlflow.set_experiment(self.experiment_name)

    def register_prompt(self, prompt: PromptVersion) -> str:
        with mlflow.start_run(run_name=f"prompt-{prompt.version}") as run:
            mlflow.log_param("version", prompt.version)
            mlflow.log_param("description", prompt.description)
            mlflow.log_text(prompt.template, "prompt_template.txt")
            if prompt.metrics:
                mlflow.log_metrics(prompt.metrics)
            return run.info.run_id

    def get_prompt(self, version: str) -> str:
        client = mlflow.tracking.MlflowClient()
        runs = client.search_runs(
            experiment_ids=[mlflow.get_experiment_by_name(self.experiment_name).experiment_id],
            filter_string=f"params.version = '{version}'"
        )
        if not runs:
            raise ValueError(f"프롬프트 버전 {version}을 찾을 수 없습니다")
        artifact_uri = runs[0].info.artifact_uri
        return mlflow.artifacts.load_text(f"{artifact_uri}/prompt_template.txt")

# 사용 예시
registry = PromptRegistry("http://mlflow-server:5000")
registry.register_prompt(PromptVersion(
    template="당신은 {role}입니다. {context}\n\n질문: {question}\n답변:",
    version="v1.2.0",
    description="컨텍스트 포함 프롬프트 개선",
    metrics={"accuracy": 0.87, "hallucination_rate": 0.03}
))

LLM Fine-tuning 파이프라인

# fine_tuning_pipeline.py
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TrainingArguments,
    Trainer
)
from peft import LoraConfig, get_peft_model, TaskType
import mlflow

def fine_tune_with_lora(
    base_model: str,
    dataset_path: str,
    output_dir: str,
    lora_r: int = 16,
    lora_alpha: int = 32
):
    mlflow.set_experiment("llm-fine-tuning")

    with mlflow.start_run():
        # LoRA 설정
        lora_config = LoraConfig(
            task_type=TaskType.CAUSAL_LM,
            r=lora_r,
            lora_alpha=lora_alpha,
            target_modules=["q_proj", "v_proj"],
            lora_dropout=0.05,
            bias="none"
        )
        mlflow.log_params({
            "base_model": base_model,
            "lora_r": lora_r,
            "lora_alpha": lora_alpha
        })

        # 모델 준비
        model = AutoModelForCausalLM.from_pretrained(base_model)
        model = get_peft_model(model, lora_config)
        model.print_trainable_parameters()

        # 학습 설정
        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=3,
            per_device_train_batch_size=4,
            gradient_accumulation_steps=4,
            learning_rate=2e-4,
            fp16=True,
            report_to="mlflow"
        )

        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=train_dataset,
        )
        trainer.train()

        # 모델 저장 및 등록
        model.save_pretrained(output_dir)
        mlflow.transformers.log_model(
            transformers_model={"model": model, "tokenizer": tokenizer},
            artifact_path="fine-tuned-model",
            registered_model_name="customer-service-llm"
        )

퀴즈

Q1. MLOps Level 2에서 CT(Continuous Training)가 자동화되는 트리거 조건 4가지를 설명하세요.

정답: 데이터 트리거, 성능 트리거, 드리프트 트리거, 스케줄 트리거

설명:

  1. 데이터 트리거: 새 학습 데이터가 특정 임계값(예: 10만 건)에 도달하거나, 새 데이터 배치가 파이프라인에 유입될 때 자동 재학습이 시작됩니다.
  2. 성능 트리거: 프로덕션 모델의 정확도, F1 점수 등이 사전 정의된 임계값(예: accuracy < 0.92) 아래로 떨어질 때 트리거됩니다.
  3. 드리프트 트리거: Evidently 같은 도구로 감지된 데이터 드리프트 비율이 임계값(예: 30% 이상 피처 드리프트)을 초과할 때 트리거됩니다.
  4. 스케줄 트리거: 비즈니스 요구에 따른 주기적 재학습(예: 매주 월요일 새벽 2시)으로 데이터 신선도를 유지합니다.
Q2. 피처 스토어에서 온라인과 오프라인 저장소를 분리하는 이유를 설명하세요.

정답: 학습과 추론의 서로 다른 접근 패턴과 성능 요구사항을 각각 최적화하기 위해서입니다.

설명:

  • 오프라인 저장소(S3, BigQuery)는 모델 학습용입니다. 수백만 건의 이력 데이터를 배치로 스캔해야 하므로 대용량 처리와 비용 효율이 중요합니다. 높은 지연시간(초~분)을 허용합니다.
  • 온라인 저장소(Redis, DynamoDB)는 실시간 추론용입니다. 특정 엔티티(사용자 ID, 상품 ID)의 최신 피처를 수 밀리초 내에 조회해야 하므로 저지연 단건 조회에 최적화됩니다.
  • 두 저장소를 분리하지 않으면 학습 시 대용량 스캔 쿼리가 실시간 추론 조회를 방해하거나, 반대로 실시간 요구사항에 맞추다 학습 비용이 폭증합니다.
Q3. 데이터 드리프트와 컨셉 드리프트의 차이와 각각 감지하는 방법을 설명하세요.

정답: 데이터 드리프트는 P(X) 변화, 컨셉 드리프트는 P(Y|X) 변화

설명:

  • 데이터 드리프트: 입력 피처의 통계적 분포가 변화합니다. Kolmogorov-Smirnov 검정, Population Stability Index(PSI), JS Divergence 등으로 감지합니다. Evidently의 DataDriftPreset이 대표적입니다. 레이블 없이도 감지 가능합니다.
  • 컨셉 드리프트: 동일한 입력에 대한 올바른 출력이 변합니다. 예를 들어 사기 탐지에서 새로운 사기 패턴이 등장하면 기존 규칙이 유효하지 않게 됩니다. 실제 레이블이 필요하며, 모델 성능(accuracy, F1) 저하로 감지합니다. 레이블이 늦게 들어오는 경우 프록시 메트릭을 활용합니다.
Q4. DVC가 Git으로 대용량 ML 데이터를 관리하는 방식을 설명하세요.

정답: 포인터(메타데이터) 파일을 Git에 저장하고 실제 데이터는 원격 스토리지에 저장

설명: DVC는 대용량 파일(데이터셋, 모델)을 직접 Git에 저장하지 않습니다. 대신 .dvc 확장자의 메타데이터 파일을 생성하여 Git으로 추적합니다. 이 파일에는 실제 데이터의 MD5 해시, 크기, 경로 등이 저장됩니다. 실제 데이터는 S3, GCS, Azure Blob 등 원격 스토리지에 dvc push로 업로드됩니다. 다른 환경에서 dvc pull로 정확히 동일한 버전의 데이터를 내려받을 수 있습니다. Git 커밋과 DVC 데이터 버전이 1:1로 연결되어 실험 재현성을 보장합니다.

Q5. MLflow Model Registry에서 Staging에서 Production으로 프로모션 전 검증해야 할 항목을 설명하세요.

정답: 성능 검증, 공정성 검증, 통합 테스트, 성능(레이턴시) 테스트, 데이터 스키마 호환성

설명:

  1. 성능 검증: 홀드아웃 테스트셋 또는 최근 프로덕션 데이터에서 정확도, F1, AUC 등이 기존 Production 모델 대비 동등하거나 개선되었는지 확인합니다.
  2. 공정성 검증: 특정 인구 집단, 연령대 등에서 성능 편향이 없는지 슬라이스별 메트릭을 검토합니다.
  3. 통합 테스트: 실제 서빙 환경(API, 피처 스토어 연결)에서 end-to-end 예측이 정상 작동하는지 확인합니다.
  4. 성능(레이턴시) 테스트: 평균 응답시간 및 P99 레이턴시가 SLA를 만족하는지 부하 테스트를 수행합니다.
  5. 스키마 호환성: 입력 피처 스키마, 출력 형식이 현재 서빙 인프라와 호환되는지 확인합니다.