Skip to content
Published on

RAG 파이프라인 프로덕션 구축 가이드: 벡터 DB 선택부터 청킹·리랭킹·평가까지

Authors
  • Name
    Twitter
RAG Pipeline Production Guide

들어가며

RAG(Retrieval-Augmented Generation)는 LLM의 환각(hallucination) 문제를 해결하고 최신 정보를 활용한 답변을 생성하기 위한 핵심 아키텍처 패턴이다. 단순히 "검색해서 프롬프트에 넣기"로 보일 수 있지만, 프로덕션 환경에서는 임베딩 모델 선택, 벡터 DB 운영, 청킹 전략, 하이브리드 검색, 리랭킹, 평가 체계까지 수많은 엔지니어링 결정이 필요하다.

이 글에서는 RAG 파이프라인의 전체 아키텍처를 분석하고, 각 구성 요소의 선택 기준과 구현 방법을 실전 코드와 함께 다룬다. 특히 프로덕션에서 자주 발생하는 장애 패턴과 그 대응 전략, 그리고 RAGAS를 활용한 체계적 평가 방법론까지 포괄적으로 살펴본다.

RAG 아키텍처 개요

RAG 파이프라인은 크게 세 단계로 구성된다: Retrieval(검색), Augmentation(증강), Generation(생성). 각 단계는 독립적으로 최적화할 수 있으며, 전체 파이프라인의 품질은 가장 약한 단계에 의해 결정된다.

전체 아키텍처 흐름

[사용자 질의]
[Query Processing] ← 쿼리 분해, HyDE, 쿼리 확장
[Retrieval] ← BM25 + 벡터 검색 (하이브리드)
[Reranking] ← Cross-encoder, Cohere Rerank
[Context Assembly] ← 청크 병합, 중복 제거, 토큰 제한
[Generation] ← LLM에 컨텍스트와 질의 전달
[Post-processing] ← 출처 표시, 신뢰도 점수, 환각 검증

기본 RAG 구현

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

# 1. 문서 청킹
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", ".", " "]
)
chunks = text_splitter.split_documents(documents)

# 2. 임베딩 및 벡터 저장소 생성
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    collection_name="production_docs"
)

# 3. Retriever 설정
retriever = vectorstore.as_retriever(
    search_type="mmr",
    search_kwargs={"k": 5, "fetch_k": 20}
)

# 4. RAG 체인 구성
template = """다음 컨텍스트를 기반으로 질문에 답변하세요.
컨텍스트에서 답을 찾을 수 없다면 "정보가 부족합니다"라고 답하세요.

컨텍스트:
{context}

질문: {question}
답변:"""

prompt = ChatPromptTemplate.from_template(template)
llm = ChatOpenAI(model="gpt-4o", temperature=0)

rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

response = rag_chain.invoke("RAG 파이프라인의 핵심 구성요소는?")

임베딩 모델 비교

임베딩 모델은 RAG 파이프라인의 검색 품질을 결정하는 가장 중요한 요소 중 하나다. 2025-2026년 현재 주요 임베딩 모델들의 특성을 비교한다.

모델차원최대 토큰MTEB 평균다국어비용특징
OpenAI text-embedding-3-large3072819164.6O$0.13/1M tokens차원 축소 가능
OpenAI text-embedding-3-small1536819162.3O$0.02/1M tokens비용 효율적
Cohere embed-v3102451264.5O (100+)$0.10/1M tokens검색 특화
BGE-M31024819266.1O (100+)무료 (자체 호스팅)Dense+Sparse+ColBERT
E5-mistral-7b-instruct40963276866.6O무료 (자체 호스팅)지시 기반 임베딩
Jina-embeddings-v31024819265.5O (89)$0.02/1M tokensTask-specific LoRA

임베딩 모델 선택 기준

from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List

class EmbeddingBenchmark:
    """임베딩 모델 비교 벤치마크"""

    def __init__(self, models: List[str]):
        self.models = {}
        for model_name in models:
            self.models[model_name] = SentenceTransformer(model_name)

    def evaluate_retrieval_quality(
        self,
        queries: List[str],
        relevant_docs: List[List[str]],
        corpus: List[str]
    ) -> dict:
        results = {}
        for name, model in self.models.items():
            corpus_embeddings = model.encode(corpus, normalize_embeddings=True)
            query_embeddings = model.encode(queries, normalize_embeddings=True)

            # 코사인 유사도 계산
            scores = np.dot(query_embeddings, corpus_embeddings.T)

            # Recall@k 계산
            recall_at_5 = self._compute_recall(scores, relevant_docs, k=5)
            recall_at_10 = self._compute_recall(scores, relevant_docs, k=10)

            results[name] = {
                "recall@5": recall_at_5,
                "recall@10": recall_at_10,
                "embedding_dim": model.get_sentence_embedding_dimension(),
                "encoding_speed": self._measure_speed(model, queries)
            }
        return results

    def _compute_recall(self, scores, relevant_docs, k):
        recalls = []
        for i, rel_docs in enumerate(relevant_docs):
            top_k_indices = np.argsort(scores[i])[-k:]
            retrieved = set(top_k_indices)
            relevant = set(rel_docs)
            recalls.append(len(retrieved & relevant) / len(relevant))
        return np.mean(recalls)

    def _measure_speed(self, model, texts, n_runs=3):
        import time
        times = []
        for _ in range(n_runs):
            start = time.time()
            model.encode(texts)
            times.append(time.time() - start)
        return np.mean(times)

벡터 데이터베이스 비교

벡터 데이터베이스 선택은 RAG 시스템의 운영 복잡도, 비용, 성능에 직접적인 영향을 미친다. 주요 벡터 DB를 다각도로 비교한다.

항목PineconeMilvusWeaviateQdrantChroma
배포 방식완전 관리형자체 호스팅/Zilliz Cloud자체 호스팅/Cloud자체 호스팅/Cloud인메모리/자체 호스팅
최대 벡터 수수십억수십억수십억수십억수백만
검색 알고리즘자체 엔진HNSW, IVF, DiskANNHNSWHNSWHNSW
하이브리드 검색O (Sparse+Dense)O (BM25+Dense)O (BM25+Dense)O (Sparse+Dense)X
멀티테넌시O (네임스페이스)O (파티션)O (테넌트)O (컬렉션)O (컬렉션)
필터링메타데이터 필터스칼라 필터링GraphQL 필터페이로드 필터메타데이터 필터
가격 (100만 벡터)약 $70/월무료 (자체 호스팅)무료 (자체 호스팅)무료 (자체 호스팅)무료
적합 용도빠른 프로토타입, 관리형 선호대규모 엔터프라이즈GraphQL 생태계고성능 필터링개발/소규모

Qdrant를 활용한 프로덕션 벡터 검색

from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance, VectorParams, PointStruct,
    Filter, FieldCondition, MatchValue,
    SearchParams, HnswConfigDiff
)
import uuid

class ProductionVectorStore:
    """프로덕션 벡터 저장소 관리자"""

    def __init__(self, url: str, api_key: str, collection_name: str):
        self.client = QdrantClient(url=url, api_key=api_key)
        self.collection_name = collection_name

    def create_collection(self, vector_size: int = 1536):
        """HNSW 인덱스 최적화된 컬렉션 생성"""
        self.client.create_collection(
            collection_name=self.collection_name,
            vectors_config=VectorParams(
                size=vector_size,
                distance=Distance.COSINE,
                on_disk=True  # 대규모 데이터셋을 위한 디스크 기반 저장
            ),
            hnsw_config=HnswConfigDiff(
                m=16,              # 그래프 연결 수 (기본: 16)
                ef_construct=128,  # 인덱스 빌드 시 탐색 범위
                full_scan_threshold=10000  # 이 수 이하면 전체 스캔
            ),
            optimizers_config={
                "indexing_threshold": 20000,
                "memmap_threshold": 50000
            }
        )

    def upsert_documents(self, documents: list, embeddings: list, metadata: list):
        """배치 업서트"""
        points = [
            PointStruct(
                id=str(uuid.uuid4()),
                vector=embedding,
                payload={**meta, "text": doc}
            )
            for doc, embedding, meta in zip(documents, embeddings, metadata)
        ]

        # 배치 크기 100으로 분할 업로드
        batch_size = 100
        for i in range(0, len(points), batch_size):
            batch = points[i:i + batch_size]
            self.client.upsert(
                collection_name=self.collection_name,
                points=batch,
                wait=True
            )

    def hybrid_search(self, query_vector: list, query_text: str,
                      filters: dict = None, top_k: int = 10):
        """하이브리드 검색 (벡터 + 메타데이터 필터)"""
        search_filter = None
        if filters:
            conditions = [
                FieldCondition(key=k, match=MatchValue(value=v))
                for k, v in filters.items()
            ]
            search_filter = Filter(must=conditions)

        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            query_filter=search_filter,
            limit=top_k,
            search_params=SearchParams(
                hnsw_ef=128,  # 검색 시 탐색 범위 (높을수록 정확, 느림)
                exact=False   # 근사 검색 사용
            )
        )
        return results

청킹 전략

청킹은 RAG 품질에 가장 큰 영향을 미치는 전처리 단계다. 잘못된 청킹은 관련 정보를 분리시키거나, 불필요한 노이즈를 포함시켜 검색 품질을 크게 저하시킨다.

청킹 전략 비교

전략장점단점적합 용도
고정 크기 (Fixed-size)구현 간단, 예측 가능의미 단위 무시균일한 텍스트
재귀적 (Recursive)문단/문장 경계 존중최적 크기 튜닝 필요범용 문서
시맨틱 (Semantic)의미 단위 보존계산 비용 높음구조 없는 텍스트
문서 구조 기반제목/섹션 계층 유지파서 구현 필요마크다운, HTML
부모-자식 (Parent-Child)검색 정밀도 + 충분한 맥락저장 공간 2배기술 문서

시맨틱 청킹 구현

from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings
import numpy as np

class AdvancedChunker:
    """다양한 청킹 전략을 지원하는 고급 청커"""

    def __init__(self, embedding_model: str = "text-embedding-3-small"):
        self.embeddings = OpenAIEmbeddings(model=embedding_model)

    def semantic_chunk(self, text: str, breakpoint_threshold: float = 0.5):
        """시맨틱 청킹 - 의미 변화 지점에서 분할"""
        chunker = SemanticChunker(
            self.embeddings,
            breakpoint_threshold_type="percentile",
            breakpoint_threshold_amount=breakpoint_threshold * 100
        )
        return chunker.split_text(text)

    def parent_child_chunk(self, text: str,
                            parent_size: int = 2000,
                            child_size: int = 400,
                            child_overlap: int = 50):
        """부모-자식 청킹 - 검색은 자식, 컨텍스트는 부모"""
        from langchain.text_splitter import RecursiveCharacterTextSplitter

        # 부모 청크 생성
        parent_splitter = RecursiveCharacterTextSplitter(
            chunk_size=parent_size,
            chunk_overlap=0
        )
        parent_chunks = parent_splitter.split_text(text)

        # 각 부모에서 자식 청크 생성
        child_splitter = RecursiveCharacterTextSplitter(
            chunk_size=child_size,
            chunk_overlap=child_overlap
        )

        result = []
        for i, parent in enumerate(parent_chunks):
            children = child_splitter.split_text(parent)
            for child in children:
                result.append({
                    "child_text": child,
                    "parent_text": parent,
                    "parent_id": i
                })
        return result

    def markdown_structure_chunk(self, markdown_text: str):
        """마크다운 문서 구조 기반 청킹"""
        from langchain.text_splitter import MarkdownHeaderTextSplitter

        headers_to_split_on = [
            ("#", "h1"),
            ("##", "h2"),
            ("###", "h3"),
        ]

        splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=headers_to_split_on
        )
        chunks = splitter.split_text(markdown_text)

        # 각 청크에 계층 구조 메타데이터 추가
        enriched_chunks = []
        for chunk in chunks:
            enriched_chunks.append({
                "text": chunk.page_content,
                "metadata": chunk.metadata,
                "hierarchy": " > ".join(
                    chunk.metadata.get(h, "")
                    for h in ["h1", "h2", "h3"]
                    if chunk.metadata.get(h)
                )
            })
        return enriched_chunks

검색 최적화 전략

단순 벡터 유사도 검색만으로는 프로덕션 수준의 검색 품질을 달성하기 어렵다. 하이브리드 검색, HyDE, 쿼리 분해 등 다양한 최적화 기법을 조합해야 한다.

하이브리드 검색 (BM25 + 벡터)

from rank_bm25 import BM25Okapi
from langchain_openai import OpenAIEmbeddings
import numpy as np
from typing import List, Tuple

class HybridRetriever:
    """BM25 + 벡터 검색을 결합한 하이브리드 검색기"""

    def __init__(self, documents: List[str], embeddings_model: str = "text-embedding-3-large"):
        self.documents = documents
        self.embeddings = OpenAIEmbeddings(model=embeddings_model)

        # BM25 인덱스 구축
        tokenized_docs = [doc.lower().split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized_docs)

        # 벡터 인덱스 구축
        self.doc_embeddings = np.array(
            self.embeddings.embed_documents(documents)
        )

    def search(self, query: str, top_k: int = 5,
               alpha: float = 0.5) -> List[Tuple[str, float]]:
        """
        하이브리드 검색 수행
        alpha: 벡터 검색 가중치 (0=BM25만, 1=벡터만)
        """
        # BM25 점수
        bm25_scores = self.bm25.get_scores(query.lower().split())
        bm25_scores = self._normalize_scores(bm25_scores)

        # 벡터 유사도 점수
        query_embedding = np.array(self.embeddings.embed_query(query))
        vector_scores = np.dot(self.doc_embeddings, query_embedding)
        vector_scores = self._normalize_scores(vector_scores)

        # 가중 결합 (Reciprocal Rank Fusion 대안)
        combined_scores = alpha * vector_scores + (1 - alpha) * bm25_scores

        # 상위 k개 반환
        top_indices = np.argsort(combined_scores)[-top_k:][::-1]
        return [
            (self.documents[i], combined_scores[i])
            for i in top_indices
        ]

    def _normalize_scores(self, scores: np.ndarray) -> np.ndarray:
        """Min-Max 정규화"""
        min_s, max_s = scores.min(), scores.max()
        if max_s == min_s:
            return np.zeros_like(scores)
        return (scores - min_s) / (max_s - min_s)

HyDE (Hypothetical Document Embedding)

HyDE는 쿼리에 대한 가상의 답변을 먼저 생성한 후, 그 답변의 임베딩을 검색에 사용하는 기법이다. 질문과 답변 사이의 의미적 차이를 줄여 검색 품질을 향상시킨다.

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate

class HyDERetriever:
    """HyDE 기반 검색기"""

    def __init__(self, vectorstore):
        self.vectorstore = vectorstore
        self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
        self.embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

        self.hyde_prompt = ChatPromptTemplate.from_template(
            "다음 질문에 대해 가상의 답변을 작성해주세요. "
            "실제 정확성보다는 관련 키워드와 개념을 포함하는 것이 중요합니다.\n\n"
            "질문: {question}\n\n"
            "가상 답변:"
        )

    def retrieve(self, query: str, top_k: int = 5):
        """HyDE 검색 수행"""
        # 1. 가상 답변 생성
        chain = self.hyde_prompt | self.llm
        hypothetical_answer = chain.invoke({"question": query}).content

        # 2. 가상 답변의 임베딩 생성
        hyde_embedding = self.embeddings.embed_query(hypothetical_answer)

        # 3. 가상 답변 임베딩으로 검색
        results = self.vectorstore.similarity_search_by_vector(
            hyde_embedding, k=top_k
        )
        return results

쿼리 분해 (Query Decomposition)

복잡한 질문을 하위 질문들로 분해하여 각각 검색한 후 결과를 종합하는 전략이다.

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import json

class QueryDecomposer:
    """복잡한 쿼리를 하위 쿼리로 분해"""

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
        self.decompose_prompt = ChatPromptTemplate.from_template(
            "다음 질문을 검색에 최적화된 2-4개의 하위 질문으로 분해하세요.\n"
            "JSON 배열 형태로 반환하세요.\n\n"
            "원래 질문: {question}\n\n"
            "하위 질문들:"
        )

    def decompose(self, question: str) -> list:
        """쿼리 분해"""
        chain = self.decompose_prompt | self.llm
        result = chain.invoke({"question": question}).content

        try:
            sub_queries = json.loads(result)
        except json.JSONDecodeError:
            sub_queries = [question]

        return sub_queries

    def retrieve_and_merge(self, question: str, retriever, top_k: int = 3):
        """분해된 쿼리로 검색 후 결과 병합"""
        sub_queries = self.decompose(question)

        all_docs = []
        seen_contents = set()

        for sub_query in sub_queries:
            docs = retriever.invoke(sub_query)[:top_k]
            for doc in docs:
                if doc.page_content not in seen_contents:
                    seen_contents.add(doc.page_content)
                    all_docs.append(doc)

        return all_docs

리랭킹 (Reranking)

초기 검색 결과를 더 정밀한 모델로 재평가하여 순위를 재정렬하는 단계다. Bi-encoder 기반의 벡터 검색보다 Cross-encoder 기반 리랭킹이 더 정확한 관련성 판단을 수행한다.

Cohere Rerank 활용

import cohere
from typing import List, Dict

class CohereReranker:
    """Cohere Rerank API를 활용한 리랭킹"""

    def __init__(self, api_key: str, model: str = "rerank-v3.5"):
        self.client = cohere.Client(api_key)
        self.model = model

    def rerank(self, query: str, documents: List[str],
               top_n: int = 5) -> List[Dict]:
        """문서 리랭킹"""
        response = self.client.rerank(
            model=self.model,
            query=query,
            documents=documents,
            top_n=top_n,
            return_documents=True
        )

        results = []
        for hit in response.results:
            results.append({
                "text": hit.document.text,
                "relevance_score": hit.relevance_score,
                "index": hit.index
            })
        return results


class CrossEncoderReranker:
    """로컬 Cross-Encoder 모델 기반 리랭킹"""

    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"):
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)

    def rerank(self, query: str, documents: List[str],
               top_n: int = 5) -> List[Dict]:
        """Cross-encoder로 리랭킹"""
        pairs = [[query, doc] for doc in documents]
        scores = self.model.predict(pairs)

        # 점수 기준 정렬
        scored_docs = list(zip(documents, scores, range(len(documents))))
        scored_docs.sort(key=lambda x: x[1], reverse=True)

        results = []
        for text, score, idx in scored_docs[:top_n]:
            results.append({
                "text": text,
                "relevance_score": float(score),
                "index": idx
            })
        return results

리랭킹 파이프라인 통합

class RAGPipelineWithReranking:
    """리랭킹이 통합된 RAG 파이프라인"""

    def __init__(self, retriever, reranker, llm):
        self.retriever = retriever
        self.reranker = reranker
        self.llm = llm

    def query(self, question: str, top_k_retrieve: int = 20,
              top_k_rerank: int = 5) -> str:
        """검색 -> 리랭킹 -> 생성"""
        # 1단계: 초기 검색 (넓게)
        initial_docs = self.retriever.invoke(question)[:top_k_retrieve]
        doc_texts = [doc.page_content for doc in initial_docs]

        # 2단계: 리랭킹 (정밀)
        reranked = self.reranker.rerank(
            query=question,
            documents=doc_texts,
            top_n=top_k_rerank
        )

        # 3단계: 컨텍스트 조립
        context = "\n\n---\n\n".join(
            r["text"] for r in reranked
        )

        # 4단계: LLM 생성
        prompt = f"""다음 컨텍스트를 기반으로 질문에 답변하세요.
각 정보의 출처를 표시하세요.

컨텍스트:
{context}

질문: {question}
답변:"""

        response = self.llm.invoke(prompt)
        return response.content

RAGAS를 활용한 RAG 평가

RAGAS(Retrieval Augmented Generation Assessment)는 RAG 파이프라인을 자동으로 평가하는 프레임워크다. 검색 품질과 생성 품질을 별도의 메트릭으로 측정한다.

RAGAS 핵심 메트릭

메트릭측정 대상설명이상적 값
Context Precision검색검색된 컨텍스트 중 관련 있는 비율0.8 이상
Context Recall검색필요한 정보가 컨텍스트에 포함된 비율0.9 이상
Faithfulness생성답변이 컨텍스트에 근거한 정도0.9 이상
Answer Relevancy생성답변이 질문에 적합한 정도0.8 이상
Answer Correctness전체정답과 비교한 정확도0.7 이상

RAGAS 평가 구현

from ragas import evaluate
from ragas.metrics import (
    context_precision,
    context_recall,
    faithfulness,
    answer_relevancy,
    answer_correctness
)
from datasets import Dataset

class RAGEvaluator:
    """RAGAS 기반 RAG 평가기"""

    def __init__(self):
        self.metrics = [
            context_precision,
            context_recall,
            faithfulness,
            answer_relevancy,
            answer_correctness
        ]

    def create_eval_dataset(self, eval_samples: list) -> Dataset:
        """평가 데이터셋 생성"""
        data = {
            "question": [],
            "answer": [],
            "contexts": [],
            "ground_truth": []
        }

        for sample in eval_samples:
            data["question"].append(sample["question"])
            data["answer"].append(sample["generated_answer"])
            data["contexts"].append(sample["retrieved_contexts"])
            data["ground_truth"].append(sample["expected_answer"])

        return Dataset.from_dict(data)

    def evaluate(self, eval_samples: list) -> dict:
        """RAG 파이프라인 평가 수행"""
        dataset = self.create_eval_dataset(eval_samples)

        result = evaluate(
            dataset=dataset,
            metrics=self.metrics
        )

        return {
            "context_precision": result["context_precision"],
            "context_recall": result["context_recall"],
            "faithfulness": result["faithfulness"],
            "answer_relevancy": result["answer_relevancy"],
            "answer_correctness": result["answer_correctness"],
            "overall_score": sum(result.values()) / len(result)
        }

    def generate_report(self, results: dict) -> str:
        """평가 리포트 생성"""
        report = "=== RAG Pipeline Evaluation Report ===\n\n"

        for metric, score in results.items():
            status = "PASS" if score >= 0.7 else "WARN" if score >= 0.5 else "FAIL"
            report += f"  {metric}: {score:.4f} [{status}]\n"

        report += f"\n  Overall: {results.get('overall_score', 0):.4f}\n"
        return report

프로덕션 배포 패턴

비동기 인덱싱 파이프라인

프로덕션 환경에서는 문서 인덱싱과 검색 서빙을 분리해야 한다. 새 문서가 추가될 때 비동기로 임베딩을 생성하고 인덱스를 업데이트한다.

import asyncio
from datetime import datetime
from typing import Optional
import hashlib

class AsyncIndexingPipeline:
    """비동기 문서 인덱싱 파이프라인"""

    def __init__(self, embeddings, vector_store, chunker):
        self.embeddings = embeddings
        self.vector_store = vector_store
        self.chunker = chunker
        self.index_queue = asyncio.Queue()
        self._processed_hashes = set()

    async def enqueue_document(self, doc_id: str, content: str,
                                metadata: Optional[dict] = None):
        """문서를 인덱싱 큐에 추가"""
        doc_hash = hashlib.md5(content.encode()).hexdigest()

        if doc_hash in self._processed_hashes:
            return {"status": "skipped", "reason": "duplicate"}

        await self.index_queue.put({
            "doc_id": doc_id,
            "content": content,
            "metadata": metadata or {},
            "hash": doc_hash,
            "enqueued_at": datetime.utcnow().isoformat()
        })
        return {"status": "enqueued", "queue_size": self.index_queue.qsize()}

    async def process_queue(self, batch_size: int = 10):
        """큐에서 배치 단위로 문서 처리"""
        while True:
            batch = []
            try:
                for _ in range(batch_size):
                    item = self.index_queue.get_nowait()
                    batch.append(item)
            except asyncio.QueueEmpty:
                pass

            if batch:
                await self._process_batch(batch)
            else:
                await asyncio.sleep(1)

    async def _process_batch(self, batch: list):
        """배치 처리: 청킹 -> 임베딩 -> 업서트"""
        all_chunks = []
        all_metadata = []

        for item in batch:
            chunks = self.chunker.semantic_chunk(item["content"])
            for chunk in chunks:
                all_chunks.append(chunk)
                all_metadata.append({
                    "doc_id": item["doc_id"],
                    "hash": item["hash"],
                    **item["metadata"]
                })

        # 배치 임베딩 생성
        chunk_embeddings = self.embeddings.embed_documents(all_chunks)

        # 벡터 저장소에 업서트
        self.vector_store.upsert_documents(
            documents=all_chunks,
            embeddings=chunk_embeddings,
            metadata=all_metadata
        )

        for item in batch:
            self._processed_hashes.add(item["hash"])

검색 결과 캐싱

자주 반복되는 쿼리에 대해 캐싱을 적용하여 응답 속도를 향상시키고 API 비용을 절감한다.

import hashlib
import json
from datetime import datetime, timedelta
from typing import Optional
import redis

class RAGCache:
    """RAG 검색 결과 캐시"""

    def __init__(self, redis_url: str = "redis://localhost:6379",
                 ttl_seconds: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl_seconds

    def _cache_key(self, query: str, filters: Optional[dict] = None) -> str:
        """쿼리와 필터 기반 캐시 키 생성"""
        key_data = {"query": query.lower().strip(), "filters": filters or {}}
        key_hash = hashlib.sha256(
            json.dumps(key_data, sort_keys=True).encode()
        ).hexdigest()
        return f"rag:cache:{key_hash}"

    def get(self, query: str, filters: Optional[dict] = None):
        """캐시에서 검색 결과 조회"""
        key = self._cache_key(query, filters)
        cached = self.redis.get(key)

        if cached:
            data = json.loads(cached)
            data["cache_hit"] = True
            return data
        return None

    def set(self, query: str, results: list,
            filters: Optional[dict] = None):
        """검색 결과를 캐시에 저장"""
        key = self._cache_key(query, filters)
        data = {
            "results": results,
            "cached_at": datetime.utcnow().isoformat(),
            "query": query
        }
        self.redis.setex(key, self.ttl, json.dumps(data))

    def invalidate_by_pattern(self, pattern: str):
        """패턴 매칭으로 캐시 무효화"""
        keys = self.redis.keys(f"rag:cache:*{pattern}*")
        if keys:
            self.redis.delete(*keys)

장애 사례 및 대응

검색 품질 저하 패턴

장애 유형증상원인대응 방안
임베딩 드리프트시간 경과에 따른 검색 정확도 저하임베딩 모델 버전 변경, 데이터 분포 변화정기적 재인덱싱, 모델 버전 고정
청크 불일치관련 정보가 검색되지 않음청크 크기/전략 부적절청크 크기 실험, 오버랩 조정
필터 과적합결과가 0건인 쿼리 증가메타데이터 필터가 너무 엄격필터 폴백 전략, 필터 완화
컨텍스트 오염LLM이 무관한 정보로 답변리랭킹 부재, top-k 과다리랭킹 도입, top-k 감소
지연 시간 증가응답 시간 2초 초과인덱스 비최적화, 네트워크 지연인덱스 튜닝, 캐싱, 비동기 처리

임베딩 드리프트 모니터링

import numpy as np
from datetime import datetime
from typing import List

class EmbeddingDriftMonitor:
    """임베딩 드리프트 감지 및 모니터링"""

    def __init__(self, reference_embeddings: np.ndarray):
        self.reference_mean = np.mean(reference_embeddings, axis=0)
        self.reference_std = np.std(reference_embeddings, axis=0)
        self.drift_history = []

    def check_drift(self, new_embeddings: np.ndarray,
                     threshold: float = 0.1) -> dict:
        """새 임베딩의 드리프트 검사"""
        new_mean = np.mean(new_embeddings, axis=0)

        # 코사인 거리로 드리프트 측정
        cosine_sim = np.dot(self.reference_mean, new_mean) / (
            np.linalg.norm(self.reference_mean) * np.linalg.norm(new_mean)
        )
        drift_score = 1 - cosine_sim

        # 분포 비교 (KL Divergence 근사)
        distribution_shift = np.mean(
            np.abs(np.mean(new_embeddings, axis=0) - self.reference_mean)
            / (self.reference_std + 1e-8)
        )

        result = {
            "drift_score": float(drift_score),
            "distribution_shift": float(distribution_shift),
            "is_drifted": drift_score > threshold,
            "timestamp": datetime.utcnow().isoformat(),
            "recommendation": (
                "재인덱싱 필요" if drift_score > threshold
                else "정상 범위"
            )
        }

        self.drift_history.append(result)
        return result

검색 품질 저하 시 폴백 전략

class RAGWithFallback:
    """폴백 전략이 포함된 RAG 파이프라인"""

    def __init__(self, primary_retriever, fallback_retriever,
                 reranker, llm):
        self.primary = primary_retriever
        self.fallback = fallback_retriever
        self.reranker = reranker
        self.llm = llm

    def query(self, question: str) -> dict:
        """폴백을 포함한 질의 처리"""
        # 1차: 기본 검색
        primary_docs = self.primary.invoke(question)

        if not primary_docs:
            # 폴백 1: 필터 완화
            primary_docs = self.fallback.invoke(question)

        if not primary_docs:
            return {
                "answer": "관련 정보를 찾을 수 없습니다.",
                "confidence": 0.0,
                "source": "no_results"
            }

        # 리랭킹
        reranked = self.reranker.rerank(
            query=question,
            documents=[d.page_content for d in primary_docs],
            top_n=5
        )

        # 최소 관련도 검증
        if reranked[0]["relevance_score"] < 0.3:
            return {
                "answer": "높은 신뢰도의 답변을 생성할 수 없습니다.",
                "confidence": reranked[0]["relevance_score"],
                "source": "low_confidence"
            }

        # LLM 생성
        context = "\n\n".join(r["text"] for r in reranked)
        answer = self.llm.invoke(
            f"컨텍스트:\n{context}\n\n질문: {question}\n답변:"
        ).content

        return {
            "answer": answer,
            "confidence": reranked[0]["relevance_score"],
            "source": "primary",
            "num_sources": len(reranked)
        }

프로덕션 체크리스트

설계 단계

  • 사용 사례에 따른 임베딩 모델 벤치마크 수행
  • 벡터 DB 선택: 규모, 예산, 운영 역량 고려
  • 청킹 전략 A/B 테스트 계획 수립
  • 평가 데이터셋 (최소 100개 QA 쌍) 구축

개발 단계

  • 하이브리드 검색 (BM25 + 벡터) 구현
  • 리랭킹 파이프라인 통합
  • 비동기 인덱싱 파이프라인 구축
  • 캐싱 레이어 구현 (Redis/Memcached)
  • 에러 핸들링 및 폴백 전략 구현

배포 단계

  • RAGAS 메트릭 기반 품질 게이트 설정
  • 임베딩 드리프트 모니터링 대시보드 구축
  • 검색 지연 시간 알림 설정 (P95 기준 500ms)
  • 인덱스 재구축 자동화 파이프라인 구축
  • 로드 테스트 수행 (목표 QPS 대비 2배)

운영 단계

  • 주간 검색 품질 리포트 생성
  • 월간 임베딩 모델 업데이트 검토
  • 사용자 피드백 기반 평가 데이터셋 업데이트
  • 비용 모니터링 (임베딩 API 호출, 벡터 DB 스토리지)
  • 정기 인덱스 재구축 (분기 1회 이상)

참고자료