A Practical Guide to Event-Driven Architecture: CQRS, Event Sourcing, and the Saga Pattern

Event-Driven Architecture - CQRS, Event Sourcing, Saga

Introduction

One of the biggest challenges in a microservices architecture is managing data consistency and business transactions across multiple services. In a traditional monolith, ACID transactions on a single database solved this; in a distributed environment where every service owns its own data store, the limits of 2PC (Two-Phase Commit) are clear: performance bottlenecks, reduced availability, and tight coupling between services.

Event-Driven Architecture (EDA) offers a fundamental answer. Inter-service communication becomes asynchronous and event-based, state changes are managed as event streams, and distributed transactions are handled as compensation-based sagas. This article analyzes the three core EDA patterns (CQRS, Event Sourcing, and Saga) in depth with working code, and covers the strategies and troubleshooting techniques that production operation demands.

Netflix applies these patterns to personalization for its 260 million subscribers, LinkedIn to processing trillions of events per day, and Slack to handling billions of daily messages. Let's examine the patterns, and the anti-patterns, validated by these large-scale systems.

Event-Driven Architecture Fundamentals

The Three Kinds of Events

In EDA, events fall into three categories according to their purpose.

| Type | Description | Example | Characteristics |
| --- | --- | --- | --- |
| Domain Event | records a fact that occurred in the business domain | OrderPlaced, PaymentCompleted | immutable, named in the past tense |
| Integration Event | an event for cross-service communication | OrderPlacedIntegrationEvent | crosses bounded-context boundaries |
| Event Notification | announces that a change occurred (minimal data) | OrderStatusChanged (ID only) | consumers fetch the data they need themselves |
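
The Event Notification style in the last row can be sketched as follows. This is a minimal illustration under stated assumptions: the `OrderStatusChanged` dataclass and `FakeOrderApi` stub are invented names, not from this article. Only the aggregate ID travels in the event; the consumer calls back to the owning service for the rest.

```python
from dataclasses import dataclass

# Hypothetical thin notification: carries only the identifier, not a fat payload
@dataclass(frozen=True)
class OrderStatusChanged:
    order_id: str
    new_status: str

def handle_notification(event: OrderStatusChanged, order_api) -> dict:
    # The consumer fetches the authoritative state from the owning service
    return order_api.get_order(event.order_id)

class FakeOrderApi:
    """Stand-in for the order service's query API (illustrative only)."""
    def get_order(self, order_id: str) -> dict:
        return {"order_id": order_id, "status": "SHIPPED"}
```

The trade-off: notifications keep payloads stable and small, at the cost of an extra read against the producing service.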

Core Principles

The core principles for implementing EDA correctly are:

  1. Asynchronous communication: producers and consumers are decoupled in time
  2. Loose coupling: a service only needs to know the event schema, not the other service's implementation
  3. Eventual consistency: instead of immediate strong consistency, consistency is reached after some delay
  4. Idempotency: processing the same event multiple times must yield the same result
// TypeScript - basic structure of a domain event
interface DomainEvent {
  eventId: string
  eventType: string
  aggregateId: string
  aggregateType: string
  timestamp: Date
  version: number
  payload: Record<string, unknown>
  metadata: {
    correlationId: string
    causationId: string
    userId?: string
  }
}

// Example: an order-placed event
const orderPlacedEvent: DomainEvent = {
  eventId: 'evt-550e8400-e29b-41d4-a716-446655440000',
  eventType: 'OrderPlaced',
  aggregateId: 'order-12345',
  aggregateType: 'Order',
  timestamp: new Date('2026-03-14T09:00:00Z'),
  version: 1,
  payload: {
    customerId: 'cust-67890',
    items: [
      { productId: 'prod-001', quantity: 2, price: 29900 },
      { productId: 'prod-002', quantity: 1, price: 15000 },
    ],
    totalAmount: 74800,
    shippingAddress: {
      city: '서울',
      district: '강남구',
      detail: '테헤란로 123',
    },
  },
  metadata: {
    correlationId: 'corr-abc123',
    causationId: 'cmd-place-order-001',
    userId: 'user-admin-01',
  },
}

CQRS: Separating Commands and Queries

What Is CQRS?

CQRS (Command Query Responsibility Segregation) is a pattern that separates writes (Commands) and reads (Queries) into distinct models. It extends Bertrand Meyer's CQS (Command Query Separation) principle to the architectural level, and was formalized by Greg Young around 2010.

In the traditional CRUD model, a single data model handles both reads and writes. In real businesses, however, read and write requirements differ substantially.

| Aspect | Command (write) | Query (read) |
| --- | --- | --- |
| Purpose | change state, enforce business rules | retrieve data, render views |
| Traffic share | 10-20% of total traffic | 80-90% of total traffic |
| Consistency | strong consistency required | eventual consistency acceptable |
| Model complexity | rich domain logic | denormalized read models |
| Scaling | mostly vertical | easy horizontal scaling (caches, replicas) |

Implementing CQRS in TypeScript

// TypeScript - CQRS command side

// Command definition
interface PlaceOrderCommand {
  type: 'PlaceOrder'
  customerId: string
  items: Array<{
    productId: string
    quantity: number
    price: number
  }>
  shippingAddress: string
}

// Command Handler
class PlaceOrderHandler {
  constructor(
    private orderRepository: OrderWriteRepository,
    private eventBus: EventBus,
    private inventoryService: InventoryService
  ) {}

  async handle(command: PlaceOrderCommand): Promise<string> {
    // 1. Validate business rules
    await this.inventoryService.validateStock(command.items)

    // 2. Create the aggregate
    const order = Order.create({
      customerId: command.customerId,
      items: command.items,
      shippingAddress: command.shippingAddress,
    })

    // 3. Persist to the write store
    await this.orderRepository.save(order)

    // 4. Publish domain events (to sync the read model)
    for (const event of order.getDomainEvents()) {
      await this.eventBus.publish(event)
    }

    return order.id
  }
}

// Query side - read-only model
interface OrderReadModel {
  orderId: string
  customerName: string
  orderDate: string
  status: string
  totalAmount: number
  itemCount: number
  lastUpdated: string
}

// Query Handler
class GetOrdersQueryHandler {
  constructor(private readDb: ReadDatabase) {}

  async handle(query: {
    customerId: string
    status?: string
    page: number
    limit: number
  }): Promise<OrderReadModel[]> {
    // Query the denormalized read-only table directly.
    // Build the parameter list conditionally so the placeholders
    // and values stay aligned when no status filter is given.
    const params: unknown[] = [query.customerId]
    if (query.status) params.push(query.status)
    params.push(query.limit, query.page * query.limit)

    return this.readDb.query(
      `SELECT order_id, customer_name, order_date, status,
              total_amount, item_count, last_updated
       FROM order_read_model
       WHERE customer_id = ?
       ${query.status ? 'AND status = ?' : ''}
       ORDER BY order_date DESC
       LIMIT ? OFFSET ?`,
      params
    )
  }
}

Read-Model Projections

The projection logic that consumes events and updates the read model is the heart of CQRS.

// TypeScript - event-driven projection

class OrderProjection {
  constructor(private readDb: ReadDatabase) {}

  async handle(event: DomainEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderPlaced':
        await this.onOrderPlaced(event)
        break
      case 'OrderShipped':
        await this.onOrderShipped(event)
        break
      case 'OrderCancelled':
        await this.onOrderCancelled(event)
        break
    }
  }

  private async onOrderPlaced(event: DomainEvent): Promise<void> {
    const payload = event.payload as {
      customerId: string
      items: Array<{ quantity: number; price: number }>
      totalAmount: number
    }

    await this.readDb.upsert('order_read_model', {
      order_id: event.aggregateId,
      customer_id: payload.customerId,
      status: 'PLACED',
      total_amount: payload.totalAmount,
      item_count: payload.items.reduce((sum, i) => sum + i.quantity, 0),
      order_date: event.timestamp,
      last_updated: event.timestamp,
      version: event.version,
    })
  }

  private async onOrderShipped(event: DomainEvent): Promise<void> {
    const payload = event.payload as { trackingNumber: string }

    // Idempotency: update only if the row is still at the previous version
    await this.readDb.updateWhere(
      'order_read_model',
      {
        status: 'SHIPPED',
        tracking_number: payload.trackingNumber,
        last_updated: event.timestamp,
        version: event.version,
      },
      { order_id: event.aggregateId, version: event.version - 1 }
    )
  }

  private async onOrderCancelled(event: DomainEvent): Promise<void> {
    const payload = event.payload as { reason: string }

    await this.readDb.updateWhere(
      'order_read_model',
      {
        status: 'CANCELLED',
        cancellation_reason: payload.reason,
        last_updated: event.timestamp,
        version: event.version,
      },
      { order_id: event.aggregateId }
    )
  }
}

Event Sourcing: Event-Based State Management

Core Concepts

Rather than storing an aggregate's current state, Event Sourcing stores every event that changed the state, in order. The current state is reconstructed by replaying the event stream from the beginning.

The traditional approach and Event Sourcing compare as follows.

| Aspect | Traditional CRUD | Event Sourcing |
| --- | --- | --- |
| What is stored | current state (latest snapshot) | complete history of state-change events |
| Data loss | previous states are lost | every change is preserved |
| Auditing | must be built separately | built in |
| Debugging | only the current state is visible | time travel is possible |
| Storage | relatively small | grows as events accumulate |
| Read performance | direct queries | requires projections or snapshots |

Implementing Event Sourcing

// TypeScript - event-sourced aggregate

abstract class EventSourcedAggregate {
  private uncommittedEvents: DomainEvent[] = []
  protected version: number = 0

  abstract get id(): string

  // Apply an event (state change)
  protected apply(event: DomainEvent): void {
    this.when(event)
    this.version++
    this.uncommittedEvents.push(event)
  }

  // Event handler (implemented by subclasses)
  protected abstract when(event: DomainEvent): void

  // Restore state from the event stream
  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.when(event)
      this.version++
    }
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents]
  }

  clearUncommittedEvents(): void {
    this.uncommittedEvents = []
  }
}

// Order aggregate
class Order extends EventSourcedAggregate {
  private _id: string = ''
  private _customerId: string = ''
  private _items: OrderItem[] = []
  private _status: OrderStatus = OrderStatus.DRAFT
  private _totalAmount: number = 0

  get id(): string {
    return this._id
  }

  // Factory method (create command)
  static create(params: { orderId: string; customerId: string; items: OrderItem[] }): Order {
    const order = new Order()
    const totalAmount = params.items.reduce((sum, item) => sum + item.price * item.quantity, 0)

    order.apply({
      eventId: crypto.randomUUID(),
      eventType: 'OrderPlaced',
      aggregateId: params.orderId,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: 1,
      payload: {
        customerId: params.customerId,
        items: params.items,
        totalAmount,
      },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: 'create',
      },
    })

    return order
  }

  // Confirm command
  confirm(): void {
    if (this._status !== OrderStatus.PLACED) {
      throw new Error(`Cannot confirm order in status: ${this._status}`)
    }

    this.apply({
      eventId: crypto.randomUUID(),
      eventType: 'OrderConfirmed',
      aggregateId: this._id,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: this.version + 1,
      payload: { confirmedAt: new Date().toISOString() },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: 'confirm',
      },
    })
  }

  // Cancel command (used by compensating transactions)
  cancel(reason: string): void {
    if (this._status === OrderStatus.CANCELLED) {
      return // idempotent: already cancelled
    }

    this.apply({
      eventId: crypto.randomUUID(),
      eventType: 'OrderCancelled',
      aggregateId: this._id,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: this.version + 1,
      payload: { reason, cancelledAt: new Date().toISOString() },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: 'cancel',
      },
    })
  }

  // Event handler - state-transition logic
  protected when(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderPlaced': {
        const p = event.payload as {
          customerId: string
          items: OrderItem[]
          totalAmount: number
        }
        this._id = event.aggregateId
        this._customerId = p.customerId
        this._items = p.items
        this._totalAmount = p.totalAmount
        this._status = OrderStatus.PLACED
        break
      }
      case 'OrderConfirmed':
        this._status = OrderStatus.CONFIRMED
        break
      case 'OrderCancelled':
        this._status = OrderStatus.CANCELLED
        break
    }
  }
}

enum OrderStatus {
  DRAFT = 'DRAFT',
  PLACED = 'PLACED',
  CONFIRMED = 'CONFIRMED',
  SHIPPED = 'SHIPPED',
  CANCELLED = 'CANCELLED',
}

interface OrderItem {
  productId: string
  quantity: number
  price: number
}

Snapshot Strategies

As events accumulate, replay time grows with them. Snapshots cache the state at a point in time to keep replay fast.

# Python - snapshot-aware event store

from dataclasses import dataclass, field
from datetime import datetime
import json


@dataclass
class Snapshot:
    aggregate_id: str
    aggregate_type: str
    version: int
    state: dict
    created_at: datetime = field(default_factory=datetime.utcnow)


class EventStore:
    SNAPSHOT_INTERVAL = 100  # create a snapshot every 100 events

    def __init__(self, db_connection):
        self.db = db_connection

    async def save_events(
        self, aggregate_id: str, events: list[dict], expected_version: int
    ) -> None:
        """Append events with optimistic concurrency control."""
        async with self.db.transaction():
            # Check the current version (optimistic lock)
            current_version = await self._get_current_version(aggregate_id)
            if current_version != expected_version:
                raise ConcurrencyError(
                    f"Expected version {expected_version}, "
                    f"but current version is {current_version}"
                )

            # Insert the events as a batch
            for i, event in enumerate(events):
                version = expected_version + i + 1
                await self.db.execute(
                    """INSERT INTO event_store
                       (event_id, aggregate_id, aggregate_type,
                        event_type, version, payload, metadata, created_at)
                       VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
                    (
                        event["event_id"],
                        aggregate_id,
                        event["aggregate_type"],
                        event["event_type"],
                        version,
                        json.dumps(event["payload"]),
                        json.dumps(event["metadata"]),
                        datetime.utcnow(),
                    ),
                )

            # Check whether a snapshot is due
            new_version = expected_version + len(events)
            if new_version % self.SNAPSHOT_INTERVAL == 0:
                await self._create_snapshot(aggregate_id, new_version)

    async def load_aggregate(
        self, aggregate_id: str
    ) -> tuple[list[dict], Snapshot | None]:
        """Load events (from the latest snapshot onward) to restore state."""
        # 1. Fetch the most recent snapshot
        snapshot = await self._get_latest_snapshot(aggregate_id)

        if snapshot:
            # 2. Load only the events after the snapshot
            events = await self.db.fetch_all(
                """SELECT * FROM event_store
                   WHERE aggregate_id = ? AND version > ?
                   ORDER BY version ASC""",
                (aggregate_id, snapshot.version),
            )
            return events, snapshot
        else:
            # 3. No snapshot: load the full event stream
            events = await self.db.fetch_all(
                """SELECT * FROM event_store
                   WHERE aggregate_id = ?
                   ORDER BY version ASC""",
                (aggregate_id,),
            )
            return events, None

    async def _create_snapshot(
        self, aggregate_id: str, version: int
    ) -> None:
        """Serialize the current state as a snapshot."""
        events, snapshot = await self.load_aggregate(aggregate_id)
        # Rebuild the aggregate from the previous snapshot (if any)
        # plus the events recorded after it, then serialize the state
        aggregate = self._rebuild_aggregate(events, snapshot)
        await self.db.execute(
            """INSERT INTO snapshots
               (aggregate_id, aggregate_type, version, state, created_at)
               VALUES (?, ?, ?, ?, ?)""",
            (
                aggregate_id,
                aggregate.aggregate_type,
                version,
                json.dumps(aggregate.to_dict()),
                datetime.utcnow(),
            ),
        )

    async def _get_current_version(self, aggregate_id: str) -> int:
        result = await self.db.fetch_one(
            "SELECT MAX(version) FROM event_store WHERE aggregate_id = ?",
            (aggregate_id,),
        )
        return result[0] if result[0] else 0


class ConcurrencyError(Exception):
    pass

The Saga Pattern: Managing Distributed Transactions

What Is a Saga?

The Saga pattern manages business transactions that span multiple services in a distributed environment. Unlike a traditional distributed transaction (2PC), it executes each service's local transaction in sequence and, when a step fails, restores consistency by running compensating transactions for the steps already completed.

An order-processing saga flows like this.

Happy path:

  1. Order service: create the order (OrderCreated)
  2. Payment service: process the payment (PaymentProcessed)
  3. Inventory service: reserve stock (InventoryReserved)
  4. Shipping service: create the shipment (ShipmentCreated)

Compensation flow (when step 3, the stock reservation, fails):

  1. Payment service: refund the payment (PaymentRefunded) -- compensation
  2. Order service: cancel the order (OrderCancelled) -- compensation

Orchestration-Based Saga Implementation

// TypeScript - saga orchestrator (order processing)

interface SagaStep {
  name: string
  action: () => Promise<void>
  compensation: () => Promise<void>
}

class OrderSagaOrchestrator {
  private completedSteps: SagaStep[] = []
  private sagaLog: SagaLogEntry[] = []

  constructor(
    private paymentService: PaymentService,
    private inventoryService: InventoryService,
    private shippingService: ShippingService,
    private sagaStore: SagaStore
  ) {}

  async execute(orderId: string, orderData: OrderData): Promise<SagaResult> {
    const sagaId = crypto.randomUUID()

    const steps: SagaStep[] = [
      {
        name: 'ProcessPayment',
        action: async () => {
          await this.paymentService.processPayment({
            orderId,
            amount: orderData.totalAmount,
            customerId: orderData.customerId,
          })
        },
        compensation: async () => {
          await this.paymentService.refundPayment({
            orderId,
            amount: orderData.totalAmount,
          })
        },
      },
      {
        name: 'ReserveInventory',
        action: async () => {
          await this.inventoryService.reserve({
            orderId,
            items: orderData.items,
          })
        },
        compensation: async () => {
          await this.inventoryService.releaseReservation({
            orderId,
            items: orderData.items,
          })
        },
      },
      {
        name: 'CreateShipment',
        action: async () => {
          await this.shippingService.createShipment({
            orderId,
            address: orderData.shippingAddress,
            items: orderData.items,
          })
        },
        compensation: async () => {
          await this.shippingService.cancelShipment({ orderId })
        },
      },
    ]

    try {
      for (const step of steps) {
        await this.logStep(sagaId, step.name, 'STARTED')

        try {
          await step.action()
          this.completedSteps.push(step)
          await this.logStep(sagaId, step.name, 'COMPLETED')
        } catch (error) {
          await this.logStep(sagaId, step.name, 'FAILED', error)
          // Run compensating transactions
          await this.compensate(sagaId)
          return {
            success: false,
            sagaId,
            failedStep: step.name,
            error: (error as Error).message,
          }
        }
      }

      await this.sagaStore.markCompleted(sagaId)
      return { success: true, sagaId }
    } catch (compensationError) {
      // Compensation itself also failed - manual intervention required
      await this.sagaStore.markRequiresIntervention(sagaId)
      throw new SagaCompensationFailedError(sagaId, compensationError as Error)
    }
  }

  private async compensate(sagaId: string): Promise<void> {
    // Compensate completed steps in reverse order
    const stepsToCompensate = [...this.completedSteps].reverse()

    for (const step of stepsToCompensate) {
      try {
        await this.logStep(sagaId, step.name, 'COMPENSATING')
        await step.compensation()
        await this.logStep(sagaId, step.name, 'COMPENSATED')
      } catch (error) {
        await this.logStep(sagaId, step.name, 'COMPENSATION_FAILED', error)
        // On compensation failure, enqueue a retry
        await this.sagaStore.enqueueRetry(sagaId, step.name)
        throw error
      }
    }
  }

  private async logStep(
    sagaId: string,
    stepName: string,
    status: string,
    error?: unknown
  ): Promise<void> {
    const entry: SagaLogEntry = {
      sagaId,
      stepName,
      status,
      timestamp: new Date(),
      error: error ? (error as Error).message : undefined,
    }
    this.sagaLog.push(entry)
    await this.sagaStore.appendLog(entry)
  }
}

interface SagaResult {
  success: boolean
  sagaId: string
  failedStep?: string
  error?: string
}

interface SagaLogEntry {
  sagaId: string
  stepName: string
  status: string
  timestamp: Date
  error?: string
}

Choreography vs Orchestration

The two ways of implementing a saga compare in detail as follows.

| Criterion | Choreography | Orchestration |
| --- | --- | --- |
| Control | distributed: each service publishes and subscribes to events | centralized: an orchestrator drives the flow |
| Coupling | low (only event schemas are shared) | medium (the orchestrator must know every service) |
| Visibility | low (flows are hard to trace) | high (state is visible in the orchestrator) |
| Complexity | grows sharply as more services join | grows linearly |
| Single point of failure | none | the orchestrator can become a SPOF |
| Compensation logic | spread across each service | concentrated in the orchestrator |
| Testing | integration tests are difficult | the orchestrator is easy to unit test |
| Best fit | simple workflows of 2-4 services | complex workflows of 5+ services |
| Typical tools | Kafka, RabbitMQ, SNS/SQS | Temporal, Camunda, AWS Step Functions |

Choreography Pattern Code Example

# Python - choreography-based saga (event subscription)

import asyncio  # needed to drive the async handlers when running the example


class EventBus:
    """간단한 인메모리 이벤트 버스 (실무에서는 Kafka/RabbitMQ 사용)"""

    def __init__(self):
        self._handlers: dict[str, list] = {}

    def subscribe(self, event_type: str, handler):
        if event_type not in self._handlers:
            self._handlers[event_type] = []
        self._handlers[event_type].append(handler)

    async def publish(self, event_type: str, payload: dict):
        handlers = self._handlers.get(event_type, [])
        for handler in handlers:
            await handler(payload)


# Payment service - choreography style
class PaymentService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        # Subscribe to the order-created event
        event_bus.subscribe("OrderCreated", self.on_order_created)
        # Subscribe to the inventory-failure event (for compensation)
        event_bus.subscribe("InventoryReservationFailed", self.on_inventory_failed)

    async def on_order_created(self, payload: dict):
        """Process the payment when an order is created."""
        try:
            order_id = payload["order_id"]
            amount = payload["total_amount"]

            # Payment processing
            payment_result = await self._process_payment(order_id, amount)

            # Publish the success event
            await self.event_bus.publish("PaymentProcessed", {
                "order_id": order_id,
                "payment_id": payment_result["payment_id"],
                "amount": amount,
            })
        except Exception as e:
            # Publish the failure event
            await self.event_bus.publish("PaymentFailed", {
                "order_id": payload["order_id"],
                "reason": str(e),
            })

    async def on_inventory_failed(self, payload: dict):
        """Refund the payment when stock runs short (compensating transaction)."""
        order_id = payload["order_id"]
        await self._refund_payment(order_id)
        await self.event_bus.publish("PaymentRefunded", {
            "order_id": order_id,
        })

    async def _process_payment(self, order_id: str, amount: int) -> dict:
        # Real payment logic would go here
        return {"payment_id": f"pay-{order_id}"}

    async def _refund_payment(self, order_id: str) -> None:
        # Real refund logic would go here
        pass


# Inventory service - choreography style
class InventoryService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        # Subscribe to the payment-completed event
        event_bus.subscribe("PaymentProcessed", self.on_payment_processed)

    async def on_payment_processed(self, payload: dict):
        """Reserve stock once payment has completed."""
        try:
            order_id = payload["order_id"]
            # Check and reserve stock
            await self._reserve_inventory(order_id)

            await self.event_bus.publish("InventoryReserved", {
                "order_id": order_id,
            })
        except InsufficientStockError:
            await self.event_bus.publish("InventoryReservationFailed", {
                "order_id": payload["order_id"],
                "reason": "insufficient_stock",
            })

    async def _reserve_inventory(self, order_id: str) -> None:
        # Real stock check and reservation would go here;
        # raise InsufficientStockError when stock is short
        pass


class InsufficientStockError(Exception):
    pass

A Hybrid Approach: When to Choose Which

In practice it is more effective to mix the two styles according to workflow complexity than to commit to pure Choreography or pure Orchestration.

  • Choose Choreography for simple flows involving 2-3 services: notifications, cache invalidation, log collection
  • Choose Orchestration for core business flows such as payment-inventory-shipping, where ordering and compensation matter
  • Go hybrid by driving the core business flow with Orchestration and handling side effects (emails, notifications, analytics events) with Choreography
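
The hybrid split in the last bullet can be sketched as follows. This is a minimal illustration with invented names (`order_saga`, `OrderCompleted`, the `demo` driver): the core steps run sequentially under the orchestrator, while side effects are published to a bus and handled choreography-style by whichever subscribers exist.

```python
import asyncio

class EventBus:
    """Tiny in-memory bus standing in for Kafka/RabbitMQ."""
    def __init__(self):
        self._handlers: dict[str, list] = {}

    def subscribe(self, event_type: str, handler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    async def publish(self, event_type: str, payload: dict) -> None:
        for handler in self._handlers.get(event_type, []):
            await handler(payload)

async def order_saga(order_id: str, bus: EventBus, charge, reserve) -> None:
    # Orchestrated core: ordering and compensation semantics matter here
    await charge(order_id)
    await reserve(order_id)
    # Choreographed side effect: subscribers (email, analytics) react on their own
    await bus.publish("OrderCompleted", {"order_id": order_id})

async def demo() -> list[str]:
    sent: list[str] = []

    async def record(payload: dict) -> None:
        sent.append(payload["order_id"])   # e.g. an email subscriber

    async def noop(order_id: str) -> None:
        pass                               # stand-in for payment/inventory calls

    bus = EventBus()
    bus.subscribe("OrderCompleted", record)
    await order_saga("order-1", bus, noop, noop)
    return sent
```

If the side-effect handler fails, no compensation is needed; only the orchestrated core participates in the saga's rollback logic.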

Choosing an Event Store (EventStoreDB, Kafka, DynamoDB)

The choice of event store depends on the system's requirements and the team's experience. The three main options compare as follows.

| Criterion | EventStoreDB | Apache Kafka | DynamoDB Streams |
| --- | --- | --- | --- |
| Designed as | a purpose-built Event Sourcing database | a distributed message-streaming platform | general-purpose NoSQL plus change streams |
| Stream model | fine-grained per-aggregate streams | topic and partition based | tables plus DynamoDB Streams/Kinesis |
| Concurrency control | built-in ExpectedVersion (optimistic locking) | per-partition ordering only | conditional writes (ConditionExpression) |
| Lookup by ID | direct lookup by stream ID | no per-entity lookup within a topic | direct lookup by partition key |
| Event ordering | fully guaranteed within a stream | guaranteed only within a partition | guaranteed within a partition key |
| Projections | built-in server-side projections | Kafka Streams / ksqlDB | Lambda + DynamoDB Streams |
| Subscriptions | persistent / catch-up subscriptions | consumer groups | DynamoDB Streams / Kinesis |
| Operational complexity | medium (dedicated cluster) | high (ZooKeeper/KRaft, partition management) | low (serverless, AWS-managed) |
| Cost model | open source plus commercial cloud | open source plus MSK/Confluent Cloud | pay per request and storage |
| Best for | pure Event Sourcing systems | high-volume event streaming and integration | AWS-native, serverless architectures |

Selection Guide

EventStoreDB fits when:

  • Event Sourcing is the core architectural pattern
  • fine-grained stream management and projections are needed
  • the team actively practices DDD-based design

Kafka fits when:

  • high-volume event streaming is the primary goal
  • Kafka infrastructure and team expertise already exist
  • both Event Sourcing and event streaming are required (consider combining EventStoreDB and Kafka)

DynamoDB fits when:

  • the system lives mostly in the AWS ecosystem
  • the architecture is serverless-first
  • operational burden must be minimized

EventStoreDB Setup Example

# docker-compose.yml - EventStoreDB cluster setup
version: '3.8'
services:
  eventstoredb:
    image: eventstore/eventstore:24.2
    container_name: eventstoredb
    environment:
      - EVENTSTORE_CLUSTER_SIZE=1
      - EVENTSTORE_RUN_PROJECTIONS=All
      - EVENTSTORE_START_STANDARD_PROJECTIONS=true
      - EVENTSTORE_INSECURE=true
      - EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=true
      - EVENTSTORE_MEM_DB=false
      - EVENTSTORE_DB=/var/lib/eventstore-data
      - EVENTSTORE_INDEX=/var/lib/eventstore-index
      - EVENTSTORE_LOG=/var/log/eventstore
    ports:
      - '2113:2113' # HTTP/gRPC
      - '1113:1113' # TCP (legacy)
    volumes:
      - eventstore-data:/var/lib/eventstore-data
      - eventstore-index:/var/lib/eventstore-index
      - eventstore-logs:/var/log/eventstore

  # Kafka - for event streaming (paired with EventStoreDB)
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    container_name: kafka
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      KAFKA_NUM_PARTITIONS: 6
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    ports:
      - '9092:9092'
    volumes:
      - kafka-data:/var/lib/kafka/data

volumes:
  eventstore-data:
  eventstore-index:
  eventstore-logs:
  kafka-data:

Putting It All Together: An Order System

This section walks through an e-commerce order system that integrates CQRS, Event Sourcing, and the Saga pattern.

System Architecture Overview

The overall system is composed of:

  • Order service: order aggregate (Event Sourcing) with CQRS
  • Payment service: payment processing, refund compensation
  • Inventory service: stock reservation, release compensation
  • Shipping service: shipment creation, cancellation compensation
  • Saga orchestrator: Temporal-based workflow management

Temporal-Based Saga Workflow

// TypeScript - the order saga as a Temporal workflow

import { proxyActivities, defineSignal, setHandler } from '@temporalio/workflow'

// Activity proxies
const payment = proxyActivities<PaymentActivities>({
  startToCloseTimeout: '30s',
  retry: {
    maximumAttempts: 3,
    initialInterval: '1s',
    backoffCoefficient: 2,
    maximumInterval: '30s',
  },
})

const inventory = proxyActivities<InventoryActivities>({
  startToCloseTimeout: '10s',
  retry: { maximumAttempts: 3 },
})

const shipping = proxyActivities<ShippingActivities>({
  startToCloseTimeout: '15s',
  retry: { maximumAttempts: 3 },
})

const notification = proxyActivities<NotificationActivities>({
  startToCloseTimeout: '5s',
  retry: { maximumAttempts: 5 },
})

// Signal definition (delivers messages into a running workflow)
const cancelOrderSignal = defineSignal<[string]>('cancelOrder')

// The order-processing saga workflow
export async function orderSagaWorkflow(input: OrderSagaInput): Promise<OrderSagaResult> {
  let isCancelled = false
  let cancelReason = ''

  // Handler for the cancel signal
  setHandler(cancelOrderSignal, (reason: string) => {
    isCancelled = true
    cancelReason = reason
  })

  const compensations: Array<() => Promise<void>> = []

  try {
    // Step 1: process the payment
    if (isCancelled) throw new SagaCancelledError(cancelReason)

    const paymentResult = await payment.processPayment({
      orderId: input.orderId,
      amount: input.totalAmount,
      customerId: input.customerId,
    })

    compensations.push(async () => {
      await payment.refundPayment({
        orderId: input.orderId,
        paymentId: paymentResult.paymentId,
        amount: input.totalAmount,
      })
    })

    // Step 2: reserve inventory
    if (isCancelled) throw new SagaCancelledError(cancelReason)

    await inventory.reserveInventory({
      orderId: input.orderId,
      items: input.items,
    })

    compensations.push(async () => {
      await inventory.releaseReservation({
        orderId: input.orderId,
        items: input.items,
      })
    })

    // Step 3: create the shipment
    if (isCancelled) throw new SagaCancelledError(cancelReason)

    const shipmentResult = await shipping.createShipment({
      orderId: input.orderId,
      address: input.shippingAddress,
      items: input.items,
    })

    compensations.push(async () => {
      await shipping.cancelShipment({
        orderId: input.orderId,
        shipmentId: shipmentResult.shipmentId,
      })
    })

    // All steps succeeded - send a confirmation (failure here does not affect the saga)
    await notification
      .sendOrderConfirmation({
        orderId: input.orderId,
        customerId: input.customerId,
      })
      .catch(() => {
        /* ignore notification failures */
      })

    return {
      success: true,
      orderId: input.orderId,
      paymentId: paymentResult.paymentId,
      shipmentId: shipmentResult.shipmentId,
    }
  } catch (error) {
    // Run compensating transactions (in reverse order)
    for (const compensate of compensations.reverse()) {
      try {
        await compensate()
      } catch (compError) {
        // Temporal retries the activity per its retry policy;
        // on final failure, route to a dead-letter queue
        console.error('Compensation failed:', compError)
      }
    }

    return {
      success: false,
      orderId: input.orderId,
      error: (error as Error).message,
    }
  }
}

interface OrderSagaInput {
  orderId: string
  customerId: string
  totalAmount: number
  shippingAddress: string
  items: Array<{
    productId: string
    quantity: number
    price: number
  }>
}

interface OrderSagaResult {
  success: boolean
  orderId: string
  paymentId?: string
  shipmentId?: string
  error?: string
}

class SagaCancelledError extends Error {
  constructor(reason: string) {
    super(`Saga cancelled: ${reason}`)
    this.name = 'SagaCancelledError'
  }
}

DynamoDB-Based Event-Store Schema

# AWS CloudFormation - DynamoDB tables for the event store

AWSTemplateFormatVersion: '2010-09-09'
Description: Event Store on DynamoDB

Resources:
  EventStoreTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: event-store
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: aggregateId
          AttributeType: S
        - AttributeName: version
          AttributeType: N
        - AttributeName: eventType
          AttributeType: S
        - AttributeName: timestamp
          AttributeType: S
      KeySchema:
        - AttributeName: aggregateId
          KeyType: HASH
        - AttributeName: version
          KeyType: RANGE
      GlobalSecondaryIndexes:
        - IndexName: eventType-timestamp-index
          KeySchema:
            - AttributeName: eventType
              KeyType: HASH
            - AttributeName: timestamp
              KeyType: RANGE
          Projection:
            ProjectionType: ALL
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      Tags:
        - Key: Environment
          Value: production
        - Key: Service
          Value: event-store

  SnapshotTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: event-store-snapshots
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: aggregateId
          AttributeType: S
        - AttributeName: version
          AttributeType: N
      KeySchema:
        - AttributeName: aggregateId
          KeyType: HASH
        - AttributeName: version
          KeyType: RANGE
      Tags:
        - Key: Environment
          Value: production

  SagaStateTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: saga-state
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: sagaId
          AttributeType: S
        - AttributeName: createdAt
          AttributeType: S
      KeySchema:
        - AttributeName: sagaId
          KeyType: HASH
      GlobalSecondaryIndexes:
        - IndexName: createdAt-index
          KeySchema:
            - AttributeName: createdAt
              KeyType: HASH
          Projection:
            ProjectionType: ALL
      TimeToLiveSpecification:
        AttributeName: ttl
        Enabled: true

Operational Considerations and Troubleshooting

1. Ensuring Idempotency

In distributed environments, events are delivered with at-least-once guarantees, so the same event can be processed more than once. Every event handler must therefore be idempotent.

// TypeScript - Idempotency guarantee pattern

class IdempotentEventHandler {
  constructor(
    private processedEvents: ProcessedEventStore,
    private handler: EventHandler
  ) {}

  async handle(event: DomainEvent): Promise<void> {
    // Check whether this event has already been processed
    const isProcessed = await this.processedEvents.exists(event.eventId)
    if (isProcessed) {
      console.log(`Event ${event.eventId} already processed, skipping`)
      return
    }

    try {
      // Process the event
      await this.handler.handle(event)

      // Record completion (TTL enables automatic cleanup)
      await this.processedEvents.markAsProcessed(event.eventId, {
        processedAt: new Date(),
        ttl: 60 * 60 * 24 * 7, // expires after 7 days
      })
    } catch (error) {
      // On failure, nothing is recorded, so the event can be retried
      throw error
    }
  }
}

// Redis-based duplicate check
class RedisProcessedEventStore implements ProcessedEventStore {
  constructor(private redis: Redis) {}

  async exists(eventId: string): Promise<boolean> {
    const result = await this.redis.exists(`processed:${eventId}`)
    return result === 1
  }

  async markAsProcessed(
    eventId: string,
    options: { processedAt: Date; ttl: number }
  ): Promise<void> {
    await this.redis.setex(`processed:${eventId}`, options.ttl, options.processedAt.toISOString())
  }
}

2. Guaranteeing Event Order

To guarantee event order in Kafka, events for the same aggregate must be routed to the same partition. Using the aggregate ID as the partition key is the common approach.
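
The routing rule can be sketched as a pure function. `partition_for` is a hypothetical helper, and CRC32 stands in for Kafka's actual default partitioner (murmur2) only to keep the sketch dependency-free; the point is that a fixed key always yields a fixed partition.

```python
# Python - Sketch of key-based partition routing (illustrative only)
import zlib


def partition_for(aggregate_id: str, num_partitions: int) -> int:
    """Map an aggregate ID to a fixed partition.

    The same key always hashes to the same partition, so all events
    of one aggregate are consumed in append order.
    """
    return zlib.crc32(aggregate_id.encode("utf-8")) % num_partitions


# Every event for order-12345 lands on the same partition
assert partition_for("order-12345", 12) == partition_for("order-12345", 12)
```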

3. Schema Evolution

Events are immutable, so schema changes must preserve backward compatibility. Use a schema registry with Avro or Protobuf, or apply the upcasting pattern.

// TypeScript - Event upcaster (schema evolution)

class EventUpcaster {
  private upcasters: Map<string, Map<number, (event: DomainEvent) => DomainEvent>> = new Map()

  register(
    eventType: string,
    fromVersion: number,
    upcaster: (event: DomainEvent) => DomainEvent
  ): void {
    if (!this.upcasters.has(eventType)) {
      this.upcasters.set(eventType, new Map())
    }
    this.upcasters.get(eventType)!.set(fromVersion, upcaster)
  }

  upcast(event: DomainEvent): DomainEvent {
    const typeUpcasters = this.upcasters.get(event.eventType)
    if (!typeUpcasters) return event

    let currentEvent = event
    let schemaVersion = (event.metadata as any).schemaVersion || 1

    while (typeUpcasters.has(schemaVersion)) {
      const upcasterFn = typeUpcasters.get(schemaVersion)!
      currentEvent = upcasterFn(currentEvent)
      schemaVersion++
    }

    return currentEvent
  }
}

// Usage: upcast OrderPlaced events from v1 to v2
const upcaster = new EventUpcaster()

upcaster.register('OrderPlaced', 1, (event) => {
  // In v1, shippingAddress was a plain string;
  // v2 changes it to a structured object
  const payload = event.payload as any
  return {
    ...event,
    payload: {
      ...payload,
      shippingAddress: {
        full: payload.shippingAddress,
        city: '',
        zipCode: '',
      },
    },
    metadata: {
      ...event.metadata,
      schemaVersion: 2,
    },
  }
})

4. Key Monitoring Metrics

Monitor the following metrics to assess the health of an EDA system.

Metric | Description | Alert Threshold
Consumer Lag | Consumer processing backlog (unprocessed event count) | 1,000+ events
Event Processing Latency | Time from event publication to processing | P99 above 5 seconds
Saga Completion Rate | Saga success rate | below 99%
Compensation Failure Rate | Compensating-transaction failure rate | above 0.1%
Projection Lag | Sync delay between read and write models | above 30 seconds
Dead Letter Queue Size | Count of unprocessable events | alert immediately above 0
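
As a minimal sketch of wiring these thresholds into an alerting check (metric names and values here are illustrative, not tied to any specific monitoring product):

```python
# Python - Threshold check over the metrics above (illustrative values)
THRESHOLDS = {
    "consumer_lag": 1000,          # unprocessed events
    "latency_p99_ms": 5000,        # publish-to-process latency
    "saga_completion_rate": 0.99,  # minimum acceptable rate
    "dlq_size": 0,                 # anything above 0 alerts
}


def breached_metrics(metrics: dict[str, float]) -> list[str]:
    """Return the names of metrics that crossed their alert threshold."""
    breaches = []
    if metrics.get("consumer_lag", 0) >= THRESHOLDS["consumer_lag"]:
        breaches.append("consumer_lag")
    if metrics.get("latency_p99_ms", 0) >= THRESHOLDS["latency_p99_ms"]:
        breaches.append("latency_p99_ms")
    if metrics.get("saga_completion_rate", 1.0) < THRESHOLDS["saga_completion_rate"]:
        breaches.append("saga_completion_rate")
    if metrics.get("dlq_size", 0) > THRESHOLDS["dlq_size"]:
        breaches.append("dlq_size")
    return breaches
```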

Failure Cases and Recovery Procedures

Case 1: Compensating Transaction Failure (the Most Dangerous Scenario)

Problem: payment completed, but inventory deduction failed; a compensating refund was attempted, yet the refund itself failed because the payment gateway timed out

Recovery procedure:

  1. Mark the saga state as "REQUIRES_INTERVENTION"
  2. Register the failed compensating transaction in the Dead Letter Queue
  3. Retry automatically with exponential backoff: 1s, 2s, 4s, 8s, 16s
  4. If the maximum retry count is exceeded, alert the operations team via PagerDuty/Slack
  5. An operator processes the refund manually in the payment gateway console
  6. Update the saga state to "MANUALLY_COMPENSATED"
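
Step 3 above can be sketched as a generic retry helper; the function name and signature are illustrative:

```python
# Python - Exponential backoff retry sketch (1s, 2s, 4s, 8s, 16s)
import asyncio


async def retry_with_backoff(operation, max_attempts: int = 5, base_delay: float = 1.0):
    """Retry an async operation, doubling the delay after each failure.

    Once max_attempts is exhausted, the last error propagates, at which
    point the saga is escalated for manual intervention.
    """
    for attempt in range(max_attempts):
        try:
            return await operation()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            await asyncio.sleep(base_delay * (2 ** attempt))
```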

Case 2: Event Order Inversion

Problem: due to network delay, an OrderCancelled event arrives before OrderConfirmed

Prevention and recovery:

  • Include a version field in each event to validate ordering
  • If the version differs from the expected value, buffer the event and reorder
  • With EventStoreDB, per-stream ordering is guaranteed, which prevents this problem at the source
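
The buffering-and-reordering idea can be sketched with a small per-aggregate buffer (a hypothetical helper, not a library API): events carry a monotonically increasing version, and anything ahead of the expected version is held back until the gap is filled.

```python
# Python - Version-based reorder buffer sketch (per aggregate)
class VersionReorderBuffer:
    def __init__(self, expected_version: int = 1):
        self.expected = expected_version
        self.pending: dict[int, dict] = {}

    def accept(self, event: dict) -> list[dict]:
        """Return the events that are now safe to process, in version order."""
        self.pending[event["version"]] = event
        ready = []
        while self.expected in self.pending:
            ready.append(self.pending.pop(self.expected))
            self.expected += 1
        return ready
```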

Case 3: Read Model Inconsistency from Projection Failure

Problem: the projection process crashed, so the read model no longer reflects the latest state

Recovery procedure:

  1. Check the projection process's last processed checkpoint
  2. Re-run the projection from the events after the checkpoint
  3. For severe inconsistency, drop the read model and replay the entire event stream
  4. During the replay, route read traffic to a cache or the write DB

# Python - Projection recovery script

import asyncio
from datetime import datetime


class ProjectionRecovery:
    def __init__(self, event_store, read_db, projection):
        self.event_store = event_store
        self.read_db = read_db
        self.projection = projection

    async def recover_from_checkpoint(self) -> dict:
        """Checkpoint-based projection recovery"""
        # 1. Look up the last checkpoint
        checkpoint = await self.read_db.get_checkpoint(
            self.projection.name
        )
        last_position = checkpoint.get("position", 0) if checkpoint else 0

        print(
            f"Recovering projection '{self.projection.name}' "
            f"from position {last_position}"
        )

        # 2. Load events after the checkpoint
        events = await self.event_store.read_all_from(last_position)
        processed = 0
        errors = 0

        for event in events:
            try:
                await self.projection.handle(event)
                processed += 1

                # Update the checkpoint every 100 events
                if processed % 100 == 0:
                    await self.read_db.save_checkpoint(
                        self.projection.name,
                        {"position": event["global_position"]},
                    )
            except Exception as e:
                errors += 1
                print(
                    f"Error processing event "
                    f"{event['event_id']}: {e}"
                )
                # Log the error and continue (skip & log)
                continue

        # 3. Save the final checkpoint
        if events:
            await self.read_db.save_checkpoint(
                self.projection.name,
                {"position": events[-1]["global_position"]},
            )

        return {
            "projection": self.projection.name,
            "processed": processed,
            "errors": errors,
            "recovered_at": datetime.utcnow().isoformat(),
        }

    async def full_rebuild(self) -> dict:
        """Full rebuild of the read model"""
        print(
            f"Full rebuild of projection '{self.projection.name}'"
        )

        # 1. Delete existing read model data
        await self.read_db.drop_projection_data(self.projection.name)

        # 2. Reset the checkpoint
        await self.read_db.save_checkpoint(
            self.projection.name, {"position": 0}
        )

        # 3. Replay all events
        return await self.recover_from_checkpoint()

Case 4: Event Store Disk Exhaustion

Problem: events accumulate continuously until disk capacity runs low

Prevention strategies:

  • After creating snapshots, move older events to archive storage (S3/Glacier)
  • Establish an event retention policy: hot data (30 days), warm data (1 year), cold data (permanent archive)
  • Alert at 80% disk usage (warning) and 90% (critical)
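
The retention policy above can be sketched as a simple age-based classifier; the tier boundaries are the illustrative values from the bullet list, not a fixed standard.

```python
# Python - Storage tier classification by event age (illustrative policy)
from datetime import datetime, timedelta


def storage_tier(event_time: datetime, now: datetime) -> str:
    """Classify an event as hot (<= 30 days), warm (<= 1 year), or cold."""
    age = now - event_time
    if age <= timedelta(days=30):
        return "hot"
    if age <= timedelta(days=365):
        return "warm"
    return "cold"
```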

Closing Thoughts

The three core patterns of Event-Driven Architecture -- CQRS, Event Sourcing, and Saga -- are each powerful on their own, but together they fundamentally solve the data consistency problems of microservice architecture.

The key takeaways are as follows.

  1. CQRS: Accept the asymmetric requirements of reads and writes and separate them. Read traffic, which accounts for 80-90% of the total, can then be optimized independently.

  2. Event Sourcing: Store the history of changes instead of the current state. This enables complete audit trails, time-travel debugging, and multiple read models. Snapshot and schema evolution strategies, however, must be planned from the start.

  3. Saga pattern: Manage distributed transactions with compensation. Implement simple flows with Choreography and complex business logic with Orchestration, and always prepare a recovery strategy that covers failures of the compensating transactions themselves.

  4. Event store selection: Start from EventStoreDB for pure Event Sourcing, Kafka for high-volume streaming, and DynamoDB for AWS serverless, combining them as requirements dictate.

These patterns are powerful, but they come at the cost of complexity. Rather than applying them to every service at once, a realistic strategy is to introduce them incrementally, starting with core domains where business complexity is high and data consistency matters most. Keep in mind that adopting these patterns without operational monitoring, idempotency guarantees, and a schema evolution strategy can actually harm system stability.


Event-Driven Architecture Practical Guide: CQRS, Event Sourcing, Saga Patterns

Event-Driven Architecture - CQRS, Event Sourcing, Saga

Introduction

One of the biggest challenges in microservice architecture is managing data consistency and business transactions across multiple services. In a traditional monolith, this could be solved with a single database's ACID transactions, but in distributed environments where each service has independent data stores, the limitations of 2PC (Two-Phase Commit) are clear. Performance bottlenecks, reduced availability, and strong coupling between services result.

Event-Driven Architecture (EDA) offers a fundamental solution to this problem. It transitions inter-service communication to asynchronous event-based messaging, manages state changes as event streams, and handles distributed transactions with compensation-based sagas. This article deeply analyzes the three core EDA patterns -- CQRS, Event Sourcing, and Saga -- with production code, and covers strategies and troubleshooting techniques needed for production operations.

Netflix uses these patterns for its personalization system serving 260 million subscribers, LinkedIn for processing trillions of daily events, and Slack for handling billions of daily messages. Let us explore the patterns and anti-patterns validated from the experience of these large-scale systems.

Event-Driven Architecture Fundamentals

Three Types of Events

In EDA, events are classified into three types based on their purpose.

Type | Description | Example | Characteristics
Domain Event | A fact recorded from the business domain | OrderPlaced, PaymentCompleted | Immutable, past-tense naming
Integration Event | Events for inter-service communication | OrderPlacedIntegrationEvent | Crosses bounded context boundaries
Event Notification | Change notification (minimal data) | OrderStatusChanged (ID only) | Receiver queries needed data directly

Core Principles

The core principles for correctly implementing EDA are as follows.

  1. Asynchronous Communication: Producers and consumers are temporally decoupled
  2. Loose Coupling: Services only need to know the event schema, not the implementation of other services
  3. Eventual Consistency: Instead of immediate strong consistency, consistency is guaranteed after a certain period
  4. Idempotency: Processing the same event multiple times must produce the same result

// TypeScript - Domain Event basic structure
interface DomainEvent {
  eventId: string
  eventType: string
  aggregateId: string
  aggregateType: string
  timestamp: Date
  version: number
  payload: Record<string, unknown>
  metadata: {
    correlationId: string
    causationId: string
    userId?: string
  }
}

// Order creation event example
const orderPlacedEvent: DomainEvent = {
  eventId: 'evt-550e8400-e29b-41d4-a716-446655440000',
  eventType: 'OrderPlaced',
  aggregateId: 'order-12345',
  aggregateType: 'Order',
  timestamp: new Date('2026-03-14T09:00:00Z'),
  version: 1,
  payload: {
    customerId: 'cust-67890',
    items: [
      { productId: 'prod-001', quantity: 2, price: 29900 },
      { productId: 'prod-002', quantity: 1, price: 15000 },
    ],
    totalAmount: 74800,
    shippingAddress: {
      city: 'Seoul',
      district: 'Gangnam-gu',
      detail: 'Teheran-ro 123',
    },
  },
  metadata: {
    correlationId: 'corr-abc123',
    causationId: 'cmd-place-order-001',
    userId: 'user-admin-01',
  },
}

CQRS Pattern: Separating Commands and Queries

What Is CQRS?

CQRS (Command Query Responsibility Segregation) is a pattern that separates data writes (Commands) and reads (Queries) into separate models. It extends Bertrand Meyer's CQS (Command Query Separation) principle to the architectural level, formalized by Greg Young in 2010.

In the traditional CRUD model, the same data model handles both reads and writes. However, in real business scenarios, read and write requirements differ significantly.

Aspect | Command (Write) | Query (Read)
Purpose | State change, business rule validation | Data retrieval, display
Ratio | 10-20% of total traffic | 80-90% of total traffic
Consistency | Strong consistency needed | Eventual consistency acceptable
Model Complexity | Rich domain logic | Denormalized read models
Scaling | Primarily vertical scaling | Easy horizontal scaling (cache, replicas)

CQRS Implementation: TypeScript Example

// TypeScript - CQRS Command Side

// Command definition
interface PlaceOrderCommand {
  type: 'PlaceOrder'
  customerId: string
  items: Array<{
    productId: string
    quantity: number
    price: number
  }>
  shippingAddress: string
}

// Command Handler
class PlaceOrderHandler {
  constructor(
    private orderRepository: OrderWriteRepository,
    private eventBus: EventBus,
    private inventoryService: InventoryService
  ) {}

  async handle(command: PlaceOrderCommand): Promise<string> {
    // 1. Business rule validation
    await this.inventoryService.validateStock(command.items)

    // 2. Aggregate creation
    const order = Order.create({
      customerId: command.customerId,
      items: command.items,
      shippingAddress: command.shippingAddress,
    })

    // 3. Save to write store
    await this.orderRepository.save(order)

    // 4. Publish domain events (for read model sync)
    for (const event of order.getDomainEvents()) {
      await this.eventBus.publish(event)
    }

    return order.id
  }
}

// Query Side - Read-only model
interface OrderReadModel {
  orderId: string
  customerName: string
  orderDate: string
  status: string
  totalAmount: number
  itemCount: number
  lastUpdated: string
}

// Query Handler
class GetOrdersQueryHandler {
  constructor(private readDb: ReadDatabase) {}

  async handle(query: {
    customerId: string
    status?: string
    page: number
    limit: number
  }): Promise<OrderReadModel[]> {
    // Query directly from denormalized read-only table;
    // build the parameter list conditionally so placeholders stay aligned
    const params: unknown[] = [query.customerId]
    if (query.status) params.push(query.status)
    params.push(query.limit, query.page * query.limit)

    return this.readDb.query(
      `SELECT order_id, customer_name, order_date, status,
              total_amount, item_count, last_updated
       FROM order_read_model
       WHERE customer_id = ?
       ${query.status ? 'AND status = ?' : ''}
       ORDER BY order_date DESC
       LIMIT ? OFFSET ?`,
      params
    )
  }
}

Read Model Projection

The projection logic that receives events and updates the read model is the core of CQRS.

// TypeScript - Event-based Projection

class OrderProjection {
  constructor(private readDb: ReadDatabase) {}

  async handle(event: DomainEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderPlaced':
        await this.onOrderPlaced(event)
        break
      case 'OrderShipped':
        await this.onOrderShipped(event)
        break
      case 'OrderCancelled':
        await this.onOrderCancelled(event)
        break
    }
  }

  private async onOrderPlaced(event: DomainEvent): Promise<void> {
    const payload = event.payload as {
      customerId: string
      items: Array<{ quantity: number; price: number }>
      totalAmount: number
    }

    await this.readDb.upsert('order_read_model', {
      order_id: event.aggregateId,
      customer_id: payload.customerId,
      status: 'PLACED',
      total_amount: payload.totalAmount,
      item_count: payload.items.reduce((sum, i) => sum + i.quantity, 0),
      order_date: event.timestamp,
      last_updated: event.timestamp,
      version: event.version,
    })
  }

  private async onOrderShipped(event: DomainEvent): Promise<void> {
    const payload = event.payload as { trackingNumber: string }

    // Idempotency guarantee: version check
    await this.readDb.updateWhere(
      'order_read_model',
      {
        status: 'SHIPPED',
        tracking_number: payload.trackingNumber,
        last_updated: event.timestamp,
        version: event.version,
      },
      { order_id: event.aggregateId, version: event.version - 1 }
    )
  }

  private async onOrderCancelled(event: DomainEvent): Promise<void> {
    const payload = event.payload as { reason: string }

    await this.readDb.updateWhere(
      'order_read_model',
      {
        status: 'CANCELLED',
        cancellation_reason: payload.reason,
        last_updated: event.timestamp,
        version: event.version,
      },
      { order_id: event.aggregateId }
    )
  }
}

Event Sourcing: Event-Based State Management

Core Concept of Event Sourcing

Event Sourcing is a pattern that stores all events that changed the state of an aggregate in order, instead of storing the current state. The current state is reconstructed by replaying the event stream from the beginning.

Comparing the traditional approach with Event Sourcing:

Aspect | Traditional CRUD | Event Sourcing
What is stored | Current state (latest snapshot) | Complete history of state changes
Data loss | Previous states are lost | All change history is preserved
Audit | Requires separate implementation | Built-in
Debugging | Only current state viewable | Time travel possible
Storage space | Relatively small | Grows as events accumulate
Query performance | Direct queries possible | Requires projection or snapshots

Event Sourcing Implementation

// TypeScript - Event Sourcing Aggregate

abstract class EventSourcedAggregate {
  private uncommittedEvents: DomainEvent[] = []
  protected version: number = 0

  abstract get id(): string

  // Apply event (state change)
  protected apply(event: DomainEvent): void {
    this.when(event)
    this.version++
    this.uncommittedEvents.push(event)
  }

  // Event handler (implemented by subclass)
  protected abstract when(event: DomainEvent): void

  // Restore state from event stream
  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.when(event)
      this.version++
    }
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents]
  }

  clearUncommittedEvents(): void {
    this.uncommittedEvents = []
  }
}

// Order Aggregate
class Order extends EventSourcedAggregate {
  private _id: string = ''
  private _customerId: string = ''
  private _items: OrderItem[] = []
  private _status: OrderStatus = OrderStatus.DRAFT
  private _totalAmount: number = 0

  get id(): string {
    return this._id
  }

  // Factory method (create command)
  static create(params: { orderId: string; customerId: string; items: OrderItem[] }): Order {
    const order = new Order()
    const totalAmount = params.items.reduce((sum, item) => sum + item.price * item.quantity, 0)

    order.apply({
      eventId: crypto.randomUUID(),
      eventType: 'OrderPlaced',
      aggregateId: params.orderId,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: 1,
      payload: {
        customerId: params.customerId,
        items: params.items,
        totalAmount,
      },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: 'create',
      },
    })

    return order
  }

  // Confirm order command
  confirm(): void {
    if (this._status !== OrderStatus.PLACED) {
      throw new Error(`Cannot confirm order in status: ${this._status}`)
    }

    this.apply({
      eventId: crypto.randomUUID(),
      eventType: 'OrderConfirmed',
      aggregateId: this._id,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: this.version + 1,
      payload: { confirmedAt: new Date().toISOString() },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: 'confirm',
      },
    })
  }

  // Cancel order command (used in compensating transactions)
  cancel(reason: string): void {
    if (this._status === OrderStatus.CANCELLED) {
      return // Idempotency guarantee
    }

    this.apply({
      eventId: crypto.randomUUID(),
      eventType: 'OrderCancelled',
      aggregateId: this._id,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: this.version + 1,
      payload: { reason, cancelledAt: new Date().toISOString() },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: 'cancel',
      },
    })
  }

  // Event handler - state change logic
  protected when(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderPlaced': {
        const p = event.payload as {
          customerId: string
          items: OrderItem[]
          totalAmount: number
        }
        this._id = event.aggregateId
        this._customerId = p.customerId
        this._items = p.items
        this._totalAmount = p.totalAmount
        this._status = OrderStatus.PLACED
        break
      }
      case 'OrderConfirmed':
        this._status = OrderStatus.CONFIRMED
        break
      case 'OrderCancelled':
        this._status = OrderStatus.CANCELLED
        break
    }
  }
}

enum OrderStatus {
  DRAFT = 'DRAFT',
  PLACED = 'PLACED',
  CONFIRMED = 'CONFIRMED',
  SHIPPED = 'SHIPPED',
  CANCELLED = 'CANCELLED',
}

interface OrderItem {
  productId: string
  quantity: number
  price: number
}

Snapshot Strategy

As the number of events grows, replay time increases. Snapshots cache state at specific points to improve replay performance.

# Python - Snapshot-based Event Store

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import json


@dataclass
class Snapshot:
    aggregate_id: str
    aggregate_type: str
    version: int
    state: dict
    created_at: datetime = field(default_factory=datetime.utcnow)


class EventStore:
    SNAPSHOT_INTERVAL = 100  # Create snapshot every 100 events

    def __init__(self, db_connection):
        self.db = db_connection

    async def save_events(
        self, aggregate_id: str, events: list[dict], expected_version: int
    ) -> None:
        """Save events with optimistic concurrency control"""
        async with self.db.transaction():
            # Check current version (optimistic locking)
            current_version = await self._get_current_version(aggregate_id)
            if current_version != expected_version:
                raise ConcurrencyError(
                    f"Expected version {expected_version}, "
                    f"but current version is {current_version}"
                )

            # Batch insert events
            for i, event in enumerate(events):
                version = expected_version + i + 1
                await self.db.execute(
                    """INSERT INTO event_store
                       (event_id, aggregate_id, aggregate_type,
                        event_type, version, payload, metadata, created_at)
                       VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
                    (
                        event["event_id"],
                        aggregate_id,
                        event["aggregate_type"],
                        event["event_type"],
                        version,
                        json.dumps(event["payload"]),
                        json.dumps(event["metadata"]),
                        datetime.utcnow(),
                    ),
                )

            # Check snapshot creation condition
            new_version = expected_version + len(events)
            if new_version % self.SNAPSHOT_INTERVAL == 0:
                await self._create_snapshot(aggregate_id, new_version)

    async def load_aggregate(self, aggregate_id: str) -> tuple[list[dict], Snapshot | None]:
        """Load events from snapshot to restore state"""
        # 1. Query most recent snapshot
        snapshot = await self._get_latest_snapshot(aggregate_id)

        if snapshot:
            # 2. Load only events after snapshot
            events = await self.db.fetch_all(
                """SELECT * FROM event_store
                   WHERE aggregate_id = ? AND version > ?
                   ORDER BY version ASC""",
                (aggregate_id, snapshot.version),
            )
            return events, snapshot
        else:
            # 3. Load all events if no snapshot
            events = await self.db.fetch_all(
                """SELECT * FROM event_store
                   WHERE aggregate_id = ?
                   ORDER BY version ASC""",
                (aggregate_id,),
            )
            return events, None

    async def _create_snapshot(
        self, aggregate_id: str, version: int
    ) -> None:
        """Save current state as snapshot"""
        events, _ = await self.load_aggregate(aggregate_id)
        # Rebuild aggregate and serialize state
        aggregate = self._rebuild_aggregate(events)
        await self.db.execute(
            """INSERT INTO snapshots
               (aggregate_id, aggregate_type, version, state, created_at)
               VALUES (?, ?, ?, ?, ?)""",
            (
                aggregate_id,
                aggregate.aggregate_type,
                version,
                json.dumps(aggregate.to_dict()),
                datetime.utcnow(),
            ),
        )

    async def _get_current_version(self, aggregate_id: str) -> int:
        result = await self.db.fetch_one(
            "SELECT MAX(version) FROM event_store WHERE aggregate_id = ?",
            (aggregate_id,),
        )
        return result[0] if result[0] else 0


class ConcurrencyError(Exception):
    pass

Saga Pattern: Distributed Transaction Management

What Is the Saga Pattern?

The Saga pattern manages business transactions spanning multiple services in distributed environments. Unlike traditional distributed transactions (2PC), it sequentially executes each service's local transactions and, upon failure, executes compensating transactions for already completed steps to restore consistency.

An example of an order processing Saga flow:

Normal flow:

  1. Order Service: Create order (OrderCreated)
  2. Payment Service: Process payment (PaymentProcessed)
  3. Inventory Service: Reserve inventory (InventoryReserved)
  4. Shipping Service: Create shipment (ShipmentCreated)

Compensation flow on failure (when step 3 inventory reservation fails):

  1. Payment Service: Refund payment (PaymentRefunded) -- compensation
  2. Order Service: Cancel order (OrderCancelled) -- compensation

Orchestration-based Saga Implementation

// TypeScript - Saga Orchestrator (Order Processing)

interface SagaStep {
  name: string
  action: () => Promise<void>
  compensation: () => Promise<void>
}

class OrderSagaOrchestrator {
  private completedSteps: SagaStep[] = []
  private sagaLog: SagaLogEntry[] = []

  constructor(
    private paymentService: PaymentService,
    private inventoryService: InventoryService,
    private shippingService: ShippingService,
    private sagaStore: SagaStore
  ) {}

  async execute(orderId: string, orderData: OrderData): Promise<SagaResult> {
    const sagaId = crypto.randomUUID()

    const steps: SagaStep[] = [
      {
        name: 'ProcessPayment',
        action: async () => {
          await this.paymentService.processPayment({
            orderId,
            amount: orderData.totalAmount,
            customerId: orderData.customerId,
          })
        },
        compensation: async () => {
          await this.paymentService.refundPayment({
            orderId,
            amount: orderData.totalAmount,
          })
        },
      },
      {
        name: 'ReserveInventory',
        action: async () => {
          await this.inventoryService.reserve({
            orderId,
            items: orderData.items,
          })
        },
        compensation: async () => {
          await this.inventoryService.releaseReservation({
            orderId,
            items: orderData.items,
          })
        },
      },
      {
        name: 'CreateShipment',
        action: async () => {
          await this.shippingService.createShipment({
            orderId,
            address: orderData.shippingAddress,
            items: orderData.items,
          })
        },
        compensation: async () => {
          await this.shippingService.cancelShipment({ orderId })
        },
      },
    ]

    try {
      for (const step of steps) {
        await this.logStep(sagaId, step.name, 'STARTED')

        try {
          await step.action()
          this.completedSteps.push(step)
          await this.logStep(sagaId, step.name, 'COMPLETED')
        } catch (error) {
          await this.logStep(sagaId, step.name, 'FAILED', error)
          // Execute compensating transactions
          await this.compensate(sagaId)
          return {
            success: false,
            sagaId,
            failedStep: step.name,
            error: (error as Error).message,
          }
        }
      }

      await this.sagaStore.markCompleted(sagaId)
      return { success: true, sagaId }
    } catch (compensationError) {
      // When compensation transaction also fails - manual intervention needed
      await this.sagaStore.markRequiresIntervention(sagaId)
      throw new SagaCompensationFailedError(sagaId, compensationError as Error)
    }
  }

  private async compensate(sagaId: string): Promise<void> {
    // Compensate completed steps in reverse order
    const stepsToCompensate = [...this.completedSteps].reverse()

    for (const step of stepsToCompensate) {
      try {
        await this.logStep(sagaId, step.name, 'COMPENSATING')
        await step.compensation()
        await this.logStep(sagaId, step.name, 'COMPENSATED')
      } catch (error) {
        await this.logStep(sagaId, step.name, 'COMPENSATION_FAILED', error)
        // Register in retry queue on compensation failure
        await this.sagaStore.enqueueRetry(sagaId, step.name)
        throw error
      }
    }
  }

  private async logStep(
    sagaId: string,
    stepName: string,
    status: string,
    error?: unknown
  ): Promise<void> {
    const entry: SagaLogEntry = {
      sagaId,
      stepName,
      status,
      timestamp: new Date(),
      error: error ? (error as Error).message : undefined,
    }
    this.sagaLog.push(entry)
    await this.sagaStore.appendLog(entry)
  }
}

interface SagaResult {
  success: boolean
  sagaId: string
  failedStep?: string
  error?: string
}

interface SagaLogEntry {
  sagaId: string
  stepName: string
  status: string
  timestamp: Date
  error?: string
}

Choreography vs Orchestration Comparison

A detailed comparison of the two Saga pattern implementation approaches.

Comparison | Choreography | Orchestration
Control method | Distributed - each service publishes/subscribes to events | Centralized - the orchestrator controls the flow
Coupling | Low (only event schema shared) | Medium (orchestrator must know all services)
Visibility | Low (hard to trace the flow) | High (state viewable in the orchestrator)
Complexity management | Grows rapidly as services increase | Grows linearly
SPOF | None | The orchestrator can be a SPOF
Compensation | Distributed across services | Centralized in the orchestrator
Testing | Integration testing difficult | Orchestrator unit testing easy
Suitable scale | Simple workflows with 2-4 services | Complex workflows with 5+ services
Representative tools | Kafka, RabbitMQ, SNS/SQS | Temporal, Camunda, AWS Step Functions

Choreography Pattern Code Example

# Python - Choreography-based Saga (event subscription approach)

from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
import asyncio
import json


class EventBus:
    """Simple in-memory event bus (use Kafka/RabbitMQ in production)"""

    def __init__(self):
        self._handlers: dict[str, list] = {}

    def subscribe(self, event_type: str, handler):
        if event_type not in self._handlers:
            self._handlers[event_type] = []
        self._handlers[event_type].append(handler)

    async def publish(self, event_type: str, payload: dict):
        handlers = self._handlers.get(event_type, [])
        for handler in handlers:
            await handler(payload)


# Payment Service - Choreography approach
class PaymentService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        # Subscribe to order creation event
        event_bus.subscribe("OrderCreated", self.on_order_created)
        # Subscribe to inventory failure event (for compensation)
        event_bus.subscribe("InventoryReservationFailed", self.on_inventory_failed)

    async def on_order_created(self, payload: dict):
        """Process payment on order creation"""
        try:
            order_id = payload["order_id"]
            amount = payload["total_amount"]

            # Payment processing logic
            payment_result = await self._process_payment(
                order_id, amount
            )

            # Publish success event
            await self.event_bus.publish("PaymentProcessed", {
                "order_id": order_id,
                "payment_id": payment_result["payment_id"],
                "amount": amount,
            })
        except Exception as e:
            # Publish failure event
            await self.event_bus.publish("PaymentFailed", {
                "order_id": payload["order_id"],
                "reason": str(e),
            })

    async def on_inventory_failed(self, payload: dict):
        """Refund payment on inventory failure (compensating transaction)"""
        order_id = payload["order_id"]
        await self._refund_payment(order_id)
        await self.event_bus.publish("PaymentRefunded", {
            "order_id": order_id,
        })

    async def _process_payment(self, order_id: str, amount: int) -> dict:
        # Actual payment processing logic
        return {"payment_id": f"pay-{order_id}"}

    async def _refund_payment(self, order_id: str) -> None:
        # Actual refund processing logic
        pass


# Inventory Service - Choreography approach
class InventoryService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        # Subscribe to payment completion event
        event_bus.subscribe("PaymentProcessed", self.on_payment_processed)

    async def on_payment_processed(self, payload: dict):
        """Reserve inventory on payment completion"""
        try:
            order_id = payload["order_id"]
            # Check and reserve inventory
            await self._reserve_inventory(order_id)

            await self.event_bus.publish("InventoryReserved", {
                "order_id": order_id,
            })
        except InsufficientStockError:
            await self.event_bus.publish("InventoryReservationFailed", {
                "order_id": payload["order_id"],
                "reason": "insufficient_stock",
            })

    async def _reserve_inventory(self, order_id: str) -> None:
        # Actual inventory check and reservation logic
        pass


class InsufficientStockError(Exception):
    pass
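
The event chain above can be exercised end-to-end with the in-memory EventBus. The sketch below is a condensed, self-contained version of the services above (handlers are simplified closures, and the `run_saga`/`log` names are illustrative), showing both the happy path and the compensation path triggered by an inventory failure.

```python
import asyncio


class EventBus:
    """Minimal in-memory event bus (same shape as above)."""

    def __init__(self):
        self._handlers: dict[str, list] = {}

    def subscribe(self, event_type: str, handler):
        self._handlers.setdefault(event_type, []).append(handler)

    async def publish(self, event_type: str, payload: dict):
        for handler in self._handlers.get(event_type, []):
            await handler(payload)


async def run_saga(stock_available: bool) -> list[str]:
    """Wire simplified payment/inventory handlers and run one order."""
    bus = EventBus()
    log: list[str] = []

    async def on_order_created(payload):
        log.append("PaymentProcessed")
        await bus.publish("PaymentProcessed", payload)

    async def on_payment_processed(payload):
        if stock_available:
            log.append("InventoryReserved")
            await bus.publish("InventoryReserved", payload)
        else:
            log.append("InventoryReservationFailed")
            await bus.publish("InventoryReservationFailed", payload)

    async def on_inventory_failed(payload):
        # Compensating transaction: refund the payment
        log.append("PaymentRefunded")
        await bus.publish("PaymentRefunded", payload)

    bus.subscribe("OrderCreated", on_order_created)
    bus.subscribe("PaymentProcessed", on_payment_processed)
    bus.subscribe("InventoryReservationFailed", on_inventory_failed)

    await bus.publish("OrderCreated", {"order_id": "order-1"})
    return log


print(asyncio.run(run_saga(stock_available=True)))
# ['PaymentProcessed', 'InventoryReserved']
print(asyncio.run(run_saga(stock_available=False)))
# ['PaymentProcessed', 'InventoryReservationFailed', 'PaymentRefunded']
```

Note that no service calls another service directly: the compensation fires purely because the payment handler subscribed to `InventoryReservationFailed`.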

Hybrid Approach: When to Choose Which

In practice, it is often more effective to mix the two approaches based on workflow complexity than to use pure Choreography or pure Orchestration alone.

  • Choose Choreography: Simple notifications, cache invalidation, log collection with 2-3 participating services
  • Choose Orchestration: Core business flows like payment-inventory-shipping where order and compensation matter
  • Hybrid: Core business flows via Orchestration, side effects (email, notifications, analytics events) via Choreography

Event Store Selection (EventStoreDB, Kafka, DynamoDB)

Event store selection varies based on system requirements and team capabilities. Here is a comparison of the three main options.

| Comparison | EventStoreDB | Apache Kafka | DynamoDB Streams |
|---|---|---|---|
| Design purpose | Dedicated Event Sourcing DB | Distributed message streaming platform | General NoSQL + change streams |
| Stream model | Fine-grained individual streams (per Aggregate) | Topic-partition based | Table + DynamoDB Streams/Kinesis |
| Concurrency | ExpectedVersion (optimistic locking) built-in | Partition-level ordering only | Conditional writes (ConditionExpression) |
| ID lookup | Instant lookup by stream ID | Cannot look up a specific entity in a topic | Direct lookup by partition key |
| Event ordering | Fully guaranteed within a stream | Guaranteed only within a partition | Guaranteed within a partition key |
| Projections | Server-side projections built-in | Kafka Streams / ksqlDB | Lambda + DynamoDB Streams |
| Subscription | Persistent / Catch-up subscriptions | Consumer Group | DynamoDB Streams / Kinesis |
| Ops complexity | Medium (dedicated cluster) | High (ZooKeeper/KRaft, partition mgmt) | Low (serverless, AWS managed) |
| Cost model | Open source + commercial cloud | Open source + MSK/Confluent Cloud | Pay-per-request/storage |
| Best fit | Pure Event Sourcing systems | High-volume event streaming + integration | AWS native, serverless architecture |

Selection Guide

When EventStoreDB is suitable:

  • Event Sourcing is the core architectural pattern
  • Fine-grained stream management and projections are needed
  • Team actively uses DDD-based design

When Kafka is suitable:

  • High-volume event streaming is the main purpose
  • Kafka infrastructure already exists and team expertise is sufficient
  • Both Event Sourcing and event streaming are needed (consider EventStoreDB + Kafka combination)

When DynamoDB is suitable:

  • Primarily using the AWS ecosystem
  • Targeting serverless architecture
  • Minimizing operational burden

EventStoreDB Configuration Example

# docker-compose.yml - EventStoreDB cluster setup
version: '3.8'
services:
  eventstoredb:
    image: eventstore/eventstore:24.2
    container_name: eventstoredb
    environment:
      - EVENTSTORE_CLUSTER_SIZE=1
      - EVENTSTORE_RUN_PROJECTIONS=All
      - EVENTSTORE_START_STANDARD_PROJECTIONS=true
      - EVENTSTORE_INSECURE=true
      - EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=true
      - EVENTSTORE_MEM_DB=false
      - EVENTSTORE_DB=/var/lib/eventstore-data
      - EVENTSTORE_INDEX=/var/lib/eventstore-index
      - EVENTSTORE_LOG=/var/log/eventstore
    ports:
      - '2113:2113' # HTTP/gRPC
      - '1113:1113' # TCP (legacy)
    volumes:
      - eventstore-data:/var/lib/eventstore-data
      - eventstore-index:/var/lib/eventstore-index
      - eventstore-logs:/var/log/eventstore

  # Kafka - for event streaming (combined with EventStoreDB)
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    container_name: kafka
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      KAFKA_NUM_PARTITIONS: 6
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    ports:
      - '9092:9092'
    volumes:
      - kafka-data:/var/lib/kafka/data

volumes:
  eventstore-data:
  eventstore-index:
  eventstore-logs:
  kafka-data:

Production Implementation: Order System Example

Let us examine a production implementation that integrates CQRS, Event Sourcing, and Saga patterns through an e-commerce order system.

System Architecture Overview

The overall system consists of the following structure:

  • Order Service: Order Aggregate (Event Sourcing), CQRS applied
  • Payment Service: Payment processing, refund compensating transaction
  • Inventory Service: Inventory reservation, release compensating transaction
  • Shipping Service: Shipment creation, cancellation compensating transaction
  • Saga Orchestrator: Temporal-based workflow management

Temporal-based Saga Workflow

// TypeScript - Order Saga implemented with Temporal Workflow

import { proxyActivities, defineSignal, setHandler, condition, sleep } from '@temporalio/workflow'

// Activity proxies
const payment = proxyActivities<PaymentActivities>({
  startToCloseTimeout: '30s',
  retry: {
    maximumAttempts: 3,
    initialInterval: '1s',
    backoffCoefficient: 2,
    maximumInterval: '30s',
  },
})

const inventory = proxyActivities<InventoryActivities>({
  startToCloseTimeout: '10s',
  retry: { maximumAttempts: 3 },
})

const shipping = proxyActivities<ShippingActivities>({
  startToCloseTimeout: '15s',
  retry: { maximumAttempts: 3 },
})

const notification = proxyActivities<NotificationActivities>({
  startToCloseTimeout: '5s',
  retry: { maximumAttempts: 5 },
})

// Signal definition (send messages to workflow from outside)
const cancelOrderSignal = defineSignal<[string]>('cancelOrder')

// Order Processing Saga Workflow
export async function orderSagaWorkflow(input: OrderSagaInput): Promise<OrderSagaResult> {
  let isCancelled = false
  let cancelReason = ''

  // Cancel signal handler
  setHandler(cancelOrderSignal, (reason: string) => {
    isCancelled = true
    cancelReason = reason
  })

  const compensations: Array<() => Promise<void>> = []

  try {
    // Step 1: Process payment
    if (isCancelled) throw new SagaCancelledError(cancelReason)

    const paymentResult = await payment.processPayment({
      orderId: input.orderId,
      amount: input.totalAmount,
      customerId: input.customerId,
    })

    compensations.push(async () => {
      await payment.refundPayment({
        orderId: input.orderId,
        paymentId: paymentResult.paymentId,
        amount: input.totalAmount,
      })
    })

    // Step 2: Reserve inventory
    if (isCancelled) throw new SagaCancelledError(cancelReason)

    await inventory.reserveInventory({
      orderId: input.orderId,
      items: input.items,
    })

    compensations.push(async () => {
      await inventory.releaseReservation({
        orderId: input.orderId,
        items: input.items,
      })
    })

    // Step 3: Create shipment
    if (isCancelled) throw new SagaCancelledError(cancelReason)

    const shipmentResult = await shipping.createShipment({
      orderId: input.orderId,
      address: input.shippingAddress,
      items: input.items,
    })

    compensations.push(async () => {
      await shipping.cancelShipment({
        orderId: input.orderId,
        shipmentId: shipmentResult.shipmentId,
      })
    })

    // All steps succeeded - send confirmation notification (a notification failure must not fail the Saga)
    await notification
      .sendOrderConfirmation({
        orderId: input.orderId,
        customerId: input.customerId,
      })
      .catch(() => {
        /* Notification failure is ignored */
      })

    return {
      success: true,
      orderId: input.orderId,
      paymentId: paymentResult.paymentId,
      shipmentId: shipmentResult.shipmentId,
    }
  } catch (error) {
    // Execute compensating transactions (reverse order)
    for (const compensate of compensations.reverse()) {
      try {
        await compensate()
      } catch (compError) {
        // Temporal automatically manages retries
        // Final failure sends to Dead Letter Queue
        console.error('Compensation failed:', compError)
      }
    }

    return {
      success: false,
      orderId: input.orderId,
      error: (error as Error).message,
    }
  }
}

interface OrderSagaInput {
  orderId: string
  customerId: string
  totalAmount: number
  shippingAddress: string
  items: Array<{
    productId: string
    quantity: number
    price: number
  }>
}

interface OrderSagaResult {
  success: boolean
  orderId: string
  paymentId?: string
  shipmentId?: string
  error?: string
}

class SagaCancelledError extends Error {
  constructor(reason: string) {
    super(`Saga cancelled: ${reason}`)
    this.name = 'SagaCancelledError'
  }
}

DynamoDB-based Event Store Schema

# AWS CloudFormation - DynamoDB Event Store Table

AWSTemplateFormatVersion: '2010-09-09'
Description: Event Store on DynamoDB

Resources:
  EventStoreTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: event-store
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: aggregateId
          AttributeType: S
        - AttributeName: version
          AttributeType: N
        - AttributeName: eventType
          AttributeType: S
        - AttributeName: timestamp
          AttributeType: S
      KeySchema:
        - AttributeName: aggregateId
          KeyType: HASH
        - AttributeName: version
          KeyType: RANGE
      GlobalSecondaryIndexes:
        - IndexName: eventType-timestamp-index
          KeySchema:
            - AttributeName: eventType
              KeyType: HASH
            - AttributeName: timestamp
              KeyType: RANGE
          Projection:
            ProjectionType: ALL
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      Tags:
        - Key: Environment
          Value: production
        - Key: Service
          Value: event-store

  SnapshotTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: event-store-snapshots
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: aggregateId
          AttributeType: S
        - AttributeName: version
          AttributeType: N
      KeySchema:
        - AttributeName: aggregateId
          KeyType: HASH
        - AttributeName: version
          KeyType: RANGE
      Tags:
        - Key: Environment
          Value: production

  SagaStateTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: saga-state
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: sagaId
          AttributeType: S
        - AttributeName: createdAt
          AttributeType: S
      KeySchema:
        - AttributeName: sagaId
          KeyType: HASH
      GlobalSecondaryIndexes:
        - IndexName: createdAt-index
          KeySchema:
            - AttributeName: createdAt
              KeyType: HASH
          Projection:
            ProjectionType: ALL
      TimeToLiveSpecification:
        AttributeName: ttl
        Enabled: true

Operational Considerations and Troubleshooting

1. Ensuring Idempotency

In distributed environments, events have "at-least-once" delivery guarantees, so the same event may be processed multiple times. All event handlers must guarantee idempotency.

// TypeScript - Idempotency guarantee pattern

class IdempotentEventHandler {
  constructor(
    private processedEvents: ProcessedEventStore,
    private handler: EventHandler
  ) {}

  async handle(event: DomainEvent): Promise<void> {
    // Check if event was already processed
    const isProcessed = await this.processedEvents.exists(event.eventId)
    if (isProcessed) {
      console.log(`Event ${event.eventId} already processed, skipping`)
      return
    }

    try {
      // Process event
      await this.handler.handle(event)

      // Record completion (auto-cleanup with TTL)
      await this.processedEvents.markAsProcessed(event.eventId, {
        processedAt: new Date(),
        ttl: 60 * 60 * 24 * 7, // Expire after 7 days
      })
    } catch (error) {
      // Not recorded on failure so retry is possible
      throw error
    }
  }
}

// Redis-based duplicate detection
class RedisProcessedEventStore implements ProcessedEventStore {
  constructor(private redis: Redis) {}

  async exists(eventId: string): Promise<boolean> {
    const result = await this.redis.exists(`processed:${eventId}`)
    return result === 1
  }

  async markAsProcessed(
    eventId: string,
    options: { processedAt: Date; ttl: number }
  ): Promise<void> {
    await this.redis.setex(`processed:${eventId}`, options.ttl, options.processedAt.toISOString())
  }
}

2. Ensuring Event Ordering

To guarantee event ordering in Kafka, events from the same Aggregate must be routed to the same partition. Using the Aggregate ID as the partition key is standard practice.
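
Kafka's default partitioner hashes the message key (murmur2) modulo the partition count, so any stable hash illustrates the property that matters here: every event for one Aggregate lands on one partition, and therefore arrives in order. A sketch (MD5 is used purely for illustration, and `partition_for` is a made-up helper, not Kafka's actual algorithm):

```python
import hashlib


def partition_for(aggregate_id: str, num_partitions: int) -> int:
    """Deterministically map an aggregate ID to a partition.

    Any stable hash gives the guarantee that matters:
    one aggregate -> one partition -> total order for that aggregate.
    """
    digest = hashlib.md5(aggregate_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions


# All events for the same order route to the same partition
assert partition_for("order-12345", 6) == partition_for("order-12345", 6)

# With a Kafka client this is equivalent to passing the aggregate ID as the key:
# producer.send("order-events", key=b"order-12345", value=event_bytes)
```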

3. Schema Evolution

Since events are immutable, schema changes must maintain backward compatibility. Use a schema registry like Avro or Protobuf, or apply the upcasting pattern.

// TypeScript - Event Upcaster (Schema Evolution)

class EventUpcaster {
  private upcasters: Map<string, Map<number, (event: DomainEvent) => DomainEvent>> = new Map()

  register(
    eventType: string,
    fromVersion: number,
    upcaster: (event: DomainEvent) => DomainEvent
  ): void {
    if (!this.upcasters.has(eventType)) {
      this.upcasters.set(eventType, new Map())
    }
    this.upcasters.get(eventType)!.set(fromVersion, upcaster)
  }

  upcast(event: DomainEvent): DomainEvent {
    const typeUpcasters = this.upcasters.get(event.eventType)
    if (!typeUpcasters) return event

    let currentEvent = event
    let schemaVersion = (event.metadata as any).schemaVersion || 1

    while (typeUpcasters.has(schemaVersion)) {
      const upcasterFn = typeUpcasters.get(schemaVersion)!
      currentEvent = upcasterFn(currentEvent)
      schemaVersion++
    }

    return currentEvent
  }
}

// Usage example: OrderPlaced event v1 -> v2 upcasting
const upcaster = new EventUpcaster()

upcaster.register('OrderPlaced', 1, (event) => {
  // v1 had shippingAddress as a string
  // v2 changed to a structured object
  const payload = event.payload as any
  return {
    ...event,
    payload: {
      ...payload,
      shippingAddress: {
        full: payload.shippingAddress,
        city: '',
        zipCode: '',
      },
    },
    metadata: {
      ...event.metadata,
      schemaVersion: 2,
    },
  }
})

4. Key Monitoring Metrics

The following metrics should be monitored to check the health of an EDA system.

| Metric | Description | Alert Threshold |
|---|---|---|
| Consumer Lag | Consumer processing delay (unprocessed events) | Over 1,000 events |
| Event Processing Latency | Time from event publication to processing | P99 over 5 seconds |
| Saga Completion Rate | Saga success rate | Below 99% |
| Compensation Failure Rate | Compensating transaction failure rate | Over 0.1% |
| Projection Lag | Sync delay between read and write models | Over 30 seconds |
| Dead Letter Queue Size | Number of unprocessable events | Immediate alert when > 0 |
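
Consumer Lag, the first metric above, is simply the gap between the latest offset in each partition and the consumer group's committed offset, summed over partitions. A minimal sketch (the offset values are illustrative; in practice they come from the broker, e.g. via `kafka-consumer-groups.sh` or the admin API):

```python
def total_consumer_lag(end_offsets: dict[int, int],
                       committed_offsets: dict[int, int]) -> int:
    """Sum of (log end offset - committed offset) over all partitions."""
    return sum(
        max(0, end_offsets[p] - committed_offsets.get(p, 0))
        for p in end_offsets
    )


# Illustrative offsets for a 3-partition topic
end = {0: 10_500, 1: 9_800, 2: 11_200}
committed = {0: 10_450, 1: 9_800, 2: 10_900}

print(total_consumer_lag(end, committed))  # 350 -> below the 1,000-event threshold
```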

Failure Cases and Recovery Procedures

Case 1: Compensation Transaction Failure (Most Dangerous Scenario)

Problem: Payment completed but inventory reservation failed. The compensation (refund) was attempted, but the refund itself also failed due to a payment gateway timeout.

Recovery procedure:

  1. Mark Saga state as "REQUIRES_INTERVENTION"
  2. Register failed compensation transaction in Dead Letter Queue
  3. Automatic retry (exponential backoff): 1s, 2s, 4s, 8s, 16s
  4. Alert ops team via PagerDuty/Slack when max retries exceeded
  5. Operator manually processes refund via payment gateway console
  6. Update Saga state to "MANUALLY_COMPENSATED"
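
Step 3's retry schedule (1s, 2s, 4s, 8s, 16s) is plain exponential backoff with a 1s initial interval and factor 2. A sketch of a retry wrapper under those parameters (the function names and the `send_to_dlq`/`alert_ops` hooks are illustrative):

```python
import asyncio


def backoff_delays(initial: float = 1.0, factor: float = 2.0,
                   max_attempts: int = 5) -> list[float]:
    """Delays before each retry: 1s, 2s, 4s, 8s, 16s for the defaults."""
    return [initial * factor**i for i in range(max_attempts)]


async def retry_compensation(compensate, send_to_dlq, alert_ops) -> bool:
    """Retry a compensating transaction with exponential backoff.

    On final failure, park it in the DLQ and page the on-call operator
    (steps 2 and 4 of the recovery procedure above).
    """
    last_error: Exception | None = None
    for delay in backoff_delays():
        try:
            await compensate()
            return True
        except Exception as error:
            last_error = error
            await asyncio.sleep(delay)
    await send_to_dlq(last_error)
    await alert_ops(last_error)
    return False


print(backoff_delays())  # [1.0, 2.0, 4.0, 8.0, 16.0]
```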

Case 2: Event Order Reversal

Problem: Due to network delay, OrderCancelled event arrives before OrderConfirmed

Prevention and recovery:

  • Include version field in events for order verification
  • Buffer and re-sort when version does not match expectations
  • Using EventStoreDB prevents this issue entirely since ordering within streams is guaranteed
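
The buffer-and-resort prevention above can be sketched as a per-aggregate resequencer: apply an event only when its version is exactly one past the last applied version, otherwise hold it until the gap fills. (The class and its API are illustrative.)

```python
class EventResequencer:
    """Buffer out-of-order events and release them in version order."""

    def __init__(self):
        self._last_applied: dict[str, int] = {}         # aggregateId -> last version
        self._pending: dict[str, dict[int, dict]] = {}  # buffered events by version

    def accept(self, aggregate_id: str, version: int, event: dict) -> list[dict]:
        """Return the events that are now safe to apply, in order."""
        self._pending.setdefault(aggregate_id, {})[version] = event
        ready: list[dict] = []
        buffered = self._pending[aggregate_id]
        next_version = self._last_applied.get(aggregate_id, 0) + 1
        # Drain consecutive versions; stop at the first gap
        while next_version in buffered:
            ready.append(buffered.pop(next_version))
            self._last_applied[aggregate_id] = next_version
            next_version += 1
        return ready


seq = EventResequencer()
# OrderCancelled (v3) arrives before OrderConfirmed (v2): it is held
print(seq.accept("order-1", 1, {"type": "OrderCreated"}))    # applies v1
print(seq.accept("order-1", 3, {"type": "OrderCancelled"}))  # [] - buffered
print(seq.accept("order-1", 2, {"type": "OrderConfirmed"}))  # applies v2, then v3
```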

Case 3: Read Model Inconsistency Due to Projection Failure

Problem: Projection process crashes and the read model does not reflect the latest state

Recovery procedure:

  1. Check the projection process's last checkpoint
  2. Re-run projection from events after the checkpoint
  3. For severe inconsistency, drop the read model and replay all events
  4. During replay, redirect read traffic to cache or write DB

# Python - Projection recovery script

import asyncio
from datetime import datetime


class ProjectionRecovery:
    def __init__(self, event_store, read_db, projection):
        self.event_store = event_store
        self.read_db = read_db
        self.projection = projection

    async def recover_from_checkpoint(self) -> dict:
        """Checkpoint-based projection recovery"""
        # 1. Check last checkpoint
        checkpoint = await self.read_db.get_checkpoint(
            self.projection.name
        )
        last_position = checkpoint.get("position", 0) if checkpoint else 0

        print(
            f"Recovering projection '{self.projection.name}' "
            f"from position {last_position}"
        )

        # 2. Load events after checkpoint
        events = await self.event_store.read_all_from(last_position)
        processed = 0
        errors = 0

        for event in events:
            try:
                await self.projection.handle(event)
                processed += 1

                # Update checkpoint every 100 events
                if processed % 100 == 0:
                    await self.read_db.save_checkpoint(
                        self.projection.name,
                        {"position": event["global_position"]},
                    )
            except Exception as e:
                errors += 1
                print(
                    f"Error processing event "
                    f"{event['event_id']}: {e}"
                )
                # Continue after logging error (skip & log)
                continue

        # 3. Save final checkpoint
        if events:
            await self.read_db.save_checkpoint(
                self.projection.name,
                {"position": events[-1]["global_position"]},
            )

        return {
            "projection": self.projection.name,
            "processed": processed,
            "errors": errors,
            "recovered_at": datetime.utcnow().isoformat(),
        }

    async def full_rebuild(self) -> dict:
        """Full rebuild of read model"""
        print(
            f"Full rebuild of projection '{self.projection.name}'"
        )

        # 1. Delete existing read model
        await self.read_db.drop_projection_data(self.projection.name)

        # 2. Reset checkpoint
        await self.read_db.save_checkpoint(
            self.projection.name, {"position": 0}
        )

        # 3. Replay all events
        return await self.recover_from_checkpoint()

Case 4: Event Store Disk Shortage

Problem: Events accumulate continuously and disk capacity becomes insufficient

Prevention strategy:

  • After creating snapshots, move old events to archive (S3/Glacier)
  • Establish event retention policy: hot data (30 days), warm data (1 year), cold data (permanent archive)
  • Set disk usage alerts at 80% warning, 90% critical

Conclusion

The three core patterns of Event-Driven Architecture -- CQRS, Event Sourcing, and Saga -- are each powerful independently but fundamentally solve data consistency problems in microservice architecture when used together.

Key takeaways:

  1. CQRS: Acknowledge and separate the asymmetric requirements of reads and writes. Read traffic, which often accounts for 80-90% of total traffic, can then be optimized independently.

  2. Event Sourcing: Store change history instead of current state. This enables complete audit trails, time-travel debugging, and diverse read model generation. However, snapshot strategies and schema evolution strategies must be considered from the start.

  3. Saga Pattern: Manage distributed transactions with compensation-based approaches. Implement simple flows with Choreography and complex business logic with Orchestration, but always prepare recovery strategies for compensation transaction failures.

  4. Event Store Selection: Consider EventStoreDB for pure Event Sourcing, Kafka for high-volume streaming, and DynamoDB for AWS serverless, with combinations possible based on requirements.

These patterns are powerful but come with the cost of complexity. Rather than applying them uniformly to all services, the realistic strategy is to introduce them incrementally starting with core domains where business complexity is high and data consistency is critical. Adopting these patterns without operational monitoring, idempotency guarantees, and schema evolution strategies can actually harm system stability.

References