Skip to content

필사 모드: Event-Driven Architecture + CQRS + Event Sourcing 実践実装:Kafka/RabbitMQ基盤の分散システム設計

日本語
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

はじめに

マイクロサービスアーキテクチャが一般化するにつれ、サービス間通信方式はシステムのスケーラビリティとレジリエンスを決定づける核心要素となった。同期REST呼び出しベースのアーキテクチャはサービス間の強い結合を生み、一つのサービス障害がシステム全体に伝播するカスケード障害を引き起こす。

Event-Driven Architecture(EDA)はこの問題を解決する代表的なパターンだ。サービスはイベントを発行(publish)し、関心のあるサービスがそれを購読(subscribe)して非同期で処理する。ここにCQRS(Command Query Responsibility Segregation)とEvent Sourcingを組み合わせると、読み取り/書き込みワークロードを独立してスケーリングし、すべての状態変更の監査証跡(audit trail)を自然に確保できる。

本記事ではEDAのコアパターンからCQRSとEvent Sourcingの実装、メッセージブローカー比較、Sagaパターン、そしてプロダクションで経験する障害事例と対応方法をコードレベルで扱う。

Event-Driven Architecture コアパターン

イベントの種類

EDAにおけるイベントは大きく3つに分類される。

| 種類 | 説明 | 例 | 特徴 |

| ---------------------- | ---------------------------------------- | ------------------------------ | ---------------------------- |

| **Domain Event** | ビジネスドメインで発生した意味のある事実 | OrderPlaced, PaymentCompleted | 過去形命名、不変 |

| **Integration Event** | サービス境界を越えて伝播するイベント | OrderShipped(物流サービスへ) | サービス間の契約 |

| **Notification Event** | 単純な通知(データ最小化) | OrderStatusChanged | 受信者が詳細データを別途取得 |

EDA通信パターン比較

| パターン | 結合度 | 配信保証 | 順序保証 | ユースケース |

| ------------------- | -------- | ------------- | ------------------------- | -------------------------- |

| **Pub/Sub** | 疎 | At-least-once | 保証なし | 通知、キャッシュ無効化 |

| **Event Streaming** | 疎 | At-least-once | パーティション内保証 | ログ集約、リアルタイム分析 |

| **Event Sourcing** | 自己参照 | 永続保存 | アグリゲート内保証 | 監査証跡、状態復元 |

| **Request-Reply** | 強 | 同期 | リクエスト-レスポンスペア | 同期的な確認が必要な場合 |

TypeScriptイベント基本構造

// ドメインイベント基本インターフェース

interface DomainEvent {

readonly eventId: string

readonly eventType: string

readonly aggregateId: string

readonly aggregateType: string

readonly version: number

readonly timestamp: Date

readonly metadata: EventMetadata

readonly payload: Record<string, unknown>

}

interface EventMetadata {

readonly correlationId: string

readonly causationId: string

readonly userId?: string

readonly traceId?: string

}

// 具体的なイベント例

class OrderPlacedEvent implements DomainEvent {

readonly eventType = 'OrderPlaced'

readonly aggregateType = 'Order'

constructor(

public readonly eventId: string,

public readonly aggregateId: string,

public readonly version: number,

public readonly timestamp: Date,

public readonly metadata: EventMetadata,

public readonly payload: {

customerId: string

items: Array<{ productId: string; quantity: number; price: number }>

totalAmount: number

currency: string

}

) {}

}

CQRS(Command Query Responsibility Segregation)深掘り

CQRSのコア原則

CQRSは読み取り(Query)モデルと書き込み(Command)モデルを完全に分離するパターンだ。従来のCRUDモデルでは同一のデータモデルで読み取りと書き込みを両方処理するが、CQRSではそれぞれの目的に最適化された別々のモデルを使用する。

// Command側 - 書き込みモデル

interface Command {

readonly commandId: string

readonly commandType: string

readonly timestamp: Date

}

class PlaceOrderCommand implements Command {

readonly commandType = 'PlaceOrder'

constructor(

public readonly commandId: string,

public readonly timestamp: Date,

public readonly customerId: string,

public readonly items: Array<{

productId: string

quantity: number

price: number

}>,

public readonly shippingAddress: string

) {}

}

// Command Handler

class PlaceOrderHandler {

constructor(

private readonly orderRepository: OrderRepository,

private readonly eventBus: EventBus

) {}

async handle(command: PlaceOrderCommand): Promise<string> {

// ビジネス検証

await this.validateInventory(command.items)

await this.validateCustomerCredit(command.customerId)

// アグリゲート生成とイベント発行

const order = Order.create(command.customerId, command.items, command.shippingAddress)

await this.orderRepository.save(order)

// ドメインイベント発行

for (const event of order.getUncommittedEvents()) {

await this.eventBus.publish(event)

}

return order.id

}

private async validateInventory(

items: Array<{ productId: string; quantity: number; price: number }>

): Promise<void> {

// 在庫確認ロジック

}

private async validateCustomerCredit(customerId: string): Promise<void> {

// 顧客与信確認ロジック

}

}

Query側 - 読み取りモデル

// Query側 - 読み取りに最適化された非正規化モデル

interface OrderSummaryReadModel {

orderId: string

customerName: string

customerEmail: string

orderDate: Date

status: string

totalAmount: number

itemCount: number

lastUpdated: Date

}

// Query Handler

class GetOrderSummaryHandler {

constructor(private readonly readDb: ReadDatabase) {}

async handle(orderId: string): Promise<OrderSummaryReadModel | null> {

// 読み取り専用DBから非正規化データを直接取得

return this.readDb.query('SELECT * FROM order_summaries WHERE order_id = ?', [orderId])

}

}

// Projection - イベントを読み取りモデルに変換

class OrderSummaryProjection {

constructor(private readonly readDb: ReadDatabase) {}

async handleOrderPlaced(event: OrderPlacedEvent): Promise<void> {

await this.readDb.execute(

`INSERT INTO order_summaries (order_id, customer_name, order_date, status, total_amount, item_count, last_updated)

VALUES (?, ?, ?, 'PLACED', ?, ?, ?)`,

[

event.aggregateId,

event.payload.customerId,

event.timestamp,

event.payload.totalAmount,

event.payload.items.length,

new Date(),

]

)

}

async handleOrderShipped(event: DomainEvent): Promise<void> {

await this.readDb.execute(

`UPDATE order_summaries SET status = 'SHIPPED', last_updated = ? WHERE order_id = ?`,

[new Date(), event.aggregateId]

)

}

}

Event Sourcing実装

Event Store設計

Event Sourcingでは状態を直接保存せず、状態変更イベントのシーケンスを保存する。現在の状態はイベントを順番にリプレイ(replay)して復元する。

// Event Storeインターフェース

interface EventStore {

append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>

getEvents(aggregateId: string, fromVersion?: number): Promise<DomainEvent[]>

getEventsByType(eventType: string, fromTimestamp?: Date): Promise<DomainEvent[]>

}

// PostgreSQLベースのEvent Store実装

class PostgresEventStore implements EventStore {

constructor(private readonly pool: Pool) {}

async append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void> {

const client = await this.pool.connect()

try {

await client.query('BEGIN')

// 楽観的同時実行制御(Optimistic Concurrency Control)

const result = await client.query(

'SELECT MAX(version) as current_version FROM events WHERE aggregate_id = $1',

[aggregateId]

)

const currentVersion = result.rows[0]?.current_version ?? 0

if (currentVersion !== expectedVersion) {

throw new ConcurrencyError(

`Expected version ${expectedVersion}, but current version is ${currentVersion}`

)

}

// イベントバッチ挿入

for (const event of events) {

await client.query(

`INSERT INTO events (event_id, aggregate_id, aggregate_type, event_type, version, payload, metadata, timestamp)

VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,

[

event.eventId,

event.aggregateId,

event.aggregateType,

event.eventType,

event.version,

JSON.stringify(event.payload),

JSON.stringify(event.metadata),

event.timestamp,

]

)

}

await client.query('COMMIT')

} catch (error) {

await client.query('ROLLBACK')

throw error

} finally {

client.release()

}

}

async getEvents(aggregateId: string, fromVersion: number = 0): Promise<DomainEvent[]> {

const result = await this.pool.query(

'SELECT * FROM events WHERE aggregate_id = $1 AND version > $2 ORDER BY version ASC',

[aggregateId, fromVersion]

)

return result.rows.map(this.deserializeEvent)

}

async getEventsByType(eventType: string, fromTimestamp?: Date): Promise<DomainEvent[]> {

const query = fromTimestamp

? 'SELECT * FROM events WHERE event_type = $1 AND timestamp > $2 ORDER BY timestamp ASC'

: 'SELECT * FROM events WHERE event_type = $1 ORDER BY timestamp ASC'

const params = fromTimestamp ? [eventType, fromTimestamp] : [eventType]

const result = await this.pool.query(query, params)

return result.rows.map(this.deserializeEvent)

}

private deserializeEvent(row: any): DomainEvent {

return {

eventId: row.event_id,

eventType: row.event_type,

aggregateId: row.aggregate_id,

aggregateType: row.aggregate_type,

version: row.version,

timestamp: row.timestamp,

metadata: JSON.parse(row.metadata),

payload: JSON.parse(row.payload),

}

}

}

アグリゲートとイベントリプレイ

// Event Sourcedアグリゲート

abstract class EventSourcedAggregate {

private uncommittedEvents: DomainEvent[] = []

protected version: number = 0

abstract get id(): string

protected apply(event: DomainEvent): void {

this.when(event)

this.version = event.version

this.uncommittedEvents.push(event)

}

protected abstract when(event: DomainEvent): void

getUncommittedEvents(): DomainEvent[] {

return [...this.uncommittedEvents]

}

clearUncommittedEvents(): void {

this.uncommittedEvents = []

}

loadFromHistory(events: DomainEvent[]): void {

for (const event of events) {

this.when(event)

this.version = event.version

}

}

}

// Orderアグリゲート

class Order extends EventSourcedAggregate {

private _id: string = ''

private _customerId: string = ''

private _status: OrderStatus = OrderStatus.DRAFT

private _items: OrderItem[] = []

private _totalAmount: number = 0

get id(): string {

return this._id

}

static create(

customerId: string,

items: Array<{ productId: string; quantity: number; price: number }>,

shippingAddress: string

): Order {

const order = new Order()

const orderId = generateUUID()

const totalAmount = items.reduce((sum, item) => sum + item.price * item.quantity, 0)

order.apply({

eventId: generateUUID(),

eventType: 'OrderPlaced',

aggregateId: orderId,

aggregateType: 'Order',

version: 1,

timestamp: new Date(),

metadata: { correlationId: generateUUID(), causationId: generateUUID() },

payload: { customerId, items, totalAmount, shippingAddress, currency: 'KRW' },

})

return order

}

confirm(): void {

if (this._status !== OrderStatus.PLACED) {

throw new Error('Order can only be confirmed when in PLACED status')

}

this.apply({

eventId: generateUUID(),

eventType: 'OrderConfirmed',

aggregateId: this._id,

aggregateType: 'Order',

version: this.version + 1,

timestamp: new Date(),

metadata: { correlationId: generateUUID(), causationId: generateUUID() },

payload: { confirmedAt: new Date().toISOString() },

})

}

protected when(event: DomainEvent): void {

switch (event.eventType) {

case 'OrderPlaced':

this._id = event.aggregateId

this._customerId = event.payload.customerId as string

this._status = OrderStatus.PLACED

this._items = event.payload.items as OrderItem[]

this._totalAmount = event.payload.totalAmount as number

break

case 'OrderConfirmed':

this._status = OrderStatus.CONFIRMED

break

case 'OrderShipped':

this._status = OrderStatus.SHIPPED

break

case 'OrderCancelled':

this._status = OrderStatus.CANCELLED

break

}

}

}

enum OrderStatus {

DRAFT = 'DRAFT',

PLACED = 'PLACED',

CONFIRMED = 'CONFIRMED',

SHIPPED = 'SHIPPED',

CANCELLED = 'CANCELLED',

}

Python Event Store実装

from dataclasses import dataclass, field

from datetime import datetime

from typing import Any, Protocol

from uuid import uuid4

@dataclass(frozen=True)

class DomainEvent:

event_id: str

event_type: str

aggregate_id: str

aggregate_type: str

version: int

timestamp: datetime

payload: dict[str, Any]

metadata: dict[str, str] = field(default_factory=dict)

class EventStore(Protocol):

async def append(

self,

aggregate_id: str,

events: list[DomainEvent],

expected_version: int,

) -> None: ...

async def get_events(

self, aggregate_id: str, from_version: int = 0

) -> list[DomainEvent]: ...

class PostgresEventStore:

def __init__(self, pool: asyncpg.Pool):

self._pool = pool

async def append(

self,

aggregate_id: str,

events: list[DomainEvent],

expected_version: int,

) -> None:

async with self._pool.acquire() as conn:

async with conn.transaction():

楽観的同時実行制御

row = await conn.fetchrow(

"SELECT MAX(version) AS cur FROM events WHERE aggregate_id = $1",

aggregate_id,

)

current = row["cur"] or 0

if current != expected_version:

raise ConcurrencyError(

f"Expected {expected_version}, got {current}"

)

for event in events:

await conn.execute(

"""

INSERT INTO events

(event_id, aggregate_id, aggregate_type,

event_type, version, payload, metadata, timestamp)

VALUES ($1, $2, $3, $4, $5, $6, $7, $8)

""",

event.event_id,

event.aggregate_id,

event.aggregate_type,

event.event_type,

event.version,

json.dumps(event.payload),

json.dumps(event.metadata),

event.timestamp,

)

async def get_events(

self, aggregate_id: str, from_version: int = 0

) -> list[DomainEvent]:

async with self._pool.acquire() as conn:

rows = await conn.fetch(

"""

SELECT * FROM events

WHERE aggregate_id = $1 AND version > $2

ORDER BY version ASC

""",

aggregate_id,

from_version,

)

return [self._deserialize(row) for row in rows]

@staticmethod

def _deserialize(row) -> DomainEvent:

return DomainEvent(

event_id=row["event_id"],

event_type=row["event_type"],

aggregate_id=row["aggregate_id"],

aggregate_type=row["aggregate_type"],

version=row["version"],

timestamp=row["timestamp"],

payload=json.loads(row["payload"]),

metadata=json.loads(row["metadata"]),

)

メッセージブローカー比較:Kafka vs RabbitMQ vs NATS

主要特性比較

| 項目 | Apache Kafka | RabbitMQ | NATS JetStream |

| ------------------------ | -------------------------------- | -------------------------- | --------------------------- |

| **モデル** | 分散ログ(Append-only) | メッセージキュー(Push型) | ストリーミング(Pull/Push) |

| **メッセージ保存** | 設定期間中永久保存 | 消費後削除(デフォルト) | 設定期間保存 |

| **順序保証** | パーティション内保証 | キュー単位保証 | ストリーム内保証 |

| **スループット** | 秒間数百万件 | 秒間数万件 | 秒間数十万件 |

| **レイテンシ** | ms単位(バッチ) | us単位(単件) | us単位 |

| **コンシューマグループ** | ネイティブサポート | 競合コンシューマパターン | ネイティブサポート |

| **リプレイ** | オフセットベース自由移動 | 限定的(dead letter) | シーケンスベース移動 |

| **運用複雑度** | 高(ZooKeeper/KRaft) | 中 | 低 |

| **適したケース** | イベントストリーミング、ログ集約 | タスクキュー、RPC | 軽量メッセージング、IoT |

Kafka Producer/Consumer例

const kafka = new Kafka({

clientId: 'order-service',

brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],

logLevel: logLevel.WARN,

retry: {

initialRetryTime: 100,

retries: 8,

},

})

// Producer - イベント発行

class KafkaEventPublisher {

private producer = kafka.producer({

idempotent: true, // 冪等性保証

maxInFlightRequests: 5,

transactionalId: 'order-service-producer',

})

async publish(event: DomainEvent): Promise<void> {

await this.producer.send({

topic: `events.${event.aggregateType.toLowerCase()}`,

messages: [

{

key: event.aggregateId, // パーティショニングキー = アグリゲートID

value: JSON.stringify(event),

headers: {

'event-type': event.eventType,

'correlation-id': event.metadata.correlationId,

'content-type': 'application/json',

},

},

],

})

}

async publishBatch(events: DomainEvent[]): Promise<void> {

const transaction = await this.producer.transaction()

try {

for (const event of events) {

await transaction.send({

topic: `events.${event.aggregateType.toLowerCase()}`,

messages: [

{

key: event.aggregateId,

value: JSON.stringify(event),

headers: {

'event-type': event.eventType,

'correlation-id': event.metadata.correlationId,

},

},

],

})

}

await transaction.commit()

} catch (error) {

await transaction.abort()

throw error

}

}

}

// Consumer - イベント消費(冪等性保証付き)

class KafkaEventConsumer {

private consumer = kafka.consumer({ groupId: 'projection-service' })

private processedEvents = new Set<string>()

async start(): Promise<void> {

await this.consumer.subscribe({

topics: ['events.order'],

fromBeginning: false,

})

await this.consumer.run({

eachMessage: async ({ topic, partition, message }) => {

const event: DomainEvent = JSON.parse(message.value!.toString())

// 冪等性チェック - 処理済みイベントをスキップ

if (await this.isAlreadyProcessed(event.eventId)) {

console.log(`Skipping duplicate event: ${event.eventId}`)

return

}

await this.handleEvent(event)

await this.markAsProcessed(event.eventId)

},

})

}

private async isAlreadyProcessed(eventId: string): Promise<boolean> {

return this.processedEvents.has(eventId)

}

private async markAsProcessed(eventId: string): Promise<void> {

this.processedEvents.add(eventId)

}

private async handleEvent(event: DomainEvent): Promise<void> {

switch (event.eventType) {

case 'OrderPlaced':

await this.handleOrderPlaced(event)

break

case 'OrderConfirmed':

await this.handleOrderConfirmed(event)

break

}

}

private async handleOrderPlaced(event: DomainEvent): Promise<void> {

console.log(`Processing OrderPlaced for ${event.aggregateId}`)

}

private async handleOrderConfirmed(event: DomainEvent): Promise<void> {

console.log(`Processing OrderConfirmed for ${event.aggregateId}`)

}

}

Sagaパターン:分散トランザクション管理

Orchestration Saga vs Choreography Saga

| 項目 | Orchestration | Choreography |

| -------------- | ----------------------------------- | ----------------------------------- |

| **制御方式** | 中央オーケストレーターが調整 | 各サービスが自律的に反応 |

| **結合度** | オーケストレーターに依存 | サービス間の疎結合 |

| **複雑性** | オーケストレーターに集中 | サービスに分散 |

| **追跡** | 中央で状態確認可能 | 分散された状態追跡が必要 |

| **エラー処理** | 中央で補償トランザクション実行 | 各サービスが補償イベントを発行 |

| **適した場合** | 複雑なワークフロー(5ステップ以上) | 単純なワークフロー(3ステップ以下) |

Orchestration Saga実装

// Saga状態マシン

enum SagaStatus {

STARTED = 'STARTED',

ORDER_CREATED = 'ORDER_CREATED',

PAYMENT_PROCESSED = 'PAYMENT_PROCESSED',

INVENTORY_RESERVED = 'INVENTORY_RESERVED',

COMPLETED = 'COMPLETED',

COMPENSATING = 'COMPENSATING',

FAILED = 'FAILED',

}

interface SagaStep {

name: string

execute: (context: SagaContext) => Promise<void>

compensate: (context: SagaContext) => Promise<void>

}

interface SagaContext {

sagaId: string

orderId: string

customerId: string

items: Array<{ productId: string; quantity: number; price: number }>

paymentId?: string

reservationId?: string

[key: string]: unknown

}

class OrderSagaOrchestrator {

private steps: SagaStep[] = []

private completedSteps: SagaStep[] = []

constructor(

private readonly sagaRepository: SagaRepository,

private readonly eventBus: EventBus

) {

this.steps = [

{

name: 'CreateOrder',

execute: async (ctx) => {

await this.eventBus.publish({

eventType: 'CreateOrderRequested',

aggregateId: ctx.orderId,

payload: { customerId: ctx.customerId, items: ctx.items },

} as DomainEvent)

},

compensate: async (ctx) => {

await this.eventBus.publish({

eventType: 'CancelOrderRequested',

aggregateId: ctx.orderId,

payload: { reason: 'Saga compensation' },

} as DomainEvent)

},

},

{

name: 'ProcessPayment',

execute: async (ctx) => {

const totalAmount = ctx.items.reduce((sum, item) => sum + item.price * item.quantity, 0)

await this.eventBus.publish({

eventType: 'ProcessPaymentRequested',

aggregateId: ctx.orderId,

payload: { customerId: ctx.customerId, amount: totalAmount },

} as DomainEvent)

},

compensate: async (ctx) => {

await this.eventBus.publish({

eventType: 'RefundPaymentRequested',

aggregateId: ctx.orderId,

payload: { paymentId: ctx.paymentId },

} as DomainEvent)

},

},

{

name: 'ReserveInventory',

execute: async (ctx) => {

await this.eventBus.publish({

eventType: 'ReserveInventoryRequested',

aggregateId: ctx.orderId,

payload: { items: ctx.items },

} as DomainEvent)

},

compensate: async (ctx) => {

await this.eventBus.publish({

eventType: 'ReleaseInventoryRequested',

aggregateId: ctx.orderId,

payload: { reservationId: ctx.reservationId },

} as DomainEvent)

},

},

]

}

async start(context: SagaContext): Promise<void> {

await this.sagaRepository.save(context.sagaId, SagaStatus.STARTED, context)

await this.executeNextStep(context, 0)

}

private async executeNextStep(context: SagaContext, stepIndex: number): Promise<void> {

if (stepIndex >= this.steps.length) {

await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.COMPLETED)

return

}

const step = this.steps[stepIndex]

try {

await step.execute(context)

this.completedSteps.push(step)

await this.executeNextStep(context, stepIndex + 1)

} catch (error) {

console.error(`Saga step '${step.name}' failed:`, error)

await this.compensate(context)

}

}

private async compensate(context: SagaContext): Promise<void> {

await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.COMPENSATING)

// 完了済みステップを逆順で補償

for (const step of [...this.completedSteps].reverse()) {

try {

await step.compensate(context)

} catch (error) {

console.error(`Compensation for '${step.name}' failed:`, error)

// 補償失敗時は手動介入が必要 - アラート送信

}

}

await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.FAILED)

}

}

Snapshot戦略:イベントリプレイ最適化

イベント数が多くなるとリプレイ時間が長くなる。Snapshotは特定時点のアグリゲート状態を保存してこの問題を解決する。

// Snapshotベースのアグリゲートロード

class EventSourcedRepository<T extends EventSourcedAggregate> {

private readonly SNAPSHOT_INTERVAL = 50 // 50イベントごとにスナップショット

constructor(

private readonly eventStore: EventStore,

private readonly snapshotStore: SnapshotStore,

private readonly factory: () => T

) {}

async load(aggregateId: string): Promise<T> {

const aggregate = this.factory()

// 1. 最新スナップショットをロード

const snapshot = await this.snapshotStore.getLatest(aggregateId)

if (snapshot) {

aggregate.restoreFromSnapshot(snapshot.state)

// 2. スナップショット以降のイベントのみリプレイ

const events = await this.eventStore.getEvents(aggregateId, snapshot.version)

aggregate.loadFromHistory(events)

} else {

// 3. 全イベントリプレイ

const events = await this.eventStore.getEvents(aggregateId)

aggregate.loadFromHistory(events)

}

return aggregate

}

async save(aggregate: T): Promise<void> {

const events = aggregate.getUncommittedEvents()

await this.eventStore.append(aggregate.id, events, aggregate.version - events.length)

// スナップショット生成条件確認

if (aggregate.version % this.SNAPSHOT_INTERVAL === 0) {

await this.snapshotStore.save({

aggregateId: aggregate.id,

version: aggregate.version,

state: aggregate.toSnapshot(),

timestamp: new Date(),

})

}

aggregate.clearUncommittedEvents()

}

}

運用上の注意事項

必ず知っておくべき重要な警告

1. **イベントスキーマ変更はDBマイグレーションより危険**:Event Storeのイベントは不変であるため、スキーマを変更すると過去のイベントリプレイが壊れる。必ず下位互換性(backward compatibility)を維持しなければならない。

2. **CQRS導入は複雑度を最低2倍にする**:読み取りモデルと書き込みモデルの同期遅延(eventual consistency)をUIとビジネスロジックの両方で必ず処理しなければならない。

3. **Saga補償トランザクション失敗は手動介入が必要**:補償トランザクションも失敗する可能性があり、その場合Dead Letter Queueと手動復旧プロセスが必ず必要だ。

4. **イベント順序逆転はデータ整合性を破壊する**:Kafkaのパーティショニングキーが不適切に設定されると、同一アグリゲートのイベントが異なるパーティションに分散され、順序が逆転する。

5. **Event Storeの無限成長を管理すべき**:SnapshotとアーカイビングなしではEvent Storeが無限に大きくなり、リプレイ性能が急激に低下する。

障害事例と復旧手順

事例1:イベント順序逆転

**症状**:OrderPlacedの前にOrderShippedイベントが到着し、プロジェクションが失敗する。

**原因**:Kafkaパーティショニングキーを設定しなかったため、イベントが異なるパーティションに分散された。

**復旧**:

1. コンシューマグループオフセットリセット

kafka-consumer-groups.sh --bootstrap-server kafka:9092 \

--group projection-service \

--topic events.order \

--reset-offsets --to-earliest --execute

2. プロジェクションテーブル初期化

psql -c "TRUNCATE TABLE order_summaries;"

3. プロジェクションサービス再起動(完全リプレイ)

kubectl rollout restart deployment/projection-service

**予防**:必ずアグリゲートIDをKafkaパーティショニングキーとして使用する。

事例2:重複イベント処理

**症状**:注文が二重に生成されたり、決済が2回処理される。

**原因**:コンシューマ障害後の再起動時に、すでに処理されたメッセージを再度消費する(at-least-once特性)。

**復旧**:

// 冪等性キーテーブルベースの重複防止

class IdempotencyGuard {

constructor(private readonly db: Database) {}

async executeOnce<T>(idempotencyKey: string, operation: () => Promise<T>): Promise<T | null> {

try {

// UNIQUE制約で重複挿入を防止

await this.db.execute('INSERT INTO processed_events (event_id, processed_at) VALUES (?, ?)', [

idempotencyKey,

new Date(),

])

} catch (error) {

// 処理済みイベント

console.log(`Event ${idempotencyKey} already processed, skipping`)

return null

}

return operation()

}

}

事例3:スキーマ進化の失敗

**症状**:新バージョンのイベントハンドラーが過去のイベントをデシリアライズできず、プロジェクションが中断される。

**原因**:イベントに必須フィールドを追加したが、過去のイベントにはそのフィールドがない。

**復旧**:

// イベントアップキャスター(Upcaster)パターン

class EventUpcaster {

private upcasters: Map<string, (event: any) => DomainEvent> = new Map()

register(eventType: string, fromVersion: number, upcaster: (event: any) => any): void {

this.upcasters.set(`${eventType}:v${fromVersion}`, upcaster)

}

upcast(event: any): DomainEvent {

const key = `${event.eventType}:v${event.schemaVersion || 1}`

const upcaster = this.upcasters.get(key)

if (upcaster) {

return this.upcast(upcaster(event)) // 再帰的に最新バージョンまで

}

return event

}

}

// 使用例:v1 -> v2変換

const upcaster = new EventUpcaster()

upcaster.register('OrderPlaced', 1, (event) => ({

...event,

schemaVersion: 2,

payload: {

...event.payload,

currency: event.payload.currency || 'KRW', // デフォルト値追加

shippingMethod: 'STANDARD', // 新フィールドにデフォルト値設定

},

}))

プロダクションチェックリスト

Event Store

- [ ] イベントテーブルにaggregate_id + versionユニークインデックス設定

- [ ] 楽観的同時実行制御(Optimistic Concurrency)実装確認

- [ ] Snapshot戦略設定(50〜100イベントごと)

- [ ] イベントアーカイビングポリシー策定(コールドストレージ移行)

- [ ] Event Storeバックアップと復旧手順の検証

CQRS

- [ ] Read Model再構築(Rebuild)プロセス自動化

- [ ] 読み取り/書き込みDB分離とレプリケーション遅延モニタリング

- [ ] Projection失敗時のアラートと自動リトライ設定

- [ ] Eventual Consistency処理(UIでの楽観的更新)

メッセージブローカー

- [ ] Kafka:パーティショニングキーをアグリゲートIDに設定

- [ ] Consumer Group lagモニタリング(BurrowまたはKafka Exporter)

- [ ] Dead Letter Queue(DLQ)設定とモニタリング

- [ ] メッセージシリアライゼーション形式決定(Avro + Schema Registry推奨)

- [ ] ブローカークラスタレプリケーションファクター3以上設定

Saga

- [ ] 補償トランザクションの定義とテスト完了

- [ ] Saga状態永続化(DB保存)確認

- [ ] Sagaタイムアウト設定(無限待機防止)

- [ ] 補償失敗時の手動復旧プロセス文書化

イベントスキーマ

- [ ] Schema Registry導入(Confluent Schema Registry、AWS Glue)

- [ ] 下位互換性検証自動化(CIパイプラインに含める)

- [ ] イベントアップキャスター(Upcaster)実装

- [ ] イベントカタログ文書の維持

モニタリング

- [ ] イベント処理レイテンシメトリクス収集

- [ ] Consumer Group lagアラート設定

- [ ] イベント処理失敗率ダッシュボード構成

- [ ] Correlation IDベースのエンドツーエンドトレーシング

参考資料

- [Microsoft - CQRS Pattern](https://learn.microsoft.com/en-us/azure/architecture/patterns/cqrs)

- [Microsoft - Event Sourcing Pattern](https://learn.microsoft.com/en-us/azure/architecture/patterns/event-sourcing)

- [Martin Fowler - Event Sourcing](https://martinfowler.com/eaaDev/EventSourcing.html)

- [Martin Fowler - CQRS](https://martinfowler.com/bliki/CQRS.html)

- [Chris Richardson - Saga Pattern](https://microservices.io/patterns/data/saga.html)

- [Apache Kafka Documentation](https://kafka.apache.org/documentation/)

- [Confluent - Event-Driven Architecture](https://www.confluent.io/learn/event-driven-architecture/)

- [Greg Young - CQRS Documents](https://cqrs.files.wordpress.com/2010/11/cqrs_documents.pdf)

- [Vaughn Vernon - Implementing Domain-Driven Design](https://www.oreilly.com/library/view/implementing-domain-driven-design/9780133039900/)

현재 단락 (1/784)

マイクロサービスアーキテクチャが一般化するにつれ、サービス間通信方式はシステムのスケーラビリティとレジリエンスを決定づける核心要素となった。同期REST呼び出しベースのアーキテクチャはサービス間の強い...

작성 글자: 0원문 글자: 22,665작성 단락: 0/784