Skip to content
Published on

AIプラットフォーム:Feature StoreとRAGOpsブループリント 2026

Authors
  • Name
    Twitter
AIプラットフォーム:Feature StoreとRAGOpsブループリント 2026

ブループリント概要:Feature StoreとRAGパイプラインの合流

「ブループリント」というタイトルの通り、本記事はFeature StoreとRAG(Retrieval-Augmented Generation)パイプラインを1つのAIプラットフォーム内で統合運用するための全体設計図を提示する。個別ツールの使い方ではなく、2つのシステムが交わる接合部で発生する設計決定と運用ルールを扱う。

Feature Storeは構造化データフィーチャーを管理し、RAGパイプラインは非構造化テキストをベクトルに変換してLLMにコンテキストを提供する。2026年現在、この2つのシステムは別々に運用される場合が多いが、実際のAI製品では「ユーザーの直近の購買履歴(Feature Store)+ 関連商品レビュー要約(RAG)」のように組み合わせて使うケースが増えている。本記事はその統合アーキテクチャの設計原則、実装パターン、運用ランブックを青写真として整理する。

アーキテクチャ青写真

全体のデータフローを4つのレイヤーに区分する。

┌──────────────────────────────────────────────────────┐
Layer 4: Serving (KServe / API Gateway)- Feature照会 + Vector検索結果をLLMに伝達           │
- Guardrail、Citation検証、レスポンスフィルタリング├──────────────────────────────────────────────────────┤
Layer 3: Feature Store + Vector Store- Feast Online Store (Redis):構造化フィーチャー    │
- Vector DB (Qdrant/Weaviate):非構造化エンベディング│
- 統合照会API2つのstoreを1つのインターフェースで  │
├──────────────────────────────────────────────────────┤
Layer 2: Ingestion & Transformation- CDCパイプライン (Debezium -> Kafka)- ドキュメントチャンキング + エンベディング (Kubeflow)- Feature View定義 + Vector Index管理├──────────────────────────────────────────────────────┤
Layer 1: Source Data- OLTP DB、Data Warehouse、ドキュメントストア、APIログ│
└──────────────────────────────────────────────────────┘

統合照会API設計

Feature StoreとVector Storeを1つのリクエストで照会する統合APIがこのブループリントの核心である。

"""
Feature Store(Feast)とVector Store(Qdrant)を統合照会するサービス。
サービングレイヤーでモデルやLLMにコンテキストを提供する際に使用する。
"""
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
import asyncio
from feast import FeatureStore
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue

@dataclass
class UnifiedContext:
    """Feature Storeフィーチャー + Vector検索結果を合わせた統合コンテキスト"""
    entity_id: str
    structured_features: Dict[str, Any]          # Feature Storeから照会
    retrieved_documents: List[Dict[str, Any]]    # Vector Storeから検索
    feature_freshness_ms: float = 0.0
    retrieval_latency_ms: float = 0.0

class UnifiedContextService:
    def __init__(
        self,
        feast_repo_path: str,
        qdrant_url: str,
        qdrant_collection: str,
    ):
        self.feast_store = FeatureStore(repo_path=feast_repo_path)
        self.qdrant = QdrantClient(url=qdrant_url, timeout=5.0)
        self.collection = qdrant_collection

    async def get_context(
        self,
        entity_id: str,
        query_embedding: List[float],
        feature_list: List[str],
        top_k: int = 5,
        metadata_filter: Optional[Dict[str, str]] = None,
    ) -> UnifiedContext:
        """
        Feature Store照会とVector検索を並列実行して
        統合コンテキストを返す。
        """
        # 並列実行:Feature Store照会 + Vector検索
        feature_task = asyncio.to_thread(
            self._fetch_features, entity_id, feature_list
        )
        vector_task = asyncio.to_thread(
            self._search_vectors, query_embedding, top_k, metadata_filter, entity_id
        )

        features_result, vectors_result = await asyncio.gather(
            feature_task, vector_task
        )

        return UnifiedContext(
            entity_id=entity_id,
            structured_features=features_result["features"],
            retrieved_documents=vectors_result["documents"],
            feature_freshness_ms=features_result["latency_ms"],
            retrieval_latency_ms=vectors_result["latency_ms"],
        )

    def _fetch_features(self, entity_id: str, feature_list: List[str]) -> dict:
        import time
        start = time.monotonic()
        result = self.feast_store.get_online_features(
            features=feature_list,
            entity_rows=[{"user_id": entity_id}],
        ).to_dict()
        elapsed = (time.monotonic() - start) * 1000

        # dictのリスト値をスカラーに変換
        features = {k: v[0] if isinstance(v, list) and v else v for k, v in result.items()}
        return {"features": features, "latency_ms": elapsed}

    def _search_vectors(
        self,
        query_embedding: List[float],
        top_k: int,
        metadata_filter: Optional[Dict[str, str]],
        entity_id: str,
    ) -> dict:
        import time
        start = time.monotonic()

        search_filter = None
        if metadata_filter:
            conditions = [
                FieldCondition(key=k, match=MatchValue(value=v))
                for k, v in metadata_filter.items()
            ]
            search_filter = Filter(must=conditions)

        results = self.qdrant.search(
            collection_name=self.collection,
            query_vector=query_embedding,
            query_filter=search_filter,
            limit=top_k,
            with_payload=True,
        )
        elapsed = (time.monotonic() - start) * 1000

        documents = [
            {
                "text": hit.payload.get("text", ""),
                "source": hit.payload.get("source", ""),
                "score": hit.score,
                "chunk_id": hit.id,
            }
            for hit in results
        ]
        return {"documents": documents, "latency_ms": elapsed}

ドキュメントエンベディングパイプライン:チャンキング戦略とインデックス管理

RAGパイプラインでドキュメントをベクトルに変換するプロセスは、Feature StoreのFeature View定義と対称的な概念である。Feature Viewが「どのような変換を経てフィーチャーを作成するか」を定義するように、エンベディングパイプラインは「どのようなチャンキングとエンベディングを経てベクトルを作成するか」を定義する。

"""
ドキュメントチャンキング + エンベディングパイプライン。
Kubeflow Pipelineコンポーネントとして実行、または独立スクリプトとして使用する。
"""
from dataclasses import dataclass
from typing import List, Generator
import hashlib
import json

@dataclass
class ChunkConfig:
    """チャンキング設定。ドキュメントタイプ別に異なる戦略を適用する。"""
    chunk_size: int = 512          # トークン基準
    chunk_overlap: int = 64        # チャンク間の重複
    separators: tuple = ("\n\n", "\n", ". ", " ")
    min_chunk_size: int = 50       # これ未満は前のチャンクにマージ
    metadata_fields: tuple = ("source", "doc_type", "updated_at")

# ドキュメントタイプ別チャンキング戦略
CHUNK_CONFIGS = {
    "product_review": ChunkConfig(chunk_size=256, chunk_overlap=32),
    "technical_doc": ChunkConfig(chunk_size=512, chunk_overlap=64),
    "faq": ChunkConfig(chunk_size=128, chunk_overlap=0),  # FAQは重複不要
    "legal": ChunkConfig(chunk_size=1024, chunk_overlap=128),
}

def recursive_split(text: str, config: ChunkConfig) -> List[str]:
    """再帰的テキスト分割。separatorsを順番に試す。"""
    if len(text.split()) <= config.chunk_size:
        return [text] if len(text.split()) >= config.min_chunk_size else []

    for sep in config.separators:
        parts = text.split(sep)
        if len(parts) > 1:
            chunks = []
            current = ""
            for part in parts:
                candidate = current + sep + part if current else part
                if len(candidate.split()) > config.chunk_size:
                    if current:
                        chunks.append(current.strip())
                    current = part
                else:
                    current = candidate
            if current:
                chunks.append(current.strip())
            return [c for c in chunks if len(c.split()) >= config.min_chunk_size]

    # すべてのseparatorで分割不可の場合、強制分割
    words = text.split()
    return [
        " ".join(words[i:i + config.chunk_size])
        for i in range(0, len(words), config.chunk_size - config.chunk_overlap)
    ]

def create_chunk_id(source: str, chunk_index: int, text: str) -> str:
    """決定論的chunk ID生成。同一ドキュメントの同一チャンクは同じIDを持つ。"""
    content_hash = hashlib.sha256(text.encode()).hexdigest()[:12]
    return f"{source}::{chunk_index}::{content_hash}"

def process_document(
    text: str,
    source: str,
    doc_type: str,
    metadata: dict,
    embedding_fn,
) -> List[dict]:
    """
    1つのドキュメントをチャンキングしエンベディングして、
    Vector Storeにupsertするレコードリストを返す。
    """
    config = CHUNK_CONFIGS.get(doc_type, ChunkConfig())
    chunks = recursive_split(text, config)

    records = []
    for i, chunk_text in enumerate(chunks):
        embedding = embedding_fn(chunk_text)
        chunk_id = create_chunk_id(source, i, chunk_text)
        records.append({
            "id": chunk_id,
            "vector": embedding,
            "payload": {
                "text": chunk_text,
                "source": source,
                "doc_type": doc_type,
                "chunk_index": i,
                "total_chunks": len(chunks),
                "chunk_size_tokens": len(chunk_text.split()),
                **{k: metadata.get(k, "") for k in config.metadata_fields},
            },
        })
    return records

Feature StoreフィーチャーとRAG結果の結合パターン

LLMに渡すプロンプトを構成する際、Feature Storeの構造化フィーチャーとRAGの検索結果をどのように結合するかが応答品質を決定する。

"""
Feature Storeフィーチャー + RAG検索結果を結合して
LLMプロンプトを構成するパターン。
"""
from typing import Dict, List, Any

def build_augmented_prompt(
    user_query: str,
    structured_features: Dict[str, Any],
    retrieved_documents: List[Dict[str, Any]],
    system_instruction: str = "",
) -> str:
    """
    構造化フィーチャーと非構造化検索結果を結合したプロンプト生成。

    原則:
    1. 構造化フィーチャーは「ユーザーコンテキスト」セクションにテーブル形式で提供
    2. RAG検索結果は「参考ドキュメント」セクションに出典と共に提供
    3. フィーチャーとドキュメント間で矛盾がある場合、フィーチャー(リアルタイムデータ)を優先するよう明示
    """
    # 構造化フィーチャーセクション
    feature_lines = []
    feature_display_names = {
        "total_purchases_7d": "直近7日間の購入回数",
        "avg_order_value_30d": "直近30日間の平均注文金額",
        "preferred_category": "好みカテゴリ",
        "membership_tier": "メンバーシップランク",
    }
    for key, value in structured_features.items():
        display = feature_display_names.get(key, key)
        feature_lines.append(f"- {display}: {value}")

    features_section = "\n".join(feature_lines) if feature_lines else "なし"

    # RAG検索結果セクション
    doc_sections = []
    for i, doc in enumerate(retrieved_documents, 1):
        score = doc.get("score", 0)
        # relevanceスコアが低いドキュメントは除外
        if score < 0.7:
            continue
        doc_sections.append(
            f"[ドキュメント {i}](出典:{doc['source']}、関連度:{score:.2f})\n{doc['text']}"
        )

    docs_section = "\n\n".join(doc_sections) if doc_sections else "関連ドキュメントなし"

    prompt = f"""{system_instruction}

## ユーザーコンテキスト(Feature Storeリアルタイムデータ)
{features_section}

## 参考ドキュメント(RAG検索結果)
{docs_section}

## 指示事項
- ユーザーコンテキストと参考ドキュメントの両方を活用して回答してください。
- 構造化データ(ユーザーコンテキスト)とドキュメント内容が矛盾する場合、構造化データを優先します。
- 回答に使用した参考ドキュメント番号を引用してください。
- 参考ドキュメントにない内容は推測しないでください。

## ユーザー質問
{user_query}"""

    return prompt

RAGOps:検索品質モニタリングとインデックス更新自動化

Feature StoreにFreshness SLAがあるように、Vector Storeにもインデックス最新性管理が必要である。これをRAGOpsと呼ぶ。

インデックスFreshness追跡

"""
Vector Storeインデックスの最新性を追跡するモニタリングモジュール。
ドキュメントソースの最新更新時刻とインデックスの最終更新時刻を
比較してstale状態を検知する。
"""
from datetime import datetime, timedelta
from typing import Dict, List
from dataclasses import dataclass
import json

@dataclass
class IndexFreshnessReport:
    collection_name: str
    total_documents: int
    stale_documents: int
    freshness_ratio: float
    oldest_document_age_hours: float
    avg_document_age_hours: float
    recommendations: List[str]

def check_index_freshness(
    qdrant_client,
    collection_name: str,
    freshness_threshold_hours: int = 24,
    sample_size: int = 1000,
) -> IndexFreshnessReport:
    """
    Vector Storeインデックスのドキュメント最新性を点検する。
    """
    # サンプルドキュメントのupdated_atフィールドを照会
    points, _ = qdrant_client.scroll(
        collection_name=collection_name,
        limit=sample_size,
        with_payload=["updated_at", "source"],
    )

    now = datetime.utcnow()
    ages_hours = []
    stale_count = 0
    stale_sources = set()

    for point in points:
        updated_at_str = point.payload.get("updated_at", "")
        if not updated_at_str:
            stale_count += 1
            continue
        try:
            updated_at = datetime.fromisoformat(updated_at_str)
            age_hours = (now - updated_at).total_seconds() / 3600
            ages_hours.append(age_hours)
            if age_hours > freshness_threshold_hours:
                stale_count += 1
                stale_sources.add(point.payload.get("source", "unknown"))
        except ValueError:
            stale_count += 1

    total = len(points)
    freshness_ratio = 1.0 - (stale_count / max(total, 1))

    recommendations = []
    if freshness_ratio < 0.95:
        recommendations.append(
            f"Staleドキュメント比率が{(1-freshness_ratio)*100:.1f}%です。"
            f"エンベディングパイプラインの実行が必要です。"
        )
    if stale_sources:
        recommendations.append(
            f"Staleドキュメントソース:{', '.join(list(stale_sources)[:5])}"
        )
    if ages_hours and max(ages_hours) > freshness_threshold_hours * 3:
        recommendations.append(
            "非常に古いドキュメントが存在します。全体再インデックスを検討してください。"
        )

    return IndexFreshnessReport(
        collection_name=collection_name,
        total_documents=total,
        stale_documents=stale_count,
        freshness_ratio=freshness_ratio,
        oldest_document_age_hours=max(ages_hours) if ages_hours else 0,
        avg_document_age_hours=sum(ages_hours) / len(ages_hours) if ages_hours else 0,
        recommendations=recommendations,
    )

インデックス更新Kubeflow Pipeline

# rag-index-update-pipeline.yaml
# 定期的に変更されたドキュメントを検知してベクトルインデックスを更新する。
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: rag-index-updater
  namespace: ml-pipelines
spec:
  schedule: '0 */4 * * *' # 4時間ごとに実行
  concurrencyPolicy: Replace
  workflowSpec:
    entrypoint: update-index
    templates:
      - name: update-index
        steps:
          - - name: detect-changes
              template: detect-document-changes
          - - name: chunk-and-embed
              template: run-embedding-pipeline
              arguments:
                parameters:
                  - name: changed-docs
                    value: '{{steps.detect-changes.outputs.parameters.changed-docs}}'
          - - name: verify-freshness
              template: check-freshness

      - name: detect-document-changes
        container:
          image: ml-platform/doc-change-detector:v1.2.0
          command: [python, detect_changes.py]
          args:
            - --since=4h
            - --output=/tmp/changed_docs.json
          env:
            - name: DOC_STORE_URL
              valueFrom:
                secretKeyRef:
                  name: doc-store-credentials
                  key: url

      - name: run-embedding-pipeline
        container:
          image: ml-platform/embedding-worker:v2.1.0
          command: [python, embed_and_upsert.py]
          args:
            - --docs-file={{inputs.parameters.changed-docs}}
            - --collection=product_knowledge
            - --model=text-embedding-3-large
          resources:
            requests:
              nvidia.com/gpu: 1
              memory: 8Gi

      - name: check-freshness
        container:
          image: ml-platform/rag-monitor:v1.0.0
          command: [python, check_freshness.py]
          args:
            - --collection=product_knowledge
            - --threshold-hours=24
            - --alert-on-failure

コストとパフォーマンスのトレードオフ

Feature StoreとVector Storeを併せて運用する際、コスト構造を理解する必要がある。

構成要素コスト要因削減戦略注意事項
Feast Online Store (Redis)メモリ容量 * 時間TTL設定で未使用エンティティ削除TTLが短すぎるとcache miss急増
Vector Store (Qdrant)ベクトル次元数 _ ドキュメント数 _ レプリカ次元削減(1536 -> 512)、量子化適用量子化時recall 1-3%低下
エンベディングAPIコストトークン数 * 呼び出し回数変更分のみ再エンベディング、バッチ処理モデル変更時に全体再エンベディング必要
CDCパイプライン (Kafka)パーティション数 * 保管期間圧縮、保管期間短縮保管期間がbackfillウィンドウ未満だと復旧不可
サービングGPU (LLM)GPU時間 * モデルサイズバッチ推論、モデル量子化量子化レベル別品質ベンチマーク必須

障害シナリオ別対応ランブック

シナリオ1:Vector Store検索結果が突然不正確になる

症状:RAGチャットボットの回答品質が急落。ユーザーフィードバック「的外れな回答」が増加。
     Retrieval recall@50.85から0.52に低下。

点検順序:
  1. 最近のエンベディングパイプライン実行ログを確認
     -> エンベディングモデルのバージョンが変わったかチェック
  2. Qdrant collection infoを確認
     -> vector dimensionが変更されたかチェック
  3. 原因:エンベディングモデルをtext-embedding-3-smallからtext-embedding-3-largeに
     アップグレードしたが、既存ベクトルは再エンベディングしなかったため次元不一致

解決:
  1. エンベディングモデル変更時は必ず全体再インデックスを実行
  2. 新規collection作成 -> 全体エンベディング -> alias切り替え(blue-greenパターン)
  3. 再インデックス完了前までは以前のモデルにロールバック

シナリオ2:Feature Store照会とVector検索のどちらかのみ失敗

症状:統合照会APIで間欠的500エラー。
     エラーログ:"TimeoutError: Feast online store read timed out after 50ms"

原因:Feature Store(Redis)はタイムアウトだがVector Store(Qdrant)は正常。
     一方の失敗が全体リクエストを失敗させる。

解決:
  1. 統合照会APIにpartial failure許容ロジックを追加
  2. Feature Store失敗時:デフォルト値(人口統計平均)を使用、ログ記録
  3. Vector Store失敗時:事前キャッシュされた人気ドキュメントでfallback
  4. 両方失敗時:LLMにコンテキストなしで一般的な応答を生成(品質低下警告を含む)

ブループリント導入ロードマップ

このブループリントを一度に実装しようとせず、4段階に分けて導入する。

Phase 1(4週間):基盤構築

  • Feast + Redis Online Store構成
  • Qdrant単一ノードデプロイ
  • 統合照会APIプロトタイプ
  • 検証:単一ユースケースでFeature Store照会 + Vector検索の応答時間 100ms未満

Phase 2(4週間):パイプライン自動化

  • CDCパイプライン構築(Feature Store freshness SLA達成)
  • エンベディングパイプライン自動化(変更検知 -> 再エンベディング)
  • 検証:Feature freshness p95 5分未満、インデックスfreshness 4時間未満

Phase 3(4週間):品質モニタリング

  • Retrieval品質メトリクス(recall@k、MRR)ダッシュボード
  • Feature Store parityテストCI統合
  • アラーティング体系構築
  • 検証:品質回帰時30分以内にアラート受信

Phase 4(4週間):最適化とスケーリング

  • ベクトル量子化、TTL最適化、コストダッシュボード
  • マルチcollection対応(ドキュメントタイプ別分離)
  • Blue-greenインデックス切り替え自動化
  • 検証:月間インフラコストPhase 2対比30%削減

クイズ

クイズ

Q1. Feature StoreのFeature ViewとRAGパイプラインのチャンキング設定が対称関係にある理由は?

どちらも「ソースデータをどのような変換を経てサービング可能な形に作るか」を定義する宣言的な仕様であり、変換ロジックの一貫性が学習-サービング/インデックス-検索品質を決定するためである。

Q2. 統合照会APIでFeature Store照会とVector検索を並列実行する理由とリスクは?

全体応答時間をmax(Feature Storeレイテンシ、Vector Storeレイテンシ)に短縮するためである。リスクは一方が失敗すると全体が失敗する可能性があるため、partial failure許容ロジックが必要となる。

Q3. エンベディングモデルをアップグレードする際に既存ベクトルを再エンベディングしないとどのような問題が発生するか?

新モデルで生成したクエリエンベディングと旧モデルで生成したドキュメントエンベディングが同じベクトル空間にないため、similarity計算が無意味になりretrieval recallが急落する。

Q4. Blue-greenインデックス切り替えパターンの核心的なステップは?

新規collectionに全体再エンベディング完了 -> aliasを新collectionに切り替え -> 旧collectionを一定期間保持後削除。切り替え時点でダウンタイムがない。

Q5. RAGOpsでインデックスfreshness SLAを設定する際に考慮すべき主要要素は?

ドキュメント変更頻度、エンベディングパイプライン実行コスト、ビジネス要件(リアルタイム性の必要性)、エンベディングAPI呼び出しコストを総合的に考慮する必要がある。FAQのように変更が稀なドキュメントは24時間、商品レビューは4時間など、ドキュメントタイプ別に差別化する。

Q6. Feature Storeの構造化フィーチャーとRAGドキュメント内容が矛盾した場合、構造化データを優先するようLLMに指示する理由は?

Feature StoreのデータはリアルタイムのcDCで更新され最新性が保証されるが、RAGドキュメントはインデックスサイクルにより遅延する可能性がある。リアルタイムデータがより信頼できるソースであるためである。

Q7. ブループリントを4段階に分けて導入する最大の理由は?

各段階で定量的検証基準を満たしているか確認してから次の段階に進む必要があり、一度に実装すると障害原因の切り分けが困難で、チームの学習曲線を消化できないためである。

References