Split View: Apache Spark 내부 완전 가이드 2025: RDD, Catalyst Optimizer, Tungsten, Whole-Stage Codegen, Shuffle 심층 분석
Apache Spark 내부 완전 가이드 2025: RDD, Catalyst Optimizer, Tungsten, Whole-Stage Codegen, Shuffle 심층 분석
들어가며: Spark는 어떻게 MapReduce를 쫓아냈는가
2010년, UC Berkeley에서
2010년 UC Berkeley의 AMPLab에서 Matei Zaharia가 논문을 발표했다: "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing".
당시 빅데이터의 왕은 Hadoop MapReduce였다. Spark는 두 가지 주장을 했다:
- 메모리 기반으로 100배 빠르다.
- API가 훨씬 단순하다.
처음엔 MapReduce 진영이 회의적이었다. 하지만 Spark는:
- 2013: Apache에 기증.
- 2014: Databricks 창업.
- 2016: Spark 2.0 (DataFrame, Catalyst, Tungsten).
- 2020: Spark 3.0 (Adaptive Query Execution).
- 현재: 빅데이터 처리의 사실상 표준.
Hadoop MapReduce는 사라졌다. Spark는 전 세계 수만 기업의 데이터 파이프라인의 중추가 되었다.
왜 이렇게 빨라졌는가
Spark의 성능 혁명은 세 가지 축에서 나왔다:
- RDD (Resilient Distributed Dataset): In-memory 연산.
- Catalyst Optimizer: 쿼리 최적화.
- Tungsten: CPU와 메모리 최적화.
이 세 가지가 합쳐져 MapReduce 대비 100배, 때로는 1000배 성능을 낸다.
이 글에서 다룰 것
- RDD의 탄생과 lineage.
- DataFrame/Dataset: 구조화된 데이터.
- Catalyst optimizer: 쿼리 재작성.
- Physical plan과 cost-based 최적화.
- Tungsten project: CPU 캐시 친화적 실행.
- Whole-stage code generation: JIT으로 코드 생성.
- Shuffle: 분산 집계의 비밀.
- Adaptive Query Execution: 런타임 최적화.
- 실전 튜닝.
왜 이 지식이 중요한가
- 데이터 엔지니어: Spark 튜닝의 80%는 내부 이해.
- DA/DS: 왜 이 쿼리가 느린지 설명.
- 플랫폼 엔지니어: 클러스터 최적화.
- 다른 시스템 학습: Dask, Ray, Flink가 Spark의 개념을 차용.
1. RDD: 모든 것의 시작
탄생 배경
2009년경 빅데이터 처리는 MapReduce가 지배했다. 문제는:
- 반복 알고리즘이 극도로 느림 (ML, 그래프).
- 각 Map-Reduce job이 디스크를 왕복.
- 인터랙티브 분석이 불가능 (매번 몇 분~시간).
Zaharia의 통찰: "메모리에서 데이터를 유지하면 되지 않을까?"
문제: 메모리는 휘발성. 노드 장애 시 어떻게?
해결: Lineage (계보). 데이터가 아닌 변환 연산의 기록을 유지. 장애 시 재계산.
RDD란
RDD (Resilient Distributed Dataset):
- Resilient: 장애에 견딤 (lineage로).
- Distributed: 클러스터에 분산.
- Dataset: 데이터 집합.
핵심 특성:
- Immutable: 한 번 만들어지면 변경 불가.
- Lazy: Transformation은 action이 불릴 때까지 실행 안 됨.
- Typed: Java/Scala의 타입 시스템 활용.
- Partitioned: 여러 worker에 분산.
Transformation vs Action
Transformation: 새 RDD 생성. Lazy.
val lines = sc.textFile("data.txt") // RDD[String]
val words = lines.flatMap(_.split(" ")) // 아직 실행 안 됨
val counts = words.map(w => (w, 1))
val reduced = counts.reduceByKey(_ + _) // 여전히 실행 안 됨
Action: 실제 계산 트리거. Eager.
reduced.collect() // 여기서 전체 체인이 실행됨
왜 Lazy인가
Lazy evaluation의 장점:
- 최적화 기회: 여러 transformation을 한 번에 처리. 예: map → filter → map을 하나의 pass로.
- 불필요한 계산 회피:
take(10)만 필요하면 10개만. - Pipeline: 중간 데이터를 메모리에 쌓지 않음.
Lineage와 Fault Tolerance
val rdd1 = sc.textFile("data.txt") // Dependency: HDFS
val rdd2 = rdd1.map(x => x.toUpperCase) // Dependency: rdd1
val rdd3 = rdd2.filter(x => x.startsWith("A")) // Dependency: rdd2
만약 rdd3를 계산 중에 한 partition이 노드 장애로 손실되면:
- Lineage 확인: rdd3 → rdd2 → rdd1 → file.
- 손실된 partition을 재계산.
- 다른 partition은 그대로.
장점: Replication 없이 내결함성. 디스크 I/O 없음.
단점: Lineage가 길면 재계산이 오래 걸림. Checkpoint로 해결 (중간 저장).
Narrow vs Wide Dependency
RDD 간 의존성은 두 종류:
Narrow dependency:
- 자식 partition이 부모의 소수 partition에 의존.
- 예: map, filter, union.
- 한 stage 내에서 pipeline 실행 가능.
- 재계산 시 영향 범위 작음.
Wide dependency (shuffle dependency):
- 자식 partition이 부모의 여러 partition에 의존.
- 예: groupByKey, reduceByKey, join.
- Shuffle 필요: 데이터 재분산.
- Stage 경계.
Narrow: Wide:
Parent Parent Parent Parent Parent
| | | / | \ |
▼ ▼ ▼ ▼ ▼ ▼ ▼
Child1 Child2 ↓ ↓ ↓ ↓ ↓
Shuffle 발생
↓ ↓ ↓
Child1 Child2 Child3
DAG와 Stage
Spark가 액션을 받으면:
- RDD graph를 DAG로 변환.
- Wide dependency를 경계로 stage 분할.
- 각 stage는 tasks로 분해 (partition 수 만큼).
- Stages를 순차적으로, task를 병렬로 실행.
Action: reduce()
↓
Logical DAG:
RDD1 → map → RDD2 → reduceByKey → RDD3 → collect
Stages:
Stage 1: textFile → map (narrow, pipeline)
[Shuffle]
Stage 2: reduceByKey → collect
RDD의 한계
RDD는 훌륭했지만 몇 가지 약점:
- Schema 없음: 그냥 Java/Scala 객체의 컬렉션.
- Optimizer 없음: 사용자가 직접 최적화.
- 느린 직렬화: Java serialization.
- 메모리 오버헤드: JVM 객체의 overhead.
Spark 2.0은 이를 해결하기 위해 DataFrame/Dataset + Catalyst + Tungsten을 도입했다.
2. DataFrame & Dataset: 구조화 혁명
DataFrame의 등장
Spark 1.3 (2015)에서 DataFrame 도입. SQL의 테이블처럼 구조화된 RDD:
val df = spark.read.json("people.json")
df.show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Bob |
// | 25|Alice|
// +---+----+
df.filter($"age" > 20).select("name").show()
RDD와 차이:
- Schema 있음: 컬럼과 타입 정보.
- SQL처럼 조작: 고수준 API.
- Optimizer 적용: Catalyst가 알아서 최적화.
- Tungsten 메모리 포맷: JVM 객체 아닌 바이너리.
Dataset (Spark 1.6+)
Dataset: DataFrame + 타입 안전성.
case class Person(name: String, age: Int)
val ds = spark.read.json("people.json").as[Person]
ds.filter(_.age > 20).map(_.name)
장점:
- 컴파일 타임 타입 체크.
- Object API와 DataFrame 최적화 모두.
Scala/Java에만 존재. Python은 DataFrame만 사용 (Python은 정적 타입이 아니므로).
DataFrame = Dataset[Row]
사실 DataFrame은 Dataset[Row]의 alias. Row는 untyped.
// 같은 것
val df: DataFrame = spark.read.json(...)
val df: Dataset[Row] = spark.read.json(...)
성능 차이
단순 예시: 1억 행 필터링.
- RDD: 100초.
- DataFrame/Dataset: 15초.
6~7배 차이. 이유:
- Catalyst 최적화 (predicate pushdown 등).
- Tungsten 바이너리 포맷.
- Whole-stage codegen.
3. Catalyst Optimizer: 쿼리 재작성의 마법
Catalyst의 4단계
Catalyst는 SQL/DataFrame query를 4단계로 처리한다:
SQL or DataFrame
↓
1. Analysis (unresolved plan → resolved logical plan)
↓
2. Logical Optimization (rule-based)
↓
3. Physical Planning (여러 물리 계획 생성 + 비용 비교)
↓
4. Code Generation (Tungsten)
↓
RDD (실행)
1단계: Analysis
입력: Unresolved logical plan.
SQL: SELECT name FROM users WHERE age > 20
↓ parser
UnresolvedRelation("users")
→ UnresolvedAttribute("age")
→ UnresolvedAttribute("name")
Catalog를 참조해서 resolve:
- 테이블 이름 → 실제 데이터 소스.
- 컬럼 이름 → 컬럼 타입.
- 함수 → 실제 구현.
결과: Resolved Logical Plan.
2단계: Logical Optimization
Rule-based 최적화. 수십~수백 개의 rule을 반복 적용:
Constant Folding:
age * 2 + 3 → (미리 계산)
Predicate Pushdown:
SELECT * FROM (table JOIN other) WHERE table.x > 10
↓ pushdown
SELECT * FROM (SELECT * FROM table WHERE x > 10) JOIN other
필터를 조인 전으로 밀어 넣음. 중간 결과 크기 감소.
Projection Pushdown:
SELECT name FROM users
// users에 50개 컬럼 있어도 name만 읽음
Simplification:
WHERE true AND x > 10 → WHERE x > 10
WHERE false → never executes
Subquery Elimination:
SELECT * FROM t WHERE id IN (SELECT id FROM s)
↓
SELECT * FROM t JOIN s ON t.id = s.id
Logical Plan 예시
val df = spark.sql("""
SELECT u.name, COUNT(*) as orders
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE u.country = 'KR'
GROUP BY u.name
""")
df.explain(true)
Parsed Logical Plan:
'Project ['u.name, 'COUNT(1) AS orders]
+- 'Filter ('u.country = KR)
+- 'Join Inner, ('u.id = 'o.user_id)
:- 'UnresolvedRelation `users`
+- 'UnresolvedRelation `orders`
Analyzed Logical Plan:
Aggregate [name#10], [name#10, count(1) AS orders#20]
+- Filter (country#12 = KR)
+- Join Inner, (id#9 = user_id#15)
:- Relation[id#9,name#10,age#11,country#12] parquet
+- Relation[user_id#15,total#16] parquet
Optimized Logical Plan:
Aggregate [name#10], [name#10, count(1) AS orders#20]
+- Project [name#10]
+- Join Inner, (id#9 = user_id#15)
:- Project [id#9, name#10]
+- Filter (country#12 = KR)
+- Relation[...] parquet
+- Project [user_id#15]
+- Relation[...] parquet
변화:
- Filter가 users 바로 위로 이동.
- Projection이 각 relation 바로 위로.
- 불필요한 컬럼 제거.
3단계: Physical Planning
논리 계획을 실행 가능한 물리 연산으로 변환. Catalyst는 여러 물리 계획을 생성하고 비교.
Join 선택 예시:
- BroadcastHashJoin: 작은 테이블이면.
- SortMergeJoin: 큰 테이블들.
- ShuffledHashJoin: 중간 크기.
- BroadcastNestedLoop: cartesian이면.
Catalyst는 각 옵션의 cost를 추정하고 가장 저렴한 것 선택.
4단계: Code Generation
선택된 물리 계획을 Java 바이트코드로 변환. 다음 섹션에서 자세히.
Cost-Based Optimizer (CBO)
Spark 2.2+부터 통계 기반 최적화:
ANALYZE TABLE users COMPUTE STATISTICS FOR COLUMNS id, name, age
이후 Catalyst가:
- Table 크기, 행 수.
- 컬럼별 min/max/null count.
- 컬럼별 distinct count.
이 정보로 join 순서 최적화, selectivity 추정 등.
기본 RBO (rule-based optimizer)와 조합되어 더 나은 계획 생성.
4. Tungsten: CPU와 메모리의 재발명
Tungsten 프로젝트
Spark 1.5 (2015)에 시작. "Spark 성능을 하드웨어 한계까지".
핵심 목표:
- JVM 메모리 오버헤드 제거.
- CPU 캐시 활용.
- Vectorized 실행.
- 코드 생성으로 CPU 파이프라인 최적화.
JVM 객체의 문제
Java 객체의 오버헤드:
Integer i = 42;
int 값은 4바이트지만, Integer 객체는:
- Object header: ~16 바이트.
- Value field: 4 바이트.
- Padding: 4 바이트.
- 총 24 바이트.
6배 오버헤드. 수백만 객체가 있으면 메모리 낭비 심각.
String 더 심함:
String s = "hello";
// Object: 40+ bytes
// char[] 객체: 24 bytes + 2 * 5 = 34 bytes
// 총 60+ bytes
해결: Off-heap Binary Format
Tungsten은 데이터를 JVM 힙 밖의 바이너리 포맷에 저장:
Row: [age=30, name="Alice"]
Binary encoding (off-heap):
[bitmap][age:4bytes][name_offset:4bytes][name_length:4bytes]...[name_data]
- No JVM objects.
- No GC pressure.
- Cache-friendly layout.
- 직접 메모리 접근:
sun.misc.Unsafe.
UnsafeRow
Spark의 기본 row 표현:
public final class UnsafeRow extends MutableRow {
private Object baseObject; // byte[] or null (off-heap)
private long baseOffset;
private int sizeInBytes;
private int numFields;
// ...
}
각 필드 접근:
long getLong(int i) {
return Platform.getLong(baseObject, baseOffset + offsets[i]);
}
Direct memory access. JVM 객체 생성 없음.
Cache Friendliness
Tungsten 포맷은 연속 메모리:
Row 0: [col0][col1][col2]
Row 1: [col0][col1][col2]
Row 2: [col0][col1][col2]
특정 컬럼만 읽으면:
col0: 0, 12, 24, ... (stride: row size)
Spatial locality 나쁨. 그러나 일반 접근 패턴에선 여전히 빠름.
Vectorized reader (Parquet 등)는 columnar 포맷으로 직접 읽음 → 더 캐시 친화적.
메모리 관리
Tungsten은 자체 메모리 매니저:
- Execution memory: Shuffle, join, sort용.
- Storage memory: Cache (persist).
- 두 영역이 동적으로 서로 빌림.
- Off-heap + on-heap 지원.
Spark 설정:
spark.executor.memory = 8g
spark.memory.fraction = 0.6 # execution + storage
spark.memory.storageFraction = 0.5 # storage의 최소 보장
spark.memory.offHeap.enabled = true
spark.memory.offHeap.size = 4g
성능 효과
Tungsten 전후 벤치마크:
- 메모리 사용: 30~70% 감소.
- 쿼리 속도: 2~5배 향상.
- GC pause: 크게 감소.
특히 aggregation, join 등 메모리 집약적 작업에서 효과 큼.
5. Whole-Stage Code Generation
전통 Volcano 모델의 문제
전통 SQL 엔진은 Volcano 모델 (iterator-based):
while (child.hasNext()) {
Row row = child.next();
if (predicate(row)) {
process(row);
}
}
각 연산자가 next()를 호출. 함수 호출 오버헤드:
- Virtual function calls: CPU 파이프라인 깨짐.
- Object allocation: row 객체 계속 생성.
- Cache misses: 매번 다른 코드 경로.
단순 쿼리도 연산자 수에 비례하는 오버헤드.
아이디어: 통합 코드 생성
"여러 연산자를 하나의 함수로 병합". 예:
SELECT name FROM users WHERE age > 20
Volcano:
Scan → Filter → Project
세 개의 iterator 객체.
Codegen:
// 런타임에 생성된 코드
while (scanner.hasNext()) {
Row row = scanner.next();
if (row.getInt("age") > 20) {
emit(row.getString("name"));
}
}
한 개의 루프. 함수 호출 없음. 컴파일러가 최적화.
Whole-Stage CodeGen
Spark 2.0 (2016)의 Whole-Stage Code Generation (WSCG):
- Stage 단위로 합병: Shuffle 경계까지 모두.
- Java 소스 코드 생성: 런타임에.
- Janino로 즉시 컴파일.
- 직접 실행: JVM이 JIT 최적화.
결과:
- 함수 호출 감소 (virtual call → inlined).
- 중간 객체 제거.
- CPU pipeline 활용.
- 2~10배 성능 향상.
예시: 생성된 코드
SELECT id, name FROM users WHERE age > 20 AND country = 'KR'
생성된 Java 코드 (간소화):
public class GeneratedIterator {
private scan_input_0; // Parquet reader
public boolean hasNext() { return scan_input_0.hasNext(); }
public UnsafeRow next() {
while (scan_input_0.hasNext()) {
UnsafeRow row = scan_input_0.next();
int age = row.getInt(2);
if (age > 20) {
UTF8String country = row.getUTF8String(3);
if (country.equals("KR")) {
// project id, name
UnsafeRow output = /* build row */;
return output;
}
}
}
return null;
}
}
모든 것이 인라인. 루프 하나. JIT이 바이트코드를 네이티브로 최적화.
Vectorized Execution과의 차이
Whole-stage codegen은 row-at-a-time 기반이다. 반면 vectorized execution은 batch-at-a-time:
while (batch = next_batch()) { // 1024 rows
for (row : batch) {
// process
}
}
ClickHouse, Snowflake, DuckDB가 vectorized.
Spark의 접근은 둘 다 활용:
- Columnar scan: Parquet, ORC를 vectorized로 읽음.
- Row-based processing: Codegen으로 각 row 처리.
- Spark 3.2+: 일부 연산자에 vectorized 지원 추가.
EXPLAIN으로 확인
df.explain("codegen")
출력:
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
...
실제 생성된 Java 코드를 볼 수 있다. 디버깅에 유용.
6. Shuffle: 분산 집계의 핵심
왜 Shuffle이 필요한가
Wide dependency (groupBy, join 등)는 데이터 재분배가 필요하다:
Stage 1 (4 partitions):
P0: [(a,1), (b,2), (c,3)]
P1: [(a,4), (d,5)]
P2: [(b,6), (c,7), (e,8)]
P3: [(d,9), (a,10)]
reduceByKey:
모든 'a'는 같은 partition으로
모든 'b'는 같은 partition으로
...
Stage 2 (4 partitions after shuffle):
P0: [(a, [1,4,10])] → [(a,15)]
P1: [(b, [2,6])] → [(b,8)]
P2: [(c, [3,7])] → [(c,10)]
P3: [(d, [5,9]), (e, [8])] → [(d,14),(e,8)]
Shuffle은 stage 사이의 데이터 이동. 네트워크와 디스크 I/O 집약적.
Shuffle 과정
Write 단계 (Map 측):
- 각 task가 로컬 파일에 결과 쓰기.
- 각 reduce task에 해당하는 partition으로 분류.
- 디스크에 저장 (나중에 reduce가 fetch).
Read 단계 (Reduce 측):
- 모든 map task로부터 자기 partition 데이터 fetch.
- 네트워크로 전송 (executor 간).
- 메모리에 모음.
- 처리 (sort, aggregate 등).
Shuffle의 비용
Shuffle은 Spark의 가장 비싼 연산:
- 디스크 I/O: write + read.
- 네트워크: 모든 executor 간 cross.
- Serialization: 객체 ↔ 바이트.
- Sort: 보통 필요.
규모:
- 작은 job: 수 MB.
- 큰 job: 수 TB.
1 TB shuffle = 많은 시간과 자원.
Shuffle 전략의 진화
1. Hash Shuffle (초기 Spark):
- 각 map task가 각 reduce partition별로 별도 파일.
- M × R 파일 (M=map tasks, R=reduce tasks).
- 100 × 200 = 20,000 파일.
- 파일 시스템 부담.
2. Sort Shuffle (1.1+, 기본):
- 각 map task가 하나의 파일에 정렬된 결과.
- 인덱스 파일로 partition 위치 표시.
- M 파일만.
- 훨씬 효율적.
3. Tungsten Sort Shuffle (1.5+):
- Tungsten의 binary format 활용.
- 캐시 친화적.
- 더 빠름.
Broadcast Join
작은 테이블을 조인할 때의 최적화:
val small = ... // 10 MB
val big = ... // 1 TB
val joined = big.join(broadcast(small), "key")
Broadcast:
- Driver가 small 테이블을 모든 executor에 복사.
- 각 executor가 big의 자기 partition과 로컬에서 조인.
- Shuffle 없음.
조건: 작은 테이블이 메모리에 들어가야. 기본 spark.sql.autoBroadcastJoinThreshold = 10 MB.
Shuffle join 대비: 수 배~수십 배 빠름.
Shuffle Partitioning
spark.sql.shuffle.partitions: shuffle 후 partition 수. 기본 200.
너무 작음:
- 큰 partition.
- 일부 task의 OOM.
- 병렬성 부족.
너무 큼:
- 많은 작은 파일.
- 스케줄링 오버헤드.
가이드: 각 partition이 100~200 MB가 되도록.
Shuffle 최적화 기법
1. 파일 수 줄이기:
df.coalesce(10).write.parquet(...)
작은 partition들을 병합.
2. Partitioning 조정:
df.repartition(50, $"key") // key 기반 재분할
3. Broadcast 강제:
val hint_df = big.join(broadcast(small), ...)
// 또는 SQL hint
// SELECT /*+ BROADCAST(small) */ ...
4. Skew 처리: 다음 섹션.
7. Adaptive Query Execution (AQE)
Spark 3.0의 킬러 기능
Adaptive Query Execution (AQE): 런타임에 수집한 통계를 바탕으로 쿼리 계획을 동적으로 재최적화.
spark.sql.adaptive.enabled = true # 3.2+부터 기본 true
주요 기능
1. Dynamically coalescing shuffle partitions:
전통적 문제:
spark.sql.shuffle.partitions = 200을 정해둠.- 작은 shuffle은 200 partition이 과도.
- 큰 shuffle은 부족.
AQE:
- Shuffle 후 실제 데이터 크기 확인.
- 작은 partition을 자동 병합.
- 예: 200 → 20 (각 partition이 충분한 크기).
2. Dynamically switching join strategies:
계획 시:
SortMergeJoin결정 (두 테이블 크기 추정).
실행 중:
- 실제 한 테이블이 예상보다 작은 것 판명.
- 동적으로 BroadcastHashJoin으로 전환.
3. Dynamically optimizing skew joins:
Skew 감지:
- 특정 key의 partition이 평균보다 훨씬 큼.
AQE:
- 큰 partition을 여러 sub-partition으로 분할.
- 각각 별도 task로 실행.
- 결과를 union.
실측 효과
Databricks 벤치마크 (TPC-DS):
- AQE 전: 100%.
- AQE 후: 130% 이상.
- 일부 쿼리: 3배+ 향상.
거의 공짜로 얻는 성능. AQE는 Spark 3.0의 가장 중요한 개선 중 하나.
8. Dynamic Partition Pruning
문제
Star schema의 fact와 dimension join:
SELECT f.amount
FROM fact_sales f JOIN dim_date d ON f.date_id = d.id
WHERE d.year = 2024
전통:
fact_sales전체 스캔 (테라바이트 급).dim_date필터 (year=2024).- Join.
fact_sales는 date_id로 partitioned되어 있어도 전체 스캔. Filter가 dim에만 있어서.
AQE + DPP
Spark 3.0의 Dynamic Partition Pruning:
dim_date에서 year=2024인 id 먼저 수집.- 그 id들을 fact_sales의 partition filter로 pushdown.
fact_sales스캔 시 해당 partition만 로드.
효과:
- 수 TB → 수 GB.
- 10~100배 빠른 쿼리.
조건
- 파티션된 테이블.
- Dimension 필터가 highly selective.
- Spark 3.0+.
9. 실전 튜닝
메모리 설정
--conf spark.executor.memory=16g
--conf spark.executor.memoryOverhead=2g
--conf spark.executor.cores=4
--conf spark.executor.instances=10
원칙:
- Executor당 cores 4~6: GC, I/O 균형.
- Memory overhead = max(384MB, 10% of memory).
- Instances: 클러스터 용량과 workload에 따라.
Shuffle 튜닝
--conf spark.sql.shuffle.partitions=200 # 기본, 조정
--conf spark.sql.adaptive.enabled=true
--conf spark.sql.adaptive.coalescePartitions.enabled=true
--conf spark.sql.adaptive.skewJoin.enabled=true
Partition 수 계산:
- 총 데이터 / 목표 크기 (200 MB).
- 예: 1 TB → 5000 partitions.
Broadcast 임계값
--conf spark.sql.autoBroadcastJoinThreshold=100MB
기본 10 MB는 너무 작을 수 있음. 100 MB까지 올리면 더 많은 broadcast join.
주의: Executor 메모리 확인. Broadcast 테이블이 모든 executor에 복제되므로.
Cache 전략
df.cache() // MEMORY_AND_DISK (기본)
df.persist(StorageLevel.MEMORY_ONLY)
df.persist(StorageLevel.MEMORY_AND_DISK_SER) // 직렬화 (작음)
언제 cache:
- 여러 번 사용되는 DataFrame.
- 계산 비용 큰 DataFrame.
주의: 메모리 부족하면 disk로 spill. 재계산이 더 나을 수 있음.
파티셔닝
Partitioned write:
df.write.partitionBy("year", "month").parquet(...)
파일을 year/month별 디렉토리로 저장. 쿼리 시 partition pruning으로 빠른 읽기.
주의: 너무 많은 partition은 작은 파일 문제.
Skew 수동 처리
AQE가 부족하면 수동:
// Salting
val salted = df.withColumn("salted_key",
concat($"key", lit("_"), (rand() * 10).cast("int")))
// Join 후 다시 그룹화
10. 성능 디버깅
Spark UI
Spark Web UI가 가장 중요한 도구:
- Jobs: Action별 실행.
- Stages: Stage별 task 분포.
- SQL/DataFrame: 논리/물리 계획.
- Executors: 리소스 사용.
- Storage: Cache 상태.
주의할 Metric
Task duration 분포:
- Median과 Max 차이가 크면 skew.
- 99th percentile이 나쁘면 병목.
Shuffle read/write:
- 크기가 예상보다 크면 filter pushdown 실패.
- 작으면 효율적.
GC time:
- Task 시간의 10% 이상이면 GC 문제.
- Memory 증설 또는 parallelism 증가.
Explain 명령어
df.explain() // 물리 계획
df.explain(true) // 4단계 모두
df.explain("cost") // CBO 비용
df.explain("codegen") // 생성된 코드
df.explain("formatted") // 보기 쉬운 포맷
Spark SQL Shell
빠른 테스트:
spark-sql
> SELECT ...
대화형 쿼리와 EXPLAIN.
11. 흔한 함정과 교훈
함정 1: 너무 많은 작은 파일
증상: 작은 작업이 수 분 걸림.
원인: 수백만 개 작은 파일.
해결: coalesce 또는 repartition으로 병합.
함정 2: Skew
증상: 99% task는 빠르고 1%가 영원히.
원인: 한 key가 압도적으로 많음 (예: user_id = null).
해결:
- AQE의 skew join 활성화.
- Null key 필터.
- Salting.
함정 3: Broadcast OOM
증상: Executor OOM 발생.
원인: Broadcast 테이블이 너무 큼.
해결:
spark.sql.autoBroadcastJoinThreshold낮춤.- 명시적
broadcast()제거. - Sort-merge join으로.
함정 4: 너무 많은 Shuffle Partition
증상: 작업이 느리고 많은 작은 task.
원인: spark.sql.shuffle.partitions 너무 큼.
해결: AQE coalesce 활성화, 또는 수동 조정.
함정 5: Python UDF의 오버헤드
증상: PySpark 작업이 극도로 느림.
원인: Python UDF는 JVM ↔ Python 직렬화 왕복.
해결:
- SQL 내장 함수 사용.
- Pandas UDF (vectorized, Apache Arrow 사용).
@pandas_udf("double")
def my_udf(s: pd.Series) -> pd.Series:
return s * 2
일반 UDF보다 10배 이상 빠름.
함정 6: 잘못된 Partition 수
증상: Under-utilization 또는 OOM.
원인:
- 너무 적음: 큰 task, OOM.
- 너무 많음: 오버헤드.
해결: 각 partition 100~200 MB 원칙.
퀴즈로 복습하기
Q1. RDD의 "lineage"가 어떻게 장애 복구를 가능하게 하는가?
A.
Lineage의 의미: RDD는 데이터 자체가 아닌 **"어떻게 만들어졌는지의 기록"**을 유지한다.
val rdd1 = sc.textFile("hdfs://data.txt") // source
val rdd2 = rdd1.map(_.toUpperCase) // parent: rdd1
val rdd3 = rdd2.filter(_.startsWith("A")) // parent: rdd2
val rdd4 = rdd3.flatMap(_.split(",")) // parent: rdd3
각 RDD는 부모 RDD와 변환 함수를 기억한다. 이 관계를 이어가면 원천 데이터 소스부터 현재까지의 전체 계보가 된다.
장애 복구 원리:
시나리오: rdd4.collect() 실행 중 executor 하나가 죽음. 그 executor가 처리하던 rdd4의 partition 3이 사라짐.
복구 과정:
- 손실 감지: Driver가 task 실패 감지.
- Lineage 추적: rdd4 partition 3 → rdd3 partition 3 → rdd2 partition 3 → rdd1 partition 3.
- 재계산: 다른 executor에서 원천부터 다시 계산.
- HDFS에서 원본 텍스트 partition 3 읽기.
- map 적용.
- filter 적용.
- flatMap 적용.
- 완료: 손실된 partition 복구.
Key: Partition 단위 재계산. 전체 RDD가 아닌 손실된 partition만. 다른 partition들은 영향 없음.
Replication과의 비교:
Traditional fault tolerance (replication):
- 모든 데이터를 여러 노드에 복제.
- 네트워크 I/O 많음 (복제 시).
- 디스크 사용 2~3배.
- 장애 시 즉시 다른 복제본 사용.
RDD lineage:
- Replication 없음. 데이터 한 번만.
- 네트워크/디스크 절약.
- 장애 시 재계산 (시간 걸림).
- 메모리 기반 연산과 궁합 좋음.
왜 Spark의 선택은 lineage인가:
Spark는 반복 연산(ML, 그래프)을 타겟으로 했다. 이런 워크로드는:
- 데이터가 메모리에 있어 빠름.
- Lineage 재계산도 (메모리니까) 빠름.
- Replication 비용이 반복적 쓰기로 누적.
Lineage의 한계:
1. Lineage가 길면 재계산이 오래:
rdd1 → rdd2 → rdd3 → ... → rdd100 → action
rdd99 partition 손실 시, rdd1부터 재계산. 수 분~시간 가능.
해결: Checkpoint:
rdd50.checkpoint()
- rdd50을 디스크에 저장.
- Lineage를 이 지점에서 "잘라냄".
- 이후 복구는 rdd50부터.
2. Wide dependency의 문제:
rdd1 (4 partitions) --shuffle--> rdd2 (4 partitions)
rdd2 partition 3 손실 → rdd1의 모든 partition 필요 (shuffle이라 재분배). 비용이 큼.
해결: Shuffle 결과를 영구 저장. Spark가 기본으로 함 (shuffle files가 로컬 디스크에 남음).
3. 외부 시스템 부작용:
rdd.foreach(x => saveToDatabase(x))
재계산 시 중복 저장 위험. Lineage는 순수 함수 가정.
해결: Idempotent 연산, 또는 정확히 한 번 semantic을 별도 구현.
교훈:
RDD lineage는 **"replication 없이 내결함성"**을 가능하게 했다. 이것이 Spark가 MapReduce 대비 100배 빠를 수 있었던 기반이다. 메모리 연산 + 경량 복구 = 빅데이터의 새 패러다임.
현대 적용:
- RDD lineage → DataFrame, Dataset 아래에서도 유지.
- Checkpoint → 장기 실행 job에서 중요.
- Structured Streaming에서도 비슷한 아이디어 (log-based recovery).
분산 시스템 철학적 관점:
Lineage는 "상태를 저장하지 말고, 만드는 방법을 저장하라" 는 접근. 이는 함수형 프로그래밍의 철학과 일치한다:
- Immutable: RDD는 변하지 않음.
- Referential transparency: 같은 입력 → 같은 출력 (재계산 가능).
- Composition: RDD들의 체인.
Spark는 분산 컴퓨팅에 함수형 원칙을 적용해서 새 영역을 열었다. 그 단순하고 우아한 아이디어가 10년 넘게 이 분야를 이끌고 있다.
Q2. Catalyst Optimizer가 어떻게 쿼리를 "rewrite"하여 성능을 개선하는가?
A.
Catalyst의 핵심 아이디어: SQL/DataFrame 쿼리를 tree 로 표현하고, 수십~수백 개의 rewrite rule을 반복 적용해서 더 효율적인 tree로 변환한다.
4단계 처리 과정:
1. Parsing → Unresolved Logical Plan
SELECT name FROM users WHERE age > 20
↓
Project [name]
Filter (age > 20)
UnresolvedRelation("users")
2. Analysis → Resolved Logical Plan
Catalog 참조로:
users→ 실제 테이블 스키마.name,age→ 컬럼 참조.- 타입 검증.
Project [name#10]
Filter (age#11 > 20)
Relation[name#10, age#11, email#12] parquet
3. Logical Optimization → Optimized Logical Plan
이 단계가 진짜 마법. 많은 rule이 반복 적용된다:
Rule 1: Projection Pushdown
Project [name#10]
Filter (age#11 > 20)
Relation[name#10, age#11, email#12] ← email 불필요
↓
Project [name#10]
Filter (age#11 > 20)
Project [name#10, age#11] ← 필요한 컬럼만
Relation[name, age, email]
이점: Parquet reader가 name, age 컬럼만 읽음. email 스킵. I/O 절약.
Rule 2: Filter Pushdown to Data Source
Parquet/ORC는 predicate pushdown을 지원한다:
Filter (age > 20)
Relation(parquet)
↓
Relation(parquet) WITH FILTER (age > 20)
Parquet reader가 min/max 통계를 보고 row group 전체를 스킵. 수십 배 빠른 읽기.
Rule 3: Constant Folding
WHERE age > 18 + 2
↓
WHERE age > 20
매 행마다 18+2를 계산하지 않음.
Rule 4: Simplification
WHERE true AND x > 10
↓
WHERE x > 10
WHERE x > 10 OR false
↓
WHERE x > 10
Rule 5: Join Reorder
SELECT * FROM a JOIN b ON a.id = b.id JOIN c ON b.cid = c.id
WHERE a.country = 'KR'
CBO (Cost-Based Optimizer)가 통계를 보고 최적 순서 결정:
a를 먼저 필터 (country='KR').- 작아진
a와b조인. - 결과와
c조인.
Catalyst는 지수 개의 조인 순서를 탐색하고 가장 싼 것 선택.
Rule 6: Subquery Unnesting
SELECT * FROM orders
WHERE user_id IN (SELECT id FROM users WHERE country = 'KR')
↓
SELECT o.* FROM orders o
INNER JOIN (SELECT id FROM users WHERE country = 'KR') u
ON o.user_id = u.id
Semi-join으로 변환. 훨씬 빠름.
Rule 7: Column Pruning Through Join
SELECT a.name FROM a JOIN b ON a.id = b.aid
b에서는 aid만 필요 (조인 키). 다른 컬럼 스킵.
Rule 8: Decimal 연산 최적화
SELECT SUM(price * quantity) FROM sales
Decimal 곱셈은 비쌈. Catalyst는 overflow를 고려해서 필요한 정밀도만 사용.
4. Physical Planning
논리 계획 → 실행 연산자:
Project → org.apache.spark.sql.execution.ProjectExec
Filter → FilterExec
Join → SortMergeJoinExec (또는 BroadcastHashJoinExec 등)
Aggregate → HashAggregateExec
여러 물리 계획을 생성하고 cost를 비교해서 선택.
예: Join 전략 선택
SELECT * FROM big JOIN small ON big.id = small.id
Catalyst는 다음을 고려:
small크기 <spark.sql.autoBroadcastJoinThreshold(기본 10MB)?- Yes: BroadcastHashJoin 선택 (small을 모든 executor에 복사, shuffle 없음).
- No: SortMergeJoin 선택 (양쪽 shuffle).
Broadcast join은 수 배~수십 배 빠르다.
실전 효과:
간단한 쿼리:
SELECT u.name, SUM(o.amount)
FROM users u JOIN orders o ON u.id = o.user_id
WHERE u.country = 'KR'
GROUP BY u.name
최적화 전 (naive 실행):
- users 전체 스캔.
- orders 전체 스캔.
- 전체 join.
- filter (country='KR').
- group by.
1 TB orders 스캔. 수십 분.
최적화 후 (Catalyst):
- users에 filter pushdown (country='KR' → 10% 데이터만).
- Projection pushdown (id, name만 읽음).
- orders도 필요 컬럼만 읽음 (user_id, amount).
- Broadcast join (users가 충분히 작아서).
- group by.
I/O 90% 절약. 10배 빠른 쿼리.
Catalyst의 확장성:
Catalyst의 rule은 사용자 정의 가능:
object MyRule extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
case Filter(p, child) => /* custom logic */
}
}
spark.experimental.extraOptimizations = Seq(MyRule)
이 확장성이 Databricks의 Delta Lake, Iceberg 같은 프로젝트의 기반.
교훈:
Catalyst는 **"규칙 기반 트리 변환"**이라는 단순한 아이디어로 거대한 성과를 냈다. 각 rule은 단순하지만 조합되면 놀라운 최적화.
전통 DB (PostgreSQL, MySQL)도 쿼리 optimizer가 있지만 Catalyst의 강점:
- Scala tree API로 쉬운 rule 작성.
- Extensible: 외부 rule 추가.
- DataFrame API와 통합: SQL이 아닌 코드도 최적화.
결과: 기본 SQL을 작성해도 손으로 최적화한 것과 비슷한 성능. 이것이 Spark가 데이터 엔지니어에게 사랑받는 이유다.
Q3. Tungsten이 JVM 객체 대신 binary format을 쓰는 이유와 그 효과는?
A.
JVM 객체의 숨은 비용:
평범한 Java 코드:
Integer age = 30;
String name = "Alice";
메모리 사용을 보면:
Integer: 16 bytes header + 4 bytes value + 4 bytes padding = 24 bytes (4 bytes 데이터에)String "Alice":- String 객체: ~40 bytes
- 내부
char[]: 16 bytes header + 10 bytes (5 chars) + padding = ~32 bytes - 총: ~72 bytes (5 chars 데이터에)
실제 데이터 (int 4 + 5 chars = 9 bytes)가 96 bytes를 차지. 10배 오버헤드.
빅데이터에선 이것이 재앙:
1억 row에 5개 필드가 있다면:
- 실제 데이터: ~5 GB.
- JVM 객체로: ~50 GB.
메모리 부족. GC pressure. 캐시 miss.
Tungsten의 해결:
Row를 연속된 바이트 배열에 저장:
Row(age=30, name="Alice", email="a@b.c")
Tungsten format:
[null_bitmap:8B][age:8B][name_ptr:8B][email_ptr:8B][name:"Alice"][email:"a@b.c"]
- 고정 크기 필드 (int, long, double): 인라인.
- 가변 크기 필드 (string, array): pointer + 뒷부분에 실제 데이터.
- Null bitmap: 각 필드의 null 여부.
- Header 없음: 순수 데이터만.
결과:
- 5개 필드 row: ~40 bytes (JVM 객체 ~200 bytes에서).
- 5배 메모리 절약.
UnsafeRow:
Spark의 구현 클래스:
public class UnsafeRow extends BaseMutableRow {
private Object baseObject; // byte[] 또는 null (off-heap)
private long baseOffset; // 시작 위치
private int numFields;
private int sizeInBytes;
}
직접 메모리 접근 (sun.misc.Unsafe):
public long getLong(int ordinal) {
assertIndexIsValid(ordinal);
return Platform.getLong(baseObject, baseOffset + ordinal * 8L);
}
JVM을 거치지 않고 포인터 산술로 바로 메모리 접근. Unsafe는 이름대로 안전장치를 건너뛴다. JVM이 검증 안 함.
효과 1: 메모리 절약
1억 row, 10 필드 DataFrame:
- JVM object: ~30 GB.
- Tungsten: ~5 GB.
6배 적은 메모리. 같은 클러스터에서 6배 큰 데이터 처리 가능.
효과 2: GC pressure 제거
JVM GC는 많은 작은 객체에서 느려진다. Young generation을 계속 순회해야.
Tungsten은 적은 수의 큰 배열만 사용:
- Off-heap: GC 대상 아예 아님.
- On-heap: GC 하더라도 객체 수가 적음.
결과: Full GC 빈도 대폭 감소. Pause time 대폭 감소.
효과 3: Cache friendliness
CPU cache는 연속된 메모리를 선호 (spatial locality). 각 cache line은 64 bytes.
JVM 객체:
Integer 1 @ address 0x1000
Integer 2 @ address 0x8000 ← 멀리 떨어짐 (heap 할당 랜덤)
Integer 3 @ address 0x4000
하나 읽을 때마다 cache miss 가능성. 수십 ~ 수백 cycles 낭비.
Tungsten:
[field1][field2][field3][field4]... 연속
한 번의 메모리 접근이 여러 필드를 cache에 로드. 다음 접근은 hit.
성능: 2~5배 향상 (메모리 집약적 워크로드).
효과 4: Direct serialization
UnsafeRow는 이미 byte array. 네트워크로 전송 시:
- Java 객체: Kryo/Java serialization 필요 (느림).
- UnsafeRow: 그대로 전송 (zero serialization).
Shuffle이 훨씬 빠름.
효과 5: Vectorized 처리의 기반
연속 메모리는 SIMD (벡터화 명령어) 친화적:
// 일반
for (int i = 0; i < n; i++) {
result[i] = a[i] + b[i];
}
// SIMD (4개 동시 처리)
for (int i = 0; i < n; i += 4) {
_mm_store_ps(result + i, _mm_add_ps(_mm_load_ps(a + i), _mm_load_ps(b + i)));
}
Tungsten의 연속 메모리가 SIMD의 기반. JVM은 자체적으로 SIMD를 잘 활용하지 못하지만, 데이터 레이아웃이 친화적이면 JIT이 인식.
비용과 제약:
복잡도: Tungsten은 매우 낮은 레벨이다. 버그가 있으면 JVM crash. Spark 팀이 조심스럽게 개발.
디버깅 어려움: JVM heap dump, profiler가 덜 유용.
특정 연산의 오버헤드: 가변 크기 필드(string) 수정이 복잡.
Python 호환: PySpark에서 UnsafeRow를 Python 객체로 변환할 때 비용.
Arrow와의 협력:
최근 Spark는 Apache Arrow와 통합:
- Arrow: 메모리 columnar 포맷.
- Python ↔ JVM 교환 시 Arrow 사용.
- Pandas UDF가 Arrow 덕분에 10배 빠름.
Tungsten과 Arrow는 서로 보완. Tungsten은 row-wise 처리, Arrow는 column-wise 교환.
Spark 3.x의 Vectorized Execution:
Spark 3.0+는 일부 연산자에 columnar 처리 도입:
- Parquet/ORC vectorized reader.
- Columnar aggregation.
- 4096 rows batch 단위.
Tungsten row format에서 columnar batch로 진화. DuckDB, ClickHouse의 방식을 따라감.
교훈:
Tungsten은 Spark의 **"underlying 재발명"**이다. 표면 API는 그대로지만 내부가 완전히 바뀌었다. 사용자는 변경 없이 2~10배 성능 향상.
이는 **"하드웨어를 존중하는 설계"**의 중요성을 보여준다:
- CPU cache가 싫어하는 것 (random access) 피하기.
- GC의 약점 (작은 객체) 회피.
- Memory bandwidth 아끼기.
JVM이 "추상화"를 제공하지만, 고성능이 필요하면 그 추상화를 영리하게 우회해야 한다. Tungsten은 "Java/Scala 속에서 C 수준 성능을 낸다"는 도전의 답이다.
모든 빅데이터 엔진이 결국 같은 방향으로 진화: 바이너리 포맷, 연속 메모리, 벡터화. Tungsten은 Spark의 해답이었고, 같은 아이디어가 Flink, Presto, Snowflake에서도 발견된다. 수렴 진화의 예다.
Q4. Whole-Stage Code Generation이 어떻게 기존 Volcano 모델보다 빠른가?
A.
전통 Volcano 모델:
1970년대부터 쓰인 query execution model. 각 연산자는 iterator:
interface QueryOperator {
Row next();
boolean hasNext();
}
예시: SELECT name FROM users WHERE age > 20
Project ← Filter ← Scan
실행:
Project.next() {
while (true) {
Row row = Filter.next();
if (row == null) return null;
return new Row(row.getString("name"));
}
}
Filter.next() {
while (true) {
Row row = Scan.next();
if (row == null) return null;
if (row.getInt("age") > 20) return row;
}
}
Scan.next() {
return disk.readNextRow();
}
각 연산자가 독립 객체. next()를 계속 호출.
Volcano의 비용:
-
Virtual function calls:
next()는 인터페이스 메소드. JVM은 virtual dispatch 사용 → inline 어려움. -
Object allocation: 매 row마다 새 Row 객체 생성 가능성. GC 부담.
-
Branch prediction 실패: 연산자마다 다른 코드 경로 실행. CPU pipeline 예측 실패.
-
Cache miss: 매 연산자의 코드가 다른 메모리 위치.
-
Boxing: Java의 경우
Integer같은 wrapper 사용 → heap 할당.
결과: 단순 쿼리도 Volcano 오버헤드만으로 2-5배 느림.
Whole-Stage Code Generation의 답:
아이디어: "여러 연산자를 하나의 함수로 병합". 런타임에 Java 코드를 생성하고 컴파일.
예시: 같은 쿼리
Volcano (의사 코드):
for each row from Scan:
call Filter.next(row)
call Project.next(filter_output)
output project_output
3개 iterator, 수많은 함수 호출.
WSCG가 생성하는 코드:
public class GeneratedIterator {
public Object[] next() {
while (scanner.hasNext()) {
InternalRow row = scanner.next();
int age = row.getInt(2); // age 컬럼
if (age > 20) { // filter inlined
UTF8String name = row.getUTF8String(1); // name 컬럼
return new Object[]{ name }; // project inlined
}
// 조건 불만족, 다음 row로
}
return null; // 끝
}
}
핵심 차이:
-
단일 함수: Scan, Filter, Project가 한 함수의 내부. 함수 호출 없음.
-
Inline: 컴파일러(JVM JIT)가 모든 것을 인라인. Virtual call 제거.
-
직접 메모리 접근:
row.getInt(2)는 Tungsten의Unsafe.getInt(address). -
단순한 loop: 하나의 while 루프. CPU pipeline 친화적.
성능 향상:
Databricks 벤치마크 (TPC-DS):
- Volcano: 100%.
- WSCG: 400% ~ 1000%.
4-10배 향상. 마법이 아니라 오버헤드 제거.
어떻게 작동하는가:
-
Spark의 최적화 단계:
- Logical plan → Physical plan.
- 여러 연산자를 stage로 그룹화 (narrow dependency 내).
- 각 stage가 WSCG 대상.
-
Code generation:
- Physical plan 트리를 순회.
- 각 연산자가 자기 코드 조각 제공.
- 합쳐서 Java source code 생성.
-
Compilation:
- Janino (가벼운 Java compiler)로 컴파일.
- 바이트코드 생성.
ClassLoader에 로드.
-
Execution:
- 컴파일된 클래스의 인스턴스 생성.
- 일반 Java 메소드처럼 호출.
- JVM JIT가 최적화 (hot path → native code).
실전 예시:
spark.range(1, 1000000)
.filter($"id" > 500000)
.select($"id" * 2)
.count()
EXPLAIN 결과:
*(1) Filter (id#0L > 500000)
+- *(1) Range (1, 1000000, step=1, splits=...)
*(1)이 WSCG stage 번호. Range, Filter, Project가 하나의 코드로 통합.
explain("codegen")으로 실제 생성된 코드 확인 가능:
public Object generate(Object[] references) {
return new GeneratedIteratorForCodegenStage1(references);
}
final class GeneratedIteratorForCodegenStage1 extends BufferedRowIterator {
// ... 수백 줄의 생성된 코드
}
제약:
-
64KB 메소드 한계: JVM의 단일 메소드가 64KB 초과 불가. 너무 많은 연산자 병합 시 분할.
-
복잡한 연산자: Window function, UDF 등은 codegen 어려움. Fall back to Volcano.
-
디버깅 어려움: 생성된 코드를 디버거로 추적 힘듦.
-
컴파일 오버헤드: 매 쿼리마다 컴파일. 작은 쿼리엔 역효과 가능.
WSCG vs Vectorized:
Spark의 WSCG는 row-at-a-time + inline. 또 다른 접근:
Vectorized Execution (ClickHouse, DuckDB, Presto 일부):
- Batch 단위 (1024 rows).
- SIMD 명령 활용.
- Columnar 처리.
장점: SIMD의 힘. 단점: 구현 복잡.
Spark는 두 접근을 조합:
- Parquet reader: vectorized (columnar).
- 연산자: WSCG (row-wise).
- 향후: 더 많은 vectorization 도입 중.
Hyper / Umbra 비교:
독일 TU Munich의 HyPer/Umbra 연구 시스템은 더 급진적:
- LLVM IR로 컴파일.
- 네이티브 코드 (JIT 없음).
- 성능이 C++ 손 작성 코드 수준.
Spark WSCG는 Java 바이트코드까지. JVM JIT에 의존. Umbra만큼은 아니지만 훨씬 간단하고 이식성 있다.
Rough numbers:
- Volcano: baseline.
- WSCG: 4-10x faster.
- Vectorized: 4-10x faster.
- LLVM-based: 10-20x faster.
Spark는 "충분히 빠르면서 구현 가능"의 균형을 찾았다.
교훈:
WSCG는 전통 지혜의 재평가다. Volcano 모델이 40년간 지배한 이유는 우아함과 유연성이지만, 현대 CPU 아키텍처 (deep pipeline, cache, SIMD)에선 적합하지 않다.
해결: "연산자 추상화를 유지하면서도 런타임에 구체화". 이것이 codegen의 본질.
이는 "지연된 optimization" 철학이다:
- 사용자는 high-level 코드 작성.
- 시스템이 물리적 최적화.
- 컴파일러의 전통적 역할을 런타임으로.
JIT 컴파일러가 수십 년간 해온 일이지만, 빅데이터에 적용한 것은 Spark가 처음 크게 성공시켰다. 이후 거의 모든 현대 데이터 엔진이 이 패턴을 따르고 있다.
현대 SQL 엔진의 성능은 하드웨어 성능의 몇 % 를 사용하는가의 경쟁이다. Volcano는 10%, WSCG는 40%, vectorized는 60%, LLVM-based는 80%. 각 진화가 이 수치를 밀어올린다.
Q5. Adaptive Query Execution (AQE)이 실제 실행 통계를 어떻게 활용하는가?
A.
Static optimization의 한계:
전통 쿼리 optimizer (Catalyst 포함)는 실행 전에 최적화를 결정한다. 근거는:
- 통계 (
ANALYZE TABLE). - 데이터 소스 메타데이터.
- Schema.
문제: 통계가 부정확하거나 최신이 아닐 수 있다:
- 데이터가 변경됨 (ANALYZE 이후).
- 통계 계산이 비쌈 (대용량).
- 복잡한 조건의 selectivity 추정 어려움.
- Join 결과 크기 예측 어려움.
결과: 잘못된 계획 선택 → 느린 쿼리.
AQE의 기본 아이디어:
"Stage 경계에서 실제 통계를 수집하고, 남은 계획을 재최적화"
Spark는 이미 shuffle 단계에서 중간 결과가 디스크에 저장된다. 이 데이터의 실제 크기를 측정해서 남은 단계를 최적화할 수 있다.
AQE의 세 가지 최적화:
1. Dynamically Coalescing Shuffle Partitions
문제: spark.sql.shuffle.partitions 기본값 200. 실제 데이터 크기와 무관.
시나리오:
- 작은 table filter 결과: 100 MB.
- 200 partition = 각 500 KB.
- 200개의 매우 작은 task. 오버헤드 비율 높음.
AQE 해결:
- Shuffle 완료 후 실제 partition 크기 측정.
- 작은 partition들을 병합.
- 예: 200 → 20 (각 ~5 MB).
- Task 수 감소, 오버헤드 감소, 성능 향상.
설정:
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.coalescePartitions.minPartitionSize=1MB
spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB
2. Dynamically Switching Join Strategies
문제: Join 전략 선택 시 table 크기 추정이 부정확.
시나리오:
SELECT * FROM big JOIN medium ON big.id = medium.id
WHERE medium.status = 'active'
계획 시 추정: medium 크기 = 100 MB → SortMergeJoin 선택.
실제 실행: 필터 후 medium.active = 5 MB. Broadcast가 훨씬 나았을 것.
AQE 해결:
- Medium 필터 결과를 shuffle.
- 실제 크기가 5 MB임을 확인.
- BroadcastHashJoin으로 동적 전환.
- 수 배 빠름.
예시:
Before AQE: 120 seconds (SortMergeJoin)
After AQE: 25 seconds (auto-switched to BroadcastHashJoin)
설정:
spark.sql.adaptive.autoBroadcastJoinThreshold=10MB
3. Dynamically Optimizing Skew Joins
문제: Data skew. 일부 key가 과도하게 많음.
시나리오:
SELECT * FROM orders JOIN users ON orders.user_id = users.id
- 대부분 user는 10~100 orders.
- 한 user (봇, 관리자)가 1,000,000 orders.
- 해당 partition이 100배 크다.
- 그 task가 100배 오래 걸림.
- 나머지 tasks는 빨리 끝나지만 skewed task 기다림.
전통적 해결: Salting (수동, 복잡).
AQE 해결:
- Shuffle 후 각 partition 크기 확인.
- 평균의 5배 이상 크면 skewed로 판단.
- Skewed partition을 여러 sub-partition으로 분할.
- 각 sub-partition이 다른 task에서 처리.
- 반대쪽 partition도 복제 (broadcast 필요).
예시:
Before:
Task 1: 1 GB (skewed user) → 100 seconds
Task 2: 10 MB → 1 second
Task 3: 10 MB → 1 second
...
Total time: 100 seconds (Task 1 bound)
AQE divides Task 1 into 10 sub-tasks:
Task 1a: 100 MB → 10 seconds
Task 1b: 100 MB → 10 seconds
...
Task 2: 10 MB → 1 second
Total time: 10 seconds (10x faster)
설정:
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB
4. Dynamic Partition Pruning (DPP) (AQE와 함께 발전)
Star schema의 fact + dimension join:
SELECT f.amount
FROM fact_sales f JOIN dim_date d ON f.date_id = d.id
WHERE d.year = 2024
전통: fact_sales 전체 스캔 + join + filter.
DPP:
dim_date에서 year=2024인 id 먼저 추출.- 이 id들을 fact_sales의 partition filter로 pushdown.
fact_sales스캔 시 해당 partition만 로드.
효과:
- 파티션된 fact_sales (year별)이면 1 TB → 50 GB.
- 20배 빠른 쿼리.
실측 효과:
Databricks TPC-DS 벤치마크:
- AQE 비활성: 100% (baseline).
- AQE 활성: 130~200%.
- 일부 쿼리: 3배+ 향상.
- 평균: 1.8배.
거의 공짜. 기존 코드 변경 없이.
Spark 버전별:
- Spark 3.0: AQE 도입 (기본 비활성).
- Spark 3.2: AQE 기본 활성화.
- Spark 3.3+: 개선 지속.
내부 작동:
AQE는 Spark의 query execution을 stage 단위로 실행하되, stage 사이에 pause한다:
- Stage 1 실행.
- Stage 1 완료 → shuffle 파일 생성.
- Shuffle 파일의 실제 통계 수집.
- Stage 2의 physical plan을 re-optimize.
- Stage 2 실행.
- 반복.
이 stage 단위 통계 → 재최적화 루프가 AQE의 핵심.
한계와 함정:
1. Streaming에 제한적: Structured Streaming은 AQE 덜 적용됨.
2. 작은 쿼리엔 불필요: Re-optimization 오버헤드가 작은 쿼리에선 역효과.
3. 복잡한 쿼리 예측 어려움: 매우 복잡한 쿼리는 여전히 수동 튜닝 필요.
4. DPP와의 상호작용: 복잡한 경우 예측 안 되는 계획 변경.
실전 권장:
# 기본 설정 (Spark 3.2+는 이미 기본)
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
# 파라미터 조정
spark.sql.adaptive.advisoryPartitionSizeInBytes=128MB # 목표 partition 크기
spark.sql.adaptive.coalescePartitions.minPartitionSize=32MB # 최소
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 # skew 기준
교훈:
AQE는 "optimization은 한 번에 끝나지 않는다" 는 철학의 구현이다. 전통 optimizer는 ahead-of-time (실행 전 모든 결정), AQE는 just-in-time (필요 시 재최적화).
이는 JIT 컴파일러의 철학과 같다:
- 처음엔 inexpert 결정.
- 런타임 관찰.
- 최적화.
분산 데이터 처리에서 같은 원칙 적용. Spark가 선구자였고, 이후 Trino, Presto도 유사 기능 추가.
**"정적 vs 동적 최적화"**의 트레이드오프:
- 정적: 단순, 예측 가능. 완전 최적은 아닐 수 있음.
- 동적: 복잡, 오버헤드. 실제 데이터에 적응.
AQE는 이 둘 사이의 실용적 타협이다. 완벽한 동적 재계획은 아니지만, stage 경계에서 핵심 결정을 조정함으로써 대부분의 이득을 얻는다.
미래:
AQE는 계속 발전 중:
- Multi-stage re-optimization.
- Cost model 개선.
- ML 기반 최적화 (실험적).
- Streaming adaptation.
Spark가 10년 넘게 데이터 처리 영역을 선도하는 이유는 이런 지속적 혁신 때문이다. "충분히 좋다"에 만족하지 않고, 매 버전마다 의미 있는 개선을 가져온다. AQE는 그 한 예이며, 앞으로도 더 나아질 것이다.
마치며: 데이터의 정교한 엔진
핵심 정리
- RDD: 메모리 기반 분산 컬렉션. Lineage로 장애 복구.
- DataFrame: 구조화된 API. Schema + optimizer.
- Catalyst: Rule + Cost 기반 SQL 최적화.
- Tungsten: Off-heap binary format. CPU 캐시 친화.
- WSCG: 런타임 Java 코드 생성. Volcano 오버헤드 제거.
- Shuffle: 분산의 핵심이자 병목. Sort shuffle 기본.
- AQE: 런타임 통계로 재최적화. 30-200% 향상.
Spark가 우리에게 준 것
Spark는 빅데이터 처리의 민주화다. MapReduce 시대엔:
- Java로 map/reduce 작성.
- 복잡한 최적화 수동.
- 느린 반복.
Spark 이후:
- SQL 또는 간단한 DataFrame API.
- 자동 최적화 (Catalyst + Tungsten + AQE).
- 빠른 반복.
- 배치 + 스트림 + ML 통합.
진입 장벽이 낮아지면서 더 많은 사람이 데이터를 다룰 수 있게 되었다. 데이터 분석가도 SQL로 복잡한 분산 쿼리 실행. 데이터 과학자도 머신러닝 파이프라인을 PySpark로.
실전 권장
- 기본 설정 이해:
spark.executor.memory,spark.sql.shuffle.partitions, AQE. - Broadcast join 활용: 작은 dimension 테이블.
- Partition column 선택: 쿼리 패턴에 맞게.
- EXPLAIN 읽기: 느린 쿼리의 첫 단계.
- Spark UI 활용: Task 분포, GC time 모니터.
- 적절한 file format: Parquet/ORC 선호.
마지막 교훈
Spark의 성공은 기술적 우수성 + 타이밍 + 생태계의 합이다:
- 기술: RDD, Catalyst, Tungsten의 혁신.
- 타이밍: 메모리 가격 하락 + 빅데이터 수요 폭발.
- 생태계: Python/R 지원, ML, 스트리밍, SQL 모두 통합.
이 세 가지 중 하나라도 빠졌으면 Spark는 지금 위치에 없을 것이다. 기술은 필요하지만 충분하지 않다는 교훈이다.
당신이 다음에 Spark job을 실행할 때, 아래에서 벌어지는 일을 기억하자:
- Catalyst가 수십 개의 rule로 쿼리 재작성.
- Tungsten이 바이너리 포맷으로 메모리 절약.
- WSCG가 Java 코드 생성 + 컴파일.
- AQE가 실행 중 통계로 재최적화.
- Shuffle이 데이터 재분배.
이 모든 것이 수 ms~수 초 안에 일어난다. 그리고 당신은 단지 .filter().groupBy().count()만 썼다. 이것이 현대 데이터 엔지니어링의 마법이다.
참고 자료
- Apache Spark Documentation
- Learning Spark, 2nd Edition (Jules Damji et al.)
- Spark SQL: Relational Data Processing (Armbrust et al., 2015)
- Resilient Distributed Datasets (Zaharia et al., 2012)
- Databricks Engineering Blog
- Spark Internals by Reynold Xin
- Project Tungsten: Bringing Spark Closer to Bare Metal
- Apache Spark as a Compiler (Databricks Blog)
- Adaptive Query Execution in Apache Spark
- High Performance Spark (Holden Karau, Rachel Warren)
Apache Spark Internals Deep Dive 2025: RDD, Catalyst Optimizer, Tungsten, Whole-Stage Codegen, Shuffle
Introduction: How Spark Dethroned MapReduce
2010, UC Berkeley
In 2010, Matei Zaharia published "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing" at UC Berkeley's AMPLab. Hadoop MapReduce was king. Spark claimed:
- 100x faster via in-memory computation.
- Much simpler API.
Timeline:
- 2013: Donated to Apache.
- 2014: Databricks founded.
- 2016: Spark 2.0 (DataFrame, Catalyst, Tungsten).
- 2020: Spark 3.0 (Adaptive Query Execution).
- Today: de facto standard for big data.
Why So Fast
Spark's performance revolution came from three axes:
- RDD (Resilient Distributed Dataset): in-memory computation.
- Catalyst Optimizer: query optimization.
- Tungsten: CPU and memory optimization.
Combined, they deliver 100x — sometimes 1000x — over MapReduce.
1. RDD: Where It All Began
Motivation
Circa 2009, MapReduce dominated but had problems:
- Iterative algorithms (ML, graphs) extremely slow.
- Each job round-trips to disk.
- Interactive analysis impossible.
Zaharia's insight: "What if we keep data in memory?" But memory is volatile — how to handle node failures? Answer: track the lineage (transformation history), not the data. Recompute on failure.
RDD Properties
- Resilient: tolerates failures via lineage.
- Distributed: partitioned across the cluster.
- Immutable, Lazy, Typed, Partitioned.
Transformation vs Action
Transformations build new RDDs lazily:
val lines = sc.textFile("data.txt")
val words = lines.flatMap(_.split(" "))
val counts = words.map(w => (w, 1))
val reduced = counts.reduceByKey(_ + _)
Actions trigger execution:
reduced.collect()
Laziness enables fused passes, early-exit on take(10), and pipelined execution without intermediate materialization.
Lineage & Fault Tolerance
val rdd1 = sc.textFile("data.txt")
val rdd2 = rdd1.map(x => x.toUpperCase)
val rdd3 = rdd2.filter(x => x.startsWith("A"))
If a partition of rdd3 is lost, Spark recomputes just that partition from lineage. No replication needed. Long lineages can be shortened with checkpoint().
Narrow vs Wide Dependency
- Narrow (map, filter, union): child depends on a few parent partitions. Pipelined within a stage.
- Wide (groupByKey, reduceByKey, join): requires shuffle. Stage boundary.
DAG and Stage
On action: RDD graph becomes a DAG, split at wide dependencies into stages. Each stage decomposes into tasks (one per partition) executed in parallel.
Limitations of RDDs
No schema, no optimizer, slow Java serialization, JVM object overhead. Spark 2.0 answered with DataFrame/Dataset + Catalyst + Tungsten.
2. DataFrame & Dataset
DataFrame (Spark 1.3)
Structured, SQL-like view of an RDD:
val df = spark.read.json("people.json")
df.filter($"age" > 20).select("name").show()
Schema-aware, higher-level API, Catalyst-optimized, stored in Tungsten binary format.
Dataset (Spark 1.6+)
DataFrame + compile-time type safety:
case class Person(name: String, age: Int)
val ds = spark.read.json("people.json").as[Person]
ds.filter(_.age > 20).map(_.name)
DataFrame = Dataset[Row]. Python uses DataFrame only.
Performance Difference
Filtering 100M rows: RDD ~100s, DataFrame/Dataset ~15s — 6-7x faster because of Catalyst, Tungsten, and whole-stage codegen.
3. Catalyst Optimizer
Four Stages
SQL or DataFrame
-> Analysis (resolve names, types)
-> Logical Optimization (rule-based)
-> Physical Planning (multiple plans + cost)
-> Code Generation (Tungsten)
-> RDD execution
Logical Optimization Rules
- Constant folding:
age * 2 + 3precomputed. - Predicate pushdown: filters move below joins and into data sources.
- Projection pushdown: read only needed columns.
- Simplification:
WHERE true AND x > 10becomesWHERE x > 10. - Subquery elimination:
IN (SELECT ...)becomes semi-join.
Example
val df = spark.sql("""
SELECT u.name, COUNT(*) as orders
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE u.country = 'KR'
GROUP BY u.name
""")
df.explain(true)
Optimized plan pushes country = 'KR' filter down to users, projects only needed columns before the join.
Physical Planning
Generates multiple physical plans and picks by cost:
- BroadcastHashJoin: small table.
- SortMergeJoin: large tables.
- ShuffledHashJoin: medium.
- BroadcastNestedLoop: cartesian.
Cost-Based Optimizer (CBO)
Since Spark 2.2, with:
ANALYZE TABLE users COMPUTE STATISTICS FOR COLUMNS id, name, age
Catalyst uses row counts, min/max, null/distinct counts to reorder joins and estimate selectivity.
4. Tungsten: Reinventing CPU and Memory
JVM Object Overhead
Integer i = 42; // ~24 bytes for a 4-byte int
String s = "hello"; // 60+ bytes for 5 chars
Billions of objects mean massive memory waste and GC pressure.
Off-heap Binary Format
Tungsten stores rows as contiguous bytes off the JVM heap:
Row: [age=30, name="Alice"]
Binary: [bitmap][age:4][name_offset:4][name_length:4]...[name_data]
Benefits: no JVM objects, no GC pressure, cache-friendly, direct memory access via sun.misc.Unsafe.
UnsafeRow
public final class UnsafeRow extends MutableRow {
private Object baseObject;
private long baseOffset;
private int sizeInBytes;
private int numFields;
}
Field access is pointer arithmetic:
long getLong(int i) {
return Platform.getLong(baseObject, baseOffset + offsets[i]);
}
Memory Management
Tungsten manages execution memory (shuffle, join, sort) and storage memory (cache) with dynamic borrowing. Supports both on-heap and off-heap.
spark.executor.memory = 8g
spark.memory.fraction = 0.6
spark.memory.storageFraction = 0.5
spark.memory.offHeap.enabled = true
spark.memory.offHeap.size = 4g
Result: 30-70% less memory, 2-5x faster queries, much less GC pause.
5. Whole-Stage Code Generation
Volcano Model Overhead
Traditional SQL engines use iterator-based Volcano:
while (child.hasNext()) {
Row row = child.next();
if (predicate(row)) process(row);
}
Per-operator virtual calls, per-row object allocations, and frequent cache misses add up.
WSCG Idea
Fuse multiple operators into a single generated function. For SELECT name FROM users WHERE age > 20:
while (scanner.hasNext()) {
Row row = scanner.next();
if (row.getInt("age") > 20) {
emit(row.getString("name"));
}
}
One loop, no virtual calls, JIT-friendly.
Pipeline
Spark 2.0 generates Java source per stage (up to shuffle boundary), compiles with Janino, runs on the JVM which JIT-compiles to native. Typical speedup: 2-10x.
Example Generated Code
public class GeneratedIterator {
private scan_input_0;
public boolean hasNext() { return scan_input_0.hasNext(); }
public UnsafeRow next() {
while (scan_input_0.hasNext()) {
UnsafeRow row = scan_input_0.next();
int age = row.getInt(2);
if (age > 20) {
UTF8String country = row.getUTF8String(3);
if (country.equals("KR")) {
UnsafeRow output = /* build row */;
return output;
}
}
}
return null;
}
}
Inspect with EXPLAIN
df.explain("codegen")
Shows the generated Java code. *(1) marks a WSCG stage.
6. Shuffle: The Core of Distributed Aggregation
Why Shuffle
Wide dependencies require redistribution:
Stage 1 (4 partitions):
P0: [(a,1), (b,2), (c,3)]
P1: [(a,4), (d,5)]
...
reduceByKey -> all 'a' go to the same partition.
Shuffle Phases
- Map side: each task writes to local files, partitioned by reducer.
- Reduce side: reducers fetch their partitions from all mappers over the network, then sort/aggregate.
Shuffle is Spark's most expensive operation — disk I/O, network, serialization, sorting.
Shuffle Strategies
- Hash Shuffle (early): M x R files — file system overload.
- Sort Shuffle (1.1+, default): one file per map task with index file.
- Tungsten Sort Shuffle (1.5+): binary format, cache-friendly.
Broadcast Join
val joined = big.join(broadcast(small), "key")
Driver broadcasts small to every executor; each local partition of big joins locally. No shuffle. Default threshold: spark.sql.autoBroadcastJoinThreshold = 10 MB. Often several times to tens of times faster than shuffle joins.
Shuffle Partitioning
spark.sql.shuffle.partitions default 200. Too few -> OOM and under-parallelism; too many -> small-file overhead. Target ~100-200 MB per partition.
Optimization Techniques
df.coalesce(10).write.parquet(...)
df.repartition(50, $"key")
val hint_df = big.join(broadcast(small), ...)
Plus skew handling (below).
7. Adaptive Query Execution (AQE)
Spark 3.0's Killer Feature
AQE re-plans queries at runtime using actual shuffle statistics.
spark.sql.adaptive.enabled = true # default true since 3.2
Three Optimizations
- Dynamically coalesce shuffle partitions: merges small partitions after shuffle (e.g. 200 -> 20).
- Dynamically switch join strategies: upgrades SortMergeJoin to BroadcastHashJoin when actual sizes are small.
- Dynamically optimize skew joins: splits oversized partitions into sub-partitions.
Databricks TPC-DS: AQE yields roughly 1.3-2x speedup on average; some queries 3x+.
8. Dynamic Partition Pruning
Problem
Star schema join:
SELECT f.amount
FROM fact_sales f JOIN dim_date d ON f.date_id = d.id
WHERE d.year = 2024
Without DPP, fact_sales is scanned in full.
With DPP
Spark extracts matching ids from dim_date, pushes them into fact_sales partition filter. Multi-TB scans become multi-GB — 10-100x speedup.
Requires: partitioned table, highly selective dimension filter, Spark 3.0+.
9. Practical Tuning
Memory
--conf spark.executor.memory=16g
--conf spark.executor.memoryOverhead=2g
--conf spark.executor.cores=4
--conf spark.executor.instances=10
Rules: 4-6 cores per executor; overhead = max(384MB, 10%).
Shuffle
--conf spark.sql.shuffle.partitions=200
--conf spark.sql.adaptive.enabled=true
--conf spark.sql.adaptive.coalescePartitions.enabled=true
--conf spark.sql.adaptive.skewJoin.enabled=true
Partitions = total data / target size (200 MB). E.g. 1 TB -> 5000 partitions.
Broadcast Threshold
--conf spark.sql.autoBroadcastJoinThreshold=100MB
Default 10 MB is often too small. Watch executor memory — broadcast replicates to all executors.
Cache
df.cache()
df.persist(StorageLevel.MEMORY_ONLY)
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
Cache when reused multiple times or expensive to compute. Spilling to disk may be worse than recomputation.
Skew (Manual)
val salted = df.withColumn("salted_key",
concat($"key", lit("_"), (rand() * 10).cast("int")))
10. Performance Debugging
Spark UI
Most important tool: Jobs, Stages, SQL/DataFrame, Executors, Storage tabs.
Key Metrics
- Task duration distribution: large median-to-max gap means skew.
- Shuffle read/write: bigger than expected means filter pushdown failed.
- GC time:
>10% of task time means trouble.
Explain
df.explain()
df.explain(true)
df.explain("cost")
df.explain("codegen")
df.explain("formatted")
11. Common Pitfalls
- Too many small files ->
coalesce/repartition. - Skew -> AQE skew join, filter null keys, salting.
- Broadcast OOM -> lower threshold or remove
broadcast()hint. - Too many shuffle partitions -> AQE coalesce or manual adjust.
- Python UDF overhead -> use Pandas UDF (Arrow-based, 10x faster):
@pandas_udf("double")
def my_udf(s: pd.Series) -> pd.Series:
return s * 2
- Wrong partition counts -> aim for 100-200 MB each.
Quiz Review
Q1. How does RDD lineage enable fault recovery?
Each RDD remembers its parent and the transformation that produced it. On failure, Spark recomputes only the lost partitions from their lineage. This avoids replication (saving network/disk) but can be slow for long lineages — mitigated by checkpoint(). Partition-level recomputation is the key property. Wide dependencies (shuffles) complicate recovery because Spark may need many parent partitions; shuffle files kept on local disk help. Lineage assumes pure functions — side effects like saveToDatabase may duplicate on retry, so idempotency matters.
Lineage reflects a functional philosophy: store how to build state, not the state itself. Combined with in-memory computation, it made Spark 100x faster than MapReduce while remaining fault-tolerant.
Q2. How does Catalyst rewrite queries for performance?
Catalyst represents queries as trees and applies dozens of rewrite rules iteratively.
Key rules:
- Projection pushdown: read only referenced columns.
- Filter pushdown: move predicates into data sources (Parquet predicate pushdown can skip entire row groups using min/max stats).
- Constant folding: evaluate constants at compile time.
- Simplification:
WHERE true AND x > 10becomesWHERE x > 10. - Join reorder (CBO): pick cheapest join order from statistics.
- Subquery unnesting:
IN (SELECT ...)becomes semi-join. - Column pruning through joins.
- Decimal precision optimization.
Physical planning picks among BroadcastHashJoin, SortMergeJoin, etc. using cost. Catalyst rules are user-extensible — Delta Lake and Iceberg build on this. Result: naive SQL often runs at hand-tuned speed.
Q3. Why does Tungsten use binary format instead of JVM objects?
A JVM Integer takes ~24 bytes for 4 bytes of data; a 5-char String takes ~72 bytes. At billion-row scale, this 10x overhead becomes catastrophic memory bloat and GC pressure.
Tungsten stores rows as contiguous byte arrays with inline fixed-size fields, pointer-offset variable-size fields, and a null bitmap. UnsafeRow accesses fields via sun.misc.Unsafe pointer arithmetic.
Benefits:
- Memory: ~6x less.
- GC pressure: removed (off-heap or fewer, larger arrays).
- Cache friendliness: contiguous layout fits cache lines.
- Serialization:
UnsafeRowships over the wire as-is. - SIMD-friendly layout.
Costs: low-level complexity, harder debugging, expensive variable-size mutations, Python-JVM conversion cost (now mitigated by Arrow).
Spark 3.x added columnar vectorized execution (Parquet/ORC readers, aggregation in batches of 4096). Tungsten and Arrow are complementary: Tungsten for row-wise internal processing, Arrow for cross-language columnar exchange.
Q4. Why is Whole-Stage CodeGen faster than Volcano?
Volcano's iterator model (from the 1970s) elegantly composes operators via next(), but modern CPUs hate it:
- Virtual function calls prevent inlining.
- Per-row object allocations pressure GC.
- Branch misprediction across operators.
- Cache misses between operator code paths.
- Java boxing.
WSCG fuses operators in a stage into a single generated Java function, compiles with Janino, then JVM JIT optimizes. Resulting code has one loop, inlined checks, direct memory access via Tungsten.
Databricks TPC-DS: 4-10x faster than Volcano.
Constraints: JVM's 64KB method limit, complex operators (window, UDF) fall back to Volcano, debugging generated code is hard, compilation overhead for tiny queries.
Comparison:
- Volcano: baseline.
- WSCG: 4-10x.
- Vectorized (ClickHouse, DuckDB): 4-10x.
- LLVM-native (HyPer/Umbra): 10-20x.
Spark combines vectorized Parquet reading with WSCG for row processing — a pragmatic balance.
Q5. How does AQE use runtime statistics?
Static optimizers commit to a plan before execution; their estimates can be wrong due to stale stats, hard selectivity estimation, or unpredictable join sizes. AQE pauses at shuffle boundaries, inspects the actual shuffled data, and re-optimizes the remaining plan.
Three main optimizations:
- Coalesce shuffle partitions: merges tiny partitions (e.g. 200 -> 20).
- Switch join strategy: upgrade SortMergeJoin to BroadcastHashJoin when actual size fits.
- Skew join: split oversized partitions into sub-partitions processed by separate tasks.
Example: a skewed partition taking 100s turns into 10 sub-partitions of 10s each.
Dynamic Partition Pruning works alongside AQE: extract dim filter ids, push as partition filter into fact table, scan only needed partitions. Multi-TB scans become multi-GB.
Spark 3.0 introduced AQE (opt-in); 3.2 enabled by default. TPC-DS average ~1.8x, some queries 3x+.
Limits: less applicable to streaming, overhead on tiny queries, complex queries may still need manual tuning. Philosophy: optimization is JIT, not AOT.
Closing: The Data Engine Under the Hood
Summary
- RDD: in-memory distributed collection; lineage-based recovery.
- DataFrame: structured API + schema + optimizer.
- Catalyst: rule + cost-based SQL optimization.
- Tungsten: off-heap binary format, cache-friendly.
- WSCG: runtime Java code generation, removes Volcano overhead.
- Shuffle: the core of distribution and the main bottleneck.
- AQE: runtime re-optimization, 30-200% speedup.
Practical Recommendations
- Understand defaults:
spark.executor.memory,spark.sql.shuffle.partitions, AQE. - Use broadcast join for small dimension tables.
- Partition columns that match query patterns.
- Read EXPLAIN for slow queries.
- Monitor via Spark UI (task distribution, GC time).
- Prefer columnar formats (Parquet/ORC).
Final Lesson
Spark's success = technical excellence + timing + ecosystem. Next time you write .filter().groupBy().count(), remember: Catalyst rewrites it, Tungsten lays out memory, WSCG generates Java, AQE re-plans at runtime, and shuffle redistributes — all in milliseconds to seconds.
References
- Apache Spark Documentation
- Learning Spark, 2nd Edition (Jules Damji et al.)
- Spark SQL: Relational Data Processing (Armbrust et al., 2015)
- Resilient Distributed Datasets (Zaharia et al., 2012)
- Databricks Engineering Blog
- Project Tungsten: Bringing Spark Closer to Bare Metal
- Apache Spark as a Compiler (Databricks Blog)
- Adaptive Query Execution in Apache Spark
- High Performance Spark (Holden Karau, Rachel Warren)