Skip to content
Published on

Event-Driven Architecture + CQRS + Event Sourcing 실전 구현: Kafka/RabbitMQ 기반 분산 시스템 설계

Authors
  • Name
    Twitter
Event-Driven Architecture with CQRS and Event Sourcing

들어가며

마이크로서비스 아키텍처가 보편화되면서, 서비스 간 통신 방식은 시스템의 확장성과 복원력을 결정짓는 핵심 요소가 되었다. 동기 REST 호출 기반 아키텍처는 서비스 간 강한 결합을 만들고, 하나의 서비스 장애가 전체 시스템으로 전파되는 캐스케이드 장애를 유발한다.

Event-Driven Architecture(EDA)는 이 문제를 해결하는 대표적인 패턴이다. 서비스는 이벤트를 발행(publish)하고, 관심 있는 서비스가 이를 구독(subscribe)하여 비동기적으로 처리한다. 여기에 CQRS(Command Query Responsibility Segregation)와 Event Sourcing을 결합하면 읽기/쓰기 워크로드를 독립적으로 스케일링하고, 모든 상태 변경의 감사 추적(audit trail)을 자연스럽게 확보할 수 있다.

이 글에서는 EDA의 핵심 패턴부터 CQRS와 Event Sourcing의 구현, 메시지 브로커 비교, Saga 패턴, 그리고 프로덕션에서 겪는 장애 사례와 대응 방법을 코드 수준에서 다룬다.

Event-Driven Architecture 핵심 패턴

이벤트의 종류

EDA에서 이벤트는 크게 세 가지로 분류된다.

유형설명예시특징
Domain Event비즈니스 도메인에서 발생한 의미 있는 사실OrderPlaced, PaymentCompleted과거형 명명, 불변
Integration Event서비스 경계를 넘어 전파되는 이벤트OrderShipped (물류 서비스로)서비스 간 계약
Notification Event단순 알림 (데이터 최소화)OrderStatusChanged수신자가 상세 데이터를 별도 조회

EDA 통신 패턴 비교

패턴결합도전달 보장순서 보장사용 사례
Pub/Sub느슨At-least-once보장 안 됨알림, 캐시 무효화
Event Streaming느슨At-least-once파티션 내 보장로그 집계, 실시간 분석
Event Sourcing자기 참조영속 저장애그리거트 내 보장감사 추적, 상태 복원
Request-Reply강함동기요청-응답 쌍동기적 확인 필요 시

TypeScript 이벤트 기본 구조

// 도메인 이벤트 기본 인터페이스
interface DomainEvent {
  readonly eventId: string
  readonly eventType: string
  readonly aggregateId: string
  readonly aggregateType: string
  readonly version: number
  readonly timestamp: Date
  readonly metadata: EventMetadata
  readonly payload: Record<string, unknown>
}

interface EventMetadata {
  readonly correlationId: string
  readonly causationId: string
  readonly userId?: string
  readonly traceId?: string
}

// 구체적 이벤트 예시
class OrderPlacedEvent implements DomainEvent {
  readonly eventType = 'OrderPlaced'
  readonly aggregateType = 'Order'

  constructor(
    public readonly eventId: string,
    public readonly aggregateId: string,
    public readonly version: number,
    public readonly timestamp: Date,
    public readonly metadata: EventMetadata,
    public readonly payload: {
      customerId: string
      items: Array<{ productId: string; quantity: number; price: number }>
      totalAmount: number
      currency: string
    }
  ) {}
}

CQRS (Command Query Responsibility Segregation) 심화

CQRS의 핵심 원칙

CQRS는 읽기(Query) 모델과 쓰기(Command) 모델을 완전히 분리하는 패턴이다. 전통적인 CRUD 모델에서는 동일한 데이터 모델로 읽기와 쓰기를 모두 처리하지만, CQRS에서는 각각의 목적에 최적화된 별도의 모델을 사용한다.

// Command 측 - 쓰기 모델
interface Command {
  readonly commandId: string
  readonly commandType: string
  readonly timestamp: Date
}

class PlaceOrderCommand implements Command {
  readonly commandType = 'PlaceOrder'

  constructor(
    public readonly commandId: string,
    public readonly timestamp: Date,
    public readonly customerId: string,
    public readonly items: Array<{
      productId: string
      quantity: number
      price: number
    }>,
    public readonly shippingAddress: string
  ) {}
}

// Command Handler
class PlaceOrderHandler {
  constructor(
    private readonly orderRepository: OrderRepository,
    private readonly eventBus: EventBus
  ) {}

  async handle(command: PlaceOrderCommand): Promise<string> {
    // 비즈니스 검증
    await this.validateInventory(command.items)
    await this.validateCustomerCredit(command.customerId)

    // 애그리거트 생성 및 이벤트 발행
    const order = Order.create(command.customerId, command.items, command.shippingAddress)

    await this.orderRepository.save(order)

    // 도메인 이벤트 발행
    for (const event of order.getUncommittedEvents()) {
      await this.eventBus.publish(event)
    }

    return order.id
  }

  private async validateInventory(
    items: Array<{ productId: string; quantity: number; price: number }>
  ): Promise<void> {
    // 재고 확인 로직
  }

  private async validateCustomerCredit(customerId: string): Promise<void> {
    // 고객 신용 확인 로직
  }
}

Query 측 - 읽기 모델

// Query 측 - 읽기에 최적화된 비정규화 모델
interface OrderSummaryReadModel {
  orderId: string
  customerName: string
  customerEmail: string
  orderDate: Date
  status: string
  totalAmount: number
  itemCount: number
  lastUpdated: Date
}

// Query Handler
class GetOrderSummaryHandler {
  constructor(private readonly readDb: ReadDatabase) {}

  async handle(orderId: string): Promise<OrderSummaryReadModel | null> {
    // 읽기 전용 DB에서 비정규화된 데이터 직접 조회
    return this.readDb.query('SELECT * FROM order_summaries WHERE order_id = ?', [orderId])
  }
}

// Projection - 이벤트를 읽기 모델로 변환
class OrderSummaryProjection {
  constructor(private readonly readDb: ReadDatabase) {}

  async handleOrderPlaced(event: OrderPlacedEvent): Promise<void> {
    await this.readDb.execute(
      `INSERT INTO order_summaries (order_id, customer_name, order_date, status, total_amount, item_count, last_updated)
       VALUES (?, ?, ?, 'PLACED', ?, ?, ?)`,
      [
        event.aggregateId,
        event.payload.customerId,
        event.timestamp,
        event.payload.totalAmount,
        event.payload.items.length,
        new Date(),
      ]
    )
  }

  async handleOrderShipped(event: DomainEvent): Promise<void> {
    await this.readDb.execute(
      `UPDATE order_summaries SET status = 'SHIPPED', last_updated = ? WHERE order_id = ?`,
      [new Date(), event.aggregateId]
    )
  }
}

Event Sourcing 구현

Event Store 설계

Event Sourcing에서는 상태를 직접 저장하지 않고, 상태 변경 이벤트의 시퀀스를 저장한다. 현재 상태는 이벤트를 순서대로 재생(replay)하여 복원한다.

// Event Store 인터페이스
interface EventStore {
  append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>
  getEvents(aggregateId: string, fromVersion?: number): Promise<DomainEvent[]>
  getEventsByType(eventType: string, fromTimestamp?: Date): Promise<DomainEvent[]>
}

// PostgreSQL 기반 Event Store 구현
class PostgresEventStore implements EventStore {
  constructor(private readonly pool: Pool) {}

  async append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void> {
    const client = await this.pool.connect()
    try {
      await client.query('BEGIN')

      // 낙관적 동시성 제어 (Optimistic Concurrency Control)
      const result = await client.query(
        'SELECT MAX(version) as current_version FROM events WHERE aggregate_id = $1',
        [aggregateId]
      )
      const currentVersion = result.rows[0]?.current_version ?? 0

      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, but current version is ${currentVersion}`
        )
      }

      // 이벤트 배치 삽입
      for (const event of events) {
        await client.query(
          `INSERT INTO events (event_id, aggregate_id, aggregate_type, event_type, version, payload, metadata, timestamp)
           VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
          [
            event.eventId,
            event.aggregateId,
            event.aggregateType,
            event.eventType,
            event.version,
            JSON.stringify(event.payload),
            JSON.stringify(event.metadata),
            event.timestamp,
          ]
        )
      }

      await client.query('COMMIT')
    } catch (error) {
      await client.query('ROLLBACK')
      throw error
    } finally {
      client.release()
    }
  }

  async getEvents(aggregateId: string, fromVersion: number = 0): Promise<DomainEvent[]> {
    const result = await this.pool.query(
      'SELECT * FROM events WHERE aggregate_id = $1 AND version > $2 ORDER BY version ASC',
      [aggregateId, fromVersion]
    )
    return result.rows.map(this.deserializeEvent)
  }

  async getEventsByType(eventType: string, fromTimestamp?: Date): Promise<DomainEvent[]> {
    const query = fromTimestamp
      ? 'SELECT * FROM events WHERE event_type = $1 AND timestamp > $2 ORDER BY timestamp ASC'
      : 'SELECT * FROM events WHERE event_type = $1 ORDER BY timestamp ASC'
    const params = fromTimestamp ? [eventType, fromTimestamp] : [eventType]
    const result = await this.pool.query(query, params)
    return result.rows.map(this.deserializeEvent)
  }

  private deserializeEvent(row: any): DomainEvent {
    return {
      eventId: row.event_id,
      eventType: row.event_type,
      aggregateId: row.aggregate_id,
      aggregateType: row.aggregate_type,
      version: row.version,
      timestamp: row.timestamp,
      metadata: JSON.parse(row.metadata),
      payload: JSON.parse(row.payload),
    }
  }
}

애그리거트와 이벤트 재생

// Event Sourced 애그리거트
abstract class EventSourcedAggregate {
  private uncommittedEvents: DomainEvent[] = []
  protected version: number = 0

  abstract get id(): string

  protected apply(event: DomainEvent): void {
    this.when(event)
    this.version = event.version
    this.uncommittedEvents.push(event)
  }

  protected abstract when(event: DomainEvent): void

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents]
  }

  clearUncommittedEvents(): void {
    this.uncommittedEvents = []
  }

  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.when(event)
      this.version = event.version
    }
  }
}

// Order 애그리거트
class Order extends EventSourcedAggregate {
  private _id: string = ''
  private _customerId: string = ''
  private _status: OrderStatus = OrderStatus.DRAFT
  private _items: OrderItem[] = []
  private _totalAmount: number = 0

  get id(): string {
    return this._id
  }

  static create(
    customerId: string,
    items: Array<{ productId: string; quantity: number; price: number }>,
    shippingAddress: string
  ): Order {
    const order = new Order()
    const orderId = generateUUID()
    const totalAmount = items.reduce((sum, item) => sum + item.price * item.quantity, 0)

    order.apply({
      eventId: generateUUID(),
      eventType: 'OrderPlaced',
      aggregateId: orderId,
      aggregateType: 'Order',
      version: 1,
      timestamp: new Date(),
      metadata: { correlationId: generateUUID(), causationId: generateUUID() },
      payload: { customerId, items, totalAmount, shippingAddress, currency: 'KRW' },
    })

    return order
  }

  confirm(): void {
    if (this._status !== OrderStatus.PLACED) {
      throw new Error('Order can only be confirmed when in PLACED status')
    }

    this.apply({
      eventId: generateUUID(),
      eventType: 'OrderConfirmed',
      aggregateId: this._id,
      aggregateType: 'Order',
      version: this.version + 1,
      timestamp: new Date(),
      metadata: { correlationId: generateUUID(), causationId: generateUUID() },
      payload: { confirmedAt: new Date().toISOString() },
    })
  }

  protected when(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderPlaced':
        this._id = event.aggregateId
        this._customerId = event.payload.customerId as string
        this._status = OrderStatus.PLACED
        this._items = event.payload.items as OrderItem[]
        this._totalAmount = event.payload.totalAmount as number
        break
      case 'OrderConfirmed':
        this._status = OrderStatus.CONFIRMED
        break
      case 'OrderShipped':
        this._status = OrderStatus.SHIPPED
        break
      case 'OrderCancelled':
        this._status = OrderStatus.CANCELLED
        break
    }
  }
}

enum OrderStatus {
  DRAFT = 'DRAFT',
  PLACED = 'PLACED',
  CONFIRMED = 'CONFIRMED',
  SHIPPED = 'SHIPPED',
  CANCELLED = 'CANCELLED',
}

Python Event Store 구현

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Protocol
from uuid import uuid4
import json
import asyncpg


@dataclass(frozen=True)
class DomainEvent:
    event_id: str
    event_type: str
    aggregate_id: str
    aggregate_type: str
    version: int
    timestamp: datetime
    payload: dict[str, Any]
    metadata: dict[str, str] = field(default_factory=dict)


class EventStore(Protocol):
    async def append(
        self,
        aggregate_id: str,
        events: list[DomainEvent],
        expected_version: int,
    ) -> None: ...

    async def get_events(
        self, aggregate_id: str, from_version: int = 0
    ) -> list[DomainEvent]: ...


class PostgresEventStore:
    def __init__(self, pool: asyncpg.Pool):
        self._pool = pool

    async def append(
        self,
        aggregate_id: str,
        events: list[DomainEvent],
        expected_version: int,
    ) -> None:
        async with self._pool.acquire() as conn:
            async with conn.transaction():
                # 낙관적 동시성 제어
                row = await conn.fetchrow(
                    "SELECT MAX(version) AS cur FROM events WHERE aggregate_id = $1",
                    aggregate_id,
                )
                current = row["cur"] or 0
                if current != expected_version:
                    raise ConcurrencyError(
                        f"Expected {expected_version}, got {current}"
                    )

                for event in events:
                    await conn.execute(
                        """
                        INSERT INTO events
                            (event_id, aggregate_id, aggregate_type,
                             event_type, version, payload, metadata, timestamp)
                        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                        """,
                        event.event_id,
                        event.aggregate_id,
                        event.aggregate_type,
                        event.event_type,
                        event.version,
                        json.dumps(event.payload),
                        json.dumps(event.metadata),
                        event.timestamp,
                    )

    async def get_events(
        self, aggregate_id: str, from_version: int = 0
    ) -> list[DomainEvent]:
        async with self._pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT * FROM events
                WHERE aggregate_id = $1 AND version > $2
                ORDER BY version ASC
                """,
                aggregate_id,
                from_version,
            )
            return [self._deserialize(row) for row in rows]

    @staticmethod
    def _deserialize(row) -> DomainEvent:
        return DomainEvent(
            event_id=row["event_id"],
            event_type=row["event_type"],
            aggregate_id=row["aggregate_id"],
            aggregate_type=row["aggregate_type"],
            version=row["version"],
            timestamp=row["timestamp"],
            payload=json.loads(row["payload"]),
            metadata=json.loads(row["metadata"]),
        )

메시지 브로커 비교: Kafka vs RabbitMQ vs NATS

핵심 특성 비교

항목Apache KafkaRabbitMQNATS JetStream
모델분산 로그 (Append-only)메시지 큐 (Push 기반)스트리밍 (Pull/Push)
메시지 보존설정된 기간 동안 영구 보존소비 후 삭제 (기본)설정된 기간 보존
순서 보장파티션 내 보장큐 단위 보장스트림 내 보장
처리량초당 수백만 건초당 수만 건초당 수십만 건
지연시간ms 단위 (배치)us 단위 (단건)us 단위
컨슈머 그룹네이티브 지원경쟁 소비자 패턴네이티브 지원
Replay오프셋 기반 자유 이동제한적 (dead letter)시퀀스 기반 이동
운영 복잡도높음 (ZooKeeper/KRaft)중간낮음
적합한 사례이벤트 스트리밍, 로그 집계태스크 큐, RPC경량 메시징, IoT

Kafka Producer/Consumer 예시

import { Kafka, logLevel } from 'kafkajs'

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
  logLevel: logLevel.WARN,
  retry: {
    initialRetryTime: 100,
    retries: 8,
  },
})

// Producer - 이벤트 발행
class KafkaEventPublisher {
  private producer = kafka.producer({
    idempotent: true, // 멱등성 보장
    maxInFlightRequests: 5,
    transactionalId: 'order-service-producer',
  })

  async publish(event: DomainEvent): Promise<void> {
    await this.producer.send({
      topic: `events.${event.aggregateType.toLowerCase()}`,
      messages: [
        {
          key: event.aggregateId, // 파티셔닝 키 = 애그리거트 ID
          value: JSON.stringify(event),
          headers: {
            'event-type': event.eventType,
            'correlation-id': event.metadata.correlationId,
            'content-type': 'application/json',
          },
        },
      ],
    })
  }

  async publishBatch(events: DomainEvent[]): Promise<void> {
    const transaction = await this.producer.transaction()
    try {
      for (const event of events) {
        await transaction.send({
          topic: `events.${event.aggregateType.toLowerCase()}`,
          messages: [
            {
              key: event.aggregateId,
              value: JSON.stringify(event),
              headers: {
                'event-type': event.eventType,
                'correlation-id': event.metadata.correlationId,
              },
            },
          ],
        })
      }
      await transaction.commit()
    } catch (error) {
      await transaction.abort()
      throw error
    }
  }
}

// Consumer - 이벤트 소비 (멱등성 보장)
class KafkaEventConsumer {
  private consumer = kafka.consumer({ groupId: 'projection-service' })
  private processedEvents = new Set<string>()

  async start(): Promise<void> {
    await this.consumer.subscribe({
      topics: ['events.order'],
      fromBeginning: false,
    })

    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        const event: DomainEvent = JSON.parse(message.value!.toString())

        // 멱등성 체크 - 이미 처리된 이벤트 스킵
        if (await this.isAlreadyProcessed(event.eventId)) {
          console.log(`Skipping duplicate event: ${event.eventId}`)
          return
        }

        await this.handleEvent(event)
        await this.markAsProcessed(event.eventId)
      },
    })
  }

  private async isAlreadyProcessed(eventId: string): Promise<boolean> {
    // Redis 또는 DB에서 처리 여부 확인
    return this.processedEvents.has(eventId)
  }

  private async markAsProcessed(eventId: string): Promise<void> {
    this.processedEvents.add(eventId)
  }

  private async handleEvent(event: DomainEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderPlaced':
        await this.handleOrderPlaced(event)
        break
      case 'OrderConfirmed':
        await this.handleOrderConfirmed(event)
        break
    }
  }

  private async handleOrderPlaced(event: DomainEvent): Promise<void> {
    console.log(`Processing OrderPlaced for ${event.aggregateId}`)
  }

  private async handleOrderConfirmed(event: DomainEvent): Promise<void> {
    console.log(`Processing OrderConfirmed for ${event.aggregateId}`)
  }
}

Saga 패턴: 분산 트랜잭션 관리

Orchestration Saga vs Choreography Saga

항목OrchestrationChoreography
제어 방식중앙 오케스트레이터가 조율각 서비스가 자율적으로 반응
결합도오케스트레이터에 의존서비스 간 느슨한 결합
복잡성오케스트레이터에 집중서비스에 분산
추적중앙에서 상태 확인 가능분산된 상태 추적 필요
에러 처리중앙에서 보상 트랜잭션 실행각 서비스가 보상 이벤트 발행
적합한 경우복잡한 워크플로우 (5개 이상 단계)단순한 워크플로우 (3개 이하 단계)

Orchestration Saga 구현

// Saga 상태 머신
enum SagaStatus {
  STARTED = 'STARTED',
  ORDER_CREATED = 'ORDER_CREATED',
  PAYMENT_PROCESSED = 'PAYMENT_PROCESSED',
  INVENTORY_RESERVED = 'INVENTORY_RESERVED',
  COMPLETED = 'COMPLETED',
  COMPENSATING = 'COMPENSATING',
  FAILED = 'FAILED',
}

interface SagaStep {
  name: string
  execute: (context: SagaContext) => Promise<void>
  compensate: (context: SagaContext) => Promise<void>
}

interface SagaContext {
  sagaId: string
  orderId: string
  customerId: string
  items: Array<{ productId: string; quantity: number; price: number }>
  paymentId?: string
  reservationId?: string
  [key: string]: unknown
}

class OrderSagaOrchestrator {
  private steps: SagaStep[] = []
  private completedSteps: SagaStep[] = []

  constructor(
    private readonly sagaRepository: SagaRepository,
    private readonly eventBus: EventBus
  ) {
    this.steps = [
      {
        name: 'CreateOrder',
        execute: async (ctx) => {
          await this.eventBus.publish({
            eventType: 'CreateOrderRequested',
            aggregateId: ctx.orderId,
            payload: { customerId: ctx.customerId, items: ctx.items },
          } as DomainEvent)
        },
        compensate: async (ctx) => {
          await this.eventBus.publish({
            eventType: 'CancelOrderRequested',
            aggregateId: ctx.orderId,
            payload: { reason: 'Saga compensation' },
          } as DomainEvent)
        },
      },
      {
        name: 'ProcessPayment',
        execute: async (ctx) => {
          const totalAmount = ctx.items.reduce((sum, item) => sum + item.price * item.quantity, 0)
          await this.eventBus.publish({
            eventType: 'ProcessPaymentRequested',
            aggregateId: ctx.orderId,
            payload: { customerId: ctx.customerId, amount: totalAmount },
          } as DomainEvent)
        },
        compensate: async (ctx) => {
          await this.eventBus.publish({
            eventType: 'RefundPaymentRequested',
            aggregateId: ctx.orderId,
            payload: { paymentId: ctx.paymentId },
          } as DomainEvent)
        },
      },
      {
        name: 'ReserveInventory',
        execute: async (ctx) => {
          await this.eventBus.publish({
            eventType: 'ReserveInventoryRequested',
            aggregateId: ctx.orderId,
            payload: { items: ctx.items },
          } as DomainEvent)
        },
        compensate: async (ctx) => {
          await this.eventBus.publish({
            eventType: 'ReleaseInventoryRequested',
            aggregateId: ctx.orderId,
            payload: { reservationId: ctx.reservationId },
          } as DomainEvent)
        },
      },
    ]
  }

  async start(context: SagaContext): Promise<void> {
    await this.sagaRepository.save(context.sagaId, SagaStatus.STARTED, context)
    await this.executeNextStep(context, 0)
  }

  private async executeNextStep(context: SagaContext, stepIndex: number): Promise<void> {
    if (stepIndex >= this.steps.length) {
      await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.COMPLETED)
      return
    }

    const step = this.steps[stepIndex]
    try {
      await step.execute(context)
      this.completedSteps.push(step)
      await this.executeNextStep(context, stepIndex + 1)
    } catch (error) {
      console.error(`Saga step '${step.name}' failed:`, error)
      await this.compensate(context)
    }
  }

  private async compensate(context: SagaContext): Promise<void> {
    await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.COMPENSATING)

    // 완료된 스텝을 역순으로 보상
    for (const step of [...this.completedSteps].reverse()) {
      try {
        await step.compensate(context)
      } catch (error) {
        console.error(`Compensation for '${step.name}' failed:`, error)
        // 보상 실패 시 수동 개입 필요 - 알림 발송
      }
    }

    await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.FAILED)
  }
}

Snapshot 전략: 이벤트 재생 최적화

이벤트 수가 많아지면 재생(replay) 시간이 길어진다. Snapshot은 특정 시점의 애그리거트 상태를 저장하여 이 문제를 해결한다.

// Snapshot 기반 애그리거트 로딩
class EventSourcedRepository<T extends EventSourcedAggregate> {
  private readonly SNAPSHOT_INTERVAL = 50 // 50개 이벤트마다 스냅샷

  constructor(
    private readonly eventStore: EventStore,
    private readonly snapshotStore: SnapshotStore,
    private readonly factory: () => T
  ) {}

  async load(aggregateId: string): Promise<T> {
    const aggregate = this.factory()

    // 1. 최신 스냅샷 로드
    const snapshot = await this.snapshotStore.getLatest(aggregateId)

    if (snapshot) {
      aggregate.restoreFromSnapshot(snapshot.state)
      // 2. 스냅샷 이후의 이벤트만 재생
      const events = await this.eventStore.getEvents(aggregateId, snapshot.version)
      aggregate.loadFromHistory(events)
    } else {
      // 3. 전체 이벤트 재생
      const events = await this.eventStore.getEvents(aggregateId)
      aggregate.loadFromHistory(events)
    }

    return aggregate
  }

  async save(aggregate: T): Promise<void> {
    const events = aggregate.getUncommittedEvents()
    await this.eventStore.append(aggregate.id, events, aggregate.version - events.length)

    // 스냅샷 생성 조건 확인
    if (aggregate.version % this.SNAPSHOT_INTERVAL === 0) {
      await this.snapshotStore.save({
        aggregateId: aggregate.id,
        version: aggregate.version,
        state: aggregate.toSnapshot(),
        timestamp: new Date(),
      })
    }

    aggregate.clearUncommittedEvents()
  }
}

운영 주의사항

반드시 알아야 할 핵심 경고

  1. 이벤트 스키마 변경은 DB 마이그레이션보다 위험하다: Event Store의 이벤트는 불변이므로, 스키마를 변경하면 과거 이벤트 재생이 깨진다. 반드시 하위 호환성(backward compatibility)을 유지해야 한다.

  2. CQRS 도입은 복잡도를 최소 2배로 만든다: 읽기 모델과 쓰기 모델의 동기화 지연(eventual consistency)을 UI와 비즈니스 로직에서 반드시 처리해야 한다.

  3. Saga 보상 트랜잭션 실패는 수동 개입이 필요하다: 보상 트랜잭션도 실패할 수 있으며, 이 경우 Dead Letter Queue와 수동 복구 프로세스가 반드시 필요하다.

  4. 이벤트 순서 역전은 데이터 정합성을 파괴한다: Kafka의 파티셔닝 키가 잘못 설정되면 동일 애그리거트의 이벤트가 다른 파티션에 분산되어 순서가 뒤바뀐다.

  5. Event Store의 무한 성장을 관리해야 한다: Snapshot과 아카이빙 전략 없이는 Event Store가 무한히 커져 재생 성능이 급격히 저하된다.

장애 사례와 복구 절차

사례 1: 이벤트 순서 역전

증상: OrderPlaced 이전에 OrderShipped 이벤트가 도착하여 프로젝션이 실패한다.

원인: Kafka 파티셔닝 키를 설정하지 않아 이벤트가 다른 파티션에 분산되었다.

복구:

# 1. 컨슈머 그룹 오프셋 리셋
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group projection-service \
  --topic events.order \
  --reset-offsets --to-earliest --execute

# 2. 프로젝션 테이블 초기화
psql -c "TRUNCATE TABLE order_summaries;"

# 3. 프로젝션 서비스 재시작 (전체 재생)
kubectl rollout restart deployment/projection-service

예방: 반드시 애그리거트 ID를 Kafka 파티셔닝 키로 사용한다.

사례 2: 중복 이벤트 처리

증상: 주문이 이중으로 생성되거나, 결제가 두 번 처리된다.

원인: 컨슈머 장애 후 재시작 시 이미 처리된 메시지를 다시 소비한다 (at-least-once 특성).

복구:

// 멱등성 키 테이블 기반 중복 방지
class IdempotencyGuard {
  constructor(private readonly db: Database) {}

  async executeOnce<T>(idempotencyKey: string, operation: () => Promise<T>): Promise<T | null> {
    try {
      // UNIQUE 제약 조건으로 중복 삽입 방지
      await this.db.execute('INSERT INTO processed_events (event_id, processed_at) VALUES (?, ?)', [
        idempotencyKey,
        new Date(),
      ])
    } catch (error) {
      // 이미 처리된 이벤트
      console.log(`Event ${idempotencyKey} already processed, skipping`)
      return null
    }

    return operation()
  }
}

사례 3: 스키마 진화 실패

증상: 새 버전의 이벤트 핸들러가 과거 이벤트를 역직렬화하지 못해 프로젝션이 중단된다.

원인: 이벤트에 필수 필드를 추가하면서 과거 이벤트에는 해당 필드가 없다.

복구:

// 이벤트 업캐스터 (Upcaster) 패턴
class EventUpcaster {
  private upcasters: Map<string, (event: any) => DomainEvent> = new Map()

  register(eventType: string, fromVersion: number, upcaster: (event: any) => any): void {
    this.upcasters.set(`${eventType}:v${fromVersion}`, upcaster)
  }

  upcast(event: any): DomainEvent {
    const key = `${event.eventType}:v${event.schemaVersion || 1}`
    const upcaster = this.upcasters.get(key)

    if (upcaster) {
      return this.upcast(upcaster(event)) // 재귀적으로 최신 버전까지
    }

    return event
  }
}

// 사용 예: v1 -> v2 변환
const upcaster = new EventUpcaster()
upcaster.register('OrderPlaced', 1, (event) => ({
  ...event,
  schemaVersion: 2,
  payload: {
    ...event.payload,
    currency: event.payload.currency || 'KRW', // 기본값 추가
    shippingMethod: 'STANDARD', // 새 필드에 기본값 설정
  },
}))

프로덕션 체크리스트

Event Store

  • 이벤트 테이블에 aggregate_id + version 유니크 인덱스 설정
  • 낙관적 동시성 제어(Optimistic Concurrency) 구현 확인
  • Snapshot 전략 설정 (50~100 이벤트마다)
  • 이벤트 아카이빙 정책 수립 (cold storage 이동)
  • Event Store 백업 및 복구 절차 검증

CQRS

  • Read Model 재구축(Rebuild) 프로세스 자동화
  • 읽기/쓰기 DB 분리 여부와 복제 지연 모니터링
  • Projection 실패 시 알림 및 자동 재시도 설정
  • Eventual Consistency 처리 (UI에서 낙관적 업데이트)

메시지 브로커

  • Kafka: 파티셔닝 키를 애그리거트 ID로 설정
  • Consumer 그룹 lag 모니터링 (Burrow 또는 Kafka Exporter)
  • Dead Letter Queue(DLQ) 설정 및 모니터링
  • 메시지 직렬화 포맷 결정 (Avro + Schema Registry 권장)
  • 브로커 클러스터 복제 팩터 3 이상 설정

Saga

  • 보상 트랜잭션 정의 및 테스트 완료
  • Saga 상태 영속화 (DB 저장) 확인
  • Saga 타임아웃 설정 (무한 대기 방지)
  • 보상 실패 시 수동 복구 프로세스 문서화

이벤트 스키마

  • Schema Registry 도입 (Confluent Schema Registry, AWS Glue)
  • 하위 호환성 검증 자동화 (CI 파이프라인에 포함)
  • 이벤트 업캐스터(Upcaster) 구현
  • 이벤트 카탈로그 문서 유지

모니터링

  • 이벤트 처리 지연시간(latency) 메트릭 수집
  • Consumer Group lag 알림 설정
  • 이벤트 처리 실패율 대시보드 구성
  • Correlation ID 기반 엔드투엔드 트레이싱

참고자료