Skip to content

✍️ 필사 모드: Apache Spark Internals Deep Dive 2025: RDD, Catalyst Optimizer, Tungsten, Whole-Stage Codegen, Shuffle

English
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.

Introduction: How Spark Dethroned MapReduce

2010, UC Berkeley

In 2010, Matei Zaharia published "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing" at UC Berkeley's AMPLab. Hadoop MapReduce was king. Spark claimed:

  1. 100x faster via in-memory computation.
  2. Much simpler API.

Timeline:

  • 2013: Donated to Apache.
  • 2014: Databricks founded.
  • 2016: Spark 2.0 (DataFrame, Catalyst, Tungsten).
  • 2020: Spark 3.0 (Adaptive Query Execution).
  • Today: de facto standard for big data.

Why So Fast

Spark's performance revolution came from three axes:

  1. RDD (Resilient Distributed Dataset): in-memory computation.
  2. Catalyst Optimizer: query optimization.
  3. Tungsten: CPU and memory optimization.

Combined, they deliver 100x — sometimes 1000x — over MapReduce.


1. RDD: Where It All Began

Motivation

Circa 2009, MapReduce dominated but had problems:

  1. Iterative algorithms (ML, graphs) extremely slow.
  2. Each job round-trips to disk.
  3. Interactive analysis impossible.

Zaharia's insight: "What if we keep data in memory?" But memory is volatile — how to handle node failures? Answer: track the lineage (transformation history), not the data. Recompute on failure.

RDD Properties

  • Resilient: tolerates failures via lineage.
  • Distributed: partitioned across the cluster.
  • Immutable, Lazy, Typed, Partitioned.

Transformation vs Action

Transformations build new RDDs lazily:

val lines = sc.textFile("data.txt")
val words = lines.flatMap(_.split(" "))
val counts = words.map(w => (w, 1))
val reduced = counts.reduceByKey(_ + _)

Actions trigger execution:

reduced.collect()

Laziness enables fused passes, early-exit on take(10), and pipelined execution without intermediate materialization.

Lineage & Fault Tolerance

val rdd1 = sc.textFile("data.txt")
val rdd2 = rdd1.map(x => x.toUpperCase)
val rdd3 = rdd2.filter(x => x.startsWith("A"))

If a partition of rdd3 is lost, Spark recomputes just that partition from lineage. No replication needed. Long lineages can be shortened with checkpoint().

Narrow vs Wide Dependency

  • Narrow (map, filter, union): child depends on a few parent partitions. Pipelined within a stage.
  • Wide (groupByKey, reduceByKey, join): requires shuffle. Stage boundary.

DAG and Stage

On action: RDD graph becomes a DAG, split at wide dependencies into stages. Each stage decomposes into tasks (one per partition) executed in parallel.

Limitations of RDDs

No schema, no optimizer, slow Java serialization, JVM object overhead. Spark 2.0 answered with DataFrame/Dataset + Catalyst + Tungsten.


2. DataFrame & Dataset

DataFrame (Spark 1.3)

Structured, SQL-like view of an RDD:

val df = spark.read.json("people.json")
df.filter($"age" > 20).select("name").show()

Schema-aware, higher-level API, Catalyst-optimized, stored in Tungsten binary format.

Dataset (Spark 1.6+)

DataFrame + compile-time type safety:

case class Person(name: String, age: Int)
val ds = spark.read.json("people.json").as[Person]
ds.filter(_.age > 20).map(_.name)

DataFrame = Dataset[Row]. Python uses DataFrame only.

Performance Difference

Filtering 100M rows: RDD ~100s, DataFrame/Dataset ~15s — 6-7x faster because of Catalyst, Tungsten, and whole-stage codegen.


3. Catalyst Optimizer

Four Stages

SQL or DataFrame
  -> Analysis (resolve names, types)
  -> Logical Optimization (rule-based)
  -> Physical Planning (multiple plans + cost)
  -> Code Generation (Tungsten)
  -> RDD execution

Logical Optimization Rules

  • Constant folding: age * 2 + 3 precomputed.
  • Predicate pushdown: filters move below joins and into data sources.
  • Projection pushdown: read only needed columns.
  • Simplification: WHERE true AND x > 10 becomes WHERE x > 10.
  • Subquery elimination: IN (SELECT ...) becomes semi-join.

Example

val df = spark.sql("""
  SELECT u.name, COUNT(*) as orders
  FROM users u
  JOIN orders o ON u.id = o.user_id
  WHERE u.country = 'KR'
  GROUP BY u.name
""")
df.explain(true)

Optimized plan pushes country = 'KR' filter down to users, projects only needed columns before the join.

Physical Planning

Generates multiple physical plans and picks by cost:

  • BroadcastHashJoin: small table.
  • SortMergeJoin: large tables.
  • ShuffledHashJoin: medium.
  • BroadcastNestedLoop: cartesian.

Cost-Based Optimizer (CBO)

Since Spark 2.2, with:

ANALYZE TABLE users COMPUTE STATISTICS FOR COLUMNS id, name, age

Catalyst uses row counts, min/max, null/distinct counts to reorder joins and estimate selectivity.


4. Tungsten: Reinventing CPU and Memory

JVM Object Overhead

Integer i = 42;  // ~24 bytes for a 4-byte int
String s = "hello";  // 60+ bytes for 5 chars

Billions of objects mean massive memory waste and GC pressure.

Off-heap Binary Format

Tungsten stores rows as contiguous bytes off the JVM heap:

Row: [age=30, name="Alice"]
Binary: [bitmap][age:4][name_offset:4][name_length:4]...[name_data]

Benefits: no JVM objects, no GC pressure, cache-friendly, direct memory access via sun.misc.Unsafe.

UnsafeRow

public final class UnsafeRow extends MutableRow {
    private Object baseObject;
    private long baseOffset;
    private int sizeInBytes;
    private int numFields;
}

Field access is pointer arithmetic:

long getLong(int i) {
    return Platform.getLong(baseObject, baseOffset + offsets[i]);
}

Memory Management

Tungsten manages execution memory (shuffle, join, sort) and storage memory (cache) with dynamic borrowing. Supports both on-heap and off-heap.

spark.executor.memory = 8g
spark.memory.fraction = 0.6
spark.memory.storageFraction = 0.5
spark.memory.offHeap.enabled = true
spark.memory.offHeap.size = 4g

Result: 30-70% less memory, 2-5x faster queries, much less GC pause.


5. Whole-Stage Code Generation

Volcano Model Overhead

Traditional SQL engines use iterator-based Volcano:

while (child.hasNext()) {
    Row row = child.next();
    if (predicate(row)) process(row);
}

Per-operator virtual calls, per-row object allocations, and frequent cache misses add up.

WSCG Idea

Fuse multiple operators into a single generated function. For SELECT name FROM users WHERE age > 20:

while (scanner.hasNext()) {
    Row row = scanner.next();
    if (row.getInt("age") > 20) {
        emit(row.getString("name"));
    }
}

One loop, no virtual calls, JIT-friendly.

Pipeline

Spark 2.0 generates Java source per stage (up to shuffle boundary), compiles with Janino, runs on the JVM which JIT-compiles to native. Typical speedup: 2-10x.

Example Generated Code

public class GeneratedIterator {
    private scan_input_0;

    public boolean hasNext() { return scan_input_0.hasNext(); }

    public UnsafeRow next() {
        while (scan_input_0.hasNext()) {
            UnsafeRow row = scan_input_0.next();
            int age = row.getInt(2);
            if (age > 20) {
                UTF8String country = row.getUTF8String(3);
                if (country.equals("KR")) {
                    UnsafeRow output = /* build row */;
                    return output;
                }
            }
        }
        return null;
    }
}

Inspect with EXPLAIN

df.explain("codegen")

Shows the generated Java code. *(1) marks a WSCG stage.


6. Shuffle: The Core of Distributed Aggregation

Why Shuffle

Wide dependencies require redistribution:

Stage 1 (4 partitions):
  P0: [(a,1), (b,2), (c,3)]
  P1: [(a,4), (d,5)]
  ...
reduceByKey  -> all 'a' go to the same partition.

Shuffle Phases

  • Map side: each task writes to local files, partitioned by reducer.
  • Reduce side: reducers fetch their partitions from all mappers over the network, then sort/aggregate.

Shuffle is Spark's most expensive operation — disk I/O, network, serialization, sorting.

Shuffle Strategies

  • Hash Shuffle (early): M x R files — file system overload.
  • Sort Shuffle (1.1+, default): one file per map task with index file.
  • Tungsten Sort Shuffle (1.5+): binary format, cache-friendly.

Broadcast Join

val joined = big.join(broadcast(small), "key")

Driver broadcasts small to every executor; each local partition of big joins locally. No shuffle. Default threshold: spark.sql.autoBroadcastJoinThreshold = 10 MB. Often several times to tens of times faster than shuffle joins.

Shuffle Partitioning

spark.sql.shuffle.partitions default 200. Too few -> OOM and under-parallelism; too many -> small-file overhead. Target ~100-200 MB per partition.

Optimization Techniques

df.coalesce(10).write.parquet(...)
df.repartition(50, $"key")
val hint_df = big.join(broadcast(small), ...)

Plus skew handling (below).


7. Adaptive Query Execution (AQE)

Spark 3.0's Killer Feature

AQE re-plans queries at runtime using actual shuffle statistics.

spark.sql.adaptive.enabled = true  # default true since 3.2

Three Optimizations

  1. Dynamically coalesce shuffle partitions: merges small partitions after shuffle (e.g. 200 -> 20).
  2. Dynamically switch join strategies: upgrades SortMergeJoin to BroadcastHashJoin when actual sizes are small.
  3. Dynamically optimize skew joins: splits oversized partitions into sub-partitions.

Databricks TPC-DS: AQE yields roughly 1.3-2x speedup on average; some queries 3x+.


8. Dynamic Partition Pruning

Problem

Star schema join:

SELECT f.amount
FROM fact_sales f JOIN dim_date d ON f.date_id = d.id
WHERE d.year = 2024

Without DPP, fact_sales is scanned in full.

With DPP

Spark extracts matching ids from dim_date, pushes them into fact_sales partition filter. Multi-TB scans become multi-GB — 10-100x speedup.

Requires: partitioned table, highly selective dimension filter, Spark 3.0+.


9. Practical Tuning

Memory

--conf spark.executor.memory=16g
--conf spark.executor.memoryOverhead=2g
--conf spark.executor.cores=4
--conf spark.executor.instances=10

Rules: 4-6 cores per executor; overhead = max(384MB, 10%).

Shuffle

--conf spark.sql.shuffle.partitions=200
--conf spark.sql.adaptive.enabled=true
--conf spark.sql.adaptive.coalescePartitions.enabled=true
--conf spark.sql.adaptive.skewJoin.enabled=true

Partitions = total data / target size (200 MB). E.g. 1 TB -> 5000 partitions.

Broadcast Threshold

--conf spark.sql.autoBroadcastJoinThreshold=100MB

Default 10 MB is often too small. Watch executor memory — broadcast replicates to all executors.

Cache

df.cache()
df.persist(StorageLevel.MEMORY_ONLY)
df.persist(StorageLevel.MEMORY_AND_DISK_SER)

Cache when reused multiple times or expensive to compute. Spilling to disk may be worse than recomputation.

Skew (Manual)

val salted = df.withColumn("salted_key",
    concat($"key", lit("_"), (rand() * 10).cast("int")))

10. Performance Debugging

Spark UI

Most important tool: Jobs, Stages, SQL/DataFrame, Executors, Storage tabs.

Key Metrics

  • Task duration distribution: large median-to-max gap means skew.
  • Shuffle read/write: bigger than expected means filter pushdown failed.
  • GC time: >10% of task time means trouble.

Explain

df.explain()
df.explain(true)
df.explain("cost")
df.explain("codegen")
df.explain("formatted")

11. Common Pitfalls

  1. Too many small files -> coalesce/repartition.
  2. Skew -> AQE skew join, filter null keys, salting.
  3. Broadcast OOM -> lower threshold or remove broadcast() hint.
  4. Too many shuffle partitions -> AQE coalesce or manual adjust.
  5. Python UDF overhead -> use Pandas UDF (Arrow-based, 10x faster):
@pandas_udf("double")
def my_udf(s: pd.Series) -> pd.Series:
    return s * 2
  1. Wrong partition counts -> aim for 100-200 MB each.

Quiz Review

Q1. How does RDD lineage enable fault recovery?

Each RDD remembers its parent and the transformation that produced it. On failure, Spark recomputes only the lost partitions from their lineage. This avoids replication (saving network/disk) but can be slow for long lineages — mitigated by checkpoint(). Partition-level recomputation is the key property. Wide dependencies (shuffles) complicate recovery because Spark may need many parent partitions; shuffle files kept on local disk help. Lineage assumes pure functions — side effects like saveToDatabase may duplicate on retry, so idempotency matters.

Lineage reflects a functional philosophy: store how to build state, not the state itself. Combined with in-memory computation, it made Spark 100x faster than MapReduce while remaining fault-tolerant.

Q2. How does Catalyst rewrite queries for performance?

Catalyst represents queries as trees and applies dozens of rewrite rules iteratively.

Key rules:

  • Projection pushdown: read only referenced columns.
  • Filter pushdown: move predicates into data sources (Parquet predicate pushdown can skip entire row groups using min/max stats).
  • Constant folding: evaluate constants at compile time.
  • Simplification: WHERE true AND x > 10 becomes WHERE x > 10.
  • Join reorder (CBO): pick cheapest join order from statistics.
  • Subquery unnesting: IN (SELECT ...) becomes semi-join.
  • Column pruning through joins.
  • Decimal precision optimization.

Physical planning picks among BroadcastHashJoin, SortMergeJoin, etc. using cost. Catalyst rules are user-extensible — Delta Lake and Iceberg build on this. Result: naive SQL often runs at hand-tuned speed.

Q3. Why does Tungsten use binary format instead of JVM objects?

A JVM Integer takes ~24 bytes for 4 bytes of data; a 5-char String takes ~72 bytes. At billion-row scale, this 10x overhead becomes catastrophic memory bloat and GC pressure.

Tungsten stores rows as contiguous byte arrays with inline fixed-size fields, pointer-offset variable-size fields, and a null bitmap. UnsafeRow accesses fields via sun.misc.Unsafe pointer arithmetic.

Benefits:

  • Memory: ~6x less.
  • GC pressure: removed (off-heap or fewer, larger arrays).
  • Cache friendliness: contiguous layout fits cache lines.
  • Serialization: UnsafeRow ships over the wire as-is.
  • SIMD-friendly layout.

Costs: low-level complexity, harder debugging, expensive variable-size mutations, Python-JVM conversion cost (now mitigated by Arrow).

Spark 3.x added columnar vectorized execution (Parquet/ORC readers, aggregation in batches of 4096). Tungsten and Arrow are complementary: Tungsten for row-wise internal processing, Arrow for cross-language columnar exchange.

Q4. Why is Whole-Stage CodeGen faster than Volcano?

Volcano's iterator model (from the 1970s) elegantly composes operators via next(), but modern CPUs hate it:

  1. Virtual function calls prevent inlining.
  2. Per-row object allocations pressure GC.
  3. Branch misprediction across operators.
  4. Cache misses between operator code paths.
  5. Java boxing.

WSCG fuses operators in a stage into a single generated Java function, compiles with Janino, then JVM JIT optimizes. Resulting code has one loop, inlined checks, direct memory access via Tungsten.

Databricks TPC-DS: 4-10x faster than Volcano.

Constraints: JVM's 64KB method limit, complex operators (window, UDF) fall back to Volcano, debugging generated code is hard, compilation overhead for tiny queries.

Comparison:

  • Volcano: baseline.
  • WSCG: 4-10x.
  • Vectorized (ClickHouse, DuckDB): 4-10x.
  • LLVM-native (HyPer/Umbra): 10-20x.

Spark combines vectorized Parquet reading with WSCG for row processing — a pragmatic balance.

Q5. How does AQE use runtime statistics?

Static optimizers commit to a plan before execution; their estimates can be wrong due to stale stats, hard selectivity estimation, or unpredictable join sizes. AQE pauses at shuffle boundaries, inspects the actual shuffled data, and re-optimizes the remaining plan.

Three main optimizations:

  1. Coalesce shuffle partitions: merges tiny partitions (e.g. 200 -> 20).
  2. Switch join strategy: upgrade SortMergeJoin to BroadcastHashJoin when actual size fits.
  3. Skew join: split oversized partitions into sub-partitions processed by separate tasks.

Example: a skewed partition taking 100s turns into 10 sub-partitions of 10s each.

Dynamic Partition Pruning works alongside AQE: extract dim filter ids, push as partition filter into fact table, scan only needed partitions. Multi-TB scans become multi-GB.

Spark 3.0 introduced AQE (opt-in); 3.2 enabled by default. TPC-DS average ~1.8x, some queries 3x+.

Limits: less applicable to streaming, overhead on tiny queries, complex queries may still need manual tuning. Philosophy: optimization is JIT, not AOT.


Closing: The Data Engine Under the Hood

Summary

  1. RDD: in-memory distributed collection; lineage-based recovery.
  2. DataFrame: structured API + schema + optimizer.
  3. Catalyst: rule + cost-based SQL optimization.
  4. Tungsten: off-heap binary format, cache-friendly.
  5. WSCG: runtime Java code generation, removes Volcano overhead.
  6. Shuffle: the core of distribution and the main bottleneck.
  7. AQE: runtime re-optimization, 30-200% speedup.

Practical Recommendations

  • Understand defaults: spark.executor.memory, spark.sql.shuffle.partitions, AQE.
  • Use broadcast join for small dimension tables.
  • Partition columns that match query patterns.
  • Read EXPLAIN for slow queries.
  • Monitor via Spark UI (task distribution, GC time).
  • Prefer columnar formats (Parquet/ORC).

Final Lesson

Spark's success = technical excellence + timing + ecosystem. Next time you write .filter().groupBy().count(), remember: Catalyst rewrites it, Tungsten lays out memory, WSCG generates Java, AQE re-plans at runtime, and shuffle redistributes — all in milliseconds to seconds.


References

현재 단락 (1/288)

In 2010, Matei Zaharia published "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for I...

작성 글자: 0원문 글자: 16,306작성 단락: 0/288