Skip to content

필사 모드: Event-Driven Architecture + CQRS + Event Sourcing 실전 구현: Kafka/RabbitMQ 기반 분산 시스템 설계

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

들어가며

마이크로서비스 아키텍처가 보편화되면서, 서비스 간 통신 방식은 시스템의 확장성과 복원력을 결정짓는 핵심 요소가 되었다. 동기 REST 호출 기반 아키텍처는 서비스 간 강한 결합을 만들고, 하나의 서비스 장애가 전체 시스템으로 전파되는 캐스케이드 장애를 유발한다.

Event-Driven Architecture(EDA)는 이 문제를 해결하는 대표적인 패턴이다. 서비스는 이벤트를 발행(publish)하고, 관심 있는 서비스가 이를 구독(subscribe)하여 비동기적으로 처리한다. 여기에 CQRS(Command Query Responsibility Segregation)와 Event Sourcing을 결합하면 읽기/쓰기 워크로드를 독립적으로 스케일링하고, 모든 상태 변경의 감사 추적(audit trail)을 자연스럽게 확보할 수 있다.

이 글에서는 EDA의 핵심 패턴부터 CQRS와 Event Sourcing의 구현, 메시지 브로커 비교, Saga 패턴, 그리고 프로덕션에서 겪는 장애 사례와 대응 방법을 코드 수준에서 다룬다.

Event-Driven Architecture 핵심 패턴

이벤트의 종류

EDA에서 이벤트는 크게 세 가지로 분류된다.

| 유형 | 설명 | 예시 | 특징 |

| ---------------------- | ----------------------------------------- | ----------------------------- | -------------------------------- |

| **Domain Event** | 비즈니스 도메인에서 발생한 의미 있는 사실 | OrderPlaced, PaymentCompleted | 과거형 명명, 불변 |

| **Integration Event** | 서비스 경계를 넘어 전파되는 이벤트 | OrderShipped (물류 서비스로) | 서비스 간 계약 |

| **Notification Event** | 단순 알림 (데이터 최소화) | OrderStatusChanged | 수신자가 상세 데이터를 별도 조회 |

EDA 통신 패턴 비교

| 패턴 | 결합도 | 전달 보장 | 순서 보장 | 사용 사례 |

| ------------------- | --------- | ------------- | ------------------ | ---------------------- |

| **Pub/Sub** | 느슨 | At-least-once | 보장 안 됨 | 알림, 캐시 무효화 |

| **Event Streaming** | 느슨 | At-least-once | 파티션 내 보장 | 로그 집계, 실시간 분석 |

| **Event Sourcing** | 자기 참조 | 영속 저장 | 애그리거트 내 보장 | 감사 추적, 상태 복원 |

| **Request-Reply** | 강함 | 동기 | 요청-응답 쌍 | 동기적 확인 필요 시 |

TypeScript 이벤트 기본 구조

// 도메인 이벤트 기본 인터페이스

interface DomainEvent {

readonly eventId: string

readonly eventType: string

readonly aggregateId: string

readonly aggregateType: string

readonly version: number

readonly timestamp: Date

readonly metadata: EventMetadata

readonly payload: Record<string, unknown>

}

interface EventMetadata {

readonly correlationId: string

readonly causationId: string

readonly userId?: string

readonly traceId?: string

}

// 구체적 이벤트 예시

class OrderPlacedEvent implements DomainEvent {

readonly eventType = 'OrderPlaced'

readonly aggregateType = 'Order'

constructor(

public readonly eventId: string,

public readonly aggregateId: string,

public readonly version: number,

public readonly timestamp: Date,

public readonly metadata: EventMetadata,

public readonly payload: {

customerId: string

items: Array<{ productId: string; quantity: number; price: number }>

totalAmount: number

currency: string

}

) {}

}

CQRS (Command Query Responsibility Segregation) 심화

CQRS의 핵심 원칙

CQRS는 읽기(Query) 모델과 쓰기(Command) 모델을 완전히 분리하는 패턴이다. 전통적인 CRUD 모델에서는 동일한 데이터 모델로 읽기와 쓰기를 모두 처리하지만, CQRS에서는 각각의 목적에 최적화된 별도의 모델을 사용한다.

// Command 측 - 쓰기 모델

interface Command {

readonly commandId: string

readonly commandType: string

readonly timestamp: Date

}

class PlaceOrderCommand implements Command {

readonly commandType = 'PlaceOrder'

constructor(

public readonly commandId: string,

public readonly timestamp: Date,

public readonly customerId: string,

public readonly items: Array<{

productId: string

quantity: number

price: number

}>,

public readonly shippingAddress: string

) {}

}

// Command Handler

class PlaceOrderHandler {

constructor(

private readonly orderRepository: OrderRepository,

private readonly eventBus: EventBus

) {}

async handle(command: PlaceOrderCommand): Promise<string> {

// 비즈니스 검증

await this.validateInventory(command.items)

await this.validateCustomerCredit(command.customerId)

// 애그리거트 생성 및 이벤트 발행

const order = Order.create(command.customerId, command.items, command.shippingAddress)

await this.orderRepository.save(order)

// 도메인 이벤트 발행

for (const event of order.getUncommittedEvents()) {

await this.eventBus.publish(event)

}

return order.id

}

private async validateInventory(

items: Array<{ productId: string; quantity: number; price: number }>

): Promise<void> {

// 재고 확인 로직

}

private async validateCustomerCredit(customerId: string): Promise<void> {

// 고객 신용 확인 로직

}

}

Query 측 - 읽기 모델

// Query 측 - 읽기에 최적화된 비정규화 모델

interface OrderSummaryReadModel {

orderId: string

customerName: string

customerEmail: string

orderDate: Date

status: string

totalAmount: number

itemCount: number

lastUpdated: Date

}

// Query Handler

class GetOrderSummaryHandler {

constructor(private readonly readDb: ReadDatabase) {}

async handle(orderId: string): Promise<OrderSummaryReadModel | null> {

// 읽기 전용 DB에서 비정규화된 데이터 직접 조회

return this.readDb.query('SELECT * FROM order_summaries WHERE order_id = ?', [orderId])

}

}

// Projection - 이벤트를 읽기 모델로 변환

class OrderSummaryProjection {

constructor(private readonly readDb: ReadDatabase) {}

async handleOrderPlaced(event: OrderPlacedEvent): Promise<void> {

await this.readDb.execute(

`INSERT INTO order_summaries (order_id, customer_name, order_date, status, total_amount, item_count, last_updated)

VALUES (?, ?, ?, 'PLACED', ?, ?, ?)`,

[

event.aggregateId,

event.payload.customerId,

event.timestamp,

event.payload.totalAmount,

event.payload.items.length,

new Date(),

]

)

}

async handleOrderShipped(event: DomainEvent): Promise<void> {

await this.readDb.execute(

`UPDATE order_summaries SET status = 'SHIPPED', last_updated = ? WHERE order_id = ?`,

[new Date(), event.aggregateId]

)

}

}

Event Sourcing 구현

Event Store 설계

Event Sourcing에서는 상태를 직접 저장하지 않고, 상태 변경 이벤트의 시퀀스를 저장한다. 현재 상태는 이벤트를 순서대로 재생(replay)하여 복원한다.

// Event Store 인터페이스

interface EventStore {

append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>

getEvents(aggregateId: string, fromVersion?: number): Promise<DomainEvent[]>

getEventsByType(eventType: string, fromTimestamp?: Date): Promise<DomainEvent[]>

}

// PostgreSQL 기반 Event Store 구현

class PostgresEventStore implements EventStore {

constructor(private readonly pool: Pool) {}

async append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void> {

const client = await this.pool.connect()

try {

await client.query('BEGIN')

// 낙관적 동시성 제어 (Optimistic Concurrency Control)

const result = await client.query(

'SELECT MAX(version) as current_version FROM events WHERE aggregate_id = $1',

[aggregateId]

)

const currentVersion = result.rows[0]?.current_version ?? 0

if (currentVersion !== expectedVersion) {

throw new ConcurrencyError(

`Expected version ${expectedVersion}, but current version is ${currentVersion}`

)

}

// 이벤트 배치 삽입

for (const event of events) {

await client.query(

`INSERT INTO events (event_id, aggregate_id, aggregate_type, event_type, version, payload, metadata, timestamp)

VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,

[

event.eventId,

event.aggregateId,

event.aggregateType,

event.eventType,

event.version,

JSON.stringify(event.payload),

JSON.stringify(event.metadata),

event.timestamp,

]

)

}

await client.query('COMMIT')

} catch (error) {

await client.query('ROLLBACK')

throw error

} finally {

client.release()

}

}

async getEvents(aggregateId: string, fromVersion: number = 0): Promise<DomainEvent[]> {

const result = await this.pool.query(

'SELECT * FROM events WHERE aggregate_id = $1 AND version > $2 ORDER BY version ASC',

[aggregateId, fromVersion]

)

return result.rows.map(this.deserializeEvent)

}

async getEventsByType(eventType: string, fromTimestamp?: Date): Promise<DomainEvent[]> {

const query = fromTimestamp

? 'SELECT * FROM events WHERE event_type = $1 AND timestamp > $2 ORDER BY timestamp ASC'

: 'SELECT * FROM events WHERE event_type = $1 ORDER BY timestamp ASC'

const params = fromTimestamp ? [eventType, fromTimestamp] : [eventType]

const result = await this.pool.query(query, params)

return result.rows.map(this.deserializeEvent)

}

private deserializeEvent(row: any): DomainEvent {

return {

eventId: row.event_id,

eventType: row.event_type,

aggregateId: row.aggregate_id,

aggregateType: row.aggregate_type,

version: row.version,

timestamp: row.timestamp,

metadata: JSON.parse(row.metadata),

payload: JSON.parse(row.payload),

}

}

}

애그리거트와 이벤트 재생

// Event Sourced 애그리거트

abstract class EventSourcedAggregate {

private uncommittedEvents: DomainEvent[] = []

protected version: number = 0

abstract get id(): string

protected apply(event: DomainEvent): void {

this.when(event)

this.version = event.version

this.uncommittedEvents.push(event)

}

protected abstract when(event: DomainEvent): void

getUncommittedEvents(): DomainEvent[] {

return [...this.uncommittedEvents]

}

clearUncommittedEvents(): void {

this.uncommittedEvents = []

}

loadFromHistory(events: DomainEvent[]): void {

for (const event of events) {

this.when(event)

this.version = event.version

}

}

}

// Order 애그리거트

class Order extends EventSourcedAggregate {

private _id: string = ''

private _customerId: string = ''

private _status: OrderStatus = OrderStatus.DRAFT

private _items: OrderItem[] = []

private _totalAmount: number = 0

get id(): string {

return this._id

}

static create(

customerId: string,

items: Array<{ productId: string; quantity: number; price: number }>,

shippingAddress: string

): Order {

const order = new Order()

const orderId = generateUUID()

const totalAmount = items.reduce((sum, item) => sum + item.price * item.quantity, 0)

order.apply({

eventId: generateUUID(),

eventType: 'OrderPlaced',

aggregateId: orderId,

aggregateType: 'Order',

version: 1,

timestamp: new Date(),

metadata: { correlationId: generateUUID(), causationId: generateUUID() },

payload: { customerId, items, totalAmount, shippingAddress, currency: 'KRW' },

})

return order

}

confirm(): void {

if (this._status !== OrderStatus.PLACED) {

throw new Error('Order can only be confirmed when in PLACED status')

}

this.apply({

eventId: generateUUID(),

eventType: 'OrderConfirmed',

aggregateId: this._id,

aggregateType: 'Order',

version: this.version + 1,

timestamp: new Date(),

metadata: { correlationId: generateUUID(), causationId: generateUUID() },

payload: { confirmedAt: new Date().toISOString() },

})

}

protected when(event: DomainEvent): void {

switch (event.eventType) {

case 'OrderPlaced':

this._id = event.aggregateId

this._customerId = event.payload.customerId as string

this._status = OrderStatus.PLACED

this._items = event.payload.items as OrderItem[]

this._totalAmount = event.payload.totalAmount as number

break

case 'OrderConfirmed':

this._status = OrderStatus.CONFIRMED

break

case 'OrderShipped':

this._status = OrderStatus.SHIPPED

break

case 'OrderCancelled':

this._status = OrderStatus.CANCELLED

break

}

}

}

enum OrderStatus {

DRAFT = 'DRAFT',

PLACED = 'PLACED',

CONFIRMED = 'CONFIRMED',

SHIPPED = 'SHIPPED',

CANCELLED = 'CANCELLED',

}

Python Event Store 구현

from dataclasses import dataclass, field

from datetime import datetime

from typing import Any, Protocol

from uuid import uuid4

@dataclass(frozen=True)

class DomainEvent:

event_id: str

event_type: str

aggregate_id: str

aggregate_type: str

version: int

timestamp: datetime

payload: dict[str, Any]

metadata: dict[str, str] = field(default_factory=dict)

class EventStore(Protocol):

async def append(

self,

aggregate_id: str,

events: list[DomainEvent],

expected_version: int,

) -> None: ...

async def get_events(

self, aggregate_id: str, from_version: int = 0

) -> list[DomainEvent]: ...

class PostgresEventStore:

def __init__(self, pool: asyncpg.Pool):

self._pool = pool

async def append(

self,

aggregate_id: str,

events: list[DomainEvent],

expected_version: int,

) -> None:

async with self._pool.acquire() as conn:

async with conn.transaction():

낙관적 동시성 제어

row = await conn.fetchrow(

"SELECT MAX(version) AS cur FROM events WHERE aggregate_id = $1",

aggregate_id,

)

current = row["cur"] or 0

if current != expected_version:

raise ConcurrencyError(

f"Expected {expected_version}, got {current}"

)

for event in events:

await conn.execute(

"""

INSERT INTO events

(event_id, aggregate_id, aggregate_type,

event_type, version, payload, metadata, timestamp)

VALUES ($1, $2, $3, $4, $5, $6, $7, $8)

""",

event.event_id,

event.aggregate_id,

event.aggregate_type,

event.event_type,

event.version,

json.dumps(event.payload),

json.dumps(event.metadata),

event.timestamp,

)

async def get_events(

self, aggregate_id: str, from_version: int = 0

) -> list[DomainEvent]:

async with self._pool.acquire() as conn:

rows = await conn.fetch(

"""

SELECT * FROM events

WHERE aggregate_id = $1 AND version > $2

ORDER BY version ASC

""",

aggregate_id,

from_version,

)

return [self._deserialize(row) for row in rows]

@staticmethod

def _deserialize(row) -> DomainEvent:

return DomainEvent(

event_id=row["event_id"],

event_type=row["event_type"],

aggregate_id=row["aggregate_id"],

aggregate_type=row["aggregate_type"],

version=row["version"],

timestamp=row["timestamp"],

payload=json.loads(row["payload"]),

metadata=json.loads(row["metadata"]),

)

메시지 브로커 비교: Kafka vs RabbitMQ vs NATS

핵심 특성 비교

| 항목 | Apache Kafka | RabbitMQ | NATS JetStream |

| --------------- | -------------------------- | --------------------- | -------------------- |

| **모델** | 분산 로그 (Append-only) | 메시지 큐 (Push 기반) | 스트리밍 (Pull/Push) |

| **메시지 보존** | 설정된 기간 동안 영구 보존 | 소비 후 삭제 (기본) | 설정된 기간 보존 |

| **순서 보장** | 파티션 내 보장 | 큐 단위 보장 | 스트림 내 보장 |

| **처리량** | 초당 수백만 건 | 초당 수만 건 | 초당 수십만 건 |

| **지연시간** | ms 단위 (배치) | us 단위 (단건) | us 단위 |

| **컨슈머 그룹** | 네이티브 지원 | 경쟁 소비자 패턴 | 네이티브 지원 |

| **Replay** | 오프셋 기반 자유 이동 | 제한적 (dead letter) | 시퀀스 기반 이동 |

| **운영 복잡도** | 높음 (ZooKeeper/KRaft) | 중간 | 낮음 |

| **적합한 사례** | 이벤트 스트리밍, 로그 집계 | 태스크 큐, RPC | 경량 메시징, IoT |

Kafka Producer/Consumer 예시

const kafka = new Kafka({

clientId: 'order-service',

brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],

logLevel: logLevel.WARN,

retry: {

initialRetryTime: 100,

retries: 8,

},

})

// Producer - 이벤트 발행

class KafkaEventPublisher {

private producer = kafka.producer({

idempotent: true, // 멱등성 보장

maxInFlightRequests: 5,

transactionalId: 'order-service-producer',

})

async publish(event: DomainEvent): Promise<void> {

await this.producer.send({

topic: `events.${event.aggregateType.toLowerCase()}`,

messages: [

{

key: event.aggregateId, // 파티셔닝 키 = 애그리거트 ID

value: JSON.stringify(event),

headers: {

'event-type': event.eventType,

'correlation-id': event.metadata.correlationId,

'content-type': 'application/json',

},

},

],

})

}

async publishBatch(events: DomainEvent[]): Promise<void> {

const transaction = await this.producer.transaction()

try {

for (const event of events) {

await transaction.send({

topic: `events.${event.aggregateType.toLowerCase()}`,

messages: [

{

key: event.aggregateId,

value: JSON.stringify(event),

headers: {

'event-type': event.eventType,

'correlation-id': event.metadata.correlationId,

},

},

],

})

}

await transaction.commit()

} catch (error) {

await transaction.abort()

throw error

}

}

}

// Consumer - 이벤트 소비 (멱등성 보장)

class KafkaEventConsumer {

private consumer = kafka.consumer({ groupId: 'projection-service' })

private processedEvents = new Set<string>()

async start(): Promise<void> {

await this.consumer.subscribe({

topics: ['events.order'],

fromBeginning: false,

})

await this.consumer.run({

eachMessage: async ({ topic, partition, message }) => {

const event: DomainEvent = JSON.parse(message.value!.toString())

// 멱등성 체크 - 이미 처리된 이벤트 스킵

if (await this.isAlreadyProcessed(event.eventId)) {

console.log(`Skipping duplicate event: ${event.eventId}`)

return

}

await this.handleEvent(event)

await this.markAsProcessed(event.eventId)

},

})

}

private async isAlreadyProcessed(eventId: string): Promise<boolean> {

// Redis 또는 DB에서 처리 여부 확인

return this.processedEvents.has(eventId)

}

private async markAsProcessed(eventId: string): Promise<void> {

this.processedEvents.add(eventId)

}

private async handleEvent(event: DomainEvent): Promise<void> {

switch (event.eventType) {

case 'OrderPlaced':

await this.handleOrderPlaced(event)

break

case 'OrderConfirmed':

await this.handleOrderConfirmed(event)

break

}

}

private async handleOrderPlaced(event: DomainEvent): Promise<void> {

console.log(`Processing OrderPlaced for ${event.aggregateId}`)

}

private async handleOrderConfirmed(event: DomainEvent): Promise<void> {

console.log(`Processing OrderConfirmed for ${event.aggregateId}`)

}

}

Saga 패턴: 분산 트랜잭션 관리

Orchestration Saga vs Choreography Saga

| 항목 | Orchestration | Choreography |

| --------------- | --------------------------------- | --------------------------------- |

| **제어 방식** | 중앙 오케스트레이터가 조율 | 각 서비스가 자율적으로 반응 |

| **결합도** | 오케스트레이터에 의존 | 서비스 간 느슨한 결합 |

| **복잡성** | 오케스트레이터에 집중 | 서비스에 분산 |

| **추적** | 중앙에서 상태 확인 가능 | 분산된 상태 추적 필요 |

| **에러 처리** | 중앙에서 보상 트랜잭션 실행 | 각 서비스가 보상 이벤트 발행 |

| **적합한 경우** | 복잡한 워크플로우 (5개 이상 단계) | 단순한 워크플로우 (3개 이하 단계) |

Orchestration Saga 구현

// Saga 상태 머신

enum SagaStatus {

STARTED = 'STARTED',

ORDER_CREATED = 'ORDER_CREATED',

PAYMENT_PROCESSED = 'PAYMENT_PROCESSED',

INVENTORY_RESERVED = 'INVENTORY_RESERVED',

COMPLETED = 'COMPLETED',

COMPENSATING = 'COMPENSATING',

FAILED = 'FAILED',

}

interface SagaStep {

name: string

execute: (context: SagaContext) => Promise<void>

compensate: (context: SagaContext) => Promise<void>

}

interface SagaContext {

sagaId: string

orderId: string

customerId: string

items: Array<{ productId: string; quantity: number; price: number }>

paymentId?: string

reservationId?: string

[key: string]: unknown

}

class OrderSagaOrchestrator {

private steps: SagaStep[] = []

private completedSteps: SagaStep[] = []

constructor(

private readonly sagaRepository: SagaRepository,

private readonly eventBus: EventBus

) {

this.steps = [

{

name: 'CreateOrder',

execute: async (ctx) => {

await this.eventBus.publish({

eventType: 'CreateOrderRequested',

aggregateId: ctx.orderId,

payload: { customerId: ctx.customerId, items: ctx.items },

} as DomainEvent)

},

compensate: async (ctx) => {

await this.eventBus.publish({

eventType: 'CancelOrderRequested',

aggregateId: ctx.orderId,

payload: { reason: 'Saga compensation' },

} as DomainEvent)

},

},

{

name: 'ProcessPayment',

execute: async (ctx) => {

const totalAmount = ctx.items.reduce((sum, item) => sum + item.price * item.quantity, 0)

await this.eventBus.publish({

eventType: 'ProcessPaymentRequested',

aggregateId: ctx.orderId,

payload: { customerId: ctx.customerId, amount: totalAmount },

} as DomainEvent)

},

compensate: async (ctx) => {

await this.eventBus.publish({

eventType: 'RefundPaymentRequested',

aggregateId: ctx.orderId,

payload: { paymentId: ctx.paymentId },

} as DomainEvent)

},

},

{

name: 'ReserveInventory',

execute: async (ctx) => {

await this.eventBus.publish({

eventType: 'ReserveInventoryRequested',

aggregateId: ctx.orderId,

payload: { items: ctx.items },

} as DomainEvent)

},

compensate: async (ctx) => {

await this.eventBus.publish({

eventType: 'ReleaseInventoryRequested',

aggregateId: ctx.orderId,

payload: { reservationId: ctx.reservationId },

} as DomainEvent)

},

},

]

}

async start(context: SagaContext): Promise<void> {

await this.sagaRepository.save(context.sagaId, SagaStatus.STARTED, context)

await this.executeNextStep(context, 0)

}

private async executeNextStep(context: SagaContext, stepIndex: number): Promise<void> {

if (stepIndex >= this.steps.length) {

await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.COMPLETED)

return

}

const step = this.steps[stepIndex]

try {

await step.execute(context)

this.completedSteps.push(step)

await this.executeNextStep(context, stepIndex + 1)

} catch (error) {

console.error(`Saga step '${step.name}' failed:`, error)

await this.compensate(context)

}

}

private async compensate(context: SagaContext): Promise<void> {

await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.COMPENSATING)

// 완료된 스텝을 역순으로 보상

for (const step of [...this.completedSteps].reverse()) {

try {

await step.compensate(context)

} catch (error) {

console.error(`Compensation for '${step.name}' failed:`, error)

// 보상 실패 시 수동 개입 필요 - 알림 발송

}

}

await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.FAILED)

}

}

Snapshot 전략: 이벤트 재생 최적화

이벤트 수가 많아지면 재생(replay) 시간이 길어진다. Snapshot은 특정 시점의 애그리거트 상태를 저장하여 이 문제를 해결한다.

// Snapshot 기반 애그리거트 로딩

class EventSourcedRepository<T extends EventSourcedAggregate> {

private readonly SNAPSHOT_INTERVAL = 50 // 50개 이벤트마다 스냅샷

constructor(

private readonly eventStore: EventStore,

private readonly snapshotStore: SnapshotStore,

private readonly factory: () => T

) {}

async load(aggregateId: string): Promise<T> {

const aggregate = this.factory()

// 1. 최신 스냅샷 로드

const snapshot = await this.snapshotStore.getLatest(aggregateId)

if (snapshot) {

aggregate.restoreFromSnapshot(snapshot.state)

// 2. 스냅샷 이후의 이벤트만 재생

const events = await this.eventStore.getEvents(aggregateId, snapshot.version)

aggregate.loadFromHistory(events)

} else {

// 3. 전체 이벤트 재생

const events = await this.eventStore.getEvents(aggregateId)

aggregate.loadFromHistory(events)

}

return aggregate

}

async save(aggregate: T): Promise<void> {

const events = aggregate.getUncommittedEvents()

await this.eventStore.append(aggregate.id, events, aggregate.version - events.length)

// 스냅샷 생성 조건 확인

if (aggregate.version % this.SNAPSHOT_INTERVAL === 0) {

await this.snapshotStore.save({

aggregateId: aggregate.id,

version: aggregate.version,

state: aggregate.toSnapshot(),

timestamp: new Date(),

})

}

aggregate.clearUncommittedEvents()

}

}

운영 주의사항

반드시 알아야 할 핵심 경고

1. **이벤트 스키마 변경은 DB 마이그레이션보다 위험하다**: Event Store의 이벤트는 불변이므로, 스키마를 변경하면 과거 이벤트 재생이 깨진다. 반드시 하위 호환성(backward compatibility)을 유지해야 한다.

2. **CQRS 도입은 복잡도를 최소 2배로 만든다**: 읽기 모델과 쓰기 모델의 동기화 지연(eventual consistency)을 UI와 비즈니스 로직에서 반드시 처리해야 한다.

3. **Saga 보상 트랜잭션 실패는 수동 개입이 필요하다**: 보상 트랜잭션도 실패할 수 있으며, 이 경우 Dead Letter Queue와 수동 복구 프로세스가 반드시 필요하다.

4. **이벤트 순서 역전은 데이터 정합성을 파괴한다**: Kafka의 파티셔닝 키가 잘못 설정되면 동일 애그리거트의 이벤트가 다른 파티션에 분산되어 순서가 뒤바뀐다.

5. **Event Store의 무한 성장을 관리해야 한다**: Snapshot과 아카이빙 전략 없이는 Event Store가 무한히 커져 재생 성능이 급격히 저하된다.

장애 사례와 복구 절차

사례 1: 이벤트 순서 역전

**증상**: OrderPlaced 이전에 OrderShipped 이벤트가 도착하여 프로젝션이 실패한다.

**원인**: Kafka 파티셔닝 키를 설정하지 않아 이벤트가 다른 파티션에 분산되었다.

**복구**:

1. 컨슈머 그룹 오프셋 리셋

kafka-consumer-groups.sh --bootstrap-server kafka:9092 \

--group projection-service \

--topic events.order \

--reset-offsets --to-earliest --execute

2. 프로젝션 테이블 초기화

psql -c "TRUNCATE TABLE order_summaries;"

3. 프로젝션 서비스 재시작 (전체 재생)

kubectl rollout restart deployment/projection-service

**예방**: 반드시 애그리거트 ID를 Kafka 파티셔닝 키로 사용한다.

사례 2: 중복 이벤트 처리

**증상**: 주문이 이중으로 생성되거나, 결제가 두 번 처리된다.

**원인**: 컨슈머 장애 후 재시작 시 이미 처리된 메시지를 다시 소비한다 (at-least-once 특성).

**복구**:

// 멱등성 키 테이블 기반 중복 방지

class IdempotencyGuard {

constructor(private readonly db: Database) {}

async executeOnce<T>(idempotencyKey: string, operation: () => Promise<T>): Promise<T | null> {

try {

// UNIQUE 제약 조건으로 중복 삽입 방지

await this.db.execute('INSERT INTO processed_events (event_id, processed_at) VALUES (?, ?)', [

idempotencyKey,

new Date(),

])

} catch (error) {

// 이미 처리된 이벤트

console.log(`Event ${idempotencyKey} already processed, skipping`)

return null

}

return operation()

}

}

사례 3: 스키마 진화 실패

**증상**: 새 버전의 이벤트 핸들러가 과거 이벤트를 역직렬화하지 못해 프로젝션이 중단된다.

**원인**: 이벤트에 필수 필드를 추가하면서 과거 이벤트에는 해당 필드가 없다.

**복구**:

// 이벤트 업캐스터 (Upcaster) 패턴

class EventUpcaster {

private upcasters: Map<string, (event: any) => DomainEvent> = new Map()

register(eventType: string, fromVersion: number, upcaster: (event: any) => any): void {

this.upcasters.set(`${eventType}:v${fromVersion}`, upcaster)

}

upcast(event: any): DomainEvent {

const key = `${event.eventType}:v${event.schemaVersion || 1}`

const upcaster = this.upcasters.get(key)

if (upcaster) {

return this.upcast(upcaster(event)) // 재귀적으로 최신 버전까지

}

return event

}

}

// 사용 예: v1 -> v2 변환

const upcaster = new EventUpcaster()

upcaster.register('OrderPlaced', 1, (event) => ({

...event,

schemaVersion: 2,

payload: {

...event.payload,

currency: event.payload.currency || 'KRW', // 기본값 추가

shippingMethod: 'STANDARD', // 새 필드에 기본값 설정

},

}))

프로덕션 체크리스트

Event Store

- [ ] 이벤트 테이블에 aggregate_id + version 유니크 인덱스 설정

- [ ] 낙관적 동시성 제어(Optimistic Concurrency) 구현 확인

- [ ] Snapshot 전략 설정 (50~100 이벤트마다)

- [ ] 이벤트 아카이빙 정책 수립 (cold storage 이동)

- [ ] Event Store 백업 및 복구 절차 검증

CQRS

- [ ] Read Model 재구축(Rebuild) 프로세스 자동화

- [ ] 읽기/쓰기 DB 분리 여부와 복제 지연 모니터링

- [ ] Projection 실패 시 알림 및 자동 재시도 설정

- [ ] Eventual Consistency 처리 (UI에서 낙관적 업데이트)

메시지 브로커

- [ ] Kafka: 파티셔닝 키를 애그리거트 ID로 설정

- [ ] Consumer 그룹 lag 모니터링 (Burrow 또는 Kafka Exporter)

- [ ] Dead Letter Queue(DLQ) 설정 및 모니터링

- [ ] 메시지 직렬화 포맷 결정 (Avro + Schema Registry 권장)

- [ ] 브로커 클러스터 복제 팩터 3 이상 설정

Saga

- [ ] 보상 트랜잭션 정의 및 테스트 완료

- [ ] Saga 상태 영속화 (DB 저장) 확인

- [ ] Saga 타임아웃 설정 (무한 대기 방지)

- [ ] 보상 실패 시 수동 복구 프로세스 문서화

이벤트 스키마

- [ ] Schema Registry 도입 (Confluent Schema Registry, AWS Glue)

- [ ] 하위 호환성 검증 자동화 (CI 파이프라인에 포함)

- [ ] 이벤트 업캐스터(Upcaster) 구현

- [ ] 이벤트 카탈로그 문서 유지

모니터링

- [ ] 이벤트 처리 지연시간(latency) 메트릭 수집

- [ ] Consumer Group lag 알림 설정

- [ ] 이벤트 처리 실패율 대시보드 구성

- [ ] Correlation ID 기반 엔드투엔드 트레이싱

참고자료

- [Microsoft - CQRS Pattern](https://learn.microsoft.com/en-us/azure/architecture/patterns/cqrs)

- [Microsoft - Event Sourcing Pattern](https://learn.microsoft.com/en-us/azure/architecture/patterns/event-sourcing)

- [Martin Fowler - Event Sourcing](https://martinfowler.com/eaaDev/EventSourcing.html)

- [Martin Fowler - CQRS](https://martinfowler.com/bliki/CQRS.html)

- [Chris Richardson - Saga Pattern](https://microservices.io/patterns/data/saga.html)

- [Apache Kafka Documentation](https://kafka.apache.org/documentation/)

- [Confluent - Event-Driven Architecture](https://www.confluent.io/learn/event-driven-architecture/)

- [Greg Young - CQRS Documents](https://cqrs.files.wordpress.com/2010/11/cqrs_documents.pdf)

- [Vaughn Vernon - Implementing Domain-Driven Design](https://www.oreilly.com/library/view/implementing-domain-driven-design/9780133039900/)

현재 단락 (1/785)

마이크로서비스 아키텍처가 보편화되면서, 서비스 간 통신 방식은 시스템의 확장성과 복원력을 결정짓는 핵심 요소가 되었다. 동기 REST 호출 기반 아키텍처는 서비스 간 강한 결합을 ...

작성 글자: 0원문 글자: 22,073작성 단락: 0/785