- Authors

- Name
- Youngju Kim
- @fjvbn20031
목차
- 데이터 엔지니어링 개요
- Apache Spark
- Apache Kafka
- Apache Airflow
- dbt (Data Build Tool)
- Delta Lake / Apache Iceberg
- 피처 스토어 (Feature Store)
- 데이터 품질 & 모니터링
- 클라우드 데이터 플랫폼
- 퀴즈
데이터 엔지니어링 개요
데이터 엔지니어링은 AI/ML 시스템의 근간을 이루는 분야입니다. 데이터 파이프라인을 설계하고 구축하여 분석 및 모델 학습에 적합한 형태로 데이터를 제공하는 것이 핵심 역할입니다.
역할 비교: 데이터 엔지니어 vs 데이터 사이언티스트 vs ML 엔지니어
| 역할 | 주요 책임 | 핵심 도구 |
|---|---|---|
| 데이터 엔지니어 | 파이프라인 구축, 데이터 저장소 관리 | Spark, Kafka, Airflow, dbt |
| 데이터 사이언티스트 | 데이터 분석, 모델 개발 | Python, Jupyter, Pandas, Scikit-learn |
| ML 엔지니어 | 모델 배포, 서빙 인프라 | MLflow, Kubeflow, TFX, Ray |
현대 데이터 스택 아키텍처
현대 데이터 스택(Modern Data Stack)은 클라우드 중심으로 재편되었습니다:
- 수집(Ingestion): Fivetran, Airbyte, Kafka
- 저장(Storage): Snowflake, BigQuery, Redshift, Delta Lake
- 변환(Transformation): dbt, Spark, Beam
- 오케스트레이션(Orchestration): Airflow, Prefect, Dagster
- 시각화(Visualization): Tableau, Looker, Metabase
배치 처리 vs 스트리밍 처리
**배치 처리(Batch Processing)**는 대량의 데이터를 주기적으로 처리합니다. 데이터 웨어하우스 ETL 작업, 일별 리포트 생성, 모델 재학습 등에 사용됩니다.
**스트리밍 처리(Stream Processing)**는 데이터가 생성되는 즉시 처리합니다. 실시간 사기 감지, 이상 탐지, 추천 시스템 업데이트 등에 적합합니다.
Lambda 아키텍처 vs Kappa 아키텍처
Lambda 아키텍처는 배치 레이어와 스피드 레이어를 병렬로 운영합니다. 정확한 배치 결과와 실시간 스트림 결과를 서빙 레이어에서 합칩니다. 복잡도가 높지만 정확성을 보장합니다.
Kappa 아키텍처는 스트리밍 레이어만 사용하여 단순화합니다. Kafka Streams, Flink 같은 도구의 발전으로 실용적인 선택이 되었습니다. 유지보수가 쉽고 일관성 있는 코드베이스를 유지할 수 있습니다.
Apache Spark
Apache Spark는 대규모 데이터 처리를 위한 통합 분석 엔진입니다. 인메모리 처리로 Hadoop MapReduce보다 최대 100배 빠른 성능을 제공합니다.
Spark 아키텍처
Spark 클러스터는 세 가지 핵심 컴포넌트로 구성됩니다:
- Driver: SparkContext를 실행하고 작업을 조율하는 마스터 프로세스
- Executor: 실제 데이터 처리를 수행하는 워커 프로세스
- Cluster Manager: YARN, Kubernetes, Mesos 또는 Spark Standalone
RDD, DataFrame, Dataset API
**RDD (Resilient Distributed Dataset)**는 Spark의 기본 데이터 구조로, 분산 컬렉션에 대한 불변 연산을 제공합니다.
DataFrame은 스키마가 있는 분산 컬렉션으로, SQL과 유사한 API를 제공합니다. Catalyst 옵티마이저를 통해 자동으로 쿼리를 최적화합니다.
Dataset은 DataFrame의 타입 안전 버전으로, Scala/Java에서 컴파일 타임 타입 체크를 지원합니다.
PySpark를 이용한 AI 피처 엔지니어링
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, when
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
spark = SparkSession.builder \
.appName("AI Feature Engineering") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
# 데이터 로드 및 변환
df = spark.read.parquet("s3://data-lake/features/")
df_clean = df.dropna() \
.withColumn("age_group", when(col("age") < 30, "young")
.when(col("age") < 50, "middle")
.otherwise("senior"))
# 집계 통계
stats_df = df_clean.groupBy("category").agg(
count("*").alias("cnt"),
avg("score").alias("avg_score")
)
# 피처 벡터 생성
assembler = VectorAssembler(
inputCols=["age", "income", "score"],
outputCol="features"
)
ml_df = assembler.transform(df_clean)
# 모델 학습
rf = RandomForestClassifier(numTrees=100, featuresCol="features")
model = rf.fit(ml_df)
Spark SQL 최적화 팁
- 파티셔닝: 자주 필터링하는 컬럼으로 파티션을 나눕니다.
- Broadcast Join: 작은 테이블은 브로드캐스트 조인을 사용합니다.
- 캐싱: 반복 사용하는 DataFrame은
cache()또는persist()로 메모리에 유지합니다. - AQE (Adaptive Query Execution): Spark 3.0+의 동적 쿼리 최적화를 활용합니다.
Apache Kafka
Apache Kafka는 고처리량, 내결함성을 갖춘 분산 이벤트 스트리밍 플랫폼입니다. LinkedIn이 개발하여 오픈소스로 공개했으며, 실시간 AI 파이프라인의 핵심 인프라로 자리잡았습니다.
Kafka 아키텍처
- Broker: 메시지를 저장하고 전달하는 서버. 클러스터는 여러 Broker로 구성됩니다.
- Topic: 메시지가 발행되는 카테고리/피드 이름
- Partition: Topic을 분할하여 병렬 처리를 가능하게 합니다. 각 파티션은 순서 보장.
- Consumer Group: 동일 Topic을 공유하는 컨슈머 집합. 로드 밸런싱 제공.
- Offset: 파티션 내 메시지의 고유 위치 식별자
프로듀서/컨슈머 패턴
from kafka import KafkaProducer, KafkaConsumer
import json
import numpy as np
# 프로듀서: 실시간 피처 데이터 전송
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def send_feature(data):
producer.send('ml-features', value={
'timestamp': data['ts'],
'features': data['features'].tolist(),
'user_id': data['user_id']
})
# 컨슈머: 실시간 AI 추론
consumer = KafkaConsumer(
'ml-features',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
import onnxruntime as ort
session = ort.InferenceSession('model.onnx')
for msg in consumer:
features = np.array(msg.value['features']).reshape(1, -1)
prediction = session.run(None, {'input': features.astype(np.float32)})[0]
print(f"User {msg.value['user_id']}: {prediction}")
Kafka Streams와 ksqlDB
Kafka Streams는 Java/Scala 라이브러리로, Kafka 토픽에서 읽어 변환 후 다시 Kafka에 쓰는 스트림 처리를 지원합니다. ksqlDB는 SQL을 사용해 스트리밍 쿼리를 작성할 수 있게 해주는 이벤트 스트리밍 데이터베이스입니다.
실시간 AI 파이프라인에서 Kafka의 역할:
- 이벤트 소스 (사용자 행동, 센서 데이터 등)
- 피처 엔지니어링 결과 전달
- 모델 예측 결과 발행
- A/B 테스트 결과 수집
Apache Airflow
Apache Airflow는 파이프라인을 프로그래밍 방식으로 정의, 스케줄링, 모니터링하는 워크플로우 관리 플랫폼입니다.
DAG (Directed Acyclic Graph) 개념
DAG은 방향이 있고 순환이 없는 그래프로, Airflow에서 워크플로우를 표현하는 방법입니다. 각 노드는 Task이며, 엣지는 의존성(실행 순서)을 나타냅니다.
Operator 유형
- PythonOperator: Python 함수 실행
- BashOperator: 셸 명령 실행
- SQLExecuteQueryOperator: SQL 쿼리 실행
- KubernetesPodOperator: Kubernetes Pod 실행
- SparkSubmitOperator: Spark 잡 제출
TaskFlow API를 이용한 ML 파이프라인
from airflow.decorators import dag, task
from datetime import datetime
@dag(schedule='@daily', start_date=datetime(2026, 1, 1))
def ml_training_pipeline():
@task
def extract_data():
# 데이터 추출
return {'rows': 10000, 'path': '/tmp/data.parquet'}
@task
def preprocess(info: dict):
# 전처리
import pandas as pd
df = pd.read_parquet(info['path'])
df_clean = df.dropna()
return df_clean.to_dict()
@task
def train_model(data: dict):
# 모델 학습
from sklearn.ensemble import GradientBoostingClassifier
import mlflow
with mlflow.start_run():
model = GradientBoostingClassifier()
# training logic here
mlflow.sklearn.log_model(model, "model")
return "model_trained"
@task
def validate_model(status: str):
# 모델 검증
if status == "model_trained":
return {"accuracy": 0.92, "passed": True}
return {"passed": False}
raw = extract_data()
processed = preprocess(raw)
result = train_model(processed)
validate_model(result)
dag_instance = ml_training_pipeline()
Airflow 베스트 프랙티스
- 멱등성(Idempotency): 동일 DAG 실행 결과가 항상 동일해야 합니다.
- 원자성(Atomicity): Task는 성공 또는 실패만 있어야 합니다.
- XCom 최소화: 대용량 데이터는 XCom 대신 외부 저장소를 사용합니다.
- SLA 설정: 중요 파이프라인에는 SLA(Service Level Agreement)를 설정합니다.
dbt (Data Build Tool)
dbt는 분석 엔지니어가 SQL로 데이터 변환 로직을 작성하고, 버전 관리, 테스트, 문서화를 할 수 있게 해주는 도구입니다.
dbt의 역할과 ELT 패턴
전통적인 ETL(Extract-Transform-Load)과 달리, dbt는 ELT(Extract-Load-Transform) 패턴을 채택합니다. 원시 데이터를 먼저 데이터 웨어하우스에 로드한 후, 웨어하우스 내부에서 SQL로 변환합니다.
모델 레이어 구조
models/
├── staging/ # 원시 소스 데이터 정제
│ ├── stg_orders.sql
│ └── stg_customers.sql
├── intermediate/ # 비즈니스 로직 중간 변환
│ └── int_order_items.sql
└── marts/ # 최종 비즈니스 용 테이블
├── fct_orders.sql
└── dim_customers.sql
Jinja 템플릿과 ref() 함수
dbt는 Jinja 템플릿 엔진을 사용하여 SQL에 동적 기능을 추가합니다:
-- models/marts/fct_orders.sql
SELECT
o.order_id,
o.customer_id,
c.customer_name,
o.total_amount,
o.created_at
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('dim_customers') }} c
ON o.customer_id = c.customer_id
WHERE o.status = 'completed'
ref() 함수는 모델 간 의존성을 자동으로 추론하여 올바른 순서로 실행합니다.
데이터 테스트와 문서화
# schema.yml
models:
- name: fct_orders
description: '완료된 주문 사실 테이블'
columns:
- name: order_id
description: '주문 고유 ID'
tests:
- unique
- not_null
- name: total_amount
tests:
- not_null
- dbt_utils.accepted_range:
min_value: 0
Delta Lake / Apache Iceberg
Delta Lake와 Apache Iceberg는 데이터 레이크에 ACID 트랜잭션을 가져다주는 오픈 테이블 포맷입니다.
핵심 기능
- ACID 트랜잭션: 동시 읽기/쓰기 작업에서 데이터 일관성 보장
- 타임 트래블: 특정 시점의 데이터 스냅샷으로 조회 가능
- 스키마 진화: 기존 데이터를 깨지 않고 스키마 변경 가능
- 데이터 버전 관리: 변경 이력 추적 및 롤백
Delta Lake Merge/Upsert 작업
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Upsert (Merge) 작업
delta_table = DeltaTable.forPath(spark, "/delta/users")
updates_df = spark.read.parquet("/tmp/updates/")
delta_table.alias("target").merge(
updates_df.alias("source"),
"target.user_id = source.user_id"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
# 타임 트래블: 어제 데이터 조회
df_yesterday = spark.read.format("delta") \
.option("timestampAsOf", "2026-03-16") \
.load("/delta/users")
# 버전으로 조회
df_v1 = spark.read.format("delta") \
.option("versionAsOf", 1) \
.load("/delta/users")
# 변경 이력 확인
delta_table.history().show()
Z-Order 클러스터링
Z-Order는 관련 데이터를 같은 파일에 위치시켜 쿼리 성능을 향상시킵니다:
spark.sql("""
OPTIMIZE delta.`/delta/events`
ZORDER BY (user_id, event_date)
""")
피처 스토어 (Feature Store)
피처 스토어는 ML 피처를 중앙 집중식으로 관리하는 저장소입니다. 피처의 재사용, 일관성, 거버넌스를 보장합니다.
온라인 vs 오프라인 피처 스토어
오프라인 피처 스토어는 모델 학습에 사용되는 대규모 히스토리 피처를 저장합니다. Parquet, Delta Lake 형태로 데이터 레이크에 저장됩니다.
온라인 피처 스토어는 실시간 추론에 사용되는 최신 피처를 낮은 지연 시간으로 제공합니다. Redis, DynamoDB, Cassandra 같은 인메모리/NoSQL 데이터베이스에 저장됩니다.
Feast (오픈소스 피처 스토어)
from feast import FeatureStore, Entity, FeatureView, Field
from feast.types import Float32, Int64
# 피처 정의
user_stats = FeatureView(
name="user_stats",
entities=["user_id"],
schema=[
Field(name="avg_purchase_amount", dtype=Float32),
Field(name="purchase_count_30d", dtype=Int64),
],
ttl=timedelta(days=30),
)
# 온라인 피처 조회 (실시간 추론)
store = FeatureStore(repo_path=".")
features = store.get_online_features(
features=["user_stats:avg_purchase_amount", "user_stats:purchase_count_30d"],
entity_rows=[{"user_id": 1001}],
).to_dict()
관리형 피처 스토어
- Tecton: 엔터프라이즈급 피처 플랫폼, 실시간 변환 지원
- Vertex AI Feature Store: GCP 통합 관리형 서비스
- SageMaker Feature Store: AWS 통합 관리형 서비스
- Databricks Feature Store: Delta Lake 기반 피처 관리
데이터 품질 & 모니터링
데이터 품질은 AI 시스템의 신뢰성을 결정합니다. "Garbage in, garbage out"은 ML에서 특히 중요합니다.
Great Expectations 데이터 검증
Great Expectations는 데이터 파이프라인에 자동화된 데이터 검증을 추가하는 도구입니다:
import great_expectations as gx
context = gx.get_context()
datasource = context.sources.add_pandas("my_datasource")
# 기대값(Expectation) 정의
suite = context.add_expectation_suite("orders_suite")
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name="orders_suite"
)
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_between("total_amount", min_value=0, max_value=100000)
validator.expect_column_values_to_be_unique("order_id")
validator.save_expectation_suite()
# 검증 실행
checkpoint = context.add_or_update_checkpoint(
name="orders_checkpoint",
validations=[{"batch_request": batch_request, "expectation_suite_name": "orders_suite"}],
)
result = checkpoint.run()
데이터 드리프트 감지
**데이터 드리프트(Data Drift)**는 프로덕션 데이터의 통계적 특성이 학습 데이터와 달라지는 현상입니다. Evidently AI, WhyLogs, Nannyml 같은 도구로 감지합니다:
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=train_df, current_data=prod_df)
report.show(mode='inline')
데이터 계보 (Data Lineage) 추적
데이터 계보는 데이터의 출처부터 소비까지의 여정을 추적합니다. Apache Atlas, OpenLineage, Marquez 같은 도구를 사용합니다.
클라우드 데이터 플랫폼
AWS 데이터 스택
- Amazon S3: 오브젝트 스토리지, 데이터 레이크의 기반
- AWS Glue: 서버리스 ETL 서비스, 데이터 카탈로그
- Amazon Athena: S3 데이터를 SQL로 분석
- Amazon Redshift: 클라우드 데이터 웨어하우스
- Amazon EMR: 관리형 Hadoop/Spark 클러스터
GCP 데이터 스택
- Google BigQuery: 서버리스 데이터 웨어하우스, ML 내장
- Cloud Dataflow: Apache Beam 기반 데이터 처리
- Cloud Pub/Sub: 실시간 메시징 서비스 (Kafka 대안)
- Vertex AI: 통합 ML 플랫폼
Azure 데이터 스택
- Azure Data Lake Storage (ADLS): 계층적 네임스페이스 스토리지
- Azure Synapse Analytics: 통합 분석 서비스
- Azure Event Hub: 대규모 이벤트 스트리밍 (Kafka 호환)
- Azure Databricks: Spark 기반 분석 플랫폼
Snowflake 데이터 플랫폼
Snowflake는 멀티클라우드 데이터 플랫폼으로, 스토리지와 컴퓨팅을 분리한 아키텍처가 특징입니다:
- 가상 웨어하우스: 독립적으로 확장 가능한 컴퓨팅 클러스터
- 데이터 공유: 조직 간 안전한 데이터 공유
- Snowpipe: 자동화된 지속적 데이터 수집
- Cortex: 내장 AI/ML 기능
퀴즈
Q1. Lambda 아키텍처와 Kappa 아키텍처의 핵심 차이점은 무엇인가요?
정답: Lambda 아키텍처는 배치 레이어와 스피드 레이어를 모두 유지하는 반면, Kappa 아키텍처는 스트리밍 처리만 사용하여 단순화합니다.
설명: Lambda 아키텍처는 정확성(배치)과 실시간성(스트림)을 모두 제공하지만 두 레이어를 각각 유지해야 해서 복잡도가 높습니다. Kafka Streams, Apache Flink 같은 도구의 발전으로 스트리밍만으로도 충분한 정확성을 제공할 수 있게 되면서 Kappa 아키텍처가 현실적인 선택이 되었습니다.
Q2. Apache Spark에서 DataFrame과 RDD의 주요 차이점은?
정답: DataFrame은 스키마(Schema)가 있는 구조화된 데이터를 표현하며, Catalyst 옵티마이저를 통해 자동 쿼리 최적화가 이루어집니다. RDD는 타입이 없는 저수준 API로 더 세밀한 제어가 가능하지만 최적화는 사용자가 직접 해야 합니다.
설명: 대부분의 경우 DataFrame 또는 Dataset API를 사용하는 것이 권장됩니다. RDD는 커스텀 직렬화나 저수준 제어가 필요한 경우에만 사용합니다. Spark 3.0+에서는 Adaptive Query Execution(AQE)으로 런타임 최적화도 지원합니다.
Q3. Kafka에서 Consumer Group의 역할은 무엇인가요?
정답: Consumer Group은 동일한 Topic을 구독하는 컨슈머의 집합으로, 파티션을 각 컨슈머에게 분배하여 병렬 처리와 로드 밸런싱을 제공합니다.
설명: 하나의 파티션은 동일 Consumer Group 내 하나의 컨슈머만 읽을 수 있습니다. 컨슈머 수를 파티션 수보다 늘려도 추가 컨슈머는 유휴 상태가 됩니다. 서로 다른 Consumer Group은 동일 토픽을 독립적으로 읽을 수 있어, 같은 이벤트 스트림을 여러 목적으로 사용할 수 있습니다.
Q4. dbt에서 ref() 함수의 역할은 무엇인가요?
정답: ref() 함수는 다른 dbt 모델을 참조할 때 사용하며, 모델 간 의존성을 자동으로 추론하여 올바른 실행 순서를 결정합니다.
설명: ref()를 사용하면 dbt가 DAG(의존성 그래프)을 구성하고, 환경(개발/프로덕션)에 따라 올바른 데이터베이스 스키마와 테이블 이름을 자동으로 해결합니다. 이를 통해 코드 재사용성과 유지보수성이 크게 향상됩니다.
Q5. 피처 스토어에서 온라인 스토어와 오프라인 스토어의 차이점은?
정답: 오프라인 스토어는 모델 학습을 위한 대규모 히스토리 데이터를 저장하며(Delta Lake, Parquet), 온라인 스토어는 실시간 추론을 위한 최신 피처를 낮은 지연 시간으로 제공합니다(Redis, DynamoDB).
설명: 두 스토어를 분리하는 이유는 요구사항이 다르기 때문입니다. 학습은 수십억 행의 데이터를 배치로 처리하는 높은 처리량이 필요하고, 추론은 밀리초 단위의 낮은 지연 시간이 필요합니다. Feast 같은 피처 스토어는 두 스토어 간 피처 일관성을 보장하는 역할도 합니다.