- Published on
RAG Pipeline Production Guide: From Vector DB Selection to Chunking, Reranking, and Evaluation
- Authors
- Name
- Introduction
- RAG Architecture Overview
- Embedding Model Comparison
- Vector Database Comparison
- Chunking Strategies
- Retrieval Optimization Strategies
- Reranking
- RAG Evaluation with RAGAS
- Production Deployment Patterns
- Failure Cases and Recovery
- Production Checklist
- References

Introduction
RAG (Retrieval-Augmented Generation) is a critical architecture pattern for solving LLM hallucination problems and generating answers grounded in up-to-date information. While it may seem as simple as "search and paste into a prompt," production environments require numerous engineering decisions: embedding model selection, vector DB operations, chunking strategies, hybrid search, reranking, and evaluation frameworks.
This article analyzes the full RAG pipeline architecture, covering selection criteria and implementation methods for each component with production-ready code. We also examine common failure patterns in production, recovery strategies, and systematic evaluation methodologies using RAGAS.
RAG Architecture Overview
A RAG pipeline consists of three main stages: Retrieval, Augmentation, and Generation. Each stage can be optimized independently, and the overall pipeline quality is determined by its weakest stage.
End-to-End Architecture Flow
[User Query]
|
v
[Query Processing] <- Query decomposition, HyDE, query expansion
|
v
[Retrieval] <- BM25 + Vector search (hybrid)
|
v
[Reranking] <- Cross-encoder, Cohere Rerank
|
v
[Context Assembly] <- Chunk merging, deduplication, token limit
|
v
[Generation] <- Pass context and query to LLM
|
v
[Post-processing] <- Source attribution, confidence scores, hallucination check
Basic RAG Implementation
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
# 1. Document chunking
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", ".", " "]
)
chunks = text_splitter.split_documents(documents)
# 2. Embedding and vector store creation
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
collection_name="production_docs"
)
# 3. Retriever configuration
retriever = vectorstore.as_retriever(
search_type="mmr",
search_kwargs={"k": 5, "fetch_k": 20}
)
# 4. RAG chain assembly
template = """Answer the question based on the following context.
If the answer cannot be found in the context, respond with "Insufficient information."
Context:
{context}
Question: {question}
Answer:"""
prompt = ChatPromptTemplate.from_template(template)
llm = ChatOpenAI(model="gpt-4o", temperature=0)
rag_chain = (
{"context": retriever, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
response = rag_chain.invoke("What are the key components of a RAG pipeline?")
Embedding Model Comparison
The embedding model is one of the most critical factors determining RAG pipeline retrieval quality. Here is a comparison of major embedding models as of 2025-2026.
| Model | Dimensions | Max Tokens | MTEB Average | Multilingual | Cost | Features |
|---|---|---|---|---|---|---|
| OpenAI text-embedding-3-large | 3072 | 8191 | 64.6 | Yes | $0.13/1M tokens | Dimension reduction support |
| OpenAI text-embedding-3-small | 1536 | 8191 | 62.3 | Yes | $0.02/1M tokens | Cost-effective |
| Cohere embed-v3 | 1024 | 512 | 64.5 | Yes (100+) | $0.10/1M tokens | Search-optimized |
| BGE-M3 | 1024 | 8192 | 66.1 | Yes (100+) | Free (self-hosted) | Dense+Sparse+ColBERT |
| E5-mistral-7b-instruct | 4096 | 32768 | 66.6 | Yes | Free (self-hosted) | Instruction-based embedding |
| Jina-embeddings-v3 | 1024 | 8192 | 65.5 | Yes (89) | $0.02/1M tokens | Task-specific LoRA |
Embedding Model Selection Criteria
from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List
class EmbeddingBenchmark:
"""Embedding model comparison benchmark"""
def __init__(self, models: List[str]):
self.models = {}
for model_name in models:
self.models[model_name] = SentenceTransformer(model_name)
def evaluate_retrieval_quality(
self,
queries: List[str],
relevant_docs: List[List[str]],
corpus: List[str]
) -> dict:
results = {}
for name, model in self.models.items():
corpus_embeddings = model.encode(corpus, normalize_embeddings=True)
query_embeddings = model.encode(queries, normalize_embeddings=True)
# Cosine similarity calculation
scores = np.dot(query_embeddings, corpus_embeddings.T)
# Recall@k calculation
recall_at_5 = self._compute_recall(scores, relevant_docs, k=5)
recall_at_10 = self._compute_recall(scores, relevant_docs, k=10)
results[name] = {
"recall@5": recall_at_5,
"recall@10": recall_at_10,
"embedding_dim": model.get_sentence_embedding_dimension(),
"encoding_speed": self._measure_speed(model, queries)
}
return results
def _compute_recall(self, scores, relevant_docs, k):
recalls = []
for i, rel_docs in enumerate(relevant_docs):
top_k_indices = np.argsort(scores[i])[-k:]
retrieved = set(top_k_indices)
relevant = set(rel_docs)
recalls.append(len(retrieved & relevant) / len(relevant))
return np.mean(recalls)
def _measure_speed(self, model, texts, n_runs=3):
import time
times = []
for _ in range(n_runs):
start = time.time()
model.encode(texts)
times.append(time.time() - start)
return np.mean(times)
Vector Database Comparison
Vector database selection directly impacts the operational complexity, cost, and performance of a RAG system. Here is a multi-dimensional comparison of major vector databases.
| Feature | Pinecone | Milvus | Weaviate | Qdrant | Chroma |
|---|---|---|---|---|---|
| Deployment | Fully managed | Self-hosted/Zilliz Cloud | Self-hosted/Cloud | Self-hosted/Cloud | In-memory/Self-hosted |
| Max Vectors | Billions | Billions | Billions | Billions | Millions |
| Search Algorithm | Proprietary | HNSW, IVF, DiskANN | HNSW | HNSW | HNSW |
| Hybrid Search | Yes (Sparse+Dense) | Yes (BM25+Dense) | Yes (BM25+Dense) | Yes (Sparse+Dense) | No |
| Multi-tenancy | Yes (namespaces) | Yes (partitions) | Yes (tenants) | Yes (collections) | Yes (collections) |
| Filtering | Metadata filters | Scalar filtering | GraphQL filters | Payload filters | Metadata filters |
| Price (1M vectors) | ~$70/month | Free (self-hosted) | Free (self-hosted) | Free (self-hosted) | Free |
| Best For | Quick prototyping, managed | Large enterprise | GraphQL ecosystem | High-perf filtering | Dev/small scale |
Production Vector Search with Qdrant
from qdrant_client import QdrantClient
from qdrant_client.models import (
Distance, VectorParams, PointStruct,
Filter, FieldCondition, MatchValue,
SearchParams, HnswConfigDiff
)
import uuid
class ProductionVectorStore:
"""Production vector store manager"""
def __init__(self, url: str, api_key: str, collection_name: str):
self.client = QdrantClient(url=url, api_key=api_key)
self.collection_name = collection_name
def create_collection(self, vector_size: int = 1536):
"""Create collection with optimized HNSW index"""
self.client.create_collection(
collection_name=self.collection_name,
vectors_config=VectorParams(
size=vector_size,
distance=Distance.COSINE,
on_disk=True # Disk-based storage for large datasets
),
hnsw_config=HnswConfigDiff(
m=16, # Graph connectivity (default: 16)
ef_construct=128, # Search range during index build
full_scan_threshold=10000 # Full scan below this count
),
optimizers_config={
"indexing_threshold": 20000,
"memmap_threshold": 50000
}
)
def upsert_documents(self, documents: list, embeddings: list, metadata: list):
"""Batch upsert"""
points = [
PointStruct(
id=str(uuid.uuid4()),
vector=embedding,
payload={**meta, "text": doc}
)
for doc, embedding, meta in zip(documents, embeddings, metadata)
]
# Upload in batches of 100
batch_size = 100
for i in range(0, len(points), batch_size):
batch = points[i:i + batch_size]
self.client.upsert(
collection_name=self.collection_name,
points=batch,
wait=True
)
def hybrid_search(self, query_vector: list, query_text: str,
filters: dict = None, top_k: int = 10):
"""Hybrid search (vector + metadata filter)"""
search_filter = None
if filters:
conditions = [
FieldCondition(key=k, match=MatchValue(value=v))
for k, v in filters.items()
]
search_filter = Filter(must=conditions)
results = self.client.search(
collection_name=self.collection_name,
query_vector=query_vector,
query_filter=search_filter,
limit=top_k,
search_params=SearchParams(
hnsw_ef=128, # Search range (higher = more accurate, slower)
exact=False # Use approximate search
)
)
return results
Chunking Strategies
Chunking is the preprocessing stage that has the greatest impact on RAG quality. Poor chunking can separate related information or include unnecessary noise, significantly degrading retrieval quality.
Chunking Strategy Comparison
| Strategy | Pros | Cons | Best For |
|---|---|---|---|
| Fixed-size | Simple to implement, predictable | Ignores semantic boundaries | Uniform text |
| Recursive | Respects paragraph/sentence boundaries | Requires size tuning | General documents |
| Semantic | Preserves semantic units | High computational cost | Unstructured text |
| Document-structure | Maintains heading/section hierarchy | Requires parser implementation | Markdown, HTML |
| Parent-Child | Search precision + sufficient context | 2x storage space | Technical documentation |
Semantic Chunking Implementation
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings
import numpy as np
class AdvancedChunker:
"""Advanced chunker supporting multiple chunking strategies"""
def __init__(self, embedding_model: str = "text-embedding-3-small"):
self.embeddings = OpenAIEmbeddings(model=embedding_model)
def semantic_chunk(self, text: str, breakpoint_threshold: float = 0.5):
"""Semantic chunking - split at semantic change points"""
chunker = SemanticChunker(
self.embeddings,
breakpoint_threshold_type="percentile",
breakpoint_threshold_amount=breakpoint_threshold * 100
)
return chunker.split_text(text)
def parent_child_chunk(self, text: str,
parent_size: int = 2000,
child_size: int = 400,
child_overlap: int = 50):
"""Parent-child chunking - search on children, context from parents"""
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Create parent chunks
parent_splitter = RecursiveCharacterTextSplitter(
chunk_size=parent_size,
chunk_overlap=0
)
parent_chunks = parent_splitter.split_text(text)
# Create child chunks from each parent
child_splitter = RecursiveCharacterTextSplitter(
chunk_size=child_size,
chunk_overlap=child_overlap
)
result = []
for i, parent in enumerate(parent_chunks):
children = child_splitter.split_text(parent)
for child in children:
result.append({
"child_text": child,
"parent_text": parent,
"parent_id": i
})
return result
def markdown_structure_chunk(self, markdown_text: str):
"""Markdown document structure-based chunking"""
from langchain.text_splitter import MarkdownHeaderTextSplitter
headers_to_split_on = [
("#", "h1"),
("##", "h2"),
("###", "h3"),
]
splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=headers_to_split_on
)
chunks = splitter.split_text(markdown_text)
# Add hierarchy metadata to each chunk
enriched_chunks = []
for chunk in chunks:
enriched_chunks.append({
"text": chunk.page_content,
"metadata": chunk.metadata,
"hierarchy": " > ".join(
chunk.metadata.get(h, "")
for h in ["h1", "h2", "h3"]
if chunk.metadata.get(h)
)
})
return enriched_chunks
Retrieval Optimization Strategies
Simple vector similarity search alone cannot achieve production-grade retrieval quality. Various optimization techniques such as hybrid search, HyDE, and query decomposition must be combined.
Hybrid Search (BM25 + Vector)
from rank_bm25 import BM25Okapi
from langchain_openai import OpenAIEmbeddings
import numpy as np
from typing import List, Tuple
class HybridRetriever:
"""Hybrid retriever combining BM25 + vector search"""
def __init__(self, documents: List[str], embeddings_model: str = "text-embedding-3-large"):
self.documents = documents
self.embeddings = OpenAIEmbeddings(model=embeddings_model)
# Build BM25 index
tokenized_docs = [doc.lower().split() for doc in documents]
self.bm25 = BM25Okapi(tokenized_docs)
# Build vector index
self.doc_embeddings = np.array(
self.embeddings.embed_documents(documents)
)
def search(self, query: str, top_k: int = 5,
alpha: float = 0.5) -> List[Tuple[str, float]]:
"""
Perform hybrid search
alpha: vector search weight (0=BM25 only, 1=vector only)
"""
# BM25 scores
bm25_scores = self.bm25.get_scores(query.lower().split())
bm25_scores = self._normalize_scores(bm25_scores)
# Vector similarity scores
query_embedding = np.array(self.embeddings.embed_query(query))
vector_scores = np.dot(self.doc_embeddings, query_embedding)
vector_scores = self._normalize_scores(vector_scores)
# Weighted combination (Reciprocal Rank Fusion alternative)
combined_scores = alpha * vector_scores + (1 - alpha) * bm25_scores
# Return top-k results
top_indices = np.argsort(combined_scores)[-top_k:][::-1]
return [
(self.documents[i], combined_scores[i])
for i in top_indices
]
def _normalize_scores(self, scores: np.ndarray) -> np.ndarray:
"""Min-Max normalization"""
min_s, max_s = scores.min(), scores.max()
if max_s == min_s:
return np.zeros_like(scores)
return (scores - min_s) / (max_s - min_s)
HyDE (Hypothetical Document Embedding)
HyDE generates a hypothetical answer to the query first, then uses that answer's embedding for retrieval. This bridges the semantic gap between questions and answers, improving retrieval quality.
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
class HyDERetriever:
"""HyDE-based retriever"""
def __init__(self, vectorstore):
self.vectorstore = vectorstore
self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
self.embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
self.hyde_prompt = ChatPromptTemplate.from_template(
"Write a hypothetical answer to the following question. "
"It is more important to include relevant keywords and concepts "
"than to be factually accurate.\n\n"
"Question: {question}\n\n"
"Hypothetical answer:"
)
def retrieve(self, query: str, top_k: int = 5):
"""Perform HyDE retrieval"""
# 1. Generate hypothetical answer
chain = self.hyde_prompt | self.llm
hypothetical_answer = chain.invoke({"question": query}).content
# 2. Create embedding from hypothetical answer
hyde_embedding = self.embeddings.embed_query(hypothetical_answer)
# 3. Search using hypothetical answer embedding
results = self.vectorstore.similarity_search_by_vector(
hyde_embedding, k=top_k
)
return results
Query Decomposition
This strategy breaks complex questions into sub-questions, retrieves results for each, then aggregates them.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import json
class QueryDecomposer:
"""Decompose complex queries into sub-queries"""
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
self.decompose_prompt = ChatPromptTemplate.from_template(
"Decompose the following question into 2-4 sub-questions "
"optimized for retrieval.\n"
"Return as a JSON array.\n\n"
"Original question: {question}\n\n"
"Sub-questions:"
)
def decompose(self, question: str) -> list:
"""Decompose query"""
chain = self.decompose_prompt | self.llm
result = chain.invoke({"question": question}).content
try:
sub_queries = json.loads(result)
except json.JSONDecodeError:
sub_queries = [question]
return sub_queries
def retrieve_and_merge(self, question: str, retriever, top_k: int = 3):
"""Retrieve with decomposed queries and merge results"""
sub_queries = self.decompose(question)
all_docs = []
seen_contents = set()
for sub_query in sub_queries:
docs = retriever.invoke(sub_query)[:top_k]
for doc in docs:
if doc.page_content not in seen_contents:
seen_contents.add(doc.page_content)
all_docs.append(doc)
return all_docs
Reranking
Reranking re-evaluates initial search results with a more precise model and reorders them. Cross-encoder-based reranking provides more accurate relevance judgments than bi-encoder-based vector search.
Using Cohere Rerank
import cohere
from typing import List, Dict
class CohereReranker:
"""Reranking using Cohere Rerank API"""
def __init__(self, api_key: str, model: str = "rerank-v3.5"):
self.client = cohere.Client(api_key)
self.model = model
def rerank(self, query: str, documents: List[str],
top_n: int = 5) -> List[Dict]:
"""Rerank documents"""
response = self.client.rerank(
model=self.model,
query=query,
documents=documents,
top_n=top_n,
return_documents=True
)
results = []
for hit in response.results:
results.append({
"text": hit.document.text,
"relevance_score": hit.relevance_score,
"index": hit.index
})
return results
class CrossEncoderReranker:
"""Local cross-encoder model-based reranking"""
def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"):
from sentence_transformers import CrossEncoder
self.model = CrossEncoder(model_name)
def rerank(self, query: str, documents: List[str],
top_n: int = 5) -> List[Dict]:
"""Rerank with cross-encoder"""
pairs = [[query, doc] for doc in documents]
scores = self.model.predict(pairs)
# Sort by score
scored_docs = list(zip(documents, scores, range(len(documents))))
scored_docs.sort(key=lambda x: x[1], reverse=True)
results = []
for text, score, idx in scored_docs[:top_n]:
results.append({
"text": text,
"relevance_score": float(score),
"index": idx
})
return results
Integrated Reranking Pipeline
class RAGPipelineWithReranking:
"""RAG pipeline with integrated reranking"""
def __init__(self, retriever, reranker, llm):
self.retriever = retriever
self.reranker = reranker
self.llm = llm
def query(self, question: str, top_k_retrieve: int = 20,
top_k_rerank: int = 5) -> str:
"""Retrieve -> Rerank -> Generate"""
# Stage 1: Initial retrieval (broad)
initial_docs = self.retriever.invoke(question)[:top_k_retrieve]
doc_texts = [doc.page_content for doc in initial_docs]
# Stage 2: Reranking (precise)
reranked = self.reranker.rerank(
query=question,
documents=doc_texts,
top_n=top_k_rerank
)
# Stage 3: Context assembly
context = "\n\n---\n\n".join(
r["text"] for r in reranked
)
# Stage 4: LLM generation
prompt = f"""Answer the question based on the following context.
Cite the source of each piece of information.
Context:
{context}
Question: {question}
Answer:"""
response = self.llm.invoke(prompt)
return response.content
RAG Evaluation with RAGAS
RAGAS (Retrieval Augmented Generation Assessment) is a framework for automatically evaluating RAG pipelines. It measures retrieval quality and generation quality with separate metrics.
Core RAGAS Metrics
| Metric | Measures | Description | Target |
|---|---|---|---|
| Context Precision | Retrieval | Proportion of retrieved contexts that are relevant | 0.8+ |
| Context Recall | Retrieval | Proportion of needed information included in context | 0.9+ |
| Faithfulness | Generation | Degree to which the answer is grounded in context | 0.9+ |
| Answer Relevancy | Generation | How well the answer addresses the question | 0.8+ |
| Answer Correctness | Overall | Accuracy compared to ground truth | 0.7+ |
RAGAS Evaluation Implementation
from ragas import evaluate
from ragas.metrics import (
context_precision,
context_recall,
faithfulness,
answer_relevancy,
answer_correctness
)
from datasets import Dataset
class RAGEvaluator:
"""RAGAS-based RAG evaluator"""
def __init__(self):
self.metrics = [
context_precision,
context_recall,
faithfulness,
answer_relevancy,
answer_correctness
]
def create_eval_dataset(self, eval_samples: list) -> Dataset:
"""Create evaluation dataset"""
data = {
"question": [],
"answer": [],
"contexts": [],
"ground_truth": []
}
for sample in eval_samples:
data["question"].append(sample["question"])
data["answer"].append(sample["generated_answer"])
data["contexts"].append(sample["retrieved_contexts"])
data["ground_truth"].append(sample["expected_answer"])
return Dataset.from_dict(data)
def evaluate(self, eval_samples: list) -> dict:
"""Perform RAG pipeline evaluation"""
dataset = self.create_eval_dataset(eval_samples)
result = evaluate(
dataset=dataset,
metrics=self.metrics
)
return {
"context_precision": result["context_precision"],
"context_recall": result["context_recall"],
"faithfulness": result["faithfulness"],
"answer_relevancy": result["answer_relevancy"],
"answer_correctness": result["answer_correctness"],
"overall_score": sum(result.values()) / len(result)
}
def generate_report(self, results: dict) -> str:
"""Generate evaluation report"""
report = "=== RAG Pipeline Evaluation Report ===\n\n"
for metric, score in results.items():
status = "PASS" if score >= 0.7 else "WARN" if score >= 0.5 else "FAIL"
report += f" {metric}: {score:.4f} [{status}]\n"
report += f"\n Overall: {results.get('overall_score', 0):.4f}\n"
return report
Production Deployment Patterns
Async Indexing Pipeline
In production, document indexing and search serving must be separated. When new documents are added, embeddings are generated asynchronously and the index is updated.
import asyncio
from datetime import datetime
from typing import Optional
import hashlib
class AsyncIndexingPipeline:
"""Async document indexing pipeline"""
def __init__(self, embeddings, vector_store, chunker):
self.embeddings = embeddings
self.vector_store = vector_store
self.chunker = chunker
self.index_queue = asyncio.Queue()
self._processed_hashes = set()
async def enqueue_document(self, doc_id: str, content: str,
metadata: Optional[dict] = None):
"""Add document to indexing queue"""
doc_hash = hashlib.md5(content.encode()).hexdigest()
if doc_hash in self._processed_hashes:
return {"status": "skipped", "reason": "duplicate"}
await self.index_queue.put({
"doc_id": doc_id,
"content": content,
"metadata": metadata or {},
"hash": doc_hash,
"enqueued_at": datetime.utcnow().isoformat()
})
return {"status": "enqueued", "queue_size": self.index_queue.qsize()}
async def process_queue(self, batch_size: int = 10):
"""Process documents from queue in batches"""
while True:
batch = []
try:
for _ in range(batch_size):
item = self.index_queue.get_nowait()
batch.append(item)
except asyncio.QueueEmpty:
pass
if batch:
await self._process_batch(batch)
else:
await asyncio.sleep(1)
async def _process_batch(self, batch: list):
"""Batch processing: chunk -> embed -> upsert"""
all_chunks = []
all_metadata = []
for item in batch:
chunks = self.chunker.semantic_chunk(item["content"])
for chunk in chunks:
all_chunks.append(chunk)
all_metadata.append({
"doc_id": item["doc_id"],
"hash": item["hash"],
**item["metadata"]
})
# Batch embedding generation
chunk_embeddings = self.embeddings.embed_documents(all_chunks)
# Upsert to vector store
self.vector_store.upsert_documents(
documents=all_chunks,
embeddings=chunk_embeddings,
metadata=all_metadata
)
for item in batch:
self._processed_hashes.add(item["hash"])
Search Result Caching
Apply caching for frequently repeated queries to improve response time and reduce API costs.
import hashlib
import json
from datetime import datetime, timedelta
from typing import Optional
import redis
class RAGCache:
"""RAG search result cache"""
def __init__(self, redis_url: str = "redis://localhost:6379",
ttl_seconds: int = 3600):
self.redis = redis.from_url(redis_url)
self.ttl = ttl_seconds
def _cache_key(self, query: str, filters: Optional[dict] = None) -> str:
"""Generate cache key from query and filters"""
key_data = {"query": query.lower().strip(), "filters": filters or {}}
key_hash = hashlib.sha256(
json.dumps(key_data, sort_keys=True).encode()
).hexdigest()
return f"rag:cache:{key_hash}"
def get(self, query: str, filters: Optional[dict] = None):
"""Retrieve search results from cache"""
key = self._cache_key(query, filters)
cached = self.redis.get(key)
if cached:
data = json.loads(cached)
data["cache_hit"] = True
return data
return None
def set(self, query: str, results: list,
filters: Optional[dict] = None):
"""Store search results in cache"""
key = self._cache_key(query, filters)
data = {
"results": results,
"cached_at": datetime.utcnow().isoformat(),
"query": query
}
self.redis.setex(key, self.ttl, json.dumps(data))
def invalidate_by_pattern(self, pattern: str):
"""Invalidate cache by pattern matching"""
keys = self.redis.keys(f"rag:cache:*{pattern}*")
if keys:
self.redis.delete(*keys)
Failure Cases and Recovery
Retrieval Quality Degradation Patterns
| Failure Type | Symptoms | Cause | Mitigation |
|---|---|---|---|
| Embedding Drift | Gradual accuracy degradation over time | Embedding model version change, data distribution shift | Periodic re-indexing, model version pinning |
| Chunk Mismatch | Relevant information not retrieved | Inappropriate chunk size/strategy | Chunk size experimentation, overlap adjustment |
| Filter Overfit | Increasing zero-result queries | Overly strict metadata filters | Filter fallback strategy, filter relaxation |
| Context Pollution | LLM answers with irrelevant information | Missing reranking, excessive top-k | Introduce reranking, reduce top-k |
| Latency Spike | Response time exceeds 2 seconds | Non-optimized index, network latency | Index tuning, caching, async processing |
Embedding Drift Monitoring
import numpy as np
from datetime import datetime
from typing import List
class EmbeddingDriftMonitor:
"""Embedding drift detection and monitoring"""
def __init__(self, reference_embeddings: np.ndarray):
self.reference_mean = np.mean(reference_embeddings, axis=0)
self.reference_std = np.std(reference_embeddings, axis=0)
self.drift_history = []
def check_drift(self, new_embeddings: np.ndarray,
threshold: float = 0.1) -> dict:
"""Check drift of new embeddings"""
new_mean = np.mean(new_embeddings, axis=0)
# Measure drift via cosine distance
cosine_sim = np.dot(self.reference_mean, new_mean) / (
np.linalg.norm(self.reference_mean) * np.linalg.norm(new_mean)
)
drift_score = 1 - cosine_sim
# Distribution comparison (KL Divergence approximation)
distribution_shift = np.mean(
np.abs(np.mean(new_embeddings, axis=0) - self.reference_mean)
/ (self.reference_std + 1e-8)
)
result = {
"drift_score": float(drift_score),
"distribution_shift": float(distribution_shift),
"is_drifted": drift_score > threshold,
"timestamp": datetime.utcnow().isoformat(),
"recommendation": (
"Re-indexing required" if drift_score > threshold
else "Within normal range"
)
}
self.drift_history.append(result)
return result
Fallback Strategy for Quality Degradation
class RAGWithFallback:
"""RAG pipeline with fallback strategies"""
def __init__(self, primary_retriever, fallback_retriever,
reranker, llm):
self.primary = primary_retriever
self.fallback = fallback_retriever
self.reranker = reranker
self.llm = llm
def query(self, question: str) -> dict:
"""Query processing with fallback"""
# Primary retrieval
primary_docs = self.primary.invoke(question)
if not primary_docs:
# Fallback 1: Relaxed filters
primary_docs = self.fallback.invoke(question)
if not primary_docs:
return {
"answer": "No relevant information found.",
"confidence": 0.0,
"source": "no_results"
}
# Reranking
reranked = self.reranker.rerank(
query=question,
documents=[d.page_content for d in primary_docs],
top_n=5
)
# Minimum relevance validation
if reranked[0]["relevance_score"] < 0.3:
return {
"answer": "Unable to generate a high-confidence answer.",
"confidence": reranked[0]["relevance_score"],
"source": "low_confidence"
}
# LLM generation
context = "\n\n".join(r["text"] for r in reranked)
answer = self.llm.invoke(
f"Context:\n{context}\n\nQuestion: {question}\nAnswer:"
).content
return {
"answer": answer,
"confidence": reranked[0]["relevance_score"],
"source": "primary",
"num_sources": len(reranked)
}
Production Checklist
Design Phase
- Benchmark embedding models for your specific use case
- Select vector DB based on scale, budget, and operational capability
- Plan A/B testing for chunking strategies
- Build evaluation dataset (minimum 100 QA pairs)
Development Phase
- Implement hybrid search (BM25 + vector)
- Integrate reranking pipeline
- Build async indexing pipeline
- Implement caching layer (Redis/Memcached)
- Implement error handling and fallback strategies
Deployment Phase
- Set quality gates based on RAGAS metrics
- Build embedding drift monitoring dashboard
- Set search latency alerts (P95 target: 500ms)
- Build automated index rebuild pipeline
- Perform load testing (2x target QPS)
Operations Phase
- Generate weekly retrieval quality reports
- Monthly embedding model update review
- Update evaluation dataset based on user feedback
- Monitor costs (embedding API calls, vector DB storage)
- Periodic index rebuilds (at least quarterly)