Skip to content
Published on

Feature Store & MLOps 파이프라인 완전 가이드 2025: Feast, Feature Engineering, 모델 서빙

Authors

목차

1. 왜 Feature Store인가?

1.1 Training-Serving Skew 문제

ML 모델 개발에서 가장 빈번한 장애 원인은 학습 시점과 서빙 시점의 피처 불일치입니다.

데이터 과학자가 Jupyter Notebook에서 Pandas로 피처를 생성하고, 엔지니어가 Java/Go로 동일한 로직을 재구현할 때 미묘한 차이가 발생합니다.

# 학습 시: Pandas로 피처 생성
df['avg_purchase_7d'] = df.groupby('user_id')['amount'].transform(
    lambda x: x.rolling('7D').mean()
)

# 서빙 시: SQL 또는 다른 언어로 재구현 → 미묘한 차이 발생
# - 경계 조건(inclusive/exclusive) 차이
# - 타임존 처리 차이
# - NULL 처리 방식 차이

Feature Store는 단일 피처 정의를 학습과 서빙 양쪽에서 공유하여 이 문제를 근본적으로 해결합니다.

1.2 피처 재사용과 팀 협업

대규모 ML 조직에서는 수십 개 팀이 유사한 피처를 독립적으로 생성합니다.

문제Feature Store 도입 전Feature Store 도입 후
피처 중복팀마다 유사 피처 재생성중앙 레지스트리에서 검색/재사용
일관성팀마다 다른 계산 로직단일 정의, 버전 관리
신선도수동 업데이트자동 materialization
발견 가능성Slack/Wiki 의존피처 카탈로그 + 메타데이터
거버넌스없음소유자, 라인리지, 접근 제어

1.3 데이터 파이프라인 복잡성 감소

Feature Store 없이는 각 모델이 자체 데이터 파이프라인을 유지합니다.

모델 A: raw data → ETL A → features A → training A
모델 B: raw data → ETL B → features B → training B
모델 C: raw data → ETL C → features C → training C

Feature Store 도입 후:

raw data → Feature Store (중앙 집중) → 모델 A, B, C 공유

2. Feature Store 아키텍처

2.1 핵심 구성 요소

Feature Store는 4가지 핵심 구성 요소로 이루어집니다.

오프라인 스토어 (Offline Store)

  • 대량의 히스토리컬 피처 데이터 저장
  • 학습 데이터 생성에 사용
  • BigQuery, Snowflake, Redshift, Parquet 파일
  • Point-in-Time Join 지원

온라인 스토어 (Online Store)

  • 최신 피처 값의 저속 지연 조회
  • 실시간 추론에 사용
  • Redis, DynamoDB, Bigtable
  • P99 레이턴시 10ms 이하 목표

피처 레지스트리 (Feature Registry)

  • 모든 피처의 메타데이터 관리
  • 이름, 타입, 소유자, 설명, 태그
  • 데이터 라인리지 추적

변환 엔진 (Transformation Engine)

  • 원시 데이터에서 피처 생성
  • 배치/스트리밍 변환 지원
  • Spark, Flink, dbt 통합

2.2 데이터 흐름

[데이터 소스][변환 엔진][오프라인 스토어] ←→ [온라인 스토어]
      ↑                              ↓                    ↓
  원시 데이터                    학습 파이프라인        추론 서비스
                              [피처 레지스트리]
                              (메타데이터 관리)

2.3 주요 Feature Store 솔루션 비교

솔루션유형오프라인온라인스트리밍비용
FeastOSSRedshift/BQ/FileRedis/DynamoDBPush 기반무료(인프라 비용만)
Tecton관리형Spark + DeltaDynamoDBSpark Streaming구독형
HopsworksOSS/관리형HudiRonDBFlink커뮤니티 무료
Vertex AI FSGCP 관리형BigQueryBigtableDataflow사용량 기반
SageMaker FSAWS 관리형S3 + Glue내장 온라인Kinesis사용량 기반

3. Feast Deep Dive

3.1 Feast 설치 및 초기 설정

# Feast 설치
pip install feast

# 프로젝트 초기화
feast init my_feature_store
cd my_feature_store

# 프로젝트 구조
# my_feature_store/
#   feature_store.yaml    # 프로젝트 설정
#   features.py           # 피처 정의
#   data/                 # 샘플 데이터

feature_store.yaml 설정:

project: my_ml_project
registry: data/registry.db
provider: local  # local, gcp, aws
online_store:
  type: redis
  connection_string: "localhost:6379"
offline_store:
  type: file  # file, bigquery, redshift, snowflake
entity_key_serialization_version: 2

3.2 피처 정의

# features.py
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String

# 엔티티 정의
user = Entity(
    name="user_id",
    join_keys=["user_id"],
    description="사용자 고유 ID",
)

# 데이터 소스 정의
user_stats_source = FileSource(
    path="data/user_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

# 피처 뷰 정의
user_stats_fv = FeatureView(
    name="user_stats",
    entities=[user],
    ttl=timedelta(days=1),
    schema=[
        Field(name="total_purchases", dtype=Int64),
        Field(name="avg_purchase_amount", dtype=Float32),
        Field(name="days_since_last_login", dtype=Int64),
        Field(name="preferred_category", dtype=String),
    ],
    online=True,
    source=user_stats_source,
    tags={"team": "recommendation", "version": "v2"},
)

3.3 Materialization (오프라인 to 온라인)

# 피처 정의 적용
feast apply

# 오프라인 → 온라인 스토어로 피처 값 물리화
feast materialize 2024-01-01T00:00:00 2024-12-31T23:59:59

# 증분 물리화 (마지막 이후만)
feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")

3.4 학습 데이터 생성 (Point-in-Time Join)

Point-in-Time Join은 Feature Store의 핵심 기능입니다. 데이터 누수(Data Leakage)를 방지하면서 과거 특정 시점의 피처 값을 정확히 가져옵니다.

from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path=".")

# 학습 데이터의 엔티티 + 타임스탬프
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003, 1001],
    "event_timestamp": pd.to_datetime([
        "2024-09-01", "2024-09-02", "2024-09-03", "2024-10-01"
    ]),
})

# Point-in-Time Join으로 피처 가져오기
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_stats:total_purchases",
        "user_stats:avg_purchase_amount",
        "user_stats:days_since_last_login",
    ],
).to_df()

print(training_df)
# 각 행은 해당 시점 기준의 피처 값을 가짐
# → 2024-09-01 시점의 user 1001 피처 != 2024-10-01 시점의 user 1001 피처

3.5 온라인 서빙

# 실시간 추론을 위한 온라인 피처 조회
feature_vector = store.get_online_features(
    features=[
        "user_stats:total_purchases",
        "user_stats:avg_purchase_amount",
        "user_stats:days_since_last_login",
    ],
    entity_rows=[
        {"user_id": 1001},
        {"user_id": 1002},
    ],
).to_dict()

# 결과: 가장 최신 피처 값 반환 (P99 레이턴시 약 5-10ms)

3.6 GCP/AWS 프로덕션 설정

# GCP 프로덕션 설정
project: production_ml
registry: gs://my-bucket/feast-registry/registry.db
provider: gcp
online_store:
  type: datastore  # 또는 bigtable
  project_id: my-gcp-project
offline_store:
  type: bigquery
  project_id: my-gcp-project
  dataset: feast_features
# AWS 프로덕션 설정
project: production_ml
registry: s3://my-bucket/feast-registry/registry.db
provider: aws
online_store:
  type: dynamodb
  region: ap-northeast-2
offline_store:
  type: redshift
  cluster_id: my-redshift-cluster
  region: ap-northeast-2
  database: ml_features
  user: feast_user
  s3_staging_location: s3://my-bucket/feast-staging/

4. Feature Engineering 패턴

4.1 시간 기반 피처 (Temporal Features)

import pandas as pd

def create_temporal_features(df, timestamp_col, entity_col, value_col):
    """시간 윈도우 기반 집계 피처 생성"""
    df = df.sort_values(timestamp_col)

    features = pd.DataFrame()
    features[entity_col] = df[entity_col]
    features[timestamp_col] = df[timestamp_col]

    # 다양한 윈도우의 이동 평균
    for window in ['1D', '7D', '30D']:
        features[f'{value_col}_mean_{window}'] = (
            df.groupby(entity_col)[value_col]
            .transform(lambda x: x.rolling(window, on=df[timestamp_col]).mean())
        )

    # 추세 피처: 7일 평균 / 30일 평균
    features[f'{value_col}_trend_7d_30d'] = (
        features[f'{value_col}_mean_7D'] /
        features[f'{value_col}_mean_30D'].replace(0, float('nan'))
    )

    # 요일/시간 피처
    features['day_of_week'] = df[timestamp_col].dt.dayofweek
    features['hour_of_day'] = df[timestamp_col].dt.hour
    features['is_weekend'] = features['day_of_week'].isin([5, 6]).astype(int)

    return features

4.2 집계 피처 (Aggregation Features)

def create_aggregation_features(events_df, entity_col, group_col):
    """엔티티별 집계 피처"""
    agg_features = events_df.groupby(entity_col).agg(
        event_count=pd.NamedAgg(column=group_col, aggfunc='count'),
        unique_categories=pd.NamedAgg(column='category', aggfunc='nunique'),
        total_amount=pd.NamedAgg(column='amount', aggfunc='sum'),
        avg_amount=pd.NamedAgg(column='amount', aggfunc='mean'),
        max_amount=pd.NamedAgg(column='amount', aggfunc='max'),
        std_amount=pd.NamedAgg(column='amount', aggfunc='std'),
    ).reset_index()

    # 비율 피처
    agg_features['high_value_ratio'] = (
        events_df[events_df['amount'] > 100]
        .groupby(entity_col)
        .size()
        .reindex(agg_features[entity_col], fill_value=0)
        .values / agg_features['event_count']
    )

    return agg_features

4.3 임베딩 피처

from sentence_transformers import SentenceTransformer
import numpy as np

model = SentenceTransformer('all-MiniLM-L6-v2')

def create_text_embedding_features(df, text_col, prefix='emb'):
    """텍스트 임베딩 피처 생성"""
    embeddings = model.encode(df[text_col].fillna('').tolist())
    emb_df = pd.DataFrame(
        embeddings,
        columns=[f'{prefix}_{i}' for i in range(embeddings.shape[1])],
        index=df.index,
    )
    return pd.concat([df, emb_df], axis=1)

4.4 교차 피처 (Cross Features)

def create_cross_features(df):
    """피처 간 교차/상호작용 피처"""
    # 비율 피처
    df['purchase_to_visit_ratio'] = (
        df['purchase_count'] / df['visit_count'].replace(0, 1)
    )

    # 구간화 + 교차
    df['age_bucket'] = pd.cut(
        df['age'], bins=[0, 25, 35, 50, 100],
        labels=['young', 'middle', 'senior', 'elder']
    )
    df['age_x_gender'] = df['age_bucket'].astype(str) + '_' + df['gender']

    # 수치 상호작용
    df['income_x_age'] = df['income'] * df['age']

    return df

5. MLOps 성숙도 레벨

Google에서 정의한 MLOps 성숙도 모델은 4단계로 나뉩니다.

Level 0: 수동 프로세스

데이터 과학자가 수동으로:
1. 데이터 수집 및 전처리
2. Jupyter에서 모델 학습
3. 모델 파일을 엔지니어에게 전달
4. 엔지니어가 서빙 코드 작성
5. 수동 배포

문제점:
- 배포 주기: 수개월
- 재현성 없음
- 모니터링 없음

Level 1: ML 파이프라인 자동화

자동화된 ML 파이프라인:
1. 데이터 검증 → 피처 엔지니어링 → 학습 → 평가 → 배포
2. 파이프라인 오케스트레이터 사용 (Kubeflow, Airflow)
3. CT (Continuous Training): 트리거 기반 자동 재학습

개선점:
- 배포 주기: 주 단위
- 파이프라인 재현 가능
- 기본 모니터링

Level 2: CI/CD for ML

CI/CD 파이프라인 통합:
1. 코드 변경 → 자동 테스트 → 파이프라인 빌드 → 모델 학습
2. 모델 검증 게이트 (성능 임계값)
3. Shadow DeploymentCanaryFull Rollout
4. A/B 테스트 자동화

개선점:
- 배포 주기: 일 단위
- 자동 롤백
- 체계적 실험 관리

Level 3: 자동 재학습 + 완전 자동화

완전 자동화:
1. 드리프트 감지 → 자동 재학습 트리거
2. 자동 피처 선택/하이퍼파라미터 최적화
3. 모델 성능 자동 비교 + 챔피언/챌린저
4. 자동 스케일링 + 비용 최적화

개선점:
- 배포 주기: 시간 단위
- 무인 운영
- 프로액티브 모니터링

6. ML 파이프라인 오케스트레이션

6.1 Kubeflow Pipelines

# Kubeflow Pipelines DSL로 파이프라인 정의
from kfp import dsl, compiler
from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(
    base_image="python:3.11",
    packages_to_install=["pandas", "scikit-learn"],
)
def preprocess_data(
    raw_data: Input[Dataset],
    processed_data: Output[Dataset],
):
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    df = pd.read_parquet(raw_data.path)
    scaler = StandardScaler()
    df_scaled = pd.DataFrame(
        scaler.fit_transform(df.select_dtypes(include='number')),
        columns=df.select_dtypes(include='number').columns,
    )
    df_scaled.to_parquet(processed_data.path)


@dsl.component(
    base_image="python:3.11",
    packages_to_install=["scikit-learn", "pandas", "joblib"],
)
def train_model(
    training_data: Input[Dataset],
    model_artifact: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 100,
    max_depth: int = 10,
):
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import cross_val_score
    import joblib

    df = pd.read_parquet(training_data.path)
    X = df.drop('target', axis=1)
    y = df['target']

    clf = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42,
    )
    scores = cross_val_score(clf, X, y, cv=5, scoring='f1_macro')

    clf.fit(X, y)
    joblib.dump(clf, model_artifact.path)

    metrics.log_metric("f1_mean", float(scores.mean()))
    metrics.log_metric("f1_std", float(scores.std()))


@dsl.pipeline(name="ML Training Pipeline")
def ml_pipeline(n_estimators: int = 100, max_depth: int = 10):
    preprocess_task = preprocess_data(
        raw_data=dsl.importer(
            artifact_uri="gs://my-bucket/raw-data/",
            artifact_class=Dataset,
        ).output,
    )

    train_task = train_model(
        training_data=preprocess_task.outputs["processed_data"],
        n_estimators=n_estimators,
        max_depth=max_depth,
    )


# 컴파일 및 실행
compiler.Compiler().compile(ml_pipeline, "pipeline.yaml")

6.2 Airflow ML 파이프라인

# Airflow DAG으로 ML 파이프라인 정의
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'ml_training_pipeline',
    default_args=default_args,
    schedule_interval='@weekly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['ml', 'training'],
) as dag:

    validate_data = KubernetesPodOperator(
        task_id='validate_data',
        name='data-validation',
        image='ml-pipeline:latest',
        cmds=['python', 'validate.py'],
        namespace='ml-pipelines',
    )

    extract_features = KubernetesPodOperator(
        task_id='extract_features',
        name='feature-extraction',
        image='ml-pipeline:latest',
        cmds=['python', 'extract_features.py'],
        namespace='ml-pipelines',
    )

    train_model = KubernetesPodOperator(
        task_id='train_model',
        name='model-training',
        image='ml-pipeline:latest',
        cmds=['python', 'train.py'],
        namespace='ml-pipelines',
        resources={
            'request_memory': '8Gi',
            'request_cpu': '4',
            'limit_gpu': '1',
        },
    )

    evaluate_model = PythonOperator(
        task_id='evaluate_model',
        python_callable=lambda: print("Evaluating model..."),
    )

    validate_data >> extract_features >> train_model >> evaluate_model

6.3 파이프라인 솔루션 비교

기능KubeflowAirflowVertex AISageMaker
실행 환경Kubernetes다양GCP 관리형AWS 관리형
ML 특화높음중간높음높음
UI/시각화기본우수우수우수
확장성높음높음높음높음
학습 곡선높음중간낮음낮음
비용인프라만인프라만사용량사용량

7. 실험 추적 (Experiment Tracking)

7.1 MLflow 실험 추적

import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, precision_score, recall_score

# MLflow 서버 설정
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("recommendation-model-v2")

# 실험 실행
with mlflow.start_run(run_name="gbm-experiment-42") as run:
    # 하이퍼파라미터 로깅
    params = {
        "n_estimators": 200,
        "max_depth": 8,
        "learning_rate": 0.1,
        "subsample": 0.8,
    }
    mlflow.log_params(params)

    # 모델 학습
    model = GradientBoostingClassifier(**params)
    model.fit(X_train, y_train)

    # 메트릭 로깅
    y_pred = model.predict(X_test)
    metrics = {
        "f1": f1_score(y_test, y_pred, average='macro'),
        "precision": precision_score(y_test, y_pred, average='macro'),
        "recall": recall_score(y_test, y_pred, average='macro'),
    }
    mlflow.log_metrics(metrics)

    # 모델 아티팩트 로깅
    mlflow.sklearn.log_model(
        model,
        artifact_path="model",
        registered_model_name="recommendation-gbm",
    )

    # 커스텀 아티팩트 (피처 중요도 차트 등)
    mlflow.log_artifact("feature_importance.png")

    print(f"Run ID: {run.info.run_id}")
    print(f"Metrics: {metrics}")

7.2 Weights and Biases 통합

import wandb

wandb.init(
    project="recommendation-model",
    config={
        "architecture": "GBM",
        "n_estimators": 200,
        "learning_rate": 0.1,
    },
)

# 학습 루프에서 메트릭 로깅
for epoch in range(100):
    train_loss = train_one_epoch(model, train_loader)
    val_loss, val_f1 = evaluate(model, val_loader)

    wandb.log({
        "epoch": epoch,
        "train_loss": train_loss,
        "val_loss": val_loss,
        "val_f1": val_f1,
    })

# 모델 아티팩트 저장
artifact = wandb.Artifact('model-weights', type='model')
artifact.add_file('model.pt')
wandb.log_artifact(artifact)

wandb.finish()

8. 모델 레지스트리 (Model Registry)

8.1 MLflow Model Registry

from mlflow.tracking import MlflowClient

client = MlflowClient()

# 모델 버전 생성
model_version = client.create_model_version(
    name="recommendation-gbm",
    source=f"runs:/{run_id}/model",
    run_id=run_id,
    description="GBM 모델 v3: 신규 피처 추가",
)

# 스테이지 전환: None → Staging
client.transition_model_version_stage(
    name="recommendation-gbm",
    version=model_version.version,
    stage="Staging",
    archive_existing_versions=False,
)

# 스테이징 검증 후 프로덕션 승격
client.transition_model_version_stage(
    name="recommendation-gbm",
    version=model_version.version,
    stage="Production",
    archive_existing_versions=True,  # 기존 프로덕션 버전 아카이브
)

8.2 모델 버전 관리 전략

모델 라이프사이클:
NoneStagingProductionArchived

버전 관리 정책:
- Staging: 자동 성능 테스트 통과 시
- Production: 수동 승인 또는 A/B 테스트 통과 시
- Archived: 새 버전 프로덕션 승격 시 자동
- 최근 3개 버전 보관, 이전 버전 삭제
# 프로덕션 모델 로드
import mlflow.pyfunc

model = mlflow.pyfunc.load_model(
    model_uri="models:/recommendation-gbm/Production"
)
predictions = model.predict(input_df)

9. 모델 서빙

9.1 BentoML

# service.py
import bentoml
import numpy as np
from bentoml.io import NumpyNdarray, JSON

# 저장된 모델 로드
runner = bentoml.sklearn.get("recommendation_model:latest").to_runner()

svc = bentoml.Service("recommendation_service", runners=[runner])

@svc.api(input=JSON(), output=JSON())
async def predict(input_data: dict) -> dict:
    features = np.array(input_data["features"]).reshape(1, -1)
    prediction = await runner.predict.async_run(features)
    return {
        "prediction": int(prediction[0]),
        "model_version": "v3.2.1",
    }
# bentofile.yaml
service: "service:svc"
include:
  - "*.py"
python:
  packages:
    - scikit-learn
    - numpy
docker:
  python_version: "3.11"
# 빌드 및 배포
bentoml build
bentoml containerize recommendation_service:latest
docker run -p 3000:3000 recommendation_service:latest

9.2 Seldon Core (Kubernetes)

apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: recommendation-model
  namespace: ml-serving
spec:
  predictors:
    - name: default
      replicas: 3
      graph:
        name: classifier
        implementation: SKLEARN_SERVER
        modelUri: s3://models/recommendation/v3
        envSecretRefName: s3-credentials
      componentSpecs:
        - spec:
            containers:
              - name: classifier
                resources:
                  requests:
                    cpu: "1"
                    memory: "2Gi"
                  limits:
                    cpu: "2"
                    memory: "4Gi"
      traffic: 100
      labels:
        version: v3

9.3 TensorFlow Serving

# TFServing Docker 실행
docker run -p 8501:8501 \
  --mount type=bind,source=/models/recommendation,target=/models/recommendation \
  -e MODEL_NAME=recommendation \
  tensorflow/serving:latest
# gRPC 클라이언트
import grpc
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2, prediction_service_pb2_grpc

channel = grpc.insecure_channel('localhost:8500')
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)

request = predict_pb2.PredictRequest()
request.model_spec.name = 'recommendation'
request.model_spec.signature_name = 'serving_default'
request.inputs['input'].CopyFrom(
    tf.make_tensor_proto(input_data, shape=[1, 128])
)

response = stub.Predict(request, timeout=5.0)

9.4 vLLM (LLM 서빙)

# vLLM 서버 실행
# python -m vllm.entrypoints.openai.api_server \
#   --model meta-llama/Llama-3-8B-Instruct \
#   --tensor-parallel-size 2 \
#   --max-model-len 8192

# 클라이언트 호출
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:8000/v1",
    api_key="not-needed",
)

response = client.chat.completions.create(
    model="meta-llama/Llama-3-8B-Instruct",
    messages=[
        {"role": "user", "content": "추천 시스템의 장점은?"},
    ],
    temperature=0.7,
    max_tokens=512,
)

9.5 서빙 솔루션 비교

솔루션모델 유형배포 환경배치 추론GPU 지원자동 스케일링
BentoML범용Docker/K8s지원지원지원
Seldon Core범용K8s 전용지원지원HPA
TFServingTF 모델Docker/K8s지원지원수동
Triton멀티프레임워크Docker/K8s지원최적화지원
vLLMLLMDocker/K8s미지원필수지원

10. 모델 모니터링

10.1 데이터 드리프트 감지

from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset

# 기준 데이터 (학습 시점)
reference_data = pd.read_parquet("training_data.parquet")
# 현재 데이터 (프로덕션)
current_data = pd.read_parquet("production_data_latest.parquet")

column_mapping = ColumnMapping(
    target='label',
    numerical_features=['age', 'income', 'purchase_count'],
    categorical_features=['category', 'region'],
)

# 드리프트 리포트 생성
report = Report(metrics=[
    DataDriftPreset(),
    TargetDriftPreset(),
])

report.run(
    reference_data=reference_data,
    current_data=current_data,
    column_mapping=column_mapping,
)

# HTML 리포트 저장
report.save_html("drift_report.html")

# 프로그래밍적 접근
result = report.as_dict()
dataset_drift = result['metrics'][0]['result']['dataset_drift']
drift_share = result['metrics'][0]['result']['share_of_drifted_columns']

if dataset_drift:
    print(f"드리프트 감지! 드리프트 피처 비율: {drift_share:.2%}")
    # 재학습 파이프라인 트리거
    trigger_retraining_pipeline()

10.2 예측 드리프트 감지

from evidently.test_suite import TestSuite
from evidently.tests import (
    TestColumnDrift,
    TestShareOfDriftedColumns,
    TestMeanInNSigmas,
)

test_suite = TestSuite(tests=[
    TestShareOfDriftedColumns(lt=0.3),  # 드리프트 피처 30% 미만
    TestColumnDrift(column_name="prediction_score"),
    TestMeanInNSigmas(column_name="prediction_score", n=2),
])

test_suite.run(
    reference_data=reference_data,
    current_data=current_data,
    column_mapping=column_mapping,
)

# 테스트 결과 확인
test_results = test_suite.as_dict()
all_passed = all(
    t['status'] == 'SUCCESS' for t in test_results['tests']
)

if not all_passed:
    alert_team("모델 드리프트 감지 - 재학습 검토 필요")

10.3 WhyLabs 통합

import whylogs as why
from whylogs.api.writer.whylabs import WhyLabsWriter

# 프로파일 생성
results = why.log(current_data)
profile = results.profile()

# WhyLabs 전송
writer = WhyLabsWriter()
writer.write(profile)

# 커스텀 메트릭 모니터링
from whylogs.experimental.core.udf_schema import udf_schema

@udf_schema()
def custom_metrics(df):
    return {
        "avg_prediction_confidence": df['confidence'].mean(),
        "null_feature_ratio": df.isnull().sum().sum() / df.size,
        "prediction_distribution_skew": df['prediction'].skew(),
    }

11. A/B 테스트

11.1 트래픽 분할

# Istio VirtualService로 트래픽 분할
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: recommendation-ab-test
spec:
  hosts:
    - recommendation-service
  http:
    - match:
        - headers:
            x-experiment-group:
              exact: "treatment"
      route:
        - destination:
            host: recommendation-service
            subset: v2-challenger
          weight: 100
    - route:
        - destination:
            host: recommendation-service
            subset: v1-champion
          weight: 80
        - destination:
            host: recommendation-service
            subset: v2-challenger
          weight: 20

11.2 통계적 유의성 검증

from scipy import stats
import numpy as np

def ab_test_significance(
    control_conversions, control_total,
    treatment_conversions, treatment_total,
    alpha=0.05,
):
    """A/B 테스트 통계적 유의성 검증"""
    control_rate = control_conversions / control_total
    treatment_rate = treatment_conversions / treatment_total

    # Z-test for proportions
    pooled_rate = (
        (control_conversions + treatment_conversions) /
        (control_total + treatment_total)
    )
    se = np.sqrt(
        pooled_rate * (1 - pooled_rate) *
        (1/control_total + 1/treatment_total)
    )
    z_stat = (treatment_rate - control_rate) / se
    p_value = 2 * (1 - stats.norm.cdf(abs(z_stat)))

    lift = (treatment_rate - control_rate) / control_rate

    return {
        "control_rate": control_rate,
        "treatment_rate": treatment_rate,
        "lift": f"{lift:.2%}",
        "p_value": p_value,
        "significant": p_value < alpha,
        "recommendation": (
            "챌린저 모델 승격" if p_value < alpha and lift > 0
            else "기존 모델 유지"
        ),
    }

11.3 Multi-Armed Bandit

import numpy as np

class ThompsonSampling:
    """Thompson Sampling을 이용한 적응적 트래픽 분할"""

    def __init__(self, n_arms):
        self.n_arms = n_arms
        self.successes = np.ones(n_arms)  # Beta prior alpha
        self.failures = np.ones(n_arms)   # Beta prior beta

    def select_arm(self):
        """Beta 분포에서 샘플링하여 최적 arm 선택"""
        samples = [
            np.random.beta(self.successes[i], self.failures[i])
            for i in range(self.n_arms)
        ]
        return int(np.argmax(samples))

    def update(self, arm, reward):
        """결과 업데이트"""
        if reward:
            self.successes[arm] += 1
        else:
            self.failures[arm] += 1

    def get_allocation(self):
        """현재 트래픽 할당 비율"""
        total = self.successes + self.failures
        rates = self.successes / total
        return rates / rates.sum()


# 사용 예시
bandit = ThompsonSampling(n_arms=3)  # 3개 모델 버전

for request in incoming_requests:
    arm = bandit.select_arm()
    prediction = models[arm].predict(request)
    reward = get_conversion(request, prediction)
    bandit.update(arm, reward)

12. CI/CD for ML

12.1 모델 검증 게이트

# model_validation.py
import mlflow
import json

def validate_model(
    model_uri: str,
    test_data_path: str,
    min_f1: float = 0.85,
    max_latency_ms: float = 50.0,
    max_model_size_mb: float = 500.0,
):
    """모델 배포 전 검증 게이트"""
    results = {"passed": True, "checks": []}

    # 1. 성능 검증
    model = mlflow.pyfunc.load_model(model_uri)
    test_data = pd.read_parquet(test_data_path)
    predictions = model.predict(test_data.drop('target', axis=1))
    f1 = f1_score(test_data['target'], predictions, average='macro')

    results["checks"].append({
        "name": "performance",
        "metric": "f1_score",
        "value": f1,
        "threshold": min_f1,
        "passed": f1 >= min_f1,
    })

    # 2. 레이턴시 검증
    import time
    latencies = []
    for i in range(100):
        start = time.time()
        model.predict(test_data.iloc[[i]])
        latencies.append((time.time() - start) * 1000)

    p99_latency = np.percentile(latencies, 99)
    results["checks"].append({
        "name": "latency",
        "metric": "p99_ms",
        "value": p99_latency,
        "threshold": max_latency_ms,
        "passed": p99_latency <= max_latency_ms,
    })

    # 3. 모델 크기 검증
    import os
    model_size_mb = os.path.getsize(model_uri) / (1024 * 1024)
    results["checks"].append({
        "name": "model_size",
        "metric": "size_mb",
        "value": model_size_mb,
        "threshold": max_model_size_mb,
        "passed": model_size_mb <= max_model_size_mb,
    })

    results["passed"] = all(c["passed"] for c in results["checks"])
    return results

12.2 GitHub Actions ML CI/CD

name: ML CI/CD Pipeline
on:
  push:
    branches: [main]
    paths:
      - 'models/**'
      - 'features/**'

jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Validate Data Schema
        run: python scripts/validate_data.py

  train-and-evaluate:
    needs: data-validation
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v4
      - name: Train Model
        run: python train.py --config configs/production.yaml
      - name: Evaluate Model
        run: python evaluate.py --model-path outputs/model
      - name: Upload Metrics
        run: python upload_metrics.py

  model-validation:
    needs: train-and-evaluate
    runs-on: ubuntu-latest
    steps:
      - name: Run Validation Gates
        run: python model_validation.py
      - name: Performance Regression Check
        run: python check_regression.py

  deploy-staging:
    needs: model-validation
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Staging
        run: |
          kubectl apply -f k8s/staging/
          kubectl rollout status deployment/model-serving -n staging

  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - name: Shadow Deployment
        run: kubectl apply -f k8s/production/shadow.yaml
      - name: Canary Rollout
        run: |
          kubectl apply -f k8s/production/canary-10.yaml
          sleep 300
          python check_canary_metrics.py
          kubectl apply -f k8s/production/canary-50.yaml
          sleep 300
          python check_canary_metrics.py
          kubectl apply -f k8s/production/full-rollout.yaml

13. 비용 최적화

13.1 배치 vs 실시간 추론

# 배치 추론: 비용 효율적 (대량 처리)
def batch_inference(model, data_path, output_path):
    """Spark를 이용한 배치 추론"""
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("batch-inference").getOrCreate()
    df = spark.read.parquet(data_path)

    # UDF로 모델 적용
    predict_udf = spark.udf.register(
        "predict",
        lambda features: float(model.predict([features])[0]),
    )
    result = df.withColumn("prediction", predict_udf(df["features"]))
    result.write.parquet(output_path)

# 실시간 추론: 지연 시간 중요 (개별 요청)
# → 온라인 서빙 인프라 필요 (비용 높음)

13.2 예측 캐싱

import redis
import hashlib
import json

class PredictionCache:
    def __init__(self, redis_url="redis://localhost:6379", ttl=3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl
        self.hit_count = 0
        self.miss_count = 0

    def _make_key(self, features: dict) -> str:
        feature_str = json.dumps(features, sort_keys=True)
        return f"pred:{hashlib.md5(feature_str.encode()).hexdigest()}"

    def get_or_predict(self, features: dict, model) -> dict:
        key = self._make_key(features)
        cached = self.redis.get(key)

        if cached:
            self.hit_count += 1
            return json.loads(cached)

        self.miss_count += 1
        prediction = model.predict(features)
        self.redis.setex(key, self.ttl, json.dumps(prediction))
        return prediction

    @property
    def hit_rate(self):
        total = self.hit_count + self.miss_count
        return self.hit_count / total if total > 0 else 0

13.3 모델 압축

# 양자화 (Quantization)
import torch

model = torch.load("model.pt")
quantized_model = torch.quantization.quantize_dynamic(
    model,
    {torch.nn.Linear},
    dtype=torch.qint8,
)

# 크기 비교
original_size = os.path.getsize("model.pt") / (1024 * 1024)
torch.save(quantized_model.state_dict(), "model_quantized.pt")
quantized_size = os.path.getsize("model_quantized.pt") / (1024 * 1024)

print(f"원본: {original_size:.1f}MB → 양자화: {quantized_size:.1f}MB")
print(f"압축률: {(1 - quantized_size/original_size):.1%}")

13.4 비용 최적화 체크리스트

전략예상 절감트레이드오프
배치 추론 전환50-80%실시간성 포기
예측 캐싱30-60%캐시 신선도
모델 양자화40-60%미세 정확도 손실
Spot/Preemptible 인스턴스60-90%가용성 불안정
자동 스케일링20-40%콜드 스타트
모델 증류50-70%개발 비용

14. 퀴즈

Q1: Feature Store의 핵심 가치는 무엇인가?

A: Feature Store의 핵심 가치는 Training-Serving Skew 방지입니다. 학습 시점과 서빙 시점에서 동일한 피처 정의와 변환 로직을 공유하여 데이터 불일치를 근본적으로 제거합니다. 추가로 피처 재사용, 팀 협업, 피처 거버넌스도 중요한 가치입니다.

Q2: Point-in-Time Join이 필요한 이유는?

A: Point-in-Time Join은 데이터 누수(Data Leakage)를 방지합니다. 학습 데이터를 생성할 때, 각 이벤트 시점에서 실제로 사용 가능했던 피처 값만 조인해야 합니다. 미래 데이터가 포함되면 모델이 학습 시 비현실적으로 높은 성능을 보이지만, 프로덕션에서는 성능이 크게 저하됩니다.

Q3: MLOps Level 2에서 Level 3으로 넘어가기 위한 핵심 요소는?

A: Level 2(CI/CD for ML)에서 Level 3(자동 재학습)으로 넘어가려면 자동 드리프트 감지 및 재학습 트리거 시스템이 필요합니다. 데이터 드리프트, 예측 드리프트를 실시간으로 모니터링하고, 임계값 초과 시 자동으로 재학습 파이프라인을 실행하며, 챔피언/챌린저 비교를 통해 자동 승격합니다.

Q4: BentoML과 Seldon Core의 주요 차이점은?

A: BentoML은 프레임워크 독립적인 모델 패키징 도구로, Docker 컨테이너를 생성하여 어디서든 배포할 수 있습니다. Seldon Core는 Kubernetes 네이티브 서빙 플랫폼으로, CRD 기반의 배포, A/B 테스트, Canary 배포, 설명 가능성(Explainer) 등 K8s 생태계와 깊이 통합됩니다.

Q5: 모델 드리프트를 감지하는 주요 방법 3가지는?

A: (1) 데이터 드리프트: 입력 피처 분포의 변화를 KS 검정, PSI(Population Stability Index) 등으로 감지. (2) 예측 드리프트: 모델 출력 분포의 변화를 모니터링. (3) 성능 드리프트: 실제 레이블과 비교하여 정확도, F1 등의 메트릭 저하를 추적. Evidently AI와 WhyLabs가 대표적인 도구입니다.


15. 참고 자료

  1. Feast 공식 문서 - https://docs.feast.dev/
  2. MLflow 공식 문서 - https://mlflow.org/docs/latest/index.html
  3. Google MLOps Maturity Model - https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning
  4. Evidently AI 문서 - https://docs.evidentlyai.com/
  5. BentoML 공식 문서 - https://docs.bentoml.com/
  6. Seldon Core 문서 - https://docs.seldon.io/
  7. Kubeflow Pipelines - https://www.kubeflow.org/docs/components/pipelines/
  8. WhyLabs 문서 - https://docs.whylabs.ai/
  9. vLLM 프로젝트 - https://docs.vllm.ai/
  10. Weights and Biases - https://docs.wandb.ai/
  11. Tecton Feature Store - https://docs.tecton.ai/
  12. Hopsworks Feature Store - https://docs.hopsworks.ai/
  13. NVIDIA Triton Inference Server - https://docs.nvidia.com/deeplearning/triton-inference-server/