1. Why Event-Driven Architecture
Transitioning from a monolith to MSA introduces the **distributed transaction** problem:
```mermaid
graph LR
    A[Order Service] -->|Synchronous HTTP call| B[Payment Service]
    B -->|Synchronous HTTP call| C[Inventory Service]
    C -->|Synchronous HTTP call| D[Shipping Service]
    style A fill:#f96,stroke:#333
    style D fill:#6f9,stroke:#333
```
**Problems with the synchronous approach:**
- If any one service fails, everything fails (cascading failure)
- Tight coupling between services
- Latency increases as the call chain grows
- Difficult to roll back on partial failure
```mermaid
graph LR
    A[Order Service] -->|Publish event| K[Kafka]
    K -->|Consume event| B[Payment Service]
    K -->|Consume event| C[Inventory Service]
    K -->|Consume event| D[Shipping Service]
    style K fill:#ff9,stroke:#333
```
**Benefits of event-driven:**
- Loose coupling between services
- Reduced response time through asynchronous processing
- Independent scaling
- State recovery through event replay
2. CQRS (Command Query Responsibility Segregation)
2.1 Core Concept
A pattern that separates **Commands** (writes) from **Queries** (reads):
```mermaid
graph TB
    Client -->|Command: Create Order| CmdAPI[Command API]
    Client -->|Query: Get Order| QryAPI[Query API]
    CmdAPI --> WriteDB[(Write DB<br/>PostgreSQL)]
    WriteDB -->|Publish event| EventBus[Kafka]
    EventBus -->|Consume event| Projector[Projector]
    Projector --> ReadDB[(Read DB<br/>Elasticsearch)]
    QryAPI --> ReadDB
    style CmdAPI fill:#f96,stroke:#333
    style QryAPI fill:#6f9,stroke:#333
    style EventBus fill:#ff9,stroke:#333
```
2.2 Why Separate?
| Aspect | Command (Write) | Query (Read) |
| ------------ | --------------------------- | -------------------------------- |
| Model | Normalized, integrity-first | Denormalized, performance-first |
| Scaling | Vertical scaling | Horizontal scaling (replication) |
| DB | PostgreSQL, MySQL | Elasticsearch, Redis, MongoDB |
| Optimization | Transaction safety | Read performance |
| Typical traffic share | ~10-20% of requests | ~80-90% of requests |
2.3 Implementation Example
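Command Side

```python
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class CreateOrderCommand:
    customer_id: str
    items: list[dict]
    total_amount: float


@dataclass
class Order:
    """Write-side aggregate (minimal definition so the example is self-contained)."""
    id: str
    customer_id: str
    items: list[dict]
    total_amount: float
    status: str
    created_at: datetime


class OrderCommandHandler:
    def __init__(self, repo, event_publisher):
        self.repo = repo                        # Write DB repository (PostgreSQL)
        self.event_publisher = event_publisher  # e.g. a wrapper around a Kafka producer

    def handle_create_order(self, cmd: CreateOrderCommand) -> str:
        order = Order(
            id=str(uuid.uuid4()),
            customer_id=cmd.customer_id,
            items=cmd.items,
            total_amount=cmd.total_amount,
            status="PENDING",
            created_at=datetime.now(timezone.utc),
        )
        # Save to Write DB
        self.repo.save(order)
        # Publish event (note: save + publish is a dual write; a crash in between
        # loses the event, which the CDC approach in Section 5 avoids)
        self.event_publisher.publish("order.created", {
            "order_id": order.id,
            "customer_id": order.customer_id,
            "total_amount": order.total_amount,
            "items": order.items,
        })
        return order.id
```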
Query Side

```python
from typing import Optional


class OrderQueryService:
    def __init__(self, read_repo):
        self.read_repo = read_repo  # Elasticsearch-backed read repository

    def get_order(self, order_id: str):
        return self.read_repo.find_by_id(order_id)

    def search_orders(self, customer_id: str, status: Optional[str] = None):
        return self.read_repo.search(customer_id=customer_id, status=status)
```
Projector (Event -> Read Model synchronization)

```python
class OrderProjector:
    def __init__(self, read_repo):
        self.read_repo = read_repo

    def on_order_created(self, event):
        # Build the denormalized read document from the event payload
        self.read_repo.upsert({
            "id": event["order_id"],
            "customer_id": event["customer_id"],
            "total_amount": event["total_amount"],
            "status": "PENDING",
            "items": event["items"],
        })

    def on_order_paid(self, event):
        self.read_repo.update(event["order_id"], {"status": "PAID"})
```
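To see how the three pieces interact, here is a minimal in-memory sketch, assuming plain dicts stand in for PostgreSQL and Elasticsearch and a queue stands in for Kafka. All names here (`InMemoryBus`, `create_order`, `project_order_created`, `get_order`) are hypothetical. The read model only changes when the projector drains the queue, which makes the eventual-consistency window visible.

```python
import uuid
from collections import deque


class InMemoryBus:
    """Hypothetical stand-in for Kafka: events wait in a queue until drained."""

    def __init__(self):
        self.queue = deque()
        self.handlers = {}

    def subscribe(self, topic, handler):
        self.handlers.setdefault(topic, []).append(handler)

    def publish(self, topic, payload):
        self.queue.append((topic, payload))

    def drain(self):
        # Deliver queued events in publish order (simulates consumer lag catching up)
        while self.queue:
            topic, payload = self.queue.popleft()
            for handler in self.handlers.get(topic, []):
                handler(payload)


write_db = {}  # order_id -> normalized row (stands in for PostgreSQL)
read_db = {}   # order_id -> denormalized view (stands in for Elasticsearch)
bus = InMemoryBus()


def create_order(customer_id, items):
    """Command side: write to the write store, then publish an event."""
    order_id = str(uuid.uuid4())
    total = sum(i["price"] * i["qty"] for i in items)
    write_db[order_id] = {"customer_id": customer_id, "items": items,
                          "total_amount": total, "status": "PENDING"}
    bus.publish("order.created", {"order_id": order_id, "customer_id": customer_id,
                                  "total_amount": total, "items": items})
    return order_id


def project_order_created(event):
    """Projector: build the read model from the event."""
    read_db[event["order_id"]] = {**event, "status": "PENDING",
                                  "item_count": len(event["items"])}


def get_order(order_id):
    """Query side: reads only from the read store."""
    return read_db.get(order_id)


bus.subscribe("order.created", project_order_created)

oid = create_order("cust-1", [{"sku": "A", "price": 10.0, "qty": 2}])
print(get_order(oid))  # None: the projector has not consumed the event yet
bus.drain()
print(get_order(oid)["status"], get_order(oid)["total_amount"])  # PENDING 20.0
```

The `None` on the first query is the same staleness a real Kafka-plus-Elasticsearch pipeline exhibits for the duration of consumer lag.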
3. Saga Pattern: Distributed Transaction Management
3.1 Choreography vs Orchestration
```mermaid
graph TB
    subgraph "Choreography (Event-based)"
        O1[Order] -->|OrderCreated| P1[Payment]
        P1 -->|PaymentCompleted| S1[Inventory]
        S1 -->|StockReserved| D1[Shipping]
        D1 -->|DeliveryStarted| O1
    end
```

```mermaid
graph TB
    subgraph "Orchestration (Central coordination)"
        Orch[Saga Orchestrator]
        Orch -->|1. Payment request| P2[Payment]
        P2 -->|Payment complete| Orch
        Orch -->|2. Reserve inventory| S2[Inventory]
        S2 -->|Reservation complete| Orch
        Orch -->|3. Shipping request| D2[Shipping]
        D2 -->|Shipping started| Orch
    end
```
| Approach | Pros | Cons |
| ------------- | -------------------------- | ------------------------------------------------------ |
| Choreography | Simple, loose coupling | Hard to follow flow, risk of circular dependency |
| Orchestration | Clear flow, easy to manage | Centralized logic; the Orchestrator can become a single point of failure |
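In choreography there is no coordinator: each service reacts to the previous service's event and emits its own. A minimal synchronous sketch of the chain in the diagram, assuming a hypothetical in-memory `EventBus` in place of Kafka topics:

```python
from collections import defaultdict


class EventBus:
    """Hypothetical synchronous stand-in for Kafka topics."""

    def __init__(self):
        self.subscribers = defaultdict(list)
        self.log = []  # every published event type, in publish order

    def subscribe(self, event_type, handler):
        self.subscribers[event_type].append(handler)

    def publish(self, event_type, payload):
        self.log.append(event_type)
        for handler in self.subscribers[event_type]:
            handler(payload)


bus = EventBus()
order_status = {"o-1": "PENDING"}


def on_delivery_started(event):
    # The Order service closes the loop once shipping has begun
    order_status[event["order_id"]] = "SHIPPING"


# Each service only knows the event it reacts to and the event it emits.
bus.subscribe("OrderCreated", lambda e: bus.publish("PaymentCompleted", e))   # Payment
bus.subscribe("PaymentCompleted", lambda e: bus.publish("StockReserved", e))  # Inventory
bus.subscribe("StockReserved", lambda e: bus.publish("DeliveryStarted", e))   # Shipping
bus.subscribe("DeliveryStarted", on_delivery_started)                         # Order

bus.publish("OrderCreated", {"order_id": "o-1"})
print(bus.log)  # ['OrderCreated', 'PaymentCompleted', 'StockReserved', 'DeliveryStarted']
print(order_status["o-1"])  # SHIPPING
```

Note that the end-to-end flow appears nowhere in a single place; it emerges from four separate subscriptions, which is exactly the "hard to follow" drawback listed in the table.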
3.2 Orchestration Saga Implementation
```python
import json
from enum import Enum

from kafka import KafkaProducer  # kafka-python


class SagaStep(Enum):
    PAYMENT = "payment"
    INVENTORY = "inventory"
    DELIVERY = "delivery"


class OrderSagaOrchestrator:
    STEPS = [SagaStep.PAYMENT, SagaStep.INVENTORY, SagaStep.DELIVERY]

    def __init__(self, producer: KafkaProducer):
        self.producer = producer
        # order_id -> {current_step, data, completed_steps}
        # (kept in memory for illustration; a real orchestrator must persist it)
        self.saga_state = {}

    def start_saga(self, order_id: str, order_data: dict):
        self.saga_state[order_id] = {
            "current_step": 0,
            "data": order_data,
            "completed_steps": [],
        }
        self._execute_step(order_id)

    def _execute_step(self, order_id: str):
        state = self.saga_state[order_id]
        step_idx = state["current_step"]
        if step_idx >= len(self.STEPS):
            # All steps completed
            self._publish("order.completed", {"order_id": order_id})
            return
        step = self.STEPS[step_idx]
        # e.g. "payment.request", consumed by the Payment Service
        self._publish(f"{step.value}.request", {
            **state["data"],
            "order_id": order_id,
            "saga_id": order_id,
        })

    def handle_step_success(self, order_id: str, step: SagaStep):
        state = self.saga_state[order_id]
        state["completed_steps"].append(step)
        state["current_step"] += 1
        self._execute_step(order_id)

    def handle_step_failure(self, order_id: str, failed_step: SagaStep):
        """Execute compensating transactions (reverse order)."""
        state = self.saga_state[order_id]
        # Only completed steps are compensated; the failed step never took effect
        for step in reversed(state["completed_steps"]):
            self._publish(f"{step.value}.compensate", {"order_id": order_id})
        self._publish("order.failed", {
            "order_id": order_id,
            "failed_at": failed_step.value,
        })

    def _publish(self, topic: str, data: dict):
        self.producer.send(topic, json.dumps(data).encode())
```
3.3 Compensating Transaction Example
```mermaid
sequenceDiagram
    participant O as Orchestrator
    participant P as Payment Service
    participant I as Inventory Service
    participant D as Shipping Service
    O->>P: 1. Payment request
    P-->>O: Payment success
    O->>I: 2. Reserve inventory
    I-->>O: Reservation success
    O->>D: 3. Shipping request
    D-->>O: Shipping failed (address error)
    Note over O: Start compensating transactions (reverse order)
    O->>I: Cancel inventory reservation
    I-->>O: Cancellation complete
    O->>P: Refund payment
    P-->>O: Refund complete
    O->>O: Mark order as failed
```
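The reverse-order compensation can be exercised in-process without Kafka. A minimal sketch, assuming each step is a hypothetical `(name, action, compensate)` triple of local callables; it reproduces the scenario in the diagram, where shipping fails after payment and reservation have succeeded.

```python
class StepFailed(Exception):
    pass


def run_saga(steps, log):
    """Run (name, action, compensate) steps; on failure, undo completed steps in reverse."""
    completed = []
    for name, action, compensate in steps:
        try:
            action()
        except StepFailed:
            log.append(f"{name} failed")
            for done_name, undo in reversed(completed):
                undo()
                log.append(f"{done_name} compensated")
            return "FAILED"
        completed.append((name, compensate))
        log.append(f"{name} ok")
    return "COMPLETED"


def fail_shipping():
    raise StepFailed("address error")


log = []
result = run_saga([
    ("payment", lambda: None, lambda: None),    # charge / refund
    ("inventory", lambda: None, lambda: None),  # reserve / cancel reservation
    ("shipping", fail_shipping, lambda: None),  # never compensated: it did not complete
], log)
print(result)  # FAILED
print(log)
# ['payment ok', 'inventory ok', 'shipping failed',
#  'inventory compensated', 'payment compensated']
```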
4. Event Sourcing
4.1 Concept
Instead of storing state directly, store **state change events**:
```
Traditional approach:
  orders table: {id: 1, status: "SHIPPED", amount: 50000}

Event Sourcing:
  events table:
    1. OrderCreated     {amount: 50000}
    2. PaymentReceived  {payment_id: "pay_123"}
    3. StockReserved    {warehouse: "seoul"}
    4. OrderShipped     {tracking: "KR12345"}
  -> Replay events in order to reconstruct current state
```
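Replaying is a left fold over the event stream. A minimal sketch using the four events above; the transition rules in `apply` are an assumption chosen to mirror the example (and the `Order` class below).

```python
from functools import reduce

# The four events from the example above, oldest first
events = [
    ("OrderCreated", {"amount": 50000}),
    ("PaymentReceived", {"payment_id": "pay_123"}),
    ("StockReserved", {"warehouse": "seoul"}),
    ("OrderShipped", {"tracking": "KR12345"}),
]


def apply(state: dict, event: tuple) -> dict:
    """Return a new state dict; each event type updates only the fields it owns."""
    event_type, data = event
    if event_type == "OrderCreated":
        return {**state, "status": "PENDING", "amount": data["amount"]}
    if event_type == "PaymentReceived":
        return {**state, "status": "PAID", "payment_id": data["payment_id"]}
    if event_type == "StockReserved":
        return {**state, "warehouse": data["warehouse"]}
    if event_type == "OrderShipped":
        return {**state, "status": "SHIPPED", "tracking": data["tracking"]}
    return state  # unknown event types leave the state unchanged


current = reduce(apply, events, {})
print(current["status"], current["amount"])  # SHIPPED 50000

# Point-in-time view: replay only the first two events
print(reduce(apply, events[:2], {})["status"])  # PAID
```

Replaying a prefix of the stream is what makes point-in-time recovery and new read models cheap to build.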
4.2 Implementation
```python
import json


class EventStore:
    def __init__(self, db):
        # `db` is assumed to be a thin DB wrapper exposing execute() and query()
        self.db = db

    def append(self, aggregate_id: str, event_type: str, data: dict, version: int):
        self.db.execute("""
            INSERT INTO events (aggregate_id, event_type, data, version, created_at)
            VALUES (%s, %s, %s, %s, NOW())
        """, (aggregate_id, event_type, json.dumps(data), version))

    def get_events(self, aggregate_id: str, after_version: int = 0):
        return self.db.query("""
            SELECT event_type, data, version
            FROM events
            WHERE aggregate_id = %s AND version > %s
            ORDER BY version
        """, (aggregate_id, after_version))


class Order:
    def __init__(self):
        self.id = None
        self.status = None
        self.version = 0
        self._pending_events = []  # new events not yet persisted (unused in this excerpt)

    def apply_event(self, event_type: str, data: dict):
        """Apply one event to in-memory state; unrecognized events only bump the version."""
        if event_type == "OrderCreated":
            self.id = data["order_id"]
            self.status = "PENDING"
        elif event_type == "PaymentReceived":
            self.status = "PAID"
        elif event_type == "OrderShipped":
            self.status = "SHIPPED"
        self.version += 1

    @classmethod
    def from_events(cls, events):
        """Rebuild the current state by replaying events in version order."""
        order = cls()
        for e in events:
            data = e["data"]
            if isinstance(data, str):  # stored as JSON text -> decode to dict
                data = json.loads(data)
            order.apply_event(e["event_type"], data)
        return order
```
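The `version` column is what lets an event store detect concurrent writers. A runnable sketch using the standard-library `sqlite3` module in place of PostgreSQL, assuming a `UNIQUE (aggregate_id, version)` constraint that is not part of the original schema. Unlike the `append` above, this hypothetical `append` takes the version the caller last saw (`expected_version`) and writes the next one, so a stale writer is rejected instead of silently appending.

```python
import json
import sqlite3


class ConcurrencyError(Exception):
    pass


class SqliteEventStore:
    def __init__(self):
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute("""
            CREATE TABLE events (
                aggregate_id TEXT NOT NULL,
                event_type   TEXT NOT NULL,
                data         TEXT NOT NULL,
                version      INTEGER NOT NULL,
                UNIQUE (aggregate_id, version)  -- optimistic concurrency guard
            )""")

    def append(self, aggregate_id, event_type, data, expected_version):
        """Append as version expected_version + 1; fail if another writer got there first."""
        try:
            with self.conn:  # commits on success, rolls back on error
                self.conn.execute(
                    "INSERT INTO events VALUES (?, ?, ?, ?)",
                    (aggregate_id, event_type, json.dumps(data), expected_version + 1))
        except sqlite3.IntegrityError as exc:
            raise ConcurrencyError(f"{aggregate_id} already moved past v{expected_version}") from exc

    def get_events(self, aggregate_id, after_version=0):
        rows = self.conn.execute(
            "SELECT event_type, data, version FROM events "
            "WHERE aggregate_id = ? AND version > ? ORDER BY version",
            (aggregate_id, after_version))
        return [{"event_type": t, "data": json.loads(d), "version": v} for t, d, v in rows]


store = SqliteEventStore()
store.append("o-1", "OrderCreated", {"order_id": "o-1"}, expected_version=0)
store.append("o-1", "PaymentReceived", {"payment_id": "pay_123"}, expected_version=1)
try:
    # A stale writer that still believes the aggregate is at version 1
    store.append("o-1", "OrderShipped", {}, expected_version=1)
except ConcurrencyError as err:
    print("rejected:", err)
print([ev["event_type"] for ev in store.get_events("o-1")])
# ['OrderCreated', 'PaymentReceived']
```

On rejection, the usual response is to reload the aggregate from its events, re-validate the command, and retry.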
5. Full Architecture with Kafka
```mermaid
graph TB
    Client[Client] --> Gateway[API Gateway]
    Gateway --> OrderCmd[Order Command API]
    Gateway --> OrderQry[Order Query API]
    OrderCmd --> WriteDB[(PostgreSQL)]
    WriteDB --> CDC[Debezium CDC]
    CDC --> Kafka[Apache Kafka]
    Kafka --> Saga[Saga Orchestrator]
    Kafka --> Projector[CQRS Projector]
    Saga --> PaymentSvc[Payment Service]
    Saga --> InventorySvc[Inventory Service]
    Saga --> DeliverySvc[Shipping Service]
    Projector --> ReadDB[(Elasticsearch)]
    OrderQry --> ReadDB
    PaymentSvc --> Kafka
    InventorySvc --> Kafka
    DeliverySvc --> Kafka
    style Kafka fill:#ff9,stroke:#333
    style Saga fill:#f96,stroke:#333
```
6. Practical Considerations
6.1 Idempotency
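Preventing duplicate event processing:

```python
class IdempotentConsumer:
    def __init__(self, redis_client):
        self.redis = redis_client  # redis-py client

    def process_event(self, event_id: str, handler) -> bool:
        key = f"processed:{event_id}"
        # SET NX EX claims the key and sets the TTL in one atomic command
        # (SETNX then EXPIRE leaves a key with no TTL if the process dies in between)
        if not self.redis.set(key, "1", nx=True, ex=86400):  # 24-hour TTL
            return False  # Already processed, skip
        try:
            handler()
        except Exception:
            # Release the claim so a redelivered copy of the event can be retried
            self.redis.delete(key)
            raise
        return True
```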
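One caveat with a separate Redis marker is that the marker and the handler's side effect are not committed atomically. A common alternative, sketched here with `sqlite3` standing in for the service's own database (the `processed_events` and `stock` tables and the `handle_stock_reserved` function are hypothetical), records the processed event ID in the same transaction as the side effect, so a crash keeps both or neither.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE processed_events (event_id TEXT PRIMARY KEY);
    CREATE TABLE stock (sku TEXT PRIMARY KEY, qty INTEGER NOT NULL);
    INSERT INTO stock VALUES ('A', 10);
""")


def handle_stock_reserved(event_id: str, sku: str, qty: int) -> bool:
    """Deduct stock at most once per event_id; returns False for duplicates."""
    try:
        with conn:  # one transaction: dedup marker + side effect
            conn.execute("INSERT INTO processed_events VALUES (?)", (event_id,))
            conn.execute("UPDATE stock SET qty = qty - ? WHERE sku = ?", (qty, sku))
        return True
    except sqlite3.IntegrityError:
        return False  # primary-key clash: this event was already applied


print(handle_stock_reserved("evt-1", "A", 3))  # True
print(handle_stock_reserved("evt-1", "A", 3))  # False (redelivery skipped)
print(conn.execute("SELECT qty FROM stock WHERE sku = 'A'").fetchone()[0])  # 7
```

The trade-off is that this only works when the side effect lives in the same database as the marker; calls to external APIs still need an idempotency key on the receiving side.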
6.2 Event Ordering Guarantees
Guarantee ordering with Kafka partition keys:

```python
producer.send(
    "order.events",
    key=order_id.encode(),  # Same order_id -> same partition -> per-order ordering
    value=json.dumps(event).encode(),
)
```
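Why the key matters: the producer maps each key to a partition with a deterministic hash, and Kafka preserves order only within a partition. A toy simulation, assuming `zlib.crc32` as a stand-in hash (Kafka's default partitioner actually uses murmur2) and three partitions:

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 3


def partition_for(key: bytes) -> int:
    # Stand-in for Kafka's murmur2-based default partitioner: same key -> same partition
    return zlib.crc32(key) % NUM_PARTITIONS


partitions = defaultdict(list)  # partition number -> append-only log
sent = [
    ("order-1", "OrderCreated"), ("order-2", "OrderCreated"),
    ("order-1", "PaymentReceived"), ("order-2", "PaymentReceived"),
    ("order-1", "OrderShipped"),
]
for order_id, event in sent:
    partitions[partition_for(order_id.encode())].append((order_id, event))

# All order-1 events sit in one partition, so they come back in send order
order_1 = [ev for log in partitions.values() for oid, ev in log if oid == "order-1"]
print(order_1)  # ['OrderCreated', 'PaymentReceived', 'OrderShipped']
```

Events for different orders may land in different partitions, so there is no global order across orders; that is usually acceptable because each saga only cares about its own order's sequence.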
7. Quiz
**Q1. Why does CQRS separate reads from writes?**
Because the **requirements for reads and writes are fundamentally different**. Writes need normalization and transactions, while reads need denormalization and performance optimization, so each side can use its own optimized storage and model.

**Q2. How do Choreography and Orchestration differ?**
**Choreography**: each service autonomously publishes and subscribes to events. **Orchestration**: a central Orchestrator explicitly commands each service. Orchestration offers better flow visibility and is easier to manage.

**Q3. What does a compensating transaction do?**
When a step in the Saga fails, it **undoes the already completed steps in reverse order**. For example: shipping fails, then the inventory reservation is cancelled, then the payment is refunded.

**Q4. What are the benefits of Event Sourcing?**
(1) A complete **audit trail**, (2) **point-in-time recovery** through event replay, (3) the ability to **build new Read Models from historical events**.

**Q5. Why is idempotency essential when consuming events?**
Network failures, retries, and consumer rebalancing mean **the same event may be delivered more than once**. Without idempotency, this can cause serious issues such as duplicate payments or duplicate inventory deductions.

**Q6. How do you guarantee event ordering in Kafka?**
Use the **same partition key**. Messages with the same key go to the same partition, and Kafka preserves order within a partition. For example, use order_id as the partition key.

**Q7. What consistency model does CQRS provide, and how can you cope with it?**
**Eventual consistency**: the read model may lag behind the write model, so data may not be up to date when queried. Solutions: (1) read directly from the Write DB immediately after a write, (2) respond only after confirming the event has been processed.
Quiz
Q1: What is the main topic covered in "Event-Driven Architecture + CQRS + Saga Pattern: A Practical MSA Design Guide"?
A practical design guide combining Event-Driven Architecture, CQRS (Command Query Responsibility Segregation), and the Saga pattern in an MSA environment. Includes Kafka-based implementation code, Mermaid diagrams, and an order-payment-shipping system case study.