Skip to content
Published on

WebFlux + Kafka + ZooKeeper完全攻略:リアクティブストリーミングアーキテクチャのすべて

Authors

はじめに:2026年(ねん)にリアクティブストリーミングアーキテクチャが重要(じゅうよう)な理由(りゆう)

現代(げんだい)のアプリケーションは、毎秒(まいびょう)数百万(すうひゃくまん)のイベントを処理(しょり)し、ユーザーにリアルタイム更新(こうしん)を配信(はいしん)し、予測(よそく)不能(ふのう)な負荷(ふか)の下(もと)で弾力的(だんりょくてき)にスケールすることが求(もと)められています。ブロッキングI/Oを使(つか)う従来(じゅうらい)のリクエスト・レスポンスアーキテクチャでは、これらの要求(ようきゅう)に応(こた)えることができません。2026年において、Spring WebFluxによるノンブロッキングリクエスト処理、Apache Kafkaによる耐久性(たいきゅうせい)のあるイベントストリーミング、そしてZooKeeperからKRaftへのクラスタ管理(かんり)の移行(いこう)の組(く)み合(あ)わせは、リアクティブストリーミングシステムのゴールドスタンダードです。

本(ほん)ガイドでは、リアクティブプログラミングの基礎(きそ)から本番(ほんばん)グレードのアーキテクチャパターンまで、フルスタックをカバーします。リアルタイム分析(ぶんせき)ダッシュボード、金融(きんゆう)取引(とりひき)プラットフォーム、IoTデータパイプラインのいずれを構築(こうちく)する場合(ばあい)でも、実用的(じつようてき)な知識(ちしき)が得(え)られます。

学(まな)べること

  • リアクティブプログラミングの原則とReactive Streams仕様
  • Spring WebFluxの内部構造:Nettyイベントループ、Mono/Flux、オペレータチェーン、R2DBC
  • Apache Kafkaアーキテクチャ:パーティション、コンシューマグループ、Exactly-Onceセマンティクス、Kafka Streams、Connect
  • ZooKeeperからKRaftへの移行:なぜ、いつ、どのように
  • Streaming API比較(ひかく):SSE vs WebSocket vs gRPC Streaming
  • 統合(とうごう)パターン:WebFlux + Reactor Kafka + CQRS + Event Sourcing
  • 完全(かんぜん)な実践(じっせん)プロジェクト:リアルタイム株価(かぶか)ティッカーシステム

1. リアクティブプログラミングの基礎

1.1 リアクティブマニフェスト

リアクティブマニフェストは、リアクティブシステムが備(そな)えるべき4つの主要(しゅよう)な特性(とくせい)を定義(ていぎ)しています:

特性説明(せつめい)実装例(じっそうれい)
即応性(そくおうせい)システムがタイムリーに応答するWebFluxノンブロッキングI/O
耐障害性(たいしょうがいせい)障害発生時もシステムが応答し続けるサーキットブレーカー、バルクヘッドパターン
弾力性(だんりょくせい)変動する負荷の下でもシステムが応答し続けるKafkaパーティションスケーリング
メッセージ駆動(くどう)非同期メッセージパッシングに依存するKafkaトピック、Reactorイベントストリーム

これら4つの特性は独立(どくりつ)していません。メッセージ駆動アーキテクチャが弾力性と耐障害性を実現(じつげん)し、それらが合(あ)わさって即応性を実現します。

1.2 Reactive Streams仕様(しよう)

Reactive Streamsは、ノンブロッキングバックプレッシャーを備えた非同期(ひどうき)ストリーム処理の標準(ひょうじゅん)です。4つのコアインターフェースを定義しています:

public interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}

public interface Subscriber<T> {
    void onSubscribe(Subscription subscription);
    void onNext(T item);
    void onError(Throwable throwable);
    void onComplete();
}

public interface Subscription {
    void request(long n);
    void cancel();
}

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

重要な革新(かくしん)はSubscriptionrequest(long n)メソッドです。これによりバックプレッシャーが可能(かのう)になります:サブスクライバがデータフローの速度(そくど)を制御(せいぎょ)し、プロデューサがコンシューマを圧倒(あっとう)するのを防(ふせ)ぎます。

1.3 バックプレッシャーの理解(りかい)

バックプレッシャーとは、データコンシューマがプロデューサに対(たい)して処理可能なデータ量(りょう)を通知(つうち)するメカニズムです。バックプレッシャーがなければ、高速(こうそく)なプロデューサが低速(ていそく)なコンシューマを圧倒し、メモリ枯渇(こかつ)やメッセージのドロップにつながります。

プロデューサ (1000 msg/s) --> バッファ (オーバーフロー!) --> コンシューマ (100 msg/s)

バックプレッシャーあり:
プロデューサ (100 msg/sに調整) <-- request(100) -- コンシューマ (100 msg/s)

Project Reactorのバックプレッシャー戦略(せんりゃく):

Flux.range(1, Integer.MAX_VALUE)
    .onBackpressureBuffer(256)          // 最大256アイテムをバッファ
    .onBackpressureDrop(dropped ->      // バッファフル時にドロップ
        log.warn("Dropped: {}", dropped))
    .onBackpressureLatest()             // 最新のアイテムのみ保持
    .onBackpressureError()              // 圧倒時にエラーシグナル
    .subscribe(item -> processSlowly(item));

1.4 リアクティブライブラリの全体像(ぜんたいぞう)

ライブラリ言語/プラットフォーム主な特徴採用状況(さいようじょうきょう)
Project ReactorJavaMono/Flux、Spring統合Springエコシステム
RxJava 3JavaObservable/Flowable、Android対応Android、レガシーJava
MutinyJavaUni/Multi、QuarkusネイティブQuarkusエコシステム
Kotlin FlowKotlinコルーチンベース、構造化並行性Kotlinファーストプロジェクト
Akka StreamsScala/Javaアクターベース、グラフDSL高スループットシステム

2. Spring WebFlux 徹底解説(てっていかいせつ)

2.1 WebFlux vs Spring MVC アーキテクチャ

Spring MVCはスレッドプールからリクエストごとに1つのスレッドを使用(しよう)するthread-per-requestモデルです。レスポンスが完全(かんぜん)に書(か)き込(こ)まれるまで、各(かく)受信リクエストが1つのスレッドを占有(せんゆう)します。I/O集約的(しゅうやくてき)な高並行性(こうへいこうせい)ワークロードでは、スレッドはほとんどの時間(じかん)をデータベースクエリ、HTTPコール、ファイル読(よ)み取(と)りの待機(たいき)に費(つい)やします。

Spring WebFluxはNettyを使用したイベントループモデルです。少数(しょうすう)のイベントループスレッドがすべてのリクエストを処理します。I/Oが必要(ひつよう)な場合、リクエストはパーク(スレッドをブロックせず)され、イベントループは他(ほか)のリクエストの処理に移(うつ)ります。

Spring MVC (Thread-per-request):
  Thread-1: [リクエスト] --> [DB待機......] --> [レスポンス]
  Thread-2: [リクエスト] --> [API待機.........] --> [レスポンス]
  Thread-3: [リクエスト] --> [ファイル待機....] --> [レスポンス]
  (200スレッド = 200同時リクエスト)

Spring WebFlux (イベントループ):
  EventLoop-1: [Req1][Req2][Req3][Req1-再開][Req4][Req2-再開]...
  EventLoop-2: [Req5][Req6][Req5-再開][Req7][Req8]...
  (4スレッド = 数千の同時リクエスト)

2.2 Nettyイベントループモデル

NettyはWebFluxのデフォルト組(く)み込(こ)みサーバーです。そのアーキテクチャを理解することはパフォーマンスチューニングに不可欠(ふかけつ)です。

                    ┌─────────────────────────┐
Boss EventLoopGroup                       (接続を受け付ける)                    └────────────┬────────────┘
                    ┌────────────▼────────────┐
Worker EventLoopGroup                       (I/Oイベントを処理)                    ├─────────────────────────┤
EventLoop-1  Channel-AEventLoop-2  Channel-BEventLoop-3  Channel-CEventLoop-4  Channel-D                    └─────────────────────────┘

Nettyイベントループを使(つか)う際(さい)の重要なルール:

  1. イベントループスレッドを絶対(ぜったい)にブロックしないThread.sleep()、ブロッキングI/O、synchronizedロック禁止(きんし)
  2. CPU集約的な作業(さぎょう)はバウンデッドエラスティックスケジューラにオフロードする
  3. チャネルアフィニティ — 各チャネルはそのライフタイム中、1つのイベントループにバインドされる

2.3 MonoとFlux:コア型(がた)

Monoは0または1要素(ようそ)を表現(ひょうげん)します。Fluxは0からN要素を表現します。

// Mono: 単一値またはempty
Mono<User> findUser = userRepository.findById(userId);
Mono<Void> saveResult = userRepository.save(user);

// Flux: 値のストリーム
Flux<Order> orders = orderRepository.findByUserId(userId);
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
Flux<ServerSentEvent<String>> sseStream = Flux.create(sink -> {
    // sinkにイベントをプッシュ
});

MonoとFluxの生成(せいせい):

// 静的ファクトリメソッド
Mono.just("hello");
Mono.empty();
Mono.error(new RuntimeException("failure"));
Mono.fromCallable(() -> expensiveComputation());
Mono.defer(() -> Mono.just(dynamicValue()));

Flux.just("a", "b", "c");
Flux.fromIterable(myList);
Flux.range(1, 100);
Flux.interval(Duration.ofMillis(500));
Flux.merge(flux1, flux2);
Flux.concat(flux1, flux2);
Flux.zip(flux1, flux2, (a, b) -> a + b);

2.4 必須(ひっす)オペレータ

// 変換
flux.map(String::toUpperCase)
    .flatMap(name -> userService.findByName(name))   // 非同期 1:N
    .concatMap(user -> enrichUser(user))              // 順次非同期
    .switchMap(query -> search(query))                // 前の処理をキャンセル
    .flatMapSequential(id -> fetchById(id))           // 順序保証付き並列

// フィルタリング
flux.filter(user -> user.isActive())
    .distinct()
    .take(10)
    .skip(5)
    .takeUntil(event -> event.isTerminal())

// エラーハンドリング
flux.onErrorReturn("default")
    .onErrorResume(ex -> fallbackFlux())
    .retry(3)
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
        .maxBackoff(Duration.ofSeconds(30))
        .filter(ex -> ex instanceof TransientException))
    .timeout(Duration.ofSeconds(5))

// 結合
Flux.merge(fastSource, slowSource)          // インターリーブ
Flux.concat(firstBatch, secondBatch)        // 順次
Flux.zip(names, ages, Person::new)          // ペアリング
Mono.zip(userMono, profileMono, ordersMono) // 並列ジョイン
    .map(tuple -> buildResponse(tuple.getT1(), tuple.getT2(), tuple.getT3()))

// 副作用(ログ/デバッグ専用)
flux.doOnNext(item -> log.debug("Processing: {}", item))
    .doOnError(ex -> log.error("Error: {}", ex.getMessage()))
    .doOnComplete(() -> log.info("Stream completed"))
    .doOnSubscribe(sub -> log.info("Subscribed"))

2.5 WebFluxコントローラと関数型(かんすうがた)エンドポイント

アノテーションベース(お馴染(なじ)みのMVCスタイル):

@RestController
@RequestMapping("/api/users")
public class UserController {

    private final UserService userService;

    @GetMapping("/{id}")
    public Mono<ResponseEntity<User>> getUser(@PathVariable String id) {
        return userService.findById(id)
            .map(ResponseEntity::ok)
            .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> streamUsers() {
        return userService.streamAllUsers();
    }

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<User> createUser(@Valid @RequestBody Mono<User> userMono) {
        return userMono.flatMap(userService::create);
    }
}

関数型エンドポイント(ルーター + ハンドラー):

@Configuration
public class UserRouter {

    @Bean
    public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
        return RouterFunctions.route()
            .path("/api/users", builder -> builder
                .GET("/{id}", handler::getUser)
                .GET("", handler::listUsers)
                .POST("", handler::createUser)
                .PUT("/{id}", handler::updateUser)
                .DELETE("/{id}", handler::deleteUser))
            .build();
    }
}

@Component
public class UserHandler {

    private final UserService userService;

    public Mono<ServerResponse> getUser(ServerRequest request) {
        String id = request.pathVariable("id");
        return userService.findById(id)
            .flatMap(user -> ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(user))
            .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> listUsers(ServerRequest request) {
        return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(userService.findAll(), User.class);
    }
}

2.6 R2DBC:リアクティブデータベースアクセス

R2DBC(Reactive Relational Database Connectivity)は、WebFluxアプリケーションにノンブロッキングデータベースアクセスを提供(ていきょう)します。

@Configuration
@EnableR2dbcRepositories
public class DatabaseConfig extends AbstractR2dbcConfiguration {

    @Override
    @Bean
    public ConnectionFactory connectionFactory() {
        return ConnectionFactories.get(ConnectionFactoryOptions.builder()
            .option(DRIVER, "postgresql")
            .option(HOST, "localhost")
            .option(PORT, 5432)
            .option(DATABASE, "mydb")
            .option(USER, "user")
            .option(PASSWORD, "password")
            .option(MAX_SIZE, 20)
            .build());
    }
}

public interface UserRepository extends ReactiveCrudRepository<User, String> {

    Flux<User> findByStatusOrderByCreatedAtDesc(String status);

    @Query("SELECT * FROM users WHERE email = :email")
    Mono<User> findByEmail(String email);

    @Query("SELECT * FROM users WHERE department = :dept")
    Flux<User> findByDepartment(String dept);
}

DatabaseClientを使った複雑(ふくざつ)なクエリ:

@Repository
public class CustomUserRepository {

    private final DatabaseClient databaseClient;

    public Flux<User> searchUsers(String keyword, int limit) {
        return databaseClient.sql(
                "SELECT * FROM users WHERE name ILIKE :keyword LIMIT :limit")
            .bind("keyword", "%" + keyword + "%")
            .bind("limit", limit)
            .map(row -> new User(
                row.get("id", String.class),
                row.get("name", String.class),
                row.get("email", String.class)
            ))
            .all();
    }
}

2.7 WebFluxパフォーマンスベンチマーク

ベンチマーク条件(じょうけん):4コアCPU、8GB RAM、10,000同時接続(どうじせつぞく)、I/Oバウンドワークロード(リクエストあたり50msのデータベースレイテンシ)。

メトリクスSpring MVC (Tomcat)Spring WebFlux (Netty)改善率(かいぜんりつ)
スループット (req/s)2,40018,5007.7倍
平均レイテンシ (ms)4,20054087%低減
P99レイテンシ (ms)12,0001,20090%低減
メモリ使用量 (MB)2,10048077%削減
アクティブスレッド数200896%削減
最大同時リクエスト20050,000+250倍

WebFluxを使うべきでないケース:

  • CPU集約的ワークロード(圧縮(あっしゅく)、暗号化(あんごうか)、重い計算(けいさん))
  • MVCで既(すで)に十分(じゅうぶん)なパフォーマンスを発揮(はっき)しているアプリケーション
  • リアクティブプログラミング経験(けいけん)のないチーム
  • ブロッキングライブラリが避(さ)けられない場合(JDBCでR2DBCラッパーなし)
  • 低並行性のシンプルなCRUDアプリケーション

3. Apache Kafka 完全ガイド

3.1 Kafkaアーキテクチャ概要(がいよう)

Apache Kafkaは、1日(いちにち)に数兆(すうちょう)のイベントを処理できる分散(ぶんさん)イベントストリーミングプラットフォームです。そのアーキテクチャはいくつかの主要な概念に基(もと)づいています:

┌─────────────────────────────────────────────────────────────┐
Kafkaクラスタ│                                                             │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                  │
│  │ Broker 1 │  │ Broker 2 │  │ Broker 3 │                  │
│  │          │  │          │  │          │                  │
│  │ Topic-A  │  │ Topic-A  │  │ Topic-A  │                  │
│  │ Part-0   │  │ Part-1   │  │ Part-2   │                  │
 (leader) (leader) (leader) │                  │
│  │          │  │          │  │          │                  │
│  │ Topic-A  │  │ Topic-A  │  │ Topic-A  │                  │
│  │ Part-1   │  │ Part-2   │  │ Part-0   │                  │
 (replica) (replica) (replica)│                  │
│  └──────────┘  └──────────┘  └──────────┘                  │
│                                                             │
│  ┌──────────────────────────────────────────────────────┐   │
│  │  メタデータ: ZooKeeper (レガシー) or KRaft (3.5+)    │   │
│  └──────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘

コア概念:

概念説明
トピックメッセージの名前付きフィード。データベーステーブルに類似
パーティショントピック内(ない)の順序付き不変レコードシーケンス
オフセットパーティション内の各レコードのシーケンシャルID
ブローカーデータを保存(ほぞん)しクライアントにサービスを提供するKafkaサーバー
プロデューサトピックにレコードを公開するクライアント
コンシューマトピックからレコードを読み取るクライアント
コンシューマグループトピックから協調(きょうちょう)して消費するコンシューマの集合
レプリケーション各パーティションは耐久性のため複数ブローカーにレプリケートされる
ISRIn-Sync Replicas:リーダーに追(お)いついたレプリカ

3.2 プロデューサ設定(せってい)とベストプラクティス

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // 信頼性設定
        config.put(ProducerConfig.ACKS_CONFIG, "all");           // 全ISRレプリカを待機
        config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // パフォーマンスチューニング
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);     // 32KBバッチ
        config.put(ProducerConfig.LINGER_MS_CONFIG, 20);         // バッチング待機20ms
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
        config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MBバッファ

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

さまざまなパターンでのメッセージ送信(そうしん):

@Service
@RequiredArgsConstructor
public class OrderEventProducer {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    // Fire-and-forget(最高スループット)
    public void sendFireAndForget(OrderEvent event) {
        kafkaTemplate.send("orders", event.getOrderId(), event);
    }

    // 同期送信(最高信頼性)
    public void sendSync(OrderEvent event) throws Exception {
        kafkaTemplate.send("orders", event.getOrderId(), event)
            .get(10, TimeUnit.SECONDS);
    }

    // コールバック付き非同期送信(バランス型)
    public CompletableFuture<SendResult<String, Object>> sendAsync(OrderEvent event) {
        return kafkaTemplate.send("orders", event.getOrderId(), event)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Failed to send order event: {}", event.getOrderId(), ex);
                } else {
                    log.info("Order event sent to partition {} offset {}",
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
                }
            });
    }

    // ヘッダー付き送信
    public void sendWithHeaders(OrderEvent event) {
        ProducerRecord<String, Object> record = new ProducerRecord<>("orders",
            null, null, event.getOrderId(), event);
        record.headers()
            .add("event-type", "ORDER_CREATED".getBytes())
            .add("source", "order-service".getBytes())
            .add("correlation-id", UUID.randomUUID().toString().getBytes());
        kafkaTemplate.send(record);
    }
}

3.3 コンシューマ設定とパターン

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        // オフセット管理
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手動コミット

        // パフォーマンスチューニング
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
        config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1MB

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setCommonErrorHandler(new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate()),
            new FixedBackOff(1000L, 3)
        ));
        return factory;
    }
}

コンシューマリスナーパターン:

@Component
@RequiredArgsConstructor
public class OrderEventConsumer {

    private final OrderService orderService;

    // 単一レコード処理
    @KafkaListener(topics = "orders", groupId = "order-processing-group")
    public void processOrder(ConsumerRecord<String, OrderEvent> record,
                             Acknowledgment ack) {
        try {
            log.info("Processing order: {} from partition: {} offset: {}",
                record.value().getOrderId(),
                record.partition(),
                record.offset());
            orderService.process(record.value());
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Failed to process order: {}", record.value().getOrderId(), e);
            // アクノリッジしない — エラーハンドラーによりリトライされる
        }
    }

    // バッチ処理
    @KafkaListener(topics = "analytics-events", groupId = "analytics-batch-group",
                   containerFactory = "batchKafkaListenerContainerFactory")
    public void processBatch(List<ConsumerRecord<String, AnalyticsEvent>> records,
                             Acknowledgment ack) {
        log.info("Processing batch of {} records", records.size());
        try {
            List<AnalyticsEvent> events = records.stream()
                .map(ConsumerRecord::value)
                .collect(Collectors.toList());
            analyticsService.bulkInsert(events);
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Batch processing failed", e);
        }
    }
}

3.4 Exactly-Onceセマンティクス (EOS)

Kafkaは、冪等(べきとう)プロデューサとトランザクションAPIの組み合わせにより、exactly-onceセマンティクスをサポートします。

@Configuration
public class KafkaTransactionalConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-");
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTransactionManager<String, Object> kafkaTransactionManager() {
        return new KafkaTransactionManager<>(producerFactory());
    }
}

@Service
@RequiredArgsConstructor
public class TransactionalOrderService {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Transactional("kafkaTransactionManager")
    public void processOrderTransactionally(OrderEvent order) {
        // このメソッド内の全メッセージはアトミックに送信される
        kafkaTemplate.send("orders-processed", order.getOrderId(), order);
        kafkaTemplate.send("inventory-updates", order.getProductId(),
            new InventoryEvent(order.getProductId(), -order.getQuantity()));
        kafkaTemplate.send("payment-requests", order.getOrderId(),
            new PaymentEvent(order.getOrderId(), order.getTotalAmount()));
        // いずれかの送信が失敗すると、全てロールバックされる
    }
}

3つの配信(はいしん)保証(ほしょう)レベル:

保証設定ユースケース
At-most-onceacks=0、自動コミットメトリクス、ロギング
At-least-onceacks=all、手動コミット、冪等コンシューマほとんどのアプリケーション
Exactly-onceトランザクションAPI + read-committed分離金融、課金、在庫管理

3.5 Kafka Streams

Kafka Streamsは、入出力データがKafkaに格納(かくのう)されるアプリケーションとマイクロサービスを構築するためのクライアントライブラリです。

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> config = new HashMap<>();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        config.put(StreamsConfig.STATE_DIR_CONFIG, "/var/kafka-streams");
        return new KafkaStreamsConfiguration(config);
    }
}

@Component
public class OrderAnalyticsTopology {

    @Autowired
    public void buildPipeline(StreamsBuilder streamsBuilder) {
        // リアルタイム注文分析パイプライン
        KStream<String, OrderEvent> orders = streamsBuilder
            .stream("orders", Consumed.with(Serdes.String(), orderEventSerde()));

        // 1. タンブリングウィンドウでの商品別注文数
        KTable<Windowed<String>, Long> productCounts = orders
            .groupBy((key, order) -> order.getProductId(),
                Grouped.with(Serdes.String(), orderEventSerde()))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .count(Materialized.as("product-counts-store"));

        // 2. カテゴリ別収益集約
        KTable<String, Double> revenueByCategory = orders
            .groupBy((key, order) -> order.getCategory(),
                Grouped.with(Serdes.String(), orderEventSerde()))
            .aggregate(
                () -> 0.0,
                (key, order, total) -> total + order.getTotalAmount(),
                Materialized.with(Serdes.String(), Serdes.Double())
            );

        // 3. 顧客データとの結合によるエンリッチメント
        KTable<String, CustomerProfile> customers = streamsBuilder
            .table("customer-profiles",
                Consumed.with(Serdes.String(), customerProfileSerde()));

        KStream<String, EnrichedOrder> enrichedOrders = orders
            .selectKey((key, order) -> order.getCustomerId())
            .join(customers,
                (order, customer) -> new EnrichedOrder(order, customer),
                Joined.with(Serdes.String(), orderEventSerde(), customerProfileSerde()));

        enrichedOrders.to("enriched-orders",
            Produced.with(Serdes.String(), enrichedOrderSerde()));

        // 4. セッションウィンドウによる不正検知
        orders
            .filter((key, order) -> order.getTotalAmount() > 1000)
            .groupByKey()
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(10)))
            .count()
            .toStream()
            .filter((windowedKey, count) -> count >= 3)
            .mapValues((key, count) -> "ALERT: セッション内に" + count + "件の高額注文")
            .to("fraud-alerts");
    }
}

3.6 Kafka Connect

Kafka Connectは、Kafkaをデータベース、キーバリューストア、検索(けんさく)インデックス、ファイルシステムなどの外部(がいぶ)システムと接続するフレームワークです。

{
  "name": "postgresql-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "password",
    "database.dbname": "orders_db",
    "database.server.name": "order-server",
    "table.include.list": "public.orders,public.order_items",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "dbz_publication",
    "topic.prefix": "cdc",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "cdc-$3"
  }
}

Elasticsearchシンクコネクタ:

{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "enriched-orders",
    "type.name": "_doc",
    "key.ignore": false,
    "schema.ignore": true,
    "behavior.on.null.values": "delete",
    "write.method": "upsert",
    "batch.size": 200,
    "max.buffered.records": 5000,
    "flush.timeout.ms": 10000
  }
}

3.7 Schema Registry

Schema Registryは、メッセージスキーマの集中(しゅうちゅう)リポジトリを提供し、スキーマの進化(しんか)と互換性(ごかんせい)チェックを可能にします。

// Avroスキーマ付きプロデューサ
@Configuration
public class AvroProducerConfig {

    @Bean
    public ProducerFactory<String, GenericRecord> avroProducerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        config.put("schema.registry.url", "http://schema-registry:8081");
        config.put("auto.register.schemas", true);
        return new DefaultKafkaProducerFactory<>(config);
    }
}

スキーマ進化の互換性モード:

モード許可(きょか)される変更最適な用途
BACKWARDフィールド削除、オプショナルフィールド追加コンシューマ先行アップグレード
FORWARDフィールド追加、オプショナルフィールド削除プロデューサ先行アップグレード
FULLオプショナルフィールドの追加/削除のみ独立アップグレード
NONEあらゆる変更が許可開発環境のみ

3.8 Kafkaパフォーマンスチューニング早見表(はやみひょう)

パラメータデフォルト推奨値(すいしょうち)影響(えいきょう)
num.partitions1トピックあたり6-12並列性
replication.factor13耐久性
min.insync.replicas12書き込み耐久性
batch.size (プロデューサ)1638432768-65536スループットvsレイテンシ
linger.ms (プロデューサ)05-100バッチング効率
compression.typenonezstd or lz4ネットワーク/ディスク節約
fetch.min.bytes (コンシューマ)11024-16384スループットvsレイテンシ
max.poll.records (コンシューマ)500100-1000処理バッチサイズ
segment.bytes1073741824536870912ログコンパクション頻度
retention.ms604800000ユースケースに依存ストレージvsリプレイ能力

4. ZooKeeperからKRaftへ:大移行(だいいこう)

4.1 KafkaにおけるZooKeeperの役割(やくわり)

ZooKeeperは当初(とうしょ)からKafkaのメタデータ管理システムでした。以下を担当(たんとう)します:

  • ブローカー登録(とうろく):どのブローカーが生存しているかの追跡
  • トピックメタデータ:パーティション割り当て、ISRリスト、設定
  • コントローラ選出(せんしゅつ):どのブローカーがクラスタコントローラとして機能するかの選択
  • コンシューマグループ調整(レガシー):コンシューマオフセットの保存(0.9+でKafka自体に移行済み)
  • ACL:認可のためのアクセス制御リスト

4.2 ZooKeeperを廃止(はいし)すべき理由

問題(もんだい)説明
運用の複雑さ2つの異なる分散システムのデプロイ、監視、保守が必要
スケーリングのボトルネックZooKeeperは全メタデータをメモリに格納;約20万パーティションが上限
スプリットブレインリスクネットワーク分断でZooKeeperとKafkaの不一致が発生しうる
復旧時間コントローラフェイルオーバーにはZooKeeperからの全メタデータ読み取りが必要(数分)
専門知識の要件チームはKafkaとZooKeeperの両方に深い知識が必要

4.3 KRaft:Kafka内蔵(ないぞう)のコンセンサス

KRaft(Kafka Raft)は、ZooKeeperを内蔵のRaftベースコンセンサスプロトコルで置(お)き換(か)えます。メタデータは内部Kafkaトピック(__cluster_metadata)に格納され、コントローラノードのセットがクラスタ状態(じょうたい)を管理します。

ZooKeeperモード:                       KRaftモード:
┌────────┐    ┌────────────┐           ┌──────────────────────┐
Broker │───▶│ ZooKeeper  │           │ Controller (Raft)Broker │───▶│ Ensemble   │           │  Node 1 (leader)Broker │───▶│ (3-5ノード)│           │  Node 2 (follower)└────────┘    └────────────┘           │  Node 3 (follower)                                       └──────────┬───────────┘
                                       ┌──────────▼───────────┐
BrokersBroker 1Broker 2Broker 3                                       └──────────────────────┘

KRaftの利点(りてん):

機能ZooKeeperモードKRaftモード
最大パーティション約200,000数百万
コントローラフェイルオーバー数分数秒
メタデータ伝播非同期(遅延あり)イベント駆動(即座)
デプロイ2つのシステム1つのシステム
運用オーバーヘッド高い低い
シャットダウン時間数分数秒

4.4 KIP-500:ロードマップ

KIP-500(「ZooKeeperを自己管理メタデータクォーラムで置き換え」)は、複数年(ふくすうねん)にわたる移行計画(けいかく)を定義しました:

Kafkaバージョンマイルストーン
2.8KRaftアーリーアクセス(開発専用)2021
3.3KRaftが本番環境対応に2022
3.5ZooKeeper移行ツールが利用可能に2023
3.7ブリッジリリース(デュアルサポート)2024
4.0ZooKeeperサポートを完全に削除2025

4.5 移行ガイド:ZooKeeperからKRaftへ

ステップ1:移行前チェックリスト

# Kafkaバージョンの確認(3.5+が必要)
kafka-broker-api-versions.sh --bootstrap-server localhost:9092 | head -1

# 現在のZooKeeperメタデータの確認
kafka-metadata.sh --snapshot /var/kafka-logs/__cluster_metadata-0/00000000000000000000.log \
  --cluster-id $(kafka-storage.sh random-uuid)

# 非推奨機能が使用されていないことを確認
kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type brokers

ステップ2:KRaftコントローラの設定

# kraft/server.properties(コントローラノード用)
process.roles=controller
node.id=1
controller.quorum.voters=1@controller1:9093,2@controller2:9093,3@controller3:9093
controller.listener.names=CONTROLLER
listeners=CONTROLLER://:9093
log.dirs=/var/kraft-controller-logs

ステップ3:移行の実行(じっこう)

# コントローラストレージのフォーマット
kafka-storage.sh format \
  --config kraft/server.properties \
  --cluster-id $(kafka-storage.sh random-uuid) \
  --release-version 3.7

# 移行開始(ZooKeeperモード -> デュアルモード)
kafka-metadata.sh --migrate \
  --zookeeper-connect zk1:2181,zk2:2181,zk3:2181 \
  --bootstrap-server kafka1:9092

# 移行状態の確認
kafka-metadata.sh --status \
  --bootstrap-server kafka1:9092

# ファイナライズ(復帰不可ポイント)
kafka-metadata.sh --finalize \
  --bootstrap-server kafka1:9092

ステップ4:移行後の検証(けんしょう)

# 全トピックがアクセス可能か確認
kafka-topics.sh --bootstrap-server localhost:9092 --list

# コンシューマグループの確認
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# コントローラ状態の確認
kafka-metadata.sh --controllers \
  --bootstrap-server localhost:9092

4.6 ZAB vs Raft:プロトコル比較

側面ZAB (ZooKeeper)Raft (KRaft)
リーダー選出エポックベースタームベース
ログレプリケーション2フェーズコミットアペンドエントリ
一貫性逐次一貫性線形化可能読み取り(オプション)
メンバーシップ静的設定動的再設定
スナップショットファジースナップショット一貫性スナップショット
複雑性高い中程度(設計上)
実装別システムKafkaに統合

5. Streaming API比較:SSE vs WebSocket vs gRPC Streaming

5.1 Server-Sent Events (SSE)

SSEは、サーバーからクライアントへのプッシュのためのシンプルなHTTPベースのプロトコルです。サーバーはtext/event-streamコンテンツタイプを使用して、長寿命(ちょうじゅみょう)HTTP接続でイベントを送信します。

WebFlux SSEエンドポイント:

@RestController
@RequestMapping("/api/stream")
public class SSEController {

    private final StockPriceService stockPriceService;

    @GetMapping(value = "/prices", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<StockPrice>> streamPrices(
            @RequestParam List<String> symbols) {
        return stockPriceService.getPriceStream(symbols)
            .map(price -> ServerSentEvent.<StockPrice>builder()
                .id(String.valueOf(price.getTimestamp()))
                .event("price-update")
                .data(price)
                .retry(Duration.ofSeconds(5))
                .build());
    }

    @GetMapping(value = "/heartbeat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> heartbeat() {
        return Flux.interval(Duration.ofSeconds(30))
            .map(seq -> ServerSentEvent.<String>builder()
                .event("heartbeat")
                .data("ping")
                .build());
    }
}

JavaScriptクライアント:

const eventSource = new EventSource('/api/stream/prices?symbols=AAPL,GOOG,MSFT')

eventSource.addEventListener('price-update', (event) => {
  const price = JSON.parse(event.data)
  updateUI(price)
})

eventSource.addEventListener('heartbeat', () => {
  console.log('Connection alive')
})

eventSource.onerror = (error) => {
  console.error('SSE error:', error)
  // ブラウザが自動的に再接続
}

5.2 WebSocket

WebSocketは、単一(たんいつ)のTCP接続上で全二重通信を提供します。クライアントとサーバーの両方がいつでもメッセージを送信できます。

WebFlux WebSocketハンドラー:

@Configuration
public class WebSocketConfig {

    @Bean
    public HandlerMapping webSocketMapping(StockWebSocketHandler handler) {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/ws/stocks", handler);
        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);
        mapping.setOrder(-1);
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

@Component
@RequiredArgsConstructor
public class StockWebSocketHandler implements WebSocketHandler {

    private final StockPriceService stockPriceService;
    private final ObjectMapper objectMapper;

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // 受信メッセージ(サブスクリプション)
        Flux<String> incoming = session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .doOnNext(msg -> log.info("Received subscription: {}", msg));

        // 送信価格更新
        Flux<WebSocketMessage> outgoing = incoming
            .flatMap(msg -> {
                SubscriptionRequest req = parseRequest(msg);
                return stockPriceService.getPriceStream(req.getSymbols());
            })
            .map(price -> {
                try {
                    String json = objectMapper.writeValueAsString(price);
                    return session.textMessage(json);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });

        return session.send(outgoing);
    }
}

JavaScriptクライアント:

const ws = new WebSocket('ws://localhost:8080/ws/stocks')

ws.onopen = () => {
  ws.send(
    JSON.stringify({
      action: 'subscribe',
      symbols: ['AAPL', 'GOOG', 'MSFT'],
    })
  )
}

ws.onmessage = (event) => {
  const price = JSON.parse(event.data)
  updateUI(price)
}

ws.onclose = (event) => {
  console.log('WebSocket closed:', event.code, event.reason)
  // 手動で再接続が必要
  setTimeout(reconnect, 3000)
}

5.3 gRPC Streaming

gRPCは4つの通信パターンをサポートします:ユナリ、サーバーストリーミング、クライアントストリーミング、双方向(そうほうこう)ストリーミング。

Proto定義:

syntax = "proto3";

package stocks;

service StockService {
    // ユナリ
    rpc GetPrice (PriceRequest) returns (StockPrice);

    // サーバーストリーミング
    rpc StreamPrices (PriceStreamRequest) returns (stream StockPrice);

    // クライアントストリーミング
    rpc RecordTrades (stream TradeRecord) returns (TradeSummary);

    // 双方向ストリーミング
    rpc LiveTrading (stream TradeOrder) returns (stream TradeConfirmation);
}

message PriceStreamRequest {
    repeated string symbols = 1;
    int32 interval_ms = 2;
}

message StockPrice {
    string symbol = 1;
    double price = 2;
    double change = 3;
    int64 timestamp = 4;
    int64 volume = 5;
}

サーバー側実装(Spring Boot + grpc-spring-boot-starter):

@GrpcService
public class StockGrpcService extends StockServiceGrpc.StockServiceImplBase {

    private final StockPriceService stockPriceService;

    @Override
    public void streamPrices(PriceStreamRequest request,
                             StreamObserver<StockPrice> responseObserver) {
        stockPriceService.getPriceStream(request.getSymbolsList())
            .subscribe(
                price -> responseObserver.onNext(toProto(price)),
                error -> responseObserver.onError(
                    Status.INTERNAL.withDescription(error.getMessage()).asException()),
                responseObserver::onCompleted
            );
    }

    @Override
    public StreamObserver<TradeOrder> liveTrading(
            StreamObserver<TradeConfirmation> responseObserver) {
        return new StreamObserver<TradeOrder>() {
            @Override
            public void onNext(TradeOrder order) {
                TradeConfirmation confirmation = executeTrade(order);
                responseObserver.onNext(confirmation);
            }

            @Override
            public void onError(Throwable t) {
                log.error("Client error in live trading", t);
            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }
}

5.4 比較表

機能SSEWebSocketgRPC Streaming
プロトコルHTTP/1.1WS (TCP)HTTP/2
方向サーバーからクライアント双方向4パターン全て
データフォーマットテキスト (UTF-8)テキストまたはバイナリProtobuf(バイナリ)
自動再接続ブラウザ内蔵手動手動
バックプレッシャー限定的なし(ネイティブ)内蔵(フロー制御)
ブラウザサポート全モダンブラウザ全モダンブラウザgrpc-web(限定的)
ファイアウォール対応良好(標準HTTP)ブロックされる場合ありHTTP/2が必要
多重化なしなしあり(HTTP/2)
最大接続数ドメインあたり6 (HTTP/1)制限なし多重化
圧縮標準HTTPメッセージ単位(任意)内蔵
レイテンシ低い最低低い
スループット中程度高い最高
シリアライゼーションJSON(テキスト)JSONまたは独自バイナリProtobuf(高効率)
ユースケース通知、フィードチャット、ゲーム、コラボマイクロサービス、IoT

5.5 選択(せんたく)の指針(ししん)

  • SSE:ダッシュボード更新、ニュースフィード、株価ティッカー(読み取り専用)、通知ストリーム。実装がシンプル、プロキシを通過、自動再接続。
  • WebSocket:チャットアプリケーション、共同編集、マルチプレイヤーゲーム。低レイテンシの双方向通信が必要。
  • gRPC Streaming:マイクロサービス間通信、IoTデータ取り込み、高スループット内部API。スキーマ強制、効率的なシリアライゼーション、HTTP/2機能が必要。

6. WebFlux + Kafka統合

6.1 Reactor Kafka

Reactor KafkaはKafka用のリアクティブAPIを提供し、Spring WebFluxとシームレスに統合されます。

@Configuration
public class ReactorKafkaConfig {

    @Bean
    public ReceiverOptions<String, String> receiverOptions() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        return ReceiverOptions.<String, String>create(props)
            .subscription(Collections.singleton("events"))
            .addAssignListener(partitions ->
                log.info("Assigned: {}", partitions))
            .addRevokeListener(partitions ->
                log.info("Revoked: {}", partitions))
            .commitInterval(Duration.ofSeconds(5))
            .commitBatchSize(100);
    }

    @Bean
    public SenderOptions<String, String> senderOptions() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        return SenderOptions.create(props);
    }

    @Bean
    public KafkaReceiver<String, String> kafkaReceiver() {
        return KafkaReceiver.create(receiverOptions());
    }

    @Bean
    public KafkaSender<String, String> kafkaSender() {
        return KafkaSender.create(senderOptions());
    }
}

バックプレッシャー付きリアクティブコンシューマ:

@Service
@RequiredArgsConstructor
public class ReactiveEventProcessor {

    private final KafkaReceiver<String, String> kafkaReceiver;
    private final EventService eventService;

    @PostConstruct
    public void startConsuming() {
        kafkaReceiver.receive()
            .groupBy(record -> record.receiverOffset().topicPartition())
            .flatMap(partitionFlux -> partitionFlux
                .publishOn(Schedulers.boundedElastic())
                .concatMap(record -> processRecord(record)
                    .doOnSuccess(v -> record.receiverOffset().acknowledge())
                    .onErrorResume(e -> {
                        log.error("Error processing record: {}", record.key(), e);
                        record.receiverOffset().acknowledge();
                        return Mono.empty();
                    })
                ))
            .subscribe();
    }

    private Mono<Void> processRecord(ReceiverRecord<String, String> record) {
        return eventService.process(record.value())
            .doOnNext(result ->
                log.debug("Processed: partition={} offset={} key={}",
                    record.partition(), record.offset(), record.key()));
    }
}

リアクティブプロデューサ:

@Service
@RequiredArgsConstructor
public class ReactiveEventProducer {

    private final KafkaSender<String, String> kafkaSender;
    private final ObjectMapper objectMapper;

    public Mono<Void> sendEvent(String topic, String key, Object event) {
        return Mono.fromCallable(() -> objectMapper.writeValueAsString(event))
            .flatMap(json -> {
                SenderRecord<String, String, String> record = SenderRecord.create(
                    new ProducerRecord<>(topic, key, json), key);
                return kafkaSender.send(Mono.just(record))
                    .next()
                    .doOnNext(result -> log.info("Sent to {}-{} offset {}",
                        result.recordMetadata().topic(),
                        result.recordMetadata().partition(),
                        result.recordMetadata().offset()))
                    .then();
            });
    }

    public Flux<SenderResult<String>> sendBatch(String topic, List<Event> events) {
        Flux<SenderRecord<String, String, String>> records = Flux.fromIterable(events)
            .map(event -> {
                String json = objectMapper.writeValueAsString(event);
                return SenderRecord.create(
                    new ProducerRecord<>(topic, event.getId(), json), event.getId());
            });

        return kafkaSender.send(records)
            .doOnError(e -> log.error("Batch send failed", e));
    }
}

6.2 CQRS + Event Sourcing(WebFluxとKafka)

CQRS(コマンドクエリ責務分離)は読み取りモデルと書(か)き込みモデルを分離(ぶんり)します。Event Sourcingと組み合わせることで、すべての状態変更が不変のイベントとして保存されます。

┌─────────────────────────────────────────────────────────┐
CQRS + Event Sourcing│                                                         │
│  コマンド側                      クエリ側               │
│  ┌──────────┐                    ┌──────────┐           │
│  │ WebFlux  │                    │ WebFlux  │           │
│  │ Command  │                    │ Query    │           │
│  │ Handler  │                    │ Handler  │           │
│  └────┬─────┘                    └────▲─────┘           │
│       │                               │                 │
│  ┌────▼─────┐                    ┌────┴─────┐           │
│  │ Domain   │                    │ Read     │           │
│  │ Aggregate│Model    │           │
│  └────┬─────┘                     (R2DBC)  │           │
│       │                          └────▲─────┘           │
│  ┌────▼─────────────────────────────┐ │                 │
│  │         Kafka (Event Store)      ├─┘                 │
│  │  orders.events topic             │                   │
│  └──────────────────────────────────┘                   │
└─────────────────────────────────────────────────────────┘

コマンドハンドラー:

@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
public class OrderCommandController {

    private final OrderCommandService commandService;

    @PostMapping
    @ResponseStatus(HttpStatus.ACCEPTED)
    public Mono<OrderId> createOrder(@RequestBody Mono<CreateOrderCommand> command) {
        return command.flatMap(commandService::handle);
    }

    @PostMapping("/{orderId}/confirm")
    public Mono<ResponseEntity<Void>> confirmOrder(@PathVariable String orderId) {
        return commandService.handle(new ConfirmOrderCommand(orderId))
            .then(Mono.just(ResponseEntity.accepted().build()));
    }
}

@Service
@RequiredArgsConstructor
public class OrderCommandService {

    private final KafkaSender<String, String> kafkaSender;
    private final ObjectMapper objectMapper;

    public Mono<OrderId> handle(CreateOrderCommand command) {
        // コマンドのバリデーション
        OrderAggregate aggregate = OrderAggregate.create(command);
        List<DomainEvent> events = aggregate.getUncommittedEvents();

        // Kafkaにイベントを公開
        return Flux.fromIterable(events)
            .flatMap(event -> publishEvent("order-events", aggregate.getId(), event))
            .then(Mono.just(new OrderId(aggregate.getId())));
    }

    private Mono<Void> publishEvent(String topic, String key, DomainEvent event) {
        return Mono.fromCallable(() -> objectMapper.writeValueAsString(event))
            .flatMap(json -> {
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, json);
                record.headers().add("event-type", event.getType().getBytes());
                return kafkaSender.send(Mono.just(SenderRecord.create(record, key)))
                    .next()
                    .then();
            });
    }
}

イベントプロジェクション(読み取り側の更新):

@Service
@RequiredArgsConstructor
public class OrderProjectionService {

    private final KafkaReceiver<String, String> kafkaReceiver;
    private final OrderReadRepository readRepository;
    private final ObjectMapper objectMapper;

    @PostConstruct
    public void startProjection() {
        kafkaReceiver.receive()
            .concatMap(record -> {
                String eventType = new String(
                    record.headers().lastHeader("event-type").value());
                return processEvent(eventType, record.value())
                    .doOnSuccess(v -> record.receiverOffset().acknowledge());
            })
            .subscribe();
    }

    private Mono<Void> processEvent(String eventType, String payload) {
        return switch (eventType) {
            case "OrderCreated" -> {
                OrderCreatedEvent event = objectMapper.readValue(payload, OrderCreatedEvent.class);
                yield readRepository.save(OrderReadModel.from(event)).then();
            }
            case "OrderConfirmed" -> {
                OrderConfirmedEvent event = objectMapper.readValue(payload, OrderConfirmedEvent.class);
                yield readRepository.findById(event.getOrderId())
                    .map(model -> model.withStatus("CONFIRMED"))
                    .flatMap(readRepository::save)
                    .then();
            }
            default -> Mono.empty();
        };
    }
}

7. 実践プロジェクト:リアルタイム株価ティッカーシステム

7.1 システムアーキテクチャ

┌──────────────────────────────────────────────────────────────────┐
│             リアルタイム株価ティッカーシステム                    │
│                                                                  │
│  ┌─────────────┐     ┌──────────────┐     ┌─────────────────┐   │
│  │ マーケット  │     │   Kafka      │     │   WebFlux       │   │
│  │ データ      │────▶│   Cluster    │────▶│   API Gateway   │   │
│  │ プロバイダ  │     │              │     │                 │   │
 (WebSocket) │     │ raw-prices   │     │  SSE /prices    │   │
│  └─────────────┘     │ processed    │     │  WS  /ws/trade  │   │
│                      │ alerts       │     │  gRPC /internal │   │
│  ┌─────────────┐     │              │     └───────┬─────────┘   │
│  │ ニュース    │────▶│              │             │             │
│  │ フィード    │     └──────┬───────┘             │             │
 (REST API)  │            │              ┌──────▼─────────┐   │
│  └─────────────┘     ┌──────▼───────┐      │   クライアント │   │
│                      │ Kafka Streams│Webダッシュボード│
│                      │ 処理         │      │  モバイルアプリ │   │
│                      │              │      │  トレーディングBot│  │
│                      │ - VWAP計算   │      └────────────────┘   │
│                      │ - 異常検知   │                           │
│                      │ - 集約       │      ┌────────────────┐   │
│                      └──────┬───────┘      │   PostgreSQL   │   │
│                             │                 (R2DBC)      │   │
│                             └─────────────▶│   + Redis      │   │
│                                            └────────────────┘   │
└──────────────────────────────────────────────────────────────────┘

7.2 マーケットデータ取(と)り込(こ)みサービス

@Service
@RequiredArgsConstructor
@Slf4j
public class MarketDataIngestionService {

    private final KafkaSender<String, String> kafkaSender;
    private final ObjectMapper objectMapper;
    private final WebSocketClient webSocketClient;

    @PostConstruct
    public void connect() {
        URI uri = URI.create("wss://market-data-provider.example.com/stream");

        webSocketClient.execute(uri, session -> {
            // シンボルのサブスクライブ
            Mono<Void> subscribe = session.send(
                Mono.just(session.textMessage(
                    "{\"action\":\"subscribe\",\"symbols\":[\"AAPL\",\"GOOG\",\"MSFT\",\"AMZN\"]}"
                ))
            );

            // 受信マーケットデータの処理
            Flux<Void> receive = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .flatMap(this::publishToKafka)
                .doOnError(e -> log.error("Market data stream error", e));

            return subscribe.thenMany(receive).then();
        })
        .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
            .maxBackoff(Duration.ofMinutes(5)))
        .subscribe();
    }

    private Mono<Void> publishToKafka(String rawData) {
        try {
            MarketTick tick = objectMapper.readValue(rawData, MarketTick.class);
            ProducerRecord<String, String> record = new ProducerRecord<>(
                "raw-prices", tick.getSymbol(), rawData);
            record.headers()
                .add("source", "market-data-provider".getBytes())
                .add("timestamp", String.valueOf(tick.getTimestamp()).getBytes());

            return kafkaSender.send(Mono.just(SenderRecord.create(record, tick.getSymbol())))
                .next()
                .then();
        } catch (Exception e) {
            log.error("Failed to parse market data: {}", rawData, e);
            return Mono.empty();
        }
    }
}

7.3 Kafka Streamsによる価格処理

@Component
public class PriceProcessingTopology {

    @Autowired
    public void buildPipeline(StreamsBuilder builder) {
        KStream<String, String> rawPrices = builder.stream("raw-prices");

        // 1. パースとバリデーション
        KStream<String, StockPrice> validPrices = rawPrices
            .mapValues(this::parsePrice)
            .filter((symbol, price) -> price != null && price.getPrice() > 0);

        // 2. VWAP(出来高加重平均価格)の1分ウィンドウ計算
        KTable<Windowed<String>, VWAPResult> vwap = validPrices
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .aggregate(
                VWAPAccumulator::new,
                (symbol, price, acc) -> acc.add(price.getPrice(), price.getVolume()),
                Materialized.<String, VWAPAccumulator, WindowStore<Bytes, byte[]>>
                    as("vwap-store")
                    .withValueSerde(vwapAccumulatorSerde())
            )
            .mapValues(VWAPAccumulator::result);

        // 3. 異常検知(30秒以内に5%超の価格変動)
        validPrices
            .groupByKey()
            .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(30)))
            .aggregate(
                PriceRange::new,
                (symbol, price, range) -> range.update(price.getPrice()),
                Materialized.with(Serdes.String(), priceRangeSerde())
            )
            .toStream()
            .filter((key, range) -> range.percentChange() > 5.0)
            .mapValues((key, range) -> new PriceAlert(
                key.key(), range.percentChange(), "ANOMALY"))
            .to("alerts", Produced.with(windowedSerde(), priceAlertSerde()));

        // 4. 処理済み価格の出力
        validPrices
            .mapValues(price -> new ProcessedPrice(
                price.getSymbol(), price.getPrice(), price.getVolume(),
                price.getTimestamp(), System.currentTimeMillis()))
            .to("processed-prices", Produced.with(Serdes.String(), processedPriceSerde()));
    }
}

7.4 Webクライアント向けSSEストリーミングエンドポイント

@RestController
@RequestMapping("/api/stocks")
@RequiredArgsConstructor
public class StockStreamController {

    private final KafkaReceiver<String, String> processedPriceReceiver;
    private final ObjectMapper objectMapper;
    private final Sinks.Many<ProcessedPrice> priceSink;

    @PostConstruct
    public void init() {
        // KafkaからリアクティブSinkへのブリッジ
        processedPriceReceiver.receive()
            .map(record -> {
                record.receiverOffset().acknowledge();
                try {
                    return objectMapper.readValue(record.value(), ProcessedPrice.class);
                } catch (Exception e) {
                    return null;
                }
            })
            .filter(Objects::nonNull)
            .subscribe(price -> priceSink.tryEmitNext(price));
    }

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<ProcessedPrice>> streamPrices(
            @RequestParam(required = false) List<String> symbols) {
        Flux<ProcessedPrice> priceFlux = priceSink.asFlux();

        if (symbols != null && !symbols.isEmpty()) {
            Set<String> symbolSet = new HashSet<>(symbols);
            priceFlux = priceFlux.filter(p -> symbolSet.contains(p.getSymbol()));
        }

        return priceFlux
            .map(price -> ServerSentEvent.<ProcessedPrice>builder()
                .id(price.getSymbol() + "-" + price.getTimestamp())
                .event("price")
                .data(price)
                .build())
            .doOnCancel(() -> log.info("Client disconnected from price stream"));
    }

    @GetMapping(value = "/alerts", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<PriceAlert>> streamAlerts() {
        return alertReceiver.receive()
            .map(record -> {
                record.receiverOffset().acknowledge();
                PriceAlert alert = objectMapper.readValue(record.value(), PriceAlert.class);
                return ServerSentEvent.<PriceAlert>builder()
                    .event("alert")
                    .data(alert)
                    .build();
            });
    }
}

7.5 アプリケーション設定

# application.yml
spring:
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/stockdb
    username: stockapp
    password: secret
  kafka:
    bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
    consumer:
      group-id: stock-ticker-service
      auto-offset-reset: latest
      enable-auto-commit: false
    producer:
      acks: all
      compression-type: zstd
      batch-size: 32768
      properties:
        linger.ms: 10

  webflux:
    base-path: /api

server:
  port: 8080
  netty:
    connection-timeout: 5000

management:
  endpoints:
    web:
      exposure:
        include: health,metrics,prometheus
  metrics:
    tags:
      application: stock-ticker

7.6 ローカル開発用Docker Compose

version: '3.8'
services:
  kafka-1:
    image: apache/kafka:3.7.0
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
    ports:
      - '9092:9092'

  kafka-2:
    image: apache/kafka:3.7.0
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

  kafka-3:
    image: apache/kafka:3.7.0
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: stockdb
      POSTGRES_USER: stockapp
      POSTGRES_PASSWORD: secret
    ports:
      - '5432:5432'

  redis:
    image: redis:7-alpine
    ports:
      - '6379:6379'

8. 意思決定(いしけってい)マトリクス:いつ何(なに)を使うか

8.1 技術選定(ぎじゅつせんてい)ガイド

シナリオWebFluxKafkaSSEWebSocketgRPC
リアルタイムダッシュボードYesYes最適Good過剰
チャットアプリケーションYes任意No最適Good
IoTデータパイプラインYes最適NoGood最適
ECサイト注文処理任意最適NoNoGood
ライブスポーツスコアYesYes最適GoodNo
金融取引プラットフォームYes最適No最適最適
マイクロサービスイベントバスNo最適NoNoGood
サーバー通知Yes任意最適GoodNo
ビデオ/オーディオストリーミングNoNoNo部分的Good
ログ集約パイプラインNo最適NoNoGood

8.2 決定フローチャート

開始: リアルタイムデータが必要か?
├── No → 従来のREST API(Spring MVC)で十分
└── Yes → サーバーからクライアントへの一方向のみか?
    ├── YesSSE(最もシンプル、自動再接続)
    └── No → 双方向通信が必要か?
        ├── Yes → ブラウザ向けか?
        │   │
        │   ├── YesWebSocket
        │   │
        │   └── No → gRPC双方向ストリーミング
        └── 耐久性のあるイベントストレージが必要か?
            ├── YesKafka + 選択したコンシューマ
            └── No → 直接WebFluxストリーミング

8.3 ワークロード別パフォーマンス比較

ワークロードSpring MVCWebFluxWebFlux + Kafka備考(びこう)
100同時ユーザー50ms avg48ms52ms差はわずか
10K同時ユーザー2s avg120ms150msWebFluxが光る
100K同時ユーザータイムアウト250ms300msMVCでは処理不可
100万イベント/秒取り込みN/A限定的120万/秒Kafka必須
複雑なイベント処理N/A可能50万/秒Kafka Streamsの強み

9. 6ヶ月(かげつ)学習ロードマップ

月(つき)1-2:リアクティブの基礎

週(しゅう)トピック成果物(せいかぶつ)
1Reactive Streams仕様、Publisher/SubscriberカスタムPublisherの実装
2Project Reactor:Mono/Fluxの基礎20のオペレータ演習
3エラーハンドリング、リトライ、タイムアウトパターン耐障害APIクライアント
4リアクティブコードのテスト(StepVerifier)演習の90%テストカバレッジ
5-6Spring WebFlux:コントローラ、関数型エンドポイントCRUD REST API(WebFlux)
7-8R2DBC、WebClient、リアクティブセキュリティフルスタックリアクティブアプリ

月3-4:Kafkaマスタリー

トピック成果物
9-10Kafka基礎:トピック、パーティション、コンシューマローカル3ブローカークラスタ構築
11-12Spring Kafka:プロデューサ/コンシューマパターン注文処理システム
13-14Kafka Streams:ステートレス/ステートフル操作リアルタイム分析パイプライン
15-16Exactly-once、トランザクション、Schema Registryトランザクショナルイベントシステム

月5:統合とStreaming API

トピック成果物
17WebFluxでのSSEライブ通知システム
18WebFluxでのWebSocketチャットアプリケーション
19Spring BootでのgRPC Streaming内部サービス間通信
20Reactor Kafka + CQRSパターンイベントソースドマイクロサービス

月6:本番環境(ほんばんかんきょう)対応

トピック成果物
21KRaft移行、クラスタ運用KRaftクラスタデプロイ
22監視:Micrometer、Prometheus、Grafanaオブザーバビリティダッシュボード
23パフォーマンスチューニング、負荷テストベンチマークレポート
24キャップストーン:リアルタイム株価ティッカーシステム本番環境対応プロジェクト

10. クイズ

Q1: I/Oバウンドワークロードにおいて、Spring WebFluxがSpring MVCより優(すぐ)れている主(おも)な利点は何ですか?

回答(かいとう): WebFluxは、わずか数スレッドで数千の同時接続を処理できるノンブロッキングイベントループモデル(Netty)を使用しています。Spring MVCは、各同時リクエストが1つのスレッドを占有するthread-per-requestモデルを使用します。I/Oバウンドワークロード(スレッドがデータベース、ネットワーク、ファイル操作の待機にほとんどの時間を費やす場合)では、WebFluxは待機にスレッドを浪費(ろうひ)しないため、圧倒的に効率的です。ベンチマークでは、I/Oバウンドワークロードにおいて、WebFluxは7-8倍のスループットを77%少ないメモリで処理できます。

Q2: Project ReactorにおけるflatMapconcatMapswitchMapの違(ちが)いを説明してください。

回答:

  • flatMap:各要素をPublisherにマップし、eagerly(即座に)マージします。内部のPublisherは並行して実行され、結果は順序通りでない場合があります。順序が重要でない場合に最大スループットを得るのに最適です。
  • concatMap:各要素をPublisherにマップし、順次サブスクライブします。次のサブスクリプションの前に各内部Publisherの完了を待ちます。順序を保持しますが、スループットは低くなります。
  • switchMap:各要素をPublisherにマップしますが、新しい要素が到着すると前の内部Publisherをキャンセルします。最新のサブスクリプションのみがアクティブです。最新のクエリ結果のみが重要な検索中入力(search-as-you-type)に理想的です。
Q3: KafkaがZooKeeperをKRaftに置き換えた理由は?具体的な技術的メリットを3つ挙(あ)げてください。

回答:

  1. パーティションスケーラビリティ:ZooKeeperは全メタデータをメモリに格納しており、クラスタを約20万パーティションに制限していました。KRaftはログベースのメタデータストアを使用し、数百万パーティションを可能にします。
  2. 高速フェイルオーバー:ZooKeeperモードでのコントローラフェイルオーバーにはZooKeeperからの全メタデータ読み取りが必要でした(数分)。KRaftのフェイルオーバーは、新しいコントローラがRaftを通じて既にメタデータをレプリケートしているため、数秒で完了します。
  3. 運用の簡素化:ZooKeeperでは、2つの別々の分散システムのデプロイ、設定、監視、保守が必要でした。KRaftはZooKeeperの依存を完全に排除し、運用の複雑さを軽減します。

追加の利点として、より高速な制御されたシャットダウン、KafkaとZooKeeper間のスプリットブレインリスクの排除、イベント駆動のメタデータ伝播(ZooKeeperの非同期で遅延する可能性のある伝播に対して)があります。

Q4: リアルタイム機能にWebSocketよりSSEを選ぶべきケースは?

回答: 以下の場合にSSEを選択します:

  1. 通信がサーバーからクライアントへの一方向のみ — 株価ティッカー、ニュースフィード、通知、ダッシュボード。同じ接続でクライアントからデータを送り返す必要がない場合。
  2. 自動再接続が必要 — SSEにはブラウザの自動再接続機能が内蔵されています。WebSocketでは手動の再接続ロジックが必要です。
  3. HTTPインフラストラクチャとの互換性 — SSEは標準的なHTTP/1.1で動作し、特別な設定なしにプロキシやロードバランサを通過します。WebSocketのアップグレードは企業のファイアウォールでブロックされる場合があります。
  4. シンプルさ — SSEの実装はシンプルです(text/event-streamを返すGETエンドポイントのみ)。WebSocketでは接続管理、ハートビート、再接続ロジックが必要です。

双方向通信(チャット、ゲーム、共同編集)やバイナリデータ転送が必要な場合はWebSocketを選択します。

Q5: WebFlux、Kafka、SSEを組み合わせたシステムを設計してください。イベント生成からクライアント表示までのデータフローを説明してください。

回答: リアルタイム注文追跡(ついせき)システムを考えましょう:

  1. イベント生成:注文ステータスが変更されると、注文サービスがReactor Kafkaを使用してKafkaトピック(order-events)にOrderStatusChangedイベントを公開します。プロデューサは耐久性のためacks=allを使用します。

  2. イベント処理:Kafka Streamsアプリケーションがorder-eventsから消費し、顧客データ(customer-profilesトピックからのKTable結合)でイベントをエンリッチし、エンリッチされたイベントをorder-notificationsトピックに出力します。

  3. WebFluxブリッジ:WebFluxサービスがKafkaReceiver(Reactor Kafka)を使用してorder-notificationsからリアクティブに消費します。イベントは顧客IDでキー付けされたSinks.Many(マルチキャストホットパブリッシャー)にルーティングされます。

  4. SSEエンドポイント:WebFluxコントローラが、認証済みユーザーの顧客IDでSinks.ManyのFluxをフィルタリングするSSEエンドポイント/api/orders/trackを公開します。一致する各イベントがServerSentEventとして送信されます。

  5. クライアント:ブラウザがSSEエンドポイントを指すEventSourceを作成します。各イベントでUIが注文追跡ビジュアライゼーションを更新します。接続が切断された場合、ブラウザは自動再接続します(SSE内蔵機能)。

データフロー:注文サービス → Kafka → Kafka Streams → Kafka → Reactor Kafka → Sinks.Many → SSE → ブラウザ


参考文献(さんこうぶんけん)

  1. Reactive Manifesto
  2. Reactive Streams仕様
  3. Project Reactorリファレンスガイド
  4. Spring WebFluxドキュメント
  5. Spring WebFlux vs MVCパフォーマンス
  6. R2DBC仕様
  7. Netty Project
  8. Apache Kafkaドキュメント
  9. Kafka: The Definitive Guide(第2版)
  10. KIP-500: ZooKeeperの置き換え
  11. Kafka KRaftドキュメント
  12. Kafka Streamsドキュメント
  13. Confluent Schema Registry
  14. Kafka Connectドキュメント
  15. Reactor Kafkaリファレンスガイド
  16. Server-Sent Events (MDN)
  17. WebSocket API (MDN)
  18. gRPCドキュメント
  19. CQRSパターン (Microsoft)
  20. Event Sourcingパターン (Microsoft)
  21. Debezium CDCドキュメント
  22. Spring Boot gRPC Starter
  23. Kafkaパフォーマンスチューニングガイド (Confluent)
  24. ZooKeeperドキュメント
  25. Raftコンセンサスアルゴリズム