Skip to content

필사 모드: Outboxパターンと CDCで実現するマイクロサービスデータ同期:Debezium実践ガイド

日本語
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

はじめに:Dual Write問題

マイクロサービスアーキテクチャで最も頻繁に直面する問題の一つは、**データベースとメッセージブローカーに同時にデータを書き込まなければならない状況**です。注文サービスが注文を作成すると同時に、在庫サービスにイベントを発行しなければならないケースを考えてみましょう。単純に実装すると、次のようなコードになります。

// 危険な Dual Write パターン

@Transactional

public Order createOrder(OrderRequest request) {

Order order = orderRepository.save(new Order(request)); // Step 1: DB保存

// Step 2: メッセージブローカーにイベント発行

kafkaTemplate.send("order-events", new OrderCreatedEvent(order));

return order;

}

このコードは致命的な問題を抱えています。**DB保存は成功したがKafka発行が失敗**した場合、注文は存在するのに他のサービスはそれを知ることができません。逆に**Kafka発行は成功したがDBトランザクションがロールバック**された場合、存在しない注文に対するイベントが伝播されます。これがまさに**Dual Write問題**です。

分散トランザクション(2PC)で解決することもできますが、Kafkaは従来のXAトランザクションに参加できず、2PC自体が性能ボトルネックと可用性低下を引き起こします。この問題を根本的に解決するパターンが**Outboxパターン**であり、これを効率的に実現する技術が**CDC(Change Data Capture)**です。

1. Outboxパターンアーキテクチャ

1.1 核心アイデア

Outboxパターンの核心はシンプルです。**イベントをメッセージブローカーに直接発行するのではなく、ビジネスデータと同一トランザクションでOutboxテーブルに記録する**ということです。こうすることで、DBのACIDトランザクションが両方の操作のアトミック性を保証します。その後、別のプロセスがOutboxテーブルを読み取ってメッセージブローカーに転送します。

[サービスコード]

├── ビジネスデータ INSERT ─────┐

│ │ 同一DBトランザクション

└── Outboxテーブル INSERT ───────┘

[Outbox Relay]

(CDC or Polling)

[メッセージブローカー]

(Kafka, RabbitMQ)

[他のマイクロサービス]

この方式の利点は明確です。ビジネスデータとイベントが一つのトランザクションにまとめられるため、両方とも保存されるか、両方とも保存されないかのどちらかになります。最低1回(at-least-once)の配信が保証され、コンシューマ側で冪等性を実装すれば、実質的に1回(effectively-once)のセマンティクスも実現できます。

1.2 Outboxパターンの2つの実装方式

Outboxテーブルの変更内容をメッセージブローカーに転送する方式は、大きく2つあります。

1. **Polling方式**:定期的にOutboxテーブルをクエリして未発行のイベントを取得します

2. **CDC方式**:データベースのトランザクションログ(WAL/binlog)を監視して変更内容をキャプチャします

2. Outboxテーブル設計

2.1 基本スキーマ

-- PostgreSQL Outboxテーブル DDL

CREATE TABLE outbox_events (

id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

aggregate_type VARCHAR(255) NOT NULL, -- イベント所属ドメイン(例:'Order')

aggregate_id VARCHAR(255) NOT NULL, -- ドメインエンティティID(例:注文ID)

event_type VARCHAR(255) NOT NULL, -- イベントタイプ(例:'OrderCreated')

payload JSONB NOT NULL, -- イベント本文

metadata JSONB DEFAULT '{}', -- 追加メタデータ

created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),

published_at TIMESTAMP WITH TIME ZONE,-- Polling方式用:発行完了時刻

retry_count INT DEFAULT 0, -- Polling方式用:リトライ回数

status VARCHAR(20) DEFAULT 'PENDING' -- PENDING, PUBLISHED, FAILED

);

-- Polling方式で未発行イベント検索用のインデックス

CREATE INDEX idx_outbox_status_created ON outbox_events(status, created_at)

WHERE status = 'PENDING';

-- CDC方式ではstatus/published_atカラムは不要であり、

-- イベント発行後に行を削除する方式を使用できます

2.2 設計時の重要な考慮事項

**aggregate_idをKafkaパーティションキーとして使用**:同じaggregate_idを持つイベントが同じパーティションに入るため、そのエンティティに対するイベントの順序が保証されます。例えば、注文123に対する'OrderCreated' -> 'OrderPaid' -> 'OrderShipped'イベントが順序通りに処理されます。

**payloadに必要な情報をすべて含める**:コンシューマがイベントだけで処理を完了できるよう、payloadに十分な情報を含めます。コンシューマがプロデューサのDBを直接参照することは、サービス間の結合度を高めるため避けるべきです。

**テーブルの肥大化防止**:CDC方式では、イベントをキャプチャした後、定期的に古い行を削除します。Polling方式では、発行完了したイベントを一定期間後にアーカイブまたは削除します。

2.3 MySQL用トリガーベースOutbox(代替案)

-- MySQLでトリガーを利用した自動Outbox記録

DELIMITER //

CREATE TRIGGER after_order_insert

AFTER INSERT ON orders

FOR EACH ROW

BEGIN

INSERT INTO outbox_events (

id, aggregate_type, aggregate_id, event_type, payload, created_at

) VALUES (

UUID(),

'Order',

NEW.order_id,

'OrderCreated',

JSON_OBJECT(

'orderId', NEW.order_id,

'userId', NEW.user_id,

'totalAmount', NEW.total_amount,

'status', NEW.status,

'createdAt', NEW.created_at

),

NOW()

);

END //

CREATE TRIGGER after_order_update

AFTER UPDATE ON orders

FOR EACH ROW

BEGIN

IF OLD.status != NEW.status THEN

INSERT INTO outbox_events (

id, aggregate_type, aggregate_id, event_type, payload, created_at

) VALUES (

UUID(),

'Order',

NEW.order_id,

CONCAT('OrderStatus', NEW.status),

JSON_OBJECT(

'orderId', NEW.order_id,

'previousStatus', OLD.status,

'newStatus', NEW.status,

'updatedAt', NEW.updated_at

),

NOW()

);

END IF;

END //

DELIMITER ;

> トリガーベースのアプローチはアプリケーションコードを修正しなくてもよいという利点がありますが、トリガーの実行コストがメイントランザクションに含まれるため、性能に影響を与える可能性があります。また、複雑なビジネスロジックをトリガーに入れることは難しく、デバッグも困難であるため、ほとんどの場合はアプリケーションレベルで明示的にOutbox行を挿入することを推奨します。

3. CDCの概念と動作原理

3.1 CDCとは何か

CDC(Change Data Capture)は、データベースの変更内容(INSERT、UPDATE、DELETE)をリアルタイムでキャプチャして外部システムに転送する技術です。CDCの核心は**データベースのトランザクションログを直接読み取ること**です。

- **PostgreSQL**:WAL(Write-Ahead Log)の論理レプリケーション(logical replication)スロットを使用

- **MySQL**:binlog(binary log)を読み取って変更内容をキャプチャ

- **SQL Server**:CDC機能が内蔵されており、変更テーブルを自動生成

- **MongoDB**:Change Streamsを通じてoplogの変更内容をストリーミング

3.2 CDCの利点

ログベースCDCはPolling方式と比較して、いくつかの利点があります。

1. **極めて低いレイテンシ**:トランザクションコミット後、ミリ秒単位で変更内容をキャプチャ

2. **DB負荷の最小化**:インデックスベースのクエリではなくログストリームを読み取るため、DBへの追加負荷がほとんどない

3. **DELETEのキャプチャが可能**:Polling方式では削除された行を検知することが困難ですが、ログにはDELETEイベントも記録される

4. **すべての変更を捕捉**:中間状態の変更も見逃さない(Pollingはインターバル間の変更を見逃す可能性がある)

5. **スキーマ変更の追跡**:テーブル構造の変更もログに記録されるため、スキーマの進化を処理可能

4. Debeziumのインストールとコネクタ構成

4.1 Debeziumの概要

DebeziumはRed Hatが主導するオープンソースCDCプラットフォームで、Kafka Connectフレームワーク上で動作します。PostgreSQL、MySQL、MongoDB、SQL Server、Oracle、Cassandra、Vitessなど多様なデータベースをサポートしており、Outbox Event Router SMT(Single Message Transformation)を通じてOutboxパターンをネイティブにサポートしています。

4.2 Docker Composeでフルスタック構成

docker-compose.yml - Debezium + Kafka フルスタック

version: '3.8'

services:

postgres:

image: postgres:16

environment:

POSTGRES_DB: orderdb

POSTGRES_USER: appuser

POSTGRES_PASSWORD: secret

command:

- 'postgres'

- '-c'

- 'wal_level=logical' # CDCに必要な設定

- '-c'

- 'max_replication_slots=4'

- '-c'

- 'max_wal_senders=4'

ports:

- '5432:5432'

volumes:

- postgres_data:/var/lib/postgresql/data

zookeeper:

image: confluentinc/cp-zookeeper:7.6.0

environment:

ZOOKEEPER_CLIENT_PORT: 2181

ZOOKEEPER_TICK_TIME: 2000

kafka:

image: confluentinc/cp-kafka:7.6.0

depends_on:

- zookeeper

ports:

- '9092:9092'

environment:

KAFKA_BROKER_ID: 1

KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092

KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT

KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT

KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1

connect:

image: debezium/connect:2.7

depends_on:

- kafka

- postgres

ports:

- '8083:8083'

environment:

BOOTSTRAP_SERVERS: kafka:29092

GROUP_ID: debezium-connect-group

CONFIG_STORAGE_TOPIC: connect-configs

OFFSET_STORAGE_TOPIC: connect-offsets

STATUS_STORAGE_TOPIC: connect-status

CONFIG_STORAGE_REPLICATION_FACTOR: 1

OFFSET_STORAGE_REPLICATION_FACTOR: 1

STATUS_STORAGE_REPLICATION_FACTOR: 1

kafka-ui:

image: provectuslabs/kafka-ui:latest

ports:

- '8080:8080'

environment:

KAFKA_CLUSTERS_0_NAME: local

KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092

KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: debezium

KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://connect:8083

volumes:

postgres_data:

4.3 Debeziumコネクタ登録(Outbox Event Router含む)

{

"name": "order-outbox-connector",

"config": {

"connector.class": "io.debezium.connector.postgresql.PostgresConnector",

"database.hostname": "postgres",

"database.port": "5432",

"database.user": "appuser",

"database.password": "secret",

"database.dbname": "orderdb",

"topic.prefix": "order-service",

"schema.include.list": "public",

"table.include.list": "public.outbox_events",

"tombstones.on.delete": "false",

"slot.name": "order_outbox_slot",

"plugin.name": "pgoutput",

"transforms": "outbox",

"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",

"transforms.outbox.table.fields.additional.placement": "event_type:header:eventType",

"transforms.outbox.table.field.event.id": "id",

"transforms.outbox.table.field.event.key": "aggregate_id",

"transforms.outbox.table.field.event.payload": "payload",

"transforms.outbox.table.field.event.timestamp": "created_at",

"transforms.outbox.route.by.field": "aggregate_type",

"transforms.outbox.route.topic.replacement": "events.${routedByValue}",

"transforms.outbox.table.expand.json.payload": "true",

"key.converter": "org.apache.kafka.connect.storage.StringConverter",

"value.converter": "org.apache.kafka.connect.json.JsonConverter",

"value.converter.schemas.enable": "false",

"heartbeat.interval.ms": "10000",

"snapshot.mode": "initial"

}

}

この設定の重要なポイントは以下の通りです。

- `table.include.list`でOutboxテーブルのみを監視し、不要なCDCイベントを防止します

- `transforms.outbox.route.by.field`を`aggregate_type`に設定することで、ドメインごとにKafkaトピックが自動生成されます。例えば、aggregate_typeが'Order'であれば`events.Order`トピックに発行されます

- `table.field.event.key`を`aggregate_id`に設定することで、同じエンティティのイベントが同じKafkaパーティションに入ります

- `heartbeat.interval.ms`を設定することで、Outboxテーブルに変更がなくても定期的にオフセットを更新します。この設定がないとWALの保持期間が不必要に長くなる可能性があります

コネクタ登録

curl -X POST http://localhost:8083/connectors \

-H "Content-Type: application/json" \

-d @order-outbox-connector.json

コネクタ状態確認

curl -s http://localhost:8083/connectors/order-outbox-connector/status | jq .

コネクタ再起動

curl -X POST http://localhost:8083/connectors/order-outbox-connector/restart

トピック一覧確認(Kafka UI または CLI)

docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --list

5. Kafka Connectパイプライン

5.1 パイプライン全体の流れ

Kafka ConnectベースのCDCパイプラインの全体フローは以下の通りです。

1. アプリケーションがビジネスデータとOutboxイベントを同一トランザクションでDBに書き込みます

2. Debezium Source ConnectorがPostgreSQLのWALを読み取り、Outboxテーブルの変更を検知します

3. Outbox Event Router SMTが元のCDCイベントをドメインごとのKafkaトピックにルーティングします

4. コンシューママイクロサービスが該当トピックをサブスクライブしてイベントを処理します

5. 必要に応じてSink Connectorがイベントを他のデータストア(Elasticsearch、S3など)に転送します

5.2 Spring Boot Outboxパターン実装

// OutboxEvent.java - Outboxエンティティ

@Entity

@Table(name = "outbox_events")

public class OutboxEvent {

@Id

@GeneratedValue(strategy = GenerationType.UUID)

private UUID id;

@Column(name = "aggregate_type", nullable = false)

private String aggregateType;

@Column(name = "aggregate_id", nullable = false)

private String aggregateId;

@Column(name = "event_type", nullable = false)

private String eventType;

@Column(name = "payload", columnDefinition = "jsonb", nullable = false)

private String payload;

@Column(name = "created_at", nullable = false)

private Instant createdAt;

// デフォルトコンストラクタ、getter、setter省略

public static OutboxEvent create(String aggregateType, String aggregateId,

String eventType, Object payload) {

OutboxEvent event = new OutboxEvent();

event.aggregateType = aggregateType;

event.aggregateId = aggregateId;

event.eventType = eventType;

event.payload = toJson(payload);

event.createdAt = Instant.now();

return event;

}

private static String toJson(Object obj) {

try {

return new ObjectMapper()

.registerModule(new JavaTimeModule())

.writeValueAsString(obj);

} catch (JsonProcessingException e) {

throw new RuntimeException("JSONシリアライズ失敗", e);

}

}

}

// OrderService.java - Outboxパターン適用注文サービス

@Service

@RequiredArgsConstructor

public class OrderService {

private final OrderRepository orderRepository;

private final OutboxEventRepository outboxRepository;

@Transactional // 一つのトランザクションでビジネスデータとイベントを同時に保存

public Order createOrder(OrderRequest request) {

// 1. ビジネスロジック実行

Order order = Order.builder()

.userId(request.getUserId())

.items(request.getItems())

.totalAmount(calculateTotal(request.getItems()))

.status(OrderStatus.CREATED)

.build();

Order savedOrder = orderRepository.save(order);

// 2. Outboxテーブルにイベント記録(同一トランザクション)

OrderCreatedPayload eventPayload = OrderCreatedPayload.builder()

.orderId(savedOrder.getId().toString())

.userId(savedOrder.getUserId())

.totalAmount(savedOrder.getTotalAmount())

.items(savedOrder.getItems())

.createdAt(savedOrder.getCreatedAt())

.build();

OutboxEvent outboxEvent = OutboxEvent.create(

"Order", // aggregate_type -> Kafkaトピック決定

savedOrder.getId().toString(), // aggregate_id -> Kafkaパーティションキー

"OrderCreated", // event_type

eventPayload // payload

);

outboxRepository.save(outboxEvent);

return savedOrder;

}

@Transactional

public Order cancelOrder(UUID orderId) {

Order order = orderRepository.findById(orderId)

.orElseThrow(() -> new OrderNotFoundException(orderId));

if (order.getStatus() != OrderStatus.CREATED) {

throw new IllegalStateException(

"CREATED状態の注文のみキャンセルできます。現在の状態: " + order.getStatus()

);

}

order.setStatus(OrderStatus.CANCELLED);

Order savedOrder = orderRepository.save(order);

OutboxEvent outboxEvent = OutboxEvent.create(

"Order",

savedOrder.getId().toString(),

"OrderCancelled",

Map.of(

"orderId", savedOrder.getId().toString(),

"reason", "ユーザーリクエスト",

"cancelledAt", Instant.now().toString()

)

);

outboxRepository.save(outboxEvent);

return savedOrder;

}

}

6. イベント順序保証と冪等性処理

6.1 イベント順序保証

Outboxパターンにおけるイベント順序保証は、以下の原則に従います。

- **同一Aggregateに対する順序保証**:`aggregate_id`をKafkaパーティションキーとして使用すると、同じエンティティに対するイベントが同じパーティションに順序通りに保存されます

- **異なるAggregate間では順序が保証されない**:これは意図的な設計であり、マイクロサービス間の疎結合に合致します

- **パーティション数変更時の注意**:Kafkaトピックのパーティション数を変更すると、既存のキーとパーティションのマッピングが崩れるため、運用中はパーティション数を変更しないことが安全です

6.2 冪等性処理(Pythonコンシューマ例)

from datetime import datetime

from kafka import KafkaConsumer

from sqlalchemy import create_engine, text

from sqlalchemy.orm import Session

冪等性保証のための処理履歴テーブル

CREATE TABLE processed_events (

event_id UUID PRIMARY KEY,

processed_at TIMESTAMP NOT NULL DEFAULT NOW()

);

engine = create_engine('postgresql://user:pass@localhost:5432/inventorydb')

consumer = KafkaConsumer(

'events.Order',

bootstrap_servers=['localhost:9092'],

group_id='inventory-service',

auto_offset_reset='earliest',

enable_auto_commit=False, # 手動コミットによる正確な制御

value_deserializer=lambda m: json.loads(m.decode('utf-8')),

max_poll_records=100,

)

def process_order_event(event_data: dict, event_id: str) -> None:

"""冪等性を保証するイベント処理"""

with Session(engine) as session:

with session.begin():

1. 既に処理済みのイベントか確認

result = session.execute(

text("SELECT 1 FROM processed_events WHERE event_id = :id"),

{"id": event_id}

)

if result.fetchone():

print(f"既に処理済みのイベントをスキップ: {event_id}")

return

2. ビジネスロジック実行

event_type = event_data.get('eventType', '')

if event_type == 'OrderCreated':

items = event_data.get('items', [])

for item in items:

session.execute(

text("""

UPDATE inventory

SET reserved_quantity = reserved_quantity + :qty

WHERE product_id = :product_id

AND available_quantity >= :qty

"""),

{"qty": item['quantity'], "product_id": item['productId']}

)

elif event_type == 'OrderCancelled':

order_id = event_data.get('orderId')

session.execute(

text("""

UPDATE inventory i

SET reserved_quantity = reserved_quantity - oi.quantity

FROM order_items oi

WHERE oi.order_id = :order_id

AND oi.product_id = i.product_id

"""),

{"order_id": order_id}

)

3. 処理履歴記録(同一トランザクション)

session.execute(

text("""

INSERT INTO processed_events (event_id, processed_at)

VALUES (:id, :now)

"""),

{"id": event_id, "now": datetime.utcnow()}

)

print(f"イベント処理完了: {event_id} ({event_type})")

メインコンシューマループ

print("イベントコンシューマ起動...")

try:

for message in consumer:

try:

event_data = message.value

Debezium Outbox Event Routerが設定したヘッダーからevent_idを抽出

headers = {k: v.decode('utf-8') for k, v in message.headers}

event_id = headers.get('id', str(uuid.uuid4()))

process_order_event(event_data, event_id)

処理成功時のみオフセットコミット

consumer.commit()

except Exception as e:

print(f"イベント処理失敗: {e}")

失敗時はDLQ(Dead Letter Queue)に送信するかリトライロジックを適用

ここでは単純にログ出力して次のイベントに進む

except KeyboardInterrupt:

print("コンシューマ終了")

finally:

consumer.close()

冪等性処理の核心は、**イベント処理と処理履歴の記録を同一DBトランザクションにまとめること**です。こうすることで、イベント処理中に障害が発生しても処理履歴が記録されないため、再起動時に同じイベントを再処理できます。既に処理済みのイベントは処理履歴テーブルで確認してスキップします。

7. Polling方式 vs CDC方式の比較

| 比較項目 | Polling方式 | CDC方式(Debezium) |

| ---------------------- | ------------------------------------ | --------------------------------- |

| **レイテンシ** | ポーリング周期に依存(秒〜分) | ミリ秒レベルのニアリアルタイム |

| **DB負荷** | 定期的なSELECTクエリ負荷 | WAL/binlog読み取りにより負荷最小 |

| **実装の複雑さ** | 低い(CRON + SQL) | 高い(Kafka Connect + Debezium) |

| **インフラ要件** | DBのみ必要 | Kafka + Kafka Connect + Debezium |

| **DELETE検知** | 不可(soft delete必要) | 可能(ログに記録) |

| **中間状態キャプチャ** | 不可(最後の状態のみ) | 可能(すべての変更をキャプチャ) |

| **順序保証** | タイムスタンプベース(脆弱) | ログベース(堅牢) |

| **スケーラビリティ** | DBコネクション負担増加 | Kafka Connect水平スケーリング可能 |

| **運用の複雑さ** | 低い | 高い(Kafkaクラスタ管理が必要) |

| **障害復旧** | ステータスカラムベースの簡易リトライ | Kafkaオフセットベースの精密な復旧 |

| **適合環境** | 小規模、低レイテンシ許容 | 大規模、リアルタイム処理が必要 |

> 最初にOutboxパターンを導入する際はPolling方式から始め、トラフィック増加に応じてCDC方式に移行する段階的なアプローチも有効です。Polling方式であってもDual Write問題は完全に解決でき、レイテンシ要件が秒単位であれば十分な場合があります。

8. Debezium vs Maxwell vs Canal 比較

| 比較項目 | Debezium | Maxwell | Canal |

| ------------------------ | ----------------------------------------------------------- | ------------------------------- | ------------------------------- |

| **対応DB** | PostgreSQL, MySQL, MongoDB, SQL Server, Oracle, Cassandra等 | MySQLのみ | MySQLのみ |

| **アーキテクチャ** | Kafka Connectベースの分散型 | 単一Javaプロセス | 単一Javaプロセス |

| **メッセージブローカー** | Kafka(デフォルト)、Pulsar、NATS等 | Kafka、RabbitMQ、Redis等 | Kafka、RocketMQ等 |

| **Outboxサポート** | EventRouter SMTネイティブサポート | 別途実装が必要 | 別途実装が必要 |

| **スキーマ進化** | Schema Registry統合サポート | 限定的 | 限定的 |

| **スケーラビリティ** | Kafka Connect分散モードで水平スケーリング | 単一プロセスの限界 | 単一プロセスの限界 |

| **設定の複雑さ** | 高い(Kafka Connectの理解が必要) | 低い(シンプルな設定) | 中程度 |

| **コミュニティ** | 非常に活発(Red Hat後援) | 小規模 | 中国コミュニティ中心(Alibaba) |

| **運用成熟度** | 高い | 中程度 | 中程度 |

| **適合環境** | 大規模、複数DB、エンタープライズ | MySQLのみ使用する小規模サービス | MySQLベースの中国エコシステム |

**選択ガイド**:ほとんどのプロダクション環境では**Debezium**を推奨します。多様なデータベースをサポートし、Kafka Connectの分散モードで高可用性を確保でき、Outbox Event Routerをネイティブにサポートしています。MySQLのみ使用していて迅速なPoCが必要な場合は、**Maxwell**のシンプルさが魅力的です。CanalはAlibabaが開発したもので、中国のエコシステムで主に使用されており、海外での採用は比較的少ないです。

9. 障害シナリオと復旧手順

9.1 シナリオ1:Debeziumコネクタ障害

**症状**:Kafka Connectのコネクタ状態がFAILEDに変わり、新しいイベントがKafkaに発行されなくなります。

**主な原因**:DB接続切断、WALスロット削除、スキーマ変更の互換性問題、Kafkaブローカー接続不可など

**復旧手順**:

1. コネクタ状態確認

curl -s http://localhost:8083/connectors/order-outbox-connector/status | jq .

2. コネクタタスクの再起動を試行

curl -X POST http://localhost:8083/connectors/order-outbox-connector/tasks/0/restart

3. 再起動できない場合はコネクタを削除して再登録

curl -X DELETE http://localhost:8083/connectors/order-outbox-connector

curl -X POST http://localhost:8083/connectors \

-H "Content-Type: application/json" \

-d @order-outbox-connector.json

4. PostgreSQL WALスロット状態確認

psql -c "SELECT * FROM pg_replication_slots;"

5. 必要に応じてスロットを再作成

psql -c "SELECT pg_drop_replication_slot('order_outbox_slot');"

コネクタ再登録時に自動的にスロットが作成される

**注意事項**:WALスロットが有効な状態でDebeziumが長期間停止すると、PostgreSQLが該当スロット以降のWALファイルを削除できず、ディスクが満杯になる可能性があります。必ずWALサイズを監視し、`max_slot_wal_keep_size`パラメータで上限を設定する必要があります。

9.2 シナリオ2:Kafkaブローカー障害

**症状**:Debeziumコネクタがイベントを発行できず、コンシューマもイベントを消費できなくなります。

**復旧手順**:

1. Kafkaブローカーを復旧します

2. Debeziumは最後に成功したオフセットから自動的に再開します(Kafka Connectのオフセット管理)

3. コンシューマも最後にコミットされたオフセットから再開します

4. CDC方式では、DBのWALが保持されている限りデータ損失は発生しません

9.3 シナリオ3:コンシューマ処理失敗

**症状**:特定のイベントが繰り返し処理に失敗し、コンシューマが進行できなくなります(poison pill)。

**復旧手順**:

1. リトライポリシーを適用します(最大3〜5回、指数バックオフ)

2. 最大リトライ回数超過時にDLQ(Dead Letter Queue)トピックに送信します

3. DLQのメッセージを手動で分析し、修正後に元のトピックに再発行します

9.4 シナリオ4:Outboxテーブルの肥大化

**症状**:Outboxテーブルが数百万行に膨れ上がり、DB性能が低下します。

**復旧手順**:

1. CDC方式では、イベントがキャプチャされた後、一定時間経過した行を削除します

2. PostgreSQLの場合、パーティショニングを適用して古いパーティションをDROPします

3. 定期的なVACUUMを実行してデッドタプル(dead tuple)を整理します

-- 日別パーティショニングされたOutboxテーブル

CREATE TABLE outbox_events (

id UUID NOT NULL DEFAULT gen_random_uuid(),

aggregate_type VARCHAR(255) NOT NULL,

aggregate_id VARCHAR(255) NOT NULL,

event_type VARCHAR(255) NOT NULL,

payload JSONB NOT NULL,

created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()

) PARTITION BY RANGE (created_at);

-- 日別パーティション自動生成(pg_partman活用)

SELECT partman.create_parent(

'public.outbox_events',

'created_at',

'native',

'daily'

);

-- 7日以上経過したパーティションを自動削除

UPDATE partman.part_config

SET retention = '7 days', retention_keep_table = false

WHERE parent_table = 'public.outbox_events';

10. 運用時の注意事項チェックリスト

**Debezium/CDC設定段階**:

- PostgreSQLの`wal_level`を`logical`に設定して再起動したか確認します

- `max_replication_slots`と`max_wal_senders`をコネクタ数より余裕を持って設定します

- `max_slot_wal_keep_size`を設定してWALディスク暴走を防止します

- Debeziumコネクタの`heartbeat.interval.ms`を必ず設定してオフセット更新を保証します

- `snapshot.mode`を慎重に選択します(初回は`initial`、以降の再起動時は`schema_only`)

**イベント設計段階**:

- イベントpayloadにコンシューマが必要とするすべての情報を含めます(自己完結型イベント)

- スキーマの進化を考慮して、フィールドの追加は許容しつつ既存フィールドの削除は避けます

- aggregate_idをKafkaパーティションキーとして使用し、同じエンティティのイベント順序を保証します

- イベントサイズを1MB以下に維持します(Kafkaデフォルトのメッセージ最大サイズ)

**コンシューマ実装段階**:

- すべてのコンシューマに冪等性を実装します(processed_eventsテーブル活用)

- auto.commitを無効化し、手動でオフセットをコミットします

- DLQを構成してpoison pillメッセージによる処理停滞を防止します

- コンシューマグループのセッションタイムアウトとハートビート間隔を適切に設定します

**モニタリング段階**:

- Debeziumコネクタ状態を定期的に確認します(RUNNING/FAILED)

- WALスロットサイズとディスク使用量を監視します

- イベント発行レイテンシ(CDC lag)を監視します

- コンシューマラグ(consumer lag)を監視します

- Outboxテーブルのサイズと行数を監視します

- DLQトピックのメッセージ数にアラートを設定します

**障害対策**:

- Kafka Connectを分散モードで運用し、ワーカーノード障害時にタスクが自動再割り当てされるようにします

- 定期的にコネクタの削除・再登録の復旧手順をテストします

- WALスロット関連の障害シナリオを運用マニュアルに文書化します

- パイプライン全体のエンドツーエンドテストを自動化します

参考資料

1. [Debezium公式ドキュメント - Outbox Event Router](https://debezium.io/documentation/reference/stable/transformations/outbox-event-router.html) - Outbox Event Router SMTの全設定オプションと使用方法

2. [Debeziumブログ - Reliable Microservices Data Exchange With the Outbox Pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/) - OutboxパターンとDebeziumを活用したマイクロサービスデータ交換の原理

3. [Thorben Janssen - Implementing the Outbox Pattern with CDC using Debezium](https://thorben-janssen.com/outbox-pattern-with-cdc-and-debezium/) - JPA/Hibernate環境でのOutboxパターン実装実践ガイド

4. [Decodable - Revisiting the Outbox Pattern](https://www.decodable.co/blog/revisiting-the-outbox-pattern) - 2024年の視点からのOutboxパターンの再評価と代替案分析

5. [Upsolver - Debezium vs Maxwell](https://www.upsolver.com/blog/debezium-vs-maxwell) - CDCツール比較と選択基準の詳細分析

6. [RisingWave - Debezium vs Other CDC Tools](https://risingwave.com/blog/debezium-vs-other-change-data-capture-tools-a-comprehensive-comparison/) - Debeziumと競合CDCツールの総合比較

7. [DEV Community - CDC Maxwell vs Debezium](https://dev.to/saifcse/cdc-maxwell-vs-debezium-1lh0) - MaxwellとDebeziumのアーキテクチャおよび機能比較

8. [Medium - Change Data Capture vs Outbox Pattern](https://medium.com/@luke.greene86/change-data-capture-vs-outbox-pattern-how-to-choose-the-right-tool-for-reliable-event-delivery-e8ee8f5f50bd) - CDCとOutboxパターンの違いと適切な使用シナリオの分析

クイズ

Q1: 「Outboxパターンと

CDCで実現するマイクロサービスデータ同期:Debezium実践ガイド」の主なトピックは何ですか?

Outboxパターンと CDC(Change Data Capture)を活用したマイクロサービスデータ同期の総合ガイド。Dual

Write問題の分析、Outboxテーブル設計、Debeziumコネクタ構成、Kafka

Connectパイプライン、イベント順序保証、冪等性処理、障害復旧まで実践的に解説します。

1.1 核心アイデア

Outboxパターンの核心はシンプルです。イベントをメッセージブローカーに直接発行するのではなく、ビジネスデータと同一トランザクションでOutboxテーブルに記録するということです。こうすることで、DBのACIDトランザクションが両方の操作のアトミック性を保証します。その後、別のプロセスがOutboxテーブルを読み取ってメッセージブローカーに転送します。

この方式の利点は明確です。ビジネスデータとイベントが一つのトランザクションにまとめられるため、両方とも保存されるか、両方とも保存されないかのどちらかになります。

2.1 基本スキーマ 2.2 設計時の重要な考慮事項

aggregate_idをKafkaパーティションキーとして使用:同じaggregate_idを持つイベントが同じパーティションに入るため、そのエンティティに対するイベントの順序が保証されます。例えば、注文123に対する'OrderCreated'

-> 'OrderPaid' -> 'OrderShipped'イベントが順序通りに処理されます。

payloadに必要な情報をすべて含める:コンシューマがイベントだけで処理を完了できるよう、payloadに十分な情報を含めます。

3.1 CDCとは何か CDC(Change Data

Capture)は、データベースの変更内容(INSERT、UPDATE、DELETE)をリアルタイムでキャプチャして外部システムに転送する技術です。CDCの核心はデータベースのトランザクションログを直接読み取ることです。

4.1 Debeziumの概要 DebeziumはRed Hatが主導するオープンソースCDCプラットフォームで、Kafka

Connectフレームワーク上で動作します。PostgreSQL、MySQL、MongoDB、SQL

Server、Oracle、Cassandra、Vitessなど多様なデータベースをサポートしており、Outbox Event Router

SMT(Single Message Transformation)を通じてOutboxパターンをネイティブにサポートしています。

현재 단락 (1/529)

マイクロサービスアーキテクチャで最も頻繁に直面する問題の一つは、**データベースとメッセージブローカーに同時にデータを書き込まなければならない状況**です。注文サービスが注文を作成すると同時に、在庫サ...

작성 글자: 0원문 글자: 21,188작성 단락: 0/529