- Authors
- Name
- はじめに
- RAGパイプラインアーキテクチャ概要
- チャンキング戦略の深堀り
- エンベディングモデルの選択と最適化
- ベクトルデータベース比較
- ハイブリッド検索:Dense + Sparseの結合
- リランキングモデルの適用
- 評価指標とベンチマーキング
- 運用上の注意事項とトラブルシューティング
- 障害事例と復旧手順
- おわりに
- 参考資料

はじめに
RAG(Retrieval-Augmented Generation)をプロダクションに適用したチームなら、一度はこんな経験をしたことがあるでしょう。「検索はされるが回答がおかしい」「関連文書が確かにあるのに検索にヒットしない」「短い質問にはうまくいくが、複雑な質問にはハルシネーションが起きる」。これらの問題の根本原因はほとんどの場合検索品質にあります。LLMがどれほど賢くても、間違ったコンテキストを受け取れば間違った回答を生成するしかありません。
この記事では、RAGパイプラインの検索品質を最大化するための3つの核心軸を扱います。
- チャンキング(Chunking):文書をどのように分割するか
- ハイブリッド検索(Hybrid Search):DenseベクトルとSparseキーワード検索をどう組み合わせるか
- リランキング(Reranking):検索結果をどのように再順位付けするか
各戦略について実践コード、ベンチマーク数値、運用上の注意事項を具体的に見ていきます。2026年3月時点の最新モデルとツールをベースに作成しました。
RAGパイプラインアーキテクチャ概要
高度化されたRAGパイプラインの全体フローは以下の通りです。
[インデクシングフェーズ]
文書収集 → 前処理 → チャンキング → エンベディング → ベクトルDB保存 + 転置インデックス保存
[クエリフェーズ]
質問 → クエリ変換 → ハイブリッド検索 (Dense + Sparse)
→ リランキング → 上位K件選択 → プロンプト構成 → LLM生成
基本RAGとの核心的な違いは3つです。
- チャンキング戦略の細分化:単純な固定サイズではなく、セマンティック、再帰的、エージェンティックチャンキングを適用
- ハイブリッド検索:ベクトル類似度だけに依存せず、BM25キーワード検索を併用
- リランキングレイヤーの追加:初期検索結果をCross-Encoderで再評価して精度を向上
この3つを組み合わせると、検索精度(Precision@K)を30〜50%以上改善できます。以下でそれぞれを深く見ていきましょう。
チャンキング戦略の深堀り
チャンキングはRAGパイプラインで最初に決定する要素であり、全体のパフォーマンスへの影響が最も大きいです。間違ったチャンキングはその後どのような高度化をしても回復が困難です。
チャンキング戦略比較表
| 戦略 | 原理 | 適した状況 | チャンクサイズ | 長所 | 短所 |
|---|---|---|---|---|---|
| Fixed-size | 固定トークン/文字数で分割 | 均一な構造の文書 | 256-512トークン | 実装が単純、高速 | 意味単位を無視 |
| Recursive | 区切り文字の階層で再帰的に分割 | 汎用テキスト | 512-1024トークン | 段落/文境界を尊重 | 区切り文字の設定必要 |
| Semantic | エンベディング類似度で意味境界を検出 | トピック遷移が多い文書 | 可変 | 意味保存に優れる | エンベディングコスト、低速 |
| Agentic | LLMが文書構造を分析して分割 | 複雑な技術文書 | 可変 | 最高品質 | 高コスト、低速 |
Fixed-sizeチャンキング
最もシンプルですが依然として効果的な戦略です。2026年のベンチマークでも、512トークンの固定サイズチャンキングが複雑なセマンティックチャンキングより良い結果を示すケースが報告されています。
from langchain_text_splitters import CharacterTextSplitter
# 固定サイズチャンキング - 最も基本的な方式
fixed_splitter = CharacterTextSplitter(
separator="\n",
chunk_size=512,
chunk_overlap=50, # 10%オーバーラップ推奨
length_function=len,
is_separator_regex=False,
)
chunks = fixed_splitter.split_text(document_text)
print(f"合計 {len(chunks)}個のチャンク作成")
推奨設定:
- ファクトイド(事実確認)クエリ:256〜512トークン
- 分析型クエリ:1024+トークン
- オーバーラップ:チャンクサイズ全体の10〜20%
Recursiveチャンキング
LangChainのRecursiveCharacterTextSplitterは区切り文字の階層に沿って再帰的に分割します。段落、文、単語の順序で境界を尊重しながら目標サイズに合わせます。
from langchain_text_splitters import RecursiveCharacterTextSplitter
# 再帰的チャンキング - プロダクション基本推奨
recursive_splitter = RecursiveCharacterTextSplitter(
separators=["\n\n", "\n", ". ", " ", ""],
chunk_size=512,
chunk_overlap=64,
length_function=len,
add_start_index=True, # 原文位置追跡用
)
chunks = recursive_splitter.split_documents(documents)
# 各チャンクにメタデータが自動的に含まれる
for chunk in chunks[:3]:
print(f"サイズ: {len(chunk.page_content)}, "
f"開始位置: {chunk.metadata.get('start_index')}")
実践的なヒント: ほとんどのプロダクション環境では、Recursiveチャンキングから始めることを推奨します。シンプルでありながら段落の境界を尊重するため、コスト対効果が最も優れています。
Semanticチャンキング
エンベディングを使用して隣接する文間の意味的類似度を計算し、類似度が急激に低下する地点で分割します。
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings
# セマンティックチャンキング - 意味単位での分割
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
semantic_splitter = SemanticChunker(
embeddings=embeddings,
breakpoint_threshold_type="percentile", # percentile, standard_deviation, interquartile
breakpoint_threshold_amount=75, # 75パーセンタイル以上の差で分割
)
semantic_chunks = semantic_splitter.split_text(document_text)
print(f"セマンティックチャンク数: {len(semantic_chunks)}")
print(f"平均チャンク長: {sum(len(c) for c in semantic_chunks) / len(semantic_chunks):.0f}")
注意: セマンティックチャンキングはすべての文ペアに対してエンベディングを生成する必要があるため、大規模文書ではコストと時間が大幅に増加します。10万件以上の文書を処理する場合は、Recursiveチャンキングの方が現実的です。
Agenticチャンキング
LLMを活用して文書の論理的構造を把握し、最適な分割ポイントを決定します。最も精巧ですがコストが高いです。
from openai import OpenAI
client = OpenAI()
def agentic_chunk(text: str, max_chunks: int = 20) -> list[dict]:
"""LLMベースのエージェンティックチャンキング"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": (
"与えられたテキストを論理的単位に分割してください。"
"各チャンクは1つの完結したトピックを扱う必要があります。"
"JSON配列で返してください: "
'[{"title": "チャンクタイトル", "content": "チャンク内容", "summary": "一行要約"}]'
),
},
{"role": "user", "content": text[:8000]}, # トークン制限に注意
],
response_format={"type": "json_object"},
temperature=0,
)
import json
result = json.loads(response.choices[0].message.content)
return result.get("chunks", [])
# 使用例
chunks = agentic_chunk(long_document_text)
for chunk in chunks:
print(f"[{chunk['title']}] {chunk['summary']}")
コストに関する考慮: Agenticチャンキングは文書ごとにLLM API呼び出しが発生するため、大量インデクシングには適していません。高価値文書(契約書、技術仕様書など)の少量処理に適しています。
エンベディングモデルの選択と最適化
チャンキング後のベクトルエンベディングモデルの選択は、検索品質に直接的に影響します。
主要エンベディングモデル比較
| モデル | 次元 | 最大トークン | 多言語 | MTEBスコア | 特徴 |
|---|---|---|---|---|---|
| text-embedding-3-large | 3072 | 8191 | O | 64.6 | OpenAI最新、次元削減可能 |
| text-embedding-3-small | 1536 | 8191 | O | 62.3 | コスト効率的 |
| BAAI/bge-m3 | 1024 | 8192 | O | 68.2 | オープンソース、Dense+Sparse同時 |
| Cohere embed-v4 | 1024 | 512 | O | 66.1 | マルチモーダル対応 |
| voyage-3-large | 1024 | 32000 | O | 67.5 | 長いコンテキスト特化 |
from langchain_openai import OpenAIEmbeddings
# OpenAIエンベディング - 次元削減の活用
embeddings = OpenAIEmbeddings(
model="text-embedding-3-large",
dimensions=1024, # 3072 -> 1024に削減してコスト/ストレージを節約
)
# BGE-M3: Dense + Sparse同時生成(ハイブリッド検索に最適)
from FlagEmbedding import BGEM3FlagModel
bge_model = BGEM3FlagModel("BAAI/bge-m3", use_fp16=True)
# DenseとSparseベクトルを同時生成
output = bge_model.encode(
["RAGパイプライン最適化方法論"],
return_dense=True,
return_sparse=True,
)
dense_vector = output["dense_vecs"][0] # (1024,) floatベクトル
sparse_vector = output["lexical_weights"][0] # スパースベクトル(単語別重み)
print(f"Dense次元: {len(dense_vector)}")
print(f"Sparseアクティブトークン数: {len(sparse_vector)}")
実践的な推奨事項:
- コスト優先:text-embedding-3-small(OpenAI)またはbge-m3(セルフホスティング)
- 品質優先:text-embedding-3-largeまたはvoyage-3-large
- ハイブリッド検索を計画している場合:bge-m3(Dense + Sparse同時生成でインフラを簡素化)
ベクトルデータベース比較
ベクトルDB選択は運用の複雑さ、コスト、パフォーマンスに大きく影響します。
主要ベクトルデータベース比較
| 項目 | Pinecone | Weaviate | Qdrant | Milvus |
|---|---|---|---|---|
| ホスティング | マネージド(Serverless) | マネージド + セルフホスティング | マネージド + セルフホスティング | セルフホスティング中心(Zilliz Cloud) |
| ハイブリッド検索 | 対応(Sparseベクトル) | ネイティブ対応 | 対応(Sparseベクトル) | 対応 |
| メタデータフィルタ | 基本 | GraphQLベースで強力 | Rustベース高性能 | 基本 |
| 無料ティア | Starter(10万ベクトル) | Sandbox | 1GB無料(永久) | オープンソース |
| クエリレイテンシ | 50ms以下 | 50-100ms | 50ms以下 | 30-50ms |
| スケーラビリティ | 自動スケーリング | 手動設定必要 | 水平スケーリング | K8sネイティブ |
| 適したチーム | 運用最小化を望むチーム | OSS+柔軟性を望むチーム | フィルタリングが複雑 | 大規模エンタープライズ |
選択ガイド:
- 素早いスタート + 運用負担の最小化:Pinecone
- セルフホスティング + 複雑なフィルタリング:Qdrant
- オープンソース + ネイティブハイブリッド検索:Weaviate
- 大規模(10億+ベクトル)+ GPUアクセラレーション:Milvus/Zilliz
ハイブリッド検索:Dense + Sparseの結合
ベクトル検索だけではキーワードの正確なマッチングが困難で、BM25だけでは意味的類似度を捉えられません。ハイブリッド検索は2つの方式を結合してRecallを15〜30%向上させます。
ハイブリッド検索アーキテクチャ
質問: "Kubernetes HPAの設定でCPU閾値は?"
Dense検索(ベクトル類似度):
→ "コンテナオーケストレーションでの自動スケーリング設定方法" (意味的に類似)
Sparse検索(BM25キーワードマッチング):
→ "HPAのtargetCPUUtilizationPercentageを80に設定" (キーワード正確マッチング)
ハイブリッド結合(RRFまたは加重和):
→ 両方の結果を統合して最適な文書を返却
Weaviateハイブリッド検索の実装
import weaviate, { WeaviateClient } from 'weaviate-client'
// Weaviateクライアント接続
const client: WeaviateClient = await weaviate.connectToLocal({
host: 'localhost',
port: 8080,
})
// ハイブリッド検索の実行
const collection = client.collections.get('Documents')
const result = await collection.query.hybrid('Kubernetes HPA CPU 閾値', {
alpha: 0.7, // 0 = 純粋BM25, 1 = 純粋ベクトル, 0.7 = ベクトル70%
limit: 20, // リランキング前の候補数
fusionType: 'RelativeScore', // RelativeScoreまたはRanked
returnMetadata: ['score', 'explainScore'],
returnProperties: ['title', 'content', 'source'],
})
for (const item of result.objects) {
console.log(`[${item.metadata?.score?.toFixed(3)}] ${item.properties.title}`)
}
PythonでのBM25 + Dense直接実装
ベクトルDBのネイティブハイブリッド検索が使用できない場合、Reciprocal Rank Fusion(RRF)で直接実装できます。
from rank_bm25 import BM25Okapi
import numpy as np
from typing import List, Tuple
def hybrid_search(
query: str,
documents: list[dict],
dense_scores: np.ndarray,
k: int = 10,
alpha: float = 0.7,
rrf_k: int = 60,
) -> list[dict]:
"""
RRFベースのハイブリッド検索
alpha: Dense検索の重み(1-alphaがSparseの重み)
rrf_k: RRF定数(デフォルト60、論文推奨)
"""
# Sparse検索(BM25)
tokenized_docs = [doc["content"].split() for doc in documents]
bm25 = BM25Okapi(tokenized_docs)
sparse_scores = bm25.get_scores(query.split())
# Denseランク計算
dense_ranks = np.argsort(-dense_scores) + 1 # 1-indexedランク
sparse_ranks = np.argsort(-sparse_scores) + 1
# RRFスコア計算
rrf_scores = []
for i in range(len(documents)):
dense_rrf = alpha / (rrf_k + dense_ranks[i])
sparse_rrf = (1 - alpha) / (rrf_k + sparse_ranks[i])
rrf_scores.append(dense_rrf + sparse_rrf)
# 上位K件返却
top_indices = np.argsort(rrf_scores)[::-1][:k]
return [
{**documents[i], "hybrid_score": rrf_scores[i]}
for i in top_indices
]
# 使用例
results = hybrid_search(
query="Kubernetes HPA CPU threshold",
documents=all_documents,
dense_scores=embedding_similarity_scores,
k=20, # リランキング前なので余裕を持って
alpha=0.7, # Dense 70%, Sparse 30%
)
Alphaチューニングガイド
| クエリタイプ | 推奨alpha | 理由 |
|---|---|---|
| 専門用語が多い技術クエリ | 0.3〜0.5 | キーワード正確マッチングが重要 |
| 自然言語の質問 | 0.7〜0.8 | 意味的類似度が重要 |
| コード検索 | 0.2〜0.4 | 関数名、変数名の正確マッチング |
| 一般的なFAQ | 0.5〜0.6 | バランスの取れた検索 |
核心: クエリタイプ別にalphaを動的に調整すると、静的設定と比べてPrecision@1で2〜7.5ポイントの向上が期待できます。
リランキングモデルの適用
ハイブリッド検索で候補を広げた後、リランキングモデルで最終順位を精密に調整します。リランカーはクエリと文書を一緒に入力として受け取り(Cross-Encoding)、直接関連度スコアを算出するため、Bi-Encoderエンベディングより精度が高いです。
リランキングモデル比較
| モデル | タイプ | パラメータ | 多言語 | レイテンシ(100件) | コスト |
|---|---|---|---|---|---|
| Cohere Rerank 4 | API | 非公開 | 100+言語 | 200-400ms | 従量制 |
| BAAI/bge-reranker-v2-m3 | OSS | 0.6B | O | 500-800ms (GPU) | 無料 |
| BAAI/bge-reranker-large | OSS | 560M | 限定的 | 400-600ms (GPU) | 無料 |
| cross-encoder/ms-marco-MiniLM-L-12-v2 | OSS | 33M | X(英語) | 100-200ms (GPU) | 無料 |
Cohere Rerankの適用
import cohere
co = cohere.ClientV2(api_key="your-cohere-api-key")
def rerank_with_cohere(
query: str,
documents: list[str],
top_n: int = 5,
) -> list[dict]:
"""Cohere Rerank 4で文書を再順位付け"""
response = co.rerank(
model="rerank-v3.5",
query=query,
documents=documents,
top_n=top_n,
return_documents=True,
)
results = []
for item in response.results:
results.append({
"index": item.index,
"score": item.relevance_score,
"text": item.document.text if item.document else documents[item.index],
})
return results
# 使用例:ハイブリッド検索結果20件をリランキングして上位5件を選択
hybrid_results = hybrid_search(query, documents, dense_scores, k=20)
reranked = rerank_with_cohere(
query="Kubernetes HPA CPUの閾値設定方法",
documents=[r["content"] for r in hybrid_results],
top_n=5,
)
for r in reranked:
print(f"[{r['score']:.4f}] {r['text'][:80]}...")
BGE Rerankerのセルフホスティング
APIコストを避けたい場合やデータの外部送信ができない環境では、オープンソースのリランカーを直接ホスティングします。
from FlagEmbedding import FlagReranker
# BGE Reranker v2 M3 - 多言語対応の軽量リランカー
reranker = FlagReranker(
"BAAI/bge-reranker-v2-m3",
use_fp16=True, # GPUメモリ節約
)
def rerank_with_bge(
query: str,
documents: list[str],
top_n: int = 5,
) -> list[dict]:
"""BGE Rerankerで文書を再順位付け"""
# クエリ-文書ペアの生成
pairs = [[query, doc] for doc in documents]
# 関連度スコアの計算
scores = reranker.compute_score(pairs, normalize=True)
# スコア順にソート
scored_docs = [
{"index": i, "score": s, "text": d}
for i, (s, d) in enumerate(zip(scores, documents))
]
scored_docs.sort(key=lambda x: x["score"], reverse=True)
return scored_docs[:top_n]
# 使用例
results = rerank_with_bge(
query="RAGパイプラインでチャンキングサイズを決める基準は?",
documents=candidate_documents,
top_n=5,
)
for r in results:
print(f"[{r['score']:.4f}] {r['text'][:100]}...")
パイプライン全体の統合
チャンキング、ハイブリッド検索、リランキングを1つのパイプラインに統合する例です。
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
class OptimizedRAGPipeline:
"""高度化されたRAGパイプライン"""
def __init__(self):
self.splitter = RecursiveCharacterTextSplitter(
chunk_size=512, chunk_overlap=64
)
self.embeddings = OpenAIEmbeddings(
model="text-embedding-3-large", dimensions=1024
)
self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
self.reranker = FlagReranker("BAAI/bge-reranker-v2-m3", use_fp16=True)
def query(self, question: str, top_k: int = 5) -> str:
# 1. ハイブリッド検索:候補20件
candidates = self._hybrid_search(question, k=20)
# 2. リランキング:上位5件を選択
reranked = self._rerank(question, candidates, top_n=top_k)
# 3. プロンプト構成とLLM生成
context = "\n\n---\n\n".join([doc["text"] for doc in reranked])
prompt = ChatPromptTemplate.from_messages([
("system", "以下のコンテキストに基づいて質問に回答してください。\n\n{context}"),
("human", "{question}"),
])
chain = prompt | self.llm
response = chain.invoke({"context": context, "question": question})
return response.content
def _hybrid_search(self, query: str, k: int = 20) -> list[dict]:
# Dense + Sparseハイブリッド検索(上記コード参照)
...
def _rerank(self, query: str, docs: list[dict], top_n: int) -> list[dict]:
pairs = [[query, doc["text"]] for doc in docs]
scores = self.reranker.compute_score(pairs, normalize=True)
for doc, score in zip(docs, scores):
doc["rerank_score"] = score
docs.sort(key=lambda x: x["rerank_score"], reverse=True)
return docs[:top_n]
評価指標とベンチマーキング
RAGパイプラインを最適化するには、定量的な評価が不可欠です。「なんとなく良くなった気がする」はプロダクションでは通用しません。
RGASフレームワーク
RAGAS(Retrieval-Augmented Generation Assessment Suite)はRAG専用の評価フレームワークで、Ground Truthなしでも LLMを評価者として活用して自動採点します。
from ragas import evaluate
from ragas.metrics import (
faithfulness,
answer_relevancy,
context_precision,
context_recall,
)
from datasets import Dataset
# 評価データセットの構成
eval_data = {
"question": [
"Kubernetes HPAのCPU閾値のデフォルト値は?",
"RAGでチャンキングサイズを決める基準は?",
],
"answer": [
"HPAのCPU閾値のデフォルト値は80%です。",
"クエリタイプに応じて256-1024トークンの範囲で決定します。",
],
"contexts": [
["HPAはtargetCPUUtilizationPercentageのデフォルト値80を使用します。"],
["ファクトイドクエリは256-512、分析型は1024+トークンを推奨します。"],
],
"ground_truth": [
"デフォルト値は80%である。",
"クエリタイプと文書特性に応じて決定する。",
],
}
dataset = Dataset.from_dict(eval_data)
# 評価の実行
results = evaluate(
dataset=dataset,
metrics=[faithfulness, answer_relevancy, context_precision, context_recall],
)
print(results)
# faithfulness: 0.92, answer_relevancy: 0.88,
# context_precision: 0.85, context_recall: 0.90
DeepEvalでの単体テスト
DeepEvalはPytestスタイルでRAGをテストでき、CI/CDパイプラインへの統合に最適です。
from deepeval import assert_test
from deepeval.test_case import LLMTestCase
from deepeval.metrics import (
FaithfulnessMetric,
ContextualRelevancyMetric,
AnswerRelevancyMetric,
)
def test_rag_faithfulness():
"""RAG応答がコンテキストに忠実かテスト"""
test_case = LLMTestCase(
input="Kubernetes HPA CPUの閾値は?",
actual_output="HPAのCPU閾値のデフォルト値は80%です。",
retrieval_context=[
"HPAはtargetCPUUtilizationPercentageのデフォルト値80を使用します。"
],
)
faithfulness = FaithfulnessMetric(threshold=0.8)
relevancy = ContextualRelevancyMetric(threshold=0.7)
answer_rel = AnswerRelevancyMetric(threshold=0.7)
assert_test(test_case, [faithfulness, relevancy, answer_rel])
# pytestで実行: pytest test_rag.py -v
核心評価指標まとめ
| 指標 | 測定対象 | 期待値 | ツール |
|---|---|---|---|
| Context Precision | 検索されたコンテキストの関連性 | 0.8以上 | RAGAS |
| Context Recall | 必要なコンテキストの検索比率 | 0.85以上 | RAGAS |
| Faithfulness | 応答がコンテキストに忠実な程度 | 0.9以上 | RAGAS, DeepEval |
| Answer Relevancy | 応答が質問に適切な程度 | 0.85以上 | RAGAS, DeepEval |
| MRR@K | 最初の関連文書の逆順位平均 | 0.7以上 | カスタム |
| NDCG@K | 順位品質の正規化割引累積利得 | 0.75以上 | カスタム |
運用上の注意事項とトラブルシューティング
1. エンベディングモデル変更時に全体再インデクシングが必要
エンベディングモデルをアップグレードすると、既存ベクトルと新ベクトルの空間が異なります。部分再インデクシングは検索品質を深刻に低下させます。
対応: Blue-Greenインデックス戦略を使用します。新モデルで別のインデックスを完成させた後、トラフィックを切り替えます。
# Blue-Greenインデックス切り替え例
import time
def reindex_with_blue_green(
old_collection: str,
new_collection: str,
new_embedding_model: str,
):
"""無停止再インデクシング"""
# 1. 新コレクションにインデクシング(既存サービスはold_collectionを使用中)
print(f"新コレクション '{new_collection}' のインデクシング開始...")
create_and_populate_collection(new_collection, new_embedding_model)
# 2. 検証:新コレクションに対してテストクエリを実行
test_results = run_evaluation_suite(new_collection)
if test_results["context_precision"] < 0.8:
raise ValueError(
f"新インデックスの品質が基準未満: {test_results['context_precision']:.2f}"
)
# 3. トラフィック切り替え:aliasまたは設定変更
update_active_collection(new_collection)
print(f"トラフィック切り替え完了: {old_collection} -> {new_collection}")
# 4. 旧コレクションはロールバック用に一定期間保持
schedule_cleanup(old_collection, delay_days=7)
2. チャンクサイズとエンベディングモデルのトークン制限の不一致
チャンクがエンベディングモデルの最大トークン数を超えると、切り詰められるかエラーが発生します。
対応: チャンキング段階でエンベディングモデルのトークン制限を考慮した長さ関数を使用します。
import tiktoken
enc = tiktoken.encoding_for_model("text-embedding-3-small")
splitter = RecursiveCharacterTextSplitter(
chunk_size=512,
chunk_overlap=64,
length_function=lambda text: len(enc.encode(text)), # トークン数基準
)
3. リランキングレイテンシの管理
リランカーはCross-Encoderなので、候補数に比例してレイテンシが増加します。100件のリランキングで500ms〜1秒が追加されます。
対応:
- ハイブリッド検索で候補を20〜30件に制限
- バッチ処理時は非同期呼び出しを活用
- オープンソースのリランカーはGPUサーバーにデプロイしてレイテンシを確保
4. ベクトルDBインデックスのメモリ管理
HNSWインデックスはメモリにグラフ全体を保持する必要があります。100万ベクトル(1024次元)は約4〜8GBのメモリを使用します。
対応:
- ベクトル次元の削減(3072 -> 1024)
- 量子化(Quantization)の適用:Scalar、Product、Binary量子化
- DiskANNインデックスの活用(Milvus対応)
5. メタデータフィルタと検索パフォーマンス
メタデータフィルタが過度になると、ベクトル検索パフォーマンスが急激に低下します。特にカーディナリティの高いフィールド(タイムスタンプ、ユーザーIDなど)にフィルタをかけると問題が悪化します。
対応:
- カーディナリティの低いフィールドのみフィルタに使用(カテゴリ、部門、文書タイプなど)
- 日付フィルタは範囲を広くとり、リランキングで最新性の重みを適用
障害事例と復旧手順
事例1:セマンティックチャンキングへの切り替え後、検索品質が低下
状況: Recursiveチャンキングからセマンティックチャンキングに切り替えたところ、Context Precisionが0.85から0.72に低下しました。
原因: セマンティックチャンキングが生成したチャンクサイズが非常に不均一でした。一部のチャンクは50トークン、一部は2000トークンとなり、エンベディング品質にばらつきが出ました。
復旧:
- セマンティックチャンキング結果に最小/最大サイズ制限を追加
- 既存のRecursiveチャンキングインデックスに即座にロールバック(Blue-Green方式だったため可能)
- 最小200トークン、最大800トークン制限をかけたセマンティックチャンキングで再試行
事例2:ハイブリッド検索alpha固定による特定クエリタイプのパフォーマンス低下
状況: alpha=0.7固定で運用中、コード検索クエリで正確な関数名が見つからない問題が多発。
原因: コード関連クエリはキーワードの正確なマッチングが重要なのに、Denseの重みが高すぎました。
復旧:
- クエリ分類器を追加してクエリタイプを自動判別
- コード/技術クエリはalpha=0.3、自然言語質問はalpha=0.7に動的調整
- 分類器自体は軽量モデル(distilbertベース)でレイテンシ10ms未満
事例3:リランカー障害時のサービスダウン
状況: Cohere Rerank APIの障害でRAGパイプライン全体が応答不能状態に。
原因: リランキングステップを必須として構成し、フォールバックロジックがありませんでした。
復旧:
- リランキングステップをオプショナルに変更
- タイムアウト(2秒)超過またはAPIエラー時、ハイブリッド検索結果をそのまま返却
- セルフホスティングのBGE Rerankerをバックアップリランカーとしてデプロイし、二重化
import asyncio
async def rerank_with_fallback(
query: str,
documents: list[str],
top_n: int = 5,
timeout: float = 2.0,
) -> list[dict]:
"""フォールバック付きリランキング"""
try:
# Primary: Cohere Rerank(タイムアウト2秒)
result = await asyncio.wait_for(
cohere_rerank_async(query, documents, top_n),
timeout=timeout,
)
return result
except (asyncio.TimeoutError, Exception) as e:
print(f"Cohereリランキング失敗、BGEフォールバック: {e}")
try:
# Fallback: セルフホスティングBGE Reranker
return rerank_with_bge(query, documents, top_n)
except Exception as e2:
print(f"BGEリランキングも失敗、元の順位を返却: {e2}")
# 最終フォールバック:ハイブリッド検索結果をそのまま返却
return [
{"index": i, "score": 1.0 - i * 0.05, "text": d}
for i, d in enumerate(documents[:top_n])
]
事例4:大量文書再インデクシング中のサービス品質低下
状況: 10万件の文書再インデクシング中にエンベディングAPI呼び出しが急増してRate Limitに引っかかり、リアルタイムクエリのエンベディング応答も遅延。
復旧:
- インデクシングとクエリのAPIキー/エンドポイントを分離
- インデクシングにはバッチサイズ制御とRate Limit対応ロジックを追加
- 夜間時間帯にインデクシングを実行してクエリトラフィックとの競合を回避
おわりに
RAGパイプラインの高度化は単一の技術ではなく、チャンキング、検索、リランキング、評価の組み合わせです。各段階を独立して最適化しつつ、パイプライン全体の評価指標を基準に意思決定する必要があります。
実践適用の推奨順序:
- Recursiveチャンキング + Dense検索でベースラインを構築(1週間)
- RAGAS/DeepEvalで評価パイプラインを構築(1週間)
- ハイブリッド検索を追加してRecallを改善(1週間)
- リランキングを追加してPrecisionを改善(1週間)
- クエリ別の動的alphaと評価に基づく持続的な改善(継続)
「一度にすべてを適用しよう」というアプローチは失敗します。各段階を追加するたびに評価指標の変化を確認し、むしろ悪化した場合は即座にロールバックすることが、プロダクションでの正攻法です。
参考資料
- LangChain Text Splitters公式ドキュメント
- Weaviate Hybrid Search Explained
- Cohere Rerank公式ドキュメント
- BAAI/bge-reranker-v2-m3 (Hugging Face)
- RAGAS: Automated Evaluation of RAG (arXiv)
- DeepEval RAG Evaluation Guide
- Pinecone Chunking Strategies
- FlagEmbedding GitHub (FlagOpen)
- Optimizing RAG with Hybrid Search and Reranking (VectorHub)