Skip to content
Published on

이벤트 소싱과 CQRS 패턴 실전 구현 가이드: 설계부터 운영까지

Authors
  • Name
    Twitter
이벤트 소싱과 CQRS 패턴 실전 구현 가이드: 설계부터 운영까지

왜 이벤트 소싱과 CQRS를 함께 다루는가

이벤트 소싱(Event Sourcing)과 CQRS(Command Query Responsibility Segregation)는 독립적인 패턴이지만, 프로덕션 환경에서는 거의 항상 함께 사용된다. 이벤트 소싱으로 상태 변경의 전체 이력을 이벤트 스트림에 기록하면, 현재 상태를 이벤트 재생(replay)으로만 복원해야 하기 때문에 읽기 성능이 급격히 떨어진다. CQRS로 읽기 전용 프로젝션(Read Model)을 분리하면 이 문제가 해결된다.

이 글은 개념 설명에 머무르지 않는다. EventStoreDB 기반 이벤트 저장소 구축, TypeScript와 Python으로 커맨드 핸들러와 프로젝션을 구현하고, 스냅샷 전략, 이벤트 스키마 버전 관리, 장애 복구 절차까지 운영 레벨의 내용을 모두 다룬다. 2025년 이후 EventStoreDB가 Kurrent로 리브랜딩된 상황과 최신 gRPC 클라이언트 API도 반영했다.

아키텍처 전체 구조

이벤트 소싱 + CQRS 시스템의 전체 흐름은 다음과 같다.

  1. 클라이언트가 커맨드(Command)를 전송한다
  2. 커맨드 핸들러가 도메인 불변식(invariant)을 검증한다
  3. 검증을 통과하면 도메인 이벤트를 생성하고 이벤트 스토어에 append한다
  4. 이벤트 스토어가 **구독자(Subscriber)**에게 이벤트를 푸시한다
  5. 프로젝션 핸들러가 이벤트를 수신하여 **읽기 모델(Read Model)**을 갱신한다
  6. 쿼리 핸들러가 읽기 모델에서 데이터를 조회하여 클라이언트에 반환한다

핵심 원칙: 쓰기 경로(Command Path)와 읽기 경로(Query Path)는 완전히 분리된다. 쓰기 측은 이벤트 스토어에만 쓰고, 읽기 측은 프로젝션된 읽기 모델에서만 조회한다.

이벤트 스토어 솔루션 비교

프로덕션에 사용 가능한 이벤트 스토어 솔루션을 비교한다. 기술 스택과 운영 환경에 맞는 선택이 중요하다.

솔루션언어 생태계저장 방식프로젝션 내장라이선스운영 복잡도
EventStoreDB (Kurrent)언어 무관 (gRPC)전용 파일 시스템있음 (JS 기반)BSL (상용 무료)중간
Axon ServerJVM (Java/Kotlin)전용 스토리지없음 (Framework에서 처리)오픈소스 + Enterprise중간
Marten.NET (C#)PostgreSQL있음 (C# 기반)MIT낮음
EventSourcingDB언어 무관 (gRPC)전용 엔진없음오픈소스낮음
PostgreSQL + 직접 구현언어 무관RDBMS없음 (직접 구현)오픈소스높음
MongoDB + 직접 구현언어 무관Document DB없음 (직접 구현)SSPL높음

EventStoreDB는 Greg Young이 설계한 전용 이벤트 스토어로, 스트림 단위 append-only 저장, 내장 프로젝션, catch-up 구독 등 이벤트 소싱에 필요한 기능을 네이티브로 제공한다. 이 글에서는 EventStoreDB를 기준으로 구현 예제를 작성한다.

도메인 이벤트 설계

이벤트 소싱 시스템에서 도메인 이벤트는 시스템의 가장 중요한 계약(contract)이다. 한번 저장된 이벤트는 절대 수정하거나 삭제하지 않는다. 따라서 이벤트 스키마 설계에 신중해야 한다.

이벤트 설계 원칙

  • 과거형 동사로 명명한다: OrderCreated, PaymentProcessed, ItemShipped
  • 비즈니스 의도를 담는다: ReservationStatusChanged가 아닌 RoomBooked
  • **자기 완결적(self-contained)**이어야 한다: 이벤트 하나로 무슨 일이 일어났는지 완전히 파악 가능해야 한다
  • 단순 타입만 사용한다: string, number, boolean, 배열. Value Object를 이벤트에 직접 넣지 않는다
  • 버전 필드를 포함한다: 스키마 진화에 필수

TypeScript 이벤트 정의

// events/order-events.ts

interface BaseEvent {
  eventType: string
  eventVersion: number
  aggregateId: string
  timestamp: string
  metadata: {
    correlationId: string
    causationId: string
    userId: string
  }
}

interface OrderCreated extends BaseEvent {
  eventType: 'OrderCreated'
  eventVersion: 1
  data: {
    orderId: string
    customerId: string
    items: Array<{
      productId: string
      productName: string
      quantity: number
      unitPrice: number
    }>
    totalAmount: number
    currency: string
    shippingAddress: {
      street: string
      city: string
      zipCode: string
      country: string
    }
  }
}

interface OrderPaymentConfirmed extends BaseEvent {
  eventType: 'OrderPaymentConfirmed'
  eventVersion: 1
  data: {
    orderId: string
    paymentId: string
    amount: number
    method: 'CREDIT_CARD' | 'BANK_TRANSFER' | 'WALLET'
    confirmedAt: string
  }
}

interface OrderCancelled extends BaseEvent {
  eventType: 'OrderCancelled'
  eventVersion: 1
  data: {
    orderId: string
    reason: string
    cancelledBy: string
    refundAmount: number
  }
}

type OrderEvent = OrderCreated | OrderPaymentConfirmed | OrderCancelled

correlationIdcausationId를 메타데이터에 반드시 포함시킨다. 분산 시스템에서 하나의 사용자 요청이 여러 이벤트를 발생시킬 때, 이 두 필드가 없으면 장애 추적이 불가능하다.

커맨드 핸들러와 Aggregate 구현

커맨드 핸들러는 이벤트 소싱 시스템의 쓰기 경로를 담당한다. Aggregate는 도메인 불변식을 지키는 경계이며, 이벤트를 재생하여 현재 상태를 복원한다.

TypeScript Aggregate 구현

// aggregates/order-aggregate.ts

import { EventStoreDBClient, jsonEvent, FORWARDS, START } from '@eventstore/db-client'

interface OrderState {
  orderId: string
  status: 'CREATED' | 'PAID' | 'SHIPPED' | 'CANCELLED'
  customerId: string
  totalAmount: number
  items: Array<{ productId: string; quantity: number; unitPrice: number }>
  version: number
}

class OrderAggregate {
  private state: OrderState
  private pendingEvents: OrderEvent[] = []

  constructor() {
    this.state = {
      orderId: '',
      status: 'CREATED',
      customerId: '',
      totalAmount: 0,
      items: [],
      version: -1,
    }
  }

  // 이벤트 재생으로 상태 복원
  static async load(client: EventStoreDBClient, orderId: string): Promise<OrderAggregate> {
    const aggregate = new OrderAggregate()
    const streamName = `order-${orderId}`

    const events = client.readStream(streamName, {
      direction: FORWARDS,
      fromRevision: START,
    })

    for await (const resolvedEvent of events) {
      const event = resolvedEvent.event
      if (event) {
        aggregate.apply(event.data as OrderEvent, false)
        aggregate.state.version = Number(resolvedEvent.event!.revision)
      }
    }

    return aggregate
  }

  // 커맨드: 주문 생성
  createOrder(command: {
    orderId: string
    customerId: string
    items: Array<{ productId: string; quantity: number; unitPrice: number }>
  }): void {
    // 불변식 검증
    if (this.state.orderId !== '') {
      throw new Error(`Order ${command.orderId} already exists`)
    }
    if (command.items.length === 0) {
      throw new Error('Order must contain at least one item')
    }

    const totalAmount = command.items.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0)

    const event: OrderCreated = {
      eventType: 'OrderCreated',
      eventVersion: 1,
      aggregateId: command.orderId,
      timestamp: new Date().toISOString(),
      metadata: { correlationId: '', causationId: '', userId: '' },
      data: {
        orderId: command.orderId,
        customerId: command.customerId,
        items: command.items.map((i) => ({
          ...i,
          productName: '', // 조회 시 채움
        })),
        totalAmount,
        currency: 'KRW',
        shippingAddress: { street: '', city: '', zipCode: '', country: 'KR' },
      },
    }

    this.apply(event, true)
  }

  // 커맨드: 결제 확인
  confirmPayment(
    paymentId: string,
    amount: number,
    method: 'CREDIT_CARD' | 'BANK_TRANSFER' | 'WALLET'
  ): void {
    if (this.state.status !== 'CREATED') {
      throw new Error(`Cannot confirm payment for order in status: ${this.state.status}`)
    }
    if (amount !== this.state.totalAmount) {
      throw new Error(
        `Payment amount ${amount} does not match order total ${this.state.totalAmount}`
      )
    }

    const event: OrderPaymentConfirmed = {
      eventType: 'OrderPaymentConfirmed',
      eventVersion: 1,
      aggregateId: this.state.orderId,
      timestamp: new Date().toISOString(),
      metadata: { correlationId: '', causationId: '', userId: '' },
      data: {
        orderId: this.state.orderId,
        paymentId,
        amount,
        method,
        confirmedAt: new Date().toISOString(),
      },
    }

    this.apply(event, true)
  }

  // 이벤트 적용 (상태 전이)
  private apply(event: OrderEvent, isNew: boolean): void {
    switch (event.eventType) {
      case 'OrderCreated':
        this.state.orderId = event.data.orderId
        this.state.customerId = event.data.customerId
        this.state.totalAmount = event.data.totalAmount
        this.state.items = event.data.items
        this.state.status = 'CREATED'
        break
      case 'OrderPaymentConfirmed':
        this.state.status = 'PAID'
        break
      case 'OrderCancelled':
        this.state.status = 'CANCELLED'
        break
    }

    if (isNew) {
      this.pendingEvents.push(event)
    }
  }

  // 이벤트 스토어에 저장
  async save(client: EventStoreDBClient): Promise<void> {
    if (this.pendingEvents.length === 0) return

    const streamName = `order-${this.state.orderId}`
    const events = this.pendingEvents.map((e) =>
      jsonEvent({ type: e.eventType, data: e.data, metadata: e.metadata })
    )

    await client.appendToStream(streamName, events, {
      expectedRevision: this.state.version === -1 ? 'no_stream' : BigInt(this.state.version),
    })

    this.pendingEvents = []
  }
}

expectedRevision 파라미터가 낙관적 동시성 제어(Optimistic Concurrency Control)의 핵심이다. 같은 Aggregate에 대해 두 커맨드가 동시에 실행되면, 먼저 저장한 쪽이 성공하고 나중 쪽은 WrongExpectedVersionError를 받는다. 재시도 로직에서 이벤트를 다시 로드하고 커맨드를 재실행해야 한다.

프로젝션 설계와 구현

프로젝션은 이벤트 스트림에서 읽기 모델을 구축하는 과정이다. 이벤트 소싱 시스템에서 프로젝션은 "원하는 형태의 뷰를 자유롭게 만들 수 있다"는 핵심 장점을 실현하는 부분이다.

프로젝션 패턴 분류

  • 인라인 프로젝션(Inline Projection): 이벤트 저장과 동시에 읽기 모델을 갱신한다. 강한 일관성을 보장하지만 쓰기 성능이 떨어진다.
  • 비동기 프로젝션(Async Projection): 구독(Subscription)을 통해 비동기로 읽기 모델을 갱신한다. 최종 일관성(Eventual Consistency)이지만 쓰기 성능이 좋다. 대부분의 프로덕션 시스템이 이 방식을 채택한다.
  • 라이브 프로젝션(Live Projection): 매 요청마다 이벤트를 재생한다. 항상 최신 상태를 보장하지만, 이벤트가 많으면 성능이 급격히 떨어진다.

Python 비동기 프로젝션 구현

다음은 Python으로 EventStoreDB의 Persistent Subscription을 이용한 비동기 프로젝션 구현 예제다.

# projections/order_summary_projection.py

import asyncio
import json
from datetime import datetime
from esdbclient import EventStoreDBClient, NewEvent, StreamState
from dataclasses import dataclass, asdict
from typing import Optional
import asyncpg

@dataclass
class OrderSummaryReadModel:
    order_id: str
    customer_id: str
    status: str
    total_amount: float
    item_count: int
    created_at: str
    updated_at: str
    payment_method: Optional[str] = None
    cancelled_reason: Optional[str] = None


class OrderSummaryProjection:
    def __init__(self, esdb_client: EventStoreDBClient, pg_pool: asyncpg.Pool):
        self.esdb = esdb_client
        self.pg_pool = pg_pool
        self._checkpoint_interval = 100
        self._processed_count = 0

    async def start(self):
        """catch-up 구독으로 프로젝션 시작"""
        last_position = await self._load_checkpoint()

        subscription = self.esdb.subscribe_to_all(
            from_position=last_position,
            filter_include=[r"order-.*"],  # order- 로 시작하는 스트림만 구독
        )

        for event in subscription:
            try:
                await self._handle_event(event)
                self._processed_count += 1

                # 주기적으로 체크포인트 저장
                if self._processed_count % self._checkpoint_interval == 0:
                    await self._save_checkpoint(event.commit_position)

            except Exception as e:
                print(f"Projection error at position {event.commit_position}: {e}")
                # 에러 발생 시 체크포인트까지 저장하고 재시작
                await self._save_checkpoint(event.commit_position)
                raise

    async def _handle_event(self, event):
        """이벤트 타입별 핸들러 라우팅"""
        handler_map = {
            'OrderCreated': self._on_order_created,
            'OrderPaymentConfirmed': self._on_payment_confirmed,
            'OrderCancelled': self._on_order_cancelled,
        }

        handler = handler_map.get(event.type)
        if handler:
            data = json.loads(event.data)
            await handler(data)

    async def _on_order_created(self, data: dict):
        """주문 생성 이벤트 처리 - 읽기 모델 INSERT"""
        async with self.pg_pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO order_summary (
                    order_id, customer_id, status, total_amount,
                    item_count, created_at, updated_at
                ) VALUES ($1, $2, $3, $4, $5, $6, $7)
                ON CONFLICT (order_id) DO UPDATE SET
                    status = EXCLUDED.status,
                    updated_at = EXCLUDED.updated_at
            """,
                data['orderId'],
                data['customerId'],
                'CREATED',
                data['totalAmount'],
                len(data['items']),
                datetime.fromisoformat(data.get('createdAt', datetime.now().isoformat())),
                datetime.now(),
            )

    async def _on_payment_confirmed(self, data: dict):
        """결제 확인 이벤트 처리 - 읽기 모델 UPDATE"""
        async with self.pg_pool.acquire() as conn:
            await conn.execute("""
                UPDATE order_summary
                SET status = 'PAID',
                    payment_method = $2,
                    updated_at = $3
                WHERE order_id = $1
            """, data['orderId'], data['method'], datetime.now())

    async def _on_order_cancelled(self, data: dict):
        """주문 취소 이벤트 처리"""
        async with self.pg_pool.acquire() as conn:
            await conn.execute("""
                UPDATE order_summary
                SET status = 'CANCELLED',
                    cancelled_reason = $2,
                    updated_at = $3
                WHERE order_id = $1
            """, data['orderId'], data['reason'], datetime.now())

    async def _load_checkpoint(self) -> Optional[int]:
        """마지막 처리 위치 로드"""
        async with self.pg_pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT position FROM projection_checkpoints WHERE name = $1",
                'order_summary'
            )
            return row['position'] if row else None

    async def _save_checkpoint(self, position: int):
        """처리 위치 저장"""
        async with self.pg_pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO projection_checkpoints (name, position, updated_at)
                VALUES ($1, $2, NOW())
                ON CONFLICT (name) DO UPDATE SET
                    position = EXCLUDED.position,
                    updated_at = NOW()
            """, 'order_summary', position)

프로젝션 구현에서 가장 중요한 것은 **멱등성(Idempotency)**이다. 위 코드의 ON CONFLICT ... DO UPDATE 구문이 핵심이다. 프로젝션이 중간에 실패하고 재시작하면, 이미 처리한 이벤트를 다시 받을 수 있다. 멱등하게 처리하지 않으면 데이터가 꼬인다.

읽기 모델 테이블 스키마

-- 프로젝션이 사용하는 읽기 모델 테이블
CREATE TABLE order_summary (
    order_id        VARCHAR(36) PRIMARY KEY,
    customer_id     VARCHAR(36) NOT NULL,
    status          VARCHAR(20) NOT NULL,
    total_amount    DECIMAL(15,2) NOT NULL,
    item_count      INTEGER NOT NULL,
    payment_method  VARCHAR(20),
    cancelled_reason TEXT,
    created_at      TIMESTAMP NOT NULL,
    updated_at      TIMESTAMP NOT NULL
);

-- 고객별 주문 조회 최적화 인덱스
CREATE INDEX idx_order_summary_customer ON order_summary(customer_id, status);

-- 상태별 필터링 인덱스
CREATE INDEX idx_order_summary_status ON order_summary(status, created_at DESC);

-- 프로젝션 체크포인트 테이블
CREATE TABLE projection_checkpoints (
    name        VARCHAR(100) PRIMARY KEY,
    position    BIGINT NOT NULL,
    updated_at  TIMESTAMP NOT NULL
);

읽기 모델은 프로젝션 요구사항에 맞게 자유롭게 설계한다. 정규화할 필요가 없다. 고객별 대시보드용, 관리자 통계용, 검색 엔진 인덱싱용 등 여러 프로젝션을 동시에 운영할 수 있다. 새로운 뷰가 필요하면 새 프로젝션을 추가하고 이벤트를 처음부터 재생하면 된다.

스냅샷 전략

Aggregate에 이벤트가 수천, 수만 건 쌓이면 매번 전체 이벤트를 재생하는 것은 비효율적이다. 스냅샷은 특정 시점의 Aggregate 상태를 저장하여 재생 범위를 줄인다.

스냅샷 적용 기준

스냅샷을 너무 일찍 도입하면 복잡도만 올라간다. 다음 기준으로 판단한다.

  • Aggregate당 평균 이벤트 수가 50개 이상이면 스냅샷 도입을 검토한다
  • 이벤트 재생 시간이 100ms를 초과하면 스냅샷이 필요하다
  • 스냅샷 주기는 보통 매 100~500 이벤트마다 생성한다

TypeScript 스냅샷 구현

// snapshots/snapshot-store.ts

interface Snapshot<T> {
  aggregateId: string
  aggregateType: string
  version: number // 스냅샷 시점의 Aggregate 버전
  schemaVersion: number // 스냅샷 스키마 버전 (업그레이드 감지용)
  state: T
  createdAt: string
}

class SnapshotStore {
  private client: EventStoreDBClient
  private snapshotInterval: number

  constructor(client: EventStoreDBClient, snapshotInterval = 200) {
    this.client = client
    this.snapshotInterval = snapshotInterval
  }

  async saveSnapshot<T>(snapshot: Snapshot<T>): Promise<void> {
    const streamName = `snapshot-${snapshot.aggregateType}-${snapshot.aggregateId}`
    const event = jsonEvent({
      type: 'Snapshot',
      data: snapshot,
    })

    // 스냅샷 스트림은 최신 1개만 유지 (maxCount 설정)
    await this.client.appendToStream(streamName, [event])
    await this.client.setStreamMetadata(streamName, {
      maxCount: 3, // 최근 3개만 유지하여 롤백 가능성 확보
    })
  }

  async loadSnapshot<T>(
    aggregateType: string,
    aggregateId: string,
    currentSchemaVersion: number
  ): Promise<Snapshot<T> | null> {
    const streamName = `snapshot-${aggregateType}-${aggregateId}`

    try {
      const events = this.client.readStream(streamName, {
        direction: BACKWARDS,
        fromRevision: END,
        maxCount: 1,
      })

      for await (const resolved of events) {
        const snapshot = resolved.event!.data as Snapshot<T>

        // 스키마 버전이 다르면 스냅샷 무시 (전체 이벤트 재생)
        if (snapshot.schemaVersion !== currentSchemaVersion) {
          console.warn(
            `Snapshot schema mismatch for ${aggregateId}: ` +
              `expected ${currentSchemaVersion}, got ${snapshot.schemaVersion}. ` +
              `Replaying all events.`
          )
          return null
        }

        return snapshot
      }
    } catch (error) {
      // 스냅샷 스트림이 없으면 null 반환
      return null
    }

    return null
  }

  shouldTakeSnapshot(currentVersion: number, lastSnapshotVersion: number): boolean {
    return currentVersion - lastSnapshotVersion >= this.snapshotInterval
  }
}

스냅샷의 schemaVersion 필드에 주목해야 한다. Aggregate 구조가 변경되면 기존 스냅샷은 더 이상 유효하지 않다. schemaVersion이 불일치하면 스냅샷을 무시하고 전체 이벤트를 재생하여 새 스냅샷을 생성한다. 이것이 이벤트 소싱의 장점이다. 이벤트는 불변이므로 언제든 다시 재생할 수 있다.

주의: 스냅샷은 성능 최적화 기법이지, 필수 요소가 아니다. 스냅샷 없이도 시스템은 정상 동작해야 한다. 스냅샷이 손상되거나 무효화되면 전체 이벤트 재생으로 자동 폴백되도록 구현한다.

이벤트 버전 관리 (Event Versioning)

프로덕션 시스템이 진화하면 이벤트 스키마도 변경된다. 이벤트 소싱에서 저장된 이벤트는 절대 수정하지 않으므로, 스키마 변경을 안전하게 처리하는 전략이 필요하다.

버전 관리 전략 비교

전략설명적합한 상황위험도
Weak Schema새 필드 추가 시 기본값 사용, 기존 이벤트 호환필드 추가만 필요한 경우낮음
Upcasting이벤트 역직렬화 시 미들웨어에서 변환필드 이름 변경, 타입 변경중간
New Event Type완전히 새로운 이벤트 타입 정의이벤트 의미 자체가 변경낮음
Copy-Replace Stream스트림을 새 스키마로 복제 후 교체대규모 스키마 마이그레이션높음

Upcasting 구현 예제

가장 실용적인 전략은 Upcasting이다. 이벤트를 역직렬화하는 과정에서 이전 버전을 현재 버전으로 변환하는 미들웨어를 둔다.

// versioning/event-upcaster.ts

type Upcaster = (event: any) => any

// 버전별 업캐스터 등록
const upcasters: Map<string, Map<number, Upcaster>> = new Map()

// OrderCreated v1 -> v2: shippingAddress를 구조화된 객체로 변경
upcasters.set(
  'OrderCreated',
  new Map([
    [
      1,
      (event: any) => {
        // v1에서 shippingAddress가 단일 문자열이었던 경우
        const address =
          typeof event.data.shippingAddress === 'string'
            ? {
                street: event.data.shippingAddress,
                city: 'UNKNOWN',
                zipCode: 'UNKNOWN',
                country: 'KR',
              }
            : event.data.shippingAddress

        return {
          ...event,
          eventVersion: 2,
          data: {
            ...event.data,
            shippingAddress: address,
            // v2에서 추가된 필드에 기본값 적용
            orderSource: event.data.orderSource ?? 'WEB',
          },
        }
      },
    ],
  ])
)

function upcastEvent(event: any): any {
  const eventUpcasters = upcasters.get(event.eventType)
  if (!eventUpcasters) return event

  let current = event
  const targetVersion = Math.max(...Array.from(eventUpcasters.keys())) + 1

  // 현재 버전부터 최신 버전까지 순차 적용
  for (let v = current.eventVersion; v < targetVersion; v++) {
    const upcaster = eventUpcasters.get(v)
    if (upcaster) {
      current = upcaster(current)
    }
  }

  return current
}

// 사용 예
// const rawEvent = loadFromEventStore();
// const currentEvent = upcastEvent(rawEvent);
// aggregate.apply(currentEvent);

이벤트 버전 관리에서 가장 중요한 원칙은 절대 기존 이벤트를 수정하지 않는 것이다. 이벤트 스토어에 저장된 데이터는 불변이다. 대신 읽는 시점에 변환(Upcasting)하거나, 새 이벤트 타입을 정의한다.

장애 복구 절차

이벤트 소싱 시스템에서 장애 복구는 전통적 시스템과 다르다. 이벤트 스토어가 진실의 원천(Single Source of Truth)이므로, 프로젝션은 언제든 재구축할 수 있다.

프로젝션 손상 시 복구

프로젝션이 잘못된 데이터를 포함하거나, 프로젝션 로직에 버그가 있었을 때 사용하는 절차다.

  1. 프로젝션 서비스 중지: 해당 프로젝션의 구독을 중지한다
  2. 읽기 모델 테이블 DROP 또는 TRUNCATE: 기존 잘못된 데이터를 제거한다
  3. 체크포인트 초기화: projection_checkpoints 테이블에서 해당 프로젝션의 position을 삭제한다
  4. 프로젝션 서비스 재시작: 이벤트 스토어의 처음부터 모든 이벤트를 재생하여 읽기 모델을 재구축한다
  5. 재구축 완료 확인: 프로젝션이 현재 위치까지 따라잡았는지(caught-up) 확인한다

경고: 이벤트가 수억 건이면 재구축에 수 시간이 걸릴 수 있다. 병렬 처리 가능한 프로젝션 아키텍처를 미리 설계하거나, 파티션 단위로 재구축할 수 있도록 준비한다.

이벤트 스토어 클러스터 장애 시

EventStoreDB는 리더-팔로워 아키텍처를 사용한다. 리더 노드가 다운되면 팔로워 중 하나가 자동으로 리더로 승격된다. 이 과정에서 주의할 점은 다음과 같다.

  • 쓰기 실패 처리: 리더 전환 중 쓰기가 실패하면 재시도한다. expectedRevision으로 멱등성이 보장된다
  • 구독 재연결: Persistent Subscription은 자동 재연결되지만, Catch-up Subscription은 마지막 체크포인트부터 수동 재연결해야 한다
  • Split-brain 방지: 최소 3노드 클러스터를 운영하여 과반수(quorum) 기반 리더 선출이 가능하도록 한다

일반적인 장애 시나리오와 대응

장애 시나리오원인대응 방법
프로젝션 데이터 불일치프로젝션 로직 버그프로젝션 재구축 (이벤트 전체 재생)
Aggregate 로드 실패이벤트 스트림 손상스냅샷 폴백 후 부분 재생, 클러스터 복제본 확인
동시 쓰기 충돌같은 Aggregate 동시 수정WrongExpectedVersionError 캐치 후 재시도
구독 지연(Consumer Lag)프로젝션 처리 속도 부족파티셔닝 또는 프로젝션 인스턴스 수평 확장
이벤트 스토어 디스크 풀이벤트 증가아카이빙 정책 적용, 스트림 maxAge/maxCount 설정
스냅샷 스키마 불일치Aggregate 구조 변경 후 배포자동 폴백으로 전체 이벤트 재생

운영 체크리스트

프로덕션 배포 전 반드시 확인해야 하는 항목들이다.

설계 단계 체크리스트

  • 이벤트 스키마에 eventVersion 필드가 포함되어 있는가
  • 이벤트 메타데이터에 correlationId, causationId가 포함되어 있는가
  • Aggregate의 불변식 검증 로직이 완전한가
  • 커맨드 실패 시 에러 응답이 명확한가
  • CQRS가 정말 필요한 도메인인가 (단순 CRUD가 아닌지 재확인)

구현 단계 체크리스트

  • 프로젝션 핸들러가 멱등(Idempotent)한가
  • 낙관적 동시성 제어가 구현되어 있는가 (expectedRevision)
  • 이벤트 직렬화/역직렬화 테스트가 작성되어 있는가
  • Upcaster가 이전 버전 이벤트를 올바르게 변환하는가
  • 스냅샷 스키마 버전 불일치 시 전체 이벤트 재생으로 폴백하는가

운영 단계 체크리스트

  • 이벤트 스토어 클러스터가 3노드 이상으로 구성되어 있는가
  • 프로젝션 consumer lag 모니터링이 설정되어 있는가
  • 이벤트 스토어 디스크 사용량 알림이 설정되어 있는가
  • 프로젝션 재구축 절차가 문서화되어 있는가
  • 스냅샷 생성 주기와 보관 정책이 설정되어 있는가
  • Dead Letter Queue(DLQ)가 설정되어 있는가 (처리 불가 이벤트 격리)
  • 장애 시 수동 이벤트 보정 스크립트가 준비되어 있는가

흔한 실수와 안티패턴

실전에서 자주 발생하는 실수를 정리한다. 이벤트 소싱 프로젝트가 실패하는 대부분의 원인이 여기 있다.

안티패턴 1: 이벤트에 현재 상태 전체를 저장

// BAD: 이벤트에 전체 상태를 넣는 것은 CRUD와 다를 바 없다
interface OrderUpdated {
  eventType: 'OrderUpdated'
  data: {
    order: Order // 전체 Order 객체
  }
}

// GOOD: 변경된 사실만 이벤트로 기록
interface OrderItemAdded {
  eventType: 'OrderItemAdded'
  data: {
    orderId: string
    productId: string
    quantity: number
    unitPrice: number
  }
}

안티패턴 2: 프로젝션에서 외부 서비스 호출

프로젝션 핸들러에서 외부 API를 호출하면, 이벤트 재생(replay) 시 부작용이 발생한다. 프로젝션은 순수 함수처럼 이벤트 데이터만으로 읽기 모델을 갱신해야 한다. 외부 서비스 호출이 필요한 로직은 별도의 Policy 핸들러나 Saga로 분리한다.

안티패턴 3: 모든 도메인에 이벤트 소싱 적용

이벤트 소싱은 상태 변경 이력이 비즈니스 가치를 갖는 도메인에 적용한다. 사용자 설정, 코드 테이블 등 단순 CRUD 도메인에 이벤트 소싱을 적용하면 복잡도만 올라가고 얻는 것은 없다. 시스템의 핵심 도메인(Core Domain)에만 선별적으로 적용하는 것이 올바른 전략이다.

안티패턴 4: 이벤트 스트림에 10만 건 이상 축적

하나의 Aggregate 스트림에 이벤트가 10만 건 이상 쌓이면 로딩 시간이 수 초 이상 걸린다. 스냅샷으로 완화할 수 있지만, 근본 원인은 Aggregate 경계가 잘못 설계된 것이다. Aggregate를 더 작은 단위로 분할하거나, 주기적 이벤트 생성 패턴을 재검토한다.

이벤트 소싱 vs 전통 CRUD 의사결정 기준

마지막으로, 이벤트 소싱 도입 여부를 결정할 때 참고할 비교표다.

기준전통 CRUD이벤트 소싱
현재 상태 조회단순 (직접 읽기)복잡 (이벤트 재생 또는 프로젝션)
변경 이력 추적별도 감사 로그 필요자동 확보 (이벤트 자체가 이력)
스키마 변경ALTER TABLE 마이그레이션이벤트 버전 관리 + Upcasting
디버깅현재 상태만 확인 가능전체 이력 재생으로 문제 원인 추적 가능
데이터 정합성 보장ACID 트랜잭션Aggregate 단위 일관성 + 최종 일관성
저장 공간현재 상태만 저장모든 이벤트 축적 (저장 비용 높음)
읽기 성능인덱스 최적화로 충분프로젝션 설계 필수
초기 개발 속도빠름느림 (학습 곡선, 인프라 구성)
복잡 도메인 대응력도메인 복잡도 증가 시 한계이벤트 중심 모델링으로 확장성 확보
팀 역량 요구일반적DDD, 이벤트 모델링 경험 필요

금융 거래, 물류 추적, 의료 기록, 협업 도구처럼 변경 이력 자체가 비즈니스 가치인 도메인에서 이벤트 소싱은 탁월한 선택이다. 그 외에는 전통 CRUD를 유지하되, 필요한 부분에만 감사 로그를 추가하는 것이 현실적이다.

정리

이벤트 소싱과 CQRS는 강력하지만 복잡한 패턴이다. 성공적으로 도입하려면 다음을 기억한다.

  • 이벤트 스키마 설계에 가장 많은 시간을 투자한다. 이벤트는 영원히 남는다.
  • 프로젝션은 반드시 멱등하게 구현한다. 이벤트 재생이 안전해야 운영이 가능하다.
  • 스냅샷은 성능 최적화 도구이지, 아키텍처의 필수 요소가 아니다. 필요할 때 도입한다.
  • 이벤트 버전 관리 전략을 프로젝트 초기에 확립한다. 나중에 추가하면 고통스럽다.
  • 모든 도메인에 적용하지 않는다. 핵심 도메인에만 선별적으로 적용한다.

References