- Published on
非同期処理パターン完全攻略:CQRS、Saga、Event Sourcing — シニアバックエンド開発者の必須武器
- Authors

- Name
- Youngju Kim
- @fjvbn20031
- 1. なぜ非同期なのか?大規模システムの生存戦略
- 2. Event-Driven Architecture 基礎
- 3. CQRS ディープダイブ
- 4. Sagaパターン完全<ruby>攻略<rp>(</rp><rt>こうりゃく</rt><rp>)</rp></ruby>
- 5. Event Sourcing
- 6. Outbox Pattern(信頼性の核心)
- 7. <ruby>冪等性<rp>(</rp><rt>べきとうせい</rt><rp>)</rp></ruby>パターン
- 8. Dead Letter Queue 戦略
- 9. Exactly-Once Semantics
- 10. Reactive Streams vs Virtual Threads(2025)
- 11. 実践アーキテクチャ:決済システム設計
- クイズ
- 参考資料
1. なぜ非同期なのか?大規模システムの生存戦略
現代
の大規模システムは、
同期式
のリクエスト-レスポンスモデルでは
対応
できない規模のトラフィックを処理しています。
実際の企業事例:
| 企業 | 規模 | 非同期処理方式 |
|---|---|---|
| Netflix | 2.6億サブスクライバー、毎秒数百万イベント | Apache Kafkaベースのイベントストリーミング |
| Uber | 日次ペタバイトのリアルタイムデータ | Apache Kafka + Apache Flink |
| 7兆+メッセージ/日のKafka処理 | 自社開発Kafkaインフラ | |
| Shopify | ブラックフライデー毎秒80万リクエスト | 非同期注文処理パイプライン |
同期式
アーキテクチャでは、サービスAがサービスBを
呼
び出し、BがCを呼び出すと、全体のレイテンシは各サービス呼び出し時間の合計になります。一つのサービスが遅くなると、チェーン全体がブロッキングされます。
非同期
アーキテクチャでは、メッセージブローカーを通じてサービス間の
結合度
を下げ、各サービスが独立にスケーリング可能です。
同期式: Client -> A(50ms) -> B(100ms) -> C(200ms) = 350ms 合計待機
非同期式: Client -> A(50ms) -> Message Broker -> (B, C 並列処理)
Clientは50ms後に応答、BとCは非同期で処理
非同期移行の主要メリット:
- 時間的結合の除去 — プロデューサーとコンシューマーが同時に稼働する必要がない
- 独立したスケーリング — 読み取りと書き込みワークロードを別々にスケーリング
- 障害の分離 — 一つのサービスの障害がシステム全体に波及しない
- ピークトラフィックの吸収 — メッセージキューがバッファーとして機能
しかし、非同期はタダではありません。Eventual Consistency(結果整合性)、メッセージ順序保証、重複処理、分散トランザクションといった新しい課題が生まれます。本記事で解説するパターンは、まさにこれらの課題への解答です。
2. Event-Driven Architecture 基礎
イベント駆動アーキテクチャ(EDA)は非同期パターンの土台です。システムの状態変更を「イベント」として表現し、関心のあるサービスがこれを購読して反応します。
2-1. Domain Events vs Integration Events
| 区分 | Domain Event | Integration Event |
|---|---|---|
| 範囲 | 単一バウンデッドコンテキスト内部 | バウンデッドコンテキスト間 |
| 例 | OrderItemAdded | OrderPlaced |
| 配信方式 | インメモリイベントバス | メッセージブローカー(Kafka, RabbitMQ) |
| スキーマ管理 | 内部的なので自由に変更可能 | コントラクト管理が必須 |
// Domain Event - バウンデッドコンテキスト内部
public record OrderItemAdded(
UUID orderId,
UUID productId,
int quantity,
BigDecimal price,
Instant occurredAt
) implements DomainEvent {}
// Integration Event - 外部サービスに発行
public record OrderPlacedEvent(
UUID orderId,
UUID customerId,
BigDecimal totalAmount,
List<OrderLineItem> items,
Instant occurredAt,
int version // スキーマバージョン管理
) implements IntegrationEvent {}
2-2. Event Bus vs Event Store
Event Bus(例:Kafka, RabbitMQ)はイベントを配信するパイプラインです。コンシューマーがイベントを受信すると、通常ブローカーから削除(またはTTL後に失効)されます。
Event Storeは全てのイベントを永続的に保管する貯蔵所です。イベントをリプレイ(replay)して、任意の時点の状態を復元できます。
Event Bus (Kafka):
Producer -> [Topic: orders] -> Consumer Group A (注文サービス)
-> Consumer Group B (通知サービス)
-> Consumer Group C (分析サービス)
Event Store:
[Event #1: OrderCreated]
[Event #2: ItemAdded]
[Event #3: PaymentReceived]
[Event #4: OrderShipped]
-> イベント#1~#4をリプレイすると現在の注文状態を復元可能
2-3. Eventual Consistencyの理解
分散システムで強い一貫性(Strong Consistency)を維持するには、分散ロック、2PC(2-Phase Commit)等が必要ですが、パフォーマンスと可用性が大幅に低下します。
CAP定理によると、ネットワークパーティション時に一貫性(C)と可用性(A)のどちらかを選択する必要があります。ほとんどの大規模システムは可用性(A)を選択し、Eventual Consistencyを受容します。
Eventual Consistencyとは「今すぐはデータが不整合かもしれないが、十分な時間が経てば全てのノードが同じ状態に収束する」ということです。
実務での意味:
- 注文後に在庫がすぐに減らないことがある(ミリ秒〜数秒の遅延)
- ユーザーには「注文処理中です」を先に表示し、確定後に更新
- 一貫性ウィンドウ(Consistency Window)の最小化が鍵
3. CQRS ディープダイブ
CQRS(Command Query Responsibility Segregation)は読み取りと書き込みモデルを完全に分離するパターンです。
3-1. 基本概念
従来
のCRUDモデルでは、一つのデータモデルが読み取りと書き込みの両方を担当します。しかし実際には、読み取りと書き込みは非常に異なる要件を持ちます。
従来のCRUD:
Client -> [単一モデル] -> Database
(読み取り/書き込み両方)
CQRS:
Client --Command--> [Write Model] -> Command DB (正規化)
| Events
Client <--Query---- [Read Model] <- Read DB (非正規化、最適化)
Write Model(コマンド側):
- ビジネスルール検証
- ドメイン不変条件(Invariant)の保証
- 正規化されたスキーマ
- 強い一貫性
Read Model(クエリ側):
- クエリに最適化された非正規化ビュー
- 多様なストレージの利用可能(Elasticsearch, Redis, MongoDB)
- Eventual Consistency許容
3-2. いつCQRSを使うべきか?
適している場合:
- 読み取り/書き込み比率が極端に異なるとき(読み取りが100:1以上)
- 複雑なドメインロジック + 多様なクエリ要件
- 読み取りと書き込みのスケーリング要件が異なるとき
- Event Sourcingと組み合わせて使うとき
適していない場合:
- 単純なCRUDアプリケーション
- 強い一貫性が絶対に必要な場合
- ドメインが単純でクエリパターンが予測可能な場合
3-3. Spring Boot + Kafka 実装
// === Command側:注文作成 ===
@Service
@RequiredArgsConstructor
public class OrderCommandService {
private final OrderRepository orderRepository;
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
@Transactional
public UUID createOrder(CreateOrderCommand command) {
// 1. ビジネスルール検証
validateOrder(command);
// 2. ドメインモデルで処理
Order order = Order.create(
command.customerId(),
command.items(),
command.shippingAddress()
);
// 3. Write DBに保存
orderRepository.save(order);
// 4. イベント発行(Read Model同期用)
OrderCreatedEvent event = new OrderCreatedEvent(
order.getId(),
order.getCustomerId(),
order.getTotalAmount(),
order.getItems().stream()
.map(this::toEventItem)
.toList(),
Instant.now()
);
kafkaTemplate.send("order-events", order.getId().toString(), event);
return order.getId();
}
private void validateOrder(CreateOrderCommand command) {
if (command.items().isEmpty()) {
throw new InvalidOrderException("注文項目が空です");
}
// 在庫確認、顧客ステータス確認など
}
}
// === Query側:注文照会プロジェクション ===
@Service
@RequiredArgsConstructor
public class OrderQueryService {
private final OrderReadRepository readRepository;
public OrderDetailView getOrderDetail(UUID orderId) {
return readRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException(orderId));
}
public Page<OrderSummaryView> getCustomerOrders(
UUID customerId, Pageable pageable) {
return readRepository.findByCustomerId(customerId, pageable);
}
}
// Read Model - クエリに最適化された非正規化ビュー
@Document(collection = "order_views")
public record OrderDetailView(
UUID orderId,
UUID customerId,
String customerName, // 非正規化:顧客名を含む
String customerEmail,
BigDecimal totalAmount,
String status,
List<OrderItemView> items,
AddressView shippingAddress,
Instant createdAt,
Instant lastUpdatedAt
) {}
// === イベントハンドラー:Write -> Read 同期 ===
@Component
@RequiredArgsConstructor
public class OrderProjectionHandler {
private final OrderReadRepository readRepository;
private final CustomerQueryClient customerClient;
@KafkaListener(topics = "order-events", groupId = "order-projection")
public void handleOrderEvent(OrderEvent event) {
switch (event) {
case OrderCreatedEvent e -> handleOrderCreated(e);
case OrderStatusChangedEvent e -> handleStatusChanged(e);
case OrderCancelledEvent e -> handleOrderCancelled(e);
default -> log.warn("不明なイベントタイプ: {}", event.getClass());
}
}
private void handleOrderCreated(OrderCreatedEvent event) {
// 顧客情報を取得して非正規化ビューを生成
CustomerInfo customer = customerClient.getCustomer(event.customerId());
OrderDetailView view = new OrderDetailView(
event.orderId(),
event.customerId(),
customer.name(),
customer.email(),
event.totalAmount(),
"CREATED",
event.items().stream().map(this::toItemView).toList(),
null,
event.occurredAt(),
event.occurredAt()
);
readRepository.save(view);
}
}
3-4. Read Model 同期戦略
| 戦略 | 遅延 | 一貫性 | 複雑度 | 適している場合 |
|---|---|---|---|---|
| イベントハンドラー(Kafka) | 数ms〜数秒 | Eventual | 中 | ほとんどの場合 |
| CDC(Debezium) | 数秒 | Eventual | 高 | レガシーシステム連携 |
| ポーリング(Scheduled) | 数秒〜数分 | Eventual | 低 | 単純な場合 |
| 同期式更新 | 即時 | Strong | 低 | パフォーマンス要件が低い時 |
4. Sagaパターン完全攻略
マイクロサービスで複数のサービスにまたがるビジネストランザクションをどう管理するか?従来のモノリスのDBトランザクションは使用できません。Sagaパターンは分散トランザクションを一連のローカルトランザクションに分解します。
4-1. Choreography(振付型)
Choreography方式では中央コーディネーターがありません。各サービスがイベントを発行し、次のサービスがそのイベントを購読して自身のローカルトランザクションを実行します。
ECサイト注文フロー(Choreography):
Order Service --OrderPlaced--> Payment Service
|
PaymentCompleted
|
v
Inventory Service
|
InventoryReserved
|
v
Shipping Service
|
ShipmentCreated
|
v
Order Service (ステータス更新)
失敗時(補償トランザクション):
Inventory Service --InventoryReservationFailed--> Payment Service (返金)
|
PaymentRefunded
|
v
Order Service (注文キャンセル)
// === Choreography Saga:注文サービス ===
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderRepository orderRepository;
private final KafkaTemplate<String, Object> kafkaTemplate;
@Transactional
public UUID placeOrder(PlaceOrderCommand command) {
Order order = Order.create(command);
order.setStatus(OrderStatus.PENDING);
orderRepository.save(order);
// イベント発行 - 決済サービスが購読
kafkaTemplate.send("order-events", new OrderPlacedEvent(
order.getId(),
order.getCustomerId(),
order.getTotalAmount(),
order.getItems()
));
return order.getId();
}
// 決済完了イベント受信
@KafkaListener(topics = "payment-events", groupId = "order-service")
public void onPaymentCompleted(PaymentCompletedEvent event) {
Order order = orderRepository.findById(event.orderId()).orElseThrow();
order.setStatus(OrderStatus.PAID);
orderRepository.save(order);
}
// 決済失敗イベント受信
@KafkaListener(topics = "payment-events", groupId = "order-service")
public void onPaymentFailed(PaymentFailedEvent event) {
Order order = orderRepository.findById(event.orderId()).orElseThrow();
order.setStatus(OrderStatus.CANCELLED);
order.setCancellationReason(event.reason());
orderRepository.save(order);
}
}
// === Choreography Saga:決済サービス ===
@Service
@RequiredArgsConstructor
public class PaymentService {
private final PaymentRepository paymentRepository;
private final PaymentGateway paymentGateway;
private final KafkaTemplate<String, Object> kafkaTemplate;
@KafkaListener(topics = "order-events", groupId = "payment-service")
@Transactional
public void onOrderPlaced(OrderPlacedEvent event) {
try {
// 決済処理
PaymentResult result = paymentGateway.charge(
event.customerId(),
event.totalAmount()
);
Payment payment = Payment.create(
event.orderId(),
event.customerId(),
event.totalAmount(),
result.transactionId()
);
paymentRepository.save(payment);
// 成功イベント発行 - 在庫サービスが購読
kafkaTemplate.send("payment-events", new PaymentCompletedEvent(
event.orderId(),
payment.getId(),
result.transactionId()
));
} catch (PaymentException e) {
// 失敗イベント発行 - 注文サービスが購読してキャンセル
kafkaTemplate.send("payment-events", new PaymentFailedEvent(
event.orderId(),
e.getMessage()
));
}
}
}
Choreographyの長所と短所:
| 長所 | 短所 |
|---|---|
| サービス間の疎結合 | 全体フローの把握が困難 |
| 各サービスが独立にデプロイ可能 | デバッグが複雑 |
| 単一障害点(SPOF)なし | 循環依存の発生可能性 |
| 実装が比較的シンプル | サービス増加でイベント追跡の難易度上昇 |
4-2. Orchestration(指揮型)
Orchestration方式では中央コーディネーター(Orchestrator)が全体のワークフローを管理します。各サービスにコマンドを送り、レスポンスに応じて次のステップを決定します。
ECサイト注文フロー(Orchestration):
Order Saga Orchestrator
/ | \
v v v
Payment Service Inventory Shipping
(charge) (reserve) (create)
v v v
(result) (result) (result)
\ | /
Order Saga Orchestrator
(最終決定)
// === Orchestration Saga:Temporalフレームワーク使用 ===
@WorkflowInterface
public interface OrderSagaWorkflow {
@WorkflowMethod
OrderResult processOrder(PlaceOrderCommand command);
}
@WorkflowImpl(workflows = OrderSagaWorkflow.class)
public class OrderSagaWorkflowImpl implements OrderSagaWorkflow {
// 各サービスのActivityスタブ
private final PaymentActivities paymentActivities =
Workflow.newActivityStub(PaymentActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
.setRetryOptions(RetryOptions.newBuilder()
.setMaximumAttempts(3)
.build())
.build());
private final InventoryActivities inventoryActivities =
Workflow.newActivityStub(InventoryActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
.build());
private final ShippingActivities shippingActivities =
Workflow.newActivityStub(ShippingActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(60))
.build());
@Override
public OrderResult processOrder(PlaceOrderCommand command) {
String paymentId = null;
String reservationId = null;
try {
// Step 1:決済
paymentId = paymentActivities.processPayment(
command.customerId(),
command.totalAmount()
);
// Step 2:在庫予約
reservationId = inventoryActivities.reserveInventory(
command.items()
);
// Step 3:配送作成
String shipmentId = shippingActivities.createShipment(
command.orderId(),
command.shippingAddress()
);
return OrderResult.success(command.orderId(), shipmentId);
} catch (Exception e) {
// 補償トランザクション実行
compensate(paymentId, reservationId);
return OrderResult.failed(command.orderId(), e.getMessage());
}
}
private void compensate(String paymentId, String reservationId) {
// 逆順で補償トランザクションを実行
if (reservationId != null) {
inventoryActivities.cancelReservation(reservationId);
}
if (paymentId != null) {
paymentActivities.refundPayment(paymentId);
}
}
}
// === Activity実装 ===
@ActivityInterface
public interface PaymentActivities {
String processPayment(UUID customerId, BigDecimal amount);
void refundPayment(String paymentId);
}
@Component
@RequiredArgsConstructor
public class PaymentActivitiesImpl implements PaymentActivities {
private final PaymentGateway paymentGateway;
private final PaymentRepository paymentRepository;
@Override
public String processPayment(UUID customerId, BigDecimal amount) {
PaymentResult result = paymentGateway.charge(customerId, amount);
Payment payment = Payment.create(customerId, amount, result.transactionId());
paymentRepository.save(payment);
return payment.getId().toString();
}
@Override
public void refundPayment(String paymentId) {
Payment payment = paymentRepository.findById(UUID.fromString(paymentId))
.orElseThrow();
paymentGateway.refund(payment.getTransactionId());
payment.setStatus(PaymentStatus.REFUNDED);
paymentRepository.save(payment);
}
}
Orchestrationの長所と短所:
| 長所 | 短所 |
|---|---|
| ワークフロー全体が一箇所で管理される | オーケストレーターがSPOFになる可能性 |
| デバッグとモニタリングが容易 | Choreographyより結合度が高い |
| 複雑な補償ロジックの管理が容易 | オーケストレーターにロジックが集中するリスク |
| Temporal/Cadence等のフレームワークサポート | 追加インフラ(ワークフローエンジン)が必要 |
4-3. Choreography vs Orchestration 選択ガイド
| 基準 | Choreography | Orchestration |
|---|---|---|
| サービス数 | 3〜4個以下 | 5個以上 |
| ワークフロー複雑度 | 線形的、シンプル | 条件分岐、並列処理 |
| 可観測性要件 | 低 | 高 |
| チーム構成 | 独立したチーム | 中央プラットフォームチーム |
| 補償ロジック | シンプル | 複雑(複数経路) |
4-4. Compensating Transactions(補償トランザクション)
補償トランザクションは、既にコミットされたトランザクションを意味論的に取り消す操作です。物理的なロールバックではなく、逆作用(reverse action)を実行します。
// 補償トランザクションの例
public enum CompensationAction {
PAYMENT_CHARGE -> PAYMENT_REFUND,
INVENTORY_RESERVE -> INVENTORY_RELEASE,
SHIPPING_CREATE -> SHIPPING_CANCEL,
COUPON_APPLY -> COUPON_RESTORE,
POINTS_DEDUCT -> POINTS_RESTORE
}
Semantic Compensation vs Exact Compensation:
- Exact Compensation:正確に元の状態に復元(例:決済キャンセル → 全額返金)
- Semantic Compensation:ビジネス的に意味のある補償(例:航空券キャンセル → 一部返金 + クレジット)
実務ではSemantic Compensationがより一般的です。時間が経つと正確な復元が不可能な場合が多いためです。
5. Event Sourcing
Event Sourcingは、エンティティの現在の状態を保存する代わりに、状態変更を引き起こした全てのイベントを順序通りに保存するパターンです。
5-1. 核心概念
従来のCRUD:
Accountテーブル: id=1, balance=750
Event Sourcing:
Event #1: AccountCreated(id=1, initialBalance=1000)
Event #2: MoneyWithdrawn(id=1, amount=200)
Event #3: MoneyDeposited(id=1, amount=150)
Event #4: MoneyWithdrawn(id=1, amount=200)
-> リプレイ: 1000 - 200 + 150 - 200 = 750 (現在の残高)
全ての状態変更が不変(immutable)なイベントログに追記(append-only)されます。削除や変更はありません。
5-2. Event Store 実装
// === Event Store インターフェース ===
public interface EventStore {
void saveEvents(UUID aggregateId, List<DomainEvent> events, int expectedVersion);
List<DomainEvent> getEvents(UUID aggregateId);
List<DomainEvent> getEvents(UUID aggregateId, int fromVersion);
}
// === PostgreSQLベースEvent Store実装 ===
@Repository
@RequiredArgsConstructor
public class PostgresEventStore implements EventStore {
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper;
@Override
@Transactional
public void saveEvents(UUID aggregateId, List<DomainEvent> events,
int expectedVersion) {
// 楽観的同時実行制御
Integer currentVersion = jdbcTemplate.queryForObject(
"SELECT COALESCE(MAX(version), 0) FROM event_store WHERE aggregate_id = ?",
Integer.class, aggregateId
);
if (currentVersion != expectedVersion) {
throw new ConcurrencyException(
"同時実行の衝突: 期待バージョン " + expectedVersion +
", 実際バージョン " + currentVersion
);
}
int version = expectedVersion;
for (DomainEvent event : events) {
version++;
jdbcTemplate.update(
"""
INSERT INTO event_store
(event_id, aggregate_id, aggregate_type, event_type,
event_data, version, created_at)
VALUES (?, ?, ?, ?, ?::jsonb, ?, ?)
""",
UUID.randomUUID(),
aggregateId,
event.getAggregateType(),
event.getClass().getSimpleName(),
objectMapper.writeValueAsString(event),
version,
Instant.now()
);
}
}
@Override
public List<DomainEvent> getEvents(UUID aggregateId) {
return jdbcTemplate.query(
"SELECT * FROM event_store WHERE aggregate_id = ? ORDER BY version",
this::mapRowToEvent,
aggregateId
);
}
}
-- Event Storeテーブルスキーマ
CREATE TABLE event_store (
event_id UUID PRIMARY KEY,
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
event_data JSONB NOT NULL,
version INTEGER NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (aggregate_id, version) -- 楽観的同時実行制御
);
CREATE INDEX idx_event_store_aggregate ON event_store (aggregate_id, version);
CREATE INDEX idx_event_store_type ON event_store (event_type, created_at);
5-3. Aggregateでのイベント適用
// === Event-Sourced Aggregate ===
public class OrderAggregate {
private UUID id;
private UUID customerId;
private OrderStatus status;
private BigDecimal totalAmount;
private List<OrderItem> items = new ArrayList<>();
private int version = 0;
// 未コミットイベントリスト
private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
// イベントリプレイで状態復元
public static OrderAggregate fromHistory(List<DomainEvent> history) {
OrderAggregate aggregate = new OrderAggregate();
for (DomainEvent event : history) {
aggregate.apply(event, false);
}
return aggregate;
}
// 新しいコマンド処理
public void placeOrder(UUID customerId, List<OrderItem> items) {
if (this.status != null) {
throw new IllegalStateException("既に作成された注文です");
}
BigDecimal total = items.stream()
.map(item -> item.price().multiply(BigDecimal.valueOf(item.quantity())))
.reduce(BigDecimal.ZERO, BigDecimal::add);
apply(new OrderPlacedEvent(this.id, customerId, total, items, Instant.now()), true);
}
public void cancelOrder(String reason) {
if (this.status == OrderStatus.SHIPPED) {
throw new IllegalStateException("配送済みの注文はキャンセルできません");
}
apply(new OrderCancelledEvent(this.id, reason, Instant.now()), true);
}
// イベント適用(状態変更)
private void apply(DomainEvent event, boolean isNew) {
when(event); // 状態変更
this.version++;
if (isNew) {
uncommittedEvents.add(event);
}
}
// イベント別の状態変更ロジック
private void when(DomainEvent event) {
switch (event) {
case OrderPlacedEvent e -> {
this.id = e.orderId();
this.customerId = e.customerId();
this.totalAmount = e.totalAmount();
this.items = new ArrayList<>(e.items());
this.status = OrderStatus.PLACED;
}
case OrderCancelledEvent e -> {
this.status = OrderStatus.CANCELLED;
}
case OrderShippedEvent e -> {
this.status = OrderStatus.SHIPPED;
}
default -> throw new IllegalArgumentException("不明なイベント: " + event);
}
}
public List<DomainEvent> getUncommittedEvents() {
return Collections.unmodifiableList(uncommittedEvents);
}
public void clearUncommittedEvents() {
uncommittedEvents.clear();
}
}
5-4. Snapshotパターン
イベントが数万個蓄積されると、毎回全てをリプレイするのは非効率です。Snapshotを使って特定時点の状態を保存しておけば、その後のイベントだけをリプレイすれば済みます。
// Snapshot保存と復元
@Service
public class SnapshotService {
private static final int SNAPSHOT_INTERVAL = 100; // 100イベントごと
public OrderAggregate loadAggregate(UUID aggregateId) {
// 1. 最新スナップショットを取得
Optional<Snapshot> snapshot = snapshotRepository
.findLatest(aggregateId);
if (snapshot.isPresent()) {
// 2. スナップショットから状態復元
OrderAggregate aggregate = deserialize(snapshot.get().getData());
int fromVersion = snapshot.get().getVersion();
// 3. スナップショット以降のイベントだけリプレイ
List<DomainEvent> newEvents = eventStore
.getEvents(aggregateId, fromVersion + 1);
for (DomainEvent event : newEvents) {
aggregate.apply(event, false);
}
return aggregate;
} else {
// スナップショットなし、全てリプレイ
List<DomainEvent> allEvents = eventStore.getEvents(aggregateId);
return OrderAggregate.fromHistory(allEvents);
}
}
public void saveIfNeeded(OrderAggregate aggregate) {
if (aggregate.getVersion() % SNAPSHOT_INTERVAL == 0) {
snapshotRepository.save(new Snapshot(
aggregate.getId(),
aggregate.getVersion(),
serialize(aggregate),
Instant.now()
));
}
}
}
5-5. Event Sourcing vs CRUD 比較
| 基準 | CRUD | Event Sourcing |
|---|---|---|
| 保存方式 | 現在の状態のみ保存 | 全ての状態変更イベントを保存 |
| 監査証跡 | 別途実装が必要 | 内蔵(イベント自体が監査ログ) |
| タイムトラベル | 不可能 | 可能(任意時点の状態を復元) |
| ストレージ容量 | 相対的に少ない | イベント蓄積で増加 |
| 複雑度 | 低い | 高い(イベント設計、スナップショット等) |
| 同時実行制御 | 悲観的/楽観的ロック | イベントバージョンベースの楽観的制御 |
| パフォーマンス | 読み取り最適化が容易 | リプレイコスト(スナップショットで緩和) |
| スキーマ変更 | マイグレーションが必要 | イベントアップキャスティング |
| CQRS連携 | 選択事項 | 自然な組み合わせ |
5-6. Event Sourcingを使うべきでないとき
- 単純なCRUDドメイン(ブログ、設定管理)
- 強い一貫性が必須の場合(リアルタイム残高確認が必要なATM)
- イベント設計が不安定な初期段階(頻繁な変更時にアップキャスティングコスト)
- チームの経験が不足している場合(学習曲線が急)
- 規制要件でデータ削除が必要な場合(GDPRの忘れられる権利と衝突)
6. Outbox Pattern(信頼性の核心)
6-1. 問題:二重書込(Dual Write)
分散システムで最も頻繁なミスの一つは、データベース保存とメッセージ発行を別々に実行することです。
// 危険なコード:二重書込問題
@Transactional
public void createOrder(Order order) {
orderRepository.save(order); // Step 1: DB保存
kafkaTemplate.send("orders", event); // Step 2: メッセージ発行
// Step 1成功 + Step 2失敗 = DBにはあるがイベントは未発行
// Step 1成功 + Step 2成功 + DBコミット失敗 = イベントは発行されたがDBにない
}
DBトランザクションとメッセージブローカー送信は異なるインフラであるため、アトミックに処理できません。
6-2. 解決:Outbox Pattern
Outboxパターンは、イベントをDBトランザクション内でoutboxテーブルに保存し、別プロセスがこれをメッセージブローカーに配信します。
1. ビジネスロジック + outbox INSERT = 単一DBトランザクション(原子性保証)
2. 別プロセス(CDC/Poller)がoutboxテーブルを読んでKafkaに発行
3. 発行成功後、outboxレコードを処理完了にマーク
-- Outboxテーブルスキーマ
CREATE TABLE outbox (
id UUID PRIMARY KEY,
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id UUID NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ,
status VARCHAR(20) DEFAULT 'PENDING'
);
CREATE INDEX idx_outbox_pending ON outbox (status, created_at)
WHERE status = 'PENDING';
// === Outboxパターン実装 ===
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderRepository orderRepository;
private final OutboxRepository outboxRepository;
@Transactional // 単一トランザクションで保証
public UUID createOrder(CreateOrderCommand command) {
// 1. ビジネスロジック実行
Order order = Order.create(command);
orderRepository.save(order);
// 2. 同じトランザクションでoutboxにイベント保存
OutboxMessage message = OutboxMessage.builder()
.aggregateType("Order")
.aggregateId(order.getId())
.eventType("OrderCreated")
.payload(objectMapper.writeValueAsString(
new OrderCreatedEvent(order.getId(), order.getTotalAmount())
))
.build();
outboxRepository.save(message);
return order.getId();
// トランザクションコミット時にorderとoutboxの両方が一緒に保存される
}
}
6-3. CDC(Debezium)方式
DebeziumはDBのトランザクションログ(WAL)を監視して変更をキャプチャし、Kafkaに配信します。
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "dbz_password",
"database.dbname": "orderdb",
"database.server.name": "order-service",
"table.include.list": "public.outbox",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.route.topic.replacement": "events.order"
}
}
6-4. Polling vs CDC 比較
| 基準 | Polling | CDC (Debezium) |
|---|---|---|
| 遅延 | ポーリング間隔に依存(数秒〜数分) | ほぼリアルタイム(数ms) |
| DB負荷 | 定期的クエリで負荷発生 | WAL読み取りで負荷最小 |
| 実装複雑度 | 低い(スケジューラー + クエリ) | 高い(Debeziumインフラが必要) |
| 順序保証 | outboxテーブルの順序 | WAL順序のまま |
| 運用負荷 | 低い | Kafka Connectクラスターの運用が必要 |
| スケーラビリティ | 単一ポーラーがボトルネック可能 | 高いスケーラビリティ |
7. 冪等性パターン
分散システムでは、メッセージは最低1回(at-least-once)配信されます。ネットワーク障害、ブローカー再起動、コンシューマー再デプロイ等により重複受信が発生します。冪等性は同じメッセージを複数回処理しても、結果が1回処理した場合と同じになることを保証します。
7-1. Producer冪等性(Kafka)
# Kafka Producer設定
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.properties.acks=all
spring.kafka.producer.properties.max.in.flight.requests.per.connection=5
spring.kafka.producer.properties.retries=2147483647
Kafkaの冪等プロデューサーは各メッセージにシーケンス番号を付与し、ブローカーが重複を検出します。
7-2. Consumer冪等性
// === 冪等コンシューマー実装 ===
@Component
@RequiredArgsConstructor
public class IdempotentOrderEventHandler {
private final ProcessedEventRepository processedEventRepository;
private final OrderProjectionRepository orderProjectionRepository;
@KafkaListener(topics = "order-events", groupId = "order-projection")
@Transactional
public void handleOrderEvent(
@Payload OrderEvent event,
@Header(KafkaHeaders.RECEIVED_KEY) String key,
@Header("eventId") String eventId) {
// 1. 既に処理済みのイベントか確認
if (processedEventRepository.existsById(eventId)) {
log.info("イベント重複受信を無視: eventId={}", eventId);
return;
}
// 2. ビジネスロジック処理
processEvent(event);
// 3. 処理完了を記録(同一トランザクション)
processedEventRepository.save(new ProcessedEvent(
eventId,
event.getClass().getSimpleName(),
Instant.now()
));
}
private void processEvent(OrderEvent event) {
switch (event) {
case OrderCreatedEvent e -> createOrderProjection(e);
case OrderStatusChangedEvent e -> updateOrderStatus(e);
default -> log.warn("不明なイベントタイプ");
}
}
}
-- イベント処理履歴テーブル
CREATE TABLE processed_events (
event_id VARCHAR(255) PRIMARY KEY,
event_type VARCHAR(255) NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
7-3. Unique Constraint vs Redis SET NX
| アプローチ | 長所 | 短所 |
|---|---|---|
| DB Unique Constraint | トランザクションと共にアトミック | DB負荷が増加 |
| Redis SET NX | 非常に高速な照会 | Redis障害時に保証されない |
| DB + Redis組み合わせ | 高速な一次フィルター + 確実な保証 | 実装が複雑 |
// Redis SET NXを利用した高速重複チェック
@Component
public class RedisIdempotencyGuard {
private final StringRedisTemplate redisTemplate;
private static final Duration TTL = Duration.ofHours(24);
public boolean tryAcquire(String idempotencyKey) {
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(
"idempotency:" + idempotencyKey,
"processed",
TTL
);
return Boolean.TRUE.equals(result);
}
}
8. Dead Letter Queue 戦略
メッセージ処理に失敗した場合、無限リトライはシステムを麻痺させます。DLQ(Dead Letter Queue)は処理不能なメッセージを別キューに隔離してシステムの安定性を保証します。
8-1. Retry with Exponential Backoff
// === Spring Kafka リトライ + DLQ設定 ===
@Configuration
public class KafkaConsumerConfig {
@Bean
public DefaultErrorHandler errorHandler(
KafkaTemplate<String, Object> kafkaTemplate) {
// DLQに送るrecoverer
DeadLetterPublishingRecoverer recoverer =
new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, ex) -> new TopicPartition(
record.topic() + ".DLQ", record.partition()));
// Exponential Backoff リトライ設定
ExponentialBackOff backOff = new ExponentialBackOff();
backOff.setInitialInterval(1000L); // 1秒
backOff.setMultiplier(2.0); // 2倍ずつ増加
backOff.setMaxInterval(60000L); // 最大60秒
backOff.setMaxElapsedTime(300000L); // 最大5分
DefaultErrorHandler errorHandler =
new DefaultErrorHandler(recoverer, backOff);
// リトライしない例外(ビジネスエラー)
errorHandler.addNotRetryableExceptions(
InvalidOrderException.class,
DuplicateEventException.class
);
return errorHandler;
}
}
8-2. DLQ モニタリングとアラート
// === DLQモニタリングサービス ===
@Component
@RequiredArgsConstructor
public class DlqMonitoringService {
private final MeterRegistry meterRegistry;
private final AlertService alertService;
@KafkaListener(topics = "order-events.DLQ", groupId = "dlq-monitor")
public void monitorDlq(
ConsumerRecord<String, Object> record,
@Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String errorMessage,
@Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic) {
// メトリクス記録
meterRegistry.counter("dlq.messages.received",
"originalTopic", originalTopic,
"errorType", classifyError(errorMessage)
).increment();
// アラート送信
alertService.sendAlert(DlqAlert.builder()
.originalTopic(originalTopic)
.messageKey(record.key())
.errorMessage(errorMessage)
.timestamp(Instant.ofEpochMilli(record.timestamp()))
.severity(classifySeverity(errorMessage))
.build()
);
log.error("DLQメッセージ受信: topic={}, key={}, error={}",
originalTopic, record.key(), errorMessage);
}
}
8-3. DLQ 再処理パイプライン
// === DLQメッセージ再処理 ===
@RestController
@RequestMapping("/api/dlq")
@RequiredArgsConstructor
public class DlqReprocessController {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final DlqMessageRepository dlqRepository;
// 単件再処理
@PostMapping("/reprocess/single")
public ResponseEntity<String> reprocessSingle(
@RequestParam String messageId) {
DlqMessage message = dlqRepository.findById(messageId).orElseThrow();
// 元のトピックに再発行
kafkaTemplate.send(
message.getOriginalTopic(),
message.getKey(),
message.getValue()
);
message.setStatus("REPROCESSED");
message.setReprocessedAt(Instant.now());
dlqRepository.save(message);
return ResponseEntity.ok("再処理リクエスト完了: " + messageId);
}
// バッチ再処理
@PostMapping("/reprocess/batch")
public ResponseEntity<String> reprocessBatch(
@RequestParam String originalTopic,
@RequestParam(defaultValue = "100") int batchSize) {
List<DlqMessage> messages = dlqRepository
.findByOriginalTopicAndStatus(originalTopic, "PENDING",
PageRequest.of(0, batchSize));
int count = 0;
for (DlqMessage message : messages) {
kafkaTemplate.send(message.getOriginalTopic(),
message.getKey(), message.getValue());
message.setStatus("REPROCESSED");
count++;
}
dlqRepository.saveAll(messages);
return ResponseEntity.ok(count + "件の再処理リクエスト完了");
}
}
9. Exactly-Once Semantics
メッセージ配信保証レベルには3種類あります。
| 保証レベル | 説明 | 複雑度 |
|---|---|---|
| At-most-once | 最大1回配信(紛失の可能性) | 低 |
| At-least-once | 最低1回配信(重複の可能性) | 中 |
| Exactly-once | 正確に1回配信 | 高 |
9-1. Kafka Exactly-Once 実装
Kafkaは0.11からExactly-Once Semantics(EOS)をサポートしています。
// === Kafka Exactly-Once設定 ===
@Configuration
public class KafkaExactlyOnceConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-");
config.put(ProducerConfig.ACKS_CONFIG, "all");
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager(
ProducerFactory<String, Object> producerFactory) {
return new KafkaTransactionManager<>(producerFactory);
}
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(config);
}
}
9-2. End-to-End Exactly-Once
真のEnd-to-End Exactly-Onceは、ProducerからConsumerの最終状態保存まで保証する必要があります。
End-to-End Exactly-Onceチェーン:
Producer (idempotent + transactions)
-> Kafka Broker (transaction log)
-> Consumer (read_committed + manual offset)
-> Database (idempotency key + business logic)
チェーンの全リンクが連携してExactly-Onceを保証
// === End-to-End Exactly-Once処理 ===
@Component
@RequiredArgsConstructor
public class ExactlyOnceOrderProcessor {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final OrderRepository orderRepository;
private final ProcessedOffsetRepository offsetRepository;
@KafkaListener(topics = "payment-completed")
@Transactional("chainedKafkaTransactionManager")
public void processPaymentCompleted(
ConsumerRecord<String, PaymentCompletedEvent> record) {
String offsetKey = record.topic() + "-" +
record.partition() + "-" + record.offset();
// 既に処理済みのオフセットか確認
if (offsetRepository.existsById(offsetKey)) {
return;
}
// ビジネスロジック
PaymentCompletedEvent event = record.value();
Order order = orderRepository.findById(event.orderId()).orElseThrow();
order.markAsPaid(event.paymentId());
orderRepository.save(order);
// 次のステップのイベント発行(同じKafkaトランザクション)
kafkaTemplate.send("order-paid",
new OrderPaidEvent(order.getId(), event.paymentId()));
// 処理済みオフセットを記録(同じDBトランザクション)
offsetRepository.save(new ProcessedOffset(offsetKey, Instant.now()));
}
}
10. Reactive Streams vs Virtual Threads(2025)
10-1. Project Reactor:背圧(Backpressure)が核心
リアクティブストリームはノンブロッキングI/Oと背圧(Backpressure)メカニズムを核心設計原則とします。
// === Reactorベースの非同期処理 ===
@Service
public class ReactiveOrderService {
public Flux<OrderEvent> processOrderStream() {
return kafkaReceiver.receive()
.flatMap(record -> {
OrderEvent event = record.value();
return processEvent(event)
.then(record.receiverOffset().commit())
.thenReturn(event);
}, 16) // 同時処理数制限(背圧)
.onErrorResume(e -> {
log.error("処理失敗", e);
return Mono.empty();
})
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
}
private Mono<Void> processEvent(OrderEvent event) {
return Mono.fromCallable(() -> {
// ビジネスロジック
return null;
}).subscribeOn(Schedulers.boundedElastic()).then();
}
}
10-2. Java 21 Virtual Threads:よりシンプルなコード
Virtual Threadsは従来の同期コードスタイルを維持しながら、高い同時実行性を実現します。
// === Virtual Threadsベースの非同期処理 ===
@Configuration
public class VirtualThreadConfig {
@Bean
public TomcatProtocolHandlerCustomizer<?> protocolHandlerCustomizer() {
return protocolHandler -> {
protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
};
}
}
@Service
public class VirtualThreadOrderService {
// 同期コードだがVirtual Thread上で実行
// -> ブロッキングI/O時にOSスレッドではなくVirtual Threadのみブロッキング
public OrderResult processOrder(UUID orderId) {
// 並列処理:StructuredTaskScope活用
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Subtask<PaymentResult> paymentTask = scope.fork(() ->
paymentService.processPayment(orderId)
);
Subtask<InventoryResult> inventoryTask = scope.fork(() ->
inventoryService.checkInventory(orderId)
);
scope.join();
scope.throwIfFailed();
return new OrderResult(
paymentTask.get(),
inventoryTask.get()
);
}
}
}
10-3. 選択ガイド:意思決定マトリクス
| 基準 | Reactive (Reactor/WebFlux) | Virtual Threads |
|---|---|---|
| コード複雑度 | 高い(リアクティブ演算子の学習) | 低い(既存の同期コード) |
| 背圧制御 | 内蔵(Flux/Mono) | 手動実装が必要 |
| デバッグ | 困難(複雑なスタックトレース) | 容易(従来のスタックトレース) |
| メモリ効率 | 非常に良い | 良い(OSスレッドより) |
| I/O集約ワークロード | 最適 | 非常に良い |
| CPU集約ワークロード | 不向き | 不向き(プラットフォームスレッド使用) |
| レガシーコード移行 | 全面書き換えが必要 | 設定変更のみで可能 |
| エコシステム成熟度 | 高い(5年以上) | 成長中(2023〜) |
推奨ガイド:
- 新プロジェクト + シンプルなI/O:Virtual Threads
- ストリーミングデータ処理:Reactive Streams
- 既存Spring MVCプロジェクト:Virtual Threads(移行コスト最小)
- リアルタイムデータパイプライン:Reactive Streams
11. 実践アーキテクチャ:決済システム設計
全てのパターンを統合したECサイト決済システムの全体アーキテクチャを設計します。
ECサイト決済システム全体アーキテクチャ
====================================
[Client] -> [API Gateway]
|
v
[Order Service] --- Write DB (PostgreSQL)
| |
| (Outbox Pattern) | (CDC - Debezium)
| v
| [Kafka: order-events]
| / | \
v v v v
[Saga Orchestrator] [Payment] [Inventory] [Notification]
(Temporal) Service Service Service
| | |
| commands | |
v v v
[payment-commands] [payment- [inventory-
events] events]
|
[DLQ + Monitoring]
|
[Alerting (PagerDuty)]
Read Side:
[Kafka: order-events] -> [Order Projection] -> Read DB (MongoDB)
-> [Analytics Service] -> Data Warehouse
-> [Search Service] -> Elasticsearch
適用パターンまとめ
| パターン | 適用箇所 | 目的 |
|---|---|---|
| CQRS | Order Service | 注文の読み取り/書き込み分離 |
| Saga (Orchestration) | Temporal Orchestrator | 決済-在庫-配送トランザクション管理 |
| Event Sourcing | Order Aggregate | 注文状態変更履歴の管理 |
| Outbox Pattern | Order Service → Kafka | DB-メッセージの原子性保証 |
| 冪等性 | 全Consumer | 重複メッセージ処理の防止 |
| DLQ | 全Kafka Consumer | 処理失敗メッセージの隔離 |
| Exactly-Once | 決済処理 | 二重決済の防止 |
クイズ
Q1. CQRSでWrite ModelとRead Modelが異なるデータベースを使用する場合、2つのモデル間のデータ一貫性を保証するために最も一般的に使用されるアプローチは?
正解:イベントベースの非同期同期(Eventual Consistency)
Write Modelが状態を変更する際にイベントを発行し、Read Modelのプロジェクションハンドラーがこのイベントを購読してRead DBを更新します。2つのモデル間には一時的な不整合(数ms〜数秒)が存在し得ますが、最終的に一貫性が保証されます。
強い一貫性が必要な場合は、CQRSの適合性を再検討すべきです。
Q2. Choreography SagaでサービスAがイベントを発行し、サービスBが処理に失敗した場合、サービスAはどのようにこれを認知できますか?
正解: サービスBが失敗イベント(例:PaymentFailed)を発行し、サービスAがこれを購読して認知します。Choreographyでは各サービスが成功/失敗イベントの両方を発行する必要があり、関連サービスがこれを購読して適切に対応します。
これがChoreographyの短所の一つです。サービスが増えるほどイベントフローの追跡が複雑になり、タイムアウトベースの検知が追加で必要になることがあります。
Q3. Event Sourcingでイベントが100万個蓄積されたAggregateをロードする際のパフォーマンス問題を解決する核心技法は?
正解:Snapshotパターン
定期的に(例:100イベントごと)Aggregateの現在の状態をスナップショットとして保存します。ロード時に最新のスナップショットから開始し、その後のイベントだけをリプレイします。
例:100万個のイベント中、最後のスナップショットが999,900番目であれば、100個のイベントだけリプレイすれば良いです。
Q4. Outbox PatternでCDC(Change Data Capture)方式がPolling方式より好まれる主な理由は?
正解: CDCはDBのトランザクションログ(WAL)を直接読み取るため、ほぼリアルタイム(数ms)で変更を検知します。Pollingは定期的にクエリを実行するため遅延が大きく(数秒〜数分)、頻繁なクエリでDBに負荷をかけます。
また、CDCはWALの順序そのままでイベントを配信するため順序保証が自然で、DBトランザクションログを読むため追加のクエリ負荷がほとんどありません。
Q5. KafkaでExactly-Once Semanticsを達成するには、Producer、Broker、Consumerそれぞれでどのような設定が必要ですか?
正解:
- Producer:enable.idempotence=true + transactional.id設定(冪等プロデューサー + トランザクション)
- Broker:トランザクションログの維持(デフォルトで有効)
- Consumer:isolation.level=read_committed + 手動オフセットコミット + 冪等処理
End-to-End Exactly-Onceのためには、ConsumerがDBに結果を保存する際にも冪等性を保証する必要があります。Kafkaトランザクションはkafka内部のExactly-Onceのみを保証し、外部システム(DB)まで含めるにはConsumer側の冪等性が必須です。
参考資料
- Martin Fowler — "Event Sourcing" (martinfowler.com)
- Chris Richardson — "Microservices Patterns" (Manning, 2018)
- Vaughn Vernon — "Implementing Domain-Driven Design" (Addison-Wesley, 2013)
- Greg Young — "CQRS Documents" (cqrs.files.wordpress.com)
- Apache Kafka Documentation — "Exactly-Once Semantics" (kafka.apache.org)
- Debezium Documentation — "Outbox Event Router" (debezium.io)
- Temporal.io Documentation — "Saga Pattern with Temporal" (docs.temporal.io)
- Netflix Tech Blog — "Evolution of the Netflix Data Pipeline" (netflixtechblog.com)
- Uber Engineering Blog — "Building Reliable Reprocessing and Dead Letter Queues" (eng.uber.com)
- Confluent Blog — "Exactly-Once Semantics Are Possible" (confluent.io)
- AWS Architecture Blog — "Saga Orchestration Pattern" (aws.amazon.com)
- Microsoft Azure Docs — "CQRS Pattern" (learn.microsoft.com)
- Spring for Apache Kafka Reference — "Error Handling and DLQ" (docs.spring.io)
- JEP 444 — "Virtual Threads" (openjdk.org)
- Project Reactor Reference — "Backpressure" (projectreactor.io)
- Pat Helland — "Idempotence Is Not a Medical Condition" (ACM Queue, 2012)