Event Sourcing + CQRS Architecture Implementation Guide: From Event Store to Projections and Saga Patterns


Introduction

Traditional CRUD systems store only the current state of data. When an UPDATE executes, the previous value is lost; when a DELETE executes, the data itself is destroyed. You can see that a bank account's balance is $1,000, but not the sequence of deposits and withdrawals that produced it.

Event Sourcing solves this problem at its root. It records state changes themselves as events, and the current state is reconstructed by replaying those events sequentially. CQRS builds on this structure by fully separating write (Command) and read (Query) concerns, optimizing each for its specific requirements.

This guide walks through the complete implementation of an architecture combining Event Sourcing and CQRS. We cover event store design, Aggregate implementation, projection construction, distributed transaction management with the Saga pattern, snapshot optimization, and real-world failure scenarios with recovery procedures.

1. Event Sourcing Core Principles

1.1 What Is an Event?

In Event Sourcing, an event is an immutable fact that occurred in the past. Events are named in the past tense and must clearly convey business intent.

Compare poorly designed events with well-designed ones:

  • Bad examples: NameChanged, EmailChanged - property-level events carry no business meaning
  • Good examples: CustomerRelocated, OrderConfirmed - events that express domain behavior

1.2 How State Reconstruction Works

The current state is computed by sequentially applying (replaying) all events to an initial state.

Current State = fold(Initial State, [Event1, Event2, ..., EventN])

For example, reconstructing a bank account state looks like this:

Initial balance: $0
  -> AccountOpened(amount: $0)         => Balance: $0
  -> MoneyDeposited(amount: $1,000)    => Balance: $1,000
  -> MoneyWithdrawn(amount: $300)      => Balance: $700
  -> MoneyDeposited(amount: $500)      => Balance: $1,200
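
The replay above can be sketched as a left fold in TypeScript (the event shapes here are illustrative, not part of any later section's code):

```typescript
// Illustrative event union for the bank-account example
type AccountEvent =
  | { type: 'AccountOpened'; amount: number }
  | { type: 'MoneyDeposited'; amount: number }
  | { type: 'MoneyWithdrawn'; amount: number }

// Fold the event history into the current state
function replay(events: AccountEvent[]): number {
  let balance = 0
  for (const event of events) {
    switch (event.type) {
      case 'AccountOpened':
        balance = event.amount
        break
      case 'MoneyDeposited':
        balance += event.amount
        break
      case 'MoneyWithdrawn':
        balance -= event.amount
        break
    }
  }
  return balance
}

const finalBalance = replay([
  { type: 'AccountOpened', amount: 0 },
  { type: 'MoneyDeposited', amount: 1000 },
  { type: 'MoneyWithdrawn', amount: 300 },
  { type: 'MoneyDeposited', amount: 500 },
])
console.log(finalBalance) // 1200
```

Note that the fold never mutates the events themselves; the same history always produces the same state.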

1.3 The Immutability Principle

Once stored, events can never be modified or deleted. If an incorrect event was published, a compensating event must be issued to logically reverse it.

// When an incorrect withdrawal occurred - do not delete the event
// Instead, publish a compensating event
interface MoneyWithdrawnCompensated {
  type: 'MoneyWithdrawnCompensated'
  originalEventId: string
  amount: number
  reason: string
  timestamp: Date
}

2. CQRS Architecture Pattern

2.1 Separating Commands and Queries

The core idea of CQRS is straightforward: separate the model that modifies data from the model that reads data.

[Client]
    |
    |-- Command(write) --> [Command Handler] --> [Write Model / Event Store]
    |                                                     |
    |                                              Events published
    |                                                     |
    |                                                     v
    |                                           [Projection Engine]
    |                                                     |
    |                                                     v
    +-- Query(read) ----> [Query Handler] ----> [Read Model / View DB]

2.2 Why Separate?

Read and write requirements are fundamentally different:

  • Write (Command): Requires business rule validation, domain invariant enforcement, and transactional consistency
  • Read (Query): Requires diverse view support, low latency, and high throughput

Trying to satisfy both sides with a single model forces compromises on both. CQRS enables independent optimization of each model.

2.3 Command Handler Implementation

// Command definition
interface CreateOrderCommand {
  orderId: string
  customerId: string
  items: Array<{ productId: string; quantity: number; price: number }>
}

// Command Handler
class OrderCommandHandler {
  constructor(private readonly orderRepository: EventSourcedRepository<Order>) {}

  async handle(command: CreateOrderCommand): Promise<void> {
    // Creation: the static factory validates invariants and emits OrderCreated
    const order = Order.create(command.orderId, command.customerId, command.items)
    await this.orderRepository.save(order, command.orderId)
  }

  async handleConfirm(orderId: string): Promise<void> {
    // Updates follow the load -> execute -> save cycle:
    // 1. Load Aggregate (replay events)
    const order = await this.orderRepository.load(orderId)
    // 2. Execute business logic (generates events)
    order.confirm()
    // 3. Persist the new events
    await this.orderRepository.save(order, orderId)
  }
}

3. Event Store Design and Implementation

3.1 Event Store Schema

The core table structure for an event store is as follows:

CREATE TABLE events (
    -- Global unique identifier
    event_id        UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    -- Stream (Aggregate) identifier
    stream_id       VARCHAR(255) NOT NULL,
    -- Event sequence within the stream (used for optimistic concurrency)
    stream_version  BIGINT NOT NULL,
    -- Event type (used for deserialization)
    event_type      VARCHAR(255) NOT NULL,
    -- Event payload (JSON)
    event_data      JSONB NOT NULL,
    -- Metadata (correlation ID, user info, etc.)
    metadata        JSONB DEFAULT '{}',
    -- Event timestamp
    created_at      TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    -- Global ordering (used by projections to track overall sequence)
    global_position BIGSERIAL NOT NULL,

    -- Prevent version duplication within the same stream (optimistic concurrency)
    UNIQUE(stream_id, stream_version)
);

-- Index for stream-level event retrieval
CREATE INDEX idx_events_stream ON events(stream_id, stream_version);
-- Index for global event ordering (projections)
CREATE INDEX idx_events_global ON events(global_position);
-- Index for event type lookup
CREATE INDEX idx_events_type ON events(event_type);

3.2 TypeScript Event Store Implementation

import { Pool } from 'pg'

interface DomainEvent {
  eventType: string
  data: Record<string, unknown>
  metadata?: Record<string, unknown>
}

interface StoredEvent extends DomainEvent {
  eventId: string
  streamId: string
  streamVersion: number
  globalPosition: number
  createdAt: Date
}

class ConcurrencyError extends Error {}

class PostgresEventStore {
  constructor(private readonly pool: Pool) {}

  /**
   * Appends events to a stream.
   * Uses expectedVersion for optimistic concurrency control.
   */
  async appendToStream(
    streamId: string,
    expectedVersion: number,
    events: DomainEvent[]
  ): Promise<StoredEvent[]> {
    const client = await this.pool.connect()
    try {
      await client.query('BEGIN')

      // Optimistic concurrency check
      const versionCheck = await client.query(
        'SELECT MAX(stream_version) as current_version FROM events WHERE stream_id = $1',
        [streamId]
      )
      // pg returns BIGINT values as strings, so coerce before comparing
      const currentVersion = Number(versionCheck.rows[0].current_version ?? -1)

      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, but current version is ${currentVersion}`
        )
      }

      const storedEvents: StoredEvent[] = []
      for (let i = 0; i < events.length; i++) {
        const event = events[i]
        const version = expectedVersion + i + 1

        const result = await client.query(
          `INSERT INTO events (stream_id, stream_version, event_type, event_data, metadata)
           VALUES ($1, $2, $3, $4, $5)
           RETURNING event_id, global_position, created_at`,
          [
            streamId,
            version,
            event.eventType,
            JSON.stringify(event.data),
            JSON.stringify(event.metadata ?? {}),
          ]
        )

        storedEvents.push({
          eventId: result.rows[0].event_id,
          streamId,
          streamVersion: version,
          globalPosition: Number(result.rows[0].global_position),
          createdAt: result.rows[0].created_at,
          ...event,
        })
      }

      await client.query('COMMIT')
      return storedEvents
    } catch (error) {
      await client.query('ROLLBACK')
      throw error
    } finally {
      client.release()
    }
  }

  /**
   * Reads all events from a stream in order.
   */
  async readStream(streamId: string, fromVersion = 0): Promise<StoredEvent[]> {
    const result = await this.pool.query(
      `SELECT * FROM events
       WHERE stream_id = $1 AND stream_version >= $2
       ORDER BY stream_version ASC`,
      [streamId, fromVersion]
    )
    return result.rows.map(this.mapToStoredEvent)
  }

  /**
   * Reads all events in global order (for projections).
   */
  async readAll(fromPosition = 0, limit = 1000): Promise<StoredEvent[]> {
    const result = await this.pool.query(
      `SELECT * FROM events
       WHERE global_position > $1
       ORDER BY global_position ASC
       LIMIT $2`,
      [fromPosition, limit]
    )
    return result.rows.map(this.mapToStoredEvent)
  }

  private mapToStoredEvent(row: any): StoredEvent {
    return {
      eventId: row.event_id,
      streamId: row.stream_id,
      // pg returns BIGINT columns as strings; coerce to numbers
      streamVersion: Number(row.stream_version),
      globalPosition: Number(row.global_position),
      eventType: row.event_type,
      data: row.event_data,
      metadata: row.metadata,
      createdAt: row.created_at,
    }
  }
}

3.3 Dedicated Event Store Solutions Comparison

| Aspect | EventStoreDB (Kurrent) | Axon Server | PostgreSQL Custom |
| --- | --- | --- | --- |
| Type | Dedicated Event DB | CQRS Framework Server | General-purpose RDBMS |
| Projections | Server-side JavaScript | Tracking Processor | Must build manually |
| Subscription Model | Catch-up, Persistent | Tracking, Subscribing | Polling, LISTEN/NOTIFY |
| Clustering | Built-in (Gossip protocol) | Axon Server Enterprise | External solution needed |
| Learning Curve | Medium | High (entire framework) | Low (SQL-based) |
| Flexibility | Medium | Low (Axon ecosystem lock-in) | High |
| Operational Complexity | Medium | High | Low to Medium |

4. Aggregate and Domain Event Implementation

4.1 Aggregate Base Class

In Event Sourcing, the Aggregate is the core unit that publishes events and reconstructs state by replaying events.

abstract class EventSourcedAggregate {
  private uncommittedEvents: DomainEvent[] = []
  private _version: number = -1

  get version(): number {
    return this._version
  }

  /**
   * Called externally to change state.
   * Internally creates and applies an event.
   */
  protected apply(event: DomainEvent): void {
    this.when(event)
    this.uncommittedEvents.push(event)
    this._version++
  }

  /**
   * Handler that changes internal state based on the event.
   * Implemented by subclasses.
   */
  protected abstract when(event: DomainEvent): void

  /**
   * Restores state by replaying stored events.
   */
  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.when(event)
      this._version++
    }
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents]
  }

  clearUncommittedEvents(): void {
    this.uncommittedEvents = []
  }
}

4.2 Order Aggregate Implementation

// Domain Events
interface OrderCreated {
  eventType: 'OrderCreated'
  data: {
    orderId: string
    customerId: string
    items: Array<{ productId: string; quantity: number; unitPrice: number }>
    totalAmount: number
  }
}

interface OrderConfirmed {
  eventType: 'OrderConfirmed'
  data: {
    orderId: string
    confirmedAt: string
  }
}

interface OrderCancelled {
  eventType: 'OrderCancelled'
  data: {
    orderId: string
    reason: string
    cancelledAt: string
  }
}

type OrderEvent = OrderCreated | OrderConfirmed | OrderCancelled

// Order Aggregate
class Order extends EventSourcedAggregate {
  private orderId!: string
  private customerId!: string
  private items: Array<{ productId: string; quantity: number; unitPrice: number }> = []
  private totalAmount: number = 0
  private status: 'CREATED' | 'CONFIRMED' | 'CANCELLED' = 'CREATED'

  static create(
    orderId: string,
    customerId: string,
    items: Array<{ productId: string; quantity: number; unitPrice: number }>
  ): Order {
    const order = new Order()
    const totalAmount = items.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0)

    // Business rule validation
    if (items.length === 0) {
      throw new Error('An order must contain at least one item')
    }
    if (totalAmount <= 0) {
      throw new Error('Order total must be greater than zero')
    }

    order.apply({
      eventType: 'OrderCreated',
      data: { orderId, customerId, items, totalAmount },
    })

    return order
  }

  confirm(): void {
    if (this.status !== 'CREATED') {
      throw new Error('Only orders in CREATED status can be confirmed')
    }
    this.apply({
      eventType: 'OrderConfirmed',
      data: { orderId: this.orderId, confirmedAt: new Date().toISOString() },
    })
  }

  cancel(reason: string): void {
    if (this.status === 'CANCELLED') {
      throw new Error('Order is already cancelled')
    }
    this.apply({
      eventType: 'OrderCancelled',
      data: { orderId: this.orderId, reason, cancelledAt: new Date().toISOString() },
    })
  }

  protected when(event: DomainEvent): void {
    const orderEvent = event as OrderEvent
    switch (orderEvent.eventType) {
      case 'OrderCreated':
        this.orderId = orderEvent.data.orderId
        this.customerId = orderEvent.data.customerId
        this.items = orderEvent.data.items
        this.totalAmount = orderEvent.data.totalAmount
        this.status = 'CREATED'
        break
      case 'OrderConfirmed':
        this.status = 'CONFIRMED'
        break
      case 'OrderCancelled':
        this.status = 'CANCELLED'
        break
    }
  }
}

5. Projections and Read Models

5.1 The Role of Projections

A Projection subscribes to event streams and builds read-optimized views (Read Models). Because querying the raw event stream directly is inefficient for most access patterns, we maintain separate denormalized views, each tailored to a specific query requirement.

5.2 Projection Engine Implementation

interface ProjectionCheckpoint {
  projectionName: string
  lastProcessedPosition: number
}

abstract class Projection {
  abstract readonly name: string

  abstract handle(event: StoredEvent): Promise<void>

  canHandle(eventType: string): boolean {
    return this.handledEventTypes().includes(eventType)
  }

  abstract handledEventTypes(): string[]
}

class ProjectionEngine {
  private projections: Projection[] = []

  constructor(
    private readonly eventStore: PostgresEventStore,
    private readonly checkpointStore: CheckpointStore
  ) {}

  register(projection: Projection): void {
    this.projections.push(projection)
  }

  /**
   * Runs all registered projections.
   * Each projection resumes from its last processed position.
   */
  async run(): Promise<void> {
    while (true) {
      for (const projection of this.projections) {
        const checkpoint = await this.checkpointStore.get(projection.name)
        const lastPosition = checkpoint?.lastProcessedPosition ?? 0

        const events = await this.eventStore.readAll(lastPosition, 100)

        for (const event of events) {
          if (projection.canHandle(event.eventType)) {
            try {
              await projection.handle(event)
            } catch (error) {
              console.error(
                `Projection ${projection.name} failed at position ${event.globalPosition}:`,
                error
              )
              break
            }
          }
          await this.checkpointStore.save({
            projectionName: projection.name,
            lastProcessedPosition: event.globalPosition,
          })
        }
      }
      // Polling interval
      await new Promise((resolve) => setTimeout(resolve, 500))
    }
  }
}
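
The engine depends on a CheckpointStore, which the text references but never defines. A minimal in-memory sketch matching the get/save shape used above (a production version would persist positions to a checkpoints table so the engine survives restarts):

```typescript
// In-memory checkpoint store; satisfies the get/save contract the
// ProjectionEngine assumes. Not durable -- for local development only.
class InMemoryCheckpointStore {
  private positions = new Map<string, number>()

  async get(
    projectionName: string
  ): Promise<{ projectionName: string; lastProcessedPosition: number } | null> {
    const position = this.positions.get(projectionName)
    return position === undefined ? null : { projectionName, lastProcessedPosition: position }
  }

  async save(checkpoint: {
    projectionName: string
    lastProcessedPosition: number
  }): Promise<void> {
    this.positions.set(checkpoint.projectionName, checkpoint.lastProcessedPosition)
  }
}
```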

5.3 Order Dashboard Read Model

class OrderDashboardProjection extends Projection {
  readonly name = 'order-dashboard'

  constructor(private readonly db: Pool) {
    super()
  }

  handledEventTypes(): string[] {
    return ['OrderCreated', 'OrderConfirmed', 'OrderCancelled']
  }

  async handle(event: StoredEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderCreated':
        await this.db.query(
          `INSERT INTO order_dashboard (order_id, customer_id, total_amount, status, created_at)
           VALUES ($1, $2, $3, 'CREATED', $4)
           ON CONFLICT (order_id) DO NOTHING`,
          [event.data.orderId, event.data.customerId, event.data.totalAmount, event.createdAt]
        )
        break

      case 'OrderConfirmed':
        await this.db.query(
          `UPDATE order_dashboard SET status = 'CONFIRMED', confirmed_at = $2 WHERE order_id = $1`,
          [event.data.orderId, event.data.confirmedAt]
        )
        break

      case 'OrderCancelled':
        await this.db.query(
          `UPDATE order_dashboard SET status = 'CANCELLED', cancel_reason = $2 WHERE order_id = $1`,
          [event.data.orderId, event.data.reason]
        )
        break
    }
  }
}

6. Saga Pattern and Distributed Transactions

6.1 What Is a Saga?

The Saga pattern manages transactions spanning multiple services as a series of local transactions with compensating transactions. Unlike traditional 2PC (Two-Phase Commit), it avoids long-lived locks, providing superior scalability.

Two implementation approaches exist:

  • Choreography: Each service publishes events and other services react. No central coordinator.
  • Orchestration: A central orchestrator manages the overall flow and dispatches commands to each service.
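
The choreography style can be sketched with a minimal in-process event bus. Everything here is illustrative (the bus, service reactions, and event names stand in for a real message broker):

```typescript
// Illustrative integration event shape
interface IntegrationEvent {
  type: string
  data: { orderId: string }
}

type Handler = (event: IntegrationEvent) => void

// Toy in-process bus standing in for Kafka/RabbitMQ/etc.
class SimpleEventBus {
  private handlers = new Map<string, Handler[]>()

  subscribe(eventType: string, handler: Handler): void {
    const list = this.handlers.get(eventType) ?? []
    list.push(handler)
    this.handlers.set(eventType, list)
  }

  publish(event: IntegrationEvent): void {
    for (const handler of this.handlers.get(event.type) ?? []) {
      handler(event)
    }
  }
}

const bus = new SimpleEventBus()
const reactions: string[] = []

// Payment service: reacts to OrderCreated, then publishes its own event
bus.subscribe('OrderCreated', (e) => {
  reactions.push(`payment charged for ${e.data.orderId}`)
  bus.publish({ type: 'PaymentCompleted', data: e.data })
})

// Inventory service: reacts to the payment service's event
bus.subscribe('PaymentCompleted', (e) => {
  reactions.push(`inventory reserved for ${e.data.orderId}`)
})

bus.publish({ type: 'OrderCreated', data: { orderId: 'order-1' } })
// reactions: ['payment charged for order-1', 'inventory reserved for order-1']
```

Notice there is no coordinator: each service only knows which events it reacts to, which is exactly what makes end-to-end flows hard to trace as the service count grows.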

6.2 Orchestration Saga Implementation

// Saga state definition
type OrderSagaStatus =
  | 'STARTED'
  | 'PAYMENT_PENDING'
  | 'PAYMENT_COMPLETED'
  | 'INVENTORY_RESERVED'
  | 'COMPLETED'
  | 'COMPENSATING'
  | 'FAILED'

interface SagaStep {
  name: string
  execute: () => Promise<void>
  compensate: () => Promise<void>
}

class OrderSaga {
  private status: OrderSagaStatus = 'STARTED'
  private completedSteps: SagaStep[] = []

  constructor(
    private readonly orderId: string,
    private readonly paymentService: PaymentService,
    private readonly inventoryService: InventoryService,
    private readonly sagaLog: SagaLogStore
  ) {}

  async execute(orderData: {
    customerId: string
    items: Array<{ productId: string; quantity: number; unitPrice: number }>
    totalAmount: number
  }): Promise<void> {
    const steps: SagaStep[] = [
      {
        name: 'ProcessPayment',
        execute: async () => {
          await this.paymentService.processPayment(
            this.orderId,
            orderData.customerId,
            orderData.totalAmount
          )
          this.status = 'PAYMENT_COMPLETED'
        },
        compensate: async () => {
          await this.paymentService.refundPayment(this.orderId)
        },
      },
      {
        name: 'ReserveInventory',
        execute: async () => {
          await this.inventoryService.reserve(this.orderId, orderData.items)
          this.status = 'INVENTORY_RESERVED'
        },
        compensate: async () => {
          await this.inventoryService.releaseReservation(this.orderId)
        },
      },
    ]

    for (const step of steps) {
      try {
        await this.sagaLog.record(this.orderId, step.name, 'EXECUTING')
        await step.execute()
        await this.sagaLog.record(this.orderId, step.name, 'COMPLETED')
        this.completedSteps.push(step)
      } catch (error) {
        await this.sagaLog.record(this.orderId, step.name, 'FAILED')
        console.error(`Saga step ${step.name} failed:`, error)

        // Execute compensating transactions (reverse order)
        this.status = 'COMPENSATING'
        await this.compensate()
        this.status = 'FAILED'
        return
      }
    }

    this.status = 'COMPLETED'
    await this.sagaLog.record(this.orderId, 'Saga', 'COMPLETED')
  }

  private async compensate(): Promise<void> {
    // Compensate completed steps in reverse order
    const stepsToCompensate = [...this.completedSteps].reverse()
    for (const step of stepsToCompensate) {
      try {
        await this.sagaLog.record(this.orderId, step.name, 'COMPENSATING')
        await step.compensate()
        await this.sagaLog.record(this.orderId, step.name, 'COMPENSATED')
      } catch (compensateError) {
        // Compensation failure requires manual intervention
        await this.sagaLog.record(this.orderId, step.name, 'COMPENSATION_FAILED')
        console.error(`Compensation failed for step ${step.name}:`, compensateError)
      }
    }
  }
}
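
The OrderSaga above assumes a SagaLogStore with a record() method. A minimal in-memory sketch follows; a production implementation would persist entries to a durable table so a crashed saga can be resumed, and findByStatus is an assumed helper that supports the recovery query described in section 9.3:

```typescript
interface SagaLogEntry {
  sagaId: string
  stepName: string
  status: string
  recordedAt: Date
}

// In-memory saga log; not durable, for illustration only
class InMemorySagaLogStore {
  private entries: SagaLogEntry[] = []

  async record(sagaId: string, stepName: string, status: string): Promise<void> {
    this.entries.push({ sagaId, stepName, status, recordedAt: new Date() })
  }

  // Supports operational queries such as finding COMPENSATION_FAILED steps
  async findByStatus(status: string): Promise<SagaLogEntry[]> {
    return this.entries.filter((e) => e.status === status)
  }
}
```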

6.3 Choreography vs Orchestration Comparison

| Aspect | Choreography | Orchestration |
| --- | --- | --- |
| Coupling | Low (event-based) | Medium (orchestrator dependency) |
| Visibility | Low (hard to trace flows) | High (centralized flow management) |
| Complexity | Grows rapidly with service count | Concentrated in orchestrator |
| Single Point of Failure | None | The orchestrator |
| Testability | Difficult | Relatively easy |
| Best For | Simple flows across 2-3 services | Complex flows across 4+ services |

7. Snapshots and Event Compaction

7.1 Why Snapshots Are Needed

When an Aggregate accumulates thousands or tens of thousands of events, state reconstruction time increases dramatically. Snapshots are an optimization technique that serializes and stores the Aggregate state at a specific point, then replays only subsequent events.

interface Snapshot {
  streamId: string
  version: number
  state: Record<string, unknown>
  createdAt: Date
}

class SnapshotRepository {
  constructor(private readonly pool: Pool) {}

  async save(snapshot: Snapshot): Promise<void> {
    await this.pool.query(
      `INSERT INTO snapshots (stream_id, version, state, created_at)
       VALUES ($1, $2, $3, $4)
       ON CONFLICT (stream_id) DO UPDATE SET version = $2, state = $3, created_at = $4`,
      [snapshot.streamId, snapshot.version, JSON.stringify(snapshot.state), snapshot.createdAt]
    )
  }

  async load(streamId: string): Promise<Snapshot | null> {
    const result = await this.pool.query('SELECT * FROM snapshots WHERE stream_id = $1', [streamId])
    if (result.rows.length === 0) return null
    // Map snake_case columns onto the Snapshot interface
    const row = result.rows[0]
    return {
      streamId: row.stream_id,
      version: Number(row.version), // coerce in case version is stored as BIGINT
      state: row.state,
      createdAt: row.created_at,
    }
  }
}

class EventSourcedRepository<T extends EventSourcedAggregate> {
  private readonly SNAPSHOT_INTERVAL = 100 // Snapshot every 100 events

  constructor(
    private readonly eventStore: PostgresEventStore,
    private readonly snapshotRepo: SnapshotRepository,
    private readonly factory: () => T
  ) {}

  async load(streamId: string): Promise<T> {
    const aggregate = this.factory()

    // 1. Attempt to load snapshot
    const snapshot = await this.snapshotRepo.load(streamId)
    let fromVersion = 0

    if (snapshot) {
      ;(aggregate as any).restoreFromSnapshot(snapshot.state)
      ;(aggregate as any)._version = snapshot.version
      fromVersion = snapshot.version + 1
    }

    // 2. Replay only events after the snapshot
    const events = await this.eventStore.readStream(streamId, fromVersion)
    aggregate.loadFromHistory(events)

    return aggregate
  }

  async save(aggregate: T, streamId: string): Promise<void> {
    const uncommittedEvents = aggregate.getUncommittedEvents()
    if (uncommittedEvents.length === 0) return

    await this.eventStore.appendToStream(
      streamId,
      aggregate.version - uncommittedEvents.length,
      uncommittedEvents
    )

    // Check whether to create a snapshot
    if (aggregate.version > 0 && aggregate.version % this.SNAPSHOT_INTERVAL === 0) {
      await this.snapshotRepo.save({
        streamId,
        version: aggregate.version,
        state: (aggregate as any).toSnapshot(),
        createdAt: new Date(),
      })
    }

    aggregate.clearUncommittedEvents()
  }
}
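
The repository calls toSnapshot() and restoreFromSnapshot() on the aggregate, which section 4's Order does not define. A sketch of what those methods might look like, shown on a standalone class so the example is self-contained (field names follow the Order aggregate; treat this as illustrative, not canonical):

```typescript
type OrderStatus = 'CREATED' | 'CONFIRMED' | 'CANCELLED'
interface OrderItem { productId: string; quantity: number; unitPrice: number }

// Stand-in for the snapshot-related portion of the Order aggregate
class OrderSnapshotState {
  private orderId = ''
  private customerId = ''
  private items: OrderItem[] = []
  private totalAmount = 0
  private status: OrderStatus = 'CREATED'

  // Serialize every field needed to rebuild state without replaying events
  toSnapshot(): Record<string, unknown> {
    return {
      orderId: this.orderId,
      customerId: this.customerId,
      items: this.items,
      totalAmount: this.totalAmount,
      status: this.status,
    }
  }

  // Inverse of toSnapshot: restore fields from the stored state
  restoreFromSnapshot(state: Record<string, unknown>): void {
    this.orderId = state.orderId as string
    this.customerId = state.customerId as string
    this.items = state.items as OrderItem[]
    this.totalAmount = state.totalAmount as number
    this.status = state.status as OrderStatus
  }
}
```

A practical caveat: snapshot state is itself a schema, so changing the aggregate's fields can invalidate old snapshots. Since snapshots are a pure optimization, the safe recovery is to delete them and replay from the events.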

7.2 Snapshot Strategy Guidelines

  • Creation frequency: Every 100 to 1,000 events (adjust based on domain characteristics)
  • Storage location: Same stream or separate store (separate store recommended)
  • Creation timing: Create asynchronously in the background to avoid impacting write latency
  • When replay time exceeds 100ms: Actively consider introducing snapshots

8. Event Sourcing vs Traditional CRUD

| Aspect | Event Sourcing | Traditional CRUD |
| --- | --- | --- |
| Data Storage | Stores events (change records) | Stores current state only |
| History Tracking | Complete audit trail built in | Requires separate audit tables |
| Time-Travel Queries | Can reconstruct state at any point | Requires separate snapshot tables |
| Query Complexity | High (separate Read Models needed) | Low (direct SQL queries) |
| Consistency Model | Primarily eventual consistency | Immediate consistency |
| Schema Changes | Event versioning required | ALTER TABLE |
| Debugging | Reproduce issues via event replay | Only current state visible |
| Learning Curve | High | Low |
| Suitable Domains | Finance, orders, logistics (history matters) | Config management, catalogs (simple CRUD) |

9. Failure Scenarios and Recovery Procedures

9.1 Event Store Failure

Symptom: Command processing halts due to event storage failure

Recovery procedure:

  1. Check DB connection status and identify the root cause
  2. After event store recovery, retry unprocessed Commands
  3. Validate projection integrity against the latest events (note that gaps in global_position are normal with BIGSERIAL -- rolled-back inserts still consume sequence values -- so a gap alone does not indicate data loss)

9.2 Projection Sync Lag

Symptom: Read Model does not reflect the latest state

Recovery procedure:

  1. Compare projection checkpoint with the event store's latest position
  2. Identify the lag cause (processing speed, failures, poison events, etc.)
  3. If needed, reset the projection and rebuild from scratch
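
Step 3 can be sketched as a small helper. The two injected callbacks are hypothetical stand-ins: one would TRUNCATE the view table, the other would set the stored checkpoint position back to 0 so the running ProjectionEngine replays every event from the start:

```typescript
// Reset-and-rebuild sketch; callbacks are hypothetical stand-ins for
// real read-model and checkpoint-store operations.
async function rebuildProjection(
  projectionName: string,
  truncateReadModel: () => Promise<void>,
  resetCheckpoint: (name: string, position: number) => Promise<void>
): Promise<void> {
  // Order matters: clear the view first so replayed inserts don't conflict
  await truncateReadModel()
  await resetCheckpoint(projectionName, 0)
}
```

While the rebuild runs, the read model serves stale (or empty) data, so schedule rebuilds off-peak or rebuild into a shadow table and swap.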

9.3 Saga Compensation Failure

Symptom: External service failure during compensating transaction execution

Recovery procedure:

  1. Query the Saga log for COMPENSATION_FAILED status entries
  2. Manually retry the failed compensation step
  3. If retry count is exceeded, record in a Dead Letter Queue and escalate for manual intervention

9.4 Event Schema Changes

Symptom: Event structure changes cause deserialization failures for existing events

Recovery procedure:

  1. Use the Upcaster pattern to transform old-version events to the new version
  2. Modify projection handlers to support both versions
  3. Execute a full projection rebuild
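
Step 1's Upcaster pattern can be sketched as follows. The event shapes and the schemaVersion field are hypothetical; the idea is that upcasters run at deserialization time, so downstream handlers only ever see the latest schema:

```typescript
// Hypothetical stored-event shape with an explicit schema version
interface RawEvent {
  eventType: string
  schemaVersion: number
  data: Record<string, unknown>
}

type Upcaster = (event: RawEvent) => RawEvent

// One chain per event type; entry at index i migrates version i+1 to i+2
const upcasters: Record<string, Upcaster[]> = {
  OrderCreated: [
    // v1 stored a flat customerName; v2 nests it under customer
    (e) => ({
      eventType: e.eventType,
      schemaVersion: 2,
      data: { ...e.data, customer: { name: e.data.customerName } },
    }),
  ],
}

// Apply each missing migration in order until the event reaches the latest version
function upcast(event: RawEvent): RawEvent {
  let current = event
  const chain = upcasters[current.eventType] ?? []
  while (current.schemaVersion - 1 < chain.length) {
    current = chain[current.schemaVersion - 1](current)
  }
  return current
}

const migrated = upcast({
  eventType: 'OrderCreated',
  schemaVersion: 1,
  data: { orderId: 'order-1', customerName: 'Alice' },
})
// migrated.schemaVersion === 2; migrated.data.customer is { name: 'Alice' }
```

Because stored events are never rewritten, the upcaster chain must be kept forever (or until old events are archived), which is why the checklist below treats schema evolution as a first-class design concern.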

10. Operational Checklist

Pre-Adoption Review

  • Does the domain truly need Event Sourcing? (History tracking, auditing, time-travel queries)
  • Can the system accept eventual consistency?
  • Is the team comfortable with DDD and event-driven thinking?

Design Phase

  • Do events carry business intent? (Domain behavior, not property changes)
  • Are Aggregate boundaries appropriate? (Not too large, not too small)
  • Is an event schema versioning strategy established?
  • Is a projection rebuild strategy in place?

Operations Phase

  • Event store capacity monitoring and archiving strategy
  • Projection lag time monitoring
  • Saga failure rate and compensation transaction success rate tracking
  • Snapshot creation frequency and disk usage management
  • Per-stream event count monitoring (detect abnormal growth)

Failure Preparedness

  • Document full projection rebuild procedure and estimated completion time
  • Establish Saga Dead Letter Queue processing workflow
  • Verify event store backup and recovery procedures
  • Automate Upcaster testing

Conclusion

Event Sourcing and CQRS are powerful architectural patterns, but they are not suitable for every system. They deliver the greatest value in domains where history tracking, audit logs, and time-travel queries are core requirements -- such as finance, order management, and logistics.

If you are considering adoption, here are our recommendations:

  1. Apply selectively to core domains only. Not every service needs Event Sourcing.
  2. Design for projection rebuilds from the start. Rebuilding projections in production is inevitable.
  3. Establish an event schema evolution strategy first. Events are stored forever, making schema change the highest-cost operation.
  4. Prepare for Saga compensation failures. In distributed systems, compensation failure is a matter of when, not if.

If your team is ready to embrace eventual consistency and shift to event-driven thinking, Event Sourcing + CQRS can fundamentally improve your system's resilience and scalability.