WebFlux + Kafka + ZooKeeper Mastery: The Complete Guide to Reactive Streaming Architecture


Introduction: Why Reactive Streaming Architecture Matters in 2026

Modern applications are expected to process millions of events per second, deliver real-time updates to users, and scale elastically under unpredictable load. Traditional request-response architectures with blocking I/O simply cannot meet these demands. In 2026, the combination of Spring WebFlux for non-blocking request handling, Apache Kafka for durable event streaming, and the transition from ZooKeeper to KRaft for simplified cluster management represents the gold standard for reactive streaming systems.

This guide covers the full stack: from reactive programming fundamentals to production-grade architecture patterns. Whether you are building a real-time analytics dashboard, a financial trading platform, or an IoT data pipeline, you will find actionable knowledge here.

What You Will Learn

  • Reactive programming principles and the Reactive Streams specification
  • Spring WebFlux internals: Netty event loop, Mono/Flux, operator chains, and R2DBC
  • Apache Kafka architecture: partitions, consumer groups, Exactly-Once Semantics, Kafka Streams, and Connect
  • The ZooKeeper to KRaft migration: why, when, and how
  • Streaming API comparison: SSE vs WebSocket vs gRPC Streaming
  • Integration patterns: WebFlux + Reactor Kafka + CQRS + Event Sourcing
  • A complete real-world project: real-time stock ticker system

1. Reactive Programming Fundamentals

1.1 The Reactive Manifesto

The Reactive Manifesto defines four key properties that reactive systems must exhibit:

| Property | Description | Implementation Example |
| --- | --- | --- |
| Responsive | System responds in a timely manner | WebFlux non-blocking I/O |
| Resilient | System stays responsive in the face of failure | Circuit breaker, bulkhead patterns |
| Elastic | System stays responsive under varying workload | Kafka partition scaling |
| Message Driven | System relies on asynchronous message passing | Kafka topics, Reactor event streams |

These four properties are not independent. Message-driven architecture enables elasticity and resilience, which together enable responsiveness.

1.2 Reactive Streams Specification

Reactive Streams is a standard for asynchronous stream processing with non-blocking backpressure. It defines four core interfaces:

public interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}

public interface Subscriber<T> {
    void onSubscribe(Subscription subscription);
    void onNext(T item);
    void onError(Throwable throwable);
    void onComplete();
}

public interface Subscription {
    void request(long n);
    void cancel();
}

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

The critical innovation is the request(long n) method in Subscription. This enables backpressure: the subscriber controls the rate of data flow, preventing the producer from overwhelming the consumer.
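The same contract ships in the JDK as java.util.concurrent.Flow, whose interfaces mirror Reactive Streams method for method. A minimal, self-contained sketch of demand-driven consumption: the subscriber requests exactly one item at a time, so the producer can never outpace it (class and method names here are illustrative, not from any framework):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class BackpressureDemo {

    /** Consumes items one at a time, requesting the next only after processing. */
    static List<Integer> consumeOneByOne(int count) {
        List<Integer> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1);            // initial demand: exactly one item
                }
                @Override public void onNext(Integer item) {
                    received.add(item);      // "process" the item
                    subscription.request(1); // signal readiness for one more
                }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            for (int i = 1; i <= count; i++) {
                publisher.submit(i);         // submit() itself blocks if demand lags far behind
            }
        } // close() delivers onComplete once all submitted items are drained

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(consumeOneByOne(5)); // [1, 2, 3, 4, 5]
    }
}
```

Without the second `request(1)` in `onNext`, delivery would stop after the first item: demand, not producer speed, drives the stream.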

1.3 Understanding Backpressure

Backpressure is the mechanism by which a data consumer signals to a producer how much data it can handle. Without backpressure, fast producers overwhelm slow consumers, leading to memory exhaustion or dropped messages.

Producer (1000 msg/s) --> Buffer (overflows!) --> Consumer (100 msg/s)

With backpressure:
Producer (adjusts to 100 msg/s) <-- request(100) -- Consumer (100 msg/s)

Backpressure strategies in Project Reactor:

// Pick ONE strategy per stream; each operator below replaces the
// overflow behavior of anything upstream of it, so chaining them is redundant.

Flux.range(1, Integer.MAX_VALUE)
    .onBackpressureBuffer(256)          // Buffer up to 256 items, then error
    .subscribe(item -> processSlowly(item));

flux.onBackpressureDrop(dropped ->      // Drop items the subscriber cannot keep up with
    log.warn("Dropped: {}", dropped));

flux.onBackpressureLatest();            // Keep only the latest item

flux.onBackpressureError();             // Signal an error when overwhelmed

1.4 Reactive Libraries Landscape

| Library | Language/Platform | Key Features | Adoption |
| --- | --- | --- | --- |
| Project Reactor | Java | Mono/Flux, Spring integration | Spring ecosystem |
| RxJava 3 | Java | Observable/Flowable, Android support | Android, legacy Java |
| Mutiny | Java | Uni/Multi, Quarkus-native | Quarkus ecosystem |
| Kotlin Flow | Kotlin | Coroutine-based, structured concurrency | Kotlin-first projects |
| Akka Streams | Scala/Java | Actor-based, graph DSL | High-throughput systems |

2. Spring WebFlux Deep Dive

2.1 WebFlux vs Spring MVC Architecture

Spring MVC uses a thread-per-request model. Each incoming request occupies one thread from the pool until the response is fully written. Under high concurrency with I/O-bound workloads, threads are mostly idle waiting for database queries, HTTP calls, or file reads.

Spring WebFlux uses an event-loop model powered by Netty. A small number of event loop threads handle all requests. When I/O is needed, the request is parked (not blocking a thread), and the event loop moves on to handle other requests. When the I/O completes, the event loop picks up the response.

Spring MVC (Thread-per-request):
  Thread-1: [Request] --> [DB Wait......] --> [Response]
  Thread-2: [Request] --> [API Wait.........] --> [Response]
  Thread-3: [Request] --> [File Wait....] --> [Response]
  (200 threads = 200 concurrent requests)

Spring WebFlux (Event Loop):
  EventLoop-1: [Req1][Req2][Req3][Req1-resume][Req4][Req2-resume]...
  EventLoop-2: [Req5][Req6][Req5-resume][Req7][Req8]...
  (4 threads = thousands of concurrent requests)

2.2 Netty Event Loop Model

Netty is the default embedded server for WebFlux. Understanding its architecture is essential for performance tuning.

        ┌─────────────────────────┐
        │  Boss EventLoopGroup    │
        │  (accepts connections)  │
        └────────────┬────────────┘
        ┌────────────▼────────────┐
        │  Worker EventLoopGroup  │
        │  (handles I/O events)   │
        ├─────────────────────────┤
        │ EventLoop-1 ─ Channel-A │
        │ EventLoop-2 ─ Channel-B │
        │ EventLoop-3 ─ Channel-C │
        │ EventLoop-4 ─ Channel-D │
        └─────────────────────────┘

Key rules for working with Netty event loops:

  1. Never block an event loop thread — no Thread.sleep(), no blocking I/O, no synchronized locks
  2. Offload CPU-intensive work to bounded elastic schedulers
  3. Channel affinity — each channel is bound to one event loop for its lifetime
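Rules 1 and 2 in practice: a sketch of wrapping a blocking call so it runs on Reactor's bounded-elastic pool instead of an event loop thread. `blockingLookup` is a stand-in for any blocking dependency (legacy JDBC, file I/O), and `block()` appears only because this is a standalone demo, not production WebFlux code:

```java
import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class OffloadDemo {

    // Stand-in for a blocking dependency; sleeping stalls whatever thread runs it.
    static String blockingLookup() throws InterruptedException {
        Thread.sleep(100);
        return "result";
    }

    public static void main(String[] args) {
        String value = Mono.fromCallable(OffloadDemo::blockingLookup) // lazy: runs on subscription
            .subscribeOn(Schedulers.boundedElastic()) // callable executes on a worker pool thread
            .block(Duration.ofSeconds(5));            // block() is for demos and tests only
        System.out.println(value);
    }
}
```

`fromCallable` plus `subscribeOn(boundedElastic())` is the standard escape hatch when a blocking library is unavoidable: the event loop subscribes and moves on, and the callable runs elsewhere.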

2.3 Mono and Flux: The Core Types

Mono represents 0 or 1 element. Flux represents 0 to N elements.

// Mono: single value or empty
Mono<User> findUser = userRepository.findById(userId);
Mono<User> savedUser = userRepository.save(user);

// Flux: stream of values
Flux<Order> orders = orderRepository.findByUserId(userId);
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
Flux<ServerSentEvent<String>> sseStream = Flux.create(sink -> {
    // push events to sink
});

Creating Mono and Flux:

// Static factory methods
Mono.just("hello");
Mono.empty();
Mono.error(new RuntimeException("failure"));
Mono.fromCallable(() -> expensiveComputation());
Mono.defer(() -> Mono.just(dynamicValue()));

Flux.just("a", "b", "c");
Flux.fromIterable(myList);
Flux.range(1, 100);
Flux.interval(Duration.ofMillis(500));
Flux.merge(flux1, flux2);
Flux.concat(flux1, flux2);
Flux.zip(flux1, flux2, (a, b) -> a + b);

2.4 Essential Operators

// Transformation
flux.map(String::toUpperCase)
    .flatMap(name -> userService.findByName(name))   // async 1:N
    .concatMap(user -> enrichUser(user))              // sequential async
    .switchMap(query -> search(query))                // cancel previous
    .flatMapSequential(id -> fetchById(id))           // ordered parallel

// Filtering
flux.filter(user -> user.isActive())
    .distinct()
    .take(10)
    .skip(5)
    .takeUntil(event -> event.isTerminal())

// Error Handling
flux.onErrorReturn("default")
    .onErrorResume(ex -> fallbackFlux())
    .retry(3)
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
        .maxBackoff(Duration.ofSeconds(30))
        .filter(ex -> ex instanceof TransientException))
    .timeout(Duration.ofSeconds(5))

// Combining
Flux.merge(fastSource, slowSource)          // interleaved
Flux.concat(firstBatch, secondBatch)        // sequential
Flux.zip(names, ages, Person::new)          // paired
Mono.zip(userMono, profileMono, ordersMono) // parallel join
    .map(tuple -> buildResponse(tuple.getT1(), tuple.getT2(), tuple.getT3()))

// Side Effects (for logging/debugging only)
flux.doOnNext(item -> log.debug("Processing: {}", item))
    .doOnError(ex -> log.error("Error: {}", ex.getMessage()))
    .doOnComplete(() -> log.info("Stream completed"))
    .doOnSubscribe(sub -> log.info("Subscribed"))

2.5 WebFlux Controller and Functional Endpoints

Annotation-based (familiar MVC style):

@RestController
@RequestMapping("/api/users")
@RequiredArgsConstructor
public class UserController {

    private final UserService userService;

    @GetMapping("/{id}")
    public Mono<ResponseEntity<User>> getUser(@PathVariable String id) {
        return userService.findById(id)
            .map(ResponseEntity::ok)
            .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> streamUsers() {
        return userService.streamAllUsers();
    }

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<User> createUser(@Valid @RequestBody Mono<User> userMono) {
        return userMono.flatMap(userService::create);
    }
}

Functional endpoints (router + handler):

@Configuration
public class UserRouter {

    @Bean
    public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
        return RouterFunctions.route()
            .path("/api/users", builder -> builder
                .GET("/{id}", handler::getUser)
                .GET("", handler::listUsers)
                .POST("", handler::createUser)
                .PUT("/{id}", handler::updateUser)
                .DELETE("/{id}", handler::deleteUser))
            .build();
    }
}

@Component
@RequiredArgsConstructor
public class UserHandler {

    private final UserService userService;

    public Mono<ServerResponse> getUser(ServerRequest request) {
        String id = request.pathVariable("id");
        return userService.findById(id)
            .flatMap(user -> ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(user))
            .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> listUsers(ServerRequest request) {
        return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(userService.findAll(), User.class);
    }
}

2.6 R2DBC: Reactive Database Access

R2DBC (Reactive Relational Database Connectivity) brings non-blocking database access to WebFlux applications.

@Configuration
@EnableR2dbcRepositories
public class DatabaseConfig extends AbstractR2dbcConfiguration {

    @Override
    @Bean
    public ConnectionFactory connectionFactory() {
        return ConnectionFactories.get(ConnectionFactoryOptions.builder()
            .option(DRIVER, "postgresql")
            .option(HOST, "localhost")
            .option(PORT, 5432)
            .option(DATABASE, "mydb")
            .option(USER, "user")
            .option(PASSWORD, "password")
            .option(MAX_SIZE, 20)
            .build());
    }
}

public interface UserRepository extends ReactiveCrudRepository<User, String> {

    Flux<User> findByStatusOrderByCreatedAtDesc(String status);

    @Query("SELECT * FROM users WHERE email = :email")
    Mono<User> findByEmail(String email);

    @Query("SELECT * FROM users WHERE department = :dept")
    Flux<User> findByDepartment(String dept);
}

R2DBC with DatabaseClient for complex queries:

@Repository
@RequiredArgsConstructor
public class CustomUserRepository {

    private final DatabaseClient databaseClient;

    public Flux<User> searchUsers(String keyword, int limit) {
        return databaseClient.sql(
                "SELECT * FROM users WHERE name ILIKE :keyword LIMIT :limit")
            .bind("keyword", "%" + keyword + "%")
            .bind("limit", limit)
            .map(row -> new User(
                row.get("id", String.class),
                row.get("name", String.class),
                row.get("email", String.class)
            ))
            .all();
    }
}

2.7 WebFlux Performance Benchmarks

Benchmark conditions: 4-core CPU, 8GB RAM, 10,000 concurrent connections, I/O-bound workload (50ms database latency per request).

| Metric | Spring MVC (Tomcat) | Spring WebFlux (Netty) | Improvement |
| --- | --- | --- | --- |
| Throughput (req/s) | 2,400 | 18,500 | 7.7x |
| Avg Latency (ms) | 4,200 | 540 | 87% lower |
| P99 Latency (ms) | 12,000 | 1,200 | 90% lower |
| Memory Usage (MB) | 2,100 | 480 | 77% lower |
| Threads Active | 200 | 8 | 96% fewer |
| Max Concurrent Requests | 200 | 50,000+ | 250x |

When NOT to use WebFlux:

  • CPU-bound workloads (compression, encryption, heavy computation)
  • Applications already performing well with MVC
  • Teams without reactive programming experience
  • When blocking libraries are unavoidable (JDBC without R2DBC wrapper)
  • Simple CRUD applications with low concurrency

3. Apache Kafka: The Complete Guide

3.1 Kafka Architecture Overview

Apache Kafka is a distributed event streaming platform capable of handling trillions of events per day. Its architecture is built around a few key concepts:

┌──────────────────────────────────────────────────────────┐
│                      Kafka Cluster                       │
│                                                          │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                │
│  │ Broker 1 │  │ Broker 2 │  │ Broker 3 │                │
│  │          │  │          │  │          │                │
│  │ Topic-A  │  │ Topic-A  │  │ Topic-A  │                │
│  │ Part-0   │  │ Part-1   │  │ Part-2   │                │
│  │ (leader) │  │ (leader) │  │ (leader) │                │
│  │          │  │          │  │          │                │
│  │ Topic-A  │  │ Topic-A  │  │ Topic-A  │                │
│  │ Part-1   │  │ Part-2   │  │ Part-0   │                │
│  │ (replica)│  │ (replica)│  │ (replica)│                │
│  └──────────┘  └──────────┘  └──────────┘                │
│                                                          │
│  ┌────────────────────────────────────────────────────┐  │
│  │ Metadata: ZooKeeper (legacy) or KRaft (Kafka 3.5+) │  │
│  └────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────┘

Core Concepts:

| Concept | Description |
| --- | --- |
| Topic | Named feed of messages, similar to a database table |
| Partition | Ordered, immutable sequence of records within a topic |
| Offset | Sequential ID for each record within a partition |
| Broker | A Kafka server that stores data and serves clients |
| Producer | Client that publishes records to topics |
| Consumer | Client that reads records from topics |
| Consumer Group | Set of consumers that cooperatively consume from topics |
| Replication | Each partition is replicated across multiple brokers for durability |
| ISR | In-Sync Replicas: replicas that are caught up with the leader |
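Partitions matter because the record key decides where a record lands: same key, same partition, and therefore a total order per key. A simplified sketch of that routing rule; note that Kafka's real DefaultPartitioner hashes the serialized key bytes with murmur2, while this uses `String.hashCode` purely for illustration:

```java
public class PartitionSketch {

    // Simplified stand-in for Kafka's key-based partitioning: deterministic
    // hash of the key, modulo the partition count. Same key -> same partition,
    // which is what preserves per-key ordering.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions; // mask keeps it non-negative
    }

    public static void main(String[] args) {
        // All events for one order hash to one partition and stay in order.
        System.out.println(partitionFor("order-42", 6) == partitionFor("order-42", 6)); // true
    }
}
```

This is also why adding partitions to an existing topic breaks key-to-partition stability: the modulus changes, so new records for an old key may land elsewhere.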

3.2 Producer Configuration and Best Practices

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Reliability settings
        config.put(ProducerConfig.ACKS_CONFIG, "all");           // Wait for all ISR replicas
        config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Performance tuning
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);     // 32KB batch
        config.put(ProducerConfig.LINGER_MS_CONFIG, 20);         // Wait 20ms for batching
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
        config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB buffer

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Sending messages with different patterns:

@Service
@RequiredArgsConstructor
public class OrderEventProducer {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    // Fire-and-forget (highest throughput)
    public void sendFireAndForget(OrderEvent event) {
        kafkaTemplate.send("orders", event.getOrderId(), event);
    }

    // Synchronous send (highest reliability)
    public void sendSync(OrderEvent event) throws Exception {
        kafkaTemplate.send("orders", event.getOrderId(), event)
            .get(10, TimeUnit.SECONDS);
    }

    // Asynchronous send with callback (balanced)
    public CompletableFuture<SendResult<String, Object>> sendAsync(OrderEvent event) {
        return kafkaTemplate.send("orders", event.getOrderId(), event)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Failed to send order event: {}", event.getOrderId(), ex);
                } else {
                    log.info("Order event sent to partition {} offset {}",
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
                }
            });
    }

    // Send with headers
    public void sendWithHeaders(OrderEvent event) {
        ProducerRecord<String, Object> record = new ProducerRecord<>("orders",
            null, null, event.getOrderId(), event);
        record.headers()
            .add("event-type", "ORDER_CREATED".getBytes())
            .add("source", "order-service".getBytes())
            .add("correlation-id", UUID.randomUUID().toString().getBytes());
        kafkaTemplate.send(record);
    }
}

3.3 Consumer Configuration and Patterns

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        // Offset management
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Manual commit

        // Performance tuning
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
        config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1MB

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            KafkaTemplate<String, Object> kafkaTemplate) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setCommonErrorHandler(new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate), // failed records go to <topic>.DLT
            new FixedBackOff(1000L, 3)                        // 3 retries, 1 second apart
        ));
        return factory;
    }
}

Consumer listener patterns:

@Component
@RequiredArgsConstructor
public class OrderEventConsumer {

    private final OrderService orderService;
    private final AnalyticsService analyticsService;
    private final NotificationService notificationService;

    // Single record processing
    @KafkaListener(topics = "orders", groupId = "order-processing-group")
    public void processOrder(ConsumerRecord<String, OrderEvent> record,
                             Acknowledgment ack) {
        try {
            log.info("Processing order: {} from partition: {} offset: {}",
                record.value().getOrderId(),
                record.partition(),
                record.offset());
            orderService.process(record.value());
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Failed to process order: {}", record.value().getOrderId(), e);
            // Rethrow so the container's DefaultErrorHandler retries and then dead-letters
            throw new RuntimeException(e);
        }
    }

    // Batch processing
    @KafkaListener(topics = "analytics-events", groupId = "analytics-batch-group",
                   containerFactory = "batchKafkaListenerContainerFactory")
    public void processBatch(List<ConsumerRecord<String, AnalyticsEvent>> records,
                             Acknowledgment ack) {
        log.info("Processing batch of {} records", records.size());
        try {
            List<AnalyticsEvent> events = records.stream()
                .map(ConsumerRecord::value)
                .collect(Collectors.toList());
            analyticsService.bulkInsert(events);
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Batch processing failed", e);
            // Not acknowledged; the whole batch is redelivered on the next poll
        }
    }

    // Filtering with headers
    @KafkaListener(topics = "orders",
                   filter = "orderCreatedFilter",
                   groupId = "notification-group")
    public void handleOrderCreated(OrderEvent event) {
        notificationService.sendOrderConfirmation(event);
    }
}

3.4 Exactly-Once Semantics (EOS)

Kafka supports exactly-once semantics through the combination of idempotent producers and transactional APIs.

@Configuration
public class KafkaTransactionalConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-");
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTransactionManager<String, Object> kafkaTransactionManager() {
        return new KafkaTransactionManager<>(producerFactory());
    }
}

@Service
@RequiredArgsConstructor
public class TransactionalOrderService {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Transactional("kafkaTransactionManager")
    public void processOrderTransactionally(OrderEvent order) {
        // All messages in this method are sent atomically
        kafkaTemplate.send("orders-processed", order.getOrderId(), order);
        kafkaTemplate.send("inventory-updates", order.getProductId(),
            new InventoryEvent(order.getProductId(), -order.getQuantity()));
        kafkaTemplate.send("payment-requests", order.getOrderId(),
            new PaymentEvent(order.getOrderId(), order.getTotalAmount()));
        // If any send fails, all are rolled back
    }
}

Three levels of delivery guarantees:

| Guarantee | Configuration | Use Case |
| --- | --- | --- |
| At-most-once | acks=0, auto-commit | Metrics, logging |
| At-least-once | acks=all, manual commit, idempotent consumer | Most applications |
| Exactly-once | Transactional API + read-committed isolation | Financial, billing, inventory |
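The read side of exactly-once needs the read_committed isolation level, or consumers will see records from uncommitted and aborted transactions. A minimal sketch using plain `Properties` with the standard Kafka consumer config keys (the bootstrap address and group name are placeholders):

```java
import java.util.Properties;

public class ReadCommittedConfig {

    // Consumer properties for the exactly-once read side: only records from
    // committed transactions become visible; aborted records are filtered out.
    static Properties exactlyOnceConsumerProps(String bootstrap, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("group.id", groupId);
        props.put("isolation.level", "read_committed"); // default is read_uncommitted
        props.put("enable.auto.commit", "false");       // commit offsets alongside processing
        return props;
    }

    public static void main(String[] args) {
        Properties props = exactlyOnceConsumerProps("kafka:9092", "billing-group");
        System.out.println(props.getProperty("isolation.level"));
    }
}
```

These properties would be passed to a `KafkaConsumer` (or merged into the `DefaultKafkaConsumerFactory` config shown earlier) on every service that participates in the transactional pipeline.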

3.5 Kafka Streams

Kafka Streams is a client library for building applications and microservices where the input and output data are stored in Kafka.

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> config = new HashMap<>();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        config.put(StreamsConfig.STATE_DIR_CONFIG, "/var/kafka-streams");
        return new KafkaStreamsConfiguration(config);
    }
}

@Component
public class OrderAnalyticsTopology {

    @Autowired
    public void buildPipeline(StreamsBuilder streamsBuilder) {
        // Real-time order analytics pipeline
        KStream<String, OrderEvent> orders = streamsBuilder
            .stream("orders", Consumed.with(Serdes.String(), orderEventSerde()));

        // 1. Order count per product in tumbling windows
        KTable<Windowed<String>, Long> productCounts = orders
            .groupBy((key, order) -> order.getProductId(),
                Grouped.with(Serdes.String(), orderEventSerde()))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .count(Materialized.as("product-counts-store"));

        // 2. Revenue aggregation per category
        KTable<String, Double> revenueByCategory = orders
            .groupBy((key, order) -> order.getCategory(),
                Grouped.with(Serdes.String(), orderEventSerde()))
            .aggregate(
                () -> 0.0,
                (key, order, total) -> total + order.getTotalAmount(),
                Materialized.with(Serdes.String(), Serdes.Double())
            );

        // 3. Join with customer data for enrichment
        KTable<String, CustomerProfile> customers = streamsBuilder
            .table("customer-profiles",
                Consumed.with(Serdes.String(), customerProfileSerde()));

        KStream<String, EnrichedOrder> enrichedOrders = orders
            .selectKey((key, order) -> order.getCustomerId())
            .join(customers,
                (order, customer) -> new EnrichedOrder(order, customer),
                Joined.with(Serdes.String(), orderEventSerde(), customerProfileSerde()));

        enrichedOrders.to("enriched-orders",
            Produced.with(Serdes.String(), enrichedOrderSerde()));

        // 4. Fraud detection with session windows
        orders
            .filter((key, order) -> order.getTotalAmount() > 1000)
            .groupByKey()
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(10)))
            .count()
            .toStream()
            .filter((windowedKey, count) -> count >= 3)
            .mapValues((key, count) -> "ALERT: " + count + " high-value orders in session")
            .to("fraud-alerts");
    }
}

3.6 Kafka Connect

Kafka Connect is a framework for connecting Kafka with external systems like databases, key-value stores, search indexes, and file systems.

{
  "name": "postgresql-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "password",
    "database.dbname": "orders_db",
    "database.server.name": "order-server",
    "table.include.list": "public.orders,public.order_items",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "dbz_publication",
    "topic.prefix": "cdc",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "cdc-$3"
  }
}
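A connector config like the one above is registered by POSTing it to a Connect worker's REST API. A sketch with the JDK HTTP client; the worker URL and its conventional port 8083 are assumptions, and the request is only built here, not sent:

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class ConnectRegistration {

    // Builds the REST call that registers a connector with a Kafka Connect
    // worker. The JSON body is a connector config like the Debezium example.
    static HttpRequest registrationRequest(String workerUrl, String connectorJson) {
        return HttpRequest.newBuilder()
            .uri(URI.create(workerUrl + "/connectors"))   // POST /connectors creates it
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
            .build();
    }

    public static void main(String[] args) {
        HttpRequest req = registrationRequest("http://connect:8083", "{\"name\":\"demo\"}");
        System.out.println(req.method() + " " + req.uri()); // POST http://connect:8083/connectors
    }
}
```

Sending it with `HttpClient.newHttpClient().send(req, ...)` returns 201 on success; the same endpoint with GET lists registered connectors.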

Elasticsearch sink connector:

{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "enriched-orders",
    "type.name": "_doc",
    "key.ignore": false,
    "schema.ignore": true,
    "behavior.on.null.values": "delete",
    "write.method": "upsert",
    "batch.size": 200,
    "max.buffered.records": 5000,
    "flush.timeout.ms": 10000
  }
}

3.7 Schema Registry

Schema Registry provides a centralized repository for message schemas, enabling schema evolution and compatibility checking.

// Producer with Avro schema
@Configuration
public class AvroProducerConfig {

    @Bean
    public ProducerFactory<String, GenericRecord> avroProducerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        config.put("schema.registry.url", "http://schema-registry:8081");
        config.put("auto.register.schemas", true);
        return new DefaultKafkaProducerFactory<>(config);
    }
}

Schema evolution compatibility modes:

| Mode | Allowed Changes | Best For |
| --- | --- | --- |
| BACKWARD | Delete fields, add optional fields | Consumer-first upgrades |
| FORWARD | Add fields, delete optional fields | Producer-first upgrades |
| FULL | Add/delete optional fields only | Independent upgrades |
| NONE | Any change allowed | Development only |

3.8 Kafka Performance Tuning Cheat Sheet

| Parameter | Default | Recommended | Impact |
| --- | --- | --- | --- |
| num.partitions | 1 | 6-12 per topic | Parallelism |
| replication.factor | 1 | 3 | Durability |
| min.insync.replicas | 1 | 2 | Write durability |
| batch.size (producer) | 16384 | 32768-65536 | Throughput vs latency |
| linger.ms (producer) | 0 | 5-100 | Batching efficiency |
| compression.type | none | zstd or lz4 | Network/disk savings |
| fetch.min.bytes (consumer) | 1 | 1024-16384 | Throughput vs latency |
| max.poll.records (consumer) | 500 | 100-1000 | Processing batch size |
| segment.bytes | 1073741824 | 536870912 | Log compaction frequency |
| retention.ms | 604800000 | Based on use case | Storage vs replay capability |

4. ZooKeeper to KRaft: The Great Migration

4.1 ZooKeeper's Role in Kafka

ZooKeeper has been Kafka's metadata management system since the beginning. It handles:

  • Broker registration: tracking which brokers are alive
  • Topic metadata: partition assignments, ISR lists, configurations
  • Controller election: choosing which broker acts as the cluster controller
  • Consumer group coordination (legacy): storing consumer offsets (moved to Kafka itself in 0.9+)
  • ACLs: access control lists for authorization

4.2 Why ZooKeeper Had to Go

| Problem | Description |
| --- | --- |
| Operational complexity | Two distinct distributed systems to deploy, monitor, and maintain |
| Scaling bottleneck | ZooKeeper stores all metadata in memory; limits cluster to about 200K partitions |
| Split-brain risks | Network partitions can cause ZooKeeper and Kafka to disagree |
| Recovery time | Controller failover requires reading all metadata from ZooKeeper (minutes) |
| Expertise requirement | Teams need deep knowledge of both Kafka and ZooKeeper |

4.3 KRaft: Kafka's Built-in Consensus

KRaft (Kafka Raft) replaces ZooKeeper with a built-in Raft-based consensus protocol. Metadata is stored in an internal Kafka topic (__cluster_metadata), and a set of controller nodes manage cluster state.

ZooKeeper Mode:                        KRaft Mode:
┌────────┐    ┌────────────┐           ┌──────────────────────┐
│ Broker │───▶│ ZooKeeper  │           │  Controller (Raft)   │
│ Broker │───▶│ Ensemble   │           │  Node 1 (leader)     │
│ Broker │───▶│ (3-5 nodes)│           │  Node 2 (follower)   │
└────────┘    └────────────┘           │  Node 3 (follower)   │
                                       └──────────┬───────────┘
                                       ┌──────────▼───────────┐
                                       │       Brokers        │
                                       │  Broker 1  Broker 2  │
                                       │       Broker 3       │
                                       └──────────────────────┘

KRaft advantages:

| Feature | ZooKeeper Mode | KRaft Mode |
| --- | --- | --- |
| Max partitions | ~200,000 | Millions |
| Controller failover | Minutes | Seconds |
| Metadata propagation | Asynchronous (laggy) | Event-driven (immediate) |
| Deployment | Two systems | One system |
| Operational overhead | High | Low |
| Shutdown time | Minutes | Seconds |

4.4 KIP-500: The Roadmap

KIP-500 ("Replace ZooKeeper with a Self-Managed Metadata Quorum") defined the multi-year migration plan:

| Kafka Version | Milestone | Year |
| --- | --- | --- |
| 2.8 | Early access KRaft (development only) | 2021 |
| 3.3 | KRaft marked production-ready | 2022 |
| 3.5 | ZooKeeper migration tooling available | 2023 |
| 3.7 | Bridge release (dual support) | 2024 |
| 4.0 | ZooKeeper support removed entirely | 2025 |

4.5 Migration Guide: ZooKeeper to KRaft

Step 1: Pre-migration checklist

# Verify Kafka version (must be 3.5+)
kafka-broker-api-versions.sh --bootstrap-server localhost:9092 | head -1

# Record the existing cluster ID from ZooKeeper (you will need it
# when formatting KRaft controller storage)
zookeeper-shell.sh zk1:2181 get /cluster/id

# Verify no deprecated features in use
kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type brokers

Step 2: Configure KRaft controllers

# kraft/server.properties for controller nodes
process.roles=controller
node.id=1
controller.quorum.voters=1@controller1:9093,2@controller2:9093,3@controller3:9093
controller.listener.names=CONTROLLER
listeners=CONTROLLER://:9093
log.dirs=/var/kraft-controller-logs

Step 3: Run the migration

# Format controller storage with the EXISTING cluster ID retrieved from
# ZooKeeper -- generating a random UUID here would create a different
# cluster that the existing brokers refuse to join
kafka-storage.sh format \
  --config kraft/server.properties \
  --cluster-id <existing-cluster-id> \
  --release-version 3.7

# There is no one-shot migration CLI: the ZooKeeper -> KRaft migration
# (KIP-866) is driven by configuration. Add to each controller:
#
#   zookeeper.metadata.migration.enable=true
#   zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
#
# then rolling-restart the brokers with the same two properties added.
# The active controller copies all ZooKeeper metadata into the
# __cluster_metadata topic automatically.

# Monitor progress via the controller logs and the ZkMigrationState metric.

# Finalize (point of no return): restart each broker in KRaft mode
# (process.roles=broker, no zookeeper.connect), then remove the migration
# properties from the controllers and restart them.

Step 4: Post-migration validation

# Verify all topics are accessible
kafka-topics.sh --bootstrap-server localhost:9092 --list

# Verify consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# Check controller quorum status
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status
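The same quorum check can be done programmatically. A minimal sketch using the Kafka `Admin` client (the `describeMetadataQuorum` API exists since Kafka 3.3; the bootstrap address is a placeholder, and the call only succeeds against a KRaft-mode cluster, so a successful response is itself evidence the migration completed):

```java
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.QuorumInfo;

public class QuorumCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Describe the Raft metadata quorum: leader plus voter lag info
            QuorumInfo quorum = admin.describeMetadataQuorum().quorumInfo().get();
            System.out.println("Quorum leader id: " + quorum.leaderId());
            quorum.voters().forEach(voter ->
                System.out.println("Voter " + voter.replicaId()
                    + " lastFetchTimestamp=" + voter.lastFetchTimestamp()));
        }
    }
}
```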

4.6 ZAB vs Raft: Protocol Comparison

| Aspect | ZAB (ZooKeeper) | Raft (KRaft) |
| --- | --- | --- |
| Leader election | Epoch-based | Term-based |
| Log replication | Two-phase commit | Append entries |
| Consistency | Sequential consistency | Linearizable reads (optional) |
| Membership | Static config | Dynamic reconfiguration |
| Snapshot | Fuzzy snapshots | Consistent snapshots |
| Complexity | High | Moderate (by design) |
| Implementation | Separate system | Integrated into Kafka |

5. Streaming API Comparison: SSE vs WebSocket vs gRPC Streaming

5.1 Server-Sent Events (SSE)

SSE is a simple, HTTP-based protocol for server-to-client push. The server sends events over a long-lived HTTP connection using the text/event-stream content type.

WebFlux SSE endpoint:

@RestController
@RequestMapping("/api/stream")
@RequiredArgsConstructor
public class SSEController {

    private final StockPriceService stockPriceService;

    @GetMapping(value = "/prices", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<StockPrice>> streamPrices(
            @RequestParam List<String> symbols) {
        return stockPriceService.getPriceStream(symbols)
            .map(price -> ServerSentEvent.<StockPrice>builder()
                .id(String.valueOf(price.getTimestamp()))
                .event("price-update")
                .data(price)
                .retry(Duration.ofSeconds(5))
                .build());
    }

    @GetMapping(value = "/heartbeat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> heartbeat() {
        return Flux.interval(Duration.ofSeconds(30))
            .map(seq -> ServerSentEvent.<String>builder()
                .event("heartbeat")
                .data("ping")
                .build());
    }
}

JavaScript client:

const eventSource = new EventSource('/api/stream/prices?symbols=AAPL,GOOG,MSFT')

eventSource.addEventListener('price-update', (event) => {
  const price = JSON.parse(event.data)
  updateUI(price)
})

eventSource.addEventListener('heartbeat', () => {
  console.log('Connection alive')
})

eventSource.onerror = (error) => {
  console.error('SSE error:', error)
  // Browser automatically reconnects
}
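The stream can also be consumed from another JVM service. A sketch using WebFlux's `WebClient` (the base URL is a placeholder; the payload is read as raw JSON strings, which you would map to a domain type with an `ObjectMapper`):

```java
import java.util.List;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

public class PriceStreamClient {

    private final WebClient webClient = WebClient.create("http://localhost:8080");

    // Opens a long-lived text/event-stream connection and emits each
    // event's data field as one Flux element
    public Flux<String> streamPrices(List<String> symbols) {
        return webClient.get()
            .uri(uriBuilder -> uriBuilder
                .path("/api/stream/prices")
                .queryParam("symbols", String.join(",", symbols))
                .build())
            .accept(MediaType.TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(String.class);
    }
}
```

Unlike the browser's `EventSource`, `WebClient` does not reconnect automatically, so wrap the returned `Flux` in `retryWhen(Retry.backoff(...))` for resilience; `bodyToFlux` can also decode directly into `ServerSentEvent<T>` via a `ParameterizedTypeReference`.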

5.2 WebSocket

WebSocket provides full-duplex communication over a single TCP connection. Both client and server can send messages at any time.

WebFlux WebSocket handler:

@Configuration
public class WebSocketConfig {

    @Bean
    public HandlerMapping webSocketMapping(StockWebSocketHandler handler) {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/ws/stocks", handler);
        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);
        mapping.setOrder(-1);
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

@Component
@RequiredArgsConstructor
@Slf4j
public class StockWebSocketHandler implements WebSocketHandler {

    private final StockPriceService stockPriceService;
    private final ObjectMapper objectMapper;

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Incoming messages (subscriptions)
        Flux<String> incoming = session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .doOnNext(msg -> log.info("Received subscription: {}", msg));

        // Outgoing price updates
        Flux<WebSocketMessage> outgoing = incoming
            .flatMap(msg -> {
                SubscriptionRequest req = parseRequest(msg);
                return stockPriceService.getPriceStream(req.getSymbols());
            })
            .map(price -> {
                try {
                    String json = objectMapper.writeValueAsString(price);
                    return session.textMessage(json);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });

        return session.send(outgoing);
    }
}

JavaScript client:

const ws = new WebSocket('ws://localhost:8080/ws/stocks')

ws.onopen = () => {
  ws.send(
    JSON.stringify({
      action: 'subscribe',
      symbols: ['AAPL', 'GOOG', 'MSFT'],
    })
  )
}

ws.onmessage = (event) => {
  const price = JSON.parse(event.data)
  updateUI(price)
}

ws.onclose = (event) => {
  console.log('WebSocket closed:', event.code, event.reason)
  // Manual reconnection needed
  setTimeout(reconnect, 3000)
}

5.3 gRPC Streaming

gRPC supports four communication patterns: unary, server streaming, client streaming, and bidirectional streaming.

Proto definition:

syntax = "proto3";

package stocks;

service StockService {
    // Unary
    rpc GetPrice (PriceRequest) returns (StockPrice);

    // Server streaming
    rpc StreamPrices (PriceStreamRequest) returns (stream StockPrice);

    // Client streaming
    rpc RecordTrades (stream TradeRecord) returns (TradeSummary);

    // Bidirectional streaming
    rpc LiveTrading (stream TradeOrder) returns (stream TradeConfirmation);
}

message PriceStreamRequest {
    repeated string symbols = 1;
    int32 interval_ms = 2;
}

message StockPrice {
    string symbol = 1;
    double price = 2;
    double change = 3;
    int64 timestamp = 4;
    int64 volume = 5;
}

Server-side implementation (Spring Boot + grpc-spring-boot-starter):

@GrpcService
@RequiredArgsConstructor
@Slf4j
public class StockGrpcService extends StockServiceGrpc.StockServiceImplBase {

    private final StockPriceService stockPriceService;

    @Override
    public void streamPrices(PriceStreamRequest request,
                             StreamObserver<StockPrice> responseObserver) {
        stockPriceService.getPriceStream(request.getSymbolsList())
            .subscribe(
                price -> responseObserver.onNext(toProto(price)),
                error -> responseObserver.onError(
                    Status.INTERNAL.withDescription(error.getMessage()).asException()),
                responseObserver::onCompleted
            );
    }

    @Override
    public StreamObserver<TradeOrder> liveTrading(
            StreamObserver<TradeConfirmation> responseObserver) {
        return new StreamObserver<TradeOrder>() {
            @Override
            public void onNext(TradeOrder order) {
                TradeConfirmation confirmation = executeTrade(order);
                responseObserver.onNext(confirmation);
            }

            @Override
            public void onError(Throwable t) {
                log.error("Client error in live trading", t);
            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }
}

5.4 Comparison Table

| Feature | SSE | WebSocket | gRPC Streaming |
| --- | --- | --- | --- |
| Protocol | HTTP/1.1 | WS (TCP) | HTTP/2 |
| Direction | Server to client | Bidirectional | All 4 patterns |
| Data format | Text (UTF-8) | Text or binary | Protobuf (binary) |
| Auto-reconnect | Built-in (browser) | Manual | Manual |
| Backpressure | Limited | None | Built-in (flow control) |
| Browser support | All modern browsers | All modern browsers | grpc-web (limited) |
| Firewall friendly | Yes (standard HTTP) | Sometimes blocked | Requires HTTP/2 |
| Multiplexing | No | No | Yes (HTTP/2) |
| Max connections | 6 per domain (HTTP/1.1) | No hard limit | Multiplexed |
| Compression | Standard HTTP | Per-message (optional) | Built-in |
| Latency | Low | Lowest | Low |
| Throughput | Moderate | High | Highest |
| Serialization | JSON (text) | JSON or custom binary | Protobuf (efficient) |
| Use case | Notifications, feeds | Chat, gaming, collab | Microservices, IoT |

5.5 When to Choose What

  • SSE: Dashboard updates, news feeds, stock tickers (read-only), notification streams. Simple to implement, works through proxies, auto-reconnects.
  • WebSocket: Chat applications, collaborative editing, multiplayer games. Need bidirectional communication with low latency.
  • gRPC Streaming: Microservice-to-microservice communication, IoT data ingestion, high-throughput internal APIs. Need schema enforcement, efficient serialization, and HTTP/2 features.

6. WebFlux + Kafka Integration

6.1 Reactor Kafka

Reactor Kafka provides a reactive API for Kafka, integrating seamlessly with Spring WebFlux.

@Configuration
@Slf4j
public class ReactorKafkaConfig {

    @Bean
    public ReceiverOptions<String, String> receiverOptions() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        return ReceiverOptions.<String, String>create(props)
            .subscription(Collections.singleton("events"))
            .addAssignListener(partitions ->
                log.info("Assigned: {}", partitions))
            .addRevokeListener(partitions ->
                log.info("Revoked: {}", partitions))
            .commitInterval(Duration.ofSeconds(5))
            .commitBatchSize(100);
    }

    @Bean
    public SenderOptions<String, String> senderOptions() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        return SenderOptions.create(props);
    }

    @Bean
    public KafkaReceiver<String, String> kafkaReceiver() {
        return KafkaReceiver.create(receiverOptions());
    }

    @Bean
    public KafkaSender<String, String> kafkaSender() {
        return KafkaSender.create(senderOptions());
    }
}

Reactive consumer with backpressure:

@Service
@RequiredArgsConstructor
@Slf4j
public class ReactiveEventProcessor {

    private final KafkaReceiver<String, String> kafkaReceiver;
    private final EventService eventService;

    @PostConstruct
    public void startConsuming() {
        kafkaReceiver.receive()
            .groupBy(record -> record.receiverOffset().topicPartition())
            .flatMap(partitionFlux -> partitionFlux
                .publishOn(Schedulers.boundedElastic())
                .concatMap(record -> processRecord(record)
                    .doOnSuccess(v -> record.receiverOffset().acknowledge())
                    .onErrorResume(e -> {
                        log.error("Error processing record: {}", record.key(), e);
                        record.receiverOffset().acknowledge();
                        return Mono.empty();
                    })
                ))
            .subscribe();
    }

    private Mono<Void> processRecord(ReceiverRecord<String, String> record) {
        return eventService.process(record.value())
            .doOnNext(result ->
                log.debug("Processed: partition={} offset={} key={}",
                    record.partition(), record.offset(), record.key()));
    }
}

Reactive producer:

@Service
@RequiredArgsConstructor
@Slf4j
public class ReactiveEventProducer {

    private final KafkaSender<String, String> kafkaSender;
    private final ObjectMapper objectMapper;

    public Mono<Void> sendEvent(String topic, String key, Object event) {
        return Mono.fromCallable(() -> objectMapper.writeValueAsString(event))
            .flatMap(json -> {
                SenderRecord<String, String, String> record = SenderRecord.create(
                    new ProducerRecord<>(topic, key, json), key);
                return kafkaSender.send(Mono.just(record))
                    .next()
                    .doOnNext(result -> log.info("Sent to {}-{} offset {}",
                        result.recordMetadata().topic(),
                        result.recordMetadata().partition(),
                        result.recordMetadata().offset()))
                    .then();
            });
    }

    public Flux<SenderResult<String>> sendBatch(String topic, List<Event> events) {
        Flux<SenderRecord<String, String, String>> records = Flux.fromIterable(events)
            .map(event -> {
                try {
                    String json = objectMapper.writeValueAsString(event);
                    return SenderRecord.create(
                        new ProducerRecord<>(topic, event.getId(), json), event.getId());
                } catch (JsonProcessingException e) {
                    // writeValueAsString throws a checked exception; rethrow unchecked
                    throw new RuntimeException(e);
                }
            });

        return kafkaSender.send(records)
            .doOnError(e -> log.error("Batch send failed", e));
    }
}

6.2 CQRS + Event Sourcing with WebFlux and Kafka

CQRS (Command Query Responsibility Segregation) separates read and write models. Combined with Event Sourcing, every state change is stored as an immutable event.

┌─────────────────────────────────────────────────────────┐
│                  CQRS + Event Sourcing                  │
│                                                         │
│  Command Side                    Query Side             │
│  ┌──────────┐                    ┌──────────┐           │
│  │ WebFlux  │                    │ WebFlux  │           │
│  │ Command  │                    │ Query    │           │
│  │ Handler  │                    │ Handler  │           │
│  └────┬─────┘                    └────▲─────┘           │
│       │                               │                 │
│  ┌────▼─────┐                    ┌────┴─────┐           │
│  │ Domain   │                    │ Read     │           │
│  │ Aggregate│                    │ Model    │           │
│  └────┬─────┘                    │ (R2DBC)  │           │
│       │                          └────▲─────┘           │
│  ┌────▼─────────────────────────────┐ │                 │
│  │         Kafka (Event Store)      ├─┘                 │
│  │  orders.events topic             │                   │
│  └──────────────────────────────────┘                   │
└─────────────────────────────────────────────────────────┘

Command handler:

@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
public class OrderCommandController {

    private final OrderCommandService commandService;

    @PostMapping
    @ResponseStatus(HttpStatus.ACCEPTED)
    public Mono<OrderId> createOrder(@RequestBody Mono<CreateOrderCommand> command) {
        return command.flatMap(commandService::handle);
    }

    @PostMapping("/{orderId}/confirm")
    public Mono<ResponseEntity<Void>> confirmOrder(@PathVariable String orderId) {
        return commandService.handle(new ConfirmOrderCommand(orderId))
            .then(Mono.just(ResponseEntity.accepted().build()));
    }
}

@Service
@RequiredArgsConstructor
public class OrderCommandService {

    private final KafkaSender<String, String> kafkaSender;
    private final ObjectMapper objectMapper;

    public Mono<OrderId> handle(CreateOrderCommand command) {
        // Validate command
        OrderAggregate aggregate = OrderAggregate.create(command);
        List<DomainEvent> events = aggregate.getUncommittedEvents();

        // Publish events to Kafka
        return Flux.fromIterable(events)
            .flatMap(event -> publishEvent("order-events", aggregate.getId(), event))
            .then(Mono.just(new OrderId(aggregate.getId())));
    }

    private Mono<Void> publishEvent(String topic, String key, DomainEvent event) {
        return Mono.fromCallable(() -> objectMapper.writeValueAsString(event))
            .flatMap(json -> {
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, json);
                record.headers().add("event-type", event.getType().getBytes());
                return kafkaSender.send(Mono.just(SenderRecord.create(record, key)))
                    .next()
                    .then();
            });
    }
}
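The `OrderAggregate` used above is not shown. A minimal event-sourced aggregate might look like this (all class and field names are illustrative, simplified stand-ins for the types referenced above, not from any framework):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Minimal event-sourced aggregate: every state change is captured as an
// event and exposed via getUncommittedEvents() for publishing to Kafka.
public class OrderAggregate {

    private final String id;
    private String status;
    private final List<DomainEvent> uncommittedEvents = new ArrayList<>();

    private OrderAggregate(String id) {
        this.id = id;
    }

    public static OrderAggregate create(CreateOrderCommand command) {
        OrderAggregate aggregate = new OrderAggregate(UUID.randomUUID().toString());
        aggregate.apply(new OrderCreatedEvent(aggregate.id, command.customerId()));
        return aggregate;
    }

    public void confirm() {
        apply(new OrderConfirmedEvent(id));
    }

    // Mutates state AND records the event: replaying the event log through
    // apply() reconstructs the aggregate from scratch
    private void apply(DomainEvent event) {
        if (event instanceof OrderCreatedEvent) status = "CREATED";
        if (event instanceof OrderConfirmedEvent) status = "CONFIRMED";
        uncommittedEvents.add(event);
    }

    public String getId() { return id; }
    public String getStatus() { return status; }
    public List<DomainEvent> getUncommittedEvents() { return List.copyOf(uncommittedEvents); }
}

// Supporting types (records keep the sketch compact)
interface DomainEvent { String getType(); }
record OrderCreatedEvent(String orderId, String customerId) implements DomainEvent {
    public String getType() { return "OrderCreated"; }
}
record OrderConfirmedEvent(String orderId) implements DomainEvent {
    public String getType() { return "OrderConfirmed"; }
}
record CreateOrderCommand(String customerId) {}
```

In a full implementation the aggregate would also be rehydrated by replaying past events from the `order-events` topic before handling a new command.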

Event projection (read-side updater):

@Service
@RequiredArgsConstructor
public class OrderProjectionService {

    private final KafkaReceiver<String, String> kafkaReceiver;
    private final OrderReadRepository readRepository;
    private final ObjectMapper objectMapper;

    @PostConstruct
    public void startProjection() {
        kafkaReceiver.receive()
            .concatMap(record -> {
                String eventType = new String(
                    record.headers().lastHeader("event-type").value());
                return processEvent(eventType, record.value())
                    .doOnSuccess(v -> record.receiverOffset().acknowledge());
            })
            .subscribe();
    }

    private Mono<Void> processEvent(String eventType, String payload) {
        // Mono.fromCallable defers deserialization and converts Jackson's
        // checked JsonProcessingException into an error signal
        return switch (eventType) {
            case "OrderCreated" -> Mono
                .fromCallable(() -> objectMapper.readValue(payload, OrderCreatedEvent.class))
                .flatMap(event -> readRepository.save(OrderReadModel.from(event)))
                .then();
            case "OrderConfirmed" -> Mono
                .fromCallable(() -> objectMapper.readValue(payload, OrderConfirmedEvent.class))
                .flatMap(event -> readRepository.findById(event.getOrderId())
                    .map(model -> model.withStatus("CONFIRMED"))
                    .flatMap(readRepository::save))
                .then();
            default -> Mono.empty();
        };
    }
}

7. Real-World Project: Real-Time Stock Ticker System

7.1 System Architecture

┌──────────────────────────────────────────────────────────────────┐
│                 Real-Time Stock Ticker System                    │
│                                                                  │
│  ┌─────────────┐     ┌──────────────┐     ┌─────────────────┐   │
│  │ Market Data │     │   Kafka      │     │   WebFlux       │   │
│  │ Providers   │────▶│   Cluster    │────▶│   API Gateway   │   │
│  │ (WebSocket) │     │              │     │                 │   │
│  └─────────────┘     │ raw-prices   │     │  SSE /prices    │   │
│                      │ processed    │     │  WS  /ws/trade  │   │
│  ┌─────────────┐     │ alerts       │     │  gRPC /internal │   │
│  │ News Feed   │────▶│              │     └───────┬─────────┘   │
│  │ (REST API)  │     └──────┬───────┘             │             │
│  └─────────────┘            │             ┌───────▼────────┐    │
│                      ┌──────▼───────┐     │   Clients      │    │
│                      │ Kafka Streams│     │  Web Dashboard │    │
│                      │ Processing   │     │  Mobile App    │    │
│                      │              │     │  Trading Bot   │    │
│                      │ - VWAP calc  │     └────────────────┘    │
│                      │ - Anomaly    │                           │
│                      │ - Aggregate  │     ┌────────────────┐    │
│                      └──────┬───────┘     │   PostgreSQL   │    │
│                             │             │   (R2DBC)      │    │
│                             └────────────▶│   + Redis      │    │
│                                           └────────────────┘    │
└──────────────────────────────────────────────────────────────────┘

7.2 Market Data Ingestion Service

@Service
@RequiredArgsConstructor
@Slf4j
public class MarketDataIngestionService {

    private final KafkaSender<String, String> kafkaSender;
    private final ObjectMapper objectMapper;
    private final WebSocketClient webSocketClient;

    @PostConstruct
    public void connect() {
        URI uri = URI.create("wss://market-data-provider.example.com/stream");

        webSocketClient.execute(uri, session -> {
            // Subscribe to symbols
            Mono<Void> subscribe = session.send(
                Mono.just(session.textMessage(
                    "{\"action\":\"subscribe\",\"symbols\":[\"AAPL\",\"GOOG\",\"MSFT\",\"AMZN\"]}"
                ))
            );

            // Process incoming market data
            Flux<Void> receive = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .flatMap(this::publishToKafka)
                .doOnError(e -> log.error("Market data stream error", e));

            return subscribe.thenMany(receive).then();
        })
        .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
            .maxBackoff(Duration.ofMinutes(5)))
        .subscribe();
    }

    private Mono<Void> publishToKafka(String rawData) {
        try {
            MarketTick tick = objectMapper.readValue(rawData, MarketTick.class);
            ProducerRecord<String, String> record = new ProducerRecord<>(
                "raw-prices", tick.getSymbol(), rawData);
            record.headers()
                .add("source", "market-data-provider".getBytes())
                .add("timestamp", String.valueOf(tick.getTimestamp()).getBytes());

            return kafkaSender.send(Mono.just(SenderRecord.create(record, tick.getSymbol())))
                .next()
                .then();
        } catch (Exception e) {
            log.error("Failed to parse market data: {}", rawData, e);
            return Mono.empty();
        }
    }
}

7.3 Price Processing with Kafka Streams

@Component
public class PriceProcessingTopology {

    @Autowired
    public void buildPipeline(StreamsBuilder builder) {
        KStream<String, String> rawPrices = builder.stream("raw-prices");

        // 1. Parse and validate
        KStream<String, StockPrice> validPrices = rawPrices
            .mapValues(this::parsePrice)
            .filter((symbol, price) -> price != null && price.getPrice() > 0);

        // 2. Calculate VWAP (Volume Weighted Average Price) per 1-minute window
        KTable<Windowed<String>, VWAPResult> vwap = validPrices
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .aggregate(
                VWAPAccumulator::new,
                (symbol, price, acc) -> acc.add(price.getPrice(), price.getVolume()),
                Materialized.<String, VWAPAccumulator, WindowStore<Bytes, byte[]>>
                    as("vwap-store")
                    .withValueSerde(vwapAccumulatorSerde())
            )
            .mapValues(VWAPAccumulator::result);

        // 3. Anomaly detection (price change > 5% in 30 seconds)
        validPrices
            .groupByKey()
            .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(30)))
            .aggregate(
                PriceRange::new,
                (symbol, price, range) -> range.update(price.getPrice()),
                Materialized.with(Serdes.String(), priceRangeSerde())
            )
            .toStream()
            .filter((key, range) -> range.percentChange() > 5.0)
            .mapValues((key, range) -> new PriceAlert(
                key.key(), range.percentChange(), "ANOMALY"))
            .to("alerts", Produced.with(windowedSerde(), priceAlertSerde()));

        // 4. Output processed prices
        validPrices
            .mapValues(price -> new ProcessedPrice(
                price.getSymbol(), price.getPrice(), price.getVolume(),
                price.getTimestamp(), System.currentTimeMillis()))
            .to("processed-prices", Produced.with(Serdes.String(), processedPriceSerde()));
    }
}
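The `VWAPAccumulator` referenced above is not shown. A minimal implementation of the VWAP formula (sum of price × volume divided by total volume) might be as follows; the class and nested `VWAPResult` record are illustrative sketches, and in a real topology the accumulator would need a custom Serde to live in the windowed state store:

```java
// Accumulates price*volume and volume so that VWAP = priceVolumeSum / volumeSum.
public class VWAPAccumulator {

    private double priceVolumeSum;
    private long volumeSum;

    // Returns this so it can be used directly as a Kafka Streams aggregator
    public VWAPAccumulator add(double price, long volume) {
        priceVolumeSum += price * volume;
        volumeSum += volume;
        return this;
    }

    public VWAPResult result() {
        // Guard against an empty window to avoid division by zero
        double vwap = volumeSum == 0 ? 0.0 : priceVolumeSum / volumeSum;
        return new VWAPResult(vwap, volumeSum);
    }

    public record VWAPResult(double vwap, long totalVolume) {}
}
```

For example, ticks of 10 shares at 100.0 and 30 shares at 110.0 give a VWAP of (1000 + 3300) / 40 = 107.5.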

7.4 SSE Streaming Endpoint for Web Clients

@RestController
@RequestMapping("/api/stocks")
@RequiredArgsConstructor
@Slf4j
public class StockStreamController {

    private final KafkaReceiver<String, String> processedPriceReceiver;
    private final KafkaReceiver<String, String> alertReceiver;
    private final ObjectMapper objectMapper;
    private final Sinks.Many<ProcessedPrice> priceSink;

    @PostConstruct
    public void init() {
        // Bridge Kafka to reactive sink
        processedPriceReceiver.receive()
            .map(record -> {
                record.receiverOffset().acknowledge();
                try {
                    return objectMapper.readValue(record.value(), ProcessedPrice.class);
                } catch (Exception e) {
                    return null;
                }
            })
            .filter(Objects::nonNull)
            .subscribe(price -> priceSink.tryEmitNext(price));
    }

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<ProcessedPrice>> streamPrices(
            @RequestParam(required = false) List<String> symbols) {
        Flux<ProcessedPrice> priceFlux = priceSink.asFlux();

        if (symbols != null && !symbols.isEmpty()) {
            Set<String> symbolSet = new HashSet<>(symbols);
            priceFlux = priceFlux.filter(p -> symbolSet.contains(p.getSymbol()));
        }

        return priceFlux
            .map(price -> ServerSentEvent.<ProcessedPrice>builder()
                .id(price.getSymbol() + "-" + price.getTimestamp())
                .event("price")
                .data(price)
                .build())
            .doOnCancel(() -> log.info("Client disconnected from price stream"));
    }

    @GetMapping(value = "/alerts", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<PriceAlert>> streamAlerts() {
        return alertReceiver.receive()
            .map(record -> {
                record.receiverOffset().acknowledge();
                try {
                    PriceAlert alert = objectMapper.readValue(record.value(), PriceAlert.class);
                    return ServerSentEvent.<PriceAlert>builder()
                        .event("alert")
                        .data(alert)
                        .build();
                } catch (Exception e) {
                    // readValue throws a checked exception inside the lambda
                    throw new RuntimeException(e);
                }
            });
    }
}
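The injected `Sinks.Many` bean is assumed to be defined elsewhere. A typical definition is a multicast sink, so that every SSE subscriber shares the single Kafka-fed stream (a sketch assuming the `ProcessedPrice` model above; the buffer size is illustrative):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Sinks;

@Configuration
public class PriceSinkConfig {

    @Bean
    public Sinks.Many<ProcessedPrice> priceSink() {
        // multicast: each subscriber sees only events emitted after it
        // subscribes; onBackpressureBuffer bounds the queue for slow
        // subscribers (autoCancel=false keeps the sink alive when the
        // last subscriber disconnects)
        return Sinks.many().multicast().onBackpressureBuffer(1024, false);
    }
}
```

A replay variant (`Sinks.many().replay().limit(1)`) would instead give each new subscriber the most recent price immediately on connect.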

7.5 Application Configuration

# application.yml
spring:
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/stockdb
    username: stockapp
    password: secret
  kafka:
    bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
    consumer:
      group-id: stock-ticker-service
      auto-offset-reset: latest
      enable-auto-commit: false
    producer:
      acks: all
      compression-type: zstd
      batch-size: 32768
      properties:
        linger.ms: 10

  webflux:
    base-path: /api

server:
  port: 8080
  netty:
    connection-timeout: 5000

management:
  endpoints:
    web:
      exposure:
        include: health,metrics,prometheus
  metrics:
    tags:
      application: stock-ticker

7.6 Docker Compose for Local Development

version: '3.8'
services:
  kafka-1:
    image: apache/kafka:3.7.0
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
    ports:
      - '9092:9092'

  kafka-2:
    image: apache/kafka:3.7.0
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

  kafka-3:
    image: apache/kafka:3.7.0
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: stockdb
      POSTGRES_USER: stockapp
      POSTGRES_PASSWORD: secret
    ports:
      - '5432:5432'

  redis:
    image: redis:7-alpine
    ports:
      - '6379:6379'

8. Decision Matrix: When to Use What

8.1 Technology Selection Guide

| Scenario | WebFlux | Kafka | SSE | WebSocket | gRPC |
| --- | --- | --- | --- | --- | --- |
| Real-time dashboard | Yes | Yes | Best | Good | Overkill |
| Chat application | Yes | Optional | No | Best | Good |
| IoT data pipeline | Yes | Best | No | Good | Best |
| E-commerce order processing | Optional | Best | No | No | Good |
| Live sports scores | Yes | Yes | Best | Good | No |
| Financial trading platform | Yes | Best | No | Best | Best |
| Microservice event bus | No | Best | No | No | Good |
| Server notifications | Yes | Optional | Best | Good | No |
| Video/audio streaming | No | No | No | Partial | Good |
| Log aggregation pipeline | No | Best | No | No | Good |

8.2 Decision Flowchart

Start: Do you need real-time data?
├── No  → Traditional REST API (Spring MVC) is fine
└── Yes → Is it server-to-client only?
    ├── Yes → SSE (simplest, auto-reconnect)
    └── No  → Do you need bidirectional communication?
        ├── Yes → Is it browser-facing?
        │   ├── Yes → WebSocket
        │   └── No  → gRPC Bidirectional Streaming
        └── No  → Do you need durable event storage?
            ├── Yes → Kafka + consumer of choice
            └── No  → Direct WebFlux streaming

8.3 Performance Comparison by Workload

| Workload | Spring MVC | WebFlux | WebFlux + Kafka | Notes |
| --- | --- | --- | --- | --- |
| 100 concurrent users | 50ms avg | 48ms | 52ms | Negligible difference |
| 10K concurrent users | 2s avg | 120ms | 150ms | WebFlux shines |
| 100K concurrent users | Timeout | 250ms | 300ms | MVC cannot handle |
| 1M events/sec ingest | N/A | Limited | 1.2M/s | Kafka essential |
| Complex event proc. | N/A | Possible | 500K/s | Kafka Streams advantage |

9. Six-Month Learning Roadmap

Month 1-2: Reactive Foundations

| Week | Topic | Deliverable |
| --- | --- | --- |
| 1 | Reactive Streams spec, Publisher/Subscriber | Implement custom Publisher |
| 2 | Project Reactor: Mono/Flux fundamentals | 20 operator exercises |
| 3 | Error handling, retry, timeout patterns | Resilient API client |
| 4 | Testing reactive code (StepVerifier) | 90% test coverage on exercises |
| 5-6 | Spring WebFlux: controllers, functional endpoints | CRUD REST API (WebFlux) |
| 7-8 | R2DBC, WebClient, reactive security | Full-stack reactive application |

Month 3-4: Kafka Mastery

| Week | Topic | Deliverable |
| --- | --- | --- |
| 9-10 | Kafka fundamentals: topics, partitions, consumers | Local 3-broker cluster setup |
| 11-12 | Spring Kafka: producer/consumer patterns | Order processing system |
| 13-14 | Kafka Streams: stateless and stateful operations | Real-time analytics pipeline |
| 15-16 | Exactly-once, transactions, Schema Registry | Transactional event system |

Month 5: Integration and Streaming APIs

| Week | Topic | Deliverable |
| --- | --- | --- |
| 17 | SSE with WebFlux | Live notification system |
| 18 | WebSocket with WebFlux | Chat application |
| 19 | gRPC Streaming with Spring Boot | Internal service communication |
| 20 | Reactor Kafka + CQRS pattern | Event-sourced microservice |

Month 6: Production Readiness

| Week | Topic | Deliverable |
| --- | --- | --- |
| 21 | KRaft migration, cluster operations | KRaft cluster deployment |
| 22 | Monitoring: Micrometer, Prometheus, Grafana | Observability dashboard |
| 23 | Performance tuning, load testing | Benchmark report |
| 24 | Capstone: Real-time stock ticker system | Production-ready project |

10. Quiz

Q1: What is the main advantage of Spring WebFlux over Spring MVC for I/O-bound workloads?

Answer: WebFlux uses a non-blocking event loop model (Netty) that can handle thousands of concurrent connections with just a few threads. Spring MVC uses a thread-per-request model where each concurrent request occupies a thread. For I/O-bound workloads (where threads spend most time waiting for database, network, or file operations), WebFlux is dramatically more efficient because it does not waste threads on waiting. In benchmarks, WebFlux can handle 7-8x more throughput with 77% less memory for I/O-bound workloads.
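The waiting-without-blocking idea can be illustrated without WebFlux at all. The sketch below is an analogy, not Netty's actual event loop: a single JDK scheduler thread parks 10,000 simulated 100 ms "I/O waits" concurrently, so total time stays near 100 ms instead of one thread being blocked per request (class and method names are made up for illustration).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class EventLoopSketch {
    // Run `requests` overlapping 100 ms waits on ONE scheduler thread and
    // return the total elapsed milliseconds.
    static long simulate(int requests) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(requests);
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        long start = System.nanoTime();
        for (int i = 0; i < requests; i++) {
            // A timer is registered instead of a thread blocking in sleep().
            loop.schedule(done::countDown, 100, TimeUnit.MILLISECONDS);
        }
        done.await();
        loop.shutdown();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws InterruptedException {
        // All 10,000 waits overlap: elapsed time is ~100 ms plus scheduling
        // overhead, not 10,000 x 100 ms — no thread was blocked per request.
        System.out.println("elapsedMs=" + simulate(10_000));
    }
}
```

A thread-per-request model would need 10,000 threads (or a queue) to do the same; this is the structural reason WebFlux handles I/O-bound concurrency with a small, fixed thread pool.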

Q2: Explain the difference between flatMap, concatMap, and switchMap in Project Reactor.

Answer:

  • flatMap: Maps each element to a Publisher and merges them eagerly. Inner publishers run concurrently and results may arrive out of order. Best for maximum throughput when order does not matter.
  • concatMap: Maps each element to a Publisher and subscribes sequentially. Waits for each inner publisher to complete before subscribing to the next. Preserves order but lower throughput.
  • switchMap: Maps each element to a Publisher, but cancels the previous inner publisher when a new element arrives. Only the latest subscription is active. Ideal for search-as-you-type where only the latest query result matters.
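The three behaviors can be observed side by side with a small sketch (assumes reactor-core on the classpath; the `lookup` helper and its staggered delays are contrived so that concurrency differences are visible):

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MappingOperators {
    // Each element maps to a Mono completing after a delay; later elements
    // get SHORTER delays, so eager merging finishes them first.
    static Mono<Integer> lookup(int i) {
        return Mono.just(i * 10).delayElement(Duration.ofMillis(40 - i * 10L));
    }

    public static void main(String[] args) {
        Flux<Integer> source = Flux.just(1, 2, 3);

        // flatMap: inner Monos run concurrently; results arrive as they
        // complete, so the shortest delay (element 3) typically wins the race.
        List<Integer> flat = source.flatMap(MappingOperators::lookup)
                .collectList().block();

        // concatMap: one inner Mono at a time; source order is preserved.
        List<Integer> concat = source.concatMap(MappingOperators::lookup)
                .collectList().block();

        // switchMap: each new element cancels the previous inner Mono; the
        // source emits synchronously, so only the last element survives.
        List<Integer> switched = source.switchMap(MappingOperators::lookup)
                .collectList().block();

        System.out.println("flatMap:   " + flat);     // e.g. [30, 20, 10]
        System.out.println("concatMap: " + concat);   // [10, 20, 30]
        System.out.println("switchMap: " + switched); // [30]
    }
}
```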

Q3: Why did Kafka replace ZooKeeper with KRaft? Name three specific technical benefits.

Answer:

  1. Partition scalability: ZooKeeper stored all metadata in memory, limiting clusters to around 200,000 partitions. KRaft uses a log-based metadata store, enabling millions of partitions.
  2. Faster failover: Controller failover in ZooKeeper mode required reading all metadata from ZooKeeper (minutes). KRaft failover takes seconds because the new controller already has metadata replicated via Raft.
  3. Simplified operations: With ZooKeeper, teams had to deploy, configure, monitor, and maintain two separate distributed systems. KRaft eliminates the ZooKeeper dependency entirely, reducing operational complexity.

Additional benefits include faster controlled shutdown, elimination of split-brain risks between Kafka and ZooKeeper, and event-driven metadata propagation (versus ZooKeeper's asynchronous, potentially laggy propagation).
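Concretely, "eliminating the ZooKeeper dependency" shows up in the broker config: there is no `zookeeper.connect` at all, only the Raft quorum. A minimal illustrative `server.properties` for a single combined broker/controller node (the node ID, ports, and paths here are placeholder values, not a production layout):

```properties
# KRaft combined mode: this node is both broker and Raft controller.
process.roles=broker,controller
node.id=1

# The Raft metadata quorum (id@host:port) — replaces zookeeper.connect.
controller.quorum.voters=1@localhost:9093

listeners=PLAINTEXT://:9092,CONTROLLER://:9093
controller.listener.names=CONTROLLER
inter.broker.listener.name=PLAINTEXT
log.dirs=/var/lib/kafka/kraft-logs

# Before first start, format the storage directory with a cluster ID —
# the step that replaces pointing the broker at a ZooKeeper ensemble:
#   bin/kafka-storage.sh random-uuid
#   bin/kafka-storage.sh format -t <uuid> -c server.properties
```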

Q4: When would you choose SSE over WebSocket for a real-time feature?

Answer: Choose SSE when:

  1. Communication is server-to-client only — stock tickers, news feeds, notifications, dashboards. No need for the client to send data back on the same connection.
  2. You need auto-reconnect — SSE has built-in browser reconnection. WebSocket requires manual reconnection logic.
  3. HTTP infrastructure compatibility — SSE works over standard HTTP/1.1, passing through proxies and load balancers without special configuration. WebSocket upgrades can be blocked by corporate firewalls.
  4. Simplicity — SSE is simpler to implement (just a GET endpoint returning text/event-stream). WebSocket requires connection management, heartbeats, and reconnection logic.

Choose WebSocket when you need bidirectional communication (chat, gaming, collaborative editing) or binary data transfer.
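Part of the simplicity argument is the wire format itself: per the WHATWG Server-Sent Events spec, a `text/event-stream` response is just lines of `event:`, `id:`, and `data:` fields, with a blank line terminating each event. A stdlib-only sketch (the `frame` helper is hypothetical, written here to show the framing rules):

```java
public class SseFrames {
    // Render one server-sent event in text/event-stream wire format:
    // optional "event:" and "id:" fields, "data:" lines, then a blank line.
    static String frame(String event, String id, String data) {
        StringBuilder sb = new StringBuilder();
        if (event != null) sb.append("event: ").append(event).append('\n');
        if (id != null) sb.append("id: ").append(id).append('\n');
        // Multi-line payloads become multiple data: lines, per the spec.
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        return sb.append('\n').toString(); // blank line terminates the event
    }

    public static void main(String[] args) {
        // The browser's EventSource fires an "order-update" event with this
        // payload and remembers id 42, sending it back as Last-Event-ID on
        // auto-reconnect — which is how SSE resumes where it left off.
        System.out.print(frame("order-update", "42", "{\"status\":\"SHIPPED\"}"));
    }
}
```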

Q5: Design a system that uses WebFlux, Kafka, and SSE together. Describe the data flow from event generation to client display.

Answer: Consider a real-time order tracking system:

  1. Event Generation: When an order status changes, the Order Service publishes an OrderStatusChanged event to a Kafka topic (order-events) using Reactor Kafka. The producer uses acks=all for durability.

  2. Event Processing: A Kafka Streams application consumes from order-events, enriches events with customer data (KTable join from customer-profiles topic), and produces enriched events to order-notifications topic.

  3. WebFlux Bridge: A WebFlux service uses KafkaReceiver (Reactor Kafka) to reactively consume from order-notifications. It routes events into a Sinks.Many (multicast hot publisher), keyed by customer ID.

  4. SSE Endpoint: The WebFlux controller exposes an SSE endpoint /api/orders/track that filters the Sinks.Many flux by the authenticated user's customer ID. Each matching event is sent as a ServerSentEvent.

  5. Client: The browser creates an EventSource pointing to the SSE endpoint. On each event, the UI updates the order tracking visualization. If the connection drops, the browser auto-reconnects (SSE built-in).

Data flow: Order Service → Kafka → Kafka Streams → Kafka → Reactor Kafka → Sinks.Many → SSE → Browser.
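Steps 3 and 4 — the Sinks.Many fan-out and per-customer filtering — can be sketched with reactor-core alone (the `OrderEvent` record and customer IDs are illustrative, and direct `tryEmitNext` calls stand in for the KafkaReceiver flux):

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class OrderEventBridge {
    // Enriched event as it would arrive from the order-notifications topic.
    record OrderEvent(String customerId, String status) {}

    public static void main(String[] args) {
        // Hot multicast sink: many SSE subscribers share one event stream.
        Sinks.Many<OrderEvent> sink = Sinks.many().multicast().onBackpressureBuffer();
        Flux<OrderEvent> events = sink.asFlux();

        // Each SSE subscriber sees only its own customer's events (step 4).
        List<String> alice = new ArrayList<>();
        List<String> bob = new ArrayList<>();
        events.filter(e -> e.customerId().equals("alice"))
              .map(OrderEvent::status).subscribe(alice::add);
        events.filter(e -> e.customerId().equals("bob"))
              .map(OrderEvent::status).subscribe(bob::add);

        // Stand-in for the KafkaReceiver flux pushing into the sink (step 3).
        sink.tryEmitNext(new OrderEvent("alice", "PACKED"));
        sink.tryEmitNext(new OrderEvent("bob", "SHIPPED"));
        sink.tryEmitNext(new OrderEvent("alice", "SHIPPED"));
        sink.tryEmitComplete();

        System.out.println("alice=" + alice + " bob=" + bob);
        // alice=[PACKED, SHIPPED] bob=[SHIPPED]
    }
}
```

In the real service, each filtered flux would be mapped to `ServerSentEvent` objects and returned from the `/api/orders/track` controller method rather than collected into a list.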


References

  1. Reactive Manifesto
  2. Reactive Streams Specification
  3. Project Reactor Reference Guide
  4. Spring WebFlux Documentation
  5. Spring WebFlux vs MVC Performance
  6. R2DBC Specification
  7. Netty Project
  8. Apache Kafka Documentation
  9. Kafka: The Definitive Guide (2nd Edition)
  10. KIP-500: Replace ZooKeeper
  11. Kafka KRaft Documentation
  12. Kafka Streams Documentation
  13. Confluent Schema Registry
  14. Kafka Connect Documentation
  15. Reactor Kafka Reference Guide
  16. Server-Sent Events (MDN)
  17. WebSocket API (MDN)
  18. gRPC Documentation
  19. CQRS Pattern (Microsoft)
  20. Event Sourcing Pattern (Microsoft)
  21. Debezium CDC Documentation
  22. Spring Boot gRPC Starter
  23. Kafka Performance Tuning Guide (Confluent)
  24. ZooKeeper Documentation
  25. Raft Consensus Algorithm