들어가며: Event Streaming의 진화
Event Streaming은 현대 분산 시스템의 핵심 인프라입니다. Apache Kafka를 중심으로 한 이벤트 스트리밍 생태계는 단순한 메시지 큐를 넘어, 실시간 데이터 파이프라인, 이벤트 소싱, CQRS, 스트림 처리의 기반이 되었습니다.
이 가이드에서는 Kafka의 내부 구조부터 Exactly-Once 시맨틱스, Schema Registry, Apache Flink까지 심층적으로 다룹니다.
1. Kafka 내부 구조 심화
파티션 구조
Kafka의 토픽은 하나 이상의 파티션으로 분할됩니다. 각 파티션은 순서가 보장되는 불변의 로그입니다.
Topic: orders (3 partitions)
Partition 0: [msg0] [msg3] [msg6] [msg9] ... → offset 증가
Partition 1: [msg1] [msg4] [msg7] [msg10] ... → offset 증가
Partition 2: [msg2] [msg5] [msg8] [msg11] ... → offset 증가
- 같은 파티션 내에서만 순서 보장
- 파티션 수 = Consumer Group 내 최대 병렬 소비 수
- 파티션 키로 같은 키의 메시지가 같은 파티션으로
세그먼트 파일과 인덱스
각 파티션은 디스크에 세그먼트 파일로 저장됩니다.
/kafka-logs/orders-0/
├── 00000000000000000000.log # 세그먼트 파일 (실제 메시지)
├── 00000000000000000000.index # 오프셋 인덱스
├── 00000000000000000000.timeindex # 타임스탬프 인덱스
├── 00000000000005242880.log # 다음 세그먼트
├── 00000000000005242880.index
├── 00000000000005242880.timeindex
└── leader-epoch-checkpoint
세그먼트 파일 구조
========================
.log 파일 (메시지 저장)
┌──────────────────────────────────────┐
│ Batch Header │
│ - Base Offset: 0 │
│ - Batch Length: 256 bytes │
│ - Magic: 2 (Kafka 0.11+) │
│ - CRC: checksum │
│ - Attributes: compression, txn │
│ - Producer ID: 12345 │
│ - Producer Epoch: 0 │
│ - Base Sequence: 0 │
├──────────────────────────────────────┤
│ Record 0 │
│ - Offset Delta: 0 │
│ - Timestamp Delta: 0 │
│ - Key: "order-123" │
│ - Value: (serialized payload) │
│ - Headers: [("source", "web")] │
├──────────────────────────────────────┤
│ Record 1 │
│ - Offset Delta: 1 │
│ - ... │
└──────────────────────────────────────┘
.index 파일 (희소 인덱스)
┌────────────────────────┐
│ Relative Offset → File │
│ Position │
├────────────────────────┤
│ 0 → 0 │
│ 4096 → 32768 │
│ 8192 → 65536 │
└────────────────────────┘
.timeindex 파일
┌────────────────────────────────┐
│ Timestamp → Relative Offset │
├────────────────────────────────┤
│ 1700000000000 → 0 │
│ 1700000060000 → 4096 │
└────────────────────────────────┘
Log Compaction
Log Compaction은 각 키의 최신 값만 유지하는 보존 전략입니다.
Compaction 전:
offset 0: key=A, value=v1
offset 1: key=B, value=v1
offset 2: key=A, value=v2 ← A의 최신 값
offset 3: key=C, value=v1
offset 4: key=B, value=v2 ← B의 최신 값
offset 5: key=A, value=v3 ← A의 최신 값
Compaction 후:
offset 3: key=C, value=v1 (C의 유일한 값)
offset 4: key=B, value=v2 (B의 최신 값)
offset 5: key=A, value=v3 (A의 최신 값)
# Log Compaction 설정
log.cleanup.policy=compact
log.cleaner.min.cleanable.ratio=0.5
log.cleaner.min.compaction.lag.ms=0
min.cleanable.dirty.ratio=0.5
ISR (In-Sync Replicas)
ISR은 리더와 동기화된 레플리카의 집합입니다.
Partition 0 (Replication Factor = 3)
======================================
Broker 1 (Leader): [0] [1] [2] [3] [4] [5] ← LEO: 6
Broker 2 (Follower): [0] [1] [2] [3] [4] ← LEO: 5
Broker 3 (Follower): [0] [1] [2] [3] [4] [5] ← LEO: 6
ISR = {Broker 1, Broker 2, Broker 3}
HW (High Watermark) = 5 (ISR 내 최소 LEO)
- Consumer는 HW까지만 읽을 수 있음
- Broker 2가 지연이 심해지면 ISR에서 제거
- replica.lag.time.max.ms 초과 시 ISR에서 제거
# ISR 관련 설정
min.insync.replicas=2 # 최소 동기화 레플리카 수
replica.lag.time.max.ms=30000 # ISR 탈락 기준 시간
unclean.leader.election.enable=false # ISR 외 레플리카의 리더 선출 방지
KRaft vs ZooKeeper
Kafka 3.x부터 ZooKeeper 없이 동작하는 KRaft 모드가 도입되었습니다.
ZooKeeper 모드 (레거시)
==========================
[ZooKeeper Ensemble]
↕ 메타데이터
[Controller (Broker 중 1개)]
↕ 리더 선출
[Broker 1] [Broker 2] [Broker 3]
문제점:
- ZooKeeper 별도 운영 필요
- 메타데이터 동기화 지연
- 파티션 수 증가 시 성능 저하
KRaft 모드 (Kafka 3.x+)
==========================
[Controller Quorum]
Broker 1 (Controller + Broker)
Broker 2 (Controller + Broker)
Broker 3 (Controller + Broker)
장점:
- ZooKeeper 의존성 제거
- 메타데이터 관리 성능 향상
- 수백만 파티션 지원
- 운영 복잡도 감소
# KRaft 설정
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@broker1:9093,2@broker2:9093,3@broker3:9093
controller.listener.names=CONTROLLER
2. Producer 심화
Partitioner
프로듀서는 메시지를 어느 파티션에 보낼지 결정하는 파티셔너를 사용합니다.
// 기본 파티셔너 동작
public class DefaultPartitioner implements Partitioner {
// 키가 있는 경우: murmur2 해시
// 키가 없는 경우: Sticky Partitioner (배치 단위로 파티션 변경)
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
// Sticky Partitioner: 배치가 꽉 찰 때까지 같은 파티션
return stickyPartitionCache.partition(topic, cluster);
}
// Key-based partitioning: 같은 키 = 같은 파티션
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
// 커스텀 파티셔너 예시: 지역 기반
public class RegionPartitioner implements Partitioner {
private Map<String, Integer> regionMapping;
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
String region = extractRegion(value);
int numPartitions = cluster.partitionsForTopic(topic).size();
return regionMapping.getOrDefault(region, 0) % numPartitions;
}
}
배치와 압축
# Producer 배치 설정
batch.size=16384 # 배치 크기 (bytes)
linger.ms=5 # 배치 대기 시간
buffer.memory=33554432 # 전체 버퍼 메모리
# 압축 설정
compression.type=lz4 # none, gzip, snappy, lz4, zstd
Producer 배치 처리 흐름
==========================
send() ──▶ [Serializer] ──▶ [Partitioner] ──▶ [RecordAccumulator]
│
┌───────────┴───────────┐
│ Partition 0 배치 │
│ [msg1][msg2][msg3] │
├───────────────────────┤
│ Partition 1 배치 │
│ [msg4][msg5] │
└───────────┬───────────┘
│
batch.size 도달
또는 linger.ms 경과
│
▼
[Sender Thread]
│
압축 + 네트워크 전송
│
▼
[Kafka Broker]
멱등 프로듀서 (Idempotent Producer)
# 멱등 프로듀서 설정
enable.idempotence=true # Kafka 3.0+ 기본값
acks=all # 멱등성 요구사항
retries=2147483647 # 무한 재시도
max.in.flight.requests.per.connection=5 # 최대 5 (멱등성 보장 범위)
멱등 프로듀서 동작 원리
==========================
Producer Broker
│ │
│── ProducerID: 1000 │
│ Epoch: 0 │
│ Sequence: 0 │
│ Message: "order-1" ──────▶│ ✓ 저장
│ │
│── ProducerID: 1000 │
│ Epoch: 0 │
│ Sequence: 1 │
│ Message: "order-2" ──────▶│ ✓ 저장
│ │
│── (네트워크 타임아웃으로 재전송) │
│ ProducerID: 1000 │
│ Epoch: 0 │
│ Sequence: 1 │
│ Message: "order-2" ──────▶│ ✗ 중복! 무시
│ │
트랜잭셔널 프로듀서
// 트랜잭셔널 프로듀서 예시
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("transactional.id", "order-processor-1");
props.put("enable.idempotence", "true");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
// 여러 토픽/파티션에 원자적으로 전송
producer.send(new ProducerRecord<>("orders", "key1", "order-created"));
producer.send(new ProducerRecord<>("inventory", "key1", "stock-reserved"));
producer.send(new ProducerRecord<>("notifications", "key1", "email-queued"));
// Consumer 오프셋도 트랜잭션에 포함 가능
producer.sendOffsetsToTransaction(offsets, consumerGroupId);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
throw e;
}
3. Consumer 심화
Consumer Group과 파티션 할당
Topic: orders (6 partitions)
Consumer Group: order-processor
===================================
Case 1: 3 Consumers
Consumer A: [P0] [P1]
Consumer B: [P2] [P3]
Consumer C: [P4] [P5]
Case 2: 6 Consumers (이상적)
Consumer A: [P0]
Consumer B: [P1]
Consumer C: [P2]
Consumer D: [P3]
Consumer E: [P4]
Consumer F: [P5]
Case 3: 8 Consumers (2개 유휴)
Consumer A: [P0]
Consumer B: [P1]
Consumer C: [P2]
Consumer D: [P3]
Consumer E: [P4]
Consumer F: [P5]
Consumer G: (유휴 - idle)
Consumer H: (유휴 - idle)
파티션 할당 전략
// 파티션 할당 전략 비교
Properties props = new Properties();
// 1. Range Assignor (기본)
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.RangeAssignor");
// 토픽별로 파티션을 범위로 나눔
// Topic A: P0,P1,P2 → C1: [P0,P1], C2: [P2]
// Topic B: P0,P1,P2 → C1: [P0,P1], C2: [P2]
// 문제: C1에 편중
// 2. RoundRobin Assignor
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.RoundRobinAssignor");
// 모든 토픽의 파티션을 라운드 로빈으로 분배
// C1: [A-P0, B-P1], C2: [A-P1, B-P0], C3: [A-P2, B-P2]
// 3. Sticky Assignor
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.StickyAssignor");
// 기존 할당 최대한 유지 + 균등 분배
// 리밸런싱 시 최소한의 파티션만 이동
// 4. CooperativeSticky Assignor (권장)
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
// Sticky + 점진적 리밸런싱
// 전체 중단 없이 파티션 재할당
리밸런싱: Eager vs Incremental
Eager Rebalancing (기존 방식)
================================
1. Consumer C3 추가됨
2. 모든 Consumer가 파티션 해제 (Stop-the-World)
C1: [] C2: [] C3: []
3. 새로운 할당 계산
4. 파티션 재할당
C1: [P0,P1] C2: [P2,P3] C3: [P4,P5]
→ 전체 처리 중단 발생!
Incremental Cooperative Rebalancing (권장)
================================
1. Consumer C3 추가됨
2. 1차 리밸런스: 이동 필요한 파티션만 해제
C1: [P0,P1,P2] → C1: [P0,P1] (P2 해제)
C2: [P3,P4,P5] → C2: [P3,P4] (P5 해제)
3. 2차 리밸런스: 해제된 파티션을 C3에 할당
C3: [] → C3: [P2,P5]
→ 나머지 파티션은 중단 없이 계속 처리!
오프셋 관리
// 자동 커밋 (기본)
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
// 위험: 처리 전 커밋되면 메시지 유실 가능
// 수동 커밋 (권장)
props.put("enable.auto.commit", "false");
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 메시지 처리
processRecord(record);
}
// 동기 커밋 (처리 완료 후)
consumer.commitSync();
// 또는 비동기 커밋 (성능 향상)
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
log.error("Commit failed", exception);
}
});
}
오프셋 저장소: __consumer_offsets 토픽
============================================
Key: (group_id, topic, partition)
Value: (offset, metadata, timestamp)
예시:
Key: (order-processor, orders, 0)
Value: (offset=42, metadata="", timestamp=1700000000)
4. Exactly-Once Semantics
메시지 전달 보장 수준
At-Most-Once (최대 한 번)
================================
Producer ──send()──▶ Broker
(acks=0, 확인 안 함)
- 메시지 유실 가능
- 중복 없음
- 가장 빠름
At-Least-Once (최소 한 번)
================================
Producer ──send()──▶ Broker ──ack──▶ Producer
(acks=all, 재시도 있음)
- 메시지 유실 없음
- 중복 가능
- 대부분의 기본 설정
Exactly-Once (정확히 한 번)
================================
Producer ──txn──▶ Broker ──commit──▶ Consumer(read_committed)
(멱등 프로듀서 + 트랜잭션 + read_committed)
- 메시지 유실 없음
- 중복 없음
- 가장 강력하지만 오버헤드 있음
Exactly-Once 구현
// Exactly-Once: Consume-Transform-Produce 패턴
Properties producerProps = new Properties();
producerProps.put("transactional.id", "eos-processor-1");
producerProps.put("enable.idempotence", "true");
Properties consumerProps = new Properties();
consumerProps.put("group.id", "eos-group");
consumerProps.put("isolation.level", "read_committed"); // 핵심!
consumerProps.put("enable.auto.commit", "false");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
producer.initTransactions();
consumer.subscribe(Collections.singleton("input-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
producer.beginTransaction();
try {
for (ConsumerRecord<String, String> record : records) {
// Transform
String result = transform(record.value());
// Produce to output topic
producer.send(new ProducerRecord<>(
"output-topic", record.key(), result));
}
// Consumer 오프셋을 트랜잭션에 포함
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partRecords =
records.records(partition);
long lastOffset = partRecords.get(
partRecords.size() - 1).offset();
offsets.put(partition,
new OffsetAndMetadata(lastOffset + 1));
}
producer.sendOffsetsToTransaction(offsets,
consumer.groupMetadata());
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
}
}
Exactly-Once 트랜잭션 흐름
================================
1. beginTransaction()
2. send(output-topic, msg1) → Broker에 임시 저장 (uncommitted)
3. send(output-topic, msg2) → Broker에 임시 저장
4. sendOffsetsToTransaction() → Consumer 오프셋도 트랜잭션에 포함
5. commitTransaction() → 모든 메시지 + 오프셋 원자적 커밋
read_committed Consumer는 커밋된 메시지만 읽음
→ 중간에 실패하면 abortTransaction()으로 모두 롤백
5. Schema Registry
스키마 직렬화 형식 비교
| 특성 | Avro | Protobuf | JSON Schema |
|---|---|---|---|
| 직렬화 크기 | 작음 | 작음 | 큼 |
| 스키마 진화 | 우수 | 우수 | 보통 |
| 코드 생성 | 선택적 | 필수 | 불필요 |
| 가독성 | 낮음 (바이너리) | 낮음 (바이너리) | 높음 (텍스트) |
| 동적 타이핑 | 지원 | 미지원 | 지원 |
| Kafka 호환성 | 최상 | 우수 | 보통 |
Avro 스키마와 진화
// Version 1: 초기 스키마
{
"type": "record",
"name": "Order",
"namespace": "com.company.events",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "customer_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "USD"},
{"name": "created_at", "type": "long"}
]
}
// Version 2: Backward Compatible 진화
// 새 필드에 default 값 추가 → 기존 Consumer가 새 데이터를 읽을 수 있음
{
"type": "record",
"name": "Order",
"namespace": "com.company.events",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "customer_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "USD"},
{"name": "created_at", "type": "long"},
{"name": "status", "type": "string", "default": "CREATED"},
{"name": "region", "type": ["null", "string"], "default": null}
]
}
호환성 모드
BACKWARD (기본값, 권장)
================================
- 새 스키마로 기존 데이터를 읽을 수 있음
- 새 필드: default 값 필수
- 필드 삭제: 가능
- Consumer 먼저 업그레이드 → Producer 업그레이드
FORWARD
================================
- 기존 스키마로 새 데이터를 읽을 수 있음
- 새 필드: default 값 필수
- 필드 삭제: default 값이 있는 필드만
- Producer 먼저 업그레이드 → Consumer 업그레이드
FULL
================================
- BACKWARD + FORWARD 모두 충족
- 가장 안전하지만 가장 제한적
NONE
================================
- 호환성 검사 없음
- 위험! 운영 환경에서 사용 금지
# Schema Registry API 사용
import requests
SCHEMA_REGISTRY_URL = "http://schema-registry:8081"
# 스키마 등록
schema = {
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"}
]
}
response = requests.post(
f"{SCHEMA_REGISTRY_URL}/subjects/orders-value/versions",
json={"schema": json.dumps(schema), "schemaType": "AVRO"},
)
# 호환성 검사
response = requests.post(
f"{SCHEMA_REGISTRY_URL}/compatibility/subjects/orders-value/versions/latest",
json={"schema": json.dumps(new_schema), "schemaType": "AVRO"},
)
compatibility = response.json()
print(f"Compatible: {compatibility['is_compatible']}")
# 호환성 모드 설정
requests.put(
f"{SCHEMA_REGISTRY_URL}/config/orders-value",
json={"compatibility": "BACKWARD"},
)
Protobuf 스키마 진화
// Version 1
syntax = "proto3";
package com.company.events;
message Order {
string order_id = 1;
string customer_id = 2;
double amount = 3;
string currency = 4;
int64 created_at = 5;
}
// Version 2 (호환성 유지)
message Order {
string order_id = 1;
string customer_id = 2;
double amount = 3;
string currency = 4;
int64 created_at = 5;
// 새 필드: 기존 필드 번호 사용 금지!
string status = 6; // 새로 추가
string region = 7; // 새로 추가
// reserved 2; // 삭제된 필드 번호 예약
}
6. Apache Flink 심화
Flink 아키텍처 개요
┌─────────────────────────────────────────────┐
│ Flink Cluster │
├─────────────────────────────────────────────┤
│ │
│ [JobManager] │
│ - 잡 스케줄링 │
│ - 체크포인트 조정 │
│ - 장애 복구 │
│ │
│ [TaskManager 1] [TaskManager 2] │
│ - Task Slot 1 - Task Slot 1 │
│ - Task Slot 2 - Task Slot 2 │
│ - Task Slot 3 - Task Slot 3 │
│ │
├─────────────────────────────────────────────┤
│ [State Backend] │
│ - RocksDB (대규모 상태) │
│ - HashMap (소규모 상태) │
│ │
│ [Checkpoint Storage] │
│ - S3 / HDFS / GCS │
└─────────────────────────────────────────────┘
DataStream API
// Flink DataStream API 예시
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka Source
KafkaSource<Order> source = KafkaSource.<Order>builder()
.setBootstrapServers("broker1:9092")
.setTopics("orders")
.setGroupId("flink-order-processor")
.setStartingOffsets(OffsetsInitializer.committedOffsets(
OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new OrderDeserializer())
.build();
DataStream<Order> orders = env.fromSource(
source, WatermarkStrategy
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((order, ts) -> order.getCreatedAt()),
"Kafka Source"
);
// 처리 파이프라인
DataStream<OrderStats> stats = orders
.filter(order -> order.getStatus().equals("COMPLETED"))
.keyBy(Order::getRegion)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new OrderStatsAggregator());
// Kafka Sink
KafkaSink<OrderStats> sink = KafkaSink.<OrderStats>builder()
.setBootstrapServers("broker1:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("order-stats")
.setValueSerializationSchema(new OrderStatsSerializer())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("flink-order-stats")
.build();
stats.sinkTo(sink);
env.execute("Order Statistics Pipeline");
윈도우 (Windowing)
Tumbling Window (텀블링 윈도우)
================================
시간: ──|──1──2──3──|──4──5──6──|──7──8──9──|──
윈도우: [ Window 1 ][ Window 2 ][ Window 3 ]
- 고정 크기, 겹치지 않음
- 예: 5분마다 집계
Sliding Window (슬라이딩 윈도우)
================================
시간: ──|──1──2──3──4──5──6──7──8──|──
윈도우: [ Window 1 ]
[ Window 2 ]
[ Window 3 ]
- 고정 크기, 슬라이드 간격만큼 이동
- 예: 10분 윈도우, 5분 슬라이드
Session Window (세션 윈도우)
================================
시간: ──1─2──3─────────5─6──7─────────9──
윈도우: [Session 1] [Session 2] [S3]
gap gap
- 활동 기반, 가변 크기
- gap 시간 동안 이벤트 없으면 윈도우 종료
// 윈도우 예시
// Tumbling Window
orders.keyBy(Order::getRegion)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.sum("amount");
// Sliding Window
orders.keyBy(Order::getRegion)
.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
.sum("amount");
// Session Window
orders.keyBy(Order::getCustomerId)
.window(EventTimeSessionWindows.withGap(Time.minutes(30)))
.aggregate(new SessionAggregator());
워터마크와 늦은 데이터
Event Time vs Processing Time
================================
이벤트 발생 시간(Event Time): t=10 t=12 t=11 t=15 t=13
처리 시간(Processing Time): T=20 T=21 T=22 T=23 T=24
이벤트가 순서대로 도착하지 않음!
→ 워터마크로 "여기까지의 이벤트는 모두 도착했을 것" 표시
워터마크 동작
================================
이벤트: [t=10] [t=12] [t=11] [t=15] [t=13]
워터마크: W(10) W(12) W(12) W(15) W(15)
(maxOutOfOrderness = 5초인 경우)
실제 워터마크: W(5) W(7) W(7) W(10) W(10)
윈도우 [0, 10): 워터마크가 10을 넘으면 발화(fire)
// 워터마크 전략
WatermarkStrategy<Order> strategy = WatermarkStrategy
// 최대 10초 지연 허용
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((order, timestamp) -> order.getCreatedAt())
// 5분 이상 이벤트 없으면 워터마크 진행
.withIdleness(Duration.ofMinutes(5));
// 늦은 데이터 처리
OutputTag<Order> lateOutputTag = new OutputTag<>("late-orders") {};
SingleOutputStreamOperator<OrderStats> stats = orders
.keyBy(Order::getRegion)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(1)) // 1분 추가 대기
.sideOutputLateData(lateOutputTag) // 그 이후는 사이드 아웃풋
.aggregate(new OrderStatsAggregator());
// 늦은 데이터 별도 처리
DataStream<Order> lateOrders = stats.getSideOutput(lateOutputTag);
lateOrders.addSink(new LateOrderAlertSink());
상태 관리와 체크포인트
// Keyed State 예시
public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {
// ValueState: 키별 단일 값
private ValueState<Boolean> flagState;
// ListState: 키별 리스트
private ListState<Transaction> recentTransactions;
// MapState: 키별 맵
private MapState<String, Integer> merchantCounts;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Boolean> flagDescriptor =
new ValueStateDescriptor<>("flag", Boolean.class);
flagState = getRuntimeContext().getState(flagDescriptor);
ListStateDescriptor<Transaction> listDescriptor =
new ListStateDescriptor<>("recent-txns", Transaction.class);
recentTransactions = getRuntimeContext().getListState(listDescriptor);
MapStateDescriptor<String, Integer> mapDescriptor =
new MapStateDescriptor<>("merchant-counts", String.class, Integer.class);
merchantCounts = getRuntimeContext().getMapState(mapDescriptor);
}
@Override
public void processElement(Transaction txn, Context ctx, Collector<Alert> out) {
Boolean flag = flagState.value();
if (flag != null && txn.getAmount() > 10000) {
out.collect(new Alert(txn.getCustomerId(), "HIGH_AMOUNT_AFTER_FLAG"));
}
if (txn.getAmount() < 1.0) {
flagState.update(true);
// 1분 후 플래그 해제 타이머
ctx.timerService().registerEventTimeTimer(
txn.getTimestamp() + 60000);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
flagState.clear();
}
}
체크포인트 동작
================================
[Source] ──▶ [Map] ──▶ [KeyBy] ──▶ [Window] ──▶ [Sink]
1. JobManager: 체크포인트 배리어 주입
Source ──▶ |barrier| ──▶ Map ──▶ KeyBy ──▶ Window ──▶ Sink
2. 배리어가 오퍼레이터를 통과할 때 상태 스냅샷
Source(snapshot) ──▶ Map(snapshot) ──▶ ...
3. 모든 오퍼레이터 스냅샷 완료 → 체크포인트 성공
4. 장애 발생 시: 마지막 성공 체크포인트에서 복원
// 체크포인트 설정
env.enableCheckpointing(60000); // 60초 간격
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(120000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// State Backend 설정
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("s3://checkpoints/flink/");
7. Kafka Streams vs Apache Flink
비교표
| 특성 | Kafka Streams | Apache Flink |
|---|---|---|
| 배포 방식 | 라이브러리 (JVM 앱에 임베드) | 분산 클러스터 |
| 입력 소스 | Kafka만 | Kafka, Kinesis, 파일 등 |
| Exactly-Once | Kafka 내부만 | 외부 시스템 포함 |
| 윈도우 | Tumbling, Sliding, Session | + Global, Custom |
| 상태 관리 | RocksDB (로컬) | RocksDB + 분산 스냅샷 |
| SQL 지원 | KSQL (별도) | Flink SQL (내장) |
| 처리량 | 중-대 | 대-초대 |
| 운영 복잡도 | 낮음 (앱 배포) | 높음 (클러스터 관리) |
| 적합한 경우 | Kafka 생태계 내 처리 | 복잡한 스트림 처리 |
선택 기준
use_kafka_streams_when:
- "Kafka에서 읽고 Kafka에 쓰는 단순한 처리"
- "별도 클러스터 운영이 부담스러운 경우"
- "마이크로서비스 내부에 임베드"
- "중간 정도의 처리량으로 충분한 경우"
- "팀이 Kafka 생태계에 익숙한 경우"
use_flink_when:
- "복잡한 이벤트 처리 (CEP)"
- "다양한 소스/싱크 연결"
- "대규모 상태 관리 필요"
- "고급 윈도우 함수 필요"
- "SQL 기반 스트림 처리"
- "초대규모 처리량 필요"
8. Kafka Connect
Source/Sink Connector
// Debezium MySQL Source Connector
{
"name": "mysql-source-orders",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql-primary",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz-password",
"database.server.id": "1",
"topic.prefix": "cdc",
"database.include.list": "order_service",
"table.include.list": "order_service.orders,order_service.order_items",
"schema.history.internal.kafka.bootstrap.servers": "broker:9092",
"schema.history.internal.kafka.topic": "schema-changes.orders",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "cdc\\.order_service\\.(.*)",
"transforms.route.replacement": "order.$1.cdc"
}
}
// Elasticsearch Sink Connector
{
"name": "es-sink-orders",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch:9200",
"topics": "order.orders.cdc",
"type.name": "_doc",
"key.ignore": "false",
"schema.ignore": "true",
"behavior.on.null.values": "delete",
"write.method": "upsert",
"transforms": "extractKey",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "order_id"
}
}
Debezium CDC
Debezium CDC 흐름
================================
[MySQL] ──binlog──▶ [Debezium Connector] ──▶ [Kafka Topic]
│
┌───────────────┤
▼ ▼
[Flink Job] [ES Sink Connector]
│ │
▼ ▼
[Kafka Topic] [Elasticsearch]
CDC 이벤트 구조:
- before: 변경 전 레코드 (UPDATE/DELETE)
- after: 변경 후 레코드 (INSERT/UPDATE)
- source: 소스 DB 메타데이터
- op: 작업 유형 (c=create, u=update, d=delete, r=read)
9. 성능 튜닝
Producer 튜닝
# Producer 성능 최적화
batch.size=65536 # 64KB (기본 16KB)
linger.ms=10 # 10ms 대기 (배치 효율)
buffer.memory=67108864 # 64MB
compression.type=lz4 # 압축 (네트워크/디스크 절약)
acks=all # 안정성
max.in.flight.requests.per.connection=5 # 파이프라이닝
send.buffer.bytes=131072 # 소켓 버퍼
Consumer 튜닝
# Consumer 성능 최적화
fetch.min.bytes=1048576 # 1MB (배치 효율)
fetch.max.wait.ms=500 # 최대 500ms 대기
max.poll.records=500 # 한 번에 가져올 레코드 수
max.partition.fetch.bytes=1048576 # 파티션당 1MB
session.timeout.ms=30000 # 세션 타임아웃
heartbeat.interval.ms=10000 # 하트비트 간격
max.poll.interval.ms=300000 # 폴 간격 최대 5분
Broker 튜닝
# Broker 성능 최적화
num.partitions=12 # 기본 파티션 수
default.replication.factor=3 # 복제 팩터
num.io.threads=8 # I/O 스레드
num.network.threads=3 # 네트워크 스레드
num.replica.fetchers=4 # 레플리카 페처
socket.send.buffer.bytes=102400 # 소켓 전송 버퍼
socket.receive.buffer.bytes=102400 # 소켓 수신 버퍼
log.segment.bytes=1073741824 # 1GB 세그먼트
log.retention.hours=168 # 7일 보존
log.retention.bytes=-1 # 크기 제한 없음
파티션 수 결정
파티션 수 결정 공식
================================
필요 처리량: 100MB/s
단일 파티션 처리량: ~10MB/s (Producer)
~5MB/s (Consumer)
Producer 관점: 100MB/s / 10MB/s = 10 파티션
Consumer 관점: 100MB/s / 5MB/s = 20 파티션
→ 최소 20 파티션 필요
추가 고려사항:
- Consumer 인스턴스 수에 맞춤 (파티션 수 >= Consumer 수)
- 파티션 수 증가는 가능하지만 감소는 불가
- 너무 많은 파티션: 메타데이터 오버헤드, 리밸런싱 지연
- 권장: 토픽당 6-50개 사이
10. 모니터링
JMX 메트릭
# 핵심 Kafka 메트릭
broker_metrics:
- name: "UnderReplicatedPartitions"
description: "ISR에서 벗어난 파티션 수"
alert_threshold: "> 0"
severity: critical
- name: "ActiveControllerCount"
description: "활성 컨트롤러 수"
alert_threshold: "!= 1"
severity: critical
- name: "OfflinePartitionsCount"
description: "오프라인 파티션 수"
alert_threshold: "> 0"
severity: critical
- name: "RequestsPerSec"
description: "초당 요청 수"
alert_threshold: "> 10000"
severity: warning
producer_metrics:
- name: "record-send-rate"
description: "초당 전송 레코드 수"
- name: "record-error-rate"
description: "초당 에러 레코드 수"
alert_threshold: "> 0"
- name: "request-latency-avg"
description: "평균 요청 지연시간"
alert_threshold: "> 100ms"
consumer_metrics:
- name: "records-lag-max"
description: "최대 Consumer Lag"
alert_threshold: "> 10000"
severity: warning
- name: "records-consumed-rate"
description: "초당 소비 레코드 수"
- name: "commit-latency-avg"
description: "평균 커밋 지연시간"
Consumer Lag 모니터링
Consumer Lag 계산
================================
Partition 0:
Log End Offset (LEO): 1000 (최신 메시지)
Consumer Offset: 850 (마지막 커밋)
Lag: 1000 - 850 = 150
Partition 1:
LEO: 2000
Consumer Offset: 1950
Lag: 50
Total Lag = 150 + 50 = 200 messages
# Burrow (LinkedIn의 Consumer Lag 모니터링)
burrow_config:
general:
pidfile: "/var/run/burrow.pid"
stdout-logfile: "/var/log/burrow.log"
cluster:
kafka-prod:
class-name: kafka
servers: ["broker1:9092", "broker2:9092", "broker3:9092"]
topic-refresh: 120
offset-refresh: 30
consumer:
kafka-prod:
class-name: kafka
cluster: kafka-prod
servers: ["broker1:9092", "broker2:9092"]
group-denylist: "^(console-consumer|_)"
start-latest: true
notifier:
slack:
class-name: http
url-open: "https://hooks.slack.com/services/xxx/yyy/zzz"
template-open: |
Consumer group lag alert:
Group: {{.GroupName}}
Status: {{.Status}}
send-close: true
Grafana 대시보드
Kafka 모니터링 대시보드 패널
================================
1. Cluster Overview
- Broker 수 / 상태
- 총 파티션 수
- Under-replicated 파티션
- Active Controller
2. Throughput
- Messages In/Out per sec
- Bytes In/Out per sec
- Requests per sec
3. Consumer Groups
- Consumer Lag (그룹별)
- Consume Rate
- Rebalance 횟수
4. Latency
- Produce Request Latency (p50, p99)
- Fetch Request Latency
- End-to-End Latency
5. Resources
- Disk Usage per Broker
- Network I/O
- CPU / Memory
11. 퀴즈
Q1. Kafka에서 같은 키를 가진 메시지가 항상 같은 파티션에 들어가는 이유는?
기본 파티셔너가 murmur2 해시 함수를 사용하여 키의 해시값을 파티션 수로 나눈 나머지로 파티션을 결정하기 때문입니다.
partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
같은 키는 항상 같은 해시값을 생성하므로 같은 파티션에 할당됩니다. 이를 통해 같은 키의 메시지 순서가 보장됩니다.
단, 파티션 수가 변경되면 키-파티션 매핑이 달라질 수 있습니다.
Q2. Exactly-Once Semantics를 달성하기 위해 필요한 3가지 요소는?
-
멱등 프로듀서 (Idempotent Producer):
enable.idempotence=true로 ProducerID와 Sequence Number를 사용하여 중복 전송을 방지합니다. -
트랜잭셔널 프로듀서 (Transactional Producer):
transactional.id를 설정하고beginTransaction(),commitTransaction()으로 여러 메시지를 원자적으로 전송합니다. -
read_committed 격리 수준: Consumer에서
isolation.level=read_committed로 설정하여 커밋된 트랜잭션의 메시지만 읽습니다.
이 세 가지가 결합되어 Consume-Transform-Produce 패턴에서 정확히 한 번만 처리되는 것을 보장합니다.
Q3. Avro의 BACKWARD 호환성에서 새 필드를 추가할 때 반드시 필요한 것은?
default 값이 반드시 필요합니다.
BACKWARD 호환성은 새 스키마(v2)로 기존 데이터(v1으로 작성된)를 읽을 수 있어야 합니다. 기존 데이터에는 새 필드가 없으므로, 새 필드에 default 값이 없으면 역직렬화 시 오류가 발생합니다.
예시: "name": "status", "type": "string", "default": "CREATED"
Optional 필드는 ["null", "string"] union 타입에 "default": null을 사용합니다.
Q4. Flink의 워터마크가 해결하는 문제는 무엇인가?
워터마크는 이벤트 시간(Event Time) 기반 처리에서 늦게 도착하는 데이터 문제를 해결합니다.
분산 시스템에서 이벤트는 발생 순서와 다르게 도착할 수 있습니다. 워터마크는 특정 시점까지의 이벤트가 모두 도착했을 것이라는 추정을 제공합니다.
W(t) = "시간 t 이전의 모든 이벤트가 도착했을 것으로 추정"
워터마크가 윈도우의 끝 시간을 넘으면, 해당 윈도우의 계산 결과를 출력합니다. allowedLateness를 설정하면 워터마크 이후에도 늦은 데이터를 일정 시간 동안 수용할 수 있습니다.
Q5. Kafka Streams와 Apache Flink 중 어떤 것을 선택해야 하는 기준은?
Kafka Streams 선택 기준:
- Kafka에서 읽고 Kafka에 쓰는 단순한 처리
- 별도 클러스터 운영이 부담스러운 경우
- 마이크로서비스에 임베드하는 경우
- 팀이 Kafka 생태계에 익숙한 경우
Apache Flink 선택 기준:
- 복잡한 이벤트 처리(CEP)가 필요한 경우
- Kafka 외 다양한 소스/싱크 연결이 필요한 경우
- 대규모 상태 관리가 필요한 경우
- SQL 기반 스트림 처리가 필요한 경우
- 초대규모 처리량이 필요한 경우
핵심 차이: Kafka Streams는 라이브러리(JVM 앱에 임베드), Flink는 분산 클러스터입니다.
참고 자료
- Apache Kafka Documentation. (2025). https://kafka.apache.org/documentation/
- Narkhede, N., Shapira, G., Palino, T. (2021). Kafka: The Definitive Guide, 2nd ed. O'Reilly Media.
- Apache Flink Documentation. (2025). https://flink.apache.org/docs/
- Confluent Schema Registry Documentation. (2025). https://docs.confluent.io/platform/current/schema-registry/
- Hueske, F., Kalavri, V. (2019). Stream Processing with Apache Flink. O'Reilly Media.
- KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum. https://cwiki.apache.org/confluence/display/KAFKA/KIP-500
- Debezium Documentation. (2025). https://debezium.io/documentation/
- Kleppmann, M. (2017). Designing Data-Intensive Applications. O'Reilly Media.
- Confluent Blog. (2024). "Exactly-Once Semantics in Apache Kafka."
- Apache Avro Specification. (2025). https://avro.apache.org/docs/
- Burrow - Kafka Consumer Lag Checking. https://github.com/linkedin/Burrow
- Confluent Blog. (2025). "Kafka Performance Tuning Best Practices."
- Flink Forward Conference Talks. (2024). https://www.flink-forward.org/
- KIP-848: The Next Generation Consumer Rebalance Protocol. Apache Kafka.
현재 단락 (1/926)
Event Streaming은 현대 분산 시스템의 핵심 인프라입니다. Apache Kafka를 중심으로 한 이벤트 스트리밍 생태계는 단순한 메시지 큐를 넘어, 실시간 데이터 파이프...