
Event-Driven Architecture, CQRS, and Event Sourcing in Practice: Distributed System Design with Kafka and RabbitMQ


Introduction

As microservices architecture becomes ubiquitous, inter-service communication has become a critical factor in system scalability and resilience. Architectures built on synchronous REST calls couple services tightly, so a single service failure can trigger cascading failures that propagate through the entire system.

Event-Driven Architecture (EDA) is a representative pattern for solving this problem. Services publish events, and interested services subscribe to process them asynchronously. By combining CQRS (Command Query Responsibility Segregation) and Event Sourcing, you can independently scale read/write workloads and naturally obtain audit trails for all state changes.
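The decoupling that publish/subscribe provides can be sketched with a minimal in-memory event bus (an illustrative sketch only; `InMemoryEventBus` is a hypothetical helper, not a production bus):

```typescript
// Minimal in-memory event bus: publisher and subscribers only share an
// event type string, never each other's interfaces (illustrative sketch).
type Handler = (payload: unknown) => void

class InMemoryEventBus {
  private handlers = new Map<string, Handler[]>()

  subscribe(eventType: string, handler: Handler): void {
    const list = this.handlers.get(eventType) ?? []
    list.push(handler)
    this.handlers.set(eventType, list)
  }

  publish(eventType: string, payload: unknown): void {
    // Events with no subscribers are simply dropped: no consumer coupling
    for (const handler of this.handlers.get(eventType) ?? []) {
      handler(payload)
    }
  }
}

// Usage: a shipping concern reacts to an order event without a direct call
const bus = new InMemoryEventBus()
const shippedOrders: string[] = []
bus.subscribe('OrderPlaced', (p) => {
  shippedOrders.push((p as { orderId: string }).orderId)
})
bus.publish('OrderPlaced', { orderId: 'order-1' })
// shippedOrders is now ['order-1']
```

The key property: the publisher never learns who, if anyone, is listening. Real brokers add persistence, delivery guarantees, and retries on top of this shape.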

This article covers the core EDA patterns, CQRS and Event Sourcing implementation, a message broker comparison, the Saga pattern, and production failure cases with their remedies, all at the code level.

Event-Driven Architecture Core Patterns

Types of Events

Events in EDA are broadly classified into three categories:

| Type | Description | Example | Characteristics |
| --- | --- | --- | --- |
| Domain Event | Meaningful facts occurring in the business domain | OrderPlaced, PaymentCompleted | Past-tense naming, immutable |
| Integration Event | Events propagating across service boundaries | OrderShipped (to logistics) | Inter-service contract |
| Notification Event | Simple notifications (minimized data) | OrderStatusChanged | Receiver queries details separately |

EDA Communication Pattern Comparison

| Pattern | Coupling | Delivery Guarantee | Order Guarantee | Use Cases |
| --- | --- | --- | --- | --- |
| Pub/Sub | Loose | At-least-once | Not guaranteed | Notifications, cache invalidation |
| Event Streaming | Loose | At-least-once | Within partition | Log aggregation, real-time analytics |
| Event Sourcing | Self-referential | Persistent storage | Within aggregate | Audit trails, state reconstruction |
| Request-Reply | Strong | Synchronous | Request-response pair | When synchronous confirmation is needed |

TypeScript Event Base Structure

// Domain event base interface
interface DomainEvent {
  readonly eventId: string
  readonly eventType: string
  readonly aggregateId: string
  readonly aggregateType: string
  readonly version: number
  readonly timestamp: Date
  readonly metadata: EventMetadata
  readonly payload: Record<string, unknown>
}

interface EventMetadata {
  readonly correlationId: string
  readonly causationId: string
  readonly userId?: string
  readonly traceId?: string
}

// Concrete event example
class OrderPlacedEvent implements DomainEvent {
  readonly eventType = 'OrderPlaced'
  readonly aggregateType = 'Order'

  constructor(
    public readonly eventId: string,
    public readonly aggregateId: string,
    public readonly version: number,
    public readonly timestamp: Date,
    public readonly metadata: EventMetadata,
    public readonly payload: {
      customerId: string
      items: Array<{ productId: string; quantity: number; price: number }>
      totalAmount: number
      currency: string
    }
  ) {}
}

CQRS (Command Query Responsibility Segregation) Deep Dive

CQRS Core Principles

CQRS is a pattern that completely separates the Read (Query) model from the Write (Command) model. In traditional CRUD models, the same data model handles both reading and writing, but in CQRS, separate models optimized for each purpose are used.

// Command side - Write model
interface Command {
  readonly commandId: string
  readonly commandType: string
  readonly timestamp: Date
}

class PlaceOrderCommand implements Command {
  readonly commandType = 'PlaceOrder'

  constructor(
    public readonly commandId: string,
    public readonly timestamp: Date,
    public readonly customerId: string,
    public readonly items: Array<{
      productId: string
      quantity: number
      price: number
    }>,
    public readonly shippingAddress: string
  ) {}
}

// Command Handler
class PlaceOrderHandler {
  constructor(
    private readonly orderRepository: OrderRepository,
    private readonly eventBus: EventBus
  ) {}

  async handle(command: PlaceOrderCommand): Promise<string> {
    // Business validation
    await this.validateInventory(command.items)
    await this.validateCustomerCredit(command.customerId)

    // Create aggregate and publish events
    const order = Order.create(command.customerId, command.items, command.shippingAddress)

    await this.orderRepository.save(order)

    // Publish domain events
    for (const event of order.getUncommittedEvents()) {
      await this.eventBus.publish(event)
    }

    return order.id
  }

  private async validateInventory(
    items: Array<{ productId: string; quantity: number; price: number }>
  ): Promise<void> {
    // Inventory check logic
  }

  private async validateCustomerCredit(customerId: string): Promise<void> {
    // Customer credit check logic
  }
}

Query Side - Read Model

// Query side - Denormalized model optimized for reading
interface OrderSummaryReadModel {
  orderId: string
  customerName: string
  customerEmail: string
  orderDate: Date
  status: string
  totalAmount: number
  itemCount: number
  lastUpdated: Date
}

// Query Handler
class GetOrderSummaryHandler {
  constructor(private readonly readDb: ReadDatabase) {}

  async handle(orderId: string): Promise<OrderSummaryReadModel | null> {
    // Query denormalized data directly from read-only DB
    return this.readDb.query('SELECT * FROM order_summaries WHERE order_id = ?', [orderId])
  }
}

// Projection - Transform events into read models
class OrderSummaryProjection {
  constructor(private readonly readDb: ReadDatabase) {}

  async handleOrderPlaced(event: OrderPlacedEvent): Promise<void> {
    await this.readDb.execute(
      `INSERT INTO order_summaries (order_id, customer_name, order_date, status, total_amount, item_count, last_updated)
       VALUES (?, ?, ?, 'PLACED', ?, ?, ?)`,
      [
        event.aggregateId,
        event.payload.customerId, // placeholder: a real projection would resolve the customer's display name
        event.timestamp,
        event.payload.totalAmount,
        event.payload.items.length,
        new Date(),
      ]
    )
  }

  async handleOrderShipped(event: DomainEvent): Promise<void> {
    await this.readDb.execute(
      `UPDATE order_summaries SET status = 'SHIPPED', last_updated = ? WHERE order_id = ?`,
      [new Date(), event.aggregateId]
    )
  }
}

Event Sourcing Implementation

Event Store Design

In Event Sourcing, state is not stored directly; instead, a sequence of state change events is stored. Current state is reconstructed by replaying events in order.

import { Pool } from 'pg'

// Event Store interface
interface EventStore {
  append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>
  getEvents(aggregateId: string, fromVersion?: number): Promise<DomainEvent[]>
  getEventsByType(eventType: string, fromTimestamp?: Date): Promise<DomainEvent[]>
}

// Thrown when optimistic concurrency control detects a conflicting write
class ConcurrencyError extends Error {}

// PostgreSQL-based Event Store implementation
class PostgresEventStore implements EventStore {
  constructor(private readonly pool: Pool) {}

  async append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void> {
    const client = await this.pool.connect()
    try {
      await client.query('BEGIN')

      // Optimistic Concurrency Control
      const result = await client.query(
        'SELECT MAX(version) as current_version FROM events WHERE aggregate_id = $1',
        [aggregateId]
      )
      const currentVersion = result.rows[0]?.current_version ?? 0

      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, but current version is ${currentVersion}`
        )
      }

      // Batch insert events
      for (const event of events) {
        await client.query(
          `INSERT INTO events (event_id, aggregate_id, aggregate_type, event_type, version, payload, metadata, timestamp)
           VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
          [
            event.eventId,
            event.aggregateId,
            event.aggregateType,
            event.eventType,
            event.version,
            JSON.stringify(event.payload),
            JSON.stringify(event.metadata),
            event.timestamp,
          ]
        )
      }

      await client.query('COMMIT')
    } catch (error) {
      await client.query('ROLLBACK')
      throw error
    } finally {
      client.release()
    }
  }

  async getEvents(aggregateId: string, fromVersion: number = 0): Promise<DomainEvent[]> {
    const result = await this.pool.query(
      'SELECT * FROM events WHERE aggregate_id = $1 AND version > $2 ORDER BY version ASC',
      [aggregateId, fromVersion]
    )
    return result.rows.map(this.deserializeEvent)
  }

  async getEventsByType(eventType: string, fromTimestamp?: Date): Promise<DomainEvent[]> {
    const query = fromTimestamp
      ? 'SELECT * FROM events WHERE event_type = $1 AND timestamp > $2 ORDER BY timestamp ASC'
      : 'SELECT * FROM events WHERE event_type = $1 ORDER BY timestamp ASC'
    const params = fromTimestamp ? [eventType, fromTimestamp] : [eventType]
    const result = await this.pool.query(query, params)
    return result.rows.map(this.deserializeEvent)
  }

  private deserializeEvent(row: any): DomainEvent {
    return {
      eventId: row.event_id,
      eventType: row.event_type,
      aggregateId: row.aggregate_id,
      aggregateType: row.aggregate_type,
      version: row.version,
      timestamp: row.timestamp,
      metadata: JSON.parse(row.metadata),
      payload: JSON.parse(row.payload),
    }
  }
}

Aggregate and Event Replay

// Event Sourced Aggregate
abstract class EventSourcedAggregate {
  private uncommittedEvents: DomainEvent[] = []
  private _version: number = 0

  abstract get id(): string

  get version(): number {
    return this._version
  }

  protected apply(event: DomainEvent): void {
    this.when(event)
    this._version = event.version
    this.uncommittedEvents.push(event)
  }

  protected abstract when(event: DomainEvent): void

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents]
  }

  clearUncommittedEvents(): void {
    this.uncommittedEvents = []
  }

  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.when(event)
      this._version = event.version
    }
  }

  // Snapshot hooks used by the snapshot-based repository; override per aggregate
  toSnapshot(): unknown {
    throw new Error(`toSnapshot not implemented for ${this.constructor.name}`)
  }

  restoreFromSnapshot(_state: unknown): void {
    throw new Error(`restoreFromSnapshot not implemented for ${this.constructor.name}`)
  }
}

// Order Aggregate
class Order extends EventSourcedAggregate {
  private _id: string = ''
  private _customerId: string = ''
  private _status: OrderStatus = OrderStatus.DRAFT
  private _items: OrderItem[] = []
  private _totalAmount: number = 0

  get id(): string {
    return this._id
  }

  static create(
    customerId: string,
    items: Array<{ productId: string; quantity: number; price: number }>,
    shippingAddress: string
  ): Order {
    const order = new Order()
    const orderId = generateUUID()
    const totalAmount = items.reduce((sum, item) => sum + item.price * item.quantity, 0)

    order.apply({
      eventId: generateUUID(),
      eventType: 'OrderPlaced',
      aggregateId: orderId,
      aggregateType: 'Order',
      version: 1,
      timestamp: new Date(),
      metadata: { correlationId: generateUUID(), causationId: generateUUID() },
      payload: { customerId, items, totalAmount, shippingAddress, currency: 'KRW' },
    })

    return order
  }

  confirm(): void {
    if (this._status !== OrderStatus.PLACED) {
      throw new Error('Order can only be confirmed when in PLACED status')
    }

    this.apply({
      eventId: generateUUID(),
      eventType: 'OrderConfirmed',
      aggregateId: this._id,
      aggregateType: 'Order',
      version: this.version + 1,
      timestamp: new Date(),
      metadata: { correlationId: generateUUID(), causationId: generateUUID() },
      payload: { confirmedAt: new Date().toISOString() },
    })
  }

  protected when(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderPlaced':
        this._id = event.aggregateId
        this._customerId = event.payload.customerId as string
        this._status = OrderStatus.PLACED
        this._items = event.payload.items as OrderItem[]
        this._totalAmount = event.payload.totalAmount as number
        break
      case 'OrderConfirmed':
        this._status = OrderStatus.CONFIRMED
        break
      case 'OrderShipped':
        this._status = OrderStatus.SHIPPED
        break
      case 'OrderCancelled':
        this._status = OrderStatus.CANCELLED
        break
    }
  }
}

enum OrderStatus {
  DRAFT = 'DRAFT',
  PLACED = 'PLACED',
  CONFIRMED = 'CONFIRMED',
  SHIPPED = 'SHIPPED',
  CANCELLED = 'CANCELLED',
}
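Replaying events to reconstruct state, as `loadFromHistory` does above, is effectively a left fold over the event stream. A self-contained sketch with simplified event shapes (not the full `DomainEvent` interface):

```typescript
// Replay as a fold: current state is a pure function of the event history.
interface SimpleEvent {
  eventType: string
  version: number
}

interface OrderState {
  status: string
  version: number
}

function replay(events: SimpleEvent[]): OrderState {
  return events.reduce<OrderState>(
    (state, event) => {
      switch (event.eventType) {
        case 'OrderPlaced':
          return { status: 'PLACED', version: event.version }
        case 'OrderConfirmed':
          return { ...state, status: 'CONFIRMED', version: event.version }
        case 'OrderShipped':
          return { ...state, status: 'SHIPPED', version: event.version }
        default:
          return state // unknown event types are ignored (forward compatibility)
      }
    },
    { status: 'DRAFT', version: 0 }
  )
}

const current = replay([
  { eventType: 'OrderPlaced', version: 1 },
  { eventType: 'OrderConfirmed', version: 2 },
])
// current: { status: 'CONFIRMED', version: 2 }
```

Because the fold is pure, the same history always yields the same state, which is what makes audit trails and point-in-time reconstruction possible.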

Python Event Store Implementation

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Protocol
import json

import asyncpg


class ConcurrencyError(Exception):
    """Raised when optimistic concurrency control detects a conflicting write."""


@dataclass(frozen=True)
class DomainEvent:
    event_id: str
    event_type: str
    aggregate_id: str
    aggregate_type: str
    version: int
    timestamp: datetime
    payload: dict[str, Any]
    metadata: dict[str, str] = field(default_factory=dict)


class EventStore(Protocol):
    async def append(
        self,
        aggregate_id: str,
        events: list[DomainEvent],
        expected_version: int,
    ) -> None: ...

    async def get_events(
        self, aggregate_id: str, from_version: int = 0
    ) -> list[DomainEvent]: ...


class PostgresEventStore:
    def __init__(self, pool: asyncpg.Pool):
        self._pool = pool

    async def append(
        self,
        aggregate_id: str,
        events: list[DomainEvent],
        expected_version: int,
    ) -> None:
        async with self._pool.acquire() as conn:
            async with conn.transaction():
                # Optimistic Concurrency Control
                row = await conn.fetchrow(
                    "SELECT MAX(version) AS cur FROM events WHERE aggregate_id = $1",
                    aggregate_id,
                )
                current = row["cur"] or 0
                if current != expected_version:
                    raise ConcurrencyError(
                        f"Expected {expected_version}, got {current}"
                    )

                for event in events:
                    await conn.execute(
                        """
                        INSERT INTO events
                            (event_id, aggregate_id, aggregate_type,
                             event_type, version, payload, metadata, timestamp)
                        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                        """,
                        event.event_id,
                        event.aggregate_id,
                        event.aggregate_type,
                        event.event_type,
                        event.version,
                        json.dumps(event.payload),
                        json.dumps(event.metadata),
                        event.timestamp,
                    )

    async def get_events(
        self, aggregate_id: str, from_version: int = 0
    ) -> list[DomainEvent]:
        async with self._pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT * FROM events
                WHERE aggregate_id = $1 AND version > $2
                ORDER BY version ASC
                """,
                aggregate_id,
                from_version,
            )
            return [self._deserialize(row) for row in rows]

    @staticmethod
    def _deserialize(row) -> DomainEvent:
        return DomainEvent(
            event_id=row["event_id"],
            event_type=row["event_type"],
            aggregate_id=row["aggregate_id"],
            aggregate_type=row["aggregate_type"],
            version=row["version"],
            timestamp=row["timestamp"],
            payload=json.loads(row["payload"]),
            metadata=json.loads(row["metadata"]),
        )

Message Broker Comparison: Kafka vs RabbitMQ vs NATS

Key Characteristics Comparison

| Item | Apache Kafka | RabbitMQ | NATS JetStream |
| --- | --- | --- | --- |
| Model | Distributed log (append-only) | Message queue (push-based) | Streaming (pull/push) |
| Message retention | Persisted for a configured period | Deleted after consumption (default) | Retained for a configured period |
| Order guarantee | Within partition | Per queue | Within stream |
| Throughput | Millions/sec | Tens of thousands/sec | Hundreds of thousands/sec |
| Latency | ms-level (batched) | µs-level (single message) | µs-level |
| Consumer groups | Native support | Competing-consumer pattern | Native support |
| Replay | Free seek by offset | Limited (dead letter) | Seek by sequence |
| Operational complexity | High (ZooKeeper/KRaft) | Medium | Low |
| Best for | Event streaming, log aggregation | Task queues, RPC | Lightweight messaging, IoT |

Kafka Producer/Consumer Example

import { Kafka, logLevel } from 'kafkajs'

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
  logLevel: logLevel.WARN,
  retry: {
    initialRetryTime: 100,
    retries: 8,
  },
})

// Producer - Event publishing
class KafkaEventPublisher {
  private producer = kafka.producer({
    idempotent: true, // Idempotency guarantee; kafkajs allows at most 1 in-flight request in this mode
    maxInFlightRequests: 1,
    transactionalId: 'order-service-producer',
  })

  async publish(event: DomainEvent): Promise<void> {
    await this.producer.send({
      topic: `events.${event.aggregateType.toLowerCase()}`,
      messages: [
        {
          key: event.aggregateId, // Partition key = Aggregate ID
          value: JSON.stringify(event),
          headers: {
            'event-type': event.eventType,
            'correlation-id': event.metadata.correlationId,
            'content-type': 'application/json',
          },
        },
      ],
    })
  }

  async publishBatch(events: DomainEvent[]): Promise<void> {
    const transaction = await this.producer.transaction()
    try {
      for (const event of events) {
        await transaction.send({
          topic: `events.${event.aggregateType.toLowerCase()}`,
          messages: [
            {
              key: event.aggregateId,
              value: JSON.stringify(event),
              headers: {
                'event-type': event.eventType,
                'correlation-id': event.metadata.correlationId,
              },
            },
          ],
        })
      }
      await transaction.commit()
    } catch (error) {
      await transaction.abort()
      throw error
    }
  }
}

// Consumer - Event consumption (with idempotency guarantee)
class KafkaEventConsumer {
  private consumer = kafka.consumer({ groupId: 'projection-service' })
  // In-memory for illustration only; use Redis or a DB in production
  private processedEvents = new Set<string>()

  async start(): Promise<void> {
    await this.consumer.subscribe({
      topics: ['events.order'],
      fromBeginning: false,
    })

    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        const event: DomainEvent = JSON.parse(message.value!.toString())

        // Idempotency check - skip already processed events
        if (await this.isAlreadyProcessed(event.eventId)) {
          console.log(`Skipping duplicate event: ${event.eventId}`)
          return
        }

        await this.handleEvent(event)
        await this.markAsProcessed(event.eventId)
      },
    })
  }

  private async isAlreadyProcessed(eventId: string): Promise<boolean> {
    // Check processing status in Redis or DB
    return this.processedEvents.has(eventId)
  }

  private async markAsProcessed(eventId: string): Promise<void> {
    this.processedEvents.add(eventId)
  }

  private async handleEvent(event: DomainEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderPlaced':
        await this.handleOrderPlaced(event)
        break
      case 'OrderConfirmed':
        await this.handleOrderConfirmed(event)
        break
    }
  }

  private async handleOrderPlaced(event: DomainEvent): Promise<void> {
    console.log(`Processing OrderPlaced for ${event.aggregateId}`)
  }

  private async handleOrderConfirmed(event: DomainEvent): Promise<void> {
    console.log(`Processing OrderConfirmed for ${event.aggregateId}`)
  }
}

Saga Pattern: Distributed Transaction Management

Orchestration Saga vs Choreography Saga

| Item | Orchestration | Choreography |
| --- | --- | --- |
| Control | Central orchestrator coordinates | Each service reacts autonomously |
| Coupling | Depends on orchestrator | Loosely coupled between services |
| Complexity | Concentrated in orchestrator | Distributed across services |
| Tracking | Central state visibility | Distributed state tracking needed |
| Error handling | Central compensating transactions | Each service publishes compensation events |
| Best for | Complex workflows (5+ steps) | Simple workflows (3 or fewer steps) |

Orchestration Saga Implementation

// Saga state machine
enum SagaStatus {
  STARTED = 'STARTED',
  ORDER_CREATED = 'ORDER_CREATED',
  PAYMENT_PROCESSED = 'PAYMENT_PROCESSED',
  INVENTORY_RESERVED = 'INVENTORY_RESERVED',
  COMPLETED = 'COMPLETED',
  COMPENSATING = 'COMPENSATING',
  FAILED = 'FAILED',
}

interface SagaStep {
  name: string
  execute: (context: SagaContext) => Promise<void>
  compensate: (context: SagaContext) => Promise<void>
}

interface SagaContext {
  sagaId: string
  orderId: string
  customerId: string
  items: Array<{ productId: string; quantity: number; price: number }>
  paymentId?: string
  reservationId?: string
  [key: string]: unknown
}

class OrderSagaOrchestrator {
  private steps: SagaStep[] = []
  // Kept on the instance for brevity; in production, completed-step state must
  // be persisted per saga instance, not held in orchestrator memory
  private completedSteps: SagaStep[] = []

  constructor(
    private readonly sagaRepository: SagaRepository,
    private readonly eventBus: EventBus
  ) {
    this.steps = [
      {
        name: 'CreateOrder',
        execute: async (ctx) => {
          await this.eventBus.publish({
            eventType: 'CreateOrderRequested',
            aggregateId: ctx.orderId,
            payload: { customerId: ctx.customerId, items: ctx.items },
          } as DomainEvent)
        },
        compensate: async (ctx) => {
          await this.eventBus.publish({
            eventType: 'CancelOrderRequested',
            aggregateId: ctx.orderId,
            payload: { reason: 'Saga compensation' },
          } as DomainEvent)
        },
      },
      {
        name: 'ProcessPayment',
        execute: async (ctx) => {
          const totalAmount = ctx.items.reduce((sum, item) => sum + item.price * item.quantity, 0)
          await this.eventBus.publish({
            eventType: 'ProcessPaymentRequested',
            aggregateId: ctx.orderId,
            payload: { customerId: ctx.customerId, amount: totalAmount },
          } as DomainEvent)
        },
        compensate: async (ctx) => {
          await this.eventBus.publish({
            eventType: 'RefundPaymentRequested',
            aggregateId: ctx.orderId,
            payload: { paymentId: ctx.paymentId },
          } as DomainEvent)
        },
      },
      {
        name: 'ReserveInventory',
        execute: async (ctx) => {
          await this.eventBus.publish({
            eventType: 'ReserveInventoryRequested',
            aggregateId: ctx.orderId,
            payload: { items: ctx.items },
          } as DomainEvent)
        },
        compensate: async (ctx) => {
          await this.eventBus.publish({
            eventType: 'ReleaseInventoryRequested',
            aggregateId: ctx.orderId,
            payload: { reservationId: ctx.reservationId },
          } as DomainEvent)
        },
      },
    ]
  }

  async start(context: SagaContext): Promise<void> {
    await this.sagaRepository.save(context.sagaId, SagaStatus.STARTED, context)
    await this.executeNextStep(context, 0)
  }

  private async executeNextStep(context: SagaContext, stepIndex: number): Promise<void> {
    if (stepIndex >= this.steps.length) {
      await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.COMPLETED)
      return
    }

    const step = this.steps[stepIndex]
    try {
      await step.execute(context)
      this.completedSteps.push(step)
      await this.executeNextStep(context, stepIndex + 1)
    } catch (error) {
      console.error(`Saga step '${step.name}' failed:`, error)
      await this.compensate(context)
    }
  }

  private async compensate(context: SagaContext): Promise<void> {
    await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.COMPENSATING)

    // Compensate completed steps in reverse order
    for (const step of [...this.completedSteps].reverse()) {
      try {
        await step.compensate(context)
      } catch (error) {
        console.error(`Compensation for '${step.name}' failed:`, error)
        // Manual intervention needed on compensation failure - send alert
      }
    }

    await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.FAILED)
  }
}
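The execute-forward, compensate-backward flow of the orchestrator can be exercised with plain in-memory steps (a simplified sketch; the failing inventory step is contrived for illustration):

```typescript
// Mini saga run: the third step fails, so the two completed steps are
// compensated in reverse order.
interface MiniStep {
  name: string
  execute: () => void
  compensate: () => void
}

function runSaga(steps: MiniStep[]): { status: 'COMPLETED' | 'FAILED'; log: string[] } {
  const log: string[] = []
  const completed: MiniStep[] = []
  for (const step of steps) {
    try {
      step.execute()
      log.push(`execute:${step.name}`)
      completed.push(step)
    } catch {
      // Compensate completed steps in reverse order, mirroring the orchestrator
      for (const done of [...completed].reverse()) {
        done.compensate()
        log.push(`compensate:${done.name}`)
      }
      return { status: 'FAILED', log }
    }
  }
  return { status: 'COMPLETED', log }
}

const outcome = runSaga([
  { name: 'CreateOrder', execute: () => {}, compensate: () => {} },
  { name: 'ProcessPayment', execute: () => {}, compensate: () => {} },
  {
    name: 'ReserveInventory',
    execute: () => {
      throw new Error('out of stock')
    },
    compensate: () => {},
  },
])
// outcome.log: ['execute:CreateOrder', 'execute:ProcessPayment',
//               'compensate:ProcessPayment', 'compensate:CreateOrder']
```

The never-executed step (`ReserveInventory`) is deliberately not compensated: only steps that completed have side effects to undo.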

Snapshot Strategy: Event Replay Optimization

As the number of events grows, replay time increases. Snapshots solve this by storing the aggregate state at a specific point in time.

// Snapshot-based aggregate loading
class EventSourcedRepository<T extends EventSourcedAggregate> {
  private readonly SNAPSHOT_INTERVAL = 50 // Snapshot every 50 events

  constructor(
    private readonly eventStore: EventStore,
    private readonly snapshotStore: SnapshotStore,
    private readonly factory: () => T
  ) {}

  async load(aggregateId: string): Promise<T> {
    const aggregate = this.factory()

    // 1. Load latest snapshot
    const snapshot = await this.snapshotStore.getLatest(aggregateId)

    if (snapshot) {
      aggregate.restoreFromSnapshot(snapshot.state)
      // 2. Replay only events after snapshot
      const events = await this.eventStore.getEvents(aggregateId, snapshot.version)
      aggregate.loadFromHistory(events)
    } else {
      // 3. Full event replay
      const events = await this.eventStore.getEvents(aggregateId)
      aggregate.loadFromHistory(events)
    }

    return aggregate
  }

  async save(aggregate: T): Promise<void> {
    const events = aggregate.getUncommittedEvents()
    await this.eventStore.append(aggregate.id, events, aggregate.version - events.length)

    // Check snapshot creation condition
    if (aggregate.version % this.SNAPSHOT_INTERVAL === 0) {
      await this.snapshotStore.save({
        aggregateId: aggregate.id,
        version: aggregate.version,
        state: aggregate.toSnapshot(),
        timestamp: new Date(),
      })
    }

    aggregate.clearUncommittedEvents()
  }
}

Operational Considerations

Critical Warnings

  1. Event schema changes are more dangerous than DB migrations: Events in the Event Store are immutable, so schema changes break past event replay. Backward compatibility must always be maintained.

  2. CQRS adoption at least doubles complexity: The synchronization delay (eventual consistency) between read and write models must be handled in both UI and business logic.

  3. Saga compensating transaction failures require manual intervention: Compensating transactions can also fail, and in such cases, Dead Letter Queues and manual recovery processes are essential.

  4. Event order inversion destroys data consistency: If Kafka partitioning keys are misconfigured, events from the same aggregate are distributed across different partitions, causing order reversal.

  5. Event Store infinite growth must be managed: Without snapshot and archiving strategies, the Event Store grows infinitely and replay performance degrades sharply.
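On warning 2: one common way to soften eventual consistency at an API boundary is to poll the read model briefly after a write before falling back to a "still processing" UI state. A sketch (`waitForProjection` is a hypothetical helper, not from this article):

```typescript
// Briefly poll the read model after a write until the projection catches up.
async function waitForProjection<T>(
  query: () => Promise<T | null>,
  attempts = 10,
  delayMs = 50
): Promise<T | null> {
  for (let i = 0; i < attempts; i++) {
    const result = await query()
    if (result !== null) return result
    await new Promise((resolve) => setTimeout(resolve, delayMs))
  }
  return null // projection still behind: let the caller degrade gracefully
}

// Usage with a fake read model that becomes consistent on the third poll
let polls = 0
const pending = waitForProjection(
  async () => (++polls >= 3 ? { status: 'PLACED' } : null),
  10,
  1
)
```

Bounded polling keeps the API responsive while hiding typical projection lag; anything slower than the bound surfaces as an explicit "processing" state rather than an error.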

Failure Cases and Recovery Procedures

Case 1: Event Order Inversion

Symptom: OrderShipped event arrives before OrderPlaced, causing projection failure.

Cause: Kafka partitioning key was not set, causing events to be distributed across different partitions.

Recovery:

# 1. Reset consumer group offsets
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group projection-service \
  --topic events.order \
  --reset-offsets --to-earliest --execute

# 2. Reset projection table
psql -c "TRUNCATE TABLE order_summaries;"

# 3. Restart projection service (full replay)
kubectl rollout restart deployment/projection-service

Prevention: Always use aggregate ID as the Kafka partitioning key.
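Why a stable key prevents inversion: the partitioner computes `hash(key) % partitionCount`, so every event of one aggregate lands on the same partition, where Kafka preserves order. A simplified illustration (toy hash only; Kafka's default partitioner actually uses murmur2):

```typescript
// partition = hash(key) % partitionCount: the same aggregate ID always maps
// to the same partition, so per-aggregate order is preserved.
function partitionFor(key: string, partitionCount: number): number {
  let hash = 5381
  for (const ch of key) {
    hash = ((hash * 33) ^ ch.charCodeAt(0)) >>> 0 // djb2-style, kept in 32 bits
  }
  return hash % partitionCount
}

const first = partitionFor('order-42', 12)
const second = partitionFor('order-42', 12)
// first === second on every call: all events for order-42 share one partition
```

With no key (or a random key), consecutive events for the same order are sprayed across partitions, and consumers in a group may see them in any relative order.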

Case 2: Duplicate Event Processing

Symptom: Orders created in duplicate, or payments processed twice.

Cause: After consumer failure and restart, already processed messages are consumed again (at-least-once characteristic).

Recovery:

// Idempotency key table-based duplicate prevention
class IdempotencyGuard {
  constructor(private readonly db: Database) {}

  async executeOnce<T>(idempotencyKey: string, operation: () => Promise<T>): Promise<T | null> {
    try {
      // Prevent duplicate insertion with a UNIQUE constraint on event_id.
      // NOTE: run this insert and the operation in one transaction in production,
      // otherwise a failed operation leaves the event marked as processed.
      await this.db.execute('INSERT INTO processed_events (event_id, processed_at) VALUES (?, ?)', [
        idempotencyKey,
        new Date(),
      ])
    } catch (error) {
      // Already processed event
      console.log(`Event ${idempotencyKey} already processed, skipping`)
      return null
    }

    return operation()
  }
}
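A synchronous, in-memory stand-in shows the same UNIQUE-constraint trick end to end (`InMemoryProcessedEvents` is hypothetical, mimicking an INSERT that throws on a duplicate key):

```typescript
// Set-backed stand-in for the processed_events table: insert throws on a
// duplicate key, just as a UNIQUE-constrained INSERT would.
class InMemoryProcessedEvents {
  private seen = new Set<string>()

  insert(eventId: string): void {
    if (this.seen.has(eventId)) throw new Error(`duplicate key: ${eventId}`)
    this.seen.add(eventId)
  }
}

function executeOnceSync<T>(
  store: InMemoryProcessedEvents,
  eventId: string,
  operation: () => T
): T | null {
  try {
    store.insert(eventId) // claim the key; throws if already processed
  } catch {
    return null // duplicate delivery: skip
  }
  return operation()
}

// Usage: the second delivery of evt-1 is skipped, so the charge runs once
const store = new InMemoryProcessedEvents()
let charges = 0
const firstRun = executeOnceSync(store, 'evt-1', () => ++charges)
const secondRun = executeOnceSync(store, 'evt-1', () => ++charges)
// firstRun === 1, secondRun === null, charges === 1
```

The pattern converts at-least-once delivery into effectively-once processing: the broker may redeliver, but the side effect executes a single time per event ID.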

Case 3: Schema Evolution Failure

Symptom: New version event handler cannot deserialize past events, causing projection to stop.

Cause: Required field added to events while past events lack that field.

Recovery:

// Event Upcaster pattern
class EventUpcaster {
  private upcasters: Map<string, (event: any) => DomainEvent> = new Map()

  register(eventType: string, fromVersion: number, upcaster: (event: any) => any): void {
    this.upcasters.set(`${eventType}:v${fromVersion}`, upcaster)
  }

  upcast(event: any): DomainEvent {
    const key = `${event.eventType}:v${event.schemaVersion || 1}`
    const upcaster = this.upcasters.get(key)

    if (upcaster) {
      return this.upcast(upcaster(event)) // Recursively upcast to latest version
    }

    return event
  }
}

// Usage example: v1 -> v2 conversion
const upcaster = new EventUpcaster()
upcaster.register('OrderPlaced', 1, (event) => ({
  ...event,
  schemaVersion: 2,
  payload: {
    ...event.payload,
    currency: event.payload.currency || 'KRW', // Add default value
    shippingMethod: 'STANDARD', // Set default for new field
  },
}))

Production Checklist

Event Store

  • Set aggregate_id + version unique index on events table
  • Verify Optimistic Concurrency Control implementation
  • Configure snapshot strategy (every 50-100 events)
  • Establish event archiving policy (move to cold storage)
  • Verify Event Store backup and recovery procedures

CQRS

  • Automate Read Model rebuild process
  • Monitor read/write DB separation and replication lag
  • Set up alerts and auto-retry for projection failures
  • Handle Eventual Consistency (optimistic updates in UI)

Message Broker

  • Kafka: Set partitioning key to aggregate ID
  • Monitor consumer group lag (Burrow or Kafka Exporter)
  • Configure and monitor Dead Letter Queue (DLQ)
  • Decide message serialization format (Avro + Schema Registry recommended)
  • Set broker cluster replication factor to 3 or more

Saga

  • Define and test compensating transactions
  • Verify Saga state persistence (DB storage)
  • Set Saga timeout (prevent infinite wait)
  • Document manual recovery process for compensation failures

Event Schema

  • Introduce Schema Registry (Confluent Schema Registry, AWS Glue)
  • Automate backward compatibility verification (include in CI pipeline)
  • Implement Event Upcaster
  • Maintain event catalog documentation

Monitoring

  • Collect event processing latency metrics
  • Set up Consumer Group lag alerts
  • Build event processing failure rate dashboard
  • Implement Correlation ID-based end-to-end tracing
