Skip to content

필사 모드: 이벤트 드리븐 아키텍처 + CQRS + Saga 패턴: MSA 실전 설계 가이드

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

1. 왜 이벤트 드리븐 아키텍처인가

모놀리식에서 MSA로 전환하면 **분산 트랜잭션** 문제가 발생한다:

graph LR

A[주문 서비스] -->|HTTP 동기 호출| B[결제 서비스]

B -->|HTTP 동기 호출| C[재고 서비스]

C -->|HTTP 동기 호출| D[배송 서비스]

style A fill:#f96,stroke:#333

style D fill:#6f9,stroke:#333

**동기 방식의 문제:**

- 하나라도 실패하면 전체 실패 (cascading failure)

- 서비스 간 강결합

- 호출 체인이 길어질수록 지연 시간 증가

- 부분 실패 시 롤백 어려움

graph LR

A[주문 서비스] -->|이벤트 발행| K[Kafka]

K -->|이벤트 소비| B[결제 서비스]

K -->|이벤트 소비| C[재고 서비스]

K -->|이벤트 소비| D[배송 서비스]

style K fill:#ff9,stroke:#333

**이벤트 드리븐의 장점:**

- 서비스 간 느슨한 결합 (loose coupling)

- 비동기 처리로 응답 시간 단축

- 독립적 확장 가능

- 이벤트 리플레이로 상태 복구 가능

2. CQRS (Command Query Responsibility Segregation)

2.1 핵심 개념

**명령(Command)**과 **조회(Query)**를 분리하는 패턴:

graph TB

Client -->|Command: 주문 생성| CmdAPI[Command API]

Client -->|Query: 주문 조회| QryAPI[Query API]

CmdAPI --> WriteDB[(Write DB<br/>PostgreSQL)]

WriteDB -->|이벤트 발행| EventBus[Kafka]

EventBus -->|이벤트 소비| Projector[Projector]

Projector --> ReadDB[(Read DB<br/>Elasticsearch)]

QryAPI --> ReadDB

style CmdAPI fill:#f96,stroke:#333

style QryAPI fill:#6f9,stroke:#333

style EventBus fill:#ff9,stroke:#333

2.2 왜 분리하는가?

| 측면 | Command (쓰기) | Query (읽기) |

| ------ | ------------------- | ----------------------------- |

| 모델 | 정규화, 무결성 중심 | 비정규화, 성능 중심 |

| 확장 | 수직 확장 | 수평 확장 (복제) |

| DB | PostgreSQL, MySQL | Elasticsearch, Redis, MongoDB |

| 최적화 | 트랜잭션 안전성 | 읽기 성능 |

| 비율 | 전체의 10~20% | 전체의 80~90% |

2.3 구현 예제

Command Side

from dataclasses import dataclass

from datetime import datetime

@dataclass

class CreateOrderCommand:

customer_id: str

items: list[dict]

total_amount: float

class OrderCommandHandler:

def __init__(self, repo, event_publisher):

self.repo = repo

self.event_publisher = event_publisher

def handle_create_order(self, cmd: CreateOrderCommand):

order = Order(

id=str(uuid.uuid4()),

customer_id=cmd.customer_id,

items=cmd.items,

total_amount=cmd.total_amount,

status="PENDING",

created_at=datetime.utcnow()

)

Write DB에 저장

self.repo.save(order)

이벤트 발행

self.event_publisher.publish("order.created", {

"order_id": order.id,

"customer_id": order.customer_id,

"total_amount": order.total_amount,

"items": order.items,

})

return order.id

Query Side

class OrderQueryService:

def __init__(self, read_repo):

self.read_repo = read_repo # Elasticsearch

def get_order(self, order_id: str):

return self.read_repo.find_by_id(order_id)

def search_orders(self, customer_id: str, status: str = None):

return self.read_repo.search(customer_id=customer_id, status=status)

Projector (이벤트 → Read Model 동기화)

class OrderProjector:

def __init__(self, read_repo):

self.read_repo = read_repo

def on_order_created(self, event):

self.read_repo.upsert({

"id": event["order_id"],

"customer_id": event["customer_id"],

"total_amount": event["total_amount"],

"status": "PENDING",

"items": event["items"],

})

def on_order_paid(self, event):

self.read_repo.update(event["order_id"], {"status": "PAID"})

3. Saga 패턴: 분산 트랜잭션 관리

3.1 Choreography vs Orchestration

graph TB

subgraph "Choreography (이벤트 기반)"

O1[주문] -->|OrderCreated| P1[결제]

P1 -->|PaymentCompleted| S1[재고]

S1 -->|StockReserved| D1[배송]

D1 -->|DeliveryStarted| O1

end

graph TB

subgraph "Orchestration (중앙 조율)"

Orch[Saga Orchestrator]

Orch -->|1. 결제 요청| P2[결제]

P2 -->|결제 완료| Orch

Orch -->|2. 재고 예약| S2[재고]

S2 -->|예약 완료| Orch

Orch -->|3. 배송 요청| D2[배송]

D2 -->|배송 시작| Orch

end

| 방식 | 장점 | 단점 |

| ------------- | -------------------- | ----------------------------------- |

| Choreography | 단순, 느슨한 결합 | 흐름 파악 어려움, 순환 의존성 위험 |

| Orchestration | 흐름 명확, 관리 용이 | 중앙 집중, Orchestrator 단일 장애점 |

3.2 Orchestration Saga 구현

from enum import Enum

from kafka import KafkaProducer, KafkaConsumer

class SagaStep(Enum):

PAYMENT = "payment"

INVENTORY = "inventory"

DELIVERY = "delivery"

class OrderSagaOrchestrator:

STEPS = [SagaStep.PAYMENT, SagaStep.INVENTORY, SagaStep.DELIVERY]

def __init__(self, producer: KafkaProducer):

self.producer = producer

self.saga_state = {} # order_id -> {step, status}

def start_saga(self, order_id: str, order_data: dict):

self.saga_state[order_id] = {

"current_step": 0,

"data": order_data,

"completed_steps": [],

}

self._execute_step(order_id)

def _execute_step(self, order_id: str):

state = self.saga_state[order_id]

step_idx = state["current_step"]

if step_idx >= len(self.STEPS):

모든 단계 완료

self._publish(f"order.completed", {"order_id": order_id})

return

step = self.STEPS[step_idx]

self._publish(f"{step.value}.request", {

"order_id": order_id,

"saga_id": order_id,

**state["data"]

})

def handle_step_success(self, order_id: str, step: SagaStep):

state = self.saga_state[order_id]

state["completed_steps"].append(step)

state["current_step"] += 1

self._execute_step(order_id)

def handle_step_failure(self, order_id: str, failed_step: SagaStep):

"""보상 트랜잭션 실행 (역순)"""

state = self.saga_state[order_id]

for step in reversed(state["completed_steps"]):

self._publish(f"{step.value}.compensate", {

"order_id": order_id,

})

self._publish("order.failed", {

"order_id": order_id,

"failed_at": failed_step.value,

})

def _publish(self, topic: str, data: dict):

self.producer.send(topic, json.dumps(data).encode())

3.3 보상 트랜잭션 예시

sequenceDiagram

participant O as Orchestrator

participant P as 결제 서비스

participant I as 재고 서비스

participant D as 배송 서비스

O->>P: 1. 결제 요청

P-->>O: 결제 성공

O->>I: 2. 재고 예약

I-->>O: 재고 예약 성공

O->>D: 3. 배송 요청

D-->>O: ❌ 배송 실패 (주소 오류)

Note over O: 보상 트랜잭션 시작 (역순)

O->>I: 재고 예약 취소

I-->>O: 취소 완료

O->>P: 결제 환불

P-->>O: 환불 완료

O->>O: 주문 실패 처리

4. Event Sourcing

4.1 개념

상태를 직접 저장하는 대신, **상태 변경 이벤트**를 저장:

전통적 방식:

orders 테이블: {id: 1, status: "SHIPPED", amount: 50000}

Event Sourcing:

events 테이블:

1. OrderCreated {amount: 50000}

2. PaymentReceived {payment_id: "pay_123"}

3. StockReserved {warehouse: "seoul"}

4. OrderShipped {tracking: "KR12345"}

→ 이벤트를 순서대로 리플레이하면 현재 상태 복원

4.2 구현

class EventStore:

def __init__(self, db):

self.db = db

def append(self, aggregate_id: str, event_type: str, data: dict, version: int):

self.db.execute("""

INSERT INTO events (aggregate_id, event_type, data, version, created_at)

VALUES (%s, %s, %s, %s, NOW())

""", (aggregate_id, event_type, json.dumps(data), version))

def get_events(self, aggregate_id: str, after_version: int = 0):

return self.db.query("""

SELECT event_type, data, version

FROM events

WHERE aggregate_id = %s AND version > %s

ORDER BY version

""", (aggregate_id, after_version))

class Order:

def __init__(self):

self.id = None

self.status = None

self.version = 0

self._pending_events = []

def apply_event(self, event_type: str, data: dict):

if event_type == "OrderCreated":

self.id = data["order_id"]

self.status = "PENDING"

elif event_type == "PaymentReceived":

self.status = "PAID"

elif event_type == "OrderShipped":

self.status = "SHIPPED"

self.version += 1

@classmethod

def from_events(cls, events):

order = cls()

for e in events:

order.apply_event(e["event_type"], e["data"])

return order

5. Kafka 기반 전체 아키텍처

graph TB

Client[클라이언트] --> Gateway[API Gateway]

Gateway --> OrderCmd[주문 Command API]

Gateway --> OrderQry[주문 Query API]

OrderCmd --> WriteDB[(PostgreSQL)]

WriteDB --> CDC[Debezium CDC]

CDC --> Kafka[Apache Kafka]

Kafka --> Saga[Saga Orchestrator]

Kafka --> Projector[CQRS Projector]

Saga --> PaymentSvc[결제 서비스]

Saga --> InventorySvc[재고 서비스]

Saga --> DeliverySvc[배송 서비스]

Projector --> ReadDB[(Elasticsearch)]

OrderQry --> ReadDB

PaymentSvc --> Kafka

InventorySvc --> Kafka

DeliverySvc --> Kafka

style Kafka fill:#ff9,stroke:#333

style Saga fill:#f96,stroke:#333

6. 실전 고려사항

6.1 멱등성 (Idempotency)

이벤트 중복 처리 방지

class IdempotentConsumer:

def __init__(self, redis_client):

self.redis = redis_client

def process_event(self, event_id: str, handler):

key = f"processed:{event_id}"

if self.redis.setnx(key, "1"):

self.redis.expire(key, 86400) # 24시간 TTL

handler()

else:

pass # 이미 처리됨, 스킵

6.2 이벤트 순서 보장

Kafka 파티션 키로 순서 보장

producer.send(

"order.events",

key=order_id.encode(), # 같은 주문 → 같은 파티션 → 순서 보장

value=json.dumps(event).encode()

)

7. 퀴즈

읽기/쓰기의 **요구사항이 근본적으로 다르기 때문**. 쓰기는 정규화+트랜잭션, 읽기는 비정규화+성능 최적화. 각각에 최적화된 저장소와 모델 사용 가능.

**Choreography**: 각 서비스가 이벤트를 발행/구독하여 자율적으로 동작. **Orchestration**: 중앙 Orchestrator가 각 서비스에 명시적으로 명령. Orchestration이 흐름 파악과 관리가 용이.

Saga에서 한 단계 실패 시, 이미 완료된 단계를 **역순으로 취소**하는 트랜잭션. 예: 배송 실패 → 재고 예약 취소 → 결제 환불.

(1) 완전한 **감사 추적(Audit Trail)** (2) 이벤트 리플레이로 **시점 복구** 가능 (3) **새로운 Read Model을 과거 이벤트로 구축** 가능.

네트워크 장애, 재시도, 리밸런싱 등으로 **같은 이벤트가 여러 번 전달**될 수 있음. 멱등하지 않으면 중복 결제, 중복 재고 차감 등 심각한 문제 발생.

**같은 파티션 키**를 사용. 동일 키의 메시지는 같은 파티션으로 전달되어 순서가 보장됨. 예: order_id를 파티션 키로 사용.

**Eventually Consistent** — 최종적 일관성. 조회 시 최신 데이터가 아닐 수 있음. 해결: (1) 쓰기 직후 Write DB에서 직접 조회 (2) 이벤트 처리 완료 확인 후 응답.

현재 단락 (1/263)

모놀리식에서 MSA로 전환하면 **분산 트랜잭션** 문제가 발생한다:

작성 글자: 0원문 글자: 7,528작성 단락: 0/263