Skip to content
Published on

メッセージキュー完全比較:Kafka vs RabbitMQ vs SQS vs Pulsar vs NATS — 2025年実践選択ガイド

Authors

1. なぜメッセージキューなのか?

市場規模と導入状況

グローバルメッセージキューミドルウェア市場は、2025年基準で約**60億ドル(6B USD)規模に成長しました。Fortune Business Insightsのレポートによると、企業の41%**がすでにメッセージキュー基盤の非同期アーキテクチャを導入しており、追加で27%が導入を計画しています。

なぜこれほど多くの企業がメッセージキューを採用しているのでしょうか?

同期 vs 非同期通信

graph LR
    subgraph "同期方式 - 強い結合"
        A[注文サービス] -->|HTTP呼出| B[決済サービス]
        B -->|HTTP呼出| C[在庫サービス]
        C -->|HTTP呼出| D[通知サービス]
    end

同期方式(どうきほうしき)では、決済(けっさい)サービスが1秒遅延(ちえん)するとチェーン全体が1秒以上遅くなります。一つが停止(ていし)すると連鎖障害(れんさしょうがい)が発生します。

graph LR
    subgraph "非同期方式 - 緩い結合"
        A2[注文サービス] -->|イベント発行| MQ[メッセージキュー]
        MQ -->|購読| B2[決済サービス]
        MQ -->|購読| C2[在庫サービス]
        MQ -->|購読| D2[通知サービス]
    end

非同期方式(ひどうきほうしき)では、注文サービスがメッセージを発行(はっこう)するだけで済みます。決済サービスが一時的にダウンしてもメッセージはキューに保管(ほかん)され、復旧後に処理されます。

メッセージキュー導入が必要な5つのサイン

  1. サービス間の同期呼出が3段階以上チェーンしている場合
  2. トラフィックスパイクによりダウンストリームサービスが過負荷になる場合
  3. イベントリプレイが必要な場合(障害復旧、データ再処理)
  4. 多数のコンシューマーが同じデータを異なる目的で処理する場合
  5. マイクロサービス移行時にサービス間の結合度を下げたい場合

2. アーキテクチャ比較:ログ vs ブローカー vs ハイブリッド

メッセージキューは大きく3つのアーキテクチャパラダイムに分類されます。

2.1 Apache Kafka — 分散コミットログ

graph TB
    subgraph "Kafkaクラスタ"
        direction TB
        KR[KRaftコントローラー] --> B1[Broker 1]
        KR --> B2[Broker 2]
        KR --> B3[Broker 3]
    end
    P[Producer] --> B1
    B1 --> T["Topic: orders(3パーティション)"]
    T --> P0[Partition 0 - Leader: B1]
    T --> P1[Partition 1 - Leader: B2]
    T --> P2[Partition 2 - Leader: B3]
    P0 --> CG["Consumer Group"]
    P1 --> CG
    P2 --> CG

主要特徴(しゅようとくちょう):

  • Append-Onlyコミットログ:メッセージはパーティションに順序(じゅんじょ)通り追加され、不変(ふへん、immutable)です
  • Consumer Offset:コンシューマーがどこまで読んだかを自己管理(じこかんり) — リプレイ可能
  • KRaft(Kafka 4.0):ZooKeeperを完全に除去(じょきょ)し、Raft基盤のメタデータ管理
  • 保持ポリシー:時間またはサイズに基づいてメッセージを保持(デフォルト7日)
  • パーティション並列処理:パーティション数だけコンシューマーを並列(へいれつ)に拡張

適した用途:イベントストリーミング、ログ集約(しゅうやく)、イベントソーシング、リアルタイム分析

2.2 RabbitMQ — AMQPメッセージブローカー

graph LR
    P[Producer] --> E["Exchange(type: topic)"]
    E -->|"routing.key.a"| Q1[Queue A]
    E -->|"routing.key.b"| Q2[Queue B]
    E -->|"routing.#"| Q3[Queue C - ワイルドカード]
    Q1 --> C1[Consumer 1]
    Q2 --> C2[Consumer 2]
    Q3 --> C3[Consumer 3]

主要特徴:

  • AMQPプロトコル:Exchange、Binding、Queueの柔軟(じゅうなん)なルーティングモデル
  • Exchange種類:Direct、Topic、Fanout、Headers — 複雑なルーティングが可能
  • メッセージACK:Consumerが確認応答(かくにんおうとう)を送ってからメッセージがキューから削除
  • プラグインエコシステム:Shovel、Federation、Management UIなど
  • Priority Queue:メッセージ優先度(ゆうせんど)の設定が可能

適した用途:複雑なルーティング、RPCパターン、タスク分配、既存AMQPエコシステム

2.3 Amazon SQS — フルマネージドキュー

graph LR
    P[Producer] -->|SendMessage| SQS["SQS Queue"]
    SQS -->|ReceiveMessage| C1[Consumer 1]
    SQS -->|ReceiveMessage| C2[Consumer 2]
    SQS -.->|失敗時| DLQ["Dead Letter Queue"]

主要特徴:

  • フルマネージド:サーバープロビジョニング、パッチ、スケーリング不要
  • Pull基盤:Consumerが能動的にメッセージを取得する構造
  • Standard vs FIFO:無制限スループット(Standard)または順序保証(FIFO、秒間300~3,000 TPS)
  • Visibility Timeout:メッセージ処理中に他のコンシューマーから見えなくなる
  • DLQネイティブサポート:処理失敗メッセージの自動移動

適した用途:サーバーレスアーキテクチャ、AWSネイティブワークロード、運用負担の最小化

2.4 Apache Pulsar — 分離型アーキテクチャ

graph TB
    subgraph "Pulsarクラスタ"
        direction TB
        B1[Broker 1 - サービング] --> BK1[BookKeeper 1 - ストレージ]
        B2[Broker 2 - サービング] --> BK2[BookKeeper 2 - ストレージ]
        B3[Broker 3 - サービング] --> BK3[BookKeeper 3 - ストレージ]
    end
    P[Producer] --> B1
    B1 --> T["Topic: events"]
    T --> SUB["Subscription(Shared/Key_Shared/Exclusive)"]
    SUB --> C1[Consumer 1]
    SUB --> C2[Consumer 2]

主要特徴:

  • Compute/Storage分離:Broker(サービング)とBookKeeper(ストレージ)が独立してスケーリング
  • マルチテナンシー:ネームスペースとテナント単位の隔離(かくり)
  • Geo-Replication:ネイティブなクロスデータセンター複製
  • Tiered Storage:古いデータをS3のような低コストストレージに自動移管
  • 多様なサブスクリプションモード:Exclusive、Shared、Failover、Key_Shared

適した用途:マルチテナント環境、地域分散、大規模ストリーミング

2.5 NATS — 軽量メッセージメッシュ

graph LR
    subgraph "NATSクラスタ"
        N1[NATS Server 1] --- N2[NATS Server 2]
        N2 --- N3[NATS Server 3]
        N1 --- N3
    end
    P[Publisher] --> N1
    N1 -->|"subject: sensor.temp"| S1[Subscriber 1]
    N2 -->|"subject: sensor.*"| S2[Subscriber 2 - ワイルドカード]
    N3 -->|JetStream| JS[JetStream Consumer]

主要特徴:

  • Core NATS:最小限のレイテンシーのためのfire-and-forget Pub/Sub
  • JetStream:永続性(えいぞくせい)、リプレイ、exactly-onceが必要な場合に有効化
  • 軽量バイナリ:単一バイナリ約20MB、Goで記述(きじゅつ)
  • Subject基盤ルーティング:階層的なsubject名とワイルドカードマッチング
  • Leaf Node:エッジコンピューティングのための軽量ノード拡張

適した用途:IoT、エッジコンピューティング、マイクロサービス間の高速通信、クラウドネイティブ

2.6 Redis Streams — インメモリログ構造

graph LR
    P[Producer] -->|XADD| RS["Redis Stream(mystream)"]
    RS -->|XREADGROUP| CG["Consumer Group"]
    CG --> C1["Consumer 1(pending entries)"]
    CG --> C2["Consumer 2(pending entries)"]

主要特徴:

  • インメモリ:超低レイテンシー、高スループット
  • Consumer Group:Kafkaと類似(るいじ)したコンシューマーグループモデル
  • XADD/XREAD:シンプルなコマンドベースAPI
  • Pending Entries List(PEL):未確認メッセージの追跡(ついせき)
  • MAXLENトリミング:メモリ制限内での自動整理

適した用途:軽量ストリーミング、すでにRedisを使用している環境、キャッシュ+キュー統合

アーキテクチャパラダイム要約

システムパラダイムメッセージ保存消費モデルメッセージ削除
Kafka分散コミットログディスク(保持ポリシー)Pull + Offset保持期間後に削除
RabbitMQメッセージブローカーメモリ + ディスクPush(ACK後削除)ACK時に即時削除
SQSマネージドキューAWS管理型Pull処理後に削除
Pulsar分離型ログBookKeeper + TieredPull + カーソル保持ポリシー基盤
NATSメッセージメッシュメモリ / JetStreamPush / Pullポリシー基盤
Redis Streamsインメモリログメモリ(AOFオプション)Pull(Consumer Group)MAXLENトリミング

3. パフォーマンスベンチマーク

スループットとレイテンシーの比較

以下のベンチマークは標準的な3ノードクラスタ環境(m5.2xlargeまたは同等)での測定値です。

システムスループット(msgs/s)P50レイテンシーP99レイテンシー最適な用途
Kafka500K~1M5~15ms10~50msイベントストリーム、リプレイ、ログ集約
Pulsar1M~2.6M5~10msKafka比300倍改善されたP99テールレイテンシーマルチテナンシー、Geo複製
NATS200K~400Kサブミリ秒1~5msIoT、エッジ、高速メッセージング
RabbitMQ50K~100K1~5ms5~20ms複雑なルーティング、RPC
Redis Streams100K~500Kサブミリ秒1~3ms軽量ストリーミング、キャッシュ統合
SQS3K~30K(Standard)20~50ms50~100msサーバーレス、AWSネイティブ

スループット詳細分析

Kafka 1M msgs/sの秘密:

# Kafkaスループット最適化設定
num.partitions=12
batch.size=65536          # 64KBバッチ
linger.ms=5               # 5ms待機後バッチ送信
compression.type=lz4      # LZ4圧縮
buffer.memory=67108864    # 64MBバッファ
acks=1                    # Leaderのみ確認(最大スループット)
  • パーティション12個基準、各パーティションあたり約83K msgs/s
  • acks=allに変更すると約30~40%スループット減少(げんしょう)するがデータ安全性を確保
  • LZ4圧縮(あっしゅく)でネットワーク帯域幅(たいいきはば)40~60%節約

PulsarがP99で強い理由:

  • BrokerとBookKeeperの分離により書き込みI/Oと読み取りI/Oが競合(きょうごう)しない
  • BookKeeperのJournal(WAL)とLedger(データ)が物理的(ぶつりてき)に分離されたディスクを使用
  • 結果:Kafka比P99テールレイテンシーが最大300倍改善

NATSサブミリ秒の達成条件:

# NATS Core(JetStream無効化)
# - 永続性なし、インメモリのみ
# - 10KB以下のメッセージサイズ
# - 単一データセンター内の通信

スケーラビリティ比較

システム水平スケーリング方式パーティション/キュー上限クラスタ最大規模
Kafkaパーティション追加 + Broker追加実務推奨4,000~20,000個/クラスタ数百台のBroker
PulsarBroker/BookKeeper独立スケーリング100万+トピック可能数千台のノード
NATSLeaf Nodeの拡張制限なし(subject基盤)スーパークラスタ
RabbitMQQuorum Queue + ノード追加実務推奨数千個数十台のノード
Redis StreamsRedis Clusterシャーディングシャードあたり1つのStream数百台のノード
SQS自動(無制限)無制限AWSマネージド

4. 2025年新機能ハイライト

4.1 Kafka 4.0 — KRaft GAとキューモード

KRaft GA(KIP-833):

Kafka 4.0でZooKeeperが完全に除去されました。すべてのメタデータ管理がKRaftコントローラーに移管(いかん)されます。

# Kafka 4.0 KRaftモード設定
process.roles: controller,broker
node.id: 1
controller.quorum.voters: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
listeners: PLAINTEXT://:9092,CONTROLLER://:9093
# ZooKeeper関連設定は完全除去

改善効果(かいぜんこうか):

  • メタデータ伝播(でんぱ)速度10倍向上
  • パーティション数100万個以上に拡張可能
  • 運用複雑度の大幅(おおはば)な削減(ZooKeeperクラスタの個別管理不要)
  • クラスタ起動時間が数分から数秒に短縮

KIP-932: Queues for Kafka(Share Groups):

Kafkaに伝統的な「キュー」セマンティクスが追加されました。従来はコンシューマーグループ内でもパーティション単位でのみ分配できましたが、Share Groupsを使えばメッセージ単位で分配(ぶんぱい)できます。

// Kafka 4.0 Share Group使用例
Properties props = new Properties();
props.put("group.id", "my-share-group");
props.put("group.type", "share");  // 新しいShareタイプ

KafkaShareConsumer<String, String> consumer =
    new KafkaShareConsumer<>(props);
consumer.subscribe(List.of("orders"));

// メッセージ単位で分配 - パーティションに依存しない
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);
        consumer.acknowledge(record);  // 個別メッセージACK
    }
}

4.2 RabbitMQ 4.x — SQLフィルターとAMQP 1.0

ネイティブAMQP 1.0サポート:

RabbitMQ 4.0からAMQP 1.0がコアプロトコルに昇格(しょうかく)しました。既存のAMQP 0.9.1はレガシープラグインに移行(いこう)します。

Stream Filtering(4.2):

// RabbitMQ 4.2 Stream SQLフィルター
// サーバーサイドでフィルタリング - ネットワーク帯域幅を節約
Consumer consumer = environment.consumerBuilder()
    .stream("events")
    .filter()
        .values("region-us", "priority-high")  // サーバーサイドフィルター
        .postFilter(msg -> msg.getProperties().getSubject().equals("order"))
    .builder()
    .messageHandler((context, message) -> {
        // フィルタリングされたメッセージのみ受信
        processMessage(message);
        context.storeOffset();
    })
    .build();

処理性能(しょりせいのう):フィルタリング時でも4M+ msgs/sを維持

Khepriメタデータストア:

Mnesiaを置き換えるRaft基盤のメタデータストアで、大規模クラスタでの安定性が大幅に改善されました。

4.3 Pulsar 4.1 — 19 PIPs

Apache Pulsar 4.1は19のPulsar Improvement Proposals(PIPs)を含んでいます。

主要な変更:

  • Java 17必須:Java 8/11サポート終了
  • PIP-354:Topic Policyパフォーマンス最適化
  • PIP-362:Broker Level Dispatch Throttling
  • PIP-374:改善されたメモリ管理でGC負担(ふたん)を削減

4.4 NATS 2.11 — メッセージTTLと分散トレーシング

# NATS 2.11 JetStreamメッセージTTL設定
nats stream add EVENTS \
  --subjects "events.>" \
  --max-msg-ttl 3600 \
  --storage file \
  --replicas 3

主要な変更:

  • メッセージレベルTTL:ストリーム全体ではなく個別メッセージに有効期限(ゆうこうきげん)を設定
  • 分散トレーシング:OpenTelemetry統合によるメッセージフロー追跡
  • SubjectTransform:subject名の動的変換
  • 改善されたモニタリング:Prometheusメトリクス拡張

5. クラウドマネージドサービス比較 + 価格

主要マネージドサービス

サービス基盤技術提供元主要特徴
Confluent CloudKafkaConfluentSchema Registry、ksqlDB、Connectors
Amazon MSKKafkaAWSVPC統合、Serverlessオプション
Amazon MSK ServerlessKafkaAWS自動スケーリング、従量課金
Amazon SQS独自AWSフルマネージド、サーバーレス統合
CloudAMQPRabbitMQ84codesDedicated/Sharedインスタンス
StreamNativePulsarStreamNativePulsar専門マネージド
SynadiaNATSSynadiaNATS専門マネージド

月額コストシミュレーション(1日1億メッセージ、平均1KB)

シナリオ:1日1億メッセージ、平均1KB、3日間保持

Confluent Cloud(Basic):
  - スループット:平均約1,160 msgs/s
  - 予想コスト:月額約800~1,200 USD
  - 含む:ブローカー、ストレージ、ネットワーク

Amazon MSK Provisioned(kafka.m5.large x 3):
  - インスタンス:0.21 USD/h x 3 = 月額約454 USD
  - ストレージ:300GB x 0.10 USD/GB = 30 USD
  - 予想総コスト:月額約500~700 USD

Amazon MSK Serverless:
  - クラスタ:0.75 USD/h = 月額約540 USD
  - パーティション:0.0015 USD/h/partition
  - ストレージ:0.10 USD/GB
  - 予想総コスト:月額約600~1,000 USD

Amazon SQS Standard:
  - 最初の100万件は無料
  - 以降:0.40 USD / 100万件
  - 月約30億件:約1,200 USD
  - データ転送コスト別途

CloudAMQP(Dedicated - Tiger):
  - 月額約500~1,000 USD
  - クラスタ含む、モニタリング含む

自己ホスティングKafka(c5.2xlarge x 3 + EBS):
  - EC2:0.34 USD/h x 3 = 月額約734 USD
  - EBS gp3 300GB x 3:月額約72 USD
  - 運用人件費別途(DevOpsエンジニア1名)
  - 予想総コスト:月額約800 USD + 運用費

選択基準まとめ

  • コスト最小化:小規模ならSQS(従量課金)、大規模なら自己ホスティングKafka
  • 運用負担最小化:SQSまたはMSK Serverless
  • 機能最大化:Confluent Cloud(Schema Registry、ksqlDB)
  • RabbitMQが必要な場合:CloudAMQP

6. Spring Boot連携コード

6.1 Kafka + Spring Boot

// build.gradle
// implementation 'org.springframework.kafka:spring-kafka:3.3.0'

// application.yml
// spring:
//   kafka:
//     bootstrap-servers: localhost:9092
//     producer:
//       key-serializer: org.apache.kafka.common.serialization.StringSerializer
//       value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
//     consumer:
//       group-id: order-service
//       auto-offset-reset: earliest
//       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
//       value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
// KafkaProducerService.java
@Service
@RequiredArgsConstructor
public class KafkaProducerService {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public CompletableFuture<SendResult<String, OrderEvent>> sendOrderEvent(
            OrderEvent event) {
        return kafkaTemplate.send("orders", event.getOrderId(), event);
    }
}

// KafkaConsumerService.java
@Service
@Slf4j
public class KafkaConsumerService {

    @KafkaListener(
        topics = "orders",
        groupId = "order-service",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
            @Payload OrderEvent event,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset) {

        log.info("Received order: {} from partition: {}, offset: {}",
            event.getOrderId(), partition, offset);
        processOrder(event);
    }

    private void processOrder(OrderEvent event) {
        // 注文処理ロジック
    }
}

6.2 RabbitMQ + Spring Boot

// build.gradle
// implementation 'org.springframework.boot:spring-boot-starter-amqp'

// application.yml
// spring:
//   rabbitmq:
//     host: localhost
//     port: 5672
//     username: guest
//     password: guest
// RabbitMQConfig.java
@Configuration
public class RabbitMQConfig {

    public static final String ORDER_EXCHANGE = "order.exchange";
    public static final String ORDER_QUEUE = "order.queue";
    public static final String ORDER_ROUTING_KEY = "order.created";
    public static final String DLQ_QUEUE = "order.dlq";

    @Bean
    public TopicExchange orderExchange() {
        return new TopicExchange(ORDER_EXCHANGE);
    }

    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable(ORDER_QUEUE)
            .withArgument("x-dead-letter-exchange", "")
            .withArgument("x-dead-letter-routing-key", DLQ_QUEUE)
            .withArgument("x-message-ttl", 60000)
            .build();
    }

    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(DLQ_QUEUE).build();
    }

    @Bean
    public Binding orderBinding(Queue orderQueue, TopicExchange orderExchange) {
        return BindingBuilder.bind(orderQueue)
            .to(orderExchange)
            .with(ORDER_ROUTING_KEY);
    }
}

// RabbitMQProducer.java
@Service
@RequiredArgsConstructor
public class RabbitMQProducer {

    private final RabbitTemplate rabbitTemplate;

    public void sendOrderEvent(OrderEvent event) {
        rabbitTemplate.convertAndSend(
            RabbitMQConfig.ORDER_EXCHANGE,
            RabbitMQConfig.ORDER_ROUTING_KEY,
            event,
            message -> {
                message.getMessageProperties().setContentType("application/json");
                message.getMessageProperties().setDeliveryMode(
                    MessageDeliveryMode.PERSISTENT);
                return message;
            }
        );
    }
}

// RabbitMQConsumer.java
@Service
@Slf4j
public class RabbitMQConsumer {

    @RabbitListener(queues = RabbitMQConfig.ORDER_QUEUE)
    public void handleOrderEvent(OrderEvent event, Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            log.info("Received order: {}", event.getOrderId());
            processOrder(event);
            channel.basicAck(tag, false);
        } catch (Exception e) {
            log.error("Failed to process order: {}", event.getOrderId(), e);
            channel.basicNack(tag, false, false); // DLQに移動
        }
    }
}

6.3 SQS + Spring Boot

// build.gradle
// implementation 'io.awspring.cloud:spring-cloud-aws-starter-sqs:3.2.0'

// application.yml
// spring:
//   cloud:
//     aws:
//       region:
//         static: ap-northeast-1
//       sqs:
//         endpoint: https://sqs.ap-northeast-1.amazonaws.com
// SQSProducer.java
@Service
@RequiredArgsConstructor
public class SQSProducer {

    private final SqsTemplate sqsTemplate;

    public void sendOrderEvent(OrderEvent event) {
        sqsTemplate.send(to -> to
            .queue("order-queue")
            .payload(event)
            .header("eventType", "ORDER_CREATED")
        );
    }

    // FIFOキュー送信
    public void sendOrderEventFifo(OrderEvent event) {
        sqsTemplate.send(to -> to
            .queue("order-queue.fifo")
            .payload(event)
            .header(SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER,
                event.getOrderId())
            .header(SqsHeaders.MessageSystemAttributes
                .SQS_MESSAGE_DEDUPLICATION_ID_HEADER,
                UUID.randomUUID().toString())
        );
    }
}

// SQSConsumer.java
@Service
@Slf4j
public class SQSConsumer {

    @SqsListener(value = "order-queue", maxConcurrentMessages = "10",
        maxMessagesPerPoll = "5")
    public void handleOrderEvent(@Payload OrderEvent event,
            @Header("eventType") String eventType) {
        log.info("Received order: {} with type: {}",
            event.getOrderId(), eventType);
        processOrder(event);
    }
}

6.4 Pulsar + Spring Boot

// build.gradle
// implementation 'org.springframework.pulsar:spring-pulsar-spring-boot-starter:1.2.0'

// application.yml
// spring:
//   pulsar:
//     client:
//       service-url: pulsar://localhost:6650
//     producer:
//       topic-name: persistent://public/default/orders
//     consumer:
//       subscription-name: order-service-sub
//       subscription-type: shared
// PulsarProducer.java
@Service
@RequiredArgsConstructor
public class PulsarProducer {

    private final PulsarTemplate<OrderEvent> pulsarTemplate;

    public void sendOrderEvent(OrderEvent event) {
        pulsarTemplate.send("persistent://public/default/orders", event);
    }
}

// PulsarConsumer.java
@Service
@Slf4j
public class PulsarConsumer {

    @PulsarListener(
        topics = "persistent://public/default/orders",
        subscriptionName = "order-service-sub",
        subscriptionType = SubscriptionType.Shared,
        schemaType = SchemaType.JSON
    )
    public void handleOrderEvent(OrderEvent event) {
        log.info("Received order: {}", event.getOrderId());
        processOrder(event);
    }
}

6.5 NATS + Spring Boot

// build.gradle
// implementation 'io.nats:jnats:2.20.4'
// NATSConfig.java
@Configuration
public class NATSConfig {

    @Bean
    public Connection natsConnection() throws IOException, InterruptedException {
        Options options = new Options.Builder()
            .server("nats://localhost:4222")
            .reconnectWait(Duration.ofSeconds(2))
            .maxReconnects(-1) // 無制限再接続
            .connectionListener((conn, type) -> {
                System.out.println("NATS connection event: " + type);
            })
            .build();
        return Nats.connect(options);
    }

    @Bean
    public JetStream jetStream(Connection connection) throws IOException {
        return connection.jetStream();
    }
}

// NATSProducer.java
@Service
@RequiredArgsConstructor
public class NATSProducer {

    private final JetStream jetStream;
    private final ObjectMapper objectMapper;

    public PublishAck sendOrderEvent(OrderEvent event) throws Exception {
        byte[] data = objectMapper.writeValueAsBytes(event);
        Message msg = NatsMessage.builder()
            .subject("orders.created")
            .data(data)
            .build();
        return jetStream.publish(msg);
    }
}

// NATSConsumer.java
@Service
@Slf4j
public class NATSConsumer {

    @PostConstruct
    public void startConsumer() throws Exception {
        JetStream js = natsConnection.jetStream();

        PushSubscribeOptions options = PushSubscribeOptions.builder()
            .durable("order-consumer")
            .build();

        js.subscribe("orders.>", "order-queue", dispatcher -> {
            dispatcher.onMessage(msg -> {
                try {
                    OrderEvent event = objectMapper.readValue(
                        msg.getData(), OrderEvent.class);
                    log.info("Received order: {}", event.getOrderId());
                    processOrder(event);
                    msg.ack();
                } catch (Exception e) {
                    log.error("Failed to process message", e);
                    msg.nak();
                }
            });
        }, false, options);
    }
}

6.6 Redis Streams + Spring Boot

// build.gradle
// implementation 'org.springframework.boot:spring-boot-starter-data-redis'
// RedisStreamConfig.java
@Configuration
public class RedisStreamConfig {

    @Bean
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>>
            streamListenerContainer(RedisConnectionFactory factory) {

        var options = StreamMessageListenerContainer
            .StreamMessageListenerContainerOptions.builder()
            .pollTimeout(Duration.ofSeconds(1))
            .batchSize(10)
            .targetType(MapRecord.class)
            .build();

        var container = StreamMessageListenerContainer.create(factory, options);

        container.receiveAutoAck(
            Consumer.from("order-group", "consumer-1"),
            StreamOffset.create("orders", ReadOffset.lastConsumed()),
            new OrderStreamListener()
        );

        container.start();
        return container;
    }
}

// RedisStreamProducer.java
@Service
@RequiredArgsConstructor
public class RedisStreamProducer {

    private final StringRedisTemplate redisTemplate;
    private final ObjectMapper objectMapper;

    public RecordId sendOrderEvent(OrderEvent event) throws Exception {
        Map<String, String> fields = Map.of(
            "orderId", event.getOrderId(),
            "type", "ORDER_CREATED",
            "payload", objectMapper.writeValueAsString(event)
        );
        return redisTemplate.opsForStream()
            .add(StreamRecords.newRecord()
                .ofMap(fields)
                .withStreamKey("orders"));
    }
}

// OrderStreamListener.java
@Slf4j
public class OrderStreamListener
        implements StreamListener<String, MapRecord<String, String, String>> {

    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        Map<String, String> body = message.getValue();
        log.info("Received order: {} from stream: {}",
            body.get("orderId"), message.getStream());
        processOrder(body);
    }
}

7. 意思決定フローチャート

メッセージキュー選択のための意思決定ツリーです。

graph TD
    START[メッセージキューの選択] --> Q1["イベントリプレイが必要か?"]

    Q1 -->|はい| Q1A["マルチテナンシーまたはGeo複製が必要か?"]
    Q1A -->|はい| PULSAR["Apache Pulsar"]
    Q1A -->|いいえ| KAFKA["Apache Kafka"]

    Q1 -->|いいえ| Q2["複雑なルーティングが必要か?"]
    Q2 -->|はい| RABBIT["RabbitMQ"]

    Q2 -->|いいえ| Q3["AWSネイティブサーバーレスか?"]
    Q3 -->|はい| SQS["Amazon SQS"]

    Q3 -->|いいえ| Q4["サブミリ秒の超低レイテンシーが必須か?"]
    Q4 -->|はい| Q4A["永続性が必要か?"]
    Q4A -->|はい| NATS["NATS + JetStream"]
    Q4A -->|いいえ| NATS_CORE["NATS Core"]

    Q4 -->|いいえ| Q5["すでにRedisを使用中か?"]
    Q5 -->|はい| REDIS["Redis Streams"]
    Q5 -->|いいえ| KAFKA2["Kafka(デフォルト選択)"]

クイック選択ガイド

要件推奨システム理由
イベントストリーミング + リプレイKafkaコミットログ基盤、業界標準
複雑なメッセージルーティングRabbitMQExchange/Bindingモデル
サーバーレス + AWSオールインSQSゼロ運用、Lambdaトリガー
マルチテナンシー + Geo複製Pulsarネイティブマルチテナンシー
IoT + エッジコンピューティングNATS軽量、サブミリ秒レイテンシー
すでにRedis使用中Redis Streams追加インフラ不要
大規模イベント + 低コストKafka(自己ホスティング)高スループット対コスト比
運用チームなしSQSまたはConfluent Cloudフルマネージド

8. 実践アーキテクチャパターン

8.1 Eコマース:注文処理パイプライン

graph LR
    subgraph "注文作成"
        API[API Gateway] --> OS[Order Service]
    end

    OS -->|OrderCreated| KAFKA[Kafka - ordersトピック]

    KAFKA --> PAY[Payment Service]
    KAFKA --> INV[Inventory Service]
    KAFKA --> NOTIF[Notification Service]

    PAY -->|PaymentCompleted| KAFKA2[Kafka - paymentsトピック]
    KAFKA2 --> SHIP[Shipping Service]

    INV -->|StockReserved| KAFKA3[Kafka - inventoryトピック]
    KAFKA3 --> OS2[Order Service - ステータス更新]

    SHIP -->|ShipmentCreated| KAFKA4[Kafka - shipmentsトピック]
    KAFKA4 --> NOTIF2[Notification Service - 配送通知]

なぜKafkaか?

  • 注文イベントのリプレイが必要(決済失敗時の再処理)
  • 多数のサービスが同じイベントを独立して消費
  • 監査(かんさ)ログとして活用可能

主要設定:

# 注文トピック設定
topics:
  orders:
    partitions: 12
    replication-factor: 3
    configs:
      retention.ms: 604800000 # 7日
      min.insync.replicas: 2
      cleanup.policy: delete

  payments:
    partitions: 6
    replication-factor: 3
    configs:
      retention.ms: 2592000000 # 30日(監査ログ)
      min.insync.replicas: 2

8.2 リアルタイム分析パイプライン

graph LR
    subgraph "データ収集"
        WEB[Webクリックストリーム] --> KAFKA[Kafka]
        APP[アプリイベント] --> KAFKA
        IOT[IoTセンサー] --> NATS[NATS]
        NATS -->|ブリッジ| KAFKA
    end

    subgraph "ストリーム処理"
        KAFKA --> FLINK[Apache Flink]
        KAFKA --> KSQL[ksqlDB]
    end

    subgraph "保存とサービング"
        FLINK --> ES[Elasticsearch]
        FLINK --> CH[ClickHouse]
        KSQL --> REDIS[Redis Cache]
    end

    subgraph "可視化"
        ES --> GRAFANA[Grafana]
        CH --> SUPERSET[Apache Superset]
    end

なぜKafka + NATS組み合わせか?

  • Kafka:高スループットのイベント収集とFlink/ksqlDB連携
  • NATS:IoTセンサーの超低レイテンシー要件を満たす軽量エージェント
  • ブリッジ:NATSで収集したデータをKafkaに統合

8.3 IoTセンサーデータ収集

graph TB
    subgraph "エッジレイヤー"
        S1[温度センサー] --> LN1[NATS Leaf Node - 工場A]
        S2[湿度センサー] --> LN1
        S3[振動センサー] --> LN2[NATS Leaf Node - 工場B]
    end

    subgraph "コアレイヤー"
        LN1 --> NATS[NATSクラスタ - JetStream]
        LN2 --> NATS
        NATS --> PROC[Stream Processor]
        PROC -->|異常検知| ALERT[Alert Service - RabbitMQ]
        PROC -->|集約データ| TS[TimescaleDB]
    end

    ALERT -->|メール| EMAIL[Email Service]
    ALERT -->|Slack| SLACK[Slack通知]

なぜNATS + RabbitMQ組み合わせか?

  • NATS:Leaf Nodeでエッジまで拡張、サブミリ秒レイテンシー
  • JetStream:センサーデータの永続性とリプレイ
  • RabbitMQ:アラートの柔軟なルーティング(メール、Slack、SMS分配)

8.4 Sagaパターン — 分散トランザクション

sequenceDiagram
    participant OS as Order Service
    participant K as Kafka
    participant PS as Payment Service
    participant IS as Inventory Service
    participant SS as Shipping Service

    OS->>K: OrderCreated
    K->>PS: 決済リクエスト
    PS->>K: PaymentCompleted
    K->>IS: 在庫差引
    IS->>K: StockReserved
    K->>SS: 配送作成
    SS->>K: ShipmentCreated
    K->>OS: 注文完了

    Note over PS,IS: 失敗時の補償トランザクション
    PS-->>K: PaymentFailed(補償)
    K-->>IS: 在庫復旧(補償)
    K-->>OS: 注文キャンセル(補償)

Choreography vs Orchestration:

方式メリットデメリット推奨シナリオ
Choreographyサービス間結合度が低い、拡張容易フロー把握が困難シンプルなワークフロー
Orchestration中央制御、フロー明確オーケストレーターが単一障害点複雑なビジネスロジック

9. 運用チェックリスト

共通モニタリングメトリクス

メトリクス説明警告閾値(しきいち)
Consumer Lag未消費メッセージ数1万件以上の持続
メッセージ処理失敗率DLQ流入比率1%以上
Brokerディスク使用率ストレージ容量80%以上
ネットワークI/O入出力トラフィック帯域幅の70%以上
JVM GC時間Kafka/Pulsar/RabbitMQFull GC 5秒以上

Kafka本番チェックリスト

1. Broker設定
   - min.insync.replicas = 2(replication-factor 3基準)
   - unclean.leader.election.enable = false
   - auto.create.topics.enable = false

2. Producer設定
   - acks = all(データ損失防止)
   - retries = Integer.MAX_VALUE
   - enable.idempotence = true
   - max.in.flight.requests.per.connection = 5

3. Consumer設定
   - enable.auto.commit = false(手動コミット)
   - max.poll.records = 500
   - session.timeout.ms = 30000

4. モニタリング
   - Prometheus + Grafana(JMX Exporter)
   - Consumer Lagアラート設定
   - Under-Replicated Partitionsアラート

RabbitMQ本番チェックリスト

1. クラスタ設定
   - Quorum Queueを使用(Classic Mirrored Queueの代替)
   - vm_memory_high_watermark = 0.4
   - disk_free_limit = 2GB

2. 接続管理
   - Connection Poolを使用
   - Heartbeatタイムアウト:60秒
   - チャネル再利用(チャネル作成コストが高い)

3. メッセージ設定
   - Persistent delivery mode
   - Publisher Confirmsを有効化
   - Dead Letter Exchangeを設定

4. モニタリング
   - Management Pluginダッシュボード
   - rabbitmq_prometheusプラグイン
   - Queue深度アラート設定

10. クイズ

Q1. Kafka 4.0の最大の変化

Kafka 4.0で除去されたコアコンポーネントは何で、それを代替する技術は?

正解:ZooKeeperが完全に除去され、**KRaft(Kafka Raft)**がこれを代替します。

  • KRaftはRaft合意(ごうい)アルゴリズム基盤のメタデータ管理システムです
  • メタデータ伝播速度が10倍向上し、パーティション数が100万個以上に拡張可能になりました
  • ZooKeeperクラスタを別途管理する必要がなくなり、運用複雑度が大幅に削減されます

Q2. Pull vs Pushモデル

Pull基盤の消費モデル(Kafka、SQS)とPush基盤の消費モデル(RabbitMQ)の違いとそれぞれの長所・短所は?

Pull基盤(Kafka、SQS):

  • Consumerが能動的にメッセージを取得
  • 長所:Consumerが自身の処理速度に合わせて取得可能(自然なバックプレッシャー)
  • 短所:Long Pollingが必要で、リアルタイム性がやや劣る場合がある

Push基盤(RabbitMQ):

  • BrokerがConsumerにメッセージをプッシュ
  • 長所:メッセージ到着(とうちゃく)時に即座に配信可能(低レイテンシー)
  • 短所:Consumer過負荷時に別途のバックプレッシャーメカニズムが必要(prefetch countなど)

Q3. Exactly-Once保証

KafkaでExactly-Once Semantics(EOS)を達成するために必要な3つの設定は?

正解:

  1. enable.idempotence = true:Producerが同一メッセージを重複送信しても一度だけ保存
  2. transactional.id設定:Producerトランザクションを識別する一意のID
  3. isolation.level = read_committed:Consumerがコミットされたトランザクションメッセージのみ読み取り

この3つを組み合わせると、ProducerからConsumerまでend-to-end Exactly-Onceが保証されます。

Q4. SQS Standard vs FIFO

Amazon SQS StandardキューとFIFOキューのスループット、順序保証、重複処理の観点での違いは?
特性StandardFIFO
スループットほぼ無制限秒間300 TPS(バッチ時3,000)
順序保証ベストエフォート(順序保証なし)厳密なFIFO順序
重複処理At-Least-Once(重複の可能性)Exactly-Once(5分以内の重複排除)
価格0.40 USD / 100万件0.50 USD / 100万件
使用例高スループット、順序不問決済、注文など順序重要

Q5. Pulsar vs Kafkaアーキテクチャ

PulsarのCompute/Storage分離アーキテクチャがKafka比で持つ運用上のメリット2つは?

正解:

  1. 独立したスケーリング:Broker(演算)とBookKeeper(ストレージ)を別々に拡張できます。KafkaではBrokerがデータを直接保存するため、ストレージ容量の拡張時にBroker全体を追加する必要があります。PulsarではBookKeeperノードのみ追加すれば済みます。

  2. リバランシング不要の拡張:KafkaでBrokerを追加するとパーティションリバランシングが必要で、大量のデータ移動が発生します。Pulsarでは新しいBrokerが即座にトピックの所有権を受け取れるため、拡張時のサービス影響が最小限に抑えられます。


参考資料

  1. Apache Kafka 4.0 Release Notes — KRaft GA、Share Groups
  2. KIP-932: Queues for Kafka — Share Group詳細スペック
  3. RabbitMQ 4.0 Release Blog — AMQP 1.0、Khepri
  4. RabbitMQ Streams — Stream Pluginとフィルタリング
  5. Amazon SQS Developer Guide — Standard vs FIFO
  6. Apache Pulsar 4.1 Release — 19 PIPs
  7. NATS 2.11 Release Notes — Message TTL、分散トレーシング
  8. Confluent Benchmark: Kafka vs Pulsar — パフォーマンス比較
  9. Spring for Apache Kafka — Spring Kafka公式ドキュメント
  10. Spring AMQP — Spring RabbitMQ連携
  11. Spring Cloud AWS SQS — SQS連携
  12. Spring for Apache Pulsar — Spring Pulsar公式ドキュメント
  13. Designing Data-Intensive Applications(Martin Kleppmann) — メッセージシステム理論
  14. Enterprise Integration Patterns — メッセージングパターンバイブル
  15. Confluent Cloud Pricing — クラウドコスト参考
  16. AWS MSK Pricing — MSKコスト参考