Skip to content

필사 모드: Event Streaming 심화 가이드 2025: Kafka 내부 구조, Flink, Exactly-Once, 스키마 진화

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

들어가며: Event Streaming의 진화

Event Streaming은 현대 분산 시스템의 핵심 인프라입니다. Apache Kafka를 중심으로 한 이벤트 스트리밍 생태계는 단순한 메시지 큐를 넘어, 실시간 데이터 파이프라인, 이벤트 소싱, CQRS, 스트림 처리의 기반이 되었습니다.

이 가이드에서는 Kafka의 내부 구조부터 Exactly-Once 시맨틱스, Schema Registry, Apache Flink까지 심층적으로 다룹니다.

1. Kafka 내부 구조 심화

파티션 구조

Kafka의 토픽은 하나 이상의 **파티션**으로 분할됩니다. 각 파티션은 순서가 보장되는 불변의 로그입니다.

Topic: orders (3 partitions)

Partition 0: [msg0] [msg3] [msg6] [msg9] ... → offset 증가

Partition 1: [msg1] [msg4] [msg7] [msg10] ... → offset 증가

Partition 2: [msg2] [msg5] [msg8] [msg11] ... → offset 증가

- 같은 파티션 내에서만 순서 보장

- 파티션 수 = Consumer Group 내 최대 병렬 소비 수

- 파티션 키로 같은 키의 메시지가 같은 파티션으로

세그먼트 파일과 인덱스

각 파티션은 디스크에 **세그먼트 파일**로 저장됩니다.

/kafka-logs/orders-0/

├── 00000000000000000000.log # 세그먼트 파일 (실제 메시지)

├── 00000000000000000000.index # 오프셋 인덱스

├── 00000000000000000000.timeindex # 타임스탬프 인덱스

├── 00000000000005242880.log # 다음 세그먼트

├── 00000000000005242880.index

├── 00000000000005242880.timeindex

└── leader-epoch-checkpoint

세그먼트 파일 구조

========================

.log 파일 (메시지 저장)

┌──────────────────────────────────────┐

│ Batch Header │

│ - Base Offset: 0 │

│ - Batch Length: 256 bytes │

│ - Magic: 2 (Kafka 0.11+) │

│ - CRC: checksum │

│ - Attributes: compression, txn │

│ - Producer ID: 12345 │

│ - Producer Epoch: 0 │

│ - Base Sequence: 0 │

├──────────────────────────────────────┤

│ Record 0 │

│ - Offset Delta: 0 │

│ - Timestamp Delta: 0 │

│ - Key: "order-123" │

│ - Value: (serialized payload) │

│ - Headers: [("source", "web")] │

├──────────────────────────────────────┤

│ Record 1 │

│ - Offset Delta: 1 │

│ - ... │

└──────────────────────────────────────┘

.index 파일 (희소 인덱스)

┌────────────────────────┐

│ Relative Offset → File │

│ Position │

├────────────────────────┤

│ 0 → 0 │

│ 4096 → 32768 │

│ 8192 → 65536 │

└────────────────────────┘

.timeindex 파일

┌────────────────────────────────┐

│ Timestamp → Relative Offset │

├────────────────────────────────┤

│ 1700000000000 → 0 │

│ 1700000060000 → 4096 │

└────────────────────────────────┘

Log Compaction

Log Compaction은 각 키의 최신 값만 유지하는 보존 전략입니다.

Compaction 전:

offset 0: key=A, value=v1

offset 1: key=B, value=v1

offset 2: key=A, value=v2 ← A의 최신 값

offset 3: key=C, value=v1

offset 4: key=B, value=v2 ← B의 최신 값

offset 5: key=A, value=v3 ← A의 최신 값

Compaction 후:

offset 3: key=C, value=v1 (C의 유일한 값)

offset 4: key=B, value=v2 (B의 최신 값)

offset 5: key=A, value=v3 (A의 최신 값)

Log Compaction 설정

log.cleanup.policy=compact

log.cleaner.min.cleanable.ratio=0.5

log.cleaner.min.compaction.lag.ms=0

min.cleanable.dirty.ratio=0.5

ISR (In-Sync Replicas)

ISR은 리더와 동기화된 레플리카의 집합입니다.

Partition 0 (Replication Factor = 3)

======================================

Broker 1 (Leader): [0] [1] [2] [3] [4] [5] ← LEO: 6

Broker 2 (Follower): [0] [1] [2] [3] [4] ← LEO: 5

Broker 3 (Follower): [0] [1] [2] [3] [4] [5] ← LEO: 6

ISR = {Broker 1, Broker 2, Broker 3}

HW (High Watermark) = 5 (ISR 내 최소 LEO)

- Consumer는 HW까지만 읽을 수 있음

- Broker 2가 지연이 심해지면 ISR에서 제거

- replica.lag.time.max.ms 초과 시 ISR에서 제거

ISR 관련 설정

min.insync.replicas=2 # 최소 동기화 레플리카 수

replica.lag.time.max.ms=30000 # ISR 탈락 기준 시간

unclean.leader.election.enable=false # ISR 외 레플리카의 리더 선출 방지

KRaft vs ZooKeeper

Kafka 3.x부터 ZooKeeper 없이 동작하는 KRaft 모드가 도입되었습니다.

ZooKeeper 모드 (레거시)

==========================

[ZooKeeper Ensemble]

↕ 메타데이터

[Controller (Broker 중 1개)]

↕ 리더 선출

[Broker 1] [Broker 2] [Broker 3]

문제점:

- ZooKeeper 별도 운영 필요

- 메타데이터 동기화 지연

- 파티션 수 증가 시 성능 저하

KRaft 모드 (Kafka 3.x+)

==========================

[Controller Quorum]

Broker 1 (Controller + Broker)

Broker 2 (Controller + Broker)

Broker 3 (Controller + Broker)

장점:

- ZooKeeper 의존성 제거

- 메타데이터 관리 성능 향상

- 수백만 파티션 지원

- 운영 복잡도 감소

KRaft 설정

process.roles=broker,controller

node.id=1

controller.quorum.voters=1@broker1:9093,2@broker2:9093,3@broker3:9093

controller.listener.names=CONTROLLER

2. Producer 심화

Partitioner

프로듀서는 메시지를 어느 파티션에 보낼지 결정하는 파티셔너를 사용합니다.

// 기본 파티셔너 동작

public class DefaultPartitioner implements Partitioner {

// 키가 있는 경우: murmur2 해시

// 키가 없는 경우: Sticky Partitioner (배치 단위로 파티션 변경)

public int partition(String topic, Object key, byte[] keyBytes,

Object value, byte[] valueBytes, Cluster cluster) {

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

int numPartitions = partitions.size();

if (keyBytes == null) {

// Sticky Partitioner: 배치가 꽉 찰 때까지 같은 파티션

return stickyPartitionCache.partition(topic, cluster);

}

// Key-based partitioning: 같은 키 = 같은 파티션

return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

}

}

// 커스텀 파티셔너 예시: 지역 기반

public class RegionPartitioner implements Partitioner {

private Map<String, Integer> regionMapping;

public int partition(String topic, Object key, byte[] keyBytes,

Object value, byte[] valueBytes, Cluster cluster) {

String region = extractRegion(value);

int numPartitions = cluster.partitionsForTopic(topic).size();

return regionMapping.getOrDefault(region, 0) % numPartitions;

}

}

배치와 압축

Producer 배치 설정

batch.size=16384 # 배치 크기 (bytes)

linger.ms=5 # 배치 대기 시간

buffer.memory=33554432 # 전체 버퍼 메모리

압축 설정

compression.type=lz4 # none, gzip, snappy, lz4, zstd

Producer 배치 처리 흐름

==========================

send() ──▶ [Serializer] ──▶ [Partitioner] ──▶ [RecordAccumulator]

┌───────────┴───────────┐

│ Partition 0 배치 │

│ [msg1][msg2][msg3] │

├───────────────────────┤

│ Partition 1 배치 │

│ [msg4][msg5] │

└───────────┬───────────┘

batch.size 도달

또는 linger.ms 경과

[Sender Thread]

압축 + 네트워크 전송

[Kafka Broker]

멱등 프로듀서 (Idempotent Producer)

멱등 프로듀서 설정

enable.idempotence=true # Kafka 3.0+ 기본값

acks=all # 멱등성 요구사항

retries=2147483647 # 무한 재시도

max.in.flight.requests.per.connection=5 # 최대 5 (멱등성 보장 범위)

멱등 프로듀서 동작 원리

==========================

Producer Broker

│ │

│── ProducerID: 1000 │

│ Epoch: 0 │

│ Sequence: 0 │

│ Message: "order-1" ──────▶│ ✓ 저장

│ │

│── ProducerID: 1000 │

│ Epoch: 0 │

│ Sequence: 1 │

│ Message: "order-2" ──────▶│ ✓ 저장

│ │

│── (네트워크 타임아웃으로 재전송) │

│ ProducerID: 1000 │

│ Epoch: 0 │

│ Sequence: 1 │

│ Message: "order-2" ──────▶│ ✗ 중복! 무시

│ │

트랜잭셔널 프로듀서

// 트랜잭셔널 프로듀서 예시

Properties props = new Properties();

props.put("bootstrap.servers", "broker1:9092,broker2:9092");

props.put("transactional.id", "order-processor-1");

props.put("enable.idempotence", "true");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

producer.initTransactions();

try {

producer.beginTransaction();

// 여러 토픽/파티션에 원자적으로 전송

producer.send(new ProducerRecord<>("orders", "key1", "order-created"));

producer.send(new ProducerRecord<>("inventory", "key1", "stock-reserved"));

producer.send(new ProducerRecord<>("notifications", "key1", "email-queued"));

// Consumer 오프셋도 트랜잭션에 포함 가능

producer.sendOffsetsToTransaction(offsets, consumerGroupId);

producer.commitTransaction();

} catch (Exception e) {

producer.abortTransaction();

throw e;

}

3. Consumer 심화

Consumer Group과 파티션 할당

Topic: orders (6 partitions)

Consumer Group: order-processor

===================================

Case 1: 3 Consumers

Consumer A: [P0] [P1]

Consumer B: [P2] [P3]

Consumer C: [P4] [P5]

Case 2: 6 Consumers (이상적)

Consumer A: [P0]

Consumer B: [P1]

Consumer C: [P2]

Consumer D: [P3]

Consumer E: [P4]

Consumer F: [P5]

Case 3: 8 Consumers (2개 유휴)

Consumer A: [P0]

Consumer B: [P1]

Consumer C: [P2]

Consumer D: [P3]

Consumer E: [P4]

Consumer F: [P5]

Consumer G: (유휴 - idle)

Consumer H: (유휴 - idle)

파티션 할당 전략

// 파티션 할당 전략 비교

Properties props = new Properties();

// 1. Range Assignor (기본)

props.put("partition.assignment.strategy",

"org.apache.kafka.clients.consumer.RangeAssignor");

// 토픽별로 파티션을 범위로 나눔

// Topic A: P0,P1,P2 → C1: [P0,P1], C2: [P2]

// Topic B: P0,P1,P2 → C1: [P0,P1], C2: [P2]

// 문제: C1에 편중

// 2. RoundRobin Assignor

props.put("partition.assignment.strategy",

"org.apache.kafka.clients.consumer.RoundRobinAssignor");

// 모든 토픽의 파티션을 라운드 로빈으로 분배

// C1: [A-P0, B-P1], C2: [A-P1, B-P0], C3: [A-P2, B-P2]

// 3. Sticky Assignor

props.put("partition.assignment.strategy",

"org.apache.kafka.clients.consumer.StickyAssignor");

// 기존 할당 최대한 유지 + 균등 분배

// 리밸런싱 시 최소한의 파티션만 이동

// 4. CooperativeSticky Assignor (권장)

props.put("partition.assignment.strategy",

"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

// Sticky + 점진적 리밸런싱

// 전체 중단 없이 파티션 재할당

리밸런싱: Eager vs Incremental

Eager Rebalancing (기존 방식)

================================

1. Consumer C3 추가됨

2. 모든 Consumer가 파티션 해제 (Stop-the-World)

C1: [] C2: [] C3: []

3. 새로운 할당 계산

4. 파티션 재할당

C1: [P0,P1] C2: [P2,P3] C3: [P4,P5]

→ 전체 처리 중단 발생!

Incremental Cooperative Rebalancing (권장)

================================

1. Consumer C3 추가됨

2. 1차 리밸런스: 이동 필요한 파티션만 해제

C1: [P0,P1,P2] → C1: [P0,P1] (P2 해제)

C2: [P3,P4,P5] → C2: [P3,P4] (P5 해제)

3. 2차 리밸런스: 해제된 파티션을 C3에 할당

C3: [] → C3: [P2,P5]

→ 나머지 파티션은 중단 없이 계속 처리!

오프셋 관리

// 자동 커밋 (기본)

props.put("enable.auto.commit", "true");

props.put("auto.commit.interval.ms", "5000");

// 위험: 처리 전 커밋되면 메시지 유실 가능

// 수동 커밋 (권장)

props.put("enable.auto.commit", "false");

while (true) {

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {

// 메시지 처리

processRecord(record);

}

// 동기 커밋 (처리 완료 후)

consumer.commitSync();

// 또는 비동기 커밋 (성능 향상)

consumer.commitAsync((offsets, exception) -> {

if (exception != null) {

log.error("Commit failed", exception);

}

});

}

오프셋 저장소: __consumer_offsets 토픽

============================================

Key: (group_id, topic, partition)

Value: (offset, metadata, timestamp)

예시:

Key: (order-processor, orders, 0)

Value: (offset=42, metadata="", timestamp=1700000000)

4. Exactly-Once Semantics

메시지 전달 보장 수준

At-Most-Once (최대 한 번)

================================

Producer ──send()──▶ Broker

(acks=0, 확인 안 함)

- 메시지 유실 가능

- 중복 없음

- 가장 빠름

At-Least-Once (최소 한 번)

================================

Producer ──send()──▶ Broker ──ack──▶ Producer

(acks=all, 재시도 있음)

- 메시지 유실 없음

- 중복 가능

- 대부분의 기본 설정

Exactly-Once (정확히 한 번)

================================

Producer ──txn──▶ Broker ──commit──▶ Consumer(read_committed)

(멱등 프로듀서 + 트랜잭션 + read_committed)

- 메시지 유실 없음

- 중복 없음

- 가장 강력하지만 오버헤드 있음

Exactly-Once 구현

// Exactly-Once: Consume-Transform-Produce 패턴

Properties producerProps = new Properties();

producerProps.put("transactional.id", "eos-processor-1");

producerProps.put("enable.idempotence", "true");

Properties consumerProps = new Properties();

consumerProps.put("group.id", "eos-group");

consumerProps.put("isolation.level", "read_committed"); // 핵심!

consumerProps.put("enable.auto.commit", "false");

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

producer.initTransactions();

consumer.subscribe(Collections.singleton("input-topic"));

while (true) {

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

if (!records.isEmpty()) {

producer.beginTransaction();

try {

for (ConsumerRecord<String, String> record : records) {

// Transform

String result = transform(record.value());

// Produce to output topic

producer.send(new ProducerRecord<>(

"output-topic", record.key(), result));

}

// Consumer 오프셋을 트랜잭션에 포함

Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

for (TopicPartition partition : records.partitions()) {

List<ConsumerRecord<String, String>> partRecords =

records.records(partition);

long lastOffset = partRecords.get(

partRecords.size() - 1).offset();

offsets.put(partition,

new OffsetAndMetadata(lastOffset + 1));

}

producer.sendOffsetsToTransaction(offsets,

consumer.groupMetadata());

producer.commitTransaction();

} catch (Exception e) {

producer.abortTransaction();

}

}

}

Exactly-Once 트랜잭션 흐름

================================

1. beginTransaction()

2. send(output-topic, msg1) → Broker에 임시 저장 (uncommitted)

3. send(output-topic, msg2) → Broker에 임시 저장

4. sendOffsetsToTransaction() → Consumer 오프셋도 트랜잭션에 포함

5. commitTransaction() → 모든 메시지 + 오프셋 원자적 커밋

read_committed Consumer는 커밋된 메시지만 읽음

→ 중간에 실패하면 abortTransaction()으로 모두 롤백

5. Schema Registry

스키마 직렬화 형식 비교

| 특성 | Avro | Protobuf | JSON Schema |

|---|---|---|---|

| 직렬화 크기 | 작음 | 작음 | 큼 |

| 스키마 진화 | 우수 | 우수 | 보통 |

| 코드 생성 | 선택적 | 필수 | 불필요 |

| 가독성 | 낮음 (바이너리) | 낮음 (바이너리) | 높음 (텍스트) |

| 동적 타이핑 | 지원 | 미지원 | 지원 |

| Kafka 호환성 | 최상 | 우수 | 보통 |

Avro 스키마와 진화

// Version 1: 초기 스키마

{

"type": "record",

"name": "Order",

"namespace": "com.company.events",

"fields": [

{"name": "order_id", "type": "string"},

{"name": "customer_id", "type": "string"},

{"name": "amount", "type": "double"},

{"name": "currency", "type": "string", "default": "USD"},

{"name": "created_at", "type": "long"}

]

}

// Version 2: Backward Compatible 진화

// 새 필드에 default 값 추가 → 기존 Consumer가 새 데이터를 읽을 수 있음

{

"type": "record",

"name": "Order",

"namespace": "com.company.events",

"fields": [

{"name": "order_id", "type": "string"},

{"name": "customer_id", "type": "string"},

{"name": "amount", "type": "double"},

{"name": "currency", "type": "string", "default": "USD"},

{"name": "created_at", "type": "long"},

{"name": "status", "type": "string", "default": "CREATED"},

{"name": "region", "type": ["null", "string"], "default": null}

]

}

호환성 모드

BACKWARD (기본값, 권장)

================================

- 새 스키마로 기존 데이터를 읽을 수 있음

- 새 필드: default 값 필수

- 필드 삭제: 가능

- Consumer 먼저 업그레이드 → Producer 업그레이드

FORWARD

================================

- 기존 스키마로 새 데이터를 읽을 수 있음

- 새 필드: default 값 필수

- 필드 삭제: default 값이 있는 필드만

- Producer 먼저 업그레이드 → Consumer 업그레이드

FULL

================================

- BACKWARD + FORWARD 모두 충족

- 가장 안전하지만 가장 제한적

NONE

================================

- 호환성 검사 없음

- 위험! 운영 환경에서 사용 금지

Schema Registry API 사용

SCHEMA_REGISTRY_URL = "http://schema-registry:8081"

스키마 등록

schema = {

"type": "record",

"name": "Order",

"fields": [

{"name": "order_id", "type": "string"},

{"name": "amount", "type": "double"}

]

}

response = requests.post(

f"{SCHEMA_REGISTRY_URL}/subjects/orders-value/versions",

json={"schema": json.dumps(schema), "schemaType": "AVRO"},

)

호환성 검사

response = requests.post(

f"{SCHEMA_REGISTRY_URL}/compatibility/subjects/orders-value/versions/latest",

json={"schema": json.dumps(new_schema), "schemaType": "AVRO"},

)

compatibility = response.json()

print(f"Compatible: {compatibility['is_compatible']}")

호환성 모드 설정

requests.put(

f"{SCHEMA_REGISTRY_URL}/config/orders-value",

json={"compatibility": "BACKWARD"},

)

Protobuf 스키마 진화

// Version 1

syntax = "proto3";

package com.company.events;

message Order {

string order_id = 1;

string customer_id = 2;

double amount = 3;

string currency = 4;

int64 created_at = 5;

}

// Version 2 (호환성 유지)

message Order {

string order_id = 1;

string customer_id = 2;

double amount = 3;

string currency = 4;

int64 created_at = 5;

// 새 필드: 기존 필드 번호 사용 금지!

string status = 6; // 새로 추가

string region = 7; // 새로 추가

// reserved 2; // 삭제된 필드 번호 예약

}

6. Apache Flink 심화

Flink 아키텍처 개요

┌─────────────────────────────────────────────┐

│ Flink Cluster │

├─────────────────────────────────────────────┤

│ │

│ [JobManager] │

│ - 잡 스케줄링 │

│ - 체크포인트 조정 │

│ - 장애 복구 │

│ │

│ [TaskManager 1] [TaskManager 2] │

│ - Task Slot 1 - Task Slot 1 │

│ - Task Slot 2 - Task Slot 2 │

│ - Task Slot 3 - Task Slot 3 │

│ │

├─────────────────────────────────────────────┤

│ [State Backend] │

│ - RocksDB (대규모 상태) │

│ - HashMap (소규모 상태) │

│ │

│ [Checkpoint Storage] │

│ - S3 / HDFS / GCS │

└─────────────────────────────────────────────┘

DataStream API

// Flink DataStream API 예시

StreamExecutionEnvironment env =

StreamExecutionEnvironment.getExecutionEnvironment();

// Kafka Source

KafkaSource<Order> source = KafkaSource.<Order>builder()

.setBootstrapServers("broker1:9092")

.setTopics("orders")

.setGroupId("flink-order-processor")

.setStartingOffsets(OffsetsInitializer.committedOffsets(

OffsetResetStrategy.EARLIEST))

.setValueOnlyDeserializer(new OrderDeserializer())

.build();

DataStream<Order> orders = env.fromSource(

source, WatermarkStrategy

.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))

.withTimestampAssigner((order, ts) -> order.getCreatedAt()),

"Kafka Source"

);

// 처리 파이프라인

DataStream<OrderStats> stats = orders

.filter(order -> order.getStatus().equals("COMPLETED"))

.keyBy(Order::getRegion)

.window(TumblingEventTimeWindows.of(Time.minutes(5)))

.aggregate(new OrderStatsAggregator());

// Kafka Sink

KafkaSink<OrderStats> sink = KafkaSink.<OrderStats>builder()

.setBootstrapServers("broker1:9092")

.setRecordSerializer(KafkaRecordSerializationSchema.builder()

.setTopic("order-stats")

.setValueSerializationSchema(new OrderStatsSerializer())

.build())

.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)

.setTransactionalIdPrefix("flink-order-stats")

.build();

stats.sinkTo(sink);

env.execute("Order Statistics Pipeline");

윈도우 (Windowing)

Tumbling Window (텀블링 윈도우)

================================

시간: ──|──1──2──3──|──4──5──6──|──7──8──9──|──

윈도우: [ Window 1 ][ Window 2 ][ Window 3 ]

- 고정 크기, 겹치지 않음

- 예: 5분마다 집계

Sliding Window (슬라이딩 윈도우)

================================

시간: ──|──1──2──3──4──5──6──7──8──|──

윈도우: [ Window 1 ]

[ Window 2 ]

[ Window 3 ]

- 고정 크기, 슬라이드 간격만큼 이동

- 예: 10분 윈도우, 5분 슬라이드

Session Window (세션 윈도우)

================================

시간: ──1─2──3─────────5─6──7─────────9──

윈도우: [Session 1] [Session 2] [S3]

gap gap

- 활동 기반, 가변 크기

- gap 시간 동안 이벤트 없으면 윈도우 종료

// 윈도우 예시

// Tumbling Window

orders.keyBy(Order::getRegion)

.window(TumblingEventTimeWindows.of(Time.minutes(5)))

.sum("amount");

// Sliding Window

orders.keyBy(Order::getRegion)

.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))

.sum("amount");

// Session Window

orders.keyBy(Order::getCustomerId)

.window(EventTimeSessionWindows.withGap(Time.minutes(30)))

.aggregate(new SessionAggregator());

워터마크와 늦은 데이터

Event Time vs Processing Time

================================

이벤트 발생 시간(Event Time): t=10 t=12 t=11 t=15 t=13

처리 시간(Processing Time): T=20 T=21 T=22 T=23 T=24

이벤트가 순서대로 도착하지 않음!

→ 워터마크로 "여기까지의 이벤트는 모두 도착했을 것" 표시

워터마크 동작

================================

이벤트: [t=10] [t=12] [t=11] [t=15] [t=13]

워터마크: W(10) W(12) W(12) W(15) W(15)

(maxOutOfOrderness = 5초인 경우)

실제 워터마크: W(5) W(7) W(7) W(10) W(10)

윈도우 [0, 10): 워터마크가 10을 넘으면 발화(fire)

// 워터마크 전략

WatermarkStrategy<Order> strategy = WatermarkStrategy

// 최대 10초 지연 허용

.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))

.withTimestampAssigner((order, timestamp) -> order.getCreatedAt())

// 5분 이상 이벤트 없으면 워터마크 진행

.withIdleness(Duration.ofMinutes(5));

// 늦은 데이터 처리

OutputTag<Order> lateOutputTag = new OutputTag<>("late-orders") {};

SingleOutputStreamOperator<OrderStats> stats = orders

.keyBy(Order::getRegion)

.window(TumblingEventTimeWindows.of(Time.minutes(5)))

.allowedLateness(Time.minutes(1)) // 1분 추가 대기

.sideOutputLateData(lateOutputTag) // 그 이후는 사이드 아웃풋

.aggregate(new OrderStatsAggregator());

// 늦은 데이터 별도 처리

DataStream<Order> lateOrders = stats.getSideOutput(lateOutputTag);

lateOrders.addSink(new LateOrderAlertSink());

상태 관리와 체크포인트

// Keyed State 예시

public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {

// ValueState: 키별 단일 값

private ValueState<Boolean> flagState;

// ListState: 키별 리스트

private ListState<Transaction> recentTransactions;

// MapState: 키별 맵

private MapState<String, Integer> merchantCounts;

@Override

public void open(Configuration parameters) {

ValueStateDescriptor<Boolean> flagDescriptor =

new ValueStateDescriptor<>("flag", Boolean.class);

flagState = getRuntimeContext().getState(flagDescriptor);

ListStateDescriptor<Transaction> listDescriptor =

new ListStateDescriptor<>("recent-txns", Transaction.class);

recentTransactions = getRuntimeContext().getListState(listDescriptor);

MapStateDescriptor<String, Integer> mapDescriptor =

new MapStateDescriptor<>("merchant-counts", String.class, Integer.class);

merchantCounts = getRuntimeContext().getMapState(mapDescriptor);

}

@Override

public void processElement(Transaction txn, Context ctx, Collector<Alert> out) {

Boolean flag = flagState.value();

if (flag != null && txn.getAmount() > 10000) {

out.collect(new Alert(txn.getCustomerId(), "HIGH_AMOUNT_AFTER_FLAG"));

}

if (txn.getAmount() < 1.0) {

flagState.update(true);

// 1분 후 플래그 해제 타이머

ctx.timerService().registerEventTimeTimer(

txn.getTimestamp() + 60000);

}

}

@Override

public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {

flagState.clear();

}

}

체크포인트 동작

================================

[Source] ──▶ [Map] ──▶ [KeyBy] ──▶ [Window] ──▶ [Sink]

1. JobManager: 체크포인트 배리어 주입

Source ──▶ |barrier| ──▶ Map ──▶ KeyBy ──▶ Window ──▶ Sink

2. 배리어가 오퍼레이터를 통과할 때 상태 스냅샷

Source(snapshot) ──▶ Map(snapshot) ──▶ ...

3. 모든 오퍼레이터 스냅샷 완료 → 체크포인트 성공

4. 장애 발생 시: 마지막 성공 체크포인트에서 복원

// 체크포인트 설정

env.enableCheckpointing(60000); // 60초 간격

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);

env.getCheckpointConfig().setCheckpointTimeout(120000);

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// State Backend 설정

env.setStateBackend(new EmbeddedRocksDBStateBackend());

env.getCheckpointConfig().setCheckpointStorage("s3://checkpoints/flink/");

7. Kafka Streams vs Apache Flink

비교표

| 특성 | Kafka Streams | Apache Flink |

|---|---|---|

| 배포 방식 | 라이브러리 (JVM 앱에 임베드) | 분산 클러스터 |

| 입력 소스 | Kafka만 | Kafka, Kinesis, 파일 등 |

| Exactly-Once | Kafka 내부만 | 외부 시스템 포함 |

| 윈도우 | Tumbling, Sliding, Session | + Global, Custom |

| 상태 관리 | RocksDB (로컬) | RocksDB + 분산 스냅샷 |

| SQL 지원 | KSQL (별도) | Flink SQL (내장) |

| 처리량 | 중-대 | 대-초대 |

| 운영 복잡도 | 낮음 (앱 배포) | 높음 (클러스터 관리) |

| 적합한 경우 | Kafka 생태계 내 처리 | 복잡한 스트림 처리 |

선택 기준

use_kafka_streams_when:

- "Kafka에서 읽고 Kafka에 쓰는 단순한 처리"

- "별도 클러스터 운영이 부담스러운 경우"

- "마이크로서비스 내부에 임베드"

- "중간 정도의 처리량으로 충분한 경우"

- "팀이 Kafka 생태계에 익숙한 경우"

use_flink_when:

- "복잡한 이벤트 처리 (CEP)"

- "다양한 소스/싱크 연결"

- "대규모 상태 관리 필요"

- "고급 윈도우 함수 필요"

- "SQL 기반 스트림 처리"

- "초대규모 처리량 필요"

8. Kafka Connect

Source/Sink Connector

// Debezium MySQL Source Connector

{

"name": "mysql-source-orders",

"config": {

"connector.class": "io.debezium.connector.mysql.MySqlConnector",

"database.hostname": "mysql-primary",

"database.port": "3306",

"database.user": "debezium",

"database.password": "dbz-password",

"database.server.id": "1",

"topic.prefix": "cdc",

"database.include.list": "order_service",

"table.include.list": "order_service.orders,order_service.order_items",

"schema.history.internal.kafka.bootstrap.servers": "broker:9092",

"schema.history.internal.kafka.topic": "schema-changes.orders",

"transforms": "route",

"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",

"transforms.route.regex": "cdc\\.order_service\\.(.*)",

"transforms.route.replacement": "order.$1.cdc"

}

}

// Elasticsearch Sink Connector

{

"name": "es-sink-orders",

"config": {

"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",

"connection.url": "http://elasticsearch:9200",

"topics": "order.orders.cdc",

"type.name": "_doc",

"key.ignore": "false",

"schema.ignore": "true",

"behavior.on.null.values": "delete",

"write.method": "upsert",

"transforms": "extractKey",

"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",

"transforms.extractKey.field": "order_id"

}

}

Debezium CDC

Debezium CDC 흐름

================================

[MySQL] ──binlog──▶ [Debezium Connector] ──▶ [Kafka Topic]

┌───────────────┤

▼ ▼

[Flink Job] [ES Sink Connector]

│ │

▼ ▼

[Kafka Topic] [Elasticsearch]

CDC 이벤트 구조:

- before: 변경 전 레코드 (UPDATE/DELETE)

- after: 변경 후 레코드 (INSERT/UPDATE)

- source: 소스 DB 메타데이터

- op: 작업 유형 (c=create, u=update, d=delete, r=read)

9. 성능 튜닝

Producer 튜닝

Producer 성능 최적화

batch.size=65536 # 64KB (기본 16KB)

linger.ms=10 # 10ms 대기 (배치 효율)

buffer.memory=67108864 # 64MB

compression.type=lz4 # 압축 (네트워크/디스크 절약)

acks=all # 안정성

max.in.flight.requests.per.connection=5 # 파이프라이닝

send.buffer.bytes=131072 # 소켓 버퍼

Consumer 튜닝

Consumer 성능 최적화

fetch.min.bytes=1048576 # 1MB (배치 효율)

fetch.max.wait.ms=500 # 최대 500ms 대기

max.poll.records=500 # 한 번에 가져올 레코드 수

max.partition.fetch.bytes=1048576 # 파티션당 1MB

session.timeout.ms=30000 # 세션 타임아웃

heartbeat.interval.ms=10000 # 하트비트 간격

max.poll.interval.ms=300000 # 폴 간격 최대 5분

Broker 튜닝

Broker 성능 최적화

num.partitions=12 # 기본 파티션 수

default.replication.factor=3 # 복제 팩터

num.io.threads=8 # I/O 스레드

num.network.threads=3 # 네트워크 스레드

num.replica.fetchers=4 # 레플리카 페처

socket.send.buffer.bytes=102400 # 소켓 전송 버퍼

socket.receive.buffer.bytes=102400 # 소켓 수신 버퍼

log.segment.bytes=1073741824 # 1GB 세그먼트

log.retention.hours=168 # 7일 보존

log.retention.bytes=-1 # 크기 제한 없음

파티션 수 결정

파티션 수 결정 공식

================================

필요 처리량: 100MB/s

단일 파티션 처리량: ~10MB/s (Producer)

~5MB/s (Consumer)

Producer 관점: 100MB/s / 10MB/s = 10 파티션

Consumer 관점: 100MB/s / 5MB/s = 20 파티션

→ 최소 20 파티션 필요

추가 고려사항:

- Consumer 인스턴스 수에 맞춤 (파티션 수 >= Consumer 수)

- 파티션 수 증가는 가능하지만 감소는 불가

- 너무 많은 파티션: 메타데이터 오버헤드, 리밸런싱 지연

- 권장: 토픽당 6-50개 사이

10. 모니터링

JMX 메트릭

핵심 Kafka 메트릭

broker_metrics:

- name: "UnderReplicatedPartitions"

description: "ISR에서 벗어난 파티션 수"

alert_threshold: "> 0"

severity: critical

- name: "ActiveControllerCount"

description: "활성 컨트롤러 수"

alert_threshold: "!= 1"

severity: critical

- name: "OfflinePartitionsCount"

description: "오프라인 파티션 수"

alert_threshold: "> 0"

severity: critical

- name: "RequestsPerSec"

description: "초당 요청 수"

alert_threshold: "> 10000"

severity: warning

producer_metrics:

- name: "record-send-rate"

description: "초당 전송 레코드 수"

- name: "record-error-rate"

description: "초당 에러 레코드 수"

alert_threshold: "> 0"

- name: "request-latency-avg"

description: "평균 요청 지연시간"

alert_threshold: "> 100ms"

consumer_metrics:

- name: "records-lag-max"

description: "최대 Consumer Lag"

alert_threshold: "> 10000"

severity: warning

- name: "records-consumed-rate"

description: "초당 소비 레코드 수"

- name: "commit-latency-avg"

description: "평균 커밋 지연시간"

Consumer Lag 모니터링

Consumer Lag 계산

================================

Partition 0:

Log End Offset (LEO): 1000 (최신 메시지)

Consumer Offset: 850 (마지막 커밋)

Lag: 1000 - 850 = 150

Partition 1:

LEO: 2000

Consumer Offset: 1950

Lag: 50

Total Lag = 150 + 50 = 200 messages

Burrow (LinkedIn의 Consumer Lag 모니터링)

burrow_config:

general:

pidfile: "/var/run/burrow.pid"

stdout-logfile: "/var/log/burrow.log"

cluster:

kafka-prod:

class-name: kafka

servers: ["broker1:9092", "broker2:9092", "broker3:9092"]

topic-refresh: 120

offset-refresh: 30

consumer:

kafka-prod:

class-name: kafka

cluster: kafka-prod

servers: ["broker1:9092", "broker2:9092"]

group-denylist: "^(console-consumer|_)"

start-latest: true

notifier:

slack:

class-name: http

url-open: "https://hooks.slack.com/services/xxx/yyy/zzz"

template-open: |

Consumer group lag alert:

Group: {{.GroupName}}

Status: {{.Status}}

send-close: true

Grafana 대시보드

Kafka 모니터링 대시보드 패널

================================

1. Cluster Overview

- Broker 수 / 상태

- 총 파티션 수

- Under-replicated 파티션

- Active Controller

2. Throughput

- Messages In/Out per sec

- Bytes In/Out per sec

- Requests per sec

3. Consumer Groups

- Consumer Lag (그룹별)

- Consume Rate

- Rebalance 횟수

4. Latency

- Produce Request Latency (p50, p99)

- Fetch Request Latency

- End-to-End Latency

5. Resources

- Disk Usage per Broker

- Network I/O

- CPU / Memory

11. 퀴즈

기본 파티셔너가 **murmur2 해시** 함수를 사용하여 키의 해시값을 파티션 수로 나눈 나머지로 파티션을 결정하기 때문입니다.

`partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions`

같은 키는 항상 같은 해시값을 생성하므로 같은 파티션에 할당됩니다. 이를 통해 같은 키의 메시지 순서가 보장됩니다.

단, 파티션 수가 변경되면 키-파티션 매핑이 달라질 수 있습니다.

1. **멱등 프로듀서 (Idempotent Producer)**: `enable.idempotence=true`로 ProducerID와 Sequence Number를 사용하여 중복 전송을 방지합니다.

2. **트랜잭셔널 프로듀서 (Transactional Producer)**: `transactional.id`를 설정하고 `beginTransaction()`, `commitTransaction()`으로 여러 메시지를 원자적으로 전송합니다.

3. **read_committed 격리 수준**: Consumer에서 `isolation.level=read_committed`로 설정하여 커밋된 트랜잭션의 메시지만 읽습니다.

이 세 가지가 결합되어 Consume-Transform-Produce 패턴에서 정확히 한 번만 처리되는 것을 보장합니다.

**default 값**이 반드시 필요합니다.

BACKWARD 호환성은 새 스키마(v2)로 기존 데이터(v1으로 작성된)를 읽을 수 있어야 합니다. 기존 데이터에는 새 필드가 없으므로, 새 필드에 default 값이 없으면 역직렬화 시 오류가 발생합니다.

예시: `"name": "status", "type": "string", "default": "CREATED"`

Optional 필드는 `["null", "string"]` union 타입에 `"default": null`을 사용합니다.

워터마크는 **이벤트 시간(Event Time) 기반 처리에서 늦게 도착하는 데이터 문제**를 해결합니다.

분산 시스템에서 이벤트는 발생 순서와 다르게 도착할 수 있습니다. 워터마크는 특정 시점까지의 이벤트가 모두 도착했을 것이라는 추정을 제공합니다.

`W(t)` = "시간 t 이전의 모든 이벤트가 도착했을 것으로 추정"

워터마크가 윈도우의 끝 시간을 넘으면, 해당 윈도우의 계산 결과를 출력합니다. `allowedLateness`를 설정하면 워터마크 이후에도 늦은 데이터를 일정 시간 동안 수용할 수 있습니다.

**Kafka Streams 선택 기준**:

- Kafka에서 읽고 Kafka에 쓰는 단순한 처리

- 별도 클러스터 운영이 부담스러운 경우

- 마이크로서비스에 임베드하는 경우

- 팀이 Kafka 생태계에 익숙한 경우

**Apache Flink 선택 기준**:

- 복잡한 이벤트 처리(CEP)가 필요한 경우

- Kafka 외 다양한 소스/싱크 연결이 필요한 경우

- 대규모 상태 관리가 필요한 경우

- SQL 기반 스트림 처리가 필요한 경우

- 초대규모 처리량이 필요한 경우

핵심 차이: Kafka Streams는 라이브러리(JVM 앱에 임베드), Flink는 분산 클러스터입니다.

참고 자료

1. Apache Kafka Documentation. (2025). https://kafka.apache.org/documentation/

2. Narkhede, N., Shapira, G., Palino, T. (2021). *Kafka: The Definitive Guide*, 2nd ed. O'Reilly Media.

3. Apache Flink Documentation. (2025). https://flink.apache.org/docs/

4. Confluent Schema Registry Documentation. (2025). https://docs.confluent.io/platform/current/schema-registry/

5. Hueske, F., Kalavri, V. (2019). *Stream Processing with Apache Flink*. O'Reilly Media.

6. KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum. https://cwiki.apache.org/confluence/display/KAFKA/KIP-500

7. Debezium Documentation. (2025). https://debezium.io/documentation/

8. Kleppmann, M. (2017). *Designing Data-Intensive Applications*. O'Reilly Media.

9. Confluent Blog. (2024). "Exactly-Once Semantics in Apache Kafka."

10. Apache Avro Specification. (2025). https://avro.apache.org/docs/

11. Burrow - Kafka Consumer Lag Checking. https://github.com/linkedin/Burrow

12. Confluent Blog. (2025). "Kafka Performance Tuning Best Practices."

13. Flink Forward Conference Talks. (2024). https://www.flink-forward.org/

14. KIP-848: The Next Generation Consumer Rebalance Protocol. Apache Kafka.

현재 단락 (1/911)

Event Streaming은 현대 분산 시스템의 핵심 인프라입니다. Apache Kafka를 중심으로 한 이벤트 스트리밍 생태계는 단순한 메시지 큐를 넘어, 실시간 데이터 파이프...

작성 글자: 0원문 글자: 26,267작성 단락: 0/911