- Authors
- Name
- 1. Telegram Bot API の基礎
- 2. Long Polling 方式(開発用)
- 3. Webhook 方式(プロダクション)
- 4. 高度な機能
- 5. Kubernetesデプロイ
- 6. セキュリティのベストプラクティス
- 7. クイズ
1. Telegram Bot API の基礎
1.1 ボットの作成
1. Telegramで @BotFather を検索
2. /newbot コマンドを入力
3. ボット名を入力: "My DevOps Bot"
4. ボットのユーザー名を入力: "my_devops_bot"(必ず _bot で終わる)
5. APIトークンを受領: 123456:ABC-DEF1234ghIkl-zyx57W2v1u123ew11
1.2 Polling vs Webhook
| 方式 | Polling | Webhook |
|---|---|---|
| 動作 | ボットが定期的にサーバーにアップデートを要求 | TelegramがボットサーバーにHTTP POST |
| インフラ | サーバー不要(ローカル実行可能) | HTTPSサーバーが必要 |
| 遅延 | ポーリング間隔分 | 即時 |
| スケーラビリティ | 単一インスタンス | 水平スケーリング可能 |
| 用途 | 開発・テスト | プロダクション |
2. Long Polling 方式(開発用)
2.1 基本的なボットの実装
pip install python-telegram-bot==21.8
# bot_polling.py
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
ApplicationBuilder, CommandHandler, MessageHandler,
CallbackQueryHandler, ContextTypes, filters
)
TOKEN = "YOUR_BOT_TOKEN"
# /start コマンド
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
keyboard = [
[InlineKeyboardButton("サーバー状態", callback_data="status")],
[InlineKeyboardButton("メトリクス", callback_data="metrics")],
[InlineKeyboardButton("デプロイ", callback_data="deploy")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"DevOps Botへようこそ!\n操作を選択してください:",
reply_markup=reply_markup
)
# /status コマンド
async def status(update: Update, context: ContextTypes.DEFAULT_TYPE):
import subprocess
result = subprocess.run(
["kubectl", "get", "nodes", "-o", "wide"],
capture_output=True, text=True, timeout=10
)
await update.message.reply_text(
f"```\n{result.stdout}\n```",
parse_mode="MarkdownV2"
)
# インラインボタンコールバック
async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()
if query.data == "status":
await query.edit_message_text("サーバー状態を確認中...")
# kubectl実行
result = check_cluster_status()
await query.edit_message_text(f"クラスター状態:\n```\n{result}\n```",
parse_mode="MarkdownV2")
elif query.data == "metrics":
await query.edit_message_text("メトリクスを収集中...")
metrics = get_prometheus_metrics()
await query.edit_message_text(f"現在のメトリクス:\n{metrics}")
elif query.data == "deploy":
keyboard = [
[InlineKeyboardButton("承認", callback_data="deploy_approve")],
[InlineKeyboardButton("キャンセル", callback_data="deploy_cancel")],
]
await query.edit_message_text(
"デプロイを実行しますか?",
reply_markup=InlineKeyboardMarkup(keyboard)
)
# 一般メッセージ処理
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
text = update.message.text
# AI応答(OpenAI連携)
from openai import OpenAI
client = OpenAI()
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are a DevOps assistant."},
{"role": "user", "content": text}
]
)
await update.message.reply_text(response.choices[0].message.content)
# アプリ実行
app = ApplicationBuilder().token(TOKEN).build()
app.add_handler(CommandHandler("start", start))
app.add_handler(CommandHandler("status", status))
app.add_handler(CallbackQueryHandler(button_callback))
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
print("Bot is running...")
app.run_polling()
3. Webhook 方式(プロダクション)
3.1 FastAPI + Webhook
# bot_webhook.py
from fastapi import FastAPI, Request
from telegram import Update, Bot
from telegram.ext import Application, CommandHandler, MessageHandler, filters
import uvicorn
TOKEN = "YOUR_BOT_TOKEN"
WEBHOOK_URL = "https://bot.example.com/webhook"
# FastAPIアプリ
fastapi_app = FastAPI()
# Telegram Application
tg_app = Application.builder().token(TOKEN).build()
# ハンドラー登録
async def start(update: Update, context):
await update.message.reply_text("Hello!")
async def echo(update: Update, context):
await update.message.reply_text(f"Echo: {update.message.text}")
tg_app.add_handler(CommandHandler("start", start))
tg_app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo))
@fastapi_app.on_event("startup")
async def on_startup():
await tg_app.initialize()
await tg_app.start()
# Webhook設定
await tg_app.bot.set_webhook(
url=f"{WEBHOOK_URL}/{TOKEN}",
allowed_updates=["message", "callback_query"],
drop_pending_updates=True,
)
print(f"Webhook set to {WEBHOOK_URL}/{TOKEN}")
@fastapi_app.on_event("shutdown")
async def on_shutdown():
await tg_app.stop()
await tg_app.shutdown()
@fastapi_app.post(f"/webhook/{TOKEN}")
async def webhook(request: Request):
data = await request.json()
update = Update.de_json(data, tg_app.bot)
await tg_app.process_update(update)
return {"ok": True}
@fastapi_app.get("/health")
async def health():
return {"status": "ok"}
if __name__ == "__main__":
uvicorn.run(fastapi_app, host="0.0.0.0", port=8080)
3.2 Webhook設定の確認
# Webhookステータス確認
curl "https://api.telegram.org/bot${TOKEN}/getWebhookInfo" | jq
# Webhook削除(Pollingモードに切り替え時)
curl "https://api.telegram.org/bot${TOKEN}/deleteWebhook"
4. 高度な機能
4.1 会話フロー(ConversationHandler)
from telegram.ext import ConversationHandler
# 状態定義
NAME, EMAIL, CONFIRM = range(3)
async def ticket_start(update, context):
await update.message.reply_text("チケットを作成します。タイトルを入力してください:")
return NAME
async def ticket_name(update, context):
context.user_data["title"] = update.message.text
await update.message.reply_text("説明を入力してください:")
return EMAIL
async def ticket_desc(update, context):
context.user_data["description"] = update.message.text
title = context.user_data["title"]
desc = context.user_data["description"]
keyboard = [
[InlineKeyboardButton("作成", callback_data="confirm_yes")],
[InlineKeyboardButton("キャンセル", callback_data="confirm_no")],
]
await update.message.reply_text(
f"チケット確認:\nタイトル: {title}\n説明: {desc}",
reply_markup=InlineKeyboardMarkup(keyboard)
)
return CONFIRM
conv_handler = ConversationHandler(
entry_points=[CommandHandler("ticket", ticket_start)],
states={
NAME: [MessageHandler(filters.TEXT & ~filters.COMMAND, ticket_name)],
EMAIL: [MessageHandler(filters.TEXT & ~filters.COMMAND, ticket_desc)],
CONFIRM: [CallbackQueryHandler(ticket_confirm)],
},
fallbacks=[CommandHandler("cancel", cancel)],
)
4.2 スケジューリング(定期通知)
from telegram.ext import ApplicationBuilder
async def daily_report(context: ContextTypes.DEFAULT_TYPE):
"""毎日午前9時のレポート"""
chat_id = context.job.data["chat_id"]
# メトリクス収集
cpu = get_cluster_cpu()
memory = get_cluster_memory()
pods = get_pod_count()
message = (
"Daily Cluster Report\n"
f"━━━━━━━━━━━━━━\n"
f"CPU: {cpu}%\n"
f"Memory: {memory}%\n"
f"Pods: {pods}\n"
f"━━━━━━━━━━━━━━"
)
await context.bot.send_message(chat_id=chat_id, text=message)
# スケジュール登録
async def setup_schedule(update, context):
chat_id = update.effective_chat.id
context.job_queue.run_daily(
daily_report,
time=datetime.time(hour=9, minute=0, tzinfo=ZoneInfo("Asia/Seoul")),
data={"chat_id": chat_id},
name=f"daily_report_{chat_id}"
)
await update.message.reply_text("毎日午前9時にレポートをお送りします!")
4.3 ファイルのアップロード・ダウンロード
async def handle_document(update: Update, context):
"""ファイルアップロード処理"""
doc = update.message.document
file = await context.bot.get_file(doc.file_id)
# ダウンロード
local_path = f"/tmp/{doc.file_name}"
await file.download_to_drive(local_path)
await update.message.reply_text(f"ファイル保存完了: {doc.file_name}")
async def send_file(update: Update, context):
"""ファイル送信"""
await context.bot.send_document(
chat_id=update.effective_chat.id,
document=open("/tmp/report.pdf", "rb"),
filename="cluster_report.pdf",
caption="クラスターレポート"
)
5. Kubernetesデプロイ
5.1 Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8080
CMD ["uvicorn", "bot_webhook:fastapi_app", "--host", "0.0.0.0", "--port", "8080"]
5.2 Kubernetesマニフェスト
apiVersion: apps/v1
kind: Deployment
metadata:
name: telegram-bot
namespace: chatbot
spec:
replicas: 2
selector:
matchLabels:
app: telegram-bot
template:
metadata:
labels:
app: telegram-bot
spec:
containers:
- name: bot
image: registry.example.com/telegram-bot:latest
ports:
- containerPort: 8080
env:
- name: BOT_TOKEN
valueFrom:
secretKeyRef:
name: telegram-bot-secret
key: token
- name: WEBHOOK_URL
value: 'https://bot.example.com'
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 500m
memory: 256Mi
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 10
periodSeconds: 30
readinessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 5
periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
name: telegram-bot
namespace: chatbot
spec:
selector:
app: telegram-bot
ports:
- port: 80
targetPort: 8080
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: telegram-bot
namespace: chatbot
annotations:
cert-manager.io/cluster-issuer: letsencrypt-prod
spec:
tls:
- hosts:
- bot.example.com
secretName: telegram-bot-tls
rules:
- host: bot.example.com
http:
paths:
- path: /webhook
pathType: Prefix
backend:
service:
name: telegram-bot
port:
number: 80
6. セキュリティのベストプラクティス
# 1) WebhookURLにトークンを含める(不正リクエスト防止)
WEBHOOK_PATH = f"/webhook/{TOKEN}"
# 2) IPホワイトリスト(TelegramサーバーIP)
TELEGRAM_IPS = ["149.154.160.0/20", "91.108.4.0/22"]
from fastapi import Request, HTTPException
import ipaddress
@fastapi_app.middleware("http")
async def check_telegram_ip(request: Request, call_next):
if request.url.path.startswith("/webhook"):
client_ip = ipaddress.ip_address(request.client.host)
allowed = any(
client_ip in ipaddress.ip_network(net)
for net in TELEGRAM_IPS
)
if not allowed:
raise HTTPException(status_code=403)
return await call_next(request)
# 3) Secret Token (Telegram Bot API 6.1+)
await bot.set_webhook(
url=WEBHOOK_URL,
secret_token="my-secret-token-123"
)
@fastapi_app.post(WEBHOOK_PATH)
async def webhook(request: Request):
if request.headers.get("X-Telegram-Bot-Api-Secret-Token") != "my-secret-token-123":
raise HTTPException(status_code=403)
# ...
7. クイズ
Q1. PollingとWebhookの最大の違いは?
Polling:ボットが定期的にTelegramサーバーにアップデートを要求。Webhook:TelegramがボットサーバーにHTTP POSTで即座に配信。Webhookの方が遅延が少なく、サーバーリソースの効率が良い。
Q2. Webhookを使用するために必ず必要なものは?
HTTPS証明書のある公開サーバー。TelegramはHTTP Webhookを許可しない。Let's Encryptなどの無料証明書が使用可能。
Q3. ConversationHandlerの用途は?
多段階の会話フローの管理。状態(state)ごとに異なるハンドラーをマッピングし、ユーザーの入力に応じて会話が進行するシナリオ(チケット作成、設定ウィザードなど)を実装。
Q4. InlineKeyboardButtonのcallback_dataの役割は?
ボタンクリック時にボットに送信される識別子文字列。CallbackQueryHandlerがこの値を基に分岐処理を行う。最大64バイト。
Q5. Webhook URLにボットトークンを含める理由は?
不正リクエストの防止。トークンを知らない外部者が任意のペイロードを送信するのをブロック。さらにX-Telegram-Bot-Api-Secret-Tokenヘッダーの検証も推奨。
Q6. KubernetesでTelegramボットを2つ以上のreplicaで運用する際の注意点は?
Webhook方式はすべてのreplicaが同じWebhook URLを共有するため問題ない(ロードバランサーが分配)。ただし、Pollingは1つのインスタンスのみ可能(メッセージの重複受信問題)。
Q7. job_queue.run_daily()のtimezone設定が重要な理由は?
サーバー時間とユーザー時間が異なる場合がある。特にKubernetes PodはUTCがデフォルト。**ZoneInfo("Asia/Seoul")**を明示しないと韓国時間基準で正確に動作しない。