Skip to content
Published on

RAGパイプライン高度化戦略:チャンキング、リランキング、ハイブリッド検索最適化

Authors
  • Name
    Twitter
RAGパイプライン高度化戦略

はじめに

RAG(Retrieval-Augmented Generation)をプロダクションに適用したチームなら、一度はこんな経験をしたことがあるでしょう。「検索はされるが回答がおかしい」「関連文書が確かにあるのに検索にヒットしない」「短い質問にはうまくいくが、複雑な質問にはハルシネーションが起きる」。これらの問題の根本原因はほとんどの場合検索品質にあります。LLMがどれほど賢くても、間違ったコンテキストを受け取れば間違った回答を生成するしかありません。

この記事では、RAGパイプラインの検索品質を最大化するための3つの核心軸を扱います。

  1. チャンキング(Chunking):文書をどのように分割するか
  2. ハイブリッド検索(Hybrid Search):DenseベクトルとSparseキーワード検索をどう組み合わせるか
  3. リランキング(Reranking):検索結果をどのように再順位付けするか

各戦略について実践コード、ベンチマーク数値、運用上の注意事項を具体的に見ていきます。2026年3月時点の最新モデルとツールをベースに作成しました。

RAGパイプラインアーキテクチャ概要

高度化されたRAGパイプラインの全体フローは以下の通りです。

[インデクシングフェーズ]
文書収集 → 前処理 → チャンキング → エンベディング → ベクトルDB保存 + 転置インデックス保存

[クエリフェーズ]
質問 → クエリ変換 → ハイブリッド検索 (Dense + Sparse)
     → リランキング → 上位K件選択 → プロンプト構成 → LLM生成

基本RAGとの核心的な違いは3つです。

  • チャンキング戦略の細分化:単純な固定サイズではなく、セマンティック、再帰的、エージェンティックチャンキングを適用
  • ハイブリッド検索:ベクトル類似度だけに依存せず、BM25キーワード検索を併用
  • リランキングレイヤーの追加:初期検索結果をCross-Encoderで再評価して精度を向上

この3つを組み合わせると、検索精度(Precision@K)を30〜50%以上改善できます。以下でそれぞれを深く見ていきましょう。

チャンキング戦略の深堀り

チャンキングはRAGパイプラインで最初に決定する要素であり、全体のパフォーマンスへの影響が最も大きいです。間違ったチャンキングはその後どのような高度化をしても回復が困難です。

チャンキング戦略比較表

戦略原理適した状況チャンクサイズ長所短所
Fixed-size固定トークン/文字数で分割均一な構造の文書256-512トークン実装が単純、高速意味単位を無視
Recursive区切り文字の階層で再帰的に分割汎用テキスト512-1024トークン段落/文境界を尊重区切り文字の設定必要
Semanticエンベディング類似度で意味境界を検出トピック遷移が多い文書可変意味保存に優れるエンベディングコスト、低速
AgenticLLMが文書構造を分析して分割複雑な技術文書可変最高品質高コスト、低速

Fixed-sizeチャンキング

最もシンプルですが依然として効果的な戦略です。2026年のベンチマークでも、512トークンの固定サイズチャンキングが複雑なセマンティックチャンキングより良い結果を示すケースが報告されています。

from langchain_text_splitters import CharacterTextSplitter

# 固定サイズチャンキング - 最も基本的な方式
fixed_splitter = CharacterTextSplitter(
    separator="\n",
    chunk_size=512,
    chunk_overlap=50,       # 10%オーバーラップ推奨
    length_function=len,
    is_separator_regex=False,
)

chunks = fixed_splitter.split_text(document_text)
print(f"合計 {len(chunks)}個のチャンク作成")

推奨設定:

  • ファクトイド(事実確認)クエリ:256〜512トークン
  • 分析型クエリ:1024+トークン
  • オーバーラップ:チャンクサイズ全体の10〜20%

Recursiveチャンキング

LangChainのRecursiveCharacterTextSplitterは区切り文字の階層に沿って再帰的に分割します。段落、文、単語の順序で境界を尊重しながら目標サイズに合わせます。

from langchain_text_splitters import RecursiveCharacterTextSplitter

# 再帰的チャンキング - プロダクション基本推奨
recursive_splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", ". ", " ", ""],
    chunk_size=512,
    chunk_overlap=64,
    length_function=len,
    add_start_index=True,   # 原文位置追跡用
)

chunks = recursive_splitter.split_documents(documents)

# 各チャンクにメタデータが自動的に含まれる
for chunk in chunks[:3]:
    print(f"サイズ: {len(chunk.page_content)}, "
          f"開始位置: {chunk.metadata.get('start_index')}")

実践的なヒント: ほとんどのプロダクション環境では、Recursiveチャンキングから始めることを推奨します。シンプルでありながら段落の境界を尊重するため、コスト対効果が最も優れています。

Semanticチャンキング

エンベディングを使用して隣接する文間の意味的類似度を計算し、類似度が急激に低下する地点で分割します。

from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings

# セマンティックチャンキング - 意味単位での分割
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

semantic_splitter = SemanticChunker(
    embeddings=embeddings,
    breakpoint_threshold_type="percentile",  # percentile, standard_deviation, interquartile
    breakpoint_threshold_amount=75,          # 75パーセンタイル以上の差で分割
)

semantic_chunks = semantic_splitter.split_text(document_text)
print(f"セマンティックチャンク数: {len(semantic_chunks)}")
print(f"平均チャンク長: {sum(len(c) for c in semantic_chunks) / len(semantic_chunks):.0f}")

注意: セマンティックチャンキングはすべての文ペアに対してエンベディングを生成する必要があるため、大規模文書ではコストと時間が大幅に増加します。10万件以上の文書を処理する場合は、Recursiveチャンキングの方が現実的です。

Agenticチャンキング

LLMを活用して文書の論理的構造を把握し、最適な分割ポイントを決定します。最も精巧ですがコストが高いです。

from openai import OpenAI

client = OpenAI()

def agentic_chunk(text: str, max_chunks: int = 20) -> list[dict]:
    """LLMベースのエージェンティックチャンキング"""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": (
                    "与えられたテキストを論理的単位に分割してください。"
                    "各チャンクは1つの完結したトピックを扱う必要があります。"
                    "JSON配列で返してください: "
                    '[{"title": "チャンクタイトル", "content": "チャンク内容", "summary": "一行要約"}]'
                ),
            },
            {"role": "user", "content": text[:8000]},  # トークン制限に注意
        ],
        response_format={"type": "json_object"},
        temperature=0,
    )
    import json
    result = json.loads(response.choices[0].message.content)
    return result.get("chunks", [])

# 使用例
chunks = agentic_chunk(long_document_text)
for chunk in chunks:
    print(f"[{chunk['title']}] {chunk['summary']}")

コストに関する考慮: Agenticチャンキングは文書ごとにLLM API呼び出しが発生するため、大量インデクシングには適していません。高価値文書(契約書、技術仕様書など)の少量処理に適しています。

エンベディングモデルの選択と最適化

チャンキング後のベクトルエンベディングモデルの選択は、検索品質に直接的に影響します。

主要エンベディングモデル比較

モデル次元最大トークン多言語MTEBスコア特徴
text-embedding-3-large30728191O64.6OpenAI最新、次元削減可能
text-embedding-3-small15368191O62.3コスト効率的
BAAI/bge-m310248192O68.2オープンソース、Dense+Sparse同時
Cohere embed-v41024512O66.1マルチモーダル対応
voyage-3-large102432000O67.5長いコンテキスト特化
from langchain_openai import OpenAIEmbeddings

# OpenAIエンベディング - 次元削減の活用
embeddings = OpenAIEmbeddings(
    model="text-embedding-3-large",
    dimensions=1024,  # 3072 -> 1024に削減してコスト/ストレージを節約
)

# BGE-M3: Dense + Sparse同時生成(ハイブリッド検索に最適)
from FlagEmbedding import BGEM3FlagModel

bge_model = BGEM3FlagModel("BAAI/bge-m3", use_fp16=True)

# DenseとSparseベクトルを同時生成
output = bge_model.encode(
    ["RAGパイプライン最適化方法論"],
    return_dense=True,
    return_sparse=True,
)

dense_vector = output["dense_vecs"][0]    # (1024,) floatベクトル
sparse_vector = output["lexical_weights"][0]  # スパースベクトル(単語別重み)

print(f"Dense次元: {len(dense_vector)}")
print(f"Sparseアクティブトークン数: {len(sparse_vector)}")

実践的な推奨事項:

  • コスト優先:text-embedding-3-small(OpenAI)またはbge-m3(セルフホスティング)
  • 品質優先:text-embedding-3-largeまたはvoyage-3-large
  • ハイブリッド検索を計画している場合:bge-m3(Dense + Sparse同時生成でインフラを簡素化)

ベクトルデータベース比較

ベクトルDB選択は運用の複雑さ、コスト、パフォーマンスに大きく影響します。

主要ベクトルデータベース比較

項目PineconeWeaviateQdrantMilvus
ホスティングマネージド(Serverless)マネージド + セルフホスティングマネージド + セルフホスティングセルフホスティング中心(Zilliz Cloud)
ハイブリッド検索対応(Sparseベクトル)ネイティブ対応対応(Sparseベクトル)対応
メタデータフィルタ基本GraphQLベースで強力Rustベース高性能基本
無料ティアStarter(10万ベクトル)Sandbox1GB無料(永久)オープンソース
クエリレイテンシ50ms以下50-100ms50ms以下30-50ms
スケーラビリティ自動スケーリング手動設定必要水平スケーリングK8sネイティブ
適したチーム運用最小化を望むチームOSS+柔軟性を望むチームフィルタリングが複雑大規模エンタープライズ

選択ガイド:

  • 素早いスタート + 運用負担の最小化:Pinecone
  • セルフホスティング + 複雑なフィルタリング:Qdrant
  • オープンソース + ネイティブハイブリッド検索:Weaviate
  • 大規模(10億+ベクトル)+ GPUアクセラレーション:Milvus/Zilliz

ハイブリッド検索:Dense + Sparseの結合

ベクトル検索だけではキーワードの正確なマッチングが困難で、BM25だけでは意味的類似度を捉えられません。ハイブリッド検索は2つの方式を結合してRecallを15〜30%向上させます。

ハイブリッド検索アーキテクチャ

質問: "Kubernetes HPAの設定でCPU閾値は?"

Dense検索(ベクトル類似度):
"コンテナオーケストレーションでの自動スケーリング設定方法" (意味的に類似)

Sparse検索(BM25キーワードマッチング):
"HPAのtargetCPUUtilizationPercentageを80に設定" (キーワード正確マッチング)

ハイブリッド結合(RRFまたは加重和):
  → 両方の結果を統合して最適な文書を返却

Weaviateハイブリッド検索の実装

import weaviate, { WeaviateClient } from 'weaviate-client'

// Weaviateクライアント接続
const client: WeaviateClient = await weaviate.connectToLocal({
  host: 'localhost',
  port: 8080,
})

// ハイブリッド検索の実行
const collection = client.collections.get('Documents')

const result = await collection.query.hybrid('Kubernetes HPA CPU 閾値', {
  alpha: 0.7, // 0 = 純粋BM25, 1 = 純粋ベクトル, 0.7 = ベクトル70%
  limit: 20, // リランキング前の候補数
  fusionType: 'RelativeScore', // RelativeScoreまたはRanked
  returnMetadata: ['score', 'explainScore'],
  returnProperties: ['title', 'content', 'source'],
})

for (const item of result.objects) {
  console.log(`[${item.metadata?.score?.toFixed(3)}] ${item.properties.title}`)
}

PythonでのBM25 + Dense直接実装

ベクトルDBのネイティブハイブリッド検索が使用できない場合、Reciprocal Rank Fusion(RRF)で直接実装できます。

from rank_bm25 import BM25Okapi
import numpy as np
from typing import List, Tuple

def hybrid_search(
    query: str,
    documents: list[dict],
    dense_scores: np.ndarray,
    k: int = 10,
    alpha: float = 0.7,
    rrf_k: int = 60,
) -> list[dict]:
    """
    RRFベースのハイブリッド検索
    alpha: Dense検索の重み(1-alphaがSparseの重み)
    rrf_k: RRF定数(デフォルト60、論文推奨)
    """
    # Sparse検索(BM25)
    tokenized_docs = [doc["content"].split() for doc in documents]
    bm25 = BM25Okapi(tokenized_docs)
    sparse_scores = bm25.get_scores(query.split())

    # Denseランク計算
    dense_ranks = np.argsort(-dense_scores) + 1  # 1-indexedランク
    sparse_ranks = np.argsort(-sparse_scores) + 1

    # RRFスコア計算
    rrf_scores = []
    for i in range(len(documents)):
        dense_rrf = alpha / (rrf_k + dense_ranks[i])
        sparse_rrf = (1 - alpha) / (rrf_k + sparse_ranks[i])
        rrf_scores.append(dense_rrf + sparse_rrf)

    # 上位K件返却
    top_indices = np.argsort(rrf_scores)[::-1][:k]
    return [
        {**documents[i], "hybrid_score": rrf_scores[i]}
        for i in top_indices
    ]

# 使用例
results = hybrid_search(
    query="Kubernetes HPA CPU threshold",
    documents=all_documents,
    dense_scores=embedding_similarity_scores,
    k=20,       # リランキング前なので余裕を持って
    alpha=0.7,  # Dense 70%, Sparse 30%
)

Alphaチューニングガイド

クエリタイプ推奨alpha理由
専門用語が多い技術クエリ0.3〜0.5キーワード正確マッチングが重要
自然言語の質問0.7〜0.8意味的類似度が重要
コード検索0.2〜0.4関数名、変数名の正確マッチング
一般的なFAQ0.5〜0.6バランスの取れた検索

核心: クエリタイプ別にalphaを動的に調整すると、静的設定と比べてPrecision@1で2〜7.5ポイントの向上が期待できます。

リランキングモデルの適用

ハイブリッド検索で候補を広げた後、リランキングモデルで最終順位を精密に調整します。リランカーはクエリと文書を一緒に入力として受け取り(Cross-Encoding)、直接関連度スコアを算出するため、Bi-Encoderエンベディングより精度が高いです。

リランキングモデル比較

モデルタイプパラメータ多言語レイテンシ(100件)コスト
Cohere Rerank 4API非公開100+言語200-400ms従量制
BAAI/bge-reranker-v2-m3OSS0.6BO500-800ms (GPU)無料
BAAI/bge-reranker-largeOSS560M限定的400-600ms (GPU)無料
cross-encoder/ms-marco-MiniLM-L-12-v2OSS33MX(英語)100-200ms (GPU)無料

Cohere Rerankの適用

import cohere

co = cohere.ClientV2(api_key="your-cohere-api-key")

def rerank_with_cohere(
    query: str,
    documents: list[str],
    top_n: int = 5,
) -> list[dict]:
    """Cohere Rerank 4で文書を再順位付け"""
    response = co.rerank(
        model="rerank-v3.5",
        query=query,
        documents=documents,
        top_n=top_n,
        return_documents=True,
    )

    results = []
    for item in response.results:
        results.append({
            "index": item.index,
            "score": item.relevance_score,
            "text": item.document.text if item.document else documents[item.index],
        })
    return results

# 使用例:ハイブリッド検索結果20件をリランキングして上位5件を選択
hybrid_results = hybrid_search(query, documents, dense_scores, k=20)
reranked = rerank_with_cohere(
    query="Kubernetes HPA CPUの閾値設定方法",
    documents=[r["content"] for r in hybrid_results],
    top_n=5,
)

for r in reranked:
    print(f"[{r['score']:.4f}] {r['text'][:80]}...")

BGE Rerankerのセルフホスティング

APIコストを避けたい場合やデータの外部送信ができない環境では、オープンソースのリランカーを直接ホスティングします。

from FlagEmbedding import FlagReranker

# BGE Reranker v2 M3 - 多言語対応の軽量リランカー
reranker = FlagReranker(
    "BAAI/bge-reranker-v2-m3",
    use_fp16=True,  # GPUメモリ節約
)

def rerank_with_bge(
    query: str,
    documents: list[str],
    top_n: int = 5,
) -> list[dict]:
    """BGE Rerankerで文書を再順位付け"""
    # クエリ-文書ペアの生成
    pairs = [[query, doc] for doc in documents]

    # 関連度スコアの計算
    scores = reranker.compute_score(pairs, normalize=True)

    # スコア順にソート
    scored_docs = [
        {"index": i, "score": s, "text": d}
        for i, (s, d) in enumerate(zip(scores, documents))
    ]
    scored_docs.sort(key=lambda x: x["score"], reverse=True)

    return scored_docs[:top_n]

# 使用例
results = rerank_with_bge(
    query="RAGパイプラインでチャンキングサイズを決める基準は?",
    documents=candidate_documents,
    top_n=5,
)

for r in results:
    print(f"[{r['score']:.4f}] {r['text'][:100]}...")

パイプライン全体の統合

チャンキング、ハイブリッド検索、リランキングを1つのパイプラインに統合する例です。

from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

class OptimizedRAGPipeline:
    """高度化されたRAGパイプライン"""

    def __init__(self):
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=512, chunk_overlap=64
        )
        self.embeddings = OpenAIEmbeddings(
            model="text-embedding-3-large", dimensions=1024
        )
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
        self.reranker = FlagReranker("BAAI/bge-reranker-v2-m3", use_fp16=True)

    def query(self, question: str, top_k: int = 5) -> str:
        # 1. ハイブリッド検索:候補20件
        candidates = self._hybrid_search(question, k=20)

        # 2. リランキング:上位5件を選択
        reranked = self._rerank(question, candidates, top_n=top_k)

        # 3. プロンプト構成とLLM生成
        context = "\n\n---\n\n".join([doc["text"] for doc in reranked])
        prompt = ChatPromptTemplate.from_messages([
            ("system", "以下のコンテキストに基づいて質問に回答してください。\n\n{context}"),
            ("human", "{question}"),
        ])
        chain = prompt | self.llm
        response = chain.invoke({"context": context, "question": question})
        return response.content

    def _hybrid_search(self, query: str, k: int = 20) -> list[dict]:
        # Dense + Sparseハイブリッド検索(上記コード参照)
        ...

    def _rerank(self, query: str, docs: list[dict], top_n: int) -> list[dict]:
        pairs = [[query, doc["text"]] for doc in docs]
        scores = self.reranker.compute_score(pairs, normalize=True)
        for doc, score in zip(docs, scores):
            doc["rerank_score"] = score
        docs.sort(key=lambda x: x["rerank_score"], reverse=True)
        return docs[:top_n]

評価指標とベンチマーキング

RAGパイプラインを最適化するには、定量的な評価が不可欠です。「なんとなく良くなった気がする」はプロダクションでは通用しません。

RGASフレームワーク

RAGAS(Retrieval-Augmented Generation Assessment Suite)はRAG専用の評価フレームワークで、Ground Truthなしでも LLMを評価者として活用して自動採点します。

from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
)
from datasets import Dataset

# 評価データセットの構成
eval_data = {
    "question": [
        "Kubernetes HPAのCPU閾値のデフォルト値は?",
        "RAGでチャンキングサイズを決める基準は?",
    ],
    "answer": [
        "HPAのCPU閾値のデフォルト値は80%です。",
        "クエリタイプに応じて256-1024トークンの範囲で決定します。",
    ],
    "contexts": [
        ["HPAはtargetCPUUtilizationPercentageのデフォルト値80を使用します。"],
        ["ファクトイドクエリは256-512、分析型は1024+トークンを推奨します。"],
    ],
    "ground_truth": [
        "デフォルト値は80%である。",
        "クエリタイプと文書特性に応じて決定する。",
    ],
}

dataset = Dataset.from_dict(eval_data)

# 評価の実行
results = evaluate(
    dataset=dataset,
    metrics=[faithfulness, answer_relevancy, context_precision, context_recall],
)

print(results)
# faithfulness: 0.92, answer_relevancy: 0.88,
# context_precision: 0.85, context_recall: 0.90

DeepEvalでの単体テスト

DeepEvalはPytestスタイルでRAGをテストでき、CI/CDパイプラインへの統合に最適です。

from deepeval import assert_test
from deepeval.test_case import LLMTestCase
from deepeval.metrics import (
    FaithfulnessMetric,
    ContextualRelevancyMetric,
    AnswerRelevancyMetric,
)

def test_rag_faithfulness():
    """RAG応答がコンテキストに忠実かテスト"""
    test_case = LLMTestCase(
        input="Kubernetes HPA CPUの閾値は?",
        actual_output="HPAのCPU閾値のデフォルト値は80%です。",
        retrieval_context=[
            "HPAはtargetCPUUtilizationPercentageのデフォルト値80を使用します。"
        ],
    )

    faithfulness = FaithfulnessMetric(threshold=0.8)
    relevancy = ContextualRelevancyMetric(threshold=0.7)
    answer_rel = AnswerRelevancyMetric(threshold=0.7)

    assert_test(test_case, [faithfulness, relevancy, answer_rel])

# pytestで実行: pytest test_rag.py -v

核心評価指標まとめ

指標測定対象期待値ツール
Context Precision検索されたコンテキストの関連性0.8以上RAGAS
Context Recall必要なコンテキストの検索比率0.85以上RAGAS
Faithfulness応答がコンテキストに忠実な程度0.9以上RAGAS, DeepEval
Answer Relevancy応答が質問に適切な程度0.85以上RAGAS, DeepEval
MRR@K最初の関連文書の逆順位平均0.7以上カスタム
NDCG@K順位品質の正規化割引累積利得0.75以上カスタム

運用上の注意事項とトラブルシューティング

1. エンベディングモデル変更時に全体再インデクシングが必要

エンベディングモデルをアップグレードすると、既存ベクトルと新ベクトルの空間が異なります。部分再インデクシングは検索品質を深刻に低下させます。

対応: Blue-Greenインデックス戦略を使用します。新モデルで別のインデックスを完成させた後、トラフィックを切り替えます。

# Blue-Greenインデックス切り替え例
import time

def reindex_with_blue_green(
    old_collection: str,
    new_collection: str,
    new_embedding_model: str,
):
    """無停止再インデクシング"""
    # 1. 新コレクションにインデクシング(既存サービスはold_collectionを使用中)
    print(f"新コレクション '{new_collection}' のインデクシング開始...")
    create_and_populate_collection(new_collection, new_embedding_model)

    # 2. 検証:新コレクションに対してテストクエリを実行
    test_results = run_evaluation_suite(new_collection)
    if test_results["context_precision"] < 0.8:
        raise ValueError(
            f"新インデックスの品質が基準未満: {test_results['context_precision']:.2f}"
        )

    # 3. トラフィック切り替え:aliasまたは設定変更
    update_active_collection(new_collection)
    print(f"トラフィック切り替え完了: {old_collection} -> {new_collection}")

    # 4. 旧コレクションはロールバック用に一定期間保持
    schedule_cleanup(old_collection, delay_days=7)

2. チャンクサイズとエンベディングモデルのトークン制限の不一致

チャンクがエンベディングモデルの最大トークン数を超えると、切り詰められるかエラーが発生します。

対応: チャンキング段階でエンベディングモデルのトークン制限を考慮した長さ関数を使用します。

import tiktoken

enc = tiktoken.encoding_for_model("text-embedding-3-small")

splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,
    chunk_overlap=64,
    length_function=lambda text: len(enc.encode(text)),  # トークン数基準
)

3. リランキングレイテンシの管理

リランカーはCross-Encoderなので、候補数に比例してレイテンシが増加します。100件のリランキングで500ms〜1秒が追加されます。

対応:

  • ハイブリッド検索で候補を20〜30件に制限
  • バッチ処理時は非同期呼び出しを活用
  • オープンソースのリランカーはGPUサーバーにデプロイしてレイテンシを確保

4. ベクトルDBインデックスのメモリ管理

HNSWインデックスはメモリにグラフ全体を保持する必要があります。100万ベクトル(1024次元)は約4〜8GBのメモリを使用します。

対応:

  • ベクトル次元の削減(3072 -> 1024)
  • 量子化(Quantization)の適用:Scalar、Product、Binary量子化
  • DiskANNインデックスの活用(Milvus対応)

5. メタデータフィルタと検索パフォーマンス

メタデータフィルタが過度になると、ベクトル検索パフォーマンスが急激に低下します。特にカーディナリティの高いフィールド(タイムスタンプ、ユーザーIDなど)にフィルタをかけると問題が悪化します。

対応:

  • カーディナリティの低いフィールドのみフィルタに使用(カテゴリ、部門、文書タイプなど)
  • 日付フィルタは範囲を広くとり、リランキングで最新性の重みを適用

障害事例と復旧手順

事例1:セマンティックチャンキングへの切り替え後、検索品質が低下

状況: Recursiveチャンキングからセマンティックチャンキングに切り替えたところ、Context Precisionが0.85から0.72に低下しました。

原因: セマンティックチャンキングが生成したチャンクサイズが非常に不均一でした。一部のチャンクは50トークン、一部は2000トークンとなり、エンベディング品質にばらつきが出ました。

復旧:

  1. セマンティックチャンキング結果に最小/最大サイズ制限を追加
  2. 既存のRecursiveチャンキングインデックスに即座にロールバック(Blue-Green方式だったため可能)
  3. 最小200トークン、最大800トークン制限をかけたセマンティックチャンキングで再試行

事例2:ハイブリッド検索alpha固定による特定クエリタイプのパフォーマンス低下

状況: alpha=0.7固定で運用中、コード検索クエリで正確な関数名が見つからない問題が多発。

原因: コード関連クエリはキーワードの正確なマッチングが重要なのに、Denseの重みが高すぎました。

復旧:

  1. クエリ分類器を追加してクエリタイプを自動判別
  2. コード/技術クエリはalpha=0.3、自然言語質問はalpha=0.7に動的調整
  3. 分類器自体は軽量モデル(distilbertベース)でレイテンシ10ms未満

事例3:リランカー障害時のサービスダウン

状況: Cohere Rerank APIの障害でRAGパイプライン全体が応答不能状態に。

原因: リランキングステップを必須として構成し、フォールバックロジックがありませんでした。

復旧:

  1. リランキングステップをオプショナルに変更
  2. タイムアウト(2秒)超過またはAPIエラー時、ハイブリッド検索結果をそのまま返却
  3. セルフホスティングのBGE Rerankerをバックアップリランカーとしてデプロイし、二重化
import asyncio

async def rerank_with_fallback(
    query: str,
    documents: list[str],
    top_n: int = 5,
    timeout: float = 2.0,
) -> list[dict]:
    """フォールバック付きリランキング"""
    try:
        # Primary: Cohere Rerank(タイムアウト2秒)
        result = await asyncio.wait_for(
            cohere_rerank_async(query, documents, top_n),
            timeout=timeout,
        )
        return result
    except (asyncio.TimeoutError, Exception) as e:
        print(f"Cohereリランキング失敗、BGEフォールバック: {e}")
        try:
            # Fallback: セルフホスティングBGE Reranker
            return rerank_with_bge(query, documents, top_n)
        except Exception as e2:
            print(f"BGEリランキングも失敗、元の順位を返却: {e2}")
            # 最終フォールバック:ハイブリッド検索結果をそのまま返却
            return [
                {"index": i, "score": 1.0 - i * 0.05, "text": d}
                for i, d in enumerate(documents[:top_n])
            ]

事例4:大量文書再インデクシング中のサービス品質低下

状況: 10万件の文書再インデクシング中にエンベディングAPI呼び出しが急増してRate Limitに引っかかり、リアルタイムクエリのエンベディング応答も遅延。

復旧:

  1. インデクシングとクエリのAPIキー/エンドポイントを分離
  2. インデクシングにはバッチサイズ制御とRate Limit対応ロジックを追加
  3. 夜間時間帯にインデクシングを実行してクエリトラフィックとの競合を回避

おわりに

RAGパイプラインの高度化は単一の技術ではなく、チャンキング、検索、リランキング、評価の組み合わせです。各段階を独立して最適化しつつ、パイプライン全体の評価指標を基準に意思決定する必要があります。

実践適用の推奨順序:

  1. Recursiveチャンキング + Dense検索でベースラインを構築(1週間)
  2. RAGAS/DeepEvalで評価パイプラインを構築(1週間)
  3. ハイブリッド検索を追加してRecallを改善(1週間)
  4. リランキングを追加してPrecisionを改善(1週間)
  5. クエリ別の動的alphaと評価に基づく持続的な改善(継続)

「一度にすべてを適用しよう」というアプローチは失敗します。各段階を追加するたびに評価指標の変化を確認し、むしろ悪化した場合は即座にロールバックすることが、プロダクションでの正攻法です。

参考資料