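WebFlux + Kafka + ZooKeeper Complete Guide: Everything About Reactive Streaming Architecture
- Introduction
- 1. What Is Reactive Programming? (From the Basics)
- 2. Spring WebFlux Deep Dive
- 3. The Complete Apache Kafka Guide
- 4. From ZooKeeper to KRaft: A Historic Transition
- 5. Streaming API Comparison: SSE vs WebSocket vs gRPC Streaming
- 6. WebFlux + Kafka Integrated Architecture
- 7. Hands-On Project: Real-Time Stock Quote System
- 8. When to Choose What
- 9. Learning Roadmap (6 Months)
- Practice Quiz
- References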
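Introduction
Modern software systems demand "real time." Stock quotes update in milliseconds, chat messages must be delivered immediately, and IoT sensor data pours in without pause. The traditional request-response model struggles to meet these requirements.
This article takes an in-depth look at three core technologies of real-time data processing architecture.
- Spring WebFlux: a reactive web framework that handles high connection concurrency efficiently
- Apache Kafka: a distributed event streaming platform that delivers large data volumes reliably
- Streaming API: SSE, WebSocket, and gRPC Streaming for pushing real-time data to clients
It also examines in detail the migration from ZooKeeper to KRaft, a historic turning point for Kafka. From how each technology works to practical code examples, performance benchmarks, and architecture patterns, everything about reactive streaming architecture is collected in one place.
1. What Is Reactive Programming? (From the Basics)
Reactive Manifesto
The Reactive Manifesto (version 2.0 was published in 2014) defines four core properties of a reactive system.
- Responsive: The system responds in a timely manner whenever possible. Responsiveness is the foundation of usability and utility.
- Resilient: The system stays responsive in the face of failure. This is achieved through replication, containment, and delegation.
- Elastic: The system stays responsive under varying workload by increasing or decreasing the resources it uses.
- Message Driven: Reactive systems rely on asynchronous message passing to ensure loose coupling, isolation, and location transparency between components.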
┌─────────────────────────────────┐
│ Responsive │
│ (fast, consistent responses) │
└──────────┬──────────┬───────────┘
│ │
┌──────────▼──┐ ┌───▼──────────┐
│ Resilient │ │ Elastic │
│ (fault recovery) │ │ (load adaptation) │
└──────────┬──┘ └───┬──────────┘
│ │
┌──────────▼──────────▼───────────┐
│ Message Driven │
│ (async message-based comms) │
└─────────────────────────────────┘
Imperative vs Reactive Paradigms
Imperative programming spells out, step by step, what to do and how to do it.
// Imperative: blocking style
List<User> users = userRepository.findAll(); // DB call - blocks the thread
List<User> activeUsers = new ArrayList<>();
for (User user : users) {
    if (user.isActive()) {
        activeUsers.add(user);
    }
}
List<String> names = new ArrayList<>();
for (User user : activeUsers) {
    names.add(user.getName());
}
return names;
Reactive programming expresses data flows and the propagation of change declaratively.
// Reactive: non-blocking style
return userRepository.findAll()   // returns Flux<User> - non-blocking
    .filter(User::isActive)       // keep active users only
    .map(User::getName)           // extract names
    .collectList();               // returns Mono<List<String>>
The key differences are as follows.
| Aspect | Imperative | Reactive |
|---|---|---|
| Execution model | Synchronous/blocking | Asynchronous/non-blocking |
| Thread usage | One thread per request | A small number of event loop threads |
| Data handling | Pull-based (the consumer asks) | Push-based (the producer delivers, within the demand the consumer has signaled) |
| Error handling | try-catch | onError signal |
| Backpressure | None (or implemented by hand) | Built in |
| Best suited for | Low concurrency, simple logic | High concurrency, I/O-intensive work |
The Reactive Streams Specification
Reactive Streams is the standard for asynchronous stream processing with non-blocking backpressure on the JVM. Equivalent interfaces were added to Java 9 as java.util.concurrent.Flow, and there are four core interfaces.
// 1. Publisher: the party that produces data
public interface Publisher<T> {
    void subscribe(Subscriber<? super T> s);
}
// 2. Subscriber: the party that consumes data
public interface Subscriber<T> {
    void onSubscribe(Subscription s); // subscription started
    void onNext(T t);                 // an item arrived
    void onError(Throwable t);        // an error occurred
    void onComplete();                // the stream finished
}
// 3. Subscription: manages the link between Publisher and Subscriber
public interface Subscription {
    void request(long n); // request n items (backpressure)
    void cancel();        // cancel the subscription
}
// 4. Processor: both a Subscriber and a Publisher (intermediate stage)
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
The sequence of the data flow looks like this.
Publisher Subscriber
│ │
│◄──── subscribe() ────────────│
│ │
│───── onSubscribe(sub) ──────►│
│ │
│◄──── sub.request(n) ─────────│ ← backpressure: request only n items
│ │
│───── onNext(data1) ─────────►│
│───── onNext(data2) ─────────►│
│───── ... │
│───── onNext(dataN) ─────────►│
│ │
│◄──── sub.request(m) ─────────│ ← request m more items
│ │
│───── onComplete() ──────────►│ (or onError())
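The request(n) handshake in the sequence above can be tried out with nothing but the JDK. The following is a minimal sketch, assuming Java 9 or later and the built-in java.util.concurrent.Flow and SubmissionPublisher types; the class name FlowDemo, the nested BatchSubscriber, and the run helper are illustrative names, not part of any library.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class FlowDemo {
    /** Subscriber that asks for items in fixed-size batches via request(n). */
    static final class BatchSubscriber implements Flow.Subscriber<Integer> {
        private final int batch;
        private final List<Integer> received = new CopyOnWriteArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int remainingInBatch;

        BatchSubscriber(int batch) { this.batch = batch; }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            remainingInBatch = batch;
            s.request(batch);                 // initial demand: one batch
        }
        @Override public void onNext(Integer item) {
            received.add(item);
            if (--remainingInBatch == 0) {    // batch consumed: signal more demand
                remainingInBatch = batch;
                subscription.request(batch);
            }
        }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Publishes 1..count and returns what the subscriber received, in order. */
    static List<Integer> run(int count, int batch) {
        BatchSubscriber subscriber = new BatchSubscriber(batch);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i);          // blocks only if the subscriber's buffer is full
            }
        }                                     // close() completes the stream after buffered items
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(run(5, 2));
    }
}
```

The publisher never delivers more than the subscriber has requested, which is the essence of the protocol; Reactor's Flux and Mono implement the same contract with a far richer operator set.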
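Why Backpressure Matters and How It Works
Backpressure is the mechanism that throttles a producer's sending rate to match the speed at which the consumer can process data. Without backpressure, the following problems arise.
- Out of memory (OOM): a fast producer fills up the buffer in front of a slow consumer
- Data loss: data is dropped when the buffer overflows
- System failure: failures propagate in a chain (cascading failure)
There are several backpressure strategies.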
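// Note: subscribe(lambda) requests unbounded demand, so these operators only take effect
// when a downstream stage bounds demand (for example publishOn, flatMap, or limitRate).
// 1. Buffer: buffer up to a fixed size, then signal an error on overflow
Flux.range(1, 1000)
    .onBackpressureBuffer(256)
    .subscribe(item -> processSlowly(item));
// 2. Drop: discard items that cannot be processed
Flux.range(1, 1000)
    .onBackpressureDrop(dropped ->
        log.warn("Dropped: " + dropped))
    .subscribe(item -> processSlowly(item));
// 3. Latest: keep only the most recent value
Flux.range(1, 1000)
    .onBackpressureLatest()
    .subscribe(item -> processSlowly(item));
// 4. Error: signal an error when the consumer cannot keep up
Flux.range(1, 1000)
    .onBackpressureError()
    .subscribe(item -> processSlowly(item));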
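To make the difference between the overflow strategies concrete, here is a toy model in plain Java. It is a simplified sketch, not how Reactor implements these operators: it assumes the consumer is stalled for the whole run and a fixed-capacity buffer sits in front of it. The names OverflowDemo, Strategy, and offerAll are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class OverflowDemo {
    enum Strategy { DROP, LATEST, ERROR }

    /**
     * Offers items 1..produced to a bounded buffer while the consumer is stalled,
     * then returns the buffer contents the consumer would eventually see.
     */
    static List<Integer> offerAll(int produced, int capacity, Strategy strategy) {
        Deque<Integer> buffer = new ArrayDeque<>();
        for (int item = 1; item <= produced; item++) {
            if (buffer.size() < capacity) {
                buffer.addLast(item);          // room left: plain buffering
                continue;
            }
            switch (strategy) {
                case DROP:                     // buffer full: discard the new item
                    break;
                case LATEST:                   // buffer full: overwrite the newest slot
                    buffer.removeLast();
                    buffer.addLast(item);
                    break;
                case ERROR:                    // buffer full: fail the stream
                    throw new IllegalStateException("overflow at item " + item);
            }
        }
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        System.out.println(offerAll(5, 2, Strategy.DROP));   // [1, 2]
        System.out.println(offerAll(5, 2, Strategy.LATEST)); // [1, 5]
    }
}
```

With a capacity of 1, the LATEST branch behaves like "keep only the most recent value", which is the intuition behind onBackpressureLatest.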
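2. Spring WebFlux Deep Dive
WebFlux vs Spring MVC Architecture Comparison
Spring MVC and WebFlux use fundamentally different execution models.
Spring MVC: Thread-per-Request Model
Client Request ──► Tomcat Thread Pool (200 threads)
│
Thread-1 ──► Controller ──► Service ──► DB (blocked, waiting)
Thread-2 ──► Controller ──► Service ──► DB (blocked, waiting)
Thread-3 ──► Controller ──► Service ──► External API (blocked, waiting)
...
Thread-200 ──► (once all threads are exhausted, requests pile up in the queue)
- Default thread pool size: 200 on Tomcat
- Each request occupies one thread (even while waiting on I/O)
- Beyond 200 concurrent requests, additional requests have to wait
Spring WebFlux: Event Loop Model
Client Request ──► Netty Event Loop (N threads = number of CPU cores)
│
EventLoop-1 ──► Handler1 ──► (start non-blocking I/O) ──► handle next request
EventLoop-2 ──► Handler2 ──► (start non-blocking I/O) ──► handle next request
...
EventLoop-N ──► (I/O completion callback) ──► send response
- Event loop threads: the number of CPU cores by default (for example, 8 cores = 8 threads; Reactor Netty uses at least 4)
- A request does not occupy a thread (the thread is released while waiting on I/O)
- A handful of threads can serve tens of thousands of concurrent connections
Netty Event Loop Model
Netty is the default runtime server for WebFlux. Let's look at its core structure.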
┌────────────────────────────────────────────────────┐
│ Netty Server │
│ │
│ ┌──────────────┐ ┌──────────────────────────┐ │
│ │ Boss Group │ │ Worker Group │ │
│ │ (1 thread) │ │ (CPU cores x 2 threads) │ │
│ │ │ │ │ │
│ │ Accept │───►│ EventLoop-1: Channel A │ │
│ │ connections │ │ EventLoop-2: Channel B │ │
│ │ │ │ EventLoop-3: Channel C │ │
│ │ │ │ ... │ │
│ └──────────────┘ └──────────────────────────┘ │
│ │
│ Channel Pipeline: │
│ ┌─────────┬──────────┬───────────┬────────────┐ │
│ │ Decoder │ Handler1 │ Handler2 │ Encoder │ │
│ └─────────┴──────────┴───────────┴────────────┘ │
└────────────────────────────────────────────────────┘
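- Boss Group: accepts client connections and hands them off to the Worker Group
- Worker Group: the event loop threads that perform the actual I/O. Plain Netty defaults to twice the number of CPU cores, as in the diagram; Reactor Netty, which WebFlux runs on, defaults to the number of cores (minimum 4)
- Channel Pipeline: a chain of inbound/outbound handlers that processes the data
Project Reactor: Mono vs Flux
Project Reactor is the reactive library underlying Spring WebFlux. Understanding its two core types is essential.
Mono: an asynchronous sequence that emits zero or one element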
// Creating a Mono
Mono<String> mono1 = Mono.just("Hello");
Mono<String> mono2 = Mono.empty();
Mono<String> mono3 = Mono.error(new RuntimeException("Error"));
// In practice: look up a single user
@GetMapping("/users/me")
public Mono<UserResponse> getCurrentUser(Authentication auth) {
    return userRepository.findByEmail(auth.getName())  // Mono<User>
        .map(UserResponse::from)                       // Mono<UserResponse>
        .switchIfEmpty(Mono.error(new UserNotFoundException()));
}
Flux: an asynchronous sequence that emits zero to N elements
// Creating a Flux
Flux<Integer> flux1 = Flux.just(1, 2, 3, 4, 5);
Flux<Integer> flux2 = Flux.range(1, 100);
Flux<Long> flux3 = Flux.interval(Duration.ofSeconds(1)); // infinite stream
// In practice: list users
@GetMapping("/users")
public Flux<UserResponse> getActiveUsers() {
    return userRepository.findAll()  // Flux<User>
        .filter(User::isActive)      // active users only
        .map(UserResponse::from)     // convert to DTO
        .take(100);                  // at most 100 users
}
Core Operators Guide
The heart of reactive programming is composing operators. Here are the ones used most often.
// 1. map: synchronous transform (1:1)
Flux.just("hello", "world")
    .map(String::toUpperCase)
// Result: "HELLO", "WORLD"
// 2. flatMap: asynchronous transform (1:N, no ordering guarantee)
Flux.just(1L, 2L, 3L)
    .flatMap(id -> userRepository.findById(id)) // async lookup per ID
// Result: order is not guaranteed (User2, User1, User3 is possible)
// 3. concatMap: asynchronous transform (1:N, order preserved)
Flux.just(1L, 2L, 3L)
    .concatMap(id -> userRepository.findById(id))
// Result: order preserved (User1, User2, User3)
// 4. filter: keep items matching a condition
Flux.range(1, 10)
    .filter(n -> n % 2 == 0)
// Result: 2, 4, 6, 8, 10
// 5. zip: combine several sources (1:1 matching)
Mono<User> user = userRepository.findById(userId);
Mono<List<Order>> orders = orderRepository.findByUserId(userId).collectList();
Mono.zip(user, orders)
    .map(tuple -> new UserWithOrders(tuple.getT1(), tuple.getT2()));
// 6. merge: merge several sources (interleaved)
Flux.merge(
        hotNewsFlux,      // live news
        stockPriceFlux,   // stock quotes
        alertFlux         // alerts
    )
    .subscribe(event -> dashboard.update(event));
// 7. concat: subscribe to sources one after another
Flux.concat(
        cacheRepository.findById(id),     // cache first
        databaseRepository.findById(id)   // DB on cache miss
    )
    .next(); // take the first result only; the DB is not queried on a cache hit
// 8. switchIfEmpty: handle an empty result
userRepository.findByEmail(email)
    .switchIfEmpty(Mono.defer(() -> createDefaultUser(email)));
// 9. retry + backoff: retry logic
webClient.get()
    .uri("/external-api/data")
    .retrieve()
    .bodyToMono(ExternalData.class)
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
        .maxBackoff(Duration.ofSeconds(10))
        .filter(ex -> ex instanceof WebClientResponseException.ServiceUnavailable));
// 10. onErrorResume: recover from an error
userService.getUserProfile(userId)
    .onErrorResume(UserNotFoundException.class,
        ex -> Mono.just(UserProfile.anonymous()))
    .onErrorResume(ServiceException.class,
        ex -> cachedProfileService.getCachedProfile(userId));
WebFlux + R2DBC (Reactive Database Access)
R2DBC (Reactive Relational Database Connectivity) provides non-blocking access to relational databases.
// build.gradle
// implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
// implementation 'io.r2dbc:r2dbc-postgresql'
// Entity
@Table("users")
public record User(
@Id Long id,
String email,
String name,
boolean active,
LocalDateTime createdAt
) {}
// Repository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
Flux<User> findByActiveTrue();
@Query("SELECT * FROM users WHERE email = :email")
Mono<User> findByEmail(String email);
@Query("SELECT * FROM users WHERE created_at > :since ORDER BY created_at DESC")
Flux<User> findRecentUsers(LocalDateTime since);
}
// Service
@Service
@RequiredArgsConstructor
public class UserService {
private final UserRepository userRepository;
private final DatabaseClient databaseClient;
public Flux<User> getActiveUsers() {
return userRepository.findByActiveTrue();
}
// Use DatabaseClient for complex queries
public Flux<UserStats> getUserStats() {
return databaseClient
.sql("""
SELECT u.id, u.name, COUNT(o.id) as order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
GROUP BY u.id, u.name
HAVING COUNT(o.id) > 0
""")
.map(row -> new UserStats(
row.get("id", Long.class),
row.get("name", String.class),
row.get("order_count", Long.class)
))
.all();
}
}
WebFlux + WebClient (Reactive HTTP Client)
WebClient is the non-blocking HTTP client that replaces RestTemplate.
@Configuration
public class WebClientConfig {
@Bean
public WebClient webClient() {
return WebClient.builder()
.baseUrl("https://api.example.com")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.filter(ExchangeFilterFunctions.basicAuthentication("user", "password"))
.codecs(configurer -> configurer
.defaultCodecs()
.maxInMemorySize(16 * 1024 * 1024)) // 16MB
.build();
}
}
@Service
@RequiredArgsConstructor
public class ExternalApiService {
private final WebClient webClient;
// GET request
public Mono<ProductResponse> getProduct(String productId) {
return webClient.get()
.uri("/products/{id}", productId)
.retrieve()
.onStatus(HttpStatusCode::is4xxClientError,
response -> Mono.error(new ProductNotFoundException(productId)))
.onStatus(HttpStatusCode::is5xxServerError,
response -> Mono.error(new ExternalServiceException()))
.bodyToMono(ProductResponse.class)
.timeout(Duration.ofSeconds(5))
.retryWhen(Retry.backoff(3, Duration.ofMillis(500))
.filter(ex -> !(ex instanceof ProductNotFoundException))); // a 4xx will not succeed on retry
}
// POST request
public Mono<OrderResponse> createOrder(OrderRequest request) {
return webClient.post()
.uri("/orders")
.bodyValue(request)
.retrieve()
.bodyToMono(OrderResponse.class);
}
// Streaming response
public Flux<EventData> streamEvents() {
return webClient.get()
.uri("/events/stream")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(EventData.class);
}
}
Performance Benchmark: WebFlux vs MVC
Benchmark results at several concurrency levels (4 cores, 8 GB RAM, simulated 100 ms latency). The article does not describe the test harness or server settings, so read the figures as indicative rather than reproducible.
┌──────────────────┬────────────┬────────────┬────────────┐
│ Concurrent conns │ Spring MVC │ WebFlux    │ Difference │
├──────────────────┼────────────┼────────────┼────────────┤
│ 1,000            │ 8,500 rps  │ 9,200 rps  │ +8%        │
│ 10,000           │ 3,200 rps  │ 9,100 rps  │ +184%      │
│ 100,000          │ timeout    │ 8,800 rps  │ MVC fails  │
├──────────────────┼────────────┼────────────┼────────────┤
│ Memory usage     │ 2.1 GB     │ 0.8 GB     │ -62%       │
│ Thread count     │ 204        │ 16         │ -92%       │
│ P99 latency      │ 1,200ms    │ 180ms      │ -85%       │
│ (at 10K conns)   │            │            │            │
└──────────────────┴────────────┴────────────┴────────────┘
Key points:
- Low concurrency (1,000 or fewer): the performance gap between MVC and WebFlux is small
- High concurrency (10,000 or more): WebFlux holds a clear advantage
- Very high concurrency (100,000): MVC is effectively unusable because its threads are exhausted
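A quick way to sanity-check throughput figures like these is Little's law: sustained throughput cannot exceed the number of requests in flight divided by the time each one takes. The sketch below applies that bound; ThroughputBound and maxRps are hypothetical names, and the model assumes every request spends the full latency holding its worker or connection.

```java
public class ThroughputBound {
    /**
     * Upper bound on requests per second when each in-flight request
     * holds one worker (thread or connection) for latencyMillis.
     */
    static double maxRps(int concurrentWorkers, double latencyMillis) {
        return concurrentWorkers * 1000.0 / latencyMillis;
    }

    public static void main(String[] args) {
        // 200 blocking threads, 100 ms per request
        System.out.println(maxRps(200, 100));     // 2000.0
        // 10,000 non-blocking connections, 100 ms per request
        System.out.println(maxRps(10_000, 100));  // 100000.0
    }
}
```

Under this model a thread-per-request server with 200 threads and 100 ms of blocking per request tops out near 2,000 rps, so any higher MVC figure implies a larger thread pool or a shorter blocking time than the stated setup. For the event loop model the connection count sets the bound, and in practice CPU becomes the limit well before it is reached.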
Caveat: Detecting Blocking Code
The most common mistake in WebFlux is running blocking code on an event loop thread.
// Never do this: a blocking call on the event loop
@GetMapping("/bad-example")
public Mono<String> badExample() {
    // JDBC is blocking, so this stalls the event loop.
    // (queryForObject(sql, Class) only maps single-column results; a full row needs a RowMapper
    // such as org.springframework.jdbc.core.DataClassRowMapper.)
    User user = jdbcTemplate.queryForObject("SELECT * FROM users WHERE id = 1",
        new DataClassRowMapper<>(User.class));
    return Mono.just(user.getName());
}
// Correct approach 1: use R2DBC
@GetMapping("/good-example-1")
public Mono<String> goodExample1() {
    return r2dbcUserRepository.findById(1L)
        .map(User::getName);
}
// Correct approach 2: run the blocking code on a separate scheduler
@GetMapping("/good-example-2")
public Mono<String> goodExample2() {
    return Mono.fromCallable(() ->
            jdbcTemplate.queryForObject("SELECT * FROM users WHERE id = 1",
                new DataClassRowMapper<>(User.class)))
        .subscribeOn(Schedulers.boundedElastic()) // thread pool meant for blocking work
        .map(User::getName);
}
To detect blocking code automatically, you can use BlockHound.
// build.gradle
// testImplementation 'io.projectreactor.tools:blockhound:1.0.9.RELEASE'
// Test setup
@BeforeAll
static void setUp() {
BlockHound.install();
}
@Test
void shouldNotBlock() {
// A BlockingOperationError is thrown if blocking code runs on a non-blocking thread
StepVerifier.create(userService.getActiveUsers())
.expectNextCount(5)
.verifyComplete();
}
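3. The Complete Apache Kafka Guide
Kafka Architecture Overview
Apache Kafka is a distributed event streaming platform originally developed at LinkedIn. A well-provisioned cluster can reliably process millions of events per second.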
┌─────────────────────────────────────────────────────────────┐
│ Kafka Cluster │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │
│ │ │ │ │ │ │ │
│ │ Topic-A P0 │ │ Topic-A P1 │ │ Topic-A P2 │ Leader │
│ │ Topic-B P1R │ │ Topic-B P0 │ │ Topic-A P1R │ Replica│
│ │ Topic-A P2R │ │ Topic-A P0R │ │ Topic-B P1 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ P = Partition (Leader), R = Replica (Follower) │
└─────────────────────────────────────────────────────────────┘
▲ │
│ ▼
┌─────────────┐ ┌─────────────────┐
│ Producers │ │ Consumer Group │
│ │ │ │
│ App1, App2 │ │ Consumer1 ← P0 │
│ │ │ Consumer2 ← P1 │
│ │ │ Consumer3 ← P2 │
└─────────────┘ └─────────────────┘
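Core components:
- Broker: a Kafka server instance. A cluster contains multiple brokers
- Topic: a logical category that separates messages (for example, user-events, order-events)
- Partition: the physical unit a topic is split into, and the basic unit of parallelism
- Replica: a copy of a partition. Replicas are either the Leader or a Follower
- ISR (In-Sync Replicas): the set of replicas that are caught up with the Leader
Producer: Everything About Sending Messages
// Producer configuration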
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// Performance tuning
config.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 32KB batches
config.put(ProducerConfig.LINGER_MS_CONFIG, 20); // wait up to 20ms to fill a batch
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // compression
config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB buffer
// Reliability
config.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas
config.put(ProducerConfig.RETRIES_CONFIG, 3); // retries
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // idempotent writes
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
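Acknowledgment modes compared:
| acks value | Behavior | Durability | Performance |
|---|---|---|---|
| 0 | No acknowledgment after sending | Low (data loss possible) | Highest |
| 1 | Leader acknowledges only | Medium (loss possible if the Leader fails) | High |
| all (-1) | All in-sync replicas acknowledge | Highest (every ISR member has the record) | Moderate |
Idempotent Producer: with enable.idempotence=true, the broker writes a message only once even when the producer retries and sends it again. Duplicates are detected using the Producer ID and a per-partition sequence number. The guarantee covers retries within one producer session; it does not deduplicate messages that the application itself sends twice.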
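The duplicate check described above can be pictured with a small model. This is a deliberately simplified sketch, not Kafka's broker code: the real broker tracks sequences per producer ID, producer epoch, and partition, and works on batches rather than single records. SequenceDeduplicator, Result, and tryAppend are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

public class SequenceDeduplicator {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    // Highest sequence number accepted so far, per producer ID (for one partition)
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    /** Decides whether a record with the given sequence number may be appended to the log. */
    Result tryAppend(long producerId, int sequence) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return Result.DUPLICATE;       // a retry of something already written
        }
        if (sequence != last + 1) {
            return Result.OUT_OF_ORDER;    // a gap: an earlier record is missing
        }
        lastSequence.put(producerId, sequence);
        return Result.APPENDED;
    }

    public static void main(String[] args) {
        SequenceDeduplicator log = new SequenceDeduplicator();
        System.out.println(log.tryAppend(7L, 0)); // APPENDED
        System.out.println(log.tryAppend(7L, 0)); // DUPLICATE (retry of the same record)
        System.out.println(log.tryAppend(7L, 2)); // OUT_OF_ORDER (sequence 1 not seen yet)
    }
}
```

A retried send carries the same sequence number as the original, so the second attempt is recognized and acknowledged without being written again.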
Consumer: Everything About Consuming Messages
// Consumer configuration
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service-group");
// Offset management
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // commit manually
// Performance tuning
config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // fetch at least 1KB
config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // wait up to 500ms
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // at most 500 records per poll
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3); // three consumer threads
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
// Consumer listener
@Service
@Slf4j
public class OrderEventConsumer {
@KafkaListener(
topics = "order-events",
groupId = "order-service-group",
containerFactory = "kafkaListenerContainerFactory"
)
public void handleOrderEvent(
ConsumerRecord<String, OrderEvent> record,
Acknowledgment ack) {
try {
log.info("Received: topic={}, partition={}, offset={}, key={}",
record.topic(), record.partition(), record.offset(), record.key());
OrderEvent event = record.value();
processOrderEvent(event);
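ack.acknowledge(); // manual commit
} catch (Exception e) {
log.error("Failed to process order event", e);
// Error handling: send to a DLQ (Dead Letter Queue) or retry.
// Swallowing the exception here means a later acknowledge() commits past this record, so it is effectively skipped.
}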
}
}
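Consumer groups and partition assignment:
Topic: order-events (6 partitions)
Consumer Group: order-service-group
Consumer-1: P0, P1 (owns 2 partitions)
Consumer-2: P2, P3 (owns 2 partitions)
Consumer-3: P4, P5 (owns 2 partitions)
Rule: within a group, each partition is consumed by exactly one consumer
If there are more consumers than partitions, the extra consumers sit idle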
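The assignment shown above can be reproduced with a few lines of arithmetic. This sketch splits contiguous partition ranges across consumers, in the spirit of Kafka's range assignment for a single topic; it is not the client library's assignor, and RangeAssignment and assign are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeAssignment {
    /** Splits partitions 0..partitions-1 into contiguous ranges, one per consumer. */
    static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int base = partitions / consumers.size();   // every consumer gets at least this many
        int extra = partitions % consumers.size();  // the first 'extra' consumers get one more
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.put(consumers.get(i), owned);    // may be empty: an idle consumer
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("consumer-1", "consumer-2", "consumer-3"), 6));
        // {consumer-1=[0, 1], consumer-2=[2, 3], consumer-3=[4, 5]}
        System.out.println(assign(List.of("c1", "c2", "c3", "c4"), 2));
        // {c1=[0], c2=[1], c3=[], c4=[]}
    }
}
```

The second call shows the idle-consumer case: with four consumers and two partitions, two consumers receive nothing.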
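Rebalancing: when a consumer joins or leaves the group, partitions are reassigned. With the classic eager protocol, every consumer in the group pauses consumption for the duration of the rebalance; cooperative rebalancing (for example, CooperativeStickyAssignor) pauses only the partitions that actually move.
Exactly-Once Semantics (EOS)
Kafka offers three levels of delivery guarantee.
- At-Most-Once: messages may be lost but are never duplicated
- At-Least-Once: messages may be duplicated but are never lost
- Exactly-Once: each message is processed exactly once
To achieve Exactly-Once, combine a transactional producer with consumers that read only committed data (isolation.level=read_committed). The guarantee applies to consume-process-produce flows inside Kafka; side effects in external systems still need their own idempotency.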
// Transactional producer configuration
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-producer-1");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Example of using a transaction
@Service
@RequiredArgsConstructor
public class OrderProcessor {
private final KafkaTemplate<String, Object> kafkaTemplate;
public void processOrder(Order order) {
kafkaTemplate.executeInTransaction(ops -> {
// Everything in this block is part of one transaction
ops.send("order-confirmed", order.getId(), new OrderConfirmedEvent(order));
ops.send("inventory-update", order.getProductId(), new InventoryUpdateEvent(order));
ops.send("notification", order.getUserId(), new NotificationEvent(order));
return true;
});
// Either all three messages are committed or none are
}
}
Kafka Streams Overview
Kafka Streams is a stream processing library built on top of Kafka.
// KStream: an event stream (each record is independent)
// KTable: a changelog (only the latest value per key is kept)
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean
public KStream<String, OrderEvent> orderStream(StreamsBuilder builder) {
KStream<String, OrderEvent> stream = builder.stream("order-events");
// 1. Filter + transform
stream
.filter((key, event) -> event.getAmount() > 10000) // orders above 10,000 KRW
.mapValues(event -> new HighValueOrderEvent(event))
.to("high-value-orders");
// 2. Windowed aggregation: total order amount per 5-minute window
stream
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.aggregate(
() -> 0L,
(key, event, total) -> total + event.getAmount(),
Materialized.as("order-amount-store")
)
.toStream()
.to("order-amount-per-5min");
// 3. KStream-KTable Join
KTable<String, UserProfile> userTable = builder.table("user-profiles");
stream
.join(userTable,
(order, user) -> new EnrichedOrder(order, user))
.to("enriched-orders");
return stream;
}
}
Kafka Connect and Debezium CDC
Kafka Connect is a framework for moving data reliably between external systems and Kafka.
{
"name": "mysql-source-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql-server",
"database.port": "3306",
"database.user": "debezium",
"database.password": "password",
"database.server.id": "1",
"topic.prefix": "myapp",
"database.include.list": "orders_db",
"table.include.list": "orders_db.orders,orders_db.order_items",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes"
}
}
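Debezium is a CDC (Change Data Capture) connector that streams database changes to Kafka topics in real time. INSERT, UPDATE, and DELETE events are all captured.
Schema Registry + Avro
A schema registry manages the schemas of Kafka messages centrally so that producers and consumers stay compatible.
// user-event.avsc (Avro schema)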
{
"type": "record",
"name": "UserEvent",
"namespace": "com.example.events",
"fields": [
{"name": "userId", "type": "string"},
{"name": "action", "type": {"type": "enum", "name": "Action",
"symbols": ["CREATED", "UPDATED", "DELETED"]}},
{"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{"name": "email", "type": ["null", "string"], "default": null}
]
}
Performance Tuning Guide
┌──────────────────────────────────────────────────────────────┐
│ Producer tuning                                              │
├──────────────────┬───────────────────────────────────────────┤
│ batch.size       │ 16KB (default) → 32-64KB (throughput)     │
│ linger.ms        │ 0 (default) → 10-50ms (batching)          │
│ compression.type │ none → lz4 (speed), zstd (ratio)          │
│ buffer.memory    │ 32MB (default) → 64-128MB (bulk sends)    │
│ max.in.flight    │ 5 (default) → 1 for order w/o idempotence │
├──────────────────┴───────────────────────────────────────────┤
│ Consumer tuning                                              │
├──────────────────┬───────────────────────────────────────────┤
│ fetch.min.bytes  │ 1 (default) → 1024-10240 (batched fetch)  │
│ fetch.max.wait   │ 500ms (default) → latency vs throughput   │
│ max.poll.records │ 500 (default) → adjust per workload       │
│ session.timeout  │ 45s (default) → rebalance sensitivity     │
├──────────────────┴───────────────────────────────────────────┤
│ Broker tuning                                                │
├──────────────────┬───────────────────────────────────────────┤
│ num.partitions   │ default per topic (target / per-partition)│
│ replication      │ 3 (recommended for production)            │
│ min.insync       │ 2 (use together with acks=all)            │
│ log.retention    │ 7 days (default) → per business needs     │
└──────────────────┴───────────────────────────────────────────┘
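Operations: A Formula for Choosing the Partition Count
This is a rule-of-thumb formula for deciding the number of partitions.
Target throughput = T (messages/sec)
Per-partition producer throughput = P_producer (messages/sec)
Per-partition consumer throughput = P_consumer (messages/sec)
Required partitions = max(T/P_producer, T/P_consumer)
Example:
- Target: 100,000 msg/sec
- Producer, single partition: 50,000 msg/sec
- Consumer, single partition: 10,000 msg/sec
- Required partitions: max(100K/50K, 100K/10K) = max(2, 10) = 10
Practical recommendations (per topic):
- Small: 6-12 partitions
- Medium: 12-30 partitions
- Large: 30-100 partitions
- Very large: 100+ (KRaft mode is designed to support more than a million partitions across a cluster)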
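The sizing formula above is easy to turn into a helper. This is a minimal sketch; PartitionSizing and requiredPartitions are hypothetical names, and the result is rounded up because a fractional partition is not possible.

```java
public class PartitionSizing {
    /**
     * Returns max(T / P_producer, T / P_consumer), rounded up to a whole partition.
     * All throughputs are in messages per second.
     */
    static int requiredPartitions(double target,
                                  double perPartitionProducer,
                                  double perPartitionConsumer) {
        double byProducer = target / perPartitionProducer;
        double byConsumer = target / perPartitionConsumer;
        return (int) Math.ceil(Math.max(byProducer, byConsumer));
    }

    public static void main(String[] args) {
        // The worked example from the text: max(2, 10) = 10
        System.out.println(requiredPartitions(100_000, 50_000, 10_000)); // 10
    }
}
```

The consumer side is usually the binding constraint, since per-record processing tends to be slower than appending to the log. Measure both per-partition rates on your own workload before fixing the count, because partitions can be added later but not removed.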
4. From ZooKeeper to KRaft: A Historic Transition
What Is ZooKeeper?
Apache ZooKeeper is a service that handles coordination for distributed systems. It reaches consensus using the ZAB (ZooKeeper Atomic Broadcast) protocol.
┌────────────────────────────────────────────┐
│ ZooKeeper Ensemble │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ ZK Node │ │ ZK Node │ │ ZK Node │ │
│ │ (Leader) │ │(Follower)│ │(Follower)│ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ Stored data: │
│ /brokers/ids/[0,1,2] - broker list │
│ /brokers/topics/my-topic - topic metadata │
│ /controller - controller election │
│ /admin/delete_topics - topic deletion requests │
│ /config - dynamic configuration │
└────────────────────────────────────────────┘
▲ ▲ ▲
│ │ │
Kafka Broker1 Broker2 Broker3
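Kafka depended on ZooKeeper for the following reasons.
- Controller election: one broker in the cluster takes the controller role
- Topic/partition metadata: stores topic configuration and partition assignment
- Broker registration: tracks the list of live brokers (ephemeral nodes)
- ACL management: stores access control lists
- Consumer group management: consumer offsets were once stored in ZK as well (they now live in an internal topic)
Limitations of ZooKeeper
As Kafka deployments grew, ZooKeeper exposed several problems.
- Operational complexity: a ZooKeeper cluster must be run separately from Kafka (its own monitoring, patching, and backups)
- Scalability bottleneck: metadata lives in ZK znodes, so performance degrades beyond a few hundred thousand partitions
- Slow controller failover: when the controller dies, the new controller must load the entire metadata from ZK, which can take minutes
- Potential single point of failure: if the ZK cluster is overloaded or fails, all of Kafka is affected
- Double infrastructure cost: the burden of running 3 ZK nodes plus N Kafka nodes
KRaft Mode (KIP-500)
KRaft (Kafka Raft) is an architecture that removes ZooKeeper entirely and builds a Raft-based consensus protocol into Kafka itself.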
┌────────────────────────────────────────────────────┐
│ Kafka Cluster (KRaft Mode) │
│ │
│ ┌───────────────────────────────────────────┐ │
│ │ Controller Quorum │ │
│ │ │ │
│ │ Broker-0 Broker-1 Broker-2 │ │
│ │ (Controller (Controller (Controller │ │
│ │ + Broker) + Broker) + Broker) │ │
│ │ [Active] [Follower] [Follower] │ │
│ └───────────────────────────────────────────┘ │
│ │
│ ┌───────────────────────────────────────────┐ │
│ │ Data Brokers │ │
│ │ Broker-3 Broker-4 Broker-5 │ │
│ │ (Broker only) (Broker only) (Broker only)│ │
│ └───────────────────────────────────────────┘ │
│ │
│ Metadata Log: __cluster_metadata (internal topic) │
│ Consensus protocol: Raft (Kafka's own implementation) │
└────────────────────────────────────────────────────┘
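Advantages of KRaft
| Aspect | ZooKeeper mode | KRaft mode |
|---|---|---|
| External dependency | Requires a ZK cluster | None (self-contained) |
| Operational complexity | High (two systems) | Low (one system) |
| Maximum partitions | About 200,000 | Over 1 million |
| Controller failover | Minutes | Seconds |
| Metadata propagation | ZK Watch + RPC | Raft log replication |
| Deployment complexity | ZK installed and configured separately | Install Kafka only |
Migration Guide: From ZK to KRaft
KRaft has been production-ready for new clusters since Kafka 3.3, the ZooKeeper-to-KRaft migration path became production-ready in Kafka 3.6, and Kafka 4.0 removes ZooKeeper entirely.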
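# Step 1: Look up the existing cluster ID (stored in ZooKeeper)
bin/zookeeper-shell.sh localhost:2181 get /cluster/id
# Step 2: Provision a dedicated KRaft controller quorum (combined broker,controller mode is not supported for migration)
# controller.properties (host names and node IDs below are placeholders):
# process.roles=controller
# node.id=3000
# controller.quorum.voters=3000@controller1:9093
# controller.listener.names=CONTROLLER
# listeners=CONTROLLER://:9093
# zookeeper.metadata.migration.enable=true
# zookeeper.connect=localhost:2181
bin/kafka-storage.sh format \
--config config/kraft/controller.properties \
--cluster-id YOUR_CLUSTER_ID
# Step 3: Enable migration on each ZK-mode broker (server.properties), then do a rolling restart
# zookeeper.metadata.migration.enable=true
# controller.quorum.voters=3000@controller1:9093
# controller.listener.names=CONTROLLER
# Step 4: Once metadata has been copied, switch brokers to KRaft mode one at a time
# zookeeper.connect=localhost:2181 ← remove
# zookeeper.metadata.migration.enable=true ← remove
# process.roles=broker
# node.id=1 (reuse the previous broker.id)
# Step 5: Finalize by removing the migration and ZooKeeper settings from the controllers and restarting them
# Step 6: Check the quorum status
bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status
Migration caveats:
- Run the migration on Kafka 3.6 or later (3.x only; Kafka 4.0 no longer includes ZooKeeper mode)
- Take a full backup before migrating
- Validate in a staging environment first
- Proceed with rolling restarts to minimize downtime
- Avoid creating or deleting topics during the migration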
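5. Streaming API Comparison: SSE vs WebSocket vs gRPC Streaming
Comparing the Three Streaming APIs
| Aspect | SSE | WebSocket | gRPC Streaming |
|---|---|---|---|
| Direction | Server → client (one-way) | Bidirectional (full duplex) | Bidirectional (full duplex) |
| Protocol | HTTP (1.1 or 2) | WS (HTTP Upgrade) | HTTP/2 |
| Data format | Text (UTF-8) | Text / Binary | Protobuf (binary) |
| Automatic reconnect | Built in (EventSource) | Must be implemented by hand | Must be implemented by hand |
| Browser support | All modern browsers | All modern browsers | Requires grpc-web (unary and server streaming only) |
| Multiplexing | Only over HTTP/2 (over HTTP/1.1 each stream takes one of the browser's roughly 6 connections per origin) | No | Yes (HTTP/2) |
| Proxies/firewalls | Friendly (plain HTTP) | Some restrictions | Some restrictions |
| Serialization overhead | Low | Low to medium | Very low |
| Good fit | Notifications, feeds, quotes | Chat, games, collaboration | Service-to-service (MSA) communication |
SSE (Server-Sent Events) Implementation
SSE with WebFlux: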
@RestController
@RequestMapping("/api/stream")
@RequiredArgsConstructor
@Slf4j
public class SseController {
    private final StockPriceService stockPriceService;
    // The original snippet uses these two services without declaring them; the type names are assumed
    private final NotificationService notificationService;
    private final EventService eventService;
    // Approach 1: Flux + ServerSentEvent wrapper
    @GetMapping(value = "/stocks", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<StockPrice>> streamStockPrices(
            @RequestParam List<String> symbols) {
        return stockPriceService.getPriceStream(symbols)
            .map(price -> ServerSentEvent.<StockPrice>builder()
                .id(String.valueOf(price.getTimestamp()))
                .event("price-update")
                .data(price)
                .retry(Duration.ofSeconds(5))
                .build());
    }
    // Approach 2: a plain Flux stream
    @GetMapping(value = "/notifications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<NotificationEvent> streamNotifications(
            @AuthenticationPrincipal UserDetails user) {
        return notificationService.getNotificationStream(user.getUsername())
            .doOnCancel(() -> log.info("Client disconnected: {}", user.getUsername()));
    }
    // Approach 3: with a heartbeat (keeps the connection alive)
    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<Object>> streamWithHeartbeat() {
        Flux<ServerSentEvent<Object>> dataStream = eventService.getEventStream()
            .map(event -> ServerSentEvent.builder()
                .event("data")
                .data((Object) event)
                .build());
        Flux<ServerSentEvent<Object>> heartbeat = Flux.interval(Duration.ofSeconds(30))
            .map(tick -> ServerSentEvent.builder()
                .event("heartbeat")
                .data((Object) "ping")
                .build());
        return Flux.merge(dataStream, heartbeat);
    }
}
Client (JavaScript):
const eventSource = new EventSource('/api/stream/stocks?symbols=AAPL,GOOGL,MSFT')
eventSource.addEventListener('price-update', (event) => {
const price = JSON.parse(event.data)
updateDashboard(price)
})
eventSource.addEventListener('heartbeat', () => {
console.log('Connection alive')
})
eventSource.onerror = (error) => {
console.error('SSE error:', error)
// EventSource attempts to reconnect automatically
}
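It helps to see what actually travels over the wire in the examples above. Per the Server-Sent Events format, each event is a block of `field: value` lines (`id`, `event`, `retry`, `data`) terminated by a blank line, and multi-line payloads are sent as several `data:` lines. The sketch below builds such a frame by hand; SseFrame and format are hypothetical names, and in WebFlux the ServerSentEvent encoder does this for you.

```java
public class SseFrame {
    /**
     * Serializes one SSE event. id, event, and retryMillis are optional (null to omit);
     * data is split on newlines so each line gets its own "data:" field.
     */
    static String format(String id, String event, Long retryMillis, String data) {
        StringBuilder sb = new StringBuilder();
        if (id != null) sb.append("id: ").append(id).append('\n');
        if (event != null) sb.append("event: ").append(event).append('\n');
        if (retryMillis != null) sb.append("retry: ").append(retryMillis).append('\n');
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        return sb.append('\n').toString(); // the blank line dispatches the event
    }

    public static void main(String[] args) {
        System.out.print(format("1700000000000", "price-update", 5000L,
            "{\"symbol\":\"AAPL\",\"price\":189.5}"));
    }
}
```

On the client, the `event` field selects which addEventListener handler fires, `id` becomes the Last-Event-ID header sent on reconnect, and `retry` sets the reconnection delay in milliseconds.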
WebSocket Implementation
WebSocket with WebFlux:
@Configuration
public class WebSocketConfig {
@Bean
public HandlerMapping webSocketHandlerMapping(ChatWebSocketHandler handler) {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/ws/chat", handler);
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(map);
mapping.setOrder(-1);
return mapping;
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
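@Component
@Slf4j
public class ChatWebSocketHandler implements WebSocketHandler {
    private static final ObjectMapper MAPPER = new ObjectMapper(); // thread-safe; reuse one instance
    // autoCancel=false keeps the sink alive after the last subscriber disconnects
    private final Sinks.Many<ChatMessage> chatSink =
        Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Inbound: client → server
        Mono<Void> input = session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .map(this::parseChatMessage)
            .doOnNext(msg -> {
                log.info("Received: {}", msg);
                // Broadcast to all subscribers; tryEmitNext can fail (for example under concurrent emission)
                Sinks.EmitResult result = chatSink.tryEmitNext(msg);
                if (result.isFailure()) {
                    log.warn("Message not emitted: {}", result);
                }
            })
            .then();
        // Outbound: server → client
        Mono<Void> output = session.send(
            chatSink.asFlux()
                .map(msg -> session.textMessage(toJson(msg)))
        );
        // Handle both directions concurrently
        return Mono.zip(input, output).then();
    }
    private ChatMessage parseChatMessage(String payload) {
        try { return MAPPER.readValue(payload, ChatMessage.class); }
        catch (JsonProcessingException e) { throw new IllegalArgumentException("Invalid chat payload", e); }
    }
    private String toJson(ChatMessage msg) {
        try { return MAPPER.writeValueAsString(msg); }
        catch (JsonProcessingException e) { throw new IllegalStateException("Cannot serialize message", e); }
    }
}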
gRPC Streaming Implementation
Proto file definition:
syntax = "proto3";
package stockservice;
service StockService {
// Server Streaming: the server sends a continuous stream
rpc StreamPrices(StockRequest) returns (stream StockPrice);
// Client Streaming: the client sends a continuous stream
rpc SendOrders(stream OrderRequest) returns (OrderSummary);
// Bidirectional Streaming: both sides stream
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
message StockRequest {
repeated string symbols = 1;
}
message StockPrice {
string symbol = 1;
double price = 2;
int64 timestamp = 3;
}
message OrderRequest {
string symbol = 1;
int32 quantity = 2;
double price = 3;
}
message OrderSummary {
int32 total_orders = 1;
double total_amount = 2;
}
message ChatMessage {
string user = 1;
string content = 2;
int64 timestamp = 3;
}
gRPC Server Streaming implementation:
@GrpcService
public class StockGrpcService extends StockServiceGrpc.StockServiceImplBase {
private final StockPriceService priceService;
@Override
public void streamPrices(StockRequest request,
StreamObserver<StockPrice> responseObserver) {
// Start the subscription
Disposable subscription = priceService
.getPriceStream(request.getSymbolsList())
.subscribe(
price -> {
// Send each price update to the client
responseObserver.onNext(StockPrice.newBuilder()
.setSymbol(price.getSymbol())
.setPrice(price.getPrice())
.setTimestamp(price.getTimestamp())
.build());
},
error -> {
responseObserver.onError(Status.INTERNAL
.withDescription(error.getMessage())
.asRuntimeException());
},
() -> {
responseObserver.onCompleted();
}
);
// Cancel the subscription when the client disconnects
Context.current().addListener(
context -> subscription.dispose(),
Runnable::run
);
}
}
Streaming API Selection Guide
┌─────────────┐
│ Two-way │
│ traffic? │
└──────┬──────┘
Yes / \ No
/ \
┌────────▼──┐ ┌──▼──────────┐
│ Direct from │ │ SSE │
│ a browser? │ │ (simplest) │
└─────┬─────┘ └─────────────┘
Yes / \ No
/ \
┌─────────▼─┐ ┌─▼──────────────┐
│ WebSocket │ │ gRPC Streaming │
│ (browser) │ │ (MSA/backend) │
└───────────┘ └────────────────┘
6. WebFlux + Kafka Integrated Architecture
Reactor Kafka Configuration
Reactor Kafka is a library that lets you use Kafka reactively.
// build.gradle
// implementation 'io.projectreactor.kafka:reactor-kafka:1.3.23'
@Configuration
public class ReactorKafkaConfig {
@Bean
public ReactiveKafkaProducerTemplate<String, Object> reactiveProducer() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
SenderOptions<String, Object> senderOptions = SenderOptions.create(props);
return new ReactiveKafkaProducerTemplate<>(senderOptions);
}
@Bean
public ReactiveKafkaConsumerTemplate<String, StockPrice> reactiveConsumer() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "stock-price-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
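props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// Note: JsonDeserializer also needs to know the target type and which packages to trust
// (see JsonDeserializer.VALUE_DEFAULT_TYPE and JsonDeserializer.TRUSTED_PACKAGES).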
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
ReceiverOptions<String, StockPrice> receiverOptions =
ReceiverOptions.<String, StockPrice>create(props)
.subscription(Collections.singleton("stock-prices"));
return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
}
}
Real-Time Data Pipeline: From Kafka to the Browser
┌──────────┐ ┌─────────┐ ┌───────────┐ ┌──────────┐ ┌──────────┐
│ External │ │ Kafka │ │ WebFlux │ │ SSE │ │ React │
│ API │───►│ Producer│───►│ Consumer │───►│ Endpoint │───►│ Frontend │
│ │ │ │ │ (Reactor) │ │ │ │ │
└──────────┘ └─────────┘ └───────────┘ └──────────┘ └──────────┘
Quote source    Topic publish    Reactive consume    HTTP stream    Live rendering
// 1. Kafka producer: push quote data from an external API into Kafka
@Service
@RequiredArgsConstructor
@Slf4j
public class StockPriceProducer {
private final ReactiveKafkaProducerTemplate<String, Object> producer;
private final WebClient marketDataClient;
@Scheduled(fixedRate = 1000) // every second
public void fetchAndPublish() {
marketDataClient.get()
.uri("/v1/quotes?symbols=AAPL,GOOGL,MSFT")
.retrieve()
.bodyToFlux(StockPrice.class)
.flatMap(price -> producer.send("stock-prices", price.getSymbol(), price))
.doOnNext(result -> log.debug("Sent: topic={}, partition={}, offset={}",
result.recordMetadata().topic(),
result.recordMetadata().partition(),
result.recordMetadata().offset()))
.subscribe();
}
}
// 2. Kafka consumer + SSE: from Kafka to the browser
@RestController
@RequestMapping("/api/stream")
@Slf4j
public class StockStreamController {
// A KafkaReceiver supports one active subscriber, and extra consumers in the same group would
// split partitions rather than broadcast, so subscribe once and share the stream with all clients.
private final Flux<StockPrice> prices;
public StockStreamController(ReactiveKafkaConsumerTemplate<String, StockPrice> consumer) {
this.prices = consumer.receiveAutoAck()
.map(ConsumerRecord::value)
.share();
}
// Bridge the Kafka topic to SSE
@GetMapping(value = "/stock-prices", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<StockPrice>> streamStockPrices(
@RequestParam(required = false) List<String> symbols) {
return prices
.filter(price -> symbols == null || symbols.contains(price.getSymbol()))
.map(price -> ServerSentEvent.<StockPrice>builder()
.id(price.getSymbol() + "-" + price.getTimestamp())
.event("price-update")
.data(price)
.build())
.doOnCancel(() -> log.info("Client disconnected from stock stream"));
}
}
CQRS + Event Sourcing with Kafka
┌──────────────────────────────────────────────────────────┐
│ CQRS Pattern │
│ │
│ Command Side │ Query Side │
│ │ │
│ ┌──────────┐ │ ┌───────────┐ │
│ │ WebFlux │ Command │ │ WebFlux │ Query │
│ │Controller│────────┐ │ │Controller │◄─────┐ │
│ └──────────┘ │ │ └───────────┘ │ │
│ ▼ │ │ │
│ ┌──────────────────────┐ │ ┌─────────────────────┐ │
│ │ Kafka Topic │ │ │ Read Database │ │
│ │ (Event Store) │──┼─►│ (Materialized │ │
│ │ order-events │ │ │ View / Redis) │ │
│ └──────────────────────┘ │ └─────────────────────┘ │
│ │ │
└──────────────────────────────────────────────────────────┘
// Command: create an order, then publish the event to Kafka
@Service
@RequiredArgsConstructor
public class OrderCommandService {
private final ReactiveKafkaProducerTemplate<String, Object> producer;
public Mono<String> createOrder(CreateOrderCommand command) {
String orderId = UUID.randomUUID().toString();
OrderCreatedEvent event = new OrderCreatedEvent(
orderId, command.getUserId(), command.getItems(),
command.getTotalAmount(), Instant.now()
);
return producer.send("order-events", orderId, event)
.map(result -> orderId);
}
}
// Query: consume Kafka events and update the read model
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderProjectionService {
private final ReactiveKafkaConsumerTemplate<String, OrderEvent> consumer;
private final ReactiveRedisTemplate<String, OrderView> redisTemplate;
@PostConstruct
public void startProjection() {
consumer.receiveAutoAck()
.map(ConsumerRecord::value)
.concatMap(this::updateProjection) // concatMap applies events in the order they were received
.doOnError(e -> log.error("Projection error", e))
.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))) // resubscribe with backoff instead of a tight retry loop
.subscribe();
}
private Mono<Boolean> updateProjection(OrderEvent event) {
if (event instanceof OrderCreatedEvent created) {
OrderView view = OrderView.from(created);
return redisTemplate.opsForValue()
.set("order:" + created.getOrderId(), view);
}
return Mono.just(true);
}
}
// Query controller: read from Redis
@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
public class OrderQueryController {
private final ReactiveRedisTemplate<String, OrderView> redisTemplate;
@GetMapping("/{orderId}")
public Mono<OrderView> getOrder(@PathVariable String orderId) {
return redisTemplate.opsForValue()
.get("order:" + orderId)
.switchIfEmpty(Mono.error(new OrderNotFoundException(orderId)));
}
}
7. Real-World Project: Real-Time Stock Quote System
Overall Architecture
┌────────────────────────────────────────────────────────────────────┐
│ Real-Time Stock Price System │
│ │
│ ┌──────────┐ ┌──────────┐ ┌────────────┐ ┌──────────────┐ │
│ │ Market │ │ Kafka │ │ WebFlux │ │ React │ │
│ │ Data API │──►│ Producer │──►│ Consumer │──►│ Frontend │ │
│ │ (Yahoo/ │ │ │ │ + SSE │ │ (Dashboard) │ │
│ │ Alpha) │ │ │ │ │ │ │ │
│ └──────────┘ └──────────┘ └────────────┘ └──────────────┘ │
│ │ │ │ │ │
│ │ ┌────▼────┐ ┌─────▼─────┐ ┌──────▼──────┐ │
│ │ │ Kafka │ │ Redis │ │ Prometheus │ │
│ │ │ Topic │ │ Cache │ │ + Grafana │ │
│ │ │ (3 rep) │ │ │ │ │ │
│ │ └─────────┘ └───────────┘ └─────────────┘ │
│ │ │
│ ┌────▼──────────────────────────────────────┐ │
│ │ Kafka Streams: aggregation/analytics │ │
│ │ - 5-minute average │ │
│ │ - Daily high/low │ │
│ │ - Volume anomaly detection │ │
│ └───────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────────┘
Key Code Snippets
1) Market Data Fetcher:
@Component
@RequiredArgsConstructor
@Slf4j
public class MarketDataFetcher {
private final WebClient marketClient;
private final ReactiveKafkaProducerTemplate<String, Object> kafkaProducer;
@Scheduled(fixedRate = 1000)
public void fetchPrices() {
List<String> symbols = List.of("AAPL", "GOOGL", "MSFT", "AMZN", "TSLA");
Flux.fromIterable(symbols)
.flatMap(symbol -> marketClient.get()
.uri("/quote/{symbol}", symbol)
.retrieve()
.bodyToMono(MarketQuote.class)
.map(quote -> new StockPrice(
symbol, quote.getPrice(), quote.getVolume(),
quote.getChange(), Instant.now().toEpochMilli()))
.onErrorResume(e -> {
log.warn("Failed to fetch {}: {}", symbol, e.getMessage());
return Mono.empty();
}))
.flatMap(price -> kafkaProducer
.send("stock-prices", price.getSymbol(), price)
.doOnSuccess(r -> log.debug("Published: {} = {}",
price.getSymbol(), price.getPrice())))
.subscribe();
}
}
2) Kafka Streams aggregation:
@Configuration
@EnableKafkaStreams
public class StockAnalyticsConfig {
@Bean
public KStream<String, StockPrice> stockAnalytics(StreamsBuilder builder) {
KStream<String, StockPrice> priceStream = builder.stream("stock-prices");
// 5-minute average per tumbling window (for a true moving average, use hopping windows via advanceBy)
priceStream
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.aggregate(
PriceAggregation::new,
(symbol, price, agg) -> agg.add(price.getPrice()),
Materialized.<String, PriceAggregation, WindowStore<Bytes, byte[]>>
as("price-5min-avg")
.withValueSerde(new PriceAggregationSerde()))
.toStream()
.map((windowed, agg) -> KeyValue.pair(
windowed.key(),
new PriceAverage(windowed.key(), agg.getAverage(),
agg.getMin(), agg.getMax(), agg.getCount())))
.to("stock-price-analytics");
return priceStream;
}
}
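// Aggregation class
public class PriceAggregation {
private double sum = 0;
private double min = Double.POSITIVE_INFINITY;
// Double.MIN_VALUE is the smallest positive double, not the most negative one, so it is the wrong seed for max
private double max = Double.NEGATIVE_INFINITY;
private long count = 0;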
public PriceAggregation add(double price) {
sum += price;
min = Math.min(min, price);
max = Math.max(max, price);
count++;
return this;
}
public double getAverage() {
return count > 0 ? sum / count : 0;
}
// getters ...
}
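To make the windowing in step 2 concrete, here is a minimal plain-Java sketch of how a record's timestamp maps to a fixed 5-minute bucket and how a per-bucket average accumulates. It assumes epoch-aligned tumbling windows (window start = timestamp rounded down to a multiple of the window size); `TumblingWindowAverage` is a hypothetical class for illustration, not Kafka Streams code.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for a tumbling-window average. */
final class TumblingWindowAverage {
    private final long windowSizeMs;
    // window start (epoch ms) -> {sum, count}
    private final Map<Long, double[]> windows = new TreeMap<>();

    TumblingWindowAverage(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Start of the epoch-aligned window containing the (non-negative) timestamp. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** Adds one price to the bucket its timestamp falls into. */
    void add(long timestampMs, double price) {
        double[] acc = windows.computeIfAbsent(
                windowStart(timestampMs, windowSizeMs), k -> new double[2]);
        acc[0] += price; // running sum
        acc[1] += 1;     // running count
    }

    /** Average for the window starting at windowStartMs, or NaN if it has no data. */
    double average(long windowStartMs) {
        double[] acc = windows.get(windowStartMs);
        return acc == null ? Double.NaN : acc[0] / acc[1];
    }
}
```

With a 300,000 ms window, prices at t=0 and t=299,999 land in the same bucket, while a price at t=300,000 opens the next one. Each record belongs to exactly one bucket, which is why this is a windowed average rather than a sliding moving average.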
3) SSE endpoint:
@RestController
@RequestMapping("/api/stocks")
@RequiredArgsConstructor
public class StockStreamController {
private final ReactiveKafkaConsumerTemplate<String, StockPrice> priceConsumer;
private final ReactiveKafkaConsumerTemplate<String, PriceAverage> analyticsConsumer;
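// Live quote stream.
// Note: calling receiveAutoAck() once per request opens a Kafka subscription per client and does not
// broadcast; in production, subscribe once and fan out a shared Flux (for example via share()).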
@GetMapping(value = "/live", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<StockPrice>> liveStream(
@RequestParam List<String> symbols) {
return priceConsumer.receiveAutoAck()
.map(ConsumerRecord::value)
.filter(p -> symbols.contains(p.getSymbol()))
.map(p -> ServerSentEvent.<StockPrice>builder()
.event("price")
.data(p)
.build());
}
// Analytics data stream
@GetMapping(value = "/analytics", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<PriceAverage>> analyticsStream() {
return analyticsConsumer.receiveAutoAck()
.map(ConsumerRecord::value)
.map(avg -> ServerSentEvent.<PriceAverage>builder()
.event("analytics")
.data(avg)
.build());
}
}
4) React Frontend (TypeScript):
// hooks/useStockStream.ts
import { useState, useEffect } from 'react';
interface StockPrice {
symbol: string;
price: number;
volume: number;
change: number;
timestamp: number;
}
export function useStockStream(symbols: string[]) {
const [prices, setPrices] = useState<Map<string, StockPrice>>(new Map());
useEffect(() => {
const params = symbols.map(s => `symbols=${encodeURIComponent(s)}`).join('&');
const eventSource = new EventSource(`/api/stocks/live?${params}`);
eventSource.addEventListener('price', (event: MessageEvent) => {
const price: StockPrice = JSON.parse(event.data);
setPrices(prev => new Map(prev).set(price.symbol, price));
});
eventSource.onerror = () => {
console.error('SSE connection error, will auto-reconnect...');
};
return () => eventSource.close();
}, [symbols.join(',')]);
return prices;
}
// components/StockDashboard.tsx
function StockDashboard() {
const prices = useStockStream(['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']);
return (
<div className="grid grid-cols-5 gap-4">
{Array.from(prices.entries()).map(([symbol, price]) => (
<div key={symbol} className="p-4 rounded-lg bg-gray-800">
<h3 className="text-lg font-bold">{symbol}</h3>
<p className="text-2xl">${price.price.toFixed(2)}</p>
<p className={price.change >= 0 ? 'text-green-400' : 'text-red-400'}>
{price.change >= 0 ? '+' : ''}{price.change.toFixed(2)}%
</p>
</div>
))}
</div>
);
}
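Scalability: Partition-Based Horizontal Scaling
Before scaling (1 instance):
WebFlux Instance-1 ← Kafka Consumer (P0, P1, P2, P3, P4, P5)
After scaling (3 instances):
WebFlux Instance-1 ← Kafka Consumer (P0, P1) ← SSE clients 1~1000
WebFlux Instance-2 ← Kafka Consumer (P2, P3) ← SSE clients 1001~2000
WebFlux Instance-3 ← Kafka Consumer (P4, P5) ← SSE clients 2001~3000
Key point: number of Kafka partitions = maximum number of parallel consumers in a group.
Using the symbol as the partition key keeps every record for a symbol in one partition, so per-symbol order is preserved.
Note: within a single consumer group each instance sees only its assigned partitions, so SSE clients must be routed to the instance that owns their symbols, or each instance needs its own group ID to receive every record.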
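The two rules above (same key, same partition; partitions cap consumer parallelism) can be sketched in a few lines. This is a simplified illustration: Kafka's default partitioner hashes the serialized key bytes with murmur2, whereas this sketch substitutes `String.hashCode()` to stay dependency-free, so the actual partition numbers will differ from a real cluster. `PartitionSketch` is a hypothetical name.

```java
/** Simplified illustration of key-based partitioning (not Kafka's real hash function). */
final class PartitionSketch {

    /** Deterministically maps a key to a partition in [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Consumers in one group that can actually receive data: extra instances sit idle. */
    static int activeConsumers(int instances, int numPartitions) {
        return Math.min(instances, numPartitions);
    }
}
```

Because the mapping depends only on the key and the partition count, every `AAPL` record lands in the same partition. Changing the partition count later changes the mapping for new records, which is one reason to size partitions with some headroom up front.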
Monitoring Setup
# docker-compose-monitoring.yml
services:
prometheus:
image: prom/prometheus:v2.51.0
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
ports:
- '9090:9090'
grafana:
image: grafana/grafana:10.4.0
ports:
- '3000:3000'
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
kafka-exporter:
image: danielqsj/kafka-exporter:latest
command: ['--kafka.server=kafka:9092']
ports:
- '9308:9308'
// Add Micrometer metrics
@Component
@RequiredArgsConstructor
public class StockMetrics {
private final MeterRegistry registry;
private final AtomicLong connectedClients = new AtomicLong(0);
@PostConstruct
void init() {
Gauge.builder("stock.stream.connected_clients", connectedClients, AtomicLong::get)
.description("Number of connected SSE clients")
.register(registry);
}
public void recordPricePublished(String symbol) {
registry.counter("stock.price.published",
"symbol", symbol).increment();
}
public void recordLatency(String symbol, long latencyMs) {
registry.timer("stock.price.latency",
"symbol", symbol)
.record(Duration.ofMillis(latencyMs));
}
public void clientConnected() { connectedClients.incrementAndGet(); }
public void clientDisconnected() { connectedClients.decrementAndGet(); }
}
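8. When to Choose What?
Technology Decision Matrix
| Scenario | Recommended technology | Why |
|---|---|---|
| Simple CRUD API | Spring MVC | Gentle learning curve, rich ecosystem |
| High-concurrency API | WebFlux | Tens of thousands of concurrent connections with few resources |
| Event streaming/logs | Kafka | High throughput, durability, replayable |
| Real-time notifications/feeds | SSE | One-way, automatic reconnection, plain HTTP |
| Real-time chat/games | WebSocket | Bidirectional, low latency |
| Real-time communication between microservices | gRPC Streaming | Bidirectional, Protobuf, high performance |
| CDC (change data capture) | Kafka Connect + Debezium | Captures database changes in real time |
| Complex stream processing | Kafka Streams | Stateful processing, windowed aggregation |
| CQRS pattern | Kafka + WebFlux | Event sourcing plus reactive queries |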
Combination Patterns
In practice you combine several technologies rather than relying on a single one.
Pattern 1: Real-time dashboard
Data Source → Kafka → WebFlux Consumer → SSE → Browser
Pattern 2: Real-time chat + message storage
Browser ↔ WebSocket ↔ WebFlux → Kafka (storage) → Elasticsearch (search)
Pattern 3: Event-driven microservices
Service A → Kafka → Service B (WebFlux Consumer)
→ Service C (WebFlux Consumer)
→ Analytics (Kafka Streams)
Pattern 4: IoT data processing
Sensors → MQTT → Kafka Connect → Kafka → Kafka Streams (aggregation)
→ WebFlux → SSE → Dashboard
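Anti-Patterns (Things to Avoid)
- Using JDBC in WebFlux: it blocks the event loop and performance drops sharply. Use R2DBC, or run the blocking call on Schedulers.boundedElastic().
- Making every API WebFlux: simple CRUD is a better fit for MVC, and WebFlux only adds complexity there.
- Using Kafka as a plain message queue: RabbitMQ or SQS is sometimes a better fit. Kafka is optimized for event logs.
- Too many partitions: more partitions means higher rebalancing cost. Size them to actual need.
- Large payloads over SSE: SSE is text-based, so it is a poor fit for binary data. Use gRPC instead.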
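9. Learning Roadmap (6 Months)
Month 1: Reactive programming fundamentals
- Week 1-2: Project Reactor core (Mono, Flux, operators)
  - Read the "Learn" section of the official documentation in full
  - Work through the Lite Rx Hands-On exercises
- Week 3-4: Spring WebFlux basics
  - Migrate a simple REST API from MVC to WebFlux
  - Practice R2DBC integration
Month 2: Advanced WebFlux
- Week 1-2: Implement WebClient, WebSocket, and SSE
  - Mini project: real-time notification system
- Week 3-4: Testing (StepVerifier, WebTestClient)
  - Apply BlockHound
  - Performance testing (Gatling/k6)
Month 3: Apache Kafka fundamentals
- Week 1-2: Kafka architecture, producers and consumers
  - Build a local Kafka cluster with Docker Compose
  - Practice basic message publishing and consumption
- Week 3-4: Consumer groups, offset management, EOS
  - Practice multi-consumer-group scenarios
  - Implement a transactional producer
Month 4: Advanced Kafka
- Week 1-2: Kafka Streams, KTable/KStream
  - Practice windowed aggregation and joins
- Week 3-4: Kafka Connect, Schema Registry
  - Integrate Debezium CDC
  - Manage Avro schemas
Month 5: Integration project
- Week 1-2: WebFlux + Kafka integration
  - Use Reactor Kafka
  - CQRS + Event Sourcing pattern
- Week 3-4: Real-time dashboard project
  - Full pipeline from Kafka to SSE
  - React frontend integration
Month 6: Operations and optimization
- Week 1-2: Practice migrating to KRaft mode
  - Monitoring (Prometheus + Grafana)
  - Alerting (consumer lag, error rate)
- Week 3-4: Performance tuning and incident response
  - Load testing and bottleneck analysis
  - Failure scenario simulation (broker down, network partition)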
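Practice Quiz
Use the quiz below to check your understanding.
Q1: In WebFlux, what is the difference between Mono.just() and Mono.defer()?
Mono.just(value) captures the value immediately and wraps it in a Mono. It returns the same value no matter when it is subscribed to.
Mono.defer(() -> Mono.just(value)) evaluates the lambda at subscription time, so each subscription can produce a fresh value.
// just: the value is fixed at creation time
Mono<Long> eager = Mono.just(System.currentTimeMillis());
// Subscribing one second later still yields the same timestamp
// defer: the value is determined at subscription time
Mono<Long> lazy = Mono.defer(() -> Mono.just(System.currentTimeMillis()));
// Each subscription yields a new timestamp
This matters in practice with switchIfEmpty. In switchIfEmpty(Mono.just(createDefault())), createDefault() runs eagerly when the chain is assembled, whereas in switchIfEmpty(Mono.defer(() -> Mono.just(createDefault()))) it runs only when the source is actually empty.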
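Q2: What guarantee do acks=all and min.insync.replicas=2 provide together in Kafka?
With acks=all, the leader reports success to the producer only after every replica in the ISR (In-Sync Replicas) has acknowledged the write.
min.insync.replicas=2 sets the minimum ISR size required for a write to 2. If the ISR shrinks below 2, the producer receives a NotEnoughReplicasException.
Together they guarantee:
- A success response is returned only after the message has been written to at least 2 brokers
- With a replication factor of 3, losing 1 broker causes no data loss
- If 2 or more brokers fail at once, writes are rejected (a deliberate refusal that prevents data loss)
This is one of the most widely used durability configurations in production.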
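The interaction between the replication factor, broker failures, and `min.insync.replicas` reduces to a small check. The sketch below models it under a simplifying assumption: every replica was in sync before the failures, so the ISR size is just the replication factor minus the failed brokers. `IsrWriteCheck` and its methods are hypothetical names, not Kafka APIs.

```java
/** Toy model of the acks=all + min.insync.replicas write check. */
final class IsrWriteCheck {

    enum Result { ACKED, NOT_ENOUGH_REPLICAS }

    /** ISR size after failures, assuming all replicas were in sync beforehand. */
    static int isrAfterFailures(int replicationFactor, int failedBrokers) {
        return Math.max(0, replicationFactor - failedBrokers);
    }

    /** With acks=all, a write succeeds only while the ISR meets the configured minimum. */
    static Result produce(int inSyncReplicas, int minInsyncReplicas) {
        return inSyncReplicas >= minInsyncReplicas
                ? Result.ACKED
                : Result.NOT_ENOUGH_REPLICAS;
    }
}
```

With a replication factor of 3 and a minimum of 2, `produce(isrAfterFailures(3, 1), 2)` is still `ACKED`, while a second failure gives `NOT_ENOUGH_REPLICAS`: the cluster trades write availability for durability.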
Q3: When an SSE client drops because of a network failure and then reconnects, how can it recover the events it missed?
On reconnection, SSE sends the ID of the last event received to the server in the Last-Event-ID header. The server can use it to resend the missed events.
How to implement it:
- On the server, assign a unique ID to each event (for example a Kafka offset or a timestamp).
- When a client reconnects, read the Last-Event-ID header.
- Re-consume the events after that ID from Kafka and send them.
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<EventData>> stream(
@RequestHeader(value = "Last-Event-ID", required = false) String lastEventId) {
long startOffset = lastEventId != null ? Long.parseLong(lastEventId) + 1 : -1;
// Start consuming from Kafka at startOffset
return eventService.getEventsFrom(startOffset)
.map(e -> ServerSentEvent.<EventData>builder()
.id(String.valueOf(e.getOffset()))
.data(e)
.build());
}
Thanks to Kafka's message retention, events within the retention period can be re-read at any time.
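The resume logic can be isolated from Spring and Kafka to show what happens at the edges. In this sketch an in-memory list stands in for a single-partition event log (index = offset), and a missing or malformed header falls back to "live only", using the same -1 sentinel as the handler. `SseResume` and its methods are hypothetical names.

```java
import java.util.List;

/** Hypothetical helper for Last-Event-ID handling against a single-partition log. */
final class SseResume {

    /** Returns the offset to resume from, or -1 to skip replay and stream live data only. */
    static long startOffset(String lastEventId) {
        if (lastEventId == null || lastEventId.isBlank()) {
            return -1; // first connection: nothing to replay
        }
        try {
            return Long.parseLong(lastEventId.trim()) + 1; // resume after the last seen event
        } catch (NumberFormatException e) {
            return -1; // malformed header: fall back to the live stream
        }
    }

    /** Events the client missed; the list index plays the role of the offset. */
    static <T> List<T> replay(List<T> log, long startOffset) {
        if (startOffset < 0 || startOffset >= log.size()) {
            return List.of();
        }
        return log.subList((int) startOffset, log.size());
    }
}
```

A client that last saw event 0 reconnects with `Last-Event-ID: 0`, so `startOffset` returns 1 and `replay` yields everything from offset 1 onward. With multiple partitions a single offset is ambiguous, so the event ID would need to encode the partition as well.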
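Q4: In KRaft mode, what happens when the active controller in the controller quorum fails?
KRaft mode uses the Raft consensus protocol to handle controller election automatically.
- When the active controller stops sending heartbeats, the follower controllers detect it.
- Once the election timeout expires, one of the followers becomes a candidate.
- The candidate that wins votes from a majority (quorum) becomes the new active controller.
- The new controller has already been replicating metadata through the Raft log, so it does not have to reload all metadata the way ZooKeeper mode does.
As a result, failover time drops from what can be minutes in ZooKeeper mode to a few seconds. With a controller quorum of 3 (1 active + 2 followers), the cluster tolerates the failure of 1 controller.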
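The "3 controllers tolerate 1 failure" rule follows directly from majority voting, and the arithmetic is easy to check for other quorum sizes. `QuorumMath` below is a hypothetical helper that only encodes the majority rule, not Kafka's election implementation.

```java
/** Majority-quorum arithmetic as used by Raft-style elections. */
final class QuorumMath {

    /** Votes needed to win an election among the given number of voters. */
    static int majority(int voters) {
        return voters / 2 + 1;
    }

    /** Failures the quorum can absorb while still being able to elect a leader. */
    static int tolerableFailures(int voters) {
        return voters - majority(voters);
    }

    /** True if the surviving voters can still form a majority. */
    static boolean canElect(int voters, int alive) {
        return alive >= majority(voters);
    }
}
```

A quorum of 4 tolerates only 1 failure, the same as a quorum of 3, which is why odd sizes (3 or 5) are the usual choice.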
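Q5: Explain the difference between flatMap and concatMap in WebFlux. When should you use each?
flatMap: subscribes to inner Publishers concurrently. The inner streams are interleaved, so output order is not guaranteed.
concatMap: subscribes to inner Publishers sequentially. It subscribes to the next stream only after the previous one completes, so order is guaranteed.
// flatMap: order does not matter, maximum throughput
// Example: look up several users' profiles in parallel
Flux.fromIterable(userIds)
.flatMap(id -> userService.findById(id)) // concurrent execution
// Result order: [User3, User1, User2] (non-deterministic)
// concatMap: order guaranteed, sequential execution
// Example: events that must be processed in order
Flux.fromIterable(events)
.concatMap(event -> processEvent(event)) // one at a time, in sequence
// Result order: [Event1, Event2, Event3] (always the same)
How to choose:
- If order does not matter and throughput does, use flatMap
- If order must be preserved, use concatMap
- You can also cap flatMap's parallelism with its concurrency parameter: flatMap(fn, 10) runs at most 10 inner subscriptions at once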
References
Official Documentation
- Spring WebFlux Reference: https://docs.spring.io/spring-framework/reference/web/webflux.html
- Project Reactor Reference: https://projectreactor.io/docs/core/release/reference/
- Apache Kafka Documentation: https://kafka.apache.org/documentation/
- KRaft (KIP-500): https://cwiki.apache.org/confluence/display/KAFKA/KIP-500
- Reactive Streams Specification: https://www.reactive-streams.org/
- R2DBC Specification: https://r2dbc.io/spec/1.0.0.RELEASE/spec/html/
Architecture and Patterns
- Reactive Manifesto: https://www.reactivemanifesto.org/
- Kafka The Definitive Guide (O'Reilly)
- Designing Data-Intensive Applications, Martin Kleppmann
- Enterprise Integration Patterns: https://www.enterpriseintegrationpatterns.com/
- CQRS Pattern (Microsoft): https://learn.microsoft.com/en-us/azure/architecture/patterns/cqrs
- Event Sourcing Pattern: https://martinfowler.com/eaaDev/EventSourcing.html
Kafka Ecosystem
- Confluent Schema Registry: https://docs.confluent.io/platform/current/schema-registry/
- Debezium CDC: https://debezium.io/documentation/
- Kafka Streams Documentation: https://kafka.apache.org/documentation/streams/
- Reactor Kafka: https://projectreactor.io/docs/kafka/release/reference/
Monitoring and Operations
- Kafka Exporter for Prometheus: https://github.com/danielqsj/kafka_exporter
- Micrometer Metrics: https://micrometer.io/docs
- Grafana Kafka Dashboard: https://grafana.com/grafana/dashboards/
- BlockHound (blocking-call detection): https://github.com/reactor/BlockHound
Learning Resources
- Lite Rx Hands-On: https://github.com/reactor/lite-rx-api-hands-on
- Kafka Tutorials (Confluent): https://developer.confluent.io/tutorials/
- Spring WebFlux Workshop: https://github.com/spring-projects/spring-framework
- gRPC Java: https://grpc.io/docs/languages/java/
- SSE Specification (WHATWG HTML Living Standard): https://html.spec.whatwg.org/multipage/server-sent-events.html
WebFlux + Kafka + ZooKeeper Mastery: The Complete Guide to Reactive Streaming Architecture
- Introduction: Why Reactive Streaming Architecture Matters in 2026
- 1. Reactive Programming Fundamentals
- 2. Spring WebFlux Deep Dive
- 3. Apache Kafka: The Complete Guide
- 4. ZooKeeper to KRaft: The Great Migration
- 5. Streaming API Comparison: SSE vs WebSocket vs gRPC Streaming
- 6. WebFlux + Kafka Integration
- 7. Real-World Project: Real-Time Stock Ticker System
- 8. Decision Matrix: When to Use What
- 9. Six-Month Learning Roadmap
- 10. Quiz
- References
Introduction: Why Reactive Streaming Architecture Matters in 2026
Modern applications are expected to process millions of events per second, deliver real-time updates to users, and scale elastically under unpredictable load. Traditional request-response architectures with blocking I/O simply cannot meet these demands. In 2026, the combination of Spring WebFlux for non-blocking request handling, Apache Kafka for durable event streaming, and the transition from ZooKeeper to KRaft for simplified cluster management represents the gold standard for reactive streaming systems.
This guide covers the full stack: from reactive programming fundamentals to production-grade architecture patterns. Whether you are building a real-time analytics dashboard, a financial trading platform, or an IoT data pipeline, you will find actionable knowledge here.
What You Will Learn
- Reactive programming principles and the Reactive Streams specification
- Spring WebFlux internals: Netty event loop, Mono/Flux, operator chains, and R2DBC
- Apache Kafka architecture: partitions, consumer groups, Exactly-Once Semantics, Kafka Streams, and Connect
- The ZooKeeper to KRaft migration: why, when, and how
- Streaming API comparison: SSE vs WebSocket vs gRPC Streaming
- Integration patterns: WebFlux + Reactor Kafka + CQRS + Event Sourcing
- A complete real-world project: real-time stock ticker system
1. Reactive Programming Fundamentals
1.1 The Reactive Manifesto
The Reactive Manifesto defines four key properties that reactive systems must exhibit:
| Property | Description | Implementation Example |
|---|---|---|
| Responsive | System responds in a timely manner | WebFlux non-blocking I/O |
| Resilient | System stays responsive in the face of failure | Circuit breaker, bulkhead patterns |
| Elastic | System stays responsive under varying workload | Kafka partition scaling |
| Message Driven | System relies on asynchronous message passing | Kafka topics, Reactor event streams |
These four properties are not independent. Message-driven architecture enables elasticity and resilience, which together enable responsiveness.
1.2 Reactive Streams Specification
Reactive Streams is a standard for asynchronous stream processing with non-blocking backpressure. It defines four core interfaces:
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
public interface Subscriber<T> {
void onSubscribe(Subscription subscription);
void onNext(T item);
void onError(Throwable throwable);
void onComplete();
}
public interface Subscription {
void request(long n);
void cancel();
}
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
The critical innovation is the request(long n) method in Subscription. This enables backpressure: the subscriber controls the rate of data flow, preventing the producer from overwhelming the consumer.
1.3 Understanding Backpressure
Backpressure is the mechanism by which a data consumer signals to a producer how much data it can handle. Without backpressure, fast producers overwhelm slow consumers, leading to memory exhaustion or dropped messages.
Producer (1000 msg/s) --> Buffer (overflows!) --> Consumer (100 msg/s)
With backpressure:
Producer (adjusts to 100 msg/s) <-- request(100) -- Consumer (100 msg/s)
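To see `request(n)` doing its job without any framework, here is a small sketch built on the JDK's `java.util.concurrent.Flow` interfaces, which mirror the Reactive Streams ones. The publisher emits only as many items as the subscriber has asked for, and the subscriber asks again after each batch. It is deliberately single-threaded and simplified (for example, it ignores non-positive requests instead of signalling an error as the specification requires); `BackpressureDemo`, `RangePublisher`, and `BatchSubscriber` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackpressureDemo {

    /** Emits 1..count, but never more than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done || n <= 0) return;  // simplification: invalid requests are ignored
                    demand += n;
                    if (emitting) return;        // re-entrant call from onNext: the outer loop drains it
                    emitting = true;
                    while (demand > 0 && next <= count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Requests items in fixed-size batches and records what it saw. */
    static final class BatchSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final List<Long> requests = new ArrayList<>();
        boolean completed = false;
        private final int batchSize;
        private Flow.Subscription subscription;
        private int inBatch = 0;

        BatchSubscriber(int batchSize) { this.batchSize = batchSize; }

        private void requestBatch() {
            requests.add((long) batchSize);
            subscription.request(batchSize);
        }

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; requestBatch(); }

        @Override public void onNext(Integer item) {
            received.add(item);
            if (++inBatch == batchSize) { inBatch = 0; requestBatch(); } // ask for more only when ready
        }

        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }

        @Override public void onComplete() { completed = true; }
    }
}
```

Subscribing a `BatchSubscriber(2)` to a `RangePublisher(5)` delivers all five items, but only in response to three separate `request(2)` calls. The consumer, not the producer, sets the pace.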
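Backpressure strategies in Project Reactor (these are alternatives; pick one per pipeline rather than chaining them):
// Buffer up to 256 items, then signal an overflow error
Flux.range(1, Integer.MAX_VALUE)
.onBackpressureBuffer(256)
.subscribe(item -> processSlowly(item));
// Other strategies, used in place of onBackpressureBuffer:
// .onBackpressureDrop(dropped -> log.warn("Dropped: {}", dropped)) // drop items the subscriber has not requested
// .onBackpressureLatest() // keep only the most recent item
// .onBackpressureError() // signal an error when the subscriber falls behind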
1.4 Reactive Libraries Landscape
| Library | Language/Platform | Key Features | Adoption |
|---|---|---|---|
| Project Reactor | Java | Mono/Flux, Spring integration | Spring ecosystem |
| RxJava 3 | Java | Observable/Flowable, Android support | Android, legacy Java |
| Mutiny | Java | Uni/Multi, Quarkus-native | Quarkus ecosystem |
| Kotlin Flow | Kotlin | Coroutine-based, structured concurrency | Kotlin-first projects |
| Akka Streams | Scala/Java | Actor-based, graph DSL | High-throughput systems |
2. Spring WebFlux Deep Dive
2.1 WebFlux vs Spring MVC Architecture
Spring MVC uses a thread-per-request model. Each incoming request occupies one thread from the pool until the response is fully written. Under high concurrency with I/O-bound workloads, threads are mostly idle waiting for database queries, HTTP calls, or file reads.
Spring WebFlux uses an event-loop model powered by Netty. A small number of event loop threads handle all requests. When I/O is needed, the request is parked (not blocking a thread), and the event loop moves on to handle other requests. When the I/O completes, the event loop picks up the response.
Spring MVC (Thread-per-request):
Thread-1: [Request] --> [DB Wait......] --> [Response]
Thread-2: [Request] --> [API Wait.........] --> [Response]
Thread-3: [Request] --> [File Wait....] --> [Response]
(200 threads = 200 concurrent requests)
Spring WebFlux (Event Loop):
EventLoop-1: [Req1][Req2][Req3][Req1-resume][Req4][Req2-resume]...
EventLoop-2: [Req5][Req6][Req5-resume][Req7][Req8]...
(4 threads = thousands of concurrent requests)
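The "200 threads = 200 concurrent requests" limit translates into a throughput ceiling through Little's Law (concurrency = throughput × latency). The sketch below does that arithmetic for an idealized, purely I/O-bound workload; it ignores CPU time, queuing, and connection limits, so real numbers will be lower. `ThreadPoolCeiling` is a hypothetical helper.

```java
/** Back-of-the-envelope capacity math for a thread-per-request server. */
final class ThreadPoolCeiling {

    /** Upper bound on requests/second when every request holds a thread for latencyMs. */
    static double maxRequestsPerSecond(int threads, double latencyMs) {
        return threads * 1000.0 / latencyMs;
    }

    /** Threads a blocking server would need to sustain the given rate at that latency. */
    static double requiredThreads(double requestsPerSecond, double latencyMs) {
        return requestsPerSecond * latencyMs / 1000.0;
    }
}
```

With 200 threads and 50 ms of blocking I/O per request, the ceiling is 200 × 1000 / 50 = 4,000 requests per second. Sustaining 18,500 requests per second at the same latency would take roughly 925 blocked threads, which is the cost the event-loop model avoids by not tying a thread to each waiting request.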
2.2 Netty Event Loop Model
Netty is the default embedded server for WebFlux. Understanding its architecture is essential for performance tuning.
┌─────────────────────────┐
│ Boss EventLoopGroup │
│ (accepts connections) │
└────────────┬────────────┘
│
┌────────────▼────────────┐
│ Worker EventLoopGroup │
│ (handles I/O events) │
├─────────────────────────┤
│ EventLoop-1 Channel-A │
│ EventLoop-2 Channel-B │
│ EventLoop-3 Channel-C │
│ EventLoop-4 Channel-D │
└─────────────────────────┘
Key rules for working with Netty event loops:
- Never block an event loop thread: no Thread.sleep(), no blocking I/O, no synchronized locks
- Offload blocking or long-running work to a dedicated scheduler (boundedElastic for blocking calls, parallel for CPU-bound work)
- Channel affinity: each channel is bound to one event loop for its lifetime
2.3 Mono and Flux: The Core Types
Mono represents 0 or 1 element. Flux represents 0 to N elements.
// Mono: single value or empty
Mono<User> findUser = userRepository.findById(userId);
Mono<User> savedUser = userRepository.save(user); // save() emits the persisted entity
// Flux: stream of values
Flux<Order> orders = orderRepository.findByUserId(userId);
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
Flux<ServerSentEvent<String>> sseStream = Flux.create(sink -> {
// push events to sink
});
Creating Mono and Flux:
// Static factory methods
Mono.just("hello");
Mono.empty();
Mono.error(new RuntimeException("failure"));
Mono.fromCallable(() -> expensiveComputation());
Mono.defer(() -> Mono.just(dynamicValue()));
Flux.just("a", "b", "c");
Flux.fromIterable(myList);
Flux.range(1, 100);
Flux.interval(Duration.ofMillis(500));
Flux.merge(flux1, flux2);
Flux.concat(flux1, flux2);
Flux.zip(flux1, flux2, (a, b) -> a + b);
2.4 Essential Operators
// Transformation
flux.map(String::toUpperCase)
.flatMap(name -> userService.findByName(name)) // async 1:N
.concatMap(user -> enrichUser(user)) // sequential async
.switchMap(query -> search(query)) // cancel previous
.flatMapSequential(id -> fetchById(id)) // ordered parallel
// Filtering
flux.filter(user -> user.isActive())
.distinct()
.take(10)
.skip(5)
.takeUntil(event -> event.isTerminal())
// Error Handling
flux.onErrorReturn("default")
.onErrorResume(ex -> fallbackFlux())
.retry(3)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(30))
.filter(ex -> ex instanceof TransientException))
.timeout(Duration.ofSeconds(5))
// Combining
Flux.merge(fastSource, slowSource) // interleaved
Flux.concat(firstBatch, secondBatch) // sequential
Flux.zip(names, ages, Person::new) // paired
Mono.zip(userMono, profileMono, ordersMono) // parallel join
.map(tuple -> buildResponse(tuple.getT1(), tuple.getT2(), tuple.getT3()))
// Side Effects (for logging/debugging only)
flux.doOnNext(item -> log.debug("Processing: {}", item))
.doOnError(ex -> log.error("Error: {}", ex.getMessage()))
.doOnComplete(() -> log.info("Stream completed"))
.doOnSubscribe(sub -> log.info("Subscribed"))
2.5 WebFlux Controller and Functional Endpoints
Annotation-based (familiar MVC style):
@RestController
@RequestMapping("/api/users")
public class UserController {
private final UserService userService;
@GetMapping("/{id}")
public Mono<ResponseEntity<User>> getUser(@PathVariable String id) {
return userService.findById(id)
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.notFound().build());
}
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamUsers() {
return userService.streamAllUsers();
}
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Mono<User> createUser(@Valid @RequestBody Mono<User> userMono) {
return userMono.flatMap(userService::create);
}
}
Functional endpoints (router + handler):
@Configuration
public class UserRouter {
@Bean
public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
return RouterFunctions.route()
.path("/api/users", builder -> builder
.GET("/{id}", handler::getUser)
.GET("", handler::listUsers)
.POST("", handler::createUser)
.PUT("/{id}", handler::updateUser)
.DELETE("/{id}", handler::deleteUser))
.build();
}
}
@Component
public class UserHandler {
private final UserService userService;
public Mono<ServerResponse> getUser(ServerRequest request) {
String id = request.pathVariable("id");
return userService.findById(id)
.flatMap(user -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build());
}
public Mono<ServerResponse> listUsers(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(userService.findAll(), User.class);
}
}
2.6 R2DBC: Reactive Database Access
R2DBC (Reactive Relational Database Connectivity) brings non-blocking database access to WebFlux applications.
@Configuration
@EnableR2dbcRepositories
public class DatabaseConfig extends AbstractR2dbcConfiguration {
@Override
@Bean
public ConnectionFactory connectionFactory() {
return ConnectionFactories.get(ConnectionFactoryOptions.builder()
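.option(DRIVER, "pool") // MAX_SIZE below is an r2dbc-pool option, so route through the pool driver
.option(PROTOCOL, "postgresql") // the pool delegates to the PostgreSQL driver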
.option(HOST, "localhost")
.option(PORT, 5432)
.option(DATABASE, "mydb")
.option(USER, "user")
.option(PASSWORD, "password")
.option(MAX_SIZE, 20)
.build());
}
}
public interface UserRepository extends ReactiveCrudRepository<User, String> {
Flux<User> findByStatusOrderByCreatedAtDesc(String status);
@Query("SELECT * FROM users WHERE email = :email")
Mono<User> findByEmail(String email);
@Query("SELECT * FROM users WHERE department = :dept")
Flux<User> findByDepartment(String dept);
}
R2DBC with DatabaseClient for complex queries:
@Repository
public class CustomUserRepository {
private final DatabaseClient databaseClient;
public Flux<User> searchUsers(String keyword, int limit) {
return databaseClient.sql(
"SELECT * FROM users WHERE name ILIKE :keyword LIMIT :limit")
.bind("keyword", "%" + keyword + "%")
.bind("limit", limit)
.map(row -> new User(
row.get("id", String.class),
row.get("name", String.class),
row.get("email", String.class)
))
.all();
}
}
2.7 WebFlux Performance Benchmarks
Benchmark conditions: 4-core CPU, 8GB RAM, 10,000 concurrent connections, I/O-bound workload (50ms database latency per request).
| Metric | Spring MVC (Tomcat) | Spring WebFlux (Netty) | Improvement |
|---|---|---|---|
| Throughput (req/s) | 2,400 | 18,500 | 7.7x |
| Avg Latency (ms) | 4,200 | 540 | 87% lower |
| P99 Latency (ms) | 12,000 | 1,200 | 90% lower |
| Memory Usage (MB) | 2,100 | 480 | 77% lower |
| Threads Active | 200 | 8 | 96% fewer |
| Max Concurrent Requests | 200 | 50,000+ | 250x |
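Much of the gap in the table follows from simple arithmetic on a thread-per-request server: if every request parks one thread for the duration of its I/O wait, throughput is capped at threads divided by latency. The sketch below works that out for the 200 threads and 50ms database latency quoted above; the class and method names are illustrative only, and the result is a theoretical ceiling rather than a measured figure.

```java
// Sketch: upper bound on throughput for a blocking, thread-per-request server.
final class ThreadPoolCeiling {

    /** Max requests per second when each request holds one thread for latencyMs. */
    static long maxRequestsPerSecond(int threads, long latencyMs) {
        // Each thread can complete (1000 / latencyMs) requests per second.
        return threads * 1000L / latencyMs;
    }

    public static void main(String[] args) {
        // 200 worker threads, 50 ms of blocking I/O per request
        System.out.println(maxRequestsPerSecond(200, 50)); // prints 4000
    }
}
```

The measured 2,400 req/s for Spring MVC sits below this 4,000 req/s ceiling, as expected once queueing and context switching are included. An event loop is not bound by this formula because a waiting request does not occupy a thread.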
When NOT to use WebFlux:
- CPU-bound workloads (compression, encryption, heavy computation)
- Applications already performing well with MVC
- Teams without reactive programming experience
- When blocking libraries are unavoidable (for example, a JDBC-only driver with no R2DBC equivalent)
- Simple CRUD applications with low concurrency
3. Apache Kafka: The Complete Guide
3.1 Kafka Architecture Overview
Apache Kafka is a distributed event streaming platform capable of handling trillions of events per day. Its architecture is built around a few key concepts:
┌─────────────────────────────────────────────────────────────┐
│ Kafka Cluster │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │
│ │ │ │ │ │ │ │
│ │ Topic-A │ │ Topic-A │ │ Topic-A │ │
│ │ Part-0 │ │ Part-1 │ │ Part-2 │ │
│ │ (leader) │ │ (leader) │ │ (leader) │ │
│ │ │ │ │ │ │ │
│ │ Topic-A │ │ Topic-A │ │ Topic-A │ │
│ │ Part-1 │ │ Part-2 │ │ Part-0 │ │
│ │ (replica)│ │ (replica)│ │ (replica)│ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Metadata: ZooKeeper (legacy) or KRaft (Kafka 3.3+) │ │
│ └──────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Core Concepts:
| Concept | Description |
|---|---|
| Topic | Named feed of messages, similar to a database table |
| Partition | Ordered, immutable sequence of records within a topic |
| Offset | Sequential ID for each record within a partition |
| Broker | A Kafka server that stores data and serves clients |
| Producer | Client that publishes records to topics |
| Consumer | Client that reads records from topics |
| Consumer Group | Set of consumers that cooperatively consume from topics |
| Replication | Each partition is replicated across multiple brokers for durability |
| ISR | In-Sync Replicas: replicas that are caught up with the leader |
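To make the Topic, Partition, and Offset rows concrete: a keyed record always lands in the same partition, which is what gives per-key ordering. The sketch below shows the idea with a plain hash-modulo mapping. It is a simplification under a stated assumption: Kafka's default partitioner hashes the serialized key bytes with murmur2, whereas this example uses `String.hashCode()` as a stand-in, so the partition numbers it produces will not match a real cluster.

```java
// Sketch: key-to-partition mapping (String.hashCode is a stand-in for Kafka's murmur2 hash).
final class KeyPartitionSketch {

    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int first = partitionFor("order-42", 6);
        int second = partitionFor("order-42", 6);
        // The same key always maps to the same partition, so its records stay ordered.
        System.out.println(first == second); // prints true
    }
}
```

One consequence worth noting: changing the partition count changes the mapping, so records for an existing key may start landing in a different partition.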
3.2 Producer Configuration and Best Practices
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// Reliability settings
config.put(ProducerConfig.ACKS_CONFIG, "all"); // Wait for all ISR replicas
config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Performance tuning
config.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 32KB batch
config.put(ProducerConfig.LINGER_MS_CONFIG, 20); // Wait 20ms for batching
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB buffer
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Sending messages with different patterns:
@Slf4j // Lombok: provides the log field used below
@Service
@RequiredArgsConstructor
public class OrderEventProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
// Fire-and-forget (highest throughput)
public void sendFireAndForget(OrderEvent event) {
kafkaTemplate.send("orders", event.getOrderId(), event);
}
// Synchronous send (highest reliability)
public void sendSync(OrderEvent event) throws Exception {
kafkaTemplate.send("orders", event.getOrderId(), event)
.get(10, TimeUnit.SECONDS);
}
// Asynchronous send with callback (balanced)
public CompletableFuture<SendResult<String, Object>> sendAsync(OrderEvent event) {
return kafkaTemplate.send("orders", event.getOrderId(), event)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Failed to send order event: {}", event.getOrderId(), ex);
} else {
log.info("Order event sent to partition {} offset {}",
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
});
}
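// Send with headers
public void sendWithHeaders(OrderEvent event) {
// The three-argument constructor leaves partition and timestamp to the producer
ProducerRecord<String, Object> record =
new ProducerRecord<>("orders", event.getOrderId(), event);
// Header values are raw bytes; name the charset so encoding does not depend on the JVM default
record.headers()
.add("event-type", "ORDER_CREATED".getBytes(StandardCharsets.UTF_8))
.add("source", "order-service".getBytes(StandardCharsets.UTF_8))
.add("correlation-id", UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
kafkaTemplate.send(record);
}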
}
3.3 Consumer Configuration and Patterns
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// Offset management
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Manual commit
// Performance tuning
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1MB
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
KafkaTemplate<String, Object> kafkaTemplate) { // injected from KafkaProducerConfig
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// Retry 3 times, 1 second apart, then publish the record to the dead-letter topic
factory.setCommonErrorHandler(new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate),
new FixedBackOff(1000L, 3)
));
return factory;
}
}
Consumer listener patterns:
@Component
@RequiredArgsConstructor
public class OrderEventConsumer {
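private final OrderService orderService;
// The listeners below also use these collaborators (type names assumed for illustration)
private final AnalyticsService analyticsService;
private final NotificationService notificationService;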
// Single record processing
@KafkaListener(topics = "orders", groupId = "order-processing-group")
public void processOrder(ConsumerRecord<String, OrderEvent> record,
Acknowledgment ack) {
try {
log.info("Processing order: {} from partition: {} offset: {}",
record.value().getOrderId(),
record.partition(),
record.offset());
orderService.process(record.value());
ack.acknowledge();
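} catch (Exception e) {
log.error("Failed to process order: {}", record.value().getOrderId(), e);
// Rethrow without acknowledging. A swallowed exception never reaches the container's
// error handler, so the record would be neither retried nor sent to the dead-letter topic.
throw new IllegalStateException("Order processing failed", e);
}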
}
// Batch processing
@KafkaListener(topics = "analytics-events", groupId = "analytics-batch-group",
containerFactory = "batchKafkaListenerContainerFactory")
public void processBatch(List<ConsumerRecord<String, AnalyticsEvent>> records,
Acknowledgment ack) {
log.info("Processing batch of {} records", records.size());
try {
List<AnalyticsEvent> events = records.stream()
.map(ConsumerRecord::value)
.collect(Collectors.toList());
analyticsService.bulkInsert(events);
ack.acknowledge();
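} catch (Exception e) {
log.error("Batch processing failed", e);
// Rethrow so the error handler can retry the batch instead of silently dropping it
throw new IllegalStateException("Batch processing failed", e);
}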
}
// Filtering with headers
@KafkaListener(topics = "orders",
filter = "orderCreatedFilter",
groupId = "notification-group")
public void handleOrderCreated(OrderEvent event) {
notificationService.sendOrderConfirmation(event);
}
}
3.4 Exactly-Once Semantics (EOS)
Kafka supports exactly-once semantics through the combination of idempotent producers and transactional APIs.
@Configuration
public class KafkaTransactionalConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-");
config.put(ProducerConfig.ACKS_CONFIG, "all");
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager() {
return new KafkaTransactionManager<>(producerFactory());
}
}
@Service
@RequiredArgsConstructor
public class TransactionalOrderService {
private final KafkaTemplate<String, Object> kafkaTemplate;
@Transactional("kafkaTransactionManager")
public void processOrderTransactionally(OrderEvent order) {
// All messages in this method are sent atomically
kafkaTemplate.send("orders-processed", order.getOrderId(), order);
kafkaTemplate.send("inventory-updates", order.getProductId(),
new InventoryEvent(order.getProductId(), -order.getQuantity()));
kafkaTemplate.send("payment-requests", order.getOrderId(),
new PaymentEvent(order.getOrderId(), order.getTotalAmount()));
// If any send fails, the transaction is aborted and read_committed consumers never see the partial writes
}
}
Three levels of delivery guarantees:
| Guarantee | Configuration | Use Case |
|---|---|---|
| At-most-once | acks=0, offsets committed before processing (e.g., auto-commit) | Metrics, logging |
| At-least-once | acks=all with retries, offsets committed after processing; an idempotent consumer absorbs the duplicates | Most applications |
| Exactly-once | Transactional API + isolation.level=read_committed | Financial, billing, inventory |
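With at-least-once delivery the same record can arrive twice, for example after a consumer crashes between processing and committing its offset. A common way to make that safe is an idempotent consumer that remembers which event IDs it has already applied. The sketch below is a minimal in-memory illustration; the class name, the `eventId` parameter, and the running balance are assumptions made for the example, and a real service would keep the processed-ID set in a durable store updated together with the business change.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch: idempotent handling of redelivered events (in-memory only, for illustration).
final class IdempotentConsumerSketch {
    private final Set<String> processedIds = new HashSet<>();
    private int balance = 0;

    /** Applies the event once; returns false when the event was already seen. */
    boolean handle(String eventId, int amount) {
        // Set.add returns false if the ID is already present, i.e. a redelivery.
        if (!processedIds.add(eventId)) {
            return false;
        }
        balance += amount;
        return true;
    }

    int balance() {
        return balance;
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        consumer.handle("evt-1", 10);
        consumer.handle("evt-1", 10); // duplicate delivery, ignored
        System.out.println(consumer.balance()); // prints 10
    }
}
```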
3.5 Kafka Streams
Kafka Streams is a client library for building applications and microservices where the input and output data are stored in Kafka.
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
Map<String, Object> config = new HashMap<>();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
config.put(StreamsConfig.STATE_DIR_CONFIG, "/var/kafka-streams");
return new KafkaStreamsConfiguration(config);
}
}
@Component
public class OrderAnalyticsTopology {
@Autowired
public void buildPipeline(StreamsBuilder streamsBuilder) {
// Real-time order analytics pipeline
KStream<String, OrderEvent> orders = streamsBuilder
.stream("orders", Consumed.with(Serdes.String(), orderEventSerde()));
// 1. Order count per product in tumbling windows
KTable<Windowed<String>, Long> productCounts = orders
.groupBy((key, order) -> order.getProductId(),
Grouped.with(Serdes.String(), orderEventSerde()))
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count(Materialized.as("product-counts-store"));
// 2. Revenue aggregation per category
KTable<String, Double> revenueByCategory = orders
.groupBy((key, order) -> order.getCategory(),
Grouped.with(Serdes.String(), orderEventSerde()))
.aggregate(
() -> 0.0,
(key, order, total) -> total + order.getTotalAmount(),
Materialized.with(Serdes.String(), Serdes.Double())
);
// 3. Join with customer data for enrichment
KTable<String, CustomerProfile> customers = streamsBuilder
.table("customer-profiles",
Consumed.with(Serdes.String(), customerProfileSerde()));
KStream<String, EnrichedOrder> enrichedOrders = orders
.selectKey((key, order) -> order.getCustomerId())
.join(customers,
(order, customer) -> new EnrichedOrder(order, customer),
Joined.with(Serdes.String(), orderEventSerde(), customerProfileSerde()));
enrichedOrders.to("enriched-orders",
Produced.with(Serdes.String(), enrichedOrderSerde()));
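// 4. Fraud detection with session windows (grouped per customer, because the topic is keyed by order ID)
orders
.filter((key, order) -> order.getTotalAmount() > 1000)
.groupBy((key, order) -> order.getCustomerId(),
Grouped.with(Serdes.String(), orderEventSerde()))
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(10)))
.count()
.toStream()
// Session merges emit tombstones (null counts), so guard before comparing
.filter((windowedKey, count) -> count != null && count >= 3)
// Unwrap the windowed key so a plain String serde can write it
.map((windowedKey, count) -> KeyValue.pair(windowedKey.key(),
"ALERT: " + count + " high-value orders in session"))
.to("fraud-alerts", Produced.with(Serdes.String(), Serdes.String()));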
}
}
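The five-minute tumbling window in step 1 of the topology assigns each record to exactly one fixed, non-overlapping bucket. Kafka Streams aligns these buckets to the epoch, so the bucket a record belongs to can be computed from its timestamp alone. The sketch below shows that arithmetic; the class and method names are illustrative and it assumes non-negative millisecond timestamps.

```java
// Sketch: which epoch-aligned tumbling window a timestamp falls into.
final class TumblingWindowSketch {

    /** Start (inclusive) of the window containing timestampMs; the window ends at start + windowSizeMs (exclusive). */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L; // 300,000 ms
        System.out.println(windowStart(1_000_000L, fiveMinutes)); // prints 900000
    }
}
```

Two orders with timestamps 900,000 and 1,199,999 share a window, while one at 1,200,000 starts the next.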
3.6 Kafka Connect
Kafka Connect is a framework for connecting Kafka with external systems like databases, key-value stores, search indexes, and file systems.
{
"name": "postgresql-source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "password",
"database.dbname": "orders_db",
"database.server.name": "order-server",
"table.include.list": "public.orders,public.order_items",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.name": "dbz_publication",
"topic.prefix": "cdc",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "cdc-$3"
}
}
Elasticsearch sink connector:
{
"name": "elasticsearch-sink-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch:9200",
"topics": "enriched-orders",
"type.name": "_doc",
"key.ignore": false,
"schema.ignore": true,
"behavior.on.null.values": "delete",
"write.method": "upsert",
"batch.size": 200,
"max.buffered.records": 5000,
"flush.timeout.ms": 10000
}
}
3.7 Schema Registry
Schema Registry provides a centralized repository for message schemas, enabling schema evolution and compatibility checking.
// Producer with Avro schema
@Configuration
public class AvroProducerConfig {
@Bean
public ProducerFactory<String, GenericRecord> avroProducerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
config.put("schema.registry.url", "http://schema-registry:8081");
config.put("auto.register.schemas", true);
return new DefaultKafkaProducerFactory<>(config);
}
}
Schema evolution compatibility modes:
| Mode | Allowed Changes | Best For |
|---|---|---|
| BACKWARD | Delete fields, add optional fields | Consumer-first upgrades |
| FORWARD | Add fields, delete optional fields | Producer-first upgrades |
| FULL | Add/delete optional fields only | Independent upgrades |
| NONE | Any change allowed | Development only |
3.8 Kafka Performance Tuning Cheat Sheet
| Parameter | Default | Recommended | Impact |
|---|---|---|---|
| num.partitions | 1 | 6-12 per topic | Parallelism |
| replication.factor | 1 | 3 | Durability |
| min.insync.replicas | 1 | 2 | Write durability |
| batch.size (producer) | 16384 | 32768-65536 | Throughput vs latency |
| linger.ms (producer) | 0 (5 since Kafka 4.0) | 5-100 | Batching efficiency |
| compression.type | none | zstd or lz4 | Network/disk savings |
| fetch.min.bytes (consumer) | 1 | 1024-16384 | Throughput vs latency |
| max.poll.records (consumer) | 500 | 100-1000 | Processing batch size |
| segment.bytes | 1073741824 | 536870912 | Log compaction frequency |
| retention.ms | 604800000 | Based on use case | Storage vs replay capability |
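The recommended pairing of `replication.factor=3` and `min.insync.replicas=2` is easiest to reason about as arithmetic: with `acks=all`, a write succeeds only while at least `min.insync.replicas` replicas are in sync, so the difference between the two settings is the number of replicas a partition can lose and still accept writes. The helper below is a minimal sketch of that calculation; the class and method names are illustrative.

```java
// Sketch: how many replicas a partition can lose while acks=all writes still succeed.
final class DurabilityMath {

    static int tolerableReplicaLossForWrites(int replicationFactor, int minInSyncReplicas) {
        return replicationFactor - minInSyncReplicas;
    }

    public static void main(String[] args) {
        System.out.println(tolerableReplicaLossForWrites(3, 2)); // prints 1
        System.out.println(tolerableReplicaLossForWrites(3, 3)); // prints 0
    }
}
```

Setting `min.insync.replicas` equal to the replication factor therefore blocks writes as soon as any single replica falls behind, which is why 2 of 3 is the usual choice.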
4. ZooKeeper to KRaft: The Great Migration
4.1 ZooKeeper's Role in Kafka
ZooKeeper has been Kafka's metadata management system since the beginning. It handles:
- Broker registration: tracking which brokers are alive
- Topic metadata: partition assignments, ISR lists, configurations
- Controller election: choosing which broker acts as the cluster controller
- Consumer group coordination (legacy): storing consumer offsets (moved to Kafka itself in 0.9+)
- ACLs: access control lists for authorization
4.2 Why ZooKeeper Had to Go
| Problem | Description |
|---|---|
| Operational complexity | Two distinct distributed systems to deploy, monitor, and maintain |
| Scaling bottleneck | ZooKeeper stores all metadata in memory; limits cluster to about 200K partitions |
| Split-brain risks | Network partitions can cause ZooKeeper and Kafka to disagree |
| Recovery time | Controller failover requires reading all metadata from ZooKeeper (minutes) |
| Expertise requirement | Teams need deep knowledge of both Kafka and ZooKeeper |
4.3 KRaft: Kafka's Built-in Consensus
KRaft (Kafka Raft) replaces ZooKeeper with a built-in Raft-based consensus protocol. Metadata is stored in an internal Kafka topic (__cluster_metadata), and a set of controller nodes manage cluster state.
ZooKeeper Mode: KRaft Mode:
┌────────┐ ┌────────────┐ ┌──────────────────────┐
│ Broker │───▶│ ZooKeeper │ │ Controller (Raft) │
│ Broker │───▶│ Ensemble │ │ Node 1 (leader) │
│ Broker │───▶│ (3-5 nodes)│ │ Node 2 (follower) │
└────────┘ └────────────┘ │ Node 3 (follower) │
└──────────┬───────────┘
│
┌──────────▼───────────┐
│ Brokers │
│ Broker 1 │
│ Broker 2 │
│ Broker 3 │
└──────────────────────┘
KRaft advantages:
| Feature | ZooKeeper Mode | KRaft Mode |
|---|---|---|
| Max partitions | ~200,000 | Millions |
| Controller failover | Minutes | Seconds |
| Metadata propagation | Asynchronous (laggy) | Event-driven (immediate) |
| Deployment | Two systems | One system |
| Operational overhead | High | Low |
| Shutdown time | Minutes | Seconds |
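The controller quorum shown above follows standard Raft majority rules: a metadata change is committed once a majority of controllers have it, so the quorum stays available as long as a majority is alive. The sketch below computes both numbers for a given quorum size; the class and method names are illustrative.

```java
// Sketch: majority size and fault tolerance of a Raft quorum.
final class QuorumMath {

    /** Smallest number of nodes that forms a majority of n voters. */
    static int majority(int voters) {
        return voters / 2 + 1;
    }

    /** Number of voters that can fail while a majority remains. */
    static int tolerableFailures(int voters) {
        return voters - majority(voters);
    }

    public static void main(String[] args) {
        System.out.println(majority(3) + " " + tolerableFailures(3)); // prints 2 1
        System.out.println(majority(4) + " " + tolerableFailures(4)); // prints 3 1
        System.out.println(majority(5) + " " + tolerableFailures(5)); // prints 3 2
    }
}
```

A fourth controller adds no fault tolerance over three, which is why quorums are normally sized at 3 or 5.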
4.4 KIP-500: The Roadmap
KIP-500 ("Replace ZooKeeper with a Self-Managed Metadata Quorum") defined the multi-year migration plan:
| Kafka Version | Milestone | Year |
|---|---|---|
| 2.8 | Early access KRaft (development only) | 2021 |
| 3.3 | KRaft marked production-ready | 2022 |
| 3.5 | ZooKeeper mode deprecated; ZooKeeper-to-KRaft migration in preview (production-ready in 3.6) | 2023 |
| 3.9 | Final bridge release with ZooKeeper support | 2024 |
| 4.0 | ZooKeeper support removed entirely | 2025 |
4.5 Migration Guide: ZooKeeper to KRaft
Step 1: Pre-migration checklist
# Verify the installed Kafka version (must be 3.5+)
kafka-topics.sh --version
# Look up the existing cluster ID in ZooKeeper; the KRaft controllers must be formatted with this ID
zookeeper-shell.sh zk1:2181 get /cluster/id
# Verify no deprecated features in use
kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type brokers
Step 2: Configure KRaft controllers
# kraft/server.properties for controller nodes
process.roles=controller
node.id=1
controller.quorum.voters=1@controller1:9093,2@controller2:9093,3@controller3:9093
controller.listener.names=CONTROLLER
listeners=CONTROLLER://:9093
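log.dirs=/var/kraft-controller-logs
# Required while migrating from ZooKeeper; remove both lines when finalizing
zookeeper.metadata.migration.enable=true
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181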
Step 3: Run the migration
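# Format controller storage with the EXISTING cluster ID (a new random UUID would create a separate cluster)
CLUSTER_ID="<cluster ID read from ZooKeeper>"
kafka-storage.sh format \
--config kraft/server.properties \
--cluster-id "$CLUSTER_ID" \
--release-version 3.7
# Start the controllers, then enable migration on every broker and restart the brokers one at a time.
# Add to each broker's server.properties:
#   zookeeper.metadata.migration.enable=true
#   controller.quorum.voters=1@controller1:9093,2@controller2:9093,3@controller3:9093
#   controller.listener.names=CONTROLLER
#   (plus a CONTROLLER entry in listener.security.protocol.map)
# The metadata copy starts automatically once every broker has registered with the quorum;
# the active controller logs at INFO level when the migration completes.
# Verify quorum status during the migration
kafka-metadata-quorum.sh --bootstrap-controller controller1:9093 describe --status
# Move brokers to KRaft mode: one at a time, set process.roles=broker and node.id,
# remove the zookeeper.* settings, and restart
# Finalize (point of no return): remove zookeeper.metadata.migration.enable and
# zookeeper.connect from the controllers, then restart them one at a time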
Step 4: Post-migration validation
# Verify all topics are accessible
kafka-topics.sh --bootstrap-server localhost:9092 --list
# Verify consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# Check controller quorum status
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status
4.6 ZAB vs Raft: Protocol Comparison
| Aspect | ZAB (ZooKeeper) | Raft (KRaft) |
|---|---|---|
| Leader election | Epoch-based | Term-based |
| Log replication | Two-phase commit | Append entries |
| Consistency | Sequential consistency | Linearizable reads (optional) |
| Membership | Static config (dynamic reconfiguration since ZooKeeper 3.5) | Dynamic reconfiguration (in KRaft since Kafka 3.9, KIP-853) |
| Snapshot | Fuzzy snapshots | Consistent snapshots |
| Complexity | High | Moderate (by design) |
| Implementation | Separate system | Integrated into Kafka |
5. Streaming API Comparison: SSE vs WebSocket vs gRPC Streaming
5.1 Server-Sent Events (SSE)
SSE is a simple, HTTP-based protocol for server-to-client push. The server sends events over a long-lived HTTP connection using the text/event-stream content type.
WebFlux SSE endpoint:
@RestController
@RequestMapping("/api/stream")
public class SSEController {
private final StockPriceService stockPriceService;
@GetMapping(value = "/prices", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<StockPrice>> streamPrices(
@RequestParam List<String> symbols) {
return stockPriceService.getPriceStream(symbols)
.map(price -> ServerSentEvent.<StockPrice>builder()
.id(String.valueOf(price.getTimestamp()))
.event("price-update")
.data(price)
.retry(Duration.ofSeconds(5))
.build());
}
@GetMapping(value = "/heartbeat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> heartbeat() {
return Flux.interval(Duration.ofSeconds(30))
.map(seq -> ServerSentEvent.<String>builder()
.event("heartbeat")
.data("ping")
.build());
}
}
JavaScript client:
const eventSource = new EventSource('/api/stream/prices?symbols=AAPL,GOOG,MSFT')
eventSource.addEventListener('price-update', (event) => {
const price = JSON.parse(event.data)
updateUI(price)
})
eventSource.addEventListener('heartbeat', () => {
console.log('Connection alive')
})
eventSource.onerror = (error) => {
console.error('SSE error:', error)
// Browser automatically reconnects
}
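What travels over the wire for each `ServerSentEvent` is plain UTF-8 text: optional `id:` and `event:` fields, one `data:` line per line of payload, and a blank line that terminates the event. The sketch below builds one such frame by hand to show the format that `EventSource` parses; the class and method names are illustrative, and in a WebFlux application the framework does this encoding for you.

```java
// Sketch: hand-building a text/event-stream frame.
final class SseFrameSketch {

    static String frame(String id, String event, String data) {
        StringBuilder sb = new StringBuilder();
        if (id != null) {
            sb.append("id: ").append(id).append('\n');
        }
        if (event != null) {
            sb.append("event: ").append(event).append('\n');
        }
        // Multi-line payloads are sent as one "data:" field per line.
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        // A blank line marks the end of the event.
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(frame("42", "price-update", "{\"symbol\":\"AAPL\"}"));
    }
}
```

The `id` field is what the browser sends back as `Last-Event-ID` when it reconnects, which lets a server resume the stream from where the client left off.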
5.2 WebSocket
WebSocket provides full-duplex communication over a single TCP connection. Both client and server can send messages at any time.
WebFlux WebSocket handler:
@Configuration
public class WebSocketConfig {
@Bean
public HandlerMapping webSocketMapping(StockWebSocketHandler handler) {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/ws/stocks", handler);
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(map);
mapping.setOrder(-1);
return mapping;
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
@Component
@RequiredArgsConstructor
public class StockWebSocketHandler implements WebSocketHandler {
private final StockPriceService stockPriceService;
private final ObjectMapper objectMapper;
@Override
public Mono<Void> handle(WebSocketSession session) {
// Incoming messages (subscriptions)
Flux<String> incoming = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(msg -> log.info("Received subscription: {}", msg));
// Outgoing price updates
Flux<WebSocketMessage> outgoing = incoming
.flatMap(msg -> {
SubscriptionRequest req = parseRequest(msg);
return stockPriceService.getPriceStream(req.getSymbols());
})
.map(price -> {
try {
String json = objectMapper.writeValueAsString(price);
return session.textMessage(json);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
return session.send(outgoing);
}
}
JavaScript client:
const ws = new WebSocket('ws://localhost:8080/ws/stocks')
ws.onopen = () => {
ws.send(
JSON.stringify({
action: 'subscribe',
symbols: ['AAPL', 'GOOG', 'MSFT'],
})
)
}
ws.onmessage = (event) => {
const price = JSON.parse(event.data)
updateUI(price)
}
ws.onclose = (event) => {
console.log('WebSocket closed:', event.code, event.reason)
// Manual reconnection needed
setTimeout(reconnect, 3000)
}
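Because WebSocket has no built-in reconnect, the client above retries after a fixed 3 seconds. When many clients lose their connection at once, a fixed delay makes them all reconnect together, so a capped exponential backoff is a common refinement. The sketch below shows only the delay calculation, written in Java for consistency with the rest of this article; the same arithmetic ports directly to the browser client. The class name, base delay, and cap are assumptions made for the example.

```java
// Sketch: capped exponential backoff for reconnect attempts.
final class ReconnectBackoff {

    /** Delay before the given attempt (0-based): base * 2^attempt, limited to capMs. */
    static long delayMs(int attempt, long baseMs, long capMs) {
        // Clamp the shift so the doubling cannot overflow for large attempt counts.
        int shift = Math.min(attempt, 20);
        return Math.min(capMs, baseMs << shift);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 6; attempt++) {
            System.out.println(delayMs(attempt, 1000, 30_000));
        }
        // prints 1000, 2000, 4000, 8000, 16000, 30000 (one per line)
    }
}
```

Adding a small random jitter to each delay spreads reconnects out further.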
5.3 gRPC Streaming
gRPC supports four communication patterns: unary, server streaming, client streaming, and bidirectional streaming.
Proto definition:
syntax = "proto3";
package stocks;
service StockService {
// Unary
rpc GetPrice (PriceRequest) returns (StockPrice);
// Server streaming
rpc StreamPrices (PriceStreamRequest) returns (stream StockPrice);
// Client streaming
rpc RecordTrades (stream TradeRecord) returns (TradeSummary);
// Bidirectional streaming
rpc LiveTrading (stream TradeOrder) returns (stream TradeConfirmation);
}
message PriceStreamRequest {
repeated string symbols = 1;
int32 interval_ms = 2;
}
message StockPrice {
string symbol = 1;
double price = 2;
double change = 3;
int64 timestamp = 4;
int64 volume = 5;
}
Server-side implementation (Spring Boot + grpc-spring-boot-starter):
@GrpcService
public class StockGrpcService extends StockServiceGrpc.StockServiceImplBase {
private final StockPriceService stockPriceService;
@Override
public void streamPrices(PriceStreamRequest request,
StreamObserver<StockPrice> responseObserver) {
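Disposable subscription = stockPriceService.getPriceStream(request.getSymbolsList())
.subscribe(
price -> responseObserver.onNext(toProto(price)),
error -> responseObserver.onError(
Status.INTERNAL.withDescription(error.getMessage()).asException()),
responseObserver::onCompleted
);
// Stop the upstream Flux when the client cancels or disconnects; otherwise it keeps running
((ServerCallStreamObserver<StockPrice>) responseObserver)
.setOnCancelHandler(subscription::dispose);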
}
@Override
public StreamObserver<TradeOrder> liveTrading(
StreamObserver<TradeConfirmation> responseObserver) {
return new StreamObserver<TradeOrder>() {
@Override
public void onNext(TradeOrder order) {
TradeConfirmation confirmation = executeTrade(order);
responseObserver.onNext(confirmation);
}
@Override
public void onError(Throwable t) {
log.error("Client error in live trading", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
5.4 Comparison Table
| Feature | SSE | WebSocket | gRPC Streaming |
|---|---|---|---|
| Protocol | HTTP/1.1 or HTTP/2 | WS (TCP) | HTTP/2 |
| Direction | Server to client | Bidirectional | All 4 patterns |
| Data Format | Text (UTF-8) | Text or Binary | Protobuf (binary) |
| Auto-reconnect | Built-in (browser) | Manual | Manual |
| Backpressure | Limited | None (native) | Built-in (flow control) |
| Browser Support | All modern browsers | All modern browsers | grpc-web (limited) |
| Firewall Friendly | Yes (standard HTTP) | Sometimes blocked | Requires HTTP/2 |
| Multiplexing | No | No | Yes (HTTP/2) |
| Max Connections | 6 per domain (HTTP/1) | No limit | Multiplexed |
| Compression | Standard HTTP | Per-message (optional) | Built-in |
| Latency | Low | Lowest | Low |
| Throughput | Moderate | High | Highest |
| Serialization | JSON (text) | JSON or custom binary | Protobuf (efficient) |
| Use Case | Notifications, feeds | Chat, gaming, collab | Microservices, IoT |
5.5 When to Choose What
- SSE: Dashboard updates, news feeds, stock tickers (read-only), notification streams. Simple to implement, works through proxies, auto-reconnects.
- WebSocket: Chat applications, collaborative editing, multiplayer games. Need bidirectional communication with low latency.
- gRPC Streaming: Microservice-to-microservice communication, IoT data ingestion, high-throughput internal APIs. Need schema enforcement, efficient serialization, and HTTP/2 features.
6. WebFlux + Kafka Integration
6.1 Reactor Kafka
Reactor Kafka provides a reactive API for Kafka, integrating seamlessly with Spring WebFlux.
@Configuration
public class ReactorKafkaConfig {
@Bean
public ReceiverOptions<String, String> receiverOptions() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return ReceiverOptions.<String, String>create(props)
.subscription(Collections.singleton("events"))
.addAssignListener(partitions ->
log.info("Assigned: {}", partitions))
.addRevokeListener(partitions ->
log.info("Revoked: {}", partitions))
.commitInterval(Duration.ofSeconds(5))
.commitBatchSize(100);
}
@Bean
public SenderOptions<String, String> senderOptions() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
return SenderOptions.create(props);
}
@Bean
public KafkaReceiver<String, String> kafkaReceiver() {
return KafkaReceiver.create(receiverOptions());
}
@Bean
public KafkaSender<String, String> kafkaSender() {
return KafkaSender.create(senderOptions());
}
}
Reactive consumer with backpressure:
@Service
@RequiredArgsConstructor
public class ReactiveEventProcessor {
private final KafkaReceiver<String, String> kafkaReceiver;
private final EventService eventService;
@PostConstruct
public void startConsuming() {
kafkaReceiver.receive()
.groupBy(record -> record.receiverOffset().topicPartition())
.flatMap(partitionFlux -> partitionFlux
.publishOn(Schedulers.boundedElastic())
.concatMap(record -> processRecord(record)
.doOnSuccess(v -> record.receiverOffset().acknowledge())
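.onErrorResume(e -> {
log.error("Error processing record: {}", record.key(), e);
// Acknowledging here skips the failed record for good; publish it to a
// dead-letter topic first if it must not be lost
record.receiverOffset().acknowledge();
return Mono.empty();
})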
))
.subscribe();
}
private Mono<Void> processRecord(ReceiverRecord<String, String> record) {
return eventService.process(record.value())
// doOnSuccess also fires when the Mono completes empty, unlike doOnNext
.doOnSuccess(result ->
log.debug("Processed: partition={} offset={} key={}",
record.partition(), record.offset(), record.key()))
.then();
}
}
Reactive producer:
@Service
@RequiredArgsConstructor
public class ReactiveEventProducer {
private final KafkaSender<String, String> kafkaSender;
private final ObjectMapper objectMapper;
public Mono<Void> sendEvent(String topic, String key, Object event) {
return Mono.fromCallable(() -> objectMapper.writeValueAsString(event))
.flatMap(json -> {
SenderRecord<String, String, String> record = SenderRecord.create(
new ProducerRecord<>(topic, key, json), key);
return kafkaSender.send(Mono.just(record))
.next()
.doOnNext(result -> log.info("Sent to {}-{} offset {}",
result.recordMetadata().topic(),
result.recordMetadata().partition(),
result.recordMetadata().offset()))
.then();
});
}
public Flux<SenderResult<String>> sendBatch(String topic, List<Event> events) {
Flux<SenderRecord<String, String, String>> records = Flux.fromIterable(events)
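.map(event -> {
try {
String json = objectMapper.writeValueAsString(event);
return SenderRecord.create(
new ProducerRecord<>(topic, event.getId(), json), event.getId());
} catch (JsonProcessingException e) {
// A lambda passed to map() cannot throw checked exceptions; surface it as an error signal
throw Exceptions.propagate(e);
}
});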
return kafkaSender.send(records)
.doOnError(e -> log.error("Batch send failed", e));
}
}
6.2 CQRS + Event Sourcing with WebFlux and Kafka
CQRS (Command Query Responsibility Segregation) separates read and write models. Combined with Event Sourcing, every state change is stored as an immutable event.
┌─────────────────────────────────────────────────────────┐
│ CQRS + Event Sourcing │
│ │
│ Command Side Query Side │
│ ┌──────────┐ ┌──────────┐ │
│ │ WebFlux │ │ WebFlux │ │
│ │ Command │ │ Query │ │
│ │ Handler │ │ Handler │ │
│ └────┬─────┘ └────▲─────┘ │
│ │ │ │
│ ┌────▼─────┐ ┌────┴─────┐ │
│ │ Domain │ │ Read │ │
│ │ Aggregate│ │ Model │ │
│ └────┬─────┘ │ (R2DBC) │ │
│ │ └────▲─────┘ │
│ ┌────▼─────────────────────────────┐ │ │
│ │ Kafka (Event Store) ├─┘ │
│ │ order-events topic │ │
│ └──────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
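The core idea of Event Sourcing in the diagram is that current state is never stored directly on the write side; it is derived by replaying the event log in order. The sketch below shows that fold with two event types. It is a simplified, in-memory illustration: the record types and the string-based event type are assumptions for the example, not the article's `DomainEvent` or `OrderAggregate` classes.

```java
import java.util.List;

// Sketch: rebuilding order state by replaying events in order.
final class EventReplaySketch {
    record Event(String type, String orderId) {}
    record OrderState(String orderId, String status) {}

    /** Applies one event to the current state and returns the new state. */
    static OrderState apply(OrderState state, Event event) {
        switch (event.type()) {
            case "OrderCreated":
                return new OrderState(event.orderId(), "CREATED");
            case "OrderConfirmed":
                return new OrderState(event.orderId(), "CONFIRMED");
            default:
                return state; // unknown events leave the state unchanged
        }
    }

    /** Folds the full event history into the current state (null if there are no events). */
    static OrderState replay(List<Event> events) {
        OrderState state = null;
        for (Event event : events) {
            state = apply(state, event);
        }
        return state;
    }

    public static void main(String[] args) {
        OrderState state = replay(List.of(
                new Event("OrderCreated", "o-1"),
                new Event("OrderConfirmed", "o-1")));
        System.out.println(state.status()); // prints CONFIRMED
    }
}
```

The read model on the query side is the same fold, run continuously as events arrive and persisted so that queries do not have to replay the log.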
Command handler:
@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
public class OrderCommandController {
private final OrderCommandService commandService;
@PostMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public Mono<OrderId> createOrder(@RequestBody Mono<CreateOrderCommand> command) {
return command.flatMap(commandService::handle);
}
@PostMapping("/{orderId}/confirm")
public Mono<ResponseEntity<Void>> confirmOrder(@PathVariable String orderId) {
return commandService.handle(new ConfirmOrderCommand(orderId))
.then(Mono.just(ResponseEntity.accepted().build()));
}
}
@Service
@RequiredArgsConstructor
public class OrderCommandService {
private final KafkaSender<String, String> kafkaSender;
private final ObjectMapper objectMapper;
public Mono<OrderId> handle(CreateOrderCommand command) {
// Validate command
OrderAggregate aggregate = OrderAggregate.create(command);
List<DomainEvent> events = aggregate.getUncommittedEvents();
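// Publish events to Kafka in order: concatMap waits for each send before starting the next,
// whereas flatMap could interleave them and break the event sequence
return Flux.fromIterable(events)
.concatMap(event -> publishEvent("order-events", aggregate.getId(), event))
.then(Mono.just(new OrderId(aggregate.getId())));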
}
private Mono<Void> publishEvent(String topic, String key, DomainEvent event) {
return Mono.fromCallable(() -> objectMapper.writeValueAsString(event))
.flatMap(json -> {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, json);
record.headers().add("event-type", event.getType().getBytes(StandardCharsets.UTF_8));
return kafkaSender.send(Mono.just(SenderRecord.create(record, key)))
.next()
.then();
});
}
}
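Using the aggregate ID as the record key matters for event ordering: Kafka orders records only within a partition, and the default partitioner sends records with the same key to the same partition. The sketch below is a toy model of that routing, not Kafka's actual partitioner (which hashes the serialized key bytes with murmur2). `KeyPartitioningSketch`, `partitionFor`, and `route` are hypothetical names, and `String.hashCode()` stands in for the real hash.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of key-based partitioning; NOT Kafka's real partitioner. */
final class KeyPartitioningSketch {

    /** Maps a record key to a partition index in [0, partitionCount). */
    static int partitionFor(String key, int partitionCount) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    /** Groups {key, event} pairs by partition, keeping arrival order inside each partition. */
    static Map<Integer, List<String>> route(List<String[]> keyedEvents, int partitionCount) {
        Map<Integer, List<String>> partitions = new TreeMap<>();
        for (String[] keyedEvent : keyedEvents) {
            String key = keyedEvent[0];
            String event = keyedEvent[1];
            partitions.computeIfAbsent(partitionFor(key, partitionCount), p -> new ArrayList<>())
                      .add(key + ":" + event);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String[]> events = List.of(
            new String[] {"order-1", "OrderCreated"},
            new String[] {"order-2", "OrderCreated"},
            new String[] {"order-1", "OrderConfirmed"});
        // Both order-1 events land in the same partition, in publish order
        System.out.println(route(events, 3));
    }
}
```

Because both `order-1` events share a key, they end up in one partition in the order they were sent, which is what lets a projection replay an aggregate's history deterministically.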
Event projection (read-side updater):
@Service
@RequiredArgsConstructor
public class OrderProjectionService {
private final KafkaReceiver<String, String> kafkaReceiver;
private final OrderReadRepository readRepository;
private final ObjectMapper objectMapper;
@PostConstruct
public void startProjection() {
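kafkaReceiver.receive()
.concatMap(record -> {
Header typeHeader = record.headers().lastHeader("event-type");
if (typeHeader == null) {
// Skip records without an event type instead of failing with a NullPointerException
record.receiverOffset().acknowledge();
return Mono.empty();
}
String eventType = new String(typeHeader.value(), StandardCharsets.UTF_8);
return processEvent(eventType, record.value())
.doOnSuccess(v -> record.receiverOffset().acknowledge());
})
.subscribe();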
}
private Mono<Void> processEvent(String eventType, String payload) {
return switch (eventType) {
// readValue throws a checked exception, so it is deferred into the reactive chain with fromCallable
case "OrderCreated" -> Mono.fromCallable(() -> objectMapper.readValue(payload, OrderCreatedEvent.class))
.flatMap(event -> readRepository.save(OrderReadModel.from(event)))
.then();
case "OrderConfirmed" -> Mono.fromCallable(() -> objectMapper.readValue(payload, OrderConfirmedEvent.class))
.flatMap(event -> readRepository.findById(event.getOrderId()))
.map(model -> model.withStatus("CONFIRMED"))
.flatMap(readRepository::save)
.then();
default -> Mono.empty();
};
}
}
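The projection above is, at its core, a fold over the event log: start from nothing and apply each event in order to get the current read model. A minimal sketch of that idea without Kafka or R2DBC follows. The record shapes (`OrderCreated`, `OrderConfirmed`, `OrderView`) and the `OrderReplaySketch` class are assumptions made for illustration; the article's own event classes may carry different fields.

```java
import java.util.List;

/** Hypothetical event types mirroring the "OrderCreated"/"OrderConfirmed" names used above. */
interface OrderEvent {}
record OrderCreated(String orderId, String customerId) implements OrderEvent {}
record OrderConfirmed(String orderId) implements OrderEvent {}

/** Immutable read-model row; the field names are assumptions for this sketch. */
record OrderView(String orderId, String customerId, String status) {}

final class OrderReplaySketch {

    /** Applies one event to the current view and returns the next view. */
    static OrderView apply(OrderView current, OrderEvent event) {
        if (event instanceof OrderCreated created) {
            return new OrderView(created.orderId(), created.customerId(), "CREATED");
        }
        if (event instanceof OrderConfirmed) {
            if (current == null) {
                throw new IllegalStateException("OrderConfirmed arrived before OrderCreated");
            }
            return new OrderView(current.orderId(), current.customerId(), "CONFIRMED");
        }
        throw new IllegalArgumentException("Unknown event: " + event);
    }

    /** Rebuilds the read model by replaying the full event history in order. */
    static OrderView replay(List<OrderEvent> events) {
        OrderView state = null;
        for (OrderEvent event : events) {
            state = apply(state, event);
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(replay(List.of(
            new OrderCreated("o-1", "c-9"),
            new OrderConfirmed("o-1"))));
    }
}
```

Since the events are immutable and the fold is deterministic, the read model can be dropped and rebuilt at any time by replaying the topic from the beginning.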
7. Real-World Project: Real-Time Stock Ticker System
7.1 System Architecture
┌──────────────────────────────────────────────────────────────────┐
│                  Real-Time Stock Ticker System                   │
│                                                                  │
│  ┌─────────────┐     ┌──────────────┐     ┌─────────────────┐    │
│  │ Market Data │     │    Kafka     │     │     WebFlux     │    │
│  │  Providers  │────▶│   Cluster    │────▶│   API Gateway   │    │
│  │ (WebSocket) │     │              │     │                 │    │
│  └─────────────┘     │ raw-prices   │     │ SSE /prices     │    │
│                      │ processed    │     │ WS /ws/trade    │    │
│  ┌─────────────┐     │ alerts       │     │ gRPC /internal  │    │
│  │  News Feed  │────▶│              │     └───────┬─────────┘    │
│  │ (REST API)  │     └──────┬───────┘             │              │
│  └─────────────┘            │              ┌──────▼─────────┐    │
│                      ┌──────▼───────┐      │    Clients     │    │
│                      │ Kafka Streams│      │ Web Dashboard  │    │
│                      │  Processing  │      │ Mobile App     │    │
│                      │              │      │ Trading Bot    │    │
│                      │ - VWAP calc  │      └────────────────┘    │
│                      │ - Anomaly    │                            │
│                      │ - Aggregate  │      ┌────────────────┐    │
│                      └──────┬───────┘      │  PostgreSQL    │    │
│                             │              │  (R2DBC)       │    │
│                             └─────────────▶│  + Redis       │    │
│                                            └────────────────┘    │
└──────────────────────────────────────────────────────────────────┘
7.2 Market Data Ingestion Service
@Service
@RequiredArgsConstructor
@Slf4j
public class MarketDataIngestionService {
private final KafkaSender<String, String> kafkaSender;
private final ObjectMapper objectMapper;
private final WebSocketClient webSocketClient;
@PostConstruct
public void connect() {
URI uri = URI.create("wss://market-data-provider.example.com/stream");
webSocketClient.execute(uri, session -> {
// Subscribe to symbols
Mono<Void> subscribe = session.send(
Mono.just(session.textMessage(
"{\"action\":\"subscribe\",\"symbols\":[\"AAPL\",\"GOOG\",\"MSFT\",\"AMZN\"]}"
))
);
// Process incoming market data
Flux<Void> receive = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.flatMap(this::publishToKafka)
.doOnError(e -> log.error("Market data stream error", e));
return subscribe.thenMany(receive).then();
})
.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
.maxBackoff(Duration.ofMinutes(5)))
.subscribe();
}
private Mono<Void> publishToKafka(String rawData) {
try {
MarketTick tick = objectMapper.readValue(rawData, MarketTick.class);
ProducerRecord<String, String> record = new ProducerRecord<>(
"raw-prices", tick.getSymbol(), rawData);
record.headers()
.add("source", "market-data-provider".getBytes())
.add("timestamp", String.valueOf(tick.getTimestamp()).getBytes());
return kafkaSender.send(Mono.just(SenderRecord.create(record, tick.getSymbol())))
.next()
.then();
} catch (Exception e) {
log.error("Failed to parse market data: {}", rawData, e);
return Mono.empty();
}
}
}
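The `Retry.backoff(...).maxBackoff(...)` call above asks for exponentially growing reconnect delays with an upper bound. As a rough illustration of the resulting schedule, the sketch below doubles the delay on each attempt and caps it at the maximum. It is a simplification: Reactor also applies random jitter by default, which is left out here, and `BackoffSketch` and `delayFor` are hypothetical names.

```java
import java.time.Duration;

/** Simplified exponential backoff schedule (no jitter), for illustration only. */
final class BackoffSketch {

    /** Returns first * 2^attempt, capped at max; attempt 0 is the first retry. */
    static Duration delayFor(int attempt, Duration first, Duration max) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        // Bound the exponent so the shift itself cannot overflow
        long factor = 1L << Math.min(attempt, 30);
        long millis;
        try {
            millis = Math.multiplyExact(first.toMillis(), factor);
        } catch (ArithmeticException overflow) {
            return max;
        }
        return millis >= max.toMillis() ? max : Duration.ofMillis(millis);
    }

    public static void main(String[] args) {
        Duration first = Duration.ofSeconds(1);
        Duration max = Duration.ofMinutes(5);
        for (int attempt = 0; attempt <= 10; attempt++) {
            System.out.println("retry " + attempt + " after " + delayFor(attempt, first, max));
        }
    }
}
```

With a 1-second first delay and a 5-minute cap, the delay reaches the cap on the tenth retry and stays there, so a long provider outage results in one reconnect attempt roughly every five minutes.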
7.3 Price Processing with Kafka Streams
@Component
public class PriceProcessingTopology {
@Autowired
public void buildPipeline(StreamsBuilder builder) {
KStream<String, String> rawPrices = builder.stream("raw-prices");
// 1. Parse and validate
KStream<String, StockPrice> validPrices = rawPrices
.mapValues(this::parsePrice)
.filter((symbol, price) -> price != null && price.getPrice() > 0);
// 2. Calculate VWAP (Volume Weighted Average Price) per 1-minute window
KTable<Windowed<String>, VWAPResult> vwap = validPrices
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
.aggregate(
VWAPAccumulator::new,
(symbol, price, acc) -> acc.add(price.getPrice(), price.getVolume()),
Materialized.<String, VWAPAccumulator, WindowStore<Bytes, byte[]>>
as("vwap-store")
.withValueSerde(vwapAccumulatorSerde())
)
.mapValues(VWAPAccumulator::result);
// 3. Anomaly detection (price change > 5% in 30 seconds)
validPrices
.groupByKey()
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(30)))
.aggregate(
PriceRange::new,
(symbol, price, range) -> range.update(price.getPrice()),
Materialized.with(Serdes.String(), priceRangeSerde())
)
.toStream()
.filter((key, range) -> range.percentChange() > 5.0)
.mapValues((key, range) -> new PriceAlert(
key.key(), range.percentChange(), "ANOMALY"))
.to("alerts", Produced.with(windowedSerde(), priceAlertSerde()));
// 4. Output processed prices
validPrices
.mapValues(price -> new ProcessedPrice(
price.getSymbol(), price.getPrice(), price.getVolume(),
price.getTimestamp(), System.currentTimeMillis()))
.to("processed-prices", Produced.with(Serdes.String(), processedPriceSerde()));
}
}
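The topology above relies on two small state holders, `VWAPAccumulator` and `PriceRange`, whose code is not shown. The sketch below is one plausible shape for them, written as plain Java so the arithmetic is visible. The `...Sketch` class names, the `long` volume type, and the choice to measure percent change as the max-to-min spread relative to the minimum are all assumptions, and `result()` returns a `double` here rather than the `VWAPResult` type used in the topology.

```java
/** Hypothetical VWAP accumulator: sum(price * volume) / sum(volume). */
final class VwapAccumulatorSketch {
    private double priceVolumeSum;
    private long volumeSum;

    /** Adds one trade and returns this, matching the aggregator lambda's contract. */
    VwapAccumulatorSketch add(double price, long volume) {
        priceVolumeSum += price * volume;
        volumeSum += volume;
        return this;
    }

    /** Volume-weighted average price, or 0 when no volume has been seen. */
    double result() {
        return volumeSum == 0 ? 0.0 : priceVolumeSum / volumeSum;
    }
}

/** Hypothetical price range tracker used for the anomaly check. */
final class PriceRangeSketch {
    private double min = Double.POSITIVE_INFINITY;
    private double max = Double.NEGATIVE_INFINITY;

    /** Widens the range to include the given price and returns this. */
    PriceRangeSketch update(double price) {
        min = Math.min(min, price);
        max = Math.max(max, price);
        return this;
    }

    /** Spread between max and min as a percentage of min; 0 when the range is empty. */
    double percentChange() {
        if (max < min || min <= 0) {
            return 0.0;
        }
        return (max - min) / min * 100.0;
    }

    public static void main(String[] args) {
        VwapAccumulatorSketch vwap = new VwapAccumulatorSketch().add(100.0, 10).add(110.0, 30);
        PriceRangeSketch range = new PriceRangeSketch().update(100.0).update(106.0).update(103.0);
        System.out.println("VWAP=" + vwap.result() + " change%=" + range.percentChange());
    }
}
```

Both classes return `this` from their update methods because the Kafka Streams aggregator must return the new aggregate value for each record.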
7.4 SSE Streaming Endpoint for Web Clients
@RestController
@RequestMapping("/api/stocks")
@RequiredArgsConstructor
@Slf4j
public class StockStreamController {
private final KafkaReceiver<String, String> processedPriceReceiver;
private final KafkaReceiver<String, String> alertReceiver;
private final ObjectMapper objectMapper;
private final Sinks.Many<ProcessedPrice> priceSink;
@PostConstruct
public void init() {
// Bridge Kafka to reactive sink
processedPriceReceiver.receive()
// mapNotNull drops unparseable records; returning null from a plain map() would fail the stream
.mapNotNull(record -> {
record.receiverOffset().acknowledge();
try {
return objectMapper.readValue(record.value(), ProcessedPrice.class);
} catch (Exception e) {
log.warn("Skipping unparseable price record", e);
return null;
}
})
.subscribe(price -> priceSink.tryEmitNext(price));
}
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<ProcessedPrice>> streamPrices(
@RequestParam(required = false) List<String> symbols) {
Flux<ProcessedPrice> priceFlux = priceSink.asFlux();
if (symbols != null && !symbols.isEmpty()) {
Set<String> symbolSet = new HashSet<>(symbols);
priceFlux = priceFlux.filter(p -> symbolSet.contains(p.getSymbol()));
}
return priceFlux
.map(price -> ServerSentEvent.<ProcessedPrice>builder()
.id(price.getSymbol() + "-" + price.getTimestamp())
.event("price")
.data(price)
.build())
.doOnCancel(() -> log.info("Client disconnected from price stream"));
}
@GetMapping(value = "/alerts", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<PriceAlert>> streamAlerts() {
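// Note: a KafkaReceiver supports only one active subscription at a time, so serving several
// SSE clients requires bridging alerts through a shared sink, as init() does for prices
return alertReceiver.receive()
.mapNotNull(record -> {
record.receiverOffset().acknowledge();
try {
PriceAlert alert = objectMapper.readValue(record.value(), PriceAlert.class);
return ServerSentEvent.<PriceAlert>builder()
.event("alert")
.data(alert)
.build();
} catch (Exception e) {
// readValue throws a checked exception; skip the record rather than break the stream
log.warn("Skipping unparseable alert record", e);
return null;
}
});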
}
}
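Each `ServerSentEvent` built above ends up on the wire as a small text frame in the `text/event-stream` format: optional `id:` and `event:` lines, one `data:` line per line of payload, and a blank line that terminates the event. The sketch below formats such a frame by hand to make that visible. `SseFrameSketch` and `frame` are hypothetical names, and the exact whitespace Spring writes may differ slightly (the space after the colon is optional in the format).

```java
/** Hand-rolled formatter for the text/event-stream wire format, for illustration only. */
final class SseFrameSketch {

    /** Builds one SSE frame; id and event may be null, data may span several lines. */
    static String frame(String id, String event, String data) {
        StringBuilder sb = new StringBuilder();
        if (id != null) {
            sb.append("id: ").append(id).append('\n');
        }
        if (event != null) {
            sb.append("event: ").append(event).append('\n');
        }
        // Every payload line gets its own "data:" field; the client rejoins them with '\n'
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        // A blank line marks the end of the event
        sb.append('\n');
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(frame("AAPL-1700000000000", "price", "{\"symbol\":\"AAPL\",\"price\":189.5}"));
    }
}
```

On the browser side, an `EventSource` listener registered for the `price` event receives the rejoined `data` payload, and the last seen `id` is sent back in the `Last-Event-ID` header when the connection is re-established.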
7.5 Application Configuration
# application.yml
spring:
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/stockdb
    username: stockapp
    password: secret
  kafka:
    bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
    consumer:
      group-id: stock-ticker-service
      auto-offset-reset: latest
      enable-auto-commit: false
    producer:
      acks: all
      compression-type: zstd
      batch-size: 32768
      properties:
        linger.ms: 10
  # spring.webflux.base-path is left unset: the controllers above already map under /api,
  # so a base path of /api would expose them as /api/api/...
server:
  port: 8080
  netty:
    connection-timeout: 5s
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,prometheus
  metrics:
    tags:
      application: stock-ticker
7.6 Docker Compose for Local Development
version: '3.8'
services:
  kafka-1:
    image: apache/kafka:3.7.0
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
    ports:
      - '9092:9092'
  kafka-2:
    image: apache/kafka:3.7.0
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
  kafka-3:
    image: apache/kafka:3.7.0
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: stockdb
      POSTGRES_USER: stockapp
      POSTGRES_PASSWORD: secret
    ports:
      - '5432:5432'
  redis:
    image: redis:7-alpine
    ports:
      - '6379:6379'
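The `KAFKA_CONTROLLER_QUORUM_VOTERS` value above lists the KRaft controllers as `id@host:port` entries. Because the controllers agree on metadata through Raft, a majority of voters must be reachable, which is why three nodes tolerate one failure. The sketch below parses that string and does the majority arithmetic; `Voter` and `QuorumVotersSketch` are hypothetical names and the parser handles only the simple format shown in this file.

```java
import java.util.ArrayList;
import java.util.List;

/** One entry of the quorum voters list, e.g. 1@kafka-1:9093. */
record Voter(int nodeId, String host, int port) {}

final class QuorumVotersSketch {

    /** Parses a comma-separated list of id@host:port entries. */
    static List<Voter> parse(String voters) {
        List<Voter> result = new ArrayList<>();
        for (String entry : voters.split(",")) {
            String[] idAndAddress = entry.trim().split("@", 2);
            String address = idAndAddress[1];
            int colon = address.lastIndexOf(':');
            result.add(new Voter(
                Integer.parseInt(idAndAddress[0]),
                address.substring(0, colon),
                Integer.parseInt(address.substring(colon + 1))));
        }
        return result;
    }

    /** Smallest number of voters that forms a majority. */
    static int majority(int voterCount) {
        return voterCount / 2 + 1;
    }

    /** Number of voters that can fail while a majority remains. */
    static int tolerableFailures(int voterCount) {
        return voterCount - majority(voterCount);
    }

    public static void main(String[] args) {
        List<Voter> voters = parse("1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093");
        System.out.println(voters.size() + " voters, majority " + majority(voters.size())
            + ", tolerates " + tolerableFailures(voters.size()) + " failure(s)");
    }
}
```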
8. Decision Matrix: When to Use What
8.1 Technology Selection Guide
| Scenario | WebFlux | Kafka | SSE | WebSocket | gRPC |
|---|---|---|---|---|---|
| Real-time dashboard | Yes | Yes | Best | Good | Overkill |
| Chat application | Yes | Optional | No | Best | Good |
| IoT data pipeline | Yes | Best | No | Good | Best |
| E-commerce order processing | Optional | Best | No | No | Good |
| Live sports scores | Yes | Yes | Best | Good | No |
| Financial trading platform | Yes | Best | No | Best | Best |
| Microservice event bus | No | Best | No | No | Good |
| Server notifications | Yes | Optional | Best | Good | No |
| Video/audio streaming | No | No | No | Partial | Good |
| Log aggregation pipeline | No | Best | No | No | Good |
8.2 Decision Flowchart
Start: Do you need real-time data?
│
├── No → Traditional REST API (Spring MVC) is fine
│
└── Yes → Is it server-to-client only?
    │
    ├── Yes → SSE (simplest, auto-reconnect)
    │
    └── No → Do you need bidirectional communication?
        │
        ├── Yes → Is it browser-facing?
        │   │
        │   ├── Yes → WebSocket
        │   │
        │   └── No → gRPC Bidirectional Streaming
        │
        └── No → Do you need durable event storage?
            │
            ├── Yes → Kafka + consumer of choice
            │
            └── No → Direct WebFlux streaming
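The flowchart can also be read as a small decision function, which makes the branch order explicit and easy to test. The sketch below encodes exactly the questions asked above, in the same order. The `Transport` enum, the `Requirements` record, and `TransportChooserSketch` are hypothetical names introduced for illustration.

```java
/** Possible outcomes of the flowchart. */
enum Transport {
    REST_MVC, SSE, WEBSOCKET, GRPC_BIDI_STREAMING, KAFKA_WITH_CONSUMER, DIRECT_WEBFLUX_STREAMING
}

/** Answers to the flowchart's questions. */
record Requirements(
    boolean realTime,
    boolean serverToClientOnly,
    boolean bidirectional,
    boolean browserFacing,
    boolean durableEvents) {}

final class TransportChooserSketch {

    /** Walks the flowchart top to bottom and returns the first matching leaf. */
    static Transport choose(Requirements r) {
        if (!r.realTime()) {
            return Transport.REST_MVC;
        }
        if (r.serverToClientOnly()) {
            return Transport.SSE;
        }
        if (r.bidirectional()) {
            return r.browserFacing() ? Transport.WEBSOCKET : Transport.GRPC_BIDI_STREAMING;
        }
        return r.durableEvents() ? Transport.KAFKA_WITH_CONSUMER : Transport.DIRECT_WEBFLUX_STREAMING;
    }

    public static void main(String[] args) {
        // A browser dashboard that only receives updates
        System.out.println(choose(new Requirements(true, true, false, true, false)));
    }
}
```

In practice the leaves are not mutually exclusive: the stock ticker in section 7 combines Kafka for durable events with SSE for delivery to browsers.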
8.3 Performance Comparison by Workload
| Workload | Spring MVC | WebFlux | WebFlux + Kafka | Notes |
|---|---|---|---|---|
| 100 concurrent users | 50ms avg | 48ms | 52ms | Negligible difference |
| 10K concurrent users | 2s avg | 120ms | 150ms | WebFlux shines |
| 100K concurrent users | Timeout | 250ms | 300ms | MVC cannot handle |
| 1M events/sec ingest | N/A | Limited | 1.2M/s | Kafka essential |
| Complex event proc. | N/A | Possible | 500K/s | Kafka Streams advantage |
9. Six-Month Learning Roadmap
Month 1-2: Reactive Foundations
| Week | Topic | Deliverable |
|---|---|---|
| 1 | Reactive Streams spec, Publisher/Subscriber | Implement custom Publisher |
| 2 | Project Reactor: Mono/Flux fundamentals | 20 operator exercises |
| 3 | Error handling, retry, timeout patterns | Resilient API client |
| 4 | Testing reactive code (StepVerifier) | 90% test coverage on exercises |
| 5-6 | Spring WebFlux: controllers, functional endpoints | CRUD REST API (WebFlux) |
| 7-8 | R2DBC, WebClient, reactive security | Full-stack reactive application |
Month 3-4: Kafka Mastery
| Week | Topic | Deliverable |
|---|---|---|
| 9-10 | Kafka fundamentals: topics, partitions, consumers | Local 3-broker cluster setup |
| 11-12 | Spring Kafka: producer/consumer patterns | Order processing system |
| 13-14 | Kafka Streams: stateless and stateful operations | Real-time analytics pipeline |
| 15-16 | Exactly-once, transactions, Schema Registry | Transactional event system |
Month 5: Integration and Streaming APIs
| Week | Topic | Deliverable |
|---|---|---|
| 17 | SSE with WebFlux | Live notification system |
| 18 | WebSocket with WebFlux | Chat application |
| 19 | gRPC Streaming with Spring Boot | Internal service communication |
| 20 | Reactor Kafka + CQRS pattern | Event-sourced microservice |
Month 6: Production Readiness
| Week | Topic | Deliverable |
|---|---|---|
| 21 | KRaft migration, cluster operations | KRaft cluster deployment |
| 22 | Monitoring: Micrometer, Prometheus, Grafana | Observability dashboard |
| 23 | Performance tuning, load testing | Benchmark report |
| 24 | Capstone: Real-time stock ticker system | Production-ready project |
10. Quiz
Q1: What is the main advantage of Spring WebFlux over Spring MVC for I/O-bound workloads?
Answer: WebFlux uses a non-blocking event loop model (Netty) that can handle thousands of concurrent connections with just a few threads. Spring MVC uses a thread-per-request model where each concurrent request occupies a thread. For I/O-bound workloads (where threads spend most time waiting for database, network, or file operations), WebFlux is dramatically more efficient because it does not waste threads on waiting. In benchmarks, WebFlux can handle 7-8x more throughput with 77% less memory for I/O-bound workloads.
Q2: Explain the difference between flatMap, concatMap, and switchMap in Project Reactor.
Answer:
- `flatMap`: Maps each element to a Publisher and merges them eagerly. Inner publishers run concurrently and results may arrive out of order. Best for maximum throughput when order does not matter.
- `concatMap`: Maps each element to a Publisher and subscribes sequentially. Waits for each inner publisher to complete before subscribing to the next. Preserves order but lower throughput.
- `switchMap`: Maps each element to a Publisher, but cancels the previous inner publisher when a new element arrives. Only the latest subscription is active. Ideal for search-as-you-type where only the latest query result matters.
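To make the ordering difference concrete, the sketch below is a timing model of the three operators, not Reactor code. Each inner publisher is reduced to a name, an arrival time, and a duration, and emits a single result when it completes. `Inner` and `OperatorOrderingModel` are hypothetical names, and the model assumes the inputs are listed in arrival order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** One inner publisher: arrives at arrivalMs and emits its single result durationMs later. */
record Inner(String name, long arrivalMs, long durationMs) {
    long completesAt() {
        return arrivalMs + durationMs;
    }
}

/** Timing model of flatMap / concatMap / switchMap ordering; not Reactor itself. */
final class OperatorOrderingModel {

    /** flatMap-like: inners run concurrently, results surface in completion order. */
    static List<String> flatMapOrder(List<Inner> inners) {
        return inners.stream()
            .sorted(Comparator.comparingLong(Inner::completesAt)) // stable: ties keep source order
            .map(Inner::name)
            .toList();
    }

    /** concatMap-like: inners run one after another, results keep source order. */
    static List<String> concatMapOrder(List<Inner> inners) {
        return inners.stream().map(Inner::name).toList();
    }

    /** Time at which the last inner finishes when they run strictly in sequence. */
    static long concatMapFinishMs(List<Inner> inners) {
        long clock = 0;
        for (Inner inner : inners) {
            clock = Math.max(clock, inner.arrivalMs()) + inner.durationMs();
        }
        return clock;
    }

    /** switchMap-like: an inner is cancelled if the next element arrives before it completes. */
    static List<String> switchMapOrder(List<Inner> inners) {
        List<String> survivors = new ArrayList<>();
        for (int i = 0; i < inners.size(); i++) {
            boolean isLast = i == inners.size() - 1;
            if (isLast || inners.get(i + 1).arrivalMs() >= inners.get(i).completesAt()) {
                survivors.add(inners.get(i).name());
            }
        }
        return survivors;
    }

    public static void main(String[] args) {
        List<Inner> inners = List.of(
            new Inner("A", 0, 300), new Inner("B", 100, 100), new Inner("C", 150, 50));
        System.out.println("flatMap:   " + flatMapOrder(inners));
        System.out.println("concatMap: " + concatMapOrder(inners) + " done at " + concatMapFinishMs(inners) + "ms");
        System.out.println("switchMap: " + switchMapOrder(inners));
    }
}
```

In this example the slow first inner finishes last under the flatMap model, keeps its place under concatMap at the cost of a longer total time, and is cancelled under switchMap because newer elements arrive before it completes.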
Q3: Why did Kafka replace ZooKeeper with KRaft? Name three specific technical benefits.
Answer:
- Partition scalability: ZooKeeper stored all metadata in memory, limiting clusters to around 200,000 partitions. KRaft uses a log-based metadata store, enabling millions of partitions.
- Faster failover: Controller failover in ZooKeeper mode required reading all metadata from ZooKeeper (minutes). KRaft failover takes seconds because the new controller already has metadata replicated via Raft.
- Simplified operations: With ZooKeeper, teams had to deploy, configure, monitor, and maintain two separate distributed systems. KRaft eliminates the ZooKeeper dependency entirely, reducing operational complexity.
Additional benefits include faster controlled shutdown, elimination of split-brain risks between Kafka and ZooKeeper, and event-driven metadata propagation (versus ZooKeeper's asynchronous, potentially laggy propagation).
Q4: When would you choose SSE over WebSocket for a real-time feature?
Answer: Choose SSE when:
- Communication is server-to-client only — stock tickers, news feeds, notifications, dashboards. No need for the client to send data back on the same connection.
- You need auto-reconnect — SSE has built-in browser reconnection. WebSocket requires manual reconnection logic.
- HTTP infrastructure compatibility — SSE works over standard HTTP/1.1, passing through proxies and load balancers without special configuration. WebSocket upgrades can be blocked by corporate firewalls.
- Simplicity — SSE is simpler to implement (just a GET endpoint returning text/event-stream). WebSocket requires connection management, heartbeats, and reconnection logic.
Choose WebSocket when you need bidirectional communication (chat, gaming, collaborative editing) or binary data transfer.
Q5: Design a system that uses WebFlux, Kafka, and SSE together. Describe the data flow from event generation to client display.
Answer: Consider a real-time order tracking system:
- Event Generation: When an order status changes, the Order Service publishes an `OrderStatusChanged` event to a Kafka topic (`order-events`) using Reactor Kafka. The producer uses `acks=all` for durability.
- Event Processing: A Kafka Streams application consumes from `order-events`, enriches events with customer data (KTable join from the `customer-profiles` topic), and produces enriched events to the `order-notifications` topic.
- WebFlux Bridge: A WebFlux service uses `KafkaReceiver` (Reactor Kafka) to reactively consume from `order-notifications`. It routes events into a `Sinks.Many` (multicast hot publisher), keyed by customer ID.
- SSE Endpoint: The WebFlux controller exposes an SSE endpoint `/api/orders/track` that filters the `Sinks.Many` flux by the authenticated user's customer ID. Each matching event is sent as a `ServerSentEvent`.
- Client: The browser creates an `EventSource` pointing to the SSE endpoint. On each event, the UI updates the order tracking visualization. If the connection drops, the browser auto-reconnects (SSE built-in).

Data flow: Order Service → Kafka → Kafka Streams → Kafka → Reactor Kafka → Sinks.Many → SSE → Browser.
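The bridge and endpoint steps come down to one shared stream fanned out to many subscribers, each seeing only its own customer's events. The sketch below models that fan-out with plain JDK types as a stand-in for `Sinks.Many` plus a per-client `filter`. It is synchronous and has no backpressure, so it illustrates the routing only. `NotificationHubSketch`, `OrderNotification`, and their members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical enriched event as it would arrive from the order-notifications topic. */
record OrderNotification(String customerId, String orderId, String status) {}

/** Synchronous stand-in for a multicast sink with per-subscriber filtering. */
final class NotificationHubSketch {

    private record Subscription(Predicate<OrderNotification> filter, Consumer<OrderNotification> sink) {}

    private final List<Subscription> subscriptions = new CopyOnWriteArrayList<>();

    /** Registers a subscriber for one customer; the returned handle cancels the subscription. */
    Runnable subscribe(String customerId, Consumer<OrderNotification> sink) {
        Subscription subscription =
            new Subscription(notification -> notification.customerId().equals(customerId), sink);
        subscriptions.add(subscription);
        return () -> subscriptions.remove(subscription);
    }

    /** Delivers a notification to every subscriber whose filter matches. */
    void publish(OrderNotification notification) {
        for (Subscription subscription : subscriptions) {
            if (subscription.filter().test(notification)) {
                subscription.sink().accept(notification);
            }
        }
    }

    public static void main(String[] args) {
        NotificationHubSketch hub = new NotificationHubSketch();
        List<String> seenByAlice = new ArrayList<>();
        Runnable cancel = hub.subscribe("alice", n -> seenByAlice.add(n.orderId() + ":" + n.status()));
        hub.publish(new OrderNotification("alice", "o-1", "SHIPPED"));
        hub.publish(new OrderNotification("bob", "o-2", "PACKED"));
        cancel.run(); // models the browser closing its EventSource
        hub.publish(new OrderNotification("alice", "o-1", "DELIVERED"));
        System.out.println(seenByAlice);
    }
}
```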