Skip to content

✍️ 필사 모드: Apache Spark 内部完全ガイド 2025: RDD, Catalyst Optimizer, Tungsten, Whole-Stage Codegen, Shuffle 深層解析

日本語
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.

はじめに: Spark はいかにして MapReduce を駆逐したか

2010 年、UC Berkeley にて

2010 年 UC Berkeley の AMPLab で Matei Zaharia が論文を発表した。"Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing"。当時ビッグデータの王は Hadoop MapReduce。Spark は 2 つを主張した:

  1. メモリベースで 100 倍高速。
  2. API がはるかにシンプル。

タイムライン:

  • 2013: Apache に寄贈。
  • 2014: Databricks 創業。
  • 2016: Spark 2.0 (DataFrame, Catalyst, Tungsten)。
  • 2020: Spark 3.0 (Adaptive Query Execution)。
  • 現在: ビッグデータ処理の事実上の標準。

なぜここまで速いのか

Spark のパフォーマンス革命は 3 つの軸から来る:

  1. RDD (Resilient Distributed Dataset): インメモリ演算。
  2. Catalyst Optimizer: クエリ最適化。
  3. Tungsten: CPU とメモリの最適化。

この 3 つが合わさり MapReduce 比 100 倍、時に 1000 倍のパフォーマンスを叩き出す。


1. RDD: すべての始まり

誕生の背景

2009 年頃、ビッグデータ処理は MapReduce が支配。問題は:

  1. 反復アルゴリズム (ML、グラフ) が極度に遅い。
  2. 各 Map-Reduce ジョブがディスクを往復。
  3. 対話的分析が不可能。

Zaharia の洞察: 「データをメモリに保持すればよいのでは?」 しかしメモリは揮発的。ノード障害時はどうするか? 解決は lineage (系譜)。データではなく変換演算の記録を保持する。障害時は再計算。

RDD の特性

  • Resilient: 障害に強い (lineage で)。
  • Distributed: クラスタに分散。
  • Immutable、Lazy、Typed、Partitioned。

Transformation vs Action

Transformation は lazy:

val lines = sc.textFile("data.txt")
val words = lines.flatMap(_.split(" "))
val counts = words.map(w => (w, 1))
val reduced = counts.reduceByKey(_ + _)

Action が実行をトリガー:

reduced.collect()

Lazy 評価の利点: 複数 transformation を一度に融合、take(10) 時の早期終了、中間データを貯めないパイプライン。

Lineage と耐障害性

val rdd1 = sc.textFile("data.txt")
val rdd2 = rdd1.map(x => x.toUpperCase)
val rdd3 = rdd2.filter(x => x.startsWith("A"))

rdd3 の partition が失われた場合、lineage をたどって当該 partition のみ再計算。レプリケーション不要。長い lineage は checkpoint() で短縮できる。

Narrow vs Wide Dependency

  • Narrow (map、filter、union): 子 partition が親の少数 partition に依存。stage 内で pipeline 実行。
  • Wide (groupByKey、reduceByKey、join): shuffle 必要、stage 境界。

DAG と Stage

Action 受信時、RDD グラフを DAG に変換、wide dependency を境界に stage 分割、各 stage が partition 数だけの task に分解、並列実行。

RDD の限界

スキーマなし、optimizer なし、遅い Java serialization、JVM オブジェクトのオーバーヘッド。Spark 2.0 は DataFrame/Dataset + Catalyst + Tungsten でこれを解決した。


2. DataFrame & Dataset

DataFrame (Spark 1.3)

SQL のテーブルのように構造化された RDD:

val df = spark.read.json("people.json")
df.filter($"age" > 20).select("name").show()

スキーマあり、高水準 API、Catalyst 最適化、Tungsten バイナリ format。

Dataset (Spark 1.6+)

DataFrame + 型安全性:

case class Person(name: String, age: Int)
val ds = spark.read.json("people.json").as[Person]
ds.filter(_.age > 20).map(_.name)

DataFrame = Dataset[Row]。Python は DataFrame のみ。

パフォーマンス差

1 億行フィルタ: RDD 100 秒、DataFrame/Dataset 15 秒 — 6〜7 倍高速。Catalyst、Tungsten、whole-stage codegen のおかげ。


3. Catalyst Optimizer

4 段階

SQL or DataFrame
  -> Analysis (resolved logical plan)
  -> Logical Optimization (rule-based)
  -> Physical Planning (複数プラン + コスト比較)
  -> Code Generation (Tungsten)
  -> RDD 実行

Logical Optimization ルール

  • Constant folding: age * 2 + 3 を事前計算。
  • Predicate pushdown: フィルタを join の前・データソースへ。
  • Projection pushdown: 必要な列のみ読む。
  • Simplification: WHERE true AND x > 10WHERE x > 10 に。
  • Subquery elimination: IN (SELECT ...) を semi-join に変換。

val df = spark.sql("""
  SELECT u.name, COUNT(*) as orders
  FROM users u
  JOIN orders o ON u.id = o.user_id
  WHERE u.country = 'KR'
  GROUP BY u.name
""")
df.explain(true)

最適化後: country = 'KR' フィルタが users 直上に移動、必要列のみ projection。

Physical Planning

論理プランから実行可能な物理演算子へ。複数プランを生成しコスト比較:

  • BroadcastHashJoin: 小テーブル。
  • SortMergeJoin: 大テーブル同士。
  • ShuffledHashJoin: 中間。
  • BroadcastNestedLoop: cartesian。

Cost-Based Optimizer (CBO)

Spark 2.2+ から統計ベース最適化:

ANALYZE TABLE users COMPUTE STATISTICS FOR COLUMNS id, name, age

行数、min/max、null/distinct カウントで join 順序の最適化と selectivity 推定。


4. Tungsten: CPU とメモリの再発明

JVM オブジェクトのオーバーヘッド

Integer i = 42;  // 4 バイトの int に ~24 バイト
String s = "hello";  // 5 文字に 60+ バイト

数億オブジェクトでメモリ浪費と GC 圧力が深刻。

Off-heap バイナリ format

Tungsten は JVM ヒープ外に行を連続バイトで保存:

Row: [age=30, name="Alice"]
Binary: [bitmap][age:4][name_offset:4][name_length:4]...[name_data]

利点: JVM オブジェクトなし、GC 圧力なし、キャッシュ親和、sun.misc.Unsafe による直接メモリアクセス。

UnsafeRow

public final class UnsafeRow extends MutableRow {
    private Object baseObject;
    private long baseOffset;
    private int sizeInBytes;
    private int numFields;
}

フィールドアクセスはポインタ演算:

long getLong(int i) {
    return Platform.getLong(baseObject, baseOffset + offsets[i]);
}

メモリ管理

Tungsten は execution memory (shuffle、join、sort) と storage memory (cache) を動的に借り合う。

spark.executor.memory = 8g
spark.memory.fraction = 0.6
spark.memory.storageFraction = 0.5
spark.memory.offHeap.enabled = true
spark.memory.offHeap.size = 4g

効果: メモリ 30〜70% 削減、クエリ 2〜5 倍高速、GC pause 大幅減。


5. Whole-Stage Code Generation

Volcano モデルのオーバーヘッド

従来 SQL エンジンは iterator-based Volcano:

while (child.hasNext()) {
    Row row = child.next();
    if (predicate(row)) process(row);
}

virtual function call、行ごとのオブジェクト生成、CPU パイプライン破壊が累積。

WSCG のアイデア

複数演算子を 1 つの関数に融合。SELECT name FROM users WHERE age > 20 の場合:

while (scanner.hasNext()) {
    Row row = scanner.next();
    if (row.getInt("age") > 20) {
        emit(row.getString("name"));
    }
}

単一ループ、virtual call なし、JIT 最適化可能。

仕組み

Spark 2.0 が stage 単位で Java ソースを生成、Janino でコンパイル、JVM の JIT が native 化。典型的に 2〜10 倍高速。

生成コード例

public class GeneratedIterator {
    private scan_input_0;

    public boolean hasNext() { return scan_input_0.hasNext(); }

    public UnsafeRow next() {
        while (scan_input_0.hasNext()) {
            UnsafeRow row = scan_input_0.next();
            int age = row.getInt(2);
            if (age > 20) {
                UTF8String country = row.getUTF8String(3);
                if (country.equals("KR")) {
                    UnsafeRow output = /* build row */;
                    return output;
                }
            }
        }
        return null;
    }
}

EXPLAIN で確認

df.explain("codegen")

生成された Java コードを表示。*(1) は WSCG stage 番号。


6. Shuffle: 分散集計の核心

なぜ Shuffle が必要か

wide dependency はデータの再分配が必要:

Stage 1 (4 partitions):
  P0: [(a,1), (b,2), (c,3)]
  P1: [(a,4), (d,5)]
  ...
reduceByKey  -> すべての 'a' を同じ partition へ。

Shuffle フェーズ

  • Map 側: 各 task がローカルファイルに、reducer ごとの partition に分類。
  • Reduce 側: 全 map task から自 partition データを fetch、ネットワーク転送、ソート/集計。

Shuffle は Spark で最も高コスト: ディスク I/O、ネットワーク、serialization、sort。

Shuffle 戦略の進化

  • Hash Shuffle (初期): M × R ファイル — FS 負担。
  • Sort Shuffle (1.1+、デフォルト): 1 map task につき 1 ファイル + index。
  • Tungsten Sort Shuffle (1.5+): binary format、キャッシュ親和。

Broadcast Join

val joined = big.join(broadcast(small), "key")

Driver が小テーブルを全 Executor にコピー、各 Executor が big の自 partition とローカル join。Shuffle なし。デフォルト spark.sql.autoBroadcastJoinThreshold = 10 MB。shuffle join 比で数倍〜数十倍高速。

Shuffle Partitioning

spark.sql.shuffle.partitions デフォルト 200。小さすぎれば OOM、大きすぎれば小ファイル問題。目安: 各 partition 100〜200 MB。

最適化テクニック

df.coalesce(10).write.parquet(...)
df.repartition(50, $"key")
val hint_df = big.join(broadcast(small), ...)

プラス skew 処理 (後述)。


7. Adaptive Query Execution (AQE)

Spark 3.0 のキラー機能

AQE は実行時統計を使い、残りのプランを動的に再最適化する。

spark.sql.adaptive.enabled = true  # 3.2 以降デフォルト true

3 つの最適化

  1. Shuffle partition の動的 coalesce: 小 partition を merge (例 200 -> 20)。
  2. Join 戦略の動的切替: 実サイズが小さければ SortMergeJoin を BroadcastHashJoin へ昇格。
  3. Skew join の動的最適化: 大きすぎる partition を分割して複数 task で処理。

Databricks TPC-DS: 平均 1.3〜2 倍、一部クエリは 3 倍以上。


8. Dynamic Partition Pruning

問題

Star schema の join:

SELECT f.amount
FROM fact_sales f JOIN dim_date d ON f.date_id = d.id
WHERE d.year = 2024

DPP なしでは fact_sales を全スキャン。

DPP

Spark は dim_date から year=2024 の id を先に抽出、fact_sales の partition filter として pushdown。対象 partition のみロード。数 TB -> 数 GB、10〜100 倍高速化。

条件: partitioned テーブル、selective な dimension filter、Spark 3.0+。


9. 実戦チューニング

メモリ

--conf spark.executor.memory=16g
--conf spark.executor.memoryOverhead=2g
--conf spark.executor.cores=4
--conf spark.executor.instances=10

Executor あたり 4〜6 cores、overhead は max(384MB, 10%)。

Shuffle

--conf spark.sql.shuffle.partitions=200
--conf spark.sql.adaptive.enabled=true
--conf spark.sql.adaptive.coalescePartitions.enabled=true
--conf spark.sql.adaptive.skewJoin.enabled=true

Partition 数 = 総データ / 目標サイズ (200 MB)。1 TB なら 5000。

Broadcast 閾値

--conf spark.sql.autoBroadcastJoinThreshold=100MB

デフォルト 10 MB は小さすぎることが多い。Executor メモリに注意。

Cache

df.cache()
df.persist(StorageLevel.MEMORY_ONLY)
df.persist(StorageLevel.MEMORY_AND_DISK_SER)

複数回使用または高コスト計算の DataFrame に使う。

Skew 手動処理

val salted = df.withColumn("salted_key",
    concat($"key", lit("_"), (rand() * 10).cast("int")))

10. パフォーマンスデバッグ

Spark UI

最重要ツール。Jobs、Stages、SQL/DataFrame、Executors、Storage タブ。

注目 Metric

  • Task duration 分布: median と max の差が大きければ skew。
  • Shuffle read/write: 想定超なら filter pushdown 失敗。
  • GC time: task 時間の 10% 超なら GC 問題。

Explain

df.explain()
df.explain(true)
df.explain("cost")
df.explain("codegen")
df.explain("formatted")

11. よくある落とし穴

  1. 小さいファイルが多すぎる -> coalesce/repartition
  2. Skew -> AQE skew join、null key フィルタ、salting。
  3. Broadcast OOM -> 閾値を下げる、broadcast() ヒント削除。
  4. Shuffle partition が多すぎる -> AQE coalesce または手動調整。
  5. Python UDF のオーバーヘッド -> Pandas UDF (Arrow ベースで 10 倍高速):
@pandas_udf("double")
def my_udf(s: pd.Series) -> pd.Series:
    return s * 2
  1. Partition 数が不適切 -> 各 partition 100〜200 MB を目指す。

クイズで復習

Q1. RDD の lineage はどのように障害復旧を実現するか?

各 RDD は親 RDD と変換関数を記憶する。partition 損失時、lineage をたどって当該 partition のみ再計算。レプリケーション不要でディスク/ネットワーク節約。partition 単位再計算が鍵。

Lineage が長いと再計算が重い — checkpoint() で切断可能。Wide dependency (shuffle) は復旧が複雑だが、shuffle ファイルがローカルディスクに残るため軽減される。外部システム副作用 (DB 書き込みなど) は再実行で重複する可能性があり、idempotent 設計が必要。

Lineage は「状態ではなく、作り方を保存せよ」という関数型哲学の実装。インメモリ演算と軽量復旧の組み合わせが、Spark を MapReduce 比 100 倍高速にした基盤である。

Q2. Catalyst はどのようにクエリを rewrite するのか?

Catalyst はクエリをツリーとして表現し、数十〜数百の rewrite rule を反復適用する。

主要ルール:

  • Projection pushdown: 参照列のみ読む。
  • Filter pushdown: Parquet の min/max 統計で row group スキップ可能。
  • Constant folding: 定数式を事前計算。
  • Simplification: WHERE true AND x > 10WHERE x > 10 に。
  • Join reorder (CBO): 統計から最安 join 順序。
  • Subquery unnesting: IN (SELECT ...) を semi-join に。
  • Column pruning through join。
  • Decimal 精度最適化。

Physical planning では BroadcastHashJoin、SortMergeJoin などを cost で選択。Broadcast は数倍〜数十倍高速。

Catalyst ルールはユーザー拡張可能で、Delta Lake や Iceberg がこれに立脚する。結果として素朴な SQL が手動最適化に近い速度で動く。

Q3. Tungsten が JVM オブジェクトではなくバイナリ format を使う理由と効果は?

Integer は 4 バイトの値に ~24 バイト、5 文字の String は ~72 バイト。億行スケールでは 10 倍のメモリ浪費と GC 圧力になる。

Tungsten は行を連続バイト配列に保存。固定長フィールドはインライン、可変長はポインタ + オフセット、null bitmap。UnsafeRowsun.misc.Unsafe でポインタ演算による直接アクセス。

効果:

  • メモリ: ~6 倍削減。
  • GC 圧力: 除去 (off-heap または大きな配列少数)。
  • キャッシュ親和: 連続レイアウトが cache line にフィット。
  • Serialization: UnsafeRow はそのままネットワーク送信可能。
  • SIMD 親和レイアウト。

コスト: 低レベル複雑性、デバッグ困難、可変長変更が重い、Python-JVM 変換コスト (Arrow で緩和)。

Spark 3.x は columnar vectorized execution (Parquet/ORC reader、batch 4096 行) を導入。Tungsten と Arrow は補完関係: Tungsten は row-wise 内部処理、Arrow は言語間 columnar 交換。

Q4. Whole-Stage CodeGen はなぜ Volcano より高速か?

Volcano の iterator モデル (1970 年代) は演算子を next() で合成するが、現代 CPU とは相性が悪い:

  1. virtual function call がインライン化を阻害。
  2. 行ごとのオブジェクト生成で GC 圧力。
  3. 演算子間で branch 予測ミス。
  4. コードパス切替でキャッシュミス。
  5. Java boxing。

WSCG は stage 内の演算子を 1 つの Java 関数に融合し、Janino でコンパイル、JVM の JIT で最適化。単一ループ、inline 判定、Tungsten 経由の直接メモリアクセス。

Databricks TPC-DS: Volcano 比 4〜10 倍。

制約: JVM の 64KB メソッド制限、複雑な演算子 (window、UDF) は Volcano にフォールバック、生成コードのデバッグが難しい、小クエリでのコンパイルオーバーヘッド。

比較:

  • Volcano: baseline。
  • WSCG: 4〜10 倍。
  • Vectorized (ClickHouse、DuckDB): 4〜10 倍。
  • LLVM ネイティブ (HyPer/Umbra): 10〜20 倍。

Spark は vectorized Parquet 読取 + row 処理は WSCG という実用的妥協。

Q5. AQE は実行時統計をどう活用するか?

静的 optimizer は実行前にプランを決める。統計の陳腐化、selectivity 推定困難、join サイズ予測困難により誤計画になりうる。AQE は shuffle 境界で一時停止し、実データを観察、残りのプランを再最適化する。

3 つの最適化:

  1. Shuffle partition の coalesce: 小 partition を merge (例 200 -> 20)。
  2. Join 戦略切替: 実サイズが小ならば SortMergeJoin -> BroadcastHashJoin。
  3. Skew join: 大 partition を sub-partition に分割、別 task で処理。

例: skewed partition 100 秒 -> 10 sub-partition の 10 秒ずつ。

DPP (Dynamic Partition Pruning) も併用: dim filter id を fact の partition filter として pushdown、数 TB を数 GB に。

Spark 3.0 で導入 (opt-in)、3.2 でデフォルト有効化。TPC-DS 平均 ~1.8 倍、一部 3 倍以上。

限界: Structured Streaming への適用が限定的、小クエリでは逆効果、複雑クエリは依然手動チューニング必要。哲学的に AQE は AOT 最適化ではなく JIT 最適化。


おわりに: データの精緻なエンジン

要点

  1. RDD: インメモリ分散コレクション、lineage で障害復旧。
  2. DataFrame: 構造化 API + スキーマ + optimizer。
  3. Catalyst: rule + cost ベース SQL 最適化。
  4. Tungsten: off-heap バイナリ format、キャッシュ親和。
  5. WSCG: 実行時 Java コード生成、Volcano オーバーヘッド除去。
  6. Shuffle: 分散の核、同時にボトルネック。
  7. AQE: 実行時再最適化、30〜200% 改善。

実戦推奨

  • デフォルト設定を理解: spark.executor.memoryspark.sql.shuffle.partitions、AQE。
  • 小 dimension は broadcast join。
  • クエリパターンに合わせた partition column。
  • 遅いクエリには EXPLAIN。
  • Spark UI で task 分布と GC time を監視。
  • Parquet/ORC を優先。

最後の教訓

Spark の成功は技術的卓越 + タイミング + エコシステムの合算。次に .filter().groupBy().count() を書くとき、内部では Catalyst が rewrite し、Tungsten がメモリレイアウトを最適化し、WSCG が Java を生成し、AQE が再計画し、Shuffle がデータを再分配している。すべて数ミリ秒〜数秒で。


参考資料

현재 단락 (1/290)

2010 年 UC Berkeley の AMPLab で Matei Zaharia が論文を発表した。"Resilient Distributed Datasets: A Fault-Tolera...

작성 글자: 0원문 글자: 12,489작성 단락: 0/290