Event Streaming Deep Dive Guide 2025: Kafka Internals, Flink, Exactly-Once, Schema Evolution

Introduction: The Evolution of Event Streaming

Event streaming is core infrastructure for modern distributed systems. The ecosystem centered on Apache Kafka has grown beyond simple message queuing into the foundation for real-time data pipelines, event sourcing, CQRS, and stream processing.

This guide takes an in-depth look at Kafka internals, producers and consumers, Exactly-Once semantics, Schema Registry, Apache Flink, Kafka Connect, performance tuning, and monitoring.


1. Kafka Internals Deep Dive

Partition Structure

A Kafka topic is divided into one or more partitions. Each partition is an immutable, ordered log.

Topic: orders (3 partitions)

Partition 0: [msg0] [msg3] [msg6] [msg9]  ... → offset increases
Partition 1: [msg1] [msg4] [msg7] [msg10] ... → offset increases
Partition 2: [msg2] [msg5] [msg8] [msg11] ... → offset increases

- Ordering guaranteed only within the same partition
- Partition count = max parallel consumers in a Consumer Group
- Partition key routes messages with the same key to the same partition
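
The key-routing rule in the last bullet can be illustrated with a short sketch. It is a simplification under a stated assumption: Kafka's Java client hashes the serialized key with murmur2, whereas this example substitutes `zlib.crc32` from the Python standard library, so the partition numbers it produces will not match a real cluster. The function name `choose_partition` is made up for the example.

```python
import zlib


def choose_partition(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition: hash the key bytes, then take the modulus.

    Assumption: crc32 stands in for the murmur2 hash used by the Java client.
    """
    return zlib.crc32(key) % num_partitions


# Records with the same key always land in the same partition,
# which is what provides per-key ordering.
records = [(b"order-123", "created"), (b"order-456", "created"), (b"order-123", "paid")]
placement = [(key, event, choose_partition(key, 3)) for key, event in records]
```

Because the partition is derived from a modulus, increasing the partition count later can move existing keys to different partitions.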

Segment Files and Indexes

Each partition is stored on disk as segment files.

/kafka-logs/orders-0/
├── 00000000000000000000.log      # Segment file (actual messages)
├── 00000000000000000000.index    # Offset index
├── 00000000000000000000.timeindex # Timestamp index
├── 00000000000005242880.log      # Next segment
├── 00000000000005242880.index
├── 00000000000005242880.timeindex
└── leader-epoch-checkpoint

Segment File Structure
========================

.log file (message storage)
┌──────────────────────────────────────┐
│ Batch Header                         │
│ - Base Offset: 0                     │
│ - Batch Length: 256 bytes            │
│ - Magic: 2 (Kafka 0.11+)             │
│ - CRC: checksum                      │
│ - Attributes: compression, txn       │
│ - Producer ID: 12345                 │
│ - Producer Epoch: 0                  │
│ - Base Sequence: 0                   │
├──────────────────────────────────────┤
│ Record 0                             │
│ - Offset Delta: 0                    │
│ - Timestamp Delta: 0                 │
│ - Key: "order-123"                   │
│ - Value: (serialized payload)        │
│ - Headers: [("source", "web")]       │
├──────────────────────────────────────┤
│ Record 1                             │
│ - Offset Delta: 1                    │
│ - ...                                │
└──────────────────────────────────────┘

.index file (sparse index)
┌─────────────────┬───────────────┐
│ Relative Offset │ File Position │
├─────────────────┼───────────────┤
│ 0               │ 0             │
│ 4096            │ 32768         │
│ 8192            │ 65536         │
└─────────────────┴───────────────┘

.timeindex file
┌───────────────┬─────────────────┐
│ Timestamp     │ Relative Offset │
├───────────────┼─────────────────┤
│ 1700000000000 │ 0               │
│ 1700000060000 │ 4096            │
└───────────────┴─────────────────┘
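
To show why a sparse index is enough to locate a record, here is a minimal sketch of the lookup: binary-search the index for the last entry at or before the target offset, then scan the .log file forward from that byte position. The entries are the ones from the .index example; `find_scan_start` is a hypothetical helper, not a Kafka API.

```python
import bisect

# (relative offset, byte position) pairs, as in the .index example
INDEX = [(0, 0), (4096, 32768), (8192, 65536)]


def find_scan_start(index, base_offset, target_offset):
    """Return the byte position from which to start scanning the .log file.

    The index is sparse, so we pick the last entry whose offset is <= the
    target and let the reader scan forward from there to the exact record.
    """
    relative = target_offset - base_offset  # index stores offsets relative to the segment
    offsets = [rel for rel, _ in index]
    i = bisect.bisect_right(offsets, relative) - 1
    if i < 0:
        raise ValueError("target offset precedes this segment")
    return index[i][1]


start = find_scan_start(INDEX, base_offset=0, target_offset=5000)
```

The base offset is the number encoded in the segment file name, which is why the index only needs to store small relative offsets.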

Log Compaction

Log Compaction is a retention strategy that keeps only the latest value for each key.

Before compaction:
offset 0: key=A, value=v1
offset 1: key=B, value=v1
offset 2: key=A, value=v2  ← latest for A
offset 3: key=C, value=v1
offset 4: key=B, value=v2  ← latest for B
offset 5: key=A, value=v3  ← latest for A

After compaction:
offset 3: key=C, value=v1  (only value for C)
offset 4: key=B, value=v2  (latest for B)
offset 5: key=A, value=v3  (latest for A)
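
# Log Compaction configuration
# Broker-level settings (server.properties)
log.cleanup.policy=compact
log.cleaner.min.cleanable.ratio=0.5
log.cleaner.min.compaction.lag.ms=0
# Topic-level override of log.cleaner.min.cleanable.ratio
min.cleanable.dirty.ratio=0.5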
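
The before/after listing in this section can be reproduced with a few lines of code. This is only a model of the outcome, assuming a log represented as `(offset, key, value)` tuples; the real log cleaner works segment by segment, leaves the active segment alone, and treats null values as delete markers, none of which is modeled here.

```python
def compact(log):
    """Keep only the highest-offset record for each key, preserving offsets."""
    latest = {}
    for offset, key, value in log:
        latest[key] = (offset, key, value)  # later records overwrite earlier ones
    return sorted(latest.values())  # tuples sort by offset first


log = [
    (0, "A", "v1"),
    (1, "B", "v1"),
    (2, "A", "v2"),
    (3, "C", "v1"),
    (4, "B", "v2"),
    (5, "A", "v3"),
]
compacted = compact(log)
```

Note that surviving records keep their original offsets, so offsets in a compacted partition are no longer contiguous.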

ISR (In-Sync Replicas)

ISR is the set of replicas that are synchronized with the leader.

Partition 0 (Replication Factor = 3)
======================================

Broker 1 (Leader):      [0] [1] [2] [3] [4] [5]   LEO: 6
Broker 2 (Follower):    [0] [1] [2] [3] [4]       LEO: 5
Broker 3 (Follower):    [0] [1] [2] [3] [4] [5]   LEO: 6

ISR = {Broker 1, Broker 2, Broker 3}
HW (High Watermark) = 5 (minimum LEO within ISR)

- Consumers can only read up to HW
- If Broker 2 does not catch up to the leader within replica.lag.time.max.ms, it is removed from the ISR

# ISR-related configuration
min.insync.replicas=2           # Minimum in-sync replicas required for acks=all writes
replica.lag.time.max.ms=30000   # ISR removal threshold
unclean.leader.election.enable=false  # Prevent non-ISR replica leader election
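
The relationship between LEO, ISR, and the high watermark can be checked with a small calculation. This is a sketch using the numbers from the example; the function names are illustrative and the broker's actual bookkeeping is more involved.

```python
def high_watermark(leo_by_broker, isr):
    """The high watermark is the smallest log end offset among ISR members."""
    return min(leo_by_broker[broker] for broker in isr)


def accepts_acks_all(isr, min_insync_replicas):
    """An acks=all write is only accepted while the ISR is large enough."""
    return len(isr) >= min_insync_replicas


leo = {1: 6, 2: 5, 3: 6}
hw_with_all_replicas = high_watermark(leo, isr={1, 2, 3})
hw_without_broker_2 = high_watermark(leo, isr={1, 3})
```

Dropping the lagging Broker 2 from the ISR lets the high watermark advance from 5 to 6, at the cost of one fewer replica guaranteeing durability.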

KRaft vs ZooKeeper

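KRaft mode, which runs Kafka without ZooKeeper, first appeared as an early-access feature in Kafka 2.8, became production-ready for new clusters in Kafka 3.3, and is the only supported mode from Kafka 4.0 onward.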

ZooKeeper Mode (Legacy)
==========================
[ZooKeeper Ensemble]
  ↕ metadata
[Controller (one of the Brokers)]
  ↕ leader election
[Broker 1] [Broker 2] [Broker 3]

Issues:
- Requires separate ZooKeeper operations
- Metadata synchronization delays
- Performance degradation as partition count increases


KRaft Mode (production-ready since Kafka 3.3)
==========================
[Controller Quorum]
  Broker 1 (Controller + Broker)
  Broker 2 (Controller + Broker)
  Broker 3 (Controller + Broker)

Advantages:
- Eliminates ZooKeeper dependency
- Improved metadata management performance
- Designed to scale to millions of partitions
- Reduced operational complexity

# KRaft configuration
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@broker1:9093,2@broker2:9093,3@broker3:9093
controller.listener.names=CONTROLLER

2. Producer Deep Dive

Partitioner

Producers use a partitioner to determine which partition a message goes to.

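// Default partitioning behavior (simplified sketch, not the actual Kafka source)
public class DefaultPartitioner implements Partitioner {
    // With key: murmur2 hash of the serialized key, modulo the partition count
    // Without key: sticky partitioning (stay on one partition until the batch is sent)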

    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null) {
            // Sticky Partitioner: same partition until batch is full
            return stickyPartitionCache.partition(topic, cluster);
        }

        // Key-based partitioning: same key = same partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
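
// Custom partitioner example: region-based
public class RegionPartitioner implements Partitioner {
    // Region -> preferred partition (illustrative values)
    private final Map<String, Integer> regionMapping = Map.of("us", 0, "eu", 1, "apac", 2);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        String region = extractRegion(value);
        int numPartitions = cluster.partitionsForTopic(topic).size();

        return regionMapping.getOrDefault(region, 0) % numPartitions;
    }

    // Placeholder: derive the region from the payload in whatever way fits your format
    private String extractRegion(Object value) { return String.valueOf(value); }

    @Override public void configure(Map<String, ?> configs) {}
    @Override public void close() {}
}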

Batching and Compression

# Producer batch settings
batch.size=16384                  # Batch size (bytes)
linger.ms=5                       # Batch wait time
buffer.memory=33554432            # Total buffer memory

# Compression settings
compression.type=lz4              # none, gzip, snappy, lz4, zstd

Producer Batch Processing Flow
==========================

send() ──▶ [Serializer] ──▶ [Partitioner] ──▶ [RecordAccumulator]
                                                      │
                                          ┌───────────┴───────────┐
                                          │ Partition 0 batch     │
                                          │ [msg1][msg2][msg3]    │
                                          ├───────────────────────┤
                                          │ Partition 1 batch     │
                                          │ [msg4][msg5]          │
                                          └───────────┬───────────┘
                                                      │
                                              batch.size reached
                                              or linger.ms elapsed
                                                      │
                                                      ▼
                                               [Sender Thread]
                                   Network send (batches are already compressed
                                   as records are appended in the accumulator)
                                                      │
                                                      ▼
                                               [Kafka Broker]

Idempotent Producer

# Idempotent producer settings
enable.idempotence=true    # Default since Kafka 3.0+
acks=all                   # Required for idempotency
retries=2147483647         # Effectively unlimited; bounded in practice by delivery.timeout.ms
max.in.flight.requests.per.connection=5  # Must be 5 or less when idempotence is enabled

Idempotent Producer Operation
==========================

Producer                                                              Broker
   │                                                                    │
   │── ProducerID: 1000, Epoch: 0, Sequence: 0, Message: "order-1" ────▶│  ✓ Stored
   │                                                                    │
   │── ProducerID: 1000, Epoch: 0, Sequence: 1, Message: "order-2" ────▶│  ✓ Stored
   │                                                                    │
   │   (Resend due to network timeout)                                  │
   │── ProducerID: 1000, Epoch: 0, Sequence: 1, Message: "order-2" ────▶│  ✗ Duplicate! Ignored
   │                                                                    │
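
The duplicate detection shown in the diagram can be modeled in a few lines. This is a deliberately simplified sketch: it tracks one last-seen sequence number per (producer ID, epoch) pair for a single partition, while a real broker also fences stale epochs and remembers several recent batches. The class and return values are invented for the example.

```python
class PartitionLog:
    """Toy model of broker-side sequence checking for one partition."""

    def __init__(self):
        self.records = []
        self.last_sequence = {}  # (producer_id, epoch) -> last accepted sequence

    def append(self, producer_id, epoch, sequence, message):
        key = (producer_id, epoch)
        last = self.last_sequence.get(key, -1)
        if sequence <= last:
            return "duplicate"      # already stored: acknowledge without appending
        if sequence != last + 1:
            return "out_of_order"   # a gap means an earlier batch is missing
        self.records.append(message)
        self.last_sequence[key] = sequence
        return "stored"


log = PartitionLog()
results = [
    log.append(1000, 0, 0, "order-1"),
    log.append(1000, 0, 1, "order-2"),
    log.append(1000, 0, 1, "order-2"),  # retry after a network timeout
]
```

The retry is acknowledged but not appended, so the log ends up with exactly one copy of each message.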

Transactional Producer

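// Transactional producer example
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("transactional.id", "order-processor-1");
props.put("enable.idempotence", "true");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

try {
    producer.beginTransaction();

    // Atomically send to multiple topics/partitions
    producer.send(new ProducerRecord<>("orders", "key1", "order-created"));
    producer.send(new ProducerRecord<>("inventory", "key1", "stock-reserved"));
    producer.send(new ProducerRecord<>("notifications", "key1", "email-queued"));

    // Consumer offsets can be included in the transaction.
    // `offsets` (Map<TopicPartition, OffsetAndMetadata>) and `consumer` come from
    // the surrounding consume loop; see the Exactly-Once section for a full example.
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());

    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal: this producer instance cannot continue, so close it instead of aborting
    producer.close();
    throw e;
} catch (KafkaException e) {
    // Recoverable: abort so that none of the writes become visible
    producer.abortTransaction();
    throw e;
}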

3. Consumer Deep Dive

Consumer Groups and Partition Assignment

Topic: orders (6 partitions)

Consumer Group: order-processor
===================================

Case 1: 3 Consumers
  Consumer A: [P0] [P1]
  Consumer B: [P2] [P3]
  Consumer C: [P4] [P5]

Case 2: 6 Consumers (ideal)
  Consumer A: [P0]
  Consumer B: [P1]
  Consumer C: [P2]
  Consumer D: [P3]
  Consumer E: [P4]
  Consumer F: [P5]

Case 3: 8 Consumers (2 idle)
  Consumer A: [P0]
  Consumer B: [P1]
  Consumer C: [P2]
  Consumer D: [P3]
  Consumer E: [P4]
  Consumer F: [P5]
  Consumer G: (idle)
  Consumer H: (idle)

Partition Assignment Strategies

// Partition assignment strategy comparison
Properties props = new Properties();

// 1. Range Assignor (first entry in the default strategy list)
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.RangeAssignor");
// Divides partitions into ranges per topic
// Topic A: P0,P1,P2 -> C1: [P0,P1], C2: [P2]
// Topic B: P0,P1,P2 -> C1: [P0,P1], C2: [P2]
// Issue: skew toward C1

// 2. RoundRobin Assignor
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.RoundRobinAssignor");
// Distributes all topic partitions in round-robin
// Same topics, 2 consumers -> C1: [A-P0, A-P2, B-P1], C2: [A-P1, B-P0, B-P2]

// 3. Sticky Assignor
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.StickyAssignor");
// Maximizes existing assignment retention + even distribution
// Moves minimum partitions during rebalance

// 4. CooperativeSticky Assignor (recommended)
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
// Sticky + incremental rebalancing
// Reassigns partitions without full stop

Rebalancing: Eager vs Incremental

Eager Rebalancing (legacy)
================================
1. Consumer C3 added
2. All consumers release partitions (Stop-the-World)
   C1: [] C2: [] C3: []
3. Calculate new assignment
4. Reassign partitions
   C1: [P0,P1] C2: [P2,P3] C3: [P4,P5]
Full processing stop occurs!


Incremental Cooperative Rebalancing (recommended)
================================
1. Consumer C3 added
2. 1st rebalance: only release partitions that need to move
   C1: [P0,P1,P2] → C1: [P0,P1]  (P2 released)
   C2: [P3,P4,P5] → C2: [P3,P4]  (P5 released)
3. 2nd rebalance: assign released partitions to C3
   C3: [] → C3: [P2,P5]
→ Remaining partitions continue processing without interruption!

Offset Management

// Auto commit (default)
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
// Risk: if committed before processing, messages can be lost

// Manual commit (recommended)
props.put("enable.auto.commit", "false");

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        // Process message
        processRecord(record);
    }

    // Synchronous commit after the whole batch has been processed
    consumer.commitSync();

    // Alternative: asynchronous commit (higher throughput, but no automatic retry).
    // Use one or the other, not both.
    // consumer.commitAsync((offsets, exception) -> {
    //     if (exception != null) {
    //         log.error("Commit failed", exception);
    //     }
    // });
}

Offset Storage: __consumer_offsets topic
============================================

Key: (group_id, topic, partition)
Value: (offset, metadata, timestamp)

Example:
Key: (order-processor, orders, 0)
Value: (offset=42, metadata="", timestamp=1700000000)

4. Exactly-Once Semantics

Message Delivery Guarantees

At-Most-Once
================================
Producer ──send()──▶ Broker
  (acks=0, no confirmation)
- Messages can be lost
- No duplicates
- Fastest

At-Least-Once
================================
Producer ──send()──▶ Broker ──ack──▶ Producer
  (acks=all, with retries)
- No message loss
- Duplicates possible
- What most default configurations provide

Exactly-Once
================================
Producer ──txn──▶ Broker ──commit──▶ Consumer(read_committed)
  (idempotent producer + transactions + read_committed)
- No message loss
- No duplicates
- Strongest but with overhead

Exactly-Once Implementation

// Exactly-Once: Consume-Transform-Produce pattern
// (bootstrap.servers and serializer/deserializer settings omitted for brevity)
Properties producerProps = new Properties();
producerProps.put("transactional.id", "eos-processor-1");
producerProps.put("enable.idempotence", "true");

Properties consumerProps = new Properties();
consumerProps.put("group.id", "eos-group");
consumerProps.put("isolation.level", "read_committed");  // Key!
consumerProps.put("enable.auto.commit", "false");

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

producer.initTransactions();
consumer.subscribe(Collections.singleton("input-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    if (!records.isEmpty()) {
        producer.beginTransaction();

        try {
            for (ConsumerRecord<String, String> record : records) {
                // Transform
                String result = transform(record.value());

                // Produce to output topic
                producer.send(new ProducerRecord<>(
                    "output-topic", record.key(), result));
            }

            // Include consumer offsets in transaction
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partRecords =
                    records.records(partition);
                long lastOffset = partRecords.get(
                    partRecords.size() - 1).offset();
                offsets.put(partition,
                    new OffsetAndMetadata(lastOffset + 1));
            }

            producer.sendOffsetsToTransaction(offsets,
                consumer.groupMetadata());
            producer.commitTransaction();

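        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal for this producer instance: close it and stop
            producer.close();
            throw e;
        } catch (KafkaException e) {
            producer.abortTransaction();
            // Rewind so the aborted batch is polled again instead of being skipped
            for (TopicPartition partition : records.partitions()) {
                consumer.seek(partition, records.records(partition).get(0).offset());
            }
        }
    }
}

Exactly-Once Transaction Flow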
================================

1. beginTransaction()
2. send(output-topic, msg1) → Temporarily stored on Broker (uncommitted)
3. send(output-topic, msg2) → Temporarily stored
4. sendOffsetsToTransaction() → Consumer offsets included in transaction
5. commitTransaction() → All messages + offsets atomically committed

→ read_committed consumers only read committed messages
→ If failure occurs mid-way, abortTransaction() rolls back everything
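
What a read_committed consumer sees can be illustrated with a toy log model. The sketch assumes a log made of `("data", txn, value)` entries plus `("commit", txn)` and `("abort", txn)` markers, which is a simplification of Kafka's control records; it also stops at the first record of a still-open transaction, mimicking the last stable offset. All names are invented for the example.

```python
def read_committed(log):
    """Return values visible to a consumer with isolation.level=read_committed."""
    outcome = {}
    for entry in log:
        if entry[0] in ("commit", "abort"):
            outcome[entry[1]] = entry[0]

    visible = []
    for entry in log:
        if entry[0] != "data":
            continue
        _, txn, value = entry
        state = outcome.get(txn)
        if state is None:
            break  # open transaction: nothing past this point is readable yet
        if state == "commit":
            visible.append(value)  # aborted records are skipped
    return visible


def read_uncommitted(log):
    """Return every data record, regardless of transaction outcome."""
    return [entry[2] for entry in log if entry[0] == "data"]


log = [
    ("data", "t1", "msg1"),
    ("data", "t1", "msg2"),
    ("commit", "t1"),
    ("data", "t2", "msg3"),
    ("abort", "t2"),
    ("data", "t3", "msg4"),  # t3 is still open
]
committed_view = read_committed(log)
uncommitted_view = read_uncommitted(log)
```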

5. Schema Registry

Serialization Format Comparison

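| Feature             | Avro         | Protobuf      | JSON Schema |
|---------------------|--------------|---------------|-------------|
| Serialized Size     | Small        | Small         | Large       |
| Schema Evolution    | Excellent    | Excellent     | Average     |
| Code Generation     | Optional     | Required      | Not needed  |
| Readability         | Low (binary) | Low (binary)  | High (text) |
| Dynamic Typing      | Supported    | Not supported | Supported   |
| Kafka Compatibility | Best         | Excellent     | Average     |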

Avro Schema and Evolution

// Version 1: Initial schema
{
  "type": "record",
  "name": "Order",
  "namespace": "com.company.events",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "USD"},
    {"name": "created_at", "type": "long"}
  ]
}

// Version 2: compatible evolution
// New fields with default values → consumers on the new schema can still read old data
// (BACKWARD), and consumers on the old schema simply ignore the new fields (FORWARD)
{
  "type": "record",
  "name": "Order",
  "namespace": "com.company.events",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "USD"},
    {"name": "created_at", "type": "long"},
    {"name": "status", "type": "string", "default": "CREATED"},
    {"name": "region", "type": ["null", "string"], "default": null}
  ]
}

Compatibility Modes

BACKWARD (default, recommended)
================================
- New schema can read data written with old schema
- New fields: default value required
- Field removal: allowed
- Upgrade consumers first → then producers

FORWARD
================================
- Old schema can read data written with new schema
- New fields: allowed (no default value required, old readers ignore them)
- Field removal: only fields with default values
- Upgrade producers first → then consumers

FULL
================================
- Both BACKWARD and FORWARD satisfied
- Safest but most restrictive

NONE
================================
- No compatibility checks
- Dangerous! Do not use in production
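
The BACKWARD and FORWARD rules can be approximated with a small checker. This sketch only looks at field names and the presence of a `default`; it is not a full Avro schema-resolution check (type promotion, aliases, and unions are ignored), and the function names are made up for illustration.

```python
def _unresolvable_fields(reader_fields, writer_fields):
    """Fields the reader expects that the writer did not write and that lack a default."""
    written = {field["name"] for field in writer_fields}
    return [
        field["name"]
        for field in reader_fields
        if field["name"] not in written and "default" not in field
    ]


def is_backward_compatible(new_fields, old_fields):
    """BACKWARD: a reader on the new schema can decode data written with the old one."""
    return not _unresolvable_fields(reader_fields=new_fields, writer_fields=old_fields)


def is_forward_compatible(new_fields, old_fields):
    """FORWARD: a reader on the old schema can decode data written with the new one."""
    return not _unresolvable_fields(reader_fields=old_fields, writer_fields=new_fields)


v1 = [{"name": "order_id", "type": "string"}, {"name": "amount", "type": "double"}]
v2_with_default = v1 + [{"name": "status", "type": "string", "default": "CREATED"}]
v2_without_default = v1 + [{"name": "status", "type": "string"}]
v2_removed_amount = [{"name": "order_id", "type": "string"}]
```

For example, `is_backward_compatible(v2_without_default, v1)` is false because a new reader has no value to fill in for `status` when decoding old records.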

# Schema Registry API usage
import json

import requests

SCHEMA_REGISTRY_URL = "http://schema-registry:8081"

# Register schema
schema = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"}
    ]
}

response = requests.post(
    f"{SCHEMA_REGISTRY_URL}/subjects/orders-value/versions",
    json={"schema": json.dumps(schema), "schemaType": "AVRO"},
)
response.raise_for_status()

# Check compatibility of a candidate schema (here: one added field with a default)
new_schema = {
    **schema,
    "fields": schema["fields"] + [
        {"name": "currency", "type": "string", "default": "USD"}
    ],
}
response = requests.post(
    f"{SCHEMA_REGISTRY_URL}/compatibility/subjects/orders-value/versions/latest",
    json={"schema": json.dumps(new_schema), "schemaType": "AVRO"},
)
compatibility = response.json()
print(f"Compatible: {compatibility['is_compatible']}")

# Set compatibility mode
requests.put(
    f"{SCHEMA_REGISTRY_URL}/config/orders-value",
    json={"compatibility": "BACKWARD"},
)

Protobuf Schema Evolution

// Version 1
syntax = "proto3";
package com.company.events;

message Order {
  string order_id = 1;
  string customer_id = 2;
  double amount = 3;
  string currency = 4;
  int64 created_at = 5;
}

// Version 2 (maintaining compatibility)
message Order {
  string order_id = 1;
  string customer_id = 2;
  double amount = 3;
  string currency = 4;
  int64 created_at = 5;
  // New fields get new numbers; never reuse the number of a removed field
  string status = 6;          // newly added
  string region = 7;          // newly added
  // If a field is removed later, reserve its number, e.g.: reserved 2;
}

6. Apache Flink

┌─────────────────────────────────────────────┐
│                Flink Cluster                │
├─────────────────────────────────────────────┤
│                                             │
│  [JobManager]                               │
│   - Job scheduling                          │
│   - Checkpoint coordination                 │
│   - Failure recovery                        │
│                                             │
│  [TaskManager 1]     [TaskManager 2]        │
│   - Task Slot 1       - Task Slot 1         │
│   - Task Slot 2       - Task Slot 2         │
│   - Task Slot 3       - Task Slot 3         │
│                                             │
├─────────────────────────────────────────────┤
│  [State Backend]                            │
│   - RocksDB (large state)                   │
│   - HashMap (small state)                   │
│                                             │
│  [Checkpoint Storage]                       │
│   - S3 / HDFS / GCS                         │
└─────────────────────────────────────────────┘

DataStream API

// Flink DataStream API example
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// Kafka Source
KafkaSource<Order> source = KafkaSource.<Order>builder()
    .setBootstrapServers("broker1:9092")
    .setTopics("orders")
    .setGroupId("flink-order-processor")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(
        OffsetResetStrategy.EARLIEST))
    .setValueOnlyDeserializer(new OrderDeserializer())
    .build();

DataStream<Order> orders = env.fromSource(
    source, WatermarkStrategy
        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((order, ts) -> order.getCreatedAt()),
    "Kafka Source"
);

// Processing pipeline
DataStream<OrderStats> stats = orders
    .filter(order -> "COMPLETED".equals(order.getStatus()))  // null-safe comparison
    .keyBy(Order::getRegion)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new OrderStatsAggregator());

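// Kafka Sink (EXACTLY_ONCE relies on checkpointing being enabled; downstream
// consumers should read with isolation.level=read_committed)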
KafkaSink<OrderStats> sink = KafkaSink.<OrderStats>builder()
    .setBootstrapServers("broker1:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("order-stats")
        .setValueSerializationSchema(new OrderStatsSerializer())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("flink-order-stats")
    .build();

stats.sinkTo(sink);
env.execute("Order Statistics Pipeline");

Windowing

Tumbling Window
================================
Time:   ──|──1──2──3──|──4──5──6──|──7──8──9──|──
Windows:  [  Window 1  ][  Window 2  ][  Window 3  ]
- Fixed size, non-overlapping
- Example: aggregate every 5 minutes


Sliding Window
================================
Time:   ──|──1──2──3──4──5──6──7──8──|──
Windows:  [    Window 1    ]
            [    Window 2    ]
                [    Window 3    ]
- Fixed size, slides by interval
- Example: 10-minute window, 5-minute slide


Session Window
================================
Time:   ──12──3─────────56──7─────────9──
Windows:  [Session 1]     [Session 2]     [S3]
                  gap          gap
- Activity-based, variable size
- Window closes after gap of inactivity

// Window examples
// Tumbling Window
orders.keyBy(Order::getRegion)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sum("amount");

// Sliding Window
orders.keyBy(Order::getRegion)
    .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
    .sum("amount");

// Session Window
orders.keyBy(Order::getCustomerId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .aggregate(new SessionAggregator());
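
How an event timestamp maps to windows can be worked out by hand, and the sketch below does so for the three window types. It uses plain integer timestamps with no offsets or time zones, and its session logic merges events that are at most `gap` apart, which is a simplification of Flink's window merging. The function names are illustrative and not part of the Flink API.

```python
def tumbling_window(ts, size):
    """The single [start, end) window that contains ts."""
    start = ts - ts % size
    return (start, start + size)


def sliding_windows(ts, size, slide):
    """All [start, end) windows of the given size and slide that contain ts."""
    last_start = ts - ts % slide
    starts = range(last_start, ts - size, -slide)  # walk back while the window still covers ts
    return sorted((start, start + size) for start in starts)


def session_windows(timestamps, gap):
    """Group timestamps into sessions separated by more than `gap` of inactivity."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][1] <= gap:
            sessions[-1][1] = ts          # extend the current session
        else:
            sessions.append([ts, ts])     # start a new session
    return [(first, last + gap) for first, last in sessions]


tumbling = tumbling_window(12, size=5)
sliding = sliding_windows(12, size=10, slide=5)
sessions = session_windows([1, 2, 3, 20, 21, 40], gap=10)
```

With a 10-unit window sliding by 5, every event belongs to two windows, which is why sliding windows cost more state than tumbling ones.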

Watermarks and Late Data

Event Time vs Processing Time
================================

Event time:        t=10  t=12  t=11  t=15  t=13
Processing time:   T=20  T=21  T=22  T=23  T=24

Events do not arrive in order!
Watermarks indicate "all events up to this point should have arrived"


Watermark Operation
================================

Events:              [t=10] [t=12] [t=11] [t=15] [t=13]
Max timestamp seen:   10     12     12     15     15
Watermarks:           W(5)   W(7)   W(7)   W(10)  W(10)
                      (max timestamp seen - maxOutOfOrderness, here 5 seconds)

Window [0, 10): fires once the watermark reaches the end of the window
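
The watermark values in this example follow from a simple rule: track the largest event time seen so far and subtract the allowed out-of-orderness. The sketch below applies that rule to the same event sequence. It is a simplification: Flink's bounded-out-of-orderness generator additionally subtracts 1 ms and emits watermarks periodically rather than per event, and the helper names are invented here.

```python
def bounded_out_of_orderness_watermarks(event_times, max_out_of_orderness):
    """Watermark after each event = max event time seen so far - allowed lateness."""
    watermarks = []
    max_seen = float("-inf")
    for event_time in event_times:
        max_seen = max(max_seen, event_time)
        watermarks.append(max_seen - max_out_of_orderness)
    return watermarks


def first_firing_index(watermarks, window_end):
    """Index of the first event whose watermark reaches the window end, or None."""
    for i, watermark in enumerate(watermarks):
        if watermark >= window_end:
            return i
    return None


events = [10, 12, 11, 15, 13]
watermarks = bounded_out_of_orderness_watermarks(events, max_out_of_orderness=5)
fires_at = first_firing_index(watermarks, window_end=10)
```

The out-of-order event t=11 does not move the watermark backwards, and the window [0, 10) is only evaluated once the event with t=15 has pushed the watermark to 10.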

// Watermark strategy
WatermarkStrategy<Order> strategy = WatermarkStrategy
    // Tolerate events arriving up to 10 seconds out of order
    .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((order, timestamp) -> order.getCreatedAt())
    // Mark an input idle after 5 minutes without events so it does not hold back the watermark
    .withIdleness(Duration.ofMinutes(5));

// Late data handling
OutputTag<Order> lateOutputTag = new OutputTag<Order>("late-orders") {};

SingleOutputStreamOperator<OrderStats> stats = orders
    .keyBy(Order::getRegion)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))  // Wait 1 more minute
    .sideOutputLateData(lateOutputTag)  // After that, to side output
    .aggregate(new OrderStatsAggregator());

// Handle late data separately
DataStream<Order> lateOrders = stats.getSideOutput(lateOutputTag);
lateOrders.addSink(new LateOrderAlertSink());

State Management and Checkpointing

// Keyed State example
public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {

    // ValueState: single value per key
    private ValueState<Boolean> flagState;

    // ListState: list per key
    private ListState<Transaction> recentTransactions;

    // MapState: map per key
    private MapState<String, Integer> merchantCounts;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> flagDescriptor =
            new ValueStateDescriptor<>("flag", Boolean.class);
        flagState = getRuntimeContext().getState(flagDescriptor);

        ListStateDescriptor<Transaction> listDescriptor =
            new ListStateDescriptor<>("recent-txns", Transaction.class);
        recentTransactions = getRuntimeContext().getListState(listDescriptor);

        MapStateDescriptor<String, Integer> mapDescriptor =
            new MapStateDescriptor<>("merchant-counts", String.class, Integer.class);
        merchantCounts = getRuntimeContext().getMapState(mapDescriptor);
    }

    @Override
    public void processElement(Transaction txn, Context ctx, Collector<Alert> out) {
        Boolean flag = flagState.value();

        if (flag != null && txn.getAmount() > 10000) {
            out.collect(new Alert(txn.getCustomerId(), "HIGH_AMOUNT_AFTER_FLAG"));
        }

        if (txn.getAmount() < 1.0) {
            flagState.update(true);
            // Timer to clear flag after 1 minute
            ctx.timerService().registerEventTimeTimer(
                txn.getTimestamp() + 60000);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
        flagState.clear();
    }
}

Checkpoint Operation
================================

[Source] ──▶ [Map] ──▶ [KeyBy] ──▶ [Window] ──▶ [Sink]

1. JobManager triggers a checkpoint; the sources inject a checkpoint barrier into the stream
   Source ──▶ |barrier| ──▶ Map ──▶ KeyBy ──▶ Window ──▶ Sink

2. State snapshot taken as barrier passes each operator
   Source(snapshot) ──▶ Map(snapshot) ──▶ ...

3. All operator snapshots complete → checkpoint success

4. On failure: restore from last successful checkpoint

// Checkpoint configuration
env.enableCheckpointing(60000);  // 60-second interval
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(120000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// State Backend configuration
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("s3://checkpoints/flink/");

7. Kafka Streams vs Apache Flink

Comparison Table

| Feature                | Kafka Streams                     | Apache Flink                   |
|------------------------|-----------------------------------|--------------------------------|
| Deployment             | Library (embedded in JVM app)     | Distributed cluster            |
| Input Sources          | Kafka only                        | Kafka, Kinesis, files, etc.    |
| Exactly-Once           | Kafka internal only               | Including external systems     |
| Windows                | Tumbling, Sliding, Session        | + Global, Custom               |
| State Management       | RocksDB (local)                   | RocksDB + distributed snapshots |
| SQL Support            | KSQL (separate)                   | Flink SQL (built-in)           |
| Throughput             | Medium to high                    | High to very high              |
| Operational Complexity | Low (app deployment)              | High (cluster management)      |
| Best For               | Processing within Kafka ecosystem | Complex stream processing      |

Selection Criteria

use_kafka_streams_when:
  - "Simple Kafka-to-Kafka processing"
  - "Separate cluster operations are burdensome"
  - "Embedding inside microservices"
  - "Medium throughput is sufficient"
  - "Team is familiar with Kafka ecosystem"

use_flink_when:
  - "Complex event processing (CEP)"
  - "Connecting diverse sources/sinks"
  - "Large-scale state management needed"
  - "Advanced window functions needed"
  - "SQL-based stream processing"
  - "Ultra-high throughput required"

8. Kafka Connect

Source/Sink Connectors

// Debezium MySQL Source Connector
{
  "name": "mysql-source-orders",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-primary",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz-password",
    "database.server.id": "1",
    "topic.prefix": "cdc",
    "database.include.list": "order_service",
    "table.include.list": "order_service.orders,order_service.order_items",
    "schema.history.internal.kafka.bootstrap.servers": "broker:9092",
    "schema.history.internal.kafka.topic": "schema-changes.orders",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "cdc\\.order_service\\.(.*)",
    "transforms.route.replacement": "order.$1.cdc"
  }
}

// Elasticsearch Sink Connector
{
  "name": "es-sink-orders",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "order.orders.cdc",
    "type.name": "_doc",
    "key.ignore": "false",
    "schema.ignore": "true",
    "behavior.on.null.values": "delete",
    "write.method": "upsert",
    "transforms": "extractKey",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "order_id"
  }
}

Debezium CDC

Debezium CDC Flow
================================

[MySQL] ──binlog──▶ [Debezium Connector] ──▶ [Kafka Topic]
                                    ┌───────────────┤
                                    ▼               ▼
                              [Flink Job]    [ES Sink Connector]
                                    │               │
                                    ▼               ▼
                              [Kafka Topic]  [Elasticsearch]

CDC Event Structure:
- before: record before change (UPDATE/DELETE)
- after: record after change (INSERT/UPDATE)
- source: source DB metadata
- op: operation type (c=create, u=update, d=delete, r=read)
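
A consumer of these change events typically replays them into some keyed view of the table. The following sketch shows one way to do that with an in-memory dict. It assumes already-decoded events shaped like the structure listed here (with `order_id` as the primary key) and ignores the `source` block, schema changes, and tombstone records; the function name is hypothetical.

```python
def apply_cdc_event(table, event, key_field="order_id"):
    """Apply one change event to an in-memory copy of the table."""
    op = event["op"]
    if op in ("c", "u", "r"):          # create, update, snapshot read: upsert the new row
        row = event["after"]
        table[row[key_field]] = row
    elif op == "d":                    # delete: the key is only available in `before`
        table.pop(event["before"][key_field], None)
    else:
        raise ValueError(f"unknown op: {op!r}")
    return table


events = [
    {"op": "c", "before": None, "after": {"order_id": "o-1", "status": "CREATED"}},
    {"op": "u", "before": {"order_id": "o-1", "status": "CREATED"},
     "after": {"order_id": "o-1", "status": "PAID"}},
    {"op": "c", "before": None, "after": {"order_id": "o-2", "status": "CREATED"}},
    {"op": "d", "before": {"order_id": "o-2", "status": "CREATED"}, "after": None},
]

orders = {}
for event in events:
    apply_cdc_event(orders, event)
```

Replaying the same events in the same order always produces the same view, which is what makes a CDC topic usable as a source of truth for downstream systems.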

9. Performance Tuning

Producer Tuning

# Producer performance optimization
batch.size=65536              # 64KB (default 16KB)
linger.ms=10                  # Wait 10ms (batch efficiency)
buffer.memory=67108864        # 64MB
compression.type=lz4          # Compression (saves network/disk)
acks=all                      # Reliability
max.in.flight.requests.per.connection=5  # Pipelining
send.buffer.bytes=131072      # Socket buffer

Consumer Tuning

# Consumer performance optimization
fetch.min.bytes=1048576       # 1MB (batch efficiency)
fetch.max.wait.ms=500         # Wait up to 500ms
max.poll.records=500          # Records per poll
max.partition.fetch.bytes=1048576  # 1MB per partition
session.timeout.ms=30000      # Session timeout
heartbeat.interval.ms=10000   # Heartbeat interval
max.poll.interval.ms=300000   # Max poll interval 5min

Broker Tuning

# Broker performance optimization
num.partitions=12                    # Default partition count
default.replication.factor=3         # Replication factor
num.io.threads=8                     # I/O threads
num.network.threads=3                # Network threads
num.replica.fetchers=4               # Replica fetchers
socket.send.buffer.bytes=102400      # Socket send buffer
socket.receive.buffer.bytes=102400   # Socket receive buffer
log.segment.bytes=1073741824         # 1GB segments
log.retention.hours=168              # 7-day retention
log.retention.bytes=-1               # No size limit

Determining Partition Count

Partition Count Formula
================================

Required throughput: 100MB/s
Single partition throughput: ~10MB/s (Producer)
                             ~5MB/s (Consumer)

Producer perspective: 100MB/s / 10MB/s = 10 partitions
Consumer perspective: 100MB/s / 5MB/s = 20 partitions

Minimum 20 partitions needed

Additional considerations:
- Match consumer instance count (partitions >= consumers)
- Partition count can increase but cannot decrease
- Too many partitions: metadata overhead, rebalancing delay
- Rule of thumb: roughly 6-50 partitions per topic; validate the number with load tests
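
The sizing arithmetic in this section reduces to taking the larger of the producer-side and consumer-side requirements. A minimal sketch, assuming the per-partition throughput figures have been measured for your own workload (the 10 MB/s and 5 MB/s values are the example numbers from this section, not general constants):

```python
import math


def required_partitions(target_mb_s, producer_mb_s_per_partition, consumer_mb_s_per_partition):
    """Partitions needed so that neither producers nor consumers are the bottleneck."""
    by_producer = math.ceil(target_mb_s / producer_mb_s_per_partition)
    by_consumer = math.ceil(target_mb_s / consumer_mb_s_per_partition)
    return max(by_producer, by_consumer)


partitions = required_partitions(
    target_mb_s=100,
    producer_mb_s_per_partition=10,
    consumer_mb_s_per_partition=5,
)
```

Since partitions can be added but not removed, it is usually safer to round this figure up with some headroom than to start at the exact minimum.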

10. Monitoring

JMX Metrics

# Key Kafka metrics
broker_metrics:
  - name: "UnderReplicatedPartitions"
    description: "Partitions with fewer in-sync replicas than the replication factor"
    alert_threshold: "> 0"
    severity: critical

  - name: "ActiveControllerCount"
    description: "Number of active controllers (summed across all brokers)"
    alert_threshold: "!= 1"
    severity: critical

  - name: "OfflinePartitionsCount"
    description: "Number of offline partitions"
    alert_threshold: "> 0"
    severity: critical

  - name: "RequestsPerSec"
    description: "Requests per second"
    alert_threshold: "> 10000"
    severity: warning

producer_metrics:
  - name: "record-send-rate"
    description: "Records sent per second"

  - name: "record-error-rate"
    description: "Record errors per second"
    alert_threshold: "> 0"

  - name: "request-latency-avg"
    description: "Average request latency"
    alert_threshold: "> 100ms"

consumer_metrics:
  - name: "records-lag-max"
    description: "Maximum consumer lag"
    alert_threshold: "> 10000"
    severity: warning

  - name: "records-consumed-rate"
    description: "Records consumed per second"

  - name: "commit-latency-avg"
    description: "Average commit latency"
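
The broker alert thresholds above are simple comparisons, so they can be evaluated against a metrics sample with a few lines of code. The sketch below assumes the metric values have already been collected (for example, scraped from JMX by an exporter); the names `BROKER_ALERTS`, `breaches`, and `firing_alerts` are hypothetical, and the sample values are made up for illustration.

```python
import operator

# Comparison operators used in the alert_threshold strings.
OPS = {">": operator.gt, "!=": operator.ne}

# Broker thresholds copied from the metric list above.
BROKER_ALERTS = {
    "UnderReplicatedPartitions": "> 0",
    "ActiveControllerCount": "!= 1",
    "OfflinePartitionsCount": "> 0",
    "RequestsPerSec": "> 10000",
}


def breaches(threshold, value):
    """Evaluate a threshold string such as '> 0' against a numeric value."""
    op, limit = threshold.split()
    return OPS[op](value, float(limit))


def firing_alerts(sample):
    """Return the names of metrics in the sample that violate their threshold."""
    return sorted(
        name
        for name, rule in BROKER_ALERTS.items()
        if name in sample and breaches(rule, sample[name])
    )


# Illustrative sample, not real measurements.
sample = {
    "UnderReplicatedPartitions": 2,
    "ActiveControllerCount": 1,
    "OfflinePartitionsCount": 0,
    "RequestsPerSec": 4200,
}
print(firing_alerts(sample))  # ['UnderReplicatedPartitions']
```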

Consumer Lag Monitoring

Consumer Lag Calculation
================================

Partition 0:
  Log End Offset (LEO): 1000  (next offset to be written)
  Consumer Offset:       850  (last committed, next to be read)
  Lag: 1000 - 850 = 150

Partition 1:
  LEO: 2000
  Consumer Offset: 1950
  Lag: 50

Total Lag = 150 + 50 = 200 messages
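
The same calculation as a minimal sketch. The helper name `partition_lag` is hypothetical, and the offsets are the ones from the example above; in practice they would come from the broker and the consumer group's committed offsets.

```python
def partition_lag(log_end_offset, committed_offset):
    """Messages written to the partition but not yet committed by the group."""
    # Clamp at zero in case the two offsets were sampled at slightly different times.
    return max(log_end_offset - committed_offset, 0)


# partition -> (log end offset, committed consumer offset)
offsets = {0: (1000, 850), 1: (2000, 1950)}

lags = {p: partition_lag(leo, committed) for p, (leo, committed) in offsets.items()}
total_lag = sum(lags.values())

print(lags)       # {0: 150, 1: 50}
print(total_lag)  # 200
```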

# Burrow (LinkedIn's consumer lag monitoring tool)
burrow_config:
  general:
    pidfile: "/var/run/burrow.pid"
    stdout-logfile: "/var/log/burrow.log"

  cluster:
    kafka-prod:
      class-name: kafka
      servers: ["broker1:9092", "broker2:9092", "broker3:9092"]
      topic-refresh: 120
      offset-refresh: 30

  consumer:
    kafka-prod:
      class-name: kafka
      cluster: kafka-prod
      servers: ["broker1:9092", "broker2:9092"]
      group-denylist: "^(console-consumer|_)"
      start-latest: true

  notifier:
    slack:
      class-name: http
      url-open: "https://hooks.slack.com/services/xxx/yyy/zzz"
      template-open: |
        Consumer group lag alert:
        Group: {{.GroupName}}
        Status: {{.Status}}
      send-close: true

Grafana Dashboard

Kafka Monitoring Dashboard Panels
================================

1. Cluster Overview
   - Broker count / status
   - Total partition count
   - Under-replicated partitions
   - Active Controller

2. Throughput
   - Messages In/Out per sec
   - Bytes In/Out per sec
   - Requests per sec

3. Consumer Groups
   - Consumer Lag (per group)
   - Consume Rate
   - Rebalance count

4. Latency
   - Produce Request Latency (p50, p99)
   - Fetch Request Latency
   - End-to-End Latency

5. Resources
   - Disk Usage per Broker
   - Network I/O
   - CPU / Memory

11. Quiz

Q1. Why do messages with the same key always go to the same partition in Kafka?

Because the default partitioner uses the murmur2 hash function to determine the partition by taking the hash of the key modulo the number of partitions.

partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions

The same key always produces the same hash value, so it is assigned to the same partition. This ensures ordering of messages with the same key.

Note that if the partition count changes, key-to-partition mapping may change.
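
To make the key-to-partition mapping concrete, here is a Python port of the hashing step. It is a sketch written to mirror the murmur2 variant in the Kafka Java client's `Utils` class, using unsigned 32-bit arithmetic; it has not been checked against the Java implementation here, so treat the exact partition numbers as unverified. The function name `partition_for` and the sample keys are hypothetical.

```python
def murmur2(data: bytes) -> int:
    """Unsigned 32-bit murmur2, intended to match Kafka's Utils.murmur2."""
    mask = 0xFFFFFFFF
    m = 0x5BD1E995
    r = 24
    length = len(data)
    h = (0x9747B28C ^ length) & mask

    # Mix four bytes at a time (little-endian).
    for i in range(0, length - length % 4, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k

    # Fold in the remaining 1 to 3 bytes.
    tail = data[length - length % 4:]
    if len(tail) >= 3:
        h ^= tail[2] << 16
    if len(tail) >= 2:
        h ^= tail[1] << 8
    if len(tail) >= 1:
        h ^= tail[0]
        h = (h * m) & mask

    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def partition_for(key: str, num_partitions: int) -> int:
    """toPositive(murmur2(keyBytes)) % numPartitions"""
    return (murmur2(key.encode("utf-8")) & 0x7FFFFFFF) % num_partitions


keys = [f"order-{i}" for i in range(100)]
# Count how many keys land on a different partition after growing from 3 to 4 partitions.
moved = sum(partition_for(k, 3) != partition_for(k, 4) for k in keys)
print(partition_for("order-123", 3) == partition_for("order-123", 3))  # True
print(moved > 0)
```

The last line illustrates the caveat above: after a partition count change, many existing keys hash to a different partition, so per-key ordering across the change is not preserved.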

Q2. What are the 3 elements required to achieve Exactly-Once Semantics?

  1. Idempotent Producer: With enable.idempotence=true, the broker uses the Producer ID and per-partition sequence numbers to discard duplicate sends caused by retries.

  2. Transactional Producer: Configure transactional.id, call initTransactions() once, then wrap sends in beginTransaction() and commitTransaction() so that multiple messages are written atomically.

  3. read_committed isolation level: Set isolation.level=read_committed on the consumer so that it reads only messages from committed transactions.

These three combine to guarantee exactly-once processing in the Consume-Transform-Produce pattern, provided the consumer offsets are committed inside the same transaction with sendOffsetsToTransaction().
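
The deduplication behind the idempotent producer can be illustrated with a toy model. This is not Kafka's implementation: the class `PartitionLog` is hypothetical, and it ignores producer epochs and the window of recent batches that a real broker tracks. It only shows how a (Producer ID, sequence number) pair lets the log drop a retried send.

```python
class PartitionLog:
    """Toy partition log that deduplicates by producer ID and sequence number."""

    def __init__(self):
        self.records = []
        self.last_seq = {}  # producer_id -> last accepted sequence number

    def append(self, producer_id, seq, value):
        last = self.last_seq.get(producer_id, -1)
        if seq <= last:
            # Already written: a retry of an earlier send, so drop it.
            return "duplicate"
        if seq != last + 1:
            # A gap means an earlier message was lost.
            raise ValueError("out-of-order sequence")
        self.records.append(value)
        self.last_seq[producer_id] = seq
        return "appended"


log = PartitionLog()
print(log.append(12345, 0, "a"))  # appended
print(log.append(12345, 1, "b"))  # appended
print(log.append(12345, 1, "b"))  # duplicate (retry after a lost acknowledgement)
print(log.records)                # ['a', 'b']
```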

Q3. What is required when adding a new field under Avro BACKWARD compatibility?

A default value is required.

BACKWARD compatibility means that a reader using the new schema (v2) must be able to read data written with the old schema (v1). Old data does not contain the new field, so without a default value the reader has nothing to fill in and deserialization fails.

Example: {"name": "status", "type": "string", "default": "CREATED"}

For optional fields, use a ["null", "string"] union type with "default": null.
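
The role of the default value can be shown with a simplified model of schema resolution. This sketch does not use the Avro library and ignores type checking, aliases, and promotions; the function `read_with_schema` and the fields `order_id` and `note` are hypothetical, while the `status` field follows the example above.

```python
def read_with_schema(record, reader_fields):
    """Resolve a decoded record against the reader schema's field list."""
    result = {}
    for field in reader_fields:
        name = field["name"]
        if name in record:
            result[name] = record[name]
        elif "default" in field:
            # Field is absent from the old data: fall back to the reader's default.
            result[name] = field["default"]
        else:
            raise ValueError(f"no value and no default for field '{name}'")
    return result


# Record written with schema v1, which has no status or note field.
v1_record = {"order_id": "order-123"}

# Reader schema v2 adds two fields, both with defaults.
v2_fields = [
    {"name": "order_id", "type": "string"},
    {"name": "status", "type": "string", "default": "CREATED"},
    {"name": "note", "type": ["null", "string"], "default": None},
]

print(read_with_schema(v1_record, v2_fields))
# {'order_id': 'order-123', 'status': 'CREATED', 'note': None}
```

Removing the `"default"` entry from the `status` field makes the same call raise an error, which is the failure BACKWARD compatibility checks are meant to catch before deployment.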

Q4. What problem do Flink watermarks solve?

Watermarks address out-of-order and late-arriving data in event-time processing.

In distributed systems, events may arrive out of order relative to when they occurred. A watermark is the system's estimate that all events up to a given event time have arrived.

W(t) = "Estimate that all events before time t have arrived"

When the watermark passes a window's end time, that window's results are emitted. Setting allowedLateness keeps the window state for an additional period, so late events that arrive after the watermark has passed can still update the result.
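
The interaction between watermarks and windows can be simulated in a few lines. This is a toy model, not Flink code: it assumes tumbling windows, a watermark equal to the largest timestamp seen minus a fixed delay, and no allowed lateness. Flink's built-in bounded-out-of-orderness strategy works on the same idea but differs in details such as millisecond boundaries. The function name `run_windows` and the event timestamps are made up for illustration.

```python
def run_windows(events, window_size, max_delay):
    """Count events per tumbling window, firing a window once the watermark passes its end."""
    windows = {}     # window start -> number of events collected so far
    closed = set()   # window starts that have already fired
    fired = []       # (window start, count) in firing order
    late = []        # events that arrived after their window fired
    max_ts = None

    for ts in events:
        start = ts - ts % window_size
        if start in closed:
            late.append(ts)
        else:
            windows[start] = windows.get(start, 0) + 1

        # Watermark: estimate that everything up to this time has arrived.
        max_ts = ts if max_ts is None else max(max_ts, ts)
        watermark = max_ts - max_delay

        for s in sorted(windows):
            if s + window_size <= watermark:
                fired.append((s, windows.pop(s)))
                closed.add(s)

    return fired, late


# Event timestamps in arrival order; 3, 8 and 2 arrive out of order.
fired, late = run_windows([1, 4, 3, 12, 8, 14, 2, 25], window_size=10, max_delay=3)
print(fired)  # [(0, 4), (10, 2)]
print(late)   # [2]
```

The event with timestamp 8 arrives after 12 but is still counted, because the watermark (12 - 3 = 9) has not yet passed the end of window [0, 10). The event with timestamp 2 arrives after that window has fired and is treated as late; with allowedLateness it could still have been included.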

Q5. What are the criteria for choosing between Kafka Streams and Apache Flink?

Choose Kafka Streams when:

  • Simple Kafka-to-Kafka processing
  • Separate cluster operations are burdensome
  • Embedding inside microservices
  • Team is familiar with the Kafka ecosystem

Choose Apache Flink when:

  • Complex Event Processing (CEP) is needed
  • Connecting diverse sources/sinks beyond Kafka
  • Large-scale state management is required
  • SQL-based stream processing is needed
  • Ultra-high throughput is required

Key difference: Kafka Streams is a library embedded in a JVM application, while Flink jobs run on a separately operated distributed cluster.


References

  1. Apache Kafka Documentation. (2025). https://kafka.apache.org/documentation/
  2. Narkhede, N., Shapira, G., Palino, T. (2021). Kafka: The Definitive Guide, 2nd ed. O'Reilly Media.
  3. Apache Flink Documentation. (2025). https://flink.apache.org/docs/
  4. Confluent Schema Registry Documentation. (2025). https://docs.confluent.io/platform/current/schema-registry/
  5. Hueske, F., Kalavri, V. (2019). Stream Processing with Apache Flink. O'Reilly Media.
  6. KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum. https://cwiki.apache.org/confluence/display/KAFKA/KIP-500
  7. Debezium Documentation. (2025). https://debezium.io/documentation/
  8. Kleppmann, M. (2017). Designing Data-Intensive Applications. O'Reilly Media.
  9. Confluent Blog. (2024). "Exactly-Once Semantics in Apache Kafka."
  10. Apache Avro Specification. (2025). https://avro.apache.org/docs/
  11. Burrow - Kafka Consumer Lag Checking. https://github.com/linkedin/Burrow
  12. Confluent Blog. (2025). "Kafka Performance Tuning Best Practices."
  13. Flink Forward Conference Talks. (2024). https://www.flink-forward.org/
  14. KIP-848: The Next Generation Consumer Rebalance Protocol. Apache Kafka.
