- Published on
Event-Driven Architecture + CQRS + Event Sourcing 실전 구현: Kafka/RabbitMQ 기반 분산 시스템 설계
- Authors
- Name
- 들어가며
- Event-Driven Architecture 핵심 패턴
- CQRS (Command Query Responsibility Segregation) 심화
- Event Sourcing 구현
- 메시지 브로커 비교: Kafka vs RabbitMQ vs NATS
- Saga 패턴: 분산 트랜잭션 관리
- Snapshot 전략: 이벤트 재생 최적화
- 운영 주의사항
- 장애 사례와 복구 절차
- 프로덕션 체크리스트
- 참고자료

들어가며
마이크로서비스 아키텍처가 보편화되면서, 서비스 간 통신 방식은 시스템의 확장성과 복원력을 결정짓는 핵심 요소가 되었다. 동기 REST 호출 기반 아키텍처는 서비스 간 강한 결합을 만들고, 하나의 서비스 장애가 전체 시스템으로 전파되는 캐스케이드 장애를 유발한다.
Event-Driven Architecture(EDA)는 이 문제를 해결하는 대표적인 패턴이다. 서비스는 이벤트를 발행(publish)하고, 관심 있는 서비스가 이를 구독(subscribe)하여 비동기적으로 처리한다. 여기에 CQRS(Command Query Responsibility Segregation)와 Event Sourcing을 결합하면 읽기/쓰기 워크로드를 독립적으로 스케일링하고, 모든 상태 변경의 감사 추적(audit trail)을 자연스럽게 확보할 수 있다.
이 글에서는 EDA의 핵심 패턴부터 CQRS와 Event Sourcing의 구현, 메시지 브로커 비교, Saga 패턴, 그리고 프로덕션에서 겪는 장애 사례와 대응 방법을 코드 수준에서 다룬다.
Event-Driven Architecture 핵심 패턴
이벤트의 종류
EDA에서 이벤트는 크게 세 가지로 분류된다.
| 유형 | 설명 | 예시 | 특징 |
|---|---|---|---|
| Domain Event | 비즈니스 도메인에서 발생한 의미 있는 사실 | OrderPlaced, PaymentCompleted | 과거형 명명, 불변 |
| Integration Event | 서비스 경계를 넘어 전파되는 이벤트 | OrderShipped (물류 서비스로) | 서비스 간 계약 |
| Notification Event | 단순 알림 (데이터 최소화) | OrderStatusChanged | 수신자가 상세 데이터를 별도 조회 |
EDA 통신 패턴 비교
| 패턴 | 결합도 | 전달 보장 | 순서 보장 | 사용 사례 |
|---|---|---|---|---|
| Pub/Sub | 느슨 | At-least-once | 보장 안 됨 | 알림, 캐시 무효화 |
| Event Streaming | 느슨 | At-least-once | 파티션 내 보장 | 로그 집계, 실시간 분석 |
| Event Sourcing | 자기 참조 | 영속 저장 | 애그리거트 내 보장 | 감사 추적, 상태 복원 |
| Request-Reply | 강함 | 동기 | 요청-응답 쌍 | 동기적 확인 필요 시 |
TypeScript 이벤트 기본 구조
// 도메인 이벤트 기본 인터페이스
interface DomainEvent {
readonly eventId: string
readonly eventType: string
readonly aggregateId: string
readonly aggregateType: string
readonly version: number
readonly timestamp: Date
readonly metadata: EventMetadata
readonly payload: Record<string, unknown>
}
interface EventMetadata {
readonly correlationId: string
readonly causationId: string
readonly userId?: string
readonly traceId?: string
}
// 구체적 이벤트 예시
class OrderPlacedEvent implements DomainEvent {
readonly eventType = 'OrderPlaced'
readonly aggregateType = 'Order'
constructor(
public readonly eventId: string,
public readonly aggregateId: string,
public readonly version: number,
public readonly timestamp: Date,
public readonly metadata: EventMetadata,
public readonly payload: {
customerId: string
items: Array<{ productId: string; quantity: number; price: number }>
totalAmount: number
currency: string
}
) {}
}
CQRS (Command Query Responsibility Segregation) 심화
CQRS의 핵심 원칙
CQRS는 읽기(Query) 모델과 쓰기(Command) 모델을 완전히 분리하는 패턴이다. 전통적인 CRUD 모델에서는 동일한 데이터 모델로 읽기와 쓰기를 모두 처리하지만, CQRS에서는 각각의 목적에 최적화된 별도의 모델을 사용한다.
// Command 측 - 쓰기 모델
interface Command {
readonly commandId: string
readonly commandType: string
readonly timestamp: Date
}
class PlaceOrderCommand implements Command {
readonly commandType = 'PlaceOrder'
constructor(
public readonly commandId: string,
public readonly timestamp: Date,
public readonly customerId: string,
public readonly items: Array<{
productId: string
quantity: number
price: number
}>,
public readonly shippingAddress: string
) {}
}
// Command Handler
class PlaceOrderHandler {
constructor(
private readonly orderRepository: OrderRepository,
private readonly eventBus: EventBus
) {}
async handle(command: PlaceOrderCommand): Promise<string> {
// 비즈니스 검증
await this.validateInventory(command.items)
await this.validateCustomerCredit(command.customerId)
// 애그리거트 생성 및 이벤트 발행
const order = Order.create(command.customerId, command.items, command.shippingAddress)
await this.orderRepository.save(order)
// 도메인 이벤트 발행
for (const event of order.getUncommittedEvents()) {
await this.eventBus.publish(event)
}
return order.id
}
private async validateInventory(
items: Array<{ productId: string; quantity: number; price: number }>
): Promise<void> {
// 재고 확인 로직
}
private async validateCustomerCredit(customerId: string): Promise<void> {
// 고객 신용 확인 로직
}
}
Query 측 - 읽기 모델
// Query 측 - 읽기에 최적화된 비정규화 모델
interface OrderSummaryReadModel {
orderId: string
customerName: string
customerEmail: string
orderDate: Date
status: string
totalAmount: number
itemCount: number
lastUpdated: Date
}
// Query Handler
class GetOrderSummaryHandler {
constructor(private readonly readDb: ReadDatabase) {}
async handle(orderId: string): Promise<OrderSummaryReadModel | null> {
// 읽기 전용 DB에서 비정규화된 데이터 직접 조회
return this.readDb.query('SELECT * FROM order_summaries WHERE order_id = ?', [orderId])
}
}
// Projection - 이벤트를 읽기 모델로 변환
class OrderSummaryProjection {
constructor(private readonly readDb: ReadDatabase) {}
async handleOrderPlaced(event: OrderPlacedEvent): Promise<void> {
await this.readDb.execute(
`INSERT INTO order_summaries (order_id, customer_name, order_date, status, total_amount, item_count, last_updated)
VALUES (?, ?, ?, 'PLACED', ?, ?, ?)`,
[
event.aggregateId,
event.payload.customerId,
event.timestamp,
event.payload.totalAmount,
event.payload.items.length,
new Date(),
]
)
}
async handleOrderShipped(event: DomainEvent): Promise<void> {
await this.readDb.execute(
`UPDATE order_summaries SET status = 'SHIPPED', last_updated = ? WHERE order_id = ?`,
[new Date(), event.aggregateId]
)
}
}
Event Sourcing 구현
Event Store 설계
Event Sourcing에서는 상태를 직접 저장하지 않고, 상태 변경 이벤트의 시퀀스를 저장한다. 현재 상태는 이벤트를 순서대로 재생(replay)하여 복원한다.
// Event Store 인터페이스
interface EventStore {
append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>
getEvents(aggregateId: string, fromVersion?: number): Promise<DomainEvent[]>
getEventsByType(eventType: string, fromTimestamp?: Date): Promise<DomainEvent[]>
}
// PostgreSQL 기반 Event Store 구현
class PostgresEventStore implements EventStore {
constructor(private readonly pool: Pool) {}
async append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void> {
const client = await this.pool.connect()
try {
await client.query('BEGIN')
// 낙관적 동시성 제어 (Optimistic Concurrency Control)
const result = await client.query(
'SELECT MAX(version) as current_version FROM events WHERE aggregate_id = $1',
[aggregateId]
)
const currentVersion = result.rows[0]?.current_version ?? 0
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, but current version is ${currentVersion}`
)
}
// 이벤트 배치 삽입
for (const event of events) {
await client.query(
`INSERT INTO events (event_id, aggregate_id, aggregate_type, event_type, version, payload, metadata, timestamp)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
[
event.eventId,
event.aggregateId,
event.aggregateType,
event.eventType,
event.version,
JSON.stringify(event.payload),
JSON.stringify(event.metadata),
event.timestamp,
]
)
}
await client.query('COMMIT')
} catch (error) {
await client.query('ROLLBACK')
throw error
} finally {
client.release()
}
}
async getEvents(aggregateId: string, fromVersion: number = 0): Promise<DomainEvent[]> {
const result = await this.pool.query(
'SELECT * FROM events WHERE aggregate_id = $1 AND version > $2 ORDER BY version ASC',
[aggregateId, fromVersion]
)
return result.rows.map(this.deserializeEvent)
}
async getEventsByType(eventType: string, fromTimestamp?: Date): Promise<DomainEvent[]> {
const query = fromTimestamp
? 'SELECT * FROM events WHERE event_type = $1 AND timestamp > $2 ORDER BY timestamp ASC'
: 'SELECT * FROM events WHERE event_type = $1 ORDER BY timestamp ASC'
const params = fromTimestamp ? [eventType, fromTimestamp] : [eventType]
const result = await this.pool.query(query, params)
return result.rows.map(this.deserializeEvent)
}
private deserializeEvent(row: any): DomainEvent {
return {
eventId: row.event_id,
eventType: row.event_type,
aggregateId: row.aggregate_id,
aggregateType: row.aggregate_type,
version: row.version,
timestamp: row.timestamp,
metadata: JSON.parse(row.metadata),
payload: JSON.parse(row.payload),
}
}
}
애그리거트와 이벤트 재생
// Event Sourced 애그리거트
abstract class EventSourcedAggregate {
private uncommittedEvents: DomainEvent[] = []
protected version: number = 0
abstract get id(): string
protected apply(event: DomainEvent): void {
this.when(event)
this.version = event.version
this.uncommittedEvents.push(event)
}
protected abstract when(event: DomainEvent): void
getUncommittedEvents(): DomainEvent[] {
return [...this.uncommittedEvents]
}
clearUncommittedEvents(): void {
this.uncommittedEvents = []
}
loadFromHistory(events: DomainEvent[]): void {
for (const event of events) {
this.when(event)
this.version = event.version
}
}
}
// Order 애그리거트
class Order extends EventSourcedAggregate {
private _id: string = ''
private _customerId: string = ''
private _status: OrderStatus = OrderStatus.DRAFT
private _items: OrderItem[] = []
private _totalAmount: number = 0
get id(): string {
return this._id
}
static create(
customerId: string,
items: Array<{ productId: string; quantity: number; price: number }>,
shippingAddress: string
): Order {
const order = new Order()
const orderId = generateUUID()
const totalAmount = items.reduce((sum, item) => sum + item.price * item.quantity, 0)
order.apply({
eventId: generateUUID(),
eventType: 'OrderPlaced',
aggregateId: orderId,
aggregateType: 'Order',
version: 1,
timestamp: new Date(),
metadata: { correlationId: generateUUID(), causationId: generateUUID() },
payload: { customerId, items, totalAmount, shippingAddress, currency: 'KRW' },
})
return order
}
confirm(): void {
if (this._status !== OrderStatus.PLACED) {
throw new Error('Order can only be confirmed when in PLACED status')
}
this.apply({
eventId: generateUUID(),
eventType: 'OrderConfirmed',
aggregateId: this._id,
aggregateType: 'Order',
version: this.version + 1,
timestamp: new Date(),
metadata: { correlationId: generateUUID(), causationId: generateUUID() },
payload: { confirmedAt: new Date().toISOString() },
})
}
protected when(event: DomainEvent): void {
switch (event.eventType) {
case 'OrderPlaced':
this._id = event.aggregateId
this._customerId = event.payload.customerId as string
this._status = OrderStatus.PLACED
this._items = event.payload.items as OrderItem[]
this._totalAmount = event.payload.totalAmount as number
break
case 'OrderConfirmed':
this._status = OrderStatus.CONFIRMED
break
case 'OrderShipped':
this._status = OrderStatus.SHIPPED
break
case 'OrderCancelled':
this._status = OrderStatus.CANCELLED
break
}
}
}
enum OrderStatus {
DRAFT = 'DRAFT',
PLACED = 'PLACED',
CONFIRMED = 'CONFIRMED',
SHIPPED = 'SHIPPED',
CANCELLED = 'CANCELLED',
}
Python Event Store 구현
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Protocol
from uuid import uuid4
import json
import asyncpg
@dataclass(frozen=True)
class DomainEvent:
event_id: str
event_type: str
aggregate_id: str
aggregate_type: str
version: int
timestamp: datetime
payload: dict[str, Any]
metadata: dict[str, str] = field(default_factory=dict)
class EventStore(Protocol):
async def append(
self,
aggregate_id: str,
events: list[DomainEvent],
expected_version: int,
) -> None: ...
async def get_events(
self, aggregate_id: str, from_version: int = 0
) -> list[DomainEvent]: ...
class PostgresEventStore:
def __init__(self, pool: asyncpg.Pool):
self._pool = pool
async def append(
self,
aggregate_id: str,
events: list[DomainEvent],
expected_version: int,
) -> None:
async with self._pool.acquire() as conn:
async with conn.transaction():
# 낙관적 동시성 제어
row = await conn.fetchrow(
"SELECT MAX(version) AS cur FROM events WHERE aggregate_id = $1",
aggregate_id,
)
current = row["cur"] or 0
if current != expected_version:
raise ConcurrencyError(
f"Expected {expected_version}, got {current}"
)
for event in events:
await conn.execute(
"""
INSERT INTO events
(event_id, aggregate_id, aggregate_type,
event_type, version, payload, metadata, timestamp)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
""",
event.event_id,
event.aggregate_id,
event.aggregate_type,
event.event_type,
event.version,
json.dumps(event.payload),
json.dumps(event.metadata),
event.timestamp,
)
async def get_events(
self, aggregate_id: str, from_version: int = 0
) -> list[DomainEvent]:
async with self._pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT * FROM events
WHERE aggregate_id = $1 AND version > $2
ORDER BY version ASC
""",
aggregate_id,
from_version,
)
return [self._deserialize(row) for row in rows]
@staticmethod
def _deserialize(row) -> DomainEvent:
return DomainEvent(
event_id=row["event_id"],
event_type=row["event_type"],
aggregate_id=row["aggregate_id"],
aggregate_type=row["aggregate_type"],
version=row["version"],
timestamp=row["timestamp"],
payload=json.loads(row["payload"]),
metadata=json.loads(row["metadata"]),
)
메시지 브로커 비교: Kafka vs RabbitMQ vs NATS
핵심 특성 비교
| 항목 | Apache Kafka | RabbitMQ | NATS JetStream |
|---|---|---|---|
| 모델 | 분산 로그 (Append-only) | 메시지 큐 (Push 기반) | 스트리밍 (Pull/Push) |
| 메시지 보존 | 설정된 기간 동안 영구 보존 | 소비 후 삭제 (기본) | 설정된 기간 보존 |
| 순서 보장 | 파티션 내 보장 | 큐 단위 보장 | 스트림 내 보장 |
| 처리량 | 초당 수백만 건 | 초당 수만 건 | 초당 수십만 건 |
| 지연시간 | ms 단위 (배치) | us 단위 (단건) | us 단위 |
| 컨슈머 그룹 | 네이티브 지원 | 경쟁 소비자 패턴 | 네이티브 지원 |
| Replay | 오프셋 기반 자유 이동 | 제한적 (dead letter) | 시퀀스 기반 이동 |
| 운영 복잡도 | 높음 (ZooKeeper/KRaft) | 중간 | 낮음 |
| 적합한 사례 | 이벤트 스트리밍, 로그 집계 | 태스크 큐, RPC | 경량 메시징, IoT |
Kafka Producer/Consumer 예시
import { Kafka, logLevel } from 'kafkajs'
const kafka = new Kafka({
clientId: 'order-service',
brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
logLevel: logLevel.WARN,
retry: {
initialRetryTime: 100,
retries: 8,
},
})
// Producer - 이벤트 발행
class KafkaEventPublisher {
private producer = kafka.producer({
idempotent: true, // 멱등성 보장
maxInFlightRequests: 5,
transactionalId: 'order-service-producer',
})
async publish(event: DomainEvent): Promise<void> {
await this.producer.send({
topic: `events.${event.aggregateType.toLowerCase()}`,
messages: [
{
key: event.aggregateId, // 파티셔닝 키 = 애그리거트 ID
value: JSON.stringify(event),
headers: {
'event-type': event.eventType,
'correlation-id': event.metadata.correlationId,
'content-type': 'application/json',
},
},
],
})
}
async publishBatch(events: DomainEvent[]): Promise<void> {
const transaction = await this.producer.transaction()
try {
for (const event of events) {
await transaction.send({
topic: `events.${event.aggregateType.toLowerCase()}`,
messages: [
{
key: event.aggregateId,
value: JSON.stringify(event),
headers: {
'event-type': event.eventType,
'correlation-id': event.metadata.correlationId,
},
},
],
})
}
await transaction.commit()
} catch (error) {
await transaction.abort()
throw error
}
}
}
// Consumer - 이벤트 소비 (멱등성 보장)
class KafkaEventConsumer {
private consumer = kafka.consumer({ groupId: 'projection-service' })
private processedEvents = new Set<string>()
async start(): Promise<void> {
await this.consumer.subscribe({
topics: ['events.order'],
fromBeginning: false,
})
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event: DomainEvent = JSON.parse(message.value!.toString())
// 멱등성 체크 - 이미 처리된 이벤트 스킵
if (await this.isAlreadyProcessed(event.eventId)) {
console.log(`Skipping duplicate event: ${event.eventId}`)
return
}
await this.handleEvent(event)
await this.markAsProcessed(event.eventId)
},
})
}
private async isAlreadyProcessed(eventId: string): Promise<boolean> {
// Redis 또는 DB에서 처리 여부 확인
return this.processedEvents.has(eventId)
}
private async markAsProcessed(eventId: string): Promise<void> {
this.processedEvents.add(eventId)
}
private async handleEvent(event: DomainEvent): Promise<void> {
switch (event.eventType) {
case 'OrderPlaced':
await this.handleOrderPlaced(event)
break
case 'OrderConfirmed':
await this.handleOrderConfirmed(event)
break
}
}
private async handleOrderPlaced(event: DomainEvent): Promise<void> {
console.log(`Processing OrderPlaced for ${event.aggregateId}`)
}
private async handleOrderConfirmed(event: DomainEvent): Promise<void> {
console.log(`Processing OrderConfirmed for ${event.aggregateId}`)
}
}
Saga 패턴: 분산 트랜잭션 관리
Orchestration Saga vs Choreography Saga
| 항목 | Orchestration | Choreography |
|---|---|---|
| 제어 방식 | 중앙 오케스트레이터가 조율 | 각 서비스가 자율적으로 반응 |
| 결합도 | 오케스트레이터에 의존 | 서비스 간 느슨한 결합 |
| 복잡성 | 오케스트레이터에 집중 | 서비스에 분산 |
| 추적 | 중앙에서 상태 확인 가능 | 분산된 상태 추적 필요 |
| 에러 처리 | 중앙에서 보상 트랜잭션 실행 | 각 서비스가 보상 이벤트 발행 |
| 적합한 경우 | 복잡한 워크플로우 (5개 이상 단계) | 단순한 워크플로우 (3개 이하 단계) |
Orchestration Saga 구현
// Saga 상태 머신
enum SagaStatus {
STARTED = 'STARTED',
ORDER_CREATED = 'ORDER_CREATED',
PAYMENT_PROCESSED = 'PAYMENT_PROCESSED',
INVENTORY_RESERVED = 'INVENTORY_RESERVED',
COMPLETED = 'COMPLETED',
COMPENSATING = 'COMPENSATING',
FAILED = 'FAILED',
}
interface SagaStep {
name: string
execute: (context: SagaContext) => Promise<void>
compensate: (context: SagaContext) => Promise<void>
}
interface SagaContext {
sagaId: string
orderId: string
customerId: string
items: Array<{ productId: string; quantity: number; price: number }>
paymentId?: string
reservationId?: string
[key: string]: unknown
}
class OrderSagaOrchestrator {
private steps: SagaStep[] = []
private completedSteps: SagaStep[] = []
constructor(
private readonly sagaRepository: SagaRepository,
private readonly eventBus: EventBus
) {
this.steps = [
{
name: 'CreateOrder',
execute: async (ctx) => {
await this.eventBus.publish({
eventType: 'CreateOrderRequested',
aggregateId: ctx.orderId,
payload: { customerId: ctx.customerId, items: ctx.items },
} as DomainEvent)
},
compensate: async (ctx) => {
await this.eventBus.publish({
eventType: 'CancelOrderRequested',
aggregateId: ctx.orderId,
payload: { reason: 'Saga compensation' },
} as DomainEvent)
},
},
{
name: 'ProcessPayment',
execute: async (ctx) => {
const totalAmount = ctx.items.reduce((sum, item) => sum + item.price * item.quantity, 0)
await this.eventBus.publish({
eventType: 'ProcessPaymentRequested',
aggregateId: ctx.orderId,
payload: { customerId: ctx.customerId, amount: totalAmount },
} as DomainEvent)
},
compensate: async (ctx) => {
await this.eventBus.publish({
eventType: 'RefundPaymentRequested',
aggregateId: ctx.orderId,
payload: { paymentId: ctx.paymentId },
} as DomainEvent)
},
},
{
name: 'ReserveInventory',
execute: async (ctx) => {
await this.eventBus.publish({
eventType: 'ReserveInventoryRequested',
aggregateId: ctx.orderId,
payload: { items: ctx.items },
} as DomainEvent)
},
compensate: async (ctx) => {
await this.eventBus.publish({
eventType: 'ReleaseInventoryRequested',
aggregateId: ctx.orderId,
payload: { reservationId: ctx.reservationId },
} as DomainEvent)
},
},
]
}
async start(context: SagaContext): Promise<void> {
await this.sagaRepository.save(context.sagaId, SagaStatus.STARTED, context)
await this.executeNextStep(context, 0)
}
private async executeNextStep(context: SagaContext, stepIndex: number): Promise<void> {
if (stepIndex >= this.steps.length) {
await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.COMPLETED)
return
}
const step = this.steps[stepIndex]
try {
await step.execute(context)
this.completedSteps.push(step)
await this.executeNextStep(context, stepIndex + 1)
} catch (error) {
console.error(`Saga step '${step.name}' failed:`, error)
await this.compensate(context)
}
}
private async compensate(context: SagaContext): Promise<void> {
await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.COMPENSATING)
// 완료된 스텝을 역순으로 보상
for (const step of [...this.completedSteps].reverse()) {
try {
await step.compensate(context)
} catch (error) {
console.error(`Compensation for '${step.name}' failed:`, error)
// 보상 실패 시 수동 개입 필요 - 알림 발송
}
}
await this.sagaRepository.updateStatus(context.sagaId, SagaStatus.FAILED)
}
}
Snapshot 전략: 이벤트 재생 최적화
이벤트 수가 많아지면 재생(replay) 시간이 길어진다. Snapshot은 특정 시점의 애그리거트 상태를 저장하여 이 문제를 해결한다.
// Snapshot 기반 애그리거트 로딩
class EventSourcedRepository<T extends EventSourcedAggregate> {
private readonly SNAPSHOT_INTERVAL = 50 // 50개 이벤트마다 스냅샷
constructor(
private readonly eventStore: EventStore,
private readonly snapshotStore: SnapshotStore,
private readonly factory: () => T
) {}
async load(aggregateId: string): Promise<T> {
const aggregate = this.factory()
// 1. 최신 스냅샷 로드
const snapshot = await this.snapshotStore.getLatest(aggregateId)
if (snapshot) {
aggregate.restoreFromSnapshot(snapshot.state)
// 2. 스냅샷 이후의 이벤트만 재생
const events = await this.eventStore.getEvents(aggregateId, snapshot.version)
aggregate.loadFromHistory(events)
} else {
// 3. 전체 이벤트 재생
const events = await this.eventStore.getEvents(aggregateId)
aggregate.loadFromHistory(events)
}
return aggregate
}
async save(aggregate: T): Promise<void> {
const events = aggregate.getUncommittedEvents()
await this.eventStore.append(aggregate.id, events, aggregate.version - events.length)
// 스냅샷 생성 조건 확인
if (aggregate.version % this.SNAPSHOT_INTERVAL === 0) {
await this.snapshotStore.save({
aggregateId: aggregate.id,
version: aggregate.version,
state: aggregate.toSnapshot(),
timestamp: new Date(),
})
}
aggregate.clearUncommittedEvents()
}
}
운영 주의사항
반드시 알아야 할 핵심 경고
이벤트 스키마 변경은 DB 마이그레이션보다 위험하다: Event Store의 이벤트는 불변이므로, 스키마를 변경하면 과거 이벤트 재생이 깨진다. 반드시 하위 호환성(backward compatibility)을 유지해야 한다.
CQRS 도입은 복잡도를 최소 2배로 만든다: 읽기 모델과 쓰기 모델의 동기화 지연(eventual consistency)을 UI와 비즈니스 로직에서 반드시 처리해야 한다.
Saga 보상 트랜잭션 실패는 수동 개입이 필요하다: 보상 트랜잭션도 실패할 수 있으며, 이 경우 Dead Letter Queue와 수동 복구 프로세스가 반드시 필요하다.
이벤트 순서 역전은 데이터 정합성을 파괴한다: Kafka의 파티셔닝 키가 잘못 설정되면 동일 애그리거트의 이벤트가 다른 파티션에 분산되어 순서가 뒤바뀐다.
Event Store의 무한 성장을 관리해야 한다: Snapshot과 아카이빙 전략 없이는 Event Store가 무한히 커져 재생 성능이 급격히 저하된다.
장애 사례와 복구 절차
사례 1: 이벤트 순서 역전
증상: OrderPlaced 이전에 OrderShipped 이벤트가 도착하여 프로젝션이 실패한다.
원인: Kafka 파티셔닝 키를 설정하지 않아 이벤트가 다른 파티션에 분산되었다.
복구:
# 1. 컨슈머 그룹 오프셋 리셋
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--group projection-service \
--topic events.order \
--reset-offsets --to-earliest --execute
# 2. 프로젝션 테이블 초기화
psql -c "TRUNCATE TABLE order_summaries;"
# 3. 프로젝션 서비스 재시작 (전체 재생)
kubectl rollout restart deployment/projection-service
예방: 반드시 애그리거트 ID를 Kafka 파티셔닝 키로 사용한다.
사례 2: 중복 이벤트 처리
증상: 주문이 이중으로 생성되거나, 결제가 두 번 처리된다.
원인: 컨슈머 장애 후 재시작 시 이미 처리된 메시지를 다시 소비한다 (at-least-once 특성).
복구:
// 멱등성 키 테이블 기반 중복 방지
class IdempotencyGuard {
constructor(private readonly db: Database) {}
async executeOnce<T>(idempotencyKey: string, operation: () => Promise<T>): Promise<T | null> {
try {
// UNIQUE 제약 조건으로 중복 삽입 방지
await this.db.execute('INSERT INTO processed_events (event_id, processed_at) VALUES (?, ?)', [
idempotencyKey,
new Date(),
])
} catch (error) {
// 이미 처리된 이벤트
console.log(`Event ${idempotencyKey} already processed, skipping`)
return null
}
return operation()
}
}
사례 3: 스키마 진화 실패
증상: 새 버전의 이벤트 핸들러가 과거 이벤트를 역직렬화하지 못해 프로젝션이 중단된다.
원인: 이벤트에 필수 필드를 추가하면서 과거 이벤트에는 해당 필드가 없다.
복구:
// 이벤트 업캐스터 (Upcaster) 패턴
class EventUpcaster {
private upcasters: Map<string, (event: any) => DomainEvent> = new Map()
register(eventType: string, fromVersion: number, upcaster: (event: any) => any): void {
this.upcasters.set(`${eventType}:v${fromVersion}`, upcaster)
}
upcast(event: any): DomainEvent {
const key = `${event.eventType}:v${event.schemaVersion || 1}`
const upcaster = this.upcasters.get(key)
if (upcaster) {
return this.upcast(upcaster(event)) // 재귀적으로 최신 버전까지
}
return event
}
}
// 사용 예: v1 -> v2 변환
const upcaster = new EventUpcaster()
upcaster.register('OrderPlaced', 1, (event) => ({
...event,
schemaVersion: 2,
payload: {
...event.payload,
currency: event.payload.currency || 'KRW', // 기본값 추가
shippingMethod: 'STANDARD', // 새 필드에 기본값 설정
},
}))
프로덕션 체크리스트
Event Store
- 이벤트 테이블에 aggregate_id + version 유니크 인덱스 설정
- 낙관적 동시성 제어(Optimistic Concurrency) 구현 확인
- Snapshot 전략 설정 (50~100 이벤트마다)
- 이벤트 아카이빙 정책 수립 (cold storage 이동)
- Event Store 백업 및 복구 절차 검증
CQRS
- Read Model 재구축(Rebuild) 프로세스 자동화
- 읽기/쓰기 DB 분리 여부와 복제 지연 모니터링
- Projection 실패 시 알림 및 자동 재시도 설정
- Eventual Consistency 처리 (UI에서 낙관적 업데이트)
메시지 브로커
- Kafka: 파티셔닝 키를 애그리거트 ID로 설정
- Consumer 그룹 lag 모니터링 (Burrow 또는 Kafka Exporter)
- Dead Letter Queue(DLQ) 설정 및 모니터링
- 메시지 직렬화 포맷 결정 (Avro + Schema Registry 권장)
- 브로커 클러스터 복제 팩터 3 이상 설정
Saga
- 보상 트랜잭션 정의 및 테스트 완료
- Saga 상태 영속화 (DB 저장) 확인
- Saga 타임아웃 설정 (무한 대기 방지)
- 보상 실패 시 수동 복구 프로세스 문서화
이벤트 스키마
- Schema Registry 도입 (Confluent Schema Registry, AWS Glue)
- 하위 호환성 검증 자동화 (CI 파이프라인에 포함)
- 이벤트 업캐스터(Upcaster) 구현
- 이벤트 카탈로그 문서 유지
모니터링
- 이벤트 처리 지연시간(latency) 메트릭 수집
- Consumer Group lag 알림 설정
- 이벤트 처리 실패율 대시보드 구성
- Correlation ID 기반 엔드투엔드 트레이싱