Skip to content
Published on

Feature Store設計と運用ガイド: Feast基盤Online/Offline Store構築·MLフィーチャーパイプライン自動化

Authors
  • Name
    Twitter
Feature Store with Feast

はじめに

機械学習モデルのプロダクションデプロイが一般化するにつれ、フィーチャー(Feature)管理がMLOpsの核心課題として浮上しています。モデル学習に使用したフィーチャーをリアルタイムサービングでも同一に再現する必要があり、複数のチームが同一のフィーチャーを重複計算しないように共有する必要があり、フィーチャーの品質と鮮度を継続的にモニタリングする必要があります。

Feature Storeはこうした問題を解決するために登場したインフラレイヤーで、フィーチャーの定義、保存、サービング、モニタリングを中央で管理します。その中でFeast(Feature Store)は最も広く使用されているオープンソースFeature Storeで、既存のデータインフラを再利用しながら柔軟なフィーチャーサービングを提供します。

この記事では、Feature Storeの核心概念、Feastアーキテクチャ、フィーチャー定義とEntity設計、Materializationパイプライン、Online/Offline Store設定、Training-Serving Skew防止戦略、Feature Monitoring、Tecton/Hopsworksとの比較、プロダクションデプロイパターン、障害対応まで全過程を解説します。

Feature Store核心概念

なぜFeature Storeが必要なのか

Feature Storeなしでパイプラインを運用すると以下の問題が発生します。

問題説明影響
Training-Serving Skew学習とサービングでフィーチャー計算ロジックが異なるモデル性能低下
フィーチャー重複計算チームごとに同一フィーチャーを各自実装コンピューティングリソースの浪費
データ漏洩未来のデータが学習に含まれる過学習、誤った評価
フィーチャー発見の困難どのフィーチャーが存在するか分からない開発生産性低下
サービングレイテンシリアルタイムでフィーチャーを計算すると遅延が発生ユーザー体験の悪化

Online Store vs Offline Store

Feature Storeは2つのストレージを基本として持ちます。

項目Online StoreOffline Store
用途リアルタイム推論モデル学習、バッチ推論
レイテンシ1〜10ms秒〜分
データ範囲最新値のみ全履歴
ストレージ例Redis, DynamoDBBigQuery, Redshift, S3
クエリパターンKey-Value検索SQL/DataFrame検索
データ量GBレベルTB〜PBレベル
一貫性結果整合性強い一貫性

Feature FreshnessとConsistency

フィーチャーの鮮度(Freshness)は、どれほど最新のデータを反映しているかを示します。

  • バッチフィーチャー: 1時間〜1日単位で更新(例: ユーザーの直近30日間の購入回数)
  • ストリーミングフィーチャー: 秒〜分単位で更新(例: 直近5分間の取引金額)
  • リアルタイムフィーチャー: リクエスト時に計算(例: 現在のセッションのクリック数)

Feastアーキテクチャ

全体構造

Feastは以下のコンポーネントで構成されています。

# feature_store.yaml - Feast project configuration
project: fraud_detection
registry: gs://ml-feature-store/registry.db
provider: gcp

offline_store:
  type: bigquery
  dataset: feature_store

online_store:
  type: redis
  connection_string: 'redis-cluster.internal:6379'
  redis_type: redis_cluster

entity_key_serialization_version: 2
コンポーネント役割説明
Feature Registryフィーチャーメタデータ管理フィーチャー定義、エンティティ、データソース情報を保存
Offline Store履歴データ保存BigQuery、Redshift、Sparkなどから学習データを抽出
Online Store最新フィーチャーサービングRedis、DynamoDBなどからリアルタイム検索
Feature ServerREST APIサービングFastAPIベースの低レイテンシフィーチャーサービングエンドポイント
Materialization Engineデータ同期Offline StoreからOnline Storeへフィーチャーをコピー

Feature定義とEntity設計

# features/fraud_detection.py
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource, BigQuerySource
from feast.types import Float32, Int64, String

# Entity definition - the target that features are linked to
user_entity = Entity(
    name="user_id",
    join_keys=["user_id"],
    description="Unique identifier for a user",
)

merchant_entity = Entity(
    name="merchant_id",
    join_keys=["merchant_id"],
    description="Unique identifier for a merchant",
)

# Data Source definition
user_transactions_source = BigQuerySource(
    name="user_transactions",
    table="ml_data.user_transaction_features",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

merchant_stats_source = BigQuerySource(
    name="merchant_stats",
    table="ml_data.merchant_statistics",
    timestamp_field="event_timestamp",
)

# Feature View definition - a group of features
user_transaction_features = FeatureView(
    name="user_transaction_features",
    entities=[user_entity],
    ttl=timedelta(days=7),
    schema=[
        Field(name="transaction_count_7d", dtype=Int64),
        Field(name="transaction_amount_avg_7d", dtype=Float32),
        Field(name="transaction_amount_max_7d", dtype=Float32),
        Field(name="unique_merchants_7d", dtype=Int64),
        Field(name="avg_time_between_transactions", dtype=Float32),
    ],
    source=user_transactions_source,
    online=True,
    tags={
        "team": "fraud-detection",
        "version": "v2",
    },
)

merchant_risk_features = FeatureView(
    name="merchant_risk_features",
    entities=[merchant_entity],
    ttl=timedelta(days=30),
    schema=[
        Field(name="chargeback_rate_30d", dtype=Float32),
        Field(name="avg_transaction_amount", dtype=Float32),
        Field(name="total_transactions_30d", dtype=Int64),
        Field(name="risk_score", dtype=Float32),
    ],
    source=merchant_stats_source,
    online=True,
    tags={
        "team": "fraud-detection",
        "version": "v1",
    },
)

Feature Service定義

# features/services.py
from feast import FeatureService

fraud_detection_service = FeatureService(
    name="fraud_detection_v2",
    features=[
        user_transaction_features,
        merchant_risk_features,
    ],
    tags={
        "model": "fraud_detector_v2",
        "owner": "ml-team",
    },
)

Materializationパイプライン

バッチMaterialization

# Apply Feature Registry with Feast CLI
feast apply

# Run batch materialization
feast materialize 2026-03-01T00:00:00 2026-03-12T00:00:00

# Incremental materialization (since last run)
feast materialize-incremental 2026-03-12T00:00:00

Airflowを活用した自動化

# dags/feast_materialization.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

default_args = {
    "owner": "ml-team",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "feast_materialization",
    default_args=default_args,
    description="Daily feature materialization pipeline",
    schedule_interval="0 6 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["feast", "mlops"],
)

# Validate feature source data
validate_sources = PythonOperator(
    task_id="validate_sources",
    python_callable=lambda: __import__("feast").FeatureStore(
        repo_path="/opt/feast/feature_repo"
    ),
    dag=dag,
)

# Run materialization
materialize = BashOperator(
    task_id="materialize_features",
    bash_command="""
        cd /opt/feast/feature_repo && \
        feast materialize-incremental $(date -u +%Y-%m-%dT%H:%M:%S)
    """,
    dag=dag,
)

# Validate Online Store consistency
validate_online = PythonOperator(
    task_id="validate_online_store",
    python_callable=lambda: print("Validating online store consistency..."),
    dag=dag,
)

validate_sources >> materialize >> validate_online

Online Storeバックエンド設定

RedisベースOnline Store

# feature_store.yaml - Redis configuration
project: fraud_detection
registry: gs://ml-feature-store/registry.db
provider: gcp

online_store:
  type: redis
  connection_string: 'redis-cluster.internal:6379,redis-cluster.internal:6380,redis-cluster.internal:6381'
  redis_type: redis_cluster
  key_ttl_seconds: 604800 # 7 days

DynamoDBベースOnline Store

# feature_store.yaml - DynamoDB configuration
project: fraud_detection
registry: s3://ml-feature-store/registry.db
provider: aws

online_store:
  type: dynamodb
  region: ap-northeast-2
  table_name_template: 'feast_online_{project}_{table}'

Online Storeバックエンド比較

項目RedisDynamoDBPostgreSQL
レイテンシ0.5〜2ms1〜5ms2〜10ms
スケーラビリティ手動(クラスタ)自動手動
コストモデルインスタンスベースリクエストベースインスタンスベース
TTLサポートネイティブネイティブ手動実装
運用負担中程度低い中程度
適した環境超低レイテンシが必要サーバーレス、AWS小規模、コスト重視

Offline Store設定

BigQueryベースOffline Store

# feature_store.yaml - BigQuery configuration
project: fraud_detection
registry: gs://ml-feature-store/registry.db
provider: gcp

offline_store:
  type: bigquery
  dataset: feature_store
  location: asia-northeast3

学習データ生成(Point-in-Time Join)

# training_data.py
from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path="./feature_repo")

# Entity DataFrame - timestamps and entities for training
entity_df = pd.DataFrame({
    "user_id": ["user_001", "user_002", "user_003", "user_001"],
    "merchant_id": ["merch_100", "merch_200", "merch_100", "merch_300"],
    "event_timestamp": pd.to_datetime([
        "2026-03-01 10:00:00",
        "2026-03-02 14:30:00",
        "2026-03-03 09:15:00",
        "2026-03-05 16:45:00",
    ]),
    "label": [0, 1, 0, 1],  # fraud label
})

# Extract features with point-in-time correctness
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_transaction_features:transaction_count_7d",
        "user_transaction_features:transaction_amount_avg_7d",
        "user_transaction_features:transaction_amount_max_7d",
        "user_transaction_features:unique_merchants_7d",
        "merchant_risk_features:chargeback_rate_30d",
        "merchant_risk_features:risk_score",
    ],
).to_df()

print(training_df.head())
print(f"Training data shape: {training_df.shape}")

Onlineフィーチャー取得(リアルタイム推論)

# inference.py
from feast import FeatureStore

store = FeatureStore(repo_path="./feature_repo")

# Real-time feature retrieval
feature_vector = store.get_online_features(
    features=[
        "user_transaction_features:transaction_count_7d",
        "user_transaction_features:transaction_amount_avg_7d",
        "user_transaction_features:unique_merchants_7d",
        "merchant_risk_features:chargeback_rate_30d",
        "merchant_risk_features:risk_score",
    ],
    entity_rows=[
        {"user_id": "user_001", "merchant_id": "merch_100"},
    ],
).to_dict()

print(feature_vector)
# Example output:
# {
#   "user_id": ["user_001"],
#   "merchant_id": ["merch_100"],
#   "transaction_count_7d": [23],
#   "transaction_amount_avg_7d": [45000.5],
#   "unique_merchants_7d": [8],
#   "chargeback_rate_30d": [0.02],
#   "risk_score": [0.15]
# }

Training-Serving Skew防止戦略

Training-Serving Skewは、MLモデル性能を低下させる最も一般的な原因の一つです。

Skew発生原因と対応

原因説明対応方法
フィーチャー計算ロジック不一致学習/サービングで異なるコードを使用Feature Storeで単一ソース化
データ漏洩未来のデータが学習に含まれるPoint-in-Time Join適用
フィーチャー鮮度の差異バッチvsリアルタイム更新周期の違いTTL管理とFreshnessモニタリング
スキーマ変更フィーチャー定義が変更されたFeature Registryバージョン管理
NULL処理の違いデフォルト値処理方式の不一致統一されたデフォルト値ポリシー設定

FeastによるSkew防止

# skew_detection.py
import pandas as pd
import numpy as np
from feast import FeatureStore
from scipy import stats

store = FeatureStore(repo_path="./feature_repo")

def detect_training_serving_skew(
    feature_name: str,
    training_values: pd.Series,
    sample_size: int = 1000,
):
    """Compare feature distributions between training data and Online Store."""
    # Sample from Online Store
    online_features = []
    entity_rows = [{"user_id": f"user_{i:04d}"} for i in range(sample_size)]

    online_result = store.get_online_features(
        features=[feature_name],
        entity_rows=entity_rows,
    ).to_df()

    serving_values = online_result[feature_name.split(":")[-1]].dropna()

    # KS test for distribution comparison
    ks_stat, p_value = stats.ks_2samp(
        training_values.dropna(),
        serving_values,
    )

    # PSI (Population Stability Index) calculation
    psi = calculate_psi(training_values.dropna(), serving_values)

    return {
        "feature": feature_name,
        "ks_statistic": ks_stat,
        "p_value": p_value,
        "psi": psi,
        "skew_detected": psi > 0.2 or p_value < 0.05,
    }


def calculate_psi(expected, actual, bins=10):
    """Calculate Population Stability Index (PSI)."""
    breakpoints = np.linspace(
        min(expected.min(), actual.min()),
        max(expected.max(), actual.max()),
        bins + 1,
    )

    expected_counts = np.histogram(expected, breakpoints)[0] / len(expected)
    actual_counts = np.histogram(actual, breakpoints)[0] / len(actual)

    # Avoid division by zero
    expected_counts = np.clip(expected_counts, 0.001, None)
    actual_counts = np.clip(actual_counts, 0.001, None)

    psi = np.sum(
        (actual_counts - expected_counts) * np.log(actual_counts / expected_counts)
    )
    return psi

Feature MonitoringとDrift Detection

モニタリング指標

# monitoring/feature_monitor.py
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import pandas as pd

@dataclass
class FeatureStats:
    feature_name: str
    timestamp: datetime
    mean: float
    std: float
    min_val: float
    max_val: float
    null_rate: float
    unique_count: int
    p99_latency_ms: Optional[float] = None

def compute_feature_stats(df: pd.DataFrame, feature_name: str) -> FeatureStats:
    """Compute statistical information for a feature."""
    series = df[feature_name]
    return FeatureStats(
        feature_name=feature_name,
        timestamp=datetime.utcnow(),
        mean=series.mean(),
        std=series.std(),
        min_val=series.min(),
        max_val=series.max(),
        null_rate=series.isnull().sum() / len(series),
        unique_count=series.nunique(),
    )

def check_drift_alerts(
    current: FeatureStats,
    baseline: FeatureStats,
    thresholds: dict,
) -> list:
    """Check for feature drift alerts."""
    alerts = []

    # Check mean change rate
    if baseline.mean != 0:
        mean_change = abs(current.mean - baseline.mean) / abs(baseline.mean)
        if mean_change > thresholds.get("mean_change", 0.3):
            alerts.append(
                f"Mean drift detected: {baseline.mean:.4f} -> {current.mean:.4f} "
                f"(change: {mean_change:.2%})"
            )

    # NULL rate change
    null_diff = abs(current.null_rate - baseline.null_rate)
    if null_diff > thresholds.get("null_rate_change", 0.05):
        alerts.append(
            f"Null rate change: {baseline.null_rate:.4f} -> {current.null_rate:.4f}"
        )

    # Range anomaly
    if current.max_val > baseline.max_val * thresholds.get("max_multiplier", 2.0):
        alerts.append(
            f"Max value anomaly: {current.max_val} (baseline max: {baseline.max_val})"
        )

    return alerts

Feature Storeソリューション比較

項目FeastTectonHopsworks
ライセンスApache 2.0(オープンソース)商用(マネージド)AGPL + 商用
アーキテクチャモジュラー型、プラガブルマネージド、エンドツーエンド統合プラットフォーム
リアルタイムフィーチャー制限的ネイティブサポートサポート
ストリーミングPushベースKafka/KinesisネイティブKafka連携
フィーチャー変換Python SDKSpark/Pandas/SQLSpark/Flink
モニタリング基本的内蔵(自動アラート)内蔵(ドリフト検知)
ガバナンス基本的RBAC、監査ログRBAC、監査、系譜追跡
クラウドマルチクラウドAWS/DatabricksAWS/Azure/GCP
適した組織柔軟性重視、エンジニアリング力あり大企業、リアルタイムML必須規制産業、オールインワン
コミュニティ非常に活発(CNCF関連)商用サポート活発

運用時の注意事項

1. Entity設計原則

  • Entityキーはビジネスドメインに合わせて設計します(user_id、order_id、device_idなど)
  • 複合Entityキーは検索パフォーマンスに影響するため慎重に使用します
  • Entityキーのカーディナリティが高すぎるとOnline Storeのメモリ使用量が急増します

2. TTL管理

  • Online StoreのTTLはMaterialization周期よりも余裕を持って設定します
  • TTLが短すぎるとMaterialization遅延時にNULL値が返されます
  • TTLが長すぎるとOnline Storeのストレージコストが増加します

3. スキーマ変更管理

  • フィーチャー追加は下位互換性がありますが、フィーチャー削除やタイプ変更はモデル再学習が必要です
  • Feature Viewのバージョン管理のため名前にバージョンを含めます(例: user_features_v2
  • スキーマ変更時は既存モデルとの互換性を必ず検証します

4. Materialization失敗への対応

# Check materialization status
feast materialize-incremental 2026-03-12T00:00:00 --verbose

# Rerun for specific Feature View only
feast materialize-incremental 2026-03-12T00:00:00 \
  --feature-views user_transaction_features

# Check feature freshness in Online Store
feast feature-views list

障害事例と復旧手順

障害事例1: Online Store障害(Redisクラスタダウン)

症状: すべてのリアルタイム推論リクエストでフィーチャー取得が失敗

復旧手順:

  1. Redisクラスタの状態確認と復旧
  2. 復旧不可の場合、バックアップRedisに切り替え(Sentinel/Cluster Failover)
  3. Materializationを再実行してOnline Storeデータを復元
  4. フィーチャー鮮度の検証後、推論サービスを再開

障害事例2: Materializationパイプライン障害

症状: Online Storeのフィーチャーデータが更新されず古い値がサービングされる

# Feature freshness check script
from feast import FeatureStore
from datetime import datetime, timedelta

store = FeatureStore(repo_path="./feature_repo")

# Check last materialization time per Feature View
for fv in store.list_feature_views():
    if fv.materialization_intervals:
        last_mat = fv.materialization_intervals[-1]
        staleness = datetime.utcnow() - last_mat.end_date
        if staleness > timedelta(hours=24):
            print(f"ALERT: {fv.name} is stale by {staleness}")
        else:
            print(f"OK: {fv.name} last materialized at {last_mat.end_date}")
    else:
        print(f"WARNING: {fv.name} has never been materialized")

復旧手順:

  1. Materializationログから失敗原因を特定(Offline Storeアクセス問題、スキーマ変更など)
  2. データソースの可用性を確認
  3. 失敗したFeature Viewに対してMaterializationを再試行
  4. Online Storeのフィーチャー整合性を検証

障害事例3: Feature Drift検知

症状: モデルパフォーマンス指標が段階的に低下

復旧手順:

  1. Feature Monitoringダッシュボードでドリフトを確認
  2. 原因を特定(データパイプライン変更、アップストリームスキーマ変更、実際の分布変化)
  3. 必要に応じてフィーチャーパイプラインを修正
  4. 深刻なドリフトの場合、モデル再学習をトリガー

プロダクションデプロイチェックリスト

プロダクション環境にFeature Storeをデプロイする際、以下の項目を必ず確認します。

項目チェックポイント推奨設定
Online Store可用性クラスタ構成、レプリカRedis Cluster 3+ノード
Materialization周期ビジネス要件対比の鮮度SLAに合った周期設定
TTL設定Materialization失敗時の影響Materialization周期の2〜3倍
バックアップ戦略Online/Offline Storeバックアップ日次スナップショット
モニタリングフィーチャードリフト、レイテンシPrometheus + Grafana
アラートMaterialization失敗、ドリフトPagerDuty/Slack連携
セキュリティ認証/認可、ネットワークIAM、VPC、TLS

まとめ

Feature StoreはMLモデルのプロダクション運用においてフィーチャー管理の複雑さを解決する核心インフラです。Feastはオープンソースの柔軟性とモジュラーアーキテクチャを通じて、既存インフラに自然に統合できる選択肢を提供します。

核心ポイントは以下の通りです。

  • Online/Offline Storeの分離: リアルタイムサービングとバッチ学習の要件をそれぞれ最適化する
  • Point-in-Time Correctness: Feature Storeのタイムトラベルクエリでデータ漏洩を根本的に防止する
  • Training-Serving Skew防止: 単一のフィーチャー定義から学習とサービングの両方をサポートして一貫性を保証する
  • Materialization自動化: Airflowなどと連携してフィーチャー更新パイプラインを安定的に運用する
  • Feature Monitoring: ドリフト検知とフィーチャー品質モニタリングでモデル性能を継続的に維持する

Feature Storeの導入は単一モデルではなく、組織全体のML成熟度を高める投資です。小規模から始めて1つのプロダクションモデルで検証した後、段階的に拡大するアプローチを推奨します。

参考資料