- Authors
- Name

- ブループリント概要:Feature StoreとRAGパイプラインの合流
- アーキテクチャ青写真
- 統合照会API設計
- ドキュメントエンベディングパイプライン:チャンキング戦略とインデックス管理
- Feature StoreフィーチャーとRAG結果の結合パターン
- RAGOps:検索品質モニタリングとインデックス更新自動化
- コストとパフォーマンスのトレードオフ
- 障害シナリオ別対応ランブック
- ブループリント導入ロードマップ
- クイズ
- References
ブループリント概要:Feature StoreとRAGパイプラインの合流
「ブループリント」というタイトルの通り、本記事はFeature StoreとRAG(Retrieval-Augmented Generation)パイプラインを1つのAIプラットフォーム内で統合運用するための全体設計図を提示する。個別ツールの使い方ではなく、2つのシステムが交わる接合部で発生する設計決定と運用ルールを扱う。
Feature Storeは構造化データフィーチャーを管理し、RAGパイプラインは非構造化テキストをベクトルに変換してLLMにコンテキストを提供する。2026年現在、この2つのシステムは別々に運用される場合が多いが、実際のAI製品では「ユーザーの直近の購買履歴(Feature Store)+ 関連商品レビュー要約(RAG)」のように組み合わせて使うケースが増えている。本記事はその統合アーキテクチャの設計原則、実装パターン、運用ランブックを青写真として整理する。
アーキテクチャ青写真
全体のデータフローを4つのレイヤーに区分する。
┌──────────────────────────────────────────────────────┐
│ Layer 4: Serving (KServe / API Gateway) │
│ - Feature照会 + Vector検索結果をLLMに伝達 │
│ - Guardrail、Citation検証、レスポンスフィルタリング │
├──────────────────────────────────────────────────────┤
│ Layer 3: Feature Store + Vector Store │
│ - Feast Online Store (Redis):構造化フィーチャー │
│ - Vector DB (Qdrant/Weaviate):非構造化エンベディング│
│ - 統合照会API:2つのstoreを1つのインターフェースで │
├──────────────────────────────────────────────────────┤
│ Layer 2: Ingestion & Transformation │
│ - CDCパイプライン (Debezium -> Kafka) │
│ - ドキュメントチャンキング + エンベディング (Kubeflow)│
│ - Feature View定義 + Vector Index管理 │
├──────────────────────────────────────────────────────┤
│ Layer 1: Source Data │
│ - OLTP DB、Data Warehouse、ドキュメントストア、APIログ│
└──────────────────────────────────────────────────────┘
統合照会API設計
Feature StoreとVector Storeを1つのリクエストで照会する統合APIがこのブループリントの核心である。
"""
Feature Store(Feast)とVector Store(Qdrant)を統合照会するサービス。
サービングレイヤーでモデルやLLMにコンテキストを提供する際に使用する。
"""
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
import asyncio
from feast import FeatureStore
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue
@dataclass
class UnifiedContext:
"""Feature Storeフィーチャー + Vector検索結果を合わせた統合コンテキスト"""
entity_id: str
structured_features: Dict[str, Any] # Feature Storeから照会
retrieved_documents: List[Dict[str, Any]] # Vector Storeから検索
feature_freshness_ms: float = 0.0
retrieval_latency_ms: float = 0.0
class UnifiedContextService:
def __init__(
self,
feast_repo_path: str,
qdrant_url: str,
qdrant_collection: str,
):
self.feast_store = FeatureStore(repo_path=feast_repo_path)
self.qdrant = QdrantClient(url=qdrant_url, timeout=5.0)
self.collection = qdrant_collection
async def get_context(
self,
entity_id: str,
query_embedding: List[float],
feature_list: List[str],
top_k: int = 5,
metadata_filter: Optional[Dict[str, str]] = None,
) -> UnifiedContext:
"""
Feature Store照会とVector検索を並列実行して
統合コンテキストを返す。
"""
# 並列実行:Feature Store照会 + Vector検索
feature_task = asyncio.to_thread(
self._fetch_features, entity_id, feature_list
)
vector_task = asyncio.to_thread(
self._search_vectors, query_embedding, top_k, metadata_filter, entity_id
)
features_result, vectors_result = await asyncio.gather(
feature_task, vector_task
)
return UnifiedContext(
entity_id=entity_id,
structured_features=features_result["features"],
retrieved_documents=vectors_result["documents"],
feature_freshness_ms=features_result["latency_ms"],
retrieval_latency_ms=vectors_result["latency_ms"],
)
def _fetch_features(self, entity_id: str, feature_list: List[str]) -> dict:
import time
start = time.monotonic()
result = self.feast_store.get_online_features(
features=feature_list,
entity_rows=[{"user_id": entity_id}],
).to_dict()
elapsed = (time.monotonic() - start) * 1000
# dictのリスト値をスカラーに変換
features = {k: v[0] if isinstance(v, list) and v else v for k, v in result.items()}
return {"features": features, "latency_ms": elapsed}
def _search_vectors(
self,
query_embedding: List[float],
top_k: int,
metadata_filter: Optional[Dict[str, str]],
entity_id: str,
) -> dict:
import time
start = time.monotonic()
search_filter = None
if metadata_filter:
conditions = [
FieldCondition(key=k, match=MatchValue(value=v))
for k, v in metadata_filter.items()
]
search_filter = Filter(must=conditions)
results = self.qdrant.search(
collection_name=self.collection,
query_vector=query_embedding,
query_filter=search_filter,
limit=top_k,
with_payload=True,
)
elapsed = (time.monotonic() - start) * 1000
documents = [
{
"text": hit.payload.get("text", ""),
"source": hit.payload.get("source", ""),
"score": hit.score,
"chunk_id": hit.id,
}
for hit in results
]
return {"documents": documents, "latency_ms": elapsed}
ドキュメントエンベディングパイプライン:チャンキング戦略とインデックス管理
RAGパイプラインでドキュメントをベクトルに変換するプロセスは、Feature StoreのFeature View定義と対称的な概念である。Feature Viewが「どのような変換を経てフィーチャーを作成するか」を定義するように、エンベディングパイプラインは「どのようなチャンキングとエンベディングを経てベクトルを作成するか」を定義する。
"""
ドキュメントチャンキング + エンベディングパイプライン。
Kubeflow Pipelineコンポーネントとして実行、または独立スクリプトとして使用する。
"""
from dataclasses import dataclass
from typing import List, Generator
import hashlib
import json
@dataclass
class ChunkConfig:
"""チャンキング設定。ドキュメントタイプ別に異なる戦略を適用する。"""
chunk_size: int = 512 # トークン基準
chunk_overlap: int = 64 # チャンク間の重複
separators: tuple = ("\n\n", "\n", ". ", " ")
min_chunk_size: int = 50 # これ未満は前のチャンクにマージ
metadata_fields: tuple = ("source", "doc_type", "updated_at")
# ドキュメントタイプ別チャンキング戦略
CHUNK_CONFIGS = {
"product_review": ChunkConfig(chunk_size=256, chunk_overlap=32),
"technical_doc": ChunkConfig(chunk_size=512, chunk_overlap=64),
"faq": ChunkConfig(chunk_size=128, chunk_overlap=0), # FAQは重複不要
"legal": ChunkConfig(chunk_size=1024, chunk_overlap=128),
}
def recursive_split(text: str, config: ChunkConfig) -> List[str]:
"""再帰的テキスト分割。separatorsを順番に試す。"""
if len(text.split()) <= config.chunk_size:
return [text] if len(text.split()) >= config.min_chunk_size else []
for sep in config.separators:
parts = text.split(sep)
if len(parts) > 1:
chunks = []
current = ""
for part in parts:
candidate = current + sep + part if current else part
if len(candidate.split()) > config.chunk_size:
if current:
chunks.append(current.strip())
current = part
else:
current = candidate
if current:
chunks.append(current.strip())
return [c for c in chunks if len(c.split()) >= config.min_chunk_size]
# すべてのseparatorで分割不可の場合、強制分割
words = text.split()
return [
" ".join(words[i:i + config.chunk_size])
for i in range(0, len(words), config.chunk_size - config.chunk_overlap)
]
def create_chunk_id(source: str, chunk_index: int, text: str) -> str:
"""決定論的chunk ID生成。同一ドキュメントの同一チャンクは同じIDを持つ。"""
content_hash = hashlib.sha256(text.encode()).hexdigest()[:12]
return f"{source}::{chunk_index}::{content_hash}"
def process_document(
text: str,
source: str,
doc_type: str,
metadata: dict,
embedding_fn,
) -> List[dict]:
"""
1つのドキュメントをチャンキングしエンベディングして、
Vector Storeにupsertするレコードリストを返す。
"""
config = CHUNK_CONFIGS.get(doc_type, ChunkConfig())
chunks = recursive_split(text, config)
records = []
for i, chunk_text in enumerate(chunks):
embedding = embedding_fn(chunk_text)
chunk_id = create_chunk_id(source, i, chunk_text)
records.append({
"id": chunk_id,
"vector": embedding,
"payload": {
"text": chunk_text,
"source": source,
"doc_type": doc_type,
"chunk_index": i,
"total_chunks": len(chunks),
"chunk_size_tokens": len(chunk_text.split()),
**{k: metadata.get(k, "") for k in config.metadata_fields},
},
})
return records
Feature StoreフィーチャーとRAG結果の結合パターン
LLMに渡すプロンプトを構成する際、Feature Storeの構造化フィーチャーとRAGの検索結果をどのように結合するかが応答品質を決定する。
"""
Feature Storeフィーチャー + RAG検索結果を結合して
LLMプロンプトを構成するパターン。
"""
from typing import Dict, List, Any
def build_augmented_prompt(
user_query: str,
structured_features: Dict[str, Any],
retrieved_documents: List[Dict[str, Any]],
system_instruction: str = "",
) -> str:
"""
構造化フィーチャーと非構造化検索結果を結合したプロンプト生成。
原則:
1. 構造化フィーチャーは「ユーザーコンテキスト」セクションにテーブル形式で提供
2. RAG検索結果は「参考ドキュメント」セクションに出典と共に提供
3. フィーチャーとドキュメント間で矛盾がある場合、フィーチャー(リアルタイムデータ)を優先するよう明示
"""
# 構造化フィーチャーセクション
feature_lines = []
feature_display_names = {
"total_purchases_7d": "直近7日間の購入回数",
"avg_order_value_30d": "直近30日間の平均注文金額",
"preferred_category": "好みカテゴリ",
"membership_tier": "メンバーシップランク",
}
for key, value in structured_features.items():
display = feature_display_names.get(key, key)
feature_lines.append(f"- {display}: {value}")
features_section = "\n".join(feature_lines) if feature_lines else "なし"
# RAG検索結果セクション
doc_sections = []
for i, doc in enumerate(retrieved_documents, 1):
score = doc.get("score", 0)
# relevanceスコアが低いドキュメントは除外
if score < 0.7:
continue
doc_sections.append(
f"[ドキュメント {i}](出典:{doc['source']}、関連度:{score:.2f})\n{doc['text']}"
)
docs_section = "\n\n".join(doc_sections) if doc_sections else "関連ドキュメントなし"
prompt = f"""{system_instruction}
## ユーザーコンテキスト(Feature Storeリアルタイムデータ)
{features_section}
## 参考ドキュメント(RAG検索結果)
{docs_section}
## 指示事項
- ユーザーコンテキストと参考ドキュメントの両方を活用して回答してください。
- 構造化データ(ユーザーコンテキスト)とドキュメント内容が矛盾する場合、構造化データを優先します。
- 回答に使用した参考ドキュメント番号を引用してください。
- 参考ドキュメントにない内容は推測しないでください。
## ユーザー質問
{user_query}"""
return prompt
RAGOps:検索品質モニタリングとインデックス更新自動化
Feature StoreにFreshness SLAがあるように、Vector Storeにもインデックス最新性管理が必要である。これをRAGOpsと呼ぶ。
インデックスFreshness追跡
"""
Vector Storeインデックスの最新性を追跡するモニタリングモジュール。
ドキュメントソースの最新更新時刻とインデックスの最終更新時刻を
比較してstale状態を検知する。
"""
from datetime import datetime, timedelta
from typing import Dict, List
from dataclasses import dataclass
import json
@dataclass
class IndexFreshnessReport:
collection_name: str
total_documents: int
stale_documents: int
freshness_ratio: float
oldest_document_age_hours: float
avg_document_age_hours: float
recommendations: List[str]
def check_index_freshness(
qdrant_client,
collection_name: str,
freshness_threshold_hours: int = 24,
sample_size: int = 1000,
) -> IndexFreshnessReport:
"""
Vector Storeインデックスのドキュメント最新性を点検する。
"""
# サンプルドキュメントのupdated_atフィールドを照会
points, _ = qdrant_client.scroll(
collection_name=collection_name,
limit=sample_size,
with_payload=["updated_at", "source"],
)
now = datetime.utcnow()
ages_hours = []
stale_count = 0
stale_sources = set()
for point in points:
updated_at_str = point.payload.get("updated_at", "")
if not updated_at_str:
stale_count += 1
continue
try:
updated_at = datetime.fromisoformat(updated_at_str)
age_hours = (now - updated_at).total_seconds() / 3600
ages_hours.append(age_hours)
if age_hours > freshness_threshold_hours:
stale_count += 1
stale_sources.add(point.payload.get("source", "unknown"))
except ValueError:
stale_count += 1
total = len(points)
freshness_ratio = 1.0 - (stale_count / max(total, 1))
recommendations = []
if freshness_ratio < 0.95:
recommendations.append(
f"Staleドキュメント比率が{(1-freshness_ratio)*100:.1f}%です。"
f"エンベディングパイプラインの実行が必要です。"
)
if stale_sources:
recommendations.append(
f"Staleドキュメントソース:{', '.join(list(stale_sources)[:5])}"
)
if ages_hours and max(ages_hours) > freshness_threshold_hours * 3:
recommendations.append(
"非常に古いドキュメントが存在します。全体再インデックスを検討してください。"
)
return IndexFreshnessReport(
collection_name=collection_name,
total_documents=total,
stale_documents=stale_count,
freshness_ratio=freshness_ratio,
oldest_document_age_hours=max(ages_hours) if ages_hours else 0,
avg_document_age_hours=sum(ages_hours) / len(ages_hours) if ages_hours else 0,
recommendations=recommendations,
)
インデックス更新Kubeflow Pipeline
# rag-index-update-pipeline.yaml
# 定期的に変更されたドキュメントを検知してベクトルインデックスを更新する。
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: rag-index-updater
namespace: ml-pipelines
spec:
schedule: '0 */4 * * *' # 4時間ごとに実行
concurrencyPolicy: Replace
workflowSpec:
entrypoint: update-index
templates:
- name: update-index
steps:
- - name: detect-changes
template: detect-document-changes
- - name: chunk-and-embed
template: run-embedding-pipeline
arguments:
parameters:
- name: changed-docs
value: '{{steps.detect-changes.outputs.parameters.changed-docs}}'
- - name: verify-freshness
template: check-freshness
- name: detect-document-changes
container:
image: ml-platform/doc-change-detector:v1.2.0
command: [python, detect_changes.py]
args:
- --since=4h
- --output=/tmp/changed_docs.json
env:
- name: DOC_STORE_URL
valueFrom:
secretKeyRef:
name: doc-store-credentials
key: url
- name: run-embedding-pipeline
container:
image: ml-platform/embedding-worker:v2.1.0
command: [python, embed_and_upsert.py]
args:
- --docs-file={{inputs.parameters.changed-docs}}
- --collection=product_knowledge
- --model=text-embedding-3-large
resources:
requests:
nvidia.com/gpu: 1
memory: 8Gi
- name: check-freshness
container:
image: ml-platform/rag-monitor:v1.0.0
command: [python, check_freshness.py]
args:
- --collection=product_knowledge
- --threshold-hours=24
- --alert-on-failure
コストとパフォーマンスのトレードオフ
Feature StoreとVector Storeを併せて運用する際、コスト構造を理解する必要がある。
| 構成要素 | コスト要因 | 削減戦略 | 注意事項 |
|---|---|---|---|
| Feast Online Store (Redis) | メモリ容量 * 時間 | TTL設定で未使用エンティティ削除 | TTLが短すぎるとcache miss急増 |
| Vector Store (Qdrant) | ベクトル次元数 _ ドキュメント数 _ レプリカ | 次元削減(1536 -> 512)、量子化適用 | 量子化時recall 1-3%低下 |
| エンベディングAPIコスト | トークン数 * 呼び出し回数 | 変更分のみ再エンベディング、バッチ処理 | モデル変更時に全体再エンベディング必要 |
| CDCパイプライン (Kafka) | パーティション数 * 保管期間 | 圧縮、保管期間短縮 | 保管期間がbackfillウィンドウ未満だと復旧不可 |
| サービングGPU (LLM) | GPU時間 * モデルサイズ | バッチ推論、モデル量子化 | 量子化レベル別品質ベンチマーク必須 |
障害シナリオ別対応ランブック
シナリオ1:Vector Store検索結果が突然不正確になる
症状:RAGチャットボットの回答品質が急落。ユーザーフィードバック「的外れな回答」が増加。
Retrieval recall@5が0.85から0.52に低下。
点検順序:
1. 最近のエンベディングパイプライン実行ログを確認
-> エンベディングモデルのバージョンが変わったかチェック
2. Qdrant collection infoを確認
-> vector dimensionが変更されたかチェック
3. 原因:エンベディングモデルをtext-embedding-3-smallからtext-embedding-3-largeに
アップグレードしたが、既存ベクトルは再エンベディングしなかったため次元不一致
解決:
1. エンベディングモデル変更時は必ず全体再インデックスを実行
2. 新規collection作成 -> 全体エンベディング -> alias切り替え(blue-greenパターン)
3. 再インデックス完了前までは以前のモデルにロールバック
シナリオ2:Feature Store照会とVector検索のどちらかのみ失敗
症状:統合照会APIで間欠的500エラー。
エラーログ:"TimeoutError: Feast online store read timed out after 50ms"
原因:Feature Store(Redis)はタイムアウトだがVector Store(Qdrant)は正常。
一方の失敗が全体リクエストを失敗させる。
解決:
1. 統合照会APIにpartial failure許容ロジックを追加
2. Feature Store失敗時:デフォルト値(人口統計平均)を使用、ログ記録
3. Vector Store失敗時:事前キャッシュされた人気ドキュメントでfallback
4. 両方失敗時:LLMにコンテキストなしで一般的な応答を生成(品質低下警告を含む)
ブループリント導入ロードマップ
このブループリントを一度に実装しようとせず、4段階に分けて導入する。
Phase 1(4週間):基盤構築
- Feast + Redis Online Store構成
- Qdrant単一ノードデプロイ
- 統合照会APIプロトタイプ
- 検証:単一ユースケースでFeature Store照会 + Vector検索の応答時間 100ms未満
Phase 2(4週間):パイプライン自動化
- CDCパイプライン構築(Feature Store freshness SLA達成)
- エンベディングパイプライン自動化(変更検知 -> 再エンベディング)
- 検証:Feature freshness p95 5分未満、インデックスfreshness 4時間未満
Phase 3(4週間):品質モニタリング
- Retrieval品質メトリクス(recall@k、MRR)ダッシュボード
- Feature Store parityテストCI統合
- アラーティング体系構築
- 検証:品質回帰時30分以内にアラート受信
Phase 4(4週間):最適化とスケーリング
- ベクトル量子化、TTL最適化、コストダッシュボード
- マルチcollection対応(ドキュメントタイプ別分離)
- Blue-greenインデックス切り替え自動化
- 検証:月間インフラコストPhase 2対比30%削減
クイズ
クイズ
Q1. Feature StoreのFeature ViewとRAGパイプラインのチャンキング設定が対称関係にある理由は?
どちらも「ソースデータをどのような変換を経てサービング可能な形に作るか」を定義する宣言的な仕様であり、変換ロジックの一貫性が学習-サービング/インデックス-検索品質を決定するためである。
Q2. 統合照会APIでFeature Store照会とVector検索を並列実行する理由とリスクは?
全体応答時間をmax(Feature Storeレイテンシ、Vector Storeレイテンシ)に短縮するためである。リスクは一方が失敗すると全体が失敗する可能性があるため、partial failure許容ロジックが必要となる。
Q3. エンベディングモデルをアップグレードする際に既存ベクトルを再エンベディングしないとどのような問題が発生するか?
新モデルで生成したクエリエンベディングと旧モデルで生成したドキュメントエンベディングが同じベクトル空間にないため、similarity計算が無意味になりretrieval recallが急落する。
Q4. Blue-greenインデックス切り替えパターンの核心的なステップは?
新規collectionに全体再エンベディング完了 -> aliasを新collectionに切り替え -> 旧collectionを一定期間保持後削除。切り替え時点でダウンタイムがない。
Q5. RAGOpsでインデックスfreshness SLAを設定する際に考慮すべき主要要素は?
ドキュメント変更頻度、エンベディングパイプライン実行コスト、ビジネス要件(リアルタイム性の必要性)、エンベディングAPI呼び出しコストを総合的に考慮する必要がある。FAQのように変更が稀なドキュメントは24時間、商品レビューは4時間など、ドキュメントタイプ別に差別化する。
Q6. Feature Storeの構造化フィーチャーとRAGドキュメント内容が矛盾した場合、構造化データを優先するようLLMに指示する理由は?
Feature StoreのデータはリアルタイムのcDCで更新され最新性が保証されるが、RAGドキュメントはインデックスサイクルにより遅延する可能性がある。リアルタイムデータがより信頼できるソースであるためである。
Q7. ブループリントを4段階に分けて導入する最大の理由は?
各段階で定量的検証基準を満たしているか確認してから次の段階に進む必要があり、一度に実装すると障害原因の切り分けが困難で、チームの学習曲線を消化できないためである。