Skip to content
Published on

Telegram Bot + Webhook Practical Guide: From Building a Bot with Python to Kubernetes Deployment

Authors
  • Name
    Twitter

1. Telegram Bot API Basics

1.1 Creating a Bot

1. Search for @BotFather on Telegram
2. Enter the /newbot command
3. Enter the bot name: "My DevOps Bot"
4. Enter the bot username: "my_devops_bot" (must end with _bot)
5. Receive API token: 123456:ABC-DEF1234ghIkl-zyx57W2v1u123ew11

1.2 Polling vs Webhook

MethodPollingWebhook
OperationBot periodically requests updates from serverTelegram sends HTTP POST to bot server
InfrastructureNo server required (can run locally)HTTPS server required
LatencyDepends on polling intervalInstant
ScalabilitySingle instanceHorizontally scalable
Use CaseDevelopment/TestingProduction

2. Long Polling Method (For Development)

2.1 Basic Bot Implementation

pip install python-telegram-bot==21.8
# bot_polling.py
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
    ApplicationBuilder, CommandHandler, MessageHandler,
    CallbackQueryHandler, ContextTypes, filters
)

TOKEN = "YOUR_BOT_TOKEN"

# /start command
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    keyboard = [
        [InlineKeyboardButton("Server Status", callback_data="status")],
        [InlineKeyboardButton("Metrics", callback_data="metrics")],
        [InlineKeyboardButton("Deploy", callback_data="deploy")],
    ]
    reply_markup = InlineKeyboardMarkup(keyboard)
    await update.message.reply_text(
        "Welcome to DevOps Bot!\nChoose an action:",
        reply_markup=reply_markup
    )

# /status command
async def status(update: Update, context: ContextTypes.DEFAULT_TYPE):
    import subprocess
    result = subprocess.run(
        ["kubectl", "get", "nodes", "-o", "wide"],
        capture_output=True, text=True, timeout=10
    )
    await update.message.reply_text(
        f"```\n{result.stdout}\n```",
        parse_mode="MarkdownV2"
    )

# Inline button callback
async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    query = update.callback_query
    await query.answer()

    if query.data == "status":
        await query.edit_message_text("Checking server status...")
        # Run kubectl
        result = check_cluster_status()
        await query.edit_message_text(f"Cluster Status:\n```\n{result}\n```",
                                       parse_mode="MarkdownV2")
    elif query.data == "metrics":
        await query.edit_message_text("Collecting metrics...")
        metrics = get_prometheus_metrics()
        await query.edit_message_text(f"Current Metrics:\n{metrics}")
    elif query.data == "deploy":
        keyboard = [
            [InlineKeyboardButton("Approve", callback_data="deploy_approve")],
            [InlineKeyboardButton("Cancel", callback_data="deploy_cancel")],
        ]
        await query.edit_message_text(
            "Do you want to proceed with the deployment?",
            reply_markup=InlineKeyboardMarkup(keyboard)
        )

# General message handler
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    text = update.message.text
    # AI response (OpenAI integration)
    from openai import OpenAI
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a DevOps assistant."},
            {"role": "user", "content": text}
        ]
    )
    await update.message.reply_text(response.choices[0].message.content)

# Run the app
app = ApplicationBuilder().token(TOKEN).build()
app.add_handler(CommandHandler("start", start))
app.add_handler(CommandHandler("status", status))
app.add_handler(CallbackQueryHandler(button_callback))
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

print("Bot is running...")
app.run_polling()

3. Webhook Method (Production)

3.1 FastAPI + Webhook

# bot_webhook.py
from fastapi import FastAPI, Request
from telegram import Update, Bot
from telegram.ext import Application, CommandHandler, MessageHandler, filters
import uvicorn

TOKEN = "YOUR_BOT_TOKEN"
WEBHOOK_URL = "https://bot.example.com/webhook"

# FastAPI app
fastapi_app = FastAPI()

# Telegram Application
tg_app = Application.builder().token(TOKEN).build()

# Register handlers
async def start(update: Update, context):
    await update.message.reply_text("Hello!")

async def echo(update: Update, context):
    await update.message.reply_text(f"Echo: {update.message.text}")

tg_app.add_handler(CommandHandler("start", start))
tg_app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo))

@fastapi_app.on_event("startup")
async def on_startup():
    await tg_app.initialize()
    await tg_app.start()
    # Set Webhook
    await tg_app.bot.set_webhook(
        url=f"{WEBHOOK_URL}/{TOKEN}",
        allowed_updates=["message", "callback_query"],
        drop_pending_updates=True,
    )
    print(f"Webhook set to {WEBHOOK_URL}/{TOKEN}")

@fastapi_app.on_event("shutdown")
async def on_shutdown():
    await tg_app.stop()
    await tg_app.shutdown()

@fastapi_app.post(f"/webhook/{TOKEN}")
async def webhook(request: Request):
    data = await request.json()
    update = Update.de_json(data, tg_app.bot)
    await tg_app.process_update(update)
    return {"ok": True}

@fastapi_app.get("/health")
async def health():
    return {"status": "ok"}

if __name__ == "__main__":
    uvicorn.run(fastapi_app, host="0.0.0.0", port=8080)

3.2 Verifying Webhook Configuration

# Check Webhook status
curl "https://api.telegram.org/bot${TOKEN}/getWebhookInfo" | jq

# Delete Webhook (when switching to Polling mode)
curl "https://api.telegram.org/bot${TOKEN}/deleteWebhook"

4. Advanced Features

4.1 Conversation Flow (ConversationHandler)

from telegram.ext import ConversationHandler

# State definitions
NAME, EMAIL, CONFIRM = range(3)

async def ticket_start(update, context):
    await update.message.reply_text("Creating a ticket. Enter the title:")
    return NAME

async def ticket_name(update, context):
    context.user_data["title"] = update.message.text
    await update.message.reply_text("Enter the description:")
    return EMAIL

async def ticket_desc(update, context):
    context.user_data["description"] = update.message.text
    title = context.user_data["title"]
    desc = context.user_data["description"]

    keyboard = [
        [InlineKeyboardButton("Create", callback_data="confirm_yes")],
        [InlineKeyboardButton("Cancel", callback_data="confirm_no")],
    ]
    await update.message.reply_text(
        f"Ticket Confirmation:\nTitle: {title}\nDescription: {desc}",
        reply_markup=InlineKeyboardMarkup(keyboard)
    )
    return CONFIRM

conv_handler = ConversationHandler(
    entry_points=[CommandHandler("ticket", ticket_start)],
    states={
        NAME: [MessageHandler(filters.TEXT & ~filters.COMMAND, ticket_name)],
        EMAIL: [MessageHandler(filters.TEXT & ~filters.COMMAND, ticket_desc)],
        CONFIRM: [CallbackQueryHandler(ticket_confirm)],
    },
    fallbacks=[CommandHandler("cancel", cancel)],
)

4.2 Scheduling (Regular Notifications)

from telegram.ext import ApplicationBuilder

async def daily_report(context: ContextTypes.DEFAULT_TYPE):
    """Daily report at 9 AM"""
    chat_id = context.job.data["chat_id"]

    # Collect metrics
    cpu = get_cluster_cpu()
    memory = get_cluster_memory()
    pods = get_pod_count()

    message = (
        "Daily Cluster Report\n"
        f"━━━━━━━━━━━━━━\n"
        f"CPU: {cpu}%\n"
        f"Memory: {memory}%\n"
        f"Pods: {pods}\n"
        f"━━━━━━━━━━━━━━"
    )
    await context.bot.send_message(chat_id=chat_id, text=message)

# Register schedule
async def setup_schedule(update, context):
    chat_id = update.effective_chat.id
    context.job_queue.run_daily(
        daily_report,
        time=datetime.time(hour=9, minute=0, tzinfo=ZoneInfo("Asia/Seoul")),
        data={"chat_id": chat_id},
        name=f"daily_report_{chat_id}"
    )
    await update.message.reply_text("Daily reports will be sent at 9 AM!")

4.3 File Upload/Download

async def handle_document(update: Update, context):
    """Handle file upload"""
    doc = update.message.document
    file = await context.bot.get_file(doc.file_id)

    # Download
    local_path = f"/tmp/{doc.file_name}"
    await file.download_to_drive(local_path)

    await update.message.reply_text(f"File saved: {doc.file_name}")

async def send_file(update: Update, context):
    """Send file"""
    await context.bot.send_document(
        chat_id=update.effective_chat.id,
        document=open("/tmp/report.pdf", "rb"),
        filename="cluster_report.pdf",
        caption="Cluster Report"
    )

5. Kubernetes Deployment

5.1 Dockerfile

FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8080
CMD ["uvicorn", "bot_webhook:fastapi_app", "--host", "0.0.0.0", "--port", "8080"]

5.2 Kubernetes Manifests

apiVersion: apps/v1
kind: Deployment
metadata:
  name: telegram-bot
  namespace: chatbot
spec:
  replicas: 2
  selector:
    matchLabels:
      app: telegram-bot
  template:
    metadata:
      labels:
        app: telegram-bot
    spec:
      containers:
        - name: bot
          image: registry.example.com/telegram-bot:latest
          ports:
            - containerPort: 8080
          env:
            - name: BOT_TOKEN
              valueFrom:
                secretKeyRef:
                  name: telegram-bot-secret
                  key: token
            - name: WEBHOOK_URL
              value: 'https://bot.example.com'
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 500m
              memory: 256Mi
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: telegram-bot
  namespace: chatbot
spec:
  selector:
    app: telegram-bot
  ports:
    - port: 80
      targetPort: 8080
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: telegram-bot
  namespace: chatbot
  annotations:
    cert-manager.io/cluster-issuer: letsencrypt-prod
spec:
  tls:
    - hosts:
        - bot.example.com
      secretName: telegram-bot-tls
  rules:
    - host: bot.example.com
      http:
        paths:
          - path: /webhook
            pathType: Prefix
            backend:
              service:
                name: telegram-bot
                port:
                  number: 80

6. Security Best Practices

# 1) Include token in Webhook URL (prevent unauthorized requests)
WEBHOOK_PATH = f"/webhook/{TOKEN}"

# 2) IP whitelist (Telegram server IPs)
TELEGRAM_IPS = ["149.154.160.0/20", "91.108.4.0/22"]

from fastapi import Request, HTTPException
import ipaddress

@fastapi_app.middleware("http")
async def check_telegram_ip(request: Request, call_next):
    if request.url.path.startswith("/webhook"):
        client_ip = ipaddress.ip_address(request.client.host)
        allowed = any(
            client_ip in ipaddress.ip_network(net)
            for net in TELEGRAM_IPS
        )
        if not allowed:
            raise HTTPException(status_code=403)
    return await call_next(request)

# 3) Secret Token (Telegram Bot API 6.1+)
await bot.set_webhook(
    url=WEBHOOK_URL,
    secret_token="my-secret-token-123"
)

@fastapi_app.post(WEBHOOK_PATH)
async def webhook(request: Request):
    if request.headers.get("X-Telegram-Bot-Api-Secret-Token") != "my-secret-token-123":
        raise HTTPException(status_code=403)
    # ...

7. Quiz

Q1. What is the biggest difference between Polling and Webhook?

Polling: The bot periodically requests updates from the Telegram server. Webhook: Telegram delivers updates immediately via HTTP POST to the bot server. Webhooks have lower latency and are more resource-efficient.

Q2. What is absolutely required to use Webhooks?

A public server with an HTTPS certificate. Telegram does not allow HTTP webhooks. Free certificates from Let's Encrypt can be used.

Q3. What is the purpose of ConversationHandler?

Managing multi-step conversation flows. It maps different handlers per state, enabling scenarios (ticket creation, setup wizards, etc.) where the conversation progresses based on user input.

Q4. What is the role of InlineKeyboardButton's callback_data?

An identifier string sent to the bot when the button is clicked. The CallbackQueryHandler uses this value for branching logic. Maximum 64 bytes.

Q5. Why include the bot token in the Webhook URL?

To prevent unauthorized requests. It blocks external parties who don't know the token from sending arbitrary payloads. Additionally, verifying the X-Telegram-Bot-Api-Secret-Token header is recommended.

Q6. What should you be careful about when running a Telegram bot with 2 or more replicas on Kubernetes?

The Webhook method is fine since all replicas share the same Webhook URL (the load balancer distributes traffic). However, Polling only allows a single instance (duplicate message reception issue).

Q7. Why is the timezone setting important in job_queue.run_daily()?

Server time and user time may differ. Kubernetes Pods default to UTC. You must specify ZoneInfo("Asia/Seoul") to operate accurately based on Korean time.