들어가며
전통적인 CRUD 시스템은 데이터의 현재 상태만 저장한다. UPDATE가 실행되면 이전 값은 사라지고, DELETE가 실행되면 데이터 자체가 소멸한다. 은행 계좌 잔액이 100만원이라는 사실은 알 수 있지만, 그 잔액이 어떤 입출금 과정을 거쳐 형성되었는지는 알 수 없다.
Event Sourcing은 이 문제를 근본적으로 해결한다. **상태 변경 자체를 이벤트로 기록**하고, 현재 상태는 이벤트의 순차적 재생으로 복원한다. CQRS는 이 구조 위에서 **쓰기(Command)와 읽기(Query)를 완전히 분리**하여 각각의 요구사항에 최적화한다.
이 글에서는 Event Sourcing과 CQRS를 결합한 아키텍처의 전체 구현 과정을 다룬다. 이벤트 스토어 설계부터 Aggregate 구현, 프로젝션 구축, Saga 패턴을 활용한 분산 트랜잭션 관리, 스냅샷 최적화, 그리고 운영 시 발생하는 장애 사례와 복구 절차까지 실전 코드와 함께 안내한다.
1. Event Sourcing 핵심 원리
1.1 이벤트란 무엇인가
Event Sourcing에서 이벤트(Event)는 **과거에 발생한 변경 불가능한 사실(Immutable Fact)**이다. 이벤트는 과거 시제로 명명되며, 비즈니스 의도를 명확히 담아야 한다.
잘못된 이벤트 설계와 올바른 이벤트 설계를 비교하면 다음과 같다.
- 잘못된 예: `NameChanged`, `EmailChanged` - 속성 단위의 이벤트는 비즈니스 의미가 없다
- 올바른 예: `CustomerRelocated`, `OrderConfirmed` - 도메인 행위를 표현하는 이벤트
1.2 상태 복원의 원리
현재 상태는 초기 상태에 모든 이벤트를 순차적으로 적용(replay)하여 계산한다.
현재 상태 = fold(초기 상태, [이벤트1, 이벤트2, ..., 이벤트N])
예를 들어 은행 계좌의 상태 복원 과정은 다음과 같다.
초기 잔액: 0원
-> AccountOpened(금액: 0원) => 잔액: 0원
-> MoneyDeposited(금액: 100만원) => 잔액: 100만원
-> MoneyWithdrawn(금액: 30만원) => 잔액: 70만원
-> MoneyDeposited(금액: 50만원) => 잔액: 120만원
1.3 이벤트의 불변성 원칙
한번 저장된 이벤트는 절대 수정하거나 삭제할 수 없다. 잘못된 이벤트가 발행되었다면 **보상 이벤트(Compensating Event)**를 새로 발행하여 논리적으로 취소해야 한다.
// 잘못된 출금이 발생한 경우 - 이벤트를 삭제하지 않는다
// 대신 보상 이벤트를 발행한다
interface MoneyWithdrawnCompensated {
type: 'MoneyWithdrawnCompensated'
originalEventId: string
amount: number
reason: string
timestamp: Date
}
2. CQRS 아키텍처 패턴
2.1 Command와 Query의 분리
CQRS의 핵심은 단순하다. **데이터를 변경하는 모델과 데이터를 조회하는 모델을 분리**하는 것이다.
[클라이언트]
│
├── Command(쓰기) ──> [Command Handler] ──> [Write Model / Event Store]
│ │
│ 이벤트 발행
│ │
│ ▼
│ [Projection Engine]
│ │
│ ▼
└── Query(읽기) ───> [Query Handler] ───> [Read Model / View DB]
2.2 왜 분리하는가
읽기와 쓰기의 요구사항은 근본적으로 다르다.
- **쓰기(Command)**: 비즈니스 규칙 검증, 도메인 불변 규칙(invariant) 보장, 트랜잭션 일관성 필요
- **읽기(Query)**: 다양한 뷰 지원, 낮은 지연시간, 높은 처리량 필요
하나의 모델로 양쪽을 만족시키려 하면 양쪽 모두 타협해야 한다. CQRS는 각각의 모델을 독립적으로 최적화할 수 있게 해준다.
2.3 Command Handler 구현
// Command 정의
interface CreateOrderCommand {
orderId: string
customerId: string
items: Array<{ productId: string; quantity: number; price: number }>
}
// Command Handler
class OrderCommandHandler {
constructor(
private readonly eventStore: EventStore,
private readonly orderRepository: EventSourcedRepository<Order>
) {}
async handle(command: CreateOrderCommand): Promise<void> {
// 1. Aggregate 로드 (이벤트 재생)
const order = await this.orderRepository.load(command.orderId)
// 2. 비즈니스 로직 실행 (이벤트 생성)
order.create(command.customerId, command.items)
// 3. 이벤트 저장
await this.orderRepository.save(order)
}
}
3. Event Store 설계와 구현
3.1 이벤트 스토어 스키마
이벤트 스토어의 핵심 테이블 구조는 다음과 같다.
CREATE TABLE events (
-- 글로벌 고유 식별자
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-- 이벤트가 속한 스트림(Aggregate) 식별자
stream_id VARCHAR(255) NOT NULL,
-- 스트림 내 이벤트 순서 (낙관적 동시성 제어에 사용)
stream_version BIGINT NOT NULL,
-- 이벤트 타입 (역직렬화에 사용)
event_type VARCHAR(255) NOT NULL,
-- 이벤트 페이로드 (JSON)
event_data JSONB NOT NULL,
-- 메타데이터 (상관관계 ID, 사용자 정보 등)
metadata JSONB DEFAULT '{}',
-- 이벤트 발생 시각
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
-- 글로벌 순서 (프로젝션에서 전체 순서 추적에 사용)
global_position BIGSERIAL NOT NULL,
-- 같은 스트림 내에서 버전 중복 방지 (낙관적 동시성 제어)
UNIQUE(stream_id, stream_version)
);
-- 스트림별 이벤트 조회 인덱스
CREATE INDEX idx_events_stream ON events(stream_id, stream_version);
-- 전체 이벤트 순서 인덱스 (프로젝션용)
CREATE INDEX idx_events_global ON events(global_position);
-- 이벤트 타입별 조회 인덱스
CREATE INDEX idx_events_type ON events(event_type);
3.2 TypeScript Event Store 구현
interface DomainEvent {
eventType: string
data: Record<string, unknown>
metadata?: Record<string, unknown>
}
interface StoredEvent extends DomainEvent {
eventId: string
streamId: string
streamVersion: number
globalPosition: number
createdAt: Date
}
class PostgresEventStore {
constructor(private readonly pool: Pool) {}
/**
* 이벤트를 스트림에 추가한다.
* expectedVersion으로 낙관적 동시성 제어를 수행한다.
*/
async appendToStream(
streamId: string,
expectedVersion: number,
events: DomainEvent[]
): Promise<StoredEvent[]> {
const client = await this.pool.connect()
try {
await client.query('BEGIN')
// 낙관적 동시성 검사
const versionCheck = await client.query(
'SELECT MAX(stream_version) as current_version FROM events WHERE stream_id = $1',
[streamId]
)
const currentVersion = versionCheck.rows[0].current_version ?? -1
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, but current version is ${currentVersion}`
)
}
const storedEvents: StoredEvent[] = []
for (let i = 0; i < events.length; i++) {
const event = events[i]
const version = expectedVersion + i + 1
const result = await client.query(
`INSERT INTO events (stream_id, stream_version, event_type, event_data, metadata)
VALUES ($1, $2, $3, $4, $5)
RETURNING event_id, global_position, created_at`,
[
streamId,
version,
event.eventType,
JSON.stringify(event.data),
JSON.stringify(event.metadata ?? {}),
]
)
storedEvents.push({
eventId: result.rows[0].event_id,
streamId,
streamVersion: version,
globalPosition: result.rows[0].global_position,
createdAt: result.rows[0].created_at,
...event,
})
}
await client.query('COMMIT')
return storedEvents
} catch (error) {
await client.query('ROLLBACK')
throw error
} finally {
client.release()
}
}
/**
* 스트림의 모든 이벤트를 순서대로 읽는다.
*/
async readStream(streamId: string, fromVersion = 0): Promise<StoredEvent[]> {
const result = await this.pool.query(
`SELECT * FROM events
WHERE stream_id = $1 AND stream_version >= $2
ORDER BY stream_version ASC`,
[streamId, fromVersion]
)
return result.rows.map(this.mapToStoredEvent)
}
/**
* 전체 이벤트를 글로벌 순서로 읽는다 (프로젝션용).
*/
async readAll(fromPosition = 0, limit = 1000): Promise<StoredEvent[]> {
const result = await this.pool.query(
`SELECT * FROM events
WHERE global_position > $1
ORDER BY global_position ASC
LIMIT $2`,
[fromPosition, limit]
)
return result.rows.map(this.mapToStoredEvent)
}
private mapToStoredEvent(row: any): StoredEvent {
return {
eventId: row.event_id,
streamId: row.stream_id,
streamVersion: row.stream_version,
globalPosition: row.global_position,
eventType: row.event_type,
data: row.event_data,
metadata: row.metadata,
createdAt: row.created_at,
}
}
}
3.3 전용 Event Store 솔루션 비교
| 항목 | EventStoreDB (Kurrent) | Axon Server | PostgreSQL 직접 구현 |
| ----------- | ---------------------- | ----------------------- | ---------------------- |
| 타입 | 전용 이벤트 DB | CQRS 프레임워크 서버 | 범용 RDBMS |
| 프로젝션 | 서버 사이드 JavaScript | Tracking Processor | 직접 구현 필요 |
| 구독 모델 | Catch-up, Persistent | Tracking, Subscribing | Polling, LISTEN/NOTIFY |
| 클러스터링 | 내장 (Gossip 프로토콜) | Axon Server Enterprise | 외부 솔루션 필요 |
| 학습 곡선 | 중간 | 높음 (프레임워크 전체) | 낮음 (SQL 기반) |
| 유연성 | 중간 | 낮음 (Axon 생태계 종속) | 높음 |
| 운영 복잡도 | 중간 | 높음 | 낮음~중간 |
4. Aggregate와 Domain Event 구현
4.1 Aggregate Base Class
Event Sourcing에서 Aggregate는 이벤트를 발행하고 이벤트를 재생하여 상태를 복원하는 핵심 단위다.
abstract class EventSourcedAggregate {
private uncommittedEvents: DomainEvent[] = []
private _version: number = -1
get version(): number {
return this._version
}
/**
* 외부에서 호출하여 상태를 변경한다.
* 내부적으로 이벤트를 생성하고 적용한다.
*/
protected apply(event: DomainEvent): void {
this.when(event)
this.uncommittedEvents.push(event)
this._version++
}
/**
* 이벤트에 따라 내부 상태를 변경하는 핸들러.
* 하위 클래스에서 구현한다.
*/
protected abstract when(event: DomainEvent): void
/**
* 저장된 이벤트를 재생하여 상태를 복원한다.
*/
loadFromHistory(events: DomainEvent[]): void {
for (const event of events) {
this.when(event)
this._version++
}
}
getUncommittedEvents(): DomainEvent[] {
return [...this.uncommittedEvents]
}
clearUncommittedEvents(): void {
this.uncommittedEvents = []
}
}
4.2 Order Aggregate 구현
// Domain Events
interface OrderCreated {
eventType: 'OrderCreated'
data: {
orderId: string
customerId: string
items: Array<{ productId: string; quantity: number; unitPrice: number }>
totalAmount: number
}
}
interface OrderConfirmed {
eventType: 'OrderConfirmed'
data: {
orderId: string
confirmedAt: string
}
}
interface OrderCancelled {
eventType: 'OrderCancelled'
data: {
orderId: string
reason: string
cancelledAt: string
}
}
type OrderEvent = OrderCreated | OrderConfirmed | OrderCancelled
// Order Aggregate
class Order extends EventSourcedAggregate {
private orderId!: string
private customerId!: string
private items: Array<{ productId: string; quantity: number; unitPrice: number }> = []
private totalAmount: number = 0
private status: 'CREATED' | 'CONFIRMED' | 'CANCELLED' = 'CREATED'
static create(
orderId: string,
customerId: string,
items: Array<{ productId: string; quantity: number; unitPrice: number }>
): Order {
const order = new Order()
const totalAmount = items.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0)
// 비즈니스 규칙 검증
if (items.length === 0) {
throw new Error('주문에는 최소 1개 이상의 상품이 필요합니다')
}
if (totalAmount <= 0) {
throw new Error('주문 총액은 0보다 커야 합니다')
}
order.apply({
eventType: 'OrderCreated',
data: { orderId, customerId, items, totalAmount },
})
return order
}
confirm(): void {
if (this.status !== 'CREATED') {
throw new Error('CREATED 상태의 주문만 확인할 수 있습니다')
}
this.apply({
eventType: 'OrderConfirmed',
data: { orderId: this.orderId, confirmedAt: new Date().toISOString() },
})
}
cancel(reason: string): void {
if (this.status === 'CANCELLED') {
throw new Error('이미 취소된 주문입니다')
}
this.apply({
eventType: 'OrderCancelled',
data: { orderId: this.orderId, reason, cancelledAt: new Date().toISOString() },
})
}
protected when(event: DomainEvent): void {
const orderEvent = event as OrderEvent
switch (orderEvent.eventType) {
case 'OrderCreated':
this.orderId = orderEvent.data.orderId
this.customerId = orderEvent.data.customerId
this.items = orderEvent.data.items
this.totalAmount = orderEvent.data.totalAmount
this.status = 'CREATED'
break
case 'OrderConfirmed':
this.status = 'CONFIRMED'
break
case 'OrderCancelled':
this.status = 'CANCELLED'
break
}
}
}
5. Projection과 Read Model
5.1 프로젝션의 역할
프로젝션(Projection)은 이벤트 스트림을 구독하여 **읽기에 최적화된 뷰(Read Model)**를 구축하는 프로세스다. Event Sourcing의 이벤트 스트림에서 직접 조회하는 것은 비효율적이므로, 다양한 조회 요구사항에 맞는 비정규화된 뷰를 별도로 유지한다.
5.2 Projection Engine 구현
interface ProjectionCheckpoint {
projectionName: string
lastProcessedPosition: number
}
abstract class Projection {
abstract readonly name: string
abstract handle(event: StoredEvent): Promise<void>
canHandle(eventType: string): boolean {
return this.handledEventTypes().includes(eventType)
}
abstract handledEventTypes(): string[]
}
class ProjectionEngine {
private projections: Projection[] = []
constructor(
private readonly eventStore: PostgresEventStore,
private readonly checkpointStore: CheckpointStore
) {}
register(projection: Projection): void {
this.projections.push(projection)
}
/**
* 등록된 모든 프로젝션을 실행한다.
* 각 프로젝션은 마지막으로 처리한 위치부터 재개한다.
*/
async run(): Promise<void> {
while (true) {
for (const projection of this.projections) {
const checkpoint = await this.checkpointStore.get(projection.name)
const lastPosition = checkpoint?.lastProcessedPosition ?? 0
const events = await this.eventStore.readAll(lastPosition, 100)
for (const event of events) {
if (projection.canHandle(event.eventType)) {
try {
await projection.handle(event)
} catch (error) {
console.error(
`Projection ${projection.name} failed at position ${event.globalPosition}:`,
error
)
// 실패 시 해당 프로젝션 중단, 다른 프로젝션은 계속 진행
break
}
}
await this.checkpointStore.save({
projectionName: projection.name,
lastProcessedPosition: event.globalPosition,
})
}
}
// 폴링 간격
await new Promise((resolve) => setTimeout(resolve, 500))
}
}
}
5.3 주문 대시보드 Read Model
class OrderDashboardProjection extends Projection {
readonly name = 'order-dashboard'
constructor(private readonly db: Pool) {
super()
}
handledEventTypes(): string[] {
return ['OrderCreated', 'OrderConfirmed', 'OrderCancelled']
}
async handle(event: StoredEvent): Promise<void> {
switch (event.eventType) {
case 'OrderCreated':
await this.db.query(
`INSERT INTO order_dashboard (order_id, customer_id, total_amount, status, created_at)
VALUES ($1, $2, $3, 'CREATED', $4)
ON CONFLICT (order_id) DO NOTHING`,
[event.data.orderId, event.data.customerId, event.data.totalAmount, event.createdAt]
)
break
case 'OrderConfirmed':
await this.db.query(
`UPDATE order_dashboard SET status = 'CONFIRMED', confirmed_at = $2 WHERE order_id = $1`,
[event.data.orderId, event.data.confirmedAt]
)
break
case 'OrderCancelled':
await this.db.query(
`UPDATE order_dashboard SET status = 'CANCELLED', cancel_reason = $2 WHERE order_id = $1`,
[event.data.orderId, event.data.reason]
)
break
}
}
}
6. Saga 패턴과 분산 트랜잭션
6.1 Saga란 무엇인가
Saga 패턴은 여러 서비스에 걸친 트랜잭션을 **일련의 로컬 트랜잭션과 보상 트랜잭션**으로 관리하는 패턴이다. 전통적인 2PC(Two-Phase Commit)와 달리 장시간 락을 잡지 않으므로 확장성이 뛰어나다.
두 가지 구현 방식이 있다.
- **Choreography(안무)**: 각 서비스가 이벤트를 발행하고 다른 서비스가 반응한다. 중앙 조율자가 없다.
- **Orchestration(오케스트레이션)**: 중앙 오케스트레이터가 전체 흐름을 관리하고 각 서비스에 명령을 전달한다.
6.2 Orchestration Saga 구현
// Saga 상태 정의
type OrderSagaStatus =
| 'STARTED'
| 'PAYMENT_PENDING'
| 'PAYMENT_COMPLETED'
| 'INVENTORY_RESERVED'
| 'COMPLETED'
| 'COMPENSATING'
| 'FAILED'
interface SagaStep {
name: string
execute: () => Promise<void>
compensate: () => Promise<void>
}
class OrderSaga {
private status: OrderSagaStatus = 'STARTED'
private completedSteps: SagaStep[] = []
constructor(
private readonly orderId: string,
private readonly paymentService: PaymentService,
private readonly inventoryService: InventoryService,
private readonly sagaLog: SagaLogStore
) {}
async execute(orderData: {
customerId: string
items: any[]
totalAmount: number
}): Promise<void> {
const steps: SagaStep[] = [
{
name: 'ProcessPayment',
execute: async () => {
await this.paymentService.processPayment(
this.orderId,
orderData.customerId,
orderData.totalAmount
)
this.status = 'PAYMENT_COMPLETED'
},
compensate: async () => {
await this.paymentService.refundPayment(this.orderId)
},
},
{
name: 'ReserveInventory',
execute: async () => {
await this.inventoryService.reserve(this.orderId, orderData.items)
this.status = 'INVENTORY_RESERVED'
},
compensate: async () => {
await this.inventoryService.releaseReservation(this.orderId)
},
},
]
for (const step of steps) {
try {
await this.sagaLog.record(this.orderId, step.name, 'EXECUTING')
await step.execute()
await this.sagaLog.record(this.orderId, step.name, 'COMPLETED')
this.completedSteps.push(step)
} catch (error) {
await this.sagaLog.record(this.orderId, step.name, 'FAILED')
console.error(`Saga step ${step.name} failed:`, error)
// 보상 트랜잭션 실행 (역순)
this.status = 'COMPENSATING'
await this.compensate()
this.status = 'FAILED'
return
}
}
this.status = 'COMPLETED'
await this.sagaLog.record(this.orderId, 'Saga', 'COMPLETED')
}
private async compensate(): Promise<void> {
// 완료된 단계를 역순으로 보상
const stepsToCompensate = [...this.completedSteps].reverse()
for (const step of stepsToCompensate) {
try {
await this.sagaLog.record(this.orderId, step.name, 'COMPENSATING')
await step.compensate()
await this.sagaLog.record(this.orderId, step.name, 'COMPENSATED')
} catch (compensateError) {
// 보상 실패 시 수동 개입 필요
await this.sagaLog.record(this.orderId, step.name, 'COMPENSATION_FAILED')
console.error(`Compensation failed for step ${step.name}:`, compensateError)
}
}
}
}
6.3 Choreography vs Orchestration 비교
| 항목 | Choreography | Orchestration |
| ------------- | ----------------------------- | ----------------------------- |
| 결합도 | 낮음 (이벤트 기반) | 중간 (오케스트레이터 의존) |
| 가시성 | 낮음 (흐름 추적 어려움) | 높음 (중앙에서 흐름 관리) |
| 복잡도 | 서비스 수 증가 시 급격히 상승 | 오케스트레이터에 집중 |
| 단일 장애점 | 없음 | 오케스트레이터 |
| 테스트 용이성 | 어려움 | 상대적으로 용이 |
| 적합한 경우 | 2~3개 서비스의 단순 흐름 | 4개 이상 서비스의 복잡한 흐름 |
7. 스냅샷과 이벤트 압축
7.1 스냅샷의 필요성
Aggregate의 이벤트가 수천, 수만 건에 이르면 상태 복원 시간이 급격히 증가한다. 스냅샷은 **특정 시점의 Aggregate 상태를 직렬화하여 저장**하고, 이후 이벤트만 재생하는 최적화 기법이다.
interface Snapshot {
streamId: string
version: number
state: Record<string, unknown>
createdAt: Date
}
class SnapshotRepository {
constructor(private readonly pool: Pool) {}
async save(snapshot: Snapshot): Promise<void> {
await this.pool.query(
`INSERT INTO snapshots (stream_id, version, state, created_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (stream_id) DO UPDATE SET version = $2, state = $3, created_at = $4`,
[snapshot.streamId, snapshot.version, JSON.stringify(snapshot.state), snapshot.createdAt]
)
}
async load(streamId: string): Promise<Snapshot | null> {
const result = await this.pool.query('SELECT * FROM snapshots WHERE stream_id = $1', [streamId])
return result.rows.length > 0 ? result.rows[0] : null
}
}
class EventSourcedRepository<T extends EventSourcedAggregate> {
private readonly SNAPSHOT_INTERVAL = 100 // 100개 이벤트마다 스냅샷
constructor(
private readonly eventStore: PostgresEventStore,
private readonly snapshotRepo: SnapshotRepository,
private readonly factory: () => T
) {}
async load(streamId: string): Promise<T> {
const aggregate = this.factory()
// 1. 스냅샷 로드 시도
const snapshot = await this.snapshotRepo.load(streamId)
let fromVersion = 0
if (snapshot) {
;(aggregate as any).restoreFromSnapshot(snapshot.state)
;(aggregate as any)._version = snapshot.version
fromVersion = snapshot.version + 1
}
// 2. 스냅샷 이후 이벤트만 재생
const events = await this.eventStore.readStream(streamId, fromVersion)
aggregate.loadFromHistory(events)
return aggregate
}
async save(aggregate: T, streamId: string): Promise<void> {
const uncommittedEvents = aggregate.getUncommittedEvents()
if (uncommittedEvents.length === 0) return
await this.eventStore.appendToStream(
streamId,
aggregate.version - uncommittedEvents.length,
uncommittedEvents
)
// 스냅샷 생성 여부 확인
if (aggregate.version > 0 && aggregate.version % this.SNAPSHOT_INTERVAL === 0) {
await this.snapshotRepo.save({
streamId,
version: aggregate.version,
state: (aggregate as any).toSnapshot(),
createdAt: new Date(),
})
}
aggregate.clearUncommittedEvents()
}
}
7.2 스냅샷 전략 가이드라인
- **생성 빈도**: 100~1,000개 이벤트마다 (도메인 특성에 따라 조정)
- **저장 위치**: 같은 스트림 또는 별도 저장소 (별도 저장소 권장)
- **생성 시점**: 비동기 백그라운드에서 생성하여 쓰기 지연시간에 영향을 주지 않도록 한다
- **재생 시간 100ms 초과 시**: 스냅샷 도입을 적극 검토한다
8. Event Sourcing vs 전통 CRUD 비교
| 항목 | Event Sourcing | 전통 CRUD |
| -------------- | ------------------------------------ | -------------------------------- |
| 데이터 저장 | 이벤트(변경 기록) 저장 | 현재 상태만 저장 |
| 이력 추적 | 완전한 감사 추적 자동 제공 | 별도 감사 테이블 필요 |
| 시간 여행 쿼리 | 특정 시점 상태 복원 가능 | 별도 스냅샷 테이블 필요 |
| 쿼리 복잡도 | 높음 (Read Model 별도 구축) | 낮음 (SQL 직접 조회) |
| 일관성 모델 | 주로 최종 일관성 | 즉시 일관성 |
| 스키마 변경 | 이벤트 버전 관리 필요 | ALTER TABLE |
| 디버깅 | 이벤트 재생으로 문제 재현 | 현재 상태만 확인 가능 |
| 학습 곡선 | 높음 | 낮음 |
| 적합한 도메인 | 금융, 주문, 물류 등 이력 중요 도메인 | 설정 관리, 카탈로그 등 단순 CRUD |
9. 장애 사례와 복구 절차
9.1 이벤트 스토어 장애
**증상**: 이벤트 저장 실패로 Command 처리 중단
**복구 절차**:
1. DB 연결 상태 확인 및 장애 원인 파악
2. 이벤트 스토어 복구 후 미처리 Command 재시도
3. 글로벌 포지션 갭 확인 - 갭이 발생한 경우 프로젝션 무결성 검증
9.2 프로젝션 동기화 지연
**증상**: Read Model이 최신 상태를 반영하지 않음
**복구 절차**:
1. 프로젝션 체크포인트와 이벤트 스토어 최신 포지션 비교
2. 지연 원인 파악 (처리 속도, 장애, 독점 이벤트 등)
3. 필요시 프로젝션 초기화 후 처음부터 재구축 (rebuild)
9.3 Saga 보상 실패
**증상**: 보상 트랜잭션 실행 중 외부 서비스 장애
**복구 절차**:
1. Saga 로그에서 COMPENSATION_FAILED 상태 조회
2. 실패한 보상 단계를 수동으로 재시도
3. 재시도 횟수 초과 시 Dead Letter Queue에 기록하고 수동 개입
9.4 이벤트 스키마 변경
**증상**: 이벤트 구조가 변경되어 기존 이벤트 역직렬화 실패
**복구 절차**:
1. Upcaster 패턴으로 구버전 이벤트를 신버전으로 변환
2. 프로젝션에서 두 버전 모두 처리 가능하도록 핸들러 수정
3. 전체 프로젝션 rebuild 실행
10. 운영 체크리스트
도입 전 검토
- 도메인에 이벤트 소싱이 정말 필요한가? (이력 추적, 감사, 시간 여행 쿼리 필요성)
- 최종 일관성을 수용할 수 있는가?
- 팀이 DDD와 이벤트 기반 사고에 익숙한가?
설계 단계
- 이벤트는 비즈니스 의도를 담고 있는가? (속성 변경이 아닌 도메인 행위)
- Aggregate 경계가 적절한가? (너무 크거나 작지 않은지)
- 이벤트 스키마 버전 관리 전략이 수립되어 있는가?
- 프로젝션 rebuild 전략이 준비되어 있는가?
운영 단계
- 이벤트 스토어 용량 모니터링 및 아카이빙 전략
- 프로젝션 지연 시간(lag) 모니터링
- Saga 실패율 및 보상 트랜잭션 성공률 추적
- 스냅샷 생성 주기와 디스크 사용량 관리
- 이벤트 스트림별 이벤트 수 모니터링 (비정상 증가 감지)
장애 대비
- 프로젝션 전체 rebuild 절차와 예상 소요 시간 문서화
- Saga Dead Letter Queue 처리 프로세스 수립
- 이벤트 스토어 백업 및 복구 절차 검증
- Upcaster 테스트 자동화
마치며
Event Sourcing과 CQRS는 강력한 아키텍처 패턴이지만, 모든 시스템에 적합한 것은 아니다. 이력 추적, 감사 로그, 시간 여행 쿼리가 핵심 요구사항인 도메인(금융, 주문, 물류 등)에서 가장 큰 가치를 발휘한다.
도입을 검토한다면 다음을 권장한다.
1. **핵심 도메인에만 선택적으로 적용**하라. 모든 서비스에 Event Sourcing을 적용할 필요는 없다.
2. **프로젝션 rebuild를 처음부터 설계**하라. 운영 중 프로젝션 재구축은 반드시 필요하게 된다.
3. **이벤트 스키마 진화 전략을 먼저 수립**하라. 이벤트는 영원히 저장되므로, 스키마 변경 비용이 가장 크다.
4. **Saga 보상 실패에 대비**하라. 분산 시스템에서 보상이 실패하는 것은 시간 문제다.
최종 일관성을 수용하고, 이벤트 기반 사고로 전환할 준비가 되어 있다면, Event Sourcing + CQRS는 시스템의 복원력과 확장성을 근본적으로 향상시키는 아키텍처 선택이 될 것이다.
현재 단락 (1/646)
전통적인 CRUD 시스템은 데이터의 현재 상태만 저장한다. UPDATE가 실행되면 이전 값은 사라지고, DELETE가 실행되면 데이터 자체가 소멸한다. 은행 계좌 잔액이 100만원...