- Published on
Event-Driven Architecture + CQRS + Event Sourcing 実践実装:Kafka/RabbitMQ基盤の分散システム設計
- Authors

- Name
- Youngju Kim
- @fjvbn20031
- はじめに
- Event-Driven Architecture コアパターン
- CQRS(Command Query Responsibility Segregation)深掘り
- Event Sourcing実装
- メッセージブローカー比較:Kafka vs RabbitMQ vs NATS
- Sagaパターン:分散トランザクション管理
- Snapshot戦略:イベントリプレイ最適化
- 運用上の注意事項
- 障害事例と復旧手順
- プロダクションチェックリスト
- 参考資料

はじめに
マイクロサービスアーキテクチャが一般化するにつれ、サービス間通信方式はシステムのスケーラビリティとレジリエンスを決定づける核心要素となった。同期REST呼び出しベースのアーキテクチャはサービス間の強い結合を生み、一つのサービス障害がシステム全体に伝播するカスケード障害を引き起こす。
Event-Driven Architecture(EDA)はこの問題を解決する代表的なパターンだ。サービスはイベントを発行(publish)し、関心のあるサービスがそれを購読(subscribe)して非同期で処理する。ここにCQRS(Command Query Responsibility Segregation)とEvent Sourcingを組み合わせると、読み取り/書き込みワークロードを独立してスケーリングし、すべての状態変更の監査証跡(audit trail)を自然に確保できる。
本記事ではEDAのコアパターンからCQRSとEvent Sourcingの実装、メッセージブローカー比較、Sagaパターン、そしてプロダクションで経験する障害事例と対応方法をコードレベルで扱う。
Event-Driven Architecture コアパターン
イベントの種類
EDAにおけるイベントは大きく3つに分類される。
| 種類 | 説明 | 例 | 特徴 |
|---|---|---|---|
| Domain Event | ビジネスドメインで発生した意味のある事実 | OrderPlaced, PaymentCompleted | 過去形命名、不変 |
| Integration Event | サービス境界を越えて伝播するイベント | OrderShipped(物流サービスへ) | サービス間の契約 |
| Notification Event | 単純な通知(データ最小化) | OrderStatusChanged | 受信者が詳細データを別途取得 |
EDA通信パターン比較
| パターン | 結合度 | 配信保証 | 順序保証 | ユースケース |
|---|---|---|---|---|
| Pub/Sub | 疎 | At-least-once | 保証なし | 通知、キャッシュ無効化 |
| Event Streaming | 疎 | At-least-once | パーティション内保証 | ログ集約、リアルタイム分析 |
| Event Sourcing | 自己参照 | 永続保存 | アグリゲート内保証 | 監査証跡、状態復元 |
| Request-Reply | 強 | 同期 | リクエスト-レスポンスペア | 同期的な確認が必要な場合 |
TypeScriptイベント基本構造
// ドメインイベント基本インターフェース
interface DomainEvent {
readonly eventId: string
readonly eventType: string
readonly aggregateId: string
readonly aggregateType: string
readonly version: number
readonly timestamp: Date
readonly metadata: EventMetadata
readonly payload: Record<string, unknown>
}
interface EventMetadata {
readonly correlationId: string
readonly causationId: string
readonly userId?: string
readonly traceId?: string
}
// 具体的なイベント例
class OrderPlacedEvent implements DomainEvent {
readonly eventType = 'OrderPlaced'
readonly aggregateType = 'Order'
constructor(
public readonly eventId: string,
public readonly aggregateId: string,
public readonly version: number,
public readonly timestamp: Date,
public readonly metadata: EventMetadata,
public readonly payload: {
customerId: string
items: Array<{ productId: string; quantity: number; price: number }>
totalAmount: number
currency: string
}
) {}
}
CQRS(Command Query Responsibility Segregation)深掘り
CQRSのコア原則
CQRSは読み取り(Query)モデルと書き込み(Command)モデルを完全に分離するパターンだ。従来のCRUDモデルでは同一のデータモデルで読み取りと書き込みを両方処理するが、CQRSではそれぞれの目的に最適化された別々のモデルを使用する。
// Command側 - 書き込みモデル
interface Command {
readonly commandId: string
readonly commandType: string
readonly timestamp: Date
}
class PlaceOrderCommand implements Command {
readonly commandType = 'PlaceOrder'
constructor(
public readonly commandId: string,
public readonly timestamp: Date,
public readonly customerId: string,
public readonly items: Array<{
productId: string
quantity: number
price: number
}>,
public readonly shippingAddress: string
) {}
}
// Command Handler
class PlaceOrderHandler {
constructor(
private readonly orderRepository: OrderRepository,
private readonly eventBus: EventBus
) {}
async handle(command: PlaceOrderCommand): Promise<string> {
// ビジネス検証
await this.validateInventory(command.items)
await this.validateCustomerCredit(command.customerId)
// アグリゲート生成とイベント発行
const order = Order.create(command.customerId, command.items, command.shippingAddress)
await this.orderRepository.save(order)
// ドメインイベント発行
for (const event of order.getUncommittedEvents()) {
await this.eventBus.publish(event)
}
return order.id
}
private async validateInventory(
items: Array<{ productId: string; quantity: number; price: number }>
): Promise<void> {
// 在庫確認ロジック
}
private async validateCustomerCredit(customerId: string): Promise<void> {
// 顧客与信確認ロジック
}
}
Query側 - 読み取りモデル
// Query側 - 読み取りに最適化された非正規化モデル
interface OrderSummaryReadModel {
orderId: string
customerName: string
customerEmail: string
orderDate: Date
status: string
totalAmount: number
itemCount: number
lastUpdated: Date
}
// Query Handler
class GetOrderSummaryHandler {
constructor(private readonly readDb: ReadDatabase) {}
async handle(orderId: string): Promise<OrderSummaryReadModel | null> {
// 読み取り専用DBから非正規化データを直接取得
return this.readDb.query('SELECT * FROM order_summaries WHERE order_id = ?', [orderId])
}
}
// Projection - イベントを読み取りモデルに変換
class OrderSummaryProjection {
constructor(private readonly readDb: ReadDatabase) {}
async handleOrderPlaced(event: OrderPlacedEvent): Promise<void> {
await this.readDb.execute(
`INSERT INTO order_summaries (order_id, customer_name, order_date, status, total_amount, item_count, last_updated)
VALUES (?, ?, ?, 'PLACED', ?, ?, ?)`,
[
event.aggregateId,
event.payload.customerId,
event.timestamp,
event.payload.totalAmount,
event.payload.items.length,
new Date(),
]
)
}
async handleOrderShipped(event: DomainEvent): Promise<void> {
await this.readDb.execute(
`UPDATE order_summaries SET status = 'SHIPPED', last_updated = ? WHERE order_id = ?`,
[new Date(), event.aggregateId]
)
}
}
Event Sourcing実装
Event Store設計
Event Sourcingでは状態を直接保存せず、状態変更イベントのシーケンスを保存する。現在の状態はイベントを順番にリプレイ(replay)して復元する。
// Event Storeインターフェース
interface EventStore {
append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>
getEvents(aggregateId: string, fromVersion?: number): Promise<DomainEvent[]>
getEventsByType(eventType: string, fromTimestamp?: Date): Promise<DomainEvent[]>
}
// PostgreSQLベースのEvent Store実装
class PostgresEventStore implements EventStore {
constructor(private readonly pool: Pool) {}
async append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void> {
const client = await this.pool.connect()
try {
await client.query('BEGIN')
// 楽観的同時実行制御(Optimistic Concurrency Control)
const result = await client.query(
'SELECT MAX(version) as current_version FROM events WHERE aggregate_id = $1',
[aggregateId]
)
const currentVersion = result.rows[0]?.current_version ?? 0
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, but current version is ${currentVersion}`
)
}
// イベントバッチ挿入
for (const event of events) {
await client.query(
`INSERT INTO events (event_id, aggregate_id, aggregate_type, event_type, version, payload, metadata, timestamp)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
[
event.eventId,
event.aggregateId,
event.aggregateType,
event.eventType,
event.version,
JSON.stringify(event.payload),
JSON.stringify(event.metadata),
event.timestamp,
]
)
}
await client.query('COMMIT')
} catch (error) {
await client.query('ROLLBACK')
throw error
} finally {
client.release()
}
}
async getEvents(aggregateId: string, fromVersion: number = 0): Promise<DomainEvent[]> {
const result = await this.pool.query(
'SELECT * FROM events WHERE aggregate_id = $1 AND version > $2 ORDER BY version ASC',
[aggregateId, fromVersion]
)
return result.rows.map(this.deserializeEvent)
}
async getEventsByType(eventType: string, fromTimestamp?: Date): Promise<DomainEvent[]> {
const query = fromTimestamp
? 'SELECT * FROM events WHERE event_type = $1 AND timestamp > $2 ORDER BY timestamp ASC'
: 'SELECT * FROM events WHERE event_type = $1 ORDER BY timestamp ASC'
const params = fromTimestamp ? [eventType, fromTimestamp] : [eventType]
const result = await this.pool.query(query, params)
return result.rows.map(this.deserializeEvent)
}
private deserializeEvent(row: any): DomainEvent {
return {
eventId: row.event_id,
eventType: row.event_type,
aggregateId: row.aggregate_id,
aggregateType: row.aggregate_type,
version: row.version,
timestamp: row.timestamp,
metadata: JSON.parse(row.metadata),
payload: JSON.parse(row.payload),
}
}
}
アグリゲートとイベントリプレイ
// Event Sourcedアグリゲート
abstract class EventSourcedAggregate {
private uncommittedEvents: DomainEvent[] = []
protected version: number = 0
abstract get id(): string
protected apply(event: DomainEvent): void {
this.when(event)
this.version = event.version
this.uncommittedEvents.push(event)
}
protected abstract when(event: DomainEvent): void
getUncommittedEvents(): DomainEvent[] {
return [...this.uncommittedEvents]
}
clearUncommittedEvents(): void {
this.uncommittedEvents = []
}
loadFromHistory(events: DomainEvent[]): void {
for (const event of events) {
this.when(event)
this.version = event.version
}
}
}
// Orderアグリゲート
class Order extends EventSourcedAggregate {
private _id: string = ''
private _customerId: string = ''
private _status: OrderStatus = OrderStatus.DRAFT
private _items: OrderItem[] = []
private _totalAmount: number = 0
get id(): string {
return this._id
}
static create(
customerId: string,
items: Array<{ productId: string; quantity: number; price: number }>,
shippingAddress: string
): Order {
const order = new Order()
const orderId = generateUUID()
const totalAmount = items.reduce((sum, item) => sum + item.price * item.quantity, 0)
order.apply({
eventId: generateUUID(),
eventType: 'OrderPlaced',
aggregateId: orderId,
aggregateType: 'Order',
version: 1,
timestamp: new Date(),
metadata: { correlationId: generateUUID(), causationId: generateUUID() },
payload: { customerId, items, totalAmount, shippingAddress, currency: 'KRW' },
})
return order
}
confirm(): void {
if (this._status !== OrderStatus.PLACED) {
throw new Error('Order can only be confirmed when in PLACED status')
}
this.apply({
eventId: generateUUID(),
eventType: 'OrderConfirmed',
aggregateId: this._id,
aggregateType: 'Order',
version: this.version + 1,
timestamp: new Date(),
metadata: { correlationId: generateUUID(), causationId: generateUUID() },
payload: { confirmedAt: new Date().toISOString() },
})
}
protected when(event: DomainEvent): void {
switch (event.eventType) {
case 'OrderPlaced':
this._id = event.aggregateId
this._customerId = event.payload.customerId as string
this._status = OrderStatus.PLACED
this._items = event.payload.items as OrderItem[]
this._totalAmount = event.payload.totalAmount as number
break
case 'OrderConfirmed':
this._status = OrderStatus.CONFIRMED
break
case 'OrderShipped':
this._status = OrderStatus.SHIPPED
break
case 'OrderCancelled':
this._status = OrderStatus.CANCELLED
break
}
}
}
enum OrderStatus {
DRAFT = 'DRAFT',
PLACED = 'PLACED',
CONFIRMED = 'CONFIRMED',
SHIPPED = 'SHIPPED',
CANCELLED = 'CANCELLED',
}
Python Event Store実装
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Protocol
from uuid import uuid4
import json
import asyncpg
@dataclass(frozen=True)
class DomainEvent:
event_id: str
event_type: str
aggregate_id: str
aggregate_type: str
version: int
timestamp: datetime
payload: dict[str, Any]
metadata: dict[str, str] = field(default_factory=dict)
class EventStore(Protocol):
async def append(
self,
aggregate_id: str,
events: list[DomainEvent],
expected_version: int,
) -> None: ...
async def get_events(
self, aggregate_id: str, from_version: int = 0
) -> list[DomainEvent]: ...
class PostgresEventStore:
def __init__(self, pool: asyncpg.Pool):
self._pool = pool
async def append(
self,
aggregate_id: str,
events: list[DomainEvent],
expected_version: int,
) -> None:
async with self._pool.acquire() as conn:
async with conn.transaction():
# 楽観的同時実行制御
row = await conn.fetchrow(
"SELECT MAX(version) AS cur FROM events WHERE aggregate_id = $1",
aggregate_id,
)
current = row["cur"] or 0
if current != expected_version:
raise ConcurrencyError(
f"Expected {expected_version}, got {current}"
)
for event in events:
await conn.execute(
"""
INSERT INTO events
(event_id, aggregate_id, aggregate_type,
event_type, version, payload, metadata, timestamp)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
""",
event.event_id,
event.aggregate_id,
event.aggregate_type,
event.event_type,
event.version,
json.dumps(event.payload),
json.dumps(event.metadata),
event.timestamp,
)
async def get_events(
self, aggregate_id: str, from_version: int = 0
) -> list[DomainEvent]:
async with self._pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT * FROM events
WHERE aggregate_id = $1 AND version > $2
ORDER BY version ASC
""",
aggregate_id,
from_version,
)
return [self._deserialize(row) for row in rows]
@staticmethod
def _deserialize(row) -> DomainEvent:
return DomainEvent(
event_id=row["event_id"],
event_type=row["event_type"],
aggregate_id=row["aggregate_id"],
aggregate_type=row["aggregate_type"],
version=row["version"],
timestamp=row["timestamp"],
payload=json.loads(row["payload"]),
metadata=json.loads(row["metadata"]),
)
メッセージブローカー比較:Kafka vs RabbitMQ vs NATS
主要特性比較
| 項目 | Apache Kafka | RabbitMQ | NATS JetStream |
|---|---|---|---|
| モデル | 分散ログ(Append-only) | メッセージキュー(Push型) | ストリーミング(Pull/Push) |
| メッセージ保存 | 設定期間中永久保存 | 消費後削除(デフォルト) | 設定期間保存 |
| 順序保証 | パーティション内保証 | キュー単位保証 | ストリーム内保証 |
| スループット | 秒間数百万件 | 秒間数万件 | 秒間数十万件 |
| レイテンシ | ms単位(バッチ) | us単位(単件) | us単位 |
| コンシューマグループ | ネイティブサポート | 競合コンシューマパターン | ネイティブサポート |
| リプレイ | オフセットベース自由移動 | 限定的(dead letter) | シーケンスベース移動 |
| 運用複雑度 | 高(ZooKeeper/KRaft) | 中 | 低 |
| 適したケース | イベントストリーミング、ログ集約 | タスクキュー、RPC | 軽量メッセージング、IoT |
Kafka Producer/Consumer例
import { Kafka, logLevel } from 'kafkajs'
const kafka = new Kafka({
clientId: 'order-service',
brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
logLevel: logLevel.WARN,
retry: {
initialRetryTime: 100,
retries: 8,
},
})
// Producer - イベント発行
class KafkaEventPublisher {
private producer = kafka.producer({
idempotent: true, // 冪等性保証
maxInFlightRequests: 5,
transactionalId: 'order-service-producer',
})
async publish(event: DomainEvent): Promise<void> {
await this.producer.send({
topic: `events.${event.aggregateType.toLowerCase()}`,
messages: [
{
key: event.aggregateId, // パーティショニングキー = アグリゲートID
value: JSON.stringify(event),
headers: {
'event-type': event.eventType,
'correlation-id': event.metadata.correlationId,
'content-type': 'application/json',
},
},
],
})
}
async publishBatch(events: DomainEvent[]): Promise<void> {
const transaction = await this.producer.transaction()
try {
for (const event of events) {
await transaction.send({
topic: `events.${event.aggregateType.toLowerCase()}`,
messages: [
{
key: event.aggregateId,
value: JSON.stringify(event),
headers: {
'event-type': event.eventType,
'correlation-id': event.metadata.correlationId,
},
},
],
})
}
await transaction.commit()
} catch (error) {
await transaction.abort()
throw error
}
}
}
// Consumer - イベント消費(冪等性保証付き)
class KafkaEventConsumer {
private consumer = kafka.consumer({ groupId: 'projection-service' })
private processedEvents = new Set<string>()
async start(): Promise<void> {
await this.consumer.subscribe({
topics: ['events.order'],
fromBeginning: false,
})
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event: DomainEvent = JSON.parse(message.value!.toString())
// 冪等性チェック - 処理済みイベントをスキップ
if (await this.isAlreadyProcessed(event.eventId)) {
console.log(`Skipping duplicate event: ${event.eventId}`)
return
}
await this.handleEvent(event)
await this.markAsProcessed(event.eventId)
},
})
}
private async isAlreadyProcessed(eventId: string): Promise<boolean> {
return this.processedEvents.has(eventId)
}
private async markAsProcessed(eventId: string): Promise<void> {
this.processedEvents.add(eventId)
}
private async handleEvent(event: DomainEvent): Promise<void> {
switch (event.eventType) {
case 'OrderPlaced':
await this.handleOrderPlaced(event)
break
case 'OrderConfirmed':
await this.handleOrderConfirmed(event)
break
}
}
private async handleOrderPlaced(event: DomainEvent): Promise<void> {
console.log(`Processing OrderPlaced for ${event.aggregateId}`)
}
private async handleOrderConfirmed(event: DomainEvent): Promise<void> {
console.log(`Processing OrderConfirmed for ${event.aggregateId}`)
}
}
Sagaパターン:分散トランザクション管理
Orchestration Saga vs Choreography Saga
| 項目 | Orchestration | Choreography |
|---|---|---|
| 制御方式 | 中央オーケストレーターが調整 | 各サービスが自律的に反応 |
| 結合度 | オーケストレーターに依存 | サービス間の疎結合 |
| 複雑性 | オーケストレーターに集中 | サービスに分散 |
| 追跡 | 中央で状態確認可能 | 分散された状態追跡が必要 |
| エラー処理 | 中央で補償トランザクション実行 | 各サービスが補償イベントを発行 |
| 適した場合 | 複雑なワークフロー(5ステップ以上) | 単純なワークフロー(3ステップ以下) |
Orchestration Saga実装
// Saga状態マシン
enum SagaStatus {
STARTED = 'STARTED',
ORDER_CREATED = 'ORDER_CREATED',
PAYMENT_PROCESSED = 'PAYMENT_PROCESSED',
INVENTORY_RESERVED = 'INVENTORY_RESERVED',
COMPLETED = 'COMPLETED',
COMPENSATING = 'COMPENSATING',
FAILED = 'FAILED',
}
interface SagaStep {
name: string
execute: (context: SagaContext) => Promise<void>
compensate: (context: SagaContext) => Promise<void>
}
interface SagaContext {
sagaId: string
orderId: string
customerId: string
items: Array<{ productId: string; quantity: number; price: number }>
paymentId?: string
reservationId?: string
[key: string]: unknown
}
class OrderSagaOrchestrator {
private steps: SagaStep[] = []
private completedSteps: SagaStep[] = []
constructor(
private readonly sagaRepository: SagaRepository,
private readonly eventBus: EventBus
) {
this.steps = [
{
name: 'CreateOrder',
execute: async (ctx) => {
await this.eventBus.publish({
eventType: 'CreateOrderRequested',
aggregateId: ctx.orderId,
payload: { customerId: ctx.customerId, items: ctx.items },
} as DomainEvent)
},
compensate: async (ctx) => {
await this.eventBus.publish({
eventType: 'CancelOrderRequested',
aggregateId: ctx.orderId,
payload: { reason: 'Saga compensation' },
} as DomainEvent)
},
},
{
name: 'ProcessPayment',
execute: async (ctx) => {
const totalAmount = ctx.items.reduce((sum, item) => sum + item.price * item.quantity, 0)
await this.eventBus.publish({
eventType: 'ProcessPaymentRequested',
aggregateId: ctx.orderId,
payload: { customerId: ctx.customerId, amount: totalAmount },
} as DomainEvent)
},
compensate: async (ctx) => {
await this.eventBus.publish({
eventType: 'RefundPaymentRequested',
aggregateId: ctx.orderId,
payload: { paymentId: ctx.paymentId },
} as DomainEvent)
},
},
{
name: 'ReserveInventory',
execute: async (ctx) => {
await this.eventBus.publish({
eventType: 'ReserveInventoryRequested',
aggregateId: ctx.orderId,
payload: { items: ctx.items },
} as DomainEvent)
},
compensate: async (ctx) => {
await this.eventBus.publish({
eventType: 'ReleaseInventoryRequested',
aggregateId: ctx.orderId,
payload: { reservationId: ctx.reservationId },
} as DomainEvent)
},
},
]
}
async start(context: SagaContext): Promise<void> {
await this.sagaRepository.save(context.sagaId, SagaStatus.STARTED, context)
await this.executeNextStep(context, 0)
}
private async executeNextStep(context: SagaContext, stepIndex: number): Promise<void> {
if (stepIndex >= this.steps.length) {
await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.COMPLETED)
return
}
const step = this.steps[stepIndex]
try {
await step.execute(context)
this.completedSteps.push(step)
await this.executeNextStep(context, stepIndex + 1)
} catch (error) {
console.error(`Saga step '${step.name}' failed:`, error)
await this.compensate(context)
}
}
private async compensate(context: SagaContext): Promise<void> {
await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.COMPENSATING)
// 完了済みステップを逆順で補償
for (const step of [...this.completedSteps].reverse()) {
try {
await step.compensate(context)
} catch (error) {
console.error(`Compensation for '${step.name}' failed:`, error)
// 補償失敗時は手動介入が必要 - アラート送信
}
}
await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.FAILED)
}
}
Snapshot戦略:イベントリプレイ最適化
イベント数が多くなるとリプレイ時間が長くなる。Snapshotは特定時点のアグリゲート状態を保存してこの問題を解決する。
// Snapshotベースのアグリゲートロード
class EventSourcedRepository<T extends EventSourcedAggregate> {
private readonly SNAPSHOT_INTERVAL = 50 // 50イベントごとにスナップショット
constructor(
private readonly eventStore: EventStore,
private readonly snapshotStore: SnapshotStore,
private readonly factory: () => T
) {}
async load(aggregateId: string): Promise<T> {
const aggregate = this.factory()
// 1. 最新スナップショットをロード
const snapshot = await this.snapshotStore.getLatest(aggregateId)
if (snapshot) {
aggregate.restoreFromSnapshot(snapshot.state)
// 2. スナップショット以降のイベントのみリプレイ
const events = await this.eventStore.getEvents(aggregateId, snapshot.version)
aggregate.loadFromHistory(events)
} else {
// 3. 全イベントリプレイ
const events = await this.eventStore.getEvents(aggregateId)
aggregate.loadFromHistory(events)
}
return aggregate
}
async save(aggregate: T): Promise<void> {
const events = aggregate.getUncommittedEvents()
await this.eventStore.append(aggregate.id, events, aggregate.version - events.length)
// スナップショット生成条件確認
if (aggregate.version % this.SNAPSHOT_INTERVAL === 0) {
await this.snapshotStore.save({
aggregateId: aggregate.id,
version: aggregate.version,
state: aggregate.toSnapshot(),
timestamp: new Date(),
})
}
aggregate.clearUncommittedEvents()
}
}
運用上の注意事項
必ず知っておくべき重要な警告
-
イベントスキーマ変更はDBマイグレーションより危険:Event Storeのイベントは不変であるため、スキーマを変更すると過去のイベントリプレイが壊れる。必ず下位互換性(backward compatibility)を維持しなければならない。
-
CQRS導入は複雑度を最低2倍にする:読み取りモデルと書き込みモデルの同期遅延(eventual consistency)をUIとビジネスロジックの両方で必ず処理しなければならない。
-
Saga補償トランザクション失敗は手動介入が必要:補償トランザクションも失敗する可能性があり、その場合Dead Letter Queueと手動復旧プロセスが必ず必要だ。
-
イベント順序逆転はデータ整合性を破壊する:Kafkaのパーティショニングキーが不適切に設定されると、同一アグリゲートのイベントが異なるパーティションに分散され、順序が逆転する。
-
Event Storeの無限成長を管理すべき:SnapshotとアーカイビングなしではEvent Storeが無限に大きくなり、リプレイ性能が急激に低下する。
障害事例と復旧手順
事例1:イベント順序逆転
症状:OrderPlacedの前にOrderShippedイベントが到着し、プロジェクションが失敗する。
原因:Kafkaパーティショニングキーを設定しなかったため、イベントが異なるパーティションに分散された。
復旧:
# 1. コンシューマグループオフセットリセット
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--group projection-service \
--topic events.order \
--reset-offsets --to-earliest --execute
# 2. プロジェクションテーブル初期化
psql -c "TRUNCATE TABLE order_summaries;"
# 3. プロジェクションサービス再起動(完全リプレイ)
kubectl rollout restart deployment/projection-service
予防:必ずアグリゲートIDをKafkaパーティショニングキーとして使用する。
事例2:重複イベント処理
症状:注文が二重に生成されたり、決済が2回処理される。
原因:コンシューマ障害後の再起動時に、すでに処理されたメッセージを再度消費する(at-least-once特性)。
復旧:
// 冪等性キーテーブルベースの重複防止
class IdempotencyGuard {
constructor(private readonly db: Database) {}
async executeOnce<T>(idempotencyKey: string, operation: () => Promise<T>): Promise<T | null> {
try {
// UNIQUE制約で重複挿入を防止
await this.db.execute('INSERT INTO processed_events (event_id, processed_at) VALUES (?, ?)', [
idempotencyKey,
new Date(),
])
} catch (error) {
// 処理済みイベント
console.log(`Event ${idempotencyKey} already processed, skipping`)
return null
}
return operation()
}
}
事例3:スキーマ進化の失敗
症状:新バージョンのイベントハンドラーが過去のイベントをデシリアライズできず、プロジェクションが中断される。
原因:イベントに必須フィールドを追加したが、過去のイベントにはそのフィールドがない。
復旧:
// イベントアップキャスター(Upcaster)パターン
class EventUpcaster {
private upcasters: Map<string, (event: any) => DomainEvent> = new Map()
register(eventType: string, fromVersion: number, upcaster: (event: any) => any): void {
this.upcasters.set(`${eventType}:v${fromVersion}`, upcaster)
}
upcast(event: any): DomainEvent {
const key = `${event.eventType}:v${event.schemaVersion || 1}`
const upcaster = this.upcasters.get(key)
if (upcaster) {
return this.upcast(upcaster(event)) // 再帰的に最新バージョンまで
}
return event
}
}
// 使用例:v1 -> v2変換
const upcaster = new EventUpcaster()
upcaster.register('OrderPlaced', 1, (event) => ({
...event,
schemaVersion: 2,
payload: {
...event.payload,
currency: event.payload.currency || 'KRW', // デフォルト値追加
shippingMethod: 'STANDARD', // 新フィールドにデフォルト値設定
},
}))
プロダクションチェックリスト
Event Store
- イベントテーブルにaggregate_id + versionユニークインデックス設定
- 楽観的同時実行制御(Optimistic Concurrency)実装確認
- Snapshot戦略設定(50〜100イベントごと)
- イベントアーカイビングポリシー策定(コールドストレージ移行)
- Event Storeバックアップと復旧手順の検証
CQRS
- Read Model再構築(Rebuild)プロセス自動化
- 読み取り/書き込みDB分離とレプリケーション遅延モニタリング
- Projection失敗時のアラートと自動リトライ設定
- Eventual Consistency処理(UIでの楽観的更新)
メッセージブローカー
- Kafka:パーティショニングキーをアグリゲートIDに設定
- Consumer Group lagモニタリング(BurrowまたはKafka Exporter)
- Dead Letter Queue(DLQ)設定とモニタリング
- メッセージシリアライゼーション形式決定(Avro + Schema Registry推奨)
- ブローカークラスタレプリケーションファクター3以上設定
Saga
- 補償トランザクションの定義とテスト完了
- Saga状態永続化(DB保存)確認
- Sagaタイムアウト設定(無限待機防止)
- 補償失敗時の手動復旧プロセス文書化
イベントスキーマ
- Schema Registry導入(Confluent Schema Registry、AWS Glue)
- 下位互換性検証自動化(CIパイプラインに含める)
- イベントアップキャスター(Upcaster)実装
- イベントカタログ文書の維持
モニタリング
- イベント処理レイテンシメトリクス収集
- Consumer Group lagアラート設定
- イベント処理失敗率ダッシュボード構成
- Correlation IDベースのエンドツーエンドトレーシング