
Event Sourcing + CQRS Architecture Implementation Guide: From Event Store to Projections and Saga Patterns

Event Sourcing CQRS

Introduction

Traditional CRUD systems store only the current state of data. When an UPDATE executes, the previous value is lost; when a DELETE executes, the data itself is destroyed. You can see that a bank account balance is 1,000,000 won, but not what sequence of deposits and withdrawals produced that balance.

Event Sourcing solves this problem at its root. It records each state change as an event, and the current state is reconstructed by replaying those events in order. CQRS builds on this structure by fully separating writes (Commands) from reads (Queries), optimizing each for its own requirements.

This guide walks through the complete implementation of an architecture combining Event Sourcing and CQRS: event store design, Aggregate implementation, building projections, managing distributed transactions with the Saga pattern, snapshot optimization, and real-world failure scenarios with recovery procedures, all with working code.

1. Event Sourcing Core Principles

1.1 What Is an Event?

In Event Sourcing, an event is an immutable fact that occurred in the past. Events are named in the past tense and must clearly convey business intent.

Compare poorly designed events with well-designed ones.

  • Bad examples: NameChanged, EmailChanged - property-level events carry no business meaning
  • Good examples: CustomerRelocated, OrderConfirmed - events that express domain behavior

1.2 How State Reconstruction Works

The current state is computed by sequentially applying (replaying) all events to an initial state.

Current State = fold(Initial State, [Event1, Event2, ..., EventN])

For example, reconstructing a bank account's state proceeds as follows.

Initial balance: 0
  -> AccountOpened(amount: 0)                => Balance: 0
  -> MoneyDeposited(amount: 1,000,000 won)   => Balance: 1,000,000 won
  -> MoneyWithdrawn(amount: 300,000 won)     => Balance: 700,000 won
  -> MoneyDeposited(amount: 500,000 won)     => Balance: 1,200,000 won
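Expressed in code, this fold is a single reduce over the event list. A minimal sketch with a simplified event shape (illustrative, not the article's full event types):

```typescript
type AccountEvent =
  | { type: 'AccountOpened'; amount: number }
  | { type: 'MoneyDeposited'; amount: number }
  | { type: 'MoneyWithdrawn'; amount: number }

// current state = fold(initial state, events)
function replayBalance(events: AccountEvent[]): number {
  return events.reduce((balance, event) => {
    switch (event.type) {
      case 'AccountOpened':
        return event.amount
      case 'MoneyDeposited':
        return balance + event.amount
      case 'MoneyWithdrawn':
        return balance - event.amount
    }
  }, 0)
}

// Example: open, deposit 1000, withdraw 300, deposit 500 -> 1200
replayBalance([
  { type: 'AccountOpened', amount: 0 },
  { type: 'MoneyDeposited', amount: 1000 },
  { type: 'MoneyWithdrawn', amount: 300 },
  { type: 'MoneyDeposited', amount: 500 },
])
```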

1.3 The Event Immutability Principle

Once stored, an event can never be modified or deleted. If an incorrect event was published, a compensating event must be issued to logically cancel it.

// When an incorrect withdrawal occurred - do not delete the event
// Instead, publish a compensating event
interface MoneyWithdrawnCompensated {
  type: 'MoneyWithdrawnCompensated'
  originalEventId: string
  amount: number
  reason: string
  timestamp: Date
}

2. The CQRS Architecture Pattern

2.1 Separating Commands and Queries

The core idea of CQRS is simple: separate the model that modifies data from the model that reads it.

[Client]
    |
    |-- Command(write) --> [Command Handler] --> [Write Model / Event Store]
    |                                                     |
    |                                              Events published
    |                                                     |
    |                                                     v
    |                                           [Projection Engine]
    |                                                     |
    |                                                     v
    +-- Query(read) ----> [Query Handler] ----> [Read Model / View DB]

2.2 Why Separate?

Read and write requirements are fundamentally different.

  • Write (Command): requires business rule validation, domain invariant enforcement, and transactional consistency
  • Read (Query): requires diverse view support, low latency, and high throughput

Trying to satisfy both sides with a single model forces compromises on both. CQRS enables independent optimization of each model.

2.3 Command Handler Implementation

// Command definition
interface CreateOrderCommand {
  orderId: string
  customerId: string
  items: Array<{ productId: string; quantity: number; unitPrice: number }>
}

// Command Handler
class OrderCommandHandler {
  constructor(private readonly orderRepository: EventSourcedRepository<Order>) {}

  async handle(command: CreateOrderCommand): Promise<void> {
    // 1. Create the Aggregate (produces an OrderCreated event)
    const order = Order.create(command.orderId, command.customerId, command.items)

    // 2. Persist the new events (the order ID doubles as the stream ID)
    await this.orderRepository.save(order, command.orderId)
  }
}

3. Event Store Design and Implementation

3.1 Event Store Schema

The core table structure of an event store is as follows.

CREATE TABLE events (
    -- Global unique identifier
    event_id        UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    -- Stream (Aggregate) identifier
    stream_id       VARCHAR(255) NOT NULL,
    -- Event sequence within the stream (used for optimistic concurrency)
    stream_version  BIGINT NOT NULL,
    -- Event type (used for deserialization)
    event_type      VARCHAR(255) NOT NULL,
    -- Event payload (JSON)
    event_data      JSONB NOT NULL,
    -- Metadata (correlation ID, user info, etc.)
    metadata        JSONB DEFAULT '{}',
    -- Event timestamp
    created_at      TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    -- Global ordering (used by projections to track overall sequence)
    global_position BIGSERIAL NOT NULL,

    -- Prevent version duplication within the same stream (optimistic concurrency)
    UNIQUE(stream_id, stream_version)
);

-- Index for stream-level event retrieval
CREATE INDEX idx_events_stream ON events(stream_id, stream_version);
-- Index for global event ordering (projections)
CREATE INDEX idx_events_global ON events(global_position);
-- Index for event type lookup
CREATE INDEX idx_events_type ON events(event_type);

3.2 TypeScript Event Store Implementation

import { Pool } from 'pg'

interface DomainEvent {
  eventType: string
  data: Record<string, unknown>
  metadata?: Record<string, unknown>
}

interface StoredEvent extends DomainEvent {
  eventId: string
  streamId: string
  streamVersion: number
  globalPosition: number
  createdAt: Date
}

class ConcurrencyError extends Error {}

class PostgresEventStore {
  constructor(private readonly pool: Pool) {}

  /**
   * Appends events to a stream.
   * Uses expectedVersion for optimistic concurrency control.
   */
  async appendToStream(
    streamId: string,
    expectedVersion: number,
    events: DomainEvent[]
  ): Promise<StoredEvent[]> {
    const client = await this.pool.connect()
    try {
      await client.query('BEGIN')

      // Optimistic concurrency check
      const versionCheck = await client.query(
        'SELECT MAX(stream_version) as current_version FROM events WHERE stream_id = $1',
        [streamId]
      )
      // MAX() is NULL for a new stream; note that pg returns BIGINT values as strings
      const rawVersion = versionCheck.rows[0].current_version
      const currentVersion = rawVersion === null ? -1 : Number(rawVersion)

      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, but current version is ${currentVersion}`
        )
      }

      const storedEvents: StoredEvent[] = []
      for (let i = 0; i < events.length; i++) {
        const event = events[i]
        const version = expectedVersion + i + 1

        const result = await client.query(
          `INSERT INTO events (stream_id, stream_version, event_type, event_data, metadata)
           VALUES ($1, $2, $3, $4, $5)
           RETURNING event_id, global_position, created_at`,
          [
            streamId,
            version,
            event.eventType,
            JSON.stringify(event.data),
            JSON.stringify(event.metadata ?? {}),
          ]
        )

        storedEvents.push({
          eventId: result.rows[0].event_id,
          streamId,
          streamVersion: version,
          globalPosition: result.rows[0].global_position,
          createdAt: result.rows[0].created_at,
          ...event,
        })
      }

      await client.query('COMMIT')
      return storedEvents
    } catch (error) {
      await client.query('ROLLBACK')
      throw error
    } finally {
      client.release()
    }
  }

  /**
   * Reads all events from a stream in order.
   */
  async readStream(streamId: string, fromVersion = 0): Promise<StoredEvent[]> {
    const result = await this.pool.query(
      `SELECT * FROM events
       WHERE stream_id = $1 AND stream_version >= $2
       ORDER BY stream_version ASC`,
      [streamId, fromVersion]
    )
    return result.rows.map(this.mapToStoredEvent)
  }

  /**
   * Reads all events in global order (for projections).
   */
  async readAll(fromPosition = 0, limit = 1000): Promise<StoredEvent[]> {
    const result = await this.pool.query(
      `SELECT * FROM events
       WHERE global_position > $1
       ORDER BY global_position ASC
       LIMIT $2`,
      [fromPosition, limit]
    )
    return result.rows.map(this.mapToStoredEvent)
  }

  private mapToStoredEvent(row: any): StoredEvent {
    return {
      eventId: row.event_id,
      streamId: row.stream_id,
      // pg returns BIGINT/BIGSERIAL columns as strings; convert to numbers
      streamVersion: Number(row.stream_version),
      globalPosition: Number(row.global_position),
      eventType: row.event_type,
      data: row.event_data,
      metadata: row.metadata,
      createdAt: row.created_at,
    }
  }
}

3.3 전용 Event Store 솔루션 비교

항목EventStoreDB (Kurrent)Axon ServerPostgreSQL 직접 구현
타입전용 이벤트 DBCQRS 프레임워크 서버범용 RDBMS
프로젝션서버 사이드 JavaScriptTracking Processor직접 구현 필요
구독 모델Catch-up, PersistentTracking, SubscribingPolling, LISTEN/NOTIFY
클러스터링내장 (Gossip 프로토콜)Axon Server Enterprise외부 솔루션 필요
학습 곡선중간높음 (프레임워크 전체)낮음 (SQL 기반)
유연성중간낮음 (Axon 생태계 종속)높음
운영 복잡도중간높음낮음~중간

4. Aggregate and Domain Event Implementation

4.1 Aggregate Base Class

In Event Sourcing, the Aggregate is the core unit that publishes events and reconstructs its state by replaying them.

abstract class EventSourcedAggregate {
  private uncommittedEvents: DomainEvent[] = []
  private _version: number = -1

  get version(): number {
    return this._version
  }

  /**
   * Called by the aggregate's command methods to record a state change.
   * Internally applies the event and queues it as uncommitted.
   */
  protected apply(event: DomainEvent): void {
    this.when(event)
    this.uncommittedEvents.push(event)
    this._version++
  }

  /**
   * Handler that mutates internal state based on the event.
   * Implemented by subclasses.
   */
  protected abstract when(event: DomainEvent): void

  /**
   * Restores state by replaying stored events.
   */
  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.when(event)
      this._version++
    }
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents]
  }

  clearUncommittedEvents(): void {
    this.uncommittedEvents = []
  }
}

4.2 Order Aggregate Implementation

// Domain Events
interface OrderCreated {
  eventType: 'OrderCreated'
  data: {
    orderId: string
    customerId: string
    items: Array<{ productId: string; quantity: number; unitPrice: number }>
    totalAmount: number
  }
}

interface OrderConfirmed {
  eventType: 'OrderConfirmed'
  data: {
    orderId: string
    confirmedAt: string
  }
}

interface OrderCancelled {
  eventType: 'OrderCancelled'
  data: {
    orderId: string
    reason: string
    cancelledAt: string
  }
}

type OrderEvent = OrderCreated | OrderConfirmed | OrderCancelled

// Order Aggregate
class Order extends EventSourcedAggregate {
  private orderId!: string
  private customerId!: string
  private items: Array<{ productId: string; quantity: number; unitPrice: number }> = []
  private totalAmount: number = 0
  private status: 'CREATED' | 'CONFIRMED' | 'CANCELLED' = 'CREATED'

  static create(
    orderId: string,
    customerId: string,
    items: Array<{ productId: string; quantity: number; unitPrice: number }>
  ): Order {
    const order = new Order()
    const totalAmount = items.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0)

    // Validate business rules
    if (items.length === 0) {
      throw new Error('An order must contain at least one item')
    }
    if (totalAmount <= 0) {
      throw new Error('The order total must be greater than zero')
    }

    order.apply({
      eventType: 'OrderCreated',
      data: { orderId, customerId, items, totalAmount },
    })

    return order
  }

  confirm(): void {
    if (this.status !== 'CREATED') {
      throw new Error('Only an order in CREATED status can be confirmed')
    }
    this.apply({
      eventType: 'OrderConfirmed',
      data: { orderId: this.orderId, confirmedAt: new Date().toISOString() },
    })
  }

  cancel(reason: string): void {
    if (this.status === 'CANCELLED') {
      throw new Error('This order has already been cancelled')
    }
    this.apply({
      eventType: 'OrderCancelled',
      data: { orderId: this.orderId, reason, cancelledAt: new Date().toISOString() },
    })
  }

  protected when(event: DomainEvent): void {
    const orderEvent = event as OrderEvent
    switch (orderEvent.eventType) {
      case 'OrderCreated':
        this.orderId = orderEvent.data.orderId
        this.customerId = orderEvent.data.customerId
        this.items = orderEvent.data.items
        this.totalAmount = orderEvent.data.totalAmount
        this.status = 'CREATED'
        break
      case 'OrderConfirmed':
        this.status = 'CONFIRMED'
        break
      case 'OrderCancelled':
        this.status = 'CANCELLED'
        break
    }
  }
}
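To make the write/replay round-trip concrete, here is a self-contained sketch using deliberately simplified re-declarations of the classes above (a one-field `Order`, not the full aggregate): creating and confirming an order produces two uncommitted events, and replaying those same events into a fresh instance restores identical state.

```typescript
// Simplified re-declarations so this snippet runs standalone (illustrative only)
interface DomainEvent {
  eventType: string
  data: Record<string, unknown>
}

abstract class EventSourcedAggregate {
  private uncommittedEvents: DomainEvent[] = []
  version = -1

  protected apply(event: DomainEvent): void {
    this.when(event)
    this.uncommittedEvents.push(event)
    this.version++
  }

  protected abstract when(event: DomainEvent): void

  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.when(event)
      this.version++
    }
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents]
  }
}

class Order extends EventSourcedAggregate {
  status: 'NONE' | 'CREATED' | 'CONFIRMED' = 'NONE'

  static create(orderId: string): Order {
    const order = new Order()
    order.apply({ eventType: 'OrderCreated', data: { orderId } })
    return order
  }

  confirm(): void {
    this.apply({ eventType: 'OrderConfirmed', data: {} })
  }

  protected when(event: DomainEvent): void {
    if (event.eventType === 'OrderCreated') this.status = 'CREATED'
    if (event.eventType === 'OrderConfirmed') this.status = 'CONFIRMED'
  }
}

// Write side: two uncommitted events, version 1
const order = Order.create('order-1')
order.confirm()

// Replaying the same events into a fresh instance restores identical state
const replayed = new Order()
replayed.loadFromHistory(order.getUncommittedEvents())
// replayed.status === 'CONFIRMED', replayed.version === 1
```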

5. Projections and the Read Model

5.1 The Role of Projections

A projection is a process that subscribes to the event stream and builds read-optimized views (Read Models). Querying the event stream directly is inefficient, so denormalized views tailored to each query requirement are maintained separately.
5.2 Projection Engine 구현

interface ProjectionCheckpoint {
  projectionName: string
  lastProcessedPosition: number
}

abstract class Projection {
  abstract readonly name: string

  abstract handle(event: StoredEvent): Promise<void>

  canHandle(eventType: string): boolean {
    return this.handledEventTypes().includes(eventType)
  }

  abstract handledEventTypes(): string[]
}

class ProjectionEngine {
  private projections: Projection[] = []

  constructor(
    private readonly eventStore: PostgresEventStore,
    private readonly checkpointStore: CheckpointStore
  ) {}

  register(projection: Projection): void {
    this.projections.push(projection)
  }

  /**
   * Runs all registered projections.
   * Each projection resumes from the last position it processed.
   */
  async run(): Promise<void> {
    while (true) {
      for (const projection of this.projections) {
        const checkpoint = await this.checkpointStore.get(projection.name)
        const lastPosition = checkpoint?.lastProcessedPosition ?? 0

        const events = await this.eventStore.readAll(lastPosition, 100)

        for (const event of events) {
          if (projection.canHandle(event.eventType)) {
            try {
              await projection.handle(event)
            } catch (error) {
              console.error(
                `Projection ${projection.name} failed at position ${event.globalPosition}:`,
                error
              )
              // On failure, stop this projection; the others keep running
              break
            }
          }
          await this.checkpointStore.save({
            projectionName: projection.name,
            lastProcessedPosition: event.globalPosition,
          })
        }
      }
      // Polling interval
      await new Promise((resolve) => setTimeout(resolve, 500))
    }
  }
}
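The CheckpointStore used by the engine is assumed above rather than defined. A minimal in-memory sketch (the interface is re-declared here so the snippet is self-contained; a production version would persist checkpoints, e.g. upserting into a Postgres table, so projections survive restarts):

```typescript
interface ProjectionCheckpoint {
  projectionName: string
  lastProcessedPosition: number
}

// In-memory CheckpointStore (illustrative; not durable across restarts)
class InMemoryCheckpointStore {
  private checkpoints = new Map<string, ProjectionCheckpoint>()

  async get(projectionName: string): Promise<ProjectionCheckpoint | null> {
    return this.checkpoints.get(projectionName) ?? null
  }

  async save(checkpoint: ProjectionCheckpoint): Promise<void> {
    this.checkpoints.set(checkpoint.projectionName, checkpoint)
  }
}
```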

5.3 Order Dashboard Read Model

class OrderDashboardProjection extends Projection {
  readonly name = 'order-dashboard'

  constructor(private readonly db: Pool) {
    super()
  }

  handledEventTypes(): string[] {
    return ['OrderCreated', 'OrderConfirmed', 'OrderCancelled']
  }

  async handle(event: StoredEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderCreated':
        await this.db.query(
          `INSERT INTO order_dashboard (order_id, customer_id, total_amount, status, created_at)
           VALUES ($1, $2, $3, 'CREATED', $4)
           ON CONFLICT (order_id) DO NOTHING`,
          [event.data.orderId, event.data.customerId, event.data.totalAmount, event.createdAt]
        )
        break

      case 'OrderConfirmed':
        await this.db.query(
          `UPDATE order_dashboard SET status = 'CONFIRMED', confirmed_at = $2 WHERE order_id = $1`,
          [event.data.orderId, event.data.confirmedAt]
        )
        break

      case 'OrderCancelled':
        await this.db.query(
          `UPDATE order_dashboard SET status = 'CANCELLED', cancel_reason = $2 WHERE order_id = $1`,
          [event.data.orderId, event.data.reason]
        )
        break
    }
  }
}

6. The Saga Pattern and Distributed Transactions

6.1 What Is a Saga?

The Saga pattern manages a transaction that spans multiple services as a sequence of local transactions and compensating transactions. Unlike traditional 2PC (Two-Phase Commit), it holds no long-lived locks, so it scales far better.

There are two implementation styles.

  • Choreography: each service publishes events and other services react; there is no central coordinator.
  • Orchestration: a central orchestrator manages the entire flow and sends commands to each service.

6.2 Orchestration Saga Implementation

// Saga state definition
type OrderSagaStatus =
  | 'STARTED'
  | 'PAYMENT_PENDING'
  | 'PAYMENT_COMPLETED'
  | 'INVENTORY_RESERVED'
  | 'COMPLETED'
  | 'COMPENSATING'
  | 'FAILED'

interface SagaStep {
  name: string
  execute: () => Promise<void>
  compensate: () => Promise<void>
}

class OrderSaga {
  private status: OrderSagaStatus = 'STARTED'
  private completedSteps: SagaStep[] = []

  constructor(
    private readonly orderId: string,
    private readonly paymentService: PaymentService,
    private readonly inventoryService: InventoryService,
    private readonly sagaLog: SagaLogStore
  ) {}

  async execute(orderData: {
    customerId: string
    items: any[]
    totalAmount: number
  }): Promise<void> {
    const steps: SagaStep[] = [
      {
        name: 'ProcessPayment',
        execute: async () => {
          await this.paymentService.processPayment(
            this.orderId,
            orderData.customerId,
            orderData.totalAmount
          )
          this.status = 'PAYMENT_COMPLETED'
        },
        compensate: async () => {
          await this.paymentService.refundPayment(this.orderId)
        },
      },
      {
        name: 'ReserveInventory',
        execute: async () => {
          await this.inventoryService.reserve(this.orderId, orderData.items)
          this.status = 'INVENTORY_RESERVED'
        },
        compensate: async () => {
          await this.inventoryService.releaseReservation(this.orderId)
        },
      },
    ]

    for (const step of steps) {
      try {
        await this.sagaLog.record(this.orderId, step.name, 'EXECUTING')
        await step.execute()
        await this.sagaLog.record(this.orderId, step.name, 'COMPLETED')
        this.completedSteps.push(step)
      } catch (error) {
        await this.sagaLog.record(this.orderId, step.name, 'FAILED')
        console.error(`Saga step ${step.name} failed:`, error)

        // Run compensating transactions (in reverse order)
        this.status = 'COMPENSATING'
        await this.compensate()
        this.status = 'FAILED'
        return
      }
    }

    this.status = 'COMPLETED'
    await this.sagaLog.record(this.orderId, 'Saga', 'COMPLETED')
  }

  private async compensate(): Promise<void> {
    // Compensate completed steps in reverse order
    const stepsToCompensate = [...this.completedSteps].reverse()
    for (const step of stepsToCompensate) {
      try {
        await this.sagaLog.record(this.orderId, step.name, 'COMPENSATING')
        await step.compensate()
        await this.sagaLog.record(this.orderId, step.name, 'COMPENSATED')
      } catch (compensateError) {
        // A failed compensation requires manual intervention
        await this.sagaLog.record(this.orderId, step.name, 'COMPENSATION_FAILED')
        console.error(`Compensation failed for step ${step.name}:`, compensateError)
      }
    }
  }
}

6.3 Choreography vs Orchestration

| Aspect | Choreography | Orchestration |
| --- | --- | --- |
| Coupling | Low (event-based) | Medium (depends on the orchestrator) |
| Visibility | Low (flow is hard to trace) | High (flow managed centrally) |
| Complexity | Rises sharply as services grow | Concentrated in the orchestrator |
| Single point of failure | None | The orchestrator |
| Testability | Hard | Comparatively easy |
| Best fit | Simple flows across 2-3 services | Complex flows across 4+ services |

7. Snapshots and Event Compaction

7.1 Why Snapshots Are Needed

Once an Aggregate accumulates thousands or tens of thousands of events, state reconstruction time grows sharply. A snapshot is an optimization that serializes the Aggregate state at a point in time, so that only events recorded after that point need to be replayed.

interface Snapshot {
  streamId: string
  version: number
  state: Record<string, unknown>
  createdAt: Date
}

class SnapshotRepository {
  constructor(private readonly pool: Pool) {}

  async save(snapshot: Snapshot): Promise<void> {
    await this.pool.query(
      `INSERT INTO snapshots (stream_id, version, state, created_at)
       VALUES ($1, $2, $3, $4)
       ON CONFLICT (stream_id) DO UPDATE SET version = $2, state = $3, created_at = $4`,
      [snapshot.streamId, snapshot.version, JSON.stringify(snapshot.state), snapshot.createdAt]
    )
  }

  async load(streamId: string): Promise<Snapshot | null> {
    const result = await this.pool.query('SELECT * FROM snapshots WHERE stream_id = $1', [streamId])
    if (result.rows.length === 0) return null
    // Map snake_case columns to the Snapshot interface
    const row = result.rows[0]
    return {
      streamId: row.stream_id,
      version: Number(row.version),
      state: row.state,
      createdAt: row.created_at,
    }
  }
}

class EventSourcedRepository<T extends EventSourcedAggregate> {
  private readonly SNAPSHOT_INTERVAL = 100 // snapshot every 100 events

  constructor(
    private readonly eventStore: PostgresEventStore,
    private readonly snapshotRepo: SnapshotRepository,
    private readonly factory: () => T
  ) {}

  async load(streamId: string): Promise<T> {
    const aggregate = this.factory()

    // 1. Try to load a snapshot
    const snapshot = await this.snapshotRepo.load(streamId)
    let fromVersion = 0

    if (snapshot) {
      ;(aggregate as any).restoreFromSnapshot(snapshot.state)
      ;(aggregate as any)._version = snapshot.version
      fromVersion = snapshot.version + 1
    }

    // 2. Replay only the events recorded after the snapshot
    const events = await this.eventStore.readStream(streamId, fromVersion)
    aggregate.loadFromHistory(events)

    return aggregate
  }

  async save(aggregate: T, streamId: string): Promise<void> {
    const uncommittedEvents = aggregate.getUncommittedEvents()
    if (uncommittedEvents.length === 0) return

    await this.eventStore.appendToStream(
      streamId,
      aggregate.version - uncommittedEvents.length,
      uncommittedEvents
    )

    // Decide whether to create a snapshot
    if (aggregate.version > 0 && aggregate.version % this.SNAPSHOT_INTERVAL === 0) {
      await this.snapshotRepo.save({
        streamId,
        version: aggregate.version,
        state: (aggregate as any).toSnapshot(),
        createdAt: new Date(),
      })
    }

    aggregate.clearUncommittedEvents()
  }
}
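The toSnapshot and restoreFromSnapshot calls above are assumed methods on the aggregate, never defined. A minimal sketch of the pair on a toy aggregate (illustrative; a real Order would serialize all of its fields): serialize plain state, never events.

```typescript
// Toy aggregate showing the snapshot method pair (illustrative names)
class Account {
  balance = 0
  version = -1

  deposit(amount: number): void {
    this.balance += amount
    this.version++
  }

  // Serialize current state for the snapshot store
  toSnapshot(): Record<string, unknown> {
    return { balance: this.balance }
  }

  // Restore state from a stored snapshot (events after it are replayed separately)
  restoreFromSnapshot(state: Record<string, unknown>): void {
    this.balance = state.balance as number
  }
}
```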

7.2 Snapshot Strategy Guidelines

  • Frequency: every 100 to 1,000 events (tune to the domain)
  • Storage: the same stream or a separate store (a separate store is recommended)
  • Timing: create snapshots asynchronously in the background so they do not add write latency
  • If replay exceeds 100ms: seriously consider introducing snapshots

8. Event Sourcing vs Traditional CRUD

| Aspect | Event Sourcing | Traditional CRUD |
| --- | --- | --- |
| Data storage | Stores events (the change history) | Stores only the current state |
| History tracking | Full audit trail built in | Separate audit tables needed |
| Time-travel queries | Can rebuild state at any point in time | Separate snapshot tables needed |
| Query complexity | High (separate Read Models) | Low (direct SQL) |
| Consistency model | Mostly eventual consistency | Immediate consistency |
| Schema changes | Requires event versioning | ALTER TABLE |
| Debugging | Reproduce issues by replaying events | Only the current state is visible |
| Learning curve | High | Low |
| Suitable domains | Finance, orders, logistics - history matters | Settings, catalogs - simple CRUD |

9. Failure Scenarios and Recovery Procedures

9.1 Event Store Outage

Symptom: Command processing halts because events cannot be persisted.

Recovery procedure:

  1. Check DB connectivity and identify the cause of the outage
  2. Once the event store recovers, retry the unprocessed Commands
  3. Check for gaps in global positions; if any are found, verify projection integrity
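Once the processed positions are loaded, the gap check in step 3 is a pure function. A small illustrative sketch:

```typescript
// Returns the missing global positions in a list of processed positions.
// A non-empty result after an outage means some events may have been written
// but never projected, so projection integrity should be verified.
function findPositionGaps(positions: number[]): number[] {
  const gaps: number[] = []
  const sorted = [...positions].sort((a, b) => a - b)
  for (let i = 1; i < sorted.length; i++) {
    // Collect every position skipped between two consecutive entries
    for (let p = sorted[i - 1] + 1; p < sorted[i]; p++) {
      gaps.push(p)
    }
  }
  return gaps
}

// Example: positions 3 and 6 are missing
findPositionGaps([1, 2, 4, 5, 7]) // [3, 6]
```

Note that sequences like BIGSERIAL can leave legitimate gaps when transactions roll back, so a detected gap is a signal to investigate, not proof of data loss.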

9.2 Projection Lag

Symptom: the Read Model does not reflect the latest state.

Recovery procedure:

  1. Compare each projection's checkpoint against the event store's latest position
  2. Identify the cause of the lag (processing throughput, a failure, a poison event, and so on)
  3. If necessary, reset the projection and rebuild it from scratch

9.3 Saga Compensation Failure

Symptom: an external service fails while a compensating transaction is running.

Recovery procedure:

  1. Query the Saga log for entries in the COMPENSATION_FAILED state
  2. Manually retry the failed compensation step
  3. If the retry limit is exceeded, record the entry in a Dead Letter Queue and intervene manually

9.4 Event Schema Changes

Symptom: existing events fail to deserialize because the event structure changed.

Recovery procedure:

  1. Convert old-version events to the new version with the Upcaster pattern
  2. Update projection handlers so both versions can be processed
  3. Run a full projection rebuild
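An upcaster in this style is just a per-version transform applied when events are read. A minimal sketch (the v1-to-v2 field rename and the event shape here are hypothetical, purely for illustration):

```typescript
interface RawEvent {
  eventType: string
  version: number
  data: Record<string, unknown>
}

// Hypothetical migration: OrderCreated v1 stored a flat customerName field,
// v2 nests it under customer. Events of other types/versions pass through.
function upcastOrderCreated(event: RawEvent): RawEvent {
  if (event.eventType !== 'OrderCreated' || event.version !== 1) return event
  const { customerName, ...rest } = event.data
  return {
    ...event,
    version: 2,
    data: { ...rest, customer: { name: customerName ?? 'unknown' } },
  }
}
```

Chaining one such function per version step (v1 to v2, v2 to v3, ...) lets the rest of the system handle only the newest shape.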

10. Operations Checklist

Before Adoption

  • Does the domain genuinely need event sourcing? (history tracking, auditing, time-travel queries)
  • Can you accept eventual consistency?
  • Is the team comfortable with DDD and event-driven thinking?

Design Phase

  • Do events capture business intent? (domain behavior, not property changes)
  • Are Aggregate boundaries appropriate? (neither too large nor too small)
  • Is there an event schema versioning strategy?
  • Is a projection rebuild strategy in place?

Operations Phase

  • Monitor event store capacity and define an archiving strategy
  • Monitor projection lag
  • Track Saga failure rates and compensation success rates
  • Manage snapshot frequency and disk usage
  • Monitor event counts per stream (to detect abnormal growth)

Failure Preparedness

  • Document the full projection rebuild procedure and its expected duration
  • Establish a process for handling the Saga Dead Letter Queue
  • Verify event store backup and restore procedures
  • Automate Upcaster tests

Closing Thoughts

Event Sourcing and CQRS are powerful architectural patterns, but they are not a fit for every system. They deliver the most value in domains where history tracking, audit logs, and time-travel queries are core requirements (finance, orders, logistics, and the like).

If you are considering adoption, we recommend the following.

  1. Apply it selectively to core domains. Not every service needs Event Sourcing.
  2. Design for projection rebuilds from day one. Rebuilding projections in production will inevitably become necessary.
  3. Establish an event schema evolution strategy first. Events are stored forever, so schema changes carry the highest cost.
  4. Prepare for Saga compensation failures. In a distributed system, a failed compensation is only a matter of time.

If you can accept eventual consistency and are ready to shift to event-driven thinking, Event Sourcing + CQRS can fundamentally improve your system's resilience and scalability.

Event Sourcing + CQRS Architecture Implementation Guide: From Event Store to Projections and Saga Patterns

Event Sourcing CQRS

Introduction

Traditional CRUD systems store only the current state of data. When an UPDATE executes, the previous value is lost. When a DELETE executes, the data itself is destroyed. You can see that a bank account balance is 1,000 dollars, but you cannot tell what sequence of deposits and withdrawals led to that balance.

Event Sourcing solves this problem at its root. It records state changes themselves as events, and the current state is reconstructed by replaying those events sequentially. CQRS builds on this structure by fully separating write (Command) and read (Query) concerns, optimizing each for its specific requirements.

This guide walks through the complete implementation of an architecture combining Event Sourcing and CQRS. We cover event store design, Aggregate implementation, projection construction, distributed transaction management with the Saga pattern, snapshot optimization, and real-world failure scenarios with recovery procedures.

1. Event Sourcing Core Principles

1.1 What Is an Event?

In Event Sourcing, an event is an immutable fact that occurred in the past. Events are named in the past tense and must clearly convey business intent.

Compare poorly designed events with well-designed ones:

  • Bad examples: NameChanged, EmailChanged - property-level events carry no business meaning
  • Good examples: CustomerRelocated, OrderConfirmed - events that express domain behavior

1.2 How State Reconstruction Works

The current state is computed by sequentially applying (replaying) all events to an initial state.

Current State = fold(Initial State, [Event1, Event2, ..., EventN])

For example, reconstructing a bank account state looks like this:

Initial balance: $0
  -> AccountOpened(amount: $0)         => Balance: $0
  -> MoneyDeposited(amount: $1,000)    => Balance: $1,000
  -> MoneyWithdrawn(amount: $300)      => Balance: $700
  -> MoneyDeposited(amount: $500)      => Balance: $1,200

1.3 The Immutability Principle

Once stored, events can never be modified or deleted. If an incorrect event was published, a compensating event must be issued to logically reverse it.

// When an incorrect withdrawal occurred - do not delete the event
// Instead, publish a compensating event
interface MoneyWithdrawnCompensated {
  type: 'MoneyWithdrawnCompensated'
  originalEventId: string
  amount: number
  reason: string
  timestamp: Date
}

2. CQRS Architecture Pattern

2.1 Separating Commands and Queries

The core idea of CQRS is straightforward: separate the model that modifies data from the model that reads data.

[Client]
    |
    |-- Command(write) --> [Command Handler] --> [Write Model / Event Store]
    |                                                     |
    |                                              Events published
    |                                                     |
    |                                                     v
    |                                           [Projection Engine]
    |                                                     |
    |                                                     v
    +-- Query(read) ----> [Query Handler] ----> [Read Model / View DB]

2.2 Why Separate?

Read and write requirements are fundamentally different:

  • Write (Command): Requires business rule validation, domain invariant enforcement, and transactional consistency
  • Read (Query): Requires diverse view support, low latency, and high throughput

Trying to satisfy both sides with a single model forces compromises on both. CQRS enables independent optimization of each model.

2.3 Command Handler Implementation

// Command definition
interface CreateOrderCommand {
  orderId: string
  customerId: string
  items: Array<{ productId: string; quantity: number; price: number }>
}

// Command Handler
class OrderCommandHandler {
  constructor(
    private readonly eventStore: EventStore,
    private readonly orderRepository: EventSourcedRepository<Order>
  ) {}

  async handle(command: CreateOrderCommand): Promise<void> {
    // 1. Load Aggregate (replay events)
    const order = await this.orderRepository.load(command.orderId)

    // 2. Execute business logic (generate events)
    order.create(command.customerId, command.items)

    // 3. Persist events
    await this.orderRepository.save(order)
  }
}

3. Event Store Design and Implementation

3.1 Event Store Schema

The core table structure for an event store is as follows:

CREATE TABLE events (
    -- Global unique identifier
    event_id        UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    -- Stream (Aggregate) identifier
    stream_id       VARCHAR(255) NOT NULL,
    -- Event sequence within the stream (used for optimistic concurrency)
    stream_version  BIGINT NOT NULL,
    -- Event type (used for deserialization)
    event_type      VARCHAR(255) NOT NULL,
    -- Event payload (JSON)
    event_data      JSONB NOT NULL,
    -- Metadata (correlation ID, user info, etc.)
    metadata        JSONB DEFAULT '{}',
    -- Event timestamp
    created_at      TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    -- Global ordering (used by projections to track overall sequence)
    global_position BIGSERIAL NOT NULL,

    -- Prevent version duplication within the same stream (optimistic concurrency)
    UNIQUE(stream_id, stream_version)
);

-- Index for stream-level event retrieval
CREATE INDEX idx_events_stream ON events(stream_id, stream_version);
-- Index for global event ordering (projections)
CREATE INDEX idx_events_global ON events(global_position);
-- Index for event type lookup
CREATE INDEX idx_events_type ON events(event_type);

3.2 TypeScript Event Store Implementation

import { Pool } from 'pg'

interface DomainEvent {
  eventType: string
  data: Record<string, unknown>
  metadata?: Record<string, unknown>
}

interface StoredEvent extends DomainEvent {
  eventId: string
  streamId: string
  streamVersion: number
  globalPosition: number
  createdAt: Date
}

class PostgresEventStore {
  constructor(private readonly pool: Pool) {}

  /**
   * Appends events to a stream.
   * Uses expectedVersion for optimistic concurrency control.
   */
  async appendToStream(
    streamId: string,
    expectedVersion: number,
    events: DomainEvent[]
  ): Promise<StoredEvent[]> {
    const client = await this.pool.connect()
    try {
      await client.query('BEGIN')

      // Optimistic concurrency check
      const versionCheck = await client.query(
        'SELECT MAX(stream_version) as current_version FROM events WHERE stream_id = $1',
        [streamId]
      )
      const currentVersion = versionCheck.rows[0].current_version ?? -1

      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, but current version is ${currentVersion}`
        )
      }

      const storedEvents: StoredEvent[] = []
      for (let i = 0; i < events.length; i++) {
        const event = events[i]
        const version = expectedVersion + i + 1

        const result = await client.query(
          `INSERT INTO events (stream_id, stream_version, event_type, event_data, metadata)
           VALUES ($1, $2, $3, $4, $5)
           RETURNING event_id, global_position, created_at`,
          [
            streamId,
            version,
            event.eventType,
            JSON.stringify(event.data),
            JSON.stringify(event.metadata ?? {}),
          ]
        )

        storedEvents.push({
          eventId: result.rows[0].event_id,
          streamId,
          streamVersion: version,
          globalPosition: result.rows[0].global_position,
          createdAt: result.rows[0].created_at,
          ...event,
        })
      }

      await client.query('COMMIT')
      return storedEvents
    } catch (error) {
      await client.query('ROLLBACK')
      throw error
    } finally {
      client.release()
    }
  }

  /**
   * Reads all events from a stream in order.
   */
  async readStream(streamId: string, fromVersion = 0): Promise<StoredEvent[]> {
    const result = await this.pool.query(
      `SELECT * FROM events
       WHERE stream_id = $1 AND stream_version >= $2
       ORDER BY stream_version ASC`,
      [streamId, fromVersion]
    )
    return result.rows.map(this.mapToStoredEvent)
  }

  /**
   * Reads all events in global order (for projections).
   */
  async readAll(fromPosition = 0, limit = 1000): Promise<StoredEvent[]> {
    const result = await this.pool.query(
      `SELECT * FROM events
       WHERE global_position > $1
       ORDER BY global_position ASC
       LIMIT $2`,
      [fromPosition, limit]
    )
    return result.rows.map(this.mapToStoredEvent)
  }

  private mapToStoredEvent(row: any): StoredEvent {
    return {
      eventId: row.event_id,
      streamId: row.stream_id,
      streamVersion: row.stream_version,
      globalPosition: row.global_position,
      eventType: row.event_type,
      data: row.event_data,
      metadata: row.metadata,
      createdAt: row.created_at,
    }
  }
}

3.3 Dedicated Event Store Solutions Comparison

| Aspect | EventStoreDB (Kurrent) | Axon Server | PostgreSQL Custom |
|---|---|---|---|
| Type | Dedicated Event DB | CQRS Framework Server | General-purpose RDBMS |
| Projections | Server-side JavaScript | Tracking Processor | Must build manually |
| Subscription Model | Catch-up, Persistent | Tracking, Subscribing | Polling, LISTEN/NOTIFY |
| Clustering | Built-in (Gossip protocol) | Axon Server Enterprise | External solution needed |
| Learning Curve | Medium | High (entire framework) | Low (SQL-based) |
| Flexibility | Medium | Low (Axon ecosystem lock-in) | High |
| Operational Complexity | Medium | High | Low to Medium |

4. Aggregate and Domain Event Implementation

4.1 Aggregate Base Class

In Event Sourcing, the Aggregate is the core unit that publishes events and reconstructs state by replaying events.

abstract class EventSourcedAggregate {
  private uncommittedEvents: DomainEvent[] = []
  private _version: number = -1

  get version(): number {
    return this._version
  }

  /**
   * Called by the aggregate's command methods to record a state change:
   * the event is applied to in-memory state and queued as uncommitted
   * for later persistence.
   */
  protected apply(event: DomainEvent): void {
    this.when(event)
    this.uncommittedEvents.push(event)
    this._version++
  }

  /**
   * Handler that changes internal state based on the event.
   * Implemented by subclasses.
   */
  protected abstract when(event: DomainEvent): void

  /**
   * Restores state by replaying stored events.
   */
  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.when(event)
      this._version++
    }
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents]
  }

  clearUncommittedEvents(): void {
    this.uncommittedEvents = []
  }
}
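The apply/loadFromHistory contract above can be exercised with a minimal, self-contained sketch. The `Counter` aggregate below is purely illustrative (it is not part of the article's order domain), and the base class is repeated so the snippet stands alone:

```typescript
interface DomainEvent {
  eventType: string
  data: Record<string, unknown>
}

// Condensed copy of the base class above, for self-containment.
abstract class EventSourcedAggregate {
  private uncommittedEvents: DomainEvent[] = []
  private _version = -1

  get version(): number {
    return this._version
  }

  protected apply(event: DomainEvent): void {
    this.when(event)
    this.uncommittedEvents.push(event)
    this._version++
  }

  protected abstract when(event: DomainEvent): void

  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.when(event)
      this._version++
    }
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents]
  }
}

// Illustrative aggregate: a counter that only ever goes up.
class Counter extends EventSourcedAggregate {
  value = 0

  increment(by: number): void {
    if (by <= 0) throw new Error('increment must be positive')
    this.apply({ eventType: 'Incremented', data: { by } })
  }

  protected when(event: DomainEvent): void {
    if (event.eventType === 'Incremented') {
      this.value += event.data.by as number
    }
  }
}

// Replay invariant: a fresh instance fed the recorded events reaches
// exactly the same state as the instance that produced them.
const original = new Counter()
original.increment(3)
original.increment(4)

const replayed = new Counter()
replayed.loadFromHistory(original.getUncommittedEvents())
```

Note that commands go through `apply` (which records the event), while replay goes through `loadFromHistory` (which does not), so the same `when` handler drives both paths.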

4.2 Order Aggregate Implementation

// Domain Events
interface OrderCreated {
  eventType: 'OrderCreated'
  data: {
    orderId: string
    customerId: string
    items: Array<{ productId: string; quantity: number; unitPrice: number }>
    totalAmount: number
  }
}

interface OrderConfirmed {
  eventType: 'OrderConfirmed'
  data: {
    orderId: string
    confirmedAt: string
  }
}

interface OrderCancelled {
  eventType: 'OrderCancelled'
  data: {
    orderId: string
    reason: string
    cancelledAt: string
  }
}

type OrderEvent = OrderCreated | OrderConfirmed | OrderCancelled

// Order Aggregate
class Order extends EventSourcedAggregate {
  private orderId!: string
  private customerId!: string
  private items: Array<{ productId: string; quantity: number; unitPrice: number }> = []
  private totalAmount: number = 0
  private status: 'CREATED' | 'CONFIRMED' | 'CANCELLED' = 'CREATED'

  static create(
    orderId: string,
    customerId: string,
    items: Array<{ productId: string; quantity: number; unitPrice: number }>
  ): Order {
    const order = new Order()
    const totalAmount = items.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0)

    // Business rule validation
    if (items.length === 0) {
      throw new Error('An order must contain at least one item')
    }
    if (totalAmount <= 0) {
      throw new Error('Order total must be greater than zero')
    }

    order.apply({
      eventType: 'OrderCreated',
      data: { orderId, customerId, items, totalAmount },
    })

    return order
  }

  confirm(): void {
    if (this.status !== 'CREATED') {
      throw new Error('Only orders in CREATED status can be confirmed')
    }
    this.apply({
      eventType: 'OrderConfirmed',
      data: { orderId: this.orderId, confirmedAt: new Date().toISOString() },
    })
  }

  cancel(reason: string): void {
    if (this.status === 'CANCELLED') {
      throw new Error('Order is already cancelled')
    }
    this.apply({
      eventType: 'OrderCancelled',
      data: { orderId: this.orderId, reason, cancelledAt: new Date().toISOString() },
    })
  }

  protected when(event: DomainEvent): void {
    const orderEvent = event as OrderEvent
    switch (orderEvent.eventType) {
      case 'OrderCreated':
        this.orderId = orderEvent.data.orderId
        this.customerId = orderEvent.data.customerId
        this.items = orderEvent.data.items
        this.totalAmount = orderEvent.data.totalAmount
        this.status = 'CREATED'
        break
      case 'OrderConfirmed':
        this.status = 'CONFIRMED'
        break
      case 'OrderCancelled':
        this.status = 'CANCELLED'
        break
    }
  }
}

5. Projections and Read Models

5.1 The Role of Projections

A Projection subscribes to event streams and builds read-optimized views (Read Models). Querying the raw event stream directly is inefficient for most access patterns, so we maintain separate denormalized views, each tailored to a specific query requirement.
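At its core, a read model is just a left fold of the event stream into a query-shaped structure. The in-memory sketch below illustrates the idea with hypothetical event and view shapes (not the article's actual code, which follows in 5.2):

```typescript
// Fold order events into a denormalized per-customer summary
// that a dashboard could query directly, with no joins.
interface SimpleEvent {
  eventType: 'OrderCreated' | 'OrderCancelled'
  data: { orderId: string; customerId: string; totalAmount?: number }
}

interface CustomerSummary {
  openOrders: number
  totalSpent: number
}

function project(events: SimpleEvent[]): Map<string, CustomerSummary> {
  const view = new Map<string, CustomerSummary>()
  for (const event of events) {
    const summary = view.get(event.data.customerId) ?? { openOrders: 0, totalSpent: 0 }
    switch (event.eventType) {
      case 'OrderCreated':
        summary.openOrders++
        summary.totalSpent += event.data.totalAmount ?? 0
        break
      case 'OrderCancelled':
        summary.openOrders--
        break
    }
    view.set(event.data.customerId, summary)
  }
  return view
}

const view = project([
  { eventType: 'OrderCreated', data: { orderId: 'o1', customerId: 'c1', totalAmount: 100 } },
  { eventType: 'OrderCreated', data: { orderId: 'o2', customerId: 'c1', totalAmount: 50 } },
  { eventType: 'OrderCancelled', data: { orderId: 'o2', customerId: 'c1' } },
])
```

A production projection is this same fold made incremental: instead of re-reading all events, it persists the view and resumes from a checkpoint.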

5.2 Projection Engine Implementation

interface ProjectionCheckpoint {
  projectionName: string
  lastProcessedPosition: number
}

abstract class Projection {
  abstract readonly name: string

  abstract handle(event: StoredEvent): Promise<void>

  canHandle(eventType: string): boolean {
    return this.handledEventTypes().includes(eventType)
  }

  abstract handledEventTypes(): string[]
}

class ProjectionEngine {
  private projections: Projection[] = []

  constructor(
    private readonly eventStore: PostgresEventStore,
    private readonly checkpointStore: CheckpointStore
  ) {}

  register(projection: Projection): void {
    this.projections.push(projection)
  }

  /**
   * Runs all registered projections.
   * Each projection resumes from its last processed position.
   */
  async run(): Promise<void> {
    while (true) {
      for (const projection of this.projections) {
        const checkpoint = await this.checkpointStore.get(projection.name)
        const lastPosition = checkpoint?.lastProcessedPosition ?? 0

        const events = await this.eventStore.readAll(lastPosition, 100)

        for (const event of events) {
          if (projection.canHandle(event.eventType)) {
            try {
              await projection.handle(event)
            } catch (error) {
              console.error(
                `Projection ${projection.name} failed at position ${event.globalPosition}:`,
                error
              )
              // Stop this batch without advancing the checkpoint; the event is
              // retried on the next poll. A poison event that always fails will
              // loop forever, so it should be routed to a dead-letter store.
              break
            }
          }
          await this.checkpointStore.save({
            projectionName: projection.name,
            lastProcessedPosition: event.globalPosition,
          })
        }
      }
      // Polling interval
      await new Promise((resolve) => setTimeout(resolve, 500))
    }
  }
}
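`CheckpointStore` is referenced by the engine above but not defined. A minimal in-memory version, suitable only for local testing since checkpoints are lost on restart, might look like this (an assumed shape, inferred from how the engine calls it):

```typescript
// Repeats the checkpoint interface from above, for self-containment.
interface ProjectionCheckpoint {
  projectionName: string
  lastProcessedPosition: number
}

// In-memory stand-in for the CheckpointStore used by ProjectionEngine.
// On restart every projection would rebuild from position 0.
class InMemoryCheckpointStore {
  private checkpoints = new Map<string, ProjectionCheckpoint>()

  async get(projectionName: string): Promise<ProjectionCheckpoint | null> {
    return this.checkpoints.get(projectionName) ?? null
  }

  async save(checkpoint: ProjectionCheckpoint): Promise<void> {
    this.checkpoints.set(checkpoint.projectionName, { ...checkpoint })
  }
}
```

A database-backed implementation would replace the Map with an UPSERT into a checkpoints table, ideally committed in the same transaction as the read-model write so a crash cannot leave the view and the checkpoint out of step.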

5.3 Order Dashboard Read Model

class OrderDashboardProjection extends Projection {
  readonly name = 'order-dashboard'

  constructor(private readonly db: Pool) {
    super()
  }

  handledEventTypes(): string[] {
    return ['OrderCreated', 'OrderConfirmed', 'OrderCancelled']
  }

  async handle(event: StoredEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderCreated':
        await this.db.query(
          `INSERT INTO order_dashboard (order_id, customer_id, total_amount, status, created_at)
           VALUES ($1, $2, $3, 'CREATED', $4)
           ON CONFLICT (order_id) DO NOTHING`,
          [event.data.orderId, event.data.customerId, event.data.totalAmount, event.createdAt]
        )
        break

      case 'OrderConfirmed':
        await this.db.query(
          `UPDATE order_dashboard SET status = 'CONFIRMED', confirmed_at = $2 WHERE order_id = $1`,
          [event.data.orderId, event.data.confirmedAt]
        )
        break

      case 'OrderCancelled':
        await this.db.query(
          `UPDATE order_dashboard SET status = 'CANCELLED', cancel_reason = $2 WHERE order_id = $1`,
          [event.data.orderId, event.data.reason]
        )
        break
    }
  }
}

6. Saga Pattern and Distributed Transactions

6.1 What Is a Saga?

The Saga pattern breaks a transaction that spans multiple services into a sequence of local transactions, each paired with a compensating transaction that undoes it. Unlike traditional 2PC (Two-Phase Commit), it holds no long-lived locks and therefore scales far better.

Two implementation approaches exist:

  • Choreography: Each service publishes events and other services react. No central coordinator.
  • Orchestration: A central orchestrator manages the overall flow and dispatches commands to each service.

6.2 Orchestration Saga Implementation

// Saga state definition
type OrderSagaStatus =
  | 'STARTED'
  | 'PAYMENT_PENDING'
  | 'PAYMENT_COMPLETED'
  | 'INVENTORY_RESERVED'
  | 'COMPLETED'
  | 'COMPENSATING'
  | 'FAILED'

interface SagaStep {
  name: string
  execute: () => Promise<void>
  compensate: () => Promise<void>
}

class OrderSaga {
  private status: OrderSagaStatus = 'STARTED'
  private completedSteps: SagaStep[] = []

  constructor(
    private readonly orderId: string,
    private readonly paymentService: PaymentService,
    private readonly inventoryService: InventoryService,
    private readonly sagaLog: SagaLogStore
  ) {}

  async execute(orderData: {
    customerId: string
    items: any[]
    totalAmount: number
  }): Promise<void> {
    const steps: SagaStep[] = [
      {
        name: 'ProcessPayment',
        execute: async () => {
          await this.paymentService.processPayment(
            this.orderId,
            orderData.customerId,
            orderData.totalAmount
          )
          this.status = 'PAYMENT_COMPLETED'
        },
        compensate: async () => {
          await this.paymentService.refundPayment(this.orderId)
        },
      },
      {
        name: 'ReserveInventory',
        execute: async () => {
          await this.inventoryService.reserve(this.orderId, orderData.items)
          this.status = 'INVENTORY_RESERVED'
        },
        compensate: async () => {
          await this.inventoryService.releaseReservation(this.orderId)
        },
      },
    ]

    for (const step of steps) {
      try {
        await this.sagaLog.record(this.orderId, step.name, 'EXECUTING')
        await step.execute()
        await this.sagaLog.record(this.orderId, step.name, 'COMPLETED')
        this.completedSteps.push(step)
      } catch (error) {
        await this.sagaLog.record(this.orderId, step.name, 'FAILED')
        console.error(`Saga step ${step.name} failed:`, error)

        // Execute compensating transactions (reverse order)
        this.status = 'COMPENSATING'
        await this.compensate()
        this.status = 'FAILED'
        return
      }
    }

    this.status = 'COMPLETED'
    await this.sagaLog.record(this.orderId, 'Saga', 'COMPLETED')
  }

  private async compensate(): Promise<void> {
    // Compensate completed steps in reverse order
    const stepsToCompensate = [...this.completedSteps].reverse()
    for (const step of stepsToCompensate) {
      try {
        await this.sagaLog.record(this.orderId, step.name, 'COMPENSATING')
        await step.compensate()
        await this.sagaLog.record(this.orderId, step.name, 'COMPENSATED')
      } catch (compensateError) {
        // Compensation failure requires manual intervention
        await this.sagaLog.record(this.orderId, step.name, 'COMPENSATION_FAILED')
        console.error(`Compensation failed for step ${step.name}:`, compensateError)
      }
    }
  }
}
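The execute-then-compensate control flow of `OrderSaga` can be distilled into a small standalone sketch. The step functions here are in-memory fakes and are synchronous purely for brevity; the real saga runs async service calls:

```typescript
interface Step {
  name: string
  execute: () => void
  compensate: () => void
}

// Runs steps in order; on the first failure, undoes the already-completed
// steps in reverse order and reports which compensations ran.
function runSaga(steps: Step[]): { completed: string[]; compensated: string[] } {
  const completed: Step[] = []
  const compensated: string[] = []
  for (const step of steps) {
    try {
      step.execute()
      completed.push(step)
    } catch {
      for (const done of [...completed].reverse()) {
        done.compensate()
        compensated.push(done.name)
      }
      return { completed: completed.map((s) => s.name), compensated }
    }
  }
  return { completed: completed.map((s) => s.name), compensated }
}

// Payment succeeds, inventory fails -> payment must be refunded.
const log: string[] = []
const result = runSaga([
  { name: 'payment', execute: () => { log.push('pay') }, compensate: () => { log.push('refund') } },
  { name: 'inventory', execute: () => { throw new Error('out of stock') }, compensate: () => { log.push('release') } },
])
```

Only completed steps are compensated: `inventory` never succeeded, so its `compensate` is never called, while `payment` is rolled back.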

6.3 Choreography vs Orchestration Comparison

| Aspect | Choreography | Orchestration |
|---|---|---|
| Coupling | Low (event-based) | Medium (orchestrator dependency) |
| Visibility | Low (hard to trace flows) | High (centralized flow management) |
| Complexity | Grows rapidly with service count | Concentrated in orchestrator |
| Single Point of Failure | None | The orchestrator |
| Testability | Difficult | Relatively easy |
| Best For | Simple flows across 2-3 services | Complex flows across 4+ services |

7. Snapshots and Event Compaction

7.1 Why Snapshots Are Needed

When an Aggregate accumulates thousands or tens of thousands of events, state reconstruction time grows with the length of the stream. A snapshot is an optimization that serializes and stores the Aggregate state as of a specific version, so that loading only replays the events recorded after that version.

interface Snapshot {
  streamId: string
  version: number
  state: Record<string, unknown>
  createdAt: Date
}

class SnapshotRepository {
  constructor(private readonly pool: Pool) {}

  async save(snapshot: Snapshot): Promise<void> {
    await this.pool.query(
      `INSERT INTO snapshots (stream_id, version, state, created_at)
       VALUES ($1, $2, $3, $4)
       ON CONFLICT (stream_id) DO UPDATE SET version = $2, state = $3, created_at = $4`,
      [snapshot.streamId, snapshot.version, JSON.stringify(snapshot.state), snapshot.createdAt]
    )
  }

  async load(streamId: string): Promise<Snapshot | null> {
    const result = await this.pool.query('SELECT * FROM snapshots WHERE stream_id = $1', [streamId])
    if (result.rows.length === 0) return null
    // Map snake_case columns to the Snapshot shape rather than
    // returning the raw row, which would not match the interface.
    const row = result.rows[0]
    return {
      streamId: row.stream_id,
      version: row.version,
      state: row.state,
      createdAt: row.created_at,
    }
  }
}

class EventSourcedRepository<T extends EventSourcedAggregate> {
  private readonly SNAPSHOT_INTERVAL = 100 // Snapshot every 100 events

  constructor(
    private readonly eventStore: PostgresEventStore,
    private readonly snapshotRepo: SnapshotRepository,
    private readonly factory: () => T
  ) {}

  async load(streamId: string): Promise<T> {
    const aggregate = this.factory()

    // 1. Attempt to load a snapshot. Snapshot-capable aggregates are expected
    //    to implement restoreFromSnapshot()/toSnapshot(); the base class does
    //    not declare them, hence the `as any` casts.
    const snapshot = await this.snapshotRepo.load(streamId)
    let fromVersion = 0

    if (snapshot) {
      ;(aggregate as any).restoreFromSnapshot(snapshot.state)
      ;(aggregate as any)._version = snapshot.version
      fromVersion = snapshot.version + 1
    }

    // 2. Replay only events after the snapshot
    const events = await this.eventStore.readStream(streamId, fromVersion)
    aggregate.loadFromHistory(events)

    return aggregate
  }

  async save(aggregate: T, streamId: string): Promise<void> {
    const uncommittedEvents = aggregate.getUncommittedEvents()
    if (uncommittedEvents.length === 0) return

    await this.eventStore.appendToStream(
      streamId,
      aggregate.version - uncommittedEvents.length,
      uncommittedEvents
    )

    // Check whether to create a snapshot
    if (aggregate.version > 0 && aggregate.version % this.SNAPSHOT_INTERVAL === 0) {
      await this.snapshotRepo.save({
        streamId,
        version: aggregate.version,
        state: (aggregate as any).toSnapshot(),
        createdAt: new Date(),
      })
    }

    aggregate.clearUncommittedEvents()
  }
}

7.2 Snapshot Strategy Guidelines

  • Creation frequency: Every 100 to 1,000 events (adjust based on domain characteristics)
  • Storage location: Same stream or separate store (separate store recommended)
  • Creation timing: Create asynchronously in the background to avoid impacting write latency
  • When replay time exceeds 100ms: Actively consider introducing snapshots
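The guidelines above combine two triggers: an event-count interval and a replay-latency budget. A hypothetical helper (the name and option shapes are illustrative, not from the repository code) makes the decision explicit:

```typescript
// Decide whether to write a snapshot after reaching `version`.
// Versions are 0-indexed as in the repository above, so with an
// interval of 100 a snapshot is taken at versions 100, 200, 300, ...
function shouldSnapshot(opts: {
  version: number
  interval?: number      // snapshot every N events (default 100)
  lastReplayMs?: number  // measured time to rebuild this aggregate
  replayBudgetMs?: number // snapshot once replay exceeds this (default 100ms)
}): boolean {
  const interval = opts.interval ?? 100
  const budget = opts.replayBudgetMs ?? 100
  if (opts.version > 0 && opts.version % interval === 0) return true
  if ((opts.lastReplayMs ?? 0) > budget) return true
  return false
}
```

The latency trigger matters because event sizes vary wildly between domains: 100 small events may replay in microseconds while 100 large ones may not, so a fixed interval alone can snapshot too often or too rarely.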

8. Event Sourcing vs Traditional CRUD

| Aspect | Event Sourcing | Traditional CRUD |
|---|---|---|
| Data Storage | Stores events (change records) | Stores current state only |
| History Tracking | Complete audit trail built in | Requires separate audit tables |
| Time-Travel Queries | Can reconstruct state at any point | Requires separate snapshot tables |
| Query Complexity | High (separate Read Models needed) | Low (direct SQL queries) |
| Consistency Model | Primarily eventual consistency | Immediate consistency |
| Schema Changes | Event versioning required | ALTER TABLE |
| Debugging | Reproduce issues via event replay | Only current state visible |
| Learning Curve | High | Low |
| Suitable Domains | Finance, orders, logistics (history matters) | Config management, catalogs (simple CRUD) |

9. Failure Scenarios and Recovery Procedures

9.1 Event Store Failure

Symptom: Command processing halts due to event storage failure

Recovery procedure:

  1. Check DB connection status and identify the root cause
  2. After event store recovery, retry unprocessed Commands
  3. Verify global position gaps -- if gaps exist, validate projection integrity
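Step 3's gap check can be expressed as a pure function over the sorted global positions (an illustrative helper, not part of the article's event store code):

```typescript
// Given ascending global positions read from the event store, return any
// missing position ranges. Gaps can be benign (a rolled-back transaction
// consumed a sequence value) or a sign of data loss, so each gap should
// be checked against projection checkpoints before deciding to rebuild.
function findPositionGaps(positions: number[]): Array<{ from: number; to: number }> {
  const gaps: Array<{ from: number; to: number }> = []
  for (let i = 1; i < positions.length; i++) {
    if (positions[i] > positions[i - 1] + 1) {
      gaps.push({ from: positions[i - 1] + 1, to: positions[i] - 1 })
    }
  }
  return gaps
}

const gaps = findPositionGaps([1, 2, 3, 6, 7, 10])
```

With PostgreSQL, sequence-based gaps are expected under concurrent writes, which is why projections should track positions they have actually seen rather than assume a dense sequence.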

9.2 Projection Sync Lag

Symptom: Read Model does not reflect the latest state

Recovery procedure:

  1. Compare projection checkpoint with the event store's latest position
  2. Identify the lag cause (processing speed, failures, poison events, etc.)
  3. If needed, reset the projection and rebuild from scratch

9.3 Saga Compensation Failure

Symptom: External service failure during compensating transaction execution

Recovery procedure:

  1. Query the Saga log for COMPENSATION_FAILED status entries
  2. Manually retry the failed compensation step
  3. If retry count is exceeded, record in a Dead Letter Queue and escalate for manual intervention

9.4 Event Schema Changes

Symptom: Event structure changes cause deserialization failures for existing events

Recovery procedure:

  1. Use the Upcaster pattern to transform old-version events to the new version
  2. Modify projection handlers to support both versions
  3. Execute a full projection rebuild
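Step 1's Upcaster transforms old events lazily at read time, so the store itself is never rewritten. A minimal sketch follows; the v1/v2 event shapes are hypothetical, chosen to echo the CustomerRelocated example from section 1:

```typescript
// v1 stored a single free-form address string; v2 splits it into fields.
interface CustomerRelocatedV1 {
  eventType: 'CustomerRelocated'
  version: 1
  data: { customerId: string; address: string }
}

interface CustomerRelocatedV2 {
  eventType: 'CustomerRelocated'
  version: 2
  data: { customerId: string; street: string; city: string }
}

// Runs on read, just before deserialization into the domain model.
// Already-current events pass through untouched.
function upcast(event: CustomerRelocatedV1 | CustomerRelocatedV2): CustomerRelocatedV2 {
  if (event.version === 2) return event
  const [street, city] = event.data.address.split(',').map((s) => s.trim())
  return {
    eventType: 'CustomerRelocated',
    version: 2,
    data: { customerId: event.data.customerId, street, city: city ?? '' },
  }
}

const upgraded = upcast({
  eventType: 'CustomerRelocated',
  version: 1,
  data: { customerId: 'c1', address: '12 Main St, Seoul' },
})
```

In practice upcasters are chained (v1→v2→v3), so each new schema version only needs one new transformation step, and old handlers can eventually be deleted once no consumer reads pre-upcast shapes.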

10. Operational Checklist

Pre-Adoption Review

  • Does the domain truly need Event Sourcing? (History tracking, auditing, time-travel queries)
  • Can the system accept eventual consistency?
  • Is the team comfortable with DDD and event-driven thinking?

Design Phase

  • Do events carry business intent? (Domain behavior, not property changes)
  • Are Aggregate boundaries appropriate? (Not too large, not too small)
  • Is an event schema versioning strategy established?
  • Is a projection rebuild strategy in place?

Operations Phase

  • Event store capacity monitoring and archiving strategy
  • Projection lag time monitoring
  • Saga failure rate and compensation transaction success rate tracking
  • Snapshot creation frequency and disk usage management
  • Per-stream event count monitoring (detect abnormal growth)

Failure Preparedness

  • Document full projection rebuild procedure and estimated completion time
  • Establish Saga Dead Letter Queue processing workflow
  • Verify event store backup and recovery procedures
  • Automate Upcaster testing

Conclusion

Event Sourcing and CQRS are powerful architectural patterns, but they are not suitable for every system. They deliver the greatest value in domains where history tracking, audit logs, and time-travel queries are core requirements -- such as finance, order management, and logistics.

If you are considering adoption, here are our recommendations:

  1. Apply selectively to core domains only. Not every service needs Event Sourcing.
  2. Design for projection rebuilds from the start. Rebuilding projections in production is inevitable.
  3. Establish an event schema evolution strategy first. Events are stored forever, making schema change the highest-cost operation.
  4. Prepare for Saga compensation failures. In distributed systems, compensation failure is a matter of when, not if.

If your team is ready to embrace eventual consistency and shift to event-driven thinking, Event Sourcing + CQRS can fundamentally improve your system's resilience and scalability.