Skip to content

필사 모드: Feature Store & MLOps 파이프라인 완전 가이드 2025: Feast, Feature Engineering, 모델 서빙

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

목차

1. 왜 Feature Store인가?

1.1 Training-Serving Skew 문제

ML 모델 개발에서 가장 빈번한 장애 원인은 **학습 시점과 서빙 시점의 피처 불일치**입니다.

데이터 과학자가 Jupyter Notebook에서 Pandas로 피처를 생성하고, 엔지니어가 Java/Go로 동일한 로직을 재구현할 때 미묘한 차이가 발생합니다.

학습 시: Pandas로 피처 생성

df['avg_purchase_7d'] = df.groupby('user_id')['amount'].transform(

lambda x: x.rolling('7D').mean()

)

서빙 시: SQL 또는 다른 언어로 재구현 → 미묘한 차이 발생

- 경계 조건(inclusive/exclusive) 차이

- 타임존 처리 차이

- NULL 처리 방식 차이

Feature Store는 **단일 피처 정의**를 학습과 서빙 양쪽에서 공유하여 이 문제를 근본적으로 해결합니다.

1.2 피처 재사용과 팀 협업

대규모 ML 조직에서는 수십 개 팀이 유사한 피처를 독립적으로 생성합니다.

| 문제 | Feature Store 도입 전 | Feature Store 도입 후 |

|------|----------------------|---------------------|

| 피처 중복 | 팀마다 유사 피처 재생성 | 중앙 레지스트리에서 검색/재사용 |

| 일관성 | 팀마다 다른 계산 로직 | 단일 정의, 버전 관리 |

| 신선도 | 수동 업데이트 | 자동 materialization |

| 발견 가능성 | Slack/Wiki 의존 | 피처 카탈로그 + 메타데이터 |

| 거버넌스 | 없음 | 소유자, 라인리지, 접근 제어 |

1.3 데이터 파이프라인 복잡성 감소

Feature Store 없이는 각 모델이 자체 데이터 파이프라인을 유지합니다.

모델 A: raw data → ETL A → features A → training A

모델 B: raw data → ETL B → features B → training B

모델 C: raw data → ETL C → features C → training C

Feature Store 도입 후:

raw data → Feature Store (중앙 집중) → 모델 A, B, C 공유

2. Feature Store 아키텍처

2.1 핵심 구성 요소

Feature Store는 4가지 핵심 구성 요소로 이루어집니다.

**오프라인 스토어 (Offline Store)**

- 대량의 히스토리컬 피처 데이터 저장

- 학습 데이터 생성에 사용

- BigQuery, Snowflake, Redshift, Parquet 파일

- Point-in-Time Join 지원

**온라인 스토어 (Online Store)**

- 최신 피처 값의 저속 지연 조회

- 실시간 추론에 사용

- Redis, DynamoDB, Bigtable

- P99 레이턴시 10ms 이하 목표

**피처 레지스트리 (Feature Registry)**

- 모든 피처의 메타데이터 관리

- 이름, 타입, 소유자, 설명, 태그

- 데이터 라인리지 추적

**변환 엔진 (Transformation Engine)**

- 원시 데이터에서 피처 생성

- 배치/스트리밍 변환 지원

- Spark, Flink, dbt 통합

2.2 데이터 흐름

[데이터 소스] → [변환 엔진] → [오프라인 스토어] ←→ [온라인 스토어]

↑ ↓ ↓

원시 데이터 학습 파이프라인 추론 서비스

[피처 레지스트리]

(메타데이터 관리)

2.3 주요 Feature Store 솔루션 비교

| 솔루션 | 유형 | 오프라인 | 온라인 | 스트리밍 | 비용 |

|--------|------|---------|--------|---------|------|

| Feast | OSS | Redshift/BQ/File | Redis/DynamoDB | Push 기반 | 무료(인프라 비용만) |

| Tecton | 관리형 | Spark + Delta | DynamoDB | Spark Streaming | 구독형 |

| Hopsworks | OSS/관리형 | Hudi | RonDB | Flink | 커뮤니티 무료 |

| Vertex AI FS | GCP 관리형 | BigQuery | Bigtable | Dataflow | 사용량 기반 |

| SageMaker FS | AWS 관리형 | S3 + Glue | 내장 온라인 | Kinesis | 사용량 기반 |

3. Feast Deep Dive

3.1 Feast 설치 및 초기 설정

Feast 설치

pip install feast

프로젝트 초기화

feast init my_feature_store

cd my_feature_store

프로젝트 구조

my_feature_store/

feature_store.yaml # 프로젝트 설정

features.py # 피처 정의

data/ # 샘플 데이터

`feature_store.yaml` 설정:

project: my_ml_project

registry: data/registry.db

provider: local # local, gcp, aws

online_store:

type: redis

connection_string: "localhost:6379"

offline_store:

type: file # file, bigquery, redshift, snowflake

entity_key_serialization_version: 2

3.2 피처 정의

features.py

from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource

from feast.types import Float32, Int64, String

엔티티 정의

user = Entity(

name="user_id",

join_keys=["user_id"],

description="사용자 고유 ID",

)

데이터 소스 정의

user_stats_source = FileSource(

path="data/user_stats.parquet",

timestamp_field="event_timestamp",

created_timestamp_column="created_timestamp",

)

피처 뷰 정의

user_stats_fv = FeatureView(

name="user_stats",

entities=[user],

ttl=timedelta(days=1),

schema=[

Field(name="total_purchases", dtype=Int64),

Field(name="avg_purchase_amount", dtype=Float32),

Field(name="days_since_last_login", dtype=Int64),

Field(name="preferred_category", dtype=String),

],

online=True,

source=user_stats_source,

tags={"team": "recommendation", "version": "v2"},

)

3.3 Materialization (오프라인 to 온라인)

피처 정의 적용

feast apply

오프라인 → 온라인 스토어로 피처 값 물리화

feast materialize 2024-01-01T00:00:00 2024-12-31T23:59:59

증분 물리화 (마지막 이후만)

feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")

3.4 학습 데이터 생성 (Point-in-Time Join)

Point-in-Time Join은 Feature Store의 핵심 기능입니다. **데이터 누수(Data Leakage)를 방지**하면서 과거 특정 시점의 피처 값을 정확히 가져옵니다.

from feast import FeatureStore

store = FeatureStore(repo_path=".")

학습 데이터의 엔티티 + 타임스탬프

entity_df = pd.DataFrame({

"user_id": [1001, 1002, 1003, 1001],

"event_timestamp": pd.to_datetime([

"2024-09-01", "2024-09-02", "2024-09-03", "2024-10-01"

]),

})

Point-in-Time Join으로 피처 가져오기

training_df = store.get_historical_features(

entity_df=entity_df,

features=[

"user_stats:total_purchases",

"user_stats:avg_purchase_amount",

"user_stats:days_since_last_login",

],

).to_df()

print(training_df)

각 행은 해당 시점 기준의 피처 값을 가짐

→ 2024-09-01 시점의 user 1001 피처 != 2024-10-01 시점의 user 1001 피처

3.5 온라인 서빙

실시간 추론을 위한 온라인 피처 조회

feature_vector = store.get_online_features(

features=[

"user_stats:total_purchases",

"user_stats:avg_purchase_amount",

"user_stats:days_since_last_login",

],

entity_rows=[

{"user_id": 1001},

{"user_id": 1002},

],

).to_dict()

결과: 가장 최신 피처 값 반환 (P99 레이턴시 약 5-10ms)

3.6 GCP/AWS 프로덕션 설정

GCP 프로덕션 설정

project: production_ml

registry: gs://my-bucket/feast-registry/registry.db

provider: gcp

online_store:

type: datastore # 또는 bigtable

project_id: my-gcp-project

offline_store:

type: bigquery

project_id: my-gcp-project

dataset: feast_features

AWS 프로덕션 설정

project: production_ml

registry: s3://my-bucket/feast-registry/registry.db

provider: aws

online_store:

type: dynamodb

region: ap-northeast-2

offline_store:

type: redshift

cluster_id: my-redshift-cluster

region: ap-northeast-2

database: ml_features

user: feast_user

s3_staging_location: s3://my-bucket/feast-staging/

4. Feature Engineering 패턴

4.1 시간 기반 피처 (Temporal Features)

def create_temporal_features(df, timestamp_col, entity_col, value_col):

"""시간 윈도우 기반 집계 피처 생성"""

df = df.sort_values(timestamp_col)

features = pd.DataFrame()

features[entity_col] = df[entity_col]

features[timestamp_col] = df[timestamp_col]

다양한 윈도우의 이동 평균

for window in ['1D', '7D', '30D']:

features[f'{value_col}_mean_{window}'] = (

df.groupby(entity_col)[value_col]

.transform(lambda x: x.rolling(window, on=df[timestamp_col]).mean())

)

추세 피처: 7일 평균 / 30일 평균

features[f'{value_col}_trend_7d_30d'] = (

features[f'{value_col}_mean_7D'] /

features[f'{value_col}_mean_30D'].replace(0, float('nan'))

)

요일/시간 피처

features['day_of_week'] = df[timestamp_col].dt.dayofweek

features['hour_of_day'] = df[timestamp_col].dt.hour

features['is_weekend'] = features['day_of_week'].isin([5, 6]).astype(int)

return features

4.2 집계 피처 (Aggregation Features)

def create_aggregation_features(events_df, entity_col, group_col):

"""엔티티별 집계 피처"""

agg_features = events_df.groupby(entity_col).agg(

event_count=pd.NamedAgg(column=group_col, aggfunc='count'),

unique_categories=pd.NamedAgg(column='category', aggfunc='nunique'),

total_amount=pd.NamedAgg(column='amount', aggfunc='sum'),

avg_amount=pd.NamedAgg(column='amount', aggfunc='mean'),

max_amount=pd.NamedAgg(column='amount', aggfunc='max'),

std_amount=pd.NamedAgg(column='amount', aggfunc='std'),

).reset_index()

비율 피처

agg_features['high_value_ratio'] = (

events_df[events_df['amount'] > 100]

.groupby(entity_col)

.size()

.reindex(agg_features[entity_col], fill_value=0)

.values / agg_features['event_count']

)

return agg_features

4.3 임베딩 피처

from sentence_transformers import SentenceTransformer

model = SentenceTransformer('all-MiniLM-L6-v2')

def create_text_embedding_features(df, text_col, prefix='emb'):

"""텍스트 임베딩 피처 생성"""

embeddings = model.encode(df[text_col].fillna('').tolist())

emb_df = pd.DataFrame(

embeddings,

columns=[f'{prefix}_{i}' for i in range(embeddings.shape[1])],

index=df.index,

)

return pd.concat([df, emb_df], axis=1)

4.4 교차 피처 (Cross Features)

def create_cross_features(df):

"""피처 간 교차/상호작용 피처"""

비율 피처

df['purchase_to_visit_ratio'] = (

df['purchase_count'] / df['visit_count'].replace(0, 1)

)

구간화 + 교차

df['age_bucket'] = pd.cut(

df['age'], bins=[0, 25, 35, 50, 100],

labels=['young', 'middle', 'senior', 'elder']

)

df['age_x_gender'] = df['age_bucket'].astype(str) + '_' + df['gender']

수치 상호작용

df['income_x_age'] = df['income'] * df['age']

return df

5. MLOps 성숙도 레벨

Google에서 정의한 MLOps 성숙도 모델은 4단계로 나뉩니다.

Level 0: 수동 프로세스

데이터 과학자가 수동으로:

1. 데이터 수집 및 전처리

2. Jupyter에서 모델 학습

3. 모델 파일을 엔지니어에게 전달

4. 엔지니어가 서빙 코드 작성

5. 수동 배포

문제점:

- 배포 주기: 수개월

- 재현성 없음

- 모니터링 없음

Level 1: ML 파이프라인 자동화

자동화된 ML 파이프라인:

1. 데이터 검증 → 피처 엔지니어링 → 학습 → 평가 → 배포

2. 파이프라인 오케스트레이터 사용 (Kubeflow, Airflow)

3. CT (Continuous Training): 트리거 기반 자동 재학습

개선점:

- 배포 주기: 주 단위

- 파이프라인 재현 가능

- 기본 모니터링

Level 2: CI/CD for ML

CI/CD 파이프라인 통합:

1. 코드 변경 → 자동 테스트 → 파이프라인 빌드 → 모델 학습

2. 모델 검증 게이트 (성능 임계값)

3. Shadow Deployment → Canary → Full Rollout

4. A/B 테스트 자동화

개선점:

- 배포 주기: 일 단위

- 자동 롤백

- 체계적 실험 관리

Level 3: 자동 재학습 + 완전 자동화

완전 자동화:

1. 드리프트 감지 → 자동 재학습 트리거

2. 자동 피처 선택/하이퍼파라미터 최적화

3. 모델 성능 자동 비교 + 챔피언/챌린저

4. 자동 스케일링 + 비용 최적화

개선점:

- 배포 주기: 시간 단위

- 무인 운영

- 프로액티브 모니터링

6. ML 파이프라인 오케스트레이션

6.1 Kubeflow Pipelines

Kubeflow Pipelines DSL로 파이프라인 정의

from kfp import dsl, compiler

from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(

base_image="python:3.11",

packages_to_install=["pandas", "scikit-learn"],

)

def preprocess_data(

raw_data: Input[Dataset],

processed_data: Output[Dataset],

):

from sklearn.preprocessing import StandardScaler

df = pd.read_parquet(raw_data.path)

scaler = StandardScaler()

df_scaled = pd.DataFrame(

scaler.fit_transform(df.select_dtypes(include='number')),

columns=df.select_dtypes(include='number').columns,

)

df_scaled.to_parquet(processed_data.path)

@dsl.component(

base_image="python:3.11",

packages_to_install=["scikit-learn", "pandas", "joblib"],

)

def train_model(

training_data: Input[Dataset],

model_artifact: Output[Model],

metrics: Output[Metrics],

n_estimators: int = 100,

max_depth: int = 10,

):

from sklearn.ensemble import RandomForestClassifier

from sklearn.model_selection import cross_val_score

df = pd.read_parquet(training_data.path)

X = df.drop('target', axis=1)

y = df['target']

clf = RandomForestClassifier(

n_estimators=n_estimators,

max_depth=max_depth,

random_state=42,

)

scores = cross_val_score(clf, X, y, cv=5, scoring='f1_macro')

clf.fit(X, y)

joblib.dump(clf, model_artifact.path)

metrics.log_metric("f1_mean", float(scores.mean()))

metrics.log_metric("f1_std", float(scores.std()))

@dsl.pipeline(name="ML Training Pipeline")

def ml_pipeline(n_estimators: int = 100, max_depth: int = 10):

preprocess_task = preprocess_data(

raw_data=dsl.importer(

artifact_uri="gs://my-bucket/raw-data/",

artifact_class=Dataset,

).output,

)

train_task = train_model(

training_data=preprocess_task.outputs["processed_data"],

n_estimators=n_estimators,

max_depth=max_depth,

)

컴파일 및 실행

compiler.Compiler().compile(ml_pipeline, "pipeline.yaml")

6.2 Airflow ML 파이프라인

Airflow DAG으로 ML 파이프라인 정의

from airflow import DAG

from airflow.operators.python import PythonOperator

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from datetime import datetime, timedelta

default_args = {

'owner': 'ml-team',

'depends_on_past': False,

'retries': 2,

'retry_delay': timedelta(minutes=5),

}

with DAG(

'ml_training_pipeline',

default_args=default_args,

schedule_interval='@weekly',

start_date=datetime(2024, 1, 1),

catchup=False,

tags=['ml', 'training'],

) as dag:

validate_data = KubernetesPodOperator(

task_id='validate_data',

name='data-validation',

image='ml-pipeline:latest',

cmds=['python', 'validate.py'],

namespace='ml-pipelines',

)

extract_features = KubernetesPodOperator(

task_id='extract_features',

name='feature-extraction',

image='ml-pipeline:latest',

cmds=['python', 'extract_features.py'],

namespace='ml-pipelines',

)

train_model = KubernetesPodOperator(

task_id='train_model',

name='model-training',

image='ml-pipeline:latest',

cmds=['python', 'train.py'],

namespace='ml-pipelines',

resources={

'request_memory': '8Gi',

'request_cpu': '4',

'limit_gpu': '1',

},

)

evaluate_model = PythonOperator(

task_id='evaluate_model',

python_callable=lambda: print("Evaluating model..."),

)

validate_data >> extract_features >> train_model >> evaluate_model

6.3 파이프라인 솔루션 비교

| 기능 | Kubeflow | Airflow | Vertex AI | SageMaker |

|------|----------|---------|-----------|-----------|

| 실행 환경 | Kubernetes | 다양 | GCP 관리형 | AWS 관리형 |

| ML 특화 | 높음 | 중간 | 높음 | 높음 |

| UI/시각화 | 기본 | 우수 | 우수 | 우수 |

| 확장성 | 높음 | 높음 | 높음 | 높음 |

| 학습 곡선 | 높음 | 중간 | 낮음 | 낮음 |

| 비용 | 인프라만 | 인프라만 | 사용량 | 사용량 |

7. 실험 추적 (Experiment Tracking)

7.1 MLflow 실험 추적

from sklearn.ensemble import GradientBoostingClassifier

from sklearn.model_selection import train_test_split

from sklearn.metrics import f1_score, precision_score, recall_score

MLflow 서버 설정

mlflow.set_tracking_uri("http://mlflow.internal:5000")

mlflow.set_experiment("recommendation-model-v2")

실험 실행

with mlflow.start_run(run_name="gbm-experiment-42") as run:

하이퍼파라미터 로깅

params = {

"n_estimators": 200,

"max_depth": 8,

"learning_rate": 0.1,

"subsample": 0.8,

}

mlflow.log_params(params)

모델 학습

model = GradientBoostingClassifier(**params)

model.fit(X_train, y_train)

메트릭 로깅

y_pred = model.predict(X_test)

metrics = {

"f1": f1_score(y_test, y_pred, average='macro'),

"precision": precision_score(y_test, y_pred, average='macro'),

"recall": recall_score(y_test, y_pred, average='macro'),

}

mlflow.log_metrics(metrics)

모델 아티팩트 로깅

mlflow.sklearn.log_model(

model,

artifact_path="model",

registered_model_name="recommendation-gbm",

)

커스텀 아티팩트 (피처 중요도 차트 등)

mlflow.log_artifact("feature_importance.png")

print(f"Run ID: {run.info.run_id}")

print(f"Metrics: {metrics}")

7.2 Weights and Biases 통합

wandb.init(

project="recommendation-model",

config={

"architecture": "GBM",

"n_estimators": 200,

"learning_rate": 0.1,

},

)

학습 루프에서 메트릭 로깅

for epoch in range(100):

train_loss = train_one_epoch(model, train_loader)

val_loss, val_f1 = evaluate(model, val_loader)

wandb.log({

"epoch": epoch,

"train_loss": train_loss,

"val_loss": val_loss,

"val_f1": val_f1,

})

모델 아티팩트 저장

artifact = wandb.Artifact('model-weights', type='model')

artifact.add_file('model.pt')

wandb.log_artifact(artifact)

wandb.finish()

8. 모델 레지스트리 (Model Registry)

8.1 MLflow Model Registry

from mlflow.tracking import MlflowClient

client = MlflowClient()

모델 버전 생성

model_version = client.create_model_version(

name="recommendation-gbm",

source=f"runs:/{run_id}/model",

run_id=run_id,

description="GBM 모델 v3: 신규 피처 추가",

)

스테이지 전환: None → Staging

client.transition_model_version_stage(

name="recommendation-gbm",

version=model_version.version,

stage="Staging",

archive_existing_versions=False,

)

스테이징 검증 후 프로덕션 승격

client.transition_model_version_stage(

name="recommendation-gbm",

version=model_version.version,

stage="Production",

archive_existing_versions=True, # 기존 프로덕션 버전 아카이브

)

8.2 모델 버전 관리 전략

모델 라이프사이클:

None → Staging → Production → Archived

버전 관리 정책:

- Staging: 자동 성능 테스트 통과 시

- Production: 수동 승인 또는 A/B 테스트 통과 시

- Archived: 새 버전 프로덕션 승격 시 자동

- 최근 3개 버전 보관, 이전 버전 삭제

프로덕션 모델 로드

model = mlflow.pyfunc.load_model(

model_uri="models:/recommendation-gbm/Production"

)

predictions = model.predict(input_df)

9. 모델 서빙

9.1 BentoML

service.py

from bentoml.io import NumpyNdarray, JSON

저장된 모델 로드

runner = bentoml.sklearn.get("recommendation_model:latest").to_runner()

svc = bentoml.Service("recommendation_service", runners=[runner])

@svc.api(input=JSON(), output=JSON())

async def predict(input_data: dict) -> dict:

features = np.array(input_data["features"]).reshape(1, -1)

prediction = await runner.predict.async_run(features)

return {

"prediction": int(prediction[0]),

"model_version": "v3.2.1",

}

bentofile.yaml

service: "service:svc"

include:

- "*.py"

python:

packages:

- scikit-learn

- numpy

docker:

python_version: "3.11"

빌드 및 배포

bentoml build

bentoml containerize recommendation_service:latest

docker run -p 3000:3000 recommendation_service:latest

9.2 Seldon Core (Kubernetes)

apiVersion: machinelearning.seldon.io/v1

kind: SeldonDeployment

metadata:

name: recommendation-model

namespace: ml-serving

spec:

predictors:

- name: default

replicas: 3

graph:

name: classifier

implementation: SKLEARN_SERVER

modelUri: s3://models/recommendation/v3

envSecretRefName: s3-credentials

componentSpecs:

- spec:

containers:

- name: classifier

resources:

requests:

cpu: "1"

memory: "2Gi"

limits:

cpu: "2"

memory: "4Gi"

traffic: 100

labels:

version: v3

9.3 TensorFlow Serving

TFServing Docker 실행

docker run -p 8501:8501 \

--mount type=bind,source=/models/recommendation,target=/models/recommendation \

-e MODEL_NAME=recommendation \

tensorflow/serving:latest

gRPC 클라이언트

from tensorflow_serving.apis import predict_pb2, prediction_service_pb2_grpc

channel = grpc.insecure_channel('localhost:8500')

stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)

request = predict_pb2.PredictRequest()

request.model_spec.name = 'recommendation'

request.model_spec.signature_name = 'serving_default'

request.inputs['input'].CopyFrom(

tf.make_tensor_proto(input_data, shape=[1, 128])

)

response = stub.Predict(request, timeout=5.0)

9.4 vLLM (LLM 서빙)

vLLM 서버 실행

python -m vllm.entrypoints.openai.api_server \

--model meta-llama/Llama-3-8B-Instruct \

--tensor-parallel-size 2 \

--max-model-len 8192

클라이언트 호출

from openai import OpenAI

client = OpenAI(

base_url="http://localhost:8000/v1",

api_key="not-needed",

)

response = client.chat.completions.create(

model="meta-llama/Llama-3-8B-Instruct",

messages=[

{"role": "user", "content": "추천 시스템의 장점은?"},

],

temperature=0.7,

max_tokens=512,

)

9.5 서빙 솔루션 비교

| 솔루션 | 모델 유형 | 배포 환경 | 배치 추론 | GPU 지원 | 자동 스케일링 |

|--------|----------|----------|----------|---------|-------------|

| BentoML | 범용 | Docker/K8s | 지원 | 지원 | 지원 |

| Seldon Core | 범용 | K8s 전용 | 지원 | 지원 | HPA |

| TFServing | TF 모델 | Docker/K8s | 지원 | 지원 | 수동 |

| Triton | 멀티프레임워크 | Docker/K8s | 지원 | 최적화 | 지원 |

| vLLM | LLM | Docker/K8s | 미지원 | 필수 | 지원 |

10. 모델 모니터링

10.1 데이터 드리프트 감지

from evidently import ColumnMapping

from evidently.report import Report

from evidently.metric_preset import DataDriftPreset, TargetDriftPreset

기준 데이터 (학습 시점)

reference_data = pd.read_parquet("training_data.parquet")

현재 데이터 (프로덕션)

current_data = pd.read_parquet("production_data_latest.parquet")

column_mapping = ColumnMapping(

target='label',

numerical_features=['age', 'income', 'purchase_count'],

categorical_features=['category', 'region'],

)

드리프트 리포트 생성

report = Report(metrics=[

DataDriftPreset(),

TargetDriftPreset(),

])

report.run(

reference_data=reference_data,

current_data=current_data,

column_mapping=column_mapping,

)

HTML 리포트 저장

report.save_html("drift_report.html")

프로그래밍적 접근

result = report.as_dict()

dataset_drift = result['metrics'][0]['result']['dataset_drift']

drift_share = result['metrics'][0]['result']['share_of_drifted_columns']

if dataset_drift:

print(f"드리프트 감지! 드리프트 피처 비율: {drift_share:.2%}")

재학습 파이프라인 트리거

trigger_retraining_pipeline()

10.2 예측 드리프트 감지

from evidently.test_suite import TestSuite

from evidently.tests import (

TestColumnDrift,

TestShareOfDriftedColumns,

TestMeanInNSigmas,

)

test_suite = TestSuite(tests=[

TestShareOfDriftedColumns(lt=0.3), # 드리프트 피처 30% 미만

TestColumnDrift(column_name="prediction_score"),

TestMeanInNSigmas(column_name="prediction_score", n=2),

])

test_suite.run(

reference_data=reference_data,

current_data=current_data,

column_mapping=column_mapping,

)

테스트 결과 확인

test_results = test_suite.as_dict()

all_passed = all(

t['status'] == 'SUCCESS' for t in test_results['tests']

)

if not all_passed:

alert_team("모델 드리프트 감지 - 재학습 검토 필요")

10.3 WhyLabs 통합

from whylogs.api.writer.whylabs import WhyLabsWriter

프로파일 생성

results = why.log(current_data)

profile = results.profile()

WhyLabs 전송

writer = WhyLabsWriter()

writer.write(profile)

커스텀 메트릭 모니터링

from whylogs.experimental.core.udf_schema import udf_schema

@udf_schema()

def custom_metrics(df):

return {

"avg_prediction_confidence": df['confidence'].mean(),

"null_feature_ratio": df.isnull().sum().sum() / df.size,

"prediction_distribution_skew": df['prediction'].skew(),

}

11. A/B 테스트

11.1 트래픽 분할

Istio VirtualService로 트래픽 분할

apiVersion: networking.istio.io/v1beta1

kind: VirtualService

metadata:

name: recommendation-ab-test

spec:

hosts:

- recommendation-service

http:

- match:

- headers:

x-experiment-group:

exact: "treatment"

route:

- destination:

host: recommendation-service

subset: v2-challenger

weight: 100

- route:

- destination:

host: recommendation-service

subset: v1-champion

weight: 80

- destination:

host: recommendation-service

subset: v2-challenger

weight: 20

11.2 통계적 유의성 검증

from scipy import stats

def ab_test_significance(

control_conversions, control_total,

treatment_conversions, treatment_total,

alpha=0.05,

):

"""A/B 테스트 통계적 유의성 검증"""

control_rate = control_conversions / control_total

treatment_rate = treatment_conversions / treatment_total

Z-test for proportions

pooled_rate = (

(control_conversions + treatment_conversions) /

(control_total + treatment_total)

)

se = np.sqrt(

pooled_rate * (1 - pooled_rate) *

(1/control_total + 1/treatment_total)

)

z_stat = (treatment_rate - control_rate) / se

p_value = 2 * (1 - stats.norm.cdf(abs(z_stat)))

lift = (treatment_rate - control_rate) / control_rate

return {

"control_rate": control_rate,

"treatment_rate": treatment_rate,

"lift": f"{lift:.2%}",

"p_value": p_value,

"significant": p_value < alpha,

"recommendation": (

"챌린저 모델 승격" if p_value < alpha and lift > 0

else "기존 모델 유지"

),

}

11.3 Multi-Armed Bandit

class ThompsonSampling:

"""Thompson Sampling을 이용한 적응적 트래픽 분할"""

def __init__(self, n_arms):

self.n_arms = n_arms

self.successes = np.ones(n_arms) # Beta prior alpha

self.failures = np.ones(n_arms) # Beta prior beta

def select_arm(self):

"""Beta 분포에서 샘플링하여 최적 arm 선택"""

samples = [

np.random.beta(self.successes[i], self.failures[i])

for i in range(self.n_arms)

]

return int(np.argmax(samples))

def update(self, arm, reward):

"""결과 업데이트"""

if reward:

self.successes[arm] += 1

else:

self.failures[arm] += 1

def get_allocation(self):

"""현재 트래픽 할당 비율"""

total = self.successes + self.failures

rates = self.successes / total

return rates / rates.sum()

사용 예시

bandit = ThompsonSampling(n_arms=3) # 3개 모델 버전

for request in incoming_requests:

arm = bandit.select_arm()

prediction = models[arm].predict(request)

reward = get_conversion(request, prediction)

bandit.update(arm, reward)

12. CI/CD for ML

12.1 모델 검증 게이트

model_validation.py

def validate_model(

model_uri: str,

test_data_path: str,

min_f1: float = 0.85,

max_latency_ms: float = 50.0,

max_model_size_mb: float = 500.0,

):

"""모델 배포 전 검증 게이트"""

results = {"passed": True, "checks": []}

1. 성능 검증

model = mlflow.pyfunc.load_model(model_uri)

test_data = pd.read_parquet(test_data_path)

predictions = model.predict(test_data.drop('target', axis=1))

f1 = f1_score(test_data['target'], predictions, average='macro')

results["checks"].append({

"name": "performance",

"metric": "f1_score",

"value": f1,

"threshold": min_f1,

"passed": f1 >= min_f1,

})

2. 레이턴시 검증

latencies = []

for i in range(100):

start = time.time()

model.predict(test_data.iloc[[i]])

latencies.append((time.time() - start) * 1000)

p99_latency = np.percentile(latencies, 99)

results["checks"].append({

"name": "latency",

"metric": "p99_ms",

"value": p99_latency,

"threshold": max_latency_ms,

"passed": p99_latency <= max_latency_ms,

})

3. 모델 크기 검증

model_size_mb = os.path.getsize(model_uri) / (1024 * 1024)

results["checks"].append({

"name": "model_size",

"metric": "size_mb",

"value": model_size_mb,

"threshold": max_model_size_mb,

"passed": model_size_mb <= max_model_size_mb,

})

results["passed"] = all(c["passed"] for c in results["checks"])

return results

12.2 GitHub Actions ML CI/CD

name: ML CI/CD Pipeline

on:

push:

branches: [main]

paths:

- 'models/**'

- 'features/**'

jobs:

data-validation:

runs-on: ubuntu-latest

steps:

- uses: actions/checkout@v4

- name: Validate Data Schema

run: python scripts/validate_data.py

train-and-evaluate:

needs: data-validation

runs-on: [self-hosted, gpu]

steps:

- uses: actions/checkout@v4

- name: Train Model

run: python train.py --config configs/production.yaml

- name: Evaluate Model

run: python evaluate.py --model-path outputs/model

- name: Upload Metrics

run: python upload_metrics.py

model-validation:

needs: train-and-evaluate

runs-on: ubuntu-latest

steps:

- name: Run Validation Gates

run: python model_validation.py

- name: Performance Regression Check

run: python check_regression.py

deploy-staging:

needs: model-validation

runs-on: ubuntu-latest

steps:

- name: Deploy to Staging

run: |

kubectl apply -f k8s/staging/

kubectl rollout status deployment/model-serving -n staging

deploy-production:

needs: deploy-staging

runs-on: ubuntu-latest

environment: production

steps:

- name: Shadow Deployment

run: kubectl apply -f k8s/production/shadow.yaml

- name: Canary Rollout

run: |

kubectl apply -f k8s/production/canary-10.yaml

sleep 300

python check_canary_metrics.py

kubectl apply -f k8s/production/canary-50.yaml

sleep 300

python check_canary_metrics.py

kubectl apply -f k8s/production/full-rollout.yaml

13. 비용 최적화

13.1 배치 vs 실시간 추론

배치 추론: 비용 효율적 (대량 처리)

def batch_inference(model, data_path, output_path):

"""Spark를 이용한 배치 추론"""

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("batch-inference").getOrCreate()

df = spark.read.parquet(data_path)

UDF로 모델 적용

predict_udf = spark.udf.register(

"predict",

lambda features: float(model.predict([features])[0]),

)

result = df.withColumn("prediction", predict_udf(df["features"]))

result.write.parquet(output_path)

실시간 추론: 지연 시간 중요 (개별 요청)

→ 온라인 서빙 인프라 필요 (비용 높음)

13.2 예측 캐싱

class PredictionCache:

def __init__(self, redis_url="redis://localhost:6379", ttl=3600):

self.redis = redis.from_url(redis_url)

self.ttl = ttl

self.hit_count = 0

self.miss_count = 0

def _make_key(self, features: dict) -> str:

feature_str = json.dumps(features, sort_keys=True)

return f"pred:{hashlib.md5(feature_str.encode()).hexdigest()}"

def get_or_predict(self, features: dict, model) -> dict:

key = self._make_key(features)

cached = self.redis.get(key)

if cached:

self.hit_count += 1

return json.loads(cached)

self.miss_count += 1

prediction = model.predict(features)

self.redis.setex(key, self.ttl, json.dumps(prediction))

return prediction

@property

def hit_rate(self):

total = self.hit_count + self.miss_count

return self.hit_count / total if total > 0 else 0

13.3 모델 압축

양자화 (Quantization)

model = torch.load("model.pt")

quantized_model = torch.quantization.quantize_dynamic(

model,

{torch.nn.Linear},

dtype=torch.qint8,

)

크기 비교

original_size = os.path.getsize("model.pt") / (1024 * 1024)

torch.save(quantized_model.state_dict(), "model_quantized.pt")

quantized_size = os.path.getsize("model_quantized.pt") / (1024 * 1024)

print(f"원본: {original_size:.1f}MB → 양자화: {quantized_size:.1f}MB")

print(f"압축률: {(1 - quantized_size/original_size):.1%}")

13.4 비용 최적화 체크리스트

| 전략 | 예상 절감 | 트레이드오프 |

|------|----------|------------|

| 배치 추론 전환 | 50-80% | 실시간성 포기 |

| 예측 캐싱 | 30-60% | 캐시 신선도 |

| 모델 양자화 | 40-60% | 미세 정확도 손실 |

| Spot/Preemptible 인스턴스 | 60-90% | 가용성 불안정 |

| 자동 스케일링 | 20-40% | 콜드 스타트 |

| 모델 증류 | 50-70% | 개발 비용 |

14. 퀴즈

**A:** Feature Store의 핵심 가치는 **Training-Serving Skew 방지**입니다. 학습 시점과 서빙 시점에서 동일한 피처 정의와 변환 로직을 공유하여 데이터 불일치를 근본적으로 제거합니다. 추가로 피처 재사용, 팀 협업, 피처 거버넌스도 중요한 가치입니다.

**A:** Point-in-Time Join은 **데이터 누수(Data Leakage)를 방지**합니다. 학습 데이터를 생성할 때, 각 이벤트 시점에서 실제로 사용 가능했던 피처 값만 조인해야 합니다. 미래 데이터가 포함되면 모델이 학습 시 비현실적으로 높은 성능을 보이지만, 프로덕션에서는 성능이 크게 저하됩니다.

**A:** Level 2(CI/CD for ML)에서 Level 3(자동 재학습)으로 넘어가려면 **자동 드리프트 감지 및 재학습 트리거** 시스템이 필요합니다. 데이터 드리프트, 예측 드리프트를 실시간으로 모니터링하고, 임계값 초과 시 자동으로 재학습 파이프라인을 실행하며, 챔피언/챌린저 비교를 통해 자동 승격합니다.

**A:** BentoML은 **프레임워크 독립적인 모델 패키징 도구**로, Docker 컨테이너를 생성하여 어디서든 배포할 수 있습니다. Seldon Core는 **Kubernetes 네이티브 서빙 플랫폼**으로, CRD 기반의 배포, A/B 테스트, Canary 배포, 설명 가능성(Explainer) 등 K8s 생태계와 깊이 통합됩니다.

**A:** (1) **데이터 드리프트**: 입력 피처 분포의 변화를 KS 검정, PSI(Population Stability Index) 등으로 감지. (2) **예측 드리프트**: 모델 출력 분포의 변화를 모니터링. (3) **성능 드리프트**: 실제 레이블과 비교하여 정확도, F1 등의 메트릭 저하를 추적. Evidently AI와 WhyLabs가 대표적인 도구입니다.

15. 참고 자료

1. Feast 공식 문서 - https://docs.feast.dev/

2. MLflow 공식 문서 - https://mlflow.org/docs/latest/index.html

3. Google MLOps Maturity Model - https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning

4. Evidently AI 문서 - https://docs.evidentlyai.com/

5. BentoML 공식 문서 - https://docs.bentoml.com/

6. Seldon Core 문서 - https://docs.seldon.io/

7. Kubeflow Pipelines - https://www.kubeflow.org/docs/components/pipelines/

8. WhyLabs 문서 - https://docs.whylabs.ai/

9. vLLM 프로젝트 - https://docs.vllm.ai/

10. Weights and Biases - https://docs.wandb.ai/

11. Tecton Feature Store - https://docs.tecton.ai/

12. Hopsworks Feature Store - https://docs.hopsworks.ai/

13. NVIDIA Triton Inference Server - https://docs.nvidia.com/deeplearning/triton-inference-server/

현재 단락 (1/845)

ML 모델 개발에서 가장 빈번한 장애 원인은 **학습 시점과 서빙 시점의 피처 불일치**입니다.

작성 글자: 0원문 글자: 25,520작성 단락: 0/845