- Authors
- Name
はじめに
ルールベースのチャットボットは事前に定義された質問にしか答えられませんが、RAG(Retrieval-Augmented Generation) ベースのチャットボットはドキュメントから関連情報を検索し、自然言語で回答します。この記事では、会社のFAQドキュメントに基づいて質問に答えるTelegramボットを構築します。
アーキテクチャ
ユーザーの質問
↓
Telegram Bot API
↓
LangChain RAG Pipeline
├── 1. Query Embedding (OpenAI)
├── 2. Vector Search (ChromaDB)
├── 3. Context Retrieval (Top-K)
└── 4. LLM Generation (GPT-4o)
↓
回答 + ソース表示
環境設定
pip install langchain langchain-openai langchain-community \
chromadb python-telegram-bot tiktoken \
pypdf docx2txt unstructured
# config.py
import os
TELEGRAM_TOKEN = os.environ["TELEGRAM_BOT_TOKEN"]
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
# RAG設定
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200
TOP_K = 4
MODEL_NAME = "gpt-4o"
EMBEDDING_MODEL = "text-embedding-3-small"
ドキュメントの読み込みとインデックス作成
# indexer.py
from langchain_community.document_loaders import (
DirectoryLoader,
PyPDFLoader,
TextLoader,
Docx2txtLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
def load_documents(docs_dir: str):
"""様々な形式のドキュメントを読み込む"""
loaders = {
"**/*.pdf": PyPDFLoader,
"**/*.txt": TextLoader,
"**/*.md": TextLoader,
"**/*.docx": Docx2txtLoader,
}
all_docs = []
for glob_pattern, loader_cls in loaders.items():
loader = DirectoryLoader(
docs_dir,
glob=glob_pattern,
loader_cls=loader_cls,
show_progress=True,
)
docs = loader.load()
all_docs.extend(docs)
print(f"Loaded {len(docs)} docs from {glob_pattern}")
return all_docs
def create_vector_store(docs_dir: str, persist_dir: str = "./chroma_db"):
"""ドキュメントをチャンクに分割し、ベクトルストアに保存"""
# ドキュメントの読み込み
documents = load_documents(docs_dir)
print(f"Total documents: {len(documents)}")
# テキスト分割
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", ".", "!", "?", ",", " "],
)
chunks = text_splitter.split_documents(documents)
print(f"Total chunks: {len(chunks)}")
# エンベディング生成 & ベクトルストアに保存
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
persist_directory=persist_dir,
collection_metadata={"hnsw:space": "cosine"},
)
print(f"Vector store created at {persist_dir}")
return vectorstore
if __name__ == "__main__":
create_vector_store("./docs")
RAGチェーンの実装
# rag_chain.py
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferWindowMemory
from langchain.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
SYSTEM_PROMPT = """あなたは会社のFAQアシスタントです。提供されたコンテキストに基づいて質問に回答してください。
ルール:
1. コンテキストにある情報のみを使用してください。
2. 確信が持てない場合は「提供されたドキュメントにはその情報が見つかりません」と答えてください。
3. 回答の最後に参照したドキュメントのソースを表示してください。
4. 簡潔かつ明確に回答してください。
コンテキスト:
{context}"""
def create_rag_chain(persist_dir: str = "./chroma_db"):
"""RAGチェーンを作成"""
# ベクトルストアの読み込み
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma(
persist_directory=persist_dir,
embedding_function=embeddings,
)
# リトリーバーの設定
retriever = vectorstore.as_retriever(
search_type="mmr", # Maximal Marginal Relevance
search_kwargs={
"k": 4,
"fetch_k": 10,
"lambda_mult": 0.7,
},
)
# LLM
llm = ChatOpenAI(
model="gpt-4o",
temperature=0.1,
max_tokens=1024,
)
# 会話メモリ(直近5ターン)
memory = ConversationBufferWindowMemory(
k=5,
memory_key="chat_history",
return_messages=True,
output_key="answer",
)
# プロンプト
prompt = ChatPromptTemplate.from_messages([
SystemMessagePromptTemplate.from_template(SYSTEM_PROMPT),
HumanMessagePromptTemplate.from_template("{question}"),
])
# チェーンの作成
chain = ConversationalRetrievalChain.from_llm(
llm=llm,
retriever=retriever,
memory=memory,
return_source_documents=True,
combine_docs_chain_kwargs={"prompt": prompt},
verbose=False,
)
return chain
class RAGBot:
"""ユーザーごとの会話コンテキストを管理するRAGボット"""
def __init__(self, persist_dir: str = "./chroma_db"):
self.persist_dir = persist_dir
self.user_chains: dict[int, ConversationalRetrievalChain] = {}
def get_chain(self, user_id: int):
"""ユーザーごとのチェーン(会話メモリを分離)"""
if user_id not in self.user_chains:
self.user_chains[user_id] = create_rag_chain(self.persist_dir)
return self.user_chains[user_id]
async def ask(self, user_id: int, question: str) -> tuple[str, list[str]]:
"""質問に回答し、ソースを返す"""
chain = self.get_chain(user_id)
result = chain.invoke({"question": question})
answer = result["answer"]
sources = []
for doc in result.get("source_documents", []):
source = doc.metadata.get("source", "Unknown")
page = doc.metadata.get("page", "")
if page:
sources.append(f"{source} (p.{page})")
else:
sources.append(source)
# 重複を除去
sources = list(dict.fromkeys(sources))
return answer, sources
def reset_memory(self, user_id: int):
"""ユーザーの会話メモリをリセット"""
if user_id in self.user_chains:
del self.user_chains[user_id]
Telegramボットの実装
# bot.py
import logging
from telegram import Update, BotCommand
from telegram.ext import (
Application,
CommandHandler,
MessageHandler,
filters,
ContextTypes,
)
from rag_chain import RAGBot
from config import TELEGRAM_TOKEN
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
rag_bot = RAGBot()
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""開始コマンド"""
welcome = (
"こんにちは!FAQアシスタントです。\n\n"
"お気軽にご質問ください。\n"
"会社のドキュメントに基づいてお答えします。\n\n"
"コマンド:\n"
"/reset - 会話をリセット\n"
"/sources - 検索可能なドキュメント一覧"
)
await update.message.reply_text(welcome)
async def reset(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""会話メモリをリセット"""
user_id = update.effective_user.id
rag_bot.reset_memory(user_id)
await update.message.reply_text("会話がリセットされました。")
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""一般メッセージの処理"""
user_id = update.effective_user.id
question = update.message.text
# タイピング表示
await context.bot.send_chat_action(
chat_id=update.effective_chat.id,
action="typing"
)
try:
answer, sources = await rag_bot.ask(user_id, question)
# 回答のフォーマット
response = answer
if sources:
response += "\n\n参考ドキュメント:\n"
for src in sources[:3]:
response += f" - {src}\n"
await update.message.reply_text(response)
except Exception as e:
logger.error(f"Error: {e}")
await update.message.reply_text(
"申し訳ありません。回答の生成中にエラーが発生しました。"
)
async def post_init(application: Application):
"""ボット起動時にコマンドを登録"""
commands = [
BotCommand("start", "ボットを開始"),
BotCommand("reset", "会話をリセット"),
BotCommand("sources", "検索可能なドキュメント一覧"),
]
await application.bot.set_my_commands(commands)
def main():
app = Application.builder().token(TELEGRAM_TOKEN).post_init(post_init).build()
app.add_handler(CommandHandler("start", start))
app.add_handler(CommandHandler("reset", reset))
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
logger.info("Bot started")
app.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
main()
Dockerでのデプロイ
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# ドキュメントのインデックス作成
RUN python indexer.py
CMD ["python", "bot.py"]
# docker-compose.yml
services:
faq-bot:
build: .
environment:
- TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN}
- OPENAI_API_KEY=${OPENAI_API_KEY}
volumes:
- ./docs:/app/docs
- chroma-data:/app/chroma_db
restart: unless-stopped
volumes:
chroma-data:
docker-compose up -d
ドキュメントの自動更新
# watcher.py - ドキュメント変更を検知して自動再インデックス
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import time
class DocChangeHandler(FileSystemEventHandler):
def __init__(self, indexer_fn):
self.indexer_fn = indexer_fn
self.last_indexed = 0
def on_modified(self, event):
if event.is_directory:
return
# デバウンス(5秒以内の重複を防止)
now = time.time()
if now - self.last_indexed < 5:
return
self.last_indexed = now
print(f"Document changed: {event.src_path}")
self.indexer_fn()
def watch_docs(docs_dir, indexer_fn):
handler = DocChangeHandler(indexer_fn)
observer = Observer()
observer.schedule(handler, docs_dir, recursive=True)
observer.start()
return observer
パフォーマンス最適化
キャッシング
from functools import lru_cache
import hashlib
class CachedRAGBot(RAGBot):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cache: dict[str, tuple[str, list[str]]] = {}
async def ask(self, user_id: int, question: str):
cache_key = hashlib.md5(question.lower().strip().encode()).hexdigest()
if cache_key in self.cache:
return self.cache[cache_key]
answer, sources = await super().ask(user_id, question)
self.cache[cache_key] = (answer, sources)
return answer, sources
まとめ
LangChain + RAG + Telegramでインテリジェントなメモリを構築しました:
- ドキュメントベースの回答:正確な情報のみ提供し、ハルシネーションを最小化
- 会話メモリ:ユーザーごとのコンテキストを維持
- ソース表示:回答の根拠となるドキュメントを透明に提示
- MMR検索:多様性と関連性をバランスよく検索
- 自動更新:ドキュメント変更時に自動で再インデックス
クイズ:RAG Telegramボット理解度チェック(7問)
Q1. RAGにおけるRetrievalの役割は?
ユーザーの質問に関連するドキュメントチャンクをベクトル類似度検索で見つけ、LLMのコンテキストとして提供します。
Q2. MMR(Maximal Marginal Relevance)検索の利点は?
単純な類似度検索と異なり、結果の多様性を考慮し、重複した内容のチャンクを減らします。
Q3. chunk_overlapを設定する理由は?
文章がチャンク境界で切断される場合に文脈が失われるのを防ぐためです。
Q4. ユーザーごとに会話メモリを分離する理由は?
複数のユーザーが同時に使用する際、他のユーザーの会話コンテキストが混ざらないようにするためです。
Q5. ConversationBufferWindowMemoryのk=5は何を意味しますか?
直近5ターンの会話のみをメモリに保持し、トークンコストを制御します。
Q6. ボットが「提供されたドキュメントにはその情報が見つかりません」と回答することが重要な理由は?
RAGボットがドキュメントにない情報をハルシネーション(幻覚)で生成するのを防ぐためです。
Q7. ドキュメント自動更新(watchdog)の動作原理は?
ファイルシステムの変更を検知し、ドキュメントが修正されると自動的にベクトルストアを再インデックスします。