Implementing Distributed Transactions in Microservices with the Saga Pattern: Choreography vs Orchestration

Introduction

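One of the hardest problems in a microservices architecture is **distributed transactions**. In a monolith, the database's ACID transactions solved this for us. In microservices, however, each service owns its own database, and 2PC (Two-Phase Commit) is usually impractical. It holds locks across services until every participant has voted, and it blocks if the coordinator fails. Many of the datastores and message brokers in use also do not support it.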

The **Saga pattern** decomposes a long transaction into multiple local transactions and maintains consistency through compensating transactions when failures occur.
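The core mechanic fits in a few lines of plain Python. The sketch below is a minimal in-memory illustration; the `run_saga` helper and the step names are hypothetical and not part of any library. Each step pairs a local action with its compensation. When a step fails, only the compensations of the steps that already completed run, newest first.

```python
from typing import Callable, List, Tuple

# A step is (name, action, compensation); both callables take no arguments.
Step = Tuple[str, Callable[[], None], Callable[[], None]]


def run_saga(steps: List[Step]) -> Tuple[bool, List[str]]:
    """Run steps in order; on failure, compensate completed steps in reverse.

    Returns (succeeded, log), where log records what ran.
    """
    completed: List[Step] = []
    log: List[str] = []
    for step in steps:
        name, action, _ = step
        try:
            action()
        except Exception:
            log.append(f"failed:{name}")
            # Undo only the steps whose local transactions committed, newest first
            for done_name, _, compensate in reversed(completed):
                compensate()
                log.append(f"compensated:{done_name}")
            return False, log
        completed.append(step)
        log.append(f"done:{name}")
    return True, log


def fail() -> None:
    raise RuntimeError("out of stock")


def noop() -> None:
    pass


ok, log = run_saga([
    ("create_order", noop, noop),
    ("process_payment", noop, noop),
    ("reserve_inventory", fail, noop),
    ("create_shipment", noop, noop),
])
print(ok, log)
```

The failing step itself is not compensated, because its local transaction never committed.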

Why Saga Instead of 2PC?

| Aspect | 2PC | Saga |
| --------------- | ----------------------- | -------------------- |
| Consistency | Strong consistency | Eventual consistency |
| Availability | Low (synchronous locks) | High (asynchronous) |
| Latency | High | Low |
| Scalability | Limited | Excellent |
| Fault isolation | Full rollback | Partial compensation |

Order Scenario

Let us use an online shopping order process as an example:

1. **Order Service**: Create the order

2. **Payment Service**: Process payment

3. **Inventory Service**: Deduct inventory

4. **Shipping Service**: Create shipment

If step 3 (inventory deduction) fails, we must refund step 2 (payment) and cancel step 1 (order).

Choreography Approach — Event-Driven

Each service publishes events, and other services subscribe to those events to execute their own local transactions.
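To make the control flow concrete, here is a toy, in-process stand-in for the broker. The `EventBus` class is hypothetical; in production, Kafka topics play this role, as in the services in this section. Handlers here run synchronously and recursively, which a real broker would not do. The point is that no service calls another directly: each one only reacts to the events it subscribed to.

```python
from collections import defaultdict
from typing import Callable, Dict, List

Handler = Callable[[dict], None]


class EventBus:
    """In-memory publish/subscribe; handlers run synchronously on publish."""

    def __init__(self) -> None:
        self.handlers: Dict[str, List[Handler]] = defaultdict(list)
        self.history: List[str] = []

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self.handlers[event_type].append(handler)

    def publish(self, event: dict) -> None:
        self.history.append(event["type"])
        for handler in self.handlers[event["type"]]:
            handler(event)


bus = EventBus()
stock = {"sku-1": 0}  # empty inventory, so the saga has to compensate


def on_payment_processed(e: dict) -> None:
    """Inventory service: reserve stock after payment, or report a shortage."""
    if stock[e["sku"]] > 0:
        stock[e["sku"]] -= 1
        bus.publish({"type": "InventoryReserved", "order_id": e["order_id"]})
    else:
        bus.publish({"type": "InventoryInsufficient", "order_id": e["order_id"]})


# Payment service: charge when an order is created
bus.subscribe("OrderCreated", lambda e: bus.publish(
    {"type": "PaymentProcessed", "order_id": e["order_id"], "sku": e["sku"]}))
bus.subscribe("PaymentProcessed", on_payment_processed)
# Order service requests a refund; payment service refunds; order service cancels
bus.subscribe("InventoryInsufficient", lambda e: bus.publish(
    {"type": "RefundRequested", "order_id": e["order_id"]}))
bus.subscribe("RefundRequested", lambda e: bus.publish(
    {"type": "PaymentRefunded", "order_id": e["order_id"]}))
bus.subscribe("PaymentRefunded", lambda e: bus.publish(
    {"type": "OrderCancelled", "order_id": e["order_id"]}))

bus.publish({"type": "OrderCreated", "order_id": "o-1", "sku": "sku-1"})
print(bus.history)
```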

Event Definitions

```python
# events.py
from dataclasses import dataclass
from datetime import datetime


@dataclass
class OrderCreated:
    order_id: str
    customer_id: str
    items: list
    total_amount: float
    timestamp: datetime


@dataclass
class PaymentProcessed:
    order_id: str
    payment_id: str
    amount: float


@dataclass
class PaymentFailed:
    order_id: str
    reason: str


@dataclass
class InventoryReserved:
    order_id: str
    items: list


@dataclass
class InventoryInsufficient:
    order_id: str
    item_id: str
    requested: int
    available: int


@dataclass
class ShippingCreated:
    order_id: str
    tracking_number: str


# Compensation request published by the Order Service
@dataclass
class RefundRequested:
    order_id: str


# Compensation events
@dataclass
class PaymentRefunded:
    order_id: str
    payment_id: str
    amount: float


@dataclass
class InventoryReleased:
    order_id: str
    items: list


@dataclass
class OrderCancelled:
    order_id: str
    reason: str
```
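The services in this section exchange plain dicts with a `'type'` field rather than these dataclasses. One way to bridge the two is a small registry that converts to and from that envelope. This sketch assumes the `'type'` value equals the class name. `to_message` and `from_message` are hypothetical helpers, and only two event classes are repeated here to keep the sketch self-contained.

```python
from dataclasses import asdict, dataclass, fields
from typing import Dict, Type


@dataclass
class PaymentFailed:
    order_id: str
    reason: str


@dataclass
class PaymentRefunded:
    order_id: str
    payment_id: str
    amount: float


# Registry keyed by the 'type' discriminator used on the wire
EVENT_TYPES: Dict[str, Type] = {cls.__name__: cls for cls in (PaymentFailed, PaymentRefunded)}


def to_message(event) -> dict:
    """Dataclass -> JSON-friendly dict with a 'type' field."""
    return {"type": type(event).__name__, **asdict(event)}


def from_message(message: dict):
    """Dict -> dataclass; unknown keys are dropped so producers can add fields."""
    cls = EVENT_TYPES[message["type"]]
    known = {f.name for f in fields(cls)}
    return cls(**{k: v for k, v in message.items() if k in known})


msg = to_message(PaymentRefunded(order_id="o-1", payment_id="p-9", amount=42.0))
print(msg)
print(from_message(msg))
```

Some fields are not JSON-serializable as they stand, such as `OrderCreated.timestamp`. They would need converting, for example with `isoformat()`, before `json.dumps` can handle them.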

Order Service

```python
# order_service.py
import json

from kafka import KafkaProducer

# OrderDB is this service's own data-access layer (not shown), and so is the
# KafkaConsumer loop that routes incoming events to the handle_* methods.


class OrderService:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers='kafka:9092',
            value_serializer=lambda v: json.dumps(v).encode()
        )
        self.db = OrderDB()

    def create_order(self, customer_id, items, total_amount):
        """Create order + publish OrderCreated event"""
        order = self.db.create(
            customer_id=customer_id,
            items=items,
            total_amount=total_amount,
            status="PENDING"
        )
        # The DB write and the publish are not atomic: a crash in between loses
        # the event. The transactional outbox pattern is the usual remedy.
        self.producer.send('order-events', {
            'type': 'OrderCreated',
            'order_id': order.id,
            'customer_id': customer_id,
            'items': items,
            'total_amount': total_amount
        })
        return order

    def handle_payment_failed(self, event):
        """Cancel order on payment failure"""
        self.db.update_status(event['order_id'], "CANCELLED")
        self.producer.send('order-events', {
            'type': 'OrderCancelled',
            'order_id': event['order_id'],
            'reason': event['reason']
        })

    def handle_inventory_insufficient(self, event):
        """Request refund on insufficient inventory"""
        self.db.update_status(event['order_id'], "CANCELLING")
        # Publish payment refund event
        self.producer.send('payment-events', {
            'type': 'RefundRequested',
            'order_id': event['order_id']
        })

    def handle_payment_refunded(self, event):
        """Finish cancelling once the Payment Service confirms the refund"""
        self.db.update_status(event['order_id'], "CANCELLED")
        self.producer.send('order-events', {
            'type': 'OrderCancelled',
            'order_id': event['order_id'],
            'reason': 'Insufficient inventory'
        })

    def handle_shipping_created(self, event):
        """Confirm order when shipping is created"""
        self.db.update_status(event['order_id'], "CONFIRMED")
```
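Events in a choreographed saga can arrive late or be redelivered. The Order Service should therefore reject status changes that make no sense, such as confirming an order that has already been cancelled. The sketch below is a minimal guard. It assumes the four statuses used above (PENDING, CANCELLING, CANCELLED, CONFIRMED) and a hypothetical `transition` helper that `update_status` could call before writing.

```python
# Allowed status changes for an order in this saga (assumed state machine)
ALLOWED = {
    "PENDING": {"CONFIRMED", "CANCELLING", "CANCELLED"},
    "CANCELLING": {"CANCELLED"},
    "CANCELLED": set(),  # terminal
    "CONFIRMED": set(),  # terminal
}


class InvalidTransition(Exception):
    pass


def transition(current: str, target: str) -> str:
    """Return the new status; repeating the current status is a no-op."""
    if current == target:
        return current  # redelivered event: idempotent
    if target not in ALLOWED[current]:
        raise InvalidTransition(f"{current} -> {target}")
    return target


status = "PENDING"
status = transition(status, "CANCELLING")  # InventoryInsufficient
status = transition(status, "CANCELLED")   # PaymentRefunded
status = transition(status, "CANCELLED")   # duplicate delivery, ignored
try:
    transition(status, "CONFIRMED")        # late ShippingCreated
except InvalidTransition as exc:
    print("rejected:", exc)
```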

Payment Service

```python
# payment_service.py
import json

from kafka import KafkaProducer

# PaymentDB and InsufficientFundsError belong to this service's own code (not shown).


class PaymentService:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers='kafka:9092',
            value_serializer=lambda v: json.dumps(v).encode()
        )
        self.db = PaymentDB()

    def handle_order_created(self, event):
        """Receive OrderCreated event -> process payment"""
        try:
            payment = self.db.process_payment(
                order_id=event['order_id'],
                customer_id=event['customer_id'],
                amount=event['total_amount']
            )
            self.producer.send('payment-events', {
                'type': 'PaymentProcessed',
                'order_id': event['order_id'],
                'payment_id': payment.id,
                'amount': event['total_amount']
            })
        except InsufficientFundsError as e:
            self.producer.send('payment-events', {
                'type': 'PaymentFailed',
                'order_id': event['order_id'],
                'reason': str(e)
            })

    def handle_refund_requested(self, event):
        """Compensating transaction: process refund"""
        # Must be idempotent: a redelivered RefundRequested must not refund twice
        payment = self.db.get_by_order(event['order_id'])
        self.db.refund(payment.id)
        self.producer.send('payment-events', {
            'type': 'PaymentRefunded',
            'order_id': event['order_id'],
            'payment_id': payment.id,
            'amount': payment.amount
        })
```
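Neither service shows how events reach its `handle_*` methods. The sketch below is a minimal routing layer. It assumes one consumer group per service and that the `'type'` field selects the handler. `route` and `run_consumer` are hypothetical helpers, and the topic and group names are assumptions. The Kafka loop uses kafka-python's `KafkaConsumer`.

```python
import json
from typing import Callable, Dict, Iterable

Handler = Callable[[dict], None]


def route(event: dict, handlers: Dict[str, Handler]) -> bool:
    """Call the handler registered for event['type']; return False if there is none."""
    handler = handlers.get(event.get("type"))
    if handler is None:
        return False  # e.g. the service's own outgoing events on a shared topic
    handler(event)
    return True


def run_consumer(topics: Iterable[str], group_id: str, handlers: Dict[str, Handler]) -> None:
    """Blocking loop; needs kafka-python installed and a reachable broker."""
    from kafka import KafkaConsumer  # lazy import keeps route() usable without Kafka

    consumer = KafkaConsumer(
        *topics,
        bootstrap_servers="kafka:9092",
        group_id=group_id,
        value_deserializer=lambda raw: json.loads(raw.decode()),
    )
    for record in consumer:
        route(record.value, handlers)


# Wiring for the Payment Service might look like this (not executed here):
#   service = PaymentService()
#   run_consumer(["order-events", "payment-events"], "payment-service", {
#       "OrderCreated": service.handle_order_created,
#       "RefundRequested": service.handle_refund_requested,
#   })

calls = []
handled = route({"type": "OrderCreated", "order_id": "o-1"}, {"OrderCreated": calls.append})
skipped = route({"type": "PaymentProcessed", "order_id": "o-1"}, {"OrderCreated": calls.append})
print(handled, skipped, calls)
```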

Orchestration Approach — Central Coordinator

A single Orchestrator (Saga Coordinator) manages the entire transaction flow.

Saga Orchestrator with Temporal

```python
# workflows.py
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

# Pass the activity module through the workflow sandbox unchanged
# (it imports httpx, which workflow code itself must not use).
with workflow.unsafe.imports_passed_through():
    from activities import (
        cancel_order,
        confirm_order,
        create_order,
        create_shipment,
        process_payment,
        refund_payment,
        release_inventory,
        reserve_inventory,
    )


@workflow.defn
class OrderSagaWorkflow:
    """Order Saga - Orchestration approach"""

    @workflow.run
    async def run(self, order_request: dict) -> dict:
        retry_policy = RetryPolicy(
            maximum_attempts=3,
            initial_interval=timedelta(seconds=1),
            maximum_interval=timedelta(seconds=10),
            # A business failure will not fix itself on retry, so fail fast
            non_retryable_error_types=["InsufficientInventoryError"],
        )

        # IDs of completed steps; a non-None value means that step needs compensating
        order_id = None
        payment_id = None
        reservation_id = None

        try:
            # Step 1: Create order
            order_id = await workflow.execute_activity(
                create_order,
                order_request,
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=retry_policy,
            )
            workflow.logger.info(f"Order created: {order_id}")

            # Step 2: Process payment
            payment_id = await workflow.execute_activity(
                process_payment,
                {"order_id": order_id, "amount": order_request["total_amount"]},
                start_to_close_timeout=timedelta(seconds=60),
                retry_policy=retry_policy,
            )
            workflow.logger.info(f"Payment processed: {payment_id}")

            # Step 3: Reserve inventory
            reservation_id = await workflow.execute_activity(
                reserve_inventory,
                {"order_id": order_id, "items": order_request["items"]},
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=retry_policy,
            )
            workflow.logger.info(f"Inventory reserved: {reservation_id}")

            # Step 4: Create shipment
            tracking_number = await workflow.execute_activity(
                create_shipment,
                {"order_id": order_id, "address": order_request["address"]},
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=retry_policy,
            )
            workflow.logger.info(f"Shipment created: {tracking_number}")

            # Step 5: Confirm order
            await workflow.execute_activity(
                confirm_order,
                {"order_id": order_id},
                start_to_close_timeout=timedelta(seconds=10),
            )

            return {
                "status": "SUCCESS",
                "order_id": order_id,
                "tracking_number": tracking_number
            }

        except Exception as e:
            workflow.logger.error(f"Saga failed: {e}")

            # Execute compensating transactions (reverse order).
            # No retry_policy is passed, so Temporal's default policy retries each
            # compensation indefinitely unless it raises a non-retryable error.
            if reservation_id:
                await workflow.execute_activity(
                    release_inventory,
                    {"reservation_id": reservation_id},
                    start_to_close_timeout=timedelta(seconds=30),
                )

            if payment_id:
                await workflow.execute_activity(
                    refund_payment,
                    {"payment_id": payment_id},
                    start_to_close_timeout=timedelta(seconds=60),
                )

            if order_id:
                await workflow.execute_activity(
                    cancel_order,
                    {"order_id": order_id, "reason": str(e)},
                    start_to_close_timeout=timedelta(seconds=10),
                )

            return {"status": "FAILED", "reason": str(e)}
```
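The `if reservation_id` / `if payment_id` / `if order_id` ladder grows with every step. An error escaping one compensation would also skip the ones after it. A common alternative is to push a compensation onto a stack as soon as its forward step succeeds, then unwind the stack on failure while continuing past compensation errors. The sketch below shows the idea in plain `asyncio` so it runs without a Temporal server. Inside a workflow, each callable would wrap a `workflow.execute_activity(...)` call. `SagaCompensations` is a hypothetical helper, not a Temporal API.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

Compensation = Callable[[], Awaitable[None]]


class SagaCompensations:
    """Stack of compensations, unwound newest-first on failure."""

    def __init__(self) -> None:
        self._stack: List[Tuple[str, Compensation]] = []

    def add(self, name: str, compensation: Compensation) -> None:
        self._stack.append((name, compensation))

    async def unwind(self) -> List[str]:
        """Run every compensation in reverse; return the names of those that failed."""
        failed: List[str] = []
        while self._stack:
            name, compensation = self._stack.pop()
            try:
                await compensation()
            except Exception:
                failed.append(name)  # keep going; record it for manual follow-up
        return failed


async def demo() -> Tuple[List[str], List[str]]:
    ran: List[str] = []
    saga = SagaCompensations()

    async def record(name: str) -> None:
        ran.append(name)

    async def broken_refund() -> None:
        raise RuntimeError("payment service unavailable")

    try:
        saga.add("cancel_order", lambda: record("cancel_order"))            # after create_order
        saga.add("refund_payment", broken_refund)                           # after process_payment
        saga.add("release_inventory", lambda: record("release_inventory"))  # after reserve_inventory
        raise RuntimeError("create_shipment failed")
    except RuntimeError:
        failed = await saga.unwind()
    return ran, failed


ran, failed = asyncio.run(demo())
print(ran, failed)
```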

Activity Implementation

```python
# activities.py
import httpx
from temporalio import activity


class InsufficientInventoryError(Exception):
    """Raised when the inventory service rejects a reservation with HTTP 409."""


@activity.defn
async def create_order(request: dict) -> str:
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://order-service:8080/orders",
            json=request
        )
        response.raise_for_status()
        return response.json()["order_id"]


@activity.defn
async def process_payment(request: dict) -> str:
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://payment-service:8080/payments",
            json=request
        )
        response.raise_for_status()
        return response.json()["payment_id"]


@activity.defn
async def reserve_inventory(request: dict) -> str:
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://inventory-service:8080/reservations",
            json=request
        )
        if response.status_code == 409:
            raise InsufficientInventoryError(response.json()["message"])
        response.raise_for_status()
        return response.json()["reservation_id"]


# The workflow and worker also reference the two activities below. Their
# endpoints and response fields are assumptions that follow the conventions above.
@activity.defn
async def create_shipment(request: dict) -> str:
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://shipping-service:8080/shipments",
            json=request
        )
        response.raise_for_status()
        return response.json()["tracking_number"]


@activity.defn
async def confirm_order(request: dict) -> None:
    async with httpx.AsyncClient() as client:
        response = await client.patch(
            f"http://order-service:8080/orders/{request['order_id']}/confirm"
        )
        response.raise_for_status()


# Compensation activities
@activity.defn
async def refund_payment(request: dict) -> None:
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://payment-service:8080/refunds",
            json=request
        )
        response.raise_for_status()


@activity.defn
async def release_inventory(request: dict) -> None:
    async with httpx.AsyncClient() as client:
        response = await client.delete(
            f"http://inventory-service:8080/reservations/{request['reservation_id']}"
        )
        response.raise_for_status()


@activity.defn
async def cancel_order(request: dict) -> None:
    async with httpx.AsyncClient() as client:
        response = await client.patch(
            f"http://order-service:8080/orders/{request['order_id']}/cancel",
            json={"reason": request["reason"]}
        )
        response.raise_for_status()
```
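Temporal retries failed activities, and an HTTP call can succeed on the server even when the client sees a timeout. That means `process_payment` could charge twice. One mitigation assumes the payment service accepts an idempotency key, as in the Ensuring Idempotency section. The key is derived from values that stay the same across retries of one activity execution: `activity.info().workflow_id` and `activity.info().activity_id`. The pure helper below (`make_idempotency_key` is hypothetical) shows the derivation. The `Idempotency-Key` header name is an assumption about the payment service.

```python
import uuid

# Arbitrary fixed namespace so the same inputs always yield the same key
KEY_NAMESPACE = uuid.UUID("6f1c2a52-0d4e-4a57-9a43-3c1f8f5b2e10")


def make_idempotency_key(workflow_id: str, activity_id: str) -> str:
    """Deterministic key: identical for every retry attempt of one activity execution."""
    return str(uuid.uuid5(KEY_NAMESPACE, f"{workflow_id}:{activity_id}"))


# Inside an activity (not executed here), the inputs would come from Temporal:
#   info = activity.info()
#   key = make_idempotency_key(info.workflow_id, info.activity_id)
#   await client.post(url, json=request, headers={"Idempotency-Key": key})

first = make_idempotency_key("order-saga-o-1", "2")
retry = make_idempotency_key("order-saga-o-1", "2")
other = make_idempotency_key("order-saga-o-2", "2")
print(first == retry, first == other)
```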

Running the Temporal Worker

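```python
# worker.py
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from activities import (
    cancel_order,
    confirm_order,
    create_order,
    create_shipment,
    process_payment,
    refund_payment,
    release_inventory,
    reserve_inventory,
)
from workflows import OrderSagaWorkflow


async def main():
    client = await Client.connect("temporal:7233")
    worker = Worker(
        client,
        task_queue="order-saga",
        workflows=[OrderSagaWorkflow],
        activities=[
            create_order,
            process_payment,
            reserve_inventory,
            create_shipment,
            confirm_order,
            refund_payment,
            release_inventory,
            cancel_order,
        ],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```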
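The worker only executes sagas; something still has to start them. The sketch below is a minimal starter that calls `Client.execute_workflow`. It assumes the `order-saga` task queue above and that each order request carries a client-generated `request_id`, a hypothetical field. The workflow ID is deterministic, so if the same order is submitted twice while the first saga is still running, the second start fails instead of launching a duplicate saga. In the Python SDK, that failure is `temporalio.exceptions.WorkflowAlreadyStartedError`.

```python
def order_workflow_id(request_id: str) -> str:
    """Same request -> same workflow ID, so duplicate submissions collide."""
    return f"order-saga-{request_id}"


async def start_order_saga(order_request: dict) -> dict:
    """Start OrderSagaWorkflow and wait for its result (needs a Temporal server)."""
    from temporalio.client import Client  # lazy import keeps the helper above dependency-free

    client = await Client.connect("temporal:7233")
    return await client.execute_workflow(
        "OrderSagaWorkflow",  # workflow type name registered by the worker
        order_request,
        id=order_workflow_id(order_request["request_id"]),
        task_queue="order-saga",
    )


print(order_workflow_id("r-123"))
```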

Choreography vs Orchestration

| Aspect | Choreography | Orchestration |
| ----------------------- | ----------------------------- | ------------------------ |
| Coupling | Loose | Depends on Orchestrator |
| Complexity | Surges as service count grows | Centrally managed |
| Visibility | Hard to trace events | Clear workflow state |
| Single point of failure | None | Orchestrator |
| Best for | Simple flows (3-4 steps) | Complex flows (5+ steps) |
| Testing | Difficult | Relatively easy |

Semantic Lock Countermeasure

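A Saga gives up the isolation (the "I" in ACID) of a single database transaction. The intermediate state of a running Saga is visible to other Sagas, so concurrent executions can cause anomalies such as lost updates and dirty reads.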

```python
# Semantic Lock pattern
# Order (ORM model), generate_saga_id and ConcurrencyError are application code (not shown).
class OrderDB:
    def create_with_lock(self, order_data):
        """Set status to PENDING on creation to prevent other Sagas from interfering"""
        order = Order(
            **order_data,
            status="PENDING",  # Semantic Lock
            saga_id=generate_saga_id()
        )
        self.session.add(order)
        self.session.commit()
        return order

    def confirm_order(self, order_id, saga_id):
        """Release the lock when the Saga completes"""
        order = self.session.query(Order).filter_by(
            id=order_id,
            saga_id=saga_id,
            status="PENDING"
        ).first()
        if not order:
            raise ConcurrencyError("Order already processed by another saga")
        order.status = "CONFIRMED"
        self.session.commit()
```
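The lock check and the status change can also be collapsed into one conditional `UPDATE`, which the database applies atomically. The row is then confirmed only if it is still PENDING and still owned by this saga at the moment of the write. The sketch below uses an in-memory SQLite table; the `orders` schema and this `confirm_order` function are assumptions for illustration, standing in for the ORM model above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id TEXT PRIMARY KEY, saga_id TEXT, status TEXT)")
# Semantic lock held by saga-A
conn.execute("INSERT INTO orders VALUES ('o-1', 'saga-A', 'PENDING')")
conn.commit()


def confirm_order(order_id: str, saga_id: str) -> bool:
    """Confirm only if this saga still holds the PENDING lock; True on success."""
    cur = conn.execute(
        "UPDATE orders SET status = 'CONFIRMED' "
        "WHERE id = ? AND saga_id = ? AND status = 'PENDING'",
        (order_id, saga_id),
    )
    conn.commit()
    return cur.rowcount == 1  # 0 rows means the lock was not held


print(confirm_order("o-1", "saga-B"))  # another saga: lock not held
print(confirm_order("o-1", "saga-A"))  # owner: succeeds
print(confirm_order("o-1", "saga-A"))  # already confirmed: no longer PENDING
```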

Ensuring Idempotency

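Message brokers and workflow engines typically deliver messages and run activities at least once. A forward step or a compensating transaction may therefore execute more than once, for example when a call succeeds but its response is lost and the caller retries. Every step must be idempotent.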

```python
# Payment processing with idempotency keys
class PaymentService:
    def process_payment(self, order_id, amount, idempotency_key):
        # Check if request was already processed
        existing = self.db.find_by_idempotency_key(idempotency_key)
        if existing:
            return existing  # Return the same result

        # Back this check with a UNIQUE constraint on idempotency_key:
        # two concurrent retries can both pass the lookup above.
        payment = self.db.create_payment(
            order_id=order_id,
            amount=amount,
            idempotency_key=idempotency_key
        )
        return payment
```
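The lookup-then-insert still has a gap: two retries arriving at the same moment can both miss the lookup and both insert. Letting a `UNIQUE` constraint on the key decide closes that gap. Below is a minimal sketch with in-memory SQLite. The `payments` table and this `process_payment` function are hypothetical stand-ins for the service's database layer.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE payments ("
    " id INTEGER PRIMARY KEY AUTOINCREMENT,"
    " order_id TEXT NOT NULL,"
    " amount REAL NOT NULL,"
    " idempotency_key TEXT NOT NULL UNIQUE)"
)


def process_payment(order_id: str, amount: float, idempotency_key: str) -> int:
    """Charge once per key; repeated calls return the original payment id."""
    try:
        with conn:  # commits on success, rolls back on error
            cur = conn.execute(
                "INSERT INTO payments (order_id, amount, idempotency_key) VALUES (?, ?, ?)",
                (order_id, amount, idempotency_key),
            )
            return cur.lastrowid
    except sqlite3.IntegrityError:
        # The key already exists, so this is a retry: return the earlier result
        row = conn.execute(
            "SELECT id FROM payments WHERE idempotency_key = ?", (idempotency_key,)
        ).fetchone()
        return row[0]


first = process_payment("o-1", 42.0, "key-1")
retry = process_payment("o-1", 42.0, "key-1")
count = conn.execute("SELECT COUNT(*) FROM payments").fetchone()[0]
print(first, retry, count)
```

A stricter version would also compare the stored `order_id` and `amount` with the retry's values and reject a mismatch, since reusing a key for a different request usually signals a client bug.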

Summary

The Saga pattern is the core pattern for handling distributed transactions in microservices:

- **Choreography**: Suited for simple flows, loosely coupled through events

- **Orchestration**: Suited for complex flows, centrally managed with tools like Temporal

- **Compensating Transactions**: Recover in reverse order on failure

- **Idempotency**: Essential for retry safety

- **Semantic Lock**: Prevents interference between concurrent Sagas

**Q1. What is the main reason for using Saga instead of 2PC?**

2PC limits availability and scalability through synchronous locking, while Saga provides high availability and scalability through asynchronous processing.

**Q2. What is a compensating transaction?**

A reverse transaction that undoes previously completed steps when a specific step in the Saga fails.

**Q3. What are the downsides of the Choreography approach?**

As the number of services grows, event flows become complex and it becomes difficult to understand the overall Saga's progress.

**Q4. Why is Temporal well-suited for Saga Orchestration?**

It durably persists workflow state, so a saga resumes where it left off after a worker crash. Retries are configured through retry policies, and compensation logic is written as ordinary code in the workflow.

**Q5. What is the purpose of the Semantic Lock pattern?**

It prevents other Sagas from concurrently modifying the same data by using a PENDING status.

**Q6. Why is idempotency important in compensating transactions?**

Compensation may execute multiple times due to network errors and other issues, so it must guarantee the same result each time.

**Q7. In what situations should you choose Orchestration over Choreography?**

When you have complex flows with 5 or more steps, many conditional branches, or need visibility into overall progress.

