Skip to content
Published on

분산 시스템 완전 정복: CAP theorem부터 분산 ML 학습, Kafka까지

Authors

들어가며

분산 시스템은 현대 AI 인프라의 뼈대입니다. GPT-4 같은 대형 모델을 학습시키려면 수천 개의 GPU가 협력해야 하고, 초당 수백만 건의 추론 요청을 처리하려면 Kafka, Cassandra 같은 분산 시스템이 필수입니다.

이 가이드는 CAP theorem 같은 이론적 기초부터 실제 PyTorch 분산 학습 코드까지 AI 엔지니어에게 필요한 분산 시스템 지식을 체계적으로 다룹니다.


1. 분산 시스템 기초: CAP theorem과 일관성 모델

CAP Theorem

2000년 Eric Brewer가 제시한 CAP theorem은 분산 시스템의 근본적 한계를 설명합니다.

┌─────────────────────────────────────────────┐
CAP Triangle│                                             │
Consistency (C)│               △                             │
/ \                            │
/   \                           │
/     \                          │
/  CA   \                         │
/  (RDBMS)\                        │
/───────────\                       │
/  CPAP  \                      │
/ (HBase│(Cassandra)│      ▽────────┼────────▽                    │
Availability  Partition Tolerance     (A)              (P)└─────────────────────────────────────────────┘

세 가지 속성은 동시에 모두 만족할 수 없습니다:

  • C (Consistency): 모든 노드가 동일한 데이터를 읽는다
  • A (Availability): 모든 요청이 응답을 받는다
  • P (Partition Tolerance): 네트워크 파티션 상황에서도 작동한다

네트워크 파티션은 실제 분산 환경에서 반드시 발생하기 때문에, 실질적 선택은 CP vs AP입니다.

시스템선택사례
HBase, Zookeeper, etcdCP파티션 시 가용성 포기
Cassandra, DynamoDBAP파티션 시 일관성 포기
전통적 RDBMSCA파티션 허용 안 함

PACELC: CAP의 확장

CAP은 파티션 상황만 다루지만, PACELC는 정상 운영 시의 Latency vs Consistency 트레이드오프도 포함합니다.

PA or C  (파티션 발생 시)
EL or C  (정상 운영 시)

Cassandra: PA/EL  (가용성 + 낮은 레이턴시 우선)
etcd/Raft: PC/EC  (일관성 우선)

일관성 모델

강한 일관성에서 약한 일관성 순서로:

Linearizability (선형화 가능성)
  └── 가장 강력. 모든 연산이 즉각 반영
  └── 예: etcd, Zookeeper

Sequential Consistency (순차 일관성)
  └── 모든 노드가 같은 순서로 연산을 봄
  └── 예: CPU 메모리 모델

Causal Consistency (인과 일관성)
  └── 인과관계가 있는 연산은 순서 보장
  └── 예: MongoDB causally consistent sessions

Eventual Consistency (최종 일관성)
  └── 언젠가는 일관성이 맞춰짐
  └── 예: Cassandra, DNS, S3

2. 합의 알고리즘: Paxos와 Raft

Paxos의 문제

Leslie Lamport가 1989년 제안한 Paxos는 이론적으로 완벽하지만 이해하기 너무 어렵습니다. Diego Ongaro가 "Paxos는 이해하기 너무 어렵다"는 논문 제목으로 Raft를 발표한 이유가 여기에 있습니다.

Raft 알고리즘

Raft는 분산 합의를 세 가지 독립적 문제로 분리합니다:

  1. 리더 선출 (Leader Election)
  2. 로그 복제 (Log Replication)
  3. 안전성 (Safety)

노드 상태

┌──────────┐  timeout   ┌──────────────┐  majority   ┌────────┐
│          │──────────▶│   Candidate  │────────────▶│        │
Follower │            └──────────────┘             │ Leader│          │◀─────────────────────────────────────── │        │
└──────────┘          receive heartbeat              └────────┘
     ▲                                                    │
     └────────────────── heartbeat ──────────────────────┘

리더 선출 과정

  1. Follower가 election timeout(150-300ms) 동안 heartbeat를 못 받으면 Candidate로 전환
  2. term 번호를 1 증가시키고 자신에게 투표 후 다른 노드에 RequestVote RPC 발송
  3. 과반수 투표를 받으면 Leader 당선
  4. Leader는 주기적으로 AppendEntries(heartbeat) 발송

로그 복제

# etcd를 이용한 분산 잠금 구현
import etcd3
import time

def distributed_lock(etcd_client, lock_name, ttl=10):
    """Raft 기반 etcd로 분산 잠금 구현"""
    lease = etcd_client.lease(ttl)
    lock_key = f"/locks/{lock_name}"

    # atomic CAS (Compare-And-Swap)
    success, _ = etcd_client.transaction(
        compare=[
            etcd3.transactions.create(lock_key, '==', 0)
        ],
        success=[
            etcd3.transactions.put(lock_key, 'locked', lease=lease)
        ],
        failure=[]
    )

    if success:
        print(f"Lock acquired: {lock_name}")
        return lease
    else:
        print(f"Lock already held: {lock_name}")
        return None

def release_lock(etcd_client, lease):
    """잠금 해제"""
    if lease:
        lease.revoke()
        print("Lock released")

# 사용 예시
client = etcd3.client(host='localhost', port=2379)

lease = distributed_lock(client, 'my-resource')
if lease:
    try:
        # 임계 영역에서 작업 수행
        time.sleep(5)
    finally:
        release_lock(client, lease)

etcd 실전: Kubernetes가 etcd를 사용하는 방식

Kubernetes는 모든 클러스터 상태를 etcd에 저장합니다. etcd 클러스터의 쓰기 성능은 Raft 합의에 의해 제한되므로, 일반적으로 3~5개 노드를 권장합니다.

┌─────────────────────────────────────────────────────┐
│                  etcd Cluster (3 nodes)│                                                     │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐      │
│  │  Leader  │───▶│Follower 1│    │Follower 2│      │
│  │  node-1  │───▶│  node-2  │    │  node-3  │      │
│  └──────────┘    └──────────┘    └──────────┘      │
│       ▲               │               │             │
│       └───────────────┴───────────────┘             │
Raft Consensus└─────────────────────────────────────────────────────┘

3. 분산 트랜잭션: 2PC, Saga, CQRS

2PC (Two-Phase Commit)

Phase 1 (Prepare):
  Coordinator ──▶ Participant A: "준비됐어?"
  Coordinator ──▶ Participant B: "준비됐어?"
  Participant A ──▶ Coordinator: "YES"
  Participant B ──▶ Coordinator: "YES"

Phase 2 (Commit):
  Coordinator ──▶ Participant A: "커밋해!"
  Coordinator ──▶ Participant B: "커밋해!"

2PC의 문제점: Coordinator 장애 시 Participant들이 blocking 상태에 빠집니다.

Saga 패턴

Saga는 긴 트랜잭션을 여러 로컬 트랜잭션으로 분해하고, 실패 시 보상 트랜잭션을 실행합니다.

주문 Saga:
  1. 주문 생성 → 실패 시: 주문 취소
  2. 재고 차감 → 실패 시: 재고 복구
  3. 결제 처리 → 실패 시: 결제 환불
  4. 배송 시작 → 실패 시: 배송 취소
from typing import Callable, List
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)

@dataclass
class SagaStep:
    name: str
    action: Callable
    compensating_action: Callable

class SagaOrchestrator:
    """Saga 오케스트레이터 패턴 구현"""

    def __init__(self, steps: List[SagaStep]):
        self.steps = steps
        self.completed_steps = []

    def execute(self, context: dict) -> bool:
        for step in self.steps:
            try:
                logger.info(f"Executing step: {step.name}")
                step.action(context)
                self.completed_steps.append(step)
                logger.info(f"Step completed: {step.name}")
            except Exception as e:
                logger.error(f"Step failed: {step.name}, error: {e}")
                self._compensate(context)
                return False
        return True

    def _compensate(self, context: dict):
        """역순으로 보상 트랜잭션 실행"""
        logger.info("Starting compensation...")
        for step in reversed(self.completed_steps):
            try:
                logger.info(f"Compensating: {step.name}")
                step.compensating_action(context)
            except Exception as e:
                logger.error(f"Compensation failed: {step.name}, error: {e}")
                # 보상 트랜잭션 실패는 수동 개입이 필요
                raise RuntimeError(f"Saga compensation failed at {step.name}")

# 주문 Saga 정의
def create_order(ctx):
    ctx['order_id'] = 'ORD-001'
    print(f"Order created: {ctx['order_id']}")

def cancel_order(ctx):
    print(f"Order cancelled: {ctx['order_id']}")

def deduct_inventory(ctx):
    ctx['inventory_reserved'] = True
    print("Inventory deducted")

def restore_inventory(ctx):
    print("Inventory restored")

def process_payment(ctx):
    # 결제 실패 시뮬레이션
    raise Exception("Payment gateway timeout")

def refund_payment(ctx):
    print("Payment refunded")

order_saga = SagaOrchestrator([
    SagaStep("create_order", create_order, cancel_order),
    SagaStep("deduct_inventory", deduct_inventory, restore_inventory),
    SagaStep("process_payment", process_payment, refund_payment),
])

context = {"user_id": "user-123", "amount": 50000}
result = order_saga.execute(context)
print(f"Saga result: {'success' if result else 'failed with compensation'}")

이벤트 소싱과 CQRS

CQRS 아키텍처:

  Command Side (쓰기)          Query Side (읽기)
  ─────────────────            ────────────────
  POST /orders          ──▶    GET /orders/123
  PUT  /inventory       ──▶    GET /inventory/stats
       │                              ▲
       ▼                              │
  Event Store ──── Projection ────────┘
  (append-only)    (read model 갱신)

이벤트 소싱의 핵심: 현재 상태 대신 이벤트의 시퀀스를 저장합니다. 이를 통해 특정 시점의 상태 복원, 감사 로그, 이벤트 리플레이가 가능합니다.


4. 메시지 큐: Kafka 아키텍처

Kafka 핵심 개념

┌─────────────────────────────────────────────────────────────┐
Kafka Cluster│                                                             │
Topic: orders (4 partitions, replication factor: 3)│                                                             │
Broker 1      Broker 2      Broker 3│  ┌──────┐      ┌──────┐      ┌──────┐                       │
│  │ P0 L │      │ P0 F │      │ P0 FL=Leader, F=Follower│  │ P1 F │      │ P1 L │      │ P1 F │                       │
│  │ P2 F │      │ P2 F │      │ P2 L │                       │
│  │ P3 L │      │ P3 F │      │ P3 F │                       │
│  └──────┘      └──────┘      └──────┘                       │
│                                                             │
Consumer Group A          Consumer Group BConsumer 1: P0, P1        Consumer X: P0, P1, P2, P3Consumer 2: P2, P3└─────────────────────────────────────────────────────────────┘
  • Partition: 순서 보장의 단위이자 병렬 처리의 단위
  • Replication: 파티션을 여러 브로커에 복제하여 고가용성 보장
  • Consumer Group: 각 파티션은 그룹 내 하나의 컨슈머만 처리

Kafka Producer/Consumer 실전

from confluent_kafka import Producer, Consumer, KafkaError
import json
import time

# Producer 설정
producer_config = {
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',          # 모든 ISR 복제 완료 후 확인
    'retries': 3,
    'enable.idempotence': True,  # 정확히 한 번 전송 보장
    'compression.type': 'snappy',
}

def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

producer = Producer(producer_config)

# 주문 이벤트 발행
for i in range(10):
    order_event = {
        'order_id': f'ORD-{i:04d}',
        'user_id': f'user-{i % 5}',
        'amount': 10000 * (i + 1),
        'timestamp': time.time()
    }
    producer.produce(
        topic='orders',
        key=order_event['user_id'],   # 같은 user는 같은 파티션으로
        value=json.dumps(order_event),
        callback=delivery_report
    )
    producer.poll(0)

producer.flush()
print("All messages sent")

# Consumer 설정
consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processor-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,   # 수동 오프셋 커밋으로 at-least-once 보장
}

consumer = Consumer(consumer_config)
consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                raise KafkaError(msg.error())

        event = json.loads(msg.value().decode('utf-8'))
        print(f"Processing order: {event['order_id']}, amount: {event['amount']}")

        # 처리 완료 후 오프셋 커밋
        consumer.commit(asynchronous=False)

except KeyboardInterrupt:
    pass
finally:
    consumer.close()

Kafka Topic YAML 설정

# Kafka Topic 설정 (Strimzi Operator 기준)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: orders
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 12 # 파티션 수 = 최대 병렬 컨슈머 수
  replicas: 3 # 복제 팩터
  config:
    retention.ms: '604800000' # 7일 보관
    min.insync.replicas: '2' # 최소 2개 ISR 복제 후 확인
    compression.type: snappy
    cleanup.policy: delete

Pulsar vs Kafka

항목KafkaPulsar
스토리지Broker에 직접Apache BookKeeper 분리
멀티 테넌시제한적네이티브 지원
지역 복제MirrorMaker2내장 Geo-Replication
레이턴시~5ms~1ms
운영 복잡도낮음높음
성숙도매우 높음높음

5. 분산 스토리지: Consistent Hashing과 Cassandra

Consistent Hashing

일반 해싱의 문제: 노드 추가/제거 시 거의 모든 키가 재배치됩니다. Consistent Hashing의 해결책: 노드 변경 시 최소한의 키만 재배치됩니다.

import hashlib
from sortedcontainers import SortedDict

class ConsistentHashRing:
    """Consistent Hashing 링 구현"""

    def __init__(self, virtual_nodes=150):
        self.virtual_nodes = virtual_nodes
        self.ring = SortedDict()
        self.nodes = set()

    def _hash(self, key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node: str):
        """노드를 링에 추가 (가상 노드 포함)"""
        self.nodes.add(node)
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}#{i}"
            hash_val = self._hash(virtual_key)
            self.ring[hash_val] = node
        print(f"Added node: {node} with {self.virtual_nodes} virtual nodes")

    def remove_node(self, node: str):
        """노드를 링에서 제거"""
        self.nodes.discard(node)
        keys_to_remove = [k for k, v in self.ring.items() if v == node]
        for k in keys_to_remove:
            del self.ring[k]
        print(f"Removed node: {node}")

    def get_node(self, key: str) -> str:
        """키에 해당하는 노드 반환"""
        if not self.ring:
            raise Exception("Ring is empty")

        hash_val = self._hash(key)
        # 해시값보다 크거나 같은 첫 번째 가상 노드 찾기
        idx = self.ring.bisect_left(hash_val)
        if idx == len(self.ring):
            idx = 0  # 링이므로 wrap around

        return self.ring.values()[idx]

    def get_nodes_for_replication(self, key: str, n: int) -> list:
        """복제를 위한 n개의 노드 반환"""
        if len(self.nodes) < n:
            raise Exception(f"Not enough nodes: {len(self.nodes)} < {n}")

        hash_val = self._hash(key)
        idx = self.ring.bisect_left(hash_val)

        result_nodes = []
        seen_nodes = set()
        total = len(self.ring)

        for i in range(total):
            curr_idx = (idx + i) % total
            node = self.ring.values()[curr_idx]
            if node not in seen_nodes:
                seen_nodes.add(node)
                result_nodes.append(node)
                if len(result_nodes) == n:
                    break

        return result_nodes

# 사용 예시
ring = ConsistentHashRing(virtual_nodes=150)
for i in range(5):
    ring.add_node(f"cassandra-node-{i}")

# 키 분배 테스트
for key in ["user:1001", "order:5555", "product:abc", "session:xyz"]:
    node = ring.get_node(key)
    replicas = ring.get_nodes_for_replication(key, 3)
    print(f"Key '{key}' -> Primary: {node}, Replicas: {replicas}")

Apache Cassandra 아키텍처

┌─────────────────────────────────────────────────────────────────┐
Cassandra Ring (6 nodes)│                                                                 │
Node 1/      \                                  │
Node 6      Node 2|            |Node 5      Node 3│                       \      /Node 4│                                                                 │
Write Path:ClientCoordinatorPartition Key 해싱 → 담당 노드들        │
│                       → CommitLog (WAL)MemtableSSTable│                                                                 │
Read Path:ClientCoordinator → 담당 노드들 → Row Cache / Bloom Filter│                       → SSTable 병합 → 최신 데이터 반환         │
└─────────────────────────────────────────────────────────────────┘

Cassandra의 핵심 특성:

  • Tunable Consistency: QUORUM, ONE, ALL 등으로 일관성 수준 선택 가능
  • Gossip Protocol: 노드 간 상태 전파에 epidemic 알고리즘 사용
  • Compaction: 주기적으로 SSTable을 병합하여 읽기 성능 향상

6. 분산 ML 학습: Ring-AllReduce와 PyTorch Distributed

Parameter Server vs All-Reduce

Parameter Server 방식:
  ─────────────────────
  Worker 1 ─┐
  Worker 2 ─┤── PS (Parameter Server) ──▶ 그래디언트 집계 후 배포
  Worker 3 ─┘

  문제: PS가 병목. 대역폭이 O(n)으로 증가

Ring-AllReduce 방식:
  ────────────────────
  Worker 1 ──▶ Worker 2 ──▶ Worker 3
     ▲                          │
     └──────────────────────────┘

Worker가 이웃과만 통신.
   통신량은 노드 수에 무관하게 O(n-1)/n * data_size

Ring-AllReduce의 두 단계:

  1. Reduce-Scatter: 각 노드가 자신의 청크를 reduce하면서 다음 노드로 전달
  2. All-Gather: reduce된 결과를 모든 노드에 전파

PyTorch DistributedDataParallel (DDP)

import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler

def setup(rank, world_size):
    """분산 학습 초기화"""
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # NCCL 백엔드: GPU간 고속 통신 (NVLink, InfiniBand 활용)
    dist.init_process_group(
        backend='nccl',
        rank=rank,
        world_size=world_size
    )
    torch.cuda.set_device(rank)

def cleanup():
    dist.destroy_process_group()

class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(1024, 2048),
            nn.ReLU(),
            nn.Linear(2048, 1024),
            nn.ReLU(),
            nn.Linear(1024, 10)
        )

    def forward(self, x):
        return self.layers(x)

def train(rank, world_size, epochs=5):
    setup(rank, world_size)

    # 각 GPU에 모델 배치 후 DDP로 래핑
    model = SimpleModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    # DistributedSampler: 각 프로세스가 다른 데이터 처리
    dataset = torch.utils.data.TensorDataset(
        torch.randn(1000, 1024),
        torch.randint(0, 10, (1000,))
    )
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)

    optimizer = optim.Adam(ddp_model.parameters(), lr=1e-3)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(epochs):
        sampler.set_epoch(epoch)  # 에포크마다 셔플 재설정
        total_loss = 0.0

        for batch_x, batch_y in dataloader:
            batch_x = batch_x.to(rank)
            batch_y = batch_y.to(rank)

            optimizer.zero_grad()
            output = ddp_model(batch_x)
            loss = criterion(output, batch_y)
            loss.backward()  # 자동으로 그래디언트 AllReduce 수행
            optimizer.step()
            total_loss += loss.item()

        if rank == 0:  # 메인 프로세스에서만 로그 출력
            print(f"Epoch {epoch}: Loss = {total_loss/len(dataloader):.4f}")

    cleanup()

# 실행: torchrun --nproc_per_node=4 train.py
if __name__ == '__main__':
    world_size = torch.cuda.device_count()
    torch.multiprocessing.spawn(
        train,
        args=(world_size,),
        nprocs=world_size,
        join=True
    )

NCCL과 통신 백엔드

NCCL (NVIDIA Collective Communications Library):

  단일 노드 (NVLink):
  GPU0 ←──── NVLink ────→ GPU1
    │                       │
   600 GB/s 양방향 대역폭

  다중 노드 (InfiniBand + RDMA):
  Node 1 ←── IB (200 Gb/s) ──→ Node 2
    │                              │
  RDMA: CPU 없이 GPU-to-GPU 직접 통신

백엔드 선택:
  - nccl: GPU 분산 학습 (권장)
  - gloo: CPU 학습 또는 디버깅
  - mpi: HPC 환경

Horovod와의 비교

항목PyTorch DDPHorovod
통합PyTorch 내장별도 설치
알고리즘Bucketed AllReduceRing-AllReduce
프레임워크PyTorch 전용TF/PyTorch/MXNet
사용 편의성약간 복잡간단 (hvd.init())
성능유사유사

7. 관측성: 분산 추적, 로그, 메트릭

분산 추적 (Distributed Tracing)

마이크로서비스 환경에서 요청이 여러 서비스를 거칠 때 전체 경로를 추적합니다.

요청 흐름 (Jaeger/Zipkin 추적):

  BrowserAPI GWOrder SvcInventory SvcDB
    │           │          │            │          │
TraceID: abc123     │            │          │
SpanID: 001    SpanID: 002  SpanID: 003    │                      │            │          │
    └──────────────────────┴────────────┴──────────┘
             전체 레이턴시: 120ms
             Order Svc: 80ms (병목!)
OpenTelemetry 계층:
  - API: 계측 코드 작성
  - SDK: 데이터 처리 및 내보내기
  - Exporter: Jaeger/Zipkin/OTLP로 전송

ELK Stack 로그 집계

┌─────────────────────────────────────────────────────┐
ELK Stack│                                                     │
App Logs ──▶ Filebeat ──▶ Logstash ──▶ Elasticsearch│
K8s Logs ──▶ Fluentd ──/             ──▶ Kibana                                         (시각화)│                                                     │
│  또는 현대적 스택:App Logs ──▶ Promtail ──▶ Loki ──▶ Grafana└─────────────────────────────────────────────────────┘

Prometheus + Grafana 메트릭

# Prometheus 스크레이핑 설정
scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka-broker-1:9090', 'kafka-broker-2:9090']
    metrics_path: '/metrics'
    scrape_interval: 15s

  - job_name: 'pytorch-training'
    static_configs:
      - targets: ['trainer-0:8080', 'trainer-1:8080']

핵심 Kafka 메트릭 (Grafana 대시보드):

  • kafka_consumer_lag: 컨슈머 지연 (핵심 SLA 지표)
  • kafka_network_request_rate: 브로커 요청 처리율
  • kafka_log_size: 파티션 로그 크기
  • kafka_under_replicated_partitions: 복제 지연 파티션 수

퀴즈

Q1. CAP theorem에서 네트워크 파티션 발생 시 CP 시스템과 AP 시스템의 트레이드오프 차이는?

정답: CP 시스템은 파티션 상황에서 가용성을 포기하고 일관성을 유지합니다. AP 시스템은 반대로 일관성을 포기하고 가용성을 유지합니다.

설명: 네트워크 파티션이 발생하면 분리된 노드 간 통신이 불가능합니다. CP 시스템(예: etcd, HBase)은 이때 요청을 거부하거나 오류를 반환하여 오래된 데이터 제공을 막습니다. AP 시스템(예: Cassandra, DynamoDB)은 파티션된 노드에서도 응답을 계속하되, 서로 다른 노드가 다른 데이터를 반환할 수 있습니다. 파티션이 해소되면 최종 일관성(eventual consistency)을 통해 데이터를 맞춥니다.

Q2. Raft 알고리즘에서 리더 장애 시 새 리더를 선출하는 과정은?

정답: Follower가 election timeout 내 heartbeat를 수신하지 못하면 Candidate가 되어 RequestVote RPC를 전송하고, 과반수 투표를 받은 노드가 새 Leader가 됩니다.

설명: 리더 장애 시 Follower들은 election timeout(보통 150-300ms) 동안 heartbeat를 기다립니다. Timeout이 지나면 Candidate로 전환하여 term을 1 증가시키고 자신에게 먼저 투표합니다. 이후 다른 노드들에 RequestVote RPC를 전송합니다. 각 노드는 한 term에 한 번만 투표할 수 있으며, 과반수 투표를 받은 Candidate가 Leader가 됩니다. Split vote 발생 시 모두 timeout 후 재시도합니다.

Q3. Kafka에서 consumer group의 컨슈머 수가 partition 수보다 많을 때 발생하는 문제는?

정답: 초과하는 컨슈머들은 아무 파티션도 할당받지 못하고 idle 상태가 됩니다.

설명: Kafka의 소비 모델에서 하나의 파티션은 동일 consumer group 내 하나의 컨슈머만 처리할 수 있습니다. 따라서 컨슈머가 4개이고 파티션이 3개라면, 1개의 컨슈머는 어떤 파티션도 할당받지 못해 메시지를 처리하지 못합니다. 이는 자원 낭비이므로, 일반적으로 컨슈머 수는 파티션 수를 초과하지 않도록 설계합니다. 처리량 증가를 위해서는 파티션 수를 먼저 늘려야 합니다.

Q4. Ring-AllReduce가 Parameter Server 방식보다 통신 효율이 높은 이유는?

정답: Ring-AllReduce는 각 노드가 이웃과만 통신하여 총 대역폭이 노드 수와 무관하게 일정하지만, Parameter Server는 모든 Worker가 PS와 통신하여 PS의 대역폭이 병목이 됩니다.

설명: Parameter Server 방식에서 n개의 Worker가 있을 때 PS는 n개의 Worker 모두와 통신해야 합니다. Worker가 늘어날수록 PS의 네트워크 대역폭 요구량이 선형으로 증가합니다. 반면 Ring-AllReduce에서 각 Worker는 양쪽 이웃 노드에만 데이터를 전송합니다. n개 노드의 총 데이터 전송량은 2*(n-1)/n * data_size로, 모든 노드의 대역폭을 동등하게 활용합니다. 따라서 노드를 추가해도 각 노드의 통신 부담이 증가하지 않습니다.

Q5. Saga 패턴에서 보상 트랜잭션(compensating transaction)이 필요한 상황은?

정답: Saga의 중간 단계가 실패했을 때, 이미 완료된 이전 단계들의 효과를 되돌리기 위해 보상 트랜잭션이 실행됩니다.

설명: Saga 패턴은 분산 환경에서 ACID 트랜잭션을 사용할 수 없을 때 사용합니다. 각 단계는 로컬 트랜잭션으로 이미 커밋됩니다. 예를 들어 주문 생성 → 재고 차감 → 결제 처리 순서에서 결제 처리가 실패하면, 이미 완료된 재고 차감을 되돌리는 "재고 복구" 트랜잭션과 주문 생성을 되돌리는 "주문 취소" 트랜잭션이 역순으로 실행됩니다. 보상 트랜잭션 자체가 실패할 수 있으므로 멱등성(idempotency) 보장과 재시도 로직이 중요합니다.


마치며

분산 시스템은 하루아침에 마스터할 수 있는 분야가 아닙니다. CAP theorem의 트레이드오프를 이해하고, Raft 합의를 코드로 구현해보고, Kafka 파티션 전략을 실제 서비스에 적용해보면서 점차 깊이가 쌓입니다.

특히 AI 엔지니어라면 Ring-AllReduce와 NCCL 같은 분산 ML 학습 기술이 점점 더 중요해지고 있습니다. GPT-4 규모의 모델을 학습시키려면 수천 개 GPU의 네트워크 통신을 최적화해야 하고, 이는 결국 분산 시스템의 핵심 원리로 귀결됩니다.

핵심 학습 경로: CAP theorem 이해 → Raft 논문 읽기 → etcd 직접 운영 → Kafka 프로덕션 경험 → PyTorch DDP로 멀티 GPU 학습