Skip to content
Published on

Event-Driven Architecture 実践ガイド:CQRS、Event Sourcing、Sagaパターン

Authors
Event-Driven Architecture - CQRS, Event Sourcing, Saga

はじめに

マイクロサービスアーキテクチャにおける最大の課題の一つは、複数のサービスにまたがるデータ整合性とビジネストランザクションの管理である。従来のモノリスでは単一データベースのACIDトランザクションで解決できたが、サービスごとに独立したデータストアを持つ分散環境では2PC(Two-Phase Commit)の限界が明確になる。パフォーマンスボトルネック、可用性の低下、サービス間の強い結合が発生するからである。

Event-Driven Architecture(EDA)はこの問題に対する根本的な解決策を提示する。サービス間通信を非同期イベントベースに切り替え、状態変更をイベントストリームで管理し、分散トランザクションを補償ベースのサガで処理する。本記事ではEDAの三つの核心パターンであるCQRS、Event Sourcing、Sagaを実践コードとともに深く分析し、本番運用に必要な戦略とトラブルシューティング技法を扱う。

Netflixは2億6千万の加入者向けパーソナライゼーションシステムに、LinkedInは1日数兆個のイベント処理に、Slackは数十億件の日次メッセージ処理にこれらのパターンを活用している。これらの大規模システムの経験で検証されたパターンとアンチパターンを一緒に見ていこう。

Event-Driven Architectureの基礎

イベントの三つの類型

EDAにおけるイベントはその目的に応じて三つに分類される。

類型説明特徴
Domain Eventビジネスドメインで発生した事実の記録OrderPlaced, PaymentCompleted不変、過去形の命名
Integration Eventサービス間通信のためのイベントOrderPlacedIntegrationEvent境界コンテキストの境界を越える
Event Notification変更発生の通知(データ最小化)OrderStatusChanged(IDのみ)受信者が必要なデータを直接照会

核心原則

EDAを正しく実装するための核心原則は以下の通り。

  1. 非同期通信:プロデューサーとコンシューマーが時間的に分離される
  2. 疎結合:サービスはイベントスキーマのみ知っていればよく、相手サービスの実装を知る必要がない
  3. 結果整合性:即座の強い整合性の代わりに一定時間後に整合性が保証される
  4. 冪等性:同一イベントが複数回処理されても結果が同一でなければならない
// TypeScript - Domain Event basic structure
interface DomainEvent {
  eventId: string
  eventType: string
  aggregateId: string
  aggregateType: string
  timestamp: Date
  version: number
  payload: Record<string, unknown>
  metadata: {
    correlationId: string
    causationId: string
    userId?: string
  }
}

// Order creation event example
const orderPlacedEvent: DomainEvent = {
  eventId: 'evt-550e8400-e29b-41d4-a716-446655440000',
  eventType: 'OrderPlaced',
  aggregateId: 'order-12345',
  aggregateType: 'Order',
  timestamp: new Date('2026-03-14T09:00:00Z'),
  version: 1,
  payload: {
    customerId: 'cust-67890',
    items: [
      { productId: 'prod-001', quantity: 2, price: 29900 },
      { productId: 'prod-002', quantity: 1, price: 15000 },
    ],
    totalAmount: 74800,
    shippingAddress: {
      city: 'ソウル',
      district: '江南区',
      detail: 'テヘラン路123',
    },
  },
  metadata: {
    correlationId: 'corr-abc123',
    causationId: 'cmd-place-order-001',
    userId: 'user-admin-01',
  },
}

CQRSパターン:CommandとQueryの分離

CQRSとは

CQRS(Command Query Responsibility Segregation)はデータの書き込み(Command)と読み取り(Query)を別々のモデルに分離するパターンである。Bertrand MeyerのCQS(Command Query Separation)原則をアーキテクチャレベルに拡張したもので、Greg Youngが2010年に体系化した。

従来のCRUDモデルでは同一のデータモデルが読み取りと書き込みの両方を担当する。しかし実際のビジネスでは読み取りと書き込みの要件は大きく異なる。

観点Command(書き込み)Query(読み取り)
目的状態変更、ビジネスルール検証データ照会、画面表示
比率全体トラフィックの10-20%全体トラフィックの80-90%
整合性強い整合性が必要結果整合性で許容可能
モデル複雑度豊富なドメインロジック非正規化された照会モデル
スケーリング主に垂直スケーリング水平スケーリングが容易(キャッシュ、レプリカ)

CQRS実装:TypeScript例

// TypeScript - CQRS Command Side

// Command definition
interface PlaceOrderCommand {
  type: 'PlaceOrder'
  customerId: string
  items: Array<{
    productId: string
    quantity: number
    price: number
  }>
  shippingAddress: string
}

// Command Handler
class PlaceOrderHandler {
  constructor(
    private orderRepository: OrderWriteRepository,
    private eventBus: EventBus,
    private inventoryService: InventoryService
  ) {}

  async handle(command: PlaceOrderCommand): Promise<string> {
    // 1. ビジネスルール検証
    await this.inventoryService.validateStock(command.items)

    // 2. Aggregate作成
    const order = Order.create({
      customerId: command.customerId,
      items: command.items,
      shippingAddress: command.shippingAddress,
    })

    // 3. 書き込みストアに保存
    await this.orderRepository.save(order)

    // 4. ドメインイベント発行(読み取りモデル同期用)
    for (const event of order.getDomainEvents()) {
      await this.eventBus.publish(event)
    }

    return order.id
  }
}

// Query Side - 読み取り専用モデル
interface OrderReadModel {
  orderId: string
  customerName: string
  orderDate: string
  status: string
  totalAmount: number
  itemCount: number
  lastUpdated: string
}

// Query Handler
class GetOrdersQueryHandler {
  constructor(private readDb: ReadDatabase) {}

  async handle(query: {
    customerId: string
    status?: string
    page: number
    limit: number
  }): Promise<OrderReadModel[]> {
    // 読み取り専用の非正規化テーブルから直接照会
    return this.readDb.query(
      `SELECT order_id, customer_name, order_date, status,
              total_amount, item_count, last_updated
       FROM order_read_model
       WHERE customer_id = ?
       ${query.status ? 'AND status = ?' : ''}
       ORDER BY order_date DESC
       LIMIT ? OFFSET ?`,
      [query.customerId, query.status, query.limit, query.page * query.limit]
    )
  }
}

読み取りモデルプロジェクション

イベントを受信して読み取りモデルを更新するプロジェクションロジックはCQRSの核心である。

// TypeScript - Event-based Projection

class OrderProjection {
  constructor(private readDb: ReadDatabase) {}

  async handle(event: DomainEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderPlaced':
        await this.onOrderPlaced(event)
        break
      case 'OrderShipped':
        await this.onOrderShipped(event)
        break
      case 'OrderCancelled':
        await this.onOrderCancelled(event)
        break
    }
  }

  private async onOrderPlaced(event: DomainEvent): Promise<void> {
    const payload = event.payload as {
      customerId: string
      items: Array<{ quantity: number; price: number }>
      totalAmount: number
    }

    await this.readDb.upsert('order_read_model', {
      order_id: event.aggregateId,
      customer_id: payload.customerId,
      status: 'PLACED',
      total_amount: payload.totalAmount,
      item_count: payload.items.reduce((sum, i) => sum + i.quantity, 0),
      order_date: event.timestamp,
      last_updated: event.timestamp,
      version: event.version,
    })
  }

  private async onOrderShipped(event: DomainEvent): Promise<void> {
    const payload = event.payload as { trackingNumber: string }

    // 冪等性保証:バージョンチェック
    await this.readDb.updateWhere(
      'order_read_model',
      {
        status: 'SHIPPED',
        tracking_number: payload.trackingNumber,
        last_updated: event.timestamp,
        version: event.version,
      },
      { order_id: event.aggregateId, version: event.version - 1 }
    )
  }

  private async onOrderCancelled(event: DomainEvent): Promise<void> {
    const payload = event.payload as { reason: string }

    await this.readDb.updateWhere(
      'order_read_model',
      {
        status: 'CANCELLED',
        cancellation_reason: payload.reason,
        last_updated: event.timestamp,
        version: event.version,
      },
      { order_id: event.aggregateId }
    )
  }
}

Event Sourcing:イベントベースの状態管理

Event Sourcingの核心概念

Event Sourcingはアグリゲートの現在の状態を保存する代わりに、状態を変更したすべてのイベントを順序通りに保存するパターンである。現在の状態はイベントストリームを最初からリプレイして再構成する。

従来の方式とEvent Sourcingの違いを比較すると以下の通り。

観点従来のCRUDEvent Sourcing
保存対象現在の状態(最新スナップショット)状態変更イベントの全履歴
データ損失以前の状態は消失すべての変更履歴を保存
監査別途実装が必要基本内蔵
デバッグ現在の状態のみ確認可能タイムトラベル可能
ストレージ比較的少ないイベント累積で増加
照会性能直接照会可能プロジェクションまたはスナップショットが必要

Event Sourcing実装

// TypeScript - Event Sourcing Aggregate

abstract class EventSourcedAggregate {
  private uncommittedEvents: DomainEvent[] = []
  protected version: number = 0

  abstract get id(): string

  protected apply(event: DomainEvent): void {
    this.when(event)
    this.version++
    this.uncommittedEvents.push(event)
  }

  protected abstract when(event: DomainEvent): void

  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.when(event)
      this.version++
    }
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents]
  }

  clearUncommittedEvents(): void {
    this.uncommittedEvents = []
  }
}

class Order extends EventSourcedAggregate {
  private _id: string = ''
  private _customerId: string = ''
  private _items: OrderItem[] = []
  private _status: OrderStatus = OrderStatus.DRAFT
  private _totalAmount: number = 0

  get id(): string {
    return this._id
  }

  static create(params: { orderId: string; customerId: string; items: OrderItem[] }): Order {
    const order = new Order()
    const totalAmount = params.items.reduce((sum, item) => sum + item.price * item.quantity, 0)

    order.apply({
      eventId: crypto.randomUUID(),
      eventType: 'OrderPlaced',
      aggregateId: params.orderId,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: 1,
      payload: {
        customerId: params.customerId,
        items: params.items,
        totalAmount,
      },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: 'create',
      },
    })

    return order
  }

  confirm(): void {
    if (this._status !== OrderStatus.PLACED) {
      throw new Error(`Cannot confirm order in status: ${this._status}`)
    }

    this.apply({
      eventId: crypto.randomUUID(),
      eventType: 'OrderConfirmed',
      aggregateId: this._id,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: this.version + 1,
      payload: { confirmedAt: new Date().toISOString() },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: 'confirm',
      },
    })
  }

  cancel(reason: string): void {
    if (this._status === OrderStatus.CANCELLED) {
      return // 冪等性保証
    }

    this.apply({
      eventId: crypto.randomUUID(),
      eventType: 'OrderCancelled',
      aggregateId: this._id,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: this.version + 1,
      payload: { reason, cancelledAt: new Date().toISOString() },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: 'cancel',
      },
    })
  }

  protected when(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderPlaced': {
        const p = event.payload as {
          customerId: string
          items: OrderItem[]
          totalAmount: number
        }
        this._id = event.aggregateId
        this._customerId = p.customerId
        this._items = p.items
        this._totalAmount = p.totalAmount
        this._status = OrderStatus.PLACED
        break
      }
      case 'OrderConfirmed':
        this._status = OrderStatus.CONFIRMED
        break
      case 'OrderCancelled':
        this._status = OrderStatus.CANCELLED
        break
    }
  }
}

enum OrderStatus {
  DRAFT = 'DRAFT',
  PLACED = 'PLACED',
  CONFIRMED = 'CONFIRMED',
  SHIPPED = 'SHIPPED',
  CANCELLED = 'CANCELLED',
}

interface OrderItem {
  productId: string
  quantity: number
  price: number
}

スナップショット戦略

イベント数が増えるとリプレイ時間が長くなる。スナップショットは特定時点の状態をキャッシュしてリプレイ性能を改善する。

# Python - Snapshot-based Event Store

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import json


@dataclass
class Snapshot:
    aggregate_id: str
    aggregate_type: str
    version: int
    state: dict
    created_at: datetime = field(default_factory=datetime.utcnow)


class EventStore:
    SNAPSHOT_INTERVAL = 100  # 100イベントごとにスナップショット作成

    def __init__(self, db_connection):
        self.db = db_connection

    async def save_events(
        self, aggregate_id: str, events: list[dict], expected_version: int
    ) -> None:
        """楽観的同時実行制御とともにイベントを保存"""
        async with self.db.transaction():
            current_version = await self._get_current_version(aggregate_id)
            if current_version != expected_version:
                raise ConcurrencyError(
                    f"Expected version {expected_version}, "
                    f"but current version is {current_version}"
                )

            for i, event in enumerate(events):
                version = expected_version + i + 1
                await self.db.execute(
                    """INSERT INTO event_store
                       (event_id, aggregate_id, aggregate_type,
                        event_type, version, payload, metadata, created_at)
                       VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
                    (
                        event["event_id"],
                        aggregate_id,
                        event["aggregate_type"],
                        event["event_type"],
                        version,
                        json.dumps(event["payload"]),
                        json.dumps(event["metadata"]),
                        datetime.utcnow(),
                    ),
                )

            new_version = expected_version + len(events)
            if new_version % self.SNAPSHOT_INTERVAL == 0:
                await self._create_snapshot(aggregate_id, new_version)

    async def load_aggregate(self, aggregate_id: str) -> tuple[list[dict], int]:
        """スナップショットからイベントをロードして状態を復元"""
        snapshot = await self._get_latest_snapshot(aggregate_id)

        if snapshot:
            events = await self.db.fetch_all(
                """SELECT * FROM event_store
                   WHERE aggregate_id = ? AND version > ?
                   ORDER BY version ASC""",
                (aggregate_id, snapshot.version),
            )
            return events, snapshot
        else:
            events = await self.db.fetch_all(
                """SELECT * FROM event_store
                   WHERE aggregate_id = ?
                   ORDER BY version ASC""",
                (aggregate_id,),
            )
            return events, None

    async def _create_snapshot(
        self, aggregate_id: str, version: int
    ) -> None:
        """現在の状態をスナップショットとして保存"""
        events, _ = await self.load_aggregate(aggregate_id)
        aggregate = self._rebuild_aggregate(events)
        await self.db.execute(
            """INSERT INTO snapshots
               (aggregate_id, aggregate_type, version, state, created_at)
               VALUES (?, ?, ?, ?, ?)""",
            (
                aggregate_id,
                aggregate.aggregate_type,
                version,
                json.dumps(aggregate.to_dict()),
                datetime.utcnow(),
            ),
        )

    async def _get_current_version(self, aggregate_id: str) -> int:
        result = await self.db.fetch_one(
            "SELECT MAX(version) FROM event_store WHERE aggregate_id = ?",
            (aggregate_id,),
        )
        return result[0] if result[0] else 0


class ConcurrencyError(Exception):
    pass

Sagaパターン:分散トランザクション管理

Sagaパターンとは

Sagaパターンは分散環境で複数のサービスにまたがるビジネストランザクションを管理するパターンである。従来の分散トランザクション(2PC)と異なり、各サービスのローカルトランザクションを順次実行し、失敗時には既に完了したステップに対して補償トランザクションを実行して整合性を復元する。

注文処理Sagaの流れを例にすると以下の通り。

正常フロー

  1. 注文サービス:注文作成(OrderCreated)
  2. 決済サービス:決済処理(PaymentProcessed)
  3. 在庫サービス:在庫確保(InventoryReserved)
  4. 配送サービス:配送作成(ShipmentCreated)

失敗時の補償フロー(ステップ3の在庫確保失敗時):

  1. 決済サービス:決済返金(PaymentRefunded)-- 補償
  2. 注文サービス:注文キャンセル(OrderCancelled)-- 補償

Orchestrationベースのsaga実装

// TypeScript - Saga Orchestrator (Order Processing)

interface SagaStep {
  name: string
  action: () => Promise<void>
  compensation: () => Promise<void>
}

class OrderSagaOrchestrator {
  private completedSteps: SagaStep[] = []
  private sagaLog: SagaLogEntry[] = []

  constructor(
    private paymentService: PaymentService,
    private inventoryService: InventoryService,
    private shippingService: ShippingService,
    private sagaStore: SagaStore
  ) {}

  async execute(orderId: string, orderData: OrderData): Promise<SagaResult> {
    const sagaId = crypto.randomUUID()

    const steps: SagaStep[] = [
      {
        name: 'ProcessPayment',
        action: async () => {
          await this.paymentService.processPayment({
            orderId,
            amount: orderData.totalAmount,
            customerId: orderData.customerId,
          })
        },
        compensation: async () => {
          await this.paymentService.refundPayment({
            orderId,
            amount: orderData.totalAmount,
          })
        },
      },
      {
        name: 'ReserveInventory',
        action: async () => {
          await this.inventoryService.reserve({
            orderId,
            items: orderData.items,
          })
        },
        compensation: async () => {
          await this.inventoryService.releaseReservation({
            orderId,
            items: orderData.items,
          })
        },
      },
      {
        name: 'CreateShipment',
        action: async () => {
          await this.shippingService.createShipment({
            orderId,
            address: orderData.shippingAddress,
            items: orderData.items,
          })
        },
        compensation: async () => {
          await this.shippingService.cancelShipment({ orderId })
        },
      },
    ]

    try {
      for (const step of steps) {
        await this.logStep(sagaId, step.name, 'STARTED')

        try {
          await step.action()
          this.completedSteps.push(step)
          await this.logStep(sagaId, step.name, 'COMPLETED')
        } catch (error) {
          await this.logStep(sagaId, step.name, 'FAILED', error)
          await this.compensate(sagaId)
          return {
            success: false,
            sagaId,
            failedStep: step.name,
            error: (error as Error).message,
          }
        }
      }

      await this.sagaStore.markCompleted(sagaId)
      return { success: true, sagaId }
    } catch (compensationError) {
      await this.sagaStore.markRequiresIntervention(sagaId)
      throw new SagaCompensationFailedError(sagaId, compensationError as Error)
    }
  }

  private async compensate(sagaId: string): Promise<void> {
    const stepsToCompensate = [...this.completedSteps].reverse()

    for (const step of stepsToCompensate) {
      try {
        await this.logStep(sagaId, step.name, 'COMPENSATING')
        await step.compensation()
        await this.logStep(sagaId, step.name, 'COMPENSATED')
      } catch (error) {
        await this.logStep(sagaId, step.name, 'COMPENSATION_FAILED', error)
        await this.sagaStore.enqueueRetry(sagaId, step.name)
        throw error
      }
    }
  }

  private async logStep(
    sagaId: string,
    stepName: string,
    status: string,
    error?: unknown
  ): Promise<void> {
    const entry: SagaLogEntry = {
      sagaId,
      stepName,
      status,
      timestamp: new Date(),
      error: error ? (error as Error).message : undefined,
    }
    this.sagaLog.push(entry)
    await this.sagaStore.appendLog(entry)
  }
}

interface SagaResult {
  success: boolean
  sagaId: string
  failedStep?: string
  error?: string
}

interface SagaLogEntry {
  sagaId: string
  stepName: string
  status: string
  timestamp: Date
  error?: string
}

Choreography vs Orchestration比較

Sagaパターンの二つの実装方式を詳細に比較する。

比較項目Choreography(コレオグラフィ)Orchestration(オーケストレーション)
制御方式分散 - 各サービスがイベントを発行/購読中央集中 - オーケストレーターがフロー制御
結合度低い(イベントスキーマのみ共有)中間(オーケストレーターが全サービスを知る必要)
可視性低い(フロー追跡が困難)高い(オーケストレーターで状態確認可能)
複雑度管理参加サービス増加時に急激に複雑線形に増加
単一障害点なしオーケストレーターがSPOFになりうる
補償ロジック各サービスに分散オーケストレーターに集中
テスト統合テストが困難オーケストレーター単体テストが容易
適切な規模2-4サービス参加の単純なワークフロー5個以上のサービスの複雑なワークフロー
代表ツールKafka, RabbitMQ, SNS/SQSTemporal, Camunda, AWS Step Functions

Choreographyパターンコード例

# Python - Choreography-based Saga (event subscription approach)

from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
import asyncio
import json


class EventBus:
    """Simple in-memory event bus (use Kafka/RabbitMQ in production)"""

    def __init__(self):
        self._handlers: dict[str, list] = {}

    def subscribe(self, event_type: str, handler):
        if event_type not in self._handlers:
            self._handlers[event_type] = []
        self._handlers[event_type].append(handler)

    async def publish(self, event_type: str, payload: dict):
        handlers = self._handlers.get(event_type, [])
        for handler in handlers:
            await handler(payload)


class PaymentService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        event_bus.subscribe("OrderCreated", self.on_order_created)
        event_bus.subscribe("InventoryReservationFailed", self.on_inventory_failed)

    async def on_order_created(self, payload: dict):
        """注文作成時に決済処理"""
        try:
            order_id = payload["order_id"]
            amount = payload["total_amount"]

            payment_result = await self._process_payment(
                order_id, amount
            )

            await self.event_bus.publish("PaymentProcessed", {
                "order_id": order_id,
                "payment_id": payment_result["payment_id"],
                "amount": amount,
            })
        except Exception as e:
            await self.event_bus.publish("PaymentFailed", {
                "order_id": payload["order_id"],
                "reason": str(e),
            })

    async def on_inventory_failed(self, payload: dict):
        """在庫不足時に決済返金(補償トランザクション)"""
        order_id = payload["order_id"]
        await self._refund_payment(order_id)
        await self.event_bus.publish("PaymentRefunded", {
            "order_id": order_id,
        })

    async def _process_payment(self, order_id: str, amount: int) -> dict:
        return {"payment_id": f"pay-{order_id}"}

    async def _refund_payment(self, order_id: str) -> None:
        pass


class InventoryService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        event_bus.subscribe("PaymentProcessed", self.on_payment_processed)

    async def on_payment_processed(self, payload: dict):
        """決済完了時に在庫確保"""
        try:
            order_id = payload["order_id"]
            await self._reserve_inventory(order_id)

            await self.event_bus.publish("InventoryReserved", {
                "order_id": order_id,
            })
        except InsufficientStockError:
            await self.event_bus.publish("InventoryReservationFailed", {
                "order_id": payload["order_id"],
                "reason": "insufficient_stock",
            })


class InsufficientStockError(Exception):
    pass

ハイブリッドアプローチ:いつどちらを選ぶか

実務では純粋なChoreographyやOrchestrationのどちらか一つだけを使うより、ワークフローの複雑度に応じて混合するのが効果的である。

  • Choreography選択:2-3サービスだけが参加する単純な通知、キャッシュ無効化、ログ収集など
  • Orchestration選択:決済-在庫-配送のように順序と補償が重要なコアビジネスフロー
  • ハイブリッド:コアビジネスフローはOrchestrationで、副作用(メール送信、通知、分析イベント)はChoreographyで処理

運用時の注意事項とトラブルシューティング

1. 冪等性の確保

分散環境でイベントは「少なくとも一回(at-least-once)」配信が保証されるため、同一イベントが重複処理される可能性がある。すべてのイベントハンドラーは必ず冪等性を保証しなければならない。

// TypeScript - Idempotency guarantee pattern

class IdempotentEventHandler {
  constructor(
    private processedEvents: ProcessedEventStore,
    private handler: EventHandler
  ) {}

  async handle(event: DomainEvent): Promise<void> {
    const isProcessed = await this.processedEvents.exists(event.eventId)
    if (isProcessed) {
      console.log(`Event ${event.eventId} already processed, skipping`)
      return
    }

    try {
      await this.handler.handle(event)

      await this.processedEvents.markAsProcessed(event.eventId, {
        processedAt: new Date(),
        ttl: 60 * 60 * 24 * 7,
      })
    } catch (error) {
      throw error
    }
  }
}

class RedisProcessedEventStore implements ProcessedEventStore {
  constructor(private redis: Redis) {}

  async exists(eventId: string): Promise<boolean> {
    const result = await this.redis.exists(`processed:${eventId}`)
    return result === 1
  }

  async markAsProcessed(
    eventId: string,
    options: { processedAt: Date; ttl: number }
  ): Promise<void> {
    await this.redis.setex(`processed:${eventId}`, options.ttl, options.processedAt.toISOString())
  }
}

2. イベント順序の保証

Kafkaでイベント順序を保証するには、同一Aggregateのイベントが同一パーティションにルーティングされる必要がある。Aggregate IDをパーティションキーとして使用するのが一般的である。

3. スキーマ進化

イベントは不変であるため、スキーマ変更時には下位互換性を維持する必要がある。AvroやProtobufのようなスキーマレジストリを使用するか、アップキャスティングパターンを適用する。

// TypeScript - Event Upcaster (Schema Evolution)

class EventUpcaster {
  private upcasters: Map<string, Map<number, (event: DomainEvent) => DomainEvent>> = new Map()

  register(
    eventType: string,
    fromVersion: number,
    upcaster: (event: DomainEvent) => DomainEvent
  ): void {
    if (!this.upcasters.has(eventType)) {
      this.upcasters.set(eventType, new Map())
    }
    this.upcasters.get(eventType)!.set(fromVersion, upcaster)
  }

  upcast(event: DomainEvent): DomainEvent {
    const typeUpcasters = this.upcasters.get(event.eventType)
    if (!typeUpcasters) return event

    let currentEvent = event
    let schemaVersion = (event.metadata as any).schemaVersion || 1

    while (typeUpcasters.has(schemaVersion)) {
      const upcasterFn = typeUpcasters.get(schemaVersion)!
      currentEvent = upcasterFn(currentEvent)
      schemaVersion++
    }

    return currentEvent
  }
}

const upcaster = new EventUpcaster()

upcaster.register('OrderPlaced', 1, (event) => {
  const payload = event.payload as any
  return {
    ...event,
    payload: {
      ...payload,
      shippingAddress: {
        full: payload.shippingAddress,
        city: '',
        zipCode: '',
      },
    },
    metadata: {
      ...event.metadata,
      schemaVersion: 2,
    },
  }
})

4. モニタリング核心指標

EDAシステムの健全性を確認するために以下の指標をモニタリングすべきである。

指標説明警告閾値
Consumer Lagコンシューマーの処理遅延(未処理イベント数)1000件以上
Event Processing Latencyイベント発行から処理までの所要時間P99 5秒以上
Saga Completion RateSaga成功率99%未満
Compensation Failure Rate補償トランザクション失敗率0.1%以上
Projection Lag読み取りモデルと書き込みモデルの同期遅延30秒以上
Dead Letter Queue Size処理不可イベント数0件超過時即時アラート

まとめ

Event-Driven Architectureの三つの核心パターンであるCQRS、Event Sourcing、Sagaはそれぞれ独立しても強力だが、一緒に使うとマイクロサービスアーキテクチャのデータ整合性問題を根本的に解決する。

要点を整理すると以下の通り。

  1. CQRS:読み取りと書き込みの非対称な要件を認めて分離すること。80-90%を占める読み取りトラフィックを独立して最適化できる。

  2. Event Sourcing:現在の状態の代わりに変更履歴を保存すること。完全な監査追跡、タイムトラベルデバッグ、多様な読み取りモデル生成が可能になる。ただしスナップショット戦略とスキーマ進化戦略は最初から考慮すべきである。

  3. Sagaパターン:分散トランザクションを補償ベースで管理すること。単純なフローはChoreographyで、複雑なビジネスロジックはOrchestrationで実装しつつ、補償トランザクションの失敗まで備えた復旧戦略を必ず用意すべきである。

  4. イベントストア選択:純粋なEvent Sourcingが目的ならEventStoreDB、大容量ストリーミングならKafka、AWSサーバーレスならDynamoDBを基本に考慮しつつ、要件に応じて組み合わせることができる。

これらのパターンは強力だが、複雑さというコストが伴う。すべてのサービスに一括適用するより、ビジネス複雑度が高くデータ整合性が重要なコアドメインから段階的に導入するのが現実的な戦略である。運用モニタリング、冪等性保証、スキーマ進化戦略なしにこれらのパターンを導入すると、かえってシステムの安定性を損なう可能性があることを肝に銘じよう。

参考資料