
MLOps & the Model Lifecycle: A Complete Guide to MLflow, DVC, and LLMOps

Table of Contents

  1. MLOps Overview and Maturity Model
  2. Experiment Tracking: MLflow & Weights and Biases
  3. Data Version Control: DVC
  4. Feature Store
  5. Model Registry
  6. CI/CD for ML
  7. Model Monitoring & Drift Detection
  8. LLMOps
  9. Quiz

MLOps Overview and Maturity Model

MLOps (Machine Learning Operations) is the set of practices, tools, and culture for operating ML systems reliably in production. It applies DevOps principles to ML workflows, automating the entire lifecycle from model development through deployment, monitoring, and retraining.

Why MLOps Matters

It is often claimed that the vast majority of ML projects (figures as high as 95% are cited) never make it to production. Common causes include:

  • Irreproducible experiments: code, data, and environments are not version-controlled
  • Manual deployment processes: slow and error-prone
  • No monitoring: model performance degradation is detected too late
  • Team silos: a disconnect between the data science and engineering teams

MLOps Maturity Levels

Google's MLOps maturity model distinguishes three levels.

Level 0: Manual Process

Everything is manual. Data scientists experiment in Jupyter notebooks and deploy the results by hand.

Characteristic      Description
Deployment cycle    Once every few months
Automation          None
Reproducibility     Low
Monitoring          None, or manual

Limitations: no experiment tracking, mismatched code and data versions, deployment errors, no way to detect model degradation.

Level 1: ML Pipeline Automation

CT (Continuous Training) is introduced. The data pipeline and model training are automated, but CI/CD is still manual.

Key components:

  • Automated data validation pipeline
  • Feature engineering pipeline
  • Model training pipeline (Kubeflow Pipelines, Apache Airflow, etc.)
  • Automated model performance evaluation
  • Introduction of a feature store
# Kubeflow Pipeline example - a Level 1 CT pipeline
import kfp
from kfp import dsl

@dsl.component
def data_validation_op(data_path: str) -> bool:
    import great_expectations as ge
    ds = ge.read_csv(data_path)
    results = ds.expect_column_values_to_not_be_null("target")
    return results["success"]

@dsl.component
def train_model_op(data_path: str, model_output: str):
    import mlflow
    # Model training logic goes here
    pass

@dsl.pipeline(name="CT Pipeline")
def ct_pipeline(data_path: str):
    validation = data_validation_op(data_path=data_path)
    with dsl.Condition(validation.output == True):
        train_model_op(data_path=data_path, model_output="/models/")

Level 2: CI/CD Pipeline Automation

Full MLOps automation. Code, data, and models are all version-controlled, and CI/CD/CT are fully automated.

Automated trigger conditions:

  • New training data arrives (on a schedule or at a data-volume threshold)
  • Degradation in model performance metrics is detected
  • Data drift is detected
  • Code changes (new features, algorithm improvements)

Level 2 architecture:

Source code change or data trigger
CI pipeline (test, build)
CD pipeline (pipeline deployment)
CT pipeline (automated retraining)
Model evaluation → pass/fail
Model registry registration
Staging → Production promotion
Monitoring & alerts
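
The evaluation gate in the flow above can be sketched as a comparison between the candidate's metrics and those of the current production model. A minimal sketch; the function and metric names are illustrative, not part of any particular tool:

```python
# Minimal evaluation-gate sketch: promote a candidate model only if it
# matches or beats the current production model on the chosen metric.
def passes_gate(candidate: dict, production: dict,
                metric: str = "f1_score", min_improvement: float = 0.0) -> bool:
    """Return True when the candidate may be registered and promoted."""
    return candidate[metric] >= production[metric] + min_improvement

print(passes_gate({"f1_score": 0.94}, {"f1_score": 0.93}))  # True
print(passes_gate({"f1_score": 0.91}, {"f1_score": 0.93}))  # False
```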

Experiment Tracking: MLflow & Weights and Biases

The Complete MLflow Guide

MLflow is an open-source platform for managing the ML lifecycle. It consists of four core components.

MLflow Tracking

Tracks experiment parameters, metrics, and artifacts.

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score

# Configure the MLflow Tracking server
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("fraud-detection-v2")

with mlflow.start_run(run_name="rf-baseline") as run:
    # Log hyperparameters
    params = {
        "n_estimators": 100,
        "max_depth": 10,
        "min_samples_split": 5,
        "random_state": 42
    }
    mlflow.log_params(params)

    # Train the model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Log metrics
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred, average="weighted"),
    }
    mlflow.log_metrics(metrics)

    # Automatic model logging (if using sklearn autolog)
    # mlflow.sklearn.autolog()

    # Save the model
    mlflow.sklearn.log_model(
        sk_model=model,
        artifact_path="model",
        registered_model_name="fraud-detection",
        input_example=X_test[:5],
        signature=mlflow.models.infer_signature(X_test, y_pred)
    )

    # Log custom artifacts
    import matplotlib.pyplot as plt
    from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
    cm = confusion_matrix(y_test, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm)
    disp.plot()
    plt.savefig("confusion_matrix.png")
    mlflow.log_artifact("confusion_matrix.png")

    print(f"Run ID: {run.info.run_id}")
    print(f"Accuracy: {metrics['accuracy']:.4f}")

MLflow Autolog

Framework-specific automatic logging keeps the code minimal:

import mlflow

# Auto-detect the framework and log
mlflow.autolog()

# PyTorch-specific autolog
mlflow.pytorch.autolog(
    log_every_n_epoch=1,
    log_models=True,
    disable=False,
    exclusive=False,
    log_datasets=True
)

# XGBoost-specific autolog
mlflow.xgboost.autolog(
    log_input_examples=True,
    log_model_signatures=True,
    log_models=True,
    log_datasets=True
)

MLflow Projects

Packaging reproducible ML projects:

# MLproject file
name: fraud-detection

conda_env: conda.yaml

entry_points:
  main:
    parameters:
      n_estimators: { type: int, default: 100 }
      max_depth: { type: int, default: 10 }
      data_path: { type: str, default: 'data/train.csv' }
    command: 'python train.py --n_estimators {n_estimators} --max_depth {max_depth} --data_path {data_path}'
  evaluate:
    parameters:
      model_uri: { type: str }
      test_data: { type: str }
    command: 'python evaluate.py --model_uri {model_uri} --test_data {test_data}'

Weights & Biases (W&B)

W&B is an MLOps platform that provides experiment tracking, visualization, and hyperparameter optimization.

import wandb

# Initialize W&B
run = wandb.init(
    project="image-classification",
    config={
        "learning_rate": 0.001,
        "epochs": 50,
        "batch_size": 32,
        "architecture": "ResNet50"
    }
)

# Hyperparameter optimization with a W&B Sweep
sweep_config = {
    "method": "bayes",
    "metric": {"name": "val_accuracy", "goal": "maximize"},
    "parameters": {
        "learning_rate": {"min": 1e-5, "max": 1e-2},
        "batch_size": {"values": [16, 32, 64]},
        "dropout": {"min": 0.1, "max": 0.5}
    }
}
sweep_id = wandb.sweep(sweep_config, project="image-classification")
wandb.agent(sweep_id, function=train_fn, count=50)  # train_fn: your training function

Data Version Control: DVC

DVC (Data Version Control) works with Git to version-control large datasets and ML pipelines.

How DVC Works

Instead of storing large files directly in Git, DVC commits only small .dvc metadata files (pointers) to Git. The actual data lives in remote storage such as S3, GCS, Azure Blob, or an SSH server.
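
For illustration, the pointer file that DVC commits to Git looks roughly like this (the hash and size below are made-up values):

```yaml
# data/train.csv.dvc -- committed to Git; train.csv itself is gitignored
outs:
- md5: 1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d
  size: 104857600
  path: train.csv
```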

# Initialize DVC
git init
dvc init

# Configure remote storage (S3)
dvc remote add -d myremote s3://my-bucket/dvc-store
dvc remote modify myremote region us-east-1

# Add data
dvc add data/train.csv
git add data/train.csv.dvc .gitignore
git commit -m "Add training data v1"
dvc push

# Fetch the data in another environment
git pull
dvc pull

DVC Pipelines (dvc.yaml)

Reproducible ML pipelines are defined declaratively:

# dvc.yaml
stages:
  prepare:
    cmd: python src/prepare.py --input data/raw.csv --output data/processed/
    deps:
      - src/prepare.py
      - data/raw.csv
    outs:
      - data/processed/train.csv
      - data/processed/test.csv
    params:
      - prepare:
          - split_ratio
          - random_seed

  featurize:
    cmd: python src/featurize.py
    deps:
      - src/featurize.py
      - data/processed/train.csv
    outs:
      - data/features/train_features.pkl
    params:
      - featurize:
          - max_features
          - ngrams

  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/features/train_features.pkl
    outs:
      - models/model.pkl
    metrics:
      - reports/metrics.json:
          cache: false
    params:
      - train:
          - n_estimators
          - max_depth
          - random_seed

  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/model.pkl
      - data/processed/test.csv
    metrics:
      - reports/eval_metrics.json:
          cache: false
    plots:
      - reports/plots/confusion_matrix.csv:
          cache: false
# params.yaml
prepare:
  split_ratio: 0.8
  random_seed: 42
featurize:
  max_features: 1000
  ngrams: 2
train:
  n_estimators: 100
  max_depth: 10
  random_seed: 42

DVC Experiment Management

# Run the pipeline
dvc repro

# Run an experiment with an overridden parameter
dvc exp run --set-param train.n_estimators=200 --name exp-200-trees

# Compare experiments
dvc exp show

# Show and diff metrics
dvc metrics show
dvc metrics diff

Feature Store

A feature store is a data layer for centrally storing, sharing, and serving ML features.

Why You Need a Feature Store

  • Eliminates training/serving skew: guarantees identical feature transforms in training and inference
  • Feature reuse: sharing features across teams removes duplicated work
  • Low-latency serving: real-time feature lookups for online prediction
  • Feature consistency: keeps batch and real-time pipelines consistent

Online vs Offline Stores

Aspect          Online store                   Offline store
Purpose         Real-time inference serving    Model training
Latency         Milliseconds                   Seconds to minutes
Storage         Redis, DynamoDB, Cassandra     S3, BigQuery, Hive
Data volume     Latest state (current values)  Full history
Query pattern   Single-key lookups             Batch scans
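
The training/serving skew point above comes down to one rule: compute each feature through a single shared definition in both paths. A minimal, dependency-free sketch (the feature and variable names are hypothetical):

```python
# One shared feature definition used by both the batch (training) path and
# the online (serving) path, so the two can never drift apart.
from datetime import datetime, timezone

def days_since_last_login(last_login: datetime, now: datetime) -> int:
    """Shared feature logic: one definition for both paths."""
    return (now - last_login).days

now = datetime(2026, 3, 1, tzinfo=timezone.utc)
logins = [datetime(2026, 2, 20, tzinfo=timezone.utc),
          datetime(2026, 1, 30, tzinfo=timezone.utc)]

# Offline/batch path: computed over the whole training set
train_features = [days_since_last_login(t, now) for t in logins]

# Online path: the same function applied to a single request
online_feature = days_since_last_login(logins[0], now)

print(train_features, online_feature)  # [9, 30] 9
```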

The Feast Feature Store

# feature_repo/feature_store.yaml
project: fraud_detection
registry: data/registry.db
provider: local
online_store:
  type: redis
  connection_string: "localhost:6379"
offline_store:
  type: bigquery
  dataset: feast_dev
# feature_repo/features.py
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64

# Define the entity
user = Entity(
    name="user_id",
    join_keys=["user_id"],
    description="User ID"
)

# Define the data source
user_stats_source = FileSource(
    path="data/user_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created"
)

# Define the feature view
user_stats_fv = FeatureView(
    name="user_stats",
    entities=[user],
    ttl=timedelta(days=7),
    schema=[
        Field(name="transaction_count_7d", dtype=Float32),
        Field(name="avg_transaction_amount", dtype=Float32),
        Field(name="days_since_last_login", dtype=Int64),
        Field(name="account_age_days", dtype=Int64),
    ],
    online=True,
    source=user_stats_source,
    tags={"team": "fraud", "version": "v2"},
)
# Using the feature store - training
from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path="feature_repo/")

# Fetch training data (offline, point-in-time correct)
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003],
    "event_timestamp": pd.to_datetime(["2026-03-01", "2026-03-01", "2026-03-01"])
})

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_stats:transaction_count_7d",
        "user_stats:avg_transaction_amount",
        "user_stats:days_since_last_login",
    ]
).to_df()

# Online serving - real-time feature lookup
feature_vector = store.get_online_features(
    features=[
        "user_stats:transaction_count_7d",
        "user_stats:avg_transaction_amount",
    ],
    entity_rows=[{"user_id": 1001}]
).to_dict()

Feature Drift Detection

from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

# Build a feature drift report
report = Report(metrics=[DataDriftPreset()])
report.run(
    reference_data=reference_features,
    current_data=current_features,
    column_mapping=ColumnMapping(target="label")
)
report.save_html("feature_drift_report.html")

# Inspect the drift results
results = report.as_dict()
drifted_features = [
    col for col, info in results["metrics"][0]["result"]["drift_by_columns"].items()
    if info["drift_detected"]
]
print(f"Drifted features: {drifted_features}")

Model Registry

MLflow Model Registry

The MLflow Model Registry is a central store for model versioning, stage transitions, and collaboration.

import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register a new model
model_uri = f"runs:/{run_id}/model"
model_version = mlflow.register_model(
    model_uri=model_uri,
    name="fraud-detection"
)

# Add model descriptions
client.update_registered_model(
    name="fraud-detection",
    description="Payment fraud detection model based on RandomForest"
)

client.update_model_version(
    name="fraud-detection",
    version=model_version.version,
    description="Accuracy: 0.956, F1: 0.943 on test set"
)

# Transition to Staging
client.transition_model_version_stage(
    name="fraud-detection",
    version=model_version.version,
    stage="Staging",
    archive_existing_versions=False
)

# Load and validate the Staging model
staging_model = mlflow.pyfunc.load_model(
    model_uri="models:/fraud-detection/Staging"
)
staging_preds = staging_model.predict(X_val)
staging_accuracy = accuracy_score(y_val, staging_preds)

# Promote to Production if validation passes
if staging_accuracy > 0.95:
    client.transition_model_version_stage(
        name="fraud-detection",
        version=model_version.version,
        stage="Production",
        archive_existing_versions=True  # archive the previous Production version
    )
    print(f"Model v{model_version.version} promoted to Production")

Hugging Face Hub as a Model Registry

from huggingface_hub import HfApi

api = HfApi()

# Upload the model
api.upload_folder(
    folder_path="./fine-tuned-model",
    repo_id="myorg/sentiment-classifier-v2",
    repo_type="model",
)

# Update the model card
api.upload_file(
    path_or_fileobj="README.md",
    path_in_repo="README.md",
    repo_id="myorg/sentiment-classifier-v2",
)

# Tag a specific version
api.create_tag(
    repo_id="myorg/sentiment-classifier-v2",
    tag="v2.1.0",
    tag_message="Improved accuracy on edge cases"
)

CI/CD for ML

A GitHub Actions ML Pipeline

# .github/workflows/ml-cicd.yml
name: ML CI/CD Pipeline

on:
  push:
    branches: [main, develop]
    paths:
      - 'src/**'
      - 'params.yaml'
      - 'dvc.yaml'
  schedule:
    - cron: '0 2 * * 1' # automatic retraining every Monday at 2 AM

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Run unit tests
        run: pytest tests/ -v --cov=src
      - name: Data validation
        run: python src/validate_data.py

  train-and-evaluate:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Install dependencies
        run: pip install -r requirements.txt  # assumed to include dvc[s3]
      - name: Configure DVC remote
        run: |
          dvc remote modify --local myremote access_key_id ${{ secrets.AWS_ACCESS_KEY_ID }}
          dvc remote modify --local myremote secret_access_key ${{ secrets.AWS_SECRET_ACCESS_KEY }}
      - name: Pull data
        run: dvc pull
      - name: Run DVC pipeline
        run: dvc repro
      - name: Log metrics to MLflow
        run: python src/log_results.py
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
      - name: Check model performance gate
        run: |
          python src/check_performance_gate.py \
            --min-accuracy 0.95 \
            --min-f1 0.93
      - name: Push results
        run: |
          dvc push
          git add reports/metrics.json dvc.lock
          git commit -m "chore: update metrics [skip ci]"
          git push

  deploy-staging:
    needs: train-and-evaluate
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Promote model to Staging
        run: python src/promote_model.py --stage Staging
      - name: Run integration tests
        run: pytest tests/integration/ -v
      - name: Deploy to staging endpoint
        run: kubectl apply -f k8s/staging/

  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - name: Promote model to Production
        run: python src/promote_model.py --stage Production
      - name: Blue/Green deployment
        run: ./scripts/blue_green_deploy.sh
      - name: Smoke tests
        run: pytest tests/smoke/ -v
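
The workflow above invokes src/check_performance_gate.py. A minimal sketch of what such a gate script might contain; the script name, metrics-file layout, and thresholds are this pipeline's own conventions, not a standard tool:

```python
# CI performance gate sketch: read the metrics the pipeline produced and
# report failure through the process exit code when thresholds are missed.
import json

def gate_passed(metrics: dict, min_accuracy: float = 0.95,
                min_f1: float = 0.93) -> bool:
    """True when the freshly trained model clears every threshold."""
    return metrics["accuracy"] >= min_accuracy and metrics["f1_score"] >= min_f1

def check_file(path: str = "reports/metrics.json") -> int:
    # In CI, pass this return value to sys.exit(); non-zero fails the step.
    with open(path) as f:
        return 0 if gate_passed(json.load(f)) else 1

print(gate_passed({"accuracy": 0.96, "f1_score": 0.94}))  # True
print(gate_passed({"accuracy": 0.91, "f1_score": 0.94}))  # False
```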

Automated Retraining Triggers

# src/check_retrain_trigger.py
import mlflow
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

def should_retrain(
    current_data,
    reference_data,
    performance_threshold=0.92,
    drift_threshold=0.3
) -> tuple[bool, str]:
    """Decide whether retraining is needed."""

    # 1. Performance-based trigger
    current_metrics = get_current_metrics()
    if current_metrics["accuracy"] < performance_threshold:
        return True, f"Performance degradation: accuracy={current_metrics['accuracy']:.3f}"

    # 2. Data drift trigger
    report = Report(metrics=[DataDriftPreset()])
    report.run(reference_data=reference_data, current_data=current_data)
    results = report.as_dict()
    drift_share = results["metrics"][0]["result"]["share_of_drifted_columns"]

    if drift_share > drift_threshold:
        return True, f"Data drift: {drift_share:.1%} of features drifted"

    return False, "No retraining needed"

def get_current_metrics():
    client = mlflow.tracking.MlflowClient()
    prod_model = client.get_latest_versions("fraud-detection", stages=["Production"])[0]
    run = client.get_run(prod_model.run_id)
    return {
        "accuracy": float(run.data.metrics.get("accuracy", 0)),
        "f1_score": float(run.data.metrics.get("f1_score", 0))
    }

Model Monitoring & Drift Detection

Data Drift vs Concept Drift

Data drift: the distribution of the input features changes. P(X) changes while P(Y|X) stays the same. Examples: a shift in the user age distribution, a shift in transaction amounts.

Concept drift: the relationship between inputs and outputs changes, i.e. P(Y|X) changes. Examples: new fraud patterns, changing user preferences.
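
Data drift on a single numeric feature can be quantified without any ML library. The sketch below implements the Population Stability Index (PSI), one of the standard drift statistics; the distributions and the 0.2 threshold are illustrative:

```python
# Dependency-free Population Stability Index (PSI) sketch for one numeric
# feature. Bin edges come from the reference (training-time) sample; a PSI
# above ~0.2 is a common rule-of-thumb signal of significant drift.
import math
import random

def psi(reference, current, bins=10):
    lo, hi = min(reference), max(reference)
    width = (hi - lo) / bins or 1.0  # guard against a constant feature

    def bucket_shares(values):
        counts = [0] * bins
        for v in values:
            # Clamp out-of-range values into the edge buckets
            idx = min(max(int((v - lo) / width), 0), bins - 1)
            counts[idx] += 1
        # Floor empty buckets so log() stays defined
        return [max(c / len(values), 1e-6) for c in counts]

    ref, cur = bucket_shares(reference), bucket_shares(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref, cur))

rng = random.Random(42)
reference = [rng.gauss(100, 15) for _ in range(5000)]  # training-time values
shifted = [rng.gauss(120, 15) for _ in range(5000)]    # drifted production values

print(f"PSI(ref, ref):     {psi(reference, reference):.3f}")  # ~0: no drift
print(f"PSI(ref, shifted): {psi(reference, shifted):.3f}")    # well above 0.2
```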

Drift Monitoring with Evidently

import pandas as pd
from evidently.report import Report
from evidently.test_suite import TestSuite
from evidently import ColumnMapping
from evidently.metric_preset import (
    DataDriftPreset,
    DataQualityPreset,
    TargetDriftPreset,
    ClassificationPreset
)
from evidently.tests import (
    TestNumberOfDriftedColumns,
    TestShareOfDriftedColumns,
    TestColumnDrift
)

# Configure the column mapping
column_mapping = ColumnMapping(
    target="fraud_label",
    prediction="fraud_score",
    numerical_features=["amount", "transaction_count_7d", "avg_amount"],
    categorical_features=["merchant_category", "payment_method"]
)

# Comprehensive drift report
report = Report(metrics=[
    DataDriftPreset(),
    DataQualityPreset(),
    TargetDriftPreset(),
    ClassificationPreset()
])
report.run(
    reference_data=reference_df,
    current_data=production_df,
    column_mapping=column_mapping
)
report.save_html("monitoring/report.html")

# Test suite for alerting
test_suite = TestSuite(tests=[
    TestNumberOfDriftedColumns(lt=3),
    TestShareOfDriftedColumns(lt=0.3),
    TestColumnDrift(column_name="amount"),
    TestColumnDrift(column_name="transaction_count_7d"),
])
test_suite.run(
    reference_data=reference_df,
    current_data=production_df
)

# Alert when tests fail
results = test_suite.as_dict()
failed_tests = [t for t in results["tests"] if t["status"] == "FAIL"]
if failed_tests:
    send_alert(f"Monitoring alert: {len(failed_tests)} tests failed")

Metric Monitoring with Prometheus + Grafana

# src/monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time

# Define metrics
prediction_counter = Counter(
    "model_predictions_total",
    "Total number of predictions",
    ["model_version", "result"]
)
prediction_latency = Histogram(
    "model_prediction_latency_seconds",
    "Prediction latency",
    buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0]
)
model_accuracy = Gauge(
    "model_accuracy_current",
    "Current model accuracy"
)
drift_score = Gauge(
    "feature_drift_score",
    "Feature drift score",
    ["feature_name"]
)

# Expose metrics on :8000 for Prometheus to scrape
start_http_server(8000)

# Update metrics from the serving code
def predict_with_monitoring(features, model_version="v2.1"):
    start_time = time.time()
    prediction = model.predict(features)
    latency = time.time() - start_time

    prediction_latency.observe(latency)
    prediction_counter.labels(
        model_version=model_version,
        result="fraud" if prediction[0] == 1 else "normal"
    ).inc()

    return prediction

LLMOps

LLMOps extends MLOps to the development, deployment, and operation of large language models.

Unique Challenges of LLM Pipelines

  • Non-deterministic output: the same input can produce different outputs, complicating evaluation
  • Prompt sensitivity: small changes cause large performance differences
  • Expensive fine-tuning: requires large amounts of GPU capacity
  • Hallucination: generating content that is factually wrong
  • Context length management: handling long contexts
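
The evaluation problem in the first bullet shows up even in a toy example: exact-match scoring rejects a paraphrase a human would accept, which is why LLM pipelines lean on LLM-as-judge or semantic-similarity evaluators instead. The strings below are invented:

```python
# Toy illustration of why non-deterministic LLM output complicates
# evaluation: exact-match fails on a paraphrase with the same meaning.
gold = "Refunds are available within 30 days of purchase."
responses = [
    "Refunds are available within 30 days of purchase.",
    "You can get a refund within 30 days of buying.",  # paraphrase
]

exact_match = [r == gold for r in responses]
keyword_match = [("refund" in r.lower() and "30 days" in r) for r in responses]

print(exact_match)    # [True, False] -- the paraphrase is wrongly rejected
print(keyword_match)  # [True, True]  -- a looser check accepts both
```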

LLM Tracing with LangSmith

from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langsmith import Client
import os

# Configure LangSmith
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "production-chatbot"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"

# Define a LangChain chain (traces are recorded to LangSmith automatically)
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
prompt = ChatPromptTemplate.from_template(
    "You are a friendly customer service agent.\n\nQuestion: {question}\n\nAnswer:"
)
chain = prompt | llm

# Invoke - traced automatically
response = chain.invoke({"question": "What is your refund policy?"})

# Evaluate with the LangSmith client
langsmith_client = Client()

# Create a dataset
dataset = langsmith_client.create_dataset(
    dataset_name="customer-service-eval",
    description="Evaluation dataset for the customer service chatbot"
)

# Add evaluation examples
langsmith_client.create_examples(
    inputs=[{"question": "What is your refund policy?"}],
    outputs=[{"answer": "Refunds are available within 30 days of purchase."}],
    dataset_id=dataset.id
)

# Run automated evaluation
from langsmith.evaluation import evaluate, LangChainStringEvaluator

evaluators = [
    LangChainStringEvaluator("cot_qa"),
    LangChainStringEvaluator("labeled_criteria", config={
        "criteria": "correctness"
    })
]
results = evaluate(
    lambda x: chain.invoke(x),
    data=dataset.name,
    evaluators=evaluators,
    experiment_prefix="gpt4o-baseline"
)

Prompt Version Management

# prompt_registry.py
import mlflow
from dataclasses import dataclass
from typing import Optional

@dataclass
class PromptVersion:
    template: str
    version: str
    description: str
    metrics: Optional[dict] = None

class PromptRegistry:
    def __init__(self, mlflow_uri: str):
        mlflow.set_tracking_uri(mlflow_uri)
        self.experiment_name = "prompt-versions"
        mlflow.set_experiment(self.experiment_name)

    def register_prompt(self, prompt: PromptVersion) -> str:
        with mlflow.start_run(run_name=f"prompt-{prompt.version}") as run:
            mlflow.log_param("version", prompt.version)
            mlflow.log_param("description", prompt.description)
            mlflow.log_text(prompt.template, "prompt_template.txt")
            if prompt.metrics:
                mlflow.log_metrics(prompt.metrics)
            return run.info.run_id

    def get_prompt(self, version: str) -> str:
        client = mlflow.tracking.MlflowClient()
        runs = client.search_runs(
            experiment_ids=[mlflow.get_experiment_by_name(self.experiment_name).experiment_id],
            filter_string=f"params.version = '{version}'"
        )
        if not runs:
            raise ValueError(f"Prompt version {version} not found")
        artifact_uri = runs[0].info.artifact_uri
        return mlflow.artifacts.load_text(f"{artifact_uri}/prompt_template.txt")

# Usage example
registry = PromptRegistry("http://mlflow-server:5000")
registry.register_prompt(PromptVersion(
    template="You are {role}. {context}\n\nQuestion: {question}\nAnswer:",
    version="v1.2.0",
    description="Improved prompt with added context",
    metrics={"accuracy": 0.87, "hallucination_rate": 0.03}
))

An LLM Fine-tuning Pipeline

# fine_tuning_pipeline.py
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TrainingArguments,
    Trainer
)
from peft import LoraConfig, get_peft_model, TaskType
import mlflow

def fine_tune_with_lora(
    base_model: str,
    dataset_path: str,
    output_dir: str,
    lora_r: int = 16,
    lora_alpha: int = 32
):
    mlflow.set_experiment("llm-fine-tuning")

    with mlflow.start_run():
        # LoRA configuration
        lora_config = LoraConfig(
            task_type=TaskType.CAUSAL_LM,
            r=lora_r,
            lora_alpha=lora_alpha,
            target_modules=["q_proj", "v_proj"],
            lora_dropout=0.05,
            bias="none"
        )
        mlflow.log_params({
            "base_model": base_model,
            "lora_r": lora_r,
            "lora_alpha": lora_alpha
        })

        # Prepare the model and tokenizer
        model = AutoModelForCausalLM.from_pretrained(base_model)
        tokenizer = AutoTokenizer.from_pretrained(base_model)
        model = get_peft_model(model, lora_config)
        model.print_trainable_parameters()

        # Training configuration
        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=3,
            per_device_train_batch_size=4,
            gradient_accumulation_steps=4,
            learning_rate=2e-4,
            fp16=True,
            report_to="mlflow"
        )

        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=train_dataset,  # assumed: tokenized dataset built from dataset_path
        )
        trainer.train()

        # Save and register the model
        model.save_pretrained(output_dir)
        mlflow.transformers.log_model(
            transformers_model={"model": model, "tokenizer": tokenizer},
            artifact_path="fine-tuned-model",
            registered_model_name="customer-service-llm"
        )

Quiz

Q1. Explain the four trigger conditions that automate CT (Continuous Training) at MLOps Level 2.

Answer: data trigger, performance trigger, drift trigger, schedule trigger

Explanation:

  1. Data trigger: Retraining starts automatically when new training data reaches a set threshold (e.g., 100k records) or a new data batch lands in the pipeline.
  2. Performance trigger: Fires when the production model's accuracy, F1 score, or similar metrics fall below a predefined threshold (e.g., accuracy < 0.92).
  3. Drift trigger: Fires when the share of drifted features detected by a tool such as Evidently exceeds a threshold (e.g., 30% or more of features drifting).
  4. Schedule trigger: Periodic retraining driven by business needs (e.g., every Monday at 2 AM) keeps models trained on fresh data.

Q2. Explain why a feature store separates the online store from the offline store.

Answer: To optimize separately for the very different access patterns and performance requirements of training and inference.

Explanation:

  • The offline store (S3, BigQuery) serves model training. It must batch-scan millions of historical records, so bulk throughput and cost efficiency matter most; high latency (seconds to minutes) is acceptable.
  • The online store (Redis, DynamoDB) serves real-time inference. It must return the latest features for a given entity (user ID, product ID) within a few milliseconds, so it is optimized for low-latency single-key lookups.
  • Without the split, heavy training scans would interfere with real-time lookups, or conversely, forcing everything to meet real-time requirements would make training prohibitively expensive.

Q3. Explain the difference between data drift and concept drift, and how to detect each.

Answer: Data drift is a change in P(X); concept drift is a change in P(Y|X).

Explanation:

  • Data drift: The statistical distribution of the input features changes. Detect it with the Kolmogorov-Smirnov test, Population Stability Index (PSI), JS divergence, and similar statistics; Evidently's DataDriftPreset is a common implementation. It can be detected without labels.
  • Concept drift: The correct output for the same input changes. In fraud detection, for example, new fraud patterns make existing rules invalid. Detection requires ground-truth labels and shows up as degraded model performance (accuracy, F1); when labels arrive late, proxy metrics are used instead.

Q4. Explain how DVC manages large ML datasets with Git.

Answer: Pointer (metadata) files are stored in Git while the actual data lives in remote storage.

Explanation: DVC does not store large files (datasets, models) directly in Git. Instead, it generates a metadata file with a .dvc extension and tracks that file with Git. The metadata records the data's MD5 hash, size, and path. The data itself is uploaded with dvc push to remote storage such as S3, GCS, or Azure Blob. In another environment, dvc pull downloads exactly the same version of the data. Because each Git commit maps one-to-one to a DVC data version, experiments remain reproducible.

Q5. Explain what should be validated before promoting a model from Staging to Production in the MLflow Model Registry.

Answer: performance validation, fairness validation, integration tests, latency tests, and data schema compatibility

Explanation:

  1. Performance validation: Confirm that accuracy, F1, AUC, etc. on a holdout test set or recent production data are at least as good as the current Production model's.
  2. Fairness validation: Review per-slice metrics to check for performance bias across demographic groups, age bands, and so on.
  3. Integration tests: Verify that end-to-end prediction works in the actual serving environment (API, feature store connections).
  4. Latency tests: Load-test that average response time and P99 latency meet the SLA.
  5. Schema compatibility: Confirm the input feature schema and output format are compatible with the current serving infrastructure.

MLOps & Model Lifecycle Management: MLflow, DVC, and LLMOps Complete Guide

Table of Contents

  1. MLOps Overview and Maturity Model
  2. Experiment Tracking: MLflow & Weights and Biases
  3. Data Version Control: DVC
  4. Feature Store
  5. Model Registry
  6. CI/CD for ML
  7. Model Monitoring & Drift Detection
  8. LLMOps
  9. Quiz

MLOps Overview and Maturity Model

MLOps (Machine Learning Operations) is a set of practices, tools, and culture for reliably operating ML systems in production. It applies DevOps principles to ML workflows, automating the full lifecycle from model development through deployment, monitoring, and retraining.

Why MLOps Matters

Statistics show that over 95% of ML projects fail to reach production deployment. The root causes include:

  • Irreproducible experiments: Code, data, and environments are not version-controlled
  • Manual deployment processes: Slow and error-prone
  • Absent monitoring: Model performance degradation is detected too late
  • Team silos: Disconnect between data science and engineering teams

MLOps Maturity Levels

Google's MLOps maturity model defines three stages of automation.

Level 0: Manual Process

Everything is done manually. Data scientists experiment in Jupyter Notebooks and deploy results by hand.

CharacteristicDescription
Deployment frequencyEvery few months
Automation levelNone
ReproducibilityLow
MonitoringAbsent or manual

Limitations: No experiment tracking, code/data version mismatches, deployment errors, inability to detect model degradation.

Level 1: ML Pipeline Automation

CT (Continuous Training) is introduced. Data pipelines and model training are automated, but CI/CD remains manual.

Key components:

  • Automated data validation pipeline
  • Feature engineering pipeline
  • Model training pipeline (Kubeflow Pipelines, Apache Airflow, etc.)
  • Automated model performance evaluation
  • Introduction of feature stores
# Kubeflow Pipeline example - Level 1 CT pipeline
import kfp
from kfp import dsl

@dsl.component
def data_validation_op(data_path: str) -> bool:
    import great_expectations as ge
    ds = ge.read_csv(data_path)
    results = ds.expect_column_values_to_not_be_null("target")
    return results["success"]

@dsl.component
def train_model_op(data_path: str, model_output: str):
    import mlflow
    # Training logic here
    pass

@dsl.pipeline(name="CT Pipeline")
def ct_pipeline(data_path: str):
    validation = data_validation_op(data_path=data_path)
    with dsl.Condition(validation.output == True):
        train_model_op(data_path=data_path, model_output="/models/")

Level 2: CI/CD Pipeline Automation

Full MLOps automation. Code, data, and models are all version-controlled, and CI/CD/CT are completely automated.

Automated trigger conditions:

  • New training data arriving (schedule or data volume threshold)
  • Model performance metric degradation detected
  • Data drift detected
  • Code changes (new features, algorithm improvements)

Level 2 Architecture:

Source code change or data trigger
CI Pipeline (test, build)
CD Pipeline (deploy pipeline)
CT Pipeline (automated retraining)
Model evaluation → pass/fail gate
Model registry registration
StagingProduction promotion
Monitoring & alerting

Experiment Tracking: MLflow & Weights and Biases

MLflow Complete Guide

MLflow is an open-source platform for managing the ML lifecycle. It consists of four core components.

MLflow Tracking

Tracks experiment parameters, metrics, and artifacts.

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score

# Configure MLflow Tracking server
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("fraud-detection-v2")

with mlflow.start_run(run_name="rf-baseline") as run:
    # Log hyperparameters
    params = {
        "n_estimators": 100,
        "max_depth": 10,
        "min_samples_split": 5,
        "random_state": 42
    }
    mlflow.log_params(params)

    # Train model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Log metrics
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred, average="weighted"),
    }
    mlflow.log_metrics(metrics)

    # Save model with signature
    mlflow.sklearn.log_model(
        sk_model=model,
        artifact_path="model",
        registered_model_name="fraud-detection",
        input_example=X_test[:5],
        signature=mlflow.models.infer_signature(X_train, y_pred)
    )

    # Log custom artifacts
    import matplotlib.pyplot as plt
    from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
    cm = confusion_matrix(y_test, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm)
    disp.plot()
    plt.savefig("confusion_matrix.png")
    mlflow.log_artifact("confusion_matrix.png")

    print(f"Run ID: {run.info.run_id}")
    print(f"Accuracy: {metrics['accuracy']:.4f}")

MLflow Autolog

Framework-specific automatic logging to minimize boilerplate:

import mlflow

# Auto-detect framework and log
mlflow.autolog()

# PyTorch-specific autolog
mlflow.pytorch.autolog(
    log_every_n_epoch=1,
    log_models=True,
    disable=False,
    log_datasets=True
)

# XGBoost-specific autolog
mlflow.xgboost.autolog(
    log_input_examples=True,
    log_model_signatures=True,
    log_models=True,
    log_datasets=True
)

MLflow Projects

Packaging reproducible ML projects:

# MLproject file
name: fraud-detection

conda_env: conda.yaml

entry_points:
  main:
    parameters:
      n_estimators: { type: int, default: 100 }
      max_depth: { type: int, default: 10 }
      data_path: { type: str, default: 'data/train.csv' }
    command: 'python train.py --n_estimators {n_estimators} --max_depth {max_depth} --data_path {data_path}'
  evaluate:
    parameters:
      model_uri: { type: str }
      test_data: { type: str }
    command: 'python evaluate.py --model_uri {model_uri} --test_data {test_data}'

Weights & Biases (W&B)

W&B is an MLOps platform providing experiment tracking, visualization, and hyperparameter optimization.

import wandb

# Initialize W&B run
run = wandb.init(
    project="image-classification",
    config={
        "learning_rate": 0.001,
        "epochs": 50,
        "batch_size": 32,
        "architecture": "ResNet50"
    }
)

# W&B Sweep for hyperparameter optimization
sweep_config = {
    "method": "bayes",
    "metric": {"name": "val_accuracy", "goal": "maximize"},
    "parameters": {
        "learning_rate": {"min": 1e-5, "max": 1e-2},
        "batch_size": {"values": [16, 32, 64]},
        "dropout": {"min": 0.1, "max": 0.5}
    }
}
sweep_id = wandb.sweep(sweep_config, project="image-classification")
wandb.agent(sweep_id, function=train_fn, count=50)  # train_fn: your training function that calls wandb.log

Data Version Control: DVC

DVC (Data Version Control) works alongside Git to version-control large datasets and ML pipelines.

How DVC Works

Instead of storing large files directly in Git, DVC creates .dvc metadata files (pointers) that are committed to Git. The actual data is stored in remote storage such as S3, GCS, Azure Blob, or SSH servers.
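The pointer committed to Git is a small YAML file; a typical `data/train.csv.dvc` looks roughly like this (hash and size are illustrative):

```yaml
# data/train.csv.dvc — versioned in Git; the real file lives in remote storage
outs:
  - md5: 1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d
    size: 104857600
    path: train.csv
```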

# Initialize DVC
git init
dvc init

# Configure remote storage (S3)
dvc remote add -d myremote s3://my-bucket/dvc-store
dvc remote modify myremote region us-east-1

# Add data files
dvc add data/train.csv
git add data/train.csv.dvc .gitignore
git commit -m "Add training data v1"
dvc push

# Pull data in another environment
git pull
dvc pull

DVC Pipeline (dvc.yaml)

Declarative definition of reproducible ML pipelines:

# dvc.yaml
stages:
  prepare:
    cmd: python src/prepare.py --input data/raw.csv --output data/processed/
    deps:
      - src/prepare.py
      - data/raw.csv
    outs:
      - data/processed/train.csv
      - data/processed/test.csv
    params:
      - prepare:
          - split_ratio
          - random_seed

  featurize:
    cmd: python src/featurize.py
    deps:
      - src/featurize.py
      - data/processed/train.csv
    outs:
      - data/features/train_features.pkl
    params:
      - featurize:
          - max_features
          - ngrams

  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/features/train_features.pkl
    outs:
      - models/model.pkl
    metrics:
      - reports/metrics.json:
          cache: false
    params:
      - train:
          - n_estimators
          - max_depth
          - random_seed

  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/model.pkl
      - data/processed/test.csv
    metrics:
      - reports/eval_metrics.json:
          cache: false
    plots:
      - reports/plots/confusion_matrix.csv:
          cache: false

DVC Experiment Management

# Run the pipeline
dvc repro

# Create an experiment branch
dvc exp run --set-param train.n_estimators=200 --name exp-200-trees

# Compare experiments
dvc exp show

# Show metrics table
dvc metrics show
dvc metrics diff

Feature Store

A feature store is a centralized data layer for storing, sharing, and serving ML features.

Why Feature Stores Are Necessary

  • Eliminate training/serving skew: Guarantee identical feature transformations at training and inference
  • Feature reuse: Share features across teams to eliminate redundant work
  • Low-latency serving: Real-time feature lookup for online predictions
  • Feature consistency: Maintain consistency between batch and streaming pipelines

Online vs Offline Store

| Aspect | Online Store | Offline Store |
| --- | --- | --- |
| Purpose | Real-time inference serving | Model training |
| Latency | Milliseconds | Seconds to minutes |
| Storage | Redis, DynamoDB, Cassandra | S3, BigQuery, Hive |
| Data volume | Latest state (current values) | Full history |
| Query pattern | Single-key lookup | Batch scan |

Feast Feature Store

# feature_repo/feature_store.yaml
project: fraud_detection
registry: data/registry.db
provider: local
online_store:
  type: redis
  connection_string: "localhost:6379"
offline_store:
  type: bigquery
  dataset: feast_dev

# feature_repo/features.py
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64

# Define entity
user = Entity(
    name="user_id",
    join_keys=["user_id"],
    description="User identifier"
)

# Define data source
user_stats_source = FileSource(
    path="data/user_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created"
)

# Define feature view
user_stats_fv = FeatureView(
    name="user_stats",
    entities=[user],
    ttl=timedelta(days=7),
    schema=[
        Field(name="transaction_count_7d", dtype=Float32),
        Field(name="avg_transaction_amount", dtype=Float32),
        Field(name="days_since_last_login", dtype=Int64),
        Field(name="account_age_days", dtype=Int64),
    ],
    online=True,
    source=user_stats_source,
    tags={"team": "fraud", "version": "v2"},
)

# Using the feature store
from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path="feature_repo/")

# Training data retrieval (offline)
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003],
    "event_timestamp": pd.to_datetime(["2026-03-01", "2026-03-01", "2026-03-01"])
})
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_stats:transaction_count_7d",
        "user_stats:avg_transaction_amount",
        "user_stats:days_since_last_login",
    ]
).to_df()
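`get_historical_features` performs a point-in-time correct join: for each entity/timestamp row it attaches the latest feature values known at that moment, which prevents label leakage from future data. The idea can be sketched with plain pandas (toy data, not Feast internals — `merge_asof` picks the most recent feature row at or before each label timestamp):

```python
import pandas as pd

# Feature snapshots with the timestamps at which they were computed
features = pd.DataFrame({
    "user_id": [1001, 1002, 1001],
    "event_timestamp": pd.to_datetime(["2026-02-01", "2026-02-10", "2026-02-20"]),
    "transaction_count_7d": [3.0, 1.0, 7.0],
})

# Training labels with their observation timestamps
labels = pd.DataFrame({
    "user_id": [1001, 1002],
    "event_timestamp": pd.to_datetime(["2026-02-15", "2026-03-01"]),
    "fraud_label": [0, 1],
})

# For each label row, take the latest feature value at or before its timestamp
training = pd.merge_asof(
    labels.sort_values("event_timestamp"),
    features.sort_values("event_timestamp"),
    on="event_timestamp",
    by="user_id",
    direction="backward",
)
print(training["transaction_count_7d"].tolist())  # [3.0, 1.0]
```

Note that the 2026-02-20 feature snapshot for user 1001 is correctly ignored, because it did not exist yet at the 2026-02-15 label time.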

# Online serving - real-time feature retrieval
feature_vector = store.get_online_features(
    features=[
        "user_stats:transaction_count_7d",
        "user_stats:avg_transaction_amount",
    ],
    entity_rows=[{"user_id": 1001}]
).to_dict()

Feature Drift Detection

from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

# Generate feature drift report
report = Report(metrics=[DataDriftPreset()])
report.run(
    reference_data=reference_features,
    current_data=current_features,
    column_mapping=ColumnMapping(target="label")
)
report.save_html("feature_drift_report.html")

# Check drift results
results = report.as_dict()
drifted_features = [
    col for col, info in results["metrics"][0]["result"]["drift_by_columns"].items()
    if info["drift_detected"]
]
print(f"Drifted features: {drifted_features}")

Model Registry

MLflow Model Registry

The MLflow Model Registry is a central repository for model version management, stage transitions, and team collaboration.

import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register a new model
model_uri = f"runs:/{run_id}/model"
model_version = mlflow.register_model(
    model_uri=model_uri,
    name="fraud-detection"
)

# Add model description
client.update_registered_model(
    name="fraud-detection",
    description="Payment fraud detection model - RandomForest based"
)
client.update_model_version(
    name="fraud-detection",
    version=model_version.version,
    description="Accuracy: 0.956, F1: 0.943 on test set"
)

# Transition to Staging
client.transition_model_version_stage(
    name="fraud-detection",
    version=model_version.version,
    stage="Staging",
    archive_existing_versions=False
)

# Load and validate Staging model
staging_model = mlflow.pyfunc.load_model(
    model_uri="models:/fraud-detection/Staging"
)
staging_preds = staging_model.predict(X_val)
staging_accuracy = accuracy_score(y_val, staging_preds)

# Promote to Production if validation passes
if staging_accuracy > 0.95:
    client.transition_model_version_stage(
        name="fraud-detection",
        version=model_version.version,
        stage="Production",
        archive_existing_versions=True
    )
    print(f"Model v{model_version.version} promoted to Production")

Hugging Face Hub Model Registry

from huggingface_hub import HfApi

api = HfApi()

# Upload model
api.upload_folder(
    folder_path="./fine-tuned-model",
    repo_id="myorg/sentiment-classifier-v2",
    repo_type="model",
)

# Tag a specific version
api.create_tag(
    repo_id="myorg/sentiment-classifier-v2",
    tag="v2.1.0",
    tag_message="Improved accuracy on edge cases"
)

CI/CD for ML

GitHub Actions ML Pipeline

# .github/workflows/ml-cicd.yml
name: ML CI/CD Pipeline

on:
  push:
    branches: [main, develop]
    paths:
      - 'src/**'
      - 'params.yaml'
      - 'dvc.yaml'
  schedule:
    - cron: '0 2 * * 1' # Auto-retrain every Monday at 2 AM

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Run unit tests
        run: pytest tests/ -v --cov=src
      - name: Data validation
        run: python src/validate_data.py

  train-and-evaluate:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install -r requirements.txt 'dvc[s3]'
      - name: Configure DVC remote
        run: |
          dvc remote modify --local myremote access_key_id ${{ secrets.AWS_ACCESS_KEY_ID }}
          dvc remote modify --local myremote secret_access_key ${{ secrets.AWS_SECRET_ACCESS_KEY }}
      - name: Pull data
        run: dvc pull
      - name: Run DVC pipeline
        run: dvc repro
      - name: Log metrics to MLflow
        run: python src/log_results.py
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
      - name: Check model performance gate
        run: |
          python src/check_performance_gate.py \
            --min-accuracy 0.95 \
            --min-f1 0.93
      - name: Push results
        run: |
          dvc push
          git config user.name "github-actions[bot]"
          git config user.email "github-actions[bot]@users.noreply.github.com"
          git add reports/metrics.json dvc.lock
          git commit -m "chore: update metrics [skip ci]"
          git push

  deploy-staging:
    needs: train-and-evaluate
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Promote model to Staging
        run: python src/promote_model.py --stage Staging
      - name: Run integration tests
        run: pytest tests/integration/ -v
      - name: Deploy to staging endpoint
        run: kubectl apply -f k8s/staging/

  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - name: Promote model to Production
        run: python src/promote_model.py --stage Production
      - name: Blue/Green deployment
        run: ./scripts/blue_green_deploy.sh
      - name: Smoke tests
        run: pytest tests/smoke/ -v

Automated Retraining Trigger

# src/check_retrain_trigger.py
import mlflow
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

def should_retrain(
    current_data,
    reference_data,
    performance_threshold=0.92,
    drift_threshold=0.3
) -> tuple[bool, str]:
    """Determine whether retraining is needed."""

    # 1. Performance-based trigger
    # (get_current_metrics is assumed to fetch recent serving metrics, e.g. from MLflow)
    current_metrics = get_current_metrics()
    if current_metrics["accuracy"] < performance_threshold:
        return True, f"Performance degradation: accuracy={current_metrics['accuracy']:.3f}"

    # 2. Data drift trigger
    report = Report(metrics=[DataDriftPreset()])
    report.run(reference_data=reference_data, current_data=current_data)
    results = report.as_dict()
    drift_share = results["metrics"][0]["result"]["share_of_drifted_columns"]

    if drift_share > drift_threshold:
        return True, f"Data drift: {drift_share:.1%} of features drifted"

    return False, "Retraining not required"

Model Monitoring & Drift Detection

Data Drift vs Concept Drift

Data Drift: The statistical distribution of input features changes. P(X) changes but P(Y|X) remains stable. Examples: shift in user age distribution, changes in transaction amount distribution.

Concept Drift: The relationship between inputs and outputs changes. P(Y|X) changes. Examples: new fraud patterns emerge, user preferences shift.
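The data-drift side is typically checked per feature with statistical distance measures. A minimal sketch of the two most common ones — the two-sample Kolmogorov-Smirnov test and the Population Stability Index — where the 0.05 / 0.1 / 0.25 thresholds are common rules of thumb, not universal constants:

```python
import numpy as np
from scipy import stats

def ks_drift(reference, current, alpha=0.05):
    """Two-sample Kolmogorov-Smirnov test: flag drift when p-value < alpha."""
    _, p_value = stats.ks_2samp(reference, current)
    return p_value < alpha

def psi(reference, current, bins=10):
    """Population Stability Index over quantile bins of the reference window."""
    edges = np.quantile(reference, np.linspace(0, 1, bins + 1))
    edges[0], edges[-1] = -np.inf, np.inf  # cover values outside the reference range
    ref_pct = np.histogram(reference, bins=edges)[0] / len(reference)
    cur_pct = np.histogram(current, bins=edges)[0] / len(current)
    ref_pct = np.clip(ref_pct, 1e-6, None)  # avoid log(0) and division by zero
    cur_pct = np.clip(cur_pct, 1e-6, None)
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))

rng = np.random.default_rng(42)
reference = rng.normal(0, 1, 10_000)
shifted = rng.normal(0.5, 1, 10_000)   # mean shift simulating drift
print(ks_drift(reference, shifted))    # True
print(psi(reference, shifted) > 0.1)   # True (rule of thumb: >0.25 = major shift)
```

Neither test requires labels, which is why data drift can be monitored immediately in production, while concept drift usually has to wait for ground-truth labels to arrive.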

Evidently Drift Monitoring

import pandas as pd
from evidently.report import Report
from evidently.test_suite import TestSuite
from evidently import ColumnMapping
from evidently.metric_preset import (
    DataDriftPreset,
    DataQualityPreset,
    TargetDriftPreset,
    ClassificationPreset
)
from evidently.tests import (
    TestNumberOfDriftedColumns,
    TestShareOfDriftedColumns,
    TestColumnDrift
)

# Column mapping configuration
column_mapping = ColumnMapping(
    target="fraud_label",
    prediction="fraud_score",
    numerical_features=["amount", "transaction_count_7d", "avg_amount"],
    categorical_features=["merchant_category", "payment_method"]
)

# Comprehensive drift report
report = Report(metrics=[
    DataDriftPreset(),
    DataQualityPreset(),
    TargetDriftPreset(),
    ClassificationPreset()
])
report.run(
    reference_data=reference_df,
    current_data=production_df,
    column_mapping=column_mapping
)
report.save_html("monitoring/report.html")

# Alerting test suite
test_suite = TestSuite(tests=[
    TestNumberOfDriftedColumns(lt=3),
    TestShareOfDriftedColumns(lt=0.3),
    TestColumnDrift(column_name="amount"),
    TestColumnDrift(column_name="transaction_count_7d"),
])
test_suite.run(
    reference_data=reference_df,
    current_data=production_df
)

# Alert on test failures (send_alert is a placeholder for your alerting integration)
results = test_suite.as_dict()
failed_tests = [t for t in results["tests"] if t["status"] == "FAIL"]
if failed_tests:
    send_alert(f"Monitoring alert: {len(failed_tests)} tests failed")

Prometheus + Grafana Metrics

# src/monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge

prediction_counter = Counter(
    "model_predictions_total",
    "Total prediction count",
    ["model_version", "result"]
)
prediction_latency = Histogram(
    "model_prediction_latency_seconds",
    "Prediction latency in seconds",
    buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0]
)
model_accuracy = Gauge(
    "model_accuracy_current",
    "Current model accuracy"
)

import time

def predict_with_monitoring(features, model_version="v2.1"):
    start_time = time.time()
    prediction = model.predict(features)
    latency = time.time() - start_time

    prediction_latency.observe(latency)
    prediction_counter.labels(
        model_version=model_version,
        result="fraud" if prediction[0] == 1 else "normal"
    ).inc()

    return prediction

LLMOps

LLMOps is the extension of MLOps for developing, deploying, and operating large language models.

LLM Pipeline Unique Challenges

  • Non-deterministic outputs: Same input may produce different outputs — complex to evaluate
  • Prompt sensitivity: Small changes cause large performance swings
  • High-cost fine-tuning: Requires significant GPU resources
  • Hallucination: Model generates factually incorrect information
  • Context length management: Handling long contexts efficiently
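One pragmatic response to non-determinism is to sample the same prompt several times and measure how often the completions agree. A tiny illustrative helper (`self_consistency` is a hypothetical name, not a library function):

```python
from collections import Counter

def self_consistency(samples: list[str]) -> float:
    """Fraction of sampled completions that agree with the majority answer."""
    if not samples:
        return 0.0
    counts = Counter(s.strip().lower() for s in samples)
    return counts.most_common(1)[0][1] / len(samples)

# Four samples of the same prompt; three agree on "42"
print(self_consistency(["42", "42", "41", "42"]))  # 0.75
```

A low score on held-out prompts is a cheap signal that a prompt or model change made outputs less stable, even before running a full evaluation suite.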

LangSmith for LLM Tracing

from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langsmith import Client
import os

# LangSmith configuration
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "production-chatbot"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"

# LangChain chain (automatically traced in LangSmith)
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
prompt = ChatPromptTemplate.from_template(
    "You are a helpful customer service agent.\n\nQuestion: {question}\n\nAnswer:"
)
chain = prompt | llm

# Invocation - auto-traced
response = chain.invoke({"question": "What is your refund policy?"})

# Evaluation with LangSmith client
langsmith_client = Client()

dataset = langsmith_client.create_dataset(
    dataset_name="customer-service-eval",
    description="Customer service chatbot evaluation dataset"
)

langsmith_client.create_examples(
    inputs=[{"question": "What is your refund policy?"}],
    outputs=[{"answer": "Refunds are available within 30 days of purchase."}],
    dataset_id=dataset.id
)

from langsmith.evaluation import evaluate, LangChainStringEvaluator

evaluators = [
    LangChainStringEvaluator("cot_qa"),
    LangChainStringEvaluator("labeled_criteria", config={"criteria": "correctness"})
]
results = evaluate(
    lambda x: chain.invoke(x),
    data=dataset.name,
    evaluators=evaluators,
    experiment_prefix="gpt4o-baseline"
)

Prompt Version Control

# prompt_registry.py
import mlflow
from dataclasses import dataclass
from typing import Optional

@dataclass
class PromptVersion:
    template: str
    version: str
    description: str
    metrics: Optional[dict] = None

class PromptRegistry:
    def __init__(self, mlflow_uri: str):
        mlflow.set_tracking_uri(mlflow_uri)
        self.experiment_name = "prompt-versions"
        mlflow.set_experiment(self.experiment_name)

    def register_prompt(self, prompt: PromptVersion) -> str:
        with mlflow.start_run(run_name=f"prompt-{prompt.version}") as run:
            mlflow.log_param("version", prompt.version)
            mlflow.log_param("description", prompt.description)
            mlflow.log_text(prompt.template, "prompt_template.txt")
            if prompt.metrics:
                mlflow.log_metrics(prompt.metrics)
            return run.info.run_id

    def get_prompt(self, version: str) -> str:
        client = mlflow.tracking.MlflowClient()
        runs = client.search_runs(
            experiment_ids=[mlflow.get_experiment_by_name(self.experiment_name).experiment_id],
            filter_string=f"params.version = '{version}'"
        )
        if not runs:
            raise ValueError(f"Prompt version {version} not found")
        artifact_uri = runs[0].info.artifact_uri
        return mlflow.artifacts.load_text(f"{artifact_uri}/prompt_template.txt")

# Usage
registry = PromptRegistry("http://mlflow-server:5000")
registry.register_prompt(PromptVersion(
    template="You are a {role}. {context}\n\nQuestion: {question}\nAnswer:",
    version="v1.2.0",
    description="Improved prompt with context injection",
    metrics={"accuracy": 0.87, "hallucination_rate": 0.03}
))

LLM Fine-tuning Pipeline

# fine_tuning_pipeline.py
from transformers import AutoModelForCausalLM, TrainingArguments, Trainer
from peft import LoraConfig, get_peft_model, TaskType
import mlflow

def fine_tune_with_lora(
    base_model: str,
    train_dataset,
    output_dir: str,
    lora_r: int = 16,
    lora_alpha: int = 32
):
    mlflow.set_experiment("llm-fine-tuning")

    with mlflow.start_run():
        lora_config = LoraConfig(
            task_type=TaskType.CAUSAL_LM,
            r=lora_r,
            lora_alpha=lora_alpha,
            target_modules=["q_proj", "v_proj"],
            lora_dropout=0.05,
            bias="none"
        )
        mlflow.log_params({
            "base_model": base_model,
            "lora_r": lora_r,
            "lora_alpha": lora_alpha
        })

        # Load tokenizer and base model (the tokenizer is needed when logging the model)
        from transformers import AutoTokenizer
        tokenizer = AutoTokenizer.from_pretrained(base_model)
        model = AutoModelForCausalLM.from_pretrained(base_model)
        model = get_peft_model(model, lora_config)
        model.print_trainable_parameters()

        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=3,
            per_device_train_batch_size=4,
            gradient_accumulation_steps=4,
            learning_rate=2e-4,
            fp16=True,
            report_to="mlflow"
        )

        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=train_dataset,
        )
        trainer.train()

        model.save_pretrained(output_dir)
        mlflow.transformers.log_model(
            transformers_model={"model": model, "tokenizer": tokenizer},
            artifact_path="fine-tuned-model",
            registered_model_name="customer-service-llm"
        )

Quiz

Q1. List and explain the 4 automated CT (Continuous Training) trigger conditions in MLOps Level 2.

Answer: Data trigger, performance trigger, drift trigger, schedule trigger

Explanation:

  1. Data trigger: Automatic retraining starts when new training data reaches a threshold (e.g., 100k records) or a new data batch arrives in the pipeline.
  2. Performance trigger: Fires when production model accuracy, F1-score, or other KPIs fall below a predefined threshold (e.g., accuracy below 0.92).
  3. Drift trigger: Fires when the ratio of drifted features detected by tools like Evidently exceeds a threshold (e.g., over 30% of features show drift).
  4. Schedule trigger: Periodic retraining based on business requirements (e.g., every Monday at 2 AM) to maintain data freshness.

Q2. Explain why online and offline stores are kept separate in a feature store.

Answer: To independently optimize for the different access patterns and performance requirements of training and inference.

Explanation:

  • The offline store (S3, BigQuery) serves model training. It must batch-scan millions of historical records, so throughput and cost-efficiency matter most. High latency (seconds to minutes) is acceptable.
  • The online store (Redis, DynamoDB) serves real-time inference. It must retrieve the latest feature values for a given entity (user ID, product ID) within milliseconds, so it is optimized for low-latency single-key lookups.
  • Without separation, large batch scans during training would interfere with real-time inference queries, or the cost would explode when trying to meet real-time requirements from a single system.

Q3. Explain the difference between data drift and concept drift, and how to detect each.

Answer: Data drift is a change in P(X); concept drift is a change in P(Y|X).

Explanation:

  • Data drift: The statistical distribution of input features changes. Detected using Kolmogorov-Smirnov tests, Population Stability Index (PSI), or JS Divergence — all without requiring labels. Evidently's DataDriftPreset is a popular tool.
  • Concept drift: The correct output for the same input changes over time. For example, a new type of fraud emerges that the existing model does not recognize. Requires actual labels and is detected via model performance degradation (accuracy, F1 decline). When labels are delayed, proxy metrics can be used.

Q4. Explain how DVC manages large ML data alongside Git.

Answer: DVC commits pointer (metadata) files to Git and stores actual data in remote storage.

Explanation: DVC does not store large files (datasets, models) directly in Git. Instead it creates .dvc metadata files containing the MD5 hash, size, and path of the actual data, and these pointer files are committed to Git. The actual data is uploaded to remote storage (S3, GCS, Azure Blob) with dvc push. Any environment can download the exact same version of the data with dvc pull. Because each Git commit is linked 1:1 with a DVC data version, full experiment reproducibility is guaranteed.

Q5. Describe the validation checklist before promoting a model from Staging to Production in MLflow Model Registry.

Answer: Performance validation, fairness validation, integration tests, latency testing, data schema compatibility check.

Explanation:

  1. Performance validation: Confirm that accuracy, F1, AUC, or other metrics on a holdout test set or recent production data are equal to or better than the current Production model.
  2. Fairness validation: Review per-slice metrics to ensure no performance bias across demographic groups or age cohorts.
  3. Integration tests: Verify end-to-end prediction works correctly in the actual serving environment (API, feature store connections).
  4. Latency testing: Run load tests to confirm mean response time and P99 latency meet the defined SLA.
  5. Schema compatibility: Confirm that the input feature schema and output format are compatible with the current serving infrastructure.