Skip to content
Published on

Feast Feature Store本番運用ガイド:フィーチャーエンジニアリングからリアルタイムサービングと学習 - サービングスキュー防止まで

Authors
  • Name
    Twitter

Feast Feature Store

##入り

MLモデルが本番で失敗する原因の多くは、モデル自体ではなくフィーチャから来ています。学習時に計算したフィーチャとサービング時に照会されるフィーチャが異なる学習-サービングスキュー(Training-Serving Skew)、フィーチャパイプライン障害による stale データサービング、フィーチャ定義変更時のサブモデルとの互換性破壊などが代表的である。 Feature Storeは、これらの問題をアーキテクチャレベルで解決するために登場したインフラコンポーネントです。

Feast(Feature Store)は、2019年にGojekとGoogleによって開始されたオープンソースFeature Storeプロジェクトで、現在最も活発なコミュニティを保有している。この記事では、Faastを実稼働環境で実際に運営する際に遭遇する設計決定、アーキテクチャ選択、障害対応、および学習サービスのスキューを構造的に防止する戦略まで包括的に取り上げます。単純なチュートリアルレベルではなく、数十のFeature Viewと数百万のエンティティを管理するチームが参考にできる実践運用ガイドを目指す。

Feature Store ニーズ

フィーチャ管理の現実的な問題

Feature StoreなしでMLシステムを運営すると、次のような問題が繰り返し発生します。

  1. フィーチャロジック冗長:データサイエンティストがJupyter Notebookで作成したフィーチャ変換コードとバックエンドエンジニアがサービングサーバに実装したコードが微妙に異なる。同じavg_purchase_amount_30dというフィーチャなのに学習ではNULLを0で満たし、サービングでは平均値で埋める。
  2. フィーチャー発見性不在: チームAが既に計算しているuser_click_rate_7dフィーチャーをチームBが知らないで再計算する。組織内のフィーチャー資産は管理されていません。
  3. タイムトラベル(Time Travel)不可:「2週間前の時点で、このユーザーのフィーチャ値はいくらでしたか?」に答えることはできません。再学習やデバッグが不可能になる。
  4. サービングレイテンシ: モデル推論時に複数のデータソースからリアルタイムでフィーチャを計算すると、p99レイテンシが数百ミリ秒まで上昇します。

Feature Storeが解決するもの

Feature Storeは、**フィーチャーの単一の真実源(Single Source of Truth)**として機能します。 1つのフィーチャ定義でオフライン学習データとオンラインサービングデータの両方が生成されるため、ロジックミスマッチが発生する可能性があります。中央レジストリを使用してフィーチャを検索して再利用し、Point-in-Time Joinを使用して過去の時点のフィーチャを正確に再現できます。

Feast アーキテクチャ

Feastのアーキテクチャは主にオフラインストアオンラインストアレジストリ、およびフィーチャサーバの4つのコアコンポーネントで構成されています。

オフラインストア (Offline Store)

オフラインストアは、大量のヒストリカルフィーチャデータを保存し、学習データ生成時にPoint-in-Time Joinを実行する役割を果たす。 BigQuery、Snowflake、Redshift、S3/Parquetなど、大規模な分析用のデータウェアハウスやデータレイクをバックエンドとして使用する。読み取り性能が重要であり、数十TBの規模のデータを効率的にスキャンできる必要があります。

オンラインストア (Online Store)

オンラインストアは、リアルタイムサービングのための低遅延キー値ストアです。 Redis、DynamoDB、Bigtableなどがバックエンドとして使用され、エンティティキーに基づいて最新のフィーチャ値をp99から10ms以内に返す必要があります。マテリアライゼーションプロセスによって、オフラインストアのデータがオンラインストアにコピーされます。

レジストリ(Registry)

レジストリは、エンティティ、Feature View、Feature Serviceなどのメタデータを保存する中央カタログです。ファイルベース(ローカル、S3、GCS)やSQLベース(PostgreSQL、MySQL)で運用できる。本番では、SQLベースのレジストリをお勧めします。複数のチームが同時にfeast apply実行時に衝突を防ぐことができるからだ。

フィーチャーサーバー (Feature Server)

Feast組み込みフィーチャーサーバーは、REST / gRPCエンドポイントを介してオンラインフィーチャーにサービスを提供します。 Goベースの高性能サーバーで、モデルサービングインフラストラクチャ(KServe、Seldonなど)からサイドカーや独立サービスとして展開することができる。

インストールと初期設定

Feastのインストールとプロジェクトの初期化

# Feast 설치 (Redis, PostgreSQL 백엔드 포함)
pip install "feast[redis,postgres]"

# 새 프로젝트 초기화
feast init recommendation_features
cd recommendation_features

# 프로젝트 구조 확인
# recommendation_features/
#   feature_repo/
#     __init__.py
#     feature_store.yaml    # Feast 프로젝트 설정
#     entities.py            # 엔티티 정의
#     feature_views.py       # Feature View 정의
#     feature_services.py    # Feature Service 정의
#     data/                  # 로컬 테스트용 데이터

本番 feature_store.yaml 設定

# feature_store.yaml - 프로덕션 환경 구성
project: recommendation_platform
provider: local
entity_key_serialization_version: 3

registry:
  registry_type: sql
  path: postgresql://feast_user:${FEAST_REGISTRY_PASSWORD}@pg-registry.internal:5432/feast_registry
  cache_ttl_seconds: 60
  sqlalchemy_config_kwargs:
    pool_size: 10
    max_overflow: 20
    pool_pre_ping: true

offline_store:
  type: bigquery
  project_id: ml-platform-prod
  dataset: feast_offline_store
  location: asia-northeast3

online_store:
  type: redis
  connection_string: redis://:${REDIS_PASSWORD}@redis-feast.internal:6379
  key_ttl_seconds: 172800 # 48시간 TTL
  redis_type: redis_cluster

flags:
  alpha_features: true
  on_demand_transforms: true

この設定の注意点はcache_ttl_secondsだ。レジストリキャッシュTTLを長すぎるとfeast apply後の変更はすぐに反映されず、サービング障害が発生する可能性があります。逆に短すぎると、レジストリDBに負荷が集中します。プロダクションでは30~120秒が適切だ。

フィーチャ定義とエンティティ管理

エンティティの定義

エンティティは特徴の主体を表す。ユーザー、アイテム、注文など、ビジネスドメインのコアオブジェクトをエンティティとして定義する。

# entities.py
from feast import Entity, ValueType

# 유저 엔티티
user = Entity(
    name="user_id",
    description="서비스 유저의 고유 식별자. UUID v4 형식.",
    value_type=ValueType.STRING,
    tags={
        "owner": "ml-platform-team",
        "pii": "true",
        "domain": "user",
    },
)

# 아이템(상품) 엔티티
item = Entity(
    name="item_id",
    description="상품 카탈로그의 고유 식별자. 정수형 ID.",
    value_type=ValueType.INT64,
    tags={
        "owner": "recommendation-team",
        "domain": "catalog",
    },
)

# 유저-아이템 상호작용 엔티티 (복합 키)
user_item = Entity(
    name="user_item_id",
    description="유저-아이템 상호작용 복합 키. 'user_id:item_id' 형식.",
    value_type=ValueType.STRING,
    tags={
        "owner": "recommendation-team",
        "domain": "interaction",
    },
)

エンティティを設計する際の重要な原則は、1つのエンティティが1つのビジネスエンティティを代表することです。複合キーが必要な場合は別々のエンティティを定義しますが、結合性能を考慮してキーカーディナリティが過度に高くならないように注意してください。

Feature ViewとFeature Service

Feature Viewの定義

Feature Viewは、1つのデータソースから派生した関連フィーチャの論理グループです。オフライン/オンラインストアの両方で一貫した方法でフィーチャーを提供するコア抽象化です。

# feature_views.py
from datetime import timedelta
from feast import (
    FeatureView,
    Field,
    BigQuerySource,
    PushSource,
    StreamSource,
)
from feast.types import Float32, Float64, Int64, String, UnixTimestamp

# 유저 구매 통계 Feature View
user_purchase_source = BigQuerySource(
    name="user_purchase_stats_source",
    table="ml-platform-prod.feast_offline_store.user_purchase_stats",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_at",
    description="유저별 구매 통계 집계 테이블. 매시간 dbt로 갱신.",
)

user_purchase_stats = FeatureView(
    name="user_purchase_stats",
    entities=[user],
    ttl=timedelta(days=7),
    schema=[
        Field(name="total_purchases_7d", dtype=Int64, description="최근 7일 총 구매 건수"),
        Field(name="total_purchases_30d", dtype=Int64, description="최근 30일 총 구매 건수"),
        Field(name="avg_order_value_7d", dtype=Float64, description="최근 7일 평균 주문 금액"),
        Field(name="avg_order_value_30d", dtype=Float64, description="최근 30일 평균 주문 금액"),
        Field(name="max_order_value_30d", dtype=Float64, description="최근 30일 최대 주문 금액"),
        Field(name="purchase_frequency_score", dtype=Float32, description="구매 빈도 정규화 점수 (0~1)"),
        Field(name="last_purchase_days_ago", dtype=Int64, description="마지막 구매 후 경과 일수"),
    ],
    source=user_purchase_source,
    online=True,
    tags={
        "team": "recommendation",
        "sla_freshness_minutes": "60",
        "data_quality_owner": "data-eng@company.com",
    },
)

# 유저 세션 피처 (실시간 Push 소스 포함)
user_session_push_source = PushSource(
    name="user_session_push",
    batch_source=BigQuerySource(
        name="user_session_batch_source",
        table="ml-platform-prod.feast_offline_store.user_session_features",
        timestamp_field="event_timestamp",
    ),
)

user_session_features = FeatureView(
    name="user_session_features",
    entities=[user],
    ttl=timedelta(hours=24),
    schema=[
        Field(name="session_count_24h", dtype=Int64, description="최근 24시간 세션 수"),
        Field(name="avg_session_duration_min", dtype=Float32, description="평균 세션 시간(분)"),
        Field(name="pages_viewed_1h", dtype=Int64, description="최근 1시간 조회 페이지 수"),
        Field(name="last_active_minutes_ago", dtype=Int64, description="마지막 활동 후 경과 분"),
        Field(name="is_currently_active", dtype=Int64, description="현재 활성 세션 여부 (0/1)"),
    ],
    source=user_session_push_source,
    online=True,
    tags={
        "team": "recommendation",
        "sla_freshness_minutes": "5",
        "realtime": "true",
    },
)

On-Demand Feature View

学習とサービングの両方で同じ変換ロジックを適用する必要がある場合は、On-Demand Feature Viewを使用してください。これが学習サービスのスキューを防ぐための重要なメカニズムの1つです。

# on_demand_features.py
from feast import on_demand_feature_view, Field
from feast.types import Float32, Int64
import pandas as pd

@on_demand_feature_view(
    sources=[user_purchase_stats, user_session_features],
    schema=[
        Field(name="engagement_score", dtype=Float32),
        Field(name="purchase_recency_bucket", dtype=Int64),
        Field(name="activity_intensity", dtype=Float32),
    ],
    description="구매와 세션 피처를 결합하여 계산하는 파생 피처",
)
def user_engagement_features(inputs: pd.DataFrame) -> pd.DataFrame:
    """
    학습과 서빙에서 동일하게 실행되는 변환 로직.
    이 함수 안에서 정의된 로직은 get_historical_features()와
    get_online_features() 양쪽에서 동일하게 적용된다.
    """
    df = pd.DataFrame()

    # 참여도 점수: 구매 빈도와 세션 활동을 결합
    purchase_norm = inputs["purchase_frequency_score"].fillna(0.0)
    session_norm = (
        inputs["session_count_24h"].fillna(0).clip(upper=50) / 50.0
    )
    df["engagement_score"] = (0.6 * purchase_norm + 0.4 * session_norm).astype("float32")

    # 구매 최신성 버킷: 0(오늘), 1(1~3일), 2(4~7일), 3(8~14일), 4(15일+)
    days = inputs["last_purchase_days_ago"].fillna(999)
    df["purchase_recency_bucket"] = pd.cut(
        days,
        bins=[-1, 0, 3, 7, 14, float("inf")],
        labels=[0, 1, 2, 3, 4],
    ).astype("int64")

    # 활동 강도: 최근 1시간 페이지 뷰 기반
    df["activity_intensity"] = (
        inputs["pages_viewed_1h"].fillna(0).clip(upper=100) / 100.0
    ).astype("float32")

    return df

Feature Service 定義

Feature Serviceは、特定のモデルが使用するフィーチャセットを1つにまとめてバージョン管理する単位です。モデルとFeature Serviceを1:1にマッピングすると、どのモデルがどのフィーチャに依存するかが明らかになります。

# feature_services.py
from feast import FeatureService

# 추천 모델 v3용 Feature Service
recommendation_v3_service = FeatureService(
    name="recommendation_v3",
    features=[
        user_purchase_stats[["total_purchases_7d", "avg_order_value_30d", "purchase_frequency_score"]],
        user_session_features[["session_count_24h", "last_active_minutes_ago"]],
        user_engagement_features,  # On-Demand Feature View 전체 포함
    ],
    description="추천 모델 v3이 사용하는 피처 세트. 모델 버전 변경 시 새 FeatureService를 생성할 것.",
    tags={
        "model_version": "v3.2.1",
        "model_owner": "rec-team",
        "last_validated": "2026-03-07",
    },
)

# 이탈 예측 모델용 Feature Service
churn_prediction_service = FeatureService(
    name="churn_prediction_v1",
    features=[
        user_purchase_stats,  # 전체 피처 사용
        user_session_features[["session_count_24h", "avg_session_duration_min"]],
    ],
    description="이탈 예측 모델 v1용 피처 세트",
    tags={
        "model_version": "v1.0.0",
        "model_owner": "growth-team",
    },
)

オフライン学習データの生成

学習データを生成するときは、必ずPoint-in-Time Joinを使用する必要があります。これは、予測時点基準として、その時点で実際に知られたフィーチャ値のみを使用するという原則である。この原則を破るとFeature Leakageが発生し、モデルが学習環境では良い性能を見せるが、実際のサービングでは性能が大きく低下する。

# training_data_generator.py
from feast import FeatureStore
from datetime import datetime, timedelta
import pandas as pd

store = FeatureStore(repo_path="./feature_repo")

# 1. 엔티티 데이터프레임 준비: 예측 대상 + 예측 시점
# 각 row가 "이 유저에 대해 이 시점에 예측해야 했다"는 의미
entity_df = pd.DataFrame({
    "user_id": ["u_1001", "u_1002", "u_1003", "u_1004", "u_1005"],
    "event_timestamp": [
        datetime(2026, 2, 15, 10, 0, 0),
        datetime(2026, 2, 20, 14, 30, 0),
        datetime(2026, 2, 25, 9, 0, 0),
        datetime(2026, 3, 1, 11, 15, 0),
        datetime(2026, 3, 5, 16, 45, 0),
    ],
})

# 2. Feature Service를 통해 학습 데이터 조회
# Point-in-Time Join이 자동으로 적용됨
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=store.get_feature_service("recommendation_v3"),
).to_df()

print(f"학습 데이터 shape: {training_df.shape}")
print(f"컬럼: {list(training_df.columns)}")

# 3. 대규모 학습 데이터 생성 시 BigQuery에서 직접 실행
# 수백만 건의 엔티티 데이터는 BigQuery로 직접 읽어 효율적으로 처리
large_entity_df = pd.read_gbq(
    """
    SELECT
        user_id,
        prediction_timestamp AS event_timestamp
    FROM `ml-platform-prod.labels.recommendation_labels`
    WHERE prediction_timestamp BETWEEN '2026-01-01' AND '2026-03-01'
    """,
    project_id="ml-platform-prod",
)

training_df_large = store.get_historical_features(
    entity_df=large_entity_df,
    features=store.get_feature_service("recommendation_v3"),
).to_df()

# 4. 학습 데이터 품질 검증
assert training_df_large.isnull().sum().sum() / training_df_large.size < 0.05, \
    "NULL 비율이 5%를 초과합니다. 데이터 소스를 점검하세요."

training_df_large.to_parquet(
    "gs://ml-datasets/recommendation/training_2026_q1.parquet",
    index=False,
)

オンラインサービングパイプライン

Feast Feature Serverの展開

# online_serving.py
"""
모델 서빙 시 Feast에서 온라인 피처를 조회하는 클라이언트 코드.
KServe Transformer 또는 서빙 애플리케이션에서 사용한다.
"""
from feast import FeatureStore
from typing import Dict, List, Any
import time
import logging

logger = logging.getLogger(__name__)

class FeatureClient:
    """
    프로덕션용 Feast 온라인 피처 클라이언트.
    타임아웃, fallback, 메트릭 수집을 포함한다.
    """

    def __init__(
        self,
        repo_path: str = "./feature_repo",
        timeout_ms: int = 50,
        fallback_enabled: bool = True,
    ):
        self.store = FeatureStore(repo_path=repo_path)
        self.timeout_ms = timeout_ms
        self.fallback_enabled = fallback_enabled
        self._default_values = self._load_default_values()

    def _load_default_values(self) -> Dict[str, Any]:
        """Feature View별 기본값을 사전에 로드한다."""
        return {
            "total_purchases_7d": 0,
            "avg_order_value_30d": 0.0,
            "purchase_frequency_score": 0.0,
            "session_count_24h": 0,
            "last_active_minutes_ago": 1440,
            "engagement_score": 0.0,
            "purchase_recency_bucket": 4,
            "activity_intensity": 0.0,
        }

    def get_features_for_prediction(
        self, user_ids: List[str]
    ) -> Dict[str, List[Any]]:
        """
        추천 모델 추론을 위한 피처 조회.
        타임아웃 시 fallback 기본값을 반환한다.
        """
        entity_rows = [{"user_id": uid} for uid in user_ids]

        start_time = time.monotonic()
        try:
            result = self.store.get_online_features(
                features=self.store.get_feature_service("recommendation_v3"),
                entity_rows=entity_rows,
            ).to_dict()

            elapsed_ms = (time.monotonic() - start_time) * 1000
            logger.info(
                f"Feature retrieval: {len(user_ids)} entities, {elapsed_ms:.1f}ms"
            )

            if elapsed_ms > self.timeout_ms:
                logger.warning(
                    f"Feature retrieval exceeded SLA: {elapsed_ms:.1f}ms > {self.timeout_ms}ms"
                )

            return result

        except Exception as e:
            elapsed_ms = (time.monotonic() - start_time) * 1000
            logger.error(
                f"Feature retrieval failed after {elapsed_ms:.1f}ms: {e}"
            )

            if self.fallback_enabled:
                return self._build_fallback_response(user_ids)
            raise

    def _build_fallback_response(
        self, user_ids: List[str]
    ) -> Dict[str, List[Any]]:
        """피처 조회 실패 시 기본값으로 fallback 응답을 생성한다."""
        response = {"user_id": user_ids}
        for feat_name, default_val in self._default_values.items():
            response[feat_name] = [default_val] * len(user_ids)
        logger.warning(
            f"Returning fallback features for {len(user_ids)} entities"
        )
        return response


# 사용 예시
client = FeatureClient(repo_path="./feature_repo")
features = client.get_features_for_prediction(["u_1001", "u_1002"])

Materialization戦略

マテリアライゼーションは、オフラインストアのデータをオンラインストアにコピーするプロセスです。バッチサービングとリアルタイムサービングの要件によって戦略が異なります。

配置 Materialization

# 전체 Feature View를 특정 시간 범위로 materialize
feast materialize 2026-03-06T00:00:00 2026-03-07T00:00:00

# 점진적 materialization (마지막 실행 이후 ~ 현재)
feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")

プログラムマテリアライゼーション(Airflow DAG)

# airflow_materialization_dag.py
"""
Airflow DAG: 1시간 간격으로 Feast Feature View를 materialize한다.
Feature View별로 독립적인 태스크로 실행하여 병렬 처리하고,
실패 시 개별 재시도가 가능하도록 구성한다.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from feast import FeatureStore

FEATURE_VIEWS_TO_MATERIALIZE = [
    "user_purchase_stats",
    "user_session_features",
    "item_popularity_stats",
]

def materialize_feature_view(feature_view_name: str, **context):
    """개별 Feature View를 materialize한다."""
    store = FeatureStore(repo_path="/opt/feast/feature_repo")

    end_date = context["data_interval_end"]
    start_date = end_date - timedelta(hours=2)  # 2시간 lookback (안전 마진)

    store.materialize(
        start_date=start_date,
        end_date=end_date,
        feature_views=[feature_view_name],
    )

    # 성공 메트릭 기록
    row_count = _verify_materialization(store, feature_view_name, end_date)
    context["ti"].xcom_push(
        key=f"{feature_view_name}_materialized_rows",
        value=row_count,
    )

def _verify_materialization(store, fv_name, end_date):
    """Materialization 결과를 검증한다."""
    # 샘플 엔티티로 온라인 스토어 조회 테스트
    sample_result = store.get_online_features(
        features=[f"{fv_name}:total_purchases_7d"],
        entity_rows=[{"user_id": "u_health_check"}],
    ).to_dict()
    return len(sample_result.get("user_id", []))

with DAG(
    dag_id="feast_materialization",
    schedule_interval="0 * * * *",  # 매시간
    start_date=datetime(2026, 3, 1),
    catchup=False,
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=5),
        "execution_timeout": timedelta(minutes=30),
    },
    tags=["feast", "feature-store", "materialization"],
) as dag:

    for fv_name in FEATURE_VIEWS_TO_MATERIALIZE:
        PythonOperator(
            task_id=f"materialize_{fv_name}",
            python_callable=materialize_feature_view,
            op_args=[fv_name],
        )

マテリアライゼーションサイクル決定ガイド

フィーチャータイプ変更頻度推奨マテリアライゼーションサイクル
静的プロファイルほとんど変更なし1日1回ユーザー登録日、性別、地域
毎日の集計1日1回更新日1〜2回30日間の平均購入額、週間訪問数

|時間別集計時間単位の更新毎時間過去24時間のセッション数 |リアルタイムフィーチャー秒単位の更新Pushベース/ストリーミング|現在のセッションがアクティブかどうか、最近クリック

##ラーニング - サービングスキューの防止

Training-Serving Skewは、MLシステムで最も巧妙で発見するのが難しいバグです。モデルが学習時に見たフィーチャ分布とサービング時に受けるフィーチャ分布が異なり、性能が低下する現象だ。 Feastを使用すると自動的に解決されるのではなく、意識的な設計と検証が必要です。

スキュー発生の主な原因

  1. 変換ロジック二重実装: 学習用SQLとサービング用Pythonコードが分離して微妙に異なる。
  2. Materialization 遅延: オンラインストアに古い特徴が残っている。
  3. NULL 処理の不一致: オフラインでは NULL を削除し、オンラインでは 0 で埋める。
  4. タイムゾーン差: オフラインはUTC、オンラインはKSTで計算し、「最近7日」の範囲が変わる。
  5. Feature Leakage: 学習時に未来データが組み込まれる。

構造的防止戦略

# skew_prevention_test.py
"""
학습-서빙 스큐를 자동으로 탐지하는 CI 테스트.
정기적으로 실행하여 온라인-오프라인 값의 일관성을 검증한다.
"""
import pytest
import pandas as pd
import numpy as np
from datetime import datetime
from feast import FeatureStore

@pytest.fixture(scope="module")
def store():
    return FeatureStore(repo_path="./feature_repo")

@pytest.fixture(scope="module")
def sample_entity_ids():
    """검증용 샘플 엔티티. 프로덕션 엔티티 중 랜덤 샘플링."""
    return ["u_1001", "u_1002", "u_1003", "u_1005", "u_1010"]

FEATURE_LIST = [
    "user_purchase_stats:total_purchases_7d",
    "user_purchase_stats:avg_order_value_30d",
    "user_purchase_stats:purchase_frequency_score",
    "user_session_features:session_count_24h",
]

def test_online_offline_consistency(store, sample_entity_ids):
    """
    온라인 스토어 값과 오프라인 스토어 최신 값이 일치하는지 검증.
    Materialization이 정상적으로 동작하면 두 값은 동일해야 한다.
    """
    # 온라인 조회
    online_result = store.get_online_features(
        features=FEATURE_LIST,
        entity_rows=[{"user_id": eid} for eid in sample_entity_ids],
    ).to_dict()

    # 오프라인 조회 (현재 시점 기준)
    entity_df = pd.DataFrame({
        "user_id": sample_entity_ids,
        "event_timestamp": [datetime.utcnow()] * len(sample_entity_ids),
    })

    offline_result = store.get_historical_features(
        entity_df=entity_df,
        features=FEATURE_LIST,
    ).to_df()

    # 각 피처별 온라인-오프라인 값 비교
    for feat in FEATURE_LIST:
        feat_short = feat.split(":")[1]
        for i, eid in enumerate(sample_entity_ids):
            online_val = online_result[feat_short][i]
            offline_val = offline_result.loc[
                offline_result["user_id"] == eid, feat_short
            ].iloc[0]

            if online_val is None and pd.isna(offline_val):
                continue  # 둘 다 NULL이면 일치

            if isinstance(online_val, float):
                assert np.isclose(online_val, offline_val, rtol=1e-5), (
                    f"Skew detected for {eid}.{feat_short}: "
                    f"online={online_val}, offline={offline_val}"
                )
            else:
                assert online_val == offline_val, (
                    f"Skew detected for {eid}.{feat_short}: "
                    f"online={online_val}, offline={offline_val}"
                )

def test_null_handling_consistency(store, sample_entity_ids):
    """
    NULL 처리가 온라인과 오프라인에서 동일한지 검증.
    존재하지 않는 엔티티를 조회했을 때 양쪽의 동작이 같아야 한다.
    """
    ghost_entities = ["u_nonexistent_001", "u_nonexistent_002"]

    online_result = store.get_online_features(
        features=FEATURE_LIST,
        entity_rows=[{"user_id": eid} for eid in ghost_entities],
    ).to_dict()

    for feat in FEATURE_LIST:
        feat_short = feat.split(":")[1]
        for val in online_result[feat_short]:
            assert val is None, (
                f"Non-existent entity should return None, got {val} for {feat_short}"
            )

スキュー検出ダッシュボード指標

指標正常範囲通知しきい値説明
Online-Offline Parity Rate100%99.5%未満オンライン/オフライン値の一致率
Feature Distribution Drift(KL Divergence)0.01未満0.05超学習時点に対するサービス提供時点の特徴分布の変化
NULL Rate Difference0%1%超オンライン/オフラインNULL比の違い
Materialization Lag p9560分未満120分超マテリアライゼーションが完了するまでの時間
Stale Feature Rate0%5%超TTLオーバーフィーチャー比

ストリーミング機能の統合

配置マテリアライゼーションだけではリアルタイムフィーチャ(現在のセッション状態、直前クリックなど)をサービングできない。 FeastのPush APIとKafkaストリーミングを組み合わせて、リアルタイムフィーチャーをオンラインストアに反映します。

# streaming_feature_ingest.py
"""
Kafka에서 유저 세션 이벤트를 소비하여 Feast Online Store에
실시간으로 반영하는 스트리밍 워커.
Faust(Python stream processing) 또는 Kafka Consumer로 구현한다.
"""
import json
from datetime import datetime
from confluent_kafka import Consumer, KafkaError
from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path="./feature_repo")

consumer = Consumer({
    "bootstrap.servers": "kafka-broker:9092",
    "group.id": "feast-session-materializer",
    "auto.offset.reset": "latest",
    "enable.auto.commit": False,
    "max.poll.interval.ms": 300000,
    "session.timeout.ms": 45000,
})
consumer.subscribe(["user-session-events"])

MICRO_BATCH_SIZE = 200
FLUSH_INTERVAL_SEC = 5
buffer = []
last_flush = datetime.utcnow()

try:
    while True:
        msg = consumer.poll(timeout=1.0)

        if msg is not None and not msg.error():
            event = json.loads(msg.value().decode("utf-8"))
            buffer.append({
                "user_id": event["user_id"],
                "session_count_24h": event["session_count_24h"],
                "avg_session_duration_min": event["avg_session_duration_min"],
                "pages_viewed_1h": event["pages_viewed_1h"],
                "last_active_minutes_ago": 0,  # 방금 활동했으므로 0
                "is_currently_active": 1,
                "event_timestamp": datetime.utcnow(),
            })
        elif msg is not None and msg.error():
            if msg.error().code() != KafkaError._PARTITION_EOF:
                raise Exception(f"Kafka error: {msg.error()}")

        elapsed = (datetime.utcnow() - last_flush).total_seconds()
        should_flush = (
            len(buffer) >= MICRO_BATCH_SIZE
            or (buffer and elapsed >= FLUSH_INTERVAL_SEC)
        )

        if should_flush:
            df = pd.DataFrame(buffer)
            # PushSource를 통해 온라인 + 오프라인 스토어 동시 기록
            store.push(
                push_source_name="user_session_push",
                df=df,
                to=store.PushMode.ONLINE_AND_OFFLINE,
            )
            consumer.commit()
            buffer.clear()
            last_flush = datetime.utcnow()

finally:
    consumer.close()

Feature Storeの比較

アイテムFeastTectonHopsworksDatabricks Feature Store
ライセンスApache 2.0(オープンソース)商用(SaaS / Self-hosted)AGPL +商用Databricksプラットフォームに依存
配布方法Self-managedManaged SaaS自己管理/管理Databricks Workspace内蔵
オフラインストアBigQuery、Snowflake、Redshift、S3 Parquet、Fileスパーク、スノーフレーク、リフトHopsFS、S3デルタ湖(ユニティカタログ)
オンラインストアRedis、DynamoDB、Bigtable、PostgreSQL、SQLite DynamoDB(組み込み最適化)RonDB(MySQL NDB Cluster)Cosmos DB、DynamoDB
ストリーミングサポートPush API、Kafka(手動実装)ネイティブスパークストリーミングネイティブKafka / Sparkスパーク構造ストリーミング
変換エンジンOn-Demand(Python)、dbt連携Tecton SDK(Spark / Rift)Hopsworks Feature PipelineSpark UDF
Feature RegistrySQL / Fileベース内蔵(豊富なUI)組み込み(Hopsworks UI)Unity Catalogの統合
学習 - サービングスキュー防止オンデマンドFV、ポイントインタイムジョインTime-Travel、マテリアライゼーション保証Provenance追跡タイムトラベル(デルタ湖)
コミュニティ活動度非常に高い(GitHub 5.5k + stars)中(商用中心)高(学術/オープンソース)高(Databricksエコシステム)
適切なチームクラウド非依存、カスタマイジングニーズチームエンタープライズ、完全管理優先チームオンプレム、GPU学習中心チームすでにDatabricksを使用しているチーム

モニタリング

###コアモニタリング指標

プロダクションFeature Storeを運営する際に必ず追跡しなければならない指標がある。

  1. Freshness: オンラインストアのフィーチャーがどれほど最新か。현재시각 - 피처의 event_timestampで測定する。
  2. Serving Latency: オンライン機能照会の p50/p95/p99 待ち時間。 SLAは通常p99 < 50ms。
  3. Materialization Success Rate: Materialization ジョブの成功/失敗率。
  4. Feature Staleness Rate: TTLを超えたフィーチャーの割合。
  5. Online Store Hit Rate: 照会要求中に実際の値が返された割合 (NULL 以外の応答率)。
# feast_metrics_exporter.py
"""
Feast 상태를 Prometheus 메트릭으로 내보내는 exporter.
Kubernetes CronJob으로 5분 간격 실행을 권장한다.
"""
from prometheus_client import Gauge, Histogram, start_http_server
from feast import FeatureStore
import time

# Prometheus 메트릭 정의
FRESHNESS_GAUGE = Gauge(
    "feast_feature_view_freshness_seconds",
    "Feature View의 온라인 스토어 freshness (초)",
    ["feature_view", "project"],
)

MATERIALIZATION_LAG = Gauge(
    "feast_materialization_lag_seconds",
    "마지막 Materialization 이후 경과 시간 (초)",
    ["feature_view"],
)

ONLINE_STORE_LATENCY = Histogram(
    "feast_online_store_latency_seconds",
    "온라인 스토어 조회 레이턴시",
    ["feature_view"],
    buckets=[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5],
)

def collect_metrics():
    store = FeatureStore(repo_path="./feature_repo")
    feature_views = store.list_feature_views()

    for fv in feature_views:
        # Freshness 측정
        try:
            start = time.monotonic()
            result = store.get_online_features(
                features=[f"{fv.name}:{fv.features[0].name}"],
                entity_rows=[{"user_id": "u_health_check"}],
            ).to_dict()
            elapsed = time.monotonic() - start

            ONLINE_STORE_LATENCY.labels(feature_view=fv.name).observe(elapsed)
        except Exception:
            pass

if __name__ == "__main__":
    start_http_server(8000)
    while True:
        collect_metrics()
        time.sleep(300)  # 5분 간격

トラブルシューティング

問題 1: feast apply 後にオンライン サービングでフィーチャが見つかりません

증상: feast apply 성공 후 get_online_features()에서 KeyError 발생
에러: KeyError: "Feature view 'user_purchase_stats_v2' not found in registry"
원인: 레지스트리 캐시 TTL 내에 오래된 메타데이터를 참조

해결:
  1. 레지스트리 캐시를 즉시 갱신:
     store = FeatureStore(repo_path="./feature_repo")
     store.refresh_registry()
  2. 배포 프로세스에 registry refresh를 포함
  3. cache_ttl_seconds를 60초 이하로 설정

問題2:マテリアライゼーション中のメモリ不足

증상: feast materialize 실행 중 OOMKilled
에러: Container killed due to OOM (exit code 137)
원인: 대규모 Feature View를 한 번에 materialize할 때 전체 데이터를
      메모리에 로드하려 함

해결:
  1. 시간 범위를 좁혀서 분할 실행:
     feast materialize 2026-03-06T00:00:00 2026-03-06T06:00:00
     feast materialize 2026-03-06T06:00:00 2026-03-06T12:00:00
  2. Feature View별로 개별 materialize
  3. Kubernetes Job의 메모리 limit 증가 (최소 4Gi 권장)
  4. 배치 크기를 줄이는 환경변수 설정:
     FEAST_BATCH_MATERIALIZATION_MAX_WORKERS=2

問題3:Redisオンラインストア接続タイムアウトが急増

증상: 서빙 레이턴시 p99가 200ms 이상으로 급증
에러: redis.exceptions.TimeoutError: Timeout reading from socket
원인: Redis 클러스터 리밸런싱 또는 단일 노드 과부하

해결:
  1. Redis 클러스터 노드별 메모리/CPU 확인
  2. 핫 키 분석: MONITOR 또는 redis-cli --hotkeys로 확인
  3. connection_string에 timeout 파라미터 추가:
     redis://host:6379?socket_timeout=0.1&socket_connect_timeout=0.1
  4. 서빙 코드에 circuit breaker + fallback 패턴 적용

操作時の注意事項

TTLデザイン

Feature Viewのttl値は、「この機能がどれだけ古いものまで有効か」を定義します。 TTLを短すぎると、マテリアライゼーションサイクルと一致しないため、オンライン照会時にNULLが返されます。 TTLは必ずマテリアライゼーションサイクルの2倍以上に設定してください。たとえば、毎時間マテリアライズすると、TTLは少なくとも2時間以上でなければなりません。

エンティティキーの設計

エンティティキーのカーディナリティがオンラインストアのサイズを決定します。ユーザー数が1億人でFeature Viewが10個の場合、オンラインストアには10億個のキーと値のペアが格納される。 Redisベースでキーあたり約500バイトを占めると仮定すると、約500GBのメモリが必要です。非アクティブユーザーをTTLで自動的に期限切れにするか、最近N日以内に活動したユーザーだけをmaterializeする戦略が必要である。

feast applyと配布順序

Feature Viewスキーマを変更するときは、下位互換性を維持する必要があります。新しいフィーチャーを追加するのは安全ですが、既存のフィーチャーを削除したりタイプを変更したりすると、依存モデルはすぐに障害を引き起こします。配布順序は次のとおりです。

  1. 新しいフィーチャーを追加した Feature View 定義の作成 2.feast applyでレジストリを更新
  2. Materializationを実行してオンラインストアに新しいフィーチャーをロードする
  3. 新しいフィーチャーを使用したモデルのデプロイ
  4. 以前の機能が使用されなくなった場合は、次のリリースから削除

失敗事例と回復

ケース 1: Materialization 障害で 48 時間 stale フィーチャーをサービス

상황: Airflow DAG 장애로 Materialization이 48시간 중단됨.
       동안 온라인 스토어에는 이틀 전 피처가 서빙됨.
영향: 추천 모델의 CTR15% 하락.
      "최근 24시간 세션 수"48시간 전 값이었기 때문.

복구 절차:
  1. Airflow DAG 원인 분석 및 수정 (BigQuery 인증 토큰 만료)
  2. 수동 backfill materialization 실행:
     feast materialize 2026-03-05T00:00:00 2026-03-07T12:00:00
  3. 온라인 스토어 freshness 확인 후 알림 해제
  4. 사후 조치: Materialization 실패 시 30분 내 PagerDuty 알림 설정

교훈: Materialization 상태를 별도로 모니터링하지 않으면
      모델 성능 저하로만 발견되어 대응이 늦어진다.

ケース 2: Feature View スキーマ変更で完全なサービングダウン

상황: "avg_order_value_30d" 피처의 타입을 Float64에서 Int64로 변경하고
      feast apply를 실행한 직후, 서빙 중인 추천 모델에서 타입 에러 발생.
영향: 5분간 추천 API 500 에러. 긴급 롤백 수행.

복구 절차:
  1. git revert로 Feature View 정의를 이전 버전으로 복원
  2. feast apply 재실행
  3. 서빙 정상화 확인

교훈: Feature View 스키마 변경은 반드시 staging 환경에서 먼저 테스트하고,
      의존 모델의 호환성 테스트를 통과한 후에만 프로덕션에 적용해야 한다.

ケース3:プッシュベースのストリーミング機能の重複イベントの処理

상황: Kafka consumer의 at-least-once 보장으로 인해 동일 이벤트가
      2회 처리되어 session_count_24h가 실제보다 높게 기록됨.
영향: 일부 유저의 engagement_score가 비정상적으로 높아져
      추천 결과 왜곡 발생.

복구 절차:
  1. 중복 이벤트 탐지 로직 추가 (이벤트 ID 기반 dedup)
  2. 영향받은 엔티티 식별 후 재 materialize
  3. Kafka consumer에 idempotent 처리 추가

교훈: 스트리밍 피처 파이프라인에는 반드시 멱등성(idempotency) 보장이
      필요하다. 이벤트 IDRedis Set으로 관리하여 중복을 제거한다.

チェックリスト

###初期構築チェックリスト

  • 本番feature_store.yaml設定完了(SQLレジストリ、Redisオンラインストア)
  • [] エンティティ定義と命名規則の確立
  • Feature View定義とTTL設定
  • Feature Serviceとモデルバージョンのマッピング
  • [ ]feast applyをCI/CDパイプラインに統合
  • Materialization DAGの構築とスケジューリング
  • [ ]オンラインストア容量の見積もりとプロビジョニング

日常業務チェックリスト

  • マテリアライゼーションが成功したかどうかを毎日チェックする
  • [ ]オンラインストアfreshness SLA準拠を確認
  • Feature View変更時の下位互換性検証
  • [ ]オンライン - オフラインパリティテスト週間実行
  • [ ]オンラインストアメモリ使用量の傾向を監視する
  • [ ]フィーチャーサービングレイテンシp99追跡

Feature View 変更配布チェックリスト

  • staging環境でfeast applyとマテリアライゼーションテスト
  • [ ]スキーマ互換性テスト(依存モデル全体)
  • Point-in-Time Leakage 検証パス
  • Online-Offline Parityテストに合格
  • 本番feast apply実行
  • Materialization 正常完了確認
  • サービングレイテンシおよびエラー率10分間観察
  • ロールバック手順文書化およびテスト完了

##参考資料