Skip to content

Split View: 데이터 엔지니어링 입문 — ETL, 데이터 웨어하우스, 스트리밍, 데이터 레이크

|

데이터 엔지니어링 입문 — ETL, 데이터 웨어하우스, 스트리밍, 데이터 레이크

목차

  1. 데이터 엔지니어링이란
  2. 데이터 아키텍처
  3. ETL vs ELT
  4. 배치 처리
  5. 스트림 처리
  6. 워크플로우 오케스트레이션
  7. 데이터 웨어하우스
  8. 데이터 품질
  9. 데이터 거버넌스
  10. 실전 파이프라인 예제

1. 데이터 엔지니어링이란

데이터 엔지니어링은 원시 데이터를 수집, 변환, 저장하여 분석 가능한 상태로 만드는 일련의 과정이다. 데이터 과학자가 모델을 만들고, 분석가가 인사이트를 도출하려면, 먼저 데이터가 깨끗하고 접근 가능한 형태로 준비되어 있어야 한다. 이 준비 작업을 책임지는 것이 데이터 엔지니어이다.

역할 비교

역할주요 업무핵심 기술
데이터 엔지니어파이프라인 구축, 인프라 관리, 데이터 변환Python, SQL, Spark, Kafka, Airflow
데이터 사이언티스트모델링, 예측, 실험 설계Python, R, TensorFlow, 통계학
데이터 분석가리포팅, 대시보드, 비즈니스 인사이트SQL, Tableau, Excel, BI 도구

데이터 엔지니어는 신뢰할 수 있는 데이터 인프라를 만드는 사람이다. 아무리 뛰어난 모델도 데이터 파이프라인이 불안정하면 의미가 없다.

데이터 엔지니어의 핵심 역량

  • SQL 숙련도: 복잡한 조인, 윈도우 함수, CTE 등을 자유자재로 사용
  • 프로그래밍: Python이 사실상 표준. Scala, Java도 Spark 생태계에서 사용
  • 분산 시스템 이해: 파티셔닝, 복제, CAP 정리 등 기본 개념 숙지
  • 클라우드 서비스: AWS, GCP, Azure의 데이터 관련 서비스 활용 능력
  • 데이터 모델링: 정규화, 비정규화, 차원 모델링에 대한 이해

2. 데이터 아키텍처

현대 데이터 아키텍처는 크게 네 가지 패턴으로 분류된다.

2.1 데이터 레이크 (Data Lake)

데이터 레이크는 구조화, 반구조화, 비구조화 데이터를 원시 형태 그대로 저장하는 대규모 저장소이다. 스키마를 읽기 시점에 적용한다(Schema-on-Read).

원시 데이터 저장소 (Data Lake)
├── 구조화 데이터 (CSV, Parquet, ORC)
├── 반구조화 데이터 (JSON, XML, Avro)
└── 비구조화 데이터 (이미지, 로그, 텍스트)

장점: 저렴한 스토리지, 유연한 스키마, 모든 형태의 데이터 수용 단점: 관리가 어렵고, 잘못하면 "데이터 늪(Data Swamp)"이 될 수 있음

대표 기술: AWS S3, Azure Data Lake Storage, Google Cloud Storage

2.2 데이터 웨어하우스 (Data Warehouse)

데이터 웨어하우스는 분석에 최적화된 구조화 데이터 저장소이다. 스키마를 쓰기 시점에 적용한다(Schema-on-Write).

분석용 저장소 (Data Warehouse)
├── 팩트 테이블 (매출, 주문, 클릭)
├── 디멘전 테이블 (사용자, 상품, 시간)
└── 집계 테이블 (일별/월별 요약)

장점: 빠른 쿼리 성능, 일관된 스키마, ACID 트랜잭션 지원 단점: 비구조화 데이터 처리 한계, 스키마 변경이 어려움

대표 기술: Snowflake, BigQuery, Amazon Redshift

2.3 레이크하우스 (Lakehouse)

레이크하우스는 데이터 레이크의 유연성과 데이터 웨어하우스의 관리 기능을 결합한 아키텍처이다.

핵심 기능:

  • ACID 트랜잭션 지원
  • 스키마 강제 및 발전(Schema Evolution)
  • 데이터 레이크 위에서 SQL 분석 가능
  • 스트리밍과 배치 처리 통합

대표 기술: Delta Lake, Apache Iceberg, Apache Hudi

2.4 메달리온 아키텍처 (Medallion Architecture)

메달리온 아키텍처는 데이터를 세 단계로 계층화하여 관리한다.

Bronze (원시 데이터)
  → 소스에서 그대로 수집한 데이터
  → 최소한의 변환만 적용

Silver (정제 데이터)
  → 중복 제거, 타입 변환, 검증 완료
  → 비즈니스 로직 적용 전 상태

Gold (비즈니스 데이터)
  → 집계, 결합, 비즈니스 규칙 적용
  → 대시보드와 ML 모델에서 직접 사용

이 패턴은 Databricks에서 대중화되었으며, 데이터 품질을 단계별로 보장할 수 있다는 장점이 있다.


3. ETL vs ELT

3.1 ETL (Extract, Transform, Load)

ETL은 전통적인 데이터 통합 방식이다.

소스 → [추출][변환][적재] → 웨어하우스
  1. Extract: 소스 시스템에서 데이터 추출
  2. Transform: 스테이징 영역에서 데이터 정제, 변환, 집계
  3. Load: 변환된 데이터를 타겟 시스템에 적재

3.2 ELT (Extract, Load, Transform)

ELT는 클라우드 웨어하우스의 강력한 컴퓨팅 파워를 활용하는 현대적 방식이다.

소스 → [추출][적재] → 웨어하우스 → [변환]
  1. Extract: 소스에서 데이터 추출
  2. Load: 원시 데이터를 그대로 웨어하우스에 적재
  3. Transform: 웨어하우스 내부에서 SQL로 변환

3.3 언제 무엇을 사용하는가

기준ETLELT
데이터 볼륨소~중규모대규모
변환 복잡도복잡한 비즈니스 로직SQL로 표현 가능한 변환
인프라온프레미스클라우드
데이터 보안민감 데이터 사전 마스킹 필요웨어하우스 내 접근 제어로 충분
대표 도구Informatica, Talenddbt, Fivetran, Airbyte

3.4 주요 도구

dbt (data build tool): SQL 기반 변환 도구. ELT의 T 부분을 담당한다.

-- dbt 모델 예시: 일별 매출 집계
-- models/marts/daily_revenue.sql

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),

payments AS (
    SELECT * FROM {{ ref('stg_payments') }}
)

SELECT
    o.order_date,
    COUNT(DISTINCT o.order_id) AS total_orders,
    SUM(p.amount) AS total_revenue,
    AVG(p.amount) AS avg_order_value
FROM orders o
JOIN payments p ON o.order_id = p.order_id
WHERE p.status = 'completed'
GROUP BY o.order_date

Airbyte: 오픈소스 데이터 통합 플랫폼. 300개 이상의 커넥터를 제공한다.

Fivetran: 관리형 데이터 통합 서비스. 설정이 간편하고 안정적이다.


4. 배치 처리

배치 처리는 축적된 대량의 데이터를 한꺼번에 처리하는 방식이다. 실시간성이 필요하지 않은 대부분의 분석 작업에 적합하다.

4.1 Apache Spark

Apache Spark는 대규모 데이터 처리를 위한 통합 분석 엔진이다.

핵심 특징:

  • 인메모리 처리: MapReduce 대비 최대 100배 빠른 성능
  • 통합 API: 배치, 스트리밍, ML, 그래프 처리를 하나의 프레임워크로
  • 다양한 언어 지원: Python(PySpark), Scala, Java, R, SQL
# PySpark 예시: 일별 매출 집계
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, count

spark = SparkSession.builder \
    .appName("DailyRevenue") \
    .getOrCreate()

# 데이터 읽기
orders = spark.read.parquet("s3://data-lake/orders/")
payments = spark.read.parquet("s3://data-lake/payments/")

# 변환
daily_revenue = (
    orders
    .join(payments, "order_id")
    .filter(col("status") == "completed")
    .groupBy("order_date")
    .agg(
        count("order_id").alias("total_orders"),
        spark_sum("amount").alias("total_revenue")
    )
    .orderBy("order_date")
)

# 결과 저장
daily_revenue.write \
    .mode("overwrite") \
    .parquet("s3://data-warehouse/daily_revenue/")

4.2 Spark SQL과 DataFrame

Spark SQL을 사용하면 SQL에 익숙한 분석가도 대규모 데이터를 처리할 수 있다.

# DataFrame 등록 후 SQL 사용
orders.createOrReplaceTempView("orders")
payments.createOrReplaceTempView("payments")

result = spark.sql("""
    SELECT
        o.order_date,
        COUNT(DISTINCT o.order_id) AS total_orders,
        SUM(p.amount) AS total_revenue
    FROM orders o
    JOIN payments p ON o.order_id = p.order_id
    WHERE p.status = 'completed'
    GROUP BY o.order_date
    ORDER BY o.order_date
""")

4.3 MapReduce와의 비교

기준MapReduceSpark
처리 속도디스크 기반, 느림인메모리 기반, 빠름
프로그래밍 모델Map과 Reduce 두 단계풍부한 변환 연산 제공
실시간 처리불가Structured Streaming 지원
학습 곡선높음상대적으로 낮음
생태계Hadoop 생태계독립적 + Hadoop 호환

5. 스트림 처리

스트림 처리는 데이터가 생성되는 즉시 실시간으로 처리하는 방식이다.

5.1 Apache Kafka

Kafka는 분산 이벤트 스트리밍 플랫폼이다. 실시간 데이터 파이프라인과 스트리밍 애플리케이션의 핵심 인프라로 사용된다.

핵심 개념:

  • Topic: 메시지가 발행되는 카테고리
  • Producer: 토픽에 메시지를 발행하는 주체
  • Consumer: 토픽에서 메시지를 구독하는 주체
  • Broker: 메시지를 저장하고 전달하는 서버
  • Partition: 토픽을 분할하여 병렬 처리 지원
  • Consumer Group: 여러 컨슈머가 토픽을 분담하여 처리
# Kafka Producer 예시 (Python)
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 주문 이벤트 발행
event = {
    "order_id": "ORD-12345",
    "user_id": "USR-678",
    "amount": 45000,
    "timestamp": "2026-04-13T10:30:00Z"
}

producer.send('order-events', value=event)
producer.flush()
# Kafka Consumer 예시 (Python)
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'order-events',
    bootstrap_servers=['localhost:9092'],
    group_id='order-processing-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest'
)

for message in consumer:
    order = message.value
    print(f"주문 처리: {order['order_id']}, 금액: {order['amount']}")

5.2 CDC (Change Data Capture)

CDC는 데이터베이스의 변경 사항을 실시간으로 캡처하여 다른 시스템에 전파하는 기술이다.

운영 DB[CDC]Kafka[Stream Processing] → 웨어하우스
                                                → 검색 엔진
                                                → 캐시

대표 도구: Debezium. MySQL, PostgreSQL, MongoDB 등 다양한 DB에서 변경 이벤트를 캡처하여 Kafka로 전송한다.

{
  "before": null,
  "after": {
    "id": 1001,
    "name": "김철수",
    "email": "kim@example.com"
  },
  "source": {
    "connector": "postgresql",
    "db": "users_db",
    "table": "users"
  },
  "op": "c",
  "ts_ms": 1681364400000
}

위 JSON은 Debezium이 캡처한 CDC 이벤트의 예시이다. "op": "c"는 INSERT 작업을 의미한다.

Flink는 상태 기반 스트림 처리 엔진이다. 정확히 한 번(Exactly-once) 처리를 보장하며, 이벤트 시간 기반 윈도우 처리에 강점이 있다.

// Flink 스트림 처리 예시 (Java)
DataStream<OrderEvent> orders = env
    .addSource(new FlinkKafkaConsumer<>(
        "order-events",
        new OrderEventSchema(),
        kafkaProps
    ));

// 5분 단위 윈도우 집계
DataStream<WindowedRevenue> revenue = orders
    .keyBy(OrderEvent::getCategory)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new RevenueAggregator());

revenue.addSink(new JdbcSink<>(...));

5.4 배치 vs 스트리밍 비교

기준배치 처리스트림 처리
지연 시간분~시간밀리초~초
데이터 완전성완전한 데이터셋점진적으로 도착
복잡도상대적으로 단순상태 관리, 순서 보장 등 복잡
비용상대적으로 저렴항상 실행 중이므로 비용 높음
적합한 사용 사례일별 리포트, ML 학습실시간 대시보드, 이상 탐지

6. 워크플로우 오케스트레이션

6.1 Apache Airflow

Airflow는 워크플로우를 프로그래밍 방식으로 작성, 스케줄링, 모니터링하는 플랫폼이다. 복잡한 데이터 파이프라인을 DAG(Directed Acyclic Graph)로 정의한다.

핵심 개념:

  • DAG: 작업의 실행 순서와 의존성을 정의하는 비순환 방향 그래프
  • Operator: 실제 작업을 수행하는 단위 (BashOperator, PythonOperator 등)
  • Task: DAG 내의 개별 작업 인스턴스
  • Sensor: 특정 조건이 충족될 때까지 대기하는 특수 Operator
  • XCom: Task 간 데이터 전달 메커니즘
# Airflow DAG 예시
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.sensors.s3_key_sensor import S3KeySensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['team@example.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='daily_revenue_pipeline',
    default_args=default_args,
    description='일별 매출 데이터 처리 파이프라인',
    schedule_interval='0 2 * * *',  # 매일 오전 2시
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['revenue', 'daily'],
) as dag:

    # 1. 소스 데이터 존재 확인
    check_source = S3KeySensor(
        task_id='check_source_data',
        bucket_name='raw-data-bucket',
        bucket_key='orders/{{ ds }}/*.parquet',
        timeout=3600,
        poke_interval=300,
    )

    # 2. 데이터 추출
    extract = PythonOperator(
        task_id='extract_orders',
        python_callable=extract_orders_from_source,
    )

    # 3. 데이터 변환
    transform = PythonOperator(
        task_id='transform_orders',
        python_callable=transform_and_aggregate,
    )

    # 4. 웨어하우스 적재
    load = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_to_snowflake,
    )

    # 5. 데이터 품질 검증
    validate = PythonOperator(
        task_id='validate_data_quality',
        python_callable=run_quality_checks,
    )

    # 의존성 정의
    check_source >> extract >> transform >> load >> validate

6.2 DAG 설계 모범 사례

  1. 멱등성(Idempotency): 같은 작업을 여러 번 실행해도 결과가 동일해야 한다
  2. 원자성(Atomicity): 각 Task는 하나의 논리적 단위만 수행한다
  3. 재시도 전략: 일시적 오류를 대비한 적절한 재시도 설정
  4. 모니터링: SLA 설정, 실패 알림, 실행 시간 추적
  5. 테스트: DAG 구조 테스트, Task 단위 테스트 필수

6.3 다른 오케스트레이션 도구

도구특징적합한 경우
Apache AirflowPython 기반, 풍부한 생태계복잡한 배치 파이프라인
Prefect현대적 API, 동적 워크플로우유연한 워크플로우 필요
Dagster데이터 자산 중심, 강력한 타입 시스템데이터 품질 중시
Mage노트북 스타일 인터페이스빠른 프로토타이핑
AWS Step Functions서버리스, AWS 네이티브AWS 중심 아키텍처

7. 데이터 웨어하우스

7.1 주요 클라우드 웨어하우스

Snowflake

  • 컴퓨팅과 스토리지 분리 아키텍처
  • 멀티클라우드 지원 (AWS, Azure, GCP)
  • Time Travel, Zero-Copy Clone 같은 고유 기능
  • 동시성(Concurrency) 처리에 강점

Google BigQuery

  • 서버리스 아키텍처 (인프라 관리 불필요)
  • 쿼리 기반 과금 모델
  • ML 모델을 SQL로 생성 가능 (BigQuery ML)
  • 대규모 데이터 분석에 최적화

Amazon Redshift

  • AWS 생태계와 긴밀한 통합
  • Redshift Serverless로 서버리스 옵션 제공
  • Redshift Spectrum으로 S3 데이터 직접 쿼리 가능
  • 기존 PostgreSQL 도구와 호환

7.2 차원 모델링

데이터 웨어하우스에서 가장 널리 사용되는 모델링 기법은 스타 스키마스노우플레이크 스키마이다.

스타 스키마

중앙에 팩트 테이블이 있고, 주변에 디멘전 테이블이 직접 연결되는 구조이다.

-- 팩트 테이블
CREATE TABLE fact_sales (
    sale_id         BIGINT PRIMARY KEY,
    date_key        INT REFERENCES dim_date(date_key),
    product_key     INT REFERENCES dim_product(product_key),
    customer_key    INT REFERENCES dim_customer(customer_key),
    store_key       INT REFERENCES dim_store(store_key),
    quantity        INT,
    unit_price      DECIMAL(10,2),
    total_amount    DECIMAL(12,2),
    discount_amount DECIMAL(10,2)
);

-- 디멘전 테이블
CREATE TABLE dim_product (
    product_key     INT PRIMARY KEY,
    product_id      VARCHAR(50),
    product_name    VARCHAR(200),
    category        VARCHAR(100),
    subcategory     VARCHAR(100),
    brand           VARCHAR(100),
    unit_cost       DECIMAL(10,2)
);

CREATE TABLE dim_date (
    date_key        INT PRIMARY KEY,
    full_date       DATE,
    year            INT,
    quarter         INT,
    month           INT,
    week            INT,
    day_of_week     VARCHAR(20),
    is_holiday      BOOLEAN
);

스노우플레이크 스키마

디멘전 테이블이 더 세분화된 하위 테이블로 정규화된 구조이다. 스타 스키마보다 저장 공간을 절약하지만 조인이 많아져 쿼리가 복잡해진다.

7.3 SCD (Slowly Changing Dimension)

디멘전 데이터의 변경 이력을 관리하는 방법이다.

유형설명예시
SCD Type 1기존 값을 덮어씀고객 전화번호 변경 시 이전 번호 삭제
SCD Type 2새로운 행을 추가하여 이력 보존고객 주소 변경 시 유효 기간과 함께 새 행 추가
SCD Type 3이전 값과 현재 값을 별도 컬럼으로 관리current_address, previous_address

8. 데이터 품질

데이터 품질은 파이프라인의 신뢰성을 결정하는 핵심 요소이다. "쓰레기가 들어가면 쓰레기가 나온다(Garbage In, Garbage Out)"는 데이터 분야의 오래된 격언이다.

8.1 데이터 품질의 6가지 차원

  1. 정확성(Accuracy): 데이터가 실제 값을 올바르게 반영하는가
  2. 완전성(Completeness): 누락된 데이터가 없는가
  3. 일관성(Consistency): 여러 시스템 간 데이터가 모순되지 않는가
  4. 적시성(Timeliness): 데이터가 필요한 시점에 이용 가능한가
  5. 유일성(Uniqueness): 중복 데이터가 없는가
  6. 유효성(Validity): 데이터가 정의된 규칙과 형식을 따르는가

8.2 Great Expectations

Great Expectations는 데이터 검증, 문서화, 프로파일링을 위한 오픈소스 라이브러리이다.

import great_expectations as gx

# 데이터 컨텍스트 생성
context = gx.get_context()

# 데이터소스 연결
datasource = context.sources.add_pandas("my_datasource")
data_asset = datasource.add_dataframe_asset(name="orders")

# 기대치(Expectation) 정의
batch = data_asset.build_batch_request(dataframe=df)
validator = context.get_validator(batch_request=batch)

# 데이터 품질 규칙 정의
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_unique("order_id")
validator.expect_column_values_to_be_between(
    "amount", min_value=0, max_value=1000000
)
validator.expect_column_values_to_be_in_set(
    "status", ["pending", "completed", "cancelled", "refunded"]
)
validator.expect_column_values_to_match_regex(
    "email", r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
)

# 검증 실행
results = validator.validate()
print(f"성공 여부: {results.success}")

8.3 데이터 관측성 (Data Observability)

데이터 관측성은 데이터 시스템의 상태를 지속적으로 모니터링하여 문제를 조기에 탐지하는 것이다.

핵심 지표:

  • Freshness: 데이터가 얼마나 최신인가
  • Volume: 예상 대비 데이터 양 변화
  • Schema: 스키마 변경 감지
  • Distribution: 데이터 분포 이상 탐지
  • Lineage: 데이터의 출처와 흐름 추적

대표 도구: Monte Carlo, Atlan, Soda, Elementary


9. 데이터 거버넌스

9.1 메타데이터 관리

메타데이터는 "데이터에 대한 데이터"로, 크게 세 가지로 분류된다.

  • 기술 메타데이터: 스키마, 데이터 타입, 파티션 정보, 저장 위치
  • 비즈니스 메타데이터: 데이터 정의, 소유자, SLA, 비즈니스 규칙
  • 운영 메타데이터: 처리 시간, 행 수, 에러 로그, 접근 이력

9.2 데이터 카탈로그

데이터 카탈로그는 조직 내 모든 데이터 자산을 검색, 탐색, 이해할 수 있게 하는 도구이다.

주요 기능:

  • 자동 메타데이터 수집 및 인덱싱
  • 데이터 리니지(Lineage) 시각화
  • 데이터 사전(Dictionary) 관리
  • 태그 및 분류 체계
  • 협업 기능 (댓글, 평가, 위키)

대표 도구: Apache Atlas, DataHub, Atlan, Alation

9.3 접근 제어

데이터 접근 제어의 핵심 원칙:

  1. 최소 권한 원칙: 업무에 필요한 최소한의 권한만 부여
  2. 역할 기반 접근 제어(RBAC): 역할 단위로 권한 관리
-- Snowflake RBAC 예시
CREATE ROLE data_analyst;
CREATE ROLE data_engineer;
CREATE ROLE data_admin;

-- 분석가 역할: 읽기만 가능
GRANT USAGE ON DATABASE analytics_db TO ROLE data_analyst;
GRANT SELECT ON ALL TABLES IN SCHEMA analytics_db.gold TO ROLE data_analyst;

-- 엔지니어 역할: 읽기/쓰기 가능
GRANT ALL PRIVILEGES ON DATABASE analytics_db TO ROLE data_engineer;
GRANT ALL PRIVILEGES ON ALL SCHEMAS IN DATABASE analytics_db TO ROLE data_engineer;

-- 사용자에게 역할 할당
GRANT ROLE data_analyst TO USER analyst_kim;
GRANT ROLE data_engineer TO USER engineer_park;
  1. 행 수준 보안(Row-Level Security): 사용자별로 접근 가능한 행을 제한
  2. 열 수준 보안(Column-Level Security): 민감 컬럼에 대한 접근 제한 (마스킹 포함)
  3. 감사 로그(Audit Log): 모든 데이터 접근 이력 기록

10. 실전 파이프라인 예제

10.1 전체 아키텍처

실제 환경에서의 엔드투엔드 데이터 파이프라인 구성 예시이다.

[소스 시스템]
  ├── 운영 DB (PostgreSQL)  ─── Debezium CDC ──┐
  ├── 웹 이벤트 (Clickstream) ─── SDK ──────────┤
  ├── 외부 API ─── Airbyte ─────────────────────┤
  └── 파일 (CSV/Excel) ─── Airflow ─────────────┤
[메시지 브로커]  └── Apache Kafka ◄────────────────────────────┘
       ├── 실시간 경로: Flink → 실시간 대시보드
       └── 배치 경로:  SparkS3 (Data Lake)
[변환 레이어]  └── dbt (Silver/Gold) ◄──────┘
[웨어하우스]
  └── Snowflake
       ├── Gold 레이어 → Looker/Tableau 대시보드
       └── ML Feature Store → 모델 학습

10.2 Airflow로 전체 파이프라인 오케스트레이션

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=10),
    'execution_timeout': timedelta(hours=2),
}

with DAG(
    dag_id='e2e_data_pipeline',
    default_args=default_args,
    schedule_interval='0 3 * * *',
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:

    # 1단계: Spark로 원시 데이터 처리
    spark_process = SparkSubmitOperator(
        task_id='spark_raw_processing',
        application='s3://scripts/process_raw_data.py',
        conn_id='spark_default',
        conf={
            'spark.executor.memory': '4g',
            'spark.executor.cores': '2',
        },
    )

    # 2단계: dbt로 데이터 변환
    dbt_transform = DbtCloudRunJobOperator(
        task_id='dbt_transform',
        job_id=12345,
        check_interval=30,
        timeout=3600,
    )

    # 3단계: 데이터 품질 검증
    quality_check = PythonOperator(
        task_id='data_quality_check',
        python_callable=run_great_expectations_suite,
    )

    # 4단계: 완료 알림
    notify = SlackWebhookOperator(
        task_id='slack_notification',
        slack_webhook_conn_id='slack_webhook',
        message='Daily pipeline completed successfully.',
    )

    spark_process >> dbt_transform >> quality_check >> notify

10.3 Kafka + Spark 실시간 처리 예제

# Spark Structured Streaming + Kafka
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, sum as spark_sum
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder \
    .appName("RealTimeRevenue") \
    .getOrCreate()

# Kafka 스키마 정의
order_schema = StructType([
    StructField("order_id", StringType()),
    StructField("user_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("category", StringType()),
    StructField("timestamp", TimestampType()),
])

# Kafka에서 스트리밍 데이터 읽기
raw_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "order-events")
    .option("startingOffsets", "latest")
    .load()
)

# JSON 파싱
orders = (
    raw_stream
    .select(from_json(
        col("value").cast("string"),
        order_schema
    ).alias("data"))
    .select("data.*")
)

# 5분 윈도우 집계
windowed_revenue = (
    orders
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window("timestamp", "5 minutes"),
        "category"
    )
    .agg(spark_sum("amount").alias("revenue"))
)

# 결과를 콘솔에 출력 (프로덕션에서는 DB나 대시보드로)
query = (
    windowed_revenue.writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", False)
    .trigger(processingTime="1 minute")
    .start()
)

query.awaitTermination()

마무리

데이터 엔지니어링은 넓은 범위의 기술과 개념을 포함하는 분야이다. 이 글에서 다룬 내용을 요약하면 다음과 같다.

영역핵심 기술대표 도구
데이터 저장레이크, 웨어하우스, 레이크하우스S3, Snowflake, Delta Lake
데이터 통합ETL/ELTdbt, Airbyte, Fivetran
배치 처리분산 컴퓨팅Apache Spark
스트림 처리이벤트 스트리밍, CDCApache Kafka, Flink, Debezium
오케스트레이션워크플로우 관리Apache Airflow, Dagster
데이터 품질검증, 관측성Great Expectations, Monte Carlo
거버넌스메타데이터, 접근 제어DataHub, Apache Atlas

처음부터 모든 기술을 한꺼번에 배울 필요는 없다. SQL과 Python을 기반으로 하나의 파이프라인을 직접 구축해보는 것이 가장 효과적인 학습 방법이다. 작은 프로젝트에서 시작하여 점진적으로 확장해 나가자.

퀴즈: 데이터 엔지니어링 기본 개념

Q1. 데이터 레이크와 데이터 웨어하우스의 가장 큰 차이점은?

A: 데이터 레이크는 Schema-on-Read 방식으로 원시 데이터를 그대로 저장하고 읽는 시점에 스키마를 적용한다. 데이터 웨어하우스는 Schema-on-Write 방식으로 데이터 저장 시점에 미리 정의된 스키마를 강제한다.

Q2. ETL과 ELT에서 변환이 일어나는 위치의 차이는?

A: ETL에서는 스테이징 영역(별도 서버)에서 변환이 일어난 후 웨어하우스에 적재된다. ELT에서는 원시 데이터를 먼저 웨어하우스에 적재한 후 웨어하우스의 컴퓨팅 파워를 활용하여 내부에서 변환한다.

Q3. Kafka에서 Consumer Group의 역할은?

A: Consumer Group은 여러 Consumer가 하나의 토픽을 분담하여 처리할 수 있게 한다. 같은 그룹 내의 Consumer들은 각각 다른 파티션을 담당하여 병렬 처리가 가능하다. 이를 통해 수평 확장이 용이해진다.

Q4. 메달리온 아키텍처의 세 계층은?

A: Bronze(원시 데이터 그대로 저장), Silver(정제 및 검증 완료된 데이터), Gold(비즈니스 로직과 집계가 적용된 분석용 데이터)이다.

Q5. Airflow에서 DAG의 역할은?

A: DAG(Directed Acyclic Graph)는 작업(Task)들의 실행 순서와 의존성을 정의하는 비순환 방향 그래프이다. 어떤 작업이 어떤 작업 이후에 실행되어야 하는지를 선언적으로 표현한다.

Data Engineering Fundamentals — ETL, Data Warehouses, Streaming, and Data Lakes

Table of Contents

  1. What Is Data Engineering
  2. Data Architecture
  3. ETL vs ELT
  4. Batch Processing
  5. Stream Processing
  6. Workflow Orchestration
  7. Data Warehouses
  8. Data Quality
  9. Data Governance
  10. End-to-End Pipeline Example

1. What Is Data Engineering

Data engineering is the discipline of collecting, transforming, and storing raw data so that it becomes accessible and usable for analysis. Before data scientists can build models or analysts can derive insights, the data must first be clean, reliable, and available. The data engineer is responsible for making that happen.

Role Comparison

RolePrimary ResponsibilitiesCore Skills
Data EngineerPipeline construction, infrastructure management, data transformationPython, SQL, Spark, Kafka, Airflow
Data ScientistModeling, prediction, experiment designPython, R, TensorFlow, Statistics
Data AnalystReporting, dashboards, business insightsSQL, Tableau, Excel, BI tools

A data engineer is the person who builds trustworthy data infrastructure. Even the most sophisticated model is worthless if the data pipeline feeding it is unreliable.

Core Competencies of a Data Engineer

  • SQL proficiency: Complex joins, window functions, CTEs, and optimization
  • Programming: Python is the de facto standard; Scala and Java are also used in the Spark ecosystem
  • Distributed systems: Partitioning, replication, the CAP theorem, and related concepts
  • Cloud services: Fluency with data-related services on AWS, GCP, or Azure
  • Data modeling: Normalization, denormalization, and dimensional modeling

2. Data Architecture

Modern data architectures fall into four main patterns.

2.1 Data Lake

A data lake is a large-scale repository that stores structured, semi-structured, and unstructured data in its raw form. Schema is applied at read time (Schema-on-Read).

Raw Data Store (Data Lake)
├── Structured data (CSV, Parquet, ORC)
├── Semi-structured data (JSON, XML, Avro)
└── Unstructured data (images, logs, text)

Pros: Cheap storage, flexible schema, accepts all data formats Cons: Hard to manage; can devolve into a "data swamp" without governance

Key technologies: AWS S3, Azure Data Lake Storage, Google Cloud Storage

2.2 Data Warehouse

A data warehouse is a structured data store optimized for analytics. Schema is applied at write time (Schema-on-Write).

Analytics Store (Data Warehouse)
├── Fact tables (sales, orders, clicks)
├── Dimension tables (users, products, dates)
└── Aggregate tables (daily/monthly summaries)

Pros: Fast query performance, consistent schema, ACID transactions Cons: Limited handling of unstructured data, schema changes are costly

Key technologies: Snowflake, BigQuery, Amazon Redshift

2.3 Lakehouse

A lakehouse combines the flexibility of a data lake with the management features of a data warehouse.

Key capabilities:

  • ACID transaction support
  • Schema enforcement and evolution
  • SQL analytics on top of a data lake
  • Unified streaming and batch processing

Key technologies: Delta Lake, Apache Iceberg, Apache Hudi

2.4 Medallion Architecture

The medallion architecture organizes data into three progressive layers.

Bronze (Raw Data)
Ingested as-is from source systems
Minimal transformation applied

Silver (Cleansed Data)
Deduplication, type casting, validation complete
Ready for business logic but not yet aggregated

Gold (Business Data)
Aggregation, joining, business rules applied
Consumed directly by dashboards and ML models

This pattern was popularized by Databricks and offers the advantage of guaranteeing data quality at each stage.


3. ETL vs ELT

3.1 ETL (Extract, Transform, Load)

ETL is the traditional approach to data integration.

Source[Extract][Transform][Load]Warehouse
  1. Extract: Pull data from source systems
  2. Transform: Cleanse, transform, and aggregate in a staging area
  3. Load: Write the transformed data to the target system

3.2 ELT (Extract, Load, Transform)

ELT is the modern approach that leverages the compute power of cloud warehouses.

Source[Extract][Load]Warehouse[Transform]
  1. Extract: Pull data from sources
  2. Load: Write raw data directly into the warehouse
  3. Transform: Use SQL inside the warehouse to transform

3.3 When to Use Which

CriterionETLELT
Data volumeSmall to mediumLarge
Transformation complexityComplex business logicExpressible in SQL
InfrastructureOn-premisesCloud
Data securitySensitive data needs pre-maskingWarehouse-level access control suffices
Key toolsInformatica, Talenddbt, Fivetran, Airbyte

3.4 Key Tools

dbt (data build tool): A SQL-based transformation tool that handles the T in ELT.

-- dbt model example: daily revenue aggregation
-- models/marts/daily_revenue.sql

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),

payments AS (
    SELECT * FROM {{ ref('stg_payments') }}
)

SELECT
    o.order_date,
    COUNT(DISTINCT o.order_id) AS total_orders,
    SUM(p.amount) AS total_revenue,
    AVG(p.amount) AS avg_order_value
FROM orders o
JOIN payments p ON o.order_id = p.order_id
WHERE p.status = 'completed'
GROUP BY o.order_date

Airbyte: An open-source data integration platform with over 300 connectors.

Fivetran: A managed data integration service known for easy setup and reliability.


4. Batch Processing

Batch processing is the approach of processing accumulated large volumes of data all at once. It is suitable for the majority of analytics workloads where real-time latency is not required.

4.1 Apache Spark

Apache Spark is a unified analytics engine for large-scale data processing.

Key features:

  • In-memory processing: Up to 100x faster than MapReduce
  • Unified API: Batch, streaming, ML, and graph processing in one framework
  • Multi-language support: Python (PySpark), Scala, Java, R, SQL
# PySpark example: daily revenue aggregation
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, count

spark = SparkSession.builder \
    .appName("DailyRevenue") \
    .getOrCreate()

# Read data
orders = spark.read.parquet("s3://data-lake/orders/")
payments = spark.read.parquet("s3://data-lake/payments/")

# Transform
daily_revenue = (
    orders
    .join(payments, "order_id")
    .filter(col("status") == "completed")
    .groupBy("order_date")
    .agg(
        count("order_id").alias("total_orders"),
        spark_sum("amount").alias("total_revenue")
    )
    .orderBy("order_date")
)

# Save results
daily_revenue.write \
    .mode("overwrite") \
    .parquet("s3://data-warehouse/daily_revenue/")

4.2 Spark SQL and DataFrames

Spark SQL lets analysts who are comfortable with SQL process large-scale data.

# Register DataFrames and use SQL
orders.createOrReplaceTempView("orders")
payments.createOrReplaceTempView("payments")

result = spark.sql("""
    SELECT
        o.order_date,
        COUNT(DISTINCT o.order_id) AS total_orders,
        SUM(p.amount) AS total_revenue
    FROM orders o
    JOIN payments p ON o.order_id = p.order_id
    WHERE p.status = 'completed'
    GROUP BY o.order_date
    ORDER BY o.order_date
""")

4.3 MapReduce vs Spark

CriterionMapReduceSpark
SpeedDisk-based, slowIn-memory, fast
Programming modelMap and Reduce phases onlyRich set of transformations
Real-time processingNot supportedStructured Streaming
Learning curveSteepRelatively gentle
EcosystemHadoop ecosystemStandalone + Hadoop compatible

5. Stream Processing

Stream processing is the approach of processing data in real time as it is generated.

5.1 Apache Kafka

Kafka is a distributed event streaming platform. It serves as the backbone for real-time data pipelines and streaming applications.

Core concepts:

  • Topic: A category to which messages are published
  • Producer: An entity that publishes messages to a topic
  • Consumer: An entity that subscribes to messages from a topic
  • Broker: A server that stores and delivers messages
  • Partition: A subdivision of a topic for parallel processing
  • Consumer Group: Multiple consumers sharing the load of a topic
# Kafka Producer example (Python)
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Publish an order event
event = {
    "order_id": "ORD-12345",
    "user_id": "USR-678",
    "amount": 45000,
    "timestamp": "2026-04-13T10:30:00Z"
}

producer.send('order-events', value=event)
producer.flush()
# Kafka Consumer example (Python)
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'order-events',
    bootstrap_servers=['localhost:9092'],
    group_id='order-processing-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest'
)

for message in consumer:
    order = message.value
    print(f"Processing order: {order['order_id']}, amount: {order['amount']}")

5.2 CDC (Change Data Capture)

CDC is a technique that captures database changes in real time and propagates them to other systems.

Operational DB[CDC]Kafka[Stream Processing]Warehouse
Search engine
Cache

Key tool: Debezium. It captures change events from MySQL, PostgreSQL, MongoDB, and more, then streams them to Kafka.

{
  "before": null,
  "after": {
    "id": 1001,
    "name": "John Doe",
    "email": "john@example.com"
  },
  "source": {
    "connector": "postgresql",
    "db": "users_db",
    "table": "users"
  },
  "op": "c",
  "ts_ms": 1681364400000
}

The JSON above shows an example CDC event captured by Debezium. The "op": "c" field indicates an INSERT operation.

Flink is a stateful stream processing engine. It guarantees exactly-once processing semantics and excels at event-time-based windowed operations.

// Flink stream processing example (Java)
DataStream<OrderEvent> orders = env
    .addSource(new FlinkKafkaConsumer<>(
        "order-events",
        new OrderEventSchema(),
        kafkaProps
    ));

// 5-minute tumbling window aggregation
DataStream<WindowedRevenue> revenue = orders
    .keyBy(OrderEvent::getCategory)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new RevenueAggregator());

revenue.addSink(new JdbcSink<>(...));

5.4 Batch vs Streaming Comparison

CriterionBatch ProcessingStream Processing
LatencyMinutes to hoursMilliseconds to seconds
Data completenessComplete datasetArrives incrementally
ComplexityRelatively simpleComplex (state management, ordering)
CostRelatively cheapAlways running, higher cost
Best forDaily reports, ML trainingReal-time dashboards, anomaly detection

6. Workflow Orchestration

6.1 Apache Airflow

Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. Complex data pipelines are defined as DAGs (Directed Acyclic Graphs).

Core concepts:

  • DAG: A directed acyclic graph defining task execution order and dependencies
  • Operator: The unit that performs actual work (BashOperator, PythonOperator, etc.)
  • Task: An individual work instance within a DAG
  • Sensor: A special Operator that waits until a condition is met
  • XCom: A mechanism for passing data between Tasks
# Airflow DAG example
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.sensors.s3_key_sensor import S3KeySensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['team@example.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='daily_revenue_pipeline',
    default_args=default_args,
    description='Daily revenue data processing pipeline',
    schedule_interval='0 2 * * *',  # Every day at 2 AM
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['revenue', 'daily'],
) as dag:

    # 1. Verify source data exists
    check_source = S3KeySensor(
        task_id='check_source_data',
        bucket_name='raw-data-bucket',
        bucket_key='orders/{{ ds }}/*.parquet',
        timeout=3600,
        poke_interval=300,
    )

    # 2. Extract data
    extract = PythonOperator(
        task_id='extract_orders',
        python_callable=extract_orders_from_source,
    )

    # 3. Transform data
    transform = PythonOperator(
        task_id='transform_orders',
        python_callable=transform_and_aggregate,
    )

    # 4. Load to warehouse
    load = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_to_snowflake,
    )

    # 5. Data quality validation
    validate = PythonOperator(
        task_id='validate_data_quality',
        python_callable=run_quality_checks,
    )

    # Define dependencies
    check_source >> extract >> transform >> load >> validate

6.2 DAG Design Best Practices

  1. Idempotency: Running a task multiple times should produce the same result
  2. Atomicity: Each Task should perform a single logical unit of work
  3. Retry strategy: Configure retries to handle transient failures
  4. Monitoring: Set SLAs, failure alerts, and execution-time tracking
  5. Testing: DAG structure tests and Task-level unit tests are essential

6.3 Other Orchestration Tools

ToolCharacteristicsBest For
Apache AirflowPython-based, rich ecosystemComplex batch pipelines
PrefectModern API, dynamic workflowsFlexible workflow needs
DagsterData-asset-centric, strong typingData quality focused
MageNotebook-style interfaceRapid prototyping
AWS Step FunctionsServerless, AWS-nativeAWS-centric architectures

7. Data Warehouses

7.1 Major Cloud Warehouses

Snowflake

  • Separated compute and storage architecture
  • Multi-cloud support (AWS, Azure, GCP)
  • Unique features like Time Travel and Zero-Copy Clone
  • Strong concurrency handling

Google BigQuery

  • Serverless architecture (no infrastructure management)
  • Pay-per-query pricing model
  • Create ML models with SQL (BigQuery ML)
  • Optimized for large-scale analytics

Amazon Redshift

  • Deep integration with the AWS ecosystem
  • Serverless option via Redshift Serverless
  • Query S3 data directly with Redshift Spectrum
  • Compatible with existing PostgreSQL tools

7.2 Dimensional Modeling

The most widely used modeling techniques in data warehouses are the star schema and the snowflake schema.

Star Schema

A central fact table surrounded by dimension tables connected directly to it.

-- Fact table
CREATE TABLE fact_sales (
    sale_id         BIGINT PRIMARY KEY,
    date_key        INT REFERENCES dim_date(date_key),
    product_key     INT REFERENCES dim_product(product_key),
    customer_key    INT REFERENCES dim_customer(customer_key),
    store_key       INT REFERENCES dim_store(store_key),
    quantity        INT,
    unit_price      DECIMAL(10,2),
    total_amount    DECIMAL(12,2),
    discount_amount DECIMAL(10,2)
);

-- Dimension tables
CREATE TABLE dim_product (
    product_key     INT PRIMARY KEY,
    product_id      VARCHAR(50),
    product_name    VARCHAR(200),
    category        VARCHAR(100),
    subcategory     VARCHAR(100),
    brand           VARCHAR(100),
    unit_cost       DECIMAL(10,2)
);

CREATE TABLE dim_date (
    date_key        INT PRIMARY KEY,
    full_date       DATE,
    year            INT,
    quarter         INT,
    month           INT,
    week            INT,
    day_of_week     VARCHAR(20),
    is_holiday      BOOLEAN
);

Snowflake Schema

A variation where dimension tables are further normalized into sub-tables. It saves storage compared to a star schema but increases query complexity due to additional joins.

7.3 SCD (Slowly Changing Dimension)

Methods for tracking historical changes in dimension data.

TypeDescriptionExample
SCD Type 1Overwrite with new valuePhone number change: old number deleted
SCD Type 2Add new row preserving historyAddress change: new row with validity period
SCD Type 3Separate columns for current and previouscurrent_address, previous_address columns

8. Data Quality

Data quality is the critical factor that determines pipeline reliability. "Garbage In, Garbage Out" is a long-standing adage in the data world.

8.1 Six Dimensions of Data Quality

  1. Accuracy: Does the data correctly reflect real-world values?
  2. Completeness: Is there any missing data?
  3. Consistency: Is the data free of contradictions across systems?
  4. Timeliness: Is the data available when needed?
  5. Uniqueness: Is the data free of duplicates?
  6. Validity: Does the data conform to defined rules and formats?

8.2 Great Expectations

Great Expectations is an open-source library for data validation, documentation, and profiling.

import great_expectations as gx

# Create data context
context = gx.get_context()

# Connect data source
datasource = context.sources.add_pandas("my_datasource")
data_asset = datasource.add_dataframe_asset(name="orders")

# Define expectations
batch = data_asset.build_batch_request(dataframe=df)
validator = context.get_validator(batch_request=batch)

# Data quality rules
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_unique("order_id")
validator.expect_column_values_to_be_between(
    "amount", min_value=0, max_value=1000000
)
validator.expect_column_values_to_be_in_set(
    "status", ["pending", "completed", "cancelled", "refunded"]
)
validator.expect_column_values_to_match_regex(
    "email", r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
)

# Run validation
results = validator.validate()
print(f"Success: {results.success}")

8.3 Data Observability

Data observability is the practice of continuously monitoring data systems to detect issues early.

Key metrics:

  • Freshness: How up-to-date is the data?
  • Volume: Changes in data volume relative to expectations
  • Schema: Detecting schema changes
  • Distribution: Spotting anomalies in data distributions
  • Lineage: Tracing data origin and flow

Key tools: Monte Carlo, Atlan, Soda, Elementary


9. Data Governance

9.1 Metadata Management

Metadata is "data about data" and falls into three categories:

  • Technical metadata: Schema, data types, partition info, storage locations
  • Business metadata: Data definitions, owners, SLAs, business rules
  • Operational metadata: Processing times, row counts, error logs, access history

9.2 Data Catalog

A data catalog is a tool that enables discovery, exploration, and understanding of all data assets within an organization.

Key capabilities:

  • Automatic metadata collection and indexing
  • Data lineage visualization
  • Data dictionary management
  • Tagging and classification systems
  • Collaboration features (comments, ratings, wikis)

Key tools: Apache Atlas, DataHub, Atlan, Alation

9.3 Access Control

Core principles of data access control:

  1. Principle of Least Privilege: Grant only the minimum permissions needed
  2. Role-Based Access Control (RBAC): Manage permissions by role
-- Snowflake RBAC example
CREATE ROLE data_analyst;
CREATE ROLE data_engineer;
CREATE ROLE data_admin;

-- Analyst role: read-only access
GRANT USAGE ON DATABASE analytics_db TO ROLE data_analyst;
GRANT SELECT ON ALL TABLES IN SCHEMA analytics_db.gold TO ROLE data_analyst;

-- Engineer role: read/write access
GRANT ALL PRIVILEGES ON DATABASE analytics_db TO ROLE data_engineer;
GRANT ALL PRIVILEGES ON ALL SCHEMAS IN DATABASE analytics_db TO ROLE data_engineer;

-- Assign roles to users
GRANT ROLE data_analyst TO USER analyst_kim;
GRANT ROLE data_engineer TO USER engineer_park;
  1. Row-Level Security (RLS): Restrict which rows a user can access
  2. Column-Level Security (CLS): Restrict access to sensitive columns (including masking)
  3. Audit Logging: Record all data access history

10. End-to-End Pipeline Example

10.1 Overall Architecture

Here is an example of an end-to-end data pipeline in a production environment.

[Source Systems]
  ├── Operational DB (PostgreSQL)  ── Debezium CDC ──┐
  ├── Web Events (Clickstream) ─── SDK ──────────────┤
  ├── External APIs ─── Airbyte ─────────────────────┤
  └── Files (CSV/Excel) ─── Airflow ─────────────────┤
[Message Broker]  └── Apache Kafka <──────────────────────────────────┘
       ├── Real-time path: FlinkReal-time dashboards
       └── Batch path:     SparkS3 (Data Lake)
[Transformation Layer]  └── dbt (Silver/Gold) <───────────┘
[Warehouse]
  └── Snowflake
       ├── Gold layer → Looker/Tableau dashboards
       └── ML Feature StoreModel training

10.2 Orchestrating with Airflow

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=10),
    'execution_timeout': timedelta(hours=2),
}

with DAG(
    dag_id='e2e_data_pipeline',
    default_args=default_args,
    schedule_interval='0 3 * * *',
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:

    # Stage 1: Process raw data with Spark
    spark_process = SparkSubmitOperator(
        task_id='spark_raw_processing',
        application='s3://scripts/process_raw_data.py',
        conn_id='spark_default',
        conf={
            'spark.executor.memory': '4g',
            'spark.executor.cores': '2',
        },
    )

    # Stage 2: Transform with dbt
    dbt_transform = DbtCloudRunJobOperator(
        task_id='dbt_transform',
        job_id=12345,
        check_interval=30,
        timeout=3600,
    )

    # Stage 3: Data quality validation
    quality_check = PythonOperator(
        task_id='data_quality_check',
        python_callable=run_great_expectations_suite,
    )

    # Stage 4: Send completion notification
    notify = SlackWebhookOperator(
        task_id='slack_notification',
        slack_webhook_conn_id='slack_webhook',
        message='Daily pipeline completed successfully.',
    )

    spark_process >> dbt_transform >> quality_check >> notify

10.3 Kafka + Spark Real-Time Processing

# Spark Structured Streaming + Kafka
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, sum as spark_sum
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder \
    .appName("RealTimeRevenue") \
    .getOrCreate()

# Define Kafka schema
order_schema = StructType([
    StructField("order_id", StringType()),
    StructField("user_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("category", StringType()),
    StructField("timestamp", TimestampType()),
])

# Read streaming data from Kafka
raw_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "order-events")
    .option("startingOffsets", "latest")
    .load()
)

# Parse JSON
orders = (
    raw_stream
    .select(from_json(
        col("value").cast("string"),
        order_schema
    ).alias("data"))
    .select("data.*")
)

# 5-minute windowed aggregation
windowed_revenue = (
    orders
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window("timestamp", "5 minutes"),
        "category"
    )
    .agg(spark_sum("amount").alias("revenue"))
)

# Output to console (in production, write to a DB or dashboard)
query = (
    windowed_revenue.writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", False)
    .trigger(processingTime="1 minute")
    .start()
)

query.awaitTermination()

Summary

Data engineering is a field that encompasses a wide range of technologies and concepts. Here is a recap of the topics covered in this post.

AreaCore ConceptsKey Tools
Data StorageLake, Warehouse, LakehouseS3, Snowflake, Delta Lake
Data IntegrationETL/ELTdbt, Airbyte, Fivetran
Batch ProcessingDistributed computingApache Spark
Stream ProcessingEvent streaming, CDCApache Kafka, Flink, Debezium
OrchestrationWorkflow managementApache Airflow, Dagster
Data QualityValidation, ObservabilityGreat Expectations, Monte Carlo
GovernanceMetadata, Access controlDataHub, Apache Atlas

You do not need to learn every technology at once. Starting with SQL and Python and building a single pipeline from scratch is the most effective way to learn. Begin with a small project and expand incrementally from there.

Quiz: Data Engineering Fundamentals

Q1. What is the biggest difference between a data lake and a data warehouse?

A: A data lake uses a Schema-on-Read approach, storing raw data and applying schema at read time. A data warehouse uses Schema-on-Write, enforcing a predefined schema when data is stored.

Q2. Where does transformation happen in ETL vs ELT?

A: In ETL, transformation occurs in a staging area (a separate server) before loading into the warehouse. In ELT, raw data is loaded into the warehouse first, then transformed inside the warehouse using its compute power.

Q3. What is the role of a Consumer Group in Kafka?

A: A Consumer Group allows multiple consumers to share the processing load of a topic. Consumers within the same group each handle different partitions, enabling parallel processing and horizontal scaling.

Q4. What are the three layers of the Medallion Architecture?

A: Bronze (raw data stored as-is), Silver (cleansed and validated data), and Gold (aggregated data with business logic applied, ready for analytics).

Q5. What is the role of a DAG in Airflow?

A: A DAG (Directed Acyclic Graph) defines the execution order and dependencies of tasks. It declaratively specifies which tasks must run after which other tasks.