Skip to content

✍️ 필사 모드: Feature Store & MLOps Pipeline Complete Guide 2025: Feast, Feature Engineering, Model Serving

English
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.

Table of Contents

1. Why Feature Store?

1.1 The Training-Serving Skew Problem

The most frequent source of failures in ML model deployment is feature inconsistency between training and serving time.

When data scientists create features with Pandas in Jupyter Notebooks, and engineers re-implement the same logic in Java or Go, subtle differences creep in.

# Training time: Feature creation with Pandas
df['avg_purchase_7d'] = df.groupby('user_id')['amount'].transform(
    lambda x: x.rolling('7D').mean()
)

# Serving time: Re-implemented in SQL or another language -> subtle differences
# - Boundary conditions (inclusive/exclusive) differences
# - Timezone handling differences
# - NULL handling differences

Feature Store solves this problem fundamentally by sharing a single feature definition between both training and serving.

1.2 Feature Reuse and Team Collaboration

In large ML organizations, dozens of teams independently create similar features.

ProblemBefore Feature StoreAfter Feature Store
Feature duplicationEach team recreates similar featuresSearch/reuse from central registry
ConsistencyDifferent computation logic per teamSingle definition, version controlled
FreshnessManual updatesAutomated materialization
DiscoverabilityDepends on Slack/WikiFeature catalog + metadata
GovernanceNoneOwner, lineage, access control

1.3 Reducing Data Pipeline Complexity

Without a Feature Store, each model maintains its own data pipeline:

Model A: raw data -> ETL A -> features A -> training A
Model B: raw data -> ETL B -> features B -> training B
Model C: raw data -> ETL C -> features C -> training C

After Feature Store adoption:

raw data -> Feature Store (centralized) -> Models A, B, C share features

2. Feature Store Architecture

2.1 Core Components

A Feature Store consists of four core components.

Offline Store

  • Stores large volumes of historical feature data
  • Used for training data generation
  • BigQuery, Snowflake, Redshift, Parquet files
  • Supports Point-in-Time Joins

Online Store

  • Low-latency lookup of latest feature values
  • Used for real-time inference
  • Redis, DynamoDB, Bigtable
  • Target P99 latency under 10ms

Feature Registry

  • Manages metadata for all features
  • Name, type, owner, description, tags
  • Data lineage tracking

Transformation Engine

  • Creates features from raw data
  • Supports batch/streaming transformations
  • Spark, Flink, dbt integration

2.2 Data Flow

[Data Sources] -> [Transform Engine] -> [Offline Store] <-> [Online Store]
      ^                                      |                    |
  Raw Data                            Training Pipeline     Inference Service
                                           |
                                    [Feature Registry]
                                    (Metadata Management)

2.3 Feature Store Solution Comparison

SolutionTypeOfflineOnlineStreamingCost
FeastOSSRedshift/BQ/FileRedis/DynamoDBPush-basedFree (infra only)
TectonManagedSpark + DeltaDynamoDBSpark StreamingSubscription
HopsworksOSS/ManagedHudiRonDBFlinkCommunity free
Vertex AI FSGCP ManagedBigQueryBigtableDataflowUsage-based
SageMaker FSAWS ManagedS3 + GlueBuilt-in OnlineKinesisUsage-based

3. Feast Deep Dive

3.1 Installation and Initial Setup

# Install Feast
pip install feast

# Initialize project
feast init my_feature_store
cd my_feature_store

# Project structure
# my_feature_store/
#   feature_store.yaml    # Project config
#   features.py           # Feature definitions
#   data/                 # Sample data

feature_store.yaml configuration:

project: my_ml_project
registry: data/registry.db
provider: local  # local, gcp, aws
online_store:
  type: redis
  connection_string: "localhost:6379"
offline_store:
  type: file  # file, bigquery, redshift, snowflake
entity_key_serialization_version: 2

3.2 Feature Definitions

# features.py
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String

# Entity definition
user = Entity(
    name="user_id",
    join_keys=["user_id"],
    description="Unique user identifier",
)

# Data source definition
user_stats_source = FileSource(
    path="data/user_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

# Feature view definition
user_stats_fv = FeatureView(
    name="user_stats",
    entities=[user],
    ttl=timedelta(days=1),
    schema=[
        Field(name="total_purchases", dtype=Int64),
        Field(name="avg_purchase_amount", dtype=Float32),
        Field(name="days_since_last_login", dtype=Int64),
        Field(name="preferred_category", dtype=String),
    ],
    online=True,
    source=user_stats_source,
    tags={"team": "recommendation", "version": "v2"},
)

3.3 Materialization (Offline to Online)

# Apply feature definitions
feast apply

# Materialize feature values from offline to online store
feast materialize 2024-01-01T00:00:00 2024-12-31T23:59:59

# Incremental materialization (since last run)
feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")

3.4 Training Data Generation (Point-in-Time Join)

Point-in-Time Join is the core capability of a Feature Store. It prevents data leakage while accurately retrieving feature values at specific historical timestamps.

from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path=".")

# Entity dataframe with timestamps
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003, 1001],
    "event_timestamp": pd.to_datetime([
        "2024-09-01", "2024-09-02", "2024-09-03", "2024-10-01"
    ]),
})

# Get features with Point-in-Time Join
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_stats:total_purchases",
        "user_stats:avg_purchase_amount",
        "user_stats:days_since_last_login",
    ],
).to_df()

print(training_df)
# Each row has feature values as of that point in time
# user 1001's features at 2024-09-01 != features at 2024-10-01

3.5 Online Serving

# Online feature retrieval for real-time inference
feature_vector = store.get_online_features(
    features=[
        "user_stats:total_purchases",
        "user_stats:avg_purchase_amount",
        "user_stats:days_since_last_login",
    ],
    entity_rows=[
        {"user_id": 1001},
        {"user_id": 1002},
    ],
).to_dict()

# Result: Returns latest feature values (P99 latency ~5-10ms)

3.6 GCP/AWS Production Configuration

# GCP production setup
project: production_ml
registry: gs://my-bucket/feast-registry/registry.db
provider: gcp
online_store:
  type: datastore  # or bigtable
  project_id: my-gcp-project
offline_store:
  type: bigquery
  project_id: my-gcp-project
  dataset: feast_features
# AWS production setup
project: production_ml
registry: s3://my-bucket/feast-registry/registry.db
provider: aws
online_store:
  type: dynamodb
  region: us-east-1
offline_store:
  type: redshift
  cluster_id: my-redshift-cluster
  region: us-east-1
  database: ml_features
  user: feast_user
  s3_staging_location: s3://my-bucket/feast-staging/

4. Feature Engineering Patterns

4.1 Temporal Features

import pandas as pd

def create_temporal_features(df, timestamp_col, entity_col, value_col):
    """Create time-window-based aggregation features"""
    df = df.sort_values(timestamp_col)

    features = pd.DataFrame()
    features[entity_col] = df[entity_col]
    features[timestamp_col] = df[timestamp_col]

    # Moving averages across different windows
    for window in ['1D', '7D', '30D']:
        features[f'{value_col}_mean_{window}'] = (
            df.groupby(entity_col)[value_col]
            .transform(lambda x: x.rolling(window, on=df[timestamp_col]).mean())
        )

    # Trend feature: 7-day avg / 30-day avg
    features[f'{value_col}_trend_7d_30d'] = (
        features[f'{value_col}_mean_7D'] /
        features[f'{value_col}_mean_30D'].replace(0, float('nan'))
    )

    # Day of week / hour features
    features['day_of_week'] = df[timestamp_col].dt.dayofweek
    features['hour_of_day'] = df[timestamp_col].dt.hour
    features['is_weekend'] = features['day_of_week'].isin([5, 6]).astype(int)

    return features

4.2 Aggregation Features

def create_aggregation_features(events_df, entity_col, group_col):
    """Per-entity aggregation features"""
    agg_features = events_df.groupby(entity_col).agg(
        event_count=pd.NamedAgg(column=group_col, aggfunc='count'),
        unique_categories=pd.NamedAgg(column='category', aggfunc='nunique'),
        total_amount=pd.NamedAgg(column='amount', aggfunc='sum'),
        avg_amount=pd.NamedAgg(column='amount', aggfunc='mean'),
        max_amount=pd.NamedAgg(column='amount', aggfunc='max'),
        std_amount=pd.NamedAgg(column='amount', aggfunc='std'),
    ).reset_index()

    # Ratio features
    agg_features['high_value_ratio'] = (
        events_df[events_df['amount'] > 100]
        .groupby(entity_col)
        .size()
        .reindex(agg_features[entity_col], fill_value=0)
        .values / agg_features['event_count']
    )

    return agg_features

4.3 Embedding Features

from sentence_transformers import SentenceTransformer
import numpy as np

model = SentenceTransformer('all-MiniLM-L6-v2')

def create_text_embedding_features(df, text_col, prefix='emb'):
    """Generate text embedding features"""
    embeddings = model.encode(df[text_col].fillna('').tolist())
    emb_df = pd.DataFrame(
        embeddings,
        columns=[f'{prefix}_{i}' for i in range(embeddings.shape[1])],
        index=df.index,
    )
    return pd.concat([df, emb_df], axis=1)

4.4 Cross Features

def create_cross_features(df):
    """Cross / interaction features between existing features"""
    # Ratio feature
    df['purchase_to_visit_ratio'] = (
        df['purchase_count'] / df['visit_count'].replace(0, 1)
    )

    # Bucketing + cross
    df['age_bucket'] = pd.cut(
        df['age'], bins=[0, 25, 35, 50, 100],
        labels=['young', 'middle', 'senior', 'elder']
    )
    df['age_x_gender'] = df['age_bucket'].astype(str) + '_' + df['gender']

    # Numeric interaction
    df['income_x_age'] = df['income'] * df['age']

    return df

5. MLOps Maturity Levels

Google defines the MLOps maturity model in four levels.

Level 0: Manual Process

Data scientist manually:
1. Collects and preprocesses data
2. Trains model in Jupyter
3. Hands model file to engineer
4. Engineer writes serving code
5. Manual deployment

Problems:
- Deployment cycle: months
- No reproducibility
- No monitoring

Level 1: ML Pipeline Automation

Automated ML Pipeline:
1. Data validation -> Feature engineering -> Training -> Evaluation -> Deployment
2. Pipeline orchestrator (Kubeflow, Airflow)
3. CT (Continuous Training): trigger-based auto-retraining

Improvements:
- Deployment cycle: weekly
- Reproducible pipeline
- Basic monitoring

Level 2: CI/CD for ML

CI/CD Pipeline Integration:
1. Code change -> Auto test -> Pipeline build -> Model training
2. Model validation gates (performance thresholds)
3. Shadow Deployment -> Canary -> Full Rollout
4. Automated A/B testing

Improvements:
- Deployment cycle: daily
- Automatic rollback
- Systematic experiment management

Level 3: Auto-Retraining + Full Automation

Full Automation:
1. Drift detection -> Auto-retraining trigger
2. Auto feature selection / hyperparameter optimization
3. Automatic model comparison + champion/challenger
4. Auto-scaling + cost optimization

Improvements:
- Deployment cycle: hourly
- Unattended operation
- Proactive monitoring

6. ML Pipeline Orchestration

6.1 Kubeflow Pipelines

# Define pipeline with Kubeflow Pipelines DSL
from kfp import dsl, compiler
from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(
    base_image="python:3.11",
    packages_to_install=["pandas", "scikit-learn"],
)
def preprocess_data(
    raw_data: Input[Dataset],
    processed_data: Output[Dataset],
):
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    df = pd.read_parquet(raw_data.path)
    scaler = StandardScaler()
    df_scaled = pd.DataFrame(
        scaler.fit_transform(df.select_dtypes(include='number')),
        columns=df.select_dtypes(include='number').columns,
    )
    df_scaled.to_parquet(processed_data.path)


@dsl.component(
    base_image="python:3.11",
    packages_to_install=["scikit-learn", "pandas", "joblib"],
)
def train_model(
    training_data: Input[Dataset],
    model_artifact: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 100,
    max_depth: int = 10,
):
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import cross_val_score
    import joblib

    df = pd.read_parquet(training_data.path)
    X = df.drop('target', axis=1)
    y = df['target']

    clf = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42,
    )
    scores = cross_val_score(clf, X, y, cv=5, scoring='f1_macro')

    clf.fit(X, y)
    joblib.dump(clf, model_artifact.path)

    metrics.log_metric("f1_mean", float(scores.mean()))
    metrics.log_metric("f1_std", float(scores.std()))


@dsl.pipeline(name="ML Training Pipeline")
def ml_pipeline(n_estimators: int = 100, max_depth: int = 10):
    preprocess_task = preprocess_data(
        raw_data=dsl.importer(
            artifact_uri="gs://my-bucket/raw-data/",
            artifact_class=Dataset,
        ).output,
    )

    train_task = train_model(
        training_data=preprocess_task.outputs["processed_data"],
        n_estimators=n_estimators,
        max_depth=max_depth,
    )


# Compile and run
compiler.Compiler().compile(ml_pipeline, "pipeline.yaml")

6.2 Airflow ML Pipeline

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'ml_training_pipeline',
    default_args=default_args,
    schedule_interval='@weekly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['ml', 'training'],
) as dag:

    validate_data = KubernetesPodOperator(
        task_id='validate_data',
        name='data-validation',
        image='ml-pipeline:latest',
        cmds=['python', 'validate.py'],
        namespace='ml-pipelines',
    )

    extract_features = KubernetesPodOperator(
        task_id='extract_features',
        name='feature-extraction',
        image='ml-pipeline:latest',
        cmds=['python', 'extract_features.py'],
        namespace='ml-pipelines',
    )

    train_model = KubernetesPodOperator(
        task_id='train_model',
        name='model-training',
        image='ml-pipeline:latest',
        cmds=['python', 'train.py'],
        namespace='ml-pipelines',
    )

    evaluate_model = PythonOperator(
        task_id='evaluate_model',
        python_callable=lambda: print("Evaluating model..."),
    )

    validate_data >> extract_features >> train_model >> evaluate_model

6.3 Pipeline Solution Comparison

FeatureKubeflowAirflowVertex AISageMaker
RuntimeKubernetesVariousGCP ManagedAWS Managed
ML-specificHighMediumHighHigh
UI/VisualizationBasicExcellentExcellentExcellent
ScalabilityHighHighHighHigh
Learning curveSteepModerateLowLow
CostInfra onlyInfra onlyUsage-basedUsage-based

7. Experiment Tracking

7.1 MLflow Experiment Tracking

import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import f1_score, precision_score, recall_score

# MLflow server configuration
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("recommendation-model-v2")

# Experiment run
with mlflow.start_run(run_name="gbm-experiment-42") as run:
    # Log hyperparameters
    params = {
        "n_estimators": 200,
        "max_depth": 8,
        "learning_rate": 0.1,
        "subsample": 0.8,
    }
    mlflow.log_params(params)

    # Train model
    model = GradientBoostingClassifier(**params)
    model.fit(X_train, y_train)

    # Log metrics
    y_pred = model.predict(X_test)
    metrics = {
        "f1": f1_score(y_test, y_pred, average='macro'),
        "precision": precision_score(y_test, y_pred, average='macro'),
        "recall": recall_score(y_test, y_pred, average='macro'),
    }
    mlflow.log_metrics(metrics)

    # Log model artifact
    mlflow.sklearn.log_model(
        model,
        artifact_path="model",
        registered_model_name="recommendation-gbm",
    )

    mlflow.log_artifact("feature_importance.png")

    print(f"Run ID: {run.info.run_id}")
    print(f"Metrics: {metrics}")

7.2 Weights and Biases Integration

import wandb

wandb.init(
    project="recommendation-model",
    config={
        "architecture": "GBM",
        "n_estimators": 200,
        "learning_rate": 0.1,
    },
)

# Log metrics during training loop
for epoch in range(100):
    train_loss = train_one_epoch(model, train_loader)
    val_loss, val_f1 = evaluate(model, val_loader)

    wandb.log({
        "epoch": epoch,
        "train_loss": train_loss,
        "val_loss": val_loss,
        "val_f1": val_f1,
    })

# Save model artifact
artifact = wandb.Artifact('model-weights', type='model')
artifact.add_file('model.pt')
wandb.log_artifact(artifact)

wandb.finish()

8. Model Registry

8.1 MLflow Model Registry

from mlflow.tracking import MlflowClient

client = MlflowClient()

# Create model version
model_version = client.create_model_version(
    name="recommendation-gbm",
    source=f"runs:/{run_id}/model",
    run_id=run_id,
    description="GBM model v3: added new features",
)

# Stage transition: None -> Staging
client.transition_model_version_stage(
    name="recommendation-gbm",
    version=model_version.version,
    stage="Staging",
    archive_existing_versions=False,
)

# Promote to Production after staging validation
client.transition_model_version_stage(
    name="recommendation-gbm",
    version=model_version.version,
    stage="Production",
    archive_existing_versions=True,  # Archive existing production version
)

8.2 Model Versioning Strategy

Model Lifecycle:
None -> Staging -> Production -> Archived

Versioning Policy:
- Staging: Upon passing automated performance tests
- Production: Manual approval or A/B test pass
- Archived: Automatically when new version is promoted
- Keep 3 most recent versions, delete older ones
# Load production model
import mlflow.pyfunc

model = mlflow.pyfunc.load_model(
    model_uri="models:/recommendation-gbm/Production"
)
predictions = model.predict(input_df)

9. Model Serving

9.1 BentoML

# service.py
import bentoml
import numpy as np
from bentoml.io import NumpyNdarray, JSON

# Load saved model
runner = bentoml.sklearn.get("recommendation_model:latest").to_runner()

svc = bentoml.Service("recommendation_service", runners=[runner])

@svc.api(input=JSON(), output=JSON())
async def predict(input_data: dict) -> dict:
    features = np.array(input_data["features"]).reshape(1, -1)
    prediction = await runner.predict.async_run(features)
    return {
        "prediction": int(prediction[0]),
        "model_version": "v3.2.1",
    }
# bentofile.yaml
service: "service:svc"
include:
  - "*.py"
python:
  packages:
    - scikit-learn
    - numpy
docker:
  python_version: "3.11"
# Build and deploy
bentoml build
bentoml containerize recommendation_service:latest
docker run -p 3000:3000 recommendation_service:latest

9.2 Seldon Core (Kubernetes)

apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: recommendation-model
  namespace: ml-serving
spec:
  predictors:
    - name: default
      replicas: 3
      graph:
        name: classifier
        implementation: SKLEARN_SERVER
        modelUri: s3://models/recommendation/v3
        envSecretRefName: s3-credentials
      componentSpecs:
        - spec:
            containers:
              - name: classifier
                resources:
                  requests:
                    cpu: "1"
                    memory: "2Gi"
                  limits:
                    cpu: "2"
                    memory: "4Gi"
      traffic: 100
      labels:
        version: v3

9.3 TensorFlow Serving

# Run TFServing Docker
docker run -p 8501:8501 \
  --mount type=bind,source=/models/recommendation,target=/models/recommendation \
  -e MODEL_NAME=recommendation \
  tensorflow/serving:latest
# gRPC client
import grpc
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2, prediction_service_pb2_grpc

channel = grpc.insecure_channel('localhost:8500')
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)

request = predict_pb2.PredictRequest()
request.model_spec.name = 'recommendation'
request.model_spec.signature_name = 'serving_default'
request.inputs['input'].CopyFrom(
    tf.make_tensor_proto(input_data, shape=[1, 128])
)

response = stub.Predict(request, timeout=5.0)

9.4 vLLM (LLM Serving)

# Start vLLM server
# python -m vllm.entrypoints.openai.api_server \
#   --model meta-llama/Llama-3-8B-Instruct \
#   --tensor-parallel-size 2 \
#   --max-model-len 8192

# Client call
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:8000/v1",
    api_key="not-needed",
)

response = client.chat.completions.create(
    model="meta-llama/Llama-3-8B-Instruct",
    messages=[
        {"role": "user", "content": "What are the benefits of recommendation systems?"},
    ],
    temperature=0.7,
    max_tokens=512,
)

9.5 Serving Solution Comparison

SolutionModel TypesDeployment EnvBatch InferenceGPU SupportAuto-scaling
BentoMLUniversalDocker/K8sSupportedSupportedSupported
Seldon CoreUniversalK8s onlySupportedSupportedHPA
TFServingTF modelsDocker/K8sSupportedSupportedManual
TritonMulti-frameworkDocker/K8sSupportedOptimizedSupported
vLLMLLMDocker/K8sNot supportedRequiredSupported

10. Model Monitoring

10.1 Data Drift Detection

from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset

# Reference data (from training time)
reference_data = pd.read_parquet("training_data.parquet")
# Current data (production)
current_data = pd.read_parquet("production_data_latest.parquet")

column_mapping = ColumnMapping(
    target='label',
    numerical_features=['age', 'income', 'purchase_count'],
    categorical_features=['category', 'region'],
)

# Generate drift report
report = Report(metrics=[
    DataDriftPreset(),
    TargetDriftPreset(),
])

report.run(
    reference_data=reference_data,
    current_data=current_data,
    column_mapping=column_mapping,
)

# Save HTML report
report.save_html("drift_report.html")

# Programmatic access
result = report.as_dict()
dataset_drift = result['metrics'][0]['result']['dataset_drift']
drift_share = result['metrics'][0]['result']['share_of_drifted_columns']

if dataset_drift:
    print(f"Drift detected! Drifted feature share: {drift_share:.2%}")
    trigger_retraining_pipeline()

10.2 Prediction Drift Detection

from evidently.test_suite import TestSuite
from evidently.tests import (
    TestColumnDrift,
    TestShareOfDriftedColumns,
    TestMeanInNSigmas,
)

test_suite = TestSuite(tests=[
    TestShareOfDriftedColumns(lt=0.3),  # Less than 30% drifted features
    TestColumnDrift(column_name="prediction_score"),
    TestMeanInNSigmas(column_name="prediction_score", n=2),
])

test_suite.run(
    reference_data=reference_data,
    current_data=current_data,
    column_mapping=column_mapping,
)

test_results = test_suite.as_dict()
all_passed = all(
    t['status'] == 'SUCCESS' for t in test_results['tests']
)

if not all_passed:
    alert_team("Model drift detected - retraining review required")

10.3 WhyLabs Integration

import whylogs as why
from whylogs.api.writer.whylabs import WhyLabsWriter

# Create profile
results = why.log(current_data)
profile = results.profile()

# Send to WhyLabs
writer = WhyLabsWriter()
writer.write(profile)

11. A/B Testing for ML Models

11.1 Traffic Splitting

# Traffic splitting with Istio VirtualService
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: recommendation-ab-test
spec:
  hosts:
    - recommendation-service
  http:
    - match:
        - headers:
            x-experiment-group:
              exact: "treatment"
      route:
        - destination:
            host: recommendation-service
            subset: v2-challenger
          weight: 100
    - route:
        - destination:
            host: recommendation-service
            subset: v1-champion
          weight: 80
        - destination:
            host: recommendation-service
            subset: v2-challenger
          weight: 20

11.2 Statistical Significance Testing

from scipy import stats
import numpy as np

def ab_test_significance(
    control_conversions, control_total,
    treatment_conversions, treatment_total,
    alpha=0.05,
):
    """A/B test statistical significance verification"""
    control_rate = control_conversions / control_total
    treatment_rate = treatment_conversions / treatment_total

    # Z-test for proportions
    pooled_rate = (
        (control_conversions + treatment_conversions) /
        (control_total + treatment_total)
    )
    se = np.sqrt(
        pooled_rate * (1 - pooled_rate) *
        (1/control_total + 1/treatment_total)
    )
    z_stat = (treatment_rate - control_rate) / se
    p_value = 2 * (1 - stats.norm.cdf(abs(z_stat)))

    lift = (treatment_rate - control_rate) / control_rate

    return {
        "control_rate": control_rate,
        "treatment_rate": treatment_rate,
        "lift": f"{lift:.2%}",
        "p_value": p_value,
        "significant": p_value < alpha,
        "recommendation": (
            "Promote challenger model" if p_value < alpha and lift > 0
            else "Keep current model"
        ),
    }

11.3 Multi-Armed Bandit

import numpy as np

class ThompsonSampling:
    """Adaptive traffic splitting using Thompson Sampling"""

    def __init__(self, n_arms):
        self.n_arms = n_arms
        self.successes = np.ones(n_arms)  # Beta prior alpha
        self.failures = np.ones(n_arms)   # Beta prior beta

    def select_arm(self):
        """Sample from Beta distribution to select optimal arm"""
        samples = [
            np.random.beta(self.successes[i], self.failures[i])
            for i in range(self.n_arms)
        ]
        return int(np.argmax(samples))

    def update(self, arm, reward):
        """Update results"""
        if reward:
            self.successes[arm] += 1
        else:
            self.failures[arm] += 1

    def get_allocation(self):
        """Current traffic allocation ratio"""
        total = self.successes + self.failures
        rates = self.successes / total
        return rates / rates.sum()


# Usage example
bandit = ThompsonSampling(n_arms=3)  # 3 model versions

for request in incoming_requests:
    arm = bandit.select_arm()
    prediction = models[arm].predict(request)
    reward = get_conversion(request, prediction)
    bandit.update(arm, reward)

12. CI/CD for ML

12.1 Model Validation Gates

# model_validation.py
import mlflow
import json

def validate_model(
    model_uri: str,
    test_data_path: str,
    min_f1: float = 0.85,
    max_latency_ms: float = 50.0,
    max_model_size_mb: float = 500.0,
):
    """Model deployment validation gates"""
    results = {"passed": True, "checks": []}

    # 1. Performance validation
    model = mlflow.pyfunc.load_model(model_uri)
    test_data = pd.read_parquet(test_data_path)
    predictions = model.predict(test_data.drop('target', axis=1))
    f1 = f1_score(test_data['target'], predictions, average='macro')

    results["checks"].append({
        "name": "performance",
        "metric": "f1_score",
        "value": f1,
        "threshold": min_f1,
        "passed": f1 >= min_f1,
    })

    # 2. Latency validation
    import time
    latencies = []
    for i in range(100):
        start = time.time()
        model.predict(test_data.iloc[[i]])
        latencies.append((time.time() - start) * 1000)

    p99_latency = np.percentile(latencies, 99)
    results["checks"].append({
        "name": "latency",
        "metric": "p99_ms",
        "value": p99_latency,
        "threshold": max_latency_ms,
        "passed": p99_latency <= max_latency_ms,
    })

    # 3. Model size validation
    import os
    model_size_mb = os.path.getsize(model_uri) / (1024 * 1024)
    results["checks"].append({
        "name": "model_size",
        "metric": "size_mb",
        "value": model_size_mb,
        "threshold": max_model_size_mb,
        "passed": model_size_mb <= max_model_size_mb,
    })

    results["passed"] = all(c["passed"] for c in results["checks"])
    return results

12.2 GitHub Actions ML CI/CD

name: ML CI/CD Pipeline
on:
  push:
    branches: [main]
    paths:
      - 'models/**'
      - 'features/**'

jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Validate Data Schema
        run: python scripts/validate_data.py

  train-and-evaluate:
    needs: data-validation
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v4
      - name: Train Model
        run: python train.py --config configs/production.yaml
      - name: Evaluate Model
        run: python evaluate.py --model-path outputs/model

  model-validation:
    needs: train-and-evaluate
    runs-on: ubuntu-latest
    steps:
      - name: Run Validation Gates
        run: python model_validation.py

  deploy-staging:
    needs: model-validation
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Staging
        run: |
          kubectl apply -f k8s/staging/
          kubectl rollout status deployment/model-serving -n staging

  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - name: Canary Rollout
        run: |
          kubectl apply -f k8s/production/canary-10.yaml
          sleep 300
          python check_canary_metrics.py
          kubectl apply -f k8s/production/full-rollout.yaml

13. Cost Optimization

13.1 Batch vs Real-Time Inference

# Batch inference: cost-efficient (bulk processing)
def batch_inference(model, data_path, output_path):
    """Batch inference using Spark"""
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("batch-inference").getOrCreate()
    df = spark.read.parquet(data_path)

    predict_udf = spark.udf.register(
        "predict",
        lambda features: float(model.predict([features])[0]),
    )
    result = df.withColumn("prediction", predict_udf(df["features"]))
    result.write.parquet(output_path)

13.2 Prediction Caching

import redis
import hashlib
import json

class PredictionCache:
    def __init__(self, redis_url="redis://localhost:6379", ttl=3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl
        self.hit_count = 0
        self.miss_count = 0

    def _make_key(self, features: dict) -> str:
        feature_str = json.dumps(features, sort_keys=True)
        return f"pred:{hashlib.md5(feature_str.encode()).hexdigest()}"

    def get_or_predict(self, features: dict, model) -> dict:
        key = self._make_key(features)
        cached = self.redis.get(key)

        if cached:
            self.hit_count += 1
            return json.loads(cached)

        self.miss_count += 1
        prediction = model.predict(features)
        self.redis.setex(key, self.ttl, json.dumps(prediction))
        return prediction

    @property
    def hit_rate(self):
        total = self.hit_count + self.miss_count
        return self.hit_count / total if total > 0 else 0

13.3 Model Compression

# Quantization
import torch

model = torch.load("model.pt")
quantized_model = torch.quantization.quantize_dynamic(
    model,
    {torch.nn.Linear},
    dtype=torch.qint8,
)

# Size comparison
original_size = os.path.getsize("model.pt") / (1024 * 1024)
torch.save(quantized_model.state_dict(), "model_quantized.pt")
quantized_size = os.path.getsize("model_quantized.pt") / (1024 * 1024)

print(f"Original: {original_size:.1f}MB -> Quantized: {quantized_size:.1f}MB")
print(f"Compression ratio: {(1 - quantized_size/original_size):.1%}")

13.4 Cost Optimization Checklist

StrategyExpected SavingsTrade-off
Switch to batch inference50-80%Lose real-time capability
Prediction caching30-60%Cache freshness
Model quantization40-60%Slight accuracy loss
Spot/Preemptible instances60-90%Availability risk
Auto-scaling20-40%Cold start
Model distillation50-70%Development cost

14. Quiz

Q1: What is the core value of a Feature Store?

A: The core value is preventing Training-Serving Skew. By sharing a single feature definition and transformation logic between training and serving, it fundamentally eliminates data inconsistencies. Additional values include feature reuse, team collaboration, and feature governance.

Q2: Why is Point-in-Time Join necessary?

A: Point-in-Time Join prevents Data Leakage. When generating training data, you must join only the feature values that were actually available at the time of each event. If future data is included, the model shows unrealistically high performance during training but degrades significantly in production.

Q3: What is the key element for moving from MLOps Level 2 to Level 3?

A: Moving from Level 2 (CI/CD for ML) to Level 3 (auto-retraining) requires an automatic drift detection and retraining trigger system. This monitors data drift and prediction drift in real-time, automatically triggers the retraining pipeline when thresholds are exceeded, and auto-promotes through champion/challenger comparison.

Q4: What is the main difference between BentoML and Seldon Core?

A: BentoML is a framework-agnostic model packaging tool that creates Docker containers deployable anywhere. Seldon Core is a Kubernetes-native serving platform that deeply integrates with the K8s ecosystem through CRD-based deployment, A/B testing, canary deployment, and explainability (Explainer).

Q5: What are 3 main methods for detecting model drift?

A: (1) Data Drift: Detect changes in input feature distributions using KS test, PSI (Population Stability Index), etc. (2) Prediction Drift: Monitor changes in model output distributions. (3) Performance Drift: Track degradation in accuracy, F1, etc. by comparing with actual labels. Evidently AI and WhyLabs are representative tools.


15. References

  1. Feast Official Documentation - https://docs.feast.dev/
  2. MLflow Official Documentation - https://mlflow.org/docs/latest/index.html
  3. Google MLOps Maturity Model - https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning
  4. Evidently AI Documentation - https://docs.evidentlyai.com/
  5. BentoML Official Documentation - https://docs.bentoml.com/
  6. Seldon Core Documentation - https://docs.seldon.io/
  7. Kubeflow Pipelines - https://www.kubeflow.org/docs/components/pipelines/
  8. WhyLabs Documentation - https://docs.whylabs.ai/
  9. vLLM Project - https://docs.vllm.ai/
  10. Weights and Biases - https://docs.wandb.ai/
  11. Tecton Feature Store - https://docs.tecton.ai/
  12. Hopsworks Feature Store - https://docs.hopsworks.ai/
  13. NVIDIA Triton Inference Server - https://docs.nvidia.com/deeplearning/triton-inference-server/

현재 단락 (1/837)

The most frequent source of failures in ML model deployment is **feature inconsistency between train...

작성 글자: 0원문 글자: 29,680작성 단락: 0/837