Skip to content

✍️ 필사 모드: Apache Spark 내부 완전 가이드 2025: RDD, Catalyst Optimizer, Tungsten, Whole-Stage Codegen, Shuffle 심층 분석

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.

들어가며: Spark는 어떻게 MapReduce를 쫓아냈는가

2010년, UC Berkeley에서

2010년 UC Berkeley의 AMPLab에서 Matei Zaharia가 논문을 발표했다: "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing".

당시 빅데이터의 왕은 Hadoop MapReduce였다. Spark는 두 가지 주장을 했다:

  1. 메모리 기반으로 100배 빠르다.
  2. API가 훨씬 단순하다.

처음엔 MapReduce 진영이 회의적이었다. 하지만 Spark는:

  • 2013: Apache에 기증.
  • 2014: Databricks 창업.
  • 2016: Spark 2.0 (DataFrame, Catalyst, Tungsten).
  • 2020: Spark 3.0 (Adaptive Query Execution).
  • 현재: 빅데이터 처리의 사실상 표준.

Hadoop MapReduce는 사라졌다. Spark는 전 세계 수만 기업의 데이터 파이프라인의 중추가 되었다.

왜 이렇게 빨라졌는가

Spark의 성능 혁명은 세 가지 축에서 나왔다:

  1. RDD (Resilient Distributed Dataset): In-memory 연산.
  2. Catalyst Optimizer: 쿼리 최적화.
  3. Tungsten: CPU와 메모리 최적화.

이 세 가지가 합쳐져 MapReduce 대비 100배, 때로는 1000배 성능을 낸다.

이 글에서 다룰 것

  1. RDD의 탄생과 lineage.
  2. DataFrame/Dataset: 구조화된 데이터.
  3. Catalyst optimizer: 쿼리 재작성.
  4. Physical plan과 cost-based 최적화.
  5. Tungsten project: CPU 캐시 친화적 실행.
  6. Whole-stage code generation: JIT으로 코드 생성.
  7. Shuffle: 분산 집계의 비밀.
  8. Adaptive Query Execution: 런타임 최적화.
  9. 실전 튜닝.

왜 이 지식이 중요한가

  • 데이터 엔지니어: Spark 튜닝의 80%는 내부 이해.
  • DA/DS: 왜 이 쿼리가 느린지 설명.
  • 플랫폼 엔지니어: 클러스터 최적화.
  • 다른 시스템 학습: Dask, Ray, Flink가 Spark의 개념을 차용.

1. RDD: 모든 것의 시작

탄생 배경

2009년경 빅데이터 처리는 MapReduce가 지배했다. 문제는:

  1. 반복 알고리즘이 극도로 느림 (ML, 그래프).
  2. 각 Map-Reduce job이 디스크를 왕복.
  3. 인터랙티브 분석이 불가능 (매번 몇 분~시간).

Zaharia의 통찰: "메모리에서 데이터를 유지하면 되지 않을까?"

문제: 메모리는 휘발성. 노드 장애 시 어떻게?

해결: Lineage (계보). 데이터가 아닌 변환 연산의 기록을 유지. 장애 시 재계산.

RDD란

RDD (Resilient Distributed Dataset):

  • Resilient: 장애에 견딤 (lineage로).
  • Distributed: 클러스터에 분산.
  • Dataset: 데이터 집합.

핵심 특성:

  1. Immutable: 한 번 만들어지면 변경 불가.
  2. Lazy: Transformation은 action이 불릴 때까지 실행 안 됨.
  3. Typed: Java/Scala의 타입 시스템 활용.
  4. Partitioned: 여러 worker에 분산.

Transformation vs Action

Transformation: 새 RDD 생성. Lazy.

val lines = sc.textFile("data.txt")  // RDD[String]
val words = lines.flatMap(_.split(" "))  // 아직 실행 안 됨
val counts = words.map(w => (w, 1))
val reduced = counts.reduceByKey(_ + _)  // 여전히 실행 안 됨

Action: 실제 계산 트리거. Eager.

reduced.collect()  // 여기서 전체 체인이 실행됨

왜 Lazy인가

Lazy evaluation의 장점:

  1. 최적화 기회: 여러 transformation을 한 번에 처리. 예: map → filter → map을 하나의 pass로.
  2. 불필요한 계산 회피: take(10)만 필요하면 10개만.
  3. Pipeline: 중간 데이터를 메모리에 쌓지 않음.

Lineage와 Fault Tolerance

val rdd1 = sc.textFile("data.txt")      // Dependency: HDFS
val rdd2 = rdd1.map(x => x.toUpperCase)  // Dependency: rdd1
val rdd3 = rdd2.filter(x => x.startsWith("A"))  // Dependency: rdd2

만약 rdd3를 계산 중에 한 partition이 노드 장애로 손실되면:

  1. Lineage 확인: rdd3 → rdd2 → rdd1 → file.
  2. 손실된 partition을 재계산.
  3. 다른 partition은 그대로.

장점: Replication 없이 내결함성. 디스크 I/O 없음.

단점: Lineage가 길면 재계산이 오래 걸림. Checkpoint로 해결 (중간 저장).

Narrow vs Wide Dependency

RDD 간 의존성은 두 종류:

Narrow dependency:

  • 자식 partition이 부모의 소수 partition에 의존.
  • 예: map, filter, union.
  • 한 stage 내에서 pipeline 실행 가능.
  • 재계산 시 영향 범위 작음.

Wide dependency (shuffle dependency):

  • 자식 partition이 부모의 여러 partition에 의존.
  • 예: groupByKey, reduceByKey, join.
  • Shuffle 필요: 데이터 재분산.
  • Stage 경계.
Narrow:                Wide:
Parent  Parent         Parent  Parent  Parent
  |       |              |   /  |  \   |
  ▼       ▼              ▼  ▼   ▼  ▼   ▼
Child1  Child2         ↓   ↓   ↓    ↓  ↓
                       Shuffle 발생
                       ↓    ↓    ↓
                      Child1 Child2 Child3

DAG와 Stage

Spark가 액션을 받으면:

  1. RDD graph를 DAG로 변환.
  2. Wide dependency를 경계로 stage 분할.
  3. 각 stage는 tasks로 분해 (partition 수 만큼).
  4. Stages를 순차적으로, task를 병렬로 실행.
Action: reduce()
Logical DAG:
  RDD1 → map → RDD2 → reduceByKey → RDD3 → collect
  
Stages:
  Stage 1: textFile → map (narrow, pipeline)
  [Shuffle]
  Stage 2: reduceByKey → collect

RDD의 한계

RDD는 훌륭했지만 몇 가지 약점:

  1. Schema 없음: 그냥 Java/Scala 객체의 컬렉션.
  2. Optimizer 없음: 사용자가 직접 최적화.
  3. 느린 직렬화: Java serialization.
  4. 메모리 오버헤드: JVM 객체의 overhead.

Spark 2.0은 이를 해결하기 위해 DataFrame/Dataset + Catalyst + Tungsten을 도입했다.


2. DataFrame & Dataset: 구조화 혁명

DataFrame의 등장

Spark 1.3 (2015)에서 DataFrame 도입. SQL의 테이블처럼 구조화된 RDD:

val df = spark.read.json("people.json")
df.show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Bob |
// | 25|Alice|
// +---+----+

df.filter($"age" > 20).select("name").show()

RDD와 차이:

  • Schema 있음: 컬럼과 타입 정보.
  • SQL처럼 조작: 고수준 API.
  • Optimizer 적용: Catalyst가 알아서 최적화.
  • Tungsten 메모리 포맷: JVM 객체 아닌 바이너리.

Dataset (Spark 1.6+)

Dataset: DataFrame + 타입 안전성.

case class Person(name: String, age: Int)

val ds = spark.read.json("people.json").as[Person]
ds.filter(_.age > 20).map(_.name)

장점:

  • 컴파일 타임 타입 체크.
  • Object API와 DataFrame 최적화 모두.

Scala/Java에만 존재. Python은 DataFrame만 사용 (Python은 정적 타입이 아니므로).

DataFrame = Dataset[Row]

사실 DataFrame은 Dataset[Row]의 alias. Row는 untyped.

// 같은 것
val df: DataFrame = spark.read.json(...)
val df: Dataset[Row] = spark.read.json(...)

성능 차이

단순 예시: 1억 행 필터링.

  • RDD: 100초.
  • DataFrame/Dataset: 15초.

6~7배 차이. 이유:

  1. Catalyst 최적화 (predicate pushdown 등).
  2. Tungsten 바이너리 포맷.
  3. Whole-stage codegen.

3. Catalyst Optimizer: 쿼리 재작성의 마법

Catalyst의 4단계

Catalyst는 SQL/DataFrame query를 4단계로 처리한다:

SQL or DataFrame
1. Analysis (unresolved plan → resolved logical plan)
2. Logical Optimization (rule-based)
3. Physical Planning (여러 물리 계획 생성 + 비용 비교)
4. Code Generation (Tungsten)
RDD (실행)

1단계: Analysis

입력: Unresolved logical plan.

SQL: SELECT name FROM users WHERE age > 20
     ↓ parser
UnresolvedRelation("users")
UnresolvedAttribute("age")
UnresolvedAttribute("name")

Catalog를 참조해서 resolve:

  • 테이블 이름 → 실제 데이터 소스.
  • 컬럼 이름 → 컬럼 타입.
  • 함수 → 실제 구현.

결과: Resolved Logical Plan.

2단계: Logical Optimization

Rule-based 최적화. 수십~수백 개의 rule을 반복 적용:

Constant Folding:

age * 2 + 3  (미리 계산)

Predicate Pushdown:

SELECT * FROM (table JOIN other) WHERE table.x > 10

      ↓ pushdown

SELECT * FROM (SELECT * FROM table WHERE x > 10) JOIN other

필터를 조인 전으로 밀어 넣음. 중간 결과 크기 감소.

Projection Pushdown:

SELECT name FROM users  
// users에 50개 컬럼 있어도 name만 읽음

Simplification:

WHERE true AND x > 10WHERE x > 10
WHERE false            →  never executes

Subquery Elimination:

SELECT * FROM t WHERE id IN (SELECT id FROM s)
SELECT * FROM t JOIN s ON t.id = s.id

Logical Plan 예시

val df = spark.sql("""
  SELECT u.name, COUNT(*) as orders
  FROM users u
  JOIN orders o ON u.id = o.user_id
  WHERE u.country = 'KR'
  GROUP BY u.name
""")

df.explain(true)

Parsed Logical Plan:

'Project ['u.name, 'COUNT(1) AS orders]
+- 'Filter ('u.country = KR)
   +- 'Join Inner, ('u.id = 'o.user_id)
      :- 'UnresolvedRelation `users`
      +- 'UnresolvedRelation `orders`

Analyzed Logical Plan:

Aggregate [name#10], [name#10, count(1) AS orders#20]
+- Filter (country#12 = KR)
   +- Join Inner, (id#9 = user_id#15)
      :- Relation[id#9,name#10,age#11,country#12] parquet
      +- Relation[user_id#15,total#16] parquet

Optimized Logical Plan:

Aggregate [name#10], [name#10, count(1) AS orders#20]
+- Project [name#10]
   +- Join Inner, (id#9 = user_id#15)
      :- Project [id#9, name#10]
         +- Filter (country#12 = KR)
            +- Relation[...] parquet
      +- Project [user_id#15]
         +- Relation[...] parquet

변화:

  • Filter가 users 바로 위로 이동.
  • Projection이 각 relation 바로 위로.
  • 불필요한 컬럼 제거.

3단계: Physical Planning

논리 계획을 실행 가능한 물리 연산으로 변환. Catalyst는 여러 물리 계획을 생성하고 비교.

Join 선택 예시:

  • BroadcastHashJoin: 작은 테이블이면.
  • SortMergeJoin: 큰 테이블들.
  • ShuffledHashJoin: 중간 크기.
  • BroadcastNestedLoop: cartesian이면.

Catalyst는 각 옵션의 cost를 추정하고 가장 저렴한 것 선택.

4단계: Code Generation

선택된 물리 계획을 Java 바이트코드로 변환. 다음 섹션에서 자세히.

Cost-Based Optimizer (CBO)

Spark 2.2+부터 통계 기반 최적화:

ANALYZE TABLE users COMPUTE STATISTICS FOR COLUMNS id, name, age

이후 Catalyst가:

  • Table 크기, 행 수.
  • 컬럼별 min/max/null count.
  • 컬럼별 distinct count.

이 정보로 join 순서 최적화, selectivity 추정 등.

기본 RBO (rule-based optimizer)와 조합되어 더 나은 계획 생성.


4. Tungsten: CPU와 메모리의 재발명

Tungsten 프로젝트

Spark 1.5 (2015)에 시작. "Spark 성능을 하드웨어 한계까지".

핵심 목표:

  1. JVM 메모리 오버헤드 제거.
  2. CPU 캐시 활용.
  3. Vectorized 실행.
  4. 코드 생성으로 CPU 파이프라인 최적화.

JVM 객체의 문제

Java 객체의 오버헤드:

Integer i = 42;

int 값은 4바이트지만, Integer 객체는:

  • Object header: ~16 바이트.
  • Value field: 4 바이트.
  • Padding: 4 바이트.
  • 총 24 바이트.

6배 오버헤드. 수백만 객체가 있으면 메모리 낭비 심각.

String 더 심함:

String s = "hello";
// Object: 40+ bytes
// char[] 객체: 24 bytes + 2 * 5 = 34 bytes
// 총 60+ bytes

해결: Off-heap Binary Format

Tungsten은 데이터를 JVM 힙 밖의 바이너리 포맷에 저장:

Row: [age=30, name="Alice"]

Binary encoding (off-heap):
[bitmap][age:4bytes][name_offset:4bytes][name_length:4bytes]...[name_data]
  • No JVM objects.
  • No GC pressure.
  • Cache-friendly layout.
  • 직접 메모리 접근: sun.misc.Unsafe.

UnsafeRow

Spark의 기본 row 표현:

public final class UnsafeRow extends MutableRow {
    private Object baseObject;  // byte[] or null (off-heap)
    private long baseOffset;
    private int sizeInBytes;
    private int numFields;
    // ...
}

각 필드 접근:

long getLong(int i) {
    return Platform.getLong(baseObject, baseOffset + offsets[i]);
}

Direct memory access. JVM 객체 생성 없음.

Cache Friendliness

Tungsten 포맷은 연속 메모리:

Row 0: [col0][col1][col2]
Row 1: [col0][col1][col2]
Row 2: [col0][col1][col2]

특정 컬럼만 읽으면:

col0: 0, 12, 24, ... (stride: row size)

Spatial locality 나쁨. 그러나 일반 접근 패턴에선 여전히 빠름.

Vectorized reader (Parquet 등)는 columnar 포맷으로 직접 읽음 → 더 캐시 친화적.

메모리 관리

Tungsten은 자체 메모리 매니저:

  • Execution memory: Shuffle, join, sort용.
  • Storage memory: Cache (persist).
  • 두 영역이 동적으로 서로 빌림.
  • Off-heap + on-heap 지원.

Spark 설정:

spark.executor.memory = 8g
spark.memory.fraction = 0.6  # execution + storage
spark.memory.storageFraction = 0.5  # storage의 최소 보장
spark.memory.offHeap.enabled = true
spark.memory.offHeap.size = 4g

성능 효과

Tungsten 전후 벤치마크:

  • 메모리 사용: 30~70% 감소.
  • 쿼리 속도: 2~5배 향상.
  • GC pause: 크게 감소.

특히 aggregation, join 등 메모리 집약적 작업에서 효과 큼.


5. Whole-Stage Code Generation

전통 Volcano 모델의 문제

전통 SQL 엔진은 Volcano 모델 (iterator-based):

while (child.hasNext()) {
    Row row = child.next();
    if (predicate(row)) {
        process(row);
    }
}

각 연산자가 next()를 호출. 함수 호출 오버헤드:

  • Virtual function calls: CPU 파이프라인 깨짐.
  • Object allocation: row 객체 계속 생성.
  • Cache misses: 매번 다른 코드 경로.

단순 쿼리도 연산자 수에 비례하는 오버헤드.

아이디어: 통합 코드 생성

"여러 연산자를 하나의 함수로 병합". 예:

SELECT name FROM users WHERE age > 20

Volcano:

ScanFilterProject

세 개의 iterator 객체.

Codegen:

// 런타임에 생성된 코드
while (scanner.hasNext()) {
    Row row = scanner.next();
    if (row.getInt("age") > 20) {
        emit(row.getString("name"));
    }
}

한 개의 루프. 함수 호출 없음. 컴파일러가 최적화.

Whole-Stage CodeGen

Spark 2.0 (2016)의 Whole-Stage Code Generation (WSCG):

  1. Stage 단위로 합병: Shuffle 경계까지 모두.
  2. Java 소스 코드 생성: 런타임에.
  3. Janino로 즉시 컴파일.
  4. 직접 실행: JVM이 JIT 최적화.

결과:

  • 함수 호출 감소 (virtual call → inlined).
  • 중간 객체 제거.
  • CPU pipeline 활용.
  • 2~10배 성능 향상.

예시: 생성된 코드

SELECT id, name FROM users WHERE age > 20 AND country = 'KR'

생성된 Java 코드 (간소화):

public class GeneratedIterator {
    private scan_input_0;  // Parquet reader
    
    public boolean hasNext() { return scan_input_0.hasNext(); }
    
    public UnsafeRow next() {
        while (scan_input_0.hasNext()) {
            UnsafeRow row = scan_input_0.next();
            int age = row.getInt(2);
            if (age > 20) {
                UTF8String country = row.getUTF8String(3);
                if (country.equals("KR")) {
                    // project id, name
                    UnsafeRow output = /* build row */;
                    return output;
                }
            }
        }
        return null;
    }
}

모든 것이 인라인. 루프 하나. JIT이 바이트코드를 네이티브로 최적화.

Vectorized Execution과의 차이

Whole-stage codegen은 row-at-a-time 기반이다. 반면 vectorized executionbatch-at-a-time:

while (batch = next_batch()) {  // 1024 rows
    for (row : batch) {
        // process
    }
}

ClickHouse, Snowflake, DuckDB가 vectorized.

Spark의 접근은 둘 다 활용:

  • Columnar scan: Parquet, ORC를 vectorized로 읽음.
  • Row-based processing: Codegen으로 각 row 처리.
  • Spark 3.2+: 일부 연산자에 vectorized 지원 추가.

EXPLAIN으로 확인

df.explain("codegen")

출력:

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
...

실제 생성된 Java 코드를 볼 수 있다. 디버깅에 유용.


6. Shuffle: 분산 집계의 핵심

왜 Shuffle이 필요한가

Wide dependency (groupBy, join 등)는 데이터 재분배가 필요하다:

Stage 1 (4 partitions):
  P0: [(a,1), (b,2), (c,3)]
  P1: [(a,4), (d,5)]
  P2: [(b,6), (c,7), (e,8)]
  P3: [(d,9), (a,10)]

reduceByKey:
  모든 'a'는 같은 partition으로
  모든 'b'는 같은 partition으로
  ...

Stage 2 (4 partitions after shuffle):
  P0: [(a, [1,4,10])][(a,15)]
  P1: [(b, [2,6])][(b,8)]
  P2: [(c, [3,7])][(c,10)]
  P3: [(d, [5,9]), (e, [8])][(d,14),(e,8)]

Shuffle은 stage 사이의 데이터 이동. 네트워크와 디스크 I/O 집약적.

Shuffle 과정

Write 단계 (Map 측):

  1. 각 task가 로컬 파일에 결과 쓰기.
  2. 각 reduce task에 해당하는 partition으로 분류.
  3. 디스크에 저장 (나중에 reduce가 fetch).

Read 단계 (Reduce 측):

  1. 모든 map task로부터 자기 partition 데이터 fetch.
  2. 네트워크로 전송 (executor 간).
  3. 메모리에 모음.
  4. 처리 (sort, aggregate 등).

Shuffle의 비용

Shuffle은 Spark의 가장 비싼 연산:

  • 디스크 I/O: write + read.
  • 네트워크: 모든 executor 간 cross.
  • Serialization: 객체 ↔ 바이트.
  • Sort: 보통 필요.

규모:

  • 작은 job: 수 MB.
  • 큰 job: 수 TB.

1 TB shuffle = 많은 시간과 자원.

Shuffle 전략의 진화

1. Hash Shuffle (초기 Spark):

  • 각 map task가 각 reduce partition별로 별도 파일.
  • M × R 파일 (M=map tasks, R=reduce tasks).
  • 100 × 200 = 20,000 파일.
  • 파일 시스템 부담.

2. Sort Shuffle (1.1+, 기본):

  • 각 map task가 하나의 파일에 정렬된 결과.
  • 인덱스 파일로 partition 위치 표시.
  • M 파일만.
  • 훨씬 효율적.

3. Tungsten Sort Shuffle (1.5+):

  • Tungsten의 binary format 활용.
  • 캐시 친화적.
  • 더 빠름.

Broadcast Join

작은 테이블을 조인할 때의 최적화:

val small = ... // 10 MB
val big = ...   // 1 TB

val joined = big.join(broadcast(small), "key")

Broadcast:

  1. Driver가 small 테이블을 모든 executor에 복사.
  2. 각 executor가 big의 자기 partition과 로컬에서 조인.
  3. Shuffle 없음.

조건: 작은 테이블이 메모리에 들어가야. 기본 spark.sql.autoBroadcastJoinThreshold = 10 MB.

Shuffle join 대비: 수 배~수십 배 빠름.

Shuffle Partitioning

spark.sql.shuffle.partitions: shuffle 후 partition 수. 기본 200.

너무 작음:

  • 큰 partition.
  • 일부 task의 OOM.
  • 병렬성 부족.

너무 큼:

  • 많은 작은 파일.
  • 스케줄링 오버헤드.

가이드: 각 partition이 100~200 MB가 되도록.

Shuffle 최적화 기법

1. 파일 수 줄이기:

df.coalesce(10).write.parquet(...)

작은 partition들을 병합.

2. Partitioning 조정:

df.repartition(50, $"key")  // key 기반 재분할

3. Broadcast 강제:

val hint_df = big.join(broadcast(small), ...)
// 또는 SQL hint
// SELECT /*+ BROADCAST(small) */ ...

4. Skew 처리: 다음 섹션.


7. Adaptive Query Execution (AQE)

Spark 3.0의 킬러 기능

Adaptive Query Execution (AQE): 런타임에 수집한 통계를 바탕으로 쿼리 계획을 동적으로 재최적화.

spark.sql.adaptive.enabled = true  # 3.2+부터 기본 true

주요 기능

1. Dynamically coalescing shuffle partitions:

전통적 문제:

  • spark.sql.shuffle.partitions = 200을 정해둠.
  • 작은 shuffle은 200 partition이 과도.
  • 큰 shuffle은 부족.

AQE:

  • Shuffle 후 실제 데이터 크기 확인.
  • 작은 partition을 자동 병합.
  • 예: 200 → 20 (각 partition이 충분한 크기).

2. Dynamically switching join strategies:

계획 시:

  • SortMergeJoin 결정 (두 테이블 크기 추정).

실행 중:

  • 실제 한 테이블이 예상보다 작은 것 판명.
  • 동적으로 BroadcastHashJoin으로 전환.

3. Dynamically optimizing skew joins:

Skew 감지:

  • 특정 key의 partition이 평균보다 훨씬 큼.

AQE:

  • 큰 partition을 여러 sub-partition으로 분할.
  • 각각 별도 task로 실행.
  • 결과를 union.

실측 효과

Databricks 벤치마크 (TPC-DS):

  • AQE 전: 100%.
  • AQE 후: 130% 이상.
  • 일부 쿼리: 3배+ 향상.

거의 공짜로 얻는 성능. AQE는 Spark 3.0의 가장 중요한 개선 중 하나.


8. Dynamic Partition Pruning

문제

Star schema의 fact와 dimension join:

SELECT f.amount
FROM fact_sales f JOIN dim_date d ON f.date_id = d.id
WHERE d.year = 2024

전통:

  1. fact_sales 전체 스캔 (테라바이트 급).
  2. dim_date 필터 (year=2024).
  3. Join.

fact_sales는 date_id로 partitioned되어 있어도 전체 스캔. Filter가 dim에만 있어서.

AQE + DPP

Spark 3.0의 Dynamic Partition Pruning:

  1. dim_date에서 year=2024인 id 먼저 수집.
  2. 그 id들을 fact_sales의 partition filter로 pushdown.
  3. fact_sales 스캔 시 해당 partition만 로드.

효과:

  • 수 TB → 수 GB.
  • 10~100배 빠른 쿼리.

조건

  • 파티션된 테이블.
  • Dimension 필터가 highly selective.
  • Spark 3.0+.

9. 실전 튜닝

메모리 설정

--conf spark.executor.memory=16g
--conf spark.executor.memoryOverhead=2g
--conf spark.executor.cores=4
--conf spark.executor.instances=10

원칙:

  • Executor당 cores 4~6: GC, I/O 균형.
  • Memory overhead = max(384MB, 10% of memory).
  • Instances: 클러스터 용량과 workload에 따라.

Shuffle 튜닝

--conf spark.sql.shuffle.partitions=200  # 기본, 조정
--conf spark.sql.adaptive.enabled=true
--conf spark.sql.adaptive.coalescePartitions.enabled=true
--conf spark.sql.adaptive.skewJoin.enabled=true

Partition 수 계산:

  • 총 데이터 / 목표 크기 (200 MB).
  • 예: 1 TB → 5000 partitions.

Broadcast 임계값

--conf spark.sql.autoBroadcastJoinThreshold=100MB

기본 10 MB는 너무 작을 수 있음. 100 MB까지 올리면 더 많은 broadcast join.

주의: Executor 메모리 확인. Broadcast 테이블이 모든 executor에 복제되므로.

Cache 전략

df.cache()  // MEMORY_AND_DISK (기본)
df.persist(StorageLevel.MEMORY_ONLY)
df.persist(StorageLevel.MEMORY_AND_DISK_SER)  // 직렬화 (작음)

언제 cache:

  • 여러 번 사용되는 DataFrame.
  • 계산 비용 큰 DataFrame.

주의: 메모리 부족하면 disk로 spill. 재계산이 더 나을 수 있음.

파티셔닝

Partitioned write:

df.write.partitionBy("year", "month").parquet(...)

파일을 year/month별 디렉토리로 저장. 쿼리 시 partition pruning으로 빠른 읽기.

주의: 너무 많은 partition은 작은 파일 문제.

Skew 수동 처리

AQE가 부족하면 수동:

// Salting
val salted = df.withColumn("salted_key", 
    concat($"key", lit("_"), (rand() * 10).cast("int")))

// Join 후 다시 그룹화

10. 성능 디버깅

Spark UI

Spark Web UI가 가장 중요한 도구:

  • Jobs: Action별 실행.
  • Stages: Stage별 task 분포.
  • SQL/DataFrame: 논리/물리 계획.
  • Executors: 리소스 사용.
  • Storage: Cache 상태.

주의할 Metric

Task duration 분포:

  • MedianMax 차이가 크면 skew.
  • 99th percentile이 나쁘면 병목.

Shuffle read/write:

  • 크기가 예상보다 크면 filter pushdown 실패.
  • 작으면 효율적.

GC time:

  • Task 시간의 10% 이상이면 GC 문제.
  • Memory 증설 또는 parallelism 증가.

Explain 명령어

df.explain()          // 물리 계획
df.explain(true)      // 4단계 모두
df.explain("cost")    // CBO 비용
df.explain("codegen") // 생성된 코드
df.explain("formatted") // 보기 쉬운 포맷

Spark SQL Shell

빠른 테스트:

spark-sql
> SELECT ...

대화형 쿼리와 EXPLAIN.


11. 흔한 함정과 교훈

함정 1: 너무 많은 작은 파일

증상: 작은 작업이 수 분 걸림.

원인: 수백만 개 작은 파일.

해결: coalesce 또는 repartition으로 병합.

함정 2: Skew

증상: 99% task는 빠르고 1%가 영원히.

원인: 한 key가 압도적으로 많음 (예: user_id = null).

해결:

  1. AQE의 skew join 활성화.
  2. Null key 필터.
  3. Salting.

함정 3: Broadcast OOM

증상: Executor OOM 발생.

원인: Broadcast 테이블이 너무 큼.

해결:

  • spark.sql.autoBroadcastJoinThreshold 낮춤.
  • 명시적 broadcast() 제거.
  • Sort-merge join으로.

함정 4: 너무 많은 Shuffle Partition

증상: 작업이 느리고 많은 작은 task.

원인: spark.sql.shuffle.partitions 너무 큼.

해결: AQE coalesce 활성화, 또는 수동 조정.

함정 5: Python UDF의 오버헤드

증상: PySpark 작업이 극도로 느림.

원인: Python UDF는 JVM ↔ Python 직렬화 왕복.

해결:

  • SQL 내장 함수 사용.
  • Pandas UDF (vectorized, Apache Arrow 사용).
@pandas_udf("double")
def my_udf(s: pd.Series) -> pd.Series:
    return s * 2

일반 UDF보다 10배 이상 빠름.

함정 6: 잘못된 Partition 수

증상: Under-utilization 또는 OOM.

원인:

  • 너무 적음: 큰 task, OOM.
  • 너무 많음: 오버헤드.

해결: 각 partition 100~200 MB 원칙.


퀴즈로 복습하기

Q1. RDD의 "lineage"가 어떻게 장애 복구를 가능하게 하는가?

A.

Lineage의 의미: RDD는 데이터 자체가 아닌 **"어떻게 만들어졌는지의 기록"**을 유지한다.

val rdd1 = sc.textFile("hdfs://data.txt")      // source
val rdd2 = rdd1.map(_.toUpperCase)              // parent: rdd1
val rdd3 = rdd2.filter(_.startsWith("A"))       // parent: rdd2
val rdd4 = rdd3.flatMap(_.split(","))           // parent: rdd3

각 RDD는 부모 RDD와 변환 함수를 기억한다. 이 관계를 이어가면 원천 데이터 소스부터 현재까지의 전체 계보가 된다.

장애 복구 원리:

시나리오: rdd4.collect() 실행 중 executor 하나가 죽음. 그 executor가 처리하던 rdd4의 partition 3이 사라짐.

복구 과정:

  1. 손실 감지: Driver가 task 실패 감지.
  2. Lineage 추적: rdd4 partition 3 → rdd3 partition 3 → rdd2 partition 3 → rdd1 partition 3.
  3. 재계산: 다른 executor에서 원천부터 다시 계산.
    • HDFS에서 원본 텍스트 partition 3 읽기.
    • map 적용.
    • filter 적용.
    • flatMap 적용.
  4. 완료: 손실된 partition 복구.

Key: Partition 단위 재계산. 전체 RDD가 아닌 손실된 partition만. 다른 partition들은 영향 없음.

Replication과의 비교:

Traditional fault tolerance (replication):

  • 모든 데이터를 여러 노드에 복제.
  • 네트워크 I/O 많음 (복제 시).
  • 디스크 사용 2~3배.
  • 장애 시 즉시 다른 복제본 사용.

RDD lineage:

  • Replication 없음. 데이터 한 번만.
  • 네트워크/디스크 절약.
  • 장애 시 재계산 (시간 걸림).
  • 메모리 기반 연산과 궁합 좋음.

왜 Spark의 선택은 lineage인가:

Spark는 반복 연산(ML, 그래프)을 타겟으로 했다. 이런 워크로드는:

  • 데이터가 메모리에 있어 빠름.
  • Lineage 재계산도 (메모리니까) 빠름.
  • Replication 비용이 반복적 쓰기로 누적.

Lineage의 한계:

1. Lineage가 길면 재계산이 오래:

rdd1 → rdd2 → rdd3 → ... → rdd100 → action

rdd99 partition 손실 시, rdd1부터 재계산. 수 분~시간 가능.

해결: Checkpoint:

rdd50.checkpoint()
  • rdd50을 디스크에 저장.
  • Lineage를 이 지점에서 "잘라냄".
  • 이후 복구는 rdd50부터.

2. Wide dependency의 문제:

rdd1 (4 partitions) --shuffle--> rdd2 (4 partitions)

rdd2 partition 3 손실 → rdd1의 모든 partition 필요 (shuffle이라 재분배). 비용이 큼.

해결: Shuffle 결과를 영구 저장. Spark가 기본으로 함 (shuffle files가 로컬 디스크에 남음).

3. 외부 시스템 부작용:

rdd.foreach(x => saveToDatabase(x))

재계산 시 중복 저장 위험. Lineage는 순수 함수 가정.

해결: Idempotent 연산, 또는 정확히 한 번 semantic을 별도 구현.

교훈:

RDD lineage는 **"replication 없이 내결함성"**을 가능하게 했다. 이것이 Spark가 MapReduce 대비 100배 빠를 수 있었던 기반이다. 메모리 연산 + 경량 복구 = 빅데이터의 새 패러다임.

현대 적용:

  • RDD lineage → DataFrame, Dataset 아래에서도 유지.
  • Checkpoint → 장기 실행 job에서 중요.
  • Structured Streaming에서도 비슷한 아이디어 (log-based recovery).

분산 시스템 철학적 관점:

Lineage는 "상태를 저장하지 말고, 만드는 방법을 저장하라" 는 접근. 이는 함수형 프로그래밍의 철학과 일치한다:

  • Immutable: RDD는 변하지 않음.
  • Referential transparency: 같은 입력 → 같은 출력 (재계산 가능).
  • Composition: RDD들의 체인.

Spark는 분산 컴퓨팅에 함수형 원칙을 적용해서 새 영역을 열었다. 그 단순하고 우아한 아이디어가 10년 넘게 이 분야를 이끌고 있다.

Q2. Catalyst Optimizer가 어떻게 쿼리를 "rewrite"하여 성능을 개선하는가?

A.

Catalyst의 핵심 아이디어: SQL/DataFrame 쿼리를 tree 로 표현하고, 수십~수백 개의 rewrite rule을 반복 적용해서 더 효율적인 tree로 변환한다.

4단계 처리 과정:

1. Parsing → Unresolved Logical Plan

SELECT name FROM users WHERE age > 20

Project [name]
  Filter (age > 20)
    UnresolvedRelation("users")

2. Analysis → Resolved Logical Plan

Catalog 참조로:

  • users → 실제 테이블 스키마.
  • name, age → 컬럼 참조.
  • 타입 검증.
Project [name#10]
  Filter (age#11 > 20)
    Relation[name#10, age#11, email#12] parquet

3. Logical Optimization → Optimized Logical Plan

이 단계가 진짜 마법. 많은 rule이 반복 적용된다:

Rule 1: Projection Pushdown

Project [name#10]
  Filter (age#11 > 20)
    Relation[name#10, age#11, email#12]  ← email 불필요

Project [name#10]
  Filter (age#11 > 20)
    Project [name#10, age#11]  ← 필요한 컬럼만
      Relation[name, age, email]

이점: Parquet reader가 name, age 컬럼만 읽음. email 스킵. I/O 절약.

Rule 2: Filter Pushdown to Data Source

Parquet/ORC는 predicate pushdown을 지원한다:

Filter (age > 20)
  Relation(parquet)

Relation(parquet) WITH FILTER (age > 20)

Parquet reader가 min/max 통계를 보고 row group 전체를 스킵. 수십 배 빠른 읽기.

Rule 3: Constant Folding

WHERE age > 18 + 2

WHERE age > 20

매 행마다 18+2를 계산하지 않음.

Rule 4: Simplification

WHERE true AND x > 10

WHERE x > 10
WHERE x > 10 OR false

WHERE x > 10

Rule 5: Join Reorder

SELECT * FROM a JOIN b ON a.id = b.id JOIN c ON b.cid = c.id
WHERE a.country = 'KR'

CBO (Cost-Based Optimizer)가 통계를 보고 최적 순서 결정:

  • a를 먼저 필터 (country='KR').
  • 작아진 ab 조인.
  • 결과와 c 조인.

Catalyst는 지수 개의 조인 순서를 탐색하고 가장 싼 것 선택.

Rule 6: Subquery Unnesting

SELECT * FROM orders
WHERE user_id IN (SELECT id FROM users WHERE country = 'KR')

SELECT o.* FROM orders o
INNER JOIN (SELECT id FROM users WHERE country = 'KR') u
ON o.user_id = u.id

Semi-join으로 변환. 훨씬 빠름.

Rule 7: Column Pruning Through Join

SELECT a.name FROM a JOIN b ON a.id = b.aid

b에서는 aid만 필요 (조인 키). 다른 컬럼 스킵.

Rule 8: Decimal 연산 최적화

SELECT SUM(price * quantity) FROM sales

Decimal 곱셈은 비쌈. Catalyst는 overflow를 고려해서 필요한 정밀도만 사용.

4. Physical Planning

논리 계획 → 실행 연산자:

Project → org.apache.spark.sql.execution.ProjectExec
FilterFilterExec
JoinSortMergeJoinExec (또는 BroadcastHashJoinExec)
AggregateHashAggregateExec

여러 물리 계획을 생성하고 cost를 비교해서 선택.

예: Join 전략 선택

SELECT * FROM big JOIN small ON big.id = small.id

Catalyst는 다음을 고려:

  • small 크기 < spark.sql.autoBroadcastJoinThreshold (기본 10MB)?
    • Yes: BroadcastHashJoin 선택 (small을 모든 executor에 복사, shuffle 없음).
    • No: SortMergeJoin 선택 (양쪽 shuffle).

Broadcast join은 수 배~수십 배 빠르다.

실전 효과:

간단한 쿼리:

SELECT u.name, SUM(o.amount)
FROM users u JOIN orders o ON u.id = o.user_id
WHERE u.country = 'KR'
GROUP BY u.name

최적화 전 (naive 실행):

  1. users 전체 스캔.
  2. orders 전체 스캔.
  3. 전체 join.
  4. filter (country='KR').
  5. group by.

1 TB orders 스캔. 수십 분.

최적화 후 (Catalyst):

  1. users에 filter pushdown (country='KR' → 10% 데이터만).
  2. Projection pushdown (id, name만 읽음).
  3. orders도 필요 컬럼만 읽음 (user_id, amount).
  4. Broadcast join (users가 충분히 작아서).
  5. group by.

I/O 90% 절약. 10배 빠른 쿼리.

Catalyst의 확장성:

Catalyst의 rule은 사용자 정의 가능:

object MyRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
    case Filter(p, child) => /* custom logic */
  }
}

spark.experimental.extraOptimizations = Seq(MyRule)

이 확장성이 Databricks의 Delta Lake, Iceberg 같은 프로젝트의 기반.

교훈:

Catalyst는 **"규칙 기반 트리 변환"**이라는 단순한 아이디어로 거대한 성과를 냈다. 각 rule은 단순하지만 조합되면 놀라운 최적화.

전통 DB (PostgreSQL, MySQL)도 쿼리 optimizer가 있지만 Catalyst의 강점:

  1. Scala tree API로 쉬운 rule 작성.
  2. Extensible: 외부 rule 추가.
  3. DataFrame API와 통합: SQL이 아닌 코드도 최적화.

결과: 기본 SQL을 작성해도 손으로 최적화한 것과 비슷한 성능. 이것이 Spark가 데이터 엔지니어에게 사랑받는 이유다.

Q3. Tungsten이 JVM 객체 대신 binary format을 쓰는 이유와 그 효과는?

A.

JVM 객체의 숨은 비용:

평범한 Java 코드:

Integer age = 30;
String name = "Alice";

메모리 사용을 보면:

  • Integer: 16 bytes header + 4 bytes value + 4 bytes padding = 24 bytes (4 bytes 데이터에)
  • String "Alice":
    • String 객체: ~40 bytes
    • 내부 char[]: 16 bytes header + 10 bytes (5 chars) + padding = ~32 bytes
    • 총: ~72 bytes (5 chars 데이터에)

실제 데이터 (int 4 + 5 chars = 9 bytes)가 96 bytes를 차지. 10배 오버헤드.

빅데이터에선 이것이 재앙:

1억 row에 5개 필드가 있다면:

  • 실제 데이터: ~5 GB.
  • JVM 객체로: ~50 GB.

메모리 부족. GC pressure. 캐시 miss.

Tungsten의 해결:

Row를 연속된 바이트 배열에 저장:

Row(age=30, name="Alice", email="a@b.c")

Tungsten format:
[null_bitmap:8B][age:8B][name_ptr:8B][email_ptr:8B][name:"Alice"][email:"a@b.c"]
  • 고정 크기 필드 (int, long, double): 인라인.
  • 가변 크기 필드 (string, array): pointer + 뒷부분에 실제 데이터.
  • Null bitmap: 각 필드의 null 여부.
  • Header 없음: 순수 데이터만.

결과:

  • 5개 필드 row: ~40 bytes (JVM 객체 ~200 bytes에서).
  • 5배 메모리 절약.

UnsafeRow:

Spark의 구현 클래스:

public class UnsafeRow extends BaseMutableRow {
    private Object baseObject;   // byte[] 또는 null (off-heap)
    private long baseOffset;      // 시작 위치
    private int numFields;
    private int sizeInBytes;
}

직접 메모리 접근 (sun.misc.Unsafe):

public long getLong(int ordinal) {
    assertIndexIsValid(ordinal);
    return Platform.getLong(baseObject, baseOffset + ordinal * 8L);
}

JVM을 거치지 않고 포인터 산술로 바로 메모리 접근. Unsafe는 이름대로 안전장치를 건너뛴다. JVM이 검증 안 함.

효과 1: 메모리 절약

1억 row, 10 필드 DataFrame:

  • JVM object: ~30 GB.
  • Tungsten: ~5 GB.

6배 적은 메모리. 같은 클러스터에서 6배 큰 데이터 처리 가능.

효과 2: GC pressure 제거

JVM GC는 많은 작은 객체에서 느려진다. Young generation을 계속 순회해야.

Tungsten은 적은 수의 큰 배열만 사용:

  • Off-heap: GC 대상 아예 아님.
  • On-heap: GC 하더라도 객체 수가 적음.

결과: Full GC 빈도 대폭 감소. Pause time 대폭 감소.

효과 3: Cache friendliness

CPU cache는 연속된 메모리를 선호 (spatial locality). 각 cache line은 64 bytes.

JVM 객체:

Integer 1 @ address 0x1000
Integer 2 @ address 0x8000  ← 멀리 떨어짐 (heap 할당 랜덤)
Integer 3 @ address 0x4000

하나 읽을 때마다 cache miss 가능성. 수십 ~ 수백 cycles 낭비.

Tungsten:

[field1][field2][field3][field4]... 연속

한 번의 메모리 접근이 여러 필드를 cache에 로드. 다음 접근은 hit.

성능: 2~5배 향상 (메모리 집약적 워크로드).

효과 4: Direct serialization

UnsafeRow는 이미 byte array. 네트워크로 전송 시:

  • Java 객체: Kryo/Java serialization 필요 (느림).
  • UnsafeRow: 그대로 전송 (zero serialization).

Shuffle이 훨씬 빠름.

효과 5: Vectorized 처리의 기반

연속 메모리는 SIMD (벡터화 명령어) 친화적:

// 일반
for (int i = 0; i < n; i++) {
    result[i] = a[i] + b[i];
}

// SIMD (4개 동시 처리)
for (int i = 0; i < n; i += 4) {
    _mm_store_ps(result + i, _mm_add_ps(_mm_load_ps(a + i), _mm_load_ps(b + i)));
}

Tungsten의 연속 메모리가 SIMD의 기반. JVM은 자체적으로 SIMD를 잘 활용하지 못하지만, 데이터 레이아웃이 친화적이면 JIT이 인식.

비용과 제약:

복잡도: Tungsten은 매우 낮은 레벨이다. 버그가 있으면 JVM crash. Spark 팀이 조심스럽게 개발.

디버깅 어려움: JVM heap dump, profiler가 덜 유용.

특정 연산의 오버헤드: 가변 크기 필드(string) 수정이 복잡.

Python 호환: PySpark에서 UnsafeRow를 Python 객체로 변환할 때 비용.

Arrow와의 협력:

최근 Spark는 Apache Arrow와 통합:

  • Arrow: 메모리 columnar 포맷.
  • Python ↔ JVM 교환 시 Arrow 사용.
  • Pandas UDF가 Arrow 덕분에 10배 빠름.

Tungsten과 Arrow는 서로 보완. Tungsten은 row-wise 처리, Arrow는 column-wise 교환.

Spark 3.x의 Vectorized Execution:

Spark 3.0+는 일부 연산자에 columnar 처리 도입:

  • Parquet/ORC vectorized reader.
  • Columnar aggregation.
  • 4096 rows batch 단위.

Tungsten row format에서 columnar batch로 진화. DuckDB, ClickHouse의 방식을 따라감.

교훈:

Tungsten은 Spark의 **"underlying 재발명"**이다. 표면 API는 그대로지만 내부가 완전히 바뀌었다. 사용자는 변경 없이 2~10배 성능 향상.

이는 **"하드웨어를 존중하는 설계"**의 중요성을 보여준다:

  • CPU cache가 싫어하는 것 (random access) 피하기.
  • GC의 약점 (작은 객체) 회피.
  • Memory bandwidth 아끼기.

JVM이 "추상화"를 제공하지만, 고성능이 필요하면 그 추상화를 영리하게 우회해야 한다. Tungsten은 "Java/Scala 속에서 C 수준 성능을 낸다"는 도전의 답이다.

모든 빅데이터 엔진이 결국 같은 방향으로 진화: 바이너리 포맷, 연속 메모리, 벡터화. Tungsten은 Spark의 해답이었고, 같은 아이디어가 Flink, Presto, Snowflake에서도 발견된다. 수렴 진화의 예다.

Q4. Whole-Stage Code Generation이 어떻게 기존 Volcano 모델보다 빠른가?

A.

전통 Volcano 모델:

1970년대부터 쓰인 query execution model. 각 연산자는 iterator:

interface QueryOperator {
    Row next();
    boolean hasNext();
}

예시: SELECT name FROM users WHERE age > 20

ProjectFilterScan

실행:

Project.next() {
    while (true) {
        Row row = Filter.next();
        if (row == null) return null;
        return new Row(row.getString("name"));
    }
}

Filter.next() {
    while (true) {
        Row row = Scan.next();
        if (row == null) return null;
        if (row.getInt("age") > 20) return row;
    }
}

Scan.next() {
    return disk.readNextRow();
}

각 연산자가 독립 객체. next()를 계속 호출.

Volcano의 비용:

  1. Virtual function calls: next()는 인터페이스 메소드. JVM은 virtual dispatch 사용 → inline 어려움.

  2. Object allocation: 매 row마다 새 Row 객체 생성 가능성. GC 부담.

  3. Branch prediction 실패: 연산자마다 다른 코드 경로 실행. CPU pipeline 예측 실패.

  4. Cache miss: 매 연산자의 코드가 다른 메모리 위치.

  5. Boxing: Java의 경우 Integer 같은 wrapper 사용 → heap 할당.

결과: 단순 쿼리도 Volcano 오버헤드만으로 2-5배 느림.

Whole-Stage Code Generation의 답:

아이디어: "여러 연산자를 하나의 함수로 병합". 런타임에 Java 코드를 생성하고 컴파일.

예시: 같은 쿼리

Volcano (의사 코드):

for each row from Scan:
    call Filter.next(row)
    call Project.next(filter_output)
    output project_output

3개 iterator, 수많은 함수 호출.

WSCG가 생성하는 코드:

public class GeneratedIterator {
    public Object[] next() {
        while (scanner.hasNext()) {
            InternalRow row = scanner.next();
            int age = row.getInt(2);   // age 컬럼
            
            if (age > 20) {  // filter inlined
                UTF8String name = row.getUTF8String(1);  // name 컬럼
                return new Object[]{ name };  // project inlined
            }
            // 조건 불만족, 다음 row로
        }
        return null;  // 끝
    }
}

핵심 차이:

  1. 단일 함수: Scan, Filter, Project가 한 함수의 내부. 함수 호출 없음.

  2. Inline: 컴파일러(JVM JIT)가 모든 것을 인라인. Virtual call 제거.

  3. 직접 메모리 접근: row.getInt(2)는 Tungsten의 Unsafe.getInt(address).

  4. 단순한 loop: 하나의 while 루프. CPU pipeline 친화적.

성능 향상:

Databricks 벤치마크 (TPC-DS):

  • Volcano: 100%.
  • WSCG: 400% ~ 1000%.

4-10배 향상. 마법이 아니라 오버헤드 제거.

어떻게 작동하는가:

  1. Spark의 최적화 단계:

    • Logical plan → Physical plan.
    • 여러 연산자를 stage로 그룹화 (narrow dependency 내).
    • 각 stage가 WSCG 대상.
  2. Code generation:

    • Physical plan 트리를 순회.
    • 각 연산자가 자기 코드 조각 제공.
    • 합쳐서 Java source code 생성.
  3. Compilation:

    • Janino (가벼운 Java compiler)로 컴파일.
    • 바이트코드 생성.
    • ClassLoader에 로드.
  4. Execution:

    • 컴파일된 클래스의 인스턴스 생성.
    • 일반 Java 메소드처럼 호출.
    • JVM JIT가 최적화 (hot path → native code).

실전 예시:

spark.range(1, 1000000)
     .filter($"id" > 500000)
     .select($"id" * 2)
     .count()

EXPLAIN 결과:

*(1) Filter (id#0L > 500000)
+- *(1) Range (1, 1000000, step=1, splits=...)

*(1)이 WSCG stage 번호. Range, Filter, Project가 하나의 코드로 통합.

explain("codegen")으로 실제 생성된 코드 확인 가능:

public Object generate(Object[] references) {
    return new GeneratedIteratorForCodegenStage1(references);
}

final class GeneratedIteratorForCodegenStage1 extends BufferedRowIterator {
    // ... 수백 줄의 생성된 코드
}

제약:

  1. 64KB 메소드 한계: JVM의 단일 메소드가 64KB 초과 불가. 너무 많은 연산자 병합 시 분할.

  2. 복잡한 연산자: Window function, UDF 등은 codegen 어려움. Fall back to Volcano.

  3. 디버깅 어려움: 생성된 코드를 디버거로 추적 힘듦.

  4. 컴파일 오버헤드: 매 쿼리마다 컴파일. 작은 쿼리엔 역효과 가능.

WSCG vs Vectorized:

Spark의 WSCG는 row-at-a-time + inline. 또 다른 접근:

Vectorized Execution (ClickHouse, DuckDB, Presto 일부):

  • Batch 단위 (1024 rows).
  • SIMD 명령 활용.
  • Columnar 처리.

장점: SIMD의 힘. 단점: 구현 복잡.

Spark는 두 접근을 조합:

  • Parquet reader: vectorized (columnar).
  • 연산자: WSCG (row-wise).
  • 향후: 더 많은 vectorization 도입 중.

Hyper / Umbra 비교:

독일 TU Munich의 HyPer/Umbra 연구 시스템은 더 급진적:

  • LLVM IR로 컴파일.
  • 네이티브 코드 (JIT 없음).
  • 성능이 C++ 손 작성 코드 수준.

Spark WSCG는 Java 바이트코드까지. JVM JIT에 의존. Umbra만큼은 아니지만 훨씬 간단하고 이식성 있다.

Rough numbers:

  • Volcano: baseline.
  • WSCG: 4-10x faster.
  • Vectorized: 4-10x faster.
  • LLVM-based: 10-20x faster.

Spark는 "충분히 빠르면서 구현 가능"의 균형을 찾았다.

교훈:

WSCG는 전통 지혜의 재평가다. Volcano 모델이 40년간 지배한 이유는 우아함과 유연성이지만, 현대 CPU 아키텍처 (deep pipeline, cache, SIMD)에선 적합하지 않다.

해결: "연산자 추상화를 유지하면서도 런타임에 구체화". 이것이 codegen의 본질.

이는 "지연된 optimization" 철학이다:

  • 사용자는 high-level 코드 작성.
  • 시스템이 물리적 최적화.
  • 컴파일러의 전통적 역할을 런타임으로.

JIT 컴파일러가 수십 년간 해온 일이지만, 빅데이터에 적용한 것은 Spark가 처음 크게 성공시켰다. 이후 거의 모든 현대 데이터 엔진이 이 패턴을 따르고 있다.

현대 SQL 엔진의 성능은 하드웨어 성능의 몇 % 를 사용하는가의 경쟁이다. Volcano는 10%, WSCG는 40%, vectorized는 60%, LLVM-based는 80%. 각 진화가 이 수치를 밀어올린다.

Q5. Adaptive Query Execution (AQE)이 실제 실행 통계를 어떻게 활용하는가?

A.

Static optimization의 한계:

전통 쿼리 optimizer (Catalyst 포함)는 실행 전에 최적화를 결정한다. 근거는:

  • 통계 (ANALYZE TABLE).
  • 데이터 소스 메타데이터.
  • Schema.

문제: 통계가 부정확하거나 최신이 아닐 수 있다:

  1. 데이터가 변경됨 (ANALYZE 이후).
  2. 통계 계산이 비쌈 (대용량).
  3. 복잡한 조건의 selectivity 추정 어려움.
  4. Join 결과 크기 예측 어려움.

결과: 잘못된 계획 선택 → 느린 쿼리.

AQE의 기본 아이디어:

"Stage 경계에서 실제 통계를 수집하고, 남은 계획을 재최적화"

Spark는 이미 shuffle 단계에서 중간 결과가 디스크에 저장된다. 이 데이터의 실제 크기를 측정해서 남은 단계를 최적화할 수 있다.

AQE의 세 가지 최적화:

1. Dynamically Coalescing Shuffle Partitions

문제: spark.sql.shuffle.partitions 기본값 200. 실제 데이터 크기와 무관.

시나리오:

  • 작은 table filter 결과: 100 MB.
  • 200 partition = 각 500 KB.
  • 200개의 매우 작은 task. 오버헤드 비율 높음.

AQE 해결:

  • Shuffle 완료 후 실제 partition 크기 측정.
  • 작은 partition들을 병합.
  • 예: 200 → 20 (각 ~5 MB).
  • Task 수 감소, 오버헤드 감소, 성능 향상.

설정:

spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.coalescePartitions.minPartitionSize=1MB
spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB

2. Dynamically Switching Join Strategies

문제: Join 전략 선택 시 table 크기 추정이 부정확.

시나리오:

SELECT * FROM big JOIN medium ON big.id = medium.id
WHERE medium.status = 'active'

계획 시 추정: medium 크기 = 100 MB → SortMergeJoin 선택.

실제 실행: 필터 후 medium.active = 5 MB. Broadcast가 훨씬 나았을 것.

AQE 해결:

  • Medium 필터 결과를 shuffle.
  • 실제 크기가 5 MB임을 확인.
  • BroadcastHashJoin으로 동적 전환.
  • 수 배 빠름.

예시:

Before AQE: 120 seconds (SortMergeJoin)
After AQE:  25 seconds (auto-switched to BroadcastHashJoin)

설정:

spark.sql.adaptive.autoBroadcastJoinThreshold=10MB

3. Dynamically Optimizing Skew Joins

문제: Data skew. 일부 key가 과도하게 많음.

시나리오:

SELECT * FROM orders JOIN users ON orders.user_id = users.id
  • 대부분 user는 10~100 orders.
  • 한 user (봇, 관리자)가 1,000,000 orders.
  • 해당 partition이 100배 크다.
  • 그 task가 100배 오래 걸림.
  • 나머지 tasks는 빨리 끝나지만 skewed task 기다림.

전통적 해결: Salting (수동, 복잡).

AQE 해결:

  1. Shuffle 후 각 partition 크기 확인.
  2. 평균의 5배 이상 크면 skewed로 판단.
  3. Skewed partition을 여러 sub-partition으로 분할.
  4. 각 sub-partition이 다른 task에서 처리.
  5. 반대쪽 partition도 복제 (broadcast 필요).

예시:

Before:
Task 1: 1 GB (skewed user)100 seconds
Task 2: 10 MB1 second
Task 3: 10 MB1 second
...
Total time: 100 seconds (Task 1 bound)
AQE divides Task 1 into 10 sub-tasks:
Task 1a: 100 MB10 seconds
Task 1b: 100 MB10 seconds
...
Task 2: 10 MB1 second
Total time: 10 seconds (10x faster)

설정:

spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB

4. Dynamic Partition Pruning (DPP) (AQE와 함께 발전)

Star schema의 fact + dimension join:

SELECT f.amount
FROM fact_sales f JOIN dim_date d ON f.date_id = d.id
WHERE d.year = 2024

전통: fact_sales 전체 스캔 + join + filter.

DPP:

  1. dim_date에서 year=2024인 id 먼저 추출.
  2. 이 id들을 fact_sales의 partition filter로 pushdown.
  3. fact_sales 스캔 시 해당 partition만 로드.

효과:

  • 파티션된 fact_sales (year별)이면 1 TB → 50 GB.
  • 20배 빠른 쿼리.

실측 효과:

Databricks TPC-DS 벤치마크:

  • AQE 비활성: 100% (baseline).
  • AQE 활성: 130~200%.
  • 일부 쿼리: 3배+ 향상.
  • 평균: 1.8배.

거의 공짜. 기존 코드 변경 없이.

Spark 버전별:

  • Spark 3.0: AQE 도입 (기본 비활성).
  • Spark 3.2: AQE 기본 활성화.
  • Spark 3.3+: 개선 지속.

내부 작동:

AQE는 Spark의 query execution을 stage 단위로 실행하되, stage 사이에 pause한다:

  1. Stage 1 실행.
  2. Stage 1 완료 → shuffle 파일 생성.
  3. Shuffle 파일의 실제 통계 수집.
  4. Stage 2의 physical plan을 re-optimize.
  5. Stage 2 실행.
  6. 반복.

stage 단위 통계 → 재최적화 루프가 AQE의 핵심.

한계와 함정:

1. Streaming에 제한적: Structured Streaming은 AQE 덜 적용됨.

2. 작은 쿼리엔 불필요: Re-optimization 오버헤드가 작은 쿼리에선 역효과.

3. 복잡한 쿼리 예측 어려움: 매우 복잡한 쿼리는 여전히 수동 튜닝 필요.

4. DPP와의 상호작용: 복잡한 경우 예측 안 되는 계획 변경.

실전 권장:

# 기본 설정 (Spark 3.2+는 이미 기본)
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.skewJoin.enabled=true

# 파라미터 조정
spark.sql.adaptive.advisoryPartitionSizeInBytes=128MB  # 목표 partition 크기
spark.sql.adaptive.coalescePartitions.minPartitionSize=32MB  # 최소
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5  # skew 기준

교훈:

AQE는 "optimization은 한 번에 끝나지 않는다" 는 철학의 구현이다. 전통 optimizer는 ahead-of-time (실행 전 모든 결정), AQE는 just-in-time (필요 시 재최적화).

이는 JIT 컴파일러의 철학과 같다:

  • 처음엔 inexpert 결정.
  • 런타임 관찰.
  • 최적화.

분산 데이터 처리에서 같은 원칙 적용. Spark가 선구자였고, 이후 Trino, Presto도 유사 기능 추가.

**"정적 vs 동적 최적화"**의 트레이드오프:

  • 정적: 단순, 예측 가능. 완전 최적은 아닐 수 있음.
  • 동적: 복잡, 오버헤드. 실제 데이터에 적응.

AQE는 이 둘 사이의 실용적 타협이다. 완벽한 동적 재계획은 아니지만, stage 경계에서 핵심 결정을 조정함으로써 대부분의 이득을 얻는다.

미래:

AQE는 계속 발전 중:

  • Multi-stage re-optimization.
  • Cost model 개선.
  • ML 기반 최적화 (실험적).
  • Streaming adaptation.

Spark가 10년 넘게 데이터 처리 영역을 선도하는 이유는 이런 지속적 혁신 때문이다. "충분히 좋다"에 만족하지 않고, 매 버전마다 의미 있는 개선을 가져온다. AQE는 그 한 예이며, 앞으로도 더 나아질 것이다.


마치며: 데이터의 정교한 엔진

핵심 정리

  1. RDD: 메모리 기반 분산 컬렉션. Lineage로 장애 복구.
  2. DataFrame: 구조화된 API. Schema + optimizer.
  3. Catalyst: Rule + Cost 기반 SQL 최적화.
  4. Tungsten: Off-heap binary format. CPU 캐시 친화.
  5. WSCG: 런타임 Java 코드 생성. Volcano 오버헤드 제거.
  6. Shuffle: 분산의 핵심이자 병목. Sort shuffle 기본.
  7. AQE: 런타임 통계로 재최적화. 30-200% 향상.

Spark가 우리에게 준 것

Spark는 빅데이터 처리의 민주화다. MapReduce 시대엔:

  • Java로 map/reduce 작성.
  • 복잡한 최적화 수동.
  • 느린 반복.

Spark 이후:

  • SQL 또는 간단한 DataFrame API.
  • 자동 최적화 (Catalyst + Tungsten + AQE).
  • 빠른 반복.
  • 배치 + 스트림 + ML 통합.

진입 장벽이 낮아지면서 더 많은 사람이 데이터를 다룰 수 있게 되었다. 데이터 분석가도 SQL로 복잡한 분산 쿼리 실행. 데이터 과학자도 머신러닝 파이프라인을 PySpark로.

실전 권장

  • 기본 설정 이해: spark.executor.memory, spark.sql.shuffle.partitions, AQE.
  • Broadcast join 활용: 작은 dimension 테이블.
  • Partition column 선택: 쿼리 패턴에 맞게.
  • EXPLAIN 읽기: 느린 쿼리의 첫 단계.
  • Spark UI 활용: Task 분포, GC time 모니터.
  • 적절한 file format: Parquet/ORC 선호.

마지막 교훈

Spark의 성공은 기술적 우수성 + 타이밍 + 생태계의 합이다:

  1. 기술: RDD, Catalyst, Tungsten의 혁신.
  2. 타이밍: 메모리 가격 하락 + 빅데이터 수요 폭발.
  3. 생태계: Python/R 지원, ML, 스트리밍, SQL 모두 통합.

이 세 가지 중 하나라도 빠졌으면 Spark는 지금 위치에 없을 것이다. 기술은 필요하지만 충분하지 않다는 교훈이다.

당신이 다음에 Spark job을 실행할 때, 아래에서 벌어지는 일을 기억하자:

  • Catalyst가 수십 개의 rule로 쿼리 재작성.
  • Tungsten이 바이너리 포맷으로 메모리 절약.
  • WSCG가 Java 코드 생성 + 컴파일.
  • AQE가 실행 중 통계로 재최적화.
  • Shuffle이 데이터 재분배.

이 모든 것이 수 ms~수 초 안에 일어난다. 그리고 당신은 단지 .filter().groupBy().count()만 썼다. 이것이 현대 데이터 엔지니어링의 마법이다.


참고 자료

현재 단락 (1/1216)

2010년 UC Berkeley의 AMPLab에서 **Matei Zaharia**가 논문을 발표했다: "**Resilient Distributed Datasets: A Fault-...

작성 글자: 0원문 글자: 34,947작성 단락: 0/1216