Skip to content
Published on

Building an Intelligent Telegram FAQ Bot with LangChain + RAG: A Document-Based Q&A System

Authors
  • Name
    Twitter

Introduction

Rule-based chatbots can only answer predefined questions, but RAG (Retrieval-Augmented Generation)-based chatbots retrieve relevant information from documents and respond in natural language. In this article, we build a Telegram bot that answers questions based on company FAQ documents.

Architecture

User Question
Telegram Bot API
LangChain RAG Pipeline
    ├── 1. Query Embedding (OpenAI)
    ├── 2. Vector Search (ChromaDB)
    ├── 3. Context Retrieval (Top-K)
    └── 4. LLM Generation (GPT-4o)
Answer + Source Citation

Environment Setup

pip install langchain langchain-openai langchain-community \
  chromadb python-telegram-bot tiktoken \
  pypdf docx2txt unstructured
# config.py
import os

TELEGRAM_TOKEN = os.environ["TELEGRAM_BOT_TOKEN"]
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]

# RAG Settings
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200
TOP_K = 4
MODEL_NAME = "gpt-4o"
EMBEDDING_MODEL = "text-embedding-3-small"

Document Loading and Indexing

# indexer.py
from langchain_community.document_loaders import (
    DirectoryLoader,
    PyPDFLoader,
    TextLoader,
    Docx2txtLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

def load_documents(docs_dir: str):
    """Load documents in various formats"""
    loaders = {
        "**/*.pdf": PyPDFLoader,
        "**/*.txt": TextLoader,
        "**/*.md": TextLoader,
        "**/*.docx": Docx2txtLoader,
    }

    all_docs = []
    for glob_pattern, loader_cls in loaders.items():
        loader = DirectoryLoader(
            docs_dir,
            glob=glob_pattern,
            loader_cls=loader_cls,
            show_progress=True,
        )
        docs = loader.load()
        all_docs.extend(docs)
        print(f"Loaded {len(docs)} docs from {glob_pattern}")

    return all_docs

def create_vector_store(docs_dir: str, persist_dir: str = "./chroma_db"):
    """Split documents into chunks and store in the vector store"""
    # Load documents
    documents = load_documents(docs_dir)
    print(f"Total documents: {len(documents)}")

    # Text splitting
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        separators=["\n\n", "\n", ".", "!", "?", ",", " "],
    )
    chunks = text_splitter.split_documents(documents)
    print(f"Total chunks: {len(chunks)}")

    # Generate embeddings & save to vector store
    embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
    vectorstore = Chroma.from_documents(
        documents=chunks,
        embedding=embeddings,
        persist_directory=persist_dir,
        collection_metadata={"hnsw:space": "cosine"},
    )

    print(f"Vector store created at {persist_dir}")
    return vectorstore

if __name__ == "__main__":
    create_vector_store("./docs")

RAG Chain Implementation

# rag_chain.py
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferWindowMemory
from langchain.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate

SYSTEM_PROMPT = """You are a company FAQ assistant. Answer questions based on the provided context.

Rules:
1. Only use information from the context.
2. If unsure, respond with "I could not find that information in the provided documents."
3. Include the source documents referenced at the end of your answer.
4. Keep answers concise and clear.

Context:
{context}"""

def create_rag_chain(persist_dir: str = "./chroma_db"):
    """Create the RAG chain"""
    # Load vector store
    embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
    vectorstore = Chroma(
        persist_directory=persist_dir,
        embedding_function=embeddings,
    )

    # Retriever configuration
    retriever = vectorstore.as_retriever(
        search_type="mmr",  # Maximal Marginal Relevance
        search_kwargs={
            "k": 4,
            "fetch_k": 10,
            "lambda_mult": 0.7,
        },
    )

    # LLM
    llm = ChatOpenAI(
        model="gpt-4o",
        temperature=0.1,
        max_tokens=1024,
    )

    # Conversation memory (last 5 turns)
    memory = ConversationBufferWindowMemory(
        k=5,
        memory_key="chat_history",
        return_messages=True,
        output_key="answer",
    )

    # Prompt
    prompt = ChatPromptTemplate.from_messages([
        SystemMessagePromptTemplate.from_template(SYSTEM_PROMPT),
        HumanMessagePromptTemplate.from_template("{question}"),
    ])

    # Create chain
    chain = ConversationalRetrievalChain.from_llm(
        llm=llm,
        retriever=retriever,
        memory=memory,
        return_source_documents=True,
        combine_docs_chain_kwargs={"prompt": prompt},
        verbose=False,
    )

    return chain

class RAGBot:
    """RAG bot that manages per-user conversation context"""

    def __init__(self, persist_dir: str = "./chroma_db"):
        self.persist_dir = persist_dir
        self.user_chains: dict[int, ConversationalRetrievalChain] = {}

    def get_chain(self, user_id: int):
        """Per-user chain (separate conversation memory)"""
        if user_id not in self.user_chains:
            self.user_chains[user_id] = create_rag_chain(self.persist_dir)
        return self.user_chains[user_id]

    async def ask(self, user_id: int, question: str) -> tuple[str, list[str]]:
        """Answer a question and return sources"""
        chain = self.get_chain(user_id)
        result = chain.invoke({"question": question})

        answer = result["answer"]
        sources = []
        for doc in result.get("source_documents", []):
            source = doc.metadata.get("source", "Unknown")
            page = doc.metadata.get("page", "")
            if page:
                sources.append(f"{source} (p.{page})")
            else:
                sources.append(source)

        # Remove duplicates
        sources = list(dict.fromkeys(sources))
        return answer, sources

    def reset_memory(self, user_id: int):
        """Reset conversation memory for a user"""
        if user_id in self.user_chains:
            del self.user_chains[user_id]

Telegram Bot Implementation

# bot.py
import logging
from telegram import Update, BotCommand
from telegram.ext import (
    Application,
    CommandHandler,
    MessageHandler,
    filters,
    ContextTypes,
)
from rag_chain import RAGBot
from config import TELEGRAM_TOKEN

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

rag_bot = RAGBot()

async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Start command"""
    welcome = (
        "Hello! I'm the FAQ assistant.\n\n"
        "Feel free to ask me anything.\n"
        "I'll answer based on company documents.\n\n"
        "Commands:\n"
        "/reset - Reset conversation\n"
        "/sources - List searchable documents"
    )
    await update.message.reply_text(welcome)

async def reset(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reset conversation memory"""
    user_id = update.effective_user.id
    rag_bot.reset_memory(user_id)
    await update.message.reply_text("Conversation has been reset.")

async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle general messages"""
    user_id = update.effective_user.id
    question = update.message.text

    # Show typing indicator
    await context.bot.send_chat_action(
        chat_id=update.effective_chat.id,
        action="typing"
    )

    try:
        answer, sources = await rag_bot.ask(user_id, question)

        # Format response
        response = answer
        if sources:
            response += "\n\nReference Documents:\n"
            for src in sources[:3]:
                response += f"  - {src}\n"

        await update.message.reply_text(response)

    except Exception as e:
        logger.error(f"Error: {e}")
        await update.message.reply_text(
            "Sorry, an error occurred while generating the answer."
        )

async def post_init(application: Application):
    """Register commands on bot startup"""
    commands = [
        BotCommand("start", "Start the bot"),
        BotCommand("reset", "Reset conversation"),
        BotCommand("sources", "List searchable documents"),
    ]
    await application.bot.set_my_commands(commands)

def main():
    app = Application.builder().token(TELEGRAM_TOKEN).post_init(post_init).build()

    app.add_handler(CommandHandler("start", start))
    app.add_handler(CommandHandler("reset", reset))
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

    logger.info("Bot started")
    app.run_polling(allowed_updates=Update.ALL_TYPES)

if __name__ == "__main__":
    main()

Deploying with Docker

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Index documents
RUN python indexer.py

CMD ["python", "bot.py"]
# docker-compose.yml
services:
  faq-bot:
    build: .
    environment:
      - TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    volumes:
      - ./docs:/app/docs
      - chroma-data:/app/chroma_db
    restart: unless-stopped

volumes:
  chroma-data:
docker-compose up -d

Automatic Document Updates

# watcher.py - Detect document changes and auto-reindex
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import time

class DocChangeHandler(FileSystemEventHandler):
    def __init__(self, indexer_fn):
        self.indexer_fn = indexer_fn
        self.last_indexed = 0

    def on_modified(self, event):
        if event.is_directory:
            return
        # Debounce (prevent duplicates within 5 seconds)
        now = time.time()
        if now - self.last_indexed < 5:
            return
        self.last_indexed = now

        print(f"Document changed: {event.src_path}")
        self.indexer_fn()

def watch_docs(docs_dir, indexer_fn):
    handler = DocChangeHandler(indexer_fn)
    observer = Observer()
    observer.schedule(handler, docs_dir, recursive=True)
    observer.start()
    return observer

Performance Optimization

Caching

from functools import lru_cache
import hashlib

class CachedRAGBot(RAGBot):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.cache: dict[str, tuple[str, list[str]]] = {}

    async def ask(self, user_id: int, question: str):
        cache_key = hashlib.md5(question.lower().strip().encode()).hexdigest()

        if cache_key in self.cache:
            return self.cache[cache_key]

        answer, sources = await super().ask(user_id, question)
        self.cache[cache_key] = (answer, sources)
        return answer, sources

Summary

We built an intelligent FAQ bot using LangChain + RAG + Telegram:

  • Document-based answers: Provides only accurate information, minimizing hallucinations
  • Conversation memory: Maintains per-user context
  • Source citation: Transparently presents the reference documents for answers
  • MMR search: Balances diversity and relevance in search results
  • Auto-update: Automatically re-indexes when documents change

Quiz: RAG Telegram Bot Comprehension Check (7 Questions)

Q1. What is the role of Retrieval in RAG?

It finds document chunks related to the user's question through vector similarity search and provides them as context to the LLM.

Q2. What is the advantage of MMR (Maximal Marginal Relevance) search?

Unlike simple similarity search, it considers diversity in results, reducing chunks with overlapping content.

Q3. Why do we set chunk_overlap?

To prevent context loss when sentences get cut off at chunk boundaries.

Q4. Why do we separate conversation memory per user?

To prevent conversation contexts from mixing between different users when multiple users are using the bot simultaneously.

Q5. What does k=5 mean in ConversationBufferWindowMemory?

Only the last 5 turns of conversation are kept in memory to control token costs.

Q6. Why is it important for the bot to respond "I could not find that information in the provided documents"?

To prevent the RAG bot from generating information not present in the documents through hallucination.

Q7. How does the automatic document update (watchdog) work?

It detects file system changes and automatically re-indexes the vector store when documents are modified.