- 1. Telegram Bot API Basics
- 2. Long Polling Method (For Development)
- 3. Webhook Method (Production)
- 4. Advanced Features
- 5. Kubernetes Deployment
- 6. Security Best Practices
- 7. Quiz
1. Telegram Bot API Basics
1.1 Creating a Bot
1. Search for @BotFather on Telegram
2. Enter the /newbot command
3. Enter the bot name: "My DevOps Bot"
4. Enter the bot username: "my_devops_bot" (must end in "bot", e.g. my_devops_bot or MyDevOpsBot)
5. Receive API token: 123456:ABC-DEF1234ghIkl-zyx57W2v1u123ew11
1.2 Polling vs Webhook
| Method | Polling | Webhook |
|---|---|---|
| Operation | Bot repeatedly calls getUpdates to pull updates from Telegram | Telegram sends an HTTP POST to the bot server for each update |
| Infrastructure | No public server required (can run locally) | Public HTTPS endpoint required |
| Latency | Near-instant with long polling; otherwise bounded by the polling interval | Instant |
| Scalability | Single instance | Horizontally scalable |
| Use Case | Development/Testing | Production |
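The polling row above hinges on one detail of getUpdates: each call passes an offset one higher than the last update_id it processed, which tells Telegram the earlier updates are confirmed. The following is a minimal sketch of that bookkeeping only, with no network calls. The helper names `next_offset` and `build_get_updates_params` and the sample batch are made up for illustration; libraries such as python-telegram-bot handle this internally.

```python
from typing import Optional


def next_offset(updates: list[dict], current: Optional[int] = None) -> Optional[int]:
    """Return the offset for the next getUpdates call.

    Passing highest update_id + 1 confirms everything received so far.
    An empty batch leaves the offset unchanged.
    """
    if not updates:
        return current
    return max(u["update_id"] for u in updates) + 1


def build_get_updates_params(offset: Optional[int], timeout: int = 30) -> dict:
    """Build the query parameters for a long-polling getUpdates request.

    A non-zero timeout makes Telegram hold the request open until an
    update arrives, which is what keeps long-polling latency low.
    """
    params = {"timeout": timeout}
    if offset is not None:
        params["offset"] = offset
    return params


# Hypothetical batch, shaped like the "result" list of a getUpdates response
batch = [{"update_id": 1001}, {"update_id": 1002}]
offset = next_offset(batch)
params = build_get_updates_params(offset)
print(params)
```

With a webhook none of this bookkeeping is needed, because Telegram pushes each update as a separate POST.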
2. Long Polling Method (For Development)
2.1 Basic Bot Implementation
pip install python-telegram-bot==21.8
# bot_polling.py
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
    ApplicationBuilder, CommandHandler, MessageHandler,
    CallbackQueryHandler, ContextTypes, filters
)
TOKEN = "YOUR_BOT_TOKEN"
# /start command
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    keyboard = [
        [InlineKeyboardButton("Server Status", callback_data="status")],
        [InlineKeyboardButton("Metrics", callback_data="metrics")],
        [InlineKeyboardButton("Deploy", callback_data="deploy")],
    ]
    reply_markup = InlineKeyboardMarkup(keyboard)
    await update.message.reply_text(
        "Welcome to DevOps Bot!\nChoose an action:",
        reply_markup=reply_markup
    )
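# /status command
import asyncio
import subprocess
async def status(update: Update, context: ContextTypes.DEFAULT_TYPE):
    # Run kubectl in a worker thread so the blocking call does not stall the event loop
    result = await asyncio.to_thread(
        subprocess.run,
        ["kubectl", "get", "nodes", "-o", "wide"],
        capture_output=True, text=True, timeout=10
    )
    # Inside a MarkdownV2 code block, backslashes and backticks must be escaped
    output = (result.stdout or result.stderr).replace("\\", "\\\\").replace("`", "\\`")
    await update.message.reply_text(
        f"```\n{output}\n```",
        parse_mode="MarkdownV2"
    )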
# Inline button callback
# check_cluster_status() and get_prometheus_metrics() are placeholders for your own
# helpers; they are not defined in this file
async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    query = update.callback_query
    await query.answer()
    if query.data == "status":
        await query.edit_message_text("Checking server status...")
        # Run kubectl
        result = check_cluster_status()
        # Escape backslashes and backticks for the MarkdownV2 code block
        safe = result.replace("\\", "\\\\").replace("`", "\\`")
        await query.edit_message_text(f"Cluster Status:\n```\n{safe}\n```",
                                      parse_mode="MarkdownV2")
    elif query.data == "metrics":
        await query.edit_message_text("Collecting metrics...")
        metrics = get_prometheus_metrics()
        await query.edit_message_text(f"Current Metrics:\n{metrics}")
    elif query.data == "deploy":
        keyboard = [
            [InlineKeyboardButton("Approve", callback_data="deploy_approve")],
            [InlineKeyboardButton("Cancel", callback_data="deploy_cancel")],
        ]
        await query.edit_message_text(
            "Do you want to proceed with the deployment?",
            reply_markup=InlineKeyboardMarkup(keyboard)
        )
    # deploy_approve and deploy_cancel are not handled here; add branches for them
    # before relying on the deploy menu
# General message handler
from openai import AsyncOpenAI
openai_client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    text = update.message.text
    # AI response (OpenAI integration); the async client keeps the event loop free
    response = await openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a DevOps assistant."},
            {"role": "user", "content": text}
        ]
    )
    await update.message.reply_text(response.choices[0].message.content)
# Run the app
app = ApplicationBuilder().token(TOKEN).build()
app.add_handler(CommandHandler("start", start))
app.add_handler(CommandHandler("status", status))
app.add_handler(CallbackQueryHandler(button_callback))
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
print("Bot is running...")
app.run_polling()
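The handlers above send command output inside a MarkdownV2 code block. Inside such a block, Telegram requires backslashes and backticks to be escaped, and a text message is limited to 4096 characters. The sketch below shows one way to prepare arbitrary output under those two rules. The function names `escape_pre` and `format_pre` are hypothetical, and counting the escaped length against the limit is a deliberately conservative choice, not something the API mandates.

```python
MAX_MESSAGE_LENGTH = 4096  # Telegram's limit for a text message
FENCE_OVERHEAD = len("```\n\n```")  # opening fence, two newlines, closing fence


def escape_pre(text: str) -> str:
    """Escape the two characters that are special inside a MarkdownV2 code block."""
    return text.replace("\\", "\\\\").replace("`", "\\`")


def format_pre(text: str, limit: int = MAX_MESSAGE_LENGTH) -> str:
    """Wrap text in a MarkdownV2 code block, truncating so the result fits in limit.

    Truncation works character by character on the escaped form, so an
    escape sequence is never cut in half.
    """
    budget = limit - FENCE_OVERHEAD
    pieces = []
    used = 0
    for ch in text:
        piece = escape_pre(ch)
        if used + len(piece) > budget:
            break
        pieces.append(piece)
        used += len(piece)
    return "```\n" + "".join(pieces) + "\n```"


print(format_pre("NAME   STATUS\nnode-1 Ready"))
```

A handler could then call `reply_text(format_pre(result.stdout), parse_mode="MarkdownV2")` instead of building the f-string by hand.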
3. Webhook Method (Production)
3.1 FastAPI + Webhook
# bot_webhook.py
from fastapi import FastAPI, Request
from telegram import Update, Bot
from telegram.ext import Application, CommandHandler, MessageHandler, filters
import uvicorn
TOKEN = "YOUR_BOT_TOKEN"
WEBHOOK_URL = "https://bot.example.com/webhook"
# FastAPI app
fastapi_app = FastAPI()
# Telegram Application
tg_app = Application.builder().token(TOKEN).build()
# Register handlers
async def start(update: Update, context):
    await update.message.reply_text("Hello!")
async def echo(update: Update, context):
    await update.message.reply_text(f"Echo: {update.message.text}")
tg_app.add_handler(CommandHandler("start", start))
tg_app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo))
@fastapi_app.on_event("startup")
async def on_startup():
    await tg_app.initialize()
    await tg_app.start()
    # Set Webhook
    await tg_app.bot.set_webhook(
        url=f"{WEBHOOK_URL}/{TOKEN}",
        allowed_updates=["message", "callback_query"],
        drop_pending_updates=True,
    )
    # Do not log the full URL: it contains the bot token
    print(f"Webhook set to {WEBHOOK_URL}/<token>")
@fastapi_app.on_event("shutdown")
async def on_shutdown():
    await tg_app.stop()
    await tg_app.shutdown()
@fastapi_app.post(f"/webhook/{TOKEN}")
async def webhook(request: Request):
    data = await request.json()
    update = Update.de_json(data, tg_app.bot)
    await tg_app.process_update(update)
    return {"ok": True}
@fastapi_app.get("/health")
async def health():
    return {"status": "ok"}
if __name__ == "__main__":
    uvicorn.run(fastapi_app, host="0.0.0.0", port=8080)
3.2 Verifying Webhook Configuration
# Check Webhook status
curl "https://api.telegram.org/bot${TOKEN}/getWebhookInfo" | jq
# Delete Webhook (when switching to Polling mode)
curl "https://api.telegram.org/bot${TOKEN}/deleteWebhook"
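The getWebhookInfo response is JSON, and the fields worth checking first are `url`, `pending_update_count`, `last_error_date`, and `last_error_message`. The sketch below turns such a response into a short health summary. The function name `summarize_webhook_info` and the sample payload are made up for illustration; the URL itself is deliberately left out of the summary because it may contain the bot token.

```python
from datetime import datetime, timezone


def summarize_webhook_info(payload: dict) -> str:
    """Summarize a decoded getWebhookInfo response without echoing the webhook URL."""
    if not payload.get("ok"):
        return f"API error: {payload.get('description', 'unknown')}"
    info = payload["result"]
    if not info.get("url"):
        # An empty url means no webhook is registered
        return "No webhook set"
    lines = [f"pending updates: {info.get('pending_update_count', 0)}"]
    if "last_error_message" in info:
        # last_error_date is a Unix timestamp
        when = datetime.fromtimestamp(info["last_error_date"], tz=timezone.utc)
        lines.append(f"last error at {when.isoformat()}: {info['last_error_message']}")
    return "\n".join(lines)


# Hypothetical payload for illustration; real responses carry more fields
sample = {
    "ok": True,
    "result": {
        "url": "https://bot.example.com/webhook/REDACTED",
        "has_custom_certificate": False,
        "pending_update_count": 3,
        "last_error_date": 1700000000,
        "last_error_message": "Wrong response from the webhook: 500 Internal Server Error",
    },
}
summary = summarize_webhook_info(sample)
print(summary)
```

A growing `pending_update_count` together with a recent `last_error_message` usually means Telegram is reaching the endpoint but the bot is failing to answer with a 2xx status.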
4. Advanced Features
4.1 Conversation Flow (ConversationHandler)
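from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
    CallbackQueryHandler, CommandHandler, ConversationHandler,
    MessageHandler, filters
)
# State definitions
TITLE, DESCRIPTION, CONFIRM = range(3)
async def ticket_start(update, context):
    await update.message.reply_text("Creating a ticket. Enter the title:")
    return TITLE
async def ticket_title(update, context):
    context.user_data["title"] = update.message.text
    await update.message.reply_text("Enter the description:")
    return DESCRIPTION
async def ticket_desc(update, context):
    context.user_data["description"] = update.message.text
    title = context.user_data["title"]
    desc = context.user_data["description"]
    keyboard = [
        [InlineKeyboardButton("Create", callback_data="confirm_yes")],
        [InlineKeyboardButton("Cancel", callback_data="confirm_no")],
    ]
    await update.message.reply_text(
        f"Ticket Confirmation:\nTitle: {title}\nDescription: {desc}",
        reply_markup=InlineKeyboardMarkup(keyboard)
    )
    return CONFIRM
async def ticket_confirm(update, context):
    # Handles the Create/Cancel buttons; replace the reply with a call to your ticket system
    query = update.callback_query
    await query.answer()
    if query.data == "confirm_yes":
        await query.edit_message_text("Ticket created.")
    else:
        await query.edit_message_text("Ticket creation cancelled.")
    return ConversationHandler.END
async def cancel(update, context):
    await update.message.reply_text("Ticket creation cancelled.")
    return ConversationHandler.END
conv_handler = ConversationHandler(
    entry_points=[CommandHandler("ticket", ticket_start)],
    states={
        TITLE: [MessageHandler(filters.TEXT & ~filters.COMMAND, ticket_title)],
        DESCRIPTION: [MessageHandler(filters.TEXT & ~filters.COMMAND, ticket_desc)],
        CONFIRM: [CallbackQueryHandler(ticket_confirm)],
    },
    fallbacks=[CommandHandler("cancel", cancel)],
)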
4.2 Scheduling (Regular Notifications)
# Requires the job-queue extra: pip install "python-telegram-bot[job-queue]"
import datetime
from zoneinfo import ZoneInfo
from telegram.ext import ContextTypes
async def daily_report(context: ContextTypes.DEFAULT_TYPE):
    """Daily report at 9 AM"""
    chat_id = context.job.data["chat_id"]
    # Collect metrics (get_cluster_cpu, get_cluster_memory, and get_pod_count
    # are placeholders for your own monitoring helpers)
    cpu = get_cluster_cpu()
    memory = get_cluster_memory()
    pods = get_pod_count()
    message = (
        "Daily Cluster Report\n"
        "━━━━━━━━━━━━━━\n"
        f"CPU: {cpu}%\n"
        f"Memory: {memory}%\n"
        f"Pods: {pods}\n"
        "━━━━━━━━━━━━━━"
    )
    await context.bot.send_message(chat_id=chat_id, text=message)
# Register schedule
async def setup_schedule(update, context):
    chat_id = update.effective_chat.id
    context.job_queue.run_daily(
        daily_report,
        time=datetime.time(hour=9, minute=0, tzinfo=ZoneInfo("Asia/Seoul")),
        data={"chat_id": chat_id},
        name=f"daily_report_{chat_id}"
    )
    await update.message.reply_text("Daily reports will be sent at 9 AM!")
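To see why the `tzinfo` on the scheduled time matters, it helps to compute when a 9 AM Asia/Seoul job fires on a server whose clock is UTC. The sketch below does that with the standard library only. The helper `next_run_utc` is hypothetical and is not how the job queue computes its schedule internally; it also assumes the host has time zone data available for `zoneinfo`.

```python
from datetime import datetime, time, timedelta, timezone
from zoneinfo import ZoneInfo


def next_run_utc(now_utc: datetime, at: time) -> datetime:
    """Return the next occurrence of the wall-clock time `at`, expressed in UTC.

    `at` must be timezone-aware; its tzinfo decides which wall clock is used.
    """
    local_now = now_utc.astimezone(at.tzinfo)
    # datetime.combine keeps the tzinfo attached to `at`
    candidate = datetime.combine(local_now.date(), at)
    if candidate <= local_now:
        candidate = datetime.combine(local_now.date() + timedelta(days=1), at)
    return candidate.astimezone(timezone.utc)


nine_am_seoul = time(hour=9, minute=0, tzinfo=ZoneInfo("Asia/Seoul"))
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)  # 21:00 in Seoul
print(next_run_utc(now, nine_am_seoul))
```

Seoul is nine hours ahead of UTC, so the job runs at 00:00 UTC. Without the `tzinfo`, a 9:00 schedule on a UTC Pod would fire at 18:00 Korean time.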
4.3 File Upload/Download
from pathlib import Path
async def handle_document(update: Update, context):
    """Handle file upload"""
    doc = update.message.document
    file = await context.bot.get_file(doc.file_id)
    # Download; keep only the base name so a crafted file name cannot escape /tmp
    safe_name = Path(doc.file_name or "upload.bin").name
    local_path = f"/tmp/{safe_name}"
    await file.download_to_drive(local_path)
    await update.message.reply_text(f"File saved: {safe_name}")
async def send_file(update: Update, context):
    """Send file"""
    # Open the file in a context manager so the handle is closed after sending
    with open("/tmp/report.pdf", "rb") as report:
        await context.bot.send_document(
            chat_id=update.effective_chat.id,
            document=report,
            filename="cluster_report.pdf",
            caption="Cluster Report"
        )
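Because the file name of an uploaded document is chosen by the sender, it is worth normalizing it before building a local path. The following is a minimal sketch of that step under a few assumptions: the helper name `safe_upload_path`, the base directory `/tmp/bot-uploads`, and the fallback name `upload.bin` are all made up for illustration.

```python
from pathlib import PurePosixPath
from typing import Optional


def safe_upload_path(file_name: Optional[str], base_dir: str = "/tmp/bot-uploads") -> str:
    """Build a download path that always stays inside base_dir.

    Only the final path component of the sender-supplied name is kept, and
    names that are empty or refer to a directory fall back to a fixed name.
    """
    candidate = PurePosixPath(file_name or "").name
    if candidate in ("", ".", ".."):
        candidate = "upload.bin"
    return str(PurePosixPath(base_dir) / candidate)


print(safe_upload_path("../../etc/passwd"))
print(safe_upload_path(None))
```

This does not prevent two uploads with the same name from overwriting each other; prefixing the name with the `file_unique_id` of the document is one way to handle that.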
5. Kubernetes Deployment
5.1 Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8080
CMD ["uvicorn", "bot_webhook:fastapi_app", "--host", "0.0.0.0", "--port", "8080"]
5.2 Kubernetes Manifests
apiVersion: apps/v1
kind: Deployment
metadata:
  name: telegram-bot
  namespace: chatbot
spec:
  replicas: 2
  selector:
    matchLabels:
      app: telegram-bot
  template:
    metadata:
      labels:
        app: telegram-bot
    spec:
      containers:
        - name: bot
          image: registry.example.com/telegram-bot:latest
          ports:
            - containerPort: 8080
          env:
            - name: BOT_TOKEN
              valueFrom:
                secretKeyRef:
                  name: telegram-bot-secret
                  key: token
            - name: WEBHOOK_URL
              value: 'https://bot.example.com'
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 500m
              memory: 256Mi
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: telegram-bot
  namespace: chatbot
spec:
  selector:
    app: telegram-bot
  ports:
    - port: 80
      targetPort: 8080
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: telegram-bot
  namespace: chatbot
  annotations:
    cert-manager.io/cluster-issuer: letsencrypt-prod
spec:
  tls:
    - hosts:
        - bot.example.com
      secretName: telegram-bot-tls
  rules:
    - host: bot.example.com
      http:
        paths:
          - path: /webhook
            pathType: Prefix
            backend:
              service:
                name: telegram-bot
                port:
                  number: 80
6. Security Best Practices
# 1) Include token in Webhook URL (prevent unauthorized requests)
WEBHOOK_PATH = f"/webhook/{TOKEN}"
# 2) IP whitelist (Telegram server IPs)
TELEGRAM_IPS = ["149.154.160.0/20", "91.108.4.0/22"]
from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse
import ipaddress
@fastapi_app.middleware("http")
async def check_telegram_ip(request: Request, call_next):
    if request.url.path.startswith("/webhook"):
        # Behind an Ingress or load balancer, request.client.host is the proxy's
        # address; in that setup, read the client IP from the forwarded header instead
        client_ip = ipaddress.ip_address(request.client.host)
        allowed = any(
            client_ip in ipaddress.ip_network(net)
            for net in TELEGRAM_IPS
        )
        if not allowed:
            # Return the response directly: an HTTPException raised inside
            # middleware is not turned into a 403 by FastAPI's exception handlers
            return JSONResponse(status_code=403, content={"detail": "Forbidden"})
    return await call_next(request)
# 3) Secret Token (Telegram Bot API 6.1+)
# Call set_webhook from an async context such as the startup hook
await tg_app.bot.set_webhook(
    url=f"{WEBHOOK_URL}/{TOKEN}",
    secret_token="my-secret-token-123"
)
@fastapi_app.post(WEBHOOK_PATH)
async def webhook(request: Request):
    if request.headers.get("X-Telegram-Bot-Api-Secret-Token") != "my-secret-token-123":
        raise HTTPException(status_code=403)
    # ...
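The IP and secret-token checks above can be factored into small pure functions, which makes them easy to unit test without starting FastAPI. The sketch below is one possible shape, using only the standard library. The function names are hypothetical. The network ranges are the ones listed above, and the token generator relies on `secrets.token_urlsafe`, whose output uses only letters, digits, `_` and `-`, the character set Telegram accepts for `secret_token`.

```python
import hmac
import ipaddress
import secrets
from typing import Optional

TELEGRAM_NETWORKS = [
    ipaddress.ip_network(net) for net in ("149.154.160.0/20", "91.108.4.0/22")
]


def is_telegram_ip(host: str) -> bool:
    """Return True if host parses as an IP address inside one of the listed ranges."""
    try:
        addr = ipaddress.ip_address(host)
    except ValueError:
        return False
    return any(addr in net for net in TELEGRAM_NETWORKS)


def new_secret_token() -> str:
    """Generate a random value suitable for the secret_token parameter."""
    return secrets.token_urlsafe(32)


def secret_matches(header_value: Optional[str], expected: str) -> bool:
    """Compare the received header with the expected token in constant time."""
    if header_value is None:
        return False
    return hmac.compare_digest(header_value.encode("utf-8"), expected.encode("utf-8"))


token = new_secret_token()
print(is_telegram_ip("149.154.167.220"), secret_matches(token, token))
```

In a real deployment the token would be generated once and stored next to the bot token (for example in the same Kubernetes Secret), not regenerated on every start, since every replica has to expect the same value.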
7. Quiz
Q1. What is the biggest difference between Polling and Webhook?
Polling: The bot pulls updates by repeatedly calling getUpdates on the Telegram server. Webhook: Telegram pushes each update to the bot server via HTTP POST as soon as it arrives. Webhooks avoid idle requests and allow the bot to run as multiple instances behind a load balancer.
Q2. What is absolutely required to use Webhooks?
A public server with an HTTPS certificate. Telegram does not allow HTTP webhooks. Free certificates from Let's Encrypt can be used.
Q3. What is the purpose of ConversationHandler?
Managing multi-step conversation flows. It maps different handlers per state, enabling scenarios (ticket creation, setup wizards, etc.) where the conversation progresses based on user input.
Q4. What is the role of InlineKeyboardButton's callback_data?
An identifier string sent to the bot when the button is clicked. The CallbackQueryHandler uses this value for branching logic. Maximum 64 bytes.
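The 64-byte limit applies to the encoded value, not to the number of characters, which matters once callback_data contains non-ASCII text. A minimal sketch of a pre-flight check follows; the helper name `callback_data_fits` is made up for illustration.

```python
def callback_data_fits(data: str) -> bool:
    """Return True if data is between 1 and 64 bytes when encoded as UTF-8."""
    return 1 <= len(data.encode("utf-8")) <= 64


print(callback_data_fits("deploy_approve"))  # ASCII: one byte per character
print(callback_data_fits("한" * 22))          # 22 characters, but 3 bytes each
```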
Q5. Why include the bot token in the Webhook URL?
To prevent unauthorized requests. It blocks external parties who don't know the token from sending arbitrary payloads. Additionally, verifying the X-Telegram-Bot-Api-Secret-Token header is recommended.
Q6. What should you be careful about when running a Telegram bot with 2 or more replicas on Kubernetes?
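The Webhook method works with multiple replicas since they all share the same Webhook URL and the load balancer distributes traffic. Keep in mind that conversation state, user_data, and job_queue jobs live in each process by default, so consecutive messages from one user may reach different replicas and scheduled jobs may run once per replica. Polling only allows a single instance: concurrent getUpdates calls for the same token fail with a 409 Conflict error.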
Q7. Why is the timezone setting important in job_queue.run_daily()?
Server time and user time may differ. Kubernetes Pods default to UTC. You must specify ZoneInfo("Asia/Seoul") to operate accurately based on Korean time.