- 1. Why Event-Driven Architecture
- 2. CQRS (Command Query Responsibility Segregation)
- 3. Saga Pattern: Distributed Transaction Management
- 4. Event Sourcing
- 5. Full Architecture with Kafka
- 6. Practical Considerations
- 7. Quiz
## 1. Why Event-Driven Architecture

Transitioning from a monolith to a microservices architecture (MSA) introduces the distributed transaction problem:
```mermaid
graph LR
    A[Order Service] -->|Synchronous HTTP call| B[Payment Service]
    B -->|Synchronous HTTP call| C[Inventory Service]
    C -->|Synchronous HTTP call| D[Shipping Service]
    style A fill:#f96,stroke:#333
    style D fill:#6f9,stroke:#333
```
Problems with the synchronous approach:
- If any one service fails, everything fails (cascading failure)
- Tight coupling between services
- Latency increases as the call chain grows
- Difficult to roll back on partial failure
The event-driven alternative decouples services through a message broker:

```mermaid
graph LR
    A[Order Service] -->|Publish event| K[Kafka]
    K -->|Consume event| B[Payment Service]
    K -->|Consume event| C[Inventory Service]
    K -->|Consume event| D[Shipping Service]
    style K fill:#ff9,stroke:#333
```
Benefits of event-driven:
- Loose coupling between services
- Reduced response time through asynchronous processing
- Independent scaling
- State recovery through event replay
## 2. CQRS (Command Query Responsibility Segregation)

### 2.1 Core Concept
A pattern that separates Commands (writes) from Queries (reads):
```mermaid
graph TB
    Client -->|Command: Create Order| CmdAPI[Command API]
    Client -->|Query: Get Order| QryAPI[Query API]
    CmdAPI --> WriteDB[(Write DB<br/>PostgreSQL)]
    WriteDB -->|Publish event| EventBus[Kafka]
    EventBus -->|Consume event| Projector[Projector]
    Projector --> ReadDB[(Read DB<br/>Elasticsearch)]
    QryAPI --> ReadDB
    style CmdAPI fill:#f96,stroke:#333
    style QryAPI fill:#6f9,stroke:#333
    style EventBus fill:#ff9,stroke:#333
```
### 2.2 Why Separate?
| Aspect | Command (Write) | Query (Read) |
|---|---|---|
| Model | Normalized, integrity-first | Denormalized, performance-first |
| Scaling | Vertical scaling | Horizontal scaling (replication) |
| DB | PostgreSQL, MySQL | Elasticsearch, Redis, MongoDB |
| Optimization | Transaction safety | Read performance |
| Ratio | 10-20% of total | 80-90% of total |
### 2.3 Implementation Example
```python
# Command Side
from dataclasses import dataclass
from datetime import datetime, timezone
import uuid

@dataclass
class CreateOrderCommand:
    customer_id: str
    items: list[dict]
    total_amount: float

class OrderCommandHandler:
    def __init__(self, repo, event_publisher):
        self.repo = repo
        self.event_publisher = event_publisher

    def handle_create_order(self, cmd: CreateOrderCommand):
        order = Order(
            id=str(uuid.uuid4()),
            customer_id=cmd.customer_id,
            items=cmd.items,
            total_amount=cmd.total_amount,
            status="PENDING",
            created_at=datetime.now(timezone.utc),  # utcnow() is deprecated
        )
        # Save to Write DB
        self.repo.save(order)
        # Publish event
        self.event_publisher.publish("order.created", {
            "order_id": order.id,
            "customer_id": order.customer_id,
            "total_amount": order.total_amount,
            "items": order.items,
        })
        return order.id

# Query Side
class OrderQueryService:
    def __init__(self, read_repo):
        self.read_repo = read_repo  # Elasticsearch

    def get_order(self, order_id: str):
        return self.read_repo.find_by_id(order_id)

    def search_orders(self, customer_id: str, status: str = None):
        return self.read_repo.search(customer_id=customer_id, status=status)

# Projector (Event -> Read Model synchronization)
class OrderProjector:
    def __init__(self, read_repo):
        self.read_repo = read_repo

    def on_order_created(self, event):
        self.read_repo.upsert({
            "id": event["order_id"],
            "customer_id": event["customer_id"],
            "total_amount": event["total_amount"],
            "status": "PENDING",
            "items": event["items"],
        })

    def on_order_paid(self, event):
        self.read_repo.update(event["order_id"], {"status": "PAID"})
```
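The command, projector, and query pieces above can be wired together in memory to see the whole flow. This is a minimal sketch: `InMemoryReadRepo` is a hypothetical stand-in for Elasticsearch, and the projector is repeated in compressed form so the snippet runs standalone.

```python
# Hypothetical in-memory read store standing in for Elasticsearch
class InMemoryReadRepo:
    def __init__(self):
        self.docs = {}

    def upsert(self, doc):
        self.docs[doc["id"]] = doc

    def update(self, doc_id, fields):
        self.docs[doc_id].update(fields)

    def find_by_id(self, doc_id):
        return self.docs.get(doc_id)

    def search(self, customer_id, status=None):
        return [d for d in self.docs.values()
                if d["customer_id"] == customer_id
                and (status is None or d["status"] == status)]

# Projector mirroring the one above, repeated so this snippet is self-contained
class OrderProjector:
    def __init__(self, read_repo):
        self.read_repo = read_repo

    def on_order_created(self, event):
        self.read_repo.upsert({
            "id": event["order_id"],
            "customer_id": event["customer_id"],
            "total_amount": event["total_amount"],
            "status": "PENDING",
            "items": event["items"],
        })

    def on_order_paid(self, event):
        self.read_repo.update(event["order_id"], {"status": "PAID"})

read_repo = InMemoryReadRepo()
projector = OrderProjector(read_repo)

# Command side would publish these events; here we feed them in directly
projector.on_order_created({
    "order_id": "o-1", "customer_id": "c-1",
    "total_amount": 50000.0, "items": [{"sku": "A", "qty": 1}],
})
projector.on_order_paid({"order_id": "o-1"})

print(read_repo.find_by_id("o-1")["status"])  # PAID
```

In production the projector consumes `order.created` from Kafka, so the read model lags the write model slightly (see Q7 on eventual consistency).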
## 3. Saga Pattern: Distributed Transaction Management

### 3.1 Choreography vs Orchestration
```mermaid
graph TB
    subgraph "Choreography (Event-based)"
        O1[Order] -->|OrderCreated| P1[Payment]
        P1 -->|PaymentCompleted| S1[Inventory]
        S1 -->|StockReserved| D1[Shipping]
        D1 -->|DeliveryStarted| O1
    end
```

```mermaid
graph TB
    subgraph "Orchestration (Central coordination)"
        Orch[Saga Orchestrator]
        Orch -->|1. Payment request| P2[Payment]
        P2 -->|Payment complete| Orch
        Orch -->|2. Reserve inventory| S2[Inventory]
        S2 -->|Reservation complete| Orch
        Orch -->|3. Shipping request| D2[Shipping]
        D2 -->|Shipping started| Orch
    end
```
| Approach | Pros | Cons |
|---|---|---|
| Choreography | Simple, loose coupling | Hard to follow flow, risk of circular dependency |
| Orchestration | Clear flow, easy to manage | Centralized, Orchestrator is a single point of failure |
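The choreography column can be made concrete with a toy example: each service subscribes to the previous service's event and publishes its own, with no central coordinator. The in-process `Bus` below is a hypothetical stand-in for Kafka topics.

```python
# Toy event bus standing in for Kafka topics (hypothetical)
class Bus:
    def __init__(self):
        self.subs = {}

    def subscribe(self, topic, fn):
        self.subs.setdefault(topic, []).append(fn)

    def publish(self, topic, evt):
        for fn in self.subs.get(topic, []):
            fn(evt)

bus = Bus()
log = []

# Each service reacts autonomously to the previous event and emits its own,
# matching the choreography diagram above
bus.subscribe("OrderCreated",
              lambda e: (log.append("payment"), bus.publish("PaymentCompleted", e)))
bus.subscribe("PaymentCompleted",
              lambda e: (log.append("inventory"), bus.publish("StockReserved", e)))
bus.subscribe("StockReserved",
              lambda e: (log.append("shipping"), bus.publish("DeliveryStarted", e)))
bus.subscribe("DeliveryStarted",
              lambda e: log.append("order-completed"))

bus.publish("OrderCreated", {"order_id": "o-1"})
print(log)  # ['payment', 'inventory', 'shipping', 'order-completed']
```

Notice that the flow is only visible by reading every subscription, which is exactly the "hard to follow flow" drawback in the table above.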
### 3.2 Orchestration Saga Implementation
```python
from enum import Enum
from kafka import KafkaProducer
import json

class SagaStep(Enum):
    PAYMENT = "payment"
    INVENTORY = "inventory"
    DELIVERY = "delivery"

class OrderSagaOrchestrator:
    STEPS = [SagaStep.PAYMENT, SagaStep.INVENTORY, SagaStep.DELIVERY]

    def __init__(self, producer: KafkaProducer):
        self.producer = producer
        self.saga_state = {}  # order_id -> {step, status}

    def start_saga(self, order_id: str, order_data: dict):
        self.saga_state[order_id] = {
            "current_step": 0,
            "data": order_data,
            "completed_steps": [],
        }
        self._execute_step(order_id)

    def _execute_step(self, order_id: str):
        state = self.saga_state[order_id]
        step_idx = state["current_step"]
        if step_idx >= len(self.STEPS):
            # All steps completed
            self._publish("order.completed", {"order_id": order_id})
            return
        step = self.STEPS[step_idx]
        self._publish(f"{step.value}.request", {
            "order_id": order_id,
            "saga_id": order_id,
            **state["data"],
        })

    def handle_step_success(self, order_id: str, step: SagaStep):
        state = self.saga_state[order_id]
        state["completed_steps"].append(step)
        state["current_step"] += 1
        self._execute_step(order_id)

    def handle_step_failure(self, order_id: str, failed_step: SagaStep):
        """Execute compensating transactions (reverse order)"""
        state = self.saga_state[order_id]
        for step in reversed(state["completed_steps"]):
            self._publish(f"{step.value}.compensate", {
                "order_id": order_id,
            })
        self._publish("order.failed", {
            "order_id": order_id,
            "failed_at": failed_step.value,
        })

    def _publish(self, topic: str, data: dict):
        self.producer.send(topic, json.dumps(data).encode())
```
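The failure path is worth tracing in isolation. The sketch below reuses the compensation logic from `handle_step_failure` in compressed form; `FakeProducer` is a hypothetical test double that records sends instead of talking to Kafka.

```python
import json
from enum import Enum

class SagaStep(Enum):
    PAYMENT = "payment"
    INVENTORY = "inventory"
    DELIVERY = "delivery"

# Hypothetical test double: records (topic, payload) instead of sending to Kafka
class FakeProducer:
    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, json.loads(value.decode())))

producer = FakeProducer()

# Payment and inventory succeeded; delivery failed
completed = [SagaStep.PAYMENT, SagaStep.INVENTORY]

# Compensate in reverse order, as handle_step_failure does above
for step in reversed(completed):
    producer.send(f"{step.value}.compensate",
                  json.dumps({"order_id": "o-1"}).encode())
producer.send("order.failed",
              json.dumps({"order_id": "o-1",
                          "failed_at": SagaStep.DELIVERY.value}).encode())

print([topic for topic, _ in producer.sent])
# ['inventory.compensate', 'payment.compensate', 'order.failed']
```

Reversing the completed steps matters: inventory was reserved after payment, so it must be released before the payment is refunded.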
### 3.3 Compensating Transaction Example
```mermaid
sequenceDiagram
    participant O as Orchestrator
    participant P as Payment Service
    participant I as Inventory Service
    participant D as Shipping Service
    O->>P: 1. Payment request
    P-->>O: Payment success
    O->>I: 2. Reserve inventory
    I-->>O: Reservation success
    O->>D: 3. Shipping request
    D-->>O: Shipping failed (address error)
    Note over O: Start compensating transactions (reverse order)
    O->>I: Cancel inventory reservation
    I-->>O: Cancellation complete
    O->>P: Refund payment
    P-->>O: Refund complete
    O->>O: Mark order as failed
```
## 4. Event Sourcing

### 4.1 Concept
Instead of storing state directly, store state change events:
```text
Traditional approach:
  orders table: {id: 1, status: "SHIPPED", amount: 50000}

Event Sourcing:
  events table:
    1. OrderCreated    {amount: 50000}
    2. PaymentReceived {payment_id: "pay_123"}
    3. StockReserved   {warehouse: "seoul"}
    4. OrderShipped    {tracking: "KR12345"}

  -> Replay events in order to reconstruct current state
```
### 4.2 Implementation
```python
import json

class EventStore:
    def __init__(self, db):
        self.db = db

    def append(self, aggregate_id: str, event_type: str, data: dict, version: int):
        self.db.execute("""
            INSERT INTO events (aggregate_id, event_type, data, version, created_at)
            VALUES (%s, %s, %s, %s, NOW())
        """, (aggregate_id, event_type, json.dumps(data), version))

    def get_events(self, aggregate_id: str, after_version: int = 0):
        return self.db.query("""
            SELECT event_type, data, version
            FROM events
            WHERE aggregate_id = %s AND version > %s
            ORDER BY version
        """, (aggregate_id, after_version))

class Order:
    def __init__(self):
        self.id = None
        self.status = None
        self.version = 0
        self._pending_events = []

    def apply_event(self, event_type: str, data: dict):
        if event_type == "OrderCreated":
            self.id = data["order_id"]
            self.status = "PENDING"
        elif event_type == "PaymentReceived":
            self.status = "PAID"
        elif event_type == "OrderShipped":
            self.status = "SHIPPED"
        self.version += 1

    @classmethod
    def from_events(cls, events):
        order = cls()
        for e in events:
            order.apply_event(e["event_type"], e["data"])
        return order
```
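Replaying a concrete stream shows how state is reconstructed. The `Order` aggregate is repeated here in minimal form so the snippet runs on its own; the event history matches the example in 4.1.

```python
# Minimal copy of the Order aggregate from 4.2, so this runs standalone
class Order:
    def __init__(self):
        self.id = None
        self.status = None
        self.version = 0

    def apply_event(self, event_type, data):
        if event_type == "OrderCreated":
            self.id = data["order_id"]
            self.status = "PENDING"
        elif event_type == "PaymentReceived":
            self.status = "PAID"
        elif event_type == "OrderShipped":
            self.status = "SHIPPED"
        self.version += 1

    @classmethod
    def from_events(cls, events):
        order = cls()
        for e in events:
            order.apply_event(e["event_type"], e["data"])
        return order

# Event stream as it would come back from EventStore.get_events
history = [
    {"event_type": "OrderCreated",    "data": {"order_id": "o-1"}},
    {"event_type": "PaymentReceived", "data": {"payment_id": "pay_123"}},
    {"event_type": "OrderShipped",    "data": {"tracking": "KR12345"}},
]

order = Order.from_events(history)
print(order.status, order.version)  # SHIPPED 3
```

Because state is derived from the log, replaying only the first two events yields the order as it was when paid; this is the point-in-time recovery mentioned in the quiz.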
## 5. Full Architecture with Kafka
```mermaid
graph TB
    Client[Client] --> Gateway[API Gateway]
    Gateway --> OrderCmd[Order Command API]
    Gateway --> OrderQry[Order Query API]
    OrderCmd --> WriteDB[(PostgreSQL)]
    WriteDB --> CDC[Debezium CDC]
    CDC --> Kafka[Apache Kafka]
    Kafka --> Saga[Saga Orchestrator]
    Kafka --> Projector[CQRS Projector]
    Saga --> PaymentSvc[Payment Service]
    Saga --> InventorySvc[Inventory Service]
    Saga --> DeliverySvc[Shipping Service]
    Projector --> ReadDB[(Elasticsearch)]
    OrderQry --> ReadDB
    PaymentSvc --> Kafka
    InventorySvc --> Kafka
    DeliverySvc --> Kafka
    style Kafka fill:#ff9,stroke:#333
    style Saga fill:#f96,stroke:#333
```
## 6. Practical Considerations

### 6.1 Idempotency
```python
# Preventing duplicate event processing
class IdempotentConsumer:
    def __init__(self, redis_client):
        self.redis = redis_client

    def process_event(self, event_id: str, handler):
        key = f"processed:{event_id}"
        # SET with NX + EX marks the event and sets the TTL atomically,
        # so a crash between the two cannot leave a key without a TTL
        if self.redis.set(key, "1", nx=True, ex=86400):  # 24-hour TTL
            handler()
        # else: already processed, skip
```
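The dedup behavior can be checked quickly with a fake Redis client. `FakeRedis` is a hypothetical in-memory stand-in implementing only the `SET key value NX EX ttl` form, which combines the "mark as processed" write and the TTL in one atomic call.

```python
# Hypothetical in-memory stand-in for a Redis client
class FakeRedis:
    def __init__(self):
        self.store = {}

    def set(self, key, value, nx=False, ex=None):
        # NX: only set if the key does not exist yet (first writer wins)
        if nx and key in self.store:
            return None
        self.store[key] = value  # ex (TTL) is ignored in this toy version
        return True

redis = FakeRedis()
calls = []

def handle():
    calls.append(1)

def process_event(event_id):
    if redis.set(f"processed:{event_id}", "1", nx=True, ex=86400):
        handle()
    # else: duplicate delivery, skip

process_event("evt-1")
process_event("evt-1")  # redelivered by a retry or rebalance
print(len(calls))  # 1
```

The handler runs exactly once even though the event arrived twice, which is the property that prevents duplicate payments or double inventory deductions.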
### 6.2 Event Ordering Guarantees
```python
# Guarantee ordering with Kafka partition keys
producer.send(
    "order.events",
    key=order_id.encode(),  # Same order -> same partition -> order guaranteed
    value=json.dumps(event).encode(),
)
```
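The reason the key guarantees ordering is that the producer maps each key to a partition deterministically. Kafka's default partitioner uses a murmur2 hash of the key modulo the partition count; the sketch below uses a plain MD5 hash just to illustrate the idea, not Kafka's actual algorithm.

```python
import hashlib

def partition_for(key: bytes, num_partitions: int) -> int:
    # Illustrative only: Kafka's default partitioner uses murmur2, not MD5
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

p1 = partition_for(b"order-42", 12)
p2 = partition_for(b"order-42", 12)
assert p1 == p2  # same key always lands on the same partition

# Different keys may land on any partition, so Kafka gives per-key
# (per-partition) ordering, not global ordering across the topic.
```

This also means ordering breaks if the partition count changes after the fact, since the modulo result shifts; pick the partition count with that in mind.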
## 7. Quiz
Q1. What is the primary reason for separating Command and Query in CQRS?
Because the requirements for reads and writes are fundamentally different. Writes need normalization + transactions, while reads need denormalization + performance optimization. Each can use its own optimized storage and model.
Q2. What is the difference between Saga Choreography and Orchestration?
Choreography: Each service autonomously publishes/subscribes to events. Orchestration: A central Orchestrator explicitly commands each service. Orchestration offers better flow visibility and easier management.
Q3. What is a compensating transaction?
When a step in the Saga fails, it reverses already completed steps in reverse order. For example: Shipping fails, then cancel inventory reservation, then refund payment.
Q4. What is the biggest advantage of Event Sourcing?
(1) Complete audit trail (2) Point-in-time recovery through event replay (3) Ability to build new Read Models from historical events.
Q5. Why is idempotency important in event processing?
Due to network failures, retries, and rebalancing, the same event may be delivered multiple times. Without idempotency, serious issues like duplicate payments or duplicate inventory deductions can occur.
Q6. How do you guarantee event ordering in Kafka?
Use the same partition key. Messages with the same key are delivered to the same partition, guaranteeing order. For example, use order_id as the partition key.
Q7. How do you handle the Read DB lagging behind the Write DB in CQRS?
This is eventual consistency: a query issued right after a write may see stale data. Mitigations: (1) read your own writes from the Write DB immediately after writing, or (2) respond to the client only after confirming the projection has been applied.