Skip to content
Published on

Event Streaming 심화 가이드 2025: Kafka 내부 구조, Flink, Exactly-Once, 스키마 진화

Authors

들어가며: Event Streaming의 진화

Event Streaming은 현대 분산 시스템의 핵심 인프라입니다. Apache Kafka를 중심으로 한 이벤트 스트리밍 생태계는 단순한 메시지 큐를 넘어, 실시간 데이터 파이프라인, 이벤트 소싱, CQRS, 스트림 처리의 기반이 되었습니다.

이 가이드에서는 Kafka의 내부 구조부터 Exactly-Once 시맨틱스, Schema Registry, Apache Flink까지 심층적으로 다룹니다.


1. Kafka 내부 구조 심화

파티션 구조

Kafka의 토픽은 하나 이상의 파티션으로 분할됩니다. 각 파티션은 순서가 보장되는 불변의 로그입니다.

Topic: orders (3 partitions)

Partition 0: [msg0] [msg3] [msg6] [msg9]  ... → offset 증가
Partition 1: [msg1] [msg4] [msg7] [msg10] ... → offset 증가
Partition 2: [msg2] [msg5] [msg8] [msg11] ... → offset 증가

- 같은 파티션 내에서만 순서 보장
- 파티션 수 = Consumer Group 내 최대 병렬 소비 수
- 파티션 키로 같은 키의 메시지가 같은 파티션으로

세그먼트 파일과 인덱스

각 파티션은 디스크에 세그먼트 파일로 저장됩니다.

/kafka-logs/orders-0/
├── 00000000000000000000.log      # 세그먼트 파일 (실제 메시지)
├── 00000000000000000000.index    # 오프셋 인덱스
├── 00000000000000000000.timeindex # 타임스탬프 인덱스
├── 00000000000005242880.log      # 다음 세그먼트
├── 00000000000005242880.index
├── 00000000000005242880.timeindex
└── leader-epoch-checkpoint
세그먼트 파일 구조
========================

.log 파일 (메시지 저장)
┌──────────────────────────────────────┐
Batch Header- Base Offset: 0- Batch Length: 256 bytes           │
- Magic: 2 (Kafka 0.11+)- CRC: checksum                     │
- Attributes: compression, txn      │
- Producer ID: 12345- Producer Epoch: 0- Base Sequence: 0├──────────────────────────────────────┤
Record 0- Offset Delta: 0- Timestamp Delta: 0- Key: "order-123"- Value: (serialized payload)- Headers: [("source", "web")]├──────────────────────────────────────┤
Record 1- Offset Delta: 1- ...└──────────────────────────────────────┘

.index 파일 (희소 인덱스)
┌────────────────────────┐
Relative OffsetFilePosition├────────────────────────┤
00409632768819265536└────────────────────────┘

.timeindex 파일
┌────────────────────────────────┐
TimestampRelative Offset├────────────────────────────────┤
1700000000000017000000600004096└────────────────────────────────┘

Log Compaction

Log Compaction은 각 키의 최신 값만 유지하는 보존 전략입니다.

Compaction:
offset 0: key=A, value=v1
offset 1: key=B, value=v1
offset 2: key=A, value=v2  ← A의 최신 값
offset 3: key=C, value=v1
offset 4: key=B, value=v2  ← B의 최신 값
offset 5: key=A, value=v3  ← A의 최신 값

Compaction:
offset 3: key=C, value=v1  (C의 유일한 값)
offset 4: key=B, value=v2  (B의 최신 값)
offset 5: key=A, value=v3  (A의 최신 값)
# Log Compaction 설정
log.cleanup.policy=compact
log.cleaner.min.cleanable.ratio=0.5
log.cleaner.min.compaction.lag.ms=0
min.cleanable.dirty.ratio=0.5

ISR (In-Sync Replicas)

ISR은 리더와 동기화된 레플리카의 집합입니다.

Partition 0 (Replication Factor = 3)
======================================

Broker 1 (Leader):      [0] [1] [2] [3] [4] [5]LEO: 6
Broker 2 (Follower):    [0] [1] [2] [3] [4]LEO: 5
Broker 3 (Follower):    [0] [1] [2] [3] [4] [5]LEO: 6

ISR = {Broker 1, Broker 2, Broker 3}
HW (High Watermark) = 5 (ISR 내 최소 LEO)

- Consumer는 HW까지만 읽을 수 있음
- Broker 2가 지연이 심해지면 ISR에서 제거
- replica.lag.time.max.ms 초과 시 ISR에서 제거
# ISR 관련 설정
min.insync.replicas=2           # 최소 동기화 레플리카 수
replica.lag.time.max.ms=30000   # ISR 탈락 기준 시간
unclean.leader.election.enable=false  # ISR 외 레플리카의 리더 선출 방지

KRaft vs ZooKeeper

Kafka 3.x부터 ZooKeeper 없이 동작하는 KRaft 모드가 도입되었습니다.

ZooKeeper 모드 (레거시)
==========================
[ZooKeeper Ensemble]
  ↕ 메타데이터
[Controller (Broker1)]
  ↕ 리더 선출
[Broker 1] [Broker 2] [Broker 3]

문제점:
- ZooKeeper 별도 운영 필요
- 메타데이터 동기화 지연
- 파티션 수 증가 시 성능 저하


KRaft 모드 (Kafka 3.x+)
==========================
[Controller Quorum]
  Broker 1 (Controller + Broker)
  Broker 2 (Controller + Broker)
  Broker 3 (Controller + Broker)

장점:
- ZooKeeper 의존성 제거
- 메타데이터 관리 성능 향상
- 수백만 파티션 지원
- 운영 복잡도 감소
# KRaft 설정
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@broker1:9093,2@broker2:9093,3@broker3:9093
controller.listener.names=CONTROLLER

2. Producer 심화

Partitioner

프로듀서는 메시지를 어느 파티션에 보낼지 결정하는 파티셔너를 사용합니다.

// 기본 파티셔너 동작
public class DefaultPartitioner implements Partitioner {
    // 키가 있는 경우: murmur2 해시
    // 키가 없는 경우: Sticky Partitioner (배치 단위로 파티션 변경)

    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null) {
            // Sticky Partitioner: 배치가 꽉 찰 때까지 같은 파티션
            return stickyPartitionCache.partition(topic, cluster);
        }

        // Key-based partitioning: 같은 키 = 같은 파티션
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
// 커스텀 파티셔너 예시: 지역 기반
public class RegionPartitioner implements Partitioner {
    private Map<String, Integer> regionMapping;

    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        String region = extractRegion(value);
        int numPartitions = cluster.partitionsForTopic(topic).size();

        return regionMapping.getOrDefault(region, 0) % numPartitions;
    }
}

배치와 압축

# Producer 배치 설정
batch.size=16384                  # 배치 크기 (bytes)
linger.ms=5                       # 배치 대기 시간
buffer.memory=33554432            # 전체 버퍼 메모리

# 압축 설정
compression.type=lz4              # none, gzip, snappy, lz4, zstd
Producer 배치 처리 흐름
==========================

send() ──▶ [Serializer] ──▶ [Partitioner] ──▶ [RecordAccumulator]
                                         ┌───────────┴───────────┐
Partition 0 배치     │
[msg1][msg2][msg3]                                         ├───────────────────────┤
Partition 1 배치     │
[msg4][msg5]                                         └───────────┬───────────┘
                                              batch.size 도달
                                              또는 linger.ms 경과
                                              [Sender Thread]
                                              압축 + 네트워크 전송
                                              [Kafka Broker]

멱등 프로듀서 (Idempotent Producer)

# 멱등 프로듀서 설정
enable.idempotence=true    # Kafka 3.0+ 기본값
acks=all                   # 멱등성 요구사항
retries=2147483647         # 무한 재시도
max.in.flight.requests.per.connection=5  # 최대 5 (멱등성 보장 범위)
멱등 프로듀서 동작 원리
==========================

Producer                          Broker
   │                                │
   │── ProducerID: 1000Epoch: 0Sequence: 0Message: "order-1"   ──────▶│  ✓ 저장
   │                                │
   │── ProducerID: 1000Epoch: 0Sequence: 1Message: "order-2"   ──────▶│  ✓ 저장
   │                                │
   │── (네트워크 타임아웃으로 재전송)ProducerID: 1000Epoch: 0Sequence: 1Message: "order-2"   ──────▶│  ✗ 중복! 무시
   │                                │

트랜잭셔널 프로듀서

// 트랜잭셔널 프로듀서 예시
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("transactional.id", "order-processor-1");
props.put("enable.idempotence", "true");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

try {
    producer.beginTransaction();

    // 여러 토픽/파티션에 원자적으로 전송
    producer.send(new ProducerRecord<>("orders", "key1", "order-created"));
    producer.send(new ProducerRecord<>("inventory", "key1", "stock-reserved"));
    producer.send(new ProducerRecord<>("notifications", "key1", "email-queued"));

    // Consumer 오프셋도 트랜잭션에 포함 가능
    producer.sendOffsetsToTransaction(offsets, consumerGroupId);

    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
    throw e;
}

3. Consumer 심화

Consumer Group과 파티션 할당

Topic: orders (6 partitions)

Consumer Group: order-processor
===================================

Case 1: 3 Consumers
  Consumer A: [P0] [P1]
  Consumer B: [P2] [P3]
  Consumer C: [P4] [P5]

Case 2: 6 Consumers (이상적)
  Consumer A: [P0]
  Consumer B: [P1]
  Consumer C: [P2]
  Consumer D: [P3]
  Consumer E: [P4]
  Consumer F: [P5]

Case 3: 8 Consumers (2개 유휴)
  Consumer A: [P0]
  Consumer B: [P1]
  Consumer C: [P2]
  Consumer D: [P3]
  Consumer E: [P4]
  Consumer F: [P5]
  Consumer G: (유휴 - idle)
  Consumer H: (유휴 - idle)

파티션 할당 전략

// 파티션 할당 전략 비교
Properties props = new Properties();

// 1. Range Assignor (기본)
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.RangeAssignor");
// 토픽별로 파티션을 범위로 나눔
// Topic A: P0,P1,P2 → C1: [P0,P1], C2: [P2]
// Topic B: P0,P1,P2 → C1: [P0,P1], C2: [P2]
// 문제: C1에 편중

// 2. RoundRobin Assignor
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.RoundRobinAssignor");
// 모든 토픽의 파티션을 라운드 로빈으로 분배
// C1: [A-P0, B-P1], C2: [A-P1, B-P0], C3: [A-P2, B-P2]

// 3. Sticky Assignor
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.StickyAssignor");
// 기존 할당 최대한 유지 + 균등 분배
// 리밸런싱 시 최소한의 파티션만 이동

// 4. CooperativeSticky Assignor (권장)
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
// Sticky + 점진적 리밸런싱
// 전체 중단 없이 파티션 재할당

리밸런싱: Eager vs Incremental

Eager Rebalancing (기존 방식)
================================
1. Consumer C3 추가됨
2. 모든 Consumer가 파티션 해제 (Stop-the-World)
   C1: [] C2: [] C3: []
3. 새로운 할당 계산
4. 파티션 재할당
   C1: [P0,P1] C2: [P2,P3] C3: [P4,P5]
→ 전체 처리 중단 발생!


Incremental Cooperative Rebalancing (권장)
================================
1. Consumer C3 추가됨
2. 1차 리밸런스: 이동 필요한 파티션만 해제
   C1: [P0,P1,P2]C1: [P0,P1]  (P2 해제)
   C2: [P3,P4,P5]C2: [P3,P4]  (P5 해제)
3. 2차 리밸런스: 해제된 파티션을 C3에 할당
   C3: []C3: [P2,P5]
→ 나머지 파티션은 중단 없이 계속 처리!

오프셋 관리

// 자동 커밋 (기본)
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
// 위험: 처리 전 커밋되면 메시지 유실 가능

// 수동 커밋 (권장)
props.put("enable.auto.commit", "false");

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        // 메시지 처리
        processRecord(record);
    }

    // 동기 커밋 (처리 완료 후)
    consumer.commitSync();

    // 또는 비동기 커밋 (성능 향상)
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            log.error("Commit failed", exception);
        }
    });
}
오프셋 저장소: __consumer_offsets 토픽
============================================

Key: (group_id, topic, partition)
Value: (offset, metadata, timestamp)

예시:
Key: (order-processor, orders, 0)
Value: (offset=42, metadata="", timestamp=1700000000)

4. Exactly-Once Semantics

메시지 전달 보장 수준

At-Most-Once (최대 한 번)
================================
Producer ──send()──▶ Broker
  (acks=0, 확인 안 함)
- 메시지 유실 가능
- 중복 없음
- 가장 빠름

At-Least-Once (최소 한 번)
================================
Producer ──send()──▶ Broker ──ack──▶ Producer
  (acks=all, 재시도 있음)
- 메시지 유실 없음
- 중복 가능
- 대부분의 기본 설정

Exactly-Once (정확히 한 번)
================================
Producer ──txn──▶ Broker ──commit──▶ Consumer(read_committed)
  (멱등 프로듀서 + 트랜잭션 + read_committed)
- 메시지 유실 없음
- 중복 없음
- 가장 강력하지만 오버헤드 있음

Exactly-Once 구현

// Exactly-Once: Consume-Transform-Produce 패턴
Properties producerProps = new Properties();
producerProps.put("transactional.id", "eos-processor-1");
producerProps.put("enable.idempotence", "true");

Properties consumerProps = new Properties();
consumerProps.put("group.id", "eos-group");
consumerProps.put("isolation.level", "read_committed");  // 핵심!
consumerProps.put("enable.auto.commit", "false");

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

producer.initTransactions();
consumer.subscribe(Collections.singleton("input-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    if (!records.isEmpty()) {
        producer.beginTransaction();

        try {
            for (ConsumerRecord<String, String> record : records) {
                // Transform
                String result = transform(record.value());

                // Produce to output topic
                producer.send(new ProducerRecord<>(
                    "output-topic", record.key(), result));
            }

            // Consumer 오프셋을 트랜잭션에 포함
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partRecords =
                    records.records(partition);
                long lastOffset = partRecords.get(
                    partRecords.size() - 1).offset();
                offsets.put(partition,
                    new OffsetAndMetadata(lastOffset + 1));
            }

            producer.sendOffsetsToTransaction(offsets,
                consumer.groupMetadata());
            producer.commitTransaction();

        } catch (Exception e) {
            producer.abortTransaction();
        }
    }
}
Exactly-Once 트랜잭션 흐름
================================

1. beginTransaction()
2. send(output-topic, msg1)Broker에 임시 저장 (uncommitted)
3. send(output-topic, msg2)Broker에 임시 저장
4. sendOffsetsToTransaction()Consumer 오프셋도 트랜잭션에 포함
5. commitTransaction()          → 모든 메시지 + 오프셋 원자적 커밋

read_committed Consumer는 커밋된 메시지만 읽음
→ 중간에 실패하면 abortTransaction()으로 모두 롤백

5. Schema Registry

스키마 직렬화 형식 비교

특성AvroProtobufJSON Schema
직렬화 크기작음작음
스키마 진화우수우수보통
코드 생성선택적필수불필요
가독성낮음 (바이너리)낮음 (바이너리)높음 (텍스트)
동적 타이핑지원미지원지원
Kafka 호환성최상우수보통

Avro 스키마와 진화

// Version 1: 초기 스키마
{
  "type": "record",
  "name": "Order",
  "namespace": "com.company.events",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "USD"},
    {"name": "created_at", "type": "long"}
  ]
}
// Version 2: Backward Compatible 진화
// 새 필드에 default 값 추가 → 기존 Consumer가 새 데이터를 읽을 수 있음
{
  "type": "record",
  "name": "Order",
  "namespace": "com.company.events",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "USD"},
    {"name": "created_at", "type": "long"},
    {"name": "status", "type": "string", "default": "CREATED"},
    {"name": "region", "type": ["null", "string"], "default": null}
  ]
}

호환성 모드

BACKWARD (기본값, 권장)
================================
- 새 스키마로 기존 데이터를 읽을 수 있음
- 새 필드: default 값 필수
- 필드 삭제: 가능
- Consumer 먼저 업그레이드 → Producer 업그레이드

FORWARD
================================
- 기존 스키마로 새 데이터를 읽을 수 있음
- 새 필드: default 값 필수
- 필드 삭제: default 값이 있는 필드만
- Producer 먼저 업그레이드 → Consumer 업그레이드

FULL
================================
- BACKWARD + FORWARD 모두 충족
- 가장 안전하지만 가장 제한적

NONE
================================
- 호환성 검사 없음
- 위험! 운영 환경에서 사용 금지
# Schema Registry API 사용
import requests

SCHEMA_REGISTRY_URL = "http://schema-registry:8081"

# 스키마 등록
schema = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"}
    ]
}

response = requests.post(
    f"{SCHEMA_REGISTRY_URL}/subjects/orders-value/versions",
    json={"schema": json.dumps(schema), "schemaType": "AVRO"},
)

# 호환성 검사
response = requests.post(
    f"{SCHEMA_REGISTRY_URL}/compatibility/subjects/orders-value/versions/latest",
    json={"schema": json.dumps(new_schema), "schemaType": "AVRO"},
)
compatibility = response.json()
print(f"Compatible: {compatibility['is_compatible']}")

# 호환성 모드 설정
requests.put(
    f"{SCHEMA_REGISTRY_URL}/config/orders-value",
    json={"compatibility": "BACKWARD"},
)

Protobuf 스키마 진화

// Version 1
syntax = "proto3";
package com.company.events;

message Order {
  string order_id = 1;
  string customer_id = 2;
  double amount = 3;
  string currency = 4;
  int64 created_at = 5;
}

// Version 2 (호환성 유지)
message Order {
  string order_id = 1;
  string customer_id = 2;
  double amount = 3;
  string currency = 4;
  int64 created_at = 5;
  // 새 필드: 기존 필드 번호 사용 금지!
  string status = 6;          // 새로 추가
  string region = 7;          // 새로 추가
  // reserved 2;              // 삭제된 필드 번호 예약
}

┌─────────────────────────────────────────────┐
Flink Cluster├─────────────────────────────────────────────┤
│                                             │
[JobManager]- 잡 스케줄링                              │
- 체크포인트 조정                          │
- 장애 복구                               │
│                                             │
[TaskManager 1]     [TaskManager 2]- Task Slot 1       - Task Slot 1- Task Slot 2       - Task Slot 2- Task Slot 3       - Task Slot 3│                                             │
├─────────────────────────────────────────────┤
[State Backend]- RocksDB (대규모 상태)- HashMap (소규모 상태)│                                             │
[Checkpoint Storage]- S3 / HDFS / GCS└─────────────────────────────────────────────┘

DataStream API

// Flink DataStream API 예시
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// Kafka Source
KafkaSource<Order> source = KafkaSource.<Order>builder()
    .setBootstrapServers("broker1:9092")
    .setTopics("orders")
    .setGroupId("flink-order-processor")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(
        OffsetResetStrategy.EARLIEST))
    .setValueOnlyDeserializer(new OrderDeserializer())
    .build();

DataStream<Order> orders = env.fromSource(
    source, WatermarkStrategy
        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((order, ts) -> order.getCreatedAt()),
    "Kafka Source"
);

// 처리 파이프라인
DataStream<OrderStats> stats = orders
    .filter(order -> order.getStatus().equals("COMPLETED"))
    .keyBy(Order::getRegion)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new OrderStatsAggregator());

// Kafka Sink
KafkaSink<OrderStats> sink = KafkaSink.<OrderStats>builder()
    .setBootstrapServers("broker1:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("order-stats")
        .setValueSerializationSchema(new OrderStatsSerializer())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("flink-order-stats")
    .build();

stats.sinkTo(sink);
env.execute("Order Statistics Pipeline");

윈도우 (Windowing)

Tumbling Window (텀블링 윈도우)
================================
시간: ──|──1──2──3──|──4──5──6──|──7──8──9──|──
윈도우:  [  Window 1  ][  Window 2  ][  Window 3  ]
- 고정 크기, 겹치지 않음
-: 5분마다 집계


Sliding Window (슬라이딩 윈도우)
================================
시간: ──|──1──2──3──4──5──6──7──8──|──
윈도우:  [    Window 1    ]
            [    Window 2    ]
                [    Window 3    ]
- 고정 크기, 슬라이드 간격만큼 이동
-: 10분 윈도우, 5분 슬라이드


Session Window (세션 윈도우)
================================
시간: ──12──3─────────56──7─────────9──
윈도우:  [Session 1]     [Session 2]     [S3]
                  gap          gap
- 활동 기반, 가변 크기
- gap 시간 동안 이벤트 없으면 윈도우 종료
// 윈도우 예시
// Tumbling Window
orders.keyBy(Order::getRegion)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sum("amount");

// Sliding Window
orders.keyBy(Order::getRegion)
    .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
    .sum("amount");

// Session Window
orders.keyBy(Order::getCustomerId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .aggregate(new SessionAggregator());

워터마크와 늦은 데이터

Event Time vs Processing Time
================================

이벤트 발생 시간(Event Time):  t=10  t=12  t=11  t=15  t=13
처리 시간(Processing Time):    T=20  T=21  T=22  T=23  T=24

이벤트가 순서대로 도착하지 않음!
→ 워터마크로 "여기까지의 이벤트는 모두 도착했을 것" 표시


워터마크 동작
================================

이벤트: [t=10] [t=12] [t=11] [t=15] [t=13]
워터마크: W(10)  W(12)  W(12)  W(15)  W(15)
         (maxOutOfOrderness = 5초인 경우)
         실제 워터마크: W(5)  W(7)  W(7)  W(10) W(10)

윈도우 [0, 10): 워터마크가 10을 넘으면 발화(fire)
// 워터마크 전략
WatermarkStrategy<Order> strategy = WatermarkStrategy
    // 최대 10초 지연 허용
    .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((order, timestamp) -> order.getCreatedAt())
    // 5분 이상 이벤트 없으면 워터마크 진행
    .withIdleness(Duration.ofMinutes(5));

// 늦은 데이터 처리
OutputTag<Order> lateOutputTag = new OutputTag<>("late-orders") {};

SingleOutputStreamOperator<OrderStats> stats = orders
    .keyBy(Order::getRegion)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))  // 1분 추가 대기
    .sideOutputLateData(lateOutputTag)  // 그 이후는 사이드 아웃풋
    .aggregate(new OrderStatsAggregator());

// 늦은 데이터 별도 처리
DataStream<Order> lateOrders = stats.getSideOutput(lateOutputTag);
lateOrders.addSink(new LateOrderAlertSink());

상태 관리와 체크포인트

// Keyed State 예시
public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {

    // ValueState: 키별 단일 값
    private ValueState<Boolean> flagState;

    // ListState: 키별 리스트
    private ListState<Transaction> recentTransactions;

    // MapState: 키별 맵
    private MapState<String, Integer> merchantCounts;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> flagDescriptor =
            new ValueStateDescriptor<>("flag", Boolean.class);
        flagState = getRuntimeContext().getState(flagDescriptor);

        ListStateDescriptor<Transaction> listDescriptor =
            new ListStateDescriptor<>("recent-txns", Transaction.class);
        recentTransactions = getRuntimeContext().getListState(listDescriptor);

        MapStateDescriptor<String, Integer> mapDescriptor =
            new MapStateDescriptor<>("merchant-counts", String.class, Integer.class);
        merchantCounts = getRuntimeContext().getMapState(mapDescriptor);
    }

    @Override
    public void processElement(Transaction txn, Context ctx, Collector<Alert> out) {
        Boolean flag = flagState.value();

        if (flag != null && txn.getAmount() > 10000) {
            out.collect(new Alert(txn.getCustomerId(), "HIGH_AMOUNT_AFTER_FLAG"));
        }

        if (txn.getAmount() < 1.0) {
            flagState.update(true);
            // 1분 후 플래그 해제 타이머
            ctx.timerService().registerEventTimeTimer(
                txn.getTimestamp() + 60000);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
        flagState.clear();
    }
}
체크포인트 동작
================================

[Source] ──▶ [Map] ──▶ [KeyBy] ──▶ [Window] ──▶ [Sink]

1. JobManager: 체크포인트 배리어 주입
   Source ──▶ |barrier| ──▶ Map ──▶ KeyBy ──▶ Window ──▶ Sink

2. 배리어가 오퍼레이터를 통과할 때 상태 스냅샷
   Source(snapshot) ──▶ Map(snapshot) ──▶ ...

3. 모든 오퍼레이터 스냅샷 완료 → 체크포인트 성공

4. 장애 발생 시: 마지막 성공 체크포인트에서 복원
// 체크포인트 설정
env.enableCheckpointing(60000);  // 60초 간격
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(120000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// State Backend 설정
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("s3://checkpoints/flink/");

비교표

특성Kafka StreamsApache Flink
배포 방식라이브러리 (JVM 앱에 임베드)분산 클러스터
입력 소스Kafka만Kafka, Kinesis, 파일 등
Exactly-OnceKafka 내부만외부 시스템 포함
윈도우Tumbling, Sliding, Session+ Global, Custom
상태 관리RocksDB (로컬)RocksDB + 분산 스냅샷
SQL 지원KSQL (별도)Flink SQL (내장)
처리량중-대대-초대
운영 복잡도낮음 (앱 배포)높음 (클러스터 관리)
적합한 경우Kafka 생태계 내 처리복잡한 스트림 처리

선택 기준

use_kafka_streams_when:
  - "Kafka에서 읽고 Kafka에 쓰는 단순한 처리"
  - "별도 클러스터 운영이 부담스러운 경우"
  - "마이크로서비스 내부에 임베드"
  - "중간 정도의 처리량으로 충분한 경우"
  - "팀이 Kafka 생태계에 익숙한 경우"

use_flink_when:
  - "복잡한 이벤트 처리 (CEP)"
  - "다양한 소스/싱크 연결"
  - "대규모 상태 관리 필요"
  - "고급 윈도우 함수 필요"
  - "SQL 기반 스트림 처리"
  - "초대규모 처리량 필요"

8. Kafka Connect

Source/Sink Connector

// Debezium MySQL Source Connector
{
  "name": "mysql-source-orders",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-primary",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz-password",
    "database.server.id": "1",
    "topic.prefix": "cdc",
    "database.include.list": "order_service",
    "table.include.list": "order_service.orders,order_service.order_items",
    "schema.history.internal.kafka.bootstrap.servers": "broker:9092",
    "schema.history.internal.kafka.topic": "schema-changes.orders",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "cdc\\.order_service\\.(.*)",
    "transforms.route.replacement": "order.$1.cdc"
  }
}
// Elasticsearch Sink Connector
{
  "name": "es-sink-orders",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "order.orders.cdc",
    "type.name": "_doc",
    "key.ignore": "false",
    "schema.ignore": "true",
    "behavior.on.null.values": "delete",
    "write.method": "upsert",
    "transforms": "extractKey",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "order_id"
  }
}

Debezium CDC

Debezium CDC 흐름
================================

[MySQL] ──binlog──▶ [Debezium Connector] ──▶ [Kafka Topic]
                                    ┌───────────────┤
                                    ▼               ▼
                              [Flink Job]    [ES Sink Connector]
                                    │               │
                                    ▼               ▼
                              [Kafka Topic]  [Elasticsearch]

CDC 이벤트 구조:
- before: 변경 전 레코드 (UPDATE/DELETE)
- after: 변경 후 레코드 (INSERT/UPDATE)
- source: 소스 DB 메타데이터
- op: 작업 유형 (c=create, u=update, d=delete, r=read)

9. 성능 튜닝

Producer 튜닝

# Producer 성능 최적화
batch.size=65536              # 64KB (기본 16KB)
linger.ms=10                  # 10ms 대기 (배치 효율)
buffer.memory=67108864        # 64MB
compression.type=lz4          # 압축 (네트워크/디스크 절약)
acks=all                      # 안정성
max.in.flight.requests.per.connection=5  # 파이프라이닝
send.buffer.bytes=131072      # 소켓 버퍼

Consumer 튜닝

# Consumer 성능 최적화
fetch.min.bytes=1048576       # 1MB (배치 효율)
fetch.max.wait.ms=500         # 최대 500ms 대기
max.poll.records=500          # 한 번에 가져올 레코드 수
max.partition.fetch.bytes=1048576  # 파티션당 1MB
session.timeout.ms=30000      # 세션 타임아웃
heartbeat.interval.ms=10000   # 하트비트 간격
max.poll.interval.ms=300000   # 폴 간격 최대 5분

Broker 튜닝

# Broker 성능 최적화
num.partitions=12                    # 기본 파티션 수
default.replication.factor=3         # 복제 팩터
num.io.threads=8                     # I/O 스레드
num.network.threads=3                # 네트워크 스레드
num.replica.fetchers=4               # 레플리카 페처
socket.send.buffer.bytes=102400      # 소켓 전송 버퍼
socket.receive.buffer.bytes=102400   # 소켓 수신 버퍼
log.segment.bytes=1073741824         # 1GB 세그먼트
log.retention.hours=168              # 7일 보존
log.retention.bytes=-1               # 크기 제한 없음

파티션 수 결정

파티션 수 결정 공식
================================

필요 처리량: 100MB/s
단일 파티션 처리량: ~10MB/s (Producer)
                    ~5MB/s (Consumer)

Producer 관점: 100MB/s / 10MB/s = 10 파티션
Consumer 관점: 100MB/s / 5MB/s = 20 파티션

→ 최소 20 파티션 필요

추가 고려사항:
- Consumer 인스턴스 수에 맞춤 (파티션 수 >= Consumer)
- 파티션 수 증가는 가능하지만 감소는 불가
- 너무 많은 파티션: 메타데이터 오버헤드, 리밸런싱 지연
- 권장: 토픽당 6-50개 사이

10. 모니터링

JMX 메트릭

# 핵심 Kafka 메트릭
broker_metrics:
  - name: "UnderReplicatedPartitions"
    description: "ISR에서 벗어난 파티션 수"
    alert_threshold: "> 0"
    severity: critical

  - name: "ActiveControllerCount"
    description: "활성 컨트롤러 수"
    alert_threshold: "!= 1"
    severity: critical

  - name: "OfflinePartitionsCount"
    description: "오프라인 파티션 수"
    alert_threshold: "> 0"
    severity: critical

  - name: "RequestsPerSec"
    description: "초당 요청 수"
    alert_threshold: "> 10000"
    severity: warning

producer_metrics:
  - name: "record-send-rate"
    description: "초당 전송 레코드 수"

  - name: "record-error-rate"
    description: "초당 에러 레코드 수"
    alert_threshold: "> 0"

  - name: "request-latency-avg"
    description: "평균 요청 지연시간"
    alert_threshold: "> 100ms"

consumer_metrics:
  - name: "records-lag-max"
    description: "최대 Consumer Lag"
    alert_threshold: "> 10000"
    severity: warning

  - name: "records-consumed-rate"
    description: "초당 소비 레코드 수"

  - name: "commit-latency-avg"
    description: "평균 커밋 지연시간"

Consumer Lag 모니터링

Consumer Lag 계산
================================

Partition 0:
  Log End Offset (LEO): 1000  (최신 메시지)
  Consumer Offset:       850  (마지막 커밋)
  Lag: 1000 - 850 = 150

Partition 1:
  LEO: 2000
  Consumer Offset: 1950
  Lag: 50

Total Lag = 150 + 50 = 200 messages
# Burrow (LinkedIn의 Consumer Lag 모니터링)
burrow_config:
  general:
    pidfile: "/var/run/burrow.pid"
    stdout-logfile: "/var/log/burrow.log"

  cluster:
    kafka-prod:
      class-name: kafka
      servers: ["broker1:9092", "broker2:9092", "broker3:9092"]
      topic-refresh: 120
      offset-refresh: 30

  consumer:
    kafka-prod:
      class-name: kafka
      cluster: kafka-prod
      servers: ["broker1:9092", "broker2:9092"]
      group-denylist: "^(console-consumer|_)"
      start-latest: true

  notifier:
    slack:
      class-name: http
      url-open: "https://hooks.slack.com/services/xxx/yyy/zzz"
      template-open: |
        Consumer group lag alert:
        Group: {{.GroupName}}
        Status: {{.Status}}
      send-close: true

Grafana 대시보드

Kafka 모니터링 대시보드 패널
================================

1. Cluster Overview
   - Broker/ 상태
   - 총 파티션 수
   - Under-replicated 파티션
   - Active Controller

2. Throughput
   - Messages In/Out per sec
   - Bytes In/Out per sec
   - Requests per sec

3. Consumer Groups
   - Consumer Lag (그룹별)
   - Consume Rate
   - Rebalance 횟수

4. Latency
   - Produce Request Latency (p50, p99)
   - Fetch Request Latency
   - End-to-End Latency

5. Resources
   - Disk Usage per Broker
   - Network I/O
   - CPU / Memory

11. 퀴즈

Q1. Kafka에서 같은 키를 가진 메시지가 항상 같은 파티션에 들어가는 이유는?

기본 파티셔너가 murmur2 해시 함수를 사용하여 키의 해시값을 파티션 수로 나눈 나머지로 파티션을 결정하기 때문입니다.

partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions

같은 키는 항상 같은 해시값을 생성하므로 같은 파티션에 할당됩니다. 이를 통해 같은 키의 메시지 순서가 보장됩니다.

단, 파티션 수가 변경되면 키-파티션 매핑이 달라질 수 있습니다.

Q2. Exactly-Once Semantics를 달성하기 위해 필요한 3가지 요소는?
  1. 멱등 프로듀서 (Idempotent Producer): enable.idempotence=true로 ProducerID와 Sequence Number를 사용하여 중복 전송을 방지합니다.

  2. 트랜잭셔널 프로듀서 (Transactional Producer): transactional.id를 설정하고 beginTransaction(), commitTransaction()으로 여러 메시지를 원자적으로 전송합니다.

  3. read_committed 격리 수준: Consumer에서 isolation.level=read_committed로 설정하여 커밋된 트랜잭션의 메시지만 읽습니다.

이 세 가지가 결합되어 Consume-Transform-Produce 패턴에서 정확히 한 번만 처리되는 것을 보장합니다.

Q3. Avro의 BACKWARD 호환성에서 새 필드를 추가할 때 반드시 필요한 것은?

default 값이 반드시 필요합니다.

BACKWARD 호환성은 새 스키마(v2)로 기존 데이터(v1으로 작성된)를 읽을 수 있어야 합니다. 기존 데이터에는 새 필드가 없으므로, 새 필드에 default 값이 없으면 역직렬화 시 오류가 발생합니다.

예시: "name": "status", "type": "string", "default": "CREATED"

Optional 필드는 ["null", "string"] union 타입에 "default": null을 사용합니다.

Q4. Flink의 워터마크가 해결하는 문제는 무엇인가?

워터마크는 이벤트 시간(Event Time) 기반 처리에서 늦게 도착하는 데이터 문제를 해결합니다.

분산 시스템에서 이벤트는 발생 순서와 다르게 도착할 수 있습니다. 워터마크는 특정 시점까지의 이벤트가 모두 도착했을 것이라는 추정을 제공합니다.

W(t) = "시간 t 이전의 모든 이벤트가 도착했을 것으로 추정"

워터마크가 윈도우의 끝 시간을 넘으면, 해당 윈도우의 계산 결과를 출력합니다. allowedLateness를 설정하면 워터마크 이후에도 늦은 데이터를 일정 시간 동안 수용할 수 있습니다.

Q5. Kafka Streams와 Apache Flink 중 어떤 것을 선택해야 하는 기준은?

Kafka Streams 선택 기준:

  • Kafka에서 읽고 Kafka에 쓰는 단순한 처리
  • 별도 클러스터 운영이 부담스러운 경우
  • 마이크로서비스에 임베드하는 경우
  • 팀이 Kafka 생태계에 익숙한 경우

Apache Flink 선택 기준:

  • 복잡한 이벤트 처리(CEP)가 필요한 경우
  • Kafka 외 다양한 소스/싱크 연결이 필요한 경우
  • 대규모 상태 관리가 필요한 경우
  • SQL 기반 스트림 처리가 필요한 경우
  • 초대규모 처리량이 필요한 경우

핵심 차이: Kafka Streams는 라이브러리(JVM 앱에 임베드), Flink는 분산 클러스터입니다.


참고 자료

  1. Apache Kafka Documentation. (2025). https://kafka.apache.org/documentation/
  2. Narkhede, N., Shapira, G., Palino, T. (2021). Kafka: The Definitive Guide, 2nd ed. O'Reilly Media.
  3. Apache Flink Documentation. (2025). https://flink.apache.org/docs/
  4. Confluent Schema Registry Documentation. (2025). https://docs.confluent.io/platform/current/schema-registry/
  5. Hueske, F., Kalavri, V. (2019). Stream Processing with Apache Flink. O'Reilly Media.
  6. KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum. https://cwiki.apache.org/confluence/display/KAFKA/KIP-500
  7. Debezium Documentation. (2025). https://debezium.io/documentation/
  8. Kleppmann, M. (2017). Designing Data-Intensive Applications. O'Reilly Media.
  9. Confluent Blog. (2024). "Exactly-Once Semantics in Apache Kafka."
  10. Apache Avro Specification. (2025). https://avro.apache.org/docs/
  11. Burrow - Kafka Consumer Lag Checking. https://github.com/linkedin/Burrow
  12. Confluent Blog. (2025). "Kafka Performance Tuning Best Practices."
  13. Flink Forward Conference Talks. (2024). https://www.flink-forward.org/
  14. KIP-848: The Next Generation Consumer Rebalance Protocol. Apache Kafka.