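RAG Pipeline Production Guide: From Vector DB Selection to Chunking, Reranking, and Evaluation
- Introduction
- RAG Architecture Overview
- Embedding Model Comparison
- Vector Database Comparison
- Chunking Strategies
- Retrieval Optimization Strategies
- Reranking
- RAG Evaluation with RAGAS
- Production Deployment Patterns
- Failure Cases and Recovery
- Production Checklist
- References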

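Introduction
RAG (Retrieval-Augmented Generation) is a core architecture pattern for mitigating LLM hallucination and generating answers grounded in up-to-date information. It can look as simple as "search and paste into a prompt," but a production system requires many engineering decisions: embedding model selection, vector DB operations, chunking strategy, hybrid search, reranking, and an evaluation framework.
This article analyzes the full RAG pipeline architecture and covers the selection criteria and implementation of each component with working code. It also looks at failure patterns that recur in production, how to respond to them, and a systematic evaluation methodology based on RAGAS.
RAG Architecture Overview
A RAG pipeline consists of three main stages: Retrieval, Augmentation, and Generation. Each stage can be optimized independently, and the quality of the whole pipeline is bounded by its weakest stage.
End-to-End Architecture Flow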
[User Query]
│
▼
[Query Processing] ← Query decomposition, HyDE, query expansion
│
▼
[Retrieval] ← BM25 + vector search (hybrid)
│
▼
[Reranking] ← Cross-encoder, Cohere Rerank
│
▼
[Context Assembly] ← Chunk merging, deduplication, token limit
│
▼
[Generation] ← Pass context and query to the LLM
│
▼
[Post-processing] ← Source attribution, confidence scores, hallucination check
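The Context Assembly step in the flow above (chunk merging, deduplication, token limit) can be sketched roughly as follows. This is a minimal illustration, not the article's implementation: the function name `assemble_context` is hypothetical, and the token count is approximated by whitespace-separated words, whereas a real pipeline would use the tokenizer of the target LLM.

```python
from typing import List


def assemble_context(chunks: List[str], max_tokens: int = 50) -> str:
    """Drop duplicate chunks, then add chunks in rank order until the budget is spent."""
    seen = set()
    selected = []
    used = 0
    for chunk in chunks:
        # Case- and whitespace-insensitive key for duplicate detection
        key = " ".join(chunk.lower().split())
        if key in seen:
            continue
        seen.add(key)
        # Crude token estimate (assumption): one token per whitespace-separated word
        cost = len(chunk.split())
        if used + cost > max_tokens:
            break
        selected.append(chunk)
        used += cost
    return "\n\n---\n\n".join(selected)
```

Stopping at the first chunk that does not fit keeps the highest-ranked chunks intact instead of truncating one of them mid-sentence.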
Basic RAG Implementation
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

# 1. Document chunking
# `documents` is assumed to be a list of LangChain Document objects loaded earlier.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", ".", " "]
)
chunks = text_splitter.split_documents(documents)

# 2. Embedding and vector store creation
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    collection_name="production_docs"
)

# 3. Retriever configuration
retriever = vectorstore.as_retriever(
    search_type="mmr",
    search_kwargs={"k": 5, "fetch_k": 20}
)

# 4. RAG chain assembly
template = """Answer the question based on the following context.
If the answer cannot be found in the context, respond with "Insufficient information."
Context:
{context}
Question: {question}
Answer:"""
prompt = ChatPromptTemplate.from_template(template)
llm = ChatOpenAI(model="gpt-4o", temperature=0)


def format_docs(docs):
    """Join retrieved Documents into plain text so the prompt gets content, not object reprs."""
    return "\n\n".join(doc.page_content for doc in docs)


rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
response = rag_chain.invoke("What are the key components of a RAG pipeline?")
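The retriever above uses `search_type="mmr"` with `k=5` and `fetch_k=20`. As a rough illustration of what Maximal Marginal Relevance does with those two numbers, here is a dependency-free sketch. The function names and the `lambda_mult` default of 0.5 are assumptions for this example; the library's internal implementation may differ in detail.

```python
import math
from typing import List, Sequence


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity of two vectors (0.0 if either has zero length)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def mmr_select(query_vec: Sequence[float], doc_vecs: List[Sequence[float]],
               k: int = 5, fetch_k: int = 20, lambda_mult: float = 0.5) -> List[int]:
    """Return indices of k documents that balance relevance against redundancy."""
    relevance = [cosine(query_vec, d) for d in doc_vecs]
    # Candidate pool: the fetch_k documents most similar to the query
    candidates = sorted(range(len(doc_vecs)),
                        key=lambda i: relevance[i], reverse=True)[:fetch_k]
    selected: List[int] = []
    while candidates and len(selected) < k:
        def score(i: int) -> float:
            # Penalize similarity to anything already selected
            redundancy = max((cosine(doc_vecs[i], doc_vecs[j]) for j in selected),
                             default=0.0)
            return lambda_mult * relevance[i] - (1 - lambda_mult) * redundancy
        best = max(candidates, key=score)
        selected.append(best)
        candidates.remove(best)
    return selected
```

With `lambda_mult=1.0` this degenerates to plain similarity ranking; lower values trade relevance for diversity, which is why a pool (`fetch_k`) larger than the final `k` is fetched first.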
Embedding Model Comparison
The embedding model is one of the most important factors in the retrieval quality of a RAG pipeline. The table below compares the major embedding models as of 2025-2026.
| Model | Dimensions | Max Tokens | MTEB Average | Multilingual | Cost | Features |
|---|---|---|---|---|---|---|
| OpenAI text-embedding-3-large | 3072 | 8191 | 64.6 | Yes | $0.13/1M tokens | Dimension reduction support |
| OpenAI text-embedding-3-small | 1536 | 8191 | 62.3 | Yes | $0.02/1M tokens | Cost-effective |
| Cohere embed-v3 | 1024 | 512 | 64.5 | Yes (100+) | $0.10/1M tokens | Search-optimized |
| BGE-M3 | 1024 | 8192 | 66.1 | Yes (100+) | Free (self-hosted) | Dense+Sparse+ColBERT |
| E5-mistral-7b-instruct | 4096 | 32768 | 66.6 | Yes | Free (self-hosted) | Instruction-based embedding |
| Jina-embeddings-v3 | 1024 | 8192 | 65.5 | Yes (89) | $0.02/1M tokens | Task-specific LoRA |
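The "dimension reduction" feature listed for the text-embedding-3 models means a vector can be shortened to its leading dimensions and still be used for similarity search, provided it is re-normalized. The sketch below shows the manual version of that step; the function name `shorten_embedding` is hypothetical, and in practice the same effect is usually obtained by requesting fewer dimensions from the API directly.

```python
import math
from typing import List


def shorten_embedding(vec: List[float], dims: int) -> List[float]:
    """Keep the first `dims` components and rescale the result to unit length."""
    head = vec[:dims]
    norm = math.sqrt(sum(x * x for x in head))
    # Re-normalize so that dot products remain cosine similarities
    return [x / norm for x in head] if norm else head
```

Smaller vectors cut storage and search cost roughly in proportion to the dimension count, at some loss of retrieval quality that should be measured on your own evaluation set.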
Embedding Model Selection Criteria
import time
from typing import List

import numpy as np
from sentence_transformers import SentenceTransformer


class EmbeddingBenchmark:
    """Embedding model comparison benchmark"""

    def __init__(self, models: List[str]):
        self.models = {}
        for model_name in models:
            self.models[model_name] = SentenceTransformer(model_name)

    def evaluate_retrieval_quality(
        self,
        queries: List[str],
        relevant_docs: List[List[int]],  # corpus indices of the relevant documents per query
        corpus: List[str]
    ) -> dict:
        results = {}
        for name, model in self.models.items():
            corpus_embeddings = model.encode(corpus, normalize_embeddings=True)
            query_embeddings = model.encode(queries, normalize_embeddings=True)
            # Cosine similarity (vectors are normalized, so a dot product suffices)
            scores = np.dot(query_embeddings, corpus_embeddings.T)
            # Recall@k
            recall_at_5 = self._compute_recall(scores, relevant_docs, k=5)
            recall_at_10 = self._compute_recall(scores, relevant_docs, k=10)
            results[name] = {
                "recall@5": recall_at_5,
                "recall@10": recall_at_10,
                "embedding_dim": model.get_sentence_embedding_dimension(),
                "encoding_speed": self._measure_speed(model, queries)
            }
        return results

    def _compute_recall(self, scores, relevant_docs, k):
        recalls = []
        for i, rel_docs in enumerate(relevant_docs):
            if not rel_docs:
                continue  # skip queries without labeled relevant documents
            top_k_indices = np.argsort(scores[i])[-k:]
            retrieved = set(int(idx) for idx in top_k_indices)
            relevant = set(rel_docs)
            recalls.append(len(retrieved & relevant) / len(relevant))
        return float(np.mean(recalls)) if recalls else 0.0

    def _measure_speed(self, model, texts, n_runs=3):
        """Mean wall-clock seconds to encode `texts` over n_runs"""
        times = []
        for _ in range(n_runs):
            start = time.time()
            model.encode(texts)
            times.append(time.time() - start)
        return float(np.mean(times))
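To make the Recall@k number reported by the benchmark concrete, here is a small stand-alone version of the metric for a single query. It assumes, as the benchmark's set intersection requires, that both the ranking and the labels are corpus indices; the name `recall_at_k` is illustrative.

```python
from typing import Sequence


def recall_at_k(ranked: Sequence[int], relevant: Sequence[int], k: int) -> float:
    """Fraction of the relevant documents that appear in the top-k of a ranking."""
    relevant_set = set(relevant)
    if not relevant_set:
        return 0.0  # no labels for this query; avoid dividing by zero
    return len(set(ranked[:k]) & relevant_set) / len(relevant_set)
```

For a ranking `[4, 1, 7, 3, 9]` and relevant documents `{1, 3, 8}`, the top 3 contains only document 1 (recall 1/3), while the top 5 also contains document 3 (recall 2/3). Document 8 is never retrieved, so recall cannot reach 1.0 at any k for this ranking.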
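Vector Database Comparison
The choice of vector database directly affects the operational complexity, cost, and performance of a RAG system. The table below compares the major vector databases along several dimensions.
| Feature | Pinecone | Milvus | Weaviate | Qdrant | Chroma |
|---|---|---|---|---|---|
| Deployment | Fully managed | Self-hosted/Zilliz Cloud | Self-hosted/Cloud | Self-hosted/Cloud | In-memory/Self-hosted |
| Max Vectors | Billions | Billions | Billions | Billions | Millions |
| Search Algorithm | Proprietary | HNSW, IVF, DiskANN | HNSW | HNSW | HNSW |
| Hybrid Search | Yes (Sparse+Dense) | Yes (BM25+Dense) | Yes (BM25+Dense) | Yes (Sparse+Dense) | No |
| Multi-tenancy | Yes (namespaces) | Yes (partitions) | Yes (tenants) | Yes (collections) | Yes (collections) |
| Filtering | Metadata filters | Scalar filtering | GraphQL filters | Payload filters | Metadata filters |
| Price (1M vectors) | ~$70/month | Free (self-hosted) | Free (self-hosted) | Free (self-hosted) | Free |
| Best For | Quick prototyping, managed preference | Large enterprise | GraphQL ecosystem | High-performance filtering | Development/small scale |
Production Vector Search with Qdrant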
import uuid
from typing import Optional

from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance, VectorParams, PointStruct,
    Filter, FieldCondition, MatchValue,
    SearchParams, HnswConfigDiff, OptimizersConfigDiff
)


class ProductionVectorStore:
    """Production vector store manager"""

    def __init__(self, url: str, api_key: str, collection_name: str):
        self.client = QdrantClient(url=url, api_key=api_key)
        self.collection_name = collection_name

    def create_collection(self, vector_size: int = 1536):
        """Create a collection with a tuned HNSW index"""
        self.client.create_collection(
            collection_name=self.collection_name,
            vectors_config=VectorParams(
                size=vector_size,
                distance=Distance.COSINE,
                on_disk=True  # disk-based storage for large datasets
            ),
            hnsw_config=HnswConfigDiff(
                m=16,                      # graph connectivity (default: 16)
                ef_construct=128,          # search range while building the index
                full_scan_threshold=10000  # full scan below this size
            ),
            optimizers_config=OptimizersConfigDiff(
                indexing_threshold=20000,
                memmap_threshold=50000
            )
        )

    def upsert_documents(self, documents: list, embeddings: list, metadata: list):
        """Batch upsert"""
        points = [
            PointStruct(
                id=str(uuid.uuid4()),
                vector=embedding,
                payload={**meta, "text": doc}
            )
            for doc, embedding, meta in zip(documents, embeddings, metadata)
        ]
        # Upload in batches of 100
        batch_size = 100
        for i in range(0, len(points), batch_size):
            batch = points[i:i + batch_size]
            self.client.upsert(
                collection_name=self.collection_name,
                points=batch,
                wait=True
            )

    def hybrid_search(self, query_vector: list, query_text: str,
                      filters: Optional[dict] = None, top_k: int = 10):
        """Filtered vector search (dense vector + payload filter).

        query_text is kept in the signature but is not used here; a sparse or
        BM25 signal would have to be added for true hybrid search.
        """
        search_filter = None
        if filters:
            conditions = [
                FieldCondition(key=k, match=MatchValue(value=v))
                for k, v in filters.items()
            ]
            search_filter = Filter(must=conditions)
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            query_filter=search_filter,
            limit=top_k,
            search_params=SearchParams(
                hnsw_ef=128,  # search range at query time (higher = more accurate, slower)
                exact=False   # use approximate search
            )
        )
        return results
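Chunking Strategies
Chunking is the preprocessing stage with the greatest impact on RAG quality. Poor chunking splits related information apart or pulls in unnecessary noise, and either one significantly degrades retrieval quality.
Chunking Strategy Comparison
| Strategy | Pros | Cons | Best For |
|---|---|---|---|
| Fixed-size | Simple to implement, predictable | Ignores semantic boundaries | Uniform text |
| Recursive | Respects paragraph/sentence boundaries | Requires size tuning | General documents |
| Semantic | Preserves semantic units | High computational cost | Unstructured text |
| Document-structure | Maintains heading/section hierarchy | Requires parser implementation | Markdown, HTML |
| Parent-Child | Search precision + sufficient context | 2x storage space | Technical documentation |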
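As a baseline for the table above, fixed-size chunking with overlap fits in a few lines. This is a character-based sketch with a hypothetical function name; it deliberately ignores sentence and paragraph boundaries, which is exactly the weakness listed in the "Cons" column.

```python
from typing import List


def fixed_size_chunks(text: str, chunk_size: int = 1000, overlap: int = 200) -> List[str]:
    """Split text into windows of chunk_size characters that share `overlap` characters."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    step = chunk_size - overlap
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reaches the end of the text
        start += step
    return chunks
```

The overlap means a sentence cut at one window boundary still appears whole in the neighboring chunk, at the cost of indexing some text twice.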
Semantic Chunking Implementation
from langchain.text_splitter import (
    MarkdownHeaderTextSplitter,
    RecursiveCharacterTextSplitter,
)
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings


class AdvancedChunker:
    """Advanced chunker supporting multiple chunking strategies"""

    def __init__(self, embedding_model: str = "text-embedding-3-small"):
        self.embeddings = OpenAIEmbeddings(model=embedding_model)

    def semantic_chunk(self, text: str, breakpoint_threshold: float = 0.5):
        """Semantic chunking - split where the meaning shifts"""
        chunker = SemanticChunker(
            self.embeddings,
            breakpoint_threshold_type="percentile",
            # the percentile setting expects 0-100, so scale the 0-1 input
            breakpoint_threshold_amount=breakpoint_threshold * 100
        )
        return chunker.split_text(text)

    def parent_child_chunk(self, text: str,
                           parent_size: int = 2000,
                           child_size: int = 400,
                           child_overlap: int = 50):
        """Parent-child chunking - search on children, return parents as context"""
        # Create parent chunks
        parent_splitter = RecursiveCharacterTextSplitter(
            chunk_size=parent_size,
            chunk_overlap=0
        )
        parent_chunks = parent_splitter.split_text(text)
        # Create child chunks from each parent
        child_splitter = RecursiveCharacterTextSplitter(
            chunk_size=child_size,
            chunk_overlap=child_overlap
        )
        result = []
        for i, parent in enumerate(parent_chunks):
            children = child_splitter.split_text(parent)
            for child in children:
                result.append({
                    "child_text": child,
                    "parent_text": parent,
                    "parent_id": i
                })
        return result

    def markdown_structure_chunk(self, markdown_text: str):
        """Chunking based on Markdown document structure"""
        headers_to_split_on = [
            ("#", "h1"),
            ("##", "h2"),
            ("###", "h3"),
        ]
        splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=headers_to_split_on
        )
        chunks = splitter.split_text(markdown_text)
        # Attach the heading hierarchy to each chunk as metadata
        enriched_chunks = []
        for chunk in chunks:
            enriched_chunks.append({
                "text": chunk.page_content,
                "metadata": chunk.metadata,
                "hierarchy": " > ".join(
                    chunk.metadata.get(h, "")
                    for h in ["h1", "h2", "h3"]
                    if chunk.metadata.get(h)
                )
            })
        return enriched_chunks
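The parent-child records produced above only pay off at query time, when hits on small child chunks are swapped for their larger parents. The sketch below shows one way to do that swap. It assumes the hits arrive best-first and carry the same `parent_id` and `parent_text` keys that `parent_child_chunk` emits; the function name `resolve_parents` is hypothetical.

```python
from typing import Dict, List


def resolve_parents(child_hits: List[Dict], top_n: int = 3) -> List[str]:
    """Map ranked child hits to unique parent texts, preserving rank order."""
    seen = set()
    parents = []
    for hit in child_hits:
        if hit["parent_id"] in seen:
            continue  # several children of one parent matched; keep the parent once
        seen.add(hit["parent_id"])
        parents.append(hit["parent_text"])
        if len(parents) == top_n:
            break
    return parents
```

Deduplicating by parent matters because neighboring children often match the same query, and sending the same parent text twice wastes context tokens.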
Retrieval Optimization Strategies
Plain vector similarity search alone rarely reaches production-grade retrieval quality. Techniques such as hybrid search, HyDE, and query decomposition need to be combined.
Hybrid Search (BM25 + Vector)
from typing import List, Tuple

import numpy as np
from langchain_openai import OpenAIEmbeddings
from rank_bm25 import BM25Okapi


class HybridRetriever:
    """Hybrid retriever combining BM25 and vector search"""

    def __init__(self, documents: List[str], embeddings_model: str = "text-embedding-3-large"):
        self.documents = documents
        self.embeddings = OpenAIEmbeddings(model=embeddings_model)
        # Build the BM25 index (whitespace tokenization; swap in a proper
        # tokenizer for languages that are not space-delimited)
        tokenized_docs = [doc.lower().split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized_docs)
        # Build the vector index
        self.doc_embeddings = np.array(
            self.embeddings.embed_documents(documents)
        )

    def search(self, query: str, top_k: int = 5,
               alpha: float = 0.5) -> List[Tuple[str, float]]:
        """
        Run a hybrid search.
        alpha: weight of the vector score (0 = BM25 only, 1 = vector only)
        """
        # BM25 scores
        bm25_scores = self.bm25.get_scores(query.lower().split())
        bm25_scores = self._normalize_scores(bm25_scores)
        # Vector similarity scores
        query_embedding = np.array(self.embeddings.embed_query(query))
        vector_scores = np.dot(self.doc_embeddings, query_embedding)
        vector_scores = self._normalize_scores(vector_scores)
        # Weighted score combination (an alternative to Reciprocal Rank Fusion)
        combined_scores = alpha * vector_scores + (1 - alpha) * bm25_scores
        # Return the top k
        top_indices = np.argsort(combined_scores)[-top_k:][::-1]
        return [
            (self.documents[i], float(combined_scores[i]))
            for i in top_indices
        ]

    def _normalize_scores(self, scores: np.ndarray) -> np.ndarray:
        """Min-max normalization"""
        min_s, max_s = scores.min(), scores.max()
        if max_s == min_s:
            return np.zeros_like(scores, dtype=float)
        return (scores - min_s) / (max_s - min_s)
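The retriever above fuses normalized scores with a weight `alpha`. The alternative named in its comment, Reciprocal Rank Fusion (RRF), ignores raw scores and combines rank positions instead, so it needs no normalization step. A minimal sketch follows; the function name is illustrative, and the constant `k=60` is the commonly used default rather than something taken from this article.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def reciprocal_rank_fusion(rankings: List[List[str]], k: int = 60) -> List[Tuple[str, float]]:
    """Fuse several best-first rankings of document IDs into one scored ranking."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            # Each list contributes 1 / (k + rank); documents missing from a list get nothing
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)
```

For example, fusing a BM25 ranking `["a", "b", "c"]` with a vector ranking `["b", "c", "a"]` puts `b` first, because it sits near the top of both lists, followed by `a` and then `c`.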
HyDE (Hypothetical Document Embedding)
HyDE first generates a hypothetical answer to the query and then uses the embedding of that answer for retrieval. A question and the passage that answers it are often phrased differently, so searching with answer-shaped text narrows that semantic gap and improves retrieval quality.
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings


class HyDERetriever:
    """HyDE-based retriever"""

    def __init__(self, vectorstore):
        self.vectorstore = vectorstore
        self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
        self.embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
        self.hyde_prompt = ChatPromptTemplate.from_template(
            "Write a hypothetical answer to the following question. "
            "Including relevant keywords and concepts matters more than factual accuracy.\n\n"
            "Question: {question}\n\n"
            "Hypothetical answer:"
        )

    def retrieve(self, query: str, top_k: int = 5):
        """Run a HyDE search"""
        # 1. Generate the hypothetical answer
        chain = self.hyde_prompt | self.llm
        hypothetical_answer = chain.invoke({"question": query}).content
        # 2. Embed the hypothetical answer
        hyde_embedding = self.embeddings.embed_query(hypothetical_answer)
        # 3. Search with the hypothetical answer's embedding
        results = self.vectorstore.similarity_search_by_vector(
            hyde_embedding, k=top_k
        )
        return results
Query Decomposition
This strategy breaks a complex question into sub-questions, retrieves for each one separately, and then merges the results.
import json

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI


class QueryDecomposer:
    """Decompose a complex query into sub-queries"""

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
        self.decompose_prompt = ChatPromptTemplate.from_template(
            "Decompose the following question into 2-4 sub-questions optimized for retrieval.\n"
            "Return them as a JSON array.\n\n"
            "Original question: {question}\n\n"
            "Sub-questions:"
        )

    def decompose(self, question: str) -> list:
        """Decompose the query"""
        chain = self.decompose_prompt | self.llm
        result = chain.invoke({"question": question}).content
        try:
            sub_queries = json.loads(result)
        except json.JSONDecodeError:
            sub_queries = [question]
        # Fall back to the original question if the model returned something other than a list
        if not isinstance(sub_queries, list) or not sub_queries:
            sub_queries = [question]
        return sub_queries

    def retrieve_and_merge(self, question: str, retriever, top_k: int = 3):
        """Retrieve with each sub-query and merge the results"""
        sub_queries = self.decompose(question)
        all_docs = []
        seen_contents = set()
        for sub_query in sub_queries:
            docs = retriever.invoke(sub_query)[:top_k]
            for doc in docs:
                if doc.page_content not in seen_contents:
                    seen_contents.add(doc.page_content)
                    all_docs.append(doc)
        return all_docs
Reranking
Reranking re-scores the initial search results with a more precise model and reorders them. A cross-encoder reads the query and the document together, so it judges relevance more accurately than bi-encoder vector search, which embeds the two independently.
Using Cohere Rerank
from typing import Dict, List

import cohere


class CohereReranker:
    """Reranking with the Cohere Rerank API"""

    def __init__(self, api_key: str, model: str = "rerank-v3.5"):
        self.client = cohere.Client(api_key)
        self.model = model

    def rerank(self, query: str, documents: List[str],
               top_n: int = 5) -> List[Dict]:
        """Rerank documents"""
        response = self.client.rerank(
            model=self.model,
            query=query,
            documents=documents,
            top_n=top_n,
            return_documents=True
        )
        results = []
        for hit in response.results:
            results.append({
                "text": hit.document.text,
                "relevance_score": hit.relevance_score,
                "index": hit.index
            })
        return results


class CrossEncoderReranker:
    """Reranking with a local cross-encoder model"""

    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"):
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)

    def rerank(self, query: str, documents: List[str],
               top_n: int = 5) -> List[Dict]:
        """Rerank with the cross-encoder"""
        pairs = [[query, doc] for doc in documents]
        scores = self.model.predict(pairs)
        # Sort by score, highest first
        scored_docs = list(zip(documents, scores, range(len(documents))))
        scored_docs.sort(key=lambda x: x[1], reverse=True)
        results = []
        for text, score, idx in scored_docs[:top_n]:
            results.append({
                "text": text,
                "relevance_score": float(score),
                "index": idx
            })
        return results
Integrating Reranking into the Pipeline
class RAGPipelineWithReranking:
    """RAG pipeline with integrated reranking"""

    def __init__(self, retriever, reranker, llm):
        self.retriever = retriever
        self.reranker = reranker
        self.llm = llm

    def query(self, question: str, top_k_retrieve: int = 20,
              top_k_rerank: int = 5) -> str:
        """Retrieve -> rerank -> generate"""
        # Stage 1: initial retrieval (broad)
        initial_docs = self.retriever.invoke(question)[:top_k_retrieve]
        doc_texts = [doc.page_content for doc in initial_docs]
        # Stage 2: reranking (precise)
        reranked = self.reranker.rerank(
            query=question,
            documents=doc_texts,
            top_n=top_k_rerank
        )
        # Stage 3: context assembly
        context = "\n\n---\n\n".join(
            r["text"] for r in reranked
        )
        # Stage 4: LLM generation
        prompt = (
            "Answer the question based on the following context.\n"
            "Cite the source of each piece of information.\n"
            f"Context:\n{context}\n"
            f"Question: {question}\n"
            "Answer:"
        )
        response = self.llm.invoke(prompt)
        return response.content
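RAG Evaluation with RAGAS
RAGAS (Retrieval Augmented Generation Assessment) is a framework for automatically evaluating RAG pipelines. It measures retrieval quality and generation quality with separate metrics.
Core RAGAS Metrics
| Metric | Measures | Description | Target Value |
|---|---|---|---|
| Context Precision | Retrieval | Share of the retrieved context that is relevant | 0.8 or higher |
| Context Recall | Retrieval | Share of the required information present in the context | 0.9 or higher |
| Faithfulness | Generation | Degree to which the answer is grounded in the context | 0.9 or higher |
| Answer Relevancy | Generation | Degree to which the answer addresses the question | 0.8 or higher |
| Answer Correctness | End-to-end | Accuracy compared with the ground-truth answer | 0.7 or higher |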
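The target values in the table can be turned into an automated quality gate, for example in CI. The sketch below only assumes a plain dictionary of metric scores keyed by the metric names used in this article; the names `THRESHOLDS` and `quality_gate` are hypothetical and not part of RAGAS.

```python
from typing import Dict, List

# Minimum acceptable score per metric, copied from the table above
THRESHOLDS: Dict[str, float] = {
    "context_precision": 0.8,
    "context_recall": 0.9,
    "faithfulness": 0.9,
    "answer_relevancy": 0.8,
    "answer_correctness": 0.7,
}


def quality_gate(scores: Dict[str, float],
                 thresholds: Dict[str, float] = THRESHOLDS) -> List[str]:
    """Return the names of metrics below their threshold (an empty list means pass)."""
    # A metric missing from `scores` is treated as a failure
    return [name for name, minimum in thresholds.items()
            if scores.get(name, 0.0) < minimum]
```

A deployment script could refuse to promote a new index or prompt version whenever the returned list is non-empty.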
RAGAS Evaluation Implementation
from datasets import Dataset
from ragas import evaluate
from ragas.metrics import (
    context_precision,
    context_recall,
    faithfulness,
    answer_relevancy,
    answer_correctness
)

METRIC_NAMES = [
    "context_precision",
    "context_recall",
    "faithfulness",
    "answer_relevancy",
    "answer_correctness",
]


class RAGEvaluator:
    """RAGAS-based RAG evaluator"""

    def __init__(self):
        self.metrics = [
            context_precision,
            context_recall,
            faithfulness,
            answer_relevancy,
            answer_correctness
        ]

    def create_eval_dataset(self, eval_samples: list) -> Dataset:
        """Build the evaluation dataset"""
        data = {
            "question": [],
            "answer": [],
            "contexts": [],
            "ground_truth": []
        }
        for sample in eval_samples:
            data["question"].append(sample["question"])
            data["answer"].append(sample["generated_answer"])
            data["contexts"].append(sample["retrieved_contexts"])
            data["ground_truth"].append(sample["expected_answer"])
        return Dataset.from_dict(data)

    def evaluate(self, eval_samples: list) -> dict:
        """Run the RAG pipeline evaluation"""
        dataset = self.create_eval_dataset(eval_samples)
        result = evaluate(
            dataset=dataset,
            metrics=self.metrics
        )
        # Collect the per-metric scores first, then average exactly those values
        # (assumes result[name] is the aggregate score for that metric)
        scores = {name: result[name] for name in METRIC_NAMES}
        scores["overall_score"] = sum(scores.values()) / len(scores)
        return scores

    def generate_report(self, results: dict) -> str:
        """Build the evaluation report"""
        report = "=== RAG Pipeline Evaluation Report ===\n\n"
        for metric, score in results.items():
            if metric == "overall_score":
                continue  # printed once at the end
            status = "PASS" if score >= 0.7 else "WARN" if score >= 0.5 else "FAIL"
            report += f" {metric}: {score:.4f} [{status}]\n"
        report += f"\n Overall: {results.get('overall_score', 0):.4f}\n"
        return report
Production Deployment Patterns
Asynchronous Indexing Pipeline
In production, document indexing should be decoupled from search serving. When new documents arrive, embeddings are generated and the index is updated asynchronously.
import asyncio
import hashlib
from datetime import datetime, timezone
from typing import Optional


class AsyncIndexingPipeline:
    """Asynchronous document indexing pipeline"""

    def __init__(self, embeddings, vector_store, chunker):
        self.embeddings = embeddings
        self.vector_store = vector_store
        self.chunker = chunker
        self.index_queue = asyncio.Queue()
        self._processed_hashes = set()

    async def enqueue_document(self, doc_id: str, content: str,
                               metadata: Optional[dict] = None):
        """Add a document to the indexing queue"""
        # MD5 is used only as a content fingerprint for deduplication, not for security
        doc_hash = hashlib.md5(content.encode()).hexdigest()
        if doc_hash in self._processed_hashes:
            return {"status": "skipped", "reason": "duplicate"}
        await self.index_queue.put({
            "doc_id": doc_id,
            "content": content,
            "metadata": metadata or {},
            "hash": doc_hash,
            "enqueued_at": datetime.now(timezone.utc).isoformat()
        })
        return {"status": "enqueued", "queue_size": self.index_queue.qsize()}

    async def process_queue(self, batch_size: int = 10):
        """Process documents from the queue in batches"""
        while True:
            batch = []
            try:
                for _ in range(batch_size):
                    item = self.index_queue.get_nowait()
                    batch.append(item)
            except asyncio.QueueEmpty:
                pass
            if batch:
                await self._process_batch(batch)
            else:
                await asyncio.sleep(1)

    async def _process_batch(self, batch: list):
        """Batch processing: chunk -> embed -> upsert"""
        all_chunks = []
        all_metadata = []
        for item in batch:
            chunks = self.chunker.semantic_chunk(item["content"])
            for chunk in chunks:
                all_chunks.append(chunk)
                all_metadata.append({
                    "doc_id": item["doc_id"],
                    "hash": item["hash"],
                    **item["metadata"]
                })
        if all_chunks:
            # The embedding and upsert calls are blocking, so run them in a
            # worker thread to keep the event loop responsive
            chunk_embeddings = await asyncio.to_thread(
                self.embeddings.embed_documents, all_chunks
            )
            await asyncio.to_thread(
                self.vector_store.upsert_documents,
                documents=all_chunks,
                embeddings=chunk_embeddings,
                metadata=all_metadata
            )
        for item in batch:
            self._processed_hashes.add(item["hash"])
Search Result Caching
Caching the results of frequently repeated queries improves response time and reduces API cost.
import hashlib
import json
from datetime import datetime, timezone
from typing import Optional

import redis


class RAGCache:
    """Cache for RAG search results"""

    def __init__(self, redis_url: str = "redis://localhost:6379",
                 ttl_seconds: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl_seconds

    def _cache_key(self, query: str, filters: Optional[dict] = None) -> str:
        """Build the cache key from the query and filters"""
        key_data = {"query": query.lower().strip(), "filters": filters or {}}
        key_hash = hashlib.sha256(
            json.dumps(key_data, sort_keys=True).encode()
        ).hexdigest()
        return f"rag:cache:{key_hash}"

    def get(self, query: str, filters: Optional[dict] = None):
        """Look up search results in the cache"""
        key = self._cache_key(query, filters)
        cached = self.redis.get(key)
        if cached:
            data = json.loads(cached)
            data["cache_hit"] = True
            return data
        return None

    def set(self, query: str, results: list,
            filters: Optional[dict] = None):
        """Store search results in the cache (results must be JSON-serializable)"""
        key = self._cache_key(query, filters)
        data = {
            "results": results,
            "cached_at": datetime.now(timezone.utc).isoformat(),
            "query": query
        }
        self.redis.setex(key, self.ttl, json.dumps(data))

    def invalidate_by_pattern(self, pattern: str = ""):
        """Invalidate cache entries whose key matches the pattern.

        Keys are SHA-256 digests, so the pattern is matched against the digest,
        not the query text. An empty pattern clears every cached entry.
        """
        # SCAN iterates incrementally; KEYS would block Redis on a large keyspace
        keys = list(self.redis.scan_iter(match=f"rag:cache:*{pattern}*"))
        if keys:
            self.redis.delete(*keys)
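Failure Cases and Recovery
Retrieval Quality Degradation Patterns
| Failure Type | Symptom | Cause | Response |
|---|---|---|---|
| Embedding drift | Retrieval accuracy declines over time | Embedding model version change, shift in data distribution | Periodic re-indexing, pinned model version |
| Chunk mismatch | Relevant information is not retrieved | Unsuitable chunk size or strategy | Experiment with chunk size, adjust overlap |
| Filter over-restriction | More queries return zero results | Metadata filters are too strict | Filter fallback strategy, relaxed filters |
| Context pollution | LLM answers from irrelevant information | No reranking, top-k too large | Introduce reranking, reduce top-k |
| Latency increase | Response time exceeds 2 seconds | Unoptimized index, network latency | Index tuning, caching, asynchronous processing |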
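The "filter fallback" response in the table can be sketched as a loop that drops one metadata filter at a time until something is found. Everything here is illustrative: `search` stands for any callable that takes a filter dictionary and returns results, and `relax_order` is a hypothetical parameter listing the filter keys from least to most important.

```python
from typing import Callable, Dict, List, Tuple


def search_with_filter_fallback(
    search: Callable[[Dict[str, str]], List[str]],
    filters: Dict[str, str],
    relax_order: List[str],
) -> Tuple[List[str], Dict[str, str]]:
    """Retry a filtered search, removing filters in relax_order until results appear."""
    active = dict(filters)  # copy so the caller's filters are not mutated
    results = search(active)
    for key in relax_order:
        if results:
            break
        active.pop(key, None)  # drop the next least-important filter
        results = search(active)
    return results, active
```

Returning the filters that were actually applied lets the caller tell the user that, for instance, the year restriction was dropped to find a match.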
Embedding Drift Monitoring
from datetime import datetime, timezone

import numpy as np


class EmbeddingDriftMonitor:
    """Embedding drift detection and monitoring"""

    def __init__(self, reference_embeddings: np.ndarray):
        self.reference_mean = np.mean(reference_embeddings, axis=0)
        self.reference_std = np.std(reference_embeddings, axis=0)
        self.drift_history = []

    def check_drift(self, new_embeddings: np.ndarray,
                    threshold: float = 0.1) -> dict:
        """Check new embeddings for drift against the reference set"""
        new_mean = np.mean(new_embeddings, axis=0)
        # Drift score: cosine distance between the reference and new mean vectors
        cosine_sim = np.dot(self.reference_mean, new_mean) / (
            np.linalg.norm(self.reference_mean) * np.linalg.norm(new_mean)
        )
        drift_score = 1 - cosine_sim
        # Distribution shift: mean per-dimension shift of the means, in units of
        # the reference standard deviation (a z-score-style heuristic, not a
        # true KL divergence)
        distribution_shift = np.mean(
            np.abs(new_mean - self.reference_mean)
            / (self.reference_std + 1e-8)
        )
        is_drifted = bool(drift_score > threshold)
        result = {
            "drift_score": float(drift_score),
            "distribution_shift": float(distribution_shift),
            "is_drifted": is_drifted,  # plain bool so the result is JSON-serializable
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "recommendation": (
                "Re-indexing required" if is_drifted
                else "Within normal range"
            )
        }
        self.drift_history.append(result)
        return result
Fallback Strategy for Degraded Retrieval Quality
class RAGWithFallback:
    """RAG pipeline with a fallback strategy"""

    def __init__(self, primary_retriever, fallback_retriever,
                 reranker, llm):
        self.primary = primary_retriever
        self.fallback = fallback_retriever
        self.reranker = reranker
        self.llm = llm

    def query(self, question: str) -> dict:
        """Handle a query, falling back when the primary search fails"""
        # Step 1: primary search
        docs = self.primary.invoke(question)
        source = "primary"
        if not docs:
            # Fallback 1: retriever with relaxed filters
            docs = self.fallback.invoke(question)
            source = "fallback"
        if not docs:
            return {
                "answer": "No relevant information was found.",
                "confidence": 0.0,
                "source": "no_results"
            }
        # Reranking
        reranked = self.reranker.rerank(
            query=question,
            documents=[d.page_content for d in docs],
            top_n=5
        )
        # Minimum relevance check (also covers an empty rerank result)
        top_score = reranked[0]["relevance_score"] if reranked else 0.0
        if top_score < 0.3:
            return {
                "answer": "Unable to generate a high-confidence answer.",
                "confidence": top_score,
                "source": "low_confidence"
            }
        # LLM generation
        context = "\n\n".join(r["text"] for r in reranked)
        answer = self.llm.invoke(
            f"Context:\n{context}\n\nQuestion: {question}\nAnswer:"
        ).content
        return {
            "answer": answer,
            "confidence": top_score,
            "source": source,  # reports which retriever produced the documents
            "num_sources": len(reranked)
        }
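Production Checklist
Design Phase
- Benchmark embedding models against your use case
- Choose a vector DB based on scale, budget, and operational capacity
- Plan A/B tests for chunking strategies
- Build an evaluation dataset (at least 100 QA pairs)
Development Phase
- Implement hybrid search (BM25 + vector)
- Integrate the reranking pipeline
- Build the asynchronous indexing pipeline
- Implement a caching layer (Redis/Memcached)
- Implement error handling and fallback strategies
Deployment Phase
- Set quality gates based on RAGAS metrics
- Build an embedding drift monitoring dashboard
- Configure search latency alerts (500 ms at P95)
- Build an automated index rebuild pipeline
- Run load tests (at 2x the target QPS)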
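The latency alert in the deployment checklist (500 ms at P95) reduces to a percentile computation over recent search latencies. The sketch below uses the nearest-rank definition of a percentile; the function names and the idea of passing in a plain list of millisecond samples are assumptions for illustration, since a real deployment would normally rely on its metrics system for this.

```python
from typing import Sequence


def p95(latencies_ms: Sequence[float]) -> float:
    """95th percentile by the nearest-rank method."""
    if not latencies_ms:
        raise ValueError("no latency samples")
    ordered = sorted(latencies_ms)
    # ceil(0.95 * n) computed with integers to avoid floating-point rounding surprises
    rank = (95 * len(ordered) + 99) // 100
    return ordered[rank - 1]


def latency_alert(latencies_ms: Sequence[float], budget_ms: float = 500.0) -> bool:
    """True when the P95 latency exceeds the budget."""
    return p95(latencies_ms) > budget_ms
```

Alerting on P95 rather than the mean keeps a handful of very fast cached responses from hiding a slow tail.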
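Operations Phase
- Generate a weekly search quality report
- Review embedding model updates monthly
- Update the evaluation dataset based on user feedback
- Monitor costs (embedding API calls, vector DB storage)
- Rebuild the index regularly (at least once per quarter)
References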
RAG Pipeline Production Guide: From Vector DB Selection to Chunking, Reranking, and Evaluation
- Introduction
- RAG Architecture Overview
- Embedding Model Comparison
- Vector Database Comparison
- Chunking Strategies
- Retrieval Optimization Strategies
- Reranking
- RAG Evaluation with RAGAS
- Production Deployment Patterns
- Failure Cases and Recovery
- Production Checklist
- References

Introduction
RAG (Retrieval-Augmented Generation) is a critical architecture pattern for solving LLM hallucination problems and generating answers grounded in up-to-date information. While it may seem as simple as "search and paste into a prompt," production environments require numerous engineering decisions: embedding model selection, vector DB operations, chunking strategies, hybrid search, reranking, and evaluation frameworks.
This article analyzes the full RAG pipeline architecture, covering selection criteria and implementation methods for each component with production-ready code. We also examine common failure patterns in production, recovery strategies, and systematic evaluation methodologies using RAGAS.
RAG Architecture Overview
A RAG pipeline consists of three main stages: Retrieval, Augmentation, and Generation. Each stage can be optimized independently, and the overall pipeline quality is determined by its weakest stage.
End-to-End Architecture Flow
[User Query]
|
v
[Query Processing] <- Query decomposition, HyDE, query expansion
|
v
[Retrieval] <- BM25 + Vector search (hybrid)
|
v
[Reranking] <- Cross-encoder, Cohere Rerank
|
v
[Context Assembly] <- Chunk merging, deduplication, token limit
|
v
[Generation] <- Pass context and query to LLM
|
v
[Post-processing] <- Source attribution, confidence scores, hallucination check
Basic RAG Implementation
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
# 1. Document chunking
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", ".", " "]
)
chunks = text_splitter.split_documents(documents)
# 2. Embedding and vector store creation
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
collection_name="production_docs"
)
# 3. Retriever configuration
retriever = vectorstore.as_retriever(
search_type="mmr",
search_kwargs={"k": 5, "fetch_k": 20}
)
# 4. RAG chain assembly
template = """Answer the question based on the following context.
If the answer cannot be found in the context, respond with "Insufficient information."
Context:
{context}
Question: {question}
Answer:"""
prompt = ChatPromptTemplate.from_template(template)
llm = ChatOpenAI(model="gpt-4o", temperature=0)
rag_chain = (
{"context": retriever, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
response = rag_chain.invoke("What are the key components of a RAG pipeline?")
Embedding Model Comparison
The embedding model is one of the most critical factors determining RAG pipeline retrieval quality. Here is a comparison of major embedding models as of 2025-2026.
| Model | Dimensions | Max Tokens | MTEB Average | Multilingual | Cost | Features |
|---|---|---|---|---|---|---|
| OpenAI text-embedding-3-large | 3072 | 8191 | 64.6 | Yes | $0.13/1M tokens | Dimension reduction support |
| OpenAI text-embedding-3-small | 1536 | 8191 | 62.3 | Yes | $0.02/1M tokens | Cost-effective |
| Cohere embed-v3 | 1024 | 512 | 64.5 | Yes (100+) | $0.10/1M tokens | Search-optimized |
| BGE-M3 | 1024 | 8192 | 66.1 | Yes (100+) | Free (self-hosted) | Dense+Sparse+ColBERT |
| E5-mistral-7b-instruct | 4096 | 32768 | 66.6 | Yes | Free (self-hosted) | Instruction-based embedding |
| Jina-embeddings-v3 | 1024 | 8192 | 65.5 | Yes (89) | $0.02/1M tokens | Task-specific LoRA |
Embedding Model Selection Criteria
from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List
class EmbeddingBenchmark:
"""Embedding model comparison benchmark"""
def __init__(self, models: List[str]):
self.models = {}
for model_name in models:
self.models[model_name] = SentenceTransformer(model_name)
def evaluate_retrieval_quality(
self,
queries: List[str],
relevant_docs: List[List[int]],  # corpus indices of the relevant documents per query
corpus: List[str]
) -> dict:
results = {}
for name, model in self.models.items():
corpus_embeddings = model.encode(corpus, normalize_embeddings=True)
query_embeddings = model.encode(queries, normalize_embeddings=True)
# Cosine similarity calculation
scores = np.dot(query_embeddings, corpus_embeddings.T)
# Recall@k calculation
recall_at_5 = self._compute_recall(scores, relevant_docs, k=5)
recall_at_10 = self._compute_recall(scores, relevant_docs, k=10)
results[name] = {
"recall@5": recall_at_5,
"recall@10": recall_at_10,
"embedding_dim": model.get_sentence_embedding_dimension(),
"encoding_speed": self._measure_speed(model, queries)
}
return results
def _compute_recall(self, scores, relevant_docs, k):
recalls = []
for i, rel_docs in enumerate(relevant_docs):
top_k_indices = np.argsort(scores[i])[-k:]
retrieved = set(top_k_indices)
relevant = set(rel_docs)
recalls.append(len(retrieved & relevant) / len(relevant))
return np.mean(recalls)
def _measure_speed(self, model, texts, n_runs=3):
import time
times = []
for _ in range(n_runs):
start = time.time()
model.encode(texts)
times.append(time.time() - start)
return np.mean(times)
Vector Database Comparison
Vector database selection directly impacts the operational complexity, cost, and performance of a RAG system. Here is a multi-dimensional comparison of major vector databases.
| Feature | Pinecone | Milvus | Weaviate | Qdrant | Chroma |
|---|---|---|---|---|---|
| Deployment | Fully managed | Self-hosted/Zilliz Cloud | Self-hosted/Cloud | Self-hosted/Cloud | In-memory/Self-hosted |
| Max Vectors | Billions | Billions | Billions | Billions | Millions |
| Search Algorithm | Proprietary | HNSW, IVF, DiskANN | HNSW | HNSW | HNSW |
| Hybrid Search | Yes (Sparse+Dense) | Yes (BM25+Dense) | Yes (BM25+Dense) | Yes (Sparse+Dense) | No |
| Multi-tenancy | Yes (namespaces) | Yes (partitions) | Yes (tenants) | Yes (collections) | Yes (collections) |
| Filtering | Metadata filters | Scalar filtering | GraphQL filters | Payload filters | Metadata filters |
| Price (1M vectors) | ~$70/month | Free (self-hosted) | Free (self-hosted) | Free (self-hosted) | Free |
| Best For | Quick prototyping, managed | Large enterprise | GraphQL ecosystem | High-perf filtering | Dev/small scale |
Production Vector Search with Qdrant
from qdrant_client import QdrantClient
from qdrant_client.models import (
Distance, VectorParams, PointStruct,
Filter, FieldCondition, MatchValue,
SearchParams, HnswConfigDiff
)
import uuid
class ProductionVectorStore:
"""Production vector store manager"""
def __init__(self, url: str, api_key: str, collection_name: str):
self.client = QdrantClient(url=url, api_key=api_key)
self.collection_name = collection_name
def create_collection(self, vector_size: int = 1536):
"""Create collection with optimized HNSW index"""
self.client.create_collection(
collection_name=self.collection_name,
vectors_config=VectorParams(
size=vector_size,
distance=Distance.COSINE,
on_disk=True # Disk-based storage for large datasets
),
hnsw_config=HnswConfigDiff(
m=16, # Graph connectivity (default: 16)
ef_construct=128, # Search range during index build
full_scan_threshold=10000 # Full scan below this count
),
optimizers_config={
"indexing_threshold": 20000,
"memmap_threshold": 50000
}
)
def upsert_documents(self, documents: list, embeddings: list, metadata: list):
"""Batch upsert"""
points = [
PointStruct(
id=str(uuid.uuid4()),
vector=embedding,
payload={**meta, "text": doc}
)
for doc, embedding, meta in zip(documents, embeddings, metadata)
]
# Upload in batches of 100
batch_size = 100
for i in range(0, len(points), batch_size):
batch = points[i:i + batch_size]
self.client.upsert(
collection_name=self.collection_name,
points=batch,
wait=True
)
def hybrid_search(self, query_vector: list, query_text: str,
filters: dict = None, top_k: int = 10):
"""Filtered vector search (dense vector + payload filter); query_text is not used here"""
search_filter = None
if filters:
conditions = [
FieldCondition(key=k, match=MatchValue(value=v))
for k, v in filters.items()
]
search_filter = Filter(must=conditions)
results = self.client.search(
collection_name=self.collection_name,
query_vector=query_vector,
query_filter=search_filter,
limit=top_k,
search_params=SearchParams(
hnsw_ef=128, # Search range (higher = more accurate, slower)
exact=False # Use approximate search
)
)
return results
Chunking Strategies
Chunking is often the preprocessing step with the largest effect on RAG quality. Poor chunking either splits related information across chunks or pulls unrelated text into the same chunk, and both degrade retrieval quality.
Chunking Strategy Comparison
| Strategy | Pros | Cons | Best For |
|---|---|---|---|
| Fixed-size | Simple to implement, predictable | Ignores semantic boundaries | Uniform text |
| Recursive | Respects paragraph/sentence boundaries | Requires size tuning | General documents |
| Semantic | Preserves semantic units | High computational cost | Unstructured text |
| Document-structure | Maintains heading/section hierarchy | Requires parser implementation | Markdown, HTML |
| Parent-Child | Search precision + sufficient context | 2x storage space | Technical documentation |
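To make the first row of the table concrete, here is a minimal sketch of fixed-size chunking with overlap. The function name `fixed_size_chunks` and its character-based sizes are assumptions made for illustration; a production splitter would more likely count tokens than characters.

```python
from typing import List


def fixed_size_chunks(text: str, chunk_size: int = 200, overlap: int = 50) -> List[str]:
    """Split text into character windows of chunk_size that overlap by `overlap`."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
    step = chunk_size - overlap  # how far each window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks
```

The window boundaries ignore sentences and paragraphs, which is exactly the "ignores semantic boundaries" drawback listed in the table. The overlap only reduces the chance that a fact is cut in half.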
Semantic Chunking Implementation
from langchain.text_splitter import (
    MarkdownHeaderTextSplitter,
    RecursiveCharacterTextSplitter,
)
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings


class AdvancedChunker:
    """Advanced chunker supporting multiple chunking strategies"""

    def __init__(self, embedding_model: str = "text-embedding-3-small"):
        self.embeddings = OpenAIEmbeddings(model=embedding_model)

    def semantic_chunk(self, text: str, breakpoint_threshold: float = 0.5):
        """Semantic chunking - split at semantic change points.

        breakpoint_threshold is a fraction in [0, 1]; it is converted to
        the percentile that SemanticChunker expects (0.5 -> 50th percentile).
        """
        chunker = SemanticChunker(
            self.embeddings,
            breakpoint_threshold_type="percentile",
            breakpoint_threshold_amount=breakpoint_threshold * 100
        )
        return chunker.split_text(text)

    def parent_child_chunk(self, text: str,
                           parent_size: int = 2000,
                           child_size: int = 400,
                           child_overlap: int = 50):
        """Parent-child chunking - search on children, context from parents"""
        # Create parent chunks
        parent_splitter = RecursiveCharacterTextSplitter(
            chunk_size=parent_size,
            chunk_overlap=0
        )
        parent_chunks = parent_splitter.split_text(text)
        # Create child chunks from each parent
        child_splitter = RecursiveCharacterTextSplitter(
            chunk_size=child_size,
            chunk_overlap=child_overlap
        )
        result = []
        for i, parent in enumerate(parent_chunks):
            children = child_splitter.split_text(parent)
            for child in children:
                result.append({
                    "child_text": child,
                    "parent_text": parent,
                    "parent_id": i
                })
        return result

    def markdown_structure_chunk(self, markdown_text: str):
        """Markdown document structure-based chunking"""
        headers_to_split_on = [
            ("#", "h1"),
            ("##", "h2"),
            ("###", "h3"),
        ]
        splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=headers_to_split_on
        )
        chunks = splitter.split_text(markdown_text)
        # Add hierarchy metadata to each chunk
        enriched_chunks = []
        for chunk in chunks:
            enriched_chunks.append({
                "text": chunk.page_content,
                "metadata": chunk.metadata,
                # Breadcrumb such as "Guide > Setup > Install"
                "hierarchy": " > ".join(
                    chunk.metadata.get(h, "")
                    for h in ["h1", "h2", "h3"]
                    if chunk.metadata.get(h)
                )
            })
        return enriched_chunks
Retrieval Optimization Strategies
Vector similarity search alone rarely reaches production-grade retrieval quality. It is usually combined with optimization techniques such as hybrid search, HyDE, and query decomposition.
Hybrid Search (BM25 + Vector)
from typing import List, Tuple

import numpy as np
from langchain_openai import OpenAIEmbeddings
from rank_bm25 import BM25Okapi


class HybridRetriever:
    """Hybrid retriever combining BM25 + vector search"""

    def __init__(self, documents: List[str], embeddings_model: str = "text-embedding-3-large"):
        self.documents = documents
        self.embeddings = OpenAIEmbeddings(model=embeddings_model)
        # Build BM25 index (whitespace tokenization; swap in a real
        # tokenizer for languages that do not separate words with spaces)
        tokenized_docs = [doc.lower().split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized_docs)
        # Build vector index
        self.doc_embeddings = np.array(
            self.embeddings.embed_documents(documents)
        )

    def search(self, query: str, top_k: int = 5,
               alpha: float = 0.5) -> List[Tuple[str, float]]:
        """
        Perform hybrid search
        alpha: vector search weight (0=BM25 only, 1=vector only)
        """
        # BM25 scores
        bm25_scores = self.bm25.get_scores(query.lower().split())
        bm25_scores = self._normalize_scores(bm25_scores)
        # Vector similarity scores. The dot product equals cosine
        # similarity only when the embeddings are unit-length.
        query_embedding = np.array(self.embeddings.embed_query(query))
        vector_scores = np.dot(self.doc_embeddings, query_embedding)
        vector_scores = self._normalize_scores(vector_scores)
        # Weighted score fusion (an alternative to Reciprocal Rank Fusion)
        combined_scores = alpha * vector_scores + (1 - alpha) * bm25_scores
        # Return top-k results, best first
        top_indices = np.argsort(combined_scores)[-top_k:][::-1]
        return [
            (self.documents[i], float(combined_scores[i]))
            for i in top_indices
        ]

    def _normalize_scores(self, scores: np.ndarray) -> np.ndarray:
        """Min-Max normalization"""
        min_s, max_s = scores.min(), scores.max()
        if max_s == min_s:
            # All scores identical: no ranking signal, so return zeros
            return np.zeros_like(scores)
        return (scores - min_s) / (max_s - min_s)
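The weighted fusion above depends on min-max normalized scores, which can be sensitive to outliers. Reciprocal Rank Fusion (RRF) is an alternative that uses only rank positions. The sketch below is illustrative rather than part of the retriever above: the function name is hypothetical, document IDs are assumed to be strings, and `k=60` is a commonly used default rather than a tuned value.

```python
from collections import defaultdict
from typing import Dict, List, Sequence, Tuple


def reciprocal_rank_fusion(rankings: Sequence[Sequence[str]], k: int = 60,
                           top_k: int = 5) -> List[Tuple[str, float]]:
    """Fuse several ranked lists of document IDs with Reciprocal Rank Fusion."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        # Ranks start at 1; each list contributes 1 / (k + rank) per document
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties are broken by document ID for determinism
    fused = sorted(scores.items(), key=lambda item: (-item[1], item[0]))
    return fused[:top_k]
```

Passing the BM25 ranking and the vector ranking as two lists gives a fused order without any score normalization, at the cost of discarding how large the score gaps were.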
HyDE (Hypothetical Document Embeddings)
HyDE generates a hypothetical answer to the query first, then uses that answer's embedding for retrieval. This bridges the semantic gap between questions and answers, improving retrieval quality.
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate


class HyDERetriever:
    """HyDE-based retriever"""

    def __init__(self, vectorstore):
        self.vectorstore = vectorstore
        self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
        # Must be the same embedding model that was used to index vectorstore
        self.embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
        self.hyde_prompt = ChatPromptTemplate.from_template(
            "Write a hypothetical answer to the following question. "
            "It is more important to include relevant keywords and concepts "
            "than to be factually accurate.\n\n"
            "Question: {question}\n\n"
            "Hypothetical answer:"
        )

    def retrieve(self, query: str, top_k: int = 5):
        """Perform HyDE retrieval"""
        # 1. Generate hypothetical answer
        chain = self.hyde_prompt | self.llm
        hypothetical_answer = chain.invoke({"question": query}).content
        # 2. Create embedding from hypothetical answer
        hyde_embedding = self.embeddings.embed_query(hypothetical_answer)
        # 3. Search using hypothetical answer embedding
        results = self.vectorstore.similarity_search_by_vector(
            hyde_embedding, k=top_k
        )
        return results
Query Decomposition
This strategy breaks complex questions into sub-questions, retrieves results for each, then aggregates them.
import json

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate


class QueryDecomposer:
    """Decompose complex queries into sub-queries"""

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
        self.decompose_prompt = ChatPromptTemplate.from_template(
            "Decompose the following question into 2-4 sub-questions "
            "optimized for retrieval.\n"
            "Return as a JSON array.\n\n"
            "Original question: {question}\n\n"
            "Sub-questions:"
        )

    def decompose(self, question: str) -> list:
        """Decompose query"""
        chain = self.decompose_prompt | self.llm
        result = chain.invoke({"question": question}).content
        try:
            sub_queries = json.loads(result)
        except json.JSONDecodeError:
            sub_queries = [question]
        # Valid JSON that is not a non-empty list (e.g. an object) also
        # falls back to the original question
        if not isinstance(sub_queries, list) or not sub_queries:
            sub_queries = [question]
        return sub_queries

    def retrieve_and_merge(self, question: str, retriever, top_k: int = 3):
        """Retrieve with decomposed queries and merge results"""
        sub_queries = self.decompose(question)
        all_docs = []
        seen_contents = set()
        for sub_query in sub_queries:
            docs = retriever.invoke(sub_query)[:top_k]
            for doc in docs:
                # Deduplicate on exact chunk text across sub-queries
                if doc.page_content not in seen_contents:
                    seen_contents.add(doc.page_content)
                    all_docs.append(doc)
        return all_docs
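Models often wrap a requested JSON array in a Markdown code fence or add a sentence around it, in which case a plain `json.loads` fails and decomposition silently falls back to the original question. A more tolerant parser could look like the sketch below. `parse_sub_queries` is a hypothetical helper, not part of LangChain, and the regular expression assumes the output contains at most one top-level array.

```python
import json
import re
from typing import List


def parse_sub_queries(raw: str, fallback: str) -> List[str]:
    """Extract a JSON array of strings from LLM output, else return [fallback]."""
    # Grab everything from the first '[' to the last ']' (spans newlines)
    match = re.search(r"\[.*\]", raw, flags=re.DOTALL)
    if match is None:
        return [fallback]
    try:
        parsed = json.loads(match.group(0))
    except json.JSONDecodeError:
        return [fallback]
    if not isinstance(parsed, list):
        return [fallback]
    # Keep only non-empty strings; anything else is discarded
    queries = [q.strip() for q in parsed if isinstance(q, str) and q.strip()]
    return queries or [fallback]
```

Calling `parse_sub_queries(result, question)` in place of the `try`/`except` in `decompose` would keep the same fallback behavior while accepting fenced output.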
Reranking
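Reranking rescores the initial search results with a more precise model and reorders them. A cross-encoder reads the query and the document together, so it usually judges relevance more accurately than bi-encoder vector search, which embeds the two separately. It is also slower per document, which is why it is applied only to the top candidates from the first retrieval stage.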
Using Cohere Rerank
from typing import List, Dict

import cohere


class CohereReranker:
    """Reranking using Cohere Rerank API"""

    def __init__(self, api_key: str, model: str = "rerank-v3.5"):
        self.client = cohere.Client(api_key)
        self.model = model

    def rerank(self, query: str, documents: List[str],
               top_n: int = 5) -> List[Dict]:
        """Rerank documents"""
        response = self.client.rerank(
            model=self.model,
            query=query,
            documents=documents,
            top_n=top_n,
            return_documents=True
        )
        results = []
        for hit in response.results:
            results.append({
                "text": hit.document.text,
                "relevance_score": hit.relevance_score,
                "index": hit.index
            })
        return results


class CrossEncoderReranker:
    """Local cross-encoder model-based reranking"""

    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"):
        # Imported lazily so Cohere-only setups do not need sentence-transformers
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)

    def rerank(self, query: str, documents: List[str],
               top_n: int = 5) -> List[Dict]:
        """Rerank with cross-encoder"""
        pairs = [[query, doc] for doc in documents]
        scores = self.model.predict(pairs)
        # Sort by score
        scored_docs = list(zip(documents, scores, range(len(documents))))
        scored_docs.sort(key=lambda x: x[1], reverse=True)
        results = []
        for text, score, idx in scored_docs[:top_n]:
            results.append({
                "text": text,
                "relevance_score": float(score),
                "index": idx
            })
        return results
Integrated Reranking Pipeline
class RAGPipelineWithReranking:
    """RAG pipeline with integrated reranking"""

    def __init__(self, retriever, reranker, llm):
        self.retriever = retriever
        self.reranker = reranker
        self.llm = llm

    def query(self, question: str, top_k_retrieve: int = 20,
              top_k_rerank: int = 5) -> str:
        """Retrieve -> Rerank -> Generate"""
        # Stage 1: Initial retrieval (broad)
        initial_docs = self.retriever.invoke(question)[:top_k_retrieve]
        doc_texts = [doc.page_content for doc in initial_docs]
        # Stage 2: Reranking (precise)
        reranked = self.reranker.rerank(
            query=question,
            documents=doc_texts,
            top_n=top_k_rerank
        )
        # Stage 3: Context assembly
        context = "\n\n---\n\n".join(
            r["text"] for r in reranked
        )
        # Stage 4: LLM generation
        prompt = f"""Answer the question based on the following context.
Cite the source of each piece of information.
Context:
{context}
Question: {question}
Answer:"""
        response = self.llm.invoke(prompt)
        return response.content
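The prompt above asks the model to cite sources, but the joined context carries no source labels, duplicate chunks are not removed, and nothing bounds its length. A minimal context-assembly sketch that addresses all three is shown below. `assemble_context` is a hypothetical helper, and the character budget is a rough stand-in for a real token limit.

```python
from typing import Dict, List


def assemble_context(reranked: List[Dict], max_chars: int = 6000) -> str:
    """Join reranked chunks into a numbered context block within a character budget."""
    seen = set()
    parts = []
    used = 0  # characters consumed by entries so far (separators not counted)
    for item in reranked:
        text = item["text"].strip()
        if not text or text in seen:
            continue  # skip empty and exact-duplicate chunks
        entry = f"[{len(parts) + 1}] {text}"
        if used + len(entry) > max_chars:
            break  # stop before exceeding the budget
        seen.add(text)
        parts.append(entry)
        used += len(entry)
    return "\n\n".join(parts)
```

With numbered entries, the prompt can ask for citations such as `[2]`, which are easy to map back to the reranked list after generation.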
RAG Evaluation with RAGAS
RAGAS (Retrieval Augmented Generation Assessment) is a framework for automatically evaluating RAG pipelines. It measures retrieval quality and generation quality with separate metrics.
Core RAGAS Metrics
| Metric | Measures | Description | Target |
|---|---|---|---|
| Context Precision | Retrieval | Whether the relevant retrieved contexts are ranked near the top | 0.8+ |
| Context Recall | Retrieval | Proportion of needed information included in context | 0.9+ |
| Faithfulness | Generation | Degree to which the answer is grounded in context | 0.9+ |
| Answer Relevancy | Generation | How well the answer addresses the question | 0.8+ |
| Answer Correctness | Overall | Accuracy compared to ground truth | 0.7+ |
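The targets in the table can be turned into an automated quality gate. The sketch below assumes the evaluation step yields a plain dict of metric name to score; `TARGETS` and `failed_metrics` are hypothetical names, and the thresholds are copied from the table rather than recommended universally.

```python
from typing import Dict, List

# Thresholds copied from the "Target" column of the table above
TARGETS: Dict[str, float] = {
    "context_precision": 0.8,
    "context_recall": 0.9,
    "faithfulness": 0.9,
    "answer_relevancy": 0.8,
    "answer_correctness": 0.7,
}


def failed_metrics(scores: Dict[str, float],
                   targets: Dict[str, float] = TARGETS) -> List[str]:
    """Return the metrics that are missing or below their target."""
    return [
        name for name, target in targets.items()
        if scores.get(name, 0.0) < target  # a missing metric counts as a failure
    ]
```

A CI job could fail the build whenever `failed_metrics(scores)` is non-empty, so a regression in a single metric is not hidden by a healthy average.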
RAGAS Evaluation Implementation
from ragas import evaluate
from ragas.metrics import (
    context_precision,
    context_recall,
    faithfulness,
    answer_relevancy,
    answer_correctness
)
from datasets import Dataset


class RAGEvaluator:
    """RAGAS-based RAG evaluator"""

    def __init__(self):
        self.metrics = [
            context_precision,
            context_recall,
            faithfulness,
            answer_relevancy,
            answer_correctness
        ]

    def create_eval_dataset(self, eval_samples: list) -> Dataset:
        """Create evaluation dataset"""
        data = {
            "question": [],
            "answer": [],
            "contexts": [],
            "ground_truth": []
        }
        for sample in eval_samples:
            data["question"].append(sample["question"])
            data["answer"].append(sample["generated_answer"])
            data["contexts"].append(sample["retrieved_contexts"])
            data["ground_truth"].append(sample["expected_answer"])
        return Dataset.from_dict(data)

    def evaluate(self, eval_samples: list) -> dict:
        """Perform RAG pipeline evaluation"""
        dataset = self.create_eval_dataset(eval_samples)
        result = evaluate(
            dataset=dataset,
            metrics=self.metrics
        )
        # Assumes a ragas release whose result object supports dict-style
        # access to the aggregate score of each metric
        metric_names = [
            "context_precision",
            "context_recall",
            "faithfulness",
            "answer_relevancy",
            "answer_correctness",
        ]
        scores = {name: result[name] for name in metric_names}
        # Unweighted mean of the five metrics, computed before it is added
        scores["overall_score"] = sum(scores.values()) / len(scores)
        return scores

    def generate_report(self, results: dict) -> str:
        """Generate evaluation report"""
        report = "=== RAG Pipeline Evaluation Report ===\n\n"
        for metric, score in results.items():
            if metric == "overall_score":
                continue  # printed once, on the summary line below
            status = "PASS" if score >= 0.7 else "WARN" if score >= 0.5 else "FAIL"
            report += f" {metric}: {score:.4f} [{status}]\n"
        report += f"\n Overall: {results.get('overall_score', 0):.4f}\n"
        return report
Production Deployment Patterns
Async Indexing Pipeline
In production, document indexing and search serving must be separated. When new documents are added, embeddings are generated asynchronously and the index is updated.
import asyncio
import hashlib
from datetime import datetime, timezone
from typing import Optional


class AsyncIndexingPipeline:
    """Async document indexing pipeline"""

    def __init__(self, embeddings, vector_store, chunker):
        self.embeddings = embeddings
        self.vector_store = vector_store
        self.chunker = chunker
        self.index_queue = asyncio.Queue()
        # In-memory only: duplicate detection resets when the process restarts
        self._processed_hashes = set()

    async def enqueue_document(self, doc_id: str, content: str,
                               metadata: Optional[dict] = None):
        """Add document to indexing queue"""
        # MD5 is used as a content fingerprint here, not for security
        doc_hash = hashlib.md5(content.encode()).hexdigest()
        if doc_hash in self._processed_hashes:
            return {"status": "skipped", "reason": "duplicate"}
        await self.index_queue.put({
            "doc_id": doc_id,
            "content": content,
            "metadata": metadata or {},
            "hash": doc_hash,
            "enqueued_at": datetime.now(timezone.utc).isoformat()
        })
        return {"status": "enqueued", "queue_size": self.index_queue.qsize()}

    async def process_queue(self, batch_size: int = 10):
        """Process documents from queue in batches"""
        while True:
            batch = []
            try:
                for _ in range(batch_size):
                    item = self.index_queue.get_nowait()
                    batch.append(item)
            except asyncio.QueueEmpty:
                pass
            if batch:
                await self._process_batch(batch)
            else:
                await asyncio.sleep(1)

    async def _process_batch(self, batch: list):
        """Batch processing: chunk -> embed -> upsert"""
        all_chunks = []
        all_metadata = []
        for item in batch:
            # The chunker, embedder, and vector store are synchronous, so
            # they run in worker threads to keep the event loop responsive
            chunks = await asyncio.to_thread(
                self.chunker.semantic_chunk, item["content"]
            )
            for chunk in chunks:
                all_chunks.append(chunk)
                all_metadata.append({
                    "doc_id": item["doc_id"],
                    "hash": item["hash"],
                    **item["metadata"]
                })
        # Batch embedding generation
        chunk_embeddings = await asyncio.to_thread(
            self.embeddings.embed_documents, all_chunks
        )
        # Upsert to vector store
        await asyncio.to_thread(
            self.vector_store.upsert_documents,
            documents=all_chunks,
            embeddings=chunk_embeddings,
            metadata=all_metadata
        )
        # Mark as processed only after the upsert succeeded
        for item in batch:
            self._processed_hashes.add(item["hash"])
Search Result Caching
Apply caching for frequently repeated queries to improve response time and reduce API costs.
import hashlib
import json
from datetime import datetime, timezone
from typing import Optional

import redis


class RAGCache:
    """RAG search result cache"""

    def __init__(self, redis_url: str = "redis://localhost:6379",
                 ttl_seconds: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl_seconds

    def _cache_key(self, query: str, filters: Optional[dict] = None) -> str:
        """Generate cache key from query and filters"""
        key_data = {"query": query.lower().strip(), "filters": filters or {}}
        key_hash = hashlib.sha256(
            json.dumps(key_data, sort_keys=True).encode()
        ).hexdigest()
        return f"rag:cache:{key_hash}"

    def get(self, query: str, filters: Optional[dict] = None):
        """Retrieve search results from cache"""
        key = self._cache_key(query, filters)
        cached = self.redis.get(key)
        if cached:
            data = json.loads(cached)
            data["cache_hit"] = True
            return data
        return None

    def set(self, query: str, results: list,
            filters: Optional[dict] = None):
        """Store search results in cache (results must be JSON-serializable)"""
        key = self._cache_key(query, filters)
        data = {
            "results": results,
            "cached_at": datetime.now(timezone.utc).isoformat(),
            "query": query
        }
        self.redis.setex(key, self.ttl, json.dumps(data))

    def invalidate_by_pattern(self, pattern: str):
        """Invalidate cache entries whose key matches a glob pattern.

        Keys are SHA-256 digests, so the pattern matches digest text, not
        the original query. Pass an empty string to clear every entry.
        """
        # SCAN iterates incrementally; KEYS would block Redis on large keyspaces
        keys = list(self.redis.scan_iter(match=f"rag:cache:*{pattern}*"))
        if keys:
            self.redis.delete(*keys)
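A cache like this is typically used in a cache-aside pattern: look up first, search only on a miss, then store the result. The sketch below assumes a cache object exposing `get(query, filters)` and `set(query, results, filters)` as in the class above. `cached_search` and `search_fn` are hypothetical names.

```python
from typing import Callable, List, Optional


def cached_search(cache, query: str,
                  search_fn: Callable[[str, Optional[dict]], List[dict]],
                  filters: Optional[dict] = None) -> dict:
    """Cache-aside lookup: return the cached entry, else search and store it."""
    hit = cache.get(query, filters)
    if hit is not None:
        return hit  # already marked with cache_hit=True by the cache
    results = search_fn(query, filters)
    cache.set(query, results, filters)
    return {"results": results, "query": query, "cache_hit": False}
```

Because entries are keyed by query and filters only, they should be invalidated (or given a short TTL) whenever the underlying index is rebuilt, otherwise stale results are served until expiry.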
Failure Cases and Recovery
Retrieval Quality Degradation Patterns
| Failure Type | Symptoms | Cause | Mitigation |
|---|---|---|---|
| Embedding Drift | Gradual accuracy degradation over time | Embedding model version change, data distribution shift | Periodic re-indexing, model version pinning |
| Chunk Mismatch | Relevant information not retrieved | Inappropriate chunk size/strategy | Chunk size experimentation, overlap adjustment |
| Filter Overfit | Increasing zero-result queries | Overly strict metadata filters | Filter fallback strategy, filter relaxation |
| Context Pollution | LLM answers with irrelevant information | Missing reranking, excessive top-k | Introduce reranking, reduce top-k |
| Latency Spike | Response time exceeds 2 seconds | Non-optimized index, network latency | Index tuning, caching, async processing |
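The "filter relaxation" mitigation in the Filter Overfit row can be sketched as a retry loop that drops one filter at a time until results appear. Everything here is illustrative: `search_with_filter_relaxation`, `search_fn`, and `drop_order` are hypothetical names, and which filters are safe to drop is a product decision (a tenant or permission filter, for instance, should never be relaxed).

```python
from typing import Callable, Dict, List, Sequence, Tuple


def search_with_filter_relaxation(
    search_fn: Callable[[Dict[str, str]], List[str]],
    filters: Dict[str, str],
    drop_order: Sequence[str],
) -> Tuple[List[str], Dict[str, str]]:
    """Retry with fewer filters until results appear.

    Returns the results together with the filters that produced them.
    """
    active = dict(filters)  # copy so the caller's dict is not mutated
    results = search_fn(active)
    for key in drop_order:
        if results:
            break  # stop relaxing as soon as something is found
        if key in active:
            del active[key]
            results = search_fn(active)
    return results, active
```

Returning the filters actually used lets the caller tell the user that the answer came from a broader search than requested.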
Embedding Drift Monitoring
from datetime import datetime, timezone

import numpy as np


class EmbeddingDriftMonitor:
    """Embedding drift detection and monitoring"""

    def __init__(self, reference_embeddings: np.ndarray):
        self.reference_mean = np.mean(reference_embeddings, axis=0)
        self.reference_std = np.std(reference_embeddings, axis=0)
        self.drift_history = []

    def check_drift(self, new_embeddings: np.ndarray,
                    threshold: float = 0.1) -> dict:
        """Check drift of new embeddings"""
        new_mean = np.mean(new_embeddings, axis=0)
        # Measure drift as the cosine distance between the mean vectors
        cosine_sim = np.dot(self.reference_mean, new_mean) / (
            np.linalg.norm(self.reference_mean) * np.linalg.norm(new_mean)
        )
        drift_score = 1 - cosine_sim
        # Mean absolute shift of the per-dimension means, in units of the
        # reference standard deviation (a z-score style measure, not a
        # KL divergence)
        distribution_shift = np.mean(
            np.abs(new_mean - self.reference_mean)
            / (self.reference_std + 1e-8)
        )
        is_drifted = bool(drift_score > threshold)  # plain bool, JSON-safe
        result = {
            "drift_score": float(drift_score),
            "distribution_shift": float(distribution_shift),
            "is_drifted": is_drifted,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "recommendation": (
                "Re-indexing required" if is_drifted
                else "Within normal range"
            )
        }
        self.drift_history.append(result)
        return result
Fallback Strategy for Quality Degradation
class RAGWithFallback:
    """RAG pipeline with fallback strategies"""

    def __init__(self, primary_retriever, fallback_retriever,
                 reranker, llm):
        self.primary = primary_retriever
        self.fallback = fallback_retriever
        self.reranker = reranker
        self.llm = llm

    def query(self, question: str) -> dict:
        """Query processing with fallback"""
        # Primary retrieval
        source = "primary"
        docs = self.primary.invoke(question)
        if not docs:
            # Fallback 1: Relaxed filters
            docs = self.fallback.invoke(question)
            source = "fallback"
        if not docs:
            return {
                "answer": "No relevant information found.",
                "confidence": 0.0,
                "source": "no_results"
            }
        # Reranking
        reranked = self.reranker.rerank(
            query=question,
            documents=[d.page_content for d in docs],
            top_n=5
        )
        # Minimum relevance validation. The 0.3 cutoff assumes scores in
        # [0, 1]; raw cross-encoder logits would need a calibrated cutoff.
        if not reranked or reranked[0]["relevance_score"] < 0.3:
            return {
                "answer": "Unable to generate a high-confidence answer.",
                "confidence": reranked[0]["relevance_score"] if reranked else 0.0,
                "source": "low_confidence"
            }
        # LLM generation
        context = "\n\n".join(r["text"] for r in reranked)
        answer = self.llm.invoke(
            f"Context:\n{context}\n\nQuestion: {question}\nAnswer:"
        ).content
        return {
            "answer": answer,
            "confidence": reranked[0]["relevance_score"],
            "source": source,  # "primary" or "fallback"
            "num_sources": len(reranked)
        }
Production Checklist
Design Phase
- Benchmark embedding models for your specific use case
- Select vector DB based on scale, budget, and operational capability
- Plan A/B testing for chunking strategies
- Build evaluation dataset (minimum 100 QA pairs)
Development Phase
- Implement hybrid search (BM25 + vector)
- Integrate reranking pipeline
- Build async indexing pipeline
- Implement caching layer (Redis/Memcached)
- Implement error handling and fallback strategies
Deployment Phase
- Set quality gates based on RAGAS metrics
- Build embedding drift monitoring dashboard
- Set search latency alerts (P95 target: 500ms)
- Build automated index rebuild pipeline
- Perform load testing (2x target QPS)
Operations Phase
- Generate weekly retrieval quality reports
- Monthly embedding model update review
- Update evaluation dataset based on user feedback
- Monitor costs (embedding API calls, vector DB storage)
- Periodic index rebuilds (at least quarterly)
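The P95 latency target in the deployment checklist can be checked with a few lines of code. The sketch below uses the nearest-rank definition of a percentile; `percentile` and `p95_within_target` are hypothetical helpers, and a monitoring system would normally compute this from histograms rather than raw samples.

```python
import math
from typing import Sequence


def percentile(samples: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest sample with at least pct% of samples <= it."""
    if not samples:
        raise ValueError("samples must not be empty")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]


def p95_within_target(latencies_ms: Sequence[float], target_ms: float = 500.0) -> bool:
    """Return True if the P95 latency meets the target (500 ms in the checklist)."""
    return percentile(latencies_ms, 95) <= target_ms
```

P95 is preferred over the mean for alerting because a small share of very slow requests can leave the average looking healthy.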