Event-Driven Architecture Patterns Guide 2025: CQRS, Event Sourcing, Saga Pattern


1. Event-Driven Architecture Fundamentals

Event-Driven Architecture (EDA) is a software architecture pattern where system components communicate through events. Instead of direct service-to-service calls, services publish and subscribe to events, achieving loose coupling and high scalability.

1.1 Core Concepts

Event: A meaningful fact that occurred in the system. Named in past tense. Examples: OrderPlaced, PaymentCompleted, InventoryReserved.

Event Producer: A component that creates and publishes events.

Event Consumer: A component that subscribes to and processes events.

Event Broker: Infrastructure that delivers events between producers and consumers. Examples include Kafka, RabbitMQ, and AWS EventBridge.

1.2 Event Types

Event Types
├── Domain Event
│   └── A fact that occurred in the business domain
│       Example: OrderPlaced, PaymentFailed
├── Integration Event
│   └── Events for cross-bounded context communication
│       Example: OrderConfirmedIntegrationEvent
├── Event Notification
│   └── Contains minimal data. Consumer queries if needed
│       Example: OrderStatusChanged (contains only orderId)
└── Event-Carried State Transfer
    └── Contains full state. No additional query needed
        Example: OrderDetails (includes full order info)
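
The difference between the last two types is easiest to see in the payloads themselves. A sketch with hypothetical event shapes (field names are assumptions, not from a real schema):

```typescript
// Event Notification: minimal payload; the consumer queries the owning
// service for details when it needs them.
interface OrderStatusChangedNotification {
  eventType: 'order.status_changed';
  orderId: string; // consumer calls the Order service to fetch details
}

// Event-Carried State Transfer: the full state travels with the event,
// so no follow-up query is needed.
interface OrderStatusChangedState {
  eventType: 'order.status_changed';
  orderId: string;
  previousStatus: string;
  currentStatus: string;
  changedAt: string; // ISO 8601
}
```

Notifications keep events small and avoid stale copies; state transfer trades larger payloads for consumer autonomy when the producer is unavailable.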

1.3 Pub/Sub Pattern

// TypeScript - Event Bus Interface
interface EventBus {
  publish<T extends DomainEvent>(event: T): Promise<void>;
  subscribe<T extends DomainEvent>(
    eventType: string,
    handler: EventHandler<T>
  ): void;
  unsubscribe(eventType: string, handler: EventHandler<DomainEvent>): void;
}

interface DomainEvent {
  eventId: string;
  eventType: string;
  aggregateId: string;
  occurredAt: Date;
  version: number;
  metadata: Record<string, string>;
}

interface EventHandler<T extends DomainEvent> {
  handle(event: T): Promise<void>;
}
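
A minimal in-memory implementation of this pub/sub shape is useful for unit tests. This is a sketch only (types simplified to keep it self-contained), not a production broker: there is no persistence, retry, or ordering guarantee.

```typescript
// Minimal in-memory event bus matching the interface above.
// Handlers are awaited sequentially; a failed handler aborts the publish.
type Handler = { handle(event: { eventType: string }): Promise<void> };

class InMemoryEventBus {
  private handlers = new Map<string, Handler[]>();

  async publish(event: { eventType: string }): Promise<void> {
    for (const handler of this.handlers.get(event.eventType) ?? []) {
      await handler.handle(event);
    }
  }

  subscribe(eventType: string, handler: Handler): void {
    const list = this.handlers.get(eventType) ?? [];
    list.push(handler);
    this.handlers.set(eventType, list);
  }

  unsubscribe(eventType: string, handler: Handler): void {
    const list = (this.handlers.get(eventType) ?? []).filter(h => h !== handler);
    this.handlers.set(eventType, list);
  }
}
```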

2. Domain Events

2.1 Domain Event Design Principles

Good domain events follow these principles.

Immutable: Events represent facts that already happened, so they cannot be changed.

Past Tense Naming: OrderPlaced is correct; PlaceOrder is not. Events describe what already happened.

Self-describing: The event alone should convey what happened.

Business Meaning: Name from a business perspective, not technical implementation.

2.2 Domain Event Implementation

// Base Domain Event
abstract class BaseDomainEvent implements DomainEvent {
  readonly eventId: string;
  readonly occurredAt: Date;
  readonly version: number;
  abstract readonly eventType: string;
  abstract readonly aggregateId: string;
  metadata: Record<string, string> = {};

  constructor() {
    this.eventId = crypto.randomUUID();
    this.occurredAt = new Date();
    this.version = 1;
  }
}

// Order Placed Event
class OrderPlaced extends BaseDomainEvent {
  readonly eventType = 'order.placed';
  readonly aggregateId: string;

  constructor(
    public readonly orderId: string,
    public readonly customerId: string,
    public readonly items: OrderItem[],
    public readonly totalAmount: Money,
    public readonly shippingAddress: Address
  ) {
    super();
    this.aggregateId = orderId;
  }
}

// Payment Completed Event
class PaymentCompleted extends BaseDomainEvent {
  readonly eventType = 'payment.completed';
  readonly aggregateId: string;

  constructor(
    public readonly paymentId: string,
    public readonly orderId: string,
    public readonly amount: Money,
    public readonly paymentMethod: string
  ) {
    super();
    this.aggregateId = paymentId;
  }
}

// Inventory Reserved Event
class InventoryReserved extends BaseDomainEvent {
  readonly eventType = 'inventory.reserved';
  readonly aggregateId: string;

  constructor(
    public readonly reservationId: string,
    public readonly orderId: string,
    public readonly items: ReservedItem[]
  ) {
    super();
    this.aggregateId = reservationId;
  }
}

2.3 Event Schema Design

// Event Schema (Avro-like)
const OrderPlacedSchema = {
  type: 'record',
  name: 'OrderPlaced',
  namespace: 'com.ecommerce.orders.v1',
  fields: [
    { name: 'eventId', type: 'string' },
    { name: 'eventType', type: 'string' },
    { name: 'aggregateId', type: 'string' },
    { name: 'occurredAt', type: 'string' },  // ISO 8601
    { name: 'version', type: 'int' },
    { name: 'orderId', type: 'string' },
    { name: 'customerId', type: 'string' },
    {
      name: 'items',
      type: {
        type: 'array',
        items: {
          type: 'record',
          name: 'OrderItem',
          fields: [
            { name: 'productId', type: 'string' },
            { name: 'quantity', type: 'int' },
            { name: 'unitPrice', type: 'long' },
          ],
        },
      },
    },
    { name: 'totalAmount', type: 'long' },
    { name: 'currency', type: 'string' },
  ],
};
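
A lightweight runtime check against a schema like this might look as follows. This is illustrative only and checks just top-level required fields; real deployments validate through a schema registry with a proper Avro or JSON Schema implementation.

```typescript
// Naive schema check: verifies every top-level field declared in the
// schema is present on the payload. Nested records are not validated.
function hasAllFields(
  schema: { fields: { name: string }[] },
  payload: Record<string, unknown>
): boolean {
  return schema.fields.every(f => f.name in payload);
}
```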

3. Event Sourcing

Event Sourcing stores application state as a sequence of events. Instead of storing current state directly, it records the entire history of state changes.

3.1 Core Concept

Traditional Approach (CRUD):
┌─────────────────────────┐
│ orders table            │
│ id: ORD-001             │
│ status: shipped         │ ← Only current state stored
│ total: 50000            │
│ updated_at: 2025-03-24  │
└─────────────────────────┘

Event Sourcing Approach:
┌─────────────────────────────────────────┐
│ event_store                             │
│ 1. OrderPlaced    (ORD-001, 2025-03-24) │
│ 2. PaymentReceived(ORD-001, 2025-03-24) │
│ 3. ItemPacked     (ORD-001, 2025-03-24) │
│ 4. OrderShipped   (ORD-001, 2025-03-24) │ ← Full history preserved
└─────────────────────────────────────────┘

3.2 Event Store Implementation

// Event Store Interface
interface EventStore {
  // Append events (optimistic concurrency control)
  append(
    streamId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void>;

  // Read events from stream
  readStream(
    streamId: string,
    fromVersion?: number
  ): Promise<DomainEvent[]>;

  // Subscribe to all events
  subscribe(
    handler: (event: DomainEvent) => Promise<void>
  ): void;
}

// PostgreSQL-based Event Store (subscribe omitted for brevity)
class PostgresEventStore implements EventStore {
  constructor(private pool: Pool) {}

  async append(
    streamId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void> {
    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');

      // Optimistic concurrency check
      const result = await client.query(
        'SELECT MAX(version) as current_version FROM events WHERE stream_id = $1',
        [streamId]
      );
      const currentVersion = result.rows[0]?.current_version ?? 0;

      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, but current is ${currentVersion}`
        );
      }

      // Store events
      let version = expectedVersion;
      for (const event of events) {
        version++;
        await client.query(
          `INSERT INTO events (stream_id, version, event_type, data, metadata, created_at)
           VALUES ($1, $2, $3, $4, $5, NOW())`,
          [streamId, version, event.eventType, JSON.stringify(event), JSON.stringify(event.metadata)]
        );
      }

      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  async readStream(streamId: string, fromVersion: number = 0): Promise<DomainEvent[]> {
    const result = await this.pool.query(
      'SELECT data FROM events WHERE stream_id = $1 AND version > $2 ORDER BY version ASC',
      [streamId, fromVersion]
    );
    return result.rows.map(row => JSON.parse(row.data));
  }
}

3.3 Aggregate with Event Sourcing

// Event Sourced Aggregate base class
abstract class EventSourcedAggregate {
  private uncommittedEvents: DomainEvent[] = [];
  version: number = 0;

  protected apply(event: DomainEvent): void {
    this.when(event);
    this.version++;
    this.uncommittedEvents.push(event);
  }

  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.when(event);
      this.version++;
    }
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents];
  }

  clearUncommittedEvents(): void {
    this.uncommittedEvents = [];
  }

  protected abstract when(event: DomainEvent): void;
}

// Order Aggregate
class Order extends EventSourcedAggregate {
  private id!: string;
  private customerId!: string;
  private items!: OrderItem[];
  private totalAmount!: Money;
  private status!: OrderStatus;

  // Command: Place Order
  static place(orderId: string, customerId: string, items: OrderItem[], total: Money): Order {
    const order = new Order();
    order.apply(new OrderPlaced(orderId, customerId, items, total, new Address()));
    return order;
  }

  // Command: Confirm Payment
  confirmPayment(paymentId: string): void {
    if (this.status !== OrderStatus.PLACED) {
      throw new InvalidOperationError('Can only confirm payment for placed orders');
    }
    this.apply(new OrderPaymentConfirmed(this.id, paymentId));
  }

  // Command: Cancel Order
  cancel(reason: string): void {
    if (this.status === OrderStatus.CANCELLED) {
      throw new InvalidOperationError('Order is already cancelled');
    }
    this.apply(new OrderCancelled(this.id, reason));
  }

  // Apply events to state
  protected when(event: DomainEvent): void {
    if (event instanceof OrderPlaced) {
      this.id = event.orderId;
      this.customerId = event.customerId;
      this.items = event.items;
      this.totalAmount = event.totalAmount;
      this.status = OrderStatus.PLACED;
    } else if (event instanceof OrderPaymentConfirmed) {
      this.status = OrderStatus.CONFIRMED;
    } else if (event instanceof OrderCancelled) {
      this.status = OrderStatus.CANCELLED;
    }
  }
}

3.4 Snapshots

As events accumulate, rebuilding an aggregate by replaying its entire event stream becomes slow. Snapshots mitigate this by persisting the aggregate's state at a point in time, so only the events recorded after the snapshot need to be replayed.

class SnapshotStore {
  constructor(private pool: Pool) {}

  async saveSnapshot(
    streamId: string,
    snapshot: AggregateSnapshot
  ): Promise<void> {
    await this.pool.query(
      `INSERT INTO snapshots (stream_id, version, data, created_at)
       VALUES ($1, $2, $3, NOW())
       ON CONFLICT (stream_id) DO UPDATE SET version = $2, data = $3, created_at = NOW()`,
      [streamId, snapshot.version, JSON.stringify(snapshot.state)]
    );
  }

  async getLatestSnapshot(streamId: string): Promise<AggregateSnapshot | null> {
    const result = await this.pool.query(
      'SELECT version, data FROM snapshots WHERE stream_id = $1',
      [streamId]
    );
    if (result.rows.length === 0) return null;
    return {
      version: result.rows[0].version,
      state: JSON.parse(result.rows[0].data),
    };
  }
}

// Aggregate loading with snapshots
class OrderRepository {
  constructor(
    private eventStore: EventStore,
    private snapshotStore: SnapshotStore
  ) {}

  async getById(orderId: string): Promise<Order> {
    const order = new Order();

    // 1. Load snapshot
    const snapshot = await this.snapshotStore.getLatestSnapshot(orderId);
    let fromVersion = 0;

    if (snapshot) {
      order.restoreFromSnapshot(snapshot.state);
      fromVersion = snapshot.version;
    }

    // 2. Load only events after snapshot
    const events = await this.eventStore.readStream(orderId, fromVersion);
    order.loadFromHistory(events);

    // 3. Create snapshot after certain number of events
    if (events.length > 50) {
      await this.snapshotStore.saveSnapshot(orderId, {
        version: order.version,
        state: order.toSnapshot(),
      });
    }

    return order;
  }
}
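
The `restoreFromSnapshot` and `toSnapshot` methods used above are not defined earlier; one possible shape for them (the snapshot fields mirror the aggregate's state and are an assumption, not a fixed contract):

```typescript
// Hypothetical snapshot support for the Order aggregate: the snapshot is
// a plain serializable object capturing just what is needed to rebuild state.
interface OrderSnapshotState {
  id: string;
  customerId: string;
  status: string;
  totalAmount: number;
}

class OrderWithSnapshots {
  private id!: string;
  private customerId!: string;
  private status!: string;
  private totalAmount!: number;
  version = 0;

  toSnapshot(): OrderSnapshotState {
    return {
      id: this.id,
      customerId: this.customerId,
      status: this.status,
      totalAmount: this.totalAmount,
    };
  }

  restoreFromSnapshot(state: OrderSnapshotState): void {
    this.id = state.id;
    this.customerId = state.customerId;
    this.status = state.status;
    this.totalAmount = state.totalAmount;
  }
}
```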

3.5 Projections (Read Model)

// Projection that builds read model from events
class OrderProjection {
  constructor(private readDb: Pool) {}

  async handle(event: DomainEvent): Promise<void> {
    if (event instanceof OrderPlaced) {
      await this.readDb.query(
        `INSERT INTO order_read_model (id, customer_id, status, total_amount, created_at, version)
         VALUES ($1, $2, 'PLACED', $3, $4, $5)`,
        [event.orderId, event.customerId, event.totalAmount.amount, event.occurredAt, event.version]
      );

      for (const item of event.items) {
        await this.readDb.query(
          `INSERT INTO order_items_read_model (order_id, product_id, quantity, unit_price)
           VALUES ($1, $2, $3, $4)`,
          [event.orderId, item.productId, item.quantity, item.unitPrice]
        );
      }
    } else if (event instanceof OrderPaymentConfirmed) {
      await this.readDb.query(
        `UPDATE order_read_model SET status = 'CONFIRMED', version = $2 WHERE id = $1`,
        [event.orderId, event.version]
      );
    } else if (event instanceof OrderCancelled) {
      await this.readDb.query(
        `UPDATE order_read_model SET status = 'CANCELLED', version = $2 WHERE id = $1`,
        [event.orderId, event.version]
      );
    }
  }
}

4. CQRS (Command Query Responsibility Segregation)

CQRS separates the write model (Command) and read model (Query) to optimize each independently.

4.1 CQRS Architecture

┌─────────────────────────────────────────────────┐
│                   Client                        │
│         ┌──────────┐  ┌──────────┐              │
│         │  Write   │  │  Read    │              │
│         │  Request │  │  Request │              │
│         └────┬─────┘  └─────┬────┘              │
└──────────────┼──────────────┼───────────────────┘
               │              │
    ┌──────────▼──────┐  ┌────▼──────────┐
    │  Command Side   │  │  Query Side   │
    │                 │  │               │
    │  Command Handler│  │  Query Handler│
    │       │         │  │       │       │
    │  ┌────▼──────┐  │  │  ┌────▼────┐  │
    │  │ Domain    │  │  │  │ Read DB │  │
    │  │ Model     │  │  │  │ (Denorm)│  │
    │  └────┬──────┘  │  │  └─────────┘  │
    │  ┌────▼──────┐  │  │       ▲       │
    │  │ Write DB  │  │  │       │       │
    │  └────┬──────┘  │  │  Projection   │
    └───────┼─────────┘  └───────┼───────┘
            │                    │
            └─── Events ─────────┘

4.2 Command Side Implementation

// Command definition
class PlaceOrderCommand {
  constructor(
    public readonly customerId: string,
    public readonly items: OrderItemDto[],
    public readonly shippingAddress: AddressDto
  ) {}
}

// Command Handler
class PlaceOrderHandler {
  constructor(
    private orderRepo: OrderRepository,
    private eventBus: EventBus
  ) {}

  async handle(command: PlaceOrderCommand): Promise<string> {
    // Business logic
    const orderId = generateOrderId();
    const items = command.items.map(i => new OrderItem(i.productId, i.quantity, i.unitPrice));
    const total = items.reduce((sum, i) => sum + i.quantity * i.unitPrice, 0);

    // Create aggregate and apply events
    const order = Order.place(orderId, command.customerId, items, new Money(total, 'USD'));

    // Save to event store
    await this.orderRepo.save(order);

    // Publish events
    for (const event of order.getUncommittedEvents()) {
      await this.eventBus.publish(event);
    }

    return orderId;
  }
}

// Command Bus
class CommandBus {
  private handlers = new Map<string, CommandHandler<unknown>>();

  register<T extends Command>(commandType: string, handler: CommandHandler<T>): void {
    this.handlers.set(commandType, handler);
  }

  async dispatch<T extends Command>(command: T): Promise<unknown> {
    const handler = this.handlers.get(command.constructor.name);
    if (!handler) {
      throw new Error(`No handler for ${command.constructor.name}`);
    }
    return handler.handle(command);
  }
}

4.3 Query Side Implementation

// Query definition
class GetOrderDetailsQuery {
  constructor(public readonly orderId: string) {}
}

class ListCustomerOrdersQuery {
  constructor(
    public readonly customerId: string,
    public readonly page: number = 1,
    public readonly pageSize: number = 20
  ) {}
}

// Query Handler
class GetOrderDetailsHandler {
  constructor(private readDb: Pool) {}

  async handle(query: GetOrderDetailsQuery): Promise<OrderDetailsDto> {
    const result = await this.readDb.query(
      `SELECT o.*, json_agg(i.*) as items
       FROM order_read_model o
       JOIN order_items_read_model i ON o.id = i.order_id
       WHERE o.id = $1
       GROUP BY o.id`,
      [query.orderId]
    );

    if (result.rows.length === 0) {
      throw new NotFoundError(`Order ${query.orderId} not found`);
    }

    return this.mapToDto(result.rows[0]);
  }
}

// Eventual Consistency
// Read model updates after event processing, so slight delays may occur
class OrderQueryService {
  async getOrderWithConsistencyCheck(
    orderId: string,
    expectedVersion?: number
  ): Promise<OrderDetailsDto> {
    const order = await this.getOrderDetails(orderId);

    // Check if specific version is required
    if (expectedVersion && order.version < expectedVersion) {
      // Projection hasn't caught up yet
      // Wait briefly and retry, or query directly from Command side
      await this.waitForProjection(orderId, expectedVersion);
      return this.getOrderDetails(orderId);
    }

    return order;
  }
}

5. Saga Pattern

The Saga pattern manages distributed transactions as a sequence of local transactions.

5.1 Choreography vs Orchestration

Choreography:
Services collaborate autonomously through events

Order Service ──OrderPlaced──> Payment Service
                               ──PaymentCompleted──> Inventory Service
                                                     ──InventoryReserved──> Shipping Service

Pros: Low coupling, no single point of failure
Cons: Hard to track complex workflows, possible cyclic dependencies
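
In code, choreography means each service owns a small handler that reacts to one event and emits the next; no coordinator is involved. A sketch of the payment service's side (event shapes simplified; `chargeCard` and `publish` stand in for the payment gateway and broker client and are assumptions):

```typescript
// Choreography sketch: the payment service subscribes to OrderPlaced and,
// depending on the outcome, publishes PaymentCompleted or PaymentFailed.
type OrderPlacedEvent = { eventType: 'order.placed'; orderId: string; totalAmount: number };
type PaymentEvent =
  | { eventType: 'payment.completed'; orderId: string; paymentId: string }
  | { eventType: 'payment.failed'; orderId: string; reason: string };

async function onOrderPlaced(
  event: OrderPlacedEvent,
  chargeCard: (amount: number) => Promise<string>, // returns paymentId; throws on decline
  publish: (event: PaymentEvent) => Promise<void>
): Promise<void> {
  try {
    const paymentId = await chargeCard(event.totalAmount);
    await publish({ eventType: 'payment.completed', orderId: event.orderId, paymentId });
  } catch (err) {
    await publish({ eventType: 'payment.failed', orderId: event.orderId, reason: String(err) });
  }
}
```

The inventory and shipping services follow the same pattern, each listening for the previous service's success event.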

─────────────────────────────────────────────────

Orchestration:
A central orchestrator controls the workflow

                   ┌──────────────────┐
                   │  Order Saga      │
                   │  Orchestrator    │
                   └───────┬──────────┘
          ┌────────────────┼────────────────┐
          │                │                │
   ┌──────▼──────┐  ┌─────▼───────┐  ┌─────▼───────┐
   │  Payment    │  │  Inventory  │  │  Shipping   │
   │  Service    │  │  Service    │  │  Service    │
   └─────────────┘  └─────────────┘  └─────────────┘

Pros: Easy to track flow, testable
Cons: Orchestrator is single point of failure

5.2 Orchestration Saga Implementation

// Saga State
enum SagaState {
  STARTED = 'STARTED',
  PAYMENT_PENDING = 'PAYMENT_PENDING',
  INVENTORY_PENDING = 'INVENTORY_PENDING',
  SHIPPING_PENDING = 'SHIPPING_PENDING',
  COMPLETED = 'COMPLETED',
  COMPENSATING = 'COMPENSATING',
  COMPENSATED = 'COMPENSATED',
  FAILED = 'FAILED',
}

// Order Processing Saga
class OrderProcessingSaga {
  private state: SagaState = SagaState.STARTED;
  private orderId!: string;
  private paymentId?: string;
  private reservationId?: string;

  constructor(
    private sagaStore: SagaStore,
    private commandBus: CommandBus
  ) {}

  // Start: when order is placed
  async start(event: OrderPlaced): Promise<void> {
    this.orderId = event.orderId;
    this.state = SagaState.PAYMENT_PENDING;
    await this.sagaStore.save(this);

    // Step 1: Request payment
    await this.commandBus.dispatch(new ProcessPaymentCommand(
      event.orderId, event.totalAmount
    ));
  }

  async handlePaymentCompleted(event: PaymentCompleted): Promise<void> {
    this.paymentId = event.paymentId;
    this.state = SagaState.INVENTORY_PENDING;
    await this.sagaStore.save(this);

    // Step 2: Reserve inventory
    await this.commandBus.dispatch(new ReserveInventoryCommand(
      this.orderId, event.paymentId
    ));
  }

  async handlePaymentFailed(event: PaymentFailed): Promise<void> {
    this.state = SagaState.COMPENSATING;
    await this.sagaStore.save(this);

    // Compensate: Cancel order
    await this.commandBus.dispatch(new CancelOrderCommand(
      this.orderId, 'Payment failed'
    ));

    this.state = SagaState.COMPENSATED;
    await this.sagaStore.save(this);
  }

  async handleInventoryReserved(event: InventoryReserved): Promise<void> {
    this.reservationId = event.reservationId;
    this.state = SagaState.SHIPPING_PENDING;
    await this.sagaStore.save(this);

    // Step 3: Request shipping
    await this.commandBus.dispatch(new CreateShipmentCommand(
      this.orderId, this.reservationId
    ));
  }

  async handleInventoryFailed(event: InventoryReservationFailed): Promise<void> {
    this.state = SagaState.COMPENSATING;
    await this.sagaStore.save(this);

    // Compensate: Refund payment then cancel order
    await this.commandBus.dispatch(new RefundPaymentCommand(
      this.orderId, this.paymentId!
    ));
    await this.commandBus.dispatch(new CancelOrderCommand(
      this.orderId, 'Inventory unavailable'
    ));

    this.state = SagaState.COMPENSATED;
    await this.sagaStore.save(this);
  }

  async handleShipmentCreated(event: ShipmentCreated): Promise<void> {
    this.state = SagaState.COMPLETED;
    await this.sagaStore.save(this);

    // Confirm order
    await this.commandBus.dispatch(new ConfirmOrderCommand(this.orderId));
  }
}
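
The `SagaStore` used throughout the saga is referenced but never defined. A minimal persistence sketch (the `sagas` table, its columns, and the `query` abstraction over the database client are assumptions):

```typescript
// Hypothetical SagaStore: persists saga state so the orchestrator can
// resume after a crash. `query` abstracts the DB client (e.g. a pg Pool).
type QueryFn = (sql: string, params: unknown[]) => Promise<void>;

class SagaStore {
  constructor(private query: QueryFn) {}

  async save(saga: { orderId: string; state: string }): Promise<void> {
    // Upsert keyed by order id, so repeated saves overwrite earlier state
    await this.query(
      `INSERT INTO sagas (order_id, state, updated_at)
       VALUES ($1, $2, NOW())
       ON CONFLICT (order_id) DO UPDATE SET state = $2, updated_at = NOW()`,
      [saga.orderId, saga.state]
    );
  }
}
```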

6. Outbox Pattern

6.1 The Dual-Write Problem

Database updates and message publishing must be performed atomically. The Outbox pattern solves this.

Problem Scenario:
1. Save order to DB ✓
2. Publish event to Kafka ✗  ← If this fails, data inconsistency!

Outbox Pattern:
1. Within DB transaction:
   - Save order ✓
   - Save event to outbox table ✓
2. Separate process reads from outbox and publishes to Kafka

6.2 Outbox Table Schema

CREATE TABLE outbox (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  aggregate_type VARCHAR(255) NOT NULL,
  aggregate_id VARCHAR(255) NOT NULL,
  event_type VARCHAR(255) NOT NULL,
  payload JSONB NOT NULL,
  created_at TIMESTAMP NOT NULL DEFAULT NOW(),
  published_at TIMESTAMP,
  retry_count INT DEFAULT 0,
  last_error TEXT
);

CREATE INDEX idx_outbox_unpublished ON outbox (created_at)
  WHERE published_at IS NULL;

6.3 Outbox Implementation

class OutboxService {
  async saveWithOutbox(
    order: Order,
    events: DomainEvent[],
    client: PoolClient
  ): Promise<void> {
    // Save order and events in same transaction
    await client.query(
      'INSERT INTO orders (id, customer_id, status, total) VALUES ($1, $2, $3, $4)',
      [order.id, order.customerId, order.status, order.total]
    );

    for (const event of events) {
      await client.query(
        `INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
         VALUES ($1, $2, $3, $4)`,
        ['Order', order.id, event.eventType, JSON.stringify(event)]
      );
    }
  }
}

// Outbox Relay: periodically reads outbox and publishes
class OutboxRelay {
  async processOutbox(): Promise<void> {
    const client = await this.pool.connect();
    try {
      // Read unpublished events
      const result = await client.query(
        `SELECT * FROM outbox
         WHERE published_at IS NULL AND retry_count < 5
         ORDER BY created_at ASC
         LIMIT 100
         FOR UPDATE SKIP LOCKED`
      );

      for (const row of result.rows) {
        try {
          await this.kafka.produce(
            `events.${row.aggregate_type.toLowerCase()}`,
            row.aggregate_id,
            row.payload
          );

          // Mark as published
          await client.query(
            'UPDATE outbox SET published_at = NOW() WHERE id = $1',
            [row.id]
          );
        } catch (error) {
          // Increment failure count
          await client.query(
            'UPDATE outbox SET retry_count = retry_count + 1, last_error = $1 WHERE id = $2',
            [error.message, row.id]
          );
        }
      }
    } finally {
      client.release();
    }
  }
}

6.4 CDC (Change Data Capture) with Debezium

// Debezium Connector Configuration
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "orderdb",
    "table.include.list": "public.outbox",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.topic.replacement": "events.orders"
  }
}

7. Idempotency

7.1 Why Idempotency Matters

In event-driven systems, messages may be delivered multiple times (at-least-once delivery). Idempotency ensures processing the same event multiple times produces the same result.

7.2 Idempotency Key Implementation

class IdempotencyStore {
  async isProcessed(idempotencyKey: string): Promise<boolean> {
    const result = await this.pool.query(
      'SELECT 1 FROM processed_events WHERE idempotency_key = $1',
      [idempotencyKey]
    );
    return result.rows.length > 0;
  }

  async markProcessed(
    idempotencyKey: string,
    result: unknown,
    client: PoolClient
  ): Promise<void> {
    await client.query(
      `INSERT INTO processed_events (idempotency_key, result, processed_at)
       VALUES ($1, $2, NOW())
       ON CONFLICT (idempotency_key) DO NOTHING`,
      [idempotencyKey, JSON.stringify(result)]
    );
  }
}

// Idempotent Event Handler
class IdempotentEventHandler<T extends DomainEvent> {
  constructor(
    private pool: Pool,
    private idempotencyStore: IdempotencyStore,
    private innerHandler: EventHandler<T>
  ) {}

  async handle(event: T): Promise<void> {
    const key = `${event.eventType}:${event.eventId}`;

    // Check if event already processed
    if (await this.idempotencyStore.isProcessed(key)) {
      console.log(`Event ${key} already processed, skipping`);
      return;
    }

    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');

      // Process event
      await this.innerHandler.handle(event);

      // Mark as processed
      await this.idempotencyStore.markProcessed(key, null, client);

      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }
}

7.3 Idempotency Strategies

Strategy            Description                        Example
────────────────────────────────────────────────────────────────────────────
Idempotency Key     Store processed event IDs          Event-ID-based deduplication
Conditional Update  Update only if conditions met      WHERE version = expected
Upsert              INSERT ... ON CONFLICT DO UPDATE   Natural idempotency
Deduplication Log   Record processed messages          Consumer-side dedup table
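
The conditional-update strategy, for example, reduces to a single guarded SQL statement: replaying the same event a second time matches zero rows. A sketch (table and column names follow the read model above; the `query` signature is an assumption):

```typescript
// Conditional update: the row changes only while it is still at the
// expected version, making the handler naturally idempotent.
async function confirmOrderIfAtVersion(
  query: (sql: string, params: unknown[]) => Promise<{ rowCount: number }>,
  orderId: string,
  expectedVersion: number
): Promise<boolean> {
  const result = await query(
    `UPDATE order_read_model
     SET status = 'CONFIRMED', version = version + 1
     WHERE id = $1 AND version = $2`,
    [orderId, expectedVersion]
  );
  return result.rowCount === 1; // false: duplicate or out-of-order event
}
```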

8. Eventual Consistency

8.1 Consistency Models

Strong Consistency:
Writer ──Write──> DB ──Read──> Reader
                  └── Always sees latest ──┘

Eventual Consistency:
Writer ──Write──> Write DB ──Event──> Read DB ──Read──> Reader
                                      └── May see stale data temporarily ──┘

8.2 Handling Eventual Consistency

// Strategy 1: Optimistic UI Update
// After write, update UI immediately without waiting for read model
class OrderController {
  async placeOrder(req: Request, res: Response): Promise<void> {
    const orderId = await this.commandBus.dispatch(
      new PlaceOrderCommand(req.body.customerId, req.body.items, req.body.shippingAddress)
    );

    // Return orderId immediately
    // Client uses this to poll or subscribe for updates
    res.json({
      orderId,
      status: 'PROCESSING',
      message: 'Order is being processed'
    });
  }
}

// Strategy 2: Version-based consistency check
class OrderApiController {
  async getOrder(req: Request, res: Response): Promise<void> {
    const minVersion = req.headers['x-min-version']
      ? parseInt(req.headers['x-min-version'] as string)
      : undefined;

    const order = await this.queryService.getOrderWithConsistencyCheck(
      req.params.orderId,
      minVersion
    );

    res.json(order);
  }
}

// Strategy 3: Subscription-based updates
class OrderEventsWebSocket {
  handleConnection(client: WebSocket, orderId: string): void {
    this.eventBus.subscribe('order.*', async (event: DomainEvent) => {
      if (event.aggregateId === orderId) {
        client.send(JSON.stringify({
          type: event.eventType,
          data: event,
        }));
      }
    });
  }
}

9. Schema Evolution and Compatibility

9.1 Event Versioning

// Schema evolution strategy
interface OrderPlacedV1 {
  orderId: string;
  customerId: string;
  totalAmount: number;
}

interface OrderPlacedV2 extends OrderPlacedV1 {
  currency: string;        // New field (default: 'USD')
  shippingMethod: string;  // New field (default: 'STANDARD')
}

// Event upcaster: converts old versions to new versions
class OrderPlacedUpcaster {
  upcast(event: OrderPlacedV1): OrderPlacedV2 {
    return {
      ...event,
      currency: 'USD',
      shippingMethod: 'STANDARD',
    };
  }
}
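
On the read path, upcasters are typically applied in sequence when an event is deserialized, so handlers only ever see the latest shape. A sketch, assuming each stored event carries a numeric schema version (the registry keyed by version is an assumption):

```typescript
// Upcaster chain: each function lifts a payload exactly one version.
// On load, every upcaster from the stored version up to the current
// version is applied in order.
type Upcaster = (payload: Record<string, unknown>) => Record<string, unknown>;

const upcasters: Record<number, Upcaster> = {
  // v1 -> v2: add the fields introduced in OrderPlacedV2 with defaults
  1: (p) => ({ ...p, currency: 'USD', shippingMethod: 'STANDARD' }),
};

const CURRENT_VERSION = 2;

function upcastToCurrent(
  payload: Record<string, unknown>,
  storedVersion: number
): Record<string, unknown> {
  let result = payload;
  for (let v = storedVersion; v < CURRENT_VERSION; v++) {
    result = upcasters[v](result);
  }
  return result;
}
```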

9.2 Schema Compatibility Types

Type      Description                      Upgrade Strategy
──────────────────────────────────────────────────────────────
BACKWARD  Read old data with new schema    Upgrade consumers first
FORWARD   Read new data with old schema    Upgrade producers first
FULL      Bidirectional compatibility      Safest, recommended

10. Kafka + TypeScript Production Implementation

10.1 Kafka Producer

import { Kafka, Partitioners } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
  retry: { retries: 5 },
});

const producer = kafka.producer({
  createPartitioner: Partitioners.DefaultPartitioner,
  idempotent: true,
  maxInFlightRequests: 1, // kafkajs allows at most 1 in-flight request when idempotent
  transactionalId: 'order-producer-1',
});

async function publishOrderEvent(event: OrderPlaced): Promise<void> {
  await producer.send({
    topic: 'events.orders',
    messages: [
      {
        key: event.orderId,
        value: JSON.stringify(event),
        headers: {
          'event-type': event.eventType,
          'event-id': event.eventId,
          'correlation-id': event.metadata.correlationId || '',
        },
      },
    ],
  });
}

10.2 Kafka Consumer

const consumer = kafka.consumer({
  groupId: 'payment-service-group',
  sessionTimeout: 30000,
  heartbeatInterval: 3000,
});

async function startConsumer(): Promise<void> {
  await consumer.connect();
  await consumer.subscribe({
    topics: ['events.orders'],
    fromBeginning: false,
  });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const event = JSON.parse(message.value!.toString());
      const eventType = message.headers?.['event-type']?.toString();

      console.log(`Received: ${eventType} from ${topic}[${partition}]`);

      try {
        switch (eventType) {
          case 'order.placed':
            await paymentHandler.onOrderPlaced(event);
            break;
          case 'order.cancelled':
            await paymentHandler.onOrderCancelled(event);
            break;
          default:
            console.log(`Unknown event type: ${eventType}`);
        }
      } catch (error) {
        console.error(`Failed to process event: ${error}`);
        // DLQ handling or retry
      }
    },
  });
}

10.3 Dead Letter Queue (DLQ) Pattern

class DeadLetterQueueHandler {
  private dlqProducer: Producer;

  async handleFailedMessage(
    originalTopic: string,
    message: KafkaMessage,
    error: Error
  ): Promise<void> {
    await this.dlqProducer.send({
      topic: `${originalTopic}.dlq`,
      messages: [
        {
          key: message.key,
          value: message.value,
          headers: {
            ...message.headers,
            'dlq-reason': error.message,
            'dlq-timestamp': new Date().toISOString(),
            'original-topic': originalTopic,
            'retry-count': String(
              parseInt(message.headers?.['retry-count']?.toString() || '0') + 1
            ),
          },
        },
      ],
    });
  }
}

11. RabbitMQ Implementation

11.1 Exchange Types and Routing

Exchange Types:
├── Direct Exchange
│   └── Routes by exact routing key match
│       Use case: Specific service targeting
├── Topic Exchange
│   └── Routes by routing key pattern (*.order.#)
│       Use case: Flexible event routing
├── Fanout Exchange
│   └── Broadcasts to all bound queues
│       Use case: Event notification to all consumers
└── Headers Exchange
    └── Routes by message header matching
        Use case: Complex routing logic
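
The `*` and `#` semantics of the topic exchange can be illustrated with a small matcher. This is a simplified sketch for intuition only; it handles `*` words and a trailing `#`, not every edge case of the AMQP binding rules:

```typescript
// '*' matches exactly one dot-separated word, '#' matches zero or more words.
// Simplified illustration of topic-exchange binding, not a spec-complete matcher.
function topicMatches(pattern: string, routingKey: string): boolean {
  const body = pattern
    .split('.')
    .map((part) => (part === '#' ? '#' : part === '*' ? '\\w+' : part))
    .join('\\.')
    // a trailing '#' also absorbs the dot before it, so 'a.#' matches 'a'
    .replace(/\\\.#$/, '(?:\\.[\\w.]*)?')
    .replace(/#/, '[\\w.]*');
  return new RegExp(`^${body}$`).test(routingKey);
}
```

For example, the binding `*.order.#` matches `eu.order.placed` and `eu.order`, but not `order.placed` (the leading `*` requires exactly one word before `order`).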

11.2 RabbitMQ with Node.js

import amqp from 'amqplib';

class RabbitMQEventBus {
  private connection!: amqp.Connection;
  private channel!: amqp.Channel;

  async connect(): Promise<void> {
    this.connection = await amqp.connect('amqp://localhost');
    this.channel = await this.connection.createChannel();

    // Declare exchange
    await this.channel.assertExchange('domain-events', 'topic', {
      durable: true,
    });
  }

  async publish(event: DomainEvent): Promise<void> {
    const routingKey = `events.${event.eventType}`;
    this.channel.publish(
      'domain-events',
      routingKey,
      Buffer.from(JSON.stringify(event)),
      {
        persistent: true,
        messageId: event.eventId,
        contentType: 'application/json',
        headers: {
          'event-type': event.eventType,
          'aggregate-id': event.aggregateId,
        },
      }
    );
  }

  async subscribe(pattern: string, handler: (event: DomainEvent) => Promise<void>): Promise<void> {
    const queue = await this.channel.assertQueue('', { exclusive: true });
    await this.channel.bindQueue(queue.queue, 'domain-events', pattern);

    await this.channel.consume(queue.queue, async (msg) => {
      if (!msg) return;
      try {
        const event = JSON.parse(msg.content.toString());
        await handler(event);
        this.channel.ack(msg);
      } catch (error) {
        // requeue=false: the message is dropped, or routed to a
        // dead-letter exchange if the queue has one configured
        this.channel.nack(msg, false, false);
      }
    });
  }
}

12. Common Anti-Patterns

12.1 Overly Granular Events

Excessively fine-grained events increase system complexity. Design events at the level of business meaning (OrderPlaced), not at the level of individual field changes.

12.2 Tight Coupling Through Events

Including too much data in events creates dependency on the producer's internal structure. Events are public contracts and should be designed carefully.

12.3 Synchronous Event Processing

Calling other services synchronously in event handlers defeats the purpose of EDA. Maintain asynchronous processing.

12.4 Ignoring Idempotency

Failing to consider "at-least-once" delivery guarantees leads to duplicate processing and data inconsistency.

12.5 Missing Correlation IDs

Without correlation IDs, tracing a request across multiple services becomes nearly impossible. Always propagate correlation IDs through events.
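
A minimal propagation sketch, reusing the `metadata` field of the `DomainEvent` interface from section 1.3; the `correlation-id` and `causation-id` key names are common conventions, assumed here rather than taken from the article:

```typescript
import { randomUUID } from 'crypto';

// Copy the correlation ID from the event being handled onto every event
// produced while handling it; start a new one at the system boundary.
function propagateCorrelation(
  incoming: { eventId: string; metadata: Record<string, string> },
  outgoing: { metadata: Record<string, string> }
): void {
  outgoing.metadata['correlation-id'] =
    incoming.metadata['correlation-id'] ?? randomUUID();
  // causation-id points at the direct parent event,
  // correlation-id at the whole request flow
  outgoing.metadata['causation-id'] = incoming.eventId;
}
```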

12.6 No Dead Letter Queue

Without DLQ, failed messages are lost or block the queue. Always implement a DLQ strategy for handling poison messages.

13. EDA Pattern Selection Guide

| Scenario | Recommended Pattern |
| --- | --- |
| Audit trail required | Event Sourcing |
| Read/write ratio heavily skewed | CQRS |
| Multi-service transactions | Saga Pattern |
| DB + message broker atomicity | Outbox Pattern |
| High throughput event streaming | Kafka |
| Complex routing logic | RabbitMQ |
| Simple event notification | Event Notification + SNS/SQS |

14. Quiz

Q1: Why are snapshots needed in Event Sourcing?

Answer: Because Aggregate restoration time grows as events accumulate

In Event Sourcing, restoring the current state of an Aggregate requires replaying all events from the beginning. When events number in the thousands or tens of thousands, restoration time increases dramatically. Snapshots store the Aggregate state at a specific point, so only events after that point need to be replayed.
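
The replay shortcut can be sketched as follows; the aggregate shape (a running total) and the field names are illustrative, not from the article:

```typescript
// Illustrative shapes: a numeric-total aggregate stands in for a real Aggregate.
interface Snapshot {
  version: number;
  state: { total: number };
}
interface StoredEvent {
  version: number;
  amount: number;
}

// Restore = start from the snapshot (or empty state) and replay only the
// events recorded after the snapshot's version.
function restore(
  snapshot: Snapshot | null,
  events: StoredEvent[]
): { total: number; version: number } {
  const state = snapshot ? { ...snapshot.state } : { total: 0 };
  let version = snapshot?.version ?? 0;
  for (const e of events) {
    if (e.version <= version) continue; // already folded into the snapshot
    state.total += e.amount;
    version = e.version;
  }
  return { ...state, version };
}
```

With a snapshot at version 2, only events from version 3 onward are replayed, regardless of how many earlier events exist.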

Q2: What are the pros and cons of Choreography Saga vs Orchestration Saga?

Answer:

Choreography: Pros - Suitable for simple workflows, low coupling between services, no single point of failure. Cons - Difficult to track complex workflows, possible cyclic dependencies, harder to test.

Orchestration: Pros - Suitable for complex workflows, entire flow visible in one place, easier to test. Cons - Orchestrator becomes single point of failure, risk of logic concentration in orchestrator.

Q3: What is the "dual-write problem" that the Outbox pattern solves?

Answer: The inability to guarantee atomicity when writing to both a database and a message broker simultaneously

Saving data to DB and publishing events to Kafka are operations on different systems and cannot be wrapped in a single transaction. If DB save succeeds but Kafka publish fails, data inconsistency occurs. The Outbox pattern stores events in an outbox table within the same DB transaction, and a separate process reads and publishes them to the broker, guaranteeing atomicity.
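
The all-or-nothing write can be illustrated in memory; a sketch in which one synchronous method stands in for the single DB transaction (all names are illustrative):

```typescript
// In-memory stand-in for one DB transaction: the business write and the
// outbox record succeed or fail together.
class OutboxStore {
  orders: Array<{ id: string }> = [];
  outbox: Array<{ type: string; payload: string }> = [];

  placeOrder(id: string): void {
    // both writes happen in the same synchronous step, mirroring a single
    // transaction that inserts the order row and the outbox entry together
    this.orders.push({ id });
    this.outbox.push({ type: 'order.placed', payload: JSON.stringify({ id }) });
  }

  // a separate relay (polling publisher or CDC such as Debezium) would
  // read these rows and publish them to the broker
  drainOutbox(): Array<{ type: string; payload: string }> {
    return this.outbox.splice(0);
  }
}
```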

Q4: What does "Eventual Consistency" mean in CQRS, and how do you handle it?

Answer: Changes in the write model are not immediately reflected in the read model

In CQRS, changes from the Command side are propagated asynchronously to the Query side through events. Reading immediately after writing may show stale state. Handling strategies include applying optimistic updates in the UI, querying directly from the Command side after specific operations, or using version-based conditional queries.
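
The version-based strategy can be sketched as a polling read; the function name, parameters, and defaults are assumptions for illustration:

```typescript
// Poll the read model until its version has caught up with the version
// produced by the write, then return the view.
async function readAtLeast<T extends { version: number }>(
  fetchView: () => Promise<T>,
  minVersion: number,
  attempts = 5,
  delayMs = 50
): Promise<T> {
  for (let i = 0; i < attempts; i++) {
    const view = await fetchView();
    if (view.version >= minVersion) return view; // read model has caught up
    await new Promise((resolve) => setTimeout(resolve, delayMs));
  }
  throw new Error(`read model did not reach version ${minVersion}`);
}
```

After a command returns version N, the UI can call `readAtLeast(fetch, N)` to avoid showing stale state, at the cost of a bounded wait.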

Q5: Why is idempotency critical in event-driven systems?

Answer: Because "at-least-once" delivery guarantees mean the same event may be delivered multiple times

Most message brokers guarantee "at-least-once" delivery. Network issues, consumer restarts, etc. can cause the same event to be delivered multiple times. Without idempotency, the same payment could be processed twice, or inventory could be deducted multiple times. Idempotency keys, deduplication, and conditional updates prevent these issues.

15. References

  1. Martin Fowler - Event Sourcing concepts
  2. Microsoft - CQRS Pattern - CQRS pattern guide
  3. Microservices.io - Saga Pattern - Saga pattern details
  4. Debezium Documentation - CDC tool official docs
  5. KafkaJS Documentation - Node.js Kafka client
  6. Apache Kafka Documentation - Kafka official docs
  7. Confluent Schema Registry - schema management for Kafka
  8. EventStoreDB - Dedicated event store database
  9. Axon Framework - Java CQRS/ES framework
  10. NestJS CQRS - NestJS CQRS recipe
  11. Domain-Driven Design Reference - DDD reference
  12. Microservices Patterns by Chris Richardson - Microservices pattern catalog
  13. RabbitMQ Documentation - RabbitMQ official docs