Skip to content

Split View: Event Streaming 심화 가이드 2025: Kafka 내부 구조, Flink, Exactly-Once, 스키마 진화

✨ Learn with Quiz
|

Event Streaming 심화 가이드 2025: Kafka 내부 구조, Flink, Exactly-Once, 스키마 진화

들어가며: Event Streaming의 진화

Event Streaming은 현대 분산 시스템의 핵심 인프라입니다. Apache Kafka를 중심으로 한 이벤트 스트리밍 생태계는 단순한 메시지 큐를 넘어, 실시간 데이터 파이프라인, 이벤트 소싱, CQRS, 스트림 처리의 기반이 되었습니다.

이 가이드에서는 Kafka의 내부 구조부터 Exactly-Once 시맨틱스, Schema Registry, Apache Flink까지 심층적으로 다룹니다.


1. Kafka 내부 구조 심화

파티션 구조

Kafka의 토픽은 하나 이상의 파티션으로 분할됩니다. 각 파티션은 순서가 보장되는 불변의 로그입니다.

Topic: orders (3 partitions)

Partition 0: [msg0] [msg3] [msg6] [msg9]  ... → offset 증가
Partition 1: [msg1] [msg4] [msg7] [msg10] ... → offset 증가
Partition 2: [msg2] [msg5] [msg8] [msg11] ... → offset 증가

- 같은 파티션 내에서만 순서 보장
- 파티션 수 = Consumer Group 내 최대 병렬 소비 수
- 파티션 키로 같은 키의 메시지가 같은 파티션으로

세그먼트 파일과 인덱스

각 파티션은 디스크에 세그먼트 파일로 저장됩니다.

/kafka-logs/orders-0/
├── 00000000000000000000.log      # 세그먼트 파일 (실제 메시지)
├── 00000000000000000000.index    # 오프셋 인덱스
├── 00000000000000000000.timeindex # 타임스탬프 인덱스
├── 00000000000005242880.log      # 다음 세그먼트
├── 00000000000005242880.index
├── 00000000000005242880.timeindex
└── leader-epoch-checkpoint
세그먼트 파일 구조
========================

.log 파일 (메시지 저장)
┌──────────────────────────────────────┐
Batch Header- Base Offset: 0- Batch Length: 256 bytes           │
- Magic: 2 (Kafka 0.11+)- CRC: checksum                     │
- Attributes: compression, txn      │
- Producer ID: 12345- Producer Epoch: 0- Base Sequence: 0├──────────────────────────────────────┤
Record 0- Offset Delta: 0- Timestamp Delta: 0- Key: "order-123"- Value: (serialized payload)- Headers: [("source", "web")]├──────────────────────────────────────┤
Record 1- Offset Delta: 1- ...└──────────────────────────────────────┘

.index 파일 (희소 인덱스)
┌────────────────────────┐
Relative OffsetFilePosition├────────────────────────┤
00409632768819265536└────────────────────────┘

.timeindex 파일
┌────────────────────────────────┐
TimestampRelative Offset├────────────────────────────────┤
1700000000000017000000600004096└────────────────────────────────┘

Log Compaction

Log Compaction은 각 키의 최신 값만 유지하는 보존 전략입니다.

Compaction:
offset 0: key=A, value=v1
offset 1: key=B, value=v1
offset 2: key=A, value=v2  ← A의 최신 값
offset 3: key=C, value=v1
offset 4: key=B, value=v2  ← B의 최신 값
offset 5: key=A, value=v3  ← A의 최신 값

Compaction:
offset 3: key=C, value=v1  (C의 유일한 값)
offset 4: key=B, value=v2  (B의 최신 값)
offset 5: key=A, value=v3  (A의 최신 값)
# Log Compaction 설정
log.cleanup.policy=compact
log.cleaner.min.cleanable.ratio=0.5
log.cleaner.min.compaction.lag.ms=0
min.cleanable.dirty.ratio=0.5

ISR (In-Sync Replicas)

ISR은 리더와 동기화된 레플리카의 집합입니다.

Partition 0 (Replication Factor = 3)
======================================

Broker 1 (Leader):      [0] [1] [2] [3] [4] [5]LEO: 6
Broker 2 (Follower):    [0] [1] [2] [3] [4]LEO: 5
Broker 3 (Follower):    [0] [1] [2] [3] [4] [5]LEO: 6

ISR = {Broker 1, Broker 2, Broker 3}
HW (High Watermark) = 5 (ISR 내 최소 LEO)

- Consumer는 HW까지만 읽을 수 있음
- Broker 2가 지연이 심해지면 ISR에서 제거
- replica.lag.time.max.ms 초과 시 ISR에서 제거
# ISR 관련 설정
min.insync.replicas=2           # 최소 동기화 레플리카 수
replica.lag.time.max.ms=30000   # ISR 탈락 기준 시간
unclean.leader.election.enable=false  # ISR 외 레플리카의 리더 선출 방지

KRaft vs ZooKeeper

Kafka 3.x부터 ZooKeeper 없이 동작하는 KRaft 모드가 도입되었습니다.

ZooKeeper 모드 (레거시)
==========================
[ZooKeeper Ensemble]
  ↕ 메타데이터
[Controller (Broker1)]
  ↕ 리더 선출
[Broker 1] [Broker 2] [Broker 3]

문제점:
- ZooKeeper 별도 운영 필요
- 메타데이터 동기화 지연
- 파티션 수 증가 시 성능 저하


KRaft 모드 (Kafka 3.x+)
==========================
[Controller Quorum]
  Broker 1 (Controller + Broker)
  Broker 2 (Controller + Broker)
  Broker 3 (Controller + Broker)

장점:
- ZooKeeper 의존성 제거
- 메타데이터 관리 성능 향상
- 수백만 파티션 지원
- 운영 복잡도 감소
# KRaft 설정
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@broker1:9093,2@broker2:9093,3@broker3:9093
controller.listener.names=CONTROLLER

2. Producer 심화

Partitioner

프로듀서는 메시지를 어느 파티션에 보낼지 결정하는 파티셔너를 사용합니다.

// 기본 파티셔너 동작
public class DefaultPartitioner implements Partitioner {
    // 키가 있는 경우: murmur2 해시
    // 키가 없는 경우: Sticky Partitioner (배치 단위로 파티션 변경)

    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null) {
            // Sticky Partitioner: 배치가 꽉 찰 때까지 같은 파티션
            return stickyPartitionCache.partition(topic, cluster);
        }

        // Key-based partitioning: 같은 키 = 같은 파티션
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
// 커스텀 파티셔너 예시: 지역 기반
public class RegionPartitioner implements Partitioner {
    private Map<String, Integer> regionMapping;

    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        String region = extractRegion(value);
        int numPartitions = cluster.partitionsForTopic(topic).size();

        return regionMapping.getOrDefault(region, 0) % numPartitions;
    }
}

배치와 압축

# Producer 배치 설정
batch.size=16384                  # 배치 크기 (bytes)
linger.ms=5                       # 배치 대기 시간
buffer.memory=33554432            # 전체 버퍼 메모리

# 압축 설정
compression.type=lz4              # none, gzip, snappy, lz4, zstd
Producer 배치 처리 흐름
==========================

send() ──▶ [Serializer] ──▶ [Partitioner] ──▶ [RecordAccumulator]
                                         ┌───────────┴───────────┐
Partition 0 배치     │
[msg1][msg2][msg3]                                         ├───────────────────────┤
Partition 1 배치     │
[msg4][msg5]                                         └───────────┬───────────┘
                                              batch.size 도달
                                              또는 linger.ms 경과
                                              [Sender Thread]
                                              압축 + 네트워크 전송
                                              [Kafka Broker]

멱등 프로듀서 (Idempotent Producer)

# 멱등 프로듀서 설정
enable.idempotence=true    # Kafka 3.0+ 기본값
acks=all                   # 멱등성 요구사항
retries=2147483647         # 무한 재시도
max.in.flight.requests.per.connection=5  # 최대 5 (멱등성 보장 범위)
멱등 프로듀서 동작 원리
==========================

Producer                          Broker
   │                                │
   │── ProducerID: 1000Epoch: 0Sequence: 0Message: "order-1"   ──────▶│  ✓ 저장
   │                                │
   │── ProducerID: 1000Epoch: 0Sequence: 1Message: "order-2"   ──────▶│  ✓ 저장
   │                                │
   │── (네트워크 타임아웃으로 재전송)ProducerID: 1000Epoch: 0Sequence: 1Message: "order-2"   ──────▶│  ✗ 중복! 무시
   │                                │

트랜잭셔널 프로듀서

// 트랜잭셔널 프로듀서 예시
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("transactional.id", "order-processor-1");
props.put("enable.idempotence", "true");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

try {
    producer.beginTransaction();

    // 여러 토픽/파티션에 원자적으로 전송
    producer.send(new ProducerRecord<>("orders", "key1", "order-created"));
    producer.send(new ProducerRecord<>("inventory", "key1", "stock-reserved"));
    producer.send(new ProducerRecord<>("notifications", "key1", "email-queued"));

    // Consumer 오프셋도 트랜잭션에 포함 가능
    producer.sendOffsetsToTransaction(offsets, consumerGroupId);

    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
    throw e;
}

3. Consumer 심화

Consumer Group과 파티션 할당

Topic: orders (6 partitions)

Consumer Group: order-processor
===================================

Case 1: 3 Consumers
  Consumer A: [P0] [P1]
  Consumer B: [P2] [P3]
  Consumer C: [P4] [P5]

Case 2: 6 Consumers (이상적)
  Consumer A: [P0]
  Consumer B: [P1]
  Consumer C: [P2]
  Consumer D: [P3]
  Consumer E: [P4]
  Consumer F: [P5]

Case 3: 8 Consumers (2개 유휴)
  Consumer A: [P0]
  Consumer B: [P1]
  Consumer C: [P2]
  Consumer D: [P3]
  Consumer E: [P4]
  Consumer F: [P5]
  Consumer G: (유휴 - idle)
  Consumer H: (유휴 - idle)

파티션 할당 전략

// 파티션 할당 전략 비교
Properties props = new Properties();

// 1. Range Assignor (기본)
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.RangeAssignor");
// 토픽별로 파티션을 범위로 나눔
// Topic A: P0,P1,P2 → C1: [P0,P1], C2: [P2]
// Topic B: P0,P1,P2 → C1: [P0,P1], C2: [P2]
// 문제: C1에 편중

// 2. RoundRobin Assignor
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.RoundRobinAssignor");
// 모든 토픽의 파티션을 라운드 로빈으로 분배
// C1: [A-P0, B-P1], C2: [A-P1, B-P0], C3: [A-P2, B-P2]

// 3. Sticky Assignor
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.StickyAssignor");
// 기존 할당 최대한 유지 + 균등 분배
// 리밸런싱 시 최소한의 파티션만 이동

// 4. CooperativeSticky Assignor (권장)
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
// Sticky + 점진적 리밸런싱
// 전체 중단 없이 파티션 재할당

리밸런싱: Eager vs Incremental

Eager Rebalancing (기존 방식)
================================
1. Consumer C3 추가됨
2. 모든 Consumer가 파티션 해제 (Stop-the-World)
   C1: [] C2: [] C3: []
3. 새로운 할당 계산
4. 파티션 재할당
   C1: [P0,P1] C2: [P2,P3] C3: [P4,P5]
→ 전체 처리 중단 발생!


Incremental Cooperative Rebalancing (권장)
================================
1. Consumer C3 추가됨
2. 1차 리밸런스: 이동 필요한 파티션만 해제
   C1: [P0,P1,P2]C1: [P0,P1]  (P2 해제)
   C2: [P3,P4,P5]C2: [P3,P4]  (P5 해제)
3. 2차 리밸런스: 해제된 파티션을 C3에 할당
   C3: []C3: [P2,P5]
→ 나머지 파티션은 중단 없이 계속 처리!

오프셋 관리

// 자동 커밋 (기본)
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
// 위험: 처리 전 커밋되면 메시지 유실 가능

// 수동 커밋 (권장)
props.put("enable.auto.commit", "false");

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        // 메시지 처리
        processRecord(record);
    }

    // 동기 커밋 (처리 완료 후)
    consumer.commitSync();

    // 또는 비동기 커밋 (성능 향상)
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            log.error("Commit failed", exception);
        }
    });
}
오프셋 저장소: __consumer_offsets 토픽
============================================

Key: (group_id, topic, partition)
Value: (offset, metadata, timestamp)

예시:
Key: (order-processor, orders, 0)
Value: (offset=42, metadata="", timestamp=1700000000)

4. Exactly-Once Semantics

메시지 전달 보장 수준

At-Most-Once (최대 한 번)
================================
Producer ──send()──▶ Broker
  (acks=0, 확인 안 함)
- 메시지 유실 가능
- 중복 없음
- 가장 빠름

At-Least-Once (최소 한 번)
================================
Producer ──send()──▶ Broker ──ack──▶ Producer
  (acks=all, 재시도 있음)
- 메시지 유실 없음
- 중복 가능
- 대부분의 기본 설정

Exactly-Once (정확히 한 번)
================================
Producer ──txn──▶ Broker ──commit──▶ Consumer(read_committed)
  (멱등 프로듀서 + 트랜잭션 + read_committed)
- 메시지 유실 없음
- 중복 없음
- 가장 강력하지만 오버헤드 있음

Exactly-Once 구현

// Exactly-Once: Consume-Transform-Produce 패턴
Properties producerProps = new Properties();
producerProps.put("transactional.id", "eos-processor-1");
producerProps.put("enable.idempotence", "true");

Properties consumerProps = new Properties();
consumerProps.put("group.id", "eos-group");
consumerProps.put("isolation.level", "read_committed");  // 핵심!
consumerProps.put("enable.auto.commit", "false");

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

producer.initTransactions();
consumer.subscribe(Collections.singleton("input-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    if (!records.isEmpty()) {
        producer.beginTransaction();

        try {
            for (ConsumerRecord<String, String> record : records) {
                // Transform
                String result = transform(record.value());

                // Produce to output topic
                producer.send(new ProducerRecord<>(
                    "output-topic", record.key(), result));
            }

            // Consumer 오프셋을 트랜잭션에 포함
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partRecords =
                    records.records(partition);
                long lastOffset = partRecords.get(
                    partRecords.size() - 1).offset();
                offsets.put(partition,
                    new OffsetAndMetadata(lastOffset + 1));
            }

            producer.sendOffsetsToTransaction(offsets,
                consumer.groupMetadata());
            producer.commitTransaction();

        } catch (Exception e) {
            producer.abortTransaction();
        }
    }
}
Exactly-Once 트랜잭션 흐름
================================

1. beginTransaction()
2. send(output-topic, msg1)Broker에 임시 저장 (uncommitted)
3. send(output-topic, msg2)Broker에 임시 저장
4. sendOffsetsToTransaction()Consumer 오프셋도 트랜잭션에 포함
5. commitTransaction()          → 모든 메시지 + 오프셋 원자적 커밋

read_committed Consumer는 커밋된 메시지만 읽음
→ 중간에 실패하면 abortTransaction()으로 모두 롤백

5. Schema Registry

스키마 직렬화 형식 비교

특성AvroProtobufJSON Schema
직렬화 크기작음작음
스키마 진화우수우수보통
코드 생성선택적필수불필요
가독성낮음 (바이너리)낮음 (바이너리)높음 (텍스트)
동적 타이핑지원미지원지원
Kafka 호환성최상우수보통

Avro 스키마와 진화

// Version 1: 초기 스키마
{
  "type": "record",
  "name": "Order",
  "namespace": "com.company.events",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "USD"},
    {"name": "created_at", "type": "long"}
  ]
}
// Version 2: Backward Compatible 진화
// 새 필드에 default 값 추가 → 기존 Consumer가 새 데이터를 읽을 수 있음
{
  "type": "record",
  "name": "Order",
  "namespace": "com.company.events",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "USD"},
    {"name": "created_at", "type": "long"},
    {"name": "status", "type": "string", "default": "CREATED"},
    {"name": "region", "type": ["null", "string"], "default": null}
  ]
}

호환성 모드

BACKWARD (기본값, 권장)
================================
- 새 스키마로 기존 데이터를 읽을 수 있음
- 새 필드: default 값 필수
- 필드 삭제: 가능
- Consumer 먼저 업그레이드 → Producer 업그레이드

FORWARD
================================
- 기존 스키마로 새 데이터를 읽을 수 있음
- 새 필드: default 값 필수
- 필드 삭제: default 값이 있는 필드만
- Producer 먼저 업그레이드 → Consumer 업그레이드

FULL
================================
- BACKWARD + FORWARD 모두 충족
- 가장 안전하지만 가장 제한적

NONE
================================
- 호환성 검사 없음
- 위험! 운영 환경에서 사용 금지
# Schema Registry API 사용
import requests

SCHEMA_REGISTRY_URL = "http://schema-registry:8081"

# 스키마 등록
schema = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"}
    ]
}

response = requests.post(
    f"{SCHEMA_REGISTRY_URL}/subjects/orders-value/versions",
    json={"schema": json.dumps(schema), "schemaType": "AVRO"},
)

# 호환성 검사
response = requests.post(
    f"{SCHEMA_REGISTRY_URL}/compatibility/subjects/orders-value/versions/latest",
    json={"schema": json.dumps(new_schema), "schemaType": "AVRO"},
)
compatibility = response.json()
print(f"Compatible: {compatibility['is_compatible']}")

# 호환성 모드 설정
requests.put(
    f"{SCHEMA_REGISTRY_URL}/config/orders-value",
    json={"compatibility": "BACKWARD"},
)

Protobuf 스키마 진화

// Version 1
syntax = "proto3";
package com.company.events;

message Order {
  string order_id = 1;
  string customer_id = 2;
  double amount = 3;
  string currency = 4;
  int64 created_at = 5;
}

// Version 2 (호환성 유지)
message Order {
  string order_id = 1;
  string customer_id = 2;
  double amount = 3;
  string currency = 4;
  int64 created_at = 5;
  // 새 필드: 기존 필드 번호 사용 금지!
  string status = 6;          // 새로 추가
  string region = 7;          // 새로 추가
  // reserved 2;              // 삭제된 필드 번호 예약
}

┌─────────────────────────────────────────────┐
Flink Cluster├─────────────────────────────────────────────┤
│                                             │
[JobManager]- 잡 스케줄링                              │
- 체크포인트 조정                          │
- 장애 복구                               │
│                                             │
[TaskManager 1]     [TaskManager 2]- Task Slot 1       - Task Slot 1- Task Slot 2       - Task Slot 2- Task Slot 3       - Task Slot 3│                                             │
├─────────────────────────────────────────────┤
[State Backend]- RocksDB (대규모 상태)- HashMap (소규모 상태)│                                             │
[Checkpoint Storage]- S3 / HDFS / GCS└─────────────────────────────────────────────┘

DataStream API

// Flink DataStream API 예시
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// Kafka Source
KafkaSource<Order> source = KafkaSource.<Order>builder()
    .setBootstrapServers("broker1:9092")
    .setTopics("orders")
    .setGroupId("flink-order-processor")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(
        OffsetResetStrategy.EARLIEST))
    .setValueOnlyDeserializer(new OrderDeserializer())
    .build();

DataStream<Order> orders = env.fromSource(
    source, WatermarkStrategy
        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((order, ts) -> order.getCreatedAt()),
    "Kafka Source"
);

// 처리 파이프라인
DataStream<OrderStats> stats = orders
    .filter(order -> order.getStatus().equals("COMPLETED"))
    .keyBy(Order::getRegion)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new OrderStatsAggregator());

// Kafka Sink
KafkaSink<OrderStats> sink = KafkaSink.<OrderStats>builder()
    .setBootstrapServers("broker1:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("order-stats")
        .setValueSerializationSchema(new OrderStatsSerializer())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("flink-order-stats")
    .build();

stats.sinkTo(sink);
env.execute("Order Statistics Pipeline");

윈도우 (Windowing)

Tumbling Window (텀블링 윈도우)
================================
시간: ──|──1──2──3──|──4──5──6──|──7──8──9──|──
윈도우:  [  Window 1  ][  Window 2  ][  Window 3  ]
- 고정 크기, 겹치지 않음
-: 5분마다 집계


Sliding Window (슬라이딩 윈도우)
================================
시간: ──|──1──2──3──4──5──6──7──8──|──
윈도우:  [    Window 1    ]
            [    Window 2    ]
                [    Window 3    ]
- 고정 크기, 슬라이드 간격만큼 이동
-: 10분 윈도우, 5분 슬라이드


Session Window (세션 윈도우)
================================
시간: ──12──3─────────56──7─────────9──
윈도우:  [Session 1]     [Session 2]     [S3]
                  gap          gap
- 활동 기반, 가변 크기
- gap 시간 동안 이벤트 없으면 윈도우 종료
// 윈도우 예시
// Tumbling Window
orders.keyBy(Order::getRegion)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sum("amount");

// Sliding Window
orders.keyBy(Order::getRegion)
    .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
    .sum("amount");

// Session Window
orders.keyBy(Order::getCustomerId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .aggregate(new SessionAggregator());

워터마크와 늦은 데이터

Event Time vs Processing Time
================================

이벤트 발생 시간(Event Time):  t=10  t=12  t=11  t=15  t=13
처리 시간(Processing Time):    T=20  T=21  T=22  T=23  T=24

이벤트가 순서대로 도착하지 않음!
→ 워터마크로 "여기까지의 이벤트는 모두 도착했을 것" 표시


워터마크 동작
================================

이벤트: [t=10] [t=12] [t=11] [t=15] [t=13]
워터마크: W(10)  W(12)  W(12)  W(15)  W(15)
         (maxOutOfOrderness = 5초인 경우)
         실제 워터마크: W(5)  W(7)  W(7)  W(10) W(10)

윈도우 [0, 10): 워터마크가 10을 넘으면 발화(fire)
// 워터마크 전략
WatermarkStrategy<Order> strategy = WatermarkStrategy
    // 최대 10초 지연 허용
    .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((order, timestamp) -> order.getCreatedAt())
    // 5분 이상 이벤트 없으면 워터마크 진행
    .withIdleness(Duration.ofMinutes(5));

// 늦은 데이터 처리
OutputTag<Order> lateOutputTag = new OutputTag<>("late-orders") {};

SingleOutputStreamOperator<OrderStats> stats = orders
    .keyBy(Order::getRegion)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))  // 1분 추가 대기
    .sideOutputLateData(lateOutputTag)  // 그 이후는 사이드 아웃풋
    .aggregate(new OrderStatsAggregator());

// 늦은 데이터 별도 처리
DataStream<Order> lateOrders = stats.getSideOutput(lateOutputTag);
lateOrders.addSink(new LateOrderAlertSink());

상태 관리와 체크포인트

// Keyed State 예시
public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {

    // ValueState: 키별 단일 값
    private ValueState<Boolean> flagState;

    // ListState: 키별 리스트
    private ListState<Transaction> recentTransactions;

    // MapState: 키별 맵
    private MapState<String, Integer> merchantCounts;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> flagDescriptor =
            new ValueStateDescriptor<>("flag", Boolean.class);
        flagState = getRuntimeContext().getState(flagDescriptor);

        ListStateDescriptor<Transaction> listDescriptor =
            new ListStateDescriptor<>("recent-txns", Transaction.class);
        recentTransactions = getRuntimeContext().getListState(listDescriptor);

        MapStateDescriptor<String, Integer> mapDescriptor =
            new MapStateDescriptor<>("merchant-counts", String.class, Integer.class);
        merchantCounts = getRuntimeContext().getMapState(mapDescriptor);
    }

    @Override
    public void processElement(Transaction txn, Context ctx, Collector<Alert> out) {
        Boolean flag = flagState.value();

        if (flag != null && txn.getAmount() > 10000) {
            out.collect(new Alert(txn.getCustomerId(), "HIGH_AMOUNT_AFTER_FLAG"));
        }

        if (txn.getAmount() < 1.0) {
            flagState.update(true);
            // 1분 후 플래그 해제 타이머
            ctx.timerService().registerEventTimeTimer(
                txn.getTimestamp() + 60000);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
        flagState.clear();
    }
}
체크포인트 동작
================================

[Source] ──▶ [Map] ──▶ [KeyBy] ──▶ [Window] ──▶ [Sink]

1. JobManager: 체크포인트 배리어 주입
   Source ──▶ |barrier| ──▶ Map ──▶ KeyBy ──▶ Window ──▶ Sink

2. 배리어가 오퍼레이터를 통과할 때 상태 스냅샷
   Source(snapshot) ──▶ Map(snapshot) ──▶ ...

3. 모든 오퍼레이터 스냅샷 완료 → 체크포인트 성공

4. 장애 발생 시: 마지막 성공 체크포인트에서 복원
// 체크포인트 설정
env.enableCheckpointing(60000);  // 60초 간격
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(120000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// State Backend 설정
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("s3://checkpoints/flink/");

비교표

특성Kafka StreamsApache Flink
배포 방식라이브러리 (JVM 앱에 임베드)분산 클러스터
입력 소스Kafka만Kafka, Kinesis, 파일 등
Exactly-OnceKafka 내부만외부 시스템 포함
윈도우Tumbling, Sliding, Session+ Global, Custom
상태 관리RocksDB (로컬)RocksDB + 분산 스냅샷
SQL 지원KSQL (별도)Flink SQL (내장)
처리량중-대대-초대
운영 복잡도낮음 (앱 배포)높음 (클러스터 관리)
적합한 경우Kafka 생태계 내 처리복잡한 스트림 처리

선택 기준

use_kafka_streams_when:
  - "Kafka에서 읽고 Kafka에 쓰는 단순한 처리"
  - "별도 클러스터 운영이 부담스러운 경우"
  - "마이크로서비스 내부에 임베드"
  - "중간 정도의 처리량으로 충분한 경우"
  - "팀이 Kafka 생태계에 익숙한 경우"

use_flink_when:
  - "복잡한 이벤트 처리 (CEP)"
  - "다양한 소스/싱크 연결"
  - "대규모 상태 관리 필요"
  - "고급 윈도우 함수 필요"
  - "SQL 기반 스트림 처리"
  - "초대규모 처리량 필요"

8. Kafka Connect

Source/Sink Connector

// Debezium MySQL Source Connector
{
  "name": "mysql-source-orders",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-primary",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz-password",
    "database.server.id": "1",
    "topic.prefix": "cdc",
    "database.include.list": "order_service",
    "table.include.list": "order_service.orders,order_service.order_items",
    "schema.history.internal.kafka.bootstrap.servers": "broker:9092",
    "schema.history.internal.kafka.topic": "schema-changes.orders",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "cdc\\.order_service\\.(.*)",
    "transforms.route.replacement": "order.$1.cdc"
  }
}
// Elasticsearch Sink Connector
{
  "name": "es-sink-orders",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "order.orders.cdc",
    "type.name": "_doc",
    "key.ignore": "false",
    "schema.ignore": "true",
    "behavior.on.null.values": "delete",
    "write.method": "upsert",
    "transforms": "extractKey",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "order_id"
  }
}

Debezium CDC

Debezium CDC 흐름
================================

[MySQL] ──binlog──▶ [Debezium Connector] ──▶ [Kafka Topic]
                                    ┌───────────────┤
                                    ▼               ▼
                              [Flink Job]    [ES Sink Connector]
                                    │               │
                                    ▼               ▼
                              [Kafka Topic]  [Elasticsearch]

CDC 이벤트 구조:
- before: 변경 전 레코드 (UPDATE/DELETE)
- after: 변경 후 레코드 (INSERT/UPDATE)
- source: 소스 DB 메타데이터
- op: 작업 유형 (c=create, u=update, d=delete, r=read)

9. 성능 튜닝

Producer 튜닝

# Producer 성능 최적화
batch.size=65536              # 64KB (기본 16KB)
linger.ms=10                  # 10ms 대기 (배치 효율)
buffer.memory=67108864        # 64MB
compression.type=lz4          # 압축 (네트워크/디스크 절약)
acks=all                      # 안정성
max.in.flight.requests.per.connection=5  # 파이프라이닝
send.buffer.bytes=131072      # 소켓 버퍼

Consumer 튜닝

# Consumer 성능 최적화
fetch.min.bytes=1048576       # 1MB (배치 효율)
fetch.max.wait.ms=500         # 최대 500ms 대기
max.poll.records=500          # 한 번에 가져올 레코드 수
max.partition.fetch.bytes=1048576  # 파티션당 1MB
session.timeout.ms=30000      # 세션 타임아웃
heartbeat.interval.ms=10000   # 하트비트 간격
max.poll.interval.ms=300000   # 폴 간격 최대 5분

Broker 튜닝

# Broker 성능 최적화
num.partitions=12                    # 기본 파티션 수
default.replication.factor=3         # 복제 팩터
num.io.threads=8                     # I/O 스레드
num.network.threads=3                # 네트워크 스레드
num.replica.fetchers=4               # 레플리카 페처
socket.send.buffer.bytes=102400      # 소켓 전송 버퍼
socket.receive.buffer.bytes=102400   # 소켓 수신 버퍼
log.segment.bytes=1073741824         # 1GB 세그먼트
log.retention.hours=168              # 7일 보존
log.retention.bytes=-1               # 크기 제한 없음

파티션 수 결정

파티션 수 결정 공식
================================

필요 처리량: 100MB/s
단일 파티션 처리량: ~10MB/s (Producer)
                    ~5MB/s (Consumer)

Producer 관점: 100MB/s / 10MB/s = 10 파티션
Consumer 관점: 100MB/s / 5MB/s = 20 파티션

→ 최소 20 파티션 필요

추가 고려사항:
- Consumer 인스턴스 수에 맞춤 (파티션 수 >= Consumer)
- 파티션 수 증가는 가능하지만 감소는 불가
- 너무 많은 파티션: 메타데이터 오버헤드, 리밸런싱 지연
- 권장: 토픽당 6-50개 사이

10. 모니터링

JMX 메트릭

# 핵심 Kafka 메트릭
broker_metrics:
  - name: "UnderReplicatedPartitions"
    description: "ISR에서 벗어난 파티션 수"
    alert_threshold: "> 0"
    severity: critical

  - name: "ActiveControllerCount"
    description: "활성 컨트롤러 수"
    alert_threshold: "!= 1"
    severity: critical

  - name: "OfflinePartitionsCount"
    description: "오프라인 파티션 수"
    alert_threshold: "> 0"
    severity: critical

  - name: "RequestsPerSec"
    description: "초당 요청 수"
    alert_threshold: "> 10000"
    severity: warning

producer_metrics:
  - name: "record-send-rate"
    description: "초당 전송 레코드 수"

  - name: "record-error-rate"
    description: "초당 에러 레코드 수"
    alert_threshold: "> 0"

  - name: "request-latency-avg"
    description: "평균 요청 지연시간"
    alert_threshold: "> 100ms"

consumer_metrics:
  - name: "records-lag-max"
    description: "최대 Consumer Lag"
    alert_threshold: "> 10000"
    severity: warning

  - name: "records-consumed-rate"
    description: "초당 소비 레코드 수"

  - name: "commit-latency-avg"
    description: "평균 커밋 지연시간"

Consumer Lag 모니터링

Consumer Lag 계산
================================

Partition 0:
  Log End Offset (LEO): 1000  (최신 메시지)
  Consumer Offset:       850  (마지막 커밋)
  Lag: 1000 - 850 = 150

Partition 1:
  LEO: 2000
  Consumer Offset: 1950
  Lag: 50

Total Lag = 150 + 50 = 200 messages
# Burrow (LinkedIn의 Consumer Lag 모니터링)
burrow_config:
  general:
    pidfile: "/var/run/burrow.pid"
    stdout-logfile: "/var/log/burrow.log"

  cluster:
    kafka-prod:
      class-name: kafka
      servers: ["broker1:9092", "broker2:9092", "broker3:9092"]
      topic-refresh: 120
      offset-refresh: 30

  consumer:
    kafka-prod:
      class-name: kafka
      cluster: kafka-prod
      servers: ["broker1:9092", "broker2:9092"]
      group-denylist: "^(console-consumer|_)"
      start-latest: true

  notifier:
    slack:
      class-name: http
      url-open: "https://hooks.slack.com/services/xxx/yyy/zzz"
      template-open: |
        Consumer group lag alert:
        Group: {{.GroupName}}
        Status: {{.Status}}
      send-close: true

Grafana 대시보드

Kafka 모니터링 대시보드 패널
================================

1. Cluster Overview
   - Broker/ 상태
   - 총 파티션 수
   - Under-replicated 파티션
   - Active Controller

2. Throughput
   - Messages In/Out per sec
   - Bytes In/Out per sec
   - Requests per sec

3. Consumer Groups
   - Consumer Lag (그룹별)
   - Consume Rate
   - Rebalance 횟수

4. Latency
   - Produce Request Latency (p50, p99)
   - Fetch Request Latency
   - End-to-End Latency

5. Resources
   - Disk Usage per Broker
   - Network I/O
   - CPU / Memory

11. 퀴즈

Q1. Kafka에서 같은 키를 가진 메시지가 항상 같은 파티션에 들어가는 이유는?

기본 파티셔너가 murmur2 해시 함수를 사용하여 키의 해시값을 파티션 수로 나눈 나머지로 파티션을 결정하기 때문입니다.

partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions

같은 키는 항상 같은 해시값을 생성하므로 같은 파티션에 할당됩니다. 이를 통해 같은 키의 메시지 순서가 보장됩니다.

단, 파티션 수가 변경되면 키-파티션 매핑이 달라질 수 있습니다.

Q2. Exactly-Once Semantics를 달성하기 위해 필요한 3가지 요소는?
  1. 멱등 프로듀서 (Idempotent Producer): enable.idempotence=true로 ProducerID와 Sequence Number를 사용하여 중복 전송을 방지합니다.

  2. 트랜잭셔널 프로듀서 (Transactional Producer): transactional.id를 설정하고 beginTransaction(), commitTransaction()으로 여러 메시지를 원자적으로 전송합니다.

  3. read_committed 격리 수준: Consumer에서 isolation.level=read_committed로 설정하여 커밋된 트랜잭션의 메시지만 읽습니다.

이 세 가지가 결합되어 Consume-Transform-Produce 패턴에서 정확히 한 번만 처리되는 것을 보장합니다.

Q3. Avro의 BACKWARD 호환성에서 새 필드를 추가할 때 반드시 필요한 것은?

default 값이 반드시 필요합니다.

BACKWARD 호환성은 새 스키마(v2)로 기존 데이터(v1으로 작성된)를 읽을 수 있어야 합니다. 기존 데이터에는 새 필드가 없으므로, 새 필드에 default 값이 없으면 역직렬화 시 오류가 발생합니다.

예시: "name": "status", "type": "string", "default": "CREATED"

Optional 필드는 ["null", "string"] union 타입에 "default": null을 사용합니다.

Q4. Flink의 워터마크가 해결하는 문제는 무엇인가?

워터마크는 이벤트 시간(Event Time) 기반 처리에서 늦게 도착하는 데이터 문제를 해결합니다.

분산 시스템에서 이벤트는 발생 순서와 다르게 도착할 수 있습니다. 워터마크는 특정 시점까지의 이벤트가 모두 도착했을 것이라는 추정을 제공합니다.

W(t) = "시간 t 이전의 모든 이벤트가 도착했을 것으로 추정"

워터마크가 윈도우의 끝 시간을 넘으면, 해당 윈도우의 계산 결과를 출력합니다. allowedLateness를 설정하면 워터마크 이후에도 늦은 데이터를 일정 시간 동안 수용할 수 있습니다.

Q5. Kafka Streams와 Apache Flink 중 어떤 것을 선택해야 하는 기준은?

Kafka Streams 선택 기준:

  • Kafka에서 읽고 Kafka에 쓰는 단순한 처리
  • 별도 클러스터 운영이 부담스러운 경우
  • 마이크로서비스에 임베드하는 경우
  • 팀이 Kafka 생태계에 익숙한 경우

Apache Flink 선택 기준:

  • 복잡한 이벤트 처리(CEP)가 필요한 경우
  • Kafka 외 다양한 소스/싱크 연결이 필요한 경우
  • 대규모 상태 관리가 필요한 경우
  • SQL 기반 스트림 처리가 필요한 경우
  • 초대규모 처리량이 필요한 경우

핵심 차이: Kafka Streams는 라이브러리(JVM 앱에 임베드), Flink는 분산 클러스터입니다.


참고 자료

  1. Apache Kafka Documentation. (2025). https://kafka.apache.org/documentation/
  2. Narkhede, N., Shapira, G., Palino, T. (2021). Kafka: The Definitive Guide, 2nd ed. O'Reilly Media.
  3. Apache Flink Documentation. (2025). https://flink.apache.org/docs/
  4. Confluent Schema Registry Documentation. (2025). https://docs.confluent.io/platform/current/schema-registry/
  5. Hueske, F., Kalavri, V. (2019). Stream Processing with Apache Flink. O'Reilly Media.
  6. KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum. https://cwiki.apache.org/confluence/display/KAFKA/KIP-500
  7. Debezium Documentation. (2025). https://debezium.io/documentation/
  8. Kleppmann, M. (2017). Designing Data-Intensive Applications. O'Reilly Media.
  9. Confluent Blog. (2024). "Exactly-Once Semantics in Apache Kafka."
  10. Apache Avro Specification. (2025). https://avro.apache.org/docs/
  11. Burrow - Kafka Consumer Lag Checking. https://github.com/linkedin/Burrow
  12. Confluent Blog. (2025). "Kafka Performance Tuning Best Practices."
  13. Flink Forward Conference Talks. (2024). https://www.flink-forward.org/
  14. KIP-848: The Next Generation Consumer Rebalance Protocol. Apache Kafka.

Event Streaming Deep Dive Guide 2025: Kafka Internals, Flink, Exactly-Once, Schema Evolution

Introduction: The Evolution of Event Streaming

Event Streaming is the core infrastructure of modern distributed systems. The event streaming ecosystem centered on Apache Kafka has evolved beyond simple message queues to become the foundation for real-time data pipelines, event sourcing, CQRS, and stream processing.

This guide provides an in-depth look at Kafka internals, Exactly-Once semantics, Schema Registry, and Apache Flink.


1. Kafka Internals Deep Dive

Partition Structure

A Kafka topic is divided into one or more partitions. Each partition is an immutable, ordered log.

Topic: orders (3 partitions)

Partition 0: [msg0] [msg3] [msg6] [msg9]  ... → offset increases
Partition 1: [msg1] [msg4] [msg7] [msg10] ... → offset increases
Partition 2: [msg2] [msg5] [msg8] [msg11] ... → offset increases

- Ordering guaranteed only within the same partition
- Partition count = max parallel consumers in a Consumer Group
- Partition key routes messages with the same key to the same partition

Segment Files and Indexes

Each partition is stored on disk as segment files.

/kafka-logs/orders-0/
├── 00000000000000000000.log      # Segment file (actual messages)
├── 00000000000000000000.index    # Offset index
├── 00000000000000000000.timeindex # Timestamp index
├── 00000000000005242880.log      # Next segment
├── 00000000000005242880.index
├── 00000000000005242880.timeindex
└── leader-epoch-checkpoint
Segment File Structure
========================

.log file (message storage)
┌──────────────────────────────────────┐
Batch Header- Base Offset: 0- Batch Length: 256 bytes           │
- Magic: 2 (Kafka 0.11+)- CRC: checksum                     │
- Attributes: compression, txn      │
- Producer ID: 12345- Producer Epoch: 0- Base Sequence: 0├──────────────────────────────────────┤
Record 0- Offset Delta: 0- Timestamp Delta: 0- Key: "order-123"- Value: (serialized payload)- Headers: [("source", "web")]├──────────────────────────────────────┤
Record 1- Offset Delta: 1- ...└──────────────────────────────────────┘

.index file (sparse index)
┌────────────────────────┐
Relative OffsetFilePosition├────────────────────────┤
00409632768819265536└────────────────────────┘

.timeindex file
┌────────────────────────────────┐
TimestampRelative Offset├────────────────────────────────┤
1700000000000017000000600004096└────────────────────────────────┘

Log Compaction

Log Compaction is a retention strategy that keeps only the latest value for each key.

Before compaction:
offset 0: key=A, value=v1
offset 1: key=B, value=v1
offset 2: key=A, value=v2  ← latest for A
offset 3: key=C, value=v1
offset 4: key=B, value=v2  ← latest for B
offset 5: key=A, value=v3  ← latest for A

After compaction:
offset 3: key=C, value=v1  (only value for C)
offset 4: key=B, value=v2  (latest for B)
offset 5: key=A, value=v3  (latest for A)
# Log Compaction configuration
log.cleanup.policy=compact
log.cleaner.min.cleanable.ratio=0.5
log.cleaner.min.compaction.lag.ms=0
min.cleanable.dirty.ratio=0.5

ISR (In-Sync Replicas)

ISR is the set of replicas that are synchronized with the leader.

Partition 0 (Replication Factor = 3)
======================================

Broker 1 (Leader):      [0] [1] [2] [3] [4] [5]LEO: 6
Broker 2 (Follower):    [0] [1] [2] [3] [4]LEO: 5
Broker 3 (Follower):    [0] [1] [2] [3] [4] [5]LEO: 6

ISR = {Broker 1, Broker 2, Broker 3}
HW (High Watermark) = 5 (minimum LEO within ISR)

- Consumers can only read up to HW
- If Broker 2 lags too much, it is removed from ISR
- Removed from ISR when exceeding replica.lag.time.max.ms
# ISR-related configuration
min.insync.replicas=2           # Minimum in-sync replicas
replica.lag.time.max.ms=30000   # ISR removal threshold
unclean.leader.election.enable=false  # Prevent non-ISR replica leader election

KRaft vs ZooKeeper

Starting from Kafka 3.x, KRaft mode was introduced to operate without ZooKeeper.

ZooKeeper Mode (Legacy)
==========================
[ZooKeeper Ensemble]
  ↕ metadata
[Controller (one of the Brokers)]
  ↕ leader election
[Broker 1] [Broker 2] [Broker 3]

Issues:
- Requires separate ZooKeeper operations
- Metadata synchronization delays
- Performance degradation as partition count increases


KRaft Mode (Kafka 3.x+)
==========================
[Controller Quorum]
  Broker 1 (Controller + Broker)
  Broker 2 (Controller + Broker)
  Broker 3 (Controller + Broker)

Advantages:
- Eliminates ZooKeeper dependency
- Improved metadata management performance
- Supports millions of partitions
- Reduced operational complexity
# KRaft configuration
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@broker1:9093,2@broker2:9093,3@broker3:9093
controller.listener.names=CONTROLLER

2. Producer Deep Dive

Partitioner

Producers use a partitioner to determine which partition a message goes to.

// Default partitioner behavior
public class DefaultPartitioner implements Partitioner {
    // With key: murmur2 hash
    // Without key: Sticky Partitioner (changes partition per batch)

    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null) {
            // Sticky Partitioner: same partition until batch is full
            return stickyPartitionCache.partition(topic, cluster);
        }

        // Key-based partitioning: same key = same partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
// Custom partitioner example: region-based
public class RegionPartitioner implements Partitioner {
    private Map<String, Integer> regionMapping;

    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        String region = extractRegion(value);
        int numPartitions = cluster.partitionsForTopic(topic).size();

        return regionMapping.getOrDefault(region, 0) % numPartitions;
    }
}

Batching and Compression

# Producer batch settings
batch.size=16384                  # Batch size (bytes)
linger.ms=5                       # Batch wait time
buffer.memory=33554432            # Total buffer memory

# Compression settings
compression.type=lz4              # none, gzip, snappy, lz4, zstd
Producer Batch Processing Flow
==========================

send() ──▶ [Serializer] ──▶ [Partitioner] ──▶ [RecordAccumulator]
                                         ┌───────────┴───────────┐
Partition 0 batch    │
[msg1][msg2][msg3]                                         ├───────────────────────┤
Partition 1 batch    │
[msg4][msg5]                                         └───────────┬───────────┘
                                              batch.size reached
                                              or linger.ms elapsed
                                              [Sender Thread]
                                              Compress + network send
                                              [Kafka Broker]

Idempotent Producer

# Idempotent producer settings
enable.idempotence=true    # Default since Kafka 3.0+
acks=all                   # Required for idempotency
retries=2147483647         # Infinite retries
max.in.flight.requests.per.connection=5  # Max 5 (idempotency range)
Idempotent Producer Operation
==========================

Producer                          Broker
   │                                │
   │── ProducerID: 1000Epoch: 0Sequence: 0Message: "order-1"   ──────▶│  ✓ Stored
   │                                │
   │── ProducerID: 1000Epoch: 0Sequence: 1Message: "order-2"   ──────▶│  ✓ Stored
   │                                │
   │── (Resend due to network timeout)ProducerID: 1000Epoch: 0Sequence: 1Message: "order-2"   ──────▶│  ✗ Duplicate! Ignored
   │                                │

Transactional Producer

// Transactional producer example
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("transactional.id", "order-processor-1");
props.put("enable.idempotence", "true");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

try {
    producer.beginTransaction();

    // Atomically send to multiple topics/partitions
    producer.send(new ProducerRecord<>("orders", "key1", "order-created"));
    producer.send(new ProducerRecord<>("inventory", "key1", "stock-reserved"));
    producer.send(new ProducerRecord<>("notifications", "key1", "email-queued"));

    // Consumer offsets can be included in the transaction
    producer.sendOffsetsToTransaction(offsets, consumerGroupId);

    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
    throw e;
}

3. Consumer Deep Dive

Consumer Groups and Partition Assignment

Topic: orders (6 partitions)

Consumer Group: order-processor
===================================

Case 1: 3 Consumers
  Consumer A: [P0] [P1]
  Consumer B: [P2] [P3]
  Consumer C: [P4] [P5]

Case 2: 6 Consumers (ideal)
  Consumer A: [P0]
  Consumer B: [P1]
  Consumer C: [P2]
  Consumer D: [P3]
  Consumer E: [P4]
  Consumer F: [P5]

Case 3: 8 Consumers (2 idle)
  Consumer A: [P0]
  Consumer B: [P1]
  Consumer C: [P2]
  Consumer D: [P3]
  Consumer E: [P4]
  Consumer F: [P5]
  Consumer G: (idle)
  Consumer H: (idle)

Partition Assignment Strategies

// Partition assignment strategy comparison
Properties props = new Properties();

// 1. Range Assignor (default)
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.RangeAssignor");
// Divides partitions into ranges per topic
// Topic A: P0,P1,P2 -> C1: [P0,P1], C2: [P2]
// Topic B: P0,P1,P2 -> C1: [P0,P1], C2: [P2]
// Issue: skew toward C1

// 2. RoundRobin Assignor
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.RoundRobinAssignor");
// Distributes all topic partitions in round-robin
// C1: [A-P0, B-P1], C2: [A-P1, B-P0], C3: [A-P2, B-P2]

// 3. Sticky Assignor
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.StickyAssignor");
// Maximizes existing assignment retention + even distribution
// Moves minimum partitions during rebalance

// 4. CooperativeSticky Assignor (recommended)
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
// Sticky + incremental rebalancing
// Reassigns partitions without full stop

Rebalancing: Eager vs Incremental

Eager Rebalancing (legacy)
================================
1. Consumer C3 added
2. All consumers release partitions (Stop-the-World)
   C1: [] C2: [] C3: []
3. Calculate new assignment
4. Reassign partitions
   C1: [P0,P1] C2: [P2,P3] C3: [P4,P5]
Full processing stop occurs!


Incremental Cooperative Rebalancing (recommended)
================================
1. Consumer C3 added
2. 1st rebalance: only release partitions that need to move
   C1: [P0,P1,P2]C1: [P0,P1]  (P2 released)
   C2: [P3,P4,P5]C2: [P3,P4]  (P5 released)
3. 2nd rebalance: assign released partitions to C3
   C3: []C3: [P2,P5]
Remaining partitions continue processing without interruption!

Offset Management

// Auto commit (default)
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
// Risk: if committed before processing, messages can be lost

// Manual commit (recommended)
props.put("enable.auto.commit", "false");

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        // Process message
        processRecord(record);
    }

    // Synchronous commit (after processing complete)
    consumer.commitSync();

    // Or async commit (performance improvement)
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            log.error("Commit failed", exception);
        }
    });
}
Offset Storage: __consumer_offsets topic
============================================

Key: (group_id, topic, partition)
Value: (offset, metadata, timestamp)

Example:
Key: (order-processor, orders, 0)
Value: (offset=42, metadata="", timestamp=1700000000)

4. Exactly-Once Semantics

Message Delivery Guarantees

At-Most-Once
================================
Producer ──send()──▶ Broker
  (acks=0, no confirmation)
- Messages can be lost
- No duplicates
- Fastest

At-Least-Once
================================
Producer ──send()──▶ Broker ──ack──▶ Producer
  (acks=all, with retries)
- No message loss
- Duplicates possible
- Most default configurations

Exactly-Once
================================
Producer ──txn──▶ Broker ──commit──▶ Consumer(read_committed)
  (idempotent producer + transactions + read_committed)
- No message loss
- No duplicates
- Strongest but with overhead

Exactly-Once Implementation

// Exactly-Once: Consume-Transform-Produce pattern
Properties producerProps = new Properties();
producerProps.put("transactional.id", "eos-processor-1");
producerProps.put("enable.idempotence", "true");

Properties consumerProps = new Properties();
consumerProps.put("group.id", "eos-group");
consumerProps.put("isolation.level", "read_committed");  // Key!
consumerProps.put("enable.auto.commit", "false");

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

producer.initTransactions();
consumer.subscribe(Collections.singleton("input-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    if (!records.isEmpty()) {
        producer.beginTransaction();

        try {
            for (ConsumerRecord<String, String> record : records) {
                // Transform
                String result = transform(record.value());

                // Produce to output topic
                producer.send(new ProducerRecord<>(
                    "output-topic", record.key(), result));
            }

            // Include consumer offsets in transaction
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partRecords =
                    records.records(partition);
                long lastOffset = partRecords.get(
                    partRecords.size() - 1).offset();
                offsets.put(partition,
                    new OffsetAndMetadata(lastOffset + 1));
            }

            producer.sendOffsetsToTransaction(offsets,
                consumer.groupMetadata());
            producer.commitTransaction();

        } catch (Exception e) {
            producer.abortTransaction();
        }
    }
}
Exactly-Once Transaction Flow
================================

1. beginTransaction()
2. send(output-topic, msg1)Temporarily stored on Broker (uncommitted)
3. send(output-topic, msg2)Temporarily stored
4. sendOffsetsToTransaction()Consumer offsets included in transaction
5. commitTransaction()All messages + offsets atomically committed

read_committed consumers only read committed messages
If failure occurs mid-way, abortTransaction() rolls back everything

5. Schema Registry

Serialization Format Comparison

FeatureAvroProtobufJSON Schema
Serialized SizeSmallSmallLarge
Schema EvolutionExcellentExcellentAverage
Code GenerationOptionalRequiredNot needed
ReadabilityLow (binary)Low (binary)High (text)
Dynamic TypingSupportedNot supportedSupported
Kafka CompatibilityBestExcellentAverage

Avro Schema and Evolution

// Version 1: Initial schema
{
  "type": "record",
  "name": "Order",
  "namespace": "com.company.events",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "USD"},
    {"name": "created_at", "type": "long"}
  ]
}
// Version 2: Backward Compatible evolution
// New fields with default values → existing consumers can read new data
{
  "type": "record",
  "name": "Order",
  "namespace": "com.company.events",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "USD"},
    {"name": "created_at", "type": "long"},
    {"name": "status", "type": "string", "default": "CREATED"},
    {"name": "region", "type": ["null", "string"], "default": null}
  ]
}

Compatibility Modes

BACKWARD (default, recommended)
================================
- New schema can read data written with old schema
- New fields: default value required
- Field removal: allowed
- Upgrade consumers first → then producers

FORWARD
================================
- Old schema can read data written with new schema
- New fields: default value required
- Field removal: only fields with default values
- Upgrade producers first → then consumers

FULL
================================
- Both BACKWARD and FORWARD satisfied
- Safest but most restrictive

NONE
================================
- No compatibility checks
- Dangerous! Do not use in production
# Schema Registry API usage
import requests

SCHEMA_REGISTRY_URL = "http://schema-registry:8081"

# Register schema
schema = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"}
    ]
}

response = requests.post(
    f"{SCHEMA_REGISTRY_URL}/subjects/orders-value/versions",
    json={"schema": json.dumps(schema), "schemaType": "AVRO"},
)

# Check compatibility
response = requests.post(
    f"{SCHEMA_REGISTRY_URL}/compatibility/subjects/orders-value/versions/latest",
    json={"schema": json.dumps(new_schema), "schemaType": "AVRO"},
)
compatibility = response.json()
print(f"Compatible: {compatibility['is_compatible']}")

# Set compatibility mode
requests.put(
    f"{SCHEMA_REGISTRY_URL}/config/orders-value",
    json={"compatibility": "BACKWARD"},
)

Protobuf Schema Evolution

// Version 1
syntax = "proto3";
package com.company.events;

message Order {
  string order_id = 1;
  string customer_id = 2;
  double amount = 3;
  string currency = 4;
  int64 created_at = 5;
}

// Version 2 (maintaining compatibility)
message Order {
  string order_id = 1;
  string customer_id = 2;
  double amount = 3;
  string currency = 4;
  int64 created_at = 5;
  // New fields: never reuse old field numbers!
  string status = 6;          // newly added
  string region = 7;          // newly added
  // reserved 2;              // reserve deleted field numbers
}

┌─────────────────────────────────────────────┐
Flink Cluster├─────────────────────────────────────────────┤
│                                             │
[JobManager]- Job scheduling                         │
- Checkpoint coordination                │
- Failure recovery                       │
│                                             │
[TaskManager 1]     [TaskManager 2]- Task Slot 1       - Task Slot 1- Task Slot 2       - Task Slot 2- Task Slot 3       - Task Slot 3│                                             │
├─────────────────────────────────────────────┤
[State Backend]- RocksDB (large state)- HashMap (small state)│                                             │
[Checkpoint Storage]- S3 / HDFS / GCS└─────────────────────────────────────────────┘

DataStream API

// Flink DataStream API example
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// Kafka Source
KafkaSource<Order> source = KafkaSource.<Order>builder()
    .setBootstrapServers("broker1:9092")
    .setTopics("orders")
    .setGroupId("flink-order-processor")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(
        OffsetResetStrategy.EARLIEST))
    .setValueOnlyDeserializer(new OrderDeserializer())
    .build();

DataStream<Order> orders = env.fromSource(
    source, WatermarkStrategy
        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((order, ts) -> order.getCreatedAt()),
    "Kafka Source"
);

// Processing pipeline
DataStream<OrderStats> stats = orders
    .filter(order -> order.getStatus().equals("COMPLETED"))
    .keyBy(Order::getRegion)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new OrderStatsAggregator());

// Kafka Sink
KafkaSink<OrderStats> sink = KafkaSink.<OrderStats>builder()
    .setBootstrapServers("broker1:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("order-stats")
        .setValueSerializationSchema(new OrderStatsSerializer())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("flink-order-stats")
    .build();

stats.sinkTo(sink);
env.execute("Order Statistics Pipeline");

Windowing

Tumbling Window
================================
Time:   ──|──1──2──3──|──4──5──6──|──7──8──9──|──
Windows:  [  Window 1  ][  Window 2  ][  Window 3  ]
- Fixed size, non-overlapping
- Example: aggregate every 5 minutes


Sliding Window
================================
Time:   ──|──1──2──3──4──5──6──7──8──|──
Windows:  [    Window 1    ]
            [    Window 2    ]
                [    Window 3    ]
- Fixed size, slides by interval
- Example: 10-minute window, 5-minute slide


Session Window
================================
Time:   ──12──3─────────56──7─────────9──
Windows:  [Session 1]     [Session 2]     [S3]
                  gap          gap
- Activity-based, variable size
- Window closes after gap of inactivity
// Window examples
// Tumbling Window
orders.keyBy(Order::getRegion)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sum("amount");

// Sliding Window
orders.keyBy(Order::getRegion)
    .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
    .sum("amount");

// Session Window
orders.keyBy(Order::getCustomerId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .aggregate(new SessionAggregator());

Watermarks and Late Data

Event Time vs Processing Time
================================

Event generation time(Event Time):   t=10  t=12  t=11  t=15  t=13
Processing time(Processing Time):    T=20  T=21  T=22  T=23  T=24

Events do not arrive in order!
Watermarks indicate "all events up to this point should have arrived"


Watermark Operation
================================

Events:     [t=10] [t=12] [t=11] [t=15] [t=13]
Watermarks: W(10)  W(12)  W(12)  W(15)  W(15)
            (with maxOutOfOrderness = 5 seconds)
            Actual watermarks: W(5)  W(7)  W(7)  W(10) W(10)

Window [0, 10): fires when watermark exceeds 10
// Watermark strategy
WatermarkStrategy<Order> strategy = WatermarkStrategy
    // Allow up to 10 seconds of late arrivals
    .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((order, timestamp) -> order.getCreatedAt())
    // Advance watermark if no events for 5 minutes
    .withIdleness(Duration.ofMinutes(5));

// Late data handling
OutputTag<Order> lateOutputTag = new OutputTag<>("late-orders") {};

SingleOutputStreamOperator<OrderStats> stats = orders
    .keyBy(Order::getRegion)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))  // Wait 1 more minute
    .sideOutputLateData(lateOutputTag)  // After that, to side output
    .aggregate(new OrderStatsAggregator());

// Handle late data separately
DataStream<Order> lateOrders = stats.getSideOutput(lateOutputTag);
lateOrders.addSink(new LateOrderAlertSink());

State Management and Checkpointing

// Keyed State example
public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {

    // ValueState: single value per key
    private ValueState<Boolean> flagState;

    // ListState: list per key
    private ListState<Transaction> recentTransactions;

    // MapState: map per key
    private MapState<String, Integer> merchantCounts;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> flagDescriptor =
            new ValueStateDescriptor<>("flag", Boolean.class);
        flagState = getRuntimeContext().getState(flagDescriptor);

        ListStateDescriptor<Transaction> listDescriptor =
            new ListStateDescriptor<>("recent-txns", Transaction.class);
        recentTransactions = getRuntimeContext().getListState(listDescriptor);

        MapStateDescriptor<String, Integer> mapDescriptor =
            new MapStateDescriptor<>("merchant-counts", String.class, Integer.class);
        merchantCounts = getRuntimeContext().getMapState(mapDescriptor);
    }

    @Override
    public void processElement(Transaction txn, Context ctx, Collector<Alert> out) {
        Boolean flag = flagState.value();

        if (flag != null && txn.getAmount() > 10000) {
            out.collect(new Alert(txn.getCustomerId(), "HIGH_AMOUNT_AFTER_FLAG"));
        }

        if (txn.getAmount() < 1.0) {
            flagState.update(true);
            // Timer to clear flag after 1 minute
            ctx.timerService().registerEventTimeTimer(
                txn.getTimestamp() + 60000);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
        flagState.clear();
    }
}
Checkpoint Operation
================================

[Source] ──▶ [Map] ──▶ [KeyBy] ──▶ [Window] ──▶ [Sink]

1. JobManager: inject checkpoint barrier
   Source ──▶ |barrier| ──▶ Map ──▶ KeyBy ──▶ Window ──▶ Sink

2. State snapshot taken as barrier passes each operator
   Source(snapshot) ──▶ Map(snapshot) ──▶ ...

3. All operator snapshots complete → checkpoint success

4. On failure: restore from last successful checkpoint
// Checkpoint configuration
env.enableCheckpointing(60000);  // 60-second interval
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(120000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// State Backend configuration
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("s3://checkpoints/flink/");

Comparison Table

FeatureKafka StreamsApache Flink
DeploymentLibrary (embedded in JVM app)Distributed cluster
Input SourcesKafka onlyKafka, Kinesis, files, etc.
Exactly-OnceKafka internal onlyIncluding external systems
WindowsTumbling, Sliding, Session+ Global, Custom
State ManagementRocksDB (local)RocksDB + distributed snapshots
SQL SupportKSQL (separate)Flink SQL (built-in)
ThroughputMedium-HighHigh-Ultra
Operational ComplexityLow (app deployment)High (cluster management)
Best ForProcessing within Kafka ecosystemComplex stream processing

Selection Criteria

use_kafka_streams_when:
  - "Simple Kafka-to-Kafka processing"
  - "Separate cluster operations are burdensome"
  - "Embedding inside microservices"
  - "Medium throughput is sufficient"
  - "Team is familiar with Kafka ecosystem"

use_flink_when:
  - "Complex event processing (CEP)"
  - "Connecting diverse sources/sinks"
  - "Large-scale state management needed"
  - "Advanced window functions needed"
  - "SQL-based stream processing"
  - "Ultra-high throughput required"

8. Kafka Connect

Source/Sink Connectors

// Debezium MySQL Source Connector
{
  "name": "mysql-source-orders",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-primary",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz-password",
    "database.server.id": "1",
    "topic.prefix": "cdc",
    "database.include.list": "order_service",
    "table.include.list": "order_service.orders,order_service.order_items",
    "schema.history.internal.kafka.bootstrap.servers": "broker:9092",
    "schema.history.internal.kafka.topic": "schema-changes.orders",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "cdc\\.order_service\\.(.*)",
    "transforms.route.replacement": "order.$1.cdc"
  }
}
// Elasticsearch Sink Connector
{
  "name": "es-sink-orders",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "order.orders.cdc",
    "type.name": "_doc",
    "key.ignore": "false",
    "schema.ignore": "true",
    "behavior.on.null.values": "delete",
    "write.method": "upsert",
    "transforms": "extractKey",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "order_id"
  }
}

Debezium CDC

Debezium CDC Flow
================================

[MySQL] ──binlog──▶ [Debezium Connector] ──▶ [Kafka Topic]
                                    ┌───────────────┤
                                    ▼               ▼
                              [Flink Job]    [ES Sink Connector]
                                    │               │
                                    ▼               ▼
                              [Kafka Topic]  [Elasticsearch]

CDC Event Structure:
- before: record before change (UPDATE/DELETE)
- after: record after change (INSERT/UPDATE)
- source: source DB metadata
- op: operation type (c=create, u=update, d=delete, r=read)

9. Performance Tuning

Producer Tuning

# Producer performance optimization
batch.size=65536              # 64KB (default 16KB)
linger.ms=10                  # Wait 10ms (batch efficiency)
buffer.memory=67108864        # 64MB
compression.type=lz4          # Compression (saves network/disk)
acks=all                      # Reliability
max.in.flight.requests.per.connection=5  # Pipelining
send.buffer.bytes=131072      # Socket buffer

Consumer Tuning

# Consumer performance optimization
fetch.min.bytes=1048576       # 1MB (batch efficiency)
fetch.max.wait.ms=500         # Wait up to 500ms
max.poll.records=500          # Records per poll
max.partition.fetch.bytes=1048576  # 1MB per partition
session.timeout.ms=30000      # Session timeout
heartbeat.interval.ms=10000   # Heartbeat interval
max.poll.interval.ms=300000   # Max poll interval 5min

Broker Tuning

# Broker performance optimization
num.partitions=12                    # Default partition count
default.replication.factor=3         # Replication factor
num.io.threads=8                     # I/O threads
num.network.threads=3                # Network threads
num.replica.fetchers=4               # Replica fetchers
socket.send.buffer.bytes=102400      # Socket send buffer
socket.receive.buffer.bytes=102400   # Socket receive buffer
log.segment.bytes=1073741824         # 1GB segments
log.retention.hours=168              # 7-day retention
log.retention.bytes=-1               # No size limit

Determining Partition Count

Partition Count Formula
================================

Required throughput: 100MB/s
Single partition throughput: ~10MB/s (Producer)
                             ~5MB/s (Consumer)

Producer perspective: 100MB/s / 10MB/s = 10 partitions
Consumer perspective: 100MB/s / 5MB/s = 20 partitions

Minimum 20 partitions needed

Additional considerations:
- Match consumer instance count (partitions >= consumers)
- Partition count can increase but cannot decrease
- Too many partitions: metadata overhead, rebalancing delay
- Recommendation: 6-50 per topic

10. Monitoring

JMX Metrics

# Key Kafka metrics
broker_metrics:
  - name: "UnderReplicatedPartitions"
    description: "Partitions fallen out of ISR"
    alert_threshold: "> 0"
    severity: critical

  - name: "ActiveControllerCount"
    description: "Number of active controllers"
    alert_threshold: "!= 1"
    severity: critical

  - name: "OfflinePartitionsCount"
    description: "Number of offline partitions"
    alert_threshold: "> 0"
    severity: critical

  - name: "RequestsPerSec"
    description: "Requests per second"
    alert_threshold: "> 10000"
    severity: warning

producer_metrics:
  - name: "record-send-rate"
    description: "Records sent per second"

  - name: "record-error-rate"
    description: "Record errors per second"
    alert_threshold: "> 0"

  - name: "request-latency-avg"
    description: "Average request latency"
    alert_threshold: "> 100ms"

consumer_metrics:
  - name: "records-lag-max"
    description: "Maximum consumer lag"
    alert_threshold: "> 10000"
    severity: warning

  - name: "records-consumed-rate"
    description: "Records consumed per second"

  - name: "commit-latency-avg"
    description: "Average commit latency"

Consumer Lag Monitoring

Consumer Lag Calculation
================================

Partition 0:
  Log End Offset (LEO): 1000  (latest message)
  Consumer Offset:       850  (last committed)
  Lag: 1000 - 850 = 150

Partition 1:
  LEO: 2000
  Consumer Offset: 1950
  Lag: 50

Total Lag = 150 + 50 = 200 messages
# Burrow (LinkedIn's Consumer Lag Monitoring)
burrow_config:
  general:
    pidfile: "/var/run/burrow.pid"
    stdout-logfile: "/var/log/burrow.log"

  cluster:
    kafka-prod:
      class-name: kafka
      servers: ["broker1:9092", "broker2:9092", "broker3:9092"]
      topic-refresh: 120
      offset-refresh: 30

  consumer:
    kafka-prod:
      class-name: kafka
      cluster: kafka-prod
      servers: ["broker1:9092", "broker2:9092"]
      group-denylist: "^(console-consumer|_)"
      start-latest: true

  notifier:
    slack:
      class-name: http
      url-open: "https://hooks.slack.com/services/xxx/yyy/zzz"
      template-open: |
        Consumer group lag alert:
        Group: {{.GroupName}}
        Status: {{.Status}}
      send-close: true

Grafana Dashboard

Kafka Monitoring Dashboard Panels
================================

1. Cluster Overview
   - Broker count / status
   - Total partition count
   - Under-replicated partitions
   - Active Controller

2. Throughput
   - Messages In/Out per sec
   - Bytes In/Out per sec
   - Requests per sec

3. Consumer Groups
   - Consumer Lag (per group)
   - Consume Rate
   - Rebalance count

4. Latency
   - Produce Request Latency (p50, p99)
   - Fetch Request Latency
   - End-to-End Latency

5. Resources
   - Disk Usage per Broker
   - Network I/O
   - CPU / Memory

11. Quiz

Q1. Why do messages with the same key always go to the same partition in Kafka?

Because the default partitioner uses the murmur2 hash function to determine the partition by taking the hash of the key modulo the number of partitions.

partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions

The same key always produces the same hash value, so it is assigned to the same partition. This ensures ordering of messages with the same key.

Note that if the partition count changes, key-to-partition mapping may change.

Q2. What are the 3 elements required to achieve Exactly-Once Semantics?
  1. Idempotent Producer: With enable.idempotence=true, uses ProducerID and Sequence Number to prevent duplicate sends.

  2. Transactional Producer: Configure transactional.id and use beginTransaction(), commitTransaction() to atomically send multiple messages.

  3. read_committed isolation level: Set isolation.level=read_committed on the consumer to only read messages from committed transactions.

These three combine to guarantee exactly-once processing in the Consume-Transform-Produce pattern.

Q3. What is required when adding a new field in Avro BACKWARD compatibility?

A default value is required.

BACKWARD compatibility means the new schema (v2) must be able to read data written with the old schema (v1). Since old data does not have the new field, without a default value, deserialization will fail.

Example: "name": "status", "type": "string", "default": "CREATED"

For optional fields, use a ["null", "string"] union type with "default": null.

Q4. What problem do Flink watermarks solve?

Watermarks solve the late-arriving data problem in event time-based processing.

In distributed systems, events may arrive out of order relative to when they occurred. Watermarks provide an estimate that all events up to a certain point have arrived.

W(t) = "Estimate that all events before time t have arrived"

When the watermark exceeds a window's end time, that window's computation results are emitted. Setting allowedLateness allows late data to be accepted for a period even after the watermark has passed.

Q5. What are the criteria for choosing between Kafka Streams and Apache Flink?

Choose Kafka Streams when:

  • Simple Kafka-to-Kafka processing
  • Separate cluster operations are burdensome
  • Embedding inside microservices
  • Team is familiar with the Kafka ecosystem

Choose Apache Flink when:

  • Complex Event Processing (CEP) is needed
  • Connecting diverse sources/sinks beyond Kafka
  • Large-scale state management is required
  • SQL-based stream processing is needed
  • Ultra-high throughput is required

Key difference: Kafka Streams is a library (embedded in JVM apps), while Flink is a distributed cluster.


References

  1. Apache Kafka Documentation. (2025). https://kafka.apache.org/documentation/
  2. Narkhede, N., Shapira, G., Palino, T. (2021). Kafka: The Definitive Guide, 2nd ed. O'Reilly Media.
  3. Apache Flink Documentation. (2025). https://flink.apache.org/docs/
  4. Confluent Schema Registry Documentation. (2025). https://docs.confluent.io/platform/current/schema-registry/
  5. Hueske, F., Kalavri, V. (2019). Stream Processing with Apache Flink. O'Reilly Media.
  6. KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum. https://cwiki.apache.org/confluence/display/KAFKA/KIP-500
  7. Debezium Documentation. (2025). https://debezium.io/documentation/
  8. Kleppmann, M. (2017). Designing Data-Intensive Applications. O'Reilly Media.
  9. Confluent Blog. (2024). "Exactly-Once Semantics in Apache Kafka."
  10. Apache Avro Specification. (2025). https://avro.apache.org/docs/
  11. Burrow - Kafka Consumer Lag Checking. https://github.com/linkedin/Burrow
  12. Confluent Blog. (2025). "Kafka Performance Tuning Best Practices."
  13. Flink Forward Conference Talks. (2024). https://www.flink-forward.org/
  14. KIP-848: The Next Generation Consumer Rebalance Protocol. Apache Kafka.