- Authors
- Name
- 왜 이벤트 소싱과 CQRS를 함께 다루는가
- 아키텍처 전체 구조
- 도메인 이벤트 설계
- 커맨드 핸들러와 Aggregate 구현
- 프로젝션 설계와 구현
- 스냅샷 전략
- 이벤트 버전 관리 (Event Versioning)
- 장애 복구 절차
- 운영 체크리스트
- 흔한 실수와 안티패턴
- 이벤트 소싱 vs 전통 CRUD 의사결정 기준
- 정리
- References

왜 이벤트 소싱과 CQRS를 함께 다루는가
이벤트 소싱(Event Sourcing)과 CQRS(Command Query Responsibility Segregation)는 독립적인 패턴이지만, 프로덕션 환경에서는 거의 항상 함께 사용된다. 이벤트 소싱으로 상태 변경의 전체 이력을 이벤트 스트림에 기록하면, 현재 상태를 이벤트 재생(replay)으로만 복원해야 하기 때문에 읽기 성능이 급격히 떨어진다. CQRS로 읽기 전용 프로젝션(Read Model)을 분리하면 이 문제가 해결된다.
이 글은 개념 설명에 머무르지 않는다. EventStoreDB 기반 이벤트 저장소 구축, TypeScript와 Python으로 커맨드 핸들러와 프로젝션을 구현하고, 스냅샷 전략, 이벤트 스키마 버전 관리, 장애 복구 절차까지 운영 레벨의 내용을 모두 다룬다. 2025년 이후 EventStoreDB가 Kurrent로 리브랜딩된 상황과 최신 gRPC 클라이언트 API도 반영했다.
아키텍처 전체 구조
이벤트 소싱 + CQRS 시스템의 전체 흐름은 다음과 같다.
- 클라이언트가 커맨드(Command)를 전송한다
- 커맨드 핸들러가 도메인 불변식(invariant)을 검증한다
- 검증을 통과하면 도메인 이벤트를 생성하고 이벤트 스토어에 append한다
- 이벤트 스토어가 **구독자(Subscriber)**에게 이벤트를 푸시한다
- 프로젝션 핸들러가 이벤트를 수신하여 **읽기 모델(Read Model)**을 갱신한다
- 쿼리 핸들러가 읽기 모델에서 데이터를 조회하여 클라이언트에 반환한다
핵심 원칙: 쓰기 경로(Command Path)와 읽기 경로(Query Path)는 완전히 분리된다. 쓰기 측은 이벤트 스토어에만 쓰고, 읽기 측은 프로젝션된 읽기 모델에서만 조회한다.
이벤트 스토어 솔루션 비교
프로덕션에 사용 가능한 이벤트 스토어 솔루션을 비교한다. 기술 스택과 운영 환경에 맞는 선택이 중요하다.
| 솔루션 | 언어 생태계 | 저장 방식 | 프로젝션 내장 | 라이선스 | 운영 복잡도 |
|---|---|---|---|---|---|
| EventStoreDB (Kurrent) | 언어 무관 (gRPC) | 전용 파일 시스템 | 있음 (JS 기반) | BSL (상용 무료) | 중간 |
| Axon Server | JVM (Java/Kotlin) | 전용 스토리지 | 없음 (Framework에서 처리) | 오픈소스 + Enterprise | 중간 |
| Marten | .NET (C#) | PostgreSQL | 있음 (C# 기반) | MIT | 낮음 |
| EventSourcingDB | 언어 무관 (gRPC) | 전용 엔진 | 없음 | 오픈소스 | 낮음 |
| PostgreSQL + 직접 구현 | 언어 무관 | RDBMS | 없음 (직접 구현) | 오픈소스 | 높음 |
| MongoDB + 직접 구현 | 언어 무관 | Document DB | 없음 (직접 구현) | SSPL | 높음 |
EventStoreDB는 Greg Young이 설계한 전용 이벤트 스토어로, 스트림 단위 append-only 저장, 내장 프로젝션, catch-up 구독 등 이벤트 소싱에 필요한 기능을 네이티브로 제공한다. 이 글에서는 EventStoreDB를 기준으로 구현 예제를 작성한다.
도메인 이벤트 설계
이벤트 소싱 시스템에서 도메인 이벤트는 시스템의 가장 중요한 계약(contract)이다. 한번 저장된 이벤트는 절대 수정하거나 삭제하지 않는다. 따라서 이벤트 스키마 설계에 신중해야 한다.
이벤트 설계 원칙
- 과거형 동사로 명명한다:
OrderCreated,PaymentProcessed,ItemShipped - 비즈니스 의도를 담는다:
ReservationStatusChanged가 아닌RoomBooked - **자기 완결적(self-contained)**이어야 한다: 이벤트 하나로 무슨 일이 일어났는지 완전히 파악 가능해야 한다
- 단순 타입만 사용한다: string, number, boolean, 배열. Value Object를 이벤트에 직접 넣지 않는다
- 버전 필드를 포함한다: 스키마 진화에 필수
TypeScript 이벤트 정의
// events/order-events.ts
interface BaseEvent {
eventType: string
eventVersion: number
aggregateId: string
timestamp: string
metadata: {
correlationId: string
causationId: string
userId: string
}
}
interface OrderCreated extends BaseEvent {
eventType: 'OrderCreated'
eventVersion: 1
data: {
orderId: string
customerId: string
items: Array<{
productId: string
productName: string
quantity: number
unitPrice: number
}>
totalAmount: number
currency: string
shippingAddress: {
street: string
city: string
zipCode: string
country: string
}
}
}
interface OrderPaymentConfirmed extends BaseEvent {
eventType: 'OrderPaymentConfirmed'
eventVersion: 1
data: {
orderId: string
paymentId: string
amount: number
method: 'CREDIT_CARD' | 'BANK_TRANSFER' | 'WALLET'
confirmedAt: string
}
}
interface OrderCancelled extends BaseEvent {
eventType: 'OrderCancelled'
eventVersion: 1
data: {
orderId: string
reason: string
cancelledBy: string
refundAmount: number
}
}
type OrderEvent = OrderCreated | OrderPaymentConfirmed | OrderCancelled
correlationId와 causationId를 메타데이터에 반드시 포함시킨다. 분산 시스템에서 하나의 사용자 요청이 여러 이벤트를 발생시킬 때, 이 두 필드가 없으면 장애 추적이 불가능하다.
커맨드 핸들러와 Aggregate 구현
커맨드 핸들러는 이벤트 소싱 시스템의 쓰기 경로를 담당한다. Aggregate는 도메인 불변식을 지키는 경계이며, 이벤트를 재생하여 현재 상태를 복원한다.
TypeScript Aggregate 구현
// aggregates/order-aggregate.ts
import { EventStoreDBClient, jsonEvent, FORWARDS, START } from '@eventstore/db-client'
interface OrderState {
orderId: string
status: 'CREATED' | 'PAID' | 'SHIPPED' | 'CANCELLED'
customerId: string
totalAmount: number
items: Array<{ productId: string; quantity: number; unitPrice: number }>
version: number
}
class OrderAggregate {
private state: OrderState
private pendingEvents: OrderEvent[] = []
constructor() {
this.state = {
orderId: '',
status: 'CREATED',
customerId: '',
totalAmount: 0,
items: [],
version: -1,
}
}
// 이벤트 재생으로 상태 복원
static async load(client: EventStoreDBClient, orderId: string): Promise<OrderAggregate> {
const aggregate = new OrderAggregate()
const streamName = `order-${orderId}`
const events = client.readStream(streamName, {
direction: FORWARDS,
fromRevision: START,
})
for await (const resolvedEvent of events) {
const event = resolvedEvent.event
if (event) {
aggregate.apply(event.data as OrderEvent, false)
aggregate.state.version = Number(resolvedEvent.event!.revision)
}
}
return aggregate
}
// 커맨드: 주문 생성
createOrder(command: {
orderId: string
customerId: string
items: Array<{ productId: string; quantity: number; unitPrice: number }>
}): void {
// 불변식 검증
if (this.state.orderId !== '') {
throw new Error(`Order ${command.orderId} already exists`)
}
if (command.items.length === 0) {
throw new Error('Order must contain at least one item')
}
const totalAmount = command.items.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0)
const event: OrderCreated = {
eventType: 'OrderCreated',
eventVersion: 1,
aggregateId: command.orderId,
timestamp: new Date().toISOString(),
metadata: { correlationId: '', causationId: '', userId: '' },
data: {
orderId: command.orderId,
customerId: command.customerId,
items: command.items.map((i) => ({
...i,
productName: '', // 조회 시 채움
})),
totalAmount,
currency: 'KRW',
shippingAddress: { street: '', city: '', zipCode: '', country: 'KR' },
},
}
this.apply(event, true)
}
// 커맨드: 결제 확인
confirmPayment(
paymentId: string,
amount: number,
method: 'CREDIT_CARD' | 'BANK_TRANSFER' | 'WALLET'
): void {
if (this.state.status !== 'CREATED') {
throw new Error(`Cannot confirm payment for order in status: ${this.state.status}`)
}
if (amount !== this.state.totalAmount) {
throw new Error(
`Payment amount ${amount} does not match order total ${this.state.totalAmount}`
)
}
const event: OrderPaymentConfirmed = {
eventType: 'OrderPaymentConfirmed',
eventVersion: 1,
aggregateId: this.state.orderId,
timestamp: new Date().toISOString(),
metadata: { correlationId: '', causationId: '', userId: '' },
data: {
orderId: this.state.orderId,
paymentId,
amount,
method,
confirmedAt: new Date().toISOString(),
},
}
this.apply(event, true)
}
// 이벤트 적용 (상태 전이)
private apply(event: OrderEvent, isNew: boolean): void {
switch (event.eventType) {
case 'OrderCreated':
this.state.orderId = event.data.orderId
this.state.customerId = event.data.customerId
this.state.totalAmount = event.data.totalAmount
this.state.items = event.data.items
this.state.status = 'CREATED'
break
case 'OrderPaymentConfirmed':
this.state.status = 'PAID'
break
case 'OrderCancelled':
this.state.status = 'CANCELLED'
break
}
if (isNew) {
this.pendingEvents.push(event)
}
}
// 이벤트 스토어에 저장
async save(client: EventStoreDBClient): Promise<void> {
if (this.pendingEvents.length === 0) return
const streamName = `order-${this.state.orderId}`
const events = this.pendingEvents.map((e) =>
jsonEvent({ type: e.eventType, data: e.data, metadata: e.metadata })
)
await client.appendToStream(streamName, events, {
expectedRevision: this.state.version === -1 ? 'no_stream' : BigInt(this.state.version),
})
this.pendingEvents = []
}
}
expectedRevision 파라미터가 낙관적 동시성 제어(Optimistic Concurrency Control)의 핵심이다. 같은 Aggregate에 대해 두 커맨드가 동시에 실행되면, 먼저 저장한 쪽이 성공하고 나중 쪽은 WrongExpectedVersionError를 받는다. 재시도 로직에서 이벤트를 다시 로드하고 커맨드를 재실행해야 한다.
프로젝션 설계와 구현
프로젝션은 이벤트 스트림에서 읽기 모델을 구축하는 과정이다. 이벤트 소싱 시스템에서 프로젝션은 "원하는 형태의 뷰를 자유롭게 만들 수 있다"는 핵심 장점을 실현하는 부분이다.
프로젝션 패턴 분류
- 인라인 프로젝션(Inline Projection): 이벤트 저장과 동시에 읽기 모델을 갱신한다. 강한 일관성을 보장하지만 쓰기 성능이 떨어진다.
- 비동기 프로젝션(Async Projection): 구독(Subscription)을 통해 비동기로 읽기 모델을 갱신한다. 최종 일관성(Eventual Consistency)이지만 쓰기 성능이 좋다. 대부분의 프로덕션 시스템이 이 방식을 채택한다.
- 라이브 프로젝션(Live Projection): 매 요청마다 이벤트를 재생한다. 항상 최신 상태를 보장하지만, 이벤트가 많으면 성능이 급격히 떨어진다.
Python 비동기 프로젝션 구현
다음은 Python으로 EventStoreDB의 Persistent Subscription을 이용한 비동기 프로젝션 구현 예제다.
# projections/order_summary_projection.py
import asyncio
import json
from datetime import datetime
from esdbclient import EventStoreDBClient, NewEvent, StreamState
from dataclasses import dataclass, asdict
from typing import Optional
import asyncpg
@dataclass
class OrderSummaryReadModel:
order_id: str
customer_id: str
status: str
total_amount: float
item_count: int
created_at: str
updated_at: str
payment_method: Optional[str] = None
cancelled_reason: Optional[str] = None
class OrderSummaryProjection:
def __init__(self, esdb_client: EventStoreDBClient, pg_pool: asyncpg.Pool):
self.esdb = esdb_client
self.pg_pool = pg_pool
self._checkpoint_interval = 100
self._processed_count = 0
async def start(self):
"""catch-up 구독으로 프로젝션 시작"""
last_position = await self._load_checkpoint()
subscription = self.esdb.subscribe_to_all(
from_position=last_position,
filter_include=[r"order-.*"], # order- 로 시작하는 스트림만 구독
)
for event in subscription:
try:
await self._handle_event(event)
self._processed_count += 1
# 주기적으로 체크포인트 저장
if self._processed_count % self._checkpoint_interval == 0:
await self._save_checkpoint(event.commit_position)
except Exception as e:
print(f"Projection error at position {event.commit_position}: {e}")
# 에러 발생 시 체크포인트까지 저장하고 재시작
await self._save_checkpoint(event.commit_position)
raise
async def _handle_event(self, event):
"""이벤트 타입별 핸들러 라우팅"""
handler_map = {
'OrderCreated': self._on_order_created,
'OrderPaymentConfirmed': self._on_payment_confirmed,
'OrderCancelled': self._on_order_cancelled,
}
handler = handler_map.get(event.type)
if handler:
data = json.loads(event.data)
await handler(data)
async def _on_order_created(self, data: dict):
"""주문 생성 이벤트 처리 - 읽기 모델 INSERT"""
async with self.pg_pool.acquire() as conn:
await conn.execute("""
INSERT INTO order_summary (
order_id, customer_id, status, total_amount,
item_count, created_at, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (order_id) DO UPDATE SET
status = EXCLUDED.status,
updated_at = EXCLUDED.updated_at
""",
data['orderId'],
data['customerId'],
'CREATED',
data['totalAmount'],
len(data['items']),
datetime.fromisoformat(data.get('createdAt', datetime.now().isoformat())),
datetime.now(),
)
async def _on_payment_confirmed(self, data: dict):
"""결제 확인 이벤트 처리 - 읽기 모델 UPDATE"""
async with self.pg_pool.acquire() as conn:
await conn.execute("""
UPDATE order_summary
SET status = 'PAID',
payment_method = $2,
updated_at = $3
WHERE order_id = $1
""", data['orderId'], data['method'], datetime.now())
async def _on_order_cancelled(self, data: dict):
"""주문 취소 이벤트 처리"""
async with self.pg_pool.acquire() as conn:
await conn.execute("""
UPDATE order_summary
SET status = 'CANCELLED',
cancelled_reason = $2,
updated_at = $3
WHERE order_id = $1
""", data['orderId'], data['reason'], datetime.now())
async def _load_checkpoint(self) -> Optional[int]:
"""마지막 처리 위치 로드"""
async with self.pg_pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT position FROM projection_checkpoints WHERE name = $1",
'order_summary'
)
return row['position'] if row else None
async def _save_checkpoint(self, position: int):
"""처리 위치 저장"""
async with self.pg_pool.acquire() as conn:
await conn.execute("""
INSERT INTO projection_checkpoints (name, position, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (name) DO UPDATE SET
position = EXCLUDED.position,
updated_at = NOW()
""", 'order_summary', position)
프로젝션 구현에서 가장 중요한 것은 **멱등성(Idempotency)**이다. 위 코드의 ON CONFLICT ... DO UPDATE 구문이 핵심이다. 프로젝션이 중간에 실패하고 재시작하면, 이미 처리한 이벤트를 다시 받을 수 있다. 멱등하게 처리하지 않으면 데이터가 꼬인다.
읽기 모델 테이블 스키마
-- 프로젝션이 사용하는 읽기 모델 테이블
CREATE TABLE order_summary (
order_id VARCHAR(36) PRIMARY KEY,
customer_id VARCHAR(36) NOT NULL,
status VARCHAR(20) NOT NULL,
total_amount DECIMAL(15,2) NOT NULL,
item_count INTEGER NOT NULL,
payment_method VARCHAR(20),
cancelled_reason TEXT,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
);
-- 고객별 주문 조회 최적화 인덱스
CREATE INDEX idx_order_summary_customer ON order_summary(customer_id, status);
-- 상태별 필터링 인덱스
CREATE INDEX idx_order_summary_status ON order_summary(status, created_at DESC);
-- 프로젝션 체크포인트 테이블
CREATE TABLE projection_checkpoints (
name VARCHAR(100) PRIMARY KEY,
position BIGINT NOT NULL,
updated_at TIMESTAMP NOT NULL
);
읽기 모델은 프로젝션 요구사항에 맞게 자유롭게 설계한다. 정규화할 필요가 없다. 고객별 대시보드용, 관리자 통계용, 검색 엔진 인덱싱용 등 여러 프로젝션을 동시에 운영할 수 있다. 새로운 뷰가 필요하면 새 프로젝션을 추가하고 이벤트를 처음부터 재생하면 된다.
스냅샷 전략
Aggregate에 이벤트가 수천, 수만 건 쌓이면 매번 전체 이벤트를 재생하는 것은 비효율적이다. 스냅샷은 특정 시점의 Aggregate 상태를 저장하여 재생 범위를 줄인다.
스냅샷 적용 기준
스냅샷을 너무 일찍 도입하면 복잡도만 올라간다. 다음 기준으로 판단한다.
- Aggregate당 평균 이벤트 수가 50개 이상이면 스냅샷 도입을 검토한다
- 이벤트 재생 시간이 100ms를 초과하면 스냅샷이 필요하다
- 스냅샷 주기는 보통 매 100~500 이벤트마다 생성한다
TypeScript 스냅샷 구현
// snapshots/snapshot-store.ts
interface Snapshot<T> {
aggregateId: string
aggregateType: string
version: number // 스냅샷 시점의 Aggregate 버전
schemaVersion: number // 스냅샷 스키마 버전 (업그레이드 감지용)
state: T
createdAt: string
}
class SnapshotStore {
private client: EventStoreDBClient
private snapshotInterval: number
constructor(client: EventStoreDBClient, snapshotInterval = 200) {
this.client = client
this.snapshotInterval = snapshotInterval
}
async saveSnapshot<T>(snapshot: Snapshot<T>): Promise<void> {
const streamName = `snapshot-${snapshot.aggregateType}-${snapshot.aggregateId}`
const event = jsonEvent({
type: 'Snapshot',
data: snapshot,
})
// 스냅샷 스트림은 최신 1개만 유지 (maxCount 설정)
await this.client.appendToStream(streamName, [event])
await this.client.setStreamMetadata(streamName, {
maxCount: 3, // 최근 3개만 유지하여 롤백 가능성 확보
})
}
async loadSnapshot<T>(
aggregateType: string,
aggregateId: string,
currentSchemaVersion: number
): Promise<Snapshot<T> | null> {
const streamName = `snapshot-${aggregateType}-${aggregateId}`
try {
const events = this.client.readStream(streamName, {
direction: BACKWARDS,
fromRevision: END,
maxCount: 1,
})
for await (const resolved of events) {
const snapshot = resolved.event!.data as Snapshot<T>
// 스키마 버전이 다르면 스냅샷 무시 (전체 이벤트 재생)
if (snapshot.schemaVersion !== currentSchemaVersion) {
console.warn(
`Snapshot schema mismatch for ${aggregateId}: ` +
`expected ${currentSchemaVersion}, got ${snapshot.schemaVersion}. ` +
`Replaying all events.`
)
return null
}
return snapshot
}
} catch (error) {
// 스냅샷 스트림이 없으면 null 반환
return null
}
return null
}
shouldTakeSnapshot(currentVersion: number, lastSnapshotVersion: number): boolean {
return currentVersion - lastSnapshotVersion >= this.snapshotInterval
}
}
스냅샷의 schemaVersion 필드에 주목해야 한다. Aggregate 구조가 변경되면 기존 스냅샷은 더 이상 유효하지 않다. schemaVersion이 불일치하면 스냅샷을 무시하고 전체 이벤트를 재생하여 새 스냅샷을 생성한다. 이것이 이벤트 소싱의 장점이다. 이벤트는 불변이므로 언제든 다시 재생할 수 있다.
주의: 스냅샷은 성능 최적화 기법이지, 필수 요소가 아니다. 스냅샷 없이도 시스템은 정상 동작해야 한다. 스냅샷이 손상되거나 무효화되면 전체 이벤트 재생으로 자동 폴백되도록 구현한다.
이벤트 버전 관리 (Event Versioning)
프로덕션 시스템이 진화하면 이벤트 스키마도 변경된다. 이벤트 소싱에서 저장된 이벤트는 절대 수정하지 않으므로, 스키마 변경을 안전하게 처리하는 전략이 필요하다.
버전 관리 전략 비교
| 전략 | 설명 | 적합한 상황 | 위험도 |
|---|---|---|---|
| Weak Schema | 새 필드 추가 시 기본값 사용, 기존 이벤트 호환 | 필드 추가만 필요한 경우 | 낮음 |
| Upcasting | 이벤트 역직렬화 시 미들웨어에서 변환 | 필드 이름 변경, 타입 변경 | 중간 |
| New Event Type | 완전히 새로운 이벤트 타입 정의 | 이벤트 의미 자체가 변경 | 낮음 |
| Copy-Replace Stream | 스트림을 새 스키마로 복제 후 교체 | 대규모 스키마 마이그레이션 | 높음 |
Upcasting 구현 예제
가장 실용적인 전략은 Upcasting이다. 이벤트를 역직렬화하는 과정에서 이전 버전을 현재 버전으로 변환하는 미들웨어를 둔다.
// versioning/event-upcaster.ts
type Upcaster = (event: any) => any
// 버전별 업캐스터 등록
const upcasters: Map<string, Map<number, Upcaster>> = new Map()
// OrderCreated v1 -> v2: shippingAddress를 구조화된 객체로 변경
upcasters.set(
'OrderCreated',
new Map([
[
1,
(event: any) => {
// v1에서 shippingAddress가 단일 문자열이었던 경우
const address =
typeof event.data.shippingAddress === 'string'
? {
street: event.data.shippingAddress,
city: 'UNKNOWN',
zipCode: 'UNKNOWN',
country: 'KR',
}
: event.data.shippingAddress
return {
...event,
eventVersion: 2,
data: {
...event.data,
shippingAddress: address,
// v2에서 추가된 필드에 기본값 적용
orderSource: event.data.orderSource ?? 'WEB',
},
}
},
],
])
)
function upcastEvent(event: any): any {
const eventUpcasters = upcasters.get(event.eventType)
if (!eventUpcasters) return event
let current = event
const targetVersion = Math.max(...Array.from(eventUpcasters.keys())) + 1
// 현재 버전부터 최신 버전까지 순차 적용
for (let v = current.eventVersion; v < targetVersion; v++) {
const upcaster = eventUpcasters.get(v)
if (upcaster) {
current = upcaster(current)
}
}
return current
}
// 사용 예
// const rawEvent = loadFromEventStore();
// const currentEvent = upcastEvent(rawEvent);
// aggregate.apply(currentEvent);
이벤트 버전 관리에서 가장 중요한 원칙은 절대 기존 이벤트를 수정하지 않는 것이다. 이벤트 스토어에 저장된 데이터는 불변이다. 대신 읽는 시점에 변환(Upcasting)하거나, 새 이벤트 타입을 정의한다.
장애 복구 절차
이벤트 소싱 시스템에서 장애 복구는 전통적 시스템과 다르다. 이벤트 스토어가 진실의 원천(Single Source of Truth)이므로, 프로젝션은 언제든 재구축할 수 있다.
프로젝션 손상 시 복구
프로젝션이 잘못된 데이터를 포함하거나, 프로젝션 로직에 버그가 있었을 때 사용하는 절차다.
- 프로젝션 서비스 중지: 해당 프로젝션의 구독을 중지한다
- 읽기 모델 테이블 DROP 또는 TRUNCATE: 기존 잘못된 데이터를 제거한다
- 체크포인트 초기화:
projection_checkpoints테이블에서 해당 프로젝션의 position을 삭제한다 - 프로젝션 서비스 재시작: 이벤트 스토어의 처음부터 모든 이벤트를 재생하여 읽기 모델을 재구축한다
- 재구축 완료 확인: 프로젝션이 현재 위치까지 따라잡았는지(caught-up) 확인한다
경고: 이벤트가 수억 건이면 재구축에 수 시간이 걸릴 수 있다. 병렬 처리 가능한 프로젝션 아키텍처를 미리 설계하거나, 파티션 단위로 재구축할 수 있도록 준비한다.
이벤트 스토어 클러스터 장애 시
EventStoreDB는 리더-팔로워 아키텍처를 사용한다. 리더 노드가 다운되면 팔로워 중 하나가 자동으로 리더로 승격된다. 이 과정에서 주의할 점은 다음과 같다.
- 쓰기 실패 처리: 리더 전환 중 쓰기가 실패하면 재시도한다.
expectedRevision으로 멱등성이 보장된다 - 구독 재연결: Persistent Subscription은 자동 재연결되지만, Catch-up Subscription은 마지막 체크포인트부터 수동 재연결해야 한다
- Split-brain 방지: 최소 3노드 클러스터를 운영하여 과반수(quorum) 기반 리더 선출이 가능하도록 한다
일반적인 장애 시나리오와 대응
| 장애 시나리오 | 원인 | 대응 방법 |
|---|---|---|
| 프로젝션 데이터 불일치 | 프로젝션 로직 버그 | 프로젝션 재구축 (이벤트 전체 재생) |
| Aggregate 로드 실패 | 이벤트 스트림 손상 | 스냅샷 폴백 후 부분 재생, 클러스터 복제본 확인 |
| 동시 쓰기 충돌 | 같은 Aggregate 동시 수정 | WrongExpectedVersionError 캐치 후 재시도 |
| 구독 지연(Consumer Lag) | 프로젝션 처리 속도 부족 | 파티셔닝 또는 프로젝션 인스턴스 수평 확장 |
| 이벤트 스토어 디스크 풀 | 이벤트 증가 | 아카이빙 정책 적용, 스트림 maxAge/maxCount 설정 |
| 스냅샷 스키마 불일치 | Aggregate 구조 변경 후 배포 | 자동 폴백으로 전체 이벤트 재생 |
운영 체크리스트
프로덕션 배포 전 반드시 확인해야 하는 항목들이다.
설계 단계 체크리스트
- 이벤트 스키마에
eventVersion필드가 포함되어 있는가 - 이벤트 메타데이터에
correlationId,causationId가 포함되어 있는가 - Aggregate의 불변식 검증 로직이 완전한가
- 커맨드 실패 시 에러 응답이 명확한가
- CQRS가 정말 필요한 도메인인가 (단순 CRUD가 아닌지 재확인)
구현 단계 체크리스트
- 프로젝션 핸들러가 멱등(Idempotent)한가
- 낙관적 동시성 제어가 구현되어 있는가 (
expectedRevision) - 이벤트 직렬화/역직렬화 테스트가 작성되어 있는가
- Upcaster가 이전 버전 이벤트를 올바르게 변환하는가
- 스냅샷 스키마 버전 불일치 시 전체 이벤트 재생으로 폴백하는가
운영 단계 체크리스트
- 이벤트 스토어 클러스터가 3노드 이상으로 구성되어 있는가
- 프로젝션 consumer lag 모니터링이 설정되어 있는가
- 이벤트 스토어 디스크 사용량 알림이 설정되어 있는가
- 프로젝션 재구축 절차가 문서화되어 있는가
- 스냅샷 생성 주기와 보관 정책이 설정되어 있는가
- Dead Letter Queue(DLQ)가 설정되어 있는가 (처리 불가 이벤트 격리)
- 장애 시 수동 이벤트 보정 스크립트가 준비되어 있는가
흔한 실수와 안티패턴
실전에서 자주 발생하는 실수를 정리한다. 이벤트 소싱 프로젝트가 실패하는 대부분의 원인이 여기 있다.
안티패턴 1: 이벤트에 현재 상태 전체를 저장
// BAD: 이벤트에 전체 상태를 넣는 것은 CRUD와 다를 바 없다
interface OrderUpdated {
eventType: 'OrderUpdated'
data: {
order: Order // 전체 Order 객체
}
}
// GOOD: 변경된 사실만 이벤트로 기록
interface OrderItemAdded {
eventType: 'OrderItemAdded'
data: {
orderId: string
productId: string
quantity: number
unitPrice: number
}
}
안티패턴 2: 프로젝션에서 외부 서비스 호출
프로젝션 핸들러에서 외부 API를 호출하면, 이벤트 재생(replay) 시 부작용이 발생한다. 프로젝션은 순수 함수처럼 이벤트 데이터만으로 읽기 모델을 갱신해야 한다. 외부 서비스 호출이 필요한 로직은 별도의 Policy 핸들러나 Saga로 분리한다.
안티패턴 3: 모든 도메인에 이벤트 소싱 적용
이벤트 소싱은 상태 변경 이력이 비즈니스 가치를 갖는 도메인에 적용한다. 사용자 설정, 코드 테이블 등 단순 CRUD 도메인에 이벤트 소싱을 적용하면 복잡도만 올라가고 얻는 것은 없다. 시스템의 핵심 도메인(Core Domain)에만 선별적으로 적용하는 것이 올바른 전략이다.
안티패턴 4: 이벤트 스트림에 10만 건 이상 축적
하나의 Aggregate 스트림에 이벤트가 10만 건 이상 쌓이면 로딩 시간이 수 초 이상 걸린다. 스냅샷으로 완화할 수 있지만, 근본 원인은 Aggregate 경계가 잘못 설계된 것이다. Aggregate를 더 작은 단위로 분할하거나, 주기적 이벤트 생성 패턴을 재검토한다.
이벤트 소싱 vs 전통 CRUD 의사결정 기준
마지막으로, 이벤트 소싱 도입 여부를 결정할 때 참고할 비교표다.
| 기준 | 전통 CRUD | 이벤트 소싱 |
|---|---|---|
| 현재 상태 조회 | 단순 (직접 읽기) | 복잡 (이벤트 재생 또는 프로젝션) |
| 변경 이력 추적 | 별도 감사 로그 필요 | 자동 확보 (이벤트 자체가 이력) |
| 스키마 변경 | ALTER TABLE 마이그레이션 | 이벤트 버전 관리 + Upcasting |
| 디버깅 | 현재 상태만 확인 가능 | 전체 이력 재생으로 문제 원인 추적 가능 |
| 데이터 정합성 보장 | ACID 트랜잭션 | Aggregate 단위 일관성 + 최종 일관성 |
| 저장 공간 | 현재 상태만 저장 | 모든 이벤트 축적 (저장 비용 높음) |
| 읽기 성능 | 인덱스 최적화로 충분 | 프로젝션 설계 필수 |
| 초기 개발 속도 | 빠름 | 느림 (학습 곡선, 인프라 구성) |
| 복잡 도메인 대응력 | 도메인 복잡도 증가 시 한계 | 이벤트 중심 모델링으로 확장성 확보 |
| 팀 역량 요구 | 일반적 | DDD, 이벤트 모델링 경험 필요 |
금융 거래, 물류 추적, 의료 기록, 협업 도구처럼 변경 이력 자체가 비즈니스 가치인 도메인에서 이벤트 소싱은 탁월한 선택이다. 그 외에는 전통 CRUD를 유지하되, 필요한 부분에만 감사 로그를 추가하는 것이 현실적이다.
정리
이벤트 소싱과 CQRS는 강력하지만 복잡한 패턴이다. 성공적으로 도입하려면 다음을 기억한다.
- 이벤트 스키마 설계에 가장 많은 시간을 투자한다. 이벤트는 영원히 남는다.
- 프로젝션은 반드시 멱등하게 구현한다. 이벤트 재생이 안전해야 운영이 가능하다.
- 스냅샷은 성능 최적화 도구이지, 아키텍처의 필수 요소가 아니다. 필요할 때 도입한다.
- 이벤트 버전 관리 전략을 프로젝트 초기에 확립한다. 나중에 추가하면 고통스럽다.
- 모든 도메인에 적용하지 않는다. 핵심 도메인에만 선별적으로 적용한다.
References
- Microsoft Azure - Event Sourcing Pattern
- Microsoft Azure - CQRS Pattern
- Martin Fowler - Event Sourcing
- Martin Fowler - CQRS
- EventStoreDB Documentation - Projections
- AWS Prescriptive Guidance - Event Sourcing Pattern
- Oskar Dudycz - How to (not) do the events versioning
- Kurrent Blog - Snapshots in Event Sourcing
- microservices.io - Event Sourcing
- Greg Young - Versioning in an Event Sourced System