Skip to content

필사 모드: 데이터 엔지니어링 파이프라인 완전 가이드 2025: ETL/ELT, Spark, Airflow, 실시간 스트리밍

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

1. 데이터 엔지니어링 개요

데이터 엔지니어의 역할

데이터 엔지니어는 조직의 데이터 인프라를 설계, 구축, 유지보수하는 역할을 담당합니다. 데이터 과학자가 분석할 수 있도록 데이터를 수집, 변환, 저장하는 파이프라인을 만드는 것이 핵심 업무입니다.

데이터 엔지니어의 핵심 역할

├── 데이터 수집 (Ingestion)

│ ├── API, DB, 파일, 스트림에서 데이터 추출

│ └── 다양한 소스를 통합

├── 데이터 변환 (Transformation)

│ ├── 정제, 정규화, 집계

│ └── 비즈니스 로직 적용

├── 데이터 저장 (Storage)

│ ├── 데이터 웨어하우스 설계

│ └── 데이터 레이크 구축

├── 파이프라인 오케스트레이션

│ ├── 워크플로 자동화

│ └── 스케줄링 및 모니터링

└── 데이터 품질 관리

├── 데이터 검증

└── 데이터 계약 (Data Contract)

필수 기술 스택

프로그래밍 : Python, SQL, Scala/Java

데이터 처리 : Spark, Flink, Beam

오케스트레이션 : Airflow, Dagster, Prefect

스트리밍 : Kafka, Kinesis, Pub/Sub

변환 도구 : dbt, Dataform

클라우드 : AWS(Redshift, Glue), GCP(BigQuery), Azure(Synapse)

컨테이너 : Docker, Kubernetes

IaC : Terraform, Pulumi

모니터링 : Datadog, Grafana, Monte Carlo

2. ETL vs ELT

전통적 ETL

ETL은 Extract(추출), Transform(변환), Load(적재)의 약자로, 데이터를 소스에서 추출한 후 변환 서버에서 가공하여 최종 저장소에 적재합니다.

ETL 흐름:

소스 DB ──Extract──→ 변환 서버 ──Transform──→ Load──→ 데이터 웨어하우스

(ETL 서버)

전통적 ETL 예시 (Python)

from sqlalchemy import create_engine

Extract: 소스 DB에서 데이터 추출

source_engine = create_engine('postgresql://source_db:5432/sales')

raw_data = pd.read_sql('SELECT * FROM orders WHERE date >= %s', source_engine, params=['2025-01-01'])

Transform: 데이터 변환

transformed = raw_data.copy()

transformed['total_with_tax'] = transformed['total'] * 1.1

transformed['order_month'] = pd.to_datetime(transformed['order_date']).dt.to_period('M')

transformed = transformed.dropna(subset=['customer_id'])

transformed = transformed[transformed['total'] > 0]

Load: 웨어하우스에 적재

wh_engine = create_engine('postgresql://warehouse:5432/analytics')

transformed.to_sql('fact_orders', wh_engine, if_exists='append', index=False)

현대적 ELT

ELT는 Extract(추출), Load(적재), Transform(변환)의 순서로, 원본 데이터를 먼저 웨어하우스에 적재한 후 웨어하우스 내에서 변환합니다.

ELT 흐름:

소스 DB ──Extract──→ Load──→ 데이터 웨어하우스 ──Transform──→ 분석용 테이블

(dbt 등으로 변환)

-- ELT: 웨어하우스 내에서 dbt로 변환

-- models/marts/fact_orders.sql

WITH source_orders AS (

SELECT * FROM raw.orders

WHERE order_date >= '2025-01-01'

),

cleaned AS (

SELECT

order_id,

customer_id,

total,

total * 1.1 AS total_with_tax,

DATE_TRUNC('month', order_date) AS order_month,

order_date

FROM source_orders

WHERE customer_id IS NOT NULL

AND total > 0

)

SELECT * FROM cleaned

ETL vs ELT 비교표

| 항목 | ETL | ELT |

|------|-----|-----|

| 변환 위치 | 별도 서버 | 웨어하우스 내부 |

| 확장성 | ETL 서버 성능에 의존 | 웨어하우스 컴퓨팅 활용 |

| 원본 데이터 | 변환 후 원본 유실 가능 | 원본 보존 |

| 비용 | ETL 서버 운영 비용 | 웨어하우스 컴퓨팅 비용 |

| 유연성 | 변환 로직 변경 시 재처리 | SQL로 유연하게 재변환 |

| 대표 도구 | Informatica, Talend | dbt, Dataform |

| 적합한 경우 | 레거시 시스템, 규제 요건 | 클라우드 네이티브, 빅데이터 |

3. 배치 vs 스트림 처리

배치 처리 (Batch Processing)

일정 주기로 대량의 데이터를 한꺼번에 처리하는 방식입니다.

배치 처리:

[데이터 축적] ──→ [일괄 처리] ──→ [결과 저장]

(1시간/1일) (Spark 등) (웨어하우스)

특징:

- 높은 처리량 (Throughput)

- 지연 시간이 김 (분~시간)

- 비용 효율적

- 재처리 용이

스트림 처리 (Stream Processing)

데이터가 도착하는 즉시 실시간으로 처리하는 방식입니다.

스트림 처리:

[이벤트 발생] ──→ [즉시 처리] ──→ [실시간 결과]

(연속적) (Flink 등) (대시보드/알림)

특징:

- 낮은 지연 시간 (밀리초~초)

- 연속 처리

- 복잡한 장애 처리

- 이벤트 순서 관리 필요

선택 기준

배치를 선택하는 경우:

- 일/주/월 단위 보고서

- 대규모 데이터 집계

- 비용 최적화 우선

- 실시간 필요 없음

스트림을 선택하는 경우:

- 실시간 대시보드

- 사기 탐지

- 실시간 추천

- IoT 센서 데이터

- 알림/알럿

하이브리드 (Lambda/Kappa):

- 배치 + 스트림 동시 운영

- 실시간 근사치 + 배치 정확 결과

4. Apache Spark

4.1 Spark 개요

Apache Spark는 대규모 데이터 처리를 위한 통합 분석 엔진입니다. 인메모리 연산으로 MapReduce 대비 100배 빠른 성능을 제공합니다.

Spark 아키텍처:

┌─────────────────────────────────┐

│ Spark Application │

├─────────────────────────────────┤

│ SparkSQL │ Streaming │ MLlib │

├─────────────────────────────────┤

│ DataFrame / Dataset │

├─────────────────────────────────┤

│ RDD (Core Engine) │

├─────────────────────────────────┤

│ Standalone │ YARN │ Mesos │ K8s│

└─────────────────────────────────┘

4.2 PySpark 기본

from pyspark.sql import SparkSession

from pyspark.sql.functions import col, sum, avg, count, when, lit

SparkSession 생성

spark = SparkSession.builder \

.appName("DataPipeline") \

.config("spark.sql.adaptive.enabled", "true") \

.config("spark.sql.shuffle.partitions", "200") \

.getOrCreate()

데이터 읽기

orders_df = spark.read \

.option("header", "true") \

.option("inferSchema", "true") \

.csv("s3://data-lake/raw/orders/")

users_df = spark.read.parquet("s3://data-lake/raw/users/")

데이터 변환

result = orders_df \

.filter(col("status") == "completed") \

.join(users_df, orders_df.user_id == users_df.id, "inner") \

.groupBy("user_id", "username") \

.agg(

count("order_id").alias("total_orders"),

sum("amount").alias("total_spent"),

avg("amount").alias("avg_order_value")

) \

.filter(col("total_orders") > 5)

결과 저장

result.write \

.mode("overwrite") \

.partitionBy("order_month") \

.parquet("s3://data-lake/processed/user_summary/")

4.3 SparkSQL

임시 뷰 등록

orders_df.createOrReplaceTempView("orders")

users_df.createOrReplaceTempView("users")

SQL로 분석

monthly_revenue = spark.sql("""

SELECT

DATE_TRUNC('month', order_date) AS month,

COUNT(DISTINCT user_id) AS unique_customers,

COUNT(*) AS total_orders,

SUM(amount) AS revenue,

AVG(amount) AS avg_order_value

FROM orders

WHERE status = 'completed'

GROUP BY DATE_TRUNC('month', order_date)

ORDER BY month

""")

monthly_revenue.show()

4.4 파티셔닝과 캐싱

파티셔닝 최적화

파티션 수 확인

print(f"Partitions: {orders_df.rdd.getNumPartitions()}")

재파티셔닝 (셔플 발생)

orders_repartitioned = orders_df.repartition(100, "order_date")

합병 (셔플 없이 파티션 수 줄이기)

orders_coalesced = orders_df.coalesce(50)

캐싱

from pyspark.storagelevel import StorageLevel

메모리 캐싱

orders_df.cache()

orders_df.count() # 캐시 트리거

메모리 + 디스크 캐싱

users_df.persist(StorageLevel.MEMORY_AND_DISK)

캐시 해제

orders_df.unpersist()

4.5 Spark 성능 튜닝

브로드캐스트 조인 (작은 테이블)

from pyspark.sql.functions import broadcast

작은 테이블을 브로드캐스트

result = orders_df.join(

broadcast(users_df),

orders_df.user_id == users_df.id

)

AQE (Adaptive Query Execution) 활성화

spark.conf.set("spark.sql.adaptive.enabled", "true")

spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

Spark 성능 최적화 체크리스트:

[ ] 적절한 파티션 수 설정 (코어 수의 2~3배)

[ ] 데이터 스큐 처리 (salting, AQE)

[ ] 브로드캐스트 조인 활용

[ ] 불필요한 셔플 최소화

[ ] 컬럼 프루닝 (필요한 컬럼만 선택)

[ ] Predicate Pushdown 활용

[ ] 캐싱 전략 수립

[ ] 직렬화 포맷 최적화 (Parquet, ORC)

5. Apache Airflow

5.1 Airflow 개요

Apache Airflow는 워크플로를 프로그래밍 방식으로 작성, 스케줄링, 모니터링하기 위한 플랫폼입니다. DAG(방향 비순환 그래프)로 태스크 간의 의존 관계를 정의합니다.

Airflow 아키텍처:

┌──────────────────────────────────┐

│ Web Server │

│ (UI / REST API) │

├──────────────────────────────────┤

│ Scheduler │

│ (DAG 파싱, 태스크 스케줄링) │

├──────────────────────────────────┤

│ Executor │

│ (Local/Celery/Kubernetes) │

├──────────────────────────────────┤

│ Metadata Database │

│ (PostgreSQL/MySQL) │

└──────────────────────────────────┘

5.2 DAG 작성

dags/etl_pipeline.py

from datetime import datetime, timedelta

from airflow import DAG

from airflow.operators.python import PythonOperator

from airflow.operators.bash import BashOperator

from airflow.providers.postgres.operators.postgres import PostgresOperator

from airflow.sensors.filesystem import FileSensor

default_args = {

'owner': 'data-team',

'depends_on_past': False,

'email_on_failure': True,

'email': ['data-team@company.com'],

'retries': 3,

'retry_delay': timedelta(minutes=5),

'execution_timeout': timedelta(hours=2),

}

with DAG(

dag_id='daily_etl_pipeline',

default_args=default_args,

description='일별 ETL 파이프라인',

schedule_interval='0 6 * * *', # 매일 오전 6시

start_date=datetime(2025, 1, 1),

catchup=False,

tags=['etl', 'daily'],

max_active_runs=1,

) as dag:

센서: 파일 도착 대기

wait_for_data = FileSensor(

task_id='wait_for_data',

filepath='/data/raw/daily_export.csv',

poke_interval=300, # 5분마다 확인

timeout=3600, # 최대 1시간 대기

mode='poke',

)

추출

def extract_data(**context):

execution_date = context['ds']

df = pd.read_csv(f'/data/raw/daily_export_{execution_date}.csv')

df.to_parquet(f'/data/staging/extract_{execution_date}.parquet')

return len(df)

extract = PythonOperator(

task_id='extract',

python_callable=extract_data,

)

변환

def transform_data(**context):

execution_date = context['ds']

df = pd.read_parquet(f'/data/staging/extract_{execution_date}.parquet')

데이터 정제

df = df.dropna(subset=['customer_id', 'amount'])

df = df[df['amount'] > 0]

df['amount_with_tax'] = df['amount'] * 1.1

df.to_parquet(f'/data/staging/transform_{execution_date}.parquet')

return len(df)

transform = PythonOperator(

task_id='transform',

python_callable=transform_data,

)

적재

load = PostgresOperator(

task_id='load',

postgres_conn_id='warehouse',

sql='sql/load_daily_orders.sql',

)

데이터 품질 검증

def validate_data(**context):

execution_date = context['ds']

레코드 수 검증, NULL 체크, 범위 검증

row_count = context['ti'].xcom_pull(task_ids='transform')

if row_count < 100:

raise ValueError(f'레코드 수 부족: {row_count}')

validate = PythonOperator(

task_id='validate',

python_callable=validate_data,

)

알림

notify = BashOperator(

task_id='notify',

bash_command='echo "ETL 완료: {{ ds }}" | mail -s "ETL Success" team@company.com',

)

의존 관계 정의

wait_for_data >> extract >> transform >> load >> validate >> notify

5.3 TaskFlow API (Airflow 2.x)

from airflow.decorators import dag, task

from datetime import datetime

@dag(

schedule_interval='@daily',

start_date=datetime(2025, 1, 1),

catchup=False,

tags=['etl'],

)

def modern_etl_pipeline():

@task()

def extract():

"""소스에서 데이터 추출"""

df = pd.read_csv('/data/raw/orders.csv')

return df.to_dict()

@task()

def transform(raw_data: dict):

"""데이터 변환 및 정제"""

df = pd.DataFrame(raw_data)

df = df[df['amount'] > 0]

df['processed_at'] = datetime.now().isoformat()

return df.to_dict()

@task()

def load(transformed_data: dict):

"""웨어하우스에 적재"""

df = pd.DataFrame(transformed_data)

웨어하우스에 적재 로직

print(f"Loaded {len(df)} records")

자동 의존 관계 설정

raw = extract()

transformed = transform(raw)

load(transformed)

DAG 인스턴스화

modern_etl_pipeline()

5.4 Connections, Variables, XCom

Connections: UI 또는 CLI로 설정

airflow connections add 'warehouse' \

--conn-type 'postgres' \

--conn-host 'warehouse.example.com' \

--conn-port 5432 \

--conn-login 'etl_user' \

--conn-password 'secret'

Variables

from airflow.models import Variable

env = Variable.get("environment", default_var="dev")

config = Variable.get("pipeline_config", deserialize_json=True)

XCom: 태스크 간 데이터 전달

def producer_task(**context):

context['ti'].xcom_push(key='row_count', value=1000)

def consumer_task(**context):

row_count = context['ti'].xcom_pull(

task_ids='producer',

key='row_count'

)

print(f"이전 태스크 결과: {row_count}행")

5.5 동적 DAG 생성

dags/dynamic_dag_factory.py

from airflow import DAG

from airflow.operators.python import PythonOperator

from datetime import datetime

설정 기반 동적 DAG 생성

configs = [

{"name": "sales", "source": "mysql", "schedule": "@daily"},

{"name": "users", "source": "postgres", "schedule": "@hourly"},

{"name": "logs", "source": "s3", "schedule": "0 */6 * * *"},

]

def create_etl_dag(config):

dag = DAG(

dag_id=f"etl_{config['name']}",

schedule_interval=config['schedule'],

start_date=datetime(2025, 1, 1),

catchup=False,

)

def process(**kwargs):

print(f"Processing {config['name']} from {config['source']}")

with dag:

PythonOperator(

task_id='process',

python_callable=process,

)

return dag

for config in configs:

globals()[f"etl_{config['name']}"] = create_etl_dag(config)

6. 실시간 스트리밍

6.1 Apache Kafka

Kafka는 분산 이벤트 스트리밍 플랫폼으로, 대규모 실시간 데이터 파이프라인의 핵심입니다.

Kafka 아키텍처:

Producer ──→ Broker(Topic/Partition) ──→ Consumer Group

├── Partition 0: [msg1, msg2, msg3...]

├── Partition 1: [msg4, msg5, msg6...]

└── Partition 2: [msg7, msg8, msg9...]

Kafka Producer (Python)

from confluent_kafka import Producer

config = {

'bootstrap.servers': 'kafka-broker:9092',

'acks': 'all',

'retries': 3,

'linger.ms': 10,

'batch.size': 16384,

}

producer = Producer(config)

def delivery_callback(err, msg):

if err:

print(f'전송 실패: {err}')

else:

print(f'전송 완료: {msg.topic()} [{msg.partition()}]')

메시지 전송

for i in range(100):

event = {

'user_id': f'user_{i}',

'action': 'page_view',

'timestamp': '2025-03-25T10:00:00Z',

'page': '/products'

}

producer.produce(

topic='user-events',

key=str(event['user_id']),

value=json.dumps(event),

callback=delivery_callback

)

producer.flush()

Kafka Consumer (Python)

from confluent_kafka import Consumer

config = {

'bootstrap.servers': 'kafka-broker:9092',

'group.id': 'analytics-consumer',

'auto.offset.reset': 'earliest',

'enable.auto.commit': False,

}

consumer = Consumer(config)

consumer.subscribe(['user-events'])

try:

while True:

msg = consumer.poll(timeout=1.0)

if msg is None:

continue

if msg.error():

print(f'Consumer error: {msg.error()}')

continue

event = json.loads(msg.value().decode('utf-8'))

print(f"수신: {event['user_id']} - {event['action']}")

수동 커밋

consumer.commit(asynchronous=False)

finally:

consumer.close()

6.2 Apache Flink

Flink는 상태 기반 스트림 처리 엔진으로, 정확히 한 번(exactly-once) 시맨틱스를 제공합니다.

PyFlink 스트림 처리

from pyflink.datastream import StreamExecutionEnvironment

from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env = StreamExecutionEnvironment.get_execution_environment()

env.set_parallelism(4)

env.enable_checkpointing(60000) # 60초마다 체크포인트

t_env = StreamTableEnvironment.create(env)

Kafka 소스 테이블 정의

t_env.execute_sql("""

CREATE TABLE user_events (

user_id STRING,

action STRING,

event_time TIMESTAMP(3),

WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

) WITH (

'connector' = 'kafka',

'topic' = 'user-events',

'properties.bootstrap.servers' = 'kafka:9092',

'properties.group.id' = 'flink-processor',

'format' = 'json',

'scan.startup.mode' = 'latest-offset'

)

""")

윈도우 집계

t_env.execute_sql("""

CREATE TABLE page_view_stats (

window_start TIMESTAMP(3),

window_end TIMESTAMP(3),

page STRING,

view_count BIGINT,

unique_users BIGINT

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:postgresql://analytics-db:5432/stats',

'table-name' = 'page_view_stats',

'driver' = 'org.postgresql.Driver'

)

""")

t_env.execute_sql("""

INSERT INTO page_view_stats

SELECT

window_start,

window_end,

action AS page,

COUNT(*) AS view_count,

COUNT(DISTINCT user_id) AS unique_users

FROM TABLE(

TUMBLE(TABLE user_events, DESCRIPTOR(event_time), INTERVAL '5' MINUTE)

)

GROUP BY window_start, window_end, action

""")

6.3 Exactly-Once 시맨틱스

전달 보장 수준:

At-most-once : 메시지 유실 가능, 중복 없음

At-least-once : 메시지 유실 없음, 중복 가능

Exactly-once : 메시지 유실 없음, 중복 없음 (가장 어려움)

Kafka + Flink Exactly-Once:

1. Kafka 트랜잭셔널 프로듀서

2. Flink 체크포인트 (Chandy-Lamport)

3. Two-Phase Commit Protocol

4. Kafka Consumer 오프셋을 체크포인트와 연동

7. dbt (data build tool)

7.1 dbt 개요

dbt는 ELT에서 T(Transform)를 담당하는 도구입니다. SQL로 데이터 변환 로직을 작성하고, 소프트웨어 엔지니어링 베스트 프랙티스(버전 관리, 테스트, 문서화)를 데이터 변환에 적용합니다.

dbt 프로젝트 구조:

my_dbt_project/

├── dbt_project.yml

├── profiles.yml

├── models/

│ ├── staging/

│ │ ├── stg_orders.sql

│ │ ├── stg_customers.sql

│ │ └── _staging_sources.yml

│ ├── intermediate/

│ │ └── int_order_items_grouped.sql

│ └── marts/

│ ├── dim_customers.sql

│ ├── fact_orders.sql

│ └── _marts_schema.yml

├── tests/

│ └── assert_positive_revenue.sql

├── macros/

│ └── generate_schema_name.sql

└── seeds/

└── country_codes.csv

7.2 모델 작성

-- models/staging/stg_orders.sql

WITH source AS (

SELECT * FROM {{ source('raw', 'orders') }}

),

renamed AS (

SELECT

id AS order_id,

user_id AS customer_id,

amount AS order_amount,

status AS order_status,

created_at AS ordered_at

FROM source

WHERE status != 'cancelled'

)

SELECT * FROM renamed

-- models/marts/fact_orders.sql

{{ config(materialized='incremental', unique_key='order_id') }}

WITH orders AS (

SELECT * FROM {{ ref('stg_orders') }}

),

customers AS (

SELECT * FROM {{ ref('dim_customers') }}

)

SELECT

o.order_id,

o.customer_id,

c.customer_name,

c.customer_segment,

o.order_amount,

o.order_amount * 1.1 AS amount_with_tax,

o.order_status,

o.ordered_at

FROM orders o

JOIN customers c ON o.customer_id = c.customer_id

{% if is_incremental() %}

WHERE o.ordered_at > (SELECT MAX(ordered_at) FROM {{ this }})

{% endif %}

7.3 소스와 테스트

models/staging/_staging_sources.yml

version: 2

sources:

- name: raw

database: raw_db

schema: public

tables:

- name: orders

loaded_at_field: _loaded_at

freshness:

warn_after:

count: 12

period: hour

error_after:

count: 24

period: hour

columns:

- name: id

tests:

- unique

- not_null

- name: amount

tests:

- not_null

- name: raw

tables:

- name: customers

columns:

- name: id

tests:

- unique

- not_null

models/marts/_marts_schema.yml

version: 2

models:

- name: fact_orders

description: "주문 팩트 테이블"

columns:

- name: order_id

description: "주문 고유 ID"

tests:

- unique

- not_null

- name: order_amount

tests:

- not_null

- name: customer_id

tests:

- not_null

- relationships:

to: ref('dim_customers')

field: customer_id

-- tests/assert_positive_revenue.sql

-- 커스텀 테스트: 매출이 양수인지 검증

SELECT order_id, order_amount

FROM {{ ref('fact_orders') }}

WHERE order_amount < 0

7.4 dbt 커맨드

모든 모델 빌드

dbt run

특정 모델만 빌드

dbt run --select fact_orders

모델 + 다운스트림 빌드

dbt run --select stg_orders+

테스트 실행

dbt test

문서 생성

dbt docs generate

dbt docs serve

소스 신선도 확인

dbt source freshness

시드 데이터 로드

dbt seed

전체 파이프라인 (빌드 + 테스트)

dbt build

8. 데이터 웨어하우스

비교표

| 항목 | BigQuery | Snowflake | Redshift |

|------|----------|-----------|----------|

| 벤더 | Google Cloud | Snowflake | AWS |

| 아키텍처 | 서버리스 | 컴퓨팅/스토리지 분리 | MPP 클러스터 |

| 과금 | 쿼리당 (온디맨드) | 크레딧 기반 | 노드 시간당 |

| 확장성 | 자동 | 웨어하우스 리사이징 | 노드 추가 |

| 동시성 | 2000+ 슬롯 | 멀티클러스터 | WLM 설정 |

| 반정형 데이터 | STRUCT, ARRAY | VARIANT | SUPER |

| ML 통합 | BigQuery ML | Snowpark | Redshift ML |

| 비용 효율 | 소규모에 유리 | 중규모에 유리 | 대규모 상시 운영에 유리 |

BigQuery 예시

-- BigQuery: 파티션 + 클러스터링

CREATE TABLE analytics.fact_orders

PARTITION BY DATE(ordered_at)

CLUSTER BY customer_segment, order_status

AS

SELECT

order_id,

customer_id,

customer_segment,

order_amount,

order_status,

ordered_at

FROM staging.orders;

-- 비용 예측 (dry run)

-- 1TB 스캔 = 약 $5 (온디맨드)

Snowflake 예시

-- Snowflake: 웨어하우스 생성 및 관리

CREATE WAREHOUSE etl_wh

WITH WAREHOUSE_SIZE = 'MEDIUM'

AUTO_SUSPEND = 300

AUTO_RESUME = TRUE

MIN_CLUSTER_COUNT = 1

MAX_CLUSTER_COUNT = 3;

-- 데이터 로드

COPY INTO raw.orders

FROM @my_s3_stage/orders/

FILE_FORMAT = (TYPE = 'PARQUET')

PATTERN = '.*[.]parquet';

9. 데이터 레이크 / 레이크하우스

테이블 포맷 비교

전통적 데이터 레이크 문제점:

- ACID 트랜잭션 없음

- 스키마 강제 없음

- 시간 여행(Time Travel) 불가

- 작은 파일 문제

레이크하우스 테이블 포맷이 해결:

Delta Lake : Databricks 주도, Spark 통합 최강

Apache Iceberg : Netflix 개발, 벤더 중립

Apache Hudi : Uber 개발, 증분 처리 특화

| 기능 | Delta Lake | Apache Iceberg | Apache Hudi |

|------|-----------|---------------|-------------|

| ACID 트랜잭션 | O | O | O |

| 스키마 진화 | O | O | O |

| 시간 여행 | O | O | O |

| 파티션 진화 | 제한적 | O (숨은 파티셔닝) | 제한적 |

| 엔진 호환 | Spark 위주 | Spark, Flink, Trino | Spark, Flink |

| 주요 플랫폼 | Databricks | 다수 벤더 채택 | AWS 중심 |

Delta Lake 예시 (PySpark)

from delta.tables import DeltaTable

Delta 테이블 생성

orders_df.write \

.format("delta") \

.mode("overwrite") \

.partitionBy("order_date") \

.save("s3://data-lake/delta/orders")

UPSERT (Merge)

delta_table = DeltaTable.forPath(spark, "s3://data-lake/delta/orders")

delta_table.alias("target").merge(

new_orders_df.alias("source"),

"target.order_id = source.order_id"

).whenMatchedUpdateAll() \

.whenNotMatchedInsertAll() \

.execute()

시간 여행

old_data = spark.read \

.format("delta") \

.option("versionAsOf", 5) \

.load("s3://data-lake/delta/orders")

10. 데이터 품질

10.1 Great Expectations

context = gx.get_context()

데이터 소스 연결

datasource = context.sources.add_pandas("my_datasource")

data_asset = datasource.add_csv_asset("orders", filepath_or_buffer="orders.csv")

Expectation Suite 정의

suite = context.add_expectation_suite("orders_validation")

기대치 정의

suite.add_expectation(

gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")

)

suite.add_expectation(

gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")

)

suite.add_expectation(

gx.expectations.ExpectColumnValuesToBeBetween(

column="amount", min_value=0, max_value=100000

)

)

검증 실행

results = context.run_checkpoint(

checkpoint_name="orders_checkpoint"

)

print(f"성공: {results.success}")

10.2 데이터 계약 (Data Contracts)

data-contracts/orders-contract.yaml

dataContractSpecification: 0.9.3

id: orders-contract

info:

title: Orders Data Contract

version: 1.0.0

owner: data-team

contact:

email: data-team@company.com

schema:

type: object

properties:

order_id:

type: string

description: "고유 주문 ID"

required: true

unique: true

customer_id:

type: string

required: true

amount:

type: number

minimum: 0

maximum: 100000

status:

type: string

enum: ["pending", "completed", "cancelled"]

created_at:

type: timestamp

required: true

quality:

completeness:

- field: order_id

threshold: 100

- field: customer_id

threshold: 99.9

freshness:

maxDelay: "PT1H" # 1시간 이내

11. 오케스트레이션 비교

Airflow vs Dagster vs Prefect vs Mage

| 항목 | Airflow | Dagster | Prefect | Mage |

|------|---------|---------|---------|------|

| 접근 방식 | DAG 중심 | 자산(Asset) 중심 | 플로우 중심 | 블록 중심 |

| 학습 곡선 | 높음 | 중간 | 낮음 | 낮음 |

| 로컬 개발 | 복잡 | 우수 | 우수 | 우수 |

| 테스트 | 어려움 | 내장 지원 | 좋음 | 좋음 |

| UI | 기능적 | 모던 | 모던 | 모던 |

| 커뮤니티 | 매우 큼 | 성장 중 | 성장 중 | 소규모 |

| 프로덕션 실적 | 매우 많음 | 늘어나는 중 | 늘어나는 중 | 초기 단계 |

| 클라우드 | MWAA, Composer | Dagster Cloud | Prefect Cloud | Mage Pro |

선택 가이드:

├── 대규모 기업, 복잡한 워크플로 → Airflow

├── 데이터 자산 중심 사고 → Dagster

├── 빠른 시작, Python 네이티브 → Prefect

└── 노코드/로우코드 선호 → Mage

12. 모던 데이터 스택 다이어그램

모던 데이터 스택 (2025):

데이터 소스 수집/통합 저장 변환 분석/BI

────────── ────────── ────────── ────────── ──────────

SaaS APIs ──┐

Databases ──┼──→ Fivetran/Airbyte ──→ Snowflake ──→ dbt ──→ Looker

Event Logs ──┤ BigQuery Dataform Metabase

Files ──┘ Redshift Tableau

Kafka/ ──────→ Flink/Spark ──→ Delta Lake ──→ Spark SQL ──→ 실시간

Kinesis Streaming Iceberg 대시보드

오케스트레이션: Airflow / Dagster

품질: Great Expectations / dbt tests

카탈로그: DataHub / Atlan / OpenMetadata

모니터링: Monte Carlo / Datadog

13. 퀴즈

Q1: ETL vs ELT

**정답:**

핵심 차이점은 변환(Transform)이 발생하는 위치입니다. ETL은 별도의 변환 서버에서 변환 후 적재하고, ELT는 데이터를 먼저 웨어하우스에 적재한 후 웨어하우스 내에서 변환합니다.

ELT를 선택해야 하는 경우:

- 클라우드 웨어하우스(BigQuery, Snowflake)를 사용하는 경우

- 원본 데이터 보존이 중요한 경우

- 변환 로직이 자주 바뀌어 유연성이 필요한 경우

- dbt 같은 도구로 SQL 기반 변환을 원하는 경우

Q2: Spark 파티셔닝

**정답:**

`repartition()`은 전체 셔플을 수행하여 데이터를 지정된 수의 파티션으로 균등하게 재분배합니다. 파티션 수를 늘리거나 특정 컬럼으로 파티셔닝할 때 사용합니다.

`coalesce()`는 셔플 없이 파티션 수만 줄입니다. 기존 파티션을 합치는 것이라 파티션 수를 줄일 때만 사용할 수 있으며, 네트워크 비용이 적습니다.

파티션 수를 줄여야 할 때는 coalesce(), 늘리거나 균등 분배가 필요할 때는 repartition()을 사용합니다.

Q3: Airflow XCom

**정답:**

XCom(Cross-Communication)은 Airflow 태스크 간에 소량의 데이터를 전달하는 메커니즘입니다. 메타데이터 DB에 저장됩니다.

제한 사항:

- 소량 데이터만 전달(기본 48KB, 최대 수 MB)

- 대용량 데이터는 S3/GCS 등 외부 저장소 경로만 전달

- 직렬화 가능한 데이터만 전달(JSON serializable)

- 메타데이터 DB에 부하를 줄 수 있음

대안: 대용량 데이터는 임시 파일이나 클라우드 스토리지를 사용하고, XCom으로는 파일 경로만 전달합니다.

Q4: Exactly-Once 시맨틱스

**정답:**

Kafka Exactly-Once는 3가지 요소로 구현됩니다.

1. **Idempotent Producer**: Producer에 enable.idempotence=true를 설정하면 브로커가 중복 메시지를 자동으로 제거합니다.

2. **Transactional Producer**: 여러 파티션/토픽에 걸친 원자적 쓰기를 보장합니다. initTransactions(), beginTransaction(), commitTransaction() API를 사용합니다.

3. **Consumer read_committed**: Consumer에서 isolation.level=read_committed를 설정하면 커밋된 트랜잭션의 메시지만 읽습니다.

Flink와 연동 시, Flink의 체크포인트 메커니즘과 Kafka의 트랜잭셔널 API를 결합하여 End-to-End Exactly-Once를 달성합니다.

Q5: dbt 증분 모델

**정답:**

dbt incremental 모델은 마지막 실행 이후 새로 추가되거나 변경된 데이터만 처리합니다.

동작 방식:

1. 첫 실행 시 전체 데이터를 처리 (CREATE TABLE AS)

2. 이후 실행 시 `is_incremental()` 조건으로 새 데이터만 필터링

3. 새 데이터를 기존 테이블에 MERGE 또는 INSERT

사용 시기:

- 대용량 팩트 테이블 (매번 전체 재구축이 비용이 높을 때)

- 이벤트/로그 데이터 (시간순 append)

- 점진적으로 증가하는 데이터

핵심은 unique_key 설정과 적절한 증분 조건(WHERE) 지정입니다.

14. 참고 자료

- [Apache Spark 공식 문서](https://spark.apache.org/docs/latest/)

- [Apache Airflow 공식 문서](https://airflow.apache.org/docs/)

- [Apache Kafka 공식 문서](https://kafka.apache.org/documentation/)

- [Apache Flink 공식 문서](https://flink.apache.org/docs/stable/)

- [dbt 공식 문서](https://docs.getdbt.com/)

- [Great Expectations 문서](https://docs.greatexpectations.io/)

- [Delta Lake 공식 문서](https://docs.delta.io/)

- [Apache Iceberg 공식 문서](https://iceberg.apache.org/docs/latest/)

- [BigQuery 공식 문서](https://cloud.google.com/bigquery/docs)

- [Snowflake 공식 문서](https://docs.snowflake.com/)

- [Fundamentals of Data Engineering (O'Reilly)](https://www.oreilly.com/library/view/fundamentals-of-data/9781098108298/)

- [The Data Warehouse Toolkit (Kimball)](https://www.kimballgroup.com/data-warehouse-business-intelligence-resources/kimball-techniques/dimensional-modeling-techniques/)

현재 단락 (1/777)

데이터 엔지니어는 조직의 데이터 인프라를 설계, 구축, 유지보수하는 역할을 담당합니다. 데이터 과학자가 분석할 수 있도록 데이터를 수집, 변환, 저장하는 파이프라인을 만드는 것이 ...

작성 글자: 0원문 글자: 21,044작성 단락: 0/777