Skip to content
Published on

Telegram Bot + Webhook 実践ガイド:Pythonでのボット構築からKubernetesデプロイまで

Authors
  • Name
    Twitter

1. Telegram Bot API の基礎

1.1 ボットの作成

1. Telegramで @BotFather を検索
2. /newbot コマンドを入力
3. ボット名を入力: "My DevOps Bot"
4. ボットのユーザー名を入力: "my_devops_bot"(必ず _bot で終わる)
5. APIトークンを受領: 123456:ABC-DEF1234ghIkl-zyx57W2v1u123ew11

1.2 Polling vs Webhook

方式PollingWebhook
動作ボットが定期的にサーバーにアップデートを要求TelegramがボットサーバーにHTTP POST
インフラサーバー不要(ローカル実行可能)HTTPSサーバーが必要
遅延ポーリング間隔分即時
スケーラビリティ単一インスタンス水平スケーリング可能
用途開発・テストプロダクション

2. Long Polling 方式(開発用)

2.1 基本的なボットの実装

pip install python-telegram-bot==21.8
# bot_polling.py
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
    ApplicationBuilder, CommandHandler, MessageHandler,
    CallbackQueryHandler, ContextTypes, filters
)

TOKEN = "YOUR_BOT_TOKEN"

# /start コマンド
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    keyboard = [
        [InlineKeyboardButton("サーバー状態", callback_data="status")],
        [InlineKeyboardButton("メトリクス", callback_data="metrics")],
        [InlineKeyboardButton("デプロイ", callback_data="deploy")],
    ]
    reply_markup = InlineKeyboardMarkup(keyboard)
    await update.message.reply_text(
        "DevOps Botへようこそ!\n操作を選択してください:",
        reply_markup=reply_markup
    )

# /status コマンド
async def status(update: Update, context: ContextTypes.DEFAULT_TYPE):
    import subprocess
    result = subprocess.run(
        ["kubectl", "get", "nodes", "-o", "wide"],
        capture_output=True, text=True, timeout=10
    )
    await update.message.reply_text(
        f"```\n{result.stdout}\n```",
        parse_mode="MarkdownV2"
    )

# インラインボタンコールバック
async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    query = update.callback_query
    await query.answer()

    if query.data == "status":
        await query.edit_message_text("サーバー状態を確認中...")
        # kubectl実行
        result = check_cluster_status()
        await query.edit_message_text(f"クラスター状態:\n```\n{result}\n```",
                                       parse_mode="MarkdownV2")
    elif query.data == "metrics":
        await query.edit_message_text("メトリクスを収集中...")
        metrics = get_prometheus_metrics()
        await query.edit_message_text(f"現在のメトリクス:\n{metrics}")
    elif query.data == "deploy":
        keyboard = [
            [InlineKeyboardButton("承認", callback_data="deploy_approve")],
            [InlineKeyboardButton("キャンセル", callback_data="deploy_cancel")],
        ]
        await query.edit_message_text(
            "デプロイを実行しますか?",
            reply_markup=InlineKeyboardMarkup(keyboard)
        )

# 一般メッセージ処理
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    text = update.message.text
    # AI応答(OpenAI連携)
    from openai import OpenAI
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a DevOps assistant."},
            {"role": "user", "content": text}
        ]
    )
    await update.message.reply_text(response.choices[0].message.content)

# アプリ実行
app = ApplicationBuilder().token(TOKEN).build()
app.add_handler(CommandHandler("start", start))
app.add_handler(CommandHandler("status", status))
app.add_handler(CallbackQueryHandler(button_callback))
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

print("Bot is running...")
app.run_polling()

3. Webhook 方式(プロダクション)

3.1 FastAPI + Webhook

# bot_webhook.py
from fastapi import FastAPI, Request
from telegram import Update, Bot
from telegram.ext import Application, CommandHandler, MessageHandler, filters
import uvicorn

TOKEN = "YOUR_BOT_TOKEN"
WEBHOOK_URL = "https://bot.example.com/webhook"

# FastAPIアプリ
fastapi_app = FastAPI()

# Telegram Application
tg_app = Application.builder().token(TOKEN).build()

# ハンドラー登録
async def start(update: Update, context):
    await update.message.reply_text("Hello!")

async def echo(update: Update, context):
    await update.message.reply_text(f"Echo: {update.message.text}")

tg_app.add_handler(CommandHandler("start", start))
tg_app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo))

@fastapi_app.on_event("startup")
async def on_startup():
    await tg_app.initialize()
    await tg_app.start()
    # Webhook設定
    await tg_app.bot.set_webhook(
        url=f"{WEBHOOK_URL}/{TOKEN}",
        allowed_updates=["message", "callback_query"],
        drop_pending_updates=True,
    )
    print(f"Webhook set to {WEBHOOK_URL}/{TOKEN}")

@fastapi_app.on_event("shutdown")
async def on_shutdown():
    await tg_app.stop()
    await tg_app.shutdown()

@fastapi_app.post(f"/webhook/{TOKEN}")
async def webhook(request: Request):
    data = await request.json()
    update = Update.de_json(data, tg_app.bot)
    await tg_app.process_update(update)
    return {"ok": True}

@fastapi_app.get("/health")
async def health():
    return {"status": "ok"}

if __name__ == "__main__":
    uvicorn.run(fastapi_app, host="0.0.0.0", port=8080)

3.2 Webhook設定の確認

# Webhookステータス確認
curl "https://api.telegram.org/bot${TOKEN}/getWebhookInfo" | jq

# Webhook削除(Pollingモードに切り替え時)
curl "https://api.telegram.org/bot${TOKEN}/deleteWebhook"

4. 高度な機能

4.1 会話フロー(ConversationHandler)

from telegram.ext import ConversationHandler

# 状態定義
NAME, EMAIL, CONFIRM = range(3)

async def ticket_start(update, context):
    await update.message.reply_text("チケットを作成します。タイトルを入力してください:")
    return NAME

async def ticket_name(update, context):
    context.user_data["title"] = update.message.text
    await update.message.reply_text("説明を入力してください:")
    return EMAIL

async def ticket_desc(update, context):
    context.user_data["description"] = update.message.text
    title = context.user_data["title"]
    desc = context.user_data["description"]

    keyboard = [
        [InlineKeyboardButton("作成", callback_data="confirm_yes")],
        [InlineKeyboardButton("キャンセル", callback_data="confirm_no")],
    ]
    await update.message.reply_text(
        f"チケット確認:\nタイトル: {title}\n説明: {desc}",
        reply_markup=InlineKeyboardMarkup(keyboard)
    )
    return CONFIRM

conv_handler = ConversationHandler(
    entry_points=[CommandHandler("ticket", ticket_start)],
    states={
        NAME: [MessageHandler(filters.TEXT & ~filters.COMMAND, ticket_name)],
        EMAIL: [MessageHandler(filters.TEXT & ~filters.COMMAND, ticket_desc)],
        CONFIRM: [CallbackQueryHandler(ticket_confirm)],
    },
    fallbacks=[CommandHandler("cancel", cancel)],
)

4.2 スケジューリング(定期通知)

from telegram.ext import ApplicationBuilder

async def daily_report(context: ContextTypes.DEFAULT_TYPE):
    """毎日午前9時のレポート"""
    chat_id = context.job.data["chat_id"]

    # メトリクス収集
    cpu = get_cluster_cpu()
    memory = get_cluster_memory()
    pods = get_pod_count()

    message = (
        "Daily Cluster Report\n"
        f"━━━━━━━━━━━━━━\n"
        f"CPU: {cpu}%\n"
        f"Memory: {memory}%\n"
        f"Pods: {pods}\n"
        f"━━━━━━━━━━━━━━"
    )
    await context.bot.send_message(chat_id=chat_id, text=message)

# スケジュール登録
async def setup_schedule(update, context):
    chat_id = update.effective_chat.id
    context.job_queue.run_daily(
        daily_report,
        time=datetime.time(hour=9, minute=0, tzinfo=ZoneInfo("Asia/Seoul")),
        data={"chat_id": chat_id},
        name=f"daily_report_{chat_id}"
    )
    await update.message.reply_text("毎日午前9時にレポートをお送りします!")

4.3 ファイルのアップロード・ダウンロード

async def handle_document(update: Update, context):
    """ファイルアップロード処理"""
    doc = update.message.document
    file = await context.bot.get_file(doc.file_id)

    # ダウンロード
    local_path = f"/tmp/{doc.file_name}"
    await file.download_to_drive(local_path)

    await update.message.reply_text(f"ファイル保存完了: {doc.file_name}")

async def send_file(update: Update, context):
    """ファイル送信"""
    await context.bot.send_document(
        chat_id=update.effective_chat.id,
        document=open("/tmp/report.pdf", "rb"),
        filename="cluster_report.pdf",
        caption="クラスターレポート"
    )

5. Kubernetesデプロイ

5.1 Dockerfile

FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8080
CMD ["uvicorn", "bot_webhook:fastapi_app", "--host", "0.0.0.0", "--port", "8080"]

5.2 Kubernetesマニフェスト

apiVersion: apps/v1
kind: Deployment
metadata:
  name: telegram-bot
  namespace: chatbot
spec:
  replicas: 2
  selector:
    matchLabels:
      app: telegram-bot
  template:
    metadata:
      labels:
        app: telegram-bot
    spec:
      containers:
        - name: bot
          image: registry.example.com/telegram-bot:latest
          ports:
            - containerPort: 8080
          env:
            - name: BOT_TOKEN
              valueFrom:
                secretKeyRef:
                  name: telegram-bot-secret
                  key: token
            - name: WEBHOOK_URL
              value: 'https://bot.example.com'
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 500m
              memory: 256Mi
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: telegram-bot
  namespace: chatbot
spec:
  selector:
    app: telegram-bot
  ports:
    - port: 80
      targetPort: 8080
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: telegram-bot
  namespace: chatbot
  annotations:
    cert-manager.io/cluster-issuer: letsencrypt-prod
spec:
  tls:
    - hosts:
        - bot.example.com
      secretName: telegram-bot-tls
  rules:
    - host: bot.example.com
      http:
        paths:
          - path: /webhook
            pathType: Prefix
            backend:
              service:
                name: telegram-bot
                port:
                  number: 80

6. セキュリティのベストプラクティス

# 1) WebhookURLにトークンを含める(不正リクエスト防止)
WEBHOOK_PATH = f"/webhook/{TOKEN}"

# 2) IPホワイトリスト(TelegramサーバーIP)
TELEGRAM_IPS = ["149.154.160.0/20", "91.108.4.0/22"]

from fastapi import Request, HTTPException
import ipaddress

@fastapi_app.middleware("http")
async def check_telegram_ip(request: Request, call_next):
    if request.url.path.startswith("/webhook"):
        client_ip = ipaddress.ip_address(request.client.host)
        allowed = any(
            client_ip in ipaddress.ip_network(net)
            for net in TELEGRAM_IPS
        )
        if not allowed:
            raise HTTPException(status_code=403)
    return await call_next(request)

# 3) Secret Token (Telegram Bot API 6.1+)
await bot.set_webhook(
    url=WEBHOOK_URL,
    secret_token="my-secret-token-123"
)

@fastapi_app.post(WEBHOOK_PATH)
async def webhook(request: Request):
    if request.headers.get("X-Telegram-Bot-Api-Secret-Token") != "my-secret-token-123":
        raise HTTPException(status_code=403)
    # ...

7. クイズ

Q1. PollingとWebhookの最大の違いは?

Polling:ボットが定期的にTelegramサーバーにアップデートを要求。Webhook:TelegramがボットサーバーにHTTP POSTで即座に配信。Webhookの方が遅延が少なく、サーバーリソースの効率が良い。

Q2. Webhookを使用するために必ず必要なものは?

HTTPS証明書のある公開サーバー。TelegramはHTTP Webhookを許可しない。Let's Encryptなどの無料証明書が使用可能。

Q3. ConversationHandlerの用途は?

多段階の会話フローの管理。状態(state)ごとに異なるハンドラーをマッピングし、ユーザーの入力に応じて会話が進行するシナリオ(チケット作成、設定ウィザードなど)を実装。

Q4. InlineKeyboardButtonのcallback_dataの役割は?

ボタンクリック時にボットに送信される識別子文字列。CallbackQueryHandlerがこの値を基に分岐処理を行う。最大64バイト。

Q5. Webhook URLにボットトークンを含める理由は?

不正リクエストの防止。トークンを知らない外部者が任意のペイロードを送信するのをブロック。さらにX-Telegram-Bot-Api-Secret-Tokenヘッダーの検証も推奨。

Q6. KubernetesでTelegramボットを2つ以上のreplicaで運用する際の注意点は?

Webhook方式はすべてのreplicaが同じWebhook URLを共有するため問題ない(ロードバランサーが分配)。ただし、Pollingは1つのインスタンスのみ可能(メッセージの重複受信問題)。

Q7. job_queue.run_daily()のtimezone設定が重要な理由は?

サーバー時間とユーザー時間が異なる場合がある。特にKubernetes PodはUTCがデフォルト。**ZoneInfo("Asia/Seoul")**を明示しないと韓国時間基準で正確に動作しない。