Skip to content

필사 모드: Saga 패턴으로 구현하는 마이크로서비스 분산 트랜잭션: Choreography vs Orchestration

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

들어가며

마이크로서비스 아키텍처에서 가장 어려운 문제 중 하나가 **분산 트랜잭션**입니다. 모놀리스에서는 데이터베이스의 ACID 트랜잭션으로 해결했지만, 각 서비스가 자체 데이터베이스를 가진 마이크로서비스에서는 2PC(Two-Phase Commit)가 현실적이지 않습니다.

**Saga 패턴**은 긴 트랜잭션을 여러 로컬 트랜잭션으로 분해하고, 실패 시 보상 트랜잭션(Compensating Transaction)으로 일관성을 유지하는 패턴입니다.

왜 2PC가 아닌 Saga인가?

| 항목 | 2PC | Saga |

| --------- | ---------------- | ------------- |

| 정합성 | 강한 정합성 | 결과적 정합성 |

| 가용성 | 낮음 (동기 잠금) | 높음 (비동기) |

| 지연시간 | 높음 | 낮음 |

| 확장성 | 제한적 | 우수 |

| 장애 격리 | 전체 롤백 | 부분 보상 |

주문 시나리오

온라인 쇼핑몰의 주문 처리를 예로 듭니다:

1. **주문 서비스**: 주문 생성

2. **결제 서비스**: 결제 처리

3. **재고 서비스**: 재고 차감

4. **배송 서비스**: 배송 생성

만약 3단계(재고 차감)에서 실패하면, 2단계(결제)를 환불하고 1단계(주문)를 취소해야 합니다.

Choreography 방식 — 이벤트 기반

각 서비스가 이벤트를 발행하고, 다른 서비스가 이벤트를 구독하여 자신의 로컬 트랜잭션을 실행합니다.

이벤트 정의

events.py

from dataclasses import dataclass

from datetime import datetime

@dataclass

class OrderCreated:

order_id: str

customer_id: str

items: list

total_amount: float

timestamp: datetime

@dataclass

class PaymentProcessed:

order_id: str

payment_id: str

amount: float

@dataclass

class PaymentFailed:

order_id: str

reason: str

@dataclass

class InventoryReserved:

order_id: str

items: list

@dataclass

class InventoryInsufficient:

order_id: str

item_id: str

requested: int

available: int

@dataclass

class ShippingCreated:

order_id: str

tracking_number: str

보상 이벤트

@dataclass

class PaymentRefunded:

order_id: str

payment_id: str

amount: float

@dataclass

class InventoryReleased:

order_id: str

items: list

@dataclass

class OrderCancelled:

order_id: str

reason: str

주문 서비스

order_service.py

from kafka import KafkaProducer, KafkaConsumer

class OrderService:

def __init__(self):

self.producer = KafkaProducer(

bootstrap_servers='kafka:9092',

value_serializer=lambda v: json.dumps(v).encode()

)

self.db = OrderDB()

def create_order(self, customer_id, items, total_amount):

"""주문 생성 + OrderCreated 이벤트 발행"""

order = self.db.create(

customer_id=customer_id,

items=items,

total_amount=total_amount,

status="PENDING"

)

self.producer.send('order-events', {

'type': 'OrderCreated',

'order_id': order.id,

'customer_id': customer_id,

'items': items,

'total_amount': total_amount

})

return order

def handle_payment_failed(self, event):

"""결제 실패 시 주문 취소"""

self.db.update_status(event['order_id'], "CANCELLED")

self.producer.send('order-events', {

'type': 'OrderCancelled',

'order_id': event['order_id'],

'reason': event['reason']

})

def handle_inventory_insufficient(self, event):

"""재고 부족 시 환불 요청"""

self.db.update_status(event['order_id'], "CANCELLING")

결제 환불 이벤트 발행

self.producer.send('payment-events', {

'type': 'RefundRequested',

'order_id': event['order_id']

})

def handle_shipping_created(self, event):

"""배송 생성 완료 시 주문 확정"""

self.db.update_status(event['order_id'], "CONFIRMED")

결제 서비스

payment_service.py

class PaymentService:

def __init__(self):

self.producer = KafkaProducer(

bootstrap_servers='kafka:9092',

value_serializer=lambda v: json.dumps(v).encode()

)

self.db = PaymentDB()

def handle_order_created(self, event):

"""주문 생성 이벤트 수신 -> 결제 처리"""

try:

payment = self.db.process_payment(

order_id=event['order_id'],

customer_id=event['customer_id'],

amount=event['total_amount']

)

self.producer.send('payment-events', {

'type': 'PaymentProcessed',

'order_id': event['order_id'],

'payment_id': payment.id,

'amount': event['total_amount']

})

except InsufficientFundsError as e:

self.producer.send('payment-events', {

'type': 'PaymentFailed',

'order_id': event['order_id'],

'reason': str(e)

})

def handle_refund_requested(self, event):

"""보상 트랜잭션: 환불 처리"""

payment = self.db.get_by_order(event['order_id'])

self.db.refund(payment.id)

self.producer.send('payment-events', {

'type': 'PaymentRefunded',

'order_id': event['order_id'],

'payment_id': payment.id,

'amount': payment.amount

})

Orchestration 방식 — 중앙 코디네이터

하나의 Orchestrator(Saga Coordinator)가 전체 트랜잭션 흐름을 관리합니다.

Temporal로 구현하는 Saga Orchestrator

workflows.py

from temporalio import workflow

from temporalio.common import RetryPolicy

from datetime import timedelta

@workflow.defn

class OrderSagaWorkflow:

"""주문 Saga - Orchestration 방식"""

@workflow.run

async def run(self, order_request: dict) -> dict:

retry_policy = RetryPolicy(

maximum_attempts=3,

initial_interval=timedelta(seconds=1),

maximum_interval=timedelta(seconds=10),

)

order_id = None

payment_id = None

reservation_id = None

try:

Step 1: 주문 생성

order_id = await workflow.execute_activity(

create_order,

order_request,

start_to_close_timeout=timedelta(seconds=30),

retry_policy=retry_policy,

)

workflow.logger.info(f"Order created: {order_id}")

Step 2: 결제 처리

payment_id = await workflow.execute_activity(

process_payment,

{"order_id": order_id, "amount": order_request["total_amount"]},

start_to_close_timeout=timedelta(seconds=60),

retry_policy=retry_policy,

)

workflow.logger.info(f"Payment processed: {payment_id}")

Step 3: 재고 예약

reservation_id = await workflow.execute_activity(

reserve_inventory,

{"order_id": order_id, "items": order_request["items"]},

start_to_close_timeout=timedelta(seconds=30),

retry_policy=retry_policy,

)

workflow.logger.info(f"Inventory reserved: {reservation_id}")

Step 4: 배송 생성

tracking_number = await workflow.execute_activity(

create_shipment,

{"order_id": order_id, "address": order_request["address"]},

start_to_close_timeout=timedelta(seconds=30),

retry_policy=retry_policy,

)

workflow.logger.info(f"Shipment created: {tracking_number}")

Step 5: 주문 확정

await workflow.execute_activity(

confirm_order,

{"order_id": order_id},

start_to_close_timeout=timedelta(seconds=10),

)

return {

"status": "SUCCESS",

"order_id": order_id,

"tracking_number": tracking_number

}

except Exception as e:

workflow.logger.error(f"Saga failed: {e}")

보상 트랜잭션 실행 (역순)

if reservation_id:

await workflow.execute_activity(

release_inventory,

{"reservation_id": reservation_id},

start_to_close_timeout=timedelta(seconds=30),

)

if payment_id:

await workflow.execute_activity(

refund_payment,

{"payment_id": payment_id},

start_to_close_timeout=timedelta(seconds=60),

)

if order_id:

await workflow.execute_activity(

cancel_order,

{"order_id": order_id, "reason": str(e)},

start_to_close_timeout=timedelta(seconds=10),

)

return {"status": "FAILED", "reason": str(e)}

Activity 구현

activities.py

from temporalio import activity

@activity.defn

async def create_order(request: dict) -> str:

async with httpx.AsyncClient() as client:

response = await client.post(

"http://order-service:8080/orders",

json=request

)

response.raise_for_status()

return response.json()["order_id"]

@activity.defn

async def process_payment(request: dict) -> str:

async with httpx.AsyncClient() as client:

response = await client.post(

"http://payment-service:8080/payments",

json=request

)

response.raise_for_status()

return response.json()["payment_id"]

@activity.defn

async def reserve_inventory(request: dict) -> str:

async with httpx.AsyncClient() as client:

response = await client.post(

"http://inventory-service:8080/reservations",

json=request

)

if response.status_code == 409:

raise InsufficientInventoryError(response.json()["message"])

response.raise_for_status()

return response.json()["reservation_id"]

보상 활동

@activity.defn

async def refund_payment(request: dict) -> None:

async with httpx.AsyncClient() as client:

response = await client.post(

"http://payment-service:8080/refunds",

json=request

)

response.raise_for_status()

@activity.defn

async def release_inventory(request: dict) -> None:

async with httpx.AsyncClient() as client:

response = await client.delete(

f"http://inventory-service:8080/reservations/{request['reservation_id']}"

)

response.raise_for_status()

@activity.defn

async def cancel_order(request: dict) -> None:

async with httpx.AsyncClient() as client:

response = await client.patch(

f"http://order-service:8080/orders/{request['order_id']}/cancel",

json={"reason": request["reason"]}

)

response.raise_for_status()

Temporal Worker 실행

worker.py

from temporalio.client import Client

from temporalio.worker import Worker

async def main():

client = await Client.connect("temporal:7233")

worker = Worker(

client,

task_queue="order-saga",

workflows=[OrderSagaWorkflow],

activities=[

create_order,

process_payment,

reserve_inventory,

create_shipment,

confirm_order,

refund_payment,

release_inventory,

cancel_order,

],

)

await worker.run()

if __name__ == "__main__":

asyncio.run(main())

Choreography vs Orchestration

| 항목 | Choreography | Orchestration |

| ----------- | ---------------------- | --------------------- |

| 결합도 | 느슨함 | Orchestrator에 의존 |

| 복잡도 | 서비스 수 증가 시 급증 | 중앙에서 관리 |

| 가시성 | 이벤트 추적 어려움 | 워크플로 상태 명확 |

| 단일 장애점 | 없음 | Orchestrator |

| 적합한 경우 | 단순한 흐름 (3-4 단계) | 복잡한 흐름 (5+ 단계) |

| 테스트 | 어려움 | 상대적으로 쉬움 |

Semantic Lock 카운터 측정

Saga는 격리성이 없으므로, 동시 실행 시 데이터 이상이 발생할 수 있습니다.

Semantic Lock 패턴

class OrderDB:

def create_with_lock(self, order_data):

"""주문 생성 시 상태를 PENDING으로 설정하여 다른 Saga가 간섭 못하게"""

order = Order(

**order_data,

status="PENDING", # Semantic Lock

saga_id=generate_saga_id()

)

self.session.add(order)

self.session.commit()

return order

def confirm_order(self, order_id, saga_id):

"""Saga 완료 시 락 해제"""

order = self.session.query(Order).filter_by(

id=order_id,

saga_id=saga_id,

status="PENDING"

).first()

if not order:

raise ConcurrencyError("Order already processed by another saga")

order.status = "CONFIRMED"

self.session.commit()

멱등성 보장

보상 트랜잭션이 여러 번 실행될 수 있으므로 멱등성이 필수입니다.

멱등성 키를 활용한 결제 처리

class PaymentService:

def process_payment(self, order_id, amount, idempotency_key):

이미 처리된 요청인지 확인

existing = self.db.find_by_idempotency_key(idempotency_key)

if existing:

return existing # 동일 결과 반환

payment = self.db.create_payment(

order_id=order_id,

amount=amount,

idempotency_key=idempotency_key

)

return payment

정리

Saga 패턴은 마이크로서비스에서 분산 트랜잭션을 처리하는 핵심 패턴입니다:

- **Choreography**: 단순한 흐름에 적합, 이벤트 기반으로 느슨한 결합

- **Orchestration**: 복잡한 흐름에 적합, Temporal 등으로 중앙 관리

- **보상 트랜잭션**: 실패 시 역순으로 복구

- **멱등성**: 재시도 안전성을 위해 필수

- **Semantic Lock**: 동시 Saga 간 간섭 방지

**Q1. 2PC 대신 Saga를 사용하는 주된 이유는?**

2PC는 동기 잠금으로 가용성과 확장성이 제한되지만, Saga는 비동기로 높은 가용성과 확장성을 제공합니다.

**Q2. 보상 트랜잭션(Compensating Transaction)이란?**

Saga의 특정 단계가 실패했을 때, 이전 성공한 단계를 되돌리는 역순 트랜잭션입니다.

**Q3. Choreography 방식의 단점은?**

서비스 수가 증가하면 이벤트 흐름이 복잡해지고, 전체 Saga의 진행 상황을 파악하기 어렵습니다.

**Q4. Temporal이 Saga Orchestration에 적합한 이유는?**

워크플로 상태를 자동으로 영속화하고, 실패 시 재시도와 보상을 선언적으로 관리할 수 있습니다.

**Q5. Semantic Lock 패턴의 목적은?**

PENDING 상태로 다른 Saga가 같은 데이터를 동시에 수정하는 것을 방지합니다.

**Q6. 보상 트랜잭션에서 멱등성이 중요한 이유는?**

네트워크 오류 등으로 보상이 여러 번 실행될 수 있으므로, 같은 결과를 보장해야 합니다.

**Q7. Choreography와 Orchestration 중 어떤 상황에서 Orchestration을 선택해야 하나요?**

5단계 이상의 복잡한 흐름, 조건부 분기가 많은 경우, 전체 진행 상황의 가시성이 필요한 경우입니다.

현재 단락 (1/355)

마이크로서비스 아키텍처에서 가장 어려운 문제 중 하나가 **분산 트랜잭션**입니다. 모놀리스에서는 데이터베이스의 ACID 트랜잭션으로 해결했지만, 각 서비스가 자체 데이터베이스를 ...

작성 글자: 0원문 글자: 9,418작성 단락: 0/355