
[Architecture] Building Async Systems with Message Queues: Kafka vs RabbitMQ

Overview

Communication between services in modern distributed systems keeps growing more complex, and synchronous HTTP calls alone cannot deliver high throughput, fault isolation, and flexible scaling at the same time. In this post, we cover the core concepts of message-queue-based asynchronous architecture, provide an in-depth comparison of Apache Kafka and RabbitMQ, and explore practical async processing patterns for LLM Gateways.


1. Synchronous vs Asynchronous Architecture

1.1 Limitations of Synchronous Communication

In synchronous communication, the client sends a request and blocks until a response arrives.

Client --> Service A --> Service B --> Service C
          (blocking)    (blocking)    (processing)

Problems:

  • Tight Coupling: If one service goes down, the entire chain fails
  • Latency Accumulation: Response times from each service add up
  • Scaling Constraints: The slowest service determines overall throughput
  • Resource Waste: Threads and connections are held during response waiting

1.2 Benefits of Asynchronous Communication

Introducing a message queue decouples producers from consumers.

Producer --> [Message Queue] --> Consumer
  (fire)    (buffer/persist)    (process at own pace)

Benefits:

  • Loose Coupling: Producers and consumers operate independently
  • Load Leveling: The queue acts as a buffer during traffic spikes
  • Fault Isolation: Consumer failures do not affect producers
  • Independent Scaling: Consumers can be scaled out independently
  • Peak Shaving: Smooths out sudden load bursts

2. Core Message Queue Concepts

2.1 Basic Components

Component      Role
-------------  ----------------------------------------------------------------
Producer       Creates and sends messages to queues/topics
Consumer       Receives and processes messages from queues/topics
Broker         Intermediate server that receives, stores, and delivers messages
Topic / Queue  Logical channel where messages are stored
Partition      Physical subdivision of a topic for parallel processing

2.2 Message Delivery Guarantees

At-Most-Once:   Producer --> Broker (no retry)
                Messages may be lost, but no duplicates

At-Least-Once:  Producer --> Broker --> ACK --> Retry on failure
                No message loss, but duplicates possible

Exactly-Once:   Producer --> Broker (idempotent + transaction)
                Each message delivered exactly once
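At-least-once is the common practical choice, which means duplicates must be handled downstream. A minimal sketch of an idempotent consumer — in production the processed-ID set would live in Redis or a database rather than in memory:

```python
# A broker retrying after a lost ACK can deliver the same message twice;
# an idempotent consumer tracks processed IDs so duplicates are harmless.
processed_ids = set()
results = []

def handle(message_id: str, payload: str) -> bool:
    """Process a message once; silently skip redelivered duplicates."""
    if message_id in processed_ids:
        return False  # duplicate delivery, skipped
    results.append(payload)          # the actual side effect
    processed_ids.add(message_id)
    return True

handle("msg-1", "charge $50")
handle("msg-1", "charge $50")  # broker retry after a lost ACK: ignored
handle("msg-2", "ship order")
```

This turns at-least-once transport into effectively-once processing without broker-side transactions.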

2.3 Message Delivery Models

Point-to-Point (Queue):

  • Each message is delivered to exactly one consumer
  • Suitable for work distribution

Publish-Subscribe (Topic):

  • Each message is delivered to all subscribers
  • Suitable for event broadcasting

3. Apache Kafka Deep Dive

3.1 What is Kafka

Apache Kafka is a distributed event streaming platform originally developed at LinkedIn. It is characterized by high throughput, durability, and horizontal scalability, and is widely used for real-time data pipelines and streaming applications.

3.2 Architecture Overview

                    +-------------------+
                    |   Kafka Cluster   |
                    |                   |
  Producers ------->|  Broker 1         |-------> Consumers
                    |  Broker 2         |         (Consumer Group A)
                    |  Broker 3         |-------> Consumers
                    |                   |         (Consumer Group B)
                    +-------------------+
                           |
                    +------+------+
                    |  KRaft      |
                    |  Controller |
                    +-------------+

Key Components:

  • Broker: Server node that receives, stores, and delivers messages
  • KRaft Controller: Built-in metadata management layer replacing ZooKeeper since Kafka 4.0
  • Topic: Logical category for messages
  • Partition: Physical subdivision of a topic; each partition is an immutable ordered log
  • Replication: Per-partition replicas maintained in a Leader-Follower structure
  • ISR (In-Sync Replicas): Set of replicas synchronized with the Leader

3.3 KRaft Mode (ZooKeeper Removed)

Starting with Kafka 4.0, KRaft is the only supported mode; ZooKeeper support has been removed entirely.

Legacy Architecture:
  Kafka Brokers <---> ZooKeeper Ensemble (separate cluster)

KRaft Architecture:
  Kafka Brokers (some nodes also serve as Controllers)
  - Internal Raft consensus protocol for metadata management
  - No separate ZooKeeper cluster required

KRaft Benefits:

  • Reduced operational complexity (no ZooKeeper cluster management)
  • Improved partition scaling limits (millions of partitions supported)
  • Faster metadata propagation
  • Shorter controller failover recovery time

3.4 Producer Deep Dive

Partitioner

Determines which partition a message is sent to.

from confluent_kafka import Producer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'my-producer',
    'acks': 'all',
    'enable.idempotence': True,
    'max.in.flight.requests.per.connection': 5,
    'retries': 2147483647,
    'linger.ms': 5,
    'batch.size': 16384,
    'compression.type': 'lz4',
}

producer = Producer(conf)

def delivery_callback(err, msg):
    if err:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

# Key-based partitioning - same key goes to same partition
producer.produce(
    topic='orders',
    key='user-123',
    value='{"order_id": "ord-456", "amount": 5000}',
    callback=delivery_callback
)

producer.flush()
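Under the hood, key-based partitioning is a stable hash of the key modulo the partition count. A simplified sketch — the real client uses murmur2, so CRC32 stands in here purely to show the principle:

```python
import zlib

def choose_partition(key: str, num_partitions: int) -> int:
    # hash(key) % num_partitions; murmur2 in the real Kafka client
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# Identical keys always land on the same partition, which is what
# gives Kafka its per-key ordering guarantee.
same = choose_partition("user-123", 3) == choose_partition("user-123", 3)
```

Note that changing the partition count changes the mapping, which is why expanding partitions breaks key-to-partition affinity for existing keys.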

acks Configuration

acks Value  Behavior                           Durability  Performance
----------  ---------------------------------  ----------  -----------
0           Does not wait for broker response  Low         Highest
1           Only Leader confirms write         Medium      High
all (-1)    All ISR replicas confirm write     Highest     Low

Batching and Compression

Producer Internal Flow:
  Record --> Accumulator --> [Batch] --> Compressor --> Network Send

  linger.ms: Batch send wait time (default 0ms)
  batch.size: Maximum batch size (default 16KB)
  compression.type: none / gzip / snappy / lz4 / zstd

3.5 Consumer Deep Dive

Consumer Group

Topic: orders (3 partitions)

Consumer Group A:
  Consumer 1 <-- Partition 0
  Consumer 2 <-- Partition 1
  Consumer 3 <-- Partition 2

Consumer Group B:
  Consumer 4 <-- Partition 0, 1, 2

Each Consumer Group consumes messages independently. Ordering is guaranteed within a partition, and each partition is assigned to only one consumer within a group.
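The assignment rule in the diagram can be sketched as a simplified range-style assignor (the real assignors also handle stickiness, racks, and regex subscriptions):

```python
# Split partitions as evenly as possible across a group's consumers;
# the first consumers take one extra when the counts don't divide evenly.
def assign_partitions(partitions: list, consumers: list) -> dict:
    per, extra = divmod(len(partitions), len(consumers))
    assignment, start = {}, 0
    for i, consumer in enumerate(consumers):
        count = per + (1 if i < extra else 0)
        assignment[consumer] = partitions[start:start + count]
        start += count
    return assignment

# Matches the diagram: a lone consumer in Group B takes all three partitions
assign_partitions([0, 1, 2], ["consumer-4"])
```

Because a partition is never shared within a group, adding consumers beyond the partition count leaves the extras idle.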

Offset Management

from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processor',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # Manual commit recommended
    'max.poll.interval.ms': 300000,
    'session.timeout.ms': 45000,
}

consumer = Consumer(conf)
consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(f"Error: {msg.error()}")
                break

        # Process message
        process_order(msg.value().decode('utf-8'))

        # Manual commit - after processing completes
        consumer.commit(asynchronous=False)
finally:
    consumer.close()

Rebalancing

Partition reassignment occurs when consumer group membership changes.

Rebalancing Triggers:
  1. New consumer joins
  2. Existing consumer leaves (crash or graceful shutdown)
  3. Topic partition count changes
  4. Subscription topic changes

Rebalancing Strategies:
  - Eager (Stop-the-World): Release all partitions, then reassign
  - Cooperative (Incremental): Only reassign changed partitions
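The difference between the two strategies can be sketched by computing which partitions each consumer actually has to give up; the names and assignment shapes below are illustrative:

```python
# Eager rebalancing revokes every partition and reassigns from scratch;
# cooperative rebalancing revokes only partitions that actually move.
def cooperative_revocations(old: dict, new: dict) -> dict:
    """Partitions each consumer must release under incremental rebalancing."""
    return {
        consumer: sorted(set(old.get(consumer, [])) - set(new.get(consumer, [])))
        for consumer in old
    }

old = {"c1": [0, 1, 2]}              # c1 owned everything
new = {"c1": [0, 1], "c2": [2]}      # c2 joins and takes partition 2
# eager: c1 stops consuming [0, 1, 2]; cooperative: c1 releases only [2]
revoked = cooperative_revocations(old, new)
```

Under eager rebalancing, all three partitions pause during reassignment; cooperatively, partitions 0 and 1 keep flowing.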

3.6 Exactly-Once Semantics

# Idempotent Producer + Transactional Messaging
conf = {
    'bootstrap.servers': 'localhost:9092',
    'enable.idempotence': True,
    'transactional.id': 'my-transaction-id',
}

producer = Producer(conf)
producer.init_transactions()

try:
    producer.begin_transaction()

    producer.produce('topic-a', key='k1', value='v1')
    producer.produce('topic-b', key='k2', value='v2')

    # Include consumer offsets in the transaction
    # ('consumer' is a confluent_kafka Consumer created elsewhere in this process)
    producer.send_offsets_to_transaction(
        consumer.position(consumer.assignment()),
        consumer.consumer_group_metadata()
    )

    producer.commit_transaction()
except Exception as e:
    producer.abort_transaction()
    raise e

How Exactly-Once Works:

  1. Idempotent Producer: Producer ID + Sequence Number prevents duplicate sends
  2. Transactional Messaging: Atomic writes across multiple topics/partitions
  3. read_committed isolation: Only consume messages from committed transactions
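Step 1 can be sketched as a broker-side duplicate check — a toy model of the idea, not the actual broker code:

```python
# Each (producer_id, partition) pair carries a monotonically increasing
# sequence number; the broker drops any batch whose sequence it has
# already accepted, making producer retries safe.
class PartitionLog:
    def __init__(self):
        self.records = []
        self.last_seq = {}  # producer_id -> last accepted sequence number

    def append(self, producer_id: int, seq: int, value: str) -> bool:
        if self.last_seq.get(producer_id, -1) >= seq:
            return False  # duplicate retry, dropped by the broker
        self.records.append(value)
        self.last_seq[producer_id] = seq
        return True

log = PartitionLog()
log.append(producer_id=1, seq=0, value="v1")
log.append(producer_id=1, seq=0, value="v1")  # network retry: dropped
log.append(producer_id=1, seq=1, value="v2")
```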

3.7 Kafka Connect and Kafka Streams

Kafka Connect:
  Source Connector: DB/File/API --> Kafka Topic
  Sink Connector:   Kafka Topic --> DB/File/API

  Example: Debezium (MySQL CDC) --> Kafka --> Elasticsearch Sink

Kafka Streams:
  Kafka Topic --> Stream Processing --> Kafka Topic

  Features: No separate cluster needed, library-based
  Capabilities: filter, map, groupBy, windowed aggregation, join

4. RabbitMQ Deep Dive

4.1 What is RabbitMQ

RabbitMQ is an open-source message broker based on the AMQP (Advanced Message Queuing Protocol). It is characterized by flexible routing, support for multiple protocols, and ease of management.

4.2 Architecture Overview

Producer --> Exchange --> Binding --> Queue --> Consumer

  Exchange: Message routing hub
  Binding:  Routing rules between Exchange and Queue
  Queue:    Buffer where messages are stored
  Virtual Host: Logical isolation unit (multi-tenancy)

4.3 Exchange Types

Direct Exchange

Producer --> [Direct Exchange]
              |
              +--(routing_key="order.created")--> [Order Queue]
              |
              +--(routing_key="payment.processed")--> [Payment Queue]

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# Declare Direct Exchange
channel.exchange_declare(
    exchange='orders',
    exchange_type='direct',
    durable=True
)

# Declare Queue and Binding
channel.queue_declare(queue='order_created', durable=True)
channel.queue_bind(
    exchange='orders',
    queue='order_created',
    routing_key='order.created'
)

# Publish message
channel.basic_publish(
    exchange='orders',
    routing_key='order.created',
    body='{"order_id": "123", "user_id": "456"}',
    properties=pika.BasicProperties(
        delivery_mode=2,  # persistent
        content_type='application/json',
    )
)

Topic Exchange

Producer --> [Topic Exchange]
              |
              +--(routing_key="order.*.kr")--> [Korea Order Queue]
              |
              +--(routing_key="order.premium.#")--> [Premium Queue]
              |
              +--(routing_key="#")--> [Audit Queue] (all messages)

  * : Matches exactly one word
  # : Matches zero or more words
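The wildcard rules can be sketched as a small recursive matcher (a simplification of what the broker does internally):

```python
# AMQP topic matching: words are dot-separated; '*' matches exactly one
# word, '#' matches zero or more words.
def topic_matches(pattern: str, routing_key: str) -> bool:
    return _match(pattern.split('.'), routing_key.split('.'))

def _match(pat: list, words: list) -> bool:
    if not pat:
        return not words
    head, rest = pat[0], pat[1:]
    if head == '#':
        # try consuming zero, one, two, ... words
        return any(_match(rest, words[i:]) for i in range(len(words) + 1))
    if not words:
        return False
    return head in ('*', words[0]) and _match(rest, words[1:])
```

For example, `order.*.kr` matches `order.premium.kr` but not `order.kr`, while `order.premium.#` matches `order.premium` itself as well as any longer key under it.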

Fanout Exchange

Producer --> [Fanout Exchange]
              |
              +--> [Queue A] (all messages)
              |
              +--> [Queue B] (all messages)
              |
              +--> [Queue C] (all messages)

Headers Exchange

Routes based on message header key-value pairs instead of routing keys.

4.4 Message Acknowledgement and Prefetch

def callback(ch, method, properties, body):
    try:
        process_message(body)
        # ACK on successful processing
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        # NACK on failure (requeue)
        ch.basic_nack(
            delivery_tag=method.delivery_tag,
            requeue=True
        )

# Prefetch setting - limit concurrent messages per consumer
channel.basic_qos(prefetch_count=10)

channel.basic_consume(
    queue='order_created',
    on_message_callback=callback,
    auto_ack=False  # Manual ACK
)

channel.start_consuming()

Prefetch Strategies:

  • prefetch_count=1: Process one at a time, fairest distribution
  • prefetch_count=10-50: Balance between throughput and fairness
  • prefetch_count=0: Unlimited (not recommended)

4.5 Dead Letter Queue (DLQ)

# DLQ configuration
channel.queue_declare(
    queue='order_processing',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'order.failed',
        'x-message-ttl': 30000,       # 30 second TTL
        'x-max-length': 10000,         # Max queue length
    }
)

# DLX (Dead Letter Exchange) configuration
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='order_failed', durable=True)
channel.queue_bind(
    exchange='dlx',
    queue='order_failed',
    routing_key='order.failed'
)

When messages go to Dead Letter:

  1. Consumer rejects/nacks a message (requeue=False)
  2. Message TTL expires
  3. Queue maximum length exceeded
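A common consumer-side pattern is to cap redeliveries by inspecting the `x-death` header that RabbitMQ attaches each time a message is dead-lettered. The header shape below is simplified (the real value is a list of per-queue tables with more fields):

```python
MAX_RETRIES = 3

def should_retry(headers: dict) -> bool:
    """Retry via DLX/TTL cycling until the dead-letter count hits the cap."""
    deaths = headers.get('x-death', [])
    count = deaths[0].get('count', 0) if deaths else 0
    return count < MAX_RETRIES

# First delivery: no x-death header yet, so the message is retried
first_attempt = should_retry({})
# After three dead-letter cycles: give up and route to a parking queue
exhausted = should_retry({'x-death': [{'count': 3}]})
```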

4.6 Publisher Confirms

channel.confirm_delivery()

try:
    channel.basic_publish(
        exchange='orders',
        routing_key='order.created',
        body=message_body,
        properties=pika.BasicProperties(delivery_mode=2),
        mandatory=True  # Return if routing fails
    )
    print("Message confirmed by broker")
except pika.exceptions.UnroutableError:
    print("Message could not be routed")
except pika.exceptions.NackError:
    print("Message was nacked by broker")

4.7 Clustering and High Availability

Classic Mirrored Queue (Legacy):
  Node 1 (Leader)  <-mirror-> Node 2 (Mirror) <-mirror-> Node 3 (Mirror)
  Issues: split-brain risk, synchronization overhead

Quorum Queue (Recommended):
  Node 1 (Leader)  <-Raft-> Node 2 (Follower) <-Raft-> Node 3 (Follower)
  Benefits: Raft consensus protocol, no split-brain, guaranteed data safety

Quorum Queue Declaration:

channel.queue_declare(
    queue='important_orders',
    durable=True,
    arguments={
        'x-queue-type': 'quorum',
        'x-quorum-initial-group-size': 3,
    }
)
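The replica sizing follows simple majority arithmetic, sketched below:

```python
# A quorum queue with N replicas needs a majority (N // 2 + 1) alive to
# accept writes, so it tolerates floor((N - 1) / 2) node failures.
def quorum_size(replicas: int) -> int:
    return replicas // 2 + 1

def fault_tolerance(replicas: int) -> int:
    return (replicas - 1) // 2

# The 3-replica declaration above survives 1 node failure;
# 5 replicas would survive 2, at the cost of more replication traffic.
```

This is also why even replica counts buy nothing: 4 replicas tolerate the same single failure as 3.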

4.8 Streams (Kafka-style Feature)

Streams, introduced in RabbitMQ 3.9, bring Kafka-style log semantics: messages are appended to a replayable log instead of being deleted on consumption.

channel.queue_declare(
    queue='order_stream',
    durable=True,
    arguments={
        'x-queue-type': 'stream',
        'x-max-length-bytes': 1073741824,  # 1GB
        'x-stream-max-segment-size-bytes': 52428800,  # 50MB
    }
)

5. LLM Gateway Async Processing Patterns

5.1 Why LLM Calls Need Queues

LLM API calls have different characteristics than typical REST APIs.

Characteristic  Typical API  LLM API
--------------  -----------  -----------------------------------
Response Time   50-200ms     2-30 seconds
Cost            Nearly free  Per-token pricing
Rate Limit      High         Low (RPM/TPM limits)
Error Rate      Low          Relatively high (429, 500, timeout)
Response Size   Consistent   Variable (depends on token count)

Problems Queues Solve:

  1. Rate Limit Management: Control API call speed from the queue
  2. Cost Control: Queue requests and process by priority
  3. Failure Handling: Store in queue during API outages, retry later
  4. Load Distribution: Worker pool processes user requests sequentially
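Rate limit management (point 1) is often implemented as a token bucket in front of the provider call. A minimal sketch — the rate values are illustrative, and real worker pools would need a shared store (e.g. Redis) rather than in-process state:

```python
import time

class TokenBucket:
    """Refill tokens continuously; each provider call consumes one."""

    def __init__(self, rate_per_sec: float, capacity: float):
        self.rate = rate_per_sec
        self.capacity = capacity
        self.tokens = capacity
        self.last = time.monotonic()

    def try_acquire(self) -> bool:
        now = time.monotonic()
        # top up tokens earned since the last check, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False  # caller should requeue or delay the request

limiter = TokenBucket(rate_per_sec=8.0, capacity=8)  # ~480 requests/minute
```

A worker that fails `try_acquire()` leaves the message in the queue instead of burning a 429 against the provider.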

5.2 Architecture Diagram

+----------+     +-------------+     +----------+     +--------------+
|          |     |             |     |          |     |              |
|  Client  +---->+ FastAPI     +---->+ Request  +---->+ Worker Pool  |
|  (Web)   |     | Gateway     |     | Queue    |     | (Celery)     |
|          |     |             |     | (Redis)  |     |              |
+----------+     +------+------+     +----------+     +------+-------+
                        |                                     |
                        |            +----------+             |
                        |            |          |             |
                        +<-----------+ Response +<------------+
                        |            | Store    |
                        |            | (Redis)  |
                        |            +----------+
                        v
                  +----------+
                  |  Status  |
                  |  SSE/WS  |
                  +----------+

5.3 Priority Queue Pattern

from enum import IntEnum

class Priority(IntEnum):
    CRITICAL = 0   # Real-time chatbot responses
    HIGH = 1       # Premium users
    NORMAL = 2     # Standard users
    LOW = 3        # Batch processing
    BACKGROUND = 4 # Non-real-time analytics

# Celery Task with Priority
from celery import Celery
import litellm

app = Celery('llm_gateway', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def call_llm(self, request_id, model, messages, priority):
    try:
        response = litellm.completion(
            model=model,
            messages=messages,
            timeout=30,
        )
        store_response(request_id, response)
        return response
    except Exception as exc:
        # Exponential backoff
        retry_delay = 60 * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=retry_delay)

5.4 Retry with Exponential Backoff

import asyncio
import random

import litellm

async def call_llm_with_retry(
    model: str,
    messages: list,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
):
    for attempt in range(max_retries + 1):
        try:
            response = await litellm.acompletion(
                model=model,
                messages=messages,
                timeout=30,
            )
            return response
        except Exception as e:
            if attempt == max_retries:
                raise

            # Exponential backoff + jitter
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay * 0.1)
            actual_delay = delay + jitter

            print(f"Attempt {attempt + 1} failed: {e}")
            print(f"Retrying in {actual_delay:.1f}s...")
            await asyncio.sleep(actual_delay)

5.5 Production Implementation: FastAPI + Celery + Redis

# app/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from celery.result import AsyncResult
import uuid

from tasks import call_llm  # the Celery task defined in the tasks module

app = FastAPI()

class LLMRequest(BaseModel):
    model: str = "gpt-4o"
    messages: list
    priority: int = 2

class LLMResponse(BaseModel):
    request_id: str
    status: str
    result: dict | None = None

@app.post("/api/v1/completions")
async def create_completion(req: LLMRequest):
    request_id = str(uuid.uuid4())

    # Send to Celery queue based on priority
    task = call_llm.apply_async(
        args=[request_id, req.model, req.messages, req.priority],
        queue=f"llm_priority_{req.priority}",
        priority=req.priority,
    )

    return {
        "request_id": request_id,
        "task_id": task.id,
        "status": "queued",
    }

@app.get("/api/v1/completions/{task_id}")
async def get_completion(task_id: str):
    result = AsyncResult(task_id)

    # Check failed() first: ready() is True for both success and failure,
    # and get() re-raises the task's exception on a failed result
    if result.failed():
        return {
            "status": "failed",
            "error": str(result.result),
        }
    elif result.ready():
        return {
            "status": "completed",
            "result": result.get(),
        }
    else:
        return {
            "status": "processing",
        }

# celery_config.py
from celery import Celery

app = Celery('llm_gateway')

app.conf.update(
    broker_url='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/1',
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    timezone='UTC',
    task_routes={
        'tasks.call_llm': {
            'queue': 'llm_default',
        },
    },
    # Priority queue settings
    task_queue_max_priority=10,
    task_default_priority=5,
    # Worker concurrency limit (Rate Limit handling)
    worker_concurrency=4,
    # Task time limits
    task_soft_time_limit=120,
    task_time_limit=180,
)

# docker-compose.yml
version: '3.8'
services:
  api:
    build: .
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000
    ports:
      - '8000:8000'
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379

  worker-high:
    build: .
    command: celery -A tasks worker -Q llm_priority_0,llm_priority_1 -c 4
    depends_on:
      - redis

  worker-normal:
    build: .
    command: celery -A tasks worker -Q llm_priority_2 -c 8
    depends_on:
      - redis

  worker-low:
    build: .
    command: celery -A tasks worker -Q llm_priority_3,llm_priority_4 -c 2
    depends_on:
      - redis

  redis:
    image: redis:7-alpine
    ports:
      - '6379:6379'

  flower:
    build: .
    command: celery -A tasks flower --port=5555
    ports:
      - '5555:5555'
    depends_on:
      - redis

6. Kafka vs RabbitMQ Comparison

6.1 Comprehensive Comparison Table

Aspect                  Apache Kafka                             RabbitMQ
----------------------  ---------------------------------------  ----------------------------------
Message Model           Pull-based, log append                   Push-based, queue
Throughput              Millions msg/sec                         Tens of thousands msg/sec
Latency                 ~5ms (p99)                               Sub-millisecond possible
Message Retention       Configurable retention (default 7 days)  Deleted after consumption
Ordering                Guaranteed within partition              Guaranteed within queue
Routing                 Topic/Partition based                    Exchange/Binding based (flexible)
Protocol                Custom protocol                          AMQP, MQTT, STOMP
Scaling                 Horizontal via partition addition        Clustering + sharding
Metadata                KRaft (built-in)                         Erlang distributed system
Stream Processing       Kafka Streams built-in                   None (external tools needed)
Operational Complexity  Medium-High                              Medium
Learning Curve          High                                     Medium
Message Size            Default 1MB (configurable)               No hard limit (practically several MB)

6.2 Performance Benchmarks (Reference)

Test Environment: 3 brokers/nodes, 3 replicas, 1KB message size

Apache Kafka:
  - Single partition: ~50,000 msg/sec
  - 12 partitions:    ~800,000 msg/sec
  - 60 partitions:    ~2,000,000+ msg/sec
  - Latency p99: 5ms

RabbitMQ (Quorum Queue):
  - Single queue:     ~20,000 msg/sec
  - 10 queues:        ~100,000 msg/sec
  - Latency p99: 1ms

6.3 Selection Guide

Choose Kafka when:

  • High-volume event streaming (logs, metrics, clickstreams)
  • Implementing event sourcing patterns
  • Real-time data pipelines (ETL/ELT)
  • Multiple consumers independently consuming the same messages
  • Message replay is needed
  • Stream processing with Kafka Streams/ksqlDB is required

Choose RabbitMQ when:

  • Complex routing patterns are needed (Topic, Headers Exchange)
  • Task queue patterns
  • Low latency is critical
  • Multiple protocol support is needed (AMQP, MQTT, STOMP)
  • Fine-grained control like per-message TTL, priority is needed
  • Integration with existing AMQP ecosystem

For LLM Gateways?

Small/Medium Scale (under 100K requests/day):
  RabbitMQ + Celery recommended
  - Easy setup, excellent Priority Queue support
  - Dead Letter Queue for failure handling

Large Scale (over 1M requests/day):
  Kafka-based implementation recommended
  - High throughput, message retention for reprocessing
  - Flexible scaling with Consumer Groups
  - Event log for cost/usage analytics

7. Production Monitoring Setup

7.1 Kafka Monitoring

# Prometheus JMX Exporter config (prometheus-jmx-config.yaml)
rules:
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec>'
    name: kafka_server_messages_in_total
    type: COUNTER
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec>'
    name: kafka_server_bytes_in_total
    type: COUNTER
  - pattern: 'kafka.consumer<type=consumer-fetch-manager-metrics, client-id=(.+)><>records-lag-max'
    name: kafka_consumer_lag_max
    type: GAUGE

Key Monitoring Metrics:

  • Consumer Lag: How far behind consumers are from producers
  • ISR Shrink/Expand: Replication state changes
  • Under-Replicated Partitions: Partitions with insufficient replicas
  • Request Latency: Request processing time
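Consumer lag is simple arithmetic per partition — the group's committed offset subtracted from the broker's log-end offset:

```python
# Lag per partition = log-end offset - committed offset; summed across
# partitions it shows how far the consumer group trails its producers.
def consumer_lag(log_end_offsets: dict, committed_offsets: dict) -> dict:
    return {
        partition: log_end_offsets[partition] - committed_offsets.get(partition, 0)
        for partition in log_end_offsets
    }

lag = consumer_lag({0: 1500, 1: 900}, {0: 1400, 1: 900})
# partition 0 trails by 100 messages, partition 1 is fully caught up
total_lag = sum(lag.values())
```

A steadily growing total is the usual signal to scale out consumers (or to check for a stuck partition).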

7.2 RabbitMQ Monitoring

Key Metrics:
  - Queue Depth: Number of messages in queue
  - Consumer Utilization: Consumer utilization rate
  - Publish/Deliver Rate: Messages per second published/delivered
  - Unacked Messages: Messages awaiting acknowledgement
  - Memory/Disk Usage: Resource consumption

RabbitMQ Management UI: http://localhost:15672
Prometheus Plugin: rabbitmq_prometheus (built-in)

8. Conclusion

Message queue-based asynchronous architecture is a core component of modern distributed systems.

Key Takeaways:

  1. Kafka: High-volume event streaming, log-based storage, flexible scaling with Consumer Groups
  2. RabbitMQ: Flexible routing, low latency, optimal for traditional message queue patterns
  3. LLM Gateway: Async processing achieves rate limit management, cost control, and fault isolation
  4. Selection Criteria: Decide based on throughput, latency, routing complexity, and message retention needs

Both technologies have mature ecosystems, and you should choose based on your project requirements. Kafka remains the best choice for large-scale event streaming, while RabbitMQ excels at work distribution and flexible routing.

