Feature Store & MLOps Pipeline Complete Guide 2025: Feast, Feature Engineering, Model Serving


1. Why a Feature Store?

1.1 The Training-Serving Skew Problem

The most frequent cause of ML production failures is feature inconsistency between training time and serving time.

Subtle differences creep in when a data scientist builds features with Pandas in a Jupyter Notebook and an engineer re-implements the same logic in Java or Go.

# Training time: feature built with Pandas
# (assumes df is indexed by a DatetimeIndex)
df['avg_purchase_7d'] = df.groupby('user_id')['amount'].transform(
    lambda x: x.rolling('7D').mean()
)

# Serving time: re-implemented in SQL or another language → subtle differences
# - Boundary conditions (inclusive vs. exclusive)
# - Timezone handling
# - NULL handling

A Feature Store fixes this problem at the root by sharing a single feature definition between training and serving.

1.2 Feature Reuse and Team Collaboration

In large ML organizations, dozens of teams independently build similar features.

Problem             | Before Feature Store                 | After Feature Store
Feature duplication | Each team rebuilds similar features  | Search/reuse from a central registry
Consistency         | Different computation logic per team | Single definition, version controlled
Freshness           | Manual updates                       | Automated materialization
Discoverability     | Relies on Slack/Wiki                 | Feature catalog + metadata
Governance          | None                                 | Owners, lineage, access control

1.3 Reducing Data Pipeline Complexity

Without a Feature Store, every model maintains its own data pipeline.

Model A: raw data → ETL A → features A → training A
Model B: raw data → ETL B → features B → training B
Model C: raw data → ETL C → features C → training C

After adopting a Feature Store:

raw data → Feature Store (centralized) → shared by models A, B, C

2. Feature Store Architecture

2.1 Core Components

A Feature Store consists of four core components.

Offline Store

  • Stores large volumes of historical feature data
  • Used to generate training datasets
  • BigQuery, Snowflake, Redshift, Parquet files
  • Supports point-in-time joins

Online Store

  • Low-latency lookup of the latest feature values
  • Used for real-time inference
  • Redis, DynamoDB, Bigtable
  • Target P99 latency under 10ms

Feature Registry

  • Manages metadata for every feature
  • Name, type, owner, description, tags
  • Tracks data lineage

Transformation Engine

  • Builds features from raw data
  • Supports batch and streaming transformations
  • Integrates with Spark, Flink, dbt

2.2 Data Flow

[Data Sources] → [Transformation Engine] → [Offline Store] ←→ [Online Store]
      ↑                                          ↓                   ↓
  raw data                              training pipelines    inference services
                                        [Feature Registry]
                                        (metadata management)

2.3 Major Feature Store Solutions Compared

Solution     | Type        | Offline          | Online          | Streaming       | Cost
Feast        | OSS         | Redshift/BQ/File | Redis/DynamoDB  | Push-based      | Free (infra cost only)
Tecton       | Managed     | Spark + Delta    | DynamoDB        | Spark Streaming | Subscription
Hopsworks    | OSS/Managed | Hudi             | RonDB           | Flink           | Community edition free
Vertex AI FS | GCP managed | BigQuery         | Bigtable        | Dataflow        | Usage-based
SageMaker FS | AWS managed | S3 + Glue        | Built-in online | Kinesis         | Usage-based

3. Feast Deep Dive

3.1 Installing and Initializing Feast

# Install Feast
pip install feast

# Initialize a project
feast init my_feature_store
cd my_feature_store

# Project layout
# my_feature_store/
#   feature_store.yaml    # project configuration
#   features.py           # feature definitions
#   data/                 # sample data

feature_store.yaml configuration:

project: my_ml_project
registry: data/registry.db
provider: local  # local, gcp, aws
online_store:
  type: redis
  connection_string: "localhost:6379"
offline_store:
  type: file  # file, bigquery, redshift, snowflake
entity_key_serialization_version: 2

3.2 Defining Features

# features.py
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String

# Define the entity
user = Entity(
    name="user_id",
    join_keys=["user_id"],
    description="Unique user ID",
)

# Define the data source
user_stats_source = FileSource(
    path="data/user_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

# Define the feature view
user_stats_fv = FeatureView(
    name="user_stats",
    entities=[user],
    ttl=timedelta(days=1),
    schema=[
        Field(name="total_purchases", dtype=Int64),
        Field(name="avg_purchase_amount", dtype=Float32),
        Field(name="days_since_last_login", dtype=Int64),
        Field(name="preferred_category", dtype=String),
    ],
    online=True,
    source=user_stats_source,
    tags={"team": "recommendation", "version": "v2"},
)

3.3 Materialization (Offline to Online)

# Apply the feature definitions
feast apply

# Materialize feature values from the offline store to the online store
feast materialize 2024-01-01T00:00:00 2024-12-31T23:59:59

# Incremental materialization (only data since the last run)
feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")

3.4 Generating Training Data (Point-in-Time Join)

The point-in-time join is the Feature Store's signature capability: it retrieves exactly the feature values that were valid at a specific past moment, preventing data leakage.

from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path=".")

# Entities + timestamps for the training set
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003, 1001],
    "event_timestamp": pd.to_datetime([
        "2024-09-01", "2024-09-02", "2024-09-03", "2024-10-01"
    ]),
})

# Fetch features with a point-in-time join
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_stats:total_purchases",
        "user_stats:avg_purchase_amount",
        "user_stats:days_since_last_login",
    ],
).to_df()

print(training_df)
# Each row carries the feature values as of that row's timestamp
# → user 1001's features as of 2024-09-01 != user 1001's features as of 2024-10-01
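For intuition, the same point-in-time semantics can be sketched in plain pandas with `merge_asof`, which joins each label row to the most recent feature row at or before its timestamp. This is a simplified illustration with made-up data; Feast additionally enforces the feature view's TTL.

```python
import pandas as pd

# Feature snapshots with their effective timestamps
features = pd.DataFrame({
    "user_id": [1001, 1001],
    "event_timestamp": pd.to_datetime(["2024-08-15", "2024-09-20"]),
    "total_purchases": [5, 9],
})

# Label rows: entity + observation timestamp
entity_df = pd.DataFrame({
    "user_id": [1001, 1001],
    "event_timestamp": pd.to_datetime(["2024-09-01", "2024-10-01"]),
})

# As-of join: each row gets the latest feature value known at its timestamp
training_df = pd.merge_asof(
    entity_df.sort_values("event_timestamp"),
    features.sort_values("event_timestamp"),
    on="event_timestamp",
    by="user_id",
)

print(training_df["total_purchases"].tolist())  # [5, 9]
```

The 2024-09-01 row only sees the 2024-08-15 snapshot, so the 2024-09-20 value never leaks into it.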

3.5 Online Serving

# Fetch online features for real-time inference
feature_vector = store.get_online_features(
    features=[
        "user_stats:total_purchases",
        "user_stats:avg_purchase_amount",
        "user_stats:days_since_last_login",
    ],
    entity_rows=[
        {"user_id": 1001},
        {"user_id": 1002},
    ],
).to_dict()

# Result: returns the latest feature values (P99 latency around 5-10ms)

3.6 GCP/AWS Production Configuration

# GCP production configuration
project: production_ml
registry: gs://my-bucket/feast-registry/registry.db
provider: gcp
online_store:
  type: datastore  # or bigtable
  project_id: my-gcp-project
offline_store:
  type: bigquery
  project_id: my-gcp-project
  dataset: feast_features
# AWS production configuration
project: production_ml
registry: s3://my-bucket/feast-registry/registry.db
provider: aws
online_store:
  type: dynamodb
  region: ap-northeast-2
offline_store:
  type: redshift
  cluster_id: my-redshift-cluster
  region: ap-northeast-2
  database: ml_features
  user: feast_user
  s3_staging_location: s3://my-bucket/feast-staging/

4. Feature Engineering Patterns

4.1 Temporal Features

import pandas as pd

def create_temporal_features(df, timestamp_col, entity_col, value_col):
    """Create time-window aggregation features"""
    # Time-based rolling windows ('1D', '7D', ...) require a DatetimeIndex
    df = df.sort_values(timestamp_col).set_index(timestamp_col)

    features = pd.DataFrame(index=df.index)
    features[entity_col] = df[entity_col]

    # Rolling means over several window sizes
    for window in ['1D', '7D', '30D']:
        features[f'{value_col}_mean_{window}'] = (
            df.groupby(entity_col)[value_col]
            .transform(lambda x: x.rolling(window).mean())
        )

    # Trend feature: 7-day mean / 30-day mean
    features[f'{value_col}_trend_7d_30d'] = (
        features[f'{value_col}_mean_7D'] /
        features[f'{value_col}_mean_30D'].replace(0, float('nan'))
    )

    # Day-of-week / hour-of-day features
    features['day_of_week'] = features.index.dayofweek
    features['hour_of_day'] = features.index.hour
    features['is_weekend'] = features['day_of_week'].isin([5, 6]).astype(int)

    return features.reset_index()

4.2 Aggregation Features

def create_aggregation_features(events_df, entity_col, group_col):
    """Per-entity aggregation features"""
    agg_features = events_df.groupby(entity_col).agg(
        event_count=pd.NamedAgg(column=group_col, aggfunc='count'),
        unique_categories=pd.NamedAgg(column='category', aggfunc='nunique'),
        total_amount=pd.NamedAgg(column='amount', aggfunc='sum'),
        avg_amount=pd.NamedAgg(column='amount', aggfunc='mean'),
        max_amount=pd.NamedAgg(column='amount', aggfunc='max'),
        std_amount=pd.NamedAgg(column='amount', aggfunc='std'),
    ).reset_index()

    # Ratio feature: share of high-value events per entity
    agg_features['high_value_ratio'] = (
        events_df[events_df['amount'] > 100]
        .groupby(entity_col)
        .size()
        .reindex(agg_features[entity_col], fill_value=0)
        .values / agg_features['event_count']
    )

    return agg_features

4.3 Embedding Features

from sentence_transformers import SentenceTransformer
import numpy as np
import pandas as pd

model = SentenceTransformer('all-MiniLM-L6-v2')

def create_text_embedding_features(df, text_col, prefix='emb'):
    """Create text embedding features"""
    embeddings = model.encode(df[text_col].fillna('').tolist())
    emb_df = pd.DataFrame(
        embeddings,
        columns=[f'{prefix}_{i}' for i in range(embeddings.shape[1])],
        index=df.index,
    )
    return pd.concat([df, emb_df], axis=1)

4.4 Cross Features

def create_cross_features(df):
    """Cross/interaction features between existing features"""
    # Ratio feature
    df['purchase_to_visit_ratio'] = (
        df['purchase_count'] / df['visit_count'].replace(0, 1)
    )

    # Bucketing + crossing
    df['age_bucket'] = pd.cut(
        df['age'], bins=[0, 25, 35, 50, 100],
        labels=['young', 'middle', 'senior', 'elder']
    )
    df['age_x_gender'] = df['age_bucket'].astype(str) + '_' + df['gender']

    # Numeric interaction
    df['income_x_age'] = df['income'] * df['age']

    return df

5. MLOps Maturity Levels

Google's MLOps maturity model defines four levels.

Level 0: Manual Process

The data scientist manually:
1. Collects and preprocesses data
2. Trains the model in Jupyter
3. Hands the model file to an engineer
4. The engineer writes serving code
5. Manual deployment

Problems:
- Deployment cycle: months
- No reproducibility
- No monitoring

Level 1: ML Pipeline Automation

Automated ML pipeline:
1. Data validation → feature engineering → training → evaluation → deployment
2. Pipeline orchestrator (Kubeflow, Airflow)
3. CT (Continuous Training): trigger-based automatic retraining

Improvements:
- Deployment cycle: weekly
- Reproducible pipelines
- Basic monitoring

Level 2: CI/CD for ML

Integrated CI/CD pipeline:
1. Code change → automated tests → pipeline build → model training
2. Model validation gates (performance thresholds)
3. Shadow deployment → canary → full rollout
4. Automated A/B testing

Improvements:
- Deployment cycle: daily
- Automatic rollback
- Systematic experiment management

Level 3: Automatic Retraining + Full Automation

Full automation:
1. Drift detection → automatic retraining trigger
2. Automated feature selection / hyperparameter optimization
3. Automatic performance comparison + champion/challenger
4. Auto-scaling + cost optimization

Improvements:
- Deployment cycle: hourly
- Unattended operation
- Proactive monitoring
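The champion/challenger step above reduces to a simple promotion gate; a minimal sketch, where the metric and margin are illustrative assumptions rather than a fixed standard:

```python
def should_promote(champion_f1: float, challenger_f1: float,
                   min_margin: float = 0.01) -> bool:
    """Promote the challenger only if it beats the serving champion
    by a margin, guarding against noise-level 'improvements'."""
    return challenger_f1 >= champion_f1 + min_margin

# The retraining pipeline compares the freshly trained challenger
# against the current champion before shifting any traffic.
print(should_promote(0.85, 0.87))   # True: clears the margin
print(should_promote(0.85, 0.855))  # False: within noise
```

In practice the gate would also check latency and fairness metrics, but the decision structure is the same.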

6. ML Pipeline Orchestration

6.1 Kubeflow Pipelines

# Define the pipeline with the Kubeflow Pipelines DSL
from kfp import dsl, compiler
from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(
    base_image="python:3.11",
    packages_to_install=["pandas", "scikit-learn"],
)
def preprocess_data(
    raw_data: Input[Dataset],
    processed_data: Output[Dataset],
):
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    df = pd.read_parquet(raw_data.path)
    scaler = StandardScaler()
    df_scaled = pd.DataFrame(
        scaler.fit_transform(df.select_dtypes(include='number')),
        columns=df.select_dtypes(include='number').columns,
    )
    df_scaled.to_parquet(processed_data.path)


@dsl.component(
    base_image="python:3.11",
    packages_to_install=["scikit-learn", "pandas", "joblib"],
)
def train_model(
    training_data: Input[Dataset],
    model_artifact: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 100,
    max_depth: int = 10,
):
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import cross_val_score
    import joblib

    df = pd.read_parquet(training_data.path)
    X = df.drop('target', axis=1)
    y = df['target']

    clf = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42,
    )
    scores = cross_val_score(clf, X, y, cv=5, scoring='f1_macro')

    clf.fit(X, y)
    joblib.dump(clf, model_artifact.path)

    metrics.log_metric("f1_mean", float(scores.mean()))
    metrics.log_metric("f1_std", float(scores.std()))


@dsl.pipeline(name="ML Training Pipeline")
def ml_pipeline(n_estimators: int = 100, max_depth: int = 10):
    preprocess_task = preprocess_data(
        raw_data=dsl.importer(
            artifact_uri="gs://my-bucket/raw-data/",
            artifact_class=Dataset,
        ).output,
    )

    train_task = train_model(
        training_data=preprocess_task.outputs["processed_data"],
        n_estimators=n_estimators,
        max_depth=max_depth,
    )


# Compile and run
compiler.Compiler().compile(ml_pipeline, "pipeline.yaml")

6.2 Airflow ML Pipeline

# Define the ML pipeline as an Airflow DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'ml_training_pipeline',
    default_args=default_args,
    schedule_interval='@weekly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['ml', 'training'],
) as dag:

    validate_data = KubernetesPodOperator(
        task_id='validate_data',
        name='data-validation',
        image='ml-pipeline:latest',
        cmds=['python', 'validate.py'],
        namespace='ml-pipelines',
    )

    extract_features = KubernetesPodOperator(
        task_id='extract_features',
        name='feature-extraction',
        image='ml-pipeline:latest',
        cmds=['python', 'extract_features.py'],
        namespace='ml-pipelines',
    )

    train_model = KubernetesPodOperator(
        task_id='train_model',
        name='model-training',
        image='ml-pipeline:latest',
        cmds=['python', 'train.py'],
        namespace='ml-pipelines',
        resources={
            'request_memory': '8Gi',
            'request_cpu': '4',
            'limit_gpu': '1',
        },
    )

    evaluate_model = PythonOperator(
        task_id='evaluate_model',
        python_callable=lambda: print("Evaluating model..."),
    )

    validate_data >> extract_features >> train_model >> evaluate_model

6.3 Pipeline Solution Comparison

Capability       | Kubeflow   | Airflow    | Vertex AI   | SageMaker
Runtime          | Kubernetes | Varied     | GCP managed | AWS managed
ML focus         | High       | Medium     | High        | High
UI/visualization | Basic      | Good       | Good        | Good
Scalability      | High       | High       | High        | High
Learning curve   | High       | Medium     | Low         | Low
Cost             | Infra only | Infra only | Usage-based | Usage-based

7. Experiment Tracking

7.1 MLflow Experiment Tracking

import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, precision_score, recall_score

# Point at the MLflow tracking server
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("recommendation-model-v2")

# Run an experiment (X_train, X_test, y_train, y_test assumed to be defined)
with mlflow.start_run(run_name="gbm-experiment-42") as run:
    # Log hyperparameters
    params = {
        "n_estimators": 200,
        "max_depth": 8,
        "learning_rate": 0.1,
        "subsample": 0.8,
    }
    mlflow.log_params(params)

    # Train the model
    model = GradientBoostingClassifier(**params)
    model.fit(X_train, y_train)

    # Log metrics
    y_pred = model.predict(X_test)
    metrics = {
        "f1": f1_score(y_test, y_pred, average='macro'),
        "precision": precision_score(y_test, y_pred, average='macro'),
        "recall": recall_score(y_test, y_pred, average='macro'),
    }
    mlflow.log_metrics(metrics)

    # Log the model artifact
    mlflow.sklearn.log_model(
        model,
        artifact_path="model",
        registered_model_name="recommendation-gbm",
    )

    # Custom artifacts (feature importance chart, etc.)
    mlflow.log_artifact("feature_importance.png")

    print(f"Run ID: {run.info.run_id}")
    print(f"Metrics: {metrics}")

7.2 Weights and Biases Integration

import wandb

wandb.init(
    project="recommendation-model",
    config={
        "architecture": "GBM",
        "n_estimators": 200,
        "learning_rate": 0.1,
    },
)

# Log metrics from the training loop (train_one_epoch/evaluate assumed to be defined)
for epoch in range(100):
    train_loss = train_one_epoch(model, train_loader)
    val_loss, val_f1 = evaluate(model, val_loader)

    wandb.log({
        "epoch": epoch,
        "train_loss": train_loss,
        "val_loss": val_loss,
        "val_f1": val_f1,
    })

# Save the model artifact
artifact = wandb.Artifact('model-weights', type='model')
artifact.add_file('model.pt')
wandb.log_artifact(artifact)

wandb.finish()

8. Model Registry

8.1 MLflow Model Registry

from mlflow.tracking import MlflowClient

client = MlflowClient()

# Create a model version
model_version = client.create_model_version(
    name="recommendation-gbm",
    source=f"runs:/{run_id}/model",
    run_id=run_id,
    description="GBM model v3: new features added",
)

# Stage transition: None → Staging
client.transition_model_version_stage(
    name="recommendation-gbm",
    version=model_version.version,
    stage="Staging",
    archive_existing_versions=False,
)

# Promote to Production after validating in Staging
client.transition_model_version_stage(
    name="recommendation-gbm",
    version=model_version.version,
    stage="Production",
    archive_existing_versions=True,  # archive the existing production version
)

8.2 Model Versioning Strategy

Model lifecycle:
None → Staging → Production → Archived

Versioning policy:
- Staging: after passing automated performance tests
- Production: after manual approval or a winning A/B test
- Archived: automatic when a new version is promoted to production
- Keep the last 3 versions, delete older ones
# Load the production model
import mlflow.pyfunc

model = mlflow.pyfunc.load_model(
    model_uri="models:/recommendation-gbm/Production"
)
predictions = model.predict(input_df)  # input_df: feature rows to score

9. Model Serving

9.1 BentoML

# service.py
import bentoml
import numpy as np
from bentoml.io import JSON

# Load the saved model
runner = bentoml.sklearn.get("recommendation_model:latest").to_runner()

svc = bentoml.Service("recommendation_service", runners=[runner])

@svc.api(input=JSON(), output=JSON())
async def predict(input_data: dict) -> dict:
    features = np.array(input_data["features"]).reshape(1, -1)
    prediction = await runner.predict.async_run(features)
    return {
        "prediction": int(prediction[0]),
        "model_version": "v3.2.1",
    }
# bentofile.yaml
service: "service:svc"
include:
  - "*.py"
python:
  packages:
    - scikit-learn
    - numpy
docker:
  python_version: "3.11"
# Build and deploy
bentoml build
bentoml containerize recommendation_service:latest
docker run -p 3000:3000 recommendation_service:latest

9.2 Seldon Core (Kubernetes)

apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: recommendation-model
  namespace: ml-serving
spec:
  predictors:
    - name: default
      replicas: 3
      graph:
        name: classifier
        implementation: SKLEARN_SERVER
        modelUri: s3://models/recommendation/v3
        envSecretRefName: s3-credentials
      componentSpecs:
        - spec:
            containers:
              - name: classifier
                resources:
                  requests:
                    cpu: "1"
                    memory: "2Gi"
                  limits:
                    cpu: "2"
                    memory: "4Gi"
      traffic: 100
      labels:
        version: v3

9.3 TensorFlow Serving

# Run TensorFlow Serving in Docker
docker run -p 8501:8501 \
  --mount type=bind,source=/models/recommendation,target=/models/recommendation \
  -e MODEL_NAME=recommendation \
  tensorflow/serving:latest
# gRPC client
import grpc
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2, prediction_service_pb2_grpc

channel = grpc.insecure_channel('localhost:8500')
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)

request = predict_pb2.PredictRequest()
request.model_spec.name = 'recommendation'
request.model_spec.signature_name = 'serving_default'
request.inputs['input'].CopyFrom(
    tf.make_tensor_proto(input_data, shape=[1, 128])
)

response = stub.Predict(request, timeout=5.0)

9.4 vLLM (LLM Serving)

# Run the vLLM server
# python -m vllm.entrypoints.openai.api_server \
#   --model meta-llama/Llama-3-8B-Instruct \
#   --tensor-parallel-size 2 \
#   --max-model-len 8192

# Client call
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:8000/v1",
    api_key="not-needed",
)

response = client.chat.completions.create(
    model="meta-llama/Llama-3-8B-Instruct",
    messages=[
        {"role": "user", "content": "What are the advantages of recommendation systems?"},
    ],
    temperature=0.7,
    max_tokens=512,
)

9.5 Serving Solution Comparison

Solution    | Model types     | Deploy target | Batch inference | GPU support | Auto-scaling
BentoML     | General         | Docker/K8s    | Yes             | Yes         | Yes
Seldon Core | General         | K8s only      | Yes             | Yes         | HPA
TF Serving  | TF models       | Docker/K8s    | Yes             | Yes         | Manual
Triton      | Multi-framework | Docker/K8s    | Yes             | Optimized   | Yes
vLLM        | LLMs            | Docker/K8s    | No              | Required    | Yes

10. Model Monitoring

10.1 Data Drift Detection

import pandas as pd
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset

# Reference data (from training time)
reference_data = pd.read_parquet("training_data.parquet")
# Current data (from production)
current_data = pd.read_parquet("production_data_latest.parquet")

column_mapping = ColumnMapping(
    target='label',
    numerical_features=['age', 'income', 'purchase_count'],
    categorical_features=['category', 'region'],
)

# Build the drift report
report = Report(metrics=[
    DataDriftPreset(),
    TargetDriftPreset(),
])

report.run(
    reference_data=reference_data,
    current_data=current_data,
    column_mapping=column_mapping,
)

# Save the HTML report
report.save_html("drift_report.html")

# Programmatic access
result = report.as_dict()
dataset_drift = result['metrics'][0]['result']['dataset_drift']
drift_share = result['metrics'][0]['result']['share_of_drifted_columns']

if dataset_drift:
    print(f"Drift detected! Share of drifted features: {drift_share:.2%}")
    # Trigger the retraining pipeline
    trigger_retraining_pipeline()

10.2 Prediction Drift Detection

from evidently.test_suite import TestSuite
from evidently.tests import (
    TestColumnDrift,
    TestShareOfDriftedColumns,
    TestMeanInNSigmas,
)

test_suite = TestSuite(tests=[
    TestShareOfDriftedColumns(lt=0.3),  # fewer than 30% of columns drifted
    TestColumnDrift(column_name="prediction_score"),
    TestMeanInNSigmas(column_name="prediction_score", n=2),
])

test_suite.run(
    reference_data=reference_data,
    current_data=current_data,
    column_mapping=column_mapping,
)

# Inspect the test results
test_results = test_suite.as_dict()
all_passed = all(
    t['status'] == 'SUCCESS' for t in test_results['tests']
)

if not all_passed:
    alert_team("Model drift detected - review for retraining")

10.3 WhyLabs Integration

import whylogs as why
from whylogs.api.writer.whylabs import WhyLabsWriter

# Build the profile
results = why.log(current_data)
profile = results.profile()

# Ship it to WhyLabs
writer = WhyLabsWriter()
writer.write(profile)

# Monitor custom metrics
from whylogs.experimental.core.udf_schema import udf_schema

@udf_schema()
def custom_metrics(df):
    return {
        "avg_prediction_confidence": df['confidence'].mean(),
        "null_feature_ratio": df.isnull().sum().sum() / df.size,
        "prediction_distribution_skew": df['prediction'].skew(),
    }

11. A/B Testing

11.1 Traffic Splitting

# Split traffic with an Istio VirtualService
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: recommendation-ab-test
spec:
  hosts:
    - recommendation-service
  http:
    - match:
        - headers:
            x-experiment-group:
              exact: "treatment"
      route:
        - destination:
            host: recommendation-service
            subset: v2-challenger
          weight: 100
    - route:
        - destination:
            host: recommendation-service
            subset: v1-champion
          weight: 80
        - destination:
            host: recommendation-service
            subset: v2-challenger
          weight: 20
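Header-based routing like the above requires each request to carry a stable `x-experiment-group` value. A common way to produce it is deterministic hash bucketing on the user ID, so the same user always lands in the same group; a sketch assuming an 80/20 split:

```python
import hashlib

def assign_group(user_id: str, treatment_share: float = 0.2) -> str:
    """Deterministically map a user to control/treatment via hash bucketing."""
    digest = hashlib.sha256(user_id.encode()).hexdigest()
    bucket = int(digest[:8], 16) / 0xFFFFFFFF  # roughly uniform in [0, 1]
    return "treatment" if bucket < treatment_share else "control"

# The same user always resolves to the same group across requests
print(assign_group("user-1001") == assign_group("user-1001"))  # True
```

The gateway (or the client SDK) computes this once and sets the header, keeping assignment sticky without server-side session state.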

11.2 Statistical Significance Testing

from scipy import stats
import numpy as np

def ab_test_significance(
    control_conversions, control_total,
    treatment_conversions, treatment_total,
    alpha=0.05,
):
    """Statistical significance test for an A/B experiment"""
    control_rate = control_conversions / control_total
    treatment_rate = treatment_conversions / treatment_total

    # Z-test for proportions
    pooled_rate = (
        (control_conversions + treatment_conversions) /
        (control_total + treatment_total)
    )
    se = np.sqrt(
        pooled_rate * (1 - pooled_rate) *
        (1/control_total + 1/treatment_total)
    )
    z_stat = (treatment_rate - control_rate) / se
    p_value = 2 * (1 - stats.norm.cdf(abs(z_stat)))

    lift = (treatment_rate - control_rate) / control_rate

    return {
        "control_rate": control_rate,
        "treatment_rate": treatment_rate,
        "lift": f"{lift:.2%}",
        "p_value": p_value,
        "significant": p_value < alpha,
        "recommendation": (
            "promote challenger model" if p_value < alpha and lift > 0
            else "keep current model"
        ),
    }

11.3 Multi-Armed Bandit

import numpy as np

class ThompsonSampling:
    """Adaptive traffic allocation with Thompson Sampling"""

    def __init__(self, n_arms):
        self.n_arms = n_arms
        self.successes = np.ones(n_arms)  # Beta prior alpha
        self.failures = np.ones(n_arms)   # Beta prior beta

    def select_arm(self):
        """Sample from each arm's Beta posterior and pick the best arm"""
        samples = [
            np.random.beta(self.successes[i], self.failures[i])
            for i in range(self.n_arms)
        ]
        return int(np.argmax(samples))

    def update(self, arm, reward):
        """Update with the observed reward"""
        if reward:
            self.successes[arm] += 1
        else:
            self.failures[arm] += 1

    def get_allocation(self):
        """Current traffic allocation ratios"""
        total = self.successes + self.failures
        rates = self.successes / total
        return rates / rates.sum()


# Usage example (incoming_requests, models, get_conversion assumed to be defined)
bandit = ThompsonSampling(n_arms=3)  # three model versions

for request in incoming_requests:
    arm = bandit.select_arm()
    prediction = models[arm].predict(request)
    reward = get_conversion(request, prediction)
    bandit.update(arm, reward)

12. CI/CD for ML

12.1 Model Validation Gates

# model_validation.py
import json
import os
import time

import mlflow
import numpy as np
import pandas as pd
from sklearn.metrics import f1_score

def validate_model(
    model_uri: str,
    test_data_path: str,
    min_f1: float = 0.85,
    max_latency_ms: float = 50.0,
    max_model_size_mb: float = 500.0,
):
    """Pre-deployment validation gates"""
    results = {"passed": True, "checks": []}

    # 1. Performance check
    model = mlflow.pyfunc.load_model(model_uri)
    test_data = pd.read_parquet(test_data_path)
    predictions = model.predict(test_data.drop('target', axis=1))
    f1 = f1_score(test_data['target'], predictions, average='macro')

    results["checks"].append({
        "name": "performance",
        "metric": "f1_score",
        "value": f1,
        "threshold": min_f1,
        "passed": f1 >= min_f1,
    })

    # 2. Latency check
    latencies = []
    for i in range(100):
        start = time.time()
        model.predict(test_data.iloc[[i]])
        latencies.append((time.time() - start) * 1000)

    p99_latency = np.percentile(latencies, 99)
    results["checks"].append({
        "name": "latency",
        "metric": "p99_ms",
        "value": p99_latency,
        "threshold": max_latency_ms,
        "passed": p99_latency <= max_latency_ms,
    })

    # 3. Model size check
    model_size_mb = os.path.getsize(model_uri) / (1024 * 1024)
    results["checks"].append({
        "name": "model_size",
        "metric": "size_mb",
        "value": model_size_mb,
        "threshold": max_model_size_mb,
        "passed": model_size_mb <= max_model_size_mb,
    })

    results["passed"] = all(c["passed"] for c in results["checks"])
    return results

12.2 GitHub Actions ML CI/CD

name: ML CI/CD Pipeline
on:
  push:
    branches: [main]
    paths:
      - 'models/**'
      - 'features/**'

jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Validate Data Schema
        run: python scripts/validate_data.py

  train-and-evaluate:
    needs: data-validation
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v4
      - name: Train Model
        run: python train.py --config configs/production.yaml
      - name: Evaluate Model
        run: python evaluate.py --model-path outputs/model
      - name: Upload Metrics
        run: python upload_metrics.py

  model-validation:
    needs: train-and-evaluate
    runs-on: ubuntu-latest
    steps:
      - name: Run Validation Gates
        run: python model_validation.py
      - name: Performance Regression Check
        run: python check_regression.py

  deploy-staging:
    needs: model-validation
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Staging
        run: |
          kubectl apply -f k8s/staging/
          kubectl rollout status deployment/model-serving -n staging

  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - name: Shadow Deployment
        run: kubectl apply -f k8s/production/shadow.yaml
      - name: Canary Rollout
        run: |
          kubectl apply -f k8s/production/canary-10.yaml
          sleep 300
          python check_canary_metrics.py
          kubectl apply -f k8s/production/canary-50.yaml
          sleep 300
          python check_canary_metrics.py
          kubectl apply -f k8s/production/full-rollout.yaml
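The `check_canary_metrics.py` script referenced above is repo-specific, but the heart of such a gate is usually just a comparison of canary versus baseline error rates. A hedged sketch with hypothetical thresholds:

```python
def canary_healthy(baseline_error_rate: float,
                   canary_error_rate: float,
                   max_relative_increase: float = 0.10) -> bool:
    """Fail the rollout if the canary's error rate exceeds the baseline
    by more than the allowed relative increase (here, +10%)."""
    allowed = baseline_error_rate * (1 + max_relative_increase)
    return canary_error_rate <= allowed

# A CI step would exit non-zero on False to abort the rollout
print(canary_healthy(0.020, 0.021))  # True: within the +10% allowance
print(canary_healthy(0.020, 0.030))  # False: regression, abort
```

Real gates typically also compare latency percentiles and business metrics over the soak window before widening traffic.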

13. Cost Optimization

13.1 Batch vs. Real-Time Inference

# Batch inference: cost-efficient for bulk processing
def batch_inference(model, data_path, output_path):
    """Batch inference with Spark"""
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("batch-inference").getOrCreate()
    df = spark.read.parquet(data_path)

    # Apply the model via a UDF
    predict_udf = spark.udf.register(
        "predict",
        lambda features: float(model.predict([features])[0]),
    )
    result = df.withColumn("prediction", predict_udf(df["features"]))
    result.write.parquet(output_path)

# Real-time inference: latency-sensitive, per-request
# → requires online serving infrastructure (higher cost)

13.2 Prediction Caching

import redis
import hashlib
import json

class PredictionCache:
    def __init__(self, redis_url="redis://localhost:6379", ttl=3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl
        self.hit_count = 0
        self.miss_count = 0

    def _make_key(self, features: dict) -> str:
        feature_str = json.dumps(features, sort_keys=True)
        return f"pred:{hashlib.md5(feature_str.encode()).hexdigest()}"

    def get_or_predict(self, features: dict, model) -> dict:
        key = self._make_key(features)
        cached = self.redis.get(key)

        if cached:
            self.hit_count += 1
            return json.loads(cached)

        self.miss_count += 1
        prediction = model.predict(features)
        self.redis.setex(key, self.ttl, json.dumps(prediction))
        return prediction

    @property
    def hit_rate(self):
        total = self.hit_count + self.miss_count
        return self.hit_count / total if total > 0 else 0

13.3 Model Compression

# Quantization
import os
import torch

model = torch.load("model.pt")
quantized_model = torch.quantization.quantize_dynamic(
    model,
    {torch.nn.Linear},
    dtype=torch.qint8,
)

# Compare sizes
original_size = os.path.getsize("model.pt") / (1024 * 1024)
torch.save(quantized_model.state_dict(), "model_quantized.pt")
quantized_size = os.path.getsize("model_quantized.pt") / (1024 * 1024)

print(f"Original: {original_size:.1f}MB → quantized: {quantized_size:.1f}MB")
print(f"Compression: {(1 - quantized_size / original_size):.1%}")

13.4 Cost Optimization Checklist

Strategy                   | Typical savings | Trade-off
Switch to batch inference  | 50-80%          | Gives up real-time responses
Prediction caching         | 30-60%          | Cache freshness
Model quantization         | 40-60%          | Slight accuracy loss
Spot/preemptible instances | 60-90%          | Availability risk
Auto-scaling               | 20-40%          | Cold starts
Model distillation         | 50-70%          | Development cost

14. Quiz

Q1: What is the core value of a Feature Store?

A: The core value of a Feature Store is preventing training-serving skew. By sharing the same feature definitions and transformation logic between training time and serving time, it fundamentally eliminates data inconsistency. Feature reuse, team collaboration, and feature governance are additional key benefits.

Q2: Why is a point-in-time join necessary?

A: A point-in-time join prevents data leakage. When building training data, each event may only be joined with the feature values that were actually available at that event's timestamp. If future data leaks in, the model shows unrealistically high performance during training but degrades sharply in production.

Q3: What is the key requirement for moving from MLOps Level 2 to Level 3?

A: Moving from Level 2 (CI/CD for ML) to Level 3 (automatic retraining) requires automated drift detection and retraining triggers. The system monitors data drift and prediction drift in real time, launches the retraining pipeline automatically when thresholds are exceeded, and promotes models automatically via champion/challenger comparison.

Q4: What are the main differences between BentoML and Seldon Core?

A: BentoML is a framework-agnostic model packaging tool that produces Docker containers deployable anywhere. Seldon Core is a Kubernetes-native serving platform, deeply integrated with the K8s ecosystem through CRD-based deployments, A/B tests, canary rollouts, and explainers.

Q5: What are three main ways to detect model drift?

A: (1) Data drift: detect changes in input feature distributions with the KS test, PSI (Population Stability Index), etc. (2) Prediction drift: monitor changes in the model's output distribution. (3) Performance drift: compare against ground-truth labels to track degradation in accuracy, F1, and other metrics. Evidently AI and WhyLabs are representative tools.
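PSI, mentioned in the answer above, has a simple closed form: bin both samples, then sum (p_cur - p_ref) * ln(p_cur / p_ref) over the bins. A minimal NumPy sketch; the bin count and the common "<0.1 stable / >0.25 significant" thresholds are conventions, not part of any standard:

```python
import numpy as np

def psi(reference: np.ndarray, current: np.ndarray, bins: int = 10) -> float:
    """Population Stability Index between two numeric samples."""
    # Bin edges from reference quantiles (robust to skewed distributions)
    edges = np.quantile(reference, np.linspace(0, 1, bins + 1))
    # Clip current values into the reference range so every point is counted
    cur = np.clip(current, edges[0], edges[-1])

    ref_pct = np.histogram(reference, edges)[0] / len(reference)
    cur_pct = np.histogram(cur, edges)[0] / len(cur)

    # Floor the proportions to avoid log(0) and division by zero
    ref_pct = np.clip(ref_pct, 1e-6, None)
    cur_pct = np.clip(cur_pct, 1e-6, None)

    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))

# Identical distributions → PSI of 0; rule of thumb:
# < 0.1 stable, 0.1-0.25 moderate shift, > 0.25 significant drift
rng = np.random.default_rng(0)
sample = rng.normal(size=10_000)
print(psi(sample, sample))  # 0.0
```

A monitoring job would compute this per feature against the training-time snapshot and alert when any feature crosses the chosen threshold.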


15. References

  1. Feast documentation - https://docs.feast.dev/
  2. MLflow documentation - https://mlflow.org/docs/latest/index.html
  3. Google MLOps Maturity Model - https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning
  4. Evidently AI documentation - https://docs.evidentlyai.com/
  5. BentoML documentation - https://docs.bentoml.com/
  6. Seldon Core documentation - https://docs.seldon.io/
  7. Kubeflow Pipelines - https://www.kubeflow.org/docs/components/pipelines/
  8. WhyLabs documentation - https://docs.whylabs.ai/
  9. vLLM project - https://docs.vllm.ai/
  10. Weights and Biases - https://docs.wandb.ai/
  11. Tecton Feature Store - https://docs.tecton.ai/
  12. Hopsworks Feature Store - https://docs.hopsworks.ai/
  13. NVIDIA Triton Inference Server - https://docs.nvidia.com/deeplearning/triton-inference-server/

Feature Store & MLOps Pipeline Complete Guide 2025: Feast, Feature Engineering, Model Serving

Table of Contents

1. Why Feature Store?

1.1 The Training-Serving Skew Problem

The most frequent source of failures in ML model deployment is feature inconsistency between training and serving time.

When data scientists create features with Pandas in Jupyter Notebooks, and engineers re-implement the same logic in Java or Go, subtle differences creep in.

# Training time: Feature creation with Pandas
df['avg_purchase_7d'] = df.groupby('user_id')['amount'].transform(
    lambda x: x.rolling('7D').mean()
)

# Serving time: Re-implemented in SQL or another language -> subtle differences
# - Boundary conditions (inclusive/exclusive) differences
# - Timezone handling differences
# - NULL handling differences

Feature Store solves this problem fundamentally by sharing a single feature definition between both training and serving.

1.2 Feature Reuse and Team Collaboration

In large ML organizations, dozens of teams independently create similar features.

Problem | Before Feature Store | After Feature Store
Feature duplication | Each team recreates similar features | Search/reuse from central registry
Consistency | Different computation logic per team | Single definition, version controlled
Freshness | Manual updates | Automated materialization
Discoverability | Depends on Slack/Wiki | Feature catalog + metadata
Governance | None | Owner, lineage, access control

1.3 Reducing Data Pipeline Complexity

Without a Feature Store, each model maintains its own data pipeline:

Model A: raw data -> ETL A -> features A -> training A
Model B: raw data -> ETL B -> features B -> training B
Model C: raw data -> ETL C -> features C -> training C

After Feature Store adoption:

raw data -> Feature Store (centralized) -> Models A, B, C share features

2. Feature Store Architecture

2.1 Core Components

A Feature Store consists of four core components.

Offline Store

  • Stores large volumes of historical feature data
  • Used for training data generation
  • BigQuery, Snowflake, Redshift, Parquet files
  • Supports Point-in-Time Joins

Online Store

  • Low-latency lookup of latest feature values
  • Used for real-time inference
  • Redis, DynamoDB, Bigtable
  • Target P99 latency under 10ms

Feature Registry

  • Manages metadata for all features
  • Name, type, owner, description, tags
  • Data lineage tracking

Transformation Engine

  • Creates features from raw data
  • Supports batch/streaming transformations
  • Spark, Flink, dbt integration

2.2 Data Flow

[Data Sources] -> [Transform Engine] -> [Offline Store] <-> [Online Store]
      ^                                      |                    |
  Raw Data                            Training Pipeline     Inference Service
                                           |
                                    [Feature Registry]
                                    (Metadata Management)

2.3 Feature Store Solution Comparison

Solution | Type | Offline | Online | Streaming | Cost
Feast | OSS | Redshift/BQ/File | Redis/DynamoDB | Push-based | Free (infra only)
Tecton | Managed | Spark + Delta | DynamoDB | Spark Streaming | Subscription
Hopsworks | OSS/Managed | Hudi | RonDB | Flink | Community free
Vertex AI FS | GCP Managed | BigQuery | Bigtable | Dataflow | Usage-based
SageMaker FS | AWS Managed | S3 + Glue | Built-in Online | Kinesis | Usage-based

3. Feast Deep Dive

3.1 Installation and Initial Setup

# Install Feast
pip install feast

# Initialize project
feast init my_feature_store
cd my_feature_store

# Project structure
# my_feature_store/
#   feature_store.yaml    # Project config
#   features.py           # Feature definitions
#   data/                 # Sample data

feature_store.yaml configuration:

project: my_ml_project
registry: data/registry.db
provider: local  # local, gcp, aws
online_store:
  type: redis
  connection_string: "localhost:6379"
offline_store:
  type: file  # file, bigquery, redshift, snowflake
entity_key_serialization_version: 2

3.2 Feature Definitions

# features.py
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String

# Entity definition
user = Entity(
    name="user_id",
    join_keys=["user_id"],
    description="Unique user identifier",
)

# Data source definition
user_stats_source = FileSource(
    path="data/user_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

# Feature view definition
user_stats_fv = FeatureView(
    name="user_stats",
    entities=[user],
    ttl=timedelta(days=1),
    schema=[
        Field(name="total_purchases", dtype=Int64),
        Field(name="avg_purchase_amount", dtype=Float32),
        Field(name="days_since_last_login", dtype=Int64),
        Field(name="preferred_category", dtype=String),
    ],
    online=True,
    source=user_stats_source,
    tags={"team": "recommendation", "version": "v2"},
)

3.3 Materialization (Offline to Online)

# Apply feature definitions
feast apply

# Materialize feature values from offline to online store
feast materialize 2024-01-01T00:00:00 2024-12-31T23:59:59

# Incremental materialization (since last run)
feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")

3.4 Training Data Generation (Point-in-Time Join)

Point-in-Time Join is the core capability of a Feature Store. It prevents data leakage while accurately retrieving feature values at specific historical timestamps.

from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path=".")

# Entity dataframe with timestamps
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003, 1001],
    "event_timestamp": pd.to_datetime([
        "2024-09-01", "2024-09-02", "2024-09-03", "2024-10-01"
    ]),
})

# Get features with Point-in-Time Join
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_stats:total_purchases",
        "user_stats:avg_purchase_amount",
        "user_stats:days_since_last_login",
    ],
).to_df()

print(training_df)
# Each row has feature values as of that point in time
# user 1001's features at 2024-09-01 != features at 2024-10-01

3.5 Online Serving

# Online feature retrieval for real-time inference
feature_vector = store.get_online_features(
    features=[
        "user_stats:total_purchases",
        "user_stats:avg_purchase_amount",
        "user_stats:days_since_last_login",
    ],
    entity_rows=[
        {"user_id": 1001},
        {"user_id": 1002},
    ],
).to_dict()

# Result: Returns latest feature values (P99 latency ~5-10ms)

3.6 GCP/AWS Production Configuration

# GCP production setup
project: production_ml
registry: gs://my-bucket/feast-registry/registry.db
provider: gcp
online_store:
  type: datastore  # or bigtable
  project_id: my-gcp-project
offline_store:
  type: bigquery
  project_id: my-gcp-project
  dataset: feast_features
# AWS production setup
project: production_ml
registry: s3://my-bucket/feast-registry/registry.db
provider: aws
online_store:
  type: dynamodb
  region: us-east-1
offline_store:
  type: redshift
  cluster_id: my-redshift-cluster
  region: us-east-1
  database: ml_features
  user: feast_user
  s3_staging_location: s3://my-bucket/feast-staging/

4. Feature Engineering Patterns

4.1 Temporal Features

import pandas as pd

def create_temporal_features(df, timestamp_col, entity_col, value_col):
    """Create time-window-based aggregation features"""
    df = df.sort_values(timestamp_col)

    features = pd.DataFrame()
    features[entity_col] = df[entity_col]
    features[timestamp_col] = df[timestamp_col]

    # Moving averages across different windows
    # (time-based rolling needs a DatetimeIndex, so index by timestamp first)
    indexed = df.set_index(timestamp_col)
    for window in ['1D', '7D', '30D']:
        features[f'{value_col}_mean_{window}'] = (
            indexed.groupby(entity_col)[value_col]
            .transform(lambda x: x.rolling(window).mean())
            .to_numpy()
        )

    # Trend feature: 7-day avg / 30-day avg
    features[f'{value_col}_trend_7d_30d'] = (
        features[f'{value_col}_mean_7D'] /
        features[f'{value_col}_mean_30D'].replace(0, float('nan'))
    )

    # Day of week / hour features
    features['day_of_week'] = df[timestamp_col].dt.dayofweek
    features['hour_of_day'] = df[timestamp_col].dt.hour
    features['is_weekend'] = features['day_of_week'].isin([5, 6]).astype(int)

    return features

4.2 Aggregation Features

def create_aggregation_features(events_df, entity_col, group_col):
    """Per-entity aggregation features"""
    agg_features = events_df.groupby(entity_col).agg(
        event_count=pd.NamedAgg(column=group_col, aggfunc='count'),
        unique_categories=pd.NamedAgg(column='category', aggfunc='nunique'),
        total_amount=pd.NamedAgg(column='amount', aggfunc='sum'),
        avg_amount=pd.NamedAgg(column='amount', aggfunc='mean'),
        max_amount=pd.NamedAgg(column='amount', aggfunc='max'),
        std_amount=pd.NamedAgg(column='amount', aggfunc='std'),
    ).reset_index()

    # Ratio features
    agg_features['high_value_ratio'] = (
        events_df[events_df['amount'] > 100]
        .groupby(entity_col)
        .size()
        .reindex(agg_features[entity_col], fill_value=0)
        .values / agg_features['event_count']
    )

    return agg_features

4.3 Embedding Features

from sentence_transformers import SentenceTransformer
import numpy as np

model = SentenceTransformer('all-MiniLM-L6-v2')

def create_text_embedding_features(df, text_col, prefix='emb'):
    """Generate text embedding features"""
    embeddings = model.encode(df[text_col].fillna('').tolist())
    emb_df = pd.DataFrame(
        embeddings,
        columns=[f'{prefix}_{i}' for i in range(embeddings.shape[1])],
        index=df.index,
    )
    return pd.concat([df, emb_df], axis=1)

4.4 Cross Features

def create_cross_features(df):
    """Cross / interaction features between existing features"""
    # Ratio feature
    df['purchase_to_visit_ratio'] = (
        df['purchase_count'] / df['visit_count'].replace(0, 1)
    )

    # Bucketing + cross
    df['age_bucket'] = pd.cut(
        df['age'], bins=[0, 25, 35, 50, 100],
        labels=['young', 'middle', 'senior', 'elder']
    )
    df['age_x_gender'] = df['age_bucket'].astype(str) + '_' + df['gender']

    # Numeric interaction
    df['income_x_age'] = df['income'] * df['age']

    return df

5. MLOps Maturity Levels

Google's MLOps maturity model defines levels 0 through 2; this guide extends it with a fourth, fully automated Level 3.

Level 0: Manual Process

Data scientist manually:
1. Collects and preprocesses data
2. Trains model in Jupyter
3. Hands model file to engineer
4. Engineer writes serving code
5. Manual deployment

Problems:
- Deployment cycle: months
- No reproducibility
- No monitoring

Level 1: ML Pipeline Automation

Automated ML Pipeline:
1. Data validation -> Feature engineering -> Training -> Evaluation -> Deployment
2. Pipeline orchestrator (Kubeflow, Airflow)
3. CT (Continuous Training): trigger-based auto-retraining

Improvements:
- Deployment cycle: weekly
- Reproducible pipeline
- Basic monitoring

Level 2: CI/CD for ML

CI/CD Pipeline Integration:
1. Code change -> Auto test -> Pipeline build -> Model training
2. Model validation gates (performance thresholds)
3. Shadow Deployment -> Canary -> Full Rollout
4. Automated A/B testing

Improvements:
- Deployment cycle: daily
- Automatic rollback
- Systematic experiment management

Level 3: Auto-Retraining + Full Automation

Full Automation:
1. Drift detection -> Auto-retraining trigger
2. Auto feature selection / hyperparameter optimization
3. Automatic model comparison + champion/challenger
4. Auto-scaling + cost optimization

Improvements:
- Deployment cycle: hourly
- Unattended operation
- Proactive monitoring
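The Level 3 loop above boils down to a small decision function; a sketch in which drift_score might come from PSI or a drifted-feature share, and every threshold is an illustrative assumption:

```python
def decide_action(drift_score, champion_f1, challenger_f1=None,
                  drift_threshold=0.25, min_improvement=0.0):
    """Decide the next step of an automated retraining loop.

    Returns 'noop', 'retrain', or 'promote'. The thresholds and the
    single-metric comparison are illustrative assumptions, not policy.
    """
    if drift_score < drift_threshold:
        return "noop"        # distributions look stable: keep the champion
    if challenger_f1 is None:
        return "retrain"     # drift detected, no challenger yet: train one
    if challenger_f1 > champion_f1 + min_improvement:
        return "promote"     # challenger beats champion: auto-promote
    return "noop"            # challenger did not improve: keep champion
```

An orchestrator would call this on a schedule, dispatching to the retraining pipeline or the registry promotion step depending on the returned action.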

6. ML Pipeline Orchestration

6.1 Kubeflow Pipelines

# Define pipeline with Kubeflow Pipelines DSL
from kfp import dsl, compiler
from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(
    base_image="python:3.11",
    packages_to_install=["pandas", "scikit-learn"],
)
def preprocess_data(
    raw_data: Input[Dataset],
    processed_data: Output[Dataset],
):
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    df = pd.read_parquet(raw_data.path)
    scaler = StandardScaler()
    df_scaled = pd.DataFrame(
        scaler.fit_transform(df.select_dtypes(include='number')),
        columns=df.select_dtypes(include='number').columns,
    )
    df_scaled.to_parquet(processed_data.path)


@dsl.component(
    base_image="python:3.11",
    packages_to_install=["scikit-learn", "pandas", "joblib"],
)
def train_model(
    training_data: Input[Dataset],
    model_artifact: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 100,
    max_depth: int = 10,
):
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import cross_val_score
    import joblib

    df = pd.read_parquet(training_data.path)
    X = df.drop('target', axis=1)
    y = df['target']

    clf = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42,
    )
    scores = cross_val_score(clf, X, y, cv=5, scoring='f1_macro')

    clf.fit(X, y)
    joblib.dump(clf, model_artifact.path)

    metrics.log_metric("f1_mean", float(scores.mean()))
    metrics.log_metric("f1_std", float(scores.std()))


@dsl.pipeline(name="ML Training Pipeline")
def ml_pipeline(n_estimators: int = 100, max_depth: int = 10):
    preprocess_task = preprocess_data(
        raw_data=dsl.importer(
            artifact_uri="gs://my-bucket/raw-data/",
            artifact_class=Dataset,
        ).output,
    )

    train_task = train_model(
        training_data=preprocess_task.outputs["processed_data"],
        n_estimators=n_estimators,
        max_depth=max_depth,
    )


# Compile and run
compiler.Compiler().compile(ml_pipeline, "pipeline.yaml")

6.2 Airflow ML Pipeline

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'ml_training_pipeline',
    default_args=default_args,
    schedule_interval='@weekly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['ml', 'training'],
) as dag:

    validate_data = KubernetesPodOperator(
        task_id='validate_data',
        name='data-validation',
        image='ml-pipeline:latest',
        cmds=['python', 'validate.py'],
        namespace='ml-pipelines',
    )

    extract_features = KubernetesPodOperator(
        task_id='extract_features',
        name='feature-extraction',
        image='ml-pipeline:latest',
        cmds=['python', 'extract_features.py'],
        namespace='ml-pipelines',
    )

    train_model = KubernetesPodOperator(
        task_id='train_model',
        name='model-training',
        image='ml-pipeline:latest',
        cmds=['python', 'train.py'],
        namespace='ml-pipelines',
    )

    evaluate_model = PythonOperator(
        task_id='evaluate_model',
        python_callable=lambda: print("Evaluating model..."),
    )

    validate_data >> extract_features >> train_model >> evaluate_model

6.3 Pipeline Solution Comparison

Feature | Kubeflow | Airflow | Vertex AI | SageMaker
Runtime | Kubernetes | Various | GCP Managed | AWS Managed
ML-specific | High | Medium | High | High
UI/Visualization | Basic | Excellent | Excellent | Excellent
Scalability | High | High | High | High
Learning curve | Steep | Moderate | Low | Low
Cost | Infra only | Infra only | Usage-based | Usage-based

7. Experiment Tracking

7.1 MLflow Experiment Tracking

import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import f1_score, precision_score, recall_score

# MLflow server configuration
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("recommendation-model-v2")

# Experiment run
with mlflow.start_run(run_name="gbm-experiment-42") as run:
    # Log hyperparameters
    params = {
        "n_estimators": 200,
        "max_depth": 8,
        "learning_rate": 0.1,
        "subsample": 0.8,
    }
    mlflow.log_params(params)

    # Train model
    model = GradientBoostingClassifier(**params)
    model.fit(X_train, y_train)

    # Log metrics
    y_pred = model.predict(X_test)
    metrics = {
        "f1": f1_score(y_test, y_pred, average='macro'),
        "precision": precision_score(y_test, y_pred, average='macro'),
        "recall": recall_score(y_test, y_pred, average='macro'),
    }
    mlflow.log_metrics(metrics)

    # Log model artifact
    mlflow.sklearn.log_model(
        model,
        artifact_path="model",
        registered_model_name="recommendation-gbm",
    )

    mlflow.log_artifact("feature_importance.png")

    print(f"Run ID: {run.info.run_id}")
    print(f"Metrics: {metrics}")

7.2 Weights and Biases Integration

import wandb

wandb.init(
    project="recommendation-model",
    config={
        "architecture": "GBM",
        "n_estimators": 200,
        "learning_rate": 0.1,
    },
)

# Log metrics during training loop
for epoch in range(100):
    train_loss = train_one_epoch(model, train_loader)
    val_loss, val_f1 = evaluate(model, val_loader)

    wandb.log({
        "epoch": epoch,
        "train_loss": train_loss,
        "val_loss": val_loss,
        "val_f1": val_f1,
    })

# Save model artifact
artifact = wandb.Artifact('model-weights', type='model')
artifact.add_file('model.pt')
wandb.log_artifact(artifact)

wandb.finish()

8. Model Registry

8.1 MLflow Model Registry

from mlflow.tracking import MlflowClient

client = MlflowClient()

# Create model version
model_version = client.create_model_version(
    name="recommendation-gbm",
    source=f"runs:/{run_id}/model",
    run_id=run_id,
    description="GBM model v3: added new features",
)

# Stage transition: None -> Staging
client.transition_model_version_stage(
    name="recommendation-gbm",
    version=model_version.version,
    stage="Staging",
    archive_existing_versions=False,
)

# Promote to Production after staging validation
client.transition_model_version_stage(
    name="recommendation-gbm",
    version=model_version.version,
    stage="Production",
    archive_existing_versions=True,  # Archive existing production version
)

8.2 Model Versioning Strategy

Model Lifecycle:
None -> Staging -> Production -> Archived

Versioning Policy:
- Staging: Upon passing automated performance tests
- Production: Manual approval or A/B test pass
- Archived: Automatically when new version is promoted
- Keep 3 most recent versions, delete older ones
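The retention rule above ("keep 3 most recent versions") can be automated against the MLflow registry; a sketch, where the selection logic is kept pure and versions still in Staging or Production are never deleted:

```python
def versions_to_delete(versions, keep=3):
    """Select versions beyond the `keep` newest that are safe to delete.

    `versions` is a list of (version_number, stage) tuples; anything
    still in Production or Staging is always retained.
    """
    ordered = sorted(versions, key=lambda v: int(v[0]), reverse=True)
    return [v for v in ordered[keep:] if v[1] not in ("Production", "Staging")]


def prune_old_versions(model_name: str, keep: int = 3):
    """Apply the retention rule through the MLflow registry client."""
    from mlflow.tracking import MlflowClient  # assumes mlflow is installed
    client = MlflowClient()
    found = [(v.version, v.current_stage)
             for v in client.search_model_versions(f"name='{model_name}'")]
    for version, _stage in versions_to_delete(found, keep=keep):
        client.delete_model_version(name=model_name, version=version)
```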
# Load production model
import mlflow.pyfunc

model = mlflow.pyfunc.load_model(
    model_uri="models:/recommendation-gbm/Production"
)
predictions = model.predict(input_df)

9. Model Serving

9.1 BentoML

# service.py
import bentoml
import numpy as np
from bentoml.io import NumpyNdarray, JSON

# Load saved model
runner = bentoml.sklearn.get("recommendation_model:latest").to_runner()

svc = bentoml.Service("recommendation_service", runners=[runner])

@svc.api(input=JSON(), output=JSON())
async def predict(input_data: dict) -> dict:
    features = np.array(input_data["features"]).reshape(1, -1)
    prediction = await runner.predict.async_run(features)
    return {
        "prediction": int(prediction[0]),
        "model_version": "v3.2.1",
    }
# bentofile.yaml
service: "service:svc"
include:
  - "*.py"
python:
  packages:
    - scikit-learn
    - numpy
docker:
  python_version: "3.11"
# Build and deploy
bentoml build
bentoml containerize recommendation_service:latest
docker run -p 3000:3000 recommendation_service:latest

9.2 Seldon Core (Kubernetes)

apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: recommendation-model
  namespace: ml-serving
spec:
  predictors:
    - name: default
      replicas: 3
      graph:
        name: classifier
        implementation: SKLEARN_SERVER
        modelUri: s3://models/recommendation/v3
        envSecretRefName: s3-credentials
      componentSpecs:
        - spec:
            containers:
              - name: classifier
                resources:
                  requests:
                    cpu: "1"
                    memory: "2Gi"
                  limits:
                    cpu: "2"
                    memory: "4Gi"
      traffic: 100
      labels:
        version: v3

9.3 TensorFlow Serving

# Run TFServing Docker (8500 = gRPC, 8501 = REST)
docker run -p 8500:8500 -p 8501:8501 \
  --mount type=bind,source=/models/recommendation,target=/models/recommendation \
  -e MODEL_NAME=recommendation \
  tensorflow/serving:latest
# gRPC client
import grpc
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2, prediction_service_pb2_grpc

channel = grpc.insecure_channel('localhost:8500')
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)

request = predict_pb2.PredictRequest()
request.model_spec.name = 'recommendation'
request.model_spec.signature_name = 'serving_default'
request.inputs['input'].CopyFrom(
    tf.make_tensor_proto(input_data, shape=[1, 128])
)

response = stub.Predict(request, timeout=5.0)

9.4 vLLM (LLM Serving)

# Start vLLM server
# python -m vllm.entrypoints.openai.api_server \
#   --model meta-llama/Llama-3-8B-Instruct \
#   --tensor-parallel-size 2 \
#   --max-model-len 8192

# Client call
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:8000/v1",
    api_key="not-needed",
)

response = client.chat.completions.create(
    model="meta-llama/Llama-3-8B-Instruct",
    messages=[
        {"role": "user", "content": "What are the benefits of recommendation systems?"},
    ],
    temperature=0.7,
    max_tokens=512,
)

9.5 Serving Solution Comparison

Solution | Model Types | Deployment Env | Batch Inference | GPU Support | Auto-scaling
BentoML | Universal | Docker/K8s | Supported | Supported | Supported
Seldon Core | Universal | K8s only | Supported | Supported | HPA
TFServing | TF models | Docker/K8s | Supported | Supported | Manual
Triton | Multi-framework | Docker/K8s | Supported | Optimized | Supported
vLLM | LLM | Docker/K8s | Not supported | Required | Supported

10. Model Monitoring

10.1 Data Drift Detection

from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset

# Reference data (from training time)
reference_data = pd.read_parquet("training_data.parquet")
# Current data (production)
current_data = pd.read_parquet("production_data_latest.parquet")

column_mapping = ColumnMapping(
    target='label',
    numerical_features=['age', 'income', 'purchase_count'],
    categorical_features=['category', 'region'],
)

# Generate drift report
report = Report(metrics=[
    DataDriftPreset(),
    TargetDriftPreset(),
])

report.run(
    reference_data=reference_data,
    current_data=current_data,
    column_mapping=column_mapping,
)

# Save HTML report
report.save_html("drift_report.html")

# Programmatic access
result = report.as_dict()
dataset_drift = result['metrics'][0]['result']['dataset_drift']
drift_share = result['metrics'][0]['result']['share_of_drifted_columns']

if dataset_drift:
    print(f"Drift detected! Drifted feature share: {drift_share:.2%}")
    trigger_retraining_pipeline()

10.2 Prediction Drift Detection

from evidently.test_suite import TestSuite
from evidently.tests import (
    TestColumnDrift,
    TestShareOfDriftedColumns,
    TestMeanInNSigmas,
)

test_suite = TestSuite(tests=[
    TestShareOfDriftedColumns(lt=0.3),  # Less than 30% drifted features
    TestColumnDrift(column_name="prediction_score"),
    TestMeanInNSigmas(column_name="prediction_score", n=2),
])

test_suite.run(
    reference_data=reference_data,
    current_data=current_data,
    column_mapping=column_mapping,
)

test_results = test_suite.as_dict()
all_passed = all(
    t['status'] == 'SUCCESS' for t in test_results['tests']
)

if not all_passed:
    alert_team("Model drift detected - retraining review required")

10.3 WhyLabs Integration

import whylogs as why
from whylogs.api.writer.whylabs import WhyLabsWriter

# Create profile
results = why.log(current_data)
profile = results.profile()

# Send to WhyLabs
writer = WhyLabsWriter()
writer.write(profile)

11. A/B Testing for ML Models

11.1 Traffic Splitting

# Traffic splitting with Istio VirtualService
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: recommendation-ab-test
spec:
  hosts:
    - recommendation-service
  http:
    - match:
        - headers:
            x-experiment-group:
              exact: "treatment"
      route:
        - destination:
            host: recommendation-service
            subset: v2-challenger
          weight: 100
    - route:
        - destination:
            host: recommendation-service
            subset: v1-champion
          weight: 80
        - destination:
            host: recommendation-service
            subset: v2-challenger
          weight: 20
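One way to populate the x-experiment-group header matched above is deterministic hash-based bucketing, so a given user always lands in the same group; a sketch (the 20% treatment share mirrors the weights above):

```python
import hashlib

def assign_group(user_id: str, treatment_share: float = 0.2) -> str:
    """Deterministically bucket a user into 'treatment' or 'control'.

    Hashing makes the assignment sticky: the same user_id always maps
    to the same group, independent of server or request order.
    """
    digest = hashlib.sha256(user_id.encode("utf-8")).hexdigest()
    bucket = int(digest[:8], 16) / 0xFFFFFFFF  # roughly uniform in [0, 1]
    return "treatment" if bucket < treatment_share else "control"
```

A gateway or client SDK would then set the x-experiment-group header from this value before calling the service.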

11.2 Statistical Significance Testing

from scipy import stats
import numpy as np

def ab_test_significance(
    control_conversions, control_total,
    treatment_conversions, treatment_total,
    alpha=0.05,
):
    """A/B test statistical significance verification"""
    control_rate = control_conversions / control_total
    treatment_rate = treatment_conversions / treatment_total

    # Z-test for proportions
    pooled_rate = (
        (control_conversions + treatment_conversions) /
        (control_total + treatment_total)
    )
    se = np.sqrt(
        pooled_rate * (1 - pooled_rate) *
        (1/control_total + 1/treatment_total)
    )
    z_stat = (treatment_rate - control_rate) / se
    p_value = 2 * (1 - stats.norm.cdf(abs(z_stat)))

    lift = (treatment_rate - control_rate) / control_rate

    return {
        "control_rate": control_rate,
        "treatment_rate": treatment_rate,
        "lift": f"{lift:.2%}",
        "p_value": p_value,
        "significant": p_value < alpha,
        "recommendation": (
            "Promote challenger model" if p_value < alpha and lift > 0
            else "Keep current model"
        ),
    }

11.3 Multi-Armed Bandit

import numpy as np

class ThompsonSampling:
    """Adaptive traffic splitting using Thompson Sampling"""

    def __init__(self, n_arms):
        self.n_arms = n_arms
        self.successes = np.ones(n_arms)  # Beta prior alpha
        self.failures = np.ones(n_arms)   # Beta prior beta

    def select_arm(self):
        """Sample from Beta distribution to select optimal arm"""
        samples = [
            np.random.beta(self.successes[i], self.failures[i])
            for i in range(self.n_arms)
        ]
        return int(np.argmax(samples))

    def update(self, arm, reward):
        """Update results"""
        if reward:
            self.successes[arm] += 1
        else:
            self.failures[arm] += 1

    def get_allocation(self):
        """Current traffic allocation ratio"""
        total = self.successes + self.failures
        rates = self.successes / total
        return rates / rates.sum()


# Usage example
bandit = ThompsonSampling(n_arms=3)  # 3 model versions

for request in incoming_requests:
    arm = bandit.select_arm()
    prediction = models[arm].predict(request)
    reward = get_conversion(request, prediction)
    bandit.update(arm, reward)

12. CI/CD for ML

12.1 Model Validation Gates

# model_validation.py
import json
import mlflow
import numpy as np
import pandas as pd
from sklearn.metrics import f1_score

def validate_model(
    model_uri: str,
    test_data_path: str,
    min_f1: float = 0.85,
    max_latency_ms: float = 50.0,
    max_model_size_mb: float = 500.0,
):
    """Model deployment validation gates"""
    results = {"passed": True, "checks": []}

    # 1. Performance validation
    model = mlflow.pyfunc.load_model(model_uri)
    test_data = pd.read_parquet(test_data_path)
    predictions = model.predict(test_data.drop('target', axis=1))
    f1 = f1_score(test_data['target'], predictions, average='macro')

    results["checks"].append({
        "name": "performance",
        "metric": "f1_score",
        "value": f1,
        "threshold": min_f1,
        "passed": f1 >= min_f1,
    })

    # 2. Latency validation
    import time
    latencies = []
    for i in range(100):
        start = time.time()
        model.predict(test_data.iloc[[i]])
        latencies.append((time.time() - start) * 1000)

    p99_latency = np.percentile(latencies, 99)
    results["checks"].append({
        "name": "latency",
        "metric": "p99_ms",
        "value": p99_latency,
        "threshold": max_latency_ms,
        "passed": p99_latency <= max_latency_ms,
    })

    # 3. Model size validation (getsize assumes model_uri is a local path)
    import os
    model_size_mb = os.path.getsize(model_uri) / (1024 * 1024)
    results["checks"].append({
        "name": "model_size",
        "metric": "size_mb",
        "value": model_size_mb,
        "threshold": max_model_size_mb,
        "passed": model_size_mb <= max_model_size_mb,
    })

    results["passed"] = all(c["passed"] for c in results["checks"])
    return results

12.2 GitHub Actions ML CI/CD

name: ML CI/CD Pipeline
on:
  push:
    branches: [main]
    paths:
      - 'models/**'
      - 'features/**'

jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Validate Data Schema
        run: python scripts/validate_data.py

  train-and-evaluate:
    needs: data-validation
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v4
      - name: Train Model
        run: python train.py --config configs/production.yaml
      - name: Evaluate Model
        run: python evaluate.py --model-path outputs/model

  model-validation:
    needs: train-and-evaluate
    runs-on: ubuntu-latest
    steps:
      - name: Run Validation Gates
        run: python model_validation.py

  deploy-staging:
    needs: model-validation
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Staging
        run: |
          kubectl apply -f k8s/staging/
          kubectl rollout status deployment/model-serving -n staging

  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - name: Canary Rollout
        run: |
          kubectl apply -f k8s/production/canary-10.yaml
          sleep 300
          python check_canary_metrics.py
          kubectl apply -f k8s/production/full-rollout.yaml
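check_canary_metrics.py in the workflow above is not shown; its pass/fail core might look like the following sketch (the metric names and ratio thresholds are assumptions, and fetching the values from the monitoring system is left out):

```python
def canary_passes(canary: dict, baseline: dict,
                  max_latency_ratio: float = 1.2,
                  max_error_ratio: float = 1.5) -> bool:
    """Gate the full rollout on canary metrics staying close to baseline.

    `canary` and `baseline` carry 'p99_ms' and 'error_rate' values
    gathered during the canary soak window.
    """
    if canary["p99_ms"] > baseline["p99_ms"] * max_latency_ratio:
        return False  # canary latency regressed too far
    if canary["error_rate"] > baseline["error_rate"] * max_error_ratio + 1e-4:
        return False  # canary error rate regressed too far
    return True
```

The script would exit non-zero when this returns False, which fails the workflow step and blocks the full rollout.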

13. Cost Optimization

13.1 Batch vs Real-Time Inference

# Batch inference: cost-efficient (bulk processing)
def batch_inference(model, data_path, output_path):
    """Batch inference using Spark"""
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("batch-inference").getOrCreate()
    df = spark.read.parquet(data_path)

    from pyspark.sql import functions as F
    from pyspark.sql.types import DoubleType

    # Wrap the model in a UDF; for large models consider broadcasting
    # the model or using a pandas_udf to reduce serialization overhead
    predict_udf = F.udf(
        lambda features: float(model.predict([features])[0]),
        DoubleType(),
    )
    result = df.withColumn("prediction", predict_udf(df["features"]))
    result.write.parquet(output_path)

13.2 Prediction Caching

import redis
import hashlib
import json

class PredictionCache:
    def __init__(self, redis_url="redis://localhost:6379", ttl=3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl
        self.hit_count = 0
        self.miss_count = 0

    def _make_key(self, features: dict) -> str:
        feature_str = json.dumps(features, sort_keys=True)
        return f"pred:{hashlib.md5(feature_str.encode()).hexdigest()}"

    def get_or_predict(self, features: dict, model) -> dict:
        key = self._make_key(features)
        cached = self.redis.get(key)

        if cached:
            self.hit_count += 1
            return json.loads(cached)

        self.miss_count += 1
        prediction = model.predict(features)
        self.redis.setex(key, self.ttl, json.dumps(prediction))
        return prediction

    @property
    def hit_rate(self):
        total = self.hit_count + self.miss_count
        return self.hit_count / total if total > 0 else 0

13.3 Model Compression

# Quantization
import os
import torch

model = torch.load("model.pt")
quantized_model = torch.quantization.quantize_dynamic(
    model,
    {torch.nn.Linear},
    dtype=torch.qint8,
)

# Size comparison
original_size = os.path.getsize("model.pt") / (1024 * 1024)
torch.save(quantized_model.state_dict(), "model_quantized.pt")
quantized_size = os.path.getsize("model_quantized.pt") / (1024 * 1024)

print(f"Original: {original_size:.1f}MB -> Quantized: {quantized_size:.1f}MB")
print(f"Size reduction: {(1 - quantized_size/original_size):.1%}")

13.4 Cost Optimization Checklist

| Strategy | Expected Savings | Trade-off |
|---|---|---|
| Switch to batch inference | 50-80% | Lose real-time capability |
| Prediction caching | 30-60% | Cache freshness |
| Model quantization | 40-60% | Slight accuracy loss |
| Spot/Preemptible instances | 60-90% | Availability risk |
| Auto-scaling | 20-40% | Cold start |
| Model distillation | 50-70% | Development cost |
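
The spot/preemptible row above assumes the serving workload tolerates node reclamation. On GKE, for example, steering a tolerant deployment onto spot nodes comes down to a nodeSelector and toleration; the `cloud.google.com/gke-spot` key shown is GKE's label, and other providers use different keys:

```yaml
# Fragment of a Deployment spec: schedule replicas onto spot nodes.
# cloud.google.com/gke-spot is GKE-specific; EKS/AKS use other labels.
spec:
  template:
    spec:
      nodeSelector:
        cloud.google.com/gke-spot: "true"
      tolerations:
        - key: cloud.google.com/gke-spot
          operator: Equal
          value: "true"
          effect: NoSchedule
```

Pair this with a PodDisruptionBudget and more than one replica so a reclaimed node never takes the whole service down.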

14. Quiz

Q1: What is the core value of a Feature Store?

A: The core value is preventing Training-Serving Skew. By sharing a single feature definition and transformation logic between training and serving, it fundamentally eliminates data inconsistencies. Additional values include feature reuse, team collaboration, and feature governance.

Q2: Why is Point-in-Time Join necessary?

A: Point-in-Time Join prevents Data Leakage. When generating training data, you must join only the feature values that were actually available at the time of each event. If future data is included, the model shows unrealistically high performance during training but degrades significantly in production.
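
The point-in-time constraint in this answer maps directly onto `pandas.merge_asof`, which joins each event to the most recent feature value at or before the event timestamp. A toy sketch of the idea (not Feast's actual implementation):

```python
import pandas as pd

events = pd.DataFrame({
    "event_time": pd.to_datetime(["2025-01-02", "2025-01-05"]),
    "user_id": [1, 1],
})
features = pd.DataFrame({
    "feature_time": pd.to_datetime(["2025-01-01", "2025-01-04", "2025-01-06"]),
    "user_id": [1, 1, 1],
    "avg_purchase_7d": [10.0, 20.0, 99.0],  # 99.0 is "future" for both events
})

# For each event, take the latest feature value at or before event_time
joined = pd.merge_asof(
    events.sort_values("event_time"),
    features.sort_values("feature_time"),
    left_on="event_time",
    right_on="feature_time",
    by="user_id",
    direction="backward",
)
print(joined[["event_time", "avg_purchase_7d"]])
# 2025-01-02 -> 10.0, 2025-01-05 -> 20.0; the future value 99.0 never leaks
```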

Q3: What is the key element for moving from MLOps Level 2 to Level 3?

A: Moving from Level 2 (CI/CD for ML) to Level 3 (auto-retraining) requires an automatic drift detection and retraining trigger system. This monitors data drift and prediction drift in real-time, automatically triggers the retraining pipeline when thresholds are exceeded, and auto-promotes through champion/challenger comparison.

Q4: What is the main difference between BentoML and Seldon Core?

A: BentoML is a framework-agnostic model packaging tool that creates Docker containers deployable anywhere. Seldon Core is a Kubernetes-native serving platform that deeply integrates with the K8s ecosystem through CRD-based deployment, A/B testing, canary deployment, and explainability (Explainer).

Q5: What are 3 main methods for detecting model drift?

A: (1) Data Drift: Detect changes in input feature distributions using KS test, PSI (Population Stability Index), etc. (2) Prediction Drift: Monitor changes in model output distributions. (3) Performance Drift: Track degradation in accuracy, F1, etc. by comparing with actual labels. Evidently AI and WhyLabs are representative tools.
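
The PSI mentioned in this answer is simple enough to compute directly: bin the baseline distribution, compare bin proportions against the current window, and sum the weighted log-ratios. A minimal NumPy sketch (the common rule of thumb treats PSI above roughly 0.2 as significant drift, but that threshold is convention, not a standard):

```python
import numpy as np


def psi(expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:
    """Population Stability Index between a baseline and a current sample."""
    # Bin edges come from the baseline (expected) distribution
    edges = np.percentile(expected, np.linspace(0, 100, bins + 1))
    # Clip the current sample into the baseline's range so nothing
    # falls outside the bins
    actual = np.clip(actual, edges[0], edges[-1])
    e_pct = np.histogram(expected, edges)[0] / len(expected)
    a_pct = np.histogram(actual, edges)[0] / len(actual)
    # Floor the proportions to avoid log(0) and division by zero
    e_pct = np.clip(e_pct, 1e-6, None)
    a_pct = np.clip(a_pct, 1e-6, None)
    return float(np.sum((a_pct - e_pct) * np.log(a_pct / e_pct)))


rng = np.random.default_rng(0)
baseline = rng.normal(0, 1, 10_000)
same = rng.normal(0, 1, 10_000)       # no drift
shifted = rng.normal(0.5, 1, 10_000)  # mean shift

print(f"no drift: {psi(baseline, same):.3f}")     # near 0
print(f"shifted:  {psi(baseline, shifted):.3f}")  # clearly larger
```

Libraries like Evidently AI compute the same statistic (with more careful binning) as part of their drift reports.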


15. References

  1. Feast Official Documentation - https://docs.feast.dev/
  2. MLflow Official Documentation - https://mlflow.org/docs/latest/index.html
  3. Google MLOps Maturity Model - https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning
  4. Evidently AI Documentation - https://docs.evidentlyai.com/
  5. BentoML Official Documentation - https://docs.bentoml.com/
  6. Seldon Core Documentation - https://docs.seldon.io/
  7. Kubeflow Pipelines - https://www.kubeflow.org/docs/components/pipelines/
  8. WhyLabs Documentation - https://docs.whylabs.ai/
  9. vLLM Project - https://docs.vllm.ai/
  10. Weights and Biases - https://docs.wandb.ai/
  11. Tecton Feature Store - https://docs.tecton.ai/
  12. Hopsworks Feature Store - https://docs.hopsworks.ai/
  13. NVIDIA Triton Inference Server - https://docs.nvidia.com/deeplearning/triton-inference-server/