- Published on
データエンジニアリングパイプライン完全ガイド2025:ETL/ELT、Spark、Airflow、リアルタイムストリーミング
- Authors

- Name
- Youngju Kim
- @fjvbn20031
1. データエンジニアリング概要(がいよう)
データエンジニアの役割(やくわり)
データエンジニアは、組織(そしき)のデータインフラを設計(せっけい)・構築(こうちく)・保守(ほしゅ)する役割(やくわり)を担(にな)います。データサイエンティストが分析(ぶんせき)できるよう、データを収集(しゅうしゅう)・変換(へんかん)・格納(かくのう)するパイプラインを構築(こうちく)するのが核心業務(かくしんぎょうむ)です。
データエンジニアの核心的な役割
├── データ収集(Ingestion)
│ ├── API、DB、ファイル、ストリームからデータ抽出
│ └── 多様なソースの統合
├── データ変換(Transformation)
│ ├── クレンジング、正規化、集約
│ └── ビジネスロジック適用
├── データ格納(Storage)
│ ├── データウェアハウス設計
│ └── データレイク構築
├── パイプラインオーケストレーション
│ ├── ワークフロー自動化
│ └── スケジューリングとモニタリング
└── データ品質管理
├── データ検証
└── データ契約(Data Contract)
必須(ひっす)技術(ぎじゅつ)スタック
プログラミング : Python, SQL, Scala/Java
データ処理 : Spark, Flink, Beam
オーケストレーション : Airflow, Dagster, Prefect
ストリーミング : Kafka, Kinesis, Pub/Sub
変換ツール : dbt, Dataform
クラウド : AWS(Redshift, Glue), GCP(BigQuery), Azure(Synapse)
コンテナ : Docker, Kubernetes
IaC : Terraform, Pulumi
モニタリング : Datadog, Grafana, Monte Carlo
2. ETL vs ELT
伝統的(でんとうてき)なETL
ETLはExtract(抽出(ちゅうしゅつ))、Transform(変換(へんかん))、Load(格納(かくのう))の略(りゃく)で、データをソースから抽出(ちゅうしゅつ)した後(あと)、変換サーバーで加工(かこう)して最終(さいしゅう)ストレージに格納(かくのう)します。
ETLフロー:
ソースDB ──Extract──→ 変換サーバー ──Transform──→ Load──→ データウェアハウス
(ETLサーバー)
# 伝統的なETL例(Python)
import pandas as pd
from sqlalchemy import create_engine
# Extract:ソースDBからデータ抽出
source_engine = create_engine('postgresql://source_db:5432/sales')
raw_data = pd.read_sql('SELECT * FROM orders WHERE date >= %s', source_engine, params=['2025-01-01'])
# Transform:データ変換
transformed = raw_data.copy()
transformed['total_with_tax'] = transformed['total'] * 1.1
transformed['order_month'] = pd.to_datetime(transformed['order_date']).dt.to_period('M')
transformed = transformed.dropna(subset=['customer_id'])
transformed = transformed[transformed['total'] > 0]
# Load:ウェアハウスに格納
wh_engine = create_engine('postgresql://warehouse:5432/analytics')
transformed.to_sql('fact_orders', wh_engine, if_exists='append', index=False)
現代的(げんだいてき)なELT
ELTはExtract(抽出)、Load(格納)、Transform(変換)の順序(じゅんじょ)で、生(なま)データをまずウェアハウスに格納(かくのう)してから、ウェアハウス内(ない)で変換(へんかん)します。
ELTフロー:
ソースDB ──Extract──→ Load──→ データウェアハウス ──Transform──→ 分析用テーブル
(dbt等で変換)
-- ELT:ウェアハウス内でdbtにより変換
-- models/marts/fact_orders.sql
WITH source_orders AS (
SELECT * FROM raw.orders
WHERE order_date >= '2025-01-01'
),
cleaned AS (
SELECT
order_id,
customer_id,
total,
total * 1.1 AS total_with_tax,
DATE_TRUNC('month', order_date) AS order_month,
order_date
FROM source_orders
WHERE customer_id IS NOT NULL
AND total > 0
)
SELECT * FROM cleaned
ETL vs ELT比較表(ひかくひょう)
| 項目 | ETL | ELT |
|---|---|---|
| 変換場所 | 別サーバー | ウェアハウス内部 |
| スケーラビリティ | ETLサーバー性能に依存 | ウェアハウスの演算力活用 |
| 元データ | 変換後に原本喪失の可能性 | 原本保持 |
| コスト | ETLサーバー運用コスト | ウェアハウス演算コスト |
| 柔軟性 | 変換ロジック変更時に再処理 | SQLで柔軟に再変換 |
| 代表ツール | Informatica, Talend | dbt, Dataform |
| 適合ケース | レガシーシステム、規制要件 | クラウドネイティブ、ビッグデータ |
3. バッチ vs ストリーム処理(しょり)
バッチ処理(しょり)
一定(いってい)の周期(しゅうき)で大量(たいりょう)のデータを一括(いっかつ)処理(しょり)する方式(ほうしき)です。
バッチ処理:
[データ蓄積] ──→ [一括処理] ──→ [結果格納]
(1時間/1日) (Spark等) (ウェアハウス)
特徴:
- 高スループット
- 高レイテンシ(分~時間)
- コスト効率的
- 再処理が容易
ストリーム処理(しょり)
データが到着(とうちゃく)した時点(じてん)でリアルタイムに処理(しょり)する方式(ほうしき)です。
ストリーム処理:
[イベント発生] ──→ [即座に処理] ──→ [リアルタイム結果]
(連続的) (Flink等) (ダッシュボード/アラート)
特徴:
- 低レイテンシ(ミリ秒~秒)
- 連続処理
- 複雑な障害処理
- イベント順序管理が必要
選択基準(せんたくきじゅん)
バッチを選択する場合:
- 日次/週次/月次レポート
- 大規模データ集約
- コスト最適化優先
- リアルタイム不要
ストリームを選択する場合:
- リアルタイムダッシュボード
- 不正検知
- リアルタイムレコメンデーション
- IoTセンサーデータ
- アラート/通知
ハイブリッド(Lambda/Kappa):
- バッチ + ストリーム同時運用
- リアルタイム近似値 + バッチ正確結果
4. Apache Spark
4.1 Spark概要(がいよう)
Apache Sparkは大規模(だいきぼ)データ処理(しょり)のための統合分析(とうごうぶんせき)エンジンです。インメモリ演算(えんざん)でMapReduce比(ひ)100倍(ばい)の性能(せいのう)を提供(ていきょう)します。
Sparkアーキテクチャ:
┌─────────────────────────────────┐
│ Spark Application │
├─────────────────────────────────┤
│ SparkSQL │ Streaming │ MLlib │
├─────────────────────────────────┤
│ DataFrame / Dataset │
├─────────────────────────────────┤
│ RDD (Core Engine) │
├─────────────────────────────────┤
│ Standalone │ YARN │ Mesos │ K8s│
└─────────────────────────────────┘
4.2 PySparkの基本(きほん)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, when, lit
# SparkSession作成
spark = SparkSession.builder \
.appName("DataPipeline") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.shuffle.partitions", "200") \
.getOrCreate()
# データ読み込み
orders_df = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv("s3://data-lake/raw/orders/")
users_df = spark.read.parquet("s3://data-lake/raw/users/")
# データ変換
result = orders_df \
.filter(col("status") == "completed") \
.join(users_df, orders_df.user_id == users_df.id, "inner") \
.groupBy("user_id", "username") \
.agg(
count("order_id").alias("total_orders"),
sum("amount").alias("total_spent"),
avg("amount").alias("avg_order_value")
) \
.filter(col("total_orders") > 5)
# 結果保存
result.write \
.mode("overwrite") \
.partitionBy("order_month") \
.parquet("s3://data-lake/processed/user_summary/")
4.3 SparkSQL
# 一時ビュー登録
orders_df.createOrReplaceTempView("orders")
users_df.createOrReplaceTempView("users")
# SQLで分析
monthly_revenue = spark.sql("""
SELECT
DATE_TRUNC('month', order_date) AS month,
COUNT(DISTINCT user_id) AS unique_customers,
COUNT(*) AS total_orders,
SUM(amount) AS revenue,
AVG(amount) AS avg_order_value
FROM orders
WHERE status = 'completed'
GROUP BY DATE_TRUNC('month', order_date)
ORDER BY month
""")
monthly_revenue.show()
4.4 パーティショニングとキャッシュ
# パーティション最適化
# 現在のパーティション数を確認
print(f"Partitions: {orders_df.rdd.getNumPartitions()}")
# リパーティション(シャッフル発生)
orders_repartitioned = orders_df.repartition(100, "order_date")
# コアレス(シャッフルなしでパーティション数を削減)
orders_coalesced = orders_df.coalesce(50)
# キャッシュ
from pyspark.storagelevel import StorageLevel
# メモリキャッシュ
orders_df.cache()
orders_df.count() # キャッシュをトリガー
# メモリ + ディスクキャッシュ
users_df.persist(StorageLevel.MEMORY_AND_DISK)
# キャッシュ解放
orders_df.unpersist()
4.5 Sparkパフォーマンスチューニング
# ブロードキャストジョイン(小さいテーブル用)
from pyspark.sql.functions import broadcast
# 小さいテーブルをブロードキャスト
result = orders_df.join(
broadcast(users_df),
orders_df.user_id == users_df.id
)
# AQE(Adaptive Query Execution)有効化
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
Sparkパフォーマンス最適化チェックリスト:
[ ] 適切なパーティション数を設定(コア数の2~3倍)
[ ] データスキュー処理(salting、AQE)
[ ] ブロードキャストジョイン活用
[ ] 不必要なシャッフル最小化
[ ] カラムプルーニング(必要なカラムのみ選択)
[ ] Predicate Pushdown活用
[ ] キャッシュ戦略策定
[ ] シリアライゼーションフォーマット最適化(Parquet、ORC)
5. Apache Airflow
5.1 Airflow概要(がいよう)
Apache Airflowは、ワークフローをプログラム的(てき)に作成(さくせい)・スケジューリング・モニタリングするためのプラットフォームです。DAG(有向(ゆうこう)非巡回(ひじゅんかい)グラフ)でタスク間(かん)の依存関係(いぞんかんけい)を定義(ていぎ)します。
Airflowアーキテクチャ:
┌──────────────────────────────────┐
│ Web Server │
│ (UI / REST API) │
├──────────────────────────────────┤
│ Scheduler │
│ (DAGパース、タスクスケジューリング)│
├──────────────────────────────────┤
│ Executor │
│ (Local/Celery/Kubernetes) │
├──────────────────────────────────┤
│ Metadata Database │
│ (PostgreSQL/MySQL) │
└──────────────────────────────────┘
5.2 DAGの作成(さくせい)
# dags/etl_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.sensors.filesystem import FileSensor
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email_on_failure': True,
'email': ['data-team@company.com'],
'retries': 3,
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(hours=2),
}
with DAG(
dag_id='daily_etl_pipeline',
default_args=default_args,
description='日次ETLパイプライン',
schedule_interval='0 6 * * *', # 毎日午前6時
start_date=datetime(2025, 1, 1),
catchup=False,
tags=['etl', 'daily'],
max_active_runs=1,
) as dag:
# センサー:ファイル到着待ち
wait_for_data = FileSensor(
task_id='wait_for_data',
filepath='/data/raw/daily_export.csv',
poke_interval=300, # 5分ごとに確認
timeout=3600, # 最大1時間待機
mode='poke',
)
# 抽出
def extract_data(**context):
import pandas as pd
execution_date = context['ds']
df = pd.read_csv(f'/data/raw/daily_export_{execution_date}.csv')
df.to_parquet(f'/data/staging/extract_{execution_date}.parquet')
return len(df)
extract = PythonOperator(
task_id='extract',
python_callable=extract_data,
)
# 変換
def transform_data(**context):
import pandas as pd
execution_date = context['ds']
df = pd.read_parquet(f'/data/staging/extract_{execution_date}.parquet')
# データクレンジング
df = df.dropna(subset=['customer_id', 'amount'])
df = df[df['amount'] > 0]
df['amount_with_tax'] = df['amount'] * 1.1
df.to_parquet(f'/data/staging/transform_{execution_date}.parquet')
return len(df)
transform = PythonOperator(
task_id='transform',
python_callable=transform_data,
)
# 格納
load = PostgresOperator(
task_id='load',
postgres_conn_id='warehouse',
sql='sql/load_daily_orders.sql',
)
# データ品質検証
def validate_data(**context):
execution_date = context['ds']
row_count = context['ti'].xcom_pull(task_ids='transform')
if row_count < 100:
raise ValueError(f'レコード数不足: {row_count}')
validate = PythonOperator(
task_id='validate',
python_callable=validate_data,
)
# 通知
notify = BashOperator(
task_id='notify',
bash_command='echo "ETL完了: {{ ds }}" | mail -s "ETL Success" team@company.com',
)
# 依存関係定義
wait_for_data >> extract >> transform >> load >> validate >> notify
5.3 TaskFlow API(Airflow 2.x)
from airflow.decorators import dag, task
from datetime import datetime
@dag(
schedule_interval='@daily',
start_date=datetime(2025, 1, 1),
catchup=False,
tags=['etl'],
)
def modern_etl_pipeline():
@task()
def extract():
"""ソースからデータ抽出"""
import pandas as pd
df = pd.read_csv('/data/raw/orders.csv')
return df.to_dict()
@task()
def transform(raw_data: dict):
"""データ変換と整形"""
import pandas as pd
df = pd.DataFrame(raw_data)
df = df[df['amount'] > 0]
df['processed_at'] = datetime.now().isoformat()
return df.to_dict()
@task()
def load(transformed_data: dict):
"""ウェアハウスに格納"""
import pandas as pd
df = pd.DataFrame(transformed_data)
print(f"Loaded {len(df)} records")
# 自動依存関係設定
raw = extract()
transformed = transform(raw)
load(transformed)
# DAGインスタンス化
modern_etl_pipeline()
5.4 Connections、Variables、XCom
# Connections:UIまたはCLIで設定
# airflow connections add 'warehouse' \
# --conn-type 'postgres' \
# --conn-host 'warehouse.example.com' \
# --conn-port 5432 \
# --conn-login 'etl_user' \
# --conn-password 'secret'
# Variables
from airflow.models import Variable
env = Variable.get("environment", default_var="dev")
config = Variable.get("pipeline_config", deserialize_json=True)
# XCom:タスク間データ受け渡し
def producer_task(**context):
context['ti'].xcom_push(key='row_count', value=1000)
def consumer_task(**context):
row_count = context['ti'].xcom_pull(
task_ids='producer',
key='row_count'
)
print(f"前タスクの結果: {row_count}行")
5.5 動的(どうてき)DAG生成(せいせい)
# dags/dynamic_dag_factory.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
# 設定ベースの動的DAG生成
configs = [
{"name": "sales", "source": "mysql", "schedule": "@daily"},
{"name": "users", "source": "postgres", "schedule": "@hourly"},
{"name": "logs", "source": "s3", "schedule": "0 */6 * * *"},
]
def create_etl_dag(config):
dag = DAG(
dag_id=f"etl_{config['name']}",
schedule_interval=config['schedule'],
start_date=datetime(2025, 1, 1),
catchup=False,
)
def process(**kwargs):
print(f"Processing {config['name']} from {config['source']}")
with dag:
PythonOperator(
task_id='process',
python_callable=process,
)
return dag
for config in configs:
globals()[f"etl_{config['name']}"] = create_etl_dag(config)
6. リアルタイムストリーミング
6.1 Apache Kafka
Kafkaは分散(ぶんさん)イベントストリーミングプラットフォームで、大規模(だいきぼ)リアルタイムデータパイプラインの基盤(きばん)です。
Kafkaアーキテクチャ:
Producer ──→ Broker(Topic/Partition) ──→ Consumer Group
│
├── Partition 0: [msg1, msg2, msg3...]
├── Partition 1: [msg4, msg5, msg6...]
└── Partition 2: [msg7, msg8, msg9...]
# Kafka Producer(Python)
from confluent_kafka import Producer
import json
config = {
'bootstrap.servers': 'kafka-broker:9092',
'acks': 'all',
'retries': 3,
'linger.ms': 10,
'batch.size': 16384,
}
producer = Producer(config)
def delivery_callback(err, msg):
if err:
print(f'送信失敗: {err}')
else:
print(f'送信完了: {msg.topic()} [{msg.partition()}]')
# メッセージ送信
for i in range(100):
event = {
'user_id': f'user_{i}',
'action': 'page_view',
'timestamp': '2025-03-25T10:00:00Z',
'page': '/products'
}
producer.produce(
topic='user-events',
key=str(event['user_id']),
value=json.dumps(event),
callback=delivery_callback
)
producer.flush()
# Kafka Consumer(Python)
from confluent_kafka import Consumer
import json
config = {
'bootstrap.servers': 'kafka-broker:9092',
'group.id': 'analytics-consumer',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
}
consumer = Consumer(config)
consumer.subscribe(['user-events'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
print(f'Consumer error: {msg.error()}')
continue
event = json.loads(msg.value().decode('utf-8'))
print(f"受信: {event['user_id']} - {event['action']}")
# 手動コミット
consumer.commit(asynchronous=False)
finally:
consumer.close()
6.2 Apache Flink
Flinkは状態(じょうたい)ベースのストリーム処理(しょり)エンジンで、正確(せいかく)に1回(exactly-once)のセマンティクスを提供(ていきょう)します。
# PyFlinkストリーム処理
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
env.enable_checkpointing(60000) # 60秒ごとにチェックポイント
t_env = StreamTableEnvironment.create(env)
# Kafkaソーステーブル定義
t_env.execute_sql("""
CREATE TABLE user_events (
user_id STRING,
action STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'flink-processor',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)
""")
# ウィンドウ集約
t_env.execute_sql("""
CREATE TABLE page_view_stats (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
page STRING,
view_count BIGINT,
unique_users BIGINT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://analytics-db:5432/stats',
'table-name' = 'page_view_stats',
'driver' = 'org.postgresql.Driver'
)
""")
t_env.execute_sql("""
INSERT INTO page_view_stats
SELECT
window_start,
window_end,
action AS page,
COUNT(*) AS view_count,
COUNT(DISTINCT user_id) AS unique_users
FROM TABLE(
TUMBLE(TABLE user_events, DESCRIPTOR(event_time), INTERVAL '5' MINUTE)
)
GROUP BY window_start, window_end, action
""")
6.3 Exactly-Onceセマンティクス
配信保証レベル:
At-most-once :メッセージ喪失の可能性あり、重複なし
At-least-once :メッセージ喪失なし、重複の可能性あり
Exactly-once :メッセージ喪失なし、重複なし(最も困難)
Kafka + Flink Exactly-Once:
1. Kafkaトランザクショナルプロデューサー
2. Flinkチェックポイント(Chandy-Lamport)
3. Two-Phase Commit Protocol
4. Kafkaコンシューマーオフセットをチェックポイントと連動
7. dbt(data build tool)
7.1 dbt概要(がいよう)
dbtはELTのT(Transform)を担当(たんとう)するツールです。SQLでデータ変換(へんかん)ロジックを作成(さくせい)し、ソフトウェアエンジニアリングのベストプラクティス(バージョン管理(かんり)、テスト、ドキュメンテーション)をデータ変換(へんかん)に適用(てきよう)します。
dbtプロジェクト構造:
my_dbt_project/
├── dbt_project.yml
├── profiles.yml
├── models/
│ ├── staging/
│ │ ├── stg_orders.sql
│ │ ├── stg_customers.sql
│ │ └── _staging_sources.yml
│ ├── intermediate/
│ │ └── int_order_items_grouped.sql
│ └── marts/
│ ├── dim_customers.sql
│ ├── fact_orders.sql
│ └── _marts_schema.yml
├── tests/
│ └── assert_positive_revenue.sql
├── macros/
│ └── generate_schema_name.sql
└── seeds/
└── country_codes.csv
7.2 モデルの作成(さくせい)
-- models/staging/stg_orders.sql
WITH source AS (
SELECT * FROM {{ source('raw', 'orders') }}
),
renamed AS (
SELECT
id AS order_id,
user_id AS customer_id,
amount AS order_amount,
status AS order_status,
created_at AS ordered_at
FROM source
WHERE status != 'cancelled'
)
SELECT * FROM renamed
-- models/marts/fact_orders.sql
{{ config(materialized='incremental', unique_key='order_id') }}
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}
),
customers AS (
SELECT * FROM {{ ref('dim_customers') }}
)
SELECT
o.order_id,
o.customer_id,
c.customer_name,
c.customer_segment,
o.order_amount,
o.order_amount * 1.1 AS amount_with_tax,
o.order_status,
o.ordered_at
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
{% if is_incremental() %}
WHERE o.ordered_at > (SELECT MAX(ordered_at) FROM {{ this }})
{% endif %}
7.3 ソースとテスト
# models/staging/_staging_sources.yml
version: 2
sources:
- name: raw
database: raw_db
schema: public
tables:
- name: orders
loaded_at_field: _loaded_at
freshness:
warn_after:
count: 12
period: hour
error_after:
count: 24
period: hour
columns:
- name: id
tests:
- unique
- not_null
- name: amount
tests:
- not_null
- name: raw
tables:
- name: customers
columns:
- name: id
tests:
- unique
- not_null
# models/marts/_marts_schema.yml
version: 2
models:
- name: fact_orders
description: "注文ファクトテーブル"
columns:
- name: order_id
description: "注文一意ID"
tests:
- unique
- not_null
- name: order_amount
tests:
- not_null
- name: customer_id
tests:
- not_null
- relationships:
to: ref('dim_customers')
field: customer_id
-- tests/assert_positive_revenue.sql
-- カスタムテスト:売上が正の値であることを検証
SELECT order_id, order_amount
FROM {{ ref('fact_orders') }}
WHERE order_amount < 0
7.4 dbtコマンド
# 全モデルビルド
dbt run
# 特定モデルのみビルド
dbt run --select fact_orders
# モデル + ダウンストリームをビルド
dbt run --select stg_orders+
# テスト実行
dbt test
# ドキュメント生成
dbt docs generate
dbt docs serve
# ソース鮮度確認
dbt source freshness
# シードデータロード
dbt seed
# フルパイプライン(ビルド + テスト)
dbt build
8. データウェアハウス
比較表(ひかくひょう)
| 項目 | BigQuery | Snowflake | Redshift |
|---|---|---|---|
| ベンダー | Google Cloud | Snowflake | AWS |
| アーキテクチャ | サーバーレス | コンピュート/ストレージ分離 | MPPクラスタ |
| 課金 | クエリ当たり(オンデマンド) | クレジットベース | ノード時間当たり |
| スケーラビリティ | 自動 | ウェアハウスリサイジング | ノード追加 |
| 同時実行 | 2000+スロット | マルチクラスタ | WLM設定 |
| 半構造化データ | STRUCT, ARRAY | VARIANT | SUPER |
| ML統合 | BigQuery ML | Snowpark | Redshift ML |
| コスト効率 | 小規模に有利 | 中規模に有利 | 大規模常時運用に有利 |
BigQuery例(れい)
-- BigQuery:パーティション + クラスタリング
CREATE TABLE analytics.fact_orders
PARTITION BY DATE(ordered_at)
CLUSTER BY customer_segment, order_status
AS
SELECT
order_id,
customer_id,
customer_segment,
order_amount,
order_status,
ordered_at
FROM staging.orders;
-- コスト見積もり(dry run)
-- 1TBスキャン = 約$5(オンデマンド)
Snowflake例(れい)
-- Snowflake:ウェアハウス作成と管理
CREATE WAREHOUSE etl_wh
WITH WAREHOUSE_SIZE = 'MEDIUM'
AUTO_SUSPEND = 300
AUTO_RESUME = TRUE
MIN_CLUSTER_COUNT = 1
MAX_CLUSTER_COUNT = 3;
-- データロード
COPY INTO raw.orders
FROM @my_s3_stage/orders/
FILE_FORMAT = (TYPE = 'PARQUET')
PATTERN = '.*[.]parquet';
9. データレイク / レイクハウス
テーブルフォーマット比較(ひかく)
伝統的なデータレイクの問題点:
- ACIDトランザクションなし
- スキーマ強制なし
- タイムトラベル不可
- 小ファイル問題
レイクハウステーブルフォーマットが解決:
Delta Lake :Databricks主導、Spark統合最強
Apache Iceberg :Netflix開発、ベンダー中立
Apache Hudi :Uber開発、増分処理に特化
| 機能 | Delta Lake | Apache Iceberg | Apache Hudi |
|---|---|---|---|
| ACIDトランザクション | O | O | O |
| スキーマ進化 | O | O | O |
| タイムトラベル | O | O | O |
| パーティション進化 | 制限あり | O(隠れたパーティショニング) | 制限あり |
| エンジン互換性 | Spark中心 | Spark, Flink, Trino | Spark, Flink |
| 主要プラットフォーム | Databricks | 複数ベンダー採用 | AWS中心 |
# Delta Lake例(PySpark)
from delta.tables import DeltaTable
# Deltaテーブル作成
orders_df.write \
.format("delta") \
.mode("overwrite") \
.partitionBy("order_date") \
.save("s3://data-lake/delta/orders")
# UPSERT(Merge)
delta_table = DeltaTable.forPath(spark, "s3://data-lake/delta/orders")
delta_table.alias("target").merge(
new_orders_df.alias("source"),
"target.order_id = source.order_id"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
# タイムトラベル
old_data = spark.read \
.format("delta") \
.option("versionAsOf", 5) \
.load("s3://data-lake/delta/orders")
10. データ品質(ひんしつ)
10.1 Great Expectations
import great_expectations as gx
context = gx.get_context()
# データソース接続
datasource = context.sources.add_pandas("my_datasource")
data_asset = datasource.add_csv_asset("orders", filepath_or_buffer="orders.csv")
# Expectation Suite定義
suite = context.add_expectation_suite("orders_validation")
# 期待値定義
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="amount", min_value=0, max_value=100000
)
)
# 検証実行
results = context.run_checkpoint(
checkpoint_name="orders_checkpoint"
)
print(f"成功: {results.success}")
10.2 データ契約(けいやく)(Data Contracts)
# data-contracts/orders-contract.yaml
dataContractSpecification: 0.9.3
id: orders-contract
info:
title: Orders Data Contract
version: 1.0.0
owner: data-team
contact:
email: data-team@company.com
schema:
type: object
properties:
order_id:
type: string
description: "一意な注文ID"
required: true
unique: true
customer_id:
type: string
required: true
amount:
type: number
minimum: 0
maximum: 100000
status:
type: string
enum: ["pending", "completed", "cancelled"]
created_at:
type: timestamp
required: true
quality:
completeness:
- field: order_id
threshold: 100
- field: customer_id
threshold: 99.9
freshness:
maxDelay: "PT1H" # 1時間以内
11. オーケストレーション比較(ひかく)
Airflow vs Dagster vs Prefect vs Mage
| 項目 | Airflow | Dagster | Prefect | Mage |
|---|---|---|---|---|
| アプローチ | DAG中心 | アセット中心 | フロー中心 | ブロック中心 |
| 学習曲線 | 高い | 中程度 | 低い | 低い |
| ローカル開発 | 複雑 | 優秀 | 優秀 | 優秀 |
| テスト | 困難 | 内蔵サポート | 良好 | 良好 |
| UI | 機能的 | モダン | モダン | モダン |
| コミュニティ | 非常に大きい | 成長中 | 成長中 | 小規模 |
| プロダクション実績 | 非常に多い | 増加中 | 増加中 | 初期段階 |
| クラウド | MWAA, Composer | Dagster Cloud | Prefect Cloud | Mage Pro |
選択ガイド:
├── 大企業、複雑なワークフロー → Airflow
├── データアセット中心の考え方 → Dagster
├── クイックスタート、Pythonネイティブ → Prefect
└── ノーコード/ローコード志向 → Mage
12. モダンデータスタックダイアグラム
モダンデータスタック(2025):
データソース 収集/統合 格納 変換 分析/BI
────────── ────────── ────────── ────────── ──────────
SaaS APIs ──┐
Databases ──┼──→ Fivetran/Airbyte ──→ Snowflake ──→ dbt ──→ Looker
Event Logs ──┤ BigQuery Dataform Metabase
Files ──┘ Redshift Tableau
Kafka/ ──────→ Flink/Spark ──→ Delta Lake ──→ Spark SQL ──→ リアルタイム
Kinesis Streaming Iceberg ダッシュボード
オーケストレーション: Airflow / Dagster
品質: Great Expectations / dbt tests
カタログ: DataHub / Atlan / OpenMetadata
モニタリング: Monte Carlo / Datadog
13. クイズ
Q1: ETL vs ELT
ETLとELTの核心的な違いは何ですか?また、いつELTを選択すべきですか?
回答(かいとう):
核心的(かくしんてき)な違(ちが)いは、変換(Transform)が発生(はっせい)する場所(ばしょ)です。ETLは別(べつ)の変換サーバーで変換後(へんかんご)に格納(かくのう)し、ELTはデータをまずウェアハウスに格納(かくのう)してからウェアハウス内(ない)で変換(へんかん)します。
ELTを選択(せんたく)すべき場合(ばあい):
- クラウドウェアハウス(BigQuery、Snowflake)を使用している場合
- 元データの保存が重要な場合
- 変換ロジックが頻繁に変わり、柔軟性が必要な場合
- dbtのようなツールでSQLベースの変換を行いたい場合
Q2: Sparkパーティショニング
Sparkでrepartition()とcoalesce()の違いは何ですか?
回答(かいとう):
repartition()はフルシャッフルを実行(じっこう)し、指定(してい)された数(かず)のパーティションにデータを均等(きんとう)に再分配(さいぶんぱい)します。パーティション数(すう)を増(ふ)やす場合(ばあい)や特定(とくてい)カラムでパーティショニングする場合(ばあい)に使用(しよう)します。
coalesce()はシャッフルなしでパーティション数(すう)だけを減(へ)らします。既存(きそん)のパーティションを統合(とうごう)するため、パーティション数(すう)を減(へ)らす場合(ばあい)にのみ使用(しよう)でき、ネットワークコストが低(ひく)いです。
パーティション数を減らす場合はcoalesce()、増やす場合や均等分配が必要な場合はrepartition()を使用します。
Q3: Airflow XCom
AirflowにおけるXComの役割と制限事項は何ですか?
回答(かいとう):
XCom(Cross-Communication)はAirflowのタスク間(かん)で少量(しょうりょう)のデータを受(う)け渡(わた)すメカニズムです。メタデータDBに保存(ほぞん)されます。
制限事項(せいげんじこう):
- 少量データのみ転送可能(デフォルト48KB、最大数MB)
- 大容量データはS3/GCS等の外部ストレージのパスのみ転送
- シリアライズ可能なデータのみ転送可能(JSON serializable)
- メタデータDBに負荷がかかる可能性あり
代替策:大容量データは一時ファイルやクラウドストレージを使用し、XComではファイルパスのみ転送します。
Q4: Exactly-Onceセマンティクス
KafkaでExactly-Onceセマンティクスを実現する方法を説明してください。
回答(かいとう):
KafkaのExactly-Onceは3つの要素(ようそ)で実現(じつげん)されます。
-
Idempotent Producer:enable.idempotence=trueを設定すると、ブローカーが重複メッセージを自動的に除去します。
-
Transactional Producer:複数パーティション/トピックにまたがる原子的書き込みを保証します。initTransactions()、beginTransaction()、commitTransaction() APIを使用します。
-
Consumer read_committed:isolation.level=read_committedを設定すると、コミット済みトランザクションのメッセージのみ読み取ります。
FlinkとKafkaを連携させる場合、FlinkのチェックポイントメカニズムとkafkaのトランザクショナルAPIを組み合わせてEnd-to-End Exactly-Onceを達成します。
Q5: dbt増分(ぞうぶん)モデル
dbtのincrementalモデルはどのように動作し、いつ使用しますか?
回答(かいとう):
dbt incrementalモデルは、最後(さいご)の実行(じっこう)以降(いこう)に新(あら)たに追加(ついか)または変更(へんこう)されたデータのみを処理(しょり)します。
動作(どうさ)の仕組(しく)み:
- 初回実行時に全データを処理(CREATE TABLE AS)
- 以降の実行時は
is_incremental()条件で新データのみフィルタリング - 新データを既存テーブルにMERGEまたはINSERT
使用(しよう)するタイミング:
- 大容量ファクトテーブル(毎回の全再構築がコスト高の場合)
- イベント/ログデータ(時系列append)
- 段階的に増加するデータ
キーポイントはunique_keyの設定と適切な増分条件(WHERE句)の指定です。