필사 모드: Apache Spark 내부 완전 가이드 2025: RDD, Catalyst Optimizer, Tungsten, Whole-Stage Codegen, Shuffle 심층 분석
한국어들어가며: Spark는 어떻게 MapReduce를 쫓아냈는가
2010년, UC Berkeley에서
2010년 UC Berkeley의 AMPLab에서 **Matei Zaharia**가 논문을 발표했다: "**Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing**".
당시 빅데이터의 왕은 **Hadoop MapReduce**였다. Spark는 두 가지 주장을 했다:
1. **메모리 기반으로 100배 빠르다**.
2. **API가 훨씬 단순하다**.
처음엔 MapReduce 진영이 회의적이었다. 하지만 Spark는:
- **2013**: Apache에 기증.
- **2014**: Databricks 창업.
- **2016**: Spark 2.0 (DataFrame, Catalyst, Tungsten).
- **2020**: Spark 3.0 (Adaptive Query Execution).
- **현재**: 빅데이터 처리의 **사실상 표준**.
Hadoop MapReduce는 사라졌다. Spark는 전 세계 수만 기업의 데이터 파이프라인의 중추가 되었다.
왜 이렇게 빨라졌는가
Spark의 성능 혁명은 **세 가지 축**에서 나왔다:
1. **RDD (Resilient Distributed Dataset)**: In-memory 연산.
2. **Catalyst Optimizer**: 쿼리 최적화.
3. **Tungsten**: CPU와 메모리 최적화.
이 세 가지가 합쳐져 MapReduce 대비 100배, 때로는 1000배 성능을 낸다.
이 글에서 다룰 것
1. **RDD의 탄생과 lineage**.
2. **DataFrame/Dataset**: 구조화된 데이터.
3. **Catalyst optimizer**: 쿼리 재작성.
4. **Physical plan과 cost-based 최적화**.
5. **Tungsten project**: CPU 캐시 친화적 실행.
6. **Whole-stage code generation**: JIT으로 코드 생성.
7. **Shuffle**: 분산 집계의 비밀.
8. **Adaptive Query Execution**: 런타임 최적화.
9. **실전 튜닝**.
왜 이 지식이 중요한가
- **데이터 엔지니어**: Spark 튜닝의 80%는 내부 이해.
- **DA/DS**: 왜 이 쿼리가 느린지 설명.
- **플랫폼 엔지니어**: 클러스터 최적화.
- **다른 시스템 학습**: Dask, Ray, Flink가 Spark의 개념을 차용.
1. RDD: 모든 것의 시작
탄생 배경
2009년경 빅데이터 처리는 **MapReduce**가 지배했다. 문제는:
1. **반복 알고리즘**이 극도로 느림 (ML, 그래프).
2. 각 Map-Reduce job이 **디스크를 왕복**.
3. **인터랙티브 분석**이 불가능 (매번 몇 분~시간).
**Zaharia의 통찰**: "**메모리에서 데이터를 유지**하면 되지 않을까?"
문제: 메모리는 휘발성. 노드 장애 시 어떻게?
**해결**: **Lineage (계보)**. 데이터가 아닌 **변환 연산의 기록**을 유지. 장애 시 재계산.
RDD란
**RDD (Resilient Distributed Dataset)**:
- **Resilient**: 장애에 견딤 (lineage로).
- **Distributed**: 클러스터에 분산.
- **Dataset**: 데이터 집합.
핵심 특성:
1. **Immutable**: 한 번 만들어지면 변경 불가.
2. **Lazy**: Transformation은 action이 불릴 때까지 실행 안 됨.
3. **Typed**: Java/Scala의 타입 시스템 활용.
4. **Partitioned**: 여러 worker에 분산.
Transformation vs Action
**Transformation**: 새 RDD 생성. **Lazy**.
val lines = sc.textFile("data.txt") // RDD[String]
val words = lines.flatMap(_.split(" ")) // 아직 실행 안 됨
val counts = words.map(w => (w, 1))
val reduced = counts.reduceByKey(_ + _) // 여전히 실행 안 됨
**Action**: 실제 계산 트리거. **Eager**.
reduced.collect() // 여기서 전체 체인이 실행됨
왜 Lazy인가
Lazy evaluation의 장점:
1. **최적화 기회**: 여러 transformation을 **한 번에 처리**. 예: map → filter → map을 하나의 pass로.
2. **불필요한 계산 회피**: `take(10)`만 필요하면 10개만.
3. **Pipeline**: 중간 데이터를 메모리에 쌓지 않음.
Lineage와 Fault Tolerance
val rdd1 = sc.textFile("data.txt") // Dependency: HDFS
val rdd2 = rdd1.map(x => x.toUpperCase) // Dependency: rdd1
val rdd3 = rdd2.filter(x => x.startsWith("A")) // Dependency: rdd2
만약 `rdd3`를 계산 중에 한 partition이 노드 장애로 손실되면:
1. Lineage 확인: rdd3 → rdd2 → rdd1 → file.
2. 손실된 partition을 **재계산**.
3. 다른 partition은 그대로.
**장점**: Replication 없이 내결함성. 디스크 I/O 없음.
**단점**: Lineage가 길면 재계산이 오래 걸림. **Checkpoint**로 해결 (중간 저장).
Narrow vs Wide Dependency
RDD 간 의존성은 두 종류:
**Narrow dependency**:
- 자식 partition이 부모의 **소수 partition**에 의존.
- 예: map, filter, union.
- **한 stage 내에서 pipeline 실행 가능**.
- 재계산 시 영향 범위 작음.
**Wide dependency** (shuffle dependency):
- 자식 partition이 부모의 **여러 partition**에 의존.
- 예: groupByKey, reduceByKey, join.
- **Shuffle 필요**: 데이터 재분산.
- Stage 경계.
Narrow: Wide:
Parent Parent Parent Parent Parent
| | | / | \ |
▼ ▼ ▼ ▼ ▼ ▼ ▼
Child1 Child2 ↓ ↓ ↓ ↓ ↓
Shuffle 발생
↓ ↓ ↓
Child1 Child2 Child3
DAG와 Stage
Spark가 액션을 받으면:
1. **RDD graph를 DAG로 변환**.
2. **Wide dependency를 경계로 stage 분할**.
3. 각 stage는 **tasks**로 분해 (partition 수 만큼).
4. Stages를 순차적으로, task를 병렬로 실행.
Action: reduce()
↓
Logical DAG:
RDD1 → map → RDD2 → reduceByKey → RDD3 → collect
Stages:
Stage 1: textFile → map (narrow, pipeline)
[Shuffle]
Stage 2: reduceByKey → collect
RDD의 한계
RDD는 훌륭했지만 몇 가지 약점:
1. **Schema 없음**: 그냥 Java/Scala 객체의 컬렉션.
2. **Optimizer 없음**: 사용자가 직접 최적화.
3. **느린 직렬화**: Java serialization.
4. **메모리 오버헤드**: JVM 객체의 overhead.
Spark 2.0은 이를 해결하기 위해 **DataFrame/Dataset + Catalyst + Tungsten**을 도입했다.
2. DataFrame & Dataset: 구조화 혁명
DataFrame의 등장
Spark 1.3 (2015)에서 **DataFrame** 도입. SQL의 테이블처럼 구조화된 RDD:
val df = spark.read.json("people.json")
df.show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Bob |
// | 25|Alice|
// +---+----+
df.filter($"age" > 20).select("name").show()
**RDD와 차이**:
- **Schema 있음**: 컬럼과 타입 정보.
- **SQL처럼 조작**: 고수준 API.
- **Optimizer 적용**: Catalyst가 알아서 최적화.
- **Tungsten 메모리 포맷**: JVM 객체 아닌 바이너리.
Dataset (Spark 1.6+)
**Dataset**: DataFrame + **타입 안전성**.
case class Person(name: String, age: Int)
val ds = spark.read.json("people.json").as[Person]
ds.filter(_.age > 20).map(_.name)
**장점**:
- 컴파일 타임 타입 체크.
- Object API와 DataFrame 최적화 모두.
**Scala/Java에만 존재**. Python은 DataFrame만 사용 (Python은 정적 타입이 아니므로).
DataFrame = Dataset[Row]
사실 DataFrame은 `Dataset[Row]`의 alias. Row는 untyped.
// 같은 것
val df: DataFrame = spark.read.json(...)
val df: Dataset[Row] = spark.read.json(...)
성능 차이
단순 예시: 1억 행 필터링.
- **RDD**: 100초.
- **DataFrame/Dataset**: 15초.
**6~7배 차이**. 이유:
1. Catalyst 최적화 (predicate pushdown 등).
2. Tungsten 바이너리 포맷.
3. Whole-stage codegen.
3. Catalyst Optimizer: 쿼리 재작성의 마법
Catalyst의 4단계
Catalyst는 SQL/DataFrame query를 4단계로 처리한다:
SQL or DataFrame
↓
1. Analysis (unresolved plan → resolved logical plan)
↓
2. Logical Optimization (rule-based)
↓
3. Physical Planning (여러 물리 계획 생성 + 비용 비교)
↓
4. Code Generation (Tungsten)
↓
RDD (실행)
1단계: Analysis
**입력**: Unresolved logical plan.
SQL: SELECT name FROM users WHERE age > 20
↓ parser
UnresolvedRelation("users")
→ UnresolvedAttribute("age")
→ UnresolvedAttribute("name")
**Catalog**를 참조해서 resolve:
- 테이블 이름 → 실제 데이터 소스.
- 컬럼 이름 → 컬럼 타입.
- 함수 → 실제 구현.
결과: **Resolved Logical Plan**.
2단계: Logical Optimization
**Rule-based** 최적화. 수십~수백 개의 rule을 반복 적용:
**Constant Folding**:
age * 2 + 3 → (미리 계산)
**Predicate Pushdown**:
SELECT * FROM (table JOIN other) WHERE table.x > 10
↓ pushdown
SELECT * FROM (SELECT * FROM table WHERE x > 10) JOIN other
필터를 조인 전으로 밀어 넣음. 중간 결과 크기 감소.
**Projection Pushdown**:
SELECT name FROM users
// users에 50개 컬럼 있어도 name만 읽음
**Simplification**:
WHERE true AND x > 10 → WHERE x > 10
WHERE false → never executes
**Subquery Elimination**:
SELECT * FROM t WHERE id IN (SELECT id FROM s)
↓
SELECT * FROM t JOIN s ON t.id = s.id
Logical Plan 예시
val df = spark.sql("""
SELECT u.name, COUNT(*) as orders
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE u.country = 'KR'
GROUP BY u.name
""")
df.explain(true)
**Parsed Logical Plan**:
'Project ['u.name, 'COUNT(1) AS orders]
+- 'Filter ('u.country = KR)
+- 'Join Inner, ('u.id = 'o.user_id)
:- 'UnresolvedRelation `users`
+- 'UnresolvedRelation `orders`
**Analyzed Logical Plan**:
Aggregate [name#10], [name#10, count(1) AS orders#20]
+- Filter (country#12 = KR)
+- Join Inner, (id#9 = user_id#15)
:- Relation[id#9,name#10,age#11,country#12] parquet
+- Relation[user_id#15,total#16] parquet
**Optimized Logical Plan**:
Aggregate [name#10], [name#10, count(1) AS orders#20]
+- Project [name#10]
+- Join Inner, (id#9 = user_id#15)
:- Project [id#9, name#10]
+- Filter (country#12 = KR)
+- Relation[...] parquet
+- Project [user_id#15]
+- Relation[...] parquet
변화:
- Filter가 users 바로 위로 이동.
- Projection이 각 relation 바로 위로.
- 불필요한 컬럼 제거.
3단계: Physical Planning
논리 계획을 **실행 가능한 물리 연산**으로 변환. Catalyst는 **여러 물리 계획을 생성**하고 비교.
**Join 선택 예시**:
- **BroadcastHashJoin**: 작은 테이블이면.
- **SortMergeJoin**: 큰 테이블들.
- **ShuffledHashJoin**: 중간 크기.
- **BroadcastNestedLoop**: cartesian이면.
Catalyst는 각 옵션의 **cost를 추정**하고 가장 저렴한 것 선택.
4단계: Code Generation
선택된 물리 계획을 **Java 바이트코드로 변환**. 다음 섹션에서 자세히.
Cost-Based Optimizer (CBO)
Spark 2.2+부터 **통계 기반** 최적화:
ANALYZE TABLE users COMPUTE STATISTICS FOR COLUMNS id, name, age
이후 Catalyst가:
- Table 크기, 행 수.
- 컬럼별 min/max/null count.
- 컬럼별 distinct count.
이 정보로 **join 순서 최적화**, selectivity 추정 등.
기본 RBO (rule-based optimizer)와 조합되어 더 나은 계획 생성.
4. Tungsten: CPU와 메모리의 재발명
Tungsten 프로젝트
Spark 1.5 (2015)에 시작. **"Spark 성능을 하드웨어 한계까지"**.
핵심 목표:
1. **JVM 메모리 오버헤드 제거**.
2. **CPU 캐시 활용**.
3. **Vectorized 실행**.
4. **코드 생성으로 CPU 파이프라인 최적화**.
JVM 객체의 문제
**Java 객체의 오버헤드**:
Integer i = 42;
`int` 값은 4바이트지만, `Integer` 객체는:
- Object header: ~16 바이트.
- Value field: 4 바이트.
- Padding: 4 바이트.
- **총 24 바이트**.
6배 오버헤드. 수백만 객체가 있으면 메모리 낭비 심각.
**String 더 심함**:
String s = "hello";
// Object: 40+ bytes
// char[] 객체: 24 bytes + 2 * 5 = 34 bytes
// 총 60+ bytes
해결: Off-heap Binary Format
Tungsten은 데이터를 **JVM 힙 밖**의 바이너리 포맷에 저장:
Row: [age=30, name="Alice"]
Binary encoding (off-heap):
[bitmap][age:4bytes][name_offset:4bytes][name_length:4bytes]...[name_data]
- **No JVM objects**.
- **No GC pressure**.
- **Cache-friendly layout**.
- **직접 메모리 접근**: `sun.misc.Unsafe`.
UnsafeRow
Spark의 기본 row 표현:
public final class UnsafeRow extends MutableRow {
private Object baseObject; // byte[] or null (off-heap)
private long baseOffset;
private int sizeInBytes;
private int numFields;
// ...
}
각 필드 접근:
long getLong(int i) {
return Platform.getLong(baseObject, baseOffset + offsets[i]);
}
Direct memory access. JVM 객체 생성 없음.
Cache Friendliness
Tungsten 포맷은 **연속 메모리**:
Row 0: [col0][col1][col2]
Row 1: [col0][col1][col2]
Row 2: [col0][col1][col2]
특정 컬럼만 읽으면:
col0: 0, 12, 24, ... (stride: row size)
Spatial locality 나쁨. 그러나 일반 접근 패턴에선 여전히 빠름.
**Vectorized reader** (Parquet 등)는 columnar 포맷으로 직접 읽음 → 더 캐시 친화적.
메모리 관리
Tungsten은 자체 **메모리 매니저**:
- **Execution memory**: Shuffle, join, sort용.
- **Storage memory**: Cache (persist).
- 두 영역이 **동적으로** 서로 빌림.
- **Off-heap** + **on-heap** 지원.
Spark 설정:
spark.executor.memory = 8g
spark.memory.fraction = 0.6 # execution + storage
spark.memory.storageFraction = 0.5 # storage의 최소 보장
spark.memory.offHeap.enabled = true
spark.memory.offHeap.size = 4g
성능 효과
Tungsten 전후 벤치마크:
- **메모리 사용**: 30~70% 감소.
- **쿼리 속도**: 2~5배 향상.
- **GC pause**: 크게 감소.
특히 **aggregation, join 등 메모리 집약적** 작업에서 효과 큼.
5. Whole-Stage Code Generation
전통 Volcano 모델의 문제
전통 SQL 엔진은 **Volcano 모델** (iterator-based):
while (child.hasNext()) {
Row row = child.next();
if (predicate(row)) {
process(row);
}
}
각 연산자가 `next()`를 호출. 함수 호출 오버헤드:
- **Virtual function calls**: CPU 파이프라인 깨짐.
- **Object allocation**: row 객체 계속 생성.
- **Cache misses**: 매번 다른 코드 경로.
단순 쿼리도 연산자 수에 비례하는 오버헤드.
아이디어: 통합 코드 생성
"**여러 연산자를 하나의 함수로 병합**". 예:
SELECT name FROM users WHERE age > 20
**Volcano**:
Scan → Filter → Project
세 개의 iterator 객체.
**Codegen**:
// 런타임에 생성된 코드
while (scanner.hasNext()) {
Row row = scanner.next();
if (row.getInt("age") > 20) {
emit(row.getString("name"));
}
}
한 개의 루프. 함수 호출 없음. 컴파일러가 최적화.
Whole-Stage CodeGen
Spark 2.0 (2016)의 **Whole-Stage Code Generation (WSCG)**:
1. **Stage 단위로 합병**: Shuffle 경계까지 모두.
2. **Java 소스 코드 생성**: 런타임에.
3. **Janino**로 즉시 컴파일.
4. **직접 실행**: JVM이 JIT 최적화.
**결과**:
- 함수 호출 감소 (virtual call → inlined).
- 중간 객체 제거.
- CPU pipeline 활용.
- **2~10배 성능 향상**.
예시: 생성된 코드
SELECT id, name FROM users WHERE age > 20 AND country = 'KR'
**생성된 Java 코드** (간소화):
public class GeneratedIterator {
private scan_input_0; // Parquet reader
public boolean hasNext() { return scan_input_0.hasNext(); }
public UnsafeRow next() {
while (scan_input_0.hasNext()) {
UnsafeRow row = scan_input_0.next();
int age = row.getInt(2);
if (age > 20) {
UTF8String country = row.getUTF8String(3);
if (country.equals("KR")) {
// project id, name
UnsafeRow output = /* build row */;
return output;
}
}
}
return null;
}
}
모든 것이 **인라인**. 루프 하나. JIT이 바이트코드를 네이티브로 최적화.
Vectorized Execution과의 차이
Whole-stage codegen은 **row-at-a-time** 기반이다. 반면 **vectorized execution**은 **batch-at-a-time**:
while (batch = next_batch()) { // 1024 rows
for (row : batch) {
// process
}
}
ClickHouse, Snowflake, DuckDB가 vectorized.
Spark의 접근은 **둘 다 활용**:
- **Columnar scan**: Parquet, ORC를 vectorized로 읽음.
- **Row-based processing**: Codegen으로 각 row 처리.
- Spark 3.2+: 일부 연산자에 vectorized 지원 추가.
EXPLAIN으로 확인
df.explain("codegen")
출력:
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
...
실제 생성된 Java 코드를 볼 수 있다. 디버깅에 유용.
6. Shuffle: 분산 집계의 핵심
왜 Shuffle이 필요한가
Wide dependency (groupBy, join 등)는 데이터 **재분배**가 필요하다:
Stage 1 (4 partitions):
P0: [(a,1), (b,2), (c,3)]
P1: [(a,4), (d,5)]
P2: [(b,6), (c,7), (e,8)]
P3: [(d,9), (a,10)]
reduceByKey:
모든 'a'는 같은 partition으로
모든 'b'는 같은 partition으로
...
Stage 2 (4 partitions after shuffle):
P0: [(a, [1,4,10])] → [(a,15)]
P1: [(b, [2,6])] → [(b,8)]
P2: [(c, [3,7])] → [(c,10)]
P3: [(d, [5,9]), (e, [8])] → [(d,14),(e,8)]
Shuffle은 **stage 사이의 데이터 이동**. 네트워크와 디스크 I/O 집약적.
Shuffle 과정
**Write 단계** (Map 측):
1. 각 task가 **로컬 파일**에 결과 쓰기.
2. 각 reduce task에 해당하는 **partition으로 분류**.
3. 디스크에 저장 (나중에 reduce가 fetch).
**Read 단계** (Reduce 측):
1. 모든 map task로부터 자기 partition 데이터 **fetch**.
2. 네트워크로 전송 (executor 간).
3. 메모리에 모음.
4. 처리 (sort, aggregate 등).
Shuffle의 비용
Shuffle은 Spark의 **가장 비싼 연산**:
- **디스크 I/O**: write + read.
- **네트워크**: 모든 executor 간 cross.
- **Serialization**: 객체 ↔ 바이트.
- **Sort**: 보통 필요.
**규모**:
- 작은 job: 수 MB.
- 큰 job: 수 TB.
1 TB shuffle = 많은 시간과 자원.
Shuffle 전략의 진화
**1. Hash Shuffle** (초기 Spark):
- 각 map task가 각 reduce partition별로 **별도 파일**.
- M × R 파일 (M=map tasks, R=reduce tasks).
- 100 × 200 = 20,000 파일.
- 파일 시스템 부담.
**2. Sort Shuffle** (1.1+, 기본):
- 각 map task가 **하나의 파일**에 정렬된 결과.
- 인덱스 파일로 partition 위치 표시.
- M 파일만.
- **훨씬 효율적**.
**3. Tungsten Sort Shuffle** (1.5+):
- Tungsten의 binary format 활용.
- 캐시 친화적.
- 더 빠름.
Broadcast Join
**작은 테이블**을 조인할 때의 최적화:
val small = ... // 10 MB
val big = ... // 1 TB
val joined = big.join(broadcast(small), "key")
**Broadcast**:
1. Driver가 small 테이블을 **모든 executor에 복사**.
2. 각 executor가 big의 자기 partition과 **로컬에서 조인**.
3. **Shuffle 없음**.
**조건**: 작은 테이블이 메모리에 들어가야. 기본 `spark.sql.autoBroadcastJoinThreshold = 10 MB`.
**Shuffle join 대비**: **수 배~수십 배 빠름**.
Shuffle Partitioning
`spark.sql.shuffle.partitions`: shuffle 후 partition 수. 기본 200.
**너무 작음**:
- 큰 partition.
- 일부 task의 OOM.
- 병렬성 부족.
**너무 큼**:
- 많은 작은 파일.
- 스케줄링 오버헤드.
**가이드**: 각 partition이 **100~200 MB**가 되도록.
Shuffle 최적화 기법
**1. 파일 수 줄이기**:
df.coalesce(10).write.parquet(...)
작은 partition들을 병합.
**2. Partitioning 조정**:
df.repartition(50, $"key") // key 기반 재분할
**3. Broadcast 강제**:
val hint_df = big.join(broadcast(small), ...)
// 또는 SQL hint
// SELECT /*+ BROADCAST(small) */ ...
**4. Skew 처리**: 다음 섹션.
7. Adaptive Query Execution (AQE)
Spark 3.0의 킬러 기능
**Adaptive Query Execution (AQE)**: 런타임에 수집한 통계를 바탕으로 **쿼리 계획을 동적으로 재최적화**.
spark.sql.adaptive.enabled = true # 3.2+부터 기본 true
주요 기능
**1. Dynamically coalescing shuffle partitions**:
전통적 문제:
- `spark.sql.shuffle.partitions = 200`을 정해둠.
- 작은 shuffle은 200 partition이 과도.
- 큰 shuffle은 부족.
AQE:
- Shuffle 후 실제 데이터 크기 확인.
- **작은 partition을 자동 병합**.
- 예: 200 → 20 (각 partition이 충분한 크기).
**2. Dynamically switching join strategies**:
계획 시:
- `SortMergeJoin` 결정 (두 테이블 크기 추정).
실행 중:
- 실제 한 테이블이 예상보다 작은 것 판명.
- **동적으로 BroadcastHashJoin으로 전환**.
**3. Dynamically optimizing skew joins**:
Skew 감지:
- 특정 key의 partition이 평균보다 훨씬 큼.
AQE:
- 큰 partition을 **여러 sub-partition으로 분할**.
- 각각 별도 task로 실행.
- 결과를 union.
실측 효과
Databricks 벤치마크 (TPC-DS):
- AQE 전: 100%.
- AQE 후: 130% 이상.
- 일부 쿼리: **3배+ 향상**.
거의 공짜로 얻는 성능. AQE는 Spark 3.0의 가장 중요한 개선 중 하나.
8. Dynamic Partition Pruning
문제
Star schema의 fact와 dimension join:
SELECT f.amount
FROM fact_sales f JOIN dim_date d ON f.date_id = d.id
WHERE d.year = 2024
전통:
1. `fact_sales` 전체 스캔 (테라바이트 급).
2. `dim_date` 필터 (year=2024).
3. Join.
`fact_sales`는 date_id로 **partitioned**되어 있어도 전체 스캔. Filter가 dim에만 있어서.
AQE + DPP
Spark 3.0의 **Dynamic Partition Pruning**:
1. `dim_date`에서 year=2024인 id 먼저 수집.
2. **그 id들을 fact_sales의 partition filter로 pushdown**.
3. `fact_sales` 스캔 시 **해당 partition만** 로드.
**효과**:
- 수 TB → 수 GB.
- **10~100배 빠른 쿼리**.
조건
- 파티션된 테이블.
- Dimension 필터가 highly selective.
- Spark 3.0+.
9. 실전 튜닝
메모리 설정
--conf spark.executor.memory=16g
--conf spark.executor.memoryOverhead=2g
--conf spark.executor.cores=4
--conf spark.executor.instances=10
**원칙**:
- Executor당 **cores 4~6**: GC, I/O 균형.
- **Memory overhead = max(384MB, 10% of memory)**.
- **Instances**: 클러스터 용량과 workload에 따라.
Shuffle 튜닝
--conf spark.sql.shuffle.partitions=200 # 기본, 조정
--conf spark.sql.adaptive.enabled=true
--conf spark.sql.adaptive.coalescePartitions.enabled=true
--conf spark.sql.adaptive.skewJoin.enabled=true
**Partition 수 계산**:
- 총 데이터 / 목표 크기 (200 MB).
- 예: 1 TB → 5000 partitions.
Broadcast 임계값
--conf spark.sql.autoBroadcastJoinThreshold=100MB
기본 10 MB는 너무 작을 수 있음. 100 MB까지 올리면 더 많은 broadcast join.
**주의**: Executor 메모리 확인. Broadcast 테이블이 모든 executor에 복제되므로.
Cache 전략
df.cache() // MEMORY_AND_DISK (기본)
df.persist(StorageLevel.MEMORY_ONLY)
df.persist(StorageLevel.MEMORY_AND_DISK_SER) // 직렬화 (작음)
**언제 cache**:
- **여러 번 사용**되는 DataFrame.
- 계산 비용 큰 DataFrame.
**주의**: 메모리 부족하면 disk로 spill. 재계산이 더 나을 수 있음.
파티셔닝
**Partitioned write**:
df.write.partitionBy("year", "month").parquet(...)
파일을 year/month별 디렉토리로 저장. 쿼리 시 **partition pruning**으로 빠른 읽기.
**주의**: 너무 많은 partition은 작은 파일 문제.
Skew 수동 처리
AQE가 부족하면 수동:
// Salting
val salted = df.withColumn("salted_key",
concat($"key", lit("_"), (rand() * 10).cast("int")))
// Join 후 다시 그룹화
10. 성능 디버깅
Spark UI
Spark Web UI가 가장 중요한 도구:
- **Jobs**: Action별 실행.
- **Stages**: Stage별 task 분포.
- **SQL/DataFrame**: 논리/물리 계획.
- **Executors**: 리소스 사용.
- **Storage**: Cache 상태.
주의할 Metric
**Task duration 분포**:
- **Median**과 **Max** 차이가 크면 **skew**.
- 99th percentile이 나쁘면 병목.
**Shuffle read/write**:
- 크기가 예상보다 크면 **filter pushdown 실패**.
- 작으면 효율적.
**GC time**:
- Task 시간의 **10% 이상**이면 GC 문제.
- Memory 증설 또는 parallelism 증가.
Explain 명령어
df.explain() // 물리 계획
df.explain(true) // 4단계 모두
df.explain("cost") // CBO 비용
df.explain("codegen") // 생성된 코드
df.explain("formatted") // 보기 쉬운 포맷
Spark SQL Shell
빠른 테스트:
spark-sql
> SELECT ...
대화형 쿼리와 EXPLAIN.
11. 흔한 함정과 교훈
함정 1: 너무 많은 작은 파일
**증상**: 작은 작업이 수 분 걸림.
**원인**: 수백만 개 작은 파일.
**해결**: `coalesce` 또는 `repartition`으로 병합.
함정 2: Skew
**증상**: 99% task는 빠르고 1%가 영원히.
**원인**: 한 key가 압도적으로 많음 (예: `user_id = null`).
**해결**:
1. AQE의 skew join 활성화.
2. Null key 필터.
3. Salting.
함정 3: Broadcast OOM
**증상**: Executor OOM 발생.
**원인**: Broadcast 테이블이 너무 큼.
**해결**:
- `spark.sql.autoBroadcastJoinThreshold` 낮춤.
- 명시적 `broadcast()` 제거.
- Sort-merge join으로.
함정 4: 너무 많은 Shuffle Partition
**증상**: 작업이 느리고 많은 작은 task.
**원인**: `spark.sql.shuffle.partitions` 너무 큼.
**해결**: AQE coalesce 활성화, 또는 수동 조정.
함정 5: Python UDF의 오버헤드
**증상**: PySpark 작업이 극도로 느림.
**원인**: Python UDF는 **JVM ↔ Python 직렬화** 왕복.
**해결**:
- SQL 내장 함수 사용.
- Pandas UDF (vectorized, Apache Arrow 사용).
@pandas_udf("double")
def my_udf(s: pd.Series) -> pd.Series:
return s * 2
일반 UDF보다 **10배 이상** 빠름.
함정 6: 잘못된 Partition 수
**증상**: Under-utilization 또는 OOM.
**원인**:
- 너무 적음: 큰 task, OOM.
- 너무 많음: 오버헤드.
**해결**: 각 partition 100~200 MB 원칙.
퀴즈로 복습하기
**A.**
**Lineage의 의미**: RDD는 데이터 자체가 아닌 **"어떻게 만들어졌는지의 기록"**을 유지한다.
val rdd1 = sc.textFile("hdfs://data.txt") // source
val rdd2 = rdd1.map(_.toUpperCase) // parent: rdd1
val rdd3 = rdd2.filter(_.startsWith("A")) // parent: rdd2
val rdd4 = rdd3.flatMap(_.split(",")) // parent: rdd3
각 RDD는 **부모 RDD와 변환 함수**를 기억한다. 이 관계를 이어가면 **원천 데이터 소스부터 현재까지의 전체 계보**가 된다.
**장애 복구 원리**:
**시나리오**: `rdd4.collect()` 실행 중 executor 하나가 죽음. 그 executor가 처리하던 rdd4의 partition 3이 사라짐.
**복구 과정**:
1. **손실 감지**: Driver가 task 실패 감지.
2. **Lineage 추적**: rdd4 partition 3 → rdd3 partition 3 → rdd2 partition 3 → rdd1 partition 3.
3. **재계산**: 다른 executor에서 원천부터 다시 계산.
- HDFS에서 원본 텍스트 partition 3 읽기.
- map 적용.
- filter 적용.
- flatMap 적용.
4. **완료**: 손실된 partition 복구.
**Key: Partition 단위 재계산**. 전체 RDD가 아닌 **손실된 partition만**. 다른 partition들은 영향 없음.
**Replication과의 비교**:
**Traditional fault tolerance (replication)**:
- 모든 데이터를 여러 노드에 복제.
- 네트워크 I/O 많음 (복제 시).
- 디스크 사용 2~3배.
- 장애 시 즉시 다른 복제본 사용.
**RDD lineage**:
- **Replication 없음**. 데이터 한 번만.
- 네트워크/디스크 절약.
- 장애 시 **재계산** (시간 걸림).
- 메모리 기반 연산과 궁합 좋음.
**왜 Spark의 선택은 lineage인가**:
Spark는 **반복 연산**(ML, 그래프)을 타겟으로 했다. 이런 워크로드는:
- 데이터가 메모리에 있어 빠름.
- Lineage 재계산도 (메모리니까) 빠름.
- Replication 비용이 반복적 쓰기로 누적.
**Lineage의 한계**:
**1. Lineage가 길면 재계산이 오래**:
rdd1 → rdd2 → rdd3 → ... → rdd100 → action
rdd99 partition 손실 시, rdd1부터 재계산. 수 분~시간 가능.
**해결: Checkpoint**:
rdd50.checkpoint()
- rdd50을 **디스크에 저장**.
- Lineage를 이 지점에서 "잘라냄".
- 이후 복구는 rdd50부터.
**2. Wide dependency의 문제**:
rdd1 (4 partitions) --shuffle--> rdd2 (4 partitions)
rdd2 partition 3 손실 → **rdd1의 모든 partition** 필요 (shuffle이라 재분배). 비용이 큼.
**해결**: Shuffle 결과를 영구 저장. Spark가 기본으로 함 (**shuffle files**가 로컬 디스크에 남음).
**3. 외부 시스템 부작용**:
rdd.foreach(x => saveToDatabase(x))
재계산 시 **중복 저장** 위험. Lineage는 순수 함수 가정.
**해결**: Idempotent 연산, 또는 정확히 한 번 semantic을 별도 구현.
**교훈**:
RDD lineage는 **"replication 없이 내결함성"**을 가능하게 했다. 이것이 Spark가 MapReduce 대비 **100배 빠를** 수 있었던 기반이다. 메모리 연산 + 경량 복구 = 빅데이터의 새 패러다임.
**현대 적용**:
- **RDD lineage** → DataFrame, Dataset 아래에서도 유지.
- **Checkpoint** → 장기 실행 job에서 중요.
- **Structured Streaming**에서도 비슷한 아이디어 (log-based recovery).
**분산 시스템 철학적 관점**:
Lineage는 **"상태를 저장하지 말고, 만드는 방법을 저장하라"** 는 접근. 이는 함수형 프로그래밍의 철학과 일치한다:
- Immutable: RDD는 변하지 않음.
- Referential transparency: 같은 입력 → 같은 출력 (재계산 가능).
- Composition: RDD들의 체인.
Spark는 **분산 컴퓨팅에 함수형 원칙을 적용**해서 새 영역을 열었다. 그 단순하고 우아한 아이디어가 10년 넘게 이 분야를 이끌고 있다.
**A.**
**Catalyst의 핵심 아이디어**: SQL/DataFrame 쿼리를 **tree** 로 표현하고, 수십~수백 개의 **rewrite rule**을 반복 적용해서 더 효율적인 tree로 변환한다.
**4단계 처리 과정**:
**1. Parsing → Unresolved Logical Plan**
SELECT name FROM users WHERE age > 20
↓
Project [name]
Filter (age > 20)
UnresolvedRelation("users")
**2. Analysis → Resolved Logical Plan**
Catalog 참조로:
- `users` → 실제 테이블 스키마.
- `name`, `age` → 컬럼 참조.
- 타입 검증.
Project [name#10]
Filter (age#11 > 20)
Relation[name#10, age#11, email#12] parquet
**3. Logical Optimization → Optimized Logical Plan**
이 단계가 진짜 마법. 많은 rule이 반복 적용된다:
**Rule 1: Projection Pushdown**
Project [name#10]
Filter (age#11 > 20)
Relation[name#10, age#11, email#12] ← email 불필요
↓
Project [name#10]
Filter (age#11 > 20)
Project [name#10, age#11] ← 필요한 컬럼만
Relation[name, age, email]
**이점**: Parquet reader가 name, age 컬럼만 읽음. email 스킵. I/O 절약.
**Rule 2: Filter Pushdown to Data Source**
Parquet/ORC는 **predicate pushdown**을 지원한다:
Filter (age > 20)
Relation(parquet)
↓
Relation(parquet) WITH FILTER (age > 20)
Parquet reader가 min/max 통계를 보고 **row group 전체를 스킵**. 수십 배 빠른 읽기.
**Rule 3: Constant Folding**
WHERE age > 18 + 2
↓
WHERE age > 20
매 행마다 `18+2`를 계산하지 않음.
**Rule 4: Simplification**
WHERE true AND x > 10
↓
WHERE x > 10
WHERE x > 10 OR false
↓
WHERE x > 10
**Rule 5: Join Reorder**
SELECT * FROM a JOIN b ON a.id = b.id JOIN c ON b.cid = c.id
WHERE a.country = 'KR'
CBO (Cost-Based Optimizer)가 통계를 보고 최적 순서 결정:
- `a`를 먼저 필터 (country='KR').
- 작아진 `a`와 `b` 조인.
- 결과와 `c` 조인.
Catalyst는 **지수 개의 조인 순서**를 탐색하고 가장 싼 것 선택.
**Rule 6: Subquery Unnesting**
SELECT * FROM orders
WHERE user_id IN (SELECT id FROM users WHERE country = 'KR')
↓
SELECT o.* FROM orders o
INNER JOIN (SELECT id FROM users WHERE country = 'KR') u
ON o.user_id = u.id
Semi-join으로 변환. 훨씬 빠름.
**Rule 7: Column Pruning Through Join**
SELECT a.name FROM a JOIN b ON a.id = b.aid
`b`에서는 `aid`만 필요 (조인 키). 다른 컬럼 스킵.
**Rule 8: Decimal 연산 최적화**
SELECT SUM(price * quantity) FROM sales
Decimal 곱셈은 비쌈. Catalyst는 overflow를 고려해서 **필요한 정밀도만** 사용.
**4. Physical Planning**
논리 계획 → 실행 연산자:
Project → org.apache.spark.sql.execution.ProjectExec
Filter → FilterExec
Join → SortMergeJoinExec (또는 BroadcastHashJoinExec 등)
Aggregate → HashAggregateExec
여러 물리 계획을 생성하고 **cost를 비교**해서 선택.
**예: Join 전략 선택**
SELECT * FROM big JOIN small ON big.id = small.id
Catalyst는 다음을 고려:
- `small` 크기 < `spark.sql.autoBroadcastJoinThreshold` (기본 10MB)?
- Yes: **BroadcastHashJoin** 선택 (small을 모든 executor에 복사, shuffle 없음).
- No: **SortMergeJoin** 선택 (양쪽 shuffle).
Broadcast join은 **수 배~수십 배** 빠르다.
**실전 효과**:
간단한 쿼리:
SELECT u.name, SUM(o.amount)
FROM users u JOIN orders o ON u.id = o.user_id
WHERE u.country = 'KR'
GROUP BY u.name
**최적화 전** (naive 실행):
1. users 전체 스캔.
2. orders 전체 스캔.
3. 전체 join.
4. filter (country='KR').
5. group by.
1 TB orders 스캔. 수십 분.
**최적화 후** (Catalyst):
1. users에 filter pushdown (country='KR' → 10% 데이터만).
2. Projection pushdown (id, name만 읽음).
3. orders도 필요 컬럼만 읽음 (user_id, amount).
4. Broadcast join (users가 충분히 작아서).
5. group by.
I/O 90% 절약. **10배 빠른 쿼리**.
**Catalyst의 확장성**:
Catalyst의 rule은 **사용자 정의** 가능:
object MyRule extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
case Filter(p, child) => /* custom logic */
}
}
spark.experimental.extraOptimizations = Seq(MyRule)
이 확장성이 Databricks의 Delta Lake, Iceberg 같은 프로젝트의 기반.
**교훈**:
Catalyst는 **"규칙 기반 트리 변환"**이라는 단순한 아이디어로 거대한 성과를 냈다. 각 rule은 단순하지만 조합되면 **놀라운 최적화**.
전통 DB (PostgreSQL, MySQL)도 쿼리 optimizer가 있지만 Catalyst의 강점:
1. **Scala tree API**로 쉬운 rule 작성.
2. **Extensible**: 외부 rule 추가.
3. **DataFrame API와 통합**: SQL이 아닌 코드도 최적화.
결과: **기본 SQL을 작성해도 손으로 최적화한 것과 비슷한 성능**. 이것이 Spark가 데이터 엔지니어에게 사랑받는 이유다.
**A.**
**JVM 객체의 숨은 비용**:
평범한 Java 코드:
Integer age = 30;
String name = "Alice";
메모리 사용을 보면:
- `Integer`: 16 bytes header + 4 bytes value + 4 bytes padding = **24 bytes** (4 bytes 데이터에)
- `String "Alice"`:
- String 객체: ~40 bytes
- 내부 `char[]`: 16 bytes header + 10 bytes (5 chars) + padding = ~32 bytes
- 총: **~72 bytes** (5 chars 데이터에)
**실제 데이터 (int 4 + 5 chars = 9 bytes)가 96 bytes를 차지**. **10배 오버헤드**.
**빅데이터에선 이것이 재앙**:
1억 row에 5개 필드가 있다면:
- 실제 데이터: ~5 GB.
- JVM 객체로: **~50 GB**.
메모리 부족. GC pressure. 캐시 miss.
**Tungsten의 해결**:
Row를 **연속된 바이트 배열**에 저장:
Row(age=30, name="Alice", email="a@b.c")
Tungsten format:
[null_bitmap:8B][age:8B][name_ptr:8B][email_ptr:8B][name:"Alice"][email:"a@b.c"]
- **고정 크기 필드** (int, long, double): 인라인.
- **가변 크기 필드** (string, array): pointer + 뒷부분에 실제 데이터.
- **Null bitmap**: 각 필드의 null 여부.
- **Header 없음**: 순수 데이터만.
**결과**:
- 5개 필드 row: **~40 bytes** (JVM 객체 ~200 bytes에서).
- **5배 메모리 절약**.
**UnsafeRow**:
Spark의 구현 클래스:
public class UnsafeRow extends BaseMutableRow {
private Object baseObject; // byte[] 또는 null (off-heap)
private long baseOffset; // 시작 위치
private int numFields;
private int sizeInBytes;
}
**직접 메모리 접근** (`sun.misc.Unsafe`):
public long getLong(int ordinal) {
assertIndexIsValid(ordinal);
return Platform.getLong(baseObject, baseOffset + ordinal * 8L);
}
JVM을 거치지 않고 **포인터 산술**로 바로 메모리 접근. Unsafe는 이름대로 안전장치를 건너뛴다. JVM이 검증 안 함.
**효과 1: 메모리 절약**
1억 row, 10 필드 DataFrame:
- JVM object: ~30 GB.
- Tungsten: **~5 GB**.
**6배** 적은 메모리. 같은 클러스터에서 6배 큰 데이터 처리 가능.
**효과 2: GC pressure 제거**
JVM GC는 **많은 작은 객체**에서 느려진다. Young generation을 계속 순회해야.
Tungsten은 **적은 수의 큰 배열**만 사용:
- Off-heap: GC 대상 아예 아님.
- On-heap: GC 하더라도 객체 수가 적음.
**결과**: Full GC 빈도 대폭 감소. Pause time 대폭 감소.
**효과 3: Cache friendliness**
CPU cache는 **연속된 메모리**를 선호 (spatial locality). 각 cache line은 64 bytes.
JVM 객체:
Integer 1 @ address 0x1000
Integer 2 @ address 0x8000 ← 멀리 떨어짐 (heap 할당 랜덤)
Integer 3 @ address 0x4000
하나 읽을 때마다 **cache miss 가능성**. 수십 ~ 수백 cycles 낭비.
Tungsten:
[field1][field2][field3][field4]... 연속
한 번의 메모리 접근이 여러 필드를 **cache에 로드**. 다음 접근은 hit.
**성능**: **2~5배 향상** (메모리 집약적 워크로드).
**효과 4: Direct serialization**
UnsafeRow는 **이미 byte array**. 네트워크로 전송 시:
- Java 객체: Kryo/Java serialization 필요 (느림).
- UnsafeRow: **그대로 전송** (zero serialization).
Shuffle이 훨씬 빠름.
**효과 5: Vectorized 처리의 기반**
연속 메모리는 **SIMD (벡터화 명령어)** 친화적:
// 일반
for (int i = 0; i < n; i++) {
result[i] = a[i] + b[i];
}
// SIMD (4개 동시 처리)
for (int i = 0; i < n; i += 4) {
_mm_store_ps(result + i, _mm_add_ps(_mm_load_ps(a + i), _mm_load_ps(b + i)));
}
Tungsten의 연속 메모리가 SIMD의 기반. JVM은 자체적으로 SIMD를 잘 활용하지 못하지만, 데이터 레이아웃이 친화적이면 JIT이 인식.
**비용과 제약**:
**복잡도**: Tungsten은 **매우 낮은 레벨**이다. 버그가 있으면 JVM crash. Spark 팀이 조심스럽게 개발.
**디버깅 어려움**: JVM heap dump, profiler가 덜 유용.
**특정 연산의 오버헤드**: 가변 크기 필드(string) 수정이 복잡.
**Python 호환**: PySpark에서 UnsafeRow를 Python 객체로 변환할 때 비용.
**Arrow와의 협력**:
최근 Spark는 **Apache Arrow**와 통합:
- Arrow: 메모리 columnar 포맷.
- Python ↔ JVM 교환 시 Arrow 사용.
- Pandas UDF가 Arrow 덕분에 10배 빠름.
Tungsten과 Arrow는 **서로 보완**. Tungsten은 row-wise 처리, Arrow는 column-wise 교환.
**Spark 3.x의 Vectorized Execution**:
Spark 3.0+는 일부 연산자에 columnar 처리 도입:
- Parquet/ORC vectorized reader.
- Columnar aggregation.
- 4096 rows batch 단위.
Tungsten row format에서 columnar batch로 진화. DuckDB, ClickHouse의 방식을 따라감.
**교훈**:
Tungsten은 Spark의 **"underlying 재발명"**이다. 표면 API는 그대로지만 내부가 완전히 바뀌었다. 사용자는 변경 없이 2~10배 성능 향상.
이는 **"하드웨어를 존중하는 설계"**의 중요성을 보여준다:
- CPU cache가 싫어하는 것 (random access) 피하기.
- GC의 약점 (작은 객체) 회피.
- Memory bandwidth 아끼기.
JVM이 "추상화"를 제공하지만, 고성능이 필요하면 그 추상화를 **영리하게 우회**해야 한다. Tungsten은 "Java/Scala 속에서 C 수준 성능을 낸다"는 도전의 답이다.
모든 빅데이터 엔진이 결국 같은 방향으로 진화: **바이너리 포맷, 연속 메모리, 벡터화**. Tungsten은 Spark의 해답이었고, 같은 아이디어가 Flink, Presto, Snowflake에서도 발견된다. **수렴 진화**의 예다.
**A.**
**전통 Volcano 모델**:
1970년대부터 쓰인 query execution model. 각 연산자는 **iterator**:
interface QueryOperator {
Row next();
boolean hasNext();
}
**예시: `SELECT name FROM users WHERE age > 20`**
Project ← Filter ← Scan
실행:
Project.next() {
while (true) {
Row row = Filter.next();
if (row == null) return null;
return new Row(row.getString("name"));
}
}
Filter.next() {
while (true) {
Row row = Scan.next();
if (row == null) return null;
if (row.getInt("age") > 20) return row;
}
}
Scan.next() {
return disk.readNextRow();
}
각 연산자가 **독립 객체**. `next()`를 계속 호출.
**Volcano의 비용**:
1. **Virtual function calls**: `next()`는 인터페이스 메소드. JVM은 virtual dispatch 사용 → **inline 어려움**.
2. **Object allocation**: 매 row마다 새 Row 객체 생성 가능성. GC 부담.
3. **Branch prediction 실패**: 연산자마다 다른 코드 경로 실행. CPU pipeline 예측 실패.
4. **Cache miss**: 매 연산자의 코드가 다른 메모리 위치.
5. **Boxing**: Java의 경우 `Integer` 같은 wrapper 사용 → heap 할당.
**결과**: 단순 쿼리도 **Volcano 오버헤드만으로 2-5배 느림**.
**Whole-Stage Code Generation의 답**:
**아이디어**: "**여러 연산자를 하나의 함수로 병합**". 런타임에 Java 코드를 **생성**하고 컴파일.
**예시: 같은 쿼리**
**Volcano (의사 코드)**:
for each row from Scan:
call Filter.next(row)
call Project.next(filter_output)
output project_output
3개 iterator, 수많은 함수 호출.
**WSCG가 생성하는 코드**:
public class GeneratedIterator {
public Object[] next() {
while (scanner.hasNext()) {
InternalRow row = scanner.next();
int age = row.getInt(2); // age 컬럼
if (age > 20) { // filter inlined
UTF8String name = row.getUTF8String(1); // name 컬럼
return new Object[]{ name }; // project inlined
}
// 조건 불만족, 다음 row로
}
return null; // 끝
}
}
**핵심 차이**:
1. **단일 함수**: Scan, Filter, Project가 **한 함수의 내부**. 함수 호출 없음.
2. **Inline**: 컴파일러(JVM JIT)가 모든 것을 인라인. Virtual call 제거.
3. **직접 메모리 접근**: `row.getInt(2)`는 Tungsten의 `Unsafe.getInt(address)`.
4. **단순한 loop**: 하나의 while 루프. CPU pipeline 친화적.
**성능 향상**:
Databricks 벤치마크 (TPC-DS):
- Volcano: 100%.
- WSCG: **400% ~ 1000%**.
**4-10배 향상**. 마법이 아니라 오버헤드 제거.
**어떻게 작동하는가**:
1. **Spark의 최적화 단계**:
- Logical plan → Physical plan.
- 여러 연산자를 **stage**로 그룹화 (narrow dependency 내).
- 각 stage가 WSCG 대상.
2. **Code generation**:
- Physical plan 트리를 순회.
- 각 연산자가 자기 코드 조각 제공.
- 합쳐서 **Java source code** 생성.
3. **Compilation**:
- **Janino** (가벼운 Java compiler)로 컴파일.
- 바이트코드 생성.
- `ClassLoader`에 로드.
4. **Execution**:
- 컴파일된 클래스의 인스턴스 생성.
- 일반 Java 메소드처럼 호출.
- JVM JIT가 최적화 (hot path → native code).
**실전 예시**:
spark.range(1, 1000000)
.filter($"id" > 500000)
.select($"id" * 2)
.count()
EXPLAIN 결과:
*(1) Filter (id#0L > 500000)
+- *(1) Range (1, 1000000, step=1, splits=...)
`*(1)`이 WSCG stage 번호. Range, Filter, Project가 하나의 코드로 통합.
`explain("codegen")`으로 실제 생성된 코드 확인 가능:
public Object generate(Object[] references) {
return new GeneratedIteratorForCodegenStage1(references);
}
final class GeneratedIteratorForCodegenStage1 extends BufferedRowIterator {
// ... 수백 줄의 생성된 코드
}
**제약**:
1. **64KB 메소드 한계**: JVM의 단일 메소드가 64KB 초과 불가. 너무 많은 연산자 병합 시 분할.
2. **복잡한 연산자**: Window function, UDF 등은 codegen 어려움. Fall back to Volcano.
3. **디버깅 어려움**: 생성된 코드를 디버거로 추적 힘듦.
4. **컴파일 오버헤드**: 매 쿼리마다 컴파일. 작은 쿼리엔 역효과 가능.
**WSCG vs Vectorized**:
Spark의 WSCG는 **row-at-a-time + inline**. 또 다른 접근:
**Vectorized Execution** (ClickHouse, DuckDB, Presto 일부):
- Batch 단위 (1024 rows).
- SIMD 명령 활용.
- Columnar 처리.
장점: SIMD의 힘. 단점: 구현 복잡.
Spark는 두 접근을 **조합**:
- Parquet reader: vectorized (columnar).
- 연산자: WSCG (row-wise).
- 향후: 더 많은 vectorization 도입 중.
**Hyper / Umbra 비교**:
독일 TU Munich의 **HyPer/Umbra** 연구 시스템은 더 급진적:
- **LLVM IR로 컴파일**.
- 네이티브 코드 (JIT 없음).
- 성능이 C++ 손 작성 코드 수준.
Spark WSCG는 Java 바이트코드까지. JVM JIT에 의존. Umbra만큼은 아니지만 **훨씬 간단하고 이식성 있다**.
**Rough numbers**:
- Volcano: baseline.
- WSCG: 4-10x faster.
- Vectorized: 4-10x faster.
- LLVM-based: 10-20x faster.
Spark는 "충분히 빠르면서 구현 가능"의 균형을 찾았다.
**교훈**:
WSCG는 **전통 지혜의 재평가**다. Volcano 모델이 40년간 지배한 이유는 **우아함과 유연성**이지만, 현대 CPU 아키텍처 (deep pipeline, cache, SIMD)에선 **적합하지 않다**.
해결: "**연산자 추상화를 유지하면서도 런타임에 구체화**". 이것이 codegen의 본질.
이는 **"지연된 optimization"** 철학이다:
- 사용자는 high-level 코드 작성.
- 시스템이 물리적 최적화.
- 컴파일러의 전통적 역할을 런타임으로.
JIT 컴파일러가 수십 년간 해온 일이지만, 빅데이터에 적용한 것은 Spark가 처음 크게 성공시켰다. 이후 거의 모든 현대 데이터 엔진이 이 패턴을 따르고 있다.
현대 SQL 엔진의 성능은 **하드웨어 성능의 몇 % 를 사용하는가**의 경쟁이다. Volcano는 10%, WSCG는 40%, vectorized는 60%, LLVM-based는 80%. 각 진화가 이 수치를 밀어올린다.
**A.**
**Static optimization의 한계**:
전통 쿼리 optimizer (Catalyst 포함)는 **실행 전**에 최적화를 결정한다. 근거는:
- 통계 (`ANALYZE TABLE`).
- 데이터 소스 메타데이터.
- Schema.
**문제**: **통계가 부정확하거나 최신이 아닐 수 있다**:
1. 데이터가 변경됨 (ANALYZE 이후).
2. 통계 계산이 비쌈 (대용량).
3. 복잡한 조건의 selectivity 추정 어려움.
4. Join 결과 크기 예측 어려움.
**결과**: 잘못된 계획 선택 → 느린 쿼리.
**AQE의 기본 아이디어**:
"**Stage 경계에서 실제 통계를 수집**하고, **남은 계획을 재최적화**"
Spark는 이미 shuffle 단계에서 **중간 결과**가 디스크에 저장된다. 이 데이터의 **실제 크기**를 측정해서 남은 단계를 최적화할 수 있다.
**AQE의 세 가지 최적화**:
**1. Dynamically Coalescing Shuffle Partitions**
**문제**: `spark.sql.shuffle.partitions` 기본값 200. 실제 데이터 크기와 무관.
**시나리오**:
- 작은 table filter 결과: 100 MB.
- 200 partition = **각 500 KB**.
- 200개의 매우 작은 task. 오버헤드 비율 높음.
**AQE 해결**:
- Shuffle 완료 후 **실제 partition 크기 측정**.
- 작은 partition들을 **병합**.
- 예: 200 → **20** (각 ~5 MB).
- Task 수 감소, 오버헤드 감소, 성능 향상.
**설정**:
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.coalescePartitions.minPartitionSize=1MB
spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB
**2. Dynamically Switching Join Strategies**
**문제**: Join 전략 선택 시 table 크기 추정이 부정확.
**시나리오**:
SELECT * FROM big JOIN medium ON big.id = medium.id
WHERE medium.status = 'active'
**계획 시 추정**: `medium` 크기 = 100 MB → SortMergeJoin 선택.
**실제 실행**: 필터 후 `medium.active` = **5 MB**. Broadcast가 훨씬 나았을 것.
**AQE 해결**:
- Medium 필터 결과를 shuffle.
- 실제 크기가 5 MB임을 확인.
- **BroadcastHashJoin으로 동적 전환**.
- 수 배 빠름.
**예시**:
Before AQE: 120 seconds (SortMergeJoin)
After AQE: 25 seconds (auto-switched to BroadcastHashJoin)
**설정**:
spark.sql.adaptive.autoBroadcastJoinThreshold=10MB
**3. Dynamically Optimizing Skew Joins**
**문제**: Data skew. 일부 key가 과도하게 많음.
**시나리오**:
SELECT * FROM orders JOIN users ON orders.user_id = users.id
- 대부분 user는 10~100 orders.
- 한 user (봇, 관리자)가 1,000,000 orders.
- 해당 partition이 **100배 크다**.
- 그 task가 **100배 오래 걸림**.
- 나머지 tasks는 빨리 끝나지만 **skewed task 기다림**.
**전통적 해결**: Salting (수동, 복잡).
**AQE 해결**:
1. Shuffle 후 각 partition 크기 확인.
2. **평균의 5배 이상** 크면 skewed로 판단.
3. Skewed partition을 **여러 sub-partition으로 분할**.
4. 각 sub-partition이 다른 task에서 처리.
5. 반대쪽 partition도 **복제** (broadcast 필요).
**예시**:
Before:
Task 1: 1 GB (skewed user) → 100 seconds
Task 2: 10 MB → 1 second
Task 3: 10 MB → 1 second
...
Total time: 100 seconds (Task 1 bound)
AQE divides Task 1 into 10 sub-tasks:
Task 1a: 100 MB → 10 seconds
Task 1b: 100 MB → 10 seconds
...
Task 2: 10 MB → 1 second
Total time: 10 seconds (10x faster)
**설정**:
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB
**4. Dynamic Partition Pruning (DPP)** (AQE와 함께 발전)
Star schema의 fact + dimension join:
SELECT f.amount
FROM fact_sales f JOIN dim_date d ON f.date_id = d.id
WHERE d.year = 2024
**전통**: `fact_sales` 전체 스캔 + join + filter.
**DPP**:
1. `dim_date`에서 year=2024인 id 먼저 추출.
2. 이 id들을 **fact_sales의 partition filter로 pushdown**.
3. `fact_sales` 스캔 시 **해당 partition만 로드**.
**효과**:
- 파티션된 fact_sales (year별)이면 **1 TB → 50 GB**.
- **20배 빠른 쿼리**.
**실측 효과**:
**Databricks TPC-DS 벤치마크**:
- AQE 비활성: 100% (baseline).
- AQE 활성: **130~200%**.
- 일부 쿼리: **3배+ 향상**.
- 평균: **1.8배**.
**거의 공짜**. 기존 코드 변경 없이.
**Spark 버전별**:
- **Spark 3.0**: AQE 도입 (기본 비활성).
- **Spark 3.2**: AQE 기본 활성화.
- **Spark 3.3+**: 개선 지속.
**내부 작동**:
AQE는 Spark의 **query execution을 stage 단위로** 실행하되, **stage 사이에 pause**한다:
1. Stage 1 실행.
2. Stage 1 완료 → shuffle 파일 생성.
3. **Shuffle 파일의 실제 통계 수집**.
4. Stage 2의 physical plan을 **re-optimize**.
5. Stage 2 실행.
6. 반복.
이 **stage 단위 통계 → 재최적화** 루프가 AQE의 핵심.
**한계와 함정**:
**1. Streaming에 제한적**: Structured Streaming은 AQE 덜 적용됨.
**2. 작은 쿼리엔 불필요**: Re-optimization 오버헤드가 작은 쿼리에선 역효과.
**3. 복잡한 쿼리 예측 어려움**: 매우 복잡한 쿼리는 여전히 수동 튜닝 필요.
**4. DPP와의 상호작용**: 복잡한 경우 예측 안 되는 계획 변경.
**실전 권장**:
기본 설정 (Spark 3.2+는 이미 기본)
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
파라미터 조정
spark.sql.adaptive.advisoryPartitionSizeInBytes=128MB # 목표 partition 크기
spark.sql.adaptive.coalescePartitions.minPartitionSize=32MB # 최소
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 # skew 기준
**교훈**:
AQE는 **"optimization은 한 번에 끝나지 않는다"** 는 철학의 구현이다. 전통 optimizer는 **ahead-of-time** (실행 전 모든 결정), AQE는 **just-in-time** (필요 시 재최적화).
이는 **JIT 컴파일러**의 철학과 같다:
- 처음엔 inexpert 결정.
- 런타임 관찰.
- 최적화.
분산 데이터 처리에서 같은 원칙 적용. Spark가 선구자였고, 이후 Trino, Presto도 유사 기능 추가.
**"정적 vs 동적 최적화"**의 트레이드오프:
- **정적**: 단순, 예측 가능. 완전 최적은 아닐 수 있음.
- **동적**: 복잡, 오버헤드. 실제 데이터에 적응.
AQE는 이 둘 사이의 **실용적 타협**이다. 완벽한 동적 재계획은 아니지만, stage 경계에서 핵심 결정을 조정함으로써 대부분의 이득을 얻는다.
**미래**:
AQE는 계속 발전 중:
- **Multi-stage re-optimization**.
- **Cost model 개선**.
- **ML 기반 최적화** (실험적).
- **Streaming adaptation**.
Spark가 10년 넘게 데이터 처리 영역을 선도하는 이유는 **이런 지속적 혁신** 때문이다. "충분히 좋다"에 만족하지 않고, 매 버전마다 의미 있는 개선을 가져온다. AQE는 그 한 예이며, 앞으로도 더 나아질 것이다.
마치며: 데이터의 정교한 엔진
핵심 정리
1. **RDD**: 메모리 기반 분산 컬렉션. Lineage로 장애 복구.
2. **DataFrame**: 구조화된 API. Schema + optimizer.
3. **Catalyst**: Rule + Cost 기반 SQL 최적화.
4. **Tungsten**: Off-heap binary format. CPU 캐시 친화.
5. **WSCG**: 런타임 Java 코드 생성. Volcano 오버헤드 제거.
6. **Shuffle**: 분산의 핵심이자 병목. Sort shuffle 기본.
7. **AQE**: 런타임 통계로 재최적화. 30-200% 향상.
Spark가 우리에게 준 것
Spark는 **빅데이터 처리의 민주화**다. MapReduce 시대엔:
- Java로 map/reduce 작성.
- 복잡한 최적화 수동.
- 느린 반복.
Spark 이후:
- SQL 또는 간단한 DataFrame API.
- 자동 최적화 (Catalyst + Tungsten + AQE).
- 빠른 반복.
- 배치 + 스트림 + ML 통합.
**진입 장벽이 낮아지면서 더 많은 사람이 데이터를 다룰 수 있게** 되었다. 데이터 분석가도 SQL로 복잡한 분산 쿼리 실행. 데이터 과학자도 머신러닝 파이프라인을 PySpark로.
실전 권장
- **기본 설정 이해**: `spark.executor.memory`, `spark.sql.shuffle.partitions`, AQE.
- **Broadcast join 활용**: 작은 dimension 테이블.
- **Partition column 선택**: 쿼리 패턴에 맞게.
- **EXPLAIN 읽기**: 느린 쿼리의 첫 단계.
- **Spark UI 활용**: Task 분포, GC time 모니터.
- **적절한 file format**: Parquet/ORC 선호.
마지막 교훈
Spark의 성공은 **기술적 우수성** + **타이밍** + **생태계**의 합이다:
1. **기술**: RDD, Catalyst, Tungsten의 혁신.
2. **타이밍**: 메모리 가격 하락 + 빅데이터 수요 폭발.
3. **생태계**: Python/R 지원, ML, 스트리밍, SQL 모두 통합.
이 세 가지 중 하나라도 빠졌으면 Spark는 지금 위치에 없을 것이다. **기술은 필요하지만 충분하지 않다**는 교훈이다.
당신이 다음에 Spark job을 실행할 때, 아래에서 벌어지는 일을 기억하자:
- Catalyst가 수십 개의 rule로 쿼리 재작성.
- Tungsten이 바이너리 포맷으로 메모리 절약.
- WSCG가 Java 코드 생성 + 컴파일.
- AQE가 실행 중 통계로 재최적화.
- Shuffle이 데이터 재분배.
이 모든 것이 **수 ms~수 초** 안에 일어난다. 그리고 당신은 단지 `.filter().groupBy().count()`만 썼다. 이것이 현대 데이터 엔지니어링의 마법이다.
참고 자료
- [Apache Spark Documentation](https://spark.apache.org/docs/latest/)
- [Learning Spark, 2nd Edition (Jules Damji et al.)](https://www.oreilly.com/library/view/learning-spark-2nd/9781492050032/)
- [Spark SQL: Relational Data Processing (Armbrust et al., 2015)](https://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf)
- [Resilient Distributed Datasets (Zaharia et al., 2012)](https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf)
- [Databricks Engineering Blog](https://www.databricks.com/blog/category/engineering)
- [Spark Internals by Reynold Xin](https://www.slideshare.net/rxin/)
- [Project Tungsten: Bringing Spark Closer to Bare Metal](https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html)
- [Apache Spark as a Compiler (Databricks Blog)](https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html)
- [Adaptive Query Execution in Apache Spark](https://databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html)
- [High Performance Spark (Holden Karau, Rachel Warren)](https://www.oreilly.com/library/view/high-performance-spark/9781491943199/)
현재 단락 (1/1201)
2010년 UC Berkeley의 AMPLab에서 **Matei Zaharia**가 논문을 발표했다: "**Resilient Distributed Datasets: A Fault-...