- Authors

- Name
- Youngju Kim
- @fjvbn20031
概要
現代の分散システムにおいて、サービス間通信はますます複雑になっています。 同期的なHTTP呼び出しだけでは、高いスループット、障害分離、柔軟なスケーリングを達成することは困難です。 この記事では、メッセージキュー(Message Queue) ベースの非同期アーキテクチャの核心概念を整理し、 Apache KafkaとRabbitMQを詳細に比較し、LLM Gatewayでの実践的な非同期処理パターンまでカバーします。
1. 同期 vs 非同期アーキテクチャ
1.1 同期(Synchronous)通信の限界
同期通信では、クライアントがサーバーにリクエストを送信し、レスポンスが返るまで待機します。
Client --> Service A --> Service B --> Service C
(blocking) (blocking) (processing)
問題点:
- 密結合: 1つのサービスがダウンするとチェーン全体が失敗
- レイテンシの蓄積: 各サービスの応答時間が合算される
- スケーリング制約: 最も遅いサービスが全体のスループットを決定
- リソース浪費: 応答待ち中にスレッド/コネクションを占有
1.2 非同期(Asynchronous)通信の利点
メッセージキューを導入すると、プロデューサーとコンシューマーが分離されます。
Producer --> [Message Queue] --> Consumer
(fire) (buffer/persist) (process at own pace)
利点:
- 疎結合: プロデューサーとコンシューマーが独立して動作
- 負荷平準化: トラフィック急増時にキューがバッファの役割
- 障害分離: コンシューマーの障害がプロデューサーに影響しない
- 独立スケーリング: コンシューマーを独立してスケールアウト可能
- ピークシェービング: 瞬間的な負荷を平滑化
2. Message Queueの核心概念
2.1 基本構成要素
| 構成要素 | 役割 |
|---|---|
| Producer | メッセージを生成してキュー/トピックに送信 |
| Consumer | キュー/トピックからメッセージを受信して処理 |
| Broker | メッセージを受信、保存、転送する中間サーバー |
| Topic / Queue | メッセージが保存される論理的なチャネル |
| Partition | トピックを物理的に分割して並列処理を支援 |
2.2 メッセージ配信保証レベル
At-Most-Once: Producer --> Broker (リトライなし)
メッセージが失われる可能性があるが重複なし
At-Least-Once: Producer --> Broker --> ACK --> 失敗時リトライ
メッセージの損失なしだが重複の可能性あり
Exactly-Once: Producer --> Broker (べき等 + トランザクション)
メッセージが正確に1回のみ配信
2.3 メッセージ配信モデル
Point-to-Point(Queue):
- 1つのメッセージが1つのコンシューマーにのみ配信
- 作業分配(Work Distribution)に適合
Publish-Subscribe(Topic):
- 1つのメッセージがすべてのサブスクライバーに配信
- イベントブロードキャストに適合
3. Apache Kafka詳細分析
3.1 Kafkaとは
Apache Kafkaは、LinkedInで開発された分散イベントストリーミングプラットフォームです。 高いスループット、耐久性、水平スケーラビリティを特徴とし、リアルタイムデータパイプラインとストリーミングアプリケーションで広く使用されています。
3.2 アーキテクチャ概要
+-------------------+
| Kafka Cluster |
| |
Producers ------->| Broker 1 |-------> Consumers
| Broker 2 | (Consumer Group A)
| Broker 3 |-------> Consumers
| | (Consumer Group B)
+-------------------+
|
+------+------+
| KRaft |
| Controller |
+-------------+
主要構成要素:
- Broker: メッセージを受信、保存、転送するサーバーノード
- KRaft Controller: Kafka 4.0からZooKeeperを置き換える内蔵メタデータ管理レイヤー
- Topic: メッセージの論理的カテゴリ
- Partition: トピックの物理的分割。各パーティションは順序保証された不変のログ
- Replication: パーティションごとのレプリカ維持、Leader-Follower構造
- ISR(In-Sync Replicas): Leaderと同期されたレプリカの集合
3.3 KRaftモード(ZooKeeper廃止)
Kafka 4.0からKRaftがデフォルトモードです。
従来の構造:
Kafka Brokers <---> ZooKeeper Ensemble(別クラスター)
KRaft構造:
Kafka Brokers(一部がController役割を兼務)
- 内部Raft合意プロトコルでメタデータ管理
- 別のZooKeeperクラスター不要
KRaftの利点:
- 運用複雑度の低減(ZooKeeperクラスター管理不要)
- パーティション数の拡張限界改善(数百万パーティションサポート)
- メタデータ伝播速度の向上
- コントローラー障害復旧時間の短縮
3.4 Producer詳細分析
Partitioner
メッセージをどのパーティションに送るかを決定します。
from confluent_kafka import Producer
conf = {
'bootstrap.servers': 'localhost:9092',
'client.id': 'my-producer',
'acks': 'all',
'enable.idempotence': True,
'max.in.flight.requests.per.connection': 5,
'retries': 2147483647,
'linger.ms': 5,
'batch.size': 16384,
'compression.type': 'lz4',
}
producer = Producer(conf)
def delivery_callback(err, msg):
if err:
print(f"Delivery failed: {err}")
else:
print(f"Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")
# キーベースのパーティショニング - 同じキーは同じパーティションへ
producer.produce(
topic='orders',
key='user-123',
value='{"order_id": "ord-456", "amount": 5000}',
callback=delivery_callback
)
producer.flush()
acks設定
| acks値 | 動作 | 耐久性 | パフォーマンス |
|---|---|---|---|
| 0 | ブローカーの応答を待たない | 低 | 最高 |
| 1 | Leaderのみ書き込み確認 | 中 | 高 |
| all (-1) | すべてのISRが書き込み確認 | 最高 | 低 |
バッチ処理と圧縮
Producer内部の動作:
Record --> Accumulator --> [Batch] --> Compressor --> Network Send
linger.ms: バッチ送信待機時間(デフォルト0ms)
batch.size: バッチ最大サイズ(デフォルト16KB)
compression.type: none / gzip / snappy / lz4 / zstd
3.5 Consumer詳細分析
Consumer Group
Topic: orders(3パーティション)
Consumer Group A:
Consumer 1 <-- Partition 0
Consumer 2 <-- Partition 1
Consumer 3 <-- Partition 2
Consumer Group B:
Consumer 4 <-- Partition 0, 1, 2
各Consumer Groupは独立してメッセージを消費します。 パーティション内では順序が保証され、1つのパーティションはグループ内の1つのコンシューマーにのみ割り当てられます。
オフセット管理
from confluent_kafka import Consumer, KafkaError
conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processor',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False, # 手動コミット推奨
'max.poll.interval.ms': 300000,
'session.timeout.ms': 45000,
}
consumer = Consumer(conf)
consumer.subscribe(['orders'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(f"Error: {msg.error()}")
break
# メッセージ処理
process_order(msg.value().decode('utf-8'))
# 手動コミット - 処理完了後
consumer.commit(asynchronous=False)
finally:
consumer.close()
リバランシング
コンシューマーグループのメンバーが変更されると、パーティションの再割り当てが発生します。
リバランシングのトリガー:
1. 新しいコンシューマーの参加
2. 既存コンシューマーの離脱(クラッシュまたは正常終了)
3. トピックパーティション数の変更
4. サブスクリプショントピックの変更
リバランシング戦略:
- Eager(Stop-the-World): すべてのパーティション解放後に再割り当て
- Cooperative(Incremental): 変更されたパーティションのみ再割り当て
3.6 Exactly-Onceセマンティクス
# べき等Producer + トランザクショナルメッセージング
conf = {
'bootstrap.servers': 'localhost:9092',
'enable.idempotence': True,
'transactional.id': 'my-transaction-id',
}
producer = Producer(conf)
producer.init_transactions()
try:
producer.begin_transaction()
producer.produce('topic-a', key='k1', value='v1')
producer.produce('topic-b', key='k2', value='v2')
# Consumerオフセットもトランザクションに含める
producer.send_offsets_to_transaction(
consumer.position(consumer.assignment()),
consumer.consumer_group_metadata()
)
producer.commit_transaction()
except Exception as e:
producer.abort_transaction()
raise e
Exactly-Onceが動作する原理:
- べき等Producer: Producer ID + Sequence Numberで重複送信を防止
- トランザクショナルメッセージング: 複数のトピック/パーティションへの原子的書き込み
- read_committed分離レベル: コミットされたトランザクションのメッセージのみ消費
3.7 Kafka ConnectとKafka Streams
Kafka Connect:
Source Connector: DB/File/API --> Kafka Topic
Sink Connector: Kafka Topic --> DB/File/API
例: Debezium (MySQL CDC) --> Kafka --> Elasticsearch Sink
Kafka Streams:
Kafka Topic --> Stream Processing --> Kafka Topic
特徴: 別クラスター不要、ライブラリ形式
機能: filter, map, groupBy, windowed aggregation, join
4. RabbitMQ詳細分析
4.1 RabbitMQとは
RabbitMQは、AMQP(Advanced Message Queuing Protocol)ベースのオープンソースメッセージブローカーです。 柔軟なルーティング、多様なプロトコルサポート、管理の容易さを特徴とします。
4.2 アーキテクチャ概要
Producer --> Exchange --> Binding --> Queue --> Consumer
Exchange: メッセージルーティングハブ
Binding: ExchangeとQueue間のルーティングルール
Queue: メッセージが保存されるバッファ
Virtual Host: 論理的な分離単位(マルチテナント)
4.3 Exchangeタイプ
Direct Exchange
Producer --> [Direct Exchange]
|
+--(routing_key="order.created")--> [Order Queue]
|
+--(routing_key="payment.processed")--> [Payment Queue]
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
# Direct Exchange宣言
channel.exchange_declare(
exchange='orders',
exchange_type='direct',
durable=True
)
# Queue宣言とバインディング
channel.queue_declare(queue='order_created', durable=True)
channel.queue_bind(
exchange='orders',
queue='order_created',
routing_key='order.created'
)
# メッセージ発行
channel.basic_publish(
exchange='orders',
routing_key='order.created',
body='{"order_id": "123", "user_id": "456"}',
properties=pika.BasicProperties(
delivery_mode=2, # persistent
content_type='application/json',
)
)
Topic Exchange
Producer --> [Topic Exchange]
|
+--(routing_key="order.*.kr")--> [Korea Order Queue]
|
+--(routing_key="order.premium.#")--> [Premium Queue]
|
+--(routing_key="#")--> [Audit Queue](全メッセージ)
* : 正確に1つの単語にマッチ
# : 0個以上の単語にマッチ
Fanout Exchange
Producer --> [Fanout Exchange]
|
+--> [Queue A](全メッセージ)
|
+--> [Queue B](全メッセージ)
|
+--> [Queue C](全メッセージ)
Headers Exchange
ルーティングキーの代わりにメッセージヘッダーのkey-valueペアでルーティングします。
4.4 メッセージACKとPrefetch
def callback(ch, method, properties, body):
try:
process_message(body)
# 処理成功時ACK
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
# 処理失敗時NACK(リキュー)
ch.basic_nack(
delivery_tag=method.delivery_tag,
requeue=True
)
# Prefetch設定 - 一度に処理するメッセージ数を制限
channel.basic_qos(prefetch_count=10)
channel.basic_consume(
queue='order_created',
on_message_callback=callback,
auto_ack=False # 手動ACK
)
channel.start_consuming()
Prefetch戦略:
prefetch_count=1: 1つずつ処理、最も公平な分配prefetch_count=10-50: スループットと公平性のバランスprefetch_count=0: 無制限(非推奨)
4.5 Dead Letter Queue(DLQ)
# DLQ設定
channel.queue_declare(
queue='order_processing',
durable=True,
arguments={
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-routing-key': 'order.failed',
'x-message-ttl': 30000, # 30秒TTL
'x-max-length': 10000, # キュー最大長
}
)
# DLX(Dead Letter Exchange)設定
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='order_failed', durable=True)
channel.queue_bind(
exchange='dlx',
queue='order_failed',
routing_key='order.failed'
)
Dead Letterに送られるケース:
- Consumerがメッセージをreject/nack(requeue=False)
- メッセージTTL期限切れ
- キュー最大長超過
4.6 Publisher Confirms
channel.confirm_delivery()
try:
channel.basic_publish(
exchange='orders',
routing_key='order.created',
body=message_body,
properties=pika.BasicProperties(delivery_mode=2),
mandatory=True # ルーティング失敗時返却
)
print("Message confirmed by broker")
except pika.exceptions.UnroutableError:
print("Message could not be routed")
except pika.exceptions.NackError:
print("Message was nacked by broker")
4.7 クラスタリングと高可用性
Classic Mirrored Queue(レガシー):
Node 1 (Leader) <-mirror-> Node 2 (Mirror) <-mirror-> Node 3 (Mirror)
問題: スプリットブレインリスク、同期化オーバーヘッド
Quorum Queue(推奨):
Node 1 (Leader) <-Raft-> Node 2 (Follower) <-Raft-> Node 3 (Follower)
利点: Raft合意プロトコル、スプリットブレイン防止、データ安全性保証
Quorum Queue宣言:
channel.queue_declare(
queue='important_orders',
durable=True,
arguments={
'x-queue-type': 'quorum',
'x-quorum-initial-group-size': 3,
}
)
4.8 Streams(Kafkaスタイル機能)
RabbitMQ 3.9+で導入されたStreamsは、Kafkaのログベースストレージに類似しています。
channel.queue_declare(
queue='order_stream',
durable=True,
arguments={
'x-queue-type': 'stream',
'x-max-length-bytes': 1073741824, # 1GB
'x-stream-max-segment-size-bytes': 52428800, # 50MB
}
)
5. LLM Gateway非同期処理パターン
5.1 なぜLLM呼び出しにQueueが必要か
LLM API呼び出しは一般的なREST APIとは異なる特性を持ちます。
| 特性 | 一般API | LLM API |
|---|---|---|
| 応答時間 | 50-200ms | 2-30秒 |
| コスト | ほぼ無料 | トークン単位課金 |
| Rate Limit | 高い | 低い(RPM/TPM制限) |
| エラー率 | 低い | 比較的高い(429, 500, タイムアウト) |
| 応答サイズ | 一定 | 可変(トークン数による) |
Queueが解決する問題:
- Rate Limit管理: キューから速度調整しながらAPI呼び出し
- コスト制御: リクエストをキューに入れて優先度別に処理
- 障害対応: API障害時にキューに保管し再試行
- 負荷分散: 多数のユーザーリクエストをWorker Poolが順次処理
5.2 アーキテクチャ図
+----------+ +-------------+ +----------+ +--------------+
| | | | | | | |
| Client +---->+ FastAPI +---->+ Request +---->+ Worker Pool |
| (Web) | | Gateway | | Queue | | (Celery) |
| | | | | (Redis) | | |
+----------+ +------+------+ +----------+ +------+-------+
| |
| +----------+ |
| | | |
+<-----------+ Response +<------------+
| | Store |
| | (Redis) |
| +----------+
v
+----------+
| Status |
| SSE/WS |
+----------+
5.3 Priority Queueパターン
from enum import IntEnum
class Priority(IntEnum):
CRITICAL = 0 # リアルタイムチャットボット応答
HIGH = 1 # プレミアムユーザー
NORMAL = 2 # 一般ユーザー
LOW = 3 # バッチ処理
BACKGROUND = 4 # 非リアルタイム分析
# Celery Task with Priority
from celery import Celery
app = Celery('llm_gateway', broker='redis://localhost:6379/0')
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def call_llm(self, request_id, model, messages, priority):
try:
response = litellm.completion(
model=model,
messages=messages,
timeout=30,
)
store_response(request_id, response)
return response
except Exception as exc:
# Exponential backoff
retry_delay = 60 * (2 ** self.request.retries)
raise self.retry(exc=exc, countdown=retry_delay)
5.4 Exponential Backoffによるリトライ
import asyncio
import random
async def call_llm_with_retry(
model: str,
messages: list,
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0,
):
for attempt in range(max_retries + 1):
try:
response = await litellm.acompletion(
model=model,
messages=messages,
timeout=30,
)
return response
except Exception as e:
if attempt == max_retries:
raise
# Exponential backoff + jitter
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay * 0.1)
actual_delay = delay + jitter
print(f"Attempt {attempt + 1} failed: {e}")
print(f"Retrying in {actual_delay:.1f}s...")
await asyncio.sleep(actual_delay)
5.5 実践実装: FastAPI + Celery + Redis
# app/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from celery.result import AsyncResult
import uuid
app = FastAPI()
class LLMRequest(BaseModel):
model: str = "gpt-4o"
messages: list
priority: int = 2
class LLMResponse(BaseModel):
request_id: str
status: str
result: dict = None
@app.post("/api/v1/completions")
async def create_completion(req: LLMRequest):
request_id = str(uuid.uuid4())
# 優先度に基づいてCeleryキューに送信
task = call_llm.apply_async(
args=[request_id, req.model, req.messages, req.priority],
queue=f"llm_priority_{req.priority}",
priority=req.priority,
)
return {
"request_id": request_id,
"task_id": task.id,
"status": "queued",
}
@app.get("/api/v1/completions/{task_id}")
async def get_completion(task_id: str):
result = AsyncResult(task_id)
if result.ready():
return {
"status": "completed",
"result": result.get(),
}
elif result.failed():
return {
"status": "failed",
"error": str(result.result),
}
else:
return {
"status": "processing",
}
# celery_config.py
from celery import Celery
app = Celery('llm_gateway')
app.conf.update(
broker_url='redis://localhost:6379/0',
result_backend='redis://localhost:6379/1',
task_serializer='json',
result_serializer='json',
accept_content=['json'],
timezone='UTC',
task_routes={
'tasks.call_llm': {
'queue': 'llm_default',
},
},
# 優先度キュー設定
task_queue_max_priority=10,
task_default_priority=5,
# Worker並行度制限(Rate Limit対応)
worker_concurrency=4,
# タスク時間制限
task_soft_time_limit=120,
task_time_limit=180,
)
# docker-compose.yml
version: '3.8'
services:
api:
build: .
command: uvicorn app.main:app --host 0.0.0.0 --port 8000
ports:
- '8000:8000'
depends_on:
- redis
environment:
- REDIS_URL=redis://redis:6379
worker-high:
build: .
command: celery -A tasks worker -Q llm_priority_0,llm_priority_1 -c 4
depends_on:
- redis
worker-normal:
build: .
command: celery -A tasks worker -Q llm_priority_2 -c 8
depends_on:
- redis
worker-low:
build: .
command: celery -A tasks worker -Q llm_priority_3,llm_priority_4 -c 2
depends_on:
- redis
redis:
image: redis:7-alpine
ports:
- '6379:6379'
flower:
build: .
command: celery -A tasks flower --port=5555
ports:
- '5555:5555'
depends_on:
- redis
6. Kafka vs RabbitMQ比較
6.1 総合比較表
| 項目 | Apache Kafka | RabbitMQ |
|---|---|---|
| メッセージモデル | Pullベース、Log append | Pushベース、Queue |
| スループット | 数百万msg/秒 | 数万msg/秒 |
| レイテンシ | ~5ms(p99) | サブミリ秒可能 |
| メッセージ保持 | 設定期間保持(デフォルト7日) | 消費後削除 |
| 順序保証 | パーティション内保証 | キュー内保証 |
| ルーティング | Topic/Partitionベース | Exchange/Bindingベース(柔軟) |
| プロトコル | 独自プロトコル | AMQP, MQTT, STOMP |
| スケーリング | パーティション追加で水平拡張 | クラスタリング + Sharding |
| メタデータ | KRaft(内蔵) | Erlang分散システム |
| ストリーム処理 | Kafka Streams内蔵 | なし(外部ツール必要) |
| 運用複雑度 | 中〜高 | 中 |
| 学習コスト | 高 | 中 |
| メッセージサイズ | デフォルト1MB(設定可能) | 制限なし(実質数MB) |
6.2 パフォーマンスベンチマーク(参考値)
テスト環境: 3 broker/node, 3 replica, メッセージサイズ1KB
Apache Kafka:
- 単一パーティション: ~50,000 msg/秒
- 12パーティション: ~800,000 msg/秒
- 60パーティション: ~2,000,000+ msg/秒
- レイテンシp99: 5ms
RabbitMQ(Quorum Queue):
- 単一キュー: ~20,000 msg/秒
- 10キュー: ~100,000 msg/秒
- レイテンシp99: 1ms
6.3 選択ガイド
Kafkaを選択すべきケース:
- 大量イベントストリーミング(ログ、メトリクス、クリックストリーム)
- イベントソーシングパターンの実装
- リアルタイムデータパイプライン(ETL/ELT)
- 複数のコンシューマーが同じメッセージを独立して消費
- メッセージリプレイが必要な場合
- Kafka Streams/ksqlDBでのストリーム処理が必要な場合
RabbitMQを選択すべきケース:
- 複雑なルーティングパターンが必要な場合(Topic, Headers Exchange)
- タスクキュー(Task Queue)パターン
- 低レイテンシが重要な場合
- 多様なプロトコルサポートが必要な場合(AMQP, MQTT, STOMP)
- メッセージ単位のTTL、Priority等の細密な制御が必要な場合
- 既存AMQPエコシステムとの統合
LLM Gatewayでは?
小規模/中規模(1日10万リクエスト以下):
RabbitMQ + Celeryの組み合わせ推奨
- 設定簡単、Priority Queueサポート優秀
- Dead Letter Queueで失敗処理容易
大規模(1日100万リクエスト以上):
Kafkaベースの実装推奨
- 高スループット、メッセージ保持で再処理可能
- Consumer Groupで柔軟なスケーリング
- コスト/使用量分析のためのイベントログ活用
7. 本番モニタリング設定
7.1 Kafkaモニタリング
# Prometheus JMX Exporter設定(prometheus-jmx-config.yaml)
rules:
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec>'
name: kafka_server_messages_in_total
type: COUNTER
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec>'
name: kafka_server_bytes_in_total
type: COUNTER
- pattern: 'kafka.consumer<type=consumer-fetch-manager-metrics, client-id=(.+)><>records-lag-max'
name: kafka_consumer_lag_max
type: GAUGE
主要モニタリング指標:
- Consumer Lag: コンシューマーがプロデューサーにどれだけ追いついているか
- ISR Shrink/Expand: レプリケーション状態の変化
- Under-Replicated Partitions: レプリカ不足パーティション
- Request Latency: リクエスト処理時間
7.2 RabbitMQモニタリング
主要指標:
- Queue Depth: キューに溜まったメッセージ数
- Consumer Utilization: コンシューマー稼働率
- Publish/Deliver Rate: 秒あたりの発行/配信メッセージ数
- Unacked Messages: ACK待ちメッセージ数
- Memory/Disk Usage: リソース使用量
RabbitMQ Management UI: http://localhost:15672
Prometheus Plugin: rabbitmq_prometheus(内蔵)
8. まとめ
メッセージキューベースの非同期アーキテクチャは、現代の分散システムの核心コンポーネントです。
主要ポイント:
- Kafka: 大量イベントストリーミング、ログベースストレージ、Consumer Groupによる柔軟な拡張
- RabbitMQ: 柔軟なルーティング、低レイテンシ、従来のメッセージキューパターンに最適
- LLM Gateway: 非同期処理でRate Limit管理、コスト制御、障害分離を達成
- 選択基準: スループット/レイテンシ/ルーティング複雑度/メッセージ保持の必要性で判断
両技術ともに成熟したエコシステムを持ち、プロジェクトの要件に合わせて選択すればよいでしょう。 大規模イベントストリーミングにはKafka、作業分配と柔軟なルーティングにはRabbitMQが依然として最善の選択です。
参考資料
- Apache Kafka公式ドキュメント: https://kafka.apache.org/documentation/
- RabbitMQ公式ドキュメント: https://www.rabbitmq.com/docs
- Confluent Kafka Pythonクライアント: https://github.com/confluentinc/confluent-kafka-python
- Pika(RabbitMQ Pythonクライアント): https://pika.readthedocs.io/
- Celery公式ドキュメント: https://docs.celeryq.dev/