Skip to content
Published on

WebFlux + Kafka + ZooKeeper 완전 정복: 리액티브 스트리밍 아키텍처의 모든 것

Authors

들어가며

현대 소프트웨어 시스템은 "실시간"을 요구합니다. 주식 시세는 밀리초 단위로 업데이트되고, 채팅 메시지는 즉시 전달되어야 하며, IoT 센서 데이터는 끊임없이 쏟아집니다. 전통적인 요청-응답(Request-Response) 모델로는 이러한 요구사항을 감당하기 어렵습니다.

이 글에서는 실시간 데이터 처리 아키텍처의 핵심 기술 세 가지를 깊이 있게 다룹니다.

  1. Spring WebFlux — 리액티브 웹 프레임워크로 높은 동시접속을 효율적으로 처리
  2. Apache Kafka — 분산 이벤트 스트리밍 플랫폼으로 대용량 데이터를 안정적으로 전달
  3. Streaming API — SSE, WebSocket, gRPC Streaming으로 클라이언트에 실시간 데이터 푸시

또한 Kafka의 역사적 전환점인 ZooKeeper에서 KRaft로의 마이그레이션도 상세히 살펴봅니다. 각 기술의 동작 원리부터 실전 코드 예제, 성능 벤치마크, 아키텍처 패턴까지 — 리액티브 스트리밍 아키텍처의 모든 것을 한 곳에 담았습니다.


1. 리액티브 프로그래밍이란? (기초부터)

Reactive Manifesto

리액티브 시스템은 2014년에 발표된 Reactive Manifesto에서 네 가지 핵심 속성을 정의합니다.

  • Responsive (응답성): 시스템은 가능한 한 적시에 응답해야 합니다. 응답성은 사용성과 유용성의 기초입니다.
  • Resilient (탄력성): 시스템은 장애에 직면하더라도 응답성을 유지합니다. 복제(replication), 격리(containment), 위임(delegation)으로 달성합니다.
  • Elastic (유연성): 시스템은 작업 부하의 변화에 따라 자원을 증감시키며 응답성을 유지합니다.
  • Message Driven (메시지 기반): 리액티브 시스템은 비동기 메시지 전달에 의존하여 컴포넌트 간 느슨한 결합, 격리, 위치 투명성을 보장합니다.
         ┌─────────────────────────────────┐
Responsive              (빠르고 일관된 응답)         └──────────┬──────────┬───────────┘
                    │          │
         ┌──────────▼──┐  ┌───▼──────────┐
Resilient  │  │   Elastic          (장애 복원력) (부하 적응력)         └──────────┬──┘  └───┬──────────┘
                    │          │
         ┌──────────▼──────────▼───────────┐
Message Driven             (비동기 메시지 기반 통신)         └─────────────────────────────────┘

명령형 vs 리액티브 패러다임 비교

명령형(Imperative) 프로그래밍은 "무엇을 어떻게 할지" 단계별로 지시합니다.

// 명령형: 블로킹 방식
List<User> users = userRepository.findAll();          // DB 호출 - 스레드 블로킹
List<User> activeUsers = new ArrayList<>();
for (User user : users) {
    if (user.isActive()) {
        activeUsers.add(user);
    }
}
List<String> names = new ArrayList<>();
for (User user : activeUsers) {
    names.add(user.getName());
}
return names;

리액티브(Reactive) 프로그래밍은 "데이터 흐름과 변화 전파"를 선언적으로 표현합니다.

// 리액티브: 논블로킹 방식
return userRepository.findAll()       // Flux<User> 반환 - 논블로킹
    .filter(User::isActive)           // 활성 사용자 필터링
    .map(User::getName)               // 이름 추출
    .collectList();                   // Mono<List<String>> 반환

핵심 차이는 다음과 같습니다.

항목명령형리액티브
실행 모델동기/블로킹비동기/논블로킹
스레드 사용요청당 하나의 스레드소수의 이벤트 루프 스레드
데이터 처리Pull 기반 (소비자가 요청)Push 기반 (생산자가 전달)
에러 처리try-catchonError 시그널
백프레셔없음 (또는 수동 구현)내장 지원
적합한 상황낮은 동시접속, 간단한 로직높은 동시접속, I/O 집약적

Reactive Streams 스펙

Reactive Streams는 JVM 상에서 논블로킹 백프레셔를 지원하는 비동기 스트림 처리의 표준입니다. Java 9의 java.util.concurrent.Flow에 포함되었으며, 핵심 인터페이스는 네 가지입니다.

// 1. Publisher: 데이터를 생산하는 주체
public interface Publisher<T> {
    void subscribe(Subscriber<? super T> s);
}

// 2. Subscriber: 데이터를 소비하는 주체
public interface Subscriber<T> {
    void onSubscribe(Subscription s);  // 구독 시작
    void onNext(T t);                  // 데이터 수신
    void onError(Throwable t);         // 에러 발생
    void onComplete();                 // 스트림 완료
}

// 3. Subscription: Publisher-Subscriber 간 연결 관리
public interface Subscription {
    void request(long n);              // n개의 데이터 요청 (백프레셔)
    void cancel();                     // 구독 취소
}

// 4. Processor: Publisher이면서 Subscriber (중간 처리)
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

데이터 흐름의 시퀀스는 다음과 같습니다.

Publisher                     Subscriber
   │                              │
   │◄──── subscribe() ────────────│
   │                              │
   │───── onSubscribe(sub) ──────►│
   │                              │
   │◄──── sub.request(n) ─────────│  ← 백프레셔: n개만 요청
   │                              │
   │───── onNext(data1) ─────────►│
   │───── onNext(data2) ─────────►│
   │───── ...   │───── onNext(dataN) ─────────►│
   │                              │
   │◄──── sub.request(m) ─────────│  ← 추가 m개 요청
   │                              │
   │───── onComplete() ──────────►│  (또는 onError())

백프레셔(Backpressure)의 중요성과 동작 원리

백프레셔는 소비자가 처리할 수 있는 속도에 맞춰 생산자의 데이터 전송 속도를 조절하는 메커니즘입니다. 백프레셔가 없으면 다음과 같은 문제가 발생합니다.

  • 메모리 초과(OOM): 빠른 생산자가 느린 소비자의 버퍼를 가득 채움
  • 데이터 유실: 버퍼 오버플로우로 데이터가 드롭됨
  • 시스템 장애: 연쇄적인 장애 전파 (Cascading Failure)

백프레셔 전략에는 여러 가지가 있습니다.

// 1. Buffer: 일정 크기까지 버퍼링 후 오버플로우 시 에러
Flux.range(1, 1000)
    .onBackpressureBuffer(256)
    .subscribe(item -> processSlowly(item));

// 2. Drop: 처리 못하는 데이터는 버림
Flux.range(1, 1000)
    .onBackpressureDrop(dropped ->
        log.warn("Dropped: " + dropped))
    .subscribe(item -> processSlowly(item));

// 3. Latest: 최신 값만 유지
Flux.range(1, 1000)
    .onBackpressureLatest()
    .subscribe(item -> processSlowly(item));

// 4. Error: 처리 못하면 에러 시그널
Flux.range(1, 1000)
    .onBackpressureError()
    .subscribe(item -> processSlowly(item));

2. Spring WebFlux 딥다이브

WebFlux vs Spring MVC 아키텍처 비교

Spring MVC와 WebFlux는 근본적으로 다른 실행 모델을 사용합니다.

Spring MVC: Thread-per-Request 모델

Client Request ──► Tomcat Thread Pool (200 threads)
                   Thread-1 ──► Controller ──► Service ──► DB (블로킹 대기)
                   Thread-2 ──► Controller ──► Service ──► DB (블로킹 대기)
                   Thread-3 ──► Controller ──► Service ──► External API (블로킹 대기)
                   ...
                   Thread-200 ──► (모든 스레드 고갈 시 요청 대기열에 쌓임)
  • 기본 스레드 풀 크기: Tomcat 기준 200개
  • 각 요청이 하나의 스레드를 점유 (I/O 대기 중에도)
  • 동시접속 200개 초과 시 요청 대기 발생

Spring WebFlux: Event Loop 모델

Client Request ──► Netty Event Loop (CPU 코어 수 = N개 스레드)
                   EventLoop-1 ──► Handler1 ──► (논블로킹 I/O 시작) ──► 다음 요청 처리
                   EventLoop-2 ──► Handler2 ──► (논블로킹 I/O 시작) ──► 다음 요청 처리
                   ...
                   EventLoop-N ──► (I/O 완료 콜백) ──► 응답 전송
  • 이벤트 루프 스레드 수: CPU 코어 수 (예: 8코어 = 8스레드)
  • 요청이 스레드를 점유하지 않음 (I/O 대기 시 해제)
  • 적은 스레드로 수만 개의 동시접속 처리 가능

Netty 이벤트 루프 모델

Netty는 WebFlux의 기본 런타임 서버입니다. Netty의 핵심 구조를 살펴보겠습니다.

┌────────────────────────────────────────────────────┐
Netty Server│                                                    │
│  ┌──────────────┐    ┌──────────────────────────┐  │
│  │  Boss Group   │    │     Worker Group          │  │
  (1 thread)  (CPU cores x 2 threads) │  │
│  │              │    │                          │  │
│  │  Accept      │───►│  EventLoop-1: Channel A  │  │
│  │  connections │    │  EventLoop-2: Channel B  │  │
│  │              │    │  EventLoop-3: Channel C  │  │
│  │              │    │  ...                     │  │
│  └──────────────┘    └──────────────────────────┘  │
│                                                    │
Channel Pipeline:│  ┌─────────┬──────────┬───────────┬────────────┐   │
│  │ DecoderHandler1Handler2Encoder   │   │
│  └─────────┴──────────┴───────────┴────────────┘   │
└────────────────────────────────────────────────────┘
  • Boss Group: 클라이언트 연결을 수락하고 Worker Group에 위임
  • Worker Group: 실제 I/O 처리를 담당하는 이벤트 루프 스레드
  • Channel Pipeline: 인바운드/아웃바운드 핸들러 체인으로 데이터 처리

Project Reactor: Mono vs Flux

Project Reactor는 Spring WebFlux의 리액티브 라이브러리입니다. 핵심 타입 두 가지를 이해하는 것이 중요합니다.

Mono: 0개 또는 1개의 요소를 방출하는 비동기 시퀀스

// Mono 생성
Mono<String> mono1 = Mono.just("Hello");
Mono<String> mono2 = Mono.empty();
Mono<String> mono3 = Mono.error(new RuntimeException("Error"));

// 실전: 사용자 단건 조회
@GetMapping("/users/me")
public Mono<UserResponse> getCurrentUser(Authentication auth) {
    return userRepository.findByEmail(auth.getName())  // Mono<User>
        .map(UserResponse::from)                        // Mono<UserResponse>
        .switchIfEmpty(Mono.error(new UserNotFoundException()));
}

Flux: 0개에서 N개의 요소를 방출하는 비동기 시퀀스

// Flux 생성
Flux<Integer> flux1 = Flux.just(1, 2, 3, 4, 5);
Flux<Integer> flux2 = Flux.range(1, 100);
Flux<Long> flux3 = Flux.interval(Duration.ofSeconds(1));  // 무한 스트림

// 실전: 사용자 목록 조회
@GetMapping("/users")
public Flux<UserResponse> getActiveUsers() {
    return userRepository.findAll()             // Flux<User>
        .filter(User::isActive)                 // 활성 사용자만
        .map(UserResponse::from)                // DTO 변환
        .take(100);                             // 최대 100명
}

핵심 Operators 가이드

리액티브 프로그래밍의 핵심은 오퍼레이터 조합입니다. 가장 많이 사용하는 오퍼레이터들을 살펴봅니다.

// 1. map: 동기 변환 (1:1)
Flux.just("hello", "world")
    .map(String::toUpperCase)
    // 결과: "HELLO", "WORLD"

// 2. flatMap: 비동기 변환 (1:N, 순서 보장 없음)
Flux.just(1L, 2L, 3L)
    .flatMap(id -> userRepository.findById(id))  // 각 ID로 비동기 조회
    // 결과: 순서가 보장되지 않음 (User2, User1, User3 가능)

// 3. concatMap: 비동기 변환 (1:N, 순서 보장)
Flux.just(1L, 2L, 3L)
    .concatMap(id -> userRepository.findById(id))
    // 결과: 순서 보장 (User1, User2, User3)

// 4. filter: 조건 필터링
Flux.range(1, 10)
    .filter(n -> n % 2 == 0)
    // 결과: 2, 4, 6, 8, 10

// 5. zip: 여러 소스를 조합 (1:1 매칭)
Mono<User> user = userRepository.findById(userId);
Mono<List<Order>> orders = orderRepository.findByUserId(userId).collectList();
Mono.zip(user, orders)
    .map(tuple -> new UserWithOrders(tuple.getT1(), tuple.getT2()));

// 6. merge: 여러 소스를 병합 (인터리빙)
Flux.merge(
    hotNewsFlux,     // 실시간 뉴스
    stockPriceFlux,  // 주식 시세
    alertFlux        // 알림
)
.subscribe(event -> dashboard.update(event));

// 7. concat: 순차적 연결
Flux.concat(
    cacheRepository.findById(id),      // 캐시 먼저
    databaseRepository.findById(id)    // 캐시 미스 시 DB
)
.next();  // 첫 번째 결과만

// 8. switchIfEmpty: 빈 결과 처리
userRepository.findByEmail(email)
    .switchIfEmpty(Mono.defer(() -> createDefaultUser(email)));

// 9. retry + backoff: 재시도 로직
webClient.get()
    .uri("/external-api/data")
    .retrieve()
    .bodyToMono(ExternalData.class)
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
        .maxBackoff(Duration.ofSeconds(10))
        .filter(ex -> ex instanceof WebClientResponseException.ServiceUnavailable));

// 10. onErrorResume: 에러 복구
userService.getUserProfile(userId)
    .onErrorResume(UserNotFoundException.class,
        ex -> Mono.just(UserProfile.anonymous()))
    .onErrorResume(ServiceException.class,
        ex -> cachedProfileService.getCachedProfile(userId));

WebFlux + R2DBC (리액티브 DB 접근)

R2DBC(Reactive Relational Database Connectivity)는 관계형 데이터베이스에 대한 논블로킹 접근을 제공합니다.

// build.gradle
// implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
// implementation 'io.r2dbc:r2dbc-postgresql'

// Entity
@Table("users")
public record User(
    @Id Long id,
    String email,
    String name,
    boolean active,
    LocalDateTime createdAt
) {}

// Repository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {

    Flux<User> findByActiveTrue();

    @Query("SELECT * FROM users WHERE email = :email")
    Mono<User> findByEmail(String email);

    @Query("SELECT * FROM users WHERE created_at > :since ORDER BY created_at DESC")
    Flux<User> findRecentUsers(LocalDateTime since);
}

// Service
@Service
@RequiredArgsConstructor
public class UserService {

    private final UserRepository userRepository;
    private final DatabaseClient databaseClient;

    public Flux<User> getActiveUsers() {
        return userRepository.findByActiveTrue();
    }

    // 복잡한 쿼리는 DatabaseClient 사용
    public Flux<UserStats> getUserStats() {
        return databaseClient
            .sql("""
                SELECT u.id, u.name, COUNT(o.id) as order_count
                FROM users u
                LEFT JOIN orders o ON u.id = o.user_id
                GROUP BY u.id, u.name
                HAVING COUNT(o.id) > 0
                """)
            .map(row -> new UserStats(
                row.get("id", Long.class),
                row.get("name", String.class),
                row.get("order_count", Long.class)
            ))
            .all();
    }
}

WebFlux + WebClient (리액티브 HTTP 클라이언트)

WebClient는 RestTemplate을 대체하는 논블로킹 HTTP 클라이언트입니다.

@Configuration
public class WebClientConfig {

    @Bean
    public WebClient webClient() {
        return WebClient.builder()
            .baseUrl("https://api.example.com")
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .filter(ExchangeFilterFunctions.basicAuthentication("user", "password"))
            .codecs(configurer -> configurer
                .defaultCodecs()
                .maxInMemorySize(16 * 1024 * 1024))  // 16MB
            .build();
    }
}

@Service
@RequiredArgsConstructor
public class ExternalApiService {

    private final WebClient webClient;

    // GET 요청
    public Mono<ProductResponse> getProduct(String productId) {
        return webClient.get()
            .uri("/products/{id}", productId)
            .retrieve()
            .onStatus(HttpStatusCode::is4xxClientError,
                response -> Mono.error(new ProductNotFoundException(productId)))
            .onStatus(HttpStatusCode::is5xxServerError,
                response -> Mono.error(new ExternalServiceException()))
            .bodyToMono(ProductResponse.class)
            .timeout(Duration.ofSeconds(5))
            .retryWhen(Retry.backoff(3, Duration.ofMillis(500)));
    }

    // POST 요청
    public Mono<OrderResponse> createOrder(OrderRequest request) {
        return webClient.post()
            .uri("/orders")
            .bodyValue(request)
            .retrieve()
            .bodyToMono(OrderResponse.class);
    }

    // Streaming 응답
    public Flux<EventData> streamEvents() {
        return webClient.get()
            .uri("/events/stream")
            .accept(MediaType.TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(EventData.class);
    }
}

성능 벤치마크: WebFlux vs MVC

다양한 동시접속 수준에서의 벤치마크 결과입니다. (4코어 8GB, 100ms 지연 시뮬레이션)

┌──────────────────┬────────────┬────────────┬────────────┐
│ 동시접속 수       │ Spring MVCWebFlux    │ 차이       │
├──────────────────┼────────────┼────────────┼────────────┤
1,0008,500 rps  │ 9,200 rps  │ +8%10,0003,200 rps  │ 9,100 rps  │ +184%100,000          │ 타임아웃    │ 8,800 rps  │ MVC 실패   │
├──────────────────┼────────────┼────────────┼────────────┤
│ 메모리 사용      │ 2.1 GB0.8 GB-62%│ 스레드 수        │ 20416-92%P99 응답시간     │ 1,200ms    │ 180ms      │ -85% (동시접속 10K)   │            │            │            │
└──────────────────┴────────────┴────────────┴────────────┘

핵심 포인트:

  • 낮은 동시접속(1,000 이하): MVC와 WebFlux의 성능 차이가 크지 않음
  • 높은 동시접속(10,000 이상): WebFlux가 압도적으로 유리
  • 초고동시접속(100,000): MVC는 스레드 고갈로 사실상 작동 불가

주의점: 블로킹 코드 감지

WebFlux에서 가장 흔한 실수는 이벤트 루프 스레드에서 블로킹 코드를 실행하는 것입니다.

// 절대 금지: 이벤트 루프에서 블로킹 호출
@GetMapping("/bad-example")
public Mono<String> badExample() {
    // JDBC는 블로킹 — 이벤트 루프를 멈추게 함
    User user = jdbcTemplate.queryForObject("SELECT * FROM users WHERE id = 1", User.class);
    return Mono.just(user.getName());
}

// 올바른 방법 1: R2DBC 사용
@GetMapping("/good-example-1")
public Mono<String> goodExample1() {
    return r2dbcUserRepository.findById(1L)
        .map(User::getName);
}

// 올바른 방법 2: 블로킹 코드를 별도 스케줄러에서 실행
@GetMapping("/good-example-2")
public Mono<String> goodExample2() {
    return Mono.fromCallable(() ->
            jdbcTemplate.queryForObject("SELECT * FROM users WHERE id = 1", User.class))
        .subscribeOn(Schedulers.boundedElastic())  // 블로킹 전용 스레드풀
        .map(User::getName);
}

블로킹 코드를 자동으로 감지하려면 Blockhound를 사용할 수 있습니다.

// build.gradle
// testImplementation 'io.projectreactor.tools:blockhound:1.0.9.RELEASE'

// 테스트 설정
@BeforeAll
static void setUp() {
    BlockHound.install();
}

@Test
void shouldNotBlock() {
    // 블로킹 코드가 있으면 BlockingOperationError 발생
    StepVerifier.create(userService.getActiveUsers())
        .expectNextCount(5)
        .verifyComplete();
}

3. Apache Kafka 완전 가이드

Kafka 아키텍처 개요

Apache Kafka는 LinkedIn에서 개발한 분산 이벤트 스트리밍 플랫폼입니다. 초당 수백만 건의 이벤트를 안정적으로 처리할 수 있습니다.

┌─────────────────────────────────────────────────────────────┐
Kafka Cluster│                                                             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │  Broker 1   │  │  Broker 2   │  │  Broker 3   │         │
│  │             │  │             │  │             │         │
│  │ Topic-A P0  │  │ Topic-A P1  │  │ Topic-A P2Leader│  │ Topic-B P1R │  │ Topic-B P0  │  │ Topic-A P0RReplica│
│  │ Topic-A P2R │  │ Topic-A P0R │  │ Topic-B P1  │         │
│  └─────────────┘  └─────────────┘  └─────────────┘         │
│                                                             │
P = Partition (Leader),  R = Replica (Follower)└─────────────────────────────────────────────────────────────┘
         ▲                                    │
         │                                    ▼
   ┌─────────────┐                   ┌─────────────────┐
Producers  │                   │  Consumer Group   │             │                   │                 │
App1, App2  │                   │ Consumer1P0   │             │                   │ Consumer2P1   │             │                   │ Consumer3P2   └─────────────┘                   └─────────────────┘

핵심 구성 요소:

  • Broker: Kafka 서버 인스턴스. 클러스터에 여러 브로커가 존재
  • Topic: 메시지를 구분하는 논리적 카테고리 (예: user-events, order-events)
  • Partition: 토픽을 분할하는 물리적 단위. 병렬 처리의 기본 단위
  • Replica: 파티션의 복제본. Leader와 Follower로 구분
  • ISR (In-Sync Replicas): Leader와 동기화된 Replica 집합

Producer: 메시지 전송의 모든 것

// Producer 설정
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // 성능 튜닝
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);        // 32KB 배치
        config.put(ProducerConfig.LINGER_MS_CONFIG, 20);            // 20ms 대기
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");  // 압축
        config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);  // 64MB 버퍼

        // 안정성
        config.put(ProducerConfig.ACKS_CONFIG, "all");              // 모든 ISR 확인
        config.put(ProducerConfig.RETRIES_CONFIG, 3);               // 재시도
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 멱등성 보장

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Acknowledgment 모드 비교:

acks 값동작안정성성능
0전송 후 확인 안 함낮음 (데이터 유실 가능)최고
1Leader만 확인중간 (Leader 장애 시 유실 가능)높음
all (-1)모든 ISR 확인최고 (ISR 모두 확인)보통

Idempotent Producer: enable.idempotence=true 설정 시, 동일 메시지를 중복 전송하더라도 한 번만 기록합니다. Producer ID와 Sequence Number를 사용하여 중복을 감지합니다.

Consumer: 메시지 소비의 모든 것

// Consumer 설정
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service-group");

        // 오프셋 관리
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // 수동 커밋

        // 성능 튜닝
        config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);      // 1KB 최소 패치
        config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);     // 500ms 대기
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);      // 최대 500건

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);  // 3개의 컨슈머 스레드
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

// Consumer 리스너
@Service
@Slf4j
public class OrderEventConsumer {

    @KafkaListener(
        topics = "order-events",
        groupId = "order-service-group",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderEvent(
            ConsumerRecord<String, OrderEvent> record,
            Acknowledgment ack) {
        try {
            log.info("Received: topic={}, partition={}, offset={}, key={}",
                record.topic(), record.partition(), record.offset(), record.key());

            OrderEvent event = record.value();
            processOrderEvent(event);

            ack.acknowledge();  // 수동 커밋
        } catch (Exception e) {
            log.error("Failed to process order event", e);
            // 에러 처리: DLQ(Dead Letter Queue)로 전송하거나 재시도
        }
    }
}

Consumer Group과 파티션 할당:

Topic: order-events (6 partitions)

Consumer Group: order-service-group
  Consumer-1: P0, P1  (2개 파티션 담당)
  Consumer-2: P2, P3  (2개 파티션 담당)
  Consumer-3: P4, P5  (2개 파티션 담당)

규칙: 하나의 파티션은 그룹 내 하나의 컨슈머만 소비 가능
     컨슈머 수 > 파티션 수이면 일부 컨슈머는 유휴 상태

리밸런싱(Rebalancing): 컨슈머가 그룹에 참가하거나 탈퇴하면 파티션 재할당이 발생합니다. 리밸런싱 중에는 모든 컨슈머가 일시적으로 소비를 중단합니다.

Exactly-Once Semantics (EOS)

Kafka의 전달 보증 수준은 세 가지입니다.

  • At-Most-Once: 메시지가 유실될 수 있지만 중복은 없음
  • At-Least-Once: 메시지가 중복될 수 있지만 유실은 없음
  • Exactly-Once: 메시지가 정확히 한 번만 처리됨

Exactly-Once를 달성하려면 Transactional Producer와 Consumer를 함께 사용합니다.

// Transactional Producer 설정
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-producer-1");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

// 트랜잭션 사용 예제
@Service
@RequiredArgsConstructor
public class OrderProcessor {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public void processOrder(Order order) {
        kafkaTemplate.executeInTransaction(ops -> {
            // 하나의 트랜잭션으로 묶임
            ops.send("order-confirmed", order.getId(), new OrderConfirmedEvent(order));
            ops.send("inventory-update", order.getProductId(), new InventoryUpdateEvent(order));
            ops.send("notification", order.getUserId(), new NotificationEvent(order));
            return true;
        });
        // 3개의 메시지가 모두 성공하거나 모두 실패
    }
}

Kafka Streams 개요

Kafka Streams는 Kafka 위에 구축된 스트림 처리 라이브러리입니다.

// KStream: 이벤트 스트림 (각 레코드가 독립적)
// KTable: 변경 로그 (키의 최신 값만 유지)

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean
    public KStream<String, OrderEvent> orderStream(StreamsBuilder builder) {
        KStream<String, OrderEvent> stream = builder.stream("order-events");

        // 1. 필터링 + 변환
        stream
            .filter((key, event) -> event.getAmount() > 10000)  // 1만원 이상
            .mapValues(event -> new HighValueOrderEvent(event))
            .to("high-value-orders");

        // 2. 윈도우 집계: 5분 간격 주문 금액 합계
        stream
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .aggregate(
                () -> 0L,
                (key, event, total) -> total + event.getAmount(),
                Materialized.as("order-amount-store")
            )
            .toStream()
            .to("order-amount-per-5min");

        // 3. KStream-KTable Join
        KTable<String, UserProfile> userTable = builder.table("user-profiles");
        stream
            .join(userTable,
                (order, user) -> new EnrichedOrder(order, user))
            .to("enriched-orders");

        return stream;
    }
}

Kafka Connect와 Debezium CDC

Kafka Connect는 외부 시스템과 Kafka 간 데이터를 안정적으로 이동시키는 프레임워크입니다.

{
  "name": "mysql-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-server",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "password",
    "database.server.id": "1",
    "topic.prefix": "myapp",
    "database.include.list": "orders_db",
    "table.include.list": "orders_db.orders,orders_db.order_items",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes"
  }
}

Debezium은 CDC(Change Data Capture) 커넥터로, 데이터베이스의 변경 사항을 실시간으로 Kafka 토픽에 스트리밍합니다. INSERT, UPDATE, DELETE 이벤트가 모두 캡처됩니다.

Schema Registry + Avro

스키마 레지스트리는 Kafka 메시지의 스키마를 중앙에서 관리하여 호환성을 보장합니다.

// user-event.avsc (Avro 스키마)
{
  "type": "record",
  "name": "UserEvent",
  "namespace": "com.example.events",
  "fields": [
    {"name": "userId", "type": "string"},
    {"name": "action", "type": {"type": "enum", "name": "Action",
      "symbols": ["CREATED", "UPDATED", "DELETED"]}},
    {"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}

성능 튜닝 가이드

┌──────────────────────────────────────────────────────────────┐
Producer 튜닝                              │
├──────────────────┬───────────────────────────────────────────┤
│ batch.size16KB(기본)32~64KB (처리량 증가)│ linger.ms0(기본)10~50ms (배치 효율성)│ compression.type │ none → lz4 (속도), zstd (압축률)│ buffer.memory32MB(기본)64~128MB (대량 전송 시)│ max.in.flight5(기본)1 (순서 보장 필요 시)├──────────────────┴───────────────────────────────────────────┤
Consumer 튜닝                              │
├──────────────────┬───────────────────────────────────────────┤
│ fetch.min.bytes1(기본)1024~10240 (배치 페치)│ fetch.max.wait500ms(기본)조정 (지연 vs 처리량)│ max.poll.records500(기본) → 워크로드에 따라 조정            │
│ session.timeout45s(기본) → 리밸런싱 민감도 조절            │
├──────────────────┴───────────────────────────────────────────┤
Broker 튜닝                                │
├──────────────────┬───────────────────────────────────────────┤
│ num.partitions   │ 토픽 기본 파티션  (목표 처리량 / 파티션당)│ replication      │ 3 (프로덕션 권장)│ min.insync2 (acks=all과 함께 사용)│ log.retention7(기본) → 비즈니스 요구에 따라             │
└──────────────────┴───────────────────────────────────────────┘

운영: 파티션 수 결정 공식

파티션 수를 결정하는 경험적 공식입니다.

목표 처리량 = T (messages/sec)
단일 파티션 처리량 = P (messages/sec)
필요 파티션 수 = max(T/P_producer, T/P_consumer)

예시:
- 목표: 100,000 msg/sec
- Producer 단일 파티션: 50,000 msg/sec
- Consumer 단일 파티션: 10,000 msg/sec
- 필요 파티션: max(100K/50K, 100K/10K) = max(2, 10) = 10
실무 권장:
- 소규모: 6~12 파티션
- 중규모: 12~30 파티션
- 대규모: 30~100 파티션
- 초대규모: 100+ (KRaft 모드에서 100만개 이상 지원)

4. ZooKeeper에서 KRaft로: 역사적 전환

ZooKeeper란?

Apache ZooKeeper는 분산 시스템의 코디네이션을 담당하는 서비스입니다. ZAB(ZooKeeper Atomic Broadcast) 프로토콜 기반의 합의 알고리즘을 사용합니다.

┌────────────────────────────────────────────┐
ZooKeeper Ensemble│                                            │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐   │
│  │  ZK Node  │ │  ZK Node  │ │  ZK Node  │   │
  (Leader)(Follower)(Follower)│   │
│  └──────────┘ └──────────┘ └──────────┘   │
│                                            │
│  저장 데이터:/brokers/ids/[0,1,2]     - 브로커 목록     │
/brokers/topics/my-topic - 토픽 메타데이터  │
/controller              - 컨트롤러 선출    │
/admin/delete_topics     - 토픽 삭제 요청   │
/config                  - 동적 설정        │
└────────────────────────────────────────────┘
           ▲          ▲          ▲
           │          │          │
     Kafka Broker1  Broker2   Broker3

Kafka가 ZooKeeper에 의존했던 이유는 다음과 같습니다.

  • 컨트롤러 선출: 클러스터 내 하나의 브로커가 컨트롤러 역할을 담당
  • 토픽/파티션 메타데이터: 토픽 설정, 파티션 할당 정보 저장
  • 브로커 등록: 살아있는 브로커 목록 관리 (Ephemeral Node)
  • ACL 관리: 접근 제어 목록 저장
  • 컨슈머 그룹 관리: 과거에는 컨슈머 오프셋도 ZK에 저장 (현재는 내부 토픽 사용)

ZooKeeper의 한계

ZooKeeper는 Kafka의 규모가 커지면서 여러 문제를 드러냈습니다.

  1. 운영 복잡성: Kafka와 별도로 ZooKeeper 클러스터를 운영해야 함 (별도의 모니터링, 패치, 백업)
  2. 확장성 병목: ZK의 znode에 메타데이터를 저장하므로, 수십만 개 이상의 파티션에서 성능 저하
  3. 컨트롤러 페일오버 지연: 컨트롤러가 죽으면 새 컨트롤러가 ZK에서 전체 메타데이터를 로드해야 하므로 수 분 소요 가능
  4. 단일 장애점 가능성: ZK 클러스터 자체가 과부하되거나 장애 발생 시 Kafka 전체에 영향
  5. 이중 인프라 비용: ZK 3노드 + Kafka N노드 운영 부담

KRaft 모드 (KIP-500)

KRaft(Kafka Raft)는 ZooKeeper를 완전히 제거하고, Kafka 자체에 Raft 기반 합의 프로토콜을 내장한 아키텍처입니다.

┌────────────────────────────────────────────────────┐
Kafka Cluster (KRaft Mode)│                                                    │
│  ┌───────────────────────────────────────────┐     │
│  │         Controller Quorum                  │     │
│  │                                           │     │
│  │  Broker-0      Broker-1      Broker-2     │     │
  (Controller   (Controller   (Controller  │     │
│  │   + Broker)     + Broker)     + Broker)   │     │
│  │  [Active]      [Follower]    [Follower]   │     │
│  └───────────────────────────────────────────┘     │
│                                                    │
│  ┌───────────────────────────────────────────┐     │
│  │         Data Brokers                       │     │
│  │  Broker-3      Broker-4      Broker-5     │     │
  (Broker only) (Broker only) (Broker only)│     │
│  └───────────────────────────────────────────┘     │
│                                                    │
Metadata Log: __cluster_metadata (내부 토픽)│  합의 프로토콜: Raft (자체 구현)└────────────────────────────────────────────────────┘

KRaft의 장점

항목ZooKeeper 모드KRaft 모드
외부 의존성ZK 클러스터 필요없음 (자체 완결)
운영 복잡성높음 (2개 시스템)낮음 (1개 시스템)
최대 파티션 수약 20만 개100만 개 이상
컨트롤러 페일오버수 분수 초
메타데이터 전파ZK Watch + RPCRaft 로그 복제
배포 복잡성ZK 별도 설치/설정Kafka만 설치

마이그레이션 가이드: ZK에서 KRaft로

Kafka 3.6 이상에서 KRaft가 GA(General Availability)이며, Kafka 4.0에서 ZooKeeper는 완전히 제거됩니다.

# 1단계: 클러스터 ID 확인
bin/kafka-metadata.sh --snapshot /var/kafka-logs/__cluster_metadata-0/00000000000000000000.log --cluster-id

# 2단계: ZK에서 메타데이터 마이그레이션 준비
bin/kafka-storage.sh format \
  --config config/kraft/server.properties \
  --cluster-id YOUR_CLUSTER_ID \
  --release-version 3.7

# 3단계: 브로커 설정 변경 (server.properties)
# ZK 모드 설정 제거
# zookeeper.connect=localhost:2181  ← 제거

# KRaft 모드 설정 추가
# process.roles=broker,controller
# node.id=1
# controller.quorum.voters=1@broker1:9093,2@broker2:9093,3@broker3:9093
# controller.listener.names=CONTROLLER
# listeners=PLAINTEXT://:9092,CONTROLLER://:9093

# 4단계: 롤링 재시작 (하나씩 순차적으로)
# 각 브로커를 중지 → KRaft 설정 적용 → 재시작

# 5단계: 마이그레이션 완료 확인
bin/kafka-metadata.sh --snapshot /var/kafka-logs/__cluster_metadata-0/00000000000000000000.log --verify

마이그레이션 시 주의사항:

  • 반드시 Kafka 3.6 이상 버전에서 진행
  • 마이그레이션 전 전체 백업 수행
  • 스테이징 환경에서 먼저 검증
  • 롤링 방식으로 진행하여 다운타임 최소화
  • 마이그레이션 중에는 토픽 생성/삭제 자제

5. Streaming API 비교: SSE vs WebSocket vs gRPC Streaming

세 가지 Streaming API 비교

항목SSEWebSocketgRPC Streaming
통신 방향서버 → 클라이언트 (단방향)양방향 (Full Duplex)양방향 (Full Duplex)
프로토콜HTTP/1.1WS (HTTP Upgrade)HTTP/2
데이터 형식Text (UTF-8)Text / BinaryProtobuf (Binary)
자동 재연결내장 지원수동 구현 필요수동 구현 필요
브라우저 지원모든 모던 브라우저모든 모던 브라우저grpc-web 필요
멀티플렉싱불가 (HTTP/1.1)불가가능 (HTTP/2)
프록시/방화벽친화적 (HTTP)일부 제한일부 제한
직렬화 오버헤드낮음낮음~중간매우 낮음
적합한 케이스알림, 피드, 시세채팅, 게임, 협업MSA 간 통신

SSE (Server-Sent Events) 구현

WebFlux 기반 SSE:

@RestController
@RequestMapping("/api/stream")
public class SseController {

    private final StockPriceService stockPriceService;

    // 방법 1: Flux + ServerSentEvent 래퍼
    @GetMapping(value = "/stocks", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<StockPrice>> streamStockPrices(
            @RequestParam List<String> symbols) {
        return stockPriceService.getPriceStream(symbols)
            .map(price -> ServerSentEvent.<StockPrice>builder()
                .id(String.valueOf(price.getTimestamp()))
                .event("price-update")
                .data(price)
                .retry(Duration.ofSeconds(5))
                .build());
    }

    // 방법 2: 심플한 Flux 스트림
    @GetMapping(value = "/notifications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<NotificationEvent> streamNotifications(
            @AuthenticationPrincipal UserDetails user) {
        return notificationService.getNotificationStream(user.getUsername())
            .doOnCancel(() -> log.info("Client disconnected: {}", user.getUsername()));
    }

    // 방법 3: Heartbeat 포함 (연결 유지)
    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<Object>> streamWithHeartbeat() {
        Flux<ServerSentEvent<Object>> dataStream = eventService.getEventStream()
            .map(event -> ServerSentEvent.builder()
                .event("data")
                .data((Object) event)
                .build());

        Flux<ServerSentEvent<Object>> heartbeat = Flux.interval(Duration.ofSeconds(30))
            .map(tick -> ServerSentEvent.builder()
                .event("heartbeat")
                .data((Object) "ping")
                .build());

        return Flux.merge(dataStream, heartbeat);
    }
}

클라이언트 (JavaScript):

const eventSource = new EventSource('/api/stream/stocks?symbols=AAPL,GOOGL,MSFT')

eventSource.addEventListener('price-update', (event) => {
  const price = JSON.parse(event.data)
  updateDashboard(price)
})

eventSource.addEventListener('heartbeat', () => {
  console.log('Connection alive')
})

eventSource.onerror = (error) => {
  console.error('SSE error:', error)
  // EventSource는 자동으로 재연결을 시도합니다
}

WebSocket 구현

WebFlux 기반 WebSocket:

@Configuration
public class WebSocketConfig {

    @Bean
    public HandlerMapping webSocketHandlerMapping(ChatWebSocketHandler handler) {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/ws/chat", handler);

        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);
        mapping.setOrder(-1);
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

@Component
@Slf4j
public class ChatWebSocketHandler implements WebSocketHandler {

    private final Sinks.Many<ChatMessage> chatSink =
        Sinks.many().multicast().onBackpressureBuffer();

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // 인바운드: 클라이언트 → 서버
        Mono<Void> input = session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .map(this::parseChatMessage)
            .doOnNext(msg -> {
                log.info("Received: {}", msg);
                chatSink.tryEmitNext(msg);  // 모든 구독자에게 브로드캐스트
            })
            .then();

        // 아웃바운드: 서버 → 클라이언트
        Mono<Void> output = session.send(
            chatSink.asFlux()
                .map(msg -> session.textMessage(toJson(msg)))
        );

        // 양방향 동시 처리
        return Mono.zip(input, output).then();
    }

    private ChatMessage parseChatMessage(String payload) {
        return new ObjectMapper().readValue(payload, ChatMessage.class);
    }

    private String toJson(ChatMessage msg) {
        return new ObjectMapper().writeValueAsString(msg);
    }
}

gRPC Streaming 구현

Proto 파일 정의:

syntax = "proto3";

package stockservice;

service StockService {
  // Server Streaming: 서버가 연속으로 전송
  rpc StreamPrices(StockRequest) returns (stream StockPrice);

  // Client Streaming: 클라이언트가 연속으로 전송
  rpc SendOrders(stream OrderRequest) returns (OrderSummary);

  // Bidirectional Streaming: 양방향
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

message StockRequest {
  repeated string symbols = 1;
}

message StockPrice {
  string symbol = 1;
  double price = 2;
  int64 timestamp = 3;
}

message OrderRequest {
  string symbol = 1;
  int32 quantity = 2;
  double price = 3;
}

message OrderSummary {
  int32 total_orders = 1;
  double total_amount = 2;
}

message ChatMessage {
  string user = 1;
  string content = 2;
  int64 timestamp = 3;
}

gRPC Server Streaming 구현:

@GrpcService
public class StockGrpcService extends StockServiceGrpc.StockServiceImplBase {

    private final StockPriceService priceService;

    @Override
    public void streamPrices(StockRequest request,
                             StreamObserver<StockPrice> responseObserver) {
        // 구독 시작
        Disposable subscription = priceService
            .getPriceStream(request.getSymbolsList())
            .subscribe(
                price -> {
                    // 각 가격 업데이트를 클라이언트에 전송
                    responseObserver.onNext(StockPrice.newBuilder()
                        .setSymbol(price.getSymbol())
                        .setPrice(price.getPrice())
                        .setTimestamp(price.getTimestamp())
                        .build());
                },
                error -> {
                    responseObserver.onError(Status.INTERNAL
                        .withDescription(error.getMessage())
                        .asRuntimeException());
                },
                () -> {
                    responseObserver.onCompleted();
                }
            );

        // 클라이언트 연결 해제 시 구독 취소
        Context.current().addListener(
            context -> subscription.dispose(),
            Runnable::run
        );
    }
}

Streaming API 선택 가이드

                    ┌─────────────┐
                    │ 양방향 통신  │
                    │  필요한가?                    └──────┬──────┘
/     \ 아니오
                      /       \
            ┌────────▼──┐  ┌──▼──────────┐
            │ 브라우저    │  │    SSE            │ 직접 통신? (가장 간단)            └─────┬─────┘  └─────────────┘
/   \ 아니오
               /     \
    ┌─────────▼─┐  ┌─▼──────────────┐
WebSocket │  │ gRPC Streaming     (브라우저) (MSA/백엔드)    └───────────┘  └────────────────┘

6. WebFlux + Kafka 통합 아키텍처

Reactor Kafka 설정

Reactor Kafka는 Kafka를 리액티브하게 사용할 수 있게 해주는 라이브러리입니다.

// build.gradle
// implementation 'io.projectreactor.kafka:reactor-kafka:1.3.23'

@Configuration
public class ReactorKafkaConfig {

    @Bean
    public ReactiveKafkaProducerTemplate<String, Object> reactiveProducer() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        SenderOptions<String, Object> senderOptions = SenderOptions.create(props);
        return new ReactiveKafkaProducerTemplate<>(senderOptions);
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, StockPrice> reactiveConsumer() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "stock-price-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        ReceiverOptions<String, StockPrice> receiverOptions =
            ReceiverOptions.<String, StockPrice>create(props)
                .subscription(Collections.singleton("stock-prices"));

        return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
    }
}

실시간 데이터 파이프라인: Kafka에서 브라우저까지

┌──────────┐    ┌─────────┐    ┌───────────┐    ┌──────────┐    ┌──────────┐
External │    │  Kafka  │    │  WebFlux  │    │   SSE    │    │  ReactAPI    │───►│ Producer│───►│ Consumer  │───►│ Endpoint │───►│ Frontend│          │    │         │     (Reactor) │    │          │    │          │
└──────────┘    └─────────┘    └───────────┘    └──────────┘    └──────────┘
  시세 소스        토픽 발행       리액티브 소비     HTTP 스트림     실시간 렌더링
// 1. Kafka Producer: 외부 API에서 시세 데이터를 Kafka로
@Service
@RequiredArgsConstructor
@Slf4j
public class StockPriceProducer {

    private final ReactiveKafkaProducerTemplate<String, Object> producer;
    private final WebClient marketDataClient;

    @Scheduled(fixedRate = 1000)  // 1초마다
    public void fetchAndPublish() {
        marketDataClient.get()
            .uri("/v1/quotes?symbols=AAPL,GOOGL,MSFT")
            .retrieve()
            .bodyToFlux(StockPrice.class)
            .flatMap(price -> producer.send("stock-prices", price.getSymbol(), price))
            .doOnNext(result -> log.debug("Sent: topic={}, partition={}, offset={}",
                result.recordMetadata().topic(),
                result.recordMetadata().partition(),
                result.recordMetadata().offset()))
            .subscribe();
    }
}

// 2. Kafka Consumer + SSE: Kafka에서 브라우저로
@RestController
@RequestMapping("/api/stream")
@RequiredArgsConstructor
public class StockStreamController {

    private final ReactiveKafkaConsumerTemplate<String, StockPrice> consumer;

    // Kafka 토픽을 SSE로 브리지
    @GetMapping(value = "/stock-prices", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<StockPrice>> streamStockPrices(
            @RequestParam(required = false) List<String> symbols) {

        return consumer.receiveAutoAck()
            .map(ConsumerRecord::value)
            .filter(price -> symbols == null || symbols.contains(price.getSymbol()))
            .map(price -> ServerSentEvent.<StockPrice>builder()
                .id(price.getSymbol() + "-" + price.getTimestamp())
                .event("price-update")
                .data(price)
                .build())
            .doOnCancel(() -> log.info("Client disconnected from stock stream"));
    }
}

CQRS + Event Sourcing with Kafka

┌──────────────────────────────────────────────────────────┐
CQRS Pattern│                                                          │
Command SideQuery Side│                            │                             │
│  ┌──────────┐              │  ┌───────────┐              │
│  │ WebFluxCommand     │  │  WebFluxQuery│  │Controller│────────┐     │  │Controller │◄─────┐       │
│  └──────────┘        │     │  └───────────┘      │       │
│                      ▼     │                     │       │
│  ┌──────────────────────┐  │  ┌─────────────────────┐   │
│  │   Kafka Topic        │  │  │   Read Database      │   │
   (Event Store)      │──┼─►│   (Materialized     │   │
│  │   order-events       │  │  │    View / Redis)     │   │
│  └──────────────────────┘  │  └─────────────────────┘   │
│                            │                             │
└──────────────────────────────────────────────────────────┘
// Command: 주문 생성 → Kafka에 이벤트 발행
@Service
@RequiredArgsConstructor
public class OrderCommandService {

    private final ReactiveKafkaProducerTemplate<String, Object> producer;

    public Mono<String> createOrder(CreateOrderCommand command) {
        String orderId = UUID.randomUUID().toString();
        OrderCreatedEvent event = new OrderCreatedEvent(
            orderId, command.getUserId(), command.getItems(),
            command.getTotalAmount(), Instant.now()
        );

        return producer.send("order-events", orderId, event)
            .map(result -> orderId);
    }
}

// Query: Kafka 이벤트를 소비하여 읽기 모델 업데이트
@Service
@RequiredArgsConstructor
public class OrderProjectionService {

    private final ReactiveKafkaConsumerTemplate<String, OrderEvent> consumer;
    private final ReactiveRedisTemplate<String, OrderView> redisTemplate;

    @PostConstruct
    public void startProjection() {
        consumer.receiveAutoAck()
            .map(ConsumerRecord::value)
            .flatMap(this::updateProjection)
            .doOnError(e -> log.error("Projection error", e))
            .retry()
            .subscribe();
    }

    private Mono<Boolean> updateProjection(OrderEvent event) {
        if (event instanceof OrderCreatedEvent created) {
            OrderView view = OrderView.from(created);
            return redisTemplate.opsForValue()
                .set("order:" + created.getOrderId(), view);
        }
        return Mono.just(true);
    }
}

// Query Controller: Redis에서 읽기
@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
public class OrderQueryController {

    private final ReactiveRedisTemplate<String, OrderView> redisTemplate;

    @GetMapping("/{orderId}")
    public Mono<OrderView> getOrder(@PathVariable String orderId) {
        return redisTemplate.opsForValue()
            .get("order:" + orderId)
            .switchIfEmpty(Mono.error(new OrderNotFoundException(orderId)));
    }
}

7. 실전 프로젝트: 실시간 주식 시세 시스템

전체 아키텍처

┌────────────────────────────────────────────────────────────────────┐
Real-Time Stock Price System│                                                                    │
│  ┌──────────┐   ┌──────────┐   ┌────────────┐   ┌──────────────┐ │
│  │ Market   │   │  Kafka   │   │  WebFlux   │   │   React      │ │
│  │ Data API │──►│ Producer │──►│  Consumer  │──►│  Frontend    │ │
 (Yahoo/  │   │          │   │  + SSE  (Dashboard) │ │
│  │  Alpha)  │   │          │   │            │   │              │ │
│  └──────────┘   └──────────┘   └────────────┘   └──────────────┘ │
│       │              │               │                  │         │
│       │         ┌────▼────┐    ┌─────▼─────┐    ┌──────▼──────┐ │
│       │         │  Kafka  │    │  Redis     │    │ Prometheus  │ │
│       │         │  Topic  │    │  Cache     │    │ + Grafana   │ │
│       │          (3 rep) │    │            │    │             │ │
│       │         └─────────┘    └───────────┘    └─────────────┘ │
│       │                                                          │
│  ┌────▼──────────────────────────────────────┐                   │
│  │  Kafka Streams: 집계/분석                   │                   │
│  │  - 5분 이동평균                              │                   │
│  │  - 일일 최고/최저가                          │                   │
│  │  - 거래량 이상 탐지                          │                   │
│  └───────────────────────────────────────────┘                   │
└────────────────────────────────────────────────────────────────────┘

핵심 코드 스니펫

1) Market Data Fetcher:

@Component
@RequiredArgsConstructor
@Slf4j
public class MarketDataFetcher {

    private final WebClient marketClient;
    private final ReactiveKafkaProducerTemplate<String, Object> kafkaProducer;

    @Scheduled(fixedRate = 1000)
    public void fetchPrices() {
        List<String> symbols = List.of("AAPL", "GOOGL", "MSFT", "AMZN", "TSLA");

        Flux.fromIterable(symbols)
            .flatMap(symbol -> marketClient.get()
                .uri("/quote/{symbol}", symbol)
                .retrieve()
                .bodyToMono(MarketQuote.class)
                .map(quote -> new StockPrice(
                    symbol, quote.getPrice(), quote.getVolume(),
                    quote.getChange(), Instant.now().toEpochMilli()))
                .onErrorResume(e -> {
                    log.warn("Failed to fetch {}: {}", symbol, e.getMessage());
                    return Mono.empty();
                }))
            .flatMap(price -> kafkaProducer
                .send("stock-prices", price.getSymbol(), price)
                .doOnSuccess(r -> log.debug("Published: {} = {}",
                    price.getSymbol(), price.getPrice())))
            .subscribe();
    }
}

2) Kafka Streams 집계:

@Configuration
@EnableKafkaStreams
public class StockAnalyticsConfig {

    @Bean
    public KStream<String, StockPrice> stockAnalytics(StreamsBuilder builder) {
        KStream<String, StockPrice> priceStream = builder.stream("stock-prices");

        // 5분 이동평균 계산
        priceStream
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .aggregate(
                PriceAggregation::new,
                (symbol, price, agg) -> agg.add(price.getPrice()),
                Materialized.<String, PriceAggregation, WindowStore<Bytes, byte[]>>
                    as("price-5min-avg")
                    .withValueSerde(new PriceAggregationSerde()))
            .toStream()
            .map((windowed, agg) -> KeyValue.pair(
                windowed.key(),
                new PriceAverage(windowed.key(), agg.getAverage(),
                    agg.getMin(), agg.getMax(), agg.getCount())))
            .to("stock-price-analytics");

        return priceStream;
    }
}

// 집계 클래스
public class PriceAggregation {
    private double sum = 0;
    private double min = Double.MAX_VALUE;
    private double max = Double.MIN_VALUE;
    private long count = 0;

    public PriceAggregation add(double price) {
        sum += price;
        min = Math.min(min, price);
        max = Math.max(max, price);
        count++;
        return this;
    }

    public double getAverage() {
        return count > 0 ? sum / count : 0;
    }

    // getters ...
}

3) SSE 엔드포인트:

@RestController
@RequestMapping("/api/stocks")
@RequiredArgsConstructor
public class StockStreamController {

    private final ReactiveKafkaConsumerTemplate<String, StockPrice> priceConsumer;
    private final ReactiveKafkaConsumerTemplate<String, PriceAverage> analyticsConsumer;

    // 실시간 시세 스트림
    @GetMapping(value = "/live", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<StockPrice>> liveStream(
            @RequestParam List<String> symbols) {
        return priceConsumer.receiveAutoAck()
            .map(ConsumerRecord::value)
            .filter(p -> symbols.contains(p.getSymbol()))
            .map(p -> ServerSentEvent.<StockPrice>builder()
                .event("price")
                .data(p)
                .build());
    }

    // 분석 데이터 스트림
    @GetMapping(value = "/analytics", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<PriceAverage>> analyticsStream() {
        return analyticsConsumer.receiveAutoAck()
            .map(ConsumerRecord::value)
            .map(avg -> ServerSentEvent.<PriceAverage>builder()
                .event("analytics")
                .data(avg)
                .build());
    }
}

4) React Frontend (TypeScript):

// hooks/useStockStream.ts
import { useState, useEffect, useCallback } from 'react';

interface StockPrice {
  symbol: string;
  price: number;
  volume: number;
  change: number;
  timestamp: number;
}

export function useStockStream(symbols: string[]) {
  const [prices, setPrices] = useState<Map<string, StockPrice>>(new Map());

  useEffect(() => {
    const params = symbols.map(s => `symbols=${s}`).join('&');
    const eventSource = new EventSource(`/api/stocks/live?${params}`);

    eventSource.addEventListener('price', (event: MessageEvent) => {
      const price: StockPrice = JSON.parse(event.data);
      setPrices(prev => new Map(prev).set(price.symbol, price));
    });

    eventSource.onerror = () => {
      console.error('SSE connection error, will auto-reconnect...');
    };

    return () => eventSource.close();
  }, [symbols.join(',')]);

  return prices;
}

// components/StockDashboard.tsx
function StockDashboard() {
  const prices = useStockStream(['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']);

  return (
    <div className="grid grid-cols-5 gap-4">
      {Array.from(prices.entries()).map(([symbol, price]) => (
        <div key={symbol} className="p-4 rounded-lg bg-gray-800">
          <h3 className="text-lg font-bold">{symbol}</h3>
          <p className="text-2xl">${price.price.toFixed(2)}</p>
          <p className={price.change >= 0 ? 'text-green-400' : 'text-red-400'}>
            {price.change >= 0 ? '+' : ''}{price.change.toFixed(2)}%
          </p>
        </div>
      ))}
    </div>
  );
}

확장성: 파티션 기반 수평 확장

확장  (1 인스턴스):
  WebFlux Instance-1Kafka Consumer (P0, P1, P2, P3, P4, P5)

확장  (3 인스턴스):
  WebFlux Instance-1Kafka Consumer (P0, P1)SSE Client 1~1000
  WebFlux Instance-2Kafka Consumer (P2, P3)SSE Client 1001~2000
  WebFlux Instance-3Kafka Consumer (P4, P5)SSE Client 2001~3000

핵심: Kafka 파티션 수 = 최대 병렬 Consumer     파티션 (symbol)로 같은 종목은 같은 파티션에 순서 보장

모니터링 설정

# docker-compose-monitoring.yml
services:
  prometheus:
    image: prom/prometheus:v2.51.0
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - '9090:9090'

  grafana:
    image: grafana/grafana:10.4.0
    ports:
      - '3000:3000'
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin

  kafka-exporter:
    image: danielqsj/kafka-exporter:latest
    command: ['--kafka.server=kafka:9092']
    ports:
      - '9308:9308'
// Micrometer 메트릭 추가
@Component
@RequiredArgsConstructor
public class StockMetrics {

    private final MeterRegistry registry;
    private final AtomicLong connectedClients = new AtomicLong(0);

    @PostConstruct
    void init() {
        Gauge.builder("stock.stream.connected_clients", connectedClients, AtomicLong::get)
            .description("Number of connected SSE clients")
            .register(registry);
    }

    public void recordPricePublished(String symbol) {
        registry.counter("stock.price.published",
            "symbol", symbol).increment();
    }

    public void recordLatency(String symbol, long latencyMs) {
        registry.timer("stock.price.latency",
            "symbol", symbol)
            .record(Duration.ofMillis(latencyMs));
    }

    public void clientConnected() { connectedClients.incrementAndGet(); }
    public void clientDisconnected() { connectedClients.decrementAndGet(); }
}

8. 언제 무엇을 선택할 것인가?

기술 선택 의사결정 매트릭스

시나리오추천 기술이유
간단한 CRUD APISpring MVC학습 곡선 낮고 생태계 풍부
높은 동시접속 APIWebFlux적은 리소스로 만 단위 동시접속
이벤트 스트리밍/로그Kafka높은 처리량, 내구성, 리플레이 가능
실시간 알림/피드SSE단방향, 자동 재연결, HTTP 호환
실시간 채팅/게임WebSocket양방향, 낮은 지연시간
MSA 간 실시간 통신gRPC Streaming양방향, Protobuf, 높은 성능
CDC(변경 감지)Kafka Connect + DebeziumDB 변경을 실시간 캡처
복잡한 스트림 처리Kafka StreamsStateful 처리, 윈도우 집계
CQRS 패턴Kafka + WebFlux이벤트 소싱 + 리액티브 쿼리

조합 패턴

실전에서는 단일 기술이 아니라 여러 기술을 조합합니다.

패턴 1: 실시간 대시보드
  Data SourceKafkaWebFlux ConsumerSSEBrowser

패턴 2: 실시간 채팅 + 메시지 저장
  BrowserWebSocketWebFluxKafka (저장)Elasticsearch (검색)

패턴 3: MSA 이벤트 드리븐
  Service AKafkaService B (WebFlux Consumer)
Service C (WebFlux Consumer)
Analytics (Kafka Streams)

패턴 4: IoT 데이터 처리
  SensorsMQTTKafka ConnectKafkaKafka Streams (집계)
WebFluxSSEDashboard

안티패턴 (피해야 할 것들)

  1. WebFlux에서 JDBC 사용: 이벤트 루프를 블로킹하여 성능 급격 저하. R2DBC 또는 Schedulers.boundedElastic() 사용
  2. 모든 API를 WebFlux로: 간단한 CRUD는 MVC가 더 적합. 불필요한 복잡성 증가
  3. Kafka를 단순 메시지 큐로: RabbitMQ/SQS가 더 적합한 경우도 있음. Kafka는 이벤트 로그에 최적화
  4. 파티션 수 과다 설정: 파티션이 많으면 리밸런싱 비용 증가. 필요에 맞게 설정
  5. SSE에서 대용량 데이터: SSE는 텍스트 기반이므로 바이너리 데이터에 부적합. gRPC 사용

9. 학습 로드맵 (6개월)

1개월 차: 리액티브 프로그래밍 기초

  • Week 1-2: Project Reactor 핵심 (Mono, Flux, 오퍼레이터)
    • 공식 문서의 "Learn" 섹션 완독
    • Lite Rx Hands-On 실습 프로젝트
  • Week 3-4: Spring WebFlux 기본
    • 간단한 REST API를 MVC에서 WebFlux로 마이그레이션
    • R2DBC 연동 실습

2개월 차: WebFlux 심화

  • Week 1-2: WebClient, WebSocket, SSE 구현
    • 실시간 알림 시스템 미니 프로젝트
  • Week 3-4: 테스트 (StepVerifier, WebTestClient)
    • BlockHound 적용
    • 성능 테스트 (Gatling/k6)

3개월 차: Apache Kafka 기초

  • Week 1-2: Kafka 아키텍처, Producer/Consumer
    • Docker Compose로 로컬 Kafka 클러스터 구축
    • 기본 메시지 발행/소비 실습
  • Week 3-4: Consumer Group, Offset 관리, EOS
    • 멀티 컨슈머 그룹 시나리오 실습
    • Transactional Producer 구현

4개월 차: Kafka 심화

  • Week 1-2: Kafka Streams, KTable/KStream
    • 윈도우 집계, Join 실습
  • Week 3-4: Kafka Connect, Schema Registry
    • Debezium CDC 연동
    • Avro 스키마 관리

5개월 차: 통합 프로젝트

  • Week 1-2: WebFlux + Kafka 통합
    • Reactor Kafka 활용
    • CQRS + Event Sourcing 패턴
  • Week 3-4: 실시간 대시보드 프로젝트
    • Kafka에서 SSE까지 전체 파이프라인
    • React 프론트엔드 연동

6개월 차: 운영과 최적화

  • Week 1-2: KRaft 모드 마이그레이션 실습
    • 모니터링 (Prometheus + Grafana)
    • 알림 설정 (Consumer Lag, Error Rate)
  • Week 3-4: 성능 튜닝과 장애 대응
    • 부하 테스트 및 병목 분석
    • 장애 시나리오 시뮬레이션 (브로커 다운, 네트워크 파티션)

실전 퀴즈

아래 퀴즈를 통해 학습 내용을 점검해 보세요.

Q1: WebFlux에서 Mono.just()과 Mono.defer()의 차이는 무엇인가요?

Mono.just(value)는 값을 즉시 캡처하여 Mono로 감쌉니다. 구독 시점과 관계없이 동일한 값을 반환합니다.

Mono.defer(() -> Mono.just(value))는 구독 시점에 람다를 평가합니다. 따라서 매 구독마다 새로운 값을 생성할 수 있습니다.

// just: 생성 시점에 값이 결정됨
Mono<Long> eager = Mono.just(System.currentTimeMillis());
// 1초 후 구독해도 동일한 타임스탬프

// defer: 구독 시점에 값이 결정됨
Mono<Long> lazy = Mono.defer(() -> Mono.just(System.currentTimeMillis()));
// 구독할 때마다 새로운 타임스탬프

실무에서 switchIfEmpty와 함께 사용할 때 중요합니다. switchIfEmpty(Mono.just(createDefault()))는 createDefault()가 즉시 실행되지만, switchIfEmpty(Mono.defer(() -> createDefault()))는 실제로 빈 경우에만 실행됩니다.

Q2: Kafka에서 acks=all과 min.insync.replicas=2를 함께 설정하면 어떤 보장을 제공하나요?

acks=all은 Leader가 모든 ISR(In-Sync Replicas)의 확인을 받은 후에 Producer에게 성공을 반환합니다.

min.insync.replicas=2는 ISR의 최소 크기를 2로 설정합니다. ISR이 2 미만이면 Producer는 NotEnoughReplicasException을 받습니다.

이 조합은 다음을 보장합니다:

  • 메시지가 최소 2개의 브로커에 기록된 후에만 성공 응답
  • Replication Factor가 3일 때, 1대의 브로커가 장애나도 데이터 유실 없음
  • 2대 이상 동시 장애 시 쓰기 실패 (데이터 유실 방지를 위해 의도적 거부)

프로덕션에서 가장 많이 사용하는 안정성 설정입니다.

Q3: SSE 연결에서 클라이언트가 네트워크 장애로 끊어졌다가 재연결할 때, 어떻게 유실된 이벤트를 복구할 수 있나요?

SSE는 Last-Event-ID 헤더를 통해 재연결 시 마지막으로 수신한 이벤트 ID를 서버에 전달합니다. 서버는 이를 기반으로 유실된 이벤트를 재전송할 수 있습니다.

구현 방법:

  1. 서버에서 각 이벤트에 고유 ID를 부여합니다 (예: Kafka offset 또는 타임스탬프).
  2. 클라이언트 재연결 시 Last-Event-ID 헤더를 확인합니다.
  3. 해당 ID 이후의 이벤트를 Kafka에서 다시 소비하여 전송합니다.
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<EventData>> stream(
        @RequestHeader(value = "Last-Event-ID", required = false) String lastEventId) {
    long startOffset = lastEventId != null ? Long.parseLong(lastEventId) + 1 : -1;
    // startOffset부터 Kafka 소비 시작
    return eventService.getEventsFrom(startOffset)
        .map(e -> ServerSentEvent.<EventData>builder()
            .id(String.valueOf(e.getOffset()))
            .data(e)
            .build());
}

Kafka의 메시지 보존(retention) 덕분에 일정 기간 내의 이벤트는 언제든 다시 읽을 수 있습니다.

Q4: KRaft 모드에서 Controller Quorum의 Active Controller가 장애나면 어떻게 되나요?

KRaft 모드에서는 Raft 합의 프로토콜을 사용하여 Controller 선출을 자동으로 처리합니다.

  1. Active Controller가 하트비트를 보내지 않으면, Follower Controller들이 이를 감지합니다.
  2. Election Timeout이 만료되면 Follower 중 하나가 후보(Candidate)로 전환됩니다.
  3. 과반수(Quorum)의 투표를 받은 후보가 새로운 Active Controller가 됩니다.
  4. 새 Controller는 이미 Raft 로그를 통해 메타데이터를 복제하고 있으므로, ZooKeeper 모드처럼 전체 메타데이터를 다시 로드할 필요가 없습니다.

결과적으로 페일오버 시간이 ZooKeeper 모드의 수 분에서 수 초로 단축됩니다. Controller Quorum이 3개(1 Active + 2 Follower)일 때, 1대 장애까지 허용 가능합니다.

Q5: WebFlux에서 flatMap과 concatMap의 차이를 설명하고, 각각 언제 사용해야 하나요?

flatMap: 내부 Publisher를 구독할 때 동시성(concurrency)을 허용합니다. 여러 내부 스트림이 인터리빙(interleaving)되어 순서가 보장되지 않습니다.

concatMap: 내부 Publisher를 순차적으로 구독합니다. 이전 스트림이 완료된 후에 다음 스트림을 구독하므로 순서가 보장됩니다.

// flatMap: 순서 무관, 최대 성능
// 사용 예: 여러 사용자의 프로필을 병렬로 조회
Flux.fromIterable(userIds)
    .flatMap(id -> userService.findById(id))  // 동시 실행
    // 결과 순서: [User3, User1, User2] (비결정적)

// concatMap: 순서 보장, 순차 실행
// 사용 예: 이벤트를 순서대로 처리해야 할 때
Flux.fromIterable(events)
    .concatMap(event -> processEvent(event))  // 하나씩 순차 실행
    // 결과 순서: [Event1, Event2, Event3] (항상 동일)

선택 기준:

  • 순서가 중요하지 않고 성능이 중요하면 flatMap
  • 순서가 반드시 보장되어야 하면 concatMap
  • flatMap에 concurrency 파라미터를 설정하여 동시 실행 수를 제한할 수도 있습니다: flatMap(fn, 10) (최대 10개 동시)

참고 자료

공식 문서

아키텍처 및 패턴

Kafka 생태계

모니터링 및 운영

학습 자료