Event-Driven Architecture in Practice: CQRS, Event Sourcing, and the Saga Pattern
- Introduction
- Event-Driven Architecture Fundamentals
- CQRS: Separating Commands and Queries
- Event Sourcing: Event-Based State Management
- Saga Pattern: Managing Distributed Transactions
- Choreography vs. Orchestration
- Choosing an Event Store (EventStoreDB, Kafka, DynamoDB)
- Hands-On Implementation: An Order System Example
- Operational Pitfalls and Troubleshooting
- Failure Cases and Recovery Procedures
- Closing Thoughts
- References

Introduction
One of the biggest challenges in a microservices architecture is maintaining data consistency and managing business transactions that span multiple services. In a traditional monolith, a single database's ACID transactions solve this; in a distributed environment where each service owns its own data store, the limits of 2PC (Two-Phase Commit) are clear: it creates performance bottlenecks, reduces availability, and tightly couples services.
Event-Driven Architecture (EDA) offers a fundamental answer to this problem: communicate between services asynchronously via events, manage state changes as event streams, and handle distributed transactions with compensation-based sagas. This article analyzes the three core EDA patterns, CQRS, Event Sourcing, and Saga, in depth with working code, and covers the strategies and troubleshooting techniques needed in production.
Netflix applies these patterns to personalization for over 260 million subscribers, LinkedIn to processing trillions of events per day, and Slack to handling billions of daily messages. Let's examine the patterns and anti-patterns proven at that scale.
Event-Driven Architecture Fundamentals
Three Types of Events
In EDA, events fall into three categories depending on their purpose.
| Type | Description | Examples | Characteristics |
|---|---|---|---|
| Domain Event | Records a fact that occurred in the business domain | OrderPlaced, PaymentCompleted | Immutable, named in past tense |
| Integration Event | Used for cross-service communication | OrderPlacedIntegrationEvent | Crosses bounded-context boundaries |
| Event Notification | Notifies that a change occurred (minimal data) | OrderStatusChanged (ID only) | Consumers fetch the data they need themselves |
Core Principles
The core principles for implementing EDA correctly are:
- Asynchronous communication: producers and consumers are decoupled in time
- Loose coupling: a service only needs to know the event schema, not the other service's implementation
- Eventual consistency: instead of immediate strong consistency, consistency is guaranteed after some delay
- Idempotency: processing the same event multiple times must produce the same result
// TypeScript - basic domain event structure
interface DomainEvent {
eventId: string
eventType: string
aggregateId: string
aggregateType: string
timestamp: Date
version: number
payload: Record<string, unknown>
metadata: {
correlationId: string
causationId: string
userId?: string
}
}
// Example: an order-placed event
const orderPlacedEvent: DomainEvent = {
eventId: 'evt-550e8400-e29b-41d4-a716-446655440000',
eventType: 'OrderPlaced',
aggregateId: 'order-12345',
aggregateType: 'Order',
timestamp: new Date('2026-03-14T09:00:00Z'),
version: 1,
payload: {
customerId: 'cust-67890',
items: [
{ productId: 'prod-001', quantity: 2, price: 29900 },
{ productId: 'prod-002', quantity: 1, price: 15000 },
],
totalAmount: 74800,
shippingAddress: {
city: '서울',
district: '강남구',
detail: '테헤란로 123',
},
},
metadata: {
correlationId: 'corr-abc123',
causationId: 'cmd-place-order-001',
userId: 'user-admin-01',
},
}
CQRS: Separating Commands and Queries
What Is CQRS?
CQRS (Command Query Responsibility Segregation) is a pattern that separates writes (Commands) and reads (Queries) into distinct models. It extends Bertrand Meyer's CQS (Command Query Separation) principle to the architecture level, and was formalized by Greg Young in 2010.
In a traditional CRUD model, the same data model serves both reads and writes. In real business systems, however, read and write requirements differ significantly.
| Aspect | Command (write) | Query (read) |
|---|---|---|
| Purpose | State changes, business-rule validation | Data retrieval, screen rendering |
| Traffic share | 10-20% of total traffic | 80-90% of total traffic |
| Consistency | Strong consistency required | Eventual consistency acceptable |
| Model complexity | Rich domain logic | Denormalized read models |
| Scaling | Mostly vertical | Easy horizontal scaling (caches, replicas) |
CQRS Implementation: A TypeScript Example
// TypeScript - the CQRS command side
// Command definition
interface PlaceOrderCommand {
type: 'PlaceOrder'
customerId: string
items: Array<{
productId: string
quantity: number
price: number
}>
shippingAddress: string
}
// Command Handler
class PlaceOrderHandler {
constructor(
private orderRepository: OrderWriteRepository,
private eventBus: EventBus,
private inventoryService: InventoryService
) {}
async handle(command: PlaceOrderCommand): Promise<string> {
    // 1. Validate business rules
await this.inventoryService.validateStock(command.items)
    // 2. Create the aggregate
const order = Order.create({
customerId: command.customerId,
items: command.items,
shippingAddress: command.shippingAddress,
})
    // 3. Persist to the write store
await this.orderRepository.save(order)
    // 4. Publish domain events (to sync the read model)
for (const event of order.getDomainEvents()) {
await this.eventBus.publish(event)
}
return order.id
}
}
// Query side - read-only model
interface OrderReadModel {
orderId: string
customerName: string
orderDate: string
status: string
totalAmount: number
itemCount: number
lastUpdated: string
}
// Query Handler
class GetOrdersQueryHandler {
constructor(private readDb: ReadDatabase) {}
async handle(query: {
customerId: string
status?: string
page: number
limit: number
}): Promise<OrderReadModel[]> {
    // Query the denormalized read-only table directly.
    // Build the parameter array dynamically: passing query.status
    // unconditionally would misalign the ? placeholders when it is unset.
    const params: unknown[] = [query.customerId]
    if (query.status) params.push(query.status)
    params.push(query.limit, query.page * query.limit)
    return this.readDb.query(
      `SELECT order_id, customer_name, order_date, status,
              total_amount, item_count, last_updated
       FROM order_read_model
       WHERE customer_id = ?
       ${query.status ? 'AND status = ?' : ''}
       ORDER BY order_date DESC
       LIMIT ? OFFSET ?`,
      params
    )
}
}
Read-Model Projections
The projection logic that consumes events and updates the read model is at the heart of CQRS.
// TypeScript - event-driven projection
class OrderProjection {
constructor(private readDb: ReadDatabase) {}
async handle(event: DomainEvent): Promise<void> {
switch (event.eventType) {
case 'OrderPlaced':
await this.onOrderPlaced(event)
break
case 'OrderShipped':
await this.onOrderShipped(event)
break
case 'OrderCancelled':
await this.onOrderCancelled(event)
break
}
}
private async onOrderPlaced(event: DomainEvent): Promise<void> {
const payload = event.payload as {
customerId: string
items: Array<{ quantity: number; price: number }>
totalAmount: number
}
await this.readDb.upsert('order_read_model', {
order_id: event.aggregateId,
customer_id: payload.customerId,
status: 'PLACED',
total_amount: payload.totalAmount,
item_count: payload.items.reduce((sum, i) => sum + i.quantity, 0),
order_date: event.timestamp,
last_updated: event.timestamp,
version: event.version,
})
}
private async onOrderShipped(event: DomainEvent): Promise<void> {
const payload = event.payload as { trackingNumber: string }
    // Idempotency guard: optimistic version check
await this.readDb.updateWhere(
'order_read_model',
{
status: 'SHIPPED',
tracking_number: payload.trackingNumber,
last_updated: event.timestamp,
version: event.version,
},
{ order_id: event.aggregateId, version: event.version - 1 }
)
}
private async onOrderCancelled(event: DomainEvent): Promise<void> {
const payload = event.payload as { reason: string }
await this.readDb.updateWhere(
'order_read_model',
{
status: 'CANCELLED',
cancellation_reason: payload.reason,
last_updated: event.timestamp,
version: event.version,
},
{ order_id: event.aggregateId }
)
}
}
Event Sourcing: Event-Based State Management
Event Sourcing Core Concepts
Instead of storing an aggregate's current state, Event Sourcing stores every event that changed that state, in order. The current state is reconstructed by replaying the event stream from the beginning.
The differences between the traditional approach and Event Sourcing:
| Aspect | Traditional CRUD | Event Sourcing |
|---|---|---|
| What is stored | Current state (latest snapshot) | Full history of state-change events |
| Data loss | Previous state is overwritten | Every change is preserved |
| Audit | Must be built separately | Built in by default |
| Debugging | Only current state is visible | Time travel is possible |
| Storage | Relatively small | Grows as events accumulate |
| Read performance | Direct queries | Requires projections or snapshots |
Event Sourcing Implementation
// TypeScript - event-sourced aggregate
abstract class EventSourcedAggregate {
private uncommittedEvents: DomainEvent[] = []
protected version: number = 0
abstract get id(): string
  // Apply an event (mutate state)
protected apply(event: DomainEvent): void {
this.when(event)
this.version++
this.uncommittedEvents.push(event)
}
  // Event handler (implemented by subclasses)
protected abstract when(event: DomainEvent): void
  // Restore state from an event stream
loadFromHistory(events: DomainEvent[]): void {
for (const event of events) {
this.when(event)
this.version++
}
}
getUncommittedEvents(): DomainEvent[] {
return [...this.uncommittedEvents]
}
clearUncommittedEvents(): void {
this.uncommittedEvents = []
}
}
// Order aggregate
class Order extends EventSourcedAggregate {
private _id: string = ''
private _customerId: string = ''
private _items: OrderItem[] = []
private _status: OrderStatus = OrderStatus.DRAFT
private _totalAmount: number = 0
get id(): string {
return this._id
}
  // Factory method (the create command)
static create(params: { orderId: string; customerId: string; items: OrderItem[] }): Order {
const order = new Order()
const totalAmount = params.items.reduce((sum, item) => sum + item.price * item.quantity, 0)
order.apply({
eventId: crypto.randomUUID(),
eventType: 'OrderPlaced',
aggregateId: params.orderId,
aggregateType: 'Order',
timestamp: new Date(),
version: 1,
payload: {
customerId: params.customerId,
items: params.items,
totalAmount,
},
metadata: {
correlationId: crypto.randomUUID(),
causationId: 'create',
},
})
return order
}
  // Confirm-order command
confirm(): void {
if (this._status !== OrderStatus.PLACED) {
throw new Error(`Cannot confirm order in status: ${this._status}`)
}
this.apply({
eventId: crypto.randomUUID(),
eventType: 'OrderConfirmed',
aggregateId: this._id,
aggregateType: 'Order',
timestamp: new Date(),
version: this.version + 1,
payload: { confirmedAt: new Date().toISOString() },
metadata: {
correlationId: crypto.randomUUID(),
causationId: 'confirm',
},
})
}
  // Cancel-order command (used by compensating transactions)
cancel(reason: string): void {
if (this._status === OrderStatus.CANCELLED) {
      return // idempotent: cancelling twice is a no-op
}
this.apply({
eventId: crypto.randomUUID(),
eventType: 'OrderCancelled',
aggregateId: this._id,
aggregateType: 'Order',
timestamp: new Date(),
version: this.version + 1,
payload: { reason, cancelledAt: new Date().toISOString() },
metadata: {
correlationId: crypto.randomUUID(),
causationId: 'cancel',
},
})
}
  // Event handler - state-transition logic
protected when(event: DomainEvent): void {
switch (event.eventType) {
case 'OrderPlaced': {
const p = event.payload as {
customerId: string
items: OrderItem[]
totalAmount: number
}
this._id = event.aggregateId
this._customerId = p.customerId
this._items = p.items
this._totalAmount = p.totalAmount
this._status = OrderStatus.PLACED
break
}
case 'OrderConfirmed':
this._status = OrderStatus.CONFIRMED
break
case 'OrderCancelled':
this._status = OrderStatus.CANCELLED
break
}
}
}
enum OrderStatus {
DRAFT = 'DRAFT',
PLACED = 'PLACED',
CONFIRMED = 'CONFIRMED',
SHIPPED = 'SHIPPED',
CANCELLED = 'CANCELLED',
}
interface OrderItem {
productId: string
quantity: number
price: number
}
Snapshot Strategy
As events accumulate, replay takes longer. A snapshot caches the state at a point in time to improve replay performance.
# Python - snapshot-based event store
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import json
@dataclass
class Snapshot:
aggregate_id: str
aggregate_type: str
version: int
state: dict
created_at: datetime = field(default_factory=datetime.utcnow)
class EventStore:
    SNAPSHOT_INTERVAL = 100  # create a snapshot every 100 events
def __init__(self, db_connection):
self.db = db_connection
async def save_events(
self, aggregate_id: str, events: list[dict], expected_version: int
) -> None:
"""낙관적 동시성 제어와 함께 이벤트 저장"""
async with self.db.transaction():
            # Check the current version (optimistic locking)
current_version = await self._get_current_version(aggregate_id)
if current_version != expected_version:
raise ConcurrencyError(
f"Expected version {expected_version}, "
f"but current version is {current_version}"
)
            # Insert the events as a batch
for i, event in enumerate(events):
version = expected_version + i + 1
await self.db.execute(
"""INSERT INTO event_store
(event_id, aggregate_id, aggregate_type,
event_type, version, payload, metadata, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(
event["event_id"],
aggregate_id,
event["aggregate_type"],
event["event_type"],
version,
json.dumps(event["payload"]),
json.dumps(event["metadata"]),
datetime.utcnow(),
),
)
            # Check whether a snapshot is due
new_version = expected_version + len(events)
if new_version % self.SNAPSHOT_INTERVAL == 0:
await self._create_snapshot(aggregate_id, new_version)
    async def load_aggregate(
        self, aggregate_id: str
    ) -> tuple[list[dict], Snapshot | None]:
        """Restore state from the latest snapshot plus any newer events"""
        # 1. Fetch the most recent snapshot
        snapshot = await self._get_latest_snapshot(aggregate_id)
        if snapshot:
            # 2. Load only the events recorded after the snapshot
            events = await self.db.fetch_all(
                """SELECT * FROM event_store
                   WHERE aggregate_id = ? AND version > ?
                   ORDER BY version ASC""",
                (aggregate_id, snapshot.version),
            )
            return events, snapshot
        else:
            # 3. No snapshot yet: load the full event stream
            events = await self.db.fetch_all(
                """SELECT * FROM event_store
                   WHERE aggregate_id = ?
                   ORDER BY version ASC""",
                (aggregate_id,),
            )
            return events, None
    async def _create_snapshot(
        self, aggregate_id: str, version: int
    ) -> None:
        """Serialize the current state as a snapshot"""
        events, snapshot = await self.load_aggregate(aggregate_id)
        # Rebuild the aggregate from the snapshot (if any) plus the newer
        # events; rebuilding from post-snapshot events alone would lose state
        aggregate = self._rebuild_aggregate(events, snapshot)
await self.db.execute(
"""INSERT INTO snapshots
(aggregate_id, aggregate_type, version, state, created_at)
VALUES (?, ?, ?, ?, ?)""",
(
aggregate_id,
aggregate.aggregate_type,
version,
json.dumps(aggregate.to_dict()),
datetime.utcnow(),
),
)
async def _get_current_version(self, aggregate_id: str) -> int:
result = await self.db.fetch_one(
"SELECT MAX(version) FROM event_store WHERE aggregate_id = ?",
(aggregate_id,),
)
return result[0] if result[0] else 0
class ConcurrencyError(Exception):
pass
Saga Pattern: Managing Distributed Transactions
What Is the Saga Pattern?
The Saga pattern manages a business transaction that spans multiple services in a distributed environment. Unlike a traditional distributed transaction (2PC), it executes each service's local transaction in sequence and, on failure, runs compensating transactions against the steps already completed to restore consistency.
An order-processing saga flows as follows.
Happy path:
- Order service: create the order (OrderCreated)
- Payment service: process the payment (PaymentProcessed)
- Inventory service: reserve stock (InventoryReserved)
- Shipping service: create the shipment (ShipmentCreated)
Compensation flow on failure (stock reservation fails at step 3):
- Payment service: refund the payment (PaymentRefunded) -- compensation
- Order service: cancel the order (OrderCancelled) -- compensation
Orchestration-Based Saga Implementation
// TypeScript - saga orchestrator (order processing)
interface SagaStep {
name: string
action: () => Promise<void>
compensation: () => Promise<void>
}
class OrderSagaOrchestrator {
private completedSteps: SagaStep[] = []
private sagaLog: SagaLogEntry[] = []
constructor(
private paymentService: PaymentService,
private inventoryService: InventoryService,
private shippingService: ShippingService,
private sagaStore: SagaStore
) {}
  async execute(orderId: string, orderData: OrderData): Promise<SagaResult> {
    const sagaId = crypto.randomUUID()
    this.completedSteps = [] // reset per run so one orchestrator instance can be reused
const steps: SagaStep[] = [
{
name: 'ProcessPayment',
action: async () => {
await this.paymentService.processPayment({
orderId,
amount: orderData.totalAmount,
customerId: orderData.customerId,
})
},
compensation: async () => {
await this.paymentService.refundPayment({
orderId,
amount: orderData.totalAmount,
})
},
},
{
name: 'ReserveInventory',
action: async () => {
await this.inventoryService.reserve({
orderId,
items: orderData.items,
})
},
compensation: async () => {
await this.inventoryService.releaseReservation({
orderId,
items: orderData.items,
})
},
},
{
name: 'CreateShipment',
action: async () => {
await this.shippingService.createShipment({
orderId,
address: orderData.shippingAddress,
items: orderData.items,
})
},
compensation: async () => {
await this.shippingService.cancelShipment({ orderId })
},
},
]
try {
for (const step of steps) {
await this.logStep(sagaId, step.name, 'STARTED')
try {
await step.action()
this.completedSteps.push(step)
await this.logStep(sagaId, step.name, 'COMPLETED')
} catch (error) {
await this.logStep(sagaId, step.name, 'FAILED', error)
        // Run the compensating transactions
await this.compensate(sagaId)
return {
success: false,
sagaId,
failedStep: step.name,
error: (error as Error).message,
}
}
}
await this.sagaStore.markCompleted(sagaId)
return { success: true, sagaId }
} catch (compensationError) {
      // The compensation itself also failed - manual intervention required
await this.sagaStore.markRequiresIntervention(sagaId)
throw new SagaCompensationFailedError(sagaId, compensationError as Error)
}
}
private async compensate(sagaId: string): Promise<void> {
    // Compensate completed steps in reverse order
const stepsToCompensate = [...this.completedSteps].reverse()
for (const step of stepsToCompensate) {
try {
await this.logStep(sagaId, step.name, 'COMPENSATING')
await step.compensation()
await this.logStep(sagaId, step.name, 'COMPENSATED')
} catch (error) {
await this.logStep(sagaId, step.name, 'COMPENSATION_FAILED', error)
        // On compensation failure, enqueue for retry
await this.sagaStore.enqueueRetry(sagaId, step.name)
throw error
}
}
}
private async logStep(
sagaId: string,
stepName: string,
status: string,
error?: unknown
): Promise<void> {
const entry: SagaLogEntry = {
sagaId,
stepName,
status,
timestamp: new Date(),
error: error ? (error as Error).message : undefined,
}
this.sagaLog.push(entry)
await this.sagaStore.appendLog(entry)
}
}
interface SagaResult {
success: boolean
sagaId: string
failedStep?: string
error?: string
}
interface SagaLogEntry {
sagaId: string
stepName: string
status: string
timestamp: Date
error?: string
}
Choreography vs. Orchestration
A detailed comparison of the two ways to implement the Saga pattern.
| Criterion | Choreography | Orchestration |
|---|---|---|
| Control | Decentralized - each service publishes and subscribes to events | Centralized - an orchestrator drives the flow |
| Coupling | Low (only event schemas are shared) | Medium (the orchestrator must know every service) |
| Visibility | Low (flows are hard to trace) | High (state is visible in the orchestrator) |
| Complexity growth | Explodes as participating services increase | Grows roughly linearly |
| Single point of failure | None | The orchestrator can become a SPOF |
| Compensation logic | Scattered across services | Concentrated in the orchestrator |
| Testing | Integration testing is hard | Orchestrator is easy to unit-test |
| Suitable scale | Simple workflows with 2-4 services | Complex workflows with 5+ services |
| Typical tools | Kafka, RabbitMQ, SNS/SQS | Temporal, Camunda, AWS Step Functions |
Choreography Pattern Code Example
# Python - choreography-based saga (event-subscription style)
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
import asyncio
import json
class EventBus:
"""간단한 인메모리 이벤트 버스 (실무에서는 Kafka/RabbitMQ 사용)"""
def __init__(self):
self._handlers: dict[str, list] = {}
def subscribe(self, event_type: str, handler):
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(handler)
async def publish(self, event_type: str, payload: dict):
handlers = self._handlers.get(event_type, [])
for handler in handlers:
await handler(payload)
# Payment service - choreography style
class PaymentService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        # Subscribe to the order-created event
        event_bus.subscribe("OrderCreated", self.on_order_created)
        # Subscribe to the inventory-failure event (for compensation)
        event_bus.subscribe("InventoryReservationFailed", self.on_inventory_failed)
async def on_order_created(self, payload: dict):
"""주문 생성 시 결제 처리"""
try:
order_id = payload["order_id"]
amount = payload["total_amount"]
            # Payment-processing logic
payment_result = await self._process_payment(
order_id, amount
)
            # Publish the success event
await self.event_bus.publish("PaymentProcessed", {
"order_id": order_id,
"payment_id": payment_result["payment_id"],
"amount": amount,
})
except Exception as e:
            # Publish the failure event
await self.event_bus.publish("PaymentFailed", {
"order_id": payload["order_id"],
"reason": str(e),
})
async def on_inventory_failed(self, payload: dict):
"""재고 부족 시 결제 환불 (보상 트랜잭션)"""
order_id = payload["order_id"]
await self._refund_payment(order_id)
await self.event_bus.publish("PaymentRefunded", {
"order_id": order_id,
})
    async def _process_payment(self, order_id: str, amount: int) -> dict:
        # Real payment-gateway call would go here
        return {"payment_id": f"pay-{order_id}"}
    async def _refund_payment(self, order_id: str) -> None:
        # Real refund call would go here
        pass
# Inventory service - choreography style
class InventoryService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        # Subscribe to the payment-completed event
        event_bus.subscribe("PaymentProcessed", self.on_payment_processed)
async def on_payment_processed(self, payload: dict):
"""결제 완료 시 재고 차감"""
try:
order_id = payload["order_id"]
            # Check and reserve stock
await self._reserve_inventory(order_id)
await self.event_bus.publish("InventoryReserved", {
"order_id": order_id,
})
except InsufficientStockError:
await self.event_bus.publish("InventoryReservationFailed", {
"order_id": payload["order_id"],
"reason": "insufficient_stock",
})
class InsufficientStockError(Exception):
pass
A Hybrid Approach: When to Choose Which
In practice, rather than committing to pure Choreography or pure Orchestration, mixing them according to workflow complexity works best.
- Choose Choreography: simple notifications, cache invalidation, log collection, and the like with only 2-3 participating services
- Choose Orchestration: core business flows such as payment-inventory-shipping where ordering and compensation matter
- Hybrid: run core business flows through Orchestration, and side effects (emails, notifications, analytics events) through Choreography
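As a sketch of this hybrid split (all names here are illustrative, not from a specific system): the orchestrator owns the compensable core steps and finishes by publishing a single integration event, which independent choreographed handlers consume for side effects.

```typescript
type Handler = (payload: Record<string, unknown>) => void

class SimpleEventBus {
  private handlers = new Map<string, Handler[]>()
  subscribe(type: string, handler: Handler): void {
    this.handlers.set(type, [...(this.handlers.get(type) ?? []), handler])
  }
  publish(type: string, payload: Record<string, unknown>): void {
    for (const h of this.handlers.get(type) ?? []) h(payload)
  }
}

const bus = new SimpleEventBus()
const sideEffects: string[] = []

// Choreographed side effects: independent, fire-and-forget, no compensation
bus.subscribe('OrderCompleted', (e) => sideEffects.push('email:' + String(e.orderId)))
bus.subscribe('OrderCompleted', (e) => sideEffects.push('analytics:' + String(e.orderId)))

// Orchestrated core flow: ordered, compensable steps (elided here),
// ending with a single hand-off event to the choreographed side
async function runOrderSaga(orderId: string): Promise<void> {
  // ... payment / inventory / shipping steps with compensations ...
  bus.publish('OrderCompleted', { orderId })
}
```

The design point: the orchestrator never knows who sends emails or records analytics; adding a new side effect is just another subscription.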
Choosing an Event Store (EventStoreDB, Kafka, DynamoDB)
The choice of event store depends on system requirements and team capability. Here is a comparison of the three main options.
| Criterion | EventStoreDB | Apache Kafka | DynamoDB Streams |
|---|---|---|---|
| Design goal | Purpose-built Event Sourcing DB | Distributed message-streaming platform | General-purpose NoSQL + change stream |
| Stream model | Fine-grained per-aggregate streams | Topic-partition based | Table + DynamoDB Streams/Kinesis |
| Concurrency control | Built-in ExpectedVersion (optimistic locking) | Ordering only at the partition level | Conditional writes (ConditionExpression) |
| Lookup by ID | Immediate lookup by stream ID | No per-entity lookup within a topic | Direct lookup by partition key |
| Event ordering | Fully guaranteed within a stream | Guaranteed only within a partition | Guaranteed within a partition key |
| Projections | Built-in server-side projections | Via Kafka Streams / ksqlDB | Lambda + DynamoDB Streams |
| Subscription model | Persistent / catch-up subscriptions | Consumer groups | DynamoDB Streams / Kinesis |
| Operational complexity | Medium (dedicated cluster) | High (ZooKeeper/KRaft, partition management) | Low (serverless, AWS-managed) |
| Cost model | Open source + commercial cloud | Open source + MSK/Confluent Cloud | Pay per request/storage |
| Best fit | Pure Event Sourcing systems | High-volume event streaming + integration | AWS-native, serverless architectures |
Selection Guide
EventStoreDB fits when:
- Event Sourcing is the core architectural pattern
- You need fine-grained stream management and projections
- The team actively practices DDD-based design
Kafka fits when:
- High-volume event streaming is the primary goal
- Kafka infrastructure and team expertise already exist
- You need both Event Sourcing and event streaming (consider combining EventStoreDB + Kafka)
DynamoDB fits when:
- You are primarily in the AWS ecosystem
- You are aiming for a serverless architecture
- You want to minimize operational burden
EventStoreDB Setup Example
# docker-compose.yml - EventStoreDB cluster setup
version: '3.8'
services:
eventstoredb:
image: eventstore/eventstore:24.2
container_name: eventstoredb
environment:
- EVENTSTORE_CLUSTER_SIZE=1
- EVENTSTORE_RUN_PROJECTIONS=All
- EVENTSTORE_START_STANDARD_PROJECTIONS=true
- EVENTSTORE_INSECURE=true
- EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=true
- EVENTSTORE_MEM_DB=false
- EVENTSTORE_DB=/var/lib/eventstore-data
- EVENTSTORE_INDEX=/var/lib/eventstore-index
- EVENTSTORE_LOG=/var/log/eventstore
ports:
- '2113:2113' # HTTP/gRPC
- '1113:1113' # TCP (legacy)
volumes:
- eventstore-data:/var/lib/eventstore-data
- eventstore-index:/var/lib/eventstore-index
- eventstore-logs:/var/log/eventstore
# Kafka - for event streaming (paired with EventStoreDB)
kafka:
image: confluentinc/cp-kafka:7.6.0
container_name: kafka
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_LOG_DIRS: /var/lib/kafka/data
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
KAFKA_NUM_PARTITIONS: 6
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
ports:
- '9092:9092'
volumes:
- kafka-data:/var/lib/kafka/data
volumes:
eventstore-data:
eventstore-index:
eventstore-logs:
kafka-data:
Hands-On Implementation: An Order System Example
This section walks through an e-commerce order system that integrates CQRS, Event Sourcing, and the Saga pattern.
System Architecture Overview
The overall system is composed as follows.
- Order service: Order aggregate (Event Sourcing), with CQRS applied
- Payment service: payment processing, refund compensation
- Inventory service: stock reservation, release compensation
- Shipping service: shipment creation, cancellation compensation
- Saga orchestrator: Temporal-based workflow management
Temporal-Based Saga Workflow
// TypeScript - order saga implemented as a Temporal Workflow
import { proxyActivities, defineSignal, setHandler, condition, sleep } from '@temporalio/workflow'
// Activity proxies
const payment = proxyActivities<PaymentActivities>({
startToCloseTimeout: '30s',
retry: {
maximumAttempts: 3,
initialInterval: '1s',
backoffCoefficient: 2,
maximumInterval: '30s',
},
})
const inventory = proxyActivities<InventoryActivities>({
startToCloseTimeout: '10s',
retry: { maximumAttempts: 3 },
})
const shipping = proxyActivities<ShippingActivities>({
startToCloseTimeout: '15s',
retry: { maximumAttempts: 3 },
})
const notification = proxyActivities<NotificationActivities>({
startToCloseTimeout: '5s',
retry: { maximumAttempts: 5 },
})
// Signal definition (lets external callers message the workflow)
const cancelOrderSignal = defineSignal<[string]>('cancelOrder')
// Order-processing saga workflow
export async function orderSagaWorkflow(input: OrderSagaInput): Promise<OrderSagaResult> {
let isCancelled = false
let cancelReason = ''
  // Cancellation-signal handler
setHandler(cancelOrderSignal, (reason: string) => {
isCancelled = true
cancelReason = reason
})
const compensations: Array<() => Promise<void>> = []
try {
    // Step 1: process the payment
if (isCancelled) throw new SagaCancelledError(cancelReason)
const paymentResult = await payment.processPayment({
orderId: input.orderId,
amount: input.totalAmount,
customerId: input.customerId,
})
compensations.push(async () => {
await payment.refundPayment({
orderId: input.orderId,
paymentId: paymentResult.paymentId,
amount: input.totalAmount,
})
})
    // Step 2: reserve inventory
if (isCancelled) throw new SagaCancelledError(cancelReason)
await inventory.reserveInventory({
orderId: input.orderId,
items: input.items,
})
compensations.push(async () => {
await inventory.releaseReservation({
orderId: input.orderId,
items: input.items,
})
})
    // Step 3: create the shipment
if (isCancelled) throw new SagaCancelledError(cancelReason)
const shipmentResult = await shipping.createShipment({
orderId: input.orderId,
address: input.shippingAddress,
items: input.items,
})
compensations.push(async () => {
await shipping.cancelShipment({
orderId: input.orderId,
shipmentId: shipmentResult.shipmentId,
})
})
    // All steps succeeded - send the confirmation (a failure here does not affect the saga)
await notification
.sendOrderConfirmation({
orderId: input.orderId,
customerId: input.customerId,
})
.catch(() => {
        /* notification failures are ignored */
})
return {
success: true,
orderId: input.orderId,
paymentId: paymentResult.paymentId,
shipmentId: shipmentResult.shipmentId,
}
} catch (error) {
    // Run compensations in reverse order
for (const compensate of compensations.reverse()) {
try {
await compensate()
} catch (compError) {
        // Temporal manages retries automatically;
        // on final failure the item goes to a Dead Letter Queue
console.error('Compensation failed:', compError)
}
}
return {
success: false,
orderId: input.orderId,
error: (error as Error).message,
}
}
}
interface OrderSagaInput {
orderId: string
customerId: string
totalAmount: number
shippingAddress: string
items: Array<{
productId: string
quantity: number
price: number
}>
}
interface OrderSagaResult {
success: boolean
orderId: string
paymentId?: string
shipmentId?: string
error?: string
}
class SagaCancelledError extends Error {
constructor(reason: string) {
super(`Saga cancelled: ${reason}`)
this.name = 'SagaCancelledError'
}
}
DynamoDB-Based Event Store Schema
# AWS CloudFormation - DynamoDB event store tables
AWSTemplateFormatVersion: '2010-09-09'
Description: Event Store on DynamoDB
Resources:
EventStoreTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: event-store
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: aggregateId
AttributeType: S
- AttributeName: version
AttributeType: N
- AttributeName: eventType
AttributeType: S
- AttributeName: timestamp
AttributeType: S
KeySchema:
- AttributeName: aggregateId
KeyType: HASH
- AttributeName: version
KeyType: RANGE
GlobalSecondaryIndexes:
- IndexName: eventType-timestamp-index
KeySchema:
- AttributeName: eventType
KeyType: HASH
- AttributeName: timestamp
KeyType: RANGE
Projection:
ProjectionType: ALL
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES
PointInTimeRecoverySpecification:
PointInTimeRecoveryEnabled: true
Tags:
- Key: Environment
Value: production
- Key: Service
Value: event-store
SnapshotTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: event-store-snapshots
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: aggregateId
AttributeType: S
- AttributeName: version
AttributeType: N
KeySchema:
- AttributeName: aggregateId
KeyType: HASH
- AttributeName: version
KeyType: RANGE
Tags:
- Key: Environment
Value: production
SagaStateTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: saga-state
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: sagaId
AttributeType: S
- AttributeName: createdAt
AttributeType: S
KeySchema:
- AttributeName: sagaId
KeyType: HASH
GlobalSecondaryIndexes:
- IndexName: createdAt-index
KeySchema:
- AttributeName: createdAt
KeyType: HASH
Projection:
ProjectionType: ALL
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
Operational Pitfalls and Troubleshooting
1. Ensuring Idempotency
In a distributed environment, events are delivered "at least once", so the same event can be processed more than once. Every event handler must therefore be idempotent.
// TypeScript - idempotency guard pattern
class IdempotentEventHandler {
constructor(
private processedEvents: ProcessedEventStore,
private handler: EventHandler
) {}
async handle(event: DomainEvent): Promise<void> {
    // Check whether this event was already processed
const isProcessed = await this.processedEvents.exists(event.eventId)
if (isProcessed) {
console.log(`Event ${event.eventId} already processed, skipping`)
return
}
try {
      // Process the event
await this.handler.handle(event)
      // Record completion (a TTL cleans up old entries)
await this.processedEvents.markAsProcessed(event.eventId, {
processedAt: new Date(),
        ttl: 60 * 60 * 24 * 7, // expires after 7 days
})
} catch (error) {
      // Nothing is recorded on failure, so the event can be retried
throw error
}
}
}
// Redis-based duplicate check
class RedisProcessedEventStore implements ProcessedEventStore {
constructor(private redis: Redis) {}
async exists(eventId: string): Promise<boolean> {
const result = await this.redis.exists(`processed:${eventId}`)
return result === 1
}
async markAsProcessed(
eventId: string,
options: { processedAt: Date; ttl: number }
): Promise<void> {
await this.redis.setex(`processed:${eventId}`, options.ttl, options.processedAt.toISOString())
}
}
2. Guaranteeing Event Order
To preserve event order in Kafka, all events for the same aggregate must be routed to the same partition. Using the aggregate ID as the partition key is the standard approach.
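As a rough illustration of why a key-based router gives this guarantee (the hash below is a toy stand-in; Kafka's real default partitioner uses a murmur2 hash of the message key):

```typescript
// Illustrative only: the property that matters is determinism,
// i.e. the same aggregate ID always maps to the same partition,
// so all of one Order's events stay in one totally ordered partition.
function partitionFor(aggregateId: string, numPartitions: number): number {
  let hash = 0
  for (const ch of aggregateId) {
    hash = (hash * 31 + ch.charCodeAt(0)) | 0 // 32-bit rolling hash
  }
  return Math.abs(hash) % numPartitions
}

// Producing with the aggregate ID as the message key means
// 'order-12345' events never interleave across partitions.
const partition = partitionFor('order-12345', 6)
```

In practice you rarely implement this yourself: setting the message `key` to the aggregate ID is enough, and the client's partitioner does the rest.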
3. Schema Evolution
Events are immutable, so schema changes must preserve backward compatibility. Use a schema registry with Avro or Protobuf, or apply the upcasting pattern.
// TypeScript - event upcaster (schema evolution)
class EventUpcaster {
private upcasters: Map<string, Map<number, (event: DomainEvent) => DomainEvent>> = new Map()
register(
eventType: string,
fromVersion: number,
upcaster: (event: DomainEvent) => DomainEvent
): void {
if (!this.upcasters.has(eventType)) {
this.upcasters.set(eventType, new Map())
}
this.upcasters.get(eventType)!.set(fromVersion, upcaster)
}
upcast(event: DomainEvent): DomainEvent {
const typeUpcasters = this.upcasters.get(event.eventType)
if (!typeUpcasters) return event
let currentEvent = event
let schemaVersion = (event.metadata as any).schemaVersion || 1
while (typeUpcasters.has(schemaVersion)) {
const upcasterFn = typeUpcasters.get(schemaVersion)!
currentEvent = upcasterFn(currentEvent)
schemaVersion++
}
return currentEvent
}
}
// Usage: upcasting OrderPlaced events from v1 to v2
const upcaster = new EventUpcaster()
upcaster.register('OrderPlaced', 1, (event) => {
  // In v1, shippingAddress was a plain string;
  // v2 changes it to a structured object
const payload = event.payload as any
return {
...event,
payload: {
...payload,
shippingAddress: {
full: payload.shippingAddress,
city: '',
zipCode: '',
},
},
metadata: {
...event.metadata,
schemaVersion: 2,
},
}
})
4. Key Monitoring Metrics
To keep an eye on the health of an EDA system, monitor the following metrics.
| Metric | Description | Alert threshold |
|---|---|---|
| Consumer Lag | Consumer backlog (unprocessed events) | 1,000+ events |
| Event Processing Latency | Time from publish to processing | P99 over 5s |
| Saga Completion Rate | Saga success rate | Below 99% |
| Compensation Failure Rate | Compensating-transaction failure rate | Over 0.1% |
| Projection Lag | Sync delay between read and write models | Over 30s |
| Dead Letter Queue Size | Unprocessable events | Alert immediately if above 0 |
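The thresholds above can be encoded as a simple alert-evaluation sketch; the metric names and shapes are illustrative and not tied to any particular monitoring stack:

```typescript
interface EdaMetrics {
  consumerLag: number
  p99LatencySeconds: number
  sagaCompletionRate: number // 0..1
  compensationFailureRate: number // 0..1
  projectionLagSeconds: number
  deadLetterQueueSize: number
}

// Returns the list of alerts that should fire, per the thresholds above
function evaluateAlerts(m: EdaMetrics): string[] {
  const alerts: string[] = []
  if (m.consumerLag >= 1000) alerts.push('consumer-lag')
  if (m.p99LatencySeconds >= 5) alerts.push('processing-latency')
  if (m.sagaCompletionRate < 0.99) alerts.push('saga-completion')
  if (m.compensationFailureRate >= 0.001) alerts.push('compensation-failure')
  if (m.projectionLagSeconds >= 30) alerts.push('projection-lag')
  if (m.deadLetterQueueSize > 0) alerts.push('dead-letter-queue')
  return alerts
}
```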
Failure Cases and Recovery Procedures
Case 1: Compensating-Transaction Failure (the most dangerous scenario)
Problem: the payment completed but the stock reservation failed; the compensation (a refund) was attempted, but it also failed because the payment gateway timed out.
Recovery procedure:
- Mark the saga state as "REQUIRES_INTERVENTION"
- Register the failed compensating transaction in the Dead Letter Queue
- Retry automatically with exponential backoff: 1s, 2s, 4s, 8s, 16s
- Once the retry limit is exceeded, alert the operations team via PagerDuty/Slack
- An operator processes the refund manually in the payment-gateway console
- Update the saga state to "MANUALLY_COMPENSATED"
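The automatic-retry step can be sketched as follows; the 1s base and five attempts mirror the schedule above, and `onExhausted` is a placeholder for the PagerDuty/Slack hook:

```typescript
// Compute the backoff schedule: base, base*2, base*4, ...
function backoffDelaysMs(baseMs: number, attempts: number): number[] {
  return Array.from({ length: attempts }, (_, i) => baseMs * 2 ** i)
}

async function retryCompensation(
  compensate: () => Promise<void>,
  onExhausted: () => Promise<void>, // e.g. page the on-call team
  baseMs = 1000,
  maxAttempts = 5
): Promise<boolean> {
  for (const delay of backoffDelaysMs(baseMs, maxAttempts)) {
    try {
      await compensate()
      return true // compensation eventually succeeded
    } catch {
      // Wait before the next attempt
      await new Promise<void>((r) => setTimeout(r, delay))
    }
  }
  await onExhausted() // REQUIRES_INTERVENTION: manual refund needed
  return false
}
```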
Case 2: Event Order Inversion
Problem: due to network delay, an OrderCancelled event arrives before OrderConfirmed.
Prevention and recovery:
- Include a version field in every event and validate ordering
- If the version is not the expected one, buffer the event and reorder
- With EventStoreDB, ordering within a stream is guaranteed, which prevents this class of problem at the source
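The buffer-and-reorder defense can be sketched like this (a minimal in-memory version; names are illustrative): an event applies only when its version is exactly one past the last applied version, and out-of-order arrivals wait until the gap is filled.

```typescript
interface VersionedEvent {
  aggregateId: string
  version: number
  eventType: string
}

class ReorderBuffer {
  private lastVersion = new Map<string, number>()
  private pending = new Map<string, VersionedEvent[]>()

  // Returns the events that are now safe to apply, in order
  accept(event: VersionedEvent): VersionedEvent[] {
    const last = this.lastVersion.get(event.aggregateId) ?? 0
    if (event.version !== last + 1) {
      // Gap detected: park the event until its predecessors arrive
      const queue = this.pending.get(event.aggregateId) ?? []
      queue.push(event)
      this.pending.set(event.aggregateId, queue)
      return []
    }
    const ready: VersionedEvent[] = [event]
    this.lastVersion.set(event.aggregateId, event.version)
    // Drain any buffered successors that are now contiguous
    let next = this.takePending(event.aggregateId, event.version + 1)
    while (next) {
      ready.push(next)
      this.lastVersion.set(event.aggregateId, next.version)
      next = this.takePending(event.aggregateId, next.version + 1)
    }
    return ready
  }

  private takePending(aggregateId: string, version: number): VersionedEvent | undefined {
    const queue = this.pending.get(aggregateId) ?? []
    const idx = queue.findIndex((e) => e.version === version)
    if (idx === -1) return undefined
    return queue.splice(idx, 1)[0]
  }
}
```

A production version would also evict stale buffered events after a timeout so a permanently lost predecessor cannot stall the aggregate forever.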
Case 3: Read-Model Drift After a Projection Failure
Problem: the projection process crashes and the read model stops reflecting the latest state.
Recovery procedure:
- Check the projection process's last processed checkpoint
- Re-run the projection from the events after that checkpoint
- For severe drift, drop the read model and replay the full event stream
- While replaying, route read traffic to a cache or to the write DB
# Python - projection recovery script
import asyncio
from datetime import datetime
class ProjectionRecovery:
def __init__(self, event_store, read_db, projection):
self.event_store = event_store
self.read_db = read_db
self.projection = projection
async def recover_from_checkpoint(self) -> dict:
"""체크포인트 기반 프로젝션 복구"""
# 1. 마지막 체크포인트 확인
checkpoint = await self.read_db.get_checkpoint(
self.projection.name
)
last_position = checkpoint.get("position", 0) if checkpoint else 0
print(
f"Recovering projection '{self.projection.name}' "
f"from position {last_position}"
)
        # 2. Load the events after the checkpoint
events = await self.event_store.read_all_from(last_position)
processed = 0
errors = 0
for event in events:
try:
await self.projection.handle(event)
processed += 1
# 100건마다 체크포인트 갱신
if processed % 100 == 0:
await self.read_db.save_checkpoint(
self.projection.name,
{"position": event["global_position"]},
)
except Exception as e:
errors += 1
print(
f"Error processing event "
f"{event['event_id']}: {e}"
)
# 에러 로깅 후 계속 진행 (skip & log)
continue
# 3. 최종 체크포인트 저장
if events:
await self.read_db.save_checkpoint(
self.projection.name,
{"position": events[-1]["global_position"]},
)
return {
"projection": self.projection.name,
"processed": processed,
"errors": errors,
"recovered_at": datetime.utcnow().isoformat(),
}
async def full_rebuild(self) -> dict:
"""읽기 모델 전체 재구축"""
print(
f"Full rebuild of projection '{self.projection.name}'"
)
# 1. 기존 읽기 모델 삭제
await self.read_db.drop_projection_data(self.projection.name)
# 2. 체크포인트 초기화
await self.read_db.save_checkpoint(
self.projection.name, {"position": 0}
)
# 3. 전체 이벤트 재생
return await self.recover_from_checkpoint()
Case 4: Event Store Disk Exhaustion
Problem: Events accumulate continuously until disk capacity runs out.
Prevention strategies:
- After creating snapshots, move old events to an archive (S3/Glacier)
- Establish an event retention policy: hot data (30 days), warm data (1 year), cold data (permanent archive)
- Set alerts at 80% disk usage (warning) and 90% (critical)
Conclusion
The three core patterns of Event-Driven Architecture -- CQRS, Event Sourcing, and Saga -- are each powerful on their own, but used together they fundamentally solve the data consistency problems of microservice architecture.
The key takeaways are as follows.
- CQRS: Acknowledge the asymmetric requirements of reads and writes and separate them. Read traffic, which accounts for 80-90% of the total, can then be optimized independently.
- Event Sourcing: Store the history of changes instead of the current state. This enables a complete audit trail, time-travel debugging, and multiple read models. However, snapshot and schema evolution strategies must be considered from the start.
- Saga pattern: Manage distributed transactions with compensation. Implement simple flows with Choreography and complex business logic with Orchestration, and always prepare a recovery strategy for the case where a compensating transaction itself fails.
- Event store selection: Consider EventStoreDB for pure Event Sourcing, Kafka for high-volume streaming, and DynamoDB for AWS serverless by default, and combine them as requirements dictate.
These patterns are powerful, but they come at the cost of complexity. Rather than applying them to every service at once, a realistic strategy is to introduce them incrementally, starting with core domains where business complexity is high and data consistency matters most. Keep in mind that adopting these patterns without operational monitoring, idempotency guarantees, and a schema evolution strategy can actually harm system stability.
References
- Microsoft Azure - CQRS Pattern - Official reference architecture documentation for the CQRS pattern
- Microservices.io - Event Sourcing Pattern - Chris Richardson's explanation of the Event Sourcing pattern
- Microservices.io - Saga Pattern - Detailed guide to the Saga pattern for distributed transaction management
- EventStoreDB Documentation - Official EventStoreDB documentation and getting-started guide
- Temporal.io - Mastering Saga Patterns - Guide to implementing Saga patterns on Temporal
- ByteByteGo - Saga Pattern Demystified - Comparative analysis of Orchestration vs Choreography
- Microsoft - Exploring CQRS and Event Sourcing - The Microsoft patterns and practices CQRS Journey guidebook
- Domain Centric - EventStoreDB vs Kafka - Detailed comparison of EventStoreDB and Kafka
- AWS - Event Sourcing on AWS - Event Sourcing architecture patterns on AWS
- Azure Architecture Center - Saga Design Pattern - Microsoft's official Saga design pattern documentation
Event-Driven Architecture Practical Guide: CQRS, Event Sourcing, Saga Patterns
- Introduction
- Event-Driven Architecture Fundamentals
- CQRS Pattern: Separating Commands and Queries
- Event Sourcing: Event-Based State Management
- Saga Pattern: Distributed Transaction Management
- Choreography vs Orchestration Comparison
- Event Store Selection (EventStoreDB, Kafka, DynamoDB)
- Production Implementation: Order System Example
- Operational Considerations and Troubleshooting
- Failure Cases and Recovery Procedures
- Conclusion
- References

Introduction
One of the biggest challenges in microservice architecture is managing data consistency and business transactions across multiple services. In a traditional monolith, this could be solved with a single database's ACID transactions, but in distributed environments where each service has independent data stores, the limitations of 2PC (Two-Phase Commit) are clear. Performance bottlenecks, reduced availability, and strong coupling between services result.
Event-Driven Architecture (EDA) offers a fundamental solution to this problem. It transitions inter-service communication to asynchronous event-based messaging, manages state changes as event streams, and handles distributed transactions with compensation-based sagas. This article deeply analyzes the three core EDA patterns -- CQRS, Event Sourcing, and Saga -- with production code, and covers strategies and troubleshooting techniques needed for production operations.
Netflix uses these patterns for its personalization system serving 260 million subscribers, LinkedIn for processing trillions of daily events, and Slack for handling billions of daily messages. Let us explore the patterns and anti-patterns validated from the experience of these large-scale systems.
Event-Driven Architecture Fundamentals
Three Types of Events
In EDA, events are classified into three types based on their purpose.
| Type | Description | Example | Characteristics |
|---|---|---|---|
| Domain Event | A fact recorded from the business domain | OrderPlaced, PaymentCompleted | Immutable, past-tense naming |
| Integration Event | Events for inter-service communication | OrderPlacedIntegrationEvent | Crosses bounded context boundaries |
| Event Notification | Change notification (minimal data) | OrderStatusChanged (ID only) | Receiver queries needed data directly |
Core Principles
The core principles for correctly implementing EDA are as follows.
- Asynchronous Communication: Producers and consumers are temporally decoupled
- Loose Coupling: Services only need to know the event schema, not the implementation of other services
- Eventual Consistency: Instead of immediate strong consistency, consistency is guaranteed after a certain period
- Idempotency: Processing the same event multiple times must produce the same result
// TypeScript - Domain Event basic structure
interface DomainEvent {
eventId: string
eventType: string
aggregateId: string
aggregateType: string
timestamp: Date
version: number
payload: Record<string, unknown>
metadata: {
correlationId: string
causationId: string
userId?: string
}
}
// Order creation event example
const orderPlacedEvent: DomainEvent = {
eventId: 'evt-550e8400-e29b-41d4-a716-446655440000',
eventType: 'OrderPlaced',
aggregateId: 'order-12345',
aggregateType: 'Order',
timestamp: new Date('2026-03-14T09:00:00Z'),
version: 1,
payload: {
customerId: 'cust-67890',
items: [
{ productId: 'prod-001', quantity: 2, price: 29900 },
{ productId: 'prod-002', quantity: 1, price: 15000 },
],
totalAmount: 74800,
shippingAddress: {
city: 'Seoul',
district: 'Gangnam-gu',
detail: 'Teheran-ro 123',
},
},
metadata: {
correlationId: 'corr-abc123',
causationId: 'cmd-place-order-001',
userId: 'user-admin-01',
},
}
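The idempotency principle listed above is typically enforced at the consumer: before handling an event, check whether its eventId has already been processed. A minimal sketch, where the in-memory set stands in for a persistent store (e.g. a hypothetical processed_events table):

```python
class IdempotentConsumer:
    """Skip events whose eventId has already been handled."""
    def __init__(self, handler):
        self.handler = handler
        self.processed: set[str] = set()  # persist this in production

    def handle(self, event: dict) -> bool:
        """Returns True if the event was processed, False if deduplicated."""
        event_id = event["eventId"]
        if event_id in self.processed:
            return False  # at-least-once delivery: drop the duplicate
        self.handler(event)
        # Marking as processed should be atomic with the handler's side
        # effects (same transaction) so a crash in between is safe to replay
        self.processed.add(event_id)
        return True
```

With at-least-once delivery from the broker, redelivering the same OrderPlaced event becomes a no-op instead of, say, a double stock deduction.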
CQRS Pattern: Separating Commands and Queries
What Is CQRS?
CQRS (Command Query Responsibility Segregation) is a pattern that separates data writes (Commands) and reads (Queries) into separate models. It extends Bertrand Meyer's CQS (Command Query Separation) principle to the architectural level, formalized by Greg Young in 2010.
In the traditional CRUD model, the same data model handles both reads and writes. However, in real business scenarios, read and write requirements differ significantly.
| Aspect | Command (Write) | Query (Read) |
|---|---|---|
| Purpose | State change, business rule validation | Data retrieval, display |
| Ratio | 10-20% of total traffic | 80-90% of total traffic |
| Consistency | Strong consistency needed | Eventual consistency acceptable |
| Model Complexity | Rich domain logic | Denormalized read models |
| Scaling | Primarily vertical scaling | Easy horizontal scaling (cache, replicas) |
CQRS Implementation: TypeScript Example
// TypeScript - CQRS Command Side
// Command definition
interface PlaceOrderCommand {
type: 'PlaceOrder'
customerId: string
items: Array<{
productId: string
quantity: number
price: number
}>
shippingAddress: string
}
// Command Handler
class PlaceOrderHandler {
constructor(
private orderRepository: OrderWriteRepository,
private eventBus: EventBus,
private inventoryService: InventoryService
) {}
async handle(command: PlaceOrderCommand): Promise<string> {
// 1. Business rule validation
await this.inventoryService.validateStock(command.items)
// 2. Aggregate creation
const order = Order.create({
customerId: command.customerId,
items: command.items,
shippingAddress: command.shippingAddress,
})
// 3. Save to write store
await this.orderRepository.save(order)
// 4. Publish domain events (for read model sync)
for (const event of order.getDomainEvents()) {
await this.eventBus.publish(event)
}
return order.id
}
}
// Query Side - Read-only model
interface OrderReadModel {
orderId: string
customerName: string
orderDate: string
status: string
totalAmount: number
itemCount: number
lastUpdated: string
}
// Query Handler
class GetOrdersQueryHandler {
constructor(private readDb: ReadDatabase) {}
async handle(query: {
customerId: string
status?: string
page: number
limit: number
}): Promise<OrderReadModel[]> {
    // Query directly from the denormalized read-only table.
    // Build the bind parameters to match the placeholders exactly;
    // including an undefined status would misalign the bindings.
    const params: unknown[] = [query.customerId]
    if (query.status) params.push(query.status)
    params.push(query.limit, query.page * query.limit)
    return this.readDb.query(
      `SELECT order_id, customer_name, order_date, status,
              total_amount, item_count, last_updated
       FROM order_read_model
       WHERE customer_id = ?
       ${query.status ? 'AND status = ?' : ''}
       ORDER BY order_date DESC
       LIMIT ? OFFSET ?`,
      params
    )
}
}
Read Model Projection
The projection logic that receives events and updates the read model is the core of CQRS.
// TypeScript - Event-based Projection
class OrderProjection {
constructor(private readDb: ReadDatabase) {}
async handle(event: DomainEvent): Promise<void> {
switch (event.eventType) {
case 'OrderPlaced':
await this.onOrderPlaced(event)
break
case 'OrderShipped':
await this.onOrderShipped(event)
break
case 'OrderCancelled':
await this.onOrderCancelled(event)
break
}
}
private async onOrderPlaced(event: DomainEvent): Promise<void> {
const payload = event.payload as {
customerId: string
items: Array<{ quantity: number; price: number }>
totalAmount: number
}
await this.readDb.upsert('order_read_model', {
order_id: event.aggregateId,
customer_id: payload.customerId,
status: 'PLACED',
total_amount: payload.totalAmount,
item_count: payload.items.reduce((sum, i) => sum + i.quantity, 0),
order_date: event.timestamp,
last_updated: event.timestamp,
version: event.version,
})
}
private async onOrderShipped(event: DomainEvent): Promise<void> {
const payload = event.payload as { trackingNumber: string }
// Idempotency guarantee: version check
await this.readDb.updateWhere(
'order_read_model',
{
status: 'SHIPPED',
tracking_number: payload.trackingNumber,
last_updated: event.timestamp,
version: event.version,
},
{ order_id: event.aggregateId, version: event.version - 1 }
)
}
private async onOrderCancelled(event: DomainEvent): Promise<void> {
const payload = event.payload as { reason: string }
await this.readDb.updateWhere(
'order_read_model',
{
status: 'CANCELLED',
cancellation_reason: payload.reason,
last_updated: event.timestamp,
version: event.version,
},
{ order_id: event.aggregateId }
)
}
}
Event Sourcing: Event-Based State Management
Core Concept of Event Sourcing
Event Sourcing is a pattern that stores all events that changed the state of an aggregate in order, instead of storing the current state. The current state is reconstructed by replaying the event stream from the beginning.
Comparing the traditional approach with Event Sourcing:
| Aspect | Traditional CRUD | Event Sourcing |
|---|---|---|
| What is stored | Current state (latest snapshot) | Complete history of state changes |
| Data loss | Previous states are lost | All change history is preserved |
| Audit | Requires separate implementation | Built-in |
| Debugging | Only current state viewable | Time Travel possible |
| Storage space | Relatively small | Increases with event accumulation |
| Query perf | Direct queries possible | Requires projection or snapshots |
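The core mechanic in the table's right column, reconstructing current state by replaying events, is just a left fold over the stream. A minimal sketch with a plain dict as state; the event shapes follow the order examples used throughout this article:

```python
from functools import reduce

def apply_event(state: dict, event: dict) -> dict:
    """Pure transition function: (state, event) -> new state."""
    etype = event["eventType"]
    if etype == "OrderPlaced":
        return {"status": "PLACED", **event["payload"]}
    if etype == "OrderConfirmed":
        return {**state, "status": "CONFIRMED"}
    if etype == "OrderCancelled":
        return {**state, "status": "CANCELLED"}
    return state  # unknown event types are ignored

def replay(events: list[dict]) -> dict:
    """Rebuild the current state from the full event history."""
    return reduce(apply_event, events, {})
```

Because `apply_event` is pure, replaying the same history always yields the same state; this is the property that snapshots (below) exploit by caching an intermediate fold result.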
Event Sourcing Implementation
// TypeScript - Event Sourcing Aggregate
abstract class EventSourcedAggregate {
private uncommittedEvents: DomainEvent[] = []
protected version: number = 0
abstract get id(): string
// Apply event (state change)
protected apply(event: DomainEvent): void {
this.when(event)
this.version++
this.uncommittedEvents.push(event)
}
// Event handler (implemented by subclass)
protected abstract when(event: DomainEvent): void
// Restore state from event stream
loadFromHistory(events: DomainEvent[]): void {
for (const event of events) {
this.when(event)
this.version++
}
}
getUncommittedEvents(): DomainEvent[] {
return [...this.uncommittedEvents]
}
clearUncommittedEvents(): void {
this.uncommittedEvents = []
}
}
// Order Aggregate
class Order extends EventSourcedAggregate {
private _id: string = ''
private _customerId: string = ''
private _items: OrderItem[] = []
private _status: OrderStatus = OrderStatus.DRAFT
private _totalAmount: number = 0
get id(): string {
return this._id
}
// Factory method (create command)
static create(params: { orderId: string; customerId: string; items: OrderItem[] }): Order {
const order = new Order()
const totalAmount = params.items.reduce((sum, item) => sum + item.price * item.quantity, 0)
order.apply({
eventId: crypto.randomUUID(),
eventType: 'OrderPlaced',
aggregateId: params.orderId,
aggregateType: 'Order',
timestamp: new Date(),
version: 1,
payload: {
customerId: params.customerId,
items: params.items,
totalAmount,
},
metadata: {
correlationId: crypto.randomUUID(),
causationId: 'create',
},
})
return order
}
// Confirm order command
confirm(): void {
if (this._status !== OrderStatus.PLACED) {
throw new Error(`Cannot confirm order in status: ${this._status}`)
}
this.apply({
eventId: crypto.randomUUID(),
eventType: 'OrderConfirmed',
aggregateId: this._id,
aggregateType: 'Order',
timestamp: new Date(),
version: this.version + 1,
payload: { confirmedAt: new Date().toISOString() },
metadata: {
correlationId: crypto.randomUUID(),
causationId: 'confirm',
},
})
}
// Cancel order command (used in compensating transactions)
cancel(reason: string): void {
if (this._status === OrderStatus.CANCELLED) {
return // Idempotency guarantee
}
this.apply({
eventId: crypto.randomUUID(),
eventType: 'OrderCancelled',
aggregateId: this._id,
aggregateType: 'Order',
timestamp: new Date(),
version: this.version + 1,
payload: { reason, cancelledAt: new Date().toISOString() },
metadata: {
correlationId: crypto.randomUUID(),
causationId: 'cancel',
},
})
}
// Event handler - state change logic
protected when(event: DomainEvent): void {
switch (event.eventType) {
case 'OrderPlaced': {
const p = event.payload as {
customerId: string
items: OrderItem[]
totalAmount: number
}
this._id = event.aggregateId
this._customerId = p.customerId
this._items = p.items
this._totalAmount = p.totalAmount
this._status = OrderStatus.PLACED
break
}
case 'OrderConfirmed':
this._status = OrderStatus.CONFIRMED
break
case 'OrderCancelled':
this._status = OrderStatus.CANCELLED
break
}
}
}
enum OrderStatus {
DRAFT = 'DRAFT',
PLACED = 'PLACED',
CONFIRMED = 'CONFIRMED',
SHIPPED = 'SHIPPED',
CANCELLED = 'CANCELLED',
}
interface OrderItem {
productId: string
quantity: number
price: number
}
Snapshot Strategy
As the number of events grows, replay time increases. Snapshots cache state at specific points to improve replay performance.
# Python - Snapshot-based Event Store
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import json
@dataclass
class Snapshot:
aggregate_id: str
aggregate_type: str
version: int
state: dict
created_at: datetime = field(default_factory=datetime.utcnow)
class EventStore:
SNAPSHOT_INTERVAL = 100 # Create snapshot every 100 events
def __init__(self, db_connection):
self.db = db_connection
async def save_events(
self, aggregate_id: str, events: list[dict], expected_version: int
) -> None:
"""Save events with optimistic concurrency control"""
async with self.db.transaction():
# Check current version (optimistic locking)
current_version = await self._get_current_version(aggregate_id)
if current_version != expected_version:
raise ConcurrencyError(
f"Expected version {expected_version}, "
f"but current version is {current_version}"
)
# Batch insert events
for i, event in enumerate(events):
version = expected_version + i + 1
await self.db.execute(
"""INSERT INTO event_store
(event_id, aggregate_id, aggregate_type,
event_type, version, payload, metadata, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(
event["event_id"],
aggregate_id,
event["aggregate_type"],
event["event_type"],
version,
json.dumps(event["payload"]),
json.dumps(event["metadata"]),
datetime.utcnow(),
),
)
# Check snapshot creation condition
new_version = expected_version + len(events)
if new_version % self.SNAPSHOT_INTERVAL == 0:
await self._create_snapshot(aggregate_id, new_version)
    async def load_aggregate(self, aggregate_id: str) -> tuple[list[dict], Snapshot | None]:
"""Load events from snapshot to restore state"""
# 1. Query most recent snapshot
snapshot = await self._get_latest_snapshot(aggregate_id)
if snapshot:
# 2. Load only events after snapshot
events = await self.db.fetch_all(
"""SELECT * FROM event_store
WHERE aggregate_id = ? AND version > ?
ORDER BY version ASC""",
(aggregate_id, snapshot.version),
)
return events, snapshot
else:
# 3. Load all events if no snapshot
events = await self.db.fetch_all(
"""SELECT * FROM event_store
WHERE aggregate_id = ?
ORDER BY version ASC""",
(aggregate_id,),
)
return events, None
async def _create_snapshot(
self, aggregate_id: str, version: int
) -> None:
"""Save current state as snapshot"""
        # Load the latest snapshot (if any) plus subsequent events, so the
        # rebuilt state is complete rather than just the post-snapshot tail
        events, snapshot = await self.load_aggregate(aggregate_id)
        # Rebuild aggregate (from snapshot state + events) and serialize it
        aggregate = self._rebuild_aggregate(events, snapshot)
await self.db.execute(
"""INSERT INTO snapshots
(aggregate_id, aggregate_type, version, state, created_at)
VALUES (?, ?, ?, ?, ?)""",
(
aggregate_id,
aggregate.aggregate_type,
version,
json.dumps(aggregate.to_dict()),
datetime.utcnow(),
),
)
async def _get_current_version(self, aggregate_id: str) -> int:
result = await self.db.fetch_one(
"SELECT MAX(version) FROM event_store WHERE aggregate_id = ?",
(aggregate_id,),
)
return result[0] if result[0] else 0
class ConcurrencyError(Exception):
pass
Saga Pattern: Distributed Transaction Management
What Is the Saga Pattern?
The Saga pattern manages business transactions spanning multiple services in distributed environments. Unlike traditional distributed transactions (2PC), it sequentially executes each service's local transactions and, upon failure, executes compensating transactions for already completed steps to restore consistency.
An example of an order processing Saga flow:
Normal flow:
- Order Service: Create order (OrderCreated)
- Payment Service: Process payment (PaymentProcessed)
- Inventory Service: Reserve inventory (InventoryReserved)
- Shipping Service: Create shipment (ShipmentCreated)
Compensation flow on failure (when step 3 inventory reservation fails):
- Payment Service: Refund payment (PaymentRefunded) -- compensation
- Order Service: Cancel order (OrderCancelled) -- compensation
Orchestration-based Saga Implementation
// TypeScript - Saga Orchestrator (Order Processing)
interface SagaStep {
name: string
action: () => Promise<void>
compensation: () => Promise<void>
}
class OrderSagaOrchestrator {
private completedSteps: SagaStep[] = []
private sagaLog: SagaLogEntry[] = []
constructor(
private paymentService: PaymentService,
private inventoryService: InventoryService,
private shippingService: ShippingService,
private sagaStore: SagaStore
) {}
async execute(orderId: string, orderData: OrderData): Promise<SagaResult> {
const sagaId = crypto.randomUUID()
const steps: SagaStep[] = [
{
name: 'ProcessPayment',
action: async () => {
await this.paymentService.processPayment({
orderId,
amount: orderData.totalAmount,
customerId: orderData.customerId,
})
},
compensation: async () => {
await this.paymentService.refundPayment({
orderId,
amount: orderData.totalAmount,
})
},
},
{
name: 'ReserveInventory',
action: async () => {
await this.inventoryService.reserve({
orderId,
items: orderData.items,
})
},
compensation: async () => {
await this.inventoryService.releaseReservation({
orderId,
items: orderData.items,
})
},
},
{
name: 'CreateShipment',
action: async () => {
await this.shippingService.createShipment({
orderId,
address: orderData.shippingAddress,
items: orderData.items,
})
},
compensation: async () => {
await this.shippingService.cancelShipment({ orderId })
},
},
]
try {
for (const step of steps) {
await this.logStep(sagaId, step.name, 'STARTED')
try {
await step.action()
this.completedSteps.push(step)
await this.logStep(sagaId, step.name, 'COMPLETED')
} catch (error) {
await this.logStep(sagaId, step.name, 'FAILED', error)
// Execute compensating transactions
await this.compensate(sagaId)
return {
success: false,
sagaId,
failedStep: step.name,
error: (error as Error).message,
}
}
}
await this.sagaStore.markCompleted(sagaId)
return { success: true, sagaId }
} catch (compensationError) {
// When compensation transaction also fails - manual intervention needed
await this.sagaStore.markRequiresIntervention(sagaId)
throw new SagaCompensationFailedError(sagaId, compensationError as Error)
}
}
private async compensate(sagaId: string): Promise<void> {
// Compensate completed steps in reverse order
const stepsToCompensate = [...this.completedSteps].reverse()
for (const step of stepsToCompensate) {
try {
await this.logStep(sagaId, step.name, 'COMPENSATING')
await step.compensation()
await this.logStep(sagaId, step.name, 'COMPENSATED')
} catch (error) {
await this.logStep(sagaId, step.name, 'COMPENSATION_FAILED', error)
// Register in retry queue on compensation failure
await this.sagaStore.enqueueRetry(sagaId, step.name)
throw error
}
}
}
private async logStep(
sagaId: string,
stepName: string,
status: string,
error?: unknown
): Promise<void> {
const entry: SagaLogEntry = {
sagaId,
stepName,
status,
timestamp: new Date(),
error: error ? (error as Error).message : undefined,
}
this.sagaLog.push(entry)
await this.sagaStore.appendLog(entry)
}
}
interface SagaResult {
success: boolean
sagaId: string
failedStep?: string
error?: string
}
interface SagaLogEntry {
sagaId: string
stepName: string
status: string
timestamp: Date
error?: string
}
Choreography vs Orchestration Comparison
A detailed comparison of the two Saga pattern implementation approaches.
| Comparison | Choreography | Orchestration |
|---|---|---|
| Control method | Distributed - each service publishes/subscribes events | Centralized - orchestrator controls the flow |
| Coupling | Low (only event schema shared) | Medium (orchestrator must know all services) |
| Visibility | Low (hard to trace flow) | High (state viewable in orchestrator) |
| Complexity mgmt | Rapidly complex as services increase | Increases linearly |
| SPOF | None | Orchestrator can be SPOF |
| Compensation | Distributed across services | Centralized in orchestrator |
| Testing | Integration testing difficult | Orchestrator unit testing easy |
| Suitable scale | Simple workflows with 2-4 services | Complex workflows with 5+ services |
| Representative tools | Kafka, RabbitMQ, SNS/SQS | Temporal, Camunda, AWS Step Functions |
Choreography Pattern Code Example
# Python - Choreography-based Saga (event subscription approach)
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
import asyncio
import json
class EventBus:
"""Simple in-memory event bus (use Kafka/RabbitMQ in production)"""
def __init__(self):
self._handlers: dict[str, list] = {}
def subscribe(self, event_type: str, handler):
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(handler)
async def publish(self, event_type: str, payload: dict):
handlers = self._handlers.get(event_type, [])
for handler in handlers:
await handler(payload)
# Payment Service - Choreography approach
class PaymentService:
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
# Subscribe to order creation event
event_bus.subscribe("OrderCreated", self.on_order_created)
# Subscribe to inventory failure event (for compensation)
event_bus.subscribe("InventoryReservationFailed", self.on_inventory_failed)
async def on_order_created(self, payload: dict):
"""Process payment on order creation"""
try:
order_id = payload["order_id"]
amount = payload["total_amount"]
# Payment processing logic
payment_result = await self._process_payment(
order_id, amount
)
# Publish success event
await self.event_bus.publish("PaymentProcessed", {
"order_id": order_id,
"payment_id": payment_result["payment_id"],
"amount": amount,
})
except Exception as e:
# Publish failure event
await self.event_bus.publish("PaymentFailed", {
"order_id": payload["order_id"],
"reason": str(e),
})
async def on_inventory_failed(self, payload: dict):
"""Refund payment on inventory failure (compensating transaction)"""
order_id = payload["order_id"]
await self._refund_payment(order_id)
await self.event_bus.publish("PaymentRefunded", {
"order_id": order_id,
})
async def _process_payment(self, order_id: str, amount: int) -> dict:
# Actual payment processing logic
return {"payment_id": f"pay-{order_id}"}
async def _refund_payment(self, order_id: str) -> None:
# Actual refund processing logic
pass
# Inventory Service - Choreography approach
class InventoryService:
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
# Subscribe to payment completion event
event_bus.subscribe("PaymentProcessed", self.on_payment_processed)
async def on_payment_processed(self, payload: dict):
"""Reserve inventory on payment completion"""
try:
order_id = payload["order_id"]
# Check and reserve inventory
await self._reserve_inventory(order_id)
await self.event_bus.publish("InventoryReserved", {
"order_id": order_id,
})
except InsufficientStockError:
await self.event_bus.publish("InventoryReservationFailed", {
"order_id": payload["order_id"],
"reason": "insufficient_stock",
})
class InsufficientStockError(Exception):
pass
Hybrid Approach: When to Choose Which
In practice, rather than using pure Choreography or Orchestration alone, mixing them based on workflow complexity is effective.
- Choose Choreography: Simple notifications, cache invalidation, log collection with 2-3 participating services
- Choose Orchestration: Core business flows like payment-inventory-shipping where order and compensation matter
- Hybrid: Core business flows via Orchestration, side effects (email, notifications, analytics events) via Choreography
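The hybrid split can be sketched with the same minimal EventBus used in the Choreography example above: the orchestrated core flow publishes a single completion event, and side-effect consumers subscribe to it independently. The handler names and the `run_order_saga` stand-in are illustrative, not part of any real API:

```python
import asyncio

class EventBus:
    """Same minimal in-memory bus as the Choreography example."""
    def __init__(self):
        self._handlers: dict[str, list] = {}
    def subscribe(self, event_type: str, handler):
        self._handlers.setdefault(event_type, []).append(handler)
    async def publish(self, event_type: str, payload: dict):
        for handler in self._handlers.get(event_type, []):
            await handler(payload)

async def run_order_saga(bus: EventBus, order: dict) -> None:
    # The core flow (payment -> inventory -> shipping) runs under an
    # orchestrator; on success it publishes ONE integration event...
    await bus.publish("OrderCompleted", {"order_id": order["order_id"]})

# ...and side effects hang off that event via choreography:
notifications: list[str] = []
async def send_email(payload: dict):
    notifications.append(f"email:{payload['order_id']}")
async def track_analytics(payload: dict):
    notifications.append(f"analytics:{payload['order_id']}")
```

Wiring it up: `bus.subscribe("OrderCompleted", send_email)` and `bus.subscribe("OrderCompleted", track_analytics)`, then running the saga triggers both side effects without the orchestrator knowing they exist.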
Event Store Selection (EventStoreDB, Kafka, DynamoDB)
Event store selection varies based on system requirements and team capabilities. Here is a comparison of the three main options.
| Comparison | EventStoreDB | Apache Kafka | DynamoDB Streams |
|---|---|---|---|
| Design purpose | Dedicated Event Sourcing DB | Distributed message streaming platform | General NoSQL + change streams |
| Stream model | Fine-grained individual streams (per Aggregate) | Topic-partition based | Table + DynamoDB Streams/Kinesis |
| Concurrency | ExpectedVersion (optimistic locking) built-in | Partition-level ordering only | Conditional writes (ConditionExpression) |
| ID lookup | Instant lookup by stream ID | Cannot look up specific entity in topic | Direct lookup by partition key |
| Event ordering | Fully guaranteed within stream | Guaranteed only within partition | Guaranteed within partition key |
| Projections | Server-side projections built-in | Kafka Streams / ksqlDB | Lambda + DynamoDB Streams |
| Subscription | Persistent / Catch-up subscriptions | Consumer Group | DynamoDB Streams / Kinesis |
| Ops complexity | Medium (dedicated cluster) | High (ZooKeeper/KRaft, partition mgmt) | Low (serverless, AWS managed) |
| Cost model | Open source + commercial cloud | Open source + MSK/Confluent Cloud | Pay-per-request/storage |
| Best fit | Pure Event Sourcing systems | High-volume event streaming + integration | AWS native, serverless architecture |
Selection Guide
When EventStoreDB is suitable:
- Event Sourcing is the core architectural pattern
- Fine-grained stream management and projections are needed
- Team actively uses DDD-based design
When Kafka is suitable:
- High-volume event streaming is the main purpose
- Kafka infrastructure already exists and team expertise is sufficient
- Both Event Sourcing and event streaming are needed (consider EventStoreDB + Kafka combination)
When DynamoDB is suitable:
- Primarily using the AWS ecosystem
- Targeting serverless architecture
- Minimizing operational burden
EventStoreDB Configuration Example
# docker-compose.yml - EventStoreDB cluster setup
version: '3.8'
services:
eventstoredb:
image: eventstore/eventstore:24.2
container_name: eventstoredb
environment:
- EVENTSTORE_CLUSTER_SIZE=1
- EVENTSTORE_RUN_PROJECTIONS=All
- EVENTSTORE_START_STANDARD_PROJECTIONS=true
- EVENTSTORE_INSECURE=true
- EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=true
- EVENTSTORE_MEM_DB=false
- EVENTSTORE_DB=/var/lib/eventstore-data
- EVENTSTORE_INDEX=/var/lib/eventstore-index
- EVENTSTORE_LOG=/var/log/eventstore
ports:
- '2113:2113' # HTTP/gRPC
- '1113:1113' # TCP (legacy)
volumes:
- eventstore-data:/var/lib/eventstore-data
- eventstore-index:/var/lib/eventstore-index
- eventstore-logs:/var/log/eventstore
# Kafka - for event streaming (combined with EventStoreDB)
kafka:
image: confluentinc/cp-kafka:7.6.0
container_name: kafka
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_LOG_DIRS: /var/lib/kafka/data
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
KAFKA_NUM_PARTITIONS: 6
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
ports:
- '9092:9092'
volumes:
- kafka-data:/var/lib/kafka/data
volumes:
eventstore-data:
eventstore-index:
eventstore-logs:
kafka-data:
Production Implementation: Order System Example
Let us examine a production implementation that integrates CQRS, Event Sourcing, and Saga patterns through an e-commerce order system.
System Architecture Overview
The overall system consists of the following structure:
- Order Service: Order Aggregate (Event Sourcing), CQRS applied
- Payment Service: Payment processing, refund compensating transaction
- Inventory Service: Inventory reservation, release compensating transaction
- Shipping Service: Shipment creation, cancellation compensating transaction
- Saga Orchestrator: Temporal-based workflow management
Temporal-based Saga Workflow
// TypeScript - Order Saga implemented with Temporal Workflow
import { proxyActivities, defineSignal, setHandler } from '@temporalio/workflow'
// Activity proxies
const payment = proxyActivities<PaymentActivities>({
startToCloseTimeout: '30s',
retry: {
maximumAttempts: 3,
initialInterval: '1s',
backoffCoefficient: 2,
maximumInterval: '30s',
},
})
const inventory = proxyActivities<InventoryActivities>({
startToCloseTimeout: '10s',
retry: { maximumAttempts: 3 },
})
const shipping = proxyActivities<ShippingActivities>({
startToCloseTimeout: '15s',
retry: { maximumAttempts: 3 },
})
const notification = proxyActivities<NotificationActivities>({
startToCloseTimeout: '5s',
retry: { maximumAttempts: 5 },
})
// Signal definition (send messages to workflow from outside)
const cancelOrderSignal = defineSignal<[string]>('cancelOrder')
// Order Processing Saga Workflow
export async function orderSagaWorkflow(input: OrderSagaInput): Promise<OrderSagaResult> {
let isCancelled = false
let cancelReason = ''
// Cancel signal handler
setHandler(cancelOrderSignal, (reason: string) => {
isCancelled = true
cancelReason = reason
})
const compensations: Array<() => Promise<void>> = []
try {
// Step 1: Process payment
if (isCancelled) throw new SagaCancelledError(cancelReason)
const paymentResult = await payment.processPayment({
orderId: input.orderId,
amount: input.totalAmount,
customerId: input.customerId,
})
compensations.push(async () => {
await payment.refundPayment({
orderId: input.orderId,
paymentId: paymentResult.paymentId,
amount: input.totalAmount,
})
})
// Step 2: Reserve inventory
if (isCancelled) throw new SagaCancelledError(cancelReason)
await inventory.reserveInventory({
orderId: input.orderId,
items: input.items,
})
compensations.push(async () => {
await inventory.releaseReservation({
orderId: input.orderId,
items: input.items,
})
})
// Step 3: Create shipment
if (isCancelled) throw new SagaCancelledError(cancelReason)
const shipmentResult = await shipping.createShipment({
orderId: input.orderId,
address: input.shippingAddress,
items: input.items,
})
compensations.push(async () => {
await shipping.cancelShipment({
orderId: input.orderId,
shipmentId: shipmentResult.shipmentId,
})
})
// All steps succeeded - send confirmation notification (a notification failure must not fail the saga)
await notification
.sendOrderConfirmation({
orderId: input.orderId,
customerId: input.customerId,
})
.catch(() => {
/* Notification failure is ignored */
})
return {
success: true,
orderId: input.orderId,
paymentId: paymentResult.paymentId,
shipmentId: shipmentResult.shipmentId,
}
} catch (error) {
// Execute compensating transactions (reverse order)
for (const compensate of compensations.reverse()) {
try {
await compensate()
} catch (compError) {
        // The activity's retry policy has already been exhausted by the time we get here;
        // record the failed compensation (e.g. in a Dead Letter Queue) for manual
        // intervention, and continue with the remaining compensations
        console.error('Compensation failed:', compError)
}
}
return {
success: false,
orderId: input.orderId,
error: (error as Error).message,
}
}
}
interface OrderSagaInput {
orderId: string
customerId: string
totalAmount: number
shippingAddress: string
items: Array<{
productId: string
quantity: number
price: number
}>
}
interface OrderSagaResult {
success: boolean
orderId: string
paymentId?: string
shipmentId?: string
error?: string
}
class SagaCancelledError extends Error {
constructor(reason: string) {
super(`Saga cancelled: ${reason}`)
this.name = 'SagaCancelledError'
}
}
DynamoDB-based Event Store Schema
# AWS CloudFormation - DynamoDB Event Store Table
AWSTemplateFormatVersion: '2010-09-09'
Description: Event Store on DynamoDB
Resources:
EventStoreTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: event-store
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: aggregateId
AttributeType: S
- AttributeName: version
AttributeType: N
- AttributeName: eventType
AttributeType: S
- AttributeName: timestamp
AttributeType: S
KeySchema:
- AttributeName: aggregateId
KeyType: HASH
- AttributeName: version
KeyType: RANGE
GlobalSecondaryIndexes:
- IndexName: eventType-timestamp-index
KeySchema:
- AttributeName: eventType
KeyType: HASH
- AttributeName: timestamp
KeyType: RANGE
Projection:
ProjectionType: ALL
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES
PointInTimeRecoverySpecification:
PointInTimeRecoveryEnabled: true
Tags:
- Key: Environment
Value: production
- Key: Service
Value: event-store
SnapshotTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: event-store-snapshots
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: aggregateId
AttributeType: S
- AttributeName: version
AttributeType: N
KeySchema:
- AttributeName: aggregateId
KeyType: HASH
- AttributeName: version
KeyType: RANGE
Tags:
- Key: Environment
Value: production
SagaStateTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: saga-state
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: sagaId
AttributeType: S
- AttributeName: createdAt
AttributeType: S
KeySchema:
- AttributeName: sagaId
KeyType: HASH
GlobalSecondaryIndexes:
- IndexName: createdAt-index
KeySchema:
- AttributeName: createdAt
KeyType: HASH
Projection:
ProjectionType: ALL
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
Operational Considerations and Troubleshooting
1. Ensuring Idempotency
In distributed environments, events have "at-least-once" delivery guarantees, so the same event may be processed multiple times. All event handlers must guarantee idempotency.
// TypeScript - Idempotency guarantee pattern
class IdempotentEventHandler {
constructor(
private processedEvents: ProcessedEventStore,
private handler: EventHandler
) {}
async handle(event: DomainEvent): Promise<void> {
// Check if event was already processed
const isProcessed = await this.processedEvents.exists(event.eventId)
if (isProcessed) {
console.log(`Event ${event.eventId} already processed, skipping`)
return
}
try {
// Process event
await this.handler.handle(event)
// Record completion (auto-cleanup with TTL)
await this.processedEvents.markAsProcessed(event.eventId, {
processedAt: new Date(),
ttl: 60 * 60 * 24 * 7, // Expire after 7 days
})
} catch (error) {
// Not recorded on failure so retry is possible
throw error
}
}
}
// Redis-based duplicate detection (ioredis)
import Redis from 'ioredis'

class RedisProcessedEventStore implements ProcessedEventStore {
  constructor(private redis: Redis) {}

  async exists(eventId: string): Promise<boolean> {
    const result = await this.redis.exists(`processed:${eventId}`)
    return result === 1
  }

  async markAsProcessed(
    eventId: string,
    options: { processedAt: Date; ttl: number }
  ): Promise<void> {
    // Note: exists + markAsProcessed is not atomic, so a narrow race window remains.
    // For strict deduplication, claim the key in one step with SET NX
    // (e.g. redis.set(key, value, 'EX', ttl, 'NX')) before processing,
    // or pair the handler with a transactional inbox/outbox.
    await this.redis.setex(`processed:${eventId}`, options.ttl, options.processedAt.toISOString())
  }
}
2. Ensuring Event Ordering
To guarantee event ordering in Kafka, events from the same Aggregate must be routed to the same partition. Using the Aggregate ID as the partition key is standard practice.
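The routing rule can be sketched as a pure function. The FNV-1a hash below is only a stand-in for Kafka's default murmur2 partitioner; the property that matters is that the same aggregate ID always maps to the same partition, preserving per-aggregate ordering. With kafkajs, setting the message key achieves the same effect.

```typescript
// Sketch: key-based partition routing (FNV-1a stands in for Kafka's murmur2).
// Same aggregateId -> same partition -> per-aggregate ordering is preserved.
function fnv1a(key: string): number {
  let hash = 0x811c9dc5
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193)
  }
  return hash >>> 0 // unsigned 32-bit
}

function partitionFor(aggregateId: string, numPartitions: number): number {
  return fnv1a(aggregateId) % numPartitions
}

// With kafkajs, the equivalent is simply using the aggregate ID as the message key:
// await producer.send({
//   topic: 'order-events',
//   messages: [{ key: event.aggregateId, value: JSON.stringify(event) }],
// })
```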
3. Schema Evolution
Since events are immutable, schema changes must maintain backward compatibility. Use a schema registry like Avro or Protobuf, or apply the upcasting pattern.
// TypeScript - Event Upcaster (Schema Evolution)
class EventUpcaster {
private upcasters: Map<string, Map<number, (event: DomainEvent) => DomainEvent>> = new Map()
register(
eventType: string,
fromVersion: number,
upcaster: (event: DomainEvent) => DomainEvent
): void {
if (!this.upcasters.has(eventType)) {
this.upcasters.set(eventType, new Map())
}
this.upcasters.get(eventType)!.set(fromVersion, upcaster)
}
upcast(event: DomainEvent): DomainEvent {
const typeUpcasters = this.upcasters.get(event.eventType)
if (!typeUpcasters) return event
let currentEvent = event
let schemaVersion = (event.metadata as any).schemaVersion || 1
while (typeUpcasters.has(schemaVersion)) {
const upcasterFn = typeUpcasters.get(schemaVersion)!
currentEvent = upcasterFn(currentEvent)
schemaVersion++
}
return currentEvent
}
}
// Usage example: OrderPlaced event v1 -> v2 upcasting
const upcaster = new EventUpcaster()
upcaster.register('OrderPlaced', 1, (event) => {
// v1 had shippingAddress as a string
// v2 changed to a structured object
const payload = event.payload as any
return {
...event,
payload: {
...payload,
shippingAddress: {
full: payload.shippingAddress,
city: '',
zipCode: '',
},
},
metadata: {
...event.metadata,
schemaVersion: 2,
},
}
})
4. Key Monitoring Metrics
The following metrics should be monitored to check the health of an EDA system.
| Metric | Description | Alert Threshold |
|---|---|---|
| Consumer Lag | Consumer processing delay (unprocessed events) | Over 1,000 events |
| Event Processing Latency | Time from event publication to processing | P99 over 5 seconds |
| Saga Completion Rate | Saga success rate | Below 99% |
| Compensation Failure Rate | Compensating transaction failure rate | Over 0.1% |
| Projection Lag | Sync delay between read and write models | Over 30 seconds |
| Dead Letter Queue Size | Number of unprocessable events | Immediate alert when > 0 |
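As a rough illustration, the thresholds in the table can be encoded as a single alert-evaluation function. The metric names, units, and alert labels here are assumptions for the sketch, not a real monitoring API.

```typescript
// Sketch: mapping the table's alert thresholds to checks (names/units assumed).
interface EdaMetrics {
  consumerLag: number            // unprocessed events
  p99LatencyMs: number           // publish-to-process latency, P99
  sagaCompletionRate: number     // 0..1
  compensationFailureRate: number // 0..1
  projectionLagSec: number       // read/write model sync delay
  dlqSize: number                // unprocessable events
}

function alertsFor(m: EdaMetrics): string[] {
  const alerts: string[] = []
  if (m.consumerLag > 1000) alerts.push('consumer-lag')
  if (m.p99LatencyMs > 5000) alerts.push('event-latency-p99')
  if (m.sagaCompletionRate < 0.99) alerts.push('saga-completion')
  if (m.compensationFailureRate > 0.001) alerts.push('compensation-failure')
  if (m.projectionLagSec > 30) alerts.push('projection-lag')
  if (m.dlqSize > 0) alerts.push('dead-letter-queue') // immediate alert when > 0
  return alerts
}
```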
Failure Cases and Recovery Procedures
Case 1: Compensation Transaction Failure (Most Dangerous Scenario)
Problem: Payment was completed but inventory reservation failed. The compensation (refund) was attempted, but the refund also failed due to a payment gateway timeout.
Recovery procedure:
- Mark Saga state as "REQUIRES_INTERVENTION"
- Register failed compensation transaction in Dead Letter Queue
- Automatic retry (exponential backoff): 1s, 2s, 4s, 8s, 16s
- Alert ops team via PagerDuty/Slack when max retries exceeded
- Operator manually processes refund via payment gateway console
- Update Saga state to "MANUALLY_COMPENSATED"
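The retry-then-escalate portion of this procedure (steps 3-4) can be sketched as follows. The delay schedule matches the one above; the `deadLetter` hook is a hypothetical stand-in for DLQ registration plus paging, and the injectable `sleep` exists only to keep the sketch testable.

```typescript
// Sketch: retry a failed compensation with exponential backoff, then escalate.
// deadLetter is an assumed hook (DLQ + PagerDuty/Slack alert), not a real API.
async function compensateWithRetry(
  compensate: () => Promise<void>,
  deadLetter: (err: Error) => Promise<void>,
  delays: number[] = [1000, 2000, 4000, 8000, 16000], // 1s, 2s, 4s, 8s, 16s
  sleep: (ms: number) => Promise<void> = (ms) => new Promise((r) => setTimeout(r, ms))
): Promise<boolean> {
  for (let attempt = 0; attempt <= delays.length; attempt++) {
    try {
      await compensate()
      return true // compensation succeeded
    } catch (err) {
      if (attempt === delays.length) {
        await deadLetter(err as Error) // max retries exceeded -> DLQ + alert ops
        return false // saga stays in REQUIRES_INTERVENTION until manual handling
      }
      await sleep(delays[attempt]) // exponential backoff before next attempt
    }
  }
  return false
}
```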
Case 2: Event Order Reversal
Problem: Due to network delay, an OrderCancelled event arrives before the corresponding OrderConfirmed event
Prevention and recovery:
- Include version field in events for order verification
- Buffer and re-sort when version does not match expectations
- With EventStoreDB this issue does not arise for a single aggregate, since ordering within a stream is guaranteed (ordering across streams still requires care)
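The buffer-and-resort step can be sketched with a version gate: events arriving ahead of the expected per-aggregate version are held back until the gap is filled. This is a minimal illustration, not a production implementation; a real one would bound the buffer and alert on persistent gaps.

```typescript
// Sketch: version-gated event processing per aggregate.
interface VersionedEvent {
  aggregateId: string
  version: number
  type: string
}

class OrderedEventBuffer {
  private expected = new Map<string, number>() // next expected version per aggregate
  private pending = new Map<string, VersionedEvent[]>()

  // Returns the events that are now safe to process, in version order.
  accept(event: VersionedEvent): VersionedEvent[] {
    const next = this.expected.get(event.aggregateId) ?? 1
    if (event.version < next) return [] // duplicate or stale: drop

    const buf = this.pending.get(event.aggregateId) ?? []
    buf.push(event)
    buf.sort((a, b) => a.version - b.version)

    // Drain the buffer while versions are contiguous from the expected one
    const ready: VersionedEvent[] = []
    let want = next
    while (buf.length > 0 && buf[0].version === want) {
      ready.push(buf.shift()!)
      want++
    }
    this.expected.set(event.aggregateId, want)
    this.pending.set(event.aggregateId, buf)
    return ready
  }
}
```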
Case 3: Read Model Inconsistency Due to Projection Failure
Problem: Projection process crashes and the read model does not reflect the latest state
Recovery procedure:
- Check the projection process's last checkpoint
- Re-run projection from events after the checkpoint
- For severe inconsistency, drop the read model and replay all events
- During replay, redirect read traffic to cache or write DB
# Python - Projection recovery script
import asyncio
from datetime import datetime
class ProjectionRecovery:
def __init__(self, event_store, read_db, projection):
self.event_store = event_store
self.read_db = read_db
self.projection = projection
async def recover_from_checkpoint(self) -> dict:
"""Checkpoint-based projection recovery"""
# 1. Check last checkpoint
checkpoint = await self.read_db.get_checkpoint(
self.projection.name
)
last_position = checkpoint.get("position", 0) if checkpoint else 0
print(
f"Recovering projection '{self.projection.name}' "
f"from position {last_position}"
)
# 2. Load events after checkpoint
events = await self.event_store.read_all_from(last_position)
processed = 0
errors = 0
for event in events:
try:
await self.projection.handle(event)
processed += 1
# Update checkpoint every 100 events
if processed % 100 == 0:
await self.read_db.save_checkpoint(
self.projection.name,
{"position": event["global_position"]},
)
except Exception as e:
errors += 1
print(
f"Error processing event "
f"{event['event_id']}: {e}"
)
# Continue after logging error (skip & log)
continue
# 3. Save final checkpoint
if events:
await self.read_db.save_checkpoint(
self.projection.name,
{"position": events[-1]["global_position"]},
)
return {
"projection": self.projection.name,
"processed": processed,
"errors": errors,
"recovered_at": datetime.utcnow().isoformat(),
}
async def full_rebuild(self) -> dict:
"""Full rebuild of read model"""
print(
f"Full rebuild of projection '{self.projection.name}'"
)
# 1. Delete existing read model
await self.read_db.drop_projection_data(self.projection.name)
# 2. Reset checkpoint
await self.read_db.save_checkpoint(
self.projection.name, {"position": 0}
)
# 3. Replay all events
return await self.recover_from_checkpoint()
Case 4: Event Store Disk Shortage
Problem: Events accumulate continuously and disk capacity becomes insufficient
Prevention strategy:
- After creating snapshots, move old events to archive (S3/Glacier)
- Establish event retention policy: hot data (30 days), warm data (1 year), cold data (permanent archive)
- Set disk usage alerts at 80% warning, 90% critical
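The retention tiers above reduce to a simple classification rule; the thresholds come from the policy (30 days / 1 year), while the function name and tier labels are illustrative.

```typescript
// Sketch: classify an event into a retention tier by age (thresholds from the policy).
type Tier = 'hot' | 'warm' | 'cold'

function tierFor(eventTimestamp: Date, now: Date = new Date()): Tier {
  const ageDays = (now.getTime() - eventTimestamp.getTime()) / 86_400_000
  if (ageDays <= 30) return 'hot'   // keep on the event store's primary disk
  if (ageDays <= 365) return 'warm' // cheaper storage, still queryable
  return 'cold'                     // archive to S3/Glacier after snapshotting
}
```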
Conclusion
The three core patterns of Event-Driven Architecture -- CQRS, Event Sourcing, and Saga -- are each powerful on their own, but used together they fundamentally address the data consistency problems of microservice architecture.
Key takeaways:
- CQRS: Acknowledge and separate the asymmetric requirements of reads and writes. Read traffic, which typically accounts for 80-90% of the total, can then be optimized independently.
- Event Sourcing: Store the change history instead of the current state. This enables complete audit trails, time-travel debugging, and diverse read model generation. However, snapshot and schema evolution strategies must be considered from the start.
- Saga Pattern: Manage distributed transactions with compensation-based approaches. Implement simple flows with Choreography and complex business logic with Orchestration, but always prepare recovery strategies for compensation transaction failures.
- Event Store Selection: Consider EventStoreDB for pure Event Sourcing, Kafka for high-volume streaming, and DynamoDB for AWS serverless workloads; combinations are possible depending on requirements.
These patterns are powerful but come with the cost of complexity. Rather than applying them uniformly to all services, the realistic strategy is to introduce them incrementally starting with core domains where business complexity is high and data consistency is critical. Adopting these patterns without operational monitoring, idempotency guarantees, and schema evolution strategies can actually harm system stability.
References
- Microsoft Azure - CQRS Pattern - Official reference architecture documentation for the CQRS pattern
- Microservices.io - Event Sourcing Pattern - Chris Richardson's Event Sourcing pattern explanation
- Microservices.io - Saga Pattern - Detailed guide for the Saga pattern for distributed transaction management
- EventStoreDB Documentation - EventStoreDB official documentation and getting started guide
- Temporal.io - Mastering Saga Patterns - Temporal-based Saga pattern implementation master guide
- ByteByteGo - Saga Pattern Demystified - Orchestration vs Choreography comparison analysis
- Microsoft - Exploring CQRS and Event Sourcing - Microsoft patterns and practices CQRS Journey guidebook
- Domain Centric - EventStoreDB vs Kafka - Detailed comparison of EventStoreDB and Kafka
- AWS - Event Sourcing on AWS - AWS-based Event Sourcing architecture patterns
- Azure Architecture Center - Saga Design Pattern - Microsoft's official Saga design pattern documentation