- Published on
Apache Spark 내부 완전 가이드 2025: RDD, Catalyst Optimizer, Tungsten, Whole-Stage Codegen, Shuffle 심층 분석
- Authors

- Name
- Youngju Kim
- @fjvbn20031
들어가며: Spark는 어떻게 MapReduce를 쫓아냈는가
2010년, UC Berkeley에서
2010년 UC Berkeley의 AMPLab에서 Matei Zaharia가 논문을 발표했다: "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing".
당시 빅데이터의 왕은 Hadoop MapReduce였다. Spark는 두 가지 주장을 했다:
- 메모리 기반으로 100배 빠르다.
- API가 훨씬 단순하다.
처음엔 MapReduce 진영이 회의적이었다. 하지만 Spark는:
- 2013: Apache에 기증.
- 2014: Databricks 창업.
- 2016: Spark 2.0 (DataFrame, Catalyst, Tungsten).
- 2020: Spark 3.0 (Adaptive Query Execution).
- 현재: 빅데이터 처리의 사실상 표준.
Hadoop MapReduce는 사라졌다. Spark는 전 세계 수만 기업의 데이터 파이프라인의 중추가 되었다.
왜 이렇게 빨라졌는가
Spark의 성능 혁명은 세 가지 축에서 나왔다:
- RDD (Resilient Distributed Dataset): In-memory 연산.
- Catalyst Optimizer: 쿼리 최적화.
- Tungsten: CPU와 메모리 최적화.
이 세 가지가 합쳐져 MapReduce 대비 100배, 때로는 1000배 성능을 낸다.
이 글에서 다룰 것
- RDD의 탄생과 lineage.
- DataFrame/Dataset: 구조화된 데이터.
- Catalyst optimizer: 쿼리 재작성.
- Physical plan과 cost-based 최적화.
- Tungsten project: CPU 캐시 친화적 실행.
- Whole-stage code generation: JIT으로 코드 생성.
- Shuffle: 분산 집계의 비밀.
- Adaptive Query Execution: 런타임 최적화.
- 실전 튜닝.
왜 이 지식이 중요한가
- 데이터 엔지니어: Spark 튜닝의 80%는 내부 이해.
- DA/DS: 왜 이 쿼리가 느린지 설명.
- 플랫폼 엔지니어: 클러스터 최적화.
- 다른 시스템 학습: Dask, Ray, Flink가 Spark의 개념을 차용.
1. RDD: 모든 것의 시작
탄생 배경
2009년경 빅데이터 처리는 MapReduce가 지배했다. 문제는:
- 반복 알고리즘이 극도로 느림 (ML, 그래프).
- 각 Map-Reduce job이 디스크를 왕복.
- 인터랙티브 분석이 불가능 (매번 몇 분~시간).
Zaharia의 통찰: "메모리에서 데이터를 유지하면 되지 않을까?"
문제: 메모리는 휘발성. 노드 장애 시 어떻게?
해결: Lineage (계보). 데이터가 아닌 변환 연산의 기록을 유지. 장애 시 재계산.
RDD란
RDD (Resilient Distributed Dataset):
- Resilient: 장애에 견딤 (lineage로).
- Distributed: 클러스터에 분산.
- Dataset: 데이터 집합.
핵심 특성:
- Immutable: 한 번 만들어지면 변경 불가.
- Lazy: Transformation은 action이 불릴 때까지 실행 안 됨.
- Typed: Java/Scala의 타입 시스템 활용.
- Partitioned: 여러 worker에 분산.
Transformation vs Action
Transformation: 새 RDD 생성. Lazy.
val lines = sc.textFile("data.txt") // RDD[String]
val words = lines.flatMap(_.split(" ")) // 아직 실행 안 됨
val counts = words.map(w => (w, 1))
val reduced = counts.reduceByKey(_ + _) // 여전히 실행 안 됨
Action: 실제 계산 트리거. Eager.
reduced.collect() // 여기서 전체 체인이 실행됨
왜 Lazy인가
Lazy evaluation의 장점:
- 최적화 기회: 여러 transformation을 한 번에 처리. 예: map → filter → map을 하나의 pass로.
- 불필요한 계산 회피:
take(10)만 필요하면 10개만. - Pipeline: 중간 데이터를 메모리에 쌓지 않음.
Lineage와 Fault Tolerance
val rdd1 = sc.textFile("data.txt") // Dependency: HDFS
val rdd2 = rdd1.map(x => x.toUpperCase) // Dependency: rdd1
val rdd3 = rdd2.filter(x => x.startsWith("A")) // Dependency: rdd2
만약 rdd3를 계산 중에 한 partition이 노드 장애로 손실되면:
- Lineage 확인: rdd3 → rdd2 → rdd1 → file.
- 손실된 partition을 재계산.
- 다른 partition은 그대로.
장점: Replication 없이 내결함성. 디스크 I/O 없음.
단점: Lineage가 길면 재계산이 오래 걸림. Checkpoint로 해결 (중간 저장).
Narrow vs Wide Dependency
RDD 간 의존성은 두 종류:
Narrow dependency:
- 자식 partition이 부모의 소수 partition에 의존.
- 예: map, filter, union.
- 한 stage 내에서 pipeline 실행 가능.
- 재계산 시 영향 범위 작음.
Wide dependency (shuffle dependency):
- 자식 partition이 부모의 여러 partition에 의존.
- 예: groupByKey, reduceByKey, join.
- Shuffle 필요: 데이터 재분산.
- Stage 경계.
Narrow: Wide:
Parent Parent Parent Parent Parent
| | | / | \ |
▼ ▼ ▼ ▼ ▼ ▼ ▼
Child1 Child2 ↓ ↓ ↓ ↓ ↓
Shuffle 발생
↓ ↓ ↓
Child1 Child2 Child3
DAG와 Stage
Spark가 액션을 받으면:
- RDD graph를 DAG로 변환.
- Wide dependency를 경계로 stage 분할.
- 각 stage는 tasks로 분해 (partition 수 만큼).
- Stages를 순차적으로, task를 병렬로 실행.
Action: reduce()
↓
Logical DAG:
RDD1 → map → RDD2 → reduceByKey → RDD3 → collect
Stages:
Stage 1: textFile → map (narrow, pipeline)
[Shuffle]
Stage 2: reduceByKey → collect
RDD의 한계
RDD는 훌륭했지만 몇 가지 약점:
- Schema 없음: 그냥 Java/Scala 객체의 컬렉션.
- Optimizer 없음: 사용자가 직접 최적화.
- 느린 직렬화: Java serialization.
- 메모리 오버헤드: JVM 객체의 overhead.
Spark 2.0은 이를 해결하기 위해 DataFrame/Dataset + Catalyst + Tungsten을 도입했다.
2. DataFrame & Dataset: 구조화 혁명
DataFrame의 등장
Spark 1.3 (2015)에서 DataFrame 도입. SQL의 테이블처럼 구조화된 RDD:
val df = spark.read.json("people.json")
df.show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Bob |
// | 25|Alice|
// +---+----+
df.filter($"age" > 20).select("name").show()
RDD와 차이:
- Schema 있음: 컬럼과 타입 정보.
- SQL처럼 조작: 고수준 API.
- Optimizer 적용: Catalyst가 알아서 최적화.
- Tungsten 메모리 포맷: JVM 객체 아닌 바이너리.
Dataset (Spark 1.6+)
Dataset: DataFrame + 타입 안전성.
case class Person(name: String, age: Int)
val ds = spark.read.json("people.json").as[Person]
ds.filter(_.age > 20).map(_.name)
장점:
- 컴파일 타임 타입 체크.
- Object API와 DataFrame 최적화 모두.
Scala/Java에만 존재. Python은 DataFrame만 사용 (Python은 정적 타입이 아니므로).
DataFrame = Dataset[Row]
사실 DataFrame은 Dataset[Row]의 alias. Row는 untyped.
// 같은 것
val df: DataFrame = spark.read.json(...)
val df: Dataset[Row] = spark.read.json(...)
성능 차이
단순 예시: 1억 행 필터링.
- RDD: 100초.
- DataFrame/Dataset: 15초.
6~7배 차이. 이유:
- Catalyst 최적화 (predicate pushdown 등).
- Tungsten 바이너리 포맷.
- Whole-stage codegen.
3. Catalyst Optimizer: 쿼리 재작성의 마법
Catalyst의 4단계
Catalyst는 SQL/DataFrame query를 4단계로 처리한다:
SQL or DataFrame
↓
1. Analysis (unresolved plan → resolved logical plan)
↓
2. Logical Optimization (rule-based)
↓
3. Physical Planning (여러 물리 계획 생성 + 비용 비교)
↓
4. Code Generation (Tungsten)
↓
RDD (실행)
1단계: Analysis
입력: Unresolved logical plan.
SQL: SELECT name FROM users WHERE age > 20
↓ parser
UnresolvedRelation("users")
→ UnresolvedAttribute("age")
→ UnresolvedAttribute("name")
Catalog를 참조해서 resolve:
- 테이블 이름 → 실제 데이터 소스.
- 컬럼 이름 → 컬럼 타입.
- 함수 → 실제 구현.
결과: Resolved Logical Plan.
2단계: Logical Optimization
Rule-based 최적화. 수십~수백 개의 rule을 반복 적용:
Constant Folding:
age * 2 + 3 → (미리 계산)
Predicate Pushdown:
SELECT * FROM (table JOIN other) WHERE table.x > 10
↓ pushdown
SELECT * FROM (SELECT * FROM table WHERE x > 10) JOIN other
필터를 조인 전으로 밀어 넣음. 중간 결과 크기 감소.
Projection Pushdown:
SELECT name FROM users
// users에 50개 컬럼 있어도 name만 읽음
Simplification:
WHERE true AND x > 10 → WHERE x > 10
WHERE false → never executes
Subquery Elimination:
SELECT * FROM t WHERE id IN (SELECT id FROM s)
↓
SELECT * FROM t JOIN s ON t.id = s.id
Logical Plan 예시
val df = spark.sql("""
SELECT u.name, COUNT(*) as orders
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE u.country = 'KR'
GROUP BY u.name
""")
df.explain(true)
Parsed Logical Plan:
'Project ['u.name, 'COUNT(1) AS orders]
+- 'Filter ('u.country = KR)
+- 'Join Inner, ('u.id = 'o.user_id)
:- 'UnresolvedRelation `users`
+- 'UnresolvedRelation `orders`
Analyzed Logical Plan:
Aggregate [name#10], [name#10, count(1) AS orders#20]
+- Filter (country#12 = KR)
+- Join Inner, (id#9 = user_id#15)
:- Relation[id#9,name#10,age#11,country#12] parquet
+- Relation[user_id#15,total#16] parquet
Optimized Logical Plan:
Aggregate [name#10], [name#10, count(1) AS orders#20]
+- Project [name#10]
+- Join Inner, (id#9 = user_id#15)
:- Project [id#9, name#10]
+- Filter (country#12 = KR)
+- Relation[...] parquet
+- Project [user_id#15]
+- Relation[...] parquet
변화:
- Filter가 users 바로 위로 이동.
- Projection이 각 relation 바로 위로.
- 불필요한 컬럼 제거.
3단계: Physical Planning
논리 계획을 실행 가능한 물리 연산으로 변환. Catalyst는 여러 물리 계획을 생성하고 비교.
Join 선택 예시:
- BroadcastHashJoin: 작은 테이블이면.
- SortMergeJoin: 큰 테이블들.
- ShuffledHashJoin: 중간 크기.
- BroadcastNestedLoop: cartesian이면.
Catalyst는 각 옵션의 cost를 추정하고 가장 저렴한 것 선택.
4단계: Code Generation
선택된 물리 계획을 Java 바이트코드로 변환. 다음 섹션에서 자세히.
Cost-Based Optimizer (CBO)
Spark 2.2+부터 통계 기반 최적화:
ANALYZE TABLE users COMPUTE STATISTICS FOR COLUMNS id, name, age
이후 Catalyst가:
- Table 크기, 행 수.
- 컬럼별 min/max/null count.
- 컬럼별 distinct count.
이 정보로 join 순서 최적화, selectivity 추정 등.
기본 RBO (rule-based optimizer)와 조합되어 더 나은 계획 생성.
4. Tungsten: CPU와 메모리의 재발명
Tungsten 프로젝트
Spark 1.5 (2015)에 시작. "Spark 성능을 하드웨어 한계까지".
핵심 목표:
- JVM 메모리 오버헤드 제거.
- CPU 캐시 활용.
- Vectorized 실행.
- 코드 생성으로 CPU 파이프라인 최적화.
JVM 객체의 문제
Java 객체의 오버헤드:
Integer i = 42;
int 값은 4바이트지만, Integer 객체는:
- Object header: ~16 바이트.
- Value field: 4 바이트.
- Padding: 4 바이트.
- 총 24 바이트.
6배 오버헤드. 수백만 객체가 있으면 메모리 낭비 심각.
String 더 심함:
String s = "hello";
// Object: 40+ bytes
// char[] 객체: 24 bytes + 2 * 5 = 34 bytes
// 총 60+ bytes
해결: Off-heap Binary Format
Tungsten은 데이터를 JVM 힙 밖의 바이너리 포맷에 저장:
Row: [age=30, name="Alice"]
Binary encoding (off-heap):
[bitmap][age:4bytes][name_offset:4bytes][name_length:4bytes]...[name_data]
- No JVM objects.
- No GC pressure.
- Cache-friendly layout.
- 직접 메모리 접근:
sun.misc.Unsafe.
UnsafeRow
Spark의 기본 row 표현:
public final class UnsafeRow extends MutableRow {
private Object baseObject; // byte[] or null (off-heap)
private long baseOffset;
private int sizeInBytes;
private int numFields;
// ...
}
각 필드 접근:
long getLong(int i) {
return Platform.getLong(baseObject, baseOffset + offsets[i]);
}
Direct memory access. JVM 객체 생성 없음.
Cache Friendliness
Tungsten 포맷은 연속 메모리:
Row 0: [col0][col1][col2]
Row 1: [col0][col1][col2]
Row 2: [col0][col1][col2]
특정 컬럼만 읽으면:
col0: 0, 12, 24, ... (stride: row size)
Spatial locality 나쁨. 그러나 일반 접근 패턴에선 여전히 빠름.
Vectorized reader (Parquet 등)는 columnar 포맷으로 직접 읽음 → 더 캐시 친화적.
메모리 관리
Tungsten은 자체 메모리 매니저:
- Execution memory: Shuffle, join, sort용.
- Storage memory: Cache (persist).
- 두 영역이 동적으로 서로 빌림.
- Off-heap + on-heap 지원.
Spark 설정:
spark.executor.memory = 8g
spark.memory.fraction = 0.6 # execution + storage
spark.memory.storageFraction = 0.5 # storage의 최소 보장
spark.memory.offHeap.enabled = true
spark.memory.offHeap.size = 4g
성능 효과
Tungsten 전후 벤치마크:
- 메모리 사용: 30~70% 감소.
- 쿼리 속도: 2~5배 향상.
- GC pause: 크게 감소.
특히 aggregation, join 등 메모리 집약적 작업에서 효과 큼.
5. Whole-Stage Code Generation
전통 Volcano 모델의 문제
전통 SQL 엔진은 Volcano 모델 (iterator-based):
while (child.hasNext()) {
Row row = child.next();
if (predicate(row)) {
process(row);
}
}
각 연산자가 next()를 호출. 함수 호출 오버헤드:
- Virtual function calls: CPU 파이프라인 깨짐.
- Object allocation: row 객체 계속 생성.
- Cache misses: 매번 다른 코드 경로.
단순 쿼리도 연산자 수에 비례하는 오버헤드.
아이디어: 통합 코드 생성
"여러 연산자를 하나의 함수로 병합". 예:
SELECT name FROM users WHERE age > 20
Volcano:
Scan → Filter → Project
세 개의 iterator 객체.
Codegen:
// 런타임에 생성된 코드
while (scanner.hasNext()) {
Row row = scanner.next();
if (row.getInt("age") > 20) {
emit(row.getString("name"));
}
}
한 개의 루프. 함수 호출 없음. 컴파일러가 최적화.
Whole-Stage CodeGen
Spark 2.0 (2016)의 Whole-Stage Code Generation (WSCG):
- Stage 단위로 합병: Shuffle 경계까지 모두.
- Java 소스 코드 생성: 런타임에.
- Janino로 즉시 컴파일.
- 직접 실행: JVM이 JIT 최적화.
결과:
- 함수 호출 감소 (virtual call → inlined).
- 중간 객체 제거.
- CPU pipeline 활용.
- 2~10배 성능 향상.
예시: 생성된 코드
SELECT id, name FROM users WHERE age > 20 AND country = 'KR'
생성된 Java 코드 (간소화):
public class GeneratedIterator {
private scan_input_0; // Parquet reader
public boolean hasNext() { return scan_input_0.hasNext(); }
public UnsafeRow next() {
while (scan_input_0.hasNext()) {
UnsafeRow row = scan_input_0.next();
int age = row.getInt(2);
if (age > 20) {
UTF8String country = row.getUTF8String(3);
if (country.equals("KR")) {
// project id, name
UnsafeRow output = /* build row */;
return output;
}
}
}
return null;
}
}
모든 것이 인라인. 루프 하나. JIT이 바이트코드를 네이티브로 최적화.
Vectorized Execution과의 차이
Whole-stage codegen은 row-at-a-time 기반이다. 반면 vectorized execution은 batch-at-a-time:
while (batch = next_batch()) { // 1024 rows
for (row : batch) {
// process
}
}
ClickHouse, Snowflake, DuckDB가 vectorized.
Spark의 접근은 둘 다 활용:
- Columnar scan: Parquet, ORC를 vectorized로 읽음.
- Row-based processing: Codegen으로 각 row 처리.
- Spark 3.2+: 일부 연산자에 vectorized 지원 추가.
EXPLAIN으로 확인
df.explain("codegen")
출력:
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
...
실제 생성된 Java 코드를 볼 수 있다. 디버깅에 유용.
6. Shuffle: 분산 집계의 핵심
왜 Shuffle이 필요한가
Wide dependency (groupBy, join 등)는 데이터 재분배가 필요하다:
Stage 1 (4 partitions):
P0: [(a,1), (b,2), (c,3)]
P1: [(a,4), (d,5)]
P2: [(b,6), (c,7), (e,8)]
P3: [(d,9), (a,10)]
reduceByKey:
모든 'a'는 같은 partition으로
모든 'b'는 같은 partition으로
...
Stage 2 (4 partitions after shuffle):
P0: [(a, [1,4,10])] → [(a,15)]
P1: [(b, [2,6])] → [(b,8)]
P2: [(c, [3,7])] → [(c,10)]
P3: [(d, [5,9]), (e, [8])] → [(d,14),(e,8)]
Shuffle은 stage 사이의 데이터 이동. 네트워크와 디스크 I/O 집약적.
Shuffle 과정
Write 단계 (Map 측):
- 각 task가 로컬 파일에 결과 쓰기.
- 각 reduce task에 해당하는 partition으로 분류.
- 디스크에 저장 (나중에 reduce가 fetch).
Read 단계 (Reduce 측):
- 모든 map task로부터 자기 partition 데이터 fetch.
- 네트워크로 전송 (executor 간).
- 메모리에 모음.
- 처리 (sort, aggregate 등).
Shuffle의 비용
Shuffle은 Spark의 가장 비싼 연산:
- 디스크 I/O: write + read.
- 네트워크: 모든 executor 간 cross.
- Serialization: 객체 ↔ 바이트.
- Sort: 보통 필요.
규모:
- 작은 job: 수 MB.
- 큰 job: 수 TB.
1 TB shuffle = 많은 시간과 자원.
Shuffle 전략의 진화
1. Hash Shuffle (초기 Spark):
- 각 map task가 각 reduce partition별로 별도 파일.
- M × R 파일 (M=map tasks, R=reduce tasks).
- 100 × 200 = 20,000 파일.
- 파일 시스템 부담.
2. Sort Shuffle (1.1+, 기본):
- 각 map task가 하나의 파일에 정렬된 결과.
- 인덱스 파일로 partition 위치 표시.
- M 파일만.
- 훨씬 효율적.
3. Tungsten Sort Shuffle (1.5+):
- Tungsten의 binary format 활용.
- 캐시 친화적.
- 더 빠름.
Broadcast Join
작은 테이블을 조인할 때의 최적화:
val small = ... // 10 MB
val big = ... // 1 TB
val joined = big.join(broadcast(small), "key")
Broadcast:
- Driver가 small 테이블을 모든 executor에 복사.
- 각 executor가 big의 자기 partition과 로컬에서 조인.
- Shuffle 없음.
조건: 작은 테이블이 메모리에 들어가야. 기본 spark.sql.autoBroadcastJoinThreshold = 10 MB.
Shuffle join 대비: 수 배~수십 배 빠름.
Shuffle Partitioning
spark.sql.shuffle.partitions: shuffle 후 partition 수. 기본 200.
너무 작음:
- 큰 partition.
- 일부 task의 OOM.
- 병렬성 부족.
너무 큼:
- 많은 작은 파일.
- 스케줄링 오버헤드.
가이드: 각 partition이 100~200 MB가 되도록.
Shuffle 최적화 기법
1. 파일 수 줄이기:
df.coalesce(10).write.parquet(...)
작은 partition들을 병합.
2. Partitioning 조정:
df.repartition(50, $"key") // key 기반 재분할
3. Broadcast 강제:
val hint_df = big.join(broadcast(small), ...)
// 또는 SQL hint
// SELECT /*+ BROADCAST(small) */ ...
4. Skew 처리: 다음 섹션.
7. Adaptive Query Execution (AQE)
Spark 3.0의 킬러 기능
Adaptive Query Execution (AQE): 런타임에 수집한 통계를 바탕으로 쿼리 계획을 동적으로 재최적화.
spark.sql.adaptive.enabled = true # 3.2+부터 기본 true
주요 기능
1. Dynamically coalescing shuffle partitions:
전통적 문제:
spark.sql.shuffle.partitions = 200을 정해둠.- 작은 shuffle은 200 partition이 과도.
- 큰 shuffle은 부족.
AQE:
- Shuffle 후 실제 데이터 크기 확인.
- 작은 partition을 자동 병합.
- 예: 200 → 20 (각 partition이 충분한 크기).
2. Dynamically switching join strategies:
계획 시:
SortMergeJoin결정 (두 테이블 크기 추정).
실행 중:
- 실제 한 테이블이 예상보다 작은 것 판명.
- 동적으로 BroadcastHashJoin으로 전환.
3. Dynamically optimizing skew joins:
Skew 감지:
- 특정 key의 partition이 평균보다 훨씬 큼.
AQE:
- 큰 partition을 여러 sub-partition으로 분할.
- 각각 별도 task로 실행.
- 결과를 union.
실측 효과
Databricks 벤치마크 (TPC-DS):
- AQE 전: 100%.
- AQE 후: 130% 이상.
- 일부 쿼리: 3배+ 향상.
거의 공짜로 얻는 성능. AQE는 Spark 3.0의 가장 중요한 개선 중 하나.
8. Dynamic Partition Pruning
문제
Star schema의 fact와 dimension join:
SELECT f.amount
FROM fact_sales f JOIN dim_date d ON f.date_id = d.id
WHERE d.year = 2024
전통:
fact_sales전체 스캔 (테라바이트 급).dim_date필터 (year=2024).- Join.
fact_sales는 date_id로 partitioned되어 있어도 전체 스캔. Filter가 dim에만 있어서.
AQE + DPP
Spark 3.0의 Dynamic Partition Pruning:
dim_date에서 year=2024인 id 먼저 수집.- 그 id들을 fact_sales의 partition filter로 pushdown.
fact_sales스캔 시 해당 partition만 로드.
효과:
- 수 TB → 수 GB.
- 10~100배 빠른 쿼리.
조건
- 파티션된 테이블.
- Dimension 필터가 highly selective.
- Spark 3.0+.
9. 실전 튜닝
메모리 설정
--conf spark.executor.memory=16g
--conf spark.executor.memoryOverhead=2g
--conf spark.executor.cores=4
--conf spark.executor.instances=10
원칙:
- Executor당 cores 4~6: GC, I/O 균형.
- Memory overhead = max(384MB, 10% of memory).
- Instances: 클러스터 용량과 workload에 따라.
Shuffle 튜닝
--conf spark.sql.shuffle.partitions=200 # 기본, 조정
--conf spark.sql.adaptive.enabled=true
--conf spark.sql.adaptive.coalescePartitions.enabled=true
--conf spark.sql.adaptive.skewJoin.enabled=true
Partition 수 계산:
- 총 데이터 / 목표 크기 (200 MB).
- 예: 1 TB → 5000 partitions.
Broadcast 임계값
--conf spark.sql.autoBroadcastJoinThreshold=100MB
기본 10 MB는 너무 작을 수 있음. 100 MB까지 올리면 더 많은 broadcast join.
주의: Executor 메모리 확인. Broadcast 테이블이 모든 executor에 복제되므로.
Cache 전략
df.cache() // MEMORY_AND_DISK (기본)
df.persist(StorageLevel.MEMORY_ONLY)
df.persist(StorageLevel.MEMORY_AND_DISK_SER) // 직렬화 (작음)
언제 cache:
- 여러 번 사용되는 DataFrame.
- 계산 비용 큰 DataFrame.
주의: 메모리 부족하면 disk로 spill. 재계산이 더 나을 수 있음.
파티셔닝
Partitioned write:
df.write.partitionBy("year", "month").parquet(...)
파일을 year/month별 디렉토리로 저장. 쿼리 시 partition pruning으로 빠른 읽기.
주의: 너무 많은 partition은 작은 파일 문제.
Skew 수동 처리
AQE가 부족하면 수동:
// Salting
val salted = df.withColumn("salted_key",
concat($"key", lit("_"), (rand() * 10).cast("int")))
// Join 후 다시 그룹화
10. 성능 디버깅
Spark UI
Spark Web UI가 가장 중요한 도구:
- Jobs: Action별 실행.
- Stages: Stage별 task 분포.
- SQL/DataFrame: 논리/물리 계획.
- Executors: 리소스 사용.
- Storage: Cache 상태.
주의할 Metric
Task duration 분포:
- Median과 Max 차이가 크면 skew.
- 99th percentile이 나쁘면 병목.
Shuffle read/write:
- 크기가 예상보다 크면 filter pushdown 실패.
- 작으면 효율적.
GC time:
- Task 시간의 10% 이상이면 GC 문제.
- Memory 증설 또는 parallelism 증가.
Explain 명령어
df.explain() // 물리 계획
df.explain(true) // 4단계 모두
df.explain("cost") // CBO 비용
df.explain("codegen") // 생성된 코드
df.explain("formatted") // 보기 쉬운 포맷
Spark SQL Shell
빠른 테스트:
spark-sql
> SELECT ...
대화형 쿼리와 EXPLAIN.
11. 흔한 함정과 교훈
함정 1: 너무 많은 작은 파일
증상: 작은 작업이 수 분 걸림.
원인: 수백만 개 작은 파일.
해결: coalesce 또는 repartition으로 병합.
함정 2: Skew
증상: 99% task는 빠르고 1%가 영원히.
원인: 한 key가 압도적으로 많음 (예: user_id = null).
해결:
- AQE의 skew join 활성화.
- Null key 필터.
- Salting.
함정 3: Broadcast OOM
증상: Executor OOM 발생.
원인: Broadcast 테이블이 너무 큼.
해결:
spark.sql.autoBroadcastJoinThreshold낮춤.- 명시적
broadcast()제거. - Sort-merge join으로.
함정 4: 너무 많은 Shuffle Partition
증상: 작업이 느리고 많은 작은 task.
원인: spark.sql.shuffle.partitions 너무 큼.
해결: AQE coalesce 활성화, 또는 수동 조정.
함정 5: Python UDF의 오버헤드
증상: PySpark 작업이 극도로 느림.
원인: Python UDF는 JVM ↔ Python 직렬화 왕복.
해결:
- SQL 내장 함수 사용.
- Pandas UDF (vectorized, Apache Arrow 사용).
@pandas_udf("double")
def my_udf(s: pd.Series) -> pd.Series:
return s * 2
일반 UDF보다 10배 이상 빠름.
함정 6: 잘못된 Partition 수
증상: Under-utilization 또는 OOM.
원인:
- 너무 적음: 큰 task, OOM.
- 너무 많음: 오버헤드.
해결: 각 partition 100~200 MB 원칙.
퀴즈로 복습하기
Q1. RDD의 "lineage"가 어떻게 장애 복구를 가능하게 하는가?
A.
Lineage의 의미: RDD는 데이터 자체가 아닌 **"어떻게 만들어졌는지의 기록"**을 유지한다.
val rdd1 = sc.textFile("hdfs://data.txt") // source
val rdd2 = rdd1.map(_.toUpperCase) // parent: rdd1
val rdd3 = rdd2.filter(_.startsWith("A")) // parent: rdd2
val rdd4 = rdd3.flatMap(_.split(",")) // parent: rdd3
각 RDD는 부모 RDD와 변환 함수를 기억한다. 이 관계를 이어가면 원천 데이터 소스부터 현재까지의 전체 계보가 된다.
장애 복구 원리:
시나리오: rdd4.collect() 실행 중 executor 하나가 죽음. 그 executor가 처리하던 rdd4의 partition 3이 사라짐.
복구 과정:
- 손실 감지: Driver가 task 실패 감지.
- Lineage 추적: rdd4 partition 3 → rdd3 partition 3 → rdd2 partition 3 → rdd1 partition 3.
- 재계산: 다른 executor에서 원천부터 다시 계산.
- HDFS에서 원본 텍스트 partition 3 읽기.
- map 적용.
- filter 적용.
- flatMap 적용.
- 완료: 손실된 partition 복구.
Key: Partition 단위 재계산. 전체 RDD가 아닌 손실된 partition만. 다른 partition들은 영향 없음.
Replication과의 비교:
Traditional fault tolerance (replication):
- 모든 데이터를 여러 노드에 복제.
- 네트워크 I/O 많음 (복제 시).
- 디스크 사용 2~3배.
- 장애 시 즉시 다른 복제본 사용.
RDD lineage:
- Replication 없음. 데이터 한 번만.
- 네트워크/디스크 절약.
- 장애 시 재계산 (시간 걸림).
- 메모리 기반 연산과 궁합 좋음.
왜 Spark의 선택은 lineage인가:
Spark는 반복 연산(ML, 그래프)을 타겟으로 했다. 이런 워크로드는:
- 데이터가 메모리에 있어 빠름.
- Lineage 재계산도 (메모리니까) 빠름.
- Replication 비용이 반복적 쓰기로 누적.
Lineage의 한계:
1. Lineage가 길면 재계산이 오래:
rdd1 → rdd2 → rdd3 → ... → rdd100 → action
rdd99 partition 손실 시, rdd1부터 재계산. 수 분~시간 가능.
해결: Checkpoint:
rdd50.checkpoint()
- rdd50을 디스크에 저장.
- Lineage를 이 지점에서 "잘라냄".
- 이후 복구는 rdd50부터.
2. Wide dependency의 문제:
rdd1 (4 partitions) --shuffle--> rdd2 (4 partitions)
rdd2 partition 3 손실 → rdd1의 모든 partition 필요 (shuffle이라 재분배). 비용이 큼.
해결: Shuffle 결과를 영구 저장. Spark가 기본으로 함 (shuffle files가 로컬 디스크에 남음).
3. 외부 시스템 부작용:
rdd.foreach(x => saveToDatabase(x))
재계산 시 중복 저장 위험. Lineage는 순수 함수 가정.
해결: Idempotent 연산, 또는 정확히 한 번 semantic을 별도 구현.
교훈:
RDD lineage는 **"replication 없이 내결함성"**을 가능하게 했다. 이것이 Spark가 MapReduce 대비 100배 빠를 수 있었던 기반이다. 메모리 연산 + 경량 복구 = 빅데이터의 새 패러다임.
현대 적용:
- RDD lineage → DataFrame, Dataset 아래에서도 유지.
- Checkpoint → 장기 실행 job에서 중요.
- Structured Streaming에서도 비슷한 아이디어 (log-based recovery).
분산 시스템 철학적 관점:
Lineage는 "상태를 저장하지 말고, 만드는 방법을 저장하라" 는 접근. 이는 함수형 프로그래밍의 철학과 일치한다:
- Immutable: RDD는 변하지 않음.
- Referential transparency: 같은 입력 → 같은 출력 (재계산 가능).
- Composition: RDD들의 체인.
Spark는 분산 컴퓨팅에 함수형 원칙을 적용해서 새 영역을 열었다. 그 단순하고 우아한 아이디어가 10년 넘게 이 분야를 이끌고 있다.
Q2. Catalyst Optimizer가 어떻게 쿼리를 "rewrite"하여 성능을 개선하는가?
A.
Catalyst의 핵심 아이디어: SQL/DataFrame 쿼리를 tree 로 표현하고, 수십~수백 개의 rewrite rule을 반복 적용해서 더 효율적인 tree로 변환한다.
4단계 처리 과정:
1. Parsing → Unresolved Logical Plan
SELECT name FROM users WHERE age > 20
↓
Project [name]
Filter (age > 20)
UnresolvedRelation("users")
2. Analysis → Resolved Logical Plan
Catalog 참조로:
users→ 실제 테이블 스키마.name,age→ 컬럼 참조.- 타입 검증.
Project [name#10]
Filter (age#11 > 20)
Relation[name#10, age#11, email#12] parquet
3. Logical Optimization → Optimized Logical Plan
이 단계가 진짜 마법. 많은 rule이 반복 적용된다:
Rule 1: Projection Pushdown
Project [name#10]
Filter (age#11 > 20)
Relation[name#10, age#11, email#12] ← email 불필요
↓
Project [name#10]
Filter (age#11 > 20)
Project [name#10, age#11] ← 필요한 컬럼만
Relation[name, age, email]
이점: Parquet reader가 name, age 컬럼만 읽음. email 스킵. I/O 절약.
Rule 2: Filter Pushdown to Data Source
Parquet/ORC는 predicate pushdown을 지원한다:
Filter (age > 20)
Relation(parquet)
↓
Relation(parquet) WITH FILTER (age > 20)
Parquet reader가 min/max 통계를 보고 row group 전체를 스킵. 수십 배 빠른 읽기.
Rule 3: Constant Folding
WHERE age > 18 + 2
↓
WHERE age > 20
매 행마다 18+2를 계산하지 않음.
Rule 4: Simplification
WHERE true AND x > 10
↓
WHERE x > 10
WHERE x > 10 OR false
↓
WHERE x > 10
Rule 5: Join Reorder
SELECT * FROM a JOIN b ON a.id = b.id JOIN c ON b.cid = c.id
WHERE a.country = 'KR'
CBO (Cost-Based Optimizer)가 통계를 보고 최적 순서 결정:
a를 먼저 필터 (country='KR').- 작아진
a와b조인. - 결과와
c조인.
Catalyst는 지수 개의 조인 순서를 탐색하고 가장 싼 것 선택.
Rule 6: Subquery Unnesting
SELECT * FROM orders
WHERE user_id IN (SELECT id FROM users WHERE country = 'KR')
↓
SELECT o.* FROM orders o
INNER JOIN (SELECT id FROM users WHERE country = 'KR') u
ON o.user_id = u.id
Semi-join으로 변환. 훨씬 빠름.
Rule 7: Column Pruning Through Join
SELECT a.name FROM a JOIN b ON a.id = b.aid
b에서는 aid만 필요 (조인 키). 다른 컬럼 스킵.
Rule 8: Decimal 연산 최적화
SELECT SUM(price * quantity) FROM sales
Decimal 곱셈은 비쌈. Catalyst는 overflow를 고려해서 필요한 정밀도만 사용.
4. Physical Planning
논리 계획 → 실행 연산자:
Project → org.apache.spark.sql.execution.ProjectExec
Filter → FilterExec
Join → SortMergeJoinExec (또는 BroadcastHashJoinExec 등)
Aggregate → HashAggregateExec
여러 물리 계획을 생성하고 cost를 비교해서 선택.
예: Join 전략 선택
SELECT * FROM big JOIN small ON big.id = small.id
Catalyst는 다음을 고려:
small크기 <spark.sql.autoBroadcastJoinThreshold(기본 10MB)?- Yes: BroadcastHashJoin 선택 (small을 모든 executor에 복사, shuffle 없음).
- No: SortMergeJoin 선택 (양쪽 shuffle).
Broadcast join은 수 배~수십 배 빠르다.
실전 효과:
간단한 쿼리:
SELECT u.name, SUM(o.amount)
FROM users u JOIN orders o ON u.id = o.user_id
WHERE u.country = 'KR'
GROUP BY u.name
최적화 전 (naive 실행):
- users 전체 스캔.
- orders 전체 스캔.
- 전체 join.
- filter (country='KR').
- group by.
1 TB orders 스캔. 수십 분.
최적화 후 (Catalyst):
- users에 filter pushdown (country='KR' → 10% 데이터만).
- Projection pushdown (id, name만 읽음).
- orders도 필요 컬럼만 읽음 (user_id, amount).
- Broadcast join (users가 충분히 작아서).
- group by.
I/O 90% 절약. 10배 빠른 쿼리.
Catalyst의 확장성:
Catalyst의 rule은 사용자 정의 가능:
object MyRule extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
case Filter(p, child) => /* custom logic */
}
}
spark.experimental.extraOptimizations = Seq(MyRule)
이 확장성이 Databricks의 Delta Lake, Iceberg 같은 프로젝트의 기반.
교훈:
Catalyst는 **"규칙 기반 트리 변환"**이라는 단순한 아이디어로 거대한 성과를 냈다. 각 rule은 단순하지만 조합되면 놀라운 최적화.
전통 DB (PostgreSQL, MySQL)도 쿼리 optimizer가 있지만 Catalyst의 강점:
- Scala tree API로 쉬운 rule 작성.
- Extensible: 외부 rule 추가.
- DataFrame API와 통합: SQL이 아닌 코드도 최적화.
결과: 기본 SQL을 작성해도 손으로 최적화한 것과 비슷한 성능. 이것이 Spark가 데이터 엔지니어에게 사랑받는 이유다.
Q3. Tungsten이 JVM 객체 대신 binary format을 쓰는 이유와 그 효과는?
A.
JVM 객체의 숨은 비용:
평범한 Java 코드:
Integer age = 30;
String name = "Alice";
메모리 사용을 보면:
Integer: 16 bytes header + 4 bytes value + 4 bytes padding = 24 bytes (4 bytes 데이터에)String "Alice":- String 객체: ~40 bytes
- 내부
char[]: 16 bytes header + 10 bytes (5 chars) + padding = ~32 bytes - 총: ~72 bytes (5 chars 데이터에)
실제 데이터 (int 4 + 5 chars = 9 bytes)가 96 bytes를 차지. 10배 오버헤드.
빅데이터에선 이것이 재앙:
1억 row에 5개 필드가 있다면:
- 실제 데이터: ~5 GB.
- JVM 객체로: ~50 GB.
메모리 부족. GC pressure. 캐시 miss.
Tungsten의 해결:
Row를 연속된 바이트 배열에 저장:
Row(age=30, name="Alice", email="a@b.c")
Tungsten format:
[null_bitmap:8B][age:8B][name_ptr:8B][email_ptr:8B][name:"Alice"][email:"a@b.c"]
- 고정 크기 필드 (int, long, double): 인라인.
- 가변 크기 필드 (string, array): pointer + 뒷부분에 실제 데이터.
- Null bitmap: 각 필드의 null 여부.
- Header 없음: 순수 데이터만.
결과:
- 5개 필드 row: ~40 bytes (JVM 객체 ~200 bytes에서).
- 5배 메모리 절약.
UnsafeRow:
Spark의 구현 클래스:
public class UnsafeRow extends BaseMutableRow {
private Object baseObject; // byte[] 또는 null (off-heap)
private long baseOffset; // 시작 위치
private int numFields;
private int sizeInBytes;
}
직접 메모리 접근 (sun.misc.Unsafe):
public long getLong(int ordinal) {
assertIndexIsValid(ordinal);
return Platform.getLong(baseObject, baseOffset + ordinal * 8L);
}
JVM을 거치지 않고 포인터 산술로 바로 메모리 접근. Unsafe는 이름대로 안전장치를 건너뛴다. JVM이 검증 안 함.
효과 1: 메모리 절약
1억 row, 10 필드 DataFrame:
- JVM object: ~30 GB.
- Tungsten: ~5 GB.
6배 적은 메모리. 같은 클러스터에서 6배 큰 데이터 처리 가능.
효과 2: GC pressure 제거
JVM GC는 많은 작은 객체에서 느려진다. Young generation을 계속 순회해야.
Tungsten은 적은 수의 큰 배열만 사용:
- Off-heap: GC 대상 아예 아님.
- On-heap: GC 하더라도 객체 수가 적음.
결과: Full GC 빈도 대폭 감소. Pause time 대폭 감소.
효과 3: Cache friendliness
CPU cache는 연속된 메모리를 선호 (spatial locality). 각 cache line은 64 bytes.
JVM 객체:
Integer 1 @ address 0x1000
Integer 2 @ address 0x8000 ← 멀리 떨어짐 (heap 할당 랜덤)
Integer 3 @ address 0x4000
하나 읽을 때마다 cache miss 가능성. 수십 ~ 수백 cycles 낭비.
Tungsten:
[field1][field2][field3][field4]... 연속
한 번의 메모리 접근이 여러 필드를 cache에 로드. 다음 접근은 hit.
성능: 2~5배 향상 (메모리 집약적 워크로드).
효과 4: Direct serialization
UnsafeRow는 이미 byte array. 네트워크로 전송 시:
- Java 객체: Kryo/Java serialization 필요 (느림).
- UnsafeRow: 그대로 전송 (zero serialization).
Shuffle이 훨씬 빠름.
효과 5: Vectorized 처리의 기반
연속 메모리는 SIMD (벡터화 명령어) 친화적:
// 일반
for (int i = 0; i < n; i++) {
result[i] = a[i] + b[i];
}
// SIMD (4개 동시 처리)
for (int i = 0; i < n; i += 4) {
_mm_store_ps(result + i, _mm_add_ps(_mm_load_ps(a + i), _mm_load_ps(b + i)));
}
Tungsten의 연속 메모리가 SIMD의 기반. JVM은 자체적으로 SIMD를 잘 활용하지 못하지만, 데이터 레이아웃이 친화적이면 JIT이 인식.
비용과 제약:
복잡도: Tungsten은 매우 낮은 레벨이다. 버그가 있으면 JVM crash. Spark 팀이 조심스럽게 개발.
디버깅 어려움: JVM heap dump, profiler가 덜 유용.
특정 연산의 오버헤드: 가변 크기 필드(string) 수정이 복잡.
Python 호환: PySpark에서 UnsafeRow를 Python 객체로 변환할 때 비용.
Arrow와의 협력:
최근 Spark는 Apache Arrow와 통합:
- Arrow: 메모리 columnar 포맷.
- Python ↔ JVM 교환 시 Arrow 사용.
- Pandas UDF가 Arrow 덕분에 10배 빠름.
Tungsten과 Arrow는 서로 보완. Tungsten은 row-wise 처리, Arrow는 column-wise 교환.
Spark 3.x의 Vectorized Execution:
Spark 3.0+는 일부 연산자에 columnar 처리 도입:
- Parquet/ORC vectorized reader.
- Columnar aggregation.
- 4096 rows batch 단위.
Tungsten row format에서 columnar batch로 진화. DuckDB, ClickHouse의 방식을 따라감.
교훈:
Tungsten은 Spark의 **"underlying 재발명"**이다. 표면 API는 그대로지만 내부가 완전히 바뀌었다. 사용자는 변경 없이 2~10배 성능 향상.
이는 **"하드웨어를 존중하는 설계"**의 중요성을 보여준다:
- CPU cache가 싫어하는 것 (random access) 피하기.
- GC의 약점 (작은 객체) 회피.
- Memory bandwidth 아끼기.
JVM이 "추상화"를 제공하지만, 고성능이 필요하면 그 추상화를 영리하게 우회해야 한다. Tungsten은 "Java/Scala 속에서 C 수준 성능을 낸다"는 도전의 답이다.
모든 빅데이터 엔진이 결국 같은 방향으로 진화: 바이너리 포맷, 연속 메모리, 벡터화. Tungsten은 Spark의 해답이었고, 같은 아이디어가 Flink, Presto, Snowflake에서도 발견된다. 수렴 진화의 예다.
Q4. Whole-Stage Code Generation이 어떻게 기존 Volcano 모델보다 빠른가?
A.
전통 Volcano 모델:
1970년대부터 쓰인 query execution model. 각 연산자는 iterator:
interface QueryOperator {
Row next();
boolean hasNext();
}
예시: SELECT name FROM users WHERE age > 20
Project ← Filter ← Scan
실행:
Project.next() {
while (true) {
Row row = Filter.next();
if (row == null) return null;
return new Row(row.getString("name"));
}
}
Filter.next() {
while (true) {
Row row = Scan.next();
if (row == null) return null;
if (row.getInt("age") > 20) return row;
}
}
Scan.next() {
return disk.readNextRow();
}
각 연산자가 독립 객체. next()를 계속 호출.
Volcano의 비용:
-
Virtual function calls:
next()는 인터페이스 메소드. JVM은 virtual dispatch 사용 → inline 어려움. -
Object allocation: 매 row마다 새 Row 객체 생성 가능성. GC 부담.
-
Branch prediction 실패: 연산자마다 다른 코드 경로 실행. CPU pipeline 예측 실패.
-
Cache miss: 매 연산자의 코드가 다른 메모리 위치.
-
Boxing: Java의 경우
Integer같은 wrapper 사용 → heap 할당.
결과: 단순 쿼리도 Volcano 오버헤드만으로 2-5배 느림.
Whole-Stage Code Generation의 답:
아이디어: "여러 연산자를 하나의 함수로 병합". 런타임에 Java 코드를 생성하고 컴파일.
예시: 같은 쿼리
Volcano (의사 코드):
for each row from Scan:
call Filter.next(row)
call Project.next(filter_output)
output project_output
3개 iterator, 수많은 함수 호출.
WSCG가 생성하는 코드:
public class GeneratedIterator {
public Object[] next() {
while (scanner.hasNext()) {
InternalRow row = scanner.next();
int age = row.getInt(2); // age 컬럼
if (age > 20) { // filter inlined
UTF8String name = row.getUTF8String(1); // name 컬럼
return new Object[]{ name }; // project inlined
}
// 조건 불만족, 다음 row로
}
return null; // 끝
}
}
핵심 차이:
-
단일 함수: Scan, Filter, Project가 한 함수의 내부. 함수 호출 없음.
-
Inline: 컴파일러(JVM JIT)가 모든 것을 인라인. Virtual call 제거.
-
직접 메모리 접근:
row.getInt(2)는 Tungsten의Unsafe.getInt(address). -
단순한 loop: 하나의 while 루프. CPU pipeline 친화적.
성능 향상:
Databricks 벤치마크 (TPC-DS):
- Volcano: 100%.
- WSCG: 400% ~ 1000%.
4-10배 향상. 마법이 아니라 오버헤드 제거.
어떻게 작동하는가:
-
Spark의 최적화 단계:
- Logical plan → Physical plan.
- 여러 연산자를 stage로 그룹화 (narrow dependency 내).
- 각 stage가 WSCG 대상.
-
Code generation:
- Physical plan 트리를 순회.
- 각 연산자가 자기 코드 조각 제공.
- 합쳐서 Java source code 생성.
-
Compilation:
- Janino (가벼운 Java compiler)로 컴파일.
- 바이트코드 생성.
ClassLoader에 로드.
-
Execution:
- 컴파일된 클래스의 인스턴스 생성.
- 일반 Java 메소드처럼 호출.
- JVM JIT가 최적화 (hot path → native code).
실전 예시:
spark.range(1, 1000000)
.filter($"id" > 500000)
.select($"id" * 2)
.count()
EXPLAIN 결과:
*(1) Filter (id#0L > 500000)
+- *(1) Range (1, 1000000, step=1, splits=...)
*(1)이 WSCG stage 번호. Range, Filter, Project가 하나의 코드로 통합.
explain("codegen")으로 실제 생성된 코드 확인 가능:
public Object generate(Object[] references) {
return new GeneratedIteratorForCodegenStage1(references);
}
final class GeneratedIteratorForCodegenStage1 extends BufferedRowIterator {
// ... 수백 줄의 생성된 코드
}
제약:
-
64KB 메소드 한계: JVM의 단일 메소드가 64KB 초과 불가. 너무 많은 연산자 병합 시 분할.
-
복잡한 연산자: Window function, UDF 등은 codegen 어려움. Fall back to Volcano.
-
디버깅 어려움: 생성된 코드를 디버거로 추적 힘듦.
-
컴파일 오버헤드: 매 쿼리마다 컴파일. 작은 쿼리엔 역효과 가능.
WSCG vs Vectorized:
Spark의 WSCG는 row-at-a-time + inline. 또 다른 접근:
Vectorized Execution (ClickHouse, DuckDB, Presto 일부):
- Batch 단위 (1024 rows).
- SIMD 명령 활용.
- Columnar 처리.
장점: SIMD의 힘. 단점: 구현 복잡.
Spark는 두 접근을 조합:
- Parquet reader: vectorized (columnar).
- 연산자: WSCG (row-wise).
- 향후: 더 많은 vectorization 도입 중.
Hyper / Umbra 비교:
독일 TU Munich의 HyPer/Umbra 연구 시스템은 더 급진적:
- LLVM IR로 컴파일.
- 네이티브 코드 (JIT 없음).
- 성능이 C++ 손 작성 코드 수준.
Spark WSCG는 Java 바이트코드까지. JVM JIT에 의존. Umbra만큼은 아니지만 훨씬 간단하고 이식성 있다.
Rough numbers:
- Volcano: baseline.
- WSCG: 4-10x faster.
- Vectorized: 4-10x faster.
- LLVM-based: 10-20x faster.
Spark는 "충분히 빠르면서 구현 가능"의 균형을 찾았다.
교훈:
WSCG는 전통 지혜의 재평가다. Volcano 모델이 40년간 지배한 이유는 우아함과 유연성이지만, 현대 CPU 아키텍처 (deep pipeline, cache, SIMD)에선 적합하지 않다.
해결: "연산자 추상화를 유지하면서도 런타임에 구체화". 이것이 codegen의 본질.
이는 "지연된 optimization" 철학이다:
- 사용자는 high-level 코드 작성.
- 시스템이 물리적 최적화.
- 컴파일러의 전통적 역할을 런타임으로.
JIT 컴파일러가 수십 년간 해온 일이지만, 빅데이터에 적용한 것은 Spark가 처음 크게 성공시켰다. 이후 거의 모든 현대 데이터 엔진이 이 패턴을 따르고 있다.
현대 SQL 엔진의 성능은 하드웨어 성능의 몇 % 를 사용하는가의 경쟁이다. Volcano는 10%, WSCG는 40%, vectorized는 60%, LLVM-based는 80%. 각 진화가 이 수치를 밀어올린다.
Q5. Adaptive Query Execution (AQE)이 실제 실행 통계를 어떻게 활용하는가?
A.
Static optimization의 한계:
전통 쿼리 optimizer (Catalyst 포함)는 실행 전에 최적화를 결정한다. 근거는:
- 통계 (
ANALYZE TABLE). - 데이터 소스 메타데이터.
- Schema.
문제: 통계가 부정확하거나 최신이 아닐 수 있다:
- 데이터가 변경됨 (ANALYZE 이후).
- 통계 계산이 비쌈 (대용량).
- 복잡한 조건의 selectivity 추정 어려움.
- Join 결과 크기 예측 어려움.
결과: 잘못된 계획 선택 → 느린 쿼리.
AQE의 기본 아이디어:
"Stage 경계에서 실제 통계를 수집하고, 남은 계획을 재최적화"
Spark는 이미 shuffle 단계에서 중간 결과가 디스크에 저장된다. 이 데이터의 실제 크기를 측정해서 남은 단계를 최적화할 수 있다.
AQE의 세 가지 최적화:
1. Dynamically Coalescing Shuffle Partitions
문제: spark.sql.shuffle.partitions 기본값 200. 실제 데이터 크기와 무관.
시나리오:
- 작은 table filter 결과: 100 MB.
- 200 partition = 각 500 KB.
- 200개의 매우 작은 task. 오버헤드 비율 높음.
AQE 해결:
- Shuffle 완료 후 실제 partition 크기 측정.
- 작은 partition들을 병합.
- 예: 200 → 20 (각 ~5 MB).
- Task 수 감소, 오버헤드 감소, 성능 향상.
설정:
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.coalescePartitions.minPartitionSize=1MB
spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB
2. Dynamically Switching Join Strategies
문제: Join 전략 선택 시 table 크기 추정이 부정확.
시나리오:
SELECT * FROM big JOIN medium ON big.id = medium.id
WHERE medium.status = 'active'
계획 시 추정: medium 크기 = 100 MB → SortMergeJoin 선택.
실제 실행: 필터 후 medium.active = 5 MB. Broadcast가 훨씬 나았을 것.
AQE 해결:
- Medium 필터 결과를 shuffle.
- 실제 크기가 5 MB임을 확인.
- BroadcastHashJoin으로 동적 전환.
- 수 배 빠름.
예시:
Before AQE: 120 seconds (SortMergeJoin)
After AQE: 25 seconds (auto-switched to BroadcastHashJoin)
설정:
spark.sql.adaptive.autoBroadcastJoinThreshold=10MB
3. Dynamically Optimizing Skew Joins
문제: Data skew. 일부 key가 과도하게 많음.
시나리오:
SELECT * FROM orders JOIN users ON orders.user_id = users.id
- 대부분 user는 10~100 orders.
- 한 user (봇, 관리자)가 1,000,000 orders.
- 해당 partition이 100배 크다.
- 그 task가 100배 오래 걸림.
- 나머지 tasks는 빨리 끝나지만 skewed task 기다림.
전통적 해결: Salting (수동, 복잡).
AQE 해결:
- Shuffle 후 각 partition 크기 확인.
- 평균의 5배 이상 크면 skewed로 판단.
- Skewed partition을 여러 sub-partition으로 분할.
- 각 sub-partition이 다른 task에서 처리.
- 반대쪽 partition도 복제 (broadcast 필요).
예시:
Before:
Task 1: 1 GB (skewed user) → 100 seconds
Task 2: 10 MB → 1 second
Task 3: 10 MB → 1 second
...
Total time: 100 seconds (Task 1 bound)
AQE divides Task 1 into 10 sub-tasks:
Task 1a: 100 MB → 10 seconds
Task 1b: 100 MB → 10 seconds
...
Task 2: 10 MB → 1 second
Total time: 10 seconds (10x faster)
설정:
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB
4. Dynamic Partition Pruning (DPP) (AQE와 함께 발전)
Star schema의 fact + dimension join:
SELECT f.amount
FROM fact_sales f JOIN dim_date d ON f.date_id = d.id
WHERE d.year = 2024
전통: fact_sales 전체 스캔 + join + filter.
DPP:
dim_date에서 year=2024인 id 먼저 추출.- 이 id들을 fact_sales의 partition filter로 pushdown.
fact_sales스캔 시 해당 partition만 로드.
효과:
- 파티션된 fact_sales (year별)이면 1 TB → 50 GB.
- 20배 빠른 쿼리.
실측 효과:
Databricks TPC-DS 벤치마크:
- AQE 비활성: 100% (baseline).
- AQE 활성: 130~200%.
- 일부 쿼리: 3배+ 향상.
- 평균: 1.8배.
거의 공짜. 기존 코드 변경 없이.
Spark 버전별:
- Spark 3.0: AQE 도입 (기본 비활성).
- Spark 3.2: AQE 기본 활성화.
- Spark 3.3+: 개선 지속.
내부 작동:
AQE는 Spark의 query execution을 stage 단위로 실행하되, stage 사이에 pause한다:
- Stage 1 실행.
- Stage 1 완료 → shuffle 파일 생성.
- Shuffle 파일의 실제 통계 수집.
- Stage 2의 physical plan을 re-optimize.
- Stage 2 실행.
- 반복.
이 stage 단위 통계 → 재최적화 루프가 AQE의 핵심.
한계와 함정:
1. Streaming에 제한적: Structured Streaming은 AQE 덜 적용됨.
2. 작은 쿼리엔 불필요: Re-optimization 오버헤드가 작은 쿼리에선 역효과.
3. 복잡한 쿼리 예측 어려움: 매우 복잡한 쿼리는 여전히 수동 튜닝 필요.
4. DPP와의 상호작용: 복잡한 경우 예측 안 되는 계획 변경.
실전 권장:
# 기본 설정 (Spark 3.2+는 이미 기본)
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
# 파라미터 조정
spark.sql.adaptive.advisoryPartitionSizeInBytes=128MB # 목표 partition 크기
spark.sql.adaptive.coalescePartitions.minPartitionSize=32MB # 최소
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 # skew 기준
교훈:
AQE는 "optimization은 한 번에 끝나지 않는다" 는 철학의 구현이다. 전통 optimizer는 ahead-of-time (실행 전 모든 결정), AQE는 just-in-time (필요 시 재최적화).
이는 JIT 컴파일러의 철학과 같다:
- 처음엔 inexpert 결정.
- 런타임 관찰.
- 최적화.
분산 데이터 처리에서 같은 원칙 적용. Spark가 선구자였고, 이후 Trino, Presto도 유사 기능 추가.
**"정적 vs 동적 최적화"**의 트레이드오프:
- 정적: 단순, 예측 가능. 완전 최적은 아닐 수 있음.
- 동적: 복잡, 오버헤드. 실제 데이터에 적응.
AQE는 이 둘 사이의 실용적 타협이다. 완벽한 동적 재계획은 아니지만, stage 경계에서 핵심 결정을 조정함으로써 대부분의 이득을 얻는다.
미래:
AQE는 계속 발전 중:
- Multi-stage re-optimization.
- Cost model 개선.
- ML 기반 최적화 (실험적).
- Streaming adaptation.
Spark가 10년 넘게 데이터 처리 영역을 선도하는 이유는 이런 지속적 혁신 때문이다. "충분히 좋다"에 만족하지 않고, 매 버전마다 의미 있는 개선을 가져온다. AQE는 그 한 예이며, 앞으로도 더 나아질 것이다.
마치며: 데이터의 정교한 엔진
핵심 정리
- RDD: 메모리 기반 분산 컬렉션. Lineage로 장애 복구.
- DataFrame: 구조화된 API. Schema + optimizer.
- Catalyst: Rule + Cost 기반 SQL 최적화.
- Tungsten: Off-heap binary format. CPU 캐시 친화.
- WSCG: 런타임 Java 코드 생성. Volcano 오버헤드 제거.
- Shuffle: 분산의 핵심이자 병목. Sort shuffle 기본.
- AQE: 런타임 통계로 재최적화. 30-200% 향상.
Spark가 우리에게 준 것
Spark는 빅데이터 처리의 민주화다. MapReduce 시대엔:
- Java로 map/reduce 작성.
- 복잡한 최적화 수동.
- 느린 반복.
Spark 이후:
- SQL 또는 간단한 DataFrame API.
- 자동 최적화 (Catalyst + Tungsten + AQE).
- 빠른 반복.
- 배치 + 스트림 + ML 통합.
진입 장벽이 낮아지면서 더 많은 사람이 데이터를 다룰 수 있게 되었다. 데이터 분석가도 SQL로 복잡한 분산 쿼리 실행. 데이터 과학자도 머신러닝 파이프라인을 PySpark로.
실전 권장
- 기본 설정 이해:
spark.executor.memory,spark.sql.shuffle.partitions, AQE. - Broadcast join 활용: 작은 dimension 테이블.
- Partition column 선택: 쿼리 패턴에 맞게.
- EXPLAIN 읽기: 느린 쿼리의 첫 단계.
- Spark UI 활용: Task 분포, GC time 모니터.
- 적절한 file format: Parquet/ORC 선호.
마지막 교훈
Spark의 성공은 기술적 우수성 + 타이밍 + 생태계의 합이다:
- 기술: RDD, Catalyst, Tungsten의 혁신.
- 타이밍: 메모리 가격 하락 + 빅데이터 수요 폭발.
- 생태계: Python/R 지원, ML, 스트리밍, SQL 모두 통합.
이 세 가지 중 하나라도 빠졌으면 Spark는 지금 위치에 없을 것이다. 기술은 필요하지만 충분하지 않다는 교훈이다.
당신이 다음에 Spark job을 실행할 때, 아래에서 벌어지는 일을 기억하자:
- Catalyst가 수십 개의 rule로 쿼리 재작성.
- Tungsten이 바이너리 포맷으로 메모리 절약.
- WSCG가 Java 코드 생성 + 컴파일.
- AQE가 실행 중 통계로 재최적화.
- Shuffle이 데이터 재분배.
이 모든 것이 수 ms~수 초 안에 일어난다. 그리고 당신은 단지 .filter().groupBy().count()만 썼다. 이것이 현대 데이터 엔지니어링의 마법이다.
참고 자료
- Apache Spark Documentation
- Learning Spark, 2nd Edition (Jules Damji et al.)
- Spark SQL: Relational Data Processing (Armbrust et al., 2015)
- Resilient Distributed Datasets (Zaharia et al., 2012)
- Databricks Engineering Blog
- Spark Internals by Reynold Xin
- Project Tungsten: Bringing Spark Closer to Bare Metal
- Apache Spark as a Compiler (Databricks Blog)
- Adaptive Query Execution in Apache Spark
- High Performance Spark (Holden Karau, Rachel Warren)