- Published on
Feature Store & MLOps 파이프라인 완전 가이드 2025: Feast, Feature Engineering, 모델 서빙
- Authors

- Name
- Youngju Kim
- @fjvbn20031
목차
1. 왜 Feature Store인가?
1.1 Training-Serving Skew 문제
ML 모델 개발에서 가장 빈번한 장애 원인은 학습 시점과 서빙 시점의 피처 불일치입니다.
데이터 과학자가 Jupyter Notebook에서 Pandas로 피처를 생성하고, 엔지니어가 Java/Go로 동일한 로직을 재구현할 때 미묘한 차이가 발생합니다.
# 학습 시: Pandas로 피처 생성
df['avg_purchase_7d'] = df.groupby('user_id')['amount'].transform(
lambda x: x.rolling('7D').mean()
)
# 서빙 시: SQL 또는 다른 언어로 재구현 → 미묘한 차이 발생
# - 경계 조건(inclusive/exclusive) 차이
# - 타임존 처리 차이
# - NULL 처리 방식 차이
Feature Store는 단일 피처 정의를 학습과 서빙 양쪽에서 공유하여 이 문제를 근본적으로 해결합니다.
1.2 피처 재사용과 팀 협업
대규모 ML 조직에서는 수십 개 팀이 유사한 피처를 독립적으로 생성합니다.
| 문제 | Feature Store 도입 전 | Feature Store 도입 후 |
|---|---|---|
| 피처 중복 | 팀마다 유사 피처 재생성 | 중앙 레지스트리에서 검색/재사용 |
| 일관성 | 팀마다 다른 계산 로직 | 단일 정의, 버전 관리 |
| 신선도 | 수동 업데이트 | 자동 materialization |
| 발견 가능성 | Slack/Wiki 의존 | 피처 카탈로그 + 메타데이터 |
| 거버넌스 | 없음 | 소유자, 라인리지, 접근 제어 |
1.3 데이터 파이프라인 복잡성 감소
Feature Store 없이는 각 모델이 자체 데이터 파이프라인을 유지합니다.
모델 A: raw data → ETL A → features A → training A
모델 B: raw data → ETL B → features B → training B
모델 C: raw data → ETL C → features C → training C
Feature Store 도입 후:
raw data → Feature Store (중앙 집중) → 모델 A, B, C 공유
2. Feature Store 아키텍처
2.1 핵심 구성 요소
Feature Store는 4가지 핵심 구성 요소로 이루어집니다.
오프라인 스토어 (Offline Store)
- 대량의 히스토리컬 피처 데이터 저장
- 학습 데이터 생성에 사용
- BigQuery, Snowflake, Redshift, Parquet 파일
- Point-in-Time Join 지원
온라인 스토어 (Online Store)
- 최신 피처 값의 저속 지연 조회
- 실시간 추론에 사용
- Redis, DynamoDB, Bigtable
- P99 레이턴시 10ms 이하 목표
피처 레지스트리 (Feature Registry)
- 모든 피처의 메타데이터 관리
- 이름, 타입, 소유자, 설명, 태그
- 데이터 라인리지 추적
변환 엔진 (Transformation Engine)
- 원시 데이터에서 피처 생성
- 배치/스트리밍 변환 지원
- Spark, Flink, dbt 통합
2.2 데이터 흐름
[데이터 소스] → [변환 엔진] → [오프라인 스토어] ←→ [온라인 스토어]
↑ ↓ ↓
원시 데이터 학습 파이프라인 추론 서비스
↓
[피처 레지스트리]
(메타데이터 관리)
2.3 주요 Feature Store 솔루션 비교
| 솔루션 | 유형 | 오프라인 | 온라인 | 스트리밍 | 비용 |
|---|---|---|---|---|---|
| Feast | OSS | Redshift/BQ/File | Redis/DynamoDB | Push 기반 | 무료(인프라 비용만) |
| Tecton | 관리형 | Spark + Delta | DynamoDB | Spark Streaming | 구독형 |
| Hopsworks | OSS/관리형 | Hudi | RonDB | Flink | 커뮤니티 무료 |
| Vertex AI FS | GCP 관리형 | BigQuery | Bigtable | Dataflow | 사용량 기반 |
| SageMaker FS | AWS 관리형 | S3 + Glue | 내장 온라인 | Kinesis | 사용량 기반 |
3. Feast Deep Dive
3.1 Feast 설치 및 초기 설정
# Feast 설치
pip install feast
# 프로젝트 초기화
feast init my_feature_store
cd my_feature_store
# 프로젝트 구조
# my_feature_store/
# feature_store.yaml # 프로젝트 설정
# features.py # 피처 정의
# data/ # 샘플 데이터
feature_store.yaml 설정:
project: my_ml_project
registry: data/registry.db
provider: local # local, gcp, aws
online_store:
type: redis
connection_string: "localhost:6379"
offline_store:
type: file # file, bigquery, redshift, snowflake
entity_key_serialization_version: 2
3.2 피처 정의
# features.py
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String
# 엔티티 정의
user = Entity(
name="user_id",
join_keys=["user_id"],
description="사용자 고유 ID",
)
# 데이터 소스 정의
user_stats_source = FileSource(
path="data/user_stats.parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created_timestamp",
)
# 피처 뷰 정의
user_stats_fv = FeatureView(
name="user_stats",
entities=[user],
ttl=timedelta(days=1),
schema=[
Field(name="total_purchases", dtype=Int64),
Field(name="avg_purchase_amount", dtype=Float32),
Field(name="days_since_last_login", dtype=Int64),
Field(name="preferred_category", dtype=String),
],
online=True,
source=user_stats_source,
tags={"team": "recommendation", "version": "v2"},
)
3.3 Materialization (오프라인 to 온라인)
# 피처 정의 적용
feast apply
# 오프라인 → 온라인 스토어로 피처 값 물리화
feast materialize 2024-01-01T00:00:00 2024-12-31T23:59:59
# 증분 물리화 (마지막 이후만)
feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")
3.4 학습 데이터 생성 (Point-in-Time Join)
Point-in-Time Join은 Feature Store의 핵심 기능입니다. 데이터 누수(Data Leakage)를 방지하면서 과거 특정 시점의 피처 값을 정확히 가져옵니다.
from feast import FeatureStore
import pandas as pd
store = FeatureStore(repo_path=".")
# 학습 데이터의 엔티티 + 타임스탬프
entity_df = pd.DataFrame({
"user_id": [1001, 1002, 1003, 1001],
"event_timestamp": pd.to_datetime([
"2024-09-01", "2024-09-02", "2024-09-03", "2024-10-01"
]),
})
# Point-in-Time Join으로 피처 가져오기
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"user_stats:total_purchases",
"user_stats:avg_purchase_amount",
"user_stats:days_since_last_login",
],
).to_df()
print(training_df)
# 각 행은 해당 시점 기준의 피처 값을 가짐
# → 2024-09-01 시점의 user 1001 피처 != 2024-10-01 시점의 user 1001 피처
3.5 온라인 서빙
# 실시간 추론을 위한 온라인 피처 조회
feature_vector = store.get_online_features(
features=[
"user_stats:total_purchases",
"user_stats:avg_purchase_amount",
"user_stats:days_since_last_login",
],
entity_rows=[
{"user_id": 1001},
{"user_id": 1002},
],
).to_dict()
# 결과: 가장 최신 피처 값 반환 (P99 레이턴시 약 5-10ms)
3.6 GCP/AWS 프로덕션 설정
# GCP 프로덕션 설정
project: production_ml
registry: gs://my-bucket/feast-registry/registry.db
provider: gcp
online_store:
type: datastore # 또는 bigtable
project_id: my-gcp-project
offline_store:
type: bigquery
project_id: my-gcp-project
dataset: feast_features
# AWS 프로덕션 설정
project: production_ml
registry: s3://my-bucket/feast-registry/registry.db
provider: aws
online_store:
type: dynamodb
region: ap-northeast-2
offline_store:
type: redshift
cluster_id: my-redshift-cluster
region: ap-northeast-2
database: ml_features
user: feast_user
s3_staging_location: s3://my-bucket/feast-staging/
4. Feature Engineering 패턴
4.1 시간 기반 피처 (Temporal Features)
import pandas as pd
def create_temporal_features(df, timestamp_col, entity_col, value_col):
"""시간 윈도우 기반 집계 피처 생성"""
df = df.sort_values(timestamp_col)
features = pd.DataFrame()
features[entity_col] = df[entity_col]
features[timestamp_col] = df[timestamp_col]
# 다양한 윈도우의 이동 평균
for window in ['1D', '7D', '30D']:
features[f'{value_col}_mean_{window}'] = (
df.groupby(entity_col)[value_col]
.transform(lambda x: x.rolling(window, on=df[timestamp_col]).mean())
)
# 추세 피처: 7일 평균 / 30일 평균
features[f'{value_col}_trend_7d_30d'] = (
features[f'{value_col}_mean_7D'] /
features[f'{value_col}_mean_30D'].replace(0, float('nan'))
)
# 요일/시간 피처
features['day_of_week'] = df[timestamp_col].dt.dayofweek
features['hour_of_day'] = df[timestamp_col].dt.hour
features['is_weekend'] = features['day_of_week'].isin([5, 6]).astype(int)
return features
4.2 집계 피처 (Aggregation Features)
def create_aggregation_features(events_df, entity_col, group_col):
"""엔티티별 집계 피처"""
agg_features = events_df.groupby(entity_col).agg(
event_count=pd.NamedAgg(column=group_col, aggfunc='count'),
unique_categories=pd.NamedAgg(column='category', aggfunc='nunique'),
total_amount=pd.NamedAgg(column='amount', aggfunc='sum'),
avg_amount=pd.NamedAgg(column='amount', aggfunc='mean'),
max_amount=pd.NamedAgg(column='amount', aggfunc='max'),
std_amount=pd.NamedAgg(column='amount', aggfunc='std'),
).reset_index()
# 비율 피처
agg_features['high_value_ratio'] = (
events_df[events_df['amount'] > 100]
.groupby(entity_col)
.size()
.reindex(agg_features[entity_col], fill_value=0)
.values / agg_features['event_count']
)
return agg_features
4.3 임베딩 피처
from sentence_transformers import SentenceTransformer
import numpy as np
model = SentenceTransformer('all-MiniLM-L6-v2')
def create_text_embedding_features(df, text_col, prefix='emb'):
"""텍스트 임베딩 피처 생성"""
embeddings = model.encode(df[text_col].fillna('').tolist())
emb_df = pd.DataFrame(
embeddings,
columns=[f'{prefix}_{i}' for i in range(embeddings.shape[1])],
index=df.index,
)
return pd.concat([df, emb_df], axis=1)
4.4 교차 피처 (Cross Features)
def create_cross_features(df):
"""피처 간 교차/상호작용 피처"""
# 비율 피처
df['purchase_to_visit_ratio'] = (
df['purchase_count'] / df['visit_count'].replace(0, 1)
)
# 구간화 + 교차
df['age_bucket'] = pd.cut(
df['age'], bins=[0, 25, 35, 50, 100],
labels=['young', 'middle', 'senior', 'elder']
)
df['age_x_gender'] = df['age_bucket'].astype(str) + '_' + df['gender']
# 수치 상호작용
df['income_x_age'] = df['income'] * df['age']
return df
5. MLOps 성숙도 레벨
Google에서 정의한 MLOps 성숙도 모델은 4단계로 나뉩니다.
Level 0: 수동 프로세스
데이터 과학자가 수동으로:
1. 데이터 수집 및 전처리
2. Jupyter에서 모델 학습
3. 모델 파일을 엔지니어에게 전달
4. 엔지니어가 서빙 코드 작성
5. 수동 배포
문제점:
- 배포 주기: 수개월
- 재현성 없음
- 모니터링 없음
Level 1: ML 파이프라인 자동화
자동화된 ML 파이프라인:
1. 데이터 검증 → 피처 엔지니어링 → 학습 → 평가 → 배포
2. 파이프라인 오케스트레이터 사용 (Kubeflow, Airflow)
3. CT (Continuous Training): 트리거 기반 자동 재학습
개선점:
- 배포 주기: 주 단위
- 파이프라인 재현 가능
- 기본 모니터링
Level 2: CI/CD for ML
CI/CD 파이프라인 통합:
1. 코드 변경 → 자동 테스트 → 파이프라인 빌드 → 모델 학습
2. 모델 검증 게이트 (성능 임계값)
3. Shadow Deployment → Canary → Full Rollout
4. A/B 테스트 자동화
개선점:
- 배포 주기: 일 단위
- 자동 롤백
- 체계적 실험 관리
Level 3: 자동 재학습 + 완전 자동화
완전 자동화:
1. 드리프트 감지 → 자동 재학습 트리거
2. 자동 피처 선택/하이퍼파라미터 최적화
3. 모델 성능 자동 비교 + 챔피언/챌린저
4. 자동 스케일링 + 비용 최적화
개선점:
- 배포 주기: 시간 단위
- 무인 운영
- 프로액티브 모니터링
6. ML 파이프라인 오케스트레이션
6.1 Kubeflow Pipelines
# Kubeflow Pipelines DSL로 파이프라인 정의
from kfp import dsl, compiler
from kfp.dsl import Input, Output, Dataset, Model, Metrics
@dsl.component(
base_image="python:3.11",
packages_to_install=["pandas", "scikit-learn"],
)
def preprocess_data(
raw_data: Input[Dataset],
processed_data: Output[Dataset],
):
import pandas as pd
from sklearn.preprocessing import StandardScaler
df = pd.read_parquet(raw_data.path)
scaler = StandardScaler()
df_scaled = pd.DataFrame(
scaler.fit_transform(df.select_dtypes(include='number')),
columns=df.select_dtypes(include='number').columns,
)
df_scaled.to_parquet(processed_data.path)
@dsl.component(
base_image="python:3.11",
packages_to_install=["scikit-learn", "pandas", "joblib"],
)
def train_model(
training_data: Input[Dataset],
model_artifact: Output[Model],
metrics: Output[Metrics],
n_estimators: int = 100,
max_depth: int = 10,
):
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
import joblib
df = pd.read_parquet(training_data.path)
X = df.drop('target', axis=1)
y = df['target']
clf = RandomForestClassifier(
n_estimators=n_estimators,
max_depth=max_depth,
random_state=42,
)
scores = cross_val_score(clf, X, y, cv=5, scoring='f1_macro')
clf.fit(X, y)
joblib.dump(clf, model_artifact.path)
metrics.log_metric("f1_mean", float(scores.mean()))
metrics.log_metric("f1_std", float(scores.std()))
@dsl.pipeline(name="ML Training Pipeline")
def ml_pipeline(n_estimators: int = 100, max_depth: int = 10):
preprocess_task = preprocess_data(
raw_data=dsl.importer(
artifact_uri="gs://my-bucket/raw-data/",
artifact_class=Dataset,
).output,
)
train_task = train_model(
training_data=preprocess_task.outputs["processed_data"],
n_estimators=n_estimators,
max_depth=max_depth,
)
# 컴파일 및 실행
compiler.Compiler().compile(ml_pipeline, "pipeline.yaml")
6.2 Airflow ML 파이프라인
# Airflow DAG으로 ML 파이프라인 정의
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'ml-team',
'depends_on_past': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'ml_training_pipeline',
default_args=default_args,
schedule_interval='@weekly',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['ml', 'training'],
) as dag:
validate_data = KubernetesPodOperator(
task_id='validate_data',
name='data-validation',
image='ml-pipeline:latest',
cmds=['python', 'validate.py'],
namespace='ml-pipelines',
)
extract_features = KubernetesPodOperator(
task_id='extract_features',
name='feature-extraction',
image='ml-pipeline:latest',
cmds=['python', 'extract_features.py'],
namespace='ml-pipelines',
)
train_model = KubernetesPodOperator(
task_id='train_model',
name='model-training',
image='ml-pipeline:latest',
cmds=['python', 'train.py'],
namespace='ml-pipelines',
resources={
'request_memory': '8Gi',
'request_cpu': '4',
'limit_gpu': '1',
},
)
evaluate_model = PythonOperator(
task_id='evaluate_model',
python_callable=lambda: print("Evaluating model..."),
)
validate_data >> extract_features >> train_model >> evaluate_model
6.3 파이프라인 솔루션 비교
| 기능 | Kubeflow | Airflow | Vertex AI | SageMaker |
|---|---|---|---|---|
| 실행 환경 | Kubernetes | 다양 | GCP 관리형 | AWS 관리형 |
| ML 특화 | 높음 | 중간 | 높음 | 높음 |
| UI/시각화 | 기본 | 우수 | 우수 | 우수 |
| 확장성 | 높음 | 높음 | 높음 | 높음 |
| 학습 곡선 | 높음 | 중간 | 낮음 | 낮음 |
| 비용 | 인프라만 | 인프라만 | 사용량 | 사용량 |
7. 실험 추적 (Experiment Tracking)
7.1 MLflow 실험 추적
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, precision_score, recall_score
# MLflow 서버 설정
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("recommendation-model-v2")
# 실험 실행
with mlflow.start_run(run_name="gbm-experiment-42") as run:
# 하이퍼파라미터 로깅
params = {
"n_estimators": 200,
"max_depth": 8,
"learning_rate": 0.1,
"subsample": 0.8,
}
mlflow.log_params(params)
# 모델 학습
model = GradientBoostingClassifier(**params)
model.fit(X_train, y_train)
# 메트릭 로깅
y_pred = model.predict(X_test)
metrics = {
"f1": f1_score(y_test, y_pred, average='macro'),
"precision": precision_score(y_test, y_pred, average='macro'),
"recall": recall_score(y_test, y_pred, average='macro'),
}
mlflow.log_metrics(metrics)
# 모델 아티팩트 로깅
mlflow.sklearn.log_model(
model,
artifact_path="model",
registered_model_name="recommendation-gbm",
)
# 커스텀 아티팩트 (피처 중요도 차트 등)
mlflow.log_artifact("feature_importance.png")
print(f"Run ID: {run.info.run_id}")
print(f"Metrics: {metrics}")
7.2 Weights and Biases 통합
import wandb
wandb.init(
project="recommendation-model",
config={
"architecture": "GBM",
"n_estimators": 200,
"learning_rate": 0.1,
},
)
# 학습 루프에서 메트릭 로깅
for epoch in range(100):
train_loss = train_one_epoch(model, train_loader)
val_loss, val_f1 = evaluate(model, val_loader)
wandb.log({
"epoch": epoch,
"train_loss": train_loss,
"val_loss": val_loss,
"val_f1": val_f1,
})
# 모델 아티팩트 저장
artifact = wandb.Artifact('model-weights', type='model')
artifact.add_file('model.pt')
wandb.log_artifact(artifact)
wandb.finish()
8. 모델 레지스트리 (Model Registry)
8.1 MLflow Model Registry
from mlflow.tracking import MlflowClient
client = MlflowClient()
# 모델 버전 생성
model_version = client.create_model_version(
name="recommendation-gbm",
source=f"runs:/{run_id}/model",
run_id=run_id,
description="GBM 모델 v3: 신규 피처 추가",
)
# 스테이지 전환: None → Staging
client.transition_model_version_stage(
name="recommendation-gbm",
version=model_version.version,
stage="Staging",
archive_existing_versions=False,
)
# 스테이징 검증 후 프로덕션 승격
client.transition_model_version_stage(
name="recommendation-gbm",
version=model_version.version,
stage="Production",
archive_existing_versions=True, # 기존 프로덕션 버전 아카이브
)
8.2 모델 버전 관리 전략
모델 라이프사이클:
None → Staging → Production → Archived
버전 관리 정책:
- Staging: 자동 성능 테스트 통과 시
- Production: 수동 승인 또는 A/B 테스트 통과 시
- Archived: 새 버전 프로덕션 승격 시 자동
- 최근 3개 버전 보관, 이전 버전 삭제
# 프로덕션 모델 로드
import mlflow.pyfunc
model = mlflow.pyfunc.load_model(
model_uri="models:/recommendation-gbm/Production"
)
predictions = model.predict(input_df)
9. 모델 서빙
9.1 BentoML
# service.py
import bentoml
import numpy as np
from bentoml.io import NumpyNdarray, JSON
# 저장된 모델 로드
runner = bentoml.sklearn.get("recommendation_model:latest").to_runner()
svc = bentoml.Service("recommendation_service", runners=[runner])
@svc.api(input=JSON(), output=JSON())
async def predict(input_data: dict) -> dict:
features = np.array(input_data["features"]).reshape(1, -1)
prediction = await runner.predict.async_run(features)
return {
"prediction": int(prediction[0]),
"model_version": "v3.2.1",
}
# bentofile.yaml
service: "service:svc"
include:
- "*.py"
python:
packages:
- scikit-learn
- numpy
docker:
python_version: "3.11"
# 빌드 및 배포
bentoml build
bentoml containerize recommendation_service:latest
docker run -p 3000:3000 recommendation_service:latest
9.2 Seldon Core (Kubernetes)
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
name: recommendation-model
namespace: ml-serving
spec:
predictors:
- name: default
replicas: 3
graph:
name: classifier
implementation: SKLEARN_SERVER
modelUri: s3://models/recommendation/v3
envSecretRefName: s3-credentials
componentSpecs:
- spec:
containers:
- name: classifier
resources:
requests:
cpu: "1"
memory: "2Gi"
limits:
cpu: "2"
memory: "4Gi"
traffic: 100
labels:
version: v3
9.3 TensorFlow Serving
# TFServing Docker 실행
docker run -p 8501:8501 \
--mount type=bind,source=/models/recommendation,target=/models/recommendation \
-e MODEL_NAME=recommendation \
tensorflow/serving:latest
# gRPC 클라이언트
import grpc
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2, prediction_service_pb2_grpc
channel = grpc.insecure_channel('localhost:8500')
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
request = predict_pb2.PredictRequest()
request.model_spec.name = 'recommendation'
request.model_spec.signature_name = 'serving_default'
request.inputs['input'].CopyFrom(
tf.make_tensor_proto(input_data, shape=[1, 128])
)
response = stub.Predict(request, timeout=5.0)
9.4 vLLM (LLM 서빙)
# vLLM 서버 실행
# python -m vllm.entrypoints.openai.api_server \
# --model meta-llama/Llama-3-8B-Instruct \
# --tensor-parallel-size 2 \
# --max-model-len 8192
# 클라이언트 호출
from openai import OpenAI
client = OpenAI(
base_url="http://localhost:8000/v1",
api_key="not-needed",
)
response = client.chat.completions.create(
model="meta-llama/Llama-3-8B-Instruct",
messages=[
{"role": "user", "content": "추천 시스템의 장점은?"},
],
temperature=0.7,
max_tokens=512,
)
9.5 서빙 솔루션 비교
| 솔루션 | 모델 유형 | 배포 환경 | 배치 추론 | GPU 지원 | 자동 스케일링 |
|---|---|---|---|---|---|
| BentoML | 범용 | Docker/K8s | 지원 | 지원 | 지원 |
| Seldon Core | 범용 | K8s 전용 | 지원 | 지원 | HPA |
| TFServing | TF 모델 | Docker/K8s | 지원 | 지원 | 수동 |
| Triton | 멀티프레임워크 | Docker/K8s | 지원 | 최적화 | 지원 |
| vLLM | LLM | Docker/K8s | 미지원 | 필수 | 지원 |
10. 모델 모니터링
10.1 데이터 드리프트 감지
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
# 기준 데이터 (학습 시점)
reference_data = pd.read_parquet("training_data.parquet")
# 현재 데이터 (프로덕션)
current_data = pd.read_parquet("production_data_latest.parquet")
column_mapping = ColumnMapping(
target='label',
numerical_features=['age', 'income', 'purchase_count'],
categorical_features=['category', 'region'],
)
# 드리프트 리포트 생성
report = Report(metrics=[
DataDriftPreset(),
TargetDriftPreset(),
])
report.run(
reference_data=reference_data,
current_data=current_data,
column_mapping=column_mapping,
)
# HTML 리포트 저장
report.save_html("drift_report.html")
# 프로그래밍적 접근
result = report.as_dict()
dataset_drift = result['metrics'][0]['result']['dataset_drift']
drift_share = result['metrics'][0]['result']['share_of_drifted_columns']
if dataset_drift:
print(f"드리프트 감지! 드리프트 피처 비율: {drift_share:.2%}")
# 재학습 파이프라인 트리거
trigger_retraining_pipeline()
10.2 예측 드리프트 감지
from evidently.test_suite import TestSuite
from evidently.tests import (
TestColumnDrift,
TestShareOfDriftedColumns,
TestMeanInNSigmas,
)
test_suite = TestSuite(tests=[
TestShareOfDriftedColumns(lt=0.3), # 드리프트 피처 30% 미만
TestColumnDrift(column_name="prediction_score"),
TestMeanInNSigmas(column_name="prediction_score", n=2),
])
test_suite.run(
reference_data=reference_data,
current_data=current_data,
column_mapping=column_mapping,
)
# 테스트 결과 확인
test_results = test_suite.as_dict()
all_passed = all(
t['status'] == 'SUCCESS' for t in test_results['tests']
)
if not all_passed:
alert_team("모델 드리프트 감지 - 재학습 검토 필요")
10.3 WhyLabs 통합
import whylogs as why
from whylogs.api.writer.whylabs import WhyLabsWriter
# 프로파일 생성
results = why.log(current_data)
profile = results.profile()
# WhyLabs 전송
writer = WhyLabsWriter()
writer.write(profile)
# 커스텀 메트릭 모니터링
from whylogs.experimental.core.udf_schema import udf_schema
@udf_schema()
def custom_metrics(df):
return {
"avg_prediction_confidence": df['confidence'].mean(),
"null_feature_ratio": df.isnull().sum().sum() / df.size,
"prediction_distribution_skew": df['prediction'].skew(),
}
11. A/B 테스트
11.1 트래픽 분할
# Istio VirtualService로 트래픽 분할
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: recommendation-ab-test
spec:
hosts:
- recommendation-service
http:
- match:
- headers:
x-experiment-group:
exact: "treatment"
route:
- destination:
host: recommendation-service
subset: v2-challenger
weight: 100
- route:
- destination:
host: recommendation-service
subset: v1-champion
weight: 80
- destination:
host: recommendation-service
subset: v2-challenger
weight: 20
11.2 통계적 유의성 검증
from scipy import stats
import numpy as np
def ab_test_significance(
control_conversions, control_total,
treatment_conversions, treatment_total,
alpha=0.05,
):
"""A/B 테스트 통계적 유의성 검증"""
control_rate = control_conversions / control_total
treatment_rate = treatment_conversions / treatment_total
# Z-test for proportions
pooled_rate = (
(control_conversions + treatment_conversions) /
(control_total + treatment_total)
)
se = np.sqrt(
pooled_rate * (1 - pooled_rate) *
(1/control_total + 1/treatment_total)
)
z_stat = (treatment_rate - control_rate) / se
p_value = 2 * (1 - stats.norm.cdf(abs(z_stat)))
lift = (treatment_rate - control_rate) / control_rate
return {
"control_rate": control_rate,
"treatment_rate": treatment_rate,
"lift": f"{lift:.2%}",
"p_value": p_value,
"significant": p_value < alpha,
"recommendation": (
"챌린저 모델 승격" if p_value < alpha and lift > 0
else "기존 모델 유지"
),
}
11.3 Multi-Armed Bandit
import numpy as np
class ThompsonSampling:
"""Thompson Sampling을 이용한 적응적 트래픽 분할"""
def __init__(self, n_arms):
self.n_arms = n_arms
self.successes = np.ones(n_arms) # Beta prior alpha
self.failures = np.ones(n_arms) # Beta prior beta
def select_arm(self):
"""Beta 분포에서 샘플링하여 최적 arm 선택"""
samples = [
np.random.beta(self.successes[i], self.failures[i])
for i in range(self.n_arms)
]
return int(np.argmax(samples))
def update(self, arm, reward):
"""결과 업데이트"""
if reward:
self.successes[arm] += 1
else:
self.failures[arm] += 1
def get_allocation(self):
"""현재 트래픽 할당 비율"""
total = self.successes + self.failures
rates = self.successes / total
return rates / rates.sum()
# 사용 예시
bandit = ThompsonSampling(n_arms=3) # 3개 모델 버전
for request in incoming_requests:
arm = bandit.select_arm()
prediction = models[arm].predict(request)
reward = get_conversion(request, prediction)
bandit.update(arm, reward)
12. CI/CD for ML
12.1 모델 검증 게이트
# model_validation.py
import mlflow
import json
def validate_model(
model_uri: str,
test_data_path: str,
min_f1: float = 0.85,
max_latency_ms: float = 50.0,
max_model_size_mb: float = 500.0,
):
"""모델 배포 전 검증 게이트"""
results = {"passed": True, "checks": []}
# 1. 성능 검증
model = mlflow.pyfunc.load_model(model_uri)
test_data = pd.read_parquet(test_data_path)
predictions = model.predict(test_data.drop('target', axis=1))
f1 = f1_score(test_data['target'], predictions, average='macro')
results["checks"].append({
"name": "performance",
"metric": "f1_score",
"value": f1,
"threshold": min_f1,
"passed": f1 >= min_f1,
})
# 2. 레이턴시 검증
import time
latencies = []
for i in range(100):
start = time.time()
model.predict(test_data.iloc[[i]])
latencies.append((time.time() - start) * 1000)
p99_latency = np.percentile(latencies, 99)
results["checks"].append({
"name": "latency",
"metric": "p99_ms",
"value": p99_latency,
"threshold": max_latency_ms,
"passed": p99_latency <= max_latency_ms,
})
# 3. 모델 크기 검증
import os
model_size_mb = os.path.getsize(model_uri) / (1024 * 1024)
results["checks"].append({
"name": "model_size",
"metric": "size_mb",
"value": model_size_mb,
"threshold": max_model_size_mb,
"passed": model_size_mb <= max_model_size_mb,
})
results["passed"] = all(c["passed"] for c in results["checks"])
return results
12.2 GitHub Actions ML CI/CD
name: ML CI/CD Pipeline
on:
push:
branches: [main]
paths:
- 'models/**'
- 'features/**'
jobs:
data-validation:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Validate Data Schema
run: python scripts/validate_data.py
train-and-evaluate:
needs: data-validation
runs-on: [self-hosted, gpu]
steps:
- uses: actions/checkout@v4
- name: Train Model
run: python train.py --config configs/production.yaml
- name: Evaluate Model
run: python evaluate.py --model-path outputs/model
- name: Upload Metrics
run: python upload_metrics.py
model-validation:
needs: train-and-evaluate
runs-on: ubuntu-latest
steps:
- name: Run Validation Gates
run: python model_validation.py
- name: Performance Regression Check
run: python check_regression.py
deploy-staging:
needs: model-validation
runs-on: ubuntu-latest
steps:
- name: Deploy to Staging
run: |
kubectl apply -f k8s/staging/
kubectl rollout status deployment/model-serving -n staging
deploy-production:
needs: deploy-staging
runs-on: ubuntu-latest
environment: production
steps:
- name: Shadow Deployment
run: kubectl apply -f k8s/production/shadow.yaml
- name: Canary Rollout
run: |
kubectl apply -f k8s/production/canary-10.yaml
sleep 300
python check_canary_metrics.py
kubectl apply -f k8s/production/canary-50.yaml
sleep 300
python check_canary_metrics.py
kubectl apply -f k8s/production/full-rollout.yaml
13. 비용 최적화
13.1 배치 vs 실시간 추론
# 배치 추론: 비용 효율적 (대량 처리)
def batch_inference(model, data_path, output_path):
"""Spark를 이용한 배치 추론"""
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("batch-inference").getOrCreate()
df = spark.read.parquet(data_path)
# UDF로 모델 적용
predict_udf = spark.udf.register(
"predict",
lambda features: float(model.predict([features])[0]),
)
result = df.withColumn("prediction", predict_udf(df["features"]))
result.write.parquet(output_path)
# 실시간 추론: 지연 시간 중요 (개별 요청)
# → 온라인 서빙 인프라 필요 (비용 높음)
13.2 예측 캐싱
import redis
import hashlib
import json
class PredictionCache:
def __init__(self, redis_url="redis://localhost:6379", ttl=3600):
self.redis = redis.from_url(redis_url)
self.ttl = ttl
self.hit_count = 0
self.miss_count = 0
def _make_key(self, features: dict) -> str:
feature_str = json.dumps(features, sort_keys=True)
return f"pred:{hashlib.md5(feature_str.encode()).hexdigest()}"
def get_or_predict(self, features: dict, model) -> dict:
key = self._make_key(features)
cached = self.redis.get(key)
if cached:
self.hit_count += 1
return json.loads(cached)
self.miss_count += 1
prediction = model.predict(features)
self.redis.setex(key, self.ttl, json.dumps(prediction))
return prediction
@property
def hit_rate(self):
total = self.hit_count + self.miss_count
return self.hit_count / total if total > 0 else 0
13.3 모델 압축
# 양자화 (Quantization)
import torch
model = torch.load("model.pt")
quantized_model = torch.quantization.quantize_dynamic(
model,
{torch.nn.Linear},
dtype=torch.qint8,
)
# 크기 비교
original_size = os.path.getsize("model.pt") / (1024 * 1024)
torch.save(quantized_model.state_dict(), "model_quantized.pt")
quantized_size = os.path.getsize("model_quantized.pt") / (1024 * 1024)
print(f"원본: {original_size:.1f}MB → 양자화: {quantized_size:.1f}MB")
print(f"압축률: {(1 - quantized_size/original_size):.1%}")
13.4 비용 최적화 체크리스트
| 전략 | 예상 절감 | 트레이드오프 |
|---|---|---|
| 배치 추론 전환 | 50-80% | 실시간성 포기 |
| 예측 캐싱 | 30-60% | 캐시 신선도 |
| 모델 양자화 | 40-60% | 미세 정확도 손실 |
| Spot/Preemptible 인스턴스 | 60-90% | 가용성 불안정 |
| 자동 스케일링 | 20-40% | 콜드 스타트 |
| 모델 증류 | 50-70% | 개발 비용 |
14. 퀴즈
Q1: Feature Store의 핵심 가치는 무엇인가?
A: Feature Store의 핵심 가치는 Training-Serving Skew 방지입니다. 학습 시점과 서빙 시점에서 동일한 피처 정의와 변환 로직을 공유하여 데이터 불일치를 근본적으로 제거합니다. 추가로 피처 재사용, 팀 협업, 피처 거버넌스도 중요한 가치입니다.
Q2: Point-in-Time Join이 필요한 이유는?
A: Point-in-Time Join은 데이터 누수(Data Leakage)를 방지합니다. 학습 데이터를 생성할 때, 각 이벤트 시점에서 실제로 사용 가능했던 피처 값만 조인해야 합니다. 미래 데이터가 포함되면 모델이 학습 시 비현실적으로 높은 성능을 보이지만, 프로덕션에서는 성능이 크게 저하됩니다.
Q3: MLOps Level 2에서 Level 3으로 넘어가기 위한 핵심 요소는?
A: Level 2(CI/CD for ML)에서 Level 3(자동 재학습)으로 넘어가려면 자동 드리프트 감지 및 재학습 트리거 시스템이 필요합니다. 데이터 드리프트, 예측 드리프트를 실시간으로 모니터링하고, 임계값 초과 시 자동으로 재학습 파이프라인을 실행하며, 챔피언/챌린저 비교를 통해 자동 승격합니다.
Q4: BentoML과 Seldon Core의 주요 차이점은?
A: BentoML은 프레임워크 독립적인 모델 패키징 도구로, Docker 컨테이너를 생성하여 어디서든 배포할 수 있습니다. Seldon Core는 Kubernetes 네이티브 서빙 플랫폼으로, CRD 기반의 배포, A/B 테스트, Canary 배포, 설명 가능성(Explainer) 등 K8s 생태계와 깊이 통합됩니다.
Q5: 모델 드리프트를 감지하는 주요 방법 3가지는?
A: (1) 데이터 드리프트: 입력 피처 분포의 변화를 KS 검정, PSI(Population Stability Index) 등으로 감지. (2) 예측 드리프트: 모델 출력 분포의 변화를 모니터링. (3) 성능 드리프트: 실제 레이블과 비교하여 정확도, F1 등의 메트릭 저하를 추적. Evidently AI와 WhyLabs가 대표적인 도구입니다.
15. 참고 자료
- Feast 공식 문서 - https://docs.feast.dev/
- MLflow 공식 문서 - https://mlflow.org/docs/latest/index.html
- Google MLOps Maturity Model - https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning
- Evidently AI 문서 - https://docs.evidentlyai.com/
- BentoML 공식 문서 - https://docs.bentoml.com/
- Seldon Core 문서 - https://docs.seldon.io/
- Kubeflow Pipelines - https://www.kubeflow.org/docs/components/pipelines/
- WhyLabs 문서 - https://docs.whylabs.ai/
- vLLM 프로젝트 - https://docs.vllm.ai/
- Weights and Biases - https://docs.wandb.ai/
- Tecton Feature Store - https://docs.tecton.ai/
- Hopsworks Feature Store - https://docs.hopsworks.ai/
- NVIDIA Triton Inference Server - https://docs.nvidia.com/deeplearning/triton-inference-server/