RAG Pipeline Production Guide: From Vector DB Selection to Chunking, Reranking, and Evaluation


Introduction

RAG (Retrieval-Augmented Generation) is a core architecture pattern for mitigating LLM hallucination and generating answers grounded in up-to-date information. It may look as simple as "search and paste into the prompt," but production environments demand many engineering decisions: embedding model selection, vector DB operations, chunking strategy, hybrid search, reranking, and an evaluation framework.

This article walks through the full RAG pipeline architecture, covering selection criteria and implementation for each component with working code. It also examines failure patterns that commonly occur in production, strategies for handling them, and a systematic evaluation methodology using RAGAS.

RAG Architecture Overview

A RAG pipeline consists of three main stages: Retrieval, Augmentation, and Generation. Each stage can be optimized independently, and overall pipeline quality is determined by the weakest stage.

End-to-End Architecture Flow

[User Query]
    |
    v
[Query Processing] <- Query decomposition, HyDE, query expansion
    |
    v
[Retrieval] <- BM25 + vector search (hybrid)
    |
    v
[Reranking] <- Cross-encoder, Cohere Rerank
    |
    v
[Context Assembly] <- Chunk merging, deduplication, token limit
    |
    v
[Generation] <- Pass context and query to the LLM
    |
    v
[Post-processing] <- Source attribution, confidence scores, hallucination check
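The Context Assembly step above has to fit merged, deduplicated chunks into the model's context window. A minimal sketch of a greedy token-budget assembler follows; the 4-characters-per-token estimate is a rough heuristic, and a production system would count tokens with the model's actual tokenizer (e.g. tiktoken). The function name `assemble_context` is illustrative.

```python
from typing import List

def assemble_context(chunks: List[str], max_tokens: int = 3000) -> str:
    """Greedily pack deduplicated chunks until the token budget is spent."""
    seen = set()
    selected = []
    budget = max_tokens
    for chunk in chunks:  # chunks assumed pre-sorted by relevance
        if chunk in seen:
            continue  # drop exact duplicates
        est_tokens = max(1, len(chunk) // 4)  # rough: ~4 chars per token
        if est_tokens > budget:
            break
        seen.add(chunk)
        selected.append(chunk)
        budget -= est_tokens
    return "\n\n---\n\n".join(selected)

ctx = assemble_context(["chunk A" * 10, "chunk A" * 10, "chunk B" * 10],
                       max_tokens=50)
```

The duplicate of the first chunk is skipped, so only two chunks end up in the assembled context.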

Basic RAG Implementation

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

# 1. Document chunking (assumes `documents` holds pre-loaded LangChain Document objects)
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", ".", " "]
)
chunks = text_splitter.split_documents(documents)

# 2. Embedding and vector store creation
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    collection_name="production_docs"
)

# 3. Retriever configuration
retriever = vectorstore.as_retriever(
    search_type="mmr",
    search_kwargs={"k": 5, "fetch_k": 20}
)

# 4. RAG chain assembly
template = """Answer the question based on the following context.
If the answer cannot be found in the context, respond with "Insufficient information."

Context:
{context}

Question: {question}
Answer:"""

prompt = ChatPromptTemplate.from_template(template)
llm = ChatOpenAI(model="gpt-4o", temperature=0)

rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

response = rag_chain.invoke("What are the key components of a RAG pipeline?")

Embedding Model Comparison

The embedding model is one of the most important factors determining a RAG pipeline's retrieval quality. The table below compares the major embedding models as of 2025-2026.

Model | Dimensions | Max Tokens | MTEB Avg | Multilingual | Cost | Features
OpenAI text-embedding-3-large | 3072 | 8191 | 64.6 | Yes | $0.13/1M tokens | Supports dimension reduction
OpenAI text-embedding-3-small | 1536 | 8191 | 62.3 | Yes | $0.02/1M tokens | Cost-effective
Cohere embed-v3 | 1024 | 512 | 64.5 | Yes (100+) | $0.10/1M tokens | Search-optimized
BGE-M3 | 1024 | 8192 | 66.1 | Yes (100+) | Free (self-hosted) | Dense+Sparse+ColBERT
E5-mistral-7b-instruct | 4096 | 32768 | 66.6 | Yes | Free (self-hosted) | Instruction-based embeddings
Jina-embeddings-v3 | 1024 | 8192 | 65.5 | Yes (89) | $0.02/1M tokens | Task-specific LoRA
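The dimension-reduction support noted for text-embedding-3-large refers to Matryoshka-style embeddings: the API's `dimensions` parameter returns the leading components re-normalized to unit length, trading a little accuracy for a smaller index. A sketch of the same operation done client-side (the helper name `truncate_embedding` is illustrative):

```python
import numpy as np

def truncate_embedding(vec: np.ndarray, dim: int) -> np.ndarray:
    """Keep the leading `dim` components and re-normalize to unit length."""
    truncated = vec[:dim]
    return truncated / np.linalg.norm(truncated)

# toy 8-dim "embedding": most of the signal lives in the leading components
full = np.array([0.5, 0.5, 0.5, 0.5, 0.01, 0.01, 0.01, 0.01])
full = full / np.linalg.norm(full)
reduced = truncate_embedding(full, 4)
```

Because Matryoshka-trained models front-load information into the first components, the truncated vector preserves most of the retrieval signal at a quarter of the storage cost.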

Embedding Model Selection Criteria

from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List

class EmbeddingBenchmark:
    """Benchmark for comparing embedding models"""

    def __init__(self, models: List[str]):
        self.models = {}
        for model_name in models:
            self.models[model_name] = SentenceTransformer(model_name)

    def evaluate_retrieval_quality(
        self,
        queries: List[str],
        relevant_docs: List[List[int]],  # per-query indices of relevant corpus docs
        corpus: List[str]
    ) -> dict:
        results = {}
        for name, model in self.models.items():
            corpus_embeddings = model.encode(corpus, normalize_embeddings=True)
            query_embeddings = model.encode(queries, normalize_embeddings=True)

            # cosine similarity (embeddings are L2-normalized)
            scores = np.dot(query_embeddings, corpus_embeddings.T)

            # Recall@k
            recall_at_5 = self._compute_recall(scores, relevant_docs, k=5)
            recall_at_10 = self._compute_recall(scores, relevant_docs, k=10)

            results[name] = {
                "recall@5": recall_at_5,
                "recall@10": recall_at_10,
                "embedding_dim": model.get_sentence_embedding_dimension(),
                "encoding_speed": self._measure_speed(model, queries)
            }
        return results

    def _compute_recall(self, scores, relevant_docs, k):
        recalls = []
        for i, rel_docs in enumerate(relevant_docs):
            top_k_indices = np.argsort(scores[i])[-k:]
            retrieved = set(top_k_indices)
            relevant = set(rel_docs)
            recalls.append(len(retrieved & relevant) / len(relevant))
        return np.mean(recalls)

    def _measure_speed(self, model, texts, n_runs=3):
        import time
        times = []
        for _ in range(n_runs):
            start = time.time()
            model.encode(texts)
            times.append(time.time() - start)
        return np.mean(times)

Vector Database Comparison

The choice of vector database directly affects a RAG system's operational complexity, cost, and performance. The table below compares the major vector DBs across several dimensions.

Item | Pinecone | Milvus | Weaviate | Qdrant | Chroma
Deployment | Fully managed | Self-hosted / Zilliz Cloud | Self-hosted / Cloud | Self-hosted / Cloud | In-memory / self-hosted
Max vectors | Billions | Billions | Billions | Billions | Millions
Search algorithm | Proprietary engine | HNSW, IVF, DiskANN | HNSW | HNSW | HNSW
Hybrid search | Yes (sparse+dense) | Yes (BM25+dense) | Yes (BM25+dense) | Yes (sparse+dense) | No
Multi-tenancy | Yes (namespaces) | Yes (partitions) | Yes (tenants) | Yes (collections) | Yes (collections)
Filtering | Metadata filters | Scalar filtering | GraphQL filters | Payload filters | Metadata filters
Price (1M vectors) | ~$70/month | Free (self-hosted) | Free (self-hosted) | Free (self-hosted) | Free
Best for | Fast prototyping, managed preference | Large-scale enterprise | GraphQL ecosystem | High-performance filtering | Development / small scale

Production Vector Search with Qdrant

from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance, VectorParams, PointStruct,
    Filter, FieldCondition, MatchValue,
    SearchParams, HnswConfigDiff, OptimizersConfigDiff
)
import uuid

class ProductionVectorStore:
    """Production vector store manager"""

    def __init__(self, url: str, api_key: str, collection_name: str):
        self.client = QdrantClient(url=url, api_key=api_key)
        self.collection_name = collection_name

    def create_collection(self, vector_size: int = 1536):
        """Create a collection with a tuned HNSW index"""
        self.client.create_collection(
            collection_name=self.collection_name,
            vectors_config=VectorParams(
                size=vector_size,
                distance=Distance.COSINE,
                on_disk=True  # disk-backed storage for large datasets
            ),
            hnsw_config=HnswConfigDiff(
                m=16,              # graph connectivity (default: 16)
                ef_construct=128,  # search breadth during index build
                full_scan_threshold=10000  # below this size, do a full scan
            ),
            optimizers_config=OptimizersConfigDiff(
                indexing_threshold=20000,
                memmap_threshold=50000
            )
        )

    def upsert_documents(self, documents: list, embeddings: list, metadata: list):
        """Batch upsert"""
        points = [
            PointStruct(
                id=str(uuid.uuid4()),
                vector=embedding,
                payload={**meta, "text": doc}
            )
            for doc, embedding, meta in zip(documents, embeddings, metadata)
        ]

        # upload in batches of 100
        batch_size = 100
        for i in range(0, len(points), batch_size):
            batch = points[i:i + batch_size]
            self.client.upsert(
                collection_name=self.collection_name,
                points=batch,
                wait=True
            )

    def hybrid_search(self, query_vector: list, query_text: str,
                      filters: dict = None, top_k: int = 10):
        """Hybrid search (vector + metadata filter)"""
        search_filter = None
        if filters:
            conditions = [
                FieldCondition(key=k, match=MatchValue(value=v))
                for k, v in filters.items()
            ]
            search_filter = Filter(must=conditions)

        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            query_filter=search_filter,
            limit=top_k,
            search_params=SearchParams(
                hnsw_ef=128,  # search-time beam width (higher = more accurate, slower)
                exact=False   # use approximate search
            )
        )
        return results

Chunking Strategies

Chunking is the preprocessing step with the largest impact on RAG quality. Poor chunking separates related information across chunks or pulls in noise, sharply degrading retrieval quality.

Chunking Strategy Comparison

Strategy | Pros | Cons | Best for
Fixed-size | Simple, predictable | Ignores semantic units | Uniform text
Recursive | Respects paragraph/sentence boundaries | Needs size tuning | General documents
Semantic | Preserves semantic units | Computationally expensive | Unstructured text
Document-structure-based | Preserves heading/section hierarchy | Requires a parser | Markdown, HTML
Parent-child | Retrieval precision + sufficient context | 2x storage | Technical docs

Semantic Chunking Implementation

from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings
import numpy as np

class AdvancedChunker:
    """Advanced chunker supporting multiple chunking strategies"""

    def __init__(self, embedding_model: str = "text-embedding-3-small"):
        self.embeddings = OpenAIEmbeddings(model=embedding_model)

    def semantic_chunk(self, text: str, breakpoint_threshold: float = 0.5):
        """Semantic chunking - split at points where the meaning shifts"""
        chunker = SemanticChunker(
            self.embeddings,
            breakpoint_threshold_type="percentile",
            breakpoint_threshold_amount=breakpoint_threshold * 100
        )
        return chunker.split_text(text)

    def parent_child_chunk(self, text: str,
                            parent_size: int = 2000,
                            child_size: int = 400,
                            child_overlap: int = 50):
        """Parent-child chunking - retrieve on children, provide parents as context"""
        from langchain.text_splitter import RecursiveCharacterTextSplitter

        # create parent chunks
        parent_splitter = RecursiveCharacterTextSplitter(
            chunk_size=parent_size,
            chunk_overlap=0
        )
        parent_chunks = parent_splitter.split_text(text)

        # create child chunks from each parent
        child_splitter = RecursiveCharacterTextSplitter(
            chunk_size=child_size,
            chunk_overlap=child_overlap
        )

        result = []
        for i, parent in enumerate(parent_chunks):
            children = child_splitter.split_text(parent)
            for child in children:
                result.append({
                    "child_text": child,
                    "parent_text": parent,
                    "parent_id": i
                })
        return result

    def markdown_structure_chunk(self, markdown_text: str):
        """Structure-based chunking for Markdown documents"""
        from langchain.text_splitter import MarkdownHeaderTextSplitter

        headers_to_split_on = [
            ("#", "h1"),
            ("##", "h2"),
            ("###", "h3"),
        ]

        splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=headers_to_split_on
        )
        chunks = splitter.split_text(markdown_text)

        # attach hierarchy metadata to each chunk
        enriched_chunks = []
        for chunk in chunks:
            enriched_chunks.append({
                "text": chunk.page_content,
                "metadata": chunk.metadata,
                "hierarchy": " > ".join(
                    chunk.metadata.get(h, "")
                    for h in ["h1", "h2", "h3"]
                    if chunk.metadata.get(h)
                )
            })
        return enriched_chunks

Retrieval Optimization Strategies

Plain vector-similarity search alone rarely reaches production-grade retrieval quality. Techniques such as hybrid search, HyDE, and query decomposition need to be combined.

Hybrid Search (BM25 + Vector)

from rank_bm25 import BM25Okapi
from langchain_openai import OpenAIEmbeddings
import numpy as np
from typing import List, Tuple

class HybridRetriever:
    """Hybrid retriever combining BM25 and vector search"""

    def __init__(self, documents: List[str], embeddings_model: str = "text-embedding-3-large"):
        self.documents = documents
        self.embeddings = OpenAIEmbeddings(model=embeddings_model)

        # BM25 인덱스 구축
        tokenized_docs = [doc.lower().split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized_docs)

        # 벡터 인덱스 구축
        self.doc_embeddings = np.array(
            self.embeddings.embed_documents(documents)
        )

    def search(self, query: str, top_k: int = 5,
               alpha: float = 0.5) -> List[Tuple[str, float]]:
        """
        Run the hybrid search.
        alpha: vector-search weight (0 = BM25 only, 1 = vector only)
        """
        # BM25 scores
        bm25_scores = self.bm25.get_scores(query.lower().split())
        bm25_scores = self._normalize_scores(bm25_scores)

        # vector similarity scores
        query_embedding = np.array(self.embeddings.embed_query(query))
        vector_scores = np.dot(self.doc_embeddings, query_embedding)
        vector_scores = self._normalize_scores(vector_scores)

        # weighted combination (a simpler alternative to Reciprocal Rank Fusion)
        combined_scores = alpha * vector_scores + (1 - alpha) * bm25_scores

        # return top-k
        top_indices = np.argsort(combined_scores)[-top_k:][::-1]
        return [
            (self.documents[i], combined_scores[i])
            for i in top_indices
        ]

    def _normalize_scores(self, scores: np.ndarray) -> np.ndarray:
        """Min-max normalization"""
        min_s, max_s = scores.min(), scores.max()
        if max_s == min_s:
            return np.zeros_like(scores)
        return (scores - min_s) / (max_s - min_s)
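Reciprocal Rank Fusion, mentioned above as an alternative to the weighted sum, combines rankings instead of raw scores and therefore needs no score normalization. A minimal sketch (the constant k=60 is the value commonly used in the RRF literature):

```python
from collections import defaultdict
from typing import List

def reciprocal_rank_fusion(rankings: List[List[str]], k: int = 60) -> List[str]:
    """Fuse ranked lists: each doc accumulates 1 / (k + rank) per list."""
    scores = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # highest fused score first
    return sorted(scores, key=scores.get, reverse=True)

# doc "b" ranks highly in both BM25 and vector lists, so it wins overall
bm25_ranking = ["a", "b", "c"]
vector_ranking = ["b", "d", "a"]
fused = reciprocal_rank_fusion([bm25_ranking, vector_ranking])
# fused == ["b", "a", "d", "c"]
```

Because RRF only looks at ranks, it is robust to the very different score scales BM25 and cosine similarity produce.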

HyDE (Hypothetical Document Embedding)

HyDE first generates a hypothetical answer to the query, then uses that answer's embedding for retrieval. By narrowing the semantic gap between questions and answers, it improves retrieval quality.

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate

class HyDERetriever:
    """HyDE-based retriever"""

    def __init__(self, vectorstore):
        self.vectorstore = vectorstore
        self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
        self.embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

        self.hyde_prompt = ChatPromptTemplate.from_template(
            "Write a hypothetical answer to the following question. "
            "Including relevant keywords and concepts matters more than factual accuracy.\n\n"
            "Question: {question}\n\n"
            "Hypothetical answer:"
        )

    def retrieve(self, query: str, top_k: int = 5):
        """Run HyDE retrieval"""
        # 1. generate a hypothetical answer
        chain = self.hyde_prompt | self.llm
        hypothetical_answer = chain.invoke({"question": query}).content

        # 2. embed the hypothetical answer
        hyde_embedding = self.embeddings.embed_query(hypothetical_answer)

        # 3. search with the hypothetical answer's embedding
        results = self.vectorstore.similarity_search_by_vector(
            hyde_embedding, k=top_k
        )
        return results

Query Decomposition

This strategy decomposes a complex question into sub-questions, retrieves results for each, and merges them.

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import json

class QueryDecomposer:
    """Decompose complex queries into sub-queries"""

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
        self.decompose_prompt = ChatPromptTemplate.from_template(
            "Decompose the following question into 2-4 sub-questions optimized for retrieval.\n"
            "Return them as a JSON array.\n\n"
            "Original question: {question}\n\n"
            "Sub-questions:"
        )

    def decompose(self, question: str) -> list:
        """Decompose the query"""
        chain = self.decompose_prompt | self.llm
        result = chain.invoke({"question": question}).content

        try:
            sub_queries = json.loads(result)
        except json.JSONDecodeError:
            sub_queries = [question]

        return sub_queries

    def retrieve_and_merge(self, question: str, retriever, top_k: int = 3):
        """Retrieve with the decomposed queries, then merge results"""
        sub_queries = self.decompose(question)

        all_docs = []
        seen_contents = set()

        for sub_query in sub_queries:
            docs = retriever.invoke(sub_query)[:top_k]
            for doc in docs:
                if doc.page_content not in seen_contents:
                    seen_contents.add(doc.page_content)
                    all_docs.append(doc)

        return all_docs

Reranking

Reranking re-scores initial retrieval results with a more precise model and reorders them. Cross-encoder rerankers judge relevance more accurately than the bi-encoders that power vector search.

Using Cohere Rerank

import cohere
from typing import List, Dict

class CohereReranker:
    """Reranking via the Cohere Rerank API"""

    def __init__(self, api_key: str, model: str = "rerank-v3.5"):
        self.client = cohere.Client(api_key)
        self.model = model

    def rerank(self, query: str, documents: List[str],
               top_n: int = 5) -> List[Dict]:
        """Rerank documents"""
        response = self.client.rerank(
            model=self.model,
            query=query,
            documents=documents,
            top_n=top_n,
            return_documents=True
        )

        results = []
        for hit in response.results:
            results.append({
                "text": hit.document.text,
                "relevance_score": hit.relevance_score,
                "index": hit.index
            })
        return results


class CrossEncoderReranker:
    """Reranking with a local cross-encoder model"""

    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"):
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)

    def rerank(self, query: str, documents: List[str],
               top_n: int = 5) -> List[Dict]:
        """Rerank with the cross-encoder"""
        pairs = [[query, doc] for doc in documents]
        scores = self.model.predict(pairs)

        # sort by score
        scored_docs = list(zip(documents, scores, range(len(documents))))
        scored_docs.sort(key=lambda x: x[1], reverse=True)

        results = []
        for text, score, idx in scored_docs[:top_n]:
            results.append({
                "text": text,
                "relevance_score": float(score),
                "index": idx
            })
        return results

Integrating Reranking into the Pipeline

class RAGPipelineWithReranking:
    """RAG pipeline with integrated reranking"""

    def __init__(self, retriever, reranker, llm):
        self.retriever = retriever
        self.reranker = reranker
        self.llm = llm

    def query(self, question: str, top_k_retrieve: int = 20,
              top_k_rerank: int = 5) -> str:
        """Retrieve -> rerank -> generate"""
        # stage 1: broad initial retrieval
        initial_docs = self.retriever.invoke(question)[:top_k_retrieve]
        doc_texts = [doc.page_content for doc in initial_docs]

        # stage 2: precise reranking
        reranked = self.reranker.rerank(
            query=question,
            documents=doc_texts,
            top_n=top_k_rerank
        )

        # stage 3: context assembly
        context = "\n\n---\n\n".join(
            r["text"] for r in reranked
        )

        # stage 4: LLM generation
        prompt = f"""Answer the question based on the following context.
Cite the source of each piece of information.

Context:
{context}

Question: {question}
Answer:"""

        response = self.llm.invoke(prompt)
        return response.content

Evaluating RAG with RAGAS

RAGAS (Retrieval Augmented Generation Assessment) is a framework for automatically evaluating RAG pipelines. It measures retrieval quality and generation quality with separate metrics.

Core RAGAS Metrics

Metric | Measures | Description | Target
Context Precision | Retrieval | Fraction of retrieved context that is relevant | >= 0.8
Context Recall | Retrieval | Fraction of required information present in the context | >= 0.9
Faithfulness | Generation | Degree to which the answer is grounded in the context | >= 0.9
Answer Relevancy | Generation | How well the answer addresses the question | >= 0.8
Answer Correctness | End-to-end | Accuracy compared to the ground truth | >= 0.7

RAGAS Evaluation Implementation

from ragas import evaluate
from ragas.metrics import (
    context_precision,
    context_recall,
    faithfulness,
    answer_relevancy,
    answer_correctness
)
from datasets import Dataset

class RAGEvaluator:
    """RAGAS-based RAG evaluator"""

    def __init__(self):
        self.metrics = [
            context_precision,
            context_recall,
            faithfulness,
            answer_relevancy,
            answer_correctness
        ]

    def create_eval_dataset(self, eval_samples: list) -> Dataset:
        """Build the evaluation dataset"""
        data = {
            "question": [],
            "answer": [],
            "contexts": [],
            "ground_truth": []
        }

        for sample in eval_samples:
            data["question"].append(sample["question"])
            data["answer"].append(sample["generated_answer"])
            data["contexts"].append(sample["retrieved_contexts"])
            data["ground_truth"].append(sample["expected_answer"])

        return Dataset.from_dict(data)

    def evaluate(self, eval_samples: list) -> dict:
        """Run the RAG pipeline evaluation"""
        dataset = self.create_eval_dataset(eval_samples)

        result = evaluate(
            dataset=dataset,
            metrics=self.metrics
        )

        return {
            "context_precision": result["context_precision"],
            "context_recall": result["context_recall"],
            "faithfulness": result["faithfulness"],
            "answer_relevancy": result["answer_relevancy"],
            "answer_correctness": result["answer_correctness"],
            "overall_score": sum(result.values()) / len(result)
        }

    def generate_report(self, results: dict) -> str:
        """Generate the evaluation report"""
        report = "=== RAG Pipeline Evaluation Report ===\n\n"

        for metric, score in results.items():
            status = "PASS" if score >= 0.7 else "WARN" if score >= 0.5 else "FAIL"
            report += f"  {metric}: {score:.4f} [{status}]\n"

        report += f"\n  Overall: {results.get('overall_score', 0):.4f}\n"
        return report

Production Deployment Patterns

Asynchronous Indexing Pipeline

In production, document indexing should be decoupled from search serving. When new documents arrive, embeddings are generated and the index is updated asynchronously.

import asyncio
from datetime import datetime
from typing import Optional
import hashlib

class AsyncIndexingPipeline:
    """Asynchronous document indexing pipeline"""

    def __init__(self, embeddings, vector_store, chunker):
        self.embeddings = embeddings
        self.vector_store = vector_store
        self.chunker = chunker
        self.index_queue = asyncio.Queue()
        self._processed_hashes = set()

    async def enqueue_document(self, doc_id: str, content: str,
                                metadata: Optional[dict] = None):
        """Add a document to the indexing queue"""
        doc_hash = hashlib.md5(content.encode()).hexdigest()

        if doc_hash in self._processed_hashes:
            return {"status": "skipped", "reason": "duplicate"}

        await self.index_queue.put({
            "doc_id": doc_id,
            "content": content,
            "metadata": metadata or {},
            "hash": doc_hash,
            "enqueued_at": datetime.utcnow().isoformat()
        })
        return {"status": "enqueued", "queue_size": self.index_queue.qsize()}

    async def process_queue(self, batch_size: int = 10):
        """Process queued documents in batches"""
        while True:
            batch = []
            try:
                for _ in range(batch_size):
                    item = self.index_queue.get_nowait()
                    batch.append(item)
            except asyncio.QueueEmpty:
                pass

            if batch:
                await self._process_batch(batch)
            else:
                await asyncio.sleep(1)

    async def _process_batch(self, batch: list):
        """Batch processing: chunk -> embed -> upsert"""
        all_chunks = []
        all_metadata = []

        for item in batch:
            chunks = self.chunker.semantic_chunk(item["content"])
            for chunk in chunks:
                all_chunks.append(chunk)
                all_metadata.append({
                    "doc_id": item["doc_id"],
                    "hash": item["hash"],
                    **item["metadata"]
                })

        # generate embeddings in batch
        chunk_embeddings = self.embeddings.embed_documents(all_chunks)

        # upsert into the vector store
        self.vector_store.upsert_documents(
            documents=all_chunks,
            embeddings=chunk_embeddings,
            metadata=all_metadata
        )

        for item in batch:
            self._processed_hashes.add(item["hash"])

Caching Retrieval Results

Caching frequently repeated queries improves response latency and reduces API costs.

import hashlib
import json
from datetime import datetime, timedelta
from typing import Optional
import redis

class RAGCache:
    """Cache for RAG retrieval results"""

    def __init__(self, redis_url: str = "redis://localhost:6379",
                 ttl_seconds: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl_seconds

    def _cache_key(self, query: str, filters: Optional[dict] = None) -> str:
        """Build a cache key from the query and filters"""
        key_data = {"query": query.lower().strip(), "filters": filters or {}}
        key_hash = hashlib.sha256(
            json.dumps(key_data, sort_keys=True).encode()
        ).hexdigest()
        return f"rag:cache:{key_hash}"

    def get(self, query: str, filters: Optional[dict] = None):
        """Look up retrieval results in the cache"""
        key = self._cache_key(query, filters)
        cached = self.redis.get(key)

        if cached:
            data = json.loads(cached)
            data["cache_hit"] = True
            return data
        return None

    def set(self, query: str, results: list,
            filters: Optional[dict] = None):
        """Store retrieval results in the cache"""
        key = self._cache_key(query, filters)
        data = {
            "results": results,
            "cached_at": datetime.utcnow().isoformat(),
            "query": query
        }
        self.redis.setex(key, self.ttl, json.dumps(data))

    def invalidate_by_pattern(self, pattern: str):
        """Invalidate cache entries by pattern matching"""
        # note: KEYS blocks Redis; prefer SCAN for large keyspaces in production
        keys = self.redis.keys(f"rag:cache:*{pattern}*")
        if keys:
            self.redis.delete(*keys)

Failure Patterns and Mitigations

Retrieval Quality Degradation Patterns

Failure type | Symptom | Cause | Mitigation
Embedding drift | Retrieval accuracy degrades over time | Embedding model version changes, data distribution shift | Periodic reindexing, pin model versions
Chunk mismatch | Relevant information is not retrieved | Inappropriate chunk size/strategy | Experiment with chunk sizes, adjust overlap
Filter over-restriction | Growing number of zero-result queries | Metadata filters too strict | Filter fallback strategy, relax filters
Context pollution | LLM answers from irrelevant information | No reranking, top-k too large | Add reranking, reduce top-k
Latency growth | Response time exceeds 2 seconds | Unoptimized index, network latency | Index tuning, caching, async processing

Monitoring Embedding Drift

import numpy as np
from datetime import datetime
from typing import List

class EmbeddingDriftMonitor:
    """Embedding drift detection and monitoring"""

    def __init__(self, reference_embeddings: np.ndarray):
        self.reference_mean = np.mean(reference_embeddings, axis=0)
        self.reference_std = np.std(reference_embeddings, axis=0)
        self.drift_history = []

    def check_drift(self, new_embeddings: np.ndarray,
                     threshold: float = 0.1) -> dict:
        """Check new embeddings for drift"""
        new_mean = np.mean(new_embeddings, axis=0)

        # measure drift via cosine distance
        cosine_sim = np.dot(self.reference_mean, new_mean) / (
            np.linalg.norm(self.reference_mean) * np.linalg.norm(new_mean)
        )
        drift_score = 1 - cosine_sim

        # distribution comparison (rough KL-divergence proxy)
        distribution_shift = np.mean(
            np.abs(np.mean(new_embeddings, axis=0) - self.reference_mean)
            / (self.reference_std + 1e-8)
        )

        result = {
            "drift_score": float(drift_score),
            "distribution_shift": float(distribution_shift),
            "is_drifted": drift_score > threshold,
            "timestamp": datetime.utcnow().isoformat(),
            "recommendation": (
                "reindexing required" if drift_score > threshold
                else "within normal range"
            )
        }

        self.drift_history.append(result)
        return result

Fallback Strategy for Degraded Retrieval

class RAGWithFallback:
    """RAG pipeline with fallback strategies"""

    def __init__(self, primary_retriever, fallback_retriever,
                 reranker, llm):
        self.primary = primary_retriever
        self.fallback = fallback_retriever
        self.reranker = reranker
        self.llm = llm

    def query(self, question: str) -> dict:
        """Handle a query with fallbacks"""
        # primary retrieval
        primary_docs = self.primary.invoke(question)

        if not primary_docs:
            # fallback 1: relax filters
            primary_docs = self.fallback.invoke(question)

        if not primary_docs:
            return {
                "answer": "No relevant information found.",
                "confidence": 0.0,
                "source": "no_results"
            }

        # reranking
        reranked = self.reranker.rerank(
            query=question,
            documents=[d.page_content for d in primary_docs],
            top_n=5
        )

        # minimum relevance check
        if reranked[0]["relevance_score"] < 0.3:
            return {
                "answer": "Unable to generate a high-confidence answer.",
                "confidence": reranked[0]["relevance_score"],
                "source": "low_confidence"
            }

        # LLM generation
        context = "\n\n".join(r["text"] for r in reranked)
        answer = self.llm.invoke(
            f"Context:\n{context}\n\nQuestion: {question}\nAnswer:"
        ).content

        return {
            "answer": answer,
            "confidence": reranked[0]["relevance_score"],
            "source": "primary",
            "num_sources": len(reranked)
        }

Production Checklist

Design

  • Benchmark embedding models against your use case
  • Choose a vector DB based on scale, budget, and operational capacity
  • Plan A/B tests for chunking strategies
  • Build an evaluation dataset (at least 100 QA pairs)

Development

  • Implement hybrid search (BM25 + vector)
  • Integrate a reranking pipeline
  • Build an asynchronous indexing pipeline
  • Implement a caching layer (Redis/Memcached)
  • Implement error handling and fallback strategies

Deployment

  • Set quality gates based on RAGAS metrics
  • Build an embedding-drift monitoring dashboard
  • Configure search latency alerts (P95 target: 500ms)
  • Automate index rebuild pipelines
  • Run load tests (2x target QPS)
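The RAGAS quality gate in the deployment checklist can be as small as a threshold check against the targets from the metrics table. A sketch (thresholds mirror that table; the function name and example scores are illustrative):

```python
# Minimal CI quality gate: fail the deploy if any RAGAS metric misses its target.
THRESHOLDS = {
    "context_precision": 0.8,
    "context_recall": 0.9,
    "faithfulness": 0.9,
    "answer_relevancy": 0.8,
    "answer_correctness": 0.7,
}

def quality_gate(scores: dict) -> tuple:
    """Return (passed, list of (metric, score, target) failures)."""
    failures = [
        (metric, scores.get(metric, 0.0), target)
        for metric, target in THRESHOLDS.items()
        if scores.get(metric, 0.0) < target
    ]
    return (len(failures) == 0, failures)

passed, failures = quality_gate({
    "context_precision": 0.85, "context_recall": 0.92,
    "faithfulness": 0.95, "answer_relevancy": 0.81,
    "answer_correctness": 0.65,
})
# answer_correctness (0.65) is below its 0.7 target, so the gate fails
```

Wiring this into CI keeps a regression in any single metric from reaching production, even when the overall average still looks healthy.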

Operations

  • Generate weekly retrieval quality reports
  • Review embedding model updates monthly
  • Update the evaluation dataset from user feedback
  • Monitor costs (embedding API calls, vector DB storage)
  • Rebuild indexes regularly (at least quarterly)

참고자료

RAG Pipeline Production Guide: From Vector DB Selection to Chunking, Reranking, and Evaluation

RAG Pipeline Production Guide

Introduction

RAG (Retrieval-Augmented Generation) is a critical architecture pattern for solving LLM hallucination problems and generating answers grounded in up-to-date information. While it may seem as simple as "search and paste into a prompt," production environments require numerous engineering decisions: embedding model selection, vector DB operations, chunking strategies, hybrid search, reranking, and evaluation frameworks.

This article analyzes the full RAG pipeline architecture, covering selection criteria and implementation methods for each component with production-ready code. We also examine common failure patterns in production, recovery strategies, and systematic evaluation methodologies using RAGAS.

RAG Architecture Overview

A RAG pipeline consists of three main stages: Retrieval, Augmentation, and Generation. Each stage can be optimized independently, and the overall pipeline quality is determined by its weakest stage.

End-to-End Architecture Flow

[User Query]
    |
    v
[Query Processing] <- Query decomposition, HyDE, query expansion
    |
    v
[Retrieval] <- BM25 + Vector search (hybrid)
    |
    v
[Reranking] <- Cross-encoder, Cohere Rerank
    |
    v
[Context Assembly] <- Chunk merging, deduplication, token limit
    |
    v
[Generation] <- Pass context and query to LLM
    |
    v
[Post-processing] <- Source attribution, confidence scores, hallucination check

Basic RAG Implementation

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

# 1. Document chunking
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", ".", " "]
)
chunks = text_splitter.split_documents(documents)

# 2. Embedding and vector store creation
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    collection_name="production_docs"
)

# 3. Retriever configuration
retriever = vectorstore.as_retriever(
    search_type="mmr",
    search_kwargs={"k": 5, "fetch_k": 20}
)

# 4. RAG chain assembly
template = """Answer the question based on the following context.
If the answer cannot be found in the context, respond with "Insufficient information."

Context:
{context}

Question: {question}
Answer:"""

prompt = ChatPromptTemplate.from_template(template)
llm = ChatOpenAI(model="gpt-4o", temperature=0)

rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

response = rag_chain.invoke("What are the key components of a RAG pipeline?")

Embedding Model Comparison

The embedding model is one of the most critical factors determining RAG pipeline retrieval quality. Here is a comparison of major embedding models as of 2025-2026.

| Model | Dimensions | Max Tokens | MTEB Average | Multilingual | Cost | Features |
| --- | --- | --- | --- | --- | --- | --- |
| OpenAI text-embedding-3-large | 3072 | 8191 | 64.6 | Yes | $0.13/1M tokens | Dimension reduction support |
| OpenAI text-embedding-3-small | 1536 | 8191 | 62.3 | Yes | $0.02/1M tokens | Cost-effective |
| Cohere embed-v3 | 1024 | 512 | 64.5 | Yes (100+) | $0.10/1M tokens | Search-optimized |
| BGE-M3 | 1024 | 8192 | 66.1 | Yes (100+) | Free (self-hosted) | Dense+Sparse+ColBERT |
| E5-mistral-7b-instruct | 4096 | 32768 | 66.6 | Yes | Free (self-hosted) | Instruction-based embedding |
| Jina-embeddings-v3 | 1024 | 8192 | 65.5 | Yes (89) | $0.02/1M tokens | Task-specific LoRA |
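
The "dimension reduction support" in the table refers to Matryoshka-style embeddings: the text-embedding-3 models are trained so that the leading dimensions carry most of the signal, so a full embedding can be truncated and re-normalized client-side (or requested at a smaller size via the API's `dimensions` parameter). A numpy sketch of the client-side truncation, using an arbitrary vector in place of a real embedding:

```python
import numpy as np

def truncate_embedding(vec: np.ndarray, dims: int) -> np.ndarray:
    """Keep the first `dims` components and re-normalize to unit length,
    so cosine similarity stays meaningful at the reduced dimension."""
    truncated = vec[:dims]
    return truncated / np.linalg.norm(truncated)
```

This trades a small amount of retrieval quality for proportionally lower vector DB storage and faster distance computations.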

Embedding Model Selection Criteria

from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List

class EmbeddingBenchmark:
    """Embedding model comparison benchmark"""

    def __init__(self, models: List[str]):
        self.models = {}
        for model_name in models:
            self.models[model_name] = SentenceTransformer(model_name)

    def evaluate_retrieval_quality(
        self,
        queries: List[str],
        relevant_docs: List[List[int]],  # indices into corpus
        corpus: List[str]
    ) -> dict:
        results = {}
        for name, model in self.models.items():
            corpus_embeddings = model.encode(corpus, normalize_embeddings=True)
            query_embeddings = model.encode(queries, normalize_embeddings=True)

            # Cosine similarity calculation
            scores = np.dot(query_embeddings, corpus_embeddings.T)

            # Recall@k calculation
            recall_at_5 = self._compute_recall(scores, relevant_docs, k=5)
            recall_at_10 = self._compute_recall(scores, relevant_docs, k=10)

            results[name] = {
                "recall@5": recall_at_5,
                "recall@10": recall_at_10,
                "embedding_dim": model.get_sentence_embedding_dimension(),
                "encoding_speed": self._measure_speed(model, queries)
            }
        return results

    def _compute_recall(self, scores, relevant_docs, k):
        recalls = []
        for i, rel_docs in enumerate(relevant_docs):
            top_k_indices = np.argsort(scores[i])[-k:]
            retrieved = set(top_k_indices)
            relevant = set(rel_docs)
            recalls.append(len(retrieved & relevant) / len(relevant))
        return np.mean(recalls)

    def _measure_speed(self, model, texts, n_runs=3):
        import time
        times = []
        for _ in range(n_runs):
            start = time.time()
            model.encode(texts)
            times.append(time.time() - start)
        return np.mean(times)

Vector Database Comparison

Vector database selection directly impacts the operational complexity, cost, and performance of a RAG system. Here is a multi-dimensional comparison of major vector databases.

| Feature | Pinecone | Milvus | Weaviate | Qdrant | Chroma |
| --- | --- | --- | --- | --- | --- |
| Deployment | Fully managed | Self-hosted / Zilliz Cloud | Self-hosted / Cloud | Self-hosted / Cloud | In-memory / Self-hosted |
| Max Vectors | Billions | Billions | Billions | Billions | Millions |
| Search Algorithm | Proprietary | HNSW, IVF, DiskANN | HNSW | HNSW | HNSW |
| Hybrid Search | Yes (Sparse+Dense) | Yes (BM25+Dense) | Yes (BM25+Dense) | Yes (Sparse+Dense) | No |
| Multi-tenancy | Yes (namespaces) | Yes (partitions) | Yes (tenants) | Yes (collections) | Yes (collections) |
| Filtering | Metadata filters | Scalar filtering | GraphQL filters | Payload filters | Metadata filters |
| Price (1M vectors) | ~$70/month | Free (self-hosted) | Free (self-hosted) | Free (self-hosted) | Free |
| Best For | Quick prototyping, managed | Large enterprise | GraphQL ecosystem | High-perf filtering | Dev / small scale |

Production Vector Search with Qdrant

from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance, VectorParams, PointStruct,
    Filter, FieldCondition, MatchValue,
    SearchParams, HnswConfigDiff, OptimizersConfigDiff
)
import uuid

class ProductionVectorStore:
    """Production vector store manager"""

    def __init__(self, url: str, api_key: str, collection_name: str):
        self.client = QdrantClient(url=url, api_key=api_key)
        self.collection_name = collection_name

    def create_collection(self, vector_size: int = 1536):
        """Create collection with optimized HNSW index"""
        self.client.create_collection(
            collection_name=self.collection_name,
            vectors_config=VectorParams(
                size=vector_size,
                distance=Distance.COSINE,
                on_disk=True  # Disk-based storage for large datasets
            ),
            hnsw_config=HnswConfigDiff(
                m=16,              # Graph connectivity (default: 16)
                ef_construct=128,  # Candidate list size during index build
                full_scan_threshold=10000  # Full scan below this count
            ),
            optimizers_config=OptimizersConfigDiff(
                indexing_threshold=20000,
                memmap_threshold=50000
            )
        )

    def upsert_documents(self, documents: list, embeddings: list, metadata: list):
        """Batch upsert"""
        points = [
            PointStruct(
                id=str(uuid.uuid4()),
                vector=embedding,
                payload={**meta, "text": doc}
            )
            for doc, embedding, meta in zip(documents, embeddings, metadata)
        ]

        # Upload in batches of 100
        batch_size = 100
        for i in range(0, len(points), batch_size):
            batch = points[i:i + batch_size]
            self.client.upsert(
                collection_name=self.collection_name,
                points=batch,
                wait=True
            )

    def filtered_search(self, query_vector: list,
                        filters: dict = None, top_k: int = 10):
        """Filtered vector search (dense similarity + metadata pre-filter)"""
        search_filter = None
        if filters:
            conditions = [
                FieldCondition(key=k, match=MatchValue(value=v))
                for k, v in filters.items()
            ]
            search_filter = Filter(must=conditions)

        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            query_filter=search_filter,
            limit=top_k,
            search_params=SearchParams(
                hnsw_ef=128,  # Search range (higher = more accurate, slower)
                exact=False   # Use approximate search
            )
        )
        return results

Chunking Strategies

Chunking is the preprocessing stage that has the greatest impact on RAG quality. Poor chunking can separate related information or include unnecessary noise, significantly degrading retrieval quality.

Chunking Strategy Comparison

| Strategy | Pros | Cons | Best For |
| --- | --- | --- | --- |
| Fixed-size | Simple to implement, predictable | Ignores semantic boundaries | Uniform text |
| Recursive | Respects paragraph/sentence boundaries | Requires size tuning | General documents |
| Semantic | Preserves semantic units | High computational cost | Unstructured text |
| Document-structure | Maintains heading/section hierarchy | Requires parser implementation | Markdown, HTML |
| Parent-Child | Search precision + sufficient context | 2x storage space | Technical documentation |
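
For reference, the simplest entry in the table, fixed-size chunking with overlap, fits in a few lines; the character-based window below is exactly the part the fancier strategies replace with semantic or structural boundaries.

```python
def fixed_size_chunks(text: str, chunk_size: int = 1000,
                      overlap: int = 200) -> list:
    """Slide a fixed-size character window over the text with overlap.
    The final chunk may be shorter than chunk_size."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    step = chunk_size - overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]
```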

Semantic Chunking Implementation

from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings
import numpy as np

class AdvancedChunker:
    """Advanced chunker supporting multiple chunking strategies"""

    def __init__(self, embedding_model: str = "text-embedding-3-small"):
        self.embeddings = OpenAIEmbeddings(model=embedding_model)

    def semantic_chunk(self, text: str, breakpoint_threshold: float = 0.5):
        """Semantic chunking - split at semantic change points"""
        chunker = SemanticChunker(
            self.embeddings,
            breakpoint_threshold_type="percentile",
            breakpoint_threshold_amount=breakpoint_threshold * 100
        )
        return chunker.split_text(text)

    def parent_child_chunk(self, text: str,
                            parent_size: int = 2000,
                            child_size: int = 400,
                            child_overlap: int = 50):
        """Parent-child chunking - search on children, context from parents"""
        from langchain.text_splitter import RecursiveCharacterTextSplitter

        # Create parent chunks
        parent_splitter = RecursiveCharacterTextSplitter(
            chunk_size=parent_size,
            chunk_overlap=0
        )
        parent_chunks = parent_splitter.split_text(text)

        # Create child chunks from each parent
        child_splitter = RecursiveCharacterTextSplitter(
            chunk_size=child_size,
            chunk_overlap=child_overlap
        )

        result = []
        for i, parent in enumerate(parent_chunks):
            children = child_splitter.split_text(parent)
            for child in children:
                result.append({
                    "child_text": child,
                    "parent_text": parent,
                    "parent_id": i
                })
        return result

    def markdown_structure_chunk(self, markdown_text: str):
        """Markdown document structure-based chunking"""
        from langchain.text_splitter import MarkdownHeaderTextSplitter

        headers_to_split_on = [
            ("#", "h1"),
            ("##", "h2"),
            ("###", "h3"),
        ]

        splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=headers_to_split_on
        )
        chunks = splitter.split_text(markdown_text)

        # Add hierarchy metadata to each chunk
        enriched_chunks = []
        for chunk in chunks:
            enriched_chunks.append({
                "text": chunk.page_content,
                "metadata": chunk.metadata,
                "hierarchy": " > ".join(
                    chunk.metadata.get(h, "")
                    for h in ["h1", "h2", "h3"]
                    if chunk.metadata.get(h)
                )
            })
        return enriched_chunks

Retrieval Optimization Strategies

Simple vector similarity search alone cannot achieve production-grade retrieval quality. Various optimization techniques such as hybrid search, HyDE, and query decomposition must be combined.

Hybrid Search (BM25 + Vector)

from rank_bm25 import BM25Okapi
from langchain_openai import OpenAIEmbeddings
import numpy as np
from typing import List, Tuple

class HybridRetriever:
    """Hybrid retriever combining BM25 + vector search"""

    def __init__(self, documents: List[str], embeddings_model: str = "text-embedding-3-large"):
        self.documents = documents
        self.embeddings = OpenAIEmbeddings(model=embeddings_model)

        # Build BM25 index
        tokenized_docs = [doc.lower().split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized_docs)

        # Build vector index
        self.doc_embeddings = np.array(
            self.embeddings.embed_documents(documents)
        )

    def search(self, query: str, top_k: int = 5,
               alpha: float = 0.5) -> List[Tuple[str, float]]:
        """
        Perform hybrid search
        alpha: vector search weight (0=BM25 only, 1=vector only)
        """
        # BM25 scores
        bm25_scores = self.bm25.get_scores(query.lower().split())
        bm25_scores = self._normalize_scores(bm25_scores)

        # Vector similarity scores
        query_embedding = np.array(self.embeddings.embed_query(query))
        vector_scores = np.dot(self.doc_embeddings, query_embedding)
        vector_scores = self._normalize_scores(vector_scores)

        # Weighted score fusion (a simple alternative to Reciprocal Rank Fusion)
        combined_scores = alpha * vector_scores + (1 - alpha) * bm25_scores

        # Return top-k results
        top_indices = np.argsort(combined_scores)[-top_k:][::-1]
        return [
            (self.documents[i], combined_scores[i])
            for i in top_indices
        ]

    def _normalize_scores(self, scores: np.ndarray) -> np.ndarray:
        """Min-Max normalization"""
        min_s, max_s = scores.min(), scores.max()
        if max_s == min_s:
            return np.zeros_like(scores)
        return (scores - min_s) / (max_s - min_s)
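
The weighted fusion above requires score normalization because BM25 and cosine scores live on different scales. Reciprocal Rank Fusion sidesteps this by combining ranks instead of raw scores; a minimal sketch:

```python
def reciprocal_rank_fusion(rankings: list, k: int = 60) -> list:
    """Fuse multiple ranked lists of document ids.
    Each doc scores sum(1 / (k + rank)) over the lists it appears in;
    k=60 is the constant from the original RRF paper."""
    scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)
```

Because only ranks matter, RRF is robust to one retriever producing much larger scores than the other, at the cost of discarding score-magnitude information.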

HyDE (Hypothetical Document Embedding)

HyDE generates a hypothetical answer to the query first, then uses that answer's embedding for retrieval. This bridges the semantic gap between questions and answers, improving retrieval quality.

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate

class HyDERetriever:
    """HyDE-based retriever"""

    def __init__(self, vectorstore):
        self.vectorstore = vectorstore
        self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
        self.embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

        self.hyde_prompt = ChatPromptTemplate.from_template(
            "Write a hypothetical answer to the following question. "
            "It is more important to include relevant keywords and concepts "
            "than to be factually accurate.\n\n"
            "Question: {question}\n\n"
            "Hypothetical answer:"
        )

    def retrieve(self, query: str, top_k: int = 5):
        """Perform HyDE retrieval"""
        # 1. Generate hypothetical answer
        chain = self.hyde_prompt | self.llm
        hypothetical_answer = chain.invoke({"question": query}).content

        # 2. Create embedding from hypothetical answer
        hyde_embedding = self.embeddings.embed_query(hypothetical_answer)

        # 3. Search using hypothetical answer embedding
        results = self.vectorstore.similarity_search_by_vector(
            hyde_embedding, k=top_k
        )
        return results

Query Decomposition

This strategy breaks complex questions into sub-questions, retrieves results for each, then aggregates them.

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import json

class QueryDecomposer:
    """Decompose complex queries into sub-queries"""

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
        self.decompose_prompt = ChatPromptTemplate.from_template(
            "Decompose the following question into 2-4 sub-questions "
            "optimized for retrieval.\n"
            "Return as a JSON array.\n\n"
            "Original question: {question}\n\n"
            "Sub-questions:"
        )

    def decompose(self, question: str) -> list:
        """Decompose query"""
        chain = self.decompose_prompt | self.llm
        result = chain.invoke({"question": question}).content

        try:
            sub_queries = json.loads(result)
        except json.JSONDecodeError:
            sub_queries = [question]

        return sub_queries

    def retrieve_and_merge(self, question: str, retriever, top_k: int = 3):
        """Retrieve with decomposed queries and merge results"""
        sub_queries = self.decompose(question)

        all_docs = []
        seen_contents = set()

        for sub_query in sub_queries:
            docs = retriever.invoke(sub_query)[:top_k]
            for doc in docs:
                if doc.page_content not in seen_contents:
                    seen_contents.add(doc.page_content)
                    all_docs.append(doc)

        return all_docs

Reranking

Reranking re-evaluates initial search results with a more precise model and reorders them. Cross-encoder-based reranking provides more accurate relevance judgments than bi-encoder-based vector search.

Using Cohere Rerank

import cohere
from typing import List, Dict

class CohereReranker:
    """Reranking using Cohere Rerank API"""

    def __init__(self, api_key: str, model: str = "rerank-v3.5"):
        self.client = cohere.Client(api_key)
        self.model = model

    def rerank(self, query: str, documents: List[str],
               top_n: int = 5) -> List[Dict]:
        """Rerank documents"""
        response = self.client.rerank(
            model=self.model,
            query=query,
            documents=documents,
            top_n=top_n,
            return_documents=True
        )

        results = []
        for hit in response.results:
            results.append({
                "text": hit.document.text,
                "relevance_score": hit.relevance_score,
                "index": hit.index
            })
        return results


class CrossEncoderReranker:
    """Local cross-encoder model-based reranking"""

    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"):
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)

    def rerank(self, query: str, documents: List[str],
               top_n: int = 5) -> List[Dict]:
        """Rerank with cross-encoder"""
        pairs = [[query, doc] for doc in documents]
        scores = self.model.predict(pairs)

        # Sort by score
        scored_docs = list(zip(documents, scores, range(len(documents))))
        scored_docs.sort(key=lambda x: x[1], reverse=True)

        results = []
        for text, score, idx in scored_docs[:top_n]:
            results.append({
                "text": text,
                "relevance_score": float(score),
                "index": idx
            })
        return results

Integrated Reranking Pipeline

class RAGPipelineWithReranking:
    """RAG pipeline with integrated reranking"""

    def __init__(self, retriever, reranker, llm):
        self.retriever = retriever
        self.reranker = reranker
        self.llm = llm

    def query(self, question: str, top_k_retrieve: int = 20,
              top_k_rerank: int = 5) -> str:
        """Retrieve -> Rerank -> Generate"""
        # Stage 1: Initial retrieval (broad)
        initial_docs = self.retriever.invoke(question)[:top_k_retrieve]
        doc_texts = [doc.page_content for doc in initial_docs]

        # Stage 2: Reranking (precise)
        reranked = self.reranker.rerank(
            query=question,
            documents=doc_texts,
            top_n=top_k_rerank
        )

        # Stage 3: Context assembly
        context = "\n\n---\n\n".join(
            r["text"] for r in reranked
        )

        # Stage 4: LLM generation
        prompt = f"""Answer the question based on the following context.
Cite the source of each piece of information.

Context:
{context}

Question: {question}
Answer:"""

        response = self.llm.invoke(prompt)
        return response.content

RAG Evaluation with RAGAS

RAGAS (Retrieval Augmented Generation Assessment) is a framework for automatically evaluating RAG pipelines. It measures retrieval quality and generation quality with separate metrics.

Core RAGAS Metrics

| Metric | Measures | Description | Target |
| --- | --- | --- | --- |
| Context Precision | Retrieval | Proportion of retrieved contexts that are relevant | 0.8+ |
| Context Recall | Retrieval | Proportion of needed information included in context | 0.9+ |
| Faithfulness | Generation | Degree to which the answer is grounded in context | 0.9+ |
| Answer Relevancy | Generation | How well the answer addresses the question | 0.8+ |
| Answer Correctness | Overall | Accuracy compared to ground truth | 0.7+ |
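
The targets in the table can be enforced as a CI quality gate that blocks deployment when any metric falls below its threshold. A minimal sketch, treating the thresholds above as an assumed configuration:

```python
TARGETS = {
    "context_precision": 0.8,
    "context_recall": 0.9,
    "faithfulness": 0.9,
    "answer_relevancy": 0.8,
    "answer_correctness": 0.7,
}

def quality_gate(scores: dict, targets: dict = TARGETS) -> dict:
    """Return per-metric failures plus an overall verdict;
    the gate fails if any metric is below its target."""
    failures = {
        m: (scores.get(m, 0.0), t)
        for m, t in targets.items()
        if scores.get(m, 0.0) < t
    }
    return {"passed": not failures, "failures": failures}
```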

RAGAS Evaluation Implementation

from ragas import evaluate
from ragas.metrics import (
    context_precision,
    context_recall,
    faithfulness,
    answer_relevancy,
    answer_correctness
)
from datasets import Dataset

class RAGEvaluator:
    """RAGAS-based RAG evaluator"""

    def __init__(self):
        self.metrics = [
            context_precision,
            context_recall,
            faithfulness,
            answer_relevancy,
            answer_correctness
        ]

    def create_eval_dataset(self, eval_samples: list) -> Dataset:
        """Create evaluation dataset"""
        data = {
            "question": [],
            "answer": [],
            "contexts": [],
            "ground_truth": []
        }

        for sample in eval_samples:
            data["question"].append(sample["question"])
            data["answer"].append(sample["generated_answer"])
            data["contexts"].append(sample["retrieved_contexts"])
            data["ground_truth"].append(sample["expected_answer"])

        return Dataset.from_dict(data)

    def evaluate(self, eval_samples: list) -> dict:
        """Perform RAG pipeline evaluation"""
        dataset = self.create_eval_dataset(eval_samples)

        result = evaluate(
            dataset=dataset,
            metrics=self.metrics
        )

        scores = {
            "context_precision": result["context_precision"],
            "context_recall": result["context_recall"],
            "faithfulness": result["faithfulness"],
            "answer_relevancy": result["answer_relevancy"],
            "answer_correctness": result["answer_correctness"],
        }
        # Average over the five metrics, excluding the overall score itself
        scores["overall_score"] = sum(scores.values()) / len(scores)
        return scores

    def generate_report(self, results: dict) -> str:
        """Generate evaluation report"""
        report = "=== RAG Pipeline Evaluation Report ===\n\n"

        for metric, score in results.items():
            if metric == "overall_score":
                continue  # reported separately below
            status = "PASS" if score >= 0.7 else "WARN" if score >= 0.5 else "FAIL"
            report += f"  {metric}: {score:.4f} [{status}]\n"

        report += f"\n  Overall: {results.get('overall_score', 0):.4f}\n"
        return report

Production Deployment Patterns

Async Indexing Pipeline

In production, document indexing and search serving must be separated. When new documents are added, embeddings are generated asynchronously and the index is updated.

import asyncio
from datetime import datetime
from typing import Optional
import hashlib

class AsyncIndexingPipeline:
    """Async document indexing pipeline"""

    def __init__(self, embeddings, vector_store, chunker):
        self.embeddings = embeddings
        self.vector_store = vector_store
        self.chunker = chunker
        self.index_queue = asyncio.Queue()
        self._processed_hashes = set()

    async def enqueue_document(self, doc_id: str, content: str,
                                metadata: Optional[dict] = None):
        """Add document to indexing queue"""
        doc_hash = hashlib.md5(content.encode()).hexdigest()

        if doc_hash in self._processed_hashes:
            return {"status": "skipped", "reason": "duplicate"}

        await self.index_queue.put({
            "doc_id": doc_id,
            "content": content,
            "metadata": metadata or {},
            "hash": doc_hash,
            "enqueued_at": datetime.utcnow().isoformat()
        })
        return {"status": "enqueued", "queue_size": self.index_queue.qsize()}

    async def process_queue(self, batch_size: int = 10):
        """Process documents from queue in batches"""
        while True:
            batch = []
            try:
                for _ in range(batch_size):
                    item = self.index_queue.get_nowait()
                    batch.append(item)
            except asyncio.QueueEmpty:
                pass

            if batch:
                await self._process_batch(batch)
            else:
                await asyncio.sleep(1)

    async def _process_batch(self, batch: list):
        """Batch processing: chunk -> embed -> upsert"""
        all_chunks = []
        all_metadata = []

        for item in batch:
            chunks = self.chunker.semantic_chunk(item["content"])
            for chunk in chunks:
                all_chunks.append(chunk)
                all_metadata.append({
                    "doc_id": item["doc_id"],
                    "hash": item["hash"],
                    **item["metadata"]
                })

        # Batch embedding generation
        chunk_embeddings = self.embeddings.embed_documents(all_chunks)

        # Upsert to vector store
        self.vector_store.upsert_documents(
            documents=all_chunks,
            embeddings=chunk_embeddings,
            metadata=all_metadata
        )

        for item in batch:
            self._processed_hashes.add(item["hash"])

Search Result Caching

Apply caching for frequently repeated queries to improve response time and reduce API costs.

import hashlib
import json
from datetime import datetime, timedelta
from typing import Optional
import redis

class RAGCache:
    """RAG search result cache"""

    def __init__(self, redis_url: str = "redis://localhost:6379",
                 ttl_seconds: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl_seconds

    def _cache_key(self, query: str, filters: Optional[dict] = None) -> str:
        """Generate cache key from query and filters"""
        key_data = {"query": query.lower().strip(), "filters": filters or {}}
        key_hash = hashlib.sha256(
            json.dumps(key_data, sort_keys=True).encode()
        ).hexdigest()
        return f"rag:cache:{key_hash}"

    def get(self, query: str, filters: Optional[dict] = None):
        """Retrieve search results from cache"""
        key = self._cache_key(query, filters)
        cached = self.redis.get(key)

        if cached:
            data = json.loads(cached)
            data["cache_hit"] = True
            return data
        return None

    def set(self, query: str, results: list,
            filters: Optional[dict] = None):
        """Store search results in cache"""
        key = self._cache_key(query, filters)
        data = {
            "results": results,
            "cached_at": datetime.utcnow().isoformat(),
            "query": query
        }
        self.redis.setex(key, self.ttl, json.dumps(data))

    def invalidate_by_pattern(self, pattern: str):
        """Invalidate cache by pattern matching"""
        keys = self.redis.keys(f"rag:cache:*{pattern}*")
        if keys:
            self.redis.delete(*keys)

Failure Cases and Recovery

Retrieval Quality Degradation Patterns

| Failure Type | Symptoms | Cause | Mitigation |
| --- | --- | --- | --- |
| Embedding Drift | Gradual accuracy degradation over time | Embedding model version change, data distribution shift | Periodic re-indexing, model version pinning |
| Chunk Mismatch | Relevant information not retrieved | Inappropriate chunk size/strategy | Chunk size experimentation, overlap adjustment |
| Filter Overfit | Increasing zero-result queries | Overly strict metadata filters | Filter fallback strategy, filter relaxation |
| Context Pollution | LLM answers with irrelevant information | Missing reranking, excessive top-k | Introduce reranking, reduce top-k |
| Latency Spike | Response time exceeds 2 seconds | Non-optimized index, network latency | Index tuning, caching, async processing |

Embedding Drift Monitoring

import numpy as np
from datetime import datetime
from typing import List

class EmbeddingDriftMonitor:
    """Embedding drift detection and monitoring"""

    def __init__(self, reference_embeddings: np.ndarray):
        self.reference_mean = np.mean(reference_embeddings, axis=0)
        self.reference_std = np.std(reference_embeddings, axis=0)
        self.drift_history = []

    def check_drift(self, new_embeddings: np.ndarray,
                     threshold: float = 0.1) -> dict:
        """Check drift of new embeddings"""
        new_mean = np.mean(new_embeddings, axis=0)

        # Measure drift via cosine distance
        cosine_sim = np.dot(self.reference_mean, new_mean) / (
            np.linalg.norm(self.reference_mean) * np.linalg.norm(new_mean)
        )
        drift_score = 1 - cosine_sim

        # Standardized mean shift across dimensions (a rough distribution-shift proxy)
        distribution_shift = np.mean(
            np.abs(new_mean - self.reference_mean)
            / (self.reference_std + 1e-8)
        )

        result = {
            "drift_score": float(drift_score),
            "distribution_shift": float(distribution_shift),
            "is_drifted": drift_score > threshold,
            "timestamp": datetime.utcnow().isoformat(),
            "recommendation": (
                "Re-indexing required" if drift_score > threshold
                else "Within normal range"
            )
        }

        self.drift_history.append(result)
        return result

Fallback Strategy for Quality Degradation

class RAGWithFallback:
    """RAG pipeline with fallback strategies"""

    def __init__(self, primary_retriever, fallback_retriever,
                 reranker, llm):
        self.primary = primary_retriever
        self.fallback = fallback_retriever
        self.reranker = reranker
        self.llm = llm

    def query(self, question: str) -> dict:
        """Query processing with fallback"""
        # Primary retrieval
        primary_docs = self.primary.invoke(question)

        if not primary_docs:
            # Fallback 1: Relaxed filters
            primary_docs = self.fallback.invoke(question)

        if not primary_docs:
            return {
                "answer": "No relevant information found.",
                "confidence": 0.0,
                "source": "no_results"
            }

        # Reranking
        reranked = self.reranker.rerank(
            query=question,
            documents=[d.page_content for d in primary_docs],
            top_n=5
        )

        # Minimum relevance validation
        if reranked[0]["relevance_score"] < 0.3:
            return {
                "answer": "Unable to generate a high-confidence answer.",
                "confidence": reranked[0]["relevance_score"],
                "source": "low_confidence"
            }

        # LLM generation
        context = "\n\n".join(r["text"] for r in reranked)
        answer = self.llm.invoke(
            f"Context:\n{context}\n\nQuestion: {question}\nAnswer:"
        ).content

        return {
            "answer": answer,
            "confidence": reranked[0]["relevance_score"],
            "source": "primary",
            "num_sources": len(reranked)
        }

Production Checklist

Design Phase

  • Benchmark embedding models for your specific use case
  • Select vector DB based on scale, budget, and operational capability
  • Plan A/B testing for chunking strategies
  • Build evaluation dataset (minimum 100 QA pairs)

Development Phase

  • Implement hybrid search (BM25 + vector)
  • Integrate reranking pipeline
  • Build async indexing pipeline
  • Implement caching layer (Redis/Memcached)
  • Implement error handling and fallback strategies

Deployment Phase

  • Set quality gates based on RAGAS metrics
  • Build embedding drift monitoring dashboard
  • Set search latency alerts (P95 target: 500ms)
  • Build automated index rebuild pipeline
  • Perform load testing (2x target QPS)
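
The P95 latency alert in the checklist above assumes percentiles are computed from recorded per-request latencies; a minimal numpy sketch of that check:

```python
import numpy as np

def latency_percentiles(latencies_ms: list) -> dict:
    """Compute the percentiles typically used for latency SLOs."""
    arr = np.asarray(latencies_ms, dtype=float)
    return {
        "p50": float(np.percentile(arr, 50)),
        "p95": float(np.percentile(arr, 95)),
        "p99": float(np.percentile(arr, 99)),
    }

def p95_alert(latencies_ms: list, target_ms: float = 500.0) -> bool:
    """True when the P95 exceeds the target and an alert should fire."""
    return latency_percentiles(latencies_ms)["p95"] > target_ms
```

P95 rather than the mean is the right signal here: a handful of slow outliers barely moves the average but is exactly what users notice.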

Operations Phase

  • Generate weekly retrieval quality reports
  • Monthly embedding model update review
  • Update evaluation dataset based on user feedback
  • Monitor costs (embedding API calls, vector DB storage)
  • Periodic index rebuilds (at least quarterly)

References