Skip to content

Split View: 데이터 엔지니어링 & AI 파이프라인: Spark부터 Kafka까지 완전 가이드

|

데이터 엔지니어링 & AI 파이프라인: Spark부터 Kafka까지 완전 가이드

목차

  1. 데이터 엔지니어링 개요
  2. Apache Spark
  3. Apache Kafka
  4. Apache Airflow
  5. dbt (Data Build Tool)
  6. Delta Lake / Apache Iceberg
  7. 피처 스토어 (Feature Store)
  8. 데이터 품질 & 모니터링
  9. 클라우드 데이터 플랫폼
  10. 퀴즈

데이터 엔지니어링 개요

데이터 엔지니어링은 AI/ML 시스템의 근간을 이루는 분야입니다. 데이터 파이프라인을 설계하고 구축하여 분석 및 모델 학습에 적합한 형태로 데이터를 제공하는 것이 핵심 역할입니다.

역할 비교: 데이터 엔지니어 vs 데이터 사이언티스트 vs ML 엔지니어

역할주요 책임핵심 도구
데이터 엔지니어파이프라인 구축, 데이터 저장소 관리Spark, Kafka, Airflow, dbt
데이터 사이언티스트데이터 분석, 모델 개발Python, Jupyter, Pandas, Scikit-learn
ML 엔지니어모델 배포, 서빙 인프라MLflow, Kubeflow, TFX, Ray

현대 데이터 스택 아키텍처

현대 데이터 스택(Modern Data Stack)은 클라우드 중심으로 재편되었습니다:

  • 수집(Ingestion): Fivetran, Airbyte, Kafka
  • 저장(Storage): Snowflake, BigQuery, Redshift, Delta Lake
  • 변환(Transformation): dbt, Spark, Beam
  • 오케스트레이션(Orchestration): Airflow, Prefect, Dagster
  • 시각화(Visualization): Tableau, Looker, Metabase

배치 처리 vs 스트리밍 처리

**배치 처리(Batch Processing)**는 대량의 데이터를 주기적으로 처리합니다. 데이터 웨어하우스 ETL 작업, 일별 리포트 생성, 모델 재학습 등에 사용됩니다.

**스트리밍 처리(Stream Processing)**는 데이터가 생성되는 즉시 처리합니다. 실시간 사기 감지, 이상 탐지, 추천 시스템 업데이트 등에 적합합니다.

Lambda 아키텍처 vs Kappa 아키텍처

Lambda 아키텍처는 배치 레이어와 스피드 레이어를 병렬로 운영합니다. 정확한 배치 결과와 실시간 스트림 결과를 서빙 레이어에서 합칩니다. 복잡도가 높지만 정확성을 보장합니다.

Kappa 아키텍처는 스트리밍 레이어만 사용하여 단순화합니다. Kafka Streams, Flink 같은 도구의 발전으로 실용적인 선택이 되었습니다. 유지보수가 쉽고 일관성 있는 코드베이스를 유지할 수 있습니다.


Apache Spark

Apache Spark는 대규모 데이터 처리를 위한 통합 분석 엔진입니다. 인메모리 처리로 Hadoop MapReduce보다 최대 100배 빠른 성능을 제공합니다.

Spark 아키텍처

Spark 클러스터는 세 가지 핵심 컴포넌트로 구성됩니다:

  • Driver: SparkContext를 실행하고 작업을 조율하는 마스터 프로세스
  • Executor: 실제 데이터 처리를 수행하는 워커 프로세스
  • Cluster Manager: YARN, Kubernetes, Mesos 또는 Spark Standalone

RDD, DataFrame, Dataset API

**RDD (Resilient Distributed Dataset)**는 Spark의 기본 데이터 구조로, 분산 컬렉션에 대한 불변 연산을 제공합니다.

DataFrame은 스키마가 있는 분산 컬렉션으로, SQL과 유사한 API를 제공합니다. Catalyst 옵티마이저를 통해 자동으로 쿼리를 최적화합니다.

Dataset은 DataFrame의 타입 안전 버전으로, Scala/Java에서 컴파일 타임 타입 체크를 지원합니다.

PySpark를 이용한 AI 피처 엔지니어링

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, when
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

spark = SparkSession.builder \
    .appName("AI Feature Engineering") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# 데이터 로드 및 변환
df = spark.read.parquet("s3://data-lake/features/")
df_clean = df.dropna() \
    .withColumn("age_group", when(col("age") < 30, "young")
                              .when(col("age") < 50, "middle")
                              .otherwise("senior"))

# 집계 통계
stats_df = df_clean.groupBy("category").agg(
    count("*").alias("cnt"),
    avg("score").alias("avg_score")
)

# 피처 벡터 생성
assembler = VectorAssembler(
    inputCols=["age", "income", "score"],
    outputCol="features"
)
ml_df = assembler.transform(df_clean)

# 모델 학습
rf = RandomForestClassifier(numTrees=100, featuresCol="features")
model = rf.fit(ml_df)

Spark SQL 최적화 팁

  • 파티셔닝: 자주 필터링하는 컬럼으로 파티션을 나눕니다.
  • Broadcast Join: 작은 테이블은 브로드캐스트 조인을 사용합니다.
  • 캐싱: 반복 사용하는 DataFrame은 cache() 또는 persist()로 메모리에 유지합니다.
  • AQE (Adaptive Query Execution): Spark 3.0+의 동적 쿼리 최적화를 활용합니다.

Apache Kafka

Apache Kafka는 고처리량, 내결함성을 갖춘 분산 이벤트 스트리밍 플랫폼입니다. LinkedIn이 개발하여 오픈소스로 공개했으며, 실시간 AI 파이프라인의 핵심 인프라로 자리잡았습니다.

Kafka 아키텍처

  • Broker: 메시지를 저장하고 전달하는 서버. 클러스터는 여러 Broker로 구성됩니다.
  • Topic: 메시지가 발행되는 카테고리/피드 이름
  • Partition: Topic을 분할하여 병렬 처리를 가능하게 합니다. 각 파티션은 순서 보장.
  • Consumer Group: 동일 Topic을 공유하는 컨슈머 집합. 로드 밸런싱 제공.
  • Offset: 파티션 내 메시지의 고유 위치 식별자

프로듀서/컨슈머 패턴

from kafka import KafkaProducer, KafkaConsumer
import json
import numpy as np

# 프로듀서: 실시간 피처 데이터 전송
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def send_feature(data):
    producer.send('ml-features', value={
        'timestamp': data['ts'],
        'features': data['features'].tolist(),
        'user_id': data['user_id']
    })

# 컨슈머: 실시간 AI 추론
consumer = KafkaConsumer(
    'ml-features',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

import onnxruntime as ort
session = ort.InferenceSession('model.onnx')

for msg in consumer:
    features = np.array(msg.value['features']).reshape(1, -1)
    prediction = session.run(None, {'input': features.astype(np.float32)})[0]
    print(f"User {msg.value['user_id']}: {prediction}")

Kafka Streams와 ksqlDB

Kafka Streams는 Java/Scala 라이브러리로, Kafka 토픽에서 읽어 변환 후 다시 Kafka에 쓰는 스트림 처리를 지원합니다. ksqlDB는 SQL을 사용해 스트리밍 쿼리를 작성할 수 있게 해주는 이벤트 스트리밍 데이터베이스입니다.

실시간 AI 파이프라인에서 Kafka의 역할:

  1. 이벤트 소스 (사용자 행동, 센서 데이터 등)
  2. 피처 엔지니어링 결과 전달
  3. 모델 예측 결과 발행
  4. A/B 테스트 결과 수집

Apache Airflow

Apache Airflow는 파이프라인을 프로그래밍 방식으로 정의, 스케줄링, 모니터링하는 워크플로우 관리 플랫폼입니다.

DAG (Directed Acyclic Graph) 개념

DAG은 방향이 있고 순환이 없는 그래프로, Airflow에서 워크플로우를 표현하는 방법입니다. 각 노드는 Task이며, 엣지는 의존성(실행 순서)을 나타냅니다.

Operator 유형

  • PythonOperator: Python 함수 실행
  • BashOperator: 셸 명령 실행
  • SQLExecuteQueryOperator: SQL 쿼리 실행
  • KubernetesPodOperator: Kubernetes Pod 실행
  • SparkSubmitOperator: Spark 잡 제출

TaskFlow API를 이용한 ML 파이프라인

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule='@daily', start_date=datetime(2026, 1, 1))
def ml_training_pipeline():

    @task
    def extract_data():
        # 데이터 추출
        return {'rows': 10000, 'path': '/tmp/data.parquet'}

    @task
    def preprocess(info: dict):
        # 전처리
        import pandas as pd
        df = pd.read_parquet(info['path'])
        df_clean = df.dropna()
        return df_clean.to_dict()

    @task
    def train_model(data: dict):
        # 모델 학습
        from sklearn.ensemble import GradientBoostingClassifier
        import mlflow

        with mlflow.start_run():
            model = GradientBoostingClassifier()
            # training logic here
            mlflow.sklearn.log_model(model, "model")
        return "model_trained"

    @task
    def validate_model(status: str):
        # 모델 검증
        if status == "model_trained":
            return {"accuracy": 0.92, "passed": True}
        return {"passed": False}

    raw = extract_data()
    processed = preprocess(raw)
    result = train_model(processed)
    validate_model(result)

dag_instance = ml_training_pipeline()

Airflow 베스트 프랙티스

  • 멱등성(Idempotency): 동일 DAG 실행 결과가 항상 동일해야 합니다.
  • 원자성(Atomicity): Task는 성공 또는 실패만 있어야 합니다.
  • XCom 최소화: 대용량 데이터는 XCom 대신 외부 저장소를 사용합니다.
  • SLA 설정: 중요 파이프라인에는 SLA(Service Level Agreement)를 설정합니다.

dbt (Data Build Tool)

dbt는 분석 엔지니어가 SQL로 데이터 변환 로직을 작성하고, 버전 관리, 테스트, 문서화를 할 수 있게 해주는 도구입니다.

dbt의 역할과 ELT 패턴

전통적인 ETL(Extract-Transform-Load)과 달리, dbt는 ELT(Extract-Load-Transform) 패턴을 채택합니다. 원시 데이터를 먼저 데이터 웨어하우스에 로드한 후, 웨어하우스 내부에서 SQL로 변환합니다.

모델 레이어 구조

models/
├── staging/          # 원시 소스 데이터 정제
│   ├── stg_orders.sql
│   └── stg_customers.sql
├── intermediate/     # 비즈니스 로직 중간 변환
│   └── int_order_items.sql
└── marts/           # 최종 비즈니스 용 테이블
    ├── fct_orders.sql
    └── dim_customers.sql

Jinja 템플릿과 ref() 함수

dbt는 Jinja 템플릿 엔진을 사용하여 SQL에 동적 기능을 추가합니다:

-- models/marts/fct_orders.sql
SELECT
    o.order_id,
    o.customer_id,
    c.customer_name,
    o.total_amount,
    o.created_at
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('dim_customers') }} c
    ON o.customer_id = c.customer_id
WHERE o.status = 'completed'

ref() 함수는 모델 간 의존성을 자동으로 추론하여 올바른 순서로 실행합니다.

데이터 테스트와 문서화

# schema.yml
models:
  - name: fct_orders
    description: '완료된 주문 사실 테이블'
    columns:
      - name: order_id
        description: '주문 고유 ID'
        tests:
          - unique
          - not_null
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0

Delta Lake / Apache Iceberg

Delta Lake와 Apache Iceberg는 데이터 레이크에 ACID 트랜잭션을 가져다주는 오픈 테이블 포맷입니다.

핵심 기능

  • ACID 트랜잭션: 동시 읽기/쓰기 작업에서 데이터 일관성 보장
  • 타임 트래블: 특정 시점의 데이터 스냅샷으로 조회 가능
  • 스키마 진화: 기존 데이터를 깨지 않고 스키마 변경 가능
  • 데이터 버전 관리: 변경 이력 추적 및 롤백

Delta Lake Merge/Upsert 작업

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Upsert (Merge) 작업
delta_table = DeltaTable.forPath(spark, "/delta/users")
updates_df = spark.read.parquet("/tmp/updates/")

delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.user_id = source.user_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

# 타임 트래블: 어제 데이터 조회
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2026-03-16") \
    .load("/delta/users")

# 버전으로 조회
df_v1 = spark.read.format("delta") \
    .option("versionAsOf", 1) \
    .load("/delta/users")

# 변경 이력 확인
delta_table.history().show()

Z-Order 클러스터링

Z-Order는 관련 데이터를 같은 파일에 위치시켜 쿼리 성능을 향상시킵니다:

spark.sql("""
    OPTIMIZE delta.`/delta/events`
    ZORDER BY (user_id, event_date)
""")

피처 스토어 (Feature Store)

피처 스토어는 ML 피처를 중앙 집중식으로 관리하는 저장소입니다. 피처의 재사용, 일관성, 거버넌스를 보장합니다.

온라인 vs 오프라인 피처 스토어

오프라인 피처 스토어는 모델 학습에 사용되는 대규모 히스토리 피처를 저장합니다. Parquet, Delta Lake 형태로 데이터 레이크에 저장됩니다.

온라인 피처 스토어는 실시간 추론에 사용되는 최신 피처를 낮은 지연 시간으로 제공합니다. Redis, DynamoDB, Cassandra 같은 인메모리/NoSQL 데이터베이스에 저장됩니다.

Feast (오픈소스 피처 스토어)

from feast import FeatureStore, Entity, FeatureView, Field
from feast.types import Float32, Int64

# 피처 정의
user_stats = FeatureView(
    name="user_stats",
    entities=["user_id"],
    schema=[
        Field(name="avg_purchase_amount", dtype=Float32),
        Field(name="purchase_count_30d", dtype=Int64),
    ],
    ttl=timedelta(days=30),
)

# 온라인 피처 조회 (실시간 추론)
store = FeatureStore(repo_path=".")
features = store.get_online_features(
    features=["user_stats:avg_purchase_amount", "user_stats:purchase_count_30d"],
    entity_rows=[{"user_id": 1001}],
).to_dict()

관리형 피처 스토어

  • Tecton: 엔터프라이즈급 피처 플랫폼, 실시간 변환 지원
  • Vertex AI Feature Store: GCP 통합 관리형 서비스
  • SageMaker Feature Store: AWS 통합 관리형 서비스
  • Databricks Feature Store: Delta Lake 기반 피처 관리

데이터 품질 & 모니터링

데이터 품질은 AI 시스템의 신뢰성을 결정합니다. "Garbage in, garbage out"은 ML에서 특히 중요합니다.

Great Expectations 데이터 검증

Great Expectations는 데이터 파이프라인에 자동화된 데이터 검증을 추가하는 도구입니다:

import great_expectations as gx

context = gx.get_context()
datasource = context.sources.add_pandas("my_datasource")

# 기대값(Expectation) 정의
suite = context.add_expectation_suite("orders_suite")
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="orders_suite"
)

validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_between("total_amount", min_value=0, max_value=100000)
validator.expect_column_values_to_be_unique("order_id")
validator.save_expectation_suite()

# 검증 실행
checkpoint = context.add_or_update_checkpoint(
    name="orders_checkpoint",
    validations=[{"batch_request": batch_request, "expectation_suite_name": "orders_suite"}],
)
result = checkpoint.run()

데이터 드리프트 감지

**데이터 드리프트(Data Drift)**는 프로덕션 데이터의 통계적 특성이 학습 데이터와 달라지는 현상입니다. Evidently AI, WhyLogs, Nannyml 같은 도구로 감지합니다:

from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=train_df, current_data=prod_df)
report.show(mode='inline')

데이터 계보 (Data Lineage) 추적

데이터 계보는 데이터의 출처부터 소비까지의 여정을 추적합니다. Apache Atlas, OpenLineage, Marquez 같은 도구를 사용합니다.


클라우드 데이터 플랫폼

AWS 데이터 스택

  • Amazon S3: 오브젝트 스토리지, 데이터 레이크의 기반
  • AWS Glue: 서버리스 ETL 서비스, 데이터 카탈로그
  • Amazon Athena: S3 데이터를 SQL로 분석
  • Amazon Redshift: 클라우드 데이터 웨어하우스
  • Amazon EMR: 관리형 Hadoop/Spark 클러스터

GCP 데이터 스택

  • Google BigQuery: 서버리스 데이터 웨어하우스, ML 내장
  • Cloud Dataflow: Apache Beam 기반 데이터 처리
  • Cloud Pub/Sub: 실시간 메시징 서비스 (Kafka 대안)
  • Vertex AI: 통합 ML 플랫폼

Azure 데이터 스택

  • Azure Data Lake Storage (ADLS): 계층적 네임스페이스 스토리지
  • Azure Synapse Analytics: 통합 분석 서비스
  • Azure Event Hub: 대규모 이벤트 스트리밍 (Kafka 호환)
  • Azure Databricks: Spark 기반 분석 플랫폼

Snowflake 데이터 플랫폼

Snowflake는 멀티클라우드 데이터 플랫폼으로, 스토리지와 컴퓨팅을 분리한 아키텍처가 특징입니다:

  • 가상 웨어하우스: 독립적으로 확장 가능한 컴퓨팅 클러스터
  • 데이터 공유: 조직 간 안전한 데이터 공유
  • Snowpipe: 자동화된 지속적 데이터 수집
  • Cortex: 내장 AI/ML 기능

퀴즈

Q1. Lambda 아키텍처와 Kappa 아키텍처의 핵심 차이점은 무엇인가요?

정답: Lambda 아키텍처는 배치 레이어와 스피드 레이어를 모두 유지하는 반면, Kappa 아키텍처는 스트리밍 처리만 사용하여 단순화합니다.

설명: Lambda 아키텍처는 정확성(배치)과 실시간성(스트림)을 모두 제공하지만 두 레이어를 각각 유지해야 해서 복잡도가 높습니다. Kafka Streams, Apache Flink 같은 도구의 발전으로 스트리밍만으로도 충분한 정확성을 제공할 수 있게 되면서 Kappa 아키텍처가 현실적인 선택이 되었습니다.

Q2. Apache Spark에서 DataFrame과 RDD의 주요 차이점은?

정답: DataFrame은 스키마(Schema)가 있는 구조화된 데이터를 표현하며, Catalyst 옵티마이저를 통해 자동 쿼리 최적화가 이루어집니다. RDD는 타입이 없는 저수준 API로 더 세밀한 제어가 가능하지만 최적화는 사용자가 직접 해야 합니다.

설명: 대부분의 경우 DataFrame 또는 Dataset API를 사용하는 것이 권장됩니다. RDD는 커스텀 직렬화나 저수준 제어가 필요한 경우에만 사용합니다. Spark 3.0+에서는 Adaptive Query Execution(AQE)으로 런타임 최적화도 지원합니다.

Q3. Kafka에서 Consumer Group의 역할은 무엇인가요?

정답: Consumer Group은 동일한 Topic을 구독하는 컨슈머의 집합으로, 파티션을 각 컨슈머에게 분배하여 병렬 처리와 로드 밸런싱을 제공합니다.

설명: 하나의 파티션은 동일 Consumer Group 내 하나의 컨슈머만 읽을 수 있습니다. 컨슈머 수를 파티션 수보다 늘려도 추가 컨슈머는 유휴 상태가 됩니다. 서로 다른 Consumer Group은 동일 토픽을 독립적으로 읽을 수 있어, 같은 이벤트 스트림을 여러 목적으로 사용할 수 있습니다.

Q4. dbt에서 ref() 함수의 역할은 무엇인가요?

정답: ref() 함수는 다른 dbt 모델을 참조할 때 사용하며, 모델 간 의존성을 자동으로 추론하여 올바른 실행 순서를 결정합니다.

설명: ref()를 사용하면 dbt가 DAG(의존성 그래프)을 구성하고, 환경(개발/프로덕션)에 따라 올바른 데이터베이스 스키마와 테이블 이름을 자동으로 해결합니다. 이를 통해 코드 재사용성과 유지보수성이 크게 향상됩니다.

Q5. 피처 스토어에서 온라인 스토어와 오프라인 스토어의 차이점은?

정답: 오프라인 스토어는 모델 학습을 위한 대규모 히스토리 데이터를 저장하며(Delta Lake, Parquet), 온라인 스토어는 실시간 추론을 위한 최신 피처를 낮은 지연 시간으로 제공합니다(Redis, DynamoDB).

설명: 두 스토어를 분리하는 이유는 요구사항이 다르기 때문입니다. 학습은 수십억 행의 데이터를 배치로 처리하는 높은 처리량이 필요하고, 추론은 밀리초 단위의 낮은 지연 시간이 필요합니다. Feast 같은 피처 스토어는 두 스토어 간 피처 일관성을 보장하는 역할도 합니다.


참고 자료

Data Engineering & AI Pipeline Guide: From Apache Spark to Kafka

Table of Contents

  1. Data Engineering Overview
  2. Apache Spark
  3. Apache Kafka
  4. Apache Airflow
  5. dbt (Data Build Tool)
  6. Delta Lake / Apache Iceberg
  7. Feature Store
  8. Data Quality & Monitoring
  9. Cloud Data Platforms
  10. Quiz

Data Engineering Overview

Data engineering forms the backbone of any AI/ML system. The core responsibility is designing and building data pipelines that deliver data in the right form for analysis and model training.

Role Comparison: Data Engineer vs Data Scientist vs ML Engineer

RolePrimary ResponsibilitiesCore Tools
Data EngineerPipeline construction, data store managementSpark, Kafka, Airflow, dbt
Data ScientistData analysis, model developmentPython, Jupyter, Pandas, Scikit-learn
ML EngineerModel deployment, serving infrastructureMLflow, Kubeflow, TFX, Ray

Modern Data Stack Architecture

The Modern Data Stack has been reshaped around cloud-native tools:

  • Ingestion: Fivetran, Airbyte, Kafka
  • Storage: Snowflake, BigQuery, Redshift, Delta Lake
  • Transformation: dbt, Spark, Beam
  • Orchestration: Airflow, Prefect, Dagster
  • Visualization: Tableau, Looker, Metabase

Batch Processing vs Stream Processing

Batch Processing handles large volumes of data periodically. It is used for data warehouse ETL jobs, daily report generation, and model retraining.

Stream Processing handles data as soon as it is generated. It is suited for real-time fraud detection, anomaly detection, and recommendation system updates.

Lambda Architecture vs Kappa Architecture

Lambda Architecture runs batch and speed layers in parallel. Accurate batch results and real-time stream results are combined at the serving layer. It provides high accuracy at the cost of operational complexity.

Kappa Architecture simplifies by using only a streaming layer. Advances in tools like Kafka Streams and Apache Flink have made this approach practical. It maintains a consistent, easier-to-manage codebase.


Apache Spark

Apache Spark is a unified analytics engine for large-scale data processing. Its in-memory processing provides up to 100x better performance than Hadoop MapReduce.

Spark Architecture

A Spark cluster consists of three core components:

  • Driver: The master process that runs SparkContext and orchestrates jobs
  • Executor: Worker processes that perform actual data processing
  • Cluster Manager: YARN, Kubernetes, Mesos, or Spark Standalone

RDD, DataFrame, and Dataset APIs

RDD (Resilient Distributed Dataset) is Spark's fundamental data structure, providing immutable operations on distributed collections.

DataFrame is a distributed collection with a schema, offering a SQL-like API. It automatically optimizes queries through the Catalyst optimizer.

Dataset is the type-safe version of DataFrame, supporting compile-time type checking in Scala and Java.

PySpark for AI Feature Engineering

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, when
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

spark = SparkSession.builder \
    .appName("AI Feature Engineering") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Load and transform data
df = spark.read.parquet("s3://data-lake/features/")
df_clean = df.dropna() \
    .withColumn("age_group", when(col("age") < 30, "young")
                              .when(col("age") < 50, "middle")
                              .otherwise("senior"))

# Aggregate statistics
stats_df = df_clean.groupBy("category").agg(
    count("*").alias("cnt"),
    avg("score").alias("avg_score")
)

# Build feature vector
assembler = VectorAssembler(
    inputCols=["age", "income", "score"],
    outputCol="features"
)
ml_df = assembler.transform(df_clean)

# Train model
rf = RandomForestClassifier(numTrees=100, featuresCol="features")
model = rf.fit(ml_df)

Spark SQL Optimization Tips

  • Partitioning: Partition data by columns you frequently filter on.
  • Broadcast Join: Use broadcast joins for small tables.
  • Caching: Use cache() or persist() for DataFrames reused multiple times.
  • AQE (Adaptive Query Execution): Leverage Spark 3.0+ dynamic query optimization.

Apache Kafka

Apache Kafka is a high-throughput, fault-tolerant distributed event streaming platform. Developed by LinkedIn and later open-sourced, it has become the core infrastructure for real-time AI pipelines.

Kafka Architecture

  • Broker: A server that stores and delivers messages. A cluster consists of multiple brokers.
  • Topic: The category or feed name where messages are published
  • Partition: Divides a topic to enable parallel processing. Each partition guarantees ordering.
  • Consumer Group: A set of consumers sharing the same topic, providing load balancing.
  • Offset: A unique position identifier for a message within a partition

Producer/Consumer Pattern

from kafka import KafkaProducer, KafkaConsumer
import json
import numpy as np

# Producer: send real-time feature data
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def send_feature(data):
    producer.send('ml-features', value={
        'timestamp': data['ts'],
        'features': data['features'].tolist(),
        'user_id': data['user_id']
    })

# Consumer: real-time AI inference
consumer = KafkaConsumer(
    'ml-features',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

import onnxruntime as ort
session = ort.InferenceSession('model.onnx')

for msg in consumer:
    features = np.array(msg.value['features']).reshape(1, -1)
    prediction = session.run(None, {'input': features.astype(np.float32)})[0]
    print(f"User {msg.value['user_id']}: {prediction}")

Kafka Streams and ksqlDB

Kafka Streams is a Java/Scala library that enables stream processing from Kafka topics — reading, transforming, and writing back to Kafka. ksqlDB is an event streaming database that lets you write streaming queries using SQL.

Kafka's role in real-time AI pipelines:

  1. Event source (user behavior, sensor data, etc.)
  2. Feature engineering results transport
  3. Model prediction publishing
  4. A/B test result collection

Apache Airflow

Apache Airflow is a workflow management platform for programmatically defining, scheduling, and monitoring pipelines.

DAG (Directed Acyclic Graph) Concept

A DAG is a directed graph with no cycles, representing a workflow in Airflow. Each node is a Task, and edges represent dependencies (execution order).

Operator Types

  • PythonOperator: Execute a Python function
  • BashOperator: Execute a shell command
  • SQLExecuteQueryOperator: Execute a SQL query
  • KubernetesPodOperator: Run a Kubernetes pod
  • SparkSubmitOperator: Submit a Spark job

ML Pipeline with TaskFlow API

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule='@daily', start_date=datetime(2026, 1, 1))
def ml_training_pipeline():

    @task
    def extract_data():
        # Extract data
        return {'rows': 10000, 'path': '/tmp/data.parquet'}

    @task
    def preprocess(info: dict):
        # Preprocessing
        import pandas as pd
        df = pd.read_parquet(info['path'])
        df_clean = df.dropna()
        return df_clean.to_dict()

    @task
    def train_model(data: dict):
        # Model training
        from sklearn.ensemble import GradientBoostingClassifier
        import mlflow

        with mlflow.start_run():
            model = GradientBoostingClassifier()
            # training logic here
            mlflow.sklearn.log_model(model, "model")
        return "model_trained"

    @task
    def validate_model(status: str):
        # Model validation
        if status == "model_trained":
            return {"accuracy": 0.92, "passed": True}
        return {"passed": False}

    raw = extract_data()
    processed = preprocess(raw)
    result = train_model(processed)
    validate_model(result)

dag_instance = ml_training_pipeline()

Airflow Best Practices

  • Idempotency: Running the same DAG multiple times should always produce the same result.
  • Atomicity: A task should either succeed or fail completely.
  • Minimize XCom: Use external storage instead of XCom for large data.
  • SLA settings: Set SLAs (Service Level Agreements) for critical pipelines.

dbt (Data Build Tool)

dbt enables analytics engineers to write data transformation logic in SQL, with version control, testing, and documentation built in.

dbt's Role and the ELT Pattern

Unlike traditional ETL (Extract-Transform-Load), dbt embraces the ELT (Extract-Load-Transform) pattern. Raw data is first loaded into the data warehouse, then transformed within it using SQL.

Model Layer Structure

models/
├── staging/          # Clean raw source data
│   ├── stg_orders.sql
│   └── stg_customers.sql
├── intermediate/     # Intermediate business logic transforms
│   └── int_order_items.sql
└── marts/           # Final business-facing tables
    ├── fct_orders.sql
    └── dim_customers.sql

Jinja Templating and the ref() Function

dbt uses the Jinja templating engine to add dynamic capabilities to SQL:

-- models/marts/fct_orders.sql
SELECT
    o.order_id,
    o.customer_id,
    c.customer_name,
    o.total_amount,
    o.created_at
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('dim_customers') }} c
    ON o.customer_id = c.customer_id
WHERE o.status = 'completed'

The ref() function automatically infers dependencies between models and executes them in the correct order.

Data Testing and Documentation

# schema.yml
models:
  - name: fct_orders
    description: 'Completed orders fact table'
    columns:
      - name: order_id
        description: 'Unique order ID'
        tests:
          - unique
          - not_null
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0

Delta Lake / Apache Iceberg

Delta Lake and Apache Iceberg are open table formats that bring ACID transactions to the data lake.

Key Features

  • ACID Transactions: Ensures data consistency under concurrent reads and writes
  • Time Travel: Query data snapshots at any point in time
  • Schema Evolution: Change schemas without breaking existing data
  • Data Versioning: Track change history and enable rollback

Delta Lake Merge/Upsert Operations

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Upsert (Merge) operation
delta_table = DeltaTable.forPath(spark, "/delta/users")
updates_df = spark.read.parquet("/tmp/updates/")

delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.user_id = source.user_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

# Time Travel: query yesterday's data
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2026-03-16") \
    .load("/delta/users")

# Query by version
df_v1 = spark.read.format("delta") \
    .option("versionAsOf", 1) \
    .load("/delta/users")

# View change history
delta_table.history().show()

Z-Order Clustering

Z-Order co-locates related data in the same files to improve query performance:

spark.sql("""
    OPTIMIZE delta.`/delta/events`
    ZORDER BY (user_id, event_date)
""")

Feature Store

A feature store is a centralized repository for managing ML features. It ensures feature reuse, consistency, and governance across the ML lifecycle.

Online vs Offline Feature Store

Offline Feature Store stores large-scale historical features used for model training. Data is stored in a data lake as Parquet or Delta Lake format.

Online Feature Store serves the latest features for real-time inference with low latency. Data is stored in in-memory or NoSQL databases such as Redis, DynamoDB, or Cassandra.

Feast (Open-Source Feature Store)

from feast import FeatureStore, Entity, FeatureView, Field
from feast.types import Float32, Int64

# Define feature view
user_stats = FeatureView(
    name="user_stats",
    entities=["user_id"],
    schema=[
        Field(name="avg_purchase_amount", dtype=Float32),
        Field(name="purchase_count_30d", dtype=Int64),
    ],
    ttl=timedelta(days=30),
)

# Online feature retrieval (real-time inference)
store = FeatureStore(repo_path=".")
features = store.get_online_features(
    features=["user_stats:avg_purchase_amount", "user_stats:purchase_count_30d"],
    entity_rows=[{"user_id": 1001}],
).to_dict()

Managed Feature Stores

  • Tecton: Enterprise-grade feature platform with real-time transformation support
  • Vertex AI Feature Store: GCP managed integration
  • SageMaker Feature Store: AWS managed integration
  • Databricks Feature Store: Delta Lake-based feature management

Data Quality & Monitoring

Data quality determines the reliability of AI systems. "Garbage in, garbage out" is especially critical in ML.

Great Expectations for Data Validation

Great Expectations adds automated data validation to data pipelines:

import great_expectations as gx

context = gx.get_context()
datasource = context.sources.add_pandas("my_datasource")

# Define expectations
suite = context.add_expectation_suite("orders_suite")
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="orders_suite"
)

validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_between("total_amount", min_value=0, max_value=100000)
validator.expect_column_values_to_be_unique("order_id")
validator.save_expectation_suite()

# Run validation
checkpoint = context.add_or_update_checkpoint(
    name="orders_checkpoint",
    validations=[{"batch_request": batch_request, "expectation_suite_name": "orders_suite"}],
)
result = checkpoint.run()

Data Drift Detection

Data drift occurs when the statistical properties of production data diverge from training data. Tools like Evidently AI, WhyLogs, and Nannyml detect this:

from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=train_df, current_data=prod_df)
report.show(mode='inline')

Data Lineage Tracking

Data lineage tracks the full journey of data from source to consumption. Tools like Apache Atlas, OpenLineage, and Marquez are commonly used for this purpose.


Cloud Data Platforms

AWS Data Stack

  • Amazon S3: Object storage, foundation of the data lake
  • AWS Glue: Serverless ETL service and data catalog
  • Amazon Athena: SQL analytics directly on S3 data
  • Amazon Redshift: Cloud data warehouse
  • Amazon EMR: Managed Hadoop/Spark cluster

GCP Data Stack

  • Google BigQuery: Serverless data warehouse with built-in ML
  • Cloud Dataflow: Apache Beam-based data processing
  • Cloud Pub/Sub: Real-time messaging service (Kafka alternative)
  • Vertex AI: Unified ML platform

Azure Data Stack

  • Azure Data Lake Storage (ADLS): Hierarchical namespace storage
  • Azure Synapse Analytics: Integrated analytics service
  • Azure Event Hub: Large-scale event streaming (Kafka compatible)
  • Azure Databricks: Spark-based analytics platform

Snowflake Data Platform

Snowflake is a multi-cloud data platform with a distinctive architecture that separates storage and compute:

  • Virtual Warehouses: Independently scalable compute clusters
  • Data Sharing: Secure data sharing across organizations
  • Snowpipe: Automated continuous data ingestion
  • Cortex: Built-in AI/ML capabilities

Quiz

Q1. What is the key difference between Lambda and Kappa architectures?

Answer: Lambda Architecture maintains both a batch layer and a speed layer, whereas Kappa Architecture simplifies by using only a streaming layer.

Explanation: Lambda Architecture provides both accuracy (batch) and real-time results (stream), but requires maintaining two separate layers, increasing operational complexity. Advances in tools like Kafka Streams and Apache Flink have made streaming alone sufficiently accurate, turning Kappa Architecture into a practical choice.

Q2. What is the main difference between DataFrame and RDD in Apache Spark?

Answer: DataFrame represents structured data with a schema and benefits from automatic query optimization via the Catalyst optimizer. RDD is a lower-level, schema-less API offering finer control, but optimization is the developer's responsibility.

Explanation: In most cases, the DataFrame or Dataset API is recommended. Use RDD only when custom serialization or low-level control is needed. Spark 3.0+ also supports runtime optimization via Adaptive Query Execution (AQE).

Q3. What is the role of a Consumer Group in Kafka?

Answer: A Consumer Group is a set of consumers subscribing to the same topic, distributing partitions across consumers to provide parallel processing and load balancing.

Explanation: Within the same Consumer Group, each partition is consumed by exactly one consumer. Adding more consumers than partitions leaves excess consumers idle. Different Consumer Groups can independently read the same topic, enabling the same event stream to be used for multiple purposes.

Q4. What does the ref() function do in dbt?

Answer: The ref() function is used to reference another dbt model, automatically inferring inter-model dependencies and determining the correct execution order.

Explanation: Using ref() allows dbt to build a DAG (dependency graph) and automatically resolve the correct database schema and table name per environment (development/production). This significantly improves code reusability and maintainability.

Q5. What is the difference between an online store and an offline store in a feature store?

Answer: The offline store holds large-scale historical data for model training (Delta Lake, Parquet), while the online store serves the latest features for real-time inference with low latency (Redis, DynamoDB).

Explanation: The two stores are separated because their requirements differ. Training requires high throughput to process billions of rows in batch, while inference requires millisecond-level latency. Feature stores like Feast also ensure feature consistency between the two stores.


References