Skip to content
Published on

[Architecture] メッセージキューによる非同期システム構築:Kafka vs RabbitMQ

Authors

概要

現代の分散システムにおいて、サービス間通信はますます複雑になっています。 同期的なHTTP呼び出しだけでは、高いスループット、障害分離、柔軟なスケーリングを達成することは困難です。 この記事では、メッセージキュー(Message Queue) ベースの非同期アーキテクチャの核心概念を整理し、 Apache KafkaとRabbitMQを詳細に比較し、LLM Gatewayでの実践的な非同期処理パターンまでカバーします。


1. 同期 vs 非同期アーキテクチャ

1.1 同期(Synchronous)通信の限界

同期通信では、クライアントがサーバーにリクエストを送信し、レスポンスが返るまで待機します。

Client --> Service A --> Service B --> Service C
          (blocking)    (blocking)    (processing)

問題点:

  • 密結合: 1つのサービスがダウンするとチェーン全体が失敗
  • レイテンシの蓄積: 各サービスの応答時間が合算される
  • スケーリング制約: 最も遅いサービスが全体のスループットを決定
  • リソース浪費: 応答待ち中にスレッド/コネクションを占有

1.2 非同期(Asynchronous)通信の利点

メッセージキューを導入すると、プロデューサーとコンシューマーが分離されます。

Producer --> [Message Queue] --> Consumer
  (fire)    (buffer/persist)    (process at own pace)

利点:

  • 疎結合: プロデューサーとコンシューマーが独立して動作
  • 負荷平準化: トラフィック急増時にキューがバッファの役割
  • 障害分離: コンシューマーの障害がプロデューサーに影響しない
  • 独立スケーリング: コンシューマーを独立してスケールアウト可能
  • ピークシェービング: 瞬間的な負荷を平滑化

2. Message Queueの核心概念

2.1 基本構成要素

構成要素役割
Producerメッセージを生成してキュー/トピックに送信
Consumerキュー/トピックからメッセージを受信して処理
Brokerメッセージを受信、保存、転送する中間サーバー
Topic / Queueメッセージが保存される論理的なチャネル
Partitionトピックを物理的に分割して並列処理を支援

2.2 メッセージ配信保証レベル

At-Most-Once:   Producer --> Broker (リトライなし)
                メッセージが失われる可能性があるが重複なし

At-Least-Once:  Producer --> Broker --> ACK --> 失敗時リトライ
                メッセージの損失なしだが重複の可能性あり

Exactly-Once:   Producer --> Broker (べき等 + トランザクション)
                メッセージが正確に1回のみ配信

2.3 メッセージ配信モデル

Point-to-Point(Queue):

  • 1つのメッセージが1つのコンシューマーにのみ配信
  • 作業分配(Work Distribution)に適合

Publish-Subscribe(Topic):

  • 1つのメッセージがすべてのサブスクライバーに配信
  • イベントブロードキャストに適合

3. Apache Kafka詳細分析

3.1 Kafkaとは

Apache Kafkaは、LinkedInで開発された分散イベントストリーミングプラットフォームです。 高いスループット、耐久性、水平スケーラビリティを特徴とし、リアルタイムデータパイプラインとストリーミングアプリケーションで広く使用されています。

3.2 アーキテクチャ概要

                    +-------------------+
                    |   Kafka Cluster   |
                    |                   |
  Producers ------->|  Broker 1         |-------> Consumers
                    |  Broker 2         |         (Consumer Group A)
                    |  Broker 3         |-------> Consumers
                    |                   |         (Consumer Group B)
                    +-------------------+
                           |
                    +------+------+
                    |  KRaft      |
                    |  Controller |
                    +-------------+

主要構成要素:

  • Broker: メッセージを受信、保存、転送するサーバーノード
  • KRaft Controller: Kafka 4.0からZooKeeperを置き換える内蔵メタデータ管理レイヤー
  • Topic: メッセージの論理的カテゴリ
  • Partition: トピックの物理的分割。各パーティションは順序保証された不変のログ
  • Replication: パーティションごとのレプリカ維持、Leader-Follower構造
  • ISR(In-Sync Replicas): Leaderと同期されたレプリカの集合

3.3 KRaftモード(ZooKeeper廃止)

Kafka 4.0からKRaftがデフォルトモードです。

従来の構造:
  Kafka Brokers <---> ZooKeeper Ensemble(別クラスター)

KRaft構造:
  Kafka Brokers(一部がController役割を兼務)
  - 内部Raft合意プロトコルでメタデータ管理
  - 別のZooKeeperクラスター不要

KRaftの利点:

  • 運用複雑度の低減(ZooKeeperクラスター管理不要)
  • パーティション数の拡張限界改善(数百万パーティションサポート)
  • メタデータ伝播速度の向上
  • コントローラー障害復旧時間の短縮

3.4 Producer詳細分析

Partitioner

メッセージをどのパーティションに送るかを決定します。

from confluent_kafka import Producer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'my-producer',
    'acks': 'all',
    'enable.idempotence': True,
    'max.in.flight.requests.per.connection': 5,
    'retries': 2147483647,
    'linger.ms': 5,
    'batch.size': 16384,
    'compression.type': 'lz4',
}

producer = Producer(conf)

def delivery_callback(err, msg):
    if err:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

# キーベースのパーティショニング - 同じキーは同じパーティションへ
producer.produce(
    topic='orders',
    key='user-123',
    value='{"order_id": "ord-456", "amount": 5000}',
    callback=delivery_callback
)

producer.flush()

acks設定

acks値動作耐久性パフォーマンス
0ブローカーの応答を待たない最高
1Leaderのみ書き込み確認
all (-1)すべてのISRが書き込み確認最高

バッチ処理と圧縮

Producer内部の動作:
  Record --> Accumulator --> [Batch] --> Compressor --> Network Send

  linger.ms: バッチ送信待機時間(デフォルト0ms)
  batch.size: バッチ最大サイズ(デフォルト16KB)
  compression.type: none / gzip / snappy / lz4 / zstd

3.5 Consumer詳細分析

Consumer Group

Topic: orders(3パーティション)

Consumer Group A:
  Consumer 1 <-- Partition 0
  Consumer 2 <-- Partition 1
  Consumer 3 <-- Partition 2

Consumer Group B:
  Consumer 4 <-- Partition 0, 1, 2

各Consumer Groupは独立してメッセージを消費します。 パーティション内では順序が保証され、1つのパーティションはグループ内の1つのコンシューマーにのみ割り当てられます。

オフセット管理

from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processor',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # 手動コミット推奨
    'max.poll.interval.ms': 300000,
    'session.timeout.ms': 45000,
}

consumer = Consumer(conf)
consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(f"Error: {msg.error()}")
                break

        # メッセージ処理
        process_order(msg.value().decode('utf-8'))

        # 手動コミット - 処理完了後
        consumer.commit(asynchronous=False)
finally:
    consumer.close()

リバランシング

コンシューマーグループのメンバーが変更されると、パーティションの再割り当てが発生します。

リバランシングのトリガー:
  1. 新しいコンシューマーの参加
  2. 既存コンシューマーの離脱(クラッシュまたは正常終了)
  3. トピックパーティション数の変更
  4. サブスクリプショントピックの変更

リバランシング戦略:
  - Eager(Stop-the-World): すべてのパーティション解放後に再割り当て
  - Cooperative(Incremental): 変更されたパーティションのみ再割り当て

3.6 Exactly-Onceセマンティクス

# べき等Producer + トランザクショナルメッセージング
conf = {
    'bootstrap.servers': 'localhost:9092',
    'enable.idempotence': True,
    'transactional.id': 'my-transaction-id',
}

producer = Producer(conf)
producer.init_transactions()

try:
    producer.begin_transaction()

    producer.produce('topic-a', key='k1', value='v1')
    producer.produce('topic-b', key='k2', value='v2')

    # Consumerオフセットもトランザクションに含める
    producer.send_offsets_to_transaction(
        consumer.position(consumer.assignment()),
        consumer.consumer_group_metadata()
    )

    producer.commit_transaction()
except Exception as e:
    producer.abort_transaction()
    raise e

Exactly-Onceが動作する原理:

  1. べき等Producer: Producer ID + Sequence Numberで重複送信を防止
  2. トランザクショナルメッセージング: 複数のトピック/パーティションへの原子的書き込み
  3. read_committed分離レベル: コミットされたトランザクションのメッセージのみ消費

3.7 Kafka ConnectとKafka Streams

Kafka Connect:
  Source Connector: DB/File/API --> Kafka Topic
  Sink Connector:   Kafka Topic --> DB/File/API

  例: Debezium (MySQL CDC) --> Kafka --> Elasticsearch Sink

Kafka Streams:
  Kafka Topic --> Stream Processing --> Kafka Topic

  特徴: 別クラスター不要、ライブラリ形式
  機能: filter, map, groupBy, windowed aggregation, join

4. RabbitMQ詳細分析

4.1 RabbitMQとは

RabbitMQは、AMQP(Advanced Message Queuing Protocol)ベースのオープンソースメッセージブローカーです。 柔軟なルーティング、多様なプロトコルサポート、管理の容易さを特徴とします。

4.2 アーキテクチャ概要

Producer --> Exchange --> Binding --> Queue --> Consumer

  Exchange: メッセージルーティングハブ
  Binding:  ExchangeとQueue間のルーティングルール
  Queue:    メッセージが保存されるバッファ
  Virtual Host: 論理的な分離単位(マルチテナント)

4.3 Exchangeタイプ

Direct Exchange

Producer --> [Direct Exchange]
              |
              +--(routing_key="order.created")--> [Order Queue]
              |
              +--(routing_key="payment.processed")--> [Payment Queue]
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# Direct Exchange宣言
channel.exchange_declare(
    exchange='orders',
    exchange_type='direct',
    durable=True
)

# Queue宣言とバインディング
channel.queue_declare(queue='order_created', durable=True)
channel.queue_bind(
    exchange='orders',
    queue='order_created',
    routing_key='order.created'
)

# メッセージ発行
channel.basic_publish(
    exchange='orders',
    routing_key='order.created',
    body='{"order_id": "123", "user_id": "456"}',
    properties=pika.BasicProperties(
        delivery_mode=2,  # persistent
        content_type='application/json',
    )
)

Topic Exchange

Producer --> [Topic Exchange]
              |
              +--(routing_key="order.*.kr")--> [Korea Order Queue]
              |
              +--(routing_key="order.premium.#")--> [Premium Queue]
              |
              +--(routing_key="#")--> [Audit Queue](全メッセージ)

  * : 正確に1つの単語にマッチ
  # : 0個以上の単語にマッチ

Fanout Exchange

Producer --> [Fanout Exchange]
              |
              +--> [Queue A](全メッセージ)
              |
              +--> [Queue B](全メッセージ)
              |
              +--> [Queue C](全メッセージ)

Headers Exchange

ルーティングキーの代わりにメッセージヘッダーのkey-valueペアでルーティングします。

4.4 メッセージACKとPrefetch

def callback(ch, method, properties, body):
    try:
        process_message(body)
        # 処理成功時ACK
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        # 処理失敗時NACK(リキュー)
        ch.basic_nack(
            delivery_tag=method.delivery_tag,
            requeue=True
        )

# Prefetch設定 - 一度に処理するメッセージ数を制限
channel.basic_qos(prefetch_count=10)

channel.basic_consume(
    queue='order_created',
    on_message_callback=callback,
    auto_ack=False  # 手動ACK
)

channel.start_consuming()

Prefetch戦略:

  • prefetch_count=1: 1つずつ処理、最も公平な分配
  • prefetch_count=10-50: スループットと公平性のバランス
  • prefetch_count=0: 無制限(非推奨)

4.5 Dead Letter Queue(DLQ)

# DLQ設定
channel.queue_declare(
    queue='order_processing',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'order.failed',
        'x-message-ttl': 30000,       # 30秒TTL
        'x-max-length': 10000,         # キュー最大長
    }
)

# DLX(Dead Letter Exchange)設定
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='order_failed', durable=True)
channel.queue_bind(
    exchange='dlx',
    queue='order_failed',
    routing_key='order.failed'
)

Dead Letterに送られるケース:

  1. Consumerがメッセージをreject/nack(requeue=False)
  2. メッセージTTL期限切れ
  3. キュー最大長超過

4.6 Publisher Confirms

channel.confirm_delivery()

try:
    channel.basic_publish(
        exchange='orders',
        routing_key='order.created',
        body=message_body,
        properties=pika.BasicProperties(delivery_mode=2),
        mandatory=True  # ルーティング失敗時返却
    )
    print("Message confirmed by broker")
except pika.exceptions.UnroutableError:
    print("Message could not be routed")
except pika.exceptions.NackError:
    print("Message was nacked by broker")

4.7 クラスタリングと高可用性

Classic Mirrored Queue(レガシー):
  Node 1 (Leader)  <-mirror-> Node 2 (Mirror) <-mirror-> Node 3 (Mirror)
  問題: スプリットブレインリスク、同期化オーバーヘッド

Quorum Queue(推奨):
  Node 1 (Leader)  <-Raft-> Node 2 (Follower) <-Raft-> Node 3 (Follower)
  利点: Raft合意プロトコル、スプリットブレイン防止、データ安全性保証

Quorum Queue宣言:

channel.queue_declare(
    queue='important_orders',
    durable=True,
    arguments={
        'x-queue-type': 'quorum',
        'x-quorum-initial-group-size': 3,
    }
)

4.8 Streams(Kafkaスタイル機能)

RabbitMQ 3.9+で導入されたStreamsは、Kafkaのログベースストレージに類似しています。

channel.queue_declare(
    queue='order_stream',
    durable=True,
    arguments={
        'x-queue-type': 'stream',
        'x-max-length-bytes': 1073741824,  # 1GB
        'x-stream-max-segment-size-bytes': 52428800,  # 50MB
    }
)

5. LLM Gateway非同期処理パターン

5.1 なぜLLM呼び出しにQueueが必要か

LLM API呼び出しは一般的なREST APIとは異なる特性を持ちます。

特性一般APILLM API
応答時間50-200ms2-30秒
コストほぼ無料トークン単位課金
Rate Limit高い低い(RPM/TPM制限)
エラー率低い比較的高い(429, 500, タイムアウト)
応答サイズ一定可変(トークン数による)

Queueが解決する問題:

  1. Rate Limit管理: キューから速度調整しながらAPI呼び出し
  2. コスト制御: リクエストをキューに入れて優先度別に処理
  3. 障害対応: API障害時にキューに保管し再試行
  4. 負荷分散: 多数のユーザーリクエストをWorker Poolが順次処理

5.2 アーキテクチャ図

+----------+     +-------------+     +----------+     +--------------+
|          |     |             |     |          |     |              |
|  Client  +---->+ FastAPI     +---->+ Request  +---->+ Worker Pool  |
|  (Web)   |     | Gateway     |     | Queue    |     | (Celery)     |
|          |     |             |     | (Redis)  |     |              |
+----------+     +------+------+     +----------+     +------+-------+
                        |                                     |
                        |            +----------+             |
                        |            |          |             |
                        +<-----------+ Response +<------------+
                        |            | Store    |
                        |            | (Redis)  |
                        |            +----------+
                        v
                  +----------+
                  |  Status  |
                  |  SSE/WS  |
                  +----------+

5.3 Priority Queueパターン

from enum import IntEnum

class Priority(IntEnum):
    CRITICAL = 0   # リアルタイムチャットボット応答
    HIGH = 1       # プレミアムユーザー
    NORMAL = 2     # 一般ユーザー
    LOW = 3        # バッチ処理
    BACKGROUND = 4 # 非リアルタイム分析

# Celery Task with Priority
from celery import Celery

app = Celery('llm_gateway', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def call_llm(self, request_id, model, messages, priority):
    try:
        response = litellm.completion(
            model=model,
            messages=messages,
            timeout=30,
        )
        store_response(request_id, response)
        return response
    except Exception as exc:
        # Exponential backoff
        retry_delay = 60 * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=retry_delay)

5.4 Exponential Backoffによるリトライ

import asyncio
import random

async def call_llm_with_retry(
    model: str,
    messages: list,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
):
    for attempt in range(max_retries + 1):
        try:
            response = await litellm.acompletion(
                model=model,
                messages=messages,
                timeout=30,
            )
            return response
        except Exception as e:
            if attempt == max_retries:
                raise

            # Exponential backoff + jitter
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay * 0.1)
            actual_delay = delay + jitter

            print(f"Attempt {attempt + 1} failed: {e}")
            print(f"Retrying in {actual_delay:.1f}s...")
            await asyncio.sleep(actual_delay)

5.5 実践実装: FastAPI + Celery + Redis

# app/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from celery.result import AsyncResult
import uuid

app = FastAPI()

class LLMRequest(BaseModel):
    model: str = "gpt-4o"
    messages: list
    priority: int = 2

class LLMResponse(BaseModel):
    request_id: str
    status: str
    result: dict = None

@app.post("/api/v1/completions")
async def create_completion(req: LLMRequest):
    request_id = str(uuid.uuid4())

    # 優先度に基づいてCeleryキューに送信
    task = call_llm.apply_async(
        args=[request_id, req.model, req.messages, req.priority],
        queue=f"llm_priority_{req.priority}",
        priority=req.priority,
    )

    return {
        "request_id": request_id,
        "task_id": task.id,
        "status": "queued",
    }

@app.get("/api/v1/completions/{task_id}")
async def get_completion(task_id: str):
    result = AsyncResult(task_id)

    if result.ready():
        return {
            "status": "completed",
            "result": result.get(),
        }
    elif result.failed():
        return {
            "status": "failed",
            "error": str(result.result),
        }
    else:
        return {
            "status": "processing",
        }
# celery_config.py
from celery import Celery

app = Celery('llm_gateway')

app.conf.update(
    broker_url='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/1',
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    timezone='UTC',
    task_routes={
        'tasks.call_llm': {
            'queue': 'llm_default',
        },
    },
    # 優先度キュー設定
    task_queue_max_priority=10,
    task_default_priority=5,
    # Worker並行度制限(Rate Limit対応)
    worker_concurrency=4,
    # タスク時間制限
    task_soft_time_limit=120,
    task_time_limit=180,
)
# docker-compose.yml
version: '3.8'
services:
  api:
    build: .
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000
    ports:
      - '8000:8000'
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379

  worker-high:
    build: .
    command: celery -A tasks worker -Q llm_priority_0,llm_priority_1 -c 4
    depends_on:
      - redis

  worker-normal:
    build: .
    command: celery -A tasks worker -Q llm_priority_2 -c 8
    depends_on:
      - redis

  worker-low:
    build: .
    command: celery -A tasks worker -Q llm_priority_3,llm_priority_4 -c 2
    depends_on:
      - redis

  redis:
    image: redis:7-alpine
    ports:
      - '6379:6379'

  flower:
    build: .
    command: celery -A tasks flower --port=5555
    ports:
      - '5555:5555'
    depends_on:
      - redis

6. Kafka vs RabbitMQ比較

6.1 総合比較表

項目Apache KafkaRabbitMQ
メッセージモデルPullベース、Log appendPushベース、Queue
スループット数百万msg/秒数万msg/秒
レイテンシ~5ms(p99)サブミリ秒可能
メッセージ保持設定期間保持(デフォルト7日)消費後削除
順序保証パーティション内保証キュー内保証
ルーティングTopic/PartitionベースExchange/Bindingベース(柔軟)
プロトコル独自プロトコルAMQP, MQTT, STOMP
スケーリングパーティション追加で水平拡張クラスタリング + Sharding
メタデータKRaft(内蔵)Erlang分散システム
ストリーム処理Kafka Streams内蔵なし(外部ツール必要)
運用複雑度中〜高
学習コスト
メッセージサイズデフォルト1MB(設定可能)制限なし(実質数MB)

6.2 パフォーマンスベンチマーク(参考値)

テスト環境: 3 broker/node, 3 replica, メッセージサイズ1KB

Apache Kafka:
  - 単一パーティション: ~50,000 msg/秒
  - 12パーティション:   ~800,000 msg/秒
  - 60パーティション:   ~2,000,000+ msg/秒
  - レイテンシp99: 5ms

RabbitMQ(Quorum Queue):
  - 単一キュー:     ~20,000 msg/秒
  - 10キュー:       ~100,000 msg/秒
  - レイテンシp99: 1ms

6.3 選択ガイド

Kafkaを選択すべきケース:

  • 大量イベントストリーミング(ログ、メトリクス、クリックストリーム)
  • イベントソーシングパターンの実装
  • リアルタイムデータパイプライン(ETL/ELT)
  • 複数のコンシューマーが同じメッセージを独立して消費
  • メッセージリプレイが必要な場合
  • Kafka Streams/ksqlDBでのストリーム処理が必要な場合

RabbitMQを選択すべきケース:

  • 複雑なルーティングパターンが必要な場合(Topic, Headers Exchange)
  • タスクキュー(Task Queue)パターン
  • 低レイテンシが重要な場合
  • 多様なプロトコルサポートが必要な場合(AMQP, MQTT, STOMP)
  • メッセージ単位のTTL、Priority等の細密な制御が必要な場合
  • 既存AMQPエコシステムとの統合

LLM Gatewayでは?

小規模/中規模(1日10万リクエスト以下):
  RabbitMQ + Celeryの組み合わせ推奨
  - 設定簡単、Priority Queueサポート優秀
  - Dead Letter Queueで失敗処理容易

大規模(1日100万リクエスト以上):
  Kafkaベースの実装推奨
  - 高スループット、メッセージ保持で再処理可能
  - Consumer Groupで柔軟なスケーリング
  - コスト/使用量分析のためのイベントログ活用

7. 本番モニタリング設定

7.1 Kafkaモニタリング

# Prometheus JMX Exporter設定(prometheus-jmx-config.yaml)
rules:
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec>'
    name: kafka_server_messages_in_total
    type: COUNTER
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec>'
    name: kafka_server_bytes_in_total
    type: COUNTER
  - pattern: 'kafka.consumer<type=consumer-fetch-manager-metrics, client-id=(.+)><>records-lag-max'
    name: kafka_consumer_lag_max
    type: GAUGE

主要モニタリング指標:

  • Consumer Lag: コンシューマーがプロデューサーにどれだけ追いついているか
  • ISR Shrink/Expand: レプリケーション状態の変化
  • Under-Replicated Partitions: レプリカ不足パーティション
  • Request Latency: リクエスト処理時間

7.2 RabbitMQモニタリング

主要指標:
  - Queue Depth: キューに溜まったメッセージ数
  - Consumer Utilization: コンシューマー稼働率
  - Publish/Deliver Rate: 秒あたりの発行/配信メッセージ数
  - Unacked Messages: ACK待ちメッセージ数
  - Memory/Disk Usage: リソース使用量

RabbitMQ Management UI: http://localhost:15672
Prometheus Plugin: rabbitmq_prometheus(内蔵)

8. まとめ

メッセージキューベースの非同期アーキテクチャは、現代の分散システムの核心コンポーネントです。

主要ポイント:

  1. Kafka: 大量イベントストリーミング、ログベースストレージ、Consumer Groupによる柔軟な拡張
  2. RabbitMQ: 柔軟なルーティング、低レイテンシ、従来のメッセージキューパターンに最適
  3. LLM Gateway: 非同期処理でRate Limit管理、コスト制御、障害分離を達成
  4. 選択基準: スループット/レイテンシ/ルーティング複雑度/メッセージ保持の必要性で判断

両技術ともに成熟したエコシステムを持ち、プロジェクトの要件に合わせて選択すればよいでしょう。 大規模イベントストリーミングにはKafka、作業分配と柔軟なルーティングにはRabbitMQが依然として最善の選択です。


参考資料