Authors

- Youngju Kim (@fjvbn20031)

Overview
Communication between services in modern distributed systems is becoming increasingly complex. Synchronous HTTP calls alone cannot achieve high throughput, fault isolation, and flexible scaling. In this post, we cover the core concepts of Message Queue-based asynchronous architecture, provide an in-depth comparison of Apache Kafka and RabbitMQ, and explore practical async processing patterns for LLM Gateways.
1. Synchronous vs Asynchronous Architecture
1.1 Limitations of Synchronous Communication
In synchronous communication, the client sends a request and blocks until a response arrives.
```
Client --> Service A --> Service B --> Service C
          (blocking)    (blocking)    (processing)
```
Problems:
- Tight Coupling: If one service goes down, the entire chain fails
- Latency Accumulation: Response times from each service add up
- Scaling Constraints: The slowest service determines overall throughput
- Resource Waste: Threads and connections are held during response waiting
1.2 Benefits of Asynchronous Communication
Introducing a message queue decouples producers from consumers.
```
Producer --> [Message Queue] --> Consumer
  (fire)     (buffer/persist)    (process at own pace)
```
Benefits:
- Loose Coupling: Producers and consumers operate independently
- Load Leveling: The queue acts as a buffer during traffic spikes
- Fault Isolation: Consumer failures do not affect producers
- Independent Scaling: Consumers can be scaled out independently
- Peak Shaving: Smooths out sudden load bursts
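The decoupling above can be sketched with nothing but the standard library: the producer returns as soon as a message is buffered, while the consumer drains at its own pace. This in-memory queue is a stand-in for a real broker such as Kafka or RabbitMQ, but the principle is the same.

```python
import queue
import threading

# Bounded in-memory queue standing in for a message broker;
# the bound gives natural backpressure during traffic spikes.
broker = queue.Queue(maxsize=100)

def producer(n):
    for i in range(n):
        broker.put({"event_id": i})   # returns once the message is buffered

def consumer(results):
    while True:
        msg = broker.get()
        if msg is None:               # sentinel: shut down
            break
        results.append(msg["event_id"])  # processed at the consumer's own pace
        broker.task_done()

results = []
t = threading.Thread(target=consumer, args=(results,))
t.start()
producer(10)      # the producer never waits on the consumer's processing
broker.put(None)  # signal shutdown
t.join()
print(results)    # [0, 1, ..., 9]
```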
2. Core Message Queue Concepts
2.1 Basic Components
| Component | Role |
|---|---|
| Producer | Creates and sends messages to queues/topics |
| Consumer | Receives and processes messages from queues/topics |
| Broker | Intermediate server that receives, stores, and delivers messages |
| Topic / Queue | Logical channel where messages are stored |
| Partition | Physical subdivision of a topic for parallel processing |
2.2 Message Delivery Guarantees
- At-Most-Once: Producer --> Broker (no retry). Messages may be lost, but never duplicated.
- At-Least-Once: Producer --> Broker --> ACK, retry on failure. No message loss, but duplicates are possible.
- Exactly-Once: Producer --> Broker (idempotent + transactional). Each message is delivered exactly once.
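Under at-least-once delivery, the consumer must tolerate redeliveries. A common remedy is an idempotent consumer that remembers processed message IDs — a minimal sketch (the `message_id` field and in-memory set are illustrative; production systems typically use a database unique constraint or Redis):

```python
processed_ids = set()  # in production: Redis or a DB unique constraint

def handle(message):
    """Process a message at most once, even if the broker redelivers it."""
    msg_id = message["message_id"]
    if msg_id in processed_ids:
        return "skipped"          # duplicate redelivery: ignore
    processed_ids.add(msg_id)
    # ... actual business logic would run here ...
    return "processed"

# A redelivery (same message_id) is detected and skipped:
first = handle({"message_id": "m-1", "payload": "hello"})
second = handle({"message_id": "m-1", "payload": "hello"})
print(first, second)  # processed skipped
```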
2.3 Message Delivery Models
Point-to-Point (Queue):
- Each message is delivered to exactly one consumer
- Suitable for work distribution
Publish-Subscribe (Topic):
- Each message is delivered to all subscribers
- Suitable for event broadcasting
3. Apache Kafka Deep Dive
3.1 What is Kafka
Apache Kafka is a distributed event streaming platform originally developed at LinkedIn. It is characterized by high throughput, durability, and horizontal scalability, and is widely used for real-time data pipelines and streaming applications.
3.2 Architecture Overview
```
                  +-------------------+
                  |   Kafka Cluster   |
                  |                   |
Producers ------->|     Broker 1      |-------> Consumers
                  |     Broker 2      |         (Consumer Group A)
                  |     Broker 3      |-------> Consumers
                  |                   |         (Consumer Group B)
                  +-------------------+
                           |
                    +------+------+
                    |    KRaft    |
                    |  Controller |
                    +-------------+
```
Key Components:
- Broker: Server node that receives, stores, and delivers messages
- KRaft Controller: Built-in metadata management layer replacing ZooKeeper since Kafka 4.0
- Topic: Logical category for messages
- Partition: Physical subdivision of a topic; each partition is an immutable ordered log
- Replication: Per-partition replicas maintained in a Leader-Follower structure
- ISR (In-Sync Replicas): Set of replicas synchronized with the Leader
3.3 KRaft Mode (ZooKeeper Removed)
KRaft is the default mode starting with Kafka 4.0.
Legacy Architecture:
Kafka Brokers <---> ZooKeeper Ensemble (separate cluster)
KRaft Architecture:
Kafka Brokers (some nodes also serve as Controllers)
- Internal Raft consensus protocol for metadata management
- No separate ZooKeeper cluster required
KRaft Benefits:
- Reduced operational complexity (no ZooKeeper cluster management)
- Improved partition scaling limits (millions of partitions supported)
- Faster metadata propagation
- Shorter controller failover recovery time
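For reference, a minimal combined-mode KRaft configuration looks roughly like this (a sketch only; node IDs, ports, and paths are illustrative, and quorum settings vary by Kafka version):

```properties
# One node acting as both broker and controller (combined mode)
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093
listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
controller.listener.names=CONTROLLER
log.dirs=/var/lib/kafka/data
```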
3.4 Producer Deep Dive
Partitioner
Determines which partition a message is sent to.
```python
from confluent_kafka import Producer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'my-producer',
    'acks': 'all',
    'enable.idempotence': True,
    'max.in.flight.requests.per.connection': 5,
    'retries': 2147483647,
    'linger.ms': 5,
    'batch.size': 16384,
    'compression.type': 'lz4',
}

producer = Producer(conf)

def delivery_callback(err, msg):
    if err:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

# Key-based partitioning - same key goes to same partition
producer.produce(
    topic='orders',
    key='user-123',
    value='{"order_id": "ord-456", "amount": 5000}',
    callback=delivery_callback
)
producer.flush()
```
acks Configuration
| acks Value | Behavior | Durability | Performance |
|---|---|---|---|
| 0 | Does not wait for broker response | Low | Highest |
| 1 | Only Leader confirms write | Medium | High |
| all (-1) | All ISR replicas confirm write | Highest | Low |
Batching and Compression
Producer Internal Flow:
Record --> Accumulator --> [Batch] --> Compressor --> Network Send
linger.ms: Batch send wait time (default 0ms)
batch.size: Maximum batch size (default 16KB)
compression.type: none / gzip / snappy / lz4 / zstd
3.5 Consumer Deep Dive
Consumer Group
```
Topic: orders (3 partitions)

Consumer Group A:
  Consumer 1 <-- Partition 0
  Consumer 2 <-- Partition 1
  Consumer 3 <-- Partition 2

Consumer Group B:
  Consumer 4 <-- Partition 0, 1, 2
```
Each Consumer Group consumes messages independently. Ordering is guaranteed within a partition, and each partition is assigned to only one consumer within a group.
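The one-partition-per-consumer invariant can be illustrated with a simple round-robin assignment (a deliberate simplification; Kafka's real assignors — range, sticky, cooperative-sticky — are more involved):

```python
def assign_partitions(partitions: int, consumers: list[str]) -> dict[str, list[int]]:
    """Distribute partitions across a consumer group, round-robin.

    Invariant (as in Kafka): within one group, each partition is
    owned by exactly one consumer.
    """
    assignment = {c: [] for c in consumers}
    for p in range(partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment

print(assign_partitions(3, ["c1", "c2", "c3"]))
# {'c1': [0], 'c2': [1], 'c3': [2]}
print(assign_partitions(3, ["c4"]))
# {'c4': [0, 1, 2]}
```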
Offset Management
```python
from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processor',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # Manual commit recommended
    'max.poll.interval.ms': 300000,
    'session.timeout.ms': 45000,
}

consumer = Consumer(conf)
consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(f"Error: {msg.error()}")
                break
        # Process message
        process_order(msg.value().decode('utf-8'))
        # Manual commit - after processing completes
        consumer.commit(asynchronous=False)
finally:
    consumer.close()
```
Rebalancing
Partition reassignment occurs when consumer group membership changes.
Rebalancing Triggers:
1. New consumer joins
2. Existing consumer leaves (crash or graceful shutdown)
3. Topic partition count changes
4. Subscription topic changes
Rebalancing Strategies:
- Eager (Stop-the-World): Release all partitions, then reassign
- Cooperative (Incremental): Only reassign changed partitions
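With confluent-kafka-python (librdkafka), the strategy is selected through the `partition.assignment.strategy` consumer setting; `cooperative-sticky` enables incremental rebalancing:

```python
# Consumer config sketch: switch from eager to cooperative rebalancing.
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processor',
    # Default is eager ('range,roundrobin'): all partitions are revoked,
    # then reassigned. 'cooperative-sticky' only moves the partitions
    # that actually need to change owners.
    'partition.assignment.strategy': 'cooperative-sticky',
}
```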
3.6 Exactly-Once Semantics
```python
# Idempotent Producer + Transactional Messaging
conf = {
    'bootstrap.servers': 'localhost:9092',
    'enable.idempotence': True,
    'transactional.id': 'my-transaction-id',
}
producer = Producer(conf)
producer.init_transactions()

try:
    producer.begin_transaction()
    producer.produce('topic-a', key='k1', value='v1')
    producer.produce('topic-b', key='k2', value='v2')
    # Include consumer offsets in the transaction
    producer.send_offsets_to_transaction(
        consumer.position(consumer.assignment()),
        consumer.consumer_group_metadata()
    )
    producer.commit_transaction()
except Exception:
    producer.abort_transaction()
    raise
```
How Exactly-Once Works:
- Idempotent Producer: Producer ID + Sequence Number prevents duplicate sends
- Transactional Messaging: Atomic writes across multiple topics/partitions
- read_committed isolation: Only consume messages from committed transactions
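On the consuming side, exactly-once requires the `read_committed` isolation level; a sketch of the matching confluent-kafka-python consumer config:

```python
# Consumer config sketch for a transactional (exactly-once) pipeline.
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'eos-order-processor',
    # Only committed transactional writes are delivered to the
    # application; messages from aborted transactions are skipped.
    'isolation.level': 'read_committed',
    # Offsets are committed via send_offsets_to_transaction(), not here.
    'enable.auto.commit': False,
}
```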
3.7 Kafka Connect and Kafka Streams
Kafka Connect:
Source Connector: DB/File/API --> Kafka Topic
Sink Connector: Kafka Topic --> DB/File/API
Example: Debezium (MySQL CDC) --> Kafka --> Elasticsearch Sink
Kafka Streams:
Kafka Topic --> Stream Processing --> Kafka Topic
Features: No separate cluster needed, library-based
Capabilities: filter, map, groupBy, windowed aggregation, join
4. RabbitMQ Deep Dive
4.1 What is RabbitMQ
RabbitMQ is an open-source message broker based on AMQP (Advanced Message Queuing Protocol). It is known for flexible routing, multi-protocol support, and ease of management.
4.2 Architecture Overview
Producer --> Exchange --> Binding --> Queue --> Consumer
Exchange: Message routing hub
Binding: Routing rules between Exchange and Queue
Queue: Buffer where messages are stored
Virtual Host: Logical isolation unit (multi-tenancy)
4.3 Exchange Types
Direct Exchange
```
Producer --> [Direct Exchange]
                |
                +--(routing_key="order.created")----> [Order Queue]
                |
                +--(routing_key="payment.processed")-> [Payment Queue]
```
```python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# Declare Direct Exchange
channel.exchange_declare(
    exchange='orders',
    exchange_type='direct',
    durable=True
)

# Declare Queue and Binding
channel.queue_declare(queue='order_created', durable=True)
channel.queue_bind(
    exchange='orders',
    queue='order_created',
    routing_key='order.created'
)

# Publish message
channel.basic_publish(
    exchange='orders',
    routing_key='order.created',
    body='{"order_id": "123", "user_id": "456"}',
    properties=pika.BasicProperties(
        delivery_mode=2,  # persistent
        content_type='application/json',
    )
)
```
Topic Exchange
```
Producer --> [Topic Exchange]
                |
                +--(routing_key="order.*.kr")------> [Korea Order Queue]
                |
                +--(routing_key="order.premium.#")-> [Premium Queue]
                |
                +--(routing_key="#")---------------> [Audit Queue] (all messages)
```

- `*` : matches exactly one word
- `#` : matches zero or more words
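The `*`/`#` semantics can be captured in a small matcher — an illustrative re-implementation for clarity, not RabbitMQ's internal code:

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """AMQP topic matching: '*' = exactly one word, '#' = zero or more words."""
    def match(pat, key):
        if not pat:
            return not key                 # both exhausted -> match
        if pat[0] == '#':
            # '#' may consume zero or more words
            return any(match(pat[1:], key[i:]) for i in range(len(key) + 1))
        if not key:
            return False
        if pat[0] == '*' or pat[0] == key[0]:
            return match(pat[1:], key[1:])
        return False
    return match(pattern.split('.'), routing_key.split('.'))

print(topic_matches('order.*.kr', 'order.premium.kr'))    # True
print(topic_matches('order.premium.#', 'order.premium'))  # True ('#' matches zero words)
print(topic_matches('#', 'anything.at.all'))              # True
print(topic_matches('order.*.kr', 'order.kr'))            # False ('*' needs one word)
```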
Fanout Exchange
```
Producer --> [Fanout Exchange]
                |
                +--> [Queue A] (all messages)
                |
                +--> [Queue B] (all messages)
                |
                +--> [Queue C] (all messages)
```
Headers Exchange
Routes based on message header key-value pairs instead of routing keys.
4.4 Message Acknowledgement and Prefetch
```python
def callback(ch, method, properties, body):
    try:
        process_message(body)
        # ACK on successful processing
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        # NACK on failure (requeue)
        ch.basic_nack(
            delivery_tag=method.delivery_tag,
            requeue=True
        )

# Prefetch setting - limit concurrent messages per consumer
channel.basic_qos(prefetch_count=10)
channel.basic_consume(
    queue='order_created',
    on_message_callback=callback,
    auto_ack=False  # Manual ACK
)
channel.start_consuming()
```
Prefetch Strategies:
- prefetch_count=1: Process one at a time; fairest distribution
- prefetch_count=10-50: Balance between throughput and fairness
- prefetch_count=0: Unlimited (not recommended)
4.5 Dead Letter Queue (DLQ)
```python
# DLQ configuration
channel.queue_declare(
    queue='order_processing',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'order.failed',
        'x-message-ttl': 30000,  # 30 second TTL
        'x-max-length': 10000,   # Max queue length
    }
)

# DLX (Dead Letter Exchange) configuration
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='order_failed', durable=True)
channel.queue_bind(
    exchange='dlx',
    queue='order_failed',
    routing_key='order.failed'
)
```
When messages go to Dead Letter:
- Consumer rejects/nacks a message (requeue=False)
- Message TTL expires
- Queue maximum length exceeded
4.6 Publisher Confirms
```python
channel.confirm_delivery()

try:
    channel.basic_publish(
        exchange='orders',
        routing_key='order.created',
        body=message_body,
        properties=pika.BasicProperties(delivery_mode=2),
        mandatory=True  # Return if routing fails
    )
    print("Message confirmed by broker")
except pika.exceptions.UnroutableError:
    print("Message could not be routed")
except pika.exceptions.NackError:
    print("Message was nacked by broker")
```
4.7 Clustering and High Availability
Classic Mirrored Queue (Legacy):
Node 1 (Leader) <-mirror-> Node 2 (Mirror) <-mirror-> Node 3 (Mirror)
Issues: split-brain risk, synchronization overhead
Quorum Queue (Recommended):
Node 1 (Leader) <-Raft-> Node 2 (Follower) <-Raft-> Node 3 (Follower)
Benefits: Raft consensus protocol, no split-brain, guaranteed data safety
Quorum Queue Declaration:
```python
channel.queue_declare(
    queue='important_orders',
    durable=True,
    arguments={
        'x-queue-type': 'quorum',
        'x-quorum-initial-group-size': 3,
    }
)
```
4.8 Streams (Kafka-style Feature)
Streams, introduced in RabbitMQ 3.9, provide Kafka-style append-only log storage with non-destructive, replayable reads.
```python
channel.queue_declare(
    queue='order_stream',
    durable=True,
    arguments={
        'x-queue-type': 'stream',
        'x-max-length-bytes': 1073741824,             # 1GB retention
        'x-stream-max-segment-size-bytes': 52428800,  # 50MB segments
    }
)
```
5. LLM Gateway Async Processing Patterns
5.1 Why LLM Calls Need Queues
LLM API calls have different characteristics than typical REST APIs.
| Characteristic | Typical API | LLM API |
|---|---|---|
| Response Time | 50-200ms | 2-30 seconds |
| Cost | Nearly free | Per-token pricing |
| Rate Limit | High | Low (RPM/TPM limits) |
| Error Rate | Low | Relatively high (429, 500, timeout) |
| Response Size | Consistent | Variable (depends on token count) |
Problems Queues Solve:
- Rate Limit Management: Control API call speed from the queue
- Cost Control: Queue requests and process by priority
- Failure Handling: Store in queue during API outages, retry later
- Load Distribution: Worker pool processes user requests sequentially
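Rate-limit management in front of an LLM API often comes down to a token bucket: each worker draws a permit before calling the provider and backs off when the bucket is empty. A minimal sketch (the rate and capacity numbers are illustrative):

```python
import time

class TokenBucket:
    """Simple token bucket: `rate` permits/sec, bursts up to `capacity`."""
    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last = time.monotonic()

    def try_acquire(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# e.g. 2 requests/sec with a burst of 3 -- numbers are illustrative
bucket = TokenBucket(rate=2.0, capacity=3)
allowed = [bucket.try_acquire() for _ in range(5)]
print(allowed)  # [True, True, True, False, False]
```

A worker that fails to acquire a permit would typically sleep or requeue the task rather than hit the provider's 429 limit.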
5.2 Architecture Diagram
```
+----------+      +-------------+      +----------+      +--------------+
|          |      |             |      |          |      |              |
|  Client  +----->+   FastAPI   +----->+ Request  +----->+ Worker Pool  |
|  (Web)   |      |   Gateway   |      |  Queue   |      |   (Celery)   |
|          |      |             |      |  (Redis) |      |              |
+----------+      +------+------+      +----------+      +------+-------+
                         |                                      |
                         |            +----------+              |
                         |            |          |              |
                         +<-----------+ Response +<-------------+
                         |            |  Store   |
                         |            |  (Redis) |
                         |            +----------+
                         v
                   +----------+
                   |  Status  |
                   |  SSE/WS  |
                   +----------+
```
5.3 Priority Queue Pattern
```python
from enum import IntEnum

class Priority(IntEnum):
    CRITICAL = 0    # Real-time chatbot responses
    HIGH = 1        # Premium users
    NORMAL = 2      # Standard users
    LOW = 3         # Batch processing
    BACKGROUND = 4  # Non-real-time analytics

# Celery Task with Priority
from celery import Celery
import litellm

app = Celery('llm_gateway', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def call_llm(self, request_id, model, messages, priority):
    try:
        response = litellm.completion(
            model=model,
            messages=messages,
            timeout=30,
        )
        store_response(request_id, response)  # application-level persistence helper
        return response
    except Exception as exc:
        # Exponential backoff
        retry_delay = 60 * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=retry_delay)
```
5.4 Retry with Exponential Backoff
```python
import asyncio
import random
import litellm

async def call_llm_with_retry(
    model: str,
    messages: list,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
):
    for attempt in range(max_retries + 1):
        try:
            response = await litellm.acompletion(
                model=model,
                messages=messages,
                timeout=30,
            )
            return response
        except Exception as e:
            if attempt == max_retries:
                raise
            # Exponential backoff + jitter
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay * 0.1)
            actual_delay = delay + jitter
            print(f"Attempt {attempt + 1} failed: {e}")
            print(f"Retrying in {actual_delay:.1f}s...")
            await asyncio.sleep(actual_delay)
```
5.5 Production Implementation: FastAPI + Celery + Redis
```python
# app/main.py
from fastapi import FastAPI
from pydantic import BaseModel
from celery.result import AsyncResult
import uuid

from tasks import call_llm  # the Celery task defined in section 5.3

app = FastAPI()

class LLMRequest(BaseModel):
    model: str = "gpt-4o"
    messages: list
    priority: int = 2

class LLMResponse(BaseModel):
    request_id: str
    status: str
    result: dict | None = None

@app.post("/api/v1/completions")
async def create_completion(req: LLMRequest):
    request_id = str(uuid.uuid4())
    # Send to Celery queue based on priority
    task = call_llm.apply_async(
        args=[request_id, req.model, req.messages, req.priority],
        queue=f"llm_priority_{req.priority}",
        priority=req.priority,
    )
    return {
        "request_id": request_id,
        "task_id": task.id,
        "status": "queued",
    }

@app.get("/api/v1/completions/{task_id}")
async def get_completion(task_id: str):
    result = AsyncResult(task_id)
    # Check failure before ready(): ready() is also True for failed tasks
    if result.failed():
        return {
            "status": "failed",
            "error": str(result.result),
        }
    elif result.ready():
        return {
            "status": "completed",
            "result": result.get(),
        }
    else:
        return {
            "status": "processing",
        }
```
```python
# celery_config.py
from celery import Celery

app = Celery('llm_gateway')
app.conf.update(
    broker_url='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/1',
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    timezone='UTC',
    task_routes={
        'tasks.call_llm': {
            'queue': 'llm_default',
        },
    },
    # Priority queue settings
    task_queue_max_priority=10,
    task_default_priority=5,
    # Worker concurrency limit (Rate Limit handling)
    worker_concurrency=4,
    # Task time limits
    task_soft_time_limit=120,
    task_time_limit=180,
)
```
```yaml
# docker-compose.yml
version: '3.8'
services:
  api:
    build: .
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000
    ports:
      - '8000:8000'
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379
  worker-high:
    build: .
    command: celery -A tasks worker -Q llm_priority_0,llm_priority_1 -c 4
    depends_on:
      - redis
  worker-normal:
    build: .
    command: celery -A tasks worker -Q llm_priority_2 -c 8
    depends_on:
      - redis
  worker-low:
    build: .
    command: celery -A tasks worker -Q llm_priority_3,llm_priority_4 -c 2
    depends_on:
      - redis
  redis:
    image: redis:7-alpine
    ports:
      - '6379:6379'
  flower:
    build: .
    command: celery -A tasks flower --port=5555
    ports:
      - '5555:5555'
    depends_on:
      - redis
```
6. Kafka vs RabbitMQ Comparison
6.1 Comprehensive Comparison Table
| Aspect | Apache Kafka | RabbitMQ |
|---|---|---|
| Message Model | Pull-based, Log append | Push-based, Queue |
| Throughput | Millions msg/sec | Tens of thousands msg/sec |
| Latency | ~5ms (p99) | Sub-millisecond possible |
| Message Retention | Configurable retention (default 7 days) | Deleted after consumption |
| Ordering | Guaranteed within partition | Guaranteed within queue |
| Routing | Topic/Partition based | Exchange/Binding based (flexible) |
| Protocol | Custom protocol | AMQP, MQTT, STOMP |
| Scaling | Horizontal via partition addition | Clustering + Sharding |
| Metadata | KRaft (built-in) | Erlang distributed system |
| Stream Processing | Kafka Streams built-in | None (external tools needed) |
| Operational Complexity | Medium-High | Medium |
| Learning Curve | High | Medium |
| Message Size | Default 1MB (configurable) | Configurable limit (large messages discouraged) |
6.2 Performance Benchmarks (Reference)
Test Environment: 3 brokers/nodes, 3 replicas, 1KB message size
Apache Kafka:
- Single partition: ~50,000 msg/sec
- 12 partitions: ~800,000 msg/sec
- 60 partitions: ~2,000,000+ msg/sec
- Latency p99: 5ms
RabbitMQ (Quorum Queue):
- Single queue: ~20,000 msg/sec
- 10 queues: ~100,000 msg/sec
- Latency p99: 1ms
6.3 Selection Guide
Choose Kafka when:
- High-volume event streaming (logs, metrics, clickstreams)
- Implementing event sourcing patterns
- Real-time data pipelines (ETL/ELT)
- Multiple consumers independently consuming the same messages
- Message replay is needed
- Stream processing with Kafka Streams/ksqlDB is required
Choose RabbitMQ when:
- Complex routing patterns are needed (Topic, Headers Exchange)
- Task queue patterns
- Low latency is critical
- Multiple protocol support is needed (AMQP, MQTT, STOMP)
- Fine-grained control like per-message TTL, priority is needed
- Integration with existing AMQP ecosystem
For LLM Gateways?
Small/Medium Scale (under 100K requests/day):
RabbitMQ + Celery recommended
- Easy setup, excellent Priority Queue support
- Dead Letter Queue for failure handling
Large Scale (over 1M requests/day):
Kafka-based implementation recommended
- High throughput, message retention for reprocessing
- Flexible scaling with Consumer Groups
- Event log for cost/usage analytics
7. Production Monitoring Setup
7.1 Kafka Monitoring
```yaml
# Prometheus JMX Exporter config (prometheus-jmx-config.yaml)
rules:
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec>'
    name: kafka_server_messages_in_total
    type: COUNTER
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec>'
    name: kafka_server_bytes_in_total
    type: COUNTER
  - pattern: 'kafka.consumer<type=consumer-fetch-manager-metrics, client-id=(.+)><>records-lag-max'
    name: kafka_consumer_lag_max
    type: GAUGE
```
Key Monitoring Metrics:
- Consumer Lag: How far behind consumers are from producers
- ISR Shrink/Expand: Replication state changes
- Under-Replicated Partitions: Partitions with insufficient replicas
- Request Latency: Request processing time
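Consumer lag, the first metric above, is simply the per-partition difference between the log-end offset and the group's committed offset. A minimal computation over sampled offsets (the offset numbers are illustrative):

```python
def compute_lag(log_end_offsets: dict, committed_offsets: dict) -> dict:
    """Per-partition lag = log-end offset - committed offset (floored at 0).

    In practice the inputs come from the broker, e.g. via
    get_watermark_offsets() and committed() in confluent-kafka-python.
    """
    return {p: max(0, log_end_offsets[p] - committed_offsets.get(p, 0))
            for p in log_end_offsets}

print(compute_lag({0: 1500, 1: 1200}, {0: 1500, 1: 900}))
# {0: 0, 1: 300} -- partition 1 is 300 messages behind
```

A steadily growing lag means consumers cannot keep up with producers and is usually the first alert worth configuring.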
7.2 RabbitMQ Monitoring
Key Metrics:
- Queue Depth: Number of messages in queue
- Consumer Utilization: Consumer utilization rate
- Publish/Deliver Rate: Messages per second published/delivered
- Unacked Messages: Messages awaiting acknowledgement
- Memory/Disk Usage: Resource consumption
RabbitMQ Management UI: http://localhost:15672
Prometheus Plugin: rabbitmq_prometheus (built-in)
8. Conclusion
Message queue-based asynchronous architecture is a core component of modern distributed systems.
Key Takeaways:
- Kafka: High-volume event streaming, log-based storage, flexible scaling with Consumer Groups
- RabbitMQ: Flexible routing, low latency, optimal for traditional message queue patterns
- LLM Gateway: Async processing achieves rate limit management, cost control, and fault isolation
- Selection Criteria: Decide based on throughput, latency, routing complexity, and message retention needs
Both technologies have mature ecosystems, and you should choose based on your project requirements. Kafka remains the best choice for large-scale event streaming, while RabbitMQ excels at work distribution and flexible routing.
References
- Apache Kafka Official Documentation: https://kafka.apache.org/documentation/
- RabbitMQ Official Documentation: https://www.rabbitmq.com/docs
- Confluent Kafka Python Client: https://github.com/confluentinc/confluent-kafka-python
- Pika (RabbitMQ Python Client): https://pika.readthedocs.io/
- Celery Official Documentation: https://docs.celeryq.dev/