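[Architecture] Building Async Systems with Message Queues: Kafka vs RabbitMQ

Overview

Communication between services in modern distributed systems keeps getting more complex. With synchronous HTTP calls alone, it is hard to achieve high throughput, fault isolation, and flexible scaling. This post covers the core concepts of message queue-based asynchronous architecture, compares Apache Kafka and RabbitMQ in depth, and walks through practical asynchronous processing patterns for an LLM Gateway.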


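1. Synchronous vs Asynchronous Architecture

1.1 Limitations of Synchronous Communication

In synchronous communication, the client sends a request to the server and waits until the response arrives.

Client --> Service A --> Service B --> Service C
          (blocking)    (blocking)    (processing)

Problems:

  • Tight Coupling: If one service goes down, the entire chain fails
  • Latency Accumulation: The response times of all services add up
  • Scaling Constraints: The slowest service determines overall throughput
  • Resource Waste: Threads and connections stay occupied while waiting for a response

1.2 Benefits of Asynchronous Communication

Introducing a message queue decouples producers from consumers.

Producer --> [Message Queue] --> Consumer
  (fire)    (buffer/persist)    (process at own pace)

Benefits:

  • Loose Coupling: Producers and consumers operate independently
  • Load Leveling: The queue acts as a buffer during traffic spikes
  • Fault Isolation: A consumer failure does not affect producers
  • Independent Scaling: Consumers can be scaled out independently
  • Peak Shaving: Momentary load bursts are smoothed out

2. Core Message Queue Concepts

2.1 Basic Components

Component     | Role
--------------|----------------------------------------------------------------
Producer      | Creates messages and sends them to a queue/topic
Consumer      | Receives messages from a queue/topic and processes them
Broker        | Intermediate server that receives, stores, and delivers messages
Topic / Queue | Logical channel in which messages are stored
Partition     | Physical subdivision of a topic that enables parallel processing

2.2 Message Delivery Guarantees

At-Most-Once:   Producer --> Broker (no retry)
                Messages may be lost, but are never duplicated

At-Least-Once:  Producer --> Broker --> ACK --> Retry on failure
                Messages are not lost, but may be duplicated

Exactly-Once:   Producer --> Broker (idempotent + transaction)
                Each message is delivered exactly once

2.3 Message Delivery Models

Point-to-Point (Queue):

  • Each message is delivered to only one consumer
  • Suitable for work distribution

Publish-Subscribe (Topic):

  • Each message is delivered to all subscribers
  • Suitable for event broadcasting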
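3. Apache Kafka Deep Dive

3.1 What is Kafka

Apache Kafka is a distributed event streaming platform originally developed at LinkedIn. It offers high throughput, durability, and horizontal scalability, and is widely used for real-time data pipelines and streaming applications.

3.2 Architecture Overview

                    +-------------------+
                    |   Kafka Cluster   |
                    |                   |
  Producers ------->|  Broker 1         |-------> Consumers
                    |  Broker 2         |         (Consumer Group A)
                    |  Broker 3         |-------> Consumers
                    |                   |         (Consumer Group B)
                    +-------------------+
                           |
                    +------+------+
                    |  KRaft      |
                    |  Controller |
                    +-------------+

Key Components:

  • Broker: Server node that receives, stores, and delivers messages
  • KRaft Controller: Built-in metadata management layer that replaces ZooKeeper (production-ready since Kafka 3.3; ZooKeeper mode was removed in Kafka 4.0)
  • Topic: Logical category of messages
  • Partition: Physical subdivision of a topic; each partition is an ordered, append-only log
  • Replication: Replicas are kept per partition in a Leader-Follower structure
  • ISR (In-Sync Replicas): The set of replicas that are caught up with the Leader

3.3 KRaft Mode (ZooKeeper Removed)

Starting with Kafka 4.0, KRaft is the only supported mode.

Legacy Architecture:
  Kafka Brokers <---> ZooKeeper Ensemble (separate cluster)

KRaft Architecture:
  Kafka Brokers (some nodes also act as Controllers)
  - Metadata is managed with an internal Raft consensus protocol
  - No separate ZooKeeper cluster is required

KRaft Benefits:

  • Lower operational complexity (no ZooKeeper cluster to manage)
  • Higher partition-count ceiling (designed to scale to millions of partitions)
  • Faster metadata propagation
  • Shorter controller failover time

3.4 Producer Deep Dive

Partitioner

The partitioner decides which partition each message is sent to.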

from confluent_kafka import Producer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'my-producer',
    'acks': 'all',
    'enable.idempotence': True,
    'max.in.flight.requests.per.connection': 5,
    'retries': 2147483647,
    'linger.ms': 5,
    'batch.size': 16384,
    'compression.type': 'lz4',
}

producer = Producer(conf)

def delivery_callback(err, msg):
    if err:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

# Key-based partitioning: messages with the same key go to the same partition
producer.produce(
    topic='orders',
    key='user-123',
    value='{"order_id": "ord-456", "amount": 5000}',
    callback=delivery_callback
)

producer.flush()

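acks Configuration

acks Value | Behavior                                | Durability | Performance
-----------|-----------------------------------------|------------|------------
0          | Does not wait for a broker response     | Low        | Highest
1          | Only the Leader acknowledges the write  | Medium     | High
all (-1)   | All in-sync replicas (ISR) acknowledge  | Highest    | Low

Batching and Compression

Producer Internal Flow:
  Record --> Accumulator --> [Batch] --> Compressor --> Network Send

  linger.ms: How long to wait for more records before sending a batch (Java client default: 0 ms through Kafka 3.x, 5 ms from 4.0)
  batch.size: Maximum batch size (Java client default: 16 KB)
  compression.type: none / gzip / snappy / lz4 / zstd

  Note: librdkafka-based clients such as confluent_kafka ship with different defaults, so set these values explicitly as in the example above.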

3.5 Consumer 심층 분석

Consumer Group

Topic: orders (3 partitions)

Consumer Group A:
  Consumer 1 <-- Partition 0
  Consumer 2 <-- Partition 1
  Consumer 3 <-- Partition 2

Consumer Group B:
  Consumer 4 <-- Partition 0, 1, 2

Each Consumer Group consumes messages independently. Ordering is guaranteed within a partition, and each partition is assigned to only one consumer within a group.

Offset Management

from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processor',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # Manual commit recommended
    'max.poll.interval.ms': 300000,
    'session.timeout.ms': 45000,
}

consumer = Consumer(conf)
consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(f"Error: {msg.error()}")
                break

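        # Process the message (process_order is an application-defined handler)
        process_order(msg.value().decode('utf-8'))

        # Commit manually, only after processing has completed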
        consumer.commit(asynchronous=False)
finally:
    consumer.close()

Rebalancing

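When the membership of a consumer group changes, partitions are reassigned.

Rebalancing Triggers:
  1. A new consumer joins
  2. An existing consumer leaves (crash or graceful shutdown)
  3. The topic's partition count changes
  4. The subscribed topics change

Rebalancing Strategies:
  - Eager (Stop-the-World): All partitions are released, then reassigned
  - Cooperative (Incremental): Only the partitions that change owners are reassigned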

3.6 Exactly-Once Semantics

# Idempotent Producer + Transactional Messaging
from confluent_kafka import Producer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'enable.idempotence': True,
    'transactional.id': 'my-transaction-id',
}

producer = Producer(conf)
producer.init_transactions()

try:
    producer.begin_transaction()

    producer.produce('topic-a', key='k1', value='v1')
    producer.produce('topic-b', key='k2', value='v2')

    # Include the consumer offsets in the transaction.
    # `consumer` is the Consumer from the Offset Management example (3.5),
    # created with enable.auto.commit=False.
    producer.send_offsets_to_transaction(
        consumer.position(consumer.assignment()),
        consumer.consumer_group_metadata()
    )

    producer.commit_transaction()
except Exception:
    # Roll back everything produced in this transaction, then re-raise
    producer.abort_transaction()
    raise

How Exactly-Once Works:

  1. Idempotent Producer: Producer ID + Sequence Number prevent duplicate writes on retry
  2. Transactional Messaging: Atomic writes across multiple topics/partitions
  3. read_committed isolation: Consumers read only messages from committed transactions

3.7 Kafka Connect and Kafka Streams

Kafka Connect:
  Source Connector: DB/File/API --> Kafka Topic
  Sink Connector:   Kafka Topic --> DB/File/API

  Example: Debezium (MySQL CDC) --> Kafka --> Elasticsearch Sink

Kafka Streams:
  Kafka Topic --> Stream Processing --> Kafka Topic

  Characteristics: No separate cluster needed; runs as a library inside the application
  Operations: filter, map, groupBy, windowed aggregation, join

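4. RabbitMQ Deep Dive

4.1 What is RabbitMQ

RabbitMQ is an open-source message broker built around AMQP (Advanced Message Queuing Protocol). Its strengths are flexible routing, support for multiple protocols, and ease of management.

4.2 Architecture Overview

Producer --> Exchange --> Binding --> Queue --> Consumer

  Exchange: Message routing hub
  Binding:  Routing rule between an Exchange and a Queue
  Queue:    Buffer in which messages are stored
  Virtual Host: Logical isolation unit (multi-tenancy)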

4.3 Exchange Types

Direct Exchange

Producer --> [Direct Exchange]
              |
              +--(routing_key="order.created")--> [Order Queue]
              |
              +--(routing_key="payment.processed")--> [Payment Queue]

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# Declare the Direct Exchange
channel.exchange_declare(
    exchange='orders',
    exchange_type='direct',
    durable=True
)

# Declare the Queue and bind it to the Exchange
channel.queue_declare(queue='order_created', durable=True)
channel.queue_bind(
    exchange='orders',
    queue='order_created',
    routing_key='order.created'
)

# Publish a message
channel.basic_publish(
    exchange='orders',
    routing_key='order.created',
    body='{"order_id": "123", "user_id": "456"}',
    properties=pika.BasicProperties(
        delivery_mode=2,  # persistent
        content_type='application/json',
    )
)

Topic Exchange

Producer --> [Topic Exchange]
              |
              +--(routing_key="order.*.kr")--> [Korea Order Queue]
              |
              +--(routing_key="order.premium.#")--> [Premium Queue]
              |
              +--(routing_key="#")--> [Audit Queue] (all messages)

  * : Matches exactly one word
  # : Matches zero or more words

Fanout Exchange

Producer --> [Fanout Exchange]
              |
              +--> [Queue A] (all messages)
              |
              +--> [Queue B] (all messages)
              |
              +--> [Queue C] (all messages)

Headers Exchange

Routes on key-value pairs in the message headers instead of the routing key.

4.4 Message Acknowledgement and Prefetch

def callback(ch, method, properties, body):
    try:
        process_message(body)
        # ACK once processing succeeds
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        # NACK and requeue on failure. A message that fails every time is
        # redelivered indefinitely, so route poison messages to a DLQ with
        # requeue=False instead (see 4.5).
        ch.basic_nack(
            delivery_tag=method.delivery_tag,
            requeue=True
        )

# Prefetch: cap the number of unacknowledged messages per consumer
channel.basic_qos(prefetch_count=10)

channel.basic_consume(
    queue='order_created',
    on_message_callback=callback,
    auto_ack=False  # Manual ACK
)

channel.start_consuming()

Prefetch Strategies:

  • prefetch_count=1: One message at a time, fairest distribution
  • prefetch_count=10-50: Balance between throughput and fairness
  • prefetch_count=0: Unlimited (not recommended)

4.5 Dead Letter Queue (DLQ)

# DLQ configuration
channel.queue_declare(
    queue='order_processing',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'order.failed',
        'x-message-ttl': 30000,       # 30-second TTL
        'x-max-length': 10000,         # Maximum queue length
    }
)

# DLX (Dead Letter Exchange) configuration.
# Declared durable so the exchange and its binding survive a broker restart.
channel.exchange_declare(exchange='dlx', exchange_type='direct', durable=True)
channel.queue_declare(queue='order_failed', durable=True)
channel.queue_bind(
    exchange='dlx',
    queue='order_failed',
    routing_key='order.failed'
)

A message is dead-lettered when:

  1. A consumer rejects or nacks it with requeue=False
  2. The message TTL expires
  3. The queue's maximum length is exceeded

4.6 Publisher Confirms

channel.confirm_delivery()

try:
    channel.basic_publish(
        exchange='orders',
        routing_key='order.created',
        body=message_body,
        properties=pika.BasicProperties(delivery_mode=2),
        mandatory=True  # Return the message if it cannot be routed
    )
    print("Message confirmed by broker")
except pika.exceptions.UnroutableError:
    print("Message could not be routed")
except pika.exceptions.NackError:
    print("Message was nacked by broker")

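4.7 Clustering and High Availability

Classic Mirrored Queue (legacy, removed in RabbitMQ 4.0):
  Node 1 (Leader)  <-mirror-> Node 2 (Mirror) <-mirror-> Node 3 (Mirror)
  Issues: split-brain risk, synchronization overhead

Quorum Queue (recommended):
  Node 1 (Leader)  <-Raft-> Node 2 (Follower) <-Raft-> Node 3 (Follower)
  Benefits: Raft consensus protocol, protection against split-brain, data safety as long as a majority of nodes is available

Quorum Queue Declaration: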

channel.queue_declare(
    queue='important_orders',
    durable=True,
    arguments={
        'x-queue-type': 'quorum',
        'x-quorum-initial-group-size': 3,
    }
)

4.8 Streams (Kafka-style Feature)

Streams, introduced in RabbitMQ 3.9, are an append-only log structure similar to Kafka's log-based storage.

channel.queue_declare(
    queue='order_stream',
    durable=True,
    arguments={
        'x-queue-type': 'stream',
        'x-max-length-bytes': 1073741824,  # 1GB
        'x-stream-max-segment-size-bytes': 52428800,  # 50MB
    }
)

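5. Asynchronous Processing Patterns for an LLM Gateway

5.1 Why LLM Calls Need a Queue

LLM API calls behave differently from typical REST API calls.

Characteristic | Typical API | LLM API
---------------|-------------|-------------------------------------
Response time  | 50-200 ms   | 2-30 s
Cost           | Nearly free | Billed per token
Rate limit     | High        | Low (RPM/TPM limits)
Error rate     | Low         | Relatively high (429, 500, timeout)
Response size  | Consistent  | Variable (depends on token count)

Problems a queue solves:

  1. Rate limit management: Workers pull from the queue at a controlled rate before calling the API
  2. Cost control: Requests are queued and processed by priority
  3. Failure handling: During an API outage, requests stay in the queue and are retried later
  4. Load distribution: A worker pool works through requests from many users in order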
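The rate limit item in the list above is commonly implemented with a token bucket in front of the API call. The following is a minimal sketch under stated assumptions: `TokenBucket` is a hypothetical helper, not part of Celery or any LLM SDK, and the clock is passed in explicitly so the behavior is easy to follow.

```python
class TokenBucket:
    """Allows bursts up to `capacity` and a sustained rate of `rate_per_s`."""

    def __init__(self, rate_per_s: float, capacity: float, now: float = 0.0):
        self.rate_per_s = rate_per_s
        self.capacity = capacity
        self.tokens = capacity      # start with a full bucket
        self.updated_at = now

    def try_acquire(self, now: float, cost: float = 1.0) -> bool:
        """Refill based on elapsed time, then spend `cost` tokens if available."""
        elapsed = now - self.updated_at
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate_per_s)
        self.updated_at = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


# One request per second on average, with bursts of up to two requests.
bucket = TokenBucket(rate_per_s=1.0, capacity=2.0, now=0.0)
burst = [bucket.try_acquire(now=0.0) for _ in range(3)]
after_one_second = bucket.try_acquire(now=1.0)
```

A worker would call `try_acquire` before each LLM request and leave the message in the queue (or sleep) when it returns False. For a TPM limit, `cost` could be the estimated token count of the request.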

5.2 Architecture Diagram

+----------+     +-------------+     +----------+     +--------------+
|          |     |             |     |          |     |              |
|  Client  +---->+ FastAPI     +---->+ Request  +---->+ Worker Pool  |
|  (Web)   |     | Gateway     |     | Queue    |     | (Celery)     |
|          |     |             |     | (Redis)  |     |              |
+----------+     +------+------+     +----------+     +------+-------+
                        |                                     |
                        |            +----------+             |
                        |            |          |             |
                        +<-----------+ Response +<------------+
                        |            | Store    |
                        |            | (Redis)  |
                        |            +----------+
                        v
                  +----------+
                  |  Status  |
                  |  SSE/WS  |
                  +----------+

5.3 Priority Queue Pattern

from enum import IntEnum

import litellm
from celery import Celery

class Priority(IntEnum):
    CRITICAL = 0   # Real-time chatbot responses
    HIGH = 1       # Premium users
    NORMAL = 2     # Regular users
    LOW = 3        # Batch processing
    BACKGROUND = 4 # Non-real-time analytics

# Celery task that carries a priority
app = Celery('llm_gateway', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def call_llm(self, request_id, model, messages, priority):
    try:
        response = litellm.completion(
            model=model,
            messages=messages,
            timeout=30,
        )
        # Convert to a plain dict so the JSON result serializer can handle it
        # (assumes the response object exposes pydantic's model_dump()).
        result = response.model_dump()
        # store_response is an application-defined helper
        store_response(request_id, result)
        return result
    except Exception as exc:
        # Exponential backoff: 60s, 120s, 240s
        retry_delay = 60 * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=retry_delay)

5.4 Retry with Exponential Backoff

import asyncio
import random

import litellm

async def call_llm_with_retry(
    model: str,
    messages: list,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
):
    for attempt in range(max_retries + 1):
        try:
            response = await litellm.acompletion(
                model=model,
                messages=messages,
                timeout=30,
            )
            return response
        except Exception as e:
            if attempt == max_retries:
                raise

            # Exponential backoff + jitter
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay * 0.1)
            actual_delay = delay + jitter

            print(f"Attempt {attempt + 1} failed: {e}")
            print(f"Retrying in {actual_delay:.1f}s...")
            await asyncio.sleep(actual_delay)
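
To see what the retry loop above actually waits for, the delay schedule can be computed on its own. This is a small sketch; `backoff_schedule` is a hypothetical helper that mirrors the `min(base_delay * 2 ** attempt, max_delay)` expression and leaves out the jitter term.

```python
def backoff_schedule(max_retries: int = 5,
                     base_delay: float = 1.0,
                     max_delay: float = 60.0) -> list:
    """Delays in seconds (before jitter) slept after each failed attempt."""
    return [min(base_delay * (2 ** attempt), max_delay)
            for attempt in range(max_retries)]


default_schedule = backoff_schedule()             # defaults used above
long_schedule = backoff_schedule(max_retries=8)   # shows the max_delay cap
```

With the defaults, the function sleeps five times for a total of 31 seconds before jitter; the jitter adds at most 10% on top of each delay. The cap only matters from the seventh retry onward.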

5.5 Putting It Together: FastAPI + Celery + Redis

# app/main.py
import uuid
from typing import Optional

from celery.result import AsyncResult
from fastapi import FastAPI
from pydantic import BaseModel

# Assumes the Celery app and the call_llm task from 5.3 live in tasks.py
from tasks import app as celery_app, call_llm

app = FastAPI()

class LLMRequest(BaseModel):
    model: str = "gpt-4o"
    messages: list
    priority: int = 2

class LLMResponse(BaseModel):
    request_id: str
    status: str
    result: Optional[dict] = None

@app.post("/api/v1/completions")
async def create_completion(req: LLMRequest):
    request_id = str(uuid.uuid4())

    # Send the task to the Celery queue that matches its priority
    task = call_llm.apply_async(
        args=[request_id, req.model, req.messages, req.priority],
        queue=f"llm_priority_{req.priority}",
        priority=req.priority,
    )

    return {
        "request_id": request_id,
        "task_id": task.id,
        "status": "queued",
    }

@app.get("/api/v1/completions/{task_id}")
async def get_completion(task_id: str):
    result = AsyncResult(task_id, app=celery_app)

    # ready() is also True for failed tasks, so check for failure first
    if result.failed():
        return {
            "status": "failed",
            "error": str(result.result),
        }
    elif result.ready():
        return {
            "status": "completed",
            "result": result.get(),
        }
    else:
        return {
            "status": "processing",
        }
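
# celery_config.py
from celery import Celery

app = Celery('llm_gateway')

app.conf.update(
    broker_url='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/1',
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    timezone='UTC',
    task_routes={
        'tasks.call_llm': {
            'queue': 'llm_default',
        },
    },
    # Priority queue settings. task_queue_max_priority only takes effect on
    # RabbitMQ; with the Redis broker, priorities are emulated through
    # priority_steps (lower number = higher priority).
    task_queue_max_priority=10,
    task_default_priority=5,
    broker_transport_options={
        'priority_steps': list(range(10)),
        'queue_order_strategy': 'priority',
    },
    # Limit worker concurrency (to stay within rate limits)
    worker_concurrency=4,
    # Task time limits, in seconds
    task_soft_time_limit=120,
    task_time_limit=180,
)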

# docker-compose.yml
version: '3.8'
services:
  api:
    build: .
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000
    ports:
      - '8000:8000'
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379

  worker-high:
    build: .
    command: celery -A tasks worker -Q llm_priority_0,llm_priority_1 -c 4
    depends_on:
      - redis

  worker-normal:
    build: .
    command: celery -A tasks worker -Q llm_priority_2 -c 8
    depends_on:
      - redis

  worker-low:
    build: .
    command: celery -A tasks worker -Q llm_priority_3,llm_priority_4 -c 2
    depends_on:
      - redis

  redis:
    image: redis:7-alpine
    ports:
      - '6379:6379'

  flower:
    build: .
    command: celery -A tasks flower --port=5555
    ports:
      - '5555:5555'
    depends_on:
      - redis

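6. Kafka vs RabbitMQ Comparison

6.1 Overall Comparison

Item                   | Apache Kafka                                      | RabbitMQ
-----------------------|---------------------------------------------------|-----------------------------------
Message model          | Pull-based, log append                            | Push-based, queue
Throughput             | Millions of msg/sec                               | Tens of thousands of msg/sec
Latency                | ~5 ms (p99)                                       | Sub-millisecond possible
Message retention      | Retained for a configured period (default 7 days) | Deleted after consumption
Ordering               | Guaranteed within a partition                     | Guaranteed within a queue
Routing                | Topic/partition-based                             | Exchange/binding-based (flexible)
Protocol               | Its own binary protocol                           | AMQP, MQTT, STOMP
Scaling                | Horizontal scaling by adding partitions           | Clustering + sharding
Metadata               | KRaft (built in)                                  | Erlang distributed system
Stream processing      | Kafka Streams built in                            | None (external tools needed)
Operational complexity | Medium-high                                       |
Learning curve         | High                                              | Medium
Message size           | Default 1 MB (configurable)                       | Bounded by max_message_size (a few MB in practice)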

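6.2 Performance Benchmarks (Reference Figures)

These are rough reference figures; actual numbers depend on hardware, configuration, and workload.

Test environment: 3 brokers/nodes, 3 replicas, 1 KB messages

Apache Kafka:
  - Single partition: ~50,000 msg/sec
  - 12 partitions:    ~800,000 msg/sec
  - 60 partitions:    ~2,000,000+ msg/sec
  - p99 latency:      5 ms

RabbitMQ (Quorum Queue):
  - Single queue: ~20,000 msg/sec
  - 10 queues:    ~100,000 msg/sec
  - p99 latency:  1 ms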

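6.3 Selection Guide

Choose Kafka when:

  • You need high-volume event streaming (logs, metrics, clickstreams)
  • You are implementing the event sourcing pattern
  • You are building real-time data pipelines (ETL/ELT)
  • Multiple consumers must read the same messages independently
  • Messages need to be reprocessed (replay)
  • You need stream processing with Kafka Streams/ksqlDB

Choose RabbitMQ when:

  • You need complex routing patterns (Topic, Headers Exchange)
  • You are implementing the task queue pattern
  • Low latency matters
  • You need support for multiple protocols (AMQP, MQTT, STOMP)
  • You need fine-grained control such as per-message TTL and priority
  • You are integrating with an existing AMQP ecosystem

What about an LLM Gateway?

Small to medium scale (up to 100,000 requests per day):
  RabbitMQ + Celery recommended
  - Simple to set up, with good priority queue support
  - Failed requests are easy to handle with a Dead Letter Queue

Large scale (1,000,000 requests per day or more):
  Kafka-based implementation recommended
  - High throughput, and message retention makes reprocessing possible
  - Flexible scaling with Consumer Groups
  - The event log can be reused for cost/usage analysis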
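
To put the daily volumes above into perspective, they can be converted to an average request rate and, with Little's law (concurrency = arrival rate x latency), to the number of LLM calls in flight at once. This is a rough sizing sketch: the function names are hypothetical, it uses the 30-second worst-case response time from 5.1, and it works with daily averages, so real peaks will be higher.

```python
import math


def average_rps(requests_per_day: int) -> float:
    """Average requests per second over a 24-hour day."""
    return requests_per_day / 86_400


def required_concurrency(rps: float, latency_s: float) -> int:
    """Little's law: average number of requests in flight, rounded up."""
    return math.ceil(rps * latency_s)


small = required_concurrency(average_rps(100_000), latency_s=30)
large = required_concurrency(average_rps(1_000_000), latency_s=30)
```

Under these assumptions, 100,000 requests per day needs about 35 concurrent calls and 1,000,000 requests per day about 348, which is a starting point for choosing the worker counts in the Compose file.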

7. Monitoring in Practice

7.1 Kafka Monitoring

# Prometheus JMX Exporter configuration (prometheus-jmx-config.yaml)
rules:
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec>'
    name: kafka_server_messages_in_total
    type: COUNTER
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec>'
    name: kafka_server_bytes_in_total
    type: COUNTER
  - pattern: 'kafka.consumer<type=consumer-fetch-manager-metrics, client-id=(.+)><>records-lag-max'
    name: kafka_consumer_lag_max
    type: GAUGE

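Key metrics to monitor:

  • Consumer Lag: How far consumers are behind producers
  • ISR Shrink/Expand: Changes in replication state
  • Under-Replicated Partitions: Partitions with fewer in-sync replicas than configured
  • Request Latency: Time taken to process requests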
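
Consumer lag, the first metric in the list above, is the per-partition gap between the log end offset and the group's committed offset. A minimal sketch of that arithmetic follows; `consumer_lag` is a hypothetical helper, the offsets are made-up sample values, and treating a partition with no commit as offset 0 is an assumption of this sketch.

```python
def consumer_lag(end_offsets: dict, committed_offsets: dict) -> dict:
    """Lag per partition = log end offset - committed offset."""
    return {
        partition: end - committed_offsets.get(partition, 0)
        for partition, end in end_offsets.items()
    }


end_offsets = {0: 1500, 1: 1200, 2: 900}   # latest offset per partition
committed = {0: 1500, 1: 1100}             # partition 2 has no commit yet
lag = consumer_lag(end_offsets, committed)
total_lag = sum(lag.values())
max_lag = max(lag.values())
```

Alerting on the maximum lag rather than the total helps catch a single stuck partition that an average would hide.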

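7.2 RabbitMQ Monitoring

Key metrics:
  - Queue Depth: Number of messages waiting in the queue
  - Consumer Utilization: Fraction of time the queue can deliver messages to its consumers immediately
  - Publish/Deliver Rate: Messages published/delivered per second
  - Unacked Messages: Messages delivered but not yet acknowledged
  - Memory/Disk Usage: Resource usage

RabbitMQ Management UI: http://localhost:15672
Prometheus Plugin: rabbitmq_prometheus (built in)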

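8. Wrapping Up

Message queue-based asynchronous architecture is a core building block of modern distributed systems.

Key takeaways:

  1. Kafka: High-volume event streaming, log-based storage, flexible scaling with Consumer Groups
  2. RabbitMQ: Flexible routing, low latency, well suited to traditional message queue patterns
  3. LLM Gateway: Asynchronous processing provides rate limit management, cost control, and fault isolation
  4. Selection criteria: Decide based on throughput, latency, routing complexity, and the need for message retention

Both technologies have mature ecosystems, so choose the one that fits your project's requirements. Kafka remains the go-to choice for large-scale event streaming, and RabbitMQ for work distribution and flexible routing.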



[Architecture] Building Async Systems with Message Queues: Kafka vs RabbitMQ

Overview

Communication between services in modern distributed systems is becoming increasingly complex. Synchronous HTTP calls alone cannot achieve high throughput, fault isolation, and flexible scaling. In this post, we cover the core concepts of Message Queue-based asynchronous architecture, provide an in-depth comparison of Apache Kafka and RabbitMQ, and explore practical async processing patterns for LLM Gateways.


1. Synchronous vs Asynchronous Architecture

1.1 Limitations of Synchronous Communication

In synchronous communication, the client sends a request and blocks until a response arrives.

Client --> Service A --> Service B --> Service C
          (blocking)    (blocking)    (processing)

Problems:

  • Tight Coupling: If one service goes down, the entire chain fails
  • Latency Accumulation: Response times from each service add up
  • Scaling Constraints: The slowest service determines overall throughput
  • Resource Waste: Threads and connections are held during response waiting

1.2 Benefits of Asynchronous Communication

Introducing a message queue decouples producers from consumers.

Producer --> [Message Queue] --> Consumer
  (fire)    (buffer/persist)    (process at own pace)

Benefits:

  • Loose Coupling: Producers and consumers operate independently
  • Load Leveling: The queue acts as a buffer during traffic spikes
  • Fault Isolation: Consumer failures do not affect producers
  • Independent Scaling: Consumers can be scaled out independently
  • Peak Shaving: Smooths out sudden load bursts

2. Core Message Queue Concepts

2.1 Basic Components

Component     | Role
--------------|----------------------------------------------------------------
Producer      | Creates and sends messages to queues/topics
Consumer      | Receives and processes messages from queues/topics
Broker        | Intermediate server that receives, stores, and delivers messages
Topic / Queue | Logical channel where messages are stored
Partition     | Physical subdivision of a topic for parallel processing

2.2 Message Delivery Guarantees

At-Most-Once:   Producer --> Broker (no retry)
                Messages may be lost, but no duplicates

At-Least-Once:  Producer --> Broker --> ACK --> Retry on failure
                No message loss, but duplicates possible

Exactly-Once:   Producer --> Broker (idempotent + transaction)
                Each message delivered exactly once
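
In practice, at-least-once delivery is often combined with an idempotent consumer so that a redelivered message has no additional effect. The sketch below illustrates the idea under stated assumptions: each message carries a unique `message_id` (a hypothetical field), and an in-memory set stands in for a durable deduplication store.

```python
class IdempotentHandler:
    """Applies each message at most once, even if it is delivered repeatedly."""

    def __init__(self):
        self.seen_ids = set()   # stand-in for a durable dedup store
        self.balance = 0

    def handle(self, message: dict) -> bool:
        """Return True if the message was applied, False if it was a duplicate."""
        if message["message_id"] in self.seen_ids:
            return False
        self.balance += message["amount"]
        self.seen_ids.add(message["message_id"])
        return True


handler = IdempotentHandler()
deliveries = [
    {"message_id": "m1", "amount": 100},
    {"message_id": "m2", "amount": 50},
    {"message_id": "m1", "amount": 100},  # redelivered after a lost ACK
]
results = [handler.handle(m) for m in deliveries]
```

The duplicate of `m1` is detected and skipped, so the balance reflects each message once even though the broker delivered it twice.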

2.3 Message Delivery Models

Point-to-Point (Queue):

  • Each message is delivered to exactly one consumer
  • Suitable for work distribution

Publish-Subscribe (Topic):

  • Each message is delivered to all subscribers
  • Suitable for event broadcasting
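
The difference between the two models can be shown with a small in-memory sketch. `PointToPointQueue` and `PubSubTopic` are hypothetical toy classes for illustration, not broker APIs.

```python
from collections import deque


class PointToPointQueue:
    """Each message is handed to exactly one receiver."""

    def __init__(self):
        self._messages = deque()

    def send(self, message):
        self._messages.append(message)

    def receive(self):
        # Whoever asks first gets the message; it is then gone from the queue.
        return self._messages.popleft() if self._messages else None


class PubSubTopic:
    """Each message is copied to every subscriber."""

    def __init__(self):
        self._inboxes = {}

    def subscribe(self, name):
        self._inboxes[name] = []

    def publish(self, message):
        for inbox in self._inboxes.values():
            inbox.append(message)

    def inbox(self, name):
        return list(self._inboxes[name])


queue = PointToPointQueue()
queue.send("job-1")
queue.send("job-2")
worker_a, worker_b = queue.receive(), queue.receive()

topic = PubSubTopic()
topic.subscribe("billing")
topic.subscribe("audit")
topic.publish("order.created")
```

The two workers split the jobs between them, while both subscribers see the same event.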

3. Apache Kafka Deep Dive

3.1 What is Kafka

Apache Kafka is a distributed event streaming platform originally developed at LinkedIn. It is characterized by high throughput, durability, and horizontal scalability, and is widely used for real-time data pipelines and streaming applications.

3.2 Architecture Overview

                    +-------------------+
                    |   Kafka Cluster   |
                    |                   |
  Producers ------->|  Broker 1         |-------> Consumers
                    |  Broker 2         |         (Consumer Group A)
                    |  Broker 3         |-------> Consumers
                    |                   |         (Consumer Group B)
                    +-------------------+
                           |
                    +------+------+
                    |  KRaft      |
                    |  Controller |
                    +-------------+

Key Components:

  • Broker: Server node that receives, stores, and delivers messages
  • KRaft Controller: Built-in metadata management layer that replaces ZooKeeper (production-ready since Kafka 3.3; ZooKeeper mode was removed in Kafka 4.0)
  • Topic: Logical category for messages
  • Partition: Physical subdivision of a topic; each partition is an immutable ordered log
  • Replication: Per-partition replicas maintained in a Leader-Follower structure
  • ISR (In-Sync Replicas): Set of replicas synchronized with the Leader

3.3 KRaft Mode (ZooKeeper Removed)

Starting with Kafka 4.0, KRaft is the only supported mode.

Legacy Architecture:
  Kafka Brokers <---> ZooKeeper Ensemble (separate cluster)

KRaft Architecture:
  Kafka Brokers (some nodes also serve as Controllers)
  - Internal Raft consensus protocol for metadata management
  - No separate ZooKeeper cluster required

KRaft Benefits:

  • Reduced operational complexity (no ZooKeeper cluster management)
  • Higher partition-count ceiling (designed to scale to millions of partitions)
  • Faster metadata propagation
  • Shorter controller failover recovery time

3.4 Producer Deep Dive

Partitioner

Determines which partition a message is sent to.

from confluent_kafka import Producer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'my-producer',
    'acks': 'all',
    'enable.idempotence': True,
    'max.in.flight.requests.per.connection': 5,
    'retries': 2147483647,
    'linger.ms': 5,
    'batch.size': 16384,
    'compression.type': 'lz4',
}

producer = Producer(conf)

def delivery_callback(err, msg):
    if err:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

# Key-based partitioning - same key goes to same partition
producer.produce(
    topic='orders',
    key='user-123',
    value='{"order_id": "ord-456", "amount": 5000}',
    callback=delivery_callback
)

producer.flush()
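
Conceptually, key-based partitioning is a stable hash of the key taken modulo the partition count. The sketch below uses CRC32 from the standard library as a stand-in hash; it is not claimed to reproduce the partition a particular Kafka client would pick, and `choose_partition` is a hypothetical helper.

```python
import zlib


def choose_partition(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition index with a stable hash (CRC32 as a stand-in)."""
    return zlib.crc32(key) % num_partitions


first = choose_partition(b"user-123", 3)
second = choose_partition(b"user-123", 3)
```

Because the result depends on `num_partitions`, adding partitions to a topic can move existing keys to a different partition, which breaks per-key ordering across the change.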

acks Configuration

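acks Value | Behavior                           | Durability | Performance
-----------|------------------------------------|------------|------------
0          | Does not wait for broker response  | Low        | Highest
1          | Only Leader confirms write         | Medium     | High
all (-1)   | All ISR replicas confirm write     | Highest    | Low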

Batching and Compression

Producer Internal Flow:
  Record --> Accumulator --> [Batch] --> Compressor --> Network Send

  linger.ms: How long to wait for more records before sending a batch (Java client default: 0 ms through Kafka 3.x, 5 ms from 4.0)
  batch.size: Maximum batch size (Java client default: 16 KB)
  compression.type: none / gzip / snappy / lz4 / zstd
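
The interplay of batch.size and linger.ms can be sketched as an accumulator that flushes on whichever limit is hit first. This is a simplified illustration, not the client's actual implementation: `BatchAccumulator` is hypothetical, time is passed in explicitly, and a batch is flushed once it reaches the size limit rather than before exceeding it.

```python
class BatchAccumulator:
    """Collects records and releases them by size or by age."""

    def __init__(self, batch_size_bytes: int, linger_s: float):
        self.batch_size_bytes = batch_size_bytes
        self.linger_s = linger_s
        self._records = []
        self._bytes = 0
        self._first_append_at = None

    def append(self, record: bytes, now: float):
        """Add a record; return the batch if the size limit was reached."""
        if self._first_append_at is None:
            self._first_append_at = now
        self._records.append(record)
        self._bytes += len(record)
        if self._bytes >= self.batch_size_bytes:
            return self._drain()
        return None

    def poll(self, now: float):
        """Return the pending batch if it has waited at least linger_s."""
        if self._records and now - self._first_append_at >= self.linger_s:
            return self._drain()
        return None

    def _drain(self):
        batch, self._records, self._bytes = self._records, [], 0
        self._first_append_at = None
        return batch


acc = BatchAccumulator(batch_size_bytes=10, linger_s=0.005)
r1 = acc.append(b"aaaa", now=0.000)
r2 = acc.append(b"bbbb", now=0.001)
size_flush = acc.append(b"cccc", now=0.002)   # 12 bytes >= 10, flush by size
acc.append(b"dd", now=0.010)
too_early = acc.poll(now=0.012)               # only 2 ms old
linger_flush = acc.poll(now=0.020)            # 10 ms old, flush by age
```

A larger linger value trades a little latency for fuller batches, which in turn compress better.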

3.5 Consumer Deep Dive

Consumer Group

Topic: orders (3 partitions)

Consumer Group A:
  Consumer 1 <-- Partition 0
  Consumer 2 <-- Partition 1
  Consumer 3 <-- Partition 2

Consumer Group B:
  Consumer 4 <-- Partition 0, 1, 2

Each Consumer Group consumes messages independently. Ordering is guaranteed within a partition, and each partition is assigned to only one consumer within a group.
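
The assignment shown in the diagram can be reproduced with a simple round-robin sketch. `assign_round_robin` is a hypothetical helper for illustration; real clients use configurable assignors with more elaborate rules.

```python
def assign_round_robin(partitions, consumers):
    """Spread partitions over the consumers of one group, one at a time."""
    ordered = sorted(consumers)
    assignment = {consumer: [] for consumer in ordered}
    for index, partition in enumerate(sorted(partitions)):
        assignment[ordered[index % len(ordered)]].append(partition)
    return assignment


group_a = assign_round_robin([0, 1, 2], ["consumer-1", "consumer-2", "consumer-3"])
group_b = assign_round_robin([0, 1, 2], ["consumer-4"])
oversized = assign_round_robin([0, 1, 2], ["c1", "c2", "c3", "c4"])
```

Since a partition has only one owner per group, the fourth consumer in `oversized` receives nothing: adding consumers beyond the partition count does not increase parallelism.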

Offset Management

from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processor',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # Manual commit recommended
    'max.poll.interval.ms': 300000,
    'session.timeout.ms': 45000,
}

consumer = Consumer(conf)
consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(f"Error: {msg.error()}")
                break

        # Process message
        process_order(msg.value().decode('utf-8'))

        # Manual commit - after processing completes
        consumer.commit(asynchronous=False)
finally:
    consumer.close()

Rebalancing

Partition reassignment occurs when consumer group membership changes.

Rebalancing Triggers:
  1. New consumer joins
  2. Existing consumer leaves (crash or graceful shutdown)
  3. Topic partition count changes
  4. Subscription topic changes

Rebalancing Strategies:
  - Eager (Stop-the-World): Release all partitions, then reassign
  - Cooperative (Incremental): Only reassign changed partitions
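
The practical difference between the two strategies is how many partitions stop being consumed during a rebalance. A minimal sketch, assuming the old and new assignments are already known; `revoked_partitions` is a hypothetical helper, not a client API.

```python
def revoked_partitions(old: dict, new: dict, strategy: str) -> dict:
    """Partitions each existing consumer must give up during the rebalance."""
    revoked = {}
    for consumer, owned in old.items():
        if strategy == "eager":
            # Everything is released, even partitions the consumer gets back.
            revoked[consumer] = set(owned)
        else:
            # Cooperative: release only partitions that move to another owner.
            revoked[consumer] = set(owned) - set(new.get(consumer, []))
    return revoked


old = {"c1": [0, 1], "c2": [2, 3]}
new = {"c1": [0, 1], "c2": [2], "c3": [3]}   # c3 joins and takes partition 3
eager = revoked_partitions(old, new, "eager")
cooperative = revoked_partitions(old, new, "cooperative")
```

With the eager strategy all four partitions pause; with the cooperative strategy only partition 3 does.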

3.6 Exactly-Once Semantics

# Idempotent Producer + Transactional Messaging
from confluent_kafka import Producer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'enable.idempotence': True,
    'transactional.id': 'my-transaction-id',
}

producer = Producer(conf)
producer.init_transactions()

try:
    producer.begin_transaction()

    producer.produce('topic-a', key='k1', value='v1')
    producer.produce('topic-b', key='k2', value='v2')

    # Include consumer offsets in the transaction.
    # `consumer` is the Consumer from the Offset Management example (3.5),
    # created with enable.auto.commit=False.
    producer.send_offsets_to_transaction(
        consumer.position(consumer.assignment()),
        consumer.consumer_group_metadata()
    )

    producer.commit_transaction()
except Exception:
    # Roll back everything produced in this transaction, then re-raise
    producer.abort_transaction()
    raise

How Exactly-Once Works:

  1. Idempotent Producer: Producer ID + Sequence Number prevents duplicate sends
  2. Transactional Messaging: Atomic writes across multiple topics/partitions
  3. read_committed isolation: Only consume messages from committed transactions
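
The first point, deduplication by Producer ID and sequence number, can be sketched as follows. This is a simplified model of the idea, not the broker's real logic (which also tracks producer epochs and works on batches); `PartitionLog` is a hypothetical class.

```python
class PartitionLog:
    """Accepts each (producer_id, sequence) pair once, in order."""

    def __init__(self):
        self.records = []
        self._last_seq = {}   # producer_id -> last accepted sequence number

    def append(self, producer_id: int, seq: int, value: str) -> bool:
        last = self._last_seq.get(producer_id, -1)
        if seq <= last:
            return False      # retry of a record that was already written
        if seq != last + 1:
            raise ValueError("out-of-order sequence number")
        self.records.append(value)
        self._last_seq[producer_id] = seq
        return True


log = PartitionLog()
outcomes = [
    log.append(producer_id=7, seq=0, value="a"),
    log.append(producer_id=7, seq=1, value="b"),
    log.append(producer_id=7, seq=1, value="b"),   # retry after a lost ACK
]
```

The retried record is acknowledged without being written again, so the log contains each value once.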

3.7 Kafka Connect and Kafka Streams

Kafka Connect:
  Source Connector: DB/File/API --> Kafka Topic
  Sink Connector:   Kafka Topic --> DB/File/API

  Example: Debezium (MySQL CDC) --> Kafka --> Elasticsearch Sink

Kafka Streams:
  Kafka Topic --> Stream Processing --> Kafka Topic

  Features: No separate cluster needed, library-based
  Capabilities: filter, map, groupBy, windowed aggregation, join

4. RabbitMQ Deep Dive

4.1 What is RabbitMQ

RabbitMQ is an open-source message broker based on the AMQP (Advanced Message Queuing Protocol). It is characterized by flexible routing, support for multiple protocols, and ease of management.

4.2 Architecture Overview

Producer --> Exchange --> Binding --> Queue --> Consumer

  Exchange: Message routing hub
  Binding:  Routing rules between Exchange and Queue
  Queue:    Buffer where messages are stored
  Virtual Host: Logical isolation unit (multi-tenancy)

4.3 Exchange Types

Direct Exchange

Producer --> [Direct Exchange]
              |
              +--(routing_key="order.created")--> [Order Queue]
              |
              +--(routing_key="payment.processed")--> [Payment Queue]

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# Declare Direct Exchange
channel.exchange_declare(
    exchange='orders',
    exchange_type='direct',
    durable=True
)

# Declare Queue and Binding
channel.queue_declare(queue='order_created', durable=True)
channel.queue_bind(
    exchange='orders',
    queue='order_created',
    routing_key='order.created'
)

# Publish message
channel.basic_publish(
    exchange='orders',
    routing_key='order.created',
    body='{"order_id": "123", "user_id": "456"}',
    properties=pika.BasicProperties(
        delivery_mode=2,  # persistent
        content_type='application/json',
    )
)

Topic Exchange

Producer --> [Topic Exchange]
              |
              +--(routing_key="order.*.kr")--> [Korea Order Queue]
              |
              +--(routing_key="order.premium.#")--> [Premium Queue]
              |
              +--(routing_key="#")--> [Audit Queue] (all messages)

  * : Matches exactly one word
  # : Matches zero or more words
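
The two wildcard rules can be expressed as a short recursive matcher. This is an illustrative sketch of the matching semantics described above, not RabbitMQ's implementation; `topic_matches` is a hypothetical helper, and the patterns are the binding keys from the diagram.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Check a dot-separated routing key against a pattern with * and #."""

    def match(p: list, k: list) -> bool:
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb zero or more words
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))


korea = topic_matches("order.*.kr", "order.premium.kr")
too_short = topic_matches("order.*.kr", "order.kr")
premium_bare = topic_matches("order.premium.#", "order.premium")
premium_deep = topic_matches("order.premium.#", "order.premium.kr.seoul")
audit = topic_matches("#", "payment.processed")
```

Note that `order.premium.#` also matches the bare key `order.premium`, because `#` can stand for zero words, while `order.*.kr` requires exactly one word in the middle.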

Fanout Exchange

Producer --> [Fanout Exchange]
              |
              +--> [Queue A] (all messages)
              |
              +--> [Queue B] (all messages)
              |
              +--> [Queue C] (all messages)

Headers Exchange

Routes based on message header key-value pairs instead of routing keys.
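
Header matching is controlled by the `x-match` binding argument: `all` requires every listed header to match, `any` requires at least one. The sketch below models that rule under stated assumptions; `headers_match` is a hypothetical helper and the header names are sample values.

```python
def headers_match(binding_args: dict, message_headers: dict) -> bool:
    """Evaluate a headers-exchange binding against a message's headers."""
    mode = binding_args.get("x-match", "all")
    # Arguments starting with "x-" configure the binding and are not compared.
    required = {k: v for k, v in binding_args.items() if not k.startswith("x-")}
    hits = [message_headers.get(k) == v for k, v in required.items()]
    return all(hits) if mode == "all" else any(hits)


binding_all = {"x-match": "all", "format": "pdf", "type": "report"}
binding_any = {"x-match": "any", "format": "pdf", "type": "report"}
message = {"format": "pdf", "type": "log"}

matched_all = headers_match(binding_all, message)
matched_any = headers_match(binding_any, message)
```

The same message is rejected by the `all` binding and accepted by the `any` binding, since only one of the two headers matches.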

4.4 Message Acknowledgement and Prefetch

def callback(ch, method, properties, body):
    try:
        process_message(body)
        # ACK on successful processing
        ch.basic_ack(delivery_tag=method.delivery_tag)
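    except Exception:
        # NACK and requeue on failure. A message that fails every time is
        # redelivered indefinitely, so route poison messages to a DLQ with
        # requeue=False instead (see 4.5).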
        ch.basic_nack(
            delivery_tag=method.delivery_tag,
            requeue=True
        )

# Prefetch: cap the number of unacknowledged messages per consumer
channel.basic_qos(prefetch_count=10)

channel.basic_consume(
    queue='order_created',
    on_message_callback=callback,
    auto_ack=False  # Manual ACK
)

channel.start_consuming()

Prefetch Strategies:

  • prefetch_count=1: Process one at a time, fairest distribution
  • prefetch_count=10-50: Balance between throughput and fairness
  • prefetch_count=0: Unlimited (not recommended)

4.5 Dead Letter Queue (DLQ)

# DLQ configuration
channel.queue_declare(
    queue='order_processing',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'order.failed',
        'x-message-ttl': 30000,       # 30 second TTL
        'x-max-length': 10000,         # Max queue length
    }
)

# DLX (Dead Letter Exchange) configuration
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='order_failed', durable=True)
channel.queue_bind(
    exchange='dlx',
    queue='order_failed',
    routing_key='order.failed'
)

When messages go to Dead Letter:

  1. Consumer rejects/nacks a message (requeue=False)
  2. Message TTL expires
  3. Queue maximum length exceeded
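
When a message is dead-lettered, RabbitMQ records the event in an `x-death` header on the message. One common use is to cap retries: a consumer reads the count and stops re-processing after a limit. The sketch below assumes each `x-death` entry carries `queue`, `reason`, and `count` fields; the helper names and the limit of 3 are illustrative choices.

```python
from typing import Optional


def death_count(headers: Optional[dict], queue: str) -> int:
    """Sum how many times a message was dead-lettered from the given queue."""
    entries = (headers or {}).get("x-death", [])
    return sum(int(e.get("count", 0)) for e in entries if e.get("queue") == queue)


def should_give_up(headers: Optional[dict], queue: str, max_attempts: int = 3) -> bool:
    """True once the message has been dead-lettered max_attempts times."""
    return death_count(headers, queue) >= max_attempts


# Headers as they might appear on a message consumed from 'order_failed'
sample_headers = {
    "x-death": [
        {"queue": "order_processing", "reason": "rejected", "count": 2},
        {"queue": "other_queue", "reason": "expired", "count": 5},
    ]
}
print(death_count(sample_headers, "order_processing"))
print(should_give_up(sample_headers, "order_processing"))
```

In a pika callback the headers would come from `properties.headers`, which is `None` when the message carries no headers, hence the `headers or {}` guard.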

4.6 Publisher Confirms

channel.confirm_delivery()

try:
    channel.basic_publish(
        exchange='orders',
        routing_key='order.created',
        body=message_body,
        properties=pika.BasicProperties(delivery_mode=2),
        mandatory=True  # Return if routing fails
    )
    print("Message confirmed by broker")
except pika.exceptions.UnroutableError:
    print("Message could not be routed")
except pika.exceptions.NackError:
    print("Message was nacked by broker")

4.7 Clustering and High Availability

Classic Mirrored Queue (Legacy, removed in RabbitMQ 4.0):
  Node 1 (Leader)  <-mirror-> Node 2 (Mirror) <-mirror-> Node 3 (Mirror)
  Issues: split-brain risk, synchronization overhead

Quorum Queue (Recommended):
  Node 1 (Leader)  <-Raft-> Node 2 (Follower) <-Raft-> Node 3 (Follower)
  Benefits: Raft consensus protocol, no split-brain, guaranteed data safety

Quorum Queue Declaration:

channel.queue_declare(
    queue='important_orders',
    durable=True,
    arguments={
        'x-queue-type': 'quorum',
        'x-quorum-initial-group-size': 3,
    }
)
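
The group size determines how many node failures a quorum queue can survive, because Raft needs a majority of members to agree. A quick arithmetic sketch (function names are illustrative):

```python
def quorum_size(members: int) -> int:
    """Smallest majority of a Raft group with the given member count."""
    return members // 2 + 1


def tolerated_failures(members: int) -> int:
    """How many members can be lost while a majority remains available."""
    return members - quorum_size(members)


for n in (3, 4, 5):
    print(n, quorum_size(n), tolerated_failures(n))
```

A group of 3 tolerates one failed node and a group of 5 tolerates two. A group of 4 still tolerates only one, which is why odd sizes are the usual choice.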

4.8 Streams (Kafka-style Feature)

Streams, introduced in RabbitMQ 3.9, are append-only logs similar to Kafka's log-based storage: messages stay in the stream after they are consumed and can be read again.

channel.queue_declare(
    queue='order_stream',
    durable=True,
    arguments={
        'x-queue-type': 'stream',
        'x-max-length-bytes': 1073741824,  # 1GB
        'x-stream-max-segment-size-bytes': 52428800,  # 50MB
    }
)

5. LLM Gateway Async Processing Patterns

5.1 Why LLM Calls Need Queues

LLM API calls have different characteristics than typical REST APIs.

| Characteristic | Typical API | LLM API |
|---|---|---|
| Response Time | 50-200ms | 2-30 seconds |
| Cost | Nearly free | Per-token pricing |
| Rate Limit | High | Low (RPM/TPM limits) |
| Error Rate | Low | Relatively high (429, 500, timeout) |
| Response Size | Consistent | Variable (depends on token count) |

Problems Queues Solve:

  1. Rate Limit Management: Control API call speed from the queue
  2. Cost Control: Queue requests and process by priority
  3. Failure Handling: Store in queue during API outages, retry later
  4. Load Distribution: A worker pool drains the queue at a bounded concurrency instead of sending every request to the API at once
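
Rate limit management is often implemented with a token bucket that a worker consults before each API call. The following is a minimal single-process sketch. The class name, parameters, and injectable clock are assumptions for illustration, and a real deployment with several workers would need shared state (for example in Redis).

```python
import time


class TokenBucket:
    """Allow roughly rate_per_sec calls per second, with bursts up to capacity."""

    def __init__(self, rate_per_sec: float, capacity: float, clock=time.monotonic):
        self.rate = rate_per_sec
        self.capacity = capacity
        self.clock = clock
        self.tokens = float(capacity)  # start with a full bucket
        self.updated = clock()

    def try_acquire(self, cost: float = 1.0) -> bool:
        """Take `cost` tokens if available; return False when rate limited."""
        now = self.clock()
        # Refill in proportion to elapsed time, never beyond capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


# Example: at most 1 request per second on average, bursts of up to 2
bucket = TokenBucket(rate_per_sec=1.0, capacity=2)
print([bucket.try_acquire() for _ in range(3)])
```

A worker that gets `False` can leave the request in the queue and try again later, which is what lets the queue absorb bursts without exceeding RPM limits. Passing the token count of a request as `cost` extends the same idea to TPM limits.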

5.2 Architecture Diagram

+----------+     +-------------+     +----------+     +--------------+
|          |     |             |     |          |     |              |
|  Client  +---->+ FastAPI     +---->+ Request  +---->+ Worker Pool  |
|  (Web)   |     | Gateway     |     | Queue    |     | (Celery)     |
|          |     |             |     | (Redis)  |     |              |
+----------+     +------+------+     +----------+     +------+-------+
                        |                                     |
                        |            +----------+             |
                        |            |          |             |
                        +<-----------+ Response +<------------+
                        |            | Store    |
                        |            | (Redis)  |
                        |            +----------+
                        v
                  +----------+
                  |  Status  |
                  |  SSE/WS  |
                  +----------+

5.3 Priority Queue Pattern

from enum import IntEnum

class Priority(IntEnum):
    CRITICAL = 0   # Real-time chatbot responses
    HIGH = 1       # Premium users
    NORMAL = 2     # Standard users
    LOW = 3        # Batch processing
    BACKGROUND = 4 # Non-real-time analytics

# Celery task; the priority value is passed through for routing and logging
import litellm
from celery import Celery

app = Celery('llm_gateway', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def call_llm(self, request_id, model, messages, priority):
    try:
        response = litellm.completion(
            model=model,
            messages=messages,
            timeout=30,
        )
        # Convert to a plain dict so the JSON result serializer can store it
        result = response.model_dump()
        # store_response is an application-defined helper (not shown)
        store_response(request_id, result)
        return result
    except Exception as exc:
        # Exponential backoff: 60s, 120s, 240s
        retry_delay = 60 * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=retry_delay)

5.4 Retry with Exponential Backoff

import asyncio
import random

import litellm

async def call_llm_with_retry(
    model: str,
    messages: list,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
):
    for attempt in range(max_retries + 1):
        try:
            response = await litellm.acompletion(
                model=model,
                messages=messages,
                timeout=30,
            )
            return response
        except Exception as e:
            if attempt == max_retries:
                raise

            # Exponential backoff + jitter
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay * 0.1)
            actual_delay = delay + jitter

            print(f"Attempt {attempt + 1} failed: {e}")
            print(f"Retrying in {actual_delay:.1f}s...")
            await asyncio.sleep(actual_delay)
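
To see what this policy means in wall-clock terms, the delay formula can be evaluated on its own. The sketch below reuses the same `base_delay`, `max_delay`, and 10% jitter bound as the function above; the helper name `backoff_schedule` is made up for illustration.

```python
def backoff_schedule(max_retries: int = 5, base_delay: float = 1.0,
                     max_delay: float = 60.0) -> list:
    """Delays (before jitter) slept after each failed attempt except the last."""
    return [min(base_delay * (2 ** attempt), max_delay)
            for attempt in range(max_retries)]


delays = backoff_schedule()
print(delays)                      # per-retry delays before jitter
print(sum(delays))                 # minimum total wait
print(sum(d * 1.1 for d in delays))  # upper bound with 10% jitter
```

With the defaults the retries wait 1, 2, 4, 8, and 16 seconds, so `max_delay` only takes effect once `max_retries` is 7 or more. Each attempt can also run up to the 30-second `timeout`, so the worst-case total time is considerably longer than the sum of the delays.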

5.5 Production Implementation: FastAPI + Celery + Redis

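# app/main.py
from typing import Optional
import uuid

from celery.result import AsyncResult
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from tasks import call_llm  # Celery task from section 5.3 (module name as used in task_routes)

app = FastAPI()

class LLMRequest(BaseModel):
    model: str = "gpt-4o"
    messages: list
    priority: int = 2  # 0 (critical) to 4 (background)

class LLMResponse(BaseModel):
    request_id: str
    status: str
    result: Optional[dict] = None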

@app.post("/api/v1/completions")
async def create_completion(req: LLMRequest):
    request_id = str(uuid.uuid4())

    # Send to Celery queue based on priority
    task = call_llm.apply_async(
        args=[request_id, req.model, req.messages, req.priority],
        queue=f"llm_priority_{req.priority}",
        priority=req.priority,
    )

    return {
        "request_id": request_id,
        "task_id": task.id,
        "status": "queued",
    }

@app.get("/api/v1/completions/{task_id}")
async def get_completion(task_id: str):
    result = AsyncResult(task_id)

    # Check failed() first: ready() is also True for failed tasks,
    # and result.get() would re-raise the task's exception
    if result.failed():
        return {
            "status": "failed",
            "error": str(result.result),
        }
    elif result.ready():
        return {
            "status": "completed",
            "result": result.get(),
        }
    else:
        return {
            "status": "processing",
        }
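
# client_polling.py
On the client side, the two endpoints are typically used by submitting a request and then polling the status endpoint until it reports `completed` or `failed`. The helper below is a hypothetical sketch: `fetch_status` stands in for whatever performs the `GET /api/v1/completions/{task_id}` call and returns the parsed JSON, and the interval and timeout values are arbitrary.

```python
import time
from typing import Callable


def wait_for_completion(fetch_status: Callable[[], dict],
                        interval: float = 1.0,
                        timeout: float = 60.0,
                        sleep: Callable[[float], None] = time.sleep,
                        clock: Callable[[], float] = time.monotonic) -> dict:
    """Poll fetch_status until the task completes, fails, or times out."""
    deadline = clock() + timeout
    while True:
        payload = fetch_status()
        if payload.get("status") in ("completed", "failed"):
            return payload
        if clock() >= deadline:
            raise TimeoutError("task did not finish before the timeout")
        sleep(interval)


# Stand-in for the HTTP call so the example runs without a server
_responses = iter([
    {"status": "processing"},
    {"status": "processing"},
    {"status": "completed", "result": {"ok": True}},
])
final = wait_for_completion(lambda: next(_responses), interval=0.0)
print(final["status"])
```

Polling is the simplest option. The SSE/WS status channel in the architecture diagram avoids the repeated requests at the cost of holding a connection open.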

# celery_config.py
from celery import Celery

app = Celery('llm_gateway')

app.conf.update(
    broker_url='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/1',
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    timezone='UTC',
    task_routes={
        'tasks.call_llm': {
            'queue': 'llm_default',
        },
    },
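    # Priority settings (task_queue_max_priority applies to RabbitMQ only;
    # the Redis broker ignores it)
    task_queue_max_priority=10,
    task_default_priority=5,
    # Default worker concurrency; the -c flag on the command line overrides it
    worker_concurrency=4,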
    # Task time limits
    task_soft_time_limit=120,
    task_time_limit=180,
)

# docker-compose.yml
version: '3.8'
services:
  api:
    build: .
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000
    ports:
      - '8000:8000'
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379

  worker-high:
    build: .
    command: celery -A tasks worker -Q llm_priority_0,llm_priority_1 -c 4
    depends_on:
      - redis

  worker-normal:
    build: .
    command: celery -A tasks worker -Q llm_priority_2 -c 8
    depends_on:
      - redis

  worker-low:
    build: .
    command: celery -A tasks worker -Q llm_priority_3,llm_priority_4 -c 2
    depends_on:
      - redis

  redis:
    image: redis:7-alpine
    ports:
      - '6379:6379'

  flower:
    build: .
    command: celery -A tasks flower --port=5555
    ports:
      - '5555:5555'
    depends_on:
      - redis

6. Kafka vs RabbitMQ Comparison

6.1 Comprehensive Comparison Table

| Aspect | Apache Kafka | RabbitMQ |
|---|---|---|
| Message Model | Pull-based, Log append | Push-based, Queue |
| Throughput | Millions msg/sec | Tens of thousands msg/sec |
| Latency | ~5ms (p99) | Sub-millisecond possible |
| Message Retention | Configurable retention (default 7 days) | Deleted after consumption |
| Ordering | Guaranteed within partition | Guaranteed within queue |
| Routing | Topic/Partition based | Exchange/Binding based (flexible) |
| Protocol | Custom protocol | AMQP, MQTT, STOMP |
| Scaling | Horizontal via partition addition | Clustering + Sharding |
| Metadata | KRaft (built-in) | Erlang distributed system |
| Stream Processing | Kafka Streams built-in | None (external tools needed) |
| Operational Complexity | Medium-High | Medium |
| Learning Curve | High | Medium |
| Message Size | Default 1MB (configurable) | No limit (practically several MB) |

6.2 Performance Benchmarks (Reference)

Test Environment: 3 brokers/nodes, 3 replicas, 1KB message size

Apache Kafka:
  - Single partition: ~50,000 msg/sec
  - 12 partitions:    ~800,000 msg/sec
  - 60 partitions:    ~2,000,000+ msg/sec
  - Latency p99: 5ms

RabbitMQ (Quorum Queue):
  - Single queue:     ~20,000 msg/sec
  - 10 queues:        ~100,000 msg/sec
  - Latency p99: 1ms

6.3 Selection Guide

Choose Kafka when:

  • High-volume event streaming (logs, metrics, clickstreams)
  • Implementing event sourcing patterns
  • Real-time data pipelines (ETL/ELT)
  • Multiple consumers independently consuming the same messages
  • Message replay is needed
  • Stream processing with Kafka Streams/ksqlDB is required

Choose RabbitMQ when:

  • Complex routing patterns are needed (Topic, Headers Exchange)
  • Task queue patterns
  • Low latency is critical
  • Multiple protocol support is needed (AMQP, MQTT, STOMP)
  • Fine-grained control like per-message TTL, priority is needed
  • Integration with existing AMQP ecosystem

For LLM Gateways?

Small/Medium Scale (under 100K requests/day):
  RabbitMQ + Celery recommended
  - Easy setup, excellent Priority Queue support
  - Dead Letter Queue for failure handling

Large Scale (over 1M requests/day):
  Kafka-based implementation recommended
  - High throughput, message retention for reprocessing
  - Flexible scaling with Consumer Groups
  - Event log for cost/usage analytics
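
For capacity planning it helps to translate daily volumes into per-second rates and concurrent workers. The sketch below applies Little's law (in-flight requests = arrival rate x latency). The 10-second average latency and the peak factor of 3 are assumptions picked for illustration, not measurements.

```python
import math

SECONDS_PER_DAY = 86_400


def average_rps(requests_per_day: int) -> float:
    """Average requests per second for a given daily volume."""
    return requests_per_day / SECONDS_PER_DAY


def workers_needed(requests_per_day: int, avg_latency_s: float,
                   peak_factor: float = 1.0) -> int:
    """Concurrent workers needed to keep up at peak, by Little's law."""
    return math.ceil(average_rps(requests_per_day) * peak_factor * avg_latency_s)


for volume in (100_000, 1_000_000):
    print(volume,
          round(average_rps(volume), 2),
          workers_needed(volume, avg_latency_s=10, peak_factor=3))
```

Under these assumptions, 100K requests/day averages about 1.16 requests per second and needs roughly 35 concurrent workers at peak, while 1M requests/day averages about 11.57 per second and needs roughly 348. Worker concurrency and provider rate limits, rather than broker throughput, tend to be the constraint at these volumes.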

7. Production Monitoring Setup

7.1 Kafka Monitoring

# Prometheus JMX Exporter config (prometheus-jmx-config.yaml)
rules:
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec>'
    name: kafka_server_messages_in_total
    type: COUNTER
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec>'
    name: kafka_server_bytes_in_total
    type: COUNTER
  - pattern: 'kafka.consumer<type=consumer-fetch-manager-metrics, client-id=(.+)><>records-lag-max'
    name: kafka_consumer_lag_max
    type: GAUGE

Key Monitoring Metrics:

  • Consumer Lag: How far behind consumers are from producers
  • ISR Shrink/Expand: Replication state changes
  • Under-Replicated Partitions: Partitions with insufficient replicas
  • Request Latency: Request processing time
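
Consumer lag is the gap, per partition, between the latest offset written to the log and the offset the consumer group has committed. A minimal sketch of that calculation on plain dictionaries follows. The function name and the sample offsets are hypothetical, and treating a missing commit as offset 0 is a simplifying assumption.

```python
def consumer_lag(end_offsets: dict, committed_offsets: dict) -> dict:
    """Per-partition lag: log end offset minus committed offset, floored at 0."""
    return {
        partition: max(end - committed_offsets.get(partition, 0), 0)
        for partition, end in end_offsets.items()
    }


# partition id -> offset
end_offsets = {0: 100, 1: 250, 2: 40}
committed = {0: 90, 1: 250}  # partition 2 has no committed offset yet

lag = consumer_lag(end_offsets, committed)
print(lag)
print(sum(lag.values()))  # total lag across the topic
```

A lag that keeps growing means consumers cannot keep up with producers, which is usually the signal to add consumers (up to the partition count) or to investigate slow processing.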

7.2 RabbitMQ Monitoring

Key Metrics:
  - Queue Depth: Number of messages in queue
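  - Consumer Utilization: Fraction of time the queue can deliver messages to its consumers immediately (low values suggest slow consumers or a prefetch that is too small)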
  - Publish/Deliver Rate: Messages per second published/delivered
  - Unacked Messages: Messages awaiting acknowledgement
  - Memory/Disk Usage: Resource consumption

RabbitMQ Management UI: http://localhost:15672
Prometheus Plugin: rabbitmq_prometheus (built-in)

8. Conclusion

Message queue-based asynchronous architecture is a core component of modern distributed systems.

Key Takeaways:

  1. Kafka: High-volume event streaming, log-based storage, flexible scaling with Consumer Groups
  2. RabbitMQ: Flexible routing, low latency, optimal for traditional message queue patterns
  3. LLM Gateway: Async processing achieves rate limit management, cost control, and fault isolation
  4. Selection Criteria: Decide based on throughput, latency, routing complexity, and message retention needs

Both technologies have mature ecosystems, so choose based on your project's requirements. Kafka is generally the stronger fit for large-scale event streaming, while RabbitMQ excels at work distribution and flexible routing.


References