Skip to content

Split View: WebFlux + Kafka + ZooKeeper 완전 정복: 리액티브 스트리밍 아키텍처의 모든 것

✨ Learn with Quiz
|

WebFlux + Kafka + ZooKeeper 완전 정복: 리액티브 스트리밍 아키텍처의 모든 것

들어가며

현대 소프트웨어 시스템은 "실시간"을 요구합니다. 주식 시세는 밀리초 단위로 업데이트되고, 채팅 메시지는 즉시 전달되어야 하며, IoT 센서 데이터는 끊임없이 쏟아집니다. 전통적인 요청-응답(Request-Response) 모델로는 이러한 요구사항을 감당하기 어렵습니다.

이 글에서는 실시간 데이터 처리 아키텍처의 핵심 기술 세 가지를 깊이 있게 다룹니다.

  1. Spring WebFlux — 리액티브 웹 프레임워크로 높은 동시접속을 효율적으로 처리
  2. Apache Kafka — 분산 이벤트 스트리밍 플랫폼으로 대용량 데이터를 안정적으로 전달
  3. Streaming API — SSE, WebSocket, gRPC Streaming으로 클라이언트에 실시간 데이터 푸시

또한 Kafka의 역사적 전환점인 ZooKeeper에서 KRaft로의 마이그레이션도 상세히 살펴봅니다. 각 기술의 동작 원리부터 실전 코드 예제, 성능 벤치마크, 아키텍처 패턴까지 — 리액티브 스트리밍 아키텍처의 모든 것을 한 곳에 담았습니다.


1. 리액티브 프로그래밍이란? (기초부터)

Reactive Manifesto

리액티브 시스템은 2014년에 발표된 Reactive Manifesto에서 네 가지 핵심 속성을 정의합니다.

  • Responsive (응답성): 시스템은 가능한 한 적시에 응답해야 합니다. 응답성은 사용성과 유용성의 기초입니다.
  • Resilient (탄력성): 시스템은 장애에 직면하더라도 응답성을 유지합니다. 복제(replication), 격리(containment), 위임(delegation)으로 달성합니다.
  • Elastic (유연성): 시스템은 작업 부하의 변화에 따라 자원을 증감시키며 응답성을 유지합니다.
  • Message Driven (메시지 기반): 리액티브 시스템은 비동기 메시지 전달에 의존하여 컴포넌트 간 느슨한 결합, 격리, 위치 투명성을 보장합니다.
         ┌─────────────────────────────────┐
Responsive              (빠르고 일관된 응답)         └──────────┬──────────┬───────────┘
                    │          │
         ┌──────────▼──┐  ┌───▼──────────┐
Resilient  │  │   Elastic          (장애 복원력) (부하 적응력)         └──────────┬──┘  └───┬──────────┘
                    │          │
         ┌──────────▼──────────▼───────────┐
Message Driven             (비동기 메시지 기반 통신)         └─────────────────────────────────┘

명령형 vs 리액티브 패러다임 비교

명령형(Imperative) 프로그래밍은 "무엇을 어떻게 할지" 단계별로 지시합니다.

// 명령형: 블로킹 방식
List<User> users = userRepository.findAll();          // DB 호출 - 스레드 블로킹
List<User> activeUsers = new ArrayList<>();
for (User user : users) {
    if (user.isActive()) {
        activeUsers.add(user);
    }
}
List<String> names = new ArrayList<>();
for (User user : activeUsers) {
    names.add(user.getName());
}
return names;

리액티브(Reactive) 프로그래밍은 "데이터 흐름과 변화 전파"를 선언적으로 표현합니다.

// 리액티브: 논블로킹 방식
return userRepository.findAll()       // Flux<User> 반환 - 논블로킹
    .filter(User::isActive)           // 활성 사용자 필터링
    .map(User::getName)               // 이름 추출
    .collectList();                   // Mono<List<String>> 반환

핵심 차이는 다음과 같습니다.

항목명령형리액티브
실행 모델동기/블로킹비동기/논블로킹
스레드 사용요청당 하나의 스레드소수의 이벤트 루프 스레드
데이터 처리Pull 기반 (소비자가 요청)Push 기반 (생산자가 전달)
에러 처리try-catchonError 시그널
백프레셔없음 (또는 수동 구현)내장 지원
적합한 상황낮은 동시접속, 간단한 로직높은 동시접속, I/O 집약적

Reactive Streams 스펙

Reactive Streams는 JVM 상에서 논블로킹 백프레셔를 지원하는 비동기 스트림 처리의 표준입니다. Java 9의 java.util.concurrent.Flow에 포함되었으며, 핵심 인터페이스는 네 가지입니다.

// 1. Publisher: 데이터를 생산하는 주체
public interface Publisher<T> {
    void subscribe(Subscriber<? super T> s);
}

// 2. Subscriber: 데이터를 소비하는 주체
public interface Subscriber<T> {
    void onSubscribe(Subscription s);  // 구독 시작
    void onNext(T t);                  // 데이터 수신
    void onError(Throwable t);         // 에러 발생
    void onComplete();                 // 스트림 완료
}

// 3. Subscription: Publisher-Subscriber 간 연결 관리
public interface Subscription {
    void request(long n);              // n개의 데이터 요청 (백프레셔)
    void cancel();                     // 구독 취소
}

// 4. Processor: Publisher이면서 Subscriber (중간 처리)
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

데이터 흐름의 시퀀스는 다음과 같습니다.

Publisher                     Subscriber
   │                              │
   │◄──── subscribe() ────────────│
   │                              │
   │───── onSubscribe(sub) ──────►│
   │                              │
   │◄──── sub.request(n) ─────────│  ← 백프레셔: n개만 요청
   │                              │
   │───── onNext(data1) ─────────►│
   │───── onNext(data2) ─────────►│
   │───── ...   │───── onNext(dataN) ─────────►│
   │                              │
   │◄──── sub.request(m) ─────────│  ← 추가 m개 요청
   │                              │
   │───── onComplete() ──────────►│  (또는 onError())

백프레셔(Backpressure)의 중요성과 동작 원리

백프레셔는 소비자가 처리할 수 있는 속도에 맞춰 생산자의 데이터 전송 속도를 조절하는 메커니즘입니다. 백프레셔가 없으면 다음과 같은 문제가 발생합니다.

  • 메모리 초과(OOM): 빠른 생산자가 느린 소비자의 버퍼를 가득 채움
  • 데이터 유실: 버퍼 오버플로우로 데이터가 드롭됨
  • 시스템 장애: 연쇄적인 장애 전파 (Cascading Failure)

백프레셔 전략에는 여러 가지가 있습니다.

// 1. Buffer: 일정 크기까지 버퍼링 후 오버플로우 시 에러
Flux.range(1, 1000)
    .onBackpressureBuffer(256)
    .subscribe(item -> processSlowly(item));

// 2. Drop: 처리 못하는 데이터는 버림
Flux.range(1, 1000)
    .onBackpressureDrop(dropped ->
        log.warn("Dropped: " + dropped))
    .subscribe(item -> processSlowly(item));

// 3. Latest: 최신 값만 유지
Flux.range(1, 1000)
    .onBackpressureLatest()
    .subscribe(item -> processSlowly(item));

// 4. Error: 처리 못하면 에러 시그널
Flux.range(1, 1000)
    .onBackpressureError()
    .subscribe(item -> processSlowly(item));

2. Spring WebFlux 딥다이브

WebFlux vs Spring MVC 아키텍처 비교

Spring MVC와 WebFlux는 근본적으로 다른 실행 모델을 사용합니다.

Spring MVC: Thread-per-Request 모델

Client Request ──► Tomcat Thread Pool (200 threads)
                   Thread-1 ──► Controller ──► Service ──► DB (블로킹 대기)
                   Thread-2 ──► Controller ──► Service ──► DB (블로킹 대기)
                   Thread-3 ──► Controller ──► Service ──► External API (블로킹 대기)
                   ...
                   Thread-200 ──► (모든 스레드 고갈 시 요청 대기열에 쌓임)
  • 기본 스레드 풀 크기: Tomcat 기준 200개
  • 각 요청이 하나의 스레드를 점유 (I/O 대기 중에도)
  • 동시접속 200개 초과 시 요청 대기 발생

Spring WebFlux: Event Loop 모델

Client Request ──► Netty Event Loop (CPU 코어 수 = N개 스레드)
                   EventLoop-1 ──► Handler1 ──► (논블로킹 I/O 시작) ──► 다음 요청 처리
                   EventLoop-2 ──► Handler2 ──► (논블로킹 I/O 시작) ──► 다음 요청 처리
                   ...
                   EventLoop-N ──► (I/O 완료 콜백) ──► 응답 전송
  • 이벤트 루프 스레드 수: CPU 코어 수 (예: 8코어 = 8스레드)
  • 요청이 스레드를 점유하지 않음 (I/O 대기 시 해제)
  • 적은 스레드로 수만 개의 동시접속 처리 가능

Netty 이벤트 루프 모델

Netty는 WebFlux의 기본 런타임 서버입니다. Netty의 핵심 구조를 살펴보겠습니다.

┌────────────────────────────────────────────────────┐
Netty Server│                                                    │
│  ┌──────────────┐    ┌──────────────────────────┐  │
│  │  Boss Group   │    │     Worker Group          │  │
  (1 thread)  (CPU cores x 2 threads) │  │
│  │              │    │                          │  │
│  │  Accept      │───►│  EventLoop-1: Channel A  │  │
│  │  connections │    │  EventLoop-2: Channel B  │  │
│  │              │    │  EventLoop-3: Channel C  │  │
│  │              │    │  ...                     │  │
│  └──────────────┘    └──────────────────────────┘  │
│                                                    │
Channel Pipeline:│  ┌─────────┬──────────┬───────────┬────────────┐   │
│  │ DecoderHandler1Handler2Encoder   │   │
│  └─────────┴──────────┴───────────┴────────────┘   │
└────────────────────────────────────────────────────┘
  • Boss Group: 클라이언트 연결을 수락하고 Worker Group에 위임
  • Worker Group: 실제 I/O 처리를 담당하는 이벤트 루프 스레드
  • Channel Pipeline: 인바운드/아웃바운드 핸들러 체인으로 데이터 처리

Project Reactor: Mono vs Flux

Project Reactor는 Spring WebFlux의 리액티브 라이브러리입니다. 핵심 타입 두 가지를 이해하는 것이 중요합니다.

Mono: 0개 또는 1개의 요소를 방출하는 비동기 시퀀스

// Mono 생성
Mono<String> mono1 = Mono.just("Hello");
Mono<String> mono2 = Mono.empty();
Mono<String> mono3 = Mono.error(new RuntimeException("Error"));

// 실전: 사용자 단건 조회
@GetMapping("/users/me")
public Mono<UserResponse> getCurrentUser(Authentication auth) {
    return userRepository.findByEmail(auth.getName())  // Mono<User>
        .map(UserResponse::from)                        // Mono<UserResponse>
        .switchIfEmpty(Mono.error(new UserNotFoundException()));
}

Flux: 0개에서 N개의 요소를 방출하는 비동기 시퀀스

// Flux 생성
Flux<Integer> flux1 = Flux.just(1, 2, 3, 4, 5);
Flux<Integer> flux2 = Flux.range(1, 100);
Flux<Long> flux3 = Flux.interval(Duration.ofSeconds(1));  // 무한 스트림

// 실전: 사용자 목록 조회
@GetMapping("/users")
public Flux<UserResponse> getActiveUsers() {
    return userRepository.findAll()             // Flux<User>
        .filter(User::isActive)                 // 활성 사용자만
        .map(UserResponse::from)                // DTO 변환
        .take(100);                             // 최대 100명
}

핵심 Operators 가이드

리액티브 프로그래밍의 핵심은 오퍼레이터 조합입니다. 가장 많이 사용하는 오퍼레이터들을 살펴봅니다.

// 1. map: 동기 변환 (1:1)
Flux.just("hello", "world")
    .map(String::toUpperCase)
    // 결과: "HELLO", "WORLD"

// 2. flatMap: 비동기 변환 (1:N, 순서 보장 없음)
Flux.just(1L, 2L, 3L)
    .flatMap(id -> userRepository.findById(id))  // 각 ID로 비동기 조회
    // 결과: 순서가 보장되지 않음 (User2, User1, User3 가능)

// 3. concatMap: 비동기 변환 (1:N, 순서 보장)
Flux.just(1L, 2L, 3L)
    .concatMap(id -> userRepository.findById(id))
    // 결과: 순서 보장 (User1, User2, User3)

// 4. filter: 조건 필터링
Flux.range(1, 10)
    .filter(n -> n % 2 == 0)
    // 결과: 2, 4, 6, 8, 10

// 5. zip: 여러 소스를 조합 (1:1 매칭)
Mono<User> user = userRepository.findById(userId);
Mono<List<Order>> orders = orderRepository.findByUserId(userId).collectList();
Mono.zip(user, orders)
    .map(tuple -> new UserWithOrders(tuple.getT1(), tuple.getT2()));

// 6. merge: 여러 소스를 병합 (인터리빙)
Flux.merge(
    hotNewsFlux,     // 실시간 뉴스
    stockPriceFlux,  // 주식 시세
    alertFlux        // 알림
)
.subscribe(event -> dashboard.update(event));

// 7. concat: 순차적 연결
Flux.concat(
    cacheRepository.findById(id),      // 캐시 먼저
    databaseRepository.findById(id)    // 캐시 미스 시 DB
)
.next();  // 첫 번째 결과만

// 8. switchIfEmpty: 빈 결과 처리
userRepository.findByEmail(email)
    .switchIfEmpty(Mono.defer(() -> createDefaultUser(email)));

// 9. retry + backoff: 재시도 로직
webClient.get()
    .uri("/external-api/data")
    .retrieve()
    .bodyToMono(ExternalData.class)
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
        .maxBackoff(Duration.ofSeconds(10))
        .filter(ex -> ex instanceof WebClientResponseException.ServiceUnavailable));

// 10. onErrorResume: 에러 복구
userService.getUserProfile(userId)
    .onErrorResume(UserNotFoundException.class,
        ex -> Mono.just(UserProfile.anonymous()))
    .onErrorResume(ServiceException.class,
        ex -> cachedProfileService.getCachedProfile(userId));

WebFlux + R2DBC (리액티브 DB 접근)

R2DBC(Reactive Relational Database Connectivity)는 관계형 데이터베이스에 대한 논블로킹 접근을 제공합니다.

// build.gradle
// implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
// implementation 'io.r2dbc:r2dbc-postgresql'

// Entity
@Table("users")
public record User(
    @Id Long id,
    String email,
    String name,
    boolean active,
    LocalDateTime createdAt
) {}

// Repository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {

    Flux<User> findByActiveTrue();

    @Query("SELECT * FROM users WHERE email = :email")
    Mono<User> findByEmail(String email);

    @Query("SELECT * FROM users WHERE created_at > :since ORDER BY created_at DESC")
    Flux<User> findRecentUsers(LocalDateTime since);
}

// Service
@Service
@RequiredArgsConstructor
public class UserService {

    private final UserRepository userRepository;
    private final DatabaseClient databaseClient;

    public Flux<User> getActiveUsers() {
        return userRepository.findByActiveTrue();
    }

    // 복잡한 쿼리는 DatabaseClient 사용
    public Flux<UserStats> getUserStats() {
        return databaseClient
            .sql("""
                SELECT u.id, u.name, COUNT(o.id) as order_count
                FROM users u
                LEFT JOIN orders o ON u.id = o.user_id
                GROUP BY u.id, u.name
                HAVING COUNT(o.id) > 0
                """)
            .map(row -> new UserStats(
                row.get("id", Long.class),
                row.get("name", String.class),
                row.get("order_count", Long.class)
            ))
            .all();
    }
}

WebFlux + WebClient (리액티브 HTTP 클라이언트)

WebClient는 RestTemplate을 대체하는 논블로킹 HTTP 클라이언트입니다.

@Configuration
public class WebClientConfig {

    @Bean
    public WebClient webClient() {
        return WebClient.builder()
            .baseUrl("https://api.example.com")
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .filter(ExchangeFilterFunctions.basicAuthentication("user", "password"))
            .codecs(configurer -> configurer
                .defaultCodecs()
                .maxInMemorySize(16 * 1024 * 1024))  // 16MB
            .build();
    }
}

@Service
@RequiredArgsConstructor
public class ExternalApiService {

    private final WebClient webClient;

    // GET 요청
    public Mono<ProductResponse> getProduct(String productId) {
        return webClient.get()
            .uri("/products/{id}", productId)
            .retrieve()
            .onStatus(HttpStatusCode::is4xxClientError,
                response -> Mono.error(new ProductNotFoundException(productId)))
            .onStatus(HttpStatusCode::is5xxServerError,
                response -> Mono.error(new ExternalServiceException()))
            .bodyToMono(ProductResponse.class)
            .timeout(Duration.ofSeconds(5))
            .retryWhen(Retry.backoff(3, Duration.ofMillis(500)));
    }

    // POST 요청
    public Mono<OrderResponse> createOrder(OrderRequest request) {
        return webClient.post()
            .uri("/orders")
            .bodyValue(request)
            .retrieve()
            .bodyToMono(OrderResponse.class);
    }

    // Streaming 응답
    public Flux<EventData> streamEvents() {
        return webClient.get()
            .uri("/events/stream")
            .accept(MediaType.TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(EventData.class);
    }
}

성능 벤치마크: WebFlux vs MVC

다양한 동시접속 수준에서의 벤치마크 결과입니다. (4코어 8GB, 100ms 지연 시뮬레이션)

┌──────────────────┬────────────┬────────────┬────────────┐
│ 동시접속 수       │ Spring MVCWebFlux    │ 차이       │
├──────────────────┼────────────┼────────────┼────────────┤
1,0008,500 rps  │ 9,200 rps  │ +8%10,0003,200 rps  │ 9,100 rps  │ +184%100,000          │ 타임아웃    │ 8,800 rps  │ MVC 실패   │
├──────────────────┼────────────┼────────────┼────────────┤
│ 메모리 사용      │ 2.1 GB0.8 GB-62%│ 스레드 수        │ 20416-92%P99 응답시간     │ 1,200ms    │ 180ms      │ -85% (동시접속 10K)   │            │            │            │
└──────────────────┴────────────┴────────────┴────────────┘

핵심 포인트:

  • 낮은 동시접속(1,000 이하): MVC와 WebFlux의 성능 차이가 크지 않음
  • 높은 동시접속(10,000 이상): WebFlux가 압도적으로 유리
  • 초고동시접속(100,000): MVC는 스레드 고갈로 사실상 작동 불가

주의점: 블로킹 코드 감지

WebFlux에서 가장 흔한 실수는 이벤트 루프 스레드에서 블로킹 코드를 실행하는 것입니다.

// 절대 금지: 이벤트 루프에서 블로킹 호출
@GetMapping("/bad-example")
public Mono<String> badExample() {
    // JDBC는 블로킹 — 이벤트 루프를 멈추게 함
    User user = jdbcTemplate.queryForObject("SELECT * FROM users WHERE id = 1", User.class);
    return Mono.just(user.getName());
}

// 올바른 방법 1: R2DBC 사용
@GetMapping("/good-example-1")
public Mono<String> goodExample1() {
    return r2dbcUserRepository.findById(1L)
        .map(User::getName);
}

// 올바른 방법 2: 블로킹 코드를 별도 스케줄러에서 실행
@GetMapping("/good-example-2")
public Mono<String> goodExample2() {
    return Mono.fromCallable(() ->
            jdbcTemplate.queryForObject("SELECT * FROM users WHERE id = 1", User.class))
        .subscribeOn(Schedulers.boundedElastic())  // 블로킹 전용 스레드풀
        .map(User::getName);
}

블로킹 코드를 자동으로 감지하려면 Blockhound를 사용할 수 있습니다.

// build.gradle
// testImplementation 'io.projectreactor.tools:blockhound:1.0.9.RELEASE'

// 테스트 설정
@BeforeAll
static void setUp() {
    BlockHound.install();
}

@Test
void shouldNotBlock() {
    // 블로킹 코드가 있으면 BlockingOperationError 발생
    StepVerifier.create(userService.getActiveUsers())
        .expectNextCount(5)
        .verifyComplete();
}

3. Apache Kafka 완전 가이드

Kafka 아키텍처 개요

Apache Kafka는 LinkedIn에서 개발한 분산 이벤트 스트리밍 플랫폼입니다. 초당 수백만 건의 이벤트를 안정적으로 처리할 수 있습니다.

┌─────────────────────────────────────────────────────────────┐
Kafka Cluster│                                                             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │  Broker 1   │  │  Broker 2   │  │  Broker 3   │         │
│  │             │  │             │  │             │         │
│  │ Topic-A P0  │  │ Topic-A P1  │  │ Topic-A P2Leader│  │ Topic-B P1R │  │ Topic-B P0  │  │ Topic-A P0RReplica│
│  │ Topic-A P2R │  │ Topic-A P0R │  │ Topic-B P1  │         │
│  └─────────────┘  └─────────────┘  └─────────────┘         │
│                                                             │
P = Partition (Leader),  R = Replica (Follower)└─────────────────────────────────────────────────────────────┘
         ▲                                    │
         │                                    ▼
   ┌─────────────┐                   ┌─────────────────┐
Producers  │                   │  Consumer Group   │             │                   │                 │
App1, App2  │                   │ Consumer1P0   │             │                   │ Consumer2P1   │             │                   │ Consumer3P2   └─────────────┘                   └─────────────────┘

핵심 구성 요소:

  • Broker: Kafka 서버 인스턴스. 클러스터에 여러 브로커가 존재
  • Topic: 메시지를 구분하는 논리적 카테고리 (예: user-events, order-events)
  • Partition: 토픽을 분할하는 물리적 단위. 병렬 처리의 기본 단위
  • Replica: 파티션의 복제본. Leader와 Follower로 구분
  • ISR (In-Sync Replicas): Leader와 동기화된 Replica 집합

Producer: 메시지 전송의 모든 것

// Producer 설정
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // 성능 튜닝
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);        // 32KB 배치
        config.put(ProducerConfig.LINGER_MS_CONFIG, 20);            // 20ms 대기
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");  // 압축
        config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);  // 64MB 버퍼

        // 안정성
        config.put(ProducerConfig.ACKS_CONFIG, "all");              // 모든 ISR 확인
        config.put(ProducerConfig.RETRIES_CONFIG, 3);               // 재시도
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 멱등성 보장

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Acknowledgment 모드 비교:

acks 값동작안정성성능
0전송 후 확인 안 함낮음 (데이터 유실 가능)최고
1Leader만 확인중간 (Leader 장애 시 유실 가능)높음
all (-1)모든 ISR 확인최고 (ISR 모두 확인)보통

Idempotent Producer: enable.idempotence=true 설정 시, 동일 메시지를 중복 전송하더라도 한 번만 기록합니다. Producer ID와 Sequence Number를 사용하여 중복을 감지합니다.

Consumer: 메시지 소비의 모든 것

// Consumer 설정
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service-group");

        // 오프셋 관리
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // 수동 커밋

        // 성능 튜닝
        config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);      // 1KB 최소 패치
        config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);     // 500ms 대기
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);      // 최대 500건

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);  // 3개의 컨슈머 스레드
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

// Consumer 리스너
@Service
@Slf4j
public class OrderEventConsumer {

    @KafkaListener(
        topics = "order-events",
        groupId = "order-service-group",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
            ConsumerRecord<String, OrderEvent> record,
            Acknowledgment ack) {
        try {
            log.info("Received: topic={}, partition={}, offset={}, key={}",
                record.topic(), record.partition(), record.offset(), record.key());

            OrderEvent event = record.value();
            processOrderEvent(event);

            ack.acknowledge();  // 수동 커밋
        } catch (Exception e) {
            log.error("Failed to process order event", e);
            // 에러 처리: DLQ(Dead Letter Queue)로 전송하거나 재시도
        }
    }
}

Consumer Group과 파티션 할당:

Topic: order-events (6 partitions)

Consumer Group: order-service-group
  Consumer-1: P0, P1  (2개 파티션 담당)
  Consumer-2: P2, P3  (2개 파티션 담당)
  Consumer-3: P4, P5  (2개 파티션 담당)

규칙: 하나의 파티션은 그룹 내 하나의 컨슈머만 소비 가능
     컨슈머 수 > 파티션 수이면 일부 컨슈머는 유휴 상태

리밸런싱(Rebalancing): 컨슈머가 그룹에 참가하거나 탈퇴하면 파티션 재할당이 발생합니다. 리밸런싱 중에는 모든 컨슈머가 일시적으로 소비를 중단합니다.

Exactly-Once Semantics (EOS)

Kafka의 전달 보증 수준은 세 가지입니다.

  • At-Most-Once: 메시지가 유실될 수 있지만 중복은 없음
  • At-Least-Once: 메시지가 중복될 수 있지만 유실은 없음
  • Exactly-Once: 메시지가 정확히 한 번만 처리됨

Exactly-Once를 달성하려면 Transactional Producer와 Consumer를 함께 사용합니다.

// Transactional Producer 설정
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-producer-1");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

// 트랜잭션 사용 예제
@Service
@RequiredArgsConstructor
public class OrderProcessor {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public void processOrder(Order order) {
        kafkaTemplate.executeInTransaction(ops -> {
            // 하나의 트랜잭션으로 묶임
            ops.send("order-confirmed", order.getId(), new OrderConfirmedEvent(order));
            ops.send("inventory-update", order.getProductId(), new InventoryUpdateEvent(order));
            ops.send("notification", order.getUserId(), new NotificationEvent(order));
            return true;
        });
        // 3개의 메시지가 모두 성공하거나 모두 실패
    }
}

Kafka Streams 개요

Kafka Streams는 Kafka 위에 구축된 스트림 처리 라이브러리입니다.

// KStream: 이벤트 스트림 (각 레코드가 독립적)
// KTable: 변경 로그 (키의 최신 값만 유지)

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean
    public KStream<String, OrderEvent> orderStream(StreamsBuilder builder) {
        KStream<String, OrderEvent> stream = builder.stream("order-events");

        // 1. 필터링 + 변환
        stream
            .filter((key, event) -> event.getAmount() > 10000)  // 1만원 이상
            .mapValues(event -> new HighValueOrderEvent(event))
            .to("high-value-orders");

        // 2. 윈도우 집계: 5분 간격 주문 금액 합계
        stream
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .aggregate(
                () -> 0L,
                (key, event, total) -> total + event.getAmount(),
                Materialized.as("order-amount-store")
            )
            .toStream()
            .to("order-amount-per-5min");

        // 3. KStream-KTable Join
        KTable<String, UserProfile> userTable = builder.table("user-profiles");
        stream
            .join(userTable,
                (order, user) -> new EnrichedOrder(order, user))
            .to("enriched-orders");

        return stream;
    }
}

Kafka Connect와 Debezium CDC

Kafka Connect는 외부 시스템과 Kafka 간 데이터를 안정적으로 이동시키는 프레임워크입니다.

{
  "name": "mysql-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-server",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "password",
    "database.server.id": "1",
    "topic.prefix": "myapp",
    "database.include.list": "orders_db",
    "table.include.list": "orders_db.orders,orders_db.order_items",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes"
  }
}

Debezium은 CDC(Change Data Capture) 커넥터로, 데이터베이스의 변경 사항을 실시간으로 Kafka 토픽에 스트리밍합니다. INSERT, UPDATE, DELETE 이벤트가 모두 캡처됩니다.

Schema Registry + Avro

스키마 레지스트리는 Kafka 메시지의 스키마를 중앙에서 관리하여 호환성을 보장합니다.

// user-event.avsc (Avro 스키마)
{
  "type": "record",
  "name": "UserEvent",
  "namespace": "com.example.events",
  "fields": [
    {"name": "userId", "type": "string"},
    {"name": "action", "type": {"type": "enum", "name": "Action",
      "symbols": ["CREATED", "UPDATED", "DELETED"]}},
    {"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}

성능 튜닝 가이드

┌──────────────────────────────────────────────────────────────┐
Producer 튜닝                              │
├──────────────────┬───────────────────────────────────────────┤
│ batch.size16KB(기본)32~64KB (처리량 증가)│ linger.ms0(기본)10~50ms (배치 효율성)│ compression.type │ none → lz4 (속도), zstd (압축률)│ buffer.memory32MB(기본)64~128MB (대량 전송 시)│ max.in.flight5(기본)1 (순서 보장 필요 시)├──────────────────┴───────────────────────────────────────────┤
Consumer 튜닝                              │
├──────────────────┬───────────────────────────────────────────┤
│ fetch.min.bytes1(기본)1024~10240 (배치 페치)│ fetch.max.wait500ms(기본)조정 (지연 vs 처리량)│ max.poll.records500(기본) → 워크로드에 따라 조정            │
│ session.timeout45s(기본) → 리밸런싱 민감도 조절            │
├──────────────────┴───────────────────────────────────────────┤
Broker 튜닝                                │
├──────────────────┬───────────────────────────────────────────┤
│ num.partitions   │ 토픽 기본 파티션  (목표 처리량 / 파티션당)│ replication      │ 3 (프로덕션 권장)│ min.insync2 (acks=all과 함께 사용)│ log.retention7(기본) → 비즈니스 요구에 따라             │
└──────────────────┴───────────────────────────────────────────┘

운영: 파티션 수 결정 공식

파티션 수를 결정하는 경험적 공식입니다.

목표 처리량 = T (messages/sec)
단일 파티션 처리량 = P (messages/sec)
필요 파티션 수 = max(T/P_producer, T/P_consumer)

예시:
- 목표: 100,000 msg/sec
- Producer 단일 파티션: 50,000 msg/sec
- Consumer 단일 파티션: 10,000 msg/sec
- 필요 파티션: max(100K/50K, 100K/10K) = max(2, 10) = 10
실무 권장:
- 소규모: 6~12 파티션
- 중규모: 12~30 파티션
- 대규모: 30~100 파티션
- 초대규모: 100+ (KRaft 모드에서 100만개 이상 지원)

4. ZooKeeper에서 KRaft로: 역사적 전환

ZooKeeper란?

Apache ZooKeeper는 분산 시스템의 코디네이션을 담당하는 서비스입니다. ZAB(ZooKeeper Atomic Broadcast) 프로토콜 기반의 합의 알고리즘을 사용합니다.

┌────────────────────────────────────────────┐
ZooKeeper Ensemble│                                            │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐   │
│  │  ZK Node  │ │  ZK Node  │ │  ZK Node  │   │
  (Leader)(Follower)(Follower)│   │
│  └──────────┘ └──────────┘ └──────────┘   │
│                                            │
│  저장 데이터:/brokers/ids/[0,1,2]     - 브로커 목록     │
/brokers/topics/my-topic - 토픽 메타데이터  │
/controller              - 컨트롤러 선출    │
/admin/delete_topics     - 토픽 삭제 요청   │
/config                  - 동적 설정        │
└────────────────────────────────────────────┘
           ▲          ▲          ▲
           │          │          │
     Kafka Broker1  Broker2   Broker3

Kafka가 ZooKeeper에 의존했던 이유는 다음과 같습니다.

  • 컨트롤러 선출: 클러스터 내 하나의 브로커가 컨트롤러 역할을 담당
  • 토픽/파티션 메타데이터: 토픽 설정, 파티션 할당 정보 저장
  • 브로커 등록: 살아있는 브로커 목록 관리 (Ephemeral Node)
  • ACL 관리: 접근 제어 목록 저장
  • 컨슈머 그룹 관리: 과거에는 컨슈머 오프셋도 ZK에 저장 (현재는 내부 토픽 사용)

ZooKeeper의 한계

ZooKeeper는 Kafka의 규모가 커지면서 여러 문제를 드러냈습니다.

  1. 운영 복잡성: Kafka와 별도로 ZooKeeper 클러스터를 운영해야 함 (별도의 모니터링, 패치, 백업)
  2. 확장성 병목: ZK의 znode에 메타데이터를 저장하므로, 수십만 개 이상의 파티션에서 성능 저하
  3. 컨트롤러 페일오버 지연: 컨트롤러가 죽으면 새 컨트롤러가 ZK에서 전체 메타데이터를 로드해야 하므로 수 분 소요 가능
  4. 단일 장애점 가능성: ZK 클러스터 자체가 과부하되거나 장애 발생 시 Kafka 전체에 영향
  5. 이중 인프라 비용: ZK 3노드 + Kafka N노드 운영 부담

KRaft 모드 (KIP-500)

KRaft(Kafka Raft)는 ZooKeeper를 완전히 제거하고, Kafka 자체에 Raft 기반 합의 프로토콜을 내장한 아키텍처입니다.

┌────────────────────────────────────────────────────┐
Kafka Cluster (KRaft Mode)│                                                    │
│  ┌───────────────────────────────────────────┐     │
│  │         Controller Quorum                  │     │
│  │                                           │     │
│  │  Broker-0      Broker-1      Broker-2     │     │
  (Controller   (Controller   (Controller  │     │
│  │   + Broker)     + Broker)     + Broker)   │     │
│  │  [Active]      [Follower]    [Follower]   │     │
│  └───────────────────────────────────────────┘     │
│                                                    │
│  ┌───────────────────────────────────────────┐     │
│  │         Data Brokers                       │     │
│  │  Broker-3      Broker-4      Broker-5     │     │
  (Broker only) (Broker only) (Broker only)│     │
│  └───────────────────────────────────────────┘     │
│                                                    │
Metadata Log: __cluster_metadata (내부 토픽)│  합의 프로토콜: Raft (자체 구현)└────────────────────────────────────────────────────┘

KRaft의 장점

항목ZooKeeper 모드KRaft 모드
외부 의존성ZK 클러스터 필요없음 (자체 완결)
운영 복잡성높음 (2개 시스템)낮음 (1개 시스템)
최대 파티션 수약 20만 개100만 개 이상
컨트롤러 페일오버수 분수 초
메타데이터 전파ZK Watch + RPCRaft 로그 복제
배포 복잡성ZK 별도 설치/설정Kafka만 설치

마이그레이션 가이드: ZK에서 KRaft로

Kafka 3.6 이상에서 KRaft가 GA(General Availability)이며, Kafka 4.0에서 ZooKeeper는 완전히 제거됩니다.

# 1단계: 클러스터 ID 확인
bin/kafka-metadata.sh --snapshot /var/kafka-logs/__cluster_metadata-0/00000000000000000000.log --cluster-id

# 2단계: ZK에서 메타데이터 마이그레이션 준비
bin/kafka-storage.sh format \
  --config config/kraft/server.properties \
  --cluster-id YOUR_CLUSTER_ID \
  --release-version 3.7

# 3단계: 브로커 설정 변경 (server.properties)
# ZK 모드 설정 제거
# zookeeper.connect=localhost:2181  ← 제거

# KRaft 모드 설정 추가
# process.roles=broker,controller
# node.id=1
# controller.quorum.voters=1@broker1:9093,2@broker2:9093,3@broker3:9093
# controller.listener.names=CONTROLLER
# listeners=PLAINTEXT://:9092,CONTROLLER://:9093

# 4단계: 롤링 재시작 (하나씩 순차적으로)
# 각 브로커를 중지 → KRaft 설정 적용 → 재시작

# 5단계: 마이그레이션 완료 확인
bin/kafka-metadata.sh --snapshot /var/kafka-logs/__cluster_metadata-0/00000000000000000000.log --verify

마이그레이션 시 주의사항:

  • 반드시 Kafka 3.6 이상 버전에서 진행
  • 마이그레이션 전 전체 백업 수행
  • 스테이징 환경에서 먼저 검증
  • 롤링 방식으로 진행하여 다운타임 최소화
  • 마이그레이션 중에는 토픽 생성/삭제 자제

5. Streaming API 비교: SSE vs WebSocket vs gRPC Streaming

세 가지 Streaming API 비교

항목SSEWebSocketgRPC Streaming
통신 방향서버 → 클라이언트 (단방향)양방향 (Full Duplex)양방향 (Full Duplex)
프로토콜HTTP/1.1WS (HTTP Upgrade)HTTP/2
데이터 형식Text (UTF-8)Text / BinaryProtobuf (Binary)
자동 재연결내장 지원수동 구현 필요수동 구현 필요
브라우저 지원모든 모던 브라우저모든 모던 브라우저grpc-web 필요
멀티플렉싱불가 (HTTP/1.1)불가가능 (HTTP/2)
프록시/방화벽친화적 (HTTP)일부 제한일부 제한
직렬화 오버헤드낮음낮음~중간매우 낮음
적합한 케이스알림, 피드, 시세채팅, 게임, 협업MSA 간 통신

SSE (Server-Sent Events) 구현

WebFlux 기반 SSE:

@RestController
@RequestMapping("/api/stream")
public class SseController {

    private final StockPriceService stockPriceService;

    // 방법 1: Flux + ServerSentEvent 래퍼
    @GetMapping(value = "/stocks", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<StockPrice>> streamStockPrices(
            @RequestParam List<String> symbols) {
        return stockPriceService.getPriceStream(symbols)
            .map(price -> ServerSentEvent.<StockPrice>builder()
                .id(String.valueOf(price.getTimestamp()))
                .event("price-update")
                .data(price)
                .retry(Duration.ofSeconds(5))
                .build());
    }

    // 방법 2: 심플한 Flux 스트림
    @GetMapping(value = "/notifications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<NotificationEvent> streamNotifications(
            @AuthenticationPrincipal UserDetails user) {
        return notificationService.getNotificationStream(user.getUsername())
            .doOnCancel(() -> log.info("Client disconnected: {}", user.getUsername()));
    }

    // 방법 3: Heartbeat 포함 (연결 유지)
    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<Object>> streamWithHeartbeat() {
        Flux<ServerSentEvent<Object>> dataStream = eventService.getEventStream()
            .map(event -> ServerSentEvent.builder()
                .event("data")
                .data((Object) event)
                .build());

        Flux<ServerSentEvent<Object>> heartbeat = Flux.interval(Duration.ofSeconds(30))
            .map(tick -> ServerSentEvent.builder()
                .event("heartbeat")
                .data((Object) "ping")
                .build());

        return Flux.merge(dataStream, heartbeat);
    }
}

클라이언트 (JavaScript):

const eventSource = new EventSource('/api/stream/stocks?symbols=AAPL,GOOGL,MSFT')

eventSource.addEventListener('price-update', (event) => {
  const price = JSON.parse(event.data)
  updateDashboard(price)
})

eventSource.addEventListener('heartbeat', () => {
  console.log('Connection alive')
})

eventSource.onerror = (error) => {
  console.error('SSE error:', error)
  // EventSource는 자동으로 재연결을 시도합니다
}

WebSocket 구현

WebFlux 기반 WebSocket:

@Configuration
public class WebSocketConfig {

    @Bean
    public HandlerMapping webSocketHandlerMapping(ChatWebSocketHandler handler) {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/ws/chat", handler);

        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);
        mapping.setOrder(-1);
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

@Component
@Slf4j
public class ChatWebSocketHandler implements WebSocketHandler {

    private final Sinks.Many<ChatMessage> chatSink =
        Sinks.many().multicast().onBackpressureBuffer();

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // 인바운드: 클라이언트 → 서버
        Mono<Void> input = session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .map(this::parseChatMessage)
            .doOnNext(msg -> {
                log.info("Received: {}", msg);
                chatSink.tryEmitNext(msg);  // 모든 구독자에게 브로드캐스트
            })
            .then();

        // 아웃바운드: 서버 → 클라이언트
        Mono<Void> output = session.send(
            chatSink.asFlux()
                .map(msg -> session.textMessage(toJson(msg)))
        );

        // 양방향 동시 처리
        return Mono.zip(input, output).then();
    }

    private ChatMessage parseChatMessage(String payload) {
        return new ObjectMapper().readValue(payload, ChatMessage.class);
    }

    private String toJson(ChatMessage msg) {
        return new ObjectMapper().writeValueAsString(msg);
    }
}

gRPC Streaming 구현

Proto 파일 정의:

syntax = "proto3";

package stockservice;

service StockService {
  // Server Streaming: 서버가 연속으로 전송
  rpc StreamPrices(StockRequest) returns (stream StockPrice);

  // Client Streaming: 클라이언트가 연속으로 전송
  rpc SendOrders(stream OrderRequest) returns (OrderSummary);

  // Bidirectional Streaming: 양방향
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

message StockRequest {
  repeated string symbols = 1;
}

message StockPrice {
  string symbol = 1;
  double price = 2;
  int64 timestamp = 3;
}

message OrderRequest {
  string symbol = 1;
  int32 quantity = 2;
  double price = 3;
}

message OrderSummary {
  int32 total_orders = 1;
  double total_amount = 2;
}

message ChatMessage {
  string user = 1;
  string content = 2;
  int64 timestamp = 3;
}

gRPC Server Streaming 구현:

@GrpcService
public class StockGrpcService extends StockServiceGrpc.StockServiceImplBase {

    private final StockPriceService priceService;

    @Override
    public void streamPrices(StockRequest request,
                             StreamObserver<StockPrice> responseObserver) {
        // 구독 시작
        Disposable subscription = priceService
            .getPriceStream(request.getSymbolsList())
            .subscribe(
                price -> {
                    // 각 가격 업데이트를 클라이언트에 전송
                    responseObserver.onNext(StockPrice.newBuilder()
                        .setSymbol(price.getSymbol())
                        .setPrice(price.getPrice())
                        .setTimestamp(price.getTimestamp())
                        .build());
                },
                error -> {
                    responseObserver.onError(Status.INTERNAL
                        .withDescription(error.getMessage())
                        .asRuntimeException());
                },
                () -> {
                    responseObserver.onCompleted();
                }
            );

        // 클라이언트 연결 해제 시 구독 취소
        Context.current().addListener(
            context -> subscription.dispose(),
            Runnable::run
        );
    }
}

Streaming API 선택 가이드

                    ┌─────────────┐
                    │ 양방향 통신  │
                    │  필요한가?                    └──────┬──────┘
/     \ 아니오
                      /       \
            ┌────────▼──┐  ┌──▼──────────┐
            │ 브라우저    │  │    SSE            │ 직접 통신? (가장 간단)            └─────┬─────┘  └─────────────┘
/   \ 아니오
               /     \
    ┌─────────▼─┐  ┌─▼──────────────┐
WebSocket │  │ gRPC Streaming     (브라우저) (MSA/백엔드)    └───────────┘  └────────────────┘

6. WebFlux + Kafka 통합 아키텍처

Reactor Kafka 설정

Reactor Kafka는 Kafka를 리액티브하게 사용할 수 있게 해주는 라이브러리입니다.

// build.gradle
// implementation 'io.projectreactor.kafka:reactor-kafka:1.3.23'

@Configuration
public class ReactorKafkaConfig {

    @Bean
    public ReactiveKafkaProducerTemplate<String, Object> reactiveProducer() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        SenderOptions<String, Object> senderOptions = SenderOptions.create(props);
        return new ReactiveKafkaProducerTemplate<>(senderOptions);
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, StockPrice> reactiveConsumer() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "stock-price-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        ReceiverOptions<String, StockPrice> receiverOptions =
            ReceiverOptions.<String, StockPrice>create(props)
                .subscription(Collections.singleton("stock-prices"));

        return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
    }
}

실시간 데이터 파이프라인: Kafka에서 브라우저까지

┌──────────┐    ┌─────────┐    ┌───────────┐    ┌──────────┐    ┌──────────┐
External │    │  Kafka  │    │  WebFlux  │    │   SSE    │    │  ReactAPI    │───►│ Producer│───►│ Consumer  │───►│ Endpoint │───►│ Frontend│          │    │         │     (Reactor) │    │          │    │          │
└──────────┘    └─────────┘    └───────────┘    └──────────┘    └──────────┘
  시세 소스        토픽 발행       리액티브 소비     HTTP 스트림     실시간 렌더링
// 1. Kafka Producer: 외부 API에서 시세 데이터를 Kafka로
@Service
@RequiredArgsConstructor
@Slf4j
public class StockPriceProducer {

    private final ReactiveKafkaProducerTemplate<String, Object> producer;
    private final WebClient marketDataClient;

    @Scheduled(fixedRate = 1000)  // 1초마다
    public void fetchAndPublish() {
        marketDataClient.get()
            .uri("/v1/quotes?symbols=AAPL,GOOGL,MSFT")
            .retrieve()
            .bodyToFlux(StockPrice.class)
            .flatMap(price -> producer.send("stock-prices", price.getSymbol(), price))
            .doOnNext(result -> log.debug("Sent: topic={}, partition={}, offset={}",
                result.recordMetadata().topic(),
                result.recordMetadata().partition(),
                result.recordMetadata().offset()))
            .subscribe();
    }
}

// 2. Kafka Consumer + SSE: Kafka에서 브라우저로
@RestController
@RequestMapping("/api/stream")
@RequiredArgsConstructor
public class StockStreamController {

    private final ReactiveKafkaConsumerTemplate<String, StockPrice> consumer;

    // Kafka 토픽을 SSE로 브리지
    @GetMapping(value = "/stock-prices", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<StockPrice>> streamStockPrices(
            @RequestParam(required = false) List<String> symbols) {

        return consumer.receiveAutoAck()
            .map(ConsumerRecord::value)
            .filter(price -> symbols == null || symbols.contains(price.getSymbol()))
            .map(price -> ServerSentEvent.<StockPrice>builder()
                .id(price.getSymbol() + "-" + price.getTimestamp())
                .event("price-update")
                .data(price)
                .build())
            .doOnCancel(() -> log.info("Client disconnected from stock stream"));
    }
}

CQRS + Event Sourcing with Kafka

┌──────────────────────────────────────────────────────────┐
CQRS Pattern│                                                          │
Command SideQuery Side│                            │                             │
│  ┌──────────┐              │  ┌───────────┐              │
│  │ WebFluxCommand     │  │  WebFluxQuery│  │Controller│────────┐     │  │Controller │◄─────┐       │
│  └──────────┘        │     │  └───────────┘      │       │
│                      ▼     │                     │       │
│  ┌──────────────────────┐  │  ┌─────────────────────┐   │
│  │   Kafka Topic        │  │  │   Read Database      │   │
   (Event Store)      │──┼─►│   (Materialized     │   │
│  │   order-events       │  │  │    View / Redis)     │   │
│  └──────────────────────┘  │  └─────────────────────┘   │
│                            │                             │
└──────────────────────────────────────────────────────────┘
// Command: 주문 생성 → Kafka에 이벤트 발행
@Service
@RequiredArgsConstructor
public class OrderCommandService {

    private final ReactiveKafkaProducerTemplate<String, Object> producer;

    public Mono<String> createOrder(CreateOrderCommand command) {
        String orderId = UUID.randomUUID().toString();
        OrderCreatedEvent event = new OrderCreatedEvent(
            orderId, command.getUserId(), command.getItems(),
            command.getTotalAmount(), Instant.now()
        );

        return producer.send("order-events", orderId, event)
            .map(result -> orderId);
    }
}

// Query: Kafka 이벤트를 소비하여 읽기 모델 업데이트
@Service
@RequiredArgsConstructor
public class OrderProjectionService {

    private final ReactiveKafkaConsumerTemplate<String, OrderEvent> consumer;
    private final ReactiveRedisTemplate<String, OrderView> redisTemplate;

    @PostConstruct
    public void startProjection() {
        consumer.receiveAutoAck()
            .map(ConsumerRecord::value)
            .flatMap(this::updateProjection)
            .doOnError(e -> log.error("Projection error", e))
            .retry()
            .subscribe();
    }

    private Mono<Boolean> updateProjection(OrderEvent event) {
        if (event instanceof OrderCreatedEvent created) {
            OrderView view = OrderView.from(created);
            return redisTemplate.opsForValue()
                .set("order:" + created.getOrderId(), view);
        }
        return Mono.just(true);
    }
}

// Query Controller: Redis에서 읽기
@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
public class OrderQueryController {

    private final ReactiveRedisTemplate<String, OrderView> redisTemplate;

    @GetMapping("/{orderId}")
    public Mono<OrderView> getOrder(@PathVariable String orderId) {
        return redisTemplate.opsForValue()
            .get("order:" + orderId)
            .switchIfEmpty(Mono.error(new OrderNotFoundException(orderId)));
    }
}

7. 실전 프로젝트: 실시간 주식 시세 시스템

전체 아키텍처

┌────────────────────────────────────────────────────────────────────┐
Real-Time Stock Price System│                                                                    │
│  ┌──────────┐   ┌──────────┐   ┌────────────┐   ┌──────────────┐ │
│  │ Market   │   │  Kafka   │   │  WebFlux   │   │   React      │ │
│  │ Data API │──►│ Producer │──►│  Consumer  │──►│  Frontend    │ │
 (Yahoo/  │   │          │   │  + SSE  (Dashboard) │ │
│  │  Alpha)  │   │          │   │            │   │              │ │
│  └──────────┘   └──────────┘   └────────────┘   └──────────────┘ │
│       │              │               │                  │         │
│       │         ┌────▼────┐    ┌─────▼─────┐    ┌──────▼──────┐ │
│       │         │  Kafka  │    │  Redis     │    │ Prometheus  │ │
│       │         │  Topic  │    │  Cache     │    │ + Grafana   │ │
│       │          (3 rep) │    │            │    │             │ │
│       │         └─────────┘    └───────────┘    └─────────────┘ │
│       │                                                          │
│  ┌────▼──────────────────────────────────────┐                   │
│  │  Kafka Streams: 집계/분석                   │                   │
│  │  - 5분 이동평균                              │                   │
│  │  - 일일 최고/최저가                          │                   │
│  │  - 거래량 이상 탐지                          │                   │
│  └───────────────────────────────────────────┘                   │
└────────────────────────────────────────────────────────────────────┘

핵심 코드 스니펫

1) Market Data Fetcher:

@Component
@RequiredArgsConstructor
@Slf4j
public class MarketDataFetcher {

    private final WebClient marketClient;
    private final ReactiveKafkaProducerTemplate<String, Object> kafkaProducer;

    @Scheduled(fixedRate = 1000)
    public void fetchPrices() {
        List<String> symbols = List.of("AAPL", "GOOGL", "MSFT", "AMZN", "TSLA");

        Flux.fromIterable(symbols)
            .flatMap(symbol -> marketClient.get()
                .uri("/quote/{symbol}", symbol)
                .retrieve()
                .bodyToMono(MarketQuote.class)
                .map(quote -> new StockPrice(
                    symbol, quote.getPrice(), quote.getVolume(),
                    quote.getChange(), Instant.now().toEpochMilli()))
                .onErrorResume(e -> {
                    log.warn("Failed to fetch {}: {}", symbol, e.getMessage());
                    return Mono.empty();
                }))
            .flatMap(price -> kafkaProducer
                .send("stock-prices", price.getSymbol(), price)
                .doOnSuccess(r -> log.debug("Published: {} = {}",
                    price.getSymbol(), price.getPrice())))
            .subscribe();
    }
}

2) Kafka Streams 집계:

@Configuration
@EnableKafkaStreams
public class StockAnalyticsConfig {

    @Bean
    public KStream<String, StockPrice> stockAnalytics(StreamsBuilder builder) {
        KStream<String, StockPrice> priceStream = builder.stream("stock-prices");

        // 5분 이동평균 계산
        priceStream
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .aggregate(
                PriceAggregation::new,
                (symbol, price, agg) -> agg.add(price.getPrice()),
                Materialized.<String, PriceAggregation, WindowStore<Bytes, byte[]>>
                    as("price-5min-avg")
                    .withValueSerde(new PriceAggregationSerde()))
            .toStream()
            .map((windowed, agg) -> KeyValue.pair(
                windowed.key(),
                new PriceAverage(windowed.key(), agg.getAverage(),
                    agg.getMin(), agg.getMax(), agg.getCount())))
            .to("stock-price-analytics");

        return priceStream;
    }
}

// 집계 클래스
public class PriceAggregation {
    private double sum = 0;
    private double min = Double.MAX_VALUE;
    private double max = Double.MIN_VALUE;
    private long count = 0;

    public PriceAggregation add(double price) {
        sum += price;
        min = Math.min(min, price);
        max = Math.max(max, price);
        count++;
        return this;
    }

    public double getAverage() {
        return count > 0 ? sum / count : 0;
    }

    // getters ...
}

3) SSE 엔드포인트:

@RestController
@RequestMapping("/api/stocks")
@RequiredArgsConstructor
public class StockStreamController {

    private final ReactiveKafkaConsumerTemplate<String, StockPrice> priceConsumer;
    private final ReactiveKafkaConsumerTemplate<String, PriceAverage> analyticsConsumer;

    // 실시간 시세 스트림
    @GetMapping(value = "/live", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<StockPrice>> liveStream(
            @RequestParam List<String> symbols) {
        return priceConsumer.receiveAutoAck()
            .map(ConsumerRecord::value)
            .filter(p -> symbols.contains(p.getSymbol()))
            .map(p -> ServerSentEvent.<StockPrice>builder()
                .event("price")
                .data(p)
                .build());
    }

    // 분석 데이터 스트림
    @GetMapping(value = "/analytics", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<PriceAverage>> analyticsStream() {
        return analyticsConsumer.receiveAutoAck()
            .map(ConsumerRecord::value)
            .map(avg -> ServerSentEvent.<PriceAverage>builder()
                .event("analytics")
                .data(avg)
                .build());
    }
}

4) React Frontend (TypeScript):

// hooks/useStockStream.ts
import { useState, useEffect, useCallback } from 'react';

interface StockPrice {
  symbol: string;
  price: number;
  volume: number;
  change: number;
  timestamp: number;
}

export function useStockStream(symbols: string[]) {
  const [prices, setPrices] = useState<Map<string, StockPrice>>(new Map());

  useEffect(() => {
    const params = symbols.map(s => `symbols=${s}`).join('&');
    const eventSource = new EventSource(`/api/stocks/live?${params}`);

    eventSource.addEventListener('price', (event: MessageEvent) => {
      const price: StockPrice = JSON.parse(event.data);
      setPrices(prev => new Map(prev).set(price.symbol, price));
    });

    eventSource.onerror = () => {
      console.error('SSE connection error, will auto-reconnect...');
    };

    return () => eventSource.close();
  }, [symbols.join(',')]);

  return prices;
}

// components/StockDashboard.tsx
function StockDashboard() {
  const prices = useStockStream(['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']);

  return (
    <div className="grid grid-cols-5 gap-4">
      {Array.from(prices.entries()).map(([symbol, price]) => (
        <div key={symbol} className="p-4 rounded-lg bg-gray-800">
          <h3 className="text-lg font-bold">{symbol}</h3>
          <p className="text-2xl">${price.price.toFixed(2)}</p>
          <p className={price.change >= 0 ? 'text-green-400' : 'text-red-400'}>
            {price.change >= 0 ? '+' : ''}{price.change.toFixed(2)}%
          </p>
        </div>
      ))}
    </div>
  );
}

확장성: 파티션 기반 수평 확장

확장  (1 인스턴스):
  WebFlux Instance-1Kafka Consumer (P0, P1, P2, P3, P4, P5)

확장  (3 인스턴스):
  WebFlux Instance-1Kafka Consumer (P0, P1)SSE Client 1~1000
  WebFlux Instance-2Kafka Consumer (P2, P3)SSE Client 1001~2000
  WebFlux Instance-3Kafka Consumer (P4, P5)SSE Client 2001~3000

핵심: Kafka 파티션 수 = 최대 병렬 Consumer     파티션 (symbol)로 같은 종목은 같은 파티션에 순서 보장

모니터링 설정

# docker-compose-monitoring.yml
services:
  prometheus:
    image: prom/prometheus:v2.51.0
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - '9090:9090'

  grafana:
    image: grafana/grafana:10.4.0
    ports:
      - '3000:3000'
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin

  kafka-exporter:
    image: danielqsj/kafka-exporter:latest
    command: ['--kafka.server=kafka:9092']
    ports:
      - '9308:9308'
// Micrometer 메트릭 추가
@Component
@RequiredArgsConstructor
public class StockMetrics {

    private final MeterRegistry registry;
    private final AtomicLong connectedClients = new AtomicLong(0);

    @PostConstruct
    void init() {
        Gauge.builder("stock.stream.connected_clients", connectedClients, AtomicLong::get)
            .description("Number of connected SSE clients")
            .register(registry);
    }

    public void recordPricePublished(String symbol) {
        registry.counter("stock.price.published",
            "symbol", symbol).increment();
    }

    public void recordLatency(String symbol, long latencyMs) {
        registry.timer("stock.price.latency",
            "symbol", symbol)
            .record(Duration.ofMillis(latencyMs));
    }

    public void clientConnected() { connectedClients.incrementAndGet(); }
    public void clientDisconnected() { connectedClients.decrementAndGet(); }
}

8. 언제 무엇을 선택할 것인가?

기술 선택 의사결정 매트릭스

시나리오추천 기술이유
간단한 CRUD APISpring MVC학습 곡선 낮고 생태계 풍부
높은 동시접속 APIWebFlux적은 리소스로 만 단위 동시접속
이벤트 스트리밍/로그Kafka높은 처리량, 내구성, 리플레이 가능
실시간 알림/피드SSE단방향, 자동 재연결, HTTP 호환
실시간 채팅/게임WebSocket양방향, 낮은 지연시간
MSA 간 실시간 통신gRPC Streaming양방향, Protobuf, 높은 성능
CDC(변경 감지)Kafka Connect + DebeziumDB 변경을 실시간 캡처
복잡한 스트림 처리Kafka StreamsStateful 처리, 윈도우 집계
CQRS 패턴Kafka + WebFlux이벤트 소싱 + 리액티브 쿼리

조합 패턴

실전에서는 단일 기술이 아니라 여러 기술을 조합합니다.

패턴 1: 실시간 대시보드
  Data SourceKafkaWebFlux ConsumerSSEBrowser

패턴 2: 실시간 채팅 + 메시지 저장
  BrowserWebSocketWebFluxKafka (저장)Elasticsearch (검색)

패턴 3: MSA 이벤트 드리븐
  Service AKafkaService B (WebFlux Consumer)
Service C (WebFlux Consumer)
Analytics (Kafka Streams)

패턴 4: IoT 데이터 처리
  SensorsMQTTKafka ConnectKafkaKafka Streams (집계)
WebFluxSSEDashboard

안티패턴 (피해야 할 것들)

  1. WebFlux에서 JDBC 사용: 이벤트 루프를 블로킹하여 성능 급격 저하. R2DBC 또는 Schedulers.boundedElastic() 사용
  2. 모든 API를 WebFlux로: 간단한 CRUD는 MVC가 더 적합. 불필요한 복잡성 증가
  3. Kafka를 단순 메시지 큐로: RabbitMQ/SQS가 더 적합한 경우도 있음. Kafka는 이벤트 로그에 최적화
  4. 파티션 수 과다 설정: 파티션이 많으면 리밸런싱 비용 증가. 필요에 맞게 설정
  5. SSE에서 대용량 데이터: SSE는 텍스트 기반이므로 바이너리 데이터에 부적합. gRPC 사용

9. 학습 로드맵 (6개월)

1개월 차: 리액티브 프로그래밍 기초

  • Week 1-2: Project Reactor 핵심 (Mono, Flux, 오퍼레이터)
    • 공식 문서의 "Learn" 섹션 완독
    • Lite Rx Hands-On 실습 프로젝트
  • Week 3-4: Spring WebFlux 기본
    • 간단한 REST API를 MVC에서 WebFlux로 마이그레이션
    • R2DBC 연동 실습

2개월 차: WebFlux 심화

  • Week 1-2: WebClient, WebSocket, SSE 구현
    • 실시간 알림 시스템 미니 프로젝트
  • Week 3-4: 테스트 (StepVerifier, WebTestClient)
    • BlockHound 적용
    • 성능 테스트 (Gatling/k6)

3개월 차: Apache Kafka 기초

  • Week 1-2: Kafka 아키텍처, Producer/Consumer
    • Docker Compose로 로컬 Kafka 클러스터 구축
    • 기본 메시지 발행/소비 실습
  • Week 3-4: Consumer Group, Offset 관리, EOS
    • 멀티 컨슈머 그룹 시나리오 실습
    • Transactional Producer 구현

4개월 차: Kafka 심화

  • Week 1-2: Kafka Streams, KTable/KStream
    • 윈도우 집계, Join 실습
  • Week 3-4: Kafka Connect, Schema Registry
    • Debezium CDC 연동
    • Avro 스키마 관리

5개월 차: 통합 프로젝트

  • Week 1-2: WebFlux + Kafka 통합
    • Reactor Kafka 활용
    • CQRS + Event Sourcing 패턴
  • Week 3-4: 실시간 대시보드 프로젝트
    • Kafka에서 SSE까지 전체 파이프라인
    • React 프론트엔드 연동

6개월 차: 운영과 최적화

  • Week 1-2: KRaft 모드 마이그레이션 실습
    • 모니터링 (Prometheus + Grafana)
    • 알림 설정 (Consumer Lag, Error Rate)
  • Week 3-4: 성능 튜닝과 장애 대응
    • 부하 테스트 및 병목 분석
    • 장애 시나리오 시뮬레이션 (브로커 다운, 네트워크 파티션)

실전 퀴즈

아래 퀴즈를 통해 학습 내용을 점검해 보세요.

Q1: WebFlux에서 Mono.just()과 Mono.defer()의 차이는 무엇인가요?

Mono.just(value)는 값을 즉시 캡처하여 Mono로 감쌉니다. 구독 시점과 관계없이 동일한 값을 반환합니다.

Mono.defer(() -> Mono.just(value))는 구독 시점에 람다를 평가합니다. 따라서 매 구독마다 새로운 값을 생성할 수 있습니다.

// just: 생성 시점에 값이 결정됨
Mono<Long> eager = Mono.just(System.currentTimeMillis());
// 1초 후 구독해도 동일한 타임스탬프

// defer: 구독 시점에 값이 결정됨
Mono<Long> lazy = Mono.defer(() -> Mono.just(System.currentTimeMillis()));
// 구독할 때마다 새로운 타임스탬프

실무에서 switchIfEmpty와 함께 사용할 때 중요합니다. switchIfEmpty(Mono.just(createDefault()))는 createDefault()가 즉시 실행되지만, switchIfEmpty(Mono.defer(() -> createDefault()))는 실제로 빈 경우에만 실행됩니다.

Q2: Kafka에서 acks=all과 min.insync.replicas=2를 함께 설정하면 어떤 보장을 제공하나요?

acks=all은 Leader가 모든 ISR(In-Sync Replicas)의 확인을 받은 후에 Producer에게 성공을 반환합니다.

min.insync.replicas=2는 ISR의 최소 크기를 2로 설정합니다. ISR이 2 미만이면 Producer는 NotEnoughReplicasException을 받습니다.

이 조합은 다음을 보장합니다:

  • 메시지가 최소 2개의 브로커에 기록된 후에만 성공 응답
  • Replication Factor가 3일 때, 1대의 브로커가 장애나도 데이터 유실 없음
  • 2대 이상 동시 장애 시 쓰기 실패 (데이터 유실 방지를 위해 의도적 거부)

프로덕션에서 가장 많이 사용하는 안정성 설정입니다.

Q3: SSE 연결에서 클라이언트가 네트워크 장애로 끊어졌다가 재연결할 때, 어떻게 유실된 이벤트를 복구할 수 있나요?

SSE는 Last-Event-ID 헤더를 통해 재연결 시 마지막으로 수신한 이벤트 ID를 서버에 전달합니다. 서버는 이를 기반으로 유실된 이벤트를 재전송할 수 있습니다.

구현 방법:

  1. 서버에서 각 이벤트에 고유 ID를 부여합니다 (예: Kafka offset 또는 타임스탬프).
  2. 클라이언트 재연결 시 Last-Event-ID 헤더를 확인합니다.
  3. 해당 ID 이후의 이벤트를 Kafka에서 다시 소비하여 전송합니다.
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<EventData>> stream(
        @RequestHeader(value = "Last-Event-ID", required = false) String lastEventId) {
    long startOffset = lastEventId != null ? Long.parseLong(lastEventId) + 1 : -1;
    // startOffset부터 Kafka 소비 시작
    return eventService.getEventsFrom(startOffset)
        .map(e -> ServerSentEvent.<EventData>builder()
            .id(String.valueOf(e.getOffset()))
            .data(e)
            .build());
}

Kafka의 메시지 보존(retention) 덕분에 일정 기간 내의 이벤트는 언제든 다시 읽을 수 있습니다.

Q4: KRaft 모드에서 Controller Quorum의 Active Controller가 장애나면 어떻게 되나요?

KRaft 모드에서는 Raft 합의 프로토콜을 사용하여 Controller 선출을 자동으로 처리합니다.

  1. Active Controller가 하트비트를 보내지 않으면, Follower Controller들이 이를 감지합니다.
  2. Election Timeout이 만료되면 Follower 중 하나가 후보(Candidate)로 전환됩니다.
  3. 과반수(Quorum)의 투표를 받은 후보가 새로운 Active Controller가 됩니다.
  4. 새 Controller는 이미 Raft 로그를 통해 메타데이터를 복제하고 있으므로, ZooKeeper 모드처럼 전체 메타데이터를 다시 로드할 필요가 없습니다.

결과적으로 페일오버 시간이 ZooKeeper 모드의 수 분에서 수 초로 단축됩니다. Controller Quorum이 3개(1 Active + 2 Follower)일 때, 1대 장애까지 허용 가능합니다.

Q5: WebFlux에서 flatMap과 concatMap의 차이를 설명하고, 각각 언제 사용해야 하나요?

flatMap: 내부 Publisher를 구독할 때 동시성(concurrency)을 허용합니다. 여러 내부 스트림이 인터리빙(interleaving)되어 순서가 보장되지 않습니다.

concatMap: 내부 Publisher를 순차적으로 구독합니다. 이전 스트림이 완료된 후에 다음 스트림을 구독하므로 순서가 보장됩니다.

// flatMap: 순서 무관, 최대 성능
// 사용 예: 여러 사용자의 프로필을 병렬로 조회
Flux.fromIterable(userIds)
    .flatMap(id -> userService.findById(id))  // 동시 실행
    // 결과 순서: [User3, User1, User2] (비결정적)

// concatMap: 순서 보장, 순차 실행
// 사용 예: 이벤트를 순서대로 처리해야 할 때
Flux.fromIterable(events)
    .concatMap(event -> processEvent(event))  // 하나씩 순차 실행
    // 결과 순서: [Event1, Event2, Event3] (항상 동일)

선택 기준:

  • 순서가 중요하지 않고 성능이 중요하면 flatMap
  • 순서가 반드시 보장되어야 하면 concatMap
  • flatMap에 concurrency 파라미터를 설정하여 동시 실행 수를 제한할 수도 있습니다: flatMap(fn, 10) (최대 10개 동시)

참고 자료

공식 문서

아키텍처 및 패턴

Kafka 생태계

모니터링 및 운영

학습 자료

WebFlux + Kafka + ZooKeeper Mastery: The Complete Guide to Reactive Streaming Architecture

Introduction: Why Reactive Streaming Architecture Matters in 2026

Modern applications are expected to process millions of events per second, deliver real-time updates to users, and scale elastically under unpredictable load. Traditional request-response architectures with blocking I/O simply cannot meet these demands. In 2026, the combination of Spring WebFlux for non-blocking request handling, Apache Kafka for durable event streaming, and the transition from ZooKeeper to KRaft for simplified cluster management represents the gold standard for reactive streaming systems.

This guide covers the full stack: from reactive programming fundamentals to production-grade architecture patterns. Whether you are building a real-time analytics dashboard, a financial trading platform, or an IoT data pipeline, you will find actionable knowledge here.

What You Will Learn

  • Reactive programming principles and the Reactive Streams specification
  • Spring WebFlux internals: Netty event loop, Mono/Flux, operator chains, and R2DBC
  • Apache Kafka architecture: partitions, consumer groups, Exactly-Once Semantics, Kafka Streams, and Connect
  • The ZooKeeper to KRaft migration: why, when, and how
  • Streaming API comparison: SSE vs WebSocket vs gRPC Streaming
  • Integration patterns: WebFlux + Reactor Kafka + CQRS + Event Sourcing
  • A complete real-world project: real-time stock ticker system

1. Reactive Programming Fundamentals

1.1 The Reactive Manifesto

The Reactive Manifesto defines four key properties that reactive systems must exhibit:

PropertyDescriptionImplementation Example
ResponsiveSystem responds in a timely mannerWebFlux non-blocking I/O
ResilientSystem stays responsive in the face of failureCircuit breaker, bulkhead patterns
ElasticSystem stays responsive under varying workloadKafka partition scaling
Message DrivenSystem relies on asynchronous message passingKafka topics, Reactor event streams

These four properties are not independent. Message-driven architecture enables elasticity and resilience, which together enable responsiveness.

1.2 Reactive Streams Specification

Reactive Streams is a standard for asynchronous stream processing with non-blocking backpressure. It defines four core interfaces:

public interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}

public interface Subscriber<T> {
    void onSubscribe(Subscription subscription);
    void onNext(T item);
    void onError(Throwable throwable);
    void onComplete();
}

public interface Subscription {
    void request(long n);
    void cancel();
}

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

The critical innovation is the request(long n) method in Subscription. This enables backpressure: the subscriber controls the rate of data flow, preventing the producer from overwhelming the consumer.

1.3 Understanding Backpressure

Backpressure is the mechanism by which a data consumer signals to a producer how much data it can handle. Without backpressure, fast producers overwhelm slow consumers, leading to memory exhaustion or dropped messages.

Producer (1000 msg/s) --> Buffer (overflows!) --> Consumer (100 msg/s)

With backpressure:
Producer (adjusts to 100 msg/s) <-- request(100) -- Consumer (100 msg/s)

Backpressure strategies in Project Reactor:

Flux.range(1, Integer.MAX_VALUE)
    .onBackpressureBuffer(256)          // Buffer up to 256 items
    .onBackpressureDrop(dropped ->      // Drop when buffer full
        log.warn("Dropped: {}", dropped))
    .onBackpressureLatest()             // Keep only latest item
    .onBackpressureError()              // Signal error when overwhelmed
    .subscribe(item -> processSlowly(item));

1.4 Reactive Libraries Landscape

LibraryLanguage/PlatformKey FeaturesAdoption
Project ReactorJavaMono/Flux, Spring integrationSpring ecosystem
RxJava 3JavaObservable/Flowable, Android supportAndroid, legacy Java
MutinyJavaUni/Multi, Quarkus-nativeQuarkus ecosystem
Kotlin FlowKotlinCoroutine-based, structured concurrencyKotlin-first projects
Akka StreamsScala/JavaActor-based, graph DSLHigh-throughput systems

2. Spring WebFlux Deep Dive

2.1 WebFlux vs Spring MVC Architecture

Spring MVC uses a thread-per-request model. Each incoming request occupies one thread from the pool until the response is fully written. Under high concurrency with I/O-bound workloads, threads are mostly idle waiting for database queries, HTTP calls, or file reads.

Spring WebFlux uses an event-loop model powered by Netty. A small number of event loop threads handle all requests. When I/O is needed, the request is parked (not blocking a thread), and the event loop moves on to handle other requests. When the I/O completes, the event loop picks up the response.

Spring MVC (Thread-per-request):
  Thread-1: [Request] --> [DB Wait......] --> [Response]
  Thread-2: [Request] --> [API Wait.........] --> [Response]
  Thread-3: [Request] --> [File Wait....] --> [Response]
  (200 threads = 200 concurrent requests)

Spring WebFlux (Event Loop):
  EventLoop-1: [Req1][Req2][Req3][Req1-resume][Req4][Req2-resume]...
  EventLoop-2: [Req5][Req6][Req5-resume][Req7][Req8]...
  (4 threads = thousands of concurrent requests)

2.2 Netty Event Loop Model

Netty is the default embedded server for WebFlux. Understanding its architecture is essential for performance tuning.

                    ┌─────────────────────────┐
Boss EventLoopGroup                       (accepts connections)                    └────────────┬────────────┘
                    ┌────────────▼────────────┐
Worker EventLoopGroup                       (handles I/O events)                    ├─────────────────────────┤
EventLoop-1  Channel-AEventLoop-2  Channel-BEventLoop-3  Channel-CEventLoop-4  Channel-D                    └─────────────────────────┘

Key rules for working with Netty event loops:

  1. Never block an event loop thread — no Thread.sleep(), no blocking I/O, no synchronized locks
  2. Offload CPU-intensive work to bounded elastic schedulers
  3. Channel affinity — each channel is bound to one event loop for its lifetime

2.3 Mono and Flux: The Core Types

Mono represents 0 or 1 element. Flux represents 0 to N elements.

// Mono: single value or empty
Mono<User> findUser = userRepository.findById(userId);
Mono<Void> saveResult = userRepository.save(user);

// Flux: stream of values
Flux<Order> orders = orderRepository.findByUserId(userId);
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
Flux<ServerSentEvent<String>> sseStream = Flux.create(sink -> {
    // push events to sink
});

Creating Mono and Flux:

// Static factory methods
Mono.just("hello");
Mono.empty();
Mono.error(new RuntimeException("failure"));
Mono.fromCallable(() -> expensiveComputation());
Mono.defer(() -> Mono.just(dynamicValue()));

Flux.just("a", "b", "c");
Flux.fromIterable(myList);
Flux.range(1, 100);
Flux.interval(Duration.ofMillis(500));
Flux.merge(flux1, flux2);
Flux.concat(flux1, flux2);
Flux.zip(flux1, flux2, (a, b) -> a + b);

2.4 Essential Operators

// Transformation
flux.map(String::toUpperCase)
    .flatMap(name -> userService.findByName(name))   // async 1:N
    .concatMap(user -> enrichUser(user))              // sequential async
    .switchMap(query -> search(query))                // cancel previous
    .flatMapSequential(id -> fetchById(id))           // ordered parallel

// Filtering
flux.filter(user -> user.isActive())
    .distinct()
    .take(10)
    .skip(5)
    .takeUntil(event -> event.isTerminal())

// Error Handling
flux.onErrorReturn("default")
    .onErrorResume(ex -> fallbackFlux())
    .retry(3)
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
        .maxBackoff(Duration.ofSeconds(30))
        .filter(ex -> ex instanceof TransientException))
    .timeout(Duration.ofSeconds(5))

// Combining
Flux.merge(fastSource, slowSource)          // interleaved
Flux.concat(firstBatch, secondBatch)        // sequential
Flux.zip(names, ages, Person::new)          // paired
Mono.zip(userMono, profileMono, ordersMono) // parallel join
    .map(tuple -> buildResponse(tuple.getT1(), tuple.getT2(), tuple.getT3()))

// Side Effects (for logging/debugging only)
flux.doOnNext(item -> log.debug("Processing: {}", item))
    .doOnError(ex -> log.error("Error: {}", ex.getMessage()))
    .doOnComplete(() -> log.info("Stream completed"))
    .doOnSubscribe(sub -> log.info("Subscribed"))

2.5 WebFlux Controller and Functional Endpoints

Annotation-based (familiar MVC style):

@RestController
@RequestMapping("/api/users")
public class UserController {

    private final UserService userService;

    @GetMapping("/{id}")
    public Mono<ResponseEntity<User>> getUser(@PathVariable String id) {
        return userService.findById(id)
            .map(ResponseEntity::ok)
            .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> streamUsers() {
        return userService.streamAllUsers();
    }

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<User> createUser(@Valid @RequestBody Mono<User> userMono) {
        return userMono.flatMap(userService::create);
    }
}

Functional endpoints (router + handler):

@Configuration
public class UserRouter {

    @Bean
    public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
        return RouterFunctions.route()
            .path("/api/users", builder -> builder
                .GET("/{id}", handler::getUser)
                .GET("", handler::listUsers)
                .POST("", handler::createUser)
                .PUT("/{id}", handler::updateUser)
                .DELETE("/{id}", handler::deleteUser))
            .build();
    }
}

@Component
public class UserHandler {

    private final UserService userService;

    public Mono<ServerResponse> getUser(ServerRequest request) {
        String id = request.pathVariable("id");
        return userService.findById(id)
            .flatMap(user -> ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(user))
            .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> listUsers(ServerRequest request) {
        return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(userService.findAll(), User.class);
    }
}

2.6 R2DBC: Reactive Database Access

R2DBC (Reactive Relational Database Connectivity) brings non-blocking database access to WebFlux applications.

@Configuration
@EnableR2dbcRepositories
public class DatabaseConfig extends AbstractR2dbcConfiguration {

    @Override
    @Bean
    public ConnectionFactory connectionFactory() {
        return ConnectionFactories.get(ConnectionFactoryOptions.builder()
            .option(DRIVER, "postgresql")
            .option(HOST, "localhost")
            .option(PORT, 5432)
            .option(DATABASE, "mydb")
            .option(USER, "user")
            .option(PASSWORD, "password")
            .option(MAX_SIZE, 20)
            .build());
    }
}

public interface UserRepository extends ReactiveCrudRepository<User, String> {

    Flux<User> findByStatusOrderByCreatedAtDesc(String status);

    @Query("SELECT * FROM users WHERE email = :email")
    Mono<User> findByEmail(String email);

    @Query("SELECT * FROM users WHERE department = :dept")
    Flux<User> findByDepartment(String dept);
}

R2DBC with DatabaseClient for complex queries:

@Repository
public class CustomUserRepository {

    private final DatabaseClient databaseClient;

    public Flux<User> searchUsers(String keyword, int limit) {
        return databaseClient.sql(
                "SELECT * FROM users WHERE name ILIKE :keyword LIMIT :limit")
            .bind("keyword", "%" + keyword + "%")
            .bind("limit", limit)
            .map(row -> new User(
                row.get("id", String.class),
                row.get("name", String.class),
                row.get("email", String.class)
            ))
            .all();
    }
}

2.7 WebFlux Performance Benchmarks

Benchmark conditions: 4-core CPU, 8GB RAM, 10,000 concurrent connections, I/O-bound workload (50ms database latency per request).

MetricSpring MVC (Tomcat)Spring WebFlux (Netty)Improvement
Throughput (req/s)2,40018,5007.7x
Avg Latency (ms)4,20054087% lower
P99 Latency (ms)12,0001,20090% lower
Memory Usage (MB)2,10048077% lower
Threads Active200896% fewer
Max Concurrent Requests20050,000+250x

When NOT to use WebFlux:

  • CPU-bound workloads (compression, encryption, heavy computation)
  • Applications already performing well with MVC
  • Teams without reactive programming experience
  • When blocking libraries are unavoidable (JDBC without R2DBC wrapper)
  • Simple CRUD applications with low concurrency

3. Apache Kafka: The Complete Guide

3.1 Kafka Architecture Overview

Apache Kafka is a distributed event streaming platform capable of handling trillions of events per day. Its architecture is built around a few key concepts:

┌─────────────────────────────────────────────────────────────┐
Kafka Cluster│                                                             │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                  │
│  │ Broker 1 │  │ Broker 2 │  │ Broker 3 │                  │
│  │          │  │          │  │          │                  │
│  │ Topic-A  │  │ Topic-A  │  │ Topic-A  │                  │
│  │ Part-0   │  │ Part-1   │  │ Part-2   │                  │
 (leader) (leader) (leader) │                  │
│  │          │  │          │  │          │                  │
│  │ Topic-A  │  │ Topic-A  │  │ Topic-A  │                  │
│  │ Part-1   │  │ Part-2   │  │ Part-0   │                  │
 (replica) (replica) (replica)│                  │
│  └──────────┘  └──────────┘  └──────────┘                  │
│                                                             │
│  ┌──────────────────────────────────────────────────────┐   │
│  │  Metadata: ZooKeeper (legacy) or KRaft (Kafka 3.5+) │   │
│  └──────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘

Core Concepts:

ConceptDescription
TopicNamed feed of messages, similar to a database table
PartitionOrdered, immutable sequence of records within a topic
OffsetSequential ID for each record within a partition
BrokerA Kafka server that stores data and serves clients
ProducerClient that publishes records to topics
ConsumerClient that reads records from topics
Consumer GroupSet of consumers that cooperatively consume from topics
ReplicationEach partition is replicated across multiple brokers for durability
ISRIn-Sync Replicas: replicas that are caught up with the leader

3.2 Producer Configuration and Best Practices

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Reliability settings
        config.put(ProducerConfig.ACKS_CONFIG, "all");           // Wait for all ISR replicas
        config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Performance tuning
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);     // 32KB batch
        config.put(ProducerConfig.LINGER_MS_CONFIG, 20);         // Wait 20ms for batching
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
        config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB buffer

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Sending messages with different patterns:

@Service
@RequiredArgsConstructor
public class OrderEventProducer {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    // Fire-and-forget (highest throughput)
    public void sendFireAndForget(OrderEvent event) {
        kafkaTemplate.send("orders", event.getOrderId(), event);
    }

    // Synchronous send (highest reliability)
    public void sendSync(OrderEvent event) throws Exception {
        kafkaTemplate.send("orders", event.getOrderId(), event)
            .get(10, TimeUnit.SECONDS);
    }

    // Asynchronous send with callback (balanced)
    public CompletableFuture<SendResult<String, Object>> sendAsync(OrderEvent event) {
        return kafkaTemplate.send("orders", event.getOrderId(), event)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Failed to send order event: {}", event.getOrderId(), ex);
                } else {
                    log.info("Order event sent to partition {} offset {}",
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
                }
            });
    }

    // Send with headers
    public void sendWithHeaders(OrderEvent event) {
        ProducerRecord<String, Object> record = new ProducerRecord<>("orders",
            null, null, event.getOrderId(), event);
        record.headers()
            .add("event-type", "ORDER_CREATED".getBytes())
            .add("source", "order-service".getBytes())
            .add("correlation-id", UUID.randomUUID().toString().getBytes());
        kafkaTemplate.send(record);
    }
}

3.3 Consumer Configuration and Patterns

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        // Offset management
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Manual commit

        // Performance tuning
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
        config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1MB

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setCommonErrorHandler(new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate()),
            new FixedBackOff(1000L, 3)
        ));
        return factory;
    }
}

Consumer listener patterns:

@Component
@RequiredArgsConstructor
public class OrderEventConsumer {

    private final OrderService orderService;

    // Single record processing
    @KafkaListener(topics = "orders", groupId = "order-processing-group")
    public void processOrder(ConsumerRecord<String, OrderEvent> record,
                             Acknowledgment ack) {
        try {
            log.info("Processing order: {} from partition: {} offset: {}",
                record.value().getOrderId(),
                record.partition(),
                record.offset());
            orderService.process(record.value());
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Failed to process order: {}", record.value().getOrderId(), e);
            // Don't acknowledge — will be retried by error handler
        }
    }

    // Batch processing
    @KafkaListener(topics = "analytics-events", groupId = "analytics-batch-group",
                   containerFactory = "batchKafkaListenerContainerFactory")
    public void processBatch(List<ConsumerRecord<String, AnalyticsEvent>> records,
                             Acknowledgment ack) {
        log.info("Processing batch of {} records", records.size());
        try {
            List<AnalyticsEvent> events = records.stream()
                .map(ConsumerRecord::value)
                .collect(Collectors.toList());
            analyticsService.bulkInsert(events);
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Batch processing failed", e);
        }
    }

    // Filtering with headers
    @KafkaListener(topics = "orders",
                   filter = "orderCreatedFilter",
                   groupId = "notification-group")
    public void handleOrderCreated(OrderEvent event) {
        notificationService.sendOrderConfirmation(event);
    }
}

3.4 Exactly-Once Semantics (EOS)

Kafka supports exactly-once semantics through the combination of idempotent producers and transactional APIs.

@Configuration
public class KafkaTransactionalConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-");
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTransactionManager<String, Object> kafkaTransactionManager() {
        return new KafkaTransactionManager<>(producerFactory());
    }
}

@Service
@RequiredArgsConstructor
public class TransactionalOrderService {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Transactional("kafkaTransactionManager")
    public void processOrderTransactionally(OrderEvent order) {
        // All messages in this method are sent atomically
        kafkaTemplate.send("orders-processed", order.getOrderId(), order);
        kafkaTemplate.send("inventory-updates", order.getProductId(),
            new InventoryEvent(order.getProductId(), -order.getQuantity()));
        kafkaTemplate.send("payment-requests", order.getOrderId(),
            new PaymentEvent(order.getOrderId(), order.getTotalAmount()));
        // If any send fails, all are rolled back
    }
}

Three levels of delivery guarantees:

GuaranteeConfigurationUse Case
At-most-onceacks=0, auto-commitMetrics, logging
At-least-onceacks=all, manual commit, idempotent consumerMost applications
Exactly-onceTransactional API + read-committed isolationFinancial, billing, inventory

3.5 Kafka Streams

Kafka Streams is a client library for building applications and microservices where the input and output data are stored in Kafka.

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> config = new HashMap<>();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        config.put(StreamsConfig.STATE_DIR_CONFIG, "/var/kafka-streams");
        return new KafkaStreamsConfiguration(config);
    }
}

@Component
public class OrderAnalyticsTopology {

    @Autowired
    public void buildPipeline(StreamsBuilder streamsBuilder) {
        // Real-time order analytics pipeline
        KStream<String, OrderEvent> orders = streamsBuilder
            .stream("orders", Consumed.with(Serdes.String(), orderEventSerde()));

        // 1. Order count per product in tumbling windows
        KTable<Windowed<String>, Long> productCounts = orders
            .groupBy((key, order) -> order.getProductId(),
                Grouped.with(Serdes.String(), orderEventSerde()))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .count(Materialized.as("product-counts-store"));

        // 2. Revenue aggregation per category
        KTable<String, Double> revenueByCategory = orders
            .groupBy((key, order) -> order.getCategory(),
                Grouped.with(Serdes.String(), orderEventSerde()))
            .aggregate(
                () -> 0.0,
                (key, order, total) -> total + order.getTotalAmount(),
                Materialized.with(Serdes.String(), Serdes.Double())
            );

        // 3. Join with customer data for enrichment
        KTable<String, CustomerProfile> customers = streamsBuilder
            .table("customer-profiles",
                Consumed.with(Serdes.String(), customerProfileSerde()));

        KStream<String, EnrichedOrder> enrichedOrders = orders
            .selectKey((key, order) -> order.getCustomerId())
            .join(customers,
                (order, customer) -> new EnrichedOrder(order, customer),
                Joined.with(Serdes.String(), orderEventSerde(), customerProfileSerde()));

        enrichedOrders.to("enriched-orders",
            Produced.with(Serdes.String(), enrichedOrderSerde()));

        // 4. Fraud detection with session windows
        orders
            .filter((key, order) -> order.getTotalAmount() > 1000)
            .groupByKey()
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(10)))
            .count()
            .toStream()
            .filter((windowedKey, count) -> count >= 3)
            .mapValues((key, count) -> "ALERT: " + count + " high-value orders in session")
            .to("fraud-alerts");
    }
}

3.6 Kafka Connect

Kafka Connect is a framework for connecting Kafka with external systems like databases, key-value stores, search indexes, and file systems.

{
  "name": "postgresql-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "password",
    "database.dbname": "orders_db",
    "database.server.name": "order-server",
    "table.include.list": "public.orders,public.order_items",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "dbz_publication",
    "topic.prefix": "cdc",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "cdc-$3"
  }
}

Elasticsearch sink connector:

{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "enriched-orders",
    "type.name": "_doc",
    "key.ignore": false,
    "schema.ignore": true,
    "behavior.on.null.values": "delete",
    "write.method": "upsert",
    "batch.size": 200,
    "max.buffered.records": 5000,
    "flush.timeout.ms": 10000
  }
}

3.7 Schema Registry

Schema Registry provides a centralized repository for message schemas, enabling schema evolution and compatibility checking.

// Producer with Avro schema
@Configuration
public class AvroProducerConfig {

    @Bean
    public ProducerFactory<String, GenericRecord> avroProducerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        config.put("schema.registry.url", "http://schema-registry:8081");
        config.put("auto.register.schemas", true);
        return new DefaultKafkaProducerFactory<>(config);
    }
}

Schema evolution compatibility modes:

ModeAllowed ChangesBest For
BACKWARDDelete fields, add optional fieldsConsumer-first upgrades
FORWARDAdd fields, delete optional fieldsProducer-first upgrades
FULLAdd/delete optional fields onlyIndependent upgrades
NONEAny change allowedDevelopment only

3.8 Kafka Performance Tuning Cheat Sheet

ParameterDefaultRecommendedImpact
num.partitions16-12 per topicParallelism
replication.factor13Durability
min.insync.replicas12Write durability
batch.size (producer)1638432768-65536Throughput vs latency
linger.ms (producer)05-100Batching efficiency
compression.typenonezstd or lz4Network/disk savings
fetch.min.bytes (consumer)11024-16384Throughput vs latency
max.poll.records (consumer)500100-1000Processing batch size
segment.bytes1073741824536870912Log compaction frequency
retention.ms604800000Based on use caseStorage vs replay capability

4. ZooKeeper to KRaft: The Great Migration

4.1 ZooKeeper's Role in Kafka

ZooKeeper has been Kafka's metadata management system since the beginning. It handles:

  • Broker registration: tracking which brokers are alive
  • Topic metadata: partition assignments, ISR lists, configurations
  • Controller election: choosing which broker acts as the cluster controller
  • Consumer group coordination (legacy): storing consumer offsets (moved to Kafka itself in 0.9+)
  • ACLs: access control lists for authorization

4.2 Why ZooKeeper Had to Go

ProblemDescription
Operational complexityTwo distinct distributed systems to deploy, monitor, and maintain
Scaling bottleneckZooKeeper stores all metadata in memory; limits cluster to about 200K partitions
Split-brain risksNetwork partitions can cause ZooKeeper and Kafka to disagree
Recovery timeController failover requires reading all metadata from ZooKeeper (minutes)
Expertise requirementTeams need deep knowledge of both Kafka and ZooKeeper

4.3 KRaft: Kafka's Built-in Consensus

KRaft (Kafka Raft) replaces ZooKeeper with a built-in Raft-based consensus protocol. Metadata is stored in an internal Kafka topic (__cluster_metadata), and a set of controller nodes manage cluster state.

ZooKeeper Mode:                        KRaft Mode:
┌────────┐    ┌────────────┐           ┌──────────────────────┐
Broker │───▶│ ZooKeeper  │           │ Controller (Raft)Broker │───▶│ Ensemble   │           │  Node 1 (leader)Broker │───▶│ (3-5 nodes)│           │  Node 2 (follower)└────────┘    └────────────┘           │  Node 3 (follower)                                       └──────────┬───────────┘
                                       ┌──────────▼───────────┐
BrokersBroker 1Broker 2Broker 3                                       └──────────────────────┘

KRaft advantages:

FeatureZooKeeper ModeKRaft Mode
Max partitions~200,000Millions
Controller failoverMinutesSeconds
Metadata propagationAsynchronous (laggy)Event-driven (immediate)
DeploymentTwo systemsOne system
Operational overheadHighLow
Shutdown timeMinutesSeconds

4.4 KIP-500: The Roadmap

KIP-500 ("Replace ZooKeeper with a Self-Managed Metadata Quorum") defined the multi-year migration plan:

Kafka VersionMilestoneYear
2.8Early access KRaft (development only)2021
3.3KRaft marked production-ready2022
3.5ZooKeeper migration tool available2023
3.7Bridge release (dual support)2024
4.0ZooKeeper support removed entirely2025

4.5 Migration Guide: ZooKeeper to KRaft

Step 1: Pre-migration checklist

# Verify Kafka version (must be 3.5+)
kafka-broker-api-versions.sh --bootstrap-server localhost:9092 | head -1

# Check current ZooKeeper metadata
kafka-metadata.sh --snapshot /var/kafka-logs/__cluster_metadata-0/00000000000000000000.log \
  --cluster-id $(kafka-storage.sh random-uuid)

# Verify no deprecated features in use
kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type brokers

Step 2: Configure KRaft controllers

# kraft/server.properties for controller nodes
process.roles=controller
node.id=1
controller.quorum.voters=1@controller1:9093,2@controller2:9093,3@controller3:9093
controller.listener.names=CONTROLLER
listeners=CONTROLLER://:9093
log.dirs=/var/kraft-controller-logs

Step 3: Run the migration

# Format controller storage
kafka-storage.sh format \
  --config kraft/server.properties \
  --cluster-id $(kafka-storage.sh random-uuid) \
  --release-version 3.7

# Start migration (ZooKeeper mode -> dual mode)
kafka-metadata.sh --migrate \
  --zookeeper-connect zk1:2181,zk2:2181,zk3:2181 \
  --bootstrap-server kafka1:9092

# Verify migration status
kafka-metadata.sh --status \
  --bootstrap-server kafka1:9092

# Finalize (point of no return)
kafka-metadata.sh --finalize \
  --bootstrap-server kafka1:9092

Step 4: Post-migration validation

# Verify all topics are accessible
kafka-topics.sh --bootstrap-server localhost:9092 --list

# Verify consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# Check controller status
kafka-metadata.sh --controllers \
  --bootstrap-server localhost:9092

4.6 ZAB vs Raft: Protocol Comparison

AspectZAB (ZooKeeper)Raft (KRaft)
Leader electionEpoch-basedTerm-based
Log replicationTwo-phase commitAppend entries
ConsistencySequential consistencyLinearizable reads (optional)
MembershipStatic configDynamic reconfiguration
SnapshotFuzzy snapshotsConsistent snapshots
ComplexityHighModerate (by design)
ImplementationSeparate systemIntegrated into Kafka

5. Streaming API Comparison: SSE vs WebSocket vs gRPC Streaming

5.1 Server-Sent Events (SSE)

SSE is a simple, HTTP-based protocol for server-to-client push. The server sends events over a long-lived HTTP connection using the text/event-stream content type.

WebFlux SSE endpoint:

@RestController
@RequestMapping("/api/stream")
public class SSEController {

    private final StockPriceService stockPriceService;

    @GetMapping(value = "/prices", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<StockPrice>> streamPrices(
            @RequestParam List<String> symbols) {
        return stockPriceService.getPriceStream(symbols)
            .map(price -> ServerSentEvent.<StockPrice>builder()
                .id(String.valueOf(price.getTimestamp()))
                .event("price-update")
                .data(price)
                .retry(Duration.ofSeconds(5))
                .build());
    }

    @GetMapping(value = "/heartbeat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> heartbeat() {
        return Flux.interval(Duration.ofSeconds(30))
            .map(seq -> ServerSentEvent.<String>builder()
                .event("heartbeat")
                .data("ping")
                .build());
    }
}

JavaScript client:

const eventSource = new EventSource('/api/stream/prices?symbols=AAPL,GOOG,MSFT')

eventSource.addEventListener('price-update', (event) => {
  const price = JSON.parse(event.data)
  updateUI(price)
})

eventSource.addEventListener('heartbeat', () => {
  console.log('Connection alive')
})

eventSource.onerror = (error) => {
  console.error('SSE error:', error)
  // Browser automatically reconnects
}

5.2 WebSocket

WebSocket provides full-duplex communication over a single TCP connection. Both client and server can send messages at any time.

WebFlux WebSocket handler:

@Configuration
public class WebSocketConfig {

    @Bean
    public HandlerMapping webSocketMapping(StockWebSocketHandler handler) {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/ws/stocks", handler);
        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);
        mapping.setOrder(-1);
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

@Component
@RequiredArgsConstructor
public class StockWebSocketHandler implements WebSocketHandler {

    private final StockPriceService stockPriceService;
    private final ObjectMapper objectMapper;

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Incoming messages (subscriptions)
        Flux<String> incoming = session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .doOnNext(msg -> log.info("Received subscription: {}", msg));

        // Outgoing price updates
        Flux<WebSocketMessage> outgoing = incoming
            .flatMap(msg -> {
                SubscriptionRequest req = parseRequest(msg);
                return stockPriceService.getPriceStream(req.getSymbols());
            })
            .map(price -> {
                try {
                    String json = objectMapper.writeValueAsString(price);
                    return session.textMessage(json);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });

        return session.send(outgoing);
    }
}

JavaScript client:

const ws = new WebSocket('ws://localhost:8080/ws/stocks')

ws.onopen = () => {
  ws.send(
    JSON.stringify({
      action: 'subscribe',
      symbols: ['AAPL', 'GOOG', 'MSFT'],
    })
  )
}

ws.onmessage = (event) => {
  const price = JSON.parse(event.data)
  updateUI(price)
}

ws.onclose = (event) => {
  console.log('WebSocket closed:', event.code, event.reason)
  // Manual reconnection needed
  setTimeout(reconnect, 3000)
}

5.3 gRPC Streaming

gRPC supports four communication patterns: unary, server streaming, client streaming, and bidirectional streaming.

Proto definition:

syntax = "proto3";

package stocks;

service StockService {
    // Unary
    rpc GetPrice (PriceRequest) returns (StockPrice);

    // Server streaming
    rpc StreamPrices (PriceStreamRequest) returns (stream StockPrice);

    // Client streaming
    rpc RecordTrades (stream TradeRecord) returns (TradeSummary);

    // Bidirectional streaming
    rpc LiveTrading (stream TradeOrder) returns (stream TradeConfirmation);
}

message PriceStreamRequest {
    repeated string symbols = 1;
    int32 interval_ms = 2;
}

message StockPrice {
    string symbol = 1;
    double price = 2;
    double change = 3;
    int64 timestamp = 4;
    int64 volume = 5;
}

Server-side implementation (Spring Boot + grpc-spring-boot-starter):

@GrpcService
public class StockGrpcService extends StockServiceGrpc.StockServiceImplBase {

    private final StockPriceService stockPriceService;

    @Override
    public void streamPrices(PriceStreamRequest request,
                             StreamObserver<StockPrice> responseObserver) {
        stockPriceService.getPriceStream(request.getSymbolsList())
            .subscribe(
                price -> responseObserver.onNext(toProto(price)),
                error -> responseObserver.onError(
                    Status.INTERNAL.withDescription(error.getMessage()).asException()),
                responseObserver::onCompleted
            );
    }

    @Override
    public StreamObserver<TradeOrder> liveTrading(
            StreamObserver<TradeConfirmation> responseObserver) {
        return new StreamObserver<TradeOrder>() {
            @Override
            public void onNext(TradeOrder order) {
                TradeConfirmation confirmation = executeTrade(order);
                responseObserver.onNext(confirmation);
            }

            @Override
            public void onError(Throwable t) {
                log.error("Client error in live trading", t);
            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }
}

5.4 Comparison Table

FeatureSSEWebSocketgRPC Streaming
ProtocolHTTP/1.1WS (TCP)HTTP/2
DirectionServer to clientBidirectionalAll 4 patterns
Data FormatText (UTF-8)Text or BinaryProtobuf (binary)
Auto-reconnectBuilt-in (browser)ManualManual
BackpressureLimitedNone (native)Built-in (flow control)
Browser SupportAll modern browsersAll modern browsersgrpc-web (limited)
Firewall FriendlyYes (standard HTTP)Sometimes blockedRequires HTTP/2
MultiplexingNoNoYes (HTTP/2)
Max Connections6 per domain (HTTP/1)No limitMultiplexed
CompressionStandard HTTPPer-message (optional)Built-in
LatencyLowLowestLow
ThroughputModerateHighHighest
SerializationJSON (text)JSON or custom binaryProtobuf (efficient)
Use CaseNotifications, feedsChat, gaming, collabMicroservices, IoT

5.5 When to Choose What

  • SSE: Dashboard updates, news feeds, stock tickers (read-only), notification streams. Simple to implement, works through proxies, auto-reconnects.
  • WebSocket: Chat applications, collaborative editing, multiplayer games. Need bidirectional communication with low latency.
  • gRPC Streaming: Microservice-to-microservice communication, IoT data ingestion, high-throughput internal APIs. Need schema enforcement, efficient serialization, and HTTP/2 features.

6. WebFlux + Kafka Integration

6.1 Reactor Kafka

Reactor Kafka provides a reactive API for Kafka, integrating seamlessly with Spring WebFlux.

@Configuration
public class ReactorKafkaConfig {

    @Bean
    public ReceiverOptions<String, String> receiverOptions() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        return ReceiverOptions.<String, String>create(props)
            .subscription(Collections.singleton("events"))
            .addAssignListener(partitions ->
                log.info("Assigned: {}", partitions))
            .addRevokeListener(partitions ->
                log.info("Revoked: {}", partitions))
            .commitInterval(Duration.ofSeconds(5))
            .commitBatchSize(100);
    }

    @Bean
    public SenderOptions<String, String> senderOptions() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        return SenderOptions.create(props);
    }

    @Bean
    public KafkaReceiver<String, String> kafkaReceiver() {
        return KafkaReceiver.create(receiverOptions());
    }

    @Bean
    public KafkaSender<String, String> kafkaSender() {
        return KafkaSender.create(senderOptions());
    }
}

Reactive consumer with backpressure:

@Service
@RequiredArgsConstructor
public class ReactiveEventProcessor {

    private final KafkaReceiver<String, String> kafkaReceiver;
    private final EventService eventService;

    @PostConstruct
    public void startConsuming() {
        kafkaReceiver.receive()
            .groupBy(record -> record.receiverOffset().topicPartition())
            .flatMap(partitionFlux -> partitionFlux
                .publishOn(Schedulers.boundedElastic())
                .concatMap(record -> processRecord(record)
                    .doOnSuccess(v -> record.receiverOffset().acknowledge())
                    .onErrorResume(e -> {
                        log.error("Error processing record: {}", record.key(), e);
                        record.receiverOffset().acknowledge();
                        return Mono.empty();
                    })
                ))
            .subscribe();
    }

    private Mono<Void> processRecord(ReceiverRecord<String, String> record) {
        return eventService.process(record.value())
            .doOnNext(result ->
                log.debug("Processed: partition={} offset={} key={}",
                    record.partition(), record.offset(), record.key()));
    }
}

Reactive producer:

@Service
@RequiredArgsConstructor
public class ReactiveEventProducer {

    private final KafkaSender<String, String> kafkaSender;
    private final ObjectMapper objectMapper;

    public Mono<Void> sendEvent(String topic, String key, Object event) {
        return Mono.fromCallable(() -> objectMapper.writeValueAsString(event))
            .flatMap(json -> {
                SenderRecord<String, String, String> record = SenderRecord.create(
                    new ProducerRecord<>(topic, key, json), key);
                return kafkaSender.send(Mono.just(record))
                    .next()
                    .doOnNext(result -> log.info("Sent to {}-{} offset {}",
                        result.recordMetadata().topic(),
                        result.recordMetadata().partition(),
                        result.recordMetadata().offset()))
                    .then();
            });
    }

    public Flux<SenderResult<String>> sendBatch(String topic, List<Event> events) {
        Flux<SenderRecord<String, String, String>> records = Flux.fromIterable(events)
            .map(event -> {
                String json = objectMapper.writeValueAsString(event);
                return SenderRecord.create(
                    new ProducerRecord<>(topic, event.getId(), json), event.getId());
            });

        return kafkaSender.send(records)
            .doOnError(e -> log.error("Batch send failed", e));
    }
}

6.2 CQRS + Event Sourcing with WebFlux and Kafka

CQRS (Command Query Responsibility Segregation) separates read and write models. Combined with Event Sourcing, every state change is stored as an immutable event.

┌─────────────────────────────────────────────────────────┐
CQRS + Event Sourcing│                                                         │
Command Side                    Query Side│  ┌──────────┐                    ┌──────────┐           │
│  │ WebFlux  │                    │ WebFlux  │           │
│  │ Command  │                    │ Query    │           │
│  │ Handler  │                    │ Handler  │           │
│  └────┬─────┘                    └────▲─────┘           │
│       │                               │                 │
│  ┌────▼─────┐                    ┌────┴─────┐           │
│  │ Domain   │                    │ Read     │           │
│  │ Aggregate│Model    │           │
│  └────┬─────┘                     (R2DBC)  │           │
│       │                          └────▲─────┘           │
│  ┌────▼─────────────────────────────┐ │                 │
│  │         Kafka (Event Store)      ├─┘                 │
│  │  orders.events topic             │                   │
│  └──────────────────────────────────┘                   │
└─────────────────────────────────────────────────────────┘

Command handler:

@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
public class OrderCommandController {

    private final OrderCommandService commandService;

    @PostMapping
    @ResponseStatus(HttpStatus.ACCEPTED)
    public Mono<OrderId> createOrder(@RequestBody Mono<CreateOrderCommand> command) {
        return command.flatMap(commandService::handle);
    }

    @PostMapping("/{orderId}/confirm")
    public Mono<ResponseEntity<Void>> confirmOrder(@PathVariable String orderId) {
        return commandService.handle(new ConfirmOrderCommand(orderId))
            .then(Mono.just(ResponseEntity.accepted().build()));
    }
}

@Service
@RequiredArgsConstructor
public class OrderCommandService {

    private final KafkaSender<String, String> kafkaSender;
    private final ObjectMapper objectMapper;

    public Mono<OrderId> handle(CreateOrderCommand command) {
        // Validate command
        OrderAggregate aggregate = OrderAggregate.create(command);
        List<DomainEvent> events = aggregate.getUncommittedEvents();

        // Publish events to Kafka
        return Flux.fromIterable(events)
            .flatMap(event -> publishEvent("order-events", aggregate.getId(), event))
            .then(Mono.just(new OrderId(aggregate.getId())));
    }

    private Mono<Void> publishEvent(String topic, String key, DomainEvent event) {
        return Mono.fromCallable(() -> objectMapper.writeValueAsString(event))
            .flatMap(json -> {
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, json);
                record.headers().add("event-type", event.getType().getBytes());
                return kafkaSender.send(Mono.just(SenderRecord.create(record, key)))
                    .next()
                    .then();
            });
    }
}

Event projection (read-side updater):

@Service
@RequiredArgsConstructor
public class OrderProjectionService {

    private final KafkaReceiver<String, String> kafkaReceiver;
    private final OrderReadRepository readRepository;
    private final ObjectMapper objectMapper;

    @PostConstruct
    public void startProjection() {
        kafkaReceiver.receive()
            .concatMap(record -> {
                String eventType = new String(
                    record.headers().lastHeader("event-type").value());
                return processEvent(eventType, record.value())
                    .doOnSuccess(v -> record.receiverOffset().acknowledge());
            })
            .subscribe();
    }

    private Mono<Void> processEvent(String eventType, String payload) {
        return switch (eventType) {
            case "OrderCreated" -> {
                OrderCreatedEvent event = objectMapper.readValue(payload, OrderCreatedEvent.class);
                yield readRepository.save(OrderReadModel.from(event)).then();
            }
            case "OrderConfirmed" -> {
                OrderConfirmedEvent event = objectMapper.readValue(payload, OrderConfirmedEvent.class);
                yield readRepository.findById(event.getOrderId())
                    .map(model -> model.withStatus("CONFIRMED"))
                    .flatMap(readRepository::save)
                    .then();
            }
            default -> Mono.empty();
        };
    }
}

7. Real-World Project: Real-Time Stock Ticker System

7.1 System Architecture

┌──────────────────────────────────────────────────────────────────┐
Real-Time Stock Ticker System│                                                                  │
│  ┌─────────────┐     ┌──────────────┐     ┌─────────────────┐   │
│  │ Market Data │     │   Kafka      │     │   WebFlux       │   │
│  │ Providers   │────▶│   Cluster    │────▶│   API Gateway   │   │
 (WebSocket) │     │              │     │                 │   │
│  └─────────────┘     │ raw-prices   │     │  SSE /prices    │   │
│                      │ processed    │     │  WS  /ws/trade  │   │
│  ┌─────────────┐     │ alerts       │     │  gRPC /internal │   │
│  │ News Feed   │────▶│              │     └───────┬─────────┘   │
 (REST API)  │     └──────┬───────┘             │             │
│  └─────────────┘            │              ┌──────▼─────────┐   │
│                      ┌──────▼───────┐      │   Clients      │   │
│                      │ Kafka Streams│Web Dashboard │   │
│                      │ Processing   │      │  Mobile App    │   │
│                      │              │      │  Trading Bot   │   │
│                      │ - VWAP calc  │      └────────────────┘   │
│                      │ - Anomaly    │                           │
│                      │ - Aggregate  │      ┌────────────────┐   │
│                      └──────┬───────┘      │   PostgreSQL   │   │
│                             │                 (R2DBC)      │   │
│                             └─────────────▶│   + Redis      │   │
│                                            └────────────────┘   │
└──────────────────────────────────────────────────────────────────┘

7.2 Market Data Ingestion Service

@Service
@RequiredArgsConstructor
@Slf4j
public class MarketDataIngestionService {

    private final KafkaSender<String, String> kafkaSender;
    private final ObjectMapper objectMapper;
    private final WebSocketClient webSocketClient;

    @PostConstruct
    public void connect() {
        URI uri = URI.create("wss://market-data-provider.example.com/stream");

        webSocketClient.execute(uri, session -> {
            // Subscribe to symbols
            Mono<Void> subscribe = session.send(
                Mono.just(session.textMessage(
                    "{\"action\":\"subscribe\",\"symbols\":[\"AAPL\",\"GOOG\",\"MSFT\",\"AMZN\"]}"
                ))
            );

            // Process incoming market data
            Flux<Void> receive = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .flatMap(this::publishToKafka)
                .doOnError(e -> log.error("Market data stream error", e));

            return subscribe.thenMany(receive).then();
        })
        .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
            .maxBackoff(Duration.ofMinutes(5)))
        .subscribe();
    }

    private Mono<Void> publishToKafka(String rawData) {
        try {
            MarketTick tick = objectMapper.readValue(rawData, MarketTick.class);
            ProducerRecord<String, String> record = new ProducerRecord<>(
                "raw-prices", tick.getSymbol(), rawData);
            record.headers()
                .add("source", "market-data-provider".getBytes())
                .add("timestamp", String.valueOf(tick.getTimestamp()).getBytes());

            return kafkaSender.send(Mono.just(SenderRecord.create(record, tick.getSymbol())))
                .next()
                .then();
        } catch (Exception e) {
            log.error("Failed to parse market data: {}", rawData, e);
            return Mono.empty();
        }
    }
}

7.3 Price Processing with Kafka Streams

@Component
public class PriceProcessingTopology {

    @Autowired
    public void buildPipeline(StreamsBuilder builder) {
        KStream<String, String> rawPrices = builder.stream("raw-prices");

        // 1. Parse and validate
        KStream<String, StockPrice> validPrices = rawPrices
            .mapValues(this::parsePrice)
            .filter((symbol, price) -> price != null && price.getPrice() > 0);

        // 2. Calculate VWAP (Volume Weighted Average Price) per 1-minute window
        KTable<Windowed<String>, VWAPResult> vwap = validPrices
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .aggregate(
                VWAPAccumulator::new,
                (symbol, price, acc) -> acc.add(price.getPrice(), price.getVolume()),
                Materialized.<String, VWAPAccumulator, WindowStore<Bytes, byte[]>>
                    as("vwap-store")
                    .withValueSerde(vwapAccumulatorSerde())
            )
            .mapValues(VWAPAccumulator::result);

        // 3. Anomaly detection (price change > 5% in 30 seconds)
        validPrices
            .groupByKey()
            .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(30)))
            .aggregate(
                PriceRange::new,
                (symbol, price, range) -> range.update(price.getPrice()),
                Materialized.with(Serdes.String(), priceRangeSerde())
            )
            .toStream()
            .filter((key, range) -> range.percentChange() > 5.0)
            .mapValues((key, range) -> new PriceAlert(
                key.key(), range.percentChange(), "ANOMALY"))
            .to("alerts", Produced.with(windowedSerde(), priceAlertSerde()));

        // 4. Output processed prices
        validPrices
            .mapValues(price -> new ProcessedPrice(
                price.getSymbol(), price.getPrice(), price.getVolume(),
                price.getTimestamp(), System.currentTimeMillis()))
            .to("processed-prices", Produced.with(Serdes.String(), processedPriceSerde()));
    }
}

7.4 SSE Streaming Endpoint for Web Clients

@RestController
@RequestMapping("/api/stocks")
@RequiredArgsConstructor
public class StockStreamController {

    private final KafkaReceiver<String, String> processedPriceReceiver;
    private final ObjectMapper objectMapper;
    private final Sinks.Many<ProcessedPrice> priceSink;

    @PostConstruct
    public void init() {
        // Bridge Kafka to reactive sink
        processedPriceReceiver.receive()
            .map(record -> {
                record.receiverOffset().acknowledge();
                try {
                    return objectMapper.readValue(record.value(), ProcessedPrice.class);
                } catch (Exception e) {
                    return null;
                }
            })
            .filter(Objects::nonNull)
            .subscribe(price -> priceSink.tryEmitNext(price));
    }

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<ProcessedPrice>> streamPrices(
            @RequestParam(required = false) List<String> symbols) {
        Flux<ProcessedPrice> priceFlux = priceSink.asFlux();

        if (symbols != null && !symbols.isEmpty()) {
            Set<String> symbolSet = new HashSet<>(symbols);
            priceFlux = priceFlux.filter(p -> symbolSet.contains(p.getSymbol()));
        }

        return priceFlux
            .map(price -> ServerSentEvent.<ProcessedPrice>builder()
                .id(price.getSymbol() + "-" + price.getTimestamp())
                .event("price")
                .data(price)
                .build())
            .doOnCancel(() -> log.info("Client disconnected from price stream"));
    }

    @GetMapping(value = "/alerts", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<PriceAlert>> streamAlerts() {
        return alertReceiver.receive()
            .map(record -> {
                record.receiverOffset().acknowledge();
                PriceAlert alert = objectMapper.readValue(record.value(), PriceAlert.class);
                return ServerSentEvent.<PriceAlert>builder()
                    .event("alert")
                    .data(alert)
                    .build();
            });
    }
}

7.5 Application Configuration

# application.yml
spring:
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/stockdb
    username: stockapp
    password: secret
  kafka:
    bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
    consumer:
      group-id: stock-ticker-service
      auto-offset-reset: latest
      enable-auto-commit: false
    producer:
      acks: all
      compression-type: zstd
      batch-size: 32768
      properties:
        linger.ms: 10

  webflux:
    base-path: /api

server:
  port: 8080
  netty:
    connection-timeout: 5000

management:
  endpoints:
    web:
      exposure:
        include: health,metrics,prometheus
  metrics:
    tags:
      application: stock-ticker

7.6 Docker Compose for Local Development

version: '3.8'
services:
  kafka-1:
    image: apache/kafka:3.7.0
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
    ports:
      - '9092:9092'

  kafka-2:
    image: apache/kafka:3.7.0
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

  kafka-3:
    image: apache/kafka:3.7.0
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: stockdb
      POSTGRES_USER: stockapp
      POSTGRES_PASSWORD: secret
    ports:
      - '5432:5432'

  redis:
    image: redis:7-alpine
    ports:
      - '6379:6379'

8. Decision Matrix: When to Use What

8.1 Technology Selection Guide

ScenarioWebFluxKafkaSSEWebSocketgRPC
Real-time dashboardYesYesBestGoodOverkill
Chat applicationYesOptionalNoBestGood
IoT data pipelineYesBestNoGoodBest
E-commerce order processingOptionalBestNoNoGood
Live sports scoresYesYesBestGoodNo
Financial trading platformYesBestNoBestBest
Microservice event busNoBestNoNoGood
Server notificationsYesOptionalBestGoodNo
Video/audio streamingNoNoNoPartialGood
Log aggregation pipelineNoBestNoNoGood

8.2 Decision Flowchart

Start: Do you need real-time data?
├── NoTraditional REST API (Spring MVC) is fine
└── YesIs it server-to-client only?
    ├── YesSSE (simplest, auto-reconnect)
    └── NoDo you need bidirectional communication?
        ├── YesIs it browser-facing?
        │   │
        │   ├── YesWebSocket
        │   │
        │   └── No → gRPC Bidirectional Streaming
        └── Do you need durable event storage?
            ├── YesKafka + consumer of choice
            └── NoDirect WebFlux streaming

8.3 Performance Comparison by Workload

WorkloadSpring MVCWebFluxWebFlux + KafkaNotes
100 concurrent users50ms avg48ms52msNegligible difference
10K concurrent users2s avg120ms150msWebFlux shines
100K concurrent usersTimeout250ms300msMVC cannot handle
1M events/sec ingestN/ALimited1.2M/sKafka essential
Complex event proc.N/APossible500K/sKafka Streams advantage

9. Six-Month Learning Roadmap

Month 1-2: Reactive Foundations

WeekTopicDeliverable
1Reactive Streams spec, Publisher/SubscriberImplement custom Publisher
2Project Reactor: Mono/Flux fundamentals20 operator exercises
3Error handling, retry, timeout patternsResilient API client
4Testing reactive code (StepVerifier)90% test coverage on exercises
5-6Spring WebFlux: controllers, functional endpointsCRUD REST API (WebFlux)
7-8R2DBC, WebClient, reactive securityFull-stack reactive application

Month 3-4: Kafka Mastery

WeekTopicDeliverable
9-10Kafka fundamentals: topics, partitions, consumersLocal 3-broker cluster setup
11-12Spring Kafka: producer/consumer patternsOrder processing system
13-14Kafka Streams: stateless and stateful operationsReal-time analytics pipeline
15-16Exactly-once, transactions, Schema RegistryTransactional event system

Month 5: Integration and Streaming APIs

WeekTopicDeliverable
17SSE with WebFluxLive notification system
18WebSocket with WebFluxChat application
19gRPC Streaming with Spring BootInternal service communication
20Reactor Kafka + CQRS patternEvent-sourced microservice

Month 6: Production Readiness

WeekTopicDeliverable
21KRaft migration, cluster operationsKRaft cluster deployment
22Monitoring: Micrometer, Prometheus, GrafanaObservability dashboard
23Performance tuning, load testingBenchmark report
24Capstone: Real-time stock ticker systemProduction-ready project

10. Quiz

Q1: What is the main advantage of Spring WebFlux over Spring MVC for I/O-bound workloads?

Answer: WebFlux uses a non-blocking event loop model (Netty) that can handle thousands of concurrent connections with just a few threads. Spring MVC uses a thread-per-request model where each concurrent request occupies a thread. For I/O-bound workloads (where threads spend most time waiting for database, network, or file operations), WebFlux is dramatically more efficient because it does not waste threads on waiting. In benchmarks, WebFlux can handle 7-8x more throughput with 77% less memory for I/O-bound workloads.

Q2: Explain the difference between flatMap, concatMap, and switchMap in Project Reactor.

Answer:

  • flatMap: Maps each element to a Publisher and merges them eagerly. Inner publishers run concurrently and results may arrive out of order. Best for maximum throughput when order does not matter.
  • concatMap: Maps each element to a Publisher and subscribes sequentially. Waits for each inner publisher to complete before subscribing to the next. Preserves order but lower throughput.
  • switchMap: Maps each element to a Publisher, but cancels the previous inner publisher when a new element arrives. Only the latest subscription is active. Ideal for search-as-you-type where only the latest query result matters.
Q3: Why did Kafka replace ZooKeeper with KRaft? Name three specific technical benefits.

Answer:

  1. Partition scalability: ZooKeeper stored all metadata in memory, limiting clusters to around 200,000 partitions. KRaft uses a log-based metadata store, enabling millions of partitions.
  2. Faster failover: Controller failover in ZooKeeper mode required reading all metadata from ZooKeeper (minutes). KRaft failover takes seconds because the new controller already has metadata replicated via Raft.
  3. Simplified operations: With ZooKeeper, teams had to deploy, configure, monitor, and maintain two separate distributed systems. KRaft eliminates the ZooKeeper dependency entirely, reducing operational complexity.

Additional benefits include faster controlled shutdown, elimination of split-brain risks between Kafka and ZooKeeper, and event-driven metadata propagation (versus ZooKeeper's asynchronous, potentially laggy propagation).

Q4: When would you choose SSE over WebSocket for a real-time feature?

Answer: Choose SSE when:

  1. Communication is server-to-client only — stock tickers, news feeds, notifications, dashboards. No need for the client to send data back on the same connection.
  2. You need auto-reconnect — SSE has built-in browser reconnection. WebSocket requires manual reconnection logic.
  3. HTTP infrastructure compatibility — SSE works over standard HTTP/1.1, passing through proxies and load balancers without special configuration. WebSocket upgrades can be blocked by corporate firewalls.
  4. Simplicity — SSE is simpler to implement (just a GET endpoint returning text/event-stream). WebSocket requires connection management, heartbeats, and reconnection logic.

Choose WebSocket when you need bidirectional communication (chat, gaming, collaborative editing) or binary data transfer.

Q5: Design a system that uses WebFlux, Kafka, and SSE together. Describe the data flow from event generation to client display.

Answer: Consider a real-time order tracking system:

  1. Event Generation: When an order status changes, the Order Service publishes an OrderStatusChanged event to a Kafka topic (order-events) using Reactor Kafka. The producer uses acks=all for durability.

  2. Event Processing: A Kafka Streams application consumes from order-events, enriches events with customer data (KTable join from customer-profiles topic), and produces enriched events to order-notifications topic.

  3. WebFlux Bridge: A WebFlux service uses KafkaReceiver (Reactor Kafka) to reactively consume from order-notifications. It routes events into a Sinks.Many (multicast hot publisher), keyed by customer ID.

  4. SSE Endpoint: The WebFlux controller exposes an SSE endpoint /api/orders/track that filters the Sinks.Many flux by the authenticated user's customer ID. Each matching event is sent as a ServerSentEvent.

  5. Client: The browser creates an EventSource pointing to the SSE endpoint. On each event, the UI updates the order tracking visualization. If the connection drops, the browser auto-reconnects (SSE built-in).

Data flow: Order Service to Kafka to Kafka Streams to Kafka to Reactor Kafka to Sinks.Many to SSE to Browser.


References

  1. Reactive Manifesto
  2. Reactive Streams Specification
  3. Project Reactor Reference Guide
  4. Spring WebFlux Documentation
  5. Spring WebFlux vs MVC Performance
  6. R2DBC Specification
  7. Netty Project
  8. Apache Kafka Documentation
  9. Kafka: The Definitive Guide (2nd Edition)
  10. KIP-500: Replace ZooKeeper
  11. Kafka KRaft Documentation
  12. Kafka Streams Documentation
  13. Confluent Schema Registry
  14. Kafka Connect Documentation
  15. Reactor Kafka Reference Guide
  16. Server-Sent Events (MDN)
  17. WebSocket API (MDN)
  18. gRPC Documentation
  19. CQRS Pattern (Microsoft)
  20. Event Sourcing Pattern (Microsoft)
  21. Debezium CDC Documentation
  22. Spring Boot gRPC Starter
  23. Kafka Performance Tuning Guide (Confluent)
  24. ZooKeeper Documentation
  25. Raft Consensus Algorithm