Data Engineering & AI Pipeline Guide: From Apache Spark to Kafka

Table of Contents

  1. Data Engineering Overview
  2. Apache Spark
  3. Apache Kafka
  4. Apache Airflow
  5. dbt (Data Build Tool)
  6. Delta Lake / Apache Iceberg
  7. Feature Store
  8. Data Quality & Monitoring
  9. Cloud Data Platforms
  10. Quiz

Data Engineering Overview

Data engineering forms the backbone of any AI/ML system. The core responsibility is designing and building data pipelines that deliver data in the right form for analysis and model training.

Role Comparison: Data Engineer vs Data Scientist vs ML Engineer

| Role | Primary Responsibilities | Core Tools |
| --- | --- | --- |
| Data Engineer | Pipeline construction, data store management | Spark, Kafka, Airflow, dbt |
| Data Scientist | Data analysis, model development | Python, Jupyter, Pandas, Scikit-learn |
| ML Engineer | Model deployment, serving infrastructure | MLflow, Kubeflow, TFX, Ray |

Modern Data Stack Architecture

The Modern Data Stack has been reshaped around cloud-native tools:

  • Ingestion: Fivetran, Airbyte, Kafka
  • Storage: Snowflake, BigQuery, Redshift, Delta Lake
  • Transformation: dbt, Spark, Beam
  • Orchestration: Airflow, Prefect, Dagster
  • Visualization: Tableau, Looker, Metabase

Batch Processing vs Stream Processing

Batch Processing handles large volumes of data periodically. It is used for data warehouse ETL jobs, daily report generation, and model retraining.

Stream Processing handles data as soon as it is generated. It is suited for real-time fraud detection, anomaly detection, and recommendation system updates.
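
The difference can be sketched in a few lines of plain Python (the events list and the aggregate are illustrative):

```python
from statistics import mean

events = [12.0, 7.5, 9.9, 14.2, 8.8]  # hypothetical metric values

# Batch: process the accumulated data set once, at the end of a window
def batch_average(collected):
    return mean(collected)

# Stream: update a running aggregate as each event arrives
def stream_averages(stream):
    total, n, out = 0.0, 0, []
    for value in stream:
        total += value
        n += 1
        out.append(total / n)  # a result is available after every event
    return out

print(batch_average(events))        # one result per batch window
print(stream_averages(events)[-1])  # same final value, updated per event
```

Both paths converge on the same answer; the trade-off is latency (stream) versus simplicity and throughput (batch).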

Lambda Architecture vs Kappa Architecture

Lambda Architecture runs batch and speed layers in parallel. Accurate batch results and real-time stream results are combined at the serving layer. It provides high accuracy at the cost of operational complexity.

Kappa Architecture simplifies by using only a streaming layer. Advances in tools like Kafka Streams and Apache Flink have made this approach practical. It maintains a consistent, easier-to-manage codebase.
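
A toy sketch of the Kappa idea: "reprocessing" is just replaying the same log through the single streaming code path (all names here are illustrative):

```python
log = []  # append-only event log, a stand-in for a Kafka topic

def append(event):
    log.append(event)

def process(events):
    # the single streaming code path: count events per user
    counts = {}
    for e in events:
        counts[e["user"]] = counts.get(e["user"], 0) + 1
    return counts

for u in ["a", "b", "a"]:
    append({"user": u})

live = process(log)        # normal streaming run
replayed = process(log[0:])  # "reprocessing" = replay from offset 0
print(live == replayed)    # True: one codebase, no separate batch layer
```

In Lambda, the `process` logic would exist twice (batch and speed layers); in Kappa, a replay of the log through the same code is the batch recomputation.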


Apache Spark

Apache Spark is a unified analytics engine for large-scale data processing. Its in-memory processing can deliver up to 100x better performance than Hadoop MapReduce for certain workloads.

Spark Architecture

A Spark cluster consists of three core components:

  • Driver: The master process that runs SparkContext and orchestrates jobs
  • Executor: Worker processes that perform actual data processing
  • Cluster Manager: YARN, Kubernetes, Mesos, or Spark Standalone

RDD, DataFrame, and Dataset APIs

RDD (Resilient Distributed Dataset) is Spark's fundamental data structure, providing immutable operations on distributed collections.

DataFrame is a distributed collection with a schema, offering a SQL-like API. It automatically optimizes queries through the Catalyst optimizer.

Dataset is the type-safe version of DataFrame, supporting compile-time type checking in Scala and Java.

PySpark for AI Feature Engineering

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, when
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

spark = SparkSession.builder \
    .appName("AI Feature Engineering") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Load and transform data
df = spark.read.parquet("s3://data-lake/features/")
df_clean = df.dropna() \
    .withColumn("age_group", when(col("age") < 30, "young")
                              .when(col("age") < 50, "middle")
                              .otherwise("senior"))

# Aggregate statistics
stats_df = df_clean.groupBy("category").agg(
    count("*").alias("cnt"),
    avg("score").alias("avg_score")
)

# Build feature vector
assembler = VectorAssembler(
    inputCols=["age", "income", "score"],
    outputCol="features"
)
ml_df = assembler.transform(df_clean)

# Train model (assumes df_clean contains a "label" column)
rf = RandomForestClassifier(numTrees=100, featuresCol="features", labelCol="label")
model = rf.fit(ml_df)

Spark SQL Optimization Tips

  • Partitioning: Partition data by columns you frequently filter on.
  • Broadcast Join: Use broadcast joins for small tables.
  • Caching: Use cache() or persist() for DataFrames reused multiple times.
  • AQE (Adaptive Query Execution): Leverage Spark 3.0+ dynamic query optimization.
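
To see why broadcast joins help, here is the mechanism in plain Python (the tables are made up): the small table is shipped to every executor as a hash map, so the large table is joined locally and never shuffled.

```python
# What a broadcast join does, conceptually: build a hash map from the small
# table, then stream the large table through it without a shuffle.
small_table = [(1, "electronics"), (2, "books")]        # (category_id, name)
large_table = [(101, 1), (102, 2), (103, 1), (104, 9)]  # (order_id, category_id)

broadcast = dict(small_table)  # small enough to fit in memory on every worker

joined = [
    (order_id, broadcast[cat_id])
    for order_id, cat_id in large_table
    if cat_id in broadcast     # inner-join semantics: drop unmatched rows
]
print(joined)  # [(101, 'electronics'), (102, 'books'), (103, 'electronics')]
```

In PySpark the same hint is expressed with `broadcast(small_df)` from `pyspark.sql.functions`.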

Apache Kafka

Apache Kafka is a high-throughput, fault-tolerant distributed event streaming platform. Developed by LinkedIn and later open-sourced, it has become the core infrastructure for real-time AI pipelines.

Kafka Architecture

  • Broker: A server that stores and delivers messages. A cluster consists of multiple brokers.
  • Topic: The category or feed name where messages are published
  • Partition: Divides a topic to enable parallel processing. Each partition guarantees ordering.
  • Consumer Group: A set of consumers sharing the same topic, providing load balancing.
  • Offset: A unique position identifier for a message within a partition
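
The partition and consumer-group mechanics can be sketched in plain Python (the hash below is a stand-in; Kafka's default partitioner uses murmur2 on the key bytes):

```python
NUM_PARTITIONS = 4

def partition_for(key: str) -> int:
    # same key -> same partition, which is what preserves per-key ordering
    return sum(key.encode()) % NUM_PARTITIONS

def assign(partitions: list, consumers: list) -> dict:
    # simplified assignment: each partition goes to exactly one consumer
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

assert partition_for("user-42") == partition_for("user-42")  # stable routing
print(assign([0, 1, 2, 3], ["c1", "c2"]))  # {'c1': [0, 2], 'c2': [1, 3]}
```

Note that with more consumers than partitions, the extra consumers receive nothing, which is why partition count caps a group's parallelism.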

Producer/Consumer Pattern

from kafka import KafkaProducer, KafkaConsumer
import json
import numpy as np

# Producer: send real-time feature data
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def send_feature(data):
    producer.send('ml-features', value={
        'timestamp': data['ts'],
        'features': data['features'].tolist(),
        'user_id': data['user_id']
    })

# Consumer: real-time AI inference
consumer = KafkaConsumer(
    'ml-features',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

import onnxruntime as ort
session = ort.InferenceSession('model.onnx')

for msg in consumer:
    features = np.array(msg.value['features']).reshape(1, -1)
    prediction = session.run(None, {'input': features.astype(np.float32)})[0]
    print(f"User {msg.value['user_id']}: {prediction}")

Kafka Streams and ksqlDB

Kafka Streams is a Java/Scala library that enables stream processing from Kafka topics — reading, transforming, and writing back to Kafka. ksqlDB is an event streaming database that lets you write streaming queries using SQL.

Kafka's role in real-time AI pipelines:

  1. Event source (user behavior, sensor data, etc.)
  2. Feature engineering results transport
  3. Model prediction publishing
  4. A/B test result collection

Apache Airflow

Apache Airflow is a workflow management platform for programmatically defining, scheduling, and monitoring pipelines.

DAG (Directed Acyclic Graph) Concept

A DAG is a directed graph with no cycles, representing a workflow in Airflow. Each node is a Task, and edges represent dependencies (execution order).

Operator Types

  • PythonOperator: Execute a Python function
  • BashOperator: Execute a shell command
  • SQLExecuteQueryOperator: Execute a SQL query
  • KubernetesPodOperator: Run a Kubernetes pod
  • SparkSubmitOperator: Submit a Spark job

ML Pipeline with TaskFlow API

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule='@daily', start_date=datetime(2026, 1, 1))
def ml_training_pipeline():

    @task
    def extract_data():
        # Extract data
        return {'rows': 10000, 'path': '/tmp/data.parquet'}

    @task
    def preprocess(info: dict):
        # Preprocessing
        import pandas as pd
        df = pd.read_parquet(info['path'])
        df_clean = df.dropna()
        return df_clean.to_dict()

    @task
    def train_model(data: dict):
        # Model training
        from sklearn.ensemble import GradientBoostingClassifier
        import mlflow

        with mlflow.start_run():
            model = GradientBoostingClassifier()
            # training logic here
            mlflow.sklearn.log_model(model, "model")
        return "model_trained"

    @task
    def validate_model(status: str):
        # Model validation
        if status == "model_trained":
            return {"accuracy": 0.92, "passed": True}
        return {"passed": False}

    raw = extract_data()
    processed = preprocess(raw)
    result = train_model(processed)
    validate_model(result)

dag_instance = ml_training_pipeline()

Airflow Best Practices

  • Idempotency: Running the same DAG multiple times should always produce the same result.
  • Atomicity: A task should either succeed or fail completely.
  • Minimize XCom: Use external storage instead of XCom for large data.
  • SLA settings: Set SLAs (Service Level Agreements) for critical pipelines.
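
The XCom guideline in practice means handing the next task a path, not the data. A plain-Python sketch of two task bodies (paths and row counts are illustrative; in Airflow these would be `@task` functions):

```python
import json
import tempfile
from pathlib import Path

def extract() -> str:
    # write the (potentially large) data to external storage...
    rows = [{"id": i, "score": i * 0.1} for i in range(1000)]
    path = Path(tempfile.mkdtemp()) / "rows.json"
    path.write_text(json.dumps(rows))
    return str(path)          # ...and only this small string travels via XCom

def transform(path: str) -> int:
    rows = json.loads(Path(path).read_text())
    return sum(1 for r in rows if r["score"] > 50)

print(transform(extract()))
```

The same pattern applies with S3 or GCS in place of the local temp directory.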

dbt (Data Build Tool)

dbt enables analytics engineers to write data transformation logic in SQL, with version control, testing, and documentation built in.

dbt's Role and the ELT Pattern

Unlike traditional ETL (Extract-Transform-Load), dbt embraces the ELT (Extract-Load-Transform) pattern. Raw data is first loaded into the data warehouse, then transformed within it using SQL.

Model Layer Structure

models/
├── staging/          # Clean raw source data
│   ├── stg_orders.sql
│   └── stg_customers.sql
├── intermediate/     # Intermediate business logic transforms
│   └── int_order_items.sql
└── marts/           # Final business-facing tables
    ├── fct_orders.sql
    └── dim_customers.sql

Jinja Templating and the ref() Function

dbt uses the Jinja templating engine to add dynamic capabilities to SQL:

-- models/marts/fct_orders.sql
SELECT
    o.order_id,
    o.customer_id,
    c.customer_name,
    o.total_amount,
    o.created_at
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('dim_customers') }} c
    ON o.customer_id = c.customer_id
WHERE o.status = 'completed'

The ref() function automatically infers dependencies between models and executes them in the correct order.
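
A simplified sketch of what ref() enables: extract the references, build a dependency graph, and topologically sort it (dbt's real manifest handling does far more, but this is the core idea; model SQL is abbreviated):

```python
import re
from graphlib import TopologicalSorter

models = {
    "stg_orders": "SELECT * FROM raw.orders",
    "dim_customers": "SELECT * FROM raw.customers",
    "fct_orders": "SELECT * FROM {{ ref('stg_orders') }} o "
                  "JOIN {{ ref('dim_customers') }} c ON o.customer_id = c.customer_id",
}

# each model's predecessors are whatever it ref()s
deps = {
    name: set(re.findall(r"ref\('([^']+)'\)", sql))
    for name, sql in models.items()
}
order = list(TopologicalSorter(deps).static_order())
print(order)  # fct_orders comes last, after both of its upstream models
```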

Data Testing and Documentation

# schema.yml
version: 2

models:
  - name: fct_orders
    description: 'Completed orders fact table'
    columns:
      - name: order_id
        description: 'Unique order ID'
        tests:
          - unique
          - not_null
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0

Delta Lake / Apache Iceberg

Delta Lake and Apache Iceberg are open table formats that bring ACID transactions to the data lake.

Key Features

  • ACID Transactions: Ensures data consistency under concurrent reads and writes
  • Time Travel: Query data snapshots at any point in time
  • Schema Evolution: Change schemas without breaking existing data
  • Data Versioning: Track change history and enable rollback

Delta Lake Merge/Upsert Operations

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Upsert (Merge) operation
delta_table = DeltaTable.forPath(spark, "/delta/users")
updates_df = spark.read.parquet("/tmp/updates/")

delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.user_id = source.user_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

# Time Travel: query yesterday's data
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2026-03-16") \
    .load("/delta/users")

# Query by version
df_v1 = spark.read.format("delta") \
    .option("versionAsOf", 1) \
    .load("/delta/users")

# View change history
delta_table.history().show()

Z-Order Clustering

Z-Order co-locates related data in the same files to improve query performance:

spark.sql("""
    OPTIMIZE delta.`/delta/events`
    ZORDER BY (user_id, event_date)
""")

Feature Store

A feature store is a centralized repository for managing ML features. It ensures feature reuse, consistency, and governance across the ML lifecycle.

Online vs Offline Feature Store

Offline Feature Store stores large-scale historical features used for model training. Data is typically stored in a data lake in formats such as Parquet or Delta Lake.

Online Feature Store serves the latest features for real-time inference with low latency. Data is stored in in-memory or NoSQL databases such as Redis, DynamoDB, or Cassandra.
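
The split can be sketched in a few lines (store and feature names are hypothetical): the offline store keeps full history for training, while the online store keeps only the latest value per entity for fast lookups.

```python
offline_store = []   # append-only history, stand-in for Parquet/Delta files
online_store = {}    # latest value per key, stand-in for Redis/DynamoDB

def write_feature(user_id, name, value, ts):
    offline_store.append({"user_id": user_id, "feature": name,
                          "value": value, "ts": ts})
    online_store[(user_id, name)] = value      # overwrite: latest wins

write_feature(1001, "purchase_count_30d", 3, ts=1)
write_feature(1001, "purchase_count_30d", 5, ts=2)

print(online_store[(1001, "purchase_count_30d")])  # 5: serving reads the latest
print(len(offline_store))                          # 2: training keeps history
```

Writing both stores through one code path is what keeps training and serving features consistent.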

Feast (Open-Source Feature Store)

from datetime import timedelta

from feast import Entity, FeatureStore, FeatureView, Field
from feast.types import Float32, Int64

# Define entity and feature view (a data source, e.g. a FileSource,
# is also required in a complete definition)
user = Entity(name="user_id")

user_stats = FeatureView(
    name="user_stats",
    entities=[user],
    schema=[
        Field(name="avg_purchase_amount", dtype=Float32),
        Field(name="purchase_count_30d", dtype=Int64),
    ],
    ttl=timedelta(days=30),
)

# Online feature retrieval (real-time inference)
store = FeatureStore(repo_path=".")
features = store.get_online_features(
    features=["user_stats:avg_purchase_amount", "user_stats:purchase_count_30d"],
    entity_rows=[{"user_id": 1001}],
).to_dict()

Managed Feature Stores

  • Tecton: Enterprise-grade feature platform with real-time transformation support
  • Vertex AI Feature Store: GCP managed integration
  • SageMaker Feature Store: AWS managed integration
  • Databricks Feature Store: Delta Lake-based feature management

Data Quality & Monitoring

Data quality determines the reliability of AI systems. "Garbage in, garbage out" is especially critical in ML.

Great Expectations for Data Validation

Great Expectations adds automated data validation to data pipelines:

import great_expectations as gx

context = gx.get_context()
datasource = context.sources.add_pandas("my_datasource")

# Register the data as an asset and build a batch request
# (orders_df: a pandas DataFrame loaded elsewhere)
asset = datasource.add_dataframe_asset(name="orders")
batch_request = asset.build_batch_request(dataframe=orders_df)

# Define expectations
suite = context.add_expectation_suite("orders_suite")
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="orders_suite"
)

validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_between("total_amount", min_value=0, max_value=100000)
validator.expect_column_values_to_be_unique("order_id")
validator.save_expectation_suite()

# Run validation
checkpoint = context.add_or_update_checkpoint(
    name="orders_checkpoint",
    validations=[{"batch_request": batch_request, "expectation_suite_name": "orders_suite"}],
)
result = checkpoint.run()

Data Drift Detection

Data drift occurs when the statistical properties of production data diverge from training data. Tools like Evidently AI, whylogs, and NannyML detect this:

from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=train_df, current_data=prod_df)
report.show(mode='inline')
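
One widely used drift statistic, the Population Stability Index (PSI), is simple enough to compute from scratch (the bin fractions below are made up):

```python
import math

def psi(expected_fracs, actual_fracs, eps=1e-6):
    # PSI = sum over bins of (actual% - expected%) * ln(actual% / expected%)
    total = 0.0
    for e, a in zip(expected_fracs, actual_fracs):
        e, a = max(e, eps), max(a, eps)        # guard against log(0)
        total += (a - e) * math.log(a / e)
    return total

train_bins = [0.25, 0.25, 0.25, 0.25]   # binned training distribution
prod_bins = [0.10, 0.20, 0.30, 0.40]    # binned production distribution

print(psi(train_bins, train_bins))      # 0.0: identical distributions
print(round(psi(train_bins, prod_bins), 3))
```

A frequent rule of thumb: PSI below 0.1 is stable, 0.1 to 0.25 is moderate shift, above 0.25 suggests significant drift.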

Data Lineage Tracking

Data lineage tracks the full journey of data from source to consumption. Tools like Apache Atlas, OpenLineage, and Marquez are commonly used for this purpose.
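
Lineage is ultimately a graph problem. A minimal sketch (dataset names hypothetical) that traces a table back to its raw sources:

```python
# Edges point from each dataset to its direct inputs; tracing upstream
# answers "which raw sources does this table depend on?"
lineage = {
    "fct_orders": ["stg_orders", "dim_customers"],
    "stg_orders": ["raw.orders"],
    "dim_customers": ["raw.customers"],
}

def upstream_sources(dataset, graph):
    inputs = graph.get(dataset, [])
    if not inputs:                      # no inputs: it is a raw source
        return {dataset}
    sources = set()
    for parent in inputs:
        sources |= upstream_sources(parent, graph)
    return sources

print(sorted(upstream_sources("fct_orders", lineage)))
# ['raw.customers', 'raw.orders']
```

Tools like OpenLineage standardize how such edges are emitted and collected across pipeline tools.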


Cloud Data Platforms

AWS Data Stack

  • Amazon S3: Object storage, foundation of the data lake
  • AWS Glue: Serverless ETL service and data catalog
  • Amazon Athena: SQL analytics directly on S3 data
  • Amazon Redshift: Cloud data warehouse
  • Amazon EMR: Managed Hadoop/Spark cluster

GCP Data Stack

  • Google BigQuery: Serverless data warehouse with built-in ML
  • Cloud Dataflow: Apache Beam-based data processing
  • Cloud Pub/Sub: Real-time messaging service (Kafka alternative)
  • Vertex AI: Unified ML platform

Azure Data Stack

  • Azure Data Lake Storage (ADLS): Hierarchical namespace storage
  • Azure Synapse Analytics: Integrated analytics service
  • Azure Event Hub: Large-scale event streaming (Kafka compatible)
  • Azure Databricks: Spark-based analytics platform

Snowflake Data Platform

Snowflake is a multi-cloud data platform with a distinctive architecture that separates storage and compute:

  • Virtual Warehouses: Independently scalable compute clusters
  • Data Sharing: Secure data sharing across organizations
  • Snowpipe: Automated continuous data ingestion
  • Cortex: Built-in AI/ML capabilities

Quiz

Q1. What is the key difference between Lambda and Kappa architectures?

Answer: Lambda Architecture maintains both a batch layer and a speed layer, whereas Kappa Architecture simplifies by using only a streaming layer.

Explanation: Lambda Architecture provides both accuracy (batch) and real-time results (stream), but requires maintaining two separate layers, increasing operational complexity. Advances in tools like Kafka Streams and Apache Flink have made streaming alone sufficiently accurate, turning Kappa Architecture into a practical choice.

Q2. What is the main difference between DataFrame and RDD in Apache Spark?

Answer: DataFrame represents structured data with a schema and benefits from automatic query optimization via the Catalyst optimizer. RDD is a lower-level, schema-less API offering finer control, but optimization is the developer's responsibility.

Explanation: In most cases, the DataFrame or Dataset API is recommended. Use RDD only when custom serialization or low-level control is needed. Spark 3.0+ also supports runtime optimization via Adaptive Query Execution (AQE).

Q3. What is the role of a Consumer Group in Kafka?

Answer: A Consumer Group is a set of consumers subscribing to the same topic, distributing partitions across consumers to provide parallel processing and load balancing.

Explanation: Within the same Consumer Group, each partition is consumed by exactly one consumer. Adding more consumers than partitions leaves excess consumers idle. Different Consumer Groups can independently read the same topic, enabling the same event stream to be used for multiple purposes.

Q4. What does the ref() function do in dbt?

Answer: The ref() function is used to reference another dbt model, automatically inferring inter-model dependencies and determining the correct execution order.

Explanation: Using ref() allows dbt to build a DAG (dependency graph) and automatically resolve the correct database schema and table name per environment (development/production). This significantly improves code reusability and maintainability.

Q5. What is the difference between an online store and an offline store in a feature store?

Answer: The offline store holds large-scale historical data for model training (Delta Lake, Parquet), while the online store serves the latest features for real-time inference with low latency (Redis, DynamoDB).

Explanation: The two stores are separated because their requirements differ. Training requires high throughput to process billions of rows in batch, while inference requires millisecond-level latency. Feature stores like Feast also ensure feature consistency between the two stores.

