Event-Driven Architecture Practical Guide: CQRS, Event Sourcing, Saga Patterns
Author: Youngju Kim (@fjvbn20031)
- Introduction
- Event-Driven Architecture Fundamentals
- CQRS Pattern: Separating Commands and Queries
- Event Sourcing: Event-Based State Management
- Saga Pattern: Distributed Transaction Management
- Choreography vs Orchestration Comparison
- Event Store Selection (EventStoreDB, Kafka, DynamoDB)
- Production Implementation: Order System Example
- Operational Considerations and Troubleshooting
- Failure Cases and Recovery Procedures
- Conclusion
- References

Introduction
One of the biggest challenges in microservice architecture is managing data consistency and business transactions across multiple services. In a traditional monolith, this could be solved with a single database's ACID transactions, but in distributed environments where each service owns an independent data store, the limitations of 2PC (Two-Phase Commit) become clear: performance bottlenecks, reduced availability, and tight coupling between services.
Event-Driven Architecture (EDA) offers a fundamental solution to this problem. It shifts inter-service communication to asynchronous event-based messaging, manages state changes as event streams, and handles distributed transactions with compensation-based sagas. This article analyzes the three core EDA patterns -- CQRS, Event Sourcing, and Saga -- in depth, with production code, and covers the strategies and troubleshooting techniques needed for production operations.
Netflix uses these patterns for its personalization system serving 260 million subscribers, LinkedIn for processing trillions of daily events, and Slack for handling billions of daily messages. Let us explore the patterns and anti-patterns validated from the experience of these large-scale systems.
Event-Driven Architecture Fundamentals
Three Types of Events
In EDA, events are classified into three types based on their purpose.
| Type | Description | Example | Characteristics |
|---|---|---|---|
| Domain Event | A fact recorded from the business domain | OrderPlaced, PaymentCompleted | Immutable, past-tense naming |
| Integration Event | Events for inter-service communication | OrderPlacedIntegrationEvent | Crosses bounded context boundaries |
| Event Notification | Change notification (minimal data) | OrderStatusChanged (ID only) | Receiver queries needed data directly |
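The payload-shape difference between a fat domain event and a thin event notification can be sketched in a few lines. Event names come from the table above; the field shapes are illustrative, not a prescribed schema.

```typescript
// A domain event carries the full business data of the state change.
interface OrderPlacedEvent {
  eventType: 'OrderPlaced'
  aggregateId: string
  payload: { customerId: string; totalAmount: number }
}

// An event notification carries the ID only; the receiver queries back
// for whatever data it actually needs.
interface OrderStatusChangedNotification {
  eventType: 'OrderStatusChanged'
  aggregateId: string
}

const placed: OrderPlacedEvent = {
  eventType: 'OrderPlaced',
  aggregateId: 'order-1',
  payload: { customerId: 'cust-1', totalAmount: 74800 },
}

const notification: OrderStatusChangedNotification = {
  eventType: 'OrderStatusChanged',
  aggregateId: 'order-1',
}
```

Thin notifications keep consumers loosely coupled to the producer's schema at the cost of an extra read, which is why they suit high-churn payloads.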
Core Principles
The core principles for correctly implementing EDA are as follows.
- Asynchronous Communication: Producers and consumers are temporally decoupled
- Loose Coupling: Services only need to know the event schema, not the implementation of other services
- Eventual Consistency: Instead of immediate strong consistency, consistency is guaranteed after a certain period
- Idempotency: Processing the same event multiple times must produce the same result
```typescript
// TypeScript - Domain Event basic structure
interface DomainEvent {
  eventId: string
  eventType: string
  aggregateId: string
  aggregateType: string
  timestamp: Date
  version: number
  payload: Record<string, unknown>
  metadata: {
    correlationId: string
    causationId: string
    userId?: string
  }
}

// Order creation event example
const orderPlacedEvent: DomainEvent = {
  eventId: 'evt-550e8400-e29b-41d4-a716-446655440000',
  eventType: 'OrderPlaced',
  aggregateId: 'order-12345',
  aggregateType: 'Order',
  timestamp: new Date('2026-03-14T09:00:00Z'),
  version: 1,
  payload: {
    customerId: 'cust-67890',
    items: [
      { productId: 'prod-001', quantity: 2, price: 29900 },
      { productId: 'prod-002', quantity: 1, price: 15000 },
    ],
    totalAmount: 74800,
    shippingAddress: {
      city: 'Seoul',
      district: 'Gangnam-gu',
      detail: 'Teheran-ro 123',
    },
  },
  metadata: {
    correlationId: 'corr-abc123',
    causationId: 'cmd-place-order-001',
    userId: 'user-admin-01',
  },
}
```
CQRS Pattern: Separating Commands and Queries
What Is CQRS?
CQRS (Command Query Responsibility Segregation) is a pattern that separates data writes (Commands) and reads (Queries) into separate models. It extends Bertrand Meyer's CQS (Command Query Separation) principle to the architectural level, formalized by Greg Young in 2010.
In the traditional CRUD model, the same data model handles both reads and writes. However, in real business scenarios, read and write requirements differ significantly.
| Aspect | Command (Write) | Query (Read) |
|---|---|---|
| Purpose | State change, business rule validation | Data retrieval, display |
| Ratio | 10-20% of total traffic | 80-90% of total traffic |
| Consistency | Strong consistency needed | Eventual consistency acceptable |
| Model Complexity | Rich domain logic | Denormalized read models |
| Scaling | Primarily vertical scaling | Easy horizontal scaling (cache, replicas) |
CQRS Implementation: TypeScript Example
```typescript
// TypeScript - CQRS Command Side
// Command definition
interface PlaceOrderCommand {
  type: 'PlaceOrder'
  customerId: string
  items: Array<{
    productId: string
    quantity: number
    price: number
  }>
  shippingAddress: string
}

// Command Handler
class PlaceOrderHandler {
  constructor(
    private orderRepository: OrderWriteRepository,
    private eventBus: EventBus,
    private inventoryService: InventoryService
  ) {}

  async handle(command: PlaceOrderCommand): Promise<string> {
    // 1. Business rule validation
    await this.inventoryService.validateStock(command.items)
    // 2. Aggregate creation
    const order = Order.create({
      customerId: command.customerId,
      items: command.items,
      shippingAddress: command.shippingAddress,
    })
    // 3. Save to write store
    await this.orderRepository.save(order)
    // 4. Publish domain events (for read model sync)
    for (const event of order.getDomainEvents()) {
      await this.eventBus.publish(event)
    }
    return order.id
  }
}

// Query Side - Read-only model
interface OrderReadModel {
  orderId: string
  customerName: string
  orderDate: string
  status: string
  totalAmount: number
  itemCount: number
  lastUpdated: string
}

// Query Handler
class GetOrdersQueryHandler {
  constructor(private readDb: ReadDatabase) {}

  async handle(query: {
    customerId: string
    status?: string
    page: number
    limit: number
  }): Promise<OrderReadModel[]> {
    // Query directly from the denormalized read-only table.
    // Bind parameters are built conditionally so the parameter list
    // always matches the number of placeholders in the SQL.
    const params: unknown[] = [query.customerId]
    if (query.status) params.push(query.status)
    params.push(query.limit, query.page * query.limit)
    return this.readDb.query(
      `SELECT order_id, customer_name, order_date, status,
              total_amount, item_count, last_updated
       FROM order_read_model
       WHERE customer_id = ?
       ${query.status ? 'AND status = ?' : ''}
       ORDER BY order_date DESC
       LIMIT ? OFFSET ?`,
      params
    )
  }
}
```
Read Model Projection
The projection logic that receives events and updates the read model is the core of CQRS.
```typescript
// TypeScript - Event-based Projection
class OrderProjection {
  constructor(private readDb: ReadDatabase) {}

  async handle(event: DomainEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderPlaced':
        await this.onOrderPlaced(event)
        break
      case 'OrderShipped':
        await this.onOrderShipped(event)
        break
      case 'OrderCancelled':
        await this.onOrderCancelled(event)
        break
    }
  }

  private async onOrderPlaced(event: DomainEvent): Promise<void> {
    const payload = event.payload as {
      customerId: string
      items: Array<{ quantity: number; price: number }>
      totalAmount: number
    }
    await this.readDb.upsert('order_read_model', {
      order_id: event.aggregateId,
      customer_id: payload.customerId,
      status: 'PLACED',
      total_amount: payload.totalAmount,
      item_count: payload.items.reduce((sum, i) => sum + i.quantity, 0),
      order_date: event.timestamp,
      last_updated: event.timestamp,
      version: event.version,
    })
  }

  private async onOrderShipped(event: DomainEvent): Promise<void> {
    const payload = event.payload as { trackingNumber: string }
    // Idempotency guarantee: version check
    await this.readDb.updateWhere(
      'order_read_model',
      {
        status: 'SHIPPED',
        tracking_number: payload.trackingNumber,
        last_updated: event.timestamp,
        version: event.version,
      },
      { order_id: event.aggregateId, version: event.version - 1 }
    )
  }

  private async onOrderCancelled(event: DomainEvent): Promise<void> {
    const payload = event.payload as { reason: string }
    await this.readDb.updateWhere(
      'order_read_model',
      {
        status: 'CANCELLED',
        cancellation_reason: payload.reason,
        last_updated: event.timestamp,
        version: event.version,
      },
      { order_id: event.aggregateId }
    )
  }
}
```
Event Sourcing: Event-Based State Management
Core Concept of Event Sourcing
Event Sourcing is a pattern that stores all events that changed the state of an aggregate in order, instead of storing the current state. The current state is reconstructed by replaying the event stream from the beginning.
Comparing the traditional approach with Event Sourcing:
| Aspect | Traditional CRUD | Event Sourcing |
|---|---|---|
| What is stored | Current state (latest snapshot) | Complete history of state changes |
| Data loss | Previous states are lost | All change history is preserved |
| Audit | Requires separate implementation | Built-in |
| Debugging | Only current state viewable | Time Travel possible |
| Storage space | Relatively small | Increases with event accumulation |
| Query perf | Direct queries possible | Requires projection or snapshots |
Event Sourcing Implementation
```typescript
// TypeScript - Event Sourcing Aggregate
abstract class EventSourcedAggregate {
  private uncommittedEvents: DomainEvent[] = []
  protected version: number = 0

  abstract get id(): string

  // Apply event (state change)
  protected apply(event: DomainEvent): void {
    this.when(event)
    this.version++
    this.uncommittedEvents.push(event)
  }

  // Event handler (implemented by subclass)
  protected abstract when(event: DomainEvent): void

  // Restore state from event stream
  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.when(event)
      this.version++
    }
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents]
  }

  clearUncommittedEvents(): void {
    this.uncommittedEvents = []
  }
}

// Order Aggregate
class Order extends EventSourcedAggregate {
  private _id: string = ''
  private _customerId: string = ''
  private _items: OrderItem[] = []
  private _status: OrderStatus = OrderStatus.DRAFT
  private _totalAmount: number = 0

  get id(): string {
    return this._id
  }

  // Factory method (create command)
  static create(params: { orderId: string; customerId: string; items: OrderItem[] }): Order {
    const order = new Order()
    const totalAmount = params.items.reduce((sum, item) => sum + item.price * item.quantity, 0)
    order.apply({
      eventId: crypto.randomUUID(),
      eventType: 'OrderPlaced',
      aggregateId: params.orderId,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: 1,
      payload: {
        customerId: params.customerId,
        items: params.items,
        totalAmount,
      },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: 'create',
      },
    })
    return order
  }

  // Confirm order command
  confirm(): void {
    if (this._status !== OrderStatus.PLACED) {
      throw new Error(`Cannot confirm order in status: ${this._status}`)
    }
    this.apply({
      eventId: crypto.randomUUID(),
      eventType: 'OrderConfirmed',
      aggregateId: this._id,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: this.version + 1,
      payload: { confirmedAt: new Date().toISOString() },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: 'confirm',
      },
    })
  }

  // Cancel order command (used in compensating transactions)
  cancel(reason: string): void {
    if (this._status === OrderStatus.CANCELLED) {
      return // Idempotency guarantee
    }
    this.apply({
      eventId: crypto.randomUUID(),
      eventType: 'OrderCancelled',
      aggregateId: this._id,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: this.version + 1,
      payload: { reason, cancelledAt: new Date().toISOString() },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: 'cancel',
      },
    })
  }

  // Event handler - state change logic
  protected when(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderPlaced': {
        const p = event.payload as {
          customerId: string
          items: OrderItem[]
          totalAmount: number
        }
        this._id = event.aggregateId
        this._customerId = p.customerId
        this._items = p.items
        this._totalAmount = p.totalAmount
        this._status = OrderStatus.PLACED
        break
      }
      case 'OrderConfirmed':
        this._status = OrderStatus.CONFIRMED
        break
      case 'OrderCancelled':
        this._status = OrderStatus.CANCELLED
        break
    }
  }
}

enum OrderStatus {
  DRAFT = 'DRAFT',
  PLACED = 'PLACED',
  CONFIRMED = 'CONFIRMED',
  SHIPPED = 'SHIPPED',
  CANCELLED = 'CANCELLED',
}

interface OrderItem {
  productId: string
  quantity: number
  price: number
}
```
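Stripped of the aggregate machinery, replay is just a left fold of an apply function over the event stream. A minimal standalone sketch with simplified event shapes (independent of the full Order class):

```typescript
// Minimal replay: current state = fold(applyEvent, initial, history)
type Evt =
  | { type: 'OrderPlaced'; totalAmount: number }
  | { type: 'OrderConfirmed' }
  | { type: 'OrderCancelled'; reason: string }

interface State { status: string; totalAmount: number }

const initial: State = { status: 'DRAFT', totalAmount: 0 }

// Pure transition function: never mutates, always returns the next state.
function applyEvent(state: State, e: Evt): State {
  switch (e.type) {
    case 'OrderPlaced':
      return { status: 'PLACED', totalAmount: e.totalAmount }
    case 'OrderConfirmed':
      return { ...state, status: 'CONFIRMED' }
    case 'OrderCancelled':
      return { ...state, status: 'CANCELLED' }
  }
}

// Replaying the full history reconstructs the current state.
const history: Evt[] = [
  { type: 'OrderPlaced', totalAmount: 74800 },
  { type: 'OrderConfirmed' },
]
const current = history.reduce(applyEvent, initial)
// current -> { status: 'CONFIRMED', totalAmount: 74800 }
```

Because the transition function is pure, replaying the same history always yields the same state, which is what makes time travel and audit trivially correct.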
Snapshot Strategy
As the number of events grows, replay time increases. Snapshots cache state at specific points to improve replay performance.
```python
# Python - Snapshot-based Event Store
from dataclasses import dataclass, field
from datetime import datetime
import json


@dataclass
class Snapshot:
    aggregate_id: str
    aggregate_type: str
    version: int
    state: dict
    created_at: datetime = field(default_factory=datetime.utcnow)


class EventStore:
    SNAPSHOT_INTERVAL = 100  # Create snapshot every 100 events

    def __init__(self, db_connection):
        self.db = db_connection

    async def save_events(
        self, aggregate_id: str, events: list[dict], expected_version: int
    ) -> None:
        """Save events with optimistic concurrency control."""
        async with self.db.transaction():
            # Check current version (optimistic locking)
            current_version = await self._get_current_version(aggregate_id)
            if current_version != expected_version:
                raise ConcurrencyError(
                    f"Expected version {expected_version}, "
                    f"but current version is {current_version}"
                )
            # Insert events with monotonically increasing versions
            for i, event in enumerate(events):
                version = expected_version + i + 1
                await self.db.execute(
                    """INSERT INTO event_store
                       (event_id, aggregate_id, aggregate_type,
                        event_type, version, payload, metadata, created_at)
                       VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
                    (
                        event["event_id"],
                        aggregate_id,
                        event["aggregate_type"],
                        event["event_type"],
                        version,
                        json.dumps(event["payload"]),
                        json.dumps(event["metadata"]),
                        datetime.utcnow(),
                    ),
                )
            # Check snapshot creation condition
            new_version = expected_version + len(events)
            if new_version % self.SNAPSHOT_INTERVAL == 0:
                await self._create_snapshot(aggregate_id, new_version)

    async def load_aggregate(
        self, aggregate_id: str
    ) -> tuple[list[dict], "Snapshot | None"]:
        """Load the latest snapshot (if any) plus the events needed to restore state."""
        # 1. Query the most recent snapshot
        snapshot = await self._get_latest_snapshot(aggregate_id)
        if snapshot:
            # 2. Load only events after the snapshot
            events = await self.db.fetch_all(
                """SELECT * FROM event_store
                   WHERE aggregate_id = ? AND version > ?
                   ORDER BY version ASC""",
                (aggregate_id, snapshot.version),
            )
            return events, snapshot
        # 3. Load all events if no snapshot exists
        events = await self.db.fetch_all(
            """SELECT * FROM event_store
               WHERE aggregate_id = ?
               ORDER BY version ASC""",
            (aggregate_id,),
        )
        return events, None

    async def _create_snapshot(self, aggregate_id: str, version: int) -> None:
        """Save the current state as a snapshot."""
        events, snapshot = await self.load_aggregate(aggregate_id)
        # Rebuild the aggregate (domain-specific; must start from the
        # snapshot state when one exists) and serialize its state
        aggregate = self._rebuild_aggregate(events, snapshot)
        await self.db.execute(
            """INSERT INTO snapshots
               (aggregate_id, aggregate_type, version, state, created_at)
               VALUES (?, ?, ?, ?, ?)""",
            (
                aggregate_id,
                aggregate.aggregate_type,
                version,
                json.dumps(aggregate.to_dict()),
                datetime.utcnow(),
            ),
        )

    async def _get_latest_snapshot(self, aggregate_id: str) -> "Snapshot | None":
        row = await self.db.fetch_one(
            """SELECT aggregate_id, aggregate_type, version, state, created_at
               FROM snapshots
               WHERE aggregate_id = ?
               ORDER BY version DESC
               LIMIT 1""",
            (aggregate_id,),
        )
        if row is None:
            return None
        return Snapshot(row[0], row[1], row[2], json.loads(row[3]), row[4])

    async def _get_current_version(self, aggregate_id: str) -> int:
        result = await self.db.fetch_one(
            "SELECT MAX(version) FROM event_store WHERE aggregate_id = ?",
            (aggregate_id,),
        )
        return result[0] if result and result[0] else 0


class ConcurrencyError(Exception):
    pass
```
Saga Pattern: Distributed Transaction Management
What Is the Saga Pattern?
The Saga pattern manages business transactions spanning multiple services in distributed environments. Unlike traditional distributed transactions (2PC), it sequentially executes each service's local transactions and, upon failure, executes compensating transactions for already completed steps to restore consistency.
An example of an order processing Saga flow:
Normal flow:
- Order Service: Create order (OrderCreated)
- Payment Service: Process payment (PaymentProcessed)
- Inventory Service: Reserve inventory (InventoryReserved)
- Shipping Service: Create shipment (ShipmentCreated)
Compensation flow on failure (when step 3 inventory reservation fails):
- Payment Service: Refund payment (PaymentRefunded) -- compensation
- Order Service: Cancel order (OrderCancelled) -- compensation
Orchestration-based Saga Implementation
```typescript
// TypeScript - Saga Orchestrator (Order Processing)
interface SagaStep {
  name: string
  action: () => Promise<void>
  compensation: () => Promise<void>
}

class OrderSagaOrchestrator {
  private completedSteps: SagaStep[] = []
  private sagaLog: SagaLogEntry[] = []

  constructor(
    private paymentService: PaymentService,
    private inventoryService: InventoryService,
    private shippingService: ShippingService,
    private sagaStore: SagaStore
  ) {}

  async execute(orderId: string, orderData: OrderData): Promise<SagaResult> {
    const sagaId = crypto.randomUUID()
    const steps: SagaStep[] = [
      {
        name: 'ProcessPayment',
        action: async () => {
          await this.paymentService.processPayment({
            orderId,
            amount: orderData.totalAmount,
            customerId: orderData.customerId,
          })
        },
        compensation: async () => {
          await this.paymentService.refundPayment({
            orderId,
            amount: orderData.totalAmount,
          })
        },
      },
      {
        name: 'ReserveInventory',
        action: async () => {
          await this.inventoryService.reserve({
            orderId,
            items: orderData.items,
          })
        },
        compensation: async () => {
          await this.inventoryService.releaseReservation({
            orderId,
            items: orderData.items,
          })
        },
      },
      {
        name: 'CreateShipment',
        action: async () => {
          await this.shippingService.createShipment({
            orderId,
            address: orderData.shippingAddress,
            items: orderData.items,
          })
        },
        compensation: async () => {
          await this.shippingService.cancelShipment({ orderId })
        },
      },
    ]

    try {
      for (const step of steps) {
        await this.logStep(sagaId, step.name, 'STARTED')
        try {
          await step.action()
          this.completedSteps.push(step)
          await this.logStep(sagaId, step.name, 'COMPLETED')
        } catch (error) {
          await this.logStep(sagaId, step.name, 'FAILED', error)
          // Execute compensating transactions
          await this.compensate(sagaId)
          return {
            success: false,
            sagaId,
            failedStep: step.name,
            error: (error as Error).message,
          }
        }
      }
      await this.sagaStore.markCompleted(sagaId)
      return { success: true, sagaId }
    } catch (compensationError) {
      // When a compensating transaction also fails - manual intervention needed
      await this.sagaStore.markRequiresIntervention(sagaId)
      throw new SagaCompensationFailedError(sagaId, compensationError as Error)
    }
  }

  private async compensate(sagaId: string): Promise<void> {
    // Compensate completed steps in reverse order
    const stepsToCompensate = [...this.completedSteps].reverse()
    for (const step of stepsToCompensate) {
      try {
        await this.logStep(sagaId, step.name, 'COMPENSATING')
        await step.compensation()
        await this.logStep(sagaId, step.name, 'COMPENSATED')
      } catch (error) {
        await this.logStep(sagaId, step.name, 'COMPENSATION_FAILED', error)
        // Register in retry queue on compensation failure
        await this.sagaStore.enqueueRetry(sagaId, step.name)
        throw error
      }
    }
  }

  private async logStep(
    sagaId: string,
    stepName: string,
    status: string,
    error?: unknown
  ): Promise<void> {
    const entry: SagaLogEntry = {
      sagaId,
      stepName,
      status,
      timestamp: new Date(),
      error: error ? (error as Error).message : undefined,
    }
    this.sagaLog.push(entry)
    await this.sagaStore.appendLog(entry)
  }
}

class SagaCompensationFailedError extends Error {
  constructor(public sagaId: string, cause: Error) {
    super(`Saga ${sagaId} compensation failed: ${cause.message}`)
    this.name = 'SagaCompensationFailedError'
  }
}

interface SagaResult {
  success: boolean
  sagaId: string
  failedStep?: string
  error?: string
}

interface SagaLogEntry {
  sagaId: string
  stepName: string
  status: string
  timestamp: Date
  error?: string
}
```
Choreography vs Orchestration Comparison
A detailed comparison of the two Saga pattern implementation approaches.
| Comparison | Choreography | Orchestration |
|---|---|---|
| Control method | Distributed - each service publishes/subscribes events | Centralized - orchestrator controls the flow |
| Coupling | Low (only event schema shared) | Medium (orchestrator must know all services) |
| Visibility | Low (hard to trace flow) | High (state viewable in orchestrator) |
| Complexity mgmt | Rapidly complex as services increase | Increases linearly |
| SPOF | None | Orchestrator can be SPOF |
| Compensation | Distributed across services | Centralized in orchestrator |
| Testing | Integration testing difficult | Orchestrator unit testing easy |
| Suitable scale | Simple workflows with 2-4 services | Complex workflows with 5+ services |
| Representative tools | Kafka, RabbitMQ, SNS/SQS | Temporal, Camunda, AWS Step Functions |
Choreography Pattern Code Example
```python
# Python - Choreography-based Saga (event subscription approach)
class EventBus:
    """Simple in-memory event bus (use Kafka/RabbitMQ in production)."""

    def __init__(self):
        self._handlers: dict[str, list] = {}

    def subscribe(self, event_type: str, handler):
        if event_type not in self._handlers:
            self._handlers[event_type] = []
        self._handlers[event_type].append(handler)

    async def publish(self, event_type: str, payload: dict):
        handlers = self._handlers.get(event_type, [])
        for handler in handlers:
            await handler(payload)


# Payment Service - Choreography approach
class PaymentService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        # Subscribe to order creation event
        event_bus.subscribe("OrderCreated", self.on_order_created)
        # Subscribe to inventory failure event (for compensation)
        event_bus.subscribe("InventoryReservationFailed", self.on_inventory_failed)

    async def on_order_created(self, payload: dict):
        """Process payment on order creation."""
        try:
            order_id = payload["order_id"]
            amount = payload["total_amount"]
            # Payment processing logic
            payment_result = await self._process_payment(order_id, amount)
            # Publish success event
            await self.event_bus.publish("PaymentProcessed", {
                "order_id": order_id,
                "payment_id": payment_result["payment_id"],
                "amount": amount,
            })
        except Exception as e:
            # Publish failure event
            await self.event_bus.publish("PaymentFailed", {
                "order_id": payload["order_id"],
                "reason": str(e),
            })

    async def on_inventory_failed(self, payload: dict):
        """Refund payment on inventory failure (compensating transaction)."""
        order_id = payload["order_id"]
        await self._refund_payment(order_id)
        await self.event_bus.publish("PaymentRefunded", {"order_id": order_id})

    async def _process_payment(self, order_id: str, amount: int) -> dict:
        # Actual payment processing logic goes here
        return {"payment_id": f"pay-{order_id}"}

    async def _refund_payment(self, order_id: str) -> None:
        # Actual refund processing logic goes here
        pass


# Inventory Service - Choreography approach
class InventoryService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        # Subscribe to payment completion event
        event_bus.subscribe("PaymentProcessed", self.on_payment_processed)

    async def on_payment_processed(self, payload: dict):
        """Reserve inventory on payment completion."""
        try:
            order_id = payload["order_id"]
            # Check and reserve inventory
            await self._reserve_inventory(order_id)
            await self.event_bus.publish("InventoryReserved", {"order_id": order_id})
        except InsufficientStockError:
            await self.event_bus.publish("InventoryReservationFailed", {
                "order_id": payload["order_id"],
                "reason": "insufficient_stock",
            })

    async def _reserve_inventory(self, order_id: str) -> None:
        # Actual inventory check/reservation logic goes here
        pass


class InsufficientStockError(Exception):
    pass
```
Hybrid Approach: When to Choose Which
In practice, rather than using pure Choreography or Orchestration alone, mixing them based on workflow complexity is effective.
- Choose Choreography: Simple notifications, cache invalidation, log collection with 2-3 participating services
- Choose Orchestration: Core business flows like payment-inventory-shipping where order and compensation matter
- Hybrid: Core business flows via Orchestration, side effects (email, notifications, analytics events) via Choreography
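The hybrid split can be sketched in a few lines: the core steps run in a fixed order (orchestration), then publish a single completion event that side-effect consumers subscribe to (choreography). The names (runOrderSaga, OrderCompleted) are illustrative, and the bus is an in-memory stand-in for a real broker.

```typescript
type Handler = (payload: Record<string, unknown>) => void

// In-memory stand-in for Kafka/RabbitMQ/SNS.
class SimpleEventBus {
  private handlers: Record<string, Handler[]> = {}

  subscribe(type: string, h: Handler): void {
    if (!this.handlers[type]) this.handlers[type] = []
    this.handlers[type].push(h)
  }

  publish(type: string, payload: Record<string, unknown>): void {
    for (const h of this.handlers[type] ?? []) h(payload)
  }
}

const bus = new SimpleEventBus()
const sideEffects: string[] = []

// Choreography side: email and analytics react independently; the core
// flow neither knows nor waits for them.
bus.subscribe('OrderCompleted', () => sideEffects.push('email'))
bus.subscribe('OrderCompleted', () => sideEffects.push('analytics'))

// Orchestration side: core steps in an explicit, compensatable order,
// followed by exactly one published event.
function runOrderSaga(orderId: string): string[] {
  const steps = ['ProcessPayment', 'ReserveInventory', 'CreateShipment']
  const completed: string[] = []
  for (const step of steps) completed.push(step) // each step would call a service here
  bus.publish('OrderCompleted', { orderId })
  return completed
}

const completed = runOrderSaga('order-1')
```

Adding a new side effect is then a new subscription, not a change to the orchestrator, which keeps the core workflow stable as peripheral concerns grow.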
Event Store Selection (EventStoreDB, Kafka, DynamoDB)
Event store selection varies based on system requirements and team capabilities. Here is a comparison of the three main options.
| Comparison | EventStoreDB | Apache Kafka | DynamoDB Streams |
|---|---|---|---|
| Design purpose | Dedicated Event Sourcing DB | Distributed message streaming platform | General NoSQL + change streams |
| Stream model | Fine-grained individual streams (per Aggregate) | Topic-partition based | Table + DynamoDB Streams/Kinesis |
| Concurrency | ExpectedVersion (optimistic locking) built-in | Partition-level ordering only | Conditional writes (ConditionExpression) |
| ID lookup | Instant lookup by stream ID | Cannot look up specific entity in topic | Direct lookup by partition key |
| Event ordering | Fully guaranteed within stream | Guaranteed only within partition | Guaranteed within partition key |
| Projections | Server-side projections built-in | Kafka Streams / ksqlDB | Lambda + DynamoDB Streams |
| Subscription | Persistent / Catch-up subscriptions | Consumer Group | DynamoDB Streams / Kinesis |
| Ops complexity | Medium (dedicated cluster) | High (ZooKeeper/KRaft, partition mgmt) | Low (serverless, AWS managed) |
| Cost model | Open source + commercial cloud | Open source + MSK/Confluent Cloud | Pay-per-request/storage |
| Best fit | Pure Event Sourcing systems | High-volume event streaming + integration | AWS native, serverless architecture |
Selection Guide
When EventStoreDB is suitable:
- Event Sourcing is the core architectural pattern
- Fine-grained stream management and projections are needed
- Team actively uses DDD-based design
When Kafka is suitable:
- High-volume event streaming is the main purpose
- Kafka infrastructure already exists and team expertise is sufficient
- Both Event Sourcing and event streaming are needed (consider EventStoreDB + Kafka combination)
When DynamoDB is suitable:
- Primarily using the AWS ecosystem
- Targeting serverless architecture
- Minimizing operational burden
EventStoreDB Configuration Example
```yaml
# docker-compose.yml - EventStoreDB cluster setup
version: '3.8'

services:
  eventstoredb:
    image: eventstore/eventstore:24.2
    container_name: eventstoredb
    environment:
      - EVENTSTORE_CLUSTER_SIZE=1
      - EVENTSTORE_RUN_PROJECTIONS=All
      - EVENTSTORE_START_STANDARD_PROJECTIONS=true
      - EVENTSTORE_INSECURE=true
      - EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=true
      - EVENTSTORE_MEM_DB=false
      - EVENTSTORE_DB=/var/lib/eventstore-data
      - EVENTSTORE_INDEX=/var/lib/eventstore-index
      - EVENTSTORE_LOG=/var/log/eventstore
    ports:
      - '2113:2113' # HTTP/gRPC
      - '1113:1113' # TCP (legacy)
    volumes:
      - eventstore-data:/var/lib/eventstore-data
      - eventstore-index:/var/lib/eventstore-index
      - eventstore-logs:/var/log/eventstore

  # Kafka - for event streaming (combined with EventStoreDB)
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    container_name: kafka
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      KAFKA_NUM_PARTITIONS: 6
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    ports:
      - '9092:9092'
    volumes:
      - kafka-data:/var/lib/kafka/data

volumes:
  eventstore-data:
  eventstore-index:
  eventstore-logs:
  kafka-data:
```
Production Implementation: Order System Example
Let us examine a production implementation that integrates CQRS, Event Sourcing, and Saga patterns through an e-commerce order system.
System Architecture Overview
The overall system consists of the following structure:
- Order Service: Order Aggregate (Event Sourcing), CQRS applied
- Payment Service: Payment processing, refund compensating transaction
- Inventory Service: Inventory reservation, release compensating transaction
- Shipping Service: Shipment creation, cancellation compensating transaction
- Saga Orchestrator: Temporal-based workflow management
Temporal-based Saga Workflow
// TypeScript - Order Saga implemented with Temporal Workflow
import { proxyActivities, defineSignal, setHandler, condition, sleep } from '@temporalio/workflow'
// Activity proxies
const payment = proxyActivities<PaymentActivities>({
startToCloseTimeout: '30s',
retry: {
maximumAttempts: 3,
initialInterval: '1s',
backoffCoefficient: 2,
maximumInterval: '30s',
},
})
const inventory = proxyActivities<InventoryActivities>({
startToCloseTimeout: '10s',
retry: { maximumAttempts: 3 },
})
const shipping = proxyActivities<ShippingActivities>({
startToCloseTimeout: '15s',
retry: { maximumAttempts: 3 },
})
const notification = proxyActivities<NotificationActivities>({
startToCloseTimeout: '5s',
retry: { maximumAttempts: 5 },
})
// Signal definition (send messages to workflow from outside)
const cancelOrderSignal = defineSignal<[string]>('cancelOrder')
// Order Processing Saga Workflow
export async function orderSagaWorkflow(input: OrderSagaInput): Promise<OrderSagaResult> {
let isCancelled = false
let cancelReason = ''
// Cancel signal handler
setHandler(cancelOrderSignal, (reason: string) => {
isCancelled = true
cancelReason = reason
})
const compensations: Array<() => Promise<void>> = []
try {
// Step 1: Process payment
if (isCancelled) throw new SagaCancelledError(cancelReason)
const paymentResult = await payment.processPayment({
orderId: input.orderId,
amount: input.totalAmount,
customerId: input.customerId,
})
compensations.push(async () => {
await payment.refundPayment({
orderId: input.orderId,
paymentId: paymentResult.paymentId,
amount: input.totalAmount,
})
})
// Step 2: Reserve inventory
if (isCancelled) throw new SagaCancelledError(cancelReason)
await inventory.reserveInventory({
orderId: input.orderId,
items: input.items,
})
compensations.push(async () => {
await inventory.releaseReservation({
orderId: input.orderId,
items: input.items,
})
})
// Step 3: Create shipment
if (isCancelled) throw new SagaCancelledError(cancelReason)
const shipmentResult = await shipping.createShipment({
orderId: input.orderId,
address: input.shippingAddress,
items: input.items,
})
compensations.push(async () => {
await shipping.cancelShipment({
orderId: input.orderId,
shipmentId: shipmentResult.shipmentId,
})
})
// All steps succeeded - send confirmation notification (Saga unaffected by failure)
await notification
.sendOrderConfirmation({
orderId: input.orderId,
customerId: input.customerId,
})
.catch(() => {
/* Notification failure is ignored */
})
return {
success: true,
orderId: input.orderId,
paymentId: paymentResult.paymentId,
shipmentId: shipmentResult.shipmentId,
}
} catch (error) {
// Execute compensating transactions (reverse order)
for (const compensate of compensations.reverse()) {
try {
await compensate()
} catch (compError) {
// Temporal automatically manages retries
// Final failure sends to Dead Letter Queue
console.error('Compensation failed:', compError)
}
}
return {
success: false,
orderId: input.orderId,
error: (error as Error).message,
}
}
}
interface OrderSagaInput {
orderId: string
customerId: string
totalAmount: number
shippingAddress: string
items: Array<{
productId: string
quantity: number
price: number
}>
}
interface OrderSagaResult {
success: boolean
orderId: string
paymentId?: string
shipmentId?: string
error?: string
}
class SagaCancelledError extends Error {
constructor(reason: string) {
super(`Saga cancelled: ${reason}`)
this.name = 'SagaCancelledError'
}
}
DynamoDB-based Event Store Schema
# AWS CloudFormation - DynamoDB Event Store Table
AWSTemplateFormatVersion: '2010-09-09'
Description: Event Store on DynamoDB
Resources:
EventStoreTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: event-store
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: aggregateId
AttributeType: S
- AttributeName: version
AttributeType: N
- AttributeName: eventType
AttributeType: S
- AttributeName: timestamp
AttributeType: S
KeySchema:
- AttributeName: aggregateId
KeyType: HASH
- AttributeName: version
KeyType: RANGE
GlobalSecondaryIndexes:
- IndexName: eventType-timestamp-index
KeySchema:
- AttributeName: eventType
KeyType: HASH
- AttributeName: timestamp
KeyType: RANGE
Projection:
ProjectionType: ALL
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES
PointInTimeRecoverySpecification:
PointInTimeRecoveryEnabled: true
Tags:
- Key: Environment
Value: production
- Key: Service
Value: event-store
SnapshotTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: event-store-snapshots
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: aggregateId
AttributeType: S
- AttributeName: version
AttributeType: N
KeySchema:
- AttributeName: aggregateId
KeyType: HASH
- AttributeName: version
KeyType: RANGE
Tags:
- Key: Environment
Value: production
SagaStateTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: saga-state
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: sagaId
AttributeType: S
- AttributeName: createdAt
AttributeType: S
KeySchema:
- AttributeName: sagaId
KeyType: HASH
GlobalSecondaryIndexes:
- IndexName: createdAt-index
KeySchema:
- AttributeName: createdAt
KeyType: HASH
Projection:
ProjectionType: ALL
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
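Appending to the event-store table above can enforce optimistic concurrency with a conditional write on the composite (aggregateId, version) key. A minimal sketch of the PutItem parameters (the helper name and item shape are illustrative; the actual write would go through the AWS SDK, e.g. a `PutItemCommand`):

```typescript
// Hypothetical helper: builds the PutItem parameters for appending one event.
// The condition expression makes DynamoDB reject the write when an item with
// the same (aggregateId, version) key already exists, so two writers racing
// on the same aggregate version cannot both succeed.
interface StoredEvent {
  aggregateId: string
  version: number
  eventType: string
  timestamp: string
  payload: unknown
}

function buildAppendParams(event: StoredEvent) {
  return {
    TableName: 'event-store',
    Item: {
      aggregateId: event.aggregateId,
      version: event.version,
      eventType: event.eventType,
      timestamp: event.timestamp,
      payload: JSON.stringify(event.payload),
    },
    // Fails with ConditionalCheckFailedException if this aggregate version
    // was already appended by a concurrent writer.
    ConditionExpression: 'attribute_not_exists(aggregateId)',
  }
}
```

On a conditional check failure, the writer reloads the aggregate, re-applies the command, and retries with the next version number.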
Operational Considerations and Troubleshooting
1. Ensuring Idempotency
In distributed environments, message delivery is typically at-least-once, so the same event may be delivered and processed more than once. All event handlers must therefore guarantee idempotency.
// TypeScript - Idempotency guarantee pattern
class IdempotentEventHandler {
constructor(
private processedEvents: ProcessedEventStore,
private handler: EventHandler
) {}
async handle(event: DomainEvent): Promise<void> {
// Check if event was already processed
const isProcessed = await this.processedEvents.exists(event.eventId)
if (isProcessed) {
console.log(`Event ${event.eventId} already processed, skipping`)
return
}
try {
// Process event
await this.handler.handle(event)
// Record completion (auto-cleanup with TTL)
await this.processedEvents.markAsProcessed(event.eventId, {
processedAt: new Date(),
ttl: 60 * 60 * 24 * 7, // Expire after 7 days
})
} catch (error) {
// Not recorded on failure so retry is possible
throw error
}
}
}
// Redis-based duplicate detection
// Note: the exists() check and the later setex() are not atomic, so two
// concurrent consumers can both pass the check for the same event. For strict
// deduplication, claim the marker first with SET NX and process only on success.
class RedisProcessedEventStore implements ProcessedEventStore {
constructor(private redis: Redis) {}
async exists(eventId: string): Promise<boolean> {
const result = await this.redis.exists(`processed:${eventId}`)
return result === 1
}
async markAsProcessed(
eventId: string,
options: { processedAt: Date; ttl: number }
): Promise<void> {
await this.redis.setex(`processed:${eventId}`, options.ttl, options.processedAt.toISOString())
}
}
2. Ensuring Event Ordering
To guarantee event ordering in Kafka, events from the same Aggregate must be routed to the same partition. Using the Aggregate ID as the partition key is standard practice.
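The routing described above is just deterministic key-based partitioning. A simplified sketch of what the client-side partitioner does (real Kafka clients hash the message key with murmur2; a plain FNV-1a hash stands in here to show the idea):

```typescript
// Simplified illustration of key-based partitioning. Kafka's default
// partitioner applies a murmur2 hash to the key; FNV-1a is used here only
// to keep the example self-contained.
function partitionFor(aggregateId: string, numPartitions: number): number {
  let hash = 0x811c9dc5
  for (let i = 0; i < aggregateId.length; i++) {
    hash ^= aggregateId.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193)
  }
  // Same aggregate ID always maps to the same partition, so its events
  // are consumed in the order they were appended.
  return (hash >>> 0) % numPartitions
}
```

In practice you do not compute this yourself: producing with `key = aggregateId` lets the client's partitioner do the equivalent work.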
3. Schema Evolution
Since events are immutable, schema changes must maintain backward compatibility. Use a schema registry like Avro or Protobuf, or apply the upcasting pattern.
// TypeScript - Event Upcaster (Schema Evolution)
class EventUpcaster {
private upcasters: Map<string, Map<number, (event: DomainEvent) => DomainEvent>> = new Map()
register(
eventType: string,
fromVersion: number,
upcaster: (event: DomainEvent) => DomainEvent
): void {
if (!this.upcasters.has(eventType)) {
this.upcasters.set(eventType, new Map())
}
this.upcasters.get(eventType)!.set(fromVersion, upcaster)
}
upcast(event: DomainEvent): DomainEvent {
const typeUpcasters = this.upcasters.get(event.eventType)
if (!typeUpcasters) return event
let currentEvent = event
let schemaVersion = (event.metadata as any).schemaVersion || 1
while (typeUpcasters.has(schemaVersion)) {
const upcasterFn = typeUpcasters.get(schemaVersion)!
currentEvent = upcasterFn(currentEvent)
schemaVersion++
}
return currentEvent
}
}
// Usage example: OrderPlaced event v1 -> v2 upcasting
const upcaster = new EventUpcaster()
upcaster.register('OrderPlaced', 1, (event) => {
// v1 had shippingAddress as a string
// v2 changed to a structured object
const payload = event.payload as any
return {
...event,
payload: {
...payload,
shippingAddress: {
full: payload.shippingAddress,
city: '',
zipCode: '',
},
},
metadata: {
...event.metadata,
schemaVersion: 2,
},
}
})
4. Key Monitoring Metrics
The following metrics should be monitored to check the health of an EDA system.
| Metric | Description | Alert Threshold |
|---|---|---|
| Consumer Lag | Consumer processing delay (unprocessed events) | Over 1,000 events |
| Event Processing Latency | Time from event publication to processing | P99 over 5 seconds |
| Saga Completion Rate | Saga success rate | Below 99% |
| Compensation Failure Rate | Compensating transaction failure rate | Over 0.1% |
| Projection Lag | Sync delay between read and write models | Over 30 seconds |
| Dead Letter Queue Size | Number of unprocessable events | Immediate alert when > 0 |
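The thresholds in the table can be encoded as simple alert rules. A hypothetical sketch (metric names and units are illustrative and not tied to any particular monitoring stack):

```typescript
// Illustrative metric snapshot; field names are assumptions for this sketch.
interface MetricSnapshot {
  consumerLag: number // unprocessed events
  eventLatencyP99Ms: number // publish-to-process latency, P99
  sagaCompletionRate: number // 0..1
  compensationFailureRate: number // 0..1
  projectionLagSec: number // read/write model sync delay
  dlqSize: number // events in the Dead Letter Queue
}

// Applies the alert thresholds from the table above.
function evaluateAlerts(m: MetricSnapshot): string[] {
  const alerts: string[] = []
  if (m.consumerLag > 1000) alerts.push('consumer-lag')
  if (m.eventLatencyP99Ms > 5000) alerts.push('event-latency-p99')
  if (m.sagaCompletionRate < 0.99) alerts.push('saga-completion-rate')
  if (m.compensationFailureRate > 0.001) alerts.push('compensation-failure-rate')
  if (m.projectionLagSec > 30) alerts.push('projection-lag')
  if (m.dlqSize > 0) alerts.push('dead-letter-queue') // immediate alert
  return alerts
}
```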
Failure Cases and Recovery Procedures
Case 1: Compensating Transaction Failure (Most Dangerous Scenario)
Problem: Payment was completed but inventory reservation failed. The compensation (refund) was attempted, but the refund also failed due to a payment gateway timeout.
Recovery procedure:
- Mark Saga state as "REQUIRES_INTERVENTION"
- Register failed compensation transaction in Dead Letter Queue
- Automatic retry (exponential backoff): 1s, 2s, 4s, 8s, 16s
- Alert ops team via PagerDuty/Slack when max retries exceeded
- Operator manually processes refund via payment gateway console
- Update Saga state to "MANUALLY_COMPENSATED"
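The retry-then-escalate steps above can be sketched as a small helper (the function name and the injectable sleep are illustrative, not Temporal's API; Temporal would express the same policy declaratively via activity retry options):

```typescript
// Hypothetical retry helper for a compensating transaction: exponential
// backoff (1s, 2s, 4s, 8s, 16s), then escalate to manual intervention.
async function retryCompensation(
  compensate: () => Promise<void>,
  maxAttempts = 5,
  sleep: (ms: number) => Promise<void> = (ms) => new Promise((r) => setTimeout(r, ms))
): Promise<'COMPENSATED' | 'REQUIRES_INTERVENTION'> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      await compensate()
      return 'COMPENSATED'
    } catch {
      await sleep(1000 * 2 ** attempt) // 1s, 2s, 4s, 8s, 16s
    }
  }
  // Caller marks the saga REQUIRES_INTERVENTION, registers the failed
  // compensation in the DLQ, and pages the ops team.
  return 'REQUIRES_INTERVENTION'
}
```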
Case 2: Out-of-Order Event Delivery
Problem: Due to network delay, an OrderCancelled event arrives before OrderConfirmed
Prevention and recovery:
- Include version field in events for order verification
- Buffer and re-sort when version does not match expectations
- EventStoreDB guarantees ordering within a stream, so consumers reading a single stream avoid this issue
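The buffer-and-re-sort step can be sketched as a per-aggregate re-sequencer that releases events only in contiguous version order (a hypothetical helper, assuming versions are 1-based and contiguous per aggregate):

```typescript
interface VersionedEvent {
  aggregateId: string
  version: number // 1-based, contiguous per aggregate
}

// Buffers out-of-order events and releases them only when every earlier
// version for the same aggregate has already been delivered.
class EventResequencer<E extends VersionedEvent> {
  private expected = new Map<string, number>() // next deliverable version
  private pending = new Map<string, Map<number, E>>() // buffered events

  // Returns the events that can now be delivered in order (possibly empty).
  accept(event: E): E[] {
    const id = event.aggregateId
    const buf = this.pending.get(id) ?? new Map<number, E>()
    this.pending.set(id, buf)
    buf.set(event.version, event)
    const ready: E[] = []
    let v = this.expected.get(id) ?? 1
    while (buf.has(v)) {
      ready.push(buf.get(v)!)
      buf.delete(v)
      v++
    }
    this.expected.set(id, v)
    return ready
  }
}
```

A production version would also evict stale buffers (e.g. with a timeout) so a permanently missing version cannot block the aggregate forever.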
Case 3: Read Model Inconsistency Due to Projection Failure
Problem: Projection process crashes and the read model does not reflect the latest state
Recovery procedure:
- Check the projection process's last checkpoint
- Re-run projection from events after the checkpoint
- For severe inconsistency, drop the read model and replay all events
- During replay, redirect read traffic to cache or write DB
# Python - Projection recovery script
import asyncio
from datetime import datetime
class ProjectionRecovery:
def __init__(self, event_store, read_db, projection):
self.event_store = event_store
self.read_db = read_db
self.projection = projection
async def recover_from_checkpoint(self) -> dict:
"""Checkpoint-based projection recovery"""
# 1. Check last checkpoint
checkpoint = await self.read_db.get_checkpoint(
self.projection.name
)
last_position = checkpoint.get("position", 0) if checkpoint else 0
print(
f"Recovering projection '{self.projection.name}' "
f"from position {last_position}"
)
# 2. Load events after checkpoint
events = await self.event_store.read_all_from(last_position)
processed = 0
errors = 0
for event in events:
try:
await self.projection.handle(event)
processed += 1
# Update checkpoint every 100 events
if processed % 100 == 0:
await self.read_db.save_checkpoint(
self.projection.name,
{"position": event["global_position"]},
)
except Exception as e:
errors += 1
print(
f"Error processing event "
f"{event['event_id']}: {e}"
)
# Continue after logging error (skip & log)
continue
# 3. Save final checkpoint
if events:
await self.read_db.save_checkpoint(
self.projection.name,
{"position": events[-1]["global_position"]},
)
return {
"projection": self.projection.name,
"processed": processed,
"errors": errors,
"recovered_at": datetime.utcnow().isoformat(),
}
async def full_rebuild(self) -> dict:
"""Full rebuild of read model"""
print(
f"Full rebuild of projection '{self.projection.name}'"
)
# 1. Delete existing read model
await self.read_db.drop_projection_data(self.projection.name)
# 2. Reset checkpoint
await self.read_db.save_checkpoint(
self.projection.name, {"position": 0}
)
# 3. Replay all events
return await self.recover_from_checkpoint()
Case 4: Event Store Disk Shortage
Problem: Events accumulate continuously and disk capacity becomes insufficient
Prevention strategy:
- After creating snapshots, move old events to archive (S3/Glacier)
- Establish event retention policy: hot data (30 days), warm data (1 year), cold data (permanent archive)
- Set disk usage alerts at 80% warning, 90% critical
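The tiering policy above can be expressed as a simple classification rule (a hypothetical helper; the day boundaries mirror the retention policy stated in the list):

```typescript
// Maps an event's age to the storage tier from the retention policy:
// hot (primary event store), warm (e.g. S3), cold (e.g. Glacier).
type Tier = 'hot' | 'warm' | 'cold'

function tierFor(eventAgeDays: number): Tier {
  if (eventAgeDays <= 30) return 'hot' // last 30 days stay in the event store
  if (eventAgeDays <= 365) return 'warm' // up to 1 year in warm storage
  return 'cold' // older events go to permanent archive
}
```

An archival job would run after snapshot creation, moving events whose tier has changed and verifying the copy before deleting from the hot store.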
Conclusion
The three core patterns of Event-Driven Architecture -- CQRS, Event Sourcing, and Saga -- are each powerful independently but fundamentally solve data consistency problems in microservice architecture when used together.
Key takeaways:
- CQRS: Acknowledge and separate the asymmetric requirements of reads and writes. You can independently optimize read traffic that comprises 80-90% of total traffic.
- Event Sourcing: Store change history instead of current state. This enables complete audit trails, time-travel debugging, and diverse read model generation. However, snapshot strategies and schema evolution strategies must be considered from the start.
- Saga Pattern: Manage distributed transactions with compensation-based approaches. Implement simple flows with Choreography and complex business logic with Orchestration, but always prepare recovery strategies for compensating transaction failures.
- Event Store Selection: Consider EventStoreDB for pure Event Sourcing, Kafka for high-volume streaming, and DynamoDB for AWS serverless, with combinations possible based on requirements.
These patterns are powerful but come with the cost of complexity. Rather than applying them uniformly to all services, the realistic strategy is to introduce them incrementally starting with core domains where business complexity is high and data consistency is critical. Adopting these patterns without operational monitoring, idempotency guarantees, and schema evolution strategies can actually harm system stability.
References
- Microsoft Azure - CQRS Pattern - Official reference architecture documentation for the CQRS pattern
- Microservices.io - Event Sourcing Pattern - Chris Richardson's Event Sourcing pattern explanation
- Microservices.io - Saga Pattern - Detailed guide for the Saga pattern for distributed transaction management
- EventStoreDB Documentation - EventStoreDB official documentation and getting started guide
- Temporal.io - Mastering Saga Patterns - Temporal-based Saga pattern implementation master guide
- ByteByteGo - Saga Pattern Demystified - Orchestration vs Choreography comparison analysis
- Microsoft - Exploring CQRS and Event Sourcing - Microsoft patterns and practices CQRS Journey guidebook
- Domain Centric - EventStoreDB vs Kafka - Detailed comparison of EventStoreDB and Kafka
- AWS - Event Sourcing on AWS - AWS-based Event Sourcing architecture patterns
- Azure Architecture Center - Saga Design Pattern - Microsoft's official Saga design pattern documentation