Message Queue Showdown: Kafka vs RabbitMQ vs SQS vs Pulsar vs NATS — 2025 Decision Guide


1. Why Message Queues?

Market Size and Adoption

The global message queue middleware market has grown to approximately 6 billion USD as of 2025. According to Fortune Business Insights, 41% of enterprises have already adopted message-queue-based asynchronous architectures, with an additional 27% planning to adopt them.

Why are so many companies embracing message queues?

Synchronous vs Asynchronous Communication

graph LR
    subgraph "Synchronous - Tight Coupling"
        A[Order Service] -->|HTTP call| B[Payment Service]
        B -->|HTTP call| C[Inventory Service]
        C -->|HTTP call| D[Notification Service]
    end

In synchronous communication, a 1-second delay in the payment service delays the entire chain by at least that second, and a single failed service can trigger cascading failures downstream.

graph LR
    subgraph "Asynchronous - Loose Coupling"
        A2[Order Service] -->|Publish event| MQ[Message Queue]
        MQ -->|Subscribe| B2[Payment Service]
        MQ -->|Subscribe| C2[Inventory Service]
        MQ -->|Subscribe| D2[Notification Service]
    end

In asynchronous communication, the order service only needs to publish a message. Even if the payment service goes down temporarily, the message is retained in the queue and processed after recovery.

5 Signs You Need a Message Queue

  1. Synchronous calls between services chain more than 3 levels deep
  2. Traffic spikes overload downstream services
  3. Event replay is needed (failure recovery, data reprocessing)
  4. Multiple consumers need to process the same data for different purposes
  5. Microservice migration requires reducing inter-service coupling

2. Architecture Comparison: Log vs Broker vs Hybrid

Message-queue systems cluster around a few architectural paradigms; the six systems below illustrate the spectrum.

2.1 Apache Kafka — Distributed Commit Log

graph TB
    subgraph "Kafka Cluster"
        direction TB
        KR[KRaft Controller] --> B1[Broker 1]
        KR --> B2[Broker 2]
        KR --> B3[Broker 3]
    end
    P[Producer] --> B1
    B1 --> T["Topic: orders (3 partitions)"]
    T --> P0[Partition 0 - Leader: B1]
    T --> P1[Partition 1 - Leader: B2]
    T --> P2[Partition 2 - Leader: B3]
    P0 --> CG["Consumer Group"]
    P1 --> CG
    P2 --> CG

Key Characteristics:

  • Append-Only Commit Log: Messages are appended sequentially to partitions and are immutable
  • Consumer Offsets: Consumers manage their read positions, enabling replay
  • KRaft (Kafka 4.0): Complete removal of ZooKeeper with Raft-based metadata management
  • Retention Policy: Messages retained based on time or size (default 7 days)
  • Partition Parallelism: Scale consumers in parallel up to the number of partitions

Best For: Event streaming, log aggregation, event sourcing, real-time analytics

2.2 RabbitMQ — AMQP Message Broker

graph LR
    P[Producer] --> E["Exchange (type: topic)"]
    E -->|"routing.key.a"| Q1[Queue A]
    E -->|"routing.key.b"| Q2[Queue B]
    E -->|"routing.#"| Q3[Queue C - wildcard]
    Q1 --> C1[Consumer 1]
    Q2 --> C2[Consumer 2]
    Q3 --> C3[Consumer 3]

Key Characteristics:

  • AMQP Protocol: Flexible routing model with Exchanges, Bindings, and Queues
  • Exchange Types: Direct, Topic, Fanout, Headers for complex routing
  • Message ACK: Messages are removed from the queue only after consumer acknowledgment
  • Plugin Ecosystem: Shovel, Federation, Management UI, and more
  • Priority Queue: Support for message priority levels

Best For: Complex routing, RPC patterns, task distribution, existing AMQP ecosystems
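
The topic-exchange wildcards shown in the diagram follow two rules (the routing keys here are illustrative):

```text
*  matches exactly one word:    routing.key.*  ->  routing.key.a, routing.key.b
#  matches zero or more words:  routing.#      ->  routing, routing.key.a.eu
```

This is why Queue C, bound with `routing.#`, receives every message that Queues A and B receive, plus any deeper-nested keys.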

2.3 Amazon SQS — Fully Managed Queue

graph LR
    P[Producer] -->|SendMessage| SQS["SQS Queue"]
    SQS -->|ReceiveMessage| C1[Consumer 1]
    SQS -->|ReceiveMessage| C2[Consumer 2]
    SQS -.->|On failure| DLQ["Dead Letter Queue"]

Key Characteristics:

  • Fully Managed: No server provisioning, patching, or scaling needed
  • Pull-Based: Consumers actively fetch messages
  • Standard vs FIFO: Unlimited throughput (Standard) or ordered delivery (FIFO, 300-3,000 TPS)
  • Visibility Timeout: Messages are hidden from other consumers during processing
  • Native DLQ Support: Automatic routing of failed messages

Best For: Serverless architecture, AWS-native workloads, minimal operational overhead
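
The visibility timeout and DLQ hand-off are configured as queue attributes; a sketch of the relevant fragment (account ID, region, and queue names are placeholders):

```json
{
  "VisibilityTimeout": "30",
  "ReceiveMessageWaitTimeSeconds": "20",
  "RedrivePolicy": "{\"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:123456789012:order-dlq\",\"maxReceiveCount\":\"3\"}"
}
```

A message received 3 times without being deleted is routed to `order-dlq`; the 20-second long-polling wait reduces empty receives and cost.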

2.4 Apache Pulsar — Decoupled Architecture

graph TB
    subgraph "Pulsar Cluster"
        direction TB
        B1[Broker 1 - Serving] --> BK1[BookKeeper 1 - Storage]
        B2[Broker 2 - Serving] --> BK2[BookKeeper 2 - Storage]
        B3[Broker 3 - Serving] --> BK3[BookKeeper 3 - Storage]
    end
    P[Producer] --> B1
    B1 --> T["Topic: events"]
    T --> SUB["Subscription (Shared/Key_Shared/Exclusive)"]
    SUB --> C1[Consumer 1]
    SUB --> C2[Consumer 2]

Key Characteristics:

  • Compute/Storage Separation: Brokers (serving) and BookKeeper (storage) scale independently
  • Multi-Tenancy: Isolation at the namespace and tenant level
  • Geo-Replication: Native cross-datacenter replication
  • Tiered Storage: Automatic offloading of old data to low-cost storage like S3
  • Subscription Modes: Exclusive, Shared, Failover, Key_Shared

Best For: Multi-tenant environments, geo-distributed deployments, large-scale streaming
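
Multi-tenancy and tiered storage are both driven through `pulsar-admin`; a sketch with illustrative tenant/namespace names (verify flags against your Pulsar version):

```shell
# Multi-tenancy: isolate a team under its own tenant and namespace
pulsar-admin tenants create analytics
pulsar-admin namespaces create analytics/clickstream

# Tiered storage: offload ledger segments to configured long-term
# storage (e.g. S3) once a topic's on-bookie data exceeds 10GB
pulsar-admin namespaces set-offload-threshold analytics/clickstream --size 10G
```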

2.5 NATS — Lightweight Message Mesh

graph LR
    subgraph "NATS Cluster"
        N1[NATS Server 1] --- N2[NATS Server 2]
        N2 --- N3[NATS Server 3]
        N1 --- N3
    end
    P[Publisher] --> N1
    N1 -->|"subject: sensor.temp"| S1[Subscriber 1]
    N2 -->|"subject: sensor.*"| S2[Subscriber 2 - wildcard]
    N3 -->|JetStream| JS[JetStream Consumer]

Key Characteristics:

  • Core NATS: Fire-and-forget Pub/Sub for minimal latency
  • JetStream: Enable when persistence, replay, and exactly-once are needed
  • Lightweight Binary: Single binary of about 20MB, written in Go
  • Subject-Based Routing: Hierarchical subject names with wildcard matching
  • Leaf Nodes: Lightweight node extension for edge computing

Best For: IoT, edge computing, fast inter-service communication, cloud-native
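
Subject-based routing can be tried directly with the NATS CLI against a running `nats-server` (subject names are illustrative):

```shell
# * matches exactly one token; > matches one or more trailing tokens
nats sub "sensor.*.temp"      # matches sensor.factory1.temp
nats sub "sensor.>"           # matches sensor.factory1.temp, sensor.hum, ...
nats pub sensor.factory1.temp "21.5"
```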

2.6 Redis Streams — In-Memory Log Structure

graph LR
    P[Producer] -->|XADD| RS["Redis Stream (mystream)"]
    RS -->|XREADGROUP| CG["Consumer Group"]
    CG --> C1["Consumer 1 (pending entries)"]
    CG --> C2["Consumer 2 (pending entries)"]

Key Characteristics:

  • In-Memory: Ultra-low latency, high throughput
  • Consumer Groups: Similar consumer group model to Kafka
  • XADD/XREAD: Simple command-based API
  • Pending Entries List (PEL): Tracks unacknowledged messages
  • MAXLEN Trimming: Automatic cleanup within memory limits

Best For: Lightweight streaming, environments already using Redis, cache + queue integration
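
The XADD/XREADGROUP cycle maps to a short redis-cli session (stream, group, and field names are illustrative):

```shell
# Create the stream and consumer group (MKSTREAM creates an empty stream)
XGROUP CREATE orders order-group $ MKSTREAM

# Produce: append an entry; * lets Redis assign the entry ID
XADD orders * orderId 1001 type ORDER_CREATED

# Consume: read up to 10 new entries as consumer-1 of order-group
XREADGROUP GROUP order-group consumer-1 COUNT 10 STREAMS orders >

# Acknowledge by the entry ID returned above; until then it sits in the PEL
XACK orders order-group <entry-id>
XPENDING orders order-group
```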

Architecture Paradigm Summary

| System | Paradigm | Message Storage | Consumption Model | Message Deletion |
| --- | --- | --- | --- | --- |
| Kafka | Distributed commit log | Disk (retention policy) | Pull + offset | After retention period |
| RabbitMQ | Message broker | Memory + disk | Push (delete on ACK) | Immediately on ACK |
| SQS | Managed queue | AWS managed | Pull | After processing |
| Pulsar | Decoupled log | BookKeeper + tiered | Pull + cursor | Retention-policy based |
| NATS | Message mesh | Memory / JetStream | Push / pull | Policy-based |
| Redis Streams | In-memory log | Memory (AOF optional) | Pull (consumer group) | MAXLEN trimming |

3. Performance Benchmarks

Throughput and Latency Comparison

The figures below assume a 3-node cluster (m5.2xlarge or equivalent). Actual numbers vary widely with message size, replication factor, and acks settings, so treat them as order-of-magnitude guides rather than guarantees.

| System | Throughput (msgs/s) | P50 Latency | P99 Latency | Best For |
| --- | --- | --- | --- | --- |
| Kafka | 500K - 1M | 5 - 15ms | 10 - 50ms | Event streams, replay, log aggregation |
| Pulsar | 1M - 2.6M | 5 - 10ms | Up to 300x lower tail latency than Kafka (vendor benchmark) | Multi-tenancy, geo-replication |
| NATS | 200K - 400K | Sub-ms | 1 - 5ms | IoT, edge, fast messaging |
| RabbitMQ | 50K - 100K | 1 - 5ms | 5 - 20ms | Complex routing, RPC |
| Redis Streams | 100K - 500K | Sub-ms | 1 - 3ms | Lightweight streaming, cache integration |
| SQS | 3K - 30K (Standard) | 20 - 50ms | 50 - 100ms | Serverless, AWS native |

Throughput Deep Dive

The secret behind Kafka's 1M msgs/s:

# Kafka throughput optimization configuration
num.partitions=12
batch.size=65536          # 64KB batch
linger.ms=5               # Wait 5ms before sending batch
compression.type=lz4      # LZ4 compression
buffer.memory=67108864    # 64MB buffer
acks=1                    # Leader-only ACK (max throughput)
  • With 12 partitions, each partition handles approximately 83K msgs/s
  • Changing to acks=all reduces throughput by 30-40% but ensures data durability
  • LZ4 compression saves 40-60% network bandwidth

Why Pulsar excels at P99:

  • Broker and BookKeeper separation means write I/O and read I/O don't compete
  • BookKeeper's Journal (WAL) and Ledger (data) use physically separate disks
  • Result: Up to 300x improvement in P99 tail latency compared to Kafka

NATS sub-ms requirements:

# NATS Core (JetStream disabled)
# - No persistence, in-memory only
# - Message size under 10KB
# - Communication within a single datacenter

Scalability Comparison

| System | Horizontal Scaling | Partition/Queue Limit | Max Cluster Size |
| --- | --- | --- | --- |
| Kafka | Add partitions + brokers | Recommended 4,000 - 20,000/cluster | Hundreds of brokers |
| Pulsar | Independent broker/BookKeeper scaling | 1M+ topics possible | Thousands of nodes |
| NATS | Leaf node expansion | Unlimited (subject-based) | Super cluster |
| RabbitMQ | Quorum queues + node addition | Recommended thousands | Dozens of nodes |
| Redis Streams | Redis Cluster sharding | One stream per shard | Hundreds of nodes |
| SQS | Automatic (unlimited) | Unlimited | AWS managed |

4. 2025 New Feature Highlights

4.1 Kafka 4.0 — KRaft GA and Queue Mode

KRaft GA (KIP-833):

ZooKeeper has been completely removed in Kafka 4.0. All metadata management is now handled by KRaft controllers.

# Kafka 4.0 KRaft mode configuration
process.roles: controller,broker
node.id: 1
controller.quorum.voters: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
listeners: PLAINTEXT://:9092,CONTROLLER://:9093
# All ZooKeeper-related configuration removed

Improvements:

  • 10x faster metadata propagation
  • Partition count scalable beyond 1 million
  • Drastically reduced operational complexity (no separate ZooKeeper cluster)
  • Cluster startup time reduced from minutes to seconds
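
Bringing up a fresh KRaft cluster also requires one-time storage formatting before first start; a sketch assuming a local Kafka 4.0 distribution:

```shell
# Generate a shared cluster ID and format each node's log directories with it
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c config/server.properties

# Then start the combined controller/broker node
bin/kafka-server-start.sh config/server.properties
```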

KIP-932: Queues for Kafka (Share Groups):

Traditional "queue" semantics have been added to Kafka. Previously, messages within a consumer group could only be distributed at the partition level. With Share Groups, messages can be distributed at the individual message level.

// Kafka 4.0 Share Group example
Properties props = new Properties();
props.put("group.id", "my-share-group");
props.put("group.type", "share");  // New Share type

KafkaShareConsumer<String, String> consumer =
    new KafkaShareConsumer<>(props);
consumer.subscribe(List.of("orders"));

// Distributed per message, not bound to partitions
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);
        consumer.acknowledge(record);  // Per-message ACK
    }
}

4.2 RabbitMQ 4.x — SQL Filters and AMQP 1.0

Native AMQP 1.0 Support:

Starting with RabbitMQ 4.0, AMQP 1.0 is a core protocol rather than a plugin. AMQP 0.9.1 remains fully supported alongside it.

Stream Filtering (4.2):

// RabbitMQ 4.2 Stream SQL Filter
// Server-side filtering saves network bandwidth
Consumer consumer = environment.consumerBuilder()
    .stream("events")
    .filter()
        .values("region-us", "priority-high")  // Server-side filter
        .postFilter(msg -> msg.getProperties().getSubject().equals("order"))
    .builder()
    .messageHandler((context, message) -> {
        // Only filtered messages received
        processMessage(message);
        context.storeOffset();
    })
    .build();

Performance: Maintains 4M+ msgs/s even with filtering enabled

Khepri Metadata Store:

An opt-in, Raft-based metadata store that replaces Mnesia, significantly improving recovery behavior and stability in large clusters.
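
Assuming RabbitMQ 4.x, Khepri is switched on per node via a feature flag (verify the flag name against your version's docs):

```shell
rabbitmqctl enable_feature_flag khepri_db
rabbitmqctl list_feature_flags | grep khepri
```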

4.3 Pulsar 4.1 — 19 PIPs

Apache Pulsar 4.1 includes 19 Pulsar Improvement Proposals (PIPs).

Key changes:

  • Java 17 Required: Java 8/11 support dropped
  • PIP-354: Topic Policy performance optimization
  • PIP-362: Broker Level Dispatch Throttling
  • PIP-374: Improved memory management reducing GC overhead

4.4 NATS 2.11 — Message TTL and Distributed Tracing

# NATS 2.11 JetStream: allow per-message TTLs on a stream
# (per-message TTLs must be enabled on the stream; flag name per the 2.11 CLI)
nats stream add EVENTS \
  --subjects "events.>" \
  --allow-msg-ttl \
  --storage file \
  --replicas 3

Key changes:

  • Message-Level TTL: Set expiration times on individual messages, not entire streams
  • Distributed Tracing: OpenTelemetry integration for message flow tracking
  • SubjectTransform: Dynamic subject name transformation
  • Improved Monitoring: Extended Prometheus metrics
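
Individual messages then opt in at publish time via a header; a hedged CLI sketch (header syntax per the NATS 2.11 per-message-TTL feature — check the release notes for your version):

```shell
# Expire this particular message after 60 seconds, independent of stream limits
nats pub events.sensor.temp "21.5" -H "Nats-TTL:60s"
```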

5. Managed Cloud Services Comparison + Pricing

Major Managed Services

| Service | Underlying Tech | Provider | Key Features |
| --- | --- | --- | --- |
| Confluent Cloud | Kafka | Confluent | Schema Registry, ksqlDB, Connectors |
| Amazon MSK | Kafka | AWS | VPC integration, Serverless option |
| Amazon MSK Serverless | Kafka | AWS | Auto-scaling, pay-per-use |
| Amazon SQS | Proprietary | AWS | Fully managed, serverless integration |
| CloudAMQP | RabbitMQ | 84codes | Dedicated/Shared instances |
| StreamNative | Pulsar | StreamNative | Pulsar-specialized managed service |
| Synadia | NATS | Synadia | NATS-specialized managed service |

Monthly Cost Simulation (100M messages/day, 1KB average)

Scenario: 100M messages/day, 1KB average, 3-day retention

Confluent Cloud (Basic):
  - Throughput: ~1,160 msgs/s average
  - Estimated cost: ~800 - 1,200 USD/month
  - Includes: Brokers, storage, network

Amazon MSK Provisioned (kafka.m5.large x 3):
  - Instances: 0.21 USD/h x 3 = ~454 USD/month
  - Storage: 300GB x 0.10 USD/GB = 30 USD
  - Estimated total: ~500 - 700 USD/month

Amazon MSK Serverless:
  - Cluster: 0.75 USD/h = ~540 USD/month
  - Partitions: 0.0015 USD/h/partition
  - Storage: 0.10 USD/GB
  - Estimated total: ~600 - 1,000 USD/month

Amazon SQS Standard:
  - First 1M requests free
  - Then: 0.40 USD / 1M requests
  - ~3B requests/month: ~1,200 USD
  - Data transfer costs extra

CloudAMQP (Dedicated - Tiger):
  - ~500 - 1,000 USD/month
  - Cluster included, monitoring included

Self-Hosted Kafka (c5.2xlarge x 3 + EBS):
  - EC2: 0.34 USD/h x 3 = ~734 USD/month
  - EBS gp3 300GB x 3: ~72 USD/month
  - Operations cost extra (at least 1 DevOps engineer)
  - Estimated total: ~800 USD/month + ops costs

Selection Criteria Summary

  • Minimize cost: SQS for small scale (pay-per-use), self-hosted Kafka for large scale
  • Minimize ops overhead: SQS or MSK Serverless
  • Maximize features: Confluent Cloud (Schema Registry, ksqlDB)
  • Need RabbitMQ: CloudAMQP

6. Spring Boot Integration Code

6.1 Kafka + Spring Boot

// build.gradle
// implementation 'org.springframework.kafka:spring-kafka:3.3.0'

// application.yml
// spring:
//   kafka:
//     bootstrap-servers: localhost:9092
//     producer:
//       key-serializer: org.apache.kafka.common.serialization.StringSerializer
//       value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
//     consumer:
//       group-id: order-service
//       auto-offset-reset: earliest
//       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
//       value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
// KafkaProducerService.java
@Service
@RequiredArgsConstructor
public class KafkaProducerService {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public CompletableFuture<SendResult<String, OrderEvent>> sendOrderEvent(
            OrderEvent event) {
        return kafkaTemplate.send("orders", event.getOrderId(), event);
    }
}

// KafkaConsumerService.java
@Service
@Slf4j
public class KafkaConsumerService {

    @KafkaListener(
        topics = "orders",
        groupId = "order-service",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
            @Payload OrderEvent event,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset) {

        log.info("Received order: {} from partition: {}, offset: {}",
            event.getOrderId(), partition, offset);
        processOrder(event);
    }

    private void processOrder(OrderEvent event) {
        // Order processing logic
    }
}

6.2 RabbitMQ + Spring Boot

// build.gradle
// implementation 'org.springframework.boot:spring-boot-starter-amqp'

// application.yml
// spring:
//   rabbitmq:
//     host: localhost
//     port: 5672
//     username: guest
//     password: guest
// RabbitMQConfig.java
@Configuration
public class RabbitMQConfig {

    public static final String ORDER_EXCHANGE = "order.exchange";
    public static final String ORDER_QUEUE = "order.queue";
    public static final String ORDER_ROUTING_KEY = "order.created";
    public static final String DLQ_QUEUE = "order.dlq";

    @Bean
    public TopicExchange orderExchange() {
        return new TopicExchange(ORDER_EXCHANGE);
    }

    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable(ORDER_QUEUE)
            .withArgument("x-dead-letter-exchange", "")
            .withArgument("x-dead-letter-routing-key", DLQ_QUEUE)
            .withArgument("x-message-ttl", 60000)
            .build();
    }

    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(DLQ_QUEUE).build();
    }

    @Bean
    public Binding orderBinding(Queue orderQueue, TopicExchange orderExchange) {
        return BindingBuilder.bind(orderQueue)
            .to(orderExchange)
            .with(ORDER_ROUTING_KEY);
    }
}

// RabbitMQProducer.java
@Service
@RequiredArgsConstructor
public class RabbitMQProducer {

    private final RabbitTemplate rabbitTemplate;

    public void sendOrderEvent(OrderEvent event) {
        rabbitTemplate.convertAndSend(
            RabbitMQConfig.ORDER_EXCHANGE,
            RabbitMQConfig.ORDER_ROUTING_KEY,
            event,
            message -> {
                message.getMessageProperties().setContentType("application/json");
                message.getMessageProperties().setDeliveryMode(
                    MessageDeliveryMode.PERSISTENT);
                return message;
            }
        );
    }
}

// RabbitMQConsumer.java
@Service
@Slf4j
public class RabbitMQConsumer {

    @RabbitListener(queues = RabbitMQConfig.ORDER_QUEUE)
    public void handleOrderEvent(OrderEvent event, Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            log.info("Received order: {}", event.getOrderId());
            processOrder(event);
            channel.basicAck(tag, false);
        } catch (Exception e) {
            log.error("Failed to process order: {}", event.getOrderId(), e);
            channel.basicNack(tag, false, false); // Move to DLQ
        }
    }
}

6.3 SQS + Spring Boot

// build.gradle
// implementation 'io.awspring.cloud:spring-cloud-aws-starter-sqs:3.2.0'

// application.yml
// spring:
//   cloud:
//     aws:
//       region:
//         static: ap-northeast-2
//       sqs:
//         endpoint: https://sqs.ap-northeast-2.amazonaws.com
// SQSProducer.java
@Service
@RequiredArgsConstructor
public class SQSProducer {

    private final SqsTemplate sqsTemplate;

    public void sendOrderEvent(OrderEvent event) {
        sqsTemplate.send(to -> to
            .queue("order-queue")
            .payload(event)
            .header("eventType", "ORDER_CREATED")
        );
    }

    // FIFO queue sending
    public void sendOrderEventFifo(OrderEvent event) {
        sqsTemplate.send(to -> to
            .queue("order-queue.fifo")
            .payload(event)
            .header(SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER,
                event.getOrderId())
            .header(SqsHeaders.MessageSystemAttributes
                .SQS_MESSAGE_DEDUPLICATION_ID_HEADER,
                UUID.randomUUID().toString())
        );
    }
}

// SQSConsumer.java
@Service
@Slf4j
public class SQSConsumer {

    @SqsListener(value = "order-queue", maxConcurrentMessages = "10",
        maxMessagesPerPoll = "5")
    public void handleOrderEvent(@Payload OrderEvent event,
            @Header("eventType") String eventType) {
        log.info("Received order: {} with type: {}",
            event.getOrderId(), eventType);
        processOrder(event);
    }
}

6.4 Pulsar + Spring Boot

// build.gradle
// implementation 'org.springframework.boot:spring-boot-starter-pulsar'
// (Spring Pulsar 1.x ships via the Spring Boot starter)

// application.yml
// spring:
//   pulsar:
//     client:
//       service-url: pulsar://localhost:6650
//     producer:
//       topic-name: persistent://public/default/orders
//     consumer:
//       subscription-name: order-service-sub
//       subscription-type: shared
// PulsarProducer.java
@Service
@RequiredArgsConstructor
public class PulsarProducer {

    private final PulsarTemplate<OrderEvent> pulsarTemplate;

    public void sendOrderEvent(OrderEvent event) {
        pulsarTemplate.send("persistent://public/default/orders", event);
    }
}

// PulsarConsumer.java
@Service
@Slf4j
public class PulsarConsumer {

    @PulsarListener(
        topics = "persistent://public/default/orders",
        subscriptionName = "order-service-sub",
        subscriptionType = SubscriptionType.Shared,
        schemaType = SchemaType.JSON
    )
    public void handleOrderEvent(OrderEvent event) {
        log.info("Received order: {}", event.getOrderId());
        processOrder(event);
    }
}

6.5 NATS + Spring Boot

// build.gradle
// implementation 'io.nats:jnats:2.20.4'
// NATSConfig.java
@Configuration
public class NATSConfig {

    @Bean
    public Connection natsConnection() throws IOException, InterruptedException {
        Options options = new Options.Builder()
            .server("nats://localhost:4222")
            .reconnectWait(Duration.ofSeconds(2))
            .maxReconnects(-1) // Unlimited reconnects
            .connectionListener((conn, type) -> {
                System.out.println("NATS connection event: " + type);
            })
            .build();
        return Nats.connect(options);
    }

    @Bean
    public JetStream jetStream(Connection connection) throws IOException {
        return connection.jetStream();
    }
}

// NATSProducer.java
@Service
@RequiredArgsConstructor
public class NATSProducer {

    private final JetStream jetStream;
    private final ObjectMapper objectMapper;

    public PublishAck sendOrderEvent(OrderEvent event) throws Exception {
        byte[] data = objectMapper.writeValueAsBytes(event);
        Message msg = NatsMessage.builder()
            .subject("orders.created")
            .data(data)
            .build();
        return jetStream.publish(msg);
    }
}

// NATSConsumer.java
@Service
@Slf4j
@RequiredArgsConstructor
public class NATSConsumer {

    private final Connection natsConnection;
    private final JetStream jetStream;
    private final ObjectMapper objectMapper;

    @PostConstruct
    public void startConsumer() throws Exception {
        PushSubscribeOptions options = PushSubscribeOptions.builder()
            .durable("order-consumer")
            .build();

        // A Dispatcher delivers messages asynchronously on its own thread
        Dispatcher dispatcher = natsConnection.createDispatcher();

        jetStream.subscribe("orders.>", "order-queue", dispatcher, msg -> {
            try {
                OrderEvent event = objectMapper.readValue(
                    msg.getData(), OrderEvent.class);
                log.info("Received order: {}", event.getOrderId());
                processOrder(event);
                msg.ack();
            } catch (Exception e) {
                log.error("Failed to process message", e);
                msg.nak(); // Negative ACK: redeliver later
            }
        }, false, options); // false = manual ack

    }

    private void processOrder(OrderEvent event) {
        // Order processing logic
    }
}

6.6 Redis Streams + Spring Boot

// build.gradle
// implementation 'org.springframework.boot:spring-boot-starter-data-redis'
// RedisStreamConfig.java
@Configuration
public class RedisStreamConfig {

    @Bean
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>>
            streamListenerContainer(RedisConnectionFactory factory) {

        var options = StreamMessageListenerContainer
            .StreamMessageListenerContainerOptions.builder()
            .pollTimeout(Duration.ofSeconds(1))
            .batchSize(10)
            .targetType(MapRecord.class)
            .build();

        var container = StreamMessageListenerContainer.create(factory, options);

        // The group must already exist: XGROUP CREATE orders order-group $ MKSTREAM
        container.receiveAutoAck(
            Consumer.from("order-group", "consumer-1"),
            StreamOffset.create("orders", ReadOffset.lastConsumed()),
            new OrderStreamListener()
        );

        container.start();
        return container;
    }
}

// RedisStreamProducer.java
@Service
@RequiredArgsConstructor
public class RedisStreamProducer {

    private final StringRedisTemplate redisTemplate;
    private final ObjectMapper objectMapper;

    public RecordId sendOrderEvent(OrderEvent event) throws Exception {
        Map<String, String> fields = Map.of(
            "orderId", event.getOrderId(),
            "type", "ORDER_CREATED",
            "payload", objectMapper.writeValueAsString(event)
        );
        return redisTemplate.opsForStream()
            .add(StreamRecords.newRecord()
                .ofMap(fields)
                .withStreamKey("orders"));
    }
}

// OrderStreamListener.java
@Slf4j
public class OrderStreamListener
        implements StreamListener<String, MapRecord<String, String, String>> {

    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        Map<String, String> body = message.getValue();
        log.info("Received order: {} from stream: {}",
            body.get("orderId"), message.getStream());
        processOrder(body);
    }
}

7. Decision Flowchart

A decision tree for choosing a message queue system.

graph TD
    START[Choose a Message Queue] --> Q1["Need event replay?"]

    Q1 -->|Yes| Q1A["Need multi-tenancy or geo-replication?"]
    Q1A -->|Yes| PULSAR["Apache Pulsar"]
    Q1A -->|No| KAFKA["Apache Kafka"]

    Q1 -->|No| Q2["Need complex routing?"]
    Q2 -->|Yes| RABBIT["RabbitMQ"]

    Q2 -->|No| Q3["AWS-native serverless?"]
    Q3 -->|Yes| SQS["Amazon SQS"]

    Q3 -->|No| Q4["Need sub-ms ultra-low latency?"]
    Q4 -->|Yes| Q4A["Need persistence?"]
    Q4A -->|Yes| NATS["NATS + JetStream"]
    Q4A -->|No| NATS_CORE["NATS Core"]

    Q4 -->|No| Q5["Already using Redis?"]
    Q5 -->|Yes| REDIS["Redis Streams"]
    Q5 -->|No| KAFKA2["Kafka (default choice)"]

Quick Selection Guide

| Requirement | Recommended System | Reason |
| --- | --- | --- |
| Event streaming + replay | Kafka | Commit-log based, industry standard |
| Complex message routing | RabbitMQ | Exchange/Binding model |
| Serverless + all-in on AWS | SQS | Zero ops, Lambda triggers |
| Multi-tenancy + geo-replication | Pulsar | Native multi-tenancy |
| IoT + edge computing | NATS | Lightweight, sub-ms latency |
| Already using Redis | Redis Streams | No additional infrastructure |
| High-volume events + low cost | Kafka (self-hosted) | High throughput per dollar |
| No operations team | SQS or Confluent Cloud | Fully managed |

8. Real-World Architecture Patterns

8.1 E-Commerce: Order Processing Pipeline

graph LR
    subgraph "Order Creation"
        API[API Gateway] --> OS[Order Service]
    end

    OS -->|OrderCreated| KAFKA[Kafka - orders topic]

    KAFKA --> PAY[Payment Service]
    KAFKA --> INV[Inventory Service]
    KAFKA --> NOTIF[Notification Service]

    PAY -->|PaymentCompleted| KAFKA2[Kafka - payments topic]
    KAFKA2 --> SHIP[Shipping Service]

    INV -->|StockReserved| KAFKA3[Kafka - inventory topic]
    KAFKA3 --> OS2[Order Service - status update]

    SHIP -->|ShipmentCreated| KAFKA4[Kafka - shipments topic]
    KAFKA4 --> NOTIF2[Notification Service - shipping alerts]

Why Kafka?

  • Order event replay needed (reprocessing on payment failures)
  • Multiple services consume the same event independently
  • Can serve as an audit log

Key Configuration:

# Order topic configuration
topics:
  orders:
    partitions: 12
    replication-factor: 3
    configs:
      retention.ms: 604800000 # 7 days
      min.insync.replicas: 2
      cleanup.policy: delete

  payments:
    partitions: 6
    replication-factor: 3
    configs:
      retention.ms: 2592000000 # 30 days (audit log)
      min.insync.replicas: 2

8.2 Real-Time Analytics Pipeline

graph LR
    subgraph "Data Collection"
        WEB[Web Clickstream] --> KAFKA[Kafka]
        APP[App Events] --> KAFKA
        IOT[IoT Sensors] --> NATS[NATS]
        NATS -->|Bridge| KAFKA
    end

    subgraph "Stream Processing"
        KAFKA --> FLINK[Apache Flink]
        KAFKA --> KSQL[ksqlDB]
    end

    subgraph "Storage and Serving"
        FLINK --> ES[Elasticsearch]
        FLINK --> CH[ClickHouse]
        KSQL --> REDIS[Redis Cache]
    end

    subgraph "Visualization"
        ES --> GRAFANA[Grafana]
        CH --> SUPERSET[Apache Superset]
    end

Why the Kafka + NATS combination?

  • Kafka: High-throughput event collection and Flink/ksqlDB integration
  • NATS: Meets ultra-low latency requirements for IoT sensors, lightweight agents
  • Bridge: Consolidates NATS-collected data into Kafka

8.3 IoT Sensor Data Collection

graph TB
    subgraph "Edge Layer"
        S1[Temperature Sensor] --> LN1[NATS Leaf Node - Factory A]
        S2[Humidity Sensor] --> LN1
        S3[Vibration Sensor] --> LN2[NATS Leaf Node - Factory B]
    end

    subgraph "Core Layer"
        LN1 --> NATS[NATS Cluster - JetStream]
        LN2 --> NATS
        NATS --> PROC[Stream Processor]
        PROC -->|Anomaly detection| ALERT[Alert Service - RabbitMQ]
        PROC -->|Aggregated data| TS[TimescaleDB]
    end

    ALERT -->|Email| EMAIL[Email Service]
    ALERT -->|Slack| SLACK[Slack Notification]

Why the NATS + RabbitMQ combination?

  • NATS: Extends to the edge with Leaf Nodes, sub-ms latency
  • JetStream: Sensor data persistence and replay
  • RabbitMQ: Flexible alert routing (email, Slack, SMS distribution)

8.4 Saga Pattern — Distributed Transactions

sequenceDiagram
    participant OS as Order Service
    participant K as Kafka
    participant PS as Payment Service
    participant IS as Inventory Service
    participant SS as Shipping Service

    OS->>K: OrderCreated
    K->>PS: Payment request
    PS->>K: PaymentCompleted
    K->>IS: Deduct inventory
    IS->>K: StockReserved
    K->>SS: Create shipment
    SS->>K: ShipmentCreated
    K->>OS: Order complete

    Note over PS,IS: Compensating transactions on failure
    PS-->>K: PaymentFailed (compensate)
    K-->>IS: Restore inventory (compensate)
    K-->>OS: Cancel order (compensate)

Choreography vs Orchestration:

| Approach | Pros | Cons | Recommended Scenario |
| --- | --- | --- | --- |
| Choreography | Low coupling, easy to scale | Hard to trace flow | Simple workflows |
| Orchestration | Central control, clear flow | Orchestrator is a single point of failure | Complex business logic |

9. Operations Checklist

Common Monitoring Metrics

| Metric | Description | Warning Threshold |
| --- | --- | --- |
| Consumer Lag | Number of unconsumed messages | Sustained above 10K |
| Message Failure Rate | DLQ ingestion rate | Above 1% |
| Broker Disk Usage | Storage capacity | Above 80% |
| Network I/O | In/Out traffic | Above 70% of bandwidth |
| JVM GC Time | Kafka/Pulsar/RabbitMQ | Full GC over 5 seconds |

Kafka Production Checklist

1. Broker Configuration
   - min.insync.replicas = 2 (with replication-factor 3)
   - unclean.leader.election.enable = false
   - auto.create.topics.enable = false

2. Producer Configuration
   - acks = all (prevent data loss)
   - retries = Integer.MAX_VALUE
   - enable.idempotence = true
   - max.in.flight.requests.per.connection = 5

3. Consumer Configuration
   - enable.auto.commit = false (manual commits)
   - max.poll.records = 500
   - session.timeout.ms = 30000

4. Monitoring
   - Prometheus + Grafana (JMX Exporter)
   - Consumer Lag alerts
   - Under-Replicated Partitions alerts

RabbitMQ Production Checklist

1. Cluster Configuration
   - Use Quorum Queues (replaces Classic Mirrored Queues)
   - vm_memory_high_watermark = 0.4
   - disk_free_limit = 2GB

2. Connection Management
   - Use Connection Pools
   - Heartbeat timeout: 60 seconds
   - Reuse channels (channel creation is expensive)

3. Message Configuration
   - Persistent delivery mode
   - Enable Publisher Confirms
   - Configure Dead Letter Exchange

4. Monitoring
   - Management Plugin Dashboard
   - rabbitmq_prometheus plugin
   - Queue depth alerts

10. Quiz

Q1. The Biggest Change in Kafka 4.0

What core component was removed in Kafka 4.0, and what technology replaces it?

Answer: ZooKeeper was completely removed and replaced by KRaft (Kafka Raft).

  • KRaft is a Raft consensus algorithm-based metadata management system
  • Metadata propagation speed improved by 10x, partition count scalable beyond 1 million
  • No need to manage a separate ZooKeeper cluster, greatly reducing operational complexity

Q2. Pull vs Push Model

What are the differences, pros, and cons between pull-based consumption (Kafka, SQS) and push-based consumption (RabbitMQ)?

Pull-Based (Kafka, SQS):

  • Consumers actively fetch messages
  • Pros: Consumers can fetch at their own processing speed (natural backpressure)
  • Cons: Requires long polling, slightly lower real-time capability

Push-Based (RabbitMQ):

  • Broker pushes messages to consumers
  • Pros: Immediate delivery on message arrival (low latency)
  • Cons: Requires separate backpressure mechanisms when consumers are overloaded (prefetch count, etc.)
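
The pull model's "natural backpressure" can be sketched in plain Java, with a bounded buffer standing in for the broker-side queue: the producer blocks whenever the consumer falls behind, no extra mechanism needed (class name and sizes are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PullBackpressure {

    // Pull-style consumption: drain `total` messages through a small bounded
    // buffer. put() blocks when the buffer is full, so a slow consumer
    // automatically throttles the producer.
    static int drain(int total) throws InterruptedException {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(10);

        Thread producer = new Thread(() -> {
            for (int i = 0; i < total; i++) {
                try {
                    buffer.put(i); // blocks at capacity -> natural backpressure
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        producer.start();

        int consumed = 0;
        while (consumed < total) {
            buffer.take(); // consumer pulls at its own pace
            consumed++;
        }
        producer.join();
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("consumed=" + drain(100));
    }
}
```

Replace the blocking `put()` with an unbounded push and a lagging consumer turns into unbounded memory growth — exactly what prefetch limits guard against in push-based brokers.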

Q3. Exactly-Once Semantics

What 3 configurations are needed to achieve Exactly-Once Semantics (EOS) in Kafka?

Answer:

  1. enable.idempotence = true: Even if a producer sends duplicate messages, only one is stored
  2. transactional.id setting: A unique ID that identifies producer transactions
  3. isolation.level = read_committed: Consumers only read committed transaction messages

Combining all three guarantees end-to-end Exactly-Once from producer to consumer.
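
The three settings combine into producer and consumer configuration like this (the transactional.id value is illustrative and must be unique per producer instance):

```properties
# Producer
enable.idempotence=true
transactional.id=order-service-tx-1
acks=all

# Consumer
isolation.level=read_committed
```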

Q4. SQS Standard vs FIFO

What are the differences between Amazon SQS Standard and FIFO queues in terms of throughput, ordering, and deduplication?

| Feature | Standard | FIFO |
| --- | --- | --- |
| Throughput | Nearly unlimited | 300 TPS (3,000 with batching) |
| Ordering | Best-effort (no guarantee) | Strict FIFO ordering |
| Delivery | At-least-once (duplicates possible) | Exactly-once processing (5-minute dedup window) |
| Price | 0.40 USD / 1M requests | 0.50 USD / 1M requests |
| Use Case | High throughput, order-independent | Payments, orders where sequence matters |

Q5. Pulsar vs Kafka Architecture

What are 2 operational advantages of Pulsar's compute/storage separation architecture compared to Kafka?

Answer:

  1. Independent Scaling: Brokers (compute) and BookKeeper (storage) can be scaled separately. In Kafka, brokers store data directly, so scaling storage requires adding entire brokers. In Pulsar, you just add BookKeeper nodes.

  2. Expansion Without Rebalancing: When adding a broker in Kafka, partition rebalancing is needed, causing massive data movement. In Pulsar, new brokers can immediately take topic ownership, minimizing service impact during scaling.


References

  1. Apache Kafka 4.0 Release Notes — KRaft GA, Share Groups
  2. KIP-932: Queues for Kafka — Share Group specification
  3. RabbitMQ 4.0 Release Blog — AMQP 1.0, Khepri
  4. RabbitMQ Streams — Stream Plugin and filtering
  5. Amazon SQS Developer Guide — Standard vs FIFO
  6. Apache Pulsar 4.1 Release — 19 PIPs
  7. NATS 2.11 Release Notes — Message TTL, distributed tracing
  8. Confluent Benchmark: Kafka vs Pulsar — Performance comparison
  9. Spring for Apache Kafka — Official Spring Kafka docs
  10. Spring AMQP — Spring RabbitMQ integration
  11. Spring Cloud AWS SQS — SQS integration
  12. Spring for Apache Pulsar — Official Spring Pulsar docs
  13. Designing Data-Intensive Applications (Martin Kleppmann) — Message system theory
  14. Enterprise Integration Patterns — Messaging pattern bible
  15. Confluent Cloud Pricing — Cloud cost reference
  16. AWS MSK Pricing — MSK cost reference