- Introduction
- 1. Event-Driven Architecture Overview
- 2. Apache Kafka Basics
- 3. CQRS (Command Query Responsibility Segregation)
- 4. Event Sourcing -- Events Are the Data
- 5. Saga Pattern -- Distributed Transaction Management
- 6. Practical Considerations
- 7. EDA Adoption Checklist
- Quiz

## Introduction
As the number of microservices grows, inter-service communication becomes complex. Services tightly coupled through synchronous calls suffer cascading failures when one of them goes down. Event-Driven Architecture (EDA) addresses this problem with asynchronous events. In this article, we cover the core EDA patterns -- CQRS, Event Sourcing, and the Saga pattern -- with practical code centered on Kafka.
## 1. Event-Driven Architecture Overview

### Synchronous vs Asynchronous Communication

```mermaid
graph LR
    subgraph "Synchronous (REST)"
        A[Order Service] -->|HTTP Call| B[Payment Service]
        B -->|HTTP Call| C[Inventory Service]
        C -->|HTTP Call| D[Notification Service]
    end
```
In the synchronous approach, if the Payment Service goes down, the order also fails. All services are tightly coupled.
```mermaid
graph LR
    subgraph "Asynchronous (Event-Driven)"
        A2[Order Service] -->|Publish Event| K[Kafka]
        K -->|Subscribe| B2[Payment Service]
        K -->|Subscribe| C2[Inventory Service]
        K -->|Subscribe| D2[Notification Service]
    end
```
In the asynchronous approach, the Order Service only needs to publish an event. Each service consumes events independently.
### Core Components of EDA
| Component | Role | Example |
|---|---|---|
| Producer | Creates and publishes events | Order Service |
| Event Broker | Stores and delivers events | Apache Kafka |
| Consumer | Subscribes to and processes events | Payment, Inventory, Notification Services |
| Event | A message representing a fact that occurred | OrderCreated, PaymentCompleted |
## 2. Apache Kafka Basics

### Kafka Architecture

```mermaid
graph TB
    P1[Producer 1] --> T[Topic: orders]
    P2[Producer 2] --> T
    T --> PA0[Partition 0]
    T --> PA1[Partition 1]
    T --> PA2[Partition 2]
    PA0 --> C1[Consumer Group A - Consumer 1]
    PA1 --> C2[Consumer Group A - Consumer 2]
    PA2 --> C3[Consumer Group A - Consumer 3]
    PA0 --> C4[Consumer Group B - Consumer 1]
    PA1 --> C4
    PA2 --> C4
```
### Key Concepts

- Topic: A logical channel where events are stored (e.g., `orders`, `payments`)
- Partition: The unit of parallel processing within a topic; ordering is guaranteed per partition via the partition key
- Consumer Group: Consumers in the same group are each assigned a share of the partitions for parallel processing
- Offset: The sequence number of a message within a partition; tracks how far a consumer has read
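To make the partition-key idea concrete, here is a small illustrative sketch: a deterministic hash maps each key to a partition index, so all events with the same key always land on the same partition. Note that Kafka's default partitioner actually uses murmur2 hashing; the hash function below is only a stand-in to show the principle.

```python
import hashlib

def assign_partition(key: str, num_partitions: int) -> int:
    """Map a partition key to a partition index deterministically.
    (Illustration only; Kafka's default partitioner uses murmur2.)"""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Same key -> same partition, so per-key ordering is preserved.
p1 = assign_partition("user-42", 3)
p2 = assign_partition("user-42", 3)
assert p1 == p2
```

Because the mapping is stable, all events for `user-42` are consumed in order by whichever consumer owns that partition.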
### Kafka Producer Implementation (Python)

```python
import json
import uuid
from datetime import datetime

from confluent_kafka import Producer

config = {
    'bootstrap.servers': 'kafka-broker:9092',
    'client.id': 'order-service',
    'acks': 'all',  # Confirm written to all replicas
    'retries': 3,
    'retry.backoff.ms': 1000,
}

producer = Producer(config)

def delivery_callback(err, msg):
    if err:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivery succeeded: {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

def publish_order_event(order: dict):
    """Publish an order event"""
    event = {
        'event_type': 'OrderCreated',
        'event_id': str(uuid.uuid4()),
        'timestamp': datetime.utcnow().isoformat(),
        'data': {
            'order_id': order['id'],
            'user_id': order['user_id'],
            'items': order['items'],
            'total_amount': order['total'],
        }
    }
    # Partition key = user_id -> orders from the same user go to the same partition
    producer.produce(
        topic='orders',
        key=str(order['user_id']).encode('utf-8'),
        value=json.dumps(event).encode('utf-8'),
        callback=delivery_callback,
    )
    producer.flush()
```
### Kafka Consumer Implementation (Python)

```python
import json

from confluent_kafka import Consumer

# send_to_dlq, create_payment, and publish_payment_event are
# service-specific helpers assumed to be defined elsewhere.

config = {
    'bootstrap.servers': 'kafka-broker:9092',
    'group.id': 'payment-service',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # Manual commit for at-least-once guarantee
}

consumer = Consumer(config)
consumer.subscribe(['orders'])

def process_events():
    """Event consumption loop"""
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'Error: {msg.error()}')
            continue
        event = json.loads(msg.value().decode('utf-8'))
        try:
            if event['event_type'] == 'OrderCreated':
                handle_order_created(event['data'])
            # Manual commit after successful processing
            consumer.commit(msg)
        except Exception as e:
            print(f'Processing failed: {e}')
            # Send to DLQ (Dead Letter Queue)
            send_to_dlq(event, str(e))

def handle_order_created(data: dict):
    """Handle order created event -> initiate payment"""
    payment = create_payment(
        order_id=data['order_id'],
        amount=data['total_amount'],
    )
    # Publish payment completed event
    publish_payment_event(payment)
```
## 3. CQRS (Command Query Responsibility Segregation)
CQRS is a pattern that separates the write (Command) and read (Query) models.
### CQRS Architecture Flow

```mermaid
graph TB
    Client[Client]
    Client -->|Command| CmdAPI[Command API]
    Client -->|Query| QryAPI[Query API]
    CmdAPI --> WriteDB[(Write DB - PostgreSQL)]
    CmdAPI -->|Publish Event| Kafka[Kafka]
    Kafka -->|Consume Event| Projector[Projector / Denormalizer]
    Projector --> ReadDB[(Read DB - Elasticsearch/Redis)]
    QryAPI --> ReadDB
```
### Why CQRS?
| Traditional Approach | CQRS |
|---|---|
| Single model for reads and writes | Separate read/write models |
| Read performance degraded by complex JOINs | Denormalized model optimized for reads |
| Difficult to scale | Independent scaling of reads/writes |
| Suitable for simple domains | Suitable for complex domains |
### CQRS Implementation Example

```python
import uuid

# CreateOrderCommand, Order, OrderCreatedEvent, InsufficientStockError and
# the DB / publisher clients are assumed to be defined elsewhere in the service.

# === Command Side (Write) ===
class OrderCommandHandler:
    def __init__(self, db, event_publisher):
        self.db = db
        self.publisher = event_publisher

    def create_order(self, cmd: CreateOrderCommand):
        """Handle create order command"""
        # Business logic validation
        if not self._validate_stock(cmd.items):
            raise InsufficientStockError()
        # Save to Write DB
        order = Order(
            id=uuid.uuid4(),
            user_id=cmd.user_id,
            items=cmd.items,
            status='CREATED',
        )
        self.db.save(order)
        # Publish event (asynchronously update Read Model)
        self.publisher.publish('orders', OrderCreatedEvent(
            order_id=order.id,
            user_id=order.user_id,
            items=order.items,
            total=order.total,
            created_at=order.created_at,
        ))
        return order.id

# === Projector (Event -> Read Model Transformation) ===
class OrderProjector:
    def __init__(self, read_db):
        self.read_db = read_db  # Elasticsearch or Redis

    def handle_order_created(self, event: OrderCreatedEvent):
        """Order created event -> save denormalized data to the read model"""
        user = self.get_user_info(event.user_id)
        # Denormalized read model (no JOINs needed!)
        order_view = {
            'order_id': str(event.order_id),
            'user_name': user.name,
            'user_email': user.email,
            'items': [
                {'name': item.name, 'qty': item.qty, 'price': item.price}
                for item in event.items
            ],
            'total': event.total,
            'status': 'CREATED',
            'created_at': event.created_at.isoformat(),
        }
        self.read_db.index('orders', order_view)

# === Query Side (Read) ===
class OrderQueryHandler:
    def __init__(self, read_db):
        self.read_db = read_db

    def get_user_orders(self, user_id: str, page: int = 1):
        """Retrieve a user's order list (served directly from the Read Model)"""
        return self.read_db.search('orders', {
            'query': {'term': {'user_id': user_id}},
            'sort': [{'created_at': 'desc'}],
            'from': (page - 1) * 20,
            'size': 20,
        })
```
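To see the whole command -> event -> projection -> query flow end to end, here is a minimal, self-contained sketch that swaps PostgreSQL, Kafka, and Elasticsearch for plain dicts. All names here are illustrative; the point it demonstrates is that the read model lags the write model until the projector runs (eventual consistency).

```python
write_db: dict[str, dict] = {}   # stands in for the normalized write DB
read_db: dict[str, dict] = {}    # stands in for the denormalized read DB

def handle_create_order(order_id: str, user_id: str, total: int) -> dict:
    """Command side: persist to the write model, then emit an event."""
    write_db[order_id] = {"user_id": user_id, "total": total, "status": "CREATED"}
    return {"event_type": "OrderCreated", "order_id": order_id,
            "user_id": user_id, "total": total}

def project(event: dict) -> None:
    """Projector: build the read-optimized view from the event."""
    if event["event_type"] == "OrderCreated":
        read_db[event["order_id"]] = {
            "order_id": event["order_id"],
            "user_id": event["user_id"],
            "total": event["total"],
            "status": "CREATED",
        }

def get_order(order_id: str):
    """Query side: reads hit only the read model."""
    return read_db.get(order_id)

event = handle_create_order("o-1", "u-100", 50000)
assert get_order("o-1") is None  # read model lags until the event is projected
project(event)
assert get_order("o-1")["total"] == 50000
```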
## 4. Event Sourcing -- Events Are the Data
Event Sourcing is a pattern that stores the sequence of state change events instead of storing the current state.
### Event Sourcing Flow

```mermaid
sequenceDiagram
    participant Client
    participant OrderAggregate
    participant EventStore
    participant Projector
    participant ReadDB
    Client->>OrderAggregate: CreateOrder Command
    OrderAggregate->>EventStore: Store OrderCreated Event
    EventStore->>Projector: Propagate Event
    Projector->>ReadDB: Update Read Model
    Client->>OrderAggregate: PayOrder Command
    OrderAggregate->>EventStore: Store OrderPaid Event
    EventStore->>Projector: Propagate Event
    Projector->>ReadDB: Update Read Model
    Note over EventStore: Events are never deleted or modified!
```
### Traditional CRUD vs Event Sourcing

```text
# Traditional CRUD -- stores only the current state
orders table:
| id | user_id | status  | total |
|----|---------|---------|-------|
| 1  | 100     | SHIPPED | 50000 |
-> "When was it ordered, when was it paid, when was it shipped?" Unknown!

# Event Sourcing -- preserves the entire change history
events table:
| seq | aggregate_id | type            | data                         | timestamp        |
|-----|--------------|-----------------|------------------------------|------------------|
| 1   | order-1      | OrderCreated    | {items: [...], total: 50000} | 2026-03-02 10:00 |
| 2   | order-1      | PaymentReceived | {method: "card"}             | 2026-03-02 10:05 |
| 3   | order-1      | OrderShipped    | {tracking: "KR123"}          | 2026-03-02 14:30 |
-> The entire history can be reconstructed!
```
### Event Sourcing Implementation

```python
import json
import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import List

# === Event Definitions ===
@dataclass
class DomainEvent:
    event_id: str
    aggregate_id: str
    event_type: str
    data: dict
    timestamp: datetime
    version: int

# === Aggregate (Domain Object) ===
class OrderAggregate:
    def __init__(self, order_id: str):
        self.id = order_id
        self.status = None
        self.items = []
        self.total = 0
        self.version = 0
        self._pending_events: List[DomainEvent] = []

    # --- Command handlers ---
    def create(self, user_id: str, items: list):
        """Create order command"""
        if self.status is not None:
            raise Exception("Order already exists")
        # Create event (no direct state change; only record the event)
        self._apply_event(DomainEvent(
            event_id=str(uuid.uuid4()),
            aggregate_id=self.id,
            event_type='OrderCreated',
            data={'user_id': user_id, 'items': items,
                  'total': sum(i['price'] * i['qty'] for i in items)},
            timestamp=datetime.utcnow(),
            version=self.version + 1,
        ))

    def pay(self, payment_method: str):
        """Payment command"""
        if self.status != 'CREATED':
            raise Exception(f"Cannot process payment in state: {self.status}")
        self._apply_event(DomainEvent(
            event_id=str(uuid.uuid4()),
            aggregate_id=self.id,
            event_type='OrderPaid',
            data={'payment_method': payment_method},
            timestamp=datetime.utcnow(),
            version=self.version + 1,
        ))

    # --- Event handlers (state changes happen only here!) ---
    def _on_order_created(self, event: DomainEvent):
        self.status = 'CREATED'
        self.items = event.data['items']
        self.total = event.data['total']

    def _on_order_paid(self, event: DomainEvent):
        self.status = 'PAID'

    def _apply_event(self, event: DomainEvent):
        """Apply event (state change + event storage)"""
        handler = {
            'OrderCreated': self._on_order_created,
            'OrderPaid': self._on_order_paid,
        }.get(event.event_type)
        if handler:
            handler(event)
        self.version = event.version
        self._pending_events.append(event)

    @classmethod
    def from_events(cls, order_id: str, events: List[DomainEvent]):
        """Restore current state from the event sequence"""
        aggregate = cls(order_id)
        for event in events:
            aggregate._apply_event(event)
        aggregate._pending_events.clear()  # already-persisted events
        return aggregate

# === Event Store ===
class EventStore:
    def __init__(self, db):
        self.db = db

    def save(self, aggregate: OrderAggregate):
        """Write unsaved events to the Event Store"""
        for event in aggregate._pending_events:
            self.db.execute(
                """INSERT INTO events
                   (event_id, aggregate_id, event_type, data, timestamp, version)
                   VALUES (%s, %s, %s, %s, %s, %s)""",
                (event.event_id, event.aggregate_id, event.event_type,
                 json.dumps(event.data), event.timestamp, event.version)
            )
        aggregate._pending_events.clear()

    def load(self, aggregate_id: str) -> OrderAggregate:
        """Read events from the Event Store and restore the Aggregate"""
        rows = self.db.query(
            "SELECT * FROM events WHERE aggregate_id = %s ORDER BY version",
            (aggregate_id,)
        )
        events = [DomainEvent(**row) for row in rows]
        return OrderAggregate.from_events(aggregate_id, events)
```
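The heart of the pattern is the replay in `from_events()`: folding the event sequence into the current state. A condensed, self-contained version of that fold, with a simplified `Event` type independent of the classes above, looks like this:

```python
from dataclasses import dataclass

@dataclass
class Event:
    event_type: str
    data: dict

def replay(events: list) -> dict:
    """Fold the event sequence into the current state, as from_events() does."""
    state = {"status": None, "total": 0}
    for e in events:
        if e.event_type == "OrderCreated":
            state["status"] = "CREATED"
            state["total"] = e.data["total"]
        elif e.event_type == "OrderPaid":
            state["status"] = "PAID"
    return state

history = [
    Event("OrderCreated", {"total": 50000}),
    Event("OrderPaid", {"payment_method": "card"}),
]
assert replay(history) == {"status": "PAID", "total": 50000}
```

Because the fold is deterministic, replaying the same history always yields the same state, which is what makes audit, debugging, and rebuilding read models possible.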
## 5. Saga Pattern -- Distributed Transaction Management

The Saga pattern manages transactions that span multiple services in a microservices architecture.
### Choreography Saga (Event-Based)

```mermaid
sequenceDiagram
    participant Order as Order Service
    participant Kafka
    participant Payment as Payment Service
    participant Stock as Inventory Service
    participant Notify as Notification Service
    Order->>Kafka: OrderCreated
    Kafka->>Payment: Receive OrderCreated
    Payment->>Kafka: PaymentCompleted
    Kafka->>Stock: Receive PaymentCompleted
    Stock->>Kafka: StockReserved
    Kafka->>Notify: Receive StockReserved
    Notify->>Notify: Send notification to user
    Note over Payment,Stock: What if payment fails?
    Payment->>Kafka: PaymentFailed
    Kafka->>Order: Receive PaymentFailed
    Order->>Order: Cancel order (compensating transaction)
```
### Orchestration Saga (Orchestrator-Based)

```mermaid
graph TB
    Orch[Saga Orchestrator]
    Orch -->|1. Create Order| Order[Order Service]
    Order -->|Success| Orch
    Orch -->|2. Request Payment| Payment[Payment Service]
    Payment -->|Success| Orch
    Orch -->|3. Deduct Stock| Stock[Inventory Service]
    Stock -->|Success| Orch
    Orch -->|4. Send Notification| Notify[Notification Service]
    Stock -->|Failure| Orch
    Orch -->|Compensate: Cancel Payment| Payment
    Orch -->|Compensate: Cancel Order| Order
```
### Comparison of the Two Approaches
| Item | Choreography | Orchestration |
|---|---|---|
| Central Control | None (each service is autonomous) | Orchestrator controls |
| Coupling | Loose | Depends on orchestrator |
| Debugging | Difficult (requires event tracing) | Relatively easier |
| Complexity | Complex as services increase | Orchestrator becomes complex |
| Best Suited For | Simple flows (3-4 steps) | Complex flows (5+ steps) |
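As a rough sketch of how an orchestrator tracks completed steps and runs their compensations in reverse order on failure (the service calls are stand-in functions, not real HTTP or Kafka calls):

```python
def run_saga(steps):
    """steps: list of (action, compensation) pairs of callables."""
    completed = []
    for action, compensate in steps:
        try:
            action()
            completed.append(compensate)
        except Exception:
            # Run compensating transactions for completed steps, newest first
            for comp in reversed(completed):
                comp()
            return False
    return True

log = []

def deduct_stock():
    raise RuntimeError("out of stock")  # step 3 fails

steps = [
    (lambda: log.append("order created"), lambda: log.append("order cancelled")),
    (lambda: log.append("payment taken"), lambda: log.append("payment refunded")),
    (deduct_stock, lambda: None),
]

assert run_saga(steps) is False
assert log == ["order created", "payment taken",
               "payment refunded", "order cancelled"]
```

The reverse-order compensation mirrors how a database rolls back a transaction: undo the most recent change first.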
## 6. Practical Considerations

### Idempotency
Since events can be delivered more than once, idempotency is essential:
```python
def handle_payment_event(event: dict):
    """Ensure idempotency -- prevent duplicate event processing"""
    event_id = event['event_id']
    # Check whether the event was already processed
    if is_already_processed(event_id):
        logger.info(f'Event already processed: {event_id}')
        return
    # Process business logic
    process_payment(event['data'])
    # Record as processed
    mark_as_processed(event_id)
```
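The helpers `is_already_processed()` and `mark_as_processed()` above are assumed. One possible shape, using an in-memory set where production code would use a durable store such as a Redis set or a uniquely-keyed table:

```python
# In-memory stand-in for a durable processed-event store.
_processed_ids: set = set()

def is_already_processed(event_id: str) -> bool:
    return event_id in _processed_ids

def mark_as_processed(event_id: str) -> None:
    _processed_ids.add(event_id)

calls = []

def handle(event: dict) -> None:
    if is_already_processed(event["event_id"]):
        return  # duplicate delivery: skip
    calls.append(event["event_id"])  # stands in for the business logic
    mark_as_processed(event["event_id"])

evt = {"event_id": "e-1"}
handle(evt)
handle(evt)  # redelivered
assert calls == ["e-1"]  # processed exactly once
```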
### Dead Letter Queue (DLQ)

Store failed events in a separate queue:

```python
import json
import time
from datetime import datetime

# handle_event and producer are assumed to be defined as in the
# consumer/producer examples above.

MAX_RETRIES = 3

def consume_with_retry(event: dict, retry_count: int = 0):
    try:
        handle_event(event)
    except Exception as e:
        if retry_count < MAX_RETRIES:
            # Retry with exponential backoff
            time.sleep(2 ** retry_count)
            consume_with_retry(event, retry_count + 1)
        else:
            # Send to the DLQ
            producer.produce(
                topic='orders.dlq',
                value=json.dumps({
                    'original_event': event,
                    'error': str(e),
                    'failed_at': datetime.utcnow().isoformat(),
                }).encode('utf-8'),
            )
```
### Event Schema Evolution

```python
# v1 event
{"event_type": "OrderCreated", "version": 1,
 "data": {"order_id": "123", "total": 50000}}

# v2 event (currency field added)
{"event_type": "OrderCreated", "version": 2,
 "data": {"order_id": "123", "total": 50000, "currency": "KRW"}}

# Upcaster to convert v1 -> v2
def upcast_order_created_v1_to_v2(event: dict) -> dict:
    if event.get('version', 1) == 1:
        event['data']['currency'] = 'KRW'  # Default value
        event['version'] = 2
    return event
```
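Upcasters compose: each one lifts an event a single version, and a small loop applies them until the event reaches the latest schema. The v3 upcaster and its `channel` field below are hypothetical, added only to show the chaining:

```python
def upcast_v1_to_v2(event: dict) -> dict:
    event["data"]["currency"] = "KRW"  # default for pre-currency events
    event["version"] = 2
    return event

def upcast_v2_to_v3(event: dict) -> dict:
    event["data"]["channel"] = "web"   # hypothetical v3 field
    event["version"] = 3
    return event

UPCASTERS = {1: upcast_v1_to_v2, 2: upcast_v2_to_v3}

def upcast(event: dict) -> dict:
    """Apply upcasters until the event is at the latest version."""
    while event.get("version", 1) in UPCASTERS:
        event = UPCASTERS[event.get("version", 1)](event)
    return event

v1 = {"event_type": "OrderCreated", "version": 1,
      "data": {"order_id": "123", "total": 50000}}
latest = upcast(v1)
assert latest["version"] == 3
assert latest["data"]["currency"] == "KRW"
```

Keeping each upcaster single-step means old stored events never need rewriting; they are lifted on read.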
## 7. EDA Adoption Checklist
- Are synchronous calls between services causing failure propagation?
- Is the read/write ratio significantly different? Consider CQRS
- Do you need audit logs/history tracking? Consider Event Sourcing
- Do you have an event idempotency handling strategy?
- Are DLQ monitoring and alerting configured?
- Do you have an event schema versioning strategy?
- Are you managing distributed transactions with the Saga pattern?
- Have you set partition keys where event ordering is required?
## Quiz

**Q1: What is the biggest advantage of Event-Driven Architecture over synchronous approaches?**

Loose coupling between services. A failure in one service does not propagate to others, and each service can scale independently.

**Q2: How do consumers within the same Consumer Group behave in Kafka?**

Within the same Consumer Group, each partition is assigned to only one consumer. This enables parallel processing at the partition level while guaranteeing ordering within each partition.

**Q3: Why does CQRS separate the read and write models?**

Because the requirements for reading and writing are different. Writes use a normalized model for data consistency, while reads use a denormalized model for optimized query performance.

**Q4: How is the current state restored in Event Sourcing?**

By replaying the event sequence stored in the Event Store from the beginning in order to reconstruct the current state. The Aggregate's from_events() method fulfills this role.

**Q5: What is the main difference between Choreography Saga and Orchestration Saga?**

Choreography has no central control -- each service autonomously exchanges events to progress. Orchestration has a central orchestrator that issues commands to each service in order.

**Q6: Why is idempotency important in event processing?**

The same event can be delivered multiple times due to network errors or consumer restarts. Without idempotency guarantees, issues like duplicate payments or duplicate inventory deductions can occur.

**Q7: What is the role of a Dead Letter Queue (DLQ)?**

It stores events that have failed processing after exceeding the maximum retry count in a separate queue, allowing them to be manually inspected and reprocessed later.

**Q8: Why set the partition key to user_id in Kafka?**

Events with the same user_id always go to the same partition, ensuring that events for a given user are processed in order.

**Q9: Can events in Event Sourcing be modified or deleted?**

In principle, no. Events are immutable, and to change the state, a new compensating event must be added. This is a core principle of Event Sourcing.

**Q10: What are the drawbacks of CQRS?**

Data consistency between read and write models becomes eventual consistency rather than immediate, and system complexity increases. It can be over-engineering for simple CRUD applications.