Skip to content

필사 모드: Feature Store構築完全ガイド:Feastアーキテクチャ・オンライン/オフラインサービング・MLパイプライン統合

日本語
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

はじめに

MLモデルのプロダクション性能は、モデルアーキテクチャよりも**フィーチャーの品質と一貫性**に大きく左右される。データサイエンティストがJupyter Notebookで作成したフィーチャー変換ロジックと、実際のサービングサーバーに実装されたロジックが微妙に異なる瞬間、学習-サービングスキュー(Training-Serving Skew)が発生し、モデル性能は急激に低下する。

Feature Storeは、この問題をアーキテクチャレベルで解決するコアインフラである。本記事では、オープンソースFeature Storeである**Feast**を中心に、アーキテクチャ設計からオンライン/オフラインサービング実装、AirflowやKubeflowを活用したMLパイプライン統合、そしてTectonやHopsworksなどの競合プラットフォームとの比較まで、包括的に解説する。単純なインストールガイドを超え、数百万のエンティティと数十のFeature Viewを運用するチームが参考にできる実戦ガイドを目指す。

1. Feature Storeの必要性

フィーチャー管理なしで発生する問題

Feature StoreなしでMLシステムを運用すると、以下の問題が繰り返し発生する。

1. **フィーチャーロジックの重複**:学習パイプラインで`avg_order_amount_30d`を計算するコードとサービングサーバーのコードが異なる。NULL処理方式、集計ウィンドウ範囲、タイムゾーン等で微妙な差異が生じる。

2. **フィーチャーの発見性欠如**:チームAが既に計算した`user_click_rate_7d`フィーチャーをチームBが知らずに再計算する。組織内のフィーチャー資産がサイロ化する。

3. **タイムトラベル不可**:「3週間前の時点でこのユーザーのフィーチャー値は何だったか?」に答えられず、再学習やデバッグが不可能になる。

4. **サービングレイテンシ**:推論時に複数のデータソースからリアルタイムでフィーチャーを計算すると、p99レイテンシが数百ミリ秒まで跳ね上がる。

Feature Storeが解決する核心課題

Feature Storeはフィーチャーの**単一の信頼できる情報源(Single Source of Truth)**の役割を果たす。一つのフィーチャー定義からオフライン学習データとオンラインサービングデータの両方を生成するため、ロジックの不一致が根本的に排除される。中央レジストリを通じてフィーチャーを検索・再利用でき、Point-in-Time Joinにより過去の特定時点のフィーチャーを正確に再現できる。

2. Feature Storeアーキテクチャ

Feature Storeのコアアーキテクチャは、**オフラインストア**、**オンラインストア**、**レジストリ**の3つのコンポーネントで構成される。

オフラインストア (Offline Store)

大量のヒストリカルフィーチャーデータを保存し、学習データ生成時にPoint-in-Time Joinを実行する。BigQuery、Snowflake、Redshift、S3/Parquet等の大規模分析用データウェアハウスやデータレイクをバックエンドとして使用する。数十TB規模のデータを効率的にスキャンできる必要がある。

オンラインストア (Online Store)

リアルタイムサービングのための低レイテンシキーバリューストアである。Redis、DynamoDB、Bigtable等をバックエンドとして使用し、エンティティキーを基準に最新のフィーチャー値をp99 10ms以内で返す必要がある。Materializationプロセスを通じてオフラインストアのデータがオンラインストアに同期される。

レジストリ (Registry)

エンティティ、Feature View、Feature Service等のメタデータを保存する中央カタログである。ファイルベース(ローカル、S3、GCS)またはSQLベース(PostgreSQL、MySQL)で運用できる。プロダクション環境ではSQLベースのレジストリを推奨する。

3. Feastフレームワーク深層分析

Feast(Feature Store)は、2019年にGojekとGoogleによって開始されたオープンソースプロジェクトで、現在Linux Foundation傘下で管理されている。Feastの核心的な強みは**プラガブルアーキテクチャ**である。既存のインフラ(Spark、Kafka、Redis、Snowflake等)をそのまま活用しながら、Feature Storeレイヤーのみを追加できる。

コアコンセプト

- **Entity**:フィーチャーが紐付くビジネスオブジェクト(例:ユーザー、商品、ドライバー)

- **Feature View**:同一ソースから派生したフィーチャーグループの論理的単位

- **Feature Service**:特定モデルが使用するフィーチャーのバンドル

- **Data Source**:フィーチャーデータの原本(BigQuery、Parquet、Kafka等)

- **Materialization**:オフラインストアからオンラインストアへデータを同期するプロセス

4. Feastインストールとプロジェクト構成

インストールと初期化

Feastインストール(Redis、PostgreSQLサポート含む)

pip install 'feast[redis,postgres]'

プロジェクト初期化

feast init feature_repo

cd feature_repo

ディレクトリ構造

feature_repo/

feature_store.yaml -- プロジェクト設定

definitions.py -- Entity、Feature View定義

data/ -- サンプルデータ

プロジェクト設定 (feature_store.yaml)

project: my_ml_platform

provider: gcp

registry:

registry_type: sql

path: postgresql://feast:feast@db-host:5432/feast_registry

cache_ttl_seconds: 60

online_store:

type: redis

connection_string: redis-host:6379,password=secret

offline_store:

type: bigquery

dataset: feast_offline

entity_key_serialization_version: 2

この設定で注目すべきポイントは3つある。第一に、SQLベースのレジストリにより複数チームの同時アクセスをサポートする。第二に、オンラインストアはRedisを使用してミリ秒単位のレスポンスを保証する。第三に、オフラインストアはBigQueryを使用して大規模なヒストリカルジョインを処理する。

5. Feature ViewとEntity定義

EntityとFeature Viewの定義

from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource, BigQuerySource

from feast.types import Float32, Float64, Int64, String

Entity定義

customer = Entity(

name="customer_id",

description="顧客の一意識別子",

)

driver = Entity(

name="driver_id",

description="ドライバーの一意識別子",

)

BigQueryソース定義

customer_stats_source = BigQuerySource(

name="customer_stats_source",

table="my_project.feast_dataset.customer_stats",

timestamp_field="event_timestamp",

created_timestamp_column="created_timestamp",

)

Feature View定義

customer_stats_fv = FeatureView(

name="customer_stats",

entities=[customer],

ttl=timedelta(days=3),

schema=[

Field(name="total_orders", dtype=Int64),

Field(name="avg_order_amount", dtype=Float64),

Field(name="lifetime_value", dtype=Float64),

Field(name="preferred_category", dtype=String),

Field(name="churn_risk_score", dtype=Float32),

],

source=customer_stats_source,

online=True,

tags={

"team": "growth",

"version": "v2",

},

)

driver_stats_source = BigQuerySource(

name="driver_stats_source",

table="my_project.feast_dataset.driver_stats",

timestamp_field="event_timestamp",

)

driver_stats_fv = FeatureView(

name="driver_stats",

entities=[driver],

ttl=timedelta(hours=6),

schema=[

Field(name="avg_rating", dtype=Float64),

Field(name="total_trips", dtype=Int64),

Field(name="acceptance_rate", dtype=Float64),

Field(name="avg_delivery_time_min", dtype=Float32),

],

source=driver_stats_source,

online=True,

)

Feature Serviceの定義

特定モデルが使用するフィーチャーをFeature Serviceにまとめて管理する。

from feast import FeatureService

離脱予測モデル用Feature Service

churn_prediction_svc = FeatureService(

name="churn_prediction_service",

features=[

customer_stats_fv[["total_orders", "avg_order_amount", "lifetime_value", "churn_risk_score"]],

],

tags={

"model": "churn_prediction_v3",

"owner": "growth-team",

},

)

ドライバーマッチングモデル用Feature Service

driver_matching_svc = FeatureService(

name="driver_matching_service",

features=[

driver_stats_fv[["avg_rating", "acceptance_rate", "avg_delivery_time_min"]],

customer_stats_fv[["preferred_category"]],

],

)

6. オンライン/オフラインサービング実装

オフラインサービング(学習データ生成)

オフラインサービングはPoint-in-Time Joinを使用して、過去の特定時点のフィーチャー値を正確に取得する。これがFeature Leakageを防止する核心メカニズムである。

from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo/")

学習用エンティティデータフレーム(ラベル含む)

entity_df = pd.DataFrame({

"customer_id": [1001, 1002, 1003, 1001, 1002],

"event_timestamp": pd.to_datetime([

"2026-01-15 10:00:00",

"2026-01-15 11:00:00",

"2026-01-16 09:00:00",

"2026-02-01 10:00:00",

"2026-02-01 11:00:00",

]),

"churned": [0, 1, 0, 1, 0], # ラベル

})

Point-in-Time Joinで学習データ生成

training_df = store.get_historical_features(

entity_df=entity_df,

features=[

"customer_stats:total_orders",

"customer_stats:avg_order_amount",

"customer_stats:lifetime_value",

"customer_stats:churn_risk_score",

],

).to_df()

print(training_df.head())

customer_id | event_timestamp | churned | total_orders | avg_order_amount | ...

1001 | 2026-01-15 10:00:00 | 0 | 42 | 35.50 | ...

オンラインサービング(リアルタイム推論)

オンラインサービングは最新のフィーチャー値をミリ秒単位で返す。

オンラインフィーチャー取得

online_features = store.get_online_features(

features=[

"customer_stats:total_orders",

"customer_stats:avg_order_amount",

"customer_stats:churn_risk_score",

],

entity_rows=[

{"customer_id": 1001},

{"customer_id": 1002},

],

).to_dict()

print(online_features)

出力例:

{

"customer_id": [1001, 1002],

"total_orders": [45, 12],

"avg_order_amount": [35.50, 28.00],

"churn_risk_score": [0.15, 0.82],

}

Materialization(オフラインからオンラインへの同期)

全Feature Viewのマテリアライゼーション

feast materialize 2026-01-01T00:00:00 2026-03-10T00:00:00

増分マテリアライゼーション(前回実行以降の変更分のみ)

feast materialize-incremental 2026-03-10T00:00:00

7. MLパイプライン統合

Airflowによるフィーチャーパイプライン

FeastのMaterializationをAirflow DAGで自動化することで、安定的な運用が可能になる。

from airflow import DAG

from airflow.decorators import task

from datetime import datetime, timedelta

default_args = {

"owner": "ml-platform",

"retries": 3,

"retry_delay": timedelta(minutes=5),

}

with DAG(

dag_id="feast_materialization_pipeline",

default_args=default_args,

schedule_interval="0 */4 * * *", # 4時間ごとに実行

start_date=datetime(2026, 1, 1),

catchup=False,

) as dag:

@task()

def validate_source_data():

"""ソースデータ品質検証"""

from great_expectations import get_context

context = get_context()

result = context.run_checkpoint(checkpoint_name="feature_source_check")

if not result.success:

raise ValueError("ソースデータ品質検証失敗")

return True

@task()

def materialize_features():

"""オフラインストアからオンラインストアへのマテリアライゼーション"""

from feast import RepoConfig, FeatureStore

from feast.infra.online_stores.redis import RedisOnlineStoreConfig

from feast.repo_config import RegistryConfig

repo_config = RepoConfig(

project="my_ml_platform",

provider="gcp",

registry=RegistryConfig(

registry_type="sql",

path="postgresql://feast:feast@db-host:5432/feast_registry",

),

online_store=RedisOnlineStoreConfig(

connection_string="redis-host:6379",

),

)

store = FeatureStore(config=repo_config)

store.materialize_incremental(end_date=datetime.utcnow())

return True

@task()

def validate_online_store():

"""オンラインストアのフィーチャー値検証"""

from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo/")

result = store.get_online_features(

features=["customer_stats:total_orders"],

entity_rows=[{"customer_id": 1001}],

).to_dict()

if result["total_orders"][0] is None:

raise ValueError("オンラインストアにフィーチャーがロードされていません")

return True

@task()

def notify_completion():

"""Slack通知送信"""

requests.post(

"https://hooks.slack.com/services/YOUR/WEBHOOK/URL",

json={"text": "Feast Materialization完了"},

)

validate_source_data() >> materialize_features() >> validate_online_store() >> notify_completion()

Kubeflow Pipelines統合

Kubeflow Pipelinesでは、コンポーネント単位でFeastタスクを定義できる。

from kfp import dsl

from kfp.dsl import component, Output, Dataset

@component(

base_image="python:3.10",

packages_to_install=["feast[redis,postgres]>=0.40.0"],

)

def feast_materialize_op(

project_name: str,

registry_path: str,

redis_connection: str,

):

from feast import RepoConfig, FeatureStore

from feast.infra.online_stores.redis import RedisOnlineStoreConfig

from feast.repo_config import RegistryConfig

from datetime import datetime

config = RepoConfig(

project=project_name,

provider="gcp",

registry=RegistryConfig(

registry_type="sql",

path=registry_path,

),

online_store=RedisOnlineStoreConfig(

connection_string=redis_connection,

),

)

store = FeatureStore(config=config)

store.materialize_incremental(end_date=datetime.utcnow())

@component(

base_image="python:3.10",

packages_to_install=["feast[redis,postgres]>=0.40.0", "scikit-learn"],

)

def train_model_op(

project_name: str,

model_output: Output[Dataset],

):

from feast import FeatureStore

from sklearn.ensemble import GradientBoostingClassifier

store = FeatureStore(repo_path="feature_repo/")

entity_df = pd.read_parquet("gs://my-bucket/training_entities.parquet")

training_df = store.get_historical_features(

entity_df=entity_df,

features=[

"customer_stats:total_orders",

"customer_stats:avg_order_amount",

"customer_stats:churn_risk_score",

],

).to_df()

X = training_df.drop(columns=["customer_id", "event_timestamp", "churned"])

y = training_df["churned"]

model = GradientBoostingClassifier(n_estimators=200)

model.fit(X, y)

with open(model_output.path, "wb") as f:

pickle.dump(model, f)

@dsl.pipeline(name="feast-ml-training-pipeline")

def feast_training_pipeline():

materialize_task = feast_materialize_op(

project_name="my_ml_platform",

registry_path="postgresql://feast:feast@db-host:5432/feast_registry",

redis_connection="redis-host:6379",

)

train_task = train_model_op(

project_name="my_ml_platform",

)

train_task.after(materialize_task)

8. Feature Store比較分析

| 項目 | Feast | Tecton | Hopsworks | SageMaker Feature Store |

| ---------------------- | ------------------------------------ | ---------------------------------- | ------------------------- | ----------------------- |

| **ライセンス** | Apache 2.0(オープンソース) | 商用(マネージド) | AGPL / 商用 | AWS専用 |

| **デプロイ方式** | セルフホスティング | SaaS / VPC | SaaS / セルフホスティング | AWSマネージド |

| **オンラインストア** | Redis, DynamoDB, PostgreSQL | DynamoDB(内蔵) | RonDB(内蔵、高性能) | 独自ストア |

| **オフラインストア** | BigQuery, Snowflake, Redshift, Spark | Spark, Snowflake | Apache Hudi | S3 + Glue Catalog |

| **ストリーミング対応** | Kafka Push(基本) | Kafka, Kinesis(ネイティブ) | Kafka, Spark Streaming | Kinesis |

| **変換エンジン** | On-Demand Transform | Spark, SQL, Python DSL | Spark, Flink | SageMaker Processing |

| **Point-in-Time Join** | サポート | サポート(高度化) | サポート | 限定的サポート |

| **レジストリ** | SQL, ファイルベース | 内蔵(Web UI) | 内蔵(Hopsworks UI) | AWS Glue |

| **GenAI/ベクトル対応** | 限定的 | Embedding対応 | Embedding + RAG | なし |

| **コスト** | 無料(インフラコストのみ) | 高い(エンタープライズ) | 中程度 | AWS使用量ベース |

| **適合対象** | 柔軟性重視、OSS志向チーム | エンタープライズ、リアルタイム要件 | 規制産業、ガバナンス必要 | AWS既存利用者 |

主要な違いのまとめ

- **Feast**:最大の柔軟性。既存インフラに合わせて各コンポーネントを選択可能。運用負荷はチームが直接負担する。

- **Tecton**:ターンキーソリューション。ストリーミングフィーチャーパイプラインが強力だがコストが高い。リアルタイムMLが核心の組織に最適。

- **Hopsworks**:データガバナンスと監査ログが強力で、金融・ヘルスケア等の規制産業で好まれる。RonDBベースのオンラインストアはSageMaker比で15%のレイテンシを実現。

- **SageMaker Feature Store**:AWSエコシステムに既に深く組み込まれた組織には便利だが、ベンダーロックインのリスクがある。

9. 障害事例と解決戦略

Training-Serving Skew防止

学習-サービングスキューは、Feature Storeを導入しても完全には消えない。代表的な発生シナリオと対処法は以下の通りである。

**シナリオ1:TTL超過によるstaleフィーチャー**

オンラインストアのTTLが6時間だが、Materializationバッチが障害で12時間実行されなかった場合、一部フィーチャーがnullで返される。対処法は、Materialization失敗時に即座にアラートを送信し、TTLをMaterialization周期の最低3倍に設定すること。

**シナリオ2:フィーチャー定義変更時の互換性破壊**

`avg_order_amount`フィーチャーの集計ウィンドウを30日から90日に変更すると、既に学習済みのモデルとの互換性が壊れる。対処法は、フィーチャーを変更せず新しいフィーチャー(例:`avg_order_amount_90d`)を追加すること。

**シナリオ3:タイムゾーン不一致**

オフライン学習データがUTC基準だがオンラインソースデータがローカルタイムゾーンを使用している場合、フィーチャー値が異なる。全てのtimestampをUTCに統一する必要がある。

レイテンシ最適化

オンラインサービングレイテンシが高くなる原因と解決法:

- **原因**:単一リクエストで過多なFeature Viewを照会している。

- **解決**:Feature Serviceで必要なフィーチャーのみをバンドルし、バッチ照会(`get_online_features`に複数エンティティを一度に渡す)を活用する。

- **原因**:Redisクラスタのホットキー問題。

- **解決**:エンティティキーにハッシュタグを使用してキーを均等に分散させる。

データ一貫性の保証

オフラインストアとオンラインストア間のデータ一貫性はMaterializationに依存する。これを補強するために:

1. Materialization後にサンプリング検証タスクを実行し、オフラインとオンラインのフィーチャー値を比較する。

2. フィーチャードリフトモニタリングを構築し、フィーチャー分布の異常変化を検知する。

3. ソースデータパイプラインにGreat Expectations等のデータ品質ツールを統合する。

10. 運用チェックリスト

プロダクションFeature Storeを安定的に運用するためのチェックリスト:

設計段階

- エンティティ設計:ビジネスドメインに合ったエンティティキーを定義したか

- TTL設定:各Feature ViewのTTLがデータ更新周期と整合的か

- オフライン/オンライン分離:全てのFeature Viewがオンラインストアに必要なわけではない。`online=False`に設定可能なフィーチャーを識別したか

- レジストリ:SQLベースのレジストリで同時アクセスの競合を防止しているか

パイプライン構成

- Materializationスケジュール:AirflowまたはCronで定期的なMaterializationが設定されているか

- 障害アラート:Materialization失敗時にSlack/PagerDutyアラートが構成されているか

- ソースデータ検証:Great Expectations等でソースデータ品質を事前検証しているか

- 増分Materialization:`materialize-incremental`を使用して全件再処理を避けているか

モニタリング

- オンラインストアレイテンシ:p50、p95、p99レイテンシをモニタリングしているか

- フィーチャーFreshness:各Feature Viewの最新Materialization時刻を追跡しているか

- フィーチャードリフト:フィーチャー分布の変化を検知するモニタリングがあるか

- null率:オンラインフィーチャー照会時のnull返却率を追跡しているか

セキュリティとガバナンス

- RBAC:チーム別にFeature Viewアクセス権限が分離されているか

- 監査ログ:フィーチャー定義変更履歴が記録されているか

- PIIマスキング:個人情報を含むフィーチャーに適切なマスキングが適用されているか

まとめ

Feature Storeは、MLシステムの成熟度を一段階引き上げるコアインフラである。Feastは、オープンソースの柔軟性とプラガブルアーキテクチャを基盤として、ほとんどの組織にとって良い出発点となる。ただし、Feature Store導入自体が目的になってはならない。フィーチャーロジックの一貫性確保、学習-サービングスキュー防止、フィーチャー再利用性向上という明確なビジネス価値にフォーカスすべきである。

AirflowやKubeflowとのパイプライン統合、Redisベースのオンラインサービング、SQLレジストリによるメタデータ管理を組み合わせることで、数十のFeature Viewと数百万のエンティティを安定的に運用できるプロダクションFeature Storeを構築できる。組織の規模と要件に応じて、TectonやHopsworksなどのマネージドソリューションも検討する価値がある。

현재 단락 (1/328)

MLモデルのプロダクション性能は、モデルアーキテクチャよりも**フィーチャーの品質と一貫性**に大きく左右される。データサイエンティストがJupyter Notebookで作成したフィーチャー変換ロジ...

작성 글자: 0원문 글자: 13,544작성 단락: 0/328