Skip to content

필사 모드: Event Sourcing + CQRS 아키텍처 실전 구현 가이드: 이벤트 스토어부터 프로젝션·사가 패턴까지

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

들어가며

전통적인 CRUD 시스템은 데이터의 현재 상태만 저장한다. UPDATE가 실행되면 이전 값은 사라지고, DELETE가 실행되면 데이터 자체가 소멸한다. 은행 계좌 잔액이 100만원이라는 사실은 알 수 있지만, 그 잔액이 어떤 입출금 과정을 거쳐 형성되었는지는 알 수 없다.

Event Sourcing은 이 문제를 근본적으로 해결한다. **상태 변경 자체를 이벤트로 기록**하고, 현재 상태는 이벤트의 순차적 재생으로 복원한다. CQRS는 이 구조 위에서 **쓰기(Command)와 읽기(Query)를 완전히 분리**하여 각각의 요구사항에 최적화한다.

이 글에서는 Event Sourcing과 CQRS를 결합한 아키텍처의 전체 구현 과정을 다룬다. 이벤트 스토어 설계부터 Aggregate 구현, 프로젝션 구축, Saga 패턴을 활용한 분산 트랜잭션 관리, 스냅샷 최적화, 그리고 운영 시 발생하는 장애 사례와 복구 절차까지 실전 코드와 함께 안내한다.

1. Event Sourcing 핵심 원리

1.1 이벤트란 무엇인가

Event Sourcing에서 이벤트(Event)는 **과거에 발생한 변경 불가능한 사실(Immutable Fact)**이다. 이벤트는 과거 시제로 명명되며, 비즈니스 의도를 명확히 담아야 한다.

잘못된 이벤트 설계와 올바른 이벤트 설계를 비교하면 다음과 같다.

- 잘못된 예: `NameChanged`, `EmailChanged` - 속성 단위의 이벤트는 비즈니스 의미가 없다

- 올바른 예: `CustomerRelocated`, `OrderConfirmed` - 도메인 행위를 표현하는 이벤트

1.2 상태 복원의 원리

현재 상태는 초기 상태에 모든 이벤트를 순차적으로 적용(replay)하여 계산한다.

현재 상태 = fold(초기 상태, [이벤트1, 이벤트2, ..., 이벤트N])

예를 들어 은행 계좌의 상태 복원 과정은 다음과 같다.

초기 잔액: 0원

-> AccountOpened(금액: 0원) => 잔액: 0원

-> MoneyDeposited(금액: 100만원) => 잔액: 100만원

-> MoneyWithdrawn(금액: 30만원) => 잔액: 70만원

-> MoneyDeposited(금액: 50만원) => 잔액: 120만원

1.3 이벤트의 불변성 원칙

한번 저장된 이벤트는 절대 수정하거나 삭제할 수 없다. 잘못된 이벤트가 발행되었다면 **보상 이벤트(Compensating Event)**를 새로 발행하여 논리적으로 취소해야 한다.

// 잘못된 출금이 발생한 경우 - 이벤트를 삭제하지 않는다

// 대신 보상 이벤트를 발행한다

interface MoneyWithdrawnCompensated {

type: 'MoneyWithdrawnCompensated'

originalEventId: string

amount: number

reason: string

timestamp: Date

}

2. CQRS 아키텍처 패턴

2.1 Command와 Query의 분리

CQRS의 핵심은 단순하다. **데이터를 변경하는 모델과 데이터를 조회하는 모델을 분리**하는 것이다.

[클라이언트]

├── Command(쓰기) ──> [Command Handler] ──> [Write Model / Event Store]

│ │

│ 이벤트 발행

│ │

│ ▼

│ [Projection Engine]

│ │

│ ▼

└── Query(읽기) ───> [Query Handler] ───> [Read Model / View DB]

2.2 왜 분리하는가

읽기와 쓰기의 요구사항은 근본적으로 다르다.

- **쓰기(Command)**: 비즈니스 규칙 검증, 도메인 불변 규칙(invariant) 보장, 트랜잭션 일관성 필요

- **읽기(Query)**: 다양한 뷰 지원, 낮은 지연시간, 높은 처리량 필요

하나의 모델로 양쪽을 만족시키려 하면 양쪽 모두 타협해야 한다. CQRS는 각각의 모델을 독립적으로 최적화할 수 있게 해준다.

2.3 Command Handler 구현

// Command 정의

interface CreateOrderCommand {

orderId: string

customerId: string

items: Array<{ productId: string; quantity: number; price: number }>

}

// Command Handler

class OrderCommandHandler {

constructor(

private readonly eventStore: EventStore,

private readonly orderRepository: EventSourcedRepository<Order>

) {}

async handle(command: CreateOrderCommand): Promise<void> {

// 1. Aggregate 로드 (이벤트 재생)

const order = await this.orderRepository.load(command.orderId)

// 2. 비즈니스 로직 실행 (이벤트 생성)

order.create(command.customerId, command.items)

// 3. 이벤트 저장

await this.orderRepository.save(order)

}

}

3. Event Store 설계와 구현

3.1 이벤트 스토어 스키마

이벤트 스토어의 핵심 테이블 구조는 다음과 같다.

CREATE TABLE events (

-- 글로벌 고유 식별자

event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

-- 이벤트가 속한 스트림(Aggregate) 식별자

stream_id VARCHAR(255) NOT NULL,

-- 스트림 내 이벤트 순서 (낙관적 동시성 제어에 사용)

stream_version BIGINT NOT NULL,

-- 이벤트 타입 (역직렬화에 사용)

event_type VARCHAR(255) NOT NULL,

-- 이벤트 페이로드 (JSON)

event_data JSONB NOT NULL,

-- 메타데이터 (상관관계 ID, 사용자 정보 등)

metadata JSONB DEFAULT '{}',

-- 이벤트 발생 시각

created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),

-- 글로벌 순서 (프로젝션에서 전체 순서 추적에 사용)

global_position BIGSERIAL NOT NULL,

-- 같은 스트림 내에서 버전 중복 방지 (낙관적 동시성 제어)

UNIQUE(stream_id, stream_version)

);

-- 스트림별 이벤트 조회 인덱스

CREATE INDEX idx_events_stream ON events(stream_id, stream_version);

-- 전체 이벤트 순서 인덱스 (프로젝션용)

CREATE INDEX idx_events_global ON events(global_position);

-- 이벤트 타입별 조회 인덱스

CREATE INDEX idx_events_type ON events(event_type);

3.2 TypeScript Event Store 구현

interface DomainEvent {

eventType: string

data: Record<string, unknown>

metadata?: Record<string, unknown>

}

interface StoredEvent extends DomainEvent {

eventId: string

streamId: string

streamVersion: number

globalPosition: number

createdAt: Date

}

class PostgresEventStore {

constructor(private readonly pool: Pool) {}

/**

* 이벤트를 스트림에 추가한다.

* expectedVersion으로 낙관적 동시성 제어를 수행한다.

*/

async appendToStream(

streamId: string,

expectedVersion: number,

events: DomainEvent[]

): Promise<StoredEvent[]> {

const client = await this.pool.connect()

try {

await client.query('BEGIN')

// 낙관적 동시성 검사

const versionCheck = await client.query(

'SELECT MAX(stream_version) as current_version FROM events WHERE stream_id = $1',

[streamId]

)

const currentVersion = versionCheck.rows[0].current_version ?? -1

if (currentVersion !== expectedVersion) {

throw new ConcurrencyError(

`Expected version ${expectedVersion}, but current version is ${currentVersion}`

)

}

const storedEvents: StoredEvent[] = []

for (let i = 0; i < events.length; i++) {

const event = events[i]

const version = expectedVersion + i + 1

const result = await client.query(

`INSERT INTO events (stream_id, stream_version, event_type, event_data, metadata)

VALUES ($1, $2, $3, $4, $5)

RETURNING event_id, global_position, created_at`,

[

streamId,

version,

event.eventType,

JSON.stringify(event.data),

JSON.stringify(event.metadata ?? {}),

]

)

storedEvents.push({

eventId: result.rows[0].event_id,

streamId,

streamVersion: version,

globalPosition: result.rows[0].global_position,

createdAt: result.rows[0].created_at,

...event,

})

}

await client.query('COMMIT')

return storedEvents

} catch (error) {

await client.query('ROLLBACK')

throw error

} finally {

client.release()

}

}

/**

* 스트림의 모든 이벤트를 순서대로 읽는다.

*/

async readStream(streamId: string, fromVersion = 0): Promise<StoredEvent[]> {

const result = await this.pool.query(

`SELECT * FROM events

WHERE stream_id = $1 AND stream_version >= $2

ORDER BY stream_version ASC`,

[streamId, fromVersion]

)

return result.rows.map(this.mapToStoredEvent)

}

/**

* 전체 이벤트를 글로벌 순서로 읽는다 (프로젝션용).

*/

async readAll(fromPosition = 0, limit = 1000): Promise<StoredEvent[]> {

const result = await this.pool.query(

`SELECT * FROM events

WHERE global_position > $1

ORDER BY global_position ASC

LIMIT $2`,

[fromPosition, limit]

)

return result.rows.map(this.mapToStoredEvent)

}

private mapToStoredEvent(row: any): StoredEvent {

return {

eventId: row.event_id,

streamId: row.stream_id,

streamVersion: row.stream_version,

globalPosition: row.global_position,

eventType: row.event_type,

data: row.event_data,

metadata: row.metadata,

createdAt: row.created_at,

}

}

}

3.3 전용 Event Store 솔루션 비교

| 항목 | EventStoreDB (Kurrent) | Axon Server | PostgreSQL 직접 구현 |

| ----------- | ---------------------- | ----------------------- | ---------------------- |

| 타입 | 전용 이벤트 DB | CQRS 프레임워크 서버 | 범용 RDBMS |

| 프로젝션 | 서버 사이드 JavaScript | Tracking Processor | 직접 구현 필요 |

| 구독 모델 | Catch-up, Persistent | Tracking, Subscribing | Polling, LISTEN/NOTIFY |

| 클러스터링 | 내장 (Gossip 프로토콜) | Axon Server Enterprise | 외부 솔루션 필요 |

| 학습 곡선 | 중간 | 높음 (프레임워크 전체) | 낮음 (SQL 기반) |

| 유연성 | 중간 | 낮음 (Axon 생태계 종속) | 높음 |

| 운영 복잡도 | 중간 | 높음 | 낮음~중간 |

4. Aggregate와 Domain Event 구현

4.1 Aggregate Base Class

Event Sourcing에서 Aggregate는 이벤트를 발행하고 이벤트를 재생하여 상태를 복원하는 핵심 단위다.

abstract class EventSourcedAggregate {

private uncommittedEvents: DomainEvent[] = []

private _version: number = -1

get version(): number {

return this._version

}

/**

* 외부에서 호출하여 상태를 변경한다.

* 내부적으로 이벤트를 생성하고 적용한다.

*/

protected apply(event: DomainEvent): void {

this.when(event)

this.uncommittedEvents.push(event)

this._version++

}

/**

* 이벤트에 따라 내부 상태를 변경하는 핸들러.

* 하위 클래스에서 구현한다.

*/

protected abstract when(event: DomainEvent): void

/**

* 저장된 이벤트를 재생하여 상태를 복원한다.

*/

loadFromHistory(events: DomainEvent[]): void {

for (const event of events) {

this.when(event)

this._version++

}

}

getUncommittedEvents(): DomainEvent[] {

return [...this.uncommittedEvents]

}

clearUncommittedEvents(): void {

this.uncommittedEvents = []

}

}

4.2 Order Aggregate 구현

// Domain Events

interface OrderCreated {

eventType: 'OrderCreated'

data: {

orderId: string

customerId: string

items: Array<{ productId: string; quantity: number; unitPrice: number }>

totalAmount: number

}

}

interface OrderConfirmed {

eventType: 'OrderConfirmed'

data: {

orderId: string

confirmedAt: string

}

}

interface OrderCancelled {

eventType: 'OrderCancelled'

data: {

orderId: string

reason: string

cancelledAt: string

}

}

type OrderEvent = OrderCreated | OrderConfirmed | OrderCancelled

// Order Aggregate

class Order extends EventSourcedAggregate {

private orderId!: string

private customerId!: string

private items: Array<{ productId: string; quantity: number; unitPrice: number }> = []

private totalAmount: number = 0

private status: 'CREATED' | 'CONFIRMED' | 'CANCELLED' = 'CREATED'

static create(

orderId: string,

customerId: string,

items: Array<{ productId: string; quantity: number; unitPrice: number }>

): Order {

const order = new Order()

const totalAmount = items.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0)

// 비즈니스 규칙 검증

if (items.length === 0) {

throw new Error('주문에는 최소 1개 이상의 상품이 필요합니다')

}

if (totalAmount <= 0) {

throw new Error('주문 총액은 0보다 커야 합니다')

}

order.apply({

eventType: 'OrderCreated',

data: { orderId, customerId, items, totalAmount },

})

return order

}

confirm(): void {

if (this.status !== 'CREATED') {

throw new Error('CREATED 상태의 주문만 확인할 수 있습니다')

}

this.apply({

eventType: 'OrderConfirmed',

data: { orderId: this.orderId, confirmedAt: new Date().toISOString() },

})

}

cancel(reason: string): void {

if (this.status === 'CANCELLED') {

throw new Error('이미 취소된 주문입니다')

}

this.apply({

eventType: 'OrderCancelled',

data: { orderId: this.orderId, reason, cancelledAt: new Date().toISOString() },

})

}

protected when(event: DomainEvent): void {

const orderEvent = event as OrderEvent

switch (orderEvent.eventType) {

case 'OrderCreated':

this.orderId = orderEvent.data.orderId

this.customerId = orderEvent.data.customerId

this.items = orderEvent.data.items

this.totalAmount = orderEvent.data.totalAmount

this.status = 'CREATED'

break

case 'OrderConfirmed':

this.status = 'CONFIRMED'

break

case 'OrderCancelled':

this.status = 'CANCELLED'

break

}

}

}

5. Projection과 Read Model

5.1 프로젝션의 역할

프로젝션(Projection)은 이벤트 스트림을 구독하여 **읽기에 최적화된 뷰(Read Model)**를 구축하는 프로세스다. Event Sourcing의 이벤트 스트림에서 직접 조회하는 것은 비효율적이므로, 다양한 조회 요구사항에 맞는 비정규화된 뷰를 별도로 유지한다.

5.2 Projection Engine 구현

interface ProjectionCheckpoint {

projectionName: string

lastProcessedPosition: number

}

abstract class Projection {

abstract readonly name: string

abstract handle(event: StoredEvent): Promise<void>

canHandle(eventType: string): boolean {

return this.handledEventTypes().includes(eventType)

}

abstract handledEventTypes(): string[]

}

class ProjectionEngine {

private projections: Projection[] = []

constructor(

private readonly eventStore: PostgresEventStore,

private readonly checkpointStore: CheckpointStore

) {}

register(projection: Projection): void {

this.projections.push(projection)

}

/**

* 등록된 모든 프로젝션을 실행한다.

* 각 프로젝션은 마지막으로 처리한 위치부터 재개한다.

*/

async run(): Promise<void> {

while (true) {

for (const projection of this.projections) {

const checkpoint = await this.checkpointStore.get(projection.name)

const lastPosition = checkpoint?.lastProcessedPosition ?? 0

const events = await this.eventStore.readAll(lastPosition, 100)

for (const event of events) {

if (projection.canHandle(event.eventType)) {

try {

await projection.handle(event)

} catch (error) {

console.error(

`Projection ${projection.name} failed at position ${event.globalPosition}:`,

error

)

// 실패 시 해당 프로젝션 중단, 다른 프로젝션은 계속 진행

break

}

}

await this.checkpointStore.save({

projectionName: projection.name,

lastProcessedPosition: event.globalPosition,

})

}

}

// 폴링 간격

await new Promise((resolve) => setTimeout(resolve, 500))

}

}

}

5.3 주문 대시보드 Read Model

class OrderDashboardProjection extends Projection {

readonly name = 'order-dashboard'

constructor(private readonly db: Pool) {

super()

}

handledEventTypes(): string[] {

return ['OrderCreated', 'OrderConfirmed', 'OrderCancelled']

}

async handle(event: StoredEvent): Promise<void> {

switch (event.eventType) {

case 'OrderCreated':

await this.db.query(

`INSERT INTO order_dashboard (order_id, customer_id, total_amount, status, created_at)

VALUES ($1, $2, $3, 'CREATED', $4)

ON CONFLICT (order_id) DO NOTHING`,

[event.data.orderId, event.data.customerId, event.data.totalAmount, event.createdAt]

)

break

case 'OrderConfirmed':

await this.db.query(

`UPDATE order_dashboard SET status = 'CONFIRMED', confirmed_at = $2 WHERE order_id = $1`,

[event.data.orderId, event.data.confirmedAt]

)

break

case 'OrderCancelled':

await this.db.query(

`UPDATE order_dashboard SET status = 'CANCELLED', cancel_reason = $2 WHERE order_id = $1`,

[event.data.orderId, event.data.reason]

)

break

}

}

}

6. Saga 패턴과 분산 트랜잭션

6.1 Saga란 무엇인가

Saga 패턴은 여러 서비스에 걸친 트랜잭션을 **일련의 로컬 트랜잭션과 보상 트랜잭션**으로 관리하는 패턴이다. 전통적인 2PC(Two-Phase Commit)와 달리 장시간 락을 잡지 않으므로 확장성이 뛰어나다.

두 가지 구현 방식이 있다.

- **Choreography(안무)**: 각 서비스가 이벤트를 발행하고 다른 서비스가 반응한다. 중앙 조율자가 없다.

- **Orchestration(오케스트레이션)**: 중앙 오케스트레이터가 전체 흐름을 관리하고 각 서비스에 명령을 전달한다.

6.2 Orchestration Saga 구현

// Saga 상태 정의

type OrderSagaStatus =

| 'STARTED'

| 'PAYMENT_PENDING'

| 'PAYMENT_COMPLETED'

| 'INVENTORY_RESERVED'

| 'COMPLETED'

| 'COMPENSATING'

| 'FAILED'

interface SagaStep {

name: string

execute: () => Promise<void>

compensate: () => Promise<void>

}

class OrderSaga {

private status: OrderSagaStatus = 'STARTED'

private completedSteps: SagaStep[] = []

constructor(

private readonly orderId: string,

private readonly paymentService: PaymentService,

private readonly inventoryService: InventoryService,

private readonly sagaLog: SagaLogStore

) {}

async execute(orderData: {

customerId: string

items: any[]

totalAmount: number

}): Promise<void> {

const steps: SagaStep[] = [

{

name: 'ProcessPayment',

execute: async () => {

await this.paymentService.processPayment(

this.orderId,

orderData.customerId,

orderData.totalAmount

)

this.status = 'PAYMENT_COMPLETED'

},

compensate: async () => {

await this.paymentService.refundPayment(this.orderId)

},

},

{

name: 'ReserveInventory',

execute: async () => {

await this.inventoryService.reserve(this.orderId, orderData.items)

this.status = 'INVENTORY_RESERVED'

},

compensate: async () => {

await this.inventoryService.releaseReservation(this.orderId)

},

},

]

for (const step of steps) {

try {

await this.sagaLog.record(this.orderId, step.name, 'EXECUTING')

await step.execute()

await this.sagaLog.record(this.orderId, step.name, 'COMPLETED')

this.completedSteps.push(step)

} catch (error) {

await this.sagaLog.record(this.orderId, step.name, 'FAILED')

console.error(`Saga step ${step.name} failed:`, error)

// 보상 트랜잭션 실행 (역순)

this.status = 'COMPENSATING'

await this.compensate()

this.status = 'FAILED'

return

}

}

this.status = 'COMPLETED'

await this.sagaLog.record(this.orderId, 'Saga', 'COMPLETED')

}

private async compensate(): Promise<void> {

// 완료된 단계를 역순으로 보상

const stepsToCompensate = [...this.completedSteps].reverse()

for (const step of stepsToCompensate) {

try {

await this.sagaLog.record(this.orderId, step.name, 'COMPENSATING')

await step.compensate()

await this.sagaLog.record(this.orderId, step.name, 'COMPENSATED')

} catch (compensateError) {

// 보상 실패 시 수동 개입 필요

await this.sagaLog.record(this.orderId, step.name, 'COMPENSATION_FAILED')

console.error(`Compensation failed for step ${step.name}:`, compensateError)

}

}

}

}

6.3 Choreography vs Orchestration 비교

| 항목 | Choreography | Orchestration |

| ------------- | ----------------------------- | ----------------------------- |

| 결합도 | 낮음 (이벤트 기반) | 중간 (오케스트레이터 의존) |

| 가시성 | 낮음 (흐름 추적 어려움) | 높음 (중앙에서 흐름 관리) |

| 복잡도 | 서비스 수 증가 시 급격히 상승 | 오케스트레이터에 집중 |

| 단일 장애점 | 없음 | 오케스트레이터 |

| 테스트 용이성 | 어려움 | 상대적으로 용이 |

| 적합한 경우 | 2~3개 서비스의 단순 흐름 | 4개 이상 서비스의 복잡한 흐름 |

7. 스냅샷과 이벤트 압축

7.1 스냅샷의 필요성

Aggregate의 이벤트가 수천, 수만 건에 이르면 상태 복원 시간이 급격히 증가한다. 스냅샷은 **특정 시점의 Aggregate 상태를 직렬화하여 저장**하고, 이후 이벤트만 재생하는 최적화 기법이다.

interface Snapshot {

streamId: string

version: number

state: Record<string, unknown>

createdAt: Date

}

class SnapshotRepository {

constructor(private readonly pool: Pool) {}

async save(snapshot: Snapshot): Promise<void> {

await this.pool.query(

`INSERT INTO snapshots (stream_id, version, state, created_at)

VALUES ($1, $2, $3, $4)

ON CONFLICT (stream_id) DO UPDATE SET version = $2, state = $3, created_at = $4`,

[snapshot.streamId, snapshot.version, JSON.stringify(snapshot.state), snapshot.createdAt]

)

}

async load(streamId: string): Promise<Snapshot | null> {

const result = await this.pool.query('SELECT * FROM snapshots WHERE stream_id = $1', [streamId])

return result.rows.length > 0 ? result.rows[0] : null

}

}

class EventSourcedRepository<T extends EventSourcedAggregate> {

private readonly SNAPSHOT_INTERVAL = 100 // 100개 이벤트마다 스냅샷

constructor(

private readonly eventStore: PostgresEventStore,

private readonly snapshotRepo: SnapshotRepository,

private readonly factory: () => T

) {}

async load(streamId: string): Promise<T> {

const aggregate = this.factory()

// 1. 스냅샷 로드 시도

const snapshot = await this.snapshotRepo.load(streamId)

let fromVersion = 0

if (snapshot) {

;(aggregate as any).restoreFromSnapshot(snapshot.state)

;(aggregate as any)._version = snapshot.version

fromVersion = snapshot.version + 1

}

// 2. 스냅샷 이후 이벤트만 재생

const events = await this.eventStore.readStream(streamId, fromVersion)

aggregate.loadFromHistory(events)

return aggregate

}

async save(aggregate: T, streamId: string): Promise<void> {

const uncommittedEvents = aggregate.getUncommittedEvents()

if (uncommittedEvents.length === 0) return

await this.eventStore.appendToStream(

streamId,

aggregate.version - uncommittedEvents.length,

uncommittedEvents

)

// 스냅샷 생성 여부 확인

if (aggregate.version > 0 && aggregate.version % this.SNAPSHOT_INTERVAL === 0) {

await this.snapshotRepo.save({

streamId,

version: aggregate.version,

state: (aggregate as any).toSnapshot(),

createdAt: new Date(),

})

}

aggregate.clearUncommittedEvents()

}

}

7.2 스냅샷 전략 가이드라인

- **생성 빈도**: 100~1,000개 이벤트마다 (도메인 특성에 따라 조정)

- **저장 위치**: 같은 스트림 또는 별도 저장소 (별도 저장소 권장)

- **생성 시점**: 비동기 백그라운드에서 생성하여 쓰기 지연시간에 영향을 주지 않도록 한다

- **재생 시간 100ms 초과 시**: 스냅샷 도입을 적극 검토한다

8. Event Sourcing vs 전통 CRUD 비교

| 항목 | Event Sourcing | 전통 CRUD |

| -------------- | ------------------------------------ | -------------------------------- |

| 데이터 저장 | 이벤트(변경 기록) 저장 | 현재 상태만 저장 |

| 이력 추적 | 완전한 감사 추적 자동 제공 | 별도 감사 테이블 필요 |

| 시간 여행 쿼리 | 특정 시점 상태 복원 가능 | 별도 스냅샷 테이블 필요 |

| 쿼리 복잡도 | 높음 (Read Model 별도 구축) | 낮음 (SQL 직접 조회) |

| 일관성 모델 | 주로 최종 일관성 | 즉시 일관성 |

| 스키마 변경 | 이벤트 버전 관리 필요 | ALTER TABLE |

| 디버깅 | 이벤트 재생으로 문제 재현 | 현재 상태만 확인 가능 |

| 학습 곡선 | 높음 | 낮음 |

| 적합한 도메인 | 금융, 주문, 물류 등 이력 중요 도메인 | 설정 관리, 카탈로그 등 단순 CRUD |

9. 장애 사례와 복구 절차

9.1 이벤트 스토어 장애

**증상**: 이벤트 저장 실패로 Command 처리 중단

**복구 절차**:

1. DB 연결 상태 확인 및 장애 원인 파악

2. 이벤트 스토어 복구 후 미처리 Command 재시도

3. 글로벌 포지션 갭 확인 - 갭이 발생한 경우 프로젝션 무결성 검증

9.2 프로젝션 동기화 지연

**증상**: Read Model이 최신 상태를 반영하지 않음

**복구 절차**:

1. 프로젝션 체크포인트와 이벤트 스토어 최신 포지션 비교

2. 지연 원인 파악 (처리 속도, 장애, 독점 이벤트 등)

3. 필요시 프로젝션 초기화 후 처음부터 재구축 (rebuild)

9.3 Saga 보상 실패

**증상**: 보상 트랜잭션 실행 중 외부 서비스 장애

**복구 절차**:

1. Saga 로그에서 COMPENSATION_FAILED 상태 조회

2. 실패한 보상 단계를 수동으로 재시도

3. 재시도 횟수 초과 시 Dead Letter Queue에 기록하고 수동 개입

9.4 이벤트 스키마 변경

**증상**: 이벤트 구조가 변경되어 기존 이벤트 역직렬화 실패

**복구 절차**:

1. Upcaster 패턴으로 구버전 이벤트를 신버전으로 변환

2. 프로젝션에서 두 버전 모두 처리 가능하도록 핸들러 수정

3. 전체 프로젝션 rebuild 실행

10. 운영 체크리스트

도입 전 검토

- 도메인에 이벤트 소싱이 정말 필요한가? (이력 추적, 감사, 시간 여행 쿼리 필요성)

- 최종 일관성을 수용할 수 있는가?

- 팀이 DDD와 이벤트 기반 사고에 익숙한가?

설계 단계

- 이벤트는 비즈니스 의도를 담고 있는가? (속성 변경이 아닌 도메인 행위)

- Aggregate 경계가 적절한가? (너무 크거나 작지 않은지)

- 이벤트 스키마 버전 관리 전략이 수립되어 있는가?

- 프로젝션 rebuild 전략이 준비되어 있는가?

운영 단계

- 이벤트 스토어 용량 모니터링 및 아카이빙 전략

- 프로젝션 지연 시간(lag) 모니터링

- Saga 실패율 및 보상 트랜잭션 성공률 추적

- 스냅샷 생성 주기와 디스크 사용량 관리

- 이벤트 스트림별 이벤트 수 모니터링 (비정상 증가 감지)

장애 대비

- 프로젝션 전체 rebuild 절차와 예상 소요 시간 문서화

- Saga Dead Letter Queue 처리 프로세스 수립

- 이벤트 스토어 백업 및 복구 절차 검증

- Upcaster 테스트 자동화

마치며

Event Sourcing과 CQRS는 강력한 아키텍처 패턴이지만, 모든 시스템에 적합한 것은 아니다. 이력 추적, 감사 로그, 시간 여행 쿼리가 핵심 요구사항인 도메인(금융, 주문, 물류 등)에서 가장 큰 가치를 발휘한다.

도입을 검토한다면 다음을 권장한다.

1. **핵심 도메인에만 선택적으로 적용**하라. 모든 서비스에 Event Sourcing을 적용할 필요는 없다.

2. **프로젝션 rebuild를 처음부터 설계**하라. 운영 중 프로젝션 재구축은 반드시 필요하게 된다.

3. **이벤트 스키마 진화 전략을 먼저 수립**하라. 이벤트는 영원히 저장되므로, 스키마 변경 비용이 가장 크다.

4. **Saga 보상 실패에 대비**하라. 분산 시스템에서 보상이 실패하는 것은 시간 문제다.

최종 일관성을 수용하고, 이벤트 기반 사고로 전환할 준비가 되어 있다면, Event Sourcing + CQRS는 시스템의 복원력과 확장성을 근본적으로 향상시키는 아키텍처 선택이 될 것이다.

현재 단락 (1/646)

전통적인 CRUD 시스템은 데이터의 현재 상태만 저장한다. UPDATE가 실행되면 이전 값은 사라지고, DELETE가 실행되면 데이터 자체가 소멸한다. 은행 계좌 잔액이 100만원...

작성 글자: 0원문 글자: 17,166작성 단락: 0/646