Skip to content

필사 모드: Apache Spark 내부 완전 가이드 2025: RDD, Catalyst Optimizer, Tungsten, Whole-Stage Codegen, Shuffle 심층 분석

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

들어가며: Spark는 어떻게 MapReduce를 쫓아냈는가

2010년, UC Berkeley에서

2010년 UC Berkeley의 AMPLab에서 **Matei Zaharia**가 논문을 발표했다: "**Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing**".

당시 빅데이터의 왕은 **Hadoop MapReduce**였다. Spark는 두 가지 주장을 했다:

1. **메모리 기반으로 100배 빠르다**.

2. **API가 훨씬 단순하다**.

처음엔 MapReduce 진영이 회의적이었다. 하지만 Spark는:

- **2013**: Apache에 기증.

- **2014**: Databricks 창업.

- **2016**: Spark 2.0 (DataFrame, Catalyst, Tungsten).

- **2020**: Spark 3.0 (Adaptive Query Execution).

- **현재**: 빅데이터 처리의 **사실상 표준**.

Hadoop MapReduce는 사라졌다. Spark는 전 세계 수만 기업의 데이터 파이프라인의 중추가 되었다.

왜 이렇게 빨라졌는가

Spark의 성능 혁명은 **세 가지 축**에서 나왔다:

1. **RDD (Resilient Distributed Dataset)**: In-memory 연산.

2. **Catalyst Optimizer**: 쿼리 최적화.

3. **Tungsten**: CPU와 메모리 최적화.

이 세 가지가 합쳐져 MapReduce 대비 100배, 때로는 1000배 성능을 낸다.

이 글에서 다룰 것

1. **RDD의 탄생과 lineage**.

2. **DataFrame/Dataset**: 구조화된 데이터.

3. **Catalyst optimizer**: 쿼리 재작성.

4. **Physical plan과 cost-based 최적화**.

5. **Tungsten project**: CPU 캐시 친화적 실행.

6. **Whole-stage code generation**: JIT으로 코드 생성.

7. **Shuffle**: 분산 집계의 비밀.

8. **Adaptive Query Execution**: 런타임 최적화.

9. **실전 튜닝**.

왜 이 지식이 중요한가

- **데이터 엔지니어**: Spark 튜닝의 80%는 내부 이해.

- **DA/DS**: 왜 이 쿼리가 느린지 설명.

- **플랫폼 엔지니어**: 클러스터 최적화.

- **다른 시스템 학습**: Dask, Ray, Flink가 Spark의 개념을 차용.

1. RDD: 모든 것의 시작

탄생 배경

2009년경 빅데이터 처리는 **MapReduce**가 지배했다. 문제는:

1. **반복 알고리즘**이 극도로 느림 (ML, 그래프).

2. 각 Map-Reduce job이 **디스크를 왕복**.

3. **인터랙티브 분석**이 불가능 (매번 몇 분~시간).

**Zaharia의 통찰**: "**메모리에서 데이터를 유지**하면 되지 않을까?"

문제: 메모리는 휘발성. 노드 장애 시 어떻게?

**해결**: **Lineage (계보)**. 데이터가 아닌 **변환 연산의 기록**을 유지. 장애 시 재계산.

RDD란

**RDD (Resilient Distributed Dataset)**:

- **Resilient**: 장애에 견딤 (lineage로).

- **Distributed**: 클러스터에 분산.

- **Dataset**: 데이터 집합.

핵심 특성:

1. **Immutable**: 한 번 만들어지면 변경 불가.

2. **Lazy**: Transformation은 action이 불릴 때까지 실행 안 됨.

3. **Typed**: Java/Scala의 타입 시스템 활용.

4. **Partitioned**: 여러 worker에 분산.

Transformation vs Action

**Transformation**: 새 RDD 생성. **Lazy**.

val lines = sc.textFile("data.txt") // RDD[String]

val words = lines.flatMap(_.split(" ")) // 아직 실행 안 됨

val counts = words.map(w => (w, 1))

val reduced = counts.reduceByKey(_ + _) // 여전히 실행 안 됨

**Action**: 실제 계산 트리거. **Eager**.

reduced.collect() // 여기서 전체 체인이 실행됨

왜 Lazy인가

Lazy evaluation의 장점:

1. **최적화 기회**: 여러 transformation을 **한 번에 처리**. 예: map → filter → map을 하나의 pass로.

2. **불필요한 계산 회피**: `take(10)`만 필요하면 10개만.

3. **Pipeline**: 중간 데이터를 메모리에 쌓지 않음.

Lineage와 Fault Tolerance

val rdd1 = sc.textFile("data.txt") // Dependency: HDFS

val rdd2 = rdd1.map(x => x.toUpperCase) // Dependency: rdd1

val rdd3 = rdd2.filter(x => x.startsWith("A")) // Dependency: rdd2

만약 `rdd3`를 계산 중에 한 partition이 노드 장애로 손실되면:

1. Lineage 확인: rdd3 → rdd2 → rdd1 → file.

2. 손실된 partition을 **재계산**.

3. 다른 partition은 그대로.

**장점**: Replication 없이 내결함성. 디스크 I/O 없음.

**단점**: Lineage가 길면 재계산이 오래 걸림. **Checkpoint**로 해결 (중간 저장).

Narrow vs Wide Dependency

RDD 간 의존성은 두 종류:

**Narrow dependency**:

- 자식 partition이 부모의 **소수 partition**에 의존.

- 예: map, filter, union.

- **한 stage 내에서 pipeline 실행 가능**.

- 재계산 시 영향 범위 작음.

**Wide dependency** (shuffle dependency):

- 자식 partition이 부모의 **여러 partition**에 의존.

- 예: groupByKey, reduceByKey, join.

- **Shuffle 필요**: 데이터 재분산.

- Stage 경계.

Narrow: Wide:

Parent Parent Parent Parent Parent

| | | / | \ |

▼ ▼ ▼ ▼ ▼ ▼ ▼

Child1 Child2 ↓ ↓ ↓ ↓ ↓

Shuffle 발생

↓ ↓ ↓

Child1 Child2 Child3

DAG와 Stage

Spark가 액션을 받으면:

1. **RDD graph를 DAG로 변환**.

2. **Wide dependency를 경계로 stage 분할**.

3. 각 stage는 **tasks**로 분해 (partition 수 만큼).

4. Stages를 순차적으로, task를 병렬로 실행.

Action: reduce()

Logical DAG:

RDD1 → map → RDD2 → reduceByKey → RDD3 → collect

Stages:

Stage 1: textFile → map (narrow, pipeline)

[Shuffle]

Stage 2: reduceByKey → collect

RDD의 한계

RDD는 훌륭했지만 몇 가지 약점:

1. **Schema 없음**: 그냥 Java/Scala 객체의 컬렉션.

2. **Optimizer 없음**: 사용자가 직접 최적화.

3. **느린 직렬화**: Java serialization.

4. **메모리 오버헤드**: JVM 객체의 overhead.

Spark 2.0은 이를 해결하기 위해 **DataFrame/Dataset + Catalyst + Tungsten**을 도입했다.

2. DataFrame & Dataset: 구조화 혁명

DataFrame의 등장

Spark 1.3 (2015)에서 **DataFrame** 도입. SQL의 테이블처럼 구조화된 RDD:

val df = spark.read.json("people.json")

df.show()

// +---+----+

// |age|name|

// +---+----+

// | 30|Bob |

// | 25|Alice|

// +---+----+

df.filter($"age" > 20).select("name").show()

**RDD와 차이**:

- **Schema 있음**: 컬럼과 타입 정보.

- **SQL처럼 조작**: 고수준 API.

- **Optimizer 적용**: Catalyst가 알아서 최적화.

- **Tungsten 메모리 포맷**: JVM 객체 아닌 바이너리.

Dataset (Spark 1.6+)

**Dataset**: DataFrame + **타입 안전성**.

case class Person(name: String, age: Int)

val ds = spark.read.json("people.json").as[Person]

ds.filter(_.age > 20).map(_.name)

**장점**:

- 컴파일 타임 타입 체크.

- Object API와 DataFrame 최적화 모두.

**Scala/Java에만 존재**. Python은 DataFrame만 사용 (Python은 정적 타입이 아니므로).

DataFrame = Dataset[Row]

사실 DataFrame은 `Dataset[Row]`의 alias. Row는 untyped.

// 같은 것

val df: DataFrame = spark.read.json(...)

val df: Dataset[Row] = spark.read.json(...)

성능 차이

단순 예시: 1억 행 필터링.

- **RDD**: 100초.

- **DataFrame/Dataset**: 15초.

**6~7배 차이**. 이유:

1. Catalyst 최적화 (predicate pushdown 등).

2. Tungsten 바이너리 포맷.

3. Whole-stage codegen.

3. Catalyst Optimizer: 쿼리 재작성의 마법

Catalyst의 4단계

Catalyst는 SQL/DataFrame query를 4단계로 처리한다:

SQL or DataFrame

1. Analysis (unresolved plan → resolved logical plan)

2. Logical Optimization (rule-based)

3. Physical Planning (여러 물리 계획 생성 + 비용 비교)

4. Code Generation (Tungsten)

RDD (실행)

1단계: Analysis

**입력**: Unresolved logical plan.

SQL: SELECT name FROM users WHERE age > 20

↓ parser

UnresolvedRelation("users")

→ UnresolvedAttribute("age")

→ UnresolvedAttribute("name")

**Catalog**를 참조해서 resolve:

- 테이블 이름 → 실제 데이터 소스.

- 컬럼 이름 → 컬럼 타입.

- 함수 → 실제 구현.

결과: **Resolved Logical Plan**.

2단계: Logical Optimization

**Rule-based** 최적화. 수십~수백 개의 rule을 반복 적용:

**Constant Folding**:

age * 2 + 3 → (미리 계산)

**Predicate Pushdown**:

SELECT * FROM (table JOIN other) WHERE table.x > 10

↓ pushdown

SELECT * FROM (SELECT * FROM table WHERE x > 10) JOIN other

필터를 조인 전으로 밀어 넣음. 중간 결과 크기 감소.

**Projection Pushdown**:

SELECT name FROM users

// users에 50개 컬럼 있어도 name만 읽음

**Simplification**:

WHERE true AND x > 10 → WHERE x > 10

WHERE false → never executes

**Subquery Elimination**:

SELECT * FROM t WHERE id IN (SELECT id FROM s)

SELECT * FROM t JOIN s ON t.id = s.id

Logical Plan 예시

val df = spark.sql("""

SELECT u.name, COUNT(*) as orders

FROM users u

JOIN orders o ON u.id = o.user_id

WHERE u.country = 'KR'

GROUP BY u.name

""")

df.explain(true)

**Parsed Logical Plan**:

'Project ['u.name, 'COUNT(1) AS orders]

+- 'Filter ('u.country = KR)

+- 'Join Inner, ('u.id = 'o.user_id)

:- 'UnresolvedRelation `users`

+- 'UnresolvedRelation `orders`

**Analyzed Logical Plan**:

Aggregate [name#10], [name#10, count(1) AS orders#20]

+- Filter (country#12 = KR)

+- Join Inner, (id#9 = user_id#15)

:- Relation[id#9,name#10,age#11,country#12] parquet

+- Relation[user_id#15,total#16] parquet

**Optimized Logical Plan**:

Aggregate [name#10], [name#10, count(1) AS orders#20]

+- Project [name#10]

+- Join Inner, (id#9 = user_id#15)

:- Project [id#9, name#10]

+- Filter (country#12 = KR)

+- Relation[...] parquet

+- Project [user_id#15]

+- Relation[...] parquet

변화:

- Filter가 users 바로 위로 이동.

- Projection이 각 relation 바로 위로.

- 불필요한 컬럼 제거.

3단계: Physical Planning

논리 계획을 **실행 가능한 물리 연산**으로 변환. Catalyst는 **여러 물리 계획을 생성**하고 비교.

**Join 선택 예시**:

- **BroadcastHashJoin**: 작은 테이블이면.

- **SortMergeJoin**: 큰 테이블들.

- **ShuffledHashJoin**: 중간 크기.

- **BroadcastNestedLoop**: cartesian이면.

Catalyst는 각 옵션의 **cost를 추정**하고 가장 저렴한 것 선택.

4단계: Code Generation

선택된 물리 계획을 **Java 바이트코드로 변환**. 다음 섹션에서 자세히.

Cost-Based Optimizer (CBO)

Spark 2.2+부터 **통계 기반** 최적화:

ANALYZE TABLE users COMPUTE STATISTICS FOR COLUMNS id, name, age

이후 Catalyst가:

- Table 크기, 행 수.

- 컬럼별 min/max/null count.

- 컬럼별 distinct count.

이 정보로 **join 순서 최적화**, selectivity 추정 등.

기본 RBO (rule-based optimizer)와 조합되어 더 나은 계획 생성.

4. Tungsten: CPU와 메모리의 재발명

Tungsten 프로젝트

Spark 1.5 (2015)에 시작. **"Spark 성능을 하드웨어 한계까지"**.

핵심 목표:

1. **JVM 메모리 오버헤드 제거**.

2. **CPU 캐시 활용**.

3. **Vectorized 실행**.

4. **코드 생성으로 CPU 파이프라인 최적화**.

JVM 객체의 문제

**Java 객체의 오버헤드**:

Integer i = 42;

`int` 값은 4바이트지만, `Integer` 객체는:

- Object header: ~16 바이트.

- Value field: 4 바이트.

- Padding: 4 바이트.

- **총 24 바이트**.

6배 오버헤드. 수백만 객체가 있으면 메모리 낭비 심각.

**String 더 심함**:

String s = "hello";

// Object: 40+ bytes

// char[] 객체: 24 bytes + 2 * 5 = 34 bytes

// 총 60+ bytes

해결: Off-heap Binary Format

Tungsten은 데이터를 **JVM 힙 밖**의 바이너리 포맷에 저장:

Row: [age=30, name="Alice"]

Binary encoding (off-heap):

[bitmap][age:4bytes][name_offset:4bytes][name_length:4bytes]...[name_data]

- **No JVM objects**.

- **No GC pressure**.

- **Cache-friendly layout**.

- **직접 메모리 접근**: `sun.misc.Unsafe`.

UnsafeRow

Spark의 기본 row 표현:

public final class UnsafeRow extends MutableRow {

private Object baseObject; // byte[] or null (off-heap)

private long baseOffset;

private int sizeInBytes;

private int numFields;

// ...

}

각 필드 접근:

long getLong(int i) {

return Platform.getLong(baseObject, baseOffset + offsets[i]);

}

Direct memory access. JVM 객체 생성 없음.

Cache Friendliness

Tungsten 포맷은 **연속 메모리**:

Row 0: [col0][col1][col2]

Row 1: [col0][col1][col2]

Row 2: [col0][col1][col2]

특정 컬럼만 읽으면:

col0: 0, 12, 24, ... (stride: row size)

Spatial locality 나쁨. 그러나 일반 접근 패턴에선 여전히 빠름.

**Vectorized reader** (Parquet 등)는 columnar 포맷으로 직접 읽음 → 더 캐시 친화적.

메모리 관리

Tungsten은 자체 **메모리 매니저**:

- **Execution memory**: Shuffle, join, sort용.

- **Storage memory**: Cache (persist).

- 두 영역이 **동적으로** 서로 빌림.

- **Off-heap** + **on-heap** 지원.

Spark 설정:

spark.executor.memory = 8g

spark.memory.fraction = 0.6 # execution + storage

spark.memory.storageFraction = 0.5 # storage의 최소 보장

spark.memory.offHeap.enabled = true

spark.memory.offHeap.size = 4g

성능 효과

Tungsten 전후 벤치마크:

- **메모리 사용**: 30~70% 감소.

- **쿼리 속도**: 2~5배 향상.

- **GC pause**: 크게 감소.

특히 **aggregation, join 등 메모리 집약적** 작업에서 효과 큼.

5. Whole-Stage Code Generation

전통 Volcano 모델의 문제

전통 SQL 엔진은 **Volcano 모델** (iterator-based):

while (child.hasNext()) {

Row row = child.next();

if (predicate(row)) {

process(row);

}

}

각 연산자가 `next()`를 호출. 함수 호출 오버헤드:

- **Virtual function calls**: CPU 파이프라인 깨짐.

- **Object allocation**: row 객체 계속 생성.

- **Cache misses**: 매번 다른 코드 경로.

단순 쿼리도 연산자 수에 비례하는 오버헤드.

아이디어: 통합 코드 생성

"**여러 연산자를 하나의 함수로 병합**". 예:

SELECT name FROM users WHERE age > 20

**Volcano**:

Scan → Filter → Project

세 개의 iterator 객체.

**Codegen**:

// 런타임에 생성된 코드

while (scanner.hasNext()) {

Row row = scanner.next();

if (row.getInt("age") > 20) {

emit(row.getString("name"));

}

}

한 개의 루프. 함수 호출 없음. 컴파일러가 최적화.

Whole-Stage CodeGen

Spark 2.0 (2016)의 **Whole-Stage Code Generation (WSCG)**:

1. **Stage 단위로 합병**: Shuffle 경계까지 모두.

2. **Java 소스 코드 생성**: 런타임에.

3. **Janino**로 즉시 컴파일.

4. **직접 실행**: JVM이 JIT 최적화.

**결과**:

- 함수 호출 감소 (virtual call → inlined).

- 중간 객체 제거.

- CPU pipeline 활용.

- **2~10배 성능 향상**.

예시: 생성된 코드

SELECT id, name FROM users WHERE age > 20 AND country = 'KR'

**생성된 Java 코드** (간소화):

public class GeneratedIterator {

private scan_input_0; // Parquet reader

public boolean hasNext() { return scan_input_0.hasNext(); }

public UnsafeRow next() {

while (scan_input_0.hasNext()) {

UnsafeRow row = scan_input_0.next();

int age = row.getInt(2);

if (age > 20) {

UTF8String country = row.getUTF8String(3);

if (country.equals("KR")) {

// project id, name

UnsafeRow output = /* build row */;

return output;

}

}

}

return null;

}

}

모든 것이 **인라인**. 루프 하나. JIT이 바이트코드를 네이티브로 최적화.

Vectorized Execution과의 차이

Whole-stage codegen은 **row-at-a-time** 기반이다. 반면 **vectorized execution**은 **batch-at-a-time**:

while (batch = next_batch()) { // 1024 rows

for (row : batch) {

// process

}

}

ClickHouse, Snowflake, DuckDB가 vectorized.

Spark의 접근은 **둘 다 활용**:

- **Columnar scan**: Parquet, ORC를 vectorized로 읽음.

- **Row-based processing**: Codegen으로 각 row 처리.

- Spark 3.2+: 일부 연산자에 vectorized 지원 추가.

EXPLAIN으로 확인

df.explain("codegen")

출력:

Generated code:

/* 001 */ public Object generate(Object[] references) {

/* 002 */ return new GeneratedIteratorForCodegenStage1(references);

/* 003 */ }

...

실제 생성된 Java 코드를 볼 수 있다. 디버깅에 유용.

6. Shuffle: 분산 집계의 핵심

왜 Shuffle이 필요한가

Wide dependency (groupBy, join 등)는 데이터 **재분배**가 필요하다:

Stage 1 (4 partitions):

P0: [(a,1), (b,2), (c,3)]

P1: [(a,4), (d,5)]

P2: [(b,6), (c,7), (e,8)]

P3: [(d,9), (a,10)]

reduceByKey:

모든 'a'는 같은 partition으로

모든 'b'는 같은 partition으로

...

Stage 2 (4 partitions after shuffle):

P0: [(a, [1,4,10])] → [(a,15)]

P1: [(b, [2,6])] → [(b,8)]

P2: [(c, [3,7])] → [(c,10)]

P3: [(d, [5,9]), (e, [8])] → [(d,14),(e,8)]

Shuffle은 **stage 사이의 데이터 이동**. 네트워크와 디스크 I/O 집약적.

Shuffle 과정

**Write 단계** (Map 측):

1. 각 task가 **로컬 파일**에 결과 쓰기.

2. 각 reduce task에 해당하는 **partition으로 분류**.

3. 디스크에 저장 (나중에 reduce가 fetch).

**Read 단계** (Reduce 측):

1. 모든 map task로부터 자기 partition 데이터 **fetch**.

2. 네트워크로 전송 (executor 간).

3. 메모리에 모음.

4. 처리 (sort, aggregate 등).

Shuffle의 비용

Shuffle은 Spark의 **가장 비싼 연산**:

- **디스크 I/O**: write + read.

- **네트워크**: 모든 executor 간 cross.

- **Serialization**: 객체 ↔ 바이트.

- **Sort**: 보통 필요.

**규모**:

- 작은 job: 수 MB.

- 큰 job: 수 TB.

1 TB shuffle = 많은 시간과 자원.

Shuffle 전략의 진화

**1. Hash Shuffle** (초기 Spark):

- 각 map task가 각 reduce partition별로 **별도 파일**.

- M × R 파일 (M=map tasks, R=reduce tasks).

- 100 × 200 = 20,000 파일.

- 파일 시스템 부담.

**2. Sort Shuffle** (1.1+, 기본):

- 각 map task가 **하나의 파일**에 정렬된 결과.

- 인덱스 파일로 partition 위치 표시.

- M 파일만.

- **훨씬 효율적**.

**3. Tungsten Sort Shuffle** (1.5+):

- Tungsten의 binary format 활용.

- 캐시 친화적.

- 더 빠름.

Broadcast Join

**작은 테이블**을 조인할 때의 최적화:

val small = ... // 10 MB

val big = ... // 1 TB

val joined = big.join(broadcast(small), "key")

**Broadcast**:

1. Driver가 small 테이블을 **모든 executor에 복사**.

2. 각 executor가 big의 자기 partition과 **로컬에서 조인**.

3. **Shuffle 없음**.

**조건**: 작은 테이블이 메모리에 들어가야. 기본 `spark.sql.autoBroadcastJoinThreshold = 10 MB`.

**Shuffle join 대비**: **수 배~수십 배 빠름**.

Shuffle Partitioning

`spark.sql.shuffle.partitions`: shuffle 후 partition 수. 기본 200.

**너무 작음**:

- 큰 partition.

- 일부 task의 OOM.

- 병렬성 부족.

**너무 큼**:

- 많은 작은 파일.

- 스케줄링 오버헤드.

**가이드**: 각 partition이 **100~200 MB**가 되도록.

Shuffle 최적화 기법

**1. 파일 수 줄이기**:

df.coalesce(10).write.parquet(...)

작은 partition들을 병합.

**2. Partitioning 조정**:

df.repartition(50, $"key") // key 기반 재분할

**3. Broadcast 강제**:

val hint_df = big.join(broadcast(small), ...)

// 또는 SQL hint

// SELECT /*+ BROADCAST(small) */ ...

**4. Skew 처리**: 다음 섹션.

7. Adaptive Query Execution (AQE)

Spark 3.0의 킬러 기능

**Adaptive Query Execution (AQE)**: 런타임에 수집한 통계를 바탕으로 **쿼리 계획을 동적으로 재최적화**.

spark.sql.adaptive.enabled = true # 3.2+부터 기본 true

주요 기능

**1. Dynamically coalescing shuffle partitions**:

전통적 문제:

- `spark.sql.shuffle.partitions = 200`을 정해둠.

- 작은 shuffle은 200 partition이 과도.

- 큰 shuffle은 부족.

AQE:

- Shuffle 후 실제 데이터 크기 확인.

- **작은 partition을 자동 병합**.

- 예: 200 → 20 (각 partition이 충분한 크기).

**2. Dynamically switching join strategies**:

계획 시:

- `SortMergeJoin` 결정 (두 테이블 크기 추정).

실행 중:

- 실제 한 테이블이 예상보다 작은 것 판명.

- **동적으로 BroadcastHashJoin으로 전환**.

**3. Dynamically optimizing skew joins**:

Skew 감지:

- 특정 key의 partition이 평균보다 훨씬 큼.

AQE:

- 큰 partition을 **여러 sub-partition으로 분할**.

- 각각 별도 task로 실행.

- 결과를 union.

실측 효과

Databricks 벤치마크 (TPC-DS):

- AQE 전: 100%.

- AQE 후: 130% 이상.

- 일부 쿼리: **3배+ 향상**.

거의 공짜로 얻는 성능. AQE는 Spark 3.0의 가장 중요한 개선 중 하나.

8. Dynamic Partition Pruning

문제

Star schema의 fact와 dimension join:

SELECT f.amount

FROM fact_sales f JOIN dim_date d ON f.date_id = d.id

WHERE d.year = 2024

전통:

1. `fact_sales` 전체 스캔 (테라바이트 급).

2. `dim_date` 필터 (year=2024).

3. Join.

`fact_sales`는 date_id로 **partitioned**되어 있어도 전체 스캔. Filter가 dim에만 있어서.

AQE + DPP

Spark 3.0의 **Dynamic Partition Pruning**:

1. `dim_date`에서 year=2024인 id 먼저 수집.

2. **그 id들을 fact_sales의 partition filter로 pushdown**.

3. `fact_sales` 스캔 시 **해당 partition만** 로드.

**효과**:

- 수 TB → 수 GB.

- **10~100배 빠른 쿼리**.

조건

- 파티션된 테이블.

- Dimension 필터가 highly selective.

- Spark 3.0+.

9. 실전 튜닝

메모리 설정

--conf spark.executor.memory=16g

--conf spark.executor.memoryOverhead=2g

--conf spark.executor.cores=4

--conf spark.executor.instances=10

**원칙**:

- Executor당 **cores 4~6**: GC, I/O 균형.

- **Memory overhead = max(384MB, 10% of memory)**.

- **Instances**: 클러스터 용량과 workload에 따라.

Shuffle 튜닝

--conf spark.sql.shuffle.partitions=200 # 기본, 조정

--conf spark.sql.adaptive.enabled=true

--conf spark.sql.adaptive.coalescePartitions.enabled=true

--conf spark.sql.adaptive.skewJoin.enabled=true

**Partition 수 계산**:

- 총 데이터 / 목표 크기 (200 MB).

- 예: 1 TB → 5000 partitions.

Broadcast 임계값

--conf spark.sql.autoBroadcastJoinThreshold=100MB

기본 10 MB는 너무 작을 수 있음. 100 MB까지 올리면 더 많은 broadcast join.

**주의**: Executor 메모리 확인. Broadcast 테이블이 모든 executor에 복제되므로.

Cache 전략

df.cache() // MEMORY_AND_DISK (기본)

df.persist(StorageLevel.MEMORY_ONLY)

df.persist(StorageLevel.MEMORY_AND_DISK_SER) // 직렬화 (작음)

**언제 cache**:

- **여러 번 사용**되는 DataFrame.

- 계산 비용 큰 DataFrame.

**주의**: 메모리 부족하면 disk로 spill. 재계산이 더 나을 수 있음.

파티셔닝

**Partitioned write**:

df.write.partitionBy("year", "month").parquet(...)

파일을 year/month별 디렉토리로 저장. 쿼리 시 **partition pruning**으로 빠른 읽기.

**주의**: 너무 많은 partition은 작은 파일 문제.

Skew 수동 처리

AQE가 부족하면 수동:

// Salting

val salted = df.withColumn("salted_key",

concat($"key", lit("_"), (rand() * 10).cast("int")))

// Join 후 다시 그룹화

10. 성능 디버깅

Spark UI

Spark Web UI가 가장 중요한 도구:

- **Jobs**: Action별 실행.

- **Stages**: Stage별 task 분포.

- **SQL/DataFrame**: 논리/물리 계획.

- **Executors**: 리소스 사용.

- **Storage**: Cache 상태.

주의할 Metric

**Task duration 분포**:

- **Median**과 **Max** 차이가 크면 **skew**.

- 99th percentile이 나쁘면 병목.

**Shuffle read/write**:

- 크기가 예상보다 크면 **filter pushdown 실패**.

- 작으면 효율적.

**GC time**:

- Task 시간의 **10% 이상**이면 GC 문제.

- Memory 증설 또는 parallelism 증가.

Explain 명령어

df.explain() // 물리 계획

df.explain(true) // 4단계 모두

df.explain("cost") // CBO 비용

df.explain("codegen") // 생성된 코드

df.explain("formatted") // 보기 쉬운 포맷

Spark SQL Shell

빠른 테스트:

spark-sql

> SELECT ...

대화형 쿼리와 EXPLAIN.

11. 흔한 함정과 교훈

함정 1: 너무 많은 작은 파일

**증상**: 작은 작업이 수 분 걸림.

**원인**: 수백만 개 작은 파일.

**해결**: `coalesce` 또는 `repartition`으로 병합.

함정 2: Skew

**증상**: 99% task는 빠르고 1%가 영원히.

**원인**: 한 key가 압도적으로 많음 (예: `user_id = null`).

**해결**:

1. AQE의 skew join 활성화.

2. Null key 필터.

3. Salting.

함정 3: Broadcast OOM

**증상**: Executor OOM 발생.

**원인**: Broadcast 테이블이 너무 큼.

**해결**:

- `spark.sql.autoBroadcastJoinThreshold` 낮춤.

- 명시적 `broadcast()` 제거.

- Sort-merge join으로.

함정 4: 너무 많은 Shuffle Partition

**증상**: 작업이 느리고 많은 작은 task.

**원인**: `spark.sql.shuffle.partitions` 너무 큼.

**해결**: AQE coalesce 활성화, 또는 수동 조정.

함정 5: Python UDF의 오버헤드

**증상**: PySpark 작업이 극도로 느림.

**원인**: Python UDF는 **JVM ↔ Python 직렬화** 왕복.

**해결**:

- SQL 내장 함수 사용.

- Pandas UDF (vectorized, Apache Arrow 사용).

@pandas_udf("double")

def my_udf(s: pd.Series) -> pd.Series:

return s * 2

일반 UDF보다 **10배 이상** 빠름.

함정 6: 잘못된 Partition 수

**증상**: Under-utilization 또는 OOM.

**원인**:

- 너무 적음: 큰 task, OOM.

- 너무 많음: 오버헤드.

**해결**: 각 partition 100~200 MB 원칙.

퀴즈로 복습하기

**A.**

**Lineage의 의미**: RDD는 데이터 자체가 아닌 **"어떻게 만들어졌는지의 기록"**을 유지한다.

val rdd1 = sc.textFile("hdfs://data.txt") // source

val rdd2 = rdd1.map(_.toUpperCase) // parent: rdd1

val rdd3 = rdd2.filter(_.startsWith("A")) // parent: rdd2

val rdd4 = rdd3.flatMap(_.split(",")) // parent: rdd3

각 RDD는 **부모 RDD와 변환 함수**를 기억한다. 이 관계를 이어가면 **원천 데이터 소스부터 현재까지의 전체 계보**가 된다.

**장애 복구 원리**:

**시나리오**: `rdd4.collect()` 실행 중 executor 하나가 죽음. 그 executor가 처리하던 rdd4의 partition 3이 사라짐.

**복구 과정**:

1. **손실 감지**: Driver가 task 실패 감지.

2. **Lineage 추적**: rdd4 partition 3 → rdd3 partition 3 → rdd2 partition 3 → rdd1 partition 3.

3. **재계산**: 다른 executor에서 원천부터 다시 계산.

- HDFS에서 원본 텍스트 partition 3 읽기.

- map 적용.

- filter 적용.

- flatMap 적용.

4. **완료**: 손실된 partition 복구.

**Key: Partition 단위 재계산**. 전체 RDD가 아닌 **손실된 partition만**. 다른 partition들은 영향 없음.

**Replication과의 비교**:

**Traditional fault tolerance (replication)**:

- 모든 데이터를 여러 노드에 복제.

- 네트워크 I/O 많음 (복제 시).

- 디스크 사용 2~3배.

- 장애 시 즉시 다른 복제본 사용.

**RDD lineage**:

- **Replication 없음**. 데이터 한 번만.

- 네트워크/디스크 절약.

- 장애 시 **재계산** (시간 걸림).

- 메모리 기반 연산과 궁합 좋음.

**왜 Spark의 선택은 lineage인가**:

Spark는 **반복 연산**(ML, 그래프)을 타겟으로 했다. 이런 워크로드는:

- 데이터가 메모리에 있어 빠름.

- Lineage 재계산도 (메모리니까) 빠름.

- Replication 비용이 반복적 쓰기로 누적.

**Lineage의 한계**:

**1. Lineage가 길면 재계산이 오래**:

rdd1 → rdd2 → rdd3 → ... → rdd100 → action

rdd99 partition 손실 시, rdd1부터 재계산. 수 분~시간 가능.

**해결: Checkpoint**:

rdd50.checkpoint()

- rdd50을 **디스크에 저장**.

- Lineage를 이 지점에서 "잘라냄".

- 이후 복구는 rdd50부터.

**2. Wide dependency의 문제**:

rdd1 (4 partitions) --shuffle--> rdd2 (4 partitions)

rdd2 partition 3 손실 → **rdd1의 모든 partition** 필요 (shuffle이라 재분배). 비용이 큼.

**해결**: Shuffle 결과를 영구 저장. Spark가 기본으로 함 (**shuffle files**가 로컬 디스크에 남음).

**3. 외부 시스템 부작용**:

rdd.foreach(x => saveToDatabase(x))

재계산 시 **중복 저장** 위험. Lineage는 순수 함수 가정.

**해결**: Idempotent 연산, 또는 정확히 한 번 semantic을 별도 구현.

**교훈**:

RDD lineage는 **"replication 없이 내결함성"**을 가능하게 했다. 이것이 Spark가 MapReduce 대비 **100배 빠를** 수 있었던 기반이다. 메모리 연산 + 경량 복구 = 빅데이터의 새 패러다임.

**현대 적용**:

- **RDD lineage** → DataFrame, Dataset 아래에서도 유지.

- **Checkpoint** → 장기 실행 job에서 중요.

- **Structured Streaming**에서도 비슷한 아이디어 (log-based recovery).

**분산 시스템 철학적 관점**:

Lineage는 **"상태를 저장하지 말고, 만드는 방법을 저장하라"** 는 접근. 이는 함수형 프로그래밍의 철학과 일치한다:

- Immutable: RDD는 변하지 않음.

- Referential transparency: 같은 입력 → 같은 출력 (재계산 가능).

- Composition: RDD들의 체인.

Spark는 **분산 컴퓨팅에 함수형 원칙을 적용**해서 새 영역을 열었다. 그 단순하고 우아한 아이디어가 10년 넘게 이 분야를 이끌고 있다.

**A.**

**Catalyst의 핵심 아이디어**: SQL/DataFrame 쿼리를 **tree** 로 표현하고, 수십~수백 개의 **rewrite rule**을 반복 적용해서 더 효율적인 tree로 변환한다.

**4단계 처리 과정**:

**1. Parsing → Unresolved Logical Plan**

SELECT name FROM users WHERE age > 20

Project [name]

Filter (age > 20)

UnresolvedRelation("users")

**2. Analysis → Resolved Logical Plan**

Catalog 참조로:

- `users` → 실제 테이블 스키마.

- `name`, `age` → 컬럼 참조.

- 타입 검증.

Project [name#10]

Filter (age#11 > 20)

Relation[name#10, age#11, email#12] parquet

**3. Logical Optimization → Optimized Logical Plan**

이 단계가 진짜 마법. 많은 rule이 반복 적용된다:

**Rule 1: Projection Pushdown**

Project [name#10]

Filter (age#11 > 20)

Relation[name#10, age#11, email#12] ← email 불필요

Project [name#10]

Filter (age#11 > 20)

Project [name#10, age#11] ← 필요한 컬럼만

Relation[name, age, email]

**이점**: Parquet reader가 name, age 컬럼만 읽음. email 스킵. I/O 절약.

**Rule 2: Filter Pushdown to Data Source**

Parquet/ORC는 **predicate pushdown**을 지원한다:

Filter (age > 20)

Relation(parquet)

Relation(parquet) WITH FILTER (age > 20)

Parquet reader가 min/max 통계를 보고 **row group 전체를 스킵**. 수십 배 빠른 읽기.

**Rule 3: Constant Folding**

WHERE age > 18 + 2

WHERE age > 20

매 행마다 `18+2`를 계산하지 않음.

**Rule 4: Simplification**

WHERE true AND x > 10

WHERE x > 10

WHERE x > 10 OR false

WHERE x > 10

**Rule 5: Join Reorder**

SELECT * FROM a JOIN b ON a.id = b.id JOIN c ON b.cid = c.id

WHERE a.country = 'KR'

CBO (Cost-Based Optimizer)가 통계를 보고 최적 순서 결정:

- `a`를 먼저 필터 (country='KR').

- 작아진 `a`와 `b` 조인.

- 결과와 `c` 조인.

Catalyst는 **지수 개의 조인 순서**를 탐색하고 가장 싼 것 선택.

**Rule 6: Subquery Unnesting**

SELECT * FROM orders

WHERE user_id IN (SELECT id FROM users WHERE country = 'KR')

SELECT o.* FROM orders o

INNER JOIN (SELECT id FROM users WHERE country = 'KR') u

ON o.user_id = u.id

Semi-join으로 변환. 훨씬 빠름.

**Rule 7: Column Pruning Through Join**

SELECT a.name FROM a JOIN b ON a.id = b.aid

`b`에서는 `aid`만 필요 (조인 키). 다른 컬럼 스킵.

**Rule 8: Decimal 연산 최적화**

SELECT SUM(price * quantity) FROM sales

Decimal 곱셈은 비쌈. Catalyst는 overflow를 고려해서 **필요한 정밀도만** 사용.

**4. Physical Planning**

논리 계획 → 실행 연산자:

Project → org.apache.spark.sql.execution.ProjectExec

Filter → FilterExec

Join → SortMergeJoinExec (또는 BroadcastHashJoinExec 등)

Aggregate → HashAggregateExec

여러 물리 계획을 생성하고 **cost를 비교**해서 선택.

**예: Join 전략 선택**

SELECT * FROM big JOIN small ON big.id = small.id

Catalyst는 다음을 고려:

- `small` 크기 < `spark.sql.autoBroadcastJoinThreshold` (기본 10MB)?

- Yes: **BroadcastHashJoin** 선택 (small을 모든 executor에 복사, shuffle 없음).

- No: **SortMergeJoin** 선택 (양쪽 shuffle).

Broadcast join은 **수 배~수십 배** 빠르다.

**실전 효과**:

간단한 쿼리:

SELECT u.name, SUM(o.amount)

FROM users u JOIN orders o ON u.id = o.user_id

WHERE u.country = 'KR'

GROUP BY u.name

**최적화 전** (naive 실행):

1. users 전체 스캔.

2. orders 전체 스캔.

3. 전체 join.

4. filter (country='KR').

5. group by.

1 TB orders 스캔. 수십 분.

**최적화 후** (Catalyst):

1. users에 filter pushdown (country='KR' → 10% 데이터만).

2. Projection pushdown (id, name만 읽음).

3. orders도 필요 컬럼만 읽음 (user_id, amount).

4. Broadcast join (users가 충분히 작아서).

5. group by.

I/O 90% 절약. **10배 빠른 쿼리**.

**Catalyst의 확장성**:

Catalyst의 rule은 **사용자 정의** 가능:

object MyRule extends Rule[LogicalPlan] {

def apply(plan: LogicalPlan): LogicalPlan = plan.transform {

case Filter(p, child) => /* custom logic */

}

}

spark.experimental.extraOptimizations = Seq(MyRule)

이 확장성이 Databricks의 Delta Lake, Iceberg 같은 프로젝트의 기반.

**교훈**:

Catalyst는 **"규칙 기반 트리 변환"**이라는 단순한 아이디어로 거대한 성과를 냈다. 각 rule은 단순하지만 조합되면 **놀라운 최적화**.

전통 DB (PostgreSQL, MySQL)도 쿼리 optimizer가 있지만 Catalyst의 강점:

1. **Scala tree API**로 쉬운 rule 작성.

2. **Extensible**: 외부 rule 추가.

3. **DataFrame API와 통합**: SQL이 아닌 코드도 최적화.

결과: **기본 SQL을 작성해도 손으로 최적화한 것과 비슷한 성능**. 이것이 Spark가 데이터 엔지니어에게 사랑받는 이유다.

**A.**

**JVM 객체의 숨은 비용**:

평범한 Java 코드:

Integer age = 30;

String name = "Alice";

메모리 사용을 보면:

- `Integer`: 16 bytes header + 4 bytes value + 4 bytes padding = **24 bytes** (4 bytes 데이터에)

- `String "Alice"`:

- String 객체: ~40 bytes

- 내부 `char[]`: 16 bytes header + 10 bytes (5 chars) + padding = ~32 bytes

- 총: **~72 bytes** (5 chars 데이터에)

**실제 데이터 (int 4 + 5 chars = 9 bytes)가 96 bytes를 차지**. **10배 오버헤드**.

**빅데이터에선 이것이 재앙**:

1억 row에 5개 필드가 있다면:

- 실제 데이터: ~5 GB.

- JVM 객체로: **~50 GB**.

메모리 부족. GC pressure. 캐시 miss.

**Tungsten의 해결**:

Row를 **연속된 바이트 배열**에 저장:

Row(age=30, name="Alice", email="a@b.c")

Tungsten format:

[null_bitmap:8B][age:8B][name_ptr:8B][email_ptr:8B][name:"Alice"][email:"a@b.c"]

- **고정 크기 필드** (int, long, double): 인라인.

- **가변 크기 필드** (string, array): pointer + 뒷부분에 실제 데이터.

- **Null bitmap**: 각 필드의 null 여부.

- **Header 없음**: 순수 데이터만.

**결과**:

- 5개 필드 row: **~40 bytes** (JVM 객체 ~200 bytes에서).

- **5배 메모리 절약**.

**UnsafeRow**:

Spark의 구현 클래스:

public class UnsafeRow extends BaseMutableRow {

private Object baseObject; // byte[] 또는 null (off-heap)

private long baseOffset; // 시작 위치

private int numFields;

private int sizeInBytes;

}

**직접 메모리 접근** (`sun.misc.Unsafe`):

public long getLong(int ordinal) {

assertIndexIsValid(ordinal);

return Platform.getLong(baseObject, baseOffset + ordinal * 8L);

}

JVM을 거치지 않고 **포인터 산술**로 바로 메모리 접근. Unsafe는 이름대로 안전장치를 건너뛴다. JVM이 검증 안 함.

**효과 1: 메모리 절약**

1억 row, 10 필드 DataFrame:

- JVM object: ~30 GB.

- Tungsten: **~5 GB**.

**6배** 적은 메모리. 같은 클러스터에서 6배 큰 데이터 처리 가능.

**효과 2: GC pressure 제거**

JVM GC는 **많은 작은 객체**에서 느려진다. Young generation을 계속 순회해야.

Tungsten은 **적은 수의 큰 배열**만 사용:

- Off-heap: GC 대상 아예 아님.

- On-heap: GC 하더라도 객체 수가 적음.

**결과**: Full GC 빈도 대폭 감소. Pause time 대폭 감소.

**효과 3: Cache friendliness**

CPU cache는 **연속된 메모리**를 선호 (spatial locality). 각 cache line은 64 bytes.

JVM 객체:

Integer 1 @ address 0x1000

Integer 2 @ address 0x8000 ← 멀리 떨어짐 (heap 할당 랜덤)

Integer 3 @ address 0x4000

하나 읽을 때마다 **cache miss 가능성**. 수십 ~ 수백 cycles 낭비.

Tungsten:

[field1][field2][field3][field4]... 연속

한 번의 메모리 접근이 여러 필드를 **cache에 로드**. 다음 접근은 hit.

**성능**: **2~5배 향상** (메모리 집약적 워크로드).

**효과 4: Direct serialization**

UnsafeRow는 **이미 byte array**. 네트워크로 전송 시:

- Java 객체: Kryo/Java serialization 필요 (느림).

- UnsafeRow: **그대로 전송** (zero serialization).

Shuffle이 훨씬 빠름.

**효과 5: Vectorized 처리의 기반**

연속 메모리는 **SIMD (벡터화 명령어)** 친화적:

// 일반

for (int i = 0; i < n; i++) {

result[i] = a[i] + b[i];

}

// SIMD (4개 동시 처리)

for (int i = 0; i < n; i += 4) {

_mm_store_ps(result + i, _mm_add_ps(_mm_load_ps(a + i), _mm_load_ps(b + i)));

}

Tungsten의 연속 메모리가 SIMD의 기반. JVM은 자체적으로 SIMD를 잘 활용하지 못하지만, 데이터 레이아웃이 친화적이면 JIT이 인식.

**비용과 제약**:

**복잡도**: Tungsten은 **매우 낮은 레벨**이다. 버그가 있으면 JVM crash. Spark 팀이 조심스럽게 개발.

**디버깅 어려움**: JVM heap dump, profiler가 덜 유용.

**특정 연산의 오버헤드**: 가변 크기 필드(string) 수정이 복잡.

**Python 호환**: PySpark에서 UnsafeRow를 Python 객체로 변환할 때 비용.

**Arrow와의 협력**:

최근 Spark는 **Apache Arrow**와 통합:

- Arrow: 메모리 columnar 포맷.

- Python ↔ JVM 교환 시 Arrow 사용.

- Pandas UDF가 Arrow 덕분에 10배 빠름.

Tungsten과 Arrow는 **서로 보완**. Tungsten은 row-wise 처리, Arrow는 column-wise 교환.

**Spark 3.x의 Vectorized Execution**:

Spark 3.0+는 일부 연산자에 columnar 처리 도입:

- Parquet/ORC vectorized reader.

- Columnar aggregation.

- 4096 rows batch 단위.

Tungsten row format에서 columnar batch로 진화. DuckDB, ClickHouse의 방식을 따라감.

**교훈**:

Tungsten은 Spark의 **"underlying 재발명"**이다. 표면 API는 그대로지만 내부가 완전히 바뀌었다. 사용자는 변경 없이 2~10배 성능 향상.

이는 **"하드웨어를 존중하는 설계"**의 중요성을 보여준다:

- CPU cache가 싫어하는 것 (random access) 피하기.

- GC의 약점 (작은 객체) 회피.

- Memory bandwidth 아끼기.

JVM이 "추상화"를 제공하지만, 고성능이 필요하면 그 추상화를 **영리하게 우회**해야 한다. Tungsten은 "Java/Scala 속에서 C 수준 성능을 낸다"는 도전의 답이다.

모든 빅데이터 엔진이 결국 같은 방향으로 진화: **바이너리 포맷, 연속 메모리, 벡터화**. Tungsten은 Spark의 해답이었고, 같은 아이디어가 Flink, Presto, Snowflake에서도 발견된다. **수렴 진화**의 예다.

**A.**

**전통 Volcano 모델**:

1970년대부터 쓰인 query execution model. 각 연산자는 **iterator**:

interface QueryOperator {

Row next();

boolean hasNext();

}

**예시: `SELECT name FROM users WHERE age > 20`**

Project ← Filter ← Scan

실행:

Project.next() {

while (true) {

Row row = Filter.next();

if (row == null) return null;

return new Row(row.getString("name"));

}

}

Filter.next() {

while (true) {

Row row = Scan.next();

if (row == null) return null;

if (row.getInt("age") > 20) return row;

}

}

Scan.next() {

return disk.readNextRow();

}

각 연산자가 **독립 객체**. `next()`를 계속 호출.

**Volcano의 비용**:

1. **Virtual function calls**: `next()`는 인터페이스 메소드. JVM은 virtual dispatch 사용 → **inline 어려움**.

2. **Object allocation**: 매 row마다 새 Row 객체 생성 가능성. GC 부담.

3. **Branch prediction 실패**: 연산자마다 다른 코드 경로 실행. CPU pipeline 예측 실패.

4. **Cache miss**: 매 연산자의 코드가 다른 메모리 위치.

5. **Boxing**: Java의 경우 `Integer` 같은 wrapper 사용 → heap 할당.

**결과**: 단순 쿼리도 **Volcano 오버헤드만으로 2-5배 느림**.

**Whole-Stage Code Generation의 답**:

**아이디어**: "**여러 연산자를 하나의 함수로 병합**". 런타임에 Java 코드를 **생성**하고 컴파일.

**예시: 같은 쿼리**

**Volcano (의사 코드)**:

for each row from Scan:

call Filter.next(row)

call Project.next(filter_output)

output project_output

3개 iterator, 수많은 함수 호출.

**WSCG가 생성하는 코드**:

public class GeneratedIterator {

public Object[] next() {

while (scanner.hasNext()) {

InternalRow row = scanner.next();

int age = row.getInt(2); // age 컬럼

if (age > 20) { // filter inlined

UTF8String name = row.getUTF8String(1); // name 컬럼

return new Object[]{ name }; // project inlined

}

// 조건 불만족, 다음 row로

}

return null; // 끝

}

}

**핵심 차이**:

1. **단일 함수**: Scan, Filter, Project가 **한 함수의 내부**. 함수 호출 없음.

2. **Inline**: 컴파일러(JVM JIT)가 모든 것을 인라인. Virtual call 제거.

3. **직접 메모리 접근**: `row.getInt(2)`는 Tungsten의 `Unsafe.getInt(address)`.

4. **단순한 loop**: 하나의 while 루프. CPU pipeline 친화적.

**성능 향상**:

Databricks 벤치마크 (TPC-DS):

- Volcano: 100%.

- WSCG: **400% ~ 1000%**.

**4-10배 향상**. 마법이 아니라 오버헤드 제거.

**어떻게 작동하는가**:

1. **Spark의 최적화 단계**:

- Logical plan → Physical plan.

- 여러 연산자를 **stage**로 그룹화 (narrow dependency 내).

- 각 stage가 WSCG 대상.

2. **Code generation**:

- Physical plan 트리를 순회.

- 각 연산자가 자기 코드 조각 제공.

- 합쳐서 **Java source code** 생성.

3. **Compilation**:

- **Janino** (가벼운 Java compiler)로 컴파일.

- 바이트코드 생성.

- `ClassLoader`에 로드.

4. **Execution**:

- 컴파일된 클래스의 인스턴스 생성.

- 일반 Java 메소드처럼 호출.

- JVM JIT가 최적화 (hot path → native code).

**실전 예시**:

spark.range(1, 1000000)

.filter($"id" > 500000)

.select($"id" * 2)

.count()

EXPLAIN 결과:

*(1) Filter (id#0L > 500000)

+- *(1) Range (1, 1000000, step=1, splits=...)

`*(1)`이 WSCG stage 번호. Range, Filter, Project가 하나의 코드로 통합.

`explain("codegen")`으로 실제 생성된 코드 확인 가능:

public Object generate(Object[] references) {

return new GeneratedIteratorForCodegenStage1(references);

}

final class GeneratedIteratorForCodegenStage1 extends BufferedRowIterator {

// ... 수백 줄의 생성된 코드

}

**제약**:

1. **64KB 메소드 한계**: JVM의 단일 메소드가 64KB 초과 불가. 너무 많은 연산자 병합 시 분할.

2. **복잡한 연산자**: Window function, UDF 등은 codegen 어려움. Fall back to Volcano.

3. **디버깅 어려움**: 생성된 코드를 디버거로 추적 힘듦.

4. **컴파일 오버헤드**: 매 쿼리마다 컴파일. 작은 쿼리엔 역효과 가능.

**WSCG vs Vectorized**:

Spark의 WSCG는 **row-at-a-time + inline**. 또 다른 접근:

**Vectorized Execution** (ClickHouse, DuckDB, Presto 일부):

- Batch 단위 (1024 rows).

- SIMD 명령 활용.

- Columnar 처리.

장점: SIMD의 힘. 단점: 구현 복잡.

Spark는 두 접근을 **조합**:

- Parquet reader: vectorized (columnar).

- 연산자: WSCG (row-wise).

- 향후: 더 많은 vectorization 도입 중.

**Hyper / Umbra 비교**:

독일 TU Munich의 **HyPer/Umbra** 연구 시스템은 더 급진적:

- **LLVM IR로 컴파일**.

- 네이티브 코드 (JIT 없음).

- 성능이 C++ 손 작성 코드 수준.

Spark WSCG는 Java 바이트코드까지. JVM JIT에 의존. Umbra만큼은 아니지만 **훨씬 간단하고 이식성 있다**.

**Rough numbers**:

- Volcano: baseline.

- WSCG: 4-10x faster.

- Vectorized: 4-10x faster.

- LLVM-based: 10-20x faster.

Spark는 "충분히 빠르면서 구현 가능"의 균형을 찾았다.

**교훈**:

WSCG는 **전통 지혜의 재평가**다. Volcano 모델이 40년간 지배한 이유는 **우아함과 유연성**이지만, 현대 CPU 아키텍처 (deep pipeline, cache, SIMD)에선 **적합하지 않다**.

해결: "**연산자 추상화를 유지하면서도 런타임에 구체화**". 이것이 codegen의 본질.

이는 **"지연된 optimization"** 철학이다:

- 사용자는 high-level 코드 작성.

- 시스템이 물리적 최적화.

- 컴파일러의 전통적 역할을 런타임으로.

JIT 컴파일러가 수십 년간 해온 일이지만, 빅데이터에 적용한 것은 Spark가 처음 크게 성공시켰다. 이후 거의 모든 현대 데이터 엔진이 이 패턴을 따르고 있다.

현대 SQL 엔진의 성능은 **하드웨어 성능의 몇 % 를 사용하는가**의 경쟁이다. Volcano는 10%, WSCG는 40%, vectorized는 60%, LLVM-based는 80%. 각 진화가 이 수치를 밀어올린다.

**A.**

**Static optimization의 한계**:

전통 쿼리 optimizer (Catalyst 포함)는 **실행 전**에 최적화를 결정한다. 근거는:

- 통계 (`ANALYZE TABLE`).

- 데이터 소스 메타데이터.

- Schema.

**문제**: **통계가 부정확하거나 최신이 아닐 수 있다**:

1. 데이터가 변경됨 (ANALYZE 이후).

2. 통계 계산이 비쌈 (대용량).

3. 복잡한 조건의 selectivity 추정 어려움.

4. Join 결과 크기 예측 어려움.

**결과**: 잘못된 계획 선택 → 느린 쿼리.

**AQE의 기본 아이디어**:

"**Stage 경계에서 실제 통계를 수집**하고, **남은 계획을 재최적화**"

Spark는 이미 shuffle 단계에서 **중간 결과**가 디스크에 저장된다. 이 데이터의 **실제 크기**를 측정해서 남은 단계를 최적화할 수 있다.

**AQE의 세 가지 최적화**:

**1. Dynamically Coalescing Shuffle Partitions**

**문제**: `spark.sql.shuffle.partitions` 기본값 200. 실제 데이터 크기와 무관.

**시나리오**:

- 작은 table filter 결과: 100 MB.

- 200 partition = **각 500 KB**.

- 200개의 매우 작은 task. 오버헤드 비율 높음.

**AQE 해결**:

- Shuffle 완료 후 **실제 partition 크기 측정**.

- 작은 partition들을 **병합**.

- 예: 200 → **20** (각 ~5 MB).

- Task 수 감소, 오버헤드 감소, 성능 향상.

**설정**:

spark.sql.adaptive.enabled=true

spark.sql.adaptive.coalescePartitions.enabled=true

spark.sql.adaptive.coalescePartitions.minPartitionSize=1MB

spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB

**2. Dynamically Switching Join Strategies**

**문제**: Join 전략 선택 시 table 크기 추정이 부정확.

**시나리오**:

SELECT * FROM big JOIN medium ON big.id = medium.id

WHERE medium.status = 'active'

**계획 시 추정**: `medium` 크기 = 100 MB → SortMergeJoin 선택.

**실제 실행**: 필터 후 `medium.active` = **5 MB**. Broadcast가 훨씬 나았을 것.

**AQE 해결**:

- Medium 필터 결과를 shuffle.

- 실제 크기가 5 MB임을 확인.

- **BroadcastHashJoin으로 동적 전환**.

- 수 배 빠름.

**예시**:

Before AQE: 120 seconds (SortMergeJoin)

After AQE: 25 seconds (auto-switched to BroadcastHashJoin)

**설정**:

spark.sql.adaptive.autoBroadcastJoinThreshold=10MB

**3. Dynamically Optimizing Skew Joins**

**문제**: Data skew. 일부 key가 과도하게 많음.

**시나리오**:

SELECT * FROM orders JOIN users ON orders.user_id = users.id

- 대부분 user는 10~100 orders.

- 한 user (봇, 관리자)가 1,000,000 orders.

- 해당 partition이 **100배 크다**.

- 그 task가 **100배 오래 걸림**.

- 나머지 tasks는 빨리 끝나지만 **skewed task 기다림**.

**전통적 해결**: Salting (수동, 복잡).

**AQE 해결**:

1. Shuffle 후 각 partition 크기 확인.

2. **평균의 5배 이상** 크면 skewed로 판단.

3. Skewed partition을 **여러 sub-partition으로 분할**.

4. 각 sub-partition이 다른 task에서 처리.

5. 반대쪽 partition도 **복제** (broadcast 필요).

**예시**:

Before:

Task 1: 1 GB (skewed user) → 100 seconds

Task 2: 10 MB → 1 second

Task 3: 10 MB → 1 second

...

Total time: 100 seconds (Task 1 bound)

AQE divides Task 1 into 10 sub-tasks:

Task 1a: 100 MB → 10 seconds

Task 1b: 100 MB → 10 seconds

...

Task 2: 10 MB → 1 second

Total time: 10 seconds (10x faster)

**설정**:

spark.sql.adaptive.skewJoin.enabled=true

spark.sql.adaptive.skewJoin.skewedPartitionFactor=5

spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB

**4. Dynamic Partition Pruning (DPP)** (AQE와 함께 발전)

Star schema의 fact + dimension join:

SELECT f.amount

FROM fact_sales f JOIN dim_date d ON f.date_id = d.id

WHERE d.year = 2024

**전통**: `fact_sales` 전체 스캔 + join + filter.

**DPP**:

1. `dim_date`에서 year=2024인 id 먼저 추출.

2. 이 id들을 **fact_sales의 partition filter로 pushdown**.

3. `fact_sales` 스캔 시 **해당 partition만 로드**.

**효과**:

- 파티션된 fact_sales (year별)이면 **1 TB → 50 GB**.

- **20배 빠른 쿼리**.

**실측 효과**:

**Databricks TPC-DS 벤치마크**:

- AQE 비활성: 100% (baseline).

- AQE 활성: **130~200%**.

- 일부 쿼리: **3배+ 향상**.

- 평균: **1.8배**.

**거의 공짜**. 기존 코드 변경 없이.

**Spark 버전별**:

- **Spark 3.0**: AQE 도입 (기본 비활성).

- **Spark 3.2**: AQE 기본 활성화.

- **Spark 3.3+**: 개선 지속.

**내부 작동**:

AQE는 Spark의 **query execution을 stage 단위로** 실행하되, **stage 사이에 pause**한다:

1. Stage 1 실행.

2. Stage 1 완료 → shuffle 파일 생성.

3. **Shuffle 파일의 실제 통계 수집**.

4. Stage 2의 physical plan을 **re-optimize**.

5. Stage 2 실행.

6. 반복.

이 **stage 단위 통계 → 재최적화** 루프가 AQE의 핵심.

**한계와 함정**:

**1. Streaming에 제한적**: Structured Streaming은 AQE 덜 적용됨.

**2. 작은 쿼리엔 불필요**: Re-optimization 오버헤드가 작은 쿼리에선 역효과.

**3. 복잡한 쿼리 예측 어려움**: 매우 복잡한 쿼리는 여전히 수동 튜닝 필요.

**4. DPP와의 상호작용**: 복잡한 경우 예측 안 되는 계획 변경.

**실전 권장**:

기본 설정 (Spark 3.2+는 이미 기본)

spark.sql.adaptive.enabled=true

spark.sql.adaptive.coalescePartitions.enabled=true

spark.sql.adaptive.skewJoin.enabled=true

파라미터 조정

spark.sql.adaptive.advisoryPartitionSizeInBytes=128MB # 목표 partition 크기

spark.sql.adaptive.coalescePartitions.minPartitionSize=32MB # 최소

spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 # skew 기준

**교훈**:

AQE는 **"optimization은 한 번에 끝나지 않는다"** 는 철학의 구현이다. 전통 optimizer는 **ahead-of-time** (실행 전 모든 결정), AQE는 **just-in-time** (필요 시 재최적화).

이는 **JIT 컴파일러**의 철학과 같다:

- 처음엔 inexpert 결정.

- 런타임 관찰.

- 최적화.

분산 데이터 처리에서 같은 원칙 적용. Spark가 선구자였고, 이후 Trino, Presto도 유사 기능 추가.

**"정적 vs 동적 최적화"**의 트레이드오프:

- **정적**: 단순, 예측 가능. 완전 최적은 아닐 수 있음.

- **동적**: 복잡, 오버헤드. 실제 데이터에 적응.

AQE는 이 둘 사이의 **실용적 타협**이다. 완벽한 동적 재계획은 아니지만, stage 경계에서 핵심 결정을 조정함으로써 대부분의 이득을 얻는다.

**미래**:

AQE는 계속 발전 중:

- **Multi-stage re-optimization**.

- **Cost model 개선**.

- **ML 기반 최적화** (실험적).

- **Streaming adaptation**.

Spark가 10년 넘게 데이터 처리 영역을 선도하는 이유는 **이런 지속적 혁신** 때문이다. "충분히 좋다"에 만족하지 않고, 매 버전마다 의미 있는 개선을 가져온다. AQE는 그 한 예이며, 앞으로도 더 나아질 것이다.

마치며: 데이터의 정교한 엔진

핵심 정리

1. **RDD**: 메모리 기반 분산 컬렉션. Lineage로 장애 복구.

2. **DataFrame**: 구조화된 API. Schema + optimizer.

3. **Catalyst**: Rule + Cost 기반 SQL 최적화.

4. **Tungsten**: Off-heap binary format. CPU 캐시 친화.

5. **WSCG**: 런타임 Java 코드 생성. Volcano 오버헤드 제거.

6. **Shuffle**: 분산의 핵심이자 병목. Sort shuffle 기본.

7. **AQE**: 런타임 통계로 재최적화. 30-200% 향상.

Spark가 우리에게 준 것

Spark는 **빅데이터 처리의 민주화**다. MapReduce 시대엔:

- Java로 map/reduce 작성.

- 복잡한 최적화 수동.

- 느린 반복.

Spark 이후:

- SQL 또는 간단한 DataFrame API.

- 자동 최적화 (Catalyst + Tungsten + AQE).

- 빠른 반복.

- 배치 + 스트림 + ML 통합.

**진입 장벽이 낮아지면서 더 많은 사람이 데이터를 다룰 수 있게** 되었다. 데이터 분석가도 SQL로 복잡한 분산 쿼리 실행. 데이터 과학자도 머신러닝 파이프라인을 PySpark로.

실전 권장

- **기본 설정 이해**: `spark.executor.memory`, `spark.sql.shuffle.partitions`, AQE.

- **Broadcast join 활용**: 작은 dimension 테이블.

- **Partition column 선택**: 쿼리 패턴에 맞게.

- **EXPLAIN 읽기**: 느린 쿼리의 첫 단계.

- **Spark UI 활용**: Task 분포, GC time 모니터.

- **적절한 file format**: Parquet/ORC 선호.

마지막 교훈

Spark의 성공은 **기술적 우수성** + **타이밍** + **생태계**의 합이다:

1. **기술**: RDD, Catalyst, Tungsten의 혁신.

2. **타이밍**: 메모리 가격 하락 + 빅데이터 수요 폭발.

3. **생태계**: Python/R 지원, ML, 스트리밍, SQL 모두 통합.

이 세 가지 중 하나라도 빠졌으면 Spark는 지금 위치에 없을 것이다. **기술은 필요하지만 충분하지 않다**는 교훈이다.

당신이 다음에 Spark job을 실행할 때, 아래에서 벌어지는 일을 기억하자:

- Catalyst가 수십 개의 rule로 쿼리 재작성.

- Tungsten이 바이너리 포맷으로 메모리 절약.

- WSCG가 Java 코드 생성 + 컴파일.

- AQE가 실행 중 통계로 재최적화.

- Shuffle이 데이터 재분배.

이 모든 것이 **수 ms~수 초** 안에 일어난다. 그리고 당신은 단지 `.filter().groupBy().count()`만 썼다. 이것이 현대 데이터 엔지니어링의 마법이다.

참고 자료

- [Apache Spark Documentation](https://spark.apache.org/docs/latest/)

- [Learning Spark, 2nd Edition (Jules Damji et al.)](https://www.oreilly.com/library/view/learning-spark-2nd/9781492050032/)

- [Spark SQL: Relational Data Processing (Armbrust et al., 2015)](https://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf)

- [Resilient Distributed Datasets (Zaharia et al., 2012)](https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf)

- [Databricks Engineering Blog](https://www.databricks.com/blog/category/engineering)

- [Spark Internals by Reynold Xin](https://www.slideshare.net/rxin/)

- [Project Tungsten: Bringing Spark Closer to Bare Metal](https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html)

- [Apache Spark as a Compiler (Databricks Blog)](https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html)

- [Adaptive Query Execution in Apache Spark](https://databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html)

- [High Performance Spark (Holden Karau, Rachel Warren)](https://www.oreilly.com/library/view/high-performance-spark/9781491943199/)

현재 단락 (1/1201)

2010년 UC Berkeley의 AMPLab에서 **Matei Zaharia**가 논문을 발표했다: "**Resilient Distributed Datasets: A Fault-...

작성 글자: 0원문 글자: 34,539작성 단락: 0/1201