- Published on
Event-Driven Architecture 実践ガイド:CQRS、Event Sourcing、Sagaパターン
- Authors

- Name
- Youngju Kim
- @fjvbn20031
- はじめに
- Event-Driven Architectureの基礎
- CQRSパターン:CommandとQueryの分離
- Event Sourcing:イベントベースの状態管理
- Sagaパターン:分散トランザクション管理
- Choreography vs Orchestration比較
- 運用時の注意事項とトラブルシューティング
- まとめ
- 参考資料

はじめに
マイクロサービスアーキテクチャにおける最大の課題の一つは、複数のサービスにまたがるデータ整合性とビジネストランザクションの管理である。従来のモノリスでは単一データベースのACIDトランザクションで解決できたが、サービスごとに独立したデータストアを持つ分散環境では2PC(Two-Phase Commit)の限界が明確になる。パフォーマンスボトルネック、可用性の低下、サービス間の強い結合が発生するからである。
Event-Driven Architecture(EDA)はこの問題に対する根本的な解決策を提示する。サービス間通信を非同期イベントベースに切り替え、状態変更をイベントストリームで管理し、分散トランザクションを補償ベースのサガで処理する。本記事ではEDAの三つの核心パターンであるCQRS、Event Sourcing、Sagaを実践コードとともに深く分析し、本番運用に必要な戦略とトラブルシューティング技法を扱う。
Netflixは2億6千万の加入者向けパーソナライゼーションシステムに、LinkedInは1日数兆個のイベント処理に、Slackは数十億件の日次メッセージ処理にこれらのパターンを活用している。これらの大規模システムの経験で検証されたパターンとアンチパターンを一緒に見ていこう。
Event-Driven Architectureの基礎
イベントの三つの類型
EDAにおけるイベントはその目的に応じて三つに分類される。
| 類型 | 説明 | 例 | 特徴 |
|---|---|---|---|
| Domain Event | ビジネスドメインで発生した事実の記録 | OrderPlaced, PaymentCompleted | 不変、過去形の命名 |
| Integration Event | サービス間通信のためのイベント | OrderPlacedIntegrationEvent | 境界コンテキストの境界を越える |
| Event Notification | 変更発生の通知(データ最小化) | OrderStatusChanged(IDのみ) | 受信者が必要なデータを直接照会 |
核心原則
EDAを正しく実装するための核心原則は以下の通り。
- 非同期通信:プロデューサーとコンシューマーが時間的に分離される
- 疎結合:サービスはイベントスキーマのみ知っていればよく、相手サービスの実装を知る必要がない
- 結果整合性:即座の強い整合性の代わりに一定時間後に整合性が保証される
- 冪等性:同一イベントが複数回処理されても結果が同一でなければならない
// TypeScript - Domain Event basic structure
interface DomainEvent {
eventId: string
eventType: string
aggregateId: string
aggregateType: string
timestamp: Date
version: number
payload: Record<string, unknown>
metadata: {
correlationId: string
causationId: string
userId?: string
}
}
// Order creation event example
const orderPlacedEvent: DomainEvent = {
eventId: 'evt-550e8400-e29b-41d4-a716-446655440000',
eventType: 'OrderPlaced',
aggregateId: 'order-12345',
aggregateType: 'Order',
timestamp: new Date('2026-03-14T09:00:00Z'),
version: 1,
payload: {
customerId: 'cust-67890',
items: [
{ productId: 'prod-001', quantity: 2, price: 29900 },
{ productId: 'prod-002', quantity: 1, price: 15000 },
],
totalAmount: 74800,
shippingAddress: {
city: 'ソウル',
district: '江南区',
detail: 'テヘラン路123',
},
},
metadata: {
correlationId: 'corr-abc123',
causationId: 'cmd-place-order-001',
userId: 'user-admin-01',
},
}
CQRSパターン:CommandとQueryの分離
CQRSとは
CQRS(Command Query Responsibility Segregation)はデータの書き込み(Command)と読み取り(Query)を別々のモデルに分離するパターンである。Bertrand MeyerのCQS(Command Query Separation)原則をアーキテクチャレベルに拡張したもので、Greg Youngが2010年に体系化した。
従来のCRUDモデルでは同一のデータモデルが読み取りと書き込みの両方を担当する。しかし実際のビジネスでは読み取りと書き込みの要件は大きく異なる。
| 観点 | Command(書き込み) | Query(読み取り) |
|---|---|---|
| 目的 | 状態変更、ビジネスルール検証 | データ照会、画面表示 |
| 比率 | 全体トラフィックの10-20% | 全体トラフィックの80-90% |
| 整合性 | 強い整合性が必要 | 結果整合性で許容可能 |
| モデル複雑度 | 豊富なドメインロジック | 非正規化された照会モデル |
| スケーリング | 主に垂直スケーリング | 水平スケーリングが容易(キャッシュ、レプリカ) |
CQRS実装:TypeScript例
// TypeScript - CQRS Command Side
// Command definition
interface PlaceOrderCommand {
type: 'PlaceOrder'
customerId: string
items: Array<{
productId: string
quantity: number
price: number
}>
shippingAddress: string
}
// Command Handler
class PlaceOrderHandler {
constructor(
private orderRepository: OrderWriteRepository,
private eventBus: EventBus,
private inventoryService: InventoryService
) {}
async handle(command: PlaceOrderCommand): Promise<string> {
// 1. ビジネスルール検証
await this.inventoryService.validateStock(command.items)
// 2. Aggregate作成
const order = Order.create({
customerId: command.customerId,
items: command.items,
shippingAddress: command.shippingAddress,
})
// 3. 書き込みストアに保存
await this.orderRepository.save(order)
// 4. ドメインイベント発行(読み取りモデル同期用)
for (const event of order.getDomainEvents()) {
await this.eventBus.publish(event)
}
return order.id
}
}
// Query Side - 読み取り専用モデル
interface OrderReadModel {
orderId: string
customerName: string
orderDate: string
status: string
totalAmount: number
itemCount: number
lastUpdated: string
}
// Query Handler
class GetOrdersQueryHandler {
constructor(private readDb: ReadDatabase) {}
async handle(query: {
customerId: string
status?: string
page: number
limit: number
}): Promise<OrderReadModel[]> {
// 読み取り専用の非正規化テーブルから直接照会
return this.readDb.query(
`SELECT order_id, customer_name, order_date, status,
total_amount, item_count, last_updated
FROM order_read_model
WHERE customer_id = ?
${query.status ? 'AND status = ?' : ''}
ORDER BY order_date DESC
LIMIT ? OFFSET ?`,
[query.customerId, query.status, query.limit, query.page * query.limit]
)
}
}
読み取りモデルプロジェクション
イベントを受信して読み取りモデルを更新するプロジェクションロジックはCQRSの核心である。
// TypeScript - Event-based Projection
class OrderProjection {
constructor(private readDb: ReadDatabase) {}
async handle(event: DomainEvent): Promise<void> {
switch (event.eventType) {
case 'OrderPlaced':
await this.onOrderPlaced(event)
break
case 'OrderShipped':
await this.onOrderShipped(event)
break
case 'OrderCancelled':
await this.onOrderCancelled(event)
break
}
}
private async onOrderPlaced(event: DomainEvent): Promise<void> {
const payload = event.payload as {
customerId: string
items: Array<{ quantity: number; price: number }>
totalAmount: number
}
await this.readDb.upsert('order_read_model', {
order_id: event.aggregateId,
customer_id: payload.customerId,
status: 'PLACED',
total_amount: payload.totalAmount,
item_count: payload.items.reduce((sum, i) => sum + i.quantity, 0),
order_date: event.timestamp,
last_updated: event.timestamp,
version: event.version,
})
}
private async onOrderShipped(event: DomainEvent): Promise<void> {
const payload = event.payload as { trackingNumber: string }
// 冪等性保証:バージョンチェック
await this.readDb.updateWhere(
'order_read_model',
{
status: 'SHIPPED',
tracking_number: payload.trackingNumber,
last_updated: event.timestamp,
version: event.version,
},
{ order_id: event.aggregateId, version: event.version - 1 }
)
}
private async onOrderCancelled(event: DomainEvent): Promise<void> {
const payload = event.payload as { reason: string }
await this.readDb.updateWhere(
'order_read_model',
{
status: 'CANCELLED',
cancellation_reason: payload.reason,
last_updated: event.timestamp,
version: event.version,
},
{ order_id: event.aggregateId }
)
}
}
Event Sourcing:イベントベースの状態管理
Event Sourcingの核心概念
Event Sourcingはアグリゲートの現在の状態を保存する代わりに、状態を変更したすべてのイベントを順序通りに保存するパターンである。現在の状態はイベントストリームを最初からリプレイして再構成する。
従来の方式とEvent Sourcingの違いを比較すると以下の通り。
| 観点 | 従来のCRUD | Event Sourcing |
|---|---|---|
| 保存対象 | 現在の状態(最新スナップショット) | 状態変更イベントの全履歴 |
| データ損失 | 以前の状態は消失 | すべての変更履歴を保存 |
| 監査 | 別途実装が必要 | 基本内蔵 |
| デバッグ | 現在の状態のみ確認可能 | タイムトラベル可能 |
| ストレージ | 比較的少ない | イベント累積で増加 |
| 照会性能 | 直接照会可能 | プロジェクションまたはスナップショットが必要 |
Event Sourcing実装
// TypeScript - Event Sourcing Aggregate
abstract class EventSourcedAggregate {
private uncommittedEvents: DomainEvent[] = []
protected version: number = 0
abstract get id(): string
protected apply(event: DomainEvent): void {
this.when(event)
this.version++
this.uncommittedEvents.push(event)
}
protected abstract when(event: DomainEvent): void
loadFromHistory(events: DomainEvent[]): void {
for (const event of events) {
this.when(event)
this.version++
}
}
getUncommittedEvents(): DomainEvent[] {
return [...this.uncommittedEvents]
}
clearUncommittedEvents(): void {
this.uncommittedEvents = []
}
}
class Order extends EventSourcedAggregate {
private _id: string = ''
private _customerId: string = ''
private _items: OrderItem[] = []
private _status: OrderStatus = OrderStatus.DRAFT
private _totalAmount: number = 0
get id(): string {
return this._id
}
static create(params: { orderId: string; customerId: string; items: OrderItem[] }): Order {
const order = new Order()
const totalAmount = params.items.reduce((sum, item) => sum + item.price * item.quantity, 0)
order.apply({
eventId: crypto.randomUUID(),
eventType: 'OrderPlaced',
aggregateId: params.orderId,
aggregateType: 'Order',
timestamp: new Date(),
version: 1,
payload: {
customerId: params.customerId,
items: params.items,
totalAmount,
},
metadata: {
correlationId: crypto.randomUUID(),
causationId: 'create',
},
})
return order
}
confirm(): void {
if (this._status !== OrderStatus.PLACED) {
throw new Error(`Cannot confirm order in status: ${this._status}`)
}
this.apply({
eventId: crypto.randomUUID(),
eventType: 'OrderConfirmed',
aggregateId: this._id,
aggregateType: 'Order',
timestamp: new Date(),
version: this.version + 1,
payload: { confirmedAt: new Date().toISOString() },
metadata: {
correlationId: crypto.randomUUID(),
causationId: 'confirm',
},
})
}
cancel(reason: string): void {
if (this._status === OrderStatus.CANCELLED) {
return // 冪等性保証
}
this.apply({
eventId: crypto.randomUUID(),
eventType: 'OrderCancelled',
aggregateId: this._id,
aggregateType: 'Order',
timestamp: new Date(),
version: this.version + 1,
payload: { reason, cancelledAt: new Date().toISOString() },
metadata: {
correlationId: crypto.randomUUID(),
causationId: 'cancel',
},
})
}
protected when(event: DomainEvent): void {
switch (event.eventType) {
case 'OrderPlaced': {
const p = event.payload as {
customerId: string
items: OrderItem[]
totalAmount: number
}
this._id = event.aggregateId
this._customerId = p.customerId
this._items = p.items
this._totalAmount = p.totalAmount
this._status = OrderStatus.PLACED
break
}
case 'OrderConfirmed':
this._status = OrderStatus.CONFIRMED
break
case 'OrderCancelled':
this._status = OrderStatus.CANCELLED
break
}
}
}
enum OrderStatus {
DRAFT = 'DRAFT',
PLACED = 'PLACED',
CONFIRMED = 'CONFIRMED',
SHIPPED = 'SHIPPED',
CANCELLED = 'CANCELLED',
}
interface OrderItem {
productId: string
quantity: number
price: number
}
スナップショット戦略
イベント数が増えるとリプレイ時間が長くなる。スナップショットは特定時点の状態をキャッシュしてリプレイ性能を改善する。
# Python - Snapshot-based Event Store
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import json
@dataclass
class Snapshot:
aggregate_id: str
aggregate_type: str
version: int
state: dict
created_at: datetime = field(default_factory=datetime.utcnow)
class EventStore:
SNAPSHOT_INTERVAL = 100 # 100イベントごとにスナップショット作成
def __init__(self, db_connection):
self.db = db_connection
async def save_events(
self, aggregate_id: str, events: list[dict], expected_version: int
) -> None:
"""楽観的同時実行制御とともにイベントを保存"""
async with self.db.transaction():
current_version = await self._get_current_version(aggregate_id)
if current_version != expected_version:
raise ConcurrencyError(
f"Expected version {expected_version}, "
f"but current version is {current_version}"
)
for i, event in enumerate(events):
version = expected_version + i + 1
await self.db.execute(
"""INSERT INTO event_store
(event_id, aggregate_id, aggregate_type,
event_type, version, payload, metadata, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(
event["event_id"],
aggregate_id,
event["aggregate_type"],
event["event_type"],
version,
json.dumps(event["payload"]),
json.dumps(event["metadata"]),
datetime.utcnow(),
),
)
new_version = expected_version + len(events)
if new_version % self.SNAPSHOT_INTERVAL == 0:
await self._create_snapshot(aggregate_id, new_version)
async def load_aggregate(self, aggregate_id: str) -> tuple[list[dict], int]:
"""スナップショットからイベントをロードして状態を復元"""
snapshot = await self._get_latest_snapshot(aggregate_id)
if snapshot:
events = await self.db.fetch_all(
"""SELECT * FROM event_store
WHERE aggregate_id = ? AND version > ?
ORDER BY version ASC""",
(aggregate_id, snapshot.version),
)
return events, snapshot
else:
events = await self.db.fetch_all(
"""SELECT * FROM event_store
WHERE aggregate_id = ?
ORDER BY version ASC""",
(aggregate_id,),
)
return events, None
async def _create_snapshot(
self, aggregate_id: str, version: int
) -> None:
"""現在の状態をスナップショットとして保存"""
events, _ = await self.load_aggregate(aggregate_id)
aggregate = self._rebuild_aggregate(events)
await self.db.execute(
"""INSERT INTO snapshots
(aggregate_id, aggregate_type, version, state, created_at)
VALUES (?, ?, ?, ?, ?)""",
(
aggregate_id,
aggregate.aggregate_type,
version,
json.dumps(aggregate.to_dict()),
datetime.utcnow(),
),
)
async def _get_current_version(self, aggregate_id: str) -> int:
result = await self.db.fetch_one(
"SELECT MAX(version) FROM event_store WHERE aggregate_id = ?",
(aggregate_id,),
)
return result[0] if result[0] else 0
class ConcurrencyError(Exception):
pass
Sagaパターン:分散トランザクション管理
Sagaパターンとは
Sagaパターンは分散環境で複数のサービスにまたがるビジネストランザクションを管理するパターンである。従来の分散トランザクション(2PC)と異なり、各サービスのローカルトランザクションを順次実行し、失敗時には既に完了したステップに対して補償トランザクションを実行して整合性を復元する。
注文処理Sagaの流れを例にすると以下の通り。
正常フロー:
- 注文サービス:注文作成(OrderCreated)
- 決済サービス:決済処理(PaymentProcessed)
- 在庫サービス:在庫確保(InventoryReserved)
- 配送サービス:配送作成(ShipmentCreated)
失敗時の補償フロー(ステップ3の在庫確保失敗時):
- 決済サービス:決済返金(PaymentRefunded)-- 補償
- 注文サービス:注文キャンセル(OrderCancelled)-- 補償
Orchestrationベースのsaga実装
// TypeScript - Saga Orchestrator (Order Processing)
interface SagaStep {
name: string
action: () => Promise<void>
compensation: () => Promise<void>
}
class OrderSagaOrchestrator {
private completedSteps: SagaStep[] = []
private sagaLog: SagaLogEntry[] = []
constructor(
private paymentService: PaymentService,
private inventoryService: InventoryService,
private shippingService: ShippingService,
private sagaStore: SagaStore
) {}
async execute(orderId: string, orderData: OrderData): Promise<SagaResult> {
const sagaId = crypto.randomUUID()
const steps: SagaStep[] = [
{
name: 'ProcessPayment',
action: async () => {
await this.paymentService.processPayment({
orderId,
amount: orderData.totalAmount,
customerId: orderData.customerId,
})
},
compensation: async () => {
await this.paymentService.refundPayment({
orderId,
amount: orderData.totalAmount,
})
},
},
{
name: 'ReserveInventory',
action: async () => {
await this.inventoryService.reserve({
orderId,
items: orderData.items,
})
},
compensation: async () => {
await this.inventoryService.releaseReservation({
orderId,
items: orderData.items,
})
},
},
{
name: 'CreateShipment',
action: async () => {
await this.shippingService.createShipment({
orderId,
address: orderData.shippingAddress,
items: orderData.items,
})
},
compensation: async () => {
await this.shippingService.cancelShipment({ orderId })
},
},
]
try {
for (const step of steps) {
await this.logStep(sagaId, step.name, 'STARTED')
try {
await step.action()
this.completedSteps.push(step)
await this.logStep(sagaId, step.name, 'COMPLETED')
} catch (error) {
await this.logStep(sagaId, step.name, 'FAILED', error)
await this.compensate(sagaId)
return {
success: false,
sagaId,
failedStep: step.name,
error: (error as Error).message,
}
}
}
await this.sagaStore.markCompleted(sagaId)
return { success: true, sagaId }
} catch (compensationError) {
await this.sagaStore.markRequiresIntervention(sagaId)
throw new SagaCompensationFailedError(sagaId, compensationError as Error)
}
}
private async compensate(sagaId: string): Promise<void> {
const stepsToCompensate = [...this.completedSteps].reverse()
for (const step of stepsToCompensate) {
try {
await this.logStep(sagaId, step.name, 'COMPENSATING')
await step.compensation()
await this.logStep(sagaId, step.name, 'COMPENSATED')
} catch (error) {
await this.logStep(sagaId, step.name, 'COMPENSATION_FAILED', error)
await this.sagaStore.enqueueRetry(sagaId, step.name)
throw error
}
}
}
private async logStep(
sagaId: string,
stepName: string,
status: string,
error?: unknown
): Promise<void> {
const entry: SagaLogEntry = {
sagaId,
stepName,
status,
timestamp: new Date(),
error: error ? (error as Error).message : undefined,
}
this.sagaLog.push(entry)
await this.sagaStore.appendLog(entry)
}
}
interface SagaResult {
success: boolean
sagaId: string
failedStep?: string
error?: string
}
interface SagaLogEntry {
sagaId: string
stepName: string
status: string
timestamp: Date
error?: string
}
Choreography vs Orchestration比較
Sagaパターンの二つの実装方式を詳細に比較する。
| 比較項目 | Choreography(コレオグラフィ) | Orchestration(オーケストレーション) |
|---|---|---|
| 制御方式 | 分散 - 各サービスがイベントを発行/購読 | 中央集中 - オーケストレーターがフロー制御 |
| 結合度 | 低い(イベントスキーマのみ共有) | 中間(オーケストレーターが全サービスを知る必要) |
| 可視性 | 低い(フロー追跡が困難) | 高い(オーケストレーターで状態確認可能) |
| 複雑度管理 | 参加サービス増加時に急激に複雑 | 線形に増加 |
| 単一障害点 | なし | オーケストレーターがSPOFになりうる |
| 補償ロジック | 各サービスに分散 | オーケストレーターに集中 |
| テスト | 統合テストが困難 | オーケストレーター単体テストが容易 |
| 適切な規模 | 2-4サービス参加の単純なワークフロー | 5個以上のサービスの複雑なワークフロー |
| 代表ツール | Kafka, RabbitMQ, SNS/SQS | Temporal, Camunda, AWS Step Functions |
Choreographyパターンコード例
# Python - Choreography-based Saga (event subscription approach)
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
import asyncio
import json
class EventBus:
"""Simple in-memory event bus (use Kafka/RabbitMQ in production)"""
def __init__(self):
self._handlers: dict[str, list] = {}
def subscribe(self, event_type: str, handler):
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(handler)
async def publish(self, event_type: str, payload: dict):
handlers = self._handlers.get(event_type, [])
for handler in handlers:
await handler(payload)
class PaymentService:
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
event_bus.subscribe("OrderCreated", self.on_order_created)
event_bus.subscribe("InventoryReservationFailed", self.on_inventory_failed)
async def on_order_created(self, payload: dict):
"""注文作成時に決済処理"""
try:
order_id = payload["order_id"]
amount = payload["total_amount"]
payment_result = await self._process_payment(
order_id, amount
)
await self.event_bus.publish("PaymentProcessed", {
"order_id": order_id,
"payment_id": payment_result["payment_id"],
"amount": amount,
})
except Exception as e:
await self.event_bus.publish("PaymentFailed", {
"order_id": payload["order_id"],
"reason": str(e),
})
async def on_inventory_failed(self, payload: dict):
"""在庫不足時に決済返金(補償トランザクション)"""
order_id = payload["order_id"]
await self._refund_payment(order_id)
await self.event_bus.publish("PaymentRefunded", {
"order_id": order_id,
})
async def _process_payment(self, order_id: str, amount: int) -> dict:
return {"payment_id": f"pay-{order_id}"}
async def _refund_payment(self, order_id: str) -> None:
pass
class InventoryService:
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
event_bus.subscribe("PaymentProcessed", self.on_payment_processed)
async def on_payment_processed(self, payload: dict):
"""決済完了時に在庫確保"""
try:
order_id = payload["order_id"]
await self._reserve_inventory(order_id)
await self.event_bus.publish("InventoryReserved", {
"order_id": order_id,
})
except InsufficientStockError:
await self.event_bus.publish("InventoryReservationFailed", {
"order_id": payload["order_id"],
"reason": "insufficient_stock",
})
class InsufficientStockError(Exception):
pass
ハイブリッドアプローチ:いつどちらを選ぶか
実務では純粋なChoreographyやOrchestrationのどちらか一つだけを使うより、ワークフローの複雑度に応じて混合するのが効果的である。
- Choreography選択:2-3サービスだけが参加する単純な通知、キャッシュ無効化、ログ収集など
- Orchestration選択:決済-在庫-配送のように順序と補償が重要なコアビジネスフロー
- ハイブリッド:コアビジネスフローはOrchestrationで、副作用(メール送信、通知、分析イベント)はChoreographyで処理
運用時の注意事項とトラブルシューティング
1. 冪等性の確保
分散環境でイベントは「少なくとも一回(at-least-once)」配信が保証されるため、同一イベントが重複処理される可能性がある。すべてのイベントハンドラーは必ず冪等性を保証しなければならない。
// TypeScript - Idempotency guarantee pattern
class IdempotentEventHandler {
constructor(
private processedEvents: ProcessedEventStore,
private handler: EventHandler
) {}
async handle(event: DomainEvent): Promise<void> {
const isProcessed = await this.processedEvents.exists(event.eventId)
if (isProcessed) {
console.log(`Event ${event.eventId} already processed, skipping`)
return
}
try {
await this.handler.handle(event)
await this.processedEvents.markAsProcessed(event.eventId, {
processedAt: new Date(),
ttl: 60 * 60 * 24 * 7,
})
} catch (error) {
throw error
}
}
}
class RedisProcessedEventStore implements ProcessedEventStore {
constructor(private redis: Redis) {}
async exists(eventId: string): Promise<boolean> {
const result = await this.redis.exists(`processed:${eventId}`)
return result === 1
}
async markAsProcessed(
eventId: string,
options: { processedAt: Date; ttl: number }
): Promise<void> {
await this.redis.setex(`processed:${eventId}`, options.ttl, options.processedAt.toISOString())
}
}
2. イベント順序の保証
Kafkaでイベント順序を保証するには、同一Aggregateのイベントが同一パーティションにルーティングされる必要がある。Aggregate IDをパーティションキーとして使用するのが一般的である。
3. スキーマ進化
イベントは不変であるため、スキーマ変更時には下位互換性を維持する必要がある。AvroやProtobufのようなスキーマレジストリを使用するか、アップキャスティングパターンを適用する。
// TypeScript - Event Upcaster (Schema Evolution)
class EventUpcaster {
private upcasters: Map<string, Map<number, (event: DomainEvent) => DomainEvent>> = new Map()
register(
eventType: string,
fromVersion: number,
upcaster: (event: DomainEvent) => DomainEvent
): void {
if (!this.upcasters.has(eventType)) {
this.upcasters.set(eventType, new Map())
}
this.upcasters.get(eventType)!.set(fromVersion, upcaster)
}
upcast(event: DomainEvent): DomainEvent {
const typeUpcasters = this.upcasters.get(event.eventType)
if (!typeUpcasters) return event
let currentEvent = event
let schemaVersion = (event.metadata as any).schemaVersion || 1
while (typeUpcasters.has(schemaVersion)) {
const upcasterFn = typeUpcasters.get(schemaVersion)!
currentEvent = upcasterFn(currentEvent)
schemaVersion++
}
return currentEvent
}
}
const upcaster = new EventUpcaster()
upcaster.register('OrderPlaced', 1, (event) => {
const payload = event.payload as any
return {
...event,
payload: {
...payload,
shippingAddress: {
full: payload.shippingAddress,
city: '',
zipCode: '',
},
},
metadata: {
...event.metadata,
schemaVersion: 2,
},
}
})
4. モニタリング核心指標
EDAシステムの健全性を確認するために以下の指標をモニタリングすべきである。
| 指標 | 説明 | 警告閾値 |
|---|---|---|
| Consumer Lag | コンシューマーの処理遅延(未処理イベント数) | 1000件以上 |
| Event Processing Latency | イベント発行から処理までの所要時間 | P99 5秒以上 |
| Saga Completion Rate | Saga成功率 | 99%未満 |
| Compensation Failure Rate | 補償トランザクション失敗率 | 0.1%以上 |
| Projection Lag | 読み取りモデルと書き込みモデルの同期遅延 | 30秒以上 |
| Dead Letter Queue Size | 処理不可イベント数 | 0件超過時即時アラート |
まとめ
Event-Driven Architectureの三つの核心パターンであるCQRS、Event Sourcing、Sagaはそれぞれ独立しても強力だが、一緒に使うとマイクロサービスアーキテクチャのデータ整合性問題を根本的に解決する。
要点を整理すると以下の通り。
-
CQRS:読み取りと書き込みの非対称な要件を認めて分離すること。80-90%を占める読み取りトラフィックを独立して最適化できる。
-
Event Sourcing:現在の状態の代わりに変更履歴を保存すること。完全な監査追跡、タイムトラベルデバッグ、多様な読み取りモデル生成が可能になる。ただしスナップショット戦略とスキーマ進化戦略は最初から考慮すべきである。
-
Sagaパターン:分散トランザクションを補償ベースで管理すること。単純なフローはChoreographyで、複雑なビジネスロジックはOrchestrationで実装しつつ、補償トランザクションの失敗まで備えた復旧戦略を必ず用意すべきである。
-
イベントストア選択:純粋なEvent Sourcingが目的ならEventStoreDB、大容量ストリーミングならKafka、AWSサーバーレスならDynamoDBを基本に考慮しつつ、要件に応じて組み合わせることができる。
これらのパターンは強力だが、複雑さというコストが伴う。すべてのサービスに一括適用するより、ビジネス複雑度が高くデータ整合性が重要なコアドメインから段階的に導入するのが現実的な戦略である。運用モニタリング、冪等性保証、スキーマ進化戦略なしにこれらのパターンを導入すると、かえってシステムの安定性を損なう可能性があることを肝に銘じよう。
参考資料
- Microsoft Azure - CQRS Pattern - CQRSパターンの公式リファレンスアーキテクチャドキュメント
- Microservices.io - Event Sourcing Pattern - Chris RichardsonのEvent Sourcingパターン説明
- Microservices.io - Saga Pattern - 分散トランザクション管理のためのSagaパターン詳細ガイド
- EventStoreDB Documentation - EventStoreDB公式ドキュメントおよび開始ガイド
- Temporal.io - Mastering Saga Patterns - TemporalベースのSagaパターン実装マスターガイド
- ByteByteGo - Saga Pattern Demystified - Orchestration vs Choreography比較分析
- Microsoft - Exploring CQRS and Event Sourcing - Microsoft patterns and practices CQRS Journeyガイドブック
- Domain Centric - EventStoreDB vs Kafka - EventStoreDBとKafkaの詳細比較
- AWS - Event Sourcing on AWS - AWSベースEvent Sourcingアーキテクチャパターン
- Azure Architecture Center - Saga Design Pattern - MicrosoftのSagaデザインパターン公式ドキュメント