TL;DR
- CDC = データ同期のゲームチェンジャー: ポーリングの代わりに変更イベントストリーム。リアルタイムかつ効率的
- Debezium が事実上の標準: PostgreSQL, MySQL, MongoDB, SQL Server などをサポート
- WAL/Binlog ベース: DB 内部ログを読んで変更を捕捉。非侵襲的
- Outbox Pattern: dual-write 問題を解決。マイクロサービスイベントの標準
- ユースケース: DW 同期、検索インデックス更新、キャッシュ無効化、イベント駆動アーキテクチャ
1. CDC が必要な理由
1.1 従来のデータ同期
ポーリング:
while True:
new_records = db.query("SELECT * FROM users WHERE updated_at > ?", last_sync)
sync_to_warehouse(new_records)
last_sync = now()
sleep(60)
問題点:
- 遅延: 1分単位、リアルタイムではない
- 負荷: 毎回 DB にクエリ
- DELETE 検知不可:
WHERE updated_at > ?では削除は捕捉できない - 漏れ: トランザクションの時間差で抜ける
- スケール不可: データ増加でクエリが遅くなる
1.2 CDC が約束するもの
[DB] -> [WAL/Binlog] -> [CDC tool] -> [Kafka] -> [Consumers]
^
リアルタイム、非侵襲的
利点:
- リアルタイム: ミリ秒単位の latency
- DB 負荷ゼロ: ログを読むだけ
- 全ての変更: INSERT/UPDATE/DELETE を捕捉
- 順序保証: トランザクション順
- スケーラブル: データ量に依存しない
1.3 ユースケース
| ユースケース | 説明 |
|---|---|
| データウェアハウス同期 | OLTP から Snowflake/BigQuery へリアルタイム |
| 検索インデックス更新 | DB 変更を Elasticsearch に即反映 |
| キャッシュ無効化 | DB 更新で Redis キャッシュを無効化 |
| イベント駆動 | DB 変更をドメインイベントに |
| マイクロサービス同期 | あるサービスのデータを他が参照 |
| DB マイグレーション | 無停止での旧から新 DB への移行 |
| 監査ログ | 全ての変更履歴を保持 |
2. CDC の動作原理
2.1 PostgreSQL の WAL
WAL (Write-Ahead Log): 全ての変更をディスクに記録するログ。障害復旧の基盤。
INSERT INTO users (name) VALUES ('Alice');
-> [WAL]: tx_id=123, op=INSERT, table=users, data={name:'Alice'}
-> [Disk page write]
CDC はこの WAL を読んで変更を捕捉する。
2.2 Logical vs Physical Replication
Physical Replication (Streaming):
- ディスクページ単位の複製
- Standby は Primary と byte-identical
- 利点: 高速、正確
- 欠点: 同一バージョン、同一スキーマ必須
Logical Replication:
- 論理的な変更 (INSERT/UPDATE/DELETE) を複製
- 異なるスキーマ・バージョンも可
- 利点: 柔軟、CDC に適する
- 欠点: 若干のオーバーヘッド
2.3 PostgreSQL Logical Replication 設定
# postgresql.conf
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10
-- ユーザ作成
CREATE USER cdc_user REPLICATION LOGIN PASSWORD 'secret';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;
-- Publication 作成 (どのテーブルを複製するか)
CREATE PUBLICATION my_pub FOR TABLE users, orders;
-- 全テーブル対象
CREATE PUBLICATION my_pub FOR ALL TABLES;
-- Replication slot 作成 (変更を保存する場所)
SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput');
2.4 MySQL の Binlog
MySQL は Binlog を使う:
# my.cnf
log_bin = mysql-bin
binlog_format = ROW # CDC には ROW 必須
binlog_row_image = FULL
server_id = 1
expire_logs_days = 7
binlog_format:
STATEMENT: SQL 文自体 (再生時に非決定的)ROW: 行単位の変更 (CDC に適する)MIXED: 両方
2.5 MongoDB Change Streams
const changeStream = db.collection('users').watch()
changeStream.on('change', (change) => {
console.log(change)
// { operationType: 'insert', fullDocument: {...}, ... }
})
MongoDB は oplog ベース。
3. Debezium — CDC の標準
3.1 Debezium とは
Debezium = オープンソースの CDC プラットフォーム。Red Hat 発。
アーキテクチャ:
[PostgreSQL] -> [Debezium connector] -> [Kafka Connect] -> [Kafka topics] -> [Consumers]
主要機能:
- 幅広い DB 対応 (PG, MySQL, MongoDB, Oracle, SQL Server, Cassandra)
- Kafka Connect 上に構築
- Exactly-once 配信 (Kafka の保証)
- Schema 変更検知
- フィルタ、変換
3.2 Debezium インストール (Docker Compose)
version: '3.7'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on: [zookeeper]
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
connect:
image: debezium/connect:2.5
depends_on: [kafka, postgres]
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: connect_configs
OFFSET_STORAGE_TOPIC: connect_offsets
STATUS_STORAGE_TOPIC: connect_statuses
postgres:
image: debezium/example-postgres:2.5
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
3.3 PostgreSQL Connector 登録
curl -X POST http://connect:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "cdc_user",
"database.password": "secret",
"database.dbname": "mydb",
"database.server.name": "mydb",
"table.include.list": "public.users,public.orders",
"plugin.name": "pgoutput",
"publication.name": "my_pub",
"slot.name": "my_slot"
}
}'
これで Debezium が PostgreSQL の変更を自動的に Kafka に発行する。
3.4 Kafka トピック構造
Debezium は テーブル毎にトピック を作成:
mydb.public.users <- users テーブルの変更
mydb.public.orders <- orders テーブルの変更
メッセージ形式:
{
"before": null,
"after": {
"id": 123,
"name": "Alice",
"email": "alice@example.com"
},
"source": {
"version": "2.5",
"connector": "postgresql",
"name": "mydb",
"ts_ms": 1681545600000,
"snapshot": "false",
"db": "mydb",
"schema": "public",
"table": "users",
"txId": 12345,
"lsn": 23456789
},
"op": "c",
"ts_ms": 1681545600000
}
op の値: c=create, u=update, d=delete, r=read (Snapshot)。
3.5 Kafka Consumer
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'mydb.public.users',
bootstrap_servers=['kafka:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
for message in consumer:
event = message.value
op = event['op']
if op == 'c':
print(f"new user: {event['after']}")
elif op == 'u':
print(f"update: {event['before']} -> {event['after']}")
elif op == 'd':
print(f"delete: {event['before']}")
4. Outbox Pattern — Dual-Write 問題の解決
4.1 Dual-Write 問題
シナリオ: 注文作成時に DB 保存 + Kafka イベント発行。
def create_order(order):
db.save(order) # 1
kafka.send("orders", order) # 2
問題: 1 と 2 の間でクラッシュしたら?
- 1 のみ成功: DB にはあるがイベント未送信 → 下流で不整合
- 2 のみ成功: イベントは送ったが DB に無い → ゴーストイベント
dual-write 問題: 2つのシステムに対する原子的書き込みは不可能。
4.2 Outbox Pattern
要点: DB トランザクション内で outbox テーブルにもイベントを保存する。
CREATE TABLE outbox (
id UUID PRIMARY KEY,
aggregate_type VARCHAR(255),
aggregate_id VARCHAR(255),
event_type VARCHAR(255),
payload JSONB,
created_at TIMESTAMP DEFAULT NOW()
);
def create_order(order):
with db.transaction():
db.save(order)
db.save(OutboxEvent(
aggregate_type='Order',
aggregate_id=order.id,
event_type='OrderCreated',
payload=order.to_json()
))
原子性保証: 2つの INSERT が同一トランザクション、共に成功か失敗。
4.3 Outbox -> Kafka
方法 1: Polling Publisher (シンプル)
while True:
events = db.query("""
SELECT * FROM outbox
WHERE published = false
ORDER BY created_at
LIMIT 100
""")
for event in events:
kafka.send(event.aggregate_type, event.payload)
db.execute("UPDATE outbox SET published = true WHERE id = ?", event.id)
sleep(1)
欠点: ポーリングのオーバーヘッド、多少の遅延。
方法 2: CDC (推奨)
Debezium が outbox テーブルを監視し、自動で Kafka に発行。
curl -X POST http://connect:8083/connectors \
-d '{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"table.include.list": "public.outbox",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.payload": "payload"
}
}'
効果: ポーリングコード不要、ほぼリアルタイム。
5. データウェアハウス同期
5.1 シナリオ
OLTP (PostgreSQL) -> DW (Snowflake/BigQuery) のリアルタイム同期。
5.2 従来 ETL の限界
[Extract] -> [Transform] -> [Load]
(バッチ・日次) (遅い) (大容量)
問題: 24時間遅延、DB への大負荷、全データ再処理。
5.3 CDC ベースの ELT
[PostgreSQL] -> [Debezium] -> [Kafka] -> [Stream Processor] -> [Snowflake]
利点: リアルタイム (秒単位)、DB 負荷最小 (WAL だけ読む)、変更のみ処理。
5.4 ツール
| ツール | 特徴 |
|---|---|
| Debezium + Kafka Connect | OSS、セルフホスト |
| Fivetran | マネージド、高価、簡単 |
| Airbyte | OSS + マネージド |
| Stitch | シンプル、マネージド |
| AWS DMS | AWS 統合 |
| Striim | エンタープライズ |
| Hevo | ノーコード |
5.5 Schema Evolution
問題: ソース DB のカラム追加/削除にどう対応するか。
Debezium の挙動:
- 新カラム: 自動検知、イベントに含める
- カラム削除: イベント発行 (DELETE スタイル)
- 型変更: 新スキーマで発行
Schema Registry (Confluent): 全スキーマバージョン保存、互換性検証、コンシューマが安全に読める。
6. 検索インデックス同期
6.1 シナリオ
PostgreSQL -> Elasticsearch のリアルタイムインデックス更新。
6.2 アーキテクチャ
[PostgreSQL] -> [Debezium] -> [Kafka] -> [Elasticsearch Sink Connector] -> [Elasticsearch]
6.3 Elasticsearch Sink Connector
curl -X POST http://connect:8083/connectors \
-d '{
"name": "es-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "mydb.public.products",
"connection.url": "http://elasticsearch:9200",
"type.name": "_doc",
"key.ignore": "false",
"schema.ignore": "true",
"behavior.on.malformed.documents": "warn"
}
}'
6.4 効果
- 新商品追加 -> 1-2秒で検索可能
- 価格更新 -> 検索結果が自動更新
- 削除 -> インデックスから即時除去
従来のバッチ reindex は時間単位・数時間かかる。CDC ならリアルタイム。
7. キャッシュ無効化
7.1 問題
def get_user(user_id):
user = cache.get(f"user:{user_id}")
if user is None:
user = db.query(...)
cache.set(f"user:{user_id}", user, ttl=3600)
return user
def update_user(user_id, data):
db.execute("UPDATE users SET ... WHERE id=?", user_id)
cache.delete(f"user:{user_id}") # 忘れたら? 他サービスが更新したら?
解決策: CDC による自動無効化。
7.2 CDC ベース
consumer = KafkaConsumer('mydb.public.users')
for message in consumer:
event = message.value
user_id = event['after']['id'] if event['op'] != 'd' else event['before']['id']
cache.delete(f"user:{user_id}")
利点: コード変更不要、全変更を自動捕捉、中央管理。
8. 無停止マイグレーション
8.1 シナリオ
レガシー MySQL -> 新規 PostgreSQL へのマイグレーション。
8.2 手順
Step 1: CDC 設定
[MySQL (現行)] -> [Debezium] -> [Kafka] -> [PostgreSQL (新)]
Step 2: 初期 Snapshot — Debezium が MySQL の全データを PG に一括コピー。
Step 3: リアルタイム同期 — 以降の変更のみ PG に反映、両 DB がほぼリアルタイムで一致。
Step 4: 読み込みトラフィック切替 — 一部読み込みを PG へ (検証)、徐々に100%へ。
Step 5: 書き込みトラフィック切替 — CDC 方向反転 (PG -> MySQL) でロールバック対応、書き込みを PG に、期間経過後 MySQL 廃止。
8.3 利点
- 無停止: ユーザへの影響なし
- ロールバック可能: 問題時は即復帰
- 段階的: リスク分散
- 検証: 両 DB 比較で整合性確認
9. よくある落とし穴と対策
9.1 Replication Slot の蓄積
使われない PostgreSQL replication slot は WAL が無限に蓄積 してディスクが溢れる。
-- アクティブなスロット確認
SELECT slot_name, active, restart_lsn
FROM pg_replication_slots;
-- 使ってないスロット削除
SELECT pg_drop_replication_slot('unused_slot');
監視必須: WAL サイズのアラート。
9.2 大規模トランザクション
数百万行を1トランザクションで処理すると Debezium のメモリが爆発。
対策: トランザクション分割、max.queue.size チューニング、Heartbeat 活用。
9.3 Schema 変更
ALTER TABLE が CDC を壊すことがある。
予防: 追加のみ (additive)、DDL を小さな単位で、Debezium の schema history を使う。
9.4 Exactly-Once Delivery
Debezium は at-least-once (重複あり得る)。
対策: コンシューマ側の冪等性確保、Kafka のトランザクション API、メッセージ ID 追跡。
9.5 モニタリング
# Connector 状態
curl http://connect:8083/connectors/postgres-connector/status
# メトリクス (JMX)
debezium_metrics_QueueRemainingCapacity
debezium_metrics_NumberOfEventsFiltered
主要指標: Lag (ソース DB との差分)、Queue size、Error rate。
10. CDC vs 代替
10.1 CDC vs Polling
| CDC | Polling | |
|---|---|---|
| 遅延 | リアルタイム (ms) | 分/時間 |
| DB 負荷 | 非常に低い | 高い |
| DELETE 検知 | あり | なし |
| 複雑度 | 高 | 低 |
| インフラ | Kafka 等が必要 | 不要 |
10.2 CDC vs Triggers
Triggers: DB トリガで別テーブルに書く。シンプルだが DB 性能に影響、トランザクション負荷。
CDC: 外部ツールが WAL を読む。非侵襲的だが別インフラが必要。
10.3 CDC vs アプリケーションイベント
App-level: コードが直接イベント発行。ビジネスロジックと統合できるが dual-write 問題 (Outbox で解決)。
CDC: DB から直接。安全・完全だが、データモデルレベルのみ。
組み合わせ: Outbox + CDC = 両方の良いとこ取り。
クイズ
1. Logical Replication と Physical Replication の違い?
答え: Physical Replication はディスクページ単位の複製で、Standby は Primary と byte-identical。高速だが同一バージョン・同一スキーマ必須、主に HA 用途。Logical Replication は論理変更 (INSERT/UPDATE/DELETE) の複製で、異なるスキーマ・バージョン可能。CDC に適し、PostgreSQL 10+ でサポート。CDC は logical replication ベースで、wal_level = logical で有効化。
2. Outbox Pattern はどのように dual-write 問題を解決するか?
答え: DB トランザクション内で outbox テーブルにもイベントを一緒に保存する。2つの INSERT が同じトランザクション → 原子的に一緒に成功または失敗。その後、別プロセス (Polling または CDC) が outbox から Kafka へイベントを転送。結果: DB とメッセージブローカ間の原子性を保証。Debezium + Outbox Event Router transformation が標準実装。マイクロサービスでほぼ必須のパターン。
3. Debezium が事実上の標準になった理由?
答え: (1) 広い DB サポート — PostgreSQL, MySQL, MongoDB, Oracle, SQL Server, Cassandra、(2) Kafka Connect 上に構築 — 実績あるインフラ、(3) オープンソース — 無料、(4) 機能充実 — Schema 変更検知、フィルタ、transformation、(5) Red Hat バックアップ — 安定した開発。代替 (Maxwell, Canal) は特定 DB のみ、または機能不足。クラウドマネージド (Fivetran, Airbyte) は高いが運用負担は少ない。
4. CDC ベースのキャッシュ無効化の利点?
答え: (1) 自動 — cache.delete() を忘れない、(2) 完全 — DB の全変更 (他サービスも含む) を捕捉、(3) 中央管理 — 無効化ロジックが一箇所、(4) 信頼性 — DB トランザクション commit 時点で発火。欠点: 若干の遅延 (通常1秒以内)、別インフラ必要。複数のマイクロサービスが同じ DB を共有 する場合に特に有効。
5. 無停止 DB マイグレーションにおける CDC の役割?
答え: (1) 初期 Snapshot: Debezium が全データを新 DB に一括コピー。(2) リアルタイム同期: WAL を読んで新 DB に即反映、両 DB がほぼリアルタイムに一致。(3) 段階的トラフィック切替: 読み込みを徐々に新 DB へ、次に書き込み。(4) ロールバック可能: 問題時に CDC 方向反転。無停止 で数ヶ月かけた安全なマイグレーションが可能。Stripe や Airbnb が PostgreSQL マイグレーションでこの方式を採用。
参考資料
- Debezium — 公式ドキュメント
- PostgreSQL Logical Replication
- Kafka Connect
- Debezium Outbox Pattern
- Designing Data-Intensive Applications — Martin Kleppmann
- Confluent CDC
- Stripe's Live Migration
- GitHub Engineering: MySQL Schema Changes
- Airbyte — Debezium 代替
- Fivetran — マネージド CDC
- Maxwell's Daemon — MySQL 専用 CDC
현재 단락 (1/357)
- **CDC = データ同期のゲームチェンジャー**: ポーリングの代わりに変更イベントストリーム。リアルタイムかつ効率的