- Published on
分散システム基礎完全ガイド 2025: 合意アルゴリズム、レプリケーション、一貫性モデル、障害耐性
- Authors

- Name
- Youngju Kim
- @fjvbn20031
目次(もくじ)
1. なぜ分散(ぶんさん)システムなのか
1.1 分散システムが必要な理由
単一サーバーでは解決できない3つの根本的な理由があります。
- スケーラビリティ(Scalability): 単一マシンのCPU、メモリ、ディスクには物理的限界がある
- 可用性(Availability): 単一障害点(SPOF)を排除してサービス継続性を保証する
- 地理的分散(Geographic Distribution): ユーザーに近い場所からサービスして遅延を減らす
分散システムのコアチャレンジ
┌─────────────────────────────────────────────┐
│ │
│ ネットワークは信頼できない │
│ ├── パケット損失 │
│ ├── レイテンシ変動 │
│ └── ネットワークパーティション │
│ │
│ ノードは故障しうる │
│ ├── クラッシュ │
│ ├── スローダウン │
│ └── ビザンチン障害(悪意ある動作) │
│ │
│ グローバル時刻は存在しない │
│ ├── クロックドリフト │
│ ├── NTP誤差 │
│ └── イベント順序決定の困難さ │
│ │
└─────────────────────────────────────────────┘
1.2 分散コンピューティングの8つの誤謬(ごびゅう)
Peter Deutschが定義した8つの誤った仮定です。
| 番号 | 誤謬(Fallacy) | 現実 |
|---|---|---|
| 1 | ネットワークは信頼できる | パケット損失、遅延、パーティションが発生する |
| 2 | レイテンシはゼロ | 物理的距離に比例してレイテンシが発生する |
| 3 | 帯域幅は無限 | ネットワーク帯域幅には限界がある |
| 4 | ネットワークは安全 | セキュリティ脅威は常に存在する |
| 5 | トポロジは変わらない | ネットワーク構成は常に変化する |
| 6 | 管理者は一人 | 複数の組織がネットワークを管理する |
| 7 | 転送コストはゼロ | データ転送にはコストがかかる |
| 8 | ネットワークは均質 | 様々な機器とプロトコルが混在する |
2. CAP定理(ていり)とPACELC
2.1 CAP定理
Eric Brewerが2000年に提案したCAP定理は、分散システム設計の基本原則です。
CAP定理
┌─────────────────────────────────────────────┐
│ │
│ Consistency (C) │
│ /\ │
│ / \ │
│ / \ │
│ / CP \ CA │
│ / システム\ システム │
│ / \ │
│ /____________\ │
│ Availability (A) ─── Partition │
│ Tolerance (P) │
│ │
│ CP: HBase, MongoDB, etcd, ZooKeeper │
│ AP: Cassandra, DynamoDB, CouchDB │
│ CA: 単一ノードRDBMS(理論的) │
│ │
│ パーティション時: CまたはAの選択が必須 │
└─────────────────────────────────────────────┘
3つの属性:
- C(Consistency): 全ノードが同一データを保証
- A(Availability): 全リクエストが応答を受ける(障害のないノード)
- P(Partition Tolerance): ネットワークパーティション発生時も動作
核心: ネットワークパーティションは不可避なのでPは必須。したがって実際の選択はCP vs APです。
2.2 PACELC拡張
Daniel Abadiが提案したPACELCはCAPを拡張し、正常状態でのトレードオフも扱います。
PACELC定理
if (Partition) then
Availability vs Consistency
else
Latency vs Consistency
┌─────────────────────────────────────────────┐
│ システム │ P発生時 │ 正常時 (E) │
│───────────────│────────│───────────────│
│ DynamoDB │ PA │ EL(低レイテンシ)│
│ Cassandra │ PA │ EL │
│ MongoDB │ PC │ EC(強い一貫性) │
│ Google Spanner│ PC │ EC │
│ CockroachDB │ PC │ EL(読み取り最適化)│
└─────────────────────────────────────────────┘
3. 一貫性(いっかんせい)モデル
3.1 一貫性スペクトラム
強い一貫性 ◄─────────────────────────────► 弱い一貫性
線形化可能性 順序的 因果的 最終的
(Linearizable) (Sequential) (Causal) (Eventual)
│ 低パフォーマンス │ │ │ 高パフォーマンス │
│ 実装複雑 │ │ │ 実装単純 │
│ 高い保証 │ │ │ 低い保証 │
3.2 線形化可能性(せんけいかかのうせい)
最も強い一貫性保証です。すべての操作がリアルタイム順序と一致する必要があります。
Linearizable
Client A: ──write(x=1)──────────────────────
Client B: ────────────read(x)→1─────────────
Client C: ──────────────────read(x)→1───────
全クライアントが書き込み完了後、常に最新値を読む
NOT Linearizable
Client A: ──write(x=1)──────────────────────
Client B: ────────────read(x)→1─────────────
Client C: ──────────────────read(x)→0─────── ← 古い値!
3.3 最終的一貫性(Eventual Consistency)
更新が中断なく続けば、最終的にすべてのレプリカが同一状態に到達します。
# Eventual Consistencyを前提とした読み取りパターン
class EventuallyConsistentStore:
def __init__(self, replicas: list):
self.replicas = replicas
def write(self, key: str, value: str, timestamp: float):
"""全レプリカへ非同期書き込み"""
for replica in self.replicas:
replica.async_put(key, value, timestamp)
def read(self, key: str) -> str:
"""読み取り - 任意のレプリカから読める"""
values = [r.get(key) for r in self.replicas]
return max(values, key=lambda v: v.timestamp).value
def read_your_writes(self, key: str, session_token: str) -> str:
"""自身の書き込みを読めることを保証"""
last_write_replica = self.get_replica_for_session(session_token)
return last_write_replica.get(key).value
3.4 因果的一貫性(いんがてきいっかんせい)
因果的に関連する操作の順序のみを保証します。
因果的一貫性の例:
A: write(x=1) ← Bがx=1を読んだ後y=2を書いた場合
B: read(x)→1, write(y=2)
C: read(y)→2, read(x)→?
因果的一貫性が保証されれば:
Cがy=2を見られるなら、x=1も必ず見えなければならない
(y=2の因果的原因であるx=1が先に見える必要がある)
4. 合意(ごうい)アルゴリズム
4.1 Paxos
Leslie Lamportが提案した最初の合意アルゴリズムです。
Basic Paxos(単一値合意)
┌─────────────────────────────────────────────┐
│ │
│ Phase 1a: Prepare │
│ Proposer → Acceptor: prepare(n) │
│ 「提案番号nで準備してください」 │
│ │
│ Phase 1b: Promise │
│ Acceptor → Proposer: promise(n, prev_val) │
│ 「n未満は拒否します。以前の受理値を伝達」 │
│ │
│ Phase 2a: Accept │
│ Proposer → Acceptor: accept(n, value) │
│ 「この値を受理してください」 │
│ │
│ Phase 2b: Accepted │
│ Acceptor → Learner: accepted(n, value) │
│ 「過半数が受理すれば合意完了」 │
│ │
└─────────────────────────────────────────────┘
4.2 Raft:理解(りかい)しやすい合意アルゴリズム
Diego Ongaroが2014年に提案したRaftは、Paxosと同等の安全性を提供しながら理解しやすく設計されています。
Raft状態遷移
┌────────────────────────────────────────────────┐
│ │
│ ┌──────────┐ タイムアウト ┌──────────┐ │
│ │ Follower │───────────→│ Candidate│ │
│ │ │←───────────│ │ │
│ └──────────┘ 失敗/発見 └────┬─────┘ │
│ ↑ │ │
│ │ 過半数投票獲得 │ │
│ │ ↓ │
│ │ ┌──────────┐ │
│ └──────────────│ Leader │ │
│ 新リーダー発見 │ │ │
│ └──────────┘ │
│ │
│ Term(任期): │
│ ┌───┬───┬───┬───┬───┐ │
│ │ 1 │ 2 │ 3 │ 4 │ 5 │ ... │
│ └───┴───┴───┴───┴───┘ │
│ 各Termで最大1つのLeader │
└────────────────────────────────────────────────┘
Raft Leader Election
Leader Election過程
┌────────────────────────────────────────────────┐
│ 1. Followerがheartbeatタイムアウト │
│ 2. Termを増加し、自分に投票 │
│ 3. RequestVote RPCを他ノードに送信 │
│ 4. 過半数投票を得ればLeaderになる │
│ │
│ [Node A: Follower] [Node B: Follower] │
│ [Node C: Follower] [Node D: Follower] │
│ [Node E: Follower] │
│ │
│ Node Aタイムアウト → Candidate (Term 2) │
│ A→B: RequestVote(Term 2) → B: Grant │
│ A→C: RequestVote(Term 2) → C: Grant │
│ A→D: RequestVote(Term 2) → D: Grant │
│ │
│ Aが過半数(3/5)獲得 → Leader! │
│ A→all: AppendEntries(heartbeat) │
└────────────────────────────────────────────────┘
Raft Log Replication
Log Replication
┌────────────────────────────────────────────────┐
│ │
│ Client → Leader: write(x=5) │
│ │
│ Leader Log: [1:x=3] [2:y=7] [3:x=5] │
│ Follower A: [1:x=3] [2:y=7] │
│ Follower B: [1:x=3] [2:y=7] │
│ Follower C: [1:x=3] │
│ Follower D: [1:x=3] [2:y=7] │
│ │
│ 1. Leaderがログにエントリ追加 │
│ 2. AppendEntries RPCで複製 │
│ 3. 過半数の複製確認時にコミット │
│ 4. クライアントに応答 │
│ 5. 次のheartbeatでコミットを通知 │
│ │
│ 安全性: Leaderはコミット済み全エントリを含む │
│ (Election Restriction) │
└────────────────────────────────────────────────┘
4.3 Raft実装例(擬似(ぎじ)コード)
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional
class NodeState(Enum):
FOLLOWER = "follower"
CANDIDATE = "candidate"
LEADER = "leader"
@dataclass
class LogEntry:
term: int
index: int
command: str
@dataclass
class RaftNode:
node_id: str
state: NodeState = NodeState.FOLLOWER
current_term: int = 0
voted_for: Optional[str] = None
log: list = field(default_factory=list)
commit_index: int = 0
last_applied: int = 0
# Leader専用
next_index: dict = field(default_factory=dict)
match_index: dict = field(default_factory=dict)
def on_election_timeout(self):
"""選挙タイムアウト: Candidateへ遷移"""
self.state = NodeState.CANDIDATE
self.current_term += 1
self.voted_for = self.node_id
votes_received = 1 # 自分に投票
for peer in self.peers:
response = self.send_request_vote(peer)
if response and response.vote_granted:
votes_received += 1
if votes_received > len(self.peers) // 2:
self.become_leader()
def become_leader(self):
"""Leaderへ遷移"""
self.state = NodeState.LEADER
for peer in self.peers:
self.next_index[peer] = len(self.log)
self.match_index[peer] = 0
self.send_heartbeats()
def on_append_entries(self, term, leader_id, entries, leader_commit):
"""AppendEntries RPC処理"""
if term < self.current_term:
return False
self.state = NodeState.FOLLOWER
self.current_term = term
self.reset_election_timer()
self.log.extend(entries)
if leader_commit > self.commit_index:
self.commit_index = min(leader_commit, len(self.log) - 1)
self.apply_committed_entries()
return True
def replicate_log(self, command: str):
"""クライアントコマンド処理(Leaderのみ)"""
if self.state != NodeState.LEADER:
return False
entry = LogEntry(
term=self.current_term,
index=len(self.log),
command=command
)
self.log.append(entry)
ack_count = 1
for peer in self.peers:
success = self.send_append_entries(peer, [entry])
if success:
ack_count += 1
if ack_count > len(self.peers) // 2:
self.commit_index = entry.index
self.apply_committed_entries()
return True
return False
5. レプリケーション戦略(せんりゃく)
5.1 リーダー-フォロワー
リーダー-フォロワーレプリケーション
┌─────────────────────────────────────────────┐
│ │
│ Client ──write──→ ┌────────┐ │
│ │ Leader │ │
│ Client ──read──→ │ (R/W) │ │
│ └───┬────┘ │
│ 複製 │ 複製 │
│ ┌─────────┼─────────┐ │
│ ↓ ↓ ↓ │
│ ┌────────┐┌────────┐┌────────┐ │
│ │Follower││Follower││Follower│ │
│ │ (Read) ││ (Read) ││ (Read) │ │
│ └────────┘└────────┘└────────┘ │
│ │
│ 同期複製: 遅いがデータ損失なし │
│ 非同期複製: 速いがデータ損失の可能性 │
│ 半同期複製: 少なくとも1つに同期複製 │
└─────────────────────────────────────────────┘
5.2 マルチリーダー
マルチリーダーレプリケーション
┌─────────────────────────────────────────────┐
│ │
│ DC-US DC-EU DC-Asia │
│ ┌────────┐ ┌────────┐ ┌────────┐│
│ │Leader 1│◄─────►│Leader 2│◄────►│Leader 3││
│ │ (R/W) │ │ (R/W) │ │ (R/W) ││
│ └───┬────┘ └───┬────┘ └───┬────┘│
│ │ │ │ │
│ ┌───┴───┐ ┌───┴───┐ ┌───┴───┐ │
│ │Follower│ │Follower│ │Follower│ │
│ └───────┘ └───────┘ └───────┘ │
│ │
│ 利点: 書き込みレイテンシ削減、オフライン動作│
│ 欠点: 衝突解決が必要(LWW、CRDT等) │
└─────────────────────────────────────────────┘
5.3 リーダーレス / クォーラム
リーダーレスレプリケーション(Dynamoスタイル)
┌─────────────────────────────────────────────┐
│ │
│ N=3(総レプリカ), W=2(書き込みクォーラム)│
│ R=2(読み取りクォーラム) │
│ │
│ Write: Client → Node1 (ack) │
│ → Node2 (ack) ← W=2充足 │
│ → Node3 (timeout) │
│ │
│ Read: Client → Node1 (v=5, ts=100) │
│ → Node2 (v=5, ts=100) ← 最新│
│ → Node3 (v=3, ts=90) ← 古い│
│ │
│ Read repair: 古いNode3に最新値を送信 │
│ │
│ クォーラム条件: W + R > N → 少なくとも │
│ 1つの交差が存在し最新値の読み取りを保証 │
│ │
└─────────────────────────────────────────────┘
6. 分散クロック
6.1 物理的時計とNTP
物理的時計の問題
┌─────────────────────────────────────────────┐
│ │
│ Node A時計: 10:00:00.000 │
│ Node B時計: 10:00:00.150(150msずれ) │
│ Node C時計: 09:59:59.800(200msずれ) │
│ │
│ NTP同期精度: 通常数十ms │
│ Google Spanner TrueTime: 約7ms誤差 │
│ │
│ 問題: イベントAがBより先に発生したかを │
│ 物理的時計だけでは判断できない │
└─────────────────────────────────────────────┘
6.2 Lamportタイムスタンプ
Leslie Lamportが提案した論理的時計です。因果関係の順序を保証します。
Lamport Timestamp
ルール:
1. イベント発生時にローカルカウンタ増加
2. メッセージ送信時にカウンタを含める
3. メッセージ受信時にmax(local, received) + 1
Node A: [1] ──msg──→ [2] ────→ [3]
|
Node B: [1] ────→ [3] (max(1,2)+1) → [4]
|
Node C: [1] ──→ [2] ───────────→ [5] (max(2,4)+1)
限界: aとbのLamport時間だけでは
因果関係を区別できない
class LamportClock:
def __init__(self):
self.time = 0
def tick(self) -> int:
"""ローカルイベント"""
self.time += 1
return self.time
def send(self) -> int:
"""メッセージ送信"""
self.time += 1
return self.time
def receive(self, sender_time: int) -> int:
"""メッセージ受信"""
self.time = max(self.time, sender_time) + 1
return self.time
6.3 ベクタークロック
ベクタークロック
各ノードが全ノードのカウンタをベクターとして維持
Node A: [A:1, B:0, C:0]
──msg──→
Node B: [A:0, B:1, C:0] → 受信後 [A:1, B:2, C:0]
──msg──→
Node C: [A:0, B:0, C:1] → 受信後 [A:1, B:2, C:2]
因果関係判断:
V1 < V2 ⟺ 全てのiについて V1[i] <= V2[i] かつ
少なくとも1つのjについて V1[j] < V2[j]
V1 || V2(同時発生) ⟺ V1 < V2でもV2 < V1でもない
class VectorClock:
def __init__(self, node_id: str, nodes: list):
self.node_id = node_id
self.clock = {n: 0 for n in nodes}
def tick(self):
"""ローカルイベント"""
self.clock[self.node_id] += 1
def send(self) -> dict:
"""メッセージ送信"""
self.clock[self.node_id] += 1
return self.clock.copy()
def receive(self, sender_clock: dict):
"""メッセージ受信"""
for node in self.clock:
self.clock[node] = max(self.clock[node], sender_clock.get(node, 0))
self.clock[self.node_id] += 1
def is_before(self, other_clock: dict) -> bool:
"""selfがotherより前に発生したか"""
all_leq = all(self.clock[n] <= other_clock.get(n, 0) for n in self.clock)
any_lt = any(self.clock[n] < other_clock.get(n, 0) for n in self.clock)
return all_leq and any_lt
def is_concurrent(self, other_clock: dict) -> bool:
"""同時発生か"""
return not self.is_before(other_clock) and not self._is_after(other_clock)
6.4 ハイブリッド論理クロック(HLC)
HLC (Hybrid Logical Clock)
= 物理的時計 + 論理的時計の利点を組み合わせ
構成: (physical_time, logical_counter)
利点:
- 物理的時間に近い値(人が理解可能)
- 因果関係を追跡可能
- NTP誤差を補正
使用例: CockroachDB, YugabyteDB
7. 障害(しょうがい)モデル
7.1 障害タイプ
障害モデルスペクトラム(耐性の難易度順)
┌─────────────────────────────────────────────┐
│ │
│ Crash-Stop(最も単純) │
│ ├── ノードが停止して復旧しない │
│ ├── 検出が比較的容易 │
│ └── ほとんどの合意アルゴリズムが仮定 │
│ │
│ Crash-Recovery │
│ ├── ノードが停止し後に復旧 │
│ ├── 永続ストレージから状態復元 │
│ └── WAL(Write-Ahead Log)が必要 │
│ │
│ Omission │
│ ├── メッセージを送信/受信できない │
│ ├── ネットワークパーティションを含む │
│ └── タイムアウトで検出 │
│ │
│ Byzantine(最も複雑) │
│ ├── ノードが任意/悪意ある動作をする │
│ ├── 誤ったデータ送信が可能 │
│ ├── 合意に3f+1ノード必要(f: 障害数) │
│ └── ブロックチェーンで主に使用 │
│ │
└─────────────────────────────────────────────┘
7.2 障害検出
class PhiAccrualFailureDetector:
"""Phi Accrual障害検出器(Akkaで使用)"""
def __init__(self, threshold: float = 8.0):
self.threshold = threshold
self.heartbeat_intervals = []
self.last_heartbeat = None
def heartbeat(self, timestamp: float):
"""ハートビート受信"""
if self.last_heartbeat:
interval = timestamp - self.last_heartbeat
self.heartbeat_intervals.append(interval)
self.last_heartbeat = timestamp
def phi(self, current_time: float) -> float:
"""phi値計算 - 障害疑い度"""
if not self.heartbeat_intervals:
return 0.0
time_since_last = current_time - self.last_heartbeat
mean = sum(self.heartbeat_intervals) / len(self.heartbeat_intervals)
variance = sum((x - mean) ** 2 for x in self.heartbeat_intervals) / len(self.heartbeat_intervals)
std_dev = variance ** 0.5
if std_dev == 0:
return float('inf') if time_since_last > mean else 0.0
y = (time_since_last - mean) / std_dev
return max(0.0, y * 1.5)
def is_alive(self, current_time: float) -> bool:
"""ノードが生存しているか判断"""
return self.phi(current_time) < self.threshold
8. 分散トランザクション
8.1 2PC(Two-Phase Commit)
2PC (Two-Phase Commit)
┌────────────────────────────────────────────────┐
│ │
│ Phase 1: Prepare(投票) │
│ Coordinator → Participant A: prepare? │
│ Coordinator → Participant B: prepare? │
│ Coordinator → Participant C: prepare? │
│ │
│ A → Coordinator: YES(準備完了) │
│ B → Coordinator: YES │
│ C → Coordinator: YES │
│ │
│ Phase 2: Commit(実行) │
│ Coordinator → A: commit │
│ Coordinator → B: commit │
│ Coordinator → C: commit │
│ │
│ 問題点: │
│ - コーディネーター障害時にブロッキング │
│ - 単一障害点(SPOF) │
│ - 同期的でパフォーマンス低下 │
│ │
│ 1つでもNO → Abort │
│ コーディネーター障害 → 不確実状態(in-doubt) │
└────────────────────────────────────────────────┘
8.2 Sagaパターン
Sagaパターン: 補償トランザクションベース
┌────────────────────────────────────────────────┐
│ │
│ 正常フロー: │
│ T1 → T2 → T3 → T4 → 完了 │
│ (注文)(決済)(在庫)(配送) │
│ │
│ T3失敗時の補償: │
│ T1 → T2 → T3(失敗) → C2 → C1 │
│ (注文)(決済)(在庫失敗)(返金)(注文キャンセル) │
│ │
│ Choreography(イベント駆動): │
│ 各サービスがイベントを発行し購読 │
│ │
│ Orchestration(中央調整): │
│ Saga Orchestratorが各ステップを調整 │
│ │
└────────────────────────────────────────────────┘
# Saga Orchestratorの例
from dataclasses import dataclass
from typing import Callable
from enum import Enum
class SagaStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
COMPENSATING = "compensating"
FAILED = "failed"
@dataclass
class SagaStep:
name: str
action: Callable
compensation: Callable
class SagaOrchestrator:
def __init__(self, steps: list):
self.steps = steps
self.completed_steps = []
self.status = SagaStatus.PENDING
async def execute(self):
"""Saga実行"""
self.status = SagaStatus.RUNNING
for step in self.steps:
try:
await step.action()
self.completed_steps.append(step)
except Exception as e:
print(f"Step '{step.name}' failed: {e}")
await self.compensate()
return False
self.status = SagaStatus.COMPLETED
return True
async def compensate(self):
"""補償トランザクション実行(逆順)"""
self.status = SagaStatus.COMPENSATING
for step in reversed(self.completed_steps):
try:
await step.compensation()
print(f"Compensated: {step.name}")
except Exception as e:
print(f"Compensation failed for '{step.name}': {e}")
self.status = SagaStatus.FAILED
return
self.status = SagaStatus.FAILED
9. パーティショニングとシャーディング
9.1 パーティショニング戦略
範囲パーティショニング
┌─────────────────────────────────────────────┐
│ Key: A-F → Shard 1 │
│ Key: G-N → Shard 2 │
│ Key: O-Z → Shard 3 │
│ │
│ 利点: 範囲クエリが効率的 │
│ 欠点: ホットスポットの可能性 │
└─────────────────────────────────────────────┘
ハッシュパーティショニング
┌─────────────────────────────────────────────┐
│ hash(key) % 3 == 0 → Shard 1 │
│ hash(key) % 3 == 1 → Shard 2 │
│ hash(key) % 3 == 2 → Shard 3 │
│ │
│ 利点: 均等分配 │
│ 欠点: 範囲クエリ不可、リシャーディングコスト高│
└─────────────────────────────────────────────┘
9.2 コンシステントハッシング
コンシステントハッシング
┌─────────────────────────────────────────────┐
│ │
│ Node A (0) │
│ / \ │
│ / \ │
│ Node D Node B │
│ (270) (90) │
│ \ / │
│ \ / │
│ Node C (180) │
│ │
│ keyのハッシュ → リング上の位置 │
│ 時計回りで最も近いノードに割り当て │
│ │
│ ノード追加/削除時: │
│ - 全体リバランスでなく隣接ノードのみ影響 │
│ - 平均K/Nキーのみ移動 │
│ │
│ 仮想ノード(Virtual Node): │
│ - 各物理ノードがリング上に複数位置を占有 │
│ - 負荷均等分配 │
│ │
└─────────────────────────────────────────────┘
import hashlib
from bisect import bisect_right
class ConsistentHashRing:
def __init__(self, virtual_nodes: int = 150):
self.virtual_nodes = virtual_nodes
self.ring = {}
self.sorted_keys = []
self.nodes = set()
def _hash(self, key: str) -> int:
return int(hashlib.md5(key.encode()).hexdigest(), 16)
def add_node(self, node: str):
"""ノード追加"""
self.nodes.add(node)
for i in range(self.virtual_nodes):
virtual_key = f"{node}:vn{i}"
h = self._hash(virtual_key)
self.ring[h] = node
self.sorted_keys.append(h)
self.sorted_keys.sort()
def remove_node(self, node: str):
"""ノード削除"""
self.nodes.discard(node)
for i in range(self.virtual_nodes):
virtual_key = f"{node}:vn{i}"
h = self._hash(virtual_key)
del self.ring[h]
self.sorted_keys.remove(h)
def get_node(self, key: str) -> str:
"""キーに対するノード検索"""
if not self.ring:
return None
h = self._hash(key)
idx = bisect_right(self.sorted_keys, h)
if idx >= len(self.sorted_keys):
idx = 0
return self.ring[self.sorted_keys[idx]]
ring = ConsistentHashRing(virtual_nodes=150)
ring.add_node("node-1")
ring.add_node("node-2")
ring.add_node("node-3")
print(ring.get_node("user:123"))
print(ring.get_node("order:456"))
10. Gossipプロトコルとメンバーシップ
10.1 Gossipプロトコル
Gossip(エピデミック)プロトコル
┌─────────────────────────────────────────────┐
│ │
│ 定期的にランダムノードに状態情報を伝播 │
│ │
│ Round 1: A → B(Aの情報) │
│ Round 2: A → C, B → D(A,Bの情報) │
│ Round 3: C → E, D → F(拡散続行) │
│ ... │
│ O(log N)ラウンド後に全体伝播 │
│ │
│ Push: 自分の情報を送る │
│ Pull: 相手の情報を要求 │
│ Push-Pull: 双方向交換(最も効率的) │
│ │
│ 使用例: │
│ - メンバーシップ管理 │
│ - 障害検出 │
│ - 分散集計 │
│ - Amazon DynamoDB, Apache Cassandra │
│ │
└─────────────────────────────────────────────┘
10.2 SWIMプロトコル
SWIM (Scalable Weakly-consistent Infection-style Membership)
┌────────────────────────────────────────────────┐
│ │
│ 1. Ping: ランダムノードに直接ping │
│ A ──ping──→ B │
│ │
│ 2. Ack: 応答 │
│ A ←──ack── B │
│ │
│ 3. 無応答時: 間接ping(Ping-Req) │
│ A ──ping-req──→ C ──ping──→ B │
│ A ←──ack────── C ←──ack── B │
│ │
│ 4. まだ無応答: Bを疑い(Suspect) │
│ 5. 一定時間後: Bを死亡と判断 │
│ │
│ 使用: HashiCorp Serf, Consul │
└────────────────────────────────────────────────┘
11. 実システム分析(ぶんせき)
11.1 Google Spanner
Google Spannerのキーテクノロジー
┌─────────────────────────────────────────────┐
│ │
│ TrueTime API │
│ ├── GPS + 原子時計ベース │
│ ├── 時間不確実性区間を返す │
│ ├── 誤差: 約7ms以内 │
│ └── 外部一貫性(External Consistency)を保証 │
│ │
│ Paxosベースレプリケーション │
│ ├── 各Spannerサーバーグループがpaxos実行 │
│ ├── 同期レプリケーションで強い一貫性 │
│ └── 全世界分散可能 │
│ │
│ 読み書きトランザクション │
│ ├── 2PL + 2PC │
│ ├── TrueTimeでコミットタイムスタンプ割り当て │
│ └── 読み取り専用: ロックなしスナップショット │
│ │
└─────────────────────────────────────────────┘
11.2 Amazon DynamoDB
DynamoDBの主要設計
┌─────────────────────────────────────────────┐
│ │
│ コンシステントハッシング + 仮想ノード │
│ ├── パーティションキーベースのデータ分配 │
│ └── 自動リバランシング │
│ │
│ Sloppy Quorum + Hinted Handoff │
│ ├── N=3, W=2, R=2 │
│ ├── 障害時に他ノードが代替保存 │
│ └── 復旧後に元ノードへ転送 │
│ │
│ Vector Clockで衝突検出 │
│ ├── 因果関係追跡 │
│ └── 同時書き込み検出時クライアントが解決 │
│ │
│ Anti-Entropy (Merkle Tree) │
│ ├── レプリカ間の不整合検出 │
│ └── 効率的な同期化 │
│ │
└─────────────────────────────────────────────┘
11.3 Apache Kafka内部構造
Kafkaレプリケーションモデル
┌─────────────────────────────────────────────┐
│ │
│ Topic: orders (Partition 3, RF=3) │
│ │
│ Partition 0: │
│ Leader: Broker 1 │
│ ISR: [Broker 1, Broker 2, Broker 3] │
│ │
│ Partition 1: │
│ Leader: Broker 2 │
│ ISR: [Broker 2, Broker 3, Broker 1] │
│ │
│ Partition 2: │
│ Leader: Broker 3 │
│ ISR: [Broker 3, Broker 1, Broker 2] │
│ │
│ ISR (In-Sync Replicas): │
│ ├── Leaderのログに追いついたレプリカ │
│ ├── 遅れるとISRから除外 │
│ └── acks=all → ISR全体が複製確認 │
│ │
│ Controller: │
│ ├── ZooKeeper/KRaftで選出 │
│ ├── パーティションリーダー割り当て │
│ └── ブローカー障害時にリーダー再選出 │
│ │
└─────────────────────────────────────────────┘
12. 設計(せっけい)パターン集
12.1 Circuit Breaker
import time
from enum import Enum
from threading import Lock
class CircuitState(Enum):
CLOSED = "closed" # 正常
OPEN = "open" # 遮断
HALF_OPEN = "half_open" # テスト
class CircuitBreaker:
def __init__(self, failure_threshold: int = 5,
recovery_timeout: float = 30.0,
half_open_max: int = 3):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_max = half_open_max
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = 0
self.half_open_count = 0
self.lock = Lock()
def call(self, func, *args, **kwargs):
"""保護された関数呼び出し"""
with self.lock:
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.half_open_count = 0
else:
raise CircuitBreakerOpenError("Circuit breaker is OPEN")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise e
def _on_success(self):
with self.lock:
if self.state == CircuitState.HALF_OPEN:
self.half_open_count += 1
if self.half_open_count >= self.half_open_max:
self.state = CircuitState.CLOSED
self.failure_count = 0
elif self.state == CircuitState.CLOSED:
self.failure_count = 0
def _on_failure(self):
with self.lock:
self.failure_count += 1
self.last_failure_time = time.time()
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.OPEN
elif self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
12.2 指数バックオフ付きリトライ
import random
import time
from functools import wraps
def retry_with_backoff(max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
jitter: bool = True):
"""指数バックオフとジッター付きリトライ"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_retries:
raise e
delay = min(base_delay * (2 ** attempt), max_delay)
if jitter:
delay = delay * random.uniform(0.5, 1.5)
print(f"Attempt {attempt + 1} failed, "
f"retrying in {delay:.1f}s...")
time.sleep(delay)
return wrapper
return decorator
@retry_with_backoff(max_retries=5, base_delay=0.5)
def call_remote_service():
"""リモートサービス呼び出し"""
pass
13. クイズ
Q1. CAP定理で実際に選択すべきものは?
ネットワークパーティション(P)は分散システムで不可避なので、実際の選択はCP vs APです。
- CP選択: パーティション時に一貫性を優先(一部リクエスト拒否)。例: etcd, ZooKeeper, HBase
- AP選択: パーティション時に可用性を優先(古いデータを返す可能性)。例: Cassandra, DynamoDB
PACELC拡張では正常状態でのレイテンシ(L)vs 一貫性(C)のトレードオフも考慮します。
Q2. RaftでLeader Electionが必要な状況は?
- クラスタ初期起動: まだリーダーがない時
- リーダー障害: Followerがリーダーのheartbeatを一定時間(election timeout)内に受け取れない時
- ネットワークパーティション: リーダーと過半数ノードが分離された時
過程: FollowerがCandidateに遷移しTermを増加させ、他ノードに投票を要請します。過半数の投票を得れば新Leaderになります。同Termで2つのCandidateが同時に選挙を開始するとsplit voteが発生し、ランダムタイムアウトで解決します。
Q3. ベクタークロックがLamportタイムスタンプより優れている点は?
Lamportタイムスタンプは L(a) が L(b) より小さい時、aがbの原因なのか偶然小さいだけなのか区別できません。ベクタークロックは因果関係を正確に判断できます。
ベクタークロックでは2つのイベントが因果的に関連するか、または同時発生(concurrent)かを正確に判別できます。これにより同時書き込み衝突を検出し適切な解決戦略を適用できます。欠点はベクターサイズがノード数に比例して増加することです。
Q4. Sagaパターンが2PCの代わりに使われる理由は?
2PCの問題点:
- コーディネーターが単一障害点(SPOF)
- 参加者がprepare後にブロックされうる
- 同期的でパフォーマンス低下
- マイクロサービス環境で強い結合が発生
Sagaの利点:
- 各サービスが独立してローカルトランザクション実行
- 非同期的で高い可用性
- 障害時に補償トランザクションで復旧
- 疎結合を維持
ただし、Sagaは最終的一貫性のみを保証し、中間状態が露出する可能性があります。
Q5. コンシステントハッシングで仮想ノードが必要な理由は?
物理ノードのみだとハッシュリング上でノードが不均等に分布し、負荷偏りが発生します。
仮想ノード(Virtual Node)を使うと:
- 各物理ノードがリング上に複数位置を占め均等な負荷分配
- ノード追加/削除時の漸進的なリバランシングが可能
- 異機種ハードウェアに対して仮想ノード数を変えて性能に合った負荷割り当て
例えば、性能が2倍のサーバーに仮想ノードを2倍割り当てて、より多くのキーを処理させることができます。
14. 参考資料(さんこうしりょう)
- "Designing Data-Intensive Applications" - Martin Kleppmann(必読書)
- Raft論文 - "In Search of an Understandable Consensus Algorithm" (Diego Ongaro, 2014)
- Paxos論文 - "The Part-Time Parliament" (Leslie Lamport, 1998)
- Dynamo論文 - "Dynamo: Amazon's Highly Available Key-value Store" (2007)
- Google Spanner論文 - "Spanner: Google's Globally-Distributed Database" (2012)
- CAP定理証明 - "Brewer's Conjecture and the Feasibility of Consistent, Available, Partition-Tolerant Web Services" (2002)
- PACELC - Daniel Abadi, "Consistency Tradeoffs in Modern Distributed Database System Design"
- Lamport Clocks - "Time, Clocks, and the Ordering of Events in a Distributed System" (1978)
- Vector Clocks - "Timestamps in Message-Passing Systems That Preserve the Partial Ordering" (1988)
- Kafka Documentation - Apache Kafka Official Documentation
- etcd Documentation - etcd.io
- "Distributed Systems for Fun and Profit" - Mikito Takada(オンライン無料)
- Jepsen - Kyle Kingsburyの分散システム検証プロジェクト