- Published on
Apache Spark 内部完全ガイド 2025: RDD, Catalyst Optimizer, Tungsten, Whole-Stage Codegen, Shuffle 深層解析
- Authors

- Name
- Youngju Kim
- @fjvbn20031
はじめに: Spark はいかにして MapReduce を駆逐したか
2010 年、UC Berkeley にて
2010 年 UC Berkeley の AMPLab で Matei Zaharia が論文を発表した。"Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing"。当時ビッグデータの王は Hadoop MapReduce。Spark は 2 つを主張した:
- メモリベースで 100 倍高速。
- API がはるかにシンプル。
タイムライン:
- 2013: Apache に寄贈。
- 2014: Databricks 創業。
- 2016: Spark 2.0 (DataFrame, Catalyst, Tungsten)。
- 2020: Spark 3.0 (Adaptive Query Execution)。
- 現在: ビッグデータ処理の事実上の標準。
なぜここまで速いのか
Spark のパフォーマンス革命は 3 つの軸から来る:
- RDD (Resilient Distributed Dataset): インメモリ演算。
- Catalyst Optimizer: クエリ最適化。
- Tungsten: CPU とメモリの最適化。
この 3 つが合わさり MapReduce 比 100 倍、時に 1000 倍のパフォーマンスを叩き出す。
1. RDD: すべての始まり
誕生の背景
2009 年頃、ビッグデータ処理は MapReduce が支配。問題は:
- 反復アルゴリズム (ML、グラフ) が極度に遅い。
- 各 Map-Reduce ジョブがディスクを往復。
- 対話的分析が不可能。
Zaharia の洞察: 「データをメモリに保持すればよいのでは?」 しかしメモリは揮発的。ノード障害時はどうするか? 解決は lineage (系譜)。データではなく変換演算の記録を保持する。障害時は再計算。
RDD の特性
- Resilient: 障害に強い (lineage で)。
- Distributed: クラスタに分散。
- Immutable、Lazy、Typed、Partitioned。
Transformation vs Action
Transformation は lazy:
val lines = sc.textFile("data.txt")
val words = lines.flatMap(_.split(" "))
val counts = words.map(w => (w, 1))
val reduced = counts.reduceByKey(_ + _)
Action が実行をトリガー:
reduced.collect()
Lazy 評価の利点: 複数 transformation を一度に融合、take(10) 時の早期終了、中間データを貯めないパイプライン。
Lineage と耐障害性
val rdd1 = sc.textFile("data.txt")
val rdd2 = rdd1.map(x => x.toUpperCase)
val rdd3 = rdd2.filter(x => x.startsWith("A"))
rdd3 の partition が失われた場合、lineage をたどって当該 partition のみ再計算。レプリケーション不要。長い lineage は checkpoint() で短縮できる。
Narrow vs Wide Dependency
- Narrow (map、filter、union): 子 partition が親の少数 partition に依存。stage 内で pipeline 実行。
- Wide (groupByKey、reduceByKey、join): shuffle 必要、stage 境界。
DAG と Stage
Action 受信時、RDD グラフを DAG に変換、wide dependency を境界に stage 分割、各 stage が partition 数だけの task に分解、並列実行。
RDD の限界
スキーマなし、optimizer なし、遅い Java serialization、JVM オブジェクトのオーバーヘッド。Spark 2.0 は DataFrame/Dataset + Catalyst + Tungsten でこれを解決した。
2. DataFrame & Dataset
DataFrame (Spark 1.3)
SQL のテーブルのように構造化された RDD:
val df = spark.read.json("people.json")
df.filter($"age" > 20).select("name").show()
スキーマあり、高水準 API、Catalyst 最適化、Tungsten バイナリ format。
Dataset (Spark 1.6+)
DataFrame + 型安全性:
case class Person(name: String, age: Int)
val ds = spark.read.json("people.json").as[Person]
ds.filter(_.age > 20).map(_.name)
DataFrame = Dataset[Row]。Python は DataFrame のみ。
パフォーマンス差
1 億行フィルタ: RDD 100 秒、DataFrame/Dataset 15 秒 — 6〜7 倍高速。Catalyst、Tungsten、whole-stage codegen のおかげ。
3. Catalyst Optimizer
4 段階
SQL or DataFrame
-> Analysis (resolved logical plan)
-> Logical Optimization (rule-based)
-> Physical Planning (複数プラン + コスト比較)
-> Code Generation (Tungsten)
-> RDD 実行
Logical Optimization ルール
- Constant folding:
age * 2 + 3を事前計算。 - Predicate pushdown: フィルタを join の前・データソースへ。
- Projection pushdown: 必要な列のみ読む。
- Simplification:
WHERE true AND x > 10はWHERE x > 10に。 - Subquery elimination:
IN (SELECT ...)を semi-join に変換。
例
val df = spark.sql("""
SELECT u.name, COUNT(*) as orders
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE u.country = 'KR'
GROUP BY u.name
""")
df.explain(true)
最適化後: country = 'KR' フィルタが users 直上に移動、必要列のみ projection。
Physical Planning
論理プランから実行可能な物理演算子へ。複数プランを生成しコスト比較:
- BroadcastHashJoin: 小テーブル。
- SortMergeJoin: 大テーブル同士。
- ShuffledHashJoin: 中間。
- BroadcastNestedLoop: cartesian。
Cost-Based Optimizer (CBO)
Spark 2.2+ から統計ベース最適化:
ANALYZE TABLE users COMPUTE STATISTICS FOR COLUMNS id, name, age
行数、min/max、null/distinct カウントで join 順序の最適化と selectivity 推定。
4. Tungsten: CPU とメモリの再発明
JVM オブジェクトのオーバーヘッド
Integer i = 42; // 4 バイトの int に ~24 バイト
String s = "hello"; // 5 文字に 60+ バイト
数億オブジェクトでメモリ浪費と GC 圧力が深刻。
Off-heap バイナリ format
Tungsten は JVM ヒープ外に行を連続バイトで保存:
Row: [age=30, name="Alice"]
Binary: [bitmap][age:4][name_offset:4][name_length:4]...[name_data]
利点: JVM オブジェクトなし、GC 圧力なし、キャッシュ親和、sun.misc.Unsafe による直接メモリアクセス。
UnsafeRow
public final class UnsafeRow extends MutableRow {
private Object baseObject;
private long baseOffset;
private int sizeInBytes;
private int numFields;
}
フィールドアクセスはポインタ演算:
long getLong(int i) {
return Platform.getLong(baseObject, baseOffset + offsets[i]);
}
メモリ管理
Tungsten は execution memory (shuffle、join、sort) と storage memory (cache) を動的に借り合う。
spark.executor.memory = 8g
spark.memory.fraction = 0.6
spark.memory.storageFraction = 0.5
spark.memory.offHeap.enabled = true
spark.memory.offHeap.size = 4g
効果: メモリ 30〜70% 削減、クエリ 2〜5 倍高速、GC pause 大幅減。
5. Whole-Stage Code Generation
Volcano モデルのオーバーヘッド
従来 SQL エンジンは iterator-based Volcano:
while (child.hasNext()) {
Row row = child.next();
if (predicate(row)) process(row);
}
virtual function call、行ごとのオブジェクト生成、CPU パイプライン破壊が累積。
WSCG のアイデア
複数演算子を 1 つの関数に融合。SELECT name FROM users WHERE age > 20 の場合:
while (scanner.hasNext()) {
Row row = scanner.next();
if (row.getInt("age") > 20) {
emit(row.getString("name"));
}
}
単一ループ、virtual call なし、JIT 最適化可能。
仕組み
Spark 2.0 が stage 単位で Java ソースを生成、Janino でコンパイル、JVM の JIT が native 化。典型的に 2〜10 倍高速。
生成コード例
public class GeneratedIterator {
private scan_input_0;
public boolean hasNext() { return scan_input_0.hasNext(); }
public UnsafeRow next() {
while (scan_input_0.hasNext()) {
UnsafeRow row = scan_input_0.next();
int age = row.getInt(2);
if (age > 20) {
UTF8String country = row.getUTF8String(3);
if (country.equals("KR")) {
UnsafeRow output = /* build row */;
return output;
}
}
}
return null;
}
}
EXPLAIN で確認
df.explain("codegen")
生成された Java コードを表示。*(1) は WSCG stage 番号。
6. Shuffle: 分散集計の核心
なぜ Shuffle が必要か
wide dependency はデータの再分配が必要:
Stage 1 (4 partitions):
P0: [(a,1), (b,2), (c,3)]
P1: [(a,4), (d,5)]
...
reduceByKey -> すべての 'a' を同じ partition へ。
Shuffle フェーズ
- Map 側: 各 task がローカルファイルに、reducer ごとの partition に分類。
- Reduce 側: 全 map task から自 partition データを fetch、ネットワーク転送、ソート/集計。
Shuffle は Spark で最も高コスト: ディスク I/O、ネットワーク、serialization、sort。
Shuffle 戦略の進化
- Hash Shuffle (初期): M × R ファイル — FS 負担。
- Sort Shuffle (1.1+、デフォルト): 1 map task につき 1 ファイル + index。
- Tungsten Sort Shuffle (1.5+): binary format、キャッシュ親和。
Broadcast Join
val joined = big.join(broadcast(small), "key")
Driver が小テーブルを全 Executor にコピー、各 Executor が big の自 partition とローカル join。Shuffle なし。デフォルト spark.sql.autoBroadcastJoinThreshold = 10 MB。shuffle join 比で数倍〜数十倍高速。
Shuffle Partitioning
spark.sql.shuffle.partitions デフォルト 200。小さすぎれば OOM、大きすぎれば小ファイル問題。目安: 各 partition 100〜200 MB。
最適化テクニック
df.coalesce(10).write.parquet(...)
df.repartition(50, $"key")
val hint_df = big.join(broadcast(small), ...)
プラス skew 処理 (後述)。
7. Adaptive Query Execution (AQE)
Spark 3.0 のキラー機能
AQE は実行時統計を使い、残りのプランを動的に再最適化する。
spark.sql.adaptive.enabled = true # 3.2 以降デフォルト true
3 つの最適化
- Shuffle partition の動的 coalesce: 小 partition を merge (例 200 -> 20)。
- Join 戦略の動的切替: 実サイズが小さければ SortMergeJoin を BroadcastHashJoin へ昇格。
- Skew join の動的最適化: 大きすぎる partition を分割して複数 task で処理。
Databricks TPC-DS: 平均 1.3〜2 倍、一部クエリは 3 倍以上。
8. Dynamic Partition Pruning
問題
Star schema の join:
SELECT f.amount
FROM fact_sales f JOIN dim_date d ON f.date_id = d.id
WHERE d.year = 2024
DPP なしでは fact_sales を全スキャン。
DPP
Spark は dim_date から year=2024 の id を先に抽出、fact_sales の partition filter として pushdown。対象 partition のみロード。数 TB -> 数 GB、10〜100 倍高速化。
条件: partitioned テーブル、selective な dimension filter、Spark 3.0+。
9. 実戦チューニング
メモリ
--conf spark.executor.memory=16g
--conf spark.executor.memoryOverhead=2g
--conf spark.executor.cores=4
--conf spark.executor.instances=10
Executor あたり 4〜6 cores、overhead は max(384MB, 10%)。
Shuffle
--conf spark.sql.shuffle.partitions=200
--conf spark.sql.adaptive.enabled=true
--conf spark.sql.adaptive.coalescePartitions.enabled=true
--conf spark.sql.adaptive.skewJoin.enabled=true
Partition 数 = 総データ / 目標サイズ (200 MB)。1 TB なら 5000。
Broadcast 閾値
--conf spark.sql.autoBroadcastJoinThreshold=100MB
デフォルト 10 MB は小さすぎることが多い。Executor メモリに注意。
Cache
df.cache()
df.persist(StorageLevel.MEMORY_ONLY)
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
複数回使用または高コスト計算の DataFrame に使う。
Skew 手動処理
val salted = df.withColumn("salted_key",
concat($"key", lit("_"), (rand() * 10).cast("int")))
10. パフォーマンスデバッグ
Spark UI
最重要ツール。Jobs、Stages、SQL/DataFrame、Executors、Storage タブ。
注目 Metric
- Task duration 分布: median と max の差が大きければ skew。
- Shuffle read/write: 想定超なら filter pushdown 失敗。
- GC time: task 時間の 10% 超なら GC 問題。
Explain
df.explain()
df.explain(true)
df.explain("cost")
df.explain("codegen")
df.explain("formatted")
11. よくある落とし穴
- 小さいファイルが多すぎる ->
coalesce/repartition。 - Skew -> AQE skew join、null key フィルタ、salting。
- Broadcast OOM -> 閾値を下げる、
broadcast()ヒント削除。 - Shuffle partition が多すぎる -> AQE coalesce または手動調整。
- Python UDF のオーバーヘッド -> Pandas UDF (Arrow ベースで 10 倍高速):
@pandas_udf("double")
def my_udf(s: pd.Series) -> pd.Series:
return s * 2
- Partition 数が不適切 -> 各 partition 100〜200 MB を目指す。
クイズで復習
Q1. RDD の lineage はどのように障害復旧を実現するか?
各 RDD は親 RDD と変換関数を記憶する。partition 損失時、lineage をたどって当該 partition のみ再計算。レプリケーション不要でディスク/ネットワーク節約。partition 単位再計算が鍵。
Lineage が長いと再計算が重い — checkpoint() で切断可能。Wide dependency (shuffle) は復旧が複雑だが、shuffle ファイルがローカルディスクに残るため軽減される。外部システム副作用 (DB 書き込みなど) は再実行で重複する可能性があり、idempotent 設計が必要。
Lineage は「状態ではなく、作り方を保存せよ」という関数型哲学の実装。インメモリ演算と軽量復旧の組み合わせが、Spark を MapReduce 比 100 倍高速にした基盤である。
Q2. Catalyst はどのようにクエリを rewrite するのか?
Catalyst はクエリをツリーとして表現し、数十〜数百の rewrite rule を反復適用する。
主要ルール:
- Projection pushdown: 参照列のみ読む。
- Filter pushdown: Parquet の min/max 統計で row group スキップ可能。
- Constant folding: 定数式を事前計算。
- Simplification:
WHERE true AND x > 10をWHERE x > 10に。 - Join reorder (CBO): 統計から最安 join 順序。
- Subquery unnesting:
IN (SELECT ...)を semi-join に。 - Column pruning through join。
- Decimal 精度最適化。
Physical planning では BroadcastHashJoin、SortMergeJoin などを cost で選択。Broadcast は数倍〜数十倍高速。
Catalyst ルールはユーザー拡張可能で、Delta Lake や Iceberg がこれに立脚する。結果として素朴な SQL が手動最適化に近い速度で動く。
Q3. Tungsten が JVM オブジェクトではなくバイナリ format を使う理由と効果は?
Integer は 4 バイトの値に ~24 バイト、5 文字の String は ~72 バイト。億行スケールでは 10 倍のメモリ浪費と GC 圧力になる。
Tungsten は行を連続バイト配列に保存。固定長フィールドはインライン、可変長はポインタ + オフセット、null bitmap。UnsafeRow は sun.misc.Unsafe でポインタ演算による直接アクセス。
効果:
- メモリ: ~6 倍削減。
- GC 圧力: 除去 (off-heap または大きな配列少数)。
- キャッシュ親和: 連続レイアウトが cache line にフィット。
- Serialization:
UnsafeRowはそのままネットワーク送信可能。 - SIMD 親和レイアウト。
コスト: 低レベル複雑性、デバッグ困難、可変長変更が重い、Python-JVM 変換コスト (Arrow で緩和)。
Spark 3.x は columnar vectorized execution (Parquet/ORC reader、batch 4096 行) を導入。Tungsten と Arrow は補完関係: Tungsten は row-wise 内部処理、Arrow は言語間 columnar 交換。
Q4. Whole-Stage CodeGen はなぜ Volcano より高速か?
Volcano の iterator モデル (1970 年代) は演算子を next() で合成するが、現代 CPU とは相性が悪い:
- virtual function call がインライン化を阻害。
- 行ごとのオブジェクト生成で GC 圧力。
- 演算子間で branch 予測ミス。
- コードパス切替でキャッシュミス。
- Java boxing。
WSCG は stage 内の演算子を 1 つの Java 関数に融合し、Janino でコンパイル、JVM の JIT で最適化。単一ループ、inline 判定、Tungsten 経由の直接メモリアクセス。
Databricks TPC-DS: Volcano 比 4〜10 倍。
制約: JVM の 64KB メソッド制限、複雑な演算子 (window、UDF) は Volcano にフォールバック、生成コードのデバッグが難しい、小クエリでのコンパイルオーバーヘッド。
比較:
- Volcano: baseline。
- WSCG: 4〜10 倍。
- Vectorized (ClickHouse、DuckDB): 4〜10 倍。
- LLVM ネイティブ (HyPer/Umbra): 10〜20 倍。
Spark は vectorized Parquet 読取 + row 処理は WSCG という実用的妥協。
Q5. AQE は実行時統計をどう活用するか?
静的 optimizer は実行前にプランを決める。統計の陳腐化、selectivity 推定困難、join サイズ予測困難により誤計画になりうる。AQE は shuffle 境界で一時停止し、実データを観察、残りのプランを再最適化する。
3 つの最適化:
- Shuffle partition の coalesce: 小 partition を merge (例 200 -> 20)。
- Join 戦略切替: 実サイズが小ならば SortMergeJoin -> BroadcastHashJoin。
- Skew join: 大 partition を sub-partition に分割、別 task で処理。
例: skewed partition 100 秒 -> 10 sub-partition の 10 秒ずつ。
DPP (Dynamic Partition Pruning) も併用: dim filter id を fact の partition filter として pushdown、数 TB を数 GB に。
Spark 3.0 で導入 (opt-in)、3.2 でデフォルト有効化。TPC-DS 平均 ~1.8 倍、一部 3 倍以上。
限界: Structured Streaming への適用が限定的、小クエリでは逆効果、複雑クエリは依然手動チューニング必要。哲学的に AQE は AOT 最適化ではなく JIT 最適化。
おわりに: データの精緻なエンジン
要点
- RDD: インメモリ分散コレクション、lineage で障害復旧。
- DataFrame: 構造化 API + スキーマ + optimizer。
- Catalyst: rule + cost ベース SQL 最適化。
- Tungsten: off-heap バイナリ format、キャッシュ親和。
- WSCG: 実行時 Java コード生成、Volcano オーバーヘッド除去。
- Shuffle: 分散の核、同時にボトルネック。
- AQE: 実行時再最適化、30〜200% 改善。
実戦推奨
- デフォルト設定を理解:
spark.executor.memory、spark.sql.shuffle.partitions、AQE。 - 小 dimension は broadcast join。
- クエリパターンに合わせた partition column。
- 遅いクエリには EXPLAIN。
- Spark UI で task 分布と GC time を監視。
- Parquet/ORC を優先。
最後の教訓
Spark の成功は技術的卓越 + タイミング + エコシステムの合算。次に .filter().groupBy().count() を書くとき、内部では Catalyst が rewrite し、Tungsten がメモリレイアウトを最適化し、WSCG が Java を生成し、AQE が再計画し、Shuffle がデータを再分配している。すべて数ミリ秒〜数秒で。
参考資料
- Apache Spark Documentation
- Learning Spark, 2nd Edition (Jules Damji et al.)
- Spark SQL: Relational Data Processing (Armbrust et al., 2015)
- Resilient Distributed Datasets (Zaharia et al., 2012)
- Databricks Engineering Blog
- Project Tungsten: Bringing Spark Closer to Bare Metal
- Apache Spark as a Compiler (Databricks Blog)
- Adaptive Query Execution in Apache Spark
- High Performance Spark (Holden Karau, Rachel Warren)