Feature Store & MLOps Pipeline Complete Guide 2025: Feast, Feature Engineering, Model Serving
1. Why Feature Store?
1.1 The Training-Serving Skew Problem
One of the most common sources of failure in ML model deployment is feature inconsistency between training and serving time.
When data scientists create features with Pandas in Jupyter Notebooks, and engineers re-implement the same logic in Java or Go, subtle differences creep in.
# Training time: Feature creation with Pandas
# (rolling('7D') needs a sorted DatetimeIndex on df)
df['avg_purchase_7d'] = df.groupby('user_id')['amount'].transform(
    lambda x: x.rolling('7D').mean()
)
# Serving time: Re-implemented in SQL or another language -> subtle differences
# - Boundary conditions (inclusive/exclusive) differences
# - Timezone handling differences
# - NULL handling differences
A Feature Store addresses this problem at the root by sharing a single feature definition between training and serving.
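To make the boundary-condition case concrete, here is a minimal sketch using only the standard library. The events, the `avg_purchase_7d` helper, and the `include_left_edge` flag are hypothetical and exist only for illustration. The exclusive variant mirrors the default of pandas time-based rolling windows (`closed='right'`), while the inclusive variant stands in for a re-implementation that used `>=` on the window start.

```python
from datetime import datetime, timedelta

# Hypothetical purchase events for one user: (timestamp, amount)
events = [
    (datetime(2024, 9, 1, 0, 0), 100.0),  # exactly 7 days before `now`
    (datetime(2024, 9, 5, 12, 0), 40.0),
    (datetime(2024, 9, 7, 18, 0), 10.0),
]
now = datetime(2024, 9, 8, 0, 0)
window = timedelta(days=7)


def avg_purchase_7d(events, now, include_left_edge):
    """Average amount over the trailing 7 days, with a configurable left boundary."""
    start = now - window
    amounts = [
        amount
        for ts, amount in events
        if (start <= ts if include_left_edge else start < ts) and ts <= now
    ]
    return sum(amounts) / len(amounts) if amounts else None


# "Training" logic excludes the left edge; the "serving" rewrite includes it
training_value = avg_purchase_7d(events, now, include_left_edge=False)
serving_value = avg_purchase_7d(events, now, include_left_edge=True)
print(training_value, serving_value)
```

A single event sitting exactly on the window boundary is enough to make the two implementations disagree, which is the kind of silent skew a shared feature definition is meant to rule out.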
1.2 Feature Reuse and Team Collaboration
In large ML organizations, dozens of teams independently create similar features.
| Problem | Before Feature Store | After Feature Store |
|---|---|---|
| Feature duplication | Each team recreates similar features | Search/reuse from central registry |
| Consistency | Different computation logic per team | Single definition, version controlled |
| Freshness | Manual updates | Automated materialization |
| Discoverability | Depends on Slack/Wiki | Feature catalog + metadata |
| Governance | None | Owner, lineage, access control |
1.3 Reducing Data Pipeline Complexity
Without a Feature Store, each model maintains its own data pipeline:
Model A: raw data -> ETL A -> features A -> training A
Model B: raw data -> ETL B -> features B -> training B
Model C: raw data -> ETL C -> features C -> training C
After Feature Store adoption:
raw data -> Feature Store (centralized) -> Models A, B, C share features
2. Feature Store Architecture
2.1 Core Components
A Feature Store consists of four core components.
Offline Store
- Stores large volumes of historical feature data
- Used for training data generation
- BigQuery, Snowflake, Redshift, Parquet files
- Supports Point-in-Time Joins
Online Store
- Low-latency lookup of latest feature values
- Used for real-time inference
- Redis, DynamoDB, Bigtable
- Target P99 latency under 10ms
Feature Registry
- Manages metadata for all features
- Name, type, owner, description, tags
- Data lineage tracking
Transformation Engine
- Creates features from raw data
- Supports batch/streaming transformations
- Spark, Flink, dbt integration
2.2 Data Flow
[Data Sources] -> [Transform Engine] -> [Offline Store] <-> [Online Store]
      ^                                        |                  |
   Raw Data                            Training Pipeline   Inference Service
                          |
                 [Feature Registry]
               (Metadata Management)
2.3 Feature Store Solution Comparison
| Solution | Type | Offline | Online | Streaming | Cost |
|---|---|---|---|---|---|
| Feast | OSS | Redshift/BQ/File | Redis/DynamoDB | Push-based | Free (infra only) |
| Tecton | Managed | Spark + Delta | DynamoDB | Spark Streaming | Subscription |
| Hopsworks | OSS/Managed | Hudi | RonDB | Flink | Community free |
| Vertex AI FS | GCP Managed | BigQuery | Bigtable | Dataflow | Usage-based |
| SageMaker FS | AWS Managed | S3 + Glue | Built-in Online | Kinesis | Usage-based |
3. Feast Deep Dive
3.1 Installation and Initial Setup
# Install Feast
pip install feast
# Initialize project
feast init my_feature_store
cd my_feature_store
# Project structure
# my_feature_store/
# feature_store.yaml # Project config
# features.py # Feature definitions
# data/ # Sample data
feature_store.yaml configuration:
project: my_ml_project
registry: data/registry.db
provider: local  # local, gcp, aws
online_store:
  type: redis
  connection_string: "localhost:6379"
offline_store:
  type: file  # file, bigquery, redshift, snowflake
entity_key_serialization_version: 2
3.2 Feature Definitions
# features.py
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String
# Entity definition
user = Entity(
name="user_id",
join_keys=["user_id"],
description="Unique user identifier",
)
# Data source definition
user_stats_source = FileSource(
path="data/user_stats.parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created_timestamp",
)
# Feature view definition
user_stats_fv = FeatureView(
name="user_stats",
entities=[user],
ttl=timedelta(days=1),
schema=[
Field(name="total_purchases", dtype=Int64),
Field(name="avg_purchase_amount", dtype=Float32),
Field(name="days_since_last_login", dtype=Int64),
Field(name="preferred_category", dtype=String),
],
online=True,
source=user_stats_source,
tags={"team": "recommendation", "version": "v2"},
)
3.3 Materialization (Offline to Online)
# Apply feature definitions
feast apply
# Materialize feature values from offline to online store
feast materialize 2024-01-01T00:00:00 2024-12-31T23:59:59
# Incremental materialization (since last run)
feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")
3.4 Training Data Generation (Point-in-Time Join)
Point-in-Time Join is the core capability of a Feature Store. It prevents data leakage while accurately retrieving feature values at specific historical timestamps.
from feast import FeatureStore
import pandas as pd
store = FeatureStore(repo_path=".")
# Entity dataframe with timestamps
entity_df = pd.DataFrame({
"user_id": [1001, 1002, 1003, 1001],
"event_timestamp": pd.to_datetime([
"2024-09-01", "2024-09-02", "2024-09-03", "2024-10-01"
]),
})
# Get features with Point-in-Time Join
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"user_stats:total_purchases",
"user_stats:avg_purchase_amount",
"user_stats:days_since_last_login",
],
).to_df()
print(training_df)
# Each row has feature values as of that point in time
# user 1001's features at 2024-09-01 != features at 2024-10-01
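To show what a point-in-time join does independently of Feast, the following is a minimal standard-library sketch. The `feature_history` data and the `point_in_time_lookup` helper are hypothetical, and the sketch deliberately ignores the feature view's `ttl`, which Feast also applies when it looks back in time.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical feature history per entity, sorted by timestamp:
# (event_timestamp, total_purchases)
feature_history = {
    1001: [
        (datetime(2024, 8, 15), 3),
        (datetime(2024, 9, 20), 7),
        (datetime(2024, 10, 5), 9),
    ],
}


def point_in_time_lookup(history, entity_id, as_of):
    """Return the latest feature value recorded at or before `as_of`, else None."""
    rows = history.get(entity_id, [])
    timestamps = [ts for ts, _ in rows]
    pos = bisect_right(timestamps, as_of)  # rows[:pos] all have ts <= as_of
    return rows[pos - 1][1] if pos > 0 else None


# Label events (entity, event_timestamp), analogous to entity_df
label_events = [
    (1001, datetime(2024, 9, 1)),
    (1001, datetime(2024, 10, 1)),
    (1001, datetime(2024, 8, 1)),
]
training_rows = [
    (uid, ts, point_in_time_lookup(feature_history, uid, ts))
    for uid, ts in label_events
]
for row in training_rows:
    print(row)
```

Each label row only sees values that already existed at its own timestamp, so the value written on 2024-10-05 never leaks into the earlier rows. With DataFrames, `pandas.merge_asof` performs the same backward-looking match.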
3.5 Online Serving
# Online feature retrieval for real-time inference
feature_vector = store.get_online_features(
features=[
"user_stats:total_purchases",
"user_stats:avg_purchase_amount",
"user_stats:days_since_last_login",
],
entity_rows=[
{"user_id": 1001},
{"user_id": 1002},
],
).to_dict()
# Result: Returns latest feature values (P99 latency ~5-10ms)
3.6 GCP/AWS Production Configuration
# GCP production setup
project: production_ml
registry: gs://my-bucket/feast-registry/registry.db
provider: gcp
online_store:
  type: datastore  # or bigtable
  project_id: my-gcp-project
offline_store:
  type: bigquery
  project_id: my-gcp-project
  dataset: feast_features

# AWS production setup
project: production_ml
registry: s3://my-bucket/feast-registry/registry.db
provider: aws
online_store:
  type: dynamodb
  region: us-east-1
offline_store:
  type: redshift
  cluster_id: my-redshift-cluster
  region: us-east-1
  database: ml_features
  user: feast_user
  s3_staging_location: s3://my-bucket/feast-staging/
4. Feature Engineering Patterns
4.1 Temporal Features
import pandas as pd

def create_temporal_features(df, timestamp_col, entity_col, value_col):
    """Create time-window-based aggregation features"""
    # Sort by entity, then time, so every per-entity series is monotonic
    df = df.sort_values([entity_col, timestamp_col]).reset_index(drop=True)
    features = df[[entity_col, timestamp_col]].copy()
    # Offset windows such as '7D' need a DatetimeIndex; rolling(on=...) only
    # accepts a column label on a DataFrame, so index by timestamp instead
    indexed = df.set_index(timestamp_col)
    # Moving averages across different windows
    for window in ['1D', '7D', '30D']:
        features[f'{value_col}_mean_{window}'] = (
            indexed.groupby(entity_col)[value_col]
            .transform(lambda x: x.rolling(window).mean())
            .to_numpy()  # positional assignment; row order matches `features`
        )
    # Trend feature: 7-day avg / 30-day avg
    features[f'{value_col}_trend_7d_30d'] = (
        features[f'{value_col}_mean_7D'] /
        features[f'{value_col}_mean_30D'].replace(0, float('nan'))
    )
    # Day of week / hour features
    features['day_of_week'] = df[timestamp_col].dt.dayofweek
    features['hour_of_day'] = df[timestamp_col].dt.hour
    features['is_weekend'] = features['day_of_week'].isin([5, 6]).astype(int)
    return features
4.2 Aggregation Features
def create_aggregation_features(events_df, entity_col, group_col):
"""Per-entity aggregation features"""
agg_features = events_df.groupby(entity_col).agg(
event_count=pd.NamedAgg(column=group_col, aggfunc='count'),
unique_categories=pd.NamedAgg(column='category', aggfunc='nunique'),
total_amount=pd.NamedAgg(column='amount', aggfunc='sum'),
avg_amount=pd.NamedAgg(column='amount', aggfunc='mean'),
max_amount=pd.NamedAgg(column='amount', aggfunc='max'),
std_amount=pd.NamedAgg(column='amount', aggfunc='std'),
).reset_index()
# Ratio features
agg_features['high_value_ratio'] = (
events_df[events_df['amount'] > 100]
.groupby(entity_col)
.size()
.reindex(agg_features[entity_col], fill_value=0)
.values / agg_features['event_count']
)
return agg_features
4.3 Embedding Features
from sentence_transformers import SentenceTransformer
import numpy as np
model = SentenceTransformer('all-MiniLM-L6-v2')
def create_text_embedding_features(df, text_col, prefix='emb'):
"""Generate text embedding features"""
embeddings = model.encode(df[text_col].fillna('').tolist())
emb_df = pd.DataFrame(
embeddings,
columns=[f'{prefix}_{i}' for i in range(embeddings.shape[1])],
index=df.index,
)
return pd.concat([df, emb_df], axis=1)
4.4 Cross Features
def create_cross_features(df):
"""Cross / interaction features between existing features"""
# Ratio feature
df['purchase_to_visit_ratio'] = (
df['purchase_count'] / df['visit_count'].replace(0, 1)
)
# Bucketing + cross
df['age_bucket'] = pd.cut(
df['age'], bins=[0, 25, 35, 50, 100],
labels=['young', 'middle', 'senior', 'elder']
)
df['age_x_gender'] = df['age_bucket'].astype(str) + '_' + df['gender']
# Numeric interaction
df['income_x_age'] = df['income'] * df['age']
return df
5. MLOps Maturity Levels
Google's MLOps maturity model defines three levels (0 through 2). This guide extends it with a fourth, Level 3, to describe fully automated retraining.
Level 0: Manual Process
Data scientist manually:
1. Collects and preprocesses data
2. Trains model in Jupyter
3. Hands model file to engineer
4. Engineer writes serving code
5. Manual deployment
Problems:
- Deployment cycle: months
- No reproducibility
- No monitoring
Level 1: ML Pipeline Automation
Automated ML Pipeline:
1. Data validation -> Feature engineering -> Training -> Evaluation -> Deployment
2. Pipeline orchestrator (Kubeflow, Airflow)
3. CT (Continuous Training): trigger-based auto-retraining
Improvements:
- Deployment cycle: weekly
- Reproducible pipeline
- Basic monitoring
Level 2: CI/CD for ML
CI/CD Pipeline Integration:
1. Code change -> Auto test -> Pipeline build -> Model training
2. Model validation gates (performance thresholds)
3. Shadow Deployment -> Canary -> Full Rollout
4. Automated A/B testing
Improvements:
- Deployment cycle: daily
- Automatic rollback
- Systematic experiment management
Level 3: Auto-Retraining + Full Automation
Full Automation:
1. Drift detection -> Auto-retraining trigger
2. Auto feature selection / hyperparameter optimization
3. Automatic model comparison + champion/challenger
4. Auto-scaling + cost optimization
Improvements:
- Deployment cycle: hourly
- Unattended operation
- Proactive monitoring
6. ML Pipeline Orchestration
6.1 Kubeflow Pipelines
# Define pipeline with Kubeflow Pipelines DSL
from kfp import dsl, compiler
from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(
    base_image="python:3.11",
    # pyarrow is needed for pandas to read and write Parquet
    packages_to_install=["pandas", "scikit-learn", "pyarrow"],
)
def preprocess_data(
    raw_data: Input[Dataset],
    processed_data: Output[Dataset],
):
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    df = pd.read_parquet(raw_data.path)
    # Keep numeric columns only, and scale the features but not the label,
    # so 'target' stays a valid class label for the training step.
    # Assumes the label column is numeric and named 'target', as in train_model.
    numeric_df = df.select_dtypes(include='number')
    feature_cols = numeric_df.columns.drop('target', errors='ignore')
    df_scaled = numeric_df.copy()
    df_scaled[feature_cols] = StandardScaler().fit_transform(numeric_df[feature_cols])
    df_scaled.to_parquet(processed_data.path)

@dsl.component(
    base_image="python:3.11",
    packages_to_install=["scikit-learn", "pandas", "pyarrow", "joblib"],
)
def train_model(
    training_data: Input[Dataset],
    model_artifact: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 100,
    max_depth: int = 10,
):
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import cross_val_score
    import joblib

    df = pd.read_parquet(training_data.path)
    X = df.drop('target', axis=1)
    y = df['target']
    clf = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42,
    )
    # Cross-validate for the reported metric, then fit on all rows for the artifact
    scores = cross_val_score(clf, X, y, cv=5, scoring='f1_macro')
    clf.fit(X, y)
    joblib.dump(clf, model_artifact.path)
    metrics.log_metric("f1_mean", float(scores.mean()))
    metrics.log_metric("f1_std", float(scores.std()))

@dsl.pipeline(name="ML Training Pipeline")
def ml_pipeline(n_estimators: int = 100, max_depth: int = 10):
    preprocess_task = preprocess_data(
        raw_data=dsl.importer(
            artifact_uri="gs://my-bucket/raw-data/",
            artifact_class=Dataset,
        ).output,
    )
    train_task = train_model(
        training_data=preprocess_task.outputs["processed_data"],
        n_estimators=n_estimators,
        max_depth=max_depth,
    )

# Compile and run
compiler.Compiler().compile(ml_pipeline, "pipeline.yaml")
6.2 Airflow ML Pipeline
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'ml-team',
'depends_on_past': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'ml_training_pipeline',
default_args=default_args,
schedule_interval='@weekly',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['ml', 'training'],
) as dag:
validate_data = KubernetesPodOperator(
task_id='validate_data',
name='data-validation',
image='ml-pipeline:latest',
cmds=['python', 'validate.py'],
namespace='ml-pipelines',
)
extract_features = KubernetesPodOperator(
task_id='extract_features',
name='feature-extraction',
image='ml-pipeline:latest',
cmds=['python', 'extract_features.py'],
namespace='ml-pipelines',
)
train_model = KubernetesPodOperator(
task_id='train_model',
name='model-training',
image='ml-pipeline:latest',
cmds=['python', 'train.py'],
namespace='ml-pipelines',
)
evaluate_model = PythonOperator(
task_id='evaluate_model',
python_callable=lambda: print("Evaluating model..."),
)
validate_data >> extract_features >> train_model >> evaluate_model
6.3 Pipeline Solution Comparison
| Feature | Kubeflow | Airflow | Vertex AI | SageMaker |
|---|---|---|---|---|
| Runtime | Kubernetes | Various | GCP Managed | AWS Managed |
| ML-specific | High | Medium | High | High |
| UI/Visualization | Basic | Excellent | Excellent | Excellent |
| Scalability | High | High | High | High |
| Learning curve | Steep | Moderate | Low | Low |
| Cost | Infra only | Infra only | Usage-based | Usage-based |
7. Experiment Tracking
7.1 MLflow Experiment Tracking
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import f1_score, precision_score, recall_score
# MLflow server configuration
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("recommendation-model-v2")
# Experiment run
with mlflow.start_run(run_name="gbm-experiment-42") as run:
# Log hyperparameters
params = {
"n_estimators": 200,
"max_depth": 8,
"learning_rate": 0.1,
"subsample": 0.8,
}
mlflow.log_params(params)
# Train model
model = GradientBoostingClassifier(**params)
model.fit(X_train, y_train)
# Log metrics
y_pred = model.predict(X_test)
metrics = {
"f1": f1_score(y_test, y_pred, average='macro'),
"precision": precision_score(y_test, y_pred, average='macro'),
"recall": recall_score(y_test, y_pred, average='macro'),
}
mlflow.log_metrics(metrics)
# Log model artifact
mlflow.sklearn.log_model(
model,
artifact_path="model",
registered_model_name="recommendation-gbm",
)
mlflow.log_artifact("feature_importance.png")
print(f"Run ID: {run.info.run_id}")
print(f"Metrics: {metrics}")
7.2 Weights and Biases Integration
import wandb
wandb.init(
project="recommendation-model",
config={
"architecture": "GBM",
"n_estimators": 200,
"learning_rate": 0.1,
},
)
# Log metrics during training loop
for epoch in range(100):
train_loss = train_one_epoch(model, train_loader)
val_loss, val_f1 = evaluate(model, val_loader)
wandb.log({
"epoch": epoch,
"train_loss": train_loss,
"val_loss": val_loss,
"val_f1": val_f1,
})
# Save model artifact
artifact = wandb.Artifact('model-weights', type='model')
artifact.add_file('model.pt')
wandb.log_artifact(artifact)
wandb.finish()
8. Model Registry
8.1 MLflow Model Registry
from mlflow.tracking import MlflowClient
client = MlflowClient()
# Create model version
model_version = client.create_model_version(
name="recommendation-gbm",
source=f"runs:/{run_id}/model",
run_id=run_id,
description="GBM model v3: added new features",
)
# Stage transition: None -> Staging
client.transition_model_version_stage(
name="recommendation-gbm",
version=model_version.version,
stage="Staging",
archive_existing_versions=False,
)
# Promote to Production after staging validation
client.transition_model_version_stage(
name="recommendation-gbm",
version=model_version.version,
stage="Production",
archive_existing_versions=True, # Archive existing production version
)
8.2 Model Versioning Strategy
Model Lifecycle:
None -> Staging -> Production -> Archived
Versioning Policy:
- Staging: Upon passing automated performance tests
- Production: Manual approval or A/B test pass
- Archived: Automatically when new version is promoted
- Keep 3 most recent versions, delete older ones
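The retention rule in the policy above can be sketched as a small pure-Python function. The `(version_number, stage)` tuples are a hypothetical stand-in, not an MLflow type, and the extra safeguard of deleting only versions already in the `Archived` stage is an assumption added here so that a live Production or Staging version is never removed.

```python
def versions_to_delete(versions, keep_recent=3):
    """Pick archived versions that fall outside the `keep_recent` newest versions.

    `versions` is a list of (version_number, stage) tuples (hypothetical structure).
    """
    newest_first = sorted(versions, key=lambda v: v[0], reverse=True)
    protected = {number for number, _ in newest_first[:keep_recent]}
    return [
        number
        for number, stage in newest_first
        if number not in protected and stage == "Archived"
    ]


# Hypothetical registry contents
registry = [
    (1, "Archived"), (2, "Archived"), (3, "Archived"),
    (4, "Production"), (5, "Staging"), (6, "None"),
]
print(versions_to_delete(registry))
```

In a real cleanup job the returned version numbers would be passed to the registry's delete call; that wiring is left out here.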
# Load production model
import mlflow.pyfunc
model = mlflow.pyfunc.load_model(
model_uri="models:/recommendation-gbm/Production"
)
predictions = model.predict(input_df)
9. Model Serving
9.1 BentoML
# service.py
import bentoml
import numpy as np
from bentoml.io import NumpyNdarray, JSON
# Load saved model
runner = bentoml.sklearn.get("recommendation_model:latest").to_runner()
svc = bentoml.Service("recommendation_service", runners=[runner])
@svc.api(input=JSON(), output=JSON())
async def predict(input_data: dict) -> dict:
features = np.array(input_data["features"]).reshape(1, -1)
prediction = await runner.predict.async_run(features)
return {
"prediction": int(prediction[0]),
"model_version": "v3.2.1",
}
# bentofile.yaml
service: "service:svc"
include:
- "*.py"
python:
packages:
- scikit-learn
- numpy
docker:
python_version: "3.11"
# Build and deploy
bentoml build
bentoml containerize recommendation_service:latest
docker run -p 3000:3000 recommendation_service:latest
9.2 Seldon Core (Kubernetes)
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: recommendation-model
  namespace: ml-serving
spec:
  predictors:
    - name: default
      replicas: 3
      graph:
        name: classifier
        implementation: SKLEARN_SERVER
        modelUri: s3://models/recommendation/v3
        envSecretRefName: s3-credentials
      componentSpecs:
        - spec:
            containers:
              - name: classifier
                resources:
                  requests:
                    cpu: "1"
                    memory: "2Gi"
                  limits:
                    cpu: "2"
                    memory: "4Gi"
      traffic: 100
      labels:
        version: v3
9.3 TensorFlow Serving
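# Run TFServing Docker (8500 = gRPC, 8501 = REST)
# The gRPC client below connects to 8500, so that port must be published too
docker run -p 8500:8500 -p 8501:8501 \
  --mount type=bind,source=/models/recommendation,target=/models/recommendation \
  -e MODEL_NAME=recommendation \
  tensorflow/serving:latest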
# gRPC client
import grpc
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2, prediction_service_pb2_grpc
channel = grpc.insecure_channel('localhost:8500')
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
request = predict_pb2.PredictRequest()
request.model_spec.name = 'recommendation'
request.model_spec.signature_name = 'serving_default'
request.inputs['input'].CopyFrom(
tf.make_tensor_proto(input_data, shape=[1, 128])
)
response = stub.Predict(request, timeout=5.0)
9.4 vLLM (LLM Serving)
# Start vLLM server
# python -m vllm.entrypoints.openai.api_server \
#   --model meta-llama/Meta-Llama-3-8B-Instruct \
#   --tensor-parallel-size 2 \
#   --max-model-len 8192
# Client call (the model name must match the one the server was started with)
from openai import OpenAI
client = OpenAI(
    base_url="http://localhost:8000/v1",
    api_key="not-needed",
)
response = client.chat.completions.create(
    model="meta-llama/Meta-Llama-3-8B-Instruct",
    messages=[
        {"role": "user", "content": "What are the benefits of recommendation systems?"},
    ],
    temperature=0.7,
    max_tokens=512,
)
9.5 Serving Solution Comparison
| Solution | Model Types | Deployment Env | Batch Inference | GPU Support | Auto-scaling |
|---|---|---|---|---|---|
| BentoML | Universal | Docker/K8s | Supported | Supported | Supported |
| Seldon Core | Universal | K8s only | Supported | Supported | HPA |
| TFServing | TF models | Docker/K8s | Supported | Supported | Manual |
| Triton | Multi-framework | Docker/K8s | Supported | Optimized | Supported |
| vLLM | LLM | Docker/K8s | Supported (offline batch API) | Required | Supported |
10. Model Monitoring
10.1 Data Drift Detection
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
# Reference data (from training time)
reference_data = pd.read_parquet("training_data.parquet")
# Current data (production)
current_data = pd.read_parquet("production_data_latest.parquet")
column_mapping = ColumnMapping(
target='label',
numerical_features=['age', 'income', 'purchase_count'],
categorical_features=['category', 'region'],
)
# Generate drift report
report = Report(metrics=[
DataDriftPreset(),
TargetDriftPreset(),
])
report.run(
reference_data=reference_data,
current_data=current_data,
column_mapping=column_mapping,
)
# Save HTML report
report.save_html("drift_report.html")
# Programmatic access
result = report.as_dict()
dataset_drift = result['metrics'][0]['result']['dataset_drift']
drift_share = result['metrics'][0]['result']['share_of_drifted_columns']
if dataset_drift:
print(f"Drift detected! Drifted feature share: {drift_share:.2%}")
trigger_retraining_pipeline()
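Evidently picks its drift statistics internally. As a rough illustration of one widely used statistic, the Population Stability Index (PSI), here is a minimal standard-library sketch. The equal-width binning, the `eps` floor for empty bins, and the sample data are all assumptions made for this example, not Evidently's implementation.

```python
import math


def population_stability_index(reference, current, n_bins=10, eps=1e-6):
    """PSI between two numeric samples, using equal-width bins over the reference range."""
    lo, hi = min(reference), max(reference)
    width = (hi - lo) / n_bins or 1.0  # fall back to 1.0 if the reference is constant

    def bin_shares(values):
        counts = [0] * n_bins
        for v in values:
            idx = int((v - lo) / width)
            counts[min(max(idx, 0), n_bins - 1)] += 1  # clamp out-of-range values
        # Floor empty bins at eps so the logarithm stays defined
        return [max(c / len(values), eps) for c in counts]

    ref_shares = bin_shares(reference)
    cur_shares = bin_shares(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref_shares, cur_shares))


reference = [i / 100 for i in range(1000)]  # hypothetical feature values in [0, 10)
shifted = [v + 3.0 for v in reference]      # same shape, shifted upward by 3
psi_same = population_stability_index(reference, reference)
psi_shifted = population_stability_index(reference, shifted)
print(psi_same, psi_shifted)
```

A common rule of thumb treats PSI below 0.1 as stable and above 0.25 as significant drift, although suitable thresholds depend on the feature and the bin count.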
10.2 Prediction Drift Detection
from evidently.test_suite import TestSuite
from evidently.tests import (
TestColumnDrift,
TestShareOfDriftedColumns,
TestMeanInNSigmas,
)
test_suite = TestSuite(tests=[
TestShareOfDriftedColumns(lt=0.3), # Less than 30% drifted features
TestColumnDrift(column_name="prediction_score"),
TestMeanInNSigmas(column_name="prediction_score", n=2),
])
test_suite.run(
reference_data=reference_data,
current_data=current_data,
column_mapping=column_mapping,
)
test_results = test_suite.as_dict()
all_passed = all(
t['status'] == 'SUCCESS' for t in test_results['tests']
)
if not all_passed:
alert_team("Model drift detected - retraining review required")
10.3 WhyLabs Integration
import whylogs as why
from whylogs.api.writer.whylabs import WhyLabsWriter
# Create profile
results = why.log(current_data)
profile = results.profile()
# Send to WhyLabs
writer = WhyLabsWriter()
writer.write(profile)
11. A/B Testing for ML Models
11.1 Traffic Splitting
# Traffic splitting with Istio VirtualService
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: recommendation-ab-test
spec:
  hosts:
    - recommendation-service
  http:
    - match:
        - headers:
            x-experiment-group:
              exact: "treatment"
      route:
        - destination:
            host: recommendation-service
            subset: v2-challenger
          weight: 100
    - route:
        - destination:
            host: recommendation-service
            subset: v1-champion
          weight: 80
        - destination:
            host: recommendation-service
            subset: v2-challenger
          weight: 20
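The first route above matches on the `x-experiment-group` header, so something upstream has to set it. One possible approach, sketched here under assumptions, is a deterministic hash of the user ID so that a user always lands in the same group. The `assign_experiment_group` function, the experiment name used as a salt, and the 20% treatment share are hypothetical.

```python
import hashlib


def assign_experiment_group(user_id, experiment="recommendation-ab-test", treatment_share=0.2):
    """Deterministically map a user to 'treatment' or 'control'."""
    # Salting with the experiment name keeps assignments independent across experiments
    digest = hashlib.sha256(f"{experiment}:{user_id}".encode()).hexdigest()
    bucket = int(digest[:8], 16) / 0x100000000  # value in [0, 1)
    return "treatment" if bucket < treatment_share else "control"


# Header a gateway or client could attach to the request
headers = {"x-experiment-group": assign_experiment_group(user_id=1001)}
print(headers)
```

Sticky assignment of this kind keeps a user's experience consistent across requests, which the weighted fallback route alone does not guarantee.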
11.2 Statistical Significance Testing
from scipy import stats
import numpy as np
def ab_test_significance(
control_conversions, control_total,
treatment_conversions, treatment_total,
alpha=0.05,
):
"""A/B test statistical significance verification"""
control_rate = control_conversions / control_total
treatment_rate = treatment_conversions / treatment_total
# Z-test for proportions
pooled_rate = (
(control_conversions + treatment_conversions) /
(control_total + treatment_total)
)
se = np.sqrt(
pooled_rate * (1 - pooled_rate) *
(1/control_total + 1/treatment_total)
)
z_stat = (treatment_rate - control_rate) / se
p_value = 2 * (1 - stats.norm.cdf(abs(z_stat)))
lift = (treatment_rate - control_rate) / control_rate
return {
"control_rate": control_rate,
"treatment_rate": treatment_rate,
"lift": f"{lift:.2%}",
"p_value": p_value,
"significant": p_value < alpha,
"recommendation": (
"Promote challenger model" if p_value < alpha and lift > 0
else "Keep current model"
),
}
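As a worked example of the same pooled z-test, the sketch below uses only the standard library (`statistics.NormalDist` in place of SciPy). The function name `two_proportion_z_test` and the conversion counts are hypothetical.

```python
from math import sqrt
from statistics import NormalDist


def two_proportion_z_test(c_conv, c_total, t_conv, t_total):
    """Pooled two-sided z-test for a difference in conversion rates."""
    c_rate = c_conv / c_total
    t_rate = t_conv / t_total
    pooled = (c_conv + t_conv) / (c_total + t_total)
    se = sqrt(pooled * (1 - pooled) * (1 / c_total + 1 / t_total))
    z = (t_rate - c_rate) / se
    p_value = 2 * (1 - NormalDist().cdf(abs(z)))
    return z, p_value


# Hypothetical counts: 500 of 10,000 convert in control, 600 of 10,000 in treatment
z_stat, p_value = two_proportion_z_test(500, 10_000, 600, 10_000)
print(f"z = {z_stat:.2f}, p = {p_value:.4f}")
```

With these counts the z statistic is roughly 3.1 and the p-value is about 0.002, so a one-point absolute lift on 10,000 users per arm clears the 0.05 threshold comfortably.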
11.3 Multi-Armed Bandit
import numpy as np
class ThompsonSampling:
"""Adaptive traffic splitting using Thompson Sampling"""
def __init__(self, n_arms):
self.n_arms = n_arms
self.successes = np.ones(n_arms) # Beta prior alpha
self.failures = np.ones(n_arms) # Beta prior beta
def select_arm(self):
"""Sample from Beta distribution to select optimal arm"""
samples = [
np.random.beta(self.successes[i], self.failures[i])
for i in range(self.n_arms)
]
return int(np.argmax(samples))
def update(self, arm, reward):
"""Update results"""
if reward:
self.successes[arm] += 1
else:
self.failures[arm] += 1
def get_allocation(self):
"""Current traffic allocation ratio"""
total = self.successes + self.failures
rates = self.successes / total
return rates / rates.sum()
# Usage example
bandit = ThompsonSampling(n_arms=3) # 3 model versions
for request in incoming_requests:
arm = bandit.select_arm()
prediction = models[arm].predict(request)
reward = get_conversion(request, prediction)
bandit.update(arm, reward)
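Because the usage loop above depends on live traffic, a small offline simulation can help build intuition. The following sketch re-implements the same Beta-Bernoulli Thompson Sampling loop with the standard library only. The true conversion rates, the round count, and the `simulate_thompson` function are hypothetical.

```python
import random


def simulate_thompson(true_rates, n_rounds=5000, seed=0):
    """Run Thompson Sampling against simulated arms and return pulls per arm."""
    rng = random.Random(seed)
    n_arms = len(true_rates)
    successes = [1] * n_arms  # Beta prior alpha = 1
    failures = [1] * n_arms   # Beta prior beta = 1
    pulls = [0] * n_arms
    for _ in range(n_rounds):
        # Sample a plausible conversion rate per arm and play the best sample
        samples = [rng.betavariate(successes[i], failures[i]) for i in range(n_arms)]
        arm = samples.index(max(samples))
        pulls[arm] += 1
        # Simulated conversion, drawn from the arm's (hidden) true rate
        if rng.random() < true_rates[arm]:
            successes[arm] += 1
        else:
            failures[arm] += 1
    return pulls


pulls = simulate_thompson([0.02, 0.05, 0.10])  # hypothetical true conversion rates
print(pulls)
```

Traffic drifts toward the arm with the highest true rate as evidence accumulates, while weaker arms still receive occasional exploratory requests.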
12. CI/CD for ML
12.1 Model Validation Gates
# model_validation.py
import os
import time

import mlflow
import numpy as np
import pandas as pd
from sklearn.metrics import f1_score

def validate_model(
    model_uri: str,
    test_data_path: str,
    min_f1: float = 0.85,
    max_latency_ms: float = 50.0,
    max_model_size_mb: float = 500.0,
):
    """Model deployment validation gates"""
    results = {"passed": True, "checks": []}
    # 1. Performance validation
    model = mlflow.pyfunc.load_model(model_uri)
    test_data = pd.read_parquet(test_data_path)
    features = test_data.drop('target', axis=1)
    predictions = model.predict(features)
    f1 = f1_score(test_data['target'], predictions, average='macro')
    results["checks"].append({
        "name": "performance",
        "metric": "f1_score",
        "value": f1,
        "threshold": min_f1,
        "passed": f1 >= min_f1,
    })
    # 2. Latency validation (single-row requests, label column excluded)
    latencies = []
    for i in range(min(100, len(features))):
        start = time.perf_counter()
        model.predict(features.iloc[[i]])
        latencies.append((time.perf_counter() - start) * 1000)
    p99_latency = np.percentile(latencies, 99)
    results["checks"].append({
        "name": "latency",
        "metric": "p99_ms",
        "value": p99_latency,
        "threshold": max_latency_ms,
        "passed": p99_latency <= max_latency_ms,
    })
    # 3. Model size validation
    # os.path.getsize() cannot read a model URI, so fetch the artifacts
    # locally first and sum the file sizes
    local_path = mlflow.artifacts.download_artifacts(artifact_uri=model_uri)
    if os.path.isdir(local_path):
        size_bytes = sum(
            os.path.getsize(os.path.join(root, name))
            for root, _, files in os.walk(local_path)
            for name in files
        )
    else:
        size_bytes = os.path.getsize(local_path)
    model_size_mb = size_bytes / (1024 * 1024)
    results["checks"].append({
        "name": "model_size",
        "metric": "size_mb",
        "value": model_size_mb,
        "threshold": max_model_size_mb,
        "passed": model_size_mb <= max_model_size_mb,
    })
    results["passed"] = all(c["passed"] for c in results["checks"])
    return results
12.2 GitHub Actions ML CI/CD
name: ML CI/CD Pipeline
on:
  push:
    branches: [main]
    paths:
      - 'models/**'
      - 'features/**'
jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Validate Data Schema
        run: python scripts/validate_data.py
  train-and-evaluate:
    needs: data-validation
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v4
      - name: Train Model
        run: python train.py --config configs/production.yaml
      - name: Evaluate Model
        run: python evaluate.py --model-path outputs/model
  model-validation:
    needs: train-and-evaluate
    runs-on: ubuntu-latest
    steps:
      # Each job starts on a fresh runner, so the repository is checked out again.
      # Assumes the trained model was published to a registry or artifact store
      # that this job can read.
      - uses: actions/checkout@v4
      - name: Run Validation Gates
        run: python model_validation.py
  deploy-staging:
    needs: model-validation
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Deploy to Staging
        run: |
          kubectl apply -f k8s/staging/
          kubectl rollout status deployment/model-serving -n staging
  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v4
      - name: Canary Rollout
        run: |
          kubectl apply -f k8s/production/canary-10.yaml
          sleep 300
          python check_canary_metrics.py
          kubectl apply -f k8s/production/full-rollout.yaml
13. Cost Optimization
13.1 Batch vs Real-Time Inference
# Batch inference: cost-efficient (bulk processing)
def batch_inference(model, data_path, output_path):
    """Batch inference using Spark"""
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import DoubleType

    spark = SparkSession.builder.appName("batch-inference").getOrCreate()
    df = spark.read.parquet(data_path)
    # Declare the return type explicitly; a Python UDF defaults to StringType
    predict_udf = udf(
        lambda features: float(model.predict([features])[0]),
        DoubleType(),
    )
    result = df.withColumn("prediction", predict_udf(df["features"]))
    result.write.parquet(output_path)
13.2 Prediction Caching
import redis
import hashlib
import json
class PredictionCache:
def __init__(self, redis_url="redis://localhost:6379", ttl=3600):
self.redis = redis.from_url(redis_url)
self.ttl = ttl
self.hit_count = 0
self.miss_count = 0
def _make_key(self, features: dict) -> str:
feature_str = json.dumps(features, sort_keys=True)
return f"pred:{hashlib.md5(feature_str.encode()).hexdigest()}"
def get_or_predict(self, features: dict, model) -> dict:
key = self._make_key(features)
cached = self.redis.get(key)
if cached:
self.hit_count += 1
return json.loads(cached)
self.miss_count += 1
prediction = model.predict(features)
self.redis.setex(key, self.ttl, json.dumps(prediction))
return prediction
@property
def hit_rate(self):
total = self.hit_count + self.miss_count
return self.hit_count / total if total > 0 else 0
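For local experiments without a Redis server, a dict-backed stand-in can exercise the same key scheme. The sketch below is an assumption-laden simplification: `InMemoryPredictionCache` and `fake_model` are hypothetical, TTL expiry is omitted, and the model is passed as a plain callable. It shows why `sort_keys=True` matters: two feature dicts with the same content but different key order map to the same cache entry.

```python
import hashlib
import json


class InMemoryPredictionCache:
    """Dict-backed stand-in for the Redis cache (no TTL), for local experiments."""

    def __init__(self):
        self.store = {}
        self.hit_count = 0
        self.miss_count = 0

    @staticmethod
    def make_key(features: dict) -> str:
        # sort_keys makes the serialized form independent of dict key order
        feature_str = json.dumps(features, sort_keys=True)
        return f"pred:{hashlib.md5(feature_str.encode()).hexdigest()}"

    def get_or_predict(self, features: dict, predict_fn):
        key = self.make_key(features)
        if key in self.store:
            self.hit_count += 1
            return json.loads(self.store[key])
        self.miss_count += 1
        prediction = predict_fn(features)
        self.store[key] = json.dumps(prediction)
        return prediction


def fake_model(features):
    """Hypothetical model: returns a JSON-serializable score."""
    return {"score": round(0.1 * features["visits"], 2)}


cache = InMemoryPredictionCache()
first = cache.get_or_predict({"user_id": 1001, "visits": 3}, fake_model)
second = cache.get_or_predict({"visits": 3, "user_id": 1001}, fake_model)  # same features, different order
print(first, second, cache.hit_count, cache.miss_count)
```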
13.3 Model Compression
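# Quantization
import os
import torch

# Assumes model.pt holds a full pickled nn.Module from a trusted source;
# PyTorch 2.6+ needs weights_only=False to unpickle it
model = torch.load("model.pt", weights_only=False)
model.eval()
quantized_model = torch.quantization.quantize_dynamic(
    model,
    {torch.nn.Linear},
    dtype=torch.qint8,
)
# Size comparison: save both state_dicts so the comparison is like for like
torch.save(model.state_dict(), "model_fp32_state.pt")
torch.save(quantized_model.state_dict(), "model_quantized.pt")
original_size = os.path.getsize("model_fp32_state.pt") / (1024 * 1024)
quantized_size = os.path.getsize("model_quantized.pt") / (1024 * 1024)
print(f"Original: {original_size:.1f}MB -> Quantized: {quantized_size:.1f}MB")
print(f"Size reduction: {(1 - quantized_size/original_size):.1%}")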
13.4 Cost Optimization Checklist
| Strategy | Expected Savings | Trade-off |
|---|---|---|
| Switch to batch inference | 50-80% | Lose real-time capability |
| Prediction caching | 30-60% | Cache freshness |
| Model quantization | 40-60% | Slight accuracy loss |
| Spot/Preemptible instances | 60-90% | Availability risk |
| Auto-scaling | 20-40% | Cold start |
| Model distillation | 50-70% | Development cost |
14. Quiz
Q1: What is the core value of a Feature Store?
A: The core value is preventing Training-Serving Skew. By sharing a single feature definition and transformation logic between training and serving, it fundamentally eliminates data inconsistencies. Additional values include feature reuse, team collaboration, and feature governance.
Q2: Why is Point-in-Time Join necessary?
A: Point-in-Time Join prevents Data Leakage. When generating training data, you must join only the feature values that were actually available at the time of each event. If future data is included, the model shows unrealistically high performance during training but degrades significantly in production.
Q3: What is the key element for moving from MLOps Level 2 to Level 3?
A: Moving from Level 2 (CI/CD for ML) to Level 3 (auto-retraining) requires an automatic drift detection and retraining trigger system. This monitors data drift and prediction drift in real-time, automatically triggers the retraining pipeline when thresholds are exceeded, and auto-promotes through champion/challenger comparison.
Q4: What is the main difference between BentoML and Seldon Core?
A: BentoML is a framework-agnostic model packaging tool that creates Docker containers deployable anywhere. Seldon Core is a Kubernetes-native serving platform that deeply integrates with the K8s ecosystem through CRD-based deployment, A/B testing, canary deployment, and explainability (Explainer).
Q5: What are 3 main methods for detecting model drift?
A: (1) Data Drift: Detect changes in input feature distributions using KS test, PSI (Population Stability Index), etc. (2) Prediction Drift: Monitor changes in model output distributions. (3) Performance Drift: Track degradation in accuracy, F1, etc. by comparing with actual labels. Evidently AI and WhyLabs are representative tools.
15. References
- Feast Official Documentation - https://docs.feast.dev/
- MLflow Official Documentation - https://mlflow.org/docs/latest/index.html
- Google MLOps Maturity Model - https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning
- Evidently AI Documentation - https://docs.evidentlyai.com/
- BentoML Official Documentation - https://docs.bentoml.com/
- Seldon Core Documentation - https://docs.seldon.io/
- Kubeflow Pipelines - https://www.kubeflow.org/docs/components/pipelines/
- WhyLabs Documentation - https://docs.whylabs.ai/
- vLLM Project - https://docs.vllm.ai/
- Weights and Biases - https://docs.wandb.ai/
- Tecton Feature Store - https://docs.tecton.ai/
- Hopsworks Feature Store - https://docs.hopsworks.ai/
- NVIDIA Triton Inference Server - https://docs.nvidia.com/deeplearning/triton-inference-server/