
데이터 엔지니어링 파이프라인 완전 가이드 2025: ETL/ELT, Spark, Airflow, 실시간 스트리밍

1. 데이터 엔지니어링 개요

데이터 엔지니어의 역할

데이터 엔지니어는 조직의 데이터 인프라를 설계, 구축, 유지보수하는 역할을 담당합니다. 데이터 과학자가 분석할 수 있도록 데이터를 수집, 변환, 저장하는 파이프라인을 만드는 것이 핵심 업무입니다.

데이터 엔지니어의 핵심 역할
├── 데이터 수집 (Ingestion)
│   ├── API, DB, 파일, 스트림에서 데이터 추출
│   └── 다양한 소스를 통합
├── 데이터 변환 (Transformation)
│   ├── 정제, 정규화, 집계
│   └── 비즈니스 로직 적용
├── 데이터 저장 (Storage)
│   ├── 데이터 웨어하우스 설계
│   └── 데이터 레이크 구축
├── 파이프라인 오케스트레이션
│   ├── 워크플로 자동화
│   └── 스케줄링 및 모니터링
└── 데이터 품질 관리
    ├── 데이터 검증
    └── 데이터 계약 (Data Contract)

필수 기술 스택

프로그래밍       : Python, SQL, Scala/Java
데이터 처리      : Spark, Flink, Beam
오케스트레이션   : Airflow, Dagster, Prefect
스트리밍         : Kafka, Kinesis, Pub/Sub
변환 도구        : dbt, Dataform
클라우드          : AWS(Redshift, Glue), GCP(BigQuery), Azure(Synapse)
컨테이너         : Docker, Kubernetes
IaC              : Terraform, Pulumi
모니터링         : Datadog, Grafana, Monte Carlo

2. ETL vs ELT

전통적 ETL

ETL은 Extract(추출), Transform(변환), Load(적재)의 약자로, 데이터를 소스에서 추출한 후 변환 서버에서 가공하여 최종 저장소에 적재합니다.

ETL 흐름:
소스 DB ──Extract──→ 변환 서버 ──Transform──→ Load──→ 데이터 웨어하우스
                      (ETL 서버)
# 전통적 ETL 예시 (Python)
import pandas as pd
from sqlalchemy import create_engine

# Extract: 소스 DB에서 데이터 추출
source_engine = create_engine('postgresql://source_db:5432/sales')
raw_data = pd.read_sql('SELECT * FROM orders WHERE date >= %s', source_engine, params=['2025-01-01'])

# Transform: 데이터 변환
transformed = raw_data.copy()
transformed['total_with_tax'] = transformed['total'] * 1.1
transformed['order_month'] = pd.to_datetime(transformed['order_date']).dt.to_period('M')
transformed = transformed.dropna(subset=['customer_id'])
transformed = transformed[transformed['total'] > 0]

# Load: 웨어하우스에 적재
wh_engine = create_engine('postgresql://warehouse:5432/analytics')
transformed.to_sql('fact_orders', wh_engine, if_exists='append', index=False)

현대적 ELT

ELT는 Extract(추출), Load(적재), Transform(변환)의 순서로, 원본 데이터를 먼저 웨어하우스에 적재한 후 웨어하우스 내에서 변환합니다.

ELT 흐름:
소스 DB ──Extract──→ Load──→ 데이터 웨어하우스 ──Transform──→ 분석용 테이블
                                 (dbt 등으로 변환)
-- ELT: 웨어하우스 내에서 dbt로 변환
-- models/marts/fact_orders.sql
WITH source_orders AS (
    SELECT * FROM raw.orders
    WHERE order_date >= '2025-01-01'
),
cleaned AS (
    SELECT
        order_id,
        customer_id,
        total,
        total * 1.1 AS total_with_tax,
        DATE_TRUNC('month', order_date) AS order_month,
        order_date
    FROM source_orders
    WHERE customer_id IS NOT NULL
      AND total > 0
)
SELECT * FROM cleaned

ETL vs ELT 비교표

| 항목 | ETL | ELT |
|------|-----|-----|
| 변환 위치 | 별도 서버 | 웨어하우스 내부 |
| 확장성 | ETL 서버 성능에 의존 | 웨어하우스 컴퓨팅 활용 |
| 원본 데이터 | 변환 후 원본 유실 가능 | 원본 보존 |
| 비용 | ETL 서버 운영 비용 | 웨어하우스 컴퓨팅 비용 |
| 유연성 | 변환 로직 변경 시 재처리 | SQL로 유연하게 재변환 |
| 대표 도구 | Informatica, Talend | dbt, Dataform |
| 적합한 경우 | 레거시 시스템, 규제 요건 | 클라우드 네이티브, 빅데이터 |

3. 배치 vs 스트림 처리

배치 처리 (Batch Processing)

일정 주기로 대량의 데이터를 한꺼번에 처리하는 방식입니다.

배치 처리:
[데이터 축적] ──→ [일괄 처리] ──→ [결과 저장]
  (1시간/1일)      (Spark)      (웨어하우스)

특징:
- 높은 처리량 (Throughput)
- 지연 시간이 큼 (분~시간)
- 비용 효율적
- 재처리 용이

스트림 처리 (Stream Processing)

데이터가 도착하는 즉시 실시간으로 처리하는 방식입니다.

스트림 처리:
[이벤트 발생] ──→ [즉시 처리] ──→ [실시간 결과]
  (연속적)        (Flink)      (대시보드/알림)

특징:
- 낮은 지연 시간 (밀리초~초)
- 연속 처리
- 복잡한 장애 처리
- 이벤트 순서 관리 필요

선택 기준

배치를 선택하는 경우:
  - 일/주/월 단위 보고서
  - 대규모 데이터 집계
  - 비용 최적화 우선
  - 실시간 필요 없음

스트림을 선택하는 경우:
  - 실시간 대시보드
  - 사기 탐지
  - 실시간 추천
  - IoT 센서 데이터
  - 알림/알럿

하이브리드 (Lambda/Kappa):
  - 배치 + 스트림 동시 운영
  - 실시간 근사치 + 배치 정확 결과
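위 선택 기준의 핵심인 배치(축적 후 일괄 집계)와 스트림(도착 즉시 윈도우 집계)의 차이를 순수 Python으로 표현한 최소 스케치입니다. 이벤트 형식 `(타임스탬프초, 값)`과 60초 텀블링 윈도우는 설명을 위한 가정입니다.

```python
# 배치 vs 스트림 처리의 차이를 보여주는 최소 스케치 (이벤트 형식은 가정)
from collections import defaultdict

def batch_sum(events):
    """배치: 축적된 전체 이벤트를 한 번에 집계"""
    return sum(v for _, v in events)

def tumbling_window_sums(events, window_sec=60):
    """스트림: 이벤트가 도착하는 순서대로 60초 텀블링 윈도우별 집계"""
    windows = defaultdict(int)
    for ts, value in events:               # 이벤트 도착 즉시 처리
        window_start = ts - (ts % window_sec)
        windows[window_start] += value
    return dict(windows)

events = [(0, 10), (30, 5), (65, 7), (130, 3)]
print(batch_sum(events))             # 25: 전체 합 (높은 처리량, 높은 지연)
print(tumbling_window_sums(events))  # {0: 15, 60: 7, 120: 3}: 윈도우별 실시간 결과
```

같은 데이터라도 배치는 전체를 한 번에, 스트림은 윈도우 단위의 부분 결과를 연속적으로 만들어 낸다는 점이 핵심입니다.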

4. Apache Spark

4.1 Spark 개요

Apache Spark는 대규모 데이터 처리를 위한 통합 분석 엔진입니다. 인메모리 연산으로 MapReduce 대비 최대 100배 빠른 성능을 제공합니다.

Spark 아키텍처:
┌─────────────────────────────────┐
│        Spark Application        │
├─────────────────────────────────┤
│ SparkSQL │ Streaming │  MLlib   │
├─────────────────────────────────┤
│       DataFrame / Dataset       │
├─────────────────────────────────┤
│        RDD (Core Engine)        │
├─────────────────────────────────┤
│ Standalone │ YARN │ Mesos │ K8s │
└─────────────────────────────────┘

4.2 PySpark 기본

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, when, lit

# SparkSession 생성
spark = SparkSession.builder \
    .appName("DataPipeline") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# 데이터 읽기
orders_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("s3://data-lake/raw/orders/")

users_df = spark.read.parquet("s3://data-lake/raw/users/")

# 데이터 변환
result = orders_df \
    .filter(col("status") == "completed") \
    .join(users_df, orders_df.user_id == users_df.id, "inner") \
    .groupBy("user_id", "username") \
    .agg(
        count("order_id").alias("total_orders"),
        sum("amount").alias("total_spent"),
        avg("amount").alias("avg_order_value")
    ) \
    .filter(col("total_orders") > 5)

# 결과 저장
result.write \
    .mode("overwrite") \
    .partitionBy("order_month") \
    .parquet("s3://data-lake/processed/user_summary/")

4.3 SparkSQL

# 임시 뷰 등록
orders_df.createOrReplaceTempView("orders")
users_df.createOrReplaceTempView("users")

# SQL로 분석
monthly_revenue = spark.sql("""
    SELECT
        DATE_TRUNC('month', order_date) AS month,
        COUNT(DISTINCT user_id) AS unique_customers,
        COUNT(*) AS total_orders,
        SUM(amount) AS revenue,
        AVG(amount) AS avg_order_value
    FROM orders
    WHERE status = 'completed'
    GROUP BY DATE_TRUNC('month', order_date)
    ORDER BY month
""")

monthly_revenue.show()

4.4 파티셔닝과 캐싱

# 파티셔닝 최적화
# 파티션 수 확인
print(f"Partitions: {orders_df.rdd.getNumPartitions()}")

# 재파티셔닝 (셔플 발생)
orders_repartitioned = orders_df.repartition(100, "order_date")

# 병합 (셔플 없이 파티션 수 줄이기)
orders_coalesced = orders_df.coalesce(50)

# 캐싱
from pyspark.storagelevel import StorageLevel

# 메모리 캐싱
orders_df.cache()
orders_df.count()  # 캐시 트리거

# 메모리 + 디스크 캐싱
users_df.persist(StorageLevel.MEMORY_AND_DISK)

# 캐시 해제
orders_df.unpersist()

4.5 Spark 성능 튜닝

# 브로드캐스트 조인 (작은 테이블)
from pyspark.sql.functions import broadcast

# 작은 테이블을 브로드캐스트
result = orders_df.join(
    broadcast(users_df),
    orders_df.user_id == users_df.id
)

# AQE (Adaptive Query Execution) 활성화
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
Spark 성능 최적화 체크리스트:
  [ ] 적절한 파티션 수 설정 (코어 수의 2~3배)
  [ ] 데이터 스큐 처리 (salting, AQE)
  [ ] 브로드캐스트 조인 활용
  [ ] 불필요한 셔플 최소화
  [ ] 컬럼 프루닝 (필요한 컬럼만 선택)
  [ ] Predicate Pushdown 활용
  [ ] 캐싱 전략 수립
  [ ] 직렬화 포맷 최적화 (Parquet, ORC)
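체크리스트의 salting은 스큐된(특정 키에 몰린) 조인 키에 난수 접미사를 붙여 파티션을 분산시키는 기법입니다. 아래는 Spark 없이 아이디어만 순수 Python으로 표현한 스케치로, 키 이름과 salt 개수(4)는 가정입니다. 실제 Spark에서는 salt 컬럼을 추가하고 작은 쪽 테이블을 salt 수만큼 복제합니다.

```python
# salting 아이디어 스케치: 핫 키를 여러 파티션으로 분산 (데이터는 가상의 예시)
import random

NUM_SALTS = 4

def salt_key(key):
    """큰 테이블 쪽: 핫 키를 key_0 ~ key_3 으로 무작위 분산"""
    return f"{key}_{random.randrange(NUM_SALTS)}"

def explode_small_side(key):
    """작은 테이블 쪽: 모든 salt 값으로 복제해야 조인 매칭이 유지됨"""
    return [f"{key}_{i}" for i in range(NUM_SALTS)]

random.seed(0)
big_side = [salt_key("hot_user") for _ in range(8)]    # 스큐된 조인 키들
small_side = set(explode_small_side("hot_user"))        # 복제된 조인 상대
assert all(k in small_side for k in big_side)           # 모든 행이 여전히 조인됨
print(sorted(small_side))
```

salt가 붙은 키는 서로 다른 파티션으로 해시되므로, 한 파티션에 몰리던 작업이 NUM_SALTS개로 나뉩니다.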

5. Apache Airflow

5.1 Airflow 개요

Apache Airflow는 워크플로를 프로그래밍 방식으로 작성, 스케줄링, 모니터링하기 위한 플랫폼입니다. DAG(방향 비순환 그래프)로 태스크 간의 의존 관계를 정의합니다.

Airflow 아키텍처:
┌────────────────────────────────────────┐
│ Web Server (UI / REST API)             │
├────────────────────────────────────────┤
│ Scheduler (DAG 파싱, 태스크 스케줄링)   │
├────────────────────────────────────────┤
│ Executor (Local / Celery / Kubernetes) │
├────────────────────────────────────────┤
│ Metadata Database (PostgreSQL / MySQL) │
└────────────────────────────────────────┘

5.2 DAG 작성

# dags/etl_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.sensors.filesystem import FileSensor

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data-team@company.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),
}

with DAG(
    dag_id='daily_etl_pipeline',
    default_args=default_args,
    description='일별 ETL 파이프라인',
    schedule_interval='0 6 * * *',  # 매일 오전 6시
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['etl', 'daily'],
    max_active_runs=1,
) as dag:

    # 센서: 파일 도착 대기
    wait_for_data = FileSensor(
        task_id='wait_for_data',
        filepath='/data/raw/daily_export_{{ ds }}.csv',  # extract와 같은 날짜별 파일 (templated)
        poke_interval=300,  # 5분마다 확인
        timeout=3600,       # 최대 1시간 대기
        mode='poke',
    )

    # 추출
    def extract_data(**context):
        import pandas as pd
        execution_date = context['ds']
        df = pd.read_csv(f'/data/raw/daily_export_{execution_date}.csv')
        df.to_parquet(f'/data/staging/extract_{execution_date}.parquet')
        return len(df)

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    # 변환
    def transform_data(**context):
        import pandas as pd
        execution_date = context['ds']
        df = pd.read_parquet(f'/data/staging/extract_{execution_date}.parquet')

        # 데이터 정제
        df = df.dropna(subset=['customer_id', 'amount'])
        df = df[df['amount'] > 0]
        df['amount_with_tax'] = df['amount'] * 1.1

        df.to_parquet(f'/data/staging/transform_{execution_date}.parquet')
        return len(df)

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )

    # 적재
    load = PostgresOperator(
        task_id='load',
        postgres_conn_id='warehouse',
        sql='sql/load_daily_orders.sql',
    )

    # 데이터 품질 검증
    def validate_data(**context):
        execution_date = context['ds']
        # 레코드 수 검증, NULL 체크, 범위 검증
        row_count = context['ti'].xcom_pull(task_ids='transform')
        if row_count < 100:
            raise ValueError(f'레코드 수 부족: {row_count}')

    validate = PythonOperator(
        task_id='validate',
        python_callable=validate_data,
    )

    # 알림
    notify = BashOperator(
        task_id='notify',
        bash_command='echo "ETL 완료: {{ ds }}" | mail -s "ETL Success" team@company.com',
    )

    # 의존 관계 정의
    wait_for_data >> extract >> transform >> load >> validate >> notify

5.3 TaskFlow API (Airflow 2.x)

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['etl'],
)
def modern_etl_pipeline():

    @task()
    def extract():
        """소스에서 데이터 추출"""
        import pandas as pd
        df = pd.read_csv('/data/raw/orders.csv')
        return df.to_dict()

    @task()
    def transform(raw_data: dict):
        """데이터 변환 및 정제"""
        import pandas as pd
        df = pd.DataFrame(raw_data)
        df = df[df['amount'] > 0]
        df['processed_at'] = datetime.now().isoformat()
        return df.to_dict()

    @task()
    def load(transformed_data: dict):
        """웨어하우스에 적재"""
        import pandas as pd
        df = pd.DataFrame(transformed_data)
        # 웨어하우스에 적재 로직
        print(f"Loaded {len(df)} records")

    # 자동 의존 관계 설정
    raw = extract()
    transformed = transform(raw)
    load(transformed)

# DAG 인스턴스화
modern_etl_pipeline()

5.4 Connections, Variables, XCom

# Connections: UI 또는 CLI로 설정
# airflow connections add 'warehouse' \
#   --conn-type 'postgres' \
#   --conn-host 'warehouse.example.com' \
#   --conn-port 5432 \
#   --conn-login 'etl_user' \
#   --conn-password 'secret'

# Variables
from airflow.models import Variable

env = Variable.get("environment", default_var="dev")
config = Variable.get("pipeline_config", deserialize_json=True)

# XCom: 태스크 간 데이터 전달
def producer_task(**context):
    context['ti'].xcom_push(key='row_count', value=1000)

def consumer_task(**context):
    row_count = context['ti'].xcom_pull(
        task_ids='producer',
        key='row_count'
    )
    print(f"이전 태스크 결과: {row_count}행")

5.5 동적 DAG 생성

# dags/dynamic_dag_factory.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# 설정 기반 동적 DAG 생성
configs = [
    {"name": "sales", "source": "mysql", "schedule": "@daily"},
    {"name": "users", "source": "postgres", "schedule": "@hourly"},
    {"name": "logs", "source": "s3", "schedule": "0 */6 * * *"},
]

def create_etl_dag(config):
    dag = DAG(
        dag_id=f"etl_{config['name']}",
        schedule_interval=config['schedule'],
        start_date=datetime(2025, 1, 1),
        catchup=False,
    )

    def process(**kwargs):
        print(f"Processing {config['name']} from {config['source']}")

    with dag:
        PythonOperator(
            task_id='process',
            python_callable=process,
        )

    return dag

for config in configs:
    globals()[f"etl_{config['name']}"] = create_etl_dag(config)

6. 실시간 스트리밍

6.1 Apache Kafka

Kafka는 분산 이벤트 스트리밍 플랫폼으로, 대규모 실시간 데이터 파이프라인의 핵심입니다.

Kafka 아키텍처:
Producer ──→ Broker(Topic/Partition) ──→ Consumer Group
                  ├── Partition 0: [msg1, msg2, msg3...]
                  ├── Partition 1: [msg4, msg5, msg6...]
                  └── Partition 2: [msg7, msg8, msg9...]
# Kafka Producer (Python)
from confluent_kafka import Producer
import json

config = {
    'bootstrap.servers': 'kafka-broker:9092',
    'acks': 'all',
    'retries': 3,
    'linger.ms': 10,
    'batch.size': 16384,
}

producer = Producer(config)

def delivery_callback(err, msg):
    if err:
        print(f'전송 실패: {err}')
    else:
        print(f'전송 완료: {msg.topic()} [{msg.partition()}]')

# 메시지 전송
for i in range(100):
    event = {
        'user_id': f'user_{i}',
        'action': 'page_view',
        'timestamp': '2025-03-25T10:00:00Z',
        'page': '/products'
    }
    producer.produce(
        topic='user-events',
        key=str(event['user_id']),
        value=json.dumps(event),
        callback=delivery_callback
    )

producer.flush()
# Kafka Consumer (Python)
from confluent_kafka import Consumer
import json

config = {
    'bootstrap.servers': 'kafka-broker:9092',
    'group.id': 'analytics-consumer',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
}

consumer = Consumer(config)
consumer.subscribe(['user-events'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'Consumer error: {msg.error()}')
            continue

        event = json.loads(msg.value().decode('utf-8'))
        print(f"수신: {event['user_id']} - {event['action']}")

        # 수동 커밋
        consumer.commit(asynchronous=False)
finally:
    consumer.close()

6.2 Apache Flink

Flink는 상태 기반 스트림 처리 엔진으로, 정확히 한 번(exactly-once) 시맨틱스를 제공합니다.

# PyFlink 스트림 처리
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
env.enable_checkpointing(60000)  # 60초마다 체크포인트

t_env = StreamTableEnvironment.create(env)

# Kafka 소스 테이블 정의
t_env.execute_sql("""
    CREATE TABLE user_events (
        user_id STRING,
        action STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'flink-processor',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    )
""")

# 윈도우 집계
t_env.execute_sql("""
    CREATE TABLE page_view_stats (
        window_start TIMESTAMP(3),
        window_end TIMESTAMP(3),
        page STRING,
        view_count BIGINT,
        unique_users BIGINT
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://analytics-db:5432/stats',
        'table-name' = 'page_view_stats',
        'driver' = 'org.postgresql.Driver'
    )
""")

t_env.execute_sql("""
    INSERT INTO page_view_stats
    SELECT
        window_start,
        window_end,
        action AS page,
        COUNT(*) AS view_count,
        COUNT(DISTINCT user_id) AS unique_users
    FROM TABLE(
        TUMBLE(TABLE user_events, DESCRIPTOR(event_time), INTERVAL '5' MINUTE)
    )
    GROUP BY window_start, window_end, action
""")

6.3 Exactly-Once 시맨틱스

전달 보장 수준:
  At-most-once  : 메시지 유실 가능, 중복 없음
  At-least-once : 메시지 유실 없음, 중복 가능
  Exactly-once  : 메시지 유실 없음, 중복 없음 (가장 어려움)

Kafka + Flink Exactly-Once:
  1. Kafka 트랜잭셔널 프로듀서
  2. Flink 체크포인트 (Chandy-Lamport)
  3. Two-Phase Commit Protocol
  4. Kafka Consumer 오프셋을 체크포인트와 연동
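Exactly-once 구현이 어려울 때 실무에서 자주 쓰는 대안은, at-least-once 전달에 멱등(idempotent) 처리를 결합해 사실상 exactly-once 효과를 얻는 것입니다. 아래는 그 패턴을 Kafka 클라이언트 없이 순수 Python으로 표현한 스케치입니다(메시지 형식과 저장소는 가정).

```python
# at-least-once 전달 + 멱등 처리로 중복 반영을 막는 스케치 (메시지 형식은 가정)
processed_ids = set()   # 실제로는 DB의 유니크 제약/트랜잭션과 함께 관리
results = []

def handle(message):
    """같은 메시지가 재전달되어도 결과에 한 번만 반영"""
    if message["id"] in processed_ids:
        return False                 # 이미 처리됨: 중복 무시
    results.append(message["value"])
    processed_ids.add(message["id"])
    return True

# 브로커가 메시지를 중복 재전달한 상황 (at-least-once)
stream = [{"id": 1, "value": 10}, {"id": 2, "value": 20}, {"id": 1, "value": 10}]
for msg in stream:
    handle(msg)
print(results)   # [10, 20]: 중복이 결과에 반영되지 않음
```

핵심은 메시지 ID 같은 멱등 키를 처리 결과와 같은 트랜잭션 범위에서 기록하는 것입니다.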

7. dbt (data build tool)

7.1 dbt 개요

dbt는 ELT에서 T(Transform)를 담당하는 도구입니다. SQL로 데이터 변환 로직을 작성하고, 소프트웨어 엔지니어링 베스트 프랙티스(버전 관리, 테스트, 문서화)를 데이터 변환에 적용합니다.

dbt 프로젝트 구조:
my_dbt_project/
├── dbt_project.yml
├── profiles.yml
├── models/
│   ├── staging/
│   │   ├── stg_orders.sql
│   │   ├── stg_customers.sql
│   │   └── _staging_sources.yml
│   ├── intermediate/
│   │   └── int_order_items_grouped.sql
│   └── marts/
│       ├── dim_customers.sql
│       ├── fact_orders.sql
│       └── _marts_schema.yml
├── tests/
│   └── assert_positive_revenue.sql
├── macros/
│   └── generate_schema_name.sql
└── seeds/
    └── country_codes.csv

7.2 모델 작성

-- models/staging/stg_orders.sql
WITH source AS (
    SELECT * FROM {{ source('raw', 'orders') }}
),
renamed AS (
    SELECT
        id AS order_id,
        user_id AS customer_id,
        amount AS order_amount,
        status AS order_status,
        created_at AS ordered_at
    FROM source
    WHERE status != 'cancelled'
)
SELECT * FROM renamed
-- models/marts/fact_orders.sql
{{ config(materialized='incremental', unique_key='order_id') }}

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),
customers AS (
    SELECT * FROM {{ ref('dim_customers') }}
)
SELECT
    o.order_id,
    o.customer_id,
    c.customer_name,
    c.customer_segment,
    o.order_amount,
    o.order_amount * 1.1 AS amount_with_tax,
    o.order_status,
    o.ordered_at
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id

{% if is_incremental() %}
WHERE o.ordered_at > (SELECT MAX(ordered_at) FROM {{ this }})
{% endif %}

7.3 소스와 테스트

# models/staging/_staging_sources.yml
version: 2

sources:
  - name: raw
    database: raw_db
    schema: public
    tables:
      - name: orders
        loaded_at_field: _loaded_at
        freshness:
          warn_after:
            count: 12
            period: hour
          error_after:
            count: 24
            period: hour
        columns:
          - name: id
            tests:
              - unique
              - not_null
          - name: amount
            tests:
              - not_null

      - name: customers
        columns:
          - name: id
            tests:
              - unique
              - not_null
# models/marts/_marts_schema.yml
version: 2

models:
  - name: fact_orders
    description: "주문 팩트 테이블"
    columns:
      - name: order_id
        description: "주문 고유 ID"
        tests:
          - unique
          - not_null
      - name: order_amount
        tests:
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
-- tests/assert_positive_revenue.sql
-- 커스텀 테스트: 매출이 양수인지 검증
SELECT order_id, order_amount
FROM {{ ref('fact_orders') }}
WHERE order_amount < 0

7.4 dbt 커맨드

# 모든 모델 빌드
dbt run

# 특정 모델만 빌드
dbt run --select fact_orders

# 모델 + 다운스트림 빌드
dbt run --select stg_orders+

# 테스트 실행
dbt test

# 문서 생성
dbt docs generate
dbt docs serve

# 소스 신선도 확인
dbt source freshness

# 시드 데이터 로드
dbt seed

# 전체 파이프라인 (빌드 + 테스트)
dbt build

8. 데이터 웨어하우스

비교표

| 항목 | BigQuery | Snowflake | Redshift |
|------|----------|-----------|----------|
| 벤더 | Google Cloud | Snowflake | AWS |
| 아키텍처 | 서버리스 | 컴퓨팅/스토리지 분리 | MPP 클러스터 |
| 과금 | 쿼리당 (온디맨드) | 크레딧 기반 | 노드 시간당 |
| 확장성 | 자동 | 웨어하우스 리사이징 | 노드 추가 |
| 동시성 | 2000+ 슬롯 | 멀티클러스터 | WLM 설정 |
| 반정형 데이터 | STRUCT, ARRAY | VARIANT | SUPER |
| ML 통합 | BigQuery ML | Snowpark | Redshift ML |
| 비용 효율 | 소규모에 유리 | 중규모에 유리 | 대규모 상시 운영에 유리 |

BigQuery 예시

-- BigQuery: 파티션 + 클러스터링
CREATE TABLE analytics.fact_orders
PARTITION BY DATE(ordered_at)
CLUSTER BY customer_segment, order_status
AS
SELECT
    order_id,
    customer_id,
    customer_segment,
    order_amount,
    order_status,
    ordered_at
FROM staging.orders;

-- 비용 예측 (dry run)
-- 1TB 스캔 = 약 $5 (온디맨드)

Snowflake 예시

-- Snowflake: 웨어하우스 생성 및 관리
CREATE WAREHOUSE etl_wh
    WITH WAREHOUSE_SIZE = 'MEDIUM'
    AUTO_SUSPEND = 300
    AUTO_RESUME = TRUE
    MIN_CLUSTER_COUNT = 1
    MAX_CLUSTER_COUNT = 3;

-- 데이터 로드
COPY INTO raw.orders
FROM @my_s3_stage/orders/
FILE_FORMAT = (TYPE = 'PARQUET')
PATTERN = '.*[.]parquet';

9. 데이터 레이크 / 레이크하우스

테이블 포맷 비교

전통적 데이터 레이크 문제점:
  - ACID 트랜잭션 없음
  - 스키마 강제 없음
  - 시간 여행(Time Travel) 불가
  - 작은 파일 문제

레이크하우스 테이블 포맷이 해결:
  Delta Lake  : Databricks 주도, Spark 통합 최강
  Apache Iceberg : Netflix 개발, 벤더 중립
  Apache Hudi : Uber 개발, 증분 처리 특화

| 기능 | Delta Lake | Apache Iceberg | Apache Hudi |
|------|------------|----------------|-------------|
| ACID 트랜잭션 | O | O | O |
| 스키마 진화 | O | O | O |
| 시간 여행 | O | O | O |
| 파티션 진화 | 제한적 | O (숨은 파티셔닝) | 제한적 |
| 엔진 호환 | Spark 위주 | Spark, Flink, Trino | Spark, Flink |
| 주요 플랫폼 | Databricks | 다수 벤더 채택 | AWS 중심 |

# Delta Lake 예시 (PySpark)
from delta.tables import DeltaTable

# Delta 테이블 생성
orders_df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("order_date") \
    .save("s3://data-lake/delta/orders")

# UPSERT (Merge)
delta_table = DeltaTable.forPath(spark, "s3://data-lake/delta/orders")

delta_table.alias("target").merge(
    new_orders_df.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

# 시간 여행
old_data = spark.read \
    .format("delta") \
    .option("versionAsOf", 5) \
    .load("s3://data-lake/delta/orders")

10. 데이터 품질

10.1 Great Expectations

import great_expectations as gx

context = gx.get_context()

# 데이터 소스 연결
datasource = context.sources.add_pandas("my_datasource")
data_asset = datasource.add_csv_asset("orders", filepath_or_buffer="orders.csv")

# Expectation Suite 정의
suite = context.add_expectation_suite("orders_validation")

# 기대치 정의
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount", min_value=0, max_value=100000
    )
)

# 검증 실행
results = context.run_checkpoint(
    checkpoint_name="orders_checkpoint"
)
print(f"성공: {results.success}")

10.2 데이터 계약 (Data Contracts)

# data-contracts/orders-contract.yaml
dataContractSpecification: 0.9.3
id: orders-contract
info:
  title: Orders Data Contract
  version: 1.0.0
  owner: data-team
  contact:
    email: data-team@company.com

schema:
  type: object
  properties:
    order_id:
      type: string
      description: "고유 주문 ID"
      required: true
      unique: true
    customer_id:
      type: string
      required: true
    amount:
      type: number
      minimum: 0
      maximum: 100000
    status:
      type: string
      enum: ["pending", "completed", "cancelled"]
    created_at:
      type: timestamp
      required: true

quality:
  completeness:
    - field: order_id
      threshold: 100
    - field: customer_id
      threshold: 99.9
  freshness:
    maxDelay: "PT1H"  # 1시간 이내
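위 계약의 규칙 일부(필수 필드, amount 범위, status enum)를 코드로 검증하면 다음과 같은 형태가 됩니다. 계약 YAML을 직접 파싱하지 않고 규칙을 손으로 옮긴 순수 Python 스케치이며, 레코드 예시는 가상의 데이터입니다.

```python
# 데이터 계약 규칙(필수 필드, 범위, enum)을 검증하는 스케치 (규칙은 위 YAML에서 수동 전사)
def validate_order(record):
    errors = []
    for field in ("order_id", "customer_id", "created_at"):   # required: true 필드
        if not record.get(field):
            errors.append(f"{field} is required")
    amount = record.get("amount")
    if amount is not None and not (0 <= amount <= 100000):    # minimum/maximum
        errors.append("amount out of range")
    if record.get("status") not in ("pending", "completed", "cancelled"):  # enum
        errors.append("invalid status")
    return errors

good = {"order_id": "o1", "customer_id": "c1", "amount": 100,
        "status": "completed", "created_at": "2025-03-25T10:00:00Z"}
bad = {"order_id": "o2", "customer_id": "", "amount": -5,
       "status": "unknown", "created_at": "2025-03-25T10:00:00Z"}
print(validate_order(good))   # []
print(validate_order(bad))    # 위반 3건
```

이런 검증을 파이프라인 초입(수집 직후)에 두면 계약 위반 데이터가 다운스트림으로 퍼지기 전에 차단할 수 있습니다.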

11. 오케스트레이션 비교

Airflow vs Dagster vs Prefect vs Mage

| 항목 | Airflow | Dagster | Prefect | Mage |
|------|---------|---------|---------|------|
| 접근 방식 | DAG 중심 | 자산(Asset) 중심 | 플로우 중심 | 블록 중심 |
| 학습 곡선 | 높음 | 중간 | 낮음 | 낮음 |
| 로컬 개발 | 복잡 | 우수 | 우수 | 우수 |
| 테스트 | 어려움 | 내장 지원 | 좋음 | 좋음 |
| UI | 기능적 | 모던 | 모던 | 모던 |
| 커뮤니티 | 매우 큼 | 성장 중 | 성장 중 | 소규모 |
| 프로덕션 실적 | 매우 많음 | 늘어나는 중 | 늘어나는 중 | 초기 단계 |
| 클라우드 | MWAA, Composer | Dagster Cloud | Prefect Cloud | Mage Pro |

선택 가이드:
├── 대규모 기업, 복잡한 워크플로 → Airflow
├── 데이터 자산 중심 사고 → Dagster
├── 빠른 시작, Python 네이티브 → Prefect
└── 노코드/로우코드 선호 → Mage

12. 모던 데이터 스택 다이어그램

모던 데이터 스택 (2025):

데이터 소스                수집/통합            저장              변환            분석/BI
──────────           ──────────         ──────────       ──────────      ──────────
 SaaS APIs  ──┐
 Databases  ──┼──→ Fivetran/Airbyte ──→ Snowflake  ──→   dbt      ──→  Looker
 Event Logs ──┤                         BigQuery       Dataform       Metabase
 Files      ──┘                         Redshift                     Tableau

 Kafka/     ──────→ Flink/Spark   ──→ Delta Lake  ──→  Spark SQL  ──→ 실시간
 Kinesis             Streaming         Iceberg                       대시보드

                                 오케스트레이션: Airflow / Dagster
                                 품질: Great Expectations / dbt tests
                                 카탈로그: DataHub / Atlan / OpenMetadata
                                 모니터링: Monte Carlo / Datadog

13. 퀴즈

Q1: ETL vs ELT

ETL과 ELT의 핵심 차이점은 무엇이며, 언제 ELT를 선택해야 하나요?

정답:

핵심 차이점은 변환(Transform)이 발생하는 위치입니다. ETL은 별도의 변환 서버에서 변환 후 적재하고, ELT는 데이터를 먼저 웨어하우스에 적재한 후 웨어하우스 내에서 변환합니다.

ELT를 선택해야 하는 경우:

  • 클라우드 웨어하우스(BigQuery, Snowflake)를 사용하는 경우
  • 원본 데이터 보존이 중요한 경우
  • 변환 로직이 자주 바뀌어 유연성이 필요한 경우
  • dbt 같은 도구로 SQL 기반 변환을 원하는 경우

Q2: Spark 파티셔닝

Spark에서 repartition()과 coalesce()의 차이점은 무엇인가요?

정답:

repartition()은 전체 셔플을 수행하여 데이터를 지정된 수의 파티션으로 균등하게 재분배합니다. 파티션 수를 늘리거나 특정 컬럼으로 파티셔닝할 때 사용합니다.

coalesce()는 셔플 없이 파티션 수만 줄입니다. 기존 파티션을 합치는 것이라 파티션 수를 줄일 때만 사용할 수 있으며, 네트워크 비용이 적습니다.

파티션 수를 줄여야 할 때는 coalesce(), 늘리거나 균등 분배가 필요할 때는 repartition()을 사용합니다.
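두 연산의 차이를 리스트-오브-리스트(파티션 목록)로 단순화해 모델링하면 다음과 같습니다. Spark 내부 동작을 크게 단순화한 가정하의 스케치로, 함수 이름만 Spark API에서 빌려왔습니다.

```python
# repartition(셔플로 균등 재분배) vs coalesce(인접 파티션 병합) 개념 모델
def repartition(partitions, n):
    """전체 데이터를 모아(셔플에 해당) n개 파티션으로 라운드로빈 재분배"""
    flat = [x for p in partitions for x in p]
    return [flat[i::n] for i in range(n)]

def coalesce(partitions, n):
    """셔플 없이 기존 파티션을 인접 그룹으로 합쳐 n개 이하로 축소"""
    n = min(n, len(partitions))
    size = -(-len(partitions) // n)   # 올림 나눗셈
    return [sum(partitions[i:i + size], []) for i in range(0, len(partitions), size)]

parts = [[1, 2, 3, 4, 5, 6], [], [], []]   # 스큐된 4개 파티션
print(repartition(parts, 2))   # [[1, 3, 5], [2, 4, 6]]: 균등 분배
print(coalesce(parts, 2))      # [[1, 2, 3, 4, 5, 6], []]: 병합만 하므로 스큐 유지
```

coalesce는 데이터를 옮기지 않으므로 싸지만 스큐를 해소하지 못하고, repartition은 셔플 비용을 치르는 대신 균등 분배를 보장한다는 차이가 그대로 드러납니다.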

Q3: Airflow XCom

Airflow에서 XCom의 역할과 제한 사항은 무엇인가요?

정답:

XCom(Cross-Communication)은 Airflow 태스크 간에 소량의 데이터를 전달하는 메커니즘입니다. 메타데이터 DB에 저장됩니다.

제한 사항:

  • 소량 데이터만 전달(기본 48KB, 최대 수 MB)
  • 대용량 데이터는 S3/GCS 등 외부 저장소 경로만 전달
  • 직렬화 가능한 데이터만 전달(JSON serializable)
  • 메타데이터 DB에 부하를 줄 수 있음

대안: 대용량 데이터는 임시 파일이나 클라우드 스토리지를 사용하고, XCom으로는 파일 경로만 전달합니다.
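"경로만 전달" 패턴을 Airflow 없이 개념만 표현하면 다음과 같습니다. `xcom_store` dict는 메타데이터 DB를 대신하는 가정이며, 실제로는 `ti.xcom_push`/`xcom_pull`이 그 역할을 합니다.

```python
# XCom에는 대용량 데이터 대신 저장 위치(경로)만 전달하는 패턴의 스케치
import json
import os
import tempfile

xcom_store = {}   # 가정: Airflow 메타데이터 DB를 대신하는 저장소

def producer():
    # 대용량 데이터는 파일/오브젝트 스토리지에 쓰고
    path = os.path.join(tempfile.mkdtemp(), "big_data.json")
    with open(path, "w") as f:
        json.dump(list(range(10000)), f)
    xcom_store["data_path"] = path     # XCom에는 경로(소량 문자열)만 push
    return path

def consumer():
    path = xcom_store["data_path"]     # 경로만 pull
    with open(path) as f:
        return len(json.load(f))       # 실제 데이터는 저장소에서 직접 읽음

producer()
print(consumer())   # 10000
```

메타데이터 DB에는 짧은 경로 문자열만 남으므로 XCom 크기 제한과 DB 부하 문제를 모두 피할 수 있습니다.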

Q4: Exactly-Once 시맨틱스

Kafka에서 Exactly-Once 시맨틱스를 구현하는 방법을 설명하세요.

정답:

Kafka Exactly-Once는 3가지 요소로 구현됩니다.

  1. Idempotent Producer: Producer에 enable.idempotence=true를 설정하면 브로커가 중복 메시지를 자동으로 제거합니다.

  2. Transactional Producer: 여러 파티션/토픽에 걸친 원자적 쓰기를 보장합니다. initTransactions(), beginTransaction(), commitTransaction() API를 사용합니다.

  3. Consumer read_committed: Consumer에서 isolation.level=read_committed를 설정하면 커밋된 트랜잭션의 메시지만 읽습니다.

Flink와 연동 시, Flink의 체크포인트 메커니즘과 Kafka의 트랜잭셔널 API를 결합하여 End-to-End Exactly-Once를 달성합니다.

Q5: dbt 증분 모델

dbt의 incremental 모델은 어떻게 동작하며, 언제 사용하나요?

정답:

dbt incremental 모델은 마지막 실행 이후 새로 추가되거나 변경된 데이터만 처리합니다.

동작 방식:

  1. 첫 실행 시 전체 데이터를 처리 (CREATE TABLE AS)
  2. 이후 실행 시 is_incremental() 조건으로 새 데이터만 필터링
  3. 새 데이터를 기존 테이블에 MERGE 또는 INSERT

사용 시기:

  • 대용량 팩트 테이블 (매번 전체 재구축이 비용이 높을 때)
  • 이벤트/로그 데이터 (시간순 append)
  • 점진적으로 증가하는 데이터

핵심은 unique_key 설정과 적절한 증분 조건(WHERE) 지정입니다.
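위 동작 방식을 순수 Python으로 흉내 내면 다음과 같습니다. 테이블을 dict로, MERGE를 upsert로 단순화한 스케치이며 소스 데이터는 가상의 예시입니다.

```python
# dbt incremental 모델의 동작(첫 실행 전체 빌드, 이후 증분 필터 + unique_key MERGE) 모델
target = {}   # unique_key(order_id) -> row, dbt의 {{ this }} 테이블에 해당

def run_incremental(source_rows):
    """is_incremental() 조건과 MERGE를 단순화한 한 번의 dbt run"""
    if target:   # is_incremental(): 대상 테이블이 이미 존재하면 새 데이터만
        max_loaded = max(r["ordered_at"] for r in target.values())
        source_rows = [r for r in source_rows if r["ordered_at"] > max_loaded]
    for row in source_rows:            # unique_key 기준 MERGE (upsert)
        target[row["order_id"]] = row

run_incremental([{"order_id": 1, "ordered_at": "2025-01-01"},
                 {"order_id": 2, "ordered_at": "2025-01-02"}])   # 첫 실행: 전체 빌드
run_incremental([{"order_id": 2, "ordered_at": "2025-01-02"},    # 이미 적재됨: 필터로 제외
                 {"order_id": 3, "ordered_at": "2025-01-03"}])   # 새 행만 MERGE
print(sorted(target))   # [1, 2, 3]
```

두 번째 실행에서 전체가 아닌 새 행만 처리되는 것이 incremental 모델이 비용을 줄이는 지점입니다.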


14. 참고 자료

Data Engineering Pipeline Complete Guide 2025: ETL/ELT, Spark, Airflow, Real-Time Streaming

1. Data Engineering Overview

The Role of a Data Engineer

Data engineers design, build, and maintain an organization's data infrastructure. Their core responsibility is creating pipelines that collect, transform, and store data so that data scientists and analysts can derive insights.

Core Responsibilities of a Data Engineer
├── Data Ingestion
│   ├── Extract data from APIs, DBs, files, streams
│   └── Integrate diverse sources
├── Data Transformation
│   ├── Cleaning, normalization, aggregation
│   └── Apply business logic
├── Data Storage
│   ├── Data warehouse design
│   └── Data lake architecture
├── Pipeline Orchestration
│   ├── Workflow automation
│   └── Scheduling and monitoring
└── Data Quality Management
    ├── Data validation
    └── Data contracts

Essential Tech Stack

Programming      : Python, SQL, Scala/Java
Data Processing  : Spark, Flink, Beam
Orchestration    : Airflow, Dagster, Prefect
Streaming        : Kafka, Kinesis, Pub/Sub
Transformation   : dbt, Dataform
Cloud            : AWS (Redshift, Glue), GCP (BigQuery), Azure (Synapse)
Containers       : Docker, Kubernetes
IaC              : Terraform, Pulumi
Monitoring       : Datadog, Grafana, Monte Carlo

2. ETL vs ELT

Traditional ETL

ETL stands for Extract, Transform, Load. Data is extracted from the source, transformed on a dedicated server, then loaded into the final data store.

ETL Flow:
Source DB ──Extract──→ Transform Server ──Transform──→ Load──→ Data Warehouse
                       (ETL Server)
# Traditional ETL Example (Python)
import pandas as pd
from sqlalchemy import create_engine

# Extract: Pull data from source DB
source_engine = create_engine('postgresql://source_db:5432/sales')
raw_data = pd.read_sql('SELECT * FROM orders WHERE date >= %s', source_engine, params=['2025-01-01'])

# Transform: Apply transformations
transformed = raw_data.copy()
transformed['total_with_tax'] = transformed['total'] * 1.1
transformed['order_month'] = pd.to_datetime(transformed['order_date']).dt.to_period('M')
transformed = transformed.dropna(subset=['customer_id'])
transformed = transformed[transformed['total'] > 0]

# Load: Write to warehouse
wh_engine = create_engine('postgresql://warehouse:5432/analytics')
transformed.to_sql('fact_orders', wh_engine, if_exists='append', index=False)

Modern ELT

ELT stands for Extract, Load, Transform. Raw data is loaded into the warehouse first, then transformed within the warehouse itself.

ELT Flow:
Source DB ──Extract──→ Load──→ Data Warehouse ──Transform──→ Analytics Tables
                                (Transform with dbt, etc.)
-- ELT: Transform inside the warehouse with dbt
-- models/marts/fact_orders.sql
WITH source_orders AS (
    SELECT * FROM raw.orders
    WHERE order_date >= '2025-01-01'
),
cleaned AS (
    SELECT
        order_id,
        customer_id,
        total,
        total * 1.1 AS total_with_tax,
        DATE_TRUNC('month', order_date) AS order_month,
        order_date
    FROM source_orders
    WHERE customer_id IS NOT NULL
      AND total > 0
)
SELECT * FROM cleaned

ETL vs ELT Comparison

| Aspect | ETL | ELT |
|--------|-----|-----|
| Transform Location | Separate server | Inside warehouse |
| Scalability | Depends on ETL server | Leverages warehouse compute |
| Raw Data | May be lost after transform | Preserved |
| Cost | ETL server operation cost | Warehouse compute cost |
| Flexibility | Re-processing needed for logic changes | Flexible re-transformation with SQL |
| Key Tools | Informatica, Talend | dbt, Dataform |
| Best For | Legacy systems, regulatory requirements | Cloud-native, big data |

3. Batch vs Stream Processing

Batch Processing

Processes large volumes of data at scheduled intervals.

Batch Processing:
[Accumulate Data] ──→ [Bulk Process] ──→ [Store Results]
  (hourly/daily)       (Spark, etc.)     (Warehouse)

Characteristics:
- High throughput
- High latency (minutes to hours)
- Cost-efficient
- Easy to reprocess

Stream Processing

Processes data in real time as it arrives.

Stream Processing:
[Event Occurs] ──→ [Immediate Processing] ──→ [Real-Time Results]
  (continuous)      (Flink, etc.)              (Dashboard/Alerts)

Characteristics:
- Low latency (milliseconds to seconds)
- Continuous processing
- Complex failure handling
- Event ordering required

Selection Criteria

Choose Batch when:
  - Daily/weekly/monthly reports
  - Large-scale data aggregation
  - Cost optimization priority
  - No real-time requirement

Choose Stream when:
  - Real-time dashboards
  - Fraud detection
  - Real-time recommendations
  - IoT sensor data
  - Alerting

Hybrid (Lambda/Kappa):
  - Run batch + stream simultaneously
  - Real-time approximation + batch accuracy
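The hybrid idea above can be sketched in plain Python: serve queries by combining an accurate (but stale) batch view with a fresh streaming view. The view contents and date keys are hypothetical.

```python
# Minimal Lambda-architecture serving-layer sketch (view contents are hypothetical)
batch_view = {"2025-03-25": 1000}    # accurate counts up to the last batch run
realtime_view = {"2025-03-25": 37,   # approximate counts since the last batch run
                 "2025-03-26": 12}

def serve(day):
    """Serving layer: batch result plus the streaming delta for the same key."""
    return batch_view.get(day, 0) + realtime_view.get(day, 0)

print(serve("2025-03-25"))   # 1037: batch + streaming delta
print(serve("2025-03-26"))   # 12: only the streaming view has this day so far
```

When the next batch run completes, its results replace the overlapping portion of the real-time view, which is what makes the batch layer the source of eventual accuracy.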

4. Apache Spark

4.1 Spark Overview

Apache Spark is a unified analytics engine for large-scale data processing. Its in-memory computation can deliver up to 100x faster performance than MapReduce.

Spark Architecture:
+---------------------------------+
|        Spark Application        |
+---------------------------------+
| SparkSQL  | Streaming |  MLlib  |
+---------------------------------+
|       DataFrame / Dataset       |
+---------------------------------+
|        RDD (Core Engine)        |
+---------------------------------+
| Standalone | YARN | Mesos | K8s |
+---------------------------------+

4.2 PySpark Basics

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, when, lit

# Create SparkSession
spark = SparkSession.builder \
    .appName("DataPipeline") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Read data
orders_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("s3://data-lake/raw/orders/")

users_df = spark.read.parquet("s3://data-lake/raw/users/")

# Transform data
result = orders_df \
    .filter(col("status") == "completed") \
    .join(users_df, orders_df.user_id == users_df.id, "inner") \
    .groupBy("user_id", "username") \
    .agg(
        count("order_id").alias("total_orders"),
        sum("amount").alias("total_spent"),
        avg("amount").alias("avg_order_value")
    ) \
    .filter(col("total_orders") > 5)

# Save results
# (partitionBy would require the partition column to exist in `result`;
#  the aggregation above produces no date column, so we write unpartitioned)
result.write \
    .mode("overwrite") \
    .parquet("s3://data-lake/processed/user_summary/")

4.3 SparkSQL

# Register temp views
orders_df.createOrReplaceTempView("orders")
users_df.createOrReplaceTempView("users")

# SQL analytics
monthly_revenue = spark.sql("""
    SELECT
        DATE_TRUNC('month', order_date) AS month,
        COUNT(DISTINCT user_id) AS unique_customers,
        COUNT(*) AS total_orders,
        SUM(amount) AS revenue,
        AVG(amount) AS avg_order_value
    FROM orders
    WHERE status = 'completed'
    GROUP BY DATE_TRUNC('month', order_date)
    ORDER BY month
""")

monthly_revenue.show()

4.4 Partitioning and Caching

# Partition optimization
# Check current partitions
print(f"Partitions: {orders_df.rdd.getNumPartitions()}")

# Repartition (causes full shuffle)
orders_repartitioned = orders_df.repartition(100, "order_date")

# Coalesce (reduce partitions without shuffle)
orders_coalesced = orders_df.coalesce(50)

# Caching
from pyspark.storagelevel import StorageLevel

# Memory caching
orders_df.cache()
orders_df.count()  # Triggers cache materialization

# Memory + disk caching
users_df.persist(StorageLevel.MEMORY_AND_DISK)

# Release cache
orders_df.unpersist()

4.5 Spark Performance Tuning

# Broadcast join (for small tables)
from pyspark.sql.functions import broadcast

# Broadcast the smaller table
result = orders_df.join(
    broadcast(users_df),
    orders_df.user_id == users_df.id
)

# Enable AQE (Adaptive Query Execution)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

Spark Performance Optimization Checklist:
  [ ] Set appropriate partition count (2-3x number of cores)
  [ ] Handle data skew (salting, AQE)
  [ ] Leverage broadcast joins
  [ ] Minimize unnecessary shuffles
  [ ] Column pruning (select only needed columns)
  [ ] Use Predicate Pushdown
  [ ] Establish caching strategy
  [ ] Optimize serialization format (Parquet, ORC)
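
The salting item above can be illustrated in plain Python (a conceptual sketch of the two-stage aggregation, not Spark's implementation):

```python
from collections import Counter

def salt_hot_keys(records, hot_keys, num_salts):
    """Append a rotating salt to hot keys so one huge group splits into many."""
    out = []
    for i, (key, value) in enumerate(records):
        if key in hot_keys:
            key = f"{key}#{i % num_salts}"
        out.append((key, value))
    return out

def aggregate(records):
    totals = Counter()
    for key, value in records:
        totals[key] += value
    return totals

# 90% of records share one hot key -> a single task would get almost all the work
records = [("hot", 1)] * 900 + [(f"k{i}", 1) for i in range(100)]

# Stage 1: aggregate per salted key (the skewed group splits into 10 smaller ones)
partials = aggregate(salt_hot_keys(records, {"hot"}, 10))

# Stage 2: strip the salt and merge the partial aggregates
final = Counter()
for salted_key, total in partials.items():
    final[salted_key.split("#")[0]] += total

print(final["hot"])  # 900 -- same answer, computed from 10 smaller groups
```

In Spark, stage 1 and stage 2 would be two `groupBy`/`agg` passes; AQE's skew-join handling automates a similar split for joins.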

5. Apache Airflow

5.1 Airflow Overview

Apache Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. It defines task dependencies using DAGs (Directed Acyclic Graphs).

Airflow Architecture:
+----------------------------------+
|           Web Server             |
|       (UI / REST API)            |
+----------------------------------+
|           Scheduler              |
|   (DAG Parsing, Task Scheduling) |
+----------------------------------+
|           Executor               |
|   (Local/Celery/Kubernetes)      |
+----------------------------------+
|        Metadata Database         |
|       (PostgreSQL/MySQL)         |
+----------------------------------+

5.2 Writing DAGs

# dags/etl_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.sensors.filesystem import FileSensor

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data-team@company.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),
}

with DAG(
    dag_id='daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL pipeline',
    schedule_interval='0 6 * * *',  # Daily at 6 AM
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['etl', 'daily'],
    max_active_runs=1,
) as dag:

    # Sensor: Wait for data arrival
    wait_for_data = FileSensor(
        task_id='wait_for_data',
        filepath='/data/raw/daily_export.csv',
        poke_interval=300,  # Check every 5 minutes
        timeout=3600,       # Wait up to 1 hour
        mode='poke',
    )

    # Extract
    def extract_data(**context):
        import pandas as pd
        execution_date = context['ds']
        df = pd.read_csv(f'/data/raw/daily_export_{execution_date}.csv')
        df.to_parquet(f'/data/staging/extract_{execution_date}.parquet')
        return len(df)

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    # Transform
    def transform_data(**context):
        import pandas as pd
        execution_date = context['ds']
        df = pd.read_parquet(f'/data/staging/extract_{execution_date}.parquet')

        # Data cleaning
        df = df.dropna(subset=['customer_id', 'amount'])
        df = df[df['amount'] > 0]
        df['amount_with_tax'] = df['amount'] * 1.1

        df.to_parquet(f'/data/staging/transform_{execution_date}.parquet')
        return len(df)

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )

    # Load
    load = PostgresOperator(
        task_id='load',
        postgres_conn_id='warehouse',
        sql='sql/load_daily_orders.sql',
    )

    # Data quality validation
    def validate_data(**context):
        execution_date = context['ds']
        row_count = context['ti'].xcom_pull(task_ids='transform')
        if row_count < 100:
            raise ValueError(f'Insufficient row count: {row_count}')

    validate = PythonOperator(
        task_id='validate',
        python_callable=validate_data,
    )

    # Notification
    notify = BashOperator(
        task_id='notify',
        bash_command='echo "ETL complete: {{ ds }}" | mail -s "ETL Success" team@company.com',
    )

    # Define dependencies
    wait_for_data >> extract >> transform >> load >> validate >> notify

5.3 TaskFlow API (Airflow 2.x)

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['etl'],
)
def modern_etl_pipeline():

    @task()
    def extract():
        """Extract data from source"""
        import pandas as pd
        df = pd.read_csv('/data/raw/orders.csv')
        return df.to_dict()

    @task()
    def transform(raw_data: dict):
        """Transform and clean data"""
        import pandas as pd
        df = pd.DataFrame(raw_data)
        df = df[df['amount'] > 0]
        df['processed_at'] = datetime.now().isoformat()
        return df.to_dict()

    @task()
    def load(transformed_data: dict):
        """Load into warehouse"""
        import pandas as pd
        df = pd.DataFrame(transformed_data)
        print(f"Loaded {len(df)} records")

    # Automatic dependency resolution
    raw = extract()
    transformed = transform(raw)
    load(transformed)

# Instantiate the DAG
modern_etl_pipeline()

5.4 Connections, Variables, XCom

# Connections: Configure via UI or CLI
# airflow connections add 'warehouse' \
#   --conn-type 'postgres' \
#   --conn-host 'warehouse.example.com' \
#   --conn-port 5432 \
#   --conn-login 'etl_user' \
#   --conn-password 'secret'

# Variables
from airflow.models import Variable

env = Variable.get("environment", default_var="dev")
config = Variable.get("pipeline_config", deserialize_json=True)

# XCom: Pass data between tasks
def producer_task(**context):
    context['ti'].xcom_push(key='row_count', value=1000)

def consumer_task(**context):
    row_count = context['ti'].xcom_pull(
        task_ids='producer',
        key='row_count'
    )
    print(f"Previous task result: {row_count} rows")

5.5 Dynamic DAG Generation

# dags/dynamic_dag_factory.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# Configuration-driven dynamic DAG creation
configs = [
    {"name": "sales", "source": "mysql", "schedule": "@daily"},
    {"name": "users", "source": "postgres", "schedule": "@hourly"},
    {"name": "logs", "source": "s3", "schedule": "0 */6 * * *"},
]

def create_etl_dag(config):
    dag = DAG(
        dag_id=f"etl_{config['name']}",
        schedule_interval=config['schedule'],
        start_date=datetime(2025, 1, 1),
        catchup=False,
    )

    def process(**kwargs):
        print(f"Processing {config['name']} from {config['source']}")

    with dag:
        PythonOperator(
            task_id='process',
            python_callable=process,
        )

    return dag

for config in configs:
    globals()[f"etl_{config['name']}"] = create_etl_dag(config)

6. Real-Time Streaming

6.1 Apache Kafka

Kafka is a distributed event streaming platform and the backbone of large-scale real-time data pipelines.

Kafka Architecture:
Producer ──→ Broker(Topic/Partition) ──→ Consumer Group
                  |
                  +-- Partition 0: [msg1, msg2, msg3...]
                  +-- Partition 1: [msg4, msg5, msg6...]
                  +-- Partition 2: [msg7, msg8, msg9...]

# Kafka Producer (Python)
from confluent_kafka import Producer
import json

config = {
    'bootstrap.servers': 'kafka-broker:9092',
    'acks': 'all',
    'retries': 3,
    'linger.ms': 10,
    'batch.size': 16384,
}

producer = Producer(config)

def delivery_callback(err, msg):
    if err:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered: {msg.topic()} [{msg.partition()}]')

# Send messages
for i in range(100):
    event = {
        'user_id': f'user_{i}',
        'action': 'page_view',
        'timestamp': '2025-03-25T10:00:00Z',
        'page': '/products'
    }
    producer.produce(
        topic='user-events',
        key=str(event['user_id']),
        value=json.dumps(event),
        callback=delivery_callback
    )

producer.flush()

# Kafka Consumer (Python)
from confluent_kafka import Consumer
import json

config = {
    'bootstrap.servers': 'kafka-broker:9092',
    'group.id': 'analytics-consumer',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
}

consumer = Consumer(config)
consumer.subscribe(['user-events'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'Consumer error: {msg.error()}')
            continue

        event = json.loads(msg.value().decode('utf-8'))
        print(f"Received: {event['user_id']} - {event['action']}")

        # Manual commit
        consumer.commit(asynchronous=False)
finally:
    consumer.close()

6.2 Apache Flink

Flink is a stateful stream processing engine that provides exactly-once semantics.

# PyFlink Stream Processing
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
env.enable_checkpointing(60000)  # Checkpoint every 60 seconds

t_env = StreamTableEnvironment.create(env)

# Define Kafka source table
t_env.execute_sql("""
    CREATE TABLE user_events (
        user_id STRING,
        action STRING,
        page STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'flink-processor',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    )
""")

# Window aggregation
t_env.execute_sql("""
    CREATE TABLE page_view_stats (
        window_start TIMESTAMP(3),
        window_end TIMESTAMP(3),
        page STRING,
        view_count BIGINT,
        unique_users BIGINT
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://analytics-db:5432/stats',
        'table-name' = 'page_view_stats',
        'driver' = 'org.postgresql.Driver'
    )
""")

t_env.execute_sql("""
    INSERT INTO page_view_stats
    SELECT
        window_start,
        window_end,
        page,
        COUNT(*) AS view_count,
        COUNT(DISTINCT user_id) AS unique_users
    FROM TABLE(
        TUMBLE(TABLE user_events, DESCRIPTOR(event_time), INTERVAL '5' MINUTE)
    )
    GROUP BY window_start, window_end, page
""")

6.3 Exactly-Once Semantics

Delivery Guarantee Levels:
  At-most-once  : Messages may be lost, no duplicates
  At-least-once : No message loss, duplicates possible
  Exactly-once  : No message loss, no duplicates (hardest to achieve)

Kafka + Flink Exactly-Once:
  1. Kafka Transactional Producer
  2. Flink Checkpointing (Chandy-Lamport)
  3. Two-Phase Commit Protocol
  4. Kafka Consumer offsets linked to checkpoints
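
On the consumer side, effective exactly-once often reduces to at-least-once delivery plus idempotent processing. A broker-free sketch of that pattern (class and names are illustrative):

```python
class IdempotentConsumer:
    """Turns at-least-once delivery into exactly-once *processing* by
    remembering which message IDs have already been applied."""

    def __init__(self):
        # In production this set lives in a durable store and is updated
        # atomically with the side effect (e.g. same DB transaction).
        self.processed_ids = set()
        self.total = 0

    def handle(self, message_id, amount):
        if message_id in self.processed_ids:
            return False              # duplicate redelivery: skip
        self.total += amount          # the side effect
        self.processed_ids.add(message_id)
        return True

consumer = IdempotentConsumer()
# The broker redelivers msg-2 after a timeout: at-least-once delivery
for msg_id, amount in [("msg-1", 10), ("msg-2", 5), ("msg-2", 5), ("msg-3", 7)]:
    consumer.handle(msg_id, amount)

print(consumer.total)  # 22, not 27
```

This is why duplicate-safe message keys matter as much as broker-side transactions.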

7. dbt (data build tool)

7.1 dbt Overview

dbt handles the T (Transform) in ELT. It lets you write data transformation logic in SQL while applying software engineering best practices (version control, testing, documentation) to data transformations.

dbt Project Structure:
my_dbt_project/
+-- dbt_project.yml
+-- profiles.yml
+-- models/
|   +-- staging/
|   |   +-- stg_orders.sql
|   |   +-- stg_customers.sql
|   |   +-- _staging_sources.yml
|   +-- intermediate/
|   |   +-- int_order_items_grouped.sql
|   +-- marts/
|       +-- dim_customers.sql
|       +-- fact_orders.sql
|       +-- _marts_schema.yml
+-- tests/
|   +-- assert_positive_revenue.sql
+-- macros/
|   +-- generate_schema_name.sql
+-- seeds/
    +-- country_codes.csv

7.2 Writing Models

-- models/staging/stg_orders.sql
WITH source AS (
    SELECT * FROM {{ source('raw', 'orders') }}
),
renamed AS (
    SELECT
        id AS order_id,
        user_id AS customer_id,
        amount AS order_amount,
        status AS order_status,
        created_at AS ordered_at
    FROM source
    WHERE status != 'cancelled'
)
SELECT * FROM renamed

-- models/marts/fact_orders.sql
{{ config(materialized='incremental', unique_key='order_id') }}

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),
customers AS (
    SELECT * FROM {{ ref('dim_customers') }}
)
SELECT
    o.order_id,
    o.customer_id,
    c.customer_name,
    c.customer_segment,
    o.order_amount,
    o.order_amount * 1.1 AS amount_with_tax,
    o.order_status,
    o.ordered_at
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id

{% if is_incremental() %}
WHERE o.ordered_at > (SELECT MAX(ordered_at) FROM {{ this }})
{% endif %}

7.3 Sources and Tests

# models/staging/_staging_sources.yml
version: 2

sources:
  - name: raw
    database: raw_db
    schema: public
    tables:
      - name: orders
        loaded_at_field: _loaded_at
        freshness:
          warn_after:
            count: 12
            period: hour
          error_after:
            count: 24
            period: hour
        columns:
          - name: id
            tests:
              - unique
              - not_null
          - name: amount
            tests:
              - not_null

      - name: customers
        columns:
          - name: id
            tests:
              - unique
              - not_null

# models/marts/_marts_schema.yml
version: 2

models:
  - name: fact_orders
    description: "Orders fact table"
    columns:
      - name: order_id
        description: "Unique order ID"
        tests:
          - unique
          - not_null
      - name: order_amount
        tests:
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id

-- tests/assert_positive_revenue.sql
-- Custom test: verify all revenue is positive
SELECT order_id, order_amount
FROM {{ ref('fact_orders') }}
WHERE order_amount < 0

7.4 dbt Commands

# Build all models
dbt run

# Build a specific model
dbt run --select fact_orders

# Build model + downstream models
dbt run --select stg_orders+

# Run tests
dbt test

# Generate documentation
dbt docs generate
dbt docs serve

# Check source freshness
dbt source freshness

# Load seed data
dbt seed

# Full pipeline (build + test)
dbt build

8. Data Warehouses

Comparison

Aspect           | BigQuery               | Snowflake                  | Redshift
-----------------+------------------------+----------------------------+--------------------------
Vendor           | Google Cloud           | Snowflake                  | AWS
Architecture     | Serverless             | Compute/storage separation | MPP cluster
Pricing          | Per-query (on-demand)  | Credit-based               | Per-node-hour
Scalability      | Automatic              | Warehouse resizing         | Add nodes
Concurrency      | 2000+ slots            | Multi-cluster              | WLM configuration
Semi-structured  | STRUCT, ARRAY          | VARIANT                    | SUPER
ML Integration   | BigQuery ML            | Snowpark                   | Redshift ML
Cost Efficiency  | Best at small scale    | Best at medium scale       | Best for large always-on

BigQuery Example

-- BigQuery: Partitioning + Clustering
CREATE TABLE analytics.fact_orders
PARTITION BY DATE(ordered_at)
CLUSTER BY customer_segment, order_status
AS
SELECT
    order_id,
    customer_id,
    customer_segment,
    order_amount,
    order_status,
    ordered_at
FROM staging.orders;

-- Cost estimation (dry run)
-- On-demand pricing: roughly $6.25 per TiB scanned (verify current pricing)

Snowflake Example

-- Snowflake: Warehouse creation and management
CREATE WAREHOUSE etl_wh
    WITH WAREHOUSE_SIZE = 'MEDIUM'
    AUTO_SUSPEND = 300
    AUTO_RESUME = TRUE
    MIN_CLUSTER_COUNT = 1
    MAX_CLUSTER_COUNT = 3;

-- Data loading
COPY INTO raw.orders
FROM @my_s3_stage/orders/
FILE_FORMAT = (TYPE = 'PARQUET')
PATTERN = '.*[.]parquet';

9. Data Lake / Lakehouse

Table Format Comparison

Traditional Data Lake Problems:
  - No ACID transactions
  - No schema enforcement
  - No time travel
  - Small files problem

Lakehouse Table Formats Solve These:
  Delta Lake     : Led by Databricks, best Spark integration
  Apache Iceberg : Developed by Netflix, vendor-neutral
  Apache Hudi    : Developed by Uber, specializes in incremental processing
Feature              | Delta Lake     | Apache Iceberg             | Apache Hudi
---------------------+----------------+----------------------------+----------------
ACID Transactions    | Yes            | Yes                        | Yes
Schema Evolution     | Yes            | Yes                        | Yes
Time Travel          | Yes            | Yes                        | Yes
Partition Evolution  | Limited        | Yes (hidden partitioning)  | Limited
Engine Compatibility | Spark-centric  | Spark, Flink, Trino        | Spark, Flink
Primary Platform     | Databricks     | Multi-vendor adoption      | AWS-centric

# Delta Lake Example (PySpark)
from delta.tables import DeltaTable

# Create Delta table
orders_df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("order_date") \
    .save("s3://data-lake/delta/orders")

# UPSERT (Merge)
delta_table = DeltaTable.forPath(spark, "s3://data-lake/delta/orders")

delta_table.alias("target").merge(
    new_orders_df.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

# Time Travel
old_data = spark.read \
    .format("delta") \
    .option("versionAsOf", 5) \
    .load("s3://data-lake/delta/orders")

10. Data Quality

10.1 Great Expectations

import great_expectations as gx

context = gx.get_context()

# Connect data source
datasource = context.sources.add_pandas("my_datasource")
data_asset = datasource.add_csv_asset("orders", filepath_or_buffer="orders.csv")

# Define Expectation Suite
suite = context.add_expectation_suite("orders_validation")

# Define expectations
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount", min_value=0, max_value=100000
    )
)

# Run validation
# (assumes a Checkpoint named "orders_checkpoint" has already been
#  configured for this suite and asset; exact calls vary by GX version)
results = context.run_checkpoint(
    checkpoint_name="orders_checkpoint"
)
print(f"Success: {results.success}")

10.2 Data Contracts

# data-contracts/orders-contract.yaml
dataContractSpecification: 0.9.3
id: orders-contract
info:
  title: Orders Data Contract
  version: 1.0.0
  owner: data-team
  contact:
    email: data-team@company.com

schema:
  type: object
  properties:
    order_id:
      type: string
      description: "Unique order identifier"
      required: true
      unique: true
    customer_id:
      type: string
      required: true
    amount:
      type: number
      minimum: 0
      maximum: 100000
    status:
      type: string
      enum: ["pending", "completed", "cancelled"]
    created_at:
      type: timestamp
      required: true

quality:
  completeness:
    - field: order_id
      threshold: 100
    - field: customer_id
      threshold: 99.9
  freshness:
    maxDelay: "PT1H"  # Within 1 hour
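
The completeness rules in the contract above can be enforced at load time. A minimal sketch of such a checker (the field names mirror the contract; the checker itself is illustrative, not a contract-tooling API):

```python
def check_completeness(records, rules):
    """rules: {field: minimum percent of non-null values}.
    Returns a list of human-readable violations."""
    violations = []
    n = len(records)
    for field, threshold in rules.items():
        non_null = sum(1 for r in records if r.get(field) is not None)
        pct = 100.0 * non_null / n if n else 0.0
        if pct < threshold:
            violations.append(f"{field}: {pct:.1f}% < {threshold}%")
    return violations

records = [
    {"order_id": "o1", "customer_id": "c1", "status": "completed"},
    {"order_id": "o2", "customer_id": None, "status": "pending"},
    {"order_id": "o3", "customer_id": "c2", "status": "completed"},
]

# Thresholds from the contract: order_id 100%, customer_id 99.9%
violations = check_completeness(records, {"order_id": 100, "customer_id": 99.9})
print(violations)
```

A failing check would typically fail the pipeline run and notify the contract owner rather than silently loading bad data.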

11. Orchestration Comparison

Airflow vs Dagster vs Prefect vs Mage

Aspect                  | Airflow         | Dagster           | Prefect        | Mage
------------------------+-----------------+-------------------+----------------+---------------
Approach                | DAG-centric     | Asset-centric     | Flow-centric   | Block-centric
Learning Curve          | High            | Medium            | Low            | Low
Local Development       | Complex         | Excellent         | Excellent      | Excellent
Testing                 | Difficult       | Built-in support  | Good           | Good
UI                      | Functional      | Modern            | Modern         | Modern
Community               | Very large      | Growing           | Growing       | Small
Production Track Record | Extensive       | Increasing        | Increasing     | Early stage
Cloud Offering          | MWAA, Composer  | Dagster Cloud     | Prefect Cloud  | Mage Pro

Selection Guide:
+-- Large enterprise, complex workflows --> Airflow
+-- Data asset-centric thinking --> Dagster
+-- Quick start, Python-native --> Prefect
+-- No-code/low-code preference --> Mage

12. Modern Data Stack Diagram

Modern Data Stack (2025):

Data Sources           Ingestion            Storage           Transform          Analytics/BI
-----------           ----------          ----------        ----------         ----------
 SaaS APIs  --+
 Databases  --+---> Fivetran/Airbyte ---> Snowflake   --->    dbt       --->  Looker
 Event Logs --+                           BigQuery          Dataform          Metabase
 Files      --+                           Redshift                           Tableau

 Kafka/     -------> Flink/Spark    ---> Delta Lake   --->  Spark SQL  --->  Real-time
 Kinesis              Streaming          Iceberg                            Dashboards

                                Orchestration: Airflow / Dagster
                                Quality: Great Expectations / dbt tests
                                Catalog: DataHub / Atlan / OpenMetadata
                                Monitoring: Monte Carlo / Datadog

13. Quiz

Q1: ETL vs ELT

What is the core difference between ETL and ELT, and when should you choose ELT?

Answer:

The core difference is where transformation occurs. ETL transforms data on a separate server before loading, while ELT loads raw data into the warehouse first and transforms it there.

Choose ELT when:

  • Using cloud warehouses (BigQuery, Snowflake) with elastic compute
  • Raw data preservation is important
  • Transformation logic changes frequently and flexibility is needed
  • You want SQL-based transformation with tools like dbt

Q2: Spark Partitioning

What is the difference between repartition() and coalesce() in Spark?

Answer:

repartition() performs a full shuffle to redistribute data evenly across the specified number of partitions. Use it when increasing partitions or partitioning by a specific column.

coalesce() reduces partition count without a full shuffle. It merges existing partitions and can only decrease the number of partitions, with lower network overhead.

Use coalesce() when reducing partitions, repartition() when increasing or needing even distribution.
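
A toy model of the two operations (not Spark's actual implementation) makes the shuffle difference concrete: repartition re-hashes every record, coalesce only merges whole partitions locally.

```python
def repartition(partitions, n, key=lambda r: r):
    """Full shuffle: every record is re-hashed into one of n new partitions."""
    new = [[] for _ in range(n)]
    for part in partitions:
        for record in part:
            new[key(record) % n].append(record)  # each record may move
    return new

def coalesce(partitions, n):
    """No shuffle: existing partitions are merged locally into n groups."""
    new = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        new[i % n].extend(part)  # whole partitions move, records stay together
    return new

print(repartition([[1, 2], [3, 4]], 2))   # [[2, 4], [1, 3]] -- records moved
print(coalesce([[1], [2], [3], [4]], 2))  # [[1, 3], [2, 4]] -- partitions merged
```

The per-record movement in `repartition` is what costs network I/O in a real cluster; `coalesce` avoids it, which is also why it cannot rebalance skewed partitions.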

Q3: Airflow XCom

What is the role and limitations of XCom in Airflow?

Answer:

XCom (Cross-Communication) is a mechanism for passing small amounts of data between Airflow tasks. Data is stored in the metadata database.

Limitations:

  • Only for small data (default 48KB, max a few MB)
  • Large datasets should use external storage (S3/GCS), passing only the file path via XCom
  • Only JSON-serializable data can be passed
  • Can put load on the metadata database

Alternative: For large data, use temporary files or cloud storage, and pass only the file path through XCom.
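
A sketch of that pattern with plain Python (the `xcom` dict stands in for Airflow's metadata database; the task functions are illustrative, not Airflow API):

```python
import json
import os
import tempfile

# Stand-in for Airflow's XCom store (the metadata database)
xcom = {}

def producer_task(records):
    # Write the large payload to (simulated) external storage...
    path = os.path.join(tempfile.mkdtemp(), "orders.json")
    with open(path, "w") as f:
        json.dump(records, f)
    # ...and push only the small path string through XCom.
    xcom["orders_path"] = path

def consumer_task():
    # Pull the path from XCom, then read the data from storage.
    with open(xcom["orders_path"]) as f:
        return json.load(f)

producer_task([{"order_id": i} for i in range(3)])
print(len(consumer_task()))  # 3
```

In a real DAG the file would live in S3/GCS and `ti.xcom_push`/`ti.xcom_pull` would carry the object key, keeping the metadata database small.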

Q4: Exactly-Once Semantics

How do you implement exactly-once semantics with Kafka?

Answer:

Kafka exactly-once is implemented through three components:

  1. Idempotent Producer: Setting enable.idempotence=true allows brokers to automatically deduplicate messages.

  2. Transactional Producer: Guarantees atomic writes across multiple partitions/topics. Uses initTransactions(), beginTransaction(), commitTransaction() APIs.

  3. Consumer read_committed: Setting isolation.level=read_committed ensures consumers only read messages from committed transactions.

When combined with Flink, Flink's checkpointing mechanism and Kafka's transactional API work together to achieve end-to-end exactly-once delivery.

Q5: dbt Incremental Models

How do dbt incremental models work, and when should you use them?

Answer:

dbt incremental models process only new or changed data since the last run.

How it works:

  1. First run processes all data (CREATE TABLE AS)
  2. Subsequent runs filter new data using the is_incremental() condition
  3. New data is MERGEd or INSERTed into the existing table

When to use:

  • Large fact tables (where full rebuilds are expensive)
  • Event/log data (time-ordered appends)
  • Data that grows incrementally

The key is setting a proper unique_key and appropriate incremental filter condition (WHERE clause).
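
The merge step can be sketched in a few lines of Python (a toy model of what the warehouse MERGE does, not dbt internals):

```python
def incremental_merge(existing, new_rows, unique_key):
    """Upsert new_rows into existing: rows whose unique_key matches are
    updated, the rest are inserted -- dbt's incremental MERGE strategy."""
    by_key = {row[unique_key]: row for row in existing}
    for row in new_rows:
        by_key[row[unique_key]] = row
    return list(by_key.values())

table = [
    {"order_id": 1, "amount": 100},
    {"order_id": 2, "amount": 200},
]
# A later run sees one updated row and one new row (filtered by is_incremental())
new_batch = [
    {"order_id": 2, "amount": 250},   # updated
    {"order_id": 3, "amount": 300},   # new
]
table = incremental_merge(table, new_batch, "order_id")
print(sorted(r["order_id"] for r in table))  # [1, 2, 3]
```

Without a correct `unique_key`, the same logic degenerates into append-only inserts and produces duplicates, which is the most common incremental-model bug.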


14. References