Skip to content
Published on

MLOps Feature Store 実践 — Feastでフィーチャーパイプラインを構築する

Authors
  • Name
    Twitter
MLOps Feature Store - Feast

概要

MLモデルをプロダクションにデプロイする際、最もよくある問題の一つがTraining-Serving Skewです。学習時に使用したフィーチャーとサービング時に使用するフィーチャーが異なることで、モデルの性能が低下する現象です。Feature Storeはこの問題を根本的に解決するインフラコンポーネントで、フィーチャーの定義・保存・サービングを一元管理します。

Feast(Feature Store)は最も広く使われているオープンソースのフィーチャーストアで、オフライン(バッチ学習)とオンライン(リアルタイムサービング)の両方のパスをサポートします。本記事では、Feastを活用してフィーチャーパイプラインを構築する全工程を解説します。

Feature Storeが必要な理由

Training-Serving Skew問題

# 学習時(オフライン)
features = pd.read_sql("""
    SELECT user_id,
           AVG(purchase_amount) as avg_purchase,
           COUNT(*) as purchase_count
    FROM transactions
    WHERE timestamp < '2026-01-01'
    GROUP BY user_id
""", conn)

# サービング時(オンライン)- 異なるロジックで計算するとSkewが発生!
features = redis_client.get(f"user:{user_id}:features")

学習とサービングで同じフィーチャーを異なるコードで計算すると微妙な差異が生じ、モデルの性能がオフライン実験と異なってきます。Feature Storeは単一のフィーチャー定義からオフライン/オンライン両方に一貫した値を提供します。

Feature Storeの主要機能

機能説明
フィーチャーレジストリフィーチャーのメタデータ、スキーマ、オーナー管理
オフラインストアバッチ学習用の大量フィーチャー取得(Point-in-Time Join)
オンラインストアリアルタイムサービング用の低レイテンシフィーチャー取得
フィーチャーサービスgRPC/HTTP APIでフィーチャーを提供
Point-in-Time Joinタイムスタンプ基準で正確なフィーチャー値をジョイン

Feastのインストールとプロジェクト初期化

インストール

# 基本インストール
pip install feast

# PostgreSQLオンラインストア使用時
pip install feast[postgres]

# Redisオンラインストア使用時
pip install feast[redis]

# 全依存関係
pip install feast[postgres,redis,aws,gcp]

プロジェクト初期化

# プロジェクト作成
feast init my_feature_store
cd my_feature_store

# ディレクトリ構成
# my_feature_store/
# ├── feature_repo/
# │   ├── feature_store.yaml    # Feast設定
# │   ├── example_repo.py       # フィーチャー定義の例
# │   └── data/                 # サンプルデータ
# └── README.md

feature_store.yaml設定

project: my_feature_store
registry: data/registry.db
provider: local

online_store:
  type: sqlite
  path: data/online_store.db

offline_store:
  type: file

entity_key_serialization_version: 2

プロダクション環境では以下のように変更します:

project: my_feature_store
registry:
  registry_type: sql
  path: postgresql://user:pass@host:5432/feast_registry

provider: local

online_store:
  type: redis
  connection_string: redis://localhost:6379

offline_store:
  type: file # またはbigquery, redshift, snowflake

フィーチャー定義

データソースとエンティティの定義

# feature_repo/features.py
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource, PushSource
from feast.types import Float32, Int64, String

# データソース定義
user_transactions_source = FileSource(
    path="data/user_transactions.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

# エンティティ定義(フィーチャーの基準となるキー)
user = Entity(
    name="user_id",
    join_keys=["user_id"],
    description="ユーザー固有ID",
)

Feature Viewの定義

# オフライン + オンラインフィーチャービュー
user_transaction_features = FeatureView(
    name="user_transaction_features",
    entities=[user],
    ttl=timedelta(days=7),  # オンラインストアで7日後に期限切れ
    schema=[
        Field(name="total_purchases", dtype=Int64, description="購入総回数"),
        Field(name="avg_purchase_amount", dtype=Float32, description="平均購入金額"),
        Field(name="last_purchase_amount", dtype=Float32, description="直近の購入金額"),
        Field(name="purchase_frequency", dtype=Float32, description="購入頻度(件/日)"),
        Field(name="user_segment", dtype=String, description="ユーザーセグメント"),
    ],
    online=True,
    source=user_transactions_source,
    tags={"team": "ml-platform", "version": "v1"},
)

On-Demand Feature View(リアルタイム変換)

from feast import on_demand_feature_view, RequestSource

# リクエスト時に動的に計算されるフィーチャー
input_request = RequestSource(
    name="purchase_request",
    schema=[
        Field(name="current_amount", dtype=Float32),
    ],
)

@on_demand_feature_view(
    sources=[user_transaction_features, input_request],
    schema=[
        Field(name="amount_vs_avg_ratio", dtype=Float32),
        Field(name="is_high_value", dtype=Int64),
    ],
)
def purchase_analysis(inputs: dict) -> dict:
    """現在の購入金額と平均購入金額の比率を計算"""
    import pandas as pd
    df = pd.DataFrame(inputs)
    df["amount_vs_avg_ratio"] = df["current_amount"] / (df["avg_purchase_amount"] + 1e-6)
    df["is_high_value"] = (df["amount_vs_avg_ratio"] > 2.0).astype(int)
    return df[["amount_vs_avg_ratio", "is_high_value"]]

サンプルデータの生成

# scripts/generate_data.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

np.random.seed(42)
n_users = 1000
n_records = 5000

user_ids = [f"user_{i:04d}" for i in range(n_users)]
records = []

for _ in range(n_records):
    user_id = np.random.choice(user_ids)
    ts = datetime(2026, 1, 1) + timedelta(
        days=np.random.randint(0, 60),
        hours=np.random.randint(0, 24),
    )
    records.append({
        "user_id": user_id,
        "total_purchases": np.random.randint(1, 100),
        "avg_purchase_amount": round(np.random.uniform(10, 500), 2),
        "last_purchase_amount": round(np.random.uniform(5, 1000), 2),
        "purchase_frequency": round(np.random.uniform(0.1, 5.0), 3),
        "user_segment": np.random.choice(["bronze", "silver", "gold", "platinum"]),
        "event_timestamp": ts,
        "created_timestamp": ts,
    })

df = pd.DataFrame(records)
df.to_parquet("feature_repo/data/user_transactions.parquet", index=False)
print(f"Generated {len(df)} records for {n_users} users")
python scripts/generate_data.py
# Generated 5000 records for 1000 users

Feastワークフロー

1. Apply — フィーチャー定義の登録

cd feature_repo
feast apply
Created entity user_id
Created feature view user_transaction_features
Created on demand feature view purchase_analysis

Deploying infrastructure for my_feature_store...

2. Materialize — オフラインからオンラインストアへの同期

# 特定期間のデータをオンラインストアにロード
feast materialize 2026-01-01T00:00:00 2026-03-01T00:00:00

# 増分ロード(前回のmaterializeから現在まで)
feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")
Materializing 1 feature views from 2026-01-01 to 2026-03-01
user_transaction_features:
100%|████████████████████████| 1000/1000 [00:03<00:00, 312.45it/s]

3. オフラインフィーチャー取得(学習用)

from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path="feature_repo")

# 学習データ生成用のエンティティDataFrame
entity_df = pd.DataFrame({
    "user_id": ["user_0001", "user_0042", "user_0100", "user_0500"],
    "event_timestamp": pd.to_datetime([
        "2026-02-01", "2026-02-15", "2026-01-20", "2026-02-28"
    ]),
})

# Point-in-Time Joinでフィーチャーを取得
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_transaction_features:total_purchases",
        "user_transaction_features:avg_purchase_amount",
        "user_transaction_features:last_purchase_amount",
        "user_transaction_features:purchase_frequency",
        "user_transaction_features:user_segment",
    ],
).to_df()

print(training_df.head())
    user_id  event_timestamp  total_purchases  avg_purchase_amount  ...
0  user_0001  2026-02-01            45           234.56             ...
1  user_0042  2026-02-15            12            89.30             ...
2  user_0100  2026-01-20            78           456.78             ...
3  user_0500  2026-02-28            33           167.42             ...

Point-in-Time Joinが核心です。各エンティティのevent_timestamp時点で最も新しいフィーチャー値を取得します。これにより、データリーケージ(data leakage)なく正確な学習データを構成できます。

4. オンラインフィーチャー取得(サービング用)

# リアルタイムサービングでフィーチャーを取得
online_features = store.get_online_features(
    features=[
        "user_transaction_features:total_purchases",
        "user_transaction_features:avg_purchase_amount",
        "user_transaction_features:user_segment",
        "purchase_analysis:amount_vs_avg_ratio",
        "purchase_analysis:is_high_value",
    ],
    entity_rows=[
        {"user_id": "user_0001", "current_amount": 750.0},
        {"user_id": "user_0042", "current_amount": 50.0},
    ],
).to_dict()

print(online_features)
{
    "user_id": ["user_0001", "user_0042"],
    "total_purchases": [45, 12],
    "avg_purchase_amount": [234.56, 89.30],
    "user_segment": ["gold", "silver"],
    "amount_vs_avg_ratio": [3.199, 0.560],
    "is_high_value": [1, 0],
}

Feature Serviceでフィーチャーグループを管理

from feast import FeatureService

# レコメンドモデルに必要なフィーチャーの束
recommendation_service = FeatureService(
    name="recommendation_features",
    features=[
        user_transaction_features[["total_purchases", "avg_purchase_amount", "user_segment"]],
        purchase_analysis,
    ],
    tags={"model": "recommendation-v2"},
)

# 不正検知モデルに必要なフィーチャーの束
fraud_detection_service = FeatureService(
    name="fraud_detection_features",
    features=[
        user_transaction_features,
        purchase_analysis,
    ],
    tags={"model": "fraud-detection-v1"},
)
# Feature Serviceで取得
features = store.get_online_features(
    features=store.get_feature_service("recommendation_features"),
    entity_rows=[{"user_id": "user_0001", "current_amount": 750.0}],
).to_dict()

Push Sourceによるリアルタイムフィーチャー更新

from feast import PushSource

# Pushソース定義
user_realtime_source = PushSource(
    name="user_realtime_push",
    batch_source=user_transactions_source,
)

# リアルタイムイベント発生時にフィーチャーを更新
store.push(
    push_source_name="user_realtime_push",
    df=pd.DataFrame({
        "user_id": ["user_0001"],
        "total_purchases": [46],
        "avg_purchase_amount": [240.12],
        "last_purchase_amount": [750.0],
        "purchase_frequency": [2.1],
        "user_segment": ["gold"],
        "event_timestamp": [pd.Timestamp.now()],
        "created_timestamp": [pd.Timestamp.now()],
    }),
)

Feature Serverのデプロイ

# ローカルFeature Serverの起動
feast serve -h 0.0.0.0 -p 6566

# HTTP APIでフィーチャーを取得
curl -X POST http://localhost:6566/get-online-features \
  -H "Content-Type: application/json" \
  -d '{
    "features": [
      "user_transaction_features:total_purchases",
      "user_transaction_features:avg_purchase_amount"
    ],
    "entities": {
      "user_id": ["user_0001", "user_0042"]
    }
  }'

DockerでFeature Serverをデプロイ

FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install feast[redis]

COPY feature_repo/ feature_repo/
WORKDIR /app/feature_repo

# レジストリ適用 & サーバー起動
CMD feast apply && feast serve -h 0.0.0.0 -p 6566
# docker-compose.yml
services:
  feast-server:
    build: .
    ports:
      - '6566:6566'
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379

  redis:
    image: redis:7-alpine
    ports:
      - '6379:6379'

Airflowとの連携(自動Materialize)

# dags/feast_materialize.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "ml-platform",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="feast_materialize",
    default_args=default_args,
    schedule_interval="0 */6 * * *",  # 6時間ごと
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:

    materialize = BashOperator(
        task_id="materialize_incremental",
        bash_command=(
            "cd /opt/feature_repo && "
            "feast materialize-incremental $(date -u +'%Y-%m-%dT%H:%M:%S')"
        ),
    )

まとめ

Feastを活用したフィーチャーパイプラインの主要ポイントを整理すると:

  • 一貫したフィーチャー定義: 学習とサービングで同一のフィーチャー定義を使用し、Training-Serving Skewを防止
  • Point-in-Time Join: タイムスタンプ基準の正確なフィーチャージョインでデータリーケージを防止
  • オフライン/オンライン二重化: バッチ学習はオフラインストア、リアルタイムサービングはオンラインストア
  • Feature Service: モデルごとのフィーチャーグループ管理で再利用性を向上
  • Push Source: リアルタイムイベントベースのフィーチャー更新をサポート

Feature StoreはMLモデルが1〜2個の場合は過剰に感じるかもしれませんが、モデルが増えチームが大きくなると必須のインフラになります。特に複数のモデルが同じフィーチャーを共有する場合にその価値が最大化されます。

クイズ

Q1: Training-Serving Skewとは? 学習時に使用したフィーチャーとサービング時に使用するフィーチャーが異なることで、モデルの性能が低下する現象です。フィーチャー計算ロジックの不一致、データソースの違い、時間基準の不一致などが原因です。

Q2: Point-in-Time Joinの役割は? 各エンティティのイベント時点(event_timestamp)を基準に、その時点以前の最新のフィーチャー値をジョインします。これにより、未来のデータが学習に使用されるデータリーケージ(data leakage)を防止します。

Q3: Feastのオフラインストアとオンラインストアの違いは? オフラインストアは大量のヒストリカルフィーチャーを保存してバッチ学習に使用され(ファイル、BigQueryなど)、オンラインストアは最新のフィーチャー値のみを保存して低レイテンシのリアルタイムサービングに使用されます(Redis、DynamoDBなど)。

Q4: feast materializeコマンドの役割は? オフラインストアのフィーチャーデータをオンラインストアに同期(ロード)する作業です。指定された時間範囲の最新フィーチャー値をオンラインストアに保存し、リアルタイム取得を可能にします。

Q5: On-Demand Feature Viewと通常のFeature Viewの違いは? 通常のFeature Viewは事前に計算されたフィーチャーを保存しますが、On-Demand Feature Viewはリクエスト時に動的にフィーチャーを計算します。リクエストパラメータと既存のフィーチャーを組み合わせたリアルタイム変換に使用されます。

Q6: Feature Serviceのメリットは? モデルごとに必要なフィーチャーを論理的にグループ化して管理できます。どのモデルがどのフィーチャーを使用しているかを明確に追跡でき、フィーチャー取得時に一貫したインターフェースを提供します。

Q7: TTL(Time To Live)設定の意味は? オンラインストアでフィーチャー値の有効期間を指定します。TTLを過ぎたフィーチャーは取得時にnullが返され、古い(stale)フィーチャー値がサービングに使用されるのを防止します。

Q8: Push Sourceを使用するシナリオは? リアルタイムイベント(決済、クリックなど)が発生した際に、即座にオンラインストアのフィーチャーを更新する必要がある場合に使用します。バッチmaterializeの定期的な更新の間に最新状態を維持できます。