Split View: 분산 시스템 완전 정복: CAP theorem부터 분산 ML 학습, Kafka까지
분산 시스템 완전 정복: CAP theorem부터 분산 ML 학습, Kafka까지
- 들어가며
- 1. 분산 시스템 기초: CAP theorem과 일관성 모델
- 2. 합의 알고리즘: Paxos와 Raft
- 3. 분산 트랜잭션: 2PC, Saga, CQRS
- 4. 메시지 큐: Kafka 아키텍처
- 5. 분산 스토리지: Consistent Hashing과 Cassandra
- 6. 분산 ML 학습: Ring-AllReduce와 PyTorch Distributed
- 7. 관측성: 분산 추적, 로그, 메트릭
- 퀴즈
- 마치며
들어가며
분산 시스템은 현대 AI 인프라의 뼈대입니다. GPT-4 같은 대형 모델을 학습시키려면 수천 개의 GPU가 협력해야 하고, 초당 수백만 건의 추론 요청을 처리하려면 Kafka, Cassandra 같은 분산 시스템이 필수입니다.
이 가이드는 CAP theorem 같은 이론적 기초부터 실제 PyTorch 분산 학습 코드까지 AI 엔지니어에게 필요한 분산 시스템 지식을 체계적으로 다룹니다.
1. 분산 시스템 기초: CAP theorem과 일관성 모델
CAP Theorem
2000년 Eric Brewer가 제시한 CAP theorem은 분산 시스템의 근본적 한계를 설명합니다.
┌─────────────────────────────────────────────┐
│ CAP Triangle │
│ │
│ Consistency (C) │
│ △ │
│ / \ │
│ / \ │
│ / \ │
│ / CA \ │
│ / (RDBMS)\ │
│ /───────────\ │
│ / CP │ AP \ │
│ / (HBase│(Cassandra) │
│ ▽────────┼────────▽ │
│ Availability Partition Tolerance │
│ (A) (P) │
└─────────────────────────────────────────────┘
세 가지 속성은 동시에 모두 만족할 수 없습니다:
- C (Consistency): 모든 노드가 동일한 데이터를 읽는다
- A (Availability): 모든 요청이 응답을 받는다
- P (Partition Tolerance): 네트워크 파티션 상황에서도 작동한다
네트워크 파티션은 실제 분산 환경에서 반드시 발생하기 때문에, 실질적 선택은 CP vs AP입니다.
| 시스템 | 선택 | 사례 |
|---|---|---|
| HBase, Zookeeper, etcd | CP | 파티션 시 가용성 포기 |
| Cassandra, DynamoDB | AP | 파티션 시 일관성 포기 |
| 전통적 RDBMS | CA | 파티션 허용 안 함 |
PACELC: CAP의 확장
CAP은 파티션 상황만 다루지만, PACELC는 정상 운영 시의 Latency vs Consistency 트레이드오프도 포함합니다.
P → A or C (파티션 발생 시)
E → L or C (정상 운영 시)
Cassandra: PA/EL (가용성 + 낮은 레이턴시 우선)
etcd/Raft: PC/EC (일관성 우선)
일관성 모델
강한 일관성에서 약한 일관성 순서로:
Linearizability (선형화 가능성)
└── 가장 강력. 모든 연산이 즉각 반영
└── 예: etcd, Zookeeper
Sequential Consistency (순차 일관성)
└── 모든 노드가 같은 순서로 연산을 봄
└── 예: CPU 메모리 모델
Causal Consistency (인과 일관성)
└── 인과관계가 있는 연산은 순서 보장
└── 예: MongoDB causally consistent sessions
Eventual Consistency (최종 일관성)
└── 언젠가는 일관성이 맞춰짐
└── 예: Cassandra, DNS, S3
2. 합의 알고리즘: Paxos와 Raft
Paxos의 문제
Leslie Lamport가 1989년 제안한 Paxos는 이론적으로 완벽하지만 이해하기 너무 어렵습니다. Diego Ongaro가 "Paxos는 이해하기 너무 어렵다"는 논문 제목으로 Raft를 발표한 이유가 여기에 있습니다.
Raft 알고리즘
Raft는 분산 합의를 세 가지 독립적 문제로 분리합니다:
- 리더 선출 (Leader Election)
- 로그 복제 (Log Replication)
- 안전성 (Safety)
노드 상태
┌──────────┐ timeout ┌──────────────┐ majority ┌────────┐
│ │──────────▶│ Candidate │────────────▶│ │
│ Follower │ └──────────────┘ │ Leader │
│ │◀─────────────────────────────────────── │ │
└──────────┘ receive heartbeat └────────┘
▲ │
└────────────────── heartbeat ──────────────────────┘
리더 선출 과정
- Follower가 election timeout(150-300ms) 동안 heartbeat를 못 받으면 Candidate로 전환
- term 번호를 1 증가시키고 자신에게 투표 후 다른 노드에 RequestVote RPC 발송
- 과반수 투표를 받으면 Leader 당선
- Leader는 주기적으로 AppendEntries(heartbeat) 발송
로그 복제
# etcd를 이용한 분산 잠금 구현
import etcd3
import time
def distributed_lock(etcd_client, lock_name, ttl=10):
"""Raft 기반 etcd로 분산 잠금 구현"""
lease = etcd_client.lease(ttl)
lock_key = f"/locks/{lock_name}"
# atomic CAS (Compare-And-Swap)
success, _ = etcd_client.transaction(
compare=[
etcd3.transactions.create(lock_key, '==', 0)
],
success=[
etcd3.transactions.put(lock_key, 'locked', lease=lease)
],
failure=[]
)
if success:
print(f"Lock acquired: {lock_name}")
return lease
else:
print(f"Lock already held: {lock_name}")
return None
def release_lock(etcd_client, lease):
"""잠금 해제"""
if lease:
lease.revoke()
print("Lock released")
# 사용 예시
client = etcd3.client(host='localhost', port=2379)
lease = distributed_lock(client, 'my-resource')
if lease:
try:
# 임계 영역에서 작업 수행
time.sleep(5)
finally:
release_lock(client, lease)
etcd 실전: Kubernetes가 etcd를 사용하는 방식
Kubernetes는 모든 클러스터 상태를 etcd에 저장합니다. etcd 클러스터의 쓰기 성능은 Raft 합의에 의해 제한되므로, 일반적으로 3~5개 노드를 권장합니다.
┌─────────────────────────────────────────────────────┐
│ etcd Cluster (3 nodes) │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Leader │───▶│Follower 1│ │Follower 2│ │
│ │ node-1 │───▶│ node-2 │ │ node-3 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ ▲ │ │ │
│ └───────────────┴───────────────┘ │
│ Raft Consensus │
└─────────────────────────────────────────────────────┘
3. 분산 트랜잭션: 2PC, Saga, CQRS
2PC (Two-Phase Commit)
Phase 1 (Prepare):
Coordinator ──▶ Participant A: "준비됐어?"
Coordinator ──▶ Participant B: "준비됐어?"
Participant A ──▶ Coordinator: "YES"
Participant B ──▶ Coordinator: "YES"
Phase 2 (Commit):
Coordinator ──▶ Participant A: "커밋해!"
Coordinator ──▶ Participant B: "커밋해!"
2PC의 문제점: Coordinator 장애 시 Participant들이 blocking 상태에 빠집니다.
Saga 패턴
Saga는 긴 트랜잭션을 여러 로컬 트랜잭션으로 분해하고, 실패 시 보상 트랜잭션을 실행합니다.
주문 Saga:
1. 주문 생성 → 실패 시: 주문 취소
2. 재고 차감 → 실패 시: 재고 복구
3. 결제 처리 → 실패 시: 결제 환불
4. 배송 시작 → 실패 시: 배송 취소
from typing import Callable, List
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
@dataclass
class SagaStep:
name: str
action: Callable
compensating_action: Callable
class SagaOrchestrator:
"""Saga 오케스트레이터 패턴 구현"""
def __init__(self, steps: List[SagaStep]):
self.steps = steps
self.completed_steps = []
def execute(self, context: dict) -> bool:
for step in self.steps:
try:
logger.info(f"Executing step: {step.name}")
step.action(context)
self.completed_steps.append(step)
logger.info(f"Step completed: {step.name}")
except Exception as e:
logger.error(f"Step failed: {step.name}, error: {e}")
self._compensate(context)
return False
return True
def _compensate(self, context: dict):
"""역순으로 보상 트랜잭션 실행"""
logger.info("Starting compensation...")
for step in reversed(self.completed_steps):
try:
logger.info(f"Compensating: {step.name}")
step.compensating_action(context)
except Exception as e:
logger.error(f"Compensation failed: {step.name}, error: {e}")
# 보상 트랜잭션 실패는 수동 개입이 필요
raise RuntimeError(f"Saga compensation failed at {step.name}")
# 주문 Saga 정의
def create_order(ctx):
ctx['order_id'] = 'ORD-001'
print(f"Order created: {ctx['order_id']}")
def cancel_order(ctx):
print(f"Order cancelled: {ctx['order_id']}")
def deduct_inventory(ctx):
ctx['inventory_reserved'] = True
print("Inventory deducted")
def restore_inventory(ctx):
print("Inventory restored")
def process_payment(ctx):
# 결제 실패 시뮬레이션
raise Exception("Payment gateway timeout")
def refund_payment(ctx):
print("Payment refunded")
order_saga = SagaOrchestrator([
SagaStep("create_order", create_order, cancel_order),
SagaStep("deduct_inventory", deduct_inventory, restore_inventory),
SagaStep("process_payment", process_payment, refund_payment),
])
context = {"user_id": "user-123", "amount": 50000}
result = order_saga.execute(context)
print(f"Saga result: {'success' if result else 'failed with compensation'}")
이벤트 소싱과 CQRS
CQRS 아키텍처:
Command Side (쓰기) Query Side (읽기)
───────────────── ────────────────
POST /orders ──▶ GET /orders/123
PUT /inventory ──▶ GET /inventory/stats
│ ▲
▼ │
Event Store ──── Projection ────────┘
(append-only) (read model 갱신)
이벤트 소싱의 핵심: 현재 상태 대신 이벤트의 시퀀스를 저장합니다. 이를 통해 특정 시점의 상태 복원, 감사 로그, 이벤트 리플레이가 가능합니다.
4. 메시지 큐: Kafka 아키텍처
Kafka 핵심 개념
┌─────────────────────────────────────────────────────────────┐
│ Kafka Cluster │
│ │
│ Topic: orders (4 partitions, replication factor: 3) │
│ │
│ Broker 1 Broker 2 Broker 3 │
│ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │ P0 L │ │ P0 F │ │ P0 F │ L=Leader, F=Follower │
│ │ P1 F │ │ P1 L │ │ P1 F │ │
│ │ P2 F │ │ P2 F │ │ P2 L │ │
│ │ P3 L │ │ P3 F │ │ P3 F │ │
│ └──────┘ └──────┘ └──────┘ │
│ │
│ Consumer Group A Consumer Group B │
│ Consumer 1: P0, P1 Consumer X: P0, P1, P2, P3 │
│ Consumer 2: P2, P3 │
└─────────────────────────────────────────────────────────────┘
- Partition: 순서 보장의 단위이자 병렬 처리의 단위
- Replication: 파티션을 여러 브로커에 복제하여 고가용성 보장
- Consumer Group: 각 파티션은 그룹 내 하나의 컨슈머만 처리
Kafka Producer/Consumer 실전
from confluent_kafka import Producer, Consumer, KafkaError
import json
import time
# Producer 설정
producer_config = {
'bootstrap.servers': 'localhost:9092',
'acks': 'all', # 모든 ISR 복제 완료 후 확인
'retries': 3,
'enable.idempotence': True, # 정확히 한 번 전송 보장
'compression.type': 'snappy',
}
def delivery_report(err, msg):
if err is not None:
print(f'Message delivery failed: {err}')
else:
print(f'Message delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')
producer = Producer(producer_config)
# 주문 이벤트 발행
for i in range(10):
order_event = {
'order_id': f'ORD-{i:04d}',
'user_id': f'user-{i % 5}',
'amount': 10000 * (i + 1),
'timestamp': time.time()
}
producer.produce(
topic='orders',
key=order_event['user_id'], # 같은 user는 같은 파티션으로
value=json.dumps(order_event),
callback=delivery_report
)
producer.poll(0)
producer.flush()
print("All messages sent")
# Consumer 설정
consumer_config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processor-group',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False, # 수동 오프셋 커밋으로 at-least-once 보장
}
consumer = Consumer(consumer_config)
consumer.subscribe(['orders'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
raise KafkaError(msg.error())
event = json.loads(msg.value().decode('utf-8'))
print(f"Processing order: {event['order_id']}, amount: {event['amount']}")
# 처리 완료 후 오프셋 커밋
consumer.commit(asynchronous=False)
except KeyboardInterrupt:
pass
finally:
consumer.close()
Kafka Topic YAML 설정
# Kafka Topic 설정 (Strimzi Operator 기준)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: orders
labels:
strimzi.io/cluster: my-cluster
spec:
partitions: 12 # 파티션 수 = 최대 병렬 컨슈머 수
replicas: 3 # 복제 팩터
config:
retention.ms: '604800000' # 7일 보관
min.insync.replicas: '2' # 최소 2개 ISR 복제 후 확인
compression.type: snappy
cleanup.policy: delete
Pulsar vs Kafka
| 항목 | Kafka | Pulsar |
|---|---|---|
| 스토리지 | Broker에 직접 | Apache BookKeeper 분리 |
| 멀티 테넌시 | 제한적 | 네이티브 지원 |
| 지역 복제 | MirrorMaker2 | 내장 Geo-Replication |
| 레이턴시 | ~5ms | ~1ms |
| 운영 복잡도 | 낮음 | 높음 |
| 성숙도 | 매우 높음 | 높음 |
5. 분산 스토리지: Consistent Hashing과 Cassandra
Consistent Hashing
일반 해싱의 문제: 노드 추가/제거 시 거의 모든 키가 재배치됩니다. Consistent Hashing의 해결책: 노드 변경 시 최소한의 키만 재배치됩니다.
import hashlib
from sortedcontainers import SortedDict
class ConsistentHashRing:
"""Consistent Hashing 링 구현"""
def __init__(self, virtual_nodes=150):
self.virtual_nodes = virtual_nodes
self.ring = SortedDict()
self.nodes = set()
def _hash(self, key: str) -> int:
return int(hashlib.md5(key.encode()).hexdigest(), 16)
def add_node(self, node: str):
"""노드를 링에 추가 (가상 노드 포함)"""
self.nodes.add(node)
for i in range(self.virtual_nodes):
virtual_key = f"{node}#{i}"
hash_val = self._hash(virtual_key)
self.ring[hash_val] = node
print(f"Added node: {node} with {self.virtual_nodes} virtual nodes")
def remove_node(self, node: str):
"""노드를 링에서 제거"""
self.nodes.discard(node)
keys_to_remove = [k for k, v in self.ring.items() if v == node]
for k in keys_to_remove:
del self.ring[k]
print(f"Removed node: {node}")
def get_node(self, key: str) -> str:
"""키에 해당하는 노드 반환"""
if not self.ring:
raise Exception("Ring is empty")
hash_val = self._hash(key)
# 해시값보다 크거나 같은 첫 번째 가상 노드 찾기
idx = self.ring.bisect_left(hash_val)
if idx == len(self.ring):
idx = 0 # 링이므로 wrap around
return self.ring.values()[idx]
def get_nodes_for_replication(self, key: str, n: int) -> list:
"""복제를 위한 n개의 노드 반환"""
if len(self.nodes) < n:
raise Exception(f"Not enough nodes: {len(self.nodes)} < {n}")
hash_val = self._hash(key)
idx = self.ring.bisect_left(hash_val)
result_nodes = []
seen_nodes = set()
total = len(self.ring)
for i in range(total):
curr_idx = (idx + i) % total
node = self.ring.values()[curr_idx]
if node not in seen_nodes:
seen_nodes.add(node)
result_nodes.append(node)
if len(result_nodes) == n:
break
return result_nodes
# 사용 예시
ring = ConsistentHashRing(virtual_nodes=150)
for i in range(5):
ring.add_node(f"cassandra-node-{i}")
# 키 분배 테스트
for key in ["user:1001", "order:5555", "product:abc", "session:xyz"]:
node = ring.get_node(key)
replicas = ring.get_nodes_for_replication(key, 3)
print(f"Key '{key}' -> Primary: {node}, Replicas: {replicas}")
Apache Cassandra 아키텍처
┌─────────────────────────────────────────────────────────────────┐
│ Cassandra Ring (6 nodes) │
│ │
│ Node 1 │
│ / \ │
│ Node 6 Node 2 │
│ | | │
│ Node 5 Node 3 │
│ \ / │
│ Node 4 │
│ │
│ Write Path: │
│ Client → Coordinator → Partition Key 해싱 → 담당 노드들 │
│ → CommitLog (WAL) → Memtable → SSTable │
│ │
│ Read Path: │
│ Client → Coordinator → 담당 노드들 → Row Cache / Bloom Filter │
│ → SSTable 병합 → 최신 데이터 반환 │
└─────────────────────────────────────────────────────────────────┘
Cassandra의 핵심 특성:
- Tunable Consistency:
QUORUM,ONE,ALL등으로 일관성 수준 선택 가능 - Gossip Protocol: 노드 간 상태 전파에 epidemic 알고리즘 사용
- Compaction: 주기적으로 SSTable을 병합하여 읽기 성능 향상
6. 분산 ML 학습: Ring-AllReduce와 PyTorch Distributed
Parameter Server vs All-Reduce
Parameter Server 방식:
─────────────────────
Worker 1 ─┐
Worker 2 ─┤── PS (Parameter Server) ──▶ 그래디언트 집계 후 배포
Worker 3 ─┘
문제: PS가 병목. 대역폭이 O(n)으로 증가
Ring-AllReduce 방식:
────────────────────
Worker 1 ──▶ Worker 2 ──▶ Worker 3
▲ │
└──────────────────────────┘
각 Worker가 이웃과만 통신.
총 통신량은 노드 수에 무관하게 O(n-1)/n * data_size
Ring-AllReduce의 두 단계:
- Reduce-Scatter: 각 노드가 자신의 청크를 reduce하면서 다음 노드로 전달
- All-Gather: reduce된 결과를 모든 노드에 전파
PyTorch DistributedDataParallel (DDP)
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
def setup(rank, world_size):
"""분산 학습 초기화"""
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# NCCL 백엔드: GPU간 고속 통신 (NVLink, InfiniBand 활용)
dist.init_process_group(
backend='nccl',
rank=rank,
world_size=world_size
)
torch.cuda.set_device(rank)
def cleanup():
dist.destroy_process_group()
class SimpleModel(nn.Module):
def __init__(self):
super().__init__()
self.layers = nn.Sequential(
nn.Linear(1024, 2048),
nn.ReLU(),
nn.Linear(2048, 1024),
nn.ReLU(),
nn.Linear(1024, 10)
)
def forward(self, x):
return self.layers(x)
def train(rank, world_size, epochs=5):
setup(rank, world_size)
# 각 GPU에 모델 배치 후 DDP로 래핑
model = SimpleModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
# DistributedSampler: 각 프로세스가 다른 데이터 처리
dataset = torch.utils.data.TensorDataset(
torch.randn(1000, 1024),
torch.randint(0, 10, (1000,))
)
sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)
optimizer = optim.Adam(ddp_model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
for epoch in range(epochs):
sampler.set_epoch(epoch) # 에포크마다 셔플 재설정
total_loss = 0.0
for batch_x, batch_y in dataloader:
batch_x = batch_x.to(rank)
batch_y = batch_y.to(rank)
optimizer.zero_grad()
output = ddp_model(batch_x)
loss = criterion(output, batch_y)
loss.backward() # 자동으로 그래디언트 AllReduce 수행
optimizer.step()
total_loss += loss.item()
if rank == 0: # 메인 프로세스에서만 로그 출력
print(f"Epoch {epoch}: Loss = {total_loss/len(dataloader):.4f}")
cleanup()
# 실행: torchrun --nproc_per_node=4 train.py
if __name__ == '__main__':
world_size = torch.cuda.device_count()
torch.multiprocessing.spawn(
train,
args=(world_size,),
nprocs=world_size,
join=True
)
NCCL과 통신 백엔드
NCCL (NVIDIA Collective Communications Library):
단일 노드 (NVLink):
GPU0 ←──── NVLink ────→ GPU1
│ │
600 GB/s 양방향 대역폭
다중 노드 (InfiniBand + RDMA):
Node 1 ←── IB (200 Gb/s) ──→ Node 2
│ │
RDMA: CPU 없이 GPU-to-GPU 직접 통신
백엔드 선택:
- nccl: GPU 분산 학습 (권장)
- gloo: CPU 학습 또는 디버깅
- mpi: HPC 환경
Horovod와의 비교
| 항목 | PyTorch DDP | Horovod |
|---|---|---|
| 통합 | PyTorch 내장 | 별도 설치 |
| 알고리즘 | Bucketed AllReduce | Ring-AllReduce |
| 프레임워크 | PyTorch 전용 | TF/PyTorch/MXNet |
| 사용 편의성 | 약간 복잡 | 간단 (hvd.init()) |
| 성능 | 유사 | 유사 |
7. 관측성: 분산 추적, 로그, 메트릭
분산 추적 (Distributed Tracing)
마이크로서비스 환경에서 요청이 여러 서비스를 거칠 때 전체 경로를 추적합니다.
요청 흐름 (Jaeger/Zipkin 추적):
Browser → API GW → Order Svc → Inventory Svc → DB
│ │ │ │ │
│ TraceID: abc123 │ │ │
│ SpanID: 001 SpanID: 002 SpanID: 003 │
│ │ │ │
└──────────────────────┴────────────┴──────────┘
전체 레이턴시: 120ms
Order Svc: 80ms (병목!)
OpenTelemetry 계층:
- API: 계측 코드 작성
- SDK: 데이터 처리 및 내보내기
- Exporter: Jaeger/Zipkin/OTLP로 전송
ELK Stack 로그 집계
┌─────────────────────────────────────────────────────┐
│ ELK Stack │
│ │
│ App Logs ──▶ Filebeat ──▶ Logstash ──▶ Elasticsearch│
│ K8s Logs ──▶ Fluentd ──/ ──▶ Kibana │
│ (시각화) │
│ │
│ 또는 현대적 스택: │
│ App Logs ──▶ Promtail ──▶ Loki ──▶ Grafana │
└─────────────────────────────────────────────────────┘
Prometheus + Grafana 메트릭
# Prometheus 스크레이핑 설정
scrape_configs:
- job_name: 'kafka'
static_configs:
- targets: ['kafka-broker-1:9090', 'kafka-broker-2:9090']
metrics_path: '/metrics'
scrape_interval: 15s
- job_name: 'pytorch-training'
static_configs:
- targets: ['trainer-0:8080', 'trainer-1:8080']
핵심 Kafka 메트릭 (Grafana 대시보드):
kafka_consumer_lag: 컨슈머 지연 (핵심 SLA 지표)kafka_network_request_rate: 브로커 요청 처리율kafka_log_size: 파티션 로그 크기kafka_under_replicated_partitions: 복제 지연 파티션 수
퀴즈
Q1. CAP theorem에서 네트워크 파티션 발생 시 CP 시스템과 AP 시스템의 트레이드오프 차이는?
정답: CP 시스템은 파티션 상황에서 가용성을 포기하고 일관성을 유지합니다. AP 시스템은 반대로 일관성을 포기하고 가용성을 유지합니다.
설명: 네트워크 파티션이 발생하면 분리된 노드 간 통신이 불가능합니다. CP 시스템(예: etcd, HBase)은 이때 요청을 거부하거나 오류를 반환하여 오래된 데이터 제공을 막습니다. AP 시스템(예: Cassandra, DynamoDB)은 파티션된 노드에서도 응답을 계속하되, 서로 다른 노드가 다른 데이터를 반환할 수 있습니다. 파티션이 해소되면 최종 일관성(eventual consistency)을 통해 데이터를 맞춥니다.
Q2. Raft 알고리즘에서 리더 장애 시 새 리더를 선출하는 과정은?
정답: Follower가 election timeout 내 heartbeat를 수신하지 못하면 Candidate가 되어 RequestVote RPC를 전송하고, 과반수 투표를 받은 노드가 새 Leader가 됩니다.
설명: 리더 장애 시 Follower들은 election timeout(보통 150-300ms) 동안 heartbeat를 기다립니다. Timeout이 지나면 Candidate로 전환하여 term을 1 증가시키고 자신에게 먼저 투표합니다. 이후 다른 노드들에 RequestVote RPC를 전송합니다. 각 노드는 한 term에 한 번만 투표할 수 있으며, 과반수 투표를 받은 Candidate가 Leader가 됩니다. Split vote 발생 시 모두 timeout 후 재시도합니다.
Q3. Kafka에서 consumer group의 컨슈머 수가 partition 수보다 많을 때 발생하는 문제는?
정답: 초과하는 컨슈머들은 아무 파티션도 할당받지 못하고 idle 상태가 됩니다.
설명: Kafka의 소비 모델에서 하나의 파티션은 동일 consumer group 내 하나의 컨슈머만 처리할 수 있습니다. 따라서 컨슈머가 4개이고 파티션이 3개라면, 1개의 컨슈머는 어떤 파티션도 할당받지 못해 메시지를 처리하지 못합니다. 이는 자원 낭비이므로, 일반적으로 컨슈머 수는 파티션 수를 초과하지 않도록 설계합니다. 처리량 증가를 위해서는 파티션 수를 먼저 늘려야 합니다.
Q4. Ring-AllReduce가 Parameter Server 방식보다 통신 효율이 높은 이유는?
정답: Ring-AllReduce는 각 노드가 이웃과만 통신하여 총 대역폭이 노드 수와 무관하게 일정하지만, Parameter Server는 모든 Worker가 PS와 통신하여 PS의 대역폭이 병목이 됩니다.
설명: Parameter Server 방식에서 n개의 Worker가 있을 때 PS는 n개의 Worker 모두와 통신해야 합니다. Worker가 늘어날수록 PS의 네트워크 대역폭 요구량이 선형으로 증가합니다. 반면 Ring-AllReduce에서 각 Worker는 양쪽 이웃 노드에만 데이터를 전송합니다. n개 노드의 총 데이터 전송량은 2*(n-1)/n * data_size로, 모든 노드의 대역폭을 동등하게 활용합니다. 따라서 노드를 추가해도 각 노드의 통신 부담이 증가하지 않습니다.
Q5. Saga 패턴에서 보상 트랜잭션(compensating transaction)이 필요한 상황은?
정답: Saga의 중간 단계가 실패했을 때, 이미 완료된 이전 단계들의 효과를 되돌리기 위해 보상 트랜잭션이 실행됩니다.
설명: Saga 패턴은 분산 환경에서 ACID 트랜잭션을 사용할 수 없을 때 사용합니다. 각 단계는 로컬 트랜잭션으로 이미 커밋됩니다. 예를 들어 주문 생성 → 재고 차감 → 결제 처리 순서에서 결제 처리가 실패하면, 이미 완료된 재고 차감을 되돌리는 "재고 복구" 트랜잭션과 주문 생성을 되돌리는 "주문 취소" 트랜잭션이 역순으로 실행됩니다. 보상 트랜잭션 자체가 실패할 수 있으므로 멱등성(idempotency) 보장과 재시도 로직이 중요합니다.
마치며
분산 시스템은 하루아침에 마스터할 수 있는 분야가 아닙니다. CAP theorem의 트레이드오프를 이해하고, Raft 합의를 코드로 구현해보고, Kafka 파티션 전략을 실제 서비스에 적용해보면서 점차 깊이가 쌓입니다.
특히 AI 엔지니어라면 Ring-AllReduce와 NCCL 같은 분산 ML 학습 기술이 점점 더 중요해지고 있습니다. GPT-4 규모의 모델을 학습시키려면 수천 개 GPU의 네트워크 통신을 최적화해야 하고, 이는 결국 분산 시스템의 핵심 원리로 귀결됩니다.
핵심 학습 경로: CAP theorem 이해 → Raft 논문 읽기 → etcd 직접 운영 → Kafka 프로덕션 경험 → PyTorch DDP로 멀티 GPU 학습
Distributed Systems Complete Guide: From CAP Theorem to Distributed ML Training and Kafka
- Introduction
- 1. Distributed Systems Fundamentals: CAP Theorem and Consistency Models
- 2. Consensus Algorithms: Paxos and Raft
- 3. Distributed Transactions: 2PC, Saga, and CQRS
- 4. Message Queues: Kafka Architecture
- 5. Distributed Storage: Consistent Hashing and Cassandra
- 6. Distributed ML Training: Ring-AllReduce and PyTorch Distributed
- 7. Observability: Tracing, Logs, and Metrics
- Quiz
- Conclusion
Introduction
Distributed systems are the backbone of modern AI infrastructure. Training a large model like GPT-4 requires thousands of GPUs working in concert, and serving millions of inference requests per second demands distributed systems like Kafka and Cassandra.
This guide covers the distributed systems knowledge AI engineers need most — from theoretical foundations like the CAP theorem all the way to practical PyTorch distributed training code.
1. Distributed Systems Fundamentals: CAP Theorem and Consistency Models
CAP Theorem
Proposed by Eric Brewer in 2000, the CAP theorem describes a fundamental limitation of distributed systems.
┌─────────────────────────────────────────────┐
│ CAP Triangle │
│ │
│ Consistency (C) │
│ △ │
│ / \ │
│ / \ │
│ / \ │
│ / CA \ │
│ / (RDBMS)\ │
│ /───────────\ │
│ / CP │ AP \ │
│ / (HBase│(Cassandra) │
│ ▽────────┼────────▽ │
│ Availability Partition Tolerance │
│ (A) (P) │
└─────────────────────────────────────────────┘
No system can simultaneously guarantee all three properties:
- C (Consistency): All nodes read the same data at the same time
- A (Availability): Every request receives a response
- P (Partition Tolerance): The system continues operating despite network partitions
Since network partitions are inevitable in real distributed environments, the practical choice is always CP vs AP.
| System | Choice | Example |
|---|---|---|
| HBase, Zookeeper, etcd | CP | Sacrifice availability during partition |
| Cassandra, DynamoDB | AP | Sacrifice consistency during partition |
| Traditional RDBMS | CA | Does not tolerate partitions |
PACELC: Extending CAP
CAP only covers partition scenarios, but PACELC also captures the Latency vs Consistency tradeoff during normal operation.
P → A or C (during partition)
E → L or C (during normal operation)
Cassandra: PA/EL (availability + low latency first)
etcd/Raft: PC/EC (consistency first)
Consistency Models
Ordered from strongest to weakest:
Linearizability
└── Strongest. All operations appear instantaneous
└── Examples: etcd, Zookeeper
Sequential Consistency
└── All nodes observe operations in the same order
└── Examples: CPU memory model
Causal Consistency
└── Causally related operations maintain order
└── Examples: MongoDB causally consistent sessions
Eventual Consistency
└── All nodes converge to the same value eventually
└── Examples: Cassandra, DNS, S3
2. Consensus Algorithms: Paxos and Raft
The Problem with Paxos
Leslie Lamport introduced Paxos in 1989. It is theoretically sound but notoriously difficult to understand. Diego Ongaro literally titled his Raft paper "In Search of an Understandable Consensus Algorithm" for this reason.
The Raft Algorithm
Raft decomposes distributed consensus into three independent sub-problems:
- Leader Election
- Log Replication
- Safety
Node States
┌──────────┐ timeout ┌──────────────┐ majority ┌────────┐
│ │──────────▶│ Candidate │────────────▶│ │
│ Follower │ └──────────────┘ │ Leader │
│ │◀─────────────────────────────────────── │ │
└──────────┘ receive heartbeat └────────┘
▲ │
└────────────────── heartbeat ──────────────────────┘
Leader Election Process
- A Follower that receives no heartbeat within an election timeout (150-300ms) transitions to Candidate
- It increments its term number, votes for itself, and sends RequestVote RPCs to other nodes
- The first Candidate to receive a majority of votes becomes the new Leader
- The Leader continuously sends AppendEntries (heartbeat) RPCs to prevent new elections
Log Replication
# Distributed locking with etcd
import etcd3
import time
def distributed_lock(etcd_client, lock_name, ttl=10):
"""Distributed lock using Raft-backed etcd"""
lease = etcd_client.lease(ttl)
lock_key = f"/locks/{lock_name}"
# atomic Compare-And-Swap
success, _ = etcd_client.transaction(
compare=[
etcd3.transactions.create(lock_key, '==', 0)
],
success=[
etcd3.transactions.put(lock_key, 'locked', lease=lease)
],
failure=[]
)
if success:
print(f"Lock acquired: {lock_name}")
return lease
else:
print(f"Lock already held: {lock_name}")
return None
def release_lock(etcd_client, lease):
"""Release the distributed lock"""
if lease:
lease.revoke()
print("Lock released")
# Usage
client = etcd3.client(host='localhost', port=2379)
lease = distributed_lock(client, 'my-resource')
if lease:
try:
# Perform work in critical section
time.sleep(5)
finally:
release_lock(client, lease)
How Kubernetes Uses etcd
Kubernetes stores all cluster state in etcd. Write throughput is bounded by Raft consensus, which is why 3 to 5 etcd nodes are recommended for production clusters.
┌─────────────────────────────────────────────────────┐
│ etcd Cluster (3 nodes) │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Leader │───▶│Follower 1│ │Follower 2│ │
│ │ node-1 │───▶│ node-2 │ │ node-3 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ ▲ │ │ │
│ └───────────────┴───────────────┘ │
│ Raft Consensus │
└─────────────────────────────────────────────────────┘
3. Distributed Transactions: 2PC, Saga, and CQRS
2PC (Two-Phase Commit)
Phase 1 (Prepare):
Coordinator ──▶ Participant A: "Are you ready?"
Coordinator ──▶ Participant B: "Are you ready?"
Participant A ──▶ Coordinator: "YES"
Participant B ──▶ Coordinator: "YES"
Phase 2 (Commit):
Coordinator ──▶ Participant A: "Commit!"
Coordinator ──▶ Participant B: "Commit!"
The critical weakness of 2PC: if the Coordinator crashes after Phase 1, Participants are stuck in a blocking state indefinitely.
Saga Pattern
The Saga pattern decomposes a long transaction into a sequence of local transactions. If any step fails, compensating transactions execute in reverse order.
Order Saga:
1. Create Order → compensate: Cancel Order
2. Deduct Inventory → compensate: Restore Inventory
3. Process Payment → compensate: Refund Payment
4. Start Shipping → compensate: Cancel Shipment
from typing import Callable, List
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
@dataclass
class SagaStep:
name: str
action: Callable
compensating_action: Callable
class SagaOrchestrator:
"""Saga Orchestrator pattern implementation"""
def __init__(self, steps: List[SagaStep]):
self.steps = steps
self.completed_steps = []
def execute(self, context: dict) -> bool:
for step in self.steps:
try:
logger.info(f"Executing step: {step.name}")
step.action(context)
self.completed_steps.append(step)
logger.info(f"Step completed: {step.name}")
except Exception as e:
logger.error(f"Step failed: {step.name}, error: {e}")
self._compensate(context)
return False
return True
def _compensate(self, context: dict):
"""Execute compensating transactions in reverse order"""
logger.info("Starting compensation...")
for step in reversed(self.completed_steps):
try:
logger.info(f"Compensating: {step.name}")
step.compensating_action(context)
except Exception as e:
logger.error(f"Compensation failed: {step.name}, error: {e}")
raise RuntimeError(f"Saga compensation failed at {step.name}")
# Define the Order Saga
def create_order(ctx):
ctx['order_id'] = 'ORD-001'
print(f"Order created: {ctx['order_id']}")
def cancel_order(ctx):
print(f"Order cancelled: {ctx['order_id']}")
def deduct_inventory(ctx):
ctx['inventory_reserved'] = True
print("Inventory deducted")
def restore_inventory(ctx):
print("Inventory restored")
def process_payment(ctx):
raise Exception("Payment gateway timeout")
def refund_payment(ctx):
print("Payment refunded")
order_saga = SagaOrchestrator([
SagaStep("create_order", create_order, cancel_order),
SagaStep("deduct_inventory", deduct_inventory, restore_inventory),
SagaStep("process_payment", process_payment, refund_payment),
])
context = {"user_id": "user-123", "amount": 50000}
result = order_saga.execute(context)
print(f"Saga result: {'success' if result else 'failed with compensation'}")
Event Sourcing and CQRS
CQRS Architecture:
Command Side (Writes) Query Side (Reads)
───────────────── ──────────────────
POST /orders ──▶ GET /orders/123
PUT /inventory ──▶ GET /inventory/stats
│ ▲
▼ │
Event Store ──── Projection ────────┘
(append-only) (updates read models)
The key idea of event sourcing: store the sequence of events rather than the current state. This enables point-in-time state reconstruction, audit logs, and event replay.
4. Message Queues: Kafka Architecture
Core Kafka Concepts
┌─────────────────────────────────────────────────────────────┐
│ Kafka Cluster │
│ │
│ Topic: orders (4 partitions, replication factor: 3) │
│ │
│ Broker 1 Broker 2 Broker 3 │
│ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │ P0 L │ │ P0 F │ │ P0 F │ L=Leader, F=Follower │
│ │ P1 F │ │ P1 L │ │ P1 F │ │
│ │ P2 F │ │ P2 F │ │ P2 L │ │
│ │ P3 L │ │ P3 F │ │ P3 F │ │
│ └──────┘ └──────┘ └──────┘ │
│ │
│ Consumer Group A Consumer Group B │
│ Consumer 1: P0, P1 Consumer X: P0, P1, P2, P3 │
│ Consumer 2: P2, P3 │
└─────────────────────────────────────────────────────────────┘
- Partition: The unit of ordering guarantees and parallel processing
- Replication: Partitions replicated across brokers for high availability
- Consumer Group: Each partition is consumed by exactly one consumer per group
Kafka Producer/Consumer in Practice
from confluent_kafka import Producer, Consumer, KafkaError
import json
import time
# Producer configuration
producer_config = {
'bootstrap.servers': 'localhost:9092',
'acks': 'all', # Wait for all ISR replicas to acknowledge
'retries': 3,
'enable.idempotence': True, # Exactly-once semantics
'compression.type': 'snappy',
}
def delivery_report(err, msg):
if err is not None:
print(f'Message delivery failed: {err}')
else:
print(f'Message delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')
producer = Producer(producer_config)
# Publish order events
for i in range(10):
order_event = {
'order_id': f'ORD-{i:04d}',
'user_id': f'user-{i % 5}',
'amount': 10000 * (i + 1),
'timestamp': time.time()
}
producer.produce(
topic='orders',
key=order_event['user_id'], # Same user → same partition
value=json.dumps(order_event),
callback=delivery_report
)
producer.poll(0)
producer.flush()
print("All messages sent")
# Consumer configuration
consumer_config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processor-group',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False, # Manual commit for at-least-once
}
consumer = Consumer(consumer_config)
consumer.subscribe(['orders'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
raise KafkaError(msg.error())
event = json.loads(msg.value().decode('utf-8'))
print(f"Processing order: {event['order_id']}, amount: {event['amount']}")
# Commit offset after successful processing
consumer.commit(asynchronous=False)
except KeyboardInterrupt:
pass
finally:
consumer.close()
Kafka Topic YAML Configuration
# Kafka Topic configuration (Strimzi Operator)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: orders
labels:
strimzi.io/cluster: my-cluster
spec:
partitions: 12 # Max parallelism = partition count
replicas: 3 # Replication factor
config:
retention.ms: '604800000' # Retain messages for 7 days
min.insync.replicas: '2' # Require 2 ISR replicas for ack
compression.type: snappy
cleanup.policy: delete
Pulsar vs Kafka
| Feature | Kafka | Pulsar |
|---|---|---|
| Storage | Coupled with broker | Separate (Apache BookKeeper) |
| Multi-tenancy | Limited | Native support |
| Geo-replication | MirrorMaker2 | Built-in |
| Latency | ~5ms | ~1ms |
| Operational complexity | Low | Higher |
| Maturity | Very high | High |
5. Distributed Storage: Consistent Hashing and Cassandra
Consistent Hashing
The problem with naive hashing: adding or removing a node remaps nearly all keys. Consistent hashing solution: only a minimal number of keys are remapped when the node set changes.
import hashlib
from sortedcontainers import SortedDict
class ConsistentHashRing:
"""Consistent hashing ring implementation"""
def __init__(self, virtual_nodes=150):
self.virtual_nodes = virtual_nodes
self.ring = SortedDict()
self.nodes = set()
def _hash(self, key: str) -> int:
return int(hashlib.md5(key.encode()).hexdigest(), 16)
def add_node(self, node: str):
"""Add a node to the ring with virtual nodes"""
self.nodes.add(node)
for i in range(self.virtual_nodes):
virtual_key = f"{node}#{i}"
hash_val = self._hash(virtual_key)
self.ring[hash_val] = node
print(f"Added node: {node} with {self.virtual_nodes} virtual nodes")
def remove_node(self, node: str):
"""Remove a node from the ring"""
self.nodes.discard(node)
keys_to_remove = [k for k, v in self.ring.items() if v == node]
for k in keys_to_remove:
del self.ring[k]
print(f"Removed node: {node}")
def get_node(self, key: str) -> str:
"""Return the node responsible for a key"""
if not self.ring:
raise Exception("Ring is empty")
hash_val = self._hash(key)
idx = self.ring.bisect_left(hash_val)
if idx == len(self.ring):
idx = 0 # Wrap around the ring
return self.ring.values()[idx]
def get_nodes_for_replication(self, key: str, n: int) -> list:
"""Return n distinct nodes for replication"""
if len(self.nodes) < n:
raise Exception(f"Not enough nodes: {len(self.nodes)} < {n}")
hash_val = self._hash(key)
idx = self.ring.bisect_left(hash_val)
result_nodes = []
seen_nodes = set()
total = len(self.ring)
for i in range(total):
curr_idx = (idx + i) % total
node = self.ring.values()[curr_idx]
if node not in seen_nodes:
seen_nodes.add(node)
result_nodes.append(node)
if len(result_nodes) == n:
break
return result_nodes
# Example usage
ring = ConsistentHashRing(virtual_nodes=150)
for i in range(5):
ring.add_node(f"cassandra-node-{i}")
for key in ["user:1001", "order:5555", "product:abc", "session:xyz"]:
node = ring.get_node(key)
replicas = ring.get_nodes_for_replication(key, 3)
print(f"Key '{key}' -> Primary: {node}, Replicas: {replicas}")
Apache Cassandra Architecture
┌──────────────────────────────────────────────────────────────────┐
│ Cassandra Ring (6 nodes) │
│ │
│ Node 1 │
│ / \ │
│ Node 6 Node 2 │
│ | | │
│ Node 5 Node 3 │
│ \ / │
│ Node 4 │
│ │
│ Write Path: │
│ Client → Coordinator → Hash partition key → Responsible nodes │
│ → CommitLog (WAL) → Memtable → SSTable │
│ │
│ Read Path: │
│ Client → Coordinator → Responsible nodes → Row Cache / Bloom │
│ → Merge SSTables → Return latest data │
└──────────────────────────────────────────────────────────────────┘
Key Cassandra characteristics:
- Tunable Consistency: Choose consistency level per query:
QUORUM,ONE,ALL - Gossip Protocol: Nodes spread state information using an epidemic algorithm
- Compaction: Periodically merges SSTables to improve read performance
6. Distributed ML Training: Ring-AllReduce and PyTorch Distributed
Parameter Server vs All-Reduce
Parameter Server approach:
───────────────────────
Worker 1 ─┐
Worker 2 ─┤── PS (Parameter Server) ──▶ Aggregate + broadcast gradients
Worker 3 ─┘
Problem: PS is a bottleneck. Bandwidth requirement is O(n).
Ring-AllReduce approach:
────────────────────────
Worker 1 ──▶ Worker 2 ──▶ Worker 3
▲ │
└──────────────────────────┘
Each worker communicates only with neighbors.
Total data transferred: 2 * (n-1)/n * data_size (independent of n)
Ring-AllReduce operates in two phases:
- Reduce-Scatter: Each node reduces its chunk and sends it to the next node
- All-Gather: The reduced results are broadcast to all nodes
PyTorch DistributedDataParallel (DDP)
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
def setup(rank, world_size):
"""Initialize distributed training"""
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# NCCL backend: high-speed GPU communication (NVLink, InfiniBand)
dist.init_process_group(
backend='nccl',
rank=rank,
world_size=world_size
)
torch.cuda.set_device(rank)
def cleanup():
dist.destroy_process_group()
class SimpleModel(nn.Module):
def __init__(self):
super().__init__()
self.layers = nn.Sequential(
nn.Linear(1024, 2048),
nn.ReLU(),
nn.Linear(2048, 1024),
nn.ReLU(),
nn.Linear(1024, 10)
)
def forward(self, x):
return self.layers(x)
def train(rank, world_size, epochs=5):
setup(rank, world_size)
# Place model on GPU, wrap with DDP
model = SimpleModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
# DistributedSampler ensures each process gets a different data shard
dataset = torch.utils.data.TensorDataset(
torch.randn(1000, 1024),
torch.randint(0, 10, (1000,))
)
sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)
optimizer = optim.Adam(ddp_model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
for epoch in range(epochs):
sampler.set_epoch(epoch) # Reseed shuffle each epoch
total_loss = 0.0
for batch_x, batch_y in dataloader:
batch_x = batch_x.to(rank)
batch_y = batch_y.to(rank)
optimizer.zero_grad()
output = ddp_model(batch_x)
loss = criterion(output, batch_y)
loss.backward() # Gradient AllReduce happens automatically
optimizer.step()
total_loss += loss.item()
if rank == 0: # Only rank 0 logs
print(f"Epoch {epoch}: Loss = {total_loss/len(dataloader):.4f}")
cleanup()
# Launch: torchrun --nproc_per_node=4 train.py
if __name__ == '__main__':
world_size = torch.cuda.device_count()
torch.multiprocessing.spawn(
train,
args=(world_size,),
nprocs=world_size,
join=True
)
NCCL and Communication Backends
NCCL (NVIDIA Collective Communications Library):
Single node (NVLink):
GPU0 ←──── NVLink ────→ GPU1
│ │
600 GB/s bidirectional bandwidth
Multi-node (InfiniBand + RDMA):
Node 1 ←── IB (200 Gb/s) ──→ Node 2
│ │
RDMA: direct GPU-to-GPU without CPU involvement
Backend selection:
- nccl: GPU distributed training (recommended)
- gloo: CPU training or debugging
- mpi: HPC environments
Horovod Comparison
| Feature | PyTorch DDP | Horovod |
|---|---|---|
| Integration | Built into PyTorch | Separate install |
| Algorithm | Bucketed AllReduce | Ring-AllReduce |
| Frameworks | PyTorch only | TF/PyTorch/MXNet |
| Ease of use | Moderate | Simple (hvd.init()) |
| Performance | Comparable | Comparable |
7. Observability: Tracing, Logs, and Metrics
Distributed Tracing
In a microservices architecture, requests traverse multiple services. Distributed tracing lets you see the entire journey of a single request.
Request flow (traced by Jaeger/Zipkin):
Browser → API GW → Order Svc → Inventory Svc → DB
│ │ │ │ │
│ TraceID: abc123 │ │ │
│ SpanID: 001 SpanID: 002 SpanID: 003 │
│ │ │ │
└──────────────────────┴────────────┴──────────┘
Total latency: 120ms
Order Svc: 80ms (bottleneck!)
OpenTelemetry layers:
- API: write instrumentation code
- SDK: process and export telemetry data
- Exporter: send to Jaeger / Zipkin / OTLP endpoint
ELK Stack Log Aggregation
┌─────────────────────────────────────────────────────┐
│ ELK Stack │
│ │
│ App Logs ──▶ Filebeat ──▶ Logstash ──▶ Elasticsearch│
│ K8s Logs ──▶ Fluentd ──/ ──▶ Kibana │
│ (dashboards)│
│ │
│ Modern alternative: │
│ App Logs ──▶ Promtail ──▶ Loki ──▶ Grafana │
└─────────────────────────────────────────────────────┘
Prometheus + Grafana Metrics
# Prometheus scrape configuration
scrape_configs:
- job_name: 'kafka'
static_configs:
- targets: ['kafka-broker-1:9090', 'kafka-broker-2:9090']
metrics_path: '/metrics'
scrape_interval: 15s
- job_name: 'pytorch-training'
static_configs:
- targets: ['trainer-0:8080', 'trainer-1:8080']
Critical Kafka metrics for Grafana dashboards:
kafka_consumer_lag: Consumer lag (key SLA metric)kafka_network_request_rate: Broker request throughputkafka_log_size: Partition log sizekafka_under_replicated_partitions: Number of under-replicated partitions
Quiz
Q1. When a network partition occurs, what are the tradeoffs a CP system and an AP system each make?
Answer: A CP system sacrifices availability to maintain consistency during a partition. An AP system does the opposite — it sacrifices consistency to maintain availability.
Explanation: When a network partition occurs, nodes on opposite sides of the partition can no longer communicate. A CP system (e.g., etcd, HBase) will refuse requests or return errors rather than risk serving stale data. An AP system (e.g., Cassandra, DynamoDB) continues serving requests from partitioned nodes, meaning different nodes may return different data. Once the partition heals, the system reconciles via eventual consistency.
Q2. Describe the process by which Raft elects a new leader when the current leader fails.
Answer: When a Follower does not receive a heartbeat within its election timeout, it becomes a Candidate, sends RequestVote RPCs, and the first Candidate to receive a majority of votes becomes the new Leader.
Explanation: After the leader fails, Followers wait for their election timeout (typically 150-300ms). The first Follower to time out transitions to Candidate, increments its term, votes for itself, and sends RequestVote RPCs to all other nodes. Each node grants at most one vote per term on a first-come-first-served basis. The first Candidate to accumulate a majority wins and becomes the new Leader. If a split vote occurs, all Candidates time out and restart the election with a new, higher term.
Q3. What happens in Kafka when a consumer group has more consumers than partitions?
Answer: The excess consumers receive no partition assignment and remain idle — they process no messages.
Explanation: In Kafka's consumption model, each partition in a topic can be consumed by at most one consumer within the same consumer group at a time. If there are 4 consumers but only 3 partitions, one consumer will have no partition assigned and will sit idle, consuming resources but doing no work. For this reason, you generally design so the number of consumers does not exceed the number of partitions. To scale throughput further, you must increase the partition count first.
Q4. Why is Ring-AllReduce more communication-efficient than the Parameter Server approach?
Answer: In Ring-AllReduce, each node communicates only with its two neighbors, so total bandwidth usage is constant regardless of the number of nodes. In Parameter Server, all Workers communicate with the PS, making the PS a bandwidth bottleneck that scales linearly with worker count.
Explanation: With n workers in a Parameter Server setup, the PS must exchange gradients with all n workers. As n grows, the PS's required bandwidth grows linearly. In Ring-AllReduce, each worker sends data only to the next worker in the ring. The total data transferred per worker converges to 2 _ (n-1)/n _ data_size, which approaches 2x data_size as n grows — independent of the number of workers. This allows every node's bandwidth to be fully and evenly utilized.
Q5. In the Saga pattern, when is a compensating transaction required?
Answer: When an intermediate step in a Saga fails, compensating transactions are executed for all previously completed steps (in reverse order) to undo their effects.
Explanation: The Saga pattern handles distributed transactions when ACID guarantees are unavailable. Each step commits locally and immediately. For example, in an Order Saga — Create Order → Deduct Inventory → Process Payment — if the payment step fails, a "Refund Payment" compensating transaction runs, followed by a "Restore Inventory" transaction, and finally a "Cancel Order" transaction. Because compensating transactions can also fail, they must be idempotent and designed for retries. Manual intervention may be required if a compensation step fails.
Conclusion
Distributed systems mastery is not acquired overnight. Understanding CAP theorem tradeoffs, implementing Raft consensus by hand, applying Kafka partition strategies to real production services — these skills deepen through experience.
For AI engineers in particular, distributed ML training techniques like Ring-AllReduce and NCCL are becoming increasingly central. Training a model at the scale of GPT-4 requires optimizing the network communication of thousands of GPUs, which ultimately comes down to the core principles of distributed systems.
Recommended learning path: Understand CAP theorem → Read the Raft paper → Operate etcd in production → Get Kafka production experience → Train multi-GPU models with PyTorch DDP