Feature Store & MLOps Pipeline Complete Guide 2025: Feast, Feature Engineering, Model Serving
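1. Why a Feature Store?
1.1 The Training-Serving Skew Problem
One of the most common causes of failure in production ML is a mismatch between the features computed at training time and those computed at serving time.
When a data scientist builds features with Pandas in a Jupyter Notebook and an engineer reimplements the same logic in Java or Go, subtle differences creep in.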
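# Training: build the feature with Pandas.
# Time-based windows such as '7D' need a DatetimeIndex, so index by the
# event timestamp first (the column name 'event_timestamp' is an assumption).
df = df.sort_values('event_timestamp').set_index('event_timestamp')
df['avg_purchase_7d'] = df.groupby('user_id')['amount'].transform(
    lambda x: x.rolling('7D').mean()
)
# Serving: reimplemented in SQL or another language, where results can diverge on
# - window boundary conditions (inclusive vs. exclusive)
# - timezone handling
# - NULL handling
A Feature Store addresses this at the root by sharing a single feature definition between training and serving.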
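To make the boundary-condition case concrete, here is a minimal sketch in plain Python. The event data and the function name `avg_purchase_7d` are hypothetical. The "training" call excludes the lower window edge (as Pandas time-based rolling windows do by default), while the "serving" call includes it, as a SQL `BETWEEN` would.

```python
from datetime import datetime, timedelta

# Hypothetical purchase events for one user: (timestamp, amount).
events = [
    (datetime(2024, 9, 1, 0, 0), 100.0),
    (datetime(2024, 9, 5, 12, 0), 40.0),
    (datetime(2024, 9, 8, 0, 0), 10.0),
]


def avg_purchase_7d(events, as_of, include_lower_bound):
    """Mean amount over the 7 days ending at `as_of` (upper edge inclusive)."""
    start = as_of - timedelta(days=7)
    amounts = [
        amount
        for ts, amount in events
        if (start <= ts if include_lower_bound else start < ts) and ts <= as_of
    ]
    return sum(amounts) / len(amounts) if amounts else None


as_of = datetime(2024, 9, 8, 0, 0)
# "Training" logic: the event exactly 7 days old is excluded.
training_value = avg_purchase_7d(events, as_of, include_lower_bound=False)
# "Serving" logic: the same event is included.
serving_value = avg_purchase_7d(events, as_of, include_lower_bound=True)
print(training_value, serving_value)
```

A single event sitting exactly on the window edge is enough to make the two implementations disagree, which is why one shared definition matters.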
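1.2 Feature Reuse and Team Collaboration
In large ML organizations, dozens of teams independently build similar features.
| Problem | Before a Feature Store | After a Feature Store |
|---|---|---|
| Feature duplication | Each team rebuilds similar features | Search and reuse from a central registry |
| Consistency | Different computation logic per team | Single definition, version controlled |
| Freshness | Manual updates | Automated materialization |
| Discoverability | Depends on Slack/Wiki | Feature catalog + metadata |
| Governance | None | Owners, lineage, access control |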
1.3 Reducing Data Pipeline Complexity
Without a Feature Store, each model maintains its own data pipeline.
Model A: raw data → ETL A → features A → training A
Model B: raw data → ETL B → features B → training B
Model C: raw data → ETL C → features C → training C
With a Feature Store:
raw data → Feature Store (centralized) → shared by models A, B, C
2. Feature Store Architecture
2.1 Core Components
A Feature Store consists of four core components.
Offline Store
- Stores large volumes of historical feature data
- Used to generate training data
- BigQuery, Snowflake, Redshift, Parquet files
- Supports Point-in-Time Joins
Online Store
- Low-latency lookup of the latest feature values
- Used for real-time inference
- Redis, DynamoDB, Bigtable
- Target P99 latency of 10 ms or less
Feature Registry
- Manages metadata for all features
- Name, type, owner, description, tags
- Data lineage tracking
Transformation Engine
- Builds features from raw data
- Supports batch and streaming transformations
- Integrates with Spark, Flink, dbt
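2.2 Data Flow
[Data sources] → [Transformation engine] → [Offline store] ←→ [Online store]
- Raw data enters through the data sources.
- The offline store feeds the training pipeline.
- The online store feeds the inference service.
- The feature registry manages metadata for the features flowing through these components.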
2.3 Feature Store Solution Comparison
| Solution | Type | Offline | Online | Streaming | Cost |
|---|---|---|---|---|---|
| Feast | OSS | Redshift/BQ/File | Redis/DynamoDB | Push-based | Free (infrastructure cost only) |
| Tecton | Managed | Spark + Delta | DynamoDB | Spark Streaming | Subscription |
| Hopsworks | OSS/Managed | Hudi | RonDB | Flink | Community edition free |
| Vertex AI FS | GCP managed | BigQuery | Bigtable | Dataflow | Usage-based |
| SageMaker FS | AWS managed | S3 + Glue | Built-in online store | Kinesis | Usage-based |
3. Feast Deep Dive
3.1 Installing and Initializing Feast
# Install Feast (the redis extra is needed for the Redis online store configured below)
pip install "feast[redis]"
# Initialize a project
feast init my_feature_store
cd my_feature_store
# Project structure
# my_feature_store/
#   feature_store.yaml   # project configuration
#   features.py          # feature definitions
#   data/                # sample data
feature_store.yaml configuration:
project: my_ml_project
registry: data/registry.db
provider: local  # local, gcp, aws
online_store:
  type: redis
  connection_string: "localhost:6379"
offline_store:
  type: file  # file, bigquery, redshift, snowflake
entity_key_serialization_version: 2
3.2 Feature Definitions
# features.py
from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String

# Entity definition
user = Entity(
    name="user_id",
    join_keys=["user_id"],
    description="Unique user ID",
)

# Data source definition
user_stats_source = FileSource(
    path="data/user_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

# Feature view definition
user_stats_fv = FeatureView(
    name="user_stats",
    entities=[user],
    ttl=timedelta(days=1),
    schema=[
        Field(name="total_purchases", dtype=Int64),
        Field(name="avg_purchase_amount", dtype=Float32),
        Field(name="days_since_last_login", dtype=Int64),
        Field(name="preferred_category", dtype=String),
    ],
    online=True,
    source=user_stats_source,
    tags={"team": "recommendation", "version": "v2"},
)
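3.3 Materialization (Offline to Online)
# Apply the feature definitions
feast apply
# Materialize feature values from the offline store into the online store
feast materialize 2024-01-01T00:00:00 2024-12-31T23:59:59
# Incremental materialization (only data since the last run)
feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")
3.4 Training Data Generation (Point-in-Time Join)
The Point-in-Time Join is a core Feature Store capability. For each training row it retrieves the feature values as they were at that row's timestamp, which prevents data leakage.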
from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path=".")

# Entities and timestamps of the training examples
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003, 1001],
    "event_timestamp": pd.to_datetime([
        "2024-09-01", "2024-09-02", "2024-09-03", "2024-10-01"
    ]),
})

# Fetch features with a Point-in-Time Join
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_stats:total_purchases",
        "user_stats:avg_purchase_amount",
        "user_stats:days_since_last_login",
    ],
).to_df()
print(training_df)
# Each row carries the feature values as of its own timestamp, so
# user 1001's features at 2024-09-01 can differ from those at 2024-10-01.
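The join semantics can be sketched in plain Python. This is a simplified illustration, not Feast's implementation: the feature history, the function name `point_in_time_lookup`, and the two-day TTL are all assumptions. For a given timestamp it picks the newest feature row at or before that time and rejects rows older than the TTL.

```python
from bisect import bisect_right
from datetime import datetime, timedelta

# Hypothetical feature history for one entity, sorted by event timestamp.
feature_rows = [
    (datetime(2024, 8, 31), {"total_purchases": 3}),
    (datetime(2024, 9, 30), {"total_purchases": 7}),
    (datetime(2024, 10, 2), {"total_purchases": 8}),
]


def point_in_time_lookup(rows, as_of, ttl):
    """Return the newest feature row with as_of - ttl <= timestamp <= as_of."""
    timestamps = [ts for ts, _ in rows]
    idx = bisect_right(timestamps, as_of) - 1  # last row at or before as_of
    if idx < 0:
        return None  # no feature value existed yet
    ts, values = rows[idx]
    if as_of - ts > ttl:
        return None  # the newest value is older than the TTL allows
    return values


ttl = timedelta(days=2)
early = point_in_time_lookup(feature_rows, datetime(2024, 9, 1), ttl)
late = point_in_time_lookup(feature_rows, datetime(2024, 10, 1), ttl)
stale = point_in_time_lookup(feature_rows, datetime(2024, 9, 15), ttl)
print(early, late, stale)
```

The lookup for 2024-10-01 never sees the 2024-10-02 row, which is exactly the leakage a naive "latest value" join would introduce.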
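3.5 Online Serving
# Look up online features for real-time inference
feature_vector = store.get_online_features(
    features=[
        "user_stats:total_purchases",
        "user_stats:avg_purchase_amount",
        "user_stats:days_since_last_login",
    ],
    entity_rows=[
        {"user_id": 1001},
        {"user_id": 1002},
    ],
).to_dict()
# Result: the latest feature values. P99 latency of roughly 5-10 ms is a
# typical target, but the actual figure depends on the online store and network.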
3.6 Production Configuration on GCP/AWS
# GCP production configuration
project: production_ml
registry: gs://my-bucket/feast-registry/registry.db
provider: gcp
online_store:
  type: datastore  # or bigtable
  project_id: my-gcp-project
offline_store:
  type: bigquery
  project_id: my-gcp-project
  dataset: feast_features
# AWS production configuration
project: production_ml
registry: s3://my-bucket/feast-registry/registry.db
provider: aws
online_store:
  type: dynamodb
  region: ap-northeast-2
offline_store:
  type: redshift
  cluster_id: my-redshift-cluster
  region: ap-northeast-2
  database: ml_features
  user: feast_user
  s3_staging_location: s3://my-bucket/feast-staging/
4. Feature Engineering Patterns
4.1 Temporal Features
import pandas as pd

def create_temporal_features(df, timestamp_col, entity_col, value_col):
    """Build time-window aggregate features."""
    df = df.sort_values(timestamp_col).reset_index(drop=True)
    features = pd.DataFrame()
    features[entity_col] = df[entity_col]
    features[timestamp_col] = df[timestamp_col]
    # Rolling means over several windows. Time-based windows need a
    # DatetimeIndex, so index by the timestamp before rolling per entity.
    indexed = df.set_index(timestamp_col)
    for window in ['1D', '7D', '30D']:
        features[f'{value_col}_mean_{window}'] = (
            indexed.groupby(entity_col)[value_col]
            .transform(lambda x: x.rolling(window).mean())
            .to_numpy()
        )
    # Trend feature: 7-day mean / 30-day mean
    features[f'{value_col}_trend_7d_30d'] = (
        features[f'{value_col}_mean_7D'] /
        features[f'{value_col}_mean_30D'].replace(0, float('nan'))
    )
    # Day-of-week and hour-of-day features
    features['day_of_week'] = df[timestamp_col].dt.dayofweek
    features['hour_of_day'] = df[timestamp_col].dt.hour
    features['is_weekend'] = features['day_of_week'].isin([5, 6]).astype(int)
    return features
4.2 Aggregation Features
def create_aggregation_features(events_df, entity_col, group_col):
"""Per-entity aggregate features."""
agg_features = events_df.groupby(entity_col).agg(
event_count=pd.NamedAgg(column=group_col, aggfunc='count'),
unique_categories=pd.NamedAgg(column='category', aggfunc='nunique'),
total_amount=pd.NamedAgg(column='amount', aggfunc='sum'),
avg_amount=pd.NamedAgg(column='amount', aggfunc='mean'),
max_amount=pd.NamedAgg(column='amount', aggfunc='max'),
std_amount=pd.NamedAgg(column='amount', aggfunc='std'),
).reset_index()
# Ratio feature: share of events with amount above 100
agg_features['high_value_ratio'] = (
events_df[events_df['amount'] > 100]
.groupby(entity_col)
.size()
.reindex(agg_features[entity_col], fill_value=0)
.values / agg_features['event_count']
)
return agg_features
4.3 Embedding Features
from sentence_transformers import SentenceTransformer
import pandas as pd
model = SentenceTransformer('all-MiniLM-L6-v2')
def create_text_embedding_features(df, text_col, prefix='emb'):
"""Build text embedding features."""
embeddings = model.encode(df[text_col].fillna('').tolist())
emb_df = pd.DataFrame(
embeddings,
columns=[f'{prefix}_{i}' for i in range(embeddings.shape[1])],
index=df.index,
)
return pd.concat([df, emb_df], axis=1)
4.4 Cross Features
def create_cross_features(df):
    """Cross and interaction features between existing features."""
    # Ratio feature (a zero visit count is replaced by 1 to avoid division by zero)
    df['purchase_to_visit_ratio'] = (
        df['purchase_count'] / df['visit_count'].replace(0, 1)
    )
    # Bucketing + crossing
    df['age_bucket'] = pd.cut(
        df['age'], bins=[0, 25, 35, 50, 100],
        labels=['young', 'middle', 'senior', 'elder']
    )
    df['age_x_gender'] = df['age_bucket'].astype(str) + '_' + df['gender']
    # Numeric interaction
    df['income_x_age'] = df['income'] * df['age']
    return df
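5. MLOps Maturity Levels
Google's MLOps maturity model defines three levels (0 to 2). This guide follows them and adds a fourth, Level 3, for fully automated retraining.
Level 0: Manual Process
Everything is done by hand:
1. The data scientist collects and preprocesses data
2. The model is trained in Jupyter
3. The model file is handed to an engineer
4. The engineer writes the serving code
5. Deployment is manual
Problems:
- Release cycle: months
- No reproducibility
- No monitoring
Level 1: ML Pipeline Automation
Automated ML pipeline:
1. Data validation → feature engineering → training → evaluation → deployment
2. A pipeline orchestrator (Kubeflow, Airflow)
3. CT (Continuous Training): trigger-based automatic retraining
Improvements:
- Release cycle: weeks
- Reproducible pipelines
- Basic monitoring
Level 2: CI/CD for ML
Integrated CI/CD pipeline:
1. Code change → automated tests → pipeline build → model training
2. Model validation gates (performance thresholds)
3. Shadow deployment → canary → full rollout
4. Automated A/B testing
Improvements:
- Release cycle: days
- Automatic rollback
- Systematic experiment management
Level 3: Automatic Retraining + Full Automation
Full automation:
1. Drift detection → automatic retraining trigger
2. Automatic feature selection and hyperparameter optimization
3. Automatic model comparison with champion/challenger
4. Autoscaling + cost optimization
Improvements:
- Release cycle: hours
- Unattended operation
- Proactive monitoring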
6. ML Pipeline Orchestration
6.1 Kubeflow Pipelines
# Define a pipeline with the Kubeflow Pipelines DSL
from kfp import dsl, compiler
from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(
    base_image="python:3.11",
    # pyarrow is needed for pandas to read and write Parquet
    packages_to_install=["pandas", "pyarrow", "scikit-learn"],
)
def preprocess_data(
    raw_data: Input[Dataset],
    processed_data: Output[Dataset],
):
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    df = pd.read_parquet(raw_data.path)
    # Scale numeric feature columns only; the 'target' label must stay unscaled
    numeric_cols = df.select_dtypes(include='number').columns.drop(
        'target', errors='ignore'
    )
    df_scaled = pd.DataFrame(
        StandardScaler().fit_transform(df[numeric_cols]),
        columns=numeric_cols,
        index=df.index,
    )
    df_scaled['target'] = df['target']
    df_scaled.to_parquet(processed_data.path)

@dsl.component(
    base_image="python:3.11",
    packages_to_install=["scikit-learn", "pandas", "pyarrow", "joblib"],
)
def train_model(
    training_data: Input[Dataset],
    model_artifact: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 100,
    max_depth: int = 10,
):
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import cross_val_score
    import joblib

    df = pd.read_parquet(training_data.path)
    X = df.drop('target', axis=1)
    y = df['target']
    clf = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42,
    )
    scores = cross_val_score(clf, X, y, cv=5, scoring='f1_macro')
    clf.fit(X, y)
    joblib.dump(clf, model_artifact.path)
    metrics.log_metric("f1_mean", float(scores.mean()))
    metrics.log_metric("f1_std", float(scores.std()))

@dsl.pipeline(name="ML Training Pipeline")
def ml_pipeline(n_estimators: int = 100, max_depth: int = 10):
    preprocess_task = preprocess_data(
        raw_data=dsl.importer(
            artifact_uri="gs://my-bucket/raw-data/",
            artifact_class=Dataset,
        ).output,
    )
    train_task = train_model(
        training_data=preprocess_task.outputs["processed_data"],
        n_estimators=n_estimators,
        max_depth=max_depth,
    )

# Compile the pipeline to YAML
compiler.Compiler().compile(ml_pipeline, "pipeline.yaml")
6.2 Airflow ML Pipeline
# Define an ML pipeline as an Airflow DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'ml_training_pipeline',
    default_args=default_args,
    schedule='@weekly',  # Airflow 2.4+; older versions use schedule_interval
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['ml', 'training'],
) as dag:
    validate_data = KubernetesPodOperator(
        task_id='validate_data',
        name='data-validation',
        image='ml-pipeline:latest',
        cmds=['python', 'validate.py'],
        namespace='ml-pipelines',
    )
    extract_features = KubernetesPodOperator(
        task_id='extract_features',
        name='feature-extraction',
        image='ml-pipeline:latest',
        cmds=['python', 'extract_features.py'],
        namespace='ml-pipelines',
    )
    train_model = KubernetesPodOperator(
        task_id='train_model',
        name='model-training',
        image='ml-pipeline:latest',
        cmds=['python', 'train.py'],
        namespace='ml-pipelines',
        # Recent provider versions take a V1ResourceRequirements object
        # instead of the older `resources` dict
        container_resources=k8s.V1ResourceRequirements(
            requests={'memory': '8Gi', 'cpu': '4'},
            limits={'nvidia.com/gpu': '1'},
        ),
    )
    evaluate_model = PythonOperator(
        task_id='evaluate_model',
        python_callable=lambda: print("Evaluating model..."),
    )
    validate_data >> extract_features >> train_model >> evaluate_model
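6.3 Pipeline Solution Comparison
| Feature | Kubeflow | Airflow | Vertex AI | SageMaker |
|---|---|---|---|---|
| Runtime | Kubernetes | Various | GCP managed | AWS managed |
| ML focus | High | Medium | High | High |
| UI/visualization | Basic | Good | Good | Good |
| Scalability | High | High | High | High |
| Learning curve | Steep | Moderate | Gentle | Gentle |
| Cost | Infrastructure only | Infrastructure only | Usage-based | Usage-based |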
7. Experiment Tracking
7.1 Experiment Tracking with MLflow
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, precision_score, recall_score

# Configure the MLflow server
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("recommendation-model-v2")

# X (features) and y (labels) are assumed to be loaded already
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Run the experiment
with mlflow.start_run(run_name="gbm-experiment-42") as run:
    # Log hyperparameters
    params = {
        "n_estimators": 200,
        "max_depth": 8,
        "learning_rate": 0.1,
        "subsample": 0.8,
    }
    mlflow.log_params(params)
    # Train the model
    model = GradientBoostingClassifier(**params)
    model.fit(X_train, y_train)
    # Log metrics
    y_pred = model.predict(X_test)
    metrics = {
        "f1": f1_score(y_test, y_pred, average='macro'),
        "precision": precision_score(y_test, y_pred, average='macro'),
        "recall": recall_score(y_test, y_pred, average='macro'),
    }
    mlflow.log_metrics(metrics)
    # Log the model artifact
    mlflow.sklearn.log_model(
        model,
        artifact_path="model",
        registered_model_name="recommendation-gbm",
    )
    # Custom artifacts (e.g. a feature importance chart); the file must already exist
    mlflow.log_artifact("feature_importance.png")
    print(f"Run ID: {run.info.run_id}")
    print(f"Metrics: {metrics}")
7.2 Weights and Biases Integration
import wandb
wandb.init(
project="recommendation-model",
config={
"architecture": "GBM",
"n_estimators": 200,
"learning_rate": 0.1,
},
)
# Log metrics inside the training loop
for epoch in range(100):
train_loss = train_one_epoch(model, train_loader)
val_loss, val_f1 = evaluate(model, val_loader)
wandb.log({
"epoch": epoch,
"train_loss": train_loss,
"val_loss": val_loss,
"val_f1": val_f1,
})
# Save the model artifact
artifact = wandb.Artifact('model-weights', type='model')
artifact.add_file('model.pt')
wandb.log_artifact(artifact)
wandb.finish()
8. Model Registry
8.1 MLflow Model Registry
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Create a model version (run_id comes from the tracking run above)
model_version = client.create_model_version(
    name="recommendation-gbm",
    source=f"runs:/{run_id}/model",
    run_id=run_id,
    description="GBM model v3: added new features",
)

# Note: model stages are deprecated as of MLflow 2.9 in favor of aliases
# (e.g. client.set_registered_model_alias); the stage-based workflow is shown here.
# Stage transition: None → Staging
client.transition_model_version_stage(
    name="recommendation-gbm",
    version=model_version.version,
    stage="Staging",
    archive_existing_versions=False,
)

# Promote to production after staging validation
client.transition_model_version_stage(
    name="recommendation-gbm",
    version=model_version.version,
    stage="Production",
    archive_existing_versions=True,  # archive the existing production version
)
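8.2 Model Versioning Strategy
Model lifecycle:
None → Staging → Production → Archived
Versioning policy:
- Staging: when automated performance tests pass
- Production: on manual approval or a passing A/B test
- Archived: automatically, when a new version is promoted to production
- Keep the three most recent versions and delete older ones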
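The retention rule in the policy above can be sketched as a small pure function. This is an illustration only: `versions_to_delete` is a hypothetical helper, versions are represented as plain `(version_number, stage)` tuples rather than registry objects, and protecting versions still in Staging or Production from deletion is an added assumption.

```python
def versions_to_delete(versions, keep_latest=3):
    """Pick version numbers to delete, keeping the newest `keep_latest`.

    `versions` is a list of (version_number, stage) tuples. Versions still
    in Staging or Production are never selected (an assumption of this sketch).
    """
    protected_stages = {"Staging", "Production"}
    newest_first = sorted(versions, key=lambda v: v[0], reverse=True)
    older = newest_first[keep_latest:]
    return [number for number, stage in older if stage not in protected_stages]


registry = [
    (1, "Archived"),
    (2, "Archived"),
    (3, "Production"),
    (4, "Archived"),
    (5, "Archived"),
    (6, "Staging"),
]
print(versions_to_delete(registry))
```

Versions 6, 5 and 4 are kept as the three newest, version 3 is kept because it is live, and versions 2 and 1 are selected for deletion.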
# Load the production model
import mlflow.pyfunc

model = mlflow.pyfunc.load_model(
    model_uri="models:/recommendation-gbm/Production"
)
predictions = model.predict(input_df)
9. Model Serving
9.1 BentoML
# service.py
import bentoml
import numpy as np
from bentoml.io import NumpyNdarray, JSON
# Load the saved model
runner = bentoml.sklearn.get("recommendation_model:latest").to_runner()
svc = bentoml.Service("recommendation_service", runners=[runner])
@svc.api(input=JSON(), output=JSON())
async def predict(input_data: dict) -> dict:
features = np.array(input_data["features"]).reshape(1, -1)
prediction = await runner.predict.async_run(features)
return {
"prediction": int(prediction[0]),
"model_version": "v3.2.1",
}
# bentofile.yaml
service: "service:svc"
include:
- "*.py"
python:
packages:
- scikit-learn
- numpy
docker:
python_version: "3.11"
# Build and deploy
bentoml build
bentoml containerize recommendation_service:latest
docker run -p 3000:3000 recommendation_service:latest
9.2 Seldon Core (Kubernetes)
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
name: recommendation-model
namespace: ml-serving
spec:
predictors:
- name: default
replicas: 3
graph:
name: classifier
implementation: SKLEARN_SERVER
modelUri: s3://models/recommendation/v3
envSecretRefName: s3-credentials
componentSpecs:
- spec:
containers:
- name: classifier
resources:
requests:
cpu: "1"
memory: "2Gi"
limits:
cpu: "2"
memory: "4Gi"
traffic: 100
labels:
version: v3
9.3 TensorFlow Serving
# Run TF Serving in Docker (8500 = gRPC, 8501 = REST)
docker run -p 8500:8500 -p 8501:8501 \
  --mount type=bind,source=/models/recommendation,target=/models/recommendation \
  -e MODEL_NAME=recommendation \
  tensorflow/serving:latest
# gRPC client
import grpc
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2, prediction_service_pb2_grpc

channel = grpc.insecure_channel('localhost:8500')
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
request = predict_pb2.PredictRequest()
request.model_spec.name = 'recommendation'
request.model_spec.signature_name = 'serving_default'
# input_data is assumed to hold 128 feature values for one example
request.inputs['input'].CopyFrom(
    tf.make_tensor_proto(input_data, shape=[1, 128])
)
response = stub.Predict(request, timeout=5.0)
9.4 vLLM (LLM Serving)
# Start the vLLM server
# python -m vllm.entrypoints.openai.api_server \
#   --model meta-llama/Meta-Llama-3-8B-Instruct \
#   --tensor-parallel-size 2 \
#   --max-model-len 8192
# Client call
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:8000/v1",
    api_key="not-needed",
)
response = client.chat.completions.create(
    model="meta-llama/Meta-Llama-3-8B-Instruct",
    messages=[
        {"role": "user", "content": "What are the advantages of a recommendation system?"},
    ],
    temperature=0.7,
    max_tokens=512,
)
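9.5 Serving Solution Comparison
| Solution | Model types | Deployment | Batch inference | GPU support | Autoscaling |
|---|---|---|---|---|---|
| BentoML | General purpose | Docker/K8s | Supported | Supported | Supported |
| Seldon Core | General purpose | K8s only | Supported | Supported | HPA |
| TFServing | TF models | Docker/K8s | Supported | Supported | Manual |
| Triton | Multi-framework | Docker/K8s | Supported | Optimized | Supported |
| vLLM | LLM | Docker/K8s | Supported (offline batch API) | Required in practice | Supported |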
10. Model Monitoring
10.1 Data Drift Detection
# Note: this uses Evidently's older Report/ColumnMapping API;
# newer releases have reorganized these imports.
import pandas as pd
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset

# Reference data (from training time)
reference_data = pd.read_parquet("training_data.parquet")
# Current data (from production)
current_data = pd.read_parquet("production_data_latest.parquet")
column_mapping = ColumnMapping(
    target='label',
    numerical_features=['age', 'income', 'purchase_count'],
    categorical_features=['category', 'region'],
)
# Build the drift report
report = Report(metrics=[
    DataDriftPreset(),
    TargetDriftPreset(),
])
report.run(
    reference_data=reference_data,
    current_data=current_data,
    column_mapping=column_mapping,
)
# Save the HTML report
report.save_html("drift_report.html")
# Programmatic access
result = report.as_dict()
dataset_drift = result['metrics'][0]['result']['dataset_drift']
drift_share = result['metrics'][0]['result']['share_of_drifted_columns']
if dataset_drift:
    print(f"Drift detected! Share of drifted features: {drift_share:.2%}")
    # Trigger the retraining pipeline (hypothetical helper defined elsewhere)
    trigger_retraining_pipeline()
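Drift reports like the one above compare a reference distribution with a current one, feature by feature. One statistic often used for this is the Population Stability Index (PSI). The following is a minimal standard-library sketch, not Evidently's implementation; the equal-width binning over the reference range, the bin count, and the `eps` floor for empty bins are all assumptions of this example.

```python
import math


def population_stability_index(reference, current, n_bins=10, eps=1e-6):
    """PSI between two numeric samples, using equal-width bins on the reference range."""
    lo, hi = min(reference), max(reference)
    width = (hi - lo) / n_bins or 1.0  # fall back to 1.0 for a constant reference

    def bin_shares(values):
        counts = [0] * n_bins
        for v in values:
            idx = int((v - lo) / width)
            counts[min(max(idx, 0), n_bins - 1)] += 1  # clamp out-of-range values
        # Floor empty bins at eps so the logarithm stays defined
        return [max(c / len(values), eps) for c in counts]

    ref, cur = bin_shares(reference), bin_shares(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref, cur))


reference = [i / 100 for i in range(1000)]   # roughly uniform on [0, 10)
shifted = [x + 3 for x in reference]         # same shape, shifted by 3
print(population_stability_index(reference, reference))
print(population_stability_index(reference, shifted))
```

A commonly cited rule of thumb treats PSI below 0.1 as stable and above 0.25 as a significant shift, though suitable thresholds depend on the feature and the use case.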
10.2 Prediction Drift Detection
from evidently.test_suite import TestSuite
from evidently.tests import (
TestColumnDrift,
TestShareOfDriftedColumns,
TestMeanInNSigmas,
)
test_suite = TestSuite(tests=[
TestShareOfDriftedColumns(lt=0.3), # fewer than 30% of features drifted
TestColumnDrift(column_name="prediction_score"),
TestMeanInNSigmas(column_name="prediction_score", n=2),
])
test_suite.run(
reference_data=reference_data,
current_data=current_data,
column_mapping=column_mapping,
)
# Check the test results
test_results = test_suite.as_dict()
all_passed = all(
t['status'] == 'SUCCESS' for t in test_results['tests']
)
if not all_passed:
alert_team("Model drift detected - review whether retraining is needed")
10.3 WhyLabs Integration
import whylogs as why
from whylogs.api.writer.whylabs import WhyLabsWriter
# Create a profile
results = why.log(current_data)
profile = results.profile()
# Send it to WhyLabs
writer = WhyLabsWriter()
writer.write(profile)
# Monitor custom metrics
from whylogs.experimental.core.udf_schema import udf_schema
@udf_schema()
def custom_metrics(df):
return {
"avg_prediction_confidence": df['confidence'].mean(),
"null_feature_ratio": df.isnull().sum().sum() / df.size,
"prediction_distribution_skew": df['prediction'].skew(),
}
11. A/B Testing
11.1 Traffic Splitting
# Split traffic with an Istio VirtualService
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: recommendation-ab-test
spec:
hosts:
- recommendation-service
http:
- match:
- headers:
x-experiment-group:
exact: "treatment"
route:
- destination:
host: recommendation-service
subset: v2-challenger
weight: 100
- route:
- destination:
host: recommendation-service
subset: v1-champion
weight: 80
- destination:
host: recommendation-service
subset: v2-challenger
weight: 20
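The first rule above routes requests carrying `x-experiment-group: treatment` to the challenger, so something upstream has to decide which users get that header. One common approach is deterministic hash bucketing, sketched below. The function name `assign_group`, the experiment name, and the 20% treatment share are assumptions for illustration, not part of the Istio configuration.

```python
import hashlib


def assign_group(user_id: str, experiment: str, treatment_share: float = 0.2) -> str:
    """Deterministically map a user to 'treatment' or 'control'."""
    digest = hashlib.sha256(f"{experiment}:{user_id}".encode()).hexdigest()
    bucket = int(digest[:8], 16) / 0x100000000  # roughly uniform in [0, 1)
    return "treatment" if bucket < treatment_share else "control"


# The same user always lands in the same group for a given experiment.
group = assign_group("user-1001", "recsys-v2")
headers = {"x-experiment-group": group}

# Over many users the split approaches the configured share.
groups = [assign_group(f"user-{i}", "recsys-v2") for i in range(10000)]
observed_share = groups.count("treatment") / len(groups)
print(headers, observed_share)
```

Including the experiment name in the hash keeps assignments independent across experiments, so a user in the treatment group of one test is not systematically in the treatment group of the next.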
11.2 Statistical Significance Testing
from scipy import stats
import numpy as np
def ab_test_significance(
control_conversions, control_total,
treatment_conversions, treatment_total,
alpha=0.05,
):
"""Check the statistical significance of an A/B test."""
control_rate = control_conversions / control_total
treatment_rate = treatment_conversions / treatment_total
# Z-test for proportions
pooled_rate = (
(control_conversions + treatment_conversions) /
(control_total + treatment_total)
)
se = np.sqrt(
pooled_rate * (1 - pooled_rate) *
(1/control_total + 1/treatment_total)
)
z_stat = (treatment_rate - control_rate) / se
p_value = 2 * (1 - stats.norm.cdf(abs(z_stat)))
lift = (treatment_rate - control_rate) / control_rate
return {
"control_rate": control_rate,
"treatment_rate": treatment_rate,
"lift": f"{lift:.2%}",
"p_value": p_value,
"significant": p_value < alpha,
"recommendation": (
"Promote the challenger model" if p_value < alpha and lift > 0
else "Keep the current model"
),
}
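As a worked example of the same two-proportion z-test, the sketch below recomputes it with only the standard library, using `math.erfc` for the two-sided p-value. The conversion counts are made up for illustration, and `two_proportion_z_test` is a hypothetical helper name.

```python
import math


def two_proportion_z_test(control_conv, control_total, treat_conv, treat_total):
    """Return (z statistic, two-sided p-value) for a pooled two-proportion z-test."""
    control_rate = control_conv / control_total
    treat_rate = treat_conv / treat_total
    pooled = (control_conv + treat_conv) / (control_total + treat_total)
    se = math.sqrt(pooled * (1 - pooled) * (1 / control_total + 1 / treat_total))
    z = (treat_rate - control_rate) / se
    # For a standard normal, P(|Z| > |z|) = erfc(|z| / sqrt(2))
    p_value = math.erfc(abs(z) / math.sqrt(2))
    return z, p_value


# Hypothetical experiment: 5.0% vs 5.6% conversion with 10,000 users per arm.
z, p_value = two_proportion_z_test(500, 10_000, 560, 10_000)
print(f"z = {z:.3f}, p = {p_value:.4f}, significant at 0.05: {p_value < 0.05}")
```

Here a 12% relative lift still falls just short of significance at alpha = 0.05, which illustrates why a lift that looks large can be inconclusive at this sample size.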
11.3 Multi-Armed Bandit
import numpy as np

class ThompsonSampling:
    """Adaptive traffic splitting with Thompson Sampling."""

    def __init__(self, n_arms):
        self.n_arms = n_arms
        self.successes = np.ones(n_arms)  # Beta prior alpha
        self.failures = np.ones(n_arms)   # Beta prior beta

    def select_arm(self):
        """Draw from each arm's Beta posterior and pick the largest sample."""
        samples = [
            np.random.beta(self.successes[i], self.failures[i])
            for i in range(self.n_arms)
        ]
        return int(np.argmax(samples))

    def update(self, arm, reward):
        """Record the outcome for the chosen arm."""
        if reward:
            self.successes[arm] += 1
        else:
            self.failures[arm] += 1

    def get_allocation(self):
        """Normalized posterior mean per arm.

        This is a rough summary only; it is not the exact share of traffic
        that Thompson Sampling sends to each arm.
        """
        total = self.successes + self.failures
        rates = self.successes / total
        return rates / rates.sum()

# Usage example (incoming_requests, models and get_conversion are defined elsewhere)
bandit = ThompsonSampling(n_arms=3)  # three model versions
for request in incoming_requests:
    arm = bandit.select_arm()
    prediction = models[arm].predict(request)
    reward = get_conversion(request, prediction)
    bandit.update(arm, reward)
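The share of traffic Thompson Sampling actually sends to an arm is the probability that the arm's posterior draw is the largest. That probability can be estimated by simulation, as in this standard-library sketch. The function name `estimate_allocation`, the draw count, and the example counts are assumptions for illustration.

```python
import random


def estimate_allocation(successes, failures, n_draws=20000, seed=0):
    """Estimate, per arm, the probability of having the largest Beta posterior draw."""
    rng = random.Random(seed)
    wins = [0] * len(successes)
    for _ in range(n_draws):
        samples = [rng.betavariate(a, b) for a, b in zip(successes, failures)]
        wins[samples.index(max(samples))] += 1
    return [w / n_draws for w in wins]


# Arm 0 has converted about 89% of the time, arm 1 about 11%.
allocation = estimate_allocation(successes=[91, 11], failures=[11, 91])
print(allocation)
```

With evidence this lopsided, nearly all traffic goes to arm 0, whereas normalizing posterior means would still assign arm 1 a visible share.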
12. CI/CD for ML
12.1 Model Validation Gates
# model_validation.py
import os
import time

import mlflow
import numpy as np
import pandas as pd
from sklearn.metrics import f1_score

def validate_model(
    model_uri: str,
    test_data_path: str,
    min_f1: float = 0.85,
    max_latency_ms: float = 50.0,
    max_model_size_mb: float = 500.0,
):
    """Validation gate run before a model is deployed."""
    results = {"passed": True, "checks": []}

    # 1. Performance check
    model = mlflow.pyfunc.load_model(model_uri)
    test_data = pd.read_parquet(test_data_path)
    features = test_data.drop('target', axis=1)
    predictions = model.predict(features)
    f1 = f1_score(test_data['target'], predictions, average='macro')
    results["checks"].append({
        "name": "performance",
        "metric": "f1_score",
        "value": f1,
        "threshold": min_f1,
        "passed": f1 >= min_f1,
    })

    # 2. Latency check: single-row predictions on features only (no label column)
    latencies = []
    for i in range(min(100, len(features))):
        start = time.perf_counter()
        model.predict(features.iloc[[i]])
        latencies.append((time.perf_counter() - start) * 1000)
    p99_latency = np.percentile(latencies, 99)
    results["checks"].append({
        "name": "latency",
        "metric": "p99_ms",
        "value": p99_latency,
        "threshold": max_latency_ms,
        "passed": p99_latency <= max_latency_ms,
    })

    # 3. Model size check: model_uri is a URI, not a local file, so download
    # the artifacts first and sum the size of every file in the directory
    local_path = mlflow.artifacts.download_artifacts(artifact_uri=model_uri)
    total_bytes = sum(
        os.path.getsize(os.path.join(root, name))
        for root, _, files in os.walk(local_path)
        for name in files
    )
    model_size_mb = total_bytes / (1024 * 1024)
    results["checks"].append({
        "name": "model_size",
        "metric": "size_mb",
        "value": model_size_mb,
        "threshold": max_model_size_mb,
        "passed": model_size_mb <= max_model_size_mb,
    })

    results["passed"] = all(c["passed"] for c in results["checks"])
    return results
12.2 GitHub Actions ML CI/CD
name: ML CI/CD Pipeline
on:
  push:
    branches: [main]
    paths:
      - 'models/**'
      - 'features/**'
jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Validate Data Schema
        run: python scripts/validate_data.py
  train-and-evaluate:
    needs: data-validation
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v4
      - name: Train Model
        run: python train.py --config configs/production.yaml
      - name: Evaluate Model
        run: python evaluate.py --model-path outputs/model
      - name: Upload Metrics
        run: python upload_metrics.py
  model-validation:
    needs: train-and-evaluate
    runs-on: ubuntu-latest
    steps:
      # Each job starts on a fresh runner, so the repository must be checked out again
      - uses: actions/checkout@v4
      - name: Run Validation Gates
        run: python model_validation.py
      - name: Performance Regression Check
        run: python check_regression.py
  deploy-staging:
    needs: model-validation
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Deploy to Staging
        run: |
          kubectl apply -f k8s/staging/
          kubectl rollout status deployment/model-serving -n staging
  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v4
      - name: Shadow Deployment
        run: kubectl apply -f k8s/production/shadow.yaml
      - name: Canary Rollout
        run: |
          kubectl apply -f k8s/production/canary-10.yaml
          sleep 300
          python check_canary_metrics.py
          kubectl apply -f k8s/production/canary-50.yaml
          sleep 300
          python check_canary_metrics.py
          kubectl apply -f k8s/production/full-rollout.yaml
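13. Cost Optimization
13.1 Batch vs. Real-Time Inference
# Batch inference: cost-efficient for large volumes
def batch_inference(model, data_path, output_path):
    """Batch inference with Spark."""
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import DoubleType

    spark = SparkSession.builder.appName("batch-inference").getOrCreate()
    df = spark.read.parquet(data_path)
    # Apply the model through a UDF. Declare the return type explicitly;
    # without it Spark treats the UDF result as a string.
    predict_udf = udf(
        lambda features: float(model.predict([features])[0]),
        DoubleType(),
    )
    result = df.withColumn("prediction", predict_udf(df["features"]))
    result.write.parquet(output_path)

# Real-time inference: latency matters (individual requests)
# → requires online serving infrastructure (higher cost)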
13.2 Prediction Caching
import redis
import hashlib
import json
class PredictionCache:
def __init__(self, redis_url="redis://localhost:6379", ttl=3600):
self.redis = redis.from_url(redis_url)
self.ttl = ttl
self.hit_count = 0
self.miss_count = 0
def _make_key(self, features: dict) -> str:
feature_str = json.dumps(features, sort_keys=True)
return f"pred:{hashlib.md5(feature_str.encode()).hexdigest()}"
def get_or_predict(self, features: dict, model) -> dict:
key = self._make_key(features)
cached = self.redis.get(key)
if cached:
self.hit_count += 1
return json.loads(cached)
self.miss_count += 1
prediction = model.predict(features)
self.redis.setex(key, self.ttl, json.dumps(prediction))
return prediction
@property
def hit_rate(self):
total = self.hit_count + self.miss_count
return self.hit_count / total if total > 0 else 0
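13.3 Model Compression
# Quantization
import os

import torch

# model.pt holds a fully pickled model, so weights_only=False is required on
# recent PyTorch versions. Only load files from a trusted source this way.
model = torch.load("model.pt", weights_only=False)
quantized_model = torch.quantization.quantize_dynamic(
    model,
    {torch.nn.Linear},
    dtype=torch.qint8,
)
# Size comparison: save both state_dicts so the comparison is like for like
torch.save(model.state_dict(), "model_fp32_state.pt")
torch.save(quantized_model.state_dict(), "model_quantized.pt")
original_size = os.path.getsize("model_fp32_state.pt") / (1024 * 1024)
quantized_size = os.path.getsize("model_quantized.pt") / (1024 * 1024)
print(f"Original: {original_size:.1f}MB → quantized: {quantized_size:.1f}MB")
print(f"Size reduction: {(1 - quantized_size/original_size):.1%}")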
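13.4 Cost Optimization Checklist
| Strategy | Expected savings | Trade-off |
|---|---|---|
| Switch to batch inference | 50-80% | No real-time results |
| Prediction caching | 30-60% | Cache freshness |
| Model quantization | 40-60% | Slight accuracy loss |
| Spot/Preemptible instances | 60-90% | Unstable availability |
| Autoscaling | 20-40% | Cold starts |
| Model distillation | 50-70% | Development cost |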
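14. Quiz
Q1: What is the core value of a Feature Store?
A: Preventing training-serving skew. Training and serving share the same feature definitions and transformation logic, which removes the data mismatch at its source. Feature reuse, team collaboration, and feature governance are important additional benefits.
Q2: Why is a Point-in-Time Join needed?
A: It prevents data leakage. When building training data, each event must be joined only with the feature values that were actually available at that event's timestamp. If future data leaks in, the model shows unrealistically high performance during training and then degrades sharply in production.
Q3: What is the key to moving from MLOps Level 2 to Level 3?
A: Moving from Level 2 (CI/CD for ML) to Level 3 (automatic retraining) requires automated drift detection and a retraining trigger. Data drift and prediction drift are monitored continuously, the retraining pipeline runs automatically when a threshold is exceeded, and a champion/challenger comparison decides automatic promotion.
Q4: What is the main difference between BentoML and Seldon Core?
A: BentoML is a framework-agnostic model packaging tool that produces Docker containers you can deploy anywhere. Seldon Core is a Kubernetes-native serving platform that integrates deeply with the K8s ecosystem: CRD-based deployment, A/B testing, canary deployment, and explainability (Explainer).
Q5: What are the three main ways to detect model drift?
A: (1) Data drift: detect changes in the input feature distributions with the KS test, PSI (Population Stability Index), and similar statistics. (2) Prediction drift: monitor changes in the distribution of model outputs. (3) Performance drift: compare predictions against actual labels and track declines in metrics such as accuracy and F1. Evidently AI and WhyLabs are representative tools.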
15. References
- Feast documentation - https://docs.feast.dev/
- MLflow documentation - https://mlflow.org/docs/latest/index.html
- Google MLOps Maturity Model - https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning
- Evidently AI documentation - https://docs.evidentlyai.com/
- BentoML documentation - https://docs.bentoml.com/
- Seldon Core documentation - https://docs.seldon.io/
- Kubeflow Pipelines - https://www.kubeflow.org/docs/components/pipelines/
- WhyLabs documentation - https://docs.whylabs.ai/
- vLLM project - https://docs.vllm.ai/
- Weights and Biases - https://docs.wandb.ai/
- Tecton Feature Store - https://docs.tecton.ai/
- Hopsworks Feature Store - https://docs.hopsworks.ai/
- NVIDIA Triton Inference Server - https://docs.nvidia.com/deeplearning/triton-inference-server/
Feature Store & MLOps Pipeline Complete Guide 2025: Feast, Feature Engineering, Model Serving
Table of Contents
1. Why Feature Store?
1.1 The Training-Serving Skew Problem
The most frequent source of failures in ML model deployment is feature inconsistency between training and serving time.
When data scientists create features with Pandas in Jupyter Notebooks, and engineers re-implement the same logic in Java or Go, subtle differences creep in.
# Training time: Feature creation with Pandas
df['avg_purchase_7d'] = df.groupby('user_id')['amount'].transform(
lambda x: x.rolling('7D').mean()
)
# Serving time: Re-implemented in SQL or another language -> subtle differences
# - Boundary conditions (inclusive/exclusive) differences
# - Timezone handling differences
# - NULL handling differences
Feature Store solves this problem fundamentally by sharing a single feature definition between both training and serving.
1.2 Feature Reuse and Team Collaboration
In large ML organizations, dozens of teams independently create similar features.
| Problem | Before Feature Store | After Feature Store |
|---|---|---|
| Feature duplication | Each team recreates similar features | Search/reuse from central registry |
| Consistency | Different computation logic per team | Single definition, version controlled |
| Freshness | Manual updates | Automated materialization |
| Discoverability | Depends on Slack/Wiki | Feature catalog + metadata |
| Governance | None | Owner, lineage, access control |
1.3 Reducing Data Pipeline Complexity
Without a Feature Store, each model maintains its own data pipeline:
Model A: raw data -> ETL A -> features A -> training A
Model B: raw data -> ETL B -> features B -> training B
Model C: raw data -> ETL C -> features C -> training C
After Feature Store adoption:
raw data -> Feature Store (centralized) -> Models A, B, C share features
2. Feature Store Architecture
2.1 Core Components
A Feature Store consists of four core components.
Offline Store
- Stores large volumes of historical feature data
- Used for training data generation
- BigQuery, Snowflake, Redshift, Parquet files
- Supports Point-in-Time Joins
Online Store
- Low-latency lookup of latest feature values
- Used for real-time inference
- Redis, DynamoDB, Bigtable
- Target P99 latency under 10ms
Feature Registry
- Manages metadata for all features
- Name, type, owner, description, tags
- Data lineage tracking
Transformation Engine
- Creates features from raw data
- Supports batch/streaming transformations
- Spark, Flink, dbt integration
2.2 Data Flow
[Data Sources] -> [Transform Engine] -> [Offline Store] <-> [Online Store]
^ | |
Raw Data Training Pipeline Inference Service
|
[Feature Registry]
(Metadata Management)
2.3 Feature Store Solution Comparison
| Solution | Type | Offline | Online | Streaming | Cost |
|---|---|---|---|---|---|
| Feast | OSS | Redshift/BQ/File | Redis/DynamoDB | Push-based | Free (infra only) |
| Tecton | Managed | Spark + Delta | DynamoDB | Spark Streaming | Subscription |
| Hopsworks | OSS/Managed | Hudi | RonDB | Flink | Community free |
| Vertex AI FS | GCP Managed | BigQuery | Bigtable | Dataflow | Usage-based |
| SageMaker FS | AWS Managed | S3 + Glue | Built-in Online | Kinesis | Usage-based |
3. Feast Deep Dive
3.1 Installation and Initial Setup
# Install Feast
pip install feast
# Initialize project
feast init my_feature_store
cd my_feature_store
# Project structure
# my_feature_store/
#   feature_store.yaml   # Project config
#   features.py          # Feature definitions
#   data/                # Sample data
feature_store.yaml configuration:
project: my_ml_project
registry: data/registry.db
provider: local  # local, gcp, aws
online_store:
  type: redis
  connection_string: "localhost:6379"
offline_store:
  type: file  # file, bigquery, redshift, snowflake
entity_key_serialization_version: 2
3.2 Feature Definitions
# features.py
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String
# Entity definition
user = Entity(
name="user_id",
join_keys=["user_id"],
description="Unique user identifier",
)
# Data source definition
user_stats_source = FileSource(
path="data/user_stats.parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created_timestamp",
)
# Feature view definition
user_stats_fv = FeatureView(
name="user_stats",
entities=[user],
ttl=timedelta(days=1),
schema=[
Field(name="total_purchases", dtype=Int64),
Field(name="avg_purchase_amount", dtype=Float32),
Field(name="days_since_last_login", dtype=Int64),
Field(name="preferred_category", dtype=String),
],
online=True,
source=user_stats_source,
tags={"team": "recommendation", "version": "v2"},
)
3.3 Materialization (Offline to Online)
# Apply feature definitions
feast apply
# Materialize feature values from offline to online store
feast materialize 2024-01-01T00:00:00 2024-12-31T23:59:59
# Incremental materialization (since last run)
feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")
3.4 Training Data Generation (Point-in-Time Join)
Point-in-Time Join is the core capability of a Feature Store. It prevents data leakage while accurately retrieving feature values at specific historical timestamps.
from feast import FeatureStore
import pandas as pd
store = FeatureStore(repo_path=".")
# Entity dataframe with timestamps
entity_df = pd.DataFrame({
"user_id": [1001, 1002, 1003, 1001],
"event_timestamp": pd.to_datetime([
"2024-09-01", "2024-09-02", "2024-09-03", "2024-10-01"
]),
})
# Get features with Point-in-Time Join
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"user_stats:total_purchases",
"user_stats:avg_purchase_amount",
"user_stats:days_since_last_login",
],
).to_df()
print(training_df)
# Each row has feature values as of that point in time
# user 1001's features at 2024-09-01 != features at 2024-10-01
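To make the point-in-time rule concrete, here is a minimal pure-Python sketch of the lookup that such a join performs for each entity row. It is an illustration under stated assumptions, not Feast's implementation: `FEATURE_LOG`, `point_in_time_lookup`, and the sample values are hypothetical, and the `ttl` argument only mimics how a feature view's TTL limits how far back a value may be taken from.

```python
from bisect import bisect_right
from datetime import datetime, timedelta

# Hypothetical feature log: per user, (event_timestamp, total_purchases),
# sorted by timestamp. The values are made up for illustration.
FEATURE_LOG = {
    1001: [
        (datetime(2024, 8, 30), 3),
        (datetime(2024, 9, 15), 5),
        (datetime(2024, 9, 30, 12), 8),
    ],
}


def point_in_time_lookup(user_id, as_of, ttl=timedelta(days=1)):
    """Return the latest value with timestamp <= as_of, or None when no
    value exists yet or the newest candidate is older than ttl."""
    rows = FEATURE_LOG.get(user_id, [])
    timestamps = [ts for ts, _ in rows]
    idx = bisect_right(timestamps, as_of) - 1  # last row at or before as_of
    if idx < 0:
        return None  # no feature value existed at that time
    ts, value = rows[idx]
    if as_of - ts > ttl:
        return None  # stale: outside the TTL window
    return value


week = timedelta(days=7)
print(point_in_time_lookup(1001, datetime(2024, 9, 1), ttl=week))
print(point_in_time_lookup(1001, datetime(2024, 10, 1), ttl=week))
```

The lookup never reads a row whose timestamp is later than `as_of`, which is what keeps future information out of the training set.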
3.5 Online Serving
# Online feature retrieval for real-time inference
feature_vector = store.get_online_features(
features=[
"user_stats:total_purchases",
"user_stats:avg_purchase_amount",
"user_stats:days_since_last_login",
],
entity_rows=[
{"user_id": 1001},
{"user_id": 1002},
],
).to_dict()
# Result: latest feature values per entity. Latency depends on the online store; section 2.1 sets a P99 target under 10ms
3.6 GCP/AWS Production Configuration
# GCP production setup
project: production_ml
registry: gs://my-bucket/feast-registry/registry.db
provider: gcp
online_store:
  type: datastore  # or bigtable
  project_id: my-gcp-project
offline_store:
  type: bigquery
  project_id: my-gcp-project
  dataset: feast_features

# AWS production setup (a separate feature_store.yaml)
project: production_ml
registry: s3://my-bucket/feast-registry/registry.db
provider: aws
online_store:
  type: dynamodb
  region: us-east-1
offline_store:
  type: redshift
  cluster_id: my-redshift-cluster
  region: us-east-1
  database: ml_features
  user: feast_user
  s3_staging_location: s3://my-bucket/feast-staging/
4. Feature Engineering Patterns
4.1 Temporal Features
import pandas as pd

def create_temporal_features(df, timestamp_col, entity_col, value_col):
    """Create time-window-based aggregation features"""
    df = df.sort_values(timestamp_col)
    features = pd.DataFrame()
    features[entity_col] = df[entity_col]
    features[timestamp_col] = df[timestamp_col]
    # Offset windows such as '7D' need a DatetimeIndex, so index by timestamp
    indexed = df.set_index(timestamp_col)
    # Moving averages across different windows
    for window in ['1D', '7D', '30D']:
        features[f'{value_col}_mean_{window}'] = (
            indexed.groupby(entity_col)[value_col]
            .transform(lambda x: x.rolling(window).mean())
            .to_numpy()  # same row order as the sorted df
        )
    # Trend feature: 7-day avg / 30-day avg
    features[f'{value_col}_trend_7d_30d'] = (
        features[f'{value_col}_mean_7D'] /
        features[f'{value_col}_mean_30D'].replace(0, float('nan'))
    )
    # Day of week / hour features
    features['day_of_week'] = df[timestamp_col].dt.dayofweek
    features['hour_of_day'] = df[timestamp_col].dt.hour
    features['is_weekend'] = features['day_of_week'].isin([5, 6]).astype(int)
    return features
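Window boundaries are a common source of training-serving skew: a time-based pandas rolling window is right-closed by default, so a 7-day window covers (t - 7D, t], while a reimplementation in SQL might include the left edge. The sketch below is a hypothetical pure-Python helper (`rolling_mean` and the sample events are assumptions, not part of any library) that shows how the two conventions diverge on the same data.

```python
from datetime import datetime, timedelta


def rolling_mean(events, as_of, window, include_left=False):
    """Mean of values whose timestamp falls in (as_of - window, as_of],
    or in [as_of - window, as_of] when include_left is True."""
    start = as_of - window
    selected = [
        value for ts, value in events
        if (ts >= start if include_left else ts > start) and ts <= as_of
    ]
    return sum(selected) / len(selected) if selected else None


# Hypothetical purchase events: (timestamp, amount)
events = [
    (datetime(2024, 9, 1), 10.0),
    (datetime(2024, 9, 5), 20.0),
    (datetime(2024, 9, 8), 30.0),
]
as_of = datetime(2024, 9, 8)
week = timedelta(days=7)

exclusive = rolling_mean(events, as_of, week)                     # 9/1 excluded
inclusive = rolling_mean(events, as_of, week, include_left=True)  # 9/1 included
print(exclusive, inclusive)
```

The two results differ even though both could be described as a "7-day average", which is the kind of mismatch a shared feature definition is meant to remove.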
4.2 Aggregation Features
def create_aggregation_features(events_df, entity_col, group_col):
"""Per-entity aggregation features"""
agg_features = events_df.groupby(entity_col).agg(
event_count=pd.NamedAgg(column=group_col, aggfunc='count'),
unique_categories=pd.NamedAgg(column='category', aggfunc='nunique'),
total_amount=pd.NamedAgg(column='amount', aggfunc='sum'),
avg_amount=pd.NamedAgg(column='amount', aggfunc='mean'),
max_amount=pd.NamedAgg(column='amount', aggfunc='max'),
std_amount=pd.NamedAgg(column='amount', aggfunc='std'),
).reset_index()
# Ratio features
agg_features['high_value_ratio'] = (
events_df[events_df['amount'] > 100]
.groupby(entity_col)
.size()
.reindex(agg_features[entity_col], fill_value=0)
.values / agg_features['event_count']
)
return agg_features
4.3 Embedding Features
from sentence_transformers import SentenceTransformer
import numpy as np
import pandas as pd
model = SentenceTransformer('all-MiniLM-L6-v2')
def create_text_embedding_features(df, text_col, prefix='emb'):
"""Generate text embedding features"""
embeddings = model.encode(df[text_col].fillna('').tolist())
emb_df = pd.DataFrame(
embeddings,
columns=[f'{prefix}_{i}' for i in range(embeddings.shape[1])],
index=df.index,
)
return pd.concat([df, emb_df], axis=1)
4.4 Cross Features
def create_cross_features(df):
"""Cross / interaction features between existing features"""
# Ratio feature
df['purchase_to_visit_ratio'] = (
df['purchase_count'] / df['visit_count'].replace(0, 1)
)
# Bucketing + cross
df['age_bucket'] = pd.cut(
df['age'], bins=[0, 25, 35, 50, 100],
labels=['young', 'middle', 'senior', 'elder']
)
df['age_x_gender'] = df['age_bucket'].astype(str) + '_' + df['gender']
# Numeric interaction
df['income_x_age'] = df['income'] * df['age']
return df
5. MLOps Maturity Levels
Google's MLOps maturity model defines three levels (0 to 2). This guide adds a fourth, Level 3, to describe fully automated retraining.
Level 0: Manual Process
Data scientist manually:
1. Collects and preprocesses data
2. Trains model in Jupyter
3. Hands model file to engineer
4. Engineer writes serving code
5. Manual deployment
Problems:
- Deployment cycle: months
- No reproducibility
- No monitoring
Level 1: ML Pipeline Automation
Automated ML Pipeline:
1. Data validation -> Feature engineering -> Training -> Evaluation -> Deployment
2. Pipeline orchestrator (Kubeflow, Airflow)
3. CT (Continuous Training): trigger-based auto-retraining
Improvements:
- Deployment cycle: weekly
- Reproducible pipeline
- Basic monitoring
Level 2: CI/CD for ML
CI/CD Pipeline Integration:
1. Code change -> Auto test -> Pipeline build -> Model training
2. Model validation gates (performance thresholds)
3. Shadow Deployment -> Canary -> Full Rollout
4. Automated A/B testing
Improvements:
- Deployment cycle: daily
- Automatic rollback
- Systematic experiment management
Level 3: Auto-Retraining + Full Automation
Full Automation:
1. Drift detection -> Auto-retraining trigger
2. Auto feature selection / hyperparameter optimization
3. Automatic model comparison + champion/challenger
4. Auto-scaling + cost optimization
Improvements:
- Deployment cycle: hourly
- Unattended operation
- Proactive monitoring
6. ML Pipeline Orchestration
6.1 Kubeflow Pipelines
# Define pipeline with Kubeflow Pipelines DSL
from kfp import dsl, compiler
from kfp.dsl import Input, Output, Dataset, Model, Metrics
@dsl.component(
    base_image="python:3.11",
    packages_to_install=["pandas", "pyarrow", "scikit-learn"],  # pyarrow is needed for parquet I/O
)
def preprocess_data(
    raw_data: Input[Dataset],
    processed_data: Output[Dataset],
):
    import pandas as pd
    from sklearn.preprocessing import StandardScaler
    df = pd.read_parquet(raw_data.path)
    numeric = df.select_dtypes(include='number')
    # Scale feature columns only; the label must stay unscaled
    feature_cols = numeric.columns.drop('target', errors='ignore')
    df_scaled = pd.DataFrame(
        StandardScaler().fit_transform(numeric[feature_cols]),
        columns=feature_cols,
        index=df.index,
    )
    if 'target' in df.columns:
        df_scaled['target'] = df['target']
    df_scaled.to_parquet(processed_data.path)
@dsl.component(
base_image="python:3.11",
packages_to_install=["scikit-learn", "pandas", "pyarrow", "joblib"],
)
def train_model(
training_data: Input[Dataset],
model_artifact: Output[Model],
metrics: Output[Metrics],
n_estimators: int = 100,
max_depth: int = 10,
):
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
import joblib
df = pd.read_parquet(training_data.path)
X = df.drop('target', axis=1)
y = df['target']
clf = RandomForestClassifier(
n_estimators=n_estimators,
max_depth=max_depth,
random_state=42,
)
scores = cross_val_score(clf, X, y, cv=5, scoring='f1_macro')
clf.fit(X, y)
joblib.dump(clf, model_artifact.path)
metrics.log_metric("f1_mean", float(scores.mean()))
metrics.log_metric("f1_std", float(scores.std()))
@dsl.pipeline(name="ML Training Pipeline")
def ml_pipeline(n_estimators: int = 100, max_depth: int = 10):
preprocess_task = preprocess_data(
raw_data=dsl.importer(
artifact_uri="gs://my-bucket/raw-data/",
artifact_class=Dataset,
).output,
)
train_task = train_model(
training_data=preprocess_task.outputs["processed_data"],
n_estimators=n_estimators,
max_depth=max_depth,
)
# Compile and run
compiler.Compiler().compile(ml_pipeline, "pipeline.yaml")
6.2 Airflow ML Pipeline
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'ml-team',
'depends_on_past': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'ml_training_pipeline',
default_args=default_args,
schedule_interval='@weekly',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['ml', 'training'],
) as dag:
validate_data = KubernetesPodOperator(
task_id='validate_data',
name='data-validation',
image='ml-pipeline:latest',
cmds=['python', 'validate.py'],
namespace='ml-pipelines',
)
extract_features = KubernetesPodOperator(
task_id='extract_features',
name='feature-extraction',
image='ml-pipeline:latest',
cmds=['python', 'extract_features.py'],
namespace='ml-pipelines',
)
train_model = KubernetesPodOperator(
task_id='train_model',
name='model-training',
image='ml-pipeline:latest',
cmds=['python', 'train.py'],
namespace='ml-pipelines',
)
evaluate_model = PythonOperator(
task_id='evaluate_model',
python_callable=lambda: print("Evaluating model..."),
)
validate_data >> extract_features >> train_model >> evaluate_model
6.3 Pipeline Solution Comparison
| Feature | Kubeflow | Airflow | Vertex AI | SageMaker |
|---|---|---|---|---|
| Runtime | Kubernetes | Various | GCP Managed | AWS Managed |
| ML-specific | High | Medium | High | High |
| UI/Visualization | Basic | Excellent | Excellent | Excellent |
| Scalability | High | High | High | High |
| Learning curve | Steep | Moderate | Low | Low |
| Cost | Infra only | Infra only | Usage-based | Usage-based |
7. Experiment Tracking
7.1 MLflow Experiment Tracking
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import f1_score, precision_score, recall_score
# MLflow server configuration
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("recommendation-model-v2")
# Experiment run
with mlflow.start_run(run_name="gbm-experiment-42") as run:
# Log hyperparameters
params = {
"n_estimators": 200,
"max_depth": 8,
"learning_rate": 0.1,
"subsample": 0.8,
}
mlflow.log_params(params)
# Train model
model = GradientBoostingClassifier(**params)
model.fit(X_train, y_train)
# Log metrics
y_pred = model.predict(X_test)
metrics = {
"f1": f1_score(y_test, y_pred, average='macro'),
"precision": precision_score(y_test, y_pred, average='macro'),
"recall": recall_score(y_test, y_pred, average='macro'),
}
mlflow.log_metrics(metrics)
# Log model artifact
mlflow.sklearn.log_model(
model,
artifact_path="model",
registered_model_name="recommendation-gbm",
)
mlflow.log_artifact("feature_importance.png")
print(f"Run ID: {run.info.run_id}")
print(f"Metrics: {metrics}")
7.2 Weights and Biases Integration
import wandb
wandb.init(
project="recommendation-model",
config={
"architecture": "GBM",
"n_estimators": 200,
"learning_rate": 0.1,
},
)
# Log metrics during training loop
for epoch in range(100):
train_loss = train_one_epoch(model, train_loader)
val_loss, val_f1 = evaluate(model, val_loader)
wandb.log({
"epoch": epoch,
"train_loss": train_loss,
"val_loss": val_loss,
"val_f1": val_f1,
})
# Save model artifact
artifact = wandb.Artifact('model-weights', type='model')
artifact.add_file('model.pt')
wandb.log_artifact(artifact)
wandb.finish()
8. Model Registry
8.1 MLflow Model Registry
from mlflow.tracking import MlflowClient
client = MlflowClient()
# Create model version
model_version = client.create_model_version(
name="recommendation-gbm",
source=f"runs:/{run_id}/model",
run_id=run_id,
description="GBM model v3: added new features",
)
# Stage transition: None -> Staging
# Note: model stages are deprecated since MLflow 2.9 in favor of model version aliases and tags
client.transition_model_version_stage(
name="recommendation-gbm",
version=model_version.version,
stage="Staging",
archive_existing_versions=False,
)
# Promote to Production after staging validation
client.transition_model_version_stage(
name="recommendation-gbm",
version=model_version.version,
stage="Production",
archive_existing_versions=True, # Archive existing production version
)
8.2 Model Versioning Strategy
Model Lifecycle:
None -> Staging -> Production -> Archived
Versioning Policy:
- Staging: Upon passing automated performance tests
- Production: Manual approval or A/B test pass
- Archived: Automatically when new version is promoted
- Keep 3 most recent versions, delete older ones
# Load production model
import mlflow.pyfunc
model = mlflow.pyfunc.load_model(
model_uri="models:/recommendation-gbm/Production"
)
predictions = model.predict(input_df)
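The retention rule in the versioning policy ("keep the 3 most recent versions, delete older ones") can be expressed as a small planning function. This is a sketch under assumptions: `plan_retention` and the sample registry contents are hypothetical, and protecting the current Production and Staging versions from deletion is an added safeguard that the policy above does not state explicitly.

```python
def plan_retention(versions, keep=3):
    """Split registry versions into (kept, deleted) version numbers.

    versions: list of dicts with 'version' (int) and 'stage' (str).
    Production and Staging versions are always kept (an assumption);
    beyond those, only the `keep` highest version numbers survive.
    """
    ordered = sorted(versions, key=lambda v: v["version"], reverse=True)
    recent = {v["version"] for v in ordered[:keep]}
    kept, deleted = [], []
    for v in ordered:
        protected = v["stage"] in ("Production", "Staging")
        target = kept if protected or v["version"] in recent else deleted
        target.append(v["version"])
    return kept, deleted


# Hypothetical registry contents
versions = [
    {"version": 1, "stage": "Archived"},
    {"version": 2, "stage": "Production"},
    {"version": 3, "stage": "Archived"},
    {"version": 4, "stage": "Archived"},
    {"version": 5, "stage": "Archived"},
    {"version": 6, "stage": "Staging"},
]
kept, deleted = plan_retention(versions)
print("keep:", kept, "delete:", deleted)
```

Returning a plan instead of deleting directly makes the rule easy to review or dry-run before any registry call is issued.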
9. Model Serving
9.1 BentoML
# service.py (BentoML 1.0/1.1 runner-style API; 1.2+ uses the class-based @bentoml.service API)
import bentoml
import numpy as np
from bentoml.io import JSON
# Load saved model
runner = bentoml.sklearn.get("recommendation_model:latest").to_runner()
svc = bentoml.Service("recommendation_service", runners=[runner])
@svc.api(input=JSON(), output=JSON())
async def predict(input_data: dict) -> dict:
features = np.array(input_data["features"]).reshape(1, -1)
prediction = await runner.predict.async_run(features)
return {
"prediction": int(prediction[0]),
"model_version": "v3.2.1",
}
# bentofile.yaml
service: "service:svc"
include:
- "*.py"
python:
packages:
- scikit-learn
- numpy
docker:
python_version: "3.11"
# Build and deploy
bentoml build
bentoml containerize recommendation_service:latest
docker run -p 3000:3000 recommendation_service:latest
9.2 Seldon Core (Kubernetes)
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: recommendation-model
  namespace: ml-serving
spec:
  predictors:
    - name: default
      replicas: 3
      graph:
        name: classifier
        implementation: SKLEARN_SERVER
        modelUri: s3://models/recommendation/v3
        envSecretRefName: s3-credentials
      componentSpecs:
        - spec:
            containers:
              - name: classifier
                resources:
                  requests:
                    cpu: "1"
                    memory: "2Gi"
                  limits:
                    cpu: "2"
                    memory: "4Gi"
      traffic: 100
      labels:
        version: v3
9.3 TensorFlow Serving
# Run TFServing Docker (8500 = gRPC, 8501 = REST; the gRPC client below needs 8500 published)
docker run -p 8500:8500 -p 8501:8501 \
  --mount type=bind,source=/models/recommendation,target=/models/recommendation \
  -e MODEL_NAME=recommendation \
  tensorflow/serving:latest
# gRPC client
import grpc
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2, prediction_service_pb2_grpc
channel = grpc.insecure_channel('localhost:8500')
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
request = predict_pb2.PredictRequest()
request.model_spec.name = 'recommendation'
request.model_spec.signature_name = 'serving_default'
request.inputs['input'].CopyFrom(
tf.make_tensor_proto(input_data, shape=[1, 128])
)
response = stub.Predict(request, timeout=5.0)
9.4 vLLM (LLM Serving)
# Start vLLM server (the Hugging Face repo ID for Llama 3 8B Instruct carries the "Meta-" prefix)
# python -m vllm.entrypoints.openai.api_server \
#   --model meta-llama/Meta-Llama-3-8B-Instruct \
#   --tensor-parallel-size 2 \
#   --max-model-len 8192
# Client call
from openai import OpenAI
client = OpenAI(
    base_url="http://localhost:8000/v1",
    api_key="not-needed",
)
response = client.chat.completions.create(
    model="meta-llama/Meta-Llama-3-8B-Instruct",  # must match the --model value above
    messages=[
        {"role": "user", "content": "What are the benefits of recommendation systems?"},
    ],
    temperature=0.7,
    max_tokens=512,
)
9.5 Serving Solution Comparison
| Solution | Model Types | Deployment Env | Batch Inference | GPU Support | Auto-scaling |
|---|---|---|---|---|---|
| BentoML | Universal | Docker/K8s | Supported | Supported | Supported |
| Seldon Core | Universal | K8s only | Supported | Supported | HPA |
| TFServing | TF models | Docker/K8s | Supported | Supported | Manual |
| Triton | Multi-framework | Docker/K8s | Supported | Optimized | Supported |
| vLLM | LLM | Docker/K8s | Supported (offline `LLM.generate`, continuous batching) | Required | Supported |
10. Model Monitoring
10.1 Data Drift Detection
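# Uses the legacy Evidently API (versions before 0.7); newer releases reorganized these imports
import pandas as pd
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset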
# Reference data (from training time)
reference_data = pd.read_parquet("training_data.parquet")
# Current data (production)
current_data = pd.read_parquet("production_data_latest.parquet")
column_mapping = ColumnMapping(
target='label',
numerical_features=['age', 'income', 'purchase_count'],
categorical_features=['category', 'region'],
)
# Generate drift report
report = Report(metrics=[
DataDriftPreset(),
TargetDriftPreset(),
])
report.run(
reference_data=reference_data,
current_data=current_data,
column_mapping=column_mapping,
)
# Save HTML report
report.save_html("drift_report.html")
# Programmatic access
result = report.as_dict()
dataset_drift = result['metrics'][0]['result']['dataset_drift']
drift_share = result['metrics'][0]['result']['share_of_drifted_columns']
if dataset_drift:
print(f"Drift detected! Drifted feature share: {drift_share:.2%}")
trigger_retraining_pipeline()
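For intuition about what a drift score measures, the Population Stability Index (PSI) can be computed by hand. The following is a minimal sketch, not Evidently's implementation: the `psi` function, the equal-width binning, and the sample data are assumptions chosen for illustration. A commonly quoted rule of thumb treats PSI above roughly 0.2 as a significant shift, but that threshold is a convention, not a guarantee.

```python
import math


def psi(reference, current, n_bins=10, eps=1e-6):
    """Population Stability Index between two numeric samples.

    Bin edges are equal-width over the reference range; current values
    outside that range are clamped into the first or last bin.
    """
    lo, hi = min(reference), max(reference)
    width = (hi - lo) / n_bins or 1.0  # fall back to 1.0 for a constant sample

    def bin_shares(values):
        counts = [0] * n_bins
        for v in values:
            idx = int((v - lo) / width)
            counts[min(max(idx, 0), n_bins - 1)] += 1
        # eps avoids log(0) and division by zero for empty bins
        return [max(c / len(values), eps) for c in counts]

    ref_shares, cur_shares = bin_shares(reference), bin_shares(current)
    return sum(
        (c - r) * math.log(c / r) for r, c in zip(ref_shares, cur_shares)
    )


reference = [i / 100 for i in range(1000)]  # roughly uniform on [0, 10)
shifted = [v + 3.0 for v in reference]      # same shape, shifted by 3
print(f"no drift: {psi(reference, reference):.3f}")
print(f"shifted:  {psi(reference, shifted):.3f}")
```

Identical samples score zero, and the score grows as probability mass moves between bins, which is why a per-feature PSI is often used as a retraining trigger.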
10.2 Prediction Drift Detection
from evidently.test_suite import TestSuite
from evidently.tests import (
TestColumnDrift,
TestShareOfDriftedColumns,
TestMeanInNSigmas,
)
test_suite = TestSuite(tests=[
TestShareOfDriftedColumns(lt=0.3), # Less than 30% drifted features
TestColumnDrift(column_name="prediction_score"),
    TestMeanInNSigmas(column_name="prediction_score", n_sigmas=2),
])
test_suite.run(
reference_data=reference_data,
current_data=current_data,
column_mapping=column_mapping,
)
test_results = test_suite.as_dict()
all_passed = all(
t['status'] == 'SUCCESS' for t in test_results['tests']
)
if not all_passed:
alert_team("Model drift detected - retraining review required")
10.3 WhyLabs Integration
import whylogs as why
from whylogs.api.writer.whylabs import WhyLabsWriter
# Create profile
results = why.log(current_data)
profile = results.profile()
# Send to WhyLabs
writer = WhyLabsWriter()
writer.write(profile)
11. A/B Testing for ML Models
11.1 Traffic Splitting
# Traffic splitting with Istio VirtualService
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: recommendation-ab-test
spec:
  hosts:
    - recommendation-service
  http:
    - match:
        - headers:
            x-experiment-group:
              exact: "treatment"
      route:
        - destination:
            host: recommendation-service
            subset: v2-challenger
          weight: 100
    - route:
        - destination:
            host: recommendation-service
            subset: v1-champion
          weight: 80
        - destination:
            host: recommendation-service
            subset: v2-challenger
          weight: 20
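The header-based rule above only helps if something upstream sets `x-experiment-group` consistently for each user. One common approach is deterministic hash bucketing, sketched below. The function name `assign_group`, the experiment name, and the user ID format are hypothetical; the 20% share simply mirrors the weights in the VirtualService.

```python
import hashlib


def assign_group(user_id: str, experiment: str, treatment_share: float = 0.2) -> str:
    """Map a user to 'treatment' or 'control' deterministically."""
    digest = hashlib.sha256(f"{experiment}:{user_id}".encode()).hexdigest()
    bucket = int(digest[:8], 16) / 0x100000000  # uniform value in [0, 1)
    return "treatment" if bucket < treatment_share else "control"


# The same user always lands in the same group for a given experiment
group = assign_group("user-1001", "reco-v2")
headers = {"x-experiment-group": group}

# Rough check of the split over hypothetical user IDs
groups = [assign_group(f"user-{i}", "reco-v2") for i in range(10_000)]
share = groups.count("treatment") / len(groups)
print(headers, f"treatment share: {share:.3f}")
```

Including the experiment name in the hash input keeps assignments independent across experiments, so a user in the treatment group of one test is not systematically in the treatment group of the next.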
11.2 Statistical Significance Testing
from scipy import stats
import numpy as np
def ab_test_significance(
control_conversions, control_total,
treatment_conversions, treatment_total,
alpha=0.05,
):
"""A/B test statistical significance verification"""
control_rate = control_conversions / control_total
treatment_rate = treatment_conversions / treatment_total
# Z-test for proportions
pooled_rate = (
(control_conversions + treatment_conversions) /
(control_total + treatment_total)
)
se = np.sqrt(
pooled_rate * (1 - pooled_rate) *
(1/control_total + 1/treatment_total)
)
z_stat = (treatment_rate - control_rate) / se
p_value = 2 * (1 - stats.norm.cdf(abs(z_stat)))
lift = (treatment_rate - control_rate) / control_rate
return {
"control_rate": control_rate,
"treatment_rate": treatment_rate,
"lift": f"{lift:.2%}",
"p_value": p_value,
"significant": p_value < alpha,
"recommendation": (
"Promote challenger model" if p_value < alpha and lift > 0
else "Keep current model"
),
}
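A significance test is only meaningful if the experiment collected enough traffic, so it can help to estimate the required sample size before starting. The sketch below uses the standard normal-approximation formula for comparing two proportions; `required_sample_size` and the 10% to 12% scenario are illustrative assumptions, and the result is an approximation, not an exact power analysis.

```python
import math
from statistics import NormalDist


def required_sample_size(baseline_rate, expected_rate, alpha=0.05, power=0.8):
    """Approximate per-group sample size for a two-sided two-proportion z-test."""
    z_alpha = NormalDist().inv_cdf(1 - alpha / 2)
    z_beta = NormalDist().inv_cdf(power)
    variance = (
        baseline_rate * (1 - baseline_rate)
        + expected_rate * (1 - expected_rate)
    )
    effect = expected_rate - baseline_rate
    return math.ceil((z_alpha + z_beta) ** 2 * variance / effect ** 2)


# Hypothetical scenario: detect a lift from 10% to 12% conversion
n = required_sample_size(0.10, 0.12)
print(f"Users needed per group: {n}")
```

Smaller expected lifts drive the requirement up quickly, since the sample size scales with the inverse square of the difference in rates.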
11.3 Multi-Armed Bandit
import numpy as np
class ThompsonSampling:
"""Adaptive traffic splitting using Thompson Sampling"""
def __init__(self, n_arms):
self.n_arms = n_arms
self.successes = np.ones(n_arms) # Beta prior alpha
self.failures = np.ones(n_arms) # Beta prior beta
def select_arm(self):
"""Sample from Beta distribution to select optimal arm"""
samples = [
np.random.beta(self.successes[i], self.failures[i])
for i in range(self.n_arms)
]
return int(np.argmax(samples))
def update(self, arm, reward):
"""Update results"""
if reward:
self.successes[arm] += 1
else:
self.failures[arm] += 1
def get_allocation(self):
    """Normalized posterior mean rates: a rough proxy, not the exact Thompson selection probability"""
total = self.successes + self.failures
rates = self.successes / total
return rates / rates.sum()
# Usage example
bandit = ThompsonSampling(n_arms=3) # 3 model versions
for request in incoming_requests:
arm = bandit.select_arm()
prediction = models[arm].predict(request)
reward = get_conversion(request, prediction)
bandit.update(arm, reward)
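Note that `get_allocation` above normalizes posterior means, which is flatter than the share of traffic Thompson Sampling actually sends to each arm. That share can be estimated by simulating many draws, as in the sketch below. It uses only the standard library; `estimate_allocation` and the posterior counts are hypothetical.

```python
import random


def estimate_allocation(successes, failures, n_draws=20_000, seed=0):
    """Monte Carlo estimate of how often each arm wins a Thompson draw."""
    rng = random.Random(seed)
    wins = [0] * len(successes)
    for _ in range(n_draws):
        samples = [
            rng.betavariate(a, b) for a, b in zip(successes, failures)
        ]
        wins[samples.index(max(samples))] += 1
    return [w / n_draws for w in wins]


# Hypothetical posterior counts (prior included) for three model versions
successes = [40, 55, 48]
failures = [960, 945, 952]
allocation = estimate_allocation(successes, failures)
print([round(share, 3) for share in allocation])
```

With these counts the arm with the highest observed rate receives most of the traffic, while the others still get some exploration; the gap narrows when the posteriors overlap more.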
12. CI/CD for ML
12.1 Model Validation Gates
# model_validation.py
import os
import time
import mlflow
import numpy as np
import pandas as pd
from sklearn.metrics import f1_score

def validate_model(
    model_uri: str,
    test_data_path: str,
    min_f1: float = 0.85,
    max_latency_ms: float = 50.0,
    max_model_size_mb: float = 500.0,
):
    """Model deployment validation gates"""
    results = {"passed": True, "checks": []}
    # 1. Performance validation
    model = mlflow.pyfunc.load_model(model_uri)
    test_data = pd.read_parquet(test_data_path)
    features = test_data.drop('target', axis=1)
    predictions = model.predict(features)
    f1 = f1_score(test_data['target'], predictions, average='macro')
    results["checks"].append({
        "name": "performance",
        "metric": "f1_score",
        "value": f1,
        "threshold": min_f1,
        "passed": bool(f1 >= min_f1),
    })
    # 2. Latency validation (single-row requests, features only)
    latencies = []
    for i in range(min(100, len(features))):
        start = time.perf_counter()
        model.predict(features.iloc[[i]])
        latencies.append((time.perf_counter() - start) * 1000)
    p99_latency = float(np.percentile(latencies, 99))
    results["checks"].append({
        "name": "latency",
        "metric": "p99_ms",
        "value": p99_latency,
        "threshold": max_latency_ms,
        "passed": p99_latency <= max_latency_ms,
    })
    # 3. Model size validation: model_uri may be a registry or run URI,
    #    so download the artifacts and sum the file sizes
    local_path = mlflow.artifacts.download_artifacts(artifact_uri=model_uri)
    model_size_mb = sum(
        os.path.getsize(os.path.join(root, name))
        for root, _, files in os.walk(local_path)
        for name in files
    ) / (1024 * 1024)
    results["checks"].append({
        "name": "model_size",
        "metric": "size_mb",
        "value": model_size_mb,
        "threshold": max_model_size_mb,
        "passed": model_size_mb <= max_model_size_mb,
    })
    results["passed"] = all(c["passed"] for c in results["checks"])
    return results
12.2 GitHub Actions ML CI/CD
name: ML CI/CD Pipeline
on:
  push:
    branches: [main]
    paths:
      - 'models/**'
      - 'features/**'
jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Validate Data Schema
        run: python scripts/validate_data.py
  train-and-evaluate:
    needs: data-validation
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v4
      - name: Train Model
        run: python train.py --config configs/production.yaml
      - name: Evaluate Model
        run: python evaluate.py --model-path outputs/model
  model-validation:
    needs: train-and-evaluate
    runs-on: ubuntu-latest
    steps:
      # Each job starts on a fresh runner, so the repo must be checked out again
      - uses: actions/checkout@v4
      - name: Run Validation Gates
        run: python model_validation.py
  deploy-staging:
    needs: model-validation
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Deploy to Staging
        run: |
          kubectl apply -f k8s/staging/
          kubectl rollout status deployment/model-serving -n staging
  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v4
      - name: Canary Rollout
        run: |
          kubectl apply -f k8s/production/canary-10.yaml
          sleep 300
          python check_canary_metrics.py
          kubectl apply -f k8s/production/full-rollout.yaml
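The workflow calls `check_canary_metrics.py` but does not show it. As a rough idea of what such a gate might check, here is a hypothetical sketch: the function name, the thresholds, and the metric dictionaries are all assumptions, and a real script would pull these numbers from the monitoring system instead of hard-coding them.

```python
def canary_is_healthy(baseline, canary, max_error_ratio=1.5,
                      max_p99_ms=50.0, min_requests=500):
    """Return (ok, reason) for a canary compared against the baseline."""
    if canary["requests"] < min_requests:
        return False, "not enough canary traffic to judge"
    base_err = baseline["errors"] / baseline["requests"]
    canary_err = canary["errors"] / canary["requests"]
    # An absolute floor keeps a near-zero baseline from failing every canary
    if canary_err > max(base_err * max_error_ratio, 0.001):
        return False, f"error rate {canary_err:.4f} vs baseline {base_err:.4f}"
    if canary["p99_ms"] > max_p99_ms:
        return False, f"p99 latency {canary['p99_ms']}ms exceeds {max_p99_ms}ms"
    return True, "canary within thresholds"


# Hypothetical metrics collected during the canary window
baseline = {"requests": 9000, "errors": 18, "p99_ms": 41.0}
canary = {"requests": 1000, "errors": 2, "p99_ms": 44.0}
ok, reason = canary_is_healthy(baseline, canary)
exit_code = 0 if ok else 1  # a real script would pass this to sys.exit()
print(reason, exit_code)
```

Exiting with a non-zero code is what would stop the workflow step before the full rollout is applied.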
13. Cost Optimization
13.1 Batch vs Real-Time Inference
# Batch inference: cost-efficient (bulk processing)
def batch_inference(model, data_path, output_path):
    """Batch inference using Spark"""
    from pyspark.sql import SparkSession
    from pyspark.sql.types import DoubleType
    spark = SparkSession.builder.appName("batch-inference").getOrCreate()
    df = spark.read.parquet(data_path)
    # Declare the return type; a registered Python UDF defaults to StringType
    predict_udf = spark.udf.register(
        "predict",
        lambda features: float(model.predict([features])[0]),
        DoubleType(),
    )
    result = df.withColumn("prediction", predict_udf(df["features"]))
    result.write.parquet(output_path)
13.2 Prediction Caching
import redis
import hashlib
import json
class PredictionCache:
def __init__(self, redis_url="redis://localhost:6379", ttl=3600):
self.redis = redis.from_url(redis_url)
self.ttl = ttl
self.hit_count = 0
self.miss_count = 0
def _make_key(self, features: dict) -> str:
feature_str = json.dumps(features, sort_keys=True)
return f"pred:{hashlib.md5(feature_str.encode()).hexdigest()}"
def get_or_predict(self, features: dict, model) -> dict:
key = self._make_key(features)
cached = self.redis.get(key)
if cached:
self.hit_count += 1
return json.loads(cached)
self.miss_count += 1
prediction = model.predict(features)
self.redis.setex(key, self.ttl, json.dumps(prediction))
return prediction
@property
def hit_rate(self):
total = self.hit_count + self.miss_count
return self.hit_count / total if total > 0 else 0
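How much a cache saves depends mainly on the hit rate and on how cheap a lookup is compared with an inference. The arithmetic can be sketched as follows; `cost_with_cache` and every cost figure here are hypothetical numbers picked for illustration, not measurements.

```python
def cost_with_cache(requests, hit_rate, inference_cost, lookup_cost):
    """Total cost when every request pays a lookup and only misses pay inference."""
    misses = requests * (1 - hit_rate)
    return requests * lookup_cost + misses * inference_cost


requests = 1_000_000
inference_cost = 0.0004  # hypothetical cost per inference
lookup_cost = 0.00002    # hypothetical cost per cache lookup

baseline = requests * inference_cost
cached = cost_with_cache(requests, 0.5, inference_cost, lookup_cost)
savings = 1 - cached / baseline
print(f"baseline={baseline:.0f} cached={cached:.0f} savings={savings:.0%}")
```

Because every request pays the lookup, the savings are always somewhat lower than the hit rate itself, and a cache with a very low hit rate can cost more than it saves.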
13.3 Model Compression
# Quantization
import os
import torch
# model.pt holds a fully pickled model; PyTorch 2.6+ needs weights_only=False to load it
model = torch.load("model.pt", weights_only=False)
quantized_model = torch.quantization.quantize_dynamic(
    model,
    {torch.nn.Linear},
    dtype=torch.qint8,
)
# Size comparison (note: the quantized file stores only the state_dict)
original_size = os.path.getsize("model.pt") / (1024 * 1024)
torch.save(quantized_model.state_dict(), "model_quantized.pt")
quantized_size = os.path.getsize("model_quantized.pt") / (1024 * 1024)
print(f"Original: {original_size:.1f}MB -> Quantized: {quantized_size:.1f}MB")
print(f"Size reduction: {(1 - quantized_size/original_size):.1%}")
13.4 Cost Optimization Checklist
| Strategy | Expected Savings | Trade-off |
|---|---|---|
| Switch to batch inference | 50-80% | Lose real-time capability |
| Prediction caching | 30-60% | Cache freshness |
| Model quantization | 40-60% | Slight accuracy loss |
| Spot/Preemptible instances | 60-90% | Availability risk |
| Auto-scaling | 20-40% | Cold start |
| Model distillation | 50-70% | Development cost |
14. Quiz
Q1: What is the core value of a Feature Store?
A: The core value is preventing Training-Serving Skew. By sharing a single feature definition and transformation logic between training and serving, it fundamentally eliminates data inconsistencies. Additional values include feature reuse, team collaboration, and feature governance.
Q2: Why is Point-in-Time Join necessary?
A: Point-in-Time Join prevents Data Leakage. When generating training data, you must join only the feature values that were actually available at the time of each event. If future data is included, the model shows unrealistically high performance during training but degrades significantly in production.
Q3: What is the key element for moving from MLOps Level 2 to Level 3?
A: Moving from Level 2 (CI/CD for ML) to Level 3 (auto-retraining) requires an automatic drift detection and retraining trigger system. This monitors data drift and prediction drift in real-time, automatically triggers the retraining pipeline when thresholds are exceeded, and auto-promotes through champion/challenger comparison.
Q4: What is the main difference between BentoML and Seldon Core?
A: BentoML is a framework-agnostic model packaging tool that creates Docker containers deployable anywhere. Seldon Core is a Kubernetes-native serving platform that deeply integrates with the K8s ecosystem through CRD-based deployment, A/B testing, canary deployment, and explainability (Explainer).
Q5: What are 3 main methods for detecting model drift?
A: (1) Data Drift: Detect changes in input feature distributions using KS test, PSI (Population Stability Index), etc. (2) Prediction Drift: Monitor changes in model output distributions. (3) Performance Drift: Track degradation in accuracy, F1, etc. by comparing with actual labels. Evidently AI and WhyLabs are representative tools.
15. References
- Feast Official Documentation - https://docs.feast.dev/
- MLflow Official Documentation - https://mlflow.org/docs/latest/index.html
- Google MLOps Maturity Model - https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning
- Evidently AI Documentation - https://docs.evidentlyai.com/
- BentoML Official Documentation - https://docs.bentoml.com/
- Seldon Core Documentation - https://docs.seldon.io/
- Kubeflow Pipelines - https://www.kubeflow.org/docs/components/pipelines/
- WhyLabs Documentation - https://docs.whylabs.ai/
- vLLM Project - https://docs.vllm.ai/
- Weights and Biases - https://docs.wandb.ai/
- Tecton Feature Store - https://docs.tecton.ai/
- Hopsworks Feature Store - https://docs.hopsworks.ai/
- NVIDIA Triton Inference Server - https://docs.nvidia.com/deeplearning/triton-inference-server/