Skip to content
Published on

RAGパイプライン本番構築ガイド: ベクトルDB選定からチャンキング・リランキング・評価まで

Authors
  • Name
    Twitter
RAG Pipeline Production Guide

はじめに

RAG(Retrieval-Augmented Generation)は、LLMのハルシネーション(幻覚)問題を解決し、最新情報に基づいた回答を生成するための重要なアーキテクチャパターンである。単に「検索してプロンプトに入れる」だけに見えるかもしれないが、本番環境では埋め込みモデルの選定、ベクトルDBの運用、チャンキング戦略、ハイブリッド検索、リランキング、評価体系まで数多くのエンジニアリング判断が必要となる。

本記事では、RAGパイプラインの全体アーキテクチャを分析し、各構成要素の選定基準と実装方法を実践的なコードとともに解説する。特に本番環境で頻繁に発生する障害パターンとその対応戦略、そしてRAGASを活用した体系的な評価方法論まで包括的に扱う。

RAGアーキテクチャ概要

RAGパイプラインは大きく3つのステージで構成される:Retrieval(検索)、Augmentation(拡張)、Generation(生成)。各ステージは独立して最適化でき、パイプライン全体の品質は最も弱いステージによって決まる。

エンドツーエンドのアーキテクチャフロー

[ユーザークエリ]
[クエリ処理] ← クエリ分解、HyDE、クエリ拡張
[検索] ← BM25 + ベクトル検索(ハイブリッド)
[リランキング] ← Cross-encoder、Cohere Rerank
[コンテキスト組立] ← チャンク統合、重複排除、トークン制限
[生成] ← LLMにコンテキストとクエリを渡す
[後処理] ← 出典表示、信頼度スコア、ハルシネーション検証

基本的なRAG実装

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

# 1. ドキュメントチャンキング
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", ".", " "]
)
chunks = text_splitter.split_documents(documents)

# 2. 埋め込みとベクトルストア作成
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    collection_name="production_docs"
)

# 3. Retriever設定
retriever = vectorstore.as_retriever(
    search_type="mmr",
    search_kwargs={"k": 5, "fetch_k": 20}
)

# 4. RAGチェーン構築
template = """以下のコンテキストに基づいて質問に回答してください。
コンテキストから答えが見つからない場合は「情報が不足しています」と回答してください。

コンテキスト:
{context}

質問: {question}
回答:"""

prompt = ChatPromptTemplate.from_template(template)
llm = ChatOpenAI(model="gpt-4o", temperature=0)

rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

response = rag_chain.invoke("RAGパイプラインの主要な構成要素は?")

埋め込みモデル比較

埋め込みモデルは、RAGパイプラインの検索品質を決定する最も重要な要素の一つである。2025-2026年現在の主要な埋め込みモデルの特性を比較する。

モデル次元数最大トークンMTEB平均多言語コスト特徴
OpenAI text-embedding-3-large3072819164.6O$0.13/1M tokens次元削減可能
OpenAI text-embedding-3-small1536819162.3O$0.02/1M tokensコスト効率的
Cohere embed-v3102451264.5O (100+)$0.10/1M tokens検索特化
BGE-M31024819266.1O (100+)無料(セルフホスト)Dense+Sparse+ColBERT
E5-mistral-7b-instruct40963276866.6O無料(セルフホスト)指示ベース埋め込み
Jina-embeddings-v31024819265.5O (89)$0.02/1M tokensタスク特化LoRA

埋め込みモデル選定基準

from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List

class EmbeddingBenchmark:
    """埋め込みモデル比較ベンチマーク"""

    def __init__(self, models: List[str]):
        self.models = {}
        for model_name in models:
            self.models[model_name] = SentenceTransformer(model_name)

    def evaluate_retrieval_quality(
        self,
        queries: List[str],
        relevant_docs: List[List[str]],
        corpus: List[str]
    ) -> dict:
        results = {}
        for name, model in self.models.items():
            corpus_embeddings = model.encode(corpus, normalize_embeddings=True)
            query_embeddings = model.encode(queries, normalize_embeddings=True)

            # コサイン類似度計算
            scores = np.dot(query_embeddings, corpus_embeddings.T)

            # Recall@k計算
            recall_at_5 = self._compute_recall(scores, relevant_docs, k=5)
            recall_at_10 = self._compute_recall(scores, relevant_docs, k=10)

            results[name] = {
                "recall@5": recall_at_5,
                "recall@10": recall_at_10,
                "embedding_dim": model.get_sentence_embedding_dimension(),
                "encoding_speed": self._measure_speed(model, queries)
            }
        return results

    def _compute_recall(self, scores, relevant_docs, k):
        recalls = []
        for i, rel_docs in enumerate(relevant_docs):
            top_k_indices = np.argsort(scores[i])[-k:]
            retrieved = set(top_k_indices)
            relevant = set(rel_docs)
            recalls.append(len(retrieved & relevant) / len(relevant))
        return np.mean(recalls)

    def _measure_speed(self, model, texts, n_runs=3):
        import time
        times = []
        for _ in range(n_runs):
            start = time.time()
            model.encode(texts)
            times.append(time.time() - start)
        return np.mean(times)

ベクトルデータベース比較

ベクトルデータベースの選定は、RAGシステムの運用複雑度、コスト、パフォーマンスに直接影響する。主要なベクトルDBを多角的に比較する。

項目PineconeMilvusWeaviateQdrantChroma
デプロイ方式フルマネージドセルフホスト/Zilliz Cloudセルフホスト/Cloudセルフホスト/Cloudインメモリ/セルフホスト
最大ベクトル数数十億数十億数十億数十億数百万
検索アルゴリズム独自エンジンHNSW, IVF, DiskANNHNSWHNSWHNSW
ハイブリッド検索O(Sparse+Dense)O(BM25+Dense)O(BM25+Dense)O(Sparse+Dense)X
マルチテナンシーO(ネームスペース)O(パーティション)O(テナント)O(コレクション)O(コレクション)
フィルタリングメタデータフィルタスカラーフィルタリングGraphQLフィルタペイロードフィルタメタデータフィルタ
価格(100万ベクトル)約$70/月無料(セルフホスト)無料(セルフホスト)無料(セルフホスト)無料
適用用途クイックプロト、マネージド志向大規模エンタープライズGraphQLエコシステム高性能フィルタリング開発/小規模

Qdrantを活用した本番ベクトル検索

from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance, VectorParams, PointStruct,
    Filter, FieldCondition, MatchValue,
    SearchParams, HnswConfigDiff
)
import uuid

class ProductionVectorStore:
    """本番ベクトルストア管理クラス"""

    def __init__(self, url: str, api_key: str, collection_name: str):
        self.client = QdrantClient(url=url, api_key=api_key)
        self.collection_name = collection_name

    def create_collection(self, vector_size: int = 1536):
        """HNSW インデックス最適化されたコレクション作成"""
        self.client.create_collection(
            collection_name=self.collection_name,
            vectors_config=VectorParams(
                size=vector_size,
                distance=Distance.COSINE,
                on_disk=True  # 大規模データセット向けディスクベース
            ),
            hnsw_config=HnswConfigDiff(
                m=16,              # グラフ接続数(デフォルト: 16)
                ef_construct=128,  # インデックス構築時の探索範囲
                full_scan_threshold=10000  # この数以下なら全スキャン
            ),
            optimizers_config={
                "indexing_threshold": 20000,
                "memmap_threshold": 50000
            }
        )

    def upsert_documents(self, documents: list, embeddings: list, metadata: list):
        """バッチアップサート"""
        points = [
            PointStruct(
                id=str(uuid.uuid4()),
                vector=embedding,
                payload={**meta, "text": doc}
            )
            for doc, embedding, meta in zip(documents, embeddings, metadata)
        ]

        # バッチサイズ100で分割アップロード
        batch_size = 100
        for i in range(0, len(points), batch_size):
            batch = points[i:i + batch_size]
            self.client.upsert(
                collection_name=self.collection_name,
                points=batch,
                wait=True
            )

    def hybrid_search(self, query_vector: list, query_text: str,
                      filters: dict = None, top_k: int = 10):
        """ハイブリッド検索(ベクトル + メタデータフィルタ)"""
        search_filter = None
        if filters:
            conditions = [
                FieldCondition(key=k, match=MatchValue(value=v))
                for k, v in filters.items()
            ]
            search_filter = Filter(must=conditions)

        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            query_filter=search_filter,
            limit=top_k,
            search_params=SearchParams(
                hnsw_ef=128,  # 検索時の探索範囲(高いほど正確だが遅い)
                exact=False   # 近似検索を使用
            )
        )
        return results

チャンキング戦略

チャンキングは、RAG品質に最も大きな影響を与える前処理ステップである。不適切なチャンキングは関連情報を分離させたり、不要なノイズを含めてしまい、検索品質を大幅に低下させる。

チャンキング戦略比較

戦略利点欠点適用用途
固定サイズ実装簡単、予測可能意味的境界を無視均一なテキスト
再帰的段落/文の境界を尊重最適サイズのチューニング必要汎用ドキュメント
セマンティック意味単位を保持計算コスト高い構造のないテキスト
ドキュメント構造ベース見出し/セクション階層を維持パーサー実装が必要Markdown、HTML
親子(Parent-Child)検索精度 + 十分なコンテキストストレージ2倍技術ドキュメント

セマンティックチャンキング実装

from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings
import numpy as np

class AdvancedChunker:
    """複数のチャンキング戦略をサポートする高度なチャンカー"""

    def __init__(self, embedding_model: str = "text-embedding-3-small"):
        self.embeddings = OpenAIEmbeddings(model=embedding_model)

    def semantic_chunk(self, text: str, breakpoint_threshold: float = 0.5):
        """セマンティックチャンキング - 意味変化ポイントで分割"""
        chunker = SemanticChunker(
            self.embeddings,
            breakpoint_threshold_type="percentile",
            breakpoint_threshold_amount=breakpoint_threshold * 100
        )
        return chunker.split_text(text)

    def parent_child_chunk(self, text: str,
                            parent_size: int = 2000,
                            child_size: int = 400,
                            child_overlap: int = 50):
        """親子チャンキング - 検索は子、コンテキストは親"""
        from langchain.text_splitter import RecursiveCharacterTextSplitter

        # 親チャンク生成
        parent_splitter = RecursiveCharacterTextSplitter(
            chunk_size=parent_size,
            chunk_overlap=0
        )
        parent_chunks = parent_splitter.split_text(text)

        # 各親から子チャンク生成
        child_splitter = RecursiveCharacterTextSplitter(
            chunk_size=child_size,
            chunk_overlap=child_overlap
        )

        result = []
        for i, parent in enumerate(parent_chunks):
            children = child_splitter.split_text(parent)
            for child in children:
                result.append({
                    "child_text": child,
                    "parent_text": parent,
                    "parent_id": i
                })
        return result

    def markdown_structure_chunk(self, markdown_text: str):
        """Markdownドキュメント構造ベースのチャンキング"""
        from langchain.text_splitter import MarkdownHeaderTextSplitter

        headers_to_split_on = [
            ("#", "h1"),
            ("##", "h2"),
            ("###", "h3"),
        ]

        splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=headers_to_split_on
        )
        chunks = splitter.split_text(markdown_text)

        # 各チャンクに階層構造メタデータを追加
        enriched_chunks = []
        for chunk in chunks:
            enriched_chunks.append({
                "text": chunk.page_content,
                "metadata": chunk.metadata,
                "hierarchy": " > ".join(
                    chunk.metadata.get(h, "")
                    for h in ["h1", "h2", "h3"]
                    if chunk.metadata.get(h)
                )
            })
        return enriched_chunks

検索最適化戦略

単純なベクトル類似度検索だけでは、本番レベルの検索品質を達成することは難しい。ハイブリッド検索、HyDE、クエリ分解などの最適化技法を組み合わせる必要がある。

ハイブリッド検索(BM25 + ベクトル)

from rank_bm25 import BM25Okapi
from langchain_openai import OpenAIEmbeddings
import numpy as np
from typing import List, Tuple

class HybridRetriever:
    """BM25 + ベクトル検索を組み合わせたハイブリッド検索器"""

    def __init__(self, documents: List[str], embeddings_model: str = "text-embedding-3-large"):
        self.documents = documents
        self.embeddings = OpenAIEmbeddings(model=embeddings_model)

        # BM25インデックス構築
        tokenized_docs = [doc.lower().split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized_docs)

        # ベクトルインデックス構築
        self.doc_embeddings = np.array(
            self.embeddings.embed_documents(documents)
        )

    def search(self, query: str, top_k: int = 5,
               alpha: float = 0.5) -> List[Tuple[str, float]]:
        """
        ハイブリッド検索の実行
        alpha: ベクトル検索の重み(0=BM25のみ、1=ベクトルのみ)
        """
        # BM25スコア
        bm25_scores = self.bm25.get_scores(query.lower().split())
        bm25_scores = self._normalize_scores(bm25_scores)

        # ベクトル類似度スコア
        query_embedding = np.array(self.embeddings.embed_query(query))
        vector_scores = np.dot(self.doc_embeddings, query_embedding)
        vector_scores = self._normalize_scores(vector_scores)

        # 重み付き結合(Reciprocal Rank Fusionの代替)
        combined_scores = alpha * vector_scores + (1 - alpha) * bm25_scores

        # 上位k件を返す
        top_indices = np.argsort(combined_scores)[-top_k:][::-1]
        return [
            (self.documents[i], combined_scores[i])
            for i in top_indices
        ]

    def _normalize_scores(self, scores: np.ndarray) -> np.ndarray:
        """Min-Max正規化"""
        min_s, max_s = scores.min(), scores.max()
        if max_s == min_s:
            return np.zeros_like(scores)
        return (scores - min_s) / (max_s - min_s)

HyDE(Hypothetical Document Embedding)

HyDEは、クエリに対する仮想的な回答をまず生成し、その回答の埋め込みを検索に使用する技法である。質問と回答の間の意味的なギャップを縮め、検索品質を向上させる。

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate

class HyDERetriever:
    """HyDEベースの検索器"""

    def __init__(self, vectorstore):
        self.vectorstore = vectorstore
        self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
        self.embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

        self.hyde_prompt = ChatPromptTemplate.from_template(
            "次の質問に対して仮想的な回答を作成してください。"
            "実際の正確性よりも、関連するキーワードと概念を含めることが重要です。\n\n"
            "質問: {question}\n\n"
            "仮想回答:"
        )

    def retrieve(self, query: str, top_k: int = 5):
        """HyDE検索の実行"""
        # 1. 仮想回答の生成
        chain = self.hyde_prompt | self.llm
        hypothetical_answer = chain.invoke({"question": query}).content

        # 2. 仮想回答の埋め込み生成
        hyde_embedding = self.embeddings.embed_query(hypothetical_answer)

        # 3. 仮想回答の埋め込みで検索
        results = self.vectorstore.similarity_search_by_vector(
            hyde_embedding, k=top_k
        )
        return results

クエリ分解(Query Decomposition)

複雑な質問をサブ質問に分解し、それぞれで検索した後、結果を統合する戦略である。

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import json

class QueryDecomposer:
    """複雑なクエリをサブクエリに分解"""

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
        self.decompose_prompt = ChatPromptTemplate.from_template(
            "次の質問を検索に最適化された2〜4個のサブ質問に分解してください。\n"
            "JSON配列形式で返してください。\n\n"
            "元の質問: {question}\n\n"
            "サブ質問:"
        )

    def decompose(self, question: str) -> list:
        """クエリ分解"""
        chain = self.decompose_prompt | self.llm
        result = chain.invoke({"question": question}).content

        try:
            sub_queries = json.loads(result)
        except json.JSONDecodeError:
            sub_queries = [question]

        return sub_queries

    def retrieve_and_merge(self, question: str, retriever, top_k: int = 3):
        """分解されたクエリで検索し結果を統合"""
        sub_queries = self.decompose(question)

        all_docs = []
        seen_contents = set()

        for sub_query in sub_queries:
            docs = retriever.invoke(sub_query)[:top_k]
            for doc in docs:
                if doc.page_content not in seen_contents:
                    seen_contents.add(doc.page_content)
                    all_docs.append(doc)

        return all_docs

リランキング

初期検索結果をより精密なモデルで再評価し、順位を並べ替えるステップである。Bi-encoderベースのベクトル検索よりも、Cross-encoderベースのリランキングがより正確な関連性判定を行える。

Cohere Rerankの活用

import cohere
from typing import List, Dict

class CohereReranker:
    """Cohere Rerank APIを活用したリランキング"""

    def __init__(self, api_key: str, model: str = "rerank-v3.5"):
        self.client = cohere.Client(api_key)
        self.model = model

    def rerank(self, query: str, documents: List[str],
               top_n: int = 5) -> List[Dict]:
        """ドキュメントのリランキング"""
        response = self.client.rerank(
            model=self.model,
            query=query,
            documents=documents,
            top_n=top_n,
            return_documents=True
        )

        results = []
        for hit in response.results:
            results.append({
                "text": hit.document.text,
                "relevance_score": hit.relevance_score,
                "index": hit.index
            })
        return results


class CrossEncoderReranker:
    """ローカルCross-Encoderモデルベースのリランキング"""

    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"):
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)

    def rerank(self, query: str, documents: List[str],
               top_n: int = 5) -> List[Dict]:
        """Cross-encoderでリランキング"""
        pairs = [[query, doc] for doc in documents]
        scores = self.model.predict(pairs)

        # スコア順にソート
        scored_docs = list(zip(documents, scores, range(len(documents))))
        scored_docs.sort(key=lambda x: x[1], reverse=True)

        results = []
        for text, score, idx in scored_docs[:top_n]:
            results.append({
                "text": text,
                "relevance_score": float(score),
                "index": idx
            })
        return results

リランキングパイプライン統合

class RAGPipelineWithReranking:
    """リランキング統合RAGパイプライン"""

    def __init__(self, retriever, reranker, llm):
        self.retriever = retriever
        self.reranker = reranker
        self.llm = llm

    def query(self, question: str, top_k_retrieve: int = 20,
              top_k_rerank: int = 5) -> str:
        """検索 -> リランキング -> 生成"""
        # ステージ1: 初期検索(広範囲)
        initial_docs = self.retriever.invoke(question)[:top_k_retrieve]
        doc_texts = [doc.page_content for doc in initial_docs]

        # ステージ2: リランキング(精密)
        reranked = self.reranker.rerank(
            query=question,
            documents=doc_texts,
            top_n=top_k_rerank
        )

        # ステージ3: コンテキスト組立
        context = "\n\n---\n\n".join(
            r["text"] for r in reranked
        )

        # ステージ4: LLM生成
        prompt = f"""以下のコンテキストに基づいて質問に回答してください。
各情報の出典を表示してください。

コンテキスト:
{context}

質問: {question}
回答:"""

        response = self.llm.invoke(prompt)
        return response.content

RAGASを活用したRAG評価

RAGAS(Retrieval Augmented Generation Assessment)は、RAGパイプラインを自動的に評価するフレームワークである。検索品質と生成品質を別々のメトリクスで測定する。

RAGASコアメトリクス

メトリクス測定対象説明理想値
Context Precision検索検索されたコンテキスト中の関連率0.8以上
Context Recall検索必要情報がコンテキストに含まれる率0.9以上
Faithfulness生成回答がコンテキストに根拠を持つ度合い0.9以上
Answer Relevancy生成回答が質問に適切である度合い0.8以上
Answer Correctness全体正解と比較した正確度0.7以上

RAGAS評価実装

from ragas import evaluate
from ragas.metrics import (
    context_precision,
    context_recall,
    faithfulness,
    answer_relevancy,
    answer_correctness
)
from datasets import Dataset

class RAGEvaluator:
    """RAGASベースのRAG評価器"""

    def __init__(self):
        self.metrics = [
            context_precision,
            context_recall,
            faithfulness,
            answer_relevancy,
            answer_correctness
        ]

    def create_eval_dataset(self, eval_samples: list) -> Dataset:
        """評価データセット作成"""
        data = {
            "question": [],
            "answer": [],
            "contexts": [],
            "ground_truth": []
        }

        for sample in eval_samples:
            data["question"].append(sample["question"])
            data["answer"].append(sample["generated_answer"])
            data["contexts"].append(sample["retrieved_contexts"])
            data["ground_truth"].append(sample["expected_answer"])

        return Dataset.from_dict(data)

    def evaluate(self, eval_samples: list) -> dict:
        """RAGパイプライン評価の実行"""
        dataset = self.create_eval_dataset(eval_samples)

        result = evaluate(
            dataset=dataset,
            metrics=self.metrics
        )

        return {
            "context_precision": result["context_precision"],
            "context_recall": result["context_recall"],
            "faithfulness": result["faithfulness"],
            "answer_relevancy": result["answer_relevancy"],
            "answer_correctness": result["answer_correctness"],
            "overall_score": sum(result.values()) / len(result)
        }

    def generate_report(self, results: dict) -> str:
        """評価レポート生成"""
        report = "=== RAG Pipeline Evaluation Report ===\n\n"

        for metric, score in results.items():
            status = "PASS" if score >= 0.7 else "WARN" if score >= 0.5 else "FAIL"
            report += f"  {metric}: {score:.4f} [{status}]\n"

        report += f"\n  Overall: {results.get('overall_score', 0):.4f}\n"
        return report

本番デプロイパターン

非同期インデクシングパイプライン

本番環境では、ドキュメントのインデクシングと検索サービングを分離する必要がある。新しいドキュメントが追加された際、非同期で埋め込みを生成しインデックスを更新する。

import asyncio
from datetime import datetime
from typing import Optional
import hashlib

class AsyncIndexingPipeline:
    """非同期ドキュメントインデクシングパイプライン"""

    def __init__(self, embeddings, vector_store, chunker):
        self.embeddings = embeddings
        self.vector_store = vector_store
        self.chunker = chunker
        self.index_queue = asyncio.Queue()
        self._processed_hashes = set()

    async def enqueue_document(self, doc_id: str, content: str,
                                metadata: Optional[dict] = None):
        """ドキュメントをインデクシングキューに追加"""
        doc_hash = hashlib.md5(content.encode()).hexdigest()

        if doc_hash in self._processed_hashes:
            return {"status": "skipped", "reason": "duplicate"}

        await self.index_queue.put({
            "doc_id": doc_id,
            "content": content,
            "metadata": metadata or {},
            "hash": doc_hash,
            "enqueued_at": datetime.utcnow().isoformat()
        })
        return {"status": "enqueued", "queue_size": self.index_queue.qsize()}

    async def process_queue(self, batch_size: int = 10):
        """キューからバッチ単位でドキュメントを処理"""
        while True:
            batch = []
            try:
                for _ in range(batch_size):
                    item = self.index_queue.get_nowait()
                    batch.append(item)
            except asyncio.QueueEmpty:
                pass

            if batch:
                await self._process_batch(batch)
            else:
                await asyncio.sleep(1)

    async def _process_batch(self, batch: list):
        """バッチ処理: チャンキング -> 埋め込み -> アップサート"""
        all_chunks = []
        all_metadata = []

        for item in batch:
            chunks = self.chunker.semantic_chunk(item["content"])
            for chunk in chunks:
                all_chunks.append(chunk)
                all_metadata.append({
                    "doc_id": item["doc_id"],
                    "hash": item["hash"],
                    **item["metadata"]
                })

        # バッチ埋め込み生成
        chunk_embeddings = self.embeddings.embed_documents(all_chunks)

        # ベクトルストアにアップサート
        self.vector_store.upsert_documents(
            documents=all_chunks,
            embeddings=chunk_embeddings,
            metadata=all_metadata
        )

        for item in batch:
            self._processed_hashes.add(item["hash"])

検索結果キャッシング

頻繁に繰り返されるクエリに対してキャッシングを適用し、レスポンス速度を向上させAPIコストを削減する。

import hashlib
import json
from datetime import datetime, timedelta
from typing import Optional
import redis

class RAGCache:
    """RAG検索結果キャッシュ"""

    def __init__(self, redis_url: str = "redis://localhost:6379",
                 ttl_seconds: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl_seconds

    def _cache_key(self, query: str, filters: Optional[dict] = None) -> str:
        """クエリとフィルタからキャッシュキーを生成"""
        key_data = {"query": query.lower().strip(), "filters": filters or {}}
        key_hash = hashlib.sha256(
            json.dumps(key_data, sort_keys=True).encode()
        ).hexdigest()
        return f"rag:cache:{key_hash}"

    def get(self, query: str, filters: Optional[dict] = None):
        """キャッシュから検索結果を取得"""
        key = self._cache_key(query, filters)
        cached = self.redis.get(key)

        if cached:
            data = json.loads(cached)
            data["cache_hit"] = True
            return data
        return None

    def set(self, query: str, results: list,
            filters: Optional[dict] = None):
        """検索結果をキャッシュに保存"""
        key = self._cache_key(query, filters)
        data = {
            "results": results,
            "cached_at": datetime.utcnow().isoformat(),
            "query": query
        }
        self.redis.setex(key, self.ttl, json.dumps(data))

    def invalidate_by_pattern(self, pattern: str):
        """パターンマッチングによるキャッシュ無効化"""
        keys = self.redis.keys(f"rag:cache:*{pattern}*")
        if keys:
            self.redis.delete(*keys)

障害事例と対応

検索品質劣化パターン

障害タイプ症状原因対応策
埋め込みドリフト時間経過に伴う検索精度の低下埋め込みモデルバージョン変更、データ分布変化定期的な再インデクシング、モデルバージョン固定
チャンク不整合関連情報が検索されないチャンクサイズ/戦略の不適切さチャンクサイズの実験、オーバーラップ調整
フィルタ過学習結果0件のクエリ増加メタデータフィルタが厳しすぎるフィルタフォールバック戦略、フィルタ緩和
コンテキスト汚染LLMが無関係な情報で回答リランキング不在、top-k過多リランキング導入、top-k削減
レイテンシ増加レスポンス時間2秒超過インデックス非最適化、ネットワーク遅延インデックスチューニング、キャッシング、非同期処理

埋め込みドリフトモニタリング

import numpy as np
from datetime import datetime
from typing import List

class EmbeddingDriftMonitor:
    """埋め込みドリフト検出およびモニタリング"""

    def __init__(self, reference_embeddings: np.ndarray):
        self.reference_mean = np.mean(reference_embeddings, axis=0)
        self.reference_std = np.std(reference_embeddings, axis=0)
        self.drift_history = []

    def check_drift(self, new_embeddings: np.ndarray,
                     threshold: float = 0.1) -> dict:
        """新しい埋め込みのドリフト検査"""
        new_mean = np.mean(new_embeddings, axis=0)

        # コサイン距離でドリフトを測定
        cosine_sim = np.dot(self.reference_mean, new_mean) / (
            np.linalg.norm(self.reference_mean) * np.linalg.norm(new_mean)
        )
        drift_score = 1 - cosine_sim

        # 分布比較(KLダイバージェンスの近似)
        distribution_shift = np.mean(
            np.abs(np.mean(new_embeddings, axis=0) - self.reference_mean)
            / (self.reference_std + 1e-8)
        )

        result = {
            "drift_score": float(drift_score),
            "distribution_shift": float(distribution_shift),
            "is_drifted": drift_score > threshold,
            "timestamp": datetime.utcnow().isoformat(),
            "recommendation": (
                "再インデクシングが必要" if drift_score > threshold
                else "正常範囲"
            )
        }

        self.drift_history.append(result)
        return result

検索品質劣化時のフォールバック戦略

class RAGWithFallback:
    """フォールバック戦略を含むRAGパイプライン"""

    def __init__(self, primary_retriever, fallback_retriever,
                 reranker, llm):
        self.primary = primary_retriever
        self.fallback = fallback_retriever
        self.reranker = reranker
        self.llm = llm

    def query(self, question: str) -> dict:
        """フォールバックを含むクエリ処理"""
        # プライマリ検索
        primary_docs = self.primary.invoke(question)

        if not primary_docs:
            # フォールバック1: フィルタ緩和
            primary_docs = self.fallback.invoke(question)

        if not primary_docs:
            return {
                "answer": "関連情報が見つかりませんでした。",
                "confidence": 0.0,
                "source": "no_results"
            }

        # リランキング
        reranked = self.reranker.rerank(
            query=question,
            documents=[d.page_content for d in primary_docs],
            top_n=5
        )

        # 最低関連度の検証
        if reranked[0]["relevance_score"] < 0.3:
            return {
                "answer": "高い信頼度の回答を生成できません。",
                "confidence": reranked[0]["relevance_score"],
                "source": "low_confidence"
            }

        # LLM生成
        context = "\n\n".join(r["text"] for r in reranked)
        answer = self.llm.invoke(
            f"コンテキスト:\n{context}\n\n質問: {question}\n回答:"
        ).content

        return {
            "answer": answer,
            "confidence": reranked[0]["relevance_score"],
            "source": "primary",
            "num_sources": len(reranked)
        }

本番チェックリスト

設計フェーズ

  • ユースケースに応じた埋め込みモデルベンチマークの実施
  • ベクトルDB選定: 規模、予算、運用能力を考慮
  • チャンキング戦略のA/Bテスト計画策定
  • 評価データセット(最低100個のQAペア)構築

開発フェーズ

  • ハイブリッド検索(BM25 + ベクトル)実装
  • リランキングパイプライン統合
  • 非同期インデクシングパイプライン構築
  • キャッシングレイヤー実装(Redis/Memcached)
  • エラーハンドリングおよびフォールバック戦略実装

デプロイフェーズ

  • RAGASメトリクスベースの品質ゲート設定
  • 埋め込みドリフトモニタリングダッシュボード構築
  • 検索レイテンシアラート設定(P95基準500ms)
  • インデックス再構築自動化パイプライン構築
  • ロードテスト実施(目標QPS比2倍)

運用フェーズ

  • 週次の検索品質レポート生成
  • 月次の埋め込みモデルアップデートレビュー
  • ユーザーフィードバックベースの評価データセット更新
  • コストモニタリング(埋め込みAPI呼び出し、ベクトルDBストレージ)
  • 定期的なインデックス再構築(四半期に1回以上)

参考資料