はじめに:Event Streamingの進化(しんか)
Event Streamingは現代(げんだい)の分散(ぶんさん)システムの核心(かくしん)インフラです。Apache Kafkaを中心としたイベントストリーミングエコシステムは、単純(たんじゅん)なメッセージキューを超(こ)え、リアルタイムデータパイプライン、イベントソーシング、CQRS、ストリーム処理(しょり)の基盤(きばん)となりました。
このガイドでは、Kafkaの内部構造(ないぶこうぞう)からExactly-Onceセマンティクス、Schema Registry、Apache Flinkまで深層的(しんそうてき)に扱(あつか)います。
1. Kafka内部構造の深掘り
パーティション構造
Kafkaのトピックは1つ以上のパーティションに分割(ぶんかつ)されます。各パーティションは順序(じゅんじょ)が保証(ほしょう)される不変(ふへん)のログです。
Topic: orders (3 partitions)
Partition 0: [msg0] [msg3] [msg6] [msg9] ... → offset増加
Partition 1: [msg1] [msg4] [msg7] [msg10] ... → offset増加
Partition 2: [msg2] [msg5] [msg8] [msg11] ... → offset増加
- 同じパーティション内でのみ順序保証
- パーティション数 = Consumer Group内の最大並列消費数
- パーティションキーで同じキーのメッセージが同じパーティションへ
セグメントファイルとインデックス
各パーティションはディスク上にセグメントファイルとして保存されます。
/kafka-logs/orders-0/
├── 00000000000000000000.log # セグメントファイル(実際のメッセージ)
├── 00000000000000000000.index # オフセットインデックス
├── 00000000000000000000.timeindex # タイムスタンプインデックス
├── 00000000000005242880.log # 次のセグメント
├── 00000000000005242880.index
├── 00000000000005242880.timeindex
└── leader-epoch-checkpoint
セグメントファイル構造
========================
.logファイル(メッセージ保存)
┌──────────────────────────────────────┐
│ Batch Header │
│ - Base Offset: 0 │
│ - Batch Length: 256 bytes │
│ - Magic: 2 (Kafka 0.11+) │
│ - CRC: checksum │
│ - Attributes: compression, txn │
│ - Producer ID: 12345 │
│ - Producer Epoch: 0 │
│ - Base Sequence: 0 │
├──────────────────────────────────────┤
│ Record 0 │
│ - Offset Delta: 0 │
│ - Timestamp Delta: 0 │
│ - Key: "order-123" │
│ - Value: (serialized payload) │
│ - Headers: [("source", "web")] │
├──────────────────────────────────────┤
│ Record 1 │
│ - Offset Delta: 1 │
│ - ... │
└──────────────────────────────────────┘
.indexファイル(疎インデックス)
┌────────────────────────┐
│ Relative Offset → File │
│ Position │
├────────────────────────┤
│ 0 → 0 │
│ 4096 → 32768 │
│ 8192 → 65536 │
└────────────────────────┘
.timeindexファイル
┌────────────────────────────────┐
│ Timestamp → Relative Offset │
├────────────────────────────────┤
│ 1700000000000 → 0 │
│ 1700000060000 → 4096 │
└────────────────────────────────┘
Log Compaction
Log Compactionは各キーの最新(さいしん)の値(あたい)のみを保持(ほじ)する保存戦略(せんりゃく)です。
Compaction前:
offset 0: key=A, value=v1
offset 1: key=B, value=v1
offset 2: key=A, value=v2 ← Aの最新値
offset 3: key=C, value=v1
offset 4: key=B, value=v2 ← Bの最新値
offset 5: key=A, value=v3 ← Aの最新値
Compaction後:
offset 3: key=C, value=v1 (Cの唯一の値)
offset 4: key=B, value=v2 (Bの最新値)
offset 5: key=A, value=v3 (Aの最新値)
# Log Compaction設定
log.cleanup.policy=compact
log.cleaner.min.cleanable.ratio=0.5
log.cleaner.min.compaction.lag.ms=0
min.cleanable.dirty.ratio=0.5
ISR (In-Sync Replicas)
ISRはリーダーと同期(どうき)されたレプリカの集合(しゅうごう)です。
Partition 0 (Replication Factor = 3)
======================================
Broker 1 (Leader): [0] [1] [2] [3] [4] [5] ← LEO: 6
Broker 2 (Follower): [0] [1] [2] [3] [4] ← LEO: 5
Broker 3 (Follower): [0] [1] [2] [3] [4] [5] ← LEO: 6
ISR = {Broker 1, Broker 2, Broker 3}
HW (High Watermark) = 5 (ISR内の最小LEO)
- ConsumerはHWまでしか読めない
- Broker 2の遅延が大きくなるとISRから除外
- replica.lag.time.max.ms超過時にISRから除外
# ISR関連設定
min.insync.replicas=2 # 最小同期レプリカ数
replica.lag.time.max.ms=30000 # ISR除外基準時間
unclean.leader.election.enable=false # ISR外レプリカのリーダー選出を防止
KRaft vs ZooKeeper
Kafka 3.xからZooKeeperなしで動作(どうさ)するKRaftモードが導入(どうにゅう)されました。
ZooKeeperモード(レガシー)
==========================
[ZooKeeper Ensemble]
↕ メタデータ
[Controller (Brokerの1つ)]
↕ リーダー選出
[Broker 1] [Broker 2] [Broker 3]
問題点:
- ZooKeeperの別途運用が必要
- メタデータ同期の遅延
- パーティション数増加時の性能低下
KRaftモード (Kafka 3.x+)
==========================
[Controller Quorum]
Broker 1 (Controller + Broker)
Broker 2 (Controller + Broker)
Broker 3 (Controller + Broker)
利点:
- ZooKeeper依存性の排除
- メタデータ管理性能の向上
- 数百万パーティションのサポート
- 運用複雑度の削減
# KRaft設定
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@broker1:9093,2@broker2:9093,3@broker3:9093
controller.listener.names=CONTROLLER
2. Producer深掘り
パーティショナー
プロデューサーはメッセージをどのパーティションに送(おく)るか決定(けってい)するパーティショナーを使用します。
// デフォルトパーティショナーの動作
public class DefaultPartitioner implements Partitioner {
// キーがある場合: murmur2ハッシュ
// キーがない場合: Sticky Partitioner(バッチ単位でパーティション変更)
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
// Sticky Partitioner: バッチが一杯になるまで同じパーティション
return stickyPartitionCache.partition(topic, cluster);
}
// キーベースのパーティショニング: 同じキー = 同じパーティション
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
// カスタムパーティショナー例: リージョンベース
public class RegionPartitioner implements Partitioner {
private Map<String, Integer> regionMapping;
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
String region = extractRegion(value);
int numPartitions = cluster.partitionsForTopic(topic).size();
return regionMapping.getOrDefault(region, 0) % numPartitions;
}
}
バッチと圧縮
# Producerバッチ設定
batch.size=16384 # バッチサイズ (bytes)
linger.ms=5 # バッチ待機時間
buffer.memory=33554432 # 全体バッファメモリ
# 圧縮設定
compression.type=lz4 # none, gzip, snappy, lz4, zstd
Producerバッチ処理フロー
==========================
send() ──▶ [Serializer] ──▶ [Partitioner] ──▶ [RecordAccumulator]
│
┌───────────┴───────────┐
│ Partition 0 バッチ │
│ [msg1][msg2][msg3] │
├───────────────────────┤
│ Partition 1 バッチ │
│ [msg4][msg5] │
└───────────┬───────────┘
│
batch.size到達
またはlinger.ms経過
│
▼
[Sender Thread]
│
圧縮 + ネットワーク送信
│
▼
[Kafka Broker]
冪等(べきとう)プロデューサー(Idempotent Producer)
# 冪等プロデューサー設定
enable.idempotence=true # Kafka 3.0+のデフォルト値
acks=all # 冪等性の要件
retries=2147483647 # 無限リトライ
max.in.flight.requests.per.connection=5 # 最大5(冪等性保証範囲)
冪等プロデューサーの動作原理
==========================
Producer Broker
│ │
│── ProducerID: 1000 │
│ Epoch: 0 │
│ Sequence: 0 │
│ Message: "order-1" ──────▶│ ✓ 保存
│ │
│── ProducerID: 1000 │
│ Epoch: 0 │
│ Sequence: 1 │
│ Message: "order-2" ──────▶│ ✓ 保存
│ │
│── (ネットワークタイムアウトで再送) │
│ ProducerID: 1000 │
│ Epoch: 0 │
│ Sequence: 1 │
│ Message: "order-2" ──────▶│ ✗ 重複! 無視
│ │
トランザクショナルプロデューサー
// トランザクショナルプロデューサー例
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("transactional.id", "order-processor-1");
props.put("enable.idempotence", "true");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
// 複数トピック/パーティションにアトミックに送信
producer.send(new ProducerRecord<>("orders", "key1", "order-created"));
producer.send(new ProducerRecord<>("inventory", "key1", "stock-reserved"));
producer.send(new ProducerRecord<>("notifications", "key1", "email-queued"));
// Consumerオフセットもトランザクションに含められる
producer.sendOffsetsToTransaction(offsets, consumerGroupId);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
throw e;
}
3. Consumer深掘り
Consumer Groupとパーティション割り当て
Topic: orders (6 partitions)
Consumer Group: order-processor
===================================
ケース1: 3 Consumers
Consumer A: [P0] [P1]
Consumer B: [P2] [P3]
Consumer C: [P4] [P5]
ケース2: 6 Consumers(理想的)
Consumer A: [P0]
Consumer B: [P1]
Consumer C: [P2]
Consumer D: [P3]
Consumer E: [P4]
Consumer F: [P5]
ケース3: 8 Consumers(2つがアイドル)
Consumer A: [P0]
Consumer B: [P1]
Consumer C: [P2]
Consumer D: [P3]
Consumer E: [P4]
Consumer F: [P5]
Consumer G: (アイドル)
Consumer H: (アイドル)
パーティション割り当て戦略
// パーティション割り当て戦略の比較
Properties props = new Properties();
// 1. Range Assignor(デフォルト)
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.RangeAssignor");
// トピック別にパーティションを範囲で分ける
// 問題: C1に偏る可能性
// 2. RoundRobin Assignor
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.RoundRobinAssignor");
// すべてのトピックのパーティションをラウンドロビンで分配
// 3. Sticky Assignor
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.StickyAssignor");
// 既存の割り当てを最大限維持 + 均等分配
// 4. CooperativeSticky Assignor(推奨)
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
// Sticky + インクリメンタルリバランス
// 全体停止なしにパーティション再割り当て
リバランス:Eager vs Incremental
Eager Rebalancing(従来方式)
================================
1. Consumer C3が追加
2. すべてのConsumerがパーティションを解放(Stop-the-World)
C1: [] C2: [] C3: []
3. 新しい割り当てを計算
4. パーティション再割り当て
C1: [P0,P1] C2: [P2,P3] C3: [P4,P5]
→ 全体の処理が停止!
Incremental Cooperative Rebalancing(推奨)
================================
1. Consumer C3が追加
2. 第1次リバランス: 移動が必要なパーティションのみ解放
C1: [P0,P1,P2] → C1: [P0,P1] (P2解放)
C2: [P3,P4,P5] → C2: [P3,P4] (P5解放)
3. 第2次リバランス: 解放されたパーティションをC3に割り当て
C3: [] → C3: [P2,P5]
→ 残りのパーティションは中断なく処理を継続!
オフセット管理
// 自動コミット(デフォルト)
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
// リスク: 処理前にコミットされるとメッセージ喪失の可能性
// 手動コミット(推奨)
props.put("enable.auto.commit", "false");
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// メッセージ処理
processRecord(record);
}
// 同期コミット(処理完了後)
consumer.commitSync();
// または非同期コミット(性能向上)
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
log.error("Commit failed", exception);
}
});
}
4. Exactly-Onceセマンティクス
メッセージ配信保証レベル
At-Most-Once(最大1回)
================================
Producer ──send()──▶ Broker
(acks=0, 確認なし)
- メッセージ喪失の可能性あり
- 重複なし
- 最速
At-Least-Once(最低1回)
================================
Producer ──send()──▶ Broker ──ack──▶ Producer
(acks=all, リトライあり)
- メッセージ喪失なし
- 重複の可能性あり
- ほとんどのデフォルト設定
Exactly-Once(正確に1回)
================================
Producer ──txn──▶ Broker ──commit──▶ Consumer(read_committed)
(冪等プロデューサー + トランザクション + read_committed)
- メッセージ喪失なし
- 重複なし
- 最強だがオーバーヘッドあり
Exactly-Once実装
// Exactly-Once: Consume-Transform-Produceパターン
Properties producerProps = new Properties();
producerProps.put("transactional.id", "eos-processor-1");
producerProps.put("enable.idempotence", "true");
Properties consumerProps = new Properties();
consumerProps.put("group.id", "eos-group");
consumerProps.put("isolation.level", "read_committed"); // 核心!
consumerProps.put("enable.auto.commit", "false");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
producer.initTransactions();
consumer.subscribe(Collections.singleton("input-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
producer.beginTransaction();
try {
for (ConsumerRecord<String, String> record : records) {
String result = transform(record.value());
producer.send(new ProducerRecord<>(
"output-topic", record.key(), result));
}
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partRecords =
records.records(partition);
long lastOffset = partRecords.get(
partRecords.size() - 1).offset();
offsets.put(partition,
new OffsetAndMetadata(lastOffset + 1));
}
producer.sendOffsetsToTransaction(offsets,
consumer.groupMetadata());
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
}
}
5. Schema Registry
シリアライゼーション形式の比較
| 特性 | Avro | Protobuf | JSON Schema |
|---|---|---|---|
| シリアライズサイズ | 小 | 小 | 大 |
| スキーマ進化 | 優秀 | 優秀 | 普通 |
| コード生成 | 任意 | 必須 | 不要 |
| 可読性 | 低(バイナリ) | 低(バイナリ) | 高(テキスト) |
| 動的型付け | サポート | 非サポート | サポート |
| Kafka互換性 | 最良 | 優秀 | 普通 |
Avroスキーマと進化
// Version 1: 初期スキーマ
{
"type": "record",
"name": "Order",
"namespace": "com.company.events",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "customer_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "USD"},
{"name": "created_at", "type": "long"}
]
}
// Version 2: Backward Compatible進化
// 新フィールドにdefault値を追加 → 既存Consumerが新データを読める
{
"type": "record",
"name": "Order",
"namespace": "com.company.events",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "customer_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "USD"},
{"name": "created_at", "type": "long"},
{"name": "status", "type": "string", "default": "CREATED"},
{"name": "region", "type": ["null", "string"], "default": null}
]
}
互換性モード
BACKWARD(デフォルト、推奨)
================================
- 新スキーマで旧データを読める
- 新フィールド: default値必須
- フィールド削除: 可能
- Consumer先にアップグレード → Producer アップグレード
FORWARD
================================
- 旧スキーマで新データを読める
- 新フィールド: default値必須
- フィールド削除: default値のあるフィールドのみ
- Producer先にアップグレード → Consumerアップグレード
FULL
================================
- BACKWARD + FORWARD両方を満たす
- 最も安全だが最も制限的
NONE
================================
- 互換性チェックなし
- 危険!本番環境での使用禁止
# Schema Registry API使用
import requests
SCHEMA_REGISTRY_URL = "http://schema-registry:8081"
# スキーマ登録
schema = {
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"}
]
}
response = requests.post(
f"{SCHEMA_REGISTRY_URL}/subjects/orders-value/versions",
json={"schema": json.dumps(schema), "schemaType": "AVRO"},
)
# 互換性チェック
response = requests.post(
f"{SCHEMA_REGISTRY_URL}/compatibility/subjects/orders-value/versions/latest",
json={"schema": json.dumps(new_schema), "schemaType": "AVRO"},
)
compatibility = response.json()
print(f"Compatible: {compatibility['is_compatible']}")
Protobufスキーマ進化
// Version 1
syntax = "proto3";
package com.company.events;
message Order {
string order_id = 1;
string customer_id = 2;
double amount = 3;
string currency = 4;
int64 created_at = 5;
}
// Version 2(互換性を維持)
message Order {
string order_id = 1;
string customer_id = 2;
double amount = 3;
string currency = 4;
int64 created_at = 5;
// 新フィールド: 既存フィールド番号の再利用禁止!
string status = 6; // 新規追加
string region = 7; // 新規追加
}
6. Apache Flink深掘り
Flinkアーキテクチャ概要
┌─────────────────────────────────────────────┐
│ Flink Cluster │
├─────────────────────────────────────────────┤
│ │
│ [JobManager] │
│ - ジョブスケジューリング │
│ - チェックポイント調整 │
│ - 障害復旧 │
│ │
│ [TaskManager 1] [TaskManager 2] │
│ - Task Slot 1 - Task Slot 1 │
│ - Task Slot 2 - Task Slot 2 │
│ - Task Slot 3 - Task Slot 3 │
│ │
├─────────────────────────────────────────────┤
│ [State Backend] │
│ - RocksDB (大規模状態) │
│ - HashMap (小規模状態) │
│ │
│ [Checkpoint Storage] │
│ - S3 / HDFS / GCS │
└─────────────────────────────────────────────┘
DataStream API
// Flink DataStream API例
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka Source
KafkaSource<Order> source = KafkaSource.<Order>builder()
.setBootstrapServers("broker1:9092")
.setTopics("orders")
.setGroupId("flink-order-processor")
.setStartingOffsets(OffsetsInitializer.committedOffsets(
OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new OrderDeserializer())
.build();
DataStream<Order> orders = env.fromSource(
source, WatermarkStrategy
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((order, ts) -> order.getCreatedAt()),
"Kafka Source"
);
// 処理パイプライン
DataStream<OrderStats> stats = orders
.filter(order -> order.getStatus().equals("COMPLETED"))
.keyBy(Order::getRegion)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new OrderStatsAggregator());
// Kafka Sink
KafkaSink<OrderStats> sink = KafkaSink.<OrderStats>builder()
.setBootstrapServers("broker1:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("order-stats")
.setValueSerializationSchema(new OrderStatsSerializer())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("flink-order-stats")
.build();
stats.sinkTo(sink);
env.execute("Order Statistics Pipeline");
ウィンドウ(Windowing)
Tumbling Window(タンブリングウィンドウ)
================================
時間: ──|──1──2──3──|──4──5──6──|──7──8──9──|──
ウィンドウ: [ Window 1 ][ Window 2 ][ Window 3 ]
- 固定サイズ、重複なし
- 例: 5分ごとに集計
Sliding Window(スライディングウィンドウ)
================================
時間: ──|──1──2──3──4──5──6──7──8──|──
ウィンドウ: [ Window 1 ]
[ Window 2 ]
[ Window 3 ]
- 固定サイズ、スライド間隔で移動
- 例: 10分ウィンドウ、5分スライド
Session Window(セッションウィンドウ)
================================
時間: ──1─2──3─────────5─6──7─────────9──
ウィンドウ: [Session 1] [Session 2] [S3]
gap gap
- アクティビティベース、可変サイズ
- gap時間中にイベントがなければウィンドウ終了
// ウィンドウ例
// Tumbling Window
orders.keyBy(Order::getRegion)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.sum("amount");
// Sliding Window
orders.keyBy(Order::getRegion)
.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
.sum("amount");
// Session Window
orders.keyBy(Order::getCustomerId)
.window(EventTimeSessionWindows.withGap(Time.minutes(30)))
.aggregate(new SessionAggregator());
ウォーターマークと遅延データ
Event Time vs Processing Time
================================
イベント発生時間(Event Time): t=10 t=12 t=11 t=15 t=13
処理時間(Processing Time): T=20 T=21 T=22 T=23 T=24
イベントが順序通りに到着しない!
→ ウォーターマークで「ここまでのイベントはすべて到着したはず」を表示
ウォーターマーク動作
================================
イベント: [t=10] [t=12] [t=11] [t=15] [t=13]
ウォーターマーク: W(10) W(12) W(12) W(15) W(15)
(maxOutOfOrderness = 5秒の場合)
実際のウォーターマーク: W(5) W(7) W(7) W(10) W(10)
ウィンドウ [0, 10): ウォーターマークが10を超えると発火(fire)
// ウォーターマーク戦略
WatermarkStrategy<Order> strategy = WatermarkStrategy
// 最大10秒の遅延を許容
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((order, timestamp) -> order.getCreatedAt())
// 5分以上イベントがなければウォーターマークを進行
.withIdleness(Duration.ofMinutes(5));
// 遅延データ処理
OutputTag<Order> lateOutputTag = new OutputTag<>("late-orders") {};
SingleOutputStreamOperator<OrderStats> stats = orders
.keyBy(Order::getRegion)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(1)) // 1分追加待機
.sideOutputLateData(lateOutputTag) // その後はサイドアウトプット
.aggregate(new OrderStatsAggregator());
// 遅延データを別途処理
DataStream<Order> lateOrders = stats.getSideOutput(lateOutputTag);
lateOrders.addSink(new LateOrderAlertSink());
状態管理とチェックポイント
// Keyed State例
public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {
private ValueState<Boolean> flagState;
private ListState<Transaction> recentTransactions;
private MapState<String, Integer> merchantCounts;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Boolean> flagDescriptor =
new ValueStateDescriptor<>("flag", Boolean.class);
flagState = getRuntimeContext().getState(flagDescriptor);
ListStateDescriptor<Transaction> listDescriptor =
new ListStateDescriptor<>("recent-txns", Transaction.class);
recentTransactions = getRuntimeContext().getListState(listDescriptor);
MapStateDescriptor<String, Integer> mapDescriptor =
new MapStateDescriptor<>("merchant-counts", String.class, Integer.class);
merchantCounts = getRuntimeContext().getMapState(mapDescriptor);
}
@Override
public void processElement(Transaction txn, Context ctx, Collector<Alert> out) {
Boolean flag = flagState.value();
if (flag != null && txn.getAmount() > 10000) {
out.collect(new Alert(txn.getCustomerId(), "HIGH_AMOUNT_AFTER_FLAG"));
}
if (txn.getAmount() < 1.0) {
flagState.update(true);
ctx.timerService().registerEventTimeTimer(
txn.getTimestamp() + 60000);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
flagState.clear();
}
}
チェックポイント動作
================================
[Source] ──▶ [Map] ──▶ [KeyBy] ──▶ [Window] ──▶ [Sink]
1. JobManager: チェックポイントバリアを注入
Source ──▶ |barrier| ──▶ Map ──▶ KeyBy ──▶ Window ──▶ Sink
2. バリアがオペレーターを通過する時に状態スナップショット
Source(snapshot) ──▶ Map(snapshot) ──▶ ...
3. すべてのオペレータースナップショット完了 → チェックポイント成功
4. 障害発生時: 最後の成功チェックポイントから復元
// チェックポイント設定
env.enableCheckpointing(60000); // 60秒間隔
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(120000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// State Backend設定
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("s3://checkpoints/flink/");
7. Kafka Streams vs Apache Flink
比較表
| 特性 | Kafka Streams | Apache Flink |
|---|---|---|
| デプロイ方式 | ライブラリ(JVMアプリに組み込み) | 分散クラスター |
| 入力ソース | Kafkaのみ | Kafka, Kinesis, ファイルなど |
| Exactly-Once | Kafka内部のみ | 外部システムを含む |
| ウィンドウ | Tumbling, Sliding, Session | + Global, Custom |
| 状態管理 | RocksDB(ローカル) | RocksDB + 分散スナップショット |
| SQLサポート | KSQL(別途) | Flink SQL(内蔵) |
| スループット | 中-大 | 大-超大 |
| 運用複雑度 | 低(アプリデプロイ) | 高(クラスター管理) |
| 適合するケース | Kafkaエコシステム内の処理 | 複雑なストリーム処理 |
選択基準
use_kafka_streams_when:
- "KafkaからKafkaへの単純な処理"
- "別途クラスター運用が負担の場合"
- "マイクロサービス内部に組み込み"
- "中程度のスループットで十分な場合"
- "チームがKafkaエコシステムに精通している場合"
use_flink_when:
- "複雑なイベント処理 (CEP)"
- "多様なソース/シンクの接続"
- "大規模な状態管理が必要"
- "高度なウィンドウ関数が必要"
- "SQLベースのストリーム処理"
- "超大規模スループットが必要"
8. Kafka Connect
Source/Sinkコネクター
// Debezium MySQL Source Connector
{
"name": "mysql-source-orders",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql-primary",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz-password",
"database.server.id": "1",
"topic.prefix": "cdc",
"database.include.list": "order_service",
"table.include.list": "order_service.orders,order_service.order_items",
"schema.history.internal.kafka.bootstrap.servers": "broker:9092",
"schema.history.internal.kafka.topic": "schema-changes.orders"
}
}
Debezium CDCフロー
================================
[MySQL] ──binlog──▶ [Debezium Connector] ──▶ [Kafka Topic]
│
┌───────────────┤
▼ ▼
[Flink Job] [ES Sink Connector]
│ │
▼ ▼
[Kafka Topic] [Elasticsearch]
CDCイベント構造:
- before: 変更前のレコード (UPDATE/DELETE)
- after: 変更後のレコード (INSERT/UPDATE)
- source: ソースDBメタデータ
- op: 操作タイプ (c=create, u=update, d=delete, r=read)
9. パフォーマンスチューニング
Producerチューニング
# Producer性能最適化
batch.size=65536 # 64KB(デフォルト16KB)
linger.ms=10 # 10ms待機(バッチ効率)
buffer.memory=67108864 # 64MB
compression.type=lz4 # 圧縮(ネットワーク/ディスク節約)
acks=all # 信頼性
max.in.flight.requests.per.connection=5 # パイプライニング
Consumerチューニング
# Consumer性能最適化
fetch.min.bytes=1048576 # 1MB(バッチ効率)
fetch.max.wait.ms=500 # 最大500ms待機
max.poll.records=500 # 一度に取得するレコード数
max.partition.fetch.bytes=1048576 # パーティションあたり1MB
session.timeout.ms=30000 # セッションタイムアウト
heartbeat.interval.ms=10000 # ハートビート間隔
max.poll.interval.ms=300000 # ポール間隔最大5分
Brokerチューニング
# Broker性能最適化
num.partitions=12 # デフォルトパーティション数
default.replication.factor=3 # レプリケーションファクター
num.io.threads=8 # I/Oスレッド
num.network.threads=3 # ネットワークスレッド
num.replica.fetchers=4 # レプリカフェッチャー
log.segment.bytes=1073741824 # 1GBセグメント
log.retention.hours=168 # 7日保持
パーティション数の決定
パーティション数の決定式
================================
必要スループット: 100MB/s
単一パーティションスループット: ~10MB/s (Producer)
~5MB/s (Consumer)
Producer観点: 100MB/s / 10MB/s = 10パーティション
Consumer観点: 100MB/s / 5MB/s = 20パーティション
→ 最低20パーティション必要
追加考慮事項:
- Consumerインスタンス数に合わせる(パーティション数 >= Consumer数)
- パーティション数の増加は可能だが減少は不可
- 多すぎるパーティション: メタデータオーバーヘッド、リバランス遅延
- 推奨: トピックあたり6-50個
10. モニタリング
JMXメトリクス
# 主要Kafkaメトリクス
broker_metrics:
- name: "UnderReplicatedPartitions"
description: "ISRから外れたパーティション数"
alert_threshold: "> 0"
severity: critical
- name: "ActiveControllerCount"
description: "アクティブコントローラー数"
alert_threshold: "!= 1"
severity: critical
- name: "OfflinePartitionsCount"
description: "オフラインパーティション数"
alert_threshold: "> 0"
severity: critical
consumer_metrics:
- name: "records-lag-max"
description: "最大Consumer Lag"
alert_threshold: "> 10000"
severity: warning
- name: "records-consumed-rate"
description: "秒あたり消費レコード数"
- name: "commit-latency-avg"
description: "平均コミットレイテンシ"
Consumer Lagモニタリング
Consumer Lag計算
================================
Partition 0:
Log End Offset (LEO): 1000 (最新メッセージ)
Consumer Offset: 850 (最後のコミット)
Lag: 1000 - 850 = 150
Partition 1:
LEO: 2000
Consumer Offset: 1950
Lag: 50
Total Lag = 150 + 50 = 200 messages
# Burrow (LinkedInのConsumer Lagモニタリング)
burrow_config:
general:
pidfile: "/var/run/burrow.pid"
stdout-logfile: "/var/log/burrow.log"
cluster:
kafka-prod:
class-name: kafka
servers: ["broker1:9092", "broker2:9092", "broker3:9092"]
topic-refresh: 120
offset-refresh: 30
notifier:
slack:
class-name: http
url-open: "https://hooks.slack.com/services/xxx/yyy/zzz"
send-close: true
11. クイズ
Q1. Kafkaで同じキーのメッセージが常に同じパーティションに入る理由は?
デフォルトパーティショナーがmurmur2ハッシュ関数を使用して、キーのハッシュ値をパーティション数で割った余りでパーティションを決定するためです。
partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
同じキーは常に同じハッシュ値を生成するため、同じパーティションに割り当てられます。これにより同じキーのメッセージの順序が保証されます。
ただし、パーティション数が変更されるとキーとパーティションのマッピングが変わる可能性があります。
Q2. Exactly-Onceセマンティクスを達成するために必要な3つの要素は?
-
冪等プロデューサー(Idempotent Producer):
enable.idempotence=trueでProducerIDとSequence Numberを使用して重複送信を防止。 -
トランザクショナルプロデューサー(Transactional Producer):
transactional.idを設定し、beginTransaction()、commitTransaction()で複数メッセージをアトミックに送信。 -
read_committed分離レベル: Consumerで
isolation.level=read_committedを設定し、コミットされたトランザクションのメッセージのみを読み取り。
この3つが結合されてConsume-Transform-Produceパターンで正確に1回の処理を保証します。
Q3. AvroのBACKWARD互換性で新フィールドを追加する際に必ず必要なものは?
default値が必ず必要です。
BACKWARD互換性は新スキーマ(v2)で旧データ(v1で書かれた)を読めなければなりません。旧データには新フィールドがないため、新フィールドにdefault値がないとデシリアライズ時にエラーが発生します。
例: "name": "status", "type": "string", "default": "CREATED"
オプショナルフィールドは["null", "string"]のunion型に"default": nullを使用します。
Q4. Flinkのウォーターマークが解決する問題は何ですか?
ウォーターマークはイベント時間(Event Time)ベースの処理における遅延データ問題を解決します。
分散システムでは、イベントが発生順序と異なる順序で到着することがあります。ウォーターマークは、特定の時点までのイベントがすべて到着したであろうという推定を提供します。
W(t) = 「時間t以前のすべてのイベントが到着したと推定」
ウォーターマークがウィンドウの終了時間を超えると、そのウィンドウの計算結果を出力します。allowedLatenessを設定すると、ウォーターマーク後も遅延データを一定時間受け入れることができます。
Q5. Kafka StreamsとApache Flinkのどちらを選ぶべき基準は?
Kafka Streams選択基準:
- KafkaからKafkaへの単純な処理
- 別途クラスター運用が負担の場合
- マイクロサービスに組み込む場合
- チームがKafkaエコシステムに精通している場合
Apache Flink選択基準:
- 複雑なイベント処理(CEP)が必要な場合
- Kafka以外の多様なソース/シンク接続が必要な場合
- 大規模な状態管理が必要な場合
- SQLベースのストリーム処理が必要な場合
- 超大規模スループットが必要な場合
核心的な違い: Kafka Streamsはライブラリ(JVMアプリに組み込み)、Flinkは分散クラスターです。
参考資料
- Apache Kafka Documentation. (2025). https://kafka.apache.org/documentation/
- Narkhede, N., Shapira, G., Palino, T. (2021). Kafka: The Definitive Guide, 2nd ed. O'Reilly Media.
- Apache Flink Documentation. (2025). https://flink.apache.org/docs/
- Confluent Schema Registry Documentation. (2025). https://docs.confluent.io/platform/current/schema-registry/
- Hueske, F., Kalavri, V. (2019). Stream Processing with Apache Flink. O'Reilly Media.
- KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum. https://cwiki.apache.org/confluence/display/KAFKA/KIP-500
- Debezium Documentation. (2025). https://debezium.io/documentation/
- Kleppmann, M. (2017). Designing Data-Intensive Applications. O'Reilly Media.
- Confluent Blog. (2024). "Exactly-Once Semantics in Apache Kafka."
- Apache Avro Specification. (2025). https://avro.apache.org/docs/
- Burrow - Kafka Consumer Lag Checking. https://github.com/linkedin/Burrow
- Confluent Blog. (2025). "Kafka Performance Tuning Best Practices."
- Flink Forward Conference Talks. (2024). https://www.flink-forward.org/
- KIP-848: The Next Generation Consumer Rebalance Protocol. Apache Kafka.
현재 단락 (1/822)
Event Streamingは現代(げんだい)の分散(ぶんさん)システムの核心(かくしん)インフラです。Apache Kafkaを中心としたイベントストリーミングエコシステムは、単純(たんじゅん)な...