- Published on
Event Sourcing + CQRS アーキテクチャ実践実装ガイド:イベントストアからプロジェクション・Sagaパターンまで
- Authors
- Name
- はじめに
- 1. Event Sourcing コア原理
- 2. CQRS アーキテクチャパターン
- 3. Event Store 設計と実装
- 4. AggregateとDomain Eventの実装
- 5. プロジェクションとRead Model
- 6. Sagaパターンと分散トランザクション
- 7. スナップショットとイベント圧縮
- 8. Event Sourcing vs 従来のCRUD比較
- 9. 障害事例と復旧手順
- 10. 運用チェックリスト
- まとめ

はじめに
従来のCRUDシステムはデータの現在の状態のみを保存する。UPDATEが実行されると以前の値は失われ、DELETEが実行されるとデータ自体が消滅する。銀行口座の残高が100万円であることは分かるが、その残高がどのような入出金プロセスを経て形成されたかは分からない。
Event Sourcingはこの問題を根本的に解決する。状態変更そのものをイベントとして記録し、現在の状態はイベントの順次再生によって復元する。CQRSはこの構造の上で書き込み(Command)と読み取り(Query)を完全に分離し、それぞれの要件に最適化する。
本記事では、Event SourcingとCQRSを組み合わせたアーキテクチャの全体実装プロセスを扱う。イベントストア設計からAggregate実装、プロジェクション構築、Sagaパターンを活用した分散トランザクション管理、スナップショット最適化、そして運用時に発生する障害事例と復旧手順まで、実践コードとともに解説する。
1. Event Sourcing コア原理
1.1 イベントとは何か
Event Sourcingにおけるイベント(Event)とは、**過去に発生した変更不可能な事実(Immutable Fact)**である。イベントは過去形で命名され、ビジネスの意図を明確に表現しなければならない。
不適切なイベント設計と適切なイベント設計の比較は以下の通りである。
- 不適切な例:
NameChanged、EmailChanged- プロパティ単位のイベントはビジネス的意味を持たない - 適切な例:
CustomerRelocated、OrderConfirmed- ドメインの行為を表現するイベント
1.2 状態復元の原理
現在の状態は、初期状態にすべてのイベントを順次適用(replay)することで計算される。
現在の状態 = fold(初期状態, [イベント1, イベント2, ..., イベントN])
例えば、銀行口座の状態復元プロセスは以下の通りである。
初期残高: 0円
-> AccountOpened(金額: 0円) => 残高: 0円
-> MoneyDeposited(金額: 100万円) => 残高: 100万円
-> MoneyWithdrawn(金額: 30万円) => 残高: 70万円
-> MoneyDeposited(金額: 50万円) => 残高: 120万円
1.3 イベントの不変性原則
一度保存されたイベントは絶対に修正・削除してはならない。誤ったイベントが発行された場合は、**補償イベント(Compensating Event)**を新たに発行して論理的に取り消す。
// 誤った出金が発生した場合 - イベントを削除しない
// 代わりに補償イベントを発行する
interface MoneyWithdrawnCompensated {
type: 'MoneyWithdrawnCompensated'
originalEventId: string
amount: number
reason: string
timestamp: Date
}
2. CQRS アーキテクチャパターン
2.1 CommandとQueryの分離
CQRSの核心はシンプルである。データを変更するモデルとデータを照会するモデルを分離することだ。
[クライアント]
|
|-- Command(書込) --> [Command Handler] --> [Write Model / Event Store]
| |
| イベント発行
| |
| v
| [Projection Engine]
| |
| v
+-- Query(読取) ----> [Query Handler] ----> [Read Model / View DB]
2.2 なぜ分離するのか
読み取りと書き込みの要件は根本的に異なる。
- 書き込み(Command):ビジネスルール検証、ドメイン不変条件(invariant)保証、トランザクション整合性が必要
- 読み取り(Query):多様なビュー対応、低レイテンシ、高スループットが必要
一つのモデルで両方を満たそうとすると、双方に妥協が生じる。CQRSはそれぞれのモデルを独立に最適化できるようにする。
2.3 Command Handler 実装
// Command定義
interface CreateOrderCommand {
orderId: string
customerId: string
items: Array<{ productId: string; quantity: number; price: number }>
}
// Command Handler
class OrderCommandHandler {
constructor(
private readonly eventStore: EventStore,
private readonly orderRepository: EventSourcedRepository<Order>
) {}
async handle(command: CreateOrderCommand): Promise<void> {
// 1. Aggregateロード(イベント再生)
const order = await this.orderRepository.load(command.orderId)
// 2. ビジネスロジック実行(イベント生成)
order.create(command.customerId, command.items)
// 3. イベント保存
await this.orderRepository.save(order)
}
}
3. Event Store 設計と実装
3.1 イベントストアスキーマ
イベントストアのコアテーブル構造は以下の通りである。
CREATE TABLE events (
-- グローバル一意識別子
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-- イベントが属するストリーム(Aggregate)識別子
stream_id VARCHAR(255) NOT NULL,
-- ストリーム内のイベント順序(楽観的同時実行制御に使用)
stream_version BIGINT NOT NULL,
-- イベントタイプ(デシリアライズに使用)
event_type VARCHAR(255) NOT NULL,
-- イベントペイロード(JSON)
event_data JSONB NOT NULL,
-- メタデータ(相関ID、ユーザー情報など)
metadata JSONB DEFAULT '{}',
-- イベント発生時刻
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
-- グローバル順序(プロジェクションで全体順序追跡に使用)
global_position BIGSERIAL NOT NULL,
-- 同一ストリーム内でのバージョン重複防止(楽観的同時実行制御)
UNIQUE(stream_id, stream_version)
);
-- ストリーム別イベント照会インデックス
CREATE INDEX idx_events_stream ON events(stream_id, stream_version);
-- 全体イベント順序インデックス(プロジェクション用)
CREATE INDEX idx_events_global ON events(global_position);
-- イベントタイプ別照会インデックス
CREATE INDEX idx_events_type ON events(event_type);
3.2 TypeScript Event Store 実装
import { Pool } from 'pg'
interface DomainEvent {
eventType: string
data: Record<string, unknown>
metadata?: Record<string, unknown>
}
interface StoredEvent extends DomainEvent {
eventId: string
streamId: string
streamVersion: number
globalPosition: number
createdAt: Date
}
class PostgresEventStore {
constructor(private readonly pool: Pool) {}
/**
* イベントをストリームに追加する。
* expectedVersionで楽観的同時実行制御を行う。
*/
async appendToStream(
streamId: string,
expectedVersion: number,
events: DomainEvent[]
): Promise<StoredEvent[]> {
const client = await this.pool.connect()
try {
await client.query('BEGIN')
// 楽観的同時実行検査
const versionCheck = await client.query(
'SELECT MAX(stream_version) as current_version FROM events WHERE stream_id = $1',
[streamId]
)
const currentVersion = versionCheck.rows[0].current_version ?? -1
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, but current version is ${currentVersion}`
)
}
const storedEvents: StoredEvent[] = []
for (let i = 0; i < events.length; i++) {
const event = events[i]
const version = expectedVersion + i + 1
const result = await client.query(
`INSERT INTO events (stream_id, stream_version, event_type, event_data, metadata)
VALUES ($1, $2, $3, $4, $5)
RETURNING event_id, global_position, created_at`,
[
streamId,
version,
event.eventType,
JSON.stringify(event.data),
JSON.stringify(event.metadata ?? {}),
]
)
storedEvents.push({
eventId: result.rows[0].event_id,
streamId,
streamVersion: version,
globalPosition: result.rows[0].global_position,
createdAt: result.rows[0].created_at,
...event,
})
}
await client.query('COMMIT')
return storedEvents
} catch (error) {
await client.query('ROLLBACK')
throw error
} finally {
client.release()
}
}
/**
* ストリームのすべてのイベントを順序通りに読み取る。
*/
async readStream(streamId: string, fromVersion = 0): Promise<StoredEvent[]> {
const result = await this.pool.query(
`SELECT * FROM events
WHERE stream_id = $1 AND stream_version >= $2
ORDER BY stream_version ASC`,
[streamId, fromVersion]
)
return result.rows.map(this.mapToStoredEvent)
}
/**
* 全イベントをグローバル順序で読み取る(プロジェクション用)。
*/
async readAll(fromPosition = 0, limit = 1000): Promise<StoredEvent[]> {
const result = await this.pool.query(
`SELECT * FROM events
WHERE global_position > $1
ORDER BY global_position ASC
LIMIT $2`,
[fromPosition, limit]
)
return result.rows.map(this.mapToStoredEvent)
}
private mapToStoredEvent(row: any): StoredEvent {
return {
eventId: row.event_id,
streamId: row.stream_id,
streamVersion: row.stream_version,
globalPosition: row.global_position,
eventType: row.event_type,
data: row.event_data,
metadata: row.metadata,
createdAt: row.created_at,
}
}
}
3.3 専用Event Storeソリューション比較
| 項目 | EventStoreDB (Kurrent) | Axon Server | PostgreSQL 自前実装 |
|---|---|---|---|
| タイプ | 専用イベントDB | CQRSフレームワークサーバー | 汎用RDBMS |
| プロジェクション | サーバーサイドJavaScript | Tracking Processor | 自前実装が必要 |
| サブスクリプションモデル | Catch-up, Persistent | Tracking, Subscribing | Polling, LISTEN/NOTIFY |
| クラスタリング | 内蔵(Gossipプロトコル) | Axon Server Enterprise | 外部ソリューション必要 |
| 学習コスト | 中 | 高(フレームワーク全体) | 低(SQL基盤) |
| 柔軟性 | 中 | 低(Axonエコシステム依存) | 高 |
| 運用複雑度 | 中 | 高 | 低~中 |
4. AggregateとDomain Eventの実装
4.1 Aggregate Base Class
Event SourcingにおけるAggregateは、イベントを発行しイベントを再生して状態を復元する中核単位である。
abstract class EventSourcedAggregate {
private uncommittedEvents: DomainEvent[] = []
private _version: number = -1
get version(): number {
return this._version
}
/**
* 外部から呼び出して状態を変更する。
* 内部的にイベントを生成し適用する。
*/
protected apply(event: DomainEvent): void {
this.when(event)
this.uncommittedEvents.push(event)
this._version++
}
/**
* イベントに基づいて内部状態を変更するハンドラ。
* サブクラスで実装する。
*/
protected abstract when(event: DomainEvent): void
/**
* 保存済みイベントを再生して状態を復元する。
*/
loadFromHistory(events: DomainEvent[]): void {
for (const event of events) {
this.when(event)
this._version++
}
}
getUncommittedEvents(): DomainEvent[] {
return [...this.uncommittedEvents]
}
clearUncommittedEvents(): void {
this.uncommittedEvents = []
}
}
4.2 Order Aggregate 実装
// Domain Events
interface OrderCreated {
eventType: 'OrderCreated'
data: {
orderId: string
customerId: string
items: Array<{ productId: string; quantity: number; unitPrice: number }>
totalAmount: number
}
}
interface OrderConfirmed {
eventType: 'OrderConfirmed'
data: {
orderId: string
confirmedAt: string
}
}
interface OrderCancelled {
eventType: 'OrderCancelled'
data: {
orderId: string
reason: string
cancelledAt: string
}
}
type OrderEvent = OrderCreated | OrderConfirmed | OrderCancelled
// Order Aggregate
class Order extends EventSourcedAggregate {
private orderId!: string
private customerId!: string
private items: Array<{ productId: string; quantity: number; unitPrice: number }> = []
private totalAmount: number = 0
private status: 'CREATED' | 'CONFIRMED' | 'CANCELLED' = 'CREATED'
static create(
orderId: string,
customerId: string,
items: Array<{ productId: string; quantity: number; unitPrice: number }>
): Order {
const order = new Order()
const totalAmount = items.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0)
// ビジネスルール検証
if (items.length === 0) {
throw new Error('注文には最低1つ以上の商品が必要です')
}
if (totalAmount <= 0) {
throw new Error('注文合計は0より大きくなければなりません')
}
order.apply({
eventType: 'OrderCreated',
data: { orderId, customerId, items, totalAmount },
})
return order
}
confirm(): void {
if (this.status !== 'CREATED') {
throw new Error('CREATED状態の注文のみ確認できます')
}
this.apply({
eventType: 'OrderConfirmed',
data: { orderId: this.orderId, confirmedAt: new Date().toISOString() },
})
}
cancel(reason: string): void {
if (this.status === 'CANCELLED') {
throw new Error('既にキャンセルされた注文です')
}
this.apply({
eventType: 'OrderCancelled',
data: { orderId: this.orderId, reason, cancelledAt: new Date().toISOString() },
})
}
protected when(event: DomainEvent): void {
const orderEvent = event as OrderEvent
switch (orderEvent.eventType) {
case 'OrderCreated':
this.orderId = orderEvent.data.orderId
this.customerId = orderEvent.data.customerId
this.items = orderEvent.data.items
this.totalAmount = orderEvent.data.totalAmount
this.status = 'CREATED'
break
case 'OrderConfirmed':
this.status = 'CONFIRMED'
break
case 'OrderCancelled':
this.status = 'CANCELLED'
break
}
}
}
5. プロジェクションとRead Model
5.1 プロジェクションの役割
プロジェクション(Projection)はイベントストリームを購読し、**読み取りに最適化されたビュー(Read Model)**を構築するプロセスである。Event Sourcingのイベントストリームから直接照会するのは非効率的であるため、様々な照会要件に合わせた非正規化ビューを別途維持する。
5.2 Projection Engine 実装
interface ProjectionCheckpoint {
projectionName: string
lastProcessedPosition: number
}
abstract class Projection {
abstract readonly name: string
abstract handle(event: StoredEvent): Promise<void>
canHandle(eventType: string): boolean {
return this.handledEventTypes().includes(eventType)
}
abstract handledEventTypes(): string[]
}
class ProjectionEngine {
private projections: Projection[] = []
constructor(
private readonly eventStore: PostgresEventStore,
private readonly checkpointStore: CheckpointStore
) {}
register(projection: Projection): void {
this.projections.push(projection)
}
/**
* 登録された全プロジェクションを実行する。
* 各プロジェクションは最後に処理した位置から再開する。
*/
async run(): Promise<void> {
while (true) {
for (const projection of this.projections) {
const checkpoint = await this.checkpointStore.get(projection.name)
const lastPosition = checkpoint?.lastProcessedPosition ?? 0
const events = await this.eventStore.readAll(lastPosition, 100)
for (const event of events) {
if (projection.canHandle(event.eventType)) {
try {
await projection.handle(event)
} catch (error) {
console.error(
`Projection ${projection.name} failed at position ${event.globalPosition}:`,
error
)
break
}
}
await this.checkpointStore.save({
projectionName: projection.name,
lastProcessedPosition: event.globalPosition,
})
}
}
// ポーリング間隔
await new Promise((resolve) => setTimeout(resolve, 500))
}
}
}
5.3 注文ダッシュボード Read Model
class OrderDashboardProjection extends Projection {
readonly name = 'order-dashboard'
constructor(private readonly db: Pool) {
super()
}
handledEventTypes(): string[] {
return ['OrderCreated', 'OrderConfirmed', 'OrderCancelled']
}
async handle(event: StoredEvent): Promise<void> {
switch (event.eventType) {
case 'OrderCreated':
await this.db.query(
`INSERT INTO order_dashboard (order_id, customer_id, total_amount, status, created_at)
VALUES ($1, $2, $3, 'CREATED', $4)
ON CONFLICT (order_id) DO NOTHING`,
[event.data.orderId, event.data.customerId, event.data.totalAmount, event.createdAt]
)
break
case 'OrderConfirmed':
await this.db.query(
`UPDATE order_dashboard SET status = 'CONFIRMED', confirmed_at = $2 WHERE order_id = $1`,
[event.data.orderId, event.data.confirmedAt]
)
break
case 'OrderCancelled':
await this.db.query(
`UPDATE order_dashboard SET status = 'CANCELLED', cancel_reason = $2 WHERE order_id = $1`,
[event.data.orderId, event.data.reason]
)
break
}
}
}
6. Sagaパターンと分散トランザクション
6.1 Sagaとは何か
Sagaパターンは、複数サービスにまたがるトランザクションを一連のローカルトランザクションと補償トランザクションで管理するパターンである。従来の2PC(Two-Phase Commit)と異なり、長時間ロックを保持しないため、拡張性に優れている。
二つの実装方式がある。
- Choreography(コレオグラフィ):各サービスがイベントを発行し、他のサービスが反応する。中央調整者が存在しない。
- Orchestration(オーケストレーション):中央オーケストレーターが全体フローを管理し、各サービスにコマンドを送信する。
6.2 Orchestration Saga 実装
// Saga状態定義
type OrderSagaStatus =
| 'STARTED'
| 'PAYMENT_PENDING'
| 'PAYMENT_COMPLETED'
| 'INVENTORY_RESERVED'
| 'COMPLETED'
| 'COMPENSATING'
| 'FAILED'
interface SagaStep {
name: string
execute: () => Promise<void>
compensate: () => Promise<void>
}
class OrderSaga {
private status: OrderSagaStatus = 'STARTED'
private completedSteps: SagaStep[] = []
constructor(
private readonly orderId: string,
private readonly paymentService: PaymentService,
private readonly inventoryService: InventoryService,
private readonly sagaLog: SagaLogStore
) {}
async execute(orderData: {
customerId: string
items: any[]
totalAmount: number
}): Promise<void> {
const steps: SagaStep[] = [
{
name: 'ProcessPayment',
execute: async () => {
await this.paymentService.processPayment(
this.orderId,
orderData.customerId,
orderData.totalAmount
)
this.status = 'PAYMENT_COMPLETED'
},
compensate: async () => {
await this.paymentService.refundPayment(this.orderId)
},
},
{
name: 'ReserveInventory',
execute: async () => {
await this.inventoryService.reserve(this.orderId, orderData.items)
this.status = 'INVENTORY_RESERVED'
},
compensate: async () => {
await this.inventoryService.releaseReservation(this.orderId)
},
},
]
for (const step of steps) {
try {
await this.sagaLog.record(this.orderId, step.name, 'EXECUTING')
await step.execute()
await this.sagaLog.record(this.orderId, step.name, 'COMPLETED')
this.completedSteps.push(step)
} catch (error) {
await this.sagaLog.record(this.orderId, step.name, 'FAILED')
console.error(`Saga step ${step.name} failed:`, error)
// 補償トランザクション実行(逆順)
this.status = 'COMPENSATING'
await this.compensate()
this.status = 'FAILED'
return
}
}
this.status = 'COMPLETED'
await this.sagaLog.record(this.orderId, 'Saga', 'COMPLETED')
}
private async compensate(): Promise<void> {
// 完了した段階を逆順で補償
const stepsToCompensate = [...this.completedSteps].reverse()
for (const step of stepsToCompensate) {
try {
await this.sagaLog.record(this.orderId, step.name, 'COMPENSATING')
await step.compensate()
await this.sagaLog.record(this.orderId, step.name, 'COMPENSATED')
} catch (compensateError) {
// 補償失敗時は手動介入が必要
await this.sagaLog.record(this.orderId, step.name, 'COMPENSATION_FAILED')
console.error(`Compensation failed for step ${step.name}:`, compensateError)
}
}
}
}
6.3 Choreography vs Orchestration 比較
| 項目 | Choreography | Orchestration |
|---|---|---|
| 結合度 | 低い(イベント駆動) | 中程度(オーケストレーター依存) |
| 可視性 | 低い(フロー追跡が困難) | 高い(中央でフロー管理) |
| 複雑度 | サービス数増加で急激に上昇 | オーケストレーターに集中 |
| 単一障害点 | なし | オーケストレーター |
| テスト容易性 | 困難 | 比較的容易 |
| 適したケース | 2~3サービスの単純なフロー | 4サービス以上の複雑なフロー |
7. スナップショットとイベント圧縮
7.1 スナップショットの必要性
Aggregateのイベントが数千、数万件に達すると、状態復元時間が急激に増加する。スナップショットは特定時点のAggregate状態をシリアライズして保存し、以降のイベントのみ再生する最適化手法である。
interface Snapshot {
streamId: string
version: number
state: Record<string, unknown>
createdAt: Date
}
class SnapshotRepository {
constructor(private readonly pool: Pool) {}
async save(snapshot: Snapshot): Promise<void> {
await this.pool.query(
`INSERT INTO snapshots (stream_id, version, state, created_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (stream_id) DO UPDATE SET version = $2, state = $3, created_at = $4`,
[snapshot.streamId, snapshot.version, JSON.stringify(snapshot.state), snapshot.createdAt]
)
}
async load(streamId: string): Promise<Snapshot | null> {
const result = await this.pool.query('SELECT * FROM snapshots WHERE stream_id = $1', [streamId])
return result.rows.length > 0 ? result.rows[0] : null
}
}
class EventSourcedRepository<T extends EventSourcedAggregate> {
private readonly SNAPSHOT_INTERVAL = 100 // 100イベントごとにスナップショット
constructor(
private readonly eventStore: PostgresEventStore,
private readonly snapshotRepo: SnapshotRepository,
private readonly factory: () => T
) {}
async load(streamId: string): Promise<T> {
const aggregate = this.factory()
// 1. スナップショットロード試行
const snapshot = await this.snapshotRepo.load(streamId)
let fromVersion = 0
if (snapshot) {
;(aggregate as any).restoreFromSnapshot(snapshot.state)
;(aggregate as any)._version = snapshot.version
fromVersion = snapshot.version + 1
}
// 2. スナップショット以降のイベントのみ再生
const events = await this.eventStore.readStream(streamId, fromVersion)
aggregate.loadFromHistory(events)
return aggregate
}
async save(aggregate: T, streamId: string): Promise<void> {
const uncommittedEvents = aggregate.getUncommittedEvents()
if (uncommittedEvents.length === 0) return
await this.eventStore.appendToStream(
streamId,
aggregate.version - uncommittedEvents.length,
uncommittedEvents
)
// スナップショット作成要否確認
if (aggregate.version > 0 && aggregate.version % this.SNAPSHOT_INTERVAL === 0) {
await this.snapshotRepo.save({
streamId,
version: aggregate.version,
state: (aggregate as any).toSnapshot(),
createdAt: new Date(),
})
}
aggregate.clearUncommittedEvents()
}
}
7.2 スナップショット戦略ガイドライン
- 作成頻度:100~1,000イベントごと(ドメイン特性に応じて調整)
- 保存場所:同一ストリームまたは別ストレージ(別ストレージ推奨)
- 作成タイミング:非同期バックグラウンドで作成し、書き込みレイテンシに影響を与えないようにする
- リプレイ時間が100msを超える場合:スナップショットの導入を積極的に検討する
8. Event Sourcing vs 従来のCRUD比較
| 項目 | Event Sourcing | 従来のCRUD |
|---|---|---|
| データ保存 | イベント(変更履歴)を保存 | 現在の状態のみ保存 |
| 履歴追跡 | 完全な監査証跡を自動提供 | 別途監査テーブルが必要 |
| タイムトラベルクエリ | 特定時点の状態復元が可能 | 別途スナップショットテーブルが必要 |
| クエリ複雑度 | 高い(Read Modelの別途構築) | 低い(SQL直接照会) |
| 整合性モデル | 主に結果整合性 | 即時整合性 |
| スキーマ変更 | イベントバージョン管理が必要 | ALTER TABLE |
| デバッグ | イベント再生で問題を再現 | 現在の状態のみ確認可能 |
| 学習コスト | 高い | 低い |
| 適したドメイン | 金融、注文、物流など履歴重要ドメイン | 設定管理、カタログなど単純CRUD |
9. 障害事例と復旧手順
9.1 イベントストア障害
症状:イベント保存失敗によりCommand処理が停止
復旧手順:
- DB接続状態の確認と障害原因の特定
- イベントストア復旧後、未処理Commandのリトライ
- グローバルポジションのギャップ確認 -- ギャップが発生した場合はプロジェクションの整合性検証
9.2 プロジェクション同期遅延
症状:Read Modelが最新状態を反映していない
復旧手順:
- プロジェクションチェックポイントとイベントストアの最新ポジションを比較
- 遅延原因の特定(処理速度、障害、ポイズンイベントなど)
- 必要に応じてプロジェクションを初期化し、最初から再構築(rebuild)
9.3 Saga補償失敗
症状:補償トランザクション実行中に外部サービス障害が発生
復旧手順:
- SagaログからCOMPENSATION_FAILED状態を照会
- 失敗した補償ステップを手動でリトライ
- リトライ回数超過時はDead Letter Queueに記録し、手動介入を実施
9.4 イベントスキーマ変更
症状:イベント構造が変更され、既存イベントのデシリアライズが失敗
復旧手順:
- Upcasterパターンで旧バージョンイベントを新バージョンに変換
- プロジェクションで両方のバージョンを処理できるようにハンドラを修正
- 全プロジェクションのrebuildを実行
10. 運用チェックリスト
導入前レビュー
- ドメインにEvent Sourcingが本当に必要か?(履歴追跡、監査、タイムトラベルクエリの必要性)
- 結果整合性を受容できるか?
- チームがDDDとイベント駆動の思考に慣れているか?
設計フェーズ
- イベントがビジネスの意図を含んでいるか?(プロパティ変更ではなくドメイン行為)
- Aggregate境界は適切か?(大きすぎず小さすぎないか)
- イベントスキーマのバージョン管理戦略は策定されているか?
- プロジェクションrebuild戦略は準備されているか?
運用フェーズ
- イベントストア容量モニタリングとアーカイブ戦略
- プロジェクション遅延時間(lag)のモニタリング
- Saga失敗率と補償トランザクション成功率の追跡
- スナップショット作成周期とディスク使用量の管理
- ストリーム別イベント数のモニタリング(異常増加の検知)
障害対策
- プロジェクション全体rebuildの手順と想定所要時間の文書化
- Saga Dead Letter Queue処理プロセスの策定
- イベントストアのバックアップと復旧手順の検証
- Upcasterテストの自動化
まとめ
Event SourcingとCQRSは強力なアーキテクチャパターンであるが、すべてのシステムに適しているわけではない。履歴追跡、監査ログ、タイムトラベルクエリが核心的な要件であるドメイン(金融、注文管理、物流など)で最大の価値を発揮する。
導入を検討する場合、以下を推奨する。
- コアドメインにのみ選択的に適用せよ。すべてのサービスにEvent Sourcingを適用する必要はない。
- プロジェクションrebuildを最初から設計せよ。運用中のプロジェクション再構築は必ず必要になる。
- イベントスキーマ進化戦略を先に策定せよ。イベントは永久に保存されるため、スキーマ変更コストが最も高い。
- Saga補償失敗に備えよ。分散システムにおいて補償が失敗するのは時間の問題である。
結果整合性を受容し、イベント駆動の思考に転換する準備ができていれば、Event Sourcing + CQRSはシステムの復元力と拡張性を根本的に向上させるアーキテクチャ選択となるだろう。