- Published on
LangChain + RAG로 지능형 Telegram FAQ 봇 만들기: 문서 기반 질의응답 시스템
- Authors
- Name
들어가며
규칙 기반 챗봇은 미리 정의된 질문에만 답할 수 있지만, RAG(Retrieval-Augmented Generation) 기반 챗봇은 문서에서 관련 정보를 검색하여 자연어로 답변합니다. 이 글에서는 회사 FAQ 문서를 기반으로 질문에 답하는 Telegram 봇을 구축합니다.
아키텍처
사용자 질문
↓
Telegram Bot API
↓
LangChain RAG Pipeline
├── 1. Query Embedding (OpenAI)
├── 2. Vector Search (ChromaDB)
├── 3. Context Retrieval (Top-K)
└── 4. LLM Generation (GPT-4o)
↓
답변 + 출처 표시
환경 설정
pip install langchain langchain-openai langchain-community \
chromadb python-telegram-bot tiktoken \
pypdf docx2txt unstructured
# config.py
import os
TELEGRAM_TOKEN = os.environ["TELEGRAM_BOT_TOKEN"]
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
# RAG 설정
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200
TOP_K = 4
MODEL_NAME = "gpt-4o"
EMBEDDING_MODEL = "text-embedding-3-small"
문서 로딩과 인덱싱
# indexer.py
from langchain_community.document_loaders import (
DirectoryLoader,
PyPDFLoader,
TextLoader,
Docx2txtLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
def load_documents(docs_dir: str):
"""다양한 형식의 문서 로딩"""
loaders = {
"**/*.pdf": PyPDFLoader,
"**/*.txt": TextLoader,
"**/*.md": TextLoader,
"**/*.docx": Docx2txtLoader,
}
all_docs = []
for glob_pattern, loader_cls in loaders.items():
loader = DirectoryLoader(
docs_dir,
glob=glob_pattern,
loader_cls=loader_cls,
show_progress=True,
)
docs = loader.load()
all_docs.extend(docs)
print(f"Loaded {len(docs)} docs from {glob_pattern}")
return all_docs
def create_vector_store(docs_dir: str, persist_dir: str = "./chroma_db"):
"""문서를 청크로 분할하고 벡터 스토어에 저장"""
# 문서 로딩
documents = load_documents(docs_dir)
print(f"Total documents: {len(documents)}")
# 텍스트 분할
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", ".", "!", "?", ",", " "],
)
chunks = text_splitter.split_documents(documents)
print(f"Total chunks: {len(chunks)}")
# 임베딩 생성 & 벡터 스토어 저장
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
persist_directory=persist_dir,
collection_metadata={"hnsw:space": "cosine"},
)
print(f"Vector store created at {persist_dir}")
return vectorstore
if __name__ == "__main__":
create_vector_store("./docs")
RAG 체인 구현
# rag_chain.py
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferWindowMemory
from langchain.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
SYSTEM_PROMPT = """당신은 회사 FAQ 도우미입니다. 제공된 컨텍스트를 기반으로 질문에 답변하세요.
규칙:
1. 컨텍스트에 있는 정보만 사용하세요.
2. 확실하지 않으면 "제공된 문서에서 해당 정보를 찾을 수 없습니다"라고 답하세요.
3. 답변 끝에 참고한 문서 출처를 표시하세요.
4. 간결하고 명확하게 답변하세요.
컨텍스트:
{context}"""
def create_rag_chain(persist_dir: str = "./chroma_db"):
"""RAG 체인 생성"""
# 벡터 스토어 로드
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma(
persist_directory=persist_dir,
embedding_function=embeddings,
)
# 리트리버 설정
retriever = vectorstore.as_retriever(
search_type="mmr", # Maximal Marginal Relevance
search_kwargs={
"k": 4,
"fetch_k": 10,
"lambda_mult": 0.7,
},
)
# LLM
llm = ChatOpenAI(
model="gpt-4o",
temperature=0.1,
max_tokens=1024,
)
# 대화 메모리 (최근 5턴)
memory = ConversationBufferWindowMemory(
k=5,
memory_key="chat_history",
return_messages=True,
output_key="answer",
)
# 프롬프트
prompt = ChatPromptTemplate.from_messages([
SystemMessagePromptTemplate.from_template(SYSTEM_PROMPT),
HumanMessagePromptTemplate.from_template("{question}"),
])
# 체인 생성
chain = ConversationalRetrievalChain.from_llm(
llm=llm,
retriever=retriever,
memory=memory,
return_source_documents=True,
combine_docs_chain_kwargs={"prompt": prompt},
verbose=False,
)
return chain
class RAGBot:
"""사용자별 대화 컨텍스트를 관리하는 RAG 봇"""
def __init__(self, persist_dir: str = "./chroma_db"):
self.persist_dir = persist_dir
self.user_chains: dict[int, ConversationalRetrievalChain] = {}
def get_chain(self, user_id: int):
"""사용자별 체인 (대화 메모리 분리)"""
if user_id not in self.user_chains:
self.user_chains[user_id] = create_rag_chain(self.persist_dir)
return self.user_chains[user_id]
async def ask(self, user_id: int, question: str) -> tuple[str, list[str]]:
"""질문에 답변하고 출처를 반환"""
chain = self.get_chain(user_id)
result = chain.invoke({"question": question})
answer = result["answer"]
sources = []
for doc in result.get("source_documents", []):
source = doc.metadata.get("source", "Unknown")
page = doc.metadata.get("page", "")
if page:
sources.append(f"{source} (p.{page})")
else:
sources.append(source)
# 중복 제거
sources = list(dict.fromkeys(sources))
return answer, sources
def reset_memory(self, user_id: int):
"""사용자의 대화 메모리 초기화"""
if user_id in self.user_chains:
del self.user_chains[user_id]
Telegram 봇 구현
# bot.py
import logging
from telegram import Update, BotCommand
from telegram.ext import (
Application,
CommandHandler,
MessageHandler,
filters,
ContextTypes,
)
from rag_chain import RAGBot
from config import TELEGRAM_TOKEN
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
rag_bot = RAGBot()
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""시작 명령어"""
welcome = (
"안녕하세요! FAQ 도우미입니다.\n\n"
"궁금한 것을 자유롭게 물어보세요.\n"
"회사 문서를 기반으로 답변해 드립니다.\n\n"
"명령어:\n"
"/reset - 대화 초기화\n"
"/sources - 검색 가능한 문서 목록"
)
await update.message.reply_text(welcome)
async def reset(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""대화 메모리 초기화"""
user_id = update.effective_user.id
rag_bot.reset_memory(user_id)
await update.message.reply_text("대화가 초기화되었습니다.")
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""일반 메시지 처리"""
user_id = update.effective_user.id
question = update.message.text
# 타이핑 표시
await context.bot.send_chat_action(
chat_id=update.effective_chat.id,
action="typing"
)
try:
answer, sources = await rag_bot.ask(user_id, question)
# 답변 포맷팅
response = answer
if sources:
response += "\n\n📚 참고 문서:\n"
for src in sources[:3]:
response += f" • {src}\n"
await update.message.reply_text(response)
except Exception as e:
logger.error(f"Error: {e}")
await update.message.reply_text(
"죄송합니다. 답변을 생성하는 중 오류가 발생했습니다."
)
async def post_init(application: Application):
"""봇 시작 시 명령어 등록"""
commands = [
BotCommand("start", "봇 시작"),
BotCommand("reset", "대화 초기화"),
BotCommand("sources", "검색 가능한 문서 목록"),
]
await application.bot.set_my_commands(commands)
def main():
app = Application.builder().token(TELEGRAM_TOKEN).post_init(post_init).build()
app.add_handler(CommandHandler("start", start))
app.add_handler(CommandHandler("reset", reset))
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
logger.info("Bot started")
app.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
main()
Docker로 배포
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# 문서 인덱싱
RUN python indexer.py
CMD ["python", "bot.py"]
# docker-compose.yml
services:
faq-bot:
build: .
environment:
- TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN}
- OPENAI_API_KEY=${OPENAI_API_KEY}
volumes:
- ./docs:/app/docs
- chroma-data:/app/chroma_db
restart: unless-stopped
volumes:
chroma-data:
docker-compose up -d
문서 자동 업데이트
# watcher.py - 문서 변경 감지 및 자동 재인덱싱
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import time
class DocChangeHandler(FileSystemEventHandler):
def __init__(self, indexer_fn):
self.indexer_fn = indexer_fn
self.last_indexed = 0
def on_modified(self, event):
if event.is_directory:
return
# 디바운스 (5초 이내 중복 방지)
now = time.time()
if now - self.last_indexed < 5:
return
self.last_indexed = now
print(f"Document changed: {event.src_path}")
self.indexer_fn()
def watch_docs(docs_dir, indexer_fn):
handler = DocChangeHandler(indexer_fn)
observer = Observer()
observer.schedule(handler, docs_dir, recursive=True)
observer.start()
return observer
성능 최적화
캐싱
from functools import lru_cache
import hashlib
class CachedRAGBot(RAGBot):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cache: dict[str, tuple[str, list[str]]] = {}
async def ask(self, user_id: int, question: str):
cache_key = hashlib.md5(question.lower().strip().encode()).hexdigest()
if cache_key in self.cache:
return self.cache[cache_key]
answer, sources = await super().ask(user_id, question)
self.cache[cache_key] = (answer, sources)
return answer, sources
정리
LangChain + RAG + Telegram으로 지능형 FAQ 봇을 구축했습니다:
- 문서 기반 답변: 정확한 정보만 제공, 환각 최소화
- 대화 메모리: 사용자별 컨텍스트 유지
- 출처 표시: 답변의 근거 문서를 투명하게 제시
- MMR 검색: 다양성과 관련성을 균형 있게 검색
- 자동 업데이트: 문서 변경 시 자동 재인덱싱
✅ 퀴즈: RAG Telegram 봇 이해도 점검 (7문제)
Q1. RAG에서 Retrieval의 역할은?
사용자 질문과 관련된 문서 청크를 벡터 유사도 검색으로 찾아 LLM의 컨텍스트로 제공합니다.
Q2. MMR(Maximal Marginal Relevance) 검색의 장점은?
단순 유사도 검색과 달리 결과의 다양성을 고려하여 중복된 내용의 청크를 줄입니다.
Q3. chunk_overlap을 설정하는 이유는?
문장이 청크 경계에서 잘리는 경우 문맥이 손실되는 것을 방지합니다.
Q4. 사용자별 대화 메모리를 분리하는 이유는?
여러 사용자가 동시에 사용할 때, 다른 사용자의 대화 컨텍스트가 섞이지 않도록 합니다.
Q5. ConversationBufferWindowMemory의 k=5는 무엇을 의미하나요?
최근 5턴의 대화만 메모리에 유지하여 토큰 비용을 제어합니다.
Q6. 봇이 "제공된 문서에서 해당 정보를 찾을 수 없습니다"라고 답하는 것이 중요한 이유는?
RAG 봇이 문서에 없는 정보를 환각(hallucination)으로 생성하는 것을 방지합니다.
Q7. 문서 자동 업데이트(watchdog)의 동작 원리는?
파일 시스템 변경을 감지하여 문서가 수정되면 자동으로 벡터 스토어를 재인덱싱합니다.