Skip to content

필사 모드: Event-Driven Architecture Design Patterns — Kafka, CQRS, Event Sourcing Practical Guide

English
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

Introduction

As the number of microservices grows, inter-service communication becomes complex. Services tightly coupled through synchronous calls suffer cascading failures when one goes down. **Event-Driven Architecture (EDA)** solves this problem through **asynchronous events**. In this article, we cover the core EDA patterns -- CQRS and Event Sourcing -- with practical code centered around Kafka.

1. Event-Driven Architecture Overview

Synchronous vs Asynchronous Communication

graph LR

subgraph "Synchronous (REST)"

A[Order Service] -->|HTTP Call| B[Payment Service]

B -->|HTTP Call| C[Inventory Service]

C -->|HTTP Call| D[Notification Service]

end

In the synchronous approach, if the Payment Service goes down, the order also fails. All services are **tightly coupled**.

graph LR

subgraph "Asynchronous (Event-Driven)"

A2[Order Service] -->|Publish Event| K[Kafka]

K -->|Subscribe| B2[Payment Service]

K -->|Subscribe| C2[Inventory Service]

K -->|Subscribe| D2[Notification Service]

end

In the asynchronous approach, the Order Service only needs to publish an event. Each service consumes events **independently**.

Core Components of EDA

| Component | Role | Example |

| ---------------- | ------------------------------------------- | ----------------------------------------- |

| **Producer** | Creates and publishes events | Order Service |

| **Event Broker** | Stores and delivers events | Apache Kafka |

| **Consumer** | Subscribes to and processes events | Payment, Inventory, Notification Services |

| **Event** | A message representing a fact that occurred | OrderCreated, PaymentCompleted |

2. Apache Kafka Basics

Kafka Architecture

graph TB

P1[Producer 1] --> T[Topic: orders]

P2[Producer 2] --> T

T --> PA0[Partition 0]

T --> PA1[Partition 1]

T --> PA2[Partition 2]

PA0 --> C1[Consumer Group A - Consumer 1]

PA1 --> C2[Consumer Group A - Consumer 2]

PA2 --> C3[Consumer Group A - Consumer 3]

PA0 --> C4[Consumer Group B - Consumer 1]

PA1 --> C4

PA2 --> C4

Key Concepts

- **Topic**: A logical channel where events are stored (e.g., `orders`, `payments`)

- **Partition**: A unit of parallel processing within a Topic. Ordering is guaranteed by partition key

- **Consumer Group**: Consumers in the same group are assigned partitions for parallel processing

- **Offset**: The sequence number of a message within a partition. Tracks how far a consumer has read

Kafka Producer Implementation (Python)

from confluent_kafka import Producer

config = {

'bootstrap.servers': 'kafka-broker:9092',

'client.id': 'order-service',

'acks': 'all', # Confirm written to all replicas

'retries': 3,

'retry.backoff.ms': 1000,

}

producer = Producer(config)

def publish_order_event(order: dict):

"""Publish an order event"""

event = {

'event_type': 'OrderCreated',

'event_id': str(uuid.uuid4()),

'timestamp': datetime.utcnow().isoformat(),

'data': {

'order_id': order['id'],

'user_id': order['user_id'],

'items': order['items'],

'total_amount': order['total'],

}

}

Partition key = user_id -> Orders from the same user go to the same partition

producer.produce(

topic='orders',

key=str(order['user_id']).encode('utf-8'),

value=json.dumps(event).encode('utf-8'),

callback=delivery_callback,

)

producer.flush()

def delivery_callback(err, msg):

if err:

print(f'Delivery failed: {err}')

else:

print(f'Delivery succeeded: {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

Kafka Consumer Implementation (Python)

from confluent_kafka import Consumer

config = {

'bootstrap.servers': 'kafka-broker:9092',

'group.id': 'payment-service',

'auto.offset.reset': 'earliest',

'enable.auto.commit': False, # Manual commit for at-least-once guarantee

}

consumer = Consumer(config)

consumer.subscribe(['orders'])

def process_events():

"""Event consumption loop"""

while True:

msg = consumer.poll(timeout=1.0)

if msg is None:

continue

if msg.error():

print(f'Error: {msg.error()}')

continue

event = json.loads(msg.value().decode('utf-8'))

try:

if event['event_type'] == 'OrderCreated':

handle_order_created(event['data'])

Manual commit after successful processing

consumer.commit(msg)

except Exception as e:

print(f'Processing failed: {e}')

Send to DLQ (Dead Letter Queue)

send_to_dlq(event, str(e))

def handle_order_created(data: dict):

"""Handle order created event -> initiate payment"""

payment = create_payment(

order_id=data['order_id'],

amount=data['total_amount'],

)

Publish payment completed event

publish_payment_event(payment)

3. CQRS (Command Query Responsibility Segregation)

CQRS is a pattern that separates the **write (Command)** and **read (Query)** models.

CQRS Architecture Flow

graph TB

Client[Client]

Client -->|Command| CmdAPI[Command API]

Client -->|Query| QryAPI[Query API]

CmdAPI --> WriteDB[(Write DB - PostgreSQL)]

CmdAPI -->|Publish Event| Kafka[Kafka]

Kafka -->|Consume Event| Projector[Projector / Denormalizer]

Projector --> ReadDB[(Read DB - Elasticsearch/Redis)]

QryAPI --> ReadDB

Why CQRS?

| Traditional Approach | CQRS |

| ------------------------------------------ | -------------------------------------- |

| Single model for reads and writes | Separate read/write models |

| Read performance degraded by complex JOINs | Denormalized model optimized for reads |

| Difficult to scale | Independent scaling of reads/writes |

| Suitable for simple domains | Suitable for complex domains |

CQRS Implementation Example

=== Command Side (Write) ===

class OrderCommandHandler:

def __init__(self, db, event_publisher):

self.db = db

self.publisher = event_publisher

def create_order(self, cmd: CreateOrderCommand):

"""Handle create order command"""

Business logic validation

if not self._validate_stock(cmd.items):

raise InsufficientStockError()

Save to Write DB

order = Order(

id=uuid.uuid4(),

user_id=cmd.user_id,

items=cmd.items,

status='CREATED',

)

self.db.save(order)

Publish event (asynchronously update Read Model)

self.publisher.publish('orders', OrderCreatedEvent(

order_id=order.id,

user_id=order.user_id,

items=order.items,

total=order.total,

created_at=order.created_at,

))

return order.id

=== Projector (Event -> Read Model Transformation) ===

class OrderProjector:

def __init__(self, read_db):

self.read_db = read_db # Elasticsearch or Redis

def handle_order_created(self, event: OrderCreatedEvent):

"""Order created event -> Save denormalized data to read model"""

user = self.get_user_info(event.user_id)

Denormalized read model (no JOINs needed!)

order_view = {

'order_id': str(event.order_id),

'user_name': user.name,

'user_email': user.email,

'items': [

{'name': item.name, 'qty': item.qty, 'price': item.price}

for item in event.items

],

'total': event.total,

'status': 'CREATED',

'created_at': event.created_at.isoformat(),

}

self.read_db.index('orders', order_view)

=== Query Side (Read) ===

class OrderQueryHandler:

def __init__(self, read_db):

self.read_db = read_db

def get_user_orders(self, user_id: str, page: int = 1):

"""Retrieve user's order list (returned directly from Read Model)"""

return self.read_db.search('orders', {

'query': {'term': {'user_id': user_id}},

'sort': [{'created_at': 'desc'}],

'from': (page - 1) * 20,

'size': 20,

})

4. Event Sourcing -- Events Are the Data

Event Sourcing is a pattern that stores the **sequence of state change events** instead of storing the current state.

Event Sourcing Flow

sequenceDiagram

participant Client

participant OrderAggregate

participant EventStore

participant Projector

participant ReadDB

Client->>OrderAggregate: CreateOrder Command

OrderAggregate->>EventStore: Store OrderCreated Event

EventStore->>Projector: Propagate Event

Projector->>ReadDB: Update Read Model

Client->>OrderAggregate: PayOrder Command

OrderAggregate->>EventStore: Store OrderPaid Event

EventStore->>Projector: Propagate Event

Projector->>ReadDB: Update Read Model

Note over EventStore: Events are never deleted or modified!

Traditional CRUD vs Event Sourcing

Traditional CRUD -- Only stores current state

orders table:

| id | user_id | status | total |

|----|---------|-----------|--------|

| 1 | 100 | SHIPPED | 50000 |

-> "When was it ordered, when was it paid, when was it shipped?" Unknown!

Event Sourcing -- Preserves entire change history

events table:

| seq | aggregate_id | type | data | timestamp |

|-----|-------------|----------------|-------------------------|---------------------|

| 1 | order-1 | OrderCreated | {items: [...], total: 50000} | 2026-03-02 10:00 |

| 2 | order-1 | PaymentReceived| {method: "card"} | 2026-03-02 10:05 |

| 3 | order-1 | OrderShipped | {tracking: "KR123"} | 2026-03-02 14:30 |

-> The entire history can be reconstructed!

Event Sourcing Implementation

from dataclasses import dataclass, field

from datetime import datetime

from typing import List

=== Event Definitions ===

@dataclass

class DomainEvent:

event_id: str

aggregate_id: str

event_type: str

data: dict

timestamp: datetime

version: int

=== Aggregate (Domain Object) ===

class OrderAggregate:

def __init__(self, order_id: str):

self.id = order_id

self.status = None

self.items = []

self.total = 0

self.version = 0

self._pending_events: List[DomainEvent] = []

--- Command Handler ---

def create(self, user_id: str, items: list):

"""Create order command"""

if self.status is not None:

raise Exception("Order already exists")

Create event (no state change, only record the event)

self._apply_event(DomainEvent(

event_id=str(uuid.uuid4()),

aggregate_id=self.id,

event_type='OrderCreated',

data={'user_id': user_id, 'items': items,

'total': sum(i['price'] * i['qty'] for i in items)},

timestamp=datetime.utcnow(),

version=self.version + 1,

))

def pay(self, payment_method: str):

"""Payment command"""

if self.status != 'CREATED':

raise Exception(f"Cannot process payment in state: {self.status}")

self._apply_event(DomainEvent(

event_id=str(uuid.uuid4()),

aggregate_id=self.id,

event_type='OrderPaid',

data={'payment_method': payment_method},

timestamp=datetime.utcnow(),

version=self.version + 1,

))

--- Event Handler (state changes happen only here!) ---

def _on_order_created(self, event: DomainEvent):

self.status = 'CREATED'

self.items = event.data['items']

self.total = event.data['total']

def _on_order_paid(self, event: DomainEvent):

self.status = 'PAID'

def _apply_event(self, event: DomainEvent):

"""Apply event (state change + event storage)"""

handler = {

'OrderCreated': self._on_order_created,

'OrderPaid': self._on_order_paid,

}.get(event.event_type)

if handler:

handler(event)

self.version = event.version

self._pending_events.append(event)

@classmethod

def from_events(cls, order_id: str, events: List[DomainEvent]):

"""Restore current state from event sequence"""

aggregate = cls(order_id)

for event in events:

aggregate._apply_event(event)

aggregate._pending_events.clear() # Already persisted events

return aggregate

=== Event Store ===

class EventStore:

def __init__(self, db):

self.db = db

def save(self, aggregate: OrderAggregate):

"""Write unsaved events to the Event Store"""

for event in aggregate._pending_events:

self.db.execute(

"""INSERT INTO events

(event_id, aggregate_id, event_type, data, timestamp, version)

VALUES (%s, %s, %s, %s, %s, %s)""",

(event.event_id, event.aggregate_id, event.event_type,

json.dumps(event.data), event.timestamp, event.version)

)

aggregate._pending_events.clear()

def load(self, aggregate_id: str) -> OrderAggregate:

"""Read events from the Event Store and restore the Aggregate"""

rows = self.db.query(

"SELECT * FROM events WHERE aggregate_id = %s ORDER BY version",

(aggregate_id,)

)

events = [DomainEvent(**row) for row in rows]

return OrderAggregate.from_events(aggregate_id, events)

5. Saga Pattern -- Distributed Transaction Management

A pattern for managing transactions that span multiple services in microservices.

Choreography Saga (Event-Based)

sequenceDiagram

participant Order as Order Service

participant Kafka

participant Payment as Payment Service

participant Stock as Inventory Service

participant Notify as Notification Service

Order->>Kafka: OrderCreated

Kafka->>Payment: Receive OrderCreated

Payment->>Kafka: PaymentCompleted

Kafka->>Stock: Receive PaymentCompleted

Stock->>Kafka: StockReserved

Kafka->>Notify: Receive StockReserved

Notify->>Notify: Send notification to user

Note over Payment,Stock: What if payment fails?

Payment->>Kafka: PaymentFailed

Kafka->>Order: Receive PaymentFailed

Order->>Order: Cancel order (compensating transaction)

Orchestration Saga (Orchestrator-Based)

graph TB

Orch[Saga Orchestrator]

Orch -->|1. Create Order| Order[Order Service]

Order -->|Success| Orch

Orch -->|2. Request Payment| Payment[Payment Service]

Payment -->|Success| Orch

Orch -->|3. Deduct Stock| Stock[Inventory Service]

Stock -->|Success| Orch

Orch -->|4. Send Notification| Notify[Notification Service]

Stock -->|Failure| Orch

Orch -->|Compensate: Cancel Payment| Payment

Orch -->|Compensate: Cancel Order| Order

Comparison of Both Approaches

| Item | Choreography | Orchestration |

| --------------- | ---------------------------------- | ---------------------------- |

| Central Control | None (each service is autonomous) | Orchestrator controls |

| Coupling | Loose | Depends on orchestrator |

| Debugging | Difficult (requires event tracing) | Relatively easier |

| Complexity | Complex as services increase | Orchestrator becomes complex |

| Best Suited For | Simple flows (3-4 steps) | Complex flows (5+ steps) |

6. Practical Considerations

Idempotency

Since events can be delivered more than once, **idempotency** is essential:

def handle_payment_event(event: dict):

"""Ensure idempotency -- prevent duplicate event processing"""

event_id = event['event_id']

Check if event was already processed

if is_already_processed(event_id):

logger.info(f'Event already processed: {event_id}')

return

Process business logic

process_payment(event['data'])

Record as processed

mark_as_processed(event_id)

Dead Letter Queue (DLQ)

Store failed events in a separate queue:

MAX_RETRIES = 3

def consume_with_retry(event: dict, retry_count: int = 0):

try:

handle_event(event)

except Exception as e:

if retry_count < MAX_RETRIES:

Retry (exponential backoff)

time.sleep(2 ** retry_count)

consume_with_retry(event, retry_count + 1)

else:

Send to DLQ

producer.produce(

topic='orders.dlq',

value=json.dumps({

'original_event': event,

'error': str(e),

'failed_at': datetime.utcnow().isoformat(),

}).encode('utf-8'),

)

Event Schema Evolution

v1 event

{"event_type": "OrderCreated", "version": 1,

"data": {"order_id": "123", "total": 50000}}

v2 event (currency field added)

{"event_type": "OrderCreated", "version": 2,

"data": {"order_id": "123", "total": 50000, "currency": "KRW"}}

Upcaster to convert v1 -> v2

def upcast_order_created_v1_to_v2(event: dict) -> dict:

if event.get('version', 1) == 1:

event['data']['currency'] = 'KRW' # Default value

event['version'] = 2

return event

7. EDA Adoption Checklist

- [ ] Are **synchronous calls** between services causing failure propagation?

- [ ] Is the **read/write ratio** significantly different? Consider CQRS

- [ ] Do you need **audit logs/history tracking**? Consider Event Sourcing

- [ ] Do you have an event **idempotency** handling strategy?

- [ ] Are **DLQ** monitoring and alerting configured?

- [ ] Do you have an event **schema versioning** strategy?

- [ ] Are you managing distributed transactions with the **Saga pattern**?

- [ ] Have you set **partition keys** where event ordering is required?

Quiz

Q1: What is the biggest advantage of Event-Driven Architecture over synchronous approaches?

Loose coupling between services. A failure in one service does not propagate to others, and each

service can scale independently.

Within the same Consumer Group, each partition is assigned to only one consumer. This enables

parallel processing at the partition level while guaranteeing ordering within each partition.

Because the requirements for reading and writing are different. Writes use a normalized model for

data consistency, while reads use a denormalized model for optimized query performance.

By replaying the event sequence stored in the Event Store from the beginning in order to

reconstruct the current state. The Aggregate's from_events() method fulfills this role.

Q5: What is the main difference between Choreography Saga and Orchestration Saga?

Choreography has no central control -- each service autonomously exchanges events to progress.

Orchestration has a central orchestrator that issues commands to each service in order.

The same event can be delivered multiple times due to network errors or consumer restarts. Without

idempotency guarantees, issues like duplicate payments or duplicate inventory deductions can

occur.

It stores events that have failed processing after exceeding the maximum retry count in a separate

queue, allowing them to be manually inspected and reprocessed later.

Events with the same user_id always go to the same partition, ensuring that events for a given

user are processed in order.

In principle, no. Events are immutable, and to change the state, a new compensating event must be

added. This is a core principle of Event Sourcing.

Data consistency between read and write models becomes eventual consistency rather than immediate,

and system complexity increases. It can be over-engineering for simple CRUD applications.

현재 단락 (1/396)

As the number of microservices grows, inter-service communication becomes complex. Services tightly ...

작성 글자: 0원문 글자: 15,333작성 단락: 0/396