Skip to content

필사 모드: RAG 파이프라인 프로덕션 구축 가이드: 벡터 DB 선택부터 청킹·리랭킹·평가까지

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

들어가며

RAG(Retrieval-Augmented Generation)는 LLM의 환각(hallucination) 문제를 해결하고 최신 정보를 활용한 답변을 생성하기 위한 핵심 아키텍처 패턴이다. 단순히 "검색해서 프롬프트에 넣기"로 보일 수 있지만, 프로덕션 환경에서는 임베딩 모델 선택, 벡터 DB 운영, 청킹 전략, 하이브리드 검색, 리랭킹, 평가 체계까지 수많은 엔지니어링 결정이 필요하다.

이 글에서는 RAG 파이프라인의 전체 아키텍처를 분석하고, 각 구성 요소의 선택 기준과 구현 방법을 실전 코드와 함께 다룬다. 특히 프로덕션에서 자주 발생하는 장애 패턴과 그 대응 전략, 그리고 RAGAS를 활용한 체계적 평가 방법론까지 포괄적으로 살펴본다.

RAG 아키텍처 개요

RAG 파이프라인은 크게 세 단계로 구성된다: Retrieval(검색), Augmentation(증강), Generation(생성). 각 단계는 독립적으로 최적화할 수 있으며, 전체 파이프라인의 품질은 가장 약한 단계에 의해 결정된다.

전체 아키텍처 흐름

[사용자 질의]

[Query Processing] ← 쿼리 분해, HyDE, 쿼리 확장

[Retrieval] ← BM25 + 벡터 검색 (하이브리드)

[Reranking] ← Cross-encoder, Cohere Rerank

[Context Assembly] ← 청크 병합, 중복 제거, 토큰 제한

[Generation] ← LLM에 컨텍스트와 질의 전달

[Post-processing] ← 출처 표시, 신뢰도 점수, 환각 검증

기본 RAG 구현

from langchain_openai import ChatOpenAI, OpenAIEmbeddings

from langchain_community.vectorstores import Chroma

from langchain.text_splitter import RecursiveCharacterTextSplitter

from langchain_core.prompts import ChatPromptTemplate

from langchain_core.runnables import RunnablePassthrough

from langchain_core.output_parsers import StrOutputParser

1. 문서 청킹

text_splitter = RecursiveCharacterTextSplitter(

chunk_size=1000,

chunk_overlap=200,

separators=["\n\n", "\n", ".", " "]

)

chunks = text_splitter.split_documents(documents)

2. 임베딩 및 벡터 저장소 생성

embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

vectorstore = Chroma.from_documents(

documents=chunks,

embedding=embeddings,

collection_name="production_docs"

)

3. Retriever 설정

retriever = vectorstore.as_retriever(

search_type="mmr",

search_kwargs={"k": 5, "fetch_k": 20}

)

4. RAG 체인 구성

template = """다음 컨텍스트를 기반으로 질문에 답변하세요.

컨텍스트에서 답을 찾을 수 없다면 "정보가 부족합니다"라고 답하세요.

컨텍스트:

{context}

질문: {question}

답변:"""

prompt = ChatPromptTemplate.from_template(template)

llm = ChatOpenAI(model="gpt-4o", temperature=0)

rag_chain = (

{"context": retriever, "question": RunnablePassthrough()}

| prompt

| llm

| StrOutputParser()

)

response = rag_chain.invoke("RAG 파이프라인의 핵심 구성요소는?")

임베딩 모델 비교

임베딩 모델은 RAG 파이프라인의 검색 품질을 결정하는 가장 중요한 요소 중 하나다. 2025-2026년 현재 주요 임베딩 모델들의 특성을 비교한다.

| 모델 | 차원 | 최대 토큰 | MTEB 평균 | 다국어 | 비용 | 특징 |

| ----------------------------- | ---- | --------- | --------- | -------- | ------------------ | -------------------- |

| OpenAI text-embedding-3-large | 3072 | 8191 | 64.6 | O | \$0.13/1M tokens | 차원 축소 가능 |

| OpenAI text-embedding-3-small | 1536 | 8191 | 62.3 | O | \$0.02/1M tokens | 비용 효율적 |

| Cohere embed-v3 | 1024 | 512 | 64.5 | O (100+) | \$0.10/1M tokens | 검색 특화 |

| BGE-M3 | 1024 | 8192 | 66.1 | O (100+) | 무료 (자체 호스팅) | Dense+Sparse+ColBERT |

| E5-mistral-7b-instruct | 4096 | 32768 | 66.6 | O | 무료 (자체 호스팅) | 지시 기반 임베딩 |

| Jina-embeddings-v3 | 1024 | 8192 | 65.5 | O (89) | \$0.02/1M tokens | Task-specific LoRA |

임베딩 모델 선택 기준

from sentence_transformers import SentenceTransformer

from typing import List

class EmbeddingBenchmark:

"""임베딩 모델 비교 벤치마크"""

def __init__(self, models: List[str]):

self.models = {}

for model_name in models:

self.models[model_name] = SentenceTransformer(model_name)

def evaluate_retrieval_quality(

self,

queries: List[str],

relevant_docs: List[List[str]],

corpus: List[str]

) -> dict:

results = {}

for name, model in self.models.items():

corpus_embeddings = model.encode(corpus, normalize_embeddings=True)

query_embeddings = model.encode(queries, normalize_embeddings=True)

코사인 유사도 계산

scores = np.dot(query_embeddings, corpus_embeddings.T)

Recall@k 계산

recall_at_5 = self._compute_recall(scores, relevant_docs, k=5)

recall_at_10 = self._compute_recall(scores, relevant_docs, k=10)

results[name] = {

"recall@5": recall_at_5,

"recall@10": recall_at_10,

"embedding_dim": model.get_sentence_embedding_dimension(),

"encoding_speed": self._measure_speed(model, queries)

}

return results

def _compute_recall(self, scores, relevant_docs, k):

recalls = []

for i, rel_docs in enumerate(relevant_docs):

top_k_indices = np.argsort(scores[i])[-k:]

retrieved = set(top_k_indices)

relevant = set(rel_docs)

recalls.append(len(retrieved & relevant) / len(relevant))

return np.mean(recalls)

def _measure_speed(self, model, texts, n_runs=3):

times = []

for _ in range(n_runs):

start = time.time()

model.encode(texts)

times.append(time.time() - start)

return np.mean(times)

벡터 데이터베이스 비교

벡터 데이터베이스 선택은 RAG 시스템의 운영 복잡도, 비용, 성능에 직접적인 영향을 미친다. 주요 벡터 DB를 다각도로 비교한다.

| 항목 | Pinecone | Milvus | Weaviate | Qdrant | Chroma |

| ----------------- | ---------------------------- | ------------------------ | ------------------ | ------------------ | -------------------- |

| 배포 방식 | 완전 관리형 | 자체 호스팅/Zilliz Cloud | 자체 호스팅/Cloud | 자체 호스팅/Cloud | 인메모리/자체 호스팅 |

| 최대 벡터 수 | 수십억 | 수십억 | 수십억 | 수십억 | 수백만 |

| 검색 알고리즘 | 자체 엔진 | HNSW, IVF, DiskANN | HNSW | HNSW | HNSW |

| 하이브리드 검색 | O (Sparse+Dense) | O (BM25+Dense) | O (BM25+Dense) | O (Sparse+Dense) | X |

| 멀티테넌시 | O (네임스페이스) | O (파티션) | O (테넌트) | O (컬렉션) | O (컬렉션) |

| 필터링 | 메타데이터 필터 | 스칼라 필터링 | GraphQL 필터 | 페이로드 필터 | 메타데이터 필터 |

| 가격 (100만 벡터) | 약 \$70/월 | 무료 (자체 호스팅) | 무료 (자체 호스팅) | 무료 (자체 호스팅) | 무료 |

| 적합 용도 | 빠른 프로토타입, 관리형 선호 | 대규모 엔터프라이즈 | GraphQL 생태계 | 고성능 필터링 | 개발/소규모 |

Qdrant를 활용한 프로덕션 벡터 검색

from qdrant_client import QdrantClient

from qdrant_client.models import (

Distance, VectorParams, PointStruct,

Filter, FieldCondition, MatchValue,

SearchParams, HnswConfigDiff

)

class ProductionVectorStore:

"""프로덕션 벡터 저장소 관리자"""

def __init__(self, url: str, api_key: str, collection_name: str):

self.client = QdrantClient(url=url, api_key=api_key)

self.collection_name = collection_name

def create_collection(self, vector_size: int = 1536):

"""HNSW 인덱스 최적화된 컬렉션 생성"""

self.client.create_collection(

collection_name=self.collection_name,

vectors_config=VectorParams(

size=vector_size,

distance=Distance.COSINE,

on_disk=True # 대규모 데이터셋을 위한 디스크 기반 저장

),

hnsw_config=HnswConfigDiff(

m=16, # 그래프 연결 수 (기본: 16)

ef_construct=128, # 인덱스 빌드 시 탐색 범위

full_scan_threshold=10000 # 이 수 이하면 전체 스캔

),

optimizers_config={

"indexing_threshold": 20000,

"memmap_threshold": 50000

}

)

def upsert_documents(self, documents: list, embeddings: list, metadata: list):

"""배치 업서트"""

points = [

PointStruct(

id=str(uuid.uuid4()),

vector=embedding,

payload={**meta, "text": doc}

)

for doc, embedding, meta in zip(documents, embeddings, metadata)

]

배치 크기 100으로 분할 업로드

batch_size = 100

for i in range(0, len(points), batch_size):

batch = points[i:i + batch_size]

self.client.upsert(

collection_name=self.collection_name,

points=batch,

wait=True

)

def hybrid_search(self, query_vector: list, query_text: str,

filters: dict = None, top_k: int = 10):

"""하이브리드 검색 (벡터 + 메타데이터 필터)"""

search_filter = None

if filters:

conditions = [

FieldCondition(key=k, match=MatchValue(value=v))

for k, v in filters.items()

]

search_filter = Filter(must=conditions)

results = self.client.search(

collection_name=self.collection_name,

query_vector=query_vector,

query_filter=search_filter,

limit=top_k,

search_params=SearchParams(

hnsw_ef=128, # 검색 시 탐색 범위 (높을수록 정확, 느림)

exact=False # 근사 검색 사용

)

)

return results

청킹 전략

청킹은 RAG 품질에 가장 큰 영향을 미치는 전처리 단계다. 잘못된 청킹은 관련 정보를 분리시키거나, 불필요한 노이즈를 포함시켜 검색 품질을 크게 저하시킨다.

청킹 전략 비교

| 전략 | 장점 | 단점 | 적합 용도 |

| ------------------------ | ------------------------- | ------------------- | ---------------- |

| 고정 크기 (Fixed-size) | 구현 간단, 예측 가능 | 의미 단위 무시 | 균일한 텍스트 |

| 재귀적 (Recursive) | 문단/문장 경계 존중 | 최적 크기 튜닝 필요 | 범용 문서 |

| 시맨틱 (Semantic) | 의미 단위 보존 | 계산 비용 높음 | 구조 없는 텍스트 |

| 문서 구조 기반 | 제목/섹션 계층 유지 | 파서 구현 필요 | 마크다운, HTML |

| 부모-자식 (Parent-Child) | 검색 정밀도 + 충분한 맥락 | 저장 공간 2배 | 기술 문서 |

시맨틱 청킹 구현

from langchain_experimental.text_splitter import SemanticChunker

from langchain_openai import OpenAIEmbeddings

class AdvancedChunker:

"""다양한 청킹 전략을 지원하는 고급 청커"""

def __init__(self, embedding_model: str = "text-embedding-3-small"):

self.embeddings = OpenAIEmbeddings(model=embedding_model)

def semantic_chunk(self, text: str, breakpoint_threshold: float = 0.5):

"""시맨틱 청킹 - 의미 변화 지점에서 분할"""

chunker = SemanticChunker(

self.embeddings,

breakpoint_threshold_type="percentile",

breakpoint_threshold_amount=breakpoint_threshold * 100

)

return chunker.split_text(text)

def parent_child_chunk(self, text: str,

parent_size: int = 2000,

child_size: int = 400,

child_overlap: int = 50):

"""부모-자식 청킹 - 검색은 자식, 컨텍스트는 부모"""

from langchain.text_splitter import RecursiveCharacterTextSplitter

부모 청크 생성

parent_splitter = RecursiveCharacterTextSplitter(

chunk_size=parent_size,

chunk_overlap=0

)

parent_chunks = parent_splitter.split_text(text)

각 부모에서 자식 청크 생성

child_splitter = RecursiveCharacterTextSplitter(

chunk_size=child_size,

chunk_overlap=child_overlap

)

result = []

for i, parent in enumerate(parent_chunks):

children = child_splitter.split_text(parent)

for child in children:

result.append({

"child_text": child,

"parent_text": parent,

"parent_id": i

})

return result

def markdown_structure_chunk(self, markdown_text: str):

"""마크다운 문서 구조 기반 청킹"""

from langchain.text_splitter import MarkdownHeaderTextSplitter

headers_to_split_on = [

("#", "h1"),

("##", "h2"),

("###", "h3"),

]

splitter = MarkdownHeaderTextSplitter(

headers_to_split_on=headers_to_split_on

)

chunks = splitter.split_text(markdown_text)

각 청크에 계층 구조 메타데이터 추가

enriched_chunks = []

for chunk in chunks:

enriched_chunks.append({

"text": chunk.page_content,

"metadata": chunk.metadata,

"hierarchy": " > ".join(

chunk.metadata.get(h, "")

for h in ["h1", "h2", "h3"]

if chunk.metadata.get(h)

)

})

return enriched_chunks

검색 최적화 전략

단순 벡터 유사도 검색만으로는 프로덕션 수준의 검색 품질을 달성하기 어렵다. 하이브리드 검색, HyDE, 쿼리 분해 등 다양한 최적화 기법을 조합해야 한다.

하이브리드 검색 (BM25 + 벡터)

from rank_bm25 import BM25Okapi

from langchain_openai import OpenAIEmbeddings

from typing import List, Tuple

class HybridRetriever:

"""BM25 + 벡터 검색을 결합한 하이브리드 검색기"""

def __init__(self, documents: List[str], embeddings_model: str = "text-embedding-3-large"):

self.documents = documents

self.embeddings = OpenAIEmbeddings(model=embeddings_model)

BM25 인덱스 구축

tokenized_docs = [doc.lower().split() for doc in documents]

self.bm25 = BM25Okapi(tokenized_docs)

벡터 인덱스 구축

self.doc_embeddings = np.array(

self.embeddings.embed_documents(documents)

)

def search(self, query: str, top_k: int = 5,

alpha: float = 0.5) -> List[Tuple[str, float]]:

"""

하이브리드 검색 수행

alpha: 벡터 검색 가중치 (0=BM25만, 1=벡터만)

"""

BM25 점수

bm25_scores = self.bm25.get_scores(query.lower().split())

bm25_scores = self._normalize_scores(bm25_scores)

벡터 유사도 점수

query_embedding = np.array(self.embeddings.embed_query(query))

vector_scores = np.dot(self.doc_embeddings, query_embedding)

vector_scores = self._normalize_scores(vector_scores)

가중 결합 (Reciprocal Rank Fusion 대안)

combined_scores = alpha * vector_scores + (1 - alpha) * bm25_scores

상위 k개 반환

top_indices = np.argsort(combined_scores)[-top_k:][::-1]

return [

(self.documents[i], combined_scores[i])

for i in top_indices

]

def _normalize_scores(self, scores: np.ndarray) -> np.ndarray:

"""Min-Max 정규화"""

min_s, max_s = scores.min(), scores.max()

if max_s == min_s:

return np.zeros_like(scores)

return (scores - min_s) / (max_s - min_s)

HyDE (Hypothetical Document Embedding)

HyDE는 쿼리에 대한 가상의 답변을 먼저 생성한 후, 그 답변의 임베딩을 검색에 사용하는 기법이다. 질문과 답변 사이의 의미적 차이를 줄여 검색 품질을 향상시킨다.

from langchain_openai import ChatOpenAI, OpenAIEmbeddings

from langchain_core.prompts import ChatPromptTemplate

class HyDERetriever:

"""HyDE 기반 검색기"""

def __init__(self, vectorstore):

self.vectorstore = vectorstore

self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)

self.embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

self.hyde_prompt = ChatPromptTemplate.from_template(

"다음 질문에 대해 가상의 답변을 작성해주세요. "

"실제 정확성보다는 관련 키워드와 개념을 포함하는 것이 중요합니다.\n\n"

"질문: {question}\n\n"

"가상 답변:"

)

def retrieve(self, query: str, top_k: int = 5):

"""HyDE 검색 수행"""

1. 가상 답변 생성

chain = self.hyde_prompt | self.llm

hypothetical_answer = chain.invoke({"question": query}).content

2. 가상 답변의 임베딩 생성

hyde_embedding = self.embeddings.embed_query(hypothetical_answer)

3. 가상 답변 임베딩으로 검색

results = self.vectorstore.similarity_search_by_vector(

hyde_embedding, k=top_k

)

return results

쿼리 분해 (Query Decomposition)

복잡한 질문을 하위 질문들로 분해하여 각각 검색한 후 결과를 종합하는 전략이다.

from langchain_openai import ChatOpenAI

from langchain_core.prompts import ChatPromptTemplate

class QueryDecomposer:

"""복잡한 쿼리를 하위 쿼리로 분해"""

def __init__(self):

self.llm = ChatOpenAI(model="gpt-4o", temperature=0)

self.decompose_prompt = ChatPromptTemplate.from_template(

"다음 질문을 검색에 최적화된 2-4개의 하위 질문으로 분해하세요.\n"

"JSON 배열 형태로 반환하세요.\n\n"

"원래 질문: {question}\n\n"

"하위 질문들:"

)

def decompose(self, question: str) -> list:

"""쿼리 분해"""

chain = self.decompose_prompt | self.llm

result = chain.invoke({"question": question}).content

try:

sub_queries = json.loads(result)

except json.JSONDecodeError:

sub_queries = [question]

return sub_queries

def retrieve_and_merge(self, question: str, retriever, top_k: int = 3):

"""분해된 쿼리로 검색 후 결과 병합"""

sub_queries = self.decompose(question)

all_docs = []

seen_contents = set()

for sub_query in sub_queries:

docs = retriever.invoke(sub_query)[:top_k]

for doc in docs:

if doc.page_content not in seen_contents:

seen_contents.add(doc.page_content)

all_docs.append(doc)

return all_docs

리랭킹 (Reranking)

초기 검색 결과를 더 정밀한 모델로 재평가하여 순위를 재정렬하는 단계다. Bi-encoder 기반의 벡터 검색보다 Cross-encoder 기반 리랭킹이 더 정확한 관련성 판단을 수행한다.

Cohere Rerank 활용

from typing import List, Dict

class CohereReranker:

"""Cohere Rerank API를 활용한 리랭킹"""

def __init__(self, api_key: str, model: str = "rerank-v3.5"):

self.client = cohere.Client(api_key)

self.model = model

def rerank(self, query: str, documents: List[str],

top_n: int = 5) -> List[Dict]:

"""문서 리랭킹"""

response = self.client.rerank(

model=self.model,

query=query,

documents=documents,

top_n=top_n,

return_documents=True

)

results = []

for hit in response.results:

results.append({

"text": hit.document.text,

"relevance_score": hit.relevance_score,

"index": hit.index

})

return results

class CrossEncoderReranker:

"""로컬 Cross-Encoder 모델 기반 리랭킹"""

def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"):

from sentence_transformers import CrossEncoder

self.model = CrossEncoder(model_name)

def rerank(self, query: str, documents: List[str],

top_n: int = 5) -> List[Dict]:

"""Cross-encoder로 리랭킹"""

pairs = [[query, doc] for doc in documents]

scores = self.model.predict(pairs)

점수 기준 정렬

scored_docs = list(zip(documents, scores, range(len(documents))))

scored_docs.sort(key=lambda x: x[1], reverse=True)

results = []

for text, score, idx in scored_docs[:top_n]:

results.append({

"text": text,

"relevance_score": float(score),

"index": idx

})

return results

리랭킹 파이프라인 통합

class RAGPipelineWithReranking:

"""리랭킹이 통합된 RAG 파이프라인"""

def __init__(self, retriever, reranker, llm):

self.retriever = retriever

self.reranker = reranker

self.llm = llm

def query(self, question: str, top_k_retrieve: int = 20,

top_k_rerank: int = 5) -> str:

"""검색 -> 리랭킹 -> 생성"""

1단계: 초기 검색 (넓게)

initial_docs = self.retriever.invoke(question)[:top_k_retrieve]

doc_texts = [doc.page_content for doc in initial_docs]

2단계: 리랭킹 (정밀)

reranked = self.reranker.rerank(

query=question,

documents=doc_texts,

top_n=top_k_rerank

)

3단계: 컨텍스트 조립

context = "\n\n---\n\n".join(

r["text"] for r in reranked

)

4단계: LLM 생성

prompt = f"""다음 컨텍스트를 기반으로 질문에 답변하세요.

각 정보의 출처를 표시하세요.

컨텍스트:

{context}

질문: {question}

답변:"""

response = self.llm.invoke(prompt)

return response.content

RAGAS를 활용한 RAG 평가

RAGAS(Retrieval Augmented Generation Assessment)는 RAG 파이프라인을 자동으로 평가하는 프레임워크다. 검색 품질과 생성 품질을 별도의 메트릭으로 측정한다.

RAGAS 핵심 메트릭

| 메트릭 | 측정 대상 | 설명 | 이상적 값 |

| ------------------ | --------- | ------------------------------------ | --------- |

| Context Precision | 검색 | 검색된 컨텍스트 중 관련 있는 비율 | 0.8 이상 |

| Context Recall | 검색 | 필요한 정보가 컨텍스트에 포함된 비율 | 0.9 이상 |

| Faithfulness | 생성 | 답변이 컨텍스트에 근거한 정도 | 0.9 이상 |

| Answer Relevancy | 생성 | 답변이 질문에 적합한 정도 | 0.8 이상 |

| Answer Correctness | 전체 | 정답과 비교한 정확도 | 0.7 이상 |

RAGAS 평가 구현

from ragas import evaluate

from ragas.metrics import (

context_precision,

context_recall,

faithfulness,

answer_relevancy,

answer_correctness

)

from datasets import Dataset

class RAGEvaluator:

"""RAGAS 기반 RAG 평가기"""

def __init__(self):

self.metrics = [

context_precision,

context_recall,

faithfulness,

answer_relevancy,

answer_correctness

]

def create_eval_dataset(self, eval_samples: list) -> Dataset:

"""평가 데이터셋 생성"""

data = {

"question": [],

"answer": [],

"contexts": [],

"ground_truth": []

}

for sample in eval_samples:

data["question"].append(sample["question"])

data["answer"].append(sample["generated_answer"])

data["contexts"].append(sample["retrieved_contexts"])

data["ground_truth"].append(sample["expected_answer"])

return Dataset.from_dict(data)

def evaluate(self, eval_samples: list) -> dict:

"""RAG 파이프라인 평가 수행"""

dataset = self.create_eval_dataset(eval_samples)

result = evaluate(

dataset=dataset,

metrics=self.metrics

)

return {

"context_precision": result["context_precision"],

"context_recall": result["context_recall"],

"faithfulness": result["faithfulness"],

"answer_relevancy": result["answer_relevancy"],

"answer_correctness": result["answer_correctness"],

"overall_score": sum(result.values()) / len(result)

}

def generate_report(self, results: dict) -> str:

"""평가 리포트 생성"""

report = "=== RAG Pipeline Evaluation Report ===\n\n"

for metric, score in results.items():

status = "PASS" if score >= 0.7 else "WARN" if score >= 0.5 else "FAIL"

report += f" {metric}: {score:.4f} [{status}]\n"

report += f"\n Overall: {results.get('overall_score', 0):.4f}\n"

return report

프로덕션 배포 패턴

비동기 인덱싱 파이프라인

프로덕션 환경에서는 문서 인덱싱과 검색 서빙을 분리해야 한다. 새 문서가 추가될 때 비동기로 임베딩을 생성하고 인덱스를 업데이트한다.

from datetime import datetime

from typing import Optional

class AsyncIndexingPipeline:

"""비동기 문서 인덱싱 파이프라인"""

def __init__(self, embeddings, vector_store, chunker):

self.embeddings = embeddings

self.vector_store = vector_store

self.chunker = chunker

self.index_queue = asyncio.Queue()

self._processed_hashes = set()

async def enqueue_document(self, doc_id: str, content: str,

metadata: Optional[dict] = None):

"""문서를 인덱싱 큐에 추가"""

doc_hash = hashlib.md5(content.encode()).hexdigest()

if doc_hash in self._processed_hashes:

return {"status": "skipped", "reason": "duplicate"}

await self.index_queue.put({

"doc_id": doc_id,

"content": content,

"metadata": metadata or {},

"hash": doc_hash,

"enqueued_at": datetime.utcnow().isoformat()

})

return {"status": "enqueued", "queue_size": self.index_queue.qsize()}

async def process_queue(self, batch_size: int = 10):

"""큐에서 배치 단위로 문서 처리"""

while True:

batch = []

try:

for _ in range(batch_size):

item = self.index_queue.get_nowait()

batch.append(item)

except asyncio.QueueEmpty:

pass

if batch:

await self._process_batch(batch)

else:

await asyncio.sleep(1)

async def _process_batch(self, batch: list):

"""배치 처리: 청킹 -> 임베딩 -> 업서트"""

all_chunks = []

all_metadata = []

for item in batch:

chunks = self.chunker.semantic_chunk(item["content"])

for chunk in chunks:

all_chunks.append(chunk)

all_metadata.append({

"doc_id": item["doc_id"],

"hash": item["hash"],

**item["metadata"]

})

배치 임베딩 생성

chunk_embeddings = self.embeddings.embed_documents(all_chunks)

벡터 저장소에 업서트

self.vector_store.upsert_documents(

documents=all_chunks,

embeddings=chunk_embeddings,

metadata=all_metadata

)

for item in batch:

self._processed_hashes.add(item["hash"])

검색 결과 캐싱

자주 반복되는 쿼리에 대해 캐싱을 적용하여 응답 속도를 향상시키고 API 비용을 절감한다.

from datetime import datetime, timedelta

from typing import Optional

class RAGCache:

"""RAG 검색 결과 캐시"""

def __init__(self, redis_url: str = "redis://localhost:6379",

ttl_seconds: int = 3600):

self.redis = redis.from_url(redis_url)

self.ttl = ttl_seconds

def _cache_key(self, query: str, filters: Optional[dict] = None) -> str:

"""쿼리와 필터 기반 캐시 키 생성"""

key_data = {"query": query.lower().strip(), "filters": filters or {}}

key_hash = hashlib.sha256(

json.dumps(key_data, sort_keys=True).encode()

).hexdigest()

return f"rag:cache:{key_hash}"

def get(self, query: str, filters: Optional[dict] = None):

"""캐시에서 검색 결과 조회"""

key = self._cache_key(query, filters)

cached = self.redis.get(key)

if cached:

data = json.loads(cached)

data["cache_hit"] = True

return data

return None

def set(self, query: str, results: list,

filters: Optional[dict] = None):

"""검색 결과를 캐시에 저장"""

key = self._cache_key(query, filters)

data = {

"results": results,

"cached_at": datetime.utcnow().isoformat(),

"query": query

}

self.redis.setex(key, self.ttl, json.dumps(data))

def invalidate_by_pattern(self, pattern: str):

"""패턴 매칭으로 캐시 무효화"""

keys = self.redis.keys(f"rag:cache:*{pattern}*")

if keys:

self.redis.delete(*keys)

장애 사례 및 대응

검색 품질 저하 패턴

| 장애 유형 | 증상 | 원인 | 대응 방안 |

| --------------- | --------------------------------- | --------------------------------------- | ------------------------------- |

| 임베딩 드리프트 | 시간 경과에 따른 검색 정확도 저하 | 임베딩 모델 버전 변경, 데이터 분포 변화 | 정기적 재인덱싱, 모델 버전 고정 |

| 청크 불일치 | 관련 정보가 검색되지 않음 | 청크 크기/전략 부적절 | 청크 크기 실험, 오버랩 조정 |

| 필터 과적합 | 결과가 0건인 쿼리 증가 | 메타데이터 필터가 너무 엄격 | 필터 폴백 전략, 필터 완화 |

| 컨텍스트 오염 | LLM이 무관한 정보로 답변 | 리랭킹 부재, top-k 과다 | 리랭킹 도입, top-k 감소 |

| 지연 시간 증가 | 응답 시간 2초 초과 | 인덱스 비최적화, 네트워크 지연 | 인덱스 튜닝, 캐싱, 비동기 처리 |

임베딩 드리프트 모니터링

from datetime import datetime

from typing import List

class EmbeddingDriftMonitor:

"""임베딩 드리프트 감지 및 모니터링"""

def __init__(self, reference_embeddings: np.ndarray):

self.reference_mean = np.mean(reference_embeddings, axis=0)

self.reference_std = np.std(reference_embeddings, axis=0)

self.drift_history = []

def check_drift(self, new_embeddings: np.ndarray,

threshold: float = 0.1) -> dict:

"""새 임베딩의 드리프트 검사"""

new_mean = np.mean(new_embeddings, axis=0)

코사인 거리로 드리프트 측정

cosine_sim = np.dot(self.reference_mean, new_mean) / (

np.linalg.norm(self.reference_mean) * np.linalg.norm(new_mean)

)

drift_score = 1 - cosine_sim

분포 비교 (KL Divergence 근사)

distribution_shift = np.mean(

np.abs(np.mean(new_embeddings, axis=0) - self.reference_mean)

/ (self.reference_std + 1e-8)

)

result = {

"drift_score": float(drift_score),

"distribution_shift": float(distribution_shift),

"is_drifted": drift_score > threshold,

"timestamp": datetime.utcnow().isoformat(),

"recommendation": (

"재인덱싱 필요" if drift_score > threshold

else "정상 범위"

)

}

self.drift_history.append(result)

return result

검색 품질 저하 시 폴백 전략

class RAGWithFallback:

"""폴백 전략이 포함된 RAG 파이프라인"""

def __init__(self, primary_retriever, fallback_retriever,

reranker, llm):

self.primary = primary_retriever

self.fallback = fallback_retriever

self.reranker = reranker

self.llm = llm

def query(self, question: str) -> dict:

"""폴백을 포함한 질의 처리"""

1차: 기본 검색

primary_docs = self.primary.invoke(question)

if not primary_docs:

폴백 1: 필터 완화

primary_docs = self.fallback.invoke(question)

if not primary_docs:

return {

"answer": "관련 정보를 찾을 수 없습니다.",

"confidence": 0.0,

"source": "no_results"

}

리랭킹

reranked = self.reranker.rerank(

query=question,

documents=[d.page_content for d in primary_docs],

top_n=5

)

최소 관련도 검증

if reranked[0]["relevance_score"] < 0.3:

return {

"answer": "높은 신뢰도의 답변을 생성할 수 없습니다.",

"confidence": reranked[0]["relevance_score"],

"source": "low_confidence"

}

LLM 생성

context = "\n\n".join(r["text"] for r in reranked)

answer = self.llm.invoke(

f"컨텍스트:\n{context}\n\n질문: {question}\n답변:"

).content

return {

"answer": answer,

"confidence": reranked[0]["relevance_score"],

"source": "primary",

"num_sources": len(reranked)

}

프로덕션 체크리스트

설계 단계

- 사용 사례에 따른 임베딩 모델 벤치마크 수행

- 벡터 DB 선택: 규모, 예산, 운영 역량 고려

- 청킹 전략 A/B 테스트 계획 수립

- 평가 데이터셋 (최소 100개 QA 쌍) 구축

개발 단계

- 하이브리드 검색 (BM25 + 벡터) 구현

- 리랭킹 파이프라인 통합

- 비동기 인덱싱 파이프라인 구축

- 캐싱 레이어 구현 (Redis/Memcached)

- 에러 핸들링 및 폴백 전략 구현

배포 단계

- RAGAS 메트릭 기반 품질 게이트 설정

- 임베딩 드리프트 모니터링 대시보드 구축

- 검색 지연 시간 알림 설정 (P95 기준 500ms)

- 인덱스 재구축 자동화 파이프라인 구축

- 로드 테스트 수행 (목표 QPS 대비 2배)

운영 단계

- 주간 검색 품질 리포트 생성

- 월간 임베딩 모델 업데이트 검토

- 사용자 피드백 기반 평가 데이터셋 업데이트

- 비용 모니터링 (임베딩 API 호출, 벡터 DB 스토리지)

- 정기 인덱스 재구축 (분기 1회 이상)

참고자료

- [Pinecone: Retrieval Augmented Generation](https://www.pinecone.io/learn/retrieval-augmented-generation/)

- [LlamaIndex Documentation](https://docs.llamaindex.ai/en/stable/)

- [LangChain RAG Tutorial](https://python.langchain.com/docs/tutorials/rag/)

- [Hugging Face Blog: RAG with Ray](https://huggingface.co/blog/ray-rag)

- [RAGAS: Automated Evaluation of RAG (arXiv 2312.10997)](https://arxiv.org/abs/2312.10997)

현재 단락 (1/696)

RAG(Retrieval-Augmented Generation)는 LLM의 환각(hallucination) 문제를 해결하고 최신 정보를 활용한 답변을 생성하기 위한 핵심 아키텍처 ...

작성 글자: 0원문 글자: 21,073작성 단락: 0/696