Skip to content

필사 모드: MLOps Feature Store 実践 — Feastでフィーチャーパイプラインを構築する

日本語
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

概要

MLモデルをプロダクションにデプロイする際、最もよくある問題の一つが**Training-Serving Skew**です。学習時に使用したフィーチャーとサービング時に使用するフィーチャーが異なることで、モデルの性能が低下する現象です。**Feature Store**はこの問題を根本的に解決するインフラコンポーネントで、フィーチャーの定義・保存・サービングを一元管理します。

**Feast**(Feature Store)は最も広く使われているオープンソースのフィーチャーストアで、オフライン(バッチ学習)とオンライン(リアルタイムサービング)の両方のパスをサポートします。本記事では、Feastを活用してフィーチャーパイプラインを構築する全工程を解説します。

Feature Storeが必要な理由

Training-Serving Skew問題

学習時(オフライン)

features = pd.read_sql("""

SELECT user_id,

AVG(purchase_amount) as avg_purchase,

COUNT(*) as purchase_count

FROM transactions

WHERE timestamp < '2026-01-01'

GROUP BY user_id

""", conn)

サービング時(オンライン)- 異なるロジックで計算するとSkewが発生!

features = redis_client.get(f"user:{user_id}:features")

学習とサービングで同じフィーチャーを異なるコードで計算すると微妙な差異が生じ、モデルの性能がオフライン実験と異なってきます。Feature Storeは**単一のフィーチャー定義**からオフライン/オンライン両方に一貫した値を提供します。

Feature Storeの主要機能

| 機能 | 説明 |

| -------------------------- | -------------------------------------------------------- |

| **フィーチャーレジストリ** | フィーチャーのメタデータ、スキーマ、オーナー管理 |

| **オフラインストア** | バッチ学習用の大量フィーチャー取得(Point-in-Time Join) |

| **オンラインストア** | リアルタイムサービング用の低レイテンシフィーチャー取得 |

| **フィーチャーサービス** | gRPC/HTTP APIでフィーチャーを提供 |

| **Point-in-Time Join** | タイムスタンプ基準で正確なフィーチャー値をジョイン |

Feastのインストールとプロジェクト初期化

インストール

基本インストール

pip install feast

PostgreSQLオンラインストア使用時

pip install feast[postgres]

Redisオンラインストア使用時

pip install feast[redis]

全依存関係

pip install feast[postgres,redis,aws,gcp]

プロジェクト初期化

プロジェクト作成

feast init my_feature_store

cd my_feature_store

ディレクトリ構成

my_feature_store/

├── feature_repo/

│ ├── feature_store.yaml # Feast設定

│ ├── example_repo.py # フィーチャー定義の例

│ └── data/ # サンプルデータ

└── README.md

feature_store.yaml設定

project: my_feature_store

registry: data/registry.db

provider: local

online_store:

type: sqlite

path: data/online_store.db

offline_store:

type: file

entity_key_serialization_version: 2

プロダクション環境では以下のように変更します:

project: my_feature_store

registry:

registry_type: sql

path: postgresql://user:pass@host:5432/feast_registry

provider: local

online_store:

type: redis

connection_string: redis://localhost:6379

offline_store:

type: file # またはbigquery, redshift, snowflake

フィーチャー定義

データソースとエンティティの定義

feature_repo/features.py

from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource, PushSource

from feast.types import Float32, Int64, String

データソース定義

user_transactions_source = FileSource(

path="data/user_transactions.parquet",

timestamp_field="event_timestamp",

created_timestamp_column="created_timestamp",

)

エンティティ定義(フィーチャーの基準となるキー)

user = Entity(

name="user_id",

join_keys=["user_id"],

description="ユーザー固有ID",

)

Feature Viewの定義

オフライン + オンラインフィーチャービュー

user_transaction_features = FeatureView(

name="user_transaction_features",

entities=[user],

ttl=timedelta(days=7), # オンラインストアで7日後に期限切れ

schema=[

Field(name="total_purchases", dtype=Int64, description="購入総回数"),

Field(name="avg_purchase_amount", dtype=Float32, description="平均購入金額"),

Field(name="last_purchase_amount", dtype=Float32, description="直近の購入金額"),

Field(name="purchase_frequency", dtype=Float32, description="購入頻度(件/日)"),

Field(name="user_segment", dtype=String, description="ユーザーセグメント"),

],

online=True,

source=user_transactions_source,

tags={"team": "ml-platform", "version": "v1"},

)

On-Demand Feature View(リアルタイム変換)

from feast import on_demand_feature_view, RequestSource

リクエスト時に動的に計算されるフィーチャー

input_request = RequestSource(

name="purchase_request",

schema=[

Field(name="current_amount", dtype=Float32),

],

)

@on_demand_feature_view(

sources=[user_transaction_features, input_request],

schema=[

Field(name="amount_vs_avg_ratio", dtype=Float32),

Field(name="is_high_value", dtype=Int64),

],

)

def purchase_analysis(inputs: dict) -> dict:

"""現在の購入金額と平均購入金額の比率を計算"""

df = pd.DataFrame(inputs)

df["amount_vs_avg_ratio"] = df["current_amount"] / (df["avg_purchase_amount"] + 1e-6)

df["is_high_value"] = (df["amount_vs_avg_ratio"] > 2.0).astype(int)

return df[["amount_vs_avg_ratio", "is_high_value"]]

サンプルデータの生成

scripts/generate_data.py

from datetime import datetime, timedelta

np.random.seed(42)

n_users = 1000

n_records = 5000

user_ids = [f"user_{i:04d}" for i in range(n_users)]

records = []

for _ in range(n_records):

user_id = np.random.choice(user_ids)

ts = datetime(2026, 1, 1) + timedelta(

days=np.random.randint(0, 60),

hours=np.random.randint(0, 24),

)

records.append({

"user_id": user_id,

"total_purchases": np.random.randint(1, 100),

"avg_purchase_amount": round(np.random.uniform(10, 500), 2),

"last_purchase_amount": round(np.random.uniform(5, 1000), 2),

"purchase_frequency": round(np.random.uniform(0.1, 5.0), 3),

"user_segment": np.random.choice(["bronze", "silver", "gold", "platinum"]),

"event_timestamp": ts,

"created_timestamp": ts,

})

df = pd.DataFrame(records)

df.to_parquet("feature_repo/data/user_transactions.parquet", index=False)

print(f"Generated {len(df)} records for {n_users} users")

python scripts/generate_data.py

Generated 5000 records for 1000 users

Feastワークフロー

1. Apply — フィーチャー定義の登録

cd feature_repo

feast apply

Created entity user_id

Created feature view user_transaction_features

Created on demand feature view purchase_analysis

Deploying infrastructure for my_feature_store...

2. Materialize — オフラインからオンラインストアへの同期

特定期間のデータをオンラインストアにロード

feast materialize 2026-01-01T00:00:00 2026-03-01T00:00:00

増分ロード(前回のmaterializeから現在まで)

feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")

Materializing 1 feature views from 2026-01-01 to 2026-03-01

user_transaction_features:

100%|████████████████████████| 1000/1000 [00:03<00:00, 312.45it/s]

3. オフラインフィーチャー取得(学習用)

from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo")

学習データ生成用のエンティティDataFrame

entity_df = pd.DataFrame({

"user_id": ["user_0001", "user_0042", "user_0100", "user_0500"],

"event_timestamp": pd.to_datetime([

"2026-02-01", "2026-02-15", "2026-01-20", "2026-02-28"

]),

})

Point-in-Time Joinでフィーチャーを取得

training_df = store.get_historical_features(

entity_df=entity_df,

features=[

"user_transaction_features:total_purchases",

"user_transaction_features:avg_purchase_amount",

"user_transaction_features:last_purchase_amount",

"user_transaction_features:purchase_frequency",

"user_transaction_features:user_segment",

],

).to_df()

print(training_df.head())

user_id event_timestamp total_purchases avg_purchase_amount ...

0 user_0001 2026-02-01 45 234.56 ...

1 user_0042 2026-02-15 12 89.30 ...

2 user_0100 2026-01-20 78 456.78 ...

3 user_0500 2026-02-28 33 167.42 ...

**Point-in-Time Join**が核心です。各エンティティの`event_timestamp`時点で最も新しいフィーチャー値を取得します。これにより、データリーケージ(data leakage)なく正確な学習データを構成できます。

4. オンラインフィーチャー取得(サービング用)

リアルタイムサービングでフィーチャーを取得

online_features = store.get_online_features(

features=[

"user_transaction_features:total_purchases",

"user_transaction_features:avg_purchase_amount",

"user_transaction_features:user_segment",

"purchase_analysis:amount_vs_avg_ratio",

"purchase_analysis:is_high_value",

],

entity_rows=[

{"user_id": "user_0001", "current_amount": 750.0},

{"user_id": "user_0042", "current_amount": 50.0},

],

).to_dict()

print(online_features)

{

"user_id": ["user_0001", "user_0042"],

"total_purchases": [45, 12],

"avg_purchase_amount": [234.56, 89.30],

"user_segment": ["gold", "silver"],

"amount_vs_avg_ratio": [3.199, 0.560],

"is_high_value": [1, 0],

}

Feature Serviceでフィーチャーグループを管理

from feast import FeatureService

レコメンドモデルに必要なフィーチャーの束

recommendation_service = FeatureService(

name="recommendation_features",

features=[

user_transaction_features[["total_purchases", "avg_purchase_amount", "user_segment"]],

purchase_analysis,

],

tags={"model": "recommendation-v2"},

)

不正検知モデルに必要なフィーチャーの束

fraud_detection_service = FeatureService(

name="fraud_detection_features",

features=[

user_transaction_features,

purchase_analysis,

],

tags={"model": "fraud-detection-v1"},

)

Feature Serviceで取得

features = store.get_online_features(

features=store.get_feature_service("recommendation_features"),

entity_rows=[{"user_id": "user_0001", "current_amount": 750.0}],

).to_dict()

Push Sourceによるリアルタイムフィーチャー更新

from feast import PushSource

Pushソース定義

user_realtime_source = PushSource(

name="user_realtime_push",

batch_source=user_transactions_source,

)

リアルタイムイベント発生時にフィーチャーを更新

store.push(

push_source_name="user_realtime_push",

df=pd.DataFrame({

"user_id": ["user_0001"],

"total_purchases": [46],

"avg_purchase_amount": [240.12],

"last_purchase_amount": [750.0],

"purchase_frequency": [2.1],

"user_segment": ["gold"],

"event_timestamp": [pd.Timestamp.now()],

"created_timestamp": [pd.Timestamp.now()],

}),

)

Feature Serverのデプロイ

ローカルFeature Serverの起動

feast serve -h 0.0.0.0 -p 6566

HTTP APIでフィーチャーを取得

curl -X POST http://localhost:6566/get-online-features \

-H "Content-Type: application/json" \

-d '{

"features": [

"user_transaction_features:total_purchases",

"user_transaction_features:avg_purchase_amount"

],

"entities": {

"user_id": ["user_0001", "user_0042"]

}

}'

DockerでFeature Serverをデプロイ

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .

RUN pip install feast[redis]

COPY feature_repo/ feature_repo/

WORKDIR /app/feature_repo

レジストリ適用 & サーバー起動

CMD feast apply && feast serve -h 0.0.0.0 -p 6566

docker-compose.yml

services:

feast-server:

build: .

ports:

- '6566:6566'

depends_on:

- redis

environment:

- REDIS_URL=redis://redis:6379

redis:

image: redis:7-alpine

ports:

- '6379:6379'

Airflowとの連携(自動Materialize)

dags/feast_materialize.py

from airflow import DAG

from airflow.operators.bash import BashOperator

from datetime import datetime, timedelta

default_args = {

"owner": "ml-platform",

"retries": 2,

"retry_delay": timedelta(minutes=5),

}

with DAG(

dag_id="feast_materialize",

default_args=default_args,

schedule_interval="0 */6 * * *", # 6時間ごと

start_date=datetime(2026, 1, 1),

catchup=False,

) as dag:

materialize = BashOperator(

task_id="materialize_incremental",

bash_command=(

"cd /opt/feature_repo && "

"feast materialize-incremental $(date -u +'%Y-%m-%dT%H:%M:%S')"

),

)

まとめ

Feastを活用したフィーチャーパイプラインの主要ポイントを整理すると:

- **一貫したフィーチャー定義**: 学習とサービングで同一のフィーチャー定義を使用し、Training-Serving Skewを防止

- **Point-in-Time Join**: タイムスタンプ基準の正確なフィーチャージョインでデータリーケージを防止

- **オフライン/オンライン二重化**: バッチ学習はオフラインストア、リアルタイムサービングはオンラインストア

- **Feature Service**: モデルごとのフィーチャーグループ管理で再利用性を向上

- **Push Source**: リアルタイムイベントベースのフィーチャー更新をサポート

Feature StoreはMLモデルが1〜2個の場合は過剰に感じるかもしれませんが、モデルが増えチームが大きくなると必須のインフラになります。特に複数のモデルが同じフィーチャーを共有する場合にその価値が最大化されます。

クイズ

学習時に使用したフィーチャーとサービング時に使用するフィーチャーが異なることで、モデルの性能が低下する現象です。フィーチャー計算ロジックの不一致、データソースの違い、時間基準の不一致などが原因です。

各エンティティのイベント時点(event_timestamp)を基準に、その時点以前の最新のフィーチャー値をジョインします。これにより、未来のデータが学習に使用されるデータリーケージ(data

leakage)を防止します。

オフラインストアは大量のヒストリカルフィーチャーを保存してバッチ学習に使用され(ファイル、BigQueryなど)、オンラインストアは最新のフィーチャー値のみを保存して低レイテンシのリアルタイムサービングに使用されます(Redis、DynamoDBなど)。

オフラインストアのフィーチャーデータをオンラインストアに同期(ロード)する作業です。指定された時間範囲の最新フィーチャー値をオンラインストアに保存し、リアルタイム取得を可能にします。

通常のFeature Viewは事前に計算されたフィーチャーを保存しますが、On-Demand Feature

Viewはリクエスト時に動的にフィーチャーを計算します。リクエストパラメータと既存のフィーチャーを組み合わせたリアルタイム変換に使用されます。

モデルごとに必要なフィーチャーを論理的にグループ化して管理できます。どのモデルがどのフィーチャーを使用しているかを明確に追跡でき、フィーチャー取得時に一貫したインターフェースを提供します。

オンラインストアでフィーチャー値の有効期間を指定します。TTLを過ぎたフィーチャーは取得時にnullが返され、古い(stale)フィーチャー値がサービングに使用されるのを防止します。

リアルタイムイベント(決済、クリックなど)が発生した際に、即座にオンラインストアのフィーチャーを更新する必要がある場合に使用します。バッチmaterializeの定期的な更新の間に最新状態を維持できます。

현재 단락 (1/287)

MLモデルをプロダクションにデプロイする際、最もよくある問題の一つが**Training-Serving Skew**です。学習時に使用したフィーチャーとサービング時に使用するフィーチャーが異なること...

작성 글자: 0원문 글자: 10,636작성 단락: 0/287