Skip to content
Published on

Event-Driven Architecture 실전 가이드: CQRS, Event Sourcing, Saga 패턴

Authors
  • Name
    Twitter
Event-Driven Architecture - CQRS, Event Sourcing, Saga

들어가며

마이크로서비스 아키텍처에서 가장 큰 도전 중 하나는 여러 서비스에 걸친 데이터 일관성과 비즈니스 트랜잭션 관리다. 전통적인 모놀리스에서는 단일 데이터베이스의 ACID 트랜잭션으로 해결할 수 있었지만, 서비스별로 독립된 데이터 저장소를 가지는 분산 환경에서는 2PC(Two-Phase Commit)의 한계가 명확하다. 성능 병목, 가용성 저하, 그리고 서비스 간 강한 결합이 발생하기 때문이다.

Event-Driven Architecture(EDA)는 이 문제에 대한 근본적인 해법을 제시한다. 서비스 간 통신을 비동기 이벤트 기반으로 전환하고, 상태 변경을 이벤트 스트림으로 관리하며, 분산 트랜잭션을 보상 기반 사가로 처리한다. 이 글에서는 EDA의 세 가지 핵심 패턴인 CQRS, Event Sourcing, Saga를 실전 코드와 함께 깊이 있게 분석하고, 프로덕션 운영에서 필요한 전략과 트러블슈팅 기법을 다룬다.

Netflix는 2억 6천만 구독자를 위한 개인화 시스템에, LinkedIn은 하루 수조 개의 이벤트 처리에, Slack은 수십억 건의 일일 메시지 처리에 이 패턴들을 활용하고 있다. 이러한 대규모 시스템의 경험에서 검증된 패턴과 안티패턴을 함께 살펴보자.

Event-Driven Architecture 기초

이벤트의 세 가지 유형

EDA에서 이벤트는 그 목적에 따라 세 가지로 분류된다.

유형설명예시특징
Domain Event비즈니스 도메인에서 발생한 사실 기록OrderPlaced, PaymentCompleted불변, 과거형 명명
Integration Event서비스 간 통신을 위한 이벤트OrderPlacedIntegrationEvent바운디드 컨텍스트 경계를 넘음
Event Notification변경 발생 알림 (데이터 최소화)OrderStatusChanged (ID만 포함)수신자가 필요한 데이터를 직접 조회

핵심 원칙

EDA를 올바르게 구현하기 위한 핵심 원칙은 다음과 같다.

  1. 비동기 통신: 생산자(Producer)와 소비자(Consumer)가 시간적으로 분리된다
  2. 느슨한 결합: 서비스는 이벤트 스키마만 알면 되며, 상대 서비스의 구현을 알 필요가 없다
  3. 최종 일관성: 즉각적인 강한 일관성 대신 일정 시간 후 일관성이 보장된다
  4. 멱등성: 동일한 이벤트가 여러 번 처리되어도 결과가 동일해야 한다
// TypeScript - 도메인 이벤트 기본 구조
interface DomainEvent {
  eventId: string
  eventType: string
  aggregateId: string
  aggregateType: string
  timestamp: Date
  version: number
  payload: Record<string, unknown>
  metadata: {
    correlationId: string
    causationId: string
    userId?: string
  }
}

// 주문 생성 이벤트 예시
const orderPlacedEvent: DomainEvent = {
  eventId: 'evt-550e8400-e29b-41d4-a716-446655440000',
  eventType: 'OrderPlaced',
  aggregateId: 'order-12345',
  aggregateType: 'Order',
  timestamp: new Date('2026-03-14T09:00:00Z'),
  version: 1,
  payload: {
    customerId: 'cust-67890',
    items: [
      { productId: 'prod-001', quantity: 2, price: 29900 },
      { productId: 'prod-002', quantity: 1, price: 15000 },
    ],
    totalAmount: 74800,
    shippingAddress: {
      city: '서울',
      district: '강남구',
      detail: '테헤란로 123',
    },
  },
  metadata: {
    correlationId: 'corr-abc123',
    causationId: 'cmd-place-order-001',
    userId: 'user-admin-01',
  },
}

CQRS 패턴: Command와 Query의 분리

CQRS란 무엇인가

CQRS(Command Query Responsibility Segregation)는 데이터의 쓰기(Command)와 읽기(Query)를 별도의 모델로 분리하는 패턴이다. Bertrand Meyer의 CQS(Command Query Separation) 원칙을 아키텍처 수준으로 확장한 것으로, Greg Young이 2010년에 공식화했다.

전통적인 CRUD 모델에서는 동일한 데이터 모델이 읽기와 쓰기를 모두 담당한다. 그러나 실제 비즈니스에서 읽기와 쓰기의 요구사항은 크게 다르다.

관점Command (쓰기)Query (읽기)
목적상태 변경, 비즈니스 규칙 검증데이터 조회, 화면 표시
비율전체 트래픽의 10-20%전체 트래픽의 80-90%
일관성강한 일관성 필요최종 일관성 허용 가능
모델 복잡도도메인 로직이 풍부비정규화된 조회 모델
스케일링수직 확장 위주수평 확장 용이 (캐시, 리플리카)

CQRS 구현: TypeScript 예제

// TypeScript - CQRS Command 측

// Command 정의
interface PlaceOrderCommand {
  type: 'PlaceOrder'
  customerId: string
  items: Array<{
    productId: string
    quantity: number
    price: number
  }>
  shippingAddress: string
}

// Command Handler
class PlaceOrderHandler {
  constructor(
    private orderRepository: OrderWriteRepository,
    private eventBus: EventBus,
    private inventoryService: InventoryService
  ) {}

  async handle(command: PlaceOrderCommand): Promise<string> {
    // 1. 비즈니스 규칙 검증
    await this.inventoryService.validateStock(command.items)

    // 2. Aggregate 생성
    const order = Order.create({
      customerId: command.customerId,
      items: command.items,
      shippingAddress: command.shippingAddress,
    })

    // 3. 쓰기 저장소에 저장
    await this.orderRepository.save(order)

    // 4. 도메인 이벤트 발행 (읽기 모델 동기화용)
    for (const event of order.getDomainEvents()) {
      await this.eventBus.publish(event)
    }

    return order.id
  }
}

// Query 측 - 읽기 전용 모델
interface OrderReadModel {
  orderId: string
  customerName: string
  orderDate: string
  status: string
  totalAmount: number
  itemCount: number
  lastUpdated: string
}

// Query Handler
class GetOrdersQueryHandler {
  constructor(private readDb: ReadDatabase) {}

  async handle(query: {
    customerId: string
    status?: string
    page: number
    limit: number
  }): Promise<OrderReadModel[]> {
    // 읽기 전용 비정규화 테이블에서 직접 조회
    return this.readDb.query(
      `SELECT order_id, customer_name, order_date, status,
              total_amount, item_count, last_updated
       FROM order_read_model
       WHERE customer_id = ?
       ${query.status ? 'AND status = ?' : ''}
       ORDER BY order_date DESC
       LIMIT ? OFFSET ?`,
      [query.customerId, query.status, query.limit, query.page * query.limit]
    )
  }
}

읽기 모델 프로젝션

이벤트를 수신하여 읽기 모델을 갱신하는 프로젝션(Projection) 로직은 CQRS의 핵심이다.

// TypeScript - 이벤트 기반 프로젝션

class OrderProjection {
  constructor(private readDb: ReadDatabase) {}

  async handle(event: DomainEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderPlaced':
        await this.onOrderPlaced(event)
        break
      case 'OrderShipped':
        await this.onOrderShipped(event)
        break
      case 'OrderCancelled':
        await this.onOrderCancelled(event)
        break
    }
  }

  private async onOrderPlaced(event: DomainEvent): Promise<void> {
    const payload = event.payload as {
      customerId: string
      items: Array<{ quantity: number; price: number }>
      totalAmount: number
    }

    await this.readDb.upsert('order_read_model', {
      order_id: event.aggregateId,
      customer_id: payload.customerId,
      status: 'PLACED',
      total_amount: payload.totalAmount,
      item_count: payload.items.reduce((sum, i) => sum + i.quantity, 0),
      order_date: event.timestamp,
      last_updated: event.timestamp,
      version: event.version,
    })
  }

  private async onOrderShipped(event: DomainEvent): Promise<void> {
    const payload = event.payload as { trackingNumber: string }

    // 멱등성 보장: version 체크
    await this.readDb.updateWhere(
      'order_read_model',
      {
        status: 'SHIPPED',
        tracking_number: payload.trackingNumber,
        last_updated: event.timestamp,
        version: event.version,
      },
      { order_id: event.aggregateId, version: event.version - 1 }
    )
  }

  private async onOrderCancelled(event: DomainEvent): Promise<void> {
    const payload = event.payload as { reason: string }

    await this.readDb.updateWhere(
      'order_read_model',
      {
        status: 'CANCELLED',
        cancellation_reason: payload.reason,
        last_updated: event.timestamp,
        version: event.version,
      },
      { order_id: event.aggregateId }
    )
  }
}

Event Sourcing: 이벤트 기반 상태 관리

Event Sourcing 핵심 개념

Event Sourcing은 애그리게이트의 현재 상태를 저장하는 대신, 상태를 변경한 모든 이벤트를 순서대로 저장하는 패턴이다. 현재 상태는 이벤트 스트림을 처음부터 재생(replay)하여 재구성한다.

전통적 방식과 Event Sourcing의 차이를 비교하면 다음과 같다.

관점전통적 CRUDEvent Sourcing
저장 대상현재 상태 (최신 스냅샷)상태 변경 이벤트의 전체 이력
데이터 손실이전 상태 소실모든 변경 이력 보존
감사(Audit)별도 구현 필요기본 내장
디버깅현재 상태만 확인 가능시간 여행(Time Travel) 가능
저장 공간상대적으로 적음이벤트 누적으로 증가
조회 성능직접 조회 가능프로젝션 또는 스냅샷 필요

Event Sourcing 구현

// TypeScript - Event Sourcing Aggregate

abstract class EventSourcedAggregate {
  private uncommittedEvents: DomainEvent[] = []
  protected version: number = 0

  abstract get id(): string

  // 이벤트 적용 (상태 변경)
  protected apply(event: DomainEvent): void {
    this.when(event)
    this.version++
    this.uncommittedEvents.push(event)
  }

  // 이벤트 핸들러 (서브클래스에서 구현)
  protected abstract when(event: DomainEvent): void

  // 이벤트 스트림에서 상태 복원
  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.when(event)
      this.version++
    }
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents]
  }

  clearUncommittedEvents(): void {
    this.uncommittedEvents = []
  }
}

// 주문 Aggregate
class Order extends EventSourcedAggregate {
  private _id: string = ''
  private _customerId: string = ''
  private _items: OrderItem[] = []
  private _status: OrderStatus = OrderStatus.DRAFT
  private _totalAmount: number = 0

  get id(): string {
    return this._id
  }

  // 팩토리 메서드 (생성 명령)
  static create(params: { orderId: string; customerId: string; items: OrderItem[] }): Order {
    const order = new Order()
    const totalAmount = params.items.reduce((sum, item) => sum + item.price * item.quantity, 0)

    order.apply({
      eventId: crypto.randomUUID(),
      eventType: 'OrderPlaced',
      aggregateId: params.orderId,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: 1,
      payload: {
        customerId: params.customerId,
        items: params.items,
        totalAmount,
      },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: 'create',
      },
    })

    return order
  }

  // 주문 확인 명령
  confirm(): void {
    if (this._status !== OrderStatus.PLACED) {
      throw new Error(`Cannot confirm order in status: ${this._status}`)
    }

    this.apply({
      eventId: crypto.randomUUID(),
      eventType: 'OrderConfirmed',
      aggregateId: this._id,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: this.version + 1,
      payload: { confirmedAt: new Date().toISOString() },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: 'confirm',
      },
    })
  }

  // 주문 취소 명령 (보상 트랜잭션에서 활용)
  cancel(reason: string): void {
    if (this._status === OrderStatus.CANCELLED) {
      return // 멱등성 보장
    }

    this.apply({
      eventId: crypto.randomUUID(),
      eventType: 'OrderCancelled',
      aggregateId: this._id,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: this.version + 1,
      payload: { reason, cancelledAt: new Date().toISOString() },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: 'cancel',
      },
    })
  }

  // 이벤트 핸들러 - 상태 변경 로직
  protected when(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderPlaced': {
        const p = event.payload as {
          customerId: string
          items: OrderItem[]
          totalAmount: number
        }
        this._id = event.aggregateId
        this._customerId = p.customerId
        this._items = p.items
        this._totalAmount = p.totalAmount
        this._status = OrderStatus.PLACED
        break
      }
      case 'OrderConfirmed':
        this._status = OrderStatus.CONFIRMED
        break
      case 'OrderCancelled':
        this._status = OrderStatus.CANCELLED
        break
    }
  }
}

enum OrderStatus {
  DRAFT = 'DRAFT',
  PLACED = 'PLACED',
  CONFIRMED = 'CONFIRMED',
  SHIPPED = 'SHIPPED',
  CANCELLED = 'CANCELLED',
}

interface OrderItem {
  productId: string
  quantity: number
  price: number
}

스냅샷 전략

이벤트 수가 많아지면 재생 시간이 길어진다. 스냅샷은 특정 시점의 상태를 캐싱하여 재생 성능을 개선한다.

# Python - 스냅샷 기반 이벤트 스토어

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import json


@dataclass
class Snapshot:
    aggregate_id: str
    aggregate_type: str
    version: int
    state: dict
    created_at: datetime = field(default_factory=datetime.utcnow)


class EventStore:
    SNAPSHOT_INTERVAL = 100  # 100개 이벤트마다 스냅샷 생성

    def __init__(self, db_connection):
        self.db = db_connection

    async def save_events(
        self, aggregate_id: str, events: list[dict], expected_version: int
    ) -> None:
        """낙관적 동시성 제어와 함께 이벤트 저장"""
        async with self.db.transaction():
            # 현재 버전 확인 (낙관적 잠금)
            current_version = await self._get_current_version(aggregate_id)
            if current_version != expected_version:
                raise ConcurrencyError(
                    f"Expected version {expected_version}, "
                    f"but current version is {current_version}"
                )

            # 이벤트 배치 삽입
            for i, event in enumerate(events):
                version = expected_version + i + 1
                await self.db.execute(
                    """INSERT INTO event_store
                       (event_id, aggregate_id, aggregate_type,
                        event_type, version, payload, metadata, created_at)
                       VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
                    (
                        event["event_id"],
                        aggregate_id,
                        event["aggregate_type"],
                        event["event_type"],
                        version,
                        json.dumps(event["payload"]),
                        json.dumps(event["metadata"]),
                        datetime.utcnow(),
                    ),
                )

            # 스냅샷 생성 조건 확인
            new_version = expected_version + len(events)
            if new_version % self.SNAPSHOT_INTERVAL == 0:
                await self._create_snapshot(aggregate_id, new_version)

    async def load_aggregate(self, aggregate_id: str) -> tuple[list[dict], int]:
        """스냅샷부터 이벤트를 로드하여 상태 복원"""
        # 1. 가장 최근 스냅샷 조회
        snapshot = await self._get_latest_snapshot(aggregate_id)

        if snapshot:
            # 2. 스냅샷 이후 이벤트만 로드
            events = await self.db.fetch_all(
                """SELECT * FROM event_store
                   WHERE aggregate_id = ? AND version > ?
                   ORDER BY version ASC""",
                (aggregate_id, snapshot.version),
            )
            return events, snapshot
        else:
            # 3. 스냅샷 없으면 전체 이벤트 로드
            events = await self.db.fetch_all(
                """SELECT * FROM event_store
                   WHERE aggregate_id = ?
                   ORDER BY version ASC""",
                (aggregate_id,),
            )
            return events, None

    async def _create_snapshot(
        self, aggregate_id: str, version: int
    ) -> None:
        """현재 상태를 스냅샷으로 저장"""
        events, _ = await self.load_aggregate(aggregate_id)
        # Aggregate 재구성 후 상태 직렬화
        aggregate = self._rebuild_aggregate(events)
        await self.db.execute(
            """INSERT INTO snapshots
               (aggregate_id, aggregate_type, version, state, created_at)
               VALUES (?, ?, ?, ?, ?)""",
            (
                aggregate_id,
                aggregate.aggregate_type,
                version,
                json.dumps(aggregate.to_dict()),
                datetime.utcnow(),
            ),
        )

    async def _get_current_version(self, aggregate_id: str) -> int:
        result = await self.db.fetch_one(
            "SELECT MAX(version) FROM event_store WHERE aggregate_id = ?",
            (aggregate_id,),
        )
        return result[0] if result[0] else 0


class ConcurrencyError(Exception):
    pass

Saga 패턴: 분산 트랜잭션 관리

Saga 패턴이란

Saga 패턴은 분산 환경에서 여러 서비스에 걸친 비즈니스 트랜잭션을 관리하는 패턴이다. 전통적인 분산 트랜잭션(2PC)과 달리, 각 서비스의 로컬 트랜잭션을 순차적으로 실행하고, 실패 시 이미 완료된 단계에 대해 보상 트랜잭션(Compensating Transaction)을 실행하여 일관성을 복원한다.

주문 처리 Saga의 흐름을 예로 들면 다음과 같다.

정상 흐름:

  1. 주문 서비스: 주문 생성 (OrderCreated)
  2. 결제 서비스: 결제 처리 (PaymentProcessed)
  3. 재고 서비스: 재고 차감 (InventoryReserved)
  4. 배송 서비스: 배송 생성 (ShipmentCreated)

실패 시 보상 흐름 (3단계 재고 차감 실패 시):

  1. 결제 서비스: 결제 환불 (PaymentRefunded) -- 보상
  2. 주문 서비스: 주문 취소 (OrderCancelled) -- 보상

Orchestration 기반 Saga 구현

// TypeScript - Saga Orchestrator (주문 처리)

interface SagaStep {
  name: string
  action: () => Promise<void>
  compensation: () => Promise<void>
}

class OrderSagaOrchestrator {
  private completedSteps: SagaStep[] = []
  private sagaLog: SagaLogEntry[] = []

  constructor(
    private paymentService: PaymentService,
    private inventoryService: InventoryService,
    private shippingService: ShippingService,
    private sagaStore: SagaStore
  ) {}

  async execute(orderId: string, orderData: OrderData): Promise<SagaResult> {
    const sagaId = crypto.randomUUID()

    const steps: SagaStep[] = [
      {
        name: 'ProcessPayment',
        action: async () => {
          await this.paymentService.processPayment({
            orderId,
            amount: orderData.totalAmount,
            customerId: orderData.customerId,
          })
        },
        compensation: async () => {
          await this.paymentService.refundPayment({
            orderId,
            amount: orderData.totalAmount,
          })
        },
      },
      {
        name: 'ReserveInventory',
        action: async () => {
          await this.inventoryService.reserve({
            orderId,
            items: orderData.items,
          })
        },
        compensation: async () => {
          await this.inventoryService.releaseReservation({
            orderId,
            items: orderData.items,
          })
        },
      },
      {
        name: 'CreateShipment',
        action: async () => {
          await this.shippingService.createShipment({
            orderId,
            address: orderData.shippingAddress,
            items: orderData.items,
          })
        },
        compensation: async () => {
          await this.shippingService.cancelShipment({ orderId })
        },
      },
    ]

    try {
      for (const step of steps) {
        await this.logStep(sagaId, step.name, 'STARTED')

        try {
          await step.action()
          this.completedSteps.push(step)
          await this.logStep(sagaId, step.name, 'COMPLETED')
        } catch (error) {
          await this.logStep(sagaId, step.name, 'FAILED', error)
          // 보상 트랜잭션 실행
          await this.compensate(sagaId)
          return {
            success: false,
            sagaId,
            failedStep: step.name,
            error: (error as Error).message,
          }
        }
      }

      await this.sagaStore.markCompleted(sagaId)
      return { success: true, sagaId }
    } catch (compensationError) {
      // 보상 트랜잭션도 실패한 경우 - 수동 개입 필요
      await this.sagaStore.markRequiresIntervention(sagaId)
      throw new SagaCompensationFailedError(sagaId, compensationError as Error)
    }
  }

  private async compensate(sagaId: string): Promise<void> {
    // 완료된 단계를 역순으로 보상
    const stepsToCompensate = [...this.completedSteps].reverse()

    for (const step of stepsToCompensate) {
      try {
        await this.logStep(sagaId, step.name, 'COMPENSATING')
        await step.compensation()
        await this.logStep(sagaId, step.name, 'COMPENSATED')
      } catch (error) {
        await this.logStep(sagaId, step.name, 'COMPENSATION_FAILED', error)
        // 보상 실패 시 재시도 큐에 등록
        await this.sagaStore.enqueueRetry(sagaId, step.name)
        throw error
      }
    }
  }

  private async logStep(
    sagaId: string,
    stepName: string,
    status: string,
    error?: unknown
  ): Promise<void> {
    const entry: SagaLogEntry = {
      sagaId,
      stepName,
      status,
      timestamp: new Date(),
      error: error ? (error as Error).message : undefined,
    }
    this.sagaLog.push(entry)
    await this.sagaStore.appendLog(entry)
  }
}

interface SagaResult {
  success: boolean
  sagaId: string
  failedStep?: string
  error?: string
}

interface SagaLogEntry {
  sagaId: string
  stepName: string
  status: string
  timestamp: Date
  error?: string
}

Choreography vs Orchestration 비교

Saga 패턴의 두 가지 구현 방식을 상세히 비교한다.

비교 항목Choreography (안무)Orchestration (오케스트레이션)
제어 방식분산 - 각 서비스가 이벤트를 발행/구독중앙 집중 - 오케스트레이터가 흐름 제어
결합도낮음 (이벤트 스키마만 공유)중간 (오케스트레이터가 모든 서비스를 알아야 함)
가시성낮음 (흐름 추적 어려움)높음 (오케스트레이터에서 상태 확인 가능)
복잡도 관리참여 서비스 증가 시 급격히 복잡선형적으로 증가
단일 장애점없음오케스트레이터가 SPOF가 될 수 있음
보상 로직각 서비스에 분산오케스트레이터에 집중
테스트통합 테스트 어려움오케스트레이터 단위 테스트 용이
적합한 규모2-4개 서비스 참여하는 단순 워크플로5개 이상 서비스의 복잡한 워크플로
대표 도구Kafka, RabbitMQ, SNS/SQSTemporal, Camunda, AWS Step Functions

Choreography 패턴 코드 예시

# Python - Choreography 기반 Saga (이벤트 구독 방식)

from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
import asyncio
import json


class EventBus:
    """간단한 인메모리 이벤트 버스 (실무에서는 Kafka/RabbitMQ 사용)"""

    def __init__(self):
        self._handlers: dict[str, list] = {}

    def subscribe(self, event_type: str, handler):
        if event_type not in self._handlers:
            self._handlers[event_type] = []
        self._handlers[event_type].append(handler)

    async def publish(self, event_type: str, payload: dict):
        handlers = self._handlers.get(event_type, [])
        for handler in handlers:
            await handler(payload)


# 결제 서비스 - Choreography 방식
class PaymentService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        # 주문 생성 이벤트 구독
        event_bus.subscribe("OrderCreated", self.on_order_created)
        # 재고 부족 이벤트 구독 (보상용)
        event_bus.subscribe("InventoryReservationFailed", self.on_inventory_failed)

    async def on_order_created(self, payload: dict):
        """주문 생성 시 결제 처리"""
        try:
            order_id = payload["order_id"]
            amount = payload["total_amount"]

            # 결제 처리 로직
            payment_result = await self._process_payment(
                order_id, amount
            )

            # 성공 이벤트 발행
            await self.event_bus.publish("PaymentProcessed", {
                "order_id": order_id,
                "payment_id": payment_result["payment_id"],
                "amount": amount,
            })
        except Exception as e:
            # 실패 이벤트 발행
            await self.event_bus.publish("PaymentFailed", {
                "order_id": payload["order_id"],
                "reason": str(e),
            })

    async def on_inventory_failed(self, payload: dict):
        """재고 부족 시 결제 환불 (보상 트랜잭션)"""
        order_id = payload["order_id"]
        await self._refund_payment(order_id)
        await self.event_bus.publish("PaymentRefunded", {
            "order_id": order_id,
        })

    async def _process_payment(self, order_id: str, amount: int) -> dict:
        # 실제 결제 처리 로직
        return {"payment_id": f"pay-{order_id}"}

    async def _refund_payment(self, order_id: str) -> None:
        # 실제 환불 처리 로직
        pass


# 재고 서비스 - Choreography 방식
class InventoryService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        # 결제 완료 이벤트 구독
        event_bus.subscribe("PaymentProcessed", self.on_payment_processed)

    async def on_payment_processed(self, payload: dict):
        """결제 완료 시 재고 차감"""
        try:
            order_id = payload["order_id"]
            # 재고 확인 및 차감
            await self._reserve_inventory(order_id)

            await self.event_bus.publish("InventoryReserved", {
                "order_id": order_id,
            })
        except InsufficientStockError:
            await self.event_bus.publish("InventoryReservationFailed", {
                "order_id": payload["order_id"],
                "reason": "insufficient_stock",
            })


class InsufficientStockError(Exception):
    pass

하이브리드 접근: 언제 어떤 방식을 선택할 것인가

실무에서는 순수한 Choreography나 Orchestration 하나만 사용하기보다, 워크플로의 복잡도에 따라 혼합하는 것이 효과적이다.

  • Choreography 선택: 2-3개 서비스만 참여하는 단순 알림, 캐시 무효화, 로그 수집 등
  • Orchestration 선택: 결제-재고-배송처럼 순서와 보상이 중요한 핵심 비즈니스 플로우
  • 하이브리드: 핵심 비즈니스 플로우는 Orchestration으로, 부수 효과(이메일 발송, 알림, 분석 이벤트)는 Choreography로 처리

이벤트 스토어 선택 (EventStoreDB, Kafka, DynamoDB)

이벤트 스토어의 선택은 시스템의 요구사항과 팀의 역량에 따라 달라진다. 주요 세 가지 옵션을 비교한다.

비교 항목EventStoreDBApache KafkaDynamoDB Streams
설계 목적Event Sourcing 전용 DB분산 메시지 스트리밍 플랫폼범용 NoSQL + 변경 스트림
스트림 모델세분화된 개별 스트림 (Aggregate 별)토픽-파티션 기반테이블 + DynamoDB Streams/Kinesis
동시성 제어ExpectedVersion (낙관적 잠금) 내장파티션 수준 순서만 보장조건부 쓰기 (ConditionExpression)
ID별 조회스트림 ID로 즉시 조회토픽 내 특정 엔티티 조회 불가파티션 키로 직접 조회
이벤트 순서스트림 내 완전 보장파티션 내에서만 보장파티션 키 내 보장
프로젝션서버 사이드 프로젝션 내장Kafka Streams / ksqlDB로 구현Lambda + DynamoDB Streams
구독 모델Persistent / Catch-up 구독Consumer GroupDynamoDB Streams / Kinesis
운영 복잡도중간 (전용 클러스터)높음 (ZooKeeper/KRaft, 파티션 관리)낮음 (서버리스, AWS 관리형)
비용 모델오픈소스 + 상용 클라우드오픈소스 + MSK/Confluent Cloud요청/저장량 기반 종량제
최적 사용처순수 Event Sourcing 시스템대용량 이벤트 스트리밍 + 통합AWS 네이티브, 서버리스 아키텍처

선택 기준 가이드

EventStoreDB가 적합한 경우:

  • Event Sourcing이 핵심 아키텍처 패턴인 경우
  • 세밀한 스트림 관리와 프로젝션이 필요한 경우
  • DDD 기반 설계를 적극 활용하는 팀

Kafka가 적합한 경우:

  • 대용량 이벤트 스트리밍이 주 목적인 경우
  • 이미 Kafka 인프라가 있고 팀 역량이 충분한 경우
  • Event Sourcing과 이벤트 스트리밍을 모두 해야 하는 경우 (EventStoreDB + Kafka 조합 고려)

DynamoDB가 적합한 경우:

  • AWS 생태계를 주로 사용하는 경우
  • 서버리스 아키텍처를 지향하는 경우
  • 운영 부담을 최소화하고 싶은 경우

EventStoreDB 설정 예시

# docker-compose.yml - EventStoreDB 클러스터 설정
version: '3.8'
services:
  eventstoredb:
    image: eventstore/eventstore:24.2
    container_name: eventstoredb
    environment:
      - EVENTSTORE_CLUSTER_SIZE=1
      - EVENTSTORE_RUN_PROJECTIONS=All
      - EVENTSTORE_START_STANDARD_PROJECTIONS=true
      - EVENTSTORE_INSECURE=true
      - EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=true
      - EVENTSTORE_MEM_DB=false
      - EVENTSTORE_DB=/var/lib/eventstore-data
      - EVENTSTORE_INDEX=/var/lib/eventstore-index
      - EVENTSTORE_LOG=/var/log/eventstore
    ports:
      - '2113:2113' # HTTP/gRPC
      - '1113:1113' # TCP (레거시)
    volumes:
      - eventstore-data:/var/lib/eventstore-data
      - eventstore-index:/var/lib/eventstore-index
      - eventstore-logs:/var/log/eventstore

  # Kafka - 이벤트 스트리밍용 (EventStoreDB와 조합)
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    container_name: kafka
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      KAFKA_NUM_PARTITIONS: 6
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    ports:
      - '9092:9092'
    volumes:
      - kafka-data:/var/lib/kafka/data

volumes:
  eventstore-data:
  eventstore-index:
  eventstore-logs:
  kafka-data:

실전 구현: 주문 시스템 예제

전자상거래 주문 시스템을 통해 CQRS, Event Sourcing, Saga 패턴을 통합 구현하는 예제를 살펴본다.

시스템 아키텍처 개요

전체 시스템은 다음과 같은 구조로 구성된다.

  • 주문 서비스: 주문 Aggregate (Event Sourcing), CQRS 적용
  • 결제 서비스: 결제 처리, 환불 보상 트랜잭션
  • 재고 서비스: 재고 예약, 해제 보상 트랜잭션
  • 배송 서비스: 배송 생성, 취소 보상 트랜잭션
  • Saga 오케스트레이터: Temporal 기반 워크플로 관리

Temporal 기반 Saga Workflow

// TypeScript - Temporal Workflow로 구현한 주문 Saga

import { proxyActivities, defineSignal, setHandler, condition, sleep } from '@temporalio/workflow'

// Activity 프록시
const payment = proxyActivities<PaymentActivities>({
  startToCloseTimeout: '30s',
  retry: {
    maximumAttempts: 3,
    initialInterval: '1s',
    backoffCoefficient: 2,
    maximumInterval: '30s',
  },
})

const inventory = proxyActivities<InventoryActivities>({
  startToCloseTimeout: '10s',
  retry: { maximumAttempts: 3 },
})

const shipping = proxyActivities<ShippingActivities>({
  startToCloseTimeout: '15s',
  retry: { maximumAttempts: 3 },
})

const notification = proxyActivities<NotificationActivities>({
  startToCloseTimeout: '5s',
  retry: { maximumAttempts: 5 },
})

// 시그널 정의 (외부에서 워크플로에 메시지 전달)
const cancelOrderSignal = defineSignal<[string]>('cancelOrder')

// 주문 처리 Saga Workflow
export async function orderSagaWorkflow(input: OrderSagaInput): Promise<OrderSagaResult> {
  let isCancelled = false
  let cancelReason = ''

  // 취소 시그널 핸들러
  setHandler(cancelOrderSignal, (reason: string) => {
    isCancelled = true
    cancelReason = reason
  })

  const compensations: Array<() => Promise<void>> = []

  try {
    // Step 1: 결제 처리
    if (isCancelled) throw new SagaCancelledError(cancelReason)

    const paymentResult = await payment.processPayment({
      orderId: input.orderId,
      amount: input.totalAmount,
      customerId: input.customerId,
    })

    compensations.push(async () => {
      await payment.refundPayment({
        orderId: input.orderId,
        paymentId: paymentResult.paymentId,
        amount: input.totalAmount,
      })
    })

    // Step 2: 재고 예약
    if (isCancelled) throw new SagaCancelledError(cancelReason)

    await inventory.reserveInventory({
      orderId: input.orderId,
      items: input.items,
    })

    compensations.push(async () => {
      await inventory.releaseReservation({
        orderId: input.orderId,
        items: input.items,
      })
    })

    // Step 3: 배송 생성
    if (isCancelled) throw new SagaCancelledError(cancelReason)

    const shipmentResult = await shipping.createShipment({
      orderId: input.orderId,
      address: input.shippingAddress,
      items: input.items,
    })

    compensations.push(async () => {
      await shipping.cancelShipment({
        orderId: input.orderId,
        shipmentId: shipmentResult.shipmentId,
      })
    })

    // 모든 단계 성공 - 확인 알림 발송 (실패해도 Saga에 영향 없음)
    await notification
      .sendOrderConfirmation({
        orderId: input.orderId,
        customerId: input.customerId,
      })
      .catch(() => {
        /* 알림 실패는 무시 */
      })

    return {
      success: true,
      orderId: input.orderId,
      paymentId: paymentResult.paymentId,
      shipmentId: shipmentResult.shipmentId,
    }
  } catch (error) {
    // 보상 트랜잭션 실행 (역순)
    for (const compensate of compensations.reverse()) {
      try {
        await compensate()
      } catch (compError) {
        // Temporal이 자동으로 재시도 관리
        // 최종 실패 시 Dead Letter Queue로 전송
        console.error('Compensation failed:', compError)
      }
    }

    return {
      success: false,
      orderId: input.orderId,
      error: (error as Error).message,
    }
  }
}

interface OrderSagaInput {
  orderId: string
  customerId: string
  totalAmount: number
  shippingAddress: string
  items: Array<{
    productId: string
    quantity: number
    price: number
  }>
}

interface OrderSagaResult {
  success: boolean
  orderId: string
  paymentId?: string
  shipmentId?: string
  error?: string
}

class SagaCancelledError extends Error {
  constructor(reason: string) {
    super(`Saga cancelled: ${reason}`)
    this.name = 'SagaCancelledError'
  }
}

DynamoDB 기반 이벤트 스토어 스키마

# AWS CloudFormation - DynamoDB 이벤트 스토어 테이블

AWSTemplateFormatVersion: '2010-09-09'
Description: Event Store on DynamoDB

Resources:
  EventStoreTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: event-store
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: aggregateId
          AttributeType: S
        - AttributeName: version
          AttributeType: N
        - AttributeName: eventType
          AttributeType: S
        - AttributeName: timestamp
          AttributeType: S
      KeySchema:
        - AttributeName: aggregateId
          KeyType: HASH
        - AttributeName: version
          KeyType: RANGE
      GlobalSecondaryIndexes:
        - IndexName: eventType-timestamp-index
          KeySchema:
            - AttributeName: eventType
              KeyType: HASH
            - AttributeName: timestamp
              KeyType: RANGE
          Projection:
            ProjectionType: ALL
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      Tags:
        - Key: Environment
          Value: production
        - Key: Service
          Value: event-store

  SnapshotTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: event-store-snapshots
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: aggregateId
          AttributeType: S
        - AttributeName: version
          AttributeType: N
      KeySchema:
        - AttributeName: aggregateId
          KeyType: HASH
        - AttributeName: version
          KeyType: RANGE
      Tags:
        - Key: Environment
          Value: production

  SagaStateTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: saga-state
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: sagaId
          AttributeType: S
        - AttributeName: createdAt
          AttributeType: S
      KeySchema:
        - AttributeName: sagaId
          KeyType: HASH
      GlobalSecondaryIndexes:
        - IndexName: createdAt-index
          KeySchema:
            - AttributeName: createdAt
              KeyType: HASH
          Projection:
            ProjectionType: ALL
      TimeToLiveSpecification:
        AttributeName: ttl
        Enabled: true

운영 시 주의사항과 트러블슈팅

1. 멱등성(Idempotency) 확보

분산 환경에서 이벤트는 "최소 한 번(at-least-once)" 전달이 보장되므로, 동일한 이벤트가 중복 처리될 수 있다. 모든 이벤트 핸들러는 반드시 멱등성을 보장해야 한다.

// TypeScript - 멱등성 보장 패턴

class IdempotentEventHandler {
  constructor(
    private processedEvents: ProcessedEventStore,
    private handler: EventHandler
  ) {}

  async handle(event: DomainEvent): Promise<void> {
    // 이미 처리된 이벤트인지 확인
    const isProcessed = await this.processedEvents.exists(event.eventId)
    if (isProcessed) {
      console.log(`Event ${event.eventId} already processed, skipping`)
      return
    }

    try {
      // 이벤트 처리
      await this.handler.handle(event)

      // 처리 완료 기록 (TTL 설정으로 자동 정리)
      await this.processedEvents.markAsProcessed(event.eventId, {
        processedAt: new Date(),
        ttl: 60 * 60 * 24 * 7, // 7일 후 만료
      })
    } catch (error) {
      // 처리 실패 시 기록하지 않아 재시도 가능
      throw error
    }
  }
}

// Redis 기반 중복 검사
class RedisProcessedEventStore implements ProcessedEventStore {
  constructor(private redis: Redis) {}

  async exists(eventId: string): Promise<boolean> {
    const result = await this.redis.exists(`processed:${eventId}`)
    return result === 1
  }

  async markAsProcessed(
    eventId: string,
    options: { processedAt: Date; ttl: number }
  ): Promise<void> {
    await this.redis.setex(`processed:${eventId}`, options.ttl, options.processedAt.toISOString())
  }
}

2. 이벤트 순서 보장

Kafka에서 이벤트 순서를 보장하려면 동일한 Aggregate의 이벤트가 동일한 파티션에 라우팅되어야 한다. Aggregate ID를 파티션 키로 사용하는 것이 일반적이다.

3. 스키마 진화(Schema Evolution)

이벤트는 불변이므로, 스키마 변경 시 하위 호환성을 유지해야 한다. Avro, Protobuf 같은 스키마 레지스트리를 사용하거나 업캐스팅(Upcasting) 패턴을 적용한다.

// TypeScript - 이벤트 업캐스터 (Schema Evolution)

class EventUpcaster {
  private upcasters: Map<string, Map<number, (event: DomainEvent) => DomainEvent>> = new Map()

  register(
    eventType: string,
    fromVersion: number,
    upcaster: (event: DomainEvent) => DomainEvent
  ): void {
    if (!this.upcasters.has(eventType)) {
      this.upcasters.set(eventType, new Map())
    }
    this.upcasters.get(eventType)!.set(fromVersion, upcaster)
  }

  upcast(event: DomainEvent): DomainEvent {
    const typeUpcasters = this.upcasters.get(event.eventType)
    if (!typeUpcasters) return event

    let currentEvent = event
    let schemaVersion = (event.metadata as any).schemaVersion || 1

    while (typeUpcasters.has(schemaVersion)) {
      const upcasterFn = typeUpcasters.get(schemaVersion)!
      currentEvent = upcasterFn(currentEvent)
      schemaVersion++
    }

    return currentEvent
  }
}

// 사용 예: OrderPlaced 이벤트 v1 -> v2 업캐스팅
const upcaster = new EventUpcaster()

upcaster.register('OrderPlaced', 1, (event) => {
  // v1에는 shippingAddress가 문자열이었지만
  // v2에서는 구조화된 객체로 변경
  const payload = event.payload as any
  return {
    ...event,
    payload: {
      ...payload,
      shippingAddress: {
        full: payload.shippingAddress,
        city: '',
        zipCode: '',
      },
    },
    metadata: {
      ...event.metadata,
      schemaVersion: 2,
    },
  }
})

4. 모니터링 핵심 지표

EDA 시스템의 건강 상태를 확인하기 위해 다음 지표를 모니터링해야 한다.

지표설명경고 임계값
Consumer Lag소비자의 처리 지연 (미처리 이벤트 수)1000건 이상
Event Processing Latency이벤트 발행에서 처리까지 소요 시간P99 5초 이상
Saga Completion RateSaga 성공률99% 미만
Compensation Failure Rate보상 트랜잭션 실패율0.1% 이상
Projection Lag읽기 모델과 쓰기 모델의 동기화 지연30초 이상
Dead Letter Queue Size처리 불가 이벤트 수0건 초과 시 즉시 알림

실패 사례와 복구 절차

사례 1: 보상 트랜잭션 실패 (가장 위험한 시나리오)

문제 상황: 결제는 완료되었으나 재고 차감이 실패하여 보상(환불)을 시도했지만, 결제 게이트웨이의 타임아웃으로 환불도 실패한 경우

복구 절차:

  1. Saga 상태를 "REQUIRES_INTERVENTION"으로 표시
  2. Dead Letter Queue에 실패한 보상 트랜잭션을 등록
  3. 자동 재시도 (지수 백오프): 1초, 2초, 4초, 8초, 16초
  4. 최대 재시도 횟수 초과 시 운영팀에 PagerDuty/Slack 알림
  5. 운영자가 수동으로 결제 게이트웨이 콘솔에서 환불 처리
  6. Saga 상태를 "MANUALLY_COMPENSATED"로 갱신

사례 2: 이벤트 순서 역전

문제 상황: 네트워크 지연으로 OrderCancelled 이벤트가 OrderConfirmed보다 먼저 도착

예방 및 복구:

  • 이벤트에 version 필드를 포함하여 순서 검증
  • 버전이 기대치와 다르면 버퍼링 후 재정렬
  • EventStoreDB를 사용하면 스트림 내 순서가 보장되므로 이 문제가 원천 방지됨

사례 3: 프로젝션 장애로 인한 읽기 모델 불일치

문제 상황: 프로젝션 프로세스가 크래시되어 읽기 모델이 최신 상태를 반영하지 못하는 경우

복구 절차:

  1. 프로젝션 프로세스의 마지막 처리 체크포인트 확인
  2. 체크포인트 이후의 이벤트부터 프로젝션 재실행
  3. 심각한 불일치의 경우 읽기 모델을 Drop 후 전체 이벤트 재생(Replay)
  4. 재생 중에는 읽기 트래픽을 캐시 또는 쓰기 DB로 우회
# Python - 프로젝션 복구 스크립트

import asyncio
from datetime import datetime


class ProjectionRecovery:
    def __init__(self, event_store, read_db, projection):
        self.event_store = event_store
        self.read_db = read_db
        self.projection = projection

    async def recover_from_checkpoint(self) -> dict:
        """체크포인트 기반 프로젝션 복구"""
        # 1. 마지막 체크포인트 확인
        checkpoint = await self.read_db.get_checkpoint(
            self.projection.name
        )
        last_position = checkpoint.get("position", 0) if checkpoint else 0

        print(
            f"Recovering projection '{self.projection.name}' "
            f"from position {last_position}"
        )

        # 2. 체크포인트 이후 이벤트 로드
        events = await self.event_store.read_all_from(last_position)
        processed = 0
        errors = 0

        for event in events:
            try:
                await self.projection.handle(event)
                processed += 1

                # 100건마다 체크포인트 갱신
                if processed % 100 == 0:
                    await self.read_db.save_checkpoint(
                        self.projection.name,
                        {"position": event["global_position"]},
                    )
            except Exception as e:
                errors += 1
                print(
                    f"Error processing event "
                    f"{event['event_id']}: {e}"
                )
                # 에러 로깅 후 계속 진행 (skip & log)
                continue

        # 3. 최종 체크포인트 저장
        if events:
            await self.read_db.save_checkpoint(
                self.projection.name,
                {"position": events[-1]["global_position"]},
            )

        return {
            "projection": self.projection.name,
            "processed": processed,
            "errors": errors,
            "recovered_at": datetime.utcnow().isoformat(),
        }

    async def full_rebuild(self) -> dict:
        """읽기 모델 전체 재구축"""
        print(
            f"Full rebuild of projection '{self.projection.name}'"
        )

        # 1. 기존 읽기 모델 삭제
        await self.read_db.drop_projection_data(self.projection.name)

        # 2. 체크포인트 초기화
        await self.read_db.save_checkpoint(
            self.projection.name, {"position": 0}
        )

        # 3. 전체 이벤트 재생
        return await self.recover_from_checkpoint()

사례 4: 이벤트 스토어 디스크 부족

문제 상황: 이벤트가 지속적으로 누적되어 디스크 용량이 부족해지는 경우

예방 전략:

  • 스냅샷 생성 후 오래된 이벤트를 아카이브(S3/Glacier)로 이동
  • 이벤트 보관 정책(Retention Policy) 수립: 핫 데이터(30일), 웜 데이터(1년), 콜드 데이터(영구 아카이브)
  • 디스크 사용률 80% 경고, 90% 긴급 알림 설정

마치며

Event-Driven Architecture의 세 가지 핵심 패턴인 CQRS, Event Sourcing, Saga는 각각 독립적으로도 강력하지만, 함께 사용할 때 마이크로서비스 아키텍처의 데이터 일관성 문제를 근본적으로 해결한다.

핵심 요점을 정리하면 다음과 같다.

  1. CQRS: 읽기와 쓰기의 비대칭적 요구사항을 인정하고 분리하라. 80-90%를 차지하는 읽기 트래픽을 독립적으로 최적화할 수 있다.

  2. Event Sourcing: 현재 상태 대신 변경 이력을 저장하라. 완전한 감사 추적, 시간 여행 디버깅, 다양한 읽기 모델 생성이 가능해진다. 다만 스냅샷 전략과 스키마 진화 전략은 처음부터 고려해야 한다.

  3. Saga 패턴: 분산 트랜잭션은 보상 기반으로 관리하라. 단순한 흐름은 Choreography로, 복잡한 비즈니스 로직은 Orchestration으로 구현하되, 보상 트랜잭션의 실패까지 대비한 복구 전략을 반드시 마련해야 한다.

  4. 이벤트 스토어 선택: 순수 Event Sourcing이 목적이면 EventStoreDB, 대용량 스트리밍이면 Kafka, AWS 서버리스면 DynamoDB를 기본으로 고려하되, 요구사항에 따라 조합할 수 있다.

이 패턴들은 강력하지만, 복잡성이라는 비용이 따른다. 모든 서비스에 일괄 적용하기보다, 비즈니스 복잡도가 높고 데이터 일관성이 중요한 핵심 도메인부터 점진적으로 도입하는 것이 현실적인 전략이다. 운영 모니터링, 멱등성 보장, 스키마 진화 전략 없이 이 패턴들을 도입하면 오히려 시스템의 안정성을 해칠 수 있음을 명심하자.

참고자료