Skip to content

Split View: 이벤트 소싱과 CQRS 패턴 실전 구현 가이드: 설계부터 운영까지

✨ Learn with Quiz
|

이벤트 소싱과 CQRS 패턴 실전 구현 가이드: 설계부터 운영까지

이벤트 소싱과 CQRS 패턴 실전 구현 가이드: 설계부터 운영까지

왜 이벤트 소싱과 CQRS를 함께 다루는가

이벤트 소싱(Event Sourcing)과 CQRS(Command Query Responsibility Segregation)는 독립적인 패턴이지만, 프로덕션 환경에서는 거의 항상 함께 사용된다. 이벤트 소싱으로 상태 변경의 전체 이력을 이벤트 스트림에 기록하면, 현재 상태를 이벤트 재생(replay)으로만 복원해야 하기 때문에 읽기 성능이 급격히 떨어진다. CQRS로 읽기 전용 프로젝션(Read Model)을 분리하면 이 문제가 해결된다.

이 글은 개념 설명에 머무르지 않는다. EventStoreDB 기반 이벤트 저장소 구축, TypeScript와 Python으로 커맨드 핸들러와 프로젝션을 구현하고, 스냅샷 전략, 이벤트 스키마 버전 관리, 장애 복구 절차까지 운영 레벨의 내용을 모두 다룬다. 2025년 이후 EventStoreDB가 Kurrent로 리브랜딩된 상황과 최신 gRPC 클라이언트 API도 반영했다.

아키텍처 전체 구조

이벤트 소싱 + CQRS 시스템의 전체 흐름은 다음과 같다.

  1. 클라이언트가 커맨드(Command)를 전송한다
  2. 커맨드 핸들러가 도메인 불변식(invariant)을 검증한다
  3. 검증을 통과하면 도메인 이벤트를 생성하고 이벤트 스토어에 append한다
  4. 이벤트 스토어가 **구독자(Subscriber)**에게 이벤트를 푸시한다
  5. 프로젝션 핸들러가 이벤트를 수신하여 **읽기 모델(Read Model)**을 갱신한다
  6. 쿼리 핸들러가 읽기 모델에서 데이터를 조회하여 클라이언트에 반환한다

핵심 원칙: 쓰기 경로(Command Path)와 읽기 경로(Query Path)는 완전히 분리된다. 쓰기 측은 이벤트 스토어에만 쓰고, 읽기 측은 프로젝션된 읽기 모델에서만 조회한다.

이벤트 스토어 솔루션 비교

프로덕션에 사용 가능한 이벤트 스토어 솔루션을 비교한다. 기술 스택과 운영 환경에 맞는 선택이 중요하다.

솔루션언어 생태계저장 방식프로젝션 내장라이선스운영 복잡도
EventStoreDB (Kurrent)언어 무관 (gRPC)전용 파일 시스템있음 (JS 기반)BSL (상용 무료)중간
Axon ServerJVM (Java/Kotlin)전용 스토리지없음 (Framework에서 처리)오픈소스 + Enterprise중간
Marten.NET (C#)PostgreSQL있음 (C# 기반)MIT낮음
EventSourcingDB언어 무관 (gRPC)전용 엔진없음오픈소스낮음
PostgreSQL + 직접 구현언어 무관RDBMS없음 (직접 구현)오픈소스높음
MongoDB + 직접 구현언어 무관Document DB없음 (직접 구현)SSPL높음

EventStoreDB는 Greg Young이 설계한 전용 이벤트 스토어로, 스트림 단위 append-only 저장, 내장 프로젝션, catch-up 구독 등 이벤트 소싱에 필요한 기능을 네이티브로 제공한다. 이 글에서는 EventStoreDB를 기준으로 구현 예제를 작성한다.

도메인 이벤트 설계

이벤트 소싱 시스템에서 도메인 이벤트는 시스템의 가장 중요한 계약(contract)이다. 한번 저장된 이벤트는 절대 수정하거나 삭제하지 않는다. 따라서 이벤트 스키마 설계에 신중해야 한다.

이벤트 설계 원칙

  • 과거형 동사로 명명한다: OrderCreated, PaymentProcessed, ItemShipped
  • 비즈니스 의도를 담는다: ReservationStatusChanged가 아닌 RoomBooked
  • **자기 완결적(self-contained)**이어야 한다: 이벤트 하나로 무슨 일이 일어났는지 완전히 파악 가능해야 한다
  • 단순 타입만 사용한다: string, number, boolean, 배열. Value Object를 이벤트에 직접 넣지 않는다
  • 버전 필드를 포함한다: 스키마 진화에 필수

TypeScript 이벤트 정의

// events/order-events.ts

interface BaseEvent {
  eventType: string
  eventVersion: number
  aggregateId: string
  timestamp: string
  metadata: {
    correlationId: string
    causationId: string
    userId: string
  }
}

interface OrderCreated extends BaseEvent {
  eventType: 'OrderCreated'
  eventVersion: 1
  data: {
    orderId: string
    customerId: string
    items: Array<{
      productId: string
      productName: string
      quantity: number
      unitPrice: number
    }>
    totalAmount: number
    currency: string
    shippingAddress: {
      street: string
      city: string
      zipCode: string
      country: string
    }
  }
}

interface OrderPaymentConfirmed extends BaseEvent {
  eventType: 'OrderPaymentConfirmed'
  eventVersion: 1
  data: {
    orderId: string
    paymentId: string
    amount: number
    method: 'CREDIT_CARD' | 'BANK_TRANSFER' | 'WALLET'
    confirmedAt: string
  }
}

interface OrderCancelled extends BaseEvent {
  eventType: 'OrderCancelled'
  eventVersion: 1
  data: {
    orderId: string
    reason: string
    cancelledBy: string
    refundAmount: number
  }
}

type OrderEvent = OrderCreated | OrderPaymentConfirmed | OrderCancelled

correlationIdcausationId를 메타데이터에 반드시 포함시킨다. 분산 시스템에서 하나의 사용자 요청이 여러 이벤트를 발생시킬 때, 이 두 필드가 없으면 장애 추적이 불가능하다.

커맨드 핸들러와 Aggregate 구현

커맨드 핸들러는 이벤트 소싱 시스템의 쓰기 경로를 담당한다. Aggregate는 도메인 불변식을 지키는 경계이며, 이벤트를 재생하여 현재 상태를 복원한다.

TypeScript Aggregate 구현

// aggregates/order-aggregate.ts

import { EventStoreDBClient, jsonEvent, FORWARDS, START } from '@eventstore/db-client'

interface OrderState {
  orderId: string
  status: 'CREATED' | 'PAID' | 'SHIPPED' | 'CANCELLED'
  customerId: string
  totalAmount: number
  items: Array<{ productId: string; quantity: number; unitPrice: number }>
  version: number
}

class OrderAggregate {
  private state: OrderState
  private pendingEvents: OrderEvent[] = []

  constructor() {
    this.state = {
      orderId: '',
      status: 'CREATED',
      customerId: '',
      totalAmount: 0,
      items: [],
      version: -1,
    }
  }

  // 이벤트 재생으로 상태 복원
  static async load(client: EventStoreDBClient, orderId: string): Promise<OrderAggregate> {
    const aggregate = new OrderAggregate()
    const streamName = `order-${orderId}`

    const events = client.readStream(streamName, {
      direction: FORWARDS,
      fromRevision: START,
    })

    for await (const resolvedEvent of events) {
      const event = resolvedEvent.event
      if (event) {
        aggregate.apply(event.data as OrderEvent, false)
        aggregate.state.version = Number(resolvedEvent.event!.revision)
      }
    }

    return aggregate
  }

  // 커맨드: 주문 생성
  createOrder(command: {
    orderId: string
    customerId: string
    items: Array<{ productId: string; quantity: number; unitPrice: number }>
  }): void {
    // 불변식 검증
    if (this.state.orderId !== '') {
      throw new Error(`Order ${command.orderId} already exists`)
    }
    if (command.items.length === 0) {
      throw new Error('Order must contain at least one item')
    }

    const totalAmount = command.items.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0)

    const event: OrderCreated = {
      eventType: 'OrderCreated',
      eventVersion: 1,
      aggregateId: command.orderId,
      timestamp: new Date().toISOString(),
      metadata: { correlationId: '', causationId: '', userId: '' },
      data: {
        orderId: command.orderId,
        customerId: command.customerId,
        items: command.items.map((i) => ({
          ...i,
          productName: '', // 조회 시 채움
        })),
        totalAmount,
        currency: 'KRW',
        shippingAddress: { street: '', city: '', zipCode: '', country: 'KR' },
      },
    }

    this.apply(event, true)
  }

  // 커맨드: 결제 확인
  confirmPayment(
    paymentId: string,
    amount: number,
    method: 'CREDIT_CARD' | 'BANK_TRANSFER' | 'WALLET'
  ): void {
    if (this.state.status !== 'CREATED') {
      throw new Error(`Cannot confirm payment for order in status: ${this.state.status}`)
    }
    if (amount !== this.state.totalAmount) {
      throw new Error(
        `Payment amount ${amount} does not match order total ${this.state.totalAmount}`
      )
    }

    const event: OrderPaymentConfirmed = {
      eventType: 'OrderPaymentConfirmed',
      eventVersion: 1,
      aggregateId: this.state.orderId,
      timestamp: new Date().toISOString(),
      metadata: { correlationId: '', causationId: '', userId: '' },
      data: {
        orderId: this.state.orderId,
        paymentId,
        amount,
        method,
        confirmedAt: new Date().toISOString(),
      },
    }

    this.apply(event, true)
  }

  // 이벤트 적용 (상태 전이)
  private apply(event: OrderEvent, isNew: boolean): void {
    switch (event.eventType) {
      case 'OrderCreated':
        this.state.orderId = event.data.orderId
        this.state.customerId = event.data.customerId
        this.state.totalAmount = event.data.totalAmount
        this.state.items = event.data.items
        this.state.status = 'CREATED'
        break
      case 'OrderPaymentConfirmed':
        this.state.status = 'PAID'
        break
      case 'OrderCancelled':
        this.state.status = 'CANCELLED'
        break
    }

    if (isNew) {
      this.pendingEvents.push(event)
    }
  }

  // 이벤트 스토어에 저장
  async save(client: EventStoreDBClient): Promise<void> {
    if (this.pendingEvents.length === 0) return

    const streamName = `order-${this.state.orderId}`
    const events = this.pendingEvents.map((e) =>
      jsonEvent({ type: e.eventType, data: e.data, metadata: e.metadata })
    )

    await client.appendToStream(streamName, events, {
      expectedRevision: this.state.version === -1 ? 'no_stream' : BigInt(this.state.version),
    })

    this.pendingEvents = []
  }
}

expectedRevision 파라미터가 낙관적 동시성 제어(Optimistic Concurrency Control)의 핵심이다. 같은 Aggregate에 대해 두 커맨드가 동시에 실행되면, 먼저 저장한 쪽이 성공하고 나중 쪽은 WrongExpectedVersionError를 받는다. 재시도 로직에서 이벤트를 다시 로드하고 커맨드를 재실행해야 한다.

프로젝션 설계와 구현

프로젝션은 이벤트 스트림에서 읽기 모델을 구축하는 과정이다. 이벤트 소싱 시스템에서 프로젝션은 "원하는 형태의 뷰를 자유롭게 만들 수 있다"는 핵심 장점을 실현하는 부분이다.

프로젝션 패턴 분류

  • 인라인 프로젝션(Inline Projection): 이벤트 저장과 동시에 읽기 모델을 갱신한다. 강한 일관성을 보장하지만 쓰기 성능이 떨어진다.
  • 비동기 프로젝션(Async Projection): 구독(Subscription)을 통해 비동기로 읽기 모델을 갱신한다. 최종 일관성(Eventual Consistency)이지만 쓰기 성능이 좋다. 대부분의 프로덕션 시스템이 이 방식을 채택한다.
  • 라이브 프로젝션(Live Projection): 매 요청마다 이벤트를 재생한다. 항상 최신 상태를 보장하지만, 이벤트가 많으면 성능이 급격히 떨어진다.

Python 비동기 프로젝션 구현

다음은 Python으로 EventStoreDB의 Persistent Subscription을 이용한 비동기 프로젝션 구현 예제다.

# projections/order_summary_projection.py

import asyncio
import json
from datetime import datetime
from esdbclient import EventStoreDBClient, NewEvent, StreamState
from dataclasses import dataclass, asdict
from typing import Optional
import asyncpg

@dataclass
class OrderSummaryReadModel:
    order_id: str
    customer_id: str
    status: str
    total_amount: float
    item_count: int
    created_at: str
    updated_at: str
    payment_method: Optional[str] = None
    cancelled_reason: Optional[str] = None


class OrderSummaryProjection:
    def __init__(self, esdb_client: EventStoreDBClient, pg_pool: asyncpg.Pool):
        self.esdb = esdb_client
        self.pg_pool = pg_pool
        self._checkpoint_interval = 100
        self._processed_count = 0

    async def start(self):
        """catch-up 구독으로 프로젝션 시작"""
        last_position = await self._load_checkpoint()

        subscription = self.esdb.subscribe_to_all(
            from_position=last_position,
            filter_include=[r"order-.*"],  # order- 로 시작하는 스트림만 구독
        )

        for event in subscription:
            try:
                await self._handle_event(event)
                self._processed_count += 1

                # 주기적으로 체크포인트 저장
                if self._processed_count % self._checkpoint_interval == 0:
                    await self._save_checkpoint(event.commit_position)

            except Exception as e:
                print(f"Projection error at position {event.commit_position}: {e}")
                # 에러 발생 시 체크포인트까지 저장하고 재시작
                await self._save_checkpoint(event.commit_position)
                raise

    async def _handle_event(self, event):
        """이벤트 타입별 핸들러 라우팅"""
        handler_map = {
            'OrderCreated': self._on_order_created,
            'OrderPaymentConfirmed': self._on_payment_confirmed,
            'OrderCancelled': self._on_order_cancelled,
        }

        handler = handler_map.get(event.type)
        if handler:
            data = json.loads(event.data)
            await handler(data)

    async def _on_order_created(self, data: dict):
        """주문 생성 이벤트 처리 - 읽기 모델 INSERT"""
        async with self.pg_pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO order_summary (
                    order_id, customer_id, status, total_amount,
                    item_count, created_at, updated_at
                ) VALUES ($1, $2, $3, $4, $5, $6, $7)
                ON CONFLICT (order_id) DO UPDATE SET
                    status = EXCLUDED.status,
                    updated_at = EXCLUDED.updated_at
            """,
                data['orderId'],
                data['customerId'],
                'CREATED',
                data['totalAmount'],
                len(data['items']),
                datetime.fromisoformat(data.get('createdAt', datetime.now().isoformat())),
                datetime.now(),
            )

    async def _on_payment_confirmed(self, data: dict):
        """결제 확인 이벤트 처리 - 읽기 모델 UPDATE"""
        async with self.pg_pool.acquire() as conn:
            await conn.execute("""
                UPDATE order_summary
                SET status = 'PAID',
                    payment_method = $2,
                    updated_at = $3
                WHERE order_id = $1
            """, data['orderId'], data['method'], datetime.now())

    async def _on_order_cancelled(self, data: dict):
        """주문 취소 이벤트 처리"""
        async with self.pg_pool.acquire() as conn:
            await conn.execute("""
                UPDATE order_summary
                SET status = 'CANCELLED',
                    cancelled_reason = $2,
                    updated_at = $3
                WHERE order_id = $1
            """, data['orderId'], data['reason'], datetime.now())

    async def _load_checkpoint(self) -> Optional[int]:
        """마지막 처리 위치 로드"""
        async with self.pg_pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT position FROM projection_checkpoints WHERE name = $1",
                'order_summary'
            )
            return row['position'] if row else None

    async def _save_checkpoint(self, position: int):
        """처리 위치 저장"""
        async with self.pg_pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO projection_checkpoints (name, position, updated_at)
                VALUES ($1, $2, NOW())
                ON CONFLICT (name) DO UPDATE SET
                    position = EXCLUDED.position,
                    updated_at = NOW()
            """, 'order_summary', position)

프로젝션 구현에서 가장 중요한 것은 **멱등성(Idempotency)**이다. 위 코드의 ON CONFLICT ... DO UPDATE 구문이 핵심이다. 프로젝션이 중간에 실패하고 재시작하면, 이미 처리한 이벤트를 다시 받을 수 있다. 멱등하게 처리하지 않으면 데이터가 꼬인다.

읽기 모델 테이블 스키마

-- 프로젝션이 사용하는 읽기 모델 테이블
CREATE TABLE order_summary (
    order_id        VARCHAR(36) PRIMARY KEY,
    customer_id     VARCHAR(36) NOT NULL,
    status          VARCHAR(20) NOT NULL,
    total_amount    DECIMAL(15,2) NOT NULL,
    item_count      INTEGER NOT NULL,
    payment_method  VARCHAR(20),
    cancelled_reason TEXT,
    created_at      TIMESTAMP NOT NULL,
    updated_at      TIMESTAMP NOT NULL
);

-- 고객별 주문 조회 최적화 인덱스
CREATE INDEX idx_order_summary_customer ON order_summary(customer_id, status);

-- 상태별 필터링 인덱스
CREATE INDEX idx_order_summary_status ON order_summary(status, created_at DESC);

-- 프로젝션 체크포인트 테이블
CREATE TABLE projection_checkpoints (
    name        VARCHAR(100) PRIMARY KEY,
    position    BIGINT NOT NULL,
    updated_at  TIMESTAMP NOT NULL
);

읽기 모델은 프로젝션 요구사항에 맞게 자유롭게 설계한다. 정규화할 필요가 없다. 고객별 대시보드용, 관리자 통계용, 검색 엔진 인덱싱용 등 여러 프로젝션을 동시에 운영할 수 있다. 새로운 뷰가 필요하면 새 프로젝션을 추가하고 이벤트를 처음부터 재생하면 된다.

스냅샷 전략

Aggregate에 이벤트가 수천, 수만 건 쌓이면 매번 전체 이벤트를 재생하는 것은 비효율적이다. 스냅샷은 특정 시점의 Aggregate 상태를 저장하여 재생 범위를 줄인다.

스냅샷 적용 기준

스냅샷을 너무 일찍 도입하면 복잡도만 올라간다. 다음 기준으로 판단한다.

  • Aggregate당 평균 이벤트 수가 50개 이상이면 스냅샷 도입을 검토한다
  • 이벤트 재생 시간이 100ms를 초과하면 스냅샷이 필요하다
  • 스냅샷 주기는 보통 매 100~500 이벤트마다 생성한다

TypeScript 스냅샷 구현

// snapshots/snapshot-store.ts

interface Snapshot<T> {
  aggregateId: string
  aggregateType: string
  version: number // 스냅샷 시점의 Aggregate 버전
  schemaVersion: number // 스냅샷 스키마 버전 (업그레이드 감지용)
  state: T
  createdAt: string
}

class SnapshotStore {
  private client: EventStoreDBClient
  private snapshotInterval: number

  constructor(client: EventStoreDBClient, snapshotInterval = 200) {
    this.client = client
    this.snapshotInterval = snapshotInterval
  }

  async saveSnapshot<T>(snapshot: Snapshot<T>): Promise<void> {
    const streamName = `snapshot-${snapshot.aggregateType}-${snapshot.aggregateId}`
    const event = jsonEvent({
      type: 'Snapshot',
      data: snapshot,
    })

    // 스냅샷 스트림은 최신 1개만 유지 (maxCount 설정)
    await this.client.appendToStream(streamName, [event])
    await this.client.setStreamMetadata(streamName, {
      maxCount: 3, // 최근 3개만 유지하여 롤백 가능성 확보
    })
  }

  async loadSnapshot<T>(
    aggregateType: string,
    aggregateId: string,
    currentSchemaVersion: number
  ): Promise<Snapshot<T> | null> {
    const streamName = `snapshot-${aggregateType}-${aggregateId}`

    try {
      const events = this.client.readStream(streamName, {
        direction: BACKWARDS,
        fromRevision: END,
        maxCount: 1,
      })

      for await (const resolved of events) {
        const snapshot = resolved.event!.data as Snapshot<T>

        // 스키마 버전이 다르면 스냅샷 무시 (전체 이벤트 재생)
        if (snapshot.schemaVersion !== currentSchemaVersion) {
          console.warn(
            `Snapshot schema mismatch for ${aggregateId}: ` +
              `expected ${currentSchemaVersion}, got ${snapshot.schemaVersion}. ` +
              `Replaying all events.`
          )
          return null
        }

        return snapshot
      }
    } catch (error) {
      // 스냅샷 스트림이 없으면 null 반환
      return null
    }

    return null
  }

  shouldTakeSnapshot(currentVersion: number, lastSnapshotVersion: number): boolean {
    return currentVersion - lastSnapshotVersion >= this.snapshotInterval
  }
}

스냅샷의 schemaVersion 필드에 주목해야 한다. Aggregate 구조가 변경되면 기존 스냅샷은 더 이상 유효하지 않다. schemaVersion이 불일치하면 스냅샷을 무시하고 전체 이벤트를 재생하여 새 스냅샷을 생성한다. 이것이 이벤트 소싱의 장점이다. 이벤트는 불변이므로 언제든 다시 재생할 수 있다.

주의: 스냅샷은 성능 최적화 기법이지, 필수 요소가 아니다. 스냅샷 없이도 시스템은 정상 동작해야 한다. 스냅샷이 손상되거나 무효화되면 전체 이벤트 재생으로 자동 폴백되도록 구현한다.

이벤트 버전 관리 (Event Versioning)

프로덕션 시스템이 진화하면 이벤트 스키마도 변경된다. 이벤트 소싱에서 저장된 이벤트는 절대 수정하지 않으므로, 스키마 변경을 안전하게 처리하는 전략이 필요하다.

버전 관리 전략 비교

전략설명적합한 상황위험도
Weak Schema새 필드 추가 시 기본값 사용, 기존 이벤트 호환필드 추가만 필요한 경우낮음
Upcasting이벤트 역직렬화 시 미들웨어에서 변환필드 이름 변경, 타입 변경중간
New Event Type완전히 새로운 이벤트 타입 정의이벤트 의미 자체가 변경낮음
Copy-Replace Stream스트림을 새 스키마로 복제 후 교체대규모 스키마 마이그레이션높음

Upcasting 구현 예제

가장 실용적인 전략은 Upcasting이다. 이벤트를 역직렬화하는 과정에서 이전 버전을 현재 버전으로 변환하는 미들웨어를 둔다.

// versioning/event-upcaster.ts

type Upcaster = (event: any) => any

// 버전별 업캐스터 등록
const upcasters: Map<string, Map<number, Upcaster>> = new Map()

// OrderCreated v1 -> v2: shippingAddress를 구조화된 객체로 변경
upcasters.set(
  'OrderCreated',
  new Map([
    [
      1,
      (event: any) => {
        // v1에서 shippingAddress가 단일 문자열이었던 경우
        const address =
          typeof event.data.shippingAddress === 'string'
            ? {
                street: event.data.shippingAddress,
                city: 'UNKNOWN',
                zipCode: 'UNKNOWN',
                country: 'KR',
              }
            : event.data.shippingAddress

        return {
          ...event,
          eventVersion: 2,
          data: {
            ...event.data,
            shippingAddress: address,
            // v2에서 추가된 필드에 기본값 적용
            orderSource: event.data.orderSource ?? 'WEB',
          },
        }
      },
    ],
  ])
)

function upcastEvent(event: any): any {
  const eventUpcasters = upcasters.get(event.eventType)
  if (!eventUpcasters) return event

  let current = event
  const targetVersion = Math.max(...Array.from(eventUpcasters.keys())) + 1

  // 현재 버전부터 최신 버전까지 순차 적용
  for (let v = current.eventVersion; v < targetVersion; v++) {
    const upcaster = eventUpcasters.get(v)
    if (upcaster) {
      current = upcaster(current)
    }
  }

  return current
}

// 사용 예
// const rawEvent = loadFromEventStore();
// const currentEvent = upcastEvent(rawEvent);
// aggregate.apply(currentEvent);

이벤트 버전 관리에서 가장 중요한 원칙은 절대 기존 이벤트를 수정하지 않는 것이다. 이벤트 스토어에 저장된 데이터는 불변이다. 대신 읽는 시점에 변환(Upcasting)하거나, 새 이벤트 타입을 정의한다.

장애 복구 절차

이벤트 소싱 시스템에서 장애 복구는 전통적 시스템과 다르다. 이벤트 스토어가 진실의 원천(Single Source of Truth)이므로, 프로젝션은 언제든 재구축할 수 있다.

프로젝션 손상 시 복구

프로젝션이 잘못된 데이터를 포함하거나, 프로젝션 로직에 버그가 있었을 때 사용하는 절차다.

  1. 프로젝션 서비스 중지: 해당 프로젝션의 구독을 중지한다
  2. 읽기 모델 테이블 DROP 또는 TRUNCATE: 기존 잘못된 데이터를 제거한다
  3. 체크포인트 초기화: projection_checkpoints 테이블에서 해당 프로젝션의 position을 삭제한다
  4. 프로젝션 서비스 재시작: 이벤트 스토어의 처음부터 모든 이벤트를 재생하여 읽기 모델을 재구축한다
  5. 재구축 완료 확인: 프로젝션이 현재 위치까지 따라잡았는지(caught-up) 확인한다

경고: 이벤트가 수억 건이면 재구축에 수 시간이 걸릴 수 있다. 병렬 처리 가능한 프로젝션 아키텍처를 미리 설계하거나, 파티션 단위로 재구축할 수 있도록 준비한다.

이벤트 스토어 클러스터 장애 시

EventStoreDB는 리더-팔로워 아키텍처를 사용한다. 리더 노드가 다운되면 팔로워 중 하나가 자동으로 리더로 승격된다. 이 과정에서 주의할 점은 다음과 같다.

  • 쓰기 실패 처리: 리더 전환 중 쓰기가 실패하면 재시도한다. expectedRevision으로 멱등성이 보장된다
  • 구독 재연결: Persistent Subscription은 자동 재연결되지만, Catch-up Subscription은 마지막 체크포인트부터 수동 재연결해야 한다
  • Split-brain 방지: 최소 3노드 클러스터를 운영하여 과반수(quorum) 기반 리더 선출이 가능하도록 한다

일반적인 장애 시나리오와 대응

장애 시나리오원인대응 방법
프로젝션 데이터 불일치프로젝션 로직 버그프로젝션 재구축 (이벤트 전체 재생)
Aggregate 로드 실패이벤트 스트림 손상스냅샷 폴백 후 부분 재생, 클러스터 복제본 확인
동시 쓰기 충돌같은 Aggregate 동시 수정WrongExpectedVersionError 캐치 후 재시도
구독 지연(Consumer Lag)프로젝션 처리 속도 부족파티셔닝 또는 프로젝션 인스턴스 수평 확장
이벤트 스토어 디스크 풀이벤트 증가아카이빙 정책 적용, 스트림 maxAge/maxCount 설정
스냅샷 스키마 불일치Aggregate 구조 변경 후 배포자동 폴백으로 전체 이벤트 재생

운영 체크리스트

프로덕션 배포 전 반드시 확인해야 하는 항목들이다.

설계 단계 체크리스트

  • 이벤트 스키마에 eventVersion 필드가 포함되어 있는가
  • 이벤트 메타데이터에 correlationId, causationId가 포함되어 있는가
  • Aggregate의 불변식 검증 로직이 완전한가
  • 커맨드 실패 시 에러 응답이 명확한가
  • CQRS가 정말 필요한 도메인인가 (단순 CRUD가 아닌지 재확인)

구현 단계 체크리스트

  • 프로젝션 핸들러가 멱등(Idempotent)한가
  • 낙관적 동시성 제어가 구현되어 있는가 (expectedRevision)
  • 이벤트 직렬화/역직렬화 테스트가 작성되어 있는가
  • Upcaster가 이전 버전 이벤트를 올바르게 변환하는가
  • 스냅샷 스키마 버전 불일치 시 전체 이벤트 재생으로 폴백하는가

운영 단계 체크리스트

  • 이벤트 스토어 클러스터가 3노드 이상으로 구성되어 있는가
  • 프로젝션 consumer lag 모니터링이 설정되어 있는가
  • 이벤트 스토어 디스크 사용량 알림이 설정되어 있는가
  • 프로젝션 재구축 절차가 문서화되어 있는가
  • 스냅샷 생성 주기와 보관 정책이 설정되어 있는가
  • Dead Letter Queue(DLQ)가 설정되어 있는가 (처리 불가 이벤트 격리)
  • 장애 시 수동 이벤트 보정 스크립트가 준비되어 있는가

흔한 실수와 안티패턴

실전에서 자주 발생하는 실수를 정리한다. 이벤트 소싱 프로젝트가 실패하는 대부분의 원인이 여기 있다.

안티패턴 1: 이벤트에 현재 상태 전체를 저장

// BAD: 이벤트에 전체 상태를 넣는 것은 CRUD와 다를 바 없다
interface OrderUpdated {
  eventType: 'OrderUpdated'
  data: {
    order: Order // 전체 Order 객체
  }
}

// GOOD: 변경된 사실만 이벤트로 기록
interface OrderItemAdded {
  eventType: 'OrderItemAdded'
  data: {
    orderId: string
    productId: string
    quantity: number
    unitPrice: number
  }
}

안티패턴 2: 프로젝션에서 외부 서비스 호출

프로젝션 핸들러에서 외부 API를 호출하면, 이벤트 재생(replay) 시 부작용이 발생한다. 프로젝션은 순수 함수처럼 이벤트 데이터만으로 읽기 모델을 갱신해야 한다. 외부 서비스 호출이 필요한 로직은 별도의 Policy 핸들러나 Saga로 분리한다.

안티패턴 3: 모든 도메인에 이벤트 소싱 적용

이벤트 소싱은 상태 변경 이력이 비즈니스 가치를 갖는 도메인에 적용한다. 사용자 설정, 코드 테이블 등 단순 CRUD 도메인에 이벤트 소싱을 적용하면 복잡도만 올라가고 얻는 것은 없다. 시스템의 핵심 도메인(Core Domain)에만 선별적으로 적용하는 것이 올바른 전략이다.

안티패턴 4: 이벤트 스트림에 10만 건 이상 축적

하나의 Aggregate 스트림에 이벤트가 10만 건 이상 쌓이면 로딩 시간이 수 초 이상 걸린다. 스냅샷으로 완화할 수 있지만, 근본 원인은 Aggregate 경계가 잘못 설계된 것이다. Aggregate를 더 작은 단위로 분할하거나, 주기적 이벤트 생성 패턴을 재검토한다.

이벤트 소싱 vs 전통 CRUD 의사결정 기준

마지막으로, 이벤트 소싱 도입 여부를 결정할 때 참고할 비교표다.

기준전통 CRUD이벤트 소싱
현재 상태 조회단순 (직접 읽기)복잡 (이벤트 재생 또는 프로젝션)
변경 이력 추적별도 감사 로그 필요자동 확보 (이벤트 자체가 이력)
스키마 변경ALTER TABLE 마이그레이션이벤트 버전 관리 + Upcasting
디버깅현재 상태만 확인 가능전체 이력 재생으로 문제 원인 추적 가능
데이터 정합성 보장ACID 트랜잭션Aggregate 단위 일관성 + 최종 일관성
저장 공간현재 상태만 저장모든 이벤트 축적 (저장 비용 높음)
읽기 성능인덱스 최적화로 충분프로젝션 설계 필수
초기 개발 속도빠름느림 (학습 곡선, 인프라 구성)
복잡 도메인 대응력도메인 복잡도 증가 시 한계이벤트 중심 모델링으로 확장성 확보
팀 역량 요구일반적DDD, 이벤트 모델링 경험 필요

금융 거래, 물류 추적, 의료 기록, 협업 도구처럼 변경 이력 자체가 비즈니스 가치인 도메인에서 이벤트 소싱은 탁월한 선택이다. 그 외에는 전통 CRUD를 유지하되, 필요한 부분에만 감사 로그를 추가하는 것이 현실적이다.

정리

이벤트 소싱과 CQRS는 강력하지만 복잡한 패턴이다. 성공적으로 도입하려면 다음을 기억한다.

  • 이벤트 스키마 설계에 가장 많은 시간을 투자한다. 이벤트는 영원히 남는다.
  • 프로젝션은 반드시 멱등하게 구현한다. 이벤트 재생이 안전해야 운영이 가능하다.
  • 스냅샷은 성능 최적화 도구이지, 아키텍처의 필수 요소가 아니다. 필요할 때 도입한다.
  • 이벤트 버전 관리 전략을 프로젝트 초기에 확립한다. 나중에 추가하면 고통스럽다.
  • 모든 도메인에 적용하지 않는다. 핵심 도메인에만 선별적으로 적용한다.

References

Event Sourcing and CQRS Pattern Implementation Guide: From Design to Operations

Event Sourcing and CQRS Pattern Implementation Guide: From Design to Operations

Why Event Sourcing and CQRS Are Discussed Together

Event Sourcing and CQRS (Command Query Responsibility Segregation) are independent patterns, but in production environments they are almost always used together. When you record the complete history of state changes in an event stream with Event Sourcing, the current state can only be restored through event replay, causing read performance to drop sharply. Separating read-only projections (Read Models) with CQRS solves this problem.

This article goes beyond conceptual explanations. It covers building an event store based on EventStoreDB, implementing command handlers and projections in TypeScript and Python, snapshot strategies, event schema versioning, and disaster recovery procedures at an operational level. It also reflects the rebranding of EventStoreDB to Kurrent after 2025 and the latest gRPC client API.

Overall Architecture

The complete flow of an Event Sourcing + CQRS system works as follows:

  1. The client sends a Command
  2. The command handler validates domain invariants
  3. Upon passing validation, it creates domain events and appends them to the event store
  4. The event store pushes events to subscribers
  5. Projection handlers receive events and update the Read Model
  6. Query handlers retrieve data from the Read Model and return it to the client

Core principle: The write path (Command Path) and read path (Query Path) are completely separated. The write side only writes to the event store, and the read side only queries from projected read models.

Event Store Solution Comparison

Here is a comparison of production-ready event store solutions. Choosing the right one depends on your technology stack and operational environment.

SolutionLanguage EcosystemStorage MethodBuilt-in ProjectionsLicenseOperational Complexity
EventStoreDB (Kurrent)Language-agnostic (gRPC)Dedicated file systemYes (JS-based)BSL (free for commercial)Medium
Axon ServerJVM (Java/Kotlin)Dedicated storageNo (handled by Framework)Open Source + EnterpriseMedium
Marten.NET (C#)PostgreSQLYes (C#-based)MITLow
EventSourcingDBLanguage-agnostic (gRPC)Dedicated engineNoOpen SourceLow
PostgreSQL + DIYLanguage-agnosticRDBMSNo (build your own)Open SourceHigh
MongoDB + DIYLanguage-agnosticDocument DBNo (build your own)SSPLHigh

EventStoreDB is a dedicated event store designed by Greg Young, providing native support for stream-based append-only storage, built-in projections, catch-up subscriptions, and other features essential for event sourcing. This article uses EventStoreDB as the basis for implementation examples.

Domain Event Design

In an event sourcing system, domain events are the most important contract of the system. Once stored, events are never modified or deleted. Therefore, careful event schema design is essential.

Event Design Principles

  • Name events using past-tense verbs: OrderCreated, PaymentProcessed, ItemShipped
  • Capture business intent: RoomBooked rather than ReservationStatusChanged
  • Events must be self-contained: it should be possible to fully understand what happened from a single event
  • Use only simple types: string, number, boolean, arrays. Do not embed Value Objects directly in events
  • Include a version field: essential for schema evolution

TypeScript Event Definitions

// events/order-events.ts

interface BaseEvent {
  eventType: string
  eventVersion: number
  aggregateId: string
  timestamp: string
  metadata: {
    correlationId: string
    causationId: string
    userId: string
  }
}

interface OrderCreated extends BaseEvent {
  eventType: 'OrderCreated'
  eventVersion: 1
  data: {
    orderId: string
    customerId: string
    items: Array<{
      productId: string
      productName: string
      quantity: number
      unitPrice: number
    }>
    totalAmount: number
    currency: string
    shippingAddress: {
      street: string
      city: string
      zipCode: string
      country: string
    }
  }
}

interface OrderPaymentConfirmed extends BaseEvent {
  eventType: 'OrderPaymentConfirmed'
  eventVersion: 1
  data: {
    orderId: string
    paymentId: string
    amount: number
    method: 'CREDIT_CARD' | 'BANK_TRANSFER' | 'WALLET'
    confirmedAt: string
  }
}

interface OrderCancelled extends BaseEvent {
  eventType: 'OrderCancelled'
  eventVersion: 1
  data: {
    orderId: string
    reason: string
    cancelledBy: string
    refundAmount: number
  }
}

type OrderEvent = OrderCreated | OrderPaymentConfirmed | OrderCancelled

Always include correlationId and causationId in the metadata. In distributed systems where a single user request generates multiple events, troubleshooting becomes impossible without these two fields.

Command Handler and Aggregate Implementation

The command handler is responsible for the write path of an event sourcing system. The Aggregate is the boundary that enforces domain invariants, restoring the current state by replaying events.

TypeScript Aggregate Implementation

// aggregates/order-aggregate.ts

import { EventStoreDBClient, jsonEvent, FORWARDS, START } from '@eventstore/db-client'

interface OrderState {
  orderId: string
  status: 'CREATED' | 'PAID' | 'SHIPPED' | 'CANCELLED'
  customerId: string
  totalAmount: number
  items: Array<{ productId: string; quantity: number; unitPrice: number }>
  version: number
}

class OrderAggregate {
  private state: OrderState
  private pendingEvents: OrderEvent[] = []

  constructor() {
    this.state = {
      orderId: '',
      status: 'CREATED',
      customerId: '',
      totalAmount: 0,
      items: [],
      version: -1,
    }
  }

  // Restore state through event replay
  static async load(client: EventStoreDBClient, orderId: string): Promise<OrderAggregate> {
    const aggregate = new OrderAggregate()
    const streamName = `order-${orderId}`

    const events = client.readStream(streamName, {
      direction: FORWARDS,
      fromRevision: START,
    })

    for await (const resolvedEvent of events) {
      const event = resolvedEvent.event
      if (event) {
        aggregate.apply(event.data as OrderEvent, false)
        aggregate.state.version = Number(resolvedEvent.event!.revision)
      }
    }

    return aggregate
  }

  // Command: Create order
  createOrder(command: {
    orderId: string
    customerId: string
    items: Array<{ productId: string; quantity: number; unitPrice: number }>
  }): void {
    // Invariant validation
    if (this.state.orderId !== '') {
      throw new Error(`Order ${command.orderId} already exists`)
    }
    if (command.items.length === 0) {
      throw new Error('Order must contain at least one item')
    }

    const totalAmount = command.items.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0)

    const event: OrderCreated = {
      eventType: 'OrderCreated',
      eventVersion: 1,
      aggregateId: command.orderId,
      timestamp: new Date().toISOString(),
      metadata: { correlationId: '', causationId: '', userId: '' },
      data: {
        orderId: command.orderId,
        customerId: command.customerId,
        items: command.items.map((i) => ({
          ...i,
          productName: '', // Populated on query
        })),
        totalAmount,
        currency: 'KRW',
        shippingAddress: { street: '', city: '', zipCode: '', country: 'KR' },
      },
    }

    this.apply(event, true)
  }

  // Command: Confirm payment
  confirmPayment(
    paymentId: string,
    amount: number,
    method: 'CREDIT_CARD' | 'BANK_TRANSFER' | 'WALLET'
  ): void {
    if (this.state.status !== 'CREATED') {
      throw new Error(`Cannot confirm payment for order in status: ${this.state.status}`)
    }
    if (amount !== this.state.totalAmount) {
      throw new Error(
        `Payment amount ${amount} does not match order total ${this.state.totalAmount}`
      )
    }

    const event: OrderPaymentConfirmed = {
      eventType: 'OrderPaymentConfirmed',
      eventVersion: 1,
      aggregateId: this.state.orderId,
      timestamp: new Date().toISOString(),
      metadata: { correlationId: '', causationId: '', userId: '' },
      data: {
        orderId: this.state.orderId,
        paymentId,
        amount,
        method,
        confirmedAt: new Date().toISOString(),
      },
    }

    this.apply(event, true)
  }

  // Apply event (state transition)
  private apply(event: OrderEvent, isNew: boolean): void {
    switch (event.eventType) {
      case 'OrderCreated':
        this.state.orderId = event.data.orderId
        this.state.customerId = event.data.customerId
        this.state.totalAmount = event.data.totalAmount
        this.state.items = event.data.items
        this.state.status = 'CREATED'
        break
      case 'OrderPaymentConfirmed':
        this.state.status = 'PAID'
        break
      case 'OrderCancelled':
        this.state.status = 'CANCELLED'
        break
    }

    if (isNew) {
      this.pendingEvents.push(event)
    }
  }

  // Save to event store
  async save(client: EventStoreDBClient): Promise<void> {
    if (this.pendingEvents.length === 0) return

    const streamName = `order-${this.state.orderId}`
    const events = this.pendingEvents.map((e) =>
      jsonEvent({ type: e.eventType, data: e.data, metadata: e.metadata })
    )

    await client.appendToStream(streamName, events, {
      expectedRevision: this.state.version === -1 ? 'no_stream' : BigInt(this.state.version),
    })

    this.pendingEvents = []
  }
}

The expectedRevision parameter is the core of Optimistic Concurrency Control. When two commands are executed simultaneously against the same Aggregate, the first one to save succeeds and the latter receives a WrongExpectedVersionError. The retry logic must reload the events and re-execute the command.

Projection Design and Implementation

Projections are the process of building read models from event streams. In an event sourcing system, projections realize the core advantage of being able to freely create views in any desired shape.

Projection Pattern Classification

  • Inline Projection: Updates the read model simultaneously with event storage. Guarantees strong consistency but degrades write performance.
  • Async Projection: Updates the read model asynchronously through subscriptions. Provides eventual consistency but offers good write performance. Most production systems adopt this approach.
  • Live Projection: Replays events for every request. Always guarantees the latest state, but performance drops sharply when there are many events.

Python Async Projection Implementation

The following is an example of an async projection implementation using EventStoreDB's Persistent Subscription in Python.

# projections/order_summary_projection.py

import asyncio
import json
from datetime import datetime
from esdbclient import EventStoreDBClient, NewEvent, StreamState
from dataclasses import dataclass, asdict
from typing import Optional
import asyncpg

@dataclass
class OrderSummaryReadModel:
    order_id: str
    customer_id: str
    status: str
    total_amount: float
    item_count: int
    created_at: str
    updated_at: str
    payment_method: Optional[str] = None
    cancelled_reason: Optional[str] = None


class OrderSummaryProjection:
    def __init__(self, esdb_client: EventStoreDBClient, pg_pool: asyncpg.Pool):
        self.esdb = esdb_client
        self.pg_pool = pg_pool
        self._checkpoint_interval = 100
        self._processed_count = 0

    async def start(self):
        """Start projection with catch-up subscription"""
        last_position = await self._load_checkpoint()

        subscription = self.esdb.subscribe_to_all(
            from_position=last_position,
            filter_include=[r"order-.*"],  # Subscribe only to streams starting with order-
        )

        for event in subscription:
            try:
                await self._handle_event(event)
                self._processed_count += 1

                # Periodically save checkpoint
                if self._processed_count % self._checkpoint_interval == 0:
                    await self._save_checkpoint(event.commit_position)

            except Exception as e:
                print(f"Projection error at position {event.commit_position}: {e}")
                # Save checkpoint on error and restart
                await self._save_checkpoint(event.commit_position)
                raise

    async def _handle_event(self, event):
        """Route to handler by event type"""
        handler_map = {
            'OrderCreated': self._on_order_created,
            'OrderPaymentConfirmed': self._on_payment_confirmed,
            'OrderCancelled': self._on_order_cancelled,
        }

        handler = handler_map.get(event.type)
        if handler:
            data = json.loads(event.data)
            await handler(data)

    async def _on_order_created(self, data: dict):
        """Handle order created event - INSERT into read model"""
        async with self.pg_pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO order_summary (
                    order_id, customer_id, status, total_amount,
                    item_count, created_at, updated_at
                ) VALUES ($1, $2, $3, $4, $5, $6, $7)
                ON CONFLICT (order_id) DO UPDATE SET
                    status = EXCLUDED.status,
                    updated_at = EXCLUDED.updated_at
            """,
                data['orderId'],
                data['customerId'],
                'CREATED',
                data['totalAmount'],
                len(data['items']),
                datetime.fromisoformat(data.get('createdAt', datetime.now().isoformat())),
                datetime.now(),
            )

    async def _on_payment_confirmed(self, data: dict):
        """Handle payment confirmed event - UPDATE read model"""
        async with self.pg_pool.acquire() as conn:
            await conn.execute("""
                UPDATE order_summary
                SET status = 'PAID',
                    payment_method = $2,
                    updated_at = $3
                WHERE order_id = $1
            """, data['orderId'], data['method'], datetime.now())

    async def _on_order_cancelled(self, data: dict):
        """Handle order cancelled event"""
        async with self.pg_pool.acquire() as conn:
            await conn.execute("""
                UPDATE order_summary
                SET status = 'CANCELLED',
                    cancelled_reason = $2,
                    updated_at = $3
                WHERE order_id = $1
            """, data['orderId'], data['reason'], datetime.now())

    async def _load_checkpoint(self) -> Optional[int]:
        """Load last processed position"""
        async with self.pg_pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT position FROM projection_checkpoints WHERE name = $1",
                'order_summary'
            )
            return row['position'] if row else None

    async def _save_checkpoint(self, position: int):
        """Save processed position"""
        async with self.pg_pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO projection_checkpoints (name, position, updated_at)
                VALUES ($1, $2, NOW())
                ON CONFLICT (name) DO UPDATE SET
                    position = EXCLUDED.position,
                    updated_at = NOW()
            """, 'order_summary', position)

The most important aspect of projection implementation is idempotency. The ON CONFLICT ... DO UPDATE clause in the code above is critical. When a projection fails midway and restarts, it may receive events that have already been processed. Without idempotent handling, data becomes corrupted.

Read Model Table Schema

-- Read model table used by projections
CREATE TABLE order_summary (
    order_id        VARCHAR(36) PRIMARY KEY,
    customer_id     VARCHAR(36) NOT NULL,
    status          VARCHAR(20) NOT NULL,
    total_amount    DECIMAL(15,2) NOT NULL,
    item_count      INTEGER NOT NULL,
    payment_method  VARCHAR(20),
    cancelled_reason TEXT,
    created_at      TIMESTAMP NOT NULL,
    updated_at      TIMESTAMP NOT NULL
);

-- Index optimized for customer-based order queries
CREATE INDEX idx_order_summary_customer ON order_summary(customer_id, status);

-- Index for status-based filtering
CREATE INDEX idx_order_summary_status ON order_summary(status, created_at DESC);

-- Projection checkpoint table
CREATE TABLE projection_checkpoints (
    name        VARCHAR(100) PRIMARY KEY,
    position    BIGINT NOT NULL,
    updated_at  TIMESTAMP NOT NULL
);

Read models are freely designed according to projection requirements. There is no need for normalization. You can run multiple projections simultaneously for customer dashboards, admin statistics, search engine indexing, and more. When a new view is needed, simply add a new projection and replay events from the beginning.

Snapshot Strategy

When an Aggregate accumulates thousands or tens of thousands of events, replaying all events each time becomes inefficient. Snapshots save the Aggregate state at specific points to reduce the replay range.

Snapshot Application Criteria

Introducing snapshots too early only increases complexity. Use the following criteria for judgment:

  • Consider introducing snapshots when the average number of events per Aggregate exceeds 50
  • Snapshots are needed when event replay time exceeds 100ms
  • Snapshots are typically created every 100 to 500 events

TypeScript Snapshot Implementation

// snapshots/snapshot-store.ts

interface Snapshot<T> {
  aggregateId: string
  aggregateType: string
  version: number // Aggregate version at snapshot time
  schemaVersion: number // Snapshot schema version (for upgrade detection)
  state: T
  createdAt: string
}

class SnapshotStore {
  private client: EventStoreDBClient
  private snapshotInterval: number

  constructor(client: EventStoreDBClient, snapshotInterval = 200) {
    this.client = client
    this.snapshotInterval = snapshotInterval
  }

  async saveSnapshot<T>(snapshot: Snapshot<T>): Promise<void> {
    const streamName = `snapshot-${snapshot.aggregateType}-${snapshot.aggregateId}`
    const event = jsonEvent({
      type: 'Snapshot',
      data: snapshot,
    })

    // Keep only the latest snapshot in the snapshot stream (maxCount setting)
    await this.client.appendToStream(streamName, [event])
    await this.client.setStreamMetadata(streamName, {
      maxCount: 3, // Keep only the 3 most recent for rollback capability
    })
  }

  async loadSnapshot<T>(
    aggregateType: string,
    aggregateId: string,
    currentSchemaVersion: number
  ): Promise<Snapshot<T> | null> {
    const streamName = `snapshot-${aggregateType}-${aggregateId}`

    try {
      const events = this.client.readStream(streamName, {
        direction: BACKWARDS,
        fromRevision: END,
        maxCount: 1,
      })

      for await (const resolved of events) {
        const snapshot = resolved.event!.data as Snapshot<T>

        // Ignore snapshot if schema version differs (full event replay)
        if (snapshot.schemaVersion !== currentSchemaVersion) {
          console.warn(
            `Snapshot schema mismatch for ${aggregateId}: ` +
              `expected ${currentSchemaVersion}, got ${snapshot.schemaVersion}. ` +
              `Replaying all events.`
          )
          return null
        }

        return snapshot
      }
    } catch (error) {
      // Return null if snapshot stream doesn't exist
      return null
    }

    return null
  }

  shouldTakeSnapshot(currentVersion: number, lastSnapshotVersion: number): boolean {
    return currentVersion - lastSnapshotVersion >= this.snapshotInterval
  }
}

Pay attention to the snapshot's schemaVersion field. When the Aggregate structure changes, existing snapshots are no longer valid. If the schemaVersion mismatches, the system ignores the snapshot and replays all events to generate a new snapshot. This is the advantage of event sourcing -- events are immutable, so they can always be replayed.

Note: Snapshots are a performance optimization technique, not a required component. The system must function correctly without snapshots. Implement automatic fallback to full event replay when snapshots are corrupted or invalidated.

Event Versioning

As production systems evolve, event schemas also change. Since stored events are never modified in event sourcing, a strategy for safely handling schema changes is needed.

Versioning Strategy Comparison

StrategyDescriptionSuitable ScenarioRisk
Weak SchemaUse default values for new fields, compatible with existing eventsWhen only field additions are neededLow
UpcastingTransform during event deserialization via middlewareField name changes, type changesMedium
New Event TypeDefine completely new event typeWhen the event meaning itself changesLow
Copy-Replace StreamClone stream with new schema and replaceLarge-scale schema migrationHigh

Upcasting Implementation Example

The most practical strategy is Upcasting. It places middleware that converts previous versions to the current version during the event deserialization process.

// versioning/event-upcaster.ts

type Upcaster = (event: any) => any

// Register upcasters per version
const upcasters: Map<string, Map<number, Upcaster>> = new Map()

// OrderCreated v1 -> v2: Changed shippingAddress to structured object
upcasters.set(
  'OrderCreated',
  new Map([
    [
      1,
      (event: any) => {
        // When shippingAddress was a single string in v1
        const address =
          typeof event.data.shippingAddress === 'string'
            ? {
                street: event.data.shippingAddress,
                city: 'UNKNOWN',
                zipCode: 'UNKNOWN',
                country: 'KR',
              }
            : event.data.shippingAddress

        return {
          ...event,
          eventVersion: 2,
          data: {
            ...event.data,
            shippingAddress: address,
            // Apply default value for fields added in v2
            orderSource: event.data.orderSource ?? 'WEB',
          },
        }
      },
    ],
  ])
)

function upcastEvent(event: any): any {
  const eventUpcasters = upcasters.get(event.eventType)
  if (!eventUpcasters) return event

  let current = event
  const targetVersion = Math.max(...Array.from(eventUpcasters.keys())) + 1

  // Apply sequentially from current version to latest version
  for (let v = current.eventVersion; v < targetVersion; v++) {
    const upcaster = eventUpcasters.get(v)
    if (upcaster) {
      current = upcaster(current)
    }
  }

  return current
}

// Usage example
// const rawEvent = loadFromEventStore();
// const currentEvent = upcastEvent(rawEvent);
// aggregate.apply(currentEvent);

The most important principle in event versioning is to never modify existing events. Data stored in the event store is immutable. Instead, transform at read time (Upcasting) or define new event types.

Disaster Recovery Procedures

Disaster recovery in event sourcing systems differs from traditional systems. Since the event store is the Single Source of Truth, projections can be rebuilt at any time.

Recovering from Projection Corruption

This procedure is used when a projection contains incorrect data or when there was a bug in the projection logic.

  1. Stop the projection service: Stop the subscription for the affected projection
  2. DROP or TRUNCATE the read model table: Remove existing incorrect data
  3. Reset the checkpoint: Delete the position for the affected projection from the projection_checkpoints table
  4. Restart the projection service: Rebuild the read model by replaying all events from the beginning of the event store
  5. Verify rebuild completion: Confirm that the projection has caught up to the current position

Warning: If there are hundreds of millions of events, rebuilding may take several hours. Design a projection architecture capable of parallel processing in advance, or prepare to rebuild by partition.

Event Store Cluster Failure

EventStoreDB uses a leader-follower architecture. When the leader node goes down, one of the followers is automatically promoted to leader. Points to note during this process:

  • Write failure handling: Retry writes that fail during leader transitions. Idempotency is guaranteed by expectedRevision
  • Subscription reconnection: Persistent Subscriptions reconnect automatically, but Catch-up Subscriptions must be manually reconnected from the last checkpoint
  • Split-brain prevention: Operate a minimum 3-node cluster to enable quorum-based leader election

Common Failure Scenarios and Responses

Failure ScenarioCauseResponse
Projection data inconsistencyProjection logic bugRebuild projection (full event replay)
Aggregate load failureEvent stream corruptionSnapshot fallback then partial replay, check cluster replicas
Concurrent write conflictSimultaneous Aggregate modificationCatch WrongExpectedVersionError and retry
Subscription lag (Consumer Lag)Insufficient projection processing speedPartitioning or horizontal scaling of projection instances
Event store disk fullEvent growthApply archiving policies, set stream maxAge/maxCount
Snapshot schema mismatchDeployment after Aggregate structure changeAutomatic fallback to full event replay

Operational Checklist

Items that must be verified before production deployment.

Design Phase Checklist

  • Does the event schema include an eventVersion field
  • Does the event metadata include correlationId and causationId
  • Is the Aggregate's invariant validation logic complete
  • Are error responses clear when commands fail
  • Is CQRS truly needed for this domain (reconfirm it's not simple CRUD)

Implementation Phase Checklist

  • Are projection handlers idempotent
  • Is optimistic concurrency control implemented (expectedRevision)
  • Are event serialization/deserialization tests written
  • Does the upcaster correctly convert previous version events
  • Does it fall back to full event replay when snapshot schema versions mismatch

Operations Phase Checklist

  • Is the event store cluster configured with 3 or more nodes
  • Is projection consumer lag monitoring set up
  • Are event store disk usage alerts configured
  • Is the projection rebuild procedure documented
  • Are snapshot creation frequency and retention policies configured
  • Is a Dead Letter Queue (DLQ) configured (for isolating unprocessable events)
  • Are manual event correction scripts prepared for failure scenarios

Common Mistakes and Anti-patterns

Here is a summary of frequently occurring mistakes in practice. Most causes of event sourcing project failures are found here.

Anti-pattern 1: Storing the Entire Current State in Events

// BAD: Putting the entire state in an event is no different from CRUD
interface OrderUpdated {
  eventType: 'OrderUpdated'
  data: {
    order: Order // Entire Order object
  }
}

// GOOD: Record only what changed as events
interface OrderItemAdded {
  eventType: 'OrderItemAdded'
  data: {
    orderId: string
    productId: string
    quantity: number
    unitPrice: number
  }
}

Anti-pattern 2: Calling External Services from Projections

If a projection handler calls external APIs, side effects occur during event replay. Projections should update the read model using only event data, like pure functions. Logic requiring external service calls should be separated into dedicated Policy handlers or Sagas.

Anti-pattern 3: Applying Event Sourcing to Every Domain

Event sourcing should be applied to domains where state change history has business value. Applying event sourcing to simple CRUD domains like user settings or code tables only increases complexity with no benefit. The correct strategy is to selectively apply it only to the system's Core Domain.

Anti-pattern 4: Accumulating 100,000+ Events in a Stream

When a single Aggregate stream accumulates over 100,000 events, loading time exceeds several seconds. Snapshots can mitigate this, but the root cause is poorly designed Aggregate boundaries. Consider splitting the Aggregate into smaller units or reviewing the periodic event generation pattern.

Event Sourcing vs Traditional CRUD Decision Criteria

Finally, here is a comparison table for reference when deciding whether to adopt event sourcing.

CriteriaTraditional CRUDEvent Sourcing
Current state queryingSimple (direct read)Complex (event replay or projection)
Change history trackingRequires separate audit logsAutomatically captured (events are history)
Schema changesALTER TABLE migrationEvent versioning + Upcasting
DebuggingOnly current state availableFull history replay for root cause analysis
Data consistencyACID transactionsAggregate-level consistency + eventual consistency
Storage spaceOnly current state storedAll events accumulated (higher storage cost)
Read performanceSufficient with index optimizationProjection design required
Initial development speedFastSlow (learning curve, infrastructure setup)
Complex domain adaptabilityLimited as domain complexity growsExtensibility through event-centric modeling
Team skill requirementsGeneralDDD and event modeling experience required

Event sourcing is an excellent choice for domains where change history itself has business value, such as financial transactions, logistics tracking, medical records, and collaboration tools. For other cases, it is more practical to maintain traditional CRUD and add audit logs only where needed.

Summary

Event sourcing and CQRS are powerful but complex patterns. To successfully adopt them, remember the following:

  • Invest the most time in event schema design. Events persist forever.
  • Projections must be implemented idempotently. Event replay must be safe for operations to be possible.
  • Snapshots are a performance optimization tool, not a required architectural component. Introduce them when needed.
  • Establish event versioning strategies early in the project. Adding them later is painful.
  • Do not apply to every domain. Apply selectively to core domains only.

References