- Authors

- Name
- Youngju Kim
- @fjvbn20031
- なぜイベントソーシングとCQRSを一緒に扱うのか
- アーキテクチャ全体構造
- ドメインイベント設計
- コマンドハンドラーとAggregate実装
- プロジェクション設計と実装
- スナップショット戦略
- イベントバージョン管理(Event Versioning)
- 障害復旧手順
- 運用チェックリスト
- よくあるミスとアンチパターン
- イベントソーシング vs 従来型CRUD判断基準
- まとめ
- References

なぜイベントソーシングとCQRSを一緒に扱うのか
イベントソーシング(Event Sourcing)とCQRS(Command Query Responsibility Segregation)は独立したパターンですが、プロダクション環境ではほぼ常に一緒に使用されます。イベントソーシングで状態変更の全履歴をイベントストリームに記録すると、現在の状態はイベントリプレイ(replay)でしか復元できないため、読み取り性能が急激に低下します。CQRSで読み取り専用プロジェクション(Read Model)を分離すれば、この問題が解決されます。
本記事は概念説明にとどまりません。EventStoreDBベースのイベントストア構築、TypeScriptとPythonでのコマンドハンドラーとプロジェクションの実装、スナップショット戦略、イベントスキーマバージョン管理、障害復旧手順まで運用レベルの内容を全て網羅します。2025年以降EventStoreDBがKurrentにリブランディングされた状況と最新のgRPCクライアントAPIも反映しています。
アーキテクチャ全体構造
イベントソーシング + CQRSシステムの全体フローは以下の通りです。
- クライアントがコマンド(Command)を送信する
- コマンドハンドラーがドメイン不変条件(invariant)を検証する
- 検証を通過するとドメインイベントを生成しイベントストアにappendする
- イベントストアが**サブスクライバー(Subscriber)**にイベントをプッシュする
- プロジェクションハンドラーがイベントを受信して**読み取りモデル(Read Model)**を更新する
- クエリハンドラーが読み取りモデルからデータを照会してクライアントに返す
核心原則:書き込みパス(Command Path)と読み取りパス(Query Path)は完全に分離されます。書き込み側はイベントストアにのみ書き込み、読み取り側はプロジェクションされた読み取りモデルからのみ照会します。
イベントストアソリューション比較
プロダクションで使用可能なイベントストアソリューションを比較します。技術スタックと運用環境に合った選択が重要です。
| ソリューション | 言語エコシステム | ストレージ方式 | プロジェクション内蔵 | ライセンス | 運用複雑度 |
|---|---|---|---|---|---|
| EventStoreDB (Kurrent) | 言語非依存 (gRPC) | 専用ファイルシステム | あり (JSベース) | BSL (商用無料) | 中間 |
| Axon Server | JVM (Java/Kotlin) | 専用ストレージ | なし (Frameworkで処理) | オープンソース + Enterprise | 中間 |
| Marten | .NET (C#) | PostgreSQL | あり (C#ベース) | MIT | 低い |
| EventSourcingDB | 言語非依存 (gRPC) | 専用エンジン | なし | オープンソース | 低い |
| PostgreSQL + 自作 | 言語非依存 | RDBMS | なし (自作) | オープンソース | 高い |
| MongoDB + 自作 | 言語非依存 | Document DB | なし (自作) | SSPL | 高い |
EventStoreDBはGreg Youngが設計した専用イベントストアで、ストリーム単位のappend-onlyストレージ、内蔵プロジェクション、catch-upサブスクリプションなどイベントソーシングに必要な機能をネイティブで提供します。本記事ではEventStoreDBを基準に実装例を作成します。
ドメインイベント設計
イベントソーシングシステムにおいて、ドメインイベントはシステムの最も重要な契約(contract)です。一度保存されたイベントは絶対に変更・削除しません。したがって、イベントスキーマの設計には慎重を期す必要があります。
イベント設計原則
- 過去形の動詞で命名する:
OrderCreated、PaymentProcessed、ItemShipped - ビジネスの意図を込める:
ReservationStatusChangedではなくRoomBooked - **自己完結的(self-contained)**であること:イベント1つで何が起こったのかを完全に把握できる必要がある
- 単純型のみ使用する:string、number、boolean、配列。Value Objectをイベントに直接含めない
- バージョンフィールドを含む:スキーマ進化に必須
TypeScriptイベント定義
// events/order-events.ts
interface BaseEvent {
eventType: string
eventVersion: number
aggregateId: string
timestamp: string
metadata: {
correlationId: string
causationId: string
userId: string
}
}
interface OrderCreated extends BaseEvent {
eventType: 'OrderCreated'
eventVersion: 1
data: {
orderId: string
customerId: string
items: Array<{
productId: string
productName: string
quantity: number
unitPrice: number
}>
totalAmount: number
currency: string
shippingAddress: {
street: string
city: string
zipCode: string
country: string
}
}
}
interface OrderPaymentConfirmed extends BaseEvent {
eventType: 'OrderPaymentConfirmed'
eventVersion: 1
data: {
orderId: string
paymentId: string
amount: number
method: 'CREDIT_CARD' | 'BANK_TRANSFER' | 'WALLET'
confirmedAt: string
}
}
interface OrderCancelled extends BaseEvent {
eventType: 'OrderCancelled'
eventVersion: 1
data: {
orderId: string
reason: string
cancelledBy: string
refundAmount: number
}
}
type OrderEvent = OrderCreated | OrderPaymentConfirmed | OrderCancelled
correlationIdとcausationIdをメタデータに必ず含めます。分散システムで1つのユーザーリクエストが複数のイベントを発生させる場合、この2つのフィールドがないと障害追跡が不可能になります。
コマンドハンドラーとAggregate実装
コマンドハンドラーはイベントソーシングシステムの書き込みパスを担当します。Aggregateはドメイン不変条件を守る境界であり、イベントをリプレイして現在の状態を復元します。
TypeScript Aggregate実装
// aggregates/order-aggregate.ts
import { EventStoreDBClient, jsonEvent, FORWARDS, START } from '@eventstore/db-client'
interface OrderState {
orderId: string
status: 'CREATED' | 'PAID' | 'SHIPPED' | 'CANCELLED'
customerId: string
totalAmount: number
items: Array<{ productId: string; quantity: number; unitPrice: number }>
version: number
}
class OrderAggregate {
private state: OrderState
private pendingEvents: OrderEvent[] = []
constructor() {
this.state = {
orderId: '',
status: 'CREATED',
customerId: '',
totalAmount: 0,
items: [],
version: -1,
}
}
// イベントリプレイで状態を復元
static async load(client: EventStoreDBClient, orderId: string): Promise<OrderAggregate> {
const aggregate = new OrderAggregate()
const streamName = `order-${orderId}`
const events = client.readStream(streamName, {
direction: FORWARDS,
fromRevision: START,
})
for await (const resolvedEvent of events) {
const event = resolvedEvent.event
if (event) {
aggregate.apply(event.data as OrderEvent, false)
aggregate.state.version = Number(resolvedEvent.event!.revision)
}
}
return aggregate
}
// コマンド:注文作成
createOrder(command: {
orderId: string
customerId: string
items: Array<{ productId: string; quantity: number; unitPrice: number }>
}): void {
// 不変条件の検証
if (this.state.orderId !== '') {
throw new Error(`Order ${command.orderId} already exists`)
}
if (command.items.length === 0) {
throw new Error('Order must contain at least one item')
}
const totalAmount = command.items.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0)
const event: OrderCreated = {
eventType: 'OrderCreated',
eventVersion: 1,
aggregateId: command.orderId,
timestamp: new Date().toISOString(),
metadata: { correlationId: '', causationId: '', userId: '' },
data: {
orderId: command.orderId,
customerId: command.customerId,
items: command.items.map((i) => ({
...i,
productName: '', // 照会時に補完
})),
totalAmount,
currency: 'KRW',
shippingAddress: { street: '', city: '', zipCode: '', country: 'KR' },
},
}
this.apply(event, true)
}
// コマンド:決済確認
confirmPayment(
paymentId: string,
amount: number,
method: 'CREDIT_CARD' | 'BANK_TRANSFER' | 'WALLET'
): void {
if (this.state.status !== 'CREATED') {
throw new Error(`Cannot confirm payment for order in status: ${this.state.status}`)
}
if (amount !== this.state.totalAmount) {
throw new Error(
`Payment amount ${amount} does not match order total ${this.state.totalAmount}`
)
}
const event: OrderPaymentConfirmed = {
eventType: 'OrderPaymentConfirmed',
eventVersion: 1,
aggregateId: this.state.orderId,
timestamp: new Date().toISOString(),
metadata: { correlationId: '', causationId: '', userId: '' },
data: {
orderId: this.state.orderId,
paymentId,
amount,
method,
confirmedAt: new Date().toISOString(),
},
}
this.apply(event, true)
}
// イベント適用(状態遷移)
private apply(event: OrderEvent, isNew: boolean): void {
switch (event.eventType) {
case 'OrderCreated':
this.state.orderId = event.data.orderId
this.state.customerId = event.data.customerId
this.state.totalAmount = event.data.totalAmount
this.state.items = event.data.items
this.state.status = 'CREATED'
break
case 'OrderPaymentConfirmed':
this.state.status = 'PAID'
break
case 'OrderCancelled':
this.state.status = 'CANCELLED'
break
}
if (isNew) {
this.pendingEvents.push(event)
}
}
// イベントストアに保存
async save(client: EventStoreDBClient): Promise<void> {
if (this.pendingEvents.length === 0) return
const streamName = `order-${this.state.orderId}`
const events = this.pendingEvents.map((e) =>
jsonEvent({ type: e.eventType, data: e.data, metadata: e.metadata })
)
await client.appendToStream(streamName, events, {
expectedRevision: this.state.version === -1 ? 'no_stream' : BigInt(this.state.version),
})
this.pendingEvents = []
}
}
expectedRevisionパラメータが楽観的同時実行制御(Optimistic Concurrency Control)の核心です。同じAggregateに対して2つのコマンドが同時に実行されると、先に保存した側が成功し、後の側はWrongExpectedVersionErrorを受け取ります。リトライロジックでイベントを再ロードしてコマンドを再実行する必要があります。
プロジェクション設計と実装
プロジェクションはイベントストリームから読み取りモデルを構築するプロセスです。イベントソーシングシステムにおいて、プロジェクションは「望む形のビューを自由に作れる」という核心的な利点を実現する部分です。
プロジェクションパターン分類
- インラインプロジェクション(Inline Projection):イベント保存と同時に読み取りモデルを更新します。強い一貫性を保証しますが、書き込み性能が低下します。
- 非同期プロジェクション(Async Projection):サブスクリプション(Subscription)を通じて非同期で読み取りモデルを更新します。結果整合性(Eventual Consistency)ですが、書き込み性能が良好です。ほとんどのプロダクションシステムがこの方式を採用しています。
- ライブプロジェクション(Live Projection):リクエストごとにイベントをリプレイします。常に最新状態を保証しますが、イベントが多い場合は性能が急激に低下します。
Python非同期プロジェクション実装
以下はPythonでEventStoreDBのPersistent Subscriptionを利用した非同期プロジェクションの実装例です。
# projections/order_summary_projection.py
import asyncio
import json
from datetime import datetime
from esdbclient import EventStoreDBClient, NewEvent, StreamState
from dataclasses import dataclass, asdict
from typing import Optional
import asyncpg
@dataclass
class OrderSummaryReadModel:
order_id: str
customer_id: str
status: str
total_amount: float
item_count: int
created_at: str
updated_at: str
payment_method: Optional[str] = None
cancelled_reason: Optional[str] = None
class OrderSummaryProjection:
def __init__(self, esdb_client: EventStoreDBClient, pg_pool: asyncpg.Pool):
self.esdb = esdb_client
self.pg_pool = pg_pool
self._checkpoint_interval = 100
self._processed_count = 0
async def start(self):
"""catch-upサブスクリプションでプロジェクションを開始"""
last_position = await self._load_checkpoint()
subscription = self.esdb.subscribe_to_all(
from_position=last_position,
filter_include=[r"order-.*"], # order-で始まるストリームのみサブスクライブ
)
for event in subscription:
try:
await self._handle_event(event)
self._processed_count += 1
# 定期的にチェックポイントを保存
if self._processed_count % self._checkpoint_interval == 0:
await self._save_checkpoint(event.commit_position)
except Exception as e:
print(f"Projection error at position {event.commit_position}: {e}")
# エラー発生時にチェックポイントを保存して再起動
await self._save_checkpoint(event.commit_position)
raise
async def _handle_event(self, event):
"""イベントタイプ別ハンドラーにルーティング"""
handler_map = {
'OrderCreated': self._on_order_created,
'OrderPaymentConfirmed': self._on_payment_confirmed,
'OrderCancelled': self._on_order_cancelled,
}
handler = handler_map.get(event.type)
if handler:
data = json.loads(event.data)
await handler(data)
async def _on_order_created(self, data: dict):
"""注文作成イベント処理 - 読み取りモデルINSERT"""
async with self.pg_pool.acquire() as conn:
await conn.execute("""
INSERT INTO order_summary (
order_id, customer_id, status, total_amount,
item_count, created_at, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (order_id) DO UPDATE SET
status = EXCLUDED.status,
updated_at = EXCLUDED.updated_at
""",
data['orderId'],
data['customerId'],
'CREATED',
data['totalAmount'],
len(data['items']),
datetime.fromisoformat(data.get('createdAt', datetime.now().isoformat())),
datetime.now(),
)
async def _on_payment_confirmed(self, data: dict):
"""決済確認イベント処理 - 読み取りモデルUPDATE"""
async with self.pg_pool.acquire() as conn:
await conn.execute("""
UPDATE order_summary
SET status = 'PAID',
payment_method = $2,
updated_at = $3
WHERE order_id = $1
""", data['orderId'], data['method'], datetime.now())
async def _on_order_cancelled(self, data: dict):
"""注文キャンセルイベント処理"""
async with self.pg_pool.acquire() as conn:
await conn.execute("""
UPDATE order_summary
SET status = 'CANCELLED',
cancelled_reason = $2,
updated_at = $3
WHERE order_id = $1
""", data['orderId'], data['reason'], datetime.now())
async def _load_checkpoint(self) -> Optional[int]:
"""最後の処理位置をロード"""
async with self.pg_pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT position FROM projection_checkpoints WHERE name = $1",
'order_summary'
)
return row['position'] if row else None
async def _save_checkpoint(self, position: int):
"""処理位置を保存"""
async with self.pg_pool.acquire() as conn:
await conn.execute("""
INSERT INTO projection_checkpoints (name, position, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (name) DO UPDATE SET
position = EXCLUDED.position,
updated_at = NOW()
""", 'order_summary', position)
プロジェクション実装で最も重要なのは**冪等性(Idempotency)**です。上記コードのON CONFLICT ... DO UPDATE構文が核心です。プロジェクションが途中で失敗して再起動すると、すでに処理したイベントを再び受け取る可能性があります。冪等に処理しなければデータが破損します。
読み取りモデルテーブルスキーマ
-- プロジェクションが使用する読み取りモデルテーブル
CREATE TABLE order_summary (
order_id VARCHAR(36) PRIMARY KEY,
customer_id VARCHAR(36) NOT NULL,
status VARCHAR(20) NOT NULL,
total_amount DECIMAL(15,2) NOT NULL,
item_count INTEGER NOT NULL,
payment_method VARCHAR(20),
cancelled_reason TEXT,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
);
-- 顧客別注文照会最適化インデックス
CREATE INDEX idx_order_summary_customer ON order_summary(customer_id, status);
-- ステータス別フィルタリングインデックス
CREATE INDEX idx_order_summary_status ON order_summary(status, created_at DESC);
-- プロジェクションチェックポイントテーブル
CREATE TABLE projection_checkpoints (
name VARCHAR(100) PRIMARY KEY,
position BIGINT NOT NULL,
updated_at TIMESTAMP NOT NULL
);
読み取りモデルはプロジェクションの要件に合わせて自由に設計します。正規化する必要はありません。顧客ダッシュボード用、管理者統計用、検索エンジンインデックス用など、複数のプロジェクションを同時に運用できます。新しいビューが必要になれば、新しいプロジェクションを追加してイベントを最初からリプレイするだけです。
スナップショット戦略
Aggregateに数千、数万件のイベントが蓄積されると、毎回全イベントをリプレイするのは非効率です。スナップショットは特定時点のAggregate状態を保存してリプレイ範囲を縮小します。
スナップショット適用基準
スナップショットを早すぎる段階で導入すると複雑度だけが上がります。以下の基準で判断します。
- Aggregateあたりの平均イベント数が50以上であればスナップショット導入を検討する
- イベントリプレイ時間が100msを超過する場合はスナップショットが必要
- スナップショットの間隔は通常100〜500イベントごとに生成する
TypeScriptスナップショット実装
// snapshots/snapshot-store.ts
interface Snapshot<T> {
aggregateId: string
aggregateType: string
version: number // スナップショット時点のAggregateバージョン
schemaVersion: number // スナップショットスキーマバージョン(アップグレード検出用)
state: T
createdAt: string
}
class SnapshotStore {
private client: EventStoreDBClient
private snapshotInterval: number
constructor(client: EventStoreDBClient, snapshotInterval = 200) {
this.client = client
this.snapshotInterval = snapshotInterval
}
async saveSnapshot<T>(snapshot: Snapshot<T>): Promise<void> {
const streamName = `snapshot-${snapshot.aggregateType}-${snapshot.aggregateId}`
const event = jsonEvent({
type: 'Snapshot',
data: snapshot,
})
// スナップショットストリームは最新1つだけ保持(maxCount設定)
await this.client.appendToStream(streamName, [event])
await this.client.setStreamMetadata(streamName, {
maxCount: 3, // 直近3つを保持してロールバック可能性を確保
})
}
async loadSnapshot<T>(
aggregateType: string,
aggregateId: string,
currentSchemaVersion: number
): Promise<Snapshot<T> | null> {
const streamName = `snapshot-${aggregateType}-${aggregateId}`
try {
const events = this.client.readStream(streamName, {
direction: BACKWARDS,
fromRevision: END,
maxCount: 1,
})
for await (const resolved of events) {
const snapshot = resolved.event!.data as Snapshot<T>
// スキーマバージョンが異なる場合はスナップショットを無視(全イベントリプレイ)
if (snapshot.schemaVersion !== currentSchemaVersion) {
console.warn(
`Snapshot schema mismatch for ${aggregateId}: ` +
`expected ${currentSchemaVersion}, got ${snapshot.schemaVersion}. ` +
`Replaying all events.`
)
return null
}
return snapshot
}
} catch (error) {
// スナップショットストリームがない場合はnullを返す
return null
}
return null
}
shouldTakeSnapshot(currentVersion: number, lastSnapshotVersion: number): boolean {
return currentVersion - lastSnapshotVersion >= this.snapshotInterval
}
}
スナップショットのschemaVersionフィールドに注目する必要があります。Aggregateの構造が変更されると、既存のスナップショットはもはや有効ではありません。schemaVersionが不一致の場合、スナップショットを無視して全イベントをリプレイし、新しいスナップショットを生成します。これがイベントソーシングの利点です。イベントは不変なので、いつでも再リプレイできます。
注意:スナップショットは性能最適化技法であり、必須要素ではありません。スナップショットなしでもシステムは正常に動作する必要があります。スナップショットが破損したり無効化された場合、全イベントリプレイへの自動フォールバックを実装してください。
イベントバージョン管理(Event Versioning)
プロダクションシステムが進化すると、イベントスキーマも変更されます。イベントソーシングでは保存されたイベントは絶対に変更しないため、スキーマ変更を安全に処理する戦略が必要です。
バージョン管理戦略比較
| 戦略 | 説明 | 適した状況 | リスク |
|---|---|---|---|
| Weak Schema | 新フィールド追加時にデフォルト値を使用、既存イベントと互換 | フィールド追加のみ必要な場合 | 低い |
| Upcasting | イベントデシリアライズ時にミドルウェアで変換 | フィールド名変更、型変更 | 中間 |
| New Event Type | 完全に新しいイベントタイプを定義 | イベントの意味自体が変更 | 低い |
| Copy-Replace Stream | ストリームを新スキーマで複製後に置換 | 大規模スキーママイグレーション | 高い |
Upcasting実装例
最も実用的な戦略はUpcastingです。イベントをデシリアライズする過程で以前のバージョンを現在のバージョンに変換するミドルウェアを配置します。
// versioning/event-upcaster.ts
type Upcaster = (event: any) => any
// バージョン別アップキャスター登録
const upcasters: Map<string, Map<number, Upcaster>> = new Map()
// OrderCreated v1 -> v2: shippingAddressを構造化されたオブジェクトに変更
upcasters.set(
'OrderCreated',
new Map([
[
1,
(event: any) => {
// v1でshippingAddressが単一文字列だった場合
const address =
typeof event.data.shippingAddress === 'string'
? {
street: event.data.shippingAddress,
city: 'UNKNOWN',
zipCode: 'UNKNOWN',
country: 'KR',
}
: event.data.shippingAddress
return {
...event,
eventVersion: 2,
data: {
...event.data,
shippingAddress: address,
// v2で追加されたフィールドにデフォルト値を適用
orderSource: event.data.orderSource ?? 'WEB',
},
}
},
],
])
)
function upcastEvent(event: any): any {
const eventUpcasters = upcasters.get(event.eventType)
if (!eventUpcasters) return event
let current = event
const targetVersion = Math.max(...Array.from(eventUpcasters.keys())) + 1
// 現在のバージョンから最新バージョンまで順次適用
for (let v = current.eventVersion; v < targetVersion; v++) {
const upcaster = eventUpcasters.get(v)
if (upcaster) {
current = upcaster(current)
}
}
return current
}
// 使用例
// const rawEvent = loadFromEventStore();
// const currentEvent = upcastEvent(rawEvent);
// aggregate.apply(currentEvent);
イベントバージョン管理で最も重要な原則は既存のイベントを絶対に変更しないことです。イベントストアに保存されたデータは不変です。代わりに読み取り時点で変換(Upcasting)するか、新しいイベントタイプを定義します。
障害復旧手順
イベントソーシングシステムでの障害復旧は従来のシステムとは異なります。イベントストアが真実の源泉(Single Source of Truth)であるため、プロジェクションはいつでも再構築できます。
プロジェクション破損時の復旧
プロジェクションに誤ったデータが含まれている場合や、プロジェクションロジックにバグがあった場合に使用する手順です。
- プロジェクションサービスの停止:該当プロジェクションのサブスクリプションを停止する
- 読み取りモデルテーブルのDROPまたはTRUNCATE:既存の誤ったデータを削除する
- チェックポイントの初期化:
projection_checkpointsテーブルから該当プロジェクションのpositionを削除する - プロジェクションサービスの再起動:イベントストアの最初からすべてのイベントをリプレイして読み取りモデルを再構築する
- 再構築完了の確認:プロジェクションが現在位置まで追いついた(caught-up)ことを確認する
警告:イベントが数億件ある場合、再構築には数時間かかる可能性があります。並列処理可能なプロジェクションアーキテクチャを事前に設計するか、パーティション単位で再構築できるように準備してください。
イベントストアクラスター障害時
EventStoreDBはリーダー-フォロワーアーキテクチャを使用します。リーダーノードがダウンすると、フォロワーの1つが自動的にリーダーに昇格します。この過程での注意点は以下の通りです。
- 書き込み失敗処理:リーダー切り替え中に書き込みが失敗した場合はリトライする。
expectedRevisionで冪等性が保証される - サブスクリプション再接続:Persistent Subscriptionは自動再接続されるが、Catch-up Subscriptionは最後のチェックポイントから手動で再接続する必要がある
- スプリットブレイン防止:最低3ノードクラスターを運用して、過半数(quorum)ベースのリーダー選出を可能にする
一般的な障害シナリオと対応
| 障害シナリオ | 原因 | 対応方法 |
|---|---|---|
| プロジェクションデータ不整合 | プロジェクションロジックバグ | プロジェクション再構築(全イベントリプレイ) |
| Aggregateロード失敗 | イベントストリーム破損 | スナップショットフォールバック後に部分リプレイ、クラスターレプリカを確認 |
| 同時書き込み競合 | 同じAggregateの同時修正 | WrongExpectedVersionErrorをキャッチしてリトライ |
| サブスクリプション遅延 | プロジェクション処理速度不足 | パーティショニングまたはプロジェクションインスタンスの水平スケーリング |
| イベントストアディスク満杯 | イベント増加 | アーカイブポリシーの適用、ストリームmaxAge/maxCount設定 |
| スナップショットスキーマ不一致 | Aggregate構造変更後のデプロイ | 全イベントリプレイへの自動フォールバック |
運用チェックリスト
プロダクションデプロイ前に必ず確認すべき項目です。
設計段階チェックリスト
- イベントスキーマに
eventVersionフィールドが含まれているか - イベントメタデータに
correlationId、causationIdが含まれているか - Aggregateの不変条件検証ロジックが完全か
- コマンド失敗時のエラー応答が明確か
- CQRSが本当に必要なドメインか(単純なCRUDではないか再確認)
実装段階チェックリスト
- プロジェクションハンドラーが冪等(Idempotent)か
- 楽観的同時実行制御が実装されているか(
expectedRevision) - イベントシリアライズ/デシリアライズテストが作成されているか
- Upcasterが以前のバージョンのイベントを正しく変換するか
- スナップショットスキーマバージョン不一致時に全イベントリプレイにフォールバックするか
運用段階チェックリスト
- イベントストアクラスターが3ノード以上で構成されているか
- プロジェクションconsumer lagモニタリングが設定されているか
- イベントストアディスク使用量アラートが設定されているか
- プロジェクション再構築手順がドキュメント化されているか
- スナップショット生成周期と保持ポリシーが設定されているか
- Dead Letter Queue(DLQ)が設定されているか(処理不可イベントの隔離)
- 障害時の手動イベント補正スクリプトが準備されているか
よくあるミスとアンチパターン
実践で頻繁に発生するミスを整理します。イベントソーシングプロジェクトが失敗するほとんどの原因がここにあります。
アンチパターン1:イベントに現在の状態全体を保存
// BAD:イベントに全状態を入れるのはCRUDと変わらない
interface OrderUpdated {
eventType: 'OrderUpdated'
data: {
order: Order // 全Orderオブジェクト
}
}
// GOOD:変更された事実のみをイベントとして記録
interface OrderItemAdded {
eventType: 'OrderItemAdded'
data: {
orderId: string
productId: string
quantity: number
unitPrice: number
}
}
アンチパターン2:プロジェクションで外部サービスを呼び出す
プロジェクションハンドラーで外部APIを呼び出すと、イベントリプレイ(replay)時に副作用が発生します。プロジェクションは純粋関数のようにイベントデータだけで読み取りモデルを更新すべきです。外部サービス呼び出しが必要なロジックは別のPolicyハンドラーやSagaに分離します。
アンチパターン3:すべてのドメインにイベントソーシングを適用
イベントソーシングは状態変更履歴がビジネス価値を持つドメインに適用します。ユーザー設定、コードテーブルなど単純なCRUDドメインにイベントソーシングを適用すると複雑度だけが上がり、得るものはありません。システムのコアドメイン(Core Domain)にのみ選択的に適用するのが正しい戦略です。
アンチパターン4:イベントストリームに10万件以上蓄積
1つのAggregateストリームにイベントが10万件以上蓄積されると、ロード時間が数秒以上かかります。スナップショットで緩和できますが、根本的な原因はAggregate境界の設計が間違っていることです。Aggregateをより小さな単位に分割するか、定期的なイベント生成パターンを見直してください。
イベントソーシング vs 従来型CRUD判断基準
最後に、イベントソーシング導入の可否を判断する際に参考になる比較表です。
| 基準 | 従来型CRUD | イベントソーシング |
|---|---|---|
| 現在状態の照会 | シンプル(直接読み取り) | 複雑(イベントリプレイまたはプロジェクション) |
| 変更履歴の追跡 | 別途監査ログが必要 | 自動確保(イベント自体が履歴) |
| スキーマ変更 | ALTER TABLEマイグレーション | イベントバージョン管理 + Upcasting |
| デバッグ | 現在状態のみ確認可能 | 全履歴リプレイで問題原因追跡可能 |
| データ整合性保証 | ACIDトランザクション | Aggregate単位の一貫性 + 結果整合性 |
| ストレージ容量 | 現在状態のみ保存 | すべてのイベントが蓄積(保存コスト高い) |
| 読み取り性能 | インデックス最適化で十分 | プロジェクション設計が必須 |
| 初期開発速度 | 速い | 遅い(学習曲線、インフラ構成) |
| 複雑ドメイン対応力 | ドメイン複雑度増加時に限界 | イベント中心モデリングで拡張性確保 |
| チーム能力要件 | 一般的 | DDD、イベントモデリング経験が必要 |
金融取引、物流追跡、医療記録、コラボレーションツールのように変更履歴自体がビジネス価値であるドメインでは、イベントソーシングは卓越した選択です。それ以外の場合は従来のCRUDを維持し、必要な部分にのみ監査ログを追加するのが現実的です。
まとめ
イベントソーシングとCQRSは強力ですが複雑なパターンです。成功裏に導入するには以下を覚えてください。
- イベントスキーマの設計に最も多くの時間を投資すること。イベントは永遠に残ります。
- プロジェクションは必ず冪等に実装すること。イベントリプレイが安全でなければ運用が成り立ちません。
- スナップショットは性能最適化ツールであり、アーキテクチャの必須要素ではありません。必要な時に導入してください。
- イベントバージョン管理戦略をプロジェクト初期に確立すること。後から追加するのは困難です。
- すべてのドメインに適用しないこと。コアドメインにのみ選択的に適用してください。
References
- Microsoft Azure - Event Sourcing Pattern
- Microsoft Azure - CQRS Pattern
- Martin Fowler - Event Sourcing
- Martin Fowler - CQRS
- EventStoreDB Documentation - Projections
- AWS Prescriptive Guidance - Event Sourcing Pattern
- Oskar Dudycz - How to (not) do the events versioning
- Kurrent Blog - Snapshots in Event Sourcing
- microservices.io - Event Sourcing
- Greg Young - Versioning in an Event Sourced System