Event Streaming Deep Dive Guide 2025: Kafka Internals, Flink, Exactly-Once, Schema Evolution
- Author: Youngju Kim (@fjvbn20031)
Introduction: The Evolution of Event Streaming
Event streaming is core infrastructure for modern distributed systems. The ecosystem centered on Apache Kafka has evolved beyond simple message queues into the foundation for real-time data pipelines, event sourcing, CQRS, and stream processing.
This guide provides an in-depth look at Kafka internals, Exactly-Once semantics, Schema Registry, and Apache Flink.
1. Kafka Internals Deep Dive
Partition Structure
A Kafka topic is divided into one or more partitions. Each partition is an immutable, ordered log.
Topic: orders (3 partitions)
Partition 0: [msg0] [msg3] [msg6] [msg9] ... → offset increases
Partition 1: [msg1] [msg4] [msg7] [msg10] ... → offset increases
Partition 2: [msg2] [msg5] [msg8] [msg11] ... → offset increases
- Ordering guaranteed only within the same partition
- Partition count = max parallel consumers in a Consumer Group
- Partition key routes messages with the same key to the same partition
Segment Files and Indexes
Each partition is stored on disk as segment files.
/kafka-logs/orders-0/
├── 00000000000000000000.log # Segment file (actual messages)
├── 00000000000000000000.index # Offset index
├── 00000000000000000000.timeindex # Timestamp index
├── 00000000000005242880.log # Next segment
├── 00000000000005242880.index
├── 00000000000005242880.timeindex
└── leader-epoch-checkpoint
Segment File Structure
========================
.log file (message storage)
┌──────────────────────────────────────┐
│ Batch Header │
│ - Base Offset: 0 │
│ - Batch Length: 256 bytes │
│ - Magic: 2 (Kafka 0.11+) │
│ - CRC: checksum │
│ - Attributes: compression, txn │
│ - Producer ID: 12345 │
│ - Producer Epoch: 0 │
│ - Base Sequence: 0 │
├──────────────────────────────────────┤
│ Record 0 │
│ - Offset Delta: 0 │
│ - Timestamp Delta: 0 │
│ - Key: "order-123" │
│ - Value: (serialized payload) │
│ - Headers: [("source", "web")] │
├──────────────────────────────────────┤
│ Record 1 │
│ - Offset Delta: 1 │
│ - ... │
└──────────────────────────────────────┘
.index file (sparse index)
┌────────────────────────┐
│ Relative Offset → File │
│ Position │
├────────────────────────┤
│ 0 → 0 │
│ 4096 → 32768 │
│ 8192 → 65536 │
└────────────────────────┘
.timeindex file
┌────────────────────────────────┐
│ Timestamp → Relative Offset │
├────────────────────────────────┤
│ 1700000000000 → 0 │
│ 1700000060000 → 4096 │
└────────────────────────────────┘
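The sparse-index lookup can be sketched in a few lines of Python (the offsets and file positions below are the illustrative values from the diagram, not real broker data): Kafka binary-searches the index for the greatest entry at or below the target offset, then scans the log forward from that file position.

```python
import bisect

# Illustrative sparse index entries: (relative_offset, file_position).
# There is roughly one entry per log.index.interval.bytes, not one per record.
index = [(0, 0), (4096, 32768), (8192, 65536)]

def locate(target_relative_offset):
    """File position to start scanning from when seeking the target offset."""
    offsets = [off for off, _ in index]
    # Greatest index entry whose offset is <= the target
    i = bisect.bisect_right(offsets, target_relative_offset) - 1
    return index[i][1]

print(locate(5000))  # scanning starts at position 32768 (entry for offset 4096)
```

The sparseness is the point: the index stays small enough to memory-map, at the cost of a short sequential scan of the log after each lookup.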
Log Compaction
Log Compaction is a retention strategy that keeps only the latest value for each key.
Before compaction:
offset 0: key=A, value=v1
offset 1: key=B, value=v1
offset 2: key=A, value=v2 ← latest for A
offset 3: key=C, value=v1
offset 4: key=B, value=v2 ← latest for B
offset 5: key=A, value=v3 ← latest for A
After compaction:
offset 3: key=C, value=v1 (only value for C)
offset 4: key=B, value=v2 (latest for B)
offset 5: key=A, value=v3 (latest for A)
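The cleaner's effect can be sketched as a last-write-wins pass over the log. This is a simplification that ignores segment boundaries and tombstone handling, using the records from the example above:

```python
# (offset, key, value) triples matching the "before compaction" listing
log = [
    (0, "A", "v1"), (1, "B", "v1"), (2, "A", "v2"),
    (3, "C", "v1"), (4, "B", "v2"), (5, "A", "v3"),
]

def compact(records):
    """Keep only the highest-offset record per key; offsets are preserved."""
    latest = {}                        # key -> (offset, value), last write wins
    for offset, key, value in records:
        latest[key] = (offset, value)
    # Surviving records, in their original offset order
    return sorted((off, k, v) for k, (off, v) in latest.items())

print(compact(log))  # [(3, 'C', 'v1'), (4, 'B', 'v2'), (5, 'A', 'v3')]
```

Note that compaction is idempotent: running it again over an already-compacted log changes nothing.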
# Log Compaction configuration (broker-level)
log.cleanup.policy=compact
log.cleaner.min.cleanable.ratio=0.5   # dirty/total ratio that triggers cleaning
log.cleaner.min.compaction.lag.ms=0   # minimum time a record stays uncompacted
# Topic-level equivalents: cleanup.policy, min.cleanable.dirty.ratio
ISR (In-Sync Replicas)
ISR is the set of replicas that are synchronized with the leader.
Partition 0 (Replication Factor = 3)
======================================
Broker 1 (Leader): [0] [1] [2] [3] [4] [5] ← LEO: 6
Broker 2 (Follower): [0] [1] [2] [3] [4] ← LEO: 5
Broker 3 (Follower): [0] [1] [2] [3] [4] [5] ← LEO: 6
ISR = {Broker 1, Broker 2, Broker 3}
HW (High Watermark) = 5 (minimum LEO within ISR)
- Consumers can only read offsets below the HW
- A follower that fails to keep up with the leader for longer than replica.lag.time.max.ms is removed from the ISR
# ISR-related configuration
min.insync.replicas=2 # Minimum in-sync replicas
replica.lag.time.max.ms=30000 # ISR removal threshold
unclean.leader.election.enable=false # Prevent non-ISR replica leader election
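The high-watermark rule above reduces to a one-line minimum over the ISR; this toy sketch uses the LEO values from the diagram:

```python
# Log end offsets per replica, as in the diagram (broker2 is lagging)
leo = {"broker1": 6, "broker2": 5, "broker3": 6}
isr = {"broker1", "broker2", "broker3"}

# HW = minimum LEO within the ISR; consumers see offsets below the HW
high_watermark = min(leo[b] for b in isr)
readable = list(range(high_watermark))

print(high_watermark, readable)  # 5 [0, 1, 2, 3, 4]

# If broker2 falls out of the ISR, the HW can advance to 6
isr.discard("broker2")
print(min(leo[b] for b in isr))  # 6
```

This also shows why shrinking the ISR is a trade-off: the HW advances and consumers see newer data, but durability now rests on fewer replicas (bounded below by min.insync.replicas).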
KRaft vs ZooKeeper
KRaft mode lets Kafka run without ZooKeeper: it shipped as early access in Kafka 2.8, became production-ready in 3.3, and ZooKeeper mode is removed entirely in Kafka 4.0.
ZooKeeper Mode (Legacy)
==========================
[ZooKeeper Ensemble]
↕ metadata
[Controller (one of the Brokers)]
↕ leader election
[Broker 1] [Broker 2] [Broker 3]
Issues:
- Requires separate ZooKeeper operations
- Metadata synchronization delays
- Performance degradation as partition count increases
KRaft Mode (Kafka 3.x+)
==========================
[Controller Quorum]
Broker 1 (Controller + Broker)
Broker 2 (Controller + Broker)
Broker 3 (Controller + Broker)
Advantages:
- Eliminates ZooKeeper dependency
- Improved metadata management performance
- Supports millions of partitions
- Reduced operational complexity
# KRaft configuration
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@broker1:9093,2@broker2:9093,3@broker3:9093
controller.listener.names=CONTROLLER
2. Producer Deep Dive
Partitioner
Producers use a partitioner to determine which partition a message goes to.
// Default partitioner behavior
public class DefaultPartitioner implements Partitioner {
// With key: murmur2 hash
// Without key: Sticky Partitioner (changes partition per batch)
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
// Sticky Partitioner: same partition until batch is full
return stickyPartitionCache.partition(topic, cluster);
}
// Key-based partitioning: same key = same partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
// Custom partitioner example: region-based
public class RegionPartitioner implements Partitioner {
private Map<String, Integer> regionMapping;
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
String region = extractRegion(value);
int numPartitions = cluster.partitionsForTopic(topic).size();
return regionMapping.getOrDefault(region, 0) % numPartitions;
}
}
Batching and Compression
# Producer batch settings
batch.size=16384 # Batch size (bytes)
linger.ms=5 # Batch wait time
buffer.memory=33554432 # Total buffer memory
# Compression settings
compression.type=lz4 # none, gzip, snappy, lz4, zstd
Producer Batch Processing Flow
==========================
send() ──▶ [Serializer] ──▶ [Partitioner] ──▶ [RecordAccumulator]
│
┌───────────┴───────────┐
│ Partition 0 batch │
│ [msg1][msg2][msg3] │
├───────────────────────┤
│ Partition 1 batch │
│ [msg4][msg5] │
└───────────┬───────────┘
│
batch.size reached
or linger.ms elapsed
│
▼
[Sender Thread]
│
Compress + network send
│
▼
[Kafka Broker]
Idempotent Producer
# Idempotent producer settings
enable.idempotence=true                  # Default since Kafka 3.0
acks=all                                 # Required for idempotence
retries=2147483647                       # Effectively infinite retries
max.in.flight.requests.per.connection=5  # Must be <= 5 to preserve ordering with idempotence
Idempotent Producer Operation
==========================
Producer Broker
│ │
│── ProducerID: 1000 │
│ Epoch: 0 │
│ Sequence: 0 │
│ Message: "order-1" ──────▶│ ✓ Stored
│ │
│── ProducerID: 1000 │
│ Epoch: 0 │
│ Sequence: 1 │
│ Message: "order-2" ──────▶│ ✓ Stored
│ │
│── (Resend due to network timeout) │
│ ProducerID: 1000 │
│ Epoch: 0 │
│ Sequence: 1 │
│ Message: "order-2" ──────▶│ ✗ Duplicate! Ignored
│ │
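A toy model of the broker-side sequence check makes the diagram concrete. Real brokers track this per (producer, partition) batch and also fence stale epochs; this sketch only mirrors the happy path and the duplicate case above:

```python
class PartitionLog:
    """Minimal sketch of broker-side idempotent-producer deduplication."""

    def __init__(self):
        self.records = []
        self.next_seq = {}   # (producer_id, epoch) -> next expected sequence

    def append(self, producer_id, epoch, seq, message):
        expected = self.next_seq.get((producer_id, epoch), 0)
        if seq < expected:
            return "duplicate-ignored"      # retry of an already-stored batch
        if seq > expected:
            return "out-of-sequence-error"  # a batch was lost in between
        self.records.append(message)
        self.next_seq[(producer_id, epoch)] = seq + 1
        return "stored"

log = PartitionLog()
print(log.append(1000, 0, 0, "order-1"))  # stored
print(log.append(1000, 0, 1, "order-2"))  # stored
print(log.append(1000, 0, 1, "order-2"))  # duplicate-ignored (network resend)
print(log.records)                        # ['order-1', 'order-2']
```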
Transactional Producer
// Transactional producer example
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("transactional.id", "order-processor-1");
props.put("enable.idempotence", "true");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
// Atomically send to multiple topics/partitions
producer.send(new ProducerRecord<>("orders", "key1", "order-created"));
producer.send(new ProducerRecord<>("inventory", "key1", "stock-reserved"));
producer.send(new ProducerRecord<>("notifications", "key1", "email-queued"));
// Consumer offsets can be included in the transaction
producer.sendOffsetsToTransaction(offsets, consumerGroupId);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
throw e;
}
3. Consumer Deep Dive
Consumer Groups and Partition Assignment
Topic: orders (6 partitions)
Consumer Group: order-processor
===================================
Case 1: 3 Consumers
Consumer A: [P0] [P1]
Consumer B: [P2] [P3]
Consumer C: [P4] [P5]
Case 2: 6 Consumers (ideal)
Consumer A: [P0]
Consumer B: [P1]
Consumer C: [P2]
Consumer D: [P3]
Consumer E: [P4]
Consumer F: [P5]
Case 3: 8 Consumers (2 idle)
Consumer A: [P0]
Consumer B: [P1]
Consumer C: [P2]
Consumer D: [P3]
Consumer E: [P4]
Consumer F: [P5]
Consumer G: (idle)
Consumer H: (idle)
Partition Assignment Strategies
// Partition assignment strategy comparison
Properties props = new Properties();
// 1. Range Assignor (the long-standing default; modern clients default to
// RangeAssignor together with CooperativeStickyAssignor)
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.RangeAssignor");
// Assigns contiguous partition ranges per topic
// Topic A: P0,P1,P2 -> C1: [P0,P1], C2: [P2]
// Topic B: P0,P1,P2 -> C1: [P0,P1], C2: [P2]
// Issue: skew toward C1 across multiple topics
// 2. RoundRobin Assignor
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.RoundRobinAssignor");
// Distributes all topic partitions in round-robin
// C1: [A-P0, B-P1], C2: [A-P1, B-P0], C3: [A-P2, B-P2]
// 3. Sticky Assignor
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.StickyAssignor");
// Maximizes existing assignment retention + even distribution
// Moves minimum partitions during rebalance
// 4. CooperativeSticky Assignor (recommended)
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
// Sticky + incremental rebalancing
// Reassigns partitions without full stop
Rebalancing: Eager vs Incremental
Eager Rebalancing (legacy)
================================
1. Consumer C3 added
2. All consumers release partitions (Stop-the-World)
C1: [] C2: [] C3: []
3. Calculate new assignment
4. Reassign partitions
C1: [P0,P1] C2: [P2,P3] C3: [P4,P5]
→ Full processing stop occurs!
Incremental Cooperative Rebalancing (recommended)
================================
1. Consumer C3 added
2. 1st rebalance: only release partitions that need to move
C1: [P0,P1,P2] → C1: [P0,P1] (P2 released)
C2: [P3,P4,P5] → C2: [P3,P4] (P5 released)
3. 2nd rebalance: assign released partitions to C3
C3: [] → C3: [P2,P5]
→ Remaining partitions continue processing without interruption!
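The two-round flow can be sketched as follows. The revocation policy here (give up the last partitions above a fair share) is a simplification of the real CooperativeStickyAssignor, but it reproduces the example above:

```python
def cooperative_rebalance(assignment, new_member):
    """Sketch of incremental cooperative rebalancing: revoke only the
    partitions that must move; everything else keeps processing."""
    assignment = {m: list(p) for m, p in assignment.items()}
    assignment[new_member] = []
    total = sum(len(p) for p in assignment.values())
    fair = total // len(assignment)
    # Round 1: each overloaded member revokes only its excess partitions
    revoked = []
    for parts in assignment.values():
        while len(parts) > fair:
            revoked.append(parts.pop())
    # Round 2: revoked partitions are assigned to the new member
    assignment[new_member].extend(sorted(revoked))
    return assignment

before = {"C1": ["P0", "P1", "P2"], "C2": ["P3", "P4", "P5"]}
print(cooperative_rebalance(before, "C3"))
# {'C1': ['P0', 'P1'], 'C2': ['P3', 'P4'], 'C3': ['P2', 'P5']}
```

Contrast with eager rebalancing, which would revoke all six partitions first and reassign them afterwards.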
Offset Management
// Auto commit (default)
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
// Risk: if committed before processing, messages can be lost
// Manual commit (recommended)
props.put("enable.auto.commit", "false");
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Process message
processRecord(record);
}
// Synchronous commit (after processing complete)
consumer.commitSync();
// Or async commit (performance improvement)
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
log.error("Commit failed", exception);
}
});
}
Offset Storage: __consumer_offsets topic
============================================
Key: (group_id, topic, partition)
Value: (offset, metadata, timestamp)
Example:
Key: (order-processor, orders, 0)
Value: (offset=42, metadata="", timestamp=1700000000)
4. Exactly-Once Semantics
Message Delivery Guarantees
At-Most-Once
================================
Producer ──send()──▶ Broker
(acks=0, no confirmation)
- Messages can be lost
- No duplicates
- Fastest
At-Least-Once
================================
Producer ──send()──▶ Broker ──ack──▶ Producer
(acks=all, with retries)
- No message loss
- Duplicates possible
- The default behavior in most configurations
Exactly-Once
================================
Producer ──txn──▶ Broker ──commit──▶ Consumer(read_committed)
(idempotent producer + transactions + read_committed)
- No message loss
- No duplicates
- Strongest but with overhead
Exactly-Once Implementation
// Exactly-Once: Consume-Transform-Produce pattern
Properties producerProps = new Properties();
producerProps.put("transactional.id", "eos-processor-1");
producerProps.put("enable.idempotence", "true");
Properties consumerProps = new Properties();
consumerProps.put("group.id", "eos-group");
consumerProps.put("isolation.level", "read_committed"); // Key!
consumerProps.put("enable.auto.commit", "false");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
producer.initTransactions();
consumer.subscribe(Collections.singleton("input-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
producer.beginTransaction();
try {
for (ConsumerRecord<String, String> record : records) {
// Transform
String result = transform(record.value());
// Produce to output topic
producer.send(new ProducerRecord<>(
"output-topic", record.key(), result));
}
// Include consumer offsets in transaction
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partRecords =
records.records(partition);
long lastOffset = partRecords.get(
partRecords.size() - 1).offset();
offsets.put(partition,
new OffsetAndMetadata(lastOffset + 1));
}
producer.sendOffsetsToTransaction(offsets,
consumer.groupMetadata());
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
}
}
Exactly-Once Transaction Flow
================================
1. beginTransaction()
2. send(output-topic, msg1) → Temporarily stored on Broker (uncommitted)
3. send(output-topic, msg2) → Temporarily stored
4. sendOffsetsToTransaction() → Consumer offsets included in transaction
5. commitTransaction() → All messages + offsets atomically committed
read_committed consumers only read committed messages
→ If failure occurs mid-way, abortTransaction() rolls back everything
5. Schema Registry
Serialization Format Comparison
| Feature | Avro | Protobuf | JSON Schema |
|---|---|---|---|
| Serialized Size | Small | Small | Large |
| Schema Evolution | Excellent | Excellent | Average |
| Code Generation | Optional | Required | Not needed |
| Readability | Low (binary) | Low (binary) | High (text) |
| Dynamic Typing | Supported | Not supported | Supported |
| Kafka Compatibility | Best | Excellent | Average |
Avro Schema and Evolution
// Version 1: Initial schema
{
"type": "record",
"name": "Order",
"namespace": "com.company.events",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "customer_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "USD"},
{"name": "created_at", "type": "long"}
]
}
// Version 2: Backward Compatible evolution
// New fields with default values → existing consumers can read new data
{
"type": "record",
"name": "Order",
"namespace": "com.company.events",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "customer_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "USD"},
{"name": "created_at", "type": "long"},
{"name": "status", "type": "string", "default": "CREATED"},
{"name": "region", "type": ["null", "string"], "default": null}
]
}
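Why defaults matter for BACKWARD compatibility can be shown with a toy deserializer: a v2 reader fills in fields absent from v1 data using the schema defaults, and fails only when a missing field has no default. (MISSING is a local sentinel used in this sketch, not part of Avro.)

```python
MISSING = object()  # sentinel: "this field has no default in the schema"

# v2 reader schema, flattened to field -> default (mirrors the JSON above)
v2_fields = {
    "order_id": MISSING,
    "customer_id": MISSING,
    "amount": MISSING,
    "currency": "USD",
    "created_at": MISSING,
    "status": "CREATED",   # added in v2 with a default
    "region": None,        # added in v2 as ["null", "string"], default null
}

def read_with_v2(record):
    """Read a record (possibly written with the v1 schema) using v2."""
    out = {}
    for field, default in v2_fields.items():
        if field in record:
            out[field] = record[field]
        elif default is not MISSING:
            out[field] = default       # old data lacks the field: use default
        else:
            raise ValueError(f"field {field!r} missing and has no default")
    return out

old = {"order_id": "o-1", "customer_id": "c-1", "amount": 9.99,
       "currency": "EUR", "created_at": 1700000000000}
print(read_with_v2(old)["status"], read_with_v2(old)["region"])  # CREATED None
```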
Compatibility Modes
BACKWARD (default, recommended)
================================
- New schema can read data written with old schema
- New fields: default value required
- Field removal: allowed
- Upgrade consumers first → then producers
FORWARD
================================
- Old schema can read data written with new schema
- New fields: allowed (old readers simply ignore them)
- Field removal: only fields with default values
- Upgrade producers first → then consumers
FULL
================================
- Both BACKWARD and FORWARD satisfied
- Safest but most restrictive
NONE
================================
- No compatibility checks
- Dangerous! Do not use in production
# Schema Registry API usage
import json
import requests
SCHEMA_REGISTRY_URL = "http://schema-registry:8081"
# Register schema
schema = {
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"}
]
}
response = requests.post(
f"{SCHEMA_REGISTRY_URL}/subjects/orders-value/versions",
json={"schema": json.dumps(schema), "schemaType": "AVRO"},
)
# Check compatibility
response = requests.post(
f"{SCHEMA_REGISTRY_URL}/compatibility/subjects/orders-value/versions/latest",
json={"schema": json.dumps(new_schema), "schemaType": "AVRO"},
)
compatibility = response.json()
print(f"Compatible: {compatibility['is_compatible']}")
# Set compatibility mode
requests.put(
f"{SCHEMA_REGISTRY_URL}/config/orders-value",
json={"compatibility": "BACKWARD"},
)
Protobuf Schema Evolution
// Version 1
syntax = "proto3";
package com.company.events;
message Order {
string order_id = 1;
string customer_id = 2;
double amount = 3;
string currency = 4;
int64 created_at = 5;
}
// Version 2 (maintaining compatibility)
message Order {
string order_id = 1;
string customer_id = 2;
double amount = 3;
string currency = 4;
int64 created_at = 5;
// New fields: never reuse old field numbers!
string status = 6; // newly added
string region = 7; // newly added
// If a field is removed later, declare "reserved <number>;" so it is never reused
}
6. Apache Flink Deep Dive
Flink Architecture Overview
┌─────────────────────────────────────────────┐
│ Flink Cluster │
├─────────────────────────────────────────────┤
│ │
│ [JobManager] │
│ - Job scheduling │
│ - Checkpoint coordination │
│ - Failure recovery │
│ │
│ [TaskManager 1] [TaskManager 2] │
│ - Task Slot 1 - Task Slot 1 │
│ - Task Slot 2 - Task Slot 2 │
│ - Task Slot 3 - Task Slot 3 │
│ │
├─────────────────────────────────────────────┤
│ [State Backend] │
│ - RocksDB (large state) │
│ - HashMap (small state) │
│ │
│ [Checkpoint Storage] │
│ - S3 / HDFS / GCS │
└─────────────────────────────────────────────┘
DataStream API
// Flink DataStream API example
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka Source
KafkaSource<Order> source = KafkaSource.<Order>builder()
.setBootstrapServers("broker1:9092")
.setTopics("orders")
.setGroupId("flink-order-processor")
.setStartingOffsets(OffsetsInitializer.committedOffsets(
OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new OrderDeserializer())
.build();
DataStream<Order> orders = env.fromSource(
source, WatermarkStrategy
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((order, ts) -> order.getCreatedAt()),
"Kafka Source"
);
// Processing pipeline
DataStream<OrderStats> stats = orders
.filter(order -> order.getStatus().equals("COMPLETED"))
.keyBy(Order::getRegion)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new OrderStatsAggregator());
// Kafka Sink
KafkaSink<OrderStats> sink = KafkaSink.<OrderStats>builder()
.setBootstrapServers("broker1:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("order-stats")
.setValueSerializationSchema(new OrderStatsSerializer())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("flink-order-stats")
.build();
stats.sinkTo(sink);
env.execute("Order Statistics Pipeline");
Windowing
Tumbling Window
================================
Time: ──|──1──2──3──|──4──5──6──|──7──8──9──|──
Windows: [ Window 1 ][ Window 2 ][ Window 3 ]
- Fixed size, non-overlapping
- Example: aggregate every 5 minutes
Sliding Window
================================
Time: ──|──1──2──3──4──5──6──7──8──|──
Windows: [ Window 1 ]
[ Window 2 ]
[ Window 3 ]
- Fixed size, slides by interval
- Example: 10-minute window, 5-minute slide
Session Window
================================
Time: ──1─2──3─────────5─6──7─────────9──
Windows: [Session 1] [Session 2] [S3]
gap gap
- Activity-based, variable size
- Window closes after gap of inactivity
// Window examples
// Tumbling Window
orders.keyBy(Order::getRegion)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.sum("amount");
// Sliding Window
orders.keyBy(Order::getRegion)
.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
.sum("amount");
// Session Window
orders.keyBy(Order::getCustomerId)
.window(EventTimeSessionWindows.withGap(Time.minutes(30)))
.aggregate(new SessionAggregator());
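Independent of Flink, the window-assignment arithmetic behind tumbling and sliding windows can be sketched directly (timestamps and sizes are in the same arbitrary time unit):

```python
def tumbling_window(ts, size):
    """The single [start, end) tumbling window containing timestamp ts."""
    start = ts - ts % size
    return (start, start + size)

def sliding_windows(ts, size, slide):
    """All [start, end) sliding windows containing ts (size >= slide)."""
    windows = []
    start = ts - ts % slide          # latest window start at or before ts
    while start > ts - size:         # walk back while ts is still inside
        windows.append((start, start + size))
        start -= slide
    return windows

print(tumbling_window(7, 5))      # (5, 10)
print(sliding_windows(7, 10, 5))  # [(5, 15), (0, 10)]
```

An event lands in exactly one tumbling window but in size/slide sliding windows, which is why sliding aggregations cost proportionally more state.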
Watermarks and Late Data
Event Time vs Processing Time
================================
Event time (when the event occurred):   t=10 t=12 t=11 t=15 t=13
Processing time (when it is handled):   T=20 T=21 T=22 T=23 T=24
Events do not arrive in event-time order!
→ Watermarks assert that "all events up to this point should have arrived"
Watermark Operation (maxOutOfOrderness = 5 seconds)
================================
Events:               [t=10] [t=12] [t=11] [t=15] [t=13]
Max event time seen:    10     12     12     15     15
Watermark (max - 5):   W(5)   W(7)   W(7)  W(10)  W(10)
Window [0, 10): fires once the watermark reaches 10
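The generator logic behind forBoundedOutOfOrderness reduces to "maximum event time seen so far, minus the allowed lateness"; a sketch using the event times from the example:

```python
def watermarks(event_times, max_out_of_orderness):
    """Watermark emitted after each event, bounded-out-of-orderness style."""
    max_seen = float("-inf")
    out = []
    for t in event_times:
        max_seen = max(max_seen, t)              # watermarks never go backwards
        out.append(max_seen - max_out_of_orderness)
    return out

print(watermarks([10, 12, 11, 15, 13], 5))  # [5, 7, 7, 10, 10]
```

Note the monotonicity: the out-of-order event t=11 does not pull the watermark back below 7.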
// Watermark strategy
WatermarkStrategy<Order> strategy = WatermarkStrategy
// Allow up to 10 seconds of late arrivals
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((order, timestamp) -> order.getCreatedAt())
// Advance watermark if no events for 5 minutes
.withIdleness(Duration.ofMinutes(5));
// Late data handling
OutputTag<Order> lateOutputTag = new OutputTag<>("late-orders") {};
SingleOutputStreamOperator<OrderStats> stats = orders
.keyBy(Order::getRegion)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(1)) // Wait 1 more minute
.sideOutputLateData(lateOutputTag) // After that, to side output
.aggregate(new OrderStatsAggregator());
// Handle late data separately
DataStream<Order> lateOrders = stats.getSideOutput(lateOutputTag);
lateOrders.addSink(new LateOrderAlertSink());
State Management and Checkpointing
// Keyed State example
public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {
// ValueState: single value per key
private ValueState<Boolean> flagState;
// ListState: list per key
private ListState<Transaction> recentTransactions;
// MapState: map per key
private MapState<String, Integer> merchantCounts;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Boolean> flagDescriptor =
new ValueStateDescriptor<>("flag", Boolean.class);
flagState = getRuntimeContext().getState(flagDescriptor);
ListStateDescriptor<Transaction> listDescriptor =
new ListStateDescriptor<>("recent-txns", Transaction.class);
recentTransactions = getRuntimeContext().getListState(listDescriptor);
MapStateDescriptor<String, Integer> mapDescriptor =
new MapStateDescriptor<>("merchant-counts", String.class, Integer.class);
merchantCounts = getRuntimeContext().getMapState(mapDescriptor);
}
@Override
public void processElement(Transaction txn, Context ctx, Collector<Alert> out) {
Boolean flag = flagState.value();
if (flag != null && txn.getAmount() > 10000) {
out.collect(new Alert(txn.getCustomerId(), "HIGH_AMOUNT_AFTER_FLAG"));
}
if (txn.getAmount() < 1.0) {
flagState.update(true);
// Timer to clear flag after 1 minute
ctx.timerService().registerEventTimeTimer(
txn.getTimestamp() + 60000);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
flagState.clear();
}
}
Checkpoint Operation
================================
[Source] ──▶ [Map] ──▶ [KeyBy] ──▶ [Window] ──▶ [Sink]
1. JobManager: inject checkpoint barrier
Source ──▶ |barrier| ──▶ Map ──▶ KeyBy ──▶ Window ──▶ Sink
2. State snapshot taken as barrier passes each operator
Source(snapshot) ──▶ Map(snapshot) ──▶ ...
3. All operator snapshots complete → checkpoint success
4. On failure: restore from last successful checkpoint
// Checkpoint configuration
env.enableCheckpointing(60000); // 60-second interval
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(120000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// State Backend configuration
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("s3://checkpoints/flink/");
7. Kafka Streams vs Apache Flink
Comparison Table
| Feature | Kafka Streams | Apache Flink |
|---|---|---|
| Deployment | Library (embedded in JVM app) | Distributed cluster |
| Input Sources | Kafka only | Kafka, Kinesis, files, etc. |
| Exactly-Once | Kafka internal only | Including external systems |
| Windows | Tumbling, Sliding, Session | + Global, Custom |
| State Management | RocksDB (local) | RocksDB + distributed snapshots |
| SQL Support | KSQL (separate) | Flink SQL (built-in) |
| Throughput | Medium-High | High-Ultra |
| Operational Complexity | Low (app deployment) | High (cluster management) |
| Best For | Processing within Kafka ecosystem | Complex stream processing |
Selection Criteria
use_kafka_streams_when:
- "Simple Kafka-to-Kafka processing"
- "Separate cluster operations are burdensome"
- "Embedding inside microservices"
- "Medium throughput is sufficient"
- "Team is familiar with Kafka ecosystem"
use_flink_when:
- "Complex event processing (CEP)"
- "Connecting diverse sources/sinks"
- "Large-scale state management needed"
- "Advanced window functions needed"
- "SQL-based stream processing"
- "Ultra-high throughput required"
8. Kafka Connect
Source/Sink Connectors
// Debezium MySQL Source Connector
{
"name": "mysql-source-orders",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql-primary",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz-password",
"database.server.id": "1",
"topic.prefix": "cdc",
"database.include.list": "order_service",
"table.include.list": "order_service.orders,order_service.order_items",
"schema.history.internal.kafka.bootstrap.servers": "broker:9092",
"schema.history.internal.kafka.topic": "schema-changes.orders",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "cdc\\.order_service\\.(.*)",
"transforms.route.replacement": "order.$1.cdc"
}
}
// Elasticsearch Sink Connector
{
"name": "es-sink-orders",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch:9200",
"topics": "order.orders.cdc",
"type.name": "_doc",
"key.ignore": "false",
"schema.ignore": "true",
"behavior.on.null.values": "delete",
"write.method": "upsert",
"transforms": "extractKey",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "order_id"
}
}
Debezium CDC
Debezium CDC Flow
================================
[MySQL] ──binlog──▶ [Debezium Connector] ──▶ [Kafka Topic]
│
┌───────────────┤
▼ ▼
[Flink Job] [ES Sink Connector]
│ │
▼ ▼
[Kafka Topic] [Elasticsearch]
CDC Event Structure:
- before: record before change (UPDATE/DELETE)
- after: record after change (INSERT/UPDATE)
- source: source DB metadata
- op: operation type (c=create, u=update, d=delete, r=read)
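A sketch of how a downstream job might apply such events to a local view; the envelope fields follow the structure above, while the payload values are illustrative:

```python
# Simplified Debezium-style change event (envelope fields as described above)
event = {
    "before": None,
    "after": {"order_id": "o-1", "status": "CREATED"},
    "source": {"db": "order_service", "table": "orders"},
    "op": "c",   # c=create, u=update, d=delete, r=snapshot read
}

def apply(event, store):
    """Fold one change event into a key -> row materialized view."""
    op = event["op"]
    if op in ("c", "u", "r"):
        row = event["after"]
        store[row["order_id"]] = row
    elif op == "d":
        # For deletes, the row's identity comes from the "before" image
        store.pop(event["before"]["order_id"], None)
    return store

print(apply(event, {}))  # {'o-1': {'order_id': 'o-1', 'status': 'CREATED'}}
```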
9. Performance Tuning
Producer Tuning
# Producer performance optimization
batch.size=65536 # 64KB (default 16KB)
linger.ms=10 # Wait 10ms (batch efficiency)
buffer.memory=67108864 # 64MB
compression.type=lz4 # Compression (saves network/disk)
acks=all # Reliability
max.in.flight.requests.per.connection=5 # Pipelining
send.buffer.bytes=131072 # Socket buffer
Consumer Tuning
# Consumer performance optimization
fetch.min.bytes=1048576 # 1MB (batch efficiency)
fetch.max.wait.ms=500 # Wait up to 500ms
max.poll.records=500 # Records per poll
max.partition.fetch.bytes=1048576 # 1MB per partition
session.timeout.ms=30000 # Session timeout
heartbeat.interval.ms=10000 # Heartbeat interval
max.poll.interval.ms=300000 # Max poll interval 5min
Broker Tuning
# Broker performance optimization
num.partitions=12 # Default partition count
default.replication.factor=3 # Replication factor
num.io.threads=8 # I/O threads
num.network.threads=3 # Network threads
num.replica.fetchers=4 # Replica fetchers
socket.send.buffer.bytes=102400 # Socket send buffer
socket.receive.buffer.bytes=102400 # Socket receive buffer
log.segment.bytes=1073741824 # 1GB segments
log.retention.hours=168 # 7-day retention
log.retention.bytes=-1 # No size limit
Determining Partition Count
Partition Count Formula
================================
Required throughput: 100MB/s
Single partition throughput: ~10MB/s (Producer)
~5MB/s (Consumer)
Producer perspective: 100MB/s / 10MB/s = 10 partitions
Consumer perspective: 100MB/s / 5MB/s = 20 partitions
→ Minimum 20 partitions needed
Additional considerations:
- Match consumer instance count (partitions >= consumers)
- Partition count can increase but cannot decrease
- Too many partitions: metadata overhead, rebalancing delay
- Recommendation: 6-50 per topic
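The sizing rule above as a small helper (the per-partition throughput figures are rough planning estimates, as in the text, not measured guarantees):

```python
import math

def partitions_needed(target_mb_s, producer_mb_s_per_partition,
                      consumer_mb_s_per_partition):
    """Partitions must cover the slower of producer and consumer throughput."""
    return max(math.ceil(target_mb_s / producer_mb_s_per_partition),
               math.ceil(target_mb_s / consumer_mb_s_per_partition))

print(partitions_needed(100, 10, 5))  # 20
```

Because partitions can be added but never removed, it is common to round this figure up with headroom for growth.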
10. Monitoring
JMX Metrics
# Key Kafka metrics
broker_metrics:
- name: "UnderReplicatedPartitions"
description: "Partitions fallen out of ISR"
alert_threshold: "> 0"
severity: critical
- name: "ActiveControllerCount"
description: "Number of active controllers"
alert_threshold: "!= 1"
severity: critical
- name: "OfflinePartitionsCount"
description: "Number of offline partitions"
alert_threshold: "> 0"
severity: critical
- name: "RequestsPerSec"
description: "Requests per second"
alert_threshold: "> 10000"
severity: warning
producer_metrics:
- name: "record-send-rate"
description: "Records sent per second"
- name: "record-error-rate"
description: "Record errors per second"
alert_threshold: "> 0"
- name: "request-latency-avg"
description: "Average request latency"
alert_threshold: "> 100ms"
consumer_metrics:
- name: "records-lag-max"
description: "Maximum consumer lag"
alert_threshold: "> 10000"
severity: warning
- name: "records-consumed-rate"
description: "Records consumed per second"
- name: "commit-latency-avg"
description: "Average commit latency"
Consumer Lag Monitoring
Consumer Lag Calculation
================================
Partition 0:
Log End Offset (LEO): 1000 (latest message)
Consumer Offset: 850 (last committed)
Lag: 1000 - 850 = 150
Partition 1:
LEO: 2000
Consumer Offset: 1950
Lag: 50
Total Lag = 150 + 50 = 200 messages
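The lag arithmetic above, as code:

```python
# Per-partition offsets from the example (LEO = log end offset)
partitions = [
    {"partition": 0, "log_end_offset": 1000, "committed_offset": 850},
    {"partition": 1, "log_end_offset": 2000, "committed_offset": 1950},
]

# Lag per partition = LEO - committed offset; total lag is the sum
total_lag = sum(p["log_end_offset"] - p["committed_offset"]
                for p in partitions)
print(total_lag)  # 200
```

A steadily growing total lag means consumers cannot keep pace with producers, which is why records-lag-max is the first metric to alert on.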
# Burrow (LinkedIn's Consumer Lag Monitoring)
burrow_config:
general:
pidfile: "/var/run/burrow.pid"
stdout-logfile: "/var/log/burrow.log"
cluster:
kafka-prod:
class-name: kafka
servers: ["broker1:9092", "broker2:9092", "broker3:9092"]
topic-refresh: 120
offset-refresh: 30
consumer:
kafka-prod:
class-name: kafka
cluster: kafka-prod
servers: ["broker1:9092", "broker2:9092"]
group-denylist: "^(console-consumer|_)"
start-latest: true
notifier:
slack:
class-name: http
url-open: "https://hooks.slack.com/services/xxx/yyy/zzz"
template-open: |
Consumer group lag alert:
Group: {{.GroupName}}
Status: {{.Status}}
send-close: true
Grafana Dashboard
Kafka Monitoring Dashboard Panels
================================
1. Cluster Overview
- Broker count / status
- Total partition count
- Under-replicated partitions
- Active Controller
2. Throughput
- Messages In/Out per sec
- Bytes In/Out per sec
- Requests per sec
3. Consumer Groups
- Consumer Lag (per group)
- Consume Rate
- Rebalance count
4. Latency
- Produce Request Latency (p50, p99)
- Fetch Request Latency
- End-to-End Latency
5. Resources
- Disk Usage per Broker
- Network I/O
- CPU / Memory
11. Quiz
Q1. Why do messages with the same key always go to the same partition in Kafka?
Because the default partitioner uses the murmur2 hash function to determine the partition by taking the hash of the key modulo the number of partitions.
partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
The same key always produces the same hash value, so it is assigned to the same partition. This ensures ordering of messages with the same key.
Note that if the partition count changes, key-to-partition mapping may change.
Q2. What are the 3 elements required to achieve Exactly-Once Semantics?
- Idempotent Producer: with enable.idempotence=true, the broker uses the ProducerID and sequence number to discard duplicate sends.
- Transactional Producer: configure transactional.id and use beginTransaction() / commitTransaction() to send multiple messages atomically.
- read_committed isolation level: set isolation.level=read_committed on the consumer so it reads only messages from committed transactions.
These three combine to guarantee exactly-once processing in the Consume-Transform-Produce pattern.
Q3. What is required when adding a new field in Avro BACKWARD compatibility?
A default value is required.
BACKWARD compatibility means the new schema (v2) must be able to read data written with the old schema (v1). Since old data does not have the new field, without a default value, deserialization will fail.
Example: "name": "status", "type": "string", "default": "CREATED"
For optional fields, use a ["null", "string"] union type with "default": null.
Q4. What problem do Flink watermarks solve?
Watermarks solve the late-arriving data problem in event time-based processing.
In distributed systems, events may arrive out of order relative to when they occurred. Watermarks provide an estimate that all events up to a certain point have arrived.
W(t) = "Estimate that all events before time t have arrived"
When the watermark exceeds a window's end time, that window's computation results are emitted. Setting allowedLateness allows late data to be accepted for a period even after the watermark has passed.
Q5. What are the criteria for choosing between Kafka Streams and Apache Flink?
Choose Kafka Streams when:
- Simple Kafka-to-Kafka processing
- Separate cluster operations are burdensome
- Embedding inside microservices
- Team is familiar with the Kafka ecosystem
Choose Apache Flink when:
- Complex Event Processing (CEP) is needed
- Connecting diverse sources/sinks beyond Kafka
- Large-scale state management is required
- SQL-based stream processing is needed
- Ultra-high throughput is required
Key difference: Kafka Streams is a library (embedded in JVM apps), while Flink is a distributed cluster.
References
- Apache Kafka Documentation. (2025). https://kafka.apache.org/documentation/
- Narkhede, N., Shapira, G., Palino, T. (2021). Kafka: The Definitive Guide, 2nd ed. O'Reilly Media.
- Apache Flink Documentation. (2025). https://flink.apache.org/docs/
- Confluent Schema Registry Documentation. (2025). https://docs.confluent.io/platform/current/schema-registry/
- Hueske, F., Kalavri, V. (2019). Stream Processing with Apache Flink. O'Reilly Media.
- KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum. https://cwiki.apache.org/confluence/display/KAFKA/KIP-500
- Debezium Documentation. (2025). https://debezium.io/documentation/
- Kleppmann, M. (2017). Designing Data-Intensive Applications. O'Reilly Media.
- Confluent Blog. (2024). "Exactly-Once Semantics in Apache Kafka."
- Apache Avro Specification. (2025). https://avro.apache.org/docs/
- Burrow - Kafka Consumer Lag Checking. https://github.com/linkedin/Burrow
- Confluent Blog. (2025). "Kafka Performance Tuning Best Practices."
- Flink Forward Conference Talks. (2024). https://www.flink-forward.org/
- KIP-848: The Next Generation Consumer Rebalance Protocol. Apache Kafka.