Event Sourcing + CQRS Architecture Implementation Guide: From Event Store to Projections and Saga Patterns
- Introduction
- 1. Event Sourcing Core Principles
- 2. CQRS Architecture Pattern
- 3. Event Store Design and Implementation
- 4. Aggregate and Domain Event Implementation
- 5. Projections and Read Models
- 6. Saga Pattern and Distributed Transactions
- 7. Snapshots and Event Compaction
- 8. Event Sourcing vs Traditional CRUD
- 9. Failure Scenarios and Recovery Procedures
- 10. Operational Checklist
- Conclusion

Introduction
Traditional CRUD systems store only the current state of data. When an UPDATE executes, the previous value is lost. When a DELETE executes, the data itself is destroyed. You can see that a bank account balance is 1,000 dollars, but you cannot tell what sequence of deposits and withdrawals led to that balance.
Event Sourcing solves this problem at its root. It records state changes themselves as events, and the current state is reconstructed by replaying those events sequentially. CQRS builds on this structure by fully separating write (Command) and read (Query) concerns, optimizing each for its specific requirements.
This guide walks through the complete implementation of an architecture combining Event Sourcing and CQRS. We cover event store design, Aggregate implementation, projection construction, distributed transaction management with the Saga pattern, snapshot optimization, and real-world failure scenarios with recovery procedures.
1. Event Sourcing Core Principles
1.1 What Is an Event?
In Event Sourcing, an event is an immutable fact that occurred in the past. Events are named in the past tense and must clearly convey business intent.
Compare poorly designed events with well-designed ones:
- Bad examples: NameChanged, EmailChanged -- property-level events that carry no business meaning
- Good examples: CustomerRelocated, OrderConfirmed -- events that express domain behavior
1.2 How State Reconstruction Works
The current state is computed by sequentially applying (replaying) all events to an initial state.
Current State = fold(Initial State, [Event1, Event2, ..., EventN])
For example, reconstructing a bank account state looks like this:
Initial balance: $0
-> AccountOpened(amount: $0) => Balance: $0
-> MoneyDeposited(amount: $1,000) => Balance: $1,000
-> MoneyWithdrawn(amount: $300) => Balance: $700
-> MoneyDeposited(amount: $500) => Balance: $1,200
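The replay above can be sketched directly as a left fold in TypeScript (the event and function names here are illustrative, not part of a specific library):

```typescript
// State reconstruction as a left fold over events.
type AccountEvent =
  | { type: 'AccountOpened'; amount: number }
  | { type: 'MoneyDeposited'; amount: number }
  | { type: 'MoneyWithdrawn'; amount: number }

// Applies a single event to the current balance.
function evolve(balance: number, event: AccountEvent): number {
  switch (event.type) {
    case 'AccountOpened':
      return event.amount
    case 'MoneyDeposited':
      return balance + event.amount
    case 'MoneyWithdrawn':
      return balance - event.amount
  }
}

// Current State = fold(Initial State, events)
const events: AccountEvent[] = [
  { type: 'AccountOpened', amount: 0 },
  { type: 'MoneyDeposited', amount: 1000 },
  { type: 'MoneyWithdrawn', amount: 300 },
  { type: 'MoneyDeposited', amount: 500 },
]
const balance = events.reduce(evolve, 0) // 1200
```

Note that `evolve` contains no validation -- replay applies facts that already happened; business rules are enforced only when new events are produced.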
1.3 The Immutability Principle
Once stored, events can never be modified or deleted. If an incorrect event was published, a compensating event must be issued to logically reverse it.
// When an incorrect withdrawal occurred - do not delete the event
// Instead, publish a compensating event
interface MoneyWithdrawnCompensated {
type: 'MoneyWithdrawnCompensated'
originalEventId: string
amount: number
reason: string
timestamp: Date
}
2. CQRS Architecture Pattern
2.1 Separating Commands and Queries
The core idea of CQRS is straightforward: separate the model that modifies data from the model that reads data.
[Client]
|
|-- Command(write) --> [Command Handler] --> [Write Model / Event Store]
| |
| Events published
| |
| v
| [Projection Engine]
| |
| v
+-- Query(read) ----> [Query Handler] ----> [Read Model / View DB]
2.2 Why Separate?
Read and write requirements are fundamentally different:
- Write (Command): Requires business rule validation, domain invariant enforcement, and transactional consistency
- Read (Query): Requires diverse view support, low latency, and high throughput
Trying to satisfy both sides with a single model forces compromises on both. CQRS enables independent optimization of each model.
2.3 Command Handler Implementation
// Command definition
interface CreateOrderCommand {
orderId: string
customerId: string
items: Array<{ productId: string; quantity: number; price: number }>
}
// Command Handler
class OrderCommandHandler {
constructor(
private readonly eventStore: EventStore,
private readonly orderRepository: EventSourcedRepository<Order>
) {}
async handle(command: CreateOrderCommand): Promise<void> {
// 1. Execute business logic: the static factory validates rules and emits OrderCreated
// (commands against an existing Aggregate would load it first to replay its events)
const order = Order.create(command.orderId, command.customerId, command.items)
// 2. Persist the uncommitted events
await this.orderRepository.save(order, command.orderId)
}
}
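For symmetry, here is a minimal sketch of the read side. The `OrderSummary` shape, the query type, and the in-memory map standing in for a Read Model table are all illustrative assumptions -- the point is that a Query Handler contains no domain logic and simply serves a denormalized view:

```typescript
// Hypothetical read-model row and query; names are illustrative.
interface OrderSummary {
  orderId: string
  status: string
  totalAmount: number
}

interface GetOrderSummaryQuery {
  orderId: string
}

// The read side queries a denormalized view directly -- no Aggregates,
// no business rules, no event replay.
class OrderQueryHandler {
  constructor(private readonly readDb: Map<string, OrderSummary>) {}

  async handle(query: GetOrderSummaryQuery): Promise<OrderSummary | null> {
    return this.readDb.get(query.orderId) ?? null
  }
}
```

In production the `Map` would be a view table (or document store) kept up to date by the projections described in section 5.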
3. Event Store Design and Implementation
3.1 Event Store Schema
The core table structure for an event store is as follows:
CREATE TABLE events (
-- Global unique identifier
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-- Stream (Aggregate) identifier
stream_id VARCHAR(255) NOT NULL,
-- Event sequence within the stream (used for optimistic concurrency)
stream_version BIGINT NOT NULL,
-- Event type (used for deserialization)
event_type VARCHAR(255) NOT NULL,
-- Event payload (JSON)
event_data JSONB NOT NULL,
-- Metadata (correlation ID, user info, etc.)
metadata JSONB DEFAULT '{}',
-- Event timestamp
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
-- Global ordering (used by projections to track overall sequence)
global_position BIGSERIAL NOT NULL,
-- Prevent version duplication within the same stream (optimistic concurrency)
UNIQUE(stream_id, stream_version)
);
-- Index for stream-level event retrieval
CREATE INDEX idx_events_stream ON events(stream_id, stream_version);
-- Index for global event ordering (projections)
CREATE INDEX idx_events_global ON events(global_position);
-- Index for event type lookup
CREATE INDEX idx_events_type ON events(event_type);
3.2 TypeScript Event Store Implementation
import { Pool } from 'pg'
interface DomainEvent {
eventType: string
data: Record<string, unknown>
metadata?: Record<string, unknown>
}
interface StoredEvent extends DomainEvent {
eventId: string
streamId: string
streamVersion: number
globalPosition: number
createdAt: Date
}
class ConcurrencyError extends Error {}
class PostgresEventStore {
constructor(private readonly pool: Pool) {}
/**
* Appends events to a stream.
* Uses expectedVersion for optimistic concurrency control.
*/
async appendToStream(
streamId: string,
expectedVersion: number,
events: DomainEvent[]
): Promise<StoredEvent[]> {
const client = await this.pool.connect()
try {
await client.query('BEGIN')
// Optimistic concurrency check
const versionCheck = await client.query(
'SELECT MAX(stream_version) as current_version FROM events WHERE stream_id = $1',
[streamId]
)
// node-postgres returns BIGINT columns as strings, so normalize before comparing
const rawVersion = versionCheck.rows[0].current_version
const currentVersion = rawVersion === null ? -1 : Number(rawVersion)
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, but current version is ${currentVersion}`
)
}
const storedEvents: StoredEvent[] = []
for (let i = 0; i < events.length; i++) {
const event = events[i]
const version = expectedVersion + i + 1
const result = await client.query(
`INSERT INTO events (stream_id, stream_version, event_type, event_data, metadata)
VALUES ($1, $2, $3, $4, $5)
RETURNING event_id, global_position, created_at`,
[
streamId,
version,
event.eventType,
JSON.stringify(event.data),
JSON.stringify(event.metadata ?? {}),
]
)
storedEvents.push({
eventId: result.rows[0].event_id,
streamId,
streamVersion: version,
globalPosition: Number(result.rows[0].global_position),
createdAt: result.rows[0].created_at,
...event,
})
}
await client.query('COMMIT')
return storedEvents
} catch (error) {
await client.query('ROLLBACK')
throw error
} finally {
client.release()
}
}
/**
* Reads all events from a stream in order.
*/
async readStream(streamId: string, fromVersion = 0): Promise<StoredEvent[]> {
const result = await this.pool.query(
`SELECT * FROM events
WHERE stream_id = $1 AND stream_version >= $2
ORDER BY stream_version ASC`,
[streamId, fromVersion]
)
return result.rows.map(this.mapToStoredEvent)
}
/**
* Reads all events in global order (for projections).
*/
async readAll(fromPosition = 0, limit = 1000): Promise<StoredEvent[]> {
const result = await this.pool.query(
`SELECT * FROM events
WHERE global_position > $1
ORDER BY global_position ASC
LIMIT $2`,
[fromPosition, limit]
)
return result.rows.map(this.mapToStoredEvent)
}
private mapToStoredEvent(row: any): StoredEvent {
return {
eventId: row.event_id,
streamId: row.stream_id,
// BIGINT/BIGSERIAL columns arrive from pg as strings, so convert explicitly
streamVersion: Number(row.stream_version),
globalPosition: Number(row.global_position),
eventType: row.event_type,
data: row.event_data,
metadata: row.metadata,
createdAt: row.created_at,
}
}
}
3.3 Dedicated Event Store Solutions Comparison
| Aspect | EventStoreDB (Kurrent) | Axon Server | PostgreSQL Custom |
|---|---|---|---|
| Type | Dedicated Event DB | CQRS Framework Server | General-purpose RDBMS |
| Projections | Server-side JavaScript | Tracking Processor | Must build manually |
| Subscription Model | Catch-up, Persistent | Tracking, Subscribing | Polling, LISTEN/NOTIFY |
| Clustering | Built-in (Gossip protocol) | Axon Server Enterprise | External solution needed |
| Learning Curve | Medium | High (entire framework) | Low (SQL-based) |
| Flexibility | Medium | Low (Axon ecosystem lock-in) | High |
| Operational Complexity | Medium | High | Low to Medium |
4. Aggregate and Domain Event Implementation
4.1 Aggregate Base Class
In Event Sourcing, the Aggregate is the core unit that publishes events and reconstructs state by replaying events.
abstract class EventSourcedAggregate {
private uncommittedEvents: DomainEvent[] = []
private _version: number = -1
get version(): number {
return this._version
}
/**
* Called externally to change state.
* Internally creates and applies an event.
*/
protected apply(event: DomainEvent): void {
this.when(event)
this.uncommittedEvents.push(event)
this._version++
}
/**
* Handler that changes internal state based on the event.
* Implemented by subclasses.
*/
protected abstract when(event: DomainEvent): void
/**
* Restores state by replaying stored events.
*/
loadFromHistory(events: DomainEvent[]): void {
for (const event of events) {
this.when(event)
this._version++
}
}
getUncommittedEvents(): DomainEvent[] {
return [...this.uncommittedEvents]
}
clearUncommittedEvents(): void {
this.uncommittedEvents = []
}
}
4.2 Order Aggregate Implementation
// Domain Events
interface OrderCreated {
eventType: 'OrderCreated'
data: {
orderId: string
customerId: string
items: Array<{ productId: string; quantity: number; unitPrice: number }>
totalAmount: number
}
}
interface OrderConfirmed {
eventType: 'OrderConfirmed'
data: {
orderId: string
confirmedAt: string
}
}
interface OrderCancelled {
eventType: 'OrderCancelled'
data: {
orderId: string
reason: string
cancelledAt: string
}
}
type OrderEvent = OrderCreated | OrderConfirmed | OrderCancelled
// Order Aggregate
class Order extends EventSourcedAggregate {
private orderId!: string
private customerId!: string
private items: Array<{ productId: string; quantity: number; unitPrice: number }> = []
private totalAmount: number = 0
private status: 'CREATED' | 'CONFIRMED' | 'CANCELLED' = 'CREATED'
static create(
orderId: string,
customerId: string,
items: Array<{ productId: string; quantity: number; unitPrice: number }>
): Order {
const order = new Order()
const totalAmount = items.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0)
// Business rule validation
if (items.length === 0) {
throw new Error('An order must contain at least one item')
}
if (totalAmount <= 0) {
throw new Error('Order total must be greater than zero')
}
order.apply({
eventType: 'OrderCreated',
data: { orderId, customerId, items, totalAmount },
})
return order
}
confirm(): void {
if (this.status !== 'CREATED') {
throw new Error('Only orders in CREATED status can be confirmed')
}
this.apply({
eventType: 'OrderConfirmed',
data: { orderId: this.orderId, confirmedAt: new Date().toISOString() },
})
}
cancel(reason: string): void {
if (this.status === 'CANCELLED') {
throw new Error('Order is already cancelled')
}
this.apply({
eventType: 'OrderCancelled',
data: { orderId: this.orderId, reason, cancelledAt: new Date().toISOString() },
})
}
protected when(event: DomainEvent): void {
const orderEvent = event as OrderEvent
switch (orderEvent.eventType) {
case 'OrderCreated':
this.orderId = orderEvent.data.orderId
this.customerId = orderEvent.data.customerId
this.items = orderEvent.data.items
this.totalAmount = orderEvent.data.totalAmount
this.status = 'CREATED'
break
case 'OrderConfirmed':
this.status = 'CONFIRMED'
break
case 'OrderCancelled':
this.status = 'CANCELLED'
break
}
}
}
5. Projections and Read Models
5.1 The Role of Projections
A Projection subscribes to event streams and builds read-optimized views (Read Models). Since querying directly from the Event Sourcing event stream is inefficient, we maintain separate denormalized views tailored to various query requirements.
5.2 Projection Engine Implementation
interface ProjectionCheckpoint {
projectionName: string
lastProcessedPosition: number
}
abstract class Projection {
abstract readonly name: string
abstract handle(event: StoredEvent): Promise<void>
canHandle(eventType: string): boolean {
return this.handledEventTypes().includes(eventType)
}
abstract handledEventTypes(): string[]
}
class ProjectionEngine {
private projections: Projection[] = []
constructor(
private readonly eventStore: PostgresEventStore,
private readonly checkpointStore: CheckpointStore
) {}
register(projection: Projection): void {
this.projections.push(projection)
}
/**
* Runs all registered projections.
* Each projection resumes from its last processed position.
*/
async run(): Promise<void> {
while (true) {
for (const projection of this.projections) {
const checkpoint = await this.checkpointStore.get(projection.name)
const lastPosition = checkpoint?.lastProcessedPosition ?? 0
const events = await this.eventStore.readAll(lastPosition, 100)
for (const event of events) {
if (projection.canHandle(event.eventType)) {
try {
await projection.handle(event)
} catch (error) {
console.error(
`Projection ${projection.name} failed at position ${event.globalPosition}:`,
error
)
break
}
}
await this.checkpointStore.save({
projectionName: projection.name,
lastProcessedPosition: event.globalPosition,
})
}
}
// Polling interval
await new Promise((resolve) => setTimeout(resolve, 500))
}
}
}
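The engine above depends on a `CheckpointStore` that is never defined. A minimal in-memory sketch of the assumed interface follows (the interface shape is an assumption inferred from the engine's usage; a production version would persist checkpoints alongside the read model, ideally in the same transaction as the view update):

```typescript
// Re-declared here so the sketch is self-contained.
interface ProjectionCheckpoint {
  projectionName: string
  lastProcessedPosition: number
}

// Interface assumed by the ProjectionEngine's get/save calls.
interface CheckpointStore {
  get(projectionName: string): Promise<ProjectionCheckpoint | null>
  save(checkpoint: ProjectionCheckpoint): Promise<void>
}

// In-memory implementation, suitable for tests; swap for a DB-backed
// version (one row per projection) in production.
class InMemoryCheckpointStore implements CheckpointStore {
  private checkpoints = new Map<string, ProjectionCheckpoint>()

  async get(projectionName: string): Promise<ProjectionCheckpoint | null> {
    return this.checkpoints.get(projectionName) ?? null
  }

  async save(checkpoint: ProjectionCheckpoint): Promise<void> {
    this.checkpoints.set(checkpoint.projectionName, checkpoint)
  }
}
```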
5.3 Order Dashboard Read Model
class OrderDashboardProjection extends Projection {
readonly name = 'order-dashboard'
constructor(private readonly db: Pool) {
super()
}
handledEventTypes(): string[] {
return ['OrderCreated', 'OrderConfirmed', 'OrderCancelled']
}
async handle(event: StoredEvent): Promise<void> {
switch (event.eventType) {
case 'OrderCreated':
await this.db.query(
`INSERT INTO order_dashboard (order_id, customer_id, total_amount, status, created_at)
VALUES ($1, $2, $3, 'CREATED', $4)
ON CONFLICT (order_id) DO NOTHING`,
[event.data.orderId, event.data.customerId, event.data.totalAmount, event.createdAt]
)
break
case 'OrderConfirmed':
await this.db.query(
`UPDATE order_dashboard SET status = 'CONFIRMED', confirmed_at = $2 WHERE order_id = $1`,
[event.data.orderId, event.data.confirmedAt]
)
break
case 'OrderCancelled':
await this.db.query(
`UPDATE order_dashboard SET status = 'CANCELLED', cancel_reason = $2 WHERE order_id = $1`,
[event.data.orderId, event.data.reason]
)
break
}
}
}
6. Saga Pattern and Distributed Transactions
6.1 What Is a Saga?
The Saga pattern manages transactions spanning multiple services as a series of local transactions with compensating transactions. Unlike traditional 2PC (Two-Phase Commit), it avoids long-lived locks, providing superior scalability.
Two implementation approaches exist:
- Choreography: Each service publishes events and other services react. No central coordinator.
- Orchestration: A central orchestrator manages the overall flow and dispatches commands to each service.
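Choreography can be sketched with an in-process event bus: each service subscribes to the previous service's event and publishes its own, with no central coordinator. The bus, service reactions, and event names below are all illustrative assumptions:

```typescript
// Minimal choreography sketch: no coordinator, just event subscriptions.
type SagaEvent = { type: string; orderId: string }
type Handler = (event: SagaEvent) => void

class EventBus {
  private handlers = new Map<string, Handler[]>()

  subscribe(type: string, handler: Handler): void {
    const list = this.handlers.get(type) ?? []
    list.push(handler)
    this.handlers.set(type, list)
  }

  publish(event: SagaEvent): void {
    for (const h of this.handlers.get(event.type) ?? []) h(event)
  }
}

const bus = new EventBus()
const log: string[] = []

// "Payment service" reacts to OrderCreated
bus.subscribe('OrderCreated', (e) => {
  log.push('payment')
  bus.publish({ type: 'PaymentCompleted', orderId: e.orderId })
})
// "Inventory service" reacts to PaymentCompleted
bus.subscribe('PaymentCompleted', (e) => {
  log.push('inventory')
  bus.publish({ type: 'InventoryReserved', orderId: e.orderId })
})

bus.publish({ type: 'OrderCreated', orderId: 'o-1' })
// log is now ['payment', 'inventory']
```

Notice that the overall flow exists nowhere in the code -- it emerges from the subscriptions, which is exactly why choreography scores low on visibility in the comparison table below.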
6.2 Orchestration Saga Implementation
// Saga state definition
type OrderSagaStatus =
| 'STARTED'
| 'PAYMENT_PENDING'
| 'PAYMENT_COMPLETED'
| 'INVENTORY_RESERVED'
| 'COMPLETED'
| 'COMPENSATING'
| 'FAILED'
interface SagaStep {
name: string
execute: () => Promise<void>
compensate: () => Promise<void>
}
class OrderSaga {
private status: OrderSagaStatus = 'STARTED'
private completedSteps: SagaStep[] = []
constructor(
private readonly orderId: string,
private readonly paymentService: PaymentService,
private readonly inventoryService: InventoryService,
private readonly sagaLog: SagaLogStore
) {}
async execute(orderData: {
customerId: string
items: any[]
totalAmount: number
}): Promise<void> {
const steps: SagaStep[] = [
{
name: 'ProcessPayment',
execute: async () => {
await this.paymentService.processPayment(
this.orderId,
orderData.customerId,
orderData.totalAmount
)
this.status = 'PAYMENT_COMPLETED'
},
compensate: async () => {
await this.paymentService.refundPayment(this.orderId)
},
},
{
name: 'ReserveInventory',
execute: async () => {
await this.inventoryService.reserve(this.orderId, orderData.items)
this.status = 'INVENTORY_RESERVED'
},
compensate: async () => {
await this.inventoryService.releaseReservation(this.orderId)
},
},
]
for (const step of steps) {
try {
await this.sagaLog.record(this.orderId, step.name, 'EXECUTING')
await step.execute()
await this.sagaLog.record(this.orderId, step.name, 'COMPLETED')
this.completedSteps.push(step)
} catch (error) {
await this.sagaLog.record(this.orderId, step.name, 'FAILED')
console.error(`Saga step ${step.name} failed:`, error)
// Execute compensating transactions (reverse order)
this.status = 'COMPENSATING'
await this.compensate()
this.status = 'FAILED'
return
}
}
this.status = 'COMPLETED'
await this.sagaLog.record(this.orderId, 'Saga', 'COMPLETED')
}
private async compensate(): Promise<void> {
// Compensate completed steps in reverse order
const stepsToCompensate = [...this.completedSteps].reverse()
for (const step of stepsToCompensate) {
try {
await this.sagaLog.record(this.orderId, step.name, 'COMPENSATING')
await step.compensate()
await this.sagaLog.record(this.orderId, step.name, 'COMPENSATED')
} catch (compensateError) {
// Compensation failure requires manual intervention
await this.sagaLog.record(this.orderId, step.name, 'COMPENSATION_FAILED')
console.error(`Compensation failed for step ${step.name}:`, compensateError)
}
}
}
}
6.3 Choreography vs Orchestration Comparison
| Aspect | Choreography | Orchestration |
|---|---|---|
| Coupling | Low (event-based) | Medium (orchestrator dependency) |
| Visibility | Low (hard to trace flows) | High (centralized flow management) |
| Complexity | Grows rapidly with service count | Concentrated in orchestrator |
| Single Point of Failure | None | The orchestrator |
| Testability | Difficult | Relatively easy |
| Best For | Simple flows across 2-3 services | Complex flows across 4+ services |
7. Snapshots and Event Compaction
7.1 Why Snapshots Are Needed
When an Aggregate accumulates thousands or tens of thousands of events, state reconstruction time increases dramatically. Snapshots are an optimization technique that serializes and stores the Aggregate state at a specific point, then replays only subsequent events.
interface Snapshot {
streamId: string
version: number
state: Record<string, unknown>
createdAt: Date
}
class SnapshotRepository {
constructor(private readonly pool: Pool) {}
async save(snapshot: Snapshot): Promise<void> {
await this.pool.query(
`INSERT INTO snapshots (stream_id, version, state, created_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (stream_id) DO UPDATE SET version = $2, state = $3, created_at = $4`,
[snapshot.streamId, snapshot.version, JSON.stringify(snapshot.state), snapshot.createdAt]
)
}
async load(streamId: string): Promise<Snapshot | null> {
const result = await this.pool.query('SELECT * FROM snapshots WHERE stream_id = $1', [streamId])
if (result.rows.length === 0) return null
const row = result.rows[0]
// Map snake_case columns onto the Snapshot interface (pg returns BIGINT as a string)
return { streamId: row.stream_id, version: Number(row.version), state: row.state, createdAt: row.created_at }
}
}
class EventSourcedRepository<T extends EventSourcedAggregate> {
private readonly SNAPSHOT_INTERVAL = 100 // Snapshot every 100 events
constructor(
private readonly eventStore: PostgresEventStore,
private readonly snapshotRepo: SnapshotRepository,
private readonly factory: () => T
) {}
async load(streamId: string): Promise<T> {
const aggregate = this.factory()
// 1. Attempt to load snapshot
const snapshot = await this.snapshotRepo.load(streamId)
let fromVersion = 0
if (snapshot) {
;(aggregate as any).restoreFromSnapshot(snapshot.state)
;(aggregate as any)._version = snapshot.version
fromVersion = snapshot.version + 1
}
// 2. Replay only events after the snapshot
const events = await this.eventStore.readStream(streamId, fromVersion)
aggregate.loadFromHistory(events)
return aggregate
}
async save(aggregate: T, streamId: string): Promise<void> {
const uncommittedEvents = aggregate.getUncommittedEvents()
if (uncommittedEvents.length === 0) return
await this.eventStore.appendToStream(
streamId,
aggregate.version - uncommittedEvents.length,
uncommittedEvents
)
// Check whether to create a snapshot
if (aggregate.version > 0 && aggregate.version % this.SNAPSHOT_INTERVAL === 0) {
await this.snapshotRepo.save({
streamId,
version: aggregate.version,
state: (aggregate as any).toSnapshot(),
createdAt: new Date(),
})
}
aggregate.clearUncommittedEvents()
}
}
7.2 Snapshot Strategy Guidelines
- Creation frequency: Every 100 to 1,000 events (adjust based on domain characteristics)
- Storage location: Same stream or separate store (separate store recommended)
- Creation timing: Create asynchronously in the background to avoid impacting write latency
- When replay time exceeds 100ms: Actively consider introducing snapshots
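The repository in 7.1 calls `toSnapshot()` and `restoreFromSnapshot()` through an `as any` cast without ever defining them. A sketch of what those methods could look like follows; the field names mirror the Order aggregate from section 4, but the class here is a standalone illustration, not the real aggregate:

```typescript
// Serializable shape of the Order state at a point in time.
interface OrderSnapshotState {
  orderId: string
  customerId: string
  items: Array<{ productId: string; quantity: number; unitPrice: number }>
  totalAmount: number
  status: 'CREATED' | 'CONFIRMED' | 'CANCELLED'
}

class SnapshottableOrder {
  private orderId = ''
  private customerId = ''
  private items: OrderSnapshotState['items'] = []
  private totalAmount = 0
  private status: OrderSnapshotState['status'] = 'CREATED'

  // Serializes current state; copy arrays so the snapshot is immutable.
  toSnapshot(): OrderSnapshotState {
    return {
      orderId: this.orderId,
      customerId: this.customerId,
      items: [...this.items],
      totalAmount: this.totalAmount,
      status: this.status,
    }
  }

  // Restores state without replaying any events.
  restoreFromSnapshot(state: OrderSnapshotState): void {
    this.orderId = state.orderId
    this.customerId = state.customerId
    this.items = [...state.items]
    this.totalAmount = state.totalAmount
    this.status = state.status
  }
}
```

A cleaner design moves these two methods into the `EventSourcedAggregate` base class as abstract members, eliminating the `as any` casts in the repository.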
8. Event Sourcing vs Traditional CRUD
| Aspect | Event Sourcing | Traditional CRUD |
|---|---|---|
| Data Storage | Stores events (change records) | Stores current state only |
| History Tracking | Complete audit trail built in | Requires separate audit tables |
| Time-Travel Queries | Can reconstruct state at any point | Requires separate snapshot tables |
| Query Complexity | High (separate Read Models needed) | Low (direct SQL queries) |
| Consistency Model | Primarily eventual consistency | Immediate consistency |
| Schema Changes | Event versioning required | ALTER TABLE |
| Debugging | Reproduce issues via event replay | Only current state visible |
| Learning Curve | High | Low |
| Suitable Domains | Finance, orders, logistics (history matters) | Config management, catalogs (simple CRUD) |
9. Failure Scenarios and Recovery Procedures
9.1 Event Store Failure
Symptom: Command processing halts due to event storage failure
Recovery procedure:
- Check DB connection status and identify the root cause
- After event store recovery, retry unprocessed Commands
- Check for global position gaps (note that rolled-back transactions legitimately leave BIGSERIAL gaps, so a gap alone is not an error); if events appear to be missing, validate projection integrity
9.2 Projection Sync Lag
Symptom: Read Model does not reflect the latest state
Recovery procedure:
- Compare projection checkpoint with the event store's latest position
- Identify the lag cause (processing speed, failures, poison events, etc.)
- If needed, reset the projection and rebuild from scratch
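The reset-and-rebuild step can be sketched as follows, assuming hypothetical `resetView` and `resetCheckpoint` callbacks (for example, a TRUNCATE of the view table and an update of the checkpoint row):

```typescript
// Full projection rebuild: clear the view, rewind the checkpoint,
// and let the running ProjectionEngine replay from position 0.
async function rebuildProjection(
  projectionName: string,
  resetView: () => Promise<void>, // e.g. TRUNCATE the read-model table
  resetCheckpoint: (name: string, position: number) => Promise<void>
): Promise<void> {
  // 1. Clear the read model (serve stale data or disable the view meanwhile)
  await resetView()
  // 2. Rewind the checkpoint to the beginning of the global stream
  await resetCheckpoint(projectionName, 0)
  // 3. The engine's polling loop now replays every event from position 0
}
```

The order matters: clearing the view before rewinding the checkpoint avoids a window where replayed events hit rows left over from the old view.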
9.3 Saga Compensation Failure
Symptom: External service failure during compensating transaction execution
Recovery procedure:
- Query the Saga log for COMPENSATION_FAILED status entries
- Manually retry the failed compensation step
- If retry count is exceeded, record in a Dead Letter Queue and escalate for manual intervention
9.4 Event Schema Changes
Symptom: Event structure changes cause deserialization failures for existing events
Recovery procedure:
- Use the Upcaster pattern to transform old-version events to the new version
- Modify projection handlers to support both versions
- Execute a full projection rebuild
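A minimal Upcaster sketch, assuming a hypothetical `CustomerRegistered` event whose v1 payload stored a single `name` field that v2 splits in two. The transformation runs at read time, before the event reaches the Aggregate or projection, so stored events remain untouched:

```typescript
// Raw event as read from the store, before deserialization.
interface RawEvent {
  eventType: string
  data: Record<string, unknown>
  metadata: Record<string, unknown>
}

type Upcaster = (event: RawEvent) => RawEvent

// v1 stored a single `name`; v2 splits it into firstName/lastName.
// Events that don't match pass through unchanged.
const customerRegisteredV1toV2: Upcaster = (event) => {
  if (event.eventType !== 'CustomerRegistered' || Number(event.metadata.schemaVersion) !== 1) {
    return event
  }
  const [firstName, ...rest] = String(event.data.name).split(' ')
  return {
    ...event,
    data: { ...event.data, firstName, lastName: rest.join(' ') },
    metadata: { ...event.metadata, schemaVersion: 2 },
  }
}
```

In practice upcasters are registered as a chain (v1 to v2, v2 to v3, ...) applied in order, so handlers only ever see the latest schema version.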
10. Operational Checklist
Pre-Adoption Review
- Does the domain truly need Event Sourcing? (History tracking, auditing, time-travel queries)
- Can the system accept eventual consistency?
- Is the team comfortable with DDD and event-driven thinking?
Design Phase
- Do events carry business intent? (Domain behavior, not property changes)
- Are Aggregate boundaries appropriate? (Not too large, not too small)
- Is an event schema versioning strategy established?
- Is a projection rebuild strategy in place?
Operations Phase
- Event store capacity monitoring and archiving strategy
- Projection lag time monitoring
- Saga failure rate and compensation transaction success rate tracking
- Snapshot creation frequency and disk usage management
- Per-stream event count monitoring (detect abnormal growth)
Failure Preparedness
- Document full projection rebuild procedure and estimated completion time
- Establish Saga Dead Letter Queue processing workflow
- Verify event store backup and recovery procedures
- Automate Upcaster testing
Conclusion
Event Sourcing and CQRS are powerful architectural patterns, but they are not suitable for every system. They deliver the greatest value in domains where history tracking, audit logs, and time-travel queries are core requirements -- such as finance, order management, and logistics.
If you are considering adoption, here are our recommendations:
- Apply selectively to core domains only. Not every service needs Event Sourcing.
- Design for projection rebuilds from the start. Rebuilding projections in production is inevitable.
- Establish an event schema evolution strategy first. Events are stored forever, making schema change the highest-cost operation.
- Prepare for Saga compensation failures. In distributed systems, compensation failure is a matter of when, not if.
If your team is ready to embrace eventual consistency and shift to event-driven thinking, Event Sourcing + CQRS can fundamentally improve your system's resilience and scalability.