목차
1. 데이터 엔지니어링이란
데이터 엔지니어링은 원시 데이터를 수집, 변환, 저장하여 분석 가능한 상태로 만드는 일련의 과정이다. 데이터 과학자가 모델을 만들고, 분석가가 인사이트를 도출하려면, 먼저 데이터가 깨끗하고 접근 가능한 형태로 준비되어 있어야 한다. 이 준비 작업을 책임지는 것이 데이터 엔지니어이다.
역할 비교
| 역할 | 주요 업무 | 핵심 기술 |
|---|---|---|
| 데이터 엔지니어 | 파이프라인 구축, 인프라 관리, 데이터 변환 | Python, SQL, Spark, Kafka, Airflow |
| 데이터 사이언티스트 | 모델링, 예측, 실험 설계 | Python, R, TensorFlow, 통계학 |
| 데이터 분석가 | 리포팅, 대시보드, 비즈니스 인사이트 | SQL, Tableau, Excel, BI 도구 |
데이터 엔지니어는 신뢰할 수 있는 데이터 인프라를 만드는 사람이다. 아무리 뛰어난 모델도 데이터 파이프라인이 불안정하면 의미가 없다.
데이터 엔지니어의 핵심 역량
- SQL 숙련도: 복잡한 조인, 윈도우 함수, CTE 등을 자유자재로 사용
- 프로그래밍: Python이 사실상 표준. Scala, Java도 Spark 생태계에서 사용
- 분산 시스템 이해: 파티셔닝, 복제, CAP 정리 등 기본 개념 숙지
- 클라우드 서비스: AWS, GCP, Azure의 데이터 관련 서비스 활용 능력
- 데이터 모델링: 정규화, 비정규화, 차원 모델링에 대한 이해
2. 데이터 아키텍처
현대 데이터 아키텍처는 크게 네 가지 패턴으로 분류된다.
2.1 데이터 레이크 (Data Lake)
데이터 레이크는 구조화, 반구조화, 비구조화 데이터를 원시 형태 그대로 저장하는 대규모 저장소이다. 스키마를 읽기 시점에 적용한다(Schema-on-Read).
원시 데이터 저장소 (Data Lake)
├── 구조화 데이터 (CSV, Parquet, ORC)
├── 반구조화 데이터 (JSON, XML, Avro)
└── 비구조화 데이터 (이미지, 로그, 텍스트)
장점: 저렴한 스토리지, 유연한 스키마, 모든 형태의 데이터 수용 단점: 관리가 어렵고, 잘못하면 "데이터 늪(Data Swamp)"이 될 수 있음
대표 기술: AWS S3, Azure Data Lake Storage, Google Cloud Storage
2.2 데이터 웨어하우스 (Data Warehouse)
데이터 웨어하우스는 분석에 최적화된 구조화 데이터 저장소이다. 스키마를 쓰기 시점에 적용한다(Schema-on-Write).
분석용 저장소 (Data Warehouse)
├── 팩트 테이블 (매출, 주문, 클릭)
├── 디멘전 테이블 (사용자, 상품, 시간)
└── 집계 테이블 (일별/월별 요약)
장점: 빠른 쿼리 성능, 일관된 스키마, ACID 트랜잭션 지원 단점: 비구조화 데이터 처리 한계, 스키마 변경이 어려움
대표 기술: Snowflake, BigQuery, Amazon Redshift
2.3 레이크하우스 (Lakehouse)
레이크하우스는 데이터 레이크의 유연성과 데이터 웨어하우스의 관리 기능을 결합한 아키텍처이다.
핵심 기능:
- ACID 트랜잭션 지원
- 스키마 강제 및 발전(Schema Evolution)
- 데이터 레이크 위에서 SQL 분석 가능
- 스트리밍과 배치 처리 통합
대표 기술: Delta Lake, Apache Iceberg, Apache Hudi
2.4 메달리온 아키텍처 (Medallion Architecture)
메달리온 아키텍처는 데이터를 세 단계로 계층화하여 관리한다.
Bronze (원시 데이터)
→ 소스에서 그대로 수집한 데이터
→ 최소한의 변환만 적용
Silver (정제 데이터)
→ 중복 제거, 타입 변환, 검증 완료
→ 비즈니스 로직 적용 전 상태
Gold (비즈니스 데이터)
→ 집계, 결합, 비즈니스 규칙 적용
→ 대시보드와 ML 모델에서 직접 사용
이 패턴은 Databricks에서 대중화되었으며, 데이터 품질을 단계별로 보장할 수 있다는 장점이 있다.
3. ETL vs ELT
3.1 ETL (Extract, Transform, Load)
ETL은 전통적인 데이터 통합 방식이다.
소스 → [추출] → [변환] → [적재] → 웨어하우스
- Extract: 소스 시스템에서 데이터 추출
- Transform: 스테이징 영역에서 데이터 정제, 변환, 집계
- Load: 변환된 데이터를 타겟 시스템에 적재
3.2 ELT (Extract, Load, Transform)
ELT는 클라우드 웨어하우스의 강력한 컴퓨팅 파워를 활용하는 현대적 방식이다.
소스 → [추출] → [적재] → 웨어하우스 → [변환]
- Extract: 소스에서 데이터 추출
- Load: 원시 데이터를 그대로 웨어하우스에 적재
- Transform: 웨어하우스 내부에서 SQL로 변환
3.3 언제 무엇을 사용하는가
| 기준 | ETL | ELT |
|---|---|---|
| 데이터 볼륨 | 소~중규모 | 대규모 |
| 변환 복잡도 | 복잡한 비즈니스 로직 | SQL로 표현 가능한 변환 |
| 인프라 | 온프레미스 | 클라우드 |
| 데이터 보안 | 민감 데이터 사전 마스킹 필요 | 웨어하우스 내 접근 제어로 충분 |
| 대표 도구 | Informatica, Talend | dbt, Fivetran, Airbyte |
3.4 주요 도구
dbt (data build tool): SQL 기반 변환 도구. ELT의 T 부분을 담당한다.
-- dbt 모델 예시: 일별 매출 집계
-- models/marts/daily_revenue.sql
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}
),
payments AS (
SELECT * FROM {{ ref('stg_payments') }}
)
SELECT
o.order_date,
COUNT(DISTINCT o.order_id) AS total_orders,
SUM(p.amount) AS total_revenue,
AVG(p.amount) AS avg_order_value
FROM orders o
JOIN payments p ON o.order_id = p.order_id
WHERE p.status = 'completed'
GROUP BY o.order_date
Airbyte: 오픈소스 데이터 통합 플랫폼. 300개 이상의 커넥터를 제공한다.
Fivetran: 관리형 데이터 통합 서비스. 설정이 간편하고 안정적이다.
4. 배치 처리
배치 처리는 축적된 대량의 데이터를 한꺼번에 처리하는 방식이다. 실시간성이 필요하지 않은 대부분의 분석 작업에 적합하다.
4.1 Apache Spark
Apache Spark는 대규모 데이터 처리를 위한 통합 분석 엔진이다.
핵심 특징:
- 인메모리 처리: MapReduce 대비 최대 100배 빠른 성능
- 통합 API: 배치, 스트리밍, ML, 그래프 처리를 하나의 프레임워크로
- 다양한 언어 지원: Python(PySpark), Scala, Java, R, SQL
# PySpark 예시: 일별 매출 집계
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, count
spark = SparkSession.builder \
.appName("DailyRevenue") \
.getOrCreate()
# 데이터 읽기
orders = spark.read.parquet("s3://data-lake/orders/")
payments = spark.read.parquet("s3://data-lake/payments/")
# 변환
daily_revenue = (
orders
.join(payments, "order_id")
.filter(col("status") == "completed")
.groupBy("order_date")
.agg(
count("order_id").alias("total_orders"),
spark_sum("amount").alias("total_revenue")
)
.orderBy("order_date")
)
# 결과 저장
daily_revenue.write \
.mode("overwrite") \
.parquet("s3://data-warehouse/daily_revenue/")
4.2 Spark SQL과 DataFrame
Spark SQL을 사용하면 SQL에 익숙한 분석가도 대규모 데이터를 처리할 수 있다.
# DataFrame 등록 후 SQL 사용
orders.createOrReplaceTempView("orders")
payments.createOrReplaceTempView("payments")
result = spark.sql("""
SELECT
o.order_date,
COUNT(DISTINCT o.order_id) AS total_orders,
SUM(p.amount) AS total_revenue
FROM orders o
JOIN payments p ON o.order_id = p.order_id
WHERE p.status = 'completed'
GROUP BY o.order_date
ORDER BY o.order_date
""")
4.3 MapReduce와의 비교
| 기준 | MapReduce | Spark |
|---|---|---|
| 처리 속도 | 디스크 기반, 느림 | 인메모리 기반, 빠름 |
| 프로그래밍 모델 | Map과 Reduce 두 단계 | 풍부한 변환 연산 제공 |
| 실시간 처리 | 불가 | Structured Streaming 지원 |
| 학습 곡선 | 높음 | 상대적으로 낮음 |
| 생태계 | Hadoop 생태계 | 독립적 + Hadoop 호환 |
5. 스트림 처리
스트림 처리는 데이터가 생성되는 즉시 실시간으로 처리하는 방식이다.
5.1 Apache Kafka
Kafka는 분산 이벤트 스트리밍 플랫폼이다. 실시간 데이터 파이프라인과 스트리밍 애플리케이션의 핵심 인프라로 사용된다.
핵심 개념:
- Topic: 메시지가 발행되는 카테고리
- Producer: 토픽에 메시지를 발행하는 주체
- Consumer: 토픽에서 메시지를 구독하는 주체
- Broker: 메시지를 저장하고 전달하는 서버
- Partition: 토픽을 분할하여 병렬 처리 지원
- Consumer Group: 여러 컨슈머가 토픽을 분담하여 처리
# Kafka Producer 예시 (Python)
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# 주문 이벤트 발행
event = {
"order_id": "ORD-12345",
"user_id": "USR-678",
"amount": 45000,
"timestamp": "2026-04-13T10:30:00Z"
}
producer.send('order-events', value=event)
producer.flush()
# Kafka Consumer 예시 (Python)
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'order-events',
bootstrap_servers=['localhost:9092'],
group_id='order-processing-group',
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest'
)
for message in consumer:
order = message.value
print(f"주문 처리: {order['order_id']}, 금액: {order['amount']}")
5.2 CDC (Change Data Capture)
CDC는 데이터베이스의 변경 사항을 실시간으로 캡처하여 다른 시스템에 전파하는 기술이다.
운영 DB → [CDC] → Kafka → [Stream Processing] → 웨어하우스
→ 검색 엔진
→ 캐시
대표 도구: Debezium. MySQL, PostgreSQL, MongoDB 등 다양한 DB에서 변경 이벤트를 캡처하여 Kafka로 전송한다.
{
"before": null,
"after": {
"id": 1001,
"name": "김철수",
"email": "kim@example.com"
},
"source": {
"connector": "postgresql",
"db": "users_db",
"table": "users"
},
"op": "c",
"ts_ms": 1681364400000
}
위 JSON은 Debezium이 캡처한 CDC 이벤트의 예시이다. "op": "c"는 INSERT 작업을 의미한다.
5.3 Apache Flink
Flink는 상태 기반 스트림 처리 엔진이다. 정확히 한 번(Exactly-once) 처리를 보장하며, 이벤트 시간 기반 윈도우 처리에 강점이 있다.
// Flink 스트림 처리 예시 (Java)
DataStream<OrderEvent> orders = env
.addSource(new FlinkKafkaConsumer<>(
"order-events",
new OrderEventSchema(),
kafkaProps
));
// 5분 단위 윈도우 집계
DataStream<WindowedRevenue> revenue = orders
.keyBy(OrderEvent::getCategory)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new RevenueAggregator());
revenue.addSink(new JdbcSink<>(...));
5.4 배치 vs 스트리밍 비교
| 기준 | 배치 처리 | 스트림 처리 |
|---|---|---|
| 지연 시간 | 분~시간 | 밀리초~초 |
| 데이터 완전성 | 완전한 데이터셋 | 점진적으로 도착 |
| 복잡도 | 상대적으로 단순 | 상태 관리, 순서 보장 등 복잡 |
| 비용 | 상대적으로 저렴 | 항상 실행 중이므로 비용 높음 |
| 적합한 사용 사례 | 일별 리포트, ML 학습 | 실시간 대시보드, 이상 탐지 |
6. 워크플로우 오케스트레이션
6.1 Apache Airflow
Airflow는 워크플로우를 프로그래밍 방식으로 작성, 스케줄링, 모니터링하는 플랫폼이다. 복잡한 데이터 파이프라인을 DAG(Directed Acyclic Graph)로 정의한다.
핵심 개념:
- DAG: 작업의 실행 순서와 의존성을 정의하는 비순환 방향 그래프
- Operator: 실제 작업을 수행하는 단위 (BashOperator, PythonOperator 등)
- Task: DAG 내의 개별 작업 인스턴스
- Sensor: 특정 조건이 충족될 때까지 대기하는 특수 Operator
- XCom: Task 간 데이터 전달 메커니즘
# Airflow DAG 예시
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.sensors.s3_key_sensor import S3KeySensor
from datetime import datetime, timedelta
default_args = {
'owner': 'data-engineering',
'depends_on_past': False,
'email_on_failure': True,
'email': ['team@example.com'],
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='daily_revenue_pipeline',
default_args=default_args,
description='일별 매출 데이터 처리 파이프라인',
schedule_interval='0 2 * * *', # 매일 오전 2시
start_date=datetime(2026, 1, 1),
catchup=False,
tags=['revenue', 'daily'],
) as dag:
# 1. 소스 데이터 존재 확인
check_source = S3KeySensor(
task_id='check_source_data',
bucket_name='raw-data-bucket',
bucket_key='orders/{{ ds }}/*.parquet',
timeout=3600,
poke_interval=300,
)
# 2. 데이터 추출
extract = PythonOperator(
task_id='extract_orders',
python_callable=extract_orders_from_source,
)
# 3. 데이터 변환
transform = PythonOperator(
task_id='transform_orders',
python_callable=transform_and_aggregate,
)
# 4. 웨어하우스 적재
load = PythonOperator(
task_id='load_to_warehouse',
python_callable=load_to_snowflake,
)
# 5. 데이터 품질 검증
validate = PythonOperator(
task_id='validate_data_quality',
python_callable=run_quality_checks,
)
# 의존성 정의
check_source >> extract >> transform >> load >> validate
6.2 DAG 설계 모범 사례
- 멱등성(Idempotency): 같은 작업을 여러 번 실행해도 결과가 동일해야 한다
- 원자성(Atomicity): 각 Task는 하나의 논리적 단위만 수행한다
- 재시도 전략: 일시적 오류를 대비한 적절한 재시도 설정
- 모니터링: SLA 설정, 실패 알림, 실행 시간 추적
- 테스트: DAG 구조 테스트, Task 단위 테스트 필수
6.3 다른 오케스트레이션 도구
| 도구 | 특징 | 적합한 경우 |
|---|---|---|
| Apache Airflow | Python 기반, 풍부한 생태계 | 복잡한 배치 파이프라인 |
| Prefect | 현대적 API, 동적 워크플로우 | 유연한 워크플로우 필요 |
| Dagster | 데이터 자산 중심, 강력한 타입 시스템 | 데이터 품질 중시 |
| Mage | 노트북 스타일 인터페이스 | 빠른 프로토타이핑 |
| AWS Step Functions | 서버리스, AWS 네이티브 | AWS 중심 아키텍처 |
7. 데이터 웨어하우스
7.1 주요 클라우드 웨어하우스
Snowflake
- 컴퓨팅과 스토리지 분리 아키텍처
- 멀티클라우드 지원 (AWS, Azure, GCP)
- Time Travel, Zero-Copy Clone 같은 고유 기능
- 동시성(Concurrency) 처리에 강점
Google BigQuery
- 서버리스 아키텍처 (인프라 관리 불필요)
- 쿼리 기반 과금 모델
- ML 모델을 SQL로 생성 가능 (BigQuery ML)
- 대규모 데이터 분석에 최적화
Amazon Redshift
- AWS 생태계와 긴밀한 통합
- Redshift Serverless로 서버리스 옵션 제공
- Redshift Spectrum으로 S3 데이터 직접 쿼리 가능
- 기존 PostgreSQL 도구와 호환
7.2 차원 모델링
데이터 웨어하우스에서 가장 널리 사용되는 모델링 기법은 스타 스키마와 스노우플레이크 스키마이다.
스타 스키마
중앙에 팩트 테이블이 있고, 주변에 디멘전 테이블이 직접 연결되는 구조이다.
-- 팩트 테이블
CREATE TABLE fact_sales (
sale_id BIGINT PRIMARY KEY,
date_key INT REFERENCES dim_date(date_key),
product_key INT REFERENCES dim_product(product_key),
customer_key INT REFERENCES dim_customer(customer_key),
store_key INT REFERENCES dim_store(store_key),
quantity INT,
unit_price DECIMAL(10,2),
total_amount DECIMAL(12,2),
discount_amount DECIMAL(10,2)
);
-- 디멘전 테이블
CREATE TABLE dim_product (
product_key INT PRIMARY KEY,
product_id VARCHAR(50),
product_name VARCHAR(200),
category VARCHAR(100),
subcategory VARCHAR(100),
brand VARCHAR(100),
unit_cost DECIMAL(10,2)
);
CREATE TABLE dim_date (
date_key INT PRIMARY KEY,
full_date DATE,
year INT,
quarter INT,
month INT,
week INT,
day_of_week VARCHAR(20),
is_holiday BOOLEAN
);
스노우플레이크 스키마
디멘전 테이블이 더 세분화된 하위 테이블로 정규화된 구조이다. 스타 스키마보다 저장 공간을 절약하지만 조인이 많아져 쿼리가 복잡해진다.
7.3 SCD (Slowly Changing Dimension)
디멘전 데이터의 변경 이력을 관리하는 방법이다.
| 유형 | 설명 | 예시 |
|---|---|---|
| SCD Type 1 | 기존 값을 덮어씀 | 고객 전화번호 변경 시 이전 번호 삭제 |
| SCD Type 2 | 새로운 행을 추가하여 이력 보존 | 고객 주소 변경 시 유효 기간과 함께 새 행 추가 |
| SCD Type 3 | 이전 값과 현재 값을 별도 컬럼으로 관리 | current_address, previous_address |
8. 데이터 품질
데이터 품질은 파이프라인의 신뢰성을 결정하는 핵심 요소이다. "쓰레기가 들어가면 쓰레기가 나온다(Garbage In, Garbage Out)"는 데이터 분야의 오래된 격언이다.
8.1 데이터 품질의 6가지 차원
- 정확성(Accuracy): 데이터가 실제 값을 올바르게 반영하는가
- 완전성(Completeness): 누락된 데이터가 없는가
- 일관성(Consistency): 여러 시스템 간 데이터가 모순되지 않는가
- 적시성(Timeliness): 데이터가 필요한 시점에 이용 가능한가
- 유일성(Uniqueness): 중복 데이터가 없는가
- 유효성(Validity): 데이터가 정의된 규칙과 형식을 따르는가
8.2 Great Expectations
Great Expectations는 데이터 검증, 문서화, 프로파일링을 위한 오픈소스 라이브러리이다.
import great_expectations as gx
# 데이터 컨텍스트 생성
context = gx.get_context()
# 데이터소스 연결
datasource = context.sources.add_pandas("my_datasource")
data_asset = datasource.add_dataframe_asset(name="orders")
# 기대치(Expectation) 정의
batch = data_asset.build_batch_request(dataframe=df)
validator = context.get_validator(batch_request=batch)
# 데이터 품질 규칙 정의
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_unique("order_id")
validator.expect_column_values_to_be_between(
"amount", min_value=0, max_value=1000000
)
validator.expect_column_values_to_be_in_set(
"status", ["pending", "completed", "cancelled", "refunded"]
)
validator.expect_column_values_to_match_regex(
"email", r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
)
# 검증 실행
results = validator.validate()
print(f"성공 여부: {results.success}")
8.3 데이터 관측성 (Data Observability)
데이터 관측성은 데이터 시스템의 상태를 지속적으로 모니터링하여 문제를 조기에 탐지하는 것이다.
핵심 지표:
- Freshness: 데이터가 얼마나 최신인가
- Volume: 예상 대비 데이터 양 변화
- Schema: 스키마 변경 감지
- Distribution: 데이터 분포 이상 탐지
- Lineage: 데이터의 출처와 흐름 추적
대표 도구: Monte Carlo, Atlan, Soda, Elementary
9. 데이터 거버넌스
9.1 메타데이터 관리
메타데이터는 "데이터에 대한 데이터"로, 크게 세 가지로 분류된다.
- 기술 메타데이터: 스키마, 데이터 타입, 파티션 정보, 저장 위치
- 비즈니스 메타데이터: 데이터 정의, 소유자, SLA, 비즈니스 규칙
- 운영 메타데이터: 처리 시간, 행 수, 에러 로그, 접근 이력
9.2 데이터 카탈로그
데이터 카탈로그는 조직 내 모든 데이터 자산을 검색, 탐색, 이해할 수 있게 하는 도구이다.
주요 기능:
- 자동 메타데이터 수집 및 인덱싱
- 데이터 리니지(Lineage) 시각화
- 데이터 사전(Dictionary) 관리
- 태그 및 분류 체계
- 협업 기능 (댓글, 평가, 위키)
대표 도구: Apache Atlas, DataHub, Atlan, Alation
9.3 접근 제어
데이터 접근 제어의 핵심 원칙:
- 최소 권한 원칙: 업무에 필요한 최소한의 권한만 부여
- 역할 기반 접근 제어(RBAC): 역할 단위로 권한 관리
-- Snowflake RBAC 예시
CREATE ROLE data_analyst;
CREATE ROLE data_engineer;
CREATE ROLE data_admin;
-- 분석가 역할: 읽기만 가능
GRANT USAGE ON DATABASE analytics_db TO ROLE data_analyst;
GRANT SELECT ON ALL TABLES IN SCHEMA analytics_db.gold TO ROLE data_analyst;
-- 엔지니어 역할: 읽기/쓰기 가능
GRANT ALL PRIVILEGES ON DATABASE analytics_db TO ROLE data_engineer;
GRANT ALL PRIVILEGES ON ALL SCHEMAS IN DATABASE analytics_db TO ROLE data_engineer;
-- 사용자에게 역할 할당
GRANT ROLE data_analyst TO USER analyst_kim;
GRANT ROLE data_engineer TO USER engineer_park;
- 행 수준 보안(Row-Level Security): 사용자별로 접근 가능한 행을 제한
- 열 수준 보안(Column-Level Security): 민감 컬럼에 대한 접근 제한 (마스킹 포함)
- 감사 로그(Audit Log): 모든 데이터 접근 이력 기록
10. 실전 파이프라인 예제
10.1 전체 아키텍처
실제 환경에서의 엔드투엔드 데이터 파이프라인 구성 예시이다.
[소스 시스템]
├── 운영 DB (PostgreSQL) ─── Debezium CDC ──┐
├── 웹 이벤트 (Clickstream) ─── SDK ──────────┤
├── 외부 API ─── Airbyte ─────────────────────┤
└── 파일 (CSV/Excel) ─── Airflow ─────────────┤
│
[메시지 브로커] │
└── Apache Kafka ◄────────────────────────────┘
├── 실시간 경로: Flink → 실시간 대시보드
└── 배치 경로: Spark → S3 (Data Lake)
│
[변환 레이어] │
└── dbt (Silver/Gold) ◄──────┘
│
[웨어하우스]
└── Snowflake
├── Gold 레이어 → Looker/Tableau 대시보드
└── ML Feature Store → 모델 학습
10.2 Airflow로 전체 파이프라인 오케스트레이션
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=10),
'execution_timeout': timedelta(hours=2),
}
with DAG(
dag_id='e2e_data_pipeline',
default_args=default_args,
schedule_interval='0 3 * * *',
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
# 1단계: Spark로 원시 데이터 처리
spark_process = SparkSubmitOperator(
task_id='spark_raw_processing',
application='s3://scripts/process_raw_data.py',
conn_id='spark_default',
conf={
'spark.executor.memory': '4g',
'spark.executor.cores': '2',
},
)
# 2단계: dbt로 데이터 변환
dbt_transform = DbtCloudRunJobOperator(
task_id='dbt_transform',
job_id=12345,
check_interval=30,
timeout=3600,
)
# 3단계: 데이터 품질 검증
quality_check = PythonOperator(
task_id='data_quality_check',
python_callable=run_great_expectations_suite,
)
# 4단계: 완료 알림
notify = SlackWebhookOperator(
task_id='slack_notification',
slack_webhook_conn_id='slack_webhook',
message='Daily pipeline completed successfully.',
)
spark_process >> dbt_transform >> quality_check >> notify
10.3 Kafka + Spark 실시간 처리 예제
# Spark Structured Streaming + Kafka
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, sum as spark_sum
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
spark = SparkSession.builder \
.appName("RealTimeRevenue") \
.getOrCreate()
# Kafka 스키마 정의
order_schema = StructType([
StructField("order_id", StringType()),
StructField("user_id", StringType()),
StructField("amount", DoubleType()),
StructField("category", StringType()),
StructField("timestamp", TimestampType()),
])
# Kafka에서 스트리밍 데이터 읽기
raw_stream = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "order-events")
.option("startingOffsets", "latest")
.load()
)
# JSON 파싱
orders = (
raw_stream
.select(from_json(
col("value").cast("string"),
order_schema
).alias("data"))
.select("data.*")
)
# 5분 윈도우 집계
windowed_revenue = (
orders
.withWatermark("timestamp", "10 minutes")
.groupBy(
window("timestamp", "5 minutes"),
"category"
)
.agg(spark_sum("amount").alias("revenue"))
)
# 결과를 콘솔에 출력 (프로덕션에서는 DB나 대시보드로)
query = (
windowed_revenue.writeStream
.outputMode("update")
.format("console")
.option("truncate", False)
.trigger(processingTime="1 minute")
.start()
)
query.awaitTermination()
마무리
데이터 엔지니어링은 넓은 범위의 기술과 개념을 포함하는 분야이다. 이 글에서 다룬 내용을 요약하면 다음과 같다.
| 영역 | 핵심 기술 | 대표 도구 |
|---|---|---|
| 데이터 저장 | 레이크, 웨어하우스, 레이크하우스 | S3, Snowflake, Delta Lake |
| 데이터 통합 | ETL/ELT | dbt, Airbyte, Fivetran |
| 배치 처리 | 분산 컴퓨팅 | Apache Spark |
| 스트림 처리 | 이벤트 스트리밍, CDC | Apache Kafka, Flink, Debezium |
| 오케스트레이션 | 워크플로우 관리 | Apache Airflow, Dagster |
| 데이터 품질 | 검증, 관측성 | Great Expectations, Monte Carlo |
| 거버넌스 | 메타데이터, 접근 제어 | DataHub, Apache Atlas |
처음부터 모든 기술을 한꺼번에 배울 필요는 없다. SQL과 Python을 기반으로 하나의 파이프라인을 직접 구축해보는 것이 가장 효과적인 학습 방법이다. 작은 프로젝트에서 시작하여 점진적으로 확장해 나가자.
퀴즈: 데이터 엔지니어링 기본 개념
Q1. 데이터 레이크와 데이터 웨어하우스의 가장 큰 차이점은?
A: 데이터 레이크는 Schema-on-Read 방식으로 원시 데이터를 그대로 저장하고 읽는 시점에 스키마를 적용한다. 데이터 웨어하우스는 Schema-on-Write 방식으로 데이터 저장 시점에 미리 정의된 스키마를 강제한다.
Q2. ETL과 ELT에서 변환이 일어나는 위치의 차이는?
A: ETL에서는 스테이징 영역(별도 서버)에서 변환이 일어난 후 웨어하우스에 적재된다. ELT에서는 원시 데이터를 먼저 웨어하우스에 적재한 후 웨어하우스의 컴퓨팅 파워를 활용하여 내부에서 변환한다.
Q3. Kafka에서 Consumer Group의 역할은?
A: Consumer Group은 여러 Consumer가 하나의 토픽을 분담하여 처리할 수 있게 한다. 같은 그룹 내의 Consumer들은 각각 다른 파티션을 담당하여 병렬 처리가 가능하다. 이를 통해 수평 확장이 용이해진다.
Q4. 메달리온 아키텍처의 세 계층은?
A: Bronze(원시 데이터 그대로 저장), Silver(정제 및 검증 완료된 데이터), Gold(비즈니스 로직과 집계가 적용된 분석용 데이터)이다.
Q5. Airflow에서 DAG의 역할은?
A: DAG(Directed Acyclic Graph)는 작업(Task)들의 실행 순서와 의존성을 정의하는 비순환 방향 그래프이다. 어떤 작업이 어떤 작업 이후에 실행되어야 하는지를 선언적으로 표현한다.
현재 단락 (1/539)
1. [데이터 엔지니어링이란](#1-데이터-엔지니어링이란)