- Authors
- Name
- 概要
- Feature Storeが必要な理由
- Feastのインストールとプロジェクト初期化
- フィーチャー定義
- サンプルデータの生成
- Feastワークフロー
- Feature Serviceでフィーチャーグループを管理
- Push Sourceによるリアルタイムフィーチャー更新
- Feature Serverのデプロイ
- Airflowとの連携(自動Materialize)
- まとめ
- クイズ

概要
MLモデルをプロダクションにデプロイする際、最もよくある問題の一つがTraining-Serving Skewです。学習時に使用したフィーチャーとサービング時に使用するフィーチャーが異なることで、モデルの性能が低下する現象です。Feature Storeはこの問題を根本的に解決するインフラコンポーネントで、フィーチャーの定義・保存・サービングを一元管理します。
Feast(Feature Store)は最も広く使われているオープンソースのフィーチャーストアで、オフライン(バッチ学習)とオンライン(リアルタイムサービング)の両方のパスをサポートします。本記事では、Feastを活用してフィーチャーパイプラインを構築する全工程を解説します。
Feature Storeが必要な理由
Training-Serving Skew問題
# 学習時(オフライン)
features = pd.read_sql("""
SELECT user_id,
AVG(purchase_amount) as avg_purchase,
COUNT(*) as purchase_count
FROM transactions
WHERE timestamp < '2026-01-01'
GROUP BY user_id
""", conn)
# サービング時(オンライン)- 異なるロジックで計算するとSkewが発生!
features = redis_client.get(f"user:{user_id}:features")
学習とサービングで同じフィーチャーを異なるコードで計算すると微妙な差異が生じ、モデルの性能がオフライン実験と異なってきます。Feature Storeは単一のフィーチャー定義からオフライン/オンライン両方に一貫した値を提供します。
Feature Storeの主要機能
| 機能 | 説明 |
|---|---|
| フィーチャーレジストリ | フィーチャーのメタデータ、スキーマ、オーナー管理 |
| オフラインストア | バッチ学習用の大量フィーチャー取得(Point-in-Time Join) |
| オンラインストア | リアルタイムサービング用の低レイテンシフィーチャー取得 |
| フィーチャーサービス | gRPC/HTTP APIでフィーチャーを提供 |
| Point-in-Time Join | タイムスタンプ基準で正確なフィーチャー値をジョイン |
Feastのインストールとプロジェクト初期化
インストール
# 基本インストール
pip install feast
# PostgreSQLオンラインストア使用時
pip install feast[postgres]
# Redisオンラインストア使用時
pip install feast[redis]
# 全依存関係
pip install feast[postgres,redis,aws,gcp]
プロジェクト初期化
# プロジェクト作成
feast init my_feature_store
cd my_feature_store
# ディレクトリ構成
# my_feature_store/
# ├── feature_repo/
# │ ├── feature_store.yaml # Feast設定
# │ ├── example_repo.py # フィーチャー定義の例
# │ └── data/ # サンプルデータ
# └── README.md
feature_store.yaml設定
project: my_feature_store
registry: data/registry.db
provider: local
online_store:
type: sqlite
path: data/online_store.db
offline_store:
type: file
entity_key_serialization_version: 2
プロダクション環境では以下のように変更します:
project: my_feature_store
registry:
registry_type: sql
path: postgresql://user:pass@host:5432/feast_registry
provider: local
online_store:
type: redis
connection_string: redis://localhost:6379
offline_store:
type: file # またはbigquery, redshift, snowflake
フィーチャー定義
データソースとエンティティの定義
# feature_repo/features.py
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource, PushSource
from feast.types import Float32, Int64, String
# データソース定義
user_transactions_source = FileSource(
path="data/user_transactions.parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created_timestamp",
)
# エンティティ定義(フィーチャーの基準となるキー)
user = Entity(
name="user_id",
join_keys=["user_id"],
description="ユーザー固有ID",
)
Feature Viewの定義
# オフライン + オンラインフィーチャービュー
user_transaction_features = FeatureView(
name="user_transaction_features",
entities=[user],
ttl=timedelta(days=7), # オンラインストアで7日後に期限切れ
schema=[
Field(name="total_purchases", dtype=Int64, description="購入総回数"),
Field(name="avg_purchase_amount", dtype=Float32, description="平均購入金額"),
Field(name="last_purchase_amount", dtype=Float32, description="直近の購入金額"),
Field(name="purchase_frequency", dtype=Float32, description="購入頻度(件/日)"),
Field(name="user_segment", dtype=String, description="ユーザーセグメント"),
],
online=True,
source=user_transactions_source,
tags={"team": "ml-platform", "version": "v1"},
)
On-Demand Feature View(リアルタイム変換)
from feast import on_demand_feature_view, RequestSource
# リクエスト時に動的に計算されるフィーチャー
input_request = RequestSource(
name="purchase_request",
schema=[
Field(name="current_amount", dtype=Float32),
],
)
@on_demand_feature_view(
sources=[user_transaction_features, input_request],
schema=[
Field(name="amount_vs_avg_ratio", dtype=Float32),
Field(name="is_high_value", dtype=Int64),
],
)
def purchase_analysis(inputs: dict) -> dict:
"""現在の購入金額と平均購入金額の比率を計算"""
import pandas as pd
df = pd.DataFrame(inputs)
df["amount_vs_avg_ratio"] = df["current_amount"] / (df["avg_purchase_amount"] + 1e-6)
df["is_high_value"] = (df["amount_vs_avg_ratio"] > 2.0).astype(int)
return df[["amount_vs_avg_ratio", "is_high_value"]]
サンプルデータの生成
# scripts/generate_data.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
np.random.seed(42)
n_users = 1000
n_records = 5000
user_ids = [f"user_{i:04d}" for i in range(n_users)]
records = []
for _ in range(n_records):
user_id = np.random.choice(user_ids)
ts = datetime(2026, 1, 1) + timedelta(
days=np.random.randint(0, 60),
hours=np.random.randint(0, 24),
)
records.append({
"user_id": user_id,
"total_purchases": np.random.randint(1, 100),
"avg_purchase_amount": round(np.random.uniform(10, 500), 2),
"last_purchase_amount": round(np.random.uniform(5, 1000), 2),
"purchase_frequency": round(np.random.uniform(0.1, 5.0), 3),
"user_segment": np.random.choice(["bronze", "silver", "gold", "platinum"]),
"event_timestamp": ts,
"created_timestamp": ts,
})
df = pd.DataFrame(records)
df.to_parquet("feature_repo/data/user_transactions.parquet", index=False)
print(f"Generated {len(df)} records for {n_users} users")
python scripts/generate_data.py
# Generated 5000 records for 1000 users
Feastワークフロー
1. Apply — フィーチャー定義の登録
cd feature_repo
feast apply
Created entity user_id
Created feature view user_transaction_features
Created on demand feature view purchase_analysis
Deploying infrastructure for my_feature_store...
2. Materialize — オフラインからオンラインストアへの同期
# 特定期間のデータをオンラインストアにロード
feast materialize 2026-01-01T00:00:00 2026-03-01T00:00:00
# 増分ロード(前回のmaterializeから現在まで)
feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")
Materializing 1 feature views from 2026-01-01 to 2026-03-01
user_transaction_features:
100%|████████████████████████| 1000/1000 [00:03<00:00, 312.45it/s]
3. オフラインフィーチャー取得(学習用)
from feast import FeatureStore
import pandas as pd
store = FeatureStore(repo_path="feature_repo")
# 学習データ生成用のエンティティDataFrame
entity_df = pd.DataFrame({
"user_id": ["user_0001", "user_0042", "user_0100", "user_0500"],
"event_timestamp": pd.to_datetime([
"2026-02-01", "2026-02-15", "2026-01-20", "2026-02-28"
]),
})
# Point-in-Time Joinでフィーチャーを取得
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"user_transaction_features:total_purchases",
"user_transaction_features:avg_purchase_amount",
"user_transaction_features:last_purchase_amount",
"user_transaction_features:purchase_frequency",
"user_transaction_features:user_segment",
],
).to_df()
print(training_df.head())
user_id event_timestamp total_purchases avg_purchase_amount ...
0 user_0001 2026-02-01 45 234.56 ...
1 user_0042 2026-02-15 12 89.30 ...
2 user_0100 2026-01-20 78 456.78 ...
3 user_0500 2026-02-28 33 167.42 ...
Point-in-Time Joinが核心です。各エンティティのevent_timestamp時点で最も新しいフィーチャー値を取得します。これにより、データリーケージ(data leakage)なく正確な学習データを構成できます。
4. オンラインフィーチャー取得(サービング用)
# リアルタイムサービングでフィーチャーを取得
online_features = store.get_online_features(
features=[
"user_transaction_features:total_purchases",
"user_transaction_features:avg_purchase_amount",
"user_transaction_features:user_segment",
"purchase_analysis:amount_vs_avg_ratio",
"purchase_analysis:is_high_value",
],
entity_rows=[
{"user_id": "user_0001", "current_amount": 750.0},
{"user_id": "user_0042", "current_amount": 50.0},
],
).to_dict()
print(online_features)
{
"user_id": ["user_0001", "user_0042"],
"total_purchases": [45, 12],
"avg_purchase_amount": [234.56, 89.30],
"user_segment": ["gold", "silver"],
"amount_vs_avg_ratio": [3.199, 0.560],
"is_high_value": [1, 0],
}
Feature Serviceでフィーチャーグループを管理
from feast import FeatureService
# レコメンドモデルに必要なフィーチャーの束
recommendation_service = FeatureService(
name="recommendation_features",
features=[
user_transaction_features[["total_purchases", "avg_purchase_amount", "user_segment"]],
purchase_analysis,
],
tags={"model": "recommendation-v2"},
)
# 不正検知モデルに必要なフィーチャーの束
fraud_detection_service = FeatureService(
name="fraud_detection_features",
features=[
user_transaction_features,
purchase_analysis,
],
tags={"model": "fraud-detection-v1"},
)
# Feature Serviceで取得
features = store.get_online_features(
features=store.get_feature_service("recommendation_features"),
entity_rows=[{"user_id": "user_0001", "current_amount": 750.0}],
).to_dict()
Push Sourceによるリアルタイムフィーチャー更新
from feast import PushSource
# Pushソース定義
user_realtime_source = PushSource(
name="user_realtime_push",
batch_source=user_transactions_source,
)
# リアルタイムイベント発生時にフィーチャーを更新
store.push(
push_source_name="user_realtime_push",
df=pd.DataFrame({
"user_id": ["user_0001"],
"total_purchases": [46],
"avg_purchase_amount": [240.12],
"last_purchase_amount": [750.0],
"purchase_frequency": [2.1],
"user_segment": ["gold"],
"event_timestamp": [pd.Timestamp.now()],
"created_timestamp": [pd.Timestamp.now()],
}),
)
Feature Serverのデプロイ
# ローカルFeature Serverの起動
feast serve -h 0.0.0.0 -p 6566
# HTTP APIでフィーチャーを取得
curl -X POST http://localhost:6566/get-online-features \
-H "Content-Type: application/json" \
-d '{
"features": [
"user_transaction_features:total_purchases",
"user_transaction_features:avg_purchase_amount"
],
"entities": {
"user_id": ["user_0001", "user_0042"]
}
}'
DockerでFeature Serverをデプロイ
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install feast[redis]
COPY feature_repo/ feature_repo/
WORKDIR /app/feature_repo
# レジストリ適用 & サーバー起動
CMD feast apply && feast serve -h 0.0.0.0 -p 6566
# docker-compose.yml
services:
feast-server:
build: .
ports:
- '6566:6566'
depends_on:
- redis
environment:
- REDIS_URL=redis://redis:6379
redis:
image: redis:7-alpine
ports:
- '6379:6379'
Airflowとの連携(自動Materialize)
# dags/feast_materialize.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
"owner": "ml-platform",
"retries": 2,
"retry_delay": timedelta(minutes=5),
}
with DAG(
dag_id="feast_materialize",
default_args=default_args,
schedule_interval="0 */6 * * *", # 6時間ごと
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
materialize = BashOperator(
task_id="materialize_incremental",
bash_command=(
"cd /opt/feature_repo && "
"feast materialize-incremental $(date -u +'%Y-%m-%dT%H:%M:%S')"
),
)
まとめ
Feastを活用したフィーチャーパイプラインの主要ポイントを整理すると:
- 一貫したフィーチャー定義: 学習とサービングで同一のフィーチャー定義を使用し、Training-Serving Skewを防止
- Point-in-Time Join: タイムスタンプ基準の正確なフィーチャージョインでデータリーケージを防止
- オフライン/オンライン二重化: バッチ学習はオフラインストア、リアルタイムサービングはオンラインストア
- Feature Service: モデルごとのフィーチャーグループ管理で再利用性を向上
- Push Source: リアルタイムイベントベースのフィーチャー更新をサポート
Feature StoreはMLモデルが1〜2個の場合は過剰に感じるかもしれませんが、モデルが増えチームが大きくなると必須のインフラになります。特に複数のモデルが同じフィーチャーを共有する場合にその価値が最大化されます。
クイズ
Q1: Training-Serving Skewとは?
学習時に使用したフィーチャーとサービング時に使用するフィーチャーが異なることで、モデルの性能が低下する現象です。フィーチャー計算ロジックの不一致、データソースの違い、時間基準の不一致などが原因です。
Q2: Point-in-Time Joinの役割は?
各エンティティのイベント時点(event_timestamp)を基準に、その時点以前の最新のフィーチャー値をジョインします。これにより、未来のデータが学習に使用されるデータリーケージ(data leakage)を防止します。
Q3: Feastのオフラインストアとオンラインストアの違いは?
オフラインストアは大量のヒストリカルフィーチャーを保存してバッチ学習に使用され(ファイル、BigQueryなど)、オンラインストアは最新のフィーチャー値のみを保存して低レイテンシのリアルタイムサービングに使用されます(Redis、DynamoDBなど)。
Q4:
オフラインストアのフィーチャーデータをオンラインストアに同期(ロード)する作業です。指定された時間範囲の最新フィーチャー値をオンラインストアに保存し、リアルタイム取得を可能にします。feast materializeコマンドの役割は?
Q5: On-Demand Feature Viewと通常のFeature Viewの違いは?
通常のFeature Viewは事前に計算されたフィーチャーを保存しますが、On-Demand Feature Viewはリクエスト時に動的にフィーチャーを計算します。リクエストパラメータと既存のフィーチャーを組み合わせたリアルタイム変換に使用されます。
Q6: Feature Serviceのメリットは?
モデルごとに必要なフィーチャーを論理的にグループ化して管理できます。どのモデルがどのフィーチャーを使用しているかを明確に追跡でき、フィーチャー取得時に一貫したインターフェースを提供します。
Q7: TTL(Time To Live)設定の意味は?
オンラインストアでフィーチャー値の有効期間を指定します。TTLを過ぎたフィーチャーは取得時にnullが返され、古い(stale)フィーチャー値がサービングに使用されるのを防止します。
Q8: Push Sourceを使用するシナリオは?
リアルタイムイベント(決済、クリックなど)が発生した際に、即座にオンラインストアのフィーチャーを更新する必要がある場合に使用します。バッチmaterializeの定期的な更新の間に最新状態を維持できます。