Event-Driven Architecture + CQRS + Event Sourcing Practical Implementation: Distributed System Design with Kafka/RabbitMQ

Author: Youngju Kim (@fjvbn20031)
- Introduction
- Event-Driven Architecture Core Patterns
- CQRS (Command Query Responsibility Segregation) Deep Dive
- Event Sourcing Implementation
- Message Broker Comparison: Kafka vs RabbitMQ vs NATS
- Saga Pattern: Distributed Transaction Management
- Snapshot Strategy: Event Replay Optimization
- Operational Considerations
- Failure Cases and Recovery Procedures
- Production Checklist
- References

Introduction
As microservices architecture becomes ubiquitous, inter-service communication has become a critical factor determining system scalability and resilience. Synchronous REST call-based architecture creates strong coupling between services, and a single service failure triggers cascading failures that propagate throughout the entire system.
Event-Driven Architecture (EDA) is a representative pattern for solving this problem. Services publish events, and interested services subscribe to process them asynchronously. By combining CQRS (Command Query Responsibility Segregation) and Event Sourcing, you can independently scale read/write workloads and naturally obtain audit trails for all state changes.
This article covers EDA core patterns, CQRS and Event Sourcing implementation, message broker comparison, Saga pattern, and production failure cases with responses at the code level.
Event-Driven Architecture Core Patterns
Types of Events
Events in EDA are broadly classified into three categories:
| Type | Description | Example | Characteristics |
|---|---|---|---|
| Domain Event | Meaningful facts occurring in the business domain | OrderPlaced, PaymentCompleted | Past tense naming, immutable |
| Integration Event | Events propagating across service boundaries | OrderShipped (to logistics) | Inter-service contract |
| Notification Event | Simple notifications (minimized data) | OrderStatusChanged | Receiver queries details separately |
EDA Communication Pattern Comparison
| Pattern | Coupling | Delivery Guarantee | Order Guarantee | Use Cases |
|---|---|---|---|---|
| Pub/Sub | Loose | At-least-once | Not guaranteed | Notifications, cache invalidation |
| Event Streaming | Loose | At-least-once | Within partition | Log aggregation, real-time analytics |
| Event Sourcing | N/A (persistence pattern, not a channel) | Persistent storage | Within aggregate | Audit trails, state reconstruction |
| Request-Reply | Strong | Synchronous | Request-response pair | When synchronous confirmation needed |
TypeScript Event Base Structure
// Domain event base interface
interface DomainEvent {
readonly eventId: string
readonly eventType: string
readonly aggregateId: string
readonly aggregateType: string
readonly version: number
readonly timestamp: Date
readonly metadata: EventMetadata
readonly payload: Record<string, unknown>
}
interface EventMetadata {
readonly correlationId: string
readonly causationId: string
readonly userId?: string
readonly traceId?: string
}
// Concrete event example
class OrderPlacedEvent implements DomainEvent {
readonly eventType = 'OrderPlaced'
readonly aggregateType = 'Order'
constructor(
public readonly eventId: string,
public readonly aggregateId: string,
public readonly version: number,
public readonly timestamp: Date,
public readonly metadata: EventMetadata,
public readonly payload: {
customerId: string
items: Array<{ productId: string; quantity: number; price: number }>
totalAmount: number
currency: string
}
) {}
}
CQRS (Command Query Responsibility Segregation) Deep Dive
CQRS Core Principles
CQRS is a pattern that completely separates the Read (Query) model from the Write (Command) model. In traditional CRUD models, the same data model handles both reading and writing, but in CQRS, separate models optimized for each purpose are used.
// Command side - Write model
interface Command {
readonly commandId: string
readonly commandType: string
readonly timestamp: Date
}
class PlaceOrderCommand implements Command {
readonly commandType = 'PlaceOrder'
constructor(
public readonly commandId: string,
public readonly timestamp: Date,
public readonly customerId: string,
public readonly items: Array<{
productId: string
quantity: number
price: number
}>,
public readonly shippingAddress: string
) {}
}
// Command Handler
class PlaceOrderHandler {
constructor(
private readonly orderRepository: OrderRepository,
private readonly eventBus: EventBus
) {}
async handle(command: PlaceOrderCommand): Promise<string> {
// Business validation
await this.validateInventory(command.items)
await this.validateCustomerCredit(command.customerId)
// Create aggregate and publish events
const order = Order.create(command.customerId, command.items, command.shippingAddress)
await this.orderRepository.save(order)
// Publish domain events
for (const event of order.getUncommittedEvents()) {
await this.eventBus.publish(event)
}
return order.id
}
private async validateInventory(
items: Array<{ productId: string; quantity: number; price: number }>
): Promise<void> {
// Inventory check logic
}
private async validateCustomerCredit(customerId: string): Promise<void> {
// Customer credit check logic
}
}
Query Side - Read Model
// Query side - Denormalized model optimized for reading
interface OrderSummaryReadModel {
orderId: string
customerName: string
customerEmail: string
orderDate: Date
status: string
totalAmount: number
itemCount: number
lastUpdated: Date
}
// Query Handler
class GetOrderSummaryHandler {
constructor(private readonly readDb: ReadDatabase) {}
async handle(orderId: string): Promise<OrderSummaryReadModel | null> {
// Query denormalized data directly from read-only DB
return this.readDb.query('SELECT * FROM order_summaries WHERE order_id = ?', [orderId])
}
}
// Projection - Transform events into read models
class OrderSummaryProjection {
constructor(private readonly readDb: ReadDatabase) {}
async handleOrderPlaced(event: OrderPlacedEvent): Promise<void> {
// NOTE: the event carries only customerId; a real projection would resolve the
// customer's name (e.g. from a customer read model) before writing customer_name.
await this.readDb.execute(
`INSERT INTO order_summaries (order_id, customer_name, order_date, status, total_amount, item_count, last_updated)
VALUES (?, ?, ?, 'PLACED', ?, ?, ?)`,
[
event.aggregateId,
event.payload.customerId, // placeholder until the name lookup is implemented
event.timestamp,
event.payload.totalAmount,
event.payload.items.length,
new Date(),
]
)
}
async handleOrderShipped(event: DomainEvent): Promise<void> {
await this.readDb.execute(
`UPDATE order_summaries SET status = 'SHIPPED', last_updated = ? WHERE order_id = ?`,
[new Date(), event.aggregateId]
)
}
}
Event Sourcing Implementation
Event Store Design
In Event Sourcing, state is not stored directly; instead, a sequence of state change events is stored. Current state is reconstructed by replaying events in order.
// Event Store interface
interface EventStore {
append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>
getEvents(aggregateId: string, fromVersion?: number): Promise<DomainEvent[]>
getEventsByType(eventType: string, fromTimestamp?: Date): Promise<DomainEvent[]>
}
// PostgreSQL-based Event Store implementation
class PostgresEventStore implements EventStore {
constructor(private readonly pool: Pool) {}
async append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void> {
const client = await this.pool.connect()
try {
await client.query('BEGIN')
// Optimistic Concurrency Control
const result = await client.query(
'SELECT MAX(version) as current_version FROM events WHERE aggregate_id = $1',
[aggregateId]
)
const currentVersion = result.rows[0]?.current_version ?? 0
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, but current version is ${currentVersion}`
)
}
// Batch insert events
for (const event of events) {
await client.query(
`INSERT INTO events (event_id, aggregate_id, aggregate_type, event_type, version, payload, metadata, timestamp)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
[
event.eventId,
event.aggregateId,
event.aggregateType,
event.eventType,
event.version,
JSON.stringify(event.payload),
JSON.stringify(event.metadata),
event.timestamp,
]
)
}
await client.query('COMMIT')
} catch (error) {
await client.query('ROLLBACK')
throw error
} finally {
client.release()
}
}
async getEvents(aggregateId: string, fromVersion: number = 0): Promise<DomainEvent[]> {
const result = await this.pool.query(
'SELECT * FROM events WHERE aggregate_id = $1 AND version > $2 ORDER BY version ASC',
[aggregateId, fromVersion]
)
return result.rows.map(this.deserializeEvent)
}
async getEventsByType(eventType: string, fromTimestamp?: Date): Promise<DomainEvent[]> {
const query = fromTimestamp
? 'SELECT * FROM events WHERE event_type = $1 AND timestamp > $2 ORDER BY timestamp ASC'
: 'SELECT * FROM events WHERE event_type = $1 ORDER BY timestamp ASC'
const params = fromTimestamp ? [eventType, fromTimestamp] : [eventType]
const result = await this.pool.query(query, params)
return result.rows.map(this.deserializeEvent)
}
private deserializeEvent(row: any): DomainEvent {
return {
eventId: row.event_id,
eventType: row.event_type,
aggregateId: row.aggregate_id,
aggregateType: row.aggregate_type,
version: row.version,
timestamp: row.timestamp,
// node-postgres returns JSONB columns as already-parsed objects; guard both cases
metadata: typeof row.metadata === 'string' ? JSON.parse(row.metadata) : row.metadata,
payload: typeof row.payload === 'string' ? JSON.parse(row.payload) : row.payload,
}
}
}
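For reference, the store above assumes an `events` table whose unique constraint backs the optimistic concurrency check. A minimal PostgreSQL schema could look like this (column types are one reasonable choice, not prescriptive):

```sql
CREATE TABLE IF NOT EXISTS events (
  event_id       UUID        PRIMARY KEY,
  aggregate_id   UUID        NOT NULL,
  aggregate_type TEXT        NOT NULL,
  event_type     TEXT        NOT NULL,
  version        INT         NOT NULL,
  payload        JSONB       NOT NULL,
  metadata       JSONB       NOT NULL,
  timestamp      TIMESTAMPTZ NOT NULL,
  -- If two writers append version N+1 concurrently, one INSERT fails here;
  -- this is the safety net behind the ConcurrencyError path above.
  UNIQUE (aggregate_id, version)
);

-- Supports getEventsByType scans
CREATE INDEX IF NOT EXISTS idx_events_type_ts ON events (event_type, timestamp);
```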
Aggregate and Event Replay
// Event Sourced Aggregate
abstract class EventSourcedAggregate {
private uncommittedEvents: DomainEvent[] = []
version: number = 0 // public: EventSourcedRepository reads it for concurrency and snapshot checks
abstract get id(): string
protected apply(event: DomainEvent): void {
this.when(event)
this.version = event.version
this.uncommittedEvents.push(event)
}
protected abstract when(event: DomainEvent): void
getUncommittedEvents(): DomainEvent[] {
return [...this.uncommittedEvents]
}
clearUncommittedEvents(): void {
this.uncommittedEvents = []
}
loadFromHistory(events: DomainEvent[]): void {
for (const event of events) {
this.when(event)
this.version = event.version
}
}
}
// Order Aggregate
class Order extends EventSourcedAggregate {
private _id: string = ''
private _customerId: string = ''
private _status: OrderStatus = OrderStatus.DRAFT
private _items: OrderItem[] = []
private _totalAmount: number = 0
get id(): string {
return this._id
}
static create(
customerId: string,
items: Array<{ productId: string; quantity: number; price: number }>,
shippingAddress: string
): Order {
const order = new Order()
const orderId = generateUUID()
const totalAmount = items.reduce((sum, item) => sum + item.price * item.quantity, 0)
order.apply({
eventId: generateUUID(),
eventType: 'OrderPlaced',
aggregateId: orderId,
aggregateType: 'Order',
version: 1,
timestamp: new Date(),
metadata: { correlationId: generateUUID(), causationId: generateUUID() },
payload: { customerId, items, totalAmount, shippingAddress, currency: 'KRW' },
})
return order
}
confirm(): void {
if (this._status !== OrderStatus.PLACED) {
throw new Error('Order can only be confirmed when in PLACED status')
}
this.apply({
eventId: generateUUID(),
eventType: 'OrderConfirmed',
aggregateId: this._id,
aggregateType: 'Order',
version: this.version + 1,
timestamp: new Date(),
metadata: { correlationId: generateUUID(), causationId: generateUUID() },
payload: { confirmedAt: new Date().toISOString() },
})
}
protected when(event: DomainEvent): void {
switch (event.eventType) {
case 'OrderPlaced':
this._id = event.aggregateId
this._customerId = event.payload.customerId as string
this._status = OrderStatus.PLACED
this._items = event.payload.items as OrderItem[]
this._totalAmount = event.payload.totalAmount as number
break
case 'OrderConfirmed':
this._status = OrderStatus.CONFIRMED
break
case 'OrderShipped':
this._status = OrderStatus.SHIPPED
break
case 'OrderCancelled':
this._status = OrderStatus.CANCELLED
break
}
}
}
enum OrderStatus {
DRAFT = 'DRAFT',
PLACED = 'PLACED',
CONFIRMED = 'CONFIRMED',
SHIPPED = 'SHIPPED',
CANCELLED = 'CANCELLED',
}
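The replay mechanics above can be demonstrated in miniature. The following self-contained sketch (a simplified stand-in, not the Order class itself) shows the core invariant: applying the recorded events to a fresh instance via loadFromHistory reproduces the same state as the instance that originally emitted them.

```typescript
// Minimal event-sourced aggregate: state is derived only from events.
type Ev = { type: string; version: number }

class Counter {
  value = 0
  version = 0
  private pending: Ev[] = []

  increment(): void {
    this.applyNew({ type: 'Incremented', version: this.version + 1 })
  }

  private applyNew(e: Ev): void {
    this.when(e)
    this.pending.push(e) // new events are both applied and recorded
  }

  private when(e: Ev): void {
    if (e.type === 'Incremented') this.value += 1
    this.version = e.version
  }

  uncommitted(): Ev[] {
    return [...this.pending]
  }

  loadFromHistory(events: Ev[]): void {
    for (const e of events) this.when(e) // replay applies without re-recording
  }
}

const a = new Counter()
a.increment()
a.increment()
a.increment()

// Rebuild a second instance purely from the event history
const b = new Counter()
b.loadFromHistory(a.uncommitted())
// b.value === 3 and b.version === 3: state fully reconstructed from events
```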
Python Event Store Implementation
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Protocol
from uuid import uuid4
import json
import asyncpg
@dataclass(frozen=True)
class DomainEvent:
event_id: str
event_type: str
aggregate_id: str
aggregate_type: str
version: int
timestamp: datetime
payload: dict[str, Any]
metadata: dict[str, str] = field(default_factory=dict)
class EventStore(Protocol):
async def append(
self,
aggregate_id: str,
events: list[DomainEvent],
expected_version: int,
) -> None: ...
async def get_events(
self, aggregate_id: str, from_version: int = 0
) -> list[DomainEvent]: ...
class PostgresEventStore:
def __init__(self, pool: asyncpg.Pool):
self._pool = pool
async def append(
self,
aggregate_id: str,
events: list[DomainEvent],
expected_version: int,
) -> None:
async with self._pool.acquire() as conn:
async with conn.transaction():
# Optimistic Concurrency Control
row = await conn.fetchrow(
"SELECT MAX(version) AS cur FROM events WHERE aggregate_id = $1",
aggregate_id,
)
current = row["cur"] or 0
if current != expected_version:
raise ConcurrencyError(
f"Expected {expected_version}, got {current}"
)
for event in events:
await conn.execute(
"""
INSERT INTO events
(event_id, aggregate_id, aggregate_type,
event_type, version, payload, metadata, timestamp)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
""",
event.event_id,
event.aggregate_id,
event.aggregate_type,
event.event_type,
event.version,
json.dumps(event.payload),
json.dumps(event.metadata),
event.timestamp,
)
async def get_events(
self, aggregate_id: str, from_version: int = 0
) -> list[DomainEvent]:
async with self._pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT * FROM events
WHERE aggregate_id = $1 AND version > $2
ORDER BY version ASC
""",
aggregate_id,
from_version,
)
return [self._deserialize(row) for row in rows]
@staticmethod
def _deserialize(row) -> DomainEvent:
return DomainEvent(
event_id=row["event_id"],
event_type=row["event_type"],
aggregate_id=row["aggregate_id"],
aggregate_type=row["aggregate_type"],
version=row["version"],
timestamp=row["timestamp"],
payload=json.loads(row["payload"]),
metadata=json.loads(row["metadata"]),
)
Message Broker Comparison: Kafka vs RabbitMQ vs NATS
Key Characteristics Comparison
| Item | Apache Kafka | RabbitMQ | NATS JetStream |
|---|---|---|---|
| Model | Distributed log (Append-only) | Message queue (Push-based) | Streaming (Pull/Push) |
| Message retention | Persistent for configured period | Deleted after consumption (default) | Retained for configured period |
| Order guarantee | Within partition | Per queue | Within stream |
| Throughput | Millions of msgs/sec | Tens of thousands/sec | Hundreds of thousands/sec |
| Latency | ms level (batched) | µs level (per message) | µs level |
| Consumer groups | Native support | Competing consumer pattern | Native support |
| Replay | Free seek by offset | Limited (requeue/DLQ) | Seek by sequence number |
| Operational complexity | High (ZooKeeper/KRaft) | Medium | Low |
| Best for | Event streaming, log aggregation | Task queues, RPC | Lightweight messaging, IoT |
Kafka Producer/Consumer Example
import { Kafka, logLevel } from 'kafkajs'
const kafka = new Kafka({
clientId: 'order-service',
brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
logLevel: logLevel.WARN,
retry: {
initialRetryTime: 100,
retries: 8,
},
})
// Producer - Event publishing
class KafkaEventPublisher {
private producer = kafka.producer({
idempotent: true, // Idempotency guarantee
maxInFlightRequests: 1, // kafkajs rejects idempotent producers with more than 1 in-flight request
transactionalId: 'order-service-producer',
})
async publish(event: DomainEvent): Promise<void> {
await this.producer.send({
topic: `events.${event.aggregateType.toLowerCase()}`,
messages: [
{
key: event.aggregateId, // Partition key = Aggregate ID
value: JSON.stringify(event),
headers: {
'event-type': event.eventType,
'correlation-id': event.metadata.correlationId,
'content-type': 'application/json',
},
},
],
})
}
async publishBatch(events: DomainEvent[]): Promise<void> {
const transaction = await this.producer.transaction()
try {
for (const event of events) {
await transaction.send({
topic: `events.${event.aggregateType.toLowerCase()}`,
messages: [
{
key: event.aggregateId,
value: JSON.stringify(event),
headers: {
'event-type': event.eventType,
'correlation-id': event.metadata.correlationId,
},
},
],
})
}
await transaction.commit()
} catch (error) {
await transaction.abort()
throw error
}
}
}
// Consumer - Event consumption (with idempotency guarantee)
class KafkaEventConsumer {
private consumer = kafka.consumer({ groupId: 'projection-service' })
private processedEvents = new Set<string>()
async start(): Promise<void> {
await this.consumer.subscribe({
topics: ['events.order'],
fromBeginning: false,
})
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event: DomainEvent = JSON.parse(message.value!.toString())
// Idempotency check - skip already processed events
if (await this.isAlreadyProcessed(event.eventId)) {
console.log(`Skipping duplicate event: ${event.eventId}`)
return
}
await this.handleEvent(event)
await this.markAsProcessed(event.eventId)
},
})
}
private async isAlreadyProcessed(eventId: string): Promise<boolean> {
// Check processing status in Redis or DB
return this.processedEvents.has(eventId)
}
private async markAsProcessed(eventId: string): Promise<void> {
this.processedEvents.add(eventId)
}
private async handleEvent(event: DomainEvent): Promise<void> {
switch (event.eventType) {
case 'OrderPlaced':
await this.handleOrderPlaced(event)
break
case 'OrderConfirmed':
await this.handleOrderConfirmed(event)
break
}
}
private async handleOrderPlaced(event: DomainEvent): Promise<void> {
console.log(`Processing OrderPlaced for ${event.aggregateId}`)
}
private async handleOrderConfirmed(event: DomainEvent): Promise<void> {
console.log(`Processing OrderConfirmed for ${event.aggregateId}`)
}
}
Saga Pattern: Distributed Transaction Management
Orchestration Saga vs Choreography Saga
| Item | Orchestration | Choreography |
|---|---|---|
| Control | Central orchestrator coordinates | Each service reacts autonomously |
| Coupling | Depends on orchestrator | Loosely coupled between services |
| Complexity | Concentrated in orchestrator | Distributed across services |
| Tracking | Central state visibility | Distributed state tracking needed |
| Error handling | Central compensating transactions | Each service publishes compensation events |
| Best for | Complex workflows (5+ steps) | Simple workflows (3 or fewer steps) |
Orchestration Saga Implementation
// Saga state machine
enum SagaStatus {
STARTED = 'STARTED',
ORDER_CREATED = 'ORDER_CREATED',
PAYMENT_PROCESSED = 'PAYMENT_PROCESSED',
INVENTORY_RESERVED = 'INVENTORY_RESERVED',
COMPLETED = 'COMPLETED',
COMPENSATING = 'COMPENSATING',
FAILED = 'FAILED',
}
interface SagaStep {
name: string
execute: (context: SagaContext) => Promise<void>
compensate: (context: SagaContext) => Promise<void>
}
interface SagaContext {
sagaId: string
orderId: string
customerId: string
items: Array<{ productId: string; quantity: number; price: number }>
paymentId?: string
reservationId?: string
[key: string]: unknown
}
class OrderSagaOrchestrator {
private steps: SagaStep[] = []
private completedSteps: SagaStep[] = [] // per-saga state: use one orchestrator instance per saga, or persist progress alongside the SagaContext
constructor(
private readonly sagaRepository: SagaRepository,
private readonly eventBus: EventBus
) {
this.steps = [
{
name: 'CreateOrder',
execute: async (ctx) => {
await this.eventBus.publish({
eventType: 'CreateOrderRequested',
aggregateId: ctx.orderId,
payload: { customerId: ctx.customerId, items: ctx.items },
} as DomainEvent)
},
compensate: async (ctx) => {
await this.eventBus.publish({
eventType: 'CancelOrderRequested',
aggregateId: ctx.orderId,
payload: { reason: 'Saga compensation' },
} as DomainEvent)
},
},
{
name: 'ProcessPayment',
execute: async (ctx) => {
const totalAmount = ctx.items.reduce((sum, item) => sum + item.price * item.quantity, 0)
await this.eventBus.publish({
eventType: 'ProcessPaymentRequested',
aggregateId: ctx.orderId,
payload: { customerId: ctx.customerId, amount: totalAmount },
} as DomainEvent)
},
compensate: async (ctx) => {
await this.eventBus.publish({
eventType: 'RefundPaymentRequested',
aggregateId: ctx.orderId,
payload: { paymentId: ctx.paymentId },
} as DomainEvent)
},
},
{
name: 'ReserveInventory',
execute: async (ctx) => {
await this.eventBus.publish({
eventType: 'ReserveInventoryRequested',
aggregateId: ctx.orderId,
payload: { items: ctx.items },
} as DomainEvent)
},
compensate: async (ctx) => {
await this.eventBus.publish({
eventType: 'ReleaseInventoryRequested',
aggregateId: ctx.orderId,
payload: { reservationId: ctx.reservationId },
} as DomainEvent)
},
},
]
}
async start(context: SagaContext): Promise<void> {
await this.sagaRepository.save(context.sagaId, SagaStatus.STARTED, context)
await this.executeNextStep(context, 0)
}
private async executeNextStep(context: SagaContext, stepIndex: number): Promise<void> {
if (stepIndex >= this.steps.length) {
await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.COMPLETED)
return
}
const step = this.steps[stepIndex]
try {
await step.execute(context)
this.completedSteps.push(step)
await this.executeNextStep(context, stepIndex + 1)
} catch (error) {
console.error(`Saga step '${step.name}' failed:`, error)
await this.compensate(context)
}
}
private async compensate(context: SagaContext): Promise<void> {
await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.COMPENSATING)
// Compensate completed steps in reverse order
for (const step of [...this.completedSteps].reverse()) {
try {
await step.compensate(context)
} catch (error) {
console.error(`Compensation for '${step.name}' failed:`, error)
// Manual intervention needed on compensation failure - send alert
}
}
await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.FAILED)
}
}
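For contrast with the orchestrator above, a choreography flow has no central coordinator: each service subscribes to the previous step's event and publishes the next one. A minimal in-memory sketch (the bus and event shapes are illustrative, not a real broker):

```typescript
// Tiny synchronous pub/sub bus standing in for Kafka/RabbitMQ.
type Event = { type: string; orderId: string }
type Handler = (e: Event) => void

class InMemoryBus {
  private handlers = new Map<string, Handler[]>()
  subscribe(type: string, h: Handler): void {
    this.handlers.set(type, [...(this.handlers.get(type) ?? []), h])
  }
  publish(e: Event): void {
    for (const h of this.handlers.get(e.type) ?? []) h(e)
  }
}

const bus = new InMemoryBus()
const log: string[] = []

// Payment service: reacts to OrderPlaced, emits PaymentCompleted
bus.subscribe('OrderPlaced', (e) => {
  log.push('payment')
  bus.publish({ type: 'PaymentCompleted', orderId: e.orderId })
})
// Inventory service: reacts to PaymentCompleted, emits InventoryReserved
bus.subscribe('PaymentCompleted', (e) => {
  log.push('inventory')
  bus.publish({ type: 'InventoryReserved', orderId: e.orderId })
})
// Order service: finalizes on InventoryReserved
bus.subscribe('InventoryReserved', () => log.push('done'))

bus.publish({ type: 'OrderPlaced', orderId: 'o-1' })
// log is now ['payment', 'inventory', 'done']
```

Note that compensation in choreography works the same way in reverse: on a PaymentFailed event, the order service would react by cancelling, with no orchestrator tracking progress.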
Snapshot Strategy: Event Replay Optimization
As the number of events grows, replay time increases. Snapshots solve this by storing the aggregate state at a specific point in time.
// Snapshot-based aggregate loading
class EventSourcedRepository<T extends EventSourcedAggregate> {
private readonly SNAPSHOT_INTERVAL = 50 // Snapshot every 50 events
constructor(
private readonly eventStore: EventStore,
private readonly snapshotStore: SnapshotStore,
private readonly factory: () => T
) {}
async load(aggregateId: string): Promise<T> {
const aggregate = this.factory()
// 1. Load latest snapshot
const snapshot = await this.snapshotStore.getLatest(aggregateId)
if (snapshot) {
aggregate.restoreFromSnapshot(snapshot.state)
// 2. Replay only events after snapshot
const events = await this.eventStore.getEvents(aggregateId, snapshot.version)
aggregate.loadFromHistory(events)
} else {
// 3. Full event replay
const events = await this.eventStore.getEvents(aggregateId)
aggregate.loadFromHistory(events)
}
return aggregate
}
async save(aggregate: T): Promise<void> {
const events = aggregate.getUncommittedEvents()
await this.eventStore.append(aggregate.id, events, aggregate.version - events.length)
// Check snapshot creation condition
if (aggregate.version % this.SNAPSHOT_INTERVAL === 0) {
await this.snapshotStore.save({
aggregateId: aggregate.id,
version: aggregate.version,
state: aggregate.toSnapshot(),
timestamp: new Date(),
})
}
aggregate.clearUncommittedEvents()
}
}
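The repository above references a SnapshotStore plus toSnapshot/restoreFromSnapshot hooks that were not shown. A minimal sketch (names follow the usage above; the implementation details are assumptions):

```typescript
interface Snapshot {
  aggregateId: string
  version: number
  state: Record<string, unknown>
  timestamp: Date
}

interface SnapshotStore {
  getLatest(aggregateId: string): Promise<Snapshot | null>
  save(snapshot: Snapshot): Promise<void>
}

// In-memory implementation for tests/local dev; production would use a table
// keyed by (aggregate_id, version) or a key-value store.
class InMemorySnapshotStore implements SnapshotStore {
  private latest = new Map<string, Snapshot>()

  async getLatest(aggregateId: string): Promise<Snapshot | null> {
    return this.latest.get(aggregateId) ?? null
  }

  async save(snapshot: Snapshot): Promise<void> {
    const current = this.latest.get(snapshot.aggregateId)
    // Keep only the newest snapshot per aggregate
    if (!current || snapshot.version > current.version) {
      this.latest.set(snapshot.aggregateId, snapshot)
    }
  }
}
```

On the aggregate side, Order would implement toSnapshot() returning its private fields (id, customerId, status, items, totalAmount) and restoreFromSnapshot() assigning them back and setting version from the snapshot.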
Operational Considerations
Critical Warnings
- Event schema changes are more dangerous than DB migrations: events in the Event Store are immutable, so schema changes break replay of past events. Backward compatibility must always be maintained.
- CQRS adoption at least doubles complexity: the synchronization delay (eventual consistency) between read and write models must be handled in both the UI and business logic.
- Saga compensating transaction failures require manual intervention: compensating transactions can themselves fail, so Dead Letter Queues and manual recovery processes are essential.
- Event order inversion destroys data consistency: if the Kafka partition key is misconfigured, events from the same aggregate are spread across different partitions and can be consumed out of order.
- Event Store growth must be managed: without snapshot and archiving strategies, the Event Store grows without bound and replay performance degrades sharply.
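The Dead Letter Queue handling mentioned above can be implemented broker-agnostically. A sketch of a retry-then-dead-letter wrapper (function names and retry policy are illustrative assumptions; real code would add backoff and alerting):

```typescript
// Retry a message handler a bounded number of times; on final failure,
// forward the message to a DLQ instead of blocking the partition/queue.
async function handleWithDlq<T>(
  msg: T,
  handler: (m: T) => Promise<void>,
  publishToDlq: (m: T, err: unknown) => Promise<void>,
  maxAttempts = 3
): Promise<void> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await handler(msg)
      return // processed successfully
    } catch (err) {
      if (attempt === maxAttempts) {
        // Exhausted retries: dead-letter and move on
        await publishToDlq(msg, err)
      }
      // otherwise fall through and retry (add exponential backoff here)
    }
  }
}
```

With Kafka, publishToDlq would typically produce the raw message to a parallel topic such as `events.order.dlq`; with RabbitMQ, a dead-letter exchange serves the same purpose.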
Failure Cases and Recovery Procedures
Case 1: Event Order Inversion
Symptom: OrderShipped event arrives before OrderPlaced, causing projection failure.
Cause: Kafka partitioning key was not set, causing events to be distributed across different partitions.
Recovery:
# 1. Reset consumer group offsets (stop the consumers first; the group must be inactive)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--group projection-service \
--topic events.order \
--reset-offsets --to-earliest --execute
# 2. Reset projection table
psql -c "TRUNCATE TABLE order_summaries;"
# 3. Restart projection service (full replay)
kubectl rollout restart deployment/projection-service
Prevention: Always use aggregate ID as the Kafka partitioning key.
Case 2: Duplicate Event Processing
Symptom: Orders created in duplicate, or payments processed twice.
Cause: After consumer failure and restart, already processed messages are consumed again (at-least-once characteristic).
Recovery:
// Idempotency key table-based duplicate prevention
class IdempotencyGuard {
constructor(private readonly db: Database) {}
async executeOnce<T>(idempotencyKey: string, operation: () => Promise<T>): Promise<T | null> {
try {
// Prevent duplicate insertion with UNIQUE constraint
await this.db.execute('INSERT INTO processed_events (event_id, processed_at) VALUES (?, ?)', [
idempotencyKey,
new Date(),
])
} catch (error) {
// Already processed event
console.log(`Event ${idempotencyKey} already processed, skipping`)
return null
}
// NOTE: if operation() fails after the key has been recorded, later retries
// will be skipped. In production, run the INSERT and operation() in a single
// transaction so a failed operation does not leave the idempotency key behind.
return operation()
}
}
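The guard above assumes a processed_events table whose uniqueness constraint does the actual deduplication; a minimal schema might be:

```sql
CREATE TABLE IF NOT EXISTS processed_events (
  event_id     TEXT        PRIMARY KEY, -- the idempotency key; duplicate INSERTs fail here
  processed_at TIMESTAMPTZ NOT NULL
);
```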
Case 3: Schema Evolution Failure
Symptom: New version event handler cannot deserialize past events, causing projection to stop.
Cause: Required field added to events while past events lack that field.
Recovery:
// Event Upcaster pattern
class EventUpcaster {
private upcasters: Map<string, (event: any) => DomainEvent> = new Map()
register(eventType: string, fromVersion: number, upcaster: (event: any) => any): void {
this.upcasters.set(`${eventType}:v${fromVersion}`, upcaster)
}
upcast(event: any): DomainEvent {
const key = `${event.eventType}:v${event.schemaVersion || 1}`
const upcaster = this.upcasters.get(key)
if (upcaster) {
return this.upcast(upcaster(event)) // Recursively upcast to latest version
}
return event
}
}
// Usage example: v1 -> v2 conversion
const upcaster = new EventUpcaster()
upcaster.register('OrderPlaced', 1, (event) => ({
...event,
schemaVersion: 2,
payload: {
...event.payload,
currency: event.payload.currency || 'KRW', // Add default value
shippingMethod: 'STANDARD', // Set default for new field
},
}))
Production Checklist
Event Store
- Set aggregate_id + version unique index on events table
- Verify Optimistic Concurrency Control implementation
- Configure snapshot strategy (every 50-100 events)
- Establish event archiving policy (move to cold storage)
- Verify Event Store backup and recovery procedures
CQRS
- Automate Read Model rebuild process
- Monitor read/write DB separation and replication lag
- Set up alerts and auto-retry for projection failures
- Handle Eventual Consistency (optimistic updates in UI)
Message Broker
- Kafka: Set partitioning key to aggregate ID
- Monitor consumer group lag (Burrow or Kafka Exporter)
- Configure and monitor Dead Letter Queue (DLQ)
- Decide message serialization format (Avro + Schema Registry recommended)
- Set broker cluster replication factor to 3 or more
Saga
- Define and test compensating transactions
- Verify Saga state persistence (DB storage)
- Set Saga timeout (prevent infinite wait)
- Document manual recovery process for compensation failures
Event Schema
- Introduce Schema Registry (Confluent Schema Registry, AWS Glue)
- Automate backward compatibility verification (include in CI pipeline)
- Implement Event Upcaster
- Maintain event catalog documentation
Monitoring
- Collect event processing latency metrics
- Set up Consumer Group lag alerts
- Build event processing failure rate dashboard
- Implement Correlation ID-based end-to-end tracing