
Event-Driven Architecture Practical Guide: CQRS, Event Sourcing, Saga Patterns


Introduction

One of the biggest challenges in a microservice architecture is keeping data consistent and coordinating business transactions across multiple services. A monolith can rely on the ACID transactions of a single database, but once each service owns an independent data store, the usual alternative is a distributed protocol such as 2PC (Two-Phase Commit), whose limitations are well known: performance bottlenecks, reduced availability, and tight coupling between services.

Event-Driven Architecture (EDA) addresses this problem at its root. It moves inter-service communication to asynchronous event-based messaging, records state changes as event streams, and handles distributed transactions with compensation-based sagas. This article examines the three core EDA patterns (CQRS, Event Sourcing, and Saga) with production-style code, and covers the strategies and troubleshooting techniques needed to operate them.

Netflix uses these patterns for its personalization system serving 260 million subscribers, LinkedIn for processing trillions of daily events, and Slack for handling billions of daily messages. The patterns and anti-patterns below are drawn from the experience of large-scale systems like these.

Event-Driven Architecture Fundamentals

Three Types of Events

In EDA, events are classified into three types based on their purpose.

| Type               | Description                              | Example                       | Characteristics                       |
| ------------------ | ---------------------------------------- | ----------------------------- | ------------------------------------- |
| Domain Event       | A fact recorded from the business domain | OrderPlaced, PaymentCompleted | Immutable, past-tense naming          |
| Integration Event  | Events for inter-service communication   | OrderPlacedIntegrationEvent   | Crosses bounded context boundaries    |
| Event Notification | Change notification (minimal data)       | OrderStatusChanged (ID only)  | Receiver queries needed data directly |
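
To make the distinction in the table concrete, here is a minimal sketch of how one internal domain event might be mapped to the other two categories before it leaves the service. All type names, the `kind` discriminator, and the `internalRiskScore` field are hypothetical and exist only for illustration.

```typescript
// Domain event: the full internal fact, including data other services should not see.
interface OrderPlacedDomainEvent {
  kind: 'domain'
  eventType: 'OrderPlaced'
  orderId: string
  customerId: string
  totalAmount: number
  internalRiskScore: number // hypothetical internal-only field
}

// Integration event: the published contract that crosses the bounded context boundary.
interface OrderPlacedIntegrationEvent {
  kind: 'integration'
  eventType: 'OrderPlacedIntegrationEvent'
  orderId: string
  customerId: string
  totalAmount: number
}

// Event notification: ID only; the receiver calls back for whatever data it needs.
interface OrderStatusChangedNotification {
  kind: 'notification'
  eventType: 'OrderStatusChanged'
  orderId: string
}

function toIntegrationEvent(e: OrderPlacedDomainEvent): OrderPlacedIntegrationEvent {
  // Copy only the fields that belong to the public contract.
  return {
    kind: 'integration',
    eventType: 'OrderPlacedIntegrationEvent',
    orderId: e.orderId,
    customerId: e.customerId,
    totalAmount: e.totalAmount,
  }
}

function toNotification(e: OrderPlacedDomainEvent): OrderStatusChangedNotification {
  return { kind: 'notification', eventType: 'OrderStatusChanged', orderId: e.orderId }
}

const domainEvent: OrderPlacedDomainEvent = {
  kind: 'domain',
  eventType: 'OrderPlaced',
  orderId: 'order-12345',
  customerId: 'cust-67890',
  totalAmount: 74800,
  internalRiskScore: 0.12,
}
const integrationEvent = toIntegrationEvent(domainEvent)
const notificationEvent = toNotification(domainEvent)
```

Keeping the mapping explicit means the internal event can change shape without breaking consumers, as long as the two mapping functions keep producing the same public contract.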

Core Principles

The core principles for correctly implementing EDA are as follows.

1. **Asynchronous Communication**: Producers and consumers are temporally decoupled

2. **Loose Coupling**: Services only need to know the event schema, not the implementation of other services

3. **Eventual Consistency**: Instead of immediate strong consistency, the system converges to a consistent state once all in-flight events have been processed

4. **Idempotency**: Processing the same event multiple times must produce the same result

```typescript
// TypeScript - Domain Event basic structure
interface DomainEvent {
  eventId: string
  eventType: string
  aggregateId: string
  aggregateType: string
  timestamp: Date
  version: number
  payload: Record<string, unknown>
  metadata: {
    correlationId: string
    causationId: string
    userId?: string
  }
}

// Order creation event example
const orderPlacedEvent: DomainEvent = {
  eventId: 'evt-550e8400-e29b-41d4-a716-446655440000',
  eventType: 'OrderPlaced',
  aggregateId: 'order-12345',
  aggregateType: 'Order',
  timestamp: new Date('2026-03-14T09:00:00Z'),
  version: 1,
  payload: {
    customerId: 'cust-67890',
    items: [
      { productId: 'prod-001', quantity: 2, price: 29900 },
      { productId: 'prod-002', quantity: 1, price: 15000 },
    ],
    totalAmount: 74800,
    shippingAddress: {
      city: 'Seoul',
      district: 'Gangnam-gu',
      detail: 'Teheran-ro 123',
    },
  },
  metadata: {
    correlationId: 'corr-abc123',
    causationId: 'cmd-place-order-001',
    userId: 'user-admin-01',
  },
}
```
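
The idempotency principle listed above can be sketched as a wrapper that remembers which `eventId` values it has already handled. This is a minimal illustration, not a production design: the in-memory `Set` and the `idempotent` helper are assumptions, and a real consumer would typically persist processed IDs in the same transaction as the side effect.

```typescript
type Handler<E> = (event: E) => void

interface HasEventId {
  eventId: string
}

// Wraps a handler so that a redelivered event (same eventId) is skipped.
function idempotent<E extends HasEventId>(handler: Handler<E>): Handler<E> {
  const processed = new Set<string>() // assumption: in-memory only, lost on restart
  return (event) => {
    if (processed.has(event.eventId)) return
    handler(event)
    // Marked as processed only after the handler succeeded, so a failed
    // attempt can be retried by the broker.
    processed.add(event.eventId)
  }
}

let balance = 0
const creditAccount = idempotent((e: { eventId: string; amount: number }) => {
  balance += e.amount
})

creditAccount({ eventId: 'evt-1', amount: 100 })
creditAccount({ eventId: 'evt-1', amount: 100 }) // duplicate delivery, ignored
creditAccount({ eventId: 'evt-2', amount: 50 })
// balance is now 150, not 250
```

Most brokers deliver at least once, so a handler like `balance += amount` that is not naturally idempotent needs this kind of guard.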

CQRS Pattern: Separating Commands and Queries

What Is CQRS?

CQRS (Command Query Responsibility Segregation) is a pattern that separates data writes (Commands) and reads (Queries) into separate models. It extends Bertrand Meyer's CQS (Command Query Separation) principle to the architectural level, formalized by Greg Young in 2010.

In the traditional CRUD model, the same data model handles both reads and writes. However, in real business scenarios, read and write requirements differ significantly.

| Aspect                | Command (Write)                        | Query (Read)                              |
| --------------------- | -------------------------------------- | ----------------------------------------- |
| Purpose               | State change, business rule validation | Data retrieval, display                   |
| Typical traffic share | Roughly 10-20% of total traffic        | Roughly 80-90% of total traffic           |
| Consistency           | Strong consistency needed              | Eventual consistency acceptable           |
| Model Complexity      | Rich domain logic                      | Denormalized read models                  |
| Scaling               | Primarily vertical scaling             | Easy horizontal scaling (cache, replicas) |

CQRS Implementation: TypeScript Example

```typescript
// TypeScript - CQRS Command Side
// OrderWriteRepository, EventBus, InventoryService, ReadDatabase and Order
// are assumed to be defined elsewhere in the application.

// Command definition
interface PlaceOrderCommand {
  type: 'PlaceOrder'
  customerId: string
  items: Array<{
    productId: string
    quantity: number
    price: number
  }>
  shippingAddress: string
}

// Command Handler
class PlaceOrderHandler {
  constructor(
    private orderRepository: OrderWriteRepository,
    private eventBus: EventBus,
    private inventoryService: InventoryService
  ) {}

  async handle(command: PlaceOrderCommand): Promise<string> {
    // 1. Business rule validation
    await this.inventoryService.validateStock(command.items)

    // 2. Aggregate creation
    const order = Order.create({
      customerId: command.customerId,
      items: command.items,
      shippingAddress: command.shippingAddress,
    })

    // 3. Save to write store
    await this.orderRepository.save(order)

    // 4. Publish domain events (for read model sync)
    // Note: saving and then publishing is a dual write. If the process crashes
    // between steps 3 and 4 the events are never published; a transactional
    // outbox is a common way to close that gap.
    for (const event of order.getDomainEvents()) {
      await this.eventBus.publish(event)
    }

    return order.id
  }
}

// Query Side - Read-only model
interface OrderReadModel {
  orderId: string
  customerName: string
  orderDate: string
  status: string
  totalAmount: number
  itemCount: number
  lastUpdated: string
}

// Query Handler
class GetOrdersQueryHandler {
  constructor(private readDb: ReadDatabase) {}

  async handle(query: {
    customerId: string
    status?: string
    page: number
    limit: number
  }): Promise<OrderReadModel[]> {
    // Build the WHERE clause and the parameter list together so the number of
    // placeholders always matches the number of bound values
    const conditions = ['customer_id = ?']
    const params: Array<string | number> = [query.customerId]
    if (query.status) {
      conditions.push('status = ?')
      params.push(query.status)
    }
    // `page` is zero-based: page 0 returns the first `limit` rows
    params.push(query.limit, query.page * query.limit)

    // Query directly from denormalized read-only table
    return this.readDb.query(
      `SELECT order_id, customer_name, order_date, status,
              total_amount, item_count, last_updated
       FROM order_read_model
       WHERE ${conditions.join(' AND ')}
       ORDER BY order_date DESC
       LIMIT ? OFFSET ?`,
      params
    )
  }
}
```

Read Model Projection

The projection logic that receives events and updates the read model is the core of CQRS.

```typescript
// TypeScript - Event-based Projection
class OrderProjection {
  constructor(private readDb: ReadDatabase) {}

  async handle(event: DomainEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderPlaced':
        await this.onOrderPlaced(event)
        break
      case 'OrderShipped':
        await this.onOrderShipped(event)
        break
      case 'OrderCancelled':
        await this.onOrderCancelled(event)
        break
    }
  }

  private async onOrderPlaced(event: DomainEvent): Promise<void> {
    const payload = event.payload as {
      customerId: string
      items: Array<{ quantity: number; price: number }>
      totalAmount: number
    }

    await this.readDb.upsert('order_read_model', {
      order_id: event.aggregateId,
      customer_id: payload.customerId,
      status: 'PLACED',
      total_amount: payload.totalAmount,
      item_count: payload.items.reduce((sum, i) => sum + i.quantity, 0),
      order_date: event.timestamp,
      last_updated: event.timestamp,
      version: event.version,
    })
  }

  private async onOrderShipped(event: DomainEvent): Promise<void> {
    const payload = event.payload as { trackingNumber: string }

    // Idempotency guarantee: version check.
    // The update only matches a row that is still at the previous version,
    // so a redelivered event changes nothing.
    await this.readDb.updateWhere(
      'order_read_model',
      {
        status: 'SHIPPED',
        tracking_number: payload.trackingNumber,
        last_updated: event.timestamp,
        version: event.version,
      },
      { order_id: event.aggregateId, version: event.version - 1 }
    )
  }

  private async onOrderCancelled(event: DomainEvent): Promise<void> {
    const payload = event.payload as { reason: string }

    await this.readDb.updateWhere(
      'order_read_model',
      {
        status: 'CANCELLED',
        cancellation_reason: payload.reason,
        last_updated: event.timestamp,
        version: event.version,
      },
      { order_id: event.aggregateId }
    )
  }
}
```
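
The version check used in the projection can be exercised without a database. The sketch below is a hypothetical in-memory stand-in (`InMemoryOrderProjection` is not part of the code above) that applies an event only when its version is exactly one higher than the stored row, which makes duplicates and stale events no-ops.

```typescript
interface VersionedEvent {
  aggregateId: string
  version: number
  status: string
}

interface ReadRow {
  status: string
  version: number
}

class InMemoryOrderProjection {
  readonly rows = new Map<string, ReadRow>()

  // Returns true when the event was applied, false when it was skipped.
  apply(event: VersionedEvent): boolean {
    const currentVersion = this.rows.get(event.aggregateId)?.version ?? 0
    if (event.version !== currentVersion + 1) {
      return false // duplicate, stale, or out-of-order event
    }
    this.rows.set(event.aggregateId, { status: event.status, version: event.version })
    return true
  }
}

const projection = new InMemoryOrderProjection()
const placed = { aggregateId: 'order-1', version: 1, status: 'PLACED' }
const shipped = { aggregateId: 'order-1', version: 2, status: 'SHIPPED' }

const applied = [
  projection.apply(placed), // applied
  projection.apply(shipped), // applied
  projection.apply(shipped), // redelivery, skipped
  projection.apply(placed), // stale event, skipped
]
```

A skipped event is not always safe to drop: when an event arrives ahead of a missing predecessor, a real consumer would need to retry later or re-read the stream rather than discard it.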

Event Sourcing: Event-Based State Management

Core Concept of Event Sourcing

Event Sourcing is a pattern that stores all events that changed the state of an aggregate in order, instead of storing the current state. The current state is reconstructed by replaying the event stream from the beginning.

Comparing the traditional approach with Event Sourcing:

| Aspect         | Traditional CRUD                 | Event Sourcing                    |
| -------------- | -------------------------------- | --------------------------------- |
| What is stored | Current state (latest snapshot)  | Complete history of state changes |
| Data loss      | Previous states are lost         | All change history is preserved   |
| Audit          | Requires separate implementation | Built-in                          |
| Debugging      | Only current state viewable      | Time Travel possible              |
| Storage space  | Relatively small                 | Increases with event accumulation |
| Query perf     | Direct queries possible          | Requires projection or snapshots  |
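
Replay and the "Time Travel" entry in the table both reduce to a fold over the event stream. The following is a minimal sketch with a deliberately simplified event and state shape; `evolve` and `replay` are illustrative names, not part of any library.

```typescript
type OrderEvent =
  | { type: 'OrderPlaced'; totalAmount: number }
  | { type: 'OrderConfirmed' }
  | { type: 'OrderCancelled'; reason: string }

interface OrderState {
  status: 'DRAFT' | 'PLACED' | 'CONFIRMED' | 'CANCELLED'
  totalAmount: number
}

const initialState: OrderState = { status: 'DRAFT', totalAmount: 0 }

// Pure transition function: previous state + event -> next state.
function evolve(state: OrderState, event: OrderEvent): OrderState {
  switch (event.type) {
    case 'OrderPlaced':
      return { status: 'PLACED', totalAmount: event.totalAmount }
    case 'OrderConfirmed':
      return { ...state, status: 'CONFIRMED' }
    case 'OrderCancelled':
      return { ...state, status: 'CANCELLED' }
  }
}

// Replaying only the first `upTo` events yields the state as of that point.
function replay(events: OrderEvent[], upTo: number = events.length): OrderState {
  return events.slice(0, upTo).reduce(evolve, initialState)
}

const history: OrderEvent[] = [
  { type: 'OrderPlaced', totalAmount: 74800 },
  { type: 'OrderConfirmed' },
  { type: 'OrderCancelled', reason: 'customer request' },
]

const currentState = replay(history) // status: 'CANCELLED'
const stateAfterTwoEvents = replay(history, 2) // status: 'CONFIRMED'
```

Because `evolve` is a pure function, the same history always produces the same state, which is what makes audit and debugging by replay reliable.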

Event Sourcing Implementation

```typescript
// TypeScript - Event Sourcing Aggregate
abstract class EventSourcedAggregate {
  private uncommittedEvents: DomainEvent[] = []
  protected version: number = 0

  abstract get id(): string

  // Apply event (state change)
  protected apply(event: DomainEvent): void {
    this.when(event)
    this.version++
    this.uncommittedEvents.push(event)
  }

  // Event handler (implemented by subclass)
  protected abstract when(event: DomainEvent): void

  // Restore state from event stream
  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.when(event)
      this.version++
    }
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents]
  }

  clearUncommittedEvents(): void {
    this.uncommittedEvents = []
  }
}

// Order Aggregate
class Order extends EventSourcedAggregate {
  private _id: string = ''
  private _customerId: string = ''
  private _items: OrderItem[] = []
  private _status: OrderStatus = OrderStatus.DRAFT
  private _totalAmount: number = 0

  get id(): string {
    return this._id
  }

  // Factory method (create command)
  static create(params: { orderId: string; customerId: string; items: OrderItem[] }): Order {
    const order = new Order()
    const totalAmount = params.items.reduce((sum, item) => sum + item.price * item.quantity, 0)

    order.apply({
      eventId: crypto.randomUUID(),
      eventType: 'OrderPlaced',
      aggregateId: params.orderId,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: 1,
      payload: {
        customerId: params.customerId,
        items: params.items,
        totalAmount,
      },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: 'create',
      },
    })

    return order
  }

  // Confirm order command
  confirm(): void {
    if (this._status !== OrderStatus.PLACED) {
      throw new Error(`Cannot confirm order in status: ${this._status}`)
    }

    this.apply({
      eventId: crypto.randomUUID(),
      eventType: 'OrderConfirmed',
      aggregateId: this._id,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: this.version + 1,
      payload: { confirmedAt: new Date().toISOString() },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: 'confirm',
      },
    })
  }

  // Cancel order command (used in compensating transactions)
  cancel(reason: string): void {
    if (this._status === OrderStatus.CANCELLED) {
      return // Idempotency guarantee
    }

    this.apply({
      eventId: crypto.randomUUID(),
      eventType: 'OrderCancelled',
      aggregateId: this._id,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: this.version + 1,
      payload: { reason, cancelledAt: new Date().toISOString() },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: 'cancel',
      },
    })
  }

  // Event handler - state change logic
  protected when(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderPlaced': {
        const p = event.payload as {
          customerId: string
          items: OrderItem[]
          totalAmount: number
        }
        this._id = event.aggregateId
        this._customerId = p.customerId
        this._items = p.items
        this._totalAmount = p.totalAmount
        this._status = OrderStatus.PLACED
        break
      }
      case 'OrderConfirmed':
        this._status = OrderStatus.CONFIRMED
        break
      case 'OrderCancelled':
        this._status = OrderStatus.CANCELLED
        break
    }
  }
}

enum OrderStatus {
  DRAFT = 'DRAFT',
  PLACED = 'PLACED',
  CONFIRMED = 'CONFIRMED',
  SHIPPED = 'SHIPPED',
  CANCELLED = 'CANCELLED',
}

interface OrderItem {
  productId: string
  quantity: number
  price: number
}
```

Snapshot Strategy

As the number of events grows, replay time increases. Snapshots cache state at specific points to improve replay performance.

```python
# Python - Snapshot-based Event Store
# `db_connection` is assumed to be an async database wrapper exposing
# transaction(), execute(), fetch_one() and fetch_all(). The helpers
# _get_latest_snapshot() and _rebuild_aggregate() are not shown here.
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone


def utc_now() -> datetime:
    """Timezone-aware replacement for the deprecated datetime.utcnow()"""
    return datetime.now(timezone.utc)


@dataclass
class Snapshot:
    aggregate_id: str
    aggregate_type: str
    version: int
    state: dict
    created_at: datetime = field(default_factory=utc_now)


class ConcurrencyError(Exception):
    pass


class EventStore:
    SNAPSHOT_INTERVAL = 100  # Create snapshot every 100 events

    def __init__(self, db_connection):
        self.db = db_connection

    async def save_events(
        self, aggregate_id: str, events: list[dict], expected_version: int
    ) -> None:
        """Save events with optimistic concurrency control"""
        async with self.db.transaction():
            # Check current version (optimistic locking).
            # A UNIQUE (aggregate_id, version) constraint is still needed so
            # that two concurrent writers cannot both pass this check.
            current_version = await self._get_current_version(aggregate_id)
            if current_version != expected_version:
                raise ConcurrencyError(
                    f"Expected version {expected_version}, "
                    f"but current version is {current_version}"
                )

            # Insert events with consecutive version numbers
            for i, event in enumerate(events):
                version = expected_version + i + 1
                await self.db.execute(
                    """INSERT INTO event_store
                       (event_id, aggregate_id, aggregate_type,
                        event_type, version, payload, metadata, created_at)
                       VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
                    (
                        event["event_id"],
                        aggregate_id,
                        event["aggregate_type"],
                        event["event_type"],
                        version,
                        json.dumps(event["payload"]),
                        json.dumps(event["metadata"]),
                        utc_now(),
                    ),
                )

            # Check snapshot creation condition: snapshot whenever this write
            # crossed a multiple of SNAPSHOT_INTERVAL (a plain modulo test
            # would miss a batch that jumps from version 99 to 101)
            new_version = expected_version + len(events)
            if (
                new_version // self.SNAPSHOT_INTERVAL
                > expected_version // self.SNAPSHOT_INTERVAL
            ):
                await self._create_snapshot(aggregate_id, new_version)

    async def load_aggregate(
        self, aggregate_id: str
    ) -> tuple[list[dict], Snapshot | None]:
        """Load the latest snapshot (if any) plus the events recorded after it"""
        # 1. Query most recent snapshot
        snapshot = await self._get_latest_snapshot(aggregate_id)

        if snapshot:
            # 2. Load only events after snapshot
            events = await self.db.fetch_all(
                """SELECT * FROM event_store
                   WHERE aggregate_id = ? AND version > ?
                   ORDER BY version ASC""",
                (aggregate_id, snapshot.version),
            )
            return events, snapshot
        else:
            # 3. Load all events if no snapshot
            events = await self.db.fetch_all(
                """SELECT * FROM event_store
                   WHERE aggregate_id = ?
                   ORDER BY version ASC""",
                (aggregate_id,),
            )
            return events, None

    async def _create_snapshot(self, aggregate_id: str, version: int) -> None:
        """Save current state as snapshot"""
        events, snapshot = await self.load_aggregate(aggregate_id)

        # Rebuild aggregate and serialize state. The previous snapshot must be
        # passed along: `events` only contains what happened after it.
        aggregate = self._rebuild_aggregate(events, snapshot)

        await self.db.execute(
            """INSERT INTO snapshots
               (aggregate_id, aggregate_type, version, state, created_at)
               VALUES (?, ?, ?, ?, ?)""",
            (
                aggregate_id,
                aggregate.aggregate_type,
                version,
                json.dumps(aggregate.to_dict()),
                utc_now(),
            ),
        )

    async def _get_current_version(self, aggregate_id: str) -> int:
        result = await self.db.fetch_one(
            "SELECT MAX(version) FROM event_store WHERE aggregate_id = ?",
            (aggregate_id,),
        )
        # MAX() returns NULL when the aggregate has no events yet
        return result[0] if result and result[0] else 0
```
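
The effect of the snapshot strategy described above is easiest to see by counting how many events have to be replayed. The sketch below uses a hypothetical counter aggregate (`CounterEvent`, `CounterSnapshot`, and `restore` are illustrative names) and assumes events arrive sorted by ascending version.

```typescript
interface CounterEvent {
  version: number
  delta: number
}

interface CounterSnapshot {
  version: number
  total: number
}

// Restores state from an optional snapshot plus the events recorded after it.
function restore(
  events: CounterEvent[],
  snapshot?: CounterSnapshot
): { total: number; version: number; replayed: number } {
  let total = snapshot?.total ?? 0
  let version = snapshot?.version ?? 0
  let replayed = 0

  for (const event of events) {
    if (event.version <= version) continue // already folded into the snapshot
    total += event.delta
    version = event.version
    replayed++
  }
  return { total, version, replayed }
}

// 250 events, each adding 1; a snapshot was taken at version 200.
const allEvents: CounterEvent[] = Array.from({ length: 250 }, (_, i) => ({
  version: i + 1,
  delta: 1,
}))
const snapshotAt200: CounterSnapshot = { version: 200, total: 200 }

const fullReplay = restore(allEvents) // replays 250 events
const fromSnapshot = restore(allEvents, snapshotAt200) // replays 50 events
```

Both paths must arrive at the same state. A snapshot is only a cache of the fold, so it can be discarded and rebuilt from the events at any time.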

Saga Pattern: Distributed Transaction Management

What Is the Saga Pattern?

The Saga pattern manages business transactions spanning multiple services in distributed environments. Unlike traditional distributed transactions (2PC), it sequentially executes each service's local transactions and, upon failure, executes compensating transactions for already completed steps to restore consistency.

An example of an order processing Saga flow:

**Normal flow**:

1. Order Service: Create order (OrderCreated)

2. Payment Service: Process payment (PaymentProcessed)

3. Inventory Service: Reserve inventory (InventoryReserved)

4. Shipping Service: Create shipment (ShipmentCreated)

**Compensation flow on failure** (when step 3 inventory reservation fails):

1. Payment Service: Refund payment (PaymentRefunded) -- compensation

2. Order Service: Cancel order (OrderCancelled) -- compensation

Orchestration-based Saga Implementation

```typescript
// TypeScript - Saga Orchestrator (Order Processing)
// PaymentService, InventoryService, ShippingService, SagaStore, OrderData and
// SagaCompensationFailedError are assumed to be defined elsewhere.
interface SagaStep {
  name: string
  action: () => Promise<void>
  compensation: () => Promise<void>
}

class OrderSagaOrchestrator {
  private sagaLog: SagaLogEntry[] = []

  constructor(
    private paymentService: PaymentService,
    private inventoryService: InventoryService,
    private shippingService: ShippingService,
    private sagaStore: SagaStore
  ) {}

  async execute(orderId: string, orderData: OrderData): Promise<SagaResult> {
    const sagaId = crypto.randomUUID()

    // Tracked per execution: keeping this on the instance would leak completed
    // steps between sagas when the orchestrator is reused
    const completedSteps: SagaStep[] = []

    const steps: SagaStep[] = [
      {
        name: 'ProcessPayment',
        action: async () => {
          await this.paymentService.processPayment({
            orderId,
            amount: orderData.totalAmount,
            customerId: orderData.customerId,
          })
        },
        compensation: async () => {
          await this.paymentService.refundPayment({
            orderId,
            amount: orderData.totalAmount,
          })
        },
      },
      {
        name: 'ReserveInventory',
        action: async () => {
          await this.inventoryService.reserve({
            orderId,
            items: orderData.items,
          })
        },
        compensation: async () => {
          await this.inventoryService.releaseReservation({
            orderId,
            items: orderData.items,
          })
        },
      },
      {
        name: 'CreateShipment',
        action: async () => {
          await this.shippingService.createShipment({
            orderId,
            address: orderData.shippingAddress,
            items: orderData.items,
          })
        },
        compensation: async () => {
          await this.shippingService.cancelShipment({ orderId })
        },
      },
    ]

    try {
      for (const step of steps) {
        await this.logStep(sagaId, step.name, 'STARTED')

        try {
          await step.action()
          completedSteps.push(step)
          await this.logStep(sagaId, step.name, 'COMPLETED')
        } catch (error) {
          await this.logStep(sagaId, step.name, 'FAILED', error)

          // Execute compensating transactions
          await this.compensate(sagaId, completedSteps)

          return {
            success: false,
            sagaId,
            failedStep: step.name,
            error: (error as Error).message,
          }
        }
      }

      await this.sagaStore.markCompleted(sagaId)
      return { success: true, sagaId }
    } catch (compensationError) {
      // When compensation transaction also fails - manual intervention needed
      await this.sagaStore.markRequiresIntervention(sagaId)
      throw new SagaCompensationFailedError(sagaId, compensationError as Error)
    }
  }

  private async compensate(sagaId: string, completedSteps: SagaStep[]): Promise<void> {
    // Compensate completed steps in reverse order
    const stepsToCompensate = [...completedSteps].reverse()

    for (const step of stepsToCompensate) {
      try {
        await this.logStep(sagaId, step.name, 'COMPENSATING')
        await step.compensation()
        await this.logStep(sagaId, step.name, 'COMPENSATED')
      } catch (error) {
        await this.logStep(sagaId, step.name, 'COMPENSATION_FAILED', error)
        // Register in retry queue on compensation failure.
        // Rethrowing stops here, so earlier steps stay uncompensated until
        // the retry or a manual intervention handles them.
        await this.sagaStore.enqueueRetry(sagaId, step.name)
        throw error
      }
    }
  }

  private async logStep(
    sagaId: string,
    stepName: string,
    status: string,
    error?: unknown
  ): Promise<void> {
    const entry: SagaLogEntry = {
      sagaId,
      stepName,
      status,
      timestamp: new Date(),
      error: error ? (error as Error).message : undefined,
    }
    this.sagaLog.push(entry)
    await this.sagaStore.appendLog(entry)
  }
}

interface SagaResult {
  success: boolean
  sagaId: string
  failedStep?: string
  error?: string
}

interface SagaLogEntry {
  sagaId: string
  stepName: string
  status: string
  timestamp: Date
  error?: string
}
```
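
The compensation flow described at the start of this section (inventory reservation fails, then payment is refunded and the order is cancelled) can be traced with a small in-memory runner. This is a simplified sketch with stubbed steps and no persistence or retry; `runSaga` and `makeStep` are hypothetical helpers written for this illustration.

```typescript
interface Step {
  name: string
  action: () => Promise<void>
  compensation: () => Promise<void>
}

// Runs steps in order; on the first failure, compensates the completed steps
// in reverse order. `completed` is local to each run.
async function runSaga(steps: Step[]): Promise<{ success: boolean; failedStep?: string }> {
  const completed: Step[] = []
  for (const step of steps) {
    try {
      await step.action()
      completed.push(step)
    } catch {
      for (const done of completed.reverse()) {
        await done.compensation()
      }
      return { success: false, failedStep: step.name }
    }
  }
  return { success: true }
}

const trace: string[] = []

// Builds a stub step that records what happened; `fails` simulates an error.
const makeStep = (name: string, fails = false): Step => ({
  name,
  action: async () => {
    if (fails) throw new Error(`${name} failed`)
    trace.push(`do:${name}`)
  },
  compensation: async () => {
    trace.push(`undo:${name}`)
  },
})

const sagaOutcome = runSaga([
  makeStep('CreateOrder'),
  makeStep('ProcessPayment'),
  makeStep('ReserveInventory', true), // step 3 fails
  makeStep('CreateShipment'), // never runs
])
// Once sagaOutcome resolves, trace is:
// do:CreateOrder, do:ProcessPayment, undo:ProcessPayment, undo:CreateOrder
```

The failed step itself is not compensated, because its action never completed. This is why each action should either fully succeed or leave no partial effect behind.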

Choreography vs Orchestration Comparison

The two Saga implementation approaches compare as follows.

| Comparison           | Choreography                                           | Orchestration                                |
| -------------------- | ------------------------------------------------------ | -------------------------------------------- |
| Control method       | Distributed - each service publishes/subscribes events | Centralized - orchestrator controls the flow |
| Coupling             | Low (only event schema shared)                         | Medium (orchestrator must know all services) |
| Visibility           | Low (hard to trace flow)                               | High (state viewable in orchestrator)        |
| Complexity growth    | Grows rapidly as services are added                    | Grows linearly                               |
| SPOF                 | None                                                   | Orchestrator can be SPOF                     |
| Compensation         | Distributed across services                            | Centralized in orchestrator                  |
| Testing              | Integration testing difficult                          | Orchestrator unit testing easy               |
| Suitable scale       | Simple workflows with 2-4 services                     | Complex workflows with 5+ services           |
| Representative tools | Kafka, RabbitMQ, SNS/SQS                               | Temporal, Camunda, AWS Step Functions        |

Choreography Pattern Code Example

```python
# Python - Choreography-based Saga (event subscription approach)


class InsufficientStockError(Exception):
    pass


class EventBus:
    """Simple in-memory event bus (use Kafka/RabbitMQ in production)"""

    def __init__(self):
        self._handlers: dict[str, list] = {}

    def subscribe(self, event_type: str, handler):
        if event_type not in self._handlers:
            self._handlers[event_type] = []
        self._handlers[event_type].append(handler)

    async def publish(self, event_type: str, payload: dict):
        handlers = self._handlers.get(event_type, [])
        for handler in handlers:
            await handler(payload)


# Payment Service - Choreography approach
class PaymentService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        # Subscribe to order creation event
        event_bus.subscribe("OrderCreated", self.on_order_created)
        # Subscribe to inventory failure event (for compensation)
        event_bus.subscribe("InventoryReservationFailed", self.on_inventory_failed)

    async def on_order_created(self, payload: dict):
        """Process payment on order creation"""
        order_id = payload["order_id"]
        amount = payload["total_amount"]
        try:
            # Payment processing logic
            payment_result = await self._process_payment(order_id, amount)
        except Exception as e:
            # Publish failure event
            await self.event_bus.publish("PaymentFailed", {
                "order_id": order_id,
                "reason": str(e),
            })
            return

        # Publish success event outside the try block: this in-memory bus runs
        # subscribers inline, and an error raised by one of them must not be
        # reported as a payment failure
        await self.event_bus.publish("PaymentProcessed", {
            "order_id": order_id,
            "payment_id": payment_result["payment_id"],
            "amount": amount,
        })

    async def on_inventory_failed(self, payload: dict):
        """Refund payment on inventory failure (compensating transaction)"""
        order_id = payload["order_id"]
        await self._refund_payment(order_id)
        await self.event_bus.publish("PaymentRefunded", {
            "order_id": order_id,
        })

    async def _process_payment(self, order_id: str, amount: int) -> dict:
        # Actual payment processing logic
        return {"payment_id": f"pay-{order_id}"}

    async def _refund_payment(self, order_id: str) -> None:
        # Actual refund processing logic
        pass


# Inventory Service - Choreography approach
class InventoryService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        # Subscribe to payment completion event
        event_bus.subscribe("PaymentProcessed", self.on_payment_processed)

    async def on_payment_processed(self, payload: dict):
        """Reserve inventory on payment completion"""
        order_id = payload["order_id"]
        try:
            # Check and reserve inventory
            await self._reserve_inventory(order_id)
        except InsufficientStockError:
            await self.event_bus.publish("InventoryReservationFailed", {
                "order_id": order_id,
                "reason": "insufficient_stock",
            })
            return

        await self.event_bus.publish("InventoryReserved", {
            "order_id": order_id,
        })

    async def _reserve_inventory(self, order_id: str) -> None:
        # Actual reservation logic; raises InsufficientStockError when stock is short
        pass
```

Hybrid Approach: When to Choose Which

In practice, rather than using pure Choreography or Orchestration alone, mixing them based on workflow complexity is effective.

- **Choose Choreography**: Simple notifications, cache invalidation, log collection with 2-3 participating services

- **Choose Orchestration**: Core business flows like payment-inventory-shipping where order and compensation matter

- **Hybrid**: Core business flows via Orchestration, side effects (email, notifications, analytics events) via Choreography

Event Store Selection (EventStoreDB, Kafka, DynamoDB)

Event store selection varies based on system requirements and team capabilities. Here is a comparison of the three main options.

| Comparison     | EventStoreDB                                    | Apache Kafka                              | DynamoDB + Streams                       |
| -------------- | ----------------------------------------------- | ----------------------------------------- | ---------------------------------------- |
| Design purpose | Dedicated Event Sourcing DB                     | Distributed message streaming platform    | General NoSQL + change streams           |
| Stream model   | Fine-grained individual streams (per Aggregate) | Topic-partition based                     | Table + DynamoDB Streams/Kinesis         |
| Concurrency    | ExpectedVersion (optimistic locking) built-in   | Partition-level ordering only             | Conditional writes (ConditionExpression) |
| ID lookup      | Instant lookup by stream ID                     | Cannot look up specific entity in topic   | Direct lookup by partition key           |
| Event ordering | Fully guaranteed within stream                  | Guaranteed only within partition          | Guaranteed within partition key          |
| Projections    | Server-side projections built-in                | Kafka Streams / ksqlDB                    | Lambda + DynamoDB Streams                |
| Subscription   | Persistent / Catch-up subscriptions             | Consumer Group                            | DynamoDB Streams / Kinesis               |
| Ops complexity | Medium (dedicated cluster)                      | High (ZooKeeper/KRaft, partition mgmt)    | Low (serverless, AWS managed)            |
| Cost model     | Open source + commercial cloud                  | Open source + MSK/Confluent Cloud         | Pay-per-request/storage                  |
| Best fit       | Pure Event Sourcing systems                     | High-volume event streaming + integration | AWS native, serverless architecture      |
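
The "Concurrency" row is the one that matters most for Event Sourcing: an append must be rejected when the stream has moved on since the writer loaded it. The sketch below shows that rule in a hypothetical in-memory store (`InMemoryEventStore` is an illustration, not the API of any of the three products).

```typescript
class InMemoryEventStore {
  private streams = new Map<string, string[]>()

  // Appends only if the stream is still at `expectedVersion`.
  // Returns the new stream version.
  append(streamId: string, events: string[], expectedVersion: number): number {
    const stream = this.streams.get(streamId) ?? []
    if (stream.length !== expectedVersion) {
      throw new Error(
        `version conflict on ${streamId}: expected ${expectedVersion}, actual ${stream.length}`
      )
    }
    this.streams.set(streamId, [...stream, ...events])
    return stream.length + events.length
  }
}

const store = new InMemoryEventStore()
store.append('order-1', ['OrderPlaced'], 0) // stream is now at version 1

// Two writers both loaded the stream at version 1.
const versionAfterWriterA = store.append('order-1', ['OrderConfirmed'], 1)

let writerBConflicted = false
try {
  store.append('order-1', ['OrderCancelled'], 1) // stale expected version
} catch {
  writerBConflicted = true // writer B must reload the stream and decide again
}
```

With Kafka this check has no direct equivalent on the broker side, which is one reason it is usually paired with a separate store when it serves as the system of record for aggregates.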

Selection Guide

**When EventStoreDB is suitable:**

- Event Sourcing is the core architectural pattern

- Fine-grained stream management and projections are needed

- Team actively uses DDD-based design

**When Kafka is suitable:**

- High-volume event streaming is the main purpose

- Kafka infrastructure already exists and team expertise is sufficient

- Both Event Sourcing and event streaming are needed (consider EventStoreDB + Kafka combination)

**When DynamoDB is suitable:**

- Primarily using the AWS ecosystem

- Targeting serverless architecture

- Minimizing operational burden

EventStoreDB Configuration Example

```yaml
# docker-compose.yml - single-node EventStoreDB and Kafka for local development
# EVENTSTORE_INSECURE=true disables TLS and authentication: do not use it in production
version: '3.8'

services:
  eventstoredb:
    image: eventstore/eventstore:24.2
    container_name: eventstoredb
    environment:
      - EVENTSTORE_CLUSTER_SIZE=1
      - EVENTSTORE_RUN_PROJECTIONS=All
      - EVENTSTORE_START_STANDARD_PROJECTIONS=true
      - EVENTSTORE_INSECURE=true
      - EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=true
      - EVENTSTORE_MEM_DB=false
      - EVENTSTORE_DB=/var/lib/eventstore-data
      - EVENTSTORE_INDEX=/var/lib/eventstore-index
      - EVENTSTORE_LOG=/var/log/eventstore
    ports:
      - '2113:2113' # HTTP/gRPC
      - '1113:1113' # TCP (legacy)
    volumes:
      - eventstore-data:/var/lib/eventstore-data
      - eventstore-index:/var/lib/eventstore-index
      - eventstore-logs:/var/log/eventstore

  # Kafka - for event streaming (combined with EventStoreDB)
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    container_name: kafka
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      # Advertised as kafka:9092, so clients must resolve the hostname `kafka`
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      KAFKA_NUM_PARTITIONS: 6
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      # Single broker: the internal offsets topic cannot use the default factor of 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    ports:
      - '9092:9092'
    volumes:
      - kafka-data:/var/lib/kafka/data

volumes:
  eventstore-data:
  eventstore-index:
  eventstore-logs:
  kafka-data:
```

Production Implementation: Order System Example

Let us examine a production implementation that integrates CQRS, Event Sourcing, and Saga patterns through an e-commerce order system.

System Architecture Overview

The system consists of the following components:

- **Order Service**: Order Aggregate (Event Sourcing), CQRS applied

- **Payment Service**: Payment processing, refund compensating transaction

- **Inventory Service**: Inventory reservation, release compensating transaction

- **Shipping Service**: Shipment creation, cancellation compensating transaction

- **Saga Orchestrator**: Temporal-based workflow management

Temporal-based Saga Workflow

```typescript
// TypeScript - Order Saga implemented with Temporal Workflow
import { defineSignal, proxyActivities, setHandler } from '@temporalio/workflow'

// The activity interfaces (PaymentActivities, InventoryActivities,
// ShippingActivities, NotificationActivities) are assumed to be defined
// in the application's activities module.

// Activity proxies
const payment = proxyActivities<PaymentActivities>({
  startToCloseTimeout: '30s',
  retry: {
    maximumAttempts: 3,
    initialInterval: '1s',
    backoffCoefficient: 2,
    maximumInterval: '30s',
  },
})

const inventory = proxyActivities<InventoryActivities>({
  startToCloseTimeout: '10s',
  retry: { maximumAttempts: 3 },
})

const shipping = proxyActivities<ShippingActivities>({
  startToCloseTimeout: '15s',
  retry: { maximumAttempts: 3 },
})

const notification = proxyActivities<NotificationActivities>({
  startToCloseTimeout: '5s',
  retry: { maximumAttempts: 5 },
})

// Signal definition (send messages to workflow from outside)
const cancelOrderSignal = defineSignal<[string]>('cancelOrder')

// Order Processing Saga Workflow
export async function orderSagaWorkflow(input: OrderSagaInput): Promise<OrderSagaResult> {
  let isCancelled = false
  let cancelReason = ''

  // Cancel signal handler
  setHandler(cancelOrderSignal, (reason: string) => {
    isCancelled = true
    cancelReason = reason
  })

  const compensations: Array<() => Promise<void>> = []

  try {
    // Step 1: Process payment
    if (isCancelled) throw new SagaCancelledError(cancelReason)
    const paymentResult = await payment.processPayment({
      orderId: input.orderId,
      amount: input.totalAmount,
      customerId: input.customerId,
    })
    compensations.push(async () => {
      await payment.refundPayment({
        orderId: input.orderId,
        paymentId: paymentResult.paymentId,
        amount: input.totalAmount,
      })
    })

    // Step 2: Reserve inventory
    if (isCancelled) throw new SagaCancelledError(cancelReason)
    await inventory.reserveInventory({
      orderId: input.orderId,
      items: input.items,
    })
    compensations.push(async () => {
      await inventory.releaseReservation({
        orderId: input.orderId,
        items: input.items,
      })
    })

    // Step 3: Create shipment
    if (isCancelled) throw new SagaCancelledError(cancelReason)
    const shipmentResult = await shipping.createShipment({
      orderId: input.orderId,
      address: input.shippingAddress,
      items: input.items,
    })
    compensations.push(async () => {
      await shipping.cancelShipment({
        orderId: input.orderId,
        shipmentId: shipmentResult.shipmentId,
      })
    })

    // All steps succeeded - send confirmation notification (Saga unaffected by failure)
    await notification
      .sendOrderConfirmation({
        orderId: input.orderId,
        customerId: input.customerId,
      })
      .catch(() => {
        /* Notification failure is ignored */
      })

    return {
      success: true,
      orderId: input.orderId,
      paymentId: paymentResult.paymentId,
      shipmentId: shipmentResult.shipmentId,
    }
  } catch (error) {
    // Execute compensating transactions (reverse order)
    for (const compensate of compensations.reverse()) {
      try {
        await compensate()
      } catch (compError) {
        // Each compensation is an activity, so Temporal has already retried it
        // according to its retry policy. Reaching this point means the retries
        // were exhausted; this sketch only logs the error, whereas a production
        // workflow should record it for manual follow-up.
        console.error('Compensation failed:', compError)
      }
    }

    return {
      success: false,
      orderId: input.orderId,
      error: (error as Error).message,
    }
  }
}

interface OrderSagaInput {
  orderId: string
  customerId: string
  totalAmount: number
  shippingAddress: string
  items: Array<{
    productId: string
    quantity: number
    price: number
  }>
}

interface OrderSagaResult {
  success: boolean
  orderId: string
  paymentId?: string
  shipmentId?: string
  error?: string
}

class SagaCancelledError extends Error {
  constructor(reason: string) {
    super(`Saga cancelled: ${reason}`)
    this.name = 'SagaCancelledError'
  }
}
```

DynamoDB-based Event Store Schema

AWS CloudFormation - DynamoDB Event Store Table

AWSTemplateFormatVersion: '2010-09-09'
Description: Event Store on DynamoDB
Resources:
  EventStoreTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: event-store
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: aggregateId
          AttributeType: S
        - AttributeName: version
          AttributeType: N
        - AttributeName: eventType
          AttributeType: S
        - AttributeName: timestamp
          AttributeType: S
      KeySchema:
        - AttributeName: aggregateId
          KeyType: HASH
        - AttributeName: version
          KeyType: RANGE
      GlobalSecondaryIndexes:
        - IndexName: eventType-timestamp-index
          KeySchema:
            - AttributeName: eventType
              KeyType: HASH
            - AttributeName: timestamp
              KeyType: RANGE
          Projection:
            ProjectionType: ALL
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      Tags:
        - Key: Environment
          Value: production
        - Key: Service
          Value: event-store
  SnapshotTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: event-store-snapshots
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: aggregateId
          AttributeType: S
        - AttributeName: version
          AttributeType: N
      KeySchema:
        - AttributeName: aggregateId
          KeyType: HASH
        - AttributeName: version
          KeyType: RANGE
      Tags:
        - Key: Environment
          Value: production
  SagaStateTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: saga-state
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: sagaId
          AttributeType: S
        - AttributeName: createdAt
          AttributeType: S
      KeySchema:
        - AttributeName: sagaId
          KeyType: HASH
      GlobalSecondaryIndexes:
        - IndexName: createdAt-index
          KeySchema:
            - AttributeName: createdAt
              KeyType: HASH
          Projection:
            ProjectionType: ALL
      TimeToLiveSpecification:
        AttributeName: ttl
        Enabled: true

Operational Considerations and Troubleshooting

1. Ensuring Idempotency

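In distributed environments, message brokers typically provide "at-least-once" delivery, so the same event may be delivered, and therefore processed, more than once. Every event handler must be idempotent: handling an event twice must leave the system in the same state as handling it once.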

// TypeScript - Idempotency guarantee pattern

class IdempotentEventHandler {

constructor(

private processedEvents: ProcessedEventStore,

private handler: EventHandler

) {}

async handle(event: DomainEvent): Promise<void> {

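// Check if event was already processed. Note: this check and markAsProcessed below are separate steps, so two concurrent deliveries of the same event can both pass it; the wrapped handler's side effects should tolerate that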

const isProcessed = await this.processedEvents.exists(event.eventId)

if (isProcessed) {

console.log(`Event ${event.eventId} already processed, skipping`)

return

}

try {

// Process event

await this.handler.handle(event)

// Record completion (auto-cleanup with TTL)

await this.processedEvents.markAsProcessed(event.eventId, {

processedAt: new Date(),

ttl: 60 * 60 * 24 * 7, // Expire after 7 days

})

} catch (error) {

// Not recorded on failure so retry is possible

throw error

}

}

}

// Redis-based duplicate detection

class RedisProcessedEventStore implements ProcessedEventStore {

constructor(private redis: Redis) {}

async exists(eventId: string): Promise<boolean> {

const result = await this.redis.exists(`processed:${eventId}`)

return result === 1

}

async markAsProcessed(

eventId: string,

options: { processedAt: Date; ttl: number }

): Promise<void> {

await this.redis.setex(`processed:${eventId}`, options.ttl, options.processedAt.toISOString())

}

}
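The handler above checks `exists` first and calls `markAsProcessed` later, which leaves a window in which two concurrent deliveries can both run. One common way to narrow that window is to claim the event ID atomically before doing the work and to release the claim if the work fails. The following is a minimal sketch of that idea, not the article's implementation: `ClaimStore`, `InMemoryClaimStore`, and `handleOnce` are hypothetical names, and the store is a synchronous in-memory stand-in. With Redis, the claim would typically be a single `SET key value NX EX ttl` command.

```typescript
// Hypothetical interface: tryClaim must behave as an atomic "set if absent".
interface ClaimStore {
  tryClaim(eventId: string, ttlSeconds: number): boolean
  release(eventId: string): void
}

// In-memory stand-in for illustration only (single process, no persistence).
class InMemoryClaimStore implements ClaimStore {
  private expiresAt = new Map<string, number>()

  tryClaim(eventId: string, ttlSeconds: number): boolean {
    const now = Date.now()
    const expiry = this.expiresAt.get(eventId)
    // A live claim means another delivery already owns this event.
    if (expiry !== undefined && expiry > now) return false
    this.expiresAt.set(eventId, now + ttlSeconds * 1000)
    return true
  }

  release(eventId: string): void {
    this.expiresAt.delete(eventId)
  }
}

// Runs `work` only if this call wins the claim.
// Returns true if the work ran, false if the event was a duplicate.
function handleOnce(
  store: ClaimStore,
  eventId: string,
  work: () => void,
  ttlSeconds: number = 60 * 60 * 24 * 7
): boolean {
  if (!store.tryClaim(eventId, ttlSeconds)) return false
  try {
    work()
    return true
  } catch (error) {
    // Give the claim back so a redelivery can retry the event.
    store.release(eventId)
    throw error
  }
}
```

The trade-off is that a process that crashes after claiming but before finishing leaves the event marked until the TTL expires, so the claim TTL and the broker's redelivery timing need to be chosen together.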

2. Ensuring Event Ordering

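Kafka guarantees ordering only within a single partition, not across a whole topic. To preserve the order of events for an Aggregate, route all of its events to the same partition; using the Aggregate ID as the message key (and therefore the partition key) is standard practice.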
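Keying by Aggregate ID works because a keyed partitioner is deterministic: the same key always maps to the same partition for a fixed partition count. The sketch below illustrates the principle only. `fnv1a` and `partitionFor` are hypothetical helpers, and the FNV-1a hash is a stand-in chosen for brevity; it is not the hash that Kafka clients actually use.

```typescript
// FNV-1a 32-bit hash: an illustrative stand-in, not Kafka's partitioner hash.
function fnv1a(key: string): number {
  let hash = 0x811c9dc5
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i)
    // Math.imul keeps the multiplication in 32 bits; >>> 0 makes it unsigned.
    hash = Math.imul(hash, 0x01000193) >>> 0
  }
  return hash
}

// Maps an Aggregate ID to a partition index in [0, partitionCount).
function partitionFor(aggregateId: string, partitionCount: number): number {
  if (!Number.isInteger(partitionCount) || partitionCount <= 0) {
    throw new RangeError("partitionCount must be a positive integer")
  }
  return fnv1a(aggregateId) % partitionCount
}
```

Because the mapping depends on the partition count, adding partitions to a topic changes where a key lands, so per-Aggregate ordering is not preserved across that change.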

3. Schema Evolution

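Since events are immutable, schema changes must maintain backward compatibility. Use a serialization format with explicit schemas, such as Avro or Protobuf, together with a schema registry, or apply the upcasting pattern, which converts older event versions to the current shape when they are read.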

// TypeScript - Event Upcaster (Schema Evolution)

class EventUpcaster {

private upcasters: Map<string, Map<number, (event: DomainEvent) => DomainEvent>> = new Map()

register(

eventType: string,

fromVersion: number,

upcaster: (event: DomainEvent) => DomainEvent

): void {

if (!this.upcasters.has(eventType)) {

this.upcasters.set(eventType, new Map())

}

this.upcasters.get(eventType)!.set(fromVersion, upcaster)

}

upcast(event: DomainEvent): DomainEvent {

const typeUpcasters = this.upcasters.get(event.eventType)

if (!typeUpcasters) return event

let currentEvent = event

let schemaVersion = (event.metadata as any).schemaVersion || 1

while (typeUpcasters.has(schemaVersion)) {

const upcasterFn = typeUpcasters.get(schemaVersion)!

currentEvent = upcasterFn(currentEvent)

schemaVersion++

}

return currentEvent

}

}

// Usage example: OrderPlaced event v1 -> v2 upcasting

const upcaster = new EventUpcaster()

upcaster.register('OrderPlaced', 1, (event) => {

// v1 had shippingAddress as a string

// v2 changed to a structured object

const payload = event.payload as any

return {

...event,

payload: {

...payload,

shippingAddress: {

full: payload.shippingAddress,

city: '',

zipCode: '',

},

},

metadata: {

...event.metadata,

schemaVersion: 2,

},

}

})

4. Key Monitoring Metrics

The following metrics should be monitored to check the health of an EDA system.

| Metric                    | Description                                    | Alert Threshold          |
| ------------------------- | ---------------------------------------------- | ------------------------ |
| Consumer Lag              | Consumer processing delay (unprocessed events) | Over 1,000 events        |
| Event Processing Latency  | Time from event publication to processing      | P99 over 5 seconds       |
| Saga Completion Rate      | Saga success rate                              | Below 99%                |
| Compensation Failure Rate | Compensating transaction failure rate          | Over 0.1%                |
| Projection Lag            | Sync delay between read and write models       | Over 30 seconds          |
| Dead Letter Queue Size    | Number of unprocessable events                 | Immediate alert when > 0 |
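The thresholds in the table can be kept as data so that they are reviewed and changed in one place. The sketch below is one possible shape, assuming rates are expressed as fractions (0.99 for 99%) and lags in seconds; `MetricName`, `alertRules`, and `breachedMetrics` are hypothetical names rather than part of any monitoring product.

```typescript
type MetricName =
  | "consumerLag"
  | "processingLatencyP99Seconds"
  | "sagaCompletionRate"
  | "compensationFailureRate"
  | "projectionLagSeconds"
  | "deadLetterQueueSize"

interface AlertRule {
  metric: MetricName
  breached: (value: number) => boolean
}

// One rule per table row; rates are fractions, lags are seconds.
const alertRules: AlertRule[] = [
  { metric: "consumerLag", breached: (v) => v > 1000 },
  { metric: "processingLatencyP99Seconds", breached: (v) => v > 5 },
  { metric: "sagaCompletionRate", breached: (v) => v < 0.99 },
  { metric: "compensationFailureRate", breached: (v) => v > 0.001 },
  { metric: "projectionLagSeconds", breached: (v) => v > 30 },
  { metric: "deadLetterQueueSize", breached: (v) => v > 0 },
]

// Returns the metrics whose current value crosses its alert threshold.
function breachedMetrics(snapshot: Record<MetricName, number>): MetricName[] {
  return alertRules
    .filter((rule) => rule.breached(snapshot[rule.metric]))
    .map((rule) => rule.metric)
}
```

The numbers are starting points taken from the table; appropriate thresholds depend on the traffic volume and latency budget of the system being monitored.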

Failure Cases and Recovery Procedures

Case 1: Compensation Transaction Failure (Most Dangerous Scenario)

**Problem**: Payment completed, but inventory reservation failed. The compensating refund was attempted, but it also failed because the payment gateway timed out.

**Recovery procedure**:

1. Mark Saga state as "REQUIRES_INTERVENTION"

2. Register failed compensation transaction in Dead Letter Queue

3. Automatic retry (exponential backoff): 1s, 2s, 4s, 8s, 16s

4. Alert ops team via PagerDuty/Slack when max retries exceeded

5. Operator manually processes refund via payment gateway console

6. Update Saga state to "MANUALLY_COMPENSATED"
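The retry and escalation steps of this procedure can be sketched as a small decision function that is independent of any timer or queue. This is an illustration under stated assumptions: `backoffDelayMs`, `decideAfterFailure`, and the `RetryDecision` shape are hypothetical, the base delay of 1 second and the limit of 5 retries come from the schedule listed above, and the caller is assumed to perform the actual waiting and alerting.

```typescript
type RetryDecision =
  | { action: "retry"; delayMs: number }
  | { action: "alertOperators" }

// Delay before retry number `attempt` (1-based): 1s, 2s, 4s, 8s, 16s, ...
function backoffDelayMs(attempt: number, baseMs: number = 1000): number {
  return baseMs * Math.pow(2, attempt - 1)
}

// `retriesDone` is the number of retries already performed
// (0 right after the first compensation attempt failed).
function decideAfterFailure(retriesDone: number, maxRetries: number = 5): RetryDecision {
  if (retriesDone < maxRetries) {
    return { action: "retry", delayMs: backoffDelayMs(retriesDone + 1) }
  }
  // Retries are exhausted: hand the Saga over to a human operator.
  return { action: "alertOperators" }
}
```

Keeping the decision pure makes the schedule easy to test; adding random jitter to each delay is a common refinement when many Sagas fail at the same moment.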

Case 2: Event Order Reversal

**Problem**: Because of network delay, the OrderCancelled event arrives before the OrderConfirmed event.

**Prevention and recovery**:

- Include version field in events for order verification

- Buffer and re-sort when version does not match expectations

- EventStoreDB guarantees ordering within a single stream, so events read from the same stream (for example, one stream per Aggregate) cannot be reversed in this way
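The "buffer and re-sort" approach can be sketched as a per-Aggregate buffer that holds early arrivals until the missing versions show up. This is a minimal illustration, assuming versions start at 1 and are contiguous for each Aggregate; `VersionedEvent` and `ReorderBuffer` are hypothetical names.

```typescript
interface VersionedEvent {
  aggregateId: string
  version: number
  type: string
}

class ReorderBuffer {
  // Next version expected per Aggregate (assumed to start at 1).
  private nextVersion = new Map<string, number>()
  // Events that arrived too early, keyed by Aggregate and then version.
  private pending = new Map<string, Map<number, VersionedEvent>>()

  // Takes events in arrival order and returns those that are now
  // safe to apply, in version order.
  accept(event: VersionedEvent): VersionedEvent[] {
    const expected = this.nextVersion.get(event.aggregateId) ?? 1
    if (event.version < expected) return [] // duplicate of an applied event

    let held = this.pending.get(event.aggregateId)
    if (!held) {
      held = new Map<number, VersionedEvent>()
      this.pending.set(event.aggregateId, held)
    }
    held.set(event.version, event)

    // Release the longest contiguous run starting at the expected version.
    const ready: VersionedEvent[] = []
    let next = expected
    let candidate = held.get(next)
    while (candidate !== undefined) {
      ready.push(candidate)
      held.delete(next)
      next++
      candidate = held.get(next)
    }
    this.nextVersion.set(event.aggregateId, next)
    return ready
  }
}
```

A production version would also need a size limit or timeout, since an event that never arrives would otherwise hold back every later event for that Aggregate.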

Case 3: Read Model Inconsistency Due to Projection Failure

**Problem**: The projection process crashes, so the read model no longer reflects the latest state.

**Recovery procedure**:

1. Check the projection process's last checkpoint

2. Re-run projection from events after the checkpoint

3. For severe inconsistency, drop the read model and replay all events

4. During replay, redirect read traffic to cache or write DB

Python - Projection recovery script

from datetime import datetime, timezone


class ProjectionRecovery:
    def __init__(self, event_store, read_db, projection):
        self.event_store = event_store
        self.read_db = read_db
        self.projection = projection

    async def recover_from_checkpoint(self) -> dict:
        """Checkpoint-based projection recovery"""
        # 1. Check last checkpoint
        checkpoint = await self.read_db.get_checkpoint(
            self.projection.name
        )
        last_position = checkpoint.get("position", 0) if checkpoint else 0
        print(
            f"Recovering projection '{self.projection.name}' "
            f"from position {last_position}"
        )

        # 2. Load events after checkpoint
        events = await self.event_store.read_all_from(last_position)
        processed = 0
        errors = 0
        for event in events:
            try:
                await self.projection.handle(event)
                processed += 1
                # Update checkpoint every 100 events
                if processed % 100 == 0:
                    await self.read_db.save_checkpoint(
                        self.projection.name,
                        {"position": event["global_position"]},
                    )
            except Exception as e:
                errors += 1
                print(
                    f"Error processing event "
                    f"{event['event_id']}: {e}"
                )
                # Continue after logging error (skip & log)
                continue

        # 3. Save final checkpoint
        if events:
            await self.read_db.save_checkpoint(
                self.projection.name,
                {"position": events[-1]["global_position"]},
            )

        return {
            "projection": self.projection.name,
            "processed": processed,
            "errors": errors,
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
            "recovered_at": datetime.now(timezone.utc).isoformat(),
        }

    async def full_rebuild(self) -> dict:
        """Full rebuild of read model"""
        print(
            f"Full rebuild of projection '{self.projection.name}'"
        )
        # 1. Delete existing read model
        await self.read_db.drop_projection_data(self.projection.name)
        # 2. Reset checkpoint
        await self.read_db.save_checkpoint(
            self.projection.name, {"position": 0}
        )
        # 3. Replay all events
        return await self.recover_from_checkpoint()

Case 4: Event Store Disk Shortage

**Problem**: Events accumulate continuously and disk capacity becomes insufficient

**Prevention strategy**:

- After creating snapshots, move old events to archive (S3/Glacier)

- Establish event retention policy: hot data (30 days), warm data (1 year), cold data (permanent archive)

- Set disk usage alerts at 80% warning, 90% critical
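The retention policy above amounts to classifying each event by age. The sketch below shows that classification only, assuming "1 year" means 365 days and that age is measured from the event's timestamp; `StorageTier` and `storageTier` are hypothetical names, and moving the data to S3 or Glacier is left to the caller.

```typescript
type StorageTier = "hot" | "warm" | "cold"

const MS_PER_DAY = 24 * 60 * 60 * 1000

// Classifies an event by age: up to 30 days is hot, up to 365 days is warm,
// anything older is cold (a candidate for the permanent archive).
function storageTier(eventTime: Date, now: Date): StorageTier {
  const ageDays = (now.getTime() - eventTime.getTime()) / MS_PER_DAY
  if (ageDays <= 30) return "hot"
  if (ageDays <= 365) return "warm"
  return "cold"
}
```

An archival job would also check that a snapshot covering the event exists before moving it, so that Aggregates can still be rebuilt from hot storage alone.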

Conclusion

The three core patterns of Event-Driven Architecture -- CQRS, Event Sourcing, and Saga -- are each useful on their own. Used together, they address the data consistency problems of a microservice architecture at their root.

Key takeaways:

1. **CQRS**: Acknowledge and separate the asymmetric requirements of reads and writes. Read traffic, which often makes up 80-90% of total traffic, can then be optimized independently of the write path.

2. **Event Sourcing**: Store change history instead of current state. This enables complete audit trails, time-travel debugging, and diverse read model generation. However, snapshot strategies and schema evolution strategies must be considered from the start.

3. **Saga Pattern**: Manage distributed transactions with compensation-based approaches. Implement simple flows with Choreography and complex business logic with Orchestration, but always prepare recovery strategies for compensation transaction failures.

4. **Event Store Selection**: Consider EventStoreDB for pure Event Sourcing, Kafka for high-volume streaming, and DynamoDB for AWS serverless, with combinations possible based on requirements.

These patterns are powerful but come with the cost of complexity. Rather than applying them uniformly to all services, the realistic strategy is to introduce them incrementally starting with core domains where business complexity is high and data consistency is critical. Adopting these patterns without operational monitoring, idempotency guarantees, and schema evolution strategies can actually harm system stability.

