- Published on
이벤트 드리븐 아키텍처 패턴 가이드 2025: CQRS, Event Sourcing, Saga 패턴까지
- Authors

- Name
- Youngju Kim
- @fjvbn20031
목차
1. 이벤트 드리븐 아키텍처 기본 개념
이벤트 드리븐 아키텍처(EDA)는 시스템의 컴포넌트가 이벤트를 통해 통신하는 소프트웨어 아키텍처 패턴입니다. 서비스 간 직접 호출 대신 이벤트를 발행하고 구독하는 방식으로, 느슨한 결합과 높은 확장성을 달성합니다.
1.1 핵심 개념
이벤트(Event): 시스템에서 발생한 의미 있는 사실(fact)입니다. 과거형으로 명명합니다. 예: OrderPlaced, PaymentCompleted, InventoryReserved.
이벤트 프로듀서(Producer): 이벤트를 생성하고 발행하는 컴포넌트입니다.
이벤트 컨슈머(Consumer): 이벤트를 구독하고 처리하는 컴포넌트입니다.
이벤트 브로커(Broker): 프로듀서와 컨슈머 사이에서 이벤트를 전달하는 인프라입니다. Kafka, RabbitMQ, AWS EventBridge 등이 있습니다.
1.2 이벤트 유형
이벤트 유형
├── Domain Event (도메인 이벤트)
│ └── 비즈니스 도메인에서 발생한 사실
│ 예: OrderPlaced, PaymentFailed
├── Integration Event (통합 이벤트)
│ └── 바운디드 컨텍스트 간 통신을 위한 이벤트
│ 예: OrderConfirmedIntegrationEvent
├── Event Notification (이벤트 알림)
│ └── 최소한의 데이터만 포함. 소비자가 필요시 조회
│ 예: OrderStatusChanged (orderId만 포함)
└── Event-Carried State Transfer (상태 전달 이벤트)
└── 전체 상태를 포함. 소비자가 별도 조회 불필요
예: OrderDetails (전체 주문 정보 포함)
1.3 Pub/Sub 패턴
// TypeScript - 이벤트 버스 인터페이스
interface EventBus {
publish<T extends DomainEvent>(event: T): Promise<void>;
subscribe<T extends DomainEvent>(
eventType: string,
handler: EventHandler<T>
): void;
unsubscribe(eventType: string, handler: EventHandler<unknown>): void;
}
interface DomainEvent {
eventId: string;
eventType: string;
aggregateId: string;
occurredAt: Date;
version: number;
metadata: Record<string, string>;
}
interface EventHandler<T extends DomainEvent> {
handle(event: T): Promise<void>;
}
2. 도메인 이벤트
2.1 도메인 이벤트 설계 원칙
좋은 도메인 이벤트는 다음 원칙을 따릅니다.
불변성(Immutable): 이벤트는 이미 발생한 사실이므로 변경할 수 없습니다.
과거형 명명: OrderPlaced (O), PlaceOrder (X). 이벤트는 이미 일어난 일입니다.
자기 서술적(Self-describing): 이벤트만으로 무슨 일이 있었는지 이해할 수 있어야 합니다.
비즈니스 의미: 기술적 구현이 아닌 비즈니스 관점에서 명명합니다.
2.2 도메인 이벤트 구현
// 기본 도메인 이벤트
abstract class BaseDomainEvent implements DomainEvent {
readonly eventId: string;
readonly occurredAt: Date;
readonly version: number;
abstract readonly eventType: string;
abstract readonly aggregateId: string;
metadata: Record<string, string> = {};
constructor() {
this.eventId = crypto.randomUUID();
this.occurredAt = new Date();
this.version = 1;
}
}
// 주문 생성 이벤트
class OrderPlaced extends BaseDomainEvent {
readonly eventType = 'order.placed';
readonly aggregateId: string;
constructor(
public readonly orderId: string,
public readonly customerId: string,
public readonly items: OrderItem[],
public readonly totalAmount: Money,
public readonly shippingAddress: Address
) {
super();
this.aggregateId = orderId;
}
}
// 결제 완료 이벤트
class PaymentCompleted extends BaseDomainEvent {
readonly eventType = 'payment.completed';
readonly aggregateId: string;
constructor(
public readonly paymentId: string,
public readonly orderId: string,
public readonly amount: Money,
public readonly paymentMethod: string
) {
super();
this.aggregateId = paymentId;
}
}
// 재고 예약 이벤트
class InventoryReserved extends BaseDomainEvent {
readonly eventType = 'inventory.reserved';
readonly aggregateId: string;
constructor(
public readonly reservationId: string,
public readonly orderId: string,
public readonly items: ReservedItem[]
) {
super();
this.aggregateId = reservationId;
}
}
2.3 이벤트 스키마 설계
// 이벤트 스키마 (Avro-like)
const OrderPlacedSchema = {
type: 'record',
name: 'OrderPlaced',
namespace: 'com.ecommerce.orders.v1',
fields: [
{ name: 'eventId', type: 'string' },
{ name: 'eventType', type: 'string' },
{ name: 'aggregateId', type: 'string' },
{ name: 'occurredAt', type: 'string' }, // ISO 8601
{ name: 'version', type: 'int' },
{ name: 'orderId', type: 'string' },
{ name: 'customerId', type: 'string' },
{
name: 'items',
type: {
type: 'array',
items: {
type: 'record',
name: 'OrderItem',
fields: [
{ name: 'productId', type: 'string' },
{ name: 'quantity', type: 'int' },
{ name: 'unitPrice', type: 'long' },
],
},
},
},
{ name: 'totalAmount', type: 'long' },
{ name: 'currency', type: 'string' },
],
};
3. Event Sourcing
Event Sourcing은 애플리케이션의 상태를 이벤트의 시퀀스로 저장하는 패턴입니다. 현재 상태를 직접 저장하는 대신, 상태 변경의 이력을 모두 기록합니다.
3.1 기본 개념
전통적 방식 (CRUD):
┌─────────────────────────┐
│ orders 테이블 │
│ id: ORD-001 │
│ status: shipped │ ← 현재 상태만 저장
│ total: 50000 │
│ updated_at: 2025-03-24 │
└─────────────────────────┘
Event Sourcing 방식:
┌─────────────────────────────────────────┐
│ event_store │
│ 1. OrderPlaced (ORD-001, 2025-03-24) │
│ 2. PaymentReceived(ORD-001, 2025-03-24) │
│ 3. ItemPacked (ORD-001, 2025-03-24) │
│ 4. OrderShipped (ORD-001, 2025-03-24) │ ← 모든 이력 보존
└─────────────────────────────────────────┘
3.2 이벤트 스토어 구현
// 이벤트 스토어 인터페이스
interface EventStore {
// 이벤트 추가 (낙관적 동시성 제어)
append(
streamId: string,
events: DomainEvent[],
expectedVersion: number
): Promise<void>;
// 스트림에서 이벤트 읽기
readStream(
streamId: string,
fromVersion?: number
): Promise<DomainEvent[]>;
// 전체 이벤트 구독
subscribe(
handler: (event: DomainEvent) => Promise<void>
): void;
}
// PostgreSQL 기반 이벤트 스토어
class PostgresEventStore implements EventStore {
async append(
streamId: string,
events: DomainEvent[],
expectedVersion: number
): Promise<void> {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// 낙관적 동시성 검사
const result = await client.query(
'SELECT MAX(version) as current_version FROM events WHERE stream_id = $1',
[streamId]
);
const currentVersion = result.rows[0]?.current_version ?? 0;
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, but current is ${currentVersion}`
);
}
// 이벤트 저장
let version = expectedVersion;
for (const event of events) {
version++;
await client.query(
`INSERT INTO events (event_id, stream_id, event_type, data, metadata, version, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)`,
[
event.eventId,
streamId,
event.eventType,
JSON.stringify(event),
JSON.stringify(event.metadata),
version,
event.occurredAt,
]
);
}
await client.query('COMMIT');
// 이벤트 발행 (비동기)
for (const event of events) {
await this.eventBus.publish(event);
}
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
async readStream(streamId: string, fromVersion = 0): Promise<DomainEvent[]> {
const result = await this.pool.query(
'SELECT data FROM events WHERE stream_id = $1 AND version > $2 ORDER BY version ASC',
[streamId, fromVersion]
);
return result.rows.map((row) => JSON.parse(row.data));
}
}
3.3 Aggregate와 Event Sourcing
// Event Sourced Aggregate
abstract class EventSourcedAggregate {
private uncommittedEvents: DomainEvent[] = [];
private _version: number = 0;
get version(): number {
return this._version;
}
// 이벤트 적용 (상태 변경)
protected apply(event: DomainEvent): void {
this.when(event);
this.uncommittedEvents.push(event);
this._version++;
}
// 이벤트 핸들러 (하위 클래스에서 구현)
protected abstract when(event: DomainEvent): void;
// 이벤트로부터 상태 복원
loadFromHistory(events: DomainEvent[]): void {
for (const event of events) {
this.when(event);
this._version++;
}
}
getUncommittedEvents(): DomainEvent[] {
return [...this.uncommittedEvents];
}
clearUncommittedEvents(): void {
this.uncommittedEvents = [];
}
}
// 주문 Aggregate
class Order extends EventSourcedAggregate {
private id: string = '';
private customerId: string = '';
private items: OrderItem[] = [];
private status: OrderStatus = OrderStatus.DRAFT;
private totalAmount: number = 0;
// 명령: 주문 생성
static create(
orderId: string,
customerId: string,
items: OrderItem[]
): Order {
const order = new Order();
const totalAmount = items.reduce(
(sum, item) => sum + item.unitPrice * item.quantity, 0
);
order.apply(new OrderPlaced(orderId, customerId, items, totalAmount));
return order;
}
// 명령: 결제 확인
confirmPayment(paymentId: string, amount: number): void {
if (this.status !== OrderStatus.PLACED) {
throw new Error('Order is not in placed status');
}
this.apply(new OrderPaymentConfirmed(this.id, paymentId, amount));
}
// 명령: 주문 취소
cancel(reason: string): void {
if (this.status === OrderStatus.SHIPPED) {
throw new Error('Cannot cancel shipped order');
}
this.apply(new OrderCancelled(this.id, reason));
}
// 이벤트 핸들러
protected when(event: DomainEvent): void {
if (event instanceof OrderPlaced) {
this.id = event.orderId;
this.customerId = event.customerId;
this.items = event.items;
this.totalAmount = event.totalAmount;
this.status = OrderStatus.PLACED;
} else if (event instanceof OrderPaymentConfirmed) {
this.status = OrderStatus.CONFIRMED;
} else if (event instanceof OrderCancelled) {
this.status = OrderStatus.CANCELLED;
}
}
}
3.4 스냅샷
이벤트가 많이 쌓이면 Aggregate 복원이 느려집니다. 스냅샷으로 이를 최적화합니다.
class SnapshotStore {
async saveSnapshot(
streamId: string,
snapshot: AggregateSnapshot
): Promise<void> {
await this.pool.query(
`INSERT INTO snapshots (stream_id, version, data, created_at)
VALUES ($1, $2, $3, NOW())
ON CONFLICT (stream_id) DO UPDATE SET version = $2, data = $3, created_at = NOW()`,
[streamId, snapshot.version, JSON.stringify(snapshot.state)]
);
}
async getLatestSnapshot(streamId: string): Promise<AggregateSnapshot | null> {
const result = await this.pool.query(
'SELECT version, data FROM snapshots WHERE stream_id = $1',
[streamId]
);
if (result.rows.length === 0) return null;
return {
version: result.rows[0].version,
state: JSON.parse(result.rows[0].data),
};
}
}
// 스냅샷을 활용한 Aggregate 로딩
class OrderRepository {
async getById(orderId: string): Promise<Order> {
const order = new Order();
// 1. 스냅샷 로드
const snapshot = await this.snapshotStore.getLatestSnapshot(orderId);
let fromVersion = 0;
if (snapshot) {
order.restoreFromSnapshot(snapshot.state);
fromVersion = snapshot.version;
}
// 2. 스냅샷 이후의 이벤트만 로드
const events = await this.eventStore.readStream(orderId, fromVersion);
order.loadFromHistory(events);
// 3. 일정 이벤트 수 이후 스냅샷 생성
if (events.length > 50) {
await this.snapshotStore.saveSnapshot(orderId, {
version: order.version,
state: order.toSnapshot(),
});
}
return order;
}
}
3.5 프로젝션 (Read Model)
// 주문 조회 프로젝션
class OrderProjection {
async handle(event: DomainEvent): Promise<void> {
if (event instanceof OrderPlaced) {
await this.db.query(
`INSERT INTO order_read_model (id, customer_id, status, total_amount, created_at)
VALUES ($1, $2, $3, $4, $5)`,
[event.orderId, event.customerId, 'placed', event.totalAmount, event.occurredAt]
);
// 주문 아이템 프로젝션
for (const item of event.items) {
await this.db.query(
`INSERT INTO order_items_read_model (order_id, product_id, quantity, unit_price)
VALUES ($1, $2, $3, $4)`,
[event.orderId, item.productId, item.quantity, item.unitPrice]
);
}
} else if (event instanceof OrderPaymentConfirmed) {
await this.db.query(
'UPDATE order_read_model SET status = $1, payment_id = $2, updated_at = $3 WHERE id = $4',
['confirmed', event.paymentId, event.occurredAt, event.orderId]
);
} else if (event instanceof OrderShipped) {
await this.db.query(
'UPDATE order_read_model SET status = $1, tracking_number = $2, shipped_at = $3 WHERE id = $4',
['shipped', event.trackingNumber, event.occurredAt, event.orderId]
);
}
}
}
4. CQRS (Command Query Responsibility Segregation)
CQRS는 쓰기(Command)와 읽기(Query) 모델을 분리하는 패턴입니다.
4.1 기본 구조
┌──────────────┐ Command ┌──────────────┐
│ Client │ ───────────────► │ Command Side │
│ │ │ (Write) │
│ │ Query │ │
│ │ ───────────────► │ │
└──────────────┘ └──────┬───────┘
│
Events│
▼
┌──────────────┐
│ Event Bus │
└──────┬───────┘
│
▼
┌──────────────┐ Query ┌──────────────┐
│ Client │ ◄─────────────── │ Query Side │
│ │ │ (Read) │
└──────────────┘ └──────────────┘
4.2 Command Side 구현
// Command 정의
interface Command {
commandId: string;
timestamp: Date;
}
class PlaceOrderCommand implements Command {
commandId: string;
timestamp: Date;
constructor(
public readonly customerId: string,
public readonly items: OrderItemDto[],
public readonly shippingAddress: AddressDto
) {
this.commandId = crypto.randomUUID();
this.timestamp = new Date();
}
}
// Command Handler
class PlaceOrderHandler {
constructor(
private orderRepository: OrderRepository,
private inventoryService: InventoryService,
private pricingService: PricingService
) {}
async handle(command: PlaceOrderCommand): Promise<string> {
// 1. 재고 확인
await this.inventoryService.checkAvailability(command.items);
// 2. 가격 계산
const pricedItems = await this.pricingService.calculatePrices(command.items);
// 3. 주문 Aggregate 생성
const orderId = crypto.randomUUID();
const order = Order.create(orderId, command.customerId, pricedItems);
// 4. 이벤트 스토어에 저장
await this.orderRepository.save(order);
return orderId;
}
}
// Command Bus
class CommandBus {
private handlers: Map<string, CommandHandler<unknown>> = new Map();
register<T extends Command>(
commandType: string,
handler: CommandHandler<T>
): void {
this.handlers.set(commandType, handler);
}
async dispatch<T extends Command>(command: T): Promise<unknown> {
const handler = this.handlers.get(command.constructor.name);
if (!handler) {
throw new Error(`No handler for ${command.constructor.name}`);
}
return handler.handle(command);
}
}
4.3 Query Side 구현
// Query 정의
class GetOrderDetailsQuery {
constructor(public readonly orderId: string) {}
}
class ListCustomerOrdersQuery {
constructor(
public readonly customerId: string,
public readonly page: number = 1,
public readonly pageSize: number = 20
) {}
}
// Query Handler
class GetOrderDetailsHandler {
constructor(private readDb: Pool) {}
async handle(query: GetOrderDetailsQuery): Promise<OrderDetailsDto> {
const result = await this.readDb.query(
`SELECT o.*, json_agg(i.*) as items
FROM order_read_model o
JOIN order_items_read_model i ON o.id = i.order_id
WHERE o.id = $1
GROUP BY o.id`,
[query.orderId]
);
if (result.rows.length === 0) {
throw new NotFoundError(`Order ${query.orderId} not found`);
}
return this.mapToDto(result.rows[0]);
}
}
// 최종 일관성 (Eventual Consistency)
// 읽기 모델은 이벤트 처리 후 업데이트되므로 약간의 지연이 있을 수 있음
class OrderQueryService {
async getOrderWithConsistencyCheck(
orderId: string,
expectedVersion?: number
): Promise<OrderDetailsDto> {
const order = await this.getOrderDetails(orderId);
// 특정 버전이 요구되면 확인
if (expectedVersion && order.version < expectedVersion) {
// 프로젝션이 아직 따라잡지 못한 경우
// 잠시 대기 후 재시도 또는 Command 측에서 직접 조회
await this.waitForProjection(orderId, expectedVersion);
return this.getOrderDetails(orderId);
}
return order;
}
}
5. Saga 패턴
Saga 패턴은 분산 트랜잭션을 로컬 트랜잭션의 시퀀스로 관리합니다.
5.1 Choreography vs Orchestration
Choreography (안무 방식):
서비스들이 이벤트를 통해 자율적으로 협력
OrderService ──OrderPlaced──► PaymentService
│
PaymentCompleted
│
▼
InventoryService
│
InventoryReserved
│
▼
ShippingService
Orchestration (오케스트레이션 방식):
중앙 오케스트레이터가 전체 흐름을 조율
┌──────────────────┐
│ OrderSaga │
│ (Orchestrator) │
└──────┬───────────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
PaymentService InventoryService ShippingService
5.2 Choreography Saga 구현
// 주문 서비스: 이벤트 발행
class OrderService {
async placeOrder(command: PlaceOrderCommand): Promise<string> {
const order = Order.create(command);
await this.orderRepository.save(order);
// OrderPlaced 이벤트가 자동 발행됨
return order.id;
}
}
// 결제 서비스: OrderPlaced 이벤트 처리
class PaymentEventHandler {
@EventHandler('order.placed')
async onOrderPlaced(event: OrderPlaced): Promise<void> {
try {
const payment = await this.paymentService.processPayment(
event.orderId,
event.totalAmount
);
// 성공 이벤트 발행
await this.eventBus.publish(new PaymentCompleted(
payment.id, event.orderId, event.totalAmount
));
} catch (error) {
// 실패 이벤트 발행 (보상 트랜잭션 트리거)
await this.eventBus.publish(new PaymentFailed(
event.orderId, error.message
));
}
}
}
// 재고 서비스: PaymentCompleted 이벤트 처리
class InventoryEventHandler {
@EventHandler('payment.completed')
async onPaymentCompleted(event: PaymentCompleted): Promise<void> {
try {
const reservation = await this.inventoryService.reserveItems(
event.orderId
);
await this.eventBus.publish(new InventoryReserved(
reservation.id, event.orderId, reservation.items
));
} catch (error) {
// 보상: 결제 환불 트리거
await this.eventBus.publish(new InventoryReservationFailed(
event.orderId, error.message
));
}
}
}
// 주문 서비스: 보상 트랜잭션 처리
class OrderCompensationHandler {
@EventHandler('payment.failed')
async onPaymentFailed(event: PaymentFailed): Promise<void> {
await this.orderService.cancelOrder(event.orderId, 'Payment failed');
}
@EventHandler('inventory.reservation.failed')
async onInventoryFailed(event: InventoryReservationFailed): Promise<void> {
// 결제 환불 요청
await this.paymentService.refund(event.orderId);
await this.orderService.cancelOrder(event.orderId, 'Out of stock');
}
}
5.3 Orchestration Saga 구현
// Saga 정의
class OrderSaga {
private state: SagaState = SagaState.STARTED;
private orderId: string;
private paymentId?: string;
private reservationId?: string;
constructor(
private commandBus: CommandBus,
private sagaStore: SagaStore
) {}
async start(orderId: string): Promise<void> {
this.orderId = orderId;
this.state = SagaState.PAYMENT_PENDING;
await this.sagaStore.save(this);
// Step 1: 결제 요청
await this.commandBus.dispatch(new ProcessPaymentCommand(orderId));
}
async handlePaymentCompleted(event: PaymentCompleted): Promise<void> {
this.paymentId = event.paymentId;
this.state = SagaState.INVENTORY_PENDING;
await this.sagaStore.save(this);
// Step 2: 재고 예약 요청
await this.commandBus.dispatch(new ReserveInventoryCommand(
this.orderId, event.paymentId
));
}
async handlePaymentFailed(event: PaymentFailed): Promise<void> {
this.state = SagaState.COMPENSATING;
await this.sagaStore.save(this);
// 보상: 주문 취소
await this.commandBus.dispatch(new CancelOrderCommand(
this.orderId, 'Payment failed'
));
this.state = SagaState.COMPENSATED;
await this.sagaStore.save(this);
}
async handleInventoryReserved(event: InventoryReserved): Promise<void> {
this.reservationId = event.reservationId;
this.state = SagaState.SHIPPING_PENDING;
await this.sagaStore.save(this);
// Step 3: 배송 요청
await this.commandBus.dispatch(new CreateShipmentCommand(
this.orderId, this.reservationId
));
}
async handleInventoryFailed(event: InventoryReservationFailed): Promise<void> {
this.state = SagaState.COMPENSATING;
await this.sagaStore.save(this);
// 보상: 결제 환불 후 주문 취소
await this.commandBus.dispatch(new RefundPaymentCommand(
this.orderId, this.paymentId!
));
await this.commandBus.dispatch(new CancelOrderCommand(
this.orderId, 'Inventory unavailable'
));
this.state = SagaState.COMPENSATED;
await this.sagaStore.save(this);
}
async handleShipmentCreated(event: ShipmentCreated): Promise<void> {
this.state = SagaState.COMPLETED;
await this.sagaStore.save(this);
// 주문 확정
await this.commandBus.dispatch(new ConfirmOrderCommand(this.orderId));
}
}
6. Outbox 패턴
6.1 이중 쓰기 문제
데이터베이스 업데이트와 메시지 발행을 원자적으로 수행해야 합니다. Outbox 패턴은 이 문제를 해결합니다.
문제 시나리오:
1. DB에 주문 저장 ✓
2. Kafka에 이벤트 발행 ✗ ← 실패하면 데이터 불일치!
Outbox 패턴:
1. DB 트랜잭션 내에서:
- 주문 저장 ✓
- outbox 테이블에 이벤트 저장 ✓
2. 별도 프로세스가 outbox에서 읽어 Kafka에 발행
6.2 Outbox 테이블 구현
-- Outbox 테이블
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
published_at TIMESTAMP NULL,
retry_count INT DEFAULT 0,
last_error TEXT NULL
);
CREATE INDEX idx_outbox_unpublished ON outbox (created_at)
WHERE published_at IS NULL;
// Outbox 패턴 구현
class OrderService {
async placeOrder(command: PlaceOrderCommand): Promise<string> {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// 1. 주문 저장
const orderId = crypto.randomUUID();
await client.query(
'INSERT INTO orders (id, customer_id, status, total_amount) VALUES ($1, $2, $3, $4)',
[orderId, command.customerId, 'placed', command.totalAmount]
);
// 2. Outbox에 이벤트 저장 (같은 트랜잭션)
const event = new OrderPlaced(orderId, command.customerId, command.items, command.totalAmount);
await client.query(
`INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES ($1, $2, $3, $4)`,
['Order', orderId, 'order.placed', JSON.stringify(event)]
);
await client.query('COMMIT');
return orderId;
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
}
6.3 CDC with Debezium
# Debezium Connector 설정
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: outbox-connector
spec:
class: io.debezium.connector.postgresql.PostgresConnector
tasksMax: 1
config:
database.hostname: postgres
database.port: 5432
database.user: debezium
database.password: secret
database.dbname: orders
table.include.list: public.outbox
transforms: outbox
transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.field.event.id: id
transforms.outbox.table.field.event.key: aggregate_id
transforms.outbox.table.field.event.type: event_type
transforms.outbox.table.field.event.payload: payload
transforms.outbox.route.by.field: aggregate_type
transforms.outbox.route.topic.replacement: events.orders
6.4 Polling Publisher
Debezium 없이 간단한 폴링 방식으로도 구현할 수 있습니다.
class OutboxPollingPublisher {
private isRunning = false;
async start(): Promise<void> {
this.isRunning = true;
while (this.isRunning) {
try {
await this.publishPendingEvents();
} catch (error) {
console.error('Polling error:', error);
}
await this.sleep(1000); // 1초 간격
}
}
private async publishPendingEvents(): Promise<void> {
const client = await this.pool.connect();
try {
// 미발행 이벤트 조회 (잠금)
const result = await client.query(
`SELECT * FROM outbox
WHERE published_at IS NULL AND retry_count < 5
ORDER BY created_at ASC
LIMIT 100
FOR UPDATE SKIP LOCKED`
);
for (const row of result.rows) {
try {
// Kafka에 발행
await this.kafka.produce(
`events.${row.aggregate_type.toLowerCase()}`,
row.aggregate_id,
row.payload
);
// 발행 완료 표시
await client.query(
'UPDATE outbox SET published_at = NOW() WHERE id = $1',
[row.id]
);
} catch (error) {
// 실패 카운트 증가
await client.query(
'UPDATE outbox SET retry_count = retry_count + 1, last_error = $1 WHERE id = $2',
[error.message, row.id]
);
}
}
} finally {
client.release();
}
}
}
7. 멱등성 (Idempotency)
7.1 왜 멱등성이 중요한가
이벤트 드리븐 시스템에서는 메시지가 중복 전달될 수 있습니다 (at-least-once delivery). 같은 이벤트를 여러 번 처리해도 결과가 같도록 멱등성을 보장해야 합니다.
7.2 멱등성 키 구현
class IdempotencyStore {
async isProcessed(idempotencyKey: string): Promise<boolean> {
const result = await this.pool.query(
'SELECT 1 FROM processed_events WHERE idempotency_key = $1',
[idempotencyKey]
);
return result.rows.length > 0;
}
async markProcessed(
idempotencyKey: string,
result: unknown,
client: PoolClient
): Promise<void> {
await client.query(
`INSERT INTO processed_events (idempotency_key, result, processed_at)
VALUES ($1, $2, NOW())
ON CONFLICT (idempotency_key) DO NOTHING`,
[idempotencyKey, JSON.stringify(result)]
);
}
}
// 멱등성 보장 이벤트 핸들러
class IdempotentEventHandler<T extends DomainEvent> {
constructor(
private idempotencyStore: IdempotencyStore,
private innerHandler: EventHandler<T>
) {}
async handle(event: T): Promise<void> {
const key = `${event.eventType}:${event.eventId}`;
// 이미 처리된 이벤트인지 확인
if (await this.idempotencyStore.isProcessed(key)) {
console.log(`Event ${key} already processed, skipping`);
return;
}
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// 이벤트 처리
await this.innerHandler.handle(event);
// 처리 완료 표시 (같은 트랜잭션)
await this.idempotencyStore.markProcessed(key, null, client);
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
}
7.3 중복 제거 (Deduplication)
// Kafka Consumer 중복 제거
class DeduplicatingConsumer {
private recentEventIds: Set<string> = new Set();
private maxCacheSize = 10000;
async processMessage(message: KafkaMessage): Promise<void> {
const event = JSON.parse(message.value.toString());
const eventId = event.eventId;
// 메모리 캐시에서 중복 체크 (빠른 경로)
if (this.recentEventIds.has(eventId)) {
return; // 중복 - 스킵
}
// DB에서 중복 체크 (느린 경로)
if (await this.idempotencyStore.isProcessed(eventId)) {
this.recentEventIds.add(eventId);
return; // 중복 - 스킵
}
// 이벤트 처리
await this.handler.handle(event);
// 캐시 갱신
this.recentEventIds.add(eventId);
if (this.recentEventIds.size > this.maxCacheSize) {
const firstKey = this.recentEventIds.values().next().value;
this.recentEventIds.delete(firstKey);
}
}
}
8. Dead Letter Queue (DLQ)
8.1 DLQ 패턴
처리할 수 없는 메시지를 별도 큐에 보관하여 메인 처리 흐름을 막지 않습니다.
class EventConsumerWithDLQ {
private maxRetries = 3;
async processEvent(event: DomainEvent, retryCount = 0): Promise<void> {
try {
await this.handler.handle(event);
} catch (error) {
if (retryCount < this.maxRetries) {
// 재시도 (지수 백오프)
const delay = Math.pow(2, retryCount) * 1000;
await this.sleep(delay);
return this.processEvent(event, retryCount + 1);
}
// 최대 재시도 초과: DLQ로 이동
await this.sendToDLQ(event, error);
}
}
private async sendToDLQ(event: DomainEvent, error: Error): Promise<void> {
await this.kafka.produce('events.dlq', event.aggregateId, {
originalEvent: event,
error: {
message: error.message,
stack: error.stack,
},
failedAt: new Date().toISOString(),
retryCount: this.maxRetries,
});
// 알림 발송
await this.alertService.notify(
`Event processing failed after ${this.maxRetries} retries`,
{ eventId: event.eventId, eventType: event.eventType, error: error.message }
);
}
}
9. 이벤트 스키마 진화
9.1 Avro를 이용한 스키마 관리
{
"type": "record",
"name": "OrderPlaced",
"namespace": "com.ecommerce.orders",
"fields": [
{"name": "eventId", "type": "string"},
{"name": "orderId", "type": "string"},
{"name": "customerId", "type": "string"},
{"name": "totalAmount", "type": "long"},
{"name": "currency", "type": "string", "default": "KRW"},
{"name": "couponCode", "type": ["null", "string"], "default": null}
]
}
9.2 스키마 호환성 규칙
| 호환성 타입 | 허용 변경 | 사용 사례 |
|---|---|---|
| BACKWARD | 새 스키마로 이전 데이터 읽기 | 컨슈머 먼저 업그레이드 |
| FORWARD | 이전 스키마로 새 데이터 읽기 | 프로듀서 먼저 업그레이드 |
| FULL | 양방향 호환 | 가장 안전, 권장 |
10. Kafka + TypeScript 실전 구현
10.1 Kafka Producer
import { Kafka, Partitioners } from 'kafkajs';
const kafka = new Kafka({
clientId: 'order-service',
brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
retry: { retries: 5 },
});
const producer = kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner,
idempotent: true,
maxInFlightRequests: 5,
transactionalId: 'order-producer-1',
});
async function publishOrderEvent(event: OrderPlaced): Promise<void> {
await producer.send({
topic: 'events.orders',
messages: [
{
key: event.orderId,
value: JSON.stringify(event),
headers: {
'event-type': event.eventType,
'event-id': event.eventId,
'correlation-id': event.metadata.correlationId || '',
},
},
],
});
}
10.2 Kafka Consumer
const consumer = kafka.consumer({
groupId: 'payment-service-group',
sessionTimeout: 30000,
heartbeatInterval: 3000,
});
async function startConsumer(): Promise<void> {
await consumer.connect();
await consumer.subscribe({
topics: ['events.orders'],
fromBeginning: false,
});
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value!.toString());
const eventType = message.headers?.['event-type']?.toString();
console.log(`Received: ${eventType} from ${topic}[${partition}]`);
try {
switch (eventType) {
case 'order.placed':
await paymentHandler.onOrderPlaced(event);
break;
case 'order.cancelled':
await paymentHandler.onOrderCancelled(event);
break;
default:
console.log(`Unknown event type: ${eventType}`);
}
} catch (error) {
console.error(`Failed to process event: ${error}`);
// DLQ 처리 또는 재시도
}
},
});
}
11. 이벤트 드리븐 시스템 테스트
11.1 단위 테스트
describe('Order Aggregate', () => {
it('should emit OrderPlaced event when creating order', () => {
const order = Order.create('ord-1', 'cust-1', [
{ productId: 'prod-1', quantity: 2, unitPrice: 10000 },
]);
const events = order.getUncommittedEvents();
expect(events).toHaveLength(1);
expect(events[0]).toBeInstanceOf(OrderPlaced);
expect((events[0] as OrderPlaced).totalAmount).toBe(20000);
});
it('should not allow cancellation of shipped order', () => {
const order = buildOrderInStatus(OrderStatus.SHIPPED);
expect(() => order.cancel('Changed mind'))
.toThrow('Cannot cancel shipped order');
});
});
11.2 통합 테스트
describe('Order Saga Integration', () => {
let kafkaContainer: StartedTestContainer;
let postgresContainer: StartedTestContainer;
beforeAll(async () => {
kafkaContainer = await new GenericContainer('confluentinc/cp-kafka:7.5.0')
.withExposedPorts(9092)
.start();
postgresContainer = await new GenericContainer('postgres:16')
.withExposedPorts(5432)
.start();
});
it('should complete order saga successfully', async () => {
// 1. 주문 생성
const orderId = await orderService.placeOrder({
customerId: 'cust-1',
items: [{ productId: 'prod-1', quantity: 1, unitPrice: 10000 }],
});
// 2. 이벤트 전파 대기
await waitForEvent('payment.completed', orderId, 5000);
await waitForEvent('inventory.reserved', orderId, 5000);
// 3. 결과 확인
const order = await orderQueryService.getOrderDetails(orderId);
expect(order.status).toBe('confirmed');
});
});
12. 안티패턴
12.1 이벤트 폭발 (Event Explosion)
너무 세분화된 이벤트는 시스템 복잡성을 증가시킵니다. 비즈니스 의미 단위로 이벤트를 설계하세요.
12.2 이벤트를 통한 강한 결합
이벤트에 너무 많은 데이터를 포함하면 소비자가 프로듀서의 내부 구조에 의존하게 됩니다. 이벤트는 공개 계약이므로 신중하게 설계해야 합니다.
12.3 동기적 이벤트 처리
이벤트 핸들러에서 다른 서비스를 동기적으로 호출하면 이벤트 드리븐의 장점이 사라집니다. 비동기 처리를 유지하세요.
12.4 멱등성 무시
"at-least-once" 전달 보장을 고려하지 않으면 중복 처리로 데이터 불일치가 발생합니다.
13. 퀴즈
Q1: Event Sourcing에서 스냅샷이 필요한 이유는 무엇인가요?
정답: 이벤트 수가 많아질수록 Aggregate 복원 시간이 길어지기 때문
Event Sourcing에서 Aggregate의 현재 상태를 얻으려면 모든 이벤트를 처음부터 재생해야 합니다. 이벤트가 수천, 수만 개가 되면 복원 시간이 급격히 증가합니다. 스냅샷은 특정 시점의 Aggregate 상태를 저장하여, 해당 시점 이후의 이벤트만 재생하면 되도록 최적화합니다.
Q2: Choreography Saga와 Orchestration Saga의 장단점은 무엇인가요?
정답:
Choreography: 장점 - 단순한 워크플로우에 적합, 서비스 간 결합도가 낮음, 단일 실패점 없음. 단점 - 복잡한 워크플로우에서 추적이 어려움, 사이클 의존성 발생 가능, 테스트가 어려움.
Orchestration: 장점 - 복잡한 워크플로우에 적합, 전체 흐름을 한곳에서 파악 가능, 테스트 용이. 단점 - 오케스트레이터가 단일 실패점, 오케스트레이터에 로직 집중 위험.
Q3: Outbox 패턴이 해결하는 "이중 쓰기 문제"란 무엇인가요?
정답: 데이터베이스와 메시지 브로커에 동시에 쓸 때 원자성을 보장할 수 없는 문제
DB에 데이터를 저장하고 Kafka에 이벤트를 발행하는 두 작업은 서로 다른 시스템이므로 하나의 트랜잭션으로 묶을 수 없습니다. DB 저장은 성공했지만 Kafka 발행이 실패하면 데이터 불일치가 발생합니다. Outbox 패턴은 이벤트를 같은 DB 트랜잭션 내의 outbox 테이블에 저장하고, 별도 프로세스가 이를 읽어 브로커에 발행함으로써 원자성을 보장합니다.
Q4: CQRS에서 "최종 일관성(Eventual Consistency)"의 의미와 대처 방법은?
정답: 쓰기 모델의 변경이 읽기 모델에 즉시 반영되지 않는 것
CQRS에서 Command 측의 변경사항은 이벤트를 통해 Query 측에 비동기적으로 전파됩니다. 따라서 쓰기 직후 읽기를 하면 이전 상태가 보일 수 있습니다. 대처 방법으로는 UI에서 낙관적 업데이트를 적용하거나, 특정 작업 후에는 Command 측에서 직접 조회하거나, 버전 기반 조건부 조회를 사용하는 방법이 있습니다.
Q5: 멱등성이 이벤트 드리븐 시스템에서 중요한 이유는?
정답: "at-least-once" 전달 보장으로 인해 동일 이벤트가 중복 전달될 수 있기 때문
대부분의 메시지 브로커는 "at-least-once" 전달을 보장합니다. 네트워크 문제, 소비자 재시작 등으로 같은 이벤트가 여러 번 전달될 수 있습니다. 멱등성이 보장되지 않으면 같은 결제가 두 번 처리되거나, 재고가 중복 차감되는 등의 문제가 발생합니다. 멱등성 키, 중복 제거, 조건부 업데이트 등으로 이를 방지합니다.
14. 참고 자료
- Martin Fowler - Event Sourcing - 이벤트 소싱 개념 정리
- Microsoft - CQRS Pattern - CQRS 패턴 가이드
- Microservices.io - Saga Pattern - Saga 패턴 상세 설명
- Debezium Documentation - CDC 도구 공식 문서
- KafkaJS Documentation - Node.js Kafka 클라이언트
- Apache Kafka Documentation - Kafka 공식 문서
- Confluent Schema Registry - 스키마 레지스트리
- EventStoreDB - 전용 이벤트 스토어 데이터베이스
- Axon Framework - Java CQRS/ES 프레임워크
- NestJS CQRS - NestJS CQRS 레시피
- Domain-Driven Design Reference - DDD 레퍼런스
- Microservices Patterns by Chris Richardson - 마이크로서비스 패턴 카탈로그
- RabbitMQ Documentation - RabbitMQ 공식 문서