RAG Pipeline Production Guide: From Vector DB Selection to Chunking, Reranking, and Evaluation

Introduction

Retrieval-Augmented Generation (RAG) is a key architecture pattern for mitigating LLM hallucinations and grounding answers in up-to-date information. While it may seem as simple as "search and paste into a prompt," production environments demand numerous engineering decisions: embedding model selection, vector DB operations, chunking strategies, hybrid search, reranking, and evaluation frameworks.

This article analyzes the full RAG pipeline architecture, covering selection criteria and implementation methods for each component with production-ready code. We also examine common failure patterns in production, recovery strategies, and systematic evaluation methodologies using RAGAS.

RAG Architecture Overview

A RAG pipeline consists of three main stages: Retrieval, Augmentation, and Generation. Each stage can be optimized independently, and the overall pipeline quality is determined by its weakest stage.

End-to-End Architecture Flow

[User Query]
    |
    v
[Query Processing] <- Query decomposition, HyDE, query expansion
    |
    v
[Retrieval] <- BM25 + Vector search (hybrid)
    |
    v
[Reranking] <- Cross-encoder, Cohere Rerank
    |
    v
[Context Assembly] <- Chunk merging, deduplication, token limit
    |
    v
[Generation] <- Pass context and query to LLM
    |
    v
[Post-processing] <- Source attribution, confidence scores, hallucination check
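
The context assembly step deserves its own logic: retrieved chunks must be deduplicated and packed into the model's token budget before generation. Below is a minimal sketch of token-limit-aware assembly, assuming chunks arrive sorted by relevance; the tiktoken encoding lookup and the 4000-token budget are illustrative choices, not values from a specific deployment.

import tiktoken

def assemble_context(chunks: list, max_tokens: int = 4000,
                     model: str = "gpt-4o") -> str:
    """Deduplicate chunks and greedily pack them into a token budget."""
    enc = tiktoken.encoding_for_model(model)
    seen, selected, used = set(), [], 0
    for chunk in chunks:  # assumed ordered best-first by the retriever
        if chunk in seen:
            continue
        n_tokens = len(enc.encode(chunk))
        if used + n_tokens > max_tokens:
            break
        seen.add(chunk)
        selected.append(chunk)
        used += n_tokens
    return "\n\n---\n\n".join(selected)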

Basic RAG Implementation

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

# 1. Document chunking ("documents" is a list of LangChain Document objects loaded upstream)
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", ".", " "]
)
chunks = text_splitter.split_documents(documents)

# 2. Embedding and vector store creation
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    collection_name="production_docs"
)

# 3. Retriever configuration
retriever = vectorstore.as_retriever(
    search_type="mmr",
    search_kwargs={"k": 5, "fetch_k": 20}
)

# 4. RAG chain assembly
template = """Answer the question based on the following context.
If the answer cannot be found in the context, respond with "Insufficient information."

Context:
{context}

Question: {question}
Answer:"""

prompt = ChatPromptTemplate.from_template(template)
llm = ChatOpenAI(model="gpt-4o", temperature=0)

rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

response = rag_chain.invoke("What are the key components of a RAG pipeline?")

Embedding Model Comparison

The embedding model is one of the most critical factors determining RAG pipeline retrieval quality. Here is a comparison of major embedding models as of 2025-2026.

| Model | Dimensions | Max Tokens | MTEB Average | Multilingual | Cost | Features |
| --- | --- | --- | --- | --- | --- | --- |
| OpenAI text-embedding-3-large | 3072 | 8191 | 64.6 | Yes | $0.13/1M tokens | Dimension reduction support |
| OpenAI text-embedding-3-small | 1536 | 8191 | 62.3 | Yes | $0.02/1M tokens | Cost-effective |
| Cohere embed-v3 | 1024 | 512 | 64.5 | Yes (100+) | $0.10/1M tokens | Search-optimized |
| BGE-M3 | 1024 | 8192 | 66.1 | Yes (100+) | Free (self-hosted) | Dense+Sparse+ColBERT |
| E5-mistral-7b-instruct | 4096 | 32768 | 66.6 | Yes | Free (self-hosted) | Instruction-based embedding |
| Jina-embeddings-v3 | 1024 | 8192 | 65.5 | Yes (89) | $0.02/1M tokens | Task-specific LoRA |
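
The dimension reduction noted for text-embedding-3-large refers to the OpenAI API's ability to truncate embeddings to a requested size (Matryoshka-style), trading a small amount of accuracy for lower storage cost and faster search. A minimal sketch, assuming the dimensions parameter exposed by langchain_openai's OpenAIEmbeddings:

from langchain_openai import OpenAIEmbeddings

# Request 1024-dimensional vectors instead of the native 3072.
# The vector DB collection must be created with the same size.
embeddings = OpenAIEmbeddings(
    model="text-embedding-3-large",
    dimensions=1024
)
vector = embeddings.embed_query("What are the key components of a RAG pipeline?")
assert len(vector) == 1024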

Embedding Model Selection Criteria

from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List

class EmbeddingBenchmark:
    """Embedding model comparison benchmark"""

    def __init__(self, models: List[str]):
        self.models = {}
        for model_name in models:
            self.models[model_name] = SentenceTransformer(model_name)

    def evaluate_retrieval_quality(
        self,
        queries: List[str],
        relevant_docs: List[List[int]],  # indices of relevant corpus entries per query
        corpus: List[str]
    ) -> dict:
        results = {}
        for name, model in self.models.items():
            corpus_embeddings = model.encode(corpus, normalize_embeddings=True)
            query_embeddings = model.encode(queries, normalize_embeddings=True)

            # Cosine similarity calculation
            scores = np.dot(query_embeddings, corpus_embeddings.T)

            # Recall@k calculation
            recall_at_5 = self._compute_recall(scores, relevant_docs, k=5)
            recall_at_10 = self._compute_recall(scores, relevant_docs, k=10)

            results[name] = {
                "recall@5": recall_at_5,
                "recall@10": recall_at_10,
                "embedding_dim": model.get_sentence_embedding_dimension(),
                "encoding_speed": self._measure_speed(model, queries)
            }
        return results

    def _compute_recall(self, scores, relevant_docs, k):
        recalls = []
        for i, rel_docs in enumerate(relevant_docs):
            top_k_indices = np.argsort(scores[i])[-k:]
            retrieved = set(top_k_indices)
            relevant = set(rel_docs)
            recalls.append(len(retrieved & relevant) / len(relevant))
        return np.mean(recalls)

    def _measure_speed(self, model, texts, n_runs=3):
        import time
        times = []
        for _ in range(n_runs):
            start = time.time()
            model.encode(texts)
            times.append(time.time() - start)
        return np.mean(times)
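
A usage sketch for the benchmark above; the two model IDs are illustrative Hugging Face checkpoints from the comparison table, and relevant_docs holds the indices of the relevant corpus entries for each query:

benchmark = EmbeddingBenchmark([
    "BAAI/bge-m3",
    "intfloat/e5-mistral-7b-instruct"
])
results = benchmark.evaluate_retrieval_quality(
    queries=["How do I tune HNSW parameters?"],
    relevant_docs=[[0, 2]],      # corpus indices relevant to each query
    corpus=corpus                # your evaluation corpus (list of strings)
)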

Vector Database Comparison

Vector database selection directly impacts the operational complexity, cost, and performance of a RAG system. Here is a multi-dimensional comparison of major vector databases.

| Feature | Pinecone | Milvus | Weaviate | Qdrant | Chroma |
| --- | --- | --- | --- | --- | --- |
| Deployment | Fully managed | Self-hosted/Zilliz Cloud | Self-hosted/Cloud | Self-hosted/Cloud | In-memory/Self-hosted |
| Max Vectors | Billions | Billions | Billions | Billions | Millions |
| Search Algorithm | Proprietary | HNSW, IVF, DiskANN | HNSW | HNSW | HNSW |
| Hybrid Search | Yes (Sparse+Dense) | Yes (BM25+Dense) | Yes (BM25+Dense) | Yes (Sparse+Dense) | No |
| Multi-tenancy | Yes (namespaces) | Yes (partitions) | Yes (tenants) | Yes (collections) | Yes (collections) |
| Filtering | Metadata filters | Scalar filtering | GraphQL filters | Payload filters | Metadata filters |
| Price (1M vectors) | ~$70/month | Free (self-hosted) | Free (self-hosted) | Free (self-hosted) | Free |
| Best For | Quick prototyping, managed | Large enterprise | GraphQL ecosystem | High-perf filtering | Dev/small scale |

Production Vector Search with Qdrant

from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance, VectorParams, PointStruct,
    Filter, FieldCondition, MatchValue,
    SearchParams, HnswConfigDiff, OptimizersConfigDiff
)
import uuid

class ProductionVectorStore:
    """Production vector store manager"""

    def __init__(self, url: str, api_key: str, collection_name: str):
        self.client = QdrantClient(url=url, api_key=api_key)
        self.collection_name = collection_name

    def create_collection(self, vector_size: int = 1536):
        """Create collection with optimized HNSW index"""
        self.client.create_collection(
            collection_name=self.collection_name,
            vectors_config=VectorParams(
                size=vector_size,
                distance=Distance.COSINE,
                on_disk=True  # Disk-based storage for large datasets
            ),
            hnsw_config=HnswConfigDiff(
                m=16,              # Graph connectivity (default: 16)
                ef_construct=128,  # Search range during index build
                full_scan_threshold=10000  # Full scan below this count
            ),
            optimizers_config=OptimizersConfigDiff(
                indexing_threshold=20000,  # build the HNSW index after this many vectors
                memmap_threshold=50000     # memory-map segments larger than this
            )
        )

    def upsert_documents(self, documents: list, embeddings: list, metadata: list):
        """Batch upsert"""
        points = [
            PointStruct(
                id=str(uuid.uuid4()),
                vector=embedding,
                payload={**meta, "text": doc}
            )
            for doc, embedding, meta in zip(documents, embeddings, metadata)
        ]

        # Upload in batches of 100
        batch_size = 100
        for i in range(0, len(points), batch_size):
            batch = points[i:i + batch_size]
            self.client.upsert(
                collection_name=self.collection_name,
                points=batch,
                wait=True
            )

    def filtered_search(self, query_vector: list,
                        filters: dict = None, top_k: int = 10):
        """Vector search with optional metadata filtering (dense-only;
        true hybrid search would also combine a sparse/BM25 signal)"""
        search_filter = None
        if filters:
            conditions = [
                FieldCondition(key=k, match=MatchValue(value=v))
                for k, v in filters.items()
            ]
            search_filter = Filter(must=conditions)

        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            query_filter=search_filter,
            limit=top_k,
            search_params=SearchParams(
                hnsw_ef=128,  # Search range (higher = more accurate, slower)
                exact=False   # Use approximate search
            )
        )
        return results

Chunking Strategies

Chunking is the preprocessing stage that has the greatest impact on RAG quality. Poor chunking can separate related information or include unnecessary noise, significantly degrading retrieval quality.

Chunking Strategy Comparison

| Strategy | Pros | Cons | Best For |
| --- | --- | --- | --- |
| Fixed-size | Simple to implement, predictable | Ignores semantic boundaries | Uniform text |
| Recursive | Respects paragraph/sentence boundaries | Requires size tuning | General documents |
| Semantic | Preserves semantic units | High computational cost | Unstructured text |
| Document-structure | Maintains heading/section hierarchy | Requires parser implementation | Markdown, HTML |
| Parent-Child | Search precision + sufficient context | 2x storage space | Technical documentation |

Semantic Chunking Implementation

from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings
import numpy as np

class AdvancedChunker:
    """Advanced chunker supporting multiple chunking strategies"""

    def __init__(self, embedding_model: str = "text-embedding-3-small"):
        self.embeddings = OpenAIEmbeddings(model=embedding_model)

    def semantic_chunk(self, text: str, breakpoint_threshold: float = 0.5):
        """Semantic chunking - split at semantic change points"""
        chunker = SemanticChunker(
            self.embeddings,
            breakpoint_threshold_type="percentile",
            breakpoint_threshold_amount=breakpoint_threshold * 100
        )
        return chunker.split_text(text)

    def parent_child_chunk(self, text: str,
                            parent_size: int = 2000,
                            child_size: int = 400,
                            child_overlap: int = 50):
        """Parent-child chunking - search on children, context from parents"""
        from langchain.text_splitter import RecursiveCharacterTextSplitter

        # Create parent chunks
        parent_splitter = RecursiveCharacterTextSplitter(
            chunk_size=parent_size,
            chunk_overlap=0
        )
        parent_chunks = parent_splitter.split_text(text)

        # Create child chunks from each parent
        child_splitter = RecursiveCharacterTextSplitter(
            chunk_size=child_size,
            chunk_overlap=child_overlap
        )

        result = []
        for i, parent in enumerate(parent_chunks):
            children = child_splitter.split_text(parent)
            for child in children:
                result.append({
                    "child_text": child,
                    "parent_text": parent,
                    "parent_id": i
                })
        return result

    def markdown_structure_chunk(self, markdown_text: str):
        """Markdown document structure-based chunking"""
        from langchain.text_splitter import MarkdownHeaderTextSplitter

        headers_to_split_on = [
            ("#", "h1"),
            ("##", "h2"),
            ("###", "h3"),
        ]

        splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=headers_to_split_on
        )
        chunks = splitter.split_text(markdown_text)

        # Add hierarchy metadata to each chunk
        enriched_chunks = []
        for chunk in chunks:
            enriched_chunks.append({
                "text": chunk.page_content,
                "metadata": chunk.metadata,
                "hierarchy": " > ".join(
                    chunk.metadata.get(h, "")
                    for h in ["h1", "h2", "h3"]
                    if chunk.metadata.get(h)
                )
            })
        return enriched_chunks
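
At query time, parent-child chunking means searching over the small child chunks but handing the larger parent chunks to the LLM. A minimal lookup sketch, assuming the child texts from parent_child_chunk above have been embedded (normalized) into child_embeddings in the same order as chunks:

import numpy as np

def retrieve_parents(query_embedding: np.ndarray,
                     child_embeddings: np.ndarray,
                     chunks: list, top_k: int = 3) -> list:
    """Search over children, return deduplicated parent texts."""
    scores = np.dot(child_embeddings, query_embedding)
    top = np.argsort(scores)[-top_k:][::-1]
    parents, seen = [], set()
    for i in top:
        pid = chunks[i]["parent_id"]
        if pid not in seen:
            seen.add(pid)
            parents.append(chunks[i]["parent_text"])
    return parents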

Retrieval Optimization Strategies

Simple vector similarity search alone cannot achieve production-grade retrieval quality. Various optimization techniques such as hybrid search, HyDE, and query decomposition must be combined.

Hybrid Search (BM25 + Vector)

from rank_bm25 import BM25Okapi
from langchain_openai import OpenAIEmbeddings
import numpy as np
from typing import List, Tuple

class HybridRetriever:
    """Hybrid retriever combining BM25 + vector search"""

    def __init__(self, documents: List[str], embeddings_model: str = "text-embedding-3-large"):
        self.documents = documents
        self.embeddings = OpenAIEmbeddings(model=embeddings_model)

        # Build BM25 index
        tokenized_docs = [doc.lower().split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized_docs)

        # Build vector index
        self.doc_embeddings = np.array(
            self.embeddings.embed_documents(documents)
        )

    def search(self, query: str, top_k: int = 5,
               alpha: float = 0.5) -> List[Tuple[str, float]]:
        """
        Perform hybrid search
        alpha: vector search weight (0=BM25 only, 1=vector only)
        """
        # BM25 scores
        bm25_scores = self.bm25.get_scores(query.lower().split())
        bm25_scores = self._normalize_scores(bm25_scores)

        # Vector similarity scores
        query_embedding = np.array(self.embeddings.embed_query(query))
        vector_scores = np.dot(self.doc_embeddings, query_embedding)
        vector_scores = self._normalize_scores(vector_scores)

        # Weighted score fusion; Reciprocal Rank Fusion is a common alternative (sketched below)
        combined_scores = alpha * vector_scores + (1 - alpha) * bm25_scores

        # Return top-k results
        top_indices = np.argsort(combined_scores)[-top_k:][::-1]
        return [
            (self.documents[i], combined_scores[i])
            for i in top_indices
        ]

    def _normalize_scores(self, scores: np.ndarray) -> np.ndarray:
        """Min-Max normalization"""
        min_s, max_s = scores.min(), scores.max()
        if max_s == min_s:
            return np.zeros_like(scores)
        return (scores - min_s) / (max_s - min_s)
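
The weighted sum above requires score normalization; Reciprocal Rank Fusion (RRF) is a common alternative that fuses ranks instead of scores and therefore needs no calibration between BM25 and cosine scales. A minimal sketch (k=60 is the conventional constant from the original RRF paper):

def reciprocal_rank_fusion(rankings: list, k: int = 60) -> dict:
    """Fuse ranked lists of doc IDs into a single score per doc.

    rankings: lists ordered best-first, e.g.
              [bm25_ranked_ids, vector_ranked_ids]
    """
    scores = {}
    for ranked_ids in rankings:
        for rank, doc_id in enumerate(ranked_ids):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
    return dict(sorted(scores.items(), key=lambda x: x[1], reverse=True))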

HyDE (Hypothetical Document Embedding)

HyDE generates a hypothetical answer to the query first, then uses that answer's embedding for retrieval. This bridges the semantic gap between questions and answers, improving retrieval quality.

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate

class HyDERetriever:
    """HyDE-based retriever"""

    def __init__(self, vectorstore):
        self.vectorstore = vectorstore
        self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
        self.embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

        self.hyde_prompt = ChatPromptTemplate.from_template(
            "Write a hypothetical answer to the following question. "
            "It is more important to include relevant keywords and concepts "
            "than to be factually accurate.\n\n"
            "Question: {question}\n\n"
            "Hypothetical answer:"
        )

    def retrieve(self, query: str, top_k: int = 5):
        """Perform HyDE retrieval"""
        # 1. Generate hypothetical answer
        chain = self.hyde_prompt | self.llm
        hypothetical_answer = chain.invoke({"question": query}).content

        # 2. Create embedding from hypothetical answer
        hyde_embedding = self.embeddings.embed_query(hypothetical_answer)

        # 3. Search using hypothetical answer embedding
        results = self.vectorstore.similarity_search_by_vector(
            hyde_embedding, k=top_k
        )
        return results

Query Decomposition

This strategy breaks complex questions into sub-questions, retrieves results for each, then aggregates them.

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import json

class QueryDecomposer:
    """Decompose complex queries into sub-queries"""

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
        self.decompose_prompt = ChatPromptTemplate.from_template(
            "Decompose the following question into 2-4 sub-questions "
            "optimized for retrieval.\n"
            "Return as a JSON array.\n\n"
            "Original question: {question}\n\n"
            "Sub-questions:"
        )

    def decompose(self, question: str) -> list:
        """Decompose query"""
        chain = self.decompose_prompt | self.llm
        result = chain.invoke({"question": question}).content

        try:
            sub_queries = json.loads(result)
        except json.JSONDecodeError:
            sub_queries = [question]

        return sub_queries

    def retrieve_and_merge(self, question: str, retriever, top_k: int = 3):
        """Retrieve with decomposed queries and merge results"""
        sub_queries = self.decompose(question)

        all_docs = []
        seen_contents = set()

        for sub_query in sub_queries:
            docs = retriever.invoke(sub_query)[:top_k]
            for doc in docs:
                if doc.page_content not in seen_contents:
                    seen_contents.add(doc.page_content)
                    all_docs.append(doc)

        return all_docs

Reranking

Reranking re-evaluates initial search results with a more precise model and reorders them. Cross-encoder-based reranking provides more accurate relevance judgments than bi-encoder-based vector search.

Using Cohere Rerank

import cohere
from typing import List, Dict

class CohereReranker:
    """Reranking using Cohere Rerank API"""

    def __init__(self, api_key: str, model: str = "rerank-v3.5"):
        self.client = cohere.Client(api_key)
        self.model = model

    def rerank(self, query: str, documents: List[str],
               top_n: int = 5) -> List[Dict]:
        """Rerank documents"""
        response = self.client.rerank(
            model=self.model,
            query=query,
            documents=documents,
            top_n=top_n,
            return_documents=True
        )

        results = []
        for hit in response.results:
            results.append({
                "text": hit.document.text,
                "relevance_score": hit.relevance_score,
                "index": hit.index
            })
        return results


class CrossEncoderReranker:
    """Local cross-encoder model-based reranking"""

    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"):
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)

    def rerank(self, query: str, documents: List[str],
               top_n: int = 5) -> List[Dict]:
        """Rerank with cross-encoder"""
        pairs = [[query, doc] for doc in documents]
        scores = self.model.predict(pairs)

        # Sort by score
        scored_docs = list(zip(documents, scores, range(len(documents))))
        scored_docs.sort(key=lambda x: x[1], reverse=True)

        results = []
        for text, score, idx in scored_docs[:top_n]:
            results.append({
                "text": text,
                "relevance_score": float(score),
                "index": idx
            })
        return results

Integrated Reranking Pipeline

class RAGPipelineWithReranking:
    """RAG pipeline with integrated reranking"""

    def __init__(self, retriever, reranker, llm):
        self.retriever = retriever
        self.reranker = reranker
        self.llm = llm

    def query(self, question: str, top_k_retrieve: int = 20,
              top_k_rerank: int = 5) -> str:
        """Retrieve -> Rerank -> Generate"""
        # Stage 1: Initial retrieval (broad)
        initial_docs = self.retriever.invoke(question)[:top_k_retrieve]
        doc_texts = [doc.page_content for doc in initial_docs]

        # Stage 2: Reranking (precise)
        reranked = self.reranker.rerank(
            query=question,
            documents=doc_texts,
            top_n=top_k_rerank
        )

        # Stage 3: Context assembly
        context = "\n\n---\n\n".join(
            r["text"] for r in reranked
        )

        # Stage 4: LLM generation
        prompt = f"""Answer the question based on the following context.
Cite the source of each piece of information.

Context:
{context}

Question: {question}
Answer:"""

        response = self.llm.invoke(prompt)
        return response.content
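
Wiring the pieces together might look like the following; all class names come from the sections above, and the Cohere API key is assumed to be provided via the environment:

import os
from langchain_openai import ChatOpenAI

pipeline = RAGPipelineWithReranking(
    retriever=vectorstore.as_retriever(search_kwargs={"k": 20}),
    reranker=CohereReranker(api_key=os.environ["COHERE_API_KEY"]),
    llm=ChatOpenAI(model="gpt-4o", temperature=0)
)
answer = pipeline.query("How does HNSW indexing work?")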

RAG Evaluation with RAGAS

RAGAS (Retrieval Augmented Generation Assessment) is a framework for automatically evaluating RAG pipelines. It measures retrieval quality and generation quality with separate metrics.

Core RAGAS Metrics

| Metric | Measures | Description | Target |
| --- | --- | --- | --- |
| Context Precision | Retrieval | Proportion of retrieved contexts that are relevant | 0.8+ |
| Context Recall | Retrieval | Proportion of needed information included in context | 0.9+ |
| Faithfulness | Generation | Degree to which the answer is grounded in context | 0.9+ |
| Answer Relevancy | Generation | How well the answer addresses the question | 0.8+ |
| Answer Correctness | Overall | Accuracy compared to ground truth | 0.7+ |

RAGAS Evaluation Implementation

from ragas import evaluate
from ragas.metrics import (
    context_precision,
    context_recall,
    faithfulness,
    answer_relevancy,
    answer_correctness
)
from datasets import Dataset

class RAGEvaluator:
    """RAGAS-based RAG evaluator"""

    def __init__(self):
        self.metrics = [
            context_precision,
            context_recall,
            faithfulness,
            answer_relevancy,
            answer_correctness
        ]

    def create_eval_dataset(self, eval_samples: list) -> Dataset:
        """Create evaluation dataset"""
        data = {
            "question": [],
            "answer": [],
            "contexts": [],
            "ground_truth": []
        }

        for sample in eval_samples:
            data["question"].append(sample["question"])
            data["answer"].append(sample["generated_answer"])
            data["contexts"].append(sample["retrieved_contexts"])
            data["ground_truth"].append(sample["expected_answer"])

        return Dataset.from_dict(data)

    def evaluate(self, eval_samples: list) -> dict:
        """Perform RAG pipeline evaluation"""
        dataset = self.create_eval_dataset(eval_samples)

        result = evaluate(
            dataset=dataset,
            metrics=self.metrics
        )

        scores = {
            "context_precision": result["context_precision"],
            "context_recall": result["context_recall"],
            "faithfulness": result["faithfulness"],
            "answer_relevancy": result["answer_relevancy"],
            "answer_correctness": result["answer_correctness"]
        }
        scores["overall_score"] = sum(scores.values()) / len(scores)
        return scores

    def generate_report(self, results: dict) -> str:
        """Generate evaluation report"""
        report = "=== RAG Pipeline Evaluation Report ===\n\n"

        for metric, score in results.items():
            if metric == "overall_score":
                continue
            status = "PASS" if score >= 0.7 else "WARN" if score >= 0.5 else "FAIL"
            report += f"  {metric}: {score:.4f} [{status}]\n"

        report += f"\n  Overall: {results.get('overall_score', 0):.4f}\n"
        return report

Production Deployment Patterns

Async Indexing Pipeline

In production, document indexing and search serving must be separated. When new documents are added, embeddings are generated asynchronously and the index is updated.

import asyncio
from datetime import datetime
from typing import Optional
import hashlib

class AsyncIndexingPipeline:
    """Async document indexing pipeline"""

    def __init__(self, embeddings, vector_store, chunker):
        self.embeddings = embeddings
        self.vector_store = vector_store
        self.chunker = chunker
        self.index_queue = asyncio.Queue()
        self._processed_hashes = set()

    async def enqueue_document(self, doc_id: str, content: str,
                                metadata: Optional[dict] = None):
        """Add document to indexing queue"""
        doc_hash = hashlib.md5(content.encode()).hexdigest()

        if doc_hash in self._processed_hashes:
            return {"status": "skipped", "reason": "duplicate"}

        await self.index_queue.put({
            "doc_id": doc_id,
            "content": content,
            "metadata": metadata or {},
            "hash": doc_hash,
            "enqueued_at": datetime.utcnow().isoformat()
        })
        return {"status": "enqueued", "queue_size": self.index_queue.qsize()}

    async def process_queue(self, batch_size: int = 10):
        """Process documents from queue in batches"""
        while True:
            batch = []
            try:
                for _ in range(batch_size):
                    item = self.index_queue.get_nowait()
                    batch.append(item)
            except asyncio.QueueEmpty:
                pass

            if batch:
                await self._process_batch(batch)
            else:
                await asyncio.sleep(1)

    async def _process_batch(self, batch: list):
        """Batch processing: chunk -> embed -> upsert"""
        all_chunks = []
        all_metadata = []

        for item in batch:
            chunks = self.chunker.semantic_chunk(item["content"])
            for chunk in chunks:
                all_chunks.append(chunk)
                all_metadata.append({
                    "doc_id": item["doc_id"],
                    "hash": item["hash"],
                    **item["metadata"]
                })

        # Batch embedding generation
        chunk_embeddings = self.embeddings.embed_documents(all_chunks)

        # Upsert to vector store
        self.vector_store.upsert_documents(
            documents=all_chunks,
            embeddings=chunk_embeddings,
            metadata=all_metadata
        )

        for item in batch:
            self._processed_hashes.add(item["hash"])
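
Wiring this up means running process_queue as a long-lived background task next to whatever API enqueues documents. A minimal sketch, assuming the embeddings, vector_store, and chunker objects from the earlier sections:

async def main():
    pipeline = AsyncIndexingPipeline(embeddings, vector_store, chunker)

    # Start the worker; it loops forever, draining the queue in batches
    worker = asyncio.create_task(pipeline.process_queue(batch_size=10))

    await pipeline.enqueue_document(
        doc_id="doc-001",
        content="...",  # full document text
        metadata={"source": "wiki"}
    )

    await asyncio.sleep(5)  # give the worker time to drain (demo only)
    worker.cancel()

asyncio.run(main())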

Search Result Caching

Apply caching for frequently repeated queries to improve response time and reduce API costs.

import hashlib
import json
from datetime import datetime, timedelta
from typing import Optional
import redis

class RAGCache:
    """RAG search result cache"""

    def __init__(self, redis_url: str = "redis://localhost:6379",
                 ttl_seconds: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl_seconds

    def _cache_key(self, query: str, filters: Optional[dict] = None) -> str:
        """Generate cache key from query and filters"""
        key_data = {"query": query.lower().strip(), "filters": filters or {}}
        key_hash = hashlib.sha256(
            json.dumps(key_data, sort_keys=True).encode()
        ).hexdigest()
        return f"rag:cache:{key_hash}"

    def get(self, query: str, filters: Optional[dict] = None):
        """Retrieve search results from cache"""
        key = self._cache_key(query, filters)
        cached = self.redis.get(key)

        if cached:
            data = json.loads(cached)
            data["cache_hit"] = True
            return data
        return None

    def set(self, query: str, results: list,
            filters: Optional[dict] = None):
        """Store search results in cache"""
        key = self._cache_key(query, filters)
        data = {
            "results": results,
            "cached_at": datetime.utcnow().isoformat(),
            "query": query
        }
        self.redis.setex(key, self.ttl, json.dumps(data))

    def invalidate_all(self):
        """Flush all cached search results (e.g., after re-indexing).
        Cache keys are content hashes, so pattern matching against
        the original query text is not possible."""
        keys = list(self.redis.scan_iter("rag:cache:*"))
        if keys:
            self.redis.delete(*keys)
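
Typical usage wraps the retriever in a cache-aside pattern: check the cache first, fall back to live retrieval on a miss, then populate. A brief sketch (retriever is any object with an invoke method, as used throughout this article):

cache = RAGCache(ttl_seconds=3600)

def cached_retrieve(query: str, retriever, filters: dict = None) -> list:
    """Cache-aside retrieval: hit -> return, miss -> search and store."""
    hit = cache.get(query, filters)
    if hit:
        return hit["results"]

    docs = retriever.invoke(query)
    results = [doc.page_content for doc in docs]
    cache.set(query, results, filters)
    return results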

Failure Cases and Recovery

Retrieval Quality Degradation Patterns

| Failure Type | Symptoms | Cause | Mitigation |
| --- | --- | --- | --- |
| Embedding Drift | Gradual accuracy degradation over time | Embedding model version change, data distribution shift | Periodic re-indexing, model version pinning |
| Chunk Mismatch | Relevant information not retrieved | Inappropriate chunk size/strategy | Chunk size experimentation, overlap adjustment |
| Filter Overfit | Increasing zero-result queries | Overly strict metadata filters | Filter fallback strategy, filter relaxation |
| Context Pollution | LLM answers with irrelevant information | Missing reranking, excessive top-k | Introduce reranking, reduce top-k |
| Latency Spike | Response time exceeds 2 seconds | Non-optimized index, network latency | Index tuning, caching, async processing |

Embedding Drift Monitoring

import numpy as np
from datetime import datetime
from typing import List

class EmbeddingDriftMonitor:
    """Embedding drift detection and monitoring"""

    def __init__(self, reference_embeddings: np.ndarray):
        self.reference_mean = np.mean(reference_embeddings, axis=0)
        self.reference_std = np.std(reference_embeddings, axis=0)
        self.drift_history = []

    def check_drift(self, new_embeddings: np.ndarray,
                     threshold: float = 0.1) -> dict:
        """Check drift of new embeddings"""
        new_mean = np.mean(new_embeddings, axis=0)

        # Measure drift via cosine distance
        cosine_sim = np.dot(self.reference_mean, new_mean) / (
            np.linalg.norm(self.reference_mean) * np.linalg.norm(new_mean)
        )
        drift_score = 1 - cosine_sim

        # Distribution comparison (KL Divergence approximation)
        distribution_shift = np.mean(
            np.abs(np.mean(new_embeddings, axis=0) - self.reference_mean)
            / (self.reference_std + 1e-8)
        )

        result = {
            "drift_score": float(drift_score),
            "distribution_shift": float(distribution_shift),
            "is_drifted": drift_score > threshold,
            "timestamp": datetime.utcnow().isoformat(),
            "recommendation": (
                "Re-indexing required" if drift_score > threshold
                else "Within normal range"
            )
        }

        self.drift_history.append(result)
        return result
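
In practice, the reference set is a sample of embeddings captured at index-build time, and fresh batches are checked as they are indexed. A brief usage sketch; ref_sample, new_batch, and trigger_reindex are hypothetical placeholders for your own data and ops tooling:

import numpy as np

# ref_sample: embeddings sampled when the index was built (hypothetical)
monitor = EmbeddingDriftMonitor(reference_embeddings=np.array(ref_sample))

# new_batch: embeddings produced for this week's documents (hypothetical)
report = monitor.check_drift(np.array(new_batch), threshold=0.1)
if report["is_drifted"]:
    trigger_reindex()  # hypothetical hook into your re-indexing pipeline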

Fallback Strategy for Quality Degradation

class RAGWithFallback:
    """RAG pipeline with fallback strategies"""

    def __init__(self, primary_retriever, fallback_retriever,
                 reranker, llm):
        self.primary = primary_retriever
        self.fallback = fallback_retriever
        self.reranker = reranker
        self.llm = llm

    def query(self, question: str) -> dict:
        """Query processing with fallback"""
        # Primary retrieval
        primary_docs = self.primary.invoke(question)

        if not primary_docs:
            # Fallback 1: Relaxed filters
            primary_docs = self.fallback.invoke(question)

        if not primary_docs:
            return {
                "answer": "No relevant information found.",
                "confidence": 0.0,
                "source": "no_results"
            }

        # Reranking
        reranked = self.reranker.rerank(
            query=question,
            documents=[d.page_content for d in primary_docs],
            top_n=5
        )

        # Minimum relevance validation
        if reranked[0]["relevance_score"] < 0.3:
            return {
                "answer": "Unable to generate a high-confidence answer.",
                "confidence": reranked[0]["relevance_score"],
                "source": "low_confidence"
            }

        # LLM generation
        context = "\n\n".join(r["text"] for r in reranked)
        answer = self.llm.invoke(
            f"Context:\n{context}\n\nQuestion: {question}\nAnswer:"
        ).content

        return {
            "answer": answer,
            "confidence": reranked[0]["relevance_score"],
            "source": "primary",
            "num_sources": len(reranked)
        }

Production Checklist

Design Phase

  • Benchmark embedding models for your specific use case
  • Select vector DB based on scale, budget, and operational capability
  • Plan A/B testing for chunking strategies
  • Build evaluation dataset (minimum 100 QA pairs)

Development Phase

  • Implement hybrid search (BM25 + vector)
  • Integrate reranking pipeline
  • Build async indexing pipeline
  • Implement caching layer (Redis/Memcached)
  • Implement error handling and fallback strategies

Deployment Phase

  • Set quality gates based on RAGAS metrics (see the sketch after this list)
  • Build embedding drift monitoring dashboard
  • Set search latency alerts (P95 target: 500ms)
  • Build automated index rebuild pipeline
  • Perform load testing (2x target QPS)
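
A quality gate can be as simple as running the RAGEvaluator from earlier against the evaluation dataset and blocking the deployment when any metric falls below its target. A minimal sketch; the thresholds mirror the RAGAS metrics table above:

THRESHOLDS = {
    "context_precision": 0.8,
    "context_recall": 0.9,
    "faithfulness": 0.9,
    "answer_relevancy": 0.8,
    "answer_correctness": 0.7
}

def quality_gate(eval_samples: list) -> bool:
    """Return True only if every RAGAS metric meets its target."""
    results = RAGEvaluator().evaluate(eval_samples)
    failures = {m: results[m] for m, t in THRESHOLDS.items() if results[m] < t}
    if failures:
        print(f"Quality gate FAILED: {failures}")
        return False
    return True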

Operations Phase

  • Generate weekly retrieval quality reports
  • Monthly embedding model update review
  • Update evaluation dataset based on user feedback
  • Monitor costs (embedding API calls, vector DB storage)
  • Periodic index rebuilds (at least quarterly)
