
Event-Driven Architecture Practical Guide: CQRS, Event Sourcing, Saga Patterns


Introduction

One of the biggest challenges in microservice architecture is managing data consistency and business transactions across multiple services. In a traditional monolith, this could be solved with a single database's ACID transactions, but in distributed environments where each service has an independent data store, the limitations of 2PC (Two-Phase Commit) become clear: performance bottlenecks, reduced availability, and tight coupling between services.

Event-Driven Architecture (EDA) offers a fundamental solution to this problem. It transitions inter-service communication to asynchronous event-based messaging, manages state changes as event streams, and handles distributed transactions with compensation-based sagas. This article deeply analyzes the three core EDA patterns -- CQRS, Event Sourcing, and Saga -- with production code, and covers strategies and troubleshooting techniques needed for production operations.

Netflix uses these patterns for its personalization system serving 260 million subscribers, LinkedIn for processing trillions of daily events, and Slack for handling billions of daily messages. Let us explore the patterns and anti-patterns validated by the experience of these large-scale systems.

Event-Driven Architecture Fundamentals

Three Types of Events

In EDA, events are classified into three types based on their purpose.

| Type | Description | Example | Characteristics |
| --- | --- | --- | --- |
| Domain Event | A fact recorded from the business domain | OrderPlaced, PaymentCompleted | Immutable, past-tense naming |
| Integration Event | Events for inter-service communication | OrderPlacedIntegrationEvent | Crosses bounded context boundaries |
| Event Notification | Change notification (minimal data) | OrderStatusChanged (ID only) | Receiver queries needed data directly |

Core Principles

The core principles for correctly implementing EDA are as follows.

  1. Asynchronous Communication: Producers and consumers are temporally decoupled
  2. Loose Coupling: Services only need to know the event schema, not the implementation of other services
  3. Eventual Consistency: Instead of immediate strong consistency, consistency is guaranteed after a certain period
  4. Idempotency: Processing the same event multiple times must produce the same result
// TypeScript - Domain Event basic structure
interface DomainEvent {
  eventId: string
  eventType: string
  aggregateId: string
  aggregateType: string
  timestamp: Date
  version: number
  payload: Record<string, unknown>
  metadata: {
    correlationId: string
    causationId: string
    userId?: string
  }
}

// Order creation event example
const orderPlacedEvent: DomainEvent = {
  eventId: 'evt-550e8400-e29b-41d4-a716-446655440000',
  eventType: 'OrderPlaced',
  aggregateId: 'order-12345',
  aggregateType: 'Order',
  timestamp: new Date('2026-03-14T09:00:00Z'),
  version: 1,
  payload: {
    customerId: 'cust-67890',
    items: [
      { productId: 'prod-001', quantity: 2, price: 29900 },
      { productId: 'prod-002', quantity: 1, price: 15000 },
    ],
    totalAmount: 74800,
    shippingAddress: {
      city: 'Seoul',
      district: 'Gangnam-gu',
      detail: 'Teheran-ro 123',
    },
  },
  metadata: {
    correlationId: 'corr-abc123',
    causationId: 'cmd-place-order-001',
    userId: 'user-admin-01',
  },
}
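
Principle 4 above (idempotency) is commonly enforced by recording processed event IDs and skipping redeliveries. A minimal Python sketch, assuming an in-memory set stands in for a durable dedup store (in production this would typically be a `processed_events` table updated in the same transaction as the handler's side effects):

```python
from typing import Callable


class IdempotentConsumer:
    """Applies a handler at most once per eventId (hypothetical helper)."""

    def __init__(self, handler: Callable[[dict], None]):
        self.handler = handler
        # Stand-in for a durable store such as a processed_events table
        self.processed: set[str] = set()

    def consume(self, event: dict) -> bool:
        """Return True if the handler ran, False for a duplicate delivery."""
        if event["eventId"] in self.processed:
            return False  # redelivery: skip without repeating side effects
        self.handler(event)
        self.processed.add(event["eventId"])
        return True
```

Because brokers usually guarantee at-least-once delivery, consumers see duplicates under retries and rebalances; the dedup check turns at-least-once delivery into effectively-once processing.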

CQRS Pattern: Separating Commands and Queries

What Is CQRS?

CQRS (Command Query Responsibility Segregation) is a pattern that separates data writes (Commands) and reads (Queries) into separate models. It extends Bertrand Meyer's CQS (Command Query Separation) principle to the architectural level, formalized by Greg Young in 2010.

In the traditional CRUD model, the same data model handles both reads and writes. However, in real business scenarios, read and write requirements differ significantly.

| Aspect | Command (Write) | Query (Read) |
| --- | --- | --- |
| Purpose | State change, business rule validation | Data retrieval, display |
| Ratio | 10-20% of total traffic | 80-90% of total traffic |
| Consistency | Strong consistency needed | Eventual consistency acceptable |
| Model complexity | Rich domain logic | Denormalized read models |
| Scaling | Primarily vertical scaling | Easy horizontal scaling (cache, replicas) |

CQRS Implementation: TypeScript Example

// TypeScript - CQRS Command Side

// Command definition
interface PlaceOrderCommand {
  type: 'PlaceOrder'
  customerId: string
  items: Array<{
    productId: string
    quantity: number
    price: number
  }>
  shippingAddress: string
}

// Command Handler
class PlaceOrderHandler {
  constructor(
    private orderRepository: OrderWriteRepository,
    private eventBus: EventBus,
    private inventoryService: InventoryService
  ) {}

  async handle(command: PlaceOrderCommand): Promise<string> {
    // 1. Business rule validation
    await this.inventoryService.validateStock(command.items)

    // 2. Aggregate creation
    const order = Order.create({
      customerId: command.customerId,
      items: command.items,
      shippingAddress: command.shippingAddress,
    })

    // 3. Save to write store
    await this.orderRepository.save(order)

    // 4. Publish domain events (for read model sync)
    for (const event of order.getDomainEvents()) {
      await this.eventBus.publish(event)
    }

    return order.id
  }
}

// Query Side - Read-only model
interface OrderReadModel {
  orderId: string
  customerName: string
  orderDate: string
  status: string
  totalAmount: number
  itemCount: number
  lastUpdated: string
}

// Query Handler
class GetOrdersQueryHandler {
  constructor(private readDb: ReadDatabase) {}

  async handle(query: {
    customerId: string
    status?: string
    page: number
    limit: number
  }): Promise<OrderReadModel[]> {
    // Query directly from the denormalized read-only table.
    // Build the parameter list to match the optional status placeholder;
    // passing query.status unconditionally would misalign the parameters
    // when no status filter is present.
    const params: unknown[] = [query.customerId]
    if (query.status) params.push(query.status)
    params.push(query.limit, query.page * query.limit)

    return this.readDb.query(
      `SELECT order_id, customer_name, order_date, status,
              total_amount, item_count, last_updated
       FROM order_read_model
       WHERE customer_id = ?
       ${query.status ? 'AND status = ?' : ''}
       ORDER BY order_date DESC
       LIMIT ? OFFSET ?`,
      params
    )
  }
}

Read Model Projection

The projection logic that receives events and updates the read model is the core of CQRS.

// TypeScript - Event-based Projection

class OrderProjection {
  constructor(private readDb: ReadDatabase) {}

  async handle(event: DomainEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderPlaced':
        await this.onOrderPlaced(event)
        break
      case 'OrderShipped':
        await this.onOrderShipped(event)
        break
      case 'OrderCancelled':
        await this.onOrderCancelled(event)
        break
    }
  }

  private async onOrderPlaced(event: DomainEvent): Promise<void> {
    const payload = event.payload as {
      customerId: string
      items: Array<{ quantity: number; price: number }>
      totalAmount: number
    }

    await this.readDb.upsert('order_read_model', {
      order_id: event.aggregateId,
      customer_id: payload.customerId,
      status: 'PLACED',
      total_amount: payload.totalAmount,
      item_count: payload.items.reduce((sum, i) => sum + i.quantity, 0),
      order_date: event.timestamp,
      last_updated: event.timestamp,
      version: event.version,
    })
  }

  private async onOrderShipped(event: DomainEvent): Promise<void> {
    const payload = event.payload as { trackingNumber: string }

    // Idempotency guarantee: version check
    await this.readDb.updateWhere(
      'order_read_model',
      {
        status: 'SHIPPED',
        tracking_number: payload.trackingNumber,
        last_updated: event.timestamp,
        version: event.version,
      },
      { order_id: event.aggregateId, version: event.version - 1 }
    )
  }

  private async onOrderCancelled(event: DomainEvent): Promise<void> {
    const payload = event.payload as { reason: string }

    await this.readDb.updateWhere(
      'order_read_model',
      {
        status: 'CANCELLED',
        cancellation_reason: payload.reason,
        last_updated: event.timestamp,
        version: event.version,
      },
      { order_id: event.aggregateId }
    )
  }
}

Event Sourcing: Event-Based State Management

Core Concept of Event Sourcing

Event Sourcing is a pattern that stores all events that changed the state of an aggregate in order, instead of storing the current state. The current state is reconstructed by replaying the event stream from the beginning.

Comparing the traditional approach with Event Sourcing:

| Aspect | Traditional CRUD | Event Sourcing |
| --- | --- | --- |
| What is stored | Current state (latest snapshot) | Complete history of state changes |
| Data loss | Previous states are lost | All change history is preserved |
| Audit | Requires separate implementation | Built-in |
| Debugging | Only current state viewable | Time travel possible |
| Storage space | Relatively small | Grows as events accumulate |
| Query performance | Direct queries possible | Requires projections or snapshots |
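
Replaying the stream reduces to a fold over the ordered event list. A minimal Python sketch (field names follow the DomainEvent structure above; the event data is illustrative):

```python
def replay(events: list[dict]) -> dict:
    """Reconstruct current order state by replaying events in order."""
    state: dict = {"status": "DRAFT", "version": 0}
    for event in events:
        if event["eventType"] == "OrderPlaced":
            state.update(event["payload"], status="PLACED")
        elif event["eventType"] == "OrderConfirmed":
            state["status"] = "CONFIRMED"
        elif event["eventType"] == "OrderCancelled":
            state["status"] = "CANCELLED"
        state["version"] += 1  # every applied event bumps the version
    return state


stream = [
    {"eventType": "OrderPlaced", "payload": {"totalAmount": 74800}},
    {"eventType": "OrderConfirmed", "payload": {}},
]
# replay(stream) yields status CONFIRMED, totalAmount 74800, version 2
```

The `loadFromHistory` method in the aggregate below is exactly this fold, with `when()` playing the role of the per-event branch.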

Event Sourcing Implementation

// TypeScript - Event Sourcing Aggregate

abstract class EventSourcedAggregate {
  private uncommittedEvents: DomainEvent[] = []
  protected version: number = 0

  abstract get id(): string

  // Apply event (state change)
  protected apply(event: DomainEvent): void {
    this.when(event)
    this.version++
    this.uncommittedEvents.push(event)
  }

  // Event handler (implemented by subclass)
  protected abstract when(event: DomainEvent): void

  // Restore state from event stream
  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.when(event)
      this.version++
    }
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents]
  }

  clearUncommittedEvents(): void {
    this.uncommittedEvents = []
  }
}

// Order Aggregate
class Order extends EventSourcedAggregate {
  private _id: string = ''
  private _customerId: string = ''
  private _items: OrderItem[] = []
  private _status: OrderStatus = OrderStatus.DRAFT
  private _totalAmount: number = 0

  get id(): string {
    return this._id
  }

  // Factory method (create command)
  static create(params: { orderId: string; customerId: string; items: OrderItem[] }): Order {
    const order = new Order()
    const totalAmount = params.items.reduce((sum, item) => sum + item.price * item.quantity, 0)

    order.apply({
      eventId: crypto.randomUUID(),
      eventType: 'OrderPlaced',
      aggregateId: params.orderId,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: 1,
      payload: {
        customerId: params.customerId,
        items: params.items,
        totalAmount,
      },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: 'create',
      },
    })

    return order
  }

  // Confirm order command
  confirm(): void {
    if (this._status !== OrderStatus.PLACED) {
      throw new Error(`Cannot confirm order in status: ${this._status}`)
    }

    this.apply({
      eventId: crypto.randomUUID(),
      eventType: 'OrderConfirmed',
      aggregateId: this._id,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: this.version + 1,
      payload: { confirmedAt: new Date().toISOString() },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: 'confirm',
      },
    })
  }

  // Cancel order command (used in compensating transactions)
  cancel(reason: string): void {
    if (this._status === OrderStatus.CANCELLED) {
      return // Idempotency guarantee
    }

    this.apply({
      eventId: crypto.randomUUID(),
      eventType: 'OrderCancelled',
      aggregateId: this._id,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: this.version + 1,
      payload: { reason, cancelledAt: new Date().toISOString() },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: 'cancel',
      },
    })
  }

  // Event handler - state change logic
  protected when(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderPlaced': {
        const p = event.payload as {
          customerId: string
          items: OrderItem[]
          totalAmount: number
        }
        this._id = event.aggregateId
        this._customerId = p.customerId
        this._items = p.items
        this._totalAmount = p.totalAmount
        this._status = OrderStatus.PLACED
        break
      }
      case 'OrderConfirmed':
        this._status = OrderStatus.CONFIRMED
        break
      case 'OrderCancelled':
        this._status = OrderStatus.CANCELLED
        break
    }
  }
}

enum OrderStatus {
  DRAFT = 'DRAFT',
  PLACED = 'PLACED',
  CONFIRMED = 'CONFIRMED',
  SHIPPED = 'SHIPPED',
  CANCELLED = 'CANCELLED',
}

interface OrderItem {
  productId: string
  quantity: number
  price: number
}

Snapshot Strategy

As the number of events grows, replay time increases. Snapshots cache state at specific points to improve replay performance.

# Python - Snapshot-based Event Store

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import json


@dataclass
class Snapshot:
    aggregate_id: str
    aggregate_type: str
    version: int
    state: dict
    created_at: datetime = field(default_factory=datetime.utcnow)


class EventStore:
    SNAPSHOT_INTERVAL = 100  # Create snapshot every 100 events

    def __init__(self, db_connection):
        self.db = db_connection

    async def save_events(
        self, aggregate_id: str, events: list[dict], expected_version: int
    ) -> None:
        """Save events with optimistic concurrency control"""
        async with self.db.transaction():
            # Check current version (optimistic locking)
            current_version = await self._get_current_version(aggregate_id)
            if current_version != expected_version:
                raise ConcurrencyError(
                    f"Expected version {expected_version}, "
                    f"but current version is {current_version}"
                )

            # Batch insert events
            for i, event in enumerate(events):
                version = expected_version + i + 1
                await self.db.execute(
                    """INSERT INTO event_store
                       (event_id, aggregate_id, aggregate_type,
                        event_type, version, payload, metadata, created_at)
                       VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
                    (
                        event["event_id"],
                        aggregate_id,
                        event["aggregate_type"],
                        event["event_type"],
                        version,
                        json.dumps(event["payload"]),
                        json.dumps(event["metadata"]),
                        datetime.utcnow(),
                    ),
                )

            # Check snapshot creation condition
            new_version = expected_version + len(events)
            if new_version % self.SNAPSHOT_INTERVAL == 0:
                await self._create_snapshot(aggregate_id, new_version)

    async def load_aggregate(self, aggregate_id: str) -> tuple[list[dict], Snapshot | None]:
        """Load events from snapshot to restore state"""
        # 1. Query most recent snapshot
        snapshot = await self._get_latest_snapshot(aggregate_id)

        if snapshot:
            # 2. Load only events after snapshot
            events = await self.db.fetch_all(
                """SELECT * FROM event_store
                   WHERE aggregate_id = ? AND version > ?
                   ORDER BY version ASC""",
                (aggregate_id, snapshot.version),
            )
            return events, snapshot
        else:
            # 3. Load all events if no snapshot
            events = await self.db.fetch_all(
                """SELECT * FROM event_store
                   WHERE aggregate_id = ?
                   ORDER BY version ASC""",
                (aggregate_id,),
            )
            return events, None

    async def _create_snapshot(
        self, aggregate_id: str, version: int
    ) -> None:
        """Save current state as snapshot"""
        events, snapshot = await self.load_aggregate(aggregate_id)
        # Rebuild the aggregate and serialize its state; replay must start
        # from the previous snapshot's state (if any), since load_aggregate
        # returns only the events recorded after that snapshot
        aggregate = self._rebuild_aggregate(events, snapshot)
        await self.db.execute(
            """INSERT INTO snapshots
               (aggregate_id, aggregate_type, version, state, created_at)
               VALUES (?, ?, ?, ?, ?)""",
            (
                aggregate_id,
                aggregate.aggregate_type,
                version,
                json.dumps(aggregate.to_dict()),
                datetime.utcnow(),
            ),
        )

    async def _get_current_version(self, aggregate_id: str) -> int:
        result = await self.db.fetch_one(
            "SELECT MAX(version) FROM event_store WHERE aggregate_id = ?",
            (aggregate_id,),
        )
        return result[0] if result[0] else 0


class ConcurrencyError(Exception):
    pass

Saga Pattern: Distributed Transaction Management

What Is the Saga Pattern?

The Saga pattern manages business transactions spanning multiple services in distributed environments. Unlike traditional distributed transactions (2PC), it sequentially executes each service's local transactions and, upon failure, executes compensating transactions for already completed steps to restore consistency.

An example of an order processing Saga flow:

Normal flow:

  1. Order Service: Create order (OrderCreated)
  2. Payment Service: Process payment (PaymentProcessed)
  3. Inventory Service: Reserve inventory (InventoryReserved)
  4. Shipping Service: Create shipment (ShipmentCreated)

Compensation flow on failure (when step 3 inventory reservation fails):

  1. Payment Service: Refund payment (PaymentRefunded) -- compensation
  2. Order Service: Cancel order (OrderCancelled) -- compensation

Orchestration-based Saga Implementation

// TypeScript - Saga Orchestrator (Order Processing)

interface SagaStep {
  name: string
  action: () => Promise<void>
  compensation: () => Promise<void>
}

class OrderSagaOrchestrator {
  private completedSteps: SagaStep[] = []
  private sagaLog: SagaLogEntry[] = []

  constructor(
    private paymentService: PaymentService,
    private inventoryService: InventoryService,
    private shippingService: ShippingService,
    private sagaStore: SagaStore
  ) {}

  async execute(orderId: string, orderData: OrderData): Promise<SagaResult> {
    const sagaId = crypto.randomUUID()
    // Reset per execution: the orchestrator instance may be reused
    this.completedSteps = []

    const steps: SagaStep[] = [
      {
        name: 'ProcessPayment',
        action: async () => {
          await this.paymentService.processPayment({
            orderId,
            amount: orderData.totalAmount,
            customerId: orderData.customerId,
          })
        },
        compensation: async () => {
          await this.paymentService.refundPayment({
            orderId,
            amount: orderData.totalAmount,
          })
        },
      },
      {
        name: 'ReserveInventory',
        action: async () => {
          await this.inventoryService.reserve({
            orderId,
            items: orderData.items,
          })
        },
        compensation: async () => {
          await this.inventoryService.releaseReservation({
            orderId,
            items: orderData.items,
          })
        },
      },
      {
        name: 'CreateShipment',
        action: async () => {
          await this.shippingService.createShipment({
            orderId,
            address: orderData.shippingAddress,
            items: orderData.items,
          })
        },
        compensation: async () => {
          await this.shippingService.cancelShipment({ orderId })
        },
      },
    ]

    try {
      for (const step of steps) {
        await this.logStep(sagaId, step.name, 'STARTED')

        try {
          await step.action()
          this.completedSteps.push(step)
          await this.logStep(sagaId, step.name, 'COMPLETED')
        } catch (error) {
          await this.logStep(sagaId, step.name, 'FAILED', error)
          // Execute compensating transactions
          await this.compensate(sagaId)
          return {
            success: false,
            sagaId,
            failedStep: step.name,
            error: (error as Error).message,
          }
        }
      }

      await this.sagaStore.markCompleted(sagaId)
      return { success: true, sagaId }
    } catch (compensationError) {
      // When compensation transaction also fails - manual intervention needed
      await this.sagaStore.markRequiresIntervention(sagaId)
      throw new SagaCompensationFailedError(sagaId, compensationError as Error)
    }
  }

  private async compensate(sagaId: string): Promise<void> {
    // Compensate completed steps in reverse order
    const stepsToCompensate = [...this.completedSteps].reverse()

    for (const step of stepsToCompensate) {
      try {
        await this.logStep(sagaId, step.name, 'COMPENSATING')
        await step.compensation()
        await this.logStep(sagaId, step.name, 'COMPENSATED')
      } catch (error) {
        await this.logStep(sagaId, step.name, 'COMPENSATION_FAILED', error)
        // Register in retry queue on compensation failure
        await this.sagaStore.enqueueRetry(sagaId, step.name)
        throw error
      }
    }
  }

  private async logStep(
    sagaId: string,
    stepName: string,
    status: string,
    error?: unknown
  ): Promise<void> {
    const entry: SagaLogEntry = {
      sagaId,
      stepName,
      status,
      timestamp: new Date(),
      error: error ? (error as Error).message : undefined,
    }
    this.sagaLog.push(entry)
    await this.sagaStore.appendLog(entry)
  }
}

interface SagaResult {
  success: boolean
  sagaId: string
  failedStep?: string
  error?: string
}

interface SagaLogEntry {
  sagaId: string
  stepName: string
  status: string
  timestamp: Date
  error?: string
}

Choreography vs Orchestration Comparison

A detailed comparison of the two Saga pattern implementation approaches.

| Comparison | Choreography | Orchestration |
| --- | --- | --- |
| Control | Distributed: each service publishes/subscribes to events | Centralized: orchestrator controls the flow |
| Coupling | Low (only the event schema is shared) | Medium (orchestrator must know all services) |
| Visibility | Low (hard to trace the flow) | High (state is viewable in the orchestrator) |
| Complexity growth | Grows rapidly as services are added | Grows linearly |
| SPOF | None | Orchestrator can be a SPOF |
| Compensation | Distributed across services | Centralized in the orchestrator |
| Testing | Integration testing is difficult | Orchestrator unit testing is easy |
| Suitable scale | Simple workflows with 2-4 services | Complex workflows with 5+ services |
| Representative tools | Kafka, RabbitMQ, SNS/SQS | Temporal, Camunda, AWS Step Functions |

Choreography Pattern Code Example

# Python - Choreography-based Saga (event subscription approach)

from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
import asyncio
import json


class EventBus:
    """Simple in-memory event bus (use Kafka/RabbitMQ in production)"""

    def __init__(self):
        self._handlers: dict[str, list] = {}

    def subscribe(self, event_type: str, handler):
        if event_type not in self._handlers:
            self._handlers[event_type] = []
        self._handlers[event_type].append(handler)

    async def publish(self, event_type: str, payload: dict):
        handlers = self._handlers.get(event_type, [])
        for handler in handlers:
            await handler(payload)


# Payment Service - Choreography approach
class PaymentService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        # Subscribe to order creation event
        event_bus.subscribe("OrderCreated", self.on_order_created)
        # Subscribe to inventory failure event (for compensation)
        event_bus.subscribe("InventoryReservationFailed", self.on_inventory_failed)

    async def on_order_created(self, payload: dict):
        """Process payment on order creation"""
        try:
            order_id = payload["order_id"]
            amount = payload["total_amount"]

            # Payment processing logic
            payment_result = await self._process_payment(
                order_id, amount
            )

            # Publish success event
            await self.event_bus.publish("PaymentProcessed", {
                "order_id": order_id,
                "payment_id": payment_result["payment_id"],
                "amount": amount,
            })
        except Exception as e:
            # Publish failure event
            await self.event_bus.publish("PaymentFailed", {
                "order_id": payload["order_id"],
                "reason": str(e),
            })

    async def on_inventory_failed(self, payload: dict):
        """Refund payment on inventory failure (compensating transaction)"""
        order_id = payload["order_id"]
        await self._refund_payment(order_id)
        await self.event_bus.publish("PaymentRefunded", {
            "order_id": order_id,
        })

    async def _process_payment(self, order_id: str, amount: int) -> dict:
        # Actual payment processing logic
        return {"payment_id": f"pay-{order_id}"}

    async def _refund_payment(self, order_id: str) -> None:
        # Actual refund processing logic
        pass


# Inventory Service - Choreography approach
class InventoryService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        # Subscribe to payment completion event
        event_bus.subscribe("PaymentProcessed", self.on_payment_processed)

    async def on_payment_processed(self, payload: dict):
        """Reserve inventory on payment completion"""
        try:
            order_id = payload["order_id"]
            # Check and reserve inventory
            await self._reserve_inventory(order_id)

            await self.event_bus.publish("InventoryReserved", {
                "order_id": order_id,
            })
        except InsufficientStockError:
            await self.event_bus.publish("InventoryReservationFailed", {
                "order_id": payload["order_id"],
                "reason": "insufficient_stock",
            })


class InsufficientStockError(Exception):
    pass
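
To see the chain end to end, here is a self-contained happy-path demo; the bus and handlers below are simplified stand-ins for the services above, and in production each handler would live in its own process behind Kafka/RabbitMQ:

```python
import asyncio


class MiniEventBus:
    """In-memory bus mirroring the EventBus above (demo only)."""

    def __init__(self):
        self._handlers: dict[str, list] = {}

    def subscribe(self, event_type: str, handler):
        self._handlers.setdefault(event_type, []).append(handler)

    async def publish(self, event_type: str, payload: dict):
        for handler in self._handlers.get(event_type, []):
            await handler(payload)


async def demo() -> list[str]:
    bus = MiniEventBus()
    seen: list[str] = []

    async def on_order_created(payload: dict):
        # Payment service: react to OrderCreated, then emit PaymentProcessed
        seen.append("PaymentProcessed")
        await bus.publish("PaymentProcessed", payload)

    async def on_payment_processed(payload: dict):
        # Inventory service: react to PaymentProcessed, emit InventoryReserved
        seen.append("InventoryReserved")
        await bus.publish("InventoryReserved", payload)

    bus.subscribe("OrderCreated", on_order_created)
    bus.subscribe("PaymentProcessed", on_payment_processed)

    # One published event triggers the whole chain; no central coordinator
    await bus.publish("OrderCreated", {"order_id": "order-1"})
    return seen
```

Running `asyncio.run(demo())` walks the chain OrderCreated -> PaymentProcessed -> InventoryReserved, with no service knowing about any other.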

Hybrid Approach: When to Choose Which

In practice, rather than using pure Choreography or Orchestration alone, mixing them based on workflow complexity is effective.

  • Choose Choreography: Simple notifications, cache invalidation, log collection with 2-3 participating services
  • Choose Orchestration: Core business flows like payment-inventory-shipping where order and compensation matter
  • Hybrid: Core business flows via Orchestration, side effects (email, notifications, analytics events) via Choreography
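
The hybrid split can be sketched as an orchestrated core flow with fire-and-forget side effects. A minimal model under assumed interfaces (the step and publish callables are hypothetical placeholders for real service clients and an event bus):

```python
import asyncio
from typing import Awaitable, Callable

# An (action, compensation) pair forms one orchestrated step
Step = tuple[Callable[[], Awaitable[None]], Callable[[], Awaitable[None]]]


class HybridOrderSaga:
    def __init__(self, steps: list[Step],
                 publish: Callable[[str, dict], Awaitable[None]]):
        self.steps = steps      # ordered core flow (payment, inventory, ...)
        self.publish = publish  # choreography channel for side effects

    async def run(self) -> bool:
        done: list[Callable[[], Awaitable[None]]] = []
        for action, compensation in self.steps:
            try:
                await action()
                done.append(compensation)
            except Exception:
                # Core step failed: compensate completed steps in reverse order
                for comp in reversed(done):
                    await comp()
                return False
        # Core flow committed; side effects (email, notifications, analytics)
        # go out as events and are never compensated if a consumer fails
        await self.publish("OrderCompleted", {})
        return True
```

The orchestrator keeps ordering and compensation for the steps that matter, while downstream consumers subscribe to `OrderCompleted` without ever entering the saga.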

Event Store Selection (EventStoreDB, Kafka, DynamoDB)

Event store selection varies based on system requirements and team capabilities. Here is a comparison of the three main options.

| Comparison | EventStoreDB | Apache Kafka | DynamoDB Streams |
| --- | --- | --- | --- |
| Design purpose | Dedicated Event Sourcing DB | Distributed message streaming platform | General NoSQL + change streams |
| Stream model | Fine-grained streams (per aggregate) | Topic-partition based | Table + DynamoDB Streams/Kinesis |
| Concurrency | ExpectedVersion (optimistic locking) built in | Partition-level ordering only | Conditional writes (ConditionExpression) |
| Lookup by ID | Instant lookup by stream ID | Cannot look up a specific entity in a topic | Direct lookup by partition key |
| Event ordering | Fully guaranteed within a stream | Guaranteed only within a partition | Guaranteed within a partition key |
| Projections | Server-side projections built in | Kafka Streams / ksqlDB | Lambda + DynamoDB Streams |
| Subscriptions | Persistent / catch-up subscriptions | Consumer groups | DynamoDB Streams / Kinesis |
| Ops complexity | Medium (dedicated cluster) | High (ZooKeeper/KRaft, partition mgmt) | Low (serverless, AWS managed) |
| Cost model | Open source + commercial cloud | Open source + MSK/Confluent Cloud | Pay per request/storage |
| Best fit | Pure Event Sourcing systems | High-volume event streaming + integration | AWS-native, serverless architectures |

Selection Guide

When EventStoreDB is suitable:

  • Event Sourcing is the core architectural pattern
  • Fine-grained stream management and projections are needed
  • Team actively uses DDD-based design

When Kafka is suitable:

  • High-volume event streaming is the main purpose
  • Kafka infrastructure already exists and team expertise is sufficient
  • Both Event Sourcing and event streaming are needed (consider EventStoreDB + Kafka combination)

When DynamoDB is suitable:

  • Primarily using the AWS ecosystem
  • Targeting serverless architecture
  • Minimizing operational burden

EventStoreDB Configuration Example

# docker-compose.yml - EventStoreDB cluster setup
version: '3.8'
services:
  eventstoredb:
    image: eventstore/eventstore:24.2
    container_name: eventstoredb
    environment:
      - EVENTSTORE_CLUSTER_SIZE=1
      - EVENTSTORE_RUN_PROJECTIONS=All
      - EVENTSTORE_START_STANDARD_PROJECTIONS=true
      - EVENTSTORE_INSECURE=true
      - EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=true
      - EVENTSTORE_MEM_DB=false
      - EVENTSTORE_DB=/var/lib/eventstore-data
      - EVENTSTORE_INDEX=/var/lib/eventstore-index
      - EVENTSTORE_LOG=/var/log/eventstore
    ports:
      - '2113:2113' # HTTP/gRPC
      - '1113:1113' # TCP (legacy)
    volumes:
      - eventstore-data:/var/lib/eventstore-data
      - eventstore-index:/var/lib/eventstore-index
      - eventstore-logs:/var/log/eventstore

  # Kafka - for event streaming (combined with EventStoreDB)
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    container_name: kafka
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      KAFKA_NUM_PARTITIONS: 6
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    ports:
      - '9092:9092'
    volumes:
      - kafka-data:/var/lib/kafka/data

volumes:
  eventstore-data:
  eventstore-index:
  eventstore-logs:
  kafka-data:

Production Implementation: Order System Example

Let us examine a production implementation that integrates CQRS, Event Sourcing, and Saga patterns through an e-commerce order system.

System Architecture Overview

The overall system consists of the following structure:

  • Order Service: Order Aggregate (Event Sourcing), CQRS applied
  • Payment Service: Payment processing, refund compensating transaction
  • Inventory Service: Inventory reservation, release compensating transaction
  • Shipping Service: Shipment creation, cancellation compensating transaction
  • Saga Orchestrator: Temporal-based workflow management

Temporal-based Saga Workflow

// TypeScript - Order Saga implemented with Temporal Workflow

import { proxyActivities, defineSignal, setHandler, condition, sleep } from '@temporalio/workflow'

// Activity proxies
const payment = proxyActivities<PaymentActivities>({
  startToCloseTimeout: '30s',
  retry: {
    maximumAttempts: 3,
    initialInterval: '1s',
    backoffCoefficient: 2,
    maximumInterval: '30s',
  },
})

const inventory = proxyActivities<InventoryActivities>({
  startToCloseTimeout: '10s',
  retry: { maximumAttempts: 3 },
})

const shipping = proxyActivities<ShippingActivities>({
  startToCloseTimeout: '15s',
  retry: { maximumAttempts: 3 },
})

const notification = proxyActivities<NotificationActivities>({
  startToCloseTimeout: '5s',
  retry: { maximumAttempts: 5 },
})

// Signal definition (send messages to workflow from outside)
const cancelOrderSignal = defineSignal<[string]>('cancelOrder')

// Order Processing Saga Workflow
export async function orderSagaWorkflow(input: OrderSagaInput): Promise<OrderSagaResult> {
  let isCancelled = false
  let cancelReason = ''

  // Cancel signal handler
  setHandler(cancelOrderSignal, (reason: string) => {
    isCancelled = true
    cancelReason = reason
  })

  const compensations: Array<() => Promise<void>> = []

  try {
    // Step 1: Process payment
    if (isCancelled) throw new SagaCancelledError(cancelReason)

    const paymentResult = await payment.processPayment({
      orderId: input.orderId,
      amount: input.totalAmount,
      customerId: input.customerId,
    })

    compensations.push(async () => {
      await payment.refundPayment({
        orderId: input.orderId,
        paymentId: paymentResult.paymentId,
        amount: input.totalAmount,
      })
    })

    // Step 2: Reserve inventory
    if (isCancelled) throw new SagaCancelledError(cancelReason)

    await inventory.reserveInventory({
      orderId: input.orderId,
      items: input.items,
    })

    compensations.push(async () => {
      await inventory.releaseReservation({
        orderId: input.orderId,
        items: input.items,
      })
    })

    // Step 3: Create shipment
    if (isCancelled) throw new SagaCancelledError(cancelReason)

    const shipmentResult = await shipping.createShipment({
      orderId: input.orderId,
      address: input.shippingAddress,
      items: input.items,
    })

    compensations.push(async () => {
      await shipping.cancelShipment({
        orderId: input.orderId,
        shipmentId: shipmentResult.shipmentId,
      })
    })

    // All steps succeeded - send confirmation notification (a notification failure must not fail the Saga)
    await notification
      .sendOrderConfirmation({
        orderId: input.orderId,
        customerId: input.customerId,
      })
      .catch(() => {
        /* Notification failure is ignored */
      })

    return {
      success: true,
      orderId: input.orderId,
      paymentId: paymentResult.paymentId,
      shipmentId: shipmentResult.shipmentId,
    }
  } catch (error) {
    // Execute compensating transactions (reverse order)
    for (const compensate of compensations.reverse()) {
      try {
        await compensate()
      } catch (compError) {
        // Temporal automatically manages retries
        // Final failure sends to Dead Letter Queue
        console.error('Compensation failed:', compError)
      }
    }

    return {
      success: false,
      orderId: input.orderId,
      error: (error as Error).message,
    }
  }
}

interface OrderSagaInput {
  orderId: string
  customerId: string
  totalAmount: number
  shippingAddress: string
  items: Array<{
    productId: string
    quantity: number
    price: number
  }>
}

interface OrderSagaResult {
  success: boolean
  orderId: string
  paymentId?: string
  shipmentId?: string
  error?: string
}

class SagaCancelledError extends Error {
  constructor(reason: string) {
    super(`Saga cancelled: ${reason}`)
    this.name = 'SagaCancelledError'
  }
}

DynamoDB-based Event Store Schema

# AWS CloudFormation - DynamoDB Event Store Table

AWSTemplateFormatVersion: '2010-09-09'
Description: Event Store on DynamoDB

Resources:
  EventStoreTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: event-store
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: aggregateId
          AttributeType: S
        - AttributeName: version
          AttributeType: N
        - AttributeName: eventType
          AttributeType: S
        - AttributeName: timestamp
          AttributeType: S
      KeySchema:
        - AttributeName: aggregateId
          KeyType: HASH
        - AttributeName: version
          KeyType: RANGE
      GlobalSecondaryIndexes:
        - IndexName: eventType-timestamp-index
          KeySchema:
            - AttributeName: eventType
              KeyType: HASH
            - AttributeName: timestamp
              KeyType: RANGE
          Projection:
            ProjectionType: ALL
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      Tags:
        - Key: Environment
          Value: production
        - Key: Service
          Value: event-store

  SnapshotTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: event-store-snapshots
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: aggregateId
          AttributeType: S
        - AttributeName: version
          AttributeType: N
      KeySchema:
        - AttributeName: aggregateId
          KeyType: HASH
        - AttributeName: version
          KeyType: RANGE
      Tags:
        - Key: Environment
          Value: production

  SagaStateTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: saga-state
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: sagaId
          AttributeType: S
        - AttributeName: createdAt
          AttributeType: S
      KeySchema:
        - AttributeName: sagaId
          KeyType: HASH
      GlobalSecondaryIndexes:
        - IndexName: createdAt-index
          KeySchema:
            - AttributeName: createdAt
              KeyType: HASH
          Projection:
            ProjectionType: ALL
      TimeToLiveSpecification:
        AttributeName: ttl
        Enabled: true
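The aggregateId/version key schema above supports optimistic concurrency when appending events: a conditional write rejects a duplicate version, so two writers racing on the same aggregate cannot both succeed. The sketch below builds the input for a DynamoDB PutItem call; `buildAppendParams` and the `EventRecord` shape are illustrative helpers for this example, not part of any SDK.

```typescript
// Sketch: appending an event with optimistic concurrency against the
// aggregateId (HASH) / version (RANGE) key schema defined above.
interface EventRecord {
  aggregateId: string
  version: number
  eventType: string
  timestamp: string
  payload: unknown
}

function buildAppendParams(event: EventRecord) {
  return {
    TableName: 'event-store',
    Item: {
      aggregateId: { S: event.aggregateId },
      version: { N: String(event.version) }, // DynamoDB numbers are sent as strings
      eventType: { S: event.eventType },
      timestamp: { S: event.timestamp },
      payload: { S: JSON.stringify(event.payload) },
    },
    // The write fails with ConditionalCheckFailedException if this
    // (aggregateId, version) pair already exists -- a concurrent writer
    // won the race, and the caller should reload the aggregate and retry.
    ConditionExpression: 'attribute_not_exists(version)',
  }
}
```

The returned object would be passed to a `PutItemCommand` from `@aws-sdk/client-dynamodb`; keeping the parameter construction pure makes the concurrency rule easy to unit-test.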

Operational Considerations and Troubleshooting

1. Ensuring Idempotency

In distributed environments, events have "at-least-once" delivery guarantees, so the same event may be processed multiple times. All event handlers must guarantee idempotency.

// TypeScript - Idempotency guarantee pattern

class IdempotentEventHandler {
  constructor(
    private processedEvents: ProcessedEventStore,
    private handler: EventHandler
  ) {}

  async handle(event: DomainEvent): Promise<void> {
    // Check if event was already processed
    const isProcessed = await this.processedEvents.exists(event.eventId)
    if (isProcessed) {
      console.log(`Event ${event.eventId} already processed, skipping`)
      return
    }

    // Process event; if this throws, nothing is recorded, so the event
    // can safely be retried
    await this.handler.handle(event)

    // Record completion (auto-cleanup with TTL). Note: exists +
    // markAsProcessed is not atomic -- two consumers can pass the check
    // concurrently; for strict deduplication use an atomic operation
    // such as Redis SET NX
    await this.processedEvents.markAsProcessed(event.eventId, {
      processedAt: new Date(),
      ttl: 60 * 60 * 24 * 7, // Expire after 7 days
    })
  }
}

// Redis-based duplicate detection
class RedisProcessedEventStore implements ProcessedEventStore {
  constructor(private redis: Redis) {}

  async exists(eventId: string): Promise<boolean> {
    const result = await this.redis.exists(`processed:${eventId}`)
    return result === 1
  }

  async markAsProcessed(
    eventId: string,
    options: { processedAt: Date; ttl: number }
  ): Promise<void> {
    await this.redis.setex(`processed:${eventId}`, options.ttl, options.processedAt.toISOString())
  }
}

2. Ensuring Event Ordering

To guarantee event ordering in Kafka, events from the same Aggregate must be routed to the same partition. Using the Aggregate ID as the partition key is standard practice.
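A minimal sketch of this practice, assuming a kafkajs producer: setting the Kafka message key to the Aggregate ID means the default partitioner hashes every event of one aggregate to the same partition, preserving order. `toOrderedMessage` is an illustrative helper, not a library API.

```typescript
// Route all events of one aggregate to the same partition by using
// the Aggregate ID as the message key.
interface DomainEvent {
  aggregateId: string
  eventType: string
  payload: unknown
}

function toOrderedMessage(event: DomainEvent): { key: string; value: string } {
  return {
    key: event.aggregateId, // partition key: same aggregate -> same partition
    value: JSON.stringify(event),
  }
}

// Usage with a kafkajs producer (not executed here):
// await producer.send({
//   topic: 'order-events',
//   messages: [toOrderedMessage(event)],
// })
```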

3. Schema Evolution

Since events are immutable, schema changes must maintain backward compatibility. Use a schema registry with a serialization format such as Avro or Protobuf, or apply the upcasting pattern.

// TypeScript - Event Upcaster (Schema Evolution)

class EventUpcaster {
  private upcasters: Map<string, Map<number, (event: DomainEvent) => DomainEvent>> = new Map()

  register(
    eventType: string,
    fromVersion: number,
    upcaster: (event: DomainEvent) => DomainEvent
  ): void {
    if (!this.upcasters.has(eventType)) {
      this.upcasters.set(eventType, new Map())
    }
    this.upcasters.get(eventType)!.set(fromVersion, upcaster)
  }

  upcast(event: DomainEvent): DomainEvent {
    const typeUpcasters = this.upcasters.get(event.eventType)
    if (!typeUpcasters) return event

    let currentEvent = event
    let schemaVersion = (event.metadata as any).schemaVersion || 1

    while (typeUpcasters.has(schemaVersion)) {
      const upcasterFn = typeUpcasters.get(schemaVersion)!
      currentEvent = upcasterFn(currentEvent)
      schemaVersion++
    }

    return currentEvent
  }
}

// Usage example: OrderPlaced event v1 -> v2 upcasting
const upcaster = new EventUpcaster()

upcaster.register('OrderPlaced', 1, (event) => {
  // v1 had shippingAddress as a string
  // v2 changed to a structured object
  const payload = event.payload as any
  return {
    ...event,
    payload: {
      ...payload,
      shippingAddress: {
        full: payload.shippingAddress,
        city: '',
        zipCode: '',
      },
    },
    metadata: {
      ...event.metadata,
      schemaVersion: 2,
    },
  }
})

4. Key Monitoring Metrics

The following metrics should be monitored to check the health of an EDA system.

| Metric | Description | Alert Threshold |
| --- | --- | --- |
| Consumer Lag | Consumer processing delay (unprocessed events) | Over 1,000 events |
| Event Processing Latency | Time from event publication to processing | P99 over 5 seconds |
| Saga Completion Rate | Saga success rate | Below 99% |
| Compensation Failure Rate | Compensating transaction failure rate | Over 0.1% |
| Projection Lag | Sync delay between read and write models | Over 30 seconds |
| Dead Letter Queue Size | Number of unprocessable events | Immediate alert when > 0 |
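The thresholds above can be encoded as a pure alerting check; the metric field names and units here are assumptions for the example, and in practice these values would come from your metrics backend.

```typescript
// Evaluate current EDA metrics against the alert thresholds from the
// table above; returns the list of alerts that should fire.
interface EdaMetrics {
  consumerLag: number             // unprocessed events
  eventLatencyP99Ms: number       // milliseconds
  sagaCompletionRate: number      // fraction, 0..1
  compensationFailureRate: number // fraction, 0..1
  projectionLagSec: number        // seconds
  dlqSize: number                 // events in Dead Letter Queue
}

function checkAlerts(m: EdaMetrics): string[] {
  const alerts: string[] = []
  if (m.consumerLag > 1000) alerts.push('consumer-lag')
  if (m.eventLatencyP99Ms > 5000) alerts.push('event-latency-p99')
  if (m.sagaCompletionRate < 0.99) alerts.push('saga-completion-rate')
  if (m.compensationFailureRate > 0.001) alerts.push('compensation-failure-rate')
  if (m.projectionLagSec > 30) alerts.push('projection-lag')
  if (m.dlqSize > 0) alerts.push('dead-letter-queue') // immediate alert when > 0
  return alerts
}
```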

Failure Cases and Recovery Procedures

Case 1: Compensating Transaction Failure (Most Dangerous Scenario)

Problem: Payment was completed but inventory reservation failed. The compensation (refund) was attempted but the refund also failed due to payment gateway timeout.

Recovery procedure:

  1. Mark Saga state as "REQUIRES_INTERVENTION"
  2. Register failed compensation transaction in Dead Letter Queue
  3. Automatic retry (exponential backoff): 1s, 2s, 4s, 8s, 16s
  4. Alert ops team via PagerDuty/Slack when max retries exceeded
  5. Operator manually processes refund via payment gateway console
  6. Update Saga state to "MANUALLY_COMPENSATED"
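Step 3 of the procedure above can be sketched as a small retry helper with an exponential backoff schedule of 1s, 2s, 4s, 8s, 16s. The function names and the injectable `sleep` hook are assumptions for illustration; on final failure the caller would mark the Saga "REQUIRES_INTERVENTION" and page the ops team.

```typescript
// Retry a failed compensation with exponential backoff before escalating.
async function retryCompensation(
  compensate: () => Promise<void>,
  maxAttempts = 5,
  baseDelayMs = 1000,
  sleep: (ms: number) => Promise<void> = (ms) => new Promise((r) => setTimeout(r, ms))
): Promise<boolean> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      await compensate()
      return true // compensation succeeded
    } catch {
      // Back off before the next attempt: 1s, 2s, 4s, 8s, 16s
      if (attempt < maxAttempts - 1) {
        await sleep(baseDelayMs * 2 ** attempt)
      }
    }
  }
  return false // exhausted: mark REQUIRES_INTERVENTION and alert on-call
}
```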

Case 2: Event Order Reversal

Problem: Due to network delay, OrderCancelled event arrives before OrderConfirmed

Prevention and recovery:

  • Include version field in events for order verification
  • Buffer and re-sort when version does not match expectations
  • EventStoreDB guarantees ordering within a single stream, so events of the same aggregate cannot arrive reversed; ordering across streams still requires the version check
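The buffer-and-re-sort strategy above can be sketched as follows: each event carries a per-aggregate version, events that arrive early are held back until the gap is filled, and stale versions are dropped as duplicates. The class and field names are illustrative, not a specific library API.

```typescript
// Process events strictly in version order per aggregate, buffering
// out-of-order arrivals (e.g. OrderCancelled arriving before OrderConfirmed).
interface VersionedEvent {
  aggregateId: string
  version: number
  eventType: string
}

class InOrderProcessor {
  private nextVersion = new Map<string, number>()
  private buffered = new Map<string, Map<number, VersionedEvent>>()

  constructor(private apply: (e: VersionedEvent) => void) {}

  handle(event: VersionedEvent): void {
    const expected = this.nextVersion.get(event.aggregateId) ?? 1
    if (event.version > expected) {
      // Arrived early: buffer until the missing versions show up
      const byVersion = this.buffered.get(event.aggregateId) ?? new Map<number, VersionedEvent>()
      byVersion.set(event.version, event)
      this.buffered.set(event.aggregateId, byVersion)
      return
    }
    if (event.version < expected) return // duplicate / already applied
    this.apply(event)
    this.nextVersion.set(event.aggregateId, expected + 1)
    this.drain(event.aggregateId)
  }

  private drain(aggregateId: string): void {
    const byVersion = this.buffered.get(aggregateId)
    if (!byVersion) return
    let next = this.nextVersion.get(aggregateId)!
    while (byVersion.has(next)) {
      this.apply(byVersion.get(next)!)
      byVersion.delete(next)
      next++
    }
    this.nextVersion.set(aggregateId, next)
  }
}
```

In production this buffer would also need a timeout so a permanently missing version triggers an alert rather than holding events forever.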

Case 3: Read Model Inconsistency Due to Projection Failure

Problem: Projection process crashes and the read model does not reflect the latest state

Recovery procedure:

  1. Check the projection process's last checkpoint
  2. Re-run projection from events after the checkpoint
  3. For severe inconsistency, drop the read model and replay all events
  4. During replay, redirect read traffic to cache or write DB

# Python - Projection recovery script

import asyncio
from datetime import datetime


class ProjectionRecovery:
    def __init__(self, event_store, read_db, projection):
        self.event_store = event_store
        self.read_db = read_db
        self.projection = projection

    async def recover_from_checkpoint(self) -> dict:
        """Checkpoint-based projection recovery"""
        # 1. Check last checkpoint
        checkpoint = await self.read_db.get_checkpoint(
            self.projection.name
        )
        last_position = checkpoint.get("position", 0) if checkpoint else 0

        print(
            f"Recovering projection '{self.projection.name}' "
            f"from position {last_position}"
        )

        # 2. Load events after checkpoint
        events = await self.event_store.read_all_from(last_position)
        processed = 0
        errors = 0

        for event in events:
            try:
                await self.projection.handle(event)
                processed += 1

                # Update checkpoint every 100 events
                if processed % 100 == 0:
                    await self.read_db.save_checkpoint(
                        self.projection.name,
                        {"position": event["global_position"]},
                    )
            except Exception as e:
                errors += 1
                print(
                    f"Error processing event "
                    f"{event['event_id']}: {e}"
                )
                # Continue after logging error (skip & log)
                continue

        # 3. Save final checkpoint
        if events:
            await self.read_db.save_checkpoint(
                self.projection.name,
                {"position": events[-1]["global_position"]},
            )

        return {
            "projection": self.projection.name,
            "processed": processed,
            "errors": errors,
            "recovered_at": datetime.utcnow().isoformat(),
        }

    async def full_rebuild(self) -> dict:
        """Full rebuild of read model"""
        print(
            f"Full rebuild of projection '{self.projection.name}'"
        )

        # 1. Delete existing read model
        await self.read_db.drop_projection_data(self.projection.name)

        # 2. Reset checkpoint
        await self.read_db.save_checkpoint(
            self.projection.name, {"position": 0}
        )

        # 3. Replay all events
        return await self.recover_from_checkpoint()

Case 4: Event Store Disk Shortage

Problem: Events accumulate continuously and disk capacity becomes insufficient

Prevention strategy:

  • After creating snapshots, move old events to archive (S3/Glacier)
  • Establish event retention policy: hot data (30 days), warm data (1 year), cold data (permanent archive)
  • Set disk usage alerts at 80% warning, 90% critical
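The retention policy above can be expressed as a simple classifier over event age; the tier names and helper are assumptions for illustration. An archival job would query cold-tier events, snapshot their aggregates, and move them to S3/Glacier.

```typescript
// Classify events by age into the hot / warm / cold retention tiers
// described above: hot (<= 30 days), warm (<= 1 year), cold (archive).
type Tier = 'hot' | 'warm' | 'cold'

const DAY_MS = 24 * 60 * 60 * 1000

function retentionTier(eventTimestamp: Date, now: Date = new Date()): Tier {
  const ageDays = (now.getTime() - eventTimestamp.getTime()) / DAY_MS
  if (ageDays <= 30) return 'hot'   // stays in the primary event store
  if (ageDays <= 365) return 'warm' // cheaper storage, still queryable
  return 'cold'                     // archive to S3/Glacier after snapshotting
}
```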

Conclusion

The three core patterns of Event-Driven Architecture -- CQRS, Event Sourcing, and Saga -- are each powerful independently but fundamentally solve data consistency problems in microservice architecture when used together.

Key takeaways:

  1. CQRS: Acknowledge and separate the asymmetric requirements of reads and writes. Read traffic, which in many systems accounts for 80-90% of the total, can then be optimized independently.

  2. Event Sourcing: Store change history instead of current state. This enables complete audit trails, time-travel debugging, and diverse read model generation. However, snapshot strategies and schema evolution strategies must be considered from the start.

  3. Saga Pattern: Manage distributed transactions with compensation-based approaches. Implement simple flows with Choreography and complex business logic with Orchestration, but always prepare recovery strategies for compensation transaction failures.

  4. Event Store Selection: Consider EventStoreDB for pure Event Sourcing, Kafka for high-volume streaming, and DynamoDB for AWS serverless, with combinations possible based on requirements.

These patterns are powerful but come with the cost of complexity. Rather than applying them uniformly to all services, the realistic strategy is to introduce them incrementally starting with core domains where business complexity is high and data consistency is critical. Adopting these patterns without operational monitoring, idempotency guarantees, and schema evolution strategies can actually harm system stability.

References