WebFlux + Kafka + ZooKeeper Mastery: The Complete Guide to Reactive Streaming Architecture
- Author: Youngju Kim (@fjvbn20031)
- Introduction: Why Reactive Streaming Architecture Matters in 2026
- 1. Reactive Programming Fundamentals
- 2. Spring WebFlux Deep Dive
- 3. Apache Kafka: The Complete Guide
- 4. ZooKeeper to KRaft: The Great Migration
- 5. Streaming API Comparison: SSE vs WebSocket vs gRPC Streaming
- 6. WebFlux + Kafka Integration
- 7. Real-World Project: Real-Time Stock Ticker System
- 8. Decision Matrix: When to Use What
- 9. Six-Month Learning Roadmap
- 10. Quiz
- References
Introduction: Why Reactive Streaming Architecture Matters in 2026
Modern applications are expected to process millions of events per second, deliver real-time updates to users, and scale elastically under unpredictable load. Traditional request-response architectures built on blocking I/O tie up a thread for every in-flight request, so they struggle to meet these demands without heavy over-provisioning. In 2026, the combination of Spring WebFlux for non-blocking request handling, Apache Kafka for durable event streaming, and KRaft in place of ZooKeeper for simpler cluster management is a widely adopted foundation for reactive streaming systems.
This guide covers the full stack: from reactive programming fundamentals to production-grade architecture patterns. Whether you are building a real-time analytics dashboard, a financial trading platform, or an IoT data pipeline, you will find actionable knowledge here.
What You Will Learn
- Reactive programming principles and the Reactive Streams specification
- Spring WebFlux internals: Netty event loop, Mono/Flux, operator chains, and R2DBC
- Apache Kafka architecture: partitions, consumer groups, Exactly-Once Semantics, Kafka Streams, and Connect
- The ZooKeeper to KRaft migration: why, when, and how
- Streaming API comparison: SSE vs WebSocket vs gRPC Streaming
- Integration patterns: WebFlux + Reactor Kafka + CQRS + Event Sourcing
- A complete real-world project: real-time stock ticker system
1. Reactive Programming Fundamentals
1.1 The Reactive Manifesto
The Reactive Manifesto defines four key properties that reactive systems must exhibit:
| Property | Description | Implementation Example |
|---|---|---|
| Responsive | System responds in a timely manner | WebFlux non-blocking I/O |
| Resilient | System stays responsive in the face of failure | Circuit breaker, bulkhead patterns |
| Elastic | System stays responsive under varying workload | Kafka partition scaling |
| Message Driven | System relies on asynchronous message passing | Kafka topics, Reactor event streams |
These four properties are not independent. Message-driven architecture enables elasticity and resilience, which together enable responsiveness.
1.2 Reactive Streams Specification
Reactive Streams is a standard for asynchronous stream processing with non-blocking backpressure. It defines four core interfaces:
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
public interface Subscriber<T> {
void onSubscribe(Subscription subscription);
void onNext(T item);
void onError(Throwable throwable);
void onComplete();
}
public interface Subscription {
void request(long n);
void cancel();
}
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
The critical innovation is the request(long n) method in Subscription. This enables backpressure: the subscriber controls the rate of data flow, preventing the producer from overwhelming the consumer.
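To make the request(n) handshake concrete, here is a minimal single-threaded sketch built on the JDK's `java.util.concurrent.Flow` interfaces, which mirror the four Reactive Streams interfaces. The class name `RequestNDemo`, the `range` publisher, and the batch-size logic are illustrative assumptions, not part of any library, and the code is deliberately not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Single-threaded sketch of the request(n) handshake; not thread-safe. */
final class RequestNDemo {

    /** Emits 1..count, but never more items than the subscriber has requested. */
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private long demand;
            private boolean draining;
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                demand += n;
                if (draining) return; // re-entrant request from onNext: the outer loop picks it up
                draining = true;
                while (demand > 0 && next <= count && !done) {
                    demand--;
                    subscriber.onNext(next++);
                }
                draining = false;
                if (next > count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes with a fixed batch size and records every signal in order. */
    static List<String> run(int count, int batch) {
        List<String> log = new ArrayList<>();
        range(count).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int receivedInBatch;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("request(" + batch + ")");
                s.request(batch);
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
                if (++receivedInBatch == batch) { // batch consumed: ask for the next one
                    receivedInBatch = 0;
                    log.add("request(" + batch + ")");
                    subscription.request(batch);
                }
            }

            @Override public void onError(Throwable t) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        run(5, 2).forEach(System.out::println);
    }
}
```

Running `main` shows the publisher pausing after every second item until the subscriber issues another `request(2)`. The consumer, not the producer, sets the pace.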
1.3 Understanding Backpressure
Backpressure is the mechanism by which a data consumer signals to a producer how much data it can handle. Without backpressure, fast producers overwhelm slow consumers, leading to memory exhaustion or dropped messages.
Producer (1000 msg/s) --> Buffer (overflows!) --> Consumer (100 msg/s)
With backpressure:
Producer (adjusts to 100 msg/s) <-- request(100) -- Consumer (100 msg/s)
Backpressure strategies in Project Reactor:
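// The onBackpressure* operators are alternatives: pick one per pipeline rather than chaining them.
Flux<Integer> source = Flux.range(1, Integer.MAX_VALUE);
source.onBackpressureBuffer(256); // Buffer up to 256 items, then signal an overflow error
source.onBackpressureDrop(dropped -> // Drop items that arrive while there is no demand
log.warn("Dropped: {}", dropped));
source.onBackpressureLatest(); // Keep only the most recent item while there is no demand
source.onBackpressureError(); // Signal an error as soon as the consumer falls behind
// Apply the chosen strategy, then subscribe
source.onBackpressureLatest()
.subscribe(item -> processSlowly(item));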
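The difference between the four overflow strategies is easiest to see side by side. The following is a plain-Java simulation of their semantics, assuming a consumer that has requested only `demand` items while the producer keeps emitting. `OverflowStrategies`, `Strategy`, and `Outcome` are hypothetical names for this sketch. It models the behaviour only and is not how Reactor implements the operators.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class OverflowStrategies {
    enum Strategy { BUFFER, DROP, LATEST, ERROR }

    /** What happened to the items pushed at a consumer that requested only `demand` of them. */
    record Outcome(List<Integer> delivered, List<Integer> retained,
                   List<Integer> dropped, boolean failed) {}

    static Outcome push(Strategy strategy, int bufferSize, int demand, List<Integer> items) {
        List<Integer> delivered = new ArrayList<>();
        Deque<Integer> retained = new ArrayDeque<>();
        List<Integer> dropped = new ArrayList<>();
        for (int item : items) {
            if (delivered.size() < demand) { // demand available: pass straight through
                delivered.add(item);
                continue;
            }
            switch (strategy) {
                case BUFFER -> {
                    if (retained.size() == bufferSize) { // buffer full: overflow error
                        return new Outcome(delivered, List.copyOf(retained), dropped, true);
                    }
                    retained.addLast(item);
                }
                case DROP -> dropped.add(item);
                case LATEST -> {
                    if (!retained.isEmpty()) dropped.add(retained.removeFirst());
                    retained.addLast(item); // only the newest item survives
                }
                case ERROR -> {
                    return new Outcome(delivered, List.of(), dropped, true);
                }
            }
        }
        return new Outcome(delivered, List.copyOf(retained), dropped, false);
    }
}
```

For example, pushing the items 1 to 6 with a demand of 2 under `LATEST` delivers 1 and 2, retains 6 for the next request, and discards 3, 4, and 5.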
1.4 Reactive Libraries Landscape
| Library | Language/Platform | Key Features | Adoption |
|---|---|---|---|
| Project Reactor | Java | Mono/Flux, Spring integration | Spring ecosystem |
| RxJava 3 | Java | Observable/Flowable, Android support | Android, legacy Java |
| Mutiny | Java | Uni/Multi, Quarkus-native | Quarkus ecosystem |
| Kotlin Flow | Kotlin | Coroutine-based, structured concurrency | Kotlin-first projects |
| Akka Streams | Scala/Java | Actor-based, graph DSL | High-throughput systems |
2. Spring WebFlux Deep Dive
2.1 WebFlux vs Spring MVC Architecture
Spring MVC uses a thread-per-request model. Each incoming request occupies one thread from the pool until the response is fully written. Under high concurrency with I/O-bound workloads, threads are mostly idle waiting for database queries, HTTP calls, or file reads.
Spring WebFlux uses an event-loop model powered by Netty. A small number of event loop threads handle all requests. When I/O is needed, the request is parked (not blocking a thread), and the event loop moves on to handle other requests. When the I/O completes, the event loop picks up the response.
Spring MVC (Thread-per-request):
Thread-1: [Request] --> [DB Wait......] --> [Response]
Thread-2: [Request] --> [API Wait.........] --> [Response]
Thread-3: [Request] --> [File Wait....] --> [Response]
(200 threads = 200 concurrent requests)
Spring WebFlux (Event Loop):
EventLoop-1: [Req1][Req2][Req3][Req1-resume][Req4][Req2-resume]...
EventLoop-2: [Req5][Req6][Req5-resume][Req7][Req8]...
(4 threads = thousands of concurrent requests)
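The event-loop idea can be sketched with nothing but the JDK. In the example below a single-threaded `ScheduledExecutorService` stands in for one Netty event loop, and a timer stands in for a non-blocking I/O call that completes later. `EventLoopSketch` and `serve` are illustrative names, and the sketch says nothing about Netty's actual internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class EventLoopSketch {

    /** How many requests finished and how many distinct threads ran their callbacks. */
    record Result(int completed, int threadsUsed) {}

    static Result serve(int requests, long ioDelayMillis) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        Set<String> threads = ConcurrentHashMap.newKeySet();
        List<CompletableFuture<Void>> responses = new ArrayList<>();
        try {
            for (int i = 0; i < requests; i++) {
                CompletableFuture<Void> response = new CompletableFuture<>();
                responses.add(response);
                // "Start the I/O" and return immediately; no thread waits for the result
                loop.schedule(() -> {
                    threads.add(Thread.currentThread().getName());
                    response.complete(null);
                }, ioDelayMillis, TimeUnit.MILLISECONDS);
            }
            // Wait for every simulated request to finish
            CompletableFuture.allOf(responses.toArray(new CompletableFuture[0]))
                    .orTimeout(10, TimeUnit.SECONDS)
                    .join();
            int completed = (int) responses.stream().filter(CompletableFuture::isDone).count();
            return new Result(completed, threads.size());
        } finally {
            loop.shutdownNow();
        }
    }
}
```

Calling `EventLoopSketch.serve(1000, 50)` keeps 1,000 simulated requests in flight at once on one thread. A thread-per-request server would need 1,000 threads to hold the same number of waiting requests.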
2.2 Netty Event Loop Model
Netty is the default embedded server for WebFlux. Understanding its architecture is essential for performance tuning.
┌─────────────────────────┐
│ Boss EventLoopGroup │
│ (accepts connections) │
└────────────┬────────────┘
│
┌────────────▼────────────┐
│ Worker EventLoopGroup │
│ (handles I/O events) │
├─────────────────────────┤
│ EventLoop-1 Channel-A │
│ EventLoop-2 Channel-B │
│ EventLoop-3 Channel-C │
│ EventLoop-4 Channel-D │
└─────────────────────────┘
Key rules for working with Netty event loops:
- Never block an event loop thread: no Thread.sleep(), no blocking I/O, no synchronized locks
- Offload blocking calls to Schedulers.boundedElastic() and CPU-intensive work to Schedulers.parallel()
- Channel affinity: each channel is bound to one event loop for its lifetime
2.3 Mono and Flux: The Core Types
Mono represents 0 or 1 element. Flux represents 0 to N elements.
// Mono: single value or empty
Mono<User> findUser = userRepository.findById(userId);
Mono<User> savedUser = userRepository.save(user); // save() emits the persisted entity
// Flux: stream of values
Flux<Order> orders = orderRepository.findByUserId(userId);
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
Flux<ServerSentEvent<String>> sseStream = Flux.create(sink -> {
// push events to sink
});
Creating Mono and Flux:
// Static factory methods
Mono.just("hello");
Mono.empty();
Mono.error(new RuntimeException("failure"));
Mono.fromCallable(() -> expensiveComputation());
Mono.defer(() -> Mono.just(dynamicValue()));
Flux.just("a", "b", "c");
Flux.fromIterable(myList);
Flux.range(1, 100);
Flux.interval(Duration.ofMillis(500));
Flux.merge(flux1, flux2);
Flux.concat(flux1, flux2);
Flux.zip(flux1, flux2, (a, b) -> a + b);
2.4 Essential Operators
// Transformation
flux.map(String::toUpperCase)
.flatMap(name -> userService.findByName(name)) // async 1:N
.concatMap(user -> enrichUser(user)) // sequential async
.switchMap(query -> search(query)) // cancel previous
.flatMapSequential(id -> fetchById(id)) // ordered parallel
// Filtering
flux.filter(user -> user.isActive())
.distinct()
.take(10)
.skip(5)
.takeUntil(event -> event.isTerminal())
// Error Handling
flux.onErrorReturn("default")
.onErrorResume(ex -> fallbackFlux())
.retry(3)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(30))
.filter(ex -> ex instanceof TransientException))
.timeout(Duration.ofSeconds(5))
// Combining
Flux.merge(fastSource, slowSource) // interleaved
Flux.concat(firstBatch, secondBatch) // sequential
Flux.zip(names, ages, Person::new) // paired
Mono.zip(userMono, profileMono, ordersMono) // parallel join
.map(tuple -> buildResponse(tuple.getT1(), tuple.getT2(), tuple.getT3()))
// Side Effects (for logging/debugging only)
flux.doOnNext(item -> log.debug("Processing: {}", item))
.doOnError(ex -> log.error("Error: {}", ex.getMessage()))
.doOnComplete(() -> log.info("Stream completed"))
.doOnSubscribe(sub -> log.info("Subscribed"))
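The `Retry.backoff(3, Duration.ofSeconds(1))` call in the error-handling chain produces an exponentially growing delay between attempts, capped by `maxBackoff`. The sketch below computes that schedule in plain Java. `BackoffSchedule` is a hypothetical helper, and it ignores the random jitter that Reactor applies by default, so real delays will vary around these values.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

final class BackoffSchedule {

    /** Delay before each retry: min, 2*min, 4*min, ... capped at max. Jitter is ignored. */
    static List<Duration> delays(int maxAttempts, Duration min, Duration max) {
        List<Duration> out = new ArrayList<>();
        Duration next = min;
        for (int i = 0; i < maxAttempts; i++) {
            out.add(next.compareTo(max) > 0 ? max : next);
            // Stop doubling once the cap is reached
            next = next.compareTo(max) >= 0 ? max : next.multipliedBy(2);
        }
        return out;
    }
}
```

With a 1-second minimum and a 30-second cap, three attempts wait 1 s, 2 s, and 4 s. The cap only matters from the sixth attempt onward.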
2.5 WebFlux Controller and Functional Endpoints
Annotation-based (familiar MVC style):
@RestController
@RequestMapping("/api/users")
public class UserController {
private final UserService userService;
@GetMapping("/{id}")
public Mono<ResponseEntity<User>> getUser(@PathVariable String id) {
return userService.findById(id)
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.notFound().build());
}
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamUsers() {
return userService.streamAllUsers();
}
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Mono<User> createUser(@Valid @RequestBody Mono<User> userMono) {
return userMono.flatMap(userService::create);
}
}
Functional endpoints (router + handler):
@Configuration
public class UserRouter {
@Bean
public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
return RouterFunctions.route()
.path("/api/users", builder -> builder
.GET("/{id}", handler::getUser)
.GET("", handler::listUsers)
.POST("", handler::createUser)
.PUT("/{id}", handler::updateUser)
.DELETE("/{id}", handler::deleteUser))
.build();
}
}
@Component
public class UserHandler {
private final UserService userService;
public Mono<ServerResponse> getUser(ServerRequest request) {
String id = request.pathVariable("id");
return userService.findById(id)
.flatMap(user -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build());
}
public Mono<ServerResponse> listUsers(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(userService.findAll(), User.class);
}
}
2.6 R2DBC: Reactive Database Access
R2DBC (Reactive Relational Database Connectivity) brings non-blocking database access to WebFlux applications.
@Configuration
@EnableR2dbcRepositories
public class DatabaseConfig extends AbstractR2dbcConfiguration {
@Override
@Bean
public ConnectionFactory connectionFactory() {
// Pooling options such as MAX_SIZE only apply when the r2dbc-pool driver wraps the database driver
return ConnectionFactories.get(ConnectionFactoryOptions.builder()
.option(DRIVER, "pool")
.option(PROTOCOL, "postgresql")
.option(HOST, "localhost")
.option(PORT, 5432)
.option(DATABASE, "mydb")
.option(USER, "user")
.option(PASSWORD, "password")
.option(MAX_SIZE, 20)
.build());
}
}
public interface UserRepository extends ReactiveCrudRepository<User, String> {
Flux<User> findByStatusOrderByCreatedAtDesc(String status);
@Query("SELECT * FROM users WHERE email = :email")
Mono<User> findByEmail(String email);
@Query("SELECT * FROM users WHERE department = :dept")
Flux<User> findByDepartment(String dept);
}
R2DBC with DatabaseClient for complex queries:
@Repository
public class CustomUserRepository {
private final DatabaseClient databaseClient;
public Flux<User> searchUsers(String keyword, int limit) {
return databaseClient.sql(
"SELECT * FROM users WHERE name ILIKE :keyword LIMIT :limit")
.bind("keyword", "%" + keyword + "%")
.bind("limit", limit)
.map(row -> new User(
row.get("id", String.class),
row.get("name", String.class),
row.get("email", String.class)
))
.all();
}
}
2.7 WebFlux Performance Benchmarks
Benchmark conditions: 4-core CPU, 8GB RAM, 10,000 concurrent connections, I/O-bound workload (50ms database latency per request).
| Metric | Spring MVC (Tomcat) | Spring WebFlux (Netty) | Improvement |
|---|---|---|---|
| Throughput (req/s) | 2,400 | 18,500 | 7.7x |
| Avg Latency (ms) | 4,200 | 540 | 87% lower |
| P99 Latency (ms) | 12,000 | 1,200 | 90% lower |
| Memory Usage (MB) | 2,100 | 480 | 77% lower |
| Threads Active | 200 | 8 | 96% fewer |
| Max Concurrent Requests | 200 | 50,000+ | 250x |
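A back-of-the-envelope check helps explain the throughput gap. By Little's Law, a server that can hold N requests in flight, each taking L seconds, cannot exceed N / L requests per second. The helper below is a hypothetical sketch of that arithmetic only. It is not part of the benchmark, and real throughput will be lower because of queueing and CPU overhead.

```java
final class ThroughputBound {

    /** Little's Law upper bound: requests per second = requests in flight / seconds per request. */
    static double maxRequestsPerSecond(int maxConcurrent, double latencyMillis) {
        return maxConcurrent * 1000.0 / latencyMillis;
    }

    public static void main(String[] args) {
        // 200 worker threads, 50 ms of I/O wait per request
        System.out.println(maxRequestsPerSecond(200, 50));
        // 10,000 connections multiplexed over an event loop, same I/O wait
        System.out.println(maxRequestsPerSecond(10_000, 50));
    }
}
```

With 200 threads and 50 ms of I/O wait, the thread-per-request ceiling is 4,000 req/s no matter how fast the CPU is. An event loop holding 10,000 requests in flight raises that ceiling to 200,000 req/s, so other resources become the bottleneck first.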
When NOT to use WebFlux:
- CPU-bound workloads (compression, encryption, heavy computation)
- Applications already performing well with MVC
- Teams without reactive programming experience
- When blocking libraries are unavoidable (for example, JDBC-only drivers with no R2DBC equivalent)
- Simple CRUD applications with low concurrency
3. Apache Kafka: The Complete Guide
3.1 Kafka Architecture Overview
Apache Kafka is a distributed event streaming platform capable of handling trillions of events per day. Its architecture is built around a few key concepts:
┌─────────────────────────────────────────────────────────────┐
│ Kafka Cluster │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │
│ │ │ │ │ │ │ │
│ │ Topic-A │ │ Topic-A │ │ Topic-A │ │
│ │ Part-0 │ │ Part-1 │ │ Part-2 │ │
│ │ (leader) │ │ (leader) │ │ (leader) │ │
│ │ │ │ │ │ │ │
│ │ Topic-A │ │ Topic-A │ │ Topic-A │ │
│ │ Part-1 │ │ Part-2 │ │ Part-0 │ │
│ │ (replica)│ │ (replica)│ │ (replica)│ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Metadata: ZooKeeper (legacy) or KRaft (Kafka 3.3+) │ │
│ └──────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Core Concepts:
| Concept | Description |
|---|---|
| Topic | Named feed of messages, similar to a database table |
| Partition | Ordered, immutable sequence of records within a topic |
| Offset | Sequential ID for each record within a partition |
| Broker | A Kafka server that stores data and serves clients |
| Producer | Client that publishes records to topics |
| Consumer | Client that reads records from topics |
| Consumer Group | Set of consumers that cooperatively consume from topics |
| Replication | Each partition is replicated across multiple brokers for durability |
| ISR | In-Sync Replicas: replicas that are caught up with the leader |
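Two of these concepts are worth sketching: how a record key maps to a partition, and how the partitions of a topic are shared within a consumer group. The code below is a simplified model under stated assumptions. `PartitionSketch` is a hypothetical class, `String.hashCode()` stands in for the murmur2 hash that Kafka's default partitioner applies to the serialized key, and the round-robin split is only one of several assignment strategies.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionSketch {

    /** Same key, same partition: this is what preserves per-key ordering. Stand-in hash. */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Round-robin split of partitions across the members of one consumer group. */
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(p % numConsumers).add(p);
        }
        return assignment;
    }
}
```

`assign(3, 4)` leaves the fourth consumer with an empty list. A partition is read by at most one member of a group, so adding consumers beyond the partition count adds no parallelism.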
3.2 Producer Configuration and Best Practices
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// Reliability settings
config.put(ProducerConfig.ACKS_CONFIG, "all"); // Wait for all ISR replicas
config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Performance tuning
config.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 32KB batch
config.put(ProducerConfig.LINGER_MS_CONFIG, 20); // Wait 20ms for batching
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB buffer
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Sending messages with different patterns:
@Service
@Slf4j // Lombok logger used by the log.* calls below
@RequiredArgsConstructor
public class OrderEventProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
// Fire-and-forget (highest throughput)
public void sendFireAndForget(OrderEvent event) {
kafkaTemplate.send("orders", event.getOrderId(), event);
}
// Synchronous send (highest reliability)
public void sendSync(OrderEvent event) throws Exception {
kafkaTemplate.send("orders", event.getOrderId(), event)
.get(10, TimeUnit.SECONDS);
}
// Asynchronous send with callback (balanced)
public CompletableFuture<SendResult<String, Object>> sendAsync(OrderEvent event) {
return kafkaTemplate.send("orders", event.getOrderId(), event)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Failed to send order event: {}", event.getOrderId(), ex);
} else {
log.info("Order event sent to partition {} offset {}",
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
});
}
// Send with headers
public void sendWithHeaders(OrderEvent event) {
ProducerRecord<String, Object> record = new ProducerRecord<>("orders",
null, null, event.getOrderId(), event);
record.headers()
.add("event-type", "ORDER_CREATED".getBytes())
.add("source", "order-service".getBytes())
.add("correlation-id", UUID.randomUUID().toString().getBytes());
kafkaTemplate.send(record);
}
}
3.3 Consumer Configuration and Patterns
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// Offset management
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Manual commit
// Performance tuning
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1MB
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
KafkaTemplate<String, Object> kafkaTemplate) { // injected from the producer configuration
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// Retry 3 times, 1 second apart, then publish the record to a dead-letter topic
factory.setCommonErrorHandler(new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate),
new FixedBackOff(1000L, 3)
));
return factory;
}
}
Consumer listener patterns:
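@Component
@Slf4j // Lombok logger used by the log.* calls below
@RequiredArgsConstructor
public class OrderEventConsumer {
private final OrderService orderService;
private final AnalyticsService analyticsService; // used by processBatch (type name assumed)
private final NotificationService notificationService; // used by handleOrderCreated (type name assumed)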
// Single record processing
@KafkaListener(topics = "orders", groupId = "order-processing-group")
public void processOrder(ConsumerRecord<String, OrderEvent> record,
Acknowledgment ack) {
try {
log.info("Processing order: {} from partition: {} offset: {}",
record.value().getOrderId(),
record.partition(),
record.offset());
orderService.process(record.value());
ack.acknowledge();
} catch (Exception e) {
log.error("Failed to process order: {}", record.value().getOrderId(), e);
// Rethrow without acknowledging: a swallowed exception never reaches the error handler
throw new IllegalStateException("Order processing failed", e);
}
}
// Batch processing
@KafkaListener(topics = "analytics-events", groupId = "analytics-batch-group",
containerFactory = "batchKafkaListenerContainerFactory")
public void processBatch(List<ConsumerRecord<String, AnalyticsEvent>> records,
Acknowledgment ack) {
log.info("Processing batch of {} records", records.size());
try {
List<AnalyticsEvent> events = records.stream()
.map(ConsumerRecord::value)
.collect(Collectors.toList());
analyticsService.bulkInsert(events);
ack.acknowledge();
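} catch (Exception e) {
log.error("Batch processing failed", e);
// Rethrow without acknowledging so the container's error handler can retry the batch
throw new IllegalStateException("Batch processing failed", e);
}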
}
// Filtering with headers
@KafkaListener(topics = "orders",
filter = "orderCreatedFilter",
groupId = "notification-group")
public void handleOrderCreated(OrderEvent event) {
notificationService.sendOrderConfirmation(event);
}
}
3.4 Exactly-Once Semantics (EOS)
Kafka supports exactly-once semantics through the combination of idempotent producers and transactional APIs.
@Configuration
public class KafkaTransactionalConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-");
config.put(ProducerConfig.ACKS_CONFIG, "all");
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager() {
return new KafkaTransactionManager<>(producerFactory());
}
}
@Service
@RequiredArgsConstructor
public class TransactionalOrderService {
private final KafkaTemplate<String, Object> kafkaTemplate;
@Transactional("kafkaTransactionManager")
public void processOrderTransactionally(OrderEvent order) {
// All messages in this method are sent atomically
kafkaTemplate.send("orders-processed", order.getOrderId(), order);
kafkaTemplate.send("inventory-updates", order.getProductId(),
new InventoryEvent(order.getProductId(), -order.getQuantity()));
kafkaTemplate.send("payment-requests", order.getOrderId(),
new PaymentEvent(order.getOrderId(), order.getTotalAmount()));
// If any send fails, all are rolled back
}
}
Three levels of delivery guarantees:
| Guarantee | Configuration | Use Case |
|---|---|---|
| At-most-once | acks=0, auto-commit | Metrics, logging |
| At-least-once | acks=all, manual commit, idempotent consumer | Most applications |
| Exactly-once | Transactional API + read-committed isolation | Financial, billing, inventory |
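The idempotent producer is what keeps retries from creating duplicates: each batch carries a producer ID and a sequence number, and the broker refuses to append a sequence it has already seen. The class below is a heavily simplified, single-partition model of that check. `IdempotentLog` and its `Result` enum are hypothetical names, and the real broker logic handles batches, epochs, and multiple in-flight requests.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class IdempotentLog {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    private final List<String> records = new ArrayList<>();
    // producerId -> last sequence number accepted from that producer
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    Result append(long producerId, int sequence, String value) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return Result.DUPLICATE;     // retry of a write that already landed
        }
        if (sequence != last + 1) {
            return Result.OUT_OF_ORDER;  // a gap means an earlier write is missing
        }
        records.add(value);
        lastSequence.put(producerId, sequence);
        return Result.APPENDED;
    }

    List<String> records() {
        return List.copyOf(records);
    }
}
```

A producer that times out and resends sequence 1 gets `DUPLICATE` back, so the record is stored exactly once even though it was sent twice.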
3.5 Kafka Streams
Kafka Streams is a client library for building applications and microservices where the input and output data are stored in Kafka.
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
Map<String, Object> config = new HashMap<>();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
config.put(StreamsConfig.STATE_DIR_CONFIG, "/var/kafka-streams");
return new KafkaStreamsConfiguration(config);
}
}
@Component
public class OrderAnalyticsTopology {
@Autowired
public void buildPipeline(StreamsBuilder streamsBuilder) {
// Real-time order analytics pipeline
KStream<String, OrderEvent> orders = streamsBuilder
.stream("orders", Consumed.with(Serdes.String(), orderEventSerde()));
// 1. Order count per product in tumbling windows
KTable<Windowed<String>, Long> productCounts = orders
.groupBy((key, order) -> order.getProductId(),
Grouped.with(Serdes.String(), orderEventSerde()))
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count(Materialized.as("product-counts-store"));
// 2. Revenue aggregation per category
KTable<String, Double> revenueByCategory = orders
.groupBy((key, order) -> order.getCategory(),
Grouped.with(Serdes.String(), orderEventSerde()))
.aggregate(
() -> 0.0,
(key, order, total) -> total + order.getTotalAmount(),
Materialized.with(Serdes.String(), Serdes.Double())
);
// 3. Join with customer data for enrichment
KTable<String, CustomerProfile> customers = streamsBuilder
.table("customer-profiles",
Consumed.with(Serdes.String(), customerProfileSerde()));
KStream<String, EnrichedOrder> enrichedOrders = orders
.selectKey((key, order) -> order.getCustomerId())
.join(customers,
(order, customer) -> new EnrichedOrder(order, customer),
Joined.with(Serdes.String(), orderEventSerde(), customerProfileSerde()));
enrichedOrders.to("enriched-orders",
Produced.with(Serdes.String(), enrichedOrderSerde()));
// 4. Fraud detection with session windows
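orders
.filter((key, order) -> order.getTotalAmount() > 1000)
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(10)))
.count()
.toStream()
// Merged sessions emit a null tombstone for the old window; skip those
.filter((windowedKey, count) -> count != null && count >= 3)
// Unwrap the Windowed key so it can be written with a plain String serde
.map((windowedKey, count) -> KeyValue.pair(windowedKey.key(),
"ALERT: " + count + " high-value orders in session"))
.to("fraud-alerts", Produced.with(Serdes.String(), Serdes.String()));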
}
}
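The tumbling windows used in the topology above are fixed-size, non-overlapping, and aligned to the epoch, so the window an event belongs to follows directly from its timestamp. Here is a plain-Java sketch of that bucketing. `TumblingWindowSketch` is a hypothetical helper that only illustrates the arithmetic and does not use the Kafka Streams API.

```java
import java.util.Map;
import java.util.TreeMap;

final class TumblingWindowSketch {

    /** Start of the fixed-size, epoch-aligned window that contains the timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Counts events per window, keyed by window start time. */
    static Map<Long, Long> countPerWindow(long[] timestampsMs, long sizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts, sizeMs), 1L, Long::sum);
        }
        return counts;
    }
}
```

With 5-minute windows (300,000 ms), events at 0 ms and 299,999 ms share the first window, while an event at 300,000 ms opens the next one.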
3.6 Kafka Connect
Kafka Connect is a framework for connecting Kafka with external systems like databases, key-value stores, search indexes, and file systems.
{
"name": "postgresql-source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "password",
"database.dbname": "orders_db",
"database.server.name": "order-server",
"table.include.list": "public.orders,public.order_items",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.name": "dbz_publication",
"topic.prefix": "cdc",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "cdc-$3"
}
}
Elasticsearch sink connector:
{
"name": "elasticsearch-sink-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch:9200",
"topics": "enriched-orders",
"type.name": "_doc",
"key.ignore": false,
"schema.ignore": true,
"behavior.on.null.values": "delete",
"write.method": "upsert",
"batch.size": 200,
"max.buffered.records": 5000,
"flush.timeout.ms": 10000
}
}
3.7 Schema Registry
Schema Registry provides a centralized repository for message schemas, enabling schema evolution and compatibility checking.
// Producer with Avro schema
@Configuration
public class AvroProducerConfig {
@Bean
public ProducerFactory<String, GenericRecord> avroProducerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
config.put("schema.registry.url", "http://schema-registry:8081");
config.put("auto.register.schemas", true);
return new DefaultKafkaProducerFactory<>(config);
}
}
Schema evolution compatibility modes:
| Mode | Allowed Changes | Best For |
|---|---|---|
| BACKWARD | Delete fields, add optional fields | Consumer-first upgrades |
| FORWARD | Add fields, delete optional fields | Producer-first upgrades |
| FULL | Add/delete optional fields only | Independent upgrades |
| NONE | Any change allowed | Development only |
3.8 Kafka Performance Tuning Cheat Sheet
| Parameter | Default | Recommended | Impact |
|---|---|---|---|
| num.partitions | 1 | 6-12 per topic | Parallelism |
| replication.factor | 1 | 3 | Durability |
| min.insync.replicas | 1 | 2 | Write durability |
| batch.size (producer) | 16384 | 32768-65536 | Throughput vs latency |
| linger.ms (producer) | 0 | 5-100 | Batching efficiency |
| compression.type | none | zstd or lz4 | Network/disk savings |
| fetch.min.bytes (consumer) | 1 | 1024-16384 | Throughput vs latency |
| max.poll.records (consumer) | 500 | 100-1000 | Processing batch size |
| segment.bytes | 1073741824 | 536870912 | Log compaction frequency |
| retention.ms | 604800000 | Based on use case | Storage vs replay capability |
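The recommended pairing of replication.factor=3 with min.insync.replicas=2 follows from a simple rule: with acks=all, a write is accepted only while the in-sync replica set has at least min.insync.replicas members. The helper below spells out that arithmetic. `IsrCheck` is a hypothetical name, and the sketch covers only the counting rule, not leader election or replica lag.

```java
final class IsrCheck {

    /** With acks=all, the leader accepts a write only if enough replicas are in sync. */
    static boolean acceptsWrite(int inSyncReplicas, int minInSyncReplicas) {
        return inSyncReplicas >= minInSyncReplicas;
    }

    /** Broker failures a partition can absorb while still accepting acks=all writes. */
    static int tolerableFailures(int replicationFactor, int minInSyncReplicas) {
        return Math.max(0, replicationFactor - minInSyncReplicas);
    }
}
```

With three replicas and a minimum of two, one broker can fail without blocking producers. Setting the minimum to three would make any single failure stop writes.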
4. ZooKeeper to KRaft: The Great Migration
4.1 ZooKeeper's Role in Kafka
ZooKeeper has been Kafka's metadata management system since the beginning. It handles:
- Broker registration: tracking which brokers are alive
- Topic metadata: partition assignments, ISR lists, configurations
- Controller election: choosing which broker acts as the cluster controller
- Consumer group coordination (legacy): storing consumer offsets (moved to Kafka itself in 0.9+)
- ACLs: access control lists for authorization
4.2 Why ZooKeeper Had to Go
| Problem | Description |
|---|---|
| Operational complexity | Two distinct distributed systems to deploy, monitor, and maintain |
| Scaling bottleneck | ZooKeeper stores all metadata in memory; limits cluster to about 200K partitions |
| Split-brain risks | Network partitions can cause ZooKeeper and Kafka to disagree |
| Recovery time | Controller failover requires reading all metadata from ZooKeeper (minutes) |
| Expertise requirement | Teams need deep knowledge of both Kafka and ZooKeeper |
4.3 KRaft: Kafka's Built-in Consensus
KRaft (Kafka Raft) replaces ZooKeeper with a built-in Raft-based consensus protocol. Metadata is stored in an internal Kafka topic (__cluster_metadata), and a set of controller nodes manage cluster state.
ZooKeeper Mode: KRaft Mode:
┌────────┐ ┌────────────┐ ┌──────────────────────┐
│ Broker │───▶│ ZooKeeper │ │ Controller (Raft) │
│ Broker │───▶│ Ensemble │ │ Node 1 (leader) │
│ Broker │───▶│ (3-5 nodes)│ │ Node 2 (follower) │
└────────┘ └────────────┘ │ Node 3 (follower) │
└──────────┬───────────┘
│
┌──────────▼───────────┐
│ Brokers │
│ Broker 1 │
│ Broker 2 │
│ Broker 3 │
└──────────────────────┘
KRaft advantages:
| Feature | ZooKeeper Mode | KRaft Mode |
|---|---|---|
| Max partitions | ~200,000 | Millions |
| Controller failover | Minutes | Seconds |
| Metadata propagation | Asynchronous (laggy) | Event-driven (immediate) |
| Deployment | Two systems | One system |
| Operational overhead | High | Low |
| Shutdown time | Minutes | Seconds |
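Because the controllers form a Raft quorum, the cluster stays available only while a majority of them can communicate. The arithmetic below shows why controller counts of three or five are typical. `QuorumMath` is a hypothetical helper for illustration.

```java
final class QuorumMath {

    /** Smallest number of voters that constitutes a majority. */
    static int majority(int voters) {
        return voters / 2 + 1;
    }

    /** Controller failures the quorum can survive while still electing a leader. */
    static int tolerableFailures(int voters) {
        return voters - majority(voters);
    }
}
```

Three controllers tolerate one failure and five tolerate two. A fourth controller adds no fault tolerance over three, which is why odd quorum sizes are preferred.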
4.4 KIP-500: The Roadmap
KIP-500 ("Replace ZooKeeper with a Self-Managed Metadata Quorum") defined the multi-year migration plan:
| Kafka Version | Milestone | Year |
|---|---|---|
| 2.8 | Early access KRaft (development only) | 2021 |
| 3.3 | KRaft marked production-ready | 2022 |
| 3.5 | ZooKeeper deprecated; ZooKeeper-to-KRaft migration in preview | 2023 |
| 3.9 | Final release with ZooKeeper support (bridge release to 4.0) | 2024 |
| 4.0 | ZooKeeper support removed entirely | 2025 |
4.5 Migration Guide: ZooKeeper to KRaft
Step 1: Pre-migration checklist
# Verify the installed Kafka version (must be 3.5+)
kafka-topics.sh --version
# Record the existing cluster ID from ZooKeeper; the KRaft controllers must be formatted with it
zookeeper-shell.sh zk1:2181 get /cluster/id
# Review broker configuration for deprecated settings before migrating
kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type brokers
Step 2: Configure KRaft controllers
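# kraft/server.properties for controller nodes
process.roles=controller
# node.id must not collide with any existing broker.id
node.id=1
controller.quorum.voters=1@controller1:9093,2@controller2:9093,3@controller3:9093
controller.listener.names=CONTROLLER
listeners=CONTROLLER://:9093
log.dirs=/var/kraft-controller-logs
# Let the controllers read the existing metadata from ZooKeeper during the migration
zookeeper.metadata.migration.enable=true
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181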
Step 3: Run the migration
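# Format controller storage with the EXISTING cluster ID from ZooKeeper (not a new random UUID)
kafka-storage.sh format \
--config kraft/server.properties \
--cluster-id <existing-cluster-id> \
--release-version 3.7
# Start each controller in the quorum
kafka-server-start.sh kraft/server.properties
# Start migration (ZooKeeper mode -> dual-write mode): there is no one-shot migrate command.
# Add the following to every broker's server.properties, then perform a rolling restart:
# zookeeper.metadata.migration.enable=true
# controller.quorum.voters=1@controller1:9093,2@controller2:9093,3@controller3:9093
# controller.listener.names=CONTROLLER
# (plus a CONTROLLER entry in listener.security.protocol.map)
# Verify migration status: the active controller logs that the ZooKeeper metadata migration completed
# Move each broker to KRaft: set process.roles=broker and node.id, remove the zookeeper.* settings, restart
# Finalize (point of no return): remove zookeeper.metadata.migration.enable and zookeeper.connect
# from the controllers and restart them one at a time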
Step 4: Post-migration validation
# Verify all topics are accessible
kafka-topics.sh --bootstrap-server localhost:9092 --list
# Verify consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# Check controller quorum status
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status
4.6 ZAB vs Raft: Protocol Comparison
| Aspect | ZAB (ZooKeeper) | Raft (KRaft) |
|---|---|---|
| Leader election | Epoch-based | Term-based |
| Log replication | Two-phase commit | Append entries |
| Consistency | Sequential consistency | Linearizable reads (optional) |
| Membership | Static config | Dynamic reconfiguration |
| Snapshot | Fuzzy snapshots | Consistent snapshots |
| Complexity | High | Moderate (by design) |
| Implementation | Separate system | Integrated into Kafka |
5. Streaming API Comparison: SSE vs WebSocket vs gRPC Streaming
5.1 Server-Sent Events (SSE)
SSE is a simple, HTTP-based protocol for server-to-client push. The server sends events over a long-lived HTTP connection using the text/event-stream content type.
WebFlux SSE endpoint:
@RestController
@RequestMapping("/api/stream")
@RequiredArgsConstructor // Lombok: generates the constructor that injects the final field below
public class SSEController {
private final StockPriceService stockPriceService;
@GetMapping(value = "/prices", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<StockPrice>> streamPrices(
@RequestParam List<String> symbols) {
return stockPriceService.getPriceStream(symbols)
.map(price -> ServerSentEvent.<StockPrice>builder()
.id(String.valueOf(price.getTimestamp()))
.event("price-update")
.data(price)
.retry(Duration.ofSeconds(5))
.build());
}
@GetMapping(value = "/heartbeat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> heartbeat() {
return Flux.interval(Duration.ofSeconds(30))
.map(seq -> ServerSentEvent.<String>builder()
.event("heartbeat")
.data("ping")
.build());
}
}
JavaScript client:
const eventSource = new EventSource('/api/stream/prices?symbols=AAPL,GOOG,MSFT')
eventSource.addEventListener('price-update', (event) => {
const price = JSON.parse(event.data)
updateUI(price)
})
eventSource.addEventListener('heartbeat', () => {
console.log('Connection alive')
})
eventSource.onerror = (error) => {
console.error('SSE error:', error)
// Browser automatically reconnects
}
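To make the wire format concrete, here is a small sketch of how a single event is framed on a text/event-stream response: optional id, event, and retry fields, one data line per payload line, and a blank line that tells the client to dispatch the event. The SseFrame class is a hypothetical helper written for illustration; it is not what Spring uses internally, and the exact field order Spring emits may differ.

```java
// Hypothetical helper for illustration only: frames one Server-Sent Event.
final class SseFrame {
    private SseFrame() {}

    /**
     * @param id          value for the "id" field, or null to omit it
     * @param event       value for the "event" field, or null to omit it
     * @param data        payload; each line becomes its own "data" field
     * @param retryMillis reconnection delay hint in milliseconds, or 0 to omit it
     */
    static String format(String id, String event, String data, long retryMillis) {
        StringBuilder frame = new StringBuilder();
        if (id != null) {
            frame.append("id: ").append(id).append('\n');
        }
        if (event != null) {
            frame.append("event: ").append(event).append('\n');
        }
        if (retryMillis > 0) {
            frame.append("retry: ").append(retryMillis).append('\n');
        }
        for (String line : data.split("\n", -1)) {
            frame.append("data: ").append(line).append('\n');
        }
        frame.append('\n'); // the blank line dispatches the event on the client
        return frame.toString();
    }

    public static void main(String[] args) {
        System.out.print(format("1700000000000", "price-update",
                "{\"symbol\":\"AAPL\",\"price\":189.5}", 5000));
    }
}
```

The event field is what the browser matches against addEventListener('price-update', ...), and the id field is what it sends back in the Last-Event-ID header when it reconnects.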
5.2 WebSocket
WebSocket provides full-duplex communication over a single TCP connection. Both client and server can send messages at any time.
WebFlux WebSocket handler:
@Configuration
public class WebSocketConfig {
@Bean
public HandlerMapping webSocketMapping(StockWebSocketHandler handler) {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/ws/stocks", handler);
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(map);
mapping.setOrder(-1);
return mapping;
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
@Slf4j // Lombok: provides the log field used in handle()
@Component
@RequiredArgsConstructor
public class StockWebSocketHandler implements WebSocketHandler {
private final StockPriceService stockPriceService;
private final ObjectMapper objectMapper;
@Override
public Mono<Void> handle(WebSocketSession session) {
// Incoming messages (subscriptions)
Flux<String> incoming = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(msg -> log.info("Received subscription: {}", msg));
// Outgoing price updates
Flux<WebSocketMessage> outgoing = incoming
.flatMap(msg -> {
SubscriptionRequest req = parseRequest(msg);
return stockPriceService.getPriceStream(req.getSymbols());
})
.map(price -> {
try {
String json = objectMapper.writeValueAsString(price);
return session.textMessage(json);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
return session.send(outgoing);
}
}
JavaScript client:
const ws = new WebSocket('ws://localhost:8080/ws/stocks')
ws.onopen = () => {
ws.send(
JSON.stringify({
action: 'subscribe',
symbols: ['AAPL', 'GOOG', 'MSFT'],
})
)
}
ws.onmessage = (event) => {
const price = JSON.parse(event.data)
updateUI(price)
}
ws.onclose = (event) => {
console.log('WebSocket closed:', event.code, event.reason)
// Manual reconnection needed
setTimeout(reconnect, 3000)
}
5.3 gRPC Streaming
gRPC supports four communication patterns: unary, server streaming, client streaming, and bidirectional streaming.
Proto definition:
syntax = "proto3";
package stocks;
service StockService {
// Unary
rpc GetPrice (PriceRequest) returns (StockPrice);
// Server streaming
rpc StreamPrices (PriceStreamRequest) returns (stream StockPrice);
// Client streaming
rpc RecordTrades (stream TradeRecord) returns (TradeSummary);
// Bidirectional streaming
rpc LiveTrading (stream TradeOrder) returns (stream TradeConfirmation);
}
message PriceStreamRequest {
repeated string symbols = 1;
int32 interval_ms = 2;
}
message StockPrice {
string symbol = 1;
double price = 2;
double change = 3;
int64 timestamp = 4;
int64 volume = 5;
}
Server-side implementation (Spring Boot + grpc-spring-boot-starter):
@Slf4j
@GrpcService
@RequiredArgsConstructor
public class StockGrpcService extends StockServiceGrpc.StockServiceImplBase {
    private final StockPriceService stockPriceService;

    @Override
    public void streamPrices(PriceStreamRequest request,
                             StreamObserver<StockPrice> responseObserver) {
        Disposable subscription = stockPriceService.getPriceStream(request.getSymbolsList())
            .subscribe(
                price -> responseObserver.onNext(toProto(price)),
                error -> responseObserver.onError(
                    Status.INTERNAL.withDescription(error.getMessage()).asException()),
                responseObserver::onCompleted
            );
        // Stop the upstream price stream when the client cancels the call;
        // otherwise the subscription keeps running after the client is gone
        if (responseObserver instanceof ServerCallStreamObserver) {
            ((ServerCallStreamObserver<StockPrice>) responseObserver)
                .setOnCancelHandler(subscription::dispose);
        }
    }

    @Override
    public StreamObserver<TradeOrder> liveTrading(
            StreamObserver<TradeConfirmation> responseObserver) {
        return new StreamObserver<TradeOrder>() {
            @Override
            public void onNext(TradeOrder order) {
                // One confirmation is sent back for every order received
                TradeConfirmation confirmation = executeTrade(order);
                responseObserver.onNext(confirmation);
            }

            @Override
            public void onError(Throwable t) {
                // The call is already terminated on error, so only log it
                log.error("Client error in live trading", t);
            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }
}
5.4 Comparison Table
| Feature | SSE | WebSocket | gRPC Streaming |
|---|---|---|---|
| Protocol | HTTP/1.1 or HTTP/2 | WS (TCP) | HTTP/2 |
| Direction | Server to client | Bidirectional | All 4 patterns |
| Data Format | Text (UTF-8) | Text or Binary | Protobuf (binary) |
| Auto-reconnect | Built-in (browser) | Manual | Manual |
| Backpressure | Limited | None (native) | Built-in (flow control) |
| Browser Support | All modern browsers | All modern browsers | grpc-web (limited) |
| Firewall Friendly | Yes (standard HTTP) | Sometimes blocked | Requires HTTP/2 |
| Multiplexing | Only over HTTP/2 | No | Yes (HTTP/2) |
| Max Connections | 6 per domain (HTTP/1) | No limit | Multiplexed |
| Compression | Standard HTTP | Per-message (optional) | Built-in |
| Latency | Low | Lowest | Low |
| Throughput | Moderate | High | Highest |
| Serialization | JSON (text) | JSON or custom binary | Protobuf (efficient) |
| Use Case | Notifications, feeds | Chat, gaming, collab | Microservices, IoT |
5.5 When to Choose What
- SSE: Dashboard updates, news feeds, stock tickers (read-only), notification streams. Simple to implement, works through proxies, auto-reconnects.
- WebSocket: Chat applications, collaborative editing, multiplayer games. Need bidirectional communication with low latency.
- gRPC Streaming: Microservice-to-microservice communication, IoT data ingestion, high-throughput internal APIs. Need schema enforcement, efficient serialization, and HTTP/2 features.
6. WebFlux + Kafka Integration
6.1 Reactor Kafka
Reactor Kafka provides a reactive API for Kafka, integrating seamlessly with Spring WebFlux.
@Slf4j // Lombok: provides the log field used by the assign/revoke listeners
@Configuration
public class ReactorKafkaConfig {
@Bean
public ReceiverOptions<String, String> receiverOptions() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return ReceiverOptions.<String, String>create(props)
.subscription(Collections.singleton("events"))
.addAssignListener(partitions ->
log.info("Assigned: {}", partitions))
.addRevokeListener(partitions ->
log.info("Revoked: {}", partitions))
.commitInterval(Duration.ofSeconds(5))
.commitBatchSize(100);
}
@Bean
public SenderOptions<String, String> senderOptions() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
return SenderOptions.create(props);
}
@Bean
public KafkaReceiver<String, String> kafkaReceiver() {
return KafkaReceiver.create(receiverOptions());
}
@Bean
public KafkaSender<String, String> kafkaSender() {
return KafkaSender.create(senderOptions());
}
}
Reactive consumer with backpressure:
@Slf4j
@Service
@RequiredArgsConstructor
public class ReactiveEventProcessor {
    private final KafkaReceiver<String, String> kafkaReceiver;
    private final EventService eventService;

    @PostConstruct
    public void startConsuming() {
        kafkaReceiver.receive()
            // One inner Flux per partition, so ordering is kept within each partition
            .groupBy(record -> record.receiverOffset().topicPartition())
            .flatMap(partitionFlux -> partitionFlux
                .publishOn(Schedulers.boundedElastic())
                // concatMap handles one record at a time, so demand propagates upstream
                .concatMap(record -> processRecord(record)
                    .doOnSuccess(v -> record.receiverOffset().acknowledge())
                    .onErrorResume(e -> {
                        log.error("Error processing record: {}", record.key(), e);
                        // Acknowledge anyway so a poison record does not block the partition
                        record.receiverOffset().acknowledge();
                        return Mono.empty();
                    })
                ))
            .subscribe();
    }

    private Mono<Void> processRecord(ReceiverRecord<String, String> record) {
        return eventService.process(record.value())
            // doOnSuccess also fires for an empty Mono, unlike doOnNext
            .doOnSuccess(ignored ->
                log.debug("Processed: partition={} offset={} key={}",
                    record.partition(), record.offset(), record.key()))
            .then();
    }
}
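The backpressure in the consumer above rests on the Reactive Streams request(n) protocol: a publisher may emit only as many items as the subscriber has asked for. The sketch below shows that contract using only the JDK's java.util.concurrent.Flow interfaces, without Reactor or Kafka. RangePublisher and BatchingSubscriber are hypothetical names, and the publisher is deliberately single-threaded and synchronous to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// JDK-only illustration of request(n) backpressure; not Reactor or Reactor Kafka code.
final class BackpressureDemo {

    /** Emits 1..count, but never more items than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("n must be > 0"));
                        return;
                    }
                    demand += n;
                    if (emitting) return; // re-entrant call from onNext; the loop below sees the new demand
                    emitting = true;
                    while (demand > 0 && next <= count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Requests a fixed batch and asks for more only after the batch is consumed. */
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final List<Long> requests = new ArrayList<>();
        boolean completed = false;
        private final int batchSize;
        private Flow.Subscription subscription;
        private int inBatch = 0;

        BatchingSubscriber(int batchSize) { this.batchSize = batchSize; }

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; requestBatch(); }

        @Override public void onNext(Integer item) {
            received.add(item);
            if (++inBatch == batchSize) { inBatch = 0; requestBatch(); }
        }

        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }

        @Override public void onComplete() { completed = true; }

        private void requestBatch() {
            requests.add((long) batchSize);
            subscription.request(batchSize);
        }
    }

    static BatchingSubscriber run(int count, int batchSize) {
        BatchingSubscriber subscriber = new BatchingSubscriber(batchSize);
        new RangePublisher(count).subscribe(subscriber);
        return subscriber;
    }

    public static void main(String[] args) {
        BatchingSubscriber s = run(5, 2);
        System.out.println("received=" + s.received + " requests=" + s.requests);
    }
}
```

In the Reactor pipeline, concatMap plays the role of the batching subscriber with a small prefetch: it requests more records from the partition Flux only as earlier ones finish processing.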
Reactive producer:
@Slf4j
@Service
@RequiredArgsConstructor
public class ReactiveEventProducer {
    private final KafkaSender<String, String> kafkaSender;
    private final ObjectMapper objectMapper;

    public Mono<Void> sendEvent(String topic, String key, Object event) {
        // fromCallable turns the checked JsonProcessingException into an error signal
        return Mono.fromCallable(() -> objectMapper.writeValueAsString(event))
            .flatMap(json -> {
                SenderRecord<String, String, String> record = SenderRecord.create(
                    new ProducerRecord<>(topic, key, json), key);
                return kafkaSender.send(Mono.just(record))
                    .next()
                    .doOnNext(result -> log.info("Sent to {}-{} offset {}",
                        result.recordMetadata().topic(),
                        result.recordMetadata().partition(),
                        result.recordMetadata().offset()))
                    .then();
            });
    }

    public Flux<SenderResult<String>> sendBatch(String topic, List<Event> events) {
        Flux<SenderRecord<String, String, String>> records = Flux.fromIterable(events)
            // writeValueAsString throws a checked exception, which a plain map lambda
            // cannot propagate; concatMap keeps the original event order
            .concatMap(event -> Mono.fromCallable(() -> objectMapper.writeValueAsString(event))
                .map(json -> SenderRecord.create(
                    new ProducerRecord<>(topic, event.getId(), json), event.getId())));
        return kafkaSender.send(records)
            .doOnError(e -> log.error("Batch send failed", e));
    }
}
6.2 CQRS + Event Sourcing with WebFlux and Kafka
CQRS (Command Query Responsibility Segregation) separates read and write models. Combined with Event Sourcing, every state change is stored as an immutable event.
┌─────────────────────────────────────────────────────────┐
│                  CQRS + Event Sourcing                  │
│                                                         │
│  Command Side                     Query Side            │
│  ┌──────────┐                    ┌──────────┐           │
│  │ WebFlux  │                    │ WebFlux  │           │
│  │ Command  │                    │ Query    │           │
│  │ Handler  │                    │ Handler  │           │
│  └────┬─────┘                    └────▲─────┘           │
│       │                               │                 │
│  ┌────▼─────┐                    ┌────┴─────┐           │
│  │ Domain   │                    │ Read     │           │
│  │ Aggregate│                    │ Model    │           │
│  └────┬─────┘                    │ (R2DBC)  │           │
│       │                          └────▲─────┘           │
│  ┌────▼─────────────────────────────┐ │                 │
│  │       Kafka (Event Store)        ├─┘                 │
│  │        order-events topic        │                   │
│  └──────────────────────────────────┘                   │
└─────────────────────────────────────────────────────────┘
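The core idea of Event Sourcing is that current state is never stored directly; it is derived by replaying the immutable events in order. The following sketch shows that fold for the two event types used in this section, OrderCreated and OrderConfirmed. The OrderReplay, OrderEvent, and OrderState types are simplified stand-ins invented for this example, not the classes used by the services below.

```java
import java.util.List;

// Simplified, hypothetical types for illustration; the real events carry more data.
final class OrderReplay {
    private OrderReplay() {}

    enum Type { ORDER_CREATED, ORDER_CONFIRMED }

    /** An immutable event as it would be appended to the log. */
    static final class OrderEvent {
        final String orderId;
        final Type type;

        OrderEvent(String orderId, Type type) {
            this.orderId = orderId;
            this.type = type;
        }
    }

    /** State derived purely from events; every apply() returns a new instance. */
    static final class OrderState {
        final String orderId;
        final String status;
        final int version; // number of events applied so far

        OrderState(String orderId, String status, int version) {
            this.orderId = orderId;
            this.status = status;
            this.version = version;
        }

        static OrderState empty() {
            return new OrderState(null, "NONE", 0);
        }

        OrderState apply(OrderEvent event) {
            switch (event.type) {
                case ORDER_CREATED:
                    return new OrderState(event.orderId, "CREATED", version + 1);
                case ORDER_CONFIRMED:
                    return new OrderState(orderId, "CONFIRMED", version + 1);
                default:
                    throw new IllegalArgumentException("Unknown event type: " + event.type);
            }
        }
    }

    /** Rebuilds the current state by folding the events in log order. */
    static OrderState replay(List<OrderEvent> events) {
        OrderState state = OrderState.empty();
        for (OrderEvent event : events) {
            state = state.apply(event);
        }
        return state;
    }

    public static void main(String[] args) {
        OrderState state = replay(List.of(
                new OrderEvent("order-1", Type.ORDER_CREATED),
                new OrderEvent("order-1", Type.ORDER_CONFIRMED)));
        System.out.println(state.orderId + " " + state.status + " v" + state.version);
    }
}
```

The read-side projection shown later in this section performs the same fold incrementally: each consumed event updates the stored read model instead of replaying from the start.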
Command handler:
@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
public class OrderCommandController {
private final OrderCommandService commandService;
@PostMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public Mono<OrderId> createOrder(@RequestBody Mono<CreateOrderCommand> command) {
return command.flatMap(commandService::handle);
}
@PostMapping("/{orderId}/confirm")
public Mono<ResponseEntity<Void>> confirmOrder(@PathVariable String orderId) {
return commandService.handle(new ConfirmOrderCommand(orderId))
.then(Mono.just(ResponseEntity.accepted().build()));
}
}
@Service
@RequiredArgsConstructor
public class OrderCommandService {
private final KafkaSender<String, String> kafkaSender;
private final ObjectMapper objectMapper;
public Mono<OrderId> handle(CreateOrderCommand command) {
// Validate command
OrderAggregate aggregate = OrderAggregate.create(command);
List<DomainEvent> events = aggregate.getUncommittedEvents();
// Publish events to Kafka one at a time; concatMap preserves the event order
return Flux.fromIterable(events)
.concatMap(event -> publishEvent("order-events", aggregate.getId(), event))
.then(Mono.just(new OrderId(aggregate.getId())));
}
private Mono<Void> publishEvent(String topic, String key, DomainEvent event) {
return Mono.fromCallable(() -> objectMapper.writeValueAsString(event))
.flatMap(json -> {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, json);
record.headers().add("event-type", event.getType().getBytes());
return kafkaSender.send(Mono.just(SenderRecord.create(record, key)))
.next()
.then();
});
}
}
Event projection (read-side updater):
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderProjectionService {
    private final KafkaReceiver<String, String> kafkaReceiver;
    private final OrderReadRepository readRepository;
    private final ObjectMapper objectMapper;

    @PostConstruct
    public void startProjection() {
        kafkaReceiver.receive()
            // concatMap applies events strictly in log order
            .concatMap(record -> {
                Header header = record.headers().lastHeader("event-type");
                if (header == null) {
                    // Skip records that were published without an event-type header
                    log.warn("Record at offset {} has no event-type header", record.offset());
                    record.receiverOffset().acknowledge();
                    return Mono.<Void>empty();
                }
                String eventType = new String(header.value(), StandardCharsets.UTF_8);
                return processEvent(eventType, record.value())
                    .doOnSuccess(v -> record.receiverOffset().acknowledge());
            })
            .subscribe();
    }

    private Mono<Void> processEvent(String eventType, String payload) {
        return switch (eventType) {
            case "OrderCreated" -> parse(payload, OrderCreatedEvent.class)
                .flatMap(event -> readRepository.save(OrderReadModel.from(event)))
                .then();
            case "OrderConfirmed" -> parse(payload, OrderConfirmedEvent.class)
                .flatMap(event -> readRepository.findById(event.getOrderId()))
                .map(model -> model.withStatus("CONFIRMED"))
                .flatMap(readRepository::save)
                .then();
            default -> Mono.empty();
        };
    }

    // readValue throws a checked exception, so defer it into the reactive chain
    private <T> Mono<T> parse(String payload, Class<T> type) {
        return Mono.fromCallable(() -> objectMapper.readValue(payload, type));
    }
}
7. Real-World Project: Real-Time Stock Ticker System
7.1 System Architecture
┌────────────────────────────────────────────────────────────────┐
│                 Real-Time Stock Ticker System                  │
│                                                                │
│  ┌─────────────┐     ┌──────────────┐     ┌─────────────────┐  │
│  │ Market Data │     │ Kafka        │     │ WebFlux         │  │
│  │ Providers   │────▶│ Cluster      │────▶│ API Gateway     │  │
│  │ (WebSocket) │     │              │     │                 │  │
│  └─────────────┘     │ raw-prices   │     │ SSE /prices     │  │
│                      │ processed    │     │ WS /ws/trade    │  │
│  ┌─────────────┐     │ alerts       │     │ gRPC /internal  │  │
│  │ News Feed   │────▶│              │     └───────┬─────────┘  │
│  │ (REST API)  │     └──────┬───────┘             │            │
│  └─────────────┘            │              ┌──────▼─────────┐  │
│                      ┌──────▼───────┐      │ Clients        │  │
│                      │ Kafka Streams│      │ Web Dashboard  │  │
│                      │ Processing   │      │ Mobile App     │  │
│                      │              │      │ Trading Bot    │  │
│                      │ - VWAP calc  │      └────────────────┘  │
│                      │ - Anomaly    │                          │
│                      │ - Aggregate  │      ┌────────────────┐  │
│                      └──────┬───────┘      │ PostgreSQL     │  │
│                             │              │ (R2DBC)        │  │
│                             └─────────────▶│ + Redis        │  │
│                                            └────────────────┘  │
└────────────────────────────────────────────────────────────────┘
7.2 Market Data Ingestion Service
@Service
@RequiredArgsConstructor
@Slf4j
public class MarketDataIngestionService {
private final KafkaSender<String, String> kafkaSender;
private final ObjectMapper objectMapper;
private final WebSocketClient webSocketClient;
@PostConstruct
public void connect() {
URI uri = URI.create("wss://market-data-provider.example.com/stream");
webSocketClient.execute(uri, session -> {
// Subscribe to symbols
Mono<Void> subscribe = session.send(
Mono.just(session.textMessage(
"{\"action\":\"subscribe\",\"symbols\":[\"AAPL\",\"GOOG\",\"MSFT\",\"AMZN\"]}"
))
);
// Process incoming market data
Flux<Void> receive = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.flatMap(this::publishToKafka)
.doOnError(e -> log.error("Market data stream error", e));
return subscribe.thenMany(receive).then();
})
.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
.maxBackoff(Duration.ofMinutes(5)))
.subscribe();
}
private Mono<Void> publishToKafka(String rawData) {
try {
MarketTick tick = objectMapper.readValue(rawData, MarketTick.class);
ProducerRecord<String, String> record = new ProducerRecord<>(
"raw-prices", tick.getSymbol(), rawData);
record.headers()
.add("source", "market-data-provider".getBytes())
.add("timestamp", String.valueOf(tick.getTimestamp()).getBytes());
return kafkaSender.send(Mono.just(SenderRecord.create(record, tick.getSymbol())))
.next()
.then();
} catch (Exception e) {
log.error("Failed to parse market data: {}", rawData, e);
return Mono.empty();
}
}
}
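The retry policy in connect() uses exponential backoff: the delay starts at one second, doubles on every failed attempt, and is capped at five minutes. The sketch below reproduces only that schedule so the numbers are easy to inspect. BackoffSchedule is a hypothetical helper, and it deliberately leaves out the random jitter that Reactor's Retry.backoff adds on top of each delay.

```java
import java.time.Duration;

// Hypothetical helper: the deterministic part of an exponential backoff schedule.
// Reactor's Retry.backoff additionally randomizes each delay with jitter.
final class BackoffSchedule {
    private BackoffSchedule() {}

    /**
     * @param attempt zero-based index of the retry (0 = first retry)
     * @param min     delay used for the first retry
     * @param max     upper bound for any delay
     */
    static Duration delay(long attempt, Duration min, Duration max) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        long minMs = min.toMillis();
        long maxMs = max.toMillis();
        if (attempt >= 62) {
            return max; // 2^attempt would overflow a long
        }
        long factor = 1L << attempt; // 2^attempt
        // Compare by division so that minMs * factor can never overflow
        if (minMs > maxMs / factor) {
            return max;
        }
        return Duration.ofMillis(minMs * factor);
    }

    public static void main(String[] args) {
        Duration min = Duration.ofSeconds(1);
        Duration max = Duration.ofMinutes(5);
        for (long attempt = 0; attempt <= 10; attempt++) {
            System.out.println("retry " + attempt + " -> " + delay(attempt, min, max));
        }
    }
}
```

With these settings the cap is reached on the tenth retry, after which the service keeps retrying every five minutes (before jitter) until the provider is reachable again.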
7.3 Price Processing with Kafka Streams
@Component
public class PriceProcessingTopology {
@Autowired
public void buildPipeline(StreamsBuilder builder) {
KStream<String, String> rawPrices = builder.stream("raw-prices");
// 1. Parse and validate
KStream<String, StockPrice> validPrices = rawPrices
.mapValues(this::parsePrice)
.filter((symbol, price) -> price != null && price.getPrice() > 0);
// 2. Calculate VWAP (Volume Weighted Average Price) per 1-minute window
KTable<Windowed<String>, VWAPResult> vwap = validPrices
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
.aggregate(
VWAPAccumulator::new,
(symbol, price, acc) -> acc.add(price.getPrice(), price.getVolume()),
Materialized.<String, VWAPAccumulator, WindowStore<Bytes, byte[]>>
as("vwap-store")
.withValueSerde(vwapAccumulatorSerde())
)
.mapValues(VWAPAccumulator::result);
// 3. Anomaly detection (price change > 5% in 30 seconds)
validPrices
.groupByKey()
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(30)))
.aggregate(
PriceRange::new,
(symbol, price, range) -> range.update(price.getPrice()),
Materialized.with(Serdes.String(), priceRangeSerde())
)
.toStream()
.filter((key, range) -> range.percentChange() > 5.0)
.mapValues((key, range) -> new PriceAlert(
key.key(), range.percentChange(), "ANOMALY"))
.to("alerts", Produced.with(windowedSerde(), priceAlertSerde()));
// 4. Output processed prices
validPrices
.mapValues(price -> new ProcessedPrice(
price.getSymbol(), price.getPrice(), price.getVolume(),
price.getTimestamp(), System.currentTimeMillis()))
.to("processed-prices", Produced.with(Serdes.String(), processedPriceSerde()));
}
}
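The topology above refers to VWAPAccumulator and PriceRange without showing them. As a rough sketch of what those aggregates might compute: VWAP is the sum of price times volume divided by total volume, and the anomaly check compares the highest and lowest price seen in the window. The implementations below are assumptions made for illustration. In particular, result() returns a plain double here rather than the VWAPResult type used in the topology, and serialization is left out entirely.

```java
// Hypothetical implementations of the aggregates referenced by the topology.
final class PriceAggregates {
    private PriceAggregates() {}

    /** Volume Weighted Average Price: sum(price * volume) / sum(volume). */
    static final class VWAPAccumulator {
        private double priceVolumeSum; // running sum of price * volume
        private long volumeSum;        // running sum of volume

        VWAPAccumulator add(double price, long volume) {
            priceVolumeSum += price * volume;
            volumeSum += volume;
            return this;
        }

        /** Returns 0.0 when no volume has been recorded yet. */
        double result() {
            return volumeSum == 0 ? 0.0 : priceVolumeSum / volumeSum;
        }
    }

    /** Tracks the lowest and highest price seen in a window. */
    static final class PriceRange {
        private double min = Double.POSITIVE_INFINITY;
        private double max = Double.NEGATIVE_INFINITY;

        PriceRange update(double price) {
            min = Math.min(min, price);
            max = Math.max(max, price);
            return this;
        }

        /** Spread between high and low as a percentage of the low; 0.0 if empty. */
        double percentChange() {
            if (max < min || min <= 0) {
                return 0.0;
            }
            return (max - min) / min * 100.0;
        }
    }

    public static void main(String[] args) {
        double vwap = new VWAPAccumulator().add(10.0, 100).add(20.0, 300).result();
        double change = new PriceRange().update(100.0).update(106.0).percentChange();
        System.out.println("vwap=" + vwap + " percentChange=" + change);
    }
}
```

Both classes return this from their update methods so they fit the aggregator lambdas in the topology, which must return the updated aggregate.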
7.4 SSE Streaming Endpoint for Web Clients
@Slf4j
@RestController
@RequestMapping("/api/stocks")
@RequiredArgsConstructor
public class StockStreamController {
    private final KafkaReceiver<String, String> processedPriceReceiver;
    // alertReceiver and alertSink are assumed to be defined as beans alongside the price ones
    private final KafkaReceiver<String, String> alertReceiver;
    private final ObjectMapper objectMapper;
    private final Sinks.Many<ProcessedPrice> priceSink;
    private final Sinks.Many<PriceAlert> alertSink;

    @PostConstruct
    public void init() {
        // Bridge each Kafka topic into a hot sink once; all SSE clients share it
        bridge(processedPriceReceiver, ProcessedPrice.class, priceSink);
        bridge(alertReceiver, PriceAlert.class, alertSink);
    }

    private <T> void bridge(KafkaReceiver<String, String> receiver, Class<T> type,
                            Sinks.Many<T> sink) {
        receiver.receive()
            .<T>handle((record, out) -> {
                record.receiverOffset().acknowledge();
                try {
                    out.next(objectMapper.readValue(record.value(), type));
                } catch (Exception e) {
                    // map() must never return null in Reactor, so drop bad records here
                    log.warn("Skipping malformed record at offset {}", record.offset(), e);
                }
            })
            .subscribe(sink::tryEmitNext);
    }

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<ProcessedPrice>> streamPrices(
            @RequestParam(required = false) List<String> symbols) {
        Flux<ProcessedPrice> priceFlux = priceSink.asFlux();
        if (symbols != null && !symbols.isEmpty()) {
            Set<String> symbolSet = new HashSet<>(symbols);
            priceFlux = priceFlux.filter(p -> symbolSet.contains(p.getSymbol()));
        }
        return priceFlux
            .map(price -> ServerSentEvent.<ProcessedPrice>builder()
                .id(price.getSymbol() + "-" + price.getTimestamp())
                .event("price")
                .data(price)
                .build())
            .doOnCancel(() -> log.info("Client disconnected from price stream"));
    }

    @GetMapping(value = "/alerts", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<PriceAlert>> streamAlerts() {
        // Read from the shared sink instead of starting one Kafka consumer per HTTP client
        return alertSink.asFlux()
            .map(alert -> ServerSentEvent.<PriceAlert>builder()
                .event("alert")
                .data(alert)
                .build());
    }
}
7.5 Application Configuration
# application.yml
spring:
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/stockdb
    username: stockapp
    password: secret
  kafka:
    bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
    consumer:
      group-id: stock-ticker-service
      auto-offset-reset: latest
      enable-auto-commit: false
    producer:
      acks: all
      compression-type: zstd
      batch-size: 32768
      properties:
        linger.ms: 10
  webflux:
    # base-path is prepended to every controller mapping; the controllers in this
    # guide already start with /api, so keep only one of the two prefixes
    base-path: /api
server:
  port: 8080
  netty:
    connection-timeout: 5000
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,prometheus
  metrics:
    tags:
      application: stock-ticker
7.6 Docker Compose for Local Development
version: '3.8'
services:
  kafka-1:
    image: apache/kafka:3.7.0
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
    ports:
      - '9092:9092'
  kafka-2:
    image: apache/kafka:3.7.0
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
  kafka-3:
    image: apache/kafka:3.7.0
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: stockdb
      POSTGRES_USER: stockapp
      POSTGRES_PASSWORD: secret
    ports:
      - '5432:5432'
  redis:
    image: redis:7-alpine
    ports:
      - '6379:6379'
8. Decision Matrix: When to Use What
8.1 Technology Selection Guide
| Scenario | WebFlux | Kafka | SSE | WebSocket | gRPC |
|---|---|---|---|---|---|
| Real-time dashboard | Yes | Yes | Best | Good | Overkill |
| Chat application | Yes | Optional | No | Best | Good |
| IoT data pipeline | Yes | Best | No | Good | Best |
| E-commerce order processing | Optional | Best | No | No | Good |
| Live sports scores | Yes | Yes | Best | Good | No |
| Financial trading platform | Yes | Best | No | Best | Best |
| Microservice event bus | No | Best | No | No | Good |
| Server notifications | Yes | Optional | Best | Good | No |
| Video/audio streaming | No | No | No | Partial | Good |
| Log aggregation pipeline | No | Best | No | No | Good |
8.2 Decision Flowchart
Start: Do you need real-time data?
│
├── No → Traditional REST API (Spring MVC) is fine
│
└── Yes → Is it server-to-client only?
    │
    ├── Yes → SSE (simplest, auto-reconnect)
    │
    └── No → Do you need bidirectional communication?
        │
        ├── Yes → Is it browser-facing?
        │   │
        │   ├── Yes → WebSocket
        │   │
        │   └── No → gRPC Bidirectional Streaming
        │
        └── No → Do you need durable event storage?
            │
            ├── Yes → Kafka + consumer of choice
            │
            └── No → Direct WebFlux streaming
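The flowchart can also be read as a small decision function, which makes the order of the questions explicit. This is only a sketch of the flowchart's logic: TransportChooser, its enum constants, and the parameter names are invented for this example, and real decisions usually weigh more factors than five booleans.

```java
// Hypothetical encoding of the decision flowchart; names are illustrative only.
final class TransportChooser {
    private TransportChooser() {}

    enum Transport {
        REST,                     // Traditional REST API (Spring MVC)
        SSE,
        WEBSOCKET,
        GRPC_BIDI_STREAMING,
        KAFKA_WITH_CONSUMER,      // Kafka + consumer of choice
        DIRECT_WEBFLUX_STREAMING
    }

    static Transport choose(boolean realTime,
                            boolean serverToClientOnly,
                            boolean bidirectional,
                            boolean browserFacing,
                            boolean durableStorage) {
        if (!realTime) {
            return Transport.REST;
        }
        if (serverToClientOnly) {
            return Transport.SSE;
        }
        if (bidirectional) {
            return browserFacing ? Transport.WEBSOCKET : Transport.GRPC_BIDI_STREAMING;
        }
        return durableStorage
                ? Transport.KAFKA_WITH_CONSUMER
                : Transport.DIRECT_WEBFLUX_STREAMING;
    }

    public static void main(String[] args) {
        // A read-only dashboard: real-time, server-to-client only
        System.out.println(choose(true, true, false, true, false));
        // A browser chat: real-time, bidirectional, browser-facing
        System.out.println(choose(true, false, true, true, false));
    }
}
```

Because the questions are evaluated top to bottom, a later flag is ignored once an earlier one has already decided the outcome, exactly as in the flowchart.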
8.3 Performance Comparison by Workload
| Workload | Spring MVC | WebFlux | WebFlux + Kafka | Notes |
|---|---|---|---|---|
| 100 concurrent users | 50ms avg | 48ms | 52ms | Negligible difference |
| 10K concurrent users | 2s avg | 120ms | 150ms | WebFlux shines |
| 100K concurrent users | Timeout | 250ms | 300ms | MVC cannot handle |
| 1M events/sec ingest | N/A | Limited | 1.2M/s | Kafka essential |
| Complex event proc. | N/A | Possible | 500K/s | Kafka Streams advantage |
9. Six-Month Learning Roadmap
Month 1-2: Reactive Foundations
| Week | Topic | Deliverable |
|---|---|---|
| 1 | Reactive Streams spec, Publisher/Subscriber | Implement custom Publisher |
| 2 | Project Reactor: Mono/Flux fundamentals | 20 operator exercises |
| 3 | Error handling, retry, timeout patterns | Resilient API client |
| 4 | Testing reactive code (StepVerifier) | 90% test coverage on exercises |
| 5-6 | Spring WebFlux: controllers, functional endpoints | CRUD REST API (WebFlux) |
| 7-8 | R2DBC, WebClient, reactive security | Full-stack reactive application |
Month 3-4: Kafka Mastery
| Week | Topic | Deliverable |
|---|---|---|
| 9-10 | Kafka fundamentals: topics, partitions, consumers | Local 3-broker cluster setup |
| 11-12 | Spring Kafka: producer/consumer patterns | Order processing system |
| 13-14 | Kafka Streams: stateless and stateful operations | Real-time analytics pipeline |
| 15-16 | Exactly-once, transactions, Schema Registry | Transactional event system |
Month 5: Integration and Streaming APIs
| Week | Topic | Deliverable |
|---|---|---|
| 17 | SSE with WebFlux | Live notification system |
| 18 | WebSocket with WebFlux | Chat application |
| 19 | gRPC Streaming with Spring Boot | Internal service communication |
| 20 | Reactor Kafka + CQRS pattern | Event-sourced microservice |
Month 6: Production Readiness
| Week | Topic | Deliverable |
|---|---|---|
| 21 | KRaft migration, cluster operations | KRaft cluster deployment |
| 22 | Monitoring: Micrometer, Prometheus, Grafana | Observability dashboard |
| 23 | Performance tuning, load testing | Benchmark report |
| 24 | Capstone: Real-time stock ticker system | Production-ready project |
10. Quiz
Q1: What is the main advantage of Spring WebFlux over Spring MVC for I/O-bound workloads?
Answer: WebFlux uses a non-blocking event loop model (Netty) that can handle thousands of concurrent connections with just a few threads. Spring MVC uses a thread-per-request model where each concurrent request occupies a thread. For I/O-bound workloads (where threads spend most time waiting for database, network, or file operations), WebFlux is dramatically more efficient because it does not waste threads on waiting. In benchmarks, WebFlux can handle 7-8x more throughput with 77% less memory for I/O-bound workloads.
Q2: Explain the difference between flatMap, concatMap, and switchMap in Project Reactor.
Answer:
- flatMap: Maps each element to a Publisher and merges them eagerly. Inner publishers run concurrently and results may arrive out of order. Best for maximum throughput when order does not matter.
- concatMap: Maps each element to a Publisher and subscribes sequentially. Waits for each inner publisher to complete before subscribing to the next. Preserves order but lower throughput.
- switchMap: Maps each element to a Publisher, but cancels the previous inner publisher when a new element arrives. Only the latest subscription is active. Ideal for search-as-you-type where only the latest query result matters.
Q3: Why did Kafka replace ZooKeeper with KRaft? Name three specific technical benefits.
Answer:
- Partition scalability: ZooKeeper stored all metadata in memory, limiting clusters to around 200,000 partitions. KRaft uses a log-based metadata store, enabling millions of partitions.
- Faster failover: Controller failover in ZooKeeper mode required reading all metadata from ZooKeeper (minutes). KRaft failover takes seconds because the new controller already has metadata replicated via Raft.
- Simplified operations: With ZooKeeper, teams had to deploy, configure, monitor, and maintain two separate distributed systems. KRaft eliminates the ZooKeeper dependency entirely, reducing operational complexity.
Additional benefits include faster controlled shutdown, elimination of split-brain risks between Kafka and ZooKeeper, and event-driven metadata propagation (versus ZooKeeper's asynchronous, potentially laggy propagation).
Q4: When would you choose SSE over WebSocket for a real-time feature?
Answer: Choose SSE when:
- Communication is server-to-client only — stock tickers, news feeds, notifications, dashboards. No need for the client to send data back on the same connection.
- You need auto-reconnect — SSE has built-in browser reconnection. WebSocket requires manual reconnection logic.
- HTTP infrastructure compatibility — SSE works over standard HTTP/1.1, passing through proxies and load balancers without special configuration. WebSocket upgrades can be blocked by corporate firewalls.
- Simplicity — SSE is simpler to implement (just a GET endpoint returning text/event-stream). WebSocket requires connection management, heartbeats, and reconnection logic.
Choose WebSocket when you need bidirectional communication (chat, gaming, collaborative editing) or binary data transfer.
Q5: Design a system that uses WebFlux, Kafka, and SSE together. Describe the data flow from event generation to client display.
Answer: Consider a real-time order tracking system:
- Event Generation: When an order status changes, the Order Service publishes an OrderStatusChanged event to a Kafka topic (order-events) using Reactor Kafka. The producer uses acks=all for durability.
- Event Processing: A Kafka Streams application consumes from order-events, enriches events with customer data (KTable join from the customer-profiles topic), and produces enriched events to the order-notifications topic.
- WebFlux Bridge: A WebFlux service uses KafkaReceiver (Reactor Kafka) to reactively consume from order-notifications. It routes events into a Sinks.Many (multicast hot publisher), keyed by customer ID.
- SSE Endpoint: The WebFlux controller exposes an SSE endpoint /api/orders/track that filters the Sinks.Many flux by the authenticated user's customer ID. Each matching event is sent as a ServerSentEvent.
- Client: The browser creates an EventSource pointing to the SSE endpoint. On each event, the UI updates the order tracking visualization. If the connection drops, the browser auto-reconnects (SSE built-in).
Data flow: Order Service to Kafka to Kafka Streams to Kafka to Reactor Kafka to Sinks.Many to SSE to Browser.
References
- Reactive Manifesto
- Reactive Streams Specification
- Project Reactor Reference Guide
- Spring WebFlux Documentation
- Spring WebFlux vs MVC Performance
- R2DBC Specification
- Netty Project
- Apache Kafka Documentation
- Kafka: The Definitive Guide (2nd Edition)
- KIP-500: Replace ZooKeeper
- Kafka KRaft Documentation
- Kafka Streams Documentation
- Confluent Schema Registry
- Kafka Connect Documentation
- Reactor Kafka Reference Guide
- Server-Sent Events (MDN)
- WebSocket API (MDN)
- gRPC Documentation
- CQRS Pattern (Microsoft)
- Event Sourcing Pattern (Microsoft)
- Debezium CDC Documentation
- Spring Boot gRPC Starter
- Kafka Performance Tuning Guide (Confluent)
- ZooKeeper Documentation
- Raft Consensus Algorithm