- Published on
RAGパイプライン本番構築ガイド: ベクトルDB選定からチャンキング・リランキング・評価まで
- Authors
- Name
- はじめに
- RAGアーキテクチャ概要
- 埋め込みモデル比較
- ベクトルデータベース比較
- チャンキング戦略
- 検索最適化戦略
- リランキング
- RAGASを活用したRAG評価
- 本番デプロイパターン
- 障害事例と対応
- 本番チェックリスト
- 参考資料

はじめに
RAG(Retrieval-Augmented Generation)は、LLMのハルシネーション(幻覚)問題を解決し、最新情報に基づいた回答を生成するための重要なアーキテクチャパターンである。単に「検索してプロンプトに入れる」だけに見えるかもしれないが、本番環境では埋め込みモデルの選定、ベクトルDBの運用、チャンキング戦略、ハイブリッド検索、リランキング、評価体系まで数多くのエンジニアリング判断が必要となる。
本記事では、RAGパイプラインの全体アーキテクチャを分析し、各構成要素の選定基準と実装方法を実践的なコードとともに解説する。特に本番環境で頻繁に発生する障害パターンとその対応戦略、そしてRAGASを活用した体系的な評価方法論まで包括的に扱う。
RAGアーキテクチャ概要
RAGパイプラインは大きく3つのステージで構成される:Retrieval(検索)、Augmentation(拡張)、Generation(生成)。各ステージは独立して最適化でき、パイプライン全体の品質は最も弱いステージによって決まる。
エンドツーエンドのアーキテクチャフロー
[ユーザークエリ]
│
▼
[クエリ処理] ← クエリ分解、HyDE、クエリ拡張
│
▼
[検索] ← BM25 + ベクトル検索(ハイブリッド)
│
▼
[リランキング] ← Cross-encoder、Cohere Rerank
│
▼
[コンテキスト組立] ← チャンク統合、重複排除、トークン制限
│
▼
[生成] ← LLMにコンテキストとクエリを渡す
│
▼
[後処理] ← 出典表示、信頼度スコア、ハルシネーション検証
基本的なRAG実装
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
# 1. ドキュメントチャンキング
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", ".", " "]
)
chunks = text_splitter.split_documents(documents)
# 2. 埋め込みとベクトルストア作成
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
collection_name="production_docs"
)
# 3. Retriever設定
retriever = vectorstore.as_retriever(
search_type="mmr",
search_kwargs={"k": 5, "fetch_k": 20}
)
# 4. RAGチェーン構築
template = """以下のコンテキストに基づいて質問に回答してください。
コンテキストから答えが見つからない場合は「情報が不足しています」と回答してください。
コンテキスト:
{context}
質問: {question}
回答:"""
prompt = ChatPromptTemplate.from_template(template)
llm = ChatOpenAI(model="gpt-4o", temperature=0)
rag_chain = (
{"context": retriever, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
response = rag_chain.invoke("RAGパイプラインの主要な構成要素は?")
埋め込みモデル比較
埋め込みモデルは、RAGパイプラインの検索品質を決定する最も重要な要素の一つである。2025-2026年現在の主要な埋め込みモデルの特性を比較する。
| モデル | 次元数 | 最大トークン | MTEB平均 | 多言語 | コスト | 特徴 |
|---|---|---|---|---|---|---|
| OpenAI text-embedding-3-large | 3072 | 8191 | 64.6 | O | $0.13/1M tokens | 次元削減可能 |
| OpenAI text-embedding-3-small | 1536 | 8191 | 62.3 | O | $0.02/1M tokens | コスト効率的 |
| Cohere embed-v3 | 1024 | 512 | 64.5 | O (100+) | $0.10/1M tokens | 検索特化 |
| BGE-M3 | 1024 | 8192 | 66.1 | O (100+) | 無料(セルフホスト) | Dense+Sparse+ColBERT |
| E5-mistral-7b-instruct | 4096 | 32768 | 66.6 | O | 無料(セルフホスト) | 指示ベース埋め込み |
| Jina-embeddings-v3 | 1024 | 8192 | 65.5 | O (89) | $0.02/1M tokens | タスク特化LoRA |
埋め込みモデル選定基準
from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List
class EmbeddingBenchmark:
"""埋め込みモデル比較ベンチマーク"""
def __init__(self, models: List[str]):
self.models = {}
for model_name in models:
self.models[model_name] = SentenceTransformer(model_name)
def evaluate_retrieval_quality(
self,
queries: List[str],
relevant_docs: List[List[str]],
corpus: List[str]
) -> dict:
results = {}
for name, model in self.models.items():
corpus_embeddings = model.encode(corpus, normalize_embeddings=True)
query_embeddings = model.encode(queries, normalize_embeddings=True)
# コサイン類似度計算
scores = np.dot(query_embeddings, corpus_embeddings.T)
# Recall@k計算
recall_at_5 = self._compute_recall(scores, relevant_docs, k=5)
recall_at_10 = self._compute_recall(scores, relevant_docs, k=10)
results[name] = {
"recall@5": recall_at_5,
"recall@10": recall_at_10,
"embedding_dim": model.get_sentence_embedding_dimension(),
"encoding_speed": self._measure_speed(model, queries)
}
return results
def _compute_recall(self, scores, relevant_docs, k):
recalls = []
for i, rel_docs in enumerate(relevant_docs):
top_k_indices = np.argsort(scores[i])[-k:]
retrieved = set(top_k_indices)
relevant = set(rel_docs)
recalls.append(len(retrieved & relevant) / len(relevant))
return np.mean(recalls)
def _measure_speed(self, model, texts, n_runs=3):
import time
times = []
for _ in range(n_runs):
start = time.time()
model.encode(texts)
times.append(time.time() - start)
return np.mean(times)
ベクトルデータベース比較
ベクトルデータベースの選定は、RAGシステムの運用複雑度、コスト、パフォーマンスに直接影響する。主要なベクトルDBを多角的に比較する。
| 項目 | Pinecone | Milvus | Weaviate | Qdrant | Chroma |
|---|---|---|---|---|---|
| デプロイ方式 | フルマネージド | セルフホスト/Zilliz Cloud | セルフホスト/Cloud | セルフホスト/Cloud | インメモリ/セルフホスト |
| 最大ベクトル数 | 数十億 | 数十億 | 数十億 | 数十億 | 数百万 |
| 検索アルゴリズム | 独自エンジン | HNSW, IVF, DiskANN | HNSW | HNSW | HNSW |
| ハイブリッド検索 | O(Sparse+Dense) | O(BM25+Dense) | O(BM25+Dense) | O(Sparse+Dense) | X |
| マルチテナンシー | O(ネームスペース) | O(パーティション) | O(テナント) | O(コレクション) | O(コレクション) |
| フィルタリング | メタデータフィルタ | スカラーフィルタリング | GraphQLフィルタ | ペイロードフィルタ | メタデータフィルタ |
| 価格(100万ベクトル) | 約$70/月 | 無料(セルフホスト) | 無料(セルフホスト) | 無料(セルフホスト) | 無料 |
| 適用用途 | クイックプロト、マネージド志向 | 大規模エンタープライズ | GraphQLエコシステム | 高性能フィルタリング | 開発/小規模 |
Qdrantを活用した本番ベクトル検索
from qdrant_client import QdrantClient
from qdrant_client.models import (
Distance, VectorParams, PointStruct,
Filter, FieldCondition, MatchValue,
SearchParams, HnswConfigDiff
)
import uuid
class ProductionVectorStore:
"""本番ベクトルストア管理クラス"""
def __init__(self, url: str, api_key: str, collection_name: str):
self.client = QdrantClient(url=url, api_key=api_key)
self.collection_name = collection_name
def create_collection(self, vector_size: int = 1536):
"""HNSW インデックス最適化されたコレクション作成"""
self.client.create_collection(
collection_name=self.collection_name,
vectors_config=VectorParams(
size=vector_size,
distance=Distance.COSINE,
on_disk=True # 大規模データセット向けディスクベース
),
hnsw_config=HnswConfigDiff(
m=16, # グラフ接続数(デフォルト: 16)
ef_construct=128, # インデックス構築時の探索範囲
full_scan_threshold=10000 # この数以下なら全スキャン
),
optimizers_config={
"indexing_threshold": 20000,
"memmap_threshold": 50000
}
)
def upsert_documents(self, documents: list, embeddings: list, metadata: list):
"""バッチアップサート"""
points = [
PointStruct(
id=str(uuid.uuid4()),
vector=embedding,
payload={**meta, "text": doc}
)
for doc, embedding, meta in zip(documents, embeddings, metadata)
]
# バッチサイズ100で分割アップロード
batch_size = 100
for i in range(0, len(points), batch_size):
batch = points[i:i + batch_size]
self.client.upsert(
collection_name=self.collection_name,
points=batch,
wait=True
)
def hybrid_search(self, query_vector: list, query_text: str,
filters: dict = None, top_k: int = 10):
"""ハイブリッド検索(ベクトル + メタデータフィルタ)"""
search_filter = None
if filters:
conditions = [
FieldCondition(key=k, match=MatchValue(value=v))
for k, v in filters.items()
]
search_filter = Filter(must=conditions)
results = self.client.search(
collection_name=self.collection_name,
query_vector=query_vector,
query_filter=search_filter,
limit=top_k,
search_params=SearchParams(
hnsw_ef=128, # 検索時の探索範囲(高いほど正確だが遅い)
exact=False # 近似検索を使用
)
)
return results
チャンキング戦略
チャンキングは、RAG品質に最も大きな影響を与える前処理ステップである。不適切なチャンキングは関連情報を分離させたり、不要なノイズを含めてしまい、検索品質を大幅に低下させる。
チャンキング戦略比較
| 戦略 | 利点 | 欠点 | 適用用途 |
|---|---|---|---|
| 固定サイズ | 実装簡単、予測可能 | 意味的境界を無視 | 均一なテキスト |
| 再帰的 | 段落/文の境界を尊重 | 最適サイズのチューニング必要 | 汎用ドキュメント |
| セマンティック | 意味単位を保持 | 計算コスト高い | 構造のないテキスト |
| ドキュメント構造ベース | 見出し/セクション階層を維持 | パーサー実装が必要 | Markdown、HTML |
| 親子(Parent-Child) | 検索精度 + 十分なコンテキスト | ストレージ2倍 | 技術ドキュメント |
セマンティックチャンキング実装
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings
import numpy as np
class AdvancedChunker:
"""複数のチャンキング戦略をサポートする高度なチャンカー"""
def __init__(self, embedding_model: str = "text-embedding-3-small"):
self.embeddings = OpenAIEmbeddings(model=embedding_model)
def semantic_chunk(self, text: str, breakpoint_threshold: float = 0.5):
"""セマンティックチャンキング - 意味変化ポイントで分割"""
chunker = SemanticChunker(
self.embeddings,
breakpoint_threshold_type="percentile",
breakpoint_threshold_amount=breakpoint_threshold * 100
)
return chunker.split_text(text)
def parent_child_chunk(self, text: str,
parent_size: int = 2000,
child_size: int = 400,
child_overlap: int = 50):
"""親子チャンキング - 検索は子、コンテキストは親"""
from langchain.text_splitter import RecursiveCharacterTextSplitter
# 親チャンク生成
parent_splitter = RecursiveCharacterTextSplitter(
chunk_size=parent_size,
chunk_overlap=0
)
parent_chunks = parent_splitter.split_text(text)
# 各親から子チャンク生成
child_splitter = RecursiveCharacterTextSplitter(
chunk_size=child_size,
chunk_overlap=child_overlap
)
result = []
for i, parent in enumerate(parent_chunks):
children = child_splitter.split_text(parent)
for child in children:
result.append({
"child_text": child,
"parent_text": parent,
"parent_id": i
})
return result
def markdown_structure_chunk(self, markdown_text: str):
"""Markdownドキュメント構造ベースのチャンキング"""
from langchain.text_splitter import MarkdownHeaderTextSplitter
headers_to_split_on = [
("#", "h1"),
("##", "h2"),
("###", "h3"),
]
splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=headers_to_split_on
)
chunks = splitter.split_text(markdown_text)
# 各チャンクに階層構造メタデータを追加
enriched_chunks = []
for chunk in chunks:
enriched_chunks.append({
"text": chunk.page_content,
"metadata": chunk.metadata,
"hierarchy": " > ".join(
chunk.metadata.get(h, "")
for h in ["h1", "h2", "h3"]
if chunk.metadata.get(h)
)
})
return enriched_chunks
検索最適化戦略
単純なベクトル類似度検索だけでは、本番レベルの検索品質を達成することは難しい。ハイブリッド検索、HyDE、クエリ分解などの最適化技法を組み合わせる必要がある。
ハイブリッド検索(BM25 + ベクトル)
from rank_bm25 import BM25Okapi
from langchain_openai import OpenAIEmbeddings
import numpy as np
from typing import List, Tuple
class HybridRetriever:
"""BM25 + ベクトル検索を組み合わせたハイブリッド検索器"""
def __init__(self, documents: List[str], embeddings_model: str = "text-embedding-3-large"):
self.documents = documents
self.embeddings = OpenAIEmbeddings(model=embeddings_model)
# BM25インデックス構築
tokenized_docs = [doc.lower().split() for doc in documents]
self.bm25 = BM25Okapi(tokenized_docs)
# ベクトルインデックス構築
self.doc_embeddings = np.array(
self.embeddings.embed_documents(documents)
)
def search(self, query: str, top_k: int = 5,
alpha: float = 0.5) -> List[Tuple[str, float]]:
"""
ハイブリッド検索の実行
alpha: ベクトル検索の重み(0=BM25のみ、1=ベクトルのみ)
"""
# BM25スコア
bm25_scores = self.bm25.get_scores(query.lower().split())
bm25_scores = self._normalize_scores(bm25_scores)
# ベクトル類似度スコア
query_embedding = np.array(self.embeddings.embed_query(query))
vector_scores = np.dot(self.doc_embeddings, query_embedding)
vector_scores = self._normalize_scores(vector_scores)
# 重み付き結合(Reciprocal Rank Fusionの代替)
combined_scores = alpha * vector_scores + (1 - alpha) * bm25_scores
# 上位k件を返す
top_indices = np.argsort(combined_scores)[-top_k:][::-1]
return [
(self.documents[i], combined_scores[i])
for i in top_indices
]
def _normalize_scores(self, scores: np.ndarray) -> np.ndarray:
"""Min-Max正規化"""
min_s, max_s = scores.min(), scores.max()
if max_s == min_s:
return np.zeros_like(scores)
return (scores - min_s) / (max_s - min_s)
HyDE(Hypothetical Document Embedding)
HyDEは、クエリに対する仮想的な回答をまず生成し、その回答の埋め込みを検索に使用する技法である。質問と回答の間の意味的なギャップを縮め、検索品質を向上させる。
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
class HyDERetriever:
"""HyDEベースの検索器"""
def __init__(self, vectorstore):
self.vectorstore = vectorstore
self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
self.embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
self.hyde_prompt = ChatPromptTemplate.from_template(
"次の質問に対して仮想的な回答を作成してください。"
"実際の正確性よりも、関連するキーワードと概念を含めることが重要です。\n\n"
"質問: {question}\n\n"
"仮想回答:"
)
def retrieve(self, query: str, top_k: int = 5):
"""HyDE検索の実行"""
# 1. 仮想回答の生成
chain = self.hyde_prompt | self.llm
hypothetical_answer = chain.invoke({"question": query}).content
# 2. 仮想回答の埋め込み生成
hyde_embedding = self.embeddings.embed_query(hypothetical_answer)
# 3. 仮想回答の埋め込みで検索
results = self.vectorstore.similarity_search_by_vector(
hyde_embedding, k=top_k
)
return results
クエリ分解(Query Decomposition)
複雑な質問をサブ質問に分解し、それぞれで検索した後、結果を統合する戦略である。
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import json
class QueryDecomposer:
"""複雑なクエリをサブクエリに分解"""
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
self.decompose_prompt = ChatPromptTemplate.from_template(
"次の質問を検索に最適化された2〜4個のサブ質問に分解してください。\n"
"JSON配列形式で返してください。\n\n"
"元の質問: {question}\n\n"
"サブ質問:"
)
def decompose(self, question: str) -> list:
"""クエリ分解"""
chain = self.decompose_prompt | self.llm
result = chain.invoke({"question": question}).content
try:
sub_queries = json.loads(result)
except json.JSONDecodeError:
sub_queries = [question]
return sub_queries
def retrieve_and_merge(self, question: str, retriever, top_k: int = 3):
"""分解されたクエリで検索し結果を統合"""
sub_queries = self.decompose(question)
all_docs = []
seen_contents = set()
for sub_query in sub_queries:
docs = retriever.invoke(sub_query)[:top_k]
for doc in docs:
if doc.page_content not in seen_contents:
seen_contents.add(doc.page_content)
all_docs.append(doc)
return all_docs
リランキング
初期検索結果をより精密なモデルで再評価し、順位を並べ替えるステップである。Bi-encoderベースのベクトル検索よりも、Cross-encoderベースのリランキングがより正確な関連性判定を行える。
Cohere Rerankの活用
import cohere
from typing import List, Dict
class CohereReranker:
"""Cohere Rerank APIを活用したリランキング"""
def __init__(self, api_key: str, model: str = "rerank-v3.5"):
self.client = cohere.Client(api_key)
self.model = model
def rerank(self, query: str, documents: List[str],
top_n: int = 5) -> List[Dict]:
"""ドキュメントのリランキング"""
response = self.client.rerank(
model=self.model,
query=query,
documents=documents,
top_n=top_n,
return_documents=True
)
results = []
for hit in response.results:
results.append({
"text": hit.document.text,
"relevance_score": hit.relevance_score,
"index": hit.index
})
return results
class CrossEncoderReranker:
"""ローカルCross-Encoderモデルベースのリランキング"""
def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"):
from sentence_transformers import CrossEncoder
self.model = CrossEncoder(model_name)
def rerank(self, query: str, documents: List[str],
top_n: int = 5) -> List[Dict]:
"""Cross-encoderでリランキング"""
pairs = [[query, doc] for doc in documents]
scores = self.model.predict(pairs)
# スコア順にソート
scored_docs = list(zip(documents, scores, range(len(documents))))
scored_docs.sort(key=lambda x: x[1], reverse=True)
results = []
for text, score, idx in scored_docs[:top_n]:
results.append({
"text": text,
"relevance_score": float(score),
"index": idx
})
return results
リランキングパイプライン統合
class RAGPipelineWithReranking:
"""リランキング統合RAGパイプライン"""
def __init__(self, retriever, reranker, llm):
self.retriever = retriever
self.reranker = reranker
self.llm = llm
def query(self, question: str, top_k_retrieve: int = 20,
top_k_rerank: int = 5) -> str:
"""検索 -> リランキング -> 生成"""
# ステージ1: 初期検索(広範囲)
initial_docs = self.retriever.invoke(question)[:top_k_retrieve]
doc_texts = [doc.page_content for doc in initial_docs]
# ステージ2: リランキング(精密)
reranked = self.reranker.rerank(
query=question,
documents=doc_texts,
top_n=top_k_rerank
)
# ステージ3: コンテキスト組立
context = "\n\n---\n\n".join(
r["text"] for r in reranked
)
# ステージ4: LLM生成
prompt = f"""以下のコンテキストに基づいて質問に回答してください。
各情報の出典を表示してください。
コンテキスト:
{context}
質問: {question}
回答:"""
response = self.llm.invoke(prompt)
return response.content
RAGASを活用したRAG評価
RAGAS(Retrieval Augmented Generation Assessment)は、RAGパイプラインを自動的に評価するフレームワークである。検索品質と生成品質を別々のメトリクスで測定する。
RAGASコアメトリクス
| メトリクス | 測定対象 | 説明 | 理想値 |
|---|---|---|---|
| Context Precision | 検索 | 検索されたコンテキスト中の関連率 | 0.8以上 |
| Context Recall | 検索 | 必要情報がコンテキストに含まれる率 | 0.9以上 |
| Faithfulness | 生成 | 回答がコンテキストに根拠を持つ度合い | 0.9以上 |
| Answer Relevancy | 生成 | 回答が質問に適切である度合い | 0.8以上 |
| Answer Correctness | 全体 | 正解と比較した正確度 | 0.7以上 |
RAGAS評価実装
from ragas import evaluate
from ragas.metrics import (
context_precision,
context_recall,
faithfulness,
answer_relevancy,
answer_correctness
)
from datasets import Dataset
class RAGEvaluator:
"""RAGASベースのRAG評価器"""
def __init__(self):
self.metrics = [
context_precision,
context_recall,
faithfulness,
answer_relevancy,
answer_correctness
]
def create_eval_dataset(self, eval_samples: list) -> Dataset:
"""評価データセット作成"""
data = {
"question": [],
"answer": [],
"contexts": [],
"ground_truth": []
}
for sample in eval_samples:
data["question"].append(sample["question"])
data["answer"].append(sample["generated_answer"])
data["contexts"].append(sample["retrieved_contexts"])
data["ground_truth"].append(sample["expected_answer"])
return Dataset.from_dict(data)
def evaluate(self, eval_samples: list) -> dict:
"""RAGパイプライン評価の実行"""
dataset = self.create_eval_dataset(eval_samples)
result = evaluate(
dataset=dataset,
metrics=self.metrics
)
return {
"context_precision": result["context_precision"],
"context_recall": result["context_recall"],
"faithfulness": result["faithfulness"],
"answer_relevancy": result["answer_relevancy"],
"answer_correctness": result["answer_correctness"],
"overall_score": sum(result.values()) / len(result)
}
def generate_report(self, results: dict) -> str:
"""評価レポート生成"""
report = "=== RAG Pipeline Evaluation Report ===\n\n"
for metric, score in results.items():
status = "PASS" if score >= 0.7 else "WARN" if score >= 0.5 else "FAIL"
report += f" {metric}: {score:.4f} [{status}]\n"
report += f"\n Overall: {results.get('overall_score', 0):.4f}\n"
return report
本番デプロイパターン
非同期インデクシングパイプライン
本番環境では、ドキュメントのインデクシングと検索サービングを分離する必要がある。新しいドキュメントが追加された際、非同期で埋め込みを生成しインデックスを更新する。
import asyncio
from datetime import datetime
from typing import Optional
import hashlib
class AsyncIndexingPipeline:
"""非同期ドキュメントインデクシングパイプライン"""
def __init__(self, embeddings, vector_store, chunker):
self.embeddings = embeddings
self.vector_store = vector_store
self.chunker = chunker
self.index_queue = asyncio.Queue()
self._processed_hashes = set()
async def enqueue_document(self, doc_id: str, content: str,
metadata: Optional[dict] = None):
"""ドキュメントをインデクシングキューに追加"""
doc_hash = hashlib.md5(content.encode()).hexdigest()
if doc_hash in self._processed_hashes:
return {"status": "skipped", "reason": "duplicate"}
await self.index_queue.put({
"doc_id": doc_id,
"content": content,
"metadata": metadata or {},
"hash": doc_hash,
"enqueued_at": datetime.utcnow().isoformat()
})
return {"status": "enqueued", "queue_size": self.index_queue.qsize()}
async def process_queue(self, batch_size: int = 10):
"""キューからバッチ単位でドキュメントを処理"""
while True:
batch = []
try:
for _ in range(batch_size):
item = self.index_queue.get_nowait()
batch.append(item)
except asyncio.QueueEmpty:
pass
if batch:
await self._process_batch(batch)
else:
await asyncio.sleep(1)
async def _process_batch(self, batch: list):
"""バッチ処理: チャンキング -> 埋め込み -> アップサート"""
all_chunks = []
all_metadata = []
for item in batch:
chunks = self.chunker.semantic_chunk(item["content"])
for chunk in chunks:
all_chunks.append(chunk)
all_metadata.append({
"doc_id": item["doc_id"],
"hash": item["hash"],
**item["metadata"]
})
# バッチ埋め込み生成
chunk_embeddings = self.embeddings.embed_documents(all_chunks)
# ベクトルストアにアップサート
self.vector_store.upsert_documents(
documents=all_chunks,
embeddings=chunk_embeddings,
metadata=all_metadata
)
for item in batch:
self._processed_hashes.add(item["hash"])
検索結果キャッシング
頻繁に繰り返されるクエリに対してキャッシングを適用し、レスポンス速度を向上させAPIコストを削減する。
import hashlib
import json
from datetime import datetime, timedelta
from typing import Optional
import redis
class RAGCache:
"""RAG検索結果キャッシュ"""
def __init__(self, redis_url: str = "redis://localhost:6379",
ttl_seconds: int = 3600):
self.redis = redis.from_url(redis_url)
self.ttl = ttl_seconds
def _cache_key(self, query: str, filters: Optional[dict] = None) -> str:
"""クエリとフィルタからキャッシュキーを生成"""
key_data = {"query": query.lower().strip(), "filters": filters or {}}
key_hash = hashlib.sha256(
json.dumps(key_data, sort_keys=True).encode()
).hexdigest()
return f"rag:cache:{key_hash}"
def get(self, query: str, filters: Optional[dict] = None):
"""キャッシュから検索結果を取得"""
key = self._cache_key(query, filters)
cached = self.redis.get(key)
if cached:
data = json.loads(cached)
data["cache_hit"] = True
return data
return None
def set(self, query: str, results: list,
filters: Optional[dict] = None):
"""検索結果をキャッシュに保存"""
key = self._cache_key(query, filters)
data = {
"results": results,
"cached_at": datetime.utcnow().isoformat(),
"query": query
}
self.redis.setex(key, self.ttl, json.dumps(data))
def invalidate_by_pattern(self, pattern: str):
"""パターンマッチングによるキャッシュ無効化"""
keys = self.redis.keys(f"rag:cache:*{pattern}*")
if keys:
self.redis.delete(*keys)
障害事例と対応
検索品質劣化パターン
| 障害タイプ | 症状 | 原因 | 対応策 |
|---|---|---|---|
| 埋め込みドリフト | 時間経過に伴う検索精度の低下 | 埋め込みモデルバージョン変更、データ分布変化 | 定期的な再インデクシング、モデルバージョン固定 |
| チャンク不整合 | 関連情報が検索されない | チャンクサイズ/戦略の不適切さ | チャンクサイズの実験、オーバーラップ調整 |
| フィルタ過学習 | 結果0件のクエリ増加 | メタデータフィルタが厳しすぎる | フィルタフォールバック戦略、フィルタ緩和 |
| コンテキスト汚染 | LLMが無関係な情報で回答 | リランキング不在、top-k過多 | リランキング導入、top-k削減 |
| レイテンシ増加 | レスポンス時間2秒超過 | インデックス非最適化、ネットワーク遅延 | インデックスチューニング、キャッシング、非同期処理 |
埋め込みドリフトモニタリング
import numpy as np
from datetime import datetime
from typing import List
class EmbeddingDriftMonitor:
"""埋め込みドリフト検出およびモニタリング"""
def __init__(self, reference_embeddings: np.ndarray):
self.reference_mean = np.mean(reference_embeddings, axis=0)
self.reference_std = np.std(reference_embeddings, axis=0)
self.drift_history = []
def check_drift(self, new_embeddings: np.ndarray,
threshold: float = 0.1) -> dict:
"""新しい埋め込みのドリフト検査"""
new_mean = np.mean(new_embeddings, axis=0)
# コサイン距離でドリフトを測定
cosine_sim = np.dot(self.reference_mean, new_mean) / (
np.linalg.norm(self.reference_mean) * np.linalg.norm(new_mean)
)
drift_score = 1 - cosine_sim
# 分布比較(KLダイバージェンスの近似)
distribution_shift = np.mean(
np.abs(np.mean(new_embeddings, axis=0) - self.reference_mean)
/ (self.reference_std + 1e-8)
)
result = {
"drift_score": float(drift_score),
"distribution_shift": float(distribution_shift),
"is_drifted": drift_score > threshold,
"timestamp": datetime.utcnow().isoformat(),
"recommendation": (
"再インデクシングが必要" if drift_score > threshold
else "正常範囲"
)
}
self.drift_history.append(result)
return result
検索品質劣化時のフォールバック戦略
class RAGWithFallback:
"""フォールバック戦略を含むRAGパイプライン"""
def __init__(self, primary_retriever, fallback_retriever,
reranker, llm):
self.primary = primary_retriever
self.fallback = fallback_retriever
self.reranker = reranker
self.llm = llm
def query(self, question: str) -> dict:
"""フォールバックを含むクエリ処理"""
# プライマリ検索
primary_docs = self.primary.invoke(question)
if not primary_docs:
# フォールバック1: フィルタ緩和
primary_docs = self.fallback.invoke(question)
if not primary_docs:
return {
"answer": "関連情報が見つかりませんでした。",
"confidence": 0.0,
"source": "no_results"
}
# リランキング
reranked = self.reranker.rerank(
query=question,
documents=[d.page_content for d in primary_docs],
top_n=5
)
# 最低関連度の検証
if reranked[0]["relevance_score"] < 0.3:
return {
"answer": "高い信頼度の回答を生成できません。",
"confidence": reranked[0]["relevance_score"],
"source": "low_confidence"
}
# LLM生成
context = "\n\n".join(r["text"] for r in reranked)
answer = self.llm.invoke(
f"コンテキスト:\n{context}\n\n質問: {question}\n回答:"
).content
return {
"answer": answer,
"confidence": reranked[0]["relevance_score"],
"source": "primary",
"num_sources": len(reranked)
}
本番チェックリスト
設計フェーズ
- ユースケースに応じた埋め込みモデルベンチマークの実施
- ベクトルDB選定: 規模、予算、運用能力を考慮
- チャンキング戦略のA/Bテスト計画策定
- 評価データセット(最低100個のQAペア)構築
開発フェーズ
- ハイブリッド検索(BM25 + ベクトル)実装
- リランキングパイプライン統合
- 非同期インデクシングパイプライン構築
- キャッシングレイヤー実装(Redis/Memcached)
- エラーハンドリングおよびフォールバック戦略実装
デプロイフェーズ
- RAGASメトリクスベースの品質ゲート設定
- 埋め込みドリフトモニタリングダッシュボード構築
- 検索レイテンシアラート設定(P95基準500ms)
- インデックス再構築自動化パイプライン構築
- ロードテスト実施(目標QPS比2倍)
運用フェーズ
- 週次の検索品質レポート生成
- 月次の埋め込みモデルアップデートレビュー
- ユーザーフィードバックベースの評価データセット更新
- コストモニタリング(埋め込みAPI呼び出し、ベクトルDBストレージ)
- 定期的なインデックス再構築(四半期に1回以上)