Skip to content

필사 모드: イベントソーシング本番運用のアンチパターン:スキーマ進化、スナップショット、イベントストアスケーリング

日本語
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

この記事の位置づけ

本シリーズの前回の記事では、イベントソーシングとCQRSの基本的な実装(EventStoreDBベースのアーキテクチャやKafkaベースのマイクロサービスパターン、Sagaオーケストレーション)を解説した。本記事は、コアコンセプト(append-onlyイベントストリーム、リプレイによるアグリゲート状態復元、リードモデルプロジェクション、コマンドパスとクエリパスの分離)を理解していることを前提とする。

本記事では、本番環境で発生する問題とそれを防ぐパターンに特化する。具体的には、イベントスキーマの進化とバージョニング戦略、パフォーマンスのためのスナップショット、大規模なプロジェクション再構築、イベントストア技術選定のトレードオフ、本番障害を引き起こすアンチパターン、実際の障害事例と復旧手順を取り上げる。

イベントスキーマの進化とバージョニング

イベントソーシングシステムにおいて、イベントはイミュータブル(不変)である。一度永続化されたイベントは、変更も削除もできない。しかし、ビジネス要件は常に変化する。フィールドが追加、名前変更、削除される。イベント構造は進化する。これがイベントソーシングにおける最も困難な運用上の課題である。

4つのバージョニング戦略

スキーマ変更に対処する主要な戦略は4つあり、それぞれ異なるトレードオフがある。

**戦略1: 弱いスキーマ(寛容なリーダー)**

古いフィールドを維持しつつ、新しいオプショナルフィールドを追加する。コンシューマーは理解できないフィールドを無視する。

// バージョン1: 元のイベント

interface OrderPlacedV1 {

type: 'OrderPlaced'

orderId: string

customerId: string

totalAmount: number

}

// バージョン2: shippingAddressを追加(後方互換のためオプショナル)

interface OrderPlacedV2 {

type: 'OrderPlaced'

orderId: string

customerId: string

totalAmount: number

shippingAddress?: {

street: string

city: string

zipCode: string

country: string

}

currency?: string // デフォルト: 'USD'

}

// コンシューマーは寛容なリーダーパターンを使用

function handleOrderPlaced(event: OrderPlacedV1 | OrderPlacedV2) {

const currency = 'currency' in event ? event.currency : 'USD'

const address = 'shippingAddress' in event ? event.shippingAddress : null // フォールバックとして顧客プロファイルから取得

}

これは追加的な変更にのみ有効である。フィールドの名前変更、構造変更、削除が必要な場合は破綻する。

**戦略2: アップキャスティング(読み取り時変換)**

古いイベントを読み取り時に最新のスキーマバージョンに変換する。イベントストアには元のバイト列が保持されるが、アプリケーション層が処理前にアップキャスターチェーンを適用する。

from dataclasses import dataclass

from typing import Any, Callable

@dataclass

class EventEnvelope:

stream_id: str

event_type: str

version: int

data: dict

metadata: dict

class UpcasterChain:

"""アップキャスターをチェーンしてイベントを任意のバージョンから最新に変換する。"""

def __init__(self):

self._upcasters: dict[tuple[str, int], Callable] = {}

def register(self, event_type: str, from_version: int,

upcaster: Callable[[dict], dict]):

self._upcasters[(event_type, from_version)] = upcaster

def upcast(self, envelope: EventEnvelope,

target_version: int) -> EventEnvelope:

current = envelope

while current.version < target_version:

key = (current.event_type, current.version)

if key not in self._upcasters:

raise ValueError(

f"No upcaster for {current.event_type} "

f"v{current.version} -> v{current.version + 1}"

)

new_data = self._upcasters[key](current.data)

current = EventEnvelope(

stream_id=current.stream_id,

event_type=current.event_type,

version=current.version + 1,

data=new_data,

metadata=current.metadata,

)

return current

アップキャスターの登録

chain = UpcasterChain()

V1 -> V2: 'name'を'firstName'と'lastName'に分割

chain.register('CustomerRegistered', 1, lambda d: {

**d,

'firstName': d['name'].split(' ')[0],

'lastName': ' '.join(d['name'].split(' ')[1:]),

})

V2 -> V3: デフォルト値で'tier'フィールドを追加

chain.register('CustomerRegistered', 2, lambda d: {

**d,

'tier': 'standard',

})

アップキャスティングは、ほとんどのスキーマ進化シナリオで推奨されるアプローチである。元のイベントはストレージ上でそのまま保持され、インメモリ表現のみが変化する。

**戦略3: 新しいイベントタイプ**

イベントのセマンティクスが根本的に変わる場合は、既存のイベントをバージョニングするのではなく、新しいイベントタイプを導入する。

// OrderPlacedV3で破壊的変更を加える代わりに、

// 完全に新しいイベントタイプを作成

interface OrderSubmitted {

type: 'OrderSubmitted' // 新しい名前が新しいセマンティクスを示す

orderId: string

customer: {

id: string

tier: 'standard' | 'premium' | 'enterprise'

}

lineItems: Array<{

productId: string

quantity: number

unitPrice: number

discount: number

}>

pricing: {

subtotal: number

tax: number

shipping: number

total: number

currency: string

}

submittedAt: string

}

バージョン番号の増殖を避けられるが、移行期間中はプロジェクションが古いイベントタイプと新しいイベントタイプの両方を処理する必要がある。

**戦略4: コピーアンドトランスフォーム(イベントストア移行)**

すべてのイベントに変換を適用して、イベントストア全体を書き直す。これは最終手段である。アップキャスティングが読み取り時にコスト過大になった場合、またはGDPR削除権などの規制コンプライアンスのためにデータを消去する必要がある場合にのみ使用する。

-- 警告: これは破壊的操作。必ず完全バックアップを取得すること。

-- ステップ1: 新しいeventsテーブルを作成

CREATE TABLE events_v2 (

global_position BIGSERIAL PRIMARY KEY,

stream_id UUID NOT NULL,

stream_position INTEGER NOT NULL,

event_type VARCHAR(256) NOT NULL,

data JSONB NOT NULL,

metadata JSONB NOT NULL,

created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

UNIQUE (stream_id, stream_position)

);

-- ステップ2: 変換を伴う移行

INSERT INTO events_v2 (stream_id, stream_position, event_type, data, metadata, created_at)

SELECT

stream_id,

stream_position,

event_type,

CASE

WHEN event_type = 'CustomerRegistered' AND data->>'version' = '1'

THEN jsonb_build_object(

'version', '3',

'firstName', split_part(data->>'name', ' ', 1),

'lastName', regexp_replace(data->>'name', '^[^ ]+ ', ''),

'email', data->>'email',

'tier', 'standard'

)

ELSE data

END,

metadata,

created_at

FROM events

ORDER BY global_position;

-- ステップ3: テーブルのスワップ(メンテナンスウィンドウ中に実施)

ALTER TABLE events RENAME TO events_deprecated;

ALTER TABLE events_v2 RENAME TO events;

スキーマ進化の判断マトリクス

| 変更タイプ | 弱いスキーマ | アップキャスティング | 新しいイベントタイプ | コピーアンドトランスフォーム |

| -------------------------- | ------------ | -------------------- | -------------------- | ---------------------------- |

| オプショナルフィールド追加 | 最適 | 過剰 | 過剰 | 過剰 |

| フィールド名変更 | 不可 | 最適 | 良好 | 大規模 |

| フィールド分割 | 不可 | 最適 | 良好 | 大規模 |

| フィールド削除 | リスクあり | 良好 | 最適 | 良好 |

| セマンティクス変更 | 不可 | 不可 | 最適 | 良好 |

| 規制対応の消去 | 不可 | 不可 | 不可 | 必須 |

スナップショット戦略

アグリゲートのライフタイム中に数千のイベントが蓄積されると、コマンドのたびにすべてのイベントをリプレイするのは許容できないほど遅くなる。スナップショットは、アグリゲートの現在の状態を定期的に保存することで、将来のロードではスナップショット以降のイベントのみをリプレイすればよくなる。

スナップショット実装パターン

**パターン1: 定期スナップショット(N件ごと)**

from dataclasses import dataclass, field

from typing import Optional

@dataclass

class Snapshot:

stream_id: str

version: int # スナップショット時のイベントバージョン

state: dict

created_at: str

class SnapshotStore:

"""PostgreSQLバックエンドのスナップショットストア。"""

def __init__(self, conn):

self._conn = conn

async def save_snapshot(self, snapshot: Snapshot):

await self._conn.execute("""

INSERT INTO snapshots (stream_id, version, state, created_at)

VALUES ($1, $2, $3, $4)

ON CONFLICT (stream_id) DO UPDATE

SET version = $4, state = $3, created_at = $4

""", snapshot.stream_id, snapshot.version,

json.dumps(snapshot.state), snapshot.created_at)

async def load_snapshot(self, stream_id: str) -> Optional[Snapshot]:

row = await self._conn.fetchrow("""

SELECT stream_id, version, state, created_at

FROM snapshots WHERE stream_id = $1

""", stream_id)

if row:

return Snapshot(

stream_id=row['stream_id'],

version=row['version'],

state=json.loads(row['state']),

created_at=row['created_at'],

)

return None

class AggregateRepository:

SNAPSHOT_INTERVAL = 100 # 100件ごとにスナップショット

def __init__(self, event_store, snapshot_store):

self._event_store = event_store

self._snapshot_store = snapshot_store

async def load(self, stream_id: str):

snapshot = await self._snapshot_store.load_snapshot(stream_id)

if snapshot:

aggregate = self._rebuild_from_snapshot(snapshot)

start_version = snapshot.version + 1

else:

aggregate = self._create_empty()

start_version = 0

events = await self._event_store.read_stream(

stream_id, from_version=start_version

)

for event in events:

aggregate.apply(event)

return aggregate

async def save(self, aggregate):

new_events = aggregate.uncommitted_events

current_version = aggregate.version

await self._event_store.append(

aggregate.stream_id, new_events, current_version

)

閾値に達したらスナップショットを作成

if current_version % self.SNAPSHOT_INTERVAL == 0:

await self._snapshot_store.save_snapshot(Snapshot(

stream_id=aggregate.stream_id,

version=current_version,

state=aggregate.to_dict(),

created_at=self._now(),

))

**パターン2: 時間ベースのスナップショット**

イベント数ではなく時間間隔でスナップショットを取得する。アグリゲート間でイベント頻度が大きく異なる場合に有効。

**パターン3: オンデマンドスナップショット**

ロードレイテンシが閾値を超えた場合にのみスナップショットを作成する。アクセス頻度が低いアグリゲートに対して不要なスナップショット書き込みを避けられる。

スナップショットのアンチパターン

- **早すぎるスナップショット導入**: パフォーマンス問題を計測で確認する前にスナップショットを導入してはならない。ほとんどのアグリゲートは100件未満のイベントであり、リプレイは瞬時に完了する。

- **スナップショットのバージョン結合**: スナップショットにはバージョンが必要。アグリゲートの状態構造を変更すると、古いスナップショットが読み取り不能になる。スナップショットには必ずスキーマバージョンを含める。

- **古いスナップショットを削除しない**: スナップショットはストレージを蓄積する。アグリゲートごとに最新のスナップショットのみを保持するリテンションポリシーを実装する。

プロジェクションの再構築

プロジェクション(リードモデル)はイベントストリームから派生する。プロジェクションにバグがある場合、または新しいプロジェクションを導入する場合、関連するすべてのイベントをリプレイしてゼロから再構築する必要がある。

大規模な再構築

数百万のイベントに対するプロジェクション再構築は自明ではない。ポジション0からイベントを1件ずつ処理するナイーブなアプローチは、数時間から数日かかる可能性がある。

from dataclasses import dataclass

@dataclass

class ProjectionRebuildConfig:

batch_size: int = 1000

max_concurrent_batches: int = 5

checkpoint_interval: int = 10_000

progress_log_interval: int = 50_000

class ProjectionRebuilder:

"""バッチ処理とチェックポイントによる最適化されたプロジェクション再構築。"""

def __init__(self, event_store, projection_handler, checkpoint_store,

config: ProjectionRebuildConfig = None):

self._event_store = event_store

self._handler = projection_handler

self._checkpoint_store = checkpoint_store

self._config = config or ProjectionRebuildConfig()

async def rebuild(self, projection_name: str):

再構築が中断された場合、最後のチェックポイントから再開

last_position = await self._checkpoint_store.get(

f"rebuild:{projection_name}"

)

start_position = last_position + 1 if last_position else 0

total_processed = 0

position = start_position

while True:

events = await self._event_store.read_all(

from_position=position,

count=self._config.batch_size,

)

if not events:

break

バッチを処理

await self._handler.handle_batch(events)

position = events[-1].global_position + 1

total_processed += len(events)

定期的にチェックポイント

if total_processed % self._config.checkpoint_interval == 0:

await self._checkpoint_store.save(

f"rebuild:{projection_name}",

events[-1].global_position,

)

if total_processed % self._config.progress_log_interval == 0:

print(

f"再構築進捗: {total_processed}件処理済み, "

f"ポジション: {position}"

)

print(f"再構築完了: 合計{total_processed}件処理済み")

プロジェクション再構築の主要原則

1. **冪等性**: プロジェクションは冪等でなければならない。同じイベントを2回処理しても同じ結果を生む必要がある。盲目的なINSERTではなく`UPSERT`または`INSERT ... ON CONFLICT`を使用する。

2. **チェックポイント**: 最後に処理したグローバルポジションを保存し、障害後に再構築を再開できるようにする。

3. **ブルーグリーンプロジェクション**: 新しいプロジェクションを別のテーブルまたはデータベースに構築し、アトミックにスワップする。再構築中のダウンタイムを回避できる。

4. **バックプレッシャー**: プロジェクションが外部システム(Elasticsearch、Redis)に書き込む場合、リプレイ中にそれらを圧倒しないようレート制限を実装する。

イベントストア技術比較

適切なイベントストアの選定は、重要なアーキテクチャ上の決定である。以下の表は、主要な4つの選択肢を重要なディメンションで比較する。

| ディメンション | EventStoreDB | Apache Kafka | PostgreSQL | DynamoDB |

| ---------------------------- | ---------------------------------------- | ----------------------------------------------------- | ------------------------------------------ | --------------------------------- |

| **主な用途** | 専用イベントストア | 分散ストリーミング基盤 | 汎用RDBMS | マネージドNoSQL(AWS) |

| **ストリーム単位の読み取り** | ネイティブ(ストリームIDでインデックス) | コンシューマーのキーフィルタリングが必要 | stream_idカラムのカスタムクエリ | パーティションキークエリ |

| **グローバル順序** | 組み込みグローバルポジション | パーティション内のみ、グローバル順序なし | BIGSERIALグローバルポジションカラム | ネイティブのグローバル順序なし |

| **楽観的同時実行制御** | ネイティブの期待バージョン | サポートなし | カスタムバージョンカラムチェック | 条件式(Condition Expressions) |

| **組み込みプロジェクション** | あり(JavaScriptベース) | なし(Kafka StreamsまたはksqlDBを使用) | なし(カスタムコード) | なし(DynamoDB Streams + Lambda) |

| **最大スループット** | 約15,000書き込み/秒(ノードあたり) | 100,000+書き込み/秒(クラスターあたり) | 10,000-50,000書き込み/秒(チューニング後) | 実質無制限(オンデマンド) |

| **サブスクリプションモデル** | キャッチアップ・永続サブスクリプション | コンシューマーグループ(オフセット追跡) | ポーリングまたはLISTEN/NOTIFY | DynamoDB Streams(CDC) |

| **運用複雑度** | 中(専用クラスター) | 高(ZooKeeper/KRaft、ブローカー、スキーマレジストリ) | 低(既存インフラ) | 低(フルマネージド) |

| **データ保持** | 無制限(append-only) | 設定可能(リテンションポリシー) | 無制限(手動管理) | 設定可能(TTL) |

| **GDPR削除** | 暗号シュレッディングまたはストリーム削除 | トピックコンパクション(トゥームストーン) | SQL DELETE(不変性を破壊) | TTLまたは条件付き削除 |

| **最適な用途** | 専用ESシステム | 高スループットイベントストリーミング | 既存PostgreSQLを持つチーム | AWSネイティブサーバーレス |

使い分けの指針

- **EventStoreDB**: 専用のイベントソーシングシステムを構築し、ストリーム、サブスクリプション、プロジェクションのネイティブサポートが必要な場合。書き込みスループットが秒間15,000イベント未満の場合。

- **Apache Kafka**: 主な要件がマイクロサービス間の高スループットイベントストリーミングである場合。イベントソーシングのプリミティブ(ストリーム読み取り、同時実行制御)をKafka上に自前またはフレームワークで構築する意思がある場合。

- **PostgreSQL**: すでにPostgreSQLを運用しており、新しいインフラを導入せずにイベントソーシングを始めたい場合。強いトランザクション保証が必要で、標準SQLでストリーム読み取りを実装できる場合。

- **DynamoDB**: AWS上で運用しており、フルマネージドのサーバーレスイベントストアが必要な場合。DynamoDB StreamsがプロジェクションのためのCDCを提供し、従量課金モデルが変動ワークロードに適している場合。

本番運用のアンチパターン

アンチパターン1: プロパティソーシング

`NameChanged`、`EmailChanged`、`AddressChanged`のようなイベントを保存する代わりに、`CustomerRelocated`や`CustomerContactUpdated`のようなドメイン的に意味のあるイベントを保存すべきである。プロパティソーシングされたイベントはビジネス意図を持たず、変更が行われた理由を理解することが不可能になる。

// 悪い例: プロパティソーシング - ビジネス意図がない

const badEvents = [

{ type: 'FieldUpdated', field: 'status', value: 'suspended' },

{ type: 'FieldUpdated', field: 'reason', value: 'payment_failed' },

]

// 良い例: ビジネス意味を持つドメインイベント

const goodEvents = [

{

type: 'AccountSuspendedDueToPaymentFailure',

accountId: 'ACC-123',

failedPaymentId: 'PAY-456',

suspendedAt: '2026-03-07T10:30:00Z',

retryScheduledAt: '2026-03-10T10:30:00Z',

},

]

アンチパターン2: サービス間通信としてのイベントソーシング

イベントソーシングは単一の境界づけられたコンテキスト内のローカルな決定である。内部ドメインイベントを他のサービスに直接公開すべきではない。代わりに、内部イベントをパブリックコントラクトにマッピングした統合イベントをメッセージバスに公開する。

アンチパターン3: 大きなイベント(ファットイベント)

すべてのイベントにアグリゲートの状態全体を格納すると、イベントソーシングの目的が失われる。イベントにはデルタ(何が変わったか、なぜ変わったか)のみを含めるべきである。

アンチパターン4: イベントハンドラの冪等性の欠如

プロジェクションハンドラがバッチ処理中にクラッシュすると、イベントは再配信される。冪等性がなければ、リードモデルにデータが重複する。

-- 悪い例: 盲目的なINSERT(再配信時に重複)

INSERT INTO order_summary (order_id, status, total)

VALUES ('ORD-001', 'placed', 150.00);

-- 良い例: 冪等なUPSERT

INSERT INTO order_summary (order_id, status, total, last_event_position)

VALUES ('ORD-001', 'placed', 150.00, 42)

ON CONFLICT (order_id) DO UPDATE

SET status = EXCLUDED.status,

total = EXCLUDED.total,

last_event_position = EXCLUDED.last_event_position

WHERE order_summary.last_event_position < EXCLUDED.last_event_position;

アンチパターン5: 相関IDと因果IDの欠如

相関IDがないと、イベントストリーム間の本番障害デバッグはほぼ不可能になる。すべてのイベントに`correlationId`(発信元リクエスト)と`causationId`(このイベントを直接引き起こしたイベント)を含めるべきである。

実際の障害事例

事例1: プロジェクション遅延による古い読み取り

**シナリオ**: あるECプラットフォームで、ピーク時に在庫プロジェクションがイベントストアから30秒遅延し、チェックアウト失敗が発生した。リードモデルでは商品が「在庫あり」と表示されていたが、ライトモデルでは在庫が既に枯渇していたため注文が拒否された。

**根本原因**: プロジェクションハンドラが並列化なしで順次処理していた。フラッシュセール中にイベント書き込み速度がプロジェクション処理速度を超えた。

**復旧策**:

1. パーティション化されたプロジェクション処理を実装 -- アグリゲートIDで並列化する。

2. リードモデルクエリにプロジェクション遅延を返す「鮮度」インジケータを追加する。

3. チェックアウトのようなクリティカルパスでは、プロジェクションではなくイベントストリームから直接読み取る(より強い整合性)。

事例2: アグリゲートリファクタリング後のスナップショット破損

**シナリオ**: あるチームが`Order`アグリゲートをリファクタリングし、`shippingInfo`を別々の`address`と`carrier`フィールドに分割した。デプロイ後、既存のスナップショットを持つすべての注文が、古いスナップショット形式でのデシリアライゼーションエラーにより読み込みに失敗した。

**復旧策**:

1. 影響を受けたアグリゲートタイプの既存スナップショットをすべて無効化する。

2. デシリアライゼーション失敗時にフルリプレイにフォールバックするtry-catchでスナップショットロードをラップしてデプロイする。

3. マイグレーションロジック付きのスナップショットバージョニングを実装する。

事例3: イベントストアのディスク枯渇

**シナリオ**: 大量のIoTプラットフォームが生のセンサー読み取り値をイベントとして保存していた。50,000台のセンサーが毎秒レポートし、イベントストアは1日あたり200GB増加した。3ヶ月後の週末に、EventStoreDBクラスターのディスク容量が枯渇し、完全な障害が発生した。

**復旧策**:

1. スケジュールジョブで古いイベントストリームをコールドストレージ(S3)にアーカイブする。

2. リテンションウィンドウを超えたストリームにストリームトランケーションを使用する。

3. 生のセンサー読み取り値がイベントストアに属するかどうかを再検討する -- 高頻度テレメトリには時系列データベースがより適切な可能性がある。

運用チェックリスト

イベントソーシングで本番環境に移行する前に、以下を確認する。

- バージョン追跡付きのイベントスキーマレジストリが整備されている。

- すべての歴史的なイベントバージョンに対してアップキャスターが登録・テストされている。

- バージョニングとリテンションポリシーを含むスナップショット戦略が定義されている。

- ダウンタイムなしでプロジェクション再構築をトリガーできる(ブルーグリーンデプロイメント)。

- すべてのイベントハンドラが重複排除チェック付きで冪等である。

- すべてのイベントに相関IDと因果IDが存在する。

- イベントストア監視に以下が含まれる:追記レイテンシ、サブスクリプション遅延、ディスク使用量、ストリーム数。

- 災害復旧計画に以下が含まれる:イベントストアのバックアップ/リストア手順、プロジェクション再構築の運用手順書。

- GDPRコンプライアンス戦略が文書化されている(暗号シュレッディングまたはストリーム削除)。

アーキテクチャ図: 本番イベントソーシングシステム

+------------------+ +-----------------+ +------------------+

| API Gateway |---->| Command Handler |---->| Domain Aggregate |

+------------------+ +-----------------+ +--------+---------+

|

+---------------------------+

| Domain Events

v

+--------------------+

| Event Store |

| (append-only log) |

+--------+-----------+

|

+--------------+--------------+

| | |

v v v

+--------+---+ +------+------+ +----+--------+

| Projection | | Projection | | Projection |

| Handler A | | Handler B | | Handler C |

+--------+---+ +------+------+ +----+--------+

| | |

v v v

+--------+---+ +------+------+ +----+--------+

| PostgreSQL | |Elasticsearch| | Redis |

| Read Model | |Search Index | | Cache |

+------------+ +-------------+ +-------------+

イベントスキーマ進化パイプライン:

Event Store (生のバイト列)

|

v

+----------+ +-----------+ +-----------+

| Upcaster |---->| Upcaster |---->| Upcaster |

| V1->V2 | | V2->V3 | | V3->V4 |

+----------+ +-----------+ +-----------+

|

v

+---------------+

| Current Event |

| (Version 4) |

+---------------+

|

+-----------+-----------+

| |

v v

+--------+------+ +---------+-----+

| Command Side | | Projection |

| (Aggregate) | | (Read Model) |

+---------------+ +---------------+

まとめ

イベントソーシングは、完全な監査証跡、テンポラルクエリ、読み書きの独立したスケーラビリティを必要とするシステムに絶大な価値を提供する。しかし、運用上の複雑さは現実のものである。スキーマ進化、スナップショット、プロジェクション管理、イベントストアスケーリングはオプションの関心事ではなく、イベントソーシングシステムが成功するか運用負荷になるかを決定する核心的な課題である。

重要なポイントは以下の通りである:スキーマ進化の主要戦略としてアップキャスティングを使用すること、計測されたパフォーマンス要求がある場合にのみスナップショットを導入すること、冪等なリプレイのためにプロジェクションを設計すること、実際のスループット要件と運用能力に基づいてイベントストア技術を選定すること。

参考文献

- [Martin Fowler - Event Sourcing](https://martinfowler.com/eaaDev/EventSourcing.html)

- [Greg Young - CQRS Documents](https://cqrs.files.wordpress.com/2010/11/cqrs_documents.pdf)

- [Greg Young - Versioning in an Event Sourced System](https://leanpub.com/esversioning/read)

- [EventStoreDB (Kurrent) 公式ドキュメント](https://www.kurrent.io/)

- [Axon Framework ドキュメント](https://docs.axoniq.io/)

- [Microsoft Azure - Event Sourcing パターン](https://learn.microsoft.com/en-us/azure/architecture/patterns/event-sourcing)

- [AWS - DynamoDBでCQRSイベントストアを構築する](https://aws.amazon.com/blogs/database/build-a-cqrs-event-store-with-amazon-dynamodb/)

- [Oskar Dudycz - イベントバージョニングの方法(と方法でないこと)](https://event-driven.io/en/how_to_do_event_versioning/)

- [Oskar Dudycz - プロパティソーシングアンチパターン](https://event-driven.io/en/property_sourcing/)

- [EventStoreDB vs Kafka - Domain Centric](https://domaincentric.net/blog/eventstoredb-vs-kafka)

クイズ

Q1:

「イベントソーシング本番運用のアンチパターン:スキーマ進化、スナップショット、イベントストアスケーリング」の主なトピックは何ですか?

イベントソーシングの本番運用ガイド。イベントスキーマ進化戦略、スナップショットパターン、プロジェクション再構築、イベントストアスケーリング比較、回避すべきアンチパターン、実際の障害事例と復旧手順を解説。

本シリーズの前回の記事では、イベントソーシングとCQRSの基本的な実装(EventStoreDBベースのアーキテクチャやKafkaベースのマイクロサービスパターン、Sagaオーケストレーション)を解説した。本記事は、コアコンセプト(append-onlyイベントストリーム、リプレイによるアグリゲート状態復元、リードモデルプロジェクション、コマンドパスとクエリパスの分離)を理解していることを前提とする。

本記事では、本番環境で発生する問題とそれを防ぐパターンに特化する。

イベントソーシングシステムにおいて、イベントはイミュータブル(不変)である。一度永続化されたイベントは、変更も削除もできない。しかし、ビジネス要件は常に変化する。フィールドが追加、名前変更、削除される。イベント構造は進化する。これがイベントソーシングにおける最も困難な運用上の課題である。

4つのバージョニング戦略

スキーマ変更に対処する主要な戦略は4つあり、それぞれ異なるトレードオフがある。 戦略1:

弱いスキーマ(寛容なリーダー)

古いフィールドを維持しつつ、新しいオプショナルフィールドを追加する。コンシューマーは理解できないフィールドを無視する。

アグリゲートのライフタイム中に数千のイベントが蓄積されると、コマンドのたびにすべてのイベントをリプレイするのは許容できないほど遅くなる。スナップショットは、アグリゲートの現在の状態を定期的に保存することで、将来のロードではスナップショット以降のイベントのみをリプレイすればよくなる。

スナップショット実装パターン パターン1: 定期スナップショット(N件ごと) パターン2:

時間ベースのスナップショット

イベント数ではなく時間間隔でスナップショットを取得する。アグリゲート間でイベント頻度が大きく異なる場合に有効。

プロジェクション(リードモデル)はイベントストリームから派生する。プロジェクションにバグがある場合、または新しいプロジェクションを導入する場合、関連するすべてのイベントをリプレイしてゼロから再構築する必要がある。

大規模な再構築

数百万のイベントに対するプロジェクション再構築は自明ではない。ポジション0からイベントを1件ずつ処理するナイーブなアプローチは、数時間から数日かかる可能性がある。

プロジェクション再構築の主要原則 冪等性:

プロジェクションは冪等でなければならない。同じイベントを2回処理しても同じ結果を生む必要がある。

현재 단락 (1/430)

本シリーズの前回の記事では、イベントソーシングとCQRSの基本的な実装(EventStoreDBベースのアーキテクチャやKafkaベースのマイクロサービスパターン、Sagaオーケストレーション)を解説...

작성 글자: 0원문 글자: 17,107작성 단락: 0/430