Skip to content

필사 모드: Feature Store設計と運用ガイド: Feast基盤Online/Offline Store構築·MLフィーチャーパイプライン自動化

日本語
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

はじめに

機械学習モデルのプロダクションデプロイが一般化するにつれ、フィーチャー(Feature)管理がMLOpsの核心課題として浮上しています。モデル学習に使用したフィーチャーをリアルタイムサービングでも同一に再現する必要があり、複数のチームが同一のフィーチャーを重複計算しないように共有する必要があり、フィーチャーの品質と鮮度を継続的にモニタリングする必要があります。

Feature Storeはこうした問題を解決するために登場したインフラレイヤーで、フィーチャーの定義、保存、サービング、モニタリングを中央で管理します。その中でFeast(Feature Store)は最も広く使用されているオープンソースFeature Storeで、既存のデータインフラを再利用しながら柔軟なフィーチャーサービングを提供します。

この記事では、Feature Storeの核心概念、Feastアーキテクチャ、フィーチャー定義とEntity設計、Materializationパイプライン、Online/Offline Store設定、Training-Serving Skew防止戦略、Feature Monitoring、Tecton/Hopsworksとの比較、プロダクションデプロイパターン、障害対応まで全過程を解説します。

Feature Store核心概念

なぜFeature Storeが必要なのか

Feature Storeなしでパイプラインを運用すると以下の問題が発生します。

| 問題 | 説明 | 影響 |

| ---------------------- | -------------------------------------------------- | -------------------------------- |

| Training-Serving Skew | 学習とサービングでフィーチャー計算ロジックが異なる | モデル性能低下 |

| フィーチャー重複計算 | チームごとに同一フィーチャーを各自実装 | コンピューティングリソースの浪費 |

| データ漏洩 | 未来のデータが学習に含まれる | 過学習、誤った評価 |

| フィーチャー発見の困難 | どのフィーチャーが存在するか分からない | 開発生産性低下 |

| サービングレイテンシ | リアルタイムでフィーチャーを計算すると遅延が発生 | ユーザー体験の悪化 |

Online Store vs Offline Store

Feature Storeは2つのストレージを基本として持ちます。

| 項目 | Online Store | Offline Store |

| -------------- | ---------------- | ---------------------- |

| 用途 | リアルタイム推論 | モデル学習、バッチ推論 |

| レイテンシ | 1〜10ms | 秒〜分 |

| データ範囲 | 最新値のみ | 全履歴 |

| ストレージ例 | Redis, DynamoDB | BigQuery, Redshift, S3 |

| クエリパターン | Key-Value検索 | SQL/DataFrame検索 |

| データ量 | GBレベル | TB〜PBレベル |

| 一貫性 | 結果整合性 | 強い一貫性 |

Feature FreshnessとConsistency

フィーチャーの鮮度(Freshness)は、どれほど最新のデータを反映しているかを示します。

- **バッチフィーチャー**: 1時間〜1日単位で更新(例: ユーザーの直近30日間の購入回数)

- **ストリーミングフィーチャー**: 秒〜分単位で更新(例: 直近5分間の取引金額)

- **リアルタイムフィーチャー**: リクエスト時に計算(例: 現在のセッションのクリック数)

Feastアーキテクチャ

全体構造

Feastは以下のコンポーネントで構成されています。

feature_store.yaml - Feast project configuration

project: fraud_detection

registry: gs://ml-feature-store/registry.db

provider: gcp

offline_store:

type: bigquery

dataset: feature_store

online_store:

type: redis

connection_string: 'redis-cluster.internal:6379'

redis_type: redis_cluster

entity_key_serialization_version: 2

| コンポーネント | 役割 | 説明 |

| ---------------------- | -------------------------- | --------------------------------------------------------------- |

| Feature Registry | フィーチャーメタデータ管理 | フィーチャー定義、エンティティ、データソース情報を保存 |

| Offline Store | 履歴データ保存 | BigQuery、Redshift、Sparkなどから学習データを抽出 |

| Online Store | 最新フィーチャーサービング | Redis、DynamoDBなどからリアルタイム検索 |

| Feature Server | REST APIサービング | FastAPIベースの低レイテンシフィーチャーサービングエンドポイント |

| Materialization Engine | データ同期 | Offline StoreからOnline Storeへフィーチャーをコピー |

Feature定義とEntity設計

features/fraud_detection.py

from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource, BigQuerySource

from feast.types import Float32, Int64, String

Entity definition - the target that features are linked to

user_entity = Entity(

name="user_id",

join_keys=["user_id"],

description="Unique identifier for a user",

)

merchant_entity = Entity(

name="merchant_id",

join_keys=["merchant_id"],

description="Unique identifier for a merchant",

)

Data Source definition

user_transactions_source = BigQuerySource(

name="user_transactions",

table="ml_data.user_transaction_features",

timestamp_field="event_timestamp",

created_timestamp_column="created_timestamp",

)

merchant_stats_source = BigQuerySource(

name="merchant_stats",

table="ml_data.merchant_statistics",

timestamp_field="event_timestamp",

)

Feature View definition - a group of features

user_transaction_features = FeatureView(

name="user_transaction_features",

entities=[user_entity],

ttl=timedelta(days=7),

schema=[

Field(name="transaction_count_7d", dtype=Int64),

Field(name="transaction_amount_avg_7d", dtype=Float32),

Field(name="transaction_amount_max_7d", dtype=Float32),

Field(name="unique_merchants_7d", dtype=Int64),

Field(name="avg_time_between_transactions", dtype=Float32),

],

source=user_transactions_source,

online=True,

tags={

"team": "fraud-detection",

"version": "v2",

},

)

merchant_risk_features = FeatureView(

name="merchant_risk_features",

entities=[merchant_entity],

ttl=timedelta(days=30),

schema=[

Field(name="chargeback_rate_30d", dtype=Float32),

Field(name="avg_transaction_amount", dtype=Float32),

Field(name="total_transactions_30d", dtype=Int64),

Field(name="risk_score", dtype=Float32),

],

source=merchant_stats_source,

online=True,

tags={

"team": "fraud-detection",

"version": "v1",

},

)

Feature Service定義

features/services.py

from feast import FeatureService

fraud_detection_service = FeatureService(

name="fraud_detection_v2",

features=[

user_transaction_features,

merchant_risk_features,

],

tags={

"model": "fraud_detector_v2",

"owner": "ml-team",

},

)

Materializationパイプライン

バッチMaterialization

Apply Feature Registry with Feast CLI

feast apply

Run batch materialization

feast materialize 2026-03-01T00:00:00 2026-03-12T00:00:00

Incremental materialization (since last run)

feast materialize-incremental 2026-03-12T00:00:00

Airflowを活用した自動化

dags/feast_materialization.py

from datetime import datetime, timedelta

from airflow import DAG

from airflow.operators.bash import BashOperator

from airflow.operators.python import PythonOperator

default_args = {

"owner": "ml-team",

"retries": 3,

"retry_delay": timedelta(minutes=5),

}

dag = DAG(

"feast_materialization",

default_args=default_args,

description="Daily feature materialization pipeline",

schedule_interval="0 6 * * *",

start_date=datetime(2026, 1, 1),

catchup=False,

tags=["feast", "mlops"],

)

Validate feature source data

validate_sources = PythonOperator(

task_id="validate_sources",

python_callable=lambda: __import__("feast").FeatureStore(

repo_path="/opt/feast/feature_repo"

),

dag=dag,

)

Run materialization

materialize = BashOperator(

task_id="materialize_features",

bash_command="""

cd /opt/feast/feature_repo && \

feast materialize-incremental $(date -u +%Y-%m-%dT%H:%M:%S)

""",

dag=dag,

)

Validate Online Store consistency

validate_online = PythonOperator(

task_id="validate_online_store",

python_callable=lambda: print("Validating online store consistency..."),

dag=dag,

)

validate_sources >> materialize >> validate_online

Online Storeバックエンド設定

RedisベースOnline Store

feature_store.yaml - Redis configuration

project: fraud_detection

registry: gs://ml-feature-store/registry.db

provider: gcp

online_store:

type: redis

connection_string: 'redis-cluster.internal:6379,redis-cluster.internal:6380,redis-cluster.internal:6381'

redis_type: redis_cluster

key_ttl_seconds: 604800 # 7 days

DynamoDBベースOnline Store

feature_store.yaml - DynamoDB configuration

project: fraud_detection

registry: s3://ml-feature-store/registry.db

provider: aws

online_store:

type: dynamodb

region: ap-northeast-2

table_name_template: 'feast_online_{project}_{table}'

Online Storeバックエンド比較

| 項目 | Redis | DynamoDB | PostgreSQL |

| ---------------- | -------------------- | ----------------- | ------------------ |

| レイテンシ | 0.5〜2ms | 1〜5ms | 2〜10ms |

| スケーラビリティ | 手動(クラスタ) | 自動 | 手動 |

| コストモデル | インスタンスベース | リクエストベース | インスタンスベース |

| TTLサポート | ネイティブ | ネイティブ | 手動実装 |

| 運用負担 | 中程度 | 低い | 中程度 |

| 適した環境 | 超低レイテンシが必要 | サーバーレス、AWS | 小規模、コスト重視 |

Offline Store設定

BigQueryベースOffline Store

feature_store.yaml - BigQuery configuration

project: fraud_detection

registry: gs://ml-feature-store/registry.db

provider: gcp

offline_store:

type: bigquery

dataset: feature_store

location: asia-northeast3

学習データ生成(Point-in-Time Join)

training_data.py

from feast import FeatureStore

store = FeatureStore(repo_path="./feature_repo")

Entity DataFrame - timestamps and entities for training

entity_df = pd.DataFrame({

"user_id": ["user_001", "user_002", "user_003", "user_001"],

"merchant_id": ["merch_100", "merch_200", "merch_100", "merch_300"],

"event_timestamp": pd.to_datetime([

"2026-03-01 10:00:00",

"2026-03-02 14:30:00",

"2026-03-03 09:15:00",

"2026-03-05 16:45:00",

]),

"label": [0, 1, 0, 1], # fraud label

})

Extract features with point-in-time correctness

training_df = store.get_historical_features(

entity_df=entity_df,

features=[

"user_transaction_features:transaction_count_7d",

"user_transaction_features:transaction_amount_avg_7d",

"user_transaction_features:transaction_amount_max_7d",

"user_transaction_features:unique_merchants_7d",

"merchant_risk_features:chargeback_rate_30d",

"merchant_risk_features:risk_score",

],

).to_df()

print(training_df.head())

print(f"Training data shape: {training_df.shape}")

Onlineフィーチャー取得(リアルタイム推論)

inference.py

from feast import FeatureStore

store = FeatureStore(repo_path="./feature_repo")

Real-time feature retrieval

feature_vector = store.get_online_features(

features=[

"user_transaction_features:transaction_count_7d",

"user_transaction_features:transaction_amount_avg_7d",

"user_transaction_features:unique_merchants_7d",

"merchant_risk_features:chargeback_rate_30d",

"merchant_risk_features:risk_score",

],

entity_rows=[

{"user_id": "user_001", "merchant_id": "merch_100"},

],

).to_dict()

print(feature_vector)

Example output:

{

"user_id": ["user_001"],

"merchant_id": ["merch_100"],

"transaction_count_7d": [23],

"transaction_amount_avg_7d": [45000.5],

"unique_merchants_7d": [8],

"chargeback_rate_30d": [0.02],

"risk_score": [0.15]

}

Training-Serving Skew防止戦略

Training-Serving Skewは、MLモデル性能を低下させる最も一般的な原因の一つです。

Skew発生原因と対応

| 原因 | 説明 | 対応方法 |

| ------------------------------ | ----------------------------------- | ---------------------------------- |

| フィーチャー計算ロジック不一致 | 学習/サービングで異なるコードを使用 | Feature Storeで単一ソース化 |

| データ漏洩 | 未来のデータが学習に含まれる | Point-in-Time Join適用 |

| フィーチャー鮮度の差異 | バッチvsリアルタイム更新周期の違い | TTL管理とFreshnessモニタリング |

| スキーマ変更 | フィーチャー定義が変更された | Feature Registryバージョン管理 |

| NULL処理の違い | デフォルト値処理方式の不一致 | 統一されたデフォルト値ポリシー設定 |

FeastによるSkew防止

skew_detection.py

from feast import FeatureStore

from scipy import stats

store = FeatureStore(repo_path="./feature_repo")

def detect_training_serving_skew(

feature_name: str,

training_values: pd.Series,

sample_size: int = 1000,

):

"""Compare feature distributions between training data and Online Store."""

Sample from Online Store

online_features = []

entity_rows = [{"user_id": f"user_{i:04d}"} for i in range(sample_size)]

online_result = store.get_online_features(

features=[feature_name],

entity_rows=entity_rows,

).to_df()

serving_values = online_result[feature_name.split(":")[-1]].dropna()

KS test for distribution comparison

ks_stat, p_value = stats.ks_2samp(

training_values.dropna(),

serving_values,

)

PSI (Population Stability Index) calculation

psi = calculate_psi(training_values.dropna(), serving_values)

return {

"feature": feature_name,

"ks_statistic": ks_stat,

"p_value": p_value,

"psi": psi,

"skew_detected": psi > 0.2 or p_value < 0.05,

}

def calculate_psi(expected, actual, bins=10):

"""Calculate Population Stability Index (PSI)."""

breakpoints = np.linspace(

min(expected.min(), actual.min()),

max(expected.max(), actual.max()),

bins + 1,

)

expected_counts = np.histogram(expected, breakpoints)[0] / len(expected)

actual_counts = np.histogram(actual, breakpoints)[0] / len(actual)

Avoid division by zero

expected_counts = np.clip(expected_counts, 0.001, None)

actual_counts = np.clip(actual_counts, 0.001, None)

psi = np.sum(

(actual_counts - expected_counts) * np.log(actual_counts / expected_counts)

)

return psi

Feature MonitoringとDrift Detection

モニタリング指標

monitoring/feature_monitor.py

from dataclasses import dataclass

from datetime import datetime

from typing import Optional

@dataclass

class FeatureStats:

feature_name: str

timestamp: datetime

mean: float

std: float

min_val: float

max_val: float

null_rate: float

unique_count: int

p99_latency_ms: Optional[float] = None

def compute_feature_stats(df: pd.DataFrame, feature_name: str) -> FeatureStats:

"""Compute statistical information for a feature."""

series = df[feature_name]

return FeatureStats(

feature_name=feature_name,

timestamp=datetime.utcnow(),

mean=series.mean(),

std=series.std(),

min_val=series.min(),

max_val=series.max(),

null_rate=series.isnull().sum() / len(series),

unique_count=series.nunique(),

)

def check_drift_alerts(

current: FeatureStats,

baseline: FeatureStats,

thresholds: dict,

) -> list:

"""Check for feature drift alerts."""

alerts = []

Check mean change rate

if baseline.mean != 0:

mean_change = abs(current.mean - baseline.mean) / abs(baseline.mean)

if mean_change > thresholds.get("mean_change", 0.3):

alerts.append(

f"Mean drift detected: {baseline.mean:.4f} -> {current.mean:.4f} "

f"(change: {mean_change:.2%})"

)

NULL rate change

null_diff = abs(current.null_rate - baseline.null_rate)

if null_diff > thresholds.get("null_rate_change", 0.05):

alerts.append(

f"Null rate change: {baseline.null_rate:.4f} -> {current.null_rate:.4f}"

)

Range anomaly

if current.max_val > baseline.max_val * thresholds.get("max_multiplier", 2.0):

alerts.append(

f"Max value anomaly: {current.max_val} (baseline max: {baseline.max_val})"

)

return alerts

Feature Storeソリューション比較

| 項目 | Feast | Tecton | Hopsworks |

| ------------------------ | ---------------------------------- | ---------------------------- | ------------------------ |

| ライセンス | Apache 2.0(オープンソース) | 商用(マネージド) | AGPL + 商用 |

| アーキテクチャ | モジュラー型、プラガブル | マネージド、エンドツーエンド | 統合プラットフォーム |

| リアルタイムフィーチャー | 制限的 | ネイティブサポート | サポート |

| ストリーミング | Pushベース | Kafka/Kinesisネイティブ | Kafka連携 |

| フィーチャー変換 | Python SDK | Spark/Pandas/SQL | Spark/Flink |

| モニタリング | 基本的 | 内蔵(自動アラート) | 内蔵(ドリフト検知) |

| ガバナンス | 基本的 | RBAC、監査ログ | RBAC、監査、系譜追跡 |

| クラウド | マルチクラウド | AWS/Databricks | AWS/Azure/GCP |

| 適した組織 | 柔軟性重視、エンジニアリング力あり | 大企業、リアルタイムML必須 | 規制産業、オールインワン |

| コミュニティ | 非常に活発(CNCF関連) | 商用サポート | 活発 |

運用時の注意事項

1. Entity設計原則

- Entityキーはビジネスドメインに合わせて設計します(user_id、order_id、device_idなど)

- 複合Entityキーは検索パフォーマンスに影響するため慎重に使用します

- Entityキーのカーディナリティが高すぎるとOnline Storeのメモリ使用量が急増します

2. TTL管理

- Online StoreのTTLはMaterialization周期よりも余裕を持って設定します

- TTLが短すぎるとMaterialization遅延時にNULL値が返されます

- TTLが長すぎるとOnline Storeのストレージコストが増加します

3. スキーマ変更管理

- フィーチャー追加は下位互換性がありますが、フィーチャー削除やタイプ変更はモデル再学習が必要です

- Feature Viewのバージョン管理のため名前にバージョンを含めます(例: `user_features_v2`)

- スキーマ変更時は既存モデルとの互換性を必ず検証します

4. Materialization失敗への対応

Check materialization status

feast materialize-incremental 2026-03-12T00:00:00 --verbose

Rerun for specific Feature View only

feast materialize-incremental 2026-03-12T00:00:00 \

--feature-views user_transaction_features

Check feature freshness in Online Store

feast feature-views list

障害事例と復旧手順

障害事例1: Online Store障害(Redisクラスタダウン)

**症状**: すべてのリアルタイム推論リクエストでフィーチャー取得が失敗

**復旧手順**:

1. Redisクラスタの状態確認と復旧

2. 復旧不可の場合、バックアップRedisに切り替え(Sentinel/Cluster Failover)

3. Materializationを再実行してOnline Storeデータを復元

4. フィーチャー鮮度の検証後、推論サービスを再開

障害事例2: Materializationパイプライン障害

**症状**: Online Storeのフィーチャーデータが更新されず古い値がサービングされる

Feature freshness check script

from feast import FeatureStore

from datetime import datetime, timedelta

store = FeatureStore(repo_path="./feature_repo")

Check last materialization time per Feature View

for fv in store.list_feature_views():

if fv.materialization_intervals:

last_mat = fv.materialization_intervals[-1]

staleness = datetime.utcnow() - last_mat.end_date

if staleness > timedelta(hours=24):

print(f"ALERT: {fv.name} is stale by {staleness}")

else:

print(f"OK: {fv.name} last materialized at {last_mat.end_date}")

else:

print(f"WARNING: {fv.name} has never been materialized")

**復旧手順**:

1. Materializationログから失敗原因を特定(Offline Storeアクセス問題、スキーマ変更など)

2. データソースの可用性を確認

3. 失敗したFeature Viewに対してMaterializationを再試行

4. Online Storeのフィーチャー整合性を検証

障害事例3: Feature Drift検知

**症状**: モデルパフォーマンス指標が段階的に低下

**復旧手順**:

1. Feature Monitoringダッシュボードでドリフトを確認

2. 原因を特定(データパイプライン変更、アップストリームスキーマ変更、実際の分布変化)

3. 必要に応じてフィーチャーパイプラインを修正

4. 深刻なドリフトの場合、モデル再学習をトリガー

プロダクションデプロイチェックリスト

プロダクション環境にFeature Storeをデプロイする際、以下の項目を必ず確認します。

| 項目 | チェックポイント | 推奨設定 |

| ------------------- | -------------------------------- | --------------------------- |

| Online Store可用性 | クラスタ構成、レプリカ | Redis Cluster 3+ノード |

| Materialization周期 | ビジネス要件対比の鮮度 | SLAに合った周期設定 |

| TTL設定 | Materialization失敗時の影響 | Materialization周期の2〜3倍 |

| バックアップ戦略 | Online/Offline Storeバックアップ | 日次スナップショット |

| モニタリング | フィーチャードリフト、レイテンシ | Prometheus + Grafana |

| アラート | Materialization失敗、ドリフト | PagerDuty/Slack連携 |

| セキュリティ | 認証/認可、ネットワーク | IAM、VPC、TLS |

まとめ

Feature StoreはMLモデルのプロダクション運用においてフィーチャー管理の複雑さを解決する核心インフラです。Feastはオープンソースの柔軟性とモジュラーアーキテクチャを通じて、既存インフラに自然に統合できる選択肢を提供します。

核心ポイントは以下の通りです。

- **Online/Offline Storeの分離**: リアルタイムサービングとバッチ学習の要件をそれぞれ最適化する

- **Point-in-Time Correctness**: Feature Storeのタイムトラベルクエリでデータ漏洩を根本的に防止する

- **Training-Serving Skew防止**: 単一のフィーチャー定義から学習とサービングの両方をサポートして一貫性を保証する

- **Materialization自動化**: Airflowなどと連携してフィーチャー更新パイプラインを安定的に運用する

- **Feature Monitoring**: ドリフト検知とフィーチャー品質モニタリングでモデル性能を継続的に維持する

Feature Storeの導入は単一モデルではなく、組織全体のML成熟度を高める投資です。小規模から始めて1つのプロダクションモデルで検証した後、段階的に拡大するアプローチを推奨します。

参考資料

- [Feast Official Documentation](https://docs.feast.dev)

- [Feast Architecture Overview](https://docs.feast.dev/getting-started/architecture/overview)

- [Feature Store Architecture and Storage - DragonflyDB](https://www.dragonflydb.io/blog/feature-store-architecture-and-storage)

- [A Comparative Analysis: Feast vs Tecton vs Hopsworks - Uplatz](https://uplatz.com/blog/a-comparative-analysis-of-modern-feature-stores-feast-vs-tecton-vs-hopsworks/)

- [Feature Store 101: Build, Serve, and Scale ML Features - Aerospike](https://aerospike.com/blog/feature-store/)

- [What is a Feature Store? - Databricks](https://www.databricks.com/blog/what-feature-store-complete-guide-ml-feature-engineering)

- [Solving Training-Serving Skew with Feast - Medium](https://medium.com/@scoopnisker/solving-the-training-serving-skew-problem-with-feast-feature-store-3719b47e23a2)

현재 단락 (1/411)

機械学習モデルのプロダクションデプロイが一般化するにつれ、フィーチャー(Feature)管理がMLOpsの核心課題として浮上しています。モデル学習に使用したフィーチャーをリアルタイムサービングでも同一...

작성 글자: 0원문 글자: 16,127작성 단락: 0/411