- Published on
RAG 파이프라인 프로덕션 구축 가이드: 벡터 DB 선택부터 청킹·리랭킹·평가까지
- Authors
- Name
- 들어가며
- RAG 아키텍처 개요
- 임베딩 모델 비교
- 벡터 데이터베이스 비교
- 청킹 전략
- 검색 최적화 전략
- 리랭킹 (Reranking)
- RAGAS를 활용한 RAG 평가
- 프로덕션 배포 패턴
- 장애 사례 및 대응
- 프로덕션 체크리스트
- 참고자료

들어가며
RAG(Retrieval-Augmented Generation)는 LLM의 환각(hallucination) 문제를 해결하고 최신 정보를 활용한 답변을 생성하기 위한 핵심 아키텍처 패턴이다. 단순히 "검색해서 프롬프트에 넣기"로 보일 수 있지만, 프로덕션 환경에서는 임베딩 모델 선택, 벡터 DB 운영, 청킹 전략, 하이브리드 검색, 리랭킹, 평가 체계까지 수많은 엔지니어링 결정이 필요하다.
이 글에서는 RAG 파이프라인의 전체 아키텍처를 분석하고, 각 구성 요소의 선택 기준과 구현 방법을 실전 코드와 함께 다룬다. 특히 프로덕션에서 자주 발생하는 장애 패턴과 그 대응 전략, 그리고 RAGAS를 활용한 체계적 평가 방법론까지 포괄적으로 살펴본다.
RAG 아키텍처 개요
RAG 파이프라인은 크게 세 단계로 구성된다: Retrieval(검색), Augmentation(증강), Generation(생성). 각 단계는 독립적으로 최적화할 수 있으며, 전체 파이프라인의 품질은 가장 약한 단계에 의해 결정된다.
전체 아키텍처 흐름
[사용자 질의]
│
▼
[Query Processing] ← 쿼리 분해, HyDE, 쿼리 확장
│
▼
[Retrieval] ← BM25 + 벡터 검색 (하이브리드)
│
▼
[Reranking] ← Cross-encoder, Cohere Rerank
│
▼
[Context Assembly] ← 청크 병합, 중복 제거, 토큰 제한
│
▼
[Generation] ← LLM에 컨텍스트와 질의 전달
│
▼
[Post-processing] ← 출처 표시, 신뢰도 점수, 환각 검증
기본 RAG 구현
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
# 1. 문서 청킹
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", ".", " "]
)
chunks = text_splitter.split_documents(documents)
# 2. 임베딩 및 벡터 저장소 생성
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
collection_name="production_docs"
)
# 3. Retriever 설정
retriever = vectorstore.as_retriever(
search_type="mmr",
search_kwargs={"k": 5, "fetch_k": 20}
)
# 4. RAG 체인 구성
template = """다음 컨텍스트를 기반으로 질문에 답변하세요.
컨텍스트에서 답을 찾을 수 없다면 "정보가 부족합니다"라고 답하세요.
컨텍스트:
{context}
질문: {question}
답변:"""
prompt = ChatPromptTemplate.from_template(template)
llm = ChatOpenAI(model="gpt-4o", temperature=0)
rag_chain = (
{"context": retriever, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
response = rag_chain.invoke("RAG 파이프라인의 핵심 구성요소는?")
임베딩 모델 비교
임베딩 모델은 RAG 파이프라인의 검색 품질을 결정하는 가장 중요한 요소 중 하나다. 2025-2026년 현재 주요 임베딩 모델들의 특성을 비교한다.
| 모델 | 차원 | 최대 토큰 | MTEB 평균 | 다국어 | 비용 | 특징 |
|---|---|---|---|---|---|---|
| OpenAI text-embedding-3-large | 3072 | 8191 | 64.6 | O | $0.13/1M tokens | 차원 축소 가능 |
| OpenAI text-embedding-3-small | 1536 | 8191 | 62.3 | O | $0.02/1M tokens | 비용 효율적 |
| Cohere embed-v3 | 1024 | 512 | 64.5 | O (100+) | $0.10/1M tokens | 검색 특화 |
| BGE-M3 | 1024 | 8192 | 66.1 | O (100+) | 무료 (자체 호스팅) | Dense+Sparse+ColBERT |
| E5-mistral-7b-instruct | 4096 | 32768 | 66.6 | O | 무료 (자체 호스팅) | 지시 기반 임베딩 |
| Jina-embeddings-v3 | 1024 | 8192 | 65.5 | O (89) | $0.02/1M tokens | Task-specific LoRA |
임베딩 모델 선택 기준
from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List
class EmbeddingBenchmark:
"""임베딩 모델 비교 벤치마크"""
def __init__(self, models: List[str]):
self.models = {}
for model_name in models:
self.models[model_name] = SentenceTransformer(model_name)
def evaluate_retrieval_quality(
self,
queries: List[str],
relevant_docs: List[List[str]],
corpus: List[str]
) -> dict:
results = {}
for name, model in self.models.items():
corpus_embeddings = model.encode(corpus, normalize_embeddings=True)
query_embeddings = model.encode(queries, normalize_embeddings=True)
# 코사인 유사도 계산
scores = np.dot(query_embeddings, corpus_embeddings.T)
# Recall@k 계산
recall_at_5 = self._compute_recall(scores, relevant_docs, k=5)
recall_at_10 = self._compute_recall(scores, relevant_docs, k=10)
results[name] = {
"recall@5": recall_at_5,
"recall@10": recall_at_10,
"embedding_dim": model.get_sentence_embedding_dimension(),
"encoding_speed": self._measure_speed(model, queries)
}
return results
def _compute_recall(self, scores, relevant_docs, k):
recalls = []
for i, rel_docs in enumerate(relevant_docs):
top_k_indices = np.argsort(scores[i])[-k:]
retrieved = set(top_k_indices)
relevant = set(rel_docs)
recalls.append(len(retrieved & relevant) / len(relevant))
return np.mean(recalls)
def _measure_speed(self, model, texts, n_runs=3):
import time
times = []
for _ in range(n_runs):
start = time.time()
model.encode(texts)
times.append(time.time() - start)
return np.mean(times)
벡터 데이터베이스 비교
벡터 데이터베이스 선택은 RAG 시스템의 운영 복잡도, 비용, 성능에 직접적인 영향을 미친다. 주요 벡터 DB를 다각도로 비교한다.
| 항목 | Pinecone | Milvus | Weaviate | Qdrant | Chroma |
|---|---|---|---|---|---|
| 배포 방식 | 완전 관리형 | 자체 호스팅/Zilliz Cloud | 자체 호스팅/Cloud | 자체 호스팅/Cloud | 인메모리/자체 호스팅 |
| 최대 벡터 수 | 수십억 | 수십억 | 수십억 | 수십억 | 수백만 |
| 검색 알고리즘 | 자체 엔진 | HNSW, IVF, DiskANN | HNSW | HNSW | HNSW |
| 하이브리드 검색 | O (Sparse+Dense) | O (BM25+Dense) | O (BM25+Dense) | O (Sparse+Dense) | X |
| 멀티테넌시 | O (네임스페이스) | O (파티션) | O (테넌트) | O (컬렉션) | O (컬렉션) |
| 필터링 | 메타데이터 필터 | 스칼라 필터링 | GraphQL 필터 | 페이로드 필터 | 메타데이터 필터 |
| 가격 (100만 벡터) | 약 $70/월 | 무료 (자체 호스팅) | 무료 (자체 호스팅) | 무료 (자체 호스팅) | 무료 |
| 적합 용도 | 빠른 프로토타입, 관리형 선호 | 대규모 엔터프라이즈 | GraphQL 생태계 | 고성능 필터링 | 개발/소규모 |
Qdrant를 활용한 프로덕션 벡터 검색
from qdrant_client import QdrantClient
from qdrant_client.models import (
Distance, VectorParams, PointStruct,
Filter, FieldCondition, MatchValue,
SearchParams, HnswConfigDiff
)
import uuid
class ProductionVectorStore:
"""프로덕션 벡터 저장소 관리자"""
def __init__(self, url: str, api_key: str, collection_name: str):
self.client = QdrantClient(url=url, api_key=api_key)
self.collection_name = collection_name
def create_collection(self, vector_size: int = 1536):
"""HNSW 인덱스 최적화된 컬렉션 생성"""
self.client.create_collection(
collection_name=self.collection_name,
vectors_config=VectorParams(
size=vector_size,
distance=Distance.COSINE,
on_disk=True # 대규모 데이터셋을 위한 디스크 기반 저장
),
hnsw_config=HnswConfigDiff(
m=16, # 그래프 연결 수 (기본: 16)
ef_construct=128, # 인덱스 빌드 시 탐색 범위
full_scan_threshold=10000 # 이 수 이하면 전체 스캔
),
optimizers_config={
"indexing_threshold": 20000,
"memmap_threshold": 50000
}
)
def upsert_documents(self, documents: list, embeddings: list, metadata: list):
"""배치 업서트"""
points = [
PointStruct(
id=str(uuid.uuid4()),
vector=embedding,
payload={**meta, "text": doc}
)
for doc, embedding, meta in zip(documents, embeddings, metadata)
]
# 배치 크기 100으로 분할 업로드
batch_size = 100
for i in range(0, len(points), batch_size):
batch = points[i:i + batch_size]
self.client.upsert(
collection_name=self.collection_name,
points=batch,
wait=True
)
def hybrid_search(self, query_vector: list, query_text: str,
filters: dict = None, top_k: int = 10):
"""하이브리드 검색 (벡터 + 메타데이터 필터)"""
search_filter = None
if filters:
conditions = [
FieldCondition(key=k, match=MatchValue(value=v))
for k, v in filters.items()
]
search_filter = Filter(must=conditions)
results = self.client.search(
collection_name=self.collection_name,
query_vector=query_vector,
query_filter=search_filter,
limit=top_k,
search_params=SearchParams(
hnsw_ef=128, # 검색 시 탐색 범위 (높을수록 정확, 느림)
exact=False # 근사 검색 사용
)
)
return results
청킹 전략
청킹은 RAG 품질에 가장 큰 영향을 미치는 전처리 단계다. 잘못된 청킹은 관련 정보를 분리시키거나, 불필요한 노이즈를 포함시켜 검색 품질을 크게 저하시킨다.
청킹 전략 비교
| 전략 | 장점 | 단점 | 적합 용도 |
|---|---|---|---|
| 고정 크기 (Fixed-size) | 구현 간단, 예측 가능 | 의미 단위 무시 | 균일한 텍스트 |
| 재귀적 (Recursive) | 문단/문장 경계 존중 | 최적 크기 튜닝 필요 | 범용 문서 |
| 시맨틱 (Semantic) | 의미 단위 보존 | 계산 비용 높음 | 구조 없는 텍스트 |
| 문서 구조 기반 | 제목/섹션 계층 유지 | 파서 구현 필요 | 마크다운, HTML |
| 부모-자식 (Parent-Child) | 검색 정밀도 + 충분한 맥락 | 저장 공간 2배 | 기술 문서 |
시맨틱 청킹 구현
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings
import numpy as np
class AdvancedChunker:
"""다양한 청킹 전략을 지원하는 고급 청커"""
def __init__(self, embedding_model: str = "text-embedding-3-small"):
self.embeddings = OpenAIEmbeddings(model=embedding_model)
def semantic_chunk(self, text: str, breakpoint_threshold: float = 0.5):
"""시맨틱 청킹 - 의미 변화 지점에서 분할"""
chunker = SemanticChunker(
self.embeddings,
breakpoint_threshold_type="percentile",
breakpoint_threshold_amount=breakpoint_threshold * 100
)
return chunker.split_text(text)
def parent_child_chunk(self, text: str,
parent_size: int = 2000,
child_size: int = 400,
child_overlap: int = 50):
"""부모-자식 청킹 - 검색은 자식, 컨텍스트는 부모"""
from langchain.text_splitter import RecursiveCharacterTextSplitter
# 부모 청크 생성
parent_splitter = RecursiveCharacterTextSplitter(
chunk_size=parent_size,
chunk_overlap=0
)
parent_chunks = parent_splitter.split_text(text)
# 각 부모에서 자식 청크 생성
child_splitter = RecursiveCharacterTextSplitter(
chunk_size=child_size,
chunk_overlap=child_overlap
)
result = []
for i, parent in enumerate(parent_chunks):
children = child_splitter.split_text(parent)
for child in children:
result.append({
"child_text": child,
"parent_text": parent,
"parent_id": i
})
return result
def markdown_structure_chunk(self, markdown_text: str):
"""마크다운 문서 구조 기반 청킹"""
from langchain.text_splitter import MarkdownHeaderTextSplitter
headers_to_split_on = [
("#", "h1"),
("##", "h2"),
("###", "h3"),
]
splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=headers_to_split_on
)
chunks = splitter.split_text(markdown_text)
# 각 청크에 계층 구조 메타데이터 추가
enriched_chunks = []
for chunk in chunks:
enriched_chunks.append({
"text": chunk.page_content,
"metadata": chunk.metadata,
"hierarchy": " > ".join(
chunk.metadata.get(h, "")
for h in ["h1", "h2", "h3"]
if chunk.metadata.get(h)
)
})
return enriched_chunks
검색 최적화 전략
단순 벡터 유사도 검색만으로는 프로덕션 수준의 검색 품질을 달성하기 어렵다. 하이브리드 검색, HyDE, 쿼리 분해 등 다양한 최적화 기법을 조합해야 한다.
하이브리드 검색 (BM25 + 벡터)
from rank_bm25 import BM25Okapi
from langchain_openai import OpenAIEmbeddings
import numpy as np
from typing import List, Tuple
class HybridRetriever:
"""BM25 + 벡터 검색을 결합한 하이브리드 검색기"""
def __init__(self, documents: List[str], embeddings_model: str = "text-embedding-3-large"):
self.documents = documents
self.embeddings = OpenAIEmbeddings(model=embeddings_model)
# BM25 인덱스 구축
tokenized_docs = [doc.lower().split() for doc in documents]
self.bm25 = BM25Okapi(tokenized_docs)
# 벡터 인덱스 구축
self.doc_embeddings = np.array(
self.embeddings.embed_documents(documents)
)
def search(self, query: str, top_k: int = 5,
alpha: float = 0.5) -> List[Tuple[str, float]]:
"""
하이브리드 검색 수행
alpha: 벡터 검색 가중치 (0=BM25만, 1=벡터만)
"""
# BM25 점수
bm25_scores = self.bm25.get_scores(query.lower().split())
bm25_scores = self._normalize_scores(bm25_scores)
# 벡터 유사도 점수
query_embedding = np.array(self.embeddings.embed_query(query))
vector_scores = np.dot(self.doc_embeddings, query_embedding)
vector_scores = self._normalize_scores(vector_scores)
# 가중 결합 (Reciprocal Rank Fusion 대안)
combined_scores = alpha * vector_scores + (1 - alpha) * bm25_scores
# 상위 k개 반환
top_indices = np.argsort(combined_scores)[-top_k:][::-1]
return [
(self.documents[i], combined_scores[i])
for i in top_indices
]
def _normalize_scores(self, scores: np.ndarray) -> np.ndarray:
"""Min-Max 정규화"""
min_s, max_s = scores.min(), scores.max()
if max_s == min_s:
return np.zeros_like(scores)
return (scores - min_s) / (max_s - min_s)
HyDE (Hypothetical Document Embedding)
HyDE는 쿼리에 대한 가상의 답변을 먼저 생성한 후, 그 답변의 임베딩을 검색에 사용하는 기법이다. 질문과 답변 사이의 의미적 차이를 줄여 검색 품질을 향상시킨다.
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
class HyDERetriever:
"""HyDE 기반 검색기"""
def __init__(self, vectorstore):
self.vectorstore = vectorstore
self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
self.embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
self.hyde_prompt = ChatPromptTemplate.from_template(
"다음 질문에 대해 가상의 답변을 작성해주세요. "
"실제 정확성보다는 관련 키워드와 개념을 포함하는 것이 중요합니다.\n\n"
"질문: {question}\n\n"
"가상 답변:"
)
def retrieve(self, query: str, top_k: int = 5):
"""HyDE 검색 수행"""
# 1. 가상 답변 생성
chain = self.hyde_prompt | self.llm
hypothetical_answer = chain.invoke({"question": query}).content
# 2. 가상 답변의 임베딩 생성
hyde_embedding = self.embeddings.embed_query(hypothetical_answer)
# 3. 가상 답변 임베딩으로 검색
results = self.vectorstore.similarity_search_by_vector(
hyde_embedding, k=top_k
)
return results
쿼리 분해 (Query Decomposition)
복잡한 질문을 하위 질문들로 분해하여 각각 검색한 후 결과를 종합하는 전략이다.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import json
class QueryDecomposer:
"""복잡한 쿼리를 하위 쿼리로 분해"""
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
self.decompose_prompt = ChatPromptTemplate.from_template(
"다음 질문을 검색에 최적화된 2-4개의 하위 질문으로 분해하세요.\n"
"JSON 배열 형태로 반환하세요.\n\n"
"원래 질문: {question}\n\n"
"하위 질문들:"
)
def decompose(self, question: str) -> list:
"""쿼리 분해"""
chain = self.decompose_prompt | self.llm
result = chain.invoke({"question": question}).content
try:
sub_queries = json.loads(result)
except json.JSONDecodeError:
sub_queries = [question]
return sub_queries
def retrieve_and_merge(self, question: str, retriever, top_k: int = 3):
"""분해된 쿼리로 검색 후 결과 병합"""
sub_queries = self.decompose(question)
all_docs = []
seen_contents = set()
for sub_query in sub_queries:
docs = retriever.invoke(sub_query)[:top_k]
for doc in docs:
if doc.page_content not in seen_contents:
seen_contents.add(doc.page_content)
all_docs.append(doc)
return all_docs
리랭킹 (Reranking)
초기 검색 결과를 더 정밀한 모델로 재평가하여 순위를 재정렬하는 단계다. Bi-encoder 기반의 벡터 검색보다 Cross-encoder 기반 리랭킹이 더 정확한 관련성 판단을 수행한다.
Cohere Rerank 활용
import cohere
from typing import List, Dict
class CohereReranker:
"""Cohere Rerank API를 활용한 리랭킹"""
def __init__(self, api_key: str, model: str = "rerank-v3.5"):
self.client = cohere.Client(api_key)
self.model = model
def rerank(self, query: str, documents: List[str],
top_n: int = 5) -> List[Dict]:
"""문서 리랭킹"""
response = self.client.rerank(
model=self.model,
query=query,
documents=documents,
top_n=top_n,
return_documents=True
)
results = []
for hit in response.results:
results.append({
"text": hit.document.text,
"relevance_score": hit.relevance_score,
"index": hit.index
})
return results
class CrossEncoderReranker:
"""로컬 Cross-Encoder 모델 기반 리랭킹"""
def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"):
from sentence_transformers import CrossEncoder
self.model = CrossEncoder(model_name)
def rerank(self, query: str, documents: List[str],
top_n: int = 5) -> List[Dict]:
"""Cross-encoder로 리랭킹"""
pairs = [[query, doc] for doc in documents]
scores = self.model.predict(pairs)
# 점수 기준 정렬
scored_docs = list(zip(documents, scores, range(len(documents))))
scored_docs.sort(key=lambda x: x[1], reverse=True)
results = []
for text, score, idx in scored_docs[:top_n]:
results.append({
"text": text,
"relevance_score": float(score),
"index": idx
})
return results
리랭킹 파이프라인 통합
class RAGPipelineWithReranking:
"""리랭킹이 통합된 RAG 파이프라인"""
def __init__(self, retriever, reranker, llm):
self.retriever = retriever
self.reranker = reranker
self.llm = llm
def query(self, question: str, top_k_retrieve: int = 20,
top_k_rerank: int = 5) -> str:
"""검색 -> 리랭킹 -> 생성"""
# 1단계: 초기 검색 (넓게)
initial_docs = self.retriever.invoke(question)[:top_k_retrieve]
doc_texts = [doc.page_content for doc in initial_docs]
# 2단계: 리랭킹 (정밀)
reranked = self.reranker.rerank(
query=question,
documents=doc_texts,
top_n=top_k_rerank
)
# 3단계: 컨텍스트 조립
context = "\n\n---\n\n".join(
r["text"] for r in reranked
)
# 4단계: LLM 생성
prompt = f"""다음 컨텍스트를 기반으로 질문에 답변하세요.
각 정보의 출처를 표시하세요.
컨텍스트:
{context}
질문: {question}
답변:"""
response = self.llm.invoke(prompt)
return response.content
RAGAS를 활용한 RAG 평가
RAGAS(Retrieval Augmented Generation Assessment)는 RAG 파이프라인을 자동으로 평가하는 프레임워크다. 검색 품질과 생성 품질을 별도의 메트릭으로 측정한다.
RAGAS 핵심 메트릭
| 메트릭 | 측정 대상 | 설명 | 이상적 값 |
|---|---|---|---|
| Context Precision | 검색 | 검색된 컨텍스트 중 관련 있는 비율 | 0.8 이상 |
| Context Recall | 검색 | 필요한 정보가 컨텍스트에 포함된 비율 | 0.9 이상 |
| Faithfulness | 생성 | 답변이 컨텍스트에 근거한 정도 | 0.9 이상 |
| Answer Relevancy | 생성 | 답변이 질문에 적합한 정도 | 0.8 이상 |
| Answer Correctness | 전체 | 정답과 비교한 정확도 | 0.7 이상 |
RAGAS 평가 구현
from ragas import evaluate
from ragas.metrics import (
context_precision,
context_recall,
faithfulness,
answer_relevancy,
answer_correctness
)
from datasets import Dataset
class RAGEvaluator:
"""RAGAS 기반 RAG 평가기"""
def __init__(self):
self.metrics = [
context_precision,
context_recall,
faithfulness,
answer_relevancy,
answer_correctness
]
def create_eval_dataset(self, eval_samples: list) -> Dataset:
"""평가 데이터셋 생성"""
data = {
"question": [],
"answer": [],
"contexts": [],
"ground_truth": []
}
for sample in eval_samples:
data["question"].append(sample["question"])
data["answer"].append(sample["generated_answer"])
data["contexts"].append(sample["retrieved_contexts"])
data["ground_truth"].append(sample["expected_answer"])
return Dataset.from_dict(data)
def evaluate(self, eval_samples: list) -> dict:
"""RAG 파이프라인 평가 수행"""
dataset = self.create_eval_dataset(eval_samples)
result = evaluate(
dataset=dataset,
metrics=self.metrics
)
return {
"context_precision": result["context_precision"],
"context_recall": result["context_recall"],
"faithfulness": result["faithfulness"],
"answer_relevancy": result["answer_relevancy"],
"answer_correctness": result["answer_correctness"],
"overall_score": sum(result.values()) / len(result)
}
def generate_report(self, results: dict) -> str:
"""평가 리포트 생성"""
report = "=== RAG Pipeline Evaluation Report ===\n\n"
for metric, score in results.items():
status = "PASS" if score >= 0.7 else "WARN" if score >= 0.5 else "FAIL"
report += f" {metric}: {score:.4f} [{status}]\n"
report += f"\n Overall: {results.get('overall_score', 0):.4f}\n"
return report
프로덕션 배포 패턴
비동기 인덱싱 파이프라인
프로덕션 환경에서는 문서 인덱싱과 검색 서빙을 분리해야 한다. 새 문서가 추가될 때 비동기로 임베딩을 생성하고 인덱스를 업데이트한다.
import asyncio
from datetime import datetime
from typing import Optional
import hashlib
class AsyncIndexingPipeline:
"""비동기 문서 인덱싱 파이프라인"""
def __init__(self, embeddings, vector_store, chunker):
self.embeddings = embeddings
self.vector_store = vector_store
self.chunker = chunker
self.index_queue = asyncio.Queue()
self._processed_hashes = set()
async def enqueue_document(self, doc_id: str, content: str,
metadata: Optional[dict] = None):
"""문서를 인덱싱 큐에 추가"""
doc_hash = hashlib.md5(content.encode()).hexdigest()
if doc_hash in self._processed_hashes:
return {"status": "skipped", "reason": "duplicate"}
await self.index_queue.put({
"doc_id": doc_id,
"content": content,
"metadata": metadata or {},
"hash": doc_hash,
"enqueued_at": datetime.utcnow().isoformat()
})
return {"status": "enqueued", "queue_size": self.index_queue.qsize()}
async def process_queue(self, batch_size: int = 10):
"""큐에서 배치 단위로 문서 처리"""
while True:
batch = []
try:
for _ in range(batch_size):
item = self.index_queue.get_nowait()
batch.append(item)
except asyncio.QueueEmpty:
pass
if batch:
await self._process_batch(batch)
else:
await asyncio.sleep(1)
async def _process_batch(self, batch: list):
"""배치 처리: 청킹 -> 임베딩 -> 업서트"""
all_chunks = []
all_metadata = []
for item in batch:
chunks = self.chunker.semantic_chunk(item["content"])
for chunk in chunks:
all_chunks.append(chunk)
all_metadata.append({
"doc_id": item["doc_id"],
"hash": item["hash"],
**item["metadata"]
})
# 배치 임베딩 생성
chunk_embeddings = self.embeddings.embed_documents(all_chunks)
# 벡터 저장소에 업서트
self.vector_store.upsert_documents(
documents=all_chunks,
embeddings=chunk_embeddings,
metadata=all_metadata
)
for item in batch:
self._processed_hashes.add(item["hash"])
검색 결과 캐싱
자주 반복되는 쿼리에 대해 캐싱을 적용하여 응답 속도를 향상시키고 API 비용을 절감한다.
import hashlib
import json
from datetime import datetime, timedelta
from typing import Optional
import redis
class RAGCache:
"""RAG 검색 결과 캐시"""
def __init__(self, redis_url: str = "redis://localhost:6379",
ttl_seconds: int = 3600):
self.redis = redis.from_url(redis_url)
self.ttl = ttl_seconds
def _cache_key(self, query: str, filters: Optional[dict] = None) -> str:
"""쿼리와 필터 기반 캐시 키 생성"""
key_data = {"query": query.lower().strip(), "filters": filters or {}}
key_hash = hashlib.sha256(
json.dumps(key_data, sort_keys=True).encode()
).hexdigest()
return f"rag:cache:{key_hash}"
def get(self, query: str, filters: Optional[dict] = None):
"""캐시에서 검색 결과 조회"""
key = self._cache_key(query, filters)
cached = self.redis.get(key)
if cached:
data = json.loads(cached)
data["cache_hit"] = True
return data
return None
def set(self, query: str, results: list,
filters: Optional[dict] = None):
"""검색 결과를 캐시에 저장"""
key = self._cache_key(query, filters)
data = {
"results": results,
"cached_at": datetime.utcnow().isoformat(),
"query": query
}
self.redis.setex(key, self.ttl, json.dumps(data))
def invalidate_by_pattern(self, pattern: str):
"""패턴 매칭으로 캐시 무효화"""
keys = self.redis.keys(f"rag:cache:*{pattern}*")
if keys:
self.redis.delete(*keys)
장애 사례 및 대응
검색 품질 저하 패턴
| 장애 유형 | 증상 | 원인 | 대응 방안 |
|---|---|---|---|
| 임베딩 드리프트 | 시간 경과에 따른 검색 정확도 저하 | 임베딩 모델 버전 변경, 데이터 분포 변화 | 정기적 재인덱싱, 모델 버전 고정 |
| 청크 불일치 | 관련 정보가 검색되지 않음 | 청크 크기/전략 부적절 | 청크 크기 실험, 오버랩 조정 |
| 필터 과적합 | 결과가 0건인 쿼리 증가 | 메타데이터 필터가 너무 엄격 | 필터 폴백 전략, 필터 완화 |
| 컨텍스트 오염 | LLM이 무관한 정보로 답변 | 리랭킹 부재, top-k 과다 | 리랭킹 도입, top-k 감소 |
| 지연 시간 증가 | 응답 시간 2초 초과 | 인덱스 비최적화, 네트워크 지연 | 인덱스 튜닝, 캐싱, 비동기 처리 |
임베딩 드리프트 모니터링
import numpy as np
from datetime import datetime
from typing import List
class EmbeddingDriftMonitor:
"""임베딩 드리프트 감지 및 모니터링"""
def __init__(self, reference_embeddings: np.ndarray):
self.reference_mean = np.mean(reference_embeddings, axis=0)
self.reference_std = np.std(reference_embeddings, axis=0)
self.drift_history = []
def check_drift(self, new_embeddings: np.ndarray,
threshold: float = 0.1) -> dict:
"""새 임베딩의 드리프트 검사"""
new_mean = np.mean(new_embeddings, axis=0)
# 코사인 거리로 드리프트 측정
cosine_sim = np.dot(self.reference_mean, new_mean) / (
np.linalg.norm(self.reference_mean) * np.linalg.norm(new_mean)
)
drift_score = 1 - cosine_sim
# 분포 비교 (KL Divergence 근사)
distribution_shift = np.mean(
np.abs(np.mean(new_embeddings, axis=0) - self.reference_mean)
/ (self.reference_std + 1e-8)
)
result = {
"drift_score": float(drift_score),
"distribution_shift": float(distribution_shift),
"is_drifted": drift_score > threshold,
"timestamp": datetime.utcnow().isoformat(),
"recommendation": (
"재인덱싱 필요" if drift_score > threshold
else "정상 범위"
)
}
self.drift_history.append(result)
return result
검색 품질 저하 시 폴백 전략
class RAGWithFallback:
"""폴백 전략이 포함된 RAG 파이프라인"""
def __init__(self, primary_retriever, fallback_retriever,
reranker, llm):
self.primary = primary_retriever
self.fallback = fallback_retriever
self.reranker = reranker
self.llm = llm
def query(self, question: str) -> dict:
"""폴백을 포함한 질의 처리"""
# 1차: 기본 검색
primary_docs = self.primary.invoke(question)
if not primary_docs:
# 폴백 1: 필터 완화
primary_docs = self.fallback.invoke(question)
if not primary_docs:
return {
"answer": "관련 정보를 찾을 수 없습니다.",
"confidence": 0.0,
"source": "no_results"
}
# 리랭킹
reranked = self.reranker.rerank(
query=question,
documents=[d.page_content for d in primary_docs],
top_n=5
)
# 최소 관련도 검증
if reranked[0]["relevance_score"] < 0.3:
return {
"answer": "높은 신뢰도의 답변을 생성할 수 없습니다.",
"confidence": reranked[0]["relevance_score"],
"source": "low_confidence"
}
# LLM 생성
context = "\n\n".join(r["text"] for r in reranked)
answer = self.llm.invoke(
f"컨텍스트:\n{context}\n\n질문: {question}\n답변:"
).content
return {
"answer": answer,
"confidence": reranked[0]["relevance_score"],
"source": "primary",
"num_sources": len(reranked)
}
프로덕션 체크리스트
설계 단계
- 사용 사례에 따른 임베딩 모델 벤치마크 수행
- 벡터 DB 선택: 규모, 예산, 운영 역량 고려
- 청킹 전략 A/B 테스트 계획 수립
- 평가 데이터셋 (최소 100개 QA 쌍) 구축
개발 단계
- 하이브리드 검색 (BM25 + 벡터) 구현
- 리랭킹 파이프라인 통합
- 비동기 인덱싱 파이프라인 구축
- 캐싱 레이어 구현 (Redis/Memcached)
- 에러 핸들링 및 폴백 전략 구현
배포 단계
- RAGAS 메트릭 기반 품질 게이트 설정
- 임베딩 드리프트 모니터링 대시보드 구축
- 검색 지연 시간 알림 설정 (P95 기준 500ms)
- 인덱스 재구축 자동화 파이프라인 구축
- 로드 테스트 수행 (목표 QPS 대비 2배)
운영 단계
- 주간 검색 품질 리포트 생성
- 월간 임베딩 모델 업데이트 검토
- 사용자 피드백 기반 평가 데이터셋 업데이트
- 비용 모니터링 (임베딩 API 호출, 벡터 DB 스토리지)
- 정기 인덱스 재구축 (분기 1회 이상)