Skip to content

필사 모드: [Architecture] Building Async Systems with Message Queues: Kafka vs RabbitMQ

English
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

Overview

Communication between services in modern distributed systems is becoming increasingly complex.

Synchronous HTTP calls alone cannot achieve high throughput, fault isolation, and flexible scaling.

In this post, we cover the core concepts of **Message Queue**-based asynchronous architecture,

provide an in-depth comparison of Apache Kafka and RabbitMQ, and explore practical async processing patterns for LLM Gateways.

1. Synchronous vs Asynchronous Architecture

1.1 Limitations of Synchronous Communication

In synchronous communication, the client sends a request and blocks until a response arrives.

Client --> Service A --> Service B --> Service C

(blocking) (blocking) (processing)

**Problems:**

- **Tight Coupling**: If one service goes down, the entire chain fails

- **Latency Accumulation**: Response times from each service add up

- **Scaling Constraints**: The slowest service determines overall throughput

- **Resource Waste**: Threads and connections are held during response waiting

1.2 Benefits of Asynchronous Communication

Introducing a message queue decouples producers from consumers.

Producer --> [Message Queue] --> Consumer

(fire) (buffer/persist) (process at own pace)

**Benefits:**

- **Loose Coupling**: Producers and consumers operate independently

- **Load Leveling**: The queue acts as a buffer during traffic spikes

- **Fault Isolation**: Consumer failures do not affect producers

- **Independent Scaling**: Consumers can be scaled out independently

- **Peak Shaving**: Smooths out sudden load bursts

2. Core Message Queue Concepts

2.1 Basic Components

| Component | Role |

| ----------------- | ---------------------------------------------------------------- |

| **Producer** | Creates and sends messages to queues/topics |

| **Consumer** | Receives and processes messages from queues/topics |

| **Broker** | Intermediate server that receives, stores, and delivers messages |

| **Topic / Queue** | Logical channel where messages are stored |

| **Partition** | Physical subdivision of a topic for parallel processing |

2.2 Message Delivery Guarantees

At-Most-Once: Producer --> Broker (no retry)

Messages may be lost, but no duplicates

At-Least-Once: Producer --> Broker --> ACK --> Retry on failure

No message loss, but duplicates possible

Exactly-Once: Producer --> Broker (idempotent + transaction)

Each message delivered exactly once

2.3 Message Delivery Models

**Point-to-Point (Queue):**

- Each message is delivered to exactly one consumer

- Suitable for work distribution

**Publish-Subscribe (Topic):**

- Each message is delivered to all subscribers

- Suitable for event broadcasting

3. Apache Kafka Deep Dive

3.1 What is Kafka

Apache Kafka is a distributed event streaming platform originally developed at LinkedIn.

It is characterized by high throughput, durability, and horizontal scalability, and is widely used for real-time data pipelines and streaming applications.

3.2 Architecture Overview

+-------------------+

| Kafka Cluster |

| |

Producers ------->| Broker 1 |-------> Consumers

| Broker 2 | (Consumer Group A)

| Broker 3 |-------> Consumers

| | (Consumer Group B)

+-------------------+

|

+------+------+

| KRaft |

| Controller |

+-------------+

**Key Components:**

- **Broker**: Server node that receives, stores, and delivers messages

- **KRaft Controller**: Built-in metadata management layer replacing ZooKeeper since Kafka 4.0

- **Topic**: Logical category for messages

- **Partition**: Physical subdivision of a topic; each partition is an immutable ordered log

- **Replication**: Per-partition replicas maintained in a Leader-Follower structure

- **ISR (In-Sync Replicas)**: Set of replicas synchronized with the Leader

3.3 KRaft Mode (ZooKeeper Removed)

KRaft is the default mode starting with Kafka 4.0.

Legacy Architecture:

Kafka Brokers <---> ZooKeeper Ensemble (separate cluster)

KRaft Architecture:

Kafka Brokers (some nodes also serve as Controllers)

- Internal Raft consensus protocol for metadata management

- No separate ZooKeeper cluster required

**KRaft Benefits:**

- Reduced operational complexity (no ZooKeeper cluster management)

- Improved partition scaling limits (millions of partitions supported)

- Faster metadata propagation

- Shorter controller failover recovery time

3.4 Producer Deep Dive

Partitioner

Determines which partition a message is sent to.

from confluent_kafka import Producer

conf = {

'bootstrap.servers': 'localhost:9092',

'client.id': 'my-producer',

'acks': 'all',

'enable.idempotence': True,

'max.in.flight.requests.per.connection': 5,

'retries': 2147483647,

'linger.ms': 5,

'batch.size': 16384,

'compression.type': 'lz4',

}

producer = Producer(conf)

def delivery_callback(err, msg):

if err:

print(f"Delivery failed: {err}")

else:

print(f"Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

Key-based partitioning - same key goes to same partition

producer.produce(

topic='orders',

key='user-123',

value='{"order_id": "ord-456", "amount": 5000}',

callback=delivery_callback

)

producer.flush()

acks Configuration

| acks Value | Behavior | Durability | Performance |

| ---------- | --------------------------------- | ---------- | ----------- |

| 0 | Does not wait for broker response | Low | Highest |

| 1 | Only Leader confirms write | Medium | High |

| all (-1) | All ISR replicas confirm write | Highest | Low |

Batching and Compression

Producer Internal Flow:

Record --> Accumulator --> [Batch] --> Compressor --> Network Send

linger.ms: Batch send wait time (default 0ms)

batch.size: Maximum batch size (default 16KB)

compression.type: none / gzip / snappy / lz4 / zstd

3.5 Consumer Deep Dive

Consumer Group

Topic: orders (3 partitions)

Consumer Group A:

Consumer 1 <-- Partition 0

Consumer 2 <-- Partition 1

Consumer 3 <-- Partition 2

Consumer Group B:

Consumer 4 <-- Partition 0, 1, 2

Each Consumer Group consumes messages independently.

Ordering is guaranteed within a partition, and each partition is assigned to only one consumer within a group.

Offset Management

from confluent_kafka import Consumer, KafkaError

conf = {

'bootstrap.servers': 'localhost:9092',

'group.id': 'order-processor',

'auto.offset.reset': 'earliest',

'enable.auto.commit': False, # Manual commit recommended

'max.poll.interval.ms': 300000,

'session.timeout.ms': 45000,

}

consumer = Consumer(conf)

consumer.subscribe(['orders'])

try:

while True:

msg = consumer.poll(timeout=1.0)

if msg is None:

continue

if msg.error():

if msg.error().code() == KafkaError._PARTITION_EOF:

continue

else:

print(f"Error: {msg.error()}")

break

Process message

process_order(msg.value().decode('utf-8'))

Manual commit - after processing completes

consumer.commit(asynchronous=False)

finally:

consumer.close()

Rebalancing

Partition reassignment occurs when consumer group membership changes.

Rebalancing Triggers:

1. New consumer joins

2. Existing consumer leaves (crash or graceful shutdown)

3. Topic partition count changes

4. Subscription topic changes

Rebalancing Strategies:

- Eager (Stop-the-World): Release all partitions, then reassign

- Cooperative (Incremental): Only reassign changed partitions

3.6 Exactly-Once Semantics

Idempotent Producer + Transactional Messaging

conf = {

'bootstrap.servers': 'localhost:9092',

'enable.idempotence': True,

'transactional.id': 'my-transaction-id',

}

producer = Producer(conf)

producer.init_transactions()

try:

producer.begin_transaction()

producer.produce('topic-a', key='k1', value='v1')

producer.produce('topic-b', key='k2', value='v2')

Include consumer offsets in the transaction

producer.send_offsets_to_transaction(

consumer.position(consumer.assignment()),

consumer.consumer_group_metadata()

)

producer.commit_transaction()

except Exception as e:

producer.abort_transaction()

raise e

**How Exactly-Once Works:**

1. **Idempotent Producer**: Producer ID + Sequence Number prevents duplicate sends

2. **Transactional Messaging**: Atomic writes across multiple topics/partitions

3. **read_committed isolation**: Only consume messages from committed transactions

3.7 Kafka Connect and Kafka Streams

Kafka Connect:

Source Connector: DB/File/API --> Kafka Topic

Sink Connector: Kafka Topic --> DB/File/API

Example: Debezium (MySQL CDC) --> Kafka --> Elasticsearch Sink

Kafka Streams:

Kafka Topic --> Stream Processing --> Kafka Topic

Features: No separate cluster needed, library-based

Capabilities: filter, map, groupBy, windowed aggregation, join

4. RabbitMQ Deep Dive

4.1 What is RabbitMQ

RabbitMQ is an open-source message broker based on the AMQP (Advanced Message Queuing Protocol).

It is characterized by flexible routing, support for multiple protocols, and ease of management.

4.2 Architecture Overview

Producer --> Exchange --> Binding --> Queue --> Consumer

Exchange: Message routing hub

Binding: Routing rules between Exchange and Queue

Queue: Buffer where messages are stored

Virtual Host: Logical isolation unit (multi-tenancy)

4.3 Exchange Types

Direct Exchange

Producer --> [Direct Exchange]

|

+--(routing_key="order.created")--> [Order Queue]

|

+--(routing_key="payment.processed")--> [Payment Queue]

connection = pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)

channel = connection.channel()

Declare Direct Exchange

channel.exchange_declare(

exchange='orders',

exchange_type='direct',

durable=True

)

Declare Queue and Binding

channel.queue_declare(queue='order_created', durable=True)

channel.queue_bind(

exchange='orders',

queue='order_created',

routing_key='order.created'

)

Publish message

channel.basic_publish(

exchange='orders',

routing_key='order.created',

body='{"order_id": "123", "user_id": "456"}',

properties=pika.BasicProperties(

delivery_mode=2, # persistent

content_type='application/json',

)

)

Topic Exchange

Producer --> [Topic Exchange]

|

+--(routing_key="order.*.kr")--> [Korea Order Queue]

|

+--(routing_key="order.premium.#")--> [Premium Queue]

|

+--(routing_key="#")--> [Audit Queue] (all messages)

* : Matches exactly one word

: Matches zero or more words

Fanout Exchange

Producer --> [Fanout Exchange]

|

+--> [Queue A] (all messages)

|

+--> [Queue B] (all messages)

|

+--> [Queue C] (all messages)

Headers Exchange

Routes based on message header key-value pairs instead of routing keys.

4.4 Message Acknowledgement and Prefetch

def callback(ch, method, properties, body):

try:

process_message(body)

ACK on successful processing

ch.basic_ack(delivery_tag=method.delivery_tag)

except Exception as e:

NACK on failure (requeue)

ch.basic_nack(

delivery_tag=method.delivery_tag,

requeue=True

)

Prefetch setting - limit concurrent messages per consumer

channel.basic_qos(prefetch_count=10)

channel.basic_consume(

queue='order_created',

on_message_callback=callback,

auto_ack=False # Manual ACK

)

channel.start_consuming()

**Prefetch Strategies:**

- `prefetch_count=1`: Process one at a time, fairest distribution

- `prefetch_count=10-50`: Balance between throughput and fairness

- `prefetch_count=0`: Unlimited (not recommended)

4.5 Dead Letter Queue (DLQ)

DLQ configuration

channel.queue_declare(

queue='order_processing',

durable=True,

arguments={

'x-dead-letter-exchange': 'dlx',

'x-dead-letter-routing-key': 'order.failed',

'x-message-ttl': 30000, # 30 second TTL

'x-max-length': 10000, # Max queue length

}

)

DLX (Dead Letter Exchange) configuration

channel.exchange_declare(exchange='dlx', exchange_type='direct')

channel.queue_declare(queue='order_failed', durable=True)

channel.queue_bind(

exchange='dlx',

queue='order_failed',

routing_key='order.failed'

)

**When messages go to Dead Letter:**

1. Consumer rejects/nacks a message (requeue=False)

2. Message TTL expires

3. Queue maximum length exceeded

4.6 Publisher Confirms

channel.confirm_delivery()

try:

channel.basic_publish(

exchange='orders',

routing_key='order.created',

body=message_body,

properties=pika.BasicProperties(delivery_mode=2),

mandatory=True # Return if routing fails

)

print("Message confirmed by broker")

except pika.exceptions.UnroutableError:

print("Message could not be routed")

except pika.exceptions.NackError:

print("Message was nacked by broker")

4.7 Clustering and High Availability

Classic Mirrored Queue (Legacy):

Node 1 (Leader) <-mirror-> Node 2 (Mirror) <-mirror-> Node 3 (Mirror)

Issues: split-brain risk, synchronization overhead

Quorum Queue (Recommended):

Node 1 (Leader) <-Raft-> Node 2 (Follower) <-Raft-> Node 3 (Follower)

Benefits: Raft consensus protocol, no split-brain, guaranteed data safety

**Quorum Queue Declaration:**

channel.queue_declare(

queue='important_orders',

durable=True,

arguments={

'x-queue-type': 'quorum',

'x-quorum-initial-group-size': 3,

}

)

4.8 Streams (Kafka-style Feature)

Streams, introduced in RabbitMQ 3.9+, are similar to Kafka's log-based storage.

channel.queue_declare(

queue='order_stream',

durable=True,

arguments={

'x-queue-type': 'stream',

'x-max-length-bytes': 1073741824, # 1GB

'x-stream-max-segment-size-bytes': 52428800, # 50MB

}

)

5. LLM Gateway Async Processing Patterns

5.1 Why LLM Calls Need Queues

LLM API calls have different characteristics than typical REST APIs.

| Characteristic | Typical API | LLM API |

| -------------- | ----------- | ----------------------------------- |

| Response Time | 50-200ms | 2-30 seconds |

| Cost | Nearly free | Per-token pricing |

| Rate Limit | High | Low (RPM/TPM limits) |

| Error Rate | Low | Relatively high (429, 500, timeout) |

| Response Size | Consistent | Variable (depends on token count) |

**Problems Queues Solve:**

1. **Rate Limit Management**: Control API call speed from the queue

2. **Cost Control**: Queue requests and process by priority

3. **Failure Handling**: Store in queue during API outages, retry later

4. **Load Distribution**: Worker pool processes user requests sequentially

5.2 Architecture Diagram

+----------+ +-------------+ +----------+ +--------------+

| | | | | | | |

| Client +---->+ FastAPI +---->+ Request +---->+ Worker Pool |

| (Web) | | Gateway | | Queue | | (Celery) |

| | | | | (Redis) | | |

+----------+ +------+------+ +----------+ +------+-------+

| |

| +----------+ |

| | | |

+<-----------+ Response +<------------+

| | Store |

| | (Redis) |

| +----------+

v

+----------+

| Status |

| SSE/WS |

+----------+

5.3 Priority Queue Pattern

from enum import IntEnum

class Priority(IntEnum):

CRITICAL = 0 # Real-time chatbot responses

HIGH = 1 # Premium users

NORMAL = 2 # Standard users

LOW = 3 # Batch processing

BACKGROUND = 4 # Non-real-time analytics

Celery Task with Priority

from celery import Celery

app = Celery('llm_gateway', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3, default_retry_delay=60)

def call_llm(self, request_id, model, messages, priority):

try:

response = litellm.completion(

model=model,

messages=messages,

timeout=30,

)

store_response(request_id, response)

return response

except Exception as exc:

Exponential backoff

retry_delay = 60 * (2 ** self.request.retries)

raise self.retry(exc=exc, countdown=retry_delay)

5.4 Retry with Exponential Backoff

async def call_llm_with_retry(

model: str,

messages: list,

max_retries: int = 5,

base_delay: float = 1.0,

max_delay: float = 60.0,

):

for attempt in range(max_retries + 1):

try:

response = await litellm.acompletion(

model=model,

messages=messages,

timeout=30,

)

return response

except Exception as e:

if attempt == max_retries:

raise

Exponential backoff + jitter

delay = min(base_delay * (2 ** attempt), max_delay)

jitter = random.uniform(0, delay * 0.1)

actual_delay = delay + jitter

print(f"Attempt {attempt + 1} failed: {e}")

print(f"Retrying in {actual_delay:.1f}s...")

await asyncio.sleep(actual_delay)

5.5 Production Implementation: FastAPI + Celery + Redis

app/main.py

from fastapi import FastAPI, HTTPException

from pydantic import BaseModel

from celery.result import AsyncResult

app = FastAPI()

class LLMRequest(BaseModel):

model: str = "gpt-4o"

messages: list

priority: int = 2

class LLMResponse(BaseModel):

request_id: str

status: str

result: dict = None

@app.post("/api/v1/completions")

async def create_completion(req: LLMRequest):

request_id = str(uuid.uuid4())

Send to Celery queue based on priority

task = call_llm.apply_async(

args=[request_id, req.model, req.messages, req.priority],

queue=f"llm_priority_{req.priority}",

priority=req.priority,

)

return {

"request_id": request_id,

"task_id": task.id,

"status": "queued",

}

@app.get("/api/v1/completions/{task_id}")

async def get_completion(task_id: str):

result = AsyncResult(task_id)

if result.ready():

return {

"status": "completed",

"result": result.get(),

}

elif result.failed():

return {

"status": "failed",

"error": str(result.result),

}

else:

return {

"status": "processing",

}

celery_config.py

from celery import Celery

app = Celery('llm_gateway')

app.conf.update(

broker_url='redis://localhost:6379/0',

result_backend='redis://localhost:6379/1',

task_serializer='json',

result_serializer='json',

accept_content=['json'],

timezone='UTC',

task_routes={

'tasks.call_llm': {

'queue': 'llm_default',

},

},

Priority queue settings

task_queue_max_priority=10,

task_default_priority=5,

Worker concurrency limit (Rate Limit handling)

worker_concurrency=4,

Task time limits

task_soft_time_limit=120,

task_time_limit=180,

)

docker-compose.yml

version: '3.8'

services:

api:

build: .

command: uvicorn app.main:app --host 0.0.0.0 --port 8000

ports:

- '8000:8000'

depends_on:

- redis

environment:

- REDIS_URL=redis://redis:6379

worker-high:

build: .

command: celery -A tasks worker -Q llm_priority_0,llm_priority_1 -c 4

depends_on:

- redis

worker-normal:

build: .

command: celery -A tasks worker -Q llm_priority_2 -c 8

depends_on:

- redis

worker-low:

build: .

command: celery -A tasks worker -Q llm_priority_3,llm_priority_4 -c 2

depends_on:

- redis

redis:

image: redis:7-alpine

ports:

- '6379:6379'

flower:

build: .

command: celery -A tasks flower --port=5555

ports:

- '5555:5555'

depends_on:

- redis

6. Kafka vs RabbitMQ Comparison

6.1 Comprehensive Comparison Table

| Aspect | Apache Kafka | RabbitMQ |

| -------------------------- | --------------------------------------- | --------------------------------- |

| **Message Model** | Pull-based, Log append | Push-based, Queue |

| **Throughput** | Millions msg/sec | Tens of thousands msg/sec |

| **Latency** | ~5ms (p99) | Sub-millisecond possible |

| **Message Retention** | Configurable retention (default 7 days) | Deleted after consumption |

| **Ordering** | Guaranteed within partition | Guaranteed within queue |

| **Routing** | Topic/Partition based | Exchange/Binding based (flexible) |

| **Protocol** | Custom protocol | AMQP, MQTT, STOMP |

| **Scaling** | Horizontal via partition addition | Clustering + Sharding |

| **Metadata** | KRaft (built-in) | Erlang distributed system |

| **Stream Processing** | Kafka Streams built-in | None (external tools needed) |

| **Operational Complexity** | Medium-High | Medium |

| **Learning Curve** | High | Medium |

| **Message Size** | Default 1MB (configurable) | No limit (practically several MB) |

6.2 Performance Benchmarks (Reference)

Test Environment: 3 brokers/nodes, 3 replicas, 1KB message size

Apache Kafka:

- Single partition: ~50,000 msg/sec

- 12 partitions: ~800,000 msg/sec

- 60 partitions: ~2,000,000+ msg/sec

- Latency p99: 5ms

RabbitMQ (Quorum Queue):

- Single queue: ~20,000 msg/sec

- 10 queues: ~100,000 msg/sec

- Latency p99: 1ms

6.3 Selection Guide

**Choose Kafka when:**

- High-volume event streaming (logs, metrics, clickstreams)

- Implementing event sourcing patterns

- Real-time data pipelines (ETL/ELT)

- Multiple consumers independently consuming the same messages

- Message replay is needed

- Stream processing with Kafka Streams/ksqlDB is required

**Choose RabbitMQ when:**

- Complex routing patterns are needed (Topic, Headers Exchange)

- Task queue patterns

- Low latency is critical

- Multiple protocol support is needed (AMQP, MQTT, STOMP)

- Fine-grained control like per-message TTL, priority is needed

- Integration with existing AMQP ecosystem

**For LLM Gateways?**

Small/Medium Scale (under 100K requests/day):

RabbitMQ + Celery recommended

- Easy setup, excellent Priority Queue support

- Dead Letter Queue for failure handling

Large Scale (over 1M requests/day):

Kafka-based implementation recommended

- High throughput, message retention for reprocessing

- Flexible scaling with Consumer Groups

- Event log for cost/usage analytics

7. Production Monitoring Setup

7.1 Kafka Monitoring

Prometheus JMX Exporter config (prometheus-jmx-config.yaml)

rules:

- pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec>'

name: kafka_server_messages_in_total

type: COUNTER

- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec>'

name: kafka_server_bytes_in_total

type: COUNTER

- pattern: 'kafka.consumer<type=consumer-fetch-manager-metrics, client-id=(.+)><>records-lag-max'

name: kafka_consumer_lag_max

type: GAUGE

**Key Monitoring Metrics:**

- Consumer Lag: How far behind consumers are from producers

- ISR Shrink/Expand: Replication state changes

- Under-Replicated Partitions: Partitions with insufficient replicas

- Request Latency: Request processing time

7.2 RabbitMQ Monitoring

Key Metrics:

- Queue Depth: Number of messages in queue

- Consumer Utilization: Consumer utilization rate

- Publish/Deliver Rate: Messages per second published/delivered

- Unacked Messages: Messages awaiting acknowledgement

- Memory/Disk Usage: Resource consumption

RabbitMQ Management UI: http://localhost:15672

Prometheus Plugin: rabbitmq_prometheus (built-in)

8. Conclusion

Message queue-based asynchronous architecture is a core component of modern distributed systems.

**Key Takeaways:**

1. **Kafka**: High-volume event streaming, log-based storage, flexible scaling with Consumer Groups

2. **RabbitMQ**: Flexible routing, low latency, optimal for traditional message queue patterns

3. **LLM Gateway**: Async processing achieves rate limit management, cost control, and fault isolation

4. **Selection Criteria**: Decide based on throughput, latency, routing complexity, and message retention needs

Both technologies have mature ecosystems, and you should choose based on your project requirements.

Kafka remains the best choice for large-scale event streaming, while RabbitMQ excels at work distribution and flexible routing.

References

- Apache Kafka Official Documentation: https://kafka.apache.org/documentation/

- RabbitMQ Official Documentation: https://www.rabbitmq.com/docs

- Confluent Kafka Python Client: https://github.com/confluentinc/confluent-kafka-python

- Pika (RabbitMQ Python Client): https://pika.readthedocs.io/

- Celery Official Documentation: https://docs.celeryq.dev/

현재 단락 (1/578)

Communication between services in modern distributed systems is becoming increasingly complex.

작성 글자: 0원문 글자: 20,271작성 단락: 0/578