- Authors

- Name
- Youngju Kim
- @fjvbn20031
- はじめに
- 1. 分散システムの基礎: CAP定理と一貫性モデル
- 2. コンセンサスアルゴリズム: PaxosとRaft
- 3. 分散トランザクション: 2PC、Saga、CQRS
- 4. メッセージキュー: Kafkaアーキテクチャ
- 5. 分散ストレージ: コンシステントハッシングとCassandra
- 6. 分散ML学習: Ring-AllReduceとPyTorch Distributed
- 7. オブザーバビリティ: 分散トレース、ログ、メトリクス
- クイズ
- おわりに
はじめに
分散システムは現代のAIインフラの骨格です。GPT-4のような大規模モデルを学習させるには数千個のGPUが連携して動作する必要があり、毎秒数百万件の推論リクエストを処理するにはKafkaやCassandraのような分散システムが不可欠です。
このガイドでは、CAP定理のような理論的基礎から実際のPyTorch分散学習コードまで、AIエンジニアに必要な分散システムの知識を体系的に解説します。
1. 分散システムの基礎: CAP定理と一貫性モデル
CAP定理
2000年にEric Brewerが提唱したCAP定理は、分散システムの根本的な制約を説明します。
┌─────────────────────────────────────────────┐
│ CAP Triangle │
│ │
│ Consistency (C) │
│ △ │
│ / \ │
│ / \ │
│ / \ │
│ / CA \ │
│ / (RDBMS)\ │
│ /───────────\ │
│ / CP │ AP \ │
│ / (HBase│(Cassandra) │
│ ▽────────┼────────▽ │
│ Availability Partition Tolerance │
│ (A) (P) │
└─────────────────────────────────────────────┘
3つの性質を同時にすべて満たすことはできません:
- C (Consistency): 全ノードが同じデータを読める
- A (Availability): 全リクエストが応答を受け取れる
- P (Partition Tolerance): ネットワーク分断が発生しても動作し続ける
実際の分散環境ではネットワーク分断は必ず発生するため、実質的な選択は CP vs AP です。
| システム | 選択 | 例 |
|---|---|---|
| HBase, Zookeeper, etcd | CP | 分断時に可用性を犠牲にする |
| Cassandra, DynamoDB | AP | 分断時に一貫性を犠牲にする |
| 従来型RDBMS | CA | 分断を許容しない |
PACELC: CAPの拡張
CAPは分断時のみを扱いますが、PACELCは通常運用時のLatency vs Consistencyのトレードオフも含みます。
P → A or C (パーティション発生時)
E → L or C (通常運用時)
Cassandra: PA/EL (可用性 + 低レイテンシ優先)
etcd/Raft: PC/EC (一貫性優先)
一貫性モデル
強い一貫性から弱い一貫性の順:
線形化可能性 (Linearizability)
└── 最も強力。全操作が即座に反映される
└── 例: etcd, Zookeeper
逐次一貫性 (Sequential Consistency)
└── 全ノードが同じ順序で操作を観察する
└── 例: CPUメモリモデル
因果一貫性 (Causal Consistency)
└── 因果関係のある操作は順序が保証される
└── 例: MongoDBの因果一貫性セッション
結果整合性 (Eventual Consistency)
└── いつかは一貫性が保たれる
└── 例: Cassandra, DNS, S3
2. コンセンサスアルゴリズム: PaxosとRaft
Paxosの問題点
Leslie Lamportが1989年に提案したPaxosは理論的には完全ですが、理解するのが非常に難しいです。Diego Ongaroが"In Search of an Understandable Consensus Algorithm"という論文タイトルでRaftを発表した理由がここにあります。
Raftアルゴリズム
Raftは分散コンセンサスを3つの独立した問題に分解します:
- リーダー選出 (Leader Election)
- ログ複製 (Log Replication)
- 安全性 (Safety)
ノードの状態遷移
┌──────────┐ timeout ┌──────────────┐ majority ┌────────┐
│ │──────────▶│ Candidate │────────────▶│ │
│ Follower │ └──────────────┘ │ Leader │
│ │◀─────────────────────────────────────── │ │
└──────────┘ receive heartbeat └────────┘
▲ │
└────────────────── heartbeat ──────────────────────┘
リーダー選出の流れ
- Followerがelection timeout(150〜300ms)内にheatbeatを受信できなければCandidateに遷移
- term番号を1増加し、自分に投票してから他のノードにRequestVote RPCを送信
- 過半数の票を得たCandidateが新しいLeaderに当選
- LeaderはAppendEntries(heartbeat)を定期的に送信して再選挙を防ぐ
ログ複製
# etcdを使った分散ロックの実装
import etcd3
import time
def distributed_lock(etcd_client, lock_name, ttl=10):
"""Raftベースのetcdで分散ロックを実装"""
lease = etcd_client.lease(ttl)
lock_key = f"/locks/{lock_name}"
# アトミックなCAS (Compare-And-Swap)
success, _ = etcd_client.transaction(
compare=[
etcd3.transactions.create(lock_key, '==', 0)
],
success=[
etcd3.transactions.put(lock_key, 'locked', lease=lease)
],
failure=[]
)
if success:
print(f"ロック取得: {lock_name}")
return lease
else:
print(f"ロック保持中: {lock_name}")
return None
def release_lock(etcd_client, lease):
"""ロックの解放"""
if lease:
lease.revoke()
print("ロック解放")
# 使用例
client = etcd3.client(host='localhost', port=2379)
lease = distributed_lock(client, 'my-resource')
if lease:
try:
# クリティカルセクションでの処理
time.sleep(5)
finally:
release_lock(client, lease)
KubernetesがetcdをどのようにするWか
Kubernetesはすべてのクラスター状態をetcdに保存します。etcdクラスターの書き込み性能はRaftコンセンサスによって制限されるため、一般的に3〜5ノードが推奨されます。
┌─────────────────────────────────────────────────────┐
│ etcd Cluster (3 nodes) │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Leader │───▶│Follower 1│ │Follower 2│ │
│ │ node-1 │───▶│ node-2 │ │ node-3 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ ▲ │ │ │
│ └───────────────┴───────────────┘ │
│ Raft Consensus │
└─────────────────────────────────────────────────────┘
3. 分散トランザクション: 2PC、Saga、CQRS
2PC (2フェーズコミット)
フェーズ1 (Prepare):
Coordinator ──▶ Participant A: "準備できた?"
Coordinator ──▶ Participant B: "準備できた?"
Participant A ──▶ Coordinator: "YES"
Participant B ──▶ Coordinator: "YES"
フェーズ2 (Commit):
Coordinator ──▶ Participant A: "コミットして!"
Coordinator ──▶ Participant B: "コミットして!"
2PCの問題点: フェーズ1後にCoordinatorがクラッシュすると、Participantたちがブロック状態に陥ります。
Sagaパターン
Sagaは長いトランザクションを複数のローカルトランザクションに分解し、失敗時には補償トランザクションを実行します。
注文Saga:
1. 注文作成 → 補償: 注文キャンセル
2. 在庫引き落とし → 補償: 在庫復元
3. 支払い処理 → 補償: 支払い払い戻し
4. 配送開始 → 補償: 配送キャンセル
from typing import Callable, List
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
@dataclass
class SagaStep:
name: str
action: Callable
compensating_action: Callable
class SagaOrchestrator:
"""Sagaオーケストレーターパターンの実装"""
def __init__(self, steps: List[SagaStep]):
self.steps = steps
self.completed_steps = []
def execute(self, context: dict) -> bool:
for step in self.steps:
try:
logger.info(f"ステップ実行: {step.name}")
step.action(context)
self.completed_steps.append(step)
logger.info(f"ステップ完了: {step.name}")
except Exception as e:
logger.error(f"ステップ失敗: {step.name}, エラー: {e}")
self._compensate(context)
return False
return True
def _compensate(self, context: dict):
"""逆順で補償トランザクションを実行"""
logger.info("補償処理を開始...")
for step in reversed(self.completed_steps):
try:
logger.info(f"補償実行: {step.name}")
step.compensating_action(context)
except Exception as e:
logger.error(f"補償失敗: {step.name}, エラー: {e}")
raise RuntimeError(f"Saga補償が{step.name}で失敗")
# 注文Sagaの定義
def create_order(ctx):
ctx['order_id'] = 'ORD-001'
print(f"注文作成: {ctx['order_id']}")
def cancel_order(ctx):
print(f"注文キャンセル: {ctx['order_id']}")
def deduct_inventory(ctx):
ctx['inventory_reserved'] = True
print("在庫引き落とし完了")
def restore_inventory(ctx):
print("在庫復元完了")
def process_payment(ctx):
raise Exception("決済ゲートウェイタイムアウト")
def refund_payment(ctx):
print("支払い払い戻し完了")
order_saga = SagaOrchestrator([
SagaStep("create_order", create_order, cancel_order),
SagaStep("deduct_inventory", deduct_inventory, restore_inventory),
SagaStep("process_payment", process_payment, refund_payment),
])
context = {"user_id": "user-123", "amount": 50000}
result = order_saga.execute(context)
print(f"Saga結果: {'成功' if result else '補償して失敗'}")
イベントソーシングとCQRS
CQRSアーキテクチャ:
コマンド側 (書き込み) クエリ側 (読み取り)
───────────────── ────────────────
POST /orders ──▶ GET /orders/123
PUT /inventory ──▶ GET /inventory/stats
│ ▲
▼ │
Event Store ──── Projection ────────┘
(追記専用) (読み取りモデル更新)
イベントソーシングの核心: 現在の状態ではなく、イベントのシーケンスを保存します。これにより特定時点の状態復元、監査ログ、イベントリプレイが可能になります。
4. メッセージキュー: Kafkaアーキテクチャ
Kafkaの核心概念
┌─────────────────────────────────────────────────────────────┐
│ Kafka Cluster │
│ │
│ Topic: orders (4 partitions, replication factor: 3) │
│ │
│ Broker 1 Broker 2 Broker 3 │
│ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │ P0 L │ │ P0 F │ │ P0 F │ L=Leader, F=Follower │
│ │ P1 F │ │ P1 L │ │ P1 F │ │
│ │ P2 F │ │ P2 F │ │ P2 L │ │
│ │ P3 L │ │ P3 F │ │ P3 F │ │
│ └──────┘ └──────┘ └──────┘ │
│ │
│ Consumer Group A Consumer Group B │
│ Consumer 1: P0, P1 Consumer X: P0, P1, P2, P3 │
│ Consumer 2: P2, P3 │
└─────────────────────────────────────────────────────────────┘
- パーティション: 順序保証と並列処理の単位
- レプリケーション: 高可用性のためにパーティションを複数ブローカーに複製
- コンシューマーグループ: 各パーティションはグループ内の1つのコンシューマーのみが処理
Kafka Producer/Consumerの実践
from confluent_kafka import Producer, Consumer, KafkaError
import json
import time
# Producer設定
producer_config = {
'bootstrap.servers': 'localhost:9092',
'acks': 'all', # 全ISRレプリカの確認後にack
'retries': 3,
'enable.idempotence': True, # 厳密に1回の送信保証
'compression.type': 'snappy',
}
def delivery_report(err, msg):
if err is not None:
print(f'メッセージ配信失敗: {err}')
else:
print(f'メッセージ配信完了: {msg.topic()} [{msg.partition()}] @ {msg.offset()}')
producer = Producer(producer_config)
# 注文イベントの発行
for i in range(10):
order_event = {
'order_id': f'ORD-{i:04d}',
'user_id': f'user-{i % 5}',
'amount': 10000 * (i + 1),
'timestamp': time.time()
}
producer.produce(
topic='orders',
key=order_event['user_id'], # 同じユーザーは同じパーティションへ
value=json.dumps(order_event),
callback=delivery_report
)
producer.poll(0)
producer.flush()
print("全メッセージ送信完了")
# Consumer設定
consumer_config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processor-group',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False, # at-least-once保証のための手動コミット
}
consumer = Consumer(consumer_config)
consumer.subscribe(['orders'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
raise KafkaError(msg.error())
event = json.loads(msg.value().decode('utf-8'))
print(f"注文処理: {event['order_id']}, 金額: {event['amount']}")
# 処理完了後にオフセットをコミット
consumer.commit(asynchronous=False)
except KeyboardInterrupt:
pass
finally:
consumer.close()
Kafka TopicのYAML設定
# Kafka Topic設定 (Strimzi Operator)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: orders
labels:
strimzi.io/cluster: my-cluster
spec:
partitions: 12 # 最大並列コンシューマー数 = パーティション数
replicas: 3 # レプリケーションファクター
config:
retention.ms: '604800000' # 7日間保持
min.insync.replicas: '2' # 最低2つのISRレプリカ確認後にack
compression.type: snappy
cleanup.policy: delete
Pulsar vs Kafka
| 項目 | Kafka | Pulsar |
|---|---|---|
| ストレージ | ブローカーに直接 | Apache BookKeeperで分離 |
| マルチテナント | 限定的 | ネイティブサポート |
| 地理的レプリケーション | MirrorMaker2 | 組み込み |
| レイテンシ | 約5ms | 約1ms |
| 運用複雑度 | 低い | 高い |
| 成熟度 | 非常に高い | 高い |
5. 分散ストレージ: コンシステントハッシングとCassandra
コンシステントハッシング
通常のハッシングの問題: ノードの追加・削除時にほぼ全てのキーが再配置されます。 コンシステントハッシングの解決策: ノード変更時に最小限のキーのみ再配置されます。
import hashlib
from sortedcontainers import SortedDict
class ConsistentHashRing:
"""コンシステントハッシングリングの実装"""
def __init__(self, virtual_nodes=150):
self.virtual_nodes = virtual_nodes
self.ring = SortedDict()
self.nodes = set()
def _hash(self, key: str) -> int:
return int(hashlib.md5(key.encode()).hexdigest(), 16)
def add_node(self, node: str):
"""仮想ノードを含めてリングにノードを追加"""
self.nodes.add(node)
for i in range(self.virtual_nodes):
virtual_key = f"{node}#{i}"
hash_val = self._hash(virtual_key)
self.ring[hash_val] = node
print(f"ノード追加: {node} ({self.virtual_nodes}個の仮想ノード)")
def remove_node(self, node: str):
"""リングからノードを削除"""
self.nodes.discard(node)
keys_to_remove = [k for k, v in self.ring.items() if v == node]
for k in keys_to_remove:
del self.ring[k]
print(f"ノード削除: {node}")
def get_node(self, key: str) -> str:
"""キーに対応するノードを返す"""
if not self.ring:
raise Exception("リングが空です")
hash_val = self._hash(key)
idx = self.ring.bisect_left(hash_val)
if idx == len(self.ring):
idx = 0 # リングなのでラップアラウンド
return self.ring.values()[idx]
def get_nodes_for_replication(self, key: str, n: int) -> list:
"""レプリケーション用にn個のノードを返す"""
if len(self.nodes) < n:
raise Exception(f"ノード数不足: {len(self.nodes)} < {n}")
hash_val = self._hash(key)
idx = self.ring.bisect_left(hash_val)
result_nodes = []
seen_nodes = set()
total = len(self.ring)
for i in range(total):
curr_idx = (idx + i) % total
node = self.ring.values()[curr_idx]
if node not in seen_nodes:
seen_nodes.add(node)
result_nodes.append(node)
if len(result_nodes) == n:
break
return result_nodes
# 使用例
ring = ConsistentHashRing(virtual_nodes=150)
for i in range(5):
ring.add_node(f"cassandra-node-{i}")
for key in ["user:1001", "order:5555", "product:abc", "session:xyz"]:
node = ring.get_node(key)
replicas = ring.get_nodes_for_replication(key, 3)
print(f"キー '{key}' -> プライマリ: {node}, レプリカ: {replicas}")
Apache Cassandraアーキテクチャ
┌───────────────────────────────────────────────────────────────────┐
│ Cassandraリング (6ノード) │
│ │
│ Node 1 │
│ / \ │
│ Node 6 Node 2 │
│ | | │
│ Node 5 Node 3 │
│ \ / │
│ Node 4 │
│ │
│ 書き込みパス: │
│ Client → Coordinator → パーティションキーのハッシュ → 担当ノード │
│ → CommitLog (WAL) → Memtable → SSTable │
│ │
│ 読み取りパス: │
│ Client → Coordinator → 担当ノード → Row Cache / Bloomフィルタ │
│ → SSTableのマージ → 最新データを返す │
└───────────────────────────────────────────────────────────────────┘
Cassandraの主要な特性:
- Tunable Consistency: クエリごとに
QUORUM、ONE、ALLなどの一貫性レベルを選択可能 - Gossipプロトコル: エピデミックアルゴリズムでノード間の状態を伝播
- Compaction: 定期的にSSTableをマージして読み取りパフォーマンスを向上
6. 分散ML学習: Ring-AllReduceとPyTorch Distributed
Parameter Server vs All-Reduce
Parameter Server方式:
─────────────────────
Worker 1 ─┐
Worker 2 ─┤── PS (Parameter Server) ──▶ 勾配集計後に配布
Worker 3 ─┘
問題: PSがボトルネック。帯域幅の要求がO(n)で増加
Ring-AllReduce方式:
────────────────────
Worker 1 ──▶ Worker 2 ──▶ Worker 3
▲ │
└──────────────────────────┘
各Workerは隣接Workerとのみ通信。
通信量はノード数に関わらず 2*(n-1)/n * data_size で一定
Ring-AllReduceの2フェーズ:
- Reduce-Scatter: 各ノードが自分のチャンクをreduceしながら次のノードへ転送
- All-Gather: reduceされた結果を全ノードに伝播
PyTorch DistributedDataParallel (DDP)
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
def setup(rank, world_size):
"""分散学習の初期化"""
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# NCCLバックエンド: GPU間高速通信 (NVLink, InfiniBandを活用)
dist.init_process_group(
backend='nccl',
rank=rank,
world_size=world_size
)
torch.cuda.set_device(rank)
def cleanup():
dist.destroy_process_group()
class SimpleModel(nn.Module):
def __init__(self):
super().__init__()
self.layers = nn.Sequential(
nn.Linear(1024, 2048),
nn.ReLU(),
nn.Linear(2048, 1024),
nn.ReLU(),
nn.Linear(1024, 10)
)
def forward(self, x):
return self.layers(x)
def train(rank, world_size, epochs=5):
setup(rank, world_size)
# 各GPUにモデルを配置してDDPでラップ
model = SimpleModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
# DistributedSampler: 各プロセスが異なるデータを処理
dataset = torch.utils.data.TensorDataset(
torch.randn(1000, 1024),
torch.randint(0, 10, (1000,))
)
sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)
optimizer = optim.Adam(ddp_model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
for epoch in range(epochs):
sampler.set_epoch(epoch) # エポックごとにシャッフルを再設定
total_loss = 0.0
for batch_x, batch_y in dataloader:
batch_x = batch_x.to(rank)
batch_y = batch_y.to(rank)
optimizer.zero_grad()
output = ddp_model(batch_x)
loss = criterion(output, batch_y)
loss.backward() # 勾配のAllReduceが自動的に実行される
optimizer.step()
total_loss += loss.item()
if rank == 0: # メインプロセスのみログ出力
print(f"Epoch {epoch}: Loss = {total_loss/len(dataloader):.4f}")
cleanup()
# 実行: torchrun --nproc_per_node=4 train.py
if __name__ == '__main__':
world_size = torch.cuda.device_count()
torch.multiprocessing.spawn(
train,
args=(world_size,),
nprocs=world_size,
join=True
)
NCCLと通信バックエンド
NCCL (NVIDIA Collective Communications Library):
シングルノード (NVLink):
GPU0 ←──── NVLink ────→ GPU1
│ │
600 GB/s 双方向帯域幅
マルチノード (InfiniBand + RDMA):
Node 1 ←── IB (200 Gb/s) ──→ Node 2
│ │
RDMA: CPUを介さずGPU間で直接通信
バックエンドの選択:
- nccl: GPU分散学習 (推奨)
- gloo: CPU学習またはデバッグ
- mpi: HPC環境
Horovodとの比較
| 項目 | PyTorch DDP | Horovod |
|---|---|---|
| 統合 | PyTorch組み込み | 別途インストール |
| アルゴリズム | バケットAllReduce | Ring-AllReduce |
| フレームワーク | PyTorch専用 | TF/PyTorch/MXNet |
| 使いやすさ | やや複雑 | シンプル (hvd.init()) |
| パフォーマンス | 同等 | 同等 |
7. オブザーバビリティ: 分散トレース、ログ、メトリクス
分散トレース
マイクロサービス環境でリクエストが複数のサービスを横断する際、全体の経路を追跡します。
リクエストフロー (Jaeger/Zipkinによるトレース):
Browser → API GW → Order Svc → Inventory Svc → DB
│ │ │ │ │
│ TraceID: abc123 │ │ │
│ SpanID: 001 SpanID: 002 SpanID: 003 │
│ │ │ │
└──────────────────────┴────────────┴──────────┘
合計レイテンシ: 120ms
Order Svc: 80ms (ボトルネック!)
OpenTelemetryのレイヤー構成:
- API: 計装コードの記述
- SDK: テレメトリデータの処理とエクスポート
- Exporter: Jaeger/Zipkin/OTLPエンドポイントへ送信
ELK Stackによるログ集約
┌─────────────────────────────────────────────────────┐
│ ELK Stack │
│ │
│ App Logs ──▶ Filebeat ──▶ Logstash ──▶ Elasticsearch│
│ K8s Logs ──▶ Fluentd ──/ ──▶ Kibana │
│ (可視化) │
│ │
│ モダンな代替スタック: │
│ App Logs ──▶ Promtail ──▶ Loki ──▶ Grafana │
└─────────────────────────────────────────────────────┘
Prometheus + Grafanaメトリクス
# Prometheusスクレイピング設定
scrape_configs:
- job_name: 'kafka'
static_configs:
- targets: ['kafka-broker-1:9090', 'kafka-broker-2:9090']
metrics_path: '/metrics'
scrape_interval: 15s
- job_name: 'pytorch-training'
static_configs:
- targets: ['trainer-0:8080', 'trainer-1:8080']
Grafanaダッシュボードで監視すべき主要なKafkaメトリクス:
kafka_consumer_lag: コンシューマーの遅延 (主要なSLA指標)kafka_network_request_rate: ブローカーのリクエスト処理率kafka_log_size: パーティションのログサイズkafka_under_replicated_partitions: レプリケーション遅延パーティション数
クイズ
Q1. CAP定理においてネットワーク分断が発生した場合、CPシステムとAPシステムのトレードオフの違いは何ですか?
答え: CPシステムは分断時に可用性を犠牲にして一貫性を維持します。APシステムは逆に一貫性を犠牲にして可用性を維持します。
解説: ネットワーク分断が発生すると、分断された側のノード間で通信が不可能になります。CPシステム(例: etcd, HBase)は古いデータを提供しないよう、この状況でリクエストを拒否するかエラーを返します。APシステム(例: Cassandra, DynamoDB)は分断されたノードでも応答し続けますが、異なるノードが異なるデータを返す可能性があります。分断が解消された後、結果整合性(eventual consistency)によってデータが整合されます。
Q2. Raftアルゴリズムにおいて、リーダーが障害を起こした場合に新しいリーダーを選出する流れを説明してください。
答え: Followerがelection timeout内にheartbeatを受信できなければCandidateになり、RequestVote RPCを送信して、過半数の票を得たノードが新しいLeaderになります。
解説: リーダー障害後、Followerたちはelection timeout(通常150〜300ms)を待ちます。タイムアウトしたFollowerはCandidateに遷移してtermを1増やし、自分に投票してから他のノードにRequestVote RPCを送ります。各ノードは1つのtermで1回しか投票できず(先着順)、過半数の票を得たCandidateがLeaderになります。票が分散した場合(スプリットボート)は全員がタイムアウト後に再試行します。
Q3. Kafkaでconsumer groupのコンシューマー数がパーティション数より多い場合に何が起きますか?
答え: 超過したコンシューマーはどのパーティションにも割り当てられず、アイドル状態になります。
解説: Kafkaの消費モデルでは、1つのパーティションは同一consumer group内の1つのコンシューマーのみが処理できます。コンシューマーが4つでパーティションが3つの場合、1つのコンシューマーはパーティションが割り当てられずメッセージを処理しません。これはリソースの無駄遣いなので、通常はコンシューマー数がパーティション数を超えないよう設計します。処理量を増やすにはまずパーティション数を増やす必要があります。
Q4. Ring-AllReduceがParameter Server方式より通信効率が高い理由を説明してください。
答え: Ring-AllReduceでは各ノードが隣接ノードとのみ通信するため総帯域幅はノード数に関わらず一定ですが、Parameter ServerではすべてのWorkerがPSと通信するためPSの帯域幅がボトルネックになります。
解説: Parameter Server方式ではn個のWorkerがいる場合、PSはn個のWorker全員と勾配データをやり取りする必要があります。Workerが増えるほどPSの必要帯域幅は線形に増加します。一方Ring-AllReduceでは各Workerは隣接2ノードにのみデータを送ります。n個のノードでの1ノードあたりの通信量は 2*(n-1)/n * data_size で収束し、ノードを増やしても各ノードの通信負担は増加しません。全ノードの帯域幅が均等に活用されます。
Q5. Sagaパターンにおいて補償トランザクション(compensating transaction)が必要になる状況はいつですか?
答え: Sagaの中間ステップが失敗した際、既に完了した前のステップの効果を元に戻すために補償トランザクションが逆順で実行されます。
解説: Sagaパターンは分散環境でACIDトランザクションが使えない場合に使用します。各ステップはローカルトランザクションとして既にコミットされています。例えば注文作成→在庫引き落とし→支払い処理の順で、支払いが失敗した場合、既に完了した在庫引き落としを元に戻す「在庫復元」トランザクションと注文作成を元に戻す「注文キャンセル」トランザクションが逆順で実行されます。補償トランザクション自体が失敗する可能性があるため、冪等性の保証と再試行ロジックが重要です。
おわりに
分散システムは一朝一夕でマスターできる分野ではありません。CAP定理のトレードオフを理解し、Raftコンセンサスをコードで実装し、Kafkaのパーティション戦略を実際のサービスに適用していく中で、少しずつ深みが増していきます。
特にAIエンジニアにとって、Ring-AllReduceやNCCLのような分散ML学習技術はますます重要になっています。GPT-4規模のモデルを学習させるには数千個のGPUのネットワーク通信を最適化する必要があり、これは結局のところ分散システムの核心原理に帰結します。
推奨学習パス: CAP定理の理解 → Raft論文を読む → etcdを実際に運用 → Kafkaの本番環境経験 → PyTorch DDPでマルチGPU学習