- Published on
WebFlux + Kafka + ZooKeeper 완전 정복: 리액티브 스트리밍 아키텍처의 모든 것
- Authors

- Name
- Youngju Kim
- @fjvbn20031
- 들어가며
- 1. 리액티브 프로그래밍이란? (기초부터)
- 2. Spring WebFlux 딥다이브
- 3. Apache Kafka 완전 가이드
- 4. ZooKeeper에서 KRaft로: 역사적 전환
- 5. Streaming API 비교: SSE vs WebSocket vs gRPC Streaming
- 6. WebFlux + Kafka 통합 아키텍처
- 7. 실전 프로젝트: 실시간 주식 시세 시스템
- 8. 언제 무엇을 선택할 것인가?
- 9. 학습 로드맵 (6개월)
- 실전 퀴즈
- 참고 자료
들어가며
현대 소프트웨어 시스템은 "실시간"을 요구합니다. 주식 시세는 밀리초 단위로 업데이트되고, 채팅 메시지는 즉시 전달되어야 하며, IoT 센서 데이터는 끊임없이 쏟아집니다. 전통적인 요청-응답(Request-Response) 모델로는 이러한 요구사항을 감당하기 어렵습니다.
이 글에서는 실시간 데이터 처리 아키텍처의 핵심 기술 세 가지를 깊이 있게 다룹니다.
- Spring WebFlux — 리액티브 웹 프레임워크로 높은 동시접속을 효율적으로 처리
- Apache Kafka — 분산 이벤트 스트리밍 플랫폼으로 대용량 데이터를 안정적으로 전달
- Streaming API — SSE, WebSocket, gRPC Streaming으로 클라이언트에 실시간 데이터 푸시
또한 Kafka의 역사적 전환점인 ZooKeeper에서 KRaft로의 마이그레이션도 상세히 살펴봅니다. 각 기술의 동작 원리부터 실전 코드 예제, 성능 벤치마크, 아키텍처 패턴까지 — 리액티브 스트리밍 아키텍처의 모든 것을 한 곳에 담았습니다.
1. 리액티브 프로그래밍이란? (기초부터)
Reactive Manifesto
리액티브 시스템은 2014년에 발표된 Reactive Manifesto에서 네 가지 핵심 속성을 정의합니다.
- Responsive (응답성): 시스템은 가능한 한 적시에 응답해야 합니다. 응답성은 사용성과 유용성의 기초입니다.
- Resilient (탄력성): 시스템은 장애에 직면하더라도 응답성을 유지합니다. 복제(replication), 격리(containment), 위임(delegation)으로 달성합니다.
- Elastic (유연성): 시스템은 작업 부하의 변화에 따라 자원을 증감시키며 응답성을 유지합니다.
- Message Driven (메시지 기반): 리액티브 시스템은 비동기 메시지 전달에 의존하여 컴포넌트 간 느슨한 결합, 격리, 위치 투명성을 보장합니다.
┌─────────────────────────────────┐
│ Responsive │
│ (빠르고 일관된 응답) │
└──────────┬──────────┬───────────┘
│ │
┌──────────▼──┐ ┌───▼──────────┐
│ Resilient │ │ Elastic │
│ (장애 복원력) │ │ (부하 적응력) │
└──────────┬──┘ └───┬──────────┘
│ │
┌──────────▼──────────▼───────────┐
│ Message Driven │
│ (비동기 메시지 기반 통신) │
└─────────────────────────────────┘
명령형 vs 리액티브 패러다임 비교
명령형(Imperative) 프로그래밍은 "무엇을 어떻게 할지" 단계별로 지시합니다.
// 명령형: 블로킹 방식
List<User> users = userRepository.findAll(); // DB 호출 - 스레드 블로킹
List<User> activeUsers = new ArrayList<>();
for (User user : users) {
if (user.isActive()) {
activeUsers.add(user);
}
}
List<String> names = new ArrayList<>();
for (User user : activeUsers) {
names.add(user.getName());
}
return names;
리액티브(Reactive) 프로그래밍은 "데이터 흐름과 변화 전파"를 선언적으로 표현합니다.
// 리액티브: 논블로킹 방식
return userRepository.findAll() // Flux<User> 반환 - 논블로킹
.filter(User::isActive) // 활성 사용자 필터링
.map(User::getName) // 이름 추출
.collectList(); // Mono<List<String>> 반환
핵심 차이는 다음과 같습니다.
| 항목 | 명령형 | 리액티브 |
|---|---|---|
| 실행 모델 | 동기/블로킹 | 비동기/논블로킹 |
| 스레드 사용 | 요청당 하나의 스레드 | 소수의 이벤트 루프 스레드 |
| 데이터 처리 | Pull 기반 (소비자가 요청) | Push 기반 (생산자가 전달) |
| 에러 처리 | try-catch | onError 시그널 |
| 백프레셔 | 없음 (또는 수동 구현) | 내장 지원 |
| 적합한 상황 | 낮은 동시접속, 간단한 로직 | 높은 동시접속, I/O 집약적 |
Reactive Streams 스펙
Reactive Streams는 JVM 상에서 논블로킹 백프레셔를 지원하는 비동기 스트림 처리의 표준입니다. Java 9의 java.util.concurrent.Flow에 포함되었으며, 핵심 인터페이스는 네 가지입니다.
// 1. Publisher: 데이터를 생산하는 주체
public interface Publisher<T> {
void subscribe(Subscriber<? super T> s);
}
// 2. Subscriber: 데이터를 소비하는 주체
public interface Subscriber<T> {
void onSubscribe(Subscription s); // 구독 시작
void onNext(T t); // 데이터 수신
void onError(Throwable t); // 에러 발생
void onComplete(); // 스트림 완료
}
// 3. Subscription: Publisher-Subscriber 간 연결 관리
public interface Subscription {
void request(long n); // n개의 데이터 요청 (백프레셔)
void cancel(); // 구독 취소
}
// 4. Processor: Publisher이면서 Subscriber (중간 처리)
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
데이터 흐름의 시퀀스는 다음과 같습니다.
Publisher Subscriber
│ │
│◄──── subscribe() ────────────│
│ │
│───── onSubscribe(sub) ──────►│
│ │
│◄──── sub.request(n) ─────────│ ← 백프레셔: n개만 요청
│ │
│───── onNext(data1) ─────────►│
│───── onNext(data2) ─────────►│
│───── ... │
│───── onNext(dataN) ─────────►│
│ │
│◄──── sub.request(m) ─────────│ ← 추가 m개 요청
│ │
│───── onComplete() ──────────►│ (또는 onError())
백프레셔(Backpressure)의 중요성과 동작 원리
백프레셔는 소비자가 처리할 수 있는 속도에 맞춰 생산자의 데이터 전송 속도를 조절하는 메커니즘입니다. 백프레셔가 없으면 다음과 같은 문제가 발생합니다.
- 메모리 초과(OOM): 빠른 생산자가 느린 소비자의 버퍼를 가득 채움
- 데이터 유실: 버퍼 오버플로우로 데이터가 드롭됨
- 시스템 장애: 연쇄적인 장애 전파 (Cascading Failure)
백프레셔 전략에는 여러 가지가 있습니다.
// 1. Buffer: 일정 크기까지 버퍼링 후 오버플로우 시 에러
Flux.range(1, 1000)
.onBackpressureBuffer(256)
.subscribe(item -> processSlowly(item));
// 2. Drop: 처리 못하는 데이터는 버림
Flux.range(1, 1000)
.onBackpressureDrop(dropped ->
log.warn("Dropped: " + dropped))
.subscribe(item -> processSlowly(item));
// 3. Latest: 최신 값만 유지
Flux.range(1, 1000)
.onBackpressureLatest()
.subscribe(item -> processSlowly(item));
// 4. Error: 처리 못하면 에러 시그널
Flux.range(1, 1000)
.onBackpressureError()
.subscribe(item -> processSlowly(item));
2. Spring WebFlux 딥다이브
WebFlux vs Spring MVC 아키텍처 비교
Spring MVC와 WebFlux는 근본적으로 다른 실행 모델을 사용합니다.
Spring MVC: Thread-per-Request 모델
Client Request ──► Tomcat Thread Pool (200 threads)
│
Thread-1 ──► Controller ──► Service ──► DB (블로킹 대기)
Thread-2 ──► Controller ──► Service ──► DB (블로킹 대기)
Thread-3 ──► Controller ──► Service ──► External API (블로킹 대기)
...
Thread-200 ──► (모든 스레드 고갈 시 요청 대기열에 쌓임)
- 기본 스레드 풀 크기: Tomcat 기준 200개
- 각 요청이 하나의 스레드를 점유 (I/O 대기 중에도)
- 동시접속 200개 초과 시 요청 대기 발생
Spring WebFlux: Event Loop 모델
Client Request ──► Netty Event Loop (CPU 코어 수 = N개 스레드)
│
EventLoop-1 ──► Handler1 ──► (논블로킹 I/O 시작) ──► 다음 요청 처리
EventLoop-2 ──► Handler2 ──► (논블로킹 I/O 시작) ──► 다음 요청 처리
...
EventLoop-N ──► (I/O 완료 콜백) ──► 응답 전송
- 이벤트 루프 스레드 수: CPU 코어 수 (예: 8코어 = 8스레드)
- 요청이 스레드를 점유하지 않음 (I/O 대기 시 해제)
- 적은 스레드로 수만 개의 동시접속 처리 가능
Netty 이벤트 루프 모델
Netty는 WebFlux의 기본 런타임 서버입니다. Netty의 핵심 구조를 살펴보겠습니다.
┌────────────────────────────────────────────────────┐
│ Netty Server │
│ │
│ ┌──────────────┐ ┌──────────────────────────┐ │
│ │ Boss Group │ │ Worker Group │ │
│ │ (1 thread) │ │ (CPU cores x 2 threads) │ │
│ │ │ │ │ │
│ │ Accept │───►│ EventLoop-1: Channel A │ │
│ │ connections │ │ EventLoop-2: Channel B │ │
│ │ │ │ EventLoop-3: Channel C │ │
│ │ │ │ ... │ │
│ └──────────────┘ └──────────────────────────┘ │
│ │
│ Channel Pipeline: │
│ ┌─────────┬──────────┬───────────┬────────────┐ │
│ │ Decoder │ Handler1 │ Handler2 │ Encoder │ │
│ └─────────┴──────────┴───────────┴────────────┘ │
└────────────────────────────────────────────────────┘
- Boss Group: 클라이언트 연결을 수락하고 Worker Group에 위임
- Worker Group: 실제 I/O 처리를 담당하는 이벤트 루프 스레드
- Channel Pipeline: 인바운드/아웃바운드 핸들러 체인으로 데이터 처리
Project Reactor: Mono vs Flux
Project Reactor는 Spring WebFlux의 리액티브 라이브러리입니다. 핵심 타입 두 가지를 이해하는 것이 중요합니다.
Mono: 0개 또는 1개의 요소를 방출하는 비동기 시퀀스
// Mono 생성
Mono<String> mono1 = Mono.just("Hello");
Mono<String> mono2 = Mono.empty();
Mono<String> mono3 = Mono.error(new RuntimeException("Error"));
// 실전: 사용자 단건 조회
@GetMapping("/users/me")
public Mono<UserResponse> getCurrentUser(Authentication auth) {
return userRepository.findByEmail(auth.getName()) // Mono<User>
.map(UserResponse::from) // Mono<UserResponse>
.switchIfEmpty(Mono.error(new UserNotFoundException()));
}
Flux: 0개에서 N개의 요소를 방출하는 비동기 시퀀스
// Flux 생성
Flux<Integer> flux1 = Flux.just(1, 2, 3, 4, 5);
Flux<Integer> flux2 = Flux.range(1, 100);
Flux<Long> flux3 = Flux.interval(Duration.ofSeconds(1)); // 무한 스트림
// 실전: 사용자 목록 조회
@GetMapping("/users")
public Flux<UserResponse> getActiveUsers() {
return userRepository.findAll() // Flux<User>
.filter(User::isActive) // 활성 사용자만
.map(UserResponse::from) // DTO 변환
.take(100); // 최대 100명
}
핵심 Operators 가이드
리액티브 프로그래밍의 핵심은 오퍼레이터 조합입니다. 가장 많이 사용하는 오퍼레이터들을 살펴봅니다.
// 1. map: 동기 변환 (1:1)
Flux.just("hello", "world")
.map(String::toUpperCase)
// 결과: "HELLO", "WORLD"
// 2. flatMap: 비동기 변환 (1:N, 순서 보장 없음)
Flux.just(1L, 2L, 3L)
.flatMap(id -> userRepository.findById(id)) // 각 ID로 비동기 조회
// 결과: 순서가 보장되지 않음 (User2, User1, User3 가능)
// 3. concatMap: 비동기 변환 (1:N, 순서 보장)
Flux.just(1L, 2L, 3L)
.concatMap(id -> userRepository.findById(id))
// 결과: 순서 보장 (User1, User2, User3)
// 4. filter: 조건 필터링
Flux.range(1, 10)
.filter(n -> n % 2 == 0)
// 결과: 2, 4, 6, 8, 10
// 5. zip: 여러 소스를 조합 (1:1 매칭)
Mono<User> user = userRepository.findById(userId);
Mono<List<Order>> orders = orderRepository.findByUserId(userId).collectList();
Mono.zip(user, orders)
.map(tuple -> new UserWithOrders(tuple.getT1(), tuple.getT2()));
// 6. merge: 여러 소스를 병합 (인터리빙)
Flux.merge(
hotNewsFlux, // 실시간 뉴스
stockPriceFlux, // 주식 시세
alertFlux // 알림
)
.subscribe(event -> dashboard.update(event));
// 7. concat: 순차적 연결
Flux.concat(
cacheRepository.findById(id), // 캐시 먼저
databaseRepository.findById(id) // 캐시 미스 시 DB
)
.next(); // 첫 번째 결과만
// 8. switchIfEmpty: 빈 결과 처리
userRepository.findByEmail(email)
.switchIfEmpty(Mono.defer(() -> createDefaultUser(email)));
// 9. retry + backoff: 재시도 로직
webClient.get()
.uri("/external-api/data")
.retrieve()
.bodyToMono(ExternalData.class)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(10))
.filter(ex -> ex instanceof WebClientResponseException.ServiceUnavailable));
// 10. onErrorResume: 에러 복구
userService.getUserProfile(userId)
.onErrorResume(UserNotFoundException.class,
ex -> Mono.just(UserProfile.anonymous()))
.onErrorResume(ServiceException.class,
ex -> cachedProfileService.getCachedProfile(userId));
WebFlux + R2DBC (리액티브 DB 접근)
R2DBC(Reactive Relational Database Connectivity)는 관계형 데이터베이스에 대한 논블로킹 접근을 제공합니다.
// build.gradle
// implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
// implementation 'io.r2dbc:r2dbc-postgresql'
// Entity
@Table("users")
public record User(
@Id Long id,
String email,
String name,
boolean active,
LocalDateTime createdAt
) {}
// Repository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
Flux<User> findByActiveTrue();
@Query("SELECT * FROM users WHERE email = :email")
Mono<User> findByEmail(String email);
@Query("SELECT * FROM users WHERE created_at > :since ORDER BY created_at DESC")
Flux<User> findRecentUsers(LocalDateTime since);
}
// Service
@Service
@RequiredArgsConstructor
public class UserService {
private final UserRepository userRepository;
private final DatabaseClient databaseClient;
public Flux<User> getActiveUsers() {
return userRepository.findByActiveTrue();
}
// 복잡한 쿼리는 DatabaseClient 사용
public Flux<UserStats> getUserStats() {
return databaseClient
.sql("""
SELECT u.id, u.name, COUNT(o.id) as order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
GROUP BY u.id, u.name
HAVING COUNT(o.id) > 0
""")
.map(row -> new UserStats(
row.get("id", Long.class),
row.get("name", String.class),
row.get("order_count", Long.class)
))
.all();
}
}
WebFlux + WebClient (리액티브 HTTP 클라이언트)
WebClient는 RestTemplate을 대체하는 논블로킹 HTTP 클라이언트입니다.
@Configuration
public class WebClientConfig {
@Bean
public WebClient webClient() {
return WebClient.builder()
.baseUrl("https://api.example.com")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.filter(ExchangeFilterFunctions.basicAuthentication("user", "password"))
.codecs(configurer -> configurer
.defaultCodecs()
.maxInMemorySize(16 * 1024 * 1024)) // 16MB
.build();
}
}
@Service
@RequiredArgsConstructor
public class ExternalApiService {
private final WebClient webClient;
// GET 요청
public Mono<ProductResponse> getProduct(String productId) {
return webClient.get()
.uri("/products/{id}", productId)
.retrieve()
.onStatus(HttpStatusCode::is4xxClientError,
response -> Mono.error(new ProductNotFoundException(productId)))
.onStatus(HttpStatusCode::is5xxServerError,
response -> Mono.error(new ExternalServiceException()))
.bodyToMono(ProductResponse.class)
.timeout(Duration.ofSeconds(5))
.retryWhen(Retry.backoff(3, Duration.ofMillis(500)));
}
// POST 요청
public Mono<OrderResponse> createOrder(OrderRequest request) {
return webClient.post()
.uri("/orders")
.bodyValue(request)
.retrieve()
.bodyToMono(OrderResponse.class);
}
// Streaming 응답
public Flux<EventData> streamEvents() {
return webClient.get()
.uri("/events/stream")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(EventData.class);
}
}
성능 벤치마크: WebFlux vs MVC
다양한 동시접속 수준에서의 벤치마크 결과입니다. (4코어 8GB, 100ms 지연 시뮬레이션)
┌──────────────────┬────────────┬────────────┬────────────┐
│ 동시접속 수 │ Spring MVC │ WebFlux │ 차이 │
├──────────────────┼────────────┼────────────┼────────────┤
│ 1,000 │ 8,500 rps │ 9,200 rps │ +8% │
│ 10,000 │ 3,200 rps │ 9,100 rps │ +184% │
│ 100,000 │ 타임아웃 │ 8,800 rps │ MVC 실패 │
├──────────────────┼────────────┼────────────┼────────────┤
│ 메모리 사용 │ 2.1 GB │ 0.8 GB │ -62% │
│ 스레드 수 │ 204 │ 16 │ -92% │
│ P99 응답시간 │ 1,200ms │ 180ms │ -85% │
│ (동시접속 10K) │ │ │ │
└──────────────────┴────────────┴────────────┴────────────┘
핵심 포인트:
- 낮은 동시접속(1,000 이하): MVC와 WebFlux의 성능 차이가 크지 않음
- 높은 동시접속(10,000 이상): WebFlux가 압도적으로 유리
- 초고동시접속(100,000): MVC는 스레드 고갈로 사실상 작동 불가
주의점: 블로킹 코드 감지
WebFlux에서 가장 흔한 실수는 이벤트 루프 스레드에서 블로킹 코드를 실행하는 것입니다.
// 절대 금지: 이벤트 루프에서 블로킹 호출
@GetMapping("/bad-example")
public Mono<String> badExample() {
// JDBC는 블로킹 — 이벤트 루프를 멈추게 함
User user = jdbcTemplate.queryForObject("SELECT * FROM users WHERE id = 1", User.class);
return Mono.just(user.getName());
}
// 올바른 방법 1: R2DBC 사용
@GetMapping("/good-example-1")
public Mono<String> goodExample1() {
return r2dbcUserRepository.findById(1L)
.map(User::getName);
}
// 올바른 방법 2: 블로킹 코드를 별도 스케줄러에서 실행
@GetMapping("/good-example-2")
public Mono<String> goodExample2() {
return Mono.fromCallable(() ->
jdbcTemplate.queryForObject("SELECT * FROM users WHERE id = 1", User.class))
.subscribeOn(Schedulers.boundedElastic()) // 블로킹 전용 스레드풀
.map(User::getName);
}
블로킹 코드를 자동으로 감지하려면 Blockhound를 사용할 수 있습니다.
// build.gradle
// testImplementation 'io.projectreactor.tools:blockhound:1.0.9.RELEASE'
// 테스트 설정
@BeforeAll
static void setUp() {
BlockHound.install();
}
@Test
void shouldNotBlock() {
// 블로킹 코드가 있으면 BlockingOperationError 발생
StepVerifier.create(userService.getActiveUsers())
.expectNextCount(5)
.verifyComplete();
}
3. Apache Kafka 완전 가이드
Kafka 아키텍처 개요
Apache Kafka는 LinkedIn에서 개발한 분산 이벤트 스트리밍 플랫폼입니다. 초당 수백만 건의 이벤트를 안정적으로 처리할 수 있습니다.
┌─────────────────────────────────────────────────────────────┐
│ Kafka Cluster │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │
│ │ │ │ │ │ │ │
│ │ Topic-A P0 │ │ Topic-A P1 │ │ Topic-A P2 │ Leader │
│ │ Topic-B P1R │ │ Topic-B P0 │ │ Topic-A P0R │ Replica│
│ │ Topic-A P2R │ │ Topic-A P0R │ │ Topic-B P1 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ P = Partition (Leader), R = Replica (Follower) │
└─────────────────────────────────────────────────────────────┘
▲ │
│ ▼
┌─────────────┐ ┌─────────────────┐
│ Producers │ │ Consumer Group │
│ │ │ │
│ App1, App2 │ │ Consumer1 ← P0 │
│ │ │ Consumer2 ← P1 │
│ │ │ Consumer3 ← P2 │
└─────────────┘ └─────────────────┘
핵심 구성 요소:
- Broker: Kafka 서버 인스턴스. 클러스터에 여러 브로커가 존재
- Topic: 메시지를 구분하는 논리적 카테고리 (예: user-events, order-events)
- Partition: 토픽을 분할하는 물리적 단위. 병렬 처리의 기본 단위
- Replica: 파티션의 복제본. Leader와 Follower로 구분
- ISR (In-Sync Replicas): Leader와 동기화된 Replica 집합
Producer: 메시지 전송의 모든 것
// Producer 설정
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// 성능 튜닝
config.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 32KB 배치
config.put(ProducerConfig.LINGER_MS_CONFIG, 20); // 20ms 대기
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // 압축
config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB 버퍼
// 안정성
config.put(ProducerConfig.ACKS_CONFIG, "all"); // 모든 ISR 확인
config.put(ProducerConfig.RETRIES_CONFIG, 3); // 재시도
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 멱등성 보장
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Acknowledgment 모드 비교:
| acks 값 | 동작 | 안정성 | 성능 |
|---|---|---|---|
| 0 | 전송 후 확인 안 함 | 낮음 (데이터 유실 가능) | 최고 |
| 1 | Leader만 확인 | 중간 (Leader 장애 시 유실 가능) | 높음 |
| all (-1) | 모든 ISR 확인 | 최고 (ISR 모두 확인) | 보통 |
Idempotent Producer: enable.idempotence=true 설정 시, 동일 메시지를 중복 전송하더라도 한 번만 기록합니다. Producer ID와 Sequence Number를 사용하여 중복을 감지합니다.
Consumer: 메시지 소비의 모든 것
// Consumer 설정
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service-group");
// 오프셋 관리
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 수동 커밋
// 성능 튜닝
config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // 1KB 최소 패치
config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // 500ms 대기
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // 최대 500건
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3); // 3개의 컨슈머 스레드
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
// Consumer 리스너
@Service
@Slf4j
public class OrderEventConsumer {
@KafkaListener(
topics = "order-events",
groupId = "order-service-group",
containerFactory = "kafkaListenerContainerFactory"
)
public void handleOrderEvent(
ConsumerRecord<String, OrderEvent> record,
Acknowledgment ack) {
try {
log.info("Received: topic={}, partition={}, offset={}, key={}",
record.topic(), record.partition(), record.offset(), record.key());
OrderEvent event = record.value();
processOrderEvent(event);
ack.acknowledge(); // 수동 커밋
} catch (Exception e) {
log.error("Failed to process order event", e);
// 에러 처리: DLQ(Dead Letter Queue)로 전송하거나 재시도
}
}
}
Consumer Group과 파티션 할당:
Topic: order-events (6 partitions)
Consumer Group: order-service-group
Consumer-1: P0, P1 (2개 파티션 담당)
Consumer-2: P2, P3 (2개 파티션 담당)
Consumer-3: P4, P5 (2개 파티션 담당)
규칙: 하나의 파티션은 그룹 내 하나의 컨슈머만 소비 가능
컨슈머 수 > 파티션 수이면 일부 컨슈머는 유휴 상태
리밸런싱(Rebalancing): 컨슈머가 그룹에 참가하거나 탈퇴하면 파티션 재할당이 발생합니다. 리밸런싱 중에는 모든 컨슈머가 일시적으로 소비를 중단합니다.
Exactly-Once Semantics (EOS)
Kafka의 전달 보증 수준은 세 가지입니다.
- At-Most-Once: 메시지가 유실될 수 있지만 중복은 없음
- At-Least-Once: 메시지가 중복될 수 있지만 유실은 없음
- Exactly-Once: 메시지가 정확히 한 번만 처리됨
Exactly-Once를 달성하려면 Transactional Producer와 Consumer를 함께 사용합니다.
// Transactional Producer 설정
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-producer-1");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 트랜잭션 사용 예제
@Service
@RequiredArgsConstructor
public class OrderProcessor {
private final KafkaTemplate<String, Object> kafkaTemplate;
public void processOrder(Order order) {
kafkaTemplate.executeInTransaction(ops -> {
// 하나의 트랜잭션으로 묶임
ops.send("order-confirmed", order.getId(), new OrderConfirmedEvent(order));
ops.send("inventory-update", order.getProductId(), new InventoryUpdateEvent(order));
ops.send("notification", order.getUserId(), new NotificationEvent(order));
return true;
});
// 3개의 메시지가 모두 성공하거나 모두 실패
}
}
Kafka Streams 개요
Kafka Streams는 Kafka 위에 구축된 스트림 처리 라이브러리입니다.
// KStream: 이벤트 스트림 (각 레코드가 독립적)
// KTable: 변경 로그 (키의 최신 값만 유지)
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean
public KStream<String, OrderEvent> orderStream(StreamsBuilder builder) {
KStream<String, OrderEvent> stream = builder.stream("order-events");
// 1. 필터링 + 변환
stream
.filter((key, event) -> event.getAmount() > 10000) // 1만원 이상
.mapValues(event -> new HighValueOrderEvent(event))
.to("high-value-orders");
// 2. 윈도우 집계: 5분 간격 주문 금액 합계
stream
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.aggregate(
() -> 0L,
(key, event, total) -> total + event.getAmount(),
Materialized.as("order-amount-store")
)
.toStream()
.to("order-amount-per-5min");
// 3. KStream-KTable Join
KTable<String, UserProfile> userTable = builder.table("user-profiles");
stream
.join(userTable,
(order, user) -> new EnrichedOrder(order, user))
.to("enriched-orders");
return stream;
}
}
Kafka Connect와 Debezium CDC
Kafka Connect는 외부 시스템과 Kafka 간 데이터를 안정적으로 이동시키는 프레임워크입니다.
{
"name": "mysql-source-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql-server",
"database.port": "3306",
"database.user": "debezium",
"database.password": "password",
"database.server.id": "1",
"topic.prefix": "myapp",
"database.include.list": "orders_db",
"table.include.list": "orders_db.orders,orders_db.order_items",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes"
}
}
Debezium은 CDC(Change Data Capture) 커넥터로, 데이터베이스의 변경 사항을 실시간으로 Kafka 토픽에 스트리밍합니다. INSERT, UPDATE, DELETE 이벤트가 모두 캡처됩니다.
Schema Registry + Avro
스키마 레지스트리는 Kafka 메시지의 스키마를 중앙에서 관리하여 호환성을 보장합니다.
// user-event.avsc (Avro 스키마)
{
"type": "record",
"name": "UserEvent",
"namespace": "com.example.events",
"fields": [
{"name": "userId", "type": "string"},
{"name": "action", "type": {"type": "enum", "name": "Action",
"symbols": ["CREATED", "UPDATED", "DELETED"]}},
{"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"},
{"name": "email", "type": ["null", "string"], "default": null}
]
}
성능 튜닝 가이드
┌──────────────────────────────────────────────────────────────┐
│ Producer 튜닝 │
├──────────────────┬───────────────────────────────────────────┤
│ batch.size │ 16KB(기본) → 32~64KB (처리량 증가) │
│ linger.ms │ 0(기본) → 10~50ms (배치 효율성) │
│ compression.type │ none → lz4 (속도), zstd (압축률) │
│ buffer.memory │ 32MB(기본) → 64~128MB (대량 전송 시) │
│ max.in.flight │ 5(기본) → 1 (순서 보장 필요 시) │
├──────────────────┴───────────────────────────────────────────┤
│ Consumer 튜닝 │
├──────────────────┬───────────────────────────────────────────┤
│ fetch.min.bytes │ 1(기본) → 1024~10240 (배치 페치) │
│ fetch.max.wait │ 500ms(기본) → 조정 (지연 vs 처리량) │
│ max.poll.records │ 500(기본) → 워크로드에 따라 조정 │
│ session.timeout │ 45s(기본) → 리밸런싱 민감도 조절 │
├──────────────────┴───────────────────────────────────────────┤
│ Broker 튜닝 │
├──────────────────┬───────────────────────────────────────────┤
│ num.partitions │ 토픽 기본 파티션 수 (목표 처리량 / 파티션당)│
│ replication │ 3 (프로덕션 권장) │
│ min.insync │ 2 (acks=all과 함께 사용) │
│ log.retention │ 7일(기본) → 비즈니스 요구에 따라 │
└──────────────────┴───────────────────────────────────────────┘
운영: 파티션 수 결정 공식
파티션 수를 결정하는 경험적 공식입니다.
목표 처리량 = T (messages/sec)
단일 파티션 처리량 = P (messages/sec)
필요 파티션 수 = max(T/P_producer, T/P_consumer)
예시:
- 목표: 100,000 msg/sec
- Producer 단일 파티션: 50,000 msg/sec
- Consumer 단일 파티션: 10,000 msg/sec
- 필요 파티션: max(100K/50K, 100K/10K) = max(2, 10) = 10개
실무 권장:
- 소규모: 6~12 파티션
- 중규모: 12~30 파티션
- 대규모: 30~100 파티션
- 초대규모: 100+ (KRaft 모드에서 100만개 이상 지원)
4. ZooKeeper에서 KRaft로: 역사적 전환
ZooKeeper란?
Apache ZooKeeper는 분산 시스템의 코디네이션을 담당하는 서비스입니다. ZAB(ZooKeeper Atomic Broadcast) 프로토콜 기반의 합의 알고리즘을 사용합니다.
┌────────────────────────────────────────────┐
│ ZooKeeper Ensemble │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ ZK Node │ │ ZK Node │ │ ZK Node │ │
│ │ (Leader) │ │(Follower)│ │(Follower)│ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ 저장 데이터: │
│ /brokers/ids/[0,1,2] - 브로커 목록 │
│ /brokers/topics/my-topic - 토픽 메타데이터 │
│ /controller - 컨트롤러 선출 │
│ /admin/delete_topics - 토픽 삭제 요청 │
│ /config - 동적 설정 │
└────────────────────────────────────────────┘
▲ ▲ ▲
│ │ │
Kafka Broker1 Broker2 Broker3
Kafka가 ZooKeeper에 의존했던 이유는 다음과 같습니다.
- 컨트롤러 선출: 클러스터 내 하나의 브로커가 컨트롤러 역할을 담당
- 토픽/파티션 메타데이터: 토픽 설정, 파티션 할당 정보 저장
- 브로커 등록: 살아있는 브로커 목록 관리 (Ephemeral Node)
- ACL 관리: 접근 제어 목록 저장
- 컨슈머 그룹 관리: 과거에는 컨슈머 오프셋도 ZK에 저장 (현재는 내부 토픽 사용)
ZooKeeper의 한계
ZooKeeper는 Kafka의 규모가 커지면서 여러 문제를 드러냈습니다.
- 운영 복잡성: Kafka와 별도로 ZooKeeper 클러스터를 운영해야 함 (별도의 모니터링, 패치, 백업)
- 확장성 병목: ZK의 znode에 메타데이터를 저장하므로, 수십만 개 이상의 파티션에서 성능 저하
- 컨트롤러 페일오버 지연: 컨트롤러가 죽으면 새 컨트롤러가 ZK에서 전체 메타데이터를 로드해야 하므로 수 분 소요 가능
- 단일 장애점 가능성: ZK 클러스터 자체가 과부하되거나 장애 발생 시 Kafka 전체에 영향
- 이중 인프라 비용: ZK 3노드 + Kafka N노드 운영 부담
KRaft 모드 (KIP-500)
KRaft(Kafka Raft)는 ZooKeeper를 완전히 제거하고, Kafka 자체에 Raft 기반 합의 프로토콜을 내장한 아키텍처입니다.
┌────────────────────────────────────────────────────┐
│ Kafka Cluster (KRaft Mode) │
│ │
│ ┌───────────────────────────────────────────┐ │
│ │ Controller Quorum │ │
│ │ │ │
│ │ Broker-0 Broker-1 Broker-2 │ │
│ │ (Controller (Controller (Controller │ │
│ │ + Broker) + Broker) + Broker) │ │
│ │ [Active] [Follower] [Follower] │ │
│ └───────────────────────────────────────────┘ │
│ │
│ ┌───────────────────────────────────────────┐ │
│ │ Data Brokers │ │
│ │ Broker-3 Broker-4 Broker-5 │ │
│ │ (Broker only) (Broker only) (Broker only)│ │
│ └───────────────────────────────────────────┘ │
│ │
│ Metadata Log: __cluster_metadata (내부 토픽) │
│ 합의 프로토콜: Raft (자체 구현) │
└────────────────────────────────────────────────────┘
KRaft의 장점
| 항목 | ZooKeeper 모드 | KRaft 모드 |
|---|---|---|
| 외부 의존성 | ZK 클러스터 필요 | 없음 (자체 완결) |
| 운영 복잡성 | 높음 (2개 시스템) | 낮음 (1개 시스템) |
| 최대 파티션 수 | 약 20만 개 | 100만 개 이상 |
| 컨트롤러 페일오버 | 수 분 | 수 초 |
| 메타데이터 전파 | ZK Watch + RPC | Raft 로그 복제 |
| 배포 복잡성 | ZK 별도 설치/설정 | Kafka만 설치 |
마이그레이션 가이드: ZK에서 KRaft로
Kafka 3.6 이상에서 KRaft가 GA(General Availability)이며, Kafka 4.0에서 ZooKeeper는 완전히 제거됩니다.
# 1단계: 클러스터 ID 확인
bin/kafka-metadata.sh --snapshot /var/kafka-logs/__cluster_metadata-0/00000000000000000000.log --cluster-id
# 2단계: ZK에서 메타데이터 마이그레이션 준비
bin/kafka-storage.sh format \
--config config/kraft/server.properties \
--cluster-id YOUR_CLUSTER_ID \
--release-version 3.7
# 3단계: 브로커 설정 변경 (server.properties)
# ZK 모드 설정 제거
# zookeeper.connect=localhost:2181 ← 제거
# KRaft 모드 설정 추가
# process.roles=broker,controller
# node.id=1
# controller.quorum.voters=1@broker1:9093,2@broker2:9093,3@broker3:9093
# controller.listener.names=CONTROLLER
# listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# 4단계: 롤링 재시작 (하나씩 순차적으로)
# 각 브로커를 중지 → KRaft 설정 적용 → 재시작
# 5단계: 마이그레이션 완료 확인
bin/kafka-metadata.sh --snapshot /var/kafka-logs/__cluster_metadata-0/00000000000000000000.log --verify
마이그레이션 시 주의사항:
- 반드시 Kafka 3.6 이상 버전에서 진행
- 마이그레이션 전 전체 백업 수행
- 스테이징 환경에서 먼저 검증
- 롤링 방식으로 진행하여 다운타임 최소화
- 마이그레이션 중에는 토픽 생성/삭제 자제
5. Streaming API 비교: SSE vs WebSocket vs gRPC Streaming
세 가지 Streaming API 비교
| 항목 | SSE | WebSocket | gRPC Streaming |
|---|---|---|---|
| 통신 방향 | 서버 → 클라이언트 (단방향) | 양방향 (Full Duplex) | 양방향 (Full Duplex) |
| 프로토콜 | HTTP/1.1 | WS (HTTP Upgrade) | HTTP/2 |
| 데이터 형식 | Text (UTF-8) | Text / Binary | Protobuf (Binary) |
| 자동 재연결 | 내장 지원 | 수동 구현 필요 | 수동 구현 필요 |
| 브라우저 지원 | 모든 모던 브라우저 | 모든 모던 브라우저 | grpc-web 필요 |
| 멀티플렉싱 | 불가 (HTTP/1.1) | 불가 | 가능 (HTTP/2) |
| 프록시/방화벽 | 친화적 (HTTP) | 일부 제한 | 일부 제한 |
| 직렬화 오버헤드 | 낮음 | 낮음~중간 | 매우 낮음 |
| 적합한 케이스 | 알림, 피드, 시세 | 채팅, 게임, 협업 | MSA 간 통신 |
SSE (Server-Sent Events) 구현
WebFlux 기반 SSE:
@RestController
@RequestMapping("/api/stream")
public class SseController {
private final StockPriceService stockPriceService;
// 방법 1: Flux + ServerSentEvent 래퍼
@GetMapping(value = "/stocks", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<StockPrice>> streamStockPrices(
@RequestParam List<String> symbols) {
return stockPriceService.getPriceStream(symbols)
.map(price -> ServerSentEvent.<StockPrice>builder()
.id(String.valueOf(price.getTimestamp()))
.event("price-update")
.data(price)
.retry(Duration.ofSeconds(5))
.build());
}
// 방법 2: 심플한 Flux 스트림
@GetMapping(value = "/notifications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<NotificationEvent> streamNotifications(
@AuthenticationPrincipal UserDetails user) {
return notificationService.getNotificationStream(user.getUsername())
.doOnCancel(() -> log.info("Client disconnected: {}", user.getUsername()));
}
// 방법 3: Heartbeat 포함 (연결 유지)
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Object>> streamWithHeartbeat() {
Flux<ServerSentEvent<Object>> dataStream = eventService.getEventStream()
.map(event -> ServerSentEvent.builder()
.event("data")
.data((Object) event)
.build());
Flux<ServerSentEvent<Object>> heartbeat = Flux.interval(Duration.ofSeconds(30))
.map(tick -> ServerSentEvent.builder()
.event("heartbeat")
.data((Object) "ping")
.build());
return Flux.merge(dataStream, heartbeat);
}
}
클라이언트 (JavaScript):
const eventSource = new EventSource('/api/stream/stocks?symbols=AAPL,GOOGL,MSFT')
eventSource.addEventListener('price-update', (event) => {
const price = JSON.parse(event.data)
updateDashboard(price)
})
eventSource.addEventListener('heartbeat', () => {
console.log('Connection alive')
})
eventSource.onerror = (error) => {
console.error('SSE error:', error)
// EventSource는 자동으로 재연결을 시도합니다
}
WebSocket 구현
WebFlux 기반 WebSocket:
@Configuration
public class WebSocketConfig {
@Bean
public HandlerMapping webSocketHandlerMapping(ChatWebSocketHandler handler) {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/ws/chat", handler);
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(map);
mapping.setOrder(-1);
return mapping;
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
@Component
@Slf4j
public class ChatWebSocketHandler implements WebSocketHandler {
private final Sinks.Many<ChatMessage> chatSink =
Sinks.many().multicast().onBackpressureBuffer();
@Override
public Mono<Void> handle(WebSocketSession session) {
// 인바운드: 클라이언트 → 서버
Mono<Void> input = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.map(this::parseChatMessage)
.doOnNext(msg -> {
log.info("Received: {}", msg);
chatSink.tryEmitNext(msg); // 모든 구독자에게 브로드캐스트
})
.then();
// 아웃바운드: 서버 → 클라이언트
Mono<Void> output = session.send(
chatSink.asFlux()
.map(msg -> session.textMessage(toJson(msg)))
);
// 양방향 동시 처리
return Mono.zip(input, output).then();
}
private ChatMessage parseChatMessage(String payload) {
return new ObjectMapper().readValue(payload, ChatMessage.class);
}
private String toJson(ChatMessage msg) {
return new ObjectMapper().writeValueAsString(msg);
}
}
gRPC Streaming 구현
Proto 파일 정의:
syntax = "proto3";
package stockservice;
service StockService {
// Server Streaming: 서버가 연속으로 전송
rpc StreamPrices(StockRequest) returns (stream StockPrice);
// Client Streaming: 클라이언트가 연속으로 전송
rpc SendOrders(stream OrderRequest) returns (OrderSummary);
// Bidirectional Streaming: 양방향
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
message StockRequest {
repeated string symbols = 1;
}
message StockPrice {
string symbol = 1;
double price = 2;
int64 timestamp = 3;
}
message OrderRequest {
string symbol = 1;
int32 quantity = 2;
double price = 3;
}
message OrderSummary {
int32 total_orders = 1;
double total_amount = 2;
}
message ChatMessage {
string user = 1;
string content = 2;
int64 timestamp = 3;
}
gRPC Server Streaming 구현:
@GrpcService
public class StockGrpcService extends StockServiceGrpc.StockServiceImplBase {
private final StockPriceService priceService;
@Override
public void streamPrices(StockRequest request,
StreamObserver<StockPrice> responseObserver) {
// 구독 시작
Disposable subscription = priceService
.getPriceStream(request.getSymbolsList())
.subscribe(
price -> {
// 각 가격 업데이트를 클라이언트에 전송
responseObserver.onNext(StockPrice.newBuilder()
.setSymbol(price.getSymbol())
.setPrice(price.getPrice())
.setTimestamp(price.getTimestamp())
.build());
},
error -> {
responseObserver.onError(Status.INTERNAL
.withDescription(error.getMessage())
.asRuntimeException());
},
() -> {
responseObserver.onCompleted();
}
);
// 클라이언트 연결 해제 시 구독 취소
Context.current().addListener(
context -> subscription.dispose(),
Runnable::run
);
}
}
Streaming API 선택 가이드
┌─────────────┐
│ 양방향 통신 │
│ 필요한가? │
└──────┬──────┘
예 / \ 아니오
/ \
┌────────▼──┐ ┌──▼──────────┐
│ 브라우저 │ │ SSE │
│ 직접 통신? │ │ (가장 간단) │
└─────┬─────┘ └─────────────┘
예 / \ 아니오
/ \
┌─────────▼─┐ ┌─▼──────────────┐
│ WebSocket │ │ gRPC Streaming │
│ (브라우저) │ │ (MSA/백엔드) │
└───────────┘ └────────────────┘
6. WebFlux + Kafka 통합 아키텍처
Reactor Kafka 설정
Reactor Kafka는 Kafka를 리액티브하게 사용할 수 있게 해주는 라이브러리입니다.
// build.gradle
// implementation 'io.projectreactor.kafka:reactor-kafka:1.3.23'
@Configuration
public class ReactorKafkaConfig {
@Bean
public ReactiveKafkaProducerTemplate<String, Object> reactiveProducer() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
SenderOptions<String, Object> senderOptions = SenderOptions.create(props);
return new ReactiveKafkaProducerTemplate<>(senderOptions);
}
@Bean
public ReactiveKafkaConsumerTemplate<String, StockPrice> reactiveConsumer() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "stock-price-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
ReceiverOptions<String, StockPrice> receiverOptions =
ReceiverOptions.<String, StockPrice>create(props)
.subscription(Collections.singleton("stock-prices"));
return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
}
}
실시간 데이터 파이프라인: Kafka에서 브라우저까지
┌──────────┐ ┌─────────┐ ┌───────────┐ ┌──────────┐ ┌──────────┐
│ External │ │ Kafka │ │ WebFlux │ │ SSE │ │ React │
│ API │───►│ Producer│───►│ Consumer │───►│ Endpoint │───►│ Frontend │
│ │ │ │ │ (Reactor) │ │ │ │ │
└──────────┘ └─────────┘ └───────────┘ └──────────┘ └──────────┘
시세 소스 토픽 발행 리액티브 소비 HTTP 스트림 실시간 렌더링
// 1. Kafka Producer: 외부 API에서 시세 데이터를 Kafka로
@Service
@RequiredArgsConstructor
@Slf4j
public class StockPriceProducer {
private final ReactiveKafkaProducerTemplate<String, Object> producer;
private final WebClient marketDataClient;
@Scheduled(fixedRate = 1000) // 1초마다
public void fetchAndPublish() {
marketDataClient.get()
.uri("/v1/quotes?symbols=AAPL,GOOGL,MSFT")
.retrieve()
.bodyToFlux(StockPrice.class)
.flatMap(price -> producer.send("stock-prices", price.getSymbol(), price))
.doOnNext(result -> log.debug("Sent: topic={}, partition={}, offset={}",
result.recordMetadata().topic(),
result.recordMetadata().partition(),
result.recordMetadata().offset()))
.subscribe();
}
}
// 2. Kafka Consumer + SSE: Kafka에서 브라우저로
@RestController
@RequestMapping("/api/stream")
@RequiredArgsConstructor
public class StockStreamController {
private final ReactiveKafkaConsumerTemplate<String, StockPrice> consumer;
// Kafka 토픽을 SSE로 브리지
@GetMapping(value = "/stock-prices", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<StockPrice>> streamStockPrices(
@RequestParam(required = false) List<String> symbols) {
return consumer.receiveAutoAck()
.map(ConsumerRecord::value)
.filter(price -> symbols == null || symbols.contains(price.getSymbol()))
.map(price -> ServerSentEvent.<StockPrice>builder()
.id(price.getSymbol() + "-" + price.getTimestamp())
.event("price-update")
.data(price)
.build())
.doOnCancel(() -> log.info("Client disconnected from stock stream"));
}
}
CQRS + Event Sourcing with Kafka
┌──────────────────────────────────────────────────────────┐
│ CQRS Pattern │
│ │
│ Command Side │ Query Side │
│ │ │
│ ┌──────────┐ │ ┌───────────┐ │
│ │ WebFlux │ Command │ │ WebFlux │ Query │
│ │Controller│────────┐ │ │Controller │◄─────┐ │
│ └──────────┘ │ │ └───────────┘ │ │
│ ▼ │ │ │
│ ┌──────────────────────┐ │ ┌─────────────────────┐ │
│ │ Kafka Topic │ │ │ Read Database │ │
│ │ (Event Store) │──┼─►│ (Materialized │ │
│ │ order-events │ │ │ View / Redis) │ │
│ └──────────────────────┘ │ └─────────────────────┘ │
│ │ │
└──────────────────────────────────────────────────────────┘
// Command: 주문 생성 → Kafka에 이벤트 발행
@Service
@RequiredArgsConstructor
public class OrderCommandService {
private final ReactiveKafkaProducerTemplate<String, Object> producer;
public Mono<String> createOrder(CreateOrderCommand command) {
String orderId = UUID.randomUUID().toString();
OrderCreatedEvent event = new OrderCreatedEvent(
orderId, command.getUserId(), command.getItems(),
command.getTotalAmount(), Instant.now()
);
return producer.send("order-events", orderId, event)
.map(result -> orderId);
}
}
// Query: Kafka 이벤트를 소비하여 읽기 모델 업데이트
@Service
@RequiredArgsConstructor
public class OrderProjectionService {
private final ReactiveKafkaConsumerTemplate<String, OrderEvent> consumer;
private final ReactiveRedisTemplate<String, OrderView> redisTemplate;
@PostConstruct
public void startProjection() {
consumer.receiveAutoAck()
.map(ConsumerRecord::value)
.flatMap(this::updateProjection)
.doOnError(e -> log.error("Projection error", e))
.retry()
.subscribe();
}
private Mono<Boolean> updateProjection(OrderEvent event) {
if (event instanceof OrderCreatedEvent created) {
OrderView view = OrderView.from(created);
return redisTemplate.opsForValue()
.set("order:" + created.getOrderId(), view);
}
return Mono.just(true);
}
}
// Query Controller: Redis에서 읽기
@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
public class OrderQueryController {
private final ReactiveRedisTemplate<String, OrderView> redisTemplate;
@GetMapping("/{orderId}")
public Mono<OrderView> getOrder(@PathVariable String orderId) {
return redisTemplate.opsForValue()
.get("order:" + orderId)
.switchIfEmpty(Mono.error(new OrderNotFoundException(orderId)));
}
}
7. 실전 프로젝트: 실시간 주식 시세 시스템
전체 아키텍처
┌────────────────────────────────────────────────────────────────────┐
│ Real-Time Stock Price System │
│ │
│ ┌──────────┐ ┌──────────┐ ┌────────────┐ ┌──────────────┐ │
│ │ Market │ │ Kafka │ │ WebFlux │ │ React │ │
│ │ Data API │──►│ Producer │──►│ Consumer │──►│ Frontend │ │
│ │ (Yahoo/ │ │ │ │ + SSE │ │ (Dashboard) │ │
│ │ Alpha) │ │ │ │ │ │ │ │
│ └──────────┘ └──────────┘ └────────────┘ └──────────────┘ │
│ │ │ │ │ │
│ │ ┌────▼────┐ ┌─────▼─────┐ ┌──────▼──────┐ │
│ │ │ Kafka │ │ Redis │ │ Prometheus │ │
│ │ │ Topic │ │ Cache │ │ + Grafana │ │
│ │ │ (3 rep) │ │ │ │ │ │
│ │ └─────────┘ └───────────┘ └─────────────┘ │
│ │ │
│ ┌────▼──────────────────────────────────────┐ │
│ │ Kafka Streams: 집계/분석 │ │
│ │ - 5분 이동평균 │ │
│ │ - 일일 최고/최저가 │ │
│ │ - 거래량 이상 탐지 │ │
│ └───────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────────┘
핵심 코드 스니펫
1) Market Data Fetcher:
@Component
@RequiredArgsConstructor
@Slf4j
public class MarketDataFetcher {
private final WebClient marketClient;
private final ReactiveKafkaProducerTemplate<String, Object> kafkaProducer;
@Scheduled(fixedRate = 1000)
public void fetchPrices() {
List<String> symbols = List.of("AAPL", "GOOGL", "MSFT", "AMZN", "TSLA");
Flux.fromIterable(symbols)
.flatMap(symbol -> marketClient.get()
.uri("/quote/{symbol}", symbol)
.retrieve()
.bodyToMono(MarketQuote.class)
.map(quote -> new StockPrice(
symbol, quote.getPrice(), quote.getVolume(),
quote.getChange(), Instant.now().toEpochMilli()))
.onErrorResume(e -> {
log.warn("Failed to fetch {}: {}", symbol, e.getMessage());
return Mono.empty();
}))
.flatMap(price -> kafkaProducer
.send("stock-prices", price.getSymbol(), price)
.doOnSuccess(r -> log.debug("Published: {} = {}",
price.getSymbol(), price.getPrice())))
.subscribe();
}
}
2) Kafka Streams 집계:
@Configuration
@EnableKafkaStreams
public class StockAnalyticsConfig {
@Bean
public KStream<String, StockPrice> stockAnalytics(StreamsBuilder builder) {
KStream<String, StockPrice> priceStream = builder.stream("stock-prices");
// 5분 이동평균 계산
priceStream
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.aggregate(
PriceAggregation::new,
(symbol, price, agg) -> agg.add(price.getPrice()),
Materialized.<String, PriceAggregation, WindowStore<Bytes, byte[]>>
as("price-5min-avg")
.withValueSerde(new PriceAggregationSerde()))
.toStream()
.map((windowed, agg) -> KeyValue.pair(
windowed.key(),
new PriceAverage(windowed.key(), agg.getAverage(),
agg.getMin(), agg.getMax(), agg.getCount())))
.to("stock-price-analytics");
return priceStream;
}
}
// 집계 클래스
public class PriceAggregation {
private double sum = 0;
private double min = Double.MAX_VALUE;
private double max = Double.MIN_VALUE;
private long count = 0;
public PriceAggregation add(double price) {
sum += price;
min = Math.min(min, price);
max = Math.max(max, price);
count++;
return this;
}
public double getAverage() {
return count > 0 ? sum / count : 0;
}
// getters ...
}
3) SSE 엔드포인트:
@RestController
@RequestMapping("/api/stocks")
@RequiredArgsConstructor
public class StockStreamController {
private final ReactiveKafkaConsumerTemplate<String, StockPrice> priceConsumer;
private final ReactiveKafkaConsumerTemplate<String, PriceAverage> analyticsConsumer;
// 실시간 시세 스트림
@GetMapping(value = "/live", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<StockPrice>> liveStream(
@RequestParam List<String> symbols) {
return priceConsumer.receiveAutoAck()
.map(ConsumerRecord::value)
.filter(p -> symbols.contains(p.getSymbol()))
.map(p -> ServerSentEvent.<StockPrice>builder()
.event("price")
.data(p)
.build());
}
// 분석 데이터 스트림
@GetMapping(value = "/analytics", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<PriceAverage>> analyticsStream() {
return analyticsConsumer.receiveAutoAck()
.map(ConsumerRecord::value)
.map(avg -> ServerSentEvent.<PriceAverage>builder()
.event("analytics")
.data(avg)
.build());
}
}
4) React Frontend (TypeScript):
// hooks/useStockStream.ts
import { useState, useEffect, useCallback } from 'react';
interface StockPrice {
symbol: string;
price: number;
volume: number;
change: number;
timestamp: number;
}
export function useStockStream(symbols: string[]) {
const [prices, setPrices] = useState<Map<string, StockPrice>>(new Map());
useEffect(() => {
const params = symbols.map(s => `symbols=${s}`).join('&');
const eventSource = new EventSource(`/api/stocks/live?${params}`);
eventSource.addEventListener('price', (event: MessageEvent) => {
const price: StockPrice = JSON.parse(event.data);
setPrices(prev => new Map(prev).set(price.symbol, price));
});
eventSource.onerror = () => {
console.error('SSE connection error, will auto-reconnect...');
};
return () => eventSource.close();
}, [symbols.join(',')]);
return prices;
}
// components/StockDashboard.tsx
function StockDashboard() {
const prices = useStockStream(['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']);
return (
<div className="grid grid-cols-5 gap-4">
{Array.from(prices.entries()).map(([symbol, price]) => (
<div key={symbol} className="p-4 rounded-lg bg-gray-800">
<h3 className="text-lg font-bold">{symbol}</h3>
<p className="text-2xl">${price.price.toFixed(2)}</p>
<p className={price.change >= 0 ? 'text-green-400' : 'text-red-400'}>
{price.change >= 0 ? '+' : ''}{price.change.toFixed(2)}%
</p>
</div>
))}
</div>
);
}
확장성: 파티션 기반 수평 확장
확장 전 (1 인스턴스):
WebFlux Instance-1 ← Kafka Consumer (P0, P1, P2, P3, P4, P5)
확장 후 (3 인스턴스):
WebFlux Instance-1 ← Kafka Consumer (P0, P1) ← SSE Client 1~1000
WebFlux Instance-2 ← Kafka Consumer (P2, P3) ← SSE Client 1001~2000
WebFlux Instance-3 ← Kafka Consumer (P4, P5) ← SSE Client 2001~3000
핵심: Kafka 파티션 수 = 최대 병렬 Consumer 수
파티션 키(symbol)로 같은 종목은 같은 파티션에 순서 보장
모니터링 설정
# docker-compose-monitoring.yml
services:
prometheus:
image: prom/prometheus:v2.51.0
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
ports:
- '9090:9090'
grafana:
image: grafana/grafana:10.4.0
ports:
- '3000:3000'
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
kafka-exporter:
image: danielqsj/kafka-exporter:latest
command: ['--kafka.server=kafka:9092']
ports:
- '9308:9308'
// Micrometer 메트릭 추가
@Component
@RequiredArgsConstructor
public class StockMetrics {
private final MeterRegistry registry;
private final AtomicLong connectedClients = new AtomicLong(0);
@PostConstruct
void init() {
Gauge.builder("stock.stream.connected_clients", connectedClients, AtomicLong::get)
.description("Number of connected SSE clients")
.register(registry);
}
public void recordPricePublished(String symbol) {
registry.counter("stock.price.published",
"symbol", symbol).increment();
}
public void recordLatency(String symbol, long latencyMs) {
registry.timer("stock.price.latency",
"symbol", symbol)
.record(Duration.ofMillis(latencyMs));
}
public void clientConnected() { connectedClients.incrementAndGet(); }
public void clientDisconnected() { connectedClients.decrementAndGet(); }
}
8. 언제 무엇을 선택할 것인가?
기술 선택 의사결정 매트릭스
| 시나리오 | 추천 기술 | 이유 |
|---|---|---|
| 간단한 CRUD API | Spring MVC | 학습 곡선 낮고 생태계 풍부 |
| 높은 동시접속 API | WebFlux | 적은 리소스로 만 단위 동시접속 |
| 이벤트 스트리밍/로그 | Kafka | 높은 처리량, 내구성, 리플레이 가능 |
| 실시간 알림/피드 | SSE | 단방향, 자동 재연결, HTTP 호환 |
| 실시간 채팅/게임 | WebSocket | 양방향, 낮은 지연시간 |
| MSA 간 실시간 통신 | gRPC Streaming | 양방향, Protobuf, 높은 성능 |
| CDC(변경 감지) | Kafka Connect + Debezium | DB 변경을 실시간 캡처 |
| 복잡한 스트림 처리 | Kafka Streams | Stateful 처리, 윈도우 집계 |
| CQRS 패턴 | Kafka + WebFlux | 이벤트 소싱 + 리액티브 쿼리 |
조합 패턴
실전에서는 단일 기술이 아니라 여러 기술을 조합합니다.
패턴 1: 실시간 대시보드
Data Source → Kafka → WebFlux Consumer → SSE → Browser
패턴 2: 실시간 채팅 + 메시지 저장
Browser ↔ WebSocket ↔ WebFlux → Kafka (저장) → Elasticsearch (검색)
패턴 3: MSA 이벤트 드리븐
Service A → Kafka → Service B (WebFlux Consumer)
→ Service C (WebFlux Consumer)
→ Analytics (Kafka Streams)
패턴 4: IoT 데이터 처리
Sensors → MQTT → Kafka Connect → Kafka → Kafka Streams (집계)
→ WebFlux → SSE → Dashboard
안티패턴 (피해야 할 것들)
- WebFlux에서 JDBC 사용: 이벤트 루프를 블로킹하여 성능 급격 저하. R2DBC 또는
Schedulers.boundedElastic()사용 - 모든 API를 WebFlux로: 간단한 CRUD는 MVC가 더 적합. 불필요한 복잡성 증가
- Kafka를 단순 메시지 큐로: RabbitMQ/SQS가 더 적합한 경우도 있음. Kafka는 이벤트 로그에 최적화
- 파티션 수 과다 설정: 파티션이 많으면 리밸런싱 비용 증가. 필요에 맞게 설정
- SSE에서 대용량 데이터: SSE는 텍스트 기반이므로 바이너리 데이터에 부적합. gRPC 사용
9. 학습 로드맵 (6개월)
1개월 차: 리액티브 프로그래밍 기초
- Week 1-2: Project Reactor 핵심 (Mono, Flux, 오퍼레이터)
- 공식 문서의 "Learn" 섹션 완독
- Lite Rx Hands-On 실습 프로젝트
- Week 3-4: Spring WebFlux 기본
- 간단한 REST API를 MVC에서 WebFlux로 마이그레이션
- R2DBC 연동 실습
2개월 차: WebFlux 심화
- Week 1-2: WebClient, WebSocket, SSE 구현
- 실시간 알림 시스템 미니 프로젝트
- Week 3-4: 테스트 (StepVerifier, WebTestClient)
- BlockHound 적용
- 성능 테스트 (Gatling/k6)
3개월 차: Apache Kafka 기초
- Week 1-2: Kafka 아키텍처, Producer/Consumer
- Docker Compose로 로컬 Kafka 클러스터 구축
- 기본 메시지 발행/소비 실습
- Week 3-4: Consumer Group, Offset 관리, EOS
- 멀티 컨슈머 그룹 시나리오 실습
- Transactional Producer 구현
4개월 차: Kafka 심화
- Week 1-2: Kafka Streams, KTable/KStream
- 윈도우 집계, Join 실습
- Week 3-4: Kafka Connect, Schema Registry
- Debezium CDC 연동
- Avro 스키마 관리
5개월 차: 통합 프로젝트
- Week 1-2: WebFlux + Kafka 통합
- Reactor Kafka 활용
- CQRS + Event Sourcing 패턴
- Week 3-4: 실시간 대시보드 프로젝트
- Kafka에서 SSE까지 전체 파이프라인
- React 프론트엔드 연동
6개월 차: 운영과 최적화
- Week 1-2: KRaft 모드 마이그레이션 실습
- 모니터링 (Prometheus + Grafana)
- 알림 설정 (Consumer Lag, Error Rate)
- Week 3-4: 성능 튜닝과 장애 대응
- 부하 테스트 및 병목 분석
- 장애 시나리오 시뮬레이션 (브로커 다운, 네트워크 파티션)
실전 퀴즈
아래 퀴즈를 통해 학습 내용을 점검해 보세요.
Q1: WebFlux에서 Mono.just()과 Mono.defer()의 차이는 무엇인가요?
Mono.just(value)는 값을 즉시 캡처하여 Mono로 감쌉니다. 구독 시점과 관계없이 동일한 값을 반환합니다.
Mono.defer(() -> Mono.just(value))는 구독 시점에 람다를 평가합니다. 따라서 매 구독마다 새로운 값을 생성할 수 있습니다.
// just: 생성 시점에 값이 결정됨
Mono<Long> eager = Mono.just(System.currentTimeMillis());
// 1초 후 구독해도 동일한 타임스탬프
// defer: 구독 시점에 값이 결정됨
Mono<Long> lazy = Mono.defer(() -> Mono.just(System.currentTimeMillis()));
// 구독할 때마다 새로운 타임스탬프
실무에서 switchIfEmpty와 함께 사용할 때 중요합니다. switchIfEmpty(Mono.just(createDefault()))는 createDefault()가 즉시 실행되지만, switchIfEmpty(Mono.defer(() -> createDefault()))는 실제로 빈 경우에만 실행됩니다.
Q2: Kafka에서 acks=all과 min.insync.replicas=2를 함께 설정하면 어떤 보장을 제공하나요?
acks=all은 Leader가 모든 ISR(In-Sync Replicas)의 확인을 받은 후에 Producer에게 성공을 반환합니다.
min.insync.replicas=2는 ISR의 최소 크기를 2로 설정합니다. ISR이 2 미만이면 Producer는 NotEnoughReplicasException을 받습니다.
이 조합은 다음을 보장합니다:
- 메시지가 최소 2개의 브로커에 기록된 후에만 성공 응답
- Replication Factor가 3일 때, 1대의 브로커가 장애나도 데이터 유실 없음
- 2대 이상 동시 장애 시 쓰기 실패 (데이터 유실 방지를 위해 의도적 거부)
프로덕션에서 가장 많이 사용하는 안정성 설정입니다.
Q3: SSE 연결에서 클라이언트가 네트워크 장애로 끊어졌다가 재연결할 때, 어떻게 유실된 이벤트를 복구할 수 있나요?
SSE는 Last-Event-ID 헤더를 통해 재연결 시 마지막으로 수신한 이벤트 ID를 서버에 전달합니다. 서버는 이를 기반으로 유실된 이벤트를 재전송할 수 있습니다.
구현 방법:
- 서버에서 각 이벤트에 고유 ID를 부여합니다 (예: Kafka offset 또는 타임스탬프).
- 클라이언트 재연결 시
Last-Event-ID헤더를 확인합니다. - 해당 ID 이후의 이벤트를 Kafka에서 다시 소비하여 전송합니다.
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<EventData>> stream(
@RequestHeader(value = "Last-Event-ID", required = false) String lastEventId) {
long startOffset = lastEventId != null ? Long.parseLong(lastEventId) + 1 : -1;
// startOffset부터 Kafka 소비 시작
return eventService.getEventsFrom(startOffset)
.map(e -> ServerSentEvent.<EventData>builder()
.id(String.valueOf(e.getOffset()))
.data(e)
.build());
}
Kafka의 메시지 보존(retention) 덕분에 일정 기간 내의 이벤트는 언제든 다시 읽을 수 있습니다.
Q4: KRaft 모드에서 Controller Quorum의 Active Controller가 장애나면 어떻게 되나요?
KRaft 모드에서는 Raft 합의 프로토콜을 사용하여 Controller 선출을 자동으로 처리합니다.
- Active Controller가 하트비트를 보내지 않으면, Follower Controller들이 이를 감지합니다.
- Election Timeout이 만료되면 Follower 중 하나가 후보(Candidate)로 전환됩니다.
- 과반수(Quorum)의 투표를 받은 후보가 새로운 Active Controller가 됩니다.
- 새 Controller는 이미 Raft 로그를 통해 메타데이터를 복제하고 있으므로, ZooKeeper 모드처럼 전체 메타데이터를 다시 로드할 필요가 없습니다.
결과적으로 페일오버 시간이 ZooKeeper 모드의 수 분에서 수 초로 단축됩니다. Controller Quorum이 3개(1 Active + 2 Follower)일 때, 1대 장애까지 허용 가능합니다.
Q5: WebFlux에서 flatMap과 concatMap의 차이를 설명하고, 각각 언제 사용해야 하나요?
flatMap: 내부 Publisher를 구독할 때 동시성(concurrency)을 허용합니다. 여러 내부 스트림이 인터리빙(interleaving)되어 순서가 보장되지 않습니다.
concatMap: 내부 Publisher를 순차적으로 구독합니다. 이전 스트림이 완료된 후에 다음 스트림을 구독하므로 순서가 보장됩니다.
// flatMap: 순서 무관, 최대 성능
// 사용 예: 여러 사용자의 프로필을 병렬로 조회
Flux.fromIterable(userIds)
.flatMap(id -> userService.findById(id)) // 동시 실행
// 결과 순서: [User3, User1, User2] (비결정적)
// concatMap: 순서 보장, 순차 실행
// 사용 예: 이벤트를 순서대로 처리해야 할 때
Flux.fromIterable(events)
.concatMap(event -> processEvent(event)) // 하나씩 순차 실행
// 결과 순서: [Event1, Event2, Event3] (항상 동일)
선택 기준:
- 순서가 중요하지 않고 성능이 중요하면 flatMap
- 순서가 반드시 보장되어야 하면 concatMap
- flatMap에 concurrency 파라미터를 설정하여 동시 실행 수를 제한할 수도 있습니다:
flatMap(fn, 10)(최대 10개 동시)
참고 자료
공식 문서
- Spring WebFlux Reference: https://docs.spring.io/spring-framework/reference/web/webflux.html
- Project Reactor Reference: https://projectreactor.io/docs/core/release/reference/
- Apache Kafka Documentation: https://kafka.apache.org/documentation/
- KRaft (KIP-500): https://cwiki.apache.org/confluence/display/KAFKA/KIP-500
- Reactive Streams Specification: https://www.reactive-streams.org/
- R2DBC Specification: https://r2dbc.io/spec/1.0.0.RELEASE/spec/html/
아키텍처 및 패턴
- Reactive Manifesto: https://www.reactivemanifesto.org/
- Kafka The Definitive Guide (O'Reilly)
- Designing Data-Intensive Applications — Martin Kleppmann
- Enterprise Integration Patterns: https://www.enterpriseintegrationpatterns.com/
- CQRS Pattern (Microsoft): https://learn.microsoft.com/en-us/azure/architecture/patterns/cqrs
- Event Sourcing Pattern: https://martinfowler.com/eaaDev/EventSourcing.html
Kafka 생태계
- Confluent Schema Registry: https://docs.confluent.io/platform/current/schema-registry/
- Debezium CDC: https://debezium.io/documentation/
- Kafka Streams Documentation: https://kafka.apache.org/documentation/streams/
- Reactor Kafka: https://projectreactor.io/docs/kafka/release/reference/
모니터링 및 운영
- Kafka Exporter for Prometheus: https://github.com/danielqsj/kafka_exporter
- Micrometer Metrics: https://micrometer.io/docs
- Grafana Kafka Dashboard: https://grafana.com/grafana/dashboards/
- BlockHound (블로킹 감지): https://github.com/reactor/BlockHound
학습 자료
- Lite Rx Hands-On: https://github.com/reactor/lite-rx-api-hands-on
- Kafka Tutorials (Confluent): https://developer.confluent.io/tutorials/
- Spring WebFlux Workshop: https://github.com/spring-projects/spring-framework
- gRPC Java: https://grpc.io/docs/languages/java/
- SSE Specification (W3C): https://html.spec.whatwg.org/multipage/server-sent-events.html