- Published on
Feature Store設計と運用ガイド: Feast基盤Online/Offline Store構築·MLフィーチャーパイプライン自動化
- Authors
- Name
- はじめに
- Feature Store核心概念
- Feastアーキテクチャ
- Materializationパイプライン
- Online Storeバックエンド設定
- Offline Store設定
- Training-Serving Skew防止戦略
- Feature MonitoringとDrift Detection
- Feature Storeソリューション比較
- 運用時の注意事項
- 障害事例と復旧手順
- プロダクションデプロイチェックリスト
- まとめ
- 参考資料

はじめに
機械学習モデルのプロダクションデプロイが一般化するにつれ、フィーチャー(Feature)管理がMLOpsの核心課題として浮上しています。モデル学習に使用したフィーチャーをリアルタイムサービングでも同一に再現する必要があり、複数のチームが同一のフィーチャーを重複計算しないように共有する必要があり、フィーチャーの品質と鮮度を継続的にモニタリングする必要があります。
Feature Storeはこうした問題を解決するために登場したインフラレイヤーで、フィーチャーの定義、保存、サービング、モニタリングを中央で管理します。その中でFeast(Feature Store)は最も広く使用されているオープンソースFeature Storeで、既存のデータインフラを再利用しながら柔軟なフィーチャーサービングを提供します。
この記事では、Feature Storeの核心概念、Feastアーキテクチャ、フィーチャー定義とEntity設計、Materializationパイプライン、Online/Offline Store設定、Training-Serving Skew防止戦略、Feature Monitoring、Tecton/Hopsworksとの比較、プロダクションデプロイパターン、障害対応まで全過程を解説します。
Feature Store核心概念
なぜFeature Storeが必要なのか
Feature Storeなしでパイプラインを運用すると以下の問題が発生します。
| 問題 | 説明 | 影響 |
|---|---|---|
| Training-Serving Skew | 学習とサービングでフィーチャー計算ロジックが異なる | モデル性能低下 |
| フィーチャー重複計算 | チームごとに同一フィーチャーを各自実装 | コンピューティングリソースの浪費 |
| データ漏洩 | 未来のデータが学習に含まれる | 過学習、誤った評価 |
| フィーチャー発見の困難 | どのフィーチャーが存在するか分からない | 開発生産性低下 |
| サービングレイテンシ | リアルタイムでフィーチャーを計算すると遅延が発生 | ユーザー体験の悪化 |
Online Store vs Offline Store
Feature Storeは2つのストレージを基本として持ちます。
| 項目 | Online Store | Offline Store |
|---|---|---|
| 用途 | リアルタイム推論 | モデル学習、バッチ推論 |
| レイテンシ | 1〜10ms | 秒〜分 |
| データ範囲 | 最新値のみ | 全履歴 |
| ストレージ例 | Redis, DynamoDB | BigQuery, Redshift, S3 |
| クエリパターン | Key-Value検索 | SQL/DataFrame検索 |
| データ量 | GBレベル | TB〜PBレベル |
| 一貫性 | 結果整合性 | 強い一貫性 |
Feature FreshnessとConsistency
フィーチャーの鮮度(Freshness)は、どれほど最新のデータを反映しているかを示します。
- バッチフィーチャー: 1時間〜1日単位で更新(例: ユーザーの直近30日間の購入回数)
- ストリーミングフィーチャー: 秒〜分単位で更新(例: 直近5分間の取引金額)
- リアルタイムフィーチャー: リクエスト時に計算(例: 現在のセッションのクリック数)
Feastアーキテクチャ
全体構造
Feastは以下のコンポーネントで構成されています。
# feature_store.yaml - Feast project configuration
project: fraud_detection
registry: gs://ml-feature-store/registry.db
provider: gcp
offline_store:
type: bigquery
dataset: feature_store
online_store:
type: redis
connection_string: 'redis-cluster.internal:6379'
redis_type: redis_cluster
entity_key_serialization_version: 2
| コンポーネント | 役割 | 説明 |
|---|---|---|
| Feature Registry | フィーチャーメタデータ管理 | フィーチャー定義、エンティティ、データソース情報を保存 |
| Offline Store | 履歴データ保存 | BigQuery、Redshift、Sparkなどから学習データを抽出 |
| Online Store | 最新フィーチャーサービング | Redis、DynamoDBなどからリアルタイム検索 |
| Feature Server | REST APIサービング | FastAPIベースの低レイテンシフィーチャーサービングエンドポイント |
| Materialization Engine | データ同期 | Offline StoreからOnline Storeへフィーチャーをコピー |
Feature定義とEntity設計
# features/fraud_detection.py
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource, BigQuerySource
from feast.types import Float32, Int64, String
# Entity definition - the target that features are linked to
user_entity = Entity(
name="user_id",
join_keys=["user_id"],
description="Unique identifier for a user",
)
merchant_entity = Entity(
name="merchant_id",
join_keys=["merchant_id"],
description="Unique identifier for a merchant",
)
# Data Source definition
user_transactions_source = BigQuerySource(
name="user_transactions",
table="ml_data.user_transaction_features",
timestamp_field="event_timestamp",
created_timestamp_column="created_timestamp",
)
merchant_stats_source = BigQuerySource(
name="merchant_stats",
table="ml_data.merchant_statistics",
timestamp_field="event_timestamp",
)
# Feature View definition - a group of features
user_transaction_features = FeatureView(
name="user_transaction_features",
entities=[user_entity],
ttl=timedelta(days=7),
schema=[
Field(name="transaction_count_7d", dtype=Int64),
Field(name="transaction_amount_avg_7d", dtype=Float32),
Field(name="transaction_amount_max_7d", dtype=Float32),
Field(name="unique_merchants_7d", dtype=Int64),
Field(name="avg_time_between_transactions", dtype=Float32),
],
source=user_transactions_source,
online=True,
tags={
"team": "fraud-detection",
"version": "v2",
},
)
merchant_risk_features = FeatureView(
name="merchant_risk_features",
entities=[merchant_entity],
ttl=timedelta(days=30),
schema=[
Field(name="chargeback_rate_30d", dtype=Float32),
Field(name="avg_transaction_amount", dtype=Float32),
Field(name="total_transactions_30d", dtype=Int64),
Field(name="risk_score", dtype=Float32),
],
source=merchant_stats_source,
online=True,
tags={
"team": "fraud-detection",
"version": "v1",
},
)
Feature Service定義
# features/services.py
from feast import FeatureService
fraud_detection_service = FeatureService(
name="fraud_detection_v2",
features=[
user_transaction_features,
merchant_risk_features,
],
tags={
"model": "fraud_detector_v2",
"owner": "ml-team",
},
)
Materializationパイプライン
バッチMaterialization
# Apply Feature Registry with Feast CLI
feast apply
# Run batch materialization
feast materialize 2026-03-01T00:00:00 2026-03-12T00:00:00
# Incremental materialization (since last run)
feast materialize-incremental 2026-03-12T00:00:00
Airflowを活用した自動化
# dags/feast_materialization.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
default_args = {
"owner": "ml-team",
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
dag = DAG(
"feast_materialization",
default_args=default_args,
description="Daily feature materialization pipeline",
schedule_interval="0 6 * * *",
start_date=datetime(2026, 1, 1),
catchup=False,
tags=["feast", "mlops"],
)
# Validate feature source data
validate_sources = PythonOperator(
task_id="validate_sources",
python_callable=lambda: __import__("feast").FeatureStore(
repo_path="/opt/feast/feature_repo"
),
dag=dag,
)
# Run materialization
materialize = BashOperator(
task_id="materialize_features",
bash_command="""
cd /opt/feast/feature_repo && \
feast materialize-incremental $(date -u +%Y-%m-%dT%H:%M:%S)
""",
dag=dag,
)
# Validate Online Store consistency
validate_online = PythonOperator(
task_id="validate_online_store",
python_callable=lambda: print("Validating online store consistency..."),
dag=dag,
)
validate_sources >> materialize >> validate_online
Online Storeバックエンド設定
RedisベースOnline Store
# feature_store.yaml - Redis configuration
project: fraud_detection
registry: gs://ml-feature-store/registry.db
provider: gcp
online_store:
type: redis
connection_string: 'redis-cluster.internal:6379,redis-cluster.internal:6380,redis-cluster.internal:6381'
redis_type: redis_cluster
key_ttl_seconds: 604800 # 7 days
DynamoDBベースOnline Store
# feature_store.yaml - DynamoDB configuration
project: fraud_detection
registry: s3://ml-feature-store/registry.db
provider: aws
online_store:
type: dynamodb
region: ap-northeast-2
table_name_template: 'feast_online_{project}_{table}'
Online Storeバックエンド比較
| 項目 | Redis | DynamoDB | PostgreSQL |
|---|---|---|---|
| レイテンシ | 0.5〜2ms | 1〜5ms | 2〜10ms |
| スケーラビリティ | 手動(クラスタ) | 自動 | 手動 |
| コストモデル | インスタンスベース | リクエストベース | インスタンスベース |
| TTLサポート | ネイティブ | ネイティブ | 手動実装 |
| 運用負担 | 中程度 | 低い | 中程度 |
| 適した環境 | 超低レイテンシが必要 | サーバーレス、AWS | 小規模、コスト重視 |
Offline Store設定
BigQueryベースOffline Store
# feature_store.yaml - BigQuery configuration
project: fraud_detection
registry: gs://ml-feature-store/registry.db
provider: gcp
offline_store:
type: bigquery
dataset: feature_store
location: asia-northeast3
学習データ生成(Point-in-Time Join)
# training_data.py
from feast import FeatureStore
import pandas as pd
store = FeatureStore(repo_path="./feature_repo")
# Entity DataFrame - timestamps and entities for training
entity_df = pd.DataFrame({
"user_id": ["user_001", "user_002", "user_003", "user_001"],
"merchant_id": ["merch_100", "merch_200", "merch_100", "merch_300"],
"event_timestamp": pd.to_datetime([
"2026-03-01 10:00:00",
"2026-03-02 14:30:00",
"2026-03-03 09:15:00",
"2026-03-05 16:45:00",
]),
"label": [0, 1, 0, 1], # fraud label
})
# Extract features with point-in-time correctness
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"user_transaction_features:transaction_count_7d",
"user_transaction_features:transaction_amount_avg_7d",
"user_transaction_features:transaction_amount_max_7d",
"user_transaction_features:unique_merchants_7d",
"merchant_risk_features:chargeback_rate_30d",
"merchant_risk_features:risk_score",
],
).to_df()
print(training_df.head())
print(f"Training data shape: {training_df.shape}")
Onlineフィーチャー取得(リアルタイム推論)
# inference.py
from feast import FeatureStore
store = FeatureStore(repo_path="./feature_repo")
# Real-time feature retrieval
feature_vector = store.get_online_features(
features=[
"user_transaction_features:transaction_count_7d",
"user_transaction_features:transaction_amount_avg_7d",
"user_transaction_features:unique_merchants_7d",
"merchant_risk_features:chargeback_rate_30d",
"merchant_risk_features:risk_score",
],
entity_rows=[
{"user_id": "user_001", "merchant_id": "merch_100"},
],
).to_dict()
print(feature_vector)
# Example output:
# {
# "user_id": ["user_001"],
# "merchant_id": ["merch_100"],
# "transaction_count_7d": [23],
# "transaction_amount_avg_7d": [45000.5],
# "unique_merchants_7d": [8],
# "chargeback_rate_30d": [0.02],
# "risk_score": [0.15]
# }
Training-Serving Skew防止戦略
Training-Serving Skewは、MLモデル性能を低下させる最も一般的な原因の一つです。
Skew発生原因と対応
| 原因 | 説明 | 対応方法 |
|---|---|---|
| フィーチャー計算ロジック不一致 | 学習/サービングで異なるコードを使用 | Feature Storeで単一ソース化 |
| データ漏洩 | 未来のデータが学習に含まれる | Point-in-Time Join適用 |
| フィーチャー鮮度の差異 | バッチvsリアルタイム更新周期の違い | TTL管理とFreshnessモニタリング |
| スキーマ変更 | フィーチャー定義が変更された | Feature Registryバージョン管理 |
| NULL処理の違い | デフォルト値処理方式の不一致 | 統一されたデフォルト値ポリシー設定 |
FeastによるSkew防止
# skew_detection.py
import pandas as pd
import numpy as np
from feast import FeatureStore
from scipy import stats
store = FeatureStore(repo_path="./feature_repo")
def detect_training_serving_skew(
feature_name: str,
training_values: pd.Series,
sample_size: int = 1000,
):
"""Compare feature distributions between training data and Online Store."""
# Sample from Online Store
online_features = []
entity_rows = [{"user_id": f"user_{i:04d}"} for i in range(sample_size)]
online_result = store.get_online_features(
features=[feature_name],
entity_rows=entity_rows,
).to_df()
serving_values = online_result[feature_name.split(":")[-1]].dropna()
# KS test for distribution comparison
ks_stat, p_value = stats.ks_2samp(
training_values.dropna(),
serving_values,
)
# PSI (Population Stability Index) calculation
psi = calculate_psi(training_values.dropna(), serving_values)
return {
"feature": feature_name,
"ks_statistic": ks_stat,
"p_value": p_value,
"psi": psi,
"skew_detected": psi > 0.2 or p_value < 0.05,
}
def calculate_psi(expected, actual, bins=10):
"""Calculate Population Stability Index (PSI)."""
breakpoints = np.linspace(
min(expected.min(), actual.min()),
max(expected.max(), actual.max()),
bins + 1,
)
expected_counts = np.histogram(expected, breakpoints)[0] / len(expected)
actual_counts = np.histogram(actual, breakpoints)[0] / len(actual)
# Avoid division by zero
expected_counts = np.clip(expected_counts, 0.001, None)
actual_counts = np.clip(actual_counts, 0.001, None)
psi = np.sum(
(actual_counts - expected_counts) * np.log(actual_counts / expected_counts)
)
return psi
Feature MonitoringとDrift Detection
モニタリング指標
# monitoring/feature_monitor.py
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import pandas as pd
@dataclass
class FeatureStats:
feature_name: str
timestamp: datetime
mean: float
std: float
min_val: float
max_val: float
null_rate: float
unique_count: int
p99_latency_ms: Optional[float] = None
def compute_feature_stats(df: pd.DataFrame, feature_name: str) -> FeatureStats:
"""Compute statistical information for a feature."""
series = df[feature_name]
return FeatureStats(
feature_name=feature_name,
timestamp=datetime.utcnow(),
mean=series.mean(),
std=series.std(),
min_val=series.min(),
max_val=series.max(),
null_rate=series.isnull().sum() / len(series),
unique_count=series.nunique(),
)
def check_drift_alerts(
current: FeatureStats,
baseline: FeatureStats,
thresholds: dict,
) -> list:
"""Check for feature drift alerts."""
alerts = []
# Check mean change rate
if baseline.mean != 0:
mean_change = abs(current.mean - baseline.mean) / abs(baseline.mean)
if mean_change > thresholds.get("mean_change", 0.3):
alerts.append(
f"Mean drift detected: {baseline.mean:.4f} -> {current.mean:.4f} "
f"(change: {mean_change:.2%})"
)
# NULL rate change
null_diff = abs(current.null_rate - baseline.null_rate)
if null_diff > thresholds.get("null_rate_change", 0.05):
alerts.append(
f"Null rate change: {baseline.null_rate:.4f} -> {current.null_rate:.4f}"
)
# Range anomaly
if current.max_val > baseline.max_val * thresholds.get("max_multiplier", 2.0):
alerts.append(
f"Max value anomaly: {current.max_val} (baseline max: {baseline.max_val})"
)
return alerts
Feature Storeソリューション比較
| 項目 | Feast | Tecton | Hopsworks |
|---|---|---|---|
| ライセンス | Apache 2.0(オープンソース) | 商用(マネージド) | AGPL + 商用 |
| アーキテクチャ | モジュラー型、プラガブル | マネージド、エンドツーエンド | 統合プラットフォーム |
| リアルタイムフィーチャー | 制限的 | ネイティブサポート | サポート |
| ストリーミング | Pushベース | Kafka/Kinesisネイティブ | Kafka連携 |
| フィーチャー変換 | Python SDK | Spark/Pandas/SQL | Spark/Flink |
| モニタリング | 基本的 | 内蔵(自動アラート) | 内蔵(ドリフト検知) |
| ガバナンス | 基本的 | RBAC、監査ログ | RBAC、監査、系譜追跡 |
| クラウド | マルチクラウド | AWS/Databricks | AWS/Azure/GCP |
| 適した組織 | 柔軟性重視、エンジニアリング力あり | 大企業、リアルタイムML必須 | 規制産業、オールインワン |
| コミュニティ | 非常に活発(CNCF関連) | 商用サポート | 活発 |
運用時の注意事項
1. Entity設計原則
- Entityキーはビジネスドメインに合わせて設計します(user_id、order_id、device_idなど)
- 複合Entityキーは検索パフォーマンスに影響するため慎重に使用します
- Entityキーのカーディナリティが高すぎるとOnline Storeのメモリ使用量が急増します
2. TTL管理
- Online StoreのTTLはMaterialization周期よりも余裕を持って設定します
- TTLが短すぎるとMaterialization遅延時にNULL値が返されます
- TTLが長すぎるとOnline Storeのストレージコストが増加します
3. スキーマ変更管理
- フィーチャー追加は下位互換性がありますが、フィーチャー削除やタイプ変更はモデル再学習が必要です
- Feature Viewのバージョン管理のため名前にバージョンを含めます(例:
user_features_v2) - スキーマ変更時は既存モデルとの互換性を必ず検証します
4. Materialization失敗への対応
# Check materialization status
feast materialize-incremental 2026-03-12T00:00:00 --verbose
# Rerun for specific Feature View only
feast materialize-incremental 2026-03-12T00:00:00 \
--feature-views user_transaction_features
# Check feature freshness in Online Store
feast feature-views list
障害事例と復旧手順
障害事例1: Online Store障害(Redisクラスタダウン)
症状: すべてのリアルタイム推論リクエストでフィーチャー取得が失敗
復旧手順:
- Redisクラスタの状態確認と復旧
- 復旧不可の場合、バックアップRedisに切り替え(Sentinel/Cluster Failover)
- Materializationを再実行してOnline Storeデータを復元
- フィーチャー鮮度の検証後、推論サービスを再開
障害事例2: Materializationパイプライン障害
症状: Online Storeのフィーチャーデータが更新されず古い値がサービングされる
# Feature freshness check script
from feast import FeatureStore
from datetime import datetime, timedelta
store = FeatureStore(repo_path="./feature_repo")
# Check last materialization time per Feature View
for fv in store.list_feature_views():
if fv.materialization_intervals:
last_mat = fv.materialization_intervals[-1]
staleness = datetime.utcnow() - last_mat.end_date
if staleness > timedelta(hours=24):
print(f"ALERT: {fv.name} is stale by {staleness}")
else:
print(f"OK: {fv.name} last materialized at {last_mat.end_date}")
else:
print(f"WARNING: {fv.name} has never been materialized")
復旧手順:
- Materializationログから失敗原因を特定(Offline Storeアクセス問題、スキーマ変更など)
- データソースの可用性を確認
- 失敗したFeature Viewに対してMaterializationを再試行
- Online Storeのフィーチャー整合性を検証
障害事例3: Feature Drift検知
症状: モデルパフォーマンス指標が段階的に低下
復旧手順:
- Feature Monitoringダッシュボードでドリフトを確認
- 原因を特定(データパイプライン変更、アップストリームスキーマ変更、実際の分布変化)
- 必要に応じてフィーチャーパイプラインを修正
- 深刻なドリフトの場合、モデル再学習をトリガー
プロダクションデプロイチェックリスト
プロダクション環境にFeature Storeをデプロイする際、以下の項目を必ず確認します。
| 項目 | チェックポイント | 推奨設定 |
|---|---|---|
| Online Store可用性 | クラスタ構成、レプリカ | Redis Cluster 3+ノード |
| Materialization周期 | ビジネス要件対比の鮮度 | SLAに合った周期設定 |
| TTL設定 | Materialization失敗時の影響 | Materialization周期の2〜3倍 |
| バックアップ戦略 | Online/Offline Storeバックアップ | 日次スナップショット |
| モニタリング | フィーチャードリフト、レイテンシ | Prometheus + Grafana |
| アラート | Materialization失敗、ドリフト | PagerDuty/Slack連携 |
| セキュリティ | 認証/認可、ネットワーク | IAM、VPC、TLS |
まとめ
Feature StoreはMLモデルのプロダクション運用においてフィーチャー管理の複雑さを解決する核心インフラです。Feastはオープンソースの柔軟性とモジュラーアーキテクチャを通じて、既存インフラに自然に統合できる選択肢を提供します。
核心ポイントは以下の通りです。
- Online/Offline Storeの分離: リアルタイムサービングとバッチ学習の要件をそれぞれ最適化する
- Point-in-Time Correctness: Feature Storeのタイムトラベルクエリでデータ漏洩を根本的に防止する
- Training-Serving Skew防止: 単一のフィーチャー定義から学習とサービングの両方をサポートして一貫性を保証する
- Materialization自動化: Airflowなどと連携してフィーチャー更新パイプラインを安定的に運用する
- Feature Monitoring: ドリフト検知とフィーチャー品質モニタリングでモデル性能を継続的に維持する
Feature Storeの導入は単一モデルではなく、組織全体のML成熟度を高める投資です。小規模から始めて1つのプロダクションモデルで検証した後、段階的に拡大するアプローチを推奨します。
参考資料
- Feast Official Documentation
- Feast Architecture Overview
- Feature Store Architecture and Storage - DragonflyDB
- A Comparative Analysis: Feast vs Tecton vs Hopsworks - Uplatz
- Feature Store 101: Build, Serve, and Scale ML Features - Aerospike
- What is a Feature Store? - Databricks
- Solving Training-Serving Skew with Feast - Medium