- Authors

- Name
- Youngju Kim
- @fjvbn20031
- はじめに:2026年(ねん)にリアクティブストリーミングアーキテクチャが重要(じゅうよう)な理由(りゆう)
- 1. リアクティブプログラミングの基礎
- 2. Spring WebFlux 徹底解説(てっていかいせつ)
- 3. Apache Kafka 完全ガイド
- 4. ZooKeeperからKRaftへ:大移行(だいいこう)
- 5. Streaming API比較:SSE vs WebSocket vs gRPC Streaming
- 6. WebFlux + Kafka統合
- 7. 実践プロジェクト:リアルタイム株価ティッカーシステム
- 8. 意思決定(いしけってい)マトリクス:いつ何(なに)を使うか
- 9. 6ヶ月(かげつ)学習ロードマップ
- 10. クイズ
- 参考文献(さんこうぶんけん)
はじめに:2026年(ねん)にリアクティブストリーミングアーキテクチャが重要(じゅうよう)な理由(りゆう)
現代(げんだい)のアプリケーションは、毎秒(まいびょう)数百万(すうひゃくまん)のイベントを処理(しょり)し、ユーザーにリアルタイム更新(こうしん)を配信(はいしん)し、予測(よそく)不能(ふのう)な負荷(ふか)の下(もと)で弾力的(だんりょくてき)にスケールすることが求(もと)められています。ブロッキングI/Oを使(つか)う従来(じゅうらい)のリクエスト・レスポンスアーキテクチャでは、これらの要求(ようきゅう)に応(こた)えることができません。2026年において、Spring WebFluxによるノンブロッキングリクエスト処理、Apache Kafkaによる耐久性(たいきゅうせい)のあるイベントストリーミング、そしてZooKeeperからKRaftへのクラスタ管理(かんり)の移行(いこう)の組(く)み合(あ)わせは、リアクティブストリーミングシステムのゴールドスタンダードです。
本(ほん)ガイドでは、リアクティブプログラミングの基礎(きそ)から本番(ほんばん)グレードのアーキテクチャパターンまで、フルスタックをカバーします。リアルタイム分析(ぶんせき)ダッシュボード、金融(きんゆう)取引(とりひき)プラットフォーム、IoTデータパイプラインのいずれを構築(こうちく)する場合(ばあい)でも、実用的(じつようてき)な知識(ちしき)が得(え)られます。
学(まな)べること
- リアクティブプログラミングの原則とReactive Streams仕様
- Spring WebFluxの内部構造:Nettyイベントループ、Mono/Flux、オペレータチェーン、R2DBC
- Apache Kafkaアーキテクチャ:パーティション、コンシューマグループ、Exactly-Onceセマンティクス、Kafka Streams、Connect
- ZooKeeperからKRaftへの移行:なぜ、いつ、どのように
- Streaming API比較(ひかく):SSE vs WebSocket vs gRPC Streaming
- 統合(とうごう)パターン:WebFlux + Reactor Kafka + CQRS + Event Sourcing
- 完全(かんぜん)な実践(じっせん)プロジェクト:リアルタイム株価(かぶか)ティッカーシステム
1. リアクティブプログラミングの基礎
1.1 リアクティブマニフェスト
リアクティブマニフェストは、リアクティブシステムが備(そな)えるべき4つの主要(しゅよう)な特性(とくせい)を定義(ていぎ)しています:
| 特性 | 説明(せつめい) | 実装例(じっそうれい) |
|---|---|---|
| 即応性(そくおうせい) | システムがタイムリーに応答する | WebFluxノンブロッキングI/O |
| 耐障害性(たいしょうがいせい) | 障害発生時もシステムが応答し続ける | サーキットブレーカー、バルクヘッドパターン |
| 弾力性(だんりょくせい) | 変動する負荷の下でもシステムが応答し続ける | Kafkaパーティションスケーリング |
| メッセージ駆動(くどう) | 非同期メッセージパッシングに依存する | Kafkaトピック、Reactorイベントストリーム |
これら4つの特性は独立(どくりつ)していません。メッセージ駆動アーキテクチャが弾力性と耐障害性を実現(じつげん)し、それらが合(あ)わさって即応性を実現します。
1.2 Reactive Streams仕様(しよう)
Reactive Streamsは、ノンブロッキングバックプレッシャーを備えた非同期(ひどうき)ストリーム処理の標準(ひょうじゅん)です。4つのコアインターフェースを定義しています:
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
public interface Subscriber<T> {
void onSubscribe(Subscription subscription);
void onNext(T item);
void onError(Throwable throwable);
void onComplete();
}
public interface Subscription {
void request(long n);
void cancel();
}
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
重要な革新(かくしん)はSubscriptionのrequest(long n)メソッドです。これによりバックプレッシャーが可能(かのう)になります:サブスクライバがデータフローの速度(そくど)を制御(せいぎょ)し、プロデューサがコンシューマを圧倒(あっとう)するのを防(ふせ)ぎます。
1.3 バックプレッシャーの理解(りかい)
バックプレッシャーとは、データコンシューマがプロデューサに対(たい)して処理可能なデータ量(りょう)を通知(つうち)するメカニズムです。バックプレッシャーがなければ、高速(こうそく)なプロデューサが低速(ていそく)なコンシューマを圧倒し、メモリ枯渇(こかつ)やメッセージのドロップにつながります。
プロデューサ (1000 msg/s) --> バッファ (オーバーフロー!) --> コンシューマ (100 msg/s)
バックプレッシャーあり:
プロデューサ (100 msg/sに調整) <-- request(100) -- コンシューマ (100 msg/s)
Project Reactorのバックプレッシャー戦略(せんりゃく):
Flux.range(1, Integer.MAX_VALUE)
.onBackpressureBuffer(256) // 最大256アイテムをバッファ
.onBackpressureDrop(dropped -> // バッファフル時にドロップ
log.warn("Dropped: {}", dropped))
.onBackpressureLatest() // 最新のアイテムのみ保持
.onBackpressureError() // 圧倒時にエラーシグナル
.subscribe(item -> processSlowly(item));
1.4 リアクティブライブラリの全体像(ぜんたいぞう)
| ライブラリ | 言語/プラットフォーム | 主な特徴 | 採用状況(さいようじょうきょう) |
|---|---|---|---|
| Project Reactor | Java | Mono/Flux、Spring統合 | Springエコシステム |
| RxJava 3 | Java | Observable/Flowable、Android対応 | Android、レガシーJava |
| Mutiny | Java | Uni/Multi、Quarkusネイティブ | Quarkusエコシステム |
| Kotlin Flow | Kotlin | コルーチンベース、構造化並行性 | Kotlinファーストプロジェクト |
| Akka Streams | Scala/Java | アクターベース、グラフDSL | 高スループットシステム |
2. Spring WebFlux 徹底解説(てっていかいせつ)
2.1 WebFlux vs Spring MVC アーキテクチャ
Spring MVCはスレッドプールからリクエストごとに1つのスレッドを使用(しよう)するthread-per-requestモデルです。レスポンスが完全(かんぜん)に書(か)き込(こ)まれるまで、各(かく)受信リクエストが1つのスレッドを占有(せんゆう)します。I/O集約的(しゅうやくてき)な高並行性(こうへいこうせい)ワークロードでは、スレッドはほとんどの時間(じかん)をデータベースクエリ、HTTPコール、ファイル読(よ)み取(と)りの待機(たいき)に費(つい)やします。
Spring WebFluxはNettyを使用したイベントループモデルです。少数(しょうすう)のイベントループスレッドがすべてのリクエストを処理します。I/Oが必要(ひつよう)な場合、リクエストはパーク(スレッドをブロックせず)され、イベントループは他(ほか)のリクエストの処理に移(うつ)ります。
Spring MVC (Thread-per-request):
Thread-1: [リクエスト] --> [DB待機......] --> [レスポンス]
Thread-2: [リクエスト] --> [API待機.........] --> [レスポンス]
Thread-3: [リクエスト] --> [ファイル待機....] --> [レスポンス]
(200スレッド = 200同時リクエスト)
Spring WebFlux (イベントループ):
EventLoop-1: [Req1][Req2][Req3][Req1-再開][Req4][Req2-再開]...
EventLoop-2: [Req5][Req6][Req5-再開][Req7][Req8]...
(4スレッド = 数千の同時リクエスト)
2.2 Nettyイベントループモデル
NettyはWebFluxのデフォルト組(く)み込(こ)みサーバーです。そのアーキテクチャを理解することはパフォーマンスチューニングに不可欠(ふかけつ)です。
┌─────────────────────────┐
│ Boss EventLoopGroup │
│ (接続を受け付ける) │
└────────────┬────────────┘
│
┌────────────▼────────────┐
│ Worker EventLoopGroup │
│ (I/Oイベントを処理) │
├─────────────────────────┤
│ EventLoop-1 Channel-A │
│ EventLoop-2 Channel-B │
│ EventLoop-3 Channel-C │
│ EventLoop-4 Channel-D │
└─────────────────────────┘
Nettyイベントループを使(つか)う際(さい)の重要なルール:
- イベントループスレッドを絶対(ぜったい)にブロックしない —
Thread.sleep()、ブロッキングI/O、synchronizedロック禁止(きんし) - CPU集約的な作業(さぎょう)はバウンデッドエラスティックスケジューラにオフロードする
- チャネルアフィニティ — 各チャネルはそのライフタイム中、1つのイベントループにバインドされる
2.3 MonoとFlux:コア型(がた)
Monoは0または1要素(ようそ)を表現(ひょうげん)します。Fluxは0からN要素を表現します。
// Mono: 単一値またはempty
Mono<User> findUser = userRepository.findById(userId);
Mono<Void> saveResult = userRepository.save(user);
// Flux: 値のストリーム
Flux<Order> orders = orderRepository.findByUserId(userId);
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
Flux<ServerSentEvent<String>> sseStream = Flux.create(sink -> {
// sinkにイベントをプッシュ
});
MonoとFluxの生成(せいせい):
// 静的ファクトリメソッド
Mono.just("hello");
Mono.empty();
Mono.error(new RuntimeException("failure"));
Mono.fromCallable(() -> expensiveComputation());
Mono.defer(() -> Mono.just(dynamicValue()));
Flux.just("a", "b", "c");
Flux.fromIterable(myList);
Flux.range(1, 100);
Flux.interval(Duration.ofMillis(500));
Flux.merge(flux1, flux2);
Flux.concat(flux1, flux2);
Flux.zip(flux1, flux2, (a, b) -> a + b);
2.4 必須(ひっす)オペレータ
// 変換
flux.map(String::toUpperCase)
.flatMap(name -> userService.findByName(name)) // 非同期 1:N
.concatMap(user -> enrichUser(user)) // 順次非同期
.switchMap(query -> search(query)) // 前の処理をキャンセル
.flatMapSequential(id -> fetchById(id)) // 順序保証付き並列
// フィルタリング
flux.filter(user -> user.isActive())
.distinct()
.take(10)
.skip(5)
.takeUntil(event -> event.isTerminal())
// エラーハンドリング
flux.onErrorReturn("default")
.onErrorResume(ex -> fallbackFlux())
.retry(3)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(30))
.filter(ex -> ex instanceof TransientException))
.timeout(Duration.ofSeconds(5))
// 結合
Flux.merge(fastSource, slowSource) // インターリーブ
Flux.concat(firstBatch, secondBatch) // 順次
Flux.zip(names, ages, Person::new) // ペアリング
Mono.zip(userMono, profileMono, ordersMono) // 並列ジョイン
.map(tuple -> buildResponse(tuple.getT1(), tuple.getT2(), tuple.getT3()))
// 副作用(ログ/デバッグ専用)
flux.doOnNext(item -> log.debug("Processing: {}", item))
.doOnError(ex -> log.error("Error: {}", ex.getMessage()))
.doOnComplete(() -> log.info("Stream completed"))
.doOnSubscribe(sub -> log.info("Subscribed"))
2.5 WebFluxコントローラと関数型(かんすうがた)エンドポイント
アノテーションベース(お馴染(なじ)みのMVCスタイル):
@RestController
@RequestMapping("/api/users")
public class UserController {
private final UserService userService;
@GetMapping("/{id}")
public Mono<ResponseEntity<User>> getUser(@PathVariable String id) {
return userService.findById(id)
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.notFound().build());
}
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamUsers() {
return userService.streamAllUsers();
}
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Mono<User> createUser(@Valid @RequestBody Mono<User> userMono) {
return userMono.flatMap(userService::create);
}
}
関数型エンドポイント(ルーター + ハンドラー):
@Configuration
public class UserRouter {
@Bean
public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
return RouterFunctions.route()
.path("/api/users", builder -> builder
.GET("/{id}", handler::getUser)
.GET("", handler::listUsers)
.POST("", handler::createUser)
.PUT("/{id}", handler::updateUser)
.DELETE("/{id}", handler::deleteUser))
.build();
}
}
@Component
public class UserHandler {
private final UserService userService;
public Mono<ServerResponse> getUser(ServerRequest request) {
String id = request.pathVariable("id");
return userService.findById(id)
.flatMap(user -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build());
}
public Mono<ServerResponse> listUsers(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(userService.findAll(), User.class);
}
}
2.6 R2DBC:リアクティブデータベースアクセス
R2DBC(Reactive Relational Database Connectivity)は、WebFluxアプリケーションにノンブロッキングデータベースアクセスを提供(ていきょう)します。
@Configuration
@EnableR2dbcRepositories
public class DatabaseConfig extends AbstractR2dbcConfiguration {
@Override
@Bean
public ConnectionFactory connectionFactory() {
return ConnectionFactories.get(ConnectionFactoryOptions.builder()
.option(DRIVER, "postgresql")
.option(HOST, "localhost")
.option(PORT, 5432)
.option(DATABASE, "mydb")
.option(USER, "user")
.option(PASSWORD, "password")
.option(MAX_SIZE, 20)
.build());
}
}
public interface UserRepository extends ReactiveCrudRepository<User, String> {
Flux<User> findByStatusOrderByCreatedAtDesc(String status);
@Query("SELECT * FROM users WHERE email = :email")
Mono<User> findByEmail(String email);
@Query("SELECT * FROM users WHERE department = :dept")
Flux<User> findByDepartment(String dept);
}
DatabaseClientを使った複雑(ふくざつ)なクエリ:
@Repository
public class CustomUserRepository {
private final DatabaseClient databaseClient;
public Flux<User> searchUsers(String keyword, int limit) {
return databaseClient.sql(
"SELECT * FROM users WHERE name ILIKE :keyword LIMIT :limit")
.bind("keyword", "%" + keyword + "%")
.bind("limit", limit)
.map(row -> new User(
row.get("id", String.class),
row.get("name", String.class),
row.get("email", String.class)
))
.all();
}
}
2.7 WebFluxパフォーマンスベンチマーク
ベンチマーク条件(じょうけん):4コアCPU、8GB RAM、10,000同時接続(どうじせつぞく)、I/Oバウンドワークロード(リクエストあたり50msのデータベースレイテンシ)。
| メトリクス | Spring MVC (Tomcat) | Spring WebFlux (Netty) | 改善率(かいぜんりつ) |
|---|---|---|---|
| スループット (req/s) | 2,400 | 18,500 | 7.7倍 |
| 平均レイテンシ (ms) | 4,200 | 540 | 87%低減 |
| P99レイテンシ (ms) | 12,000 | 1,200 | 90%低減 |
| メモリ使用量 (MB) | 2,100 | 480 | 77%削減 |
| アクティブスレッド数 | 200 | 8 | 96%削減 |
| 最大同時リクエスト | 200 | 50,000+ | 250倍 |
WebFluxを使うべきでないケース:
- CPU集約的ワークロード(圧縮(あっしゅく)、暗号化(あんごうか)、重い計算(けいさん))
- MVCで既(すで)に十分(じゅうぶん)なパフォーマンスを発揮(はっき)しているアプリケーション
- リアクティブプログラミング経験(けいけん)のないチーム
- ブロッキングライブラリが避(さ)けられない場合(JDBCでR2DBCラッパーなし)
- 低並行性のシンプルなCRUDアプリケーション
3. Apache Kafka 完全ガイド
3.1 Kafkaアーキテクチャ概要(がいよう)
Apache Kafkaは、1日(いちにち)に数兆(すうちょう)のイベントを処理できる分散(ぶんさん)イベントストリーミングプラットフォームです。そのアーキテクチャはいくつかの主要な概念に基(もと)づいています:
┌─────────────────────────────────────────────────────────────┐
│ Kafkaクラスタ │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │
│ │ │ │ │ │ │ │
│ │ Topic-A │ │ Topic-A │ │ Topic-A │ │
│ │ Part-0 │ │ Part-1 │ │ Part-2 │ │
│ │ (leader) │ │ (leader) │ │ (leader) │ │
│ │ │ │ │ │ │ │
│ │ Topic-A │ │ Topic-A │ │ Topic-A │ │
│ │ Part-1 │ │ Part-2 │ │ Part-0 │ │
│ │ (replica)│ │ (replica)│ │ (replica)│ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ メタデータ: ZooKeeper (レガシー) or KRaft (3.5+) │ │
│ └──────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
コア概念:
| 概念 | 説明 |
|---|---|
| トピック | メッセージの名前付きフィード。データベーステーブルに類似 |
| パーティション | トピック内(ない)の順序付き不変レコードシーケンス |
| オフセット | パーティション内の各レコードのシーケンシャルID |
| ブローカー | データを保存(ほぞん)しクライアントにサービスを提供するKafkaサーバー |
| プロデューサ | トピックにレコードを公開するクライアント |
| コンシューマ | トピックからレコードを読み取るクライアント |
| コンシューマグループ | トピックから協調(きょうちょう)して消費するコンシューマの集合 |
| レプリケーション | 各パーティションは耐久性のため複数ブローカーにレプリケートされる |
| ISR | In-Sync Replicas:リーダーに追(お)いついたレプリカ |
3.2 プロデューサ設定(せってい)とベストプラクティス
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// 信頼性設定
config.put(ProducerConfig.ACKS_CONFIG, "all"); // 全ISRレプリカを待機
config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// パフォーマンスチューニング
config.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 32KBバッチ
config.put(ProducerConfig.LINGER_MS_CONFIG, 20); // バッチング待機20ms
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MBバッファ
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
さまざまなパターンでのメッセージ送信(そうしん):
@Service
@RequiredArgsConstructor
public class OrderEventProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
// Fire-and-forget(最高スループット)
public void sendFireAndForget(OrderEvent event) {
kafkaTemplate.send("orders", event.getOrderId(), event);
}
// 同期送信(最高信頼性)
public void sendSync(OrderEvent event) throws Exception {
kafkaTemplate.send("orders", event.getOrderId(), event)
.get(10, TimeUnit.SECONDS);
}
// コールバック付き非同期送信(バランス型)
public CompletableFuture<SendResult<String, Object>> sendAsync(OrderEvent event) {
return kafkaTemplate.send("orders", event.getOrderId(), event)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Failed to send order event: {}", event.getOrderId(), ex);
} else {
log.info("Order event sent to partition {} offset {}",
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
});
}
// ヘッダー付き送信
public void sendWithHeaders(OrderEvent event) {
ProducerRecord<String, Object> record = new ProducerRecord<>("orders",
null, null, event.getOrderId(), event);
record.headers()
.add("event-type", "ORDER_CREATED".getBytes())
.add("source", "order-service".getBytes())
.add("correlation-id", UUID.randomUUID().toString().getBytes());
kafkaTemplate.send(record);
}
}
3.3 コンシューマ設定とパターン
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// オフセット管理
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手動コミット
// パフォーマンスチューニング
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1MB
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setCommonErrorHandler(new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate()),
new FixedBackOff(1000L, 3)
));
return factory;
}
}
コンシューマリスナーパターン:
@Component
@RequiredArgsConstructor
public class OrderEventConsumer {
private final OrderService orderService;
// 単一レコード処理
@KafkaListener(topics = "orders", groupId = "order-processing-group")
public void processOrder(ConsumerRecord<String, OrderEvent> record,
Acknowledgment ack) {
try {
log.info("Processing order: {} from partition: {} offset: {}",
record.value().getOrderId(),
record.partition(),
record.offset());
orderService.process(record.value());
ack.acknowledge();
} catch (Exception e) {
log.error("Failed to process order: {}", record.value().getOrderId(), e);
// アクノリッジしない — エラーハンドラーによりリトライされる
}
}
// バッチ処理
@KafkaListener(topics = "analytics-events", groupId = "analytics-batch-group",
containerFactory = "batchKafkaListenerContainerFactory")
public void processBatch(List<ConsumerRecord<String, AnalyticsEvent>> records,
Acknowledgment ack) {
log.info("Processing batch of {} records", records.size());
try {
List<AnalyticsEvent> events = records.stream()
.map(ConsumerRecord::value)
.collect(Collectors.toList());
analyticsService.bulkInsert(events);
ack.acknowledge();
} catch (Exception e) {
log.error("Batch processing failed", e);
}
}
}
3.4 Exactly-Onceセマンティクス (EOS)
Kafkaは、冪等(べきとう)プロデューサとトランザクションAPIの組み合わせにより、exactly-onceセマンティクスをサポートします。
@Configuration
public class KafkaTransactionalConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-");
config.put(ProducerConfig.ACKS_CONFIG, "all");
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager() {
return new KafkaTransactionManager<>(producerFactory());
}
}
@Service
@RequiredArgsConstructor
public class TransactionalOrderService {
private final KafkaTemplate<String, Object> kafkaTemplate;
@Transactional("kafkaTransactionManager")
public void processOrderTransactionally(OrderEvent order) {
// このメソッド内の全メッセージはアトミックに送信される
kafkaTemplate.send("orders-processed", order.getOrderId(), order);
kafkaTemplate.send("inventory-updates", order.getProductId(),
new InventoryEvent(order.getProductId(), -order.getQuantity()));
kafkaTemplate.send("payment-requests", order.getOrderId(),
new PaymentEvent(order.getOrderId(), order.getTotalAmount()));
// いずれかの送信が失敗すると、全てロールバックされる
}
}
3つの配信(はいしん)保証(ほしょう)レベル:
| 保証 | 設定 | ユースケース |
|---|---|---|
| At-most-once | acks=0、自動コミット | メトリクス、ロギング |
| At-least-once | acks=all、手動コミット、冪等コンシューマ | ほとんどのアプリケーション |
| Exactly-once | トランザクションAPI + read-committed分離 | 金融、課金、在庫管理 |
3.5 Kafka Streams
Kafka Streamsは、入出力データがKafkaに格納(かくのう)されるアプリケーションとマイクロサービスを構築するためのクライアントライブラリです。
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
Map<String, Object> config = new HashMap<>();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
config.put(StreamsConfig.STATE_DIR_CONFIG, "/var/kafka-streams");
return new KafkaStreamsConfiguration(config);
}
}
@Component
public class OrderAnalyticsTopology {
@Autowired
public void buildPipeline(StreamsBuilder streamsBuilder) {
// リアルタイム注文分析パイプライン
KStream<String, OrderEvent> orders = streamsBuilder
.stream("orders", Consumed.with(Serdes.String(), orderEventSerde()));
// 1. タンブリングウィンドウでの商品別注文数
KTable<Windowed<String>, Long> productCounts = orders
.groupBy((key, order) -> order.getProductId(),
Grouped.with(Serdes.String(), orderEventSerde()))
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count(Materialized.as("product-counts-store"));
// 2. カテゴリ別収益集約
KTable<String, Double> revenueByCategory = orders
.groupBy((key, order) -> order.getCategory(),
Grouped.with(Serdes.String(), orderEventSerde()))
.aggregate(
() -> 0.0,
(key, order, total) -> total + order.getTotalAmount(),
Materialized.with(Serdes.String(), Serdes.Double())
);
// 3. 顧客データとの結合によるエンリッチメント
KTable<String, CustomerProfile> customers = streamsBuilder
.table("customer-profiles",
Consumed.with(Serdes.String(), customerProfileSerde()));
KStream<String, EnrichedOrder> enrichedOrders = orders
.selectKey((key, order) -> order.getCustomerId())
.join(customers,
(order, customer) -> new EnrichedOrder(order, customer),
Joined.with(Serdes.String(), orderEventSerde(), customerProfileSerde()));
enrichedOrders.to("enriched-orders",
Produced.with(Serdes.String(), enrichedOrderSerde()));
// 4. セッションウィンドウによる不正検知
orders
.filter((key, order) -> order.getTotalAmount() > 1000)
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(10)))
.count()
.toStream()
.filter((windowedKey, count) -> count >= 3)
.mapValues((key, count) -> "ALERT: セッション内に" + count + "件の高額注文")
.to("fraud-alerts");
}
}
3.6 Kafka Connect
Kafka Connectは、Kafkaをデータベース、キーバリューストア、検索(けんさく)インデックス、ファイルシステムなどの外部(がいぶ)システムと接続するフレームワークです。
{
"name": "postgresql-source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "password",
"database.dbname": "orders_db",
"database.server.name": "order-server",
"table.include.list": "public.orders,public.order_items",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.name": "dbz_publication",
"topic.prefix": "cdc",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "cdc-$3"
}
}
Elasticsearchシンクコネクタ:
{
"name": "elasticsearch-sink-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch:9200",
"topics": "enriched-orders",
"type.name": "_doc",
"key.ignore": false,
"schema.ignore": true,
"behavior.on.null.values": "delete",
"write.method": "upsert",
"batch.size": 200,
"max.buffered.records": 5000,
"flush.timeout.ms": 10000
}
}
3.7 Schema Registry
Schema Registryは、メッセージスキーマの集中(しゅうちゅう)リポジトリを提供し、スキーマの進化(しんか)と互換性(ごかんせい)チェックを可能にします。
// Avroスキーマ付きプロデューサ
@Configuration
public class AvroProducerConfig {
@Bean
public ProducerFactory<String, GenericRecord> avroProducerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
config.put("schema.registry.url", "http://schema-registry:8081");
config.put("auto.register.schemas", true);
return new DefaultKafkaProducerFactory<>(config);
}
}
スキーマ進化の互換性モード:
| モード | 許可(きょか)される変更 | 最適な用途 |
|---|---|---|
| BACKWARD | フィールド削除、オプショナルフィールド追加 | コンシューマ先行アップグレード |
| FORWARD | フィールド追加、オプショナルフィールド削除 | プロデューサ先行アップグレード |
| FULL | オプショナルフィールドの追加/削除のみ | 独立アップグレード |
| NONE | あらゆる変更が許可 | 開発環境のみ |
3.8 Kafkaパフォーマンスチューニング早見表(はやみひょう)
| パラメータ | デフォルト | 推奨値(すいしょうち) | 影響(えいきょう) |
|---|---|---|---|
| num.partitions | 1 | トピックあたり6-12 | 並列性 |
| replication.factor | 1 | 3 | 耐久性 |
| min.insync.replicas | 1 | 2 | 書き込み耐久性 |
| batch.size (プロデューサ) | 16384 | 32768-65536 | スループットvsレイテンシ |
| linger.ms (プロデューサ) | 0 | 5-100 | バッチング効率 |
| compression.type | none | zstd or lz4 | ネットワーク/ディスク節約 |
| fetch.min.bytes (コンシューマ) | 1 | 1024-16384 | スループットvsレイテンシ |
| max.poll.records (コンシューマ) | 500 | 100-1000 | 処理バッチサイズ |
| segment.bytes | 1073741824 | 536870912 | ログコンパクション頻度 |
| retention.ms | 604800000 | ユースケースに依存 | ストレージvsリプレイ能力 |
4. ZooKeeperからKRaftへ:大移行(だいいこう)
4.1 KafkaにおけるZooKeeperの役割(やくわり)
ZooKeeperは当初(とうしょ)からKafkaのメタデータ管理システムでした。以下を担当(たんとう)します:
- ブローカー登録(とうろく):どのブローカーが生存しているかの追跡
- トピックメタデータ:パーティション割り当て、ISRリスト、設定
- コントローラ選出(せんしゅつ):どのブローカーがクラスタコントローラとして機能するかの選択
- コンシューマグループ調整(レガシー):コンシューマオフセットの保存(0.9+でKafka自体に移行済み)
- ACL:認可のためのアクセス制御リスト
4.2 ZooKeeperを廃止(はいし)すべき理由
| 問題(もんだい) | 説明 |
|---|---|
| 運用の複雑さ | 2つの異なる分散システムのデプロイ、監視、保守が必要 |
| スケーリングのボトルネック | ZooKeeperは全メタデータをメモリに格納;約20万パーティションが上限 |
| スプリットブレインリスク | ネットワーク分断でZooKeeperとKafkaの不一致が発生しうる |
| 復旧時間 | コントローラフェイルオーバーにはZooKeeperからの全メタデータ読み取りが必要(数分) |
| 専門知識の要件 | チームはKafkaとZooKeeperの両方に深い知識が必要 |
4.3 KRaft:Kafka内蔵(ないぞう)のコンセンサス
KRaft(Kafka Raft)は、ZooKeeperを内蔵のRaftベースコンセンサスプロトコルで置(お)き換(か)えます。メタデータは内部Kafkaトピック(__cluster_metadata)に格納され、コントローラノードのセットがクラスタ状態(じょうたい)を管理します。
ZooKeeperモード: KRaftモード:
┌────────┐ ┌────────────┐ ┌──────────────────────┐
│ Broker │───▶│ ZooKeeper │ │ Controller (Raft) │
│ Broker │───▶│ Ensemble │ │ Node 1 (leader) │
│ Broker │───▶│ (3-5ノード)│ │ Node 2 (follower) │
└────────┘ └────────────┘ │ Node 3 (follower) │
└──────────┬───────────┘
│
┌──────────▼───────────┐
│ Brokers │
│ Broker 1 │
│ Broker 2 │
│ Broker 3 │
└──────────────────────┘
KRaftの利点(りてん):
| 機能 | ZooKeeperモード | KRaftモード |
|---|---|---|
| 最大パーティション | 約200,000 | 数百万 |
| コントローラフェイルオーバー | 数分 | 数秒 |
| メタデータ伝播 | 非同期(遅延あり) | イベント駆動(即座) |
| デプロイ | 2つのシステム | 1つのシステム |
| 運用オーバーヘッド | 高い | 低い |
| シャットダウン時間 | 数分 | 数秒 |
4.4 KIP-500:ロードマップ
KIP-500(「ZooKeeperを自己管理メタデータクォーラムで置き換え」)は、複数年(ふくすうねん)にわたる移行計画(けいかく)を定義しました:
| Kafkaバージョン | マイルストーン | 年 |
|---|---|---|
| 2.8 | KRaftアーリーアクセス(開発専用) | 2021 |
| 3.3 | KRaftが本番環境対応に | 2022 |
| 3.5 | ZooKeeper移行ツールが利用可能に | 2023 |
| 3.7 | ブリッジリリース(デュアルサポート) | 2024 |
| 4.0 | ZooKeeperサポートを完全に削除 | 2025 |
4.5 移行ガイド:ZooKeeperからKRaftへ
ステップ1:移行前チェックリスト
# Kafkaバージョンの確認(3.5+が必要)
kafka-broker-api-versions.sh --bootstrap-server localhost:9092 | head -1
# 現在のZooKeeperメタデータの確認
kafka-metadata.sh --snapshot /var/kafka-logs/__cluster_metadata-0/00000000000000000000.log \
--cluster-id $(kafka-storage.sh random-uuid)
# 非推奨機能が使用されていないことを確認
kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type brokers
ステップ2:KRaftコントローラの設定
# kraft/server.properties(コントローラノード用)
process.roles=controller
node.id=1
controller.quorum.voters=1@controller1:9093,2@controller2:9093,3@controller3:9093
controller.listener.names=CONTROLLER
listeners=CONTROLLER://:9093
log.dirs=/var/kraft-controller-logs
ステップ3:移行の実行(じっこう)
# コントローラストレージのフォーマット
kafka-storage.sh format \
--config kraft/server.properties \
--cluster-id $(kafka-storage.sh random-uuid) \
--release-version 3.7
# 移行開始(ZooKeeperモード -> デュアルモード)
kafka-metadata.sh --migrate \
--zookeeper-connect zk1:2181,zk2:2181,zk3:2181 \
--bootstrap-server kafka1:9092
# 移行状態の確認
kafka-metadata.sh --status \
--bootstrap-server kafka1:9092
# ファイナライズ(復帰不可ポイント)
kafka-metadata.sh --finalize \
--bootstrap-server kafka1:9092
ステップ4:移行後の検証(けんしょう)
# 全トピックがアクセス可能か確認
kafka-topics.sh --bootstrap-server localhost:9092 --list
# コンシューマグループの確認
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# コントローラ状態の確認
kafka-metadata.sh --controllers \
--bootstrap-server localhost:9092
4.6 ZAB vs Raft:プロトコル比較
| 側面 | ZAB (ZooKeeper) | Raft (KRaft) |
|---|---|---|
| リーダー選出 | エポックベース | タームベース |
| ログレプリケーション | 2フェーズコミット | アペンドエントリ |
| 一貫性 | 逐次一貫性 | 線形化可能読み取り(オプション) |
| メンバーシップ | 静的設定 | 動的再設定 |
| スナップショット | ファジースナップショット | 一貫性スナップショット |
| 複雑性 | 高い | 中程度(設計上) |
| 実装 | 別システム | Kafkaに統合 |
5. Streaming API比較:SSE vs WebSocket vs gRPC Streaming
5.1 Server-Sent Events (SSE)
SSEは、サーバーからクライアントへのプッシュのためのシンプルなHTTPベースのプロトコルです。サーバーはtext/event-streamコンテンツタイプを使用して、長寿命(ちょうじゅみょう)HTTP接続でイベントを送信します。
WebFlux SSEエンドポイント:
@RestController
@RequestMapping("/api/stream")
public class SSEController {
private final StockPriceService stockPriceService;
@GetMapping(value = "/prices", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<StockPrice>> streamPrices(
@RequestParam List<String> symbols) {
return stockPriceService.getPriceStream(symbols)
.map(price -> ServerSentEvent.<StockPrice>builder()
.id(String.valueOf(price.getTimestamp()))
.event("price-update")
.data(price)
.retry(Duration.ofSeconds(5))
.build());
}
@GetMapping(value = "/heartbeat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> heartbeat() {
return Flux.interval(Duration.ofSeconds(30))
.map(seq -> ServerSentEvent.<String>builder()
.event("heartbeat")
.data("ping")
.build());
}
}
JavaScriptクライアント:
const eventSource = new EventSource('/api/stream/prices?symbols=AAPL,GOOG,MSFT')
eventSource.addEventListener('price-update', (event) => {
const price = JSON.parse(event.data)
updateUI(price)
})
eventSource.addEventListener('heartbeat', () => {
console.log('Connection alive')
})
eventSource.onerror = (error) => {
console.error('SSE error:', error)
// ブラウザが自動的に再接続
}
5.2 WebSocket
WebSocketは、単一(たんいつ)のTCP接続上で全二重通信を提供します。クライアントとサーバーの両方がいつでもメッセージを送信できます。
WebFlux WebSocketハンドラー:
@Configuration
public class WebSocketConfig {
@Bean
public HandlerMapping webSocketMapping(StockWebSocketHandler handler) {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/ws/stocks", handler);
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(map);
mapping.setOrder(-1);
return mapping;
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
@Component
@RequiredArgsConstructor
public class StockWebSocketHandler implements WebSocketHandler {
private final StockPriceService stockPriceService;
private final ObjectMapper objectMapper;
@Override
public Mono<Void> handle(WebSocketSession session) {
// 受信メッセージ(サブスクリプション)
Flux<String> incoming = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(msg -> log.info("Received subscription: {}", msg));
// 送信価格更新
Flux<WebSocketMessage> outgoing = incoming
.flatMap(msg -> {
SubscriptionRequest req = parseRequest(msg);
return stockPriceService.getPriceStream(req.getSymbols());
})
.map(price -> {
try {
String json = objectMapper.writeValueAsString(price);
return session.textMessage(json);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
return session.send(outgoing);
}
}
JavaScriptクライアント:
const ws = new WebSocket('ws://localhost:8080/ws/stocks')
ws.onopen = () => {
ws.send(
JSON.stringify({
action: 'subscribe',
symbols: ['AAPL', 'GOOG', 'MSFT'],
})
)
}
ws.onmessage = (event) => {
const price = JSON.parse(event.data)
updateUI(price)
}
ws.onclose = (event) => {
console.log('WebSocket closed:', event.code, event.reason)
// 手動で再接続が必要
setTimeout(reconnect, 3000)
}
5.3 gRPC Streaming
gRPCは4つの通信パターンをサポートします:ユナリ、サーバーストリーミング、クライアントストリーミング、双方向(そうほうこう)ストリーミング。
Proto定義:
syntax = "proto3";
package stocks;
service StockService {
// ユナリ
rpc GetPrice (PriceRequest) returns (StockPrice);
// サーバーストリーミング
rpc StreamPrices (PriceStreamRequest) returns (stream StockPrice);
// クライアントストリーミング
rpc RecordTrades (stream TradeRecord) returns (TradeSummary);
// 双方向ストリーミング
rpc LiveTrading (stream TradeOrder) returns (stream TradeConfirmation);
}
message PriceStreamRequest {
repeated string symbols = 1;
int32 interval_ms = 2;
}
message StockPrice {
string symbol = 1;
double price = 2;
double change = 3;
int64 timestamp = 4;
int64 volume = 5;
}
サーバー側実装(Spring Boot + grpc-spring-boot-starter):
@GrpcService
public class StockGrpcService extends StockServiceGrpc.StockServiceImplBase {
private final StockPriceService stockPriceService;
@Override
public void streamPrices(PriceStreamRequest request,
StreamObserver<StockPrice> responseObserver) {
stockPriceService.getPriceStream(request.getSymbolsList())
.subscribe(
price -> responseObserver.onNext(toProto(price)),
error -> responseObserver.onError(
Status.INTERNAL.withDescription(error.getMessage()).asException()),
responseObserver::onCompleted
);
}
@Override
public StreamObserver<TradeOrder> liveTrading(
StreamObserver<TradeConfirmation> responseObserver) {
return new StreamObserver<TradeOrder>() {
@Override
public void onNext(TradeOrder order) {
TradeConfirmation confirmation = executeTrade(order);
responseObserver.onNext(confirmation);
}
@Override
public void onError(Throwable t) {
log.error("Client error in live trading", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
5.4 比較表
| 機能 | SSE | WebSocket | gRPC Streaming |
|---|---|---|---|
| プロトコル | HTTP/1.1 | WS (TCP) | HTTP/2 |
| 方向 | サーバーからクライアント | 双方向 | 4パターン全て |
| データフォーマット | テキスト (UTF-8) | テキストまたはバイナリ | Protobuf(バイナリ) |
| 自動再接続 | ブラウザ内蔵 | 手動 | 手動 |
| バックプレッシャー | 限定的 | なし(ネイティブ) | 内蔵(フロー制御) |
| ブラウザサポート | 全モダンブラウザ | 全モダンブラウザ | grpc-web(限定的) |
| ファイアウォール対応 | 良好(標準HTTP) | ブロックされる場合あり | HTTP/2が必要 |
| 多重化 | なし | なし | あり(HTTP/2) |
| 最大接続数 | ドメインあたり6 (HTTP/1) | 制限なし | 多重化 |
| 圧縮 | 標準HTTP | メッセージ単位(任意) | 内蔵 |
| レイテンシ | 低い | 最低 | 低い |
| スループット | 中程度 | 高い | 最高 |
| シリアライゼーション | JSON(テキスト) | JSONまたは独自バイナリ | Protobuf(高効率) |
| ユースケース | 通知、フィード | チャット、ゲーム、コラボ | マイクロサービス、IoT |
5.5 選択(せんたく)の指針(ししん)
- SSE:ダッシュボード更新、ニュースフィード、株価ティッカー(読み取り専用)、通知ストリーム。実装がシンプル、プロキシを通過、自動再接続。
- WebSocket:チャットアプリケーション、共同編集、マルチプレイヤーゲーム。低レイテンシの双方向通信が必要。
- gRPC Streaming:マイクロサービス間通信、IoTデータ取り込み、高スループット内部API。スキーマ強制、効率的なシリアライゼーション、HTTP/2機能が必要。
6. WebFlux + Kafka統合
6.1 Reactor Kafka
Reactor KafkaはKafka用のリアクティブAPIを提供し、Spring WebFluxとシームレスに統合されます。
@Configuration
public class ReactorKafkaConfig {
@Bean
public ReceiverOptions<String, String> receiverOptions() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return ReceiverOptions.<String, String>create(props)
.subscription(Collections.singleton("events"))
.addAssignListener(partitions ->
log.info("Assigned: {}", partitions))
.addRevokeListener(partitions ->
log.info("Revoked: {}", partitions))
.commitInterval(Duration.ofSeconds(5))
.commitBatchSize(100);
}
@Bean
public SenderOptions<String, String> senderOptions() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
return SenderOptions.create(props);
}
@Bean
public KafkaReceiver<String, String> kafkaReceiver() {
return KafkaReceiver.create(receiverOptions());
}
@Bean
public KafkaSender<String, String> kafkaSender() {
return KafkaSender.create(senderOptions());
}
}
バックプレッシャー付きリアクティブコンシューマ:
@Service
@RequiredArgsConstructor
public class ReactiveEventProcessor {
private final KafkaReceiver<String, String> kafkaReceiver;
private final EventService eventService;
@PostConstruct
public void startConsuming() {
kafkaReceiver.receive()
.groupBy(record -> record.receiverOffset().topicPartition())
.flatMap(partitionFlux -> partitionFlux
.publishOn(Schedulers.boundedElastic())
.concatMap(record -> processRecord(record)
.doOnSuccess(v -> record.receiverOffset().acknowledge())
.onErrorResume(e -> {
log.error("Error processing record: {}", record.key(), e);
record.receiverOffset().acknowledge();
return Mono.empty();
})
))
.subscribe();
}
private Mono<Void> processRecord(ReceiverRecord<String, String> record) {
return eventService.process(record.value())
.doOnNext(result ->
log.debug("Processed: partition={} offset={} key={}",
record.partition(), record.offset(), record.key()));
}
}
リアクティブプロデューサ:
@Service
@RequiredArgsConstructor
public class ReactiveEventProducer {
private final KafkaSender<String, String> kafkaSender;
private final ObjectMapper objectMapper;
public Mono<Void> sendEvent(String topic, String key, Object event) {
return Mono.fromCallable(() -> objectMapper.writeValueAsString(event))
.flatMap(json -> {
SenderRecord<String, String, String> record = SenderRecord.create(
new ProducerRecord<>(topic, key, json), key);
return kafkaSender.send(Mono.just(record))
.next()
.doOnNext(result -> log.info("Sent to {}-{} offset {}",
result.recordMetadata().topic(),
result.recordMetadata().partition(),
result.recordMetadata().offset()))
.then();
});
}
public Flux<SenderResult<String>> sendBatch(String topic, List<Event> events) {
Flux<SenderRecord<String, String, String>> records = Flux.fromIterable(events)
.map(event -> {
String json = objectMapper.writeValueAsString(event);
return SenderRecord.create(
new ProducerRecord<>(topic, event.getId(), json), event.getId());
});
return kafkaSender.send(records)
.doOnError(e -> log.error("Batch send failed", e));
}
}
6.2 CQRS + Event Sourcing(WebFluxとKafka)
CQRS(コマンドクエリ責務分離)は読み取りモデルと書(か)き込みモデルを分離(ぶんり)します。Event Sourcingと組み合わせることで、すべての状態変更が不変のイベントとして保存されます。
┌─────────────────────────────────────────────────────────┐
│ CQRS + Event Sourcing │
│ │
│ コマンド側 クエリ側 │
│ ┌──────────┐ ┌──────────┐ │
│ │ WebFlux │ │ WebFlux │ │
│ │ Command │ │ Query │ │
│ │ Handler │ │ Handler │ │
│ └────┬─────┘ └────▲─────┘ │
│ │ │ │
│ ┌────▼─────┐ ┌────┴─────┐ │
│ │ Domain │ │ Read │ │
│ │ Aggregate│ │ Model │ │
│ └────┬─────┘ │ (R2DBC) │ │
│ │ └────▲─────┘ │
│ ┌────▼─────────────────────────────┐ │ │
│ │ Kafka (Event Store) ├─┘ │
│ │ orders.events topic │ │
│ └──────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
コマンドハンドラー:
@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
public class OrderCommandController {
private final OrderCommandService commandService;
@PostMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public Mono<OrderId> createOrder(@RequestBody Mono<CreateOrderCommand> command) {
return command.flatMap(commandService::handle);
}
@PostMapping("/{orderId}/confirm")
public Mono<ResponseEntity<Void>> confirmOrder(@PathVariable String orderId) {
return commandService.handle(new ConfirmOrderCommand(orderId))
.then(Mono.just(ResponseEntity.accepted().build()));
}
}
@Service
@RequiredArgsConstructor
public class OrderCommandService {
private final KafkaSender<String, String> kafkaSender;
private final ObjectMapper objectMapper;
public Mono<OrderId> handle(CreateOrderCommand command) {
// コマンドのバリデーション
OrderAggregate aggregate = OrderAggregate.create(command);
List<DomainEvent> events = aggregate.getUncommittedEvents();
// Kafkaにイベントを公開
return Flux.fromIterable(events)
.flatMap(event -> publishEvent("order-events", aggregate.getId(), event))
.then(Mono.just(new OrderId(aggregate.getId())));
}
private Mono<Void> publishEvent(String topic, String key, DomainEvent event) {
return Mono.fromCallable(() -> objectMapper.writeValueAsString(event))
.flatMap(json -> {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, json);
record.headers().add("event-type", event.getType().getBytes());
return kafkaSender.send(Mono.just(SenderRecord.create(record, key)))
.next()
.then();
});
}
}
イベントプロジェクション(読み取り側の更新):
@Service
@RequiredArgsConstructor
public class OrderProjectionService {
private final KafkaReceiver<String, String> kafkaReceiver;
private final OrderReadRepository readRepository;
private final ObjectMapper objectMapper;
@PostConstruct
public void startProjection() {
kafkaReceiver.receive()
.concatMap(record -> {
String eventType = new String(
record.headers().lastHeader("event-type").value());
return processEvent(eventType, record.value())
.doOnSuccess(v -> record.receiverOffset().acknowledge());
})
.subscribe();
}
private Mono<Void> processEvent(String eventType, String payload) {
return switch (eventType) {
case "OrderCreated" -> {
OrderCreatedEvent event = objectMapper.readValue(payload, OrderCreatedEvent.class);
yield readRepository.save(OrderReadModel.from(event)).then();
}
case "OrderConfirmed" -> {
OrderConfirmedEvent event = objectMapper.readValue(payload, OrderConfirmedEvent.class);
yield readRepository.findById(event.getOrderId())
.map(model -> model.withStatus("CONFIRMED"))
.flatMap(readRepository::save)
.then();
}
default -> Mono.empty();
};
}
}
7. 実践プロジェクト:リアルタイム株価ティッカーシステム
7.1 システムアーキテクチャ
┌──────────────────────────────────────────────────────────────────┐
│ リアルタイム株価ティッカーシステム │
│ │
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────┐ │
│ │ マーケット │ │ Kafka │ │ WebFlux │ │
│ │ データ │────▶│ Cluster │────▶│ API Gateway │ │
│ │ プロバイダ │ │ │ │ │ │
│ │ (WebSocket) │ │ raw-prices │ │ SSE /prices │ │
│ └─────────────┘ │ processed │ │ WS /ws/trade │ │
│ │ alerts │ │ gRPC /internal │ │
│ ┌─────────────┐ │ │ └───────┬─────────┘ │
│ │ ニュース │────▶│ │ │ │
│ │ フィード │ └──────┬───────┘ │ │
│ │ (REST API) │ │ ┌──────▼─────────┐ │
│ └─────────────┘ ┌──────▼───────┐ │ クライアント │ │
│ │ Kafka Streams│ │ Webダッシュボード│
│ │ 処理 │ │ モバイルアプリ │ │
│ │ │ │ トレーディングBot│ │
│ │ - VWAP計算 │ └────────────────┘ │
│ │ - 異常検知 │ │
│ │ - 集約 │ ┌────────────────┐ │
│ └──────┬───────┘ │ PostgreSQL │ │
│ │ │ (R2DBC) │ │
│ └─────────────▶│ + Redis │ │
│ └────────────────┘ │
└──────────────────────────────────────────────────────────────────┘
7.2 マーケットデータ取(と)り込(こ)みサービス
@Service
@RequiredArgsConstructor
@Slf4j
public class MarketDataIngestionService {
private final KafkaSender<String, String> kafkaSender;
private final ObjectMapper objectMapper;
private final WebSocketClient webSocketClient;
@PostConstruct
public void connect() {
URI uri = URI.create("wss://market-data-provider.example.com/stream");
webSocketClient.execute(uri, session -> {
// シンボルのサブスクライブ
Mono<Void> subscribe = session.send(
Mono.just(session.textMessage(
"{\"action\":\"subscribe\",\"symbols\":[\"AAPL\",\"GOOG\",\"MSFT\",\"AMZN\"]}"
))
);
// 受信マーケットデータの処理
Flux<Void> receive = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.flatMap(this::publishToKafka)
.doOnError(e -> log.error("Market data stream error", e));
return subscribe.thenMany(receive).then();
})
.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
.maxBackoff(Duration.ofMinutes(5)))
.subscribe();
}
private Mono<Void> publishToKafka(String rawData) {
try {
MarketTick tick = objectMapper.readValue(rawData, MarketTick.class);
ProducerRecord<String, String> record = new ProducerRecord<>(
"raw-prices", tick.getSymbol(), rawData);
record.headers()
.add("source", "market-data-provider".getBytes())
.add("timestamp", String.valueOf(tick.getTimestamp()).getBytes());
return kafkaSender.send(Mono.just(SenderRecord.create(record, tick.getSymbol())))
.next()
.then();
} catch (Exception e) {
log.error("Failed to parse market data: {}", rawData, e);
return Mono.empty();
}
}
}
7.3 Kafka Streamsによる価格処理
@Component
public class PriceProcessingTopology {
@Autowired
public void buildPipeline(StreamsBuilder builder) {
KStream<String, String> rawPrices = builder.stream("raw-prices");
// 1. パースとバリデーション
KStream<String, StockPrice> validPrices = rawPrices
.mapValues(this::parsePrice)
.filter((symbol, price) -> price != null && price.getPrice() > 0);
// 2. VWAP(出来高加重平均価格)の1分ウィンドウ計算
KTable<Windowed<String>, VWAPResult> vwap = validPrices
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
.aggregate(
VWAPAccumulator::new,
(symbol, price, acc) -> acc.add(price.getPrice(), price.getVolume()),
Materialized.<String, VWAPAccumulator, WindowStore<Bytes, byte[]>>
as("vwap-store")
.withValueSerde(vwapAccumulatorSerde())
)
.mapValues(VWAPAccumulator::result);
// 3. 異常検知(30秒以内に5%超の価格変動)
validPrices
.groupByKey()
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(30)))
.aggregate(
PriceRange::new,
(symbol, price, range) -> range.update(price.getPrice()),
Materialized.with(Serdes.String(), priceRangeSerde())
)
.toStream()
.filter((key, range) -> range.percentChange() > 5.0)
.mapValues((key, range) -> new PriceAlert(
key.key(), range.percentChange(), "ANOMALY"))
.to("alerts", Produced.with(windowedSerde(), priceAlertSerde()));
// 4. 処理済み価格の出力
validPrices
.mapValues(price -> new ProcessedPrice(
price.getSymbol(), price.getPrice(), price.getVolume(),
price.getTimestamp(), System.currentTimeMillis()))
.to("processed-prices", Produced.with(Serdes.String(), processedPriceSerde()));
}
}
7.4 Webクライアント向けSSEストリーミングエンドポイント
@RestController
@RequestMapping("/api/stocks")
@RequiredArgsConstructor
public class StockStreamController {
private final KafkaReceiver<String, String> processedPriceReceiver;
private final ObjectMapper objectMapper;
private final Sinks.Many<ProcessedPrice> priceSink;
@PostConstruct
public void init() {
// KafkaからリアクティブSinkへのブリッジ
processedPriceReceiver.receive()
.map(record -> {
record.receiverOffset().acknowledge();
try {
return objectMapper.readValue(record.value(), ProcessedPrice.class);
} catch (Exception e) {
return null;
}
})
.filter(Objects::nonNull)
.subscribe(price -> priceSink.tryEmitNext(price));
}
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<ProcessedPrice>> streamPrices(
@RequestParam(required = false) List<String> symbols) {
Flux<ProcessedPrice> priceFlux = priceSink.asFlux();
if (symbols != null && !symbols.isEmpty()) {
Set<String> symbolSet = new HashSet<>(symbols);
priceFlux = priceFlux.filter(p -> symbolSet.contains(p.getSymbol()));
}
return priceFlux
.map(price -> ServerSentEvent.<ProcessedPrice>builder()
.id(price.getSymbol() + "-" + price.getTimestamp())
.event("price")
.data(price)
.build())
.doOnCancel(() -> log.info("Client disconnected from price stream"));
}
@GetMapping(value = "/alerts", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<PriceAlert>> streamAlerts() {
return alertReceiver.receive()
.map(record -> {
record.receiverOffset().acknowledge();
PriceAlert alert = objectMapper.readValue(record.value(), PriceAlert.class);
return ServerSentEvent.<PriceAlert>builder()
.event("alert")
.data(alert)
.build();
});
}
}
7.5 アプリケーション設定
# application.yml
spring:
r2dbc:
url: r2dbc:postgresql://localhost:5432/stockdb
username: stockapp
password: secret
kafka:
bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
consumer:
group-id: stock-ticker-service
auto-offset-reset: latest
enable-auto-commit: false
producer:
acks: all
compression-type: zstd
batch-size: 32768
properties:
linger.ms: 10
webflux:
base-path: /api
server:
port: 8080
netty:
connection-timeout: 5000
management:
endpoints:
web:
exposure:
include: health,metrics,prometheus
metrics:
tags:
application: stock-ticker
7.6 ローカル開発用Docker Compose
version: '3.8'
services:
kafka-1:
image: apache/kafka:3.7.0
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
ports:
- '9092:9092'
kafka-2:
image: apache/kafka:3.7.0
environment:
KAFKA_NODE_ID: 2
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
kafka-3:
image: apache/kafka:3.7.0
environment:
KAFKA_NODE_ID: 3
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
postgres:
image: postgres:16
environment:
POSTGRES_DB: stockdb
POSTGRES_USER: stockapp
POSTGRES_PASSWORD: secret
ports:
- '5432:5432'
redis:
image: redis:7-alpine
ports:
- '6379:6379'
8. 意思決定(いしけってい)マトリクス:いつ何(なに)を使うか
8.1 技術選定(ぎじゅつせんてい)ガイド
| シナリオ | WebFlux | Kafka | SSE | WebSocket | gRPC |
|---|---|---|---|---|---|
| リアルタイムダッシュボード | Yes | Yes | 最適 | Good | 過剰 |
| チャットアプリケーション | Yes | 任意 | No | 最適 | Good |
| IoTデータパイプライン | Yes | 最適 | No | Good | 最適 |
| ECサイト注文処理 | 任意 | 最適 | No | No | Good |
| ライブスポーツスコア | Yes | Yes | 最適 | Good | No |
| 金融取引プラットフォーム | Yes | 最適 | No | 最適 | 最適 |
| マイクロサービスイベントバス | No | 最適 | No | No | Good |
| サーバー通知 | Yes | 任意 | 最適 | Good | No |
| ビデオ/オーディオストリーミング | No | No | No | 部分的 | Good |
| ログ集約パイプライン | No | 最適 | No | No | Good |
8.2 決定フローチャート
開始: リアルタイムデータが必要か?
│
├── No → 従来のREST API(Spring MVC)で十分
│
└── Yes → サーバーからクライアントへの一方向のみか?
│
├── Yes → SSE(最もシンプル、自動再接続)
│
└── No → 双方向通信が必要か?
│
├── Yes → ブラウザ向けか?
│ │
│ ├── Yes → WebSocket
│ │
│ └── No → gRPC双方向ストリーミング
│
└── 耐久性のあるイベントストレージが必要か?
│
├── Yes → Kafka + 選択したコンシューマ
│
└── No → 直接WebFluxストリーミング
8.3 ワークロード別パフォーマンス比較
| ワークロード | Spring MVC | WebFlux | WebFlux + Kafka | 備考(びこう) |
|---|---|---|---|---|
| 100同時ユーザー | 50ms avg | 48ms | 52ms | 差はわずか |
| 10K同時ユーザー | 2s avg | 120ms | 150ms | WebFluxが光る |
| 100K同時ユーザー | タイムアウト | 250ms | 300ms | MVCでは処理不可 |
| 100万イベント/秒取り込み | N/A | 限定的 | 120万/秒 | Kafka必須 |
| 複雑なイベント処理 | N/A | 可能 | 50万/秒 | Kafka Streamsの強み |
9. 6ヶ月(かげつ)学習ロードマップ
月(つき)1-2:リアクティブの基礎
| 週(しゅう) | トピック | 成果物(せいかぶつ) |
|---|---|---|
| 1 | Reactive Streams仕様、Publisher/Subscriber | カスタムPublisherの実装 |
| 2 | Project Reactor:Mono/Fluxの基礎 | 20のオペレータ演習 |
| 3 | エラーハンドリング、リトライ、タイムアウトパターン | 耐障害APIクライアント |
| 4 | リアクティブコードのテスト(StepVerifier) | 演習の90%テストカバレッジ |
| 5-6 | Spring WebFlux:コントローラ、関数型エンドポイント | CRUD REST API(WebFlux) |
| 7-8 | R2DBC、WebClient、リアクティブセキュリティ | フルスタックリアクティブアプリ |
月3-4:Kafkaマスタリー
| 週 | トピック | 成果物 |
|---|---|---|
| 9-10 | Kafka基礎:トピック、パーティション、コンシューマ | ローカル3ブローカークラスタ構築 |
| 11-12 | Spring Kafka:プロデューサ/コンシューマパターン | 注文処理システム |
| 13-14 | Kafka Streams:ステートレス/ステートフル操作 | リアルタイム分析パイプライン |
| 15-16 | Exactly-once、トランザクション、Schema Registry | トランザクショナルイベントシステム |
月5:統合とStreaming API
| 週 | トピック | 成果物 |
|---|---|---|
| 17 | WebFluxでのSSE | ライブ通知システム |
| 18 | WebFluxでのWebSocket | チャットアプリケーション |
| 19 | Spring BootでのgRPC Streaming | 内部サービス間通信 |
| 20 | Reactor Kafka + CQRSパターン | イベントソースドマイクロサービス |
月6:本番環境(ほんばんかんきょう)対応
| 週 | トピック | 成果物 |
|---|---|---|
| 21 | KRaft移行、クラスタ運用 | KRaftクラスタデプロイ |
| 22 | 監視:Micrometer、Prometheus、Grafana | オブザーバビリティダッシュボード |
| 23 | パフォーマンスチューニング、負荷テスト | ベンチマークレポート |
| 24 | キャップストーン:リアルタイム株価ティッカーシステム | 本番環境対応プロジェクト |
10. クイズ
Q1: I/Oバウンドワークロードにおいて、Spring WebFluxがSpring MVCより優(すぐ)れている主(おも)な利点は何ですか?
回答(かいとう): WebFluxは、わずか数スレッドで数千の同時接続を処理できるノンブロッキングイベントループモデル(Netty)を使用しています。Spring MVCは、各同時リクエストが1つのスレッドを占有するthread-per-requestモデルを使用します。I/Oバウンドワークロード(スレッドがデータベース、ネットワーク、ファイル操作の待機にほとんどの時間を費やす場合)では、WebFluxは待機にスレッドを浪費(ろうひ)しないため、圧倒的に効率的です。ベンチマークでは、I/Oバウンドワークロードにおいて、WebFluxは7-8倍のスループットを77%少ないメモリで処理できます。
Q2: Project ReactorにおけるflatMap、concatMap、switchMapの違(ちが)いを説明してください。
回答:
flatMap:各要素をPublisherにマップし、eagerly(即座に)マージします。内部のPublisherは並行して実行され、結果は順序通りでない場合があります。順序が重要でない場合に最大スループットを得るのに最適です。concatMap:各要素をPublisherにマップし、順次サブスクライブします。次のサブスクリプションの前に各内部Publisherの完了を待ちます。順序を保持しますが、スループットは低くなります。switchMap:各要素をPublisherにマップしますが、新しい要素が到着すると前の内部Publisherをキャンセルします。最新のサブスクリプションのみがアクティブです。最新のクエリ結果のみが重要な検索中入力(search-as-you-type)に理想的です。
Q3: KafkaがZooKeeperをKRaftに置き換えた理由は?具体的な技術的メリットを3つ挙(あ)げてください。
回答:
- パーティションスケーラビリティ:ZooKeeperは全メタデータをメモリに格納しており、クラスタを約20万パーティションに制限していました。KRaftはログベースのメタデータストアを使用し、数百万パーティションを可能にします。
- 高速フェイルオーバー:ZooKeeperモードでのコントローラフェイルオーバーにはZooKeeperからの全メタデータ読み取りが必要でした(数分)。KRaftのフェイルオーバーは、新しいコントローラがRaftを通じて既にメタデータをレプリケートしているため、数秒で完了します。
- 運用の簡素化:ZooKeeperでは、2つの別々の分散システムのデプロイ、設定、監視、保守が必要でした。KRaftはZooKeeperの依存を完全に排除し、運用の複雑さを軽減します。
追加の利点として、より高速な制御されたシャットダウン、KafkaとZooKeeper間のスプリットブレインリスクの排除、イベント駆動のメタデータ伝播(ZooKeeperの非同期で遅延する可能性のある伝播に対して)があります。
Q4: リアルタイム機能にWebSocketよりSSEを選ぶべきケースは?
回答: 以下の場合にSSEを選択します:
- 通信がサーバーからクライアントへの一方向のみ — 株価ティッカー、ニュースフィード、通知、ダッシュボード。同じ接続でクライアントからデータを送り返す必要がない場合。
- 自動再接続が必要 — SSEにはブラウザの自動再接続機能が内蔵されています。WebSocketでは手動の再接続ロジックが必要です。
- HTTPインフラストラクチャとの互換性 — SSEは標準的なHTTP/1.1で動作し、特別な設定なしにプロキシやロードバランサを通過します。WebSocketのアップグレードは企業のファイアウォールでブロックされる場合があります。
- シンプルさ — SSEの実装はシンプルです(text/event-streamを返すGETエンドポイントのみ)。WebSocketでは接続管理、ハートビート、再接続ロジックが必要です。
双方向通信(チャット、ゲーム、共同編集)やバイナリデータ転送が必要な場合はWebSocketを選択します。
Q5: WebFlux、Kafka、SSEを組み合わせたシステムを設計してください。イベント生成からクライアント表示までのデータフローを説明してください。
回答: リアルタイム注文追跡(ついせき)システムを考えましょう:
-
イベント生成:注文ステータスが変更されると、注文サービスがReactor Kafkaを使用してKafkaトピック(
order-events)にOrderStatusChangedイベントを公開します。プロデューサは耐久性のためacks=allを使用します。 -
イベント処理:Kafka Streamsアプリケーションが
order-eventsから消費し、顧客データ(customer-profilesトピックからのKTable結合)でイベントをエンリッチし、エンリッチされたイベントをorder-notificationsトピックに出力します。 -
WebFluxブリッジ:WebFluxサービスが
KafkaReceiver(Reactor Kafka)を使用してorder-notificationsからリアクティブに消費します。イベントは顧客IDでキー付けされたSinks.Many(マルチキャストホットパブリッシャー)にルーティングされます。 -
SSEエンドポイント:WebFluxコントローラが、認証済みユーザーの顧客IDで
Sinks.ManyのFluxをフィルタリングするSSEエンドポイント/api/orders/trackを公開します。一致する各イベントがServerSentEventとして送信されます。 -
クライアント:ブラウザがSSEエンドポイントを指す
EventSourceを作成します。各イベントでUIが注文追跡ビジュアライゼーションを更新します。接続が切断された場合、ブラウザは自動再接続します(SSE内蔵機能)。
データフロー:注文サービス → Kafka → Kafka Streams → Kafka → Reactor Kafka → Sinks.Many → SSE → ブラウザ
参考文献(さんこうぶんけん)
- Reactive Manifesto
- Reactive Streams仕様
- Project Reactorリファレンスガイド
- Spring WebFluxドキュメント
- Spring WebFlux vs MVCパフォーマンス
- R2DBC仕様
- Netty Project
- Apache Kafkaドキュメント
- Kafka: The Definitive Guide(第2版)
- KIP-500: ZooKeeperの置き換え
- Kafka KRaftドキュメント
- Kafka Streamsドキュメント
- Confluent Schema Registry
- Kafka Connectドキュメント
- Reactor Kafkaリファレンスガイド
- Server-Sent Events (MDN)
- WebSocket API (MDN)
- gRPCドキュメント
- CQRSパターン (Microsoft)
- Event Sourcingパターン (Microsoft)
- Debezium CDCドキュメント
- Spring Boot gRPC Starter
- Kafkaパフォーマンスチューニングガイド (Confluent)
- ZooKeeperドキュメント
- Raftコンセンサスアルゴリズム