Skip to content
Published on

Feature Store構築完全ガイド:Feastアーキテクチャ・オンライン/オフラインサービング・MLパイプライン統合

Authors
  • Name
    Twitter
Feature Store Guide

はじめに

MLモデルのプロダクション性能は、モデルアーキテクチャよりもフィーチャーの品質と一貫性に大きく左右される。データサイエンティストがJupyter Notebookで作成したフィーチャー変換ロジックと、実際のサービングサーバーに実装されたロジックが微妙に異なる瞬間、学習-サービングスキュー(Training-Serving Skew)が発生し、モデル性能は急激に低下する。

Feature Storeは、この問題をアーキテクチャレベルで解決するコアインフラである。本記事では、オープンソースFeature StoreであるFeastを中心に、アーキテクチャ設計からオンライン/オフラインサービング実装、AirflowやKubeflowを活用したMLパイプライン統合、そしてTectonやHopsworksなどの競合プラットフォームとの比較まで、包括的に解説する。単純なインストールガイドを超え、数百万のエンティティと数十のFeature Viewを運用するチームが参考にできる実戦ガイドを目指す。

1. Feature Storeの必要性

フィーチャー管理なしで発生する問題

Feature StoreなしでMLシステムを運用すると、以下の問題が繰り返し発生する。

  1. フィーチャーロジックの重複:学習パイプラインでavg_order_amount_30dを計算するコードとサービングサーバーのコードが異なる。NULL処理方式、集計ウィンドウ範囲、タイムゾーン等で微妙な差異が生じる。
  2. フィーチャーの発見性欠如:チームAが既に計算したuser_click_rate_7dフィーチャーをチームBが知らずに再計算する。組織内のフィーチャー資産がサイロ化する。
  3. タイムトラベル不可:「3週間前の時点でこのユーザーのフィーチャー値は何だったか?」に答えられず、再学習やデバッグが不可能になる。
  4. サービングレイテンシ:推論時に複数のデータソースからリアルタイムでフィーチャーを計算すると、p99レイテンシが数百ミリ秒まで跳ね上がる。

Feature Storeが解決する核心課題

Feature Storeはフィーチャーの**単一の信頼できる情報源(Single Source of Truth)**の役割を果たす。一つのフィーチャー定義からオフライン学習データとオンラインサービングデータの両方を生成するため、ロジックの不一致が根本的に排除される。中央レジストリを通じてフィーチャーを検索・再利用でき、Point-in-Time Joinにより過去の特定時点のフィーチャーを正確に再現できる。

2. Feature Storeアーキテクチャ

Feature Storeのコアアーキテクチャは、オフラインストアオンラインストアレジストリの3つのコンポーネントで構成される。

オフラインストア (Offline Store)

大量のヒストリカルフィーチャーデータを保存し、学習データ生成時にPoint-in-Time Joinを実行する。BigQuery、Snowflake、Redshift、S3/Parquet等の大規模分析用データウェアハウスやデータレイクをバックエンドとして使用する。数十TB規模のデータを効率的にスキャンできる必要がある。

オンラインストア (Online Store)

リアルタイムサービングのための低レイテンシキーバリューストアである。Redis、DynamoDB、Bigtable等をバックエンドとして使用し、エンティティキーを基準に最新のフィーチャー値をp99 10ms以内で返す必要がある。Materializationプロセスを通じてオフラインストアのデータがオンラインストアに同期される。

レジストリ (Registry)

エンティティ、Feature View、Feature Service等のメタデータを保存する中央カタログである。ファイルベース(ローカル、S3、GCS)またはSQLベース(PostgreSQL、MySQL)で運用できる。プロダクション環境ではSQLベースのレジストリを推奨する。

3. Feastフレームワーク深層分析

Feast(Feature Store)は、2019年にGojekとGoogleによって開始されたオープンソースプロジェクトで、現在Linux Foundation傘下で管理されている。Feastの核心的な強みはプラガブルアーキテクチャである。既存のインフラ(Spark、Kafka、Redis、Snowflake等)をそのまま活用しながら、Feature Storeレイヤーのみを追加できる。

コアコンセプト

  • Entity:フィーチャーが紐付くビジネスオブジェクト(例:ユーザー、商品、ドライバー)
  • Feature View:同一ソースから派生したフィーチャーグループの論理的単位
  • Feature Service:特定モデルが使用するフィーチャーのバンドル
  • Data Source:フィーチャーデータの原本(BigQuery、Parquet、Kafka等)
  • Materialization:オフラインストアからオンラインストアへデータを同期するプロセス

4. Feastインストールとプロジェクト構成

インストールと初期化

# Feastインストール(Redis、PostgreSQLサポート含む)
pip install 'feast[redis,postgres]'

# プロジェクト初期化
feast init feature_repo
cd feature_repo

# ディレクトリ構造
# feature_repo/
#   feature_store.yaml    -- プロジェクト設定
#   definitions.py        -- Entity、Feature View定義
#   data/                 -- サンプルデータ

プロジェクト設定 (feature_store.yaml)

project: my_ml_platform
provider: gcp
registry:
  registry_type: sql
  path: postgresql://feast:feast@db-host:5432/feast_registry
  cache_ttl_seconds: 60
online_store:
  type: redis
  connection_string: redis-host:6379,password=secret
offline_store:
  type: bigquery
  dataset: feast_offline
entity_key_serialization_version: 2

この設定で注目すべきポイントは3つある。第一に、SQLベースのレジストリにより複数チームの同時アクセスをサポートする。第二に、オンラインストアはRedisを使用してミリ秒単位のレスポンスを保証する。第三に、オフラインストアはBigQueryを使用して大規模なヒストリカルジョインを処理する。

5. Feature ViewとEntity定義

EntityとFeature Viewの定義

from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource, BigQuerySource
from feast.types import Float32, Float64, Int64, String

# Entity定義
customer = Entity(
    name="customer_id",
    description="顧客の一意識別子",
)

driver = Entity(
    name="driver_id",
    description="ドライバーの一意識別子",
)

# BigQueryソース定義
customer_stats_source = BigQuerySource(
    name="customer_stats_source",
    table="my_project.feast_dataset.customer_stats",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

# Feature View定義
customer_stats_fv = FeatureView(
    name="customer_stats",
    entities=[customer],
    ttl=timedelta(days=3),
    schema=[
        Field(name="total_orders", dtype=Int64),
        Field(name="avg_order_amount", dtype=Float64),
        Field(name="lifetime_value", dtype=Float64),
        Field(name="preferred_category", dtype=String),
        Field(name="churn_risk_score", dtype=Float32),
    ],
    source=customer_stats_source,
    online=True,
    tags={
        "team": "growth",
        "version": "v2",
    },
)

driver_stats_source = BigQuerySource(
    name="driver_stats_source",
    table="my_project.feast_dataset.driver_stats",
    timestamp_field="event_timestamp",
)

driver_stats_fv = FeatureView(
    name="driver_stats",
    entities=[driver],
    ttl=timedelta(hours=6),
    schema=[
        Field(name="avg_rating", dtype=Float64),
        Field(name="total_trips", dtype=Int64),
        Field(name="acceptance_rate", dtype=Float64),
        Field(name="avg_delivery_time_min", dtype=Float32),
    ],
    source=driver_stats_source,
    online=True,
)

Feature Serviceの定義

特定モデルが使用するフィーチャーをFeature Serviceにまとめて管理する。

from feast import FeatureService

# 離脱予測モデル用Feature Service
churn_prediction_svc = FeatureService(
    name="churn_prediction_service",
    features=[
        customer_stats_fv[["total_orders", "avg_order_amount", "lifetime_value", "churn_risk_score"]],
    ],
    tags={
        "model": "churn_prediction_v3",
        "owner": "growth-team",
    },
)

# ドライバーマッチングモデル用Feature Service
driver_matching_svc = FeatureService(
    name="driver_matching_service",
    features=[
        driver_stats_fv[["avg_rating", "acceptance_rate", "avg_delivery_time_min"]],
        customer_stats_fv[["preferred_category"]],
    ],
)

6. オンライン/オフラインサービング実装

オフラインサービング(学習データ生成)

オフラインサービングはPoint-in-Time Joinを使用して、過去の特定時点のフィーチャー値を正確に取得する。これがFeature Leakageを防止する核心メカニズムである。

from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path="feature_repo/")

# 学習用エンティティデータフレーム(ラベル含む)
entity_df = pd.DataFrame({
    "customer_id": [1001, 1002, 1003, 1001, 1002],
    "event_timestamp": pd.to_datetime([
        "2026-01-15 10:00:00",
        "2026-01-15 11:00:00",
        "2026-01-16 09:00:00",
        "2026-02-01 10:00:00",
        "2026-02-01 11:00:00",
    ]),
    "churned": [0, 1, 0, 1, 0],  # ラベル
})

# Point-in-Time Joinで学習データ生成
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "customer_stats:total_orders",
        "customer_stats:avg_order_amount",
        "customer_stats:lifetime_value",
        "customer_stats:churn_risk_score",
    ],
).to_df()

print(training_df.head())
# customer_id | event_timestamp     | churned | total_orders | avg_order_amount | ...
# 1001        | 2026-01-15 10:00:00 | 0       | 42           | 35.50            | ...

オンラインサービング(リアルタイム推論)

オンラインサービングは最新のフィーチャー値をミリ秒単位で返す。

# オンラインフィーチャー取得
online_features = store.get_online_features(
    features=[
        "customer_stats:total_orders",
        "customer_stats:avg_order_amount",
        "customer_stats:churn_risk_score",
    ],
    entity_rows=[
        {"customer_id": 1001},
        {"customer_id": 1002},
    ],
).to_dict()

print(online_features)
# 出力例:
# {
#     "customer_id": [1001, 1002],
#     "total_orders": [45, 12],
#     "avg_order_amount": [35.50, 28.00],
#     "churn_risk_score": [0.15, 0.82],
# }

Materialization(オフラインからオンラインへの同期)

# 全Feature Viewのマテリアライゼーション
feast materialize 2026-01-01T00:00:00 2026-03-10T00:00:00

# 増分マテリアライゼーション(前回実行以降の変更分のみ)
feast materialize-incremental 2026-03-10T00:00:00

7. MLパイプライン統合

Airflowによるフィーチャーパイプライン

FeastのMaterializationをAirflow DAGで自動化することで、安定的な運用が可能になる。

from airflow import DAG
from airflow.decorators import task
from datetime import datetime, timedelta

default_args = {
    "owner": "ml-platform",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="feast_materialization_pipeline",
    default_args=default_args,
    schedule_interval="0 */4 * * *",  # 4時間ごとに実行
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:

    @task()
    def validate_source_data():
        """ソースデータ品質検証"""
        from great_expectations import get_context
        context = get_context()
        result = context.run_checkpoint(checkpoint_name="feature_source_check")
        if not result.success:
            raise ValueError("ソースデータ品質検証失敗")
        return True

    @task()
    def materialize_features():
        """オフラインストアからオンラインストアへのマテリアライゼーション"""
        from feast import RepoConfig, FeatureStore
        from feast.infra.online_stores.redis import RedisOnlineStoreConfig
        from feast.repo_config import RegistryConfig

        repo_config = RepoConfig(
            project="my_ml_platform",
            provider="gcp",
            registry=RegistryConfig(
                registry_type="sql",
                path="postgresql://feast:feast@db-host:5432/feast_registry",
            ),
            online_store=RedisOnlineStoreConfig(
                connection_string="redis-host:6379",
            ),
        )
        store = FeatureStore(config=repo_config)
        store.materialize_incremental(end_date=datetime.utcnow())
        return True

    @task()
    def validate_online_store():
        """オンラインストアのフィーチャー値検証"""
        from feast import FeatureStore
        store = FeatureStore(repo_path="feature_repo/")

        result = store.get_online_features(
            features=["customer_stats:total_orders"],
            entity_rows=[{"customer_id": 1001}],
        ).to_dict()

        if result["total_orders"][0] is None:
            raise ValueError("オンラインストアにフィーチャーがロードされていません")
        return True

    @task()
    def notify_completion():
        """Slack通知送信"""
        import requests
        requests.post(
            "https://hooks.slack.com/services/YOUR/WEBHOOK/URL",
            json={"text": "Feast Materialization完了"},
        )

    validate_source_data() >> materialize_features() >> validate_online_store() >> notify_completion()

Kubeflow Pipelines統合

Kubeflow Pipelinesでは、コンポーネント単位でFeastタスクを定義できる。

from kfp import dsl
from kfp.dsl import component, Output, Dataset

@component(
    base_image="python:3.10",
    packages_to_install=["feast[redis,postgres]>=0.40.0"],
)
def feast_materialize_op(
    project_name: str,
    registry_path: str,
    redis_connection: str,
):
    from feast import RepoConfig, FeatureStore
    from feast.infra.online_stores.redis import RedisOnlineStoreConfig
    from feast.repo_config import RegistryConfig
    from datetime import datetime

    config = RepoConfig(
        project=project_name,
        provider="gcp",
        registry=RegistryConfig(
            registry_type="sql",
            path=registry_path,
        ),
        online_store=RedisOnlineStoreConfig(
            connection_string=redis_connection,
        ),
    )
    store = FeatureStore(config=config)
    store.materialize_incremental(end_date=datetime.utcnow())

@component(
    base_image="python:3.10",
    packages_to_install=["feast[redis,postgres]>=0.40.0", "scikit-learn"],
)
def train_model_op(
    project_name: str,
    model_output: Output[Dataset],
):
    from feast import FeatureStore
    import pandas as pd
    from sklearn.ensemble import GradientBoostingClassifier
    import pickle

    store = FeatureStore(repo_path="feature_repo/")
    entity_df = pd.read_parquet("gs://my-bucket/training_entities.parquet")

    training_df = store.get_historical_features(
        entity_df=entity_df,
        features=[
            "customer_stats:total_orders",
            "customer_stats:avg_order_amount",
            "customer_stats:churn_risk_score",
        ],
    ).to_df()

    X = training_df.drop(columns=["customer_id", "event_timestamp", "churned"])
    y = training_df["churned"]

    model = GradientBoostingClassifier(n_estimators=200)
    model.fit(X, y)

    with open(model_output.path, "wb") as f:
        pickle.dump(model, f)

@dsl.pipeline(name="feast-ml-training-pipeline")
def feast_training_pipeline():
    materialize_task = feast_materialize_op(
        project_name="my_ml_platform",
        registry_path="postgresql://feast:feast@db-host:5432/feast_registry",
        redis_connection="redis-host:6379",
    )
    train_task = train_model_op(
        project_name="my_ml_platform",
    )
    train_task.after(materialize_task)

8. Feature Store比較分析

項目FeastTectonHopsworksSageMaker Feature Store
ライセンスApache 2.0(オープンソース)商用(マネージド)AGPL / 商用AWS専用
デプロイ方式セルフホスティングSaaS / VPCSaaS / セルフホスティングAWSマネージド
オンラインストアRedis, DynamoDB, PostgreSQLDynamoDB(内蔵)RonDB(内蔵、高性能)独自ストア
オフラインストアBigQuery, Snowflake, Redshift, SparkSpark, SnowflakeApache HudiS3 + Glue Catalog
ストリーミング対応Kafka Push(基本)Kafka, Kinesis(ネイティブ)Kafka, Spark StreamingKinesis
変換エンジンOn-Demand TransformSpark, SQL, Python DSLSpark, FlinkSageMaker Processing
Point-in-Time Joinサポートサポート(高度化)サポート限定的サポート
レジストリSQL, ファイルベース内蔵(Web UI)内蔵(Hopsworks UI)AWS Glue
GenAI/ベクトル対応限定的Embedding対応Embedding + RAGなし
コスト無料(インフラコストのみ)高い(エンタープライズ)中程度AWS使用量ベース
適合対象柔軟性重視、OSS志向チームエンタープライズ、リアルタイム要件規制産業、ガバナンス必要AWS既存利用者

主要な違いのまとめ

  • Feast:最大の柔軟性。既存インフラに合わせて各コンポーネントを選択可能。運用負荷はチームが直接負担する。
  • Tecton:ターンキーソリューション。ストリーミングフィーチャーパイプラインが強力だがコストが高い。リアルタイムMLが核心の組織に最適。
  • Hopsworks:データガバナンスと監査ログが強力で、金融・ヘルスケア等の規制産業で好まれる。RonDBベースのオンラインストアはSageMaker比で15%のレイテンシを実現。
  • SageMaker Feature Store:AWSエコシステムに既に深く組み込まれた組織には便利だが、ベンダーロックインのリスクがある。

9. 障害事例と解決戦略

Training-Serving Skew防止

学習-サービングスキューは、Feature Storeを導入しても完全には消えない。代表的な発生シナリオと対処法は以下の通りである。

シナリオ1:TTL超過によるstaleフィーチャー

オンラインストアのTTLが6時間だが、Materializationバッチが障害で12時間実行されなかった場合、一部フィーチャーがnullで返される。対処法は、Materialization失敗時に即座にアラートを送信し、TTLをMaterialization周期の最低3倍に設定すること。

シナリオ2:フィーチャー定義変更時の互換性破壊

avg_order_amountフィーチャーの集計ウィンドウを30日から90日に変更すると、既に学習済みのモデルとの互換性が壊れる。対処法は、フィーチャーを変更せず新しいフィーチャー(例:avg_order_amount_90d)を追加すること。

シナリオ3:タイムゾーン不一致

オフライン学習データがUTC基準だがオンラインソースデータがローカルタイムゾーンを使用している場合、フィーチャー値が異なる。全てのtimestampをUTCに統一する必要がある。

レイテンシ最適化

オンラインサービングレイテンシが高くなる原因と解決法:

  • 原因:単一リクエストで過多なFeature Viewを照会している。
  • 解決:Feature Serviceで必要なフィーチャーのみをバンドルし、バッチ照会(get_online_featuresに複数エンティティを一度に渡す)を活用する。
  • 原因:Redisクラスタのホットキー問題。
  • 解決:エンティティキーにハッシュタグを使用してキーを均等に分散させる。

データ一貫性の保証

オフラインストアとオンラインストア間のデータ一貫性はMaterializationに依存する。これを補強するために:

  1. Materialization後にサンプリング検証タスクを実行し、オフラインとオンラインのフィーチャー値を比較する。
  2. フィーチャードリフトモニタリングを構築し、フィーチャー分布の異常変化を検知する。
  3. ソースデータパイプラインにGreat Expectations等のデータ品質ツールを統合する。

10. 運用チェックリスト

プロダクションFeature Storeを安定的に運用するためのチェックリスト:

設計段階

  • エンティティ設計:ビジネスドメインに合ったエンティティキーを定義したか
  • TTL設定:各Feature ViewのTTLがデータ更新周期と整合的か
  • オフライン/オンライン分離:全てのFeature Viewがオンラインストアに必要なわけではない。online=Falseに設定可能なフィーチャーを識別したか
  • レジストリ:SQLベースのレジストリで同時アクセスの競合を防止しているか

パイプライン構成

  • Materializationスケジュール:AirflowまたはCronで定期的なMaterializationが設定されているか
  • 障害アラート:Materialization失敗時にSlack/PagerDutyアラートが構成されているか
  • ソースデータ検証:Great Expectations等でソースデータ品質を事前検証しているか
  • 増分Materialization:materialize-incrementalを使用して全件再処理を避けているか

モニタリング

  • オンラインストアレイテンシ:p50、p95、p99レイテンシをモニタリングしているか
  • フィーチャーFreshness:各Feature Viewの最新Materialization時刻を追跡しているか
  • フィーチャードリフト:フィーチャー分布の変化を検知するモニタリングがあるか
  • null率:オンラインフィーチャー照会時のnull返却率を追跡しているか

セキュリティとガバナンス

  • RBAC:チーム別にFeature Viewアクセス権限が分離されているか
  • 監査ログ:フィーチャー定義変更履歴が記録されているか
  • PIIマスキング:個人情報を含むフィーチャーに適切なマスキングが適用されているか

まとめ

Feature Storeは、MLシステムの成熟度を一段階引き上げるコアインフラである。Feastは、オープンソースの柔軟性とプラガブルアーキテクチャを基盤として、ほとんどの組織にとって良い出発点となる。ただし、Feature Store導入自体が目的になってはならない。フィーチャーロジックの一貫性確保、学習-サービングスキュー防止、フィーチャー再利用性向上という明確なビジネス価値にフォーカスすべきである。

AirflowやKubeflowとのパイプライン統合、Redisベースのオンラインサービング、SQLレジストリによるメタデータ管理を組み合わせることで、数十のFeature Viewと数百万のエンティティを安定的に運用できるプロダクションFeature Storeを構築できる。組織の規模と要件に応じて、TectonやHopsworksなどのマネージドソリューションも検討する価値がある。