- Authors

- Name
- Youngju Kim
- @fjvbn20031
- 1. なぜメッセージキューなのか?
- 2. アーキテクチャ比較:ログ vs ブローカー vs ハイブリッド
- 3. パフォーマンスベンチマーク
- 4. 2025年新機能ハイライト
- 5. クラウドマネージドサービス比較 + 価格
- 6. Spring Boot連携コード
- 7. 意思決定フローチャート
- 8. 実践アーキテクチャパターン
- 9. 運用チェックリスト
- 10. クイズ
- 参考資料
1. なぜメッセージキューなのか?
市場規模と導入状況
グローバルメッセージキューミドルウェア市場は、2025年基準で約**60億ドル(6B USD)規模に成長しました。Fortune Business Insightsのレポートによると、企業の41%**がすでにメッセージキュー基盤の非同期アーキテクチャを導入しており、追加で27%が導入を計画しています。
なぜこれほど多くの企業がメッセージキューを採用しているのでしょうか?
同期 vs 非同期通信
graph LR
subgraph "同期方式 - 強い結合"
A[注文サービス] -->|HTTP呼出| B[決済サービス]
B -->|HTTP呼出| C[在庫サービス]
C -->|HTTP呼出| D[通知サービス]
end
同期方式(どうきほうしき)では、決済(けっさい)サービスが1秒遅延(ちえん)するとチェーン全体が1秒以上遅くなります。一つが停止(ていし)すると連鎖障害(れんさしょうがい)が発生します。
graph LR
subgraph "非同期方式 - 緩い結合"
A2[注文サービス] -->|イベント発行| MQ[メッセージキュー]
MQ -->|購読| B2[決済サービス]
MQ -->|購読| C2[在庫サービス]
MQ -->|購読| D2[通知サービス]
end
非同期方式(ひどうきほうしき)では、注文サービスがメッセージを発行(はっこう)するだけで済みます。決済サービスが一時的にダウンしてもメッセージはキューに保管(ほかん)され、復旧後に処理されます。
メッセージキュー導入が必要な5つのサイン
- サービス間の同期呼出が3段階以上チェーンしている場合
- トラフィックスパイクによりダウンストリームサービスが過負荷になる場合
- イベントリプレイが必要な場合(障害復旧、データ再処理)
- 多数のコンシューマーが同じデータを異なる目的で処理する場合
- マイクロサービス移行時にサービス間の結合度を下げたい場合
2. アーキテクチャ比較:ログ vs ブローカー vs ハイブリッド
メッセージキューは大きく3つのアーキテクチャパラダイムに分類されます。
2.1 Apache Kafka — 分散コミットログ
graph TB
subgraph "Kafkaクラスタ"
direction TB
KR[KRaftコントローラー] --> B1[Broker 1]
KR --> B2[Broker 2]
KR --> B3[Broker 3]
end
P[Producer] --> B1
B1 --> T["Topic: orders(3パーティション)"]
T --> P0[Partition 0 - Leader: B1]
T --> P1[Partition 1 - Leader: B2]
T --> P2[Partition 2 - Leader: B3]
P0 --> CG["Consumer Group"]
P1 --> CG
P2 --> CG
主要特徴(しゅようとくちょう):
- Append-Onlyコミットログ:メッセージはパーティションに順序(じゅんじょ)通り追加され、不変(ふへん、immutable)です
- Consumer Offset:コンシューマーがどこまで読んだかを自己管理(じこかんり) — リプレイ可能
- KRaft(Kafka 4.0):ZooKeeperを完全に除去(じょきょ)し、Raft基盤のメタデータ管理
- 保持ポリシー:時間またはサイズに基づいてメッセージを保持(デフォルト7日)
- パーティション並列処理:パーティション数だけコンシューマーを並列(へいれつ)に拡張
適した用途:イベントストリーミング、ログ集約(しゅうやく)、イベントソーシング、リアルタイム分析
2.2 RabbitMQ — AMQPメッセージブローカー
graph LR
P[Producer] --> E["Exchange(type: topic)"]
E -->|"routing.key.a"| Q1[Queue A]
E -->|"routing.key.b"| Q2[Queue B]
E -->|"routing.#"| Q3[Queue C - ワイルドカード]
Q1 --> C1[Consumer 1]
Q2 --> C2[Consumer 2]
Q3 --> C3[Consumer 3]
主要特徴:
- AMQPプロトコル:Exchange、Binding、Queueの柔軟(じゅうなん)なルーティングモデル
- Exchange種類:Direct、Topic、Fanout、Headers — 複雑なルーティングが可能
- メッセージACK:Consumerが確認応答(かくにんおうとう)を送ってからメッセージがキューから削除
- プラグインエコシステム:Shovel、Federation、Management UIなど
- Priority Queue:メッセージ優先度(ゆうせんど)の設定が可能
適した用途:複雑なルーティング、RPCパターン、タスク分配、既存AMQPエコシステム
2.3 Amazon SQS — フルマネージドキュー
graph LR
P[Producer] -->|SendMessage| SQS["SQS Queue"]
SQS -->|ReceiveMessage| C1[Consumer 1]
SQS -->|ReceiveMessage| C2[Consumer 2]
SQS -.->|失敗時| DLQ["Dead Letter Queue"]
主要特徴:
- フルマネージド:サーバープロビジョニング、パッチ、スケーリング不要
- Pull基盤:Consumerが能動的にメッセージを取得する構造
- Standard vs FIFO:無制限スループット(Standard)または順序保証(FIFO、秒間300~3,000 TPS)
- Visibility Timeout:メッセージ処理中に他のコンシューマーから見えなくなる
- DLQネイティブサポート:処理失敗メッセージの自動移動
適した用途:サーバーレスアーキテクチャ、AWSネイティブワークロード、運用負担の最小化
2.4 Apache Pulsar — 分離型アーキテクチャ
graph TB
subgraph "Pulsarクラスタ"
direction TB
B1[Broker 1 - サービング] --> BK1[BookKeeper 1 - ストレージ]
B2[Broker 2 - サービング] --> BK2[BookKeeper 2 - ストレージ]
B3[Broker 3 - サービング] --> BK3[BookKeeper 3 - ストレージ]
end
P[Producer] --> B1
B1 --> T["Topic: events"]
T --> SUB["Subscription(Shared/Key_Shared/Exclusive)"]
SUB --> C1[Consumer 1]
SUB --> C2[Consumer 2]
主要特徴:
- Compute/Storage分離:Broker(サービング)とBookKeeper(ストレージ)が独立してスケーリング
- マルチテナンシー:ネームスペースとテナント単位の隔離(かくり)
- Geo-Replication:ネイティブなクロスデータセンター複製
- Tiered Storage:古いデータをS3のような低コストストレージに自動移管
- 多様なサブスクリプションモード:Exclusive、Shared、Failover、Key_Shared
適した用途:マルチテナント環境、地域分散、大規模ストリーミング
2.5 NATS — 軽量メッセージメッシュ
graph LR
subgraph "NATSクラスタ"
N1[NATS Server 1] --- N2[NATS Server 2]
N2 --- N3[NATS Server 3]
N1 --- N3
end
P[Publisher] --> N1
N1 -->|"subject: sensor.temp"| S1[Subscriber 1]
N2 -->|"subject: sensor.*"| S2[Subscriber 2 - ワイルドカード]
N3 -->|JetStream| JS[JetStream Consumer]
主要特徴:
- Core NATS:最小限のレイテンシーのためのfire-and-forget Pub/Sub
- JetStream:永続性(えいぞくせい)、リプレイ、exactly-onceが必要な場合に有効化
- 軽量バイナリ:単一バイナリ約20MB、Goで記述(きじゅつ)
- Subject基盤ルーティング:階層的なsubject名とワイルドカードマッチング
- Leaf Node:エッジコンピューティングのための軽量ノード拡張
適した用途:IoT、エッジコンピューティング、マイクロサービス間の高速通信、クラウドネイティブ
2.6 Redis Streams — インメモリログ構造
graph LR
P[Producer] -->|XADD| RS["Redis Stream(mystream)"]
RS -->|XREADGROUP| CG["Consumer Group"]
CG --> C1["Consumer 1(pending entries)"]
CG --> C2["Consumer 2(pending entries)"]
主要特徴:
- インメモリ:超低レイテンシー、高スループット
- Consumer Group:Kafkaと類似(るいじ)したコンシューマーグループモデル
- XADD/XREAD:シンプルなコマンドベースAPI
- Pending Entries List(PEL):未確認メッセージの追跡(ついせき)
- MAXLENトリミング:メモリ制限内での自動整理
適した用途:軽量ストリーミング、すでにRedisを使用している環境、キャッシュ+キュー統合
アーキテクチャパラダイム要約
| システム | パラダイム | メッセージ保存 | 消費モデル | メッセージ削除 |
|---|---|---|---|---|
| Kafka | 分散コミットログ | ディスク(保持ポリシー) | Pull + Offset | 保持期間後に削除 |
| RabbitMQ | メッセージブローカー | メモリ + ディスク | Push(ACK後削除) | ACK時に即時削除 |
| SQS | マネージドキュー | AWS管理型 | Pull | 処理後に削除 |
| Pulsar | 分離型ログ | BookKeeper + Tiered | Pull + カーソル | 保持ポリシー基盤 |
| NATS | メッセージメッシュ | メモリ / JetStream | Push / Pull | ポリシー基盤 |
| Redis Streams | インメモリログ | メモリ(AOFオプション) | Pull(Consumer Group) | MAXLENトリミング |
3. パフォーマンスベンチマーク
スループットとレイテンシーの比較
以下のベンチマークは標準的な3ノードクラスタ環境(m5.2xlargeまたは同等)での測定値です。
| システム | スループット(msgs/s) | P50レイテンシー | P99レイテンシー | 最適な用途 |
|---|---|---|---|---|
| Kafka | 500K~1M | 5~15ms | 10~50ms | イベントストリーム、リプレイ、ログ集約 |
| Pulsar | 1M~2.6M | 5~10ms | Kafka比300倍改善されたP99テールレイテンシー | マルチテナンシー、Geo複製 |
| NATS | 200K~400K | サブミリ秒 | 1~5ms | IoT、エッジ、高速メッセージング |
| RabbitMQ | 50K~100K | 1~5ms | 5~20ms | 複雑なルーティング、RPC |
| Redis Streams | 100K~500K | サブミリ秒 | 1~3ms | 軽量ストリーミング、キャッシュ統合 |
| SQS | 3K~30K(Standard) | 20~50ms | 50~100ms | サーバーレス、AWSネイティブ |
スループット詳細分析
Kafka 1M msgs/sの秘密:
# Kafkaスループット最適化設定
num.partitions=12
batch.size=65536 # 64KBバッチ
linger.ms=5 # 5ms待機後バッチ送信
compression.type=lz4 # LZ4圧縮
buffer.memory=67108864 # 64MBバッファ
acks=1 # Leaderのみ確認(最大スループット)
- パーティション12個基準、各パーティションあたり約83K msgs/s
acks=allに変更すると約30~40%スループット減少(げんしょう)するがデータ安全性を確保- LZ4圧縮(あっしゅく)でネットワーク帯域幅(たいいきはば)40~60%節約
PulsarがP99で強い理由:
- BrokerとBookKeeperの分離により書き込みI/Oと読み取りI/Oが競合(きょうごう)しない
- BookKeeperのJournal(WAL)とLedger(データ)が物理的(ぶつりてき)に分離されたディスクを使用
- 結果:Kafka比P99テールレイテンシーが最大300倍改善
NATSサブミリ秒の達成条件:
# NATS Core(JetStream無効化)
# - 永続性なし、インメモリのみ
# - 10KB以下のメッセージサイズ
# - 単一データセンター内の通信
スケーラビリティ比較
| システム | 水平スケーリング方式 | パーティション/キュー上限 | クラスタ最大規模 |
|---|---|---|---|
| Kafka | パーティション追加 + Broker追加 | 実務推奨4,000~20,000個/クラスタ | 数百台のBroker |
| Pulsar | Broker/BookKeeper独立スケーリング | 100万+トピック可能 | 数千台のノード |
| NATS | Leaf Nodeの拡張 | 制限なし(subject基盤) | スーパークラスタ |
| RabbitMQ | Quorum Queue + ノード追加 | 実務推奨数千個 | 数十台のノード |
| Redis Streams | Redis Clusterシャーディング | シャードあたり1つのStream | 数百台のノード |
| SQS | 自動(無制限) | 無制限 | AWSマネージド |
4. 2025年新機能ハイライト
4.1 Kafka 4.0 — KRaft GAとキューモード
KRaft GA(KIP-833):
Kafka 4.0でZooKeeperが完全に除去されました。すべてのメタデータ管理がKRaftコントローラーに移管(いかん)されます。
# Kafka 4.0 KRaftモード設定
process.roles: controller,broker
node.id: 1
controller.quorum.voters: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
listeners: PLAINTEXT://:9092,CONTROLLER://:9093
# ZooKeeper関連設定は完全除去
改善効果(かいぜんこうか):
- メタデータ伝播(でんぱ)速度10倍向上
- パーティション数100万個以上に拡張可能
- 運用複雑度の大幅(おおはば)な削減(ZooKeeperクラスタの個別管理不要)
- クラスタ起動時間が数分から数秒に短縮
KIP-932: Queues for Kafka(Share Groups):
Kafkaに伝統的な「キュー」セマンティクスが追加されました。従来はコンシューマーグループ内でもパーティション単位でのみ分配できましたが、Share Groupsを使えばメッセージ単位で分配(ぶんぱい)できます。
// Kafka 4.0 Share Group使用例
Properties props = new Properties();
props.put("group.id", "my-share-group");
props.put("group.type", "share"); // 新しいShareタイプ
KafkaShareConsumer<String, String> consumer =
new KafkaShareConsumer<>(props);
consumer.subscribe(List.of("orders"));
// メッセージ単位で分配 - パーティションに依存しない
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record);
consumer.acknowledge(record); // 個別メッセージACK
}
}
4.2 RabbitMQ 4.x — SQLフィルターとAMQP 1.0
ネイティブAMQP 1.0サポート:
RabbitMQ 4.0からAMQP 1.0がコアプロトコルに昇格(しょうかく)しました。既存のAMQP 0.9.1はレガシープラグインに移行(いこう)します。
Stream Filtering(4.2):
// RabbitMQ 4.2 Stream SQLフィルター
// サーバーサイドでフィルタリング - ネットワーク帯域幅を節約
Consumer consumer = environment.consumerBuilder()
.stream("events")
.filter()
.values("region-us", "priority-high") // サーバーサイドフィルター
.postFilter(msg -> msg.getProperties().getSubject().equals("order"))
.builder()
.messageHandler((context, message) -> {
// フィルタリングされたメッセージのみ受信
processMessage(message);
context.storeOffset();
})
.build();
処理性能(しょりせいのう):フィルタリング時でも4M+ msgs/sを維持
Khepriメタデータストア:
Mnesiaを置き換えるRaft基盤のメタデータストアで、大規模クラスタでの安定性が大幅に改善されました。
4.3 Pulsar 4.1 — 19 PIPs
Apache Pulsar 4.1は19のPulsar Improvement Proposals(PIPs)を含んでいます。
主要な変更:
- Java 17必須:Java 8/11サポート終了
- PIP-354:Topic Policyパフォーマンス最適化
- PIP-362:Broker Level Dispatch Throttling
- PIP-374:改善されたメモリ管理でGC負担(ふたん)を削減
4.4 NATS 2.11 — メッセージTTLと分散トレーシング
# NATS 2.11 JetStreamメッセージTTL設定
nats stream add EVENTS \
--subjects "events.>" \
--max-msg-ttl 3600 \
--storage file \
--replicas 3
主要な変更:
- メッセージレベルTTL:ストリーム全体ではなく個別メッセージに有効期限(ゆうこうきげん)を設定
- 分散トレーシング:OpenTelemetry統合によるメッセージフロー追跡
- SubjectTransform:subject名の動的変換
- 改善されたモニタリング:Prometheusメトリクス拡張
5. クラウドマネージドサービス比較 + 価格
主要マネージドサービス
| サービス | 基盤技術 | 提供元 | 主要特徴 |
|---|---|---|---|
| Confluent Cloud | Kafka | Confluent | Schema Registry、ksqlDB、Connectors |
| Amazon MSK | Kafka | AWS | VPC統合、Serverlessオプション |
| Amazon MSK Serverless | Kafka | AWS | 自動スケーリング、従量課金 |
| Amazon SQS | 独自 | AWS | フルマネージド、サーバーレス統合 |
| CloudAMQP | RabbitMQ | 84codes | Dedicated/Sharedインスタンス |
| StreamNative | Pulsar | StreamNative | Pulsar専門マネージド |
| Synadia | NATS | Synadia | NATS専門マネージド |
月額コストシミュレーション(1日1億メッセージ、平均1KB)
シナリオ:1日1億メッセージ、平均1KB、3日間保持
Confluent Cloud(Basic):
- スループット:平均約1,160 msgs/s
- 予想コスト:月額約800~1,200 USD
- 含む:ブローカー、ストレージ、ネットワーク
Amazon MSK Provisioned(kafka.m5.large x 3):
- インスタンス:0.21 USD/h x 3 = 月額約454 USD
- ストレージ:300GB x 0.10 USD/GB = 30 USD
- 予想総コスト:月額約500~700 USD
Amazon MSK Serverless:
- クラスタ:0.75 USD/h = 月額約540 USD
- パーティション:0.0015 USD/h/partition
- ストレージ:0.10 USD/GB
- 予想総コスト:月額約600~1,000 USD
Amazon SQS Standard:
- 最初の100万件は無料
- 以降:0.40 USD / 100万件
- 月約30億件:約1,200 USD
- データ転送コスト別途
CloudAMQP(Dedicated - Tiger):
- 月額約500~1,000 USD
- クラスタ含む、モニタリング含む
自己ホスティングKafka(c5.2xlarge x 3 + EBS):
- EC2:0.34 USD/h x 3 = 月額約734 USD
- EBS gp3 300GB x 3:月額約72 USD
- 運用人件費別途(DevOpsエンジニア1名)
- 予想総コスト:月額約800 USD + 運用費
選択基準まとめ
- コスト最小化:小規模ならSQS(従量課金)、大規模なら自己ホスティングKafka
- 運用負担最小化:SQSまたはMSK Serverless
- 機能最大化:Confluent Cloud(Schema Registry、ksqlDB)
- RabbitMQが必要な場合:CloudAMQP
6. Spring Boot連携コード
6.1 Kafka + Spring Boot
// build.gradle
// implementation 'org.springframework.kafka:spring-kafka:3.3.0'
// application.yml
// spring:
// kafka:
// bootstrap-servers: localhost:9092
// producer:
// key-serializer: org.apache.kafka.common.serialization.StringSerializer
// value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
// consumer:
// group-id: order-service
// auto-offset-reset: earliest
// key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
// value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
// KafkaProducerService.java
@Service
@RequiredArgsConstructor
public class KafkaProducerService {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
public CompletableFuture<SendResult<String, OrderEvent>> sendOrderEvent(
OrderEvent event) {
return kafkaTemplate.send("orders", event.getOrderId(), event);
}
}
// KafkaConsumerService.java
@Service
@Slf4j
public class KafkaConsumerService {
@KafkaListener(
topics = "orders",
groupId = "order-service",
containerFactory = "kafkaListenerContainerFactory"
)
public void handleOrderEvent(
@Payload OrderEvent event,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset) {
log.info("Received order: {} from partition: {}, offset: {}",
event.getOrderId(), partition, offset);
processOrder(event);
}
private void processOrder(OrderEvent event) {
// 注文処理ロジック
}
}
6.2 RabbitMQ + Spring Boot
// build.gradle
// implementation 'org.springframework.boot:spring-boot-starter-amqp'
// application.yml
// spring:
// rabbitmq:
// host: localhost
// port: 5672
// username: guest
// password: guest
// RabbitMQConfig.java
@Configuration
public class RabbitMQConfig {
public static final String ORDER_EXCHANGE = "order.exchange";
public static final String ORDER_QUEUE = "order.queue";
public static final String ORDER_ROUTING_KEY = "order.created";
public static final String DLQ_QUEUE = "order.dlq";
@Bean
public TopicExchange orderExchange() {
return new TopicExchange(ORDER_EXCHANGE);
}
@Bean
public Queue orderQueue() {
return QueueBuilder.durable(ORDER_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", DLQ_QUEUE)
.withArgument("x-message-ttl", 60000)
.build();
}
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable(DLQ_QUEUE).build();
}
@Bean
public Binding orderBinding(Queue orderQueue, TopicExchange orderExchange) {
return BindingBuilder.bind(orderQueue)
.to(orderExchange)
.with(ORDER_ROUTING_KEY);
}
}
// RabbitMQProducer.java
@Service
@RequiredArgsConstructor
public class RabbitMQProducer {
private final RabbitTemplate rabbitTemplate;
public void sendOrderEvent(OrderEvent event) {
rabbitTemplate.convertAndSend(
RabbitMQConfig.ORDER_EXCHANGE,
RabbitMQConfig.ORDER_ROUTING_KEY,
event,
message -> {
message.getMessageProperties().setContentType("application/json");
message.getMessageProperties().setDeliveryMode(
MessageDeliveryMode.PERSISTENT);
return message;
}
);
}
}
// RabbitMQConsumer.java
@Service
@Slf4j
public class RabbitMQConsumer {
@RabbitListener(queues = RabbitMQConfig.ORDER_QUEUE)
public void handleOrderEvent(OrderEvent event, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
log.info("Received order: {}", event.getOrderId());
processOrder(event);
channel.basicAck(tag, false);
} catch (Exception e) {
log.error("Failed to process order: {}", event.getOrderId(), e);
channel.basicNack(tag, false, false); // DLQに移動
}
}
}
6.3 SQS + Spring Boot
// build.gradle
// implementation 'io.awspring.cloud:spring-cloud-aws-starter-sqs:3.2.0'
// application.yml
// spring:
// cloud:
// aws:
// region:
// static: ap-northeast-1
// sqs:
// endpoint: https://sqs.ap-northeast-1.amazonaws.com
// SQSProducer.java
@Service
@RequiredArgsConstructor
public class SQSProducer {
private final SqsTemplate sqsTemplate;
public void sendOrderEvent(OrderEvent event) {
sqsTemplate.send(to -> to
.queue("order-queue")
.payload(event)
.header("eventType", "ORDER_CREATED")
);
}
// FIFOキュー送信
public void sendOrderEventFifo(OrderEvent event) {
sqsTemplate.send(to -> to
.queue("order-queue.fifo")
.payload(event)
.header(SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER,
event.getOrderId())
.header(SqsHeaders.MessageSystemAttributes
.SQS_MESSAGE_DEDUPLICATION_ID_HEADER,
UUID.randomUUID().toString())
);
}
}
// SQSConsumer.java
@Service
@Slf4j
public class SQSConsumer {
@SqsListener(value = "order-queue", maxConcurrentMessages = "10",
maxMessagesPerPoll = "5")
public void handleOrderEvent(@Payload OrderEvent event,
@Header("eventType") String eventType) {
log.info("Received order: {} with type: {}",
event.getOrderId(), eventType);
processOrder(event);
}
}
6.4 Pulsar + Spring Boot
// build.gradle
// implementation 'org.springframework.pulsar:spring-pulsar-spring-boot-starter:1.2.0'
// application.yml
// spring:
// pulsar:
// client:
// service-url: pulsar://localhost:6650
// producer:
// topic-name: persistent://public/default/orders
// consumer:
// subscription-name: order-service-sub
// subscription-type: shared
// PulsarProducer.java
@Service
@RequiredArgsConstructor
public class PulsarProducer {
private final PulsarTemplate<OrderEvent> pulsarTemplate;
public void sendOrderEvent(OrderEvent event) {
pulsarTemplate.send("persistent://public/default/orders", event);
}
}
// PulsarConsumer.java
@Service
@Slf4j
public class PulsarConsumer {
@PulsarListener(
topics = "persistent://public/default/orders",
subscriptionName = "order-service-sub",
subscriptionType = SubscriptionType.Shared,
schemaType = SchemaType.JSON
)
public void handleOrderEvent(OrderEvent event) {
log.info("Received order: {}", event.getOrderId());
processOrder(event);
}
}
6.5 NATS + Spring Boot
// build.gradle
// implementation 'io.nats:jnats:2.20.4'
// NATSConfig.java
@Configuration
public class NATSConfig {
@Bean
public Connection natsConnection() throws IOException, InterruptedException {
Options options = new Options.Builder()
.server("nats://localhost:4222")
.reconnectWait(Duration.ofSeconds(2))
.maxReconnects(-1) // 無制限再接続
.connectionListener((conn, type) -> {
System.out.println("NATS connection event: " + type);
})
.build();
return Nats.connect(options);
}
@Bean
public JetStream jetStream(Connection connection) throws IOException {
return connection.jetStream();
}
}
// NATSProducer.java
@Service
@RequiredArgsConstructor
public class NATSProducer {
private final JetStream jetStream;
private final ObjectMapper objectMapper;
public PublishAck sendOrderEvent(OrderEvent event) throws Exception {
byte[] data = objectMapper.writeValueAsBytes(event);
Message msg = NatsMessage.builder()
.subject("orders.created")
.data(data)
.build();
return jetStream.publish(msg);
}
}
// NATSConsumer.java
@Service
@Slf4j
public class NATSConsumer {
@PostConstruct
public void startConsumer() throws Exception {
JetStream js = natsConnection.jetStream();
PushSubscribeOptions options = PushSubscribeOptions.builder()
.durable("order-consumer")
.build();
js.subscribe("orders.>", "order-queue", dispatcher -> {
dispatcher.onMessage(msg -> {
try {
OrderEvent event = objectMapper.readValue(
msg.getData(), OrderEvent.class);
log.info("Received order: {}", event.getOrderId());
processOrder(event);
msg.ack();
} catch (Exception e) {
log.error("Failed to process message", e);
msg.nak();
}
});
}, false, options);
}
}
6.6 Redis Streams + Spring Boot
// build.gradle
// implementation 'org.springframework.boot:spring-boot-starter-data-redis'
// RedisStreamConfig.java
@Configuration
public class RedisStreamConfig {
@Bean
public StreamMessageListenerContainer<String, MapRecord<String, String, String>>
streamListenerContainer(RedisConnectionFactory factory) {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions.builder()
.pollTimeout(Duration.ofSeconds(1))
.batchSize(10)
.targetType(MapRecord.class)
.build();
var container = StreamMessageListenerContainer.create(factory, options);
container.receiveAutoAck(
Consumer.from("order-group", "consumer-1"),
StreamOffset.create("orders", ReadOffset.lastConsumed()),
new OrderStreamListener()
);
container.start();
return container;
}
}
// RedisStreamProducer.java
@Service
@RequiredArgsConstructor
public class RedisStreamProducer {
private final StringRedisTemplate redisTemplate;
private final ObjectMapper objectMapper;
public RecordId sendOrderEvent(OrderEvent event) throws Exception {
Map<String, String> fields = Map.of(
"orderId", event.getOrderId(),
"type", "ORDER_CREATED",
"payload", objectMapper.writeValueAsString(event)
);
return redisTemplate.opsForStream()
.add(StreamRecords.newRecord()
.ofMap(fields)
.withStreamKey("orders"));
}
}
// OrderStreamListener.java
@Slf4j
public class OrderStreamListener
implements StreamListener<String, MapRecord<String, String, String>> {
@Override
public void onMessage(MapRecord<String, String, String> message) {
Map<String, String> body = message.getValue();
log.info("Received order: {} from stream: {}",
body.get("orderId"), message.getStream());
processOrder(body);
}
}
7. 意思決定フローチャート
メッセージキュー選択のための意思決定ツリーです。
graph TD
START[メッセージキューの選択] --> Q1["イベントリプレイが必要か?"]
Q1 -->|はい| Q1A["マルチテナンシーまたはGeo複製が必要か?"]
Q1A -->|はい| PULSAR["Apache Pulsar"]
Q1A -->|いいえ| KAFKA["Apache Kafka"]
Q1 -->|いいえ| Q2["複雑なルーティングが必要か?"]
Q2 -->|はい| RABBIT["RabbitMQ"]
Q2 -->|いいえ| Q3["AWSネイティブサーバーレスか?"]
Q3 -->|はい| SQS["Amazon SQS"]
Q3 -->|いいえ| Q4["サブミリ秒の超低レイテンシーが必須か?"]
Q4 -->|はい| Q4A["永続性が必要か?"]
Q4A -->|はい| NATS["NATS + JetStream"]
Q4A -->|いいえ| NATS_CORE["NATS Core"]
Q4 -->|いいえ| Q5["すでにRedisを使用中か?"]
Q5 -->|はい| REDIS["Redis Streams"]
Q5 -->|いいえ| KAFKA2["Kafka(デフォルト選択)"]
クイック選択ガイド
| 要件 | 推奨システム | 理由 |
|---|---|---|
| イベントストリーミング + リプレイ | Kafka | コミットログ基盤、業界標準 |
| 複雑なメッセージルーティング | RabbitMQ | Exchange/Bindingモデル |
| サーバーレス + AWSオールイン | SQS | ゼロ運用、Lambdaトリガー |
| マルチテナンシー + Geo複製 | Pulsar | ネイティブマルチテナンシー |
| IoT + エッジコンピューティング | NATS | 軽量、サブミリ秒レイテンシー |
| すでにRedis使用中 | Redis Streams | 追加インフラ不要 |
| 大規模イベント + 低コスト | Kafka(自己ホスティング) | 高スループット対コスト比 |
| 運用チームなし | SQSまたはConfluent Cloud | フルマネージド |
8. 実践アーキテクチャパターン
8.1 Eコマース:注文処理パイプライン
graph LR
subgraph "注文作成"
API[API Gateway] --> OS[Order Service]
end
OS -->|OrderCreated| KAFKA[Kafka - ordersトピック]
KAFKA --> PAY[Payment Service]
KAFKA --> INV[Inventory Service]
KAFKA --> NOTIF[Notification Service]
PAY -->|PaymentCompleted| KAFKA2[Kafka - paymentsトピック]
KAFKA2 --> SHIP[Shipping Service]
INV -->|StockReserved| KAFKA3[Kafka - inventoryトピック]
KAFKA3 --> OS2[Order Service - ステータス更新]
SHIP -->|ShipmentCreated| KAFKA4[Kafka - shipmentsトピック]
KAFKA4 --> NOTIF2[Notification Service - 配送通知]
なぜKafkaか?
- 注文イベントのリプレイが必要(決済失敗時の再処理)
- 多数のサービスが同じイベントを独立して消費
- 監査(かんさ)ログとして活用可能
主要設定:
# 注文トピック設定
topics:
orders:
partitions: 12
replication-factor: 3
configs:
retention.ms: 604800000 # 7日
min.insync.replicas: 2
cleanup.policy: delete
payments:
partitions: 6
replication-factor: 3
configs:
retention.ms: 2592000000 # 30日(監査ログ)
min.insync.replicas: 2
8.2 リアルタイム分析パイプライン
graph LR
subgraph "データ収集"
WEB[Webクリックストリーム] --> KAFKA[Kafka]
APP[アプリイベント] --> KAFKA
IOT[IoTセンサー] --> NATS[NATS]
NATS -->|ブリッジ| KAFKA
end
subgraph "ストリーム処理"
KAFKA --> FLINK[Apache Flink]
KAFKA --> KSQL[ksqlDB]
end
subgraph "保存とサービング"
FLINK --> ES[Elasticsearch]
FLINK --> CH[ClickHouse]
KSQL --> REDIS[Redis Cache]
end
subgraph "可視化"
ES --> GRAFANA[Grafana]
CH --> SUPERSET[Apache Superset]
end
なぜKafka + NATS組み合わせか?
- Kafka:高スループットのイベント収集とFlink/ksqlDB連携
- NATS:IoTセンサーの超低レイテンシー要件を満たす軽量エージェント
- ブリッジ:NATSで収集したデータをKafkaに統合
8.3 IoTセンサーデータ収集
graph TB
subgraph "エッジレイヤー"
S1[温度センサー] --> LN1[NATS Leaf Node - 工場A]
S2[湿度センサー] --> LN1
S3[振動センサー] --> LN2[NATS Leaf Node - 工場B]
end
subgraph "コアレイヤー"
LN1 --> NATS[NATSクラスタ - JetStream]
LN2 --> NATS
NATS --> PROC[Stream Processor]
PROC -->|異常検知| ALERT[Alert Service - RabbitMQ]
PROC -->|集約データ| TS[TimescaleDB]
end
ALERT -->|メール| EMAIL[Email Service]
ALERT -->|Slack| SLACK[Slack通知]
なぜNATS + RabbitMQ組み合わせか?
- NATS:Leaf Nodeでエッジまで拡張、サブミリ秒レイテンシー
- JetStream:センサーデータの永続性とリプレイ
- RabbitMQ:アラートの柔軟なルーティング(メール、Slack、SMS分配)
8.4 Sagaパターン — 分散トランザクション
sequenceDiagram
participant OS as Order Service
participant K as Kafka
participant PS as Payment Service
participant IS as Inventory Service
participant SS as Shipping Service
OS->>K: OrderCreated
K->>PS: 決済リクエスト
PS->>K: PaymentCompleted
K->>IS: 在庫差引
IS->>K: StockReserved
K->>SS: 配送作成
SS->>K: ShipmentCreated
K->>OS: 注文完了
Note over PS,IS: 失敗時の補償トランザクション
PS-->>K: PaymentFailed(補償)
K-->>IS: 在庫復旧(補償)
K-->>OS: 注文キャンセル(補償)
Choreography vs Orchestration:
| 方式 | メリット | デメリット | 推奨シナリオ |
|---|---|---|---|
| Choreography | サービス間結合度が低い、拡張容易 | フロー把握が困難 | シンプルなワークフロー |
| Orchestration | 中央制御、フロー明確 | オーケストレーターが単一障害点 | 複雑なビジネスロジック |
9. 運用チェックリスト
共通モニタリングメトリクス
| メトリクス | 説明 | 警告閾値(しきいち) |
|---|---|---|
| Consumer Lag | 未消費メッセージ数 | 1万件以上の持続 |
| メッセージ処理失敗率 | DLQ流入比率 | 1%以上 |
| Brokerディスク使用率 | ストレージ容量 | 80%以上 |
| ネットワークI/O | 入出力トラフィック | 帯域幅の70%以上 |
| JVM GC時間 | Kafka/Pulsar/RabbitMQ | Full GC 5秒以上 |
Kafka本番チェックリスト
1. Broker設定
- min.insync.replicas = 2(replication-factor 3基準)
- unclean.leader.election.enable = false
- auto.create.topics.enable = false
2. Producer設定
- acks = all(データ損失防止)
- retries = Integer.MAX_VALUE
- enable.idempotence = true
- max.in.flight.requests.per.connection = 5
3. Consumer設定
- enable.auto.commit = false(手動コミット)
- max.poll.records = 500
- session.timeout.ms = 30000
4. モニタリング
- Prometheus + Grafana(JMX Exporter)
- Consumer Lagアラート設定
- Under-Replicated Partitionsアラート
RabbitMQ本番チェックリスト
1. クラスタ設定
- Quorum Queueを使用(Classic Mirrored Queueの代替)
- vm_memory_high_watermark = 0.4
- disk_free_limit = 2GB
2. 接続管理
- Connection Poolを使用
- Heartbeatタイムアウト:60秒
- チャネル再利用(チャネル作成コストが高い)
3. メッセージ設定
- Persistent delivery mode
- Publisher Confirmsを有効化
- Dead Letter Exchangeを設定
4. モニタリング
- Management Pluginダッシュボード
- rabbitmq_prometheusプラグイン
- Queue深度アラート設定
10. クイズ
Q1. Kafka 4.0の最大の変化
Kafka 4.0で除去されたコアコンポーネントは何で、それを代替する技術は?
正解:ZooKeeperが完全に除去され、**KRaft(Kafka Raft)**がこれを代替します。
- KRaftはRaft合意(ごうい)アルゴリズム基盤のメタデータ管理システムです
- メタデータ伝播速度が10倍向上し、パーティション数が100万個以上に拡張可能になりました
- ZooKeeperクラスタを別途管理する必要がなくなり、運用複雑度が大幅に削減されます
Q2. Pull vs Pushモデル
Pull基盤の消費モデル(Kafka、SQS)とPush基盤の消費モデル(RabbitMQ)の違いとそれぞれの長所・短所は?
Pull基盤(Kafka、SQS):
- Consumerが能動的にメッセージを取得
- 長所:Consumerが自身の処理速度に合わせて取得可能(自然なバックプレッシャー)
- 短所:Long Pollingが必要で、リアルタイム性がやや劣る場合がある
Push基盤(RabbitMQ):
- BrokerがConsumerにメッセージをプッシュ
- 長所:メッセージ到着(とうちゃく)時に即座に配信可能(低レイテンシー)
- 短所:Consumer過負荷時に別途のバックプレッシャーメカニズムが必要(prefetch countなど)
Q3. Exactly-Once保証
KafkaでExactly-Once Semantics(EOS)を達成するために必要な3つの設定は?
正解:
- enable.idempotence = true:Producerが同一メッセージを重複送信しても一度だけ保存
- transactional.id設定:Producerトランザクションを識別する一意のID
- isolation.level = read_committed:Consumerがコミットされたトランザクションメッセージのみ読み取り
この3つを組み合わせると、ProducerからConsumerまでend-to-end Exactly-Onceが保証されます。
Q4. SQS Standard vs FIFO
Amazon SQS StandardキューとFIFOキューのスループット、順序保証、重複処理の観点での違いは?
| 特性 | Standard | FIFO |
|---|---|---|
| スループット | ほぼ無制限 | 秒間300 TPS(バッチ時3,000) |
| 順序保証 | ベストエフォート(順序保証なし) | 厳密なFIFO順序 |
| 重複処理 | At-Least-Once(重複の可能性) | Exactly-Once(5分以内の重複排除) |
| 価格 | 0.40 USD / 100万件 | 0.50 USD / 100万件 |
| 使用例 | 高スループット、順序不問 | 決済、注文など順序重要 |
Q5. Pulsar vs Kafkaアーキテクチャ
PulsarのCompute/Storage分離アーキテクチャがKafka比で持つ運用上のメリット2つは?
正解:
-
独立したスケーリング:Broker(演算)とBookKeeper(ストレージ)を別々に拡張できます。KafkaではBrokerがデータを直接保存するため、ストレージ容量の拡張時にBroker全体を追加する必要があります。PulsarではBookKeeperノードのみ追加すれば済みます。
-
リバランシング不要の拡張:KafkaでBrokerを追加するとパーティションリバランシングが必要で、大量のデータ移動が発生します。Pulsarでは新しいBrokerが即座にトピックの所有権を受け取れるため、拡張時のサービス影響が最小限に抑えられます。
参考資料
- Apache Kafka 4.0 Release Notes — KRaft GA、Share Groups
- KIP-932: Queues for Kafka — Share Group詳細スペック
- RabbitMQ 4.0 Release Blog — AMQP 1.0、Khepri
- RabbitMQ Streams — Stream Pluginとフィルタリング
- Amazon SQS Developer Guide — Standard vs FIFO
- Apache Pulsar 4.1 Release — 19 PIPs
- NATS 2.11 Release Notes — Message TTL、分散トレーシング
- Confluent Benchmark: Kafka vs Pulsar — パフォーマンス比較
- Spring for Apache Kafka — Spring Kafka公式ドキュメント
- Spring AMQP — Spring RabbitMQ連携
- Spring Cloud AWS SQS — SQS連携
- Spring for Apache Pulsar — Spring Pulsar公式ドキュメント
- Designing Data-Intensive Applications(Martin Kleppmann) — メッセージシステム理論
- Enterprise Integration Patterns — メッセージングパターンバイブル
- Confluent Cloud Pricing — クラウドコスト参考
- AWS MSK Pricing — MSKコスト参考