Data Engineering Pipeline Complete Guide 2025: ETL/ELT, Spark, Airflow, Real-Time Streaming

Author: Youngju Kim (@fjvbn20031)
1. Data Engineering Overview
The Role of a Data Engineer
Data engineers design, build, and maintain an organization's data infrastructure. Their core responsibility is creating pipelines that collect, transform, and store data so that data scientists and analysts can derive insights.
Core Responsibilities of a Data Engineer
├── Data Ingestion
│   ├── Extract data from APIs, DBs, files, streams
│   └── Integrate diverse sources
├── Data Transformation
│   ├── Cleaning, normalization, aggregation
│   └── Apply business logic
├── Data Storage
│   ├── Data warehouse design
│   └── Data lake architecture
├── Pipeline Orchestration
│   ├── Workflow automation
│   └── Scheduling and monitoring
└── Data Quality Management
    ├── Data validation
    └── Data contracts
Essential Tech Stack
Programming : Python, SQL, Scala/Java
Data Processing : Spark, Flink, Beam
Orchestration : Airflow, Dagster, Prefect
Streaming : Kafka, Kinesis, Pub/Sub
Transformation : dbt, Dataform
Cloud : AWS (Redshift, Glue), GCP (BigQuery), Azure (Synapse)
Containers : Docker, Kubernetes
IaC : Terraform, Pulumi
Monitoring : Datadog, Grafana, Monte Carlo
2. ETL vs ELT
Traditional ETL
ETL stands for Extract, Transform, Load. Data is extracted from the source, transformed on a dedicated server, then loaded into the final data store.
ETL Flow:
Source DB ──Extract──→ Transform Server (ETL Server) ──Load──→ Data Warehouse
# Traditional ETL Example (Python)
import pandas as pd
from sqlalchemy import create_engine
# Extract: Pull data from source DB
source_engine = create_engine('postgresql://source_db:5432/sales')
raw_data = pd.read_sql('SELECT * FROM orders WHERE date >= %s', source_engine, params=['2025-01-01'])
# Transform: Apply transformations
transformed = raw_data.copy()
transformed['total_with_tax'] = transformed['total'] * 1.1
transformed['order_month'] = pd.to_datetime(transformed['order_date']).dt.to_period('M')
transformed = transformed.dropna(subset=['customer_id'])
transformed = transformed[transformed['total'] > 0]
# Load: Write to warehouse
wh_engine = create_engine('postgresql://warehouse:5432/analytics')
transformed.to_sql('fact_orders', wh_engine, if_exists='append', index=False)
Modern ELT
ELT stands for Extract, Load, Transform. Raw data is loaded into the warehouse first, then transformed within the warehouse itself.
ELT Flow:
Source DB ──Extract/Load──→ Data Warehouse ──Transform──→ Analytics Tables
                                            (with dbt, etc.)
-- ELT: Transform inside the warehouse with dbt
-- models/marts/fact_orders.sql
WITH source_orders AS (
    SELECT * FROM raw.orders
    WHERE order_date >= '2025-01-01'
),

cleaned AS (
    SELECT
        order_id,
        customer_id,
        total,
        total * 1.1 AS total_with_tax,
        DATE_TRUNC('month', order_date) AS order_month,
        order_date
    FROM source_orders
    WHERE customer_id IS NOT NULL
      AND total > 0
)

SELECT * FROM cleaned
ETL vs ELT Comparison
| Aspect | ETL | ELT |
|---|---|---|
| Transform Location | Separate server | Inside warehouse |
| Scalability | Depends on ETL server | Leverages warehouse compute |
| Raw Data | May be lost after transform | Preserved |
| Cost | ETL server operation cost | Warehouse compute cost |
| Flexibility | Re-processing needed for logic changes | Flexible re-transformation with SQL |
| Key Tools | Informatica, Talend | dbt, Dataform |
| Best For | Legacy systems, regulatory requirements | Cloud-native, big data |
3. Batch vs Stream Processing
Batch Processing
Processes large volumes of data at scheduled intervals.
Batch Processing:
[Accumulate Data] ──→ [Bulk Process] ──→ [Store Results]
  (hourly/daily)       (Spark, etc.)      (Warehouse)
Characteristics:
- High throughput
- High latency (minutes to hours)
- Cost-efficient
- Easy to reprocess
Stream Processing
Processes data in real time as it arrives.
Stream Processing:
[Event Occurs] ──→ [Immediate Processing] ──→ [Real-Time Results]
  (continuous)         (Flink, etc.)           (Dashboard/Alerts)
Characteristics:
- Low latency (milliseconds to seconds)
- Continuous processing
- Complex failure handling
- Event ordering required
Selection Criteria
Choose Batch when:
- Daily/weekly/monthly reports
- Large-scale data aggregation
- Cost optimization priority
- No real-time requirement
Choose Stream when:
- Real-time dashboards
- Fraud detection
- Real-time recommendations
- IoT sensor data
- Alerting
Hybrid (Lambda/Kappa):
- Run batch + stream simultaneously
- Real-time approximation + batch accuracy
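The batch-versus-stream distinction can be pictured without any framework. Below is a minimal plain-Python sketch (toy data, no real engine) that computes the same aggregate both ways: batch waits for all data, streaming produces a result per event.

```python
# Toy illustration (plain Python, no framework): the same sum computed
# batch-style (all at once) vs stream-style (incrementally per event).

def batch_total(events):
    """Batch: wait for the full dataset, then aggregate in one pass."""
    return sum(e["amount"] for e in events)

class StreamingTotal:
    """Stream: update a running result the moment each event arrives."""
    def __init__(self):
        self.total = 0

    def on_event(self, event):
        self.total += event["amount"]
        return self.total  # partial result available immediately

events = [{"amount": 10}, {"amount": 25}, {"amount": 5}]

print(batch_total(events))       # 40, but only after all data has arrived

stream = StreamingTotal()
for e in events:
    latest = stream.on_event(e)  # low-latency partial results: 10, 35, 40
print(latest)                    # 40
```

The trade-off in miniature: the batch answer is simple and exact but late; the streaming answer is available per event but requires keeping state between events.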
4. Apache Spark
4.1 Spark Overview
Apache Spark is a unified analytics engine for large-scale data processing. Its in-memory computation can run certain workloads up to 100x faster than Hadoop MapReduce (a best-case figure from the project's own benchmarks).
Spark Architecture:
+----------------------------------+
|        Spark Application         |
+----------------------------------+
|  SparkSQL | Streaming | MLlib    |
+----------------------------------+
|       DataFrame / Dataset        |
+----------------------------------+
|        RDD (Core Engine)         |
+----------------------------------+
| Standalone | YARN | Mesos | K8s  |
+----------------------------------+
4.2 PySpark Basics
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, when, lit
# Create SparkSession
spark = SparkSession.builder \
.appName("DataPipeline") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.shuffle.partitions", "200") \
.getOrCreate()
# Read data
orders_df = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv("s3://data-lake/raw/orders/")
users_df = spark.read.parquet("s3://data-lake/raw/users/")
# Transform data
result = orders_df \
.filter(col("status") == "completed") \
.join(users_df, orders_df.user_id == users_df.id, "inner") \
.groupBy("user_id", "username") \
.agg(
count("order_id").alias("total_orders"),
sum("amount").alias("total_spent"),
avg("amount").alias("avg_order_value")
) \
.filter(col("total_orders") > 5)
# Save results
result.write \
    .mode("overwrite") \
    .parquet("s3://data-lake/processed/user_summary/")
# Note: partitionBy() would require the partition column (e.g. order_month)
# to exist in result; the aggregate above does not produce one.
4.3 SparkSQL
# Register temp views
orders_df.createOrReplaceTempView("orders")
users_df.createOrReplaceTempView("users")
# SQL analytics
monthly_revenue = spark.sql("""
SELECT
DATE_TRUNC('month', order_date) AS month,
COUNT(DISTINCT user_id) AS unique_customers,
COUNT(*) AS total_orders,
SUM(amount) AS revenue,
AVG(amount) AS avg_order_value
FROM orders
WHERE status = 'completed'
GROUP BY DATE_TRUNC('month', order_date)
ORDER BY month
""")
monthly_revenue.show()
4.4 Partitioning and Caching
# Partition optimization
# Check current partitions
print(f"Partitions: {orders_df.rdd.getNumPartitions()}")
# Repartition (causes full shuffle)
orders_repartitioned = orders_df.repartition(100, "order_date")
# Coalesce (reduce partitions without shuffle)
orders_coalesced = orders_df.coalesce(50)
# Caching
from pyspark.storagelevel import StorageLevel
# Memory caching
orders_df.cache()
orders_df.count() # Triggers cache materialization
# Memory + disk caching
users_df.persist(StorageLevel.MEMORY_AND_DISK)
# Release cache
orders_df.unpersist()
4.5 Spark Performance Tuning
# Broadcast join (for small tables)
from pyspark.sql.functions import broadcast
# Broadcast the smaller table
result = orders_df.join(
broadcast(users_df),
orders_df.user_id == users_df.id
)
# Enable AQE (Adaptive Query Execution)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
Spark Performance Optimization Checklist:
[ ] Set appropriate partition count (2-3x number of cores)
[ ] Handle data skew (salting, AQE)
[ ] Leverage broadcast joins
[ ] Minimize unnecessary shuffles
[ ] Column pruning (select only needed columns)
[ ] Use Predicate Pushdown
[ ] Establish caching strategy
[ ] Optimize serialization format (Parquet, ORC)
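The checklist mentions salting for data skew. A toy plain-Python sketch of the idea (not PySpark; the hash function and key names are illustrative only): a hot key is rewritten into several sub-keys so its rows spread across partitions instead of all hashing to one.

```python
# Key salting (toy sketch in plain Python, not PySpark): a skewed "hot" key
# is rewritten into NUM_SALTS sub-keys so its rows spread across several
# partitions instead of all hashing to the same one.

NUM_SALTS = 4

def toy_hash(key: str) -> int:
    # Deterministic stand-in for a partitioner hash (illustration only).
    return sum(key.encode())

def partition_for(key: str, num_partitions: int) -> int:
    return toy_hash(key) % num_partitions

hot_key = "user_42"
rows = [(hot_key, i) for i in range(1000)]  # heavily skewed input

# Without salting: every row lands on one partition.
before = {partition_for(k, 8) for k, _ in rows}

# With salting: append a rotating salt to the hot key.
salted = [(f"{k}_{i % NUM_SALTS}", v) for i, (k, v) in enumerate(rows)]
after = {partition_for(k, 8) for k, _ in salted}

print(len(before), len(after))  # → 1 4
```

In real Spark you would also replicate the other side of a join across all salt values so the salted keys still match; AQE's skew-join handling automates much of this.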
5. Apache Airflow
5.1 Airflow Overview
Apache Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. It defines task dependencies using DAGs (Directed Acyclic Graphs).
Airflow Architecture:
+----------------------------------+
|            Web Server            |
|          (UI / REST API)         |
+----------------------------------+
|            Scheduler             |
|  (DAG Parsing, Task Scheduling)  |
+----------------------------------+
|             Executor             |
|     (Local/Celery/Kubernetes)    |
+----------------------------------+
|        Metadata Database         |
|        (PostgreSQL/MySQL)        |
+----------------------------------+
5.2 Writing DAGs
# dags/etl_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.sensors.filesystem import FileSensor
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email_on_failure': True,
'email': ['data-team@company.com'],
'retries': 3,
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(hours=2),
}
with DAG(
    dag_id='daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL pipeline',
    schedule_interval='0 6 * * *',  # Daily at 6 AM
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['etl', 'daily'],
    max_active_runs=1,
) as dag:

    # Sensor: Wait for data arrival
    wait_for_data = FileSensor(
        task_id='wait_for_data',
        filepath='/data/raw/daily_export.csv',
        poke_interval=300,  # Check every 5 minutes
        timeout=3600,       # Wait up to 1 hour
        mode='poke',
    )

    # Extract
    def extract_data(**context):
        import pandas as pd
        execution_date = context['ds']
        df = pd.read_csv(f'/data/raw/daily_export_{execution_date}.csv')
        df.to_parquet(f'/data/staging/extract_{execution_date}.parquet')
        return len(df)

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    # Transform
    def transform_data(**context):
        import pandas as pd
        execution_date = context['ds']
        df = pd.read_parquet(f'/data/staging/extract_{execution_date}.parquet')
        # Data cleaning
        df = df.dropna(subset=['customer_id', 'amount'])
        df = df[df['amount'] > 0]
        df['amount_with_tax'] = df['amount'] * 1.1
        df.to_parquet(f'/data/staging/transform_{execution_date}.parquet')
        return len(df)

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )

    # Load
    load = PostgresOperator(
        task_id='load',
        postgres_conn_id='warehouse',
        sql='sql/load_daily_orders.sql',
    )

    # Data quality validation
    def validate_data(**context):
        row_count = context['ti'].xcom_pull(task_ids='transform')
        if row_count < 100:
            raise ValueError(f'Insufficient row count: {row_count}')

    validate = PythonOperator(
        task_id='validate',
        python_callable=validate_data,
    )

    # Notification
    notify = BashOperator(
        task_id='notify',
        bash_command='echo "ETL complete: {{ ds }}" | mail -s "ETL Success" team@company.com',
    )

    # Define dependencies
    wait_for_data >> extract >> transform >> load >> validate >> notify
5.3 TaskFlow API (Airflow 2.x)
from airflow.decorators import dag, task
from datetime import datetime
@dag(
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['etl'],
)
def modern_etl_pipeline():
    @task()
    def extract():
        """Extract data from source"""
        import pandas as pd
        df = pd.read_csv('/data/raw/orders.csv')
        return df.to_dict()

    @task()
    def transform(raw_data: dict):
        """Transform and clean data"""
        import pandas as pd
        df = pd.DataFrame(raw_data)
        df = df[df['amount'] > 0]
        df['processed_at'] = datetime.now().isoformat()
        return df.to_dict()

    @task()
    def load(transformed_data: dict):
        """Load into warehouse"""
        import pandas as pd
        df = pd.DataFrame(transformed_data)
        print(f"Loaded {len(df)} records")

    # Automatic dependency resolution
    raw = extract()
    transformed = transform(raw)
    load(transformed)

# Instantiate the DAG
modern_etl_pipeline()
5.4 Connections, Variables, XCom
# Connections: Configure via UI or CLI
# airflow connections add 'warehouse' \
# --conn-type 'postgres' \
# --conn-host 'warehouse.example.com' \
# --conn-port 5432 \
# --conn-login 'etl_user' \
# --conn-password 'secret'
# Variables
from airflow.models import Variable
env = Variable.get("environment", default_var="dev")
config = Variable.get("pipeline_config", deserialize_json=True)
# XCom: Pass data between tasks
def producer_task(**context):
    context['ti'].xcom_push(key='row_count', value=1000)

def consumer_task(**context):
    row_count = context['ti'].xcom_pull(
        task_ids='producer',
        key='row_count'
    )
    print(f"Previous task result: {row_count} rows")
5.5 Dynamic DAG Generation
# dags/dynamic_dag_factory.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
# Configuration-driven dynamic DAG creation
configs = [
{"name": "sales", "source": "mysql", "schedule": "@daily"},
{"name": "users", "source": "postgres", "schedule": "@hourly"},
{"name": "logs", "source": "s3", "schedule": "0 */6 * * *"},
]
def create_etl_dag(config):
    dag = DAG(
        dag_id=f"etl_{config['name']}",
        schedule_interval=config['schedule'],
        start_date=datetime(2025, 1, 1),
        catchup=False,
    )

    def process(**kwargs):
        print(f"Processing {config['name']} from {config['source']}")

    with dag:
        PythonOperator(
            task_id='process',
            python_callable=process,
        )

    return dag

for config in configs:
    globals()[f"etl_{config['name']}"] = create_etl_dag(config)
6. Real-Time Streaming
6.1 Apache Kafka
Kafka is a distributed event streaming platform and the backbone of large-scale real-time data pipelines.
Kafka Architecture:
Producer ──→ Broker (Topic/Partition) ──→ Consumer Group
                 │
                 ├── Partition 0: [msg1, msg2, msg3...]
                 ├── Partition 1: [msg4, msg5, msg6...]
                 └── Partition 2: [msg7, msg8, msg9...]
# Kafka Producer (Python)
from confluent_kafka import Producer
import json
config = {
'bootstrap.servers': 'kafka-broker:9092',
'acks': 'all',
'retries': 3,
'linger.ms': 10,
'batch.size': 16384,
}
producer = Producer(config)
def delivery_callback(err, msg):
    if err:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered: {msg.topic()} [{msg.partition()}]')

# Send messages
for i in range(100):
    event = {
        'user_id': f'user_{i}',
        'action': 'page_view',
        'timestamp': '2025-03-25T10:00:00Z',
        'page': '/products'
    }
    producer.produce(
        topic='user-events',
        key=str(event['user_id']),
        value=json.dumps(event),
        callback=delivery_callback
    )

producer.flush()
# Kafka Consumer (Python)
from confluent_kafka import Consumer
import json
config = {
'bootstrap.servers': 'kafka-broker:9092',
'group.id': 'analytics-consumer',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
}
consumer = Consumer(config)
consumer.subscribe(['user-events'])
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'Consumer error: {msg.error()}')
            continue
        event = json.loads(msg.value().decode('utf-8'))
        print(f"Received: {event['user_id']} - {event['action']}")
        # Manual commit
        consumer.commit(asynchronous=False)
finally:
    consumer.close()
6.2 Apache Flink
Flink is a stateful stream processing engine that provides exactly-once semantics.
# PyFlink Stream Processing
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
env.enable_checkpointing(60000) # Checkpoint every 60 seconds
t_env = StreamTableEnvironment.create(env)
# Define Kafka source table
t_env.execute_sql("""
CREATE TABLE user_events (
user_id STRING,
action STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'flink-processor',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)
""")
# Window aggregation
t_env.execute_sql("""
CREATE TABLE page_view_stats (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
page STRING,
view_count BIGINT,
unique_users BIGINT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://analytics-db:5432/stats',
'table-name' = 'page_view_stats',
'driver' = 'org.postgresql.Driver'
)
""")
t_env.execute_sql("""
INSERT INTO page_view_stats
SELECT
window_start,
window_end,
action AS page,
COUNT(*) AS view_count,
COUNT(DISTINCT user_id) AS unique_users
FROM TABLE(
TUMBLE(TABLE user_events, DESCRIPTOR(event_time), INTERVAL '5' MINUTE)
)
GROUP BY window_start, window_end, action
""")
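The `TUMBLE` call above assigns each event to exactly one fixed-size, non-overlapping 5-minute bucket keyed by the window's start time. A plain-Python sketch of that bucketing rule (toy events, no Flink):

```python
from collections import defaultdict
from datetime import datetime, timedelta

# Tumbling-window bucketing (toy sketch, plain Python): each event goes
# into exactly one fixed-size, non-overlapping window, identified by the
# window's start time — the same rule Flink's TUMBLE applies.

WINDOW = timedelta(minutes=5)

def window_start(ts: datetime) -> datetime:
    epoch = datetime(1970, 1, 1)
    offset = (ts - epoch) // WINDOW   # integer number of whole windows
    return epoch + offset * WINDOW

events = [
    ("user_1", datetime(2025, 3, 25, 10, 1)),
    ("user_2", datetime(2025, 3, 25, 10, 4)),
    ("user_1", datetime(2025, 3, 25, 10, 7)),  # falls in the next window
]

counts = defaultdict(int)
uniques = defaultdict(set)
for user, ts in events:
    w = window_start(ts)
    counts[w] += 1
    uniques[w].add(user)

# 10:00 window: 2 views, 2 unique users; 10:05 window: 1 view, 1 user
for w in sorted(counts):
    print(w, counts[w], len(uniques[w]))
```

What the sketch leaves out is exactly what Flink adds: watermarks decide when a window can be finalized despite late or out-of-order events.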
6.3 Exactly-Once Semantics
Delivery Guarantee Levels:
At-most-once : Messages may be lost, no duplicates
At-least-once : No message loss, duplicates possible
Exactly-once : No message loss, no duplicates (hardest to achieve)
Kafka + Flink Exactly-Once:
1. Kafka Transactional Producer
2. Flink Checkpointing (Chandy-Lamport)
3. Two-Phase Commit Protocol
4. Kafka Consumer offsets linked to checkpoints
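A complementary pattern on the consumer side is idempotent processing: with at-least-once delivery plus deduplication by message ID, redelivery has no visible effect, giving exactly-once *effects* without transactions. A toy sketch (plain Python, in-memory state; a real system would persist the processed-ID set atomically with the side effect):

```python
# Idempotent consumer (toy sketch, plain Python): under at-least-once
# delivery the same message can arrive twice; tracking processed message
# IDs makes reprocessing a no-op.

class IdempotentConsumer:
    def __init__(self):
        self.processed_ids = set()  # in production: stored atomically
        self.balance = 0            # with the side effect (same txn)

    def handle(self, msg_id: str, amount: int) -> bool:
        if msg_id in self.processed_ids:
            return False            # duplicate: skip the side effect
        self.balance += amount
        self.processed_ids.add(msg_id)
        return True

consumer = IdempotentConsumer()
deliveries = [("m1", 100), ("m2", 50), ("m1", 100)]  # m1 redelivered
for msg_id, amount in deliveries:
    consumer.handle(msg_id, amount)

print(consumer.balance)  # 150, not 250
```

The key design point is that the dedup record and the side effect must commit together; otherwise a crash between the two reintroduces duplicates or loss.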
7. dbt (data build tool)
7.1 dbt Overview
dbt handles the T (Transform) in ELT. It lets you write data transformation logic in SQL while applying software engineering best practices (version control, testing, documentation) to data transformations.
dbt Project Structure:
my_dbt_project/
+-- dbt_project.yml
+-- profiles.yml
+-- models/
|   +-- staging/
|   |   +-- stg_orders.sql
|   |   +-- stg_customers.sql
|   |   +-- _staging_sources.yml
|   +-- intermediate/
|   |   +-- int_order_items_grouped.sql
|   +-- marts/
|       +-- dim_customers.sql
|       +-- fact_orders.sql
|       +-- _marts_schema.yml
+-- tests/
|   +-- assert_positive_revenue.sql
+-- macros/
|   +-- generate_schema_name.sql
+-- seeds/
    +-- country_codes.csv
7.2 Writing Models
-- models/staging/stg_orders.sql
WITH source AS (
    SELECT * FROM {{ source('raw', 'orders') }}
),

renamed AS (
    SELECT
        id AS order_id,
        user_id AS customer_id,
        amount AS order_amount,
        status AS order_status,
        created_at AS ordered_at
    FROM source
    WHERE status != 'cancelled'
)

SELECT * FROM renamed

-- models/marts/fact_orders.sql
{{ config(materialized='incremental', unique_key='order_id') }}

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),

customers AS (
    SELECT * FROM {{ ref('dim_customers') }}
)

SELECT
    o.order_id,
    o.customer_id,
    c.customer_name,
    c.customer_segment,
    o.order_amount,
    o.order_amount * 1.1 AS amount_with_tax,
    o.order_status,
    o.ordered_at
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id

{% if is_incremental() %}
WHERE o.ordered_at > (SELECT MAX(ordered_at) FROM {{ this }})
{% endif %}
7.3 Sources and Tests
# models/staging/_staging_sources.yml
version: 2

sources:
  - name: raw
    database: raw_db
    schema: public
    tables:
      - name: orders
        loaded_at_field: _loaded_at
        freshness:
          warn_after:
            count: 12
            period: hour
          error_after:
            count: 24
            period: hour
        columns:
          - name: id
            tests:
              - unique
              - not_null
          - name: amount
            tests:
              - not_null
      - name: customers
        columns:
          - name: id
            tests:
              - unique
              - not_null
# models/marts/_marts_schema.yml
version: 2

models:
  - name: fact_orders
    description: "Orders fact table"
    columns:
      - name: order_id
        description: "Unique order ID"
        tests:
          - unique
          - not_null
      - name: order_amount
        tests:
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
-- tests/assert_positive_revenue.sql
-- Custom test: verify all revenue is positive
SELECT order_id, order_amount
FROM {{ ref('fact_orders') }}
WHERE order_amount < 0
7.4 dbt Commands
# Build all models
dbt run
# Build a specific model
dbt run --select fact_orders
# Build model + downstream models
dbt run --select stg_orders+
# Run tests
dbt test
# Generate documentation
dbt docs generate
dbt docs serve
# Check source freshness
dbt source freshness
# Load seed data
dbt seed
# Full pipeline (build + test)
dbt build
8. Data Warehouses
Comparison
| Aspect | BigQuery | Snowflake | Redshift |
|---|---|---|---|
| Vendor | Google Cloud | Snowflake | AWS |
| Architecture | Serverless | Compute/storage separation | MPP cluster |
| Pricing | Per-query (on-demand) | Credit-based | Per-node-hour |
| Scalability | Automatic | Warehouse resizing | Add nodes |
| Concurrency | Automatic (slot-based) | Multi-cluster warehouses | WLM configuration |
| Semi-structured | STRUCT, ARRAY | VARIANT | SUPER |
| ML Integration | BigQuery ML | Snowpark | Redshift ML |
| Cost Efficiency | Best for small scale | Best for medium scale | Best for large always-on |
BigQuery Example
-- BigQuery: Partitioning + Clustering
CREATE TABLE analytics.fact_orders
PARTITION BY DATE(ordered_at)
CLUSTER BY customer_segment, order_status
AS
SELECT
order_id,
customer_id,
customer_segment,
order_amount,
order_status,
ordered_at
FROM staging.orders;
-- Cost estimation (dry run)
-- On-demand pricing is charged per TiB scanned (about $6.25/TiB as of
-- recent pricing; check current rates)
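The dry-run comment above is simple arithmetic over bytes scanned. A small sketch, assuming a rate of about $6.25 per TiB (on-demand pricing changes; always check the current rate):

```python
# On-demand query cost estimate (sketch): cost = bytes scanned x rate.
# PRICE_PER_TIB is an assumption (~$6.25/TiB at recent on-demand rates);
# verify against current BigQuery pricing before relying on it.

PRICE_PER_TIB = 6.25
TIB = 1024 ** 4

def estimated_cost_usd(bytes_scanned: int) -> float:
    return bytes_scanned / TIB * PRICE_PER_TIB

# A dry run (e.g. `bq query --dry_run`) reports total bytes processed
# without executing the query; feed that number in here.
print(round(estimated_cost_usd(2 * TIB), 2))  # 12.5
```

This is also why partitioning and clustering matter: they shrink the bytes-scanned input to this formula, not just the runtime.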
Snowflake Example
-- Snowflake: Warehouse creation and management
CREATE WAREHOUSE etl_wh
WITH WAREHOUSE_SIZE = 'MEDIUM'
AUTO_SUSPEND = 300
AUTO_RESUME = TRUE
MIN_CLUSTER_COUNT = 1
MAX_CLUSTER_COUNT = 3;
-- Data loading
COPY INTO raw.orders
FROM @my_s3_stage/orders/
FILE_FORMAT = (TYPE = 'PARQUET')
PATTERN = '.*[.]parquet';
9. Data Lake / Lakehouse
Table Format Comparison
Traditional Data Lake Problems:
- No ACID transactions
- No schema enforcement
- No time travel
- Small files problem
Lakehouse Table Formats Solve These:
Delta Lake : Led by Databricks, best Spark integration
Apache Iceberg : Developed by Netflix, vendor-neutral
Apache Hudi : Developed by Uber, specializes in incremental processing
| Feature | Delta Lake | Apache Iceberg | Apache Hudi |
|---|---|---|---|
| ACID Transactions | Yes | Yes | Yes |
| Schema Evolution | Yes | Yes | Yes |
| Time Travel | Yes | Yes | Yes |
| Partition Evolution | Limited | Yes (hidden partitioning) | Limited |
| Engine Compatibility | Spark-centric | Spark, Flink, Trino | Spark, Flink |
| Primary Platform | Databricks | Multi-vendor adoption | AWS-centric |
# Delta Lake Example (PySpark)
from delta.tables import DeltaTable
# Create Delta table
orders_df.write \
.format("delta") \
.mode("overwrite") \
.partitionBy("order_date") \
.save("s3://data-lake/delta/orders")
# UPSERT (Merge)
delta_table = DeltaTable.forPath(spark, "s3://data-lake/delta/orders")
delta_table.alias("target").merge(
new_orders_df.alias("source"),
"target.order_id = source.order_id"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
# Time Travel
old_data = spark.read \
.format("delta") \
.option("versionAsOf", 5) \
.load("s3://data-lake/delta/orders")
10. Data Quality
10.1 Great Expectations
import great_expectations as gx
context = gx.get_context()
# Connect data source
datasource = context.sources.add_pandas("my_datasource")
data_asset = datasource.add_csv_asset("orders", filepath_or_buffer="orders.csv")
# Define Expectation Suite
suite = context.add_expectation_suite("orders_validation")
# Define expectations
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="amount", min_value=0, max_value=100000
)
)
# Run validation (assumes a checkpoint named 'orders_checkpoint' has been
# configured for this suite; the exact API differs between GX versions)
results = context.run_checkpoint(
    checkpoint_name="orders_checkpoint"
)
print(f"Success: {results.success}")
10.2 Data Contracts
# data-contracts/orders-contract.yaml
dataContractSpecification: 0.9.3
id: orders-contract
info:
  title: Orders Data Contract
  version: 1.0.0
  owner: data-team
  contact:
    email: data-team@company.com
schema:
  type: object
  properties:
    order_id:
      type: string
      description: "Unique order identifier"
      required: true
      unique: true
    customer_id:
      type: string
      required: true
    amount:
      type: number
      minimum: 0
      maximum: 100000
    status:
      type: string
      enum: ["pending", "completed", "cancelled"]
    created_at:
      type: timestamp
      required: true
quality:
  completeness:
    - field: order_id
      threshold: 100
    - field: customer_id
      threshold: 99.9
  freshness:
    maxDelay: "PT1H"  # Within 1 hour
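A contract like this is only useful if something enforces it. Here is a minimal, hypothetical validator sketch (plain Python, not a real data-contract library) that checks a few of the rules above against a record:

```python
# Minimal contract check (sketch): enforce required fields, numeric bounds,
# and enum membership. Field names mirror the contract above; the validator
# itself is a hypothetical illustration, not a library API.

CONTRACT = {
    "order_id": {"required": True},
    "amount": {"minimum": 0, "maximum": 100000},
    "status": {"enum": ["pending", "completed", "cancelled"]},
}

def violations(record: dict) -> list[str]:
    errors = []
    for field, rules in CONTRACT.items():
        value = record.get(field)
        if rules.get("required") and value is None:
            errors.append(f"{field}: missing required field")
            continue
        if value is None:
            continue  # optional field absent: nothing to check
        if "minimum" in rules and value < rules["minimum"]:
            errors.append(f"{field}: {value} below minimum")
        if "maximum" in rules and value > rules["maximum"]:
            errors.append(f"{field}: {value} above maximum")
        if "enum" in rules and value not in rules["enum"]:
            errors.append(f"{field}: {value!r} not in enum")
    return errors

good = {"order_id": "o-1", "amount": 120, "status": "completed"}
bad = {"amount": -5, "status": "refunded"}

print(violations(good))  # []
print(violations(bad))   # missing order_id, negative amount, bad status
```

In practice these checks run at the pipeline boundary (producer CI or ingestion), so violations block bad data before it reaches consumers.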
11. Orchestration Comparison
Airflow vs Dagster vs Prefect vs Mage
| Aspect | Airflow | Dagster | Prefect | Mage |
|---|---|---|---|---|
| Approach | DAG-centric | Asset-centric | Flow-centric | Block-centric |
| Learning Curve | High | Medium | Low | Low |
| Local Development | Complex | Excellent | Excellent | Excellent |
| Testing | Difficult | Built-in support | Good | Good |
| UI | Functional | Modern | Modern | Modern |
| Community | Very large | Growing | Growing | Small |
| Production Track Record | Extensive | Increasing | Increasing | Early stage |
| Cloud Offering | MWAA, Composer | Dagster Cloud | Prefect Cloud | Mage Pro |
Selection Guide:
+-- Large enterprise, complex workflows --> Airflow
+-- Data asset-centric thinking --> Dagster
+-- Quick start, Python-native --> Prefect
+-- No-code/low-code preference --> Mage
12. Modern Data Stack Diagram
Modern Data Stack (2025):

Data Sources       Ingestion             Storage         Transform      Analytics/BI
------------       ---------             -------         ---------      ------------
SaaS APIs   --+
Databases   --+--> Fivetran/Airbyte -->  Snowflake  -->  dbt       -->  Looker
Event Logs  --+                          BigQuery        Dataform       Metabase
Files       --+                          Redshift                       Tableau

Kafka/      -----> Flink/Spark      -->  Delta Lake -->  Spark SQL -->  Real-time
Kinesis            Streaming             Iceberg                        Dashboards

Orchestration: Airflow / Dagster
Quality:       Great Expectations / dbt tests
Catalog:       DataHub / Atlan / OpenMetadata
Monitoring:    Monte Carlo / Datadog
13. Quiz
Q1: ETL vs ELT
What is the core difference between ETL and ELT, and when should you choose ELT?
Answer:
The core difference is where transformation occurs. ETL transforms data on a separate server before loading, while ELT loads raw data into the warehouse first and transforms it there.
Choose ELT when:
- Using cloud warehouses (BigQuery, Snowflake) with elastic compute
- Raw data preservation is important
- Transformation logic changes frequently and flexibility is needed
- You want SQL-based transformation with tools like dbt
Q2: Spark Partitioning
What is the difference between repartition() and coalesce() in Spark?
Answer:
repartition() performs a full shuffle to redistribute data evenly across the specified number of partitions. Use it when increasing partitions or partitioning by a specific column.
coalesce() reduces partition count without a full shuffle. It merges existing partitions and can only decrease the number of partitions, with lower network overhead.
Use coalesce() when reducing partitions, repartition() when increasing or needing even distribution.
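The distinction can be pictured with plain lists (a toy sketch, not Spark internals): coalesce glues whole existing partitions together, while repartition redistributes every individual row.

```python
# Toy picture (plain Python, not Spark): coalesce() merges whole existing
# partitions — no row leaves its original group — while repartition()
# rehashes every row into fresh partitions (a full shuffle).

def coalesce(partitions, n):
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i % n].extend(part)  # glue existing partitions together
    return merged

def repartition(partitions, n):
    fresh = [[] for _ in range(n)]
    for part in partitions:
        for row in part:            # every row is individually moved
            fresh[row % n].append(row)
    return fresh

parts = [[0, 1], [2, 3], [4, 5], [6, 7]]
print(coalesce(parts, 2))     # [[0, 1, 4, 5], [2, 3, 6, 7]]
print(repartition(parts, 2))  # [[0, 2, 4, 6], [1, 3, 5, 7]]
```

Note how coalesce preserves the original groupings (cheap, but can leave partitions uneven), whereas repartition rebalances by key at the cost of moving all the data.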
Q3: Airflow XCom
What is the role and limitations of XCom in Airflow?
Answer:
XCom (Cross-Communication) is a mechanism for passing small amounts of data between Airflow tasks. Data is stored in the metadata database.
Limitations:
- Suited only to small payloads (the practical size limit depends on the metadata database backend; tens of KB is the commonly cited guideline)
- Large datasets should go to external storage (S3/GCS), with only the file path passed via XCom
- Values must be serializable (JSON by default)
- Heavy use puts load on the metadata database
Alternative: For large data, use temporary files or cloud storage, and pass only the file path through XCom.
Q4: Exactly-Once Semantics
How do you implement exactly-once semantics with Kafka?
Answer:
Kafka exactly-once is implemented through three components:

1. Idempotent Producer: Setting enable.idempotence=true allows brokers to automatically deduplicate retried messages.
2. Transactional Producer: Guarantees atomic writes across multiple partitions/topics via the initTransactions(), beginTransaction(), and commitTransaction() APIs.
3. Consumer read_committed: Setting isolation.level=read_committed ensures consumers only read messages from committed transactions.
When combined with Flink, Flink's checkpointing mechanism and Kafka's transactional API work together to achieve end-to-end exactly-once delivery.
Q5: dbt Incremental Models
How do dbt incremental models work, and when should you use them?
Answer:
dbt incremental models process only new or changed data since the last run.
How it works:
- First run processes all data (CREATE TABLE AS)
- Subsequent runs filter new data using the is_incremental() condition
- New data is MERGEd or INSERTed into the existing table
When to use:
- Large fact tables (where full rebuilds are expensive)
- Event/log data (time-ordered appends)
- Data that grows incrementally
The key is setting a proper unique_key and appropriate incremental filter condition (WHERE clause).
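The mechanics can be sketched in a few lines of plain Python (a toy stand-in for the MERGE that dbt generates; the dict plays the role of the warehouse table, keyed by unique_key):

```python
# Incremental-model mechanics (toy sketch, plain Python): the first run
# builds the full table; later runs keep only rows newer than
# MAX(ordered_at) and merge them by unique_key — mirroring what dbt
# generates from is_incremental() plus unique_key='order_id'.

table = {}  # order_id -> row, standing in for the warehouse table

def incremental_run(source_rows):
    if table:  # is_incremental(): the table exists from a previous run
        high_water = max(r["ordered_at"] for r in table.values())
        source_rows = [r for r in source_rows
                       if r["ordered_at"] > high_water]
    for row in source_rows:  # MERGE on unique_key
        table[row["order_id"]] = row

day1 = [{"order_id": 1, "ordered_at": 1, "amount": 10},
        {"order_id": 2, "ordered_at": 2, "amount": 20}]
day2 = [{"order_id": 2, "ordered_at": 2, "amount": 20},  # already loaded
        {"order_id": 3, "ordered_at": 3, "amount": 30}]  # new

incremental_run(day1)   # first run: full build
incremental_run(day2)   # only order 3 passes the high-water-mark filter
print(sorted(table))    # [1, 2, 3]
```

The sketch also shows the classic caveat: a strict `>` high-water-mark filter silently drops late-arriving rows with old timestamps, which is why real models often add a lookback window to the WHERE clause.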
14. References
- Apache Spark Official Documentation
- Apache Airflow Official Documentation
- Apache Kafka Official Documentation
- Apache Flink Official Documentation
- dbt Official Documentation
- Great Expectations Documentation
- Delta Lake Official Documentation
- Apache Iceberg Official Documentation
- BigQuery Official Documentation
- Snowflake Official Documentation
- Fundamentals of Data Engineering (O'Reilly)
- The Data Warehouse Toolkit (Kimball)