Event-Driven Architecture + CQRS + Event Sourcing Practical Implementation: Distributed System Design with Kafka/RabbitMQ

Introduction

As microservices architecture becomes ubiquitous, inter-service communication has become a critical factor in system scalability and resilience. An architecture built on synchronous REST calls couples services tightly, so a failure in one service can cascade through the rest of the system.

Event-Driven Architecture (EDA) is a common pattern for addressing this problem. Services publish events, and interested services subscribe and process them asynchronously. Combining EDA with CQRS (Command Query Responsibility Segregation) and Event Sourcing lets you scale read and write workloads independently and gives you an audit trail of every state change as a by-product.

This article covers the core EDA patterns, CQRS and Event Sourcing implementation, a message broker comparison, the Saga pattern, and production failure cases with code-level remedies.

Event-Driven Architecture Core Patterns

Types of Events

Events in EDA are broadly classified into three categories:

| Type | Description | Example | Characteristics |
| ---------------------- | ------------------------------------------------- | ----------------------------- | ----------------------------------- |
| **Domain Event** | Meaningful facts occurring in the business domain | OrderPlaced, PaymentCompleted | Past tense naming, immutable |
| **Integration Event** | Events propagating across service boundaries | OrderShipped (to logistics) | Inter-service contract |
| **Notification Event** | Simple notifications (minimized data) | OrderStatusChanged | Receiver queries details separately |

EDA Communication Pattern Comparison

| Pattern | Coupling | Delivery Guarantee | Order Guarantee | Use Cases |
| ------------------- | -------- | ------------------ | --------------------- | ------------------------------------ |
| **Pub/Sub** | Loose | At-least-once | Not guaranteed | Notifications, cache invalidation |
| **Event Streaming** | Loose | At-least-once | Within partition | Log aggregation, real-time analytics |
| **Event Sourcing** | Self-ref | Persistent storage | Within aggregate | Audit trails, state reconstruction |
| **Request-Reply** | Strong | Synchronous | Request-response pair | When synchronous confirmation needed |

TypeScript Event Base Structure

// Domain event base interface

interface DomainEvent {

readonly eventId: string

readonly eventType: string

readonly aggregateId: string

readonly aggregateType: string

readonly version: number

readonly timestamp: Date

readonly metadata: EventMetadata

readonly payload: Record<string, unknown>

}

interface EventMetadata {

readonly correlationId: string

readonly causationId: string

readonly userId?: string

readonly traceId?: string

}

// Concrete event example

class OrderPlacedEvent implements DomainEvent {

readonly eventType = 'OrderPlaced'

readonly aggregateType = 'Order'

constructor(

public readonly eventId: string,

public readonly aggregateId: string,

public readonly version: number,

public readonly timestamp: Date,

public readonly metadata: EventMetadata,

public readonly payload: {

customerId: string

items: Array<{ productId: string; quantity: number; price: number }>

totalAmount: number

currency: string

}

) {}

}

CQRS (Command Query Responsibility Segregation) Deep Dive

CQRS Core Principles

CQRS separates the read (query) model from the write (command) model. In a traditional CRUD design, a single data model serves both reads and writes; CQRS uses a separate model optimized for each purpose.

// Command side - Write model

interface Command {

readonly commandId: string

readonly commandType: string

readonly timestamp: Date

}

class PlaceOrderCommand implements Command {

readonly commandType = 'PlaceOrder'

constructor(

public readonly commandId: string,

public readonly timestamp: Date,

public readonly customerId: string,

public readonly items: Array<{

productId: string

quantity: number

price: number

}>,

public readonly shippingAddress: string

) {}

}

// Command Handler

class PlaceOrderHandler {

constructor(

private readonly orderRepository: OrderRepository,

private readonly eventBus: EventBus

) {}

async handle(command: PlaceOrderCommand): Promise<string> {

// Business validation

await this.validateInventory(command.items)

await this.validateCustomerCredit(command.customerId)

// Create aggregate and publish events

const order = Order.create(command.customerId, command.items, command.shippingAddress)

await this.orderRepository.save(order)

// Publish domain events

for (const event of order.getUncommittedEvents()) {

await this.eventBus.publish(event)

}

return order.id

}

private async validateInventory(

items: Array<{ productId: string; quantity: number; price: number }>

): Promise<void> {

// Inventory check logic

}

private async validateCustomerCredit(customerId: string): Promise<void> {

// Customer credit check logic

}

}
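The handler above saves the aggregate and then publishes to the event bus as two separate steps. If the process stops between them, the events are stored but never published. One common mitigation is a transactional outbox: the events and an "outbox" row are written together, and a separate relay publishes pending rows. The following is a minimal in-memory sketch under that assumption. `InMemoryDb`, `OutboxRelay`, and `saveWithOutbox` are hypothetical names, and a real implementation would perform both writes inside one database transaction and publish asynchronously.

```typescript
// Hypothetical in-memory stand-ins for illustration only.
type StoredEvent = { eventId: string; eventType: string; aggregateId: string }
type OutboxRow = { event: StoredEvent; published: boolean }

class InMemoryDb {
  readonly events: StoredEvent[] = []
  readonly outbox: OutboxRow[] = []

  // Simulates an atomic commit: the event rows and the outbox rows
  // are written together or not at all.
  saveWithOutbox(events: StoredEvent[]): void {
    const rows = events.map((event) => ({ event, published: false }))
    this.events.push(...events)
    this.outbox.push(...rows)
  }
}

class OutboxRelay {
  constructor(
    private readonly db: InMemoryDb,
    private readonly publish: (event: StoredEvent) => void
  ) {}

  // Publishes pending rows in insertion order. A row is marked as published
  // only after publish() returns, so a failure leaves it pending for the
  // next drain (at-least-once delivery, never a silently lost event).
  drain(): number {
    let sent = 0
    for (const row of this.db.outbox) {
      if (row.published) continue
      this.publish(row.event)
      row.published = true
      sent++
    }
    return sent
  }
}
```

Because the relay may publish the same row again after a failure, consumers still need to be idempotent.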

Query Side - Read Model

// Query side - Denormalized model optimized for reading

interface OrderSummaryReadModel {

orderId: string

customerName: string

customerEmail: string

orderDate: Date

status: string

totalAmount: number

itemCount: number

lastUpdated: Date

}

// Query Handler

class GetOrderSummaryHandler {

constructor(private readonly readDb: ReadDatabase) {}

async handle(orderId: string): Promise<OrderSummaryReadModel | null> {

// Query denormalized data directly from read-only DB

return this.readDb.query('SELECT * FROM order_summaries WHERE order_id = ?', [orderId])

}

}

// Projection - Transform events into read models

class OrderSummaryProjection {

constructor(private readonly readDb: ReadDatabase) {}

async handleOrderPlaced(event: OrderPlacedEvent): Promise<void> {

await this.readDb.execute(

`INSERT INTO order_summaries (order_id, customer_name, order_date, status, total_amount, item_count, last_updated)

VALUES (?, ?, ?, 'PLACED', ?, ?, ?)`,

[

event.aggregateId,

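event.payload.customerId, // Placeholder: the event carries only customerId; resolve the display name before writing customer_name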

event.timestamp,

event.payload.totalAmount,

event.payload.items.length,

new Date(),

]

)

}

async handleOrderShipped(event: DomainEvent): Promise<void> {

await this.readDb.execute(

`UPDATE order_summaries SET status = 'SHIPPED', last_updated = ? WHERE order_id = ?`,

[new Date(), event.aggregateId]

)

}

}
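Because delivery to a projection is typically at-least-once, the same event may arrive twice, or an older event may arrive after a newer one. One way to keep the read model convergent is to store the version of the last applied event on each row and skip anything that is not newer. The sketch below uses an in-memory `Map` in place of the read database. `VersionedOrderSummary`, `StatusEvent`, and `InMemoryOrderSummaryProjection` are hypothetical names introduced for illustration.

```typescript
interface VersionedOrderSummary {
  orderId: string
  status: string
  version: number // version of the last event applied to this row
}

type StatusEvent = { aggregateId: string; version: number; status: string }

class InMemoryOrderSummaryProjection {
  private readonly rows = new Map<string, VersionedOrderSummary>()

  // Returns true if the event changed the read model, false if it was skipped.
  apply(event: StatusEvent): boolean {
    const row = this.rows.get(event.aggregateId)
    if (row && event.version <= row.version) {
      return false // duplicate or stale delivery
    }
    this.rows.set(event.aggregateId, {
      orderId: event.aggregateId,
      status: event.status,
      version: event.version,
    })
    return true
  }

  get(orderId: string): VersionedOrderSummary | undefined {
    return this.rows.get(orderId)
  }
}
```

This variant tolerates duplicates and stale events but still applies an event that skips a version. A stricter projection would require `event.version === row.version + 1` and park anything that arrives early.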

Event Sourcing Implementation

Event Store Design

In Event Sourcing, state is not stored directly; instead, a sequence of state change events is stored. Current state is reconstructed by replaying events in order.
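Replay can be viewed as a left fold: start from an initial state and apply one pure transition function per event. The following is a minimal sketch of that idea, independent of the interfaces used elsewhere in this article. `OrderEvent`, `OrderState`, `evolve`, and `replay` are hypothetical names chosen for the illustration.

```typescript
type OrderEvent =
  | { type: 'OrderPlaced'; totalAmount: number }
  | { type: 'OrderConfirmed' }
  | { type: 'OrderCancelled' }

interface OrderState {
  status: 'NONE' | 'PLACED' | 'CONFIRMED' | 'CANCELLED'
  totalAmount: number
}

const initialState: OrderState = { status: 'NONE', totalAmount: 0 }

// Pure transition function: (state, event) -> next state
function evolve(state: OrderState, event: OrderEvent): OrderState {
  switch (event.type) {
    case 'OrderPlaced':
      return { status: 'PLACED', totalAmount: event.totalAmount }
    case 'OrderConfirmed':
      return { ...state, status: 'CONFIRMED' }
    case 'OrderCancelled':
      return { ...state, status: 'CANCELLED' }
  }
}

// Current state = fold of all events, in order, over the initial state.
function replay(events: OrderEvent[]): OrderState {
  return events.reduce(evolve, initialState)
}
```

Since `evolve` has no side effects, replaying the same events always yields the same state, which is what makes rebuilding read models and debugging past states possible.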

// Event Store interface

interface EventStore {

append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>

getEvents(aggregateId: string, fromVersion?: number): Promise<DomainEvent[]>

getEventsByType(eventType: string, fromTimestamp?: Date): Promise<DomainEvent[]>

}

// PostgreSQL-based Event Store implementation

class PostgresEventStore implements EventStore {

constructor(private readonly pool: Pool) {}

async append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void> {

const client = await this.pool.connect()

try {

await client.query('BEGIN')

// Optimistic Concurrency Control

const result = await client.query(

'SELECT MAX(version) as current_version FROM events WHERE aggregate_id = $1',

[aggregateId]

)

const currentVersion = result.rows[0]?.current_version ?? 0

if (currentVersion !== expectedVersion) {

throw new ConcurrencyError(

`Expected version ${expectedVersion}, but current version is ${currentVersion}`

)

}

// Batch insert events

for (const event of events) {

await client.query(

`INSERT INTO events (event_id, aggregate_id, aggregate_type, event_type, version, payload, metadata, timestamp)

VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,

[

event.eventId,

event.aggregateId,

event.aggregateType,

event.eventType,

event.version,

JSON.stringify(event.payload),

JSON.stringify(event.metadata),

event.timestamp,

]

)

}

await client.query('COMMIT')

} catch (error) {

await client.query('ROLLBACK')

throw error

} finally {

client.release()

}

}

async getEvents(aggregateId: string, fromVersion: number = 0): Promise<DomainEvent[]> {

const result = await this.pool.query(

'SELECT * FROM events WHERE aggregate_id = $1 AND version > $2 ORDER BY version ASC',

[aggregateId, fromVersion]

)

return result.rows.map(this.deserializeEvent)

}

async getEventsByType(eventType: string, fromTimestamp?: Date): Promise<DomainEvent[]> {

const query = fromTimestamp

? 'SELECT * FROM events WHERE event_type = $1 AND timestamp > $2 ORDER BY timestamp ASC'

: 'SELECT * FROM events WHERE event_type = $1 ORDER BY timestamp ASC'

const params = fromTimestamp ? [eventType, fromTimestamp] : [eventType]

const result = await this.pool.query(query, params)

return result.rows.map(this.deserializeEvent)

}

private deserializeEvent(row: any): DomainEvent {

return {

eventId: row.event_id,

eventType: row.event_type,

aggregateId: row.aggregate_id,

aggregateType: row.aggregate_type,

version: row.version,

timestamp: row.timestamp,

metadata: JSON.parse(row.metadata),

payload: JSON.parse(row.payload),

}

}

}
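The `append` contract can be exercised without a database. The sketch below is an in-memory stand-in that applies the same expected-version check. `InMemoryEventStore` and `VersionedEvent` are hypothetical names, and `ConcurrencyError` is defined here only so that the block is self-contained. Note that in the PostgreSQL version, two concurrent transactions can both pass the `SELECT MAX(version)` check under the default isolation level, so the unique index on `aggregate_id + version` listed in the production checklist is what ultimately rejects the second writer.

```typescript
class ConcurrencyError extends Error {}

type VersionedEvent = { aggregateId: string; version: number; eventType: string }

class InMemoryEventStore {
  private readonly streams = new Map<string, VersionedEvent[]>()

  // Appends only if the stream is still at the version the caller loaded.
  append(aggregateId: string, events: VersionedEvent[], expectedVersion: number): void {
    const stream = this.streams.get(aggregateId) ?? []
    const currentVersion = stream.length === 0 ? 0 : stream[stream.length - 1].version
    if (currentVersion !== expectedVersion) {
      throw new ConcurrencyError(
        `Expected version ${expectedVersion}, but current version is ${currentVersion}`
      )
    }
    this.streams.set(aggregateId, [...stream, ...events])
  }

  // Returns events with version greater than fromVersion, in append order.
  getEvents(aggregateId: string, fromVersion = 0): VersionedEvent[] {
    return (this.streams.get(aggregateId) ?? []).filter((e) => e.version > fromVersion)
  }
}
```

A caller that receives `ConcurrencyError` would usually reload the aggregate, re-run the command against the fresh state, and try the append again.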

Aggregate and Event Replay

// Event Sourced Aggregate

abstract class EventSourcedAggregate {

private uncommittedEvents: DomainEvent[] = []

protected version: number = 0

abstract get id(): string

protected apply(event: DomainEvent): void {

this.when(event)

this.version = event.version

this.uncommittedEvents.push(event)

}

protected abstract when(event: DomainEvent): void

getUncommittedEvents(): DomainEvent[] {

return [...this.uncommittedEvents]

}

clearUncommittedEvents(): void {

this.uncommittedEvents = []

}

loadFromHistory(events: DomainEvent[]): void {

for (const event of events) {

this.when(event)

this.version = event.version

}

}

}

// Order Aggregate

class Order extends EventSourcedAggregate {

private _id: string = ''

private _customerId: string = ''

private _status: OrderStatus = OrderStatus.DRAFT

private _items: OrderItem[] = []

private _totalAmount: number = 0

get id(): string {

return this._id

}

static create(

customerId: string,

items: Array<{ productId: string; quantity: number; price: number }>,

shippingAddress: string

): Order {

const order = new Order()

const orderId = generateUUID()

const totalAmount = items.reduce((sum, item) => sum + item.price * item.quantity, 0)

order.apply({

eventId: generateUUID(),

eventType: 'OrderPlaced',

aggregateId: orderId,

aggregateType: 'Order',

version: 1,

timestamp: new Date(),

metadata: { correlationId: generateUUID(), causationId: generateUUID() },

payload: { customerId, items, totalAmount, shippingAddress, currency: 'KRW' },

})

return order

}

confirm(): void {

if (this._status !== OrderStatus.PLACED) {

throw new Error('Order can only be confirmed when in PLACED status')

}

this.apply({

eventId: generateUUID(),

eventType: 'OrderConfirmed',

aggregateId: this._id,

aggregateType: 'Order',

version: this.version + 1,

timestamp: new Date(),

metadata: { correlationId: generateUUID(), causationId: generateUUID() },

payload: { confirmedAt: new Date().toISOString() },

})

}

protected when(event: DomainEvent): void {

switch (event.eventType) {

case 'OrderPlaced':

this._id = event.aggregateId

this._customerId = event.payload.customerId as string

this._status = OrderStatus.PLACED

this._items = event.payload.items as OrderItem[]

this._totalAmount = event.payload.totalAmount as number

break

case 'OrderConfirmed':

this._status = OrderStatus.CONFIRMED

break

case 'OrderShipped':

this._status = OrderStatus.SHIPPED

break

case 'OrderCancelled':

this._status = OrderStatus.CANCELLED

break

}

}

}

enum OrderStatus {

DRAFT = 'DRAFT',

PLACED = 'PLACED',

CONFIRMED = 'CONFIRMED',

SHIPPED = 'SHIPPED',

CANCELLED = 'CANCELLED',

}

Python Event Store Implementation

import json
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Protocol

import asyncpg


class ConcurrencyError(Exception):
    """Raised when the stored version differs from the expected version."""


@dataclass(frozen=True)
class DomainEvent:
    event_id: str
    event_type: str
    aggregate_id: str
    aggregate_type: str
    version: int
    timestamp: datetime
    payload: dict[str, Any]
    metadata: dict[str, str] = field(default_factory=dict)


class EventStore(Protocol):
    async def append(
        self,
        aggregate_id: str,
        events: list[DomainEvent],
        expected_version: int,
    ) -> None: ...

    async def get_events(
        self, aggregate_id: str, from_version: int = 0
    ) -> list[DomainEvent]: ...


class PostgresEventStore:
    def __init__(self, pool: asyncpg.Pool):
        self._pool = pool

    async def append(
        self,
        aggregate_id: str,
        events: list[DomainEvent],
        expected_version: int,
    ) -> None:
        async with self._pool.acquire() as conn:
            # The transaction commits on normal exit and rolls back on exception
            async with conn.transaction():
                # Optimistic Concurrency Control
                row = await conn.fetchrow(
                    "SELECT MAX(version) AS cur FROM events WHERE aggregate_id = $1",
                    aggregate_id,
                )
                current = row["cur"] or 0
                if current != expected_version:
                    raise ConcurrencyError(
                        f"Expected {expected_version}, got {current}"
                    )
                for event in events:
                    await conn.execute(
                        """
                        INSERT INTO events
                        (event_id, aggregate_id, aggregate_type,
                         event_type, version, payload, metadata, timestamp)
                        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                        """,
                        event.event_id,
                        event.aggregate_id,
                        event.aggregate_type,
                        event.event_type,
                        event.version,
                        json.dumps(event.payload),
                        json.dumps(event.metadata),
                        event.timestamp,
                    )

    async def get_events(
        self, aggregate_id: str, from_version: int = 0
    ) -> list[DomainEvent]:
        async with self._pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT * FROM events
                WHERE aggregate_id = $1 AND version > $2
                ORDER BY version ASC
                """,
                aggregate_id,
                from_version,
            )
            return [self._deserialize(row) for row in rows]

    @staticmethod
    def _deserialize(row) -> DomainEvent:
        return DomainEvent(
            event_id=row["event_id"],
            event_type=row["event_type"],
            aggregate_id=row["aggregate_id"],
            aggregate_type=row["aggregate_type"],
            version=row["version"],
            timestamp=row["timestamp"],
            payload=json.loads(row["payload"]),
            metadata=json.loads(row["metadata"]),
        )

Message Broker Comparison: Kafka vs RabbitMQ vs NATS

Key Characteristics Comparison

| Item | Apache Kafka | RabbitMQ | NATS JetStream |
| -------------------------- | -------------------------------- | ----------------------------------- | ------------------------------ |
| **Model** | Distributed log (Append-only) | Message queue (Push-based) | Streaming (Pull/Push) |
| **Message retention** | Persistent for configured period | Deleted after consumption (default) | Retained for configured period |
| **Order guarantee** | Within partition | Per queue | Within stream |
| **Throughput** | Millions per second | Tens of thousands/sec | Hundreds of thousands/sec |
| **Latency** | ms level (batch) | µs level (single) | µs level |
| **Consumer groups** | Native support | Competing consumer pattern | Native support |
| **Replay** | Offset-based free navigation | Limited (dead letter) | Sequence-based navigation |
| **Operational complexity** | High (ZooKeeper/KRaft) | Medium | Low |
| **Best for** | Event streaming, log aggregation | Task queues, RPC | Lightweight messaging, IoT |

Kafka Producer/Consumer Example

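import { Kafka, logLevel } from 'kafkajs'

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
  logLevel: logLevel.WARN,
  retry: {
    initialRetryTime: 100,
    retries: 8,
  },
})

// Producer - Event publishing
class KafkaEventPublisher {
  private producer = kafka.producer({
    idempotent: true, // Idempotent producer (KafkaJS documents this option as experimental)
    maxInFlightRequests: 1, // The KafkaJS transaction examples use a single in-flight request
    transactionalId: 'order-service-producer',
  })

  // Call once at startup, before publish() or publishBatch()
  async connect(): Promise<void> {
    await this.producer.connect()
  }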

async publish(event: DomainEvent): Promise<void> {

await this.producer.send({

topic: `events.${event.aggregateType.toLowerCase()}`,

messages: [

{

key: event.aggregateId, // Partition key = Aggregate ID

value: JSON.stringify(event),

headers: {

'event-type': event.eventType,

'correlation-id': event.metadata.correlationId,

'content-type': 'application/json',

},

},

],

})

}

async publishBatch(events: DomainEvent[]): Promise<void> {

const transaction = await this.producer.transaction()

try {

for (const event of events) {

await transaction.send({

topic: `events.${event.aggregateType.toLowerCase()}`,

messages: [

{

key: event.aggregateId,

value: JSON.stringify(event),

headers: {

'event-type': event.eventType,

'correlation-id': event.metadata.correlationId,

},

},

],

})

}

await transaction.commit()

} catch (error) {

await transaction.abort()

throw error

}

}

}

// Consumer - Event consumption (with idempotency guarantee)

class KafkaEventConsumer {

private consumer = kafka.consumer({ groupId: 'projection-service' })

private processedEvents = new Set<string>()

  async start(): Promise<void> {
    // The consumer must be connected before it can subscribe
    await this.consumer.connect()

    await this.consumer.subscribe({
      topics: ['events.order'],
      fromBeginning: false,
    })

await this.consumer.run({

eachMessage: async ({ topic, partition, message }) => {

const event: DomainEvent = JSON.parse(message.value!.toString())

// Idempotency check - skip already processed events

if (await this.isAlreadyProcessed(event.eventId)) {

console.log(`Skipping duplicate event: ${event.eventId}`)

return

}

await this.handleEvent(event)

await this.markAsProcessed(event.eventId)

},

})

}

private async isAlreadyProcessed(eventId: string): Promise<boolean> {

// Check processing status in Redis or DB

return this.processedEvents.has(eventId)

}

private async markAsProcessed(eventId: string): Promise<void> {

this.processedEvents.add(eventId)

}

private async handleEvent(event: DomainEvent): Promise<void> {

switch (event.eventType) {

case 'OrderPlaced':

await this.handleOrderPlaced(event)

break

case 'OrderConfirmed':

await this.handleOrderConfirmed(event)

break

}

}

private async handleOrderPlaced(event: DomainEvent): Promise<void> {

console.log(`Processing OrderPlaced for ${event.aggregateId}`)

}

private async handleOrderConfirmed(event: DomainEvent): Promise<void> {

console.log(`Processing OrderConfirmed for ${event.aggregateId}`)

}

}
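The consumer above does not say what happens when a handler keeps failing. A common approach is to retry a bounded number of times and then park the message in a dead-letter destination so that the rest of the stream is not blocked. The following is a minimal sketch of that control flow with an in-memory array standing in for the dead-letter destination. `processWithRetry`, `Message`, and `DeadLetter` are hypothetical names. In a Kafka deployment the dead letters would usually be published to a separate topic, and a backoff delay would be added between attempts.

```typescript
type Message = { eventId: string; body: string }
type DeadLetter = { message: Message; error: string; attempts: number }

// Tries the handler up to maxAttempts times. On the final failure the message
// is recorded in deadLetters and false is returned; the caller can then move on.
async function processWithRetry(
  message: Message,
  handler: (message: Message) => Promise<void>,
  deadLetters: DeadLetter[],
  maxAttempts = 3
): Promise<boolean> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await handler(message)
      return true
    } catch (error) {
      if (attempt === maxAttempts) {
        deadLetters.push({
          message,
          error: error instanceof Error ? error.message : String(error),
          attempts: attempt,
        })
      }
    }
  }
  return false
}
```

Keeping the error text and attempt count next to the original message makes later manual replay from the dead-letter destination easier to reason about.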

Saga Pattern: Distributed Transaction Management

Orchestration Saga vs Choreography Saga

| Item | Orchestration | Choreography |
| ------------------ | --------------------------------- | ------------------------------------------ |
| **Control** | Central orchestrator coordinates | Each service reacts autonomously |
| **Coupling** | Depends on orchestrator | Loosely coupled between services |
| **Complexity** | Concentrated in orchestrator | Distributed across services |
| **Tracking** | Central state visibility | Distributed state tracking needed |
| **Error handling** | Central compensating transactions | Each service publishes compensation events |
| **Best for** | Complex workflows (5+ steps) | Simple workflows (3 or fewer steps) |
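To make the choreography column concrete, the sketch below wires three "services" to an in-memory bus. Each one knows only which event it reacts to and which event it emits, and compensation is triggered by a failure event rather than by a coordinator. `InMemoryBus`, `wireServices`, and the event names `PaymentFailed` and `InventoryReserved` are assumptions made for this illustration. The bus is synchronous to keep the flow easy to follow.

```typescript
type SagaEvent = { type: string; orderId: string }
type Listener = (event: SagaEvent) => void

class InMemoryBus {
  private readonly listeners = new Map<string, Listener[]>()
  readonly log: string[] = [] // event types in publication order

  subscribe(type: string, listener: Listener): void {
    this.listeners.set(type, [...(this.listeners.get(type) ?? []), listener])
  }

  publish(event: SagaEvent): void {
    this.log.push(event.type)
    for (const listener of this.listeners.get(event.type) ?? []) {
      listener(event)
    }
  }
}

// No component sees the whole workflow; it emerges from the subscriptions.
function wireServices(bus: InMemoryBus, paymentSucceeds: boolean): void {
  // Payment service reacts to a placed order
  bus.subscribe('OrderPlaced', (e) =>
    bus.publish({ type: paymentSucceeds ? 'PaymentCompleted' : 'PaymentFailed', orderId: e.orderId })
  )
  // Inventory service reacts to a completed payment
  bus.subscribe('PaymentCompleted', (e) =>
    bus.publish({ type: 'InventoryReserved', orderId: e.orderId })
  )
  // Order service compensates when payment fails
  bus.subscribe('PaymentFailed', (e) =>
    bus.publish({ type: 'OrderCancelled', orderId: e.orderId })
  )
}
```

The `log` array is the only place where the full sequence is visible, which mirrors the "distributed state tracking needed" entry in the table.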

Orchestration Saga Implementation

// Saga state machine

enum SagaStatus {

STARTED = 'STARTED',

ORDER_CREATED = 'ORDER_CREATED',

PAYMENT_PROCESSED = 'PAYMENT_PROCESSED',

INVENTORY_RESERVED = 'INVENTORY_RESERVED',

COMPLETED = 'COMPLETED',

COMPENSATING = 'COMPENSATING',

FAILED = 'FAILED',

}

interface SagaStep {

name: string

execute: (context: SagaContext) => Promise<void>

compensate: (context: SagaContext) => Promise<void>

}

interface SagaContext {

sagaId: string

orderId: string

customerId: string

items: Array<{ productId: string; quantity: number; price: number }>

paymentId?: string

reservationId?: string

[key: string]: unknown

}

class OrderSagaOrchestrator {

private steps: SagaStep[] = []

private completedSteps: SagaStep[] = []

constructor(

private readonly sagaRepository: SagaRepository,

private readonly eventBus: EventBus

) {

this.steps = [

{

name: 'CreateOrder',

execute: async (ctx) => {

await this.eventBus.publish({

eventType: 'CreateOrderRequested',

aggregateId: ctx.orderId,

payload: { customerId: ctx.customerId, items: ctx.items },

} as DomainEvent)

},

compensate: async (ctx) => {

await this.eventBus.publish({

eventType: 'CancelOrderRequested',

aggregateId: ctx.orderId,

payload: { reason: 'Saga compensation' },

} as DomainEvent)

},

},

{

name: 'ProcessPayment',

execute: async (ctx) => {

const totalAmount = ctx.items.reduce((sum, item) => sum + item.price * item.quantity, 0)

await this.eventBus.publish({

eventType: 'ProcessPaymentRequested',

aggregateId: ctx.orderId,

payload: { customerId: ctx.customerId, amount: totalAmount },

} as DomainEvent)

},

compensate: async (ctx) => {

await this.eventBus.publish({

eventType: 'RefundPaymentRequested',

aggregateId: ctx.orderId,

payload: { paymentId: ctx.paymentId },

} as DomainEvent)

},

},

{

name: 'ReserveInventory',

execute: async (ctx) => {

await this.eventBus.publish({

eventType: 'ReserveInventoryRequested',

aggregateId: ctx.orderId,

payload: { items: ctx.items },

} as DomainEvent)

},

compensate: async (ctx) => {

await this.eventBus.publish({

eventType: 'ReleaseInventoryRequested',

aggregateId: ctx.orderId,

payload: { reservationId: ctx.reservationId },

} as DomainEvent)

},

},

]

}

  async start(context: SagaContext): Promise<void> {
    await this.sagaRepository.save(context.sagaId, SagaStatus.STARTED, context)
    await this.executeNextStep(context, 0)
  }

  // Note: each execute() only publishes a request event. A production orchestrator
  // would wait for the matching reply event before advancing to the next step.
  private async executeNextStep(context: SagaContext, stepIndex: number): Promise<void> {
    if (stepIndex >= this.steps.length) {
      await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.COMPLETED)
      return
    }

    const step = this.steps[stepIndex]
    try {
      await step.execute(context)
    } catch (error) {
      console.error(`Saga step '${step.name}' failed:`, error)
      // Steps before stepIndex have completed; compensate exactly those
      await this.compensate(context, stepIndex)
      return
    }

    await this.executeNextStep(context, stepIndex + 1)
  }

  // Compensates steps [0, completedCount) in reverse order. The count is passed in
  // rather than kept on the instance, so concurrent sagas cannot share state.
  private async compensate(context: SagaContext, completedCount: number): Promise<void> {
    await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.COMPENSATING)

    for (let i = completedCount - 1; i >= 0; i--) {
      const step = this.steps[i]
      try {
        await step.compensate(context)
      } catch (error) {
        console.error(`Compensation for '${step.name}' failed:`, error)
        // Manual intervention needed on compensation failure - send alert
      }
    }

    await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.FAILED)
  }

}
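A saga that waits for a reply which never arrives would stay open indefinitely. One simple guard is to race each step against a timer and treat a timeout like any other step failure, so that compensation starts. The sketch below shows such a wrapper. `withTimeout` and `SagaTimeoutError` are hypothetical names, and the wrapper does not cancel the underlying work; it only stops waiting for it.

```typescript
class SagaTimeoutError extends Error {}

// Resolves with the step's result, or rejects with SagaTimeoutError if the
// step has not settled within timeoutMs.
async function withTimeout<T>(step: Promise<T>, timeoutMs: number, stepName: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new SagaTimeoutError(`Step '${stepName}' timed out after ${timeoutMs} ms`)),
      timeoutMs
    )
  })
  try {
    return await Promise.race([step, timeout])
  } finally {
    // Always clear the timer so it cannot keep the process alive
    if (timer !== undefined) clearTimeout(timer)
  }
}
```

In an orchestrator like the one above, a call such as `await withTimeout(step.execute(context), 30000, step.name)` would be one way to apply it; the 30-second value is only an example.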

Snapshot Strategy: Event Replay Optimization

Replay time grows with the number of events in an aggregate's stream. A snapshot stores the aggregate state at a specific version, so loading only needs to replay the events recorded after that version.

// Snapshot-based aggregate loading

class EventSourcedRepository<T extends EventSourcedAggregate> {

private readonly SNAPSHOT_INTERVAL = 50 // Snapshot every 50 events

constructor(

private readonly eventStore: EventStore,

private readonly snapshotStore: SnapshotStore,

private readonly factory: () => T

) {}

async load(aggregateId: string): Promise<T> {

const aggregate = this.factory()

// 1. Load latest snapshot

const snapshot = await this.snapshotStore.getLatest(aggregateId)

if (snapshot) {

aggregate.restoreFromSnapshot(snapshot.state)

// 2. Replay only events after snapshot

const events = await this.eventStore.getEvents(aggregateId, snapshot.version)

aggregate.loadFromHistory(events)

} else {

// 3. Full event replay

const events = await this.eventStore.getEvents(aggregateId)

aggregate.loadFromHistory(events)

}

return aggregate

}

async save(aggregate: T): Promise<void> {

const events = aggregate.getUncommittedEvents()

await this.eventStore.append(aggregate.id, events, aggregate.version - events.length)

// Check snapshot creation condition

if (aggregate.version % this.SNAPSHOT_INTERVAL === 0) {

await this.snapshotStore.save({

aggregateId: aggregate.id,

version: aggregate.version,

state: aggregate.toSnapshot(),

timestamp: new Date(),

})

}

aggregate.clearUncommittedEvents()

}

}
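The repository above calls `restoreFromSnapshot` and `toSnapshot` and reads `version`, none of which the `EventSourcedAggregate` base class shown earlier exposes publicly, so an aggregate has to provide them. The following self-contained sketch shows one possible shape using a deliberately small aggregate. `CounterAggregate`, `CounterEvent`, `CounterSnapshot`, and `load` are hypothetical names. The important detail is that restoring a snapshot must also restore the version, otherwise the subsequent replay starts from the wrong point.

```typescript
type CounterEvent = { version: number; delta: number }
type CounterSnapshot = { version: number; total: number }

class CounterAggregate {
  private total = 0
  private currentVersion = 0

  get version(): number {
    return this.currentVersion
  }

  get value(): number {
    return this.total
  }

  loadFromHistory(events: CounterEvent[]): void {
    for (const event of events) {
      this.total += event.delta
      this.currentVersion = event.version
    }
  }

  toSnapshot(): CounterSnapshot {
    return { version: this.currentVersion, total: this.total }
  }

  // Restores both the state and the version it corresponds to.
  restoreFromSnapshot(snapshot: CounterSnapshot): void {
    this.total = snapshot.total
    this.currentVersion = snapshot.version
  }
}

// Loads from an optional snapshot, then replays only the newer events.
function load(snapshot: CounterSnapshot | undefined, allEvents: CounterEvent[]): CounterAggregate {
  const aggregate = new CounterAggregate()
  if (snapshot) aggregate.restoreFromSnapshot(snapshot)
  aggregate.loadFromHistory(allEvents.filter((e) => e.version > aggregate.version))
  return aggregate
}
```

Loading with and without a snapshot should always produce the same state; checking that equivalence in tests is a cheap way to catch snapshot bugs.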

Operational Considerations

Critical Warnings

1. **Event schema changes are riskier than DB migrations**: Events in the Event Store are immutable, so an incompatible schema change breaks replay of past events. Always maintain backward compatibility.

2. **CQRS adds substantial complexity**: There are two models to maintain, and the synchronization delay between the write and read models (eventual consistency) must be handled in both the UI and the business logic.

3. **Failed compensating transactions require manual intervention**: Compensating transactions can fail too, so a Dead Letter Queue and a documented manual recovery process are essential.

4. **Event order inversion breaks data consistency**: If the Kafka partitioning key is misconfigured, events from the same aggregate are spread across different partitions and may be consumed out of order.

5. **Event Store growth must be managed**: Without snapshot and archiving strategies, the Event Store grows without bound and replay performance degrades.
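The ordering warning follows from how keyed partitioning works: the partition is derived from a hash of the message key, so all messages with the same key land in the same partition as long as the partition count does not change. The sketch below illustrates the principle with a simple string hash. `hashKey` and `partitionFor` are hypothetical helpers, and the hash is illustrative only; it is not the algorithm Kafka's default partitioner uses.

```typescript
// Simple deterministic string hash (illustrative, not Kafka's partitioner).
function hashKey(key: string): number {
  let hash = 0
  for (let i = 0; i < key.length; i++) {
    // Keep the running value in the unsigned 32-bit range
    hash = (hash * 31 + key.charCodeAt(i)) >>> 0
  }
  return hash
}

// Maps a key to a partition index in [0, partitionCount).
function partitionFor(key: string, partitionCount: number): number {
  return hashKey(key) % partitionCount
}

// Every event of one aggregate uses the aggregate ID as key,
// so they all resolve to the same partition.
const eventTypes = ['OrderPlaced', 'OrderConfirmed', 'OrderShipped']
const assignedPartitions = eventTypes.map(() => partitionFor('order-42', 6))
```

Without a key, the producer is free to spread messages across partitions, and ordering between them is no longer defined.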

Failure Cases and Recovery Procedures

Case 1: Event Order Inversion

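**Symptom**: An OrderShipped event arrives before OrderPlaced, and the projection fails.

**Cause**: No Kafka partitioning key was set, so events from the same aggregate were spread across different partitions.

**Recovery**:

1. Stop the projection service (offsets can only be reset while the consumer group has no active members)

kubectl scale deployment/projection-service --replicas=0

2. Reset consumer group offsets

kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group projection-service \
  --topic events.order \
  --reset-offsets --to-earliest --execute

3. Reset projection table

psql -c "TRUNCATE TABLE order_summaries;"

4. Start the projection service again for a full replay (replace 1 with your normal replica count)

kubectl scale deployment/projection-service --replicas=1

A replay repairs the projection only if the events in the topic are keyed correctly. Events that were already written without a key stay spread across partitions, so they may need to be republished with the aggregate ID as the key.

**Prevention**: Always use the aggregate ID as the Kafka partitioning key.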

Case 2: Duplicate Event Processing

**Symptom**: Orders are created twice, or payments are processed twice.

**Cause**: After a consumer fails and restarts, it re-consumes messages that it had already processed but whose offsets had not yet been committed (at-least-once delivery).

**Recovery**:

// Idempotency key table-based duplicate prevention

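class IdempotencyGuard {
  constructor(
    private readonly db: Database,
    // Driver-specific check supplied by the caller, for example
    // SQLSTATE 23505 (unique_violation) in PostgreSQL
    private readonly isUniqueViolation: (error: unknown) => boolean
  ) {}

  async executeOnce<T>(idempotencyKey: string, operation: () => Promise<T>): Promise<T | null> {
    try {
      // The UNIQUE constraint on event_id rejects a second insert of the same key
      await this.db.execute('INSERT INTO processed_events (event_id, processed_at) VALUES (?, ?)', [
        idempotencyKey,
        new Date(),
      ])
    } catch (error) {
      // Only a unique-constraint violation means "already processed";
      // any other error (connection loss, timeout) must propagate
      if (!this.isUniqueViolation(error)) throw error
      console.log(`Event ${idempotencyKey} already processed, skipping`)
      return null
    }

    try {
      return await operation()
    } catch (error) {
      // Remove the marker so that a redelivery can retry the failed operation.
      // Running the insert and the operation in one DB transaction is safer where possible.
      await this.db.execute('DELETE FROM processed_events WHERE event_id = ?', [idempotencyKey])
      throw error
    }
  }
}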

Case 3: Schema Evolution Failure

**Symptom**: The new version of the event handler cannot deserialize past events, and the projection stops.

**Cause**: A required field was added to the event schema, but previously stored events do not contain it.

**Recovery**:

// Event Upcaster pattern

class EventUpcaster {

private upcasters: Map<string, (event: any) => DomainEvent> = new Map()

register(eventType: string, fromVersion: number, upcaster: (event: any) => any): void {

this.upcasters.set(`${eventType}:v${fromVersion}`, upcaster)

}

upcast(event: any): DomainEvent {

const key = `${event.eventType}:v${event.schemaVersion || 1}`

const upcaster = this.upcasters.get(key)

if (upcaster) {

return this.upcast(upcaster(event)) // Recursively upcast to latest version

}

return event

}

}

// Usage example: v1 -> v2 conversion

const upcaster = new EventUpcaster()

upcaster.register('OrderPlaced', 1, (event) => ({

...event,

schemaVersion: 2,

payload: {

...event.payload,

currency: event.payload.currency || 'KRW', // Add default value

shippingMethod: 'STANDARD', // Set default for new field

},

}))

Production Checklist

Event Store

- [ ] Set aggregate_id + version unique index on events table

- [ ] Verify Optimistic Concurrency Control implementation

- [ ] Configure snapshot strategy (every 50-100 events)

- [ ] Establish event archiving policy (move to cold storage)

- [ ] Verify Event Store backup and recovery procedures

CQRS

- [ ] Automate Read Model rebuild process

- [ ] Monitor read/write DB separation and replication lag

- [ ] Set up alerts and auto-retry for projection failures

- [ ] Handle Eventual Consistency (optimistic updates in UI)

Message Broker

- [ ] Kafka: Set partitioning key to aggregate ID

- [ ] Monitor consumer group lag (Burrow or Kafka Exporter)

- [ ] Configure and monitor Dead Letter Queue (DLQ)

- [ ] Decide message serialization format (Avro + Schema Registry recommended)

- [ ] Set broker cluster replication factor to 3 or more

Saga

- [ ] Define and test compensating transactions

- [ ] Verify Saga state persistence (DB storage)

- [ ] Set Saga timeout (prevent infinite wait)

- [ ] Document manual recovery process for compensation failures

Event Schema

- [ ] Introduce Schema Registry (Confluent Schema Registry, AWS Glue)

- [ ] Automate backward compatibility verification (include in CI pipeline)

- [ ] Implement Event Upcaster

- [ ] Maintain event catalog documentation

Monitoring

- [ ] Collect event processing latency metrics

- [ ] Set up Consumer Group lag alerts

- [ ] Build event processing failure rate dashboard

- [ ] Implement Correlation ID-based end-to-end tracing

References

- [Microsoft - CQRS Pattern](https://learn.microsoft.com/en-us/azure/architecture/patterns/cqrs)

- [Microsoft - Event Sourcing Pattern](https://learn.microsoft.com/en-us/azure/architecture/patterns/event-sourcing)

- [Martin Fowler - Event Sourcing](https://martinfowler.com/eaaDev/EventSourcing.html)

- [Martin Fowler - CQRS](https://martinfowler.com/bliki/CQRS.html)

- [Chris Richardson - Saga Pattern](https://microservices.io/patterns/data/saga.html)

- [Apache Kafka Documentation](https://kafka.apache.org/documentation/)

- [Confluent - Event-Driven Architecture](https://www.confluent.io/learn/event-driven-architecture/)

- [Greg Young - CQRS Documents](https://cqrs.files.wordpress.com/2010/11/cqrs_documents.pdf)

- [Vaughn Vernon - Implementing Domain-Driven Design](https://www.oreilly.com/library/view/implementing-domain-driven-design/9780133039900/)
