Skip to content
Published on

AI 플랫폼 모델 레지스트리와 A/B 배포 파이프라인 설계 2026

Authors
  • Name
    Twitter
AI 플랫폼 모델 레지스트리와 A/B 배포 파이프라인 설계 2026

개요

모델을 학습하는 것보다 학습된 모델을 안전하게 프로덕션에 배포하는 것이 더 어렵다. 학습 코드는 한 번 작성하면 반복 실행이 가능하지만, 배포는 매번 다른 조건에서 진행된다. 이전 버전과의 호환성, 트래픽 증가에 따른 리소스 할당, 새 모델의 성능 검증, 장애 시 즉각 롤백 -- 이 모든 것을 자동화된 파이프라인으로 처리해야 한다.

이 글에서는 두 가지 핵심 컴포넌트를 다룬다.

  1. 모델 레지스트리: 학습된 모델 artifact를 중앙에서 버전 관리하고, 메타데이터와 함께 승격(promotion) 워크플로를 제공하는 시스템
  2. A/B 배포 파이프라인: 새 모델과 기존 모델을 동시에 서빙하면서 트래픽을 분할하고, 실시간 메트릭을 기반으로 승격 또는 롤백을 결정하는 자동화 파이프라인

2026년 3월 기준으로 MLflow 2.19+, KServe 0.14+, W&B Registry, SageMaker Model Registry의 최신 기능을 반영한다. 기존 글에서 다룬 Kubeflow Pipeline 오케스트레이션이나 Feature Store 동기화와는 범위를 분리하여, 모델 레지스트리의 내부 설계와 배포 전략의 구체적 구현에 집중한다.

모델 레지스트리 핵심 기능

모델 레지스트리는 단순한 파일 저장소가 아니다. 프로덕션 ML 시스템에서 모델 레지스트리가 제공해야 하는 핵심 기능은 다음과 같다.

필수 기능 체크리스트

기능설명없으면 생기는 문제
버전 관리동일 모델의 여러 버전을 순차적으로 관리어떤 모델이 프로덕션에 있는지 추적 불가
메타데이터 태깅학습 데이터, 하이퍼파라미터, 평가 메트릭 연결모델 재현 불가, 디버깅 시 원인 추적 불가
Alias / Stage 관리champion, challenger, archived 같은 상태 표시배포 자동화 시 어떤 버전을 배포할지 판단 불가
Lineage 추적학습 Run, 데이터셋, 코드 버전까지 역추적규제 감사(audit) 대응 불가
접근 제어팀/역할별 읽기/쓰기/승격 권한 분리실수로 프로덕션 모델이 덮어써지는 사고
Artifact 저장소 연동S3, GCS, Azure Blob 등 외부 스토리지 지원대용량 모델(LLM) 관리 불가
API / SDK프로그래밍 방식으로 등록, 조회, 승격CI/CD 파이프라인 연동 불가

주요 모델 레지스트리 솔루션 비교

항목MLflow Model RegistryW&B RegistrySageMaker Model RegistryVertex AI Model Registry
배포 방식오픈소스 (자체 호스팅) 또는 DatabricksSaaS 또는 자체 호스팅AWS 관리형GCP 관리형
버전 관리자동 증분 번호 + Alias컬렉션 기반 + 버전 링크ModelPackageGroup 기반자동 버전 번호
Stage 관리Alias (@champion, @challenger)태그 기반Approval 상태 (Approved/Rejected/Pending)라벨 기반
LineageMLflow Run 연결W&B Run 연결Training Job + S3 URI 자동 캡처Pipeline Run 연결
크로스 팀 공유권한 설정 필요Organization 내 공유크로스 계정 리소스 정책IAM 기반
LLM 지원MLflow 2.x transformers flavorArtifact 버전 관리JumpStart 모델 + 자체 모델Model Garden + 자체 모델
CI/CD 연동REST API + Python SDKPython SDK + webhookCodePipeline 네이티브Cloud Build 연동
비용무료 (인프라 비용만)무료 티어 + 유료사용량 기반사용량 기반

MLflow Model Registry 구현

MLflow Model Registry는 2026년 현재 가장 널리 사용되는 오픈소스 모델 레지스트리다. MLflow 3.x에서 도입된 Alias 기반 모델 관리는 기존 Stage(Staging/Production/Archived) 방식을 대체하여, 하나의 모델 버전에 여러 Alias를 할당할 수 있는 유연한 워크플로를 제공한다.

모델 등록과 Alias 설정

"""
MLflow Model Registry: 모델 등록, 버전 관리, Alias 설정
MLflow 2.19+ 기준
"""
import mlflow
from mlflow.tracking import MlflowClient

# MLflow Tracking Server 연결
mlflow.set_tracking_uri("http://mlflow.internal.example.com:5000")
client = MlflowClient()

# 1. 학습 실행과 모델 로깅
with mlflow.start_run(run_name="fraud-detection-v3") as run:
    # 학습 메트릭 기록
    mlflow.log_params({
        "model_type": "xgboost",
        "n_estimators": 500,
        "max_depth": 8,
        "learning_rate": 0.05,
        "training_data_version": "2026-03-01",
        "feature_count": 47,
    })
    mlflow.log_metrics({
        "auc_roc": 0.9847,
        "precision_at_95_recall": 0.912,
        "f1_score": 0.943,
        "latency_p99_ms": 12.3,
    })

    # 모델 artifact 로깅 + 레지스트리 등록을 한 번에 수행
    model_info = mlflow.xgboost.log_model(
        xgb_model=trained_model,
        artifact_path="model",
        registered_model_name="fraud-detection",
        input_example=sample_input,
        signature=mlflow.models.infer_signature(X_test, y_pred),
    )
    print(f"Model URI: {model_info.model_uri}")
    # 출력: models:/fraud-detection/3

# 2. 등록된 모델 버전에 Alias 할당
# 새 모델을 challenger로 지정
client.set_registered_model_alias(
    name="fraud-detection",
    alias="challenger",
    version=3,
)

# 3. 검증 완료 후 champion으로 승격
def promote_to_champion(model_name: str, version: int):
    """모델을 champion으로 승격하고 이전 champion에 대한 정보를 기록한다."""
    # 현재 champion 확인
    try:
        current_champion = client.get_model_version_by_alias(
            name=model_name, alias="champion"
        )
        # 이전 champion에 archived 태그 추가
        client.set_model_version_tag(
            name=model_name,
            version=current_champion.version,
            key="previously_champion",
            value="true",
        )
        print(f"Previous champion: v{current_champion.version}")
    except mlflow.exceptions.MlflowException:
        print("No existing champion found")

    # 새 champion 지정
    client.set_registered_model_alias(
        name=model_name, alias="champion", version=version
    )
    # challenger alias 제거
    client.delete_registered_model_alias(
        name=model_name, alias="challenger"
    )
    print(f"Promoted v{version} to champion")

promote_to_champion("fraud-detection", version=3)

모델 로딩과 서빙 연동

"""
Alias 기반으로 모델을 로드하여 서빙에 활용하는 패턴.
배포 파이프라인에서 champion 모델 URI를 자동으로 resolve한다.
"""
import mlflow

# champion 모델 로드 (Alias 기반)
champion_model = mlflow.pyfunc.load_model("models:/fraud-detection@champion")

# 특정 버전 로드 (디버깅/비교용)
v2_model = mlflow.pyfunc.load_model("models:/fraud-detection/2")

# 모델 URI를 KServe storageUri로 변환하는 유틸리티
def resolve_model_storage_uri(model_name: str, alias: str) -> str:
    """MLflow Alias에서 실제 artifact storage URI를 추출한다."""
    client = MlflowClient()
    mv = client.get_model_version_by_alias(name=model_name, alias=alias)
    run = client.get_run(mv.run_id)
    artifact_uri = run.info.artifact_uri
    # 예: s3://ml-artifacts/12/abc123/artifacts/model
    return f"{artifact_uri}/model"

storage_uri = resolve_model_storage_uri("fraud-detection", "champion")
print(f"KServe storageUri: {storage_uri}")
# 출력: s3://ml-artifacts/12/abc123/artifacts/model

모델 버전 관리 전략

모델 버전 관리는 단순히 숫자를 증가시키는 것이 아니다. 프로덕션 환경에서 안전한 모델 교체를 보장하기 위해 체계적인 전략이 필요하다.

버전 관리 원칙

1. 불변성(Immutability) 보장: 한 번 등록된 모델 버전의 artifact는 절대 수정하지 않는다. 수정이 필요하면 새 버전을 등록한다. 이 원칙이 깨지면 "어제까지 잘 되던 모델이 갑자기 다른 결과를 낸다"는 유령 같은 버그가 발생한다.

2. 메타데이터 필수 항목 강제: 모델 등록 시 반드시 포함해야 하는 메타데이터를 정의하고 자동 검증한다.

"""
모델 등록 시 필수 메타데이터를 검증하는 게이트키퍼.
CI/CD 파이프라인의 등록 단계에 삽입하여 불완전한 모델이
레지스트리에 들어가는 것을 방지한다.
"""
from dataclasses import dataclass
from typing import Optional

REQUIRED_TAGS = [
    "training_data_version",
    "feature_schema_hash",
    "evaluation_dataset",
    "code_commit_sha",
    "owner_team",
]

REQUIRED_METRICS = [
    "auc_roc",
    "latency_p99_ms",
]

@dataclass
class RegistrationValidation:
    is_valid: bool
    missing_tags: list[str]
    missing_metrics: list[str]
    message: str

def validate_before_registration(
    run_id: str,
    min_auc: float = 0.95,
    max_latency_ms: float = 50.0,
) -> RegistrationValidation:
    """모델 등록 전 필수 조건을 검증한다."""
    client = MlflowClient()
    run = client.get_run(run_id)

    # 필수 태그 확인
    existing_tags = set(run.data.tags.keys())
    missing_tags = [t for t in REQUIRED_TAGS if t not in existing_tags]

    # 필수 메트릭 확인
    existing_metrics = set(run.data.metrics.keys())
    missing_metrics = [m for m in REQUIRED_METRICS if m not in existing_metrics]

    # 성능 기준선 확인
    threshold_failures = []
    if "auc_roc" in existing_metrics:
        auc = run.data.metrics["auc_roc"]
        if auc < min_auc:
            threshold_failures.append(
                f"auc_roc={auc:.4f} < min={min_auc}"
            )
    if "latency_p99_ms" in existing_metrics:
        latency = run.data.metrics["latency_p99_ms"]
        if latency > max_latency_ms:
            threshold_failures.append(
                f"latency_p99_ms={latency:.1f} > max={max_latency_ms}"
            )

    is_valid = (
        len(missing_tags) == 0
        and len(missing_metrics) == 0
        and len(threshold_failures) == 0
    )

    messages = []
    if missing_tags:
        messages.append(f"Missing tags: {missing_tags}")
    if missing_metrics:
        messages.append(f"Missing metrics: {missing_metrics}")
    if threshold_failures:
        messages.append(f"Threshold failures: {threshold_failures}")

    return RegistrationValidation(
        is_valid=is_valid,
        missing_tags=missing_tags,
        missing_metrics=missing_metrics,
        message=" | ".join(messages) if messages else "All checks passed",
    )

3. Alias 기반 배포 계약: 배포 파이프라인은 모델 버전 번호가 아니라 Alias를 참조한다. @champion은 현재 프로덕션 모델, @challenger는 A/B 테스트 대상 모델, @shadow는 섀도우 모드로 추론만 수행하는 모델이다.

모델 Artifact 저장소 옵션 비교

저장소장점단점권장 시나리오
S3 / GCSKServe 네이티브 지원, 저렴한 비용버전 관리를 별도 구현 필요대부분의 프로덕션 환경
MinIOS3 호환 API, 온프레미스 가능자체 운영 부담온프레미스/에어갭 환경
MLflow Artifact Store레지스트리와 자동 연동대용량 LLM에는 부적합할 수 있음MLflow 중심 스택
OCI Registry컨테이너 이미지와 동일한 방식으로 관리모델 전용 도구 대비 메타데이터 부족쿠버네티스 네이티브 환경
HuggingFace HubLLM 생태계 지원, Git LFS 기반프라이빗 호스팅 비용LLM/Foundation 모델 관리

A/B 배포 파이프라인 설계

A/B 배포는 단순히 트래픽을 나누는 것이 아니다. "새 모델이 기존 모델보다 나은가?"라는 질문에 통계적으로 유의미한 답을 얻기 위한 실험 설계다.

배포 전략 비교

전략트래픽 분할동시 실행 버전롤백 속도주요 용도
카나리(Canary)점진적 증가 (2% -> 10% -> 50% -> 100%)2개즉시 (트래픽 0%로 전환)안전한 점진적 릴리스
블루-그린(Blue-Green)전체 전환 (0% -> 100%)2개 (대기 상태 유지)즉시 (DNS/라우팅 전환)빠른 전체 전환과 즉각 롤백
A/B 테스트고정 비율 (예: 50/50)2개 이상실험 종료 후 전환비즈니스 메트릭 기반 비교
섀도우(Shadow)100% 복제 (응답 미반환)2개해당 없음 (사용자 노출 없음)신규 모델 검증, 레이턴시 측정
멀티암드 밴딧(MAB)동적 조정 (성능 기반)2개 이상자동 (성능 저하 시 비율 감소)탐색-활용 최적화

파이프라인 전체 흐름

프로덕션 A/B 배포 파이프라인의 전체 흐름을 정리하면 다음과 같다.

1. 모델 등록 (MLflow Registry)
   └─ 필수 메타데이터 검증 통과
   └─ @challenger alias 할당

2. 사전 검증 (Pre-deployment Validation)
   └─ 오프라인 평가 데이터셋으로 정확도 확인
   └─ 추론 레이턴시 벤치마크 (p50, p95, p99)
   └─ 입출력 스키마 호환성 검증

3. 카나리 배포 시작
   └─ KServe InferenceService 업데이트
   └─ canaryTrafficPercent: 5 설정
   └─ 모니터링 대시보드 활성화

4. 점진적 트래픽 증가
   └─ 5% -> 10% -> 25% -> 50% -> 100%
   └─ 각 단계에서 메트릭 자동 비교
   └─ 이상 감지 시 자동 롤백

5. 승격 또는 롤백
   └─ 성공: @champion alias 전환, 이전 버전 @archived
   └─ 실패: canaryTrafficPercent: 0, 원인 분석 태그 기록

KServe 카나리 배포

KServe는 2025년 11월 CNCF Incubating 프로젝트로 승격되었으며, InferenceService CRD를 통한 선언적 카나리 배포를 제공한다. v1beta1부터는 별도의 canary spec 없이 canaryTrafficPercent 필드만으로 카나리 롤아웃이 가능하다.

InferenceService 카나리 배포 YAML

# kserve-canary-deploy.yaml
# KServe InferenceService: 카나리 배포 설정
# 새 모델에 10%의 트래픽을 할당하고 점진적으로 증가시킨다.
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: fraud-detection
  namespace: ml-serving
  annotations:
    # 최소 스케일 설정 (콜드 스타트 방지)
    autoscaling.knative.dev/min-scale: '2'
    # 배포 메타데이터
    mlflow.model.version: '3'
    mlflow.model.alias: 'challenger'
    deploy.pipeline.run-id: 'deploy-20260304-001'
spec:
  predictor:
    # 카나리 트래픽 비율 (0-100)
    canaryTrafficPercent: 10
    # 새 모델 버전 (카나리)
    model:
      modelFormat:
        name: xgboost
      runtime: kserve-xgbserver
      storageUri: 's3://ml-artifacts/fraud-detection/v3/model'
      resources:
        requests:
          cpu: '2'
          memory: '4Gi'
        limits:
          cpu: '4'
          memory: '8Gi'
    # Autoscaler 설정
    scaleTarget: 10
    scaleMetric: concurrency
  # 트래픽 라우팅 설정 (선택)
  transformer:
    containers:
      - name: feature-enricher
        image: registry.example.com/ml/feature-enricher:v2.1
        resources:
          requests:
            cpu: '1'
            memory: '2Gi'

트래픽 점진적 증가 스크립트

"""
KServe 카나리 트래픽을 단계적으로 증가시키는 자동화 스크립트.
각 단계에서 핵심 메트릭을 확인하고, 기준을 미달하면 즉시 롤백한다.
"""
import time
import subprocess
import json
from dataclasses import dataclass

@dataclass
class CanaryStage:
    traffic_percent: int
    observation_minutes: int
    max_error_rate: float
    max_latency_p99_ms: float

CANARY_STAGES = [
    CanaryStage(traffic_percent=5,   observation_minutes=10, max_error_rate=0.01, max_latency_p99_ms=50),
    CanaryStage(traffic_percent=10,  observation_minutes=15, max_error_rate=0.01, max_latency_p99_ms=50),
    CanaryStage(traffic_percent=25,  observation_minutes=20, max_error_rate=0.005, max_latency_p99_ms=45),
    CanaryStage(traffic_percent=50,  observation_minutes=30, max_error_rate=0.005, max_latency_p99_ms=45),
    CanaryStage(traffic_percent=100, observation_minutes=0,  max_error_rate=0.005, max_latency_p99_ms=45),
]

def update_canary_traffic(
    isvc_name: str,
    namespace: str,
    traffic_percent: int,
) -> bool:
    """KServe InferenceService의 canaryTrafficPercent를 업데이트한다."""
    patch = {
        "spec": {
            "predictor": {
                "canaryTrafficPercent": traffic_percent
            }
        }
    }
    cmd = [
        "kubectl", "patch", "inferenceservice", isvc_name,
        "-n", namespace,
        "--type=merge",
        f"-p={json.dumps(patch)}",
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print(f"Failed to patch: {result.stderr}")
        return False
    print(f"Updated canaryTrafficPercent to {traffic_percent}%")
    return True

def check_canary_health(
    isvc_name: str,
    namespace: str,
    stage: CanaryStage,
    prometheus_url: str = "http://prometheus.monitoring:9090",
) -> bool:
    """Prometheus에서 카나리 메트릭을 확인한다."""
    import requests

    # 에러율 확인
    error_query = (
        f'sum(rate(revision_request_count'
        f'{{service_name="{isvc_name}",response_code!="200",'
        f'namespace="{namespace}"}}[5m])) / '
        f'sum(rate(revision_request_count'
        f'{{service_name="{isvc_name}",'
        f'namespace="{namespace}"}}[5m]))'
    )
    resp = requests.get(
        f"{prometheus_url}/api/v1/query",
        params={"query": error_query},
    )
    data = resp.json()
    if data["data"]["result"]:
        error_rate = float(data["data"]["result"][0]["value"][1])
        if error_rate > stage.max_error_rate:
            print(f"Error rate {error_rate:.4f} exceeds {stage.max_error_rate}")
            return False

    # p99 레이턴시 확인
    latency_query = (
        f'histogram_quantile(0.99, '
        f'sum(rate(revision_request_latencies_bucket'
        f'{{service_name="{isvc_name}",'
        f'namespace="{namespace}"}}[5m])) by (le))'
    )
    resp = requests.get(
        f"{prometheus_url}/api/v1/query",
        params={"query": latency_query},
    )
    data = resp.json()
    if data["data"]["result"]:
        latency_ms = float(data["data"]["result"][0]["value"][1])
        if latency_ms > stage.max_latency_p99_ms:
            print(f"p99 latency {latency_ms:.1f}ms exceeds {stage.max_latency_p99_ms}ms")
            return False

    return True

def run_canary_rollout(isvc_name: str, namespace: str):
    """카나리 롤아웃을 단계별로 실행한다."""
    for i, stage in enumerate(CANARY_STAGES):
        print(f"\n--- Stage {i+1}/{len(CANARY_STAGES)}: "
              f"{stage.traffic_percent}% traffic ---")

        if not update_canary_traffic(isvc_name, namespace, stage.traffic_percent):
            print("ABORT: Failed to update traffic")
            rollback(isvc_name, namespace)
            return False

        if stage.observation_minutes > 0:
            print(f"Observing for {stage.observation_minutes} minutes...")
            # 관찰 기간 동안 주기적으로 메트릭 확인
            checks = stage.observation_minutes // 2  # 2분마다 확인
            for check in range(checks):
                time.sleep(120)  # 2분 대기
                if not check_canary_health(isvc_name, namespace, stage):
                    print(f"ROLLBACK: Health check failed at stage {i+1}")
                    rollback(isvc_name, namespace)
                    return False
                print(f"  Check {check+1}/{checks}: OK")

    print("\nCanary rollout completed successfully!")
    return True

def rollback(isvc_name: str, namespace: str):
    """카나리 트래픽을 0%로 설정하여 즉시 롤백한다."""
    update_canary_traffic(isvc_name, namespace, 0)
    print("Rolled back to previous stable version")

트래픽 분할과 라우팅

카나리 배포 외에도 다양한 트래픽 분할 패턴이 필요하다. 특정 사용자 그룹에만 새 모델을 노출하거나, 지역별로 다른 모델을 서빙하는 등의 시나리오를 다룬다.

Istio VirtualService 기반 A/B 트래픽 라우팅

KServe는 내부적으로 Istio 또는 Kourier를 사용하여 트래픽을 라우팅한다. 더 정교한 A/B 라우팅이 필요하면 Istio VirtualService를 직접 구성한다.

# istio-ab-routing.yaml
# 헤더 기반 A/B 트래픽 라우팅
# x-model-variant 헤더에 따라 다른 모델 버전으로 라우팅한다.
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: fraud-detection-ab
  namespace: ml-serving
spec:
  hosts:
    - fraud-detection.ml-serving.svc.cluster.local
  http:
    # 규칙 1: 명시적 variant B 요청 -> 새 모델
    - match:
        - headers:
            x-model-variant:
              exact: 'B'
      route:
        - destination:
            host: fraud-detection-predictor.ml-serving.svc.cluster.local
            port:
              number: 80
          headers:
            request:
              set:
                x-model-version: 'v3'
          weight: 100
    # 규칙 2: 특정 사용자 세그먼트 -> 새 모델 (쿠키 기반)
    - match:
        - headers:
            cookie:
              regex: '.*ab_group=treatment.*'
      route:
        - destination:
            host: fraud-detection-predictor.ml-serving.svc.cluster.local
            port:
              number: 80
          weight: 100
    # 규칙 3: 기본 트래픽 -> 가중치 기반 분할
    - route:
        - destination:
            host: fraud-detection-predictor.ml-serving.svc.cluster.local
            subset: stable
          weight: 90
        - destination:
            host: fraud-detection-predictor.ml-serving.svc.cluster.local
            subset: canary
          weight: 10
---
# DestinationRule로 서브셋 정의
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: fraud-detection-subsets
  namespace: ml-serving
spec:
  host: fraud-detection-predictor.ml-serving.svc.cluster.local
  subsets:
    - name: stable
      labels:
        model-version: 'v2'
    - name: canary
      labels:
        model-version: 'v3'

트래픽 라우팅 결정 기준

A/B 테스트에서 트래픽을 어떤 기준으로 분할할지는 실험 설계의 핵심이다. 아래 기준들을 조합하여 사용한다.

분할 기준구현 방식장점주의사항
무작위 비율Istio weight 기반구현 단순, 통계적 무작위성 보장동일 사용자가 매 요청마다 다른 모델을 만날 수 있음
사용자 ID 해시헤더/쿠키 기반 라우팅동일 사용자에게 일관된 경험 제공해시 분포 편향 가능성
지역/국가GeoIP 헤더 기반지역별 모델 성능 차이 검증 가능지역별 트래픽 불균형
디바이스 타입User-Agent 파싱디바이스별 레이턴시 최적화 검증파싱 정확도에 의존
피처 플래그LaunchDarkly/Unleash 연동실시간 분할 비율 변경 가능외부 의존성 추가

모니터링과 자동 롤백

A/B 배포의 성패는 모니터링에 달려 있다. 배포 후 모델의 성능을 실시간으로 추적하고, 이상 징후가 감지되면 사람의 개입 없이 자동으로 롤백해야 한다.

모니터링해야 할 핵심 메트릭

인프라 메트릭:

  • 추론 레이턴시 (p50, p95, p99)
  • 초당 요청 수 (RPS)
  • 에러율 (HTTP 5xx 비율)
  • GPU/CPU 사용률
  • 메모리 사용량

모델 메트릭:

  • 예측 분포 변화 (PSI: Population Stability Index)
  • 입력 피처 드리프트 (KL Divergence, Kolmogorov-Smirnov)
  • 비즈니스 KPI (전환율, 클릭률, 이탈률)
  • 모델 신뢰도 분포 변화

Prometheus + Grafana 모니터링 스크립트

"""
A/B 배포 모니터링 및 자동 롤백 컨트롤러.
Prometheus에서 메트릭을 수집하고, 기준 위반 시 자동 롤백을 트리거한다.
"""
import time
import requests
import logging
from dataclasses import dataclass, field
from enum import Enum

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ab-monitor")

class RollbackReason(Enum):
    ERROR_RATE = "error_rate_exceeded"
    LATENCY = "latency_exceeded"
    PREDICTION_DRIFT = "prediction_distribution_drift"
    BUSINESS_KPI = "business_kpi_degradation"

@dataclass
class MonitorConfig:
    prometheus_url: str = "http://prometheus.monitoring:9090"
    isvc_name: str = "fraud-detection"
    namespace: str = "ml-serving"
    check_interval_sec: int = 60
    # 임계값 설정
    max_error_rate: float = 0.01
    max_latency_p99_ms: float = 50.0
    max_psi: float = 0.2  # PSI > 0.2이면 유의미한 분포 변화
    # 연속 실패 횟수 기준
    consecutive_failures_to_rollback: int = 3
    # 알림 설정
    slack_webhook: str = ""
    pagerduty_key: str = ""

@dataclass
class MonitorState:
    consecutive_failures: int = 0
    total_checks: int = 0
    failure_reasons: list = field(default_factory=list)

def query_prometheus(config: MonitorConfig, query: str) -> float | None:
    """Prometheus 쿼리를 실행하고 단일 값을 반환한다."""
    try:
        resp = requests.get(
            f"{config.prometheus_url}/api/v1/query",
            params={"query": query},
            timeout=10,
        )
        data = resp.json()
        if data["status"] == "success" and data["data"]["result"]:
            return float(data["data"]["result"][0]["value"][1])
    except Exception as e:
        logger.error(f"Prometheus query failed: {e}")
    return None

def check_model_health(config: MonitorConfig) -> tuple[bool, list[RollbackReason]]:
    """모델의 건강 상태를 종합적으로 확인한다."""
    reasons = []

    # 1. 에러율 확인
    error_rate = query_prometheus(config, (
        f'sum(rate(revision_request_count'
        f'{{service_name="{config.isvc_name}",'
        f'response_code=~"5..",namespace="{config.namespace}"}}[5m]))'
        f' / sum(rate(revision_request_count'
        f'{{service_name="{config.isvc_name}",'
        f'namespace="{config.namespace}"}}[5m]))'
    ))
    if error_rate is not None and error_rate > config.max_error_rate:
        logger.warning(f"Error rate: {error_rate:.4f} > {config.max_error_rate}")
        reasons.append(RollbackReason.ERROR_RATE)

    # 2. p99 레이턴시 확인
    latency = query_prometheus(config, (
        f'histogram_quantile(0.99, sum(rate('
        f'revision_request_latencies_bucket'
        f'{{service_name="{config.isvc_name}",'
        f'namespace="{config.namespace}"}}[5m])) by (le))'
    ))
    if latency is not None and latency > config.max_latency_p99_ms:
        logger.warning(f"p99 latency: {latency:.1f}ms > {config.max_latency_p99_ms}ms")
        reasons.append(RollbackReason.LATENCY)

    # 3. 예측 분포 변화 (PSI) 확인
    psi = query_prometheus(config, (
        f'ml_prediction_psi'
        f'{{model="{config.isvc_name}",namespace="{config.namespace}"}}'
    ))
    if psi is not None and psi > config.max_psi:
        logger.warning(f"PSI: {psi:.4f} > {config.max_psi}")
        reasons.append(RollbackReason.PREDICTION_DRIFT)

    is_healthy = len(reasons) == 0
    return is_healthy, reasons

def send_alert(config: MonitorConfig, message: str):
    """Slack과 PagerDuty로 알림을 전송한다."""
    if config.slack_webhook:
        requests.post(config.slack_webhook, json={
            "text": f":rotating_light: [AB Deploy Monitor] {message}",
            "channel": "#ml-alerts",
        })
    logger.info(f"Alert: {message}")

def trigger_rollback(config: MonitorConfig, reasons: list[RollbackReason]):
    """자동 롤백을 실행한다."""
    import subprocess, json
    reason_str = ", ".join([r.value for r in reasons])
    logger.critical(f"Triggering rollback. Reasons: {reason_str}")

    # KServe canaryTrafficPercent를 0으로 패치
    patch = json.dumps({
        "spec": {"predictor": {"canaryTrafficPercent": 0}}
    })
    subprocess.run([
        "kubectl", "patch", "inferenceservice", config.isvc_name,
        "-n", config.namespace,
        "--type=merge", f"-p={patch}",
    ], check=True)

    send_alert(config, f"Auto-rollback executed for {config.isvc_name}. Reasons: {reason_str}")

def run_monitor(config: MonitorConfig):
    """모니터링 루프를 실행한다."""
    state = MonitorState()
    logger.info(f"Starting AB deploy monitor for {config.isvc_name}")

    while True:
        is_healthy, reasons = check_model_health(config)
        state.total_checks += 1

        if is_healthy:
            state.consecutive_failures = 0
            state.failure_reasons.clear()
            if state.total_checks % 10 == 0:
                logger.info(f"Check #{state.total_checks}: Healthy")
        else:
            state.consecutive_failures += 1
            state.failure_reasons.extend(reasons)
            logger.warning(
                f"Check #{state.total_checks}: Unhealthy "
                f"({state.consecutive_failures}/{config.consecutive_failures_to_rollback})"
            )

            if state.consecutive_failures >= config.consecutive_failures_to_rollback:
                trigger_rollback(config, reasons)
                break

        time.sleep(config.check_interval_sec)

# 실행 예시
if __name__ == "__main__":
    config = MonitorConfig(
        isvc_name="fraud-detection",
        namespace="ml-serving",
        slack_webhook="https://hooks.slack.com/services/xxx/yyy/zzz",
    )
    run_monitor(config)

모델 승격 파이프라인 자동화

카나리 배포가 성공하면 새 모델을 공식 champion으로 승격하는 파이프라인이 필요하다. 이 과정에서 MLflow 레지스트리 업데이트, KServe 설정 정리, 알림 전송을 한 번에 처리한다.

"""
모델 승격 파이프라인: 카나리 배포 성공 후 champion 전환 자동화.
MLflow Alias 업데이트 + KServe 카나리 정리 + 알림을 처리한다.
"""
import mlflow
from mlflow.tracking import MlflowClient
import subprocess
import json
from datetime import datetime

def promote_model_pipeline(
    model_name: str,
    new_version: int,
    isvc_name: str,
    namespace: str,
    ab_test_results: dict | None = None,
):
    """
    모델 승격 전체 파이프라인.

    Args:
        model_name: MLflow 등록 모델 이름
        new_version: 승격할 모델 버전 번호
        isvc_name: KServe InferenceService 이름
        namespace: 쿠버네티스 네임스페이스
        ab_test_results: A/B 테스트 결과 메트릭 딕셔너리
    """
    client = MlflowClient()
    timestamp = datetime.utcnow().isoformat()

    # Step 1: 현재 champion 정보 백업
    try:
        old_champion = client.get_model_version_by_alias(
            name=model_name, alias="champion"
        )
        old_version = int(old_champion.version)
        print(f"Current champion: v{old_version}")

        # 이전 champion에 메타데이터 기록
        client.set_model_version_tag(
            name=model_name, version=old_version,
            key="demoted_at", value=timestamp,
        )
        client.set_model_version_tag(
            name=model_name, version=old_version,
            key="demoted_by", value=f"v{new_version}",
        )
        # 이전 champion을 archived로 이동
        client.set_registered_model_alias(
            name=model_name, alias="archived", version=old_version,
        )
    except mlflow.exceptions.MlflowException:
        old_version = None
        print("No existing champion found")

    # Step 2: 새 모델을 champion으로 승격
    client.set_registered_model_alias(
        name=model_name, alias="champion", version=new_version,
    )
    # challenger alias 제거
    try:
        client.delete_registered_model_alias(
            name=model_name, alias="challenger",
        )
    except mlflow.exceptions.MlflowException:
        pass

    # 승격 메타데이터 기록
    client.set_model_version_tag(
        name=model_name, version=new_version,
        key="promoted_to_champion_at", value=timestamp,
    )
    if ab_test_results:
        for k, v in ab_test_results.items():
            client.set_model_version_tag(
                name=model_name, version=new_version,
                key=f"ab_result_{k}", value=str(v),
            )

    # Step 3: KServe에서 카나리 설정 정리
    # canaryTrafficPercent를 제거하여 새 모델이 100% 트래픽을 받도록 한다
    patch = json.dumps({
        "spec": {
            "predictor": {
                "canaryTrafficPercent": 100
            }
        }
    })
    subprocess.run([
        "kubectl", "patch", "inferenceservice", isvc_name,
        "-n", namespace,
        "--type=merge", f"-p={patch}",
    ], check=True)

    print(f"Promotion complete: {model_name} v{new_version} is now champion")

    return {
        "model_name": model_name,
        "new_champion_version": new_version,
        "old_champion_version": old_version,
        "promoted_at": timestamp,
    }

# 실행 예시
result = promote_model_pipeline(
    model_name="fraud-detection",
    new_version=3,
    isvc_name="fraud-detection",
    namespace="ml-serving",
    ab_test_results={
        "auc_improvement": 0.012,
        "latency_p99_delta_ms": -2.1,
        "error_rate_delta": -0.003,
        "test_duration_hours": 24,
        "traffic_percentage": 50,
    },
)

트러블슈팅

문제 1: 카나리 배포 후 트래픽이 분할되지 않는다

증상: canaryTrafficPercent를 설정했지만 모든 트래픽이 이전 모델로만 간다.

원인과 해결:

  1. KServe 버전 확인: v1beta1 이전 버전에서는 canary spec과 default spec을 별도로 정의해야 한다. kubectl get inferenceservice -o yaml로 실제 적용된 spec을 확인한다.

  2. Knative Revision 상태 확인: 새 모델의 Knative Revision이 Ready 상태인지 확인한다.

    • kubectl get revision -n ml-serving 실행
    • READY 컬럼이 True인지 확인
    • False라면 kubectl describe revision <revision-name>으로 원인 파악
  3. storageUri 접근 권한: 새 모델의 S3/GCS 경로에 대한 접근 권한이 없으면 Revision이 실패하고 트래픽이 분할되지 않는다. ServiceAccount에 적절한 IAM 역할이 바인딩되어 있는지 확인한다.

문제 2: MLflow Alias 업데이트 후 서빙 모델이 바뀌지 않는다

증상: MLflow에서 @champion alias를 새 버전으로 변경했지만 KServe가 여전히 이전 모델을 서빙한다.

원인: MLflow alias 변경은 레지스트리 메타데이터만 업데이트한다. KServe InferenceService의 storageUri는 자동으로 변경되지 않는다.

해결: 승격 파이프라인에서 MLflow alias 변경과 KServe InferenceService 패치를 반드시 함께 수행해야 한다. 위의 promote_model_pipeline 함수처럼 두 작업을 하나의 트랜잭션처럼 묶는다.

문제 3: 카나리 모델의 레이턴시가 비정상적으로 높다

증상: 동일 모델인데도 카나리 리비전의 p99 레이턴시가 stable 리비전보다 2-3배 높다.

원인:

  • 카나리 리비전의 Pod 수가 적어 콜드 스타트가 자주 발생한다.
  • 카나리 트래픽이 적으면 autoscaler가 스케일 다운하고, 다음 요청에서 콜드 스타트가 발생하는 악순환이 반복된다.

해결: autoscaling.knative.dev/min-scale 어노테이션을 최소 1 이상으로 설정하여 항상 하나의 Pod이 warm 상태를 유지하게 한다.

문제 4: A/B 테스트 결과의 통계적 유의성 부족

증상: 2주간 A/B 테스트를 진행했지만 두 모델 간의 성능 차이가 통계적으로 유의하지 않다.

원인: 트래픽이 충분하지 않거나 효과 크기(effect size)가 작아서 필요한 샘플 수에 도달하지 못했다.

해결: 테스트 시작 전에 필요한 최소 샘플 수를 계산한다. 일반적인 공식은 다음과 같다.

  • 기대 효과 크기(delta), 유의 수준(alpha = 0.05), 검정력(power = 0.8)을 기반으로 샘플 수 산출
  • 일일 트래픽과 비교하여 최소 테스트 기간을 사전에 결정
  • 트래픽이 부족하면 카나리 비율을 높이거나 테스트 기간을 연장

운영 체크리스트

배포 전 (Pre-deployment)

  • 모델 등록 시 필수 메타데이터(학습 데이터 버전, 피처 스키마 해시, 코드 커밋 SHA) 검증 통과 여부 확인
  • 오프라인 평가 데이터셋에서의 성능이 기존 champion 대비 동등 이상인지 확인
  • 입력 스키마와 출력 스키마가 현재 프로덕션 API 계약과 호환되는지 확인
  • 추론 레이턴시 벤치마크가 SLA 내에 있는지 확인 (p99 기준)
  • 모델 artifact 크기와 메모리 요구사항 확인 (리소스 요청/제한 설정)

배포 중 (During deployment)

  • 카나리 트래픽 비율이 의도한 값으로 설정되었는지 확인
  • 새 리비전의 Pod이 정상 기동되었는지 확인 (Ready 상태)
  • Prometheus/Grafana 대시보드에서 에러율, 레이턴시, RPS 실시간 확인
  • 자동 롤백 모니터가 정상 동작 중인지 확인
  • 이전 리비전이 충분한 리소스를 유지하고 있는지 확인 (스케일 다운 방지)

배포 후 (Post-deployment)

  • A/B 테스트 결과를 MLflow 모델 버전 태그로 기록
  • champion alias를 새 모델로 전환하고, 이전 champion을 archived로 이동
  • 이전 리비전의 리소스를 점진적으로 회수 (즉시 삭제하지 않고 1-2일 유지)
  • 배포 결과를 팀 채널에 공유 (모델 버전, 성능 변화, 테스트 기간)
  • 이번 배포에서 개선할 점을 레트로 문서에 기록

실패 사례

사례 1: Alias 동기화 누락으로 인한 모델 불일치

한 팀에서 MLflow Model Registry의 @champion alias를 v5로 업데이트했지만, KServe InferenceService의 storageUri는 여전히 v4를 가리키고 있었다. MLflow 대시보드에서는 v5가 champion으로 표시되었고, 실제 서빙은 v4가 처리하는 상태가 2주간 지속되었다. 문제는 A/B 테스트 결과 분석에서 발견되었는데, "champion 모델"의 성능 수치가 MLflow에 기록된 v5의 오프라인 평가 결과와 일치하지 않았다.

교훈: MLflow alias 변경과 KServe 배포를 하나의 파이프라인으로 묶어야 한다. 두 작업 중 하나가 실패하면 전체를 롤백하는 트랜잭션 로직이 필요하다.

사례 2: 콜드 스타트로 인한 카나리 오판

새 모델을 5% 카나리로 배포한 후 자동 모니터링이 p99 레이턴시 초과를 감지하여 롤백을 실행했다. 원인을 분석해 보니 모델 성능 자체에는 문제가 없었고, 카나리 리비전의 낮은 트래픽으로 인해 autoscaler가 Pod을 0으로 스케일 다운한 뒤 요청이 들어올 때마다 콜드 스타트가 발생한 것이었다. 이 팀은 모델 로딩에 8초가 걸리는 대형 모델을 사용하고 있었고, 그 콜드 스타트 레이턴시가 p99를 끌어올려 오판을 유발했다.

교훈: 카나리 배포 시 min-scale을 최소 1로 설정하여 콜드 스타트를 방지한다. 또한 자동 롤백 판정에서 콜드 스타트 요청을 필터링하는 로직을 추가한다 (예: Pod 기동 후 30초 이내의 요청은 레이턴시 계산에서 제외).

사례 3: 통계적 유의성 없이 승격한 모델

A/B 테스트를 72시간 진행한 후 새 모델의 전환율이 0.3% 높은 것을 확인하고 champion으로 승격했다. 그런데 승격 후 2주가 지나자 전환율이 오히려 이전보다 낮아졌다. 원인을 분석해 보니 72시간 동안의 데이터로는 0.3%의 차이를 유의하게 검증하기에 샘플이 부족했고, 관측된 차이는 단순한 통계적 노이즈였다. 해당 서비스의 일일 요청 수와 기대 효과 크기를 고려하면 최소 10일 이상의 테스트 기간이 필요했다.

교훈: A/B 테스트 시작 전에 power analysis를 수행하여 필요한 최소 샘플 수와 테스트 기간을 산정한다. p-value 0.05와 검정력 0.8을 기준으로 최소 기간을 계산하고, 그 기간이 지나기 전에는 승격/롤백 판단을 하지 않는다.

사례 4: 모델 레지스트리 없이 운영하다 발생한 장애

초기 스타트업에서 모델 레지스트리 없이 S3 경로를 직접 KServe storageUri에 하드코딩하여 배포했다. 어느 날 한 엔지니어가 학습 파이프라인의 output 경로를 변경하면서 동일한 S3 경로에 새로운 (검증되지 않은) 모델 artifact가 덮어씌워졌다. KServe Pod이 재시작되면서 검증되지 않은 모델이 자동으로 로드되었고, 4시간 동안 비정상적인 추론 결과를 반환했다.

교훈: 모델 artifact의 불변성(immutability)은 모델 레지스트리의 가장 기본적인 기능이다. S3 경로 직접 참조 대신 레지스트리를 통한 간접 참조를 반드시 사용하고, 등록된 artifact는 덮어쓰기가 불가능하도록 S3 Object Lock이나 버전 관리를 활성화한다.

참고자료