Event-Driven Architecture Patterns Guide 2025: CQRS, Event Sourcing, Saga Pattern
Table of Contents
1. Event-Driven Architecture Fundamentals
Event-Driven Architecture (EDA) is a software architecture pattern in which system components communicate through events. Instead of calling each other directly, services publish and subscribe to events, achieving loose coupling and high scalability.
1.1 Core Concepts
Event: A meaningful fact that occurred in the system, named in the past tense. Examples: OrderPlaced, PaymentCompleted, InventoryReserved.
Event Producer: A component that creates and publishes events.
Event Consumer: A component that subscribes to and processes events.
Event Broker: Infrastructure that delivers events between producers and consumers. Examples include Kafka, RabbitMQ, and AWS EventBridge.
1.2 Event Types
Event Types
├── Domain Event
│ └── A fact that occurred in the business domain
│ Examples: OrderPlaced, PaymentFailed
├── Integration Event
│ └── Events for communication across bounded contexts
│ Example: OrderConfirmedIntegrationEvent
├── Event Notification
│ └── Carries minimal data; consumers query for details when needed
│ Example: OrderStatusChanged (contains only the orderId)
└── Event-Carried State Transfer
└── Carries the full state; consumers need no additional query
Example: OrderDetails (includes the full order data)
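The last two styles differ mainly in payload size. A minimal sketch of both payloads (the field names are illustrative, not taken from any specific system):

```typescript
// Event Notification: just enough data to know something happened.
// Consumers call back to the owning service for details.
interface OrderStatusChanged {
  eventType: 'order.status.changed';
  orderId: string; // consumer queries the rest by id
}

// Event-Carried State Transfer: the full state travels with the event,
// so consumers can update their local copy without a callback.
interface OrderDetails {
  eventType: 'order.details';
  orderId: string;
  customerId: string;
  status: string;
  items: { productId: string; quantity: number; unitPrice: number }[];
  totalAmount: number;
}

const notification: OrderStatusChanged = {
  eventType: 'order.status.changed',
  orderId: 'ORD-001',
};

const stateTransfer: OrderDetails = {
  eventType: 'order.details',
  orderId: 'ORD-001',
  customerId: 'CUST-1',
  status: 'placed',
  items: [{ productId: 'PROD-1', quantity: 2, unitPrice: 10000 }],
  totalAmount: 20000,
};
```

The trade-off: notifications keep events small but add read traffic back to the producer; state transfer removes the callback but widens the public contract.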
1.3 Pub/Sub Pattern
// TypeScript - event bus interface
interface EventBus {
publish<T extends DomainEvent>(event: T): Promise<void>;
subscribe<T extends DomainEvent>(
eventType: string,
handler: EventHandler<T>
): void;
unsubscribe(eventType: string, handler: EventHandler<unknown>): void;
}
interface DomainEvent {
eventId: string;
eventType: string;
aggregateId: string;
occurredAt: Date;
version: number;
metadata: Record<string, string>;
}
interface EventHandler<T extends DomainEvent> {
handle(event: T): Promise<void>;
}
2. Domain Events
2.1 Domain Event Design Principles
Good domain events follow these principles.
Immutable: An event is a fact that has already happened, so it can never change.
Past-tense naming: OrderPlaced (O), PlaceOrder (X). Events describe what already happened.
Self-describing: The event alone should convey what happened.
Business meaning: Name events from the business perspective, not after technical implementation details.
2.2 Domain Event Implementation
// Base domain event
abstract class BaseDomainEvent implements DomainEvent {
readonly eventId: string;
readonly occurredAt: Date;
readonly version: number;
abstract readonly eventType: string;
abstract readonly aggregateId: string;
metadata: Record<string, string> = {};
constructor() {
this.eventId = crypto.randomUUID();
this.occurredAt = new Date();
this.version = 1;
}
}
// Order placed event
class OrderPlaced extends BaseDomainEvent {
readonly eventType = 'order.placed';
readonly aggregateId: string;
constructor(
public readonly orderId: string,
public readonly customerId: string,
public readonly items: OrderItem[],
public readonly totalAmount: Money,
public readonly shippingAddress?: Address
) {
super();
this.aggregateId = orderId;
}
}
// Payment completed event
class PaymentCompleted extends BaseDomainEvent {
readonly eventType = 'payment.completed';
readonly aggregateId: string;
constructor(
public readonly paymentId: string,
public readonly orderId: string,
public readonly amount: Money,
public readonly paymentMethod: string
) {
super();
this.aggregateId = paymentId;
}
}
// Inventory reserved event
class InventoryReserved extends BaseDomainEvent {
readonly eventType = 'inventory.reserved';
readonly aggregateId: string;
constructor(
public readonly reservationId: string,
public readonly orderId: string,
public readonly items: ReservedItem[]
) {
super();
this.aggregateId = reservationId;
}
}
2.3 Event Schema Design
// Event schema (Avro-style)
const OrderPlacedSchema = {
type: 'record',
name: 'OrderPlaced',
namespace: 'com.ecommerce.orders.v1',
fields: [
{ name: 'eventId', type: 'string' },
{ name: 'eventType', type: 'string' },
{ name: 'aggregateId', type: 'string' },
{ name: 'occurredAt', type: 'string' }, // ISO 8601
{ name: 'version', type: 'int' },
{ name: 'orderId', type: 'string' },
{ name: 'customerId', type: 'string' },
{
name: 'items',
type: {
type: 'array',
items: {
type: 'record',
name: 'OrderItem',
fields: [
{ name: 'productId', type: 'string' },
{ name: 'quantity', type: 'int' },
{ name: 'unitPrice', type: 'long' },
],
},
},
},
{ name: 'totalAmount', type: 'long' },
{ name: 'currency', type: 'string' },
],
};
3. Event Sourcing
Event Sourcing stores application state as a sequence of events. Instead of persisting the current state directly, it records the full history of state changes.
3.1 Basic Concept
Traditional approach (CRUD):
┌─────────────────────────┐
│ orders table │
│ id: ORD-001 │
│ status: shipped │ ← only the current state is stored
│ total: 50000 │
│ updated_at: 2025-03-24 │
└─────────────────────────┘
Event Sourcing approach:
┌─────────────────────────────────────────┐
│ event_store │
│ 1. OrderPlaced (ORD-001, 2025-03-24) │
│ 2. PaymentReceived(ORD-001, 2025-03-24) │
│ 3. ItemPacked (ORD-001, 2025-03-24) │
│ 4. OrderShipped (ORD-001, 2025-03-24) │ ← full history preserved
└─────────────────────────────────────────┘
3.2 Event Store Implementation
// Event store interface
interface EventStore {
// Append events (optimistic concurrency control)
append(
streamId: string,
events: DomainEvent[],
expectedVersion: number
): Promise<void>;
// Read events from a stream
readStream(
streamId: string,
fromVersion?: number
): Promise<DomainEvent[]>;
// Subscribe to all events
subscribe(
handler: (event: DomainEvent) => Promise<void>
): void;
}
// PostgreSQL-backed event store
class PostgresEventStore implements EventStore {
constructor(private pool: Pool, private eventBus: EventBus) {}
async append(
streamId: string,
events: DomainEvent[],
expectedVersion: number
): Promise<void> {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// Optimistic concurrency check
const result = await client.query(
'SELECT MAX(version) as current_version FROM events WHERE stream_id = $1',
[streamId]
);
const currentVersion = result.rows[0]?.current_version ?? 0;
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, but current is ${currentVersion}`
);
}
// Persist the events
let version = expectedVersion;
for (const event of events) {
version++;
await client.query(
`INSERT INTO events (event_id, stream_id, event_type, data, metadata, version, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)`,
[
event.eventId,
streamId,
event.eventType,
JSON.stringify(event),
JSON.stringify(event.metadata),
version,
event.occurredAt,
]
);
}
await client.query('COMMIT');
// Publish the events (after the commit)
for (const event of events) {
await this.eventBus.publish(event);
}
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
async readStream(streamId: string, fromVersion = 0): Promise<DomainEvent[]> {
const result = await this.pool.query(
'SELECT data FROM events WHERE stream_id = $1 AND version > $2 ORDER BY version ASC',
[streamId, fromVersion]
);
return result.rows.map((row) => JSON.parse(row.data));
}
}
3.3 Aggregates and Event Sourcing
// Event-sourced aggregate base class
abstract class EventSourcedAggregate {
private uncommittedEvents: DomainEvent[] = [];
private _version: number = 0;
get version(): number {
return this._version;
}
// Apply an event (mutates state)
protected apply(event: DomainEvent): void {
this.when(event);
this.uncommittedEvents.push(event);
this._version++;
}
// Event handler (implemented by subclasses)
protected abstract when(event: DomainEvent): void;
// Rebuild state from history
loadFromHistory(events: DomainEvent[]): void {
for (const event of events) {
this.when(event);
this._version++;
}
}
getUncommittedEvents(): DomainEvent[] {
return [...this.uncommittedEvents];
}
clearUncommittedEvents(): void {
this.uncommittedEvents = [];
}
}
// Order aggregate
class Order extends EventSourcedAggregate {
private id: string = '';
private customerId: string = '';
private items: OrderItem[] = [];
private status: OrderStatus = OrderStatus.DRAFT;
private totalAmount: number = 0;
// Command: place order
static create(
orderId: string,
customerId: string,
items: OrderItem[]
): Order {
const order = new Order();
const totalAmount = items.reduce(
(sum, item) => sum + item.unitPrice * item.quantity, 0
);
order.apply(new OrderPlaced(orderId, customerId, items, totalAmount));
return order;
}
// Command: confirm payment
confirmPayment(paymentId: string, amount: number): void {
if (this.status !== OrderStatus.PLACED) {
throw new Error('Order is not in placed status');
}
this.apply(new OrderPaymentConfirmed(this.id, paymentId, amount));
}
// Command: cancel order
cancel(reason: string): void {
if (this.status === OrderStatus.SHIPPED) {
throw new Error('Cannot cancel shipped order');
}
this.apply(new OrderCancelled(this.id, reason));
}
// Event handlers
protected when(event: DomainEvent): void {
if (event instanceof OrderPlaced) {
this.id = event.orderId;
this.customerId = event.customerId;
this.items = event.items;
this.totalAmount = event.totalAmount;
this.status = OrderStatus.PLACED;
} else if (event instanceof OrderPaymentConfirmed) {
this.status = OrderStatus.CONFIRMED;
} else if (event instanceof OrderCancelled) {
this.status = OrderStatus.CANCELLED;
}
}
}
3.4 Snapshots
As events accumulate, rebuilding an aggregate gets slower. Snapshots optimize this.
class SnapshotStore {
constructor(private pool: Pool) {}
async saveSnapshot(
streamId: string,
snapshot: AggregateSnapshot
): Promise<void> {
await this.pool.query(
`INSERT INTO snapshots (stream_id, version, data, created_at)
VALUES ($1, $2, $3, NOW())
ON CONFLICT (stream_id) DO UPDATE SET version = $2, data = $3, created_at = NOW()`,
[streamId, snapshot.version, JSON.stringify(snapshot.state)]
);
}
async getLatestSnapshot(streamId: string): Promise<AggregateSnapshot | null> {
const result = await this.pool.query(
'SELECT version, data FROM snapshots WHERE stream_id = $1',
[streamId]
);
if (result.rows.length === 0) return null;
return {
version: result.rows[0].version,
state: JSON.parse(result.rows[0].data),
};
}
}
// Loading an aggregate with snapshot support
class OrderRepository {
constructor(
private eventStore: EventStore,
private snapshotStore: SnapshotStore
) {}
async getById(orderId: string): Promise<Order> {
const order = new Order();
// 1. Load the latest snapshot
const snapshot = await this.snapshotStore.getLatestSnapshot(orderId);
let fromVersion = 0;
if (snapshot) {
order.restoreFromSnapshot(snapshot.state);
fromVersion = snapshot.version;
}
// 2. Load only the events after the snapshot
const events = await this.eventStore.readStream(orderId, fromVersion);
order.loadFromHistory(events);
// 3. Take a new snapshot once enough events have accumulated
if (events.length > 50) {
await this.snapshotStore.saveSnapshot(orderId, {
version: order.version,
state: order.toSnapshot(),
});
}
return order;
}
}
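The repository above relies on `toSnapshot` and `restoreFromSnapshot` helpers that are not shown. A minimal sketch with a simplified state shape (the `OrderSnapshotState` fields are assumptions; a real aggregate would mirror every field that `when()` mutates):

```typescript
// Simplified snapshot state for illustration.
interface OrderSnapshotState {
  id: string;
  customerId: string;
  status: string;
  totalAmount: number;
}

class SnapshottableOrder {
  private id = '';
  private customerId = '';
  private status = 'draft';
  private totalAmount = 0;

  // Serialize the current state; called before saving a snapshot.
  toSnapshot(): OrderSnapshotState {
    return {
      id: this.id,
      customerId: this.customerId,
      status: this.status,
      totalAmount: this.totalAmount,
    };
  }

  // Restore state without replaying events; called on load.
  restoreFromSnapshot(state: OrderSnapshotState): void {
    this.id = state.id;
    this.customerId = state.customerId;
    this.status = state.status;
    this.totalAmount = state.totalAmount;
  }
}

// Round-trip: restoring a snapshot must reproduce the original state,
// otherwise snapshot loads diverge from full event replays.
const original = new SnapshottableOrder();
original.restoreFromSnapshot({
  id: 'ORD-001', customerId: 'CUST-1', status: 'placed', totalAmount: 50000,
});
const copy = new SnapshottableOrder();
copy.restoreFromSnapshot(original.toSnapshot());
```

The round-trip property at the bottom is the invariant worth unit-testing: snapshot-then-restore and full replay must land on the same state.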
3.5 Projections (Read Model)
// Order read-model projection
class OrderProjection {
async handle(event: DomainEvent): Promise<void> {
if (event instanceof OrderPlaced) {
await this.db.query(
`INSERT INTO order_read_model (id, customer_id, status, total_amount, created_at)
VALUES ($1, $2, $3, $4, $5)`,
[event.orderId, event.customerId, 'placed', event.totalAmount, event.occurredAt]
);
// Project the order items
for (const item of event.items) {
await this.db.query(
`INSERT INTO order_items_read_model (order_id, product_id, quantity, unit_price)
VALUES ($1, $2, $3, $4)`,
[event.orderId, item.productId, item.quantity, item.unitPrice]
);
}
} else if (event instanceof OrderPaymentConfirmed) {
await this.db.query(
'UPDATE order_read_model SET status = $1, payment_id = $2, updated_at = $3 WHERE id = $4',
['confirmed', event.paymentId, event.occurredAt, event.orderId]
);
} else if (event instanceof OrderShipped) {
await this.db.query(
'UPDATE order_read_model SET status = $1, tracking_number = $2, shipped_at = $3 WHERE id = $4',
['shipped', event.trackingNumber, event.occurredAt, event.orderId]
);
}
}
}
4. CQRS (Command Query Responsibility Segregation)
CQRS separates the write (Command) model from the read (Query) model.
4.1 Basic Structure
┌──────────────┐ Command ┌──────────────┐
│ Client │ ───────────────► │ Command Side │
│ │ │ (Write) │
│ │ Query │ │
│ │ ───────────────► │ │
└──────────────┘ └──────┬───────┘
│
Events│
▼
┌──────────────┐
│ Event Bus │
└──────┬───────┘
│
▼
┌──────────────┐ Query ┌──────────────┐
│ Client │ ◄─────────────── │ Query Side │
│ │ │ (Read) │
└──────────────┘ └──────────────┘
4.2 Command Side Implementation
// Command definition
interface Command {
commandId: string;
timestamp: Date;
}
class PlaceOrderCommand implements Command {
commandId: string;
timestamp: Date;
constructor(
public readonly customerId: string,
public readonly items: OrderItemDto[],
public readonly shippingAddress: AddressDto
) {
this.commandId = crypto.randomUUID();
this.timestamp = new Date();
}
}
// Command Handler
class PlaceOrderHandler {
constructor(
private orderRepository: OrderRepository,
private inventoryService: InventoryService,
private pricingService: PricingService
) {}
async handle(command: PlaceOrderCommand): Promise<string> {
// 1. Check inventory availability
await this.inventoryService.checkAvailability(command.items);
// 2. Calculate prices
const pricedItems = await this.pricingService.calculatePrices(command.items);
// 3. Create the order aggregate
const orderId = crypto.randomUUID();
const order = Order.create(orderId, command.customerId, pricedItems);
// 4. Persist to the event store
await this.orderRepository.save(order);
return orderId;
}
}
// Command Bus
class CommandBus {
private handlers: Map<string, CommandHandler<unknown>> = new Map();
register<T extends Command>(
commandType: string,
handler: CommandHandler<T>
): void {
this.handlers.set(commandType, handler);
}
async dispatch<T extends Command>(command: T): Promise<unknown> {
const handler = this.handlers.get(command.constructor.name);
if (!handler) {
throw new Error(`No handler for ${command.constructor.name}`);
}
return handler.handle(command);
}
}
4.3 Query Side Implementation
// Query definitions
class GetOrderDetailsQuery {
constructor(public readonly orderId: string) {}
}
class ListCustomerOrdersQuery {
constructor(
public readonly customerId: string,
public readonly page: number = 1,
public readonly pageSize: number = 20
) {}
}
// Query Handler
class GetOrderDetailsHandler {
constructor(private readDb: Pool) {}
async handle(query: GetOrderDetailsQuery): Promise<OrderDetailsDto> {
const result = await this.readDb.query(
`SELECT o.*, json_agg(i.*) as items
FROM order_read_model o
JOIN order_items_read_model i ON o.id = i.order_id
WHERE o.id = $1
GROUP BY o.id`,
[query.orderId]
);
if (result.rows.length === 0) {
throw new NotFoundError(`Order ${query.orderId} not found`);
}
return this.mapToDto(result.rows[0]);
}
}
// Eventual consistency
// The read model is updated after event processing, so a short lag is possible
class OrderQueryService {
async getOrderWithConsistencyCheck(
orderId: string,
expectedVersion?: number
): Promise<OrderDetailsDto> {
const order = await this.getOrderDetails(orderId);
// If a specific version is required, verify it
if (expectedVersion && order.version < expectedVersion) {
// The projection has not caught up yet
// Wait briefly and retry, or read directly from the command side
await this.waitForProjection(orderId, expectedVersion);
return this.getOrderDetails(orderId);
}
return order;
}
}
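The `waitForProjection` call above is not shown. A minimal polling sketch, assuming a `fetchVersion` callback that reads the projection's current version (both names are placeholders, not part of any framework):

```typescript
// Poll the read model until it reaches the expected version or times out.
// fetchVersion is any function returning the projection's current version
// for the aggregate (e.g. a SELECT against the read model).
async function waitForProjection(
  fetchVersion: () => Promise<number>,
  expectedVersion: number,
  timeoutMs = 5000,
  intervalMs = 100
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if ((await fetchVersion()) >= expectedVersion) return;
    // Back off briefly before polling again
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
  throw new Error(`Projection did not reach version ${expectedVersion} within ${timeoutMs}ms`);
}
```

Always bound the wait: after the timeout, falling back to a command-side read is usually better than blocking the request indefinitely.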
5. Saga Pattern
The Saga pattern manages a distributed transaction as a sequence of local transactions.
5.1 Choreography vs Orchestration
Choreography:
Services cooperate autonomously through events
OrderService ──OrderPlaced──► PaymentService
│
PaymentCompleted
│
▼
InventoryService
│
InventoryReserved
│
▼
ShippingService
Orchestration:
A central orchestrator coordinates the entire flow
┌──────────────────┐
│ OrderSaga │
│ (Orchestrator) │
└──────┬───────────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
PaymentService InventoryService ShippingService
5.2 Choreography Saga Implementation
// Order service: publishes events
class OrderService {
async placeOrder(command: PlaceOrderCommand): Promise<string> {
const order = Order.create(command);
await this.orderRepository.save(order);
// The OrderPlaced event is published automatically
return order.id;
}
}
// Payment service: handles the OrderPlaced event
class PaymentEventHandler {
@EventHandler('order.placed')
async onOrderPlaced(event: OrderPlaced): Promise<void> {
try {
const payment = await this.paymentService.processPayment(
event.orderId,
event.totalAmount
);
// Publish the success event
await this.eventBus.publish(new PaymentCompleted(
payment.id, event.orderId, event.totalAmount
));
} catch (error) {
// Publish the failure event (triggers compensation)
await this.eventBus.publish(new PaymentFailed(
event.orderId, error.message
));
}
}
}
// Inventory service: handles the PaymentCompleted event
class InventoryEventHandler {
@EventHandler('payment.completed')
async onPaymentCompleted(event: PaymentCompleted): Promise<void> {
try {
const reservation = await this.inventoryService.reserveItems(
event.orderId
);
await this.eventBus.publish(new InventoryReserved(
reservation.id, event.orderId, reservation.items
));
} catch (error) {
// Compensation: trigger a payment refund
await this.eventBus.publish(new InventoryReservationFailed(
event.orderId, error.message
));
}
}
}
// Order service: compensation handlers
class OrderCompensationHandler {
@EventHandler('payment.failed')
async onPaymentFailed(event: PaymentFailed): Promise<void> {
await this.orderService.cancelOrder(event.orderId, 'Payment failed');
}
@EventHandler('inventory.reservation.failed')
async onInventoryFailed(event: InventoryReservationFailed): Promise<void> {
// Request a payment refund
await this.paymentService.refund(event.orderId);
await this.orderService.cancelOrder(event.orderId, 'Out of stock');
}
}
5.3 Orchestration Saga Implementation
// Saga definition
class OrderSaga {
private state: SagaState = SagaState.STARTED;
private orderId!: string; // assigned in start()
private paymentId?: string;
private reservationId?: string;
constructor(
private commandBus: CommandBus,
private sagaStore: SagaStore
) {}
async start(orderId: string): Promise<void> {
this.orderId = orderId;
this.state = SagaState.PAYMENT_PENDING;
await this.sagaStore.save(this);
// Step 1: request payment
await this.commandBus.dispatch(new ProcessPaymentCommand(orderId));
}
async handlePaymentCompleted(event: PaymentCompleted): Promise<void> {
this.paymentId = event.paymentId;
this.state = SagaState.INVENTORY_PENDING;
await this.sagaStore.save(this);
// Step 2: request inventory reservation
await this.commandBus.dispatch(new ReserveInventoryCommand(
this.orderId, event.paymentId
));
}
async handlePaymentFailed(event: PaymentFailed): Promise<void> {
this.state = SagaState.COMPENSATING;
await this.sagaStore.save(this);
// Compensation: cancel the order
await this.commandBus.dispatch(new CancelOrderCommand(
this.orderId, 'Payment failed'
));
this.state = SagaState.COMPENSATED;
await this.sagaStore.save(this);
}
async handleInventoryReserved(event: InventoryReserved): Promise<void> {
this.reservationId = event.reservationId;
this.state = SagaState.SHIPPING_PENDING;
await this.sagaStore.save(this);
// Step 3: request shipment
await this.commandBus.dispatch(new CreateShipmentCommand(
this.orderId, this.reservationId
));
}
async handleInventoryFailed(event: InventoryReservationFailed): Promise<void> {
this.state = SagaState.COMPENSATING;
await this.sagaStore.save(this);
// Compensation: refund the payment, then cancel the order
await this.commandBus.dispatch(new RefundPaymentCommand(
this.orderId, this.paymentId!
));
await this.commandBus.dispatch(new CancelOrderCommand(
this.orderId, 'Inventory unavailable'
));
this.state = SagaState.COMPENSATED;
await this.sagaStore.save(this);
}
async handleShipmentCreated(event: ShipmentCreated): Promise<void> {
this.state = SagaState.COMPLETED;
await this.sagaStore.save(this);
// Confirm the order
await this.commandBus.dispatch(new ConfirmOrderCommand(this.orderId));
}
}
6. Outbox Pattern
6.1 The Dual-Write Problem
A database update and a message publish must happen atomically. The Outbox pattern solves this problem.
Problem scenario:
1. Save the order to the DB ✓
2. Publish the event to Kafka ✗ ← a failure here leaves the data inconsistent!
Outbox pattern:
1. Inside one DB transaction:
- save the order ✓
- save the event to the outbox table ✓
2. A separate process reads the outbox and publishes to Kafka
6.2 Outbox Table Implementation
-- Outbox table
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
published_at TIMESTAMP NULL,
retry_count INT DEFAULT 0,
last_error TEXT NULL
);
CREATE INDEX idx_outbox_unpublished ON outbox (created_at)
WHERE published_at IS NULL;
// Outbox pattern in the order service
class OrderService {
constructor(private pool: Pool) {}
async placeOrder(command: PlaceOrderCommand): Promise<string> {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// 1. Save the order
const orderId = crypto.randomUUID();
await client.query(
'INSERT INTO orders (id, customer_id, status, total_amount) VALUES ($1, $2, $3, $4)',
[orderId, command.customerId, 'placed', command.totalAmount]
);
// 2. Save the event to the outbox (same transaction)
const event = new OrderPlaced(orderId, command.customerId, command.items, command.totalAmount);
await client.query(
`INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES ($1, $2, $3, $4)`,
['Order', orderId, 'order.placed', JSON.stringify(event)]
);
await client.query('COMMIT');
return orderId;
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
}
6.3 CDC with Debezium
# Debezium connector configuration
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: outbox-connector
spec:
class: io.debezium.connector.postgresql.PostgresConnector
tasksMax: 1
config:
database.hostname: postgres
database.port: 5432
database.user: debezium
database.password: secret
database.dbname: orders
table.include.list: public.outbox
transforms: outbox
transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.field.event.id: id
transforms.outbox.table.field.event.key: aggregate_id
transforms.outbox.table.field.event.type: event_type
transforms.outbox.table.field.event.payload: payload
transforms.outbox.route.by.field: aggregate_type
transforms.outbox.route.topic.replacement: events.orders
6.4 Polling Publisher
Without Debezium, a simple polling approach also works.
class OutboxPollingPublisher {
private isRunning = false;
async start(): Promise<void> {
this.isRunning = true;
while (this.isRunning) {
try {
await this.publishPendingEvents();
} catch (error) {
console.error('Polling error:', error);
}
await this.sleep(1000); // poll every second
}
}
private async publishPendingEvents(): Promise<void> {
const client = await this.pool.connect();
try {
// Fetch unpublished events (with row locks)
const result = await client.query(
`SELECT * FROM outbox
WHERE published_at IS NULL AND retry_count < 5
ORDER BY created_at ASC
LIMIT 100
FOR UPDATE SKIP LOCKED`
);
for (const row of result.rows) {
try {
// Publish to Kafka
await this.kafka.produce(
`events.${row.aggregate_type.toLowerCase()}`,
row.aggregate_id,
row.payload
);
// Mark as published
await client.query(
'UPDATE outbox SET published_at = NOW() WHERE id = $1',
[row.id]
);
} catch (error) {
// Increment the failure count
await client.query(
'UPDATE outbox SET retry_count = retry_count + 1, last_error = $1 WHERE id = $2',
[error.message, row.id]
);
}
}
} finally {
client.release();
}
}
}
7. Idempotency
7.1 Why Idempotency Matters
In an event-driven system, messages can be delivered more than once (at-least-once delivery). Handlers must be idempotent so that processing the same event multiple times produces the same result.
7.2 Idempotency Key Implementation
class IdempotencyStore {
constructor(private pool: Pool) {}
async isProcessed(idempotencyKey: string): Promise<boolean> {
const result = await this.pool.query(
'SELECT 1 FROM processed_events WHERE idempotency_key = $1',
[idempotencyKey]
);
return result.rows.length > 0;
}
async markProcessed(
idempotencyKey: string,
result: unknown,
client: PoolClient
): Promise<void> {
await client.query(
`INSERT INTO processed_events (idempotency_key, result, processed_at)
VALUES ($1, $2, NOW())
ON CONFLICT (idempotency_key) DO NOTHING`,
[idempotencyKey, JSON.stringify(result)]
);
}
}
// Idempotent event handler wrapper
class IdempotentEventHandler<T extends DomainEvent> {
constructor(
private pool: Pool,
private idempotencyStore: IdempotencyStore,
private innerHandler: EventHandler<T>
) {}
async handle(event: T): Promise<void> {
const key = `${event.eventType}:${event.eventId}`;
// Skip if this event was already processed
if (await this.idempotencyStore.isProcessed(key)) {
console.log(`Event ${key} already processed, skipping`);
return;
}
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// Process the event
await this.innerHandler.handle(event);
// Mark as processed (same transaction)
await this.idempotencyStore.markProcessed(key, null, client);
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
}
7.3 Deduplication
// Deduplicating Kafka consumer
class DeduplicatingConsumer {
private recentEventIds: Set<string> = new Set();
private maxCacheSize = 10000;
async processMessage(message: KafkaMessage): Promise<void> {
const event = JSON.parse(message.value.toString());
const eventId = event.eventId;
// Fast path: check the in-memory cache
if (this.recentEventIds.has(eventId)) {
return; // duplicate - skip
}
// Slow path: check the database
if (await this.idempotencyStore.isProcessed(eventId)) {
this.recentEventIds.add(eventId);
return; // duplicate - skip
}
// Process the event
await this.handler.handle(event);
// Update the cache (evict the oldest entry when full)
this.recentEventIds.add(eventId);
if (this.recentEventIds.size > this.maxCacheSize) {
const firstKey = this.recentEventIds.values().next().value;
this.recentEventIds.delete(firstKey);
}
}
}
8. Dead Letter Queue (DLQ)
8.1 The DLQ Pattern
Messages that cannot be processed are parked in a separate queue so they do not block the main processing flow.
class EventConsumerWithDLQ {
private maxRetries = 3;
async processEvent(event: DomainEvent, retryCount = 0): Promise<void> {
try {
await this.handler.handle(event);
} catch (error) {
if (retryCount < this.maxRetries) {
// Retry with exponential backoff
const delay = Math.pow(2, retryCount) * 1000;
await this.sleep(delay);
return this.processEvent(event, retryCount + 1);
}
// Max retries exceeded: move to the DLQ
await this.sendToDLQ(event, error);
}
}
private async sendToDLQ(event: DomainEvent, error: Error): Promise<void> {
await this.kafka.produce('events.dlq', event.aggregateId, {
originalEvent: event,
error: {
message: error.message,
stack: error.stack,
},
failedAt: new Date().toISOString(),
retryCount: this.maxRetries,
});
// Send an alert
await this.alertService.notify(
`Event processing failed after ${this.maxRetries} retries`,
{ eventId: event.eventId, eventType: event.eventType, error: error.message }
);
}
}
9. Event Schema Evolution
9.1 Schema Management with Avro
{
"type": "record",
"name": "OrderPlaced",
"namespace": "com.ecommerce.orders",
"fields": [
{"name": "eventId", "type": "string"},
{"name": "orderId", "type": "string"},
{"name": "customerId", "type": "string"},
{"name": "totalAmount", "type": "long"},
{"name": "currency", "type": "string", "default": "KRW"},
{"name": "couponCode", "type": ["null", "string"], "default": null}
]
}
9.2 Schema Compatibility Rules
| Compatibility | Allowed Changes | Use Case |
|---|---|---|
| BACKWARD | New schema can read old data | Upgrade consumers first |
| FORWARD | Old schema can read new data | Upgrade producers first |
| FULL | Compatible in both directions | Safest; recommended |
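For example, under BACKWARD compatibility a field may be added only if it carries a default, so the new schema can still read old records that lack it. A sketch in the same schema-object style as section 2.3 (the `loyaltyPoints` field and the checker are illustrative; this is not Schema Registry's actual compatibility algorithm):

```typescript
// v2 adds a field; old records lack it, so a default is mandatory
// for the change to remain BACKWARD compatible.
const OrderPlacedSchemaV2 = {
  type: 'record',
  name: 'OrderPlaced',
  namespace: 'com.ecommerce.orders.v2',
  fields: [
    { name: 'orderId', type: 'string' },
    { name: 'totalAmount', type: 'long' },
    // hypothetical new field: readable against old data via its default
    { name: 'loyaltyPoints', type: 'int', default: 0 },
  ] as { name: string; type: unknown; default?: unknown }[],
};

// Simplified check: every field added in the new schema must have a default.
function addedFieldsHaveDefaults(
  oldFields: { name: string }[],
  newFields: { name: string; default?: unknown }[]
): boolean {
  const oldNames = new Set(oldFields.map((f) => f.name));
  return newFields
    .filter((f) => !oldNames.has(f.name))
    .every((f) => f.default !== undefined);
}

const v1Fields = [{ name: 'orderId' }, { name: 'totalAmount' }];
const compatible = addedFieldsHaveDefaults(v1Fields, OrderPlacedSchemaV2.fields);
```

Making a field nullable (`["null", "string"]` with `default: null`, as `couponCode` does in 9.1) is the common way to satisfy this rule.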
10. Kafka + TypeScript in Practice
10.1 Kafka Producer
import { Kafka, Partitioners } from 'kafkajs';
const kafka = new Kafka({
clientId: 'order-service',
brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
retry: { retries: 5 },
});
const producer = kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner,
idempotent: true,
maxInFlightRequests: 5,
transactionalId: 'order-producer-1',
});
async function publishOrderEvent(event: OrderPlaced): Promise<void> {
await producer.send({
topic: 'events.orders',
messages: [
{
key: event.orderId,
value: JSON.stringify(event),
headers: {
'event-type': event.eventType,
'event-id': event.eventId,
'correlation-id': event.metadata.correlationId || '',
},
},
],
});
}
10.2 Kafka Consumer
const consumer = kafka.consumer({
groupId: 'payment-service-group',
sessionTimeout: 30000,
heartbeatInterval: 3000,
});
async function startConsumer(): Promise<void> {
await consumer.connect();
await consumer.subscribe({
topics: ['events.orders'],
fromBeginning: false,
});
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value!.toString());
const eventType = message.headers?.['event-type']?.toString();
console.log(`Received: ${eventType} from ${topic}[${partition}]`);
try {
switch (eventType) {
case 'order.placed':
await paymentHandler.onOrderPlaced(event);
break;
case 'order.cancelled':
await paymentHandler.onOrderCancelled(event);
break;
default:
console.log(`Unknown event type: ${eventType}`);
}
} catch (error) {
console.error(`Failed to process event: ${error}`);
// Send to the DLQ or retry
}
},
});
}
11. Testing Event-Driven Systems
11.1 Unit Tests
describe('Order Aggregate', () => {
it('should emit OrderPlaced event when creating order', () => {
const order = Order.create('ord-1', 'cust-1', [
{ productId: 'prod-1', quantity: 2, unitPrice: 10000 },
]);
const events = order.getUncommittedEvents();
expect(events).toHaveLength(1);
expect(events[0]).toBeInstanceOf(OrderPlaced);
expect((events[0] as OrderPlaced).totalAmount).toBe(20000);
});
it('should not allow cancellation of shipped order', () => {
const order = buildOrderInStatus(OrderStatus.SHIPPED);
expect(() => order.cancel('Changed mind'))
.toThrow('Cannot cancel shipped order');
});
});
11.2 Integration Tests
describe('Order Saga Integration', () => {
let kafkaContainer: StartedTestContainer;
let postgresContainer: StartedTestContainer;
beforeAll(async () => {
kafkaContainer = await new GenericContainer('confluentinc/cp-kafka:7.5.0')
.withExposedPorts(9092)
.start();
postgresContainer = await new GenericContainer('postgres:16')
.withExposedPorts(5432)
.start();
});
it('should complete order saga successfully', async () => {
// 1. Place an order
const orderId = await orderService.placeOrder({
customerId: 'cust-1',
items: [{ productId: 'prod-1', quantity: 1, unitPrice: 10000 }],
});
// 2. Wait for the events to propagate
await waitForEvent('payment.completed', orderId, 5000);
await waitForEvent('inventory.reserved', orderId, 5000);
// 3. Verify the result
const order = await orderQueryService.getOrderDetails(orderId);
expect(order.status).toBe('confirmed');
});
});
12. Anti-Patterns
12.1 Event Explosion
Overly fine-grained events increase system complexity. Design events around units of business meaning.
12.2 Tight Coupling Through Events
Packing too much data into an event makes consumers depend on the producer's internal structure. Events are a public contract and must be designed with care.
12.3 Synchronous Event Processing
Calling other services synchronously inside an event handler throws away the benefits of an event-driven design. Keep processing asynchronous.
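A common shape of this anti-pattern and one way out of it, sketched with minimal types (the loyalty-service call is purely illustrative):

```typescript
// Anti-pattern: the handler blocks on a synchronous HTTP call, tying the
// consumer's throughput and availability to the downstream service.
class CouplingHandler {
  constructor(private http: { post(url: string, body: unknown): Promise<void> }) {}
  async handle(event: { orderId: string }): Promise<void> {
    // If the loyalty service is slow or down, event consumption stalls.
    await this.http.post('http://loyalty/points', { orderId: event.orderId });
  }
}

// Better: record the outcome as a new event and let the downstream
// service consume it on its own schedule.
class DecoupledHandler {
  constructor(private publish: (event: unknown) => Promise<void>) {}
  async handle(event: { orderId: string }): Promise<void> {
    await this.publish({ eventType: 'order.points.earned', orderId: event.orderId });
  }
}
```

The decoupled version only touches the broker, so a downstream outage delays point accrual instead of blocking order-event consumption.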
12.4 Ignoring Idempotency
Ignoring the at-least-once delivery guarantee leads to duplicate processing and data inconsistency.
13. Quiz
Q1: Why does Event Sourcing need snapshots?
Answer: Because aggregate restoration slows down as the number of events grows.
To get an aggregate's current state in Event Sourcing, all of its events must be replayed from the beginning. With thousands or tens of thousands of events, restoration time grows sharply. A snapshot stores the aggregate's state at a point in time, so only the events after that point need to be replayed.
Q2: What are the trade-offs between Choreography and Orchestration Sagas?
Answer:
Choreography: Pros - suits simple workflows, low coupling between services, no single point of failure. Cons - hard to trace in complex workflows, cyclic dependencies can emerge, harder to test.
Orchestration: Pros - suits complex workflows, the whole flow is visible in one place, easier to test. Cons - the orchestrator is a single point of failure and risks accumulating business logic.
Q3: What is the "dual-write problem" that the Outbox pattern solves?
Answer: The inability to guarantee atomicity when writing to a database and a message broker at the same time.
Saving data to the DB and publishing an event to Kafka are operations on different systems and cannot share a single transaction. If the DB write succeeds but the Kafka publish fails, the data becomes inconsistent. The Outbox pattern stores the event in an outbox table within the same DB transaction, and a separate process reads it and publishes to the broker, preserving atomicity.
Q4: What does "eventual consistency" mean in CQRS, and how do you cope with it?
Answer: Changes to the write model are not immediately reflected in the read model.
In CQRS, changes on the Command side propagate to the Query side asynchronously via events, so a read immediately after a write may return stale state. Mitigations include optimistic updates in the UI, reading directly from the Command side after certain operations, or version-based conditional reads.
Q5: Why does idempotency matter in event-driven systems?
Answer: Because at-least-once delivery means the same event can be delivered more than once.
Most message brokers guarantee at-least-once delivery. Network issues or consumer restarts can cause the same event to arrive multiple times. Without idempotency, the same payment could be processed twice or inventory deducted twice. Idempotency keys, deduplication, and conditional updates prevent this.
14. References
- Martin Fowler - Event Sourcing - overview of the Event Sourcing concept
- Microsoft - CQRS Pattern - CQRS pattern guide
- Microservices.io - Saga Pattern - detailed Saga pattern description
- Debezium Documentation - official docs for the CDC tool
- KafkaJS Documentation - Node.js Kafka client
- Apache Kafka Documentation - official Kafka docs
- Confluent Schema Registry - schema registry
- EventStoreDB - purpose-built event store database
- Axon Framework - Java CQRS/ES framework
- NestJS CQRS - NestJS CQRS recipe
- Domain-Driven Design Reference - DDD reference
- Microservices Patterns by Chris Richardson - microservice pattern catalog
- RabbitMQ Documentation - official RabbitMQ docs
Event-Driven Architecture Patterns Guide 2025: CQRS, Event Sourcing, Saga Pattern
Table of Contents
1. Event-Driven Architecture Fundamentals
Event-Driven Architecture (EDA) is a software architecture pattern where system components communicate through events. Instead of direct service-to-service calls, services publish and subscribe to events, achieving loose coupling and high scalability.
1.1 Core Concepts
Event: A meaningful fact that occurred in the system. Named in past tense. Examples: OrderPlaced, PaymentCompleted, InventoryReserved.
Event Producer: A component that creates and publishes events.
Event Consumer: A component that subscribes to and processes events.
Event Broker: Infrastructure that delivers events between producers and consumers. Examples include Kafka, RabbitMQ, and AWS EventBridge.
1.2 Event Types
Event Types
├── Domain Event
│   └── A fact that occurred in the business domain
│       Example: OrderPlaced, PaymentFailed
├── Integration Event
│   └── Events for cross-bounded context communication
│       Example: OrderConfirmedIntegrationEvent
├── Event Notification
│   └── Contains minimal data. Consumer queries if needed
│       Example: OrderStatusChanged (contains only orderId)
└── Event-Carried State Transfer
    └── Contains full state. No additional query needed
        Example: OrderDetails (includes full order info)
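The last two types differ mainly in payload size. A minimal sketch of the trade-off in TypeScript (the interface and field names are illustrative, not from a specific system):

```typescript
// Event Notification: minimal payload — the consumer calls back for details.
interface OrderStatusChangedNotification {
  eventType: 'order.status.changed';
  orderId: string; // consumer queries the order service if it needs more
}

// Event-Carried State Transfer: the full state travels with the event.
interface OrderDetailsEvent {
  eventType: 'order.details';
  orderId: string;
  customerId: string;
  items: { productId: string; quantity: number; unitPrice: number }[];
  totalAmount: number;
  currency: string;
}

const notification: OrderStatusChangedNotification = {
  eventType: 'order.status.changed',
  orderId: 'ORD-001',
};

const stateTransfer: OrderDetailsEvent = {
  eventType: 'order.details',
  orderId: 'ORD-001',
  customerId: 'CUST-42',
  items: [{ productId: 'P-1', quantity: 2, unitPrice: 1500 }],
  totalAmount: 3000,
  currency: 'USD',
};
```

Notifications keep events small and stable but add a read dependency on the producer; state transfer removes that dependency at the cost of larger, more change-sensitive payloads.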
1.3 Pub/Sub Pattern
// TypeScript - Event Bus Interface
interface EventBus {
  publish<T extends DomainEvent>(event: T): Promise<void>;
  subscribe<T extends DomainEvent>(
    eventType: string,
    handler: EventHandler<T>
  ): void;
  unsubscribe(eventType: string, handler: EventHandler<DomainEvent>): void;
}

interface DomainEvent {
  eventId: string;
  eventType: string;
  aggregateId: string;
  occurredAt: Date;
  version: number;
  metadata: Record<string, string>;
}

interface EventHandler<T extends DomainEvent> {
  handle(event: T): Promise<void>;
}
2. Domain Events
2.1 Domain Event Design Principles
Good domain events follow these principles.
Immutable: Events represent facts that already happened, so they cannot be changed.
Past Tense Naming: Use OrderPlaced, not PlaceOrder. Events describe what already happened.
Self-describing: The event alone should convey what happened.
Business Meaning: Name from a business perspective, not technical implementation.
2.2 Domain Event Implementation
// Base Domain Event
abstract class BaseDomainEvent implements DomainEvent {
  readonly eventId: string;
  readonly occurredAt: Date;
  readonly version: number;
  abstract readonly eventType: string;
  abstract readonly aggregateId: string;
  metadata: Record<string, string> = {};

  constructor() {
    this.eventId = crypto.randomUUID();
    this.occurredAt = new Date();
    this.version = 1;
  }
}

// Order Placed Event
class OrderPlaced extends BaseDomainEvent {
  readonly eventType = 'order.placed';
  readonly aggregateId: string;

  constructor(
    public readonly orderId: string,
    public readonly customerId: string,
    public readonly items: OrderItem[],
    public readonly totalAmount: Money,
    public readonly shippingAddress: Address
  ) {
    super();
    this.aggregateId = orderId;
  }
}

// Payment Completed Event
class PaymentCompleted extends BaseDomainEvent {
  readonly eventType = 'payment.completed';
  readonly aggregateId: string;

  constructor(
    public readonly paymentId: string,
    public readonly orderId: string,
    public readonly amount: Money,
    public readonly paymentMethod: string
  ) {
    super();
    this.aggregateId = paymentId;
  }
}

// Inventory Reserved Event
class InventoryReserved extends BaseDomainEvent {
  readonly eventType = 'inventory.reserved';
  readonly aggregateId: string;

  constructor(
    public readonly reservationId: string,
    public readonly orderId: string,
    public readonly items: ReservedItem[]
  ) {
    super();
    this.aggregateId = reservationId;
  }
}
2.3 Event Schema Design
// Event Schema (Avro-like)
const OrderPlacedSchema = {
  type: 'record',
  name: 'OrderPlaced',
  namespace: 'com.ecommerce.orders.v1',
  fields: [
    { name: 'eventId', type: 'string' },
    { name: 'eventType', type: 'string' },
    { name: 'aggregateId', type: 'string' },
    { name: 'occurredAt', type: 'string' }, // ISO 8601
    { name: 'version', type: 'int' },
    { name: 'orderId', type: 'string' },
    { name: 'customerId', type: 'string' },
    {
      name: 'items',
      type: {
        type: 'array',
        items: {
          type: 'record',
          name: 'OrderItem',
          fields: [
            { name: 'productId', type: 'string' },
            { name: 'quantity', type: 'int' },
            { name: 'unitPrice', type: 'long' },
          ],
        },
      },
    },
    { name: 'totalAmount', type: 'long' },
    { name: 'currency', type: 'string' },
  ],
};
3. Event Sourcing
Event Sourcing stores application state as a sequence of events. Instead of storing current state directly, it records the entire history of state changes.
3.1 Core Concept
Traditional Approach (CRUD):
┌─────────────────────────┐
│ orders table            │
│   id: ORD-001           │
│   status: shipped       │ ← Only current state stored
│   total: 50000          │
│   updated_at: 2025-03-24│
└─────────────────────────┘

Event Sourcing Approach:
┌──────────────────────────────────────────┐
│ event_store                              │
│ 1. OrderPlaced     (ORD-001, 2025-03-24) │
│ 2. PaymentReceived (ORD-001, 2025-03-24) │
│ 3. ItemPacked      (ORD-001, 2025-03-24) │
│ 4. OrderShipped    (ORD-001, 2025-03-24) │ ← Full history preserved
└──────────────────────────────────────────┘
3.2 Event Store Implementation
// Event Store Interface
interface EventStore {
  // Append events (optimistic concurrency control)
  append(
    streamId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void>;

  // Read events from stream
  readStream(
    streamId: string,
    fromVersion?: number
  ): Promise<DomainEvent[]>;

  // Subscribe to all events
  subscribe(
    handler: (event: DomainEvent) => Promise<void>
  ): void;
}

// PostgreSQL-based Event Store
class PostgresEventStore implements EventStore {
  constructor(private pool: Pool) {}

  async append(
    streamId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void> {
    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');

      // Optimistic concurrency check
      const result = await client.query(
        'SELECT MAX(version) as current_version FROM events WHERE stream_id = $1',
        [streamId]
      );
      const currentVersion = result.rows[0]?.current_version ?? 0;
      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, but current is ${currentVersion}`
        );
      }

      // Store events
      let version = expectedVersion;
      for (const event of events) {
        version++;
        await client.query(
          `INSERT INTO events (stream_id, version, event_type, data, metadata, created_at)
           VALUES ($1, $2, $3, $4, $5, NOW())`,
          [streamId, version, event.eventType, JSON.stringify(event), JSON.stringify(event.metadata)]
        );
      }

      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  async readStream(streamId: string, fromVersion: number = 0): Promise<DomainEvent[]> {
    const result = await this.pool.query(
      'SELECT data FROM events WHERE stream_id = $1 AND version > $2 ORDER BY version ASC',
      [streamId, fromVersion]
    );
    return result.rows.map(row => JSON.parse(row.data));
  }

  // Subscription (e.g. via LISTEN/NOTIFY or polling) omitted for brevity
  subscribe(handler: (event: DomainEvent) => Promise<void>): void {
    throw new Error('Not implemented in this example');
  }
}
3.3 Aggregate with Event Sourcing
// Event Sourced Aggregate base class
abstract class EventSourcedAggregate {
  private uncommittedEvents: DomainEvent[] = [];
  version: number = 0;

  protected apply(event: DomainEvent): void {
    this.when(event);
    this.version++;
    this.uncommittedEvents.push(event);
  }

  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.when(event);
      this.version++;
    }
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents];
  }

  clearUncommittedEvents(): void {
    this.uncommittedEvents = [];
  }

  protected abstract when(event: DomainEvent): void;
}
// Order Aggregate
class Order extends EventSourcedAggregate {
  private id!: string;
  private customerId!: string;
  private items!: OrderItem[];
  private totalAmount!: Money;
  private status!: OrderStatus;

  // Command: Place Order
  static place(orderId: string, customerId: string, items: OrderItem[], total: Money): Order {
    const order = new Order();
    order.apply(new OrderPlaced(orderId, customerId, items, total, new Address()));
    return order;
  }

  // Command: Confirm Payment
  confirmPayment(paymentId: string): void {
    if (this.status !== OrderStatus.PLACED) {
      throw new InvalidOperationError('Can only confirm payment for placed orders');
    }
    this.apply(new OrderPaymentConfirmed(this.id, paymentId));
  }

  // Command: Cancel Order
  cancel(reason: string): void {
    if (this.status === OrderStatus.CANCELLED) {
      throw new InvalidOperationError('Order is already cancelled');
    }
    this.apply(new OrderCancelled(this.id, reason));
  }

  // Apply events to state
  protected when(event: DomainEvent): void {
    if (event instanceof OrderPlaced) {
      this.id = event.orderId;
      this.customerId = event.customerId;
      this.items = event.items;
      this.totalAmount = event.totalAmount;
      this.status = OrderStatus.PLACED;
    } else if (event instanceof OrderPaymentConfirmed) {
      this.status = OrderStatus.CONFIRMED;
    } else if (event instanceof OrderCancelled) {
      this.status = OrderStatus.CANCELLED;
    }
  }
}
3.4 Snapshots
As events accumulate, Aggregate restoration becomes slow. Snapshots optimize this.
class SnapshotStore {
  constructor(private pool: Pool) {}

  async saveSnapshot(
    streamId: string,
    snapshot: AggregateSnapshot
  ): Promise<void> {
    await this.pool.query(
      `INSERT INTO snapshots (stream_id, version, data, created_at)
       VALUES ($1, $2, $3, NOW())
       ON CONFLICT (stream_id) DO UPDATE SET version = $2, data = $3, created_at = NOW()`,
      [streamId, snapshot.version, JSON.stringify(snapshot.state)]
    );
  }

  async getLatestSnapshot(streamId: string): Promise<AggregateSnapshot | null> {
    const result = await this.pool.query(
      'SELECT version, data FROM snapshots WHERE stream_id = $1',
      [streamId]
    );
    if (result.rows.length === 0) return null;
    return {
      version: result.rows[0].version,
      state: JSON.parse(result.rows[0].data),
    };
  }
}
// Aggregate loading with snapshots
class OrderRepository {
  constructor(
    private eventStore: EventStore,
    private snapshotStore: SnapshotStore
  ) {}

  async getById(orderId: string): Promise<Order> {
    const order = new Order();

    // 1. Load snapshot
    const snapshot = await this.snapshotStore.getLatestSnapshot(orderId);
    let fromVersion = 0;
    if (snapshot) {
      order.restoreFromSnapshot(snapshot.state);
      fromVersion = snapshot.version;
    }

    // 2. Load only events after snapshot
    const events = await this.eventStore.readStream(orderId, fromVersion);
    order.loadFromHistory(events);

    // 3. Create snapshot after certain number of events
    if (events.length > 50) {
      await this.snapshotStore.saveSnapshot(orderId, {
        version: order.version,
        state: order.toSnapshot(),
      });
    }
    return order;
  }
}
3.5 Projections (Read Model)
// Projection that builds read model from events
class OrderProjection {
  constructor(private readDb: Pool) {}

  async handle(event: DomainEvent): Promise<void> {
    if (event instanceof OrderPlaced) {
      await this.readDb.query(
        `INSERT INTO order_read_model (id, customer_id, status, total_amount, created_at, version)
         VALUES ($1, $2, 'PLACED', $3, $4, $5)`,
        [event.orderId, event.customerId, event.totalAmount.amount, event.occurredAt, event.version]
      );
      for (const item of event.items) {
        await this.readDb.query(
          `INSERT INTO order_items_read_model (order_id, product_id, quantity, unit_price)
           VALUES ($1, $2, $3, $4)`,
          [event.orderId, item.productId, item.quantity, item.unitPrice]
        );
      }
    } else if (event instanceof OrderPaymentConfirmed) {
      await this.readDb.query(
        `UPDATE order_read_model SET status = 'CONFIRMED', version = $2 WHERE id = $1`,
        [event.orderId, event.version]
      );
    } else if (event instanceof OrderCancelled) {
      await this.readDb.query(
        `UPDATE order_read_model SET status = 'CANCELLED', version = $2 WHERE id = $1`,
        [event.orderId, event.version]
      );
    }
  }
}
4. CQRS (Command Query Responsibility Segregation)
CQRS separates the write model (Command) and read model (Query) to optimize each independently.
4.1 CQRS Architecture
┌─────────────────────────────────────────────────┐
│                     Client                      │
│      ┌──────────┐          ┌──────────┐         │
│      │  Write   │          │   Read   │         │
│      │ Request  │          │ Request  │         │
│      └────┬─────┘          └─────┬────┘         │
└───────────┼──────────────────────┼──────────────┘
            │                      │
   ┌────────▼────────┐    ┌────────▼────────┐
   │  Command Side   │    │   Query Side    │
   │                 │    │                 │
   │ Command Handler │    │  Query Handler  │
   │       │         │    │       │         │
   │  ┌────▼──────┐  │    │  ┌────▼────┐    │
   │  │  Domain   │  │    │  │ Read DB │    │
   │  │  Model    │  │    │  │ (Denorm)│    │
   │  └────┬──────┘  │    │  └─────────┘    │
   │  ┌────▼──────┐  │    │       ▲         │
   │  │ Write DB  │  │    │       │         │
   │  └────┬──────┘  │    │  Projection     │
   └───────┼─────────┘    └───────┼─────────┘
           │                      │
           └────── Events ────────┘
4.2 Command Side Implementation
// Command definition
class PlaceOrderCommand {
  constructor(
    public readonly customerId: string,
    public readonly items: OrderItemDto[],
    public readonly shippingAddress: AddressDto
  ) {}
}

// Command Handler
class PlaceOrderHandler {
  constructor(
    private orderRepo: OrderRepository,
    private eventBus: EventBus
  ) {}

  async handle(command: PlaceOrderCommand): Promise<string> {
    // Business logic
    const orderId = generateOrderId();
    const items = command.items.map(i => new OrderItem(i.productId, i.quantity, i.unitPrice));
    const total = items.reduce((sum, i) => sum + i.quantity * i.unitPrice, 0);

    // Create aggregate and apply events
    const order = Order.place(orderId, command.customerId, items, new Money(total, 'USD'));

    // Save to event store
    await this.orderRepo.save(order);

    // Publish events
    for (const event of order.getUncommittedEvents()) {
      await this.eventBus.publish(event);
    }
    return orderId;
  }
}

// Command Bus
interface Command {} // Marker interface for commands

interface CommandHandler<T extends Command> {
  handle(command: T): Promise<unknown>;
}

class CommandBus {
  private handlers = new Map<string, CommandHandler<Command>>();

  register<T extends Command>(commandType: string, handler: CommandHandler<T>): void {
    this.handlers.set(commandType, handler as CommandHandler<Command>);
  }

  async dispatch<T extends Command>(command: T): Promise<unknown> {
    const handler = this.handlers.get(command.constructor.name);
    if (!handler) {
      throw new Error(`No handler for ${command.constructor.name}`);
    }
    return handler.handle(command);
  }
}
4.3 Query Side Implementation
// Query definition
class GetOrderDetailsQuery {
  constructor(public readonly orderId: string) {}
}

class ListCustomerOrdersQuery {
  constructor(
    public readonly customerId: string,
    public readonly page: number = 1,
    public readonly pageSize: number = 20
  ) {}
}

// Query Handler
class GetOrderDetailsHandler {
  constructor(private readDb: Pool) {}

  async handle(query: GetOrderDetailsQuery): Promise<OrderDetailsDto> {
    const result = await this.readDb.query(
      `SELECT o.*, json_agg(i.*) as items
       FROM order_read_model o
       JOIN order_items_read_model i ON o.id = i.order_id
       WHERE o.id = $1
       GROUP BY o.id`,
      [query.orderId]
    );
    if (result.rows.length === 0) {
      throw new NotFoundError(`Order ${query.orderId} not found`);
    }
    return this.mapToDto(result.rows[0]);
  }
}

// Eventual Consistency
// Read model updates after event processing, so slight delays may occur
class OrderQueryService {
  async getOrderWithConsistencyCheck(
    orderId: string,
    expectedVersion?: number
  ): Promise<OrderDetailsDto> {
    const order = await this.getOrderDetails(orderId);

    // Check if specific version is required
    if (expectedVersion && order.version < expectedVersion) {
      // Projection hasn't caught up yet
      // Wait briefly and retry, or query directly from Command side
      await this.waitForProjection(orderId, expectedVersion);
      return this.getOrderDetails(orderId);
    }
    return order;
  }
}
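The `waitForProjection` helper referenced above is not standardized; one possible sketch is a bounded polling loop (the timeout and interval defaults here are arbitrary, and the version reader is injected so the helper stays storage-agnostic):

```typescript
// Poll a read-model version until it reaches expectedVersion or a timeout hits.
// getVersion is assumed to return the read model's current version.
async function waitForProjection(
  getVersion: () => Promise<number>,
  expectedVersion: number,
  timeoutMs = 2000,
  intervalMs = 100
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if ((await getVersion()) >= expectedVersion) return;
    // back off briefly before re-checking the projection
    await new Promise<void>(r => setTimeout(r, intervalMs));
  }
  throw new Error(`Projection did not reach version ${expectedVersion} in time`);
}
```

In production this loop is usually capped tightly; if the timeout fires, fall back to the Command-side store or return the stale read with a warning rather than blocking the request indefinitely.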
5. Saga Pattern
The Saga pattern manages distributed transactions as a sequence of local transactions.
5.1 Choreography vs Orchestration
Choreography:
Services collaborate autonomously through events

Order Service ──OrderPlaced──> Payment Service
Payment Service ──PaymentCompleted──> Inventory Service
Inventory Service ──InventoryReserved──> Shipping Service

Pros: Low coupling, no single point of failure
Cons: Hard to track complex workflows, possible cyclic dependencies

─────────────────────────────────────────────────

Orchestration:
A central orchestrator controls the workflow

              ┌──────────────────┐
              │   Order Saga     │
              │  Orchestrator    │
              └────────┬─────────┘
                       │
      ┌────────────────┼────────────────┐
      │                │                │
┌─────▼───────┐  ┌─────▼───────┐  ┌─────▼───────┐
│   Payment   │  │  Inventory  │  │  Shipping   │
│   Service   │  │   Service   │  │   Service   │
└─────────────┘  └─────────────┘  └─────────────┘

Pros: Easy to track flow, testable
Cons: Orchestrator is single point of failure
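The choreography flow above can be sketched with an in-memory bus: each service subscribes to the previous step's event and emits the next one, with no central coordinator (the bus and handler names are illustrative, not a real broker API):

```typescript
type Handler = (payload: Record<string, unknown>) => void;

// Trivial synchronous pub/sub bus, standing in for Kafka/RabbitMQ.
class InMemoryBus {
  private subs = new Map<string, Handler[]>();
  subscribe(type: string, h: Handler): void {
    this.subs.set(type, [...(this.subs.get(type) ?? []), h]);
  }
  publish(type: string, payload: Record<string, unknown>): void {
    for (const h of this.subs.get(type) ?? []) h(payload);
  }
}

const bus = new InMemoryBus();
const log: string[] = [];

// Payment service reacts to OrderPlaced and emits the next event
bus.subscribe('OrderPlaced', p => {
  log.push('payment');
  bus.publish('PaymentCompleted', p);
});
// Inventory service reacts to PaymentCompleted
bus.subscribe('PaymentCompleted', p => {
  log.push('inventory');
  bus.publish('InventoryReserved', p);
});
// Shipping service reacts to InventoryReserved
bus.subscribe('InventoryReserved', () => log.push('shipping'));

bus.publish('OrderPlaced', { orderId: 'ORD-1' });
// log is now ['payment', 'inventory', 'shipping'] — no central coordinator
```

Note how the overall workflow exists nowhere in the code; it emerges from the subscriptions. That is exactly the choreography trade-off: easy to extend, hard to see at a glance.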
5.2 Orchestration Saga Implementation
// Saga State
enum SagaState {
  STARTED = 'STARTED',
  PAYMENT_PENDING = 'PAYMENT_PENDING',
  INVENTORY_PENDING = 'INVENTORY_PENDING',
  SHIPPING_PENDING = 'SHIPPING_PENDING',
  COMPLETED = 'COMPLETED',
  COMPENSATING = 'COMPENSATING',
  COMPENSATED = 'COMPENSATED',
  FAILED = 'FAILED',
}

// Order Processing Saga
class OrderProcessingSaga {
  private state: SagaState = SagaState.STARTED;
  private orderId!: string;
  private paymentId?: string;
  private reservationId?: string;

  constructor(
    private sagaStore: SagaStore,
    private commandBus: CommandBus
  ) {}

  // Start: when order is placed
  async start(event: OrderPlaced): Promise<void> {
    this.orderId = event.orderId;
    this.state = SagaState.PAYMENT_PENDING;
    await this.sagaStore.save(this);

    // Step 1: Request payment
    await this.commandBus.dispatch(new ProcessPaymentCommand(
      event.orderId, event.totalAmount
    ));
  }

  async handlePaymentCompleted(event: PaymentCompleted): Promise<void> {
    this.paymentId = event.paymentId;
    this.state = SagaState.INVENTORY_PENDING;
    await this.sagaStore.save(this);

    // Step 2: Reserve inventory
    await this.commandBus.dispatch(new ReserveInventoryCommand(
      this.orderId, event.paymentId
    ));
  }

  async handlePaymentFailed(event: PaymentFailed): Promise<void> {
    this.state = SagaState.COMPENSATING;
    await this.sagaStore.save(this);

    // Compensate: Cancel order
    await this.commandBus.dispatch(new CancelOrderCommand(
      this.orderId, 'Payment failed'
    ));
    this.state = SagaState.COMPENSATED;
    await this.sagaStore.save(this);
  }

  async handleInventoryReserved(event: InventoryReserved): Promise<void> {
    this.reservationId = event.reservationId;
    this.state = SagaState.SHIPPING_PENDING;
    await this.sagaStore.save(this);

    // Step 3: Request shipping
    await this.commandBus.dispatch(new CreateShipmentCommand(
      this.orderId, this.reservationId
    ));
  }

  async handleInventoryFailed(event: InventoryReservationFailed): Promise<void> {
    this.state = SagaState.COMPENSATING;
    await this.sagaStore.save(this);

    // Compensate: Refund payment then cancel order
    await this.commandBus.dispatch(new RefundPaymentCommand(
      this.orderId, this.paymentId!
    ));
    await this.commandBus.dispatch(new CancelOrderCommand(
      this.orderId, 'Inventory unavailable'
    ));
    this.state = SagaState.COMPENSATED;
    await this.sagaStore.save(this);
  }

  async handleShipmentCreated(event: ShipmentCreated): Promise<void> {
    this.state = SagaState.COMPLETED;
    await this.sagaStore.save(this);

    // Confirm order
    await this.commandBus.dispatch(new ConfirmOrderCommand(this.orderId));
  }
}
6. Outbox Pattern
6.1 The Dual-Write Problem
Database updates and message publishing must be performed atomically. The Outbox pattern solves this.
Problem Scenario:
1. Save order to DB        ✓
2. Publish event to Kafka  ✗ ← If this fails, data inconsistency!

Outbox Pattern:
1. Within DB transaction:
   - Save order ✓
   - Save event to outbox table ✓
2. Separate process reads from outbox and publishes to Kafka
6.2 Outbox Table Schema
CREATE TABLE outbox (
  id             UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  aggregate_type VARCHAR(255) NOT NULL,
  aggregate_id   VARCHAR(255) NOT NULL,
  event_type     VARCHAR(255) NOT NULL,
  payload        JSONB NOT NULL,
  created_at     TIMESTAMP NOT NULL DEFAULT NOW(),
  published_at   TIMESTAMP,
  retry_count    INT DEFAULT 0,
  last_error     TEXT
);

CREATE INDEX idx_outbox_unpublished ON outbox (created_at)
  WHERE published_at IS NULL;
6.3 Outbox Implementation
class OutboxService {
  async saveWithOutbox(
    order: Order,
    events: DomainEvent[],
    client: PoolClient
  ): Promise<void> {
    // Save order and events in same transaction
    await client.query(
      'INSERT INTO orders (id, customer_id, status, total) VALUES ($1, $2, $3, $4)',
      [order.id, order.customerId, order.status, order.total]
    );
    for (const event of events) {
      await client.query(
        `INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
         VALUES ($1, $2, $3, $4)`,
        ['Order', order.id, event.eventType, JSON.stringify(event)]
      );
    }
  }
}

// Outbox Relay: periodically reads outbox and publishes
class OutboxRelay {
  constructor(
    private pool: Pool,
    // Thin wrapper over a Kafka producer: produce(topic, key, value)
    private kafka: { produce(topic: string, key: string, value: string): Promise<void> }
  ) {}

  async processOutbox(): Promise<void> {
    const client = await this.pool.connect();
    try {
      // Row locks from FOR UPDATE only hold inside a transaction
      await client.query('BEGIN');

      // Read unpublished events
      const result = await client.query(
        `SELECT * FROM outbox
         WHERE published_at IS NULL AND retry_count < 5
         ORDER BY created_at ASC
         LIMIT 100
         FOR UPDATE SKIP LOCKED`
      );

      for (const row of result.rows) {
        try {
          await this.kafka.produce(
            `events.${row.aggregate_type.toLowerCase()}`,
            row.aggregate_id,
            row.payload
          );
          // Mark as published
          await client.query(
            'UPDATE outbox SET published_at = NOW() WHERE id = $1',
            [row.id]
          );
        } catch (error) {
          // Increment failure count
          await client.query(
            'UPDATE outbox SET retry_count = retry_count + 1, last_error = $1 WHERE id = $2',
            [(error as Error).message, row.id]
          );
        }
      }

      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }
}
6.4 CDC (Change Data Capture) with Debezium
Debezium Connector Configuration (registered via the Kafka Connect REST API):
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "orderdb",
    "table.include.list": "public.outbox",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.topic.replacement": "events.orders"
  }
}
7. Idempotency
7.1 Why Idempotency Matters
In event-driven systems, messages may be delivered multiple times (at-least-once delivery). Idempotency ensures processing the same event multiple times produces the same result.
7.2 Idempotency Key Implementation
class IdempotencyStore {
  constructor(private pool: Pool) {}

  async isProcessed(idempotencyKey: string): Promise<boolean> {
    const result = await this.pool.query(
      'SELECT 1 FROM processed_events WHERE idempotency_key = $1',
      [idempotencyKey]
    );
    return result.rows.length > 0;
  }

  async markProcessed(
    idempotencyKey: string,
    result: unknown,
    client: PoolClient
  ): Promise<void> {
    await client.query(
      `INSERT INTO processed_events (idempotency_key, result, processed_at)
       VALUES ($1, $2, NOW())
       ON CONFLICT (idempotency_key) DO NOTHING`,
      [idempotencyKey, JSON.stringify(result)]
    );
  }
}

// Idempotent Event Handler
class IdempotentEventHandler<T extends DomainEvent> {
  constructor(
    private pool: Pool,
    private idempotencyStore: IdempotencyStore,
    private innerHandler: EventHandler<T>
  ) {}

  async handle(event: T): Promise<void> {
    const key = `${event.eventType}:${event.eventId}`;

    // Check if event already processed
    if (await this.idempotencyStore.isProcessed(key)) {
      console.log(`Event ${key} already processed, skipping`);
      return;
    }

    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');
      // Process event
      await this.innerHandler.handle(event);
      // Mark as processed
      await this.idempotencyStore.markProcessed(key, null, client);
      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }
}
7.3 Idempotency Strategies
| Strategy | Description | Example |
|---|---|---|
| Idempotency Key | Store processed event IDs | Event ID based deduplication |
| Conditional Update | Update only if conditions met | WHERE version = expected |
| Upsert | INSERT ON CONFLICT DO UPDATE | Natural idempotency |
| Deduplication Log | Record processed messages | Consumer-side dedup table |
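The "Conditional Update" row can be illustrated in-memory; in SQL the same guard is typically an `UPDATE ... WHERE id = $1 AND version = $2` whose row count tells you whether the write applied (the `Row` shape here is hypothetical):

```typescript
interface Row { id: string; status: string; version: number; }

// Apply the update only when the stored version matches the expected one,
// so a redelivered event (carrying the old expected version) is a no-op.
function conditionalUpdate(row: Row, newStatus: string, expectedVersion: number): boolean {
  if (row.version !== expectedVersion) return false; // stale or duplicate: skip
  row.status = newStatus;
  row.version = expectedVersion + 1;
  return true;
}

const order: Row = { id: 'ORD-1', status: 'PLACED', version: 1 };
const first = conditionalUpdate(order, 'CONFIRMED', 1);  // applies, version → 2
const second = conditionalUpdate(order, 'CONFIRMED', 1); // redelivery: no-op
```

The appeal of this strategy is that it needs no extra table: the version column the aggregate already carries doubles as the deduplication guard.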
8. Eventual Consistency
8.1 Consistency Models
Strong Consistency:
  Writer ──Write──> DB ──Read──> Reader
  (every read returns the latest write)

Eventual Consistency:
  Writer ──Write──> Write DB ──Event──> Read DB ──Read──> Reader
  (reads may return stale data until the projection catches up)
8.2 Handling Eventual Consistency
// Strategy 1: Optimistic UI Update
// After write, update UI immediately without waiting for read model
class OrderController {
  constructor(private commandBus: CommandBus) {}

  async placeOrder(req: Request, res: Response): Promise<void> {
    const orderId = await this.commandBus.dispatch(
      new PlaceOrderCommand(req.body)
    );

    // Return orderId immediately
    // Client uses this to poll or subscribe for updates
    res.json({
      orderId,
      status: 'PROCESSING',
      message: 'Order is being processed'
    });
  }
}

// Strategy 2: Version-based consistency check
class OrderApiController {
  constructor(private queryService: OrderQueryService) {}

  async getOrder(req: Request, res: Response): Promise<void> {
    const minVersion = req.headers['x-min-version']
      ? parseInt(req.headers['x-min-version'] as string, 10)
      : undefined;

    const order = await this.queryService.getOrderWithConsistencyCheck(
      req.params.orderId,
      minVersion
    );
    res.json(order);
  }
}

// Strategy 3: Subscription-based updates
class OrderEventsWebSocket {
  constructor(private eventBus: EventBus) {}

  handleConnection(client: WebSocket, orderId: string): void {
    this.eventBus.subscribe('order.*', async (event: DomainEvent) => {
      if (event.aggregateId === orderId) {
        client.send(JSON.stringify({
          type: event.eventType,
          data: event,
        }));
      }
    });
  }
}
9. Schema Evolution and Compatibility
9.1 Event Versioning
// Schema evolution strategy
interface OrderPlacedV1 {
  orderId: string;
  customerId: string;
  totalAmount: number;
}

interface OrderPlacedV2 extends OrderPlacedV1 {
  currency: string;        // New field (default: 'USD')
  shippingMethod: string;  // New field (default: 'STANDARD')
}

// Event upcaster: converts old versions to new versions
class OrderPlacedUpcaster {
  upcast(event: OrderPlacedV1): OrderPlacedV2 {
    return {
      ...event,
      currency: 'USD',
      shippingMethod: 'STANDARD',
    };
  }
}
9.2 Schema Compatibility Types
| Type | Description | Upgrade Strategy |
|---|---|---|
| BACKWARD | Read old data with new schema | Upgrade consumers first |
| FORWARD | Read new data with old schema | Upgrade producers first |
| FULL | Bidirectional compatibility | Safest, recommended |
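Building on the upcaster from 9.1, older events are usually migrated hop-by-hop to the latest version at read time. A minimal sketch (the registry-keyed-by-source-version layout is one common approach, not the only one):

```typescript
type VersionedEvent = { version: number; [key: string]: unknown };
type Upcaster = (event: VersionedEvent) => VersionedEvent;

// One upcaster per hop, keyed by the version it upgrades FROM.
const upcasters: Record<number, Upcaster> = {
  // V1 -> V2: add the fields introduced in OrderPlacedV2 with defaults
  1: e => ({ ...e, currency: 'USD', shippingMethod: 'STANDARD', version: 2 }),
};

function upcastToLatest(event: VersionedEvent, latestVersion: number): VersionedEvent {
  let current = event;
  while (current.version < latestVersion) {
    const step = upcasters[current.version];
    if (!step) throw new Error(`No upcaster from version ${current.version}`);
    current = step(current); // one hop per iteration
  }
  return current;
}

const v1Event = { version: 1, orderId: 'ORD-001', customerId: 'C-1', totalAmount: 3000 };
const latest = upcastToLatest(v1Event, 2);
```

Chaining keeps each upcaster small (one version hop) and means handlers only ever see the latest shape, even for events written years ago.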
10. Kafka + TypeScript Production Implementation
10.1 Kafka Producer
import { Kafka, Partitioners } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
  retry: { retries: 5 },
});

const producer = kafka.producer({
  createPartitioner: Partitioners.DefaultPartitioner,
  idempotent: true,
  maxInFlightRequests: 1, // KafkaJS requires at most 1 when idempotent is enabled
  transactionalId: 'order-producer-1', // only needed if producer.transaction() is used
});

async function publishOrderEvent(event: OrderPlaced): Promise<void> {
  // Assumes producer.connect() was awaited at startup
  await producer.send({
    topic: 'events.orders',
    messages: [
      {
        key: event.orderId,
        value: JSON.stringify(event),
        headers: {
          'event-type': event.eventType,
          'event-id': event.eventId,
          'correlation-id': event.metadata.correlationId || '',
        },
      },
    ],
  });
}
10.2 Kafka Consumer
const consumer = kafka.consumer({
  groupId: 'payment-service-group',
  sessionTimeout: 30000,
  heartbeatInterval: 3000,
});

async function startConsumer(): Promise<void> {
  await consumer.connect();
  await consumer.subscribe({
    topics: ['events.orders'],
    fromBeginning: false,
  });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const event = JSON.parse(message.value!.toString());
      const eventType = message.headers?.['event-type']?.toString();
      console.log(`Received: ${eventType} from ${topic}[${partition}]`);

      try {
        switch (eventType) {
          case 'order.placed':
            await paymentHandler.onOrderPlaced(event);
            break;
          case 'order.cancelled':
            await paymentHandler.onOrderCancelled(event);
            break;
          default:
            console.log(`Unknown event type: ${eventType}`);
        }
      } catch (error) {
        console.error(`Failed to process event: ${error}`);
        // DLQ handling or retry
      }
    },
  });
}
10.3 Dead Letter Queue (DLQ) Pattern
import { Producer, KafkaMessage } from 'kafkajs';

class DeadLetterQueueHandler {
  constructor(private dlqProducer: Producer) {}

  async handleFailedMessage(
    originalTopic: string,
    message: KafkaMessage,
    error: Error
  ): Promise<void> {
    await this.dlqProducer.send({
      topic: `${originalTopic}.dlq`,
      messages: [
        {
          key: message.key,
          value: message.value,
          headers: {
            ...message.headers,
            'dlq-reason': error.message,
            'dlq-timestamp': new Date().toISOString(),
            'original-topic': originalTopic,
            'retry-count': String(
              parseInt(message.headers?.['retry-count']?.toString() || '0', 10) + 1
            ),
          },
        },
      ],
    });
  }
}
11. RabbitMQ Implementation
11.1 Exchange Types and Routing
Exchange Types:
├── Direct Exchange
│   └── Routes by exact routing key match
│       Use case: Specific service targeting
├── Topic Exchange
│   └── Routes by routing key pattern (*.order.#)
│       Use case: Flexible event routing
├── Fanout Exchange
│   └── Broadcasts to all bound queues
│       Use case: Event notification to all consumers
└── Headers Exchange
    └── Routes by message header matching
        Use case: Complex routing logic
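The topic-exchange matching semantics (`*` = exactly one word, `#` = zero or more words, words separated by dots) can be sketched as a small recursive matcher. This illustrates the routing rules only; it is not RabbitMQ's actual implementation:

```typescript
// pattern and key are routing keys split on '.', e.g. 'events.order.*'.split('.')
function topicMatches(pattern: string[], key: string[]): boolean {
  if (pattern.length === 0) return key.length === 0;
  const [p, ...rest] = pattern;
  if (p === '#') {
    // '#' matches zero or more words: try consuming 0..n words of the key
    for (let i = 0; i <= key.length; i++) {
      if (topicMatches(rest, key.slice(i))) return true;
    }
    return false;
  }
  if (key.length === 0) return false;
  // '*' matches exactly one word; a literal must match exactly
  return (p === '*' || p === key[0]) && topicMatches(rest, key.slice(1));
}
```

For example, `events.order.*` matches `events.order.placed` but not `events.order`, while `events.#` matches both `events` and `events.order.placed`.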
11.2 RabbitMQ with Node.js
import amqp from 'amqplib';

class RabbitMQEventBus {
  private connection!: amqp.Connection;
  private channel!: amqp.Channel;

  async connect(): Promise<void> {
    this.connection = await amqp.connect('amqp://localhost');
    this.channel = await this.connection.createChannel();

    // Declare exchange
    await this.channel.assertExchange('domain-events', 'topic', {
      durable: true,
    });
  }

  async publish(event: DomainEvent): Promise<void> {
    const routingKey = `events.${event.eventType}`;
    this.channel.publish(
      'domain-events',
      routingKey,
      Buffer.from(JSON.stringify(event)),
      {
        persistent: true,
        messageId: event.eventId,
        contentType: 'application/json',
        headers: {
          'event-type': event.eventType,
          'aggregate-id': event.aggregateId,
        },
      }
    );
  }

  async subscribe(pattern: string, handler: (event: DomainEvent) => Promise<void>): Promise<void> {
    const queue = await this.channel.assertQueue('', { exclusive: true });
    await this.channel.bindQueue(queue.queue, 'domain-events', pattern);

    await this.channel.consume(queue.queue, async (msg) => {
      if (!msg) return;
      try {
        const event = JSON.parse(msg.content.toString());
        await handler(event);
        this.channel.ack(msg);
      } catch (error) {
        // Reject and requeue or send to DLQ
        this.channel.nack(msg, false, false);
      }
    });
  }
}
12. Common Anti-Patterns
12.1 Overly Granular Events
Excessively fine-grained events increase system complexity. Design events at the business meaning level.
12.2 Tight Coupling Through Events
Including too much data in events creates dependency on the producer's internal structure. Events are public contracts and should be designed carefully.
12.3 Synchronous Event Processing
Calling other services synchronously in event handlers defeats the purpose of EDA. Maintain asynchronous processing.
12.4 Ignoring Idempotency
Failing to consider "at-least-once" delivery guarantees leads to duplicate processing and data inconsistency.
12.5 Missing Correlation IDs
Without correlation IDs, tracing a request across multiple services becomes nearly impossible. Always propagate correlation IDs through events.
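A minimal sketch of propagation: every outgoing event copies the incoming event's `correlationId` unchanged and records the incoming `eventId` as its `causationId`. These field names are a common convention, not a broker requirement:

```typescript
interface EventEnvelope {
  eventId: string;
  correlationId: string; // constant across the whole request flow
  causationId: string;   // id of the event that directly caused this one
}

// Build the envelope for an event published while handling `incoming`.
function deriveEnvelope(incoming: EventEnvelope, newEventId: string): EventEnvelope {
  return {
    eventId: newEventId,
    correlationId: incoming.correlationId, // propagate unchanged
    causationId: incoming.eventId,         // link to the direct cause
  };
}

const orderPlaced: EventEnvelope = {
  eventId: 'evt-1',
  correlationId: 'corr-abc', // assigned once at the edge, e.g. an API gateway
  causationId: 'evt-0',
};
const paymentRequested = deriveEnvelope(orderPlaced, 'evt-2');
```

Searching logs for one `correlationId` then reconstructs the entire cross-service flow, and the `causationId` chain gives the exact parent/child ordering within it.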
12.6 No Dead Letter Queue
Without DLQ, failed messages are lost or block the queue. Always implement a DLQ strategy for handling poison messages.
13. EDA Pattern Selection Guide
| Scenario | Recommended Pattern |
|---|---|
| Audit trail required | Event Sourcing |
| Read/write ratio heavily skewed | CQRS |
| Multi-service transactions | Saga Pattern |
| DB + message broker atomicity | Outbox Pattern |
| High throughput event streaming | Kafka |
| Complex routing logic | RabbitMQ |
| Simple event notification | Event Notification + SNS/SQS |
14. Quiz
Q1: Why are snapshots needed in Event Sourcing?
Answer: Because Aggregate restoration time grows as events accumulate
In Event Sourcing, restoring the current state of an Aggregate requires replaying all events from the beginning. When events number in the thousands or tens of thousands, restoration time increases dramatically. Snapshots store the Aggregate state at a specific point, so only events after that point need to be replayed.
Q2: What are the pros and cons of Choreography Saga vs Orchestration Saga?
Answer:
Choreography: Pros - Suitable for simple workflows, low coupling between services, no single point of failure. Cons - Difficult to track complex workflows, possible cyclic dependencies, harder to test.
Orchestration: Pros - Suitable for complex workflows, entire flow visible in one place, easier to test. Cons - Orchestrator becomes single point of failure, risk of logic concentration in orchestrator.
Q3: What is the "dual-write problem" that the Outbox pattern solves?
Answer: The inability to guarantee atomicity when writing to both a database and a message broker simultaneously
Saving data to DB and publishing events to Kafka are operations on different systems and cannot be wrapped in a single transaction. If DB save succeeds but Kafka publish fails, data inconsistency occurs. The Outbox pattern stores events in an outbox table within the same DB transaction, and a separate process reads and publishes them to the broker, guaranteeing atomicity.
Q4: What does "Eventual Consistency" mean in CQRS, and how do you handle it?
Answer: Changes in the write model are not immediately reflected in the read model
In CQRS, changes from the Command side are propagated asynchronously to the Query side through events. Reading immediately after writing may show stale state. Handling strategies include applying optimistic updates in the UI, querying directly from the Command side after specific operations, or using version-based conditional queries.
Q5: Why is idempotency critical in event-driven systems?
Answer: Because "at-least-once" delivery guarantees mean the same event may be delivered multiple times
Most message brokers guarantee "at-least-once" delivery. Network issues, consumer restarts, etc. can cause the same event to be delivered multiple times. Without idempotency, the same payment could be processed twice, or inventory could be deducted multiple times. Idempotency keys, deduplication, and conditional updates prevent these issues.
15. References
- Martin Fowler - Event Sourcing - Event Sourcing concepts
- Microsoft - CQRS Pattern - CQRS pattern guide
- Microservices.io - Saga Pattern - Saga pattern details
- Debezium Documentation - CDC tool official docs
- KafkaJS Documentation - Node.js Kafka client
- Apache Kafka Documentation - Kafka official docs
- Confluent Schema Registry - Schema Registry
- EventStoreDB - Dedicated event store database
- Axon Framework - Java CQRS/ES framework
- NestJS CQRS - NestJS CQRS recipe
- Domain-Driven Design Reference - DDD reference
- Microservices Patterns by Chris Richardson - Microservices pattern catalog
- RabbitMQ Documentation - RabbitMQ official docs