1. 왜 메시지 큐인가?
시장 규모와 도입 현황
글로벌 메시지 큐 미들웨어 시장은 2025년 기준 약 **60억 달러(6B USD)** 규모로 성장했습니다. Fortune Business Insights 보고서에 따르면 기업의 **41%**가 이미 메시지 큐 기반 비동기 아키텍처를 도입했고, 추가 27%가 도입을 계획하고 있습니다.
왜 이렇게 많은 기업이 메시지 큐를 채택할까요?
동기 vs 비동기 통신
graph LR
subgraph "동기 방식 - 강한 결합"
A[주문 서비스] -->|HTTP 호출| B[결제 서비스]
B -->|HTTP 호출| C[재고 서비스]
C -->|HTTP 호출| D[알림 서비스]
end
동기 방식에서는 결제 서비스가 1초 지연되면 전체 체인이 1초 이상 느려집니다. 하나가 죽으면 연쇄 장애가 발생합니다.
graph LR
subgraph "비동기 방식 - 느슨한 결합"
A2[주문 서비스] -->|이벤트 발행| MQ[메시지 큐]
MQ -->|구독| B2[결제 서비스]
MQ -->|구독| C2[재고 서비스]
MQ -->|구독| D2[알림 서비스]
end
비동기 방식에서는 주문 서비스가 메시지만 발행하면 됩니다. 결제 서비스가 일시적으로 다운되어도 메시지는 큐에 보관되어 복구 후 처리됩니다.
메시지 큐 도입이 필요한 5가지 신호
1. **서비스 간 동기 호출**이 3단계 이상 체이닝되는 경우
2. **트래픽 스파이크**로 인해 다운스트림 서비스가 과부하에 빠지는 경우
3. **이벤트 리플레이**가 필요한 경우 (장애 복구, 데이터 재처리)
4. **다수의 소비자**가 동일한 데이터를 각기 다른 목적으로 처리하는 경우
5. **마이크로서비스 전환** 시 서비스 간 결합도를 낮추려는 경우
2. 아키텍처 비교: 로그 vs 브로커 vs 하이브리드
메시지 큐는 크게 세 가지 아키텍처 패러다임으로 나뉩니다.
2.1 Apache Kafka — 분산 커밋 로그
graph TB
subgraph "Kafka 클러스터"
direction TB
KR[KRaft Controller] --> B1[Broker 1]
KR --> B2[Broker 2]
KR --> B3[Broker 3]
end
P[Producer] --> B1
B1 --> T["Topic: orders (3 partitions)"]
T --> P0[Partition 0 - Leader: B1]
T --> P1[Partition 1 - Leader: B2]
T --> P2[Partition 2 - Leader: B3]
P0 --> CG["Consumer Group"]
P1 --> CG
P2 --> CG
**핵심 특징:**
- **Append-Only 커밋 로그**: 메시지는 파티션에 순서대로 추가되며, 불변(immutable)입니다
- **Consumer Offset**: 소비자가 어디까지 읽었는지를 자체 관리 — 리플레이 가능
- **KRaft (Kafka 4.0)**: ZooKeeper를 완전히 제거하고 Raft 기반 메타데이터 관리
- **보존 정책**: 시간 또는 크기 기반으로 메시지를 유지 (기본 7일)
- **파티션 병렬 처리**: 파티션 수만큼 컨슈머를 병렬로 확장
**적합한 경우**: 이벤트 스트리밍, 로그 집계, 이벤트 소싱, 실시간 분석
2.2 RabbitMQ — AMQP 메시지 브로커
graph LR
P[Producer] --> E["Exchange (type: topic)"]
E -->|"routing.key.a"| Q1[Queue A]
E -->|"routing.key.b"| Q2[Queue B]
E -->|"routing.#"| Q3[Queue C - 와일드카드]
Q1 --> C1[Consumer 1]
Q2 --> C2[Consumer 2]
Q3 --> C3[Consumer 3]
**핵심 특징:**
- **AMQP 프로토콜**: Exchange, Binding, Queue의 유연한 라우팅 모델
- **Exchange 유형**: Direct, Topic, Fanout, Headers — 복잡한 라우팅 가능
- **메시지 ACK**: Consumer가 확인 응답을 보내야 메시지가 큐에서 제거됨
- **플러그인 생태계**: Shovel, Federation, Management UI 등
- **Priority Queue**: 메시지 우선순위 설정 가능
**적합한 경우**: 복잡한 라우팅, RPC 패턴, 태스크 분배, 기존 AMQP 에코시스템
2.3 Amazon SQS — 완전 관리형 큐
graph LR
P[Producer] -->|SendMessage| SQS["SQS Queue"]
SQS -->|ReceiveMessage| C1[Consumer 1]
SQS -->|ReceiveMessage| C2[Consumer 2]
SQS -.->|실패 시| DLQ["Dead Letter Queue"]
**핵심 특징:**
- **완전 관리형**: 서버 프로비저닝, 패치, 스케일링 불필요
- **Pull 기반**: Consumer가 능동적으로 메시지를 가져가는 구조
- **Standard vs FIFO**: 무제한 처리량(Standard) 또는 순서 보장(FIFO, 초당 300~3,000 TPS)
- **Visibility Timeout**: 메시지 처리 중 다른 소비자에게 보이지 않음
- **DLQ 네이티브 지원**: 처리 실패 메시지 자동 이동
**적합한 경우**: 서버리스 아키텍처, AWS 네이티브 워크로드, 운영 부담 최소화
2.4 Apache Pulsar — 분리형 아키텍처
graph TB
subgraph "Pulsar 클러스터"
direction TB
B1[Broker 1 - Serving] --> BK1[BookKeeper 1 - Storage]
B2[Broker 2 - Serving] --> BK2[BookKeeper 2 - Storage]
B3[Broker 3 - Serving] --> BK3[BookKeeper 3 - Storage]
end
P[Producer] --> B1
B1 --> T["Topic: events"]
T --> SUB["Subscription (Shared/Key_Shared/Exclusive)"]
SUB --> C1[Consumer 1]
SUB --> C2[Consumer 2]
**핵심 특징:**
- **Compute/Storage 분리**: Broker(서빙)와 BookKeeper(저장)가 독립적으로 스케일링
- **Multi-Tenancy**: 네임스페이스와 테넌트 단위의 격리
- **Geo-Replication**: 네이티브 교차 데이터센터 복제
- **Tiered Storage**: 오래된 데이터를 S3 같은 저비용 스토리지로 자동 이관
- **다양한 구독 모드**: Exclusive, Shared, Failover, Key_Shared
**적합한 경우**: 멀티 테넌트 환경, 지역 분산, 대규모 스트리밍
2.5 NATS — 경량 메시지 메시
graph LR
subgraph "NATS 클러스터"
N1[NATS Server 1] --- N2[NATS Server 2]
N2 --- N3[NATS Server 3]
N1 --- N3
end
P[Publisher] --> N1
N1 -->|"subject: sensor.temp"| S1[Subscriber 1]
N2 -->|"subject: sensor.*"| S2[Subscriber 2 - 와일드카드]
N3 -->|JetStream| JS[JetStream Consumer]
**핵심 특징:**
- **Core NATS**: 최소한의 지연시간을 위한 fire-and-forget Pub/Sub
- **JetStream**: 영속성, 리플레이, exactly-once가 필요할 때 활성화
- **경량 바이너리**: 단일 바이너리 약 20MB, Go로 작성
- **Subject 기반 라우팅**: 계층적 subject 이름과 와일드카드 매칭
- **Leaf Node**: 엣지 컴퓨팅을 위한 경량 노드 확장
**적합한 경우**: IoT, 엣지 컴퓨팅, 마이크로서비스 간 빠른 통신, 클라우드 네이티브
2.6 Redis Streams — 인메모리 로그 구조
graph LR
P[Producer] -->|XADD| RS["Redis Stream (mystream)"]
RS -->|XREADGROUP| CG["Consumer Group"]
CG --> C1["Consumer 1 (pending entries)"]
CG --> C2["Consumer 2 (pending entries)"]
**핵심 특징:**
- **인메모리**: 초저지연, 높은 처리량
- **Consumer Group**: Kafka와 유사한 컨슈머 그룹 모델
- **XADD/XREAD**: 간단한 커맨드 기반 API
- **Pending Entries List (PEL)**: 미확인 메시지 추적
- **MAXLEN 트리밍**: 메모리 한도 내에서 자동 정리
**적합한 경우**: 경량 스트리밍, 이미 Redis를 사용 중인 환경, 캐시 + 큐 통합
아키텍처 패러다임 요약
| 시스템 | 패러다임 | 메시지 저장 | 소비 모델 | 메시지 삭제 |
| ------------- | -------------- | ------------------- | --------------------- | ----------------- |
| Kafka | 분산 커밋 로그 | 디스크 (보존 정책) | Pull + Offset | 보존 기간 후 삭제 |
| RabbitMQ | 메시지 브로커 | 메모리 + 디스크 | Push (ACK 후 삭제) | ACK 시 즉시 삭제 |
| SQS | 관리형 큐 | AWS 관리형 | Pull | 처리 후 삭제 |
| Pulsar | 분리형 로그 | BookKeeper + Tiered | Pull + 커서 | 보존 정책 기반 |
| NATS | 메시지 메시 | 메모리 / JetStream | Push / Pull | 정책 기반 |
| Redis Streams | 인메모리 로그 | 메모리 (AOF 옵션) | Pull (Consumer Group) | MAXLEN 트리밍 |
3. 성능 벤치마크
처리량과 지연시간 비교
다음 벤치마크는 표준적인 3-노드 클러스터 환경 (m5.2xlarge 또는 동급)에서의 측정치입니다.
| 시스템 | 처리량 (msgs/s) | P50 지연시간 | P99 지연시간 | 최적 사용처 |
| ----------------- | ------------------- | ------------ | --------------------------------------- | ---------------------------------- |
| **Kafka** | 500K ~ 1M | 5 ~ 15ms | 10 ~ 50ms | 이벤트 스트림, 리플레이, 로그 집계 |
| **Pulsar** | 1M ~ 2.6M | 5 ~ 10ms | Kafka 대비 300x 개선된 P99 tail latency | 멀티 테넌시, 지역 복제 |
| **NATS** | 200K ~ 400K | sub-ms | 1 ~ 5ms | IoT, 엣지, 빠른 메시징 |
| **RabbitMQ** | 50K ~ 100K | 1 ~ 5ms | 5 ~ 20ms | 복잡한 라우팅, RPC |
| **Redis Streams** | 100K ~ 500K | sub-ms | 1 ~ 3ms | 경량 스트리밍, 캐시 통합 |
| **SQS** | 3K ~ 30K (Standard) | 20 ~ 50ms | 50 ~ 100ms | 서버리스, AWS 네이티브 |
처리량 상세 분석
**Kafka 1M msgs/s의 비밀:**
Kafka 처리량 최적화 구성
num.partitions=12
batch.size=65536 # 64KB 배치
linger.ms=5 # 5ms 대기 후 배치 전송
compression.type=lz4 # LZ4 압축
buffer.memory=67108864 # 64MB 버퍼
acks=1 # Leader만 확인 (최대 처리량)
- 파티션 12개 기준, 각 파티션당 약 83K msgs/s
- `acks=all`로 변경 시 약 30~40% 처리량 감소하지만 데이터 안정성 확보
- LZ4 압축으로 네트워크 대역폭 40~60% 절감
**Pulsar가 P99에서 강한 이유:**
- Broker와 BookKeeper 분리로 쓰기 I/O와 읽기 I/O가 경쟁하지 않음
- BookKeeper의 Journal(WAL)과 Ledger(데이터)가 물리적으로 분리된 디스크 사용
- 결과: Kafka 대비 P99 tail latency가 최대 300배 개선
**NATS sub-ms 달성 조건:**
NATS Core (JetStream 비활성화)
- 영속성 없이 in-memory only
- 10KB 이하 메시지 크기
- 단일 데이터센터 내 통신
확장성 비교
| 시스템 | 수평 확장 방식 | 파티션/큐 한도 | 클러스터 최대 규모 |
| ------------- | --------------------------- | ----------------------------------- | ------------------ |
| Kafka | 파티션 추가 + Broker 추가 | 실무 권장 4,000 ~ 20,000개/클러스터 | 수백 대 브로커 |
| Pulsar | Broker/BookKeeper 독립 확장 | 100만+ 토픽 가능 | 수천 대 노드 |
| NATS | Leaf Node 확장 | 제한 없음 (subject 기반) | 슈퍼 클러스터 |
| RabbitMQ | Quorum Queue + 노드 추가 | 실무 권장 수천 개 | 수십 대 노드 |
| Redis Streams | Redis Cluster 샤딩 | 샤드당 하나의 Stream | 수백 대 노드 |
| SQS | 자동 (무제한) | 무제한 | AWS 관리형 |
4. 2025 신기능 하이라이트
4.1 Kafka 4.0 — KRaft GA와 큐 모드
**KRaft GA (KIP-833):**
Kafka 4.0에서 ZooKeeper가 완전히 제거되었습니다. 모든 메타데이터 관리가 KRaft 컨트롤러로 이관됩니다.
Kafka 4.0 KRaft 모드 설정
process.roles: controller,broker
node.id: 1
controller.quorum.voters: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
listeners: PLAINTEXT://:9092,CONTROLLER://:9093
ZooKeeper 관련 설정 완전 제거됨
개선 효과:
- 메타데이터 전파 속도 10배 향상
- 파티션 수 100만 개 이상으로 확장 가능
- 운영 복잡도 대폭 감소 (ZooKeeper 클러스터 별도 관리 불필요)
- 클러스터 시작 시간 수 분에서 수 초로 단축
**KIP-932: Queues for Kafka (Share Groups):**
Kafka에 전통적인 "큐" 의미론이 추가되었습니다. 기존에는 같은 컨슈머 그룹 내에서도 파티션 단위로만 분배할 수 있었지만, Share Groups를 사용하면 메시지 단위로 분배됩니다.
// Kafka 4.0 Share Group 사용 예시
Properties props = new Properties();
props.put("group.id", "my-share-group");
props.put("group.type", "share"); // 새로운 Share 타입
KafkaShareConsumer<String, String> consumer =
new KafkaShareConsumer<>(props);
consumer.subscribe(List.of("orders"));
// 메시지 단위로 분배 - 파티션에 종속되지 않음
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record);
consumer.acknowledge(record); // 개별 메시지 ACK
}
}
4.2 RabbitMQ 4.x — SQL 필터와 AMQP 1.0
**Native AMQP 1.0 지원:**
RabbitMQ 4.0부터 AMQP 1.0이 코어 프로토콜로 승격되었습니다. 기존 AMQP 0.9.1은 레거시 플러그인으로 전환됩니다.
**Stream Filtering (4.2):**
// RabbitMQ 4.2 Stream SQL Filter
// 서버 사이드에서 필터링 - 네트워크 대역폭 절약
Consumer consumer = environment.consumerBuilder()
.stream("events")
.filter()
.values("region-us", "priority-high") // 서버 사이드 필터
.postFilter(msg -> msg.getProperties().getSubject().equals("order"))
.builder()
.messageHandler((context, message) -> {
// 필터링된 메시지만 수신
processMessage(message);
context.storeOffset();
})
.build();
처리 성능: 필터링 시에도 **4M+ msgs/s** 유지
**Khepri 메타데이터 저장소:**
Mnesia를 대체하는 Raft 기반 메타데이터 저장소로, 대규모 클러스터에서의 안정성이 크게 개선되었습니다.
4.3 Pulsar 4.1 — 19 PIPs
Apache Pulsar 4.1은 19개의 Pulsar Improvement Proposals(PIPs)가 포함되었습니다.
주요 변경:
- **Java 17 필수**: Java 8/11 지원 종료
- **PIP-354**: Topic Policy 성능 최적화
- **PIP-362**: Broker Level Dispatch Throttling
- **PIP-374**: 개선된 메모리 관리로 GC 부담 감소
4.4 NATS 2.11 — 메시지 TTL과 분산 트레이싱
NATS 2.11 JetStream 메시지 TTL 설정
nats stream add EVENTS \
--subjects "events.>" \
--max-msg-ttl 3600 \
--storage file \
--replicas 3
주요 변경:
- **메시지 레벨 TTL**: 스트림 전체가 아닌 개별 메시지에 만료 시간 설정
- **분산 트레이싱**: OpenTelemetry 통합으로 메시지 흐름 추적
- **SubjectTransform**: subject 이름을 동적으로 변환
- **개선된 모니터링**: Prometheus 메트릭 확장
5. 클라우드 매니지드 서비스 비교 + 가격
주요 매니지드 서비스
| 서비스 | 기반 기술 | 제공사 | 주요 특징 |
| ------------------------- | --------- | ------------ | ----------------------------------- |
| **Confluent Cloud** | Kafka | Confluent | Schema Registry, ksqlDB, Connectors |
| **Amazon MSK** | Kafka | AWS | VPC 통합, Serverless 옵션 |
| **Amazon MSK Serverless** | Kafka | AWS | 자동 스케일링, 사용량 과금 |
| **Amazon SQS** | 자체 | AWS | 완전 관리형, 서버리스 통합 |
| **CloudAMQP** | RabbitMQ | 84codes | Dedicated/Shared 인스턴스 |
| **StreamNative** | Pulsar | StreamNative | Pulsar 전문 매니지드 |
| **Synadia** | NATS | Synadia | NATS 전문 매니지드 |
월 비용 시뮬레이션 (일일 1억 메시지, 1KB 평균)
시나리오: 일일 1억 메시지, 평균 1KB, 3일 보존
Confluent Cloud (Basic):
- 처리량: ~1,160 msgs/s 평균
- 예상 비용: 월 약 800 ~ 1,200 USD
- 포함: 브로커, 스토리지, 네트워크
Amazon MSK Provisioned (kafka.m5.large x 3):
- 인스턴스: 0.21 USD/h x 3 = 월 약 454 USD
- 스토리지: 300GB x 0.10 USD/GB = 30 USD
- 예상 총비용: 월 약 500 ~ 700 USD
Amazon MSK Serverless:
- 클러스터: 0.75 USD/h = 월 약 540 USD
- 파티션: 0.0015 USD/h/partition
- 스토리지: 0.10 USD/GB
- 예상 총비용: 월 약 600 ~ 1,000 USD
Amazon SQS Standard:
- 처음 100만 건 무료
- 이후: 0.40 USD / 100만 건
- 월 약 30억 건: 약 1,200 USD
- 데이터 전송 비용 별도
CloudAMQP (Dedicated - Tiger):
- 월 약 500 ~ 1,000 USD
- 클러스터 포함, 모니터링 포함
자체 호스팅 Kafka (c5.2xlarge x 3 + EBS):
- EC2: 0.34 USD/h x 3 = 월 약 734 USD
- EBS gp3 300GB x 3: 월 약 72 USD
- 운영 인건비 별도 (DevOps 1인 월 최소 수백만 원)
- 예상 총비용: 월 약 800 USD + 운영비
선택 기준 정리
- **비용 최소화**: 소규모라면 SQS (사용량 과금), 대규모라면 자체 호스팅 Kafka
- **운영 부담 최소화**: SQS 또는 MSK Serverless
- **기능 최대화**: Confluent Cloud (Schema Registry, ksqlDB)
- **RabbitMQ 필요 시**: CloudAMQP
6. Spring Boot 연동 코드
6.1 Kafka + Spring Boot
// build.gradle
// implementation 'org.springframework.kafka:spring-kafka:3.3.0'
// application.yml
// spring:
// kafka:
// bootstrap-servers: localhost:9092
// producer:
// key-serializer: org.apache.kafka.common.serialization.StringSerializer
// value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
// consumer:
// group-id: order-service
// auto-offset-reset: earliest
// key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
// value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
// KafkaProducerService.java
@Service
@RequiredArgsConstructor
public class KafkaProducerService {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
public CompletableFuture<SendResult<String, OrderEvent>> sendOrderEvent(
OrderEvent event) {
return kafkaTemplate.send("orders", event.getOrderId(), event);
}
}
// KafkaConsumerService.java
@Service
@Slf4j
public class KafkaConsumerService {
@KafkaListener(
topics = "orders",
groupId = "order-service",
containerFactory = "kafkaListenerContainerFactory"
)
public void handleOrderEvent(
@Payload OrderEvent event,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset) {
log.info("Received order: {} from partition: {}, offset: {}",
event.getOrderId(), partition, offset);
processOrder(event);
}
private void processOrder(OrderEvent event) {
// 주문 처리 로직
}
}
6.2 RabbitMQ + Spring Boot
// build.gradle
// implementation 'org.springframework.boot:spring-boot-starter-amqp'
// application.yml
// spring:
// rabbitmq:
// host: localhost
// port: 5672
// username: guest
// password: guest
// RabbitMQConfig.java
@Configuration
public class RabbitMQConfig {
public static final String ORDER_EXCHANGE = "order.exchange";
public static final String ORDER_QUEUE = "order.queue";
public static final String ORDER_ROUTING_KEY = "order.created";
public static final String DLQ_QUEUE = "order.dlq";
@Bean
public TopicExchange orderExchange() {
return new TopicExchange(ORDER_EXCHANGE);
}
@Bean
public Queue orderQueue() {
return QueueBuilder.durable(ORDER_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", DLQ_QUEUE)
.withArgument("x-message-ttl", 60000)
.build();
}
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable(DLQ_QUEUE).build();
}
@Bean
public Binding orderBinding(Queue orderQueue, TopicExchange orderExchange) {
return BindingBuilder.bind(orderQueue)
.to(orderExchange)
.with(ORDER_ROUTING_KEY);
}
}
// RabbitMQProducer.java
@Service
@RequiredArgsConstructor
public class RabbitMQProducer {
private final RabbitTemplate rabbitTemplate;
public void sendOrderEvent(OrderEvent event) {
rabbitTemplate.convertAndSend(
RabbitMQConfig.ORDER_EXCHANGE,
RabbitMQConfig.ORDER_ROUTING_KEY,
event,
message -> {
message.getMessageProperties().setContentType("application/json");
message.getMessageProperties().setDeliveryMode(
MessageDeliveryMode.PERSISTENT);
return message;
}
);
}
}
// RabbitMQConsumer.java
@Service
@Slf4j
public class RabbitMQConsumer {
@RabbitListener(queues = RabbitMQConfig.ORDER_QUEUE)
public void handleOrderEvent(OrderEvent event, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
log.info("Received order: {}", event.getOrderId());
processOrder(event);
channel.basicAck(tag, false);
} catch (Exception e) {
log.error("Failed to process order: {}", event.getOrderId(), e);
channel.basicNack(tag, false, false); // DLQ로 이동
}
}
}
6.3 SQS + Spring Boot
// build.gradle
// implementation 'io.awspring.cloud:spring-cloud-aws-starter-sqs:3.2.0'
// application.yml
// spring:
// cloud:
// aws:
// region:
// static: ap-northeast-2
// sqs:
// endpoint: https://sqs.ap-northeast-2.amazonaws.com
// SQSProducer.java
@Service
@RequiredArgsConstructor
public class SQSProducer {
private final SqsTemplate sqsTemplate;
public void sendOrderEvent(OrderEvent event) {
sqsTemplate.send(to -> to
.queue("order-queue")
.payload(event)
.header("eventType", "ORDER_CREATED")
);
}
// FIFO 큐 전송
public void sendOrderEventFifo(OrderEvent event) {
sqsTemplate.send(to -> to
.queue("order-queue.fifo")
.payload(event)
.header(SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER,
event.getOrderId())
.header(SqsHeaders.MessageSystemAttributes
.SQS_MESSAGE_DEDUPLICATION_ID_HEADER,
UUID.randomUUID().toString())
);
}
}
// SQSConsumer.java
@Service
@Slf4j
public class SQSConsumer {
@SqsListener(value = "order-queue", maxConcurrentMessages = "10",
maxMessagesPerPoll = "5")
public void handleOrderEvent(@Payload OrderEvent event,
@Header("eventType") String eventType) {
log.info("Received order: {} with type: {}",
event.getOrderId(), eventType);
processOrder(event);
}
}
6.4 Pulsar + Spring Boot
// build.gradle
// implementation 'org.springframework.pulsar:spring-pulsar-spring-boot-starter:1.2.0'
// application.yml
// spring:
// pulsar:
// client:
// service-url: pulsar://localhost:6650
// producer:
// topic-name: persistent://public/default/orders
// consumer:
// subscription-name: order-service-sub
// subscription-type: shared
// PulsarProducer.java
@Service
@RequiredArgsConstructor
public class PulsarProducer {
private final PulsarTemplate<OrderEvent> pulsarTemplate;
public void sendOrderEvent(OrderEvent event) {
pulsarTemplate.send("persistent://public/default/orders", event);
}
}
// PulsarConsumer.java
@Service
@Slf4j
public class PulsarConsumer {
@PulsarListener(
topics = "persistent://public/default/orders",
subscriptionName = "order-service-sub",
subscriptionType = SubscriptionType.Shared,
schemaType = SchemaType.JSON
)
public void handleOrderEvent(OrderEvent event) {
log.info("Received order: {}", event.getOrderId());
processOrder(event);
}
}
6.5 NATS + Spring Boot
// build.gradle
// implementation 'io.nats:jnats:2.20.4'
// NATSConfig.java
@Configuration
public class NATSConfig {
@Bean
public Connection natsConnection() throws IOException, InterruptedException {
Options options = new Options.Builder()
.server("nats://localhost:4222")
.reconnectWait(Duration.ofSeconds(2))
.maxReconnects(-1) // 무한 재연결
.connectionListener((conn, type) -> {
System.out.println("NATS connection event: " + type);
})
.build();
return Nats.connect(options);
}
@Bean
public JetStream jetStream(Connection connection) throws IOException {
return connection.jetStream();
}
}
// NATSProducer.java
@Service
@RequiredArgsConstructor
public class NATSProducer {
private final JetStream jetStream;
private final ObjectMapper objectMapper;
public PublishAck sendOrderEvent(OrderEvent event) throws Exception {
byte[] data = objectMapper.writeValueAsBytes(event);
Message msg = NatsMessage.builder()
.subject("orders.created")
.data(data)
.build();
return jetStream.publish(msg);
}
}
// NATSConsumer.java
@Service
@Slf4j
public class NATSConsumer {
@PostConstruct
public void startConsumer() throws Exception {
JetStream js = natsConnection.jetStream();
PushSubscribeOptions options = PushSubscribeOptions.builder()
.durable("order-consumer")
.build();
js.subscribe("orders.>", "order-queue", dispatcher -> {
dispatcher.onMessage(msg -> {
try {
OrderEvent event = objectMapper.readValue(
msg.getData(), OrderEvent.class);
log.info("Received order: {}", event.getOrderId());
processOrder(event);
msg.ack();
} catch (Exception e) {
log.error("Failed to process message", e);
msg.nak();
}
});
}, false, options);
}
}
6.6 Redis Streams + Spring Boot
// build.gradle
// implementation 'org.springframework.boot:spring-boot-starter-data-redis'
// RedisStreamConfig.java
@Configuration
public class RedisStreamConfig {
@Bean
public StreamMessageListenerContainer<String, MapRecord<String, String, String>>
streamListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions.builder()
.pollTimeout(Duration.ofSeconds(1))
.batchSize(10)
.targetType(MapRecord.class)
.build();
var container = StreamMessageListenerContainer.create(factory, options);
container.receiveAutoAck(
Consumer.from("order-group", "consumer-1"),
StreamOffset.create("orders", ReadOffset.lastConsumed()),
new OrderStreamListener()
);
container.start();
return container;
}
}
// RedisStreamProducer.java
@Service
@RequiredArgsConstructor
public class RedisStreamProducer {
private final StringRedisTemplate redisTemplate;
private final ObjectMapper objectMapper;
public RecordId sendOrderEvent(OrderEvent event) throws Exception {
Map<String, String> fields = Map.of(
"orderId", event.getOrderId(),
"type", "ORDER_CREATED",
"payload", objectMapper.writeValueAsString(event)
);
return redisTemplate.opsForStream()
.add(StreamRecords.newRecord()
.ofMap(fields)
.withStreamKey("orders"));
}
}
// OrderStreamListener.java
@Slf4j
public class OrderStreamListener
implements StreamListener<String, MapRecord<String, String, String>> {
@Override
public void onMessage(MapRecord<String, String, String> message) {
Map<String, String> body = message.getValue();
log.info("Received order: {} from stream: {}",
body.get("orderId"), message.getStream());
processOrder(body);
}
}
7. 의사결정 플로우차트
메시지 큐 선택을 위한 의사결정 트리입니다.
graph TD
START[메시지 큐 선택] --> Q1["이벤트 리플레이가 필요한가?"]
Q1 -->|예| Q1A["멀티 테넌시 또는 지역 복제가 필요한가?"]
Q1A -->|예| PULSAR["Apache Pulsar"]
Q1A -->|아니오| KAFKA["Apache Kafka"]
Q1 -->|아니오| Q2["복잡한 라우팅이 필요한가?"]
Q2 -->|예| RABBIT["RabbitMQ"]
Q2 -->|아니오| Q3["AWS 네이티브 서버리스인가?"]
Q3 -->|예| SQS["Amazon SQS"]
Q3 -->|아니오| Q4["초저지연 sub-ms가 필수인가?"]
Q4 -->|예| Q4A["영속성이 필요한가?"]
Q4A -->|예| NATS["NATS + JetStream"]
Q4A -->|아니오| NATS_CORE["NATS Core"]
Q4 -->|아니오| Q5["이미 Redis를 사용 중인가?"]
Q5 -->|예| REDIS["Redis Streams"]
Q5 -->|아니오| KAFKA2["Kafka (기본 선택)"]
빠른 선택 가이드
| 요구사항 | 추천 시스템 | 이유 |
| -------------------------- | ------------------------ | -------------------------- |
| 이벤트 스트리밍 + 리플레이 | Kafka | 커밋 로그 기반, 업계 표준 |
| 복잡한 메시지 라우팅 | RabbitMQ | Exchange/Binding 모델 |
| 서버리스 + AWS 올인 | SQS | 제로 운영, Lambda 트리거 |
| 멀티 테넌시 + 지역 복제 | Pulsar | 네이티브 멀티 테넌시 |
| IoT + 엣지 컴퓨팅 | NATS | 경량, sub-ms 지연 |
| 이미 Redis 사용 중 | Redis Streams | 추가 인프라 불필요 |
| 대규모 이벤트 + 낮은 비용 | Kafka (자체 호스팅) | 높은 처리량 대비 비용 효율 |
| 운영 인력 없음 | SQS 또는 Confluent Cloud | 완전 관리형 |
8. 실전 아키텍처 패턴
8.1 이커머스: 주문 처리 파이프라인
graph LR
subgraph "주문 생성"
API[API Gateway] --> OS[Order Service]
end
OS -->|OrderCreated| KAFKA[Kafka - orders 토픽]
KAFKA --> PAY[Payment Service]
KAFKA --> INV[Inventory Service]
KAFKA --> NOTIF[Notification Service]
PAY -->|PaymentCompleted| KAFKA2[Kafka - payments 토픽]
KAFKA2 --> SHIP[Shipping Service]
INV -->|StockReserved| KAFKA3[Kafka - inventory 토픽]
KAFKA3 --> OS2[Order Service - 상태 업데이트]
SHIP -->|ShipmentCreated| KAFKA4[Kafka - shipments 토픽]
KAFKA4 --> NOTIF2[Notification Service - 배송 알림]
**왜 Kafka인가?**
- 주문 이벤트 리플레이 필요 (결제 실패 시 재처리)
- 다수의 서비스가 동일 이벤트를 독립적으로 소비
- 감사 로그로 활용 가능
**핵심 설정:**
주문 토픽 설정
topics:
orders:
partitions: 12
replication-factor: 3
configs:
retention.ms: 604800000 # 7일
min.insync.replicas: 2
cleanup.policy: delete
payments:
partitions: 6
replication-factor: 3
configs:
retention.ms: 2592000000 # 30일 (감사 로그)
min.insync.replicas: 2
8.2 실시간 분석 파이프라인
graph LR
subgraph "데이터 수집"
WEB[Web Clickstream] --> KAFKA[Kafka]
APP[App Events] --> KAFKA
IOT[IoT Sensors] --> NATS[NATS]
NATS -->|브릿지| KAFKA
end
subgraph "스트림 처리"
KAFKA --> FLINK[Apache Flink]
KAFKA --> KSQL[ksqlDB]
end
subgraph "저장 및 서빙"
FLINK --> ES[Elasticsearch]
FLINK --> CH[ClickHouse]
KSQL --> REDIS[Redis Cache]
end
subgraph "시각화"
ES --> GRAFANA[Grafana]
CH --> SUPERSET[Apache Superset]
end
**왜 Kafka + NATS 조합인가?**
- **Kafka**: 높은 처리량의 이벤트 수집과 Flink/ksqlDB 연동
- **NATS**: IoT 센서의 초저지연 요구사항 충족, 경량 에이전트
- **브릿지**: NATS에서 수집한 데이터를 Kafka로 통합
8.3 IoT 센서 데이터 수집
graph TB
subgraph "엣지 레이어"
S1[온도 센서] --> LN1[NATS Leaf Node - 공장 A]
S2[습도 센서] --> LN1
S3[진동 센서] --> LN2[NATS Leaf Node - 공장 B]
end
subgraph "코어 레이어"
LN1 --> NATS[NATS 클러스터 - JetStream]
LN2 --> NATS
NATS --> PROC[Stream Processor]
PROC -->|이상 감지| ALERT[Alert Service - RabbitMQ]
PROC -->|집계 데이터| TS[TimescaleDB]
end
ALERT -->|이메일| EMAIL[Email Service]
ALERT -->|Slack| SLACK[Slack Notification]
**왜 NATS + RabbitMQ 조합인가?**
- **NATS**: Leaf Node로 엣지까지 확장, sub-ms 지연
- **JetStream**: 센서 데이터 영속성과 리플레이
- **RabbitMQ**: 알림 라우팅 (이메일, Slack, SMS를 유연하게 분배)
8.4 Saga 패턴 — 분산 트랜잭션
sequenceDiagram
participant OS as Order Service
participant K as Kafka
participant PS as Payment Service
participant IS as Inventory Service
participant SS as Shipping Service
OS->>K: OrderCreated
K->>PS: 결제 요청
PS->>K: PaymentCompleted
K->>IS: 재고 차감
IS->>K: StockReserved
K->>SS: 배송 생성
SS->>K: ShipmentCreated
K->>OS: 주문 완료
Note over PS,IS: 실패 시 보상 트랜잭션
PS-->>K: PaymentFailed (보상)
K-->>IS: 재고 복구 (보상)
K-->>OS: 주문 취소 (보상)
**Choreography vs Orchestration:**
| 방식 | 장점 | 단점 | 추천 시나리오 |
| ------------- | -------------------------------- | -------------------------- | -------------------- |
| Choreography | 서비스 간 결합도 낮음, 확장 용이 | 흐름 파악 어려움 | 단순한 워크플로우 |
| Orchestration | 중앙 제어, 흐름 명확 | 오케스트레이터 단일 장애점 | 복잡한 비즈니스 로직 |
9. 운영 체크리스트
공통 모니터링 메트릭
| 메트릭 | 설명 | 위험 임계값 |
| -------------------- | --------------------- | ----------------- |
| Consumer Lag | 소비 지연 메시지 수 | 1만 건 이상 지속 |
| 메시지 처리 실패율 | DLQ 유입 비율 | 1% 이상 |
| Broker 디스크 사용률 | 저장소 용량 | 80% 이상 |
| 네트워크 I/O | 인/아웃 트래픽 | 대역폭의 70% 이상 |
| JVM GC 시간 | Kafka/Pulsar/RabbitMQ | Full GC 5초 이상 |
Kafka 프로덕션 체크리스트
1. Broker 설정
- min.insync.replicas = 2 (replication-factor 3 기준)
- unclean.leader.election.enable = false
- auto.create.topics.enable = false
2. Producer 설정
- acks = all (데이터 손실 방지)
- retries = Integer.MAX_VALUE
- enable.idempotence = true
- max.in.flight.requests.per.connection = 5
3. Consumer 설정
- enable.auto.commit = false (수동 커밋)
- max.poll.records = 500
- session.timeout.ms = 30000
4. 모니터링
- Prometheus + Grafana (JMX Exporter)
- Consumer Lag 알림 설정
- Under-Replicated Partitions 알림
RabbitMQ 프로덕션 체크리스트
1. 클러스터 설정
- Quorum Queue 사용 (Classic Mirrored Queue 대체)
- vm_memory_high_watermark = 0.4
- disk_free_limit = 2GB
2. 연결 관리
- Connection Pool 사용
- Heartbeat 타임아웃: 60초
- 채널 재사용 (채널 생성 비용 높음)
3. 메시지 설정
- Persistent delivery mode
- Publisher Confirms 활성화
- Dead Letter Exchange 설정
4. 모니터링
- Management Plugin Dashboard
- rabbitmq_prometheus 플러그인
- Queue 깊이 알림 설정
10. 퀴즈
Q1. Kafka 4.0의 가장 큰 변화
**정답**: ZooKeeper가 완전히 제거되고 **KRaft (Kafka Raft)**가 이를 대체합니다.
- KRaft는 Raft 합의 알고리즘 기반의 메타데이터 관리 시스템입니다
- 메타데이터 전파 속도가 10배 향상되고, 파티션 수가 100만 개 이상으로 확장 가능합니다
- ZooKeeper 클러스터를 별도로 관리할 필요가 없어져 운영 복잡도가 크게 감소합니다
Q2. Pull vs Push 모델
**Pull 기반 (Kafka, SQS):**
- Consumer가 능동적으로 메시지를 가져감
- 장점: Consumer가 자신의 처리 속도에 맞춰 가져갈 수 있음 (자연스러운 배압)
- 단점: Long Polling이 필요하고 실시간성이 약간 떨어질 수 있음
**Push 기반 (RabbitMQ):**
- Broker가 Consumer에게 메시지를 밀어넣음
- 장점: 메시지 도착 즉시 전달 가능 (낮은 지연시간)
- 단점: Consumer가 과부하 시 별도의 배압 메커니즘 필요 (prefetch count 등)
Q3. Exactly-Once 보장
**정답:**
1. **enable.idempotence = true**: Producer가 동일 메시지를 중복 전송해도 한 번만 저장
2. **transactional.id 설정**: Producer 트랜잭션을 식별하는 고유 ID
3. **isolation.level = read_committed**: Consumer가 커밋된 트랜잭션 메시지만 읽음
이 세 가지를 조합하면 Producer에서 Consumer까지 end-to-end Exactly-Once가 보장됩니다.
Q4. SQS Standard vs FIFO
| 특성 | Standard | FIFO |
| --------- | ---------------------------- | ------------------------------- |
| 처리량 | 거의 무제한 | 초당 300 TPS (배치 시 3,000) |
| 순서 보장 | Best-effort (순서 보장 없음) | 엄격한 FIFO 순서 |
| 중복 처리 | At-Least-Once (중복 가능) | Exactly-Once (5분 내 중복 제거) |
| 가격 | 0.40 USD / 100만 건 | 0.50 USD / 100만 건 |
| 사용 사례 | 높은 처리량, 순서 무관 | 결제, 주문 등 순서 중요 |
Q5. Pulsar vs Kafka 아키텍처
**정답:**
1. **독립적 스케일링**: Broker(연산)와 BookKeeper(저장)를 별도로 확장할 수 있습니다. Kafka는 Broker가 데이터를 직접 저장하므로 저장 용량 확장 시 전체 Broker를 추가해야 합니다. Pulsar는 BookKeeper 노드만 추가하면 됩니다.
2. **Rebalancing 없는 확장**: Kafka에서 Broker를 추가하면 파티션 Rebalancing이 필요하여 대량의 데이터 이동이 발생합니다. Pulsar는 새 Broker가 즉시 토픽 소유권을 받을 수 있어 확장 시 서비스 영향이 최소화됩니다.
참고 자료
1. [Apache Kafka 4.0 Release Notes](https://kafka.apache.org/blog) — KRaft GA, Share Groups
2. [KIP-932: Queues for Kafka](https://cwiki.apache.org/confluence/display/KAFKA/KIP-932) — Share Group 상세 스펙
3. [RabbitMQ 4.0 Release Blog](https://www.rabbitmq.com/blog) — AMQP 1.0, Khepri
4. [RabbitMQ Streams](https://www.rabbitmq.com/docs/streams) — Stream Plugin 및 필터링
5. [Amazon SQS Developer Guide](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/) — Standard vs FIFO
6. [Apache Pulsar 4.1 Release](https://pulsar.apache.org/blog) — 19 PIPs
7. [NATS 2.11 Release Notes](https://nats.io/blog/) — Message TTL, 분산 트레이싱
8. [Confluent Benchmark: Kafka vs Pulsar](https://www.confluent.io/blog/kafka-vs-pulsar-performance-comparison/) — 성능 비교
9. [Spring for Apache Kafka](https://spring.io/projects/spring-kafka) — Spring Kafka 공식 문서
10. [Spring AMQP](https://spring.io/projects/spring-amqp) — Spring RabbitMQ 연동
11. [Spring Cloud AWS SQS](https://docs.awspring.io/spring-cloud-aws/docs/current/reference/html/) — SQS 연동
12. [Spring for Apache Pulsar](https://spring.io/projects/spring-pulsar) — Spring Pulsar 공식 문서
13. [Designing Data-Intensive Applications (Martin Kleppmann)](https://dataintensive.net/) — 메시지 시스템 이론
14. [Enterprise Integration Patterns](https://www.enterpriseintegrationpatterns.com/) — 메시징 패턴 바이블
15. [Confluent Cloud Pricing](https://www.confluent.io/confluent-cloud/pricing/) — 클라우드 비용 참고
16. [AWS MSK Pricing](https://aws.amazon.com/msk/pricing/) — MSK 비용 참고
현재 단락 (1/794)
글로벌 메시지 큐 미들웨어 시장은 2025년 기준 약 **60억 달러(6B USD)** 규모로 성장했습니다. Fortune Business Insights 보고서에 따르면 기업의 ...