
Event-Driven Architecture + CQRS + Saga Pattern: A Practical MSA Design Guide


1. Why Event-Driven Architecture

Moving from a monolith to a microservices architecture (MSA) introduces the **distributed transaction** problem. A common first attempt chains synchronous calls between services:

graph LR
    A[Order Service] -->|Synchronous HTTP call| B[Payment Service]
    B -->|Synchronous HTTP call| C[Inventory Service]
    C -->|Synchronous HTTP call| D[Shipping Service]
    style A fill:#f96,stroke:#333
    style D fill:#6f9,stroke:#333

**Problems with the synchronous approach:**

- If any one service fails, everything fails (cascading failure)

- Tight coupling between services

- Latency increases as the call chain grows

- Difficult to roll back on partial failure

graph LR
    A[Order Service] -->|Publish event| K[Kafka]
    K -->|Consume event| B[Payment Service]
    K -->|Consume event| C[Inventory Service]
    K -->|Consume event| D[Shipping Service]
    style K fill:#ff9,stroke:#333

**Benefits of event-driven:**

- Loose coupling between services

- Reduced response time through asynchronous processing

- Independent scaling

- State recovery through event replay
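The loose coupling described above can be illustrated with a minimal in-process sketch. `InMemoryEventBus` and the two lambda handlers are hypothetical stand-ins for Kafka and for real services; they are not part of any library. The point is only that the publisher knows a topic name and nothing about its consumers.

```python
from collections import defaultdict
from typing import Callable


class InMemoryEventBus:
    """Toy stand-in for a broker such as Kafka (hypothetical, in-process only)."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable[[dict], None]) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, event: dict) -> None:
        # The publisher does not know who consumes the event.
        for handler in self._subscribers[topic]:
            handler(event)


bus = InMemoryEventBus()
received: list[str] = []

# Two independent consumers react to the same event.
bus.subscribe("order.created", lambda e: received.append(f"payment:{e['order_id']}"))
bus.subscribe("order.created", lambda e: received.append(f"inventory:{e['order_id']}"))

bus.publish("order.created", {"order_id": "o-1"})
print(received)
```

Unlike a real broker, this toy bus calls handlers synchronously and keeps nothing on disk, so it shows the decoupling but not the asynchronous delivery or the replay.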

2. CQRS (Command Query Responsibility Segregation)

2.1 Core Concept

CQRS is a pattern that separates **Commands** (writes) from **Queries** (reads), so each side can have its own model and storage:

graph TB
    Client -->|Command: Create Order| CmdAPI[Command API]
    Client -->|Query: Get Order| QryAPI[Query API]
    CmdAPI --> WriteDB[(Write DB<br/>PostgreSQL)]
    WriteDB -->|Publish event| EventBus[Kafka]
    EventBus -->|Consume event| Projector[Projector]
    Projector --> ReadDB[(Read DB<br/>Elasticsearch)]
    QryAPI --> ReadDB
    style CmdAPI fill:#f96,stroke:#333
    style QryAPI fill:#6f9,stroke:#333
    style EventBus fill:#ff9,stroke:#333

2.2 Why Separate?

| Aspect | Command (Write) | Query (Read) |
| ------------ | --------------------------- | -------------------------------- |
| Model | Normalized, integrity-first | Denormalized, performance-first |
| Scaling | Vertical scaling | Horizontal scaling (replication) |
| DB | PostgreSQL, MySQL | Elasticsearch, Redis, MongoDB |
| Optimization | Transaction safety | Read performance |
| Typical share of requests | 10-20% of total | 80-90% of total |

2.3 Implementation Example

Command Side

import uuid
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class CreateOrderCommand:
    customer_id: str
    items: list[dict]
    total_amount: float


class OrderCommandHandler:
    def __init__(self, repo, event_publisher):
        self.repo = repo
        self.event_publisher = event_publisher

    def handle_create_order(self, cmd: CreateOrderCommand) -> str:
        # Order is the write-side domain entity, assumed to be defined elsewhere.
        order = Order(
            id=str(uuid.uuid4()),
            customer_id=cmd.customer_id,
            items=cmd.items,
            total_amount=cmd.total_amount,
            status="PENDING",
            created_at=datetime.now(timezone.utc),
        )

        # Save to Write DB
        self.repo.save(order)

        # Publish event. If this call fails after the save succeeds, the event
        # is lost; the CDC-based design in section 5 avoids this dual write.
        self.event_publisher.publish("order.created", {
            "order_id": order.id,
            "customer_id": order.customer_id,
            "total_amount": order.total_amount,
            "items": order.items,
        })

        return order.id

Query Side

class OrderQueryService:
    def __init__(self, read_repo):
        self.read_repo = read_repo  # Elasticsearch

    def get_order(self, order_id: str):
        return self.read_repo.find_by_id(order_id)

    def search_orders(self, customer_id: str, status: str | None = None):
        return self.read_repo.search(customer_id=customer_id, status=status)

Projector (Event -> Read Model synchronization)

class OrderProjector:
    def __init__(self, read_repo):
        self.read_repo = read_repo

    def on_order_created(self, event):
        self.read_repo.upsert({
            "id": event["order_id"],
            "customer_id": event["customer_id"],
            "total_amount": event["total_amount"],
            "status": "PENDING",
            "items": event["items"],
        })

    def on_order_paid(self, event):
        self.read_repo.update(event["order_id"], {"status": "PAID"})
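To see how the three pieces fit together, here is a small self-contained sketch that wires a command handler, a projector, and a query function using plain in-memory structures. The dictionaries and the `deque` are assumptions standing in for PostgreSQL, Elasticsearch, and a Kafka topic, and the function names are illustrative only.

```python
import uuid
from collections import deque

write_db: dict[str, dict] = {}   # stands in for PostgreSQL
read_db: dict[str, dict] = {}    # stands in for Elasticsearch
event_queue: deque = deque()     # stands in for a Kafka topic


def handle_create_order(customer_id: str, total_amount: float) -> str:
    """Command side: write the record, then enqueue an event."""
    order_id = str(uuid.uuid4())
    write_db[order_id] = {
        "customer_id": customer_id,
        "total_amount": total_amount,
        "status": "PENDING",
    }
    event_queue.append({
        "type": "order.created",
        "order_id": order_id,
        "customer_id": customer_id,
        "total_amount": total_amount,
    })
    return order_id


def run_projector() -> int:
    """Projector: drain queued events into the denormalized read model."""
    applied = 0
    while event_queue:
        event = event_queue.popleft()
        if event["type"] == "order.created":
            read_db[event["order_id"]] = {
                "id": event["order_id"],
                "customer_id": event["customer_id"],
                "total_amount": event["total_amount"],
                "status": "PENDING",
            }
        applied += 1
    return applied


def get_order(order_id: str):
    """Query side: reads only from the read model."""
    return read_db.get(order_id)


order_id = handle_create_order("c-42", 50000.0)
before = get_order(order_id)   # projector has not run yet, so nothing is visible
run_projector()
after = get_order(order_id)
print(before, after)
```

The gap between `before` and `after` is the eventual consistency window: the write has succeeded, but the read model only catches up once the projector has consumed the event.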

3. Saga Pattern: Distributed Transaction Management

3.1 Choreography vs Orchestration

graph TB
    subgraph "Choreography (Event-based)"
        O1[Order] -->|OrderCreated| P1[Payment]
        P1 -->|PaymentCompleted| S1[Inventory]
        S1 -->|StockReserved| D1[Shipping]
        D1 -->|DeliveryStarted| O1
    end

graph TB
    subgraph "Orchestration (Central coordination)"
        Orch[Saga Orchestrator]
        Orch -->|1. Payment request| P2[Payment]
        P2 -->|Payment complete| Orch
        Orch -->|2. Reserve inventory| S2[Inventory]
        S2 -->|Reservation complete| Orch
        Orch -->|3. Shipping request| D2[Shipping]
        D2 -->|Shipping started| Orch
    end

| Approach | Pros | Cons |
| ------------- | -------------------------- | ------------------------------------------------------ |
| Choreography | Simple, loose coupling | Flow is hard to follow, risk of cyclic dependencies between services |
| Orchestration | Clear flow, easy to manage | Centralized; the Orchestrator can become a single point of failure |
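For contrast with the orchestrated implementation in the next section, a choreographed flow can be sketched as a chain of handlers in which each service reacts to one event and emits the next. The `subscribe` and `publish` helpers and the `"SHIPPING"` status value are assumptions made for this illustration; the event names follow the choreography diagram.

```python
from collections import defaultdict

subscribers = defaultdict(list)
trace: list[str] = []


def subscribe(event_type: str, handler) -> None:
    subscribers[event_type].append(handler)


def publish(event_type: str, payload: dict) -> None:
    trace.append(event_type)
    for handler in subscribers[event_type]:
        handler(payload)


# Each service reacts to one event and emits the next; there is no coordinator.
def payment_service(event: dict) -> None:
    publish("PaymentCompleted", event)


def inventory_service(event: dict) -> None:
    publish("StockReserved", event)


def shipping_service(event: dict) -> None:
    publish("DeliveryStarted", event)


def order_service(event: dict) -> None:
    event["status"] = "SHIPPING"  # hypothetical status for the illustration


subscribe("OrderCreated", payment_service)
subscribe("PaymentCompleted", inventory_service)
subscribe("StockReserved", shipping_service)
subscribe("DeliveryStarted", order_service)

order = {"order_id": "o-1", "status": "PENDING"}
publish("OrderCreated", order)
print(trace, order["status"])
```

Note that the overall flow is visible only in the list of `subscribe` calls. In a real system those subscriptions live in four separate codebases, which is why the table lists the flow as hard to follow.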

3.2 Orchestration Saga Implementation

import json
from enum import Enum

from kafka import KafkaProducer


class SagaStep(Enum):
    PAYMENT = "payment"
    INVENTORY = "inventory"
    DELIVERY = "delivery"


class OrderSagaOrchestrator:
    STEPS = [SagaStep.PAYMENT, SagaStep.INVENTORY, SagaStep.DELIVERY]

    def __init__(self, producer: KafkaProducer):
        self.producer = producer
        # order_id -> {current_step, data, completed_steps}
        # Kept in memory for brevity: state is lost on restart, so a
        # production orchestrator would persist it.
        self.saga_state = {}

    def start_saga(self, order_id: str, order_data: dict):
        self.saga_state[order_id] = {
            "current_step": 0,
            "data": order_data,
            "completed_steps": [],
        }
        self._execute_step(order_id)

    def _execute_step(self, order_id: str):
        state = self.saga_state[order_id]
        step_idx = state["current_step"]

        if step_idx >= len(self.STEPS):
            # All steps completed
            self._publish("order.completed", {"order_id": order_id})
            return

        step = self.STEPS[step_idx]
        # Spread the order data first so it cannot overwrite the IDs below.
        self._publish(f"{step.value}.request", {
            **state["data"],
            "order_id": order_id,
            "saga_id": order_id,
        })

    def handle_step_success(self, order_id: str, step: SagaStep):
        state = self.saga_state[order_id]
        state["completed_steps"].append(step)
        state["current_step"] += 1
        self._execute_step(order_id)

    def handle_step_failure(self, order_id: str, failed_step: SagaStep):
        """Execute compensating transactions (reverse order)"""
        state = self.saga_state[order_id]

        for step in reversed(state["completed_steps"]):
            self._publish(f"{step.value}.compensate", {
                "order_id": order_id,
            })

        self._publish("order.failed", {
            "order_id": order_id,
            "failed_at": failed_step.value,
        })

    def _publish(self, topic: str, data: dict):
        self.producer.send(topic, json.dumps(data).encode())

3.3 Compensating Transaction Example

sequenceDiagram
    participant O as Orchestrator
    participant P as Payment Service
    participant I as Inventory Service
    participant D as Shipping Service

    O->>P: 1. Payment request
    P-->>O: Payment success
    O->>I: 2. Reserve inventory
    I-->>O: Reservation success
    O->>D: 3. Shipping request
    D-->>O: Shipping failed (address error)

    Note over O: Start compensating transactions (reverse order)
    O->>I: Cancel inventory reservation
    I-->>O: Cancellation complete
    O->>P: Refund payment
    P-->>O: Refund complete
    O->>O: Mark order as failed
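The sequence above can be reproduced with a small in-process simulation. This is a sketch, not a production saga: `run_saga`, `StepFailed`, and the no-op step functions are hypothetical names introduced here, and every step runs synchronously instead of over Kafka.

```python
from typing import Callable


class StepFailed(Exception):
    """Raised by a step action to signal a business failure."""


def run_saga(steps: list[tuple[str, Callable[[], None], Callable[[], None]]]) -> list[str]:
    """Run (name, action, compensation) steps; on failure undo completed steps in reverse."""
    log: list[str] = []
    completed: list[tuple[str, Callable[[], None]]] = []
    for name, action, compensate in steps:
        try:
            action()
        except StepFailed:
            log.append(f"{name}:failed")
            # Only steps that already succeeded are compensated, newest first.
            for done_name, undo in reversed(completed):
                undo()
                log.append(f"{done_name}:compensated")
            log.append("order:failed")
            return log
        completed.append((name, compensate))
        log.append(f"{name}:ok")
    log.append("order:completed")
    return log


def ok() -> None:
    pass


def fail_shipping() -> None:
    raise StepFailed("address error")


log = run_saga([
    ("payment", ok, ok),
    ("inventory", ok, ok),
    ("shipping", fail_shipping, ok),
])
print(log)
```

The failed step itself is not compensated, because it never completed. Compensation actions should also be idempotent in practice, since a retrying orchestrator may send them more than once.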

4. Event Sourcing

4.1 Concept

Instead of storing state directly, store **state change events**:

Traditional approach:
    orders table: {id: 1, status: "SHIPPED", amount: 50000}

Event Sourcing:
    events table:
    1. OrderCreated    {amount: 50000}
    2. PaymentReceived {payment_id: "pay_123"}
    3. StockReserved   {warehouse: "seoul"}
    4. OrderShipped    {tracking: "KR12345"}

    -> Replay events in order to reconstruct current state
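Replay is essentially a left fold over the event list. The sketch below applies the four example events with `functools.reduce`; the `apply` function and the shape of the state dictionary are assumptions chosen for illustration.

```python
from functools import reduce

events = [
    ("OrderCreated", {"amount": 50000}),
    ("PaymentReceived", {"payment_id": "pay_123"}),
    ("StockReserved", {"warehouse": "seoul"}),
    ("OrderShipped", {"tracking": "KR12345"}),
]


def apply(state: dict, event: tuple[str, dict]) -> dict:
    """Return a new state with one event applied (pure function)."""
    event_type, data = event
    if event_type == "OrderCreated":
        return {**state, "status": "PENDING", "amount": data["amount"]}
    if event_type == "PaymentReceived":
        return {**state, "status": "PAID", "payment_id": data["payment_id"]}
    if event_type == "StockReserved":
        return {**state, "warehouse": data["warehouse"]}
    if event_type == "OrderShipped":
        return {**state, "status": "SHIPPED", "tracking": data["tracking"]}
    return state  # unknown events are ignored


current = reduce(apply, events, {})
after_payment = reduce(apply, events[:2], {})  # state as of the second event
print(current["status"], after_payment["status"])
```

Replaying only a prefix of the list, as `after_payment` does, is how an event-sourced system can answer what an order looked like at an earlier point in time.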

4.2 Implementation

import json


class EventStore:
    def __init__(self, db):
        self.db = db

    def append(self, aggregate_id: str, event_type: str, data: dict, version: int):
        # A unique constraint on (aggregate_id, version) is assumed here so that
        # two writers cannot append the same version concurrently.
        self.db.execute("""
            INSERT INTO events (aggregate_id, event_type, data, version, created_at)
            VALUES (%s, %s, %s, %s, NOW())
        """, (aggregate_id, event_type, json.dumps(data), version))

    def get_events(self, aggregate_id: str, after_version: int = 0):
        return self.db.query("""
            SELECT event_type, data, version
            FROM events
            WHERE aggregate_id = %s AND version > %s
            ORDER BY version
        """, (aggregate_id, after_version))


class Order:
    def __init__(self):
        self.id = None
        self.status = None
        self.version = 0
        self._pending_events = []  # new events not yet persisted (unused in this excerpt)

    def apply_event(self, event_type: str, data: dict):
        if event_type == "OrderCreated":
            self.id = data["order_id"]
            self.status = "PENDING"
        elif event_type == "PaymentReceived":
            self.status = "PAID"
        elif event_type == "OrderShipped":
            self.status = "SHIPPED"
        self.version += 1

    @classmethod
    def from_events(cls, events):
        # Assumes each row's "data" has already been decoded into a dict
        # (for example by a JSONB column or json.loads).
        order = cls()
        for e in events:
            order.apply_event(e["event_type"], e["data"])
        return order
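The `version` column is what allows an event store to detect concurrent writers. The following sketch shows one way to enforce that, assuming a primary key on `(aggregate_id, version)`. It uses the standard-library `sqlite3` module with an in-memory database purely so the example runs anywhere; the schema and the standalone `append` function are assumptions, and SQLite uses `?` placeholders where the code above uses `%s`.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE events (
        aggregate_id TEXT NOT NULL,
        event_type   TEXT NOT NULL,
        data         TEXT NOT NULL,
        version      INTEGER NOT NULL,
        PRIMARY KEY (aggregate_id, version)  -- one event per version per aggregate
    )
""")


def append(aggregate_id: str, event_type: str, data: dict, version: int) -> bool:
    """Insert an event; return False if another writer already used this version."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO events (aggregate_id, event_type, data, version) "
                "VALUES (?, ?, ?, ?)",
                (aggregate_id, event_type, json.dumps(data), version),
            )
        return True
    except sqlite3.IntegrityError:
        return False


first = append("order-1", "OrderCreated", {"order_id": "order-1"}, 1)
second = append("order-1", "PaymentReceived", {"payment_id": "pay_123"}, 2)
conflict = append("order-1", "OrderShipped", {"tracking": "KR12345"}, 2)  # stale version

rows = conn.execute(
    "SELECT event_type FROM events WHERE aggregate_id = ? ORDER BY version",
    ("order-1",),
).fetchall()
print(first, second, conflict, rows)
```

A writer that receives `False` would typically reload the aggregate, re-check its business rules against the newer events, and retry with the next version number.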

5. Full Architecture with Kafka

graph TB
    Client[Client] --> Gateway[API Gateway]
    Gateway --> OrderCmd[Order Command API]
    Gateway --> OrderQry[Order Query API]

    OrderCmd --> WriteDB[(PostgreSQL)]
    WriteDB --> CDC[Debezium CDC]
    CDC --> Kafka[Apache Kafka]

    Kafka --> Saga[Saga Orchestrator]
    Kafka --> Projector[CQRS Projector]

    Saga --> PaymentSvc[Payment Service]
    Saga --> InventorySvc[Inventory Service]
    Saga --> DeliverySvc[Shipping Service]

    Projector --> ReadDB[(Elasticsearch)]
    OrderQry --> ReadDB

    PaymentSvc --> Kafka
    InventorySvc --> Kafka
    DeliverySvc --> Kafka

    style Kafka fill:#ff9,stroke:#333
    style Saga fill:#f96,stroke:#333

6. Practical Considerations

6.1 Idempotency

Preventing duplicate event processing

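class IdempotentConsumer:
    def __init__(self, redis_client):
        self.redis = redis_client

    def process_event(self, event_id: str, handler):
        key = f"processed:{event_id}"
        # SET with NX and EX claims the key and sets the 24-hour TTL atomically;
        # separate SETNX + EXPIRE calls can leave a key without a TTL if the
        # process dies between them.
        if not self.redis.set(key, "1", nx=True, ex=86400):
            return  # Already processed, skip
        try:
            handler()
        except Exception:
            # Release the marker so a redelivery can retry the handler.
            self.redis.delete(key)
            raise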
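To try the deduplication logic without a Redis server, the sketch below swaps in a tiny in-memory fake. `FakeRedis` is a hypothetical helper that mimics only the `set(..., nx=True, ex=...)` and `delete` calls and ignores the TTL; `process_once` is an illustrative function that claims the key atomically and releases it if the handler raises.

```python
class FakeRedis:
    """In-memory stand-in exposing only the two calls used below (hypothetical helper)."""

    def __init__(self) -> None:
        self._data: dict[str, str] = {}

    def set(self, key: str, value: str, nx: bool = False, ex=None):
        # The TTL (ex) is accepted but ignored in this fake.
        if nx and key in self._data:
            return None  # redis-py also returns None when NX prevents the write
        self._data[key] = value
        return True

    def delete(self, key: str) -> int:
        return 1 if self._data.pop(key, None) is not None else 0


def process_once(client, event_id: str, handler) -> bool:
    """Run handler at most once per event_id; release the marker if it raises."""
    key = f"processed:{event_id}"
    if not client.set(key, "1", nx=True, ex=86400):
        return False  # duplicate delivery, skipped
    try:
        handler()
    except Exception:
        client.delete(key)  # allow a later retry to run the handler again
        raise
    return True


charges: list[str] = []
client = FakeRedis()
results = [
    process_once(client, "evt-1", lambda: charges.append("charge o-1")),
    process_once(client, "evt-1", lambda: charges.append("charge o-1")),  # redelivery
    process_once(client, "evt-2", lambda: charges.append("charge o-2")),
]
print(results, charges)
```

This approach still leaves a window in which the handler's side effect succeeds but the process crashes before returning. Closing that gap generally requires recording the event ID in the same transaction as the side effect.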

6.2 Event Ordering Guarantees

Guarantee ordering with Kafka partition keys

producer.send(
    "order.events",
    key=order_id.encode(),  # Same order -> same partition -> order guaranteed
    value=json.dumps(event).encode(),
)
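Why a key gives ordering can be shown with a simplified partitioner. The sketch hashes the key with CRC32 only as a stand-in (Kafka's default partitioner uses a murmur2 hash), and the partition count of 6 is an arbitrary assumption. What matters is that the mapping from key to partition is deterministic.

```python
import zlib

NUM_PARTITIONS = 6  # assumed partition count for the illustration


def partition_for(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    """Simplified stand-in for a key-hash partitioner (not Kafka's actual hash)."""
    return zlib.crc32(key) % num_partitions


events = [
    ("order-1", "created"),
    ("order-2", "created"),
    ("order-1", "paid"),
    ("order-1", "shipped"),
]

# Append each event to the log of the partition its key maps to.
partitions: dict[int, list[tuple[str, str]]] = {}
for order_id, event_type in events:
    partitions.setdefault(partition_for(order_id.encode()), []).append((order_id, event_type))

order_1_partition = partition_for(b"order-1")
order_1_events = [e for oid, e in partitions[order_1_partition] if oid == "order-1"]
print(order_1_partition, order_1_events)
```

Ordering holds only within a single partition, so events for different orders may still be processed in any relative order. Changing the number of partitions also changes where keys map, which is worth planning for before a topic is in production.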

7. Quiz

Q1: Why does CQRS separate reads from writes?

Because the **requirements for reads and writes are fundamentally different**. Writes need normalization and transactions, while reads need denormalization and performance optimization. Each side can use its own optimized storage and model.

Q2: What is the difference between Choreography and Orchestration?

**Choreography**: Each service autonomously publishes and subscribes to events. **Orchestration**: A central Orchestrator explicitly commands each service. Orchestration offers better flow visibility and easier management.

Q3: What is a compensating transaction?

When a step in the Saga fails, a compensating transaction **undoes the already completed steps in reverse order**. For example: Shipping fails, so the inventory reservation is cancelled, and then the payment is refunded.

Q4: What are the benefits of Event Sourcing?

(1) Complete **audit trail** (2) **Point-in-time recovery** through event replay (3) Ability to **build new Read Models from historical events**.

Q5: Why do event consumers need to be idempotent?

Due to network failures, retries, and rebalancing, **the same event may be delivered multiple times**. Without idempotency, serious issues like duplicate payments or duplicate inventory deductions can occur.

Q6: How do you guarantee event ordering in Kafka?

Use the **same partition key**. Messages with the same key go to the same partition, and Kafka preserves order within a partition. For example, use order_id as the partition key.

Q7: What consistency model does the CQRS read side provide, and how can stale reads be handled?

The read side is **eventually consistent**: data may not be up-to-date when queried. Solutions: (1) Read directly from the Write DB immediately after writing (2) Respond only after confirming event processing is complete.
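The second solution, responding only after the event has been processed, can be approximated by polling the read model with a timeout. This is a sketch under assumptions: `wait_for_projection` and `fake_read` are hypothetical names, and the fake read model simply becomes consistent on its third read.

```python
import time


def wait_for_projection(read_fn, order_id: str, timeout: float = 2.0, interval: float = 0.05):
    """Poll the read model until the order appears or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        doc = read_fn(order_id)
        if doc is not None:
            return doc
        if time.monotonic() >= deadline:
            return None  # caller can fall back to the write DB
        time.sleep(interval)


# Simulated read model that becomes consistent on the third read.
calls = {"n": 0}


def fake_read(order_id: str):
    calls["n"] += 1
    return {"id": order_id, "status": "PENDING"} if calls["n"] >= 3 else None


doc = wait_for_projection(fake_read, "o-1")
print(doc, calls["n"])
```

Returning `None` on timeout lets the caller combine both solutions: wait briefly for the projection, then read from the Write DB if it has not caught up.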

