Skip to content
Published on

データエンジニアリングパイプライン完全ガイド2025:ETL/ELT、Spark、Airflow、リアルタイムストリーミング

Authors

1. データエンジニアリング概要(がいよう)

データエンジニアの役割(やくわり)

データエンジニアは、組織(そしき)のデータインフラを設計(せっけい)・構築(こうちく)・保守(ほしゅ)する役割(やくわり)を担(にな)います。データサイエンティストが分析(ぶんせき)できるよう、データを収集(しゅうしゅう)・変換(へんかん)・格納(かくのう)するパイプラインを構築(こうちく)するのが核心業務(かくしんぎょうむ)です。

データエンジニアの核心的な役割
├── データ収集(Ingestion)
│   ├── APIDB、ファイル、ストリームからデータ抽出
│   └── 多様なソースの統合
├── データ変換(Transformation)
│   ├── クレンジング、正規化、集約
│   └── ビジネスロジック適用
├── データ格納(Storage)
│   ├── データウェアハウス設計
│   └── データレイク構築
├── パイプラインオーケストレーション
│   ├── ワークフロー自動化
│   └── スケジューリングとモニタリング
└── データ品質管理
    ├── データ検証
    └── データ契約(Data Contract)

必須(ひっす)技術(ぎじゅつ)スタック

プログラミング     : Python, SQL, Scala/Java
データ処理         : Spark, Flink, Beam
オーケストレーション : Airflow, Dagster, Prefect
ストリーミング     : Kafka, Kinesis, Pub/Sub
変換ツール         : dbt, Dataform
クラウド           : AWS(Redshift, Glue), GCP(BigQuery), Azure(Synapse)
コンテナ           : Docker, Kubernetes
IaC               : Terraform, Pulumi
モニタリング       : Datadog, Grafana, Monte Carlo

2. ETL vs ELT

伝統的(でんとうてき)なETL

ETLはExtract(抽出(ちゅうしゅつ))、Transform(変換(へんかん))、Load(格納(かくのう))の略(りゃく)で、データをソースから抽出(ちゅうしゅつ)した後(あと)、変換サーバーで加工(かこう)して最終(さいしゅう)ストレージに格納(かくのう)します。

ETLフロー:
ソースDB ──Extract──→ 変換サーバー ──Transform──→ Load──→ データウェアハウス
                       (ETLサーバー)
# 伝統的なETL例(Python)
import pandas as pd
from sqlalchemy import create_engine

# Extract:ソースDBからデータ抽出
source_engine = create_engine('postgresql://source_db:5432/sales')
raw_data = pd.read_sql('SELECT * FROM orders WHERE date >= %s', source_engine, params=['2025-01-01'])

# Transform:データ変換
transformed = raw_data.copy()
transformed['total_with_tax'] = transformed['total'] * 1.1
transformed['order_month'] = pd.to_datetime(transformed['order_date']).dt.to_period('M')
transformed = transformed.dropna(subset=['customer_id'])
transformed = transformed[transformed['total'] > 0]

# Load:ウェアハウスに格納
wh_engine = create_engine('postgresql://warehouse:5432/analytics')
transformed.to_sql('fact_orders', wh_engine, if_exists='append', index=False)

現代的(げんだいてき)なELT

ELTはExtract(抽出)、Load(格納)、Transform(変換)の順序(じゅんじょ)で、生(なま)データをまずウェアハウスに格納(かくのう)してから、ウェアハウス内(ない)で変換(へんかん)します。

ELTフロー:
ソースDB ──Extract──→ Load──→ データウェアハウス ──Transform──→ 分析用テーブル
                                  (dbt等で変換)
-- ELT:ウェアハウス内でdbtにより変換
-- models/marts/fact_orders.sql
WITH source_orders AS (
    SELECT * FROM raw.orders
    WHERE order_date >= '2025-01-01'
),
cleaned AS (
    SELECT
        order_id,
        customer_id,
        total,
        total * 1.1 AS total_with_tax,
        DATE_TRUNC('month', order_date) AS order_month,
        order_date
    FROM source_orders
    WHERE customer_id IS NOT NULL
      AND total > 0
)
SELECT * FROM cleaned

ETL vs ELT比較表(ひかくひょう)

項目ETLELT
変換場所別サーバーウェアハウス内部
スケーラビリティETLサーバー性能に依存ウェアハウスの演算力活用
元データ変換後に原本喪失の可能性原本保持
コストETLサーバー運用コストウェアハウス演算コスト
柔軟性変換ロジック変更時に再処理SQLで柔軟に再変換
代表ツールInformatica, Talenddbt, Dataform
適合ケースレガシーシステム、規制要件クラウドネイティブ、ビッグデータ

3. バッチ vs ストリーム処理(しょり)

バッチ処理(しょり)

一定(いってい)の周期(しゅうき)で大量(たいりょう)のデータを一括(いっかつ)処理(しょり)する方式(ほうしき)です。

バッチ処理:
[データ蓄積] ──→ [一括処理] ──→ [結果格納]
  (1時間/1)     (Spark等)     (ウェアハウス)

特徴:
- 高スループット
- 高レイテンシ(分~時間)
- コスト効率的
- 再処理が容易

ストリーム処理(しょり)

データが到着(とうちゃく)した時点(じてん)でリアルタイムに処理(しょり)する方式(ほうしき)です。

ストリーム処理:
[イベント発生] ──→ [即座に処理] ──→ [リアルタイム結果]
  (連続的)         (Flink等)       (ダッシュボード/アラート)

特徴:
- 低レイテンシ(ミリ秒~秒)
- 連続処理
- 複雑な障害処理
- イベント順序管理が必要

選択基準(せんたくきじゅん)

バッチを選択する場合:
  - 日次/週次/月次レポート
  - 大規模データ集約
  - コスト最適化優先
  - リアルタイム不要

ストリームを選択する場合:
  - リアルタイムダッシュボード
  - 不正検知
  - リアルタイムレコメンデーション
  - IoTセンサーデータ
  - アラート/通知

ハイブリッド(Lambda/Kappa):
  - バッチ + ストリーム同時運用
  - リアルタイム近似値 + バッチ正確結果

4. Apache Spark

4.1 Spark概要(がいよう)

Apache Sparkは大規模(だいきぼ)データ処理(しょり)のための統合分析(とうごうぶんせき)エンジンです。インメモリ演算(えんざん)でMapReduce比(ひ)100倍(ばい)の性能(せいのう)を提供(ていきょう)します。

Sparkアーキテクチャ:
┌─────────────────────────────────┐
Spark Application├─────────────────────────────────┤
SparkSQLStreamingMLlib├─────────────────────────────────┤
DataFrame / Dataset├─────────────────────────────────┤
RDD (Core Engine)├─────────────────────────────────┤
StandaloneYARNMesosK8s│
└─────────────────────────────────┘

4.2 PySparkの基本(きほん)

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, when, lit

# SparkSession作成
spark = SparkSession.builder \
    .appName("DataPipeline") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# データ読み込み
orders_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("s3://data-lake/raw/orders/")

users_df = spark.read.parquet("s3://data-lake/raw/users/")

# データ変換
result = orders_df \
    .filter(col("status") == "completed") \
    .join(users_df, orders_df.user_id == users_df.id, "inner") \
    .groupBy("user_id", "username") \
    .agg(
        count("order_id").alias("total_orders"),
        sum("amount").alias("total_spent"),
        avg("amount").alias("avg_order_value")
    ) \
    .filter(col("total_orders") > 5)

# 結果保存
result.write \
    .mode("overwrite") \
    .partitionBy("order_month") \
    .parquet("s3://data-lake/processed/user_summary/")

4.3 SparkSQL

# 一時ビュー登録
orders_df.createOrReplaceTempView("orders")
users_df.createOrReplaceTempView("users")

# SQLで分析
monthly_revenue = spark.sql("""
    SELECT
        DATE_TRUNC('month', order_date) AS month,
        COUNT(DISTINCT user_id) AS unique_customers,
        COUNT(*) AS total_orders,
        SUM(amount) AS revenue,
        AVG(amount) AS avg_order_value
    FROM orders
    WHERE status = 'completed'
    GROUP BY DATE_TRUNC('month', order_date)
    ORDER BY month
""")

monthly_revenue.show()

4.4 パーティショニングとキャッシュ

# パーティション最適化
# 現在のパーティション数を確認
print(f"Partitions: {orders_df.rdd.getNumPartitions()}")

# リパーティション(シャッフル発生)
orders_repartitioned = orders_df.repartition(100, "order_date")

# コアレス(シャッフルなしでパーティション数を削減)
orders_coalesced = orders_df.coalesce(50)

# キャッシュ
from pyspark.storagelevel import StorageLevel

# メモリキャッシュ
orders_df.cache()
orders_df.count()  # キャッシュをトリガー

# メモリ + ディスクキャッシュ
users_df.persist(StorageLevel.MEMORY_AND_DISK)

# キャッシュ解放
orders_df.unpersist()

4.5 Sparkパフォーマンスチューニング

# ブロードキャストジョイン(小さいテーブル用)
from pyspark.sql.functions import broadcast

# 小さいテーブルをブロードキャスト
result = orders_df.join(
    broadcast(users_df),
    orders_df.user_id == users_df.id
)

# AQE(Adaptive Query Execution)有効化
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
Sparkパフォーマンス最適化チェックリスト:
  [ ] 適切なパーティション数を設定(コア数の23倍)
  [ ] データスキュー処理(salting、AQE  [ ] ブロードキャストジョイン活用
  [ ] 不必要なシャッフル最小化
  [ ] カラムプルーニング(必要なカラムのみ選択)
  [ ] Predicate Pushdown活用
  [ ] キャッシュ戦略策定
  [ ] シリアライゼーションフォーマット最適化(Parquet、ORC

5. Apache Airflow

5.1 Airflow概要(がいよう)

Apache Airflowは、ワークフローをプログラム的(てき)に作成(さくせい)・スケジューリング・モニタリングするためのプラットフォームです。DAG(有向(ゆうこう)非巡回(ひじゅんかい)グラフ)でタスク間(かん)の依存関係(いぞんかんけい)を定義(ていぎ)します。

Airflowアーキテクチャ:
┌──────────────────────────────────┐
Web Server       (UI / REST API)├──────────────────────────────────┤
Scheduler   (DAGパース、タスクスケジューリング)├──────────────────────────────────┤
Executor   (Local/Celery/Kubernetes)├──────────────────────────────────┤
Metadata Database       (PostgreSQL/MySQL)└──────────────────────────────────┘

5.2 DAGの作成(さくせい)

# dags/etl_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.sensors.filesystem import FileSensor

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data-team@company.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),
}

with DAG(
    dag_id='daily_etl_pipeline',
    default_args=default_args,
    description='日次ETLパイプライン',
    schedule_interval='0 6 * * *',  # 毎日午前6時
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['etl', 'daily'],
    max_active_runs=1,
) as dag:

    # センサー:ファイル到着待ち
    wait_for_data = FileSensor(
        task_id='wait_for_data',
        filepath='/data/raw/daily_export.csv',
        poke_interval=300,  # 5分ごとに確認
        timeout=3600,       # 最大1時間待機
        mode='poke',
    )

    # 抽出
    def extract_data(**context):
        import pandas as pd
        execution_date = context['ds']
        df = pd.read_csv(f'/data/raw/daily_export_{execution_date}.csv')
        df.to_parquet(f'/data/staging/extract_{execution_date}.parquet')
        return len(df)

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    # 変換
    def transform_data(**context):
        import pandas as pd
        execution_date = context['ds']
        df = pd.read_parquet(f'/data/staging/extract_{execution_date}.parquet')

        # データクレンジング
        df = df.dropna(subset=['customer_id', 'amount'])
        df = df[df['amount'] > 0]
        df['amount_with_tax'] = df['amount'] * 1.1

        df.to_parquet(f'/data/staging/transform_{execution_date}.parquet')
        return len(df)

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )

    # 格納
    load = PostgresOperator(
        task_id='load',
        postgres_conn_id='warehouse',
        sql='sql/load_daily_orders.sql',
    )

    # データ品質検証
    def validate_data(**context):
        execution_date = context['ds']
        row_count = context['ti'].xcom_pull(task_ids='transform')
        if row_count < 100:
            raise ValueError(f'レコード数不足: {row_count}')

    validate = PythonOperator(
        task_id='validate',
        python_callable=validate_data,
    )

    # 通知
    notify = BashOperator(
        task_id='notify',
        bash_command='echo "ETL完了: {{ ds }}" | mail -s "ETL Success" team@company.com',
    )

    # 依存関係定義
    wait_for_data >> extract >> transform >> load >> validate >> notify

5.3 TaskFlow API(Airflow 2.x)

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['etl'],
)
def modern_etl_pipeline():

    @task()
    def extract():
        """ソースからデータ抽出"""
        import pandas as pd
        df = pd.read_csv('/data/raw/orders.csv')
        return df.to_dict()

    @task()
    def transform(raw_data: dict):
        """データ変換と整形"""
        import pandas as pd
        df = pd.DataFrame(raw_data)
        df = df[df['amount'] > 0]
        df['processed_at'] = datetime.now().isoformat()
        return df.to_dict()

    @task()
    def load(transformed_data: dict):
        """ウェアハウスに格納"""
        import pandas as pd
        df = pd.DataFrame(transformed_data)
        print(f"Loaded {len(df)} records")

    # 自動依存関係設定
    raw = extract()
    transformed = transform(raw)
    load(transformed)

# DAGインスタンス化
modern_etl_pipeline()

5.4 Connections、Variables、XCom

# Connections:UIまたはCLIで設定
# airflow connections add 'warehouse' \
#   --conn-type 'postgres' \
#   --conn-host 'warehouse.example.com' \
#   --conn-port 5432 \
#   --conn-login 'etl_user' \
#   --conn-password 'secret'

# Variables
from airflow.models import Variable

env = Variable.get("environment", default_var="dev")
config = Variable.get("pipeline_config", deserialize_json=True)

# XCom:タスク間データ受け渡し
def producer_task(**context):
    context['ti'].xcom_push(key='row_count', value=1000)

def consumer_task(**context):
    row_count = context['ti'].xcom_pull(
        task_ids='producer',
        key='row_count'
    )
    print(f"前タスクの結果: {row_count}行")

5.5 動的(どうてき)DAG生成(せいせい)

# dags/dynamic_dag_factory.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# 設定ベースの動的DAG生成
configs = [
    {"name": "sales", "source": "mysql", "schedule": "@daily"},
    {"name": "users", "source": "postgres", "schedule": "@hourly"},
    {"name": "logs", "source": "s3", "schedule": "0 */6 * * *"},
]

def create_etl_dag(config):
    dag = DAG(
        dag_id=f"etl_{config['name']}",
        schedule_interval=config['schedule'],
        start_date=datetime(2025, 1, 1),
        catchup=False,
    )

    def process(**kwargs):
        print(f"Processing {config['name']} from {config['source']}")

    with dag:
        PythonOperator(
            task_id='process',
            python_callable=process,
        )

    return dag

for config in configs:
    globals()[f"etl_{config['name']}"] = create_etl_dag(config)

6. リアルタイムストリーミング

6.1 Apache Kafka

Kafkaは分散(ぶんさん)イベントストリーミングプラットフォームで、大規模(だいきぼ)リアルタイムデータパイプラインの基盤(きばん)です。

Kafkaアーキテクチャ:
Producer ──→ Broker(Topic/Partition) ──→ Consumer Group
                  ├── Partition 0: [msg1, msg2, msg3...]
                  ├── Partition 1: [msg4, msg5, msg6...]
                  └── Partition 2: [msg7, msg8, msg9...]
# Kafka Producer(Python)
from confluent_kafka import Producer
import json

config = {
    'bootstrap.servers': 'kafka-broker:9092',
    'acks': 'all',
    'retries': 3,
    'linger.ms': 10,
    'batch.size': 16384,
}

producer = Producer(config)

def delivery_callback(err, msg):
    if err:
        print(f'送信失敗: {err}')
    else:
        print(f'送信完了: {msg.topic()} [{msg.partition()}]')

# メッセージ送信
for i in range(100):
    event = {
        'user_id': f'user_{i}',
        'action': 'page_view',
        'timestamp': '2025-03-25T10:00:00Z',
        'page': '/products'
    }
    producer.produce(
        topic='user-events',
        key=str(event['user_id']),
        value=json.dumps(event),
        callback=delivery_callback
    )

producer.flush()
# Kafka Consumer(Python)
from confluent_kafka import Consumer
import json

config = {
    'bootstrap.servers': 'kafka-broker:9092',
    'group.id': 'analytics-consumer',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
}

consumer = Consumer(config)
consumer.subscribe(['user-events'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'Consumer error: {msg.error()}')
            continue

        event = json.loads(msg.value().decode('utf-8'))
        print(f"受信: {event['user_id']} - {event['action']}")

        # 手動コミット
        consumer.commit(asynchronous=False)
finally:
    consumer.close()

Flinkは状態(じょうたい)ベースのストリーム処理(しょり)エンジンで、正確(せいかく)に1回(exactly-once)のセマンティクスを提供(ていきょう)します。

# PyFlinkストリーム処理
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
env.enable_checkpointing(60000)  # 60秒ごとにチェックポイント

t_env = StreamTableEnvironment.create(env)

# Kafkaソーステーブル定義
t_env.execute_sql("""
    CREATE TABLE user_events (
        user_id STRING,
        action STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'flink-processor',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    )
""")

# ウィンドウ集約
t_env.execute_sql("""
    CREATE TABLE page_view_stats (
        window_start TIMESTAMP(3),
        window_end TIMESTAMP(3),
        page STRING,
        view_count BIGINT,
        unique_users BIGINT
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://analytics-db:5432/stats',
        'table-name' = 'page_view_stats',
        'driver' = 'org.postgresql.Driver'
    )
""")

t_env.execute_sql("""
    INSERT INTO page_view_stats
    SELECT
        window_start,
        window_end,
        action AS page,
        COUNT(*) AS view_count,
        COUNT(DISTINCT user_id) AS unique_users
    FROM TABLE(
        TUMBLE(TABLE user_events, DESCRIPTOR(event_time), INTERVAL '5' MINUTE)
    )
    GROUP BY window_start, window_end, action
""")

6.3 Exactly-Onceセマンティクス

配信保証レベル:
  At-most-once  :メッセージ喪失の可能性あり、重複なし
  At-least-once :メッセージ喪失なし、重複の可能性あり
  Exactly-once  :メッセージ喪失なし、重複なし(最も困難)

Kafka + Flink Exactly-Once:
  1. Kafkaトランザクショナルプロデューサー
  2. Flinkチェックポイント(Chandy-Lamport)
  3. Two-Phase Commit Protocol
  4. Kafkaコンシューマーオフセットをチェックポイントと連動

7. dbt(data build tool)

7.1 dbt概要(がいよう)

dbtはELTのT(Transform)を担当(たんとう)するツールです。SQLでデータ変換(へんかん)ロジックを作成(さくせい)し、ソフトウェアエンジニアリングのベストプラクティス(バージョン管理(かんり)、テスト、ドキュメンテーション)をデータ変換(へんかん)に適用(てきよう)します。

dbtプロジェクト構造:
my_dbt_project/
├── dbt_project.yml
├── profiles.yml
├── models/
│   ├── staging/
│   │   ├── stg_orders.sql
│   │   ├── stg_customers.sql
│   │   └── _staging_sources.yml
│   ├── intermediate/
│   │   └── int_order_items_grouped.sql
│   └── marts/
│       ├── dim_customers.sql
│       ├── fact_orders.sql
│       └── _marts_schema.yml
├── tests/
│   └── assert_positive_revenue.sql
├── macros/
│   └── generate_schema_name.sql
└── seeds/
    └── country_codes.csv

7.2 モデルの作成(さくせい)

-- models/staging/stg_orders.sql
WITH source AS (
    SELECT * FROM {{ source('raw', 'orders') }}
),
renamed AS (
    SELECT
        id AS order_id,
        user_id AS customer_id,
        amount AS order_amount,
        status AS order_status,
        created_at AS ordered_at
    FROM source
    WHERE status != 'cancelled'
)
SELECT * FROM renamed
-- models/marts/fact_orders.sql
{{ config(materialized='incremental', unique_key='order_id') }}

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),
customers AS (
    SELECT * FROM {{ ref('dim_customers') }}
)
SELECT
    o.order_id,
    o.customer_id,
    c.customer_name,
    c.customer_segment,
    o.order_amount,
    o.order_amount * 1.1 AS amount_with_tax,
    o.order_status,
    o.ordered_at
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id

{% if is_incremental() %}
WHERE o.ordered_at > (SELECT MAX(ordered_at) FROM {{ this }})
{% endif %}

7.3 ソースとテスト

# models/staging/_staging_sources.yml
version: 2

sources:
  - name: raw
    database: raw_db
    schema: public
    tables:
      - name: orders
        loaded_at_field: _loaded_at
        freshness:
          warn_after:
            count: 12
            period: hour
          error_after:
            count: 24
            period: hour
        columns:
          - name: id
            tests:
              - unique
              - not_null
          - name: amount
            tests:
              - not_null

  - name: raw
    tables:
      - name: customers
        columns:
          - name: id
            tests:
              - unique
              - not_null
# models/marts/_marts_schema.yml
version: 2

models:
  - name: fact_orders
    description: "注文ファクトテーブル"
    columns:
      - name: order_id
        description: "注文一意ID"
        tests:
          - unique
          - not_null
      - name: order_amount
        tests:
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
-- tests/assert_positive_revenue.sql
-- カスタムテスト:売上が正の値であることを検証
SELECT order_id, order_amount
FROM {{ ref('fact_orders') }}
WHERE order_amount < 0

7.4 dbtコマンド

# 全モデルビルド
dbt run

# 特定モデルのみビルド
dbt run --select fact_orders

# モデル + ダウンストリームをビルド
dbt run --select stg_orders+

# テスト実行
dbt test

# ドキュメント生成
dbt docs generate
dbt docs serve

# ソース鮮度確認
dbt source freshness

# シードデータロード
dbt seed

# フルパイプライン(ビルド + テスト)
dbt build

8. データウェアハウス

比較表(ひかくひょう)

項目BigQuerySnowflakeRedshift
ベンダーGoogle CloudSnowflakeAWS
アーキテクチャサーバーレスコンピュート/ストレージ分離MPPクラスタ
課金クエリ当たり(オンデマンド)クレジットベースノード時間当たり
スケーラビリティ自動ウェアハウスリサイジングノード追加
同時実行2000+スロットマルチクラスタWLM設定
半構造化データSTRUCT, ARRAYVARIANTSUPER
ML統合BigQuery MLSnowparkRedshift ML
コスト効率小規模に有利中規模に有利大規模常時運用に有利

BigQuery例(れい)

-- BigQuery:パーティション + クラスタリング
CREATE TABLE analytics.fact_orders
PARTITION BY DATE(ordered_at)
CLUSTER BY customer_segment, order_status
AS
SELECT
    order_id,
    customer_id,
    customer_segment,
    order_amount,
    order_status,
    ordered_at
FROM staging.orders;

-- コスト見積もり(dry run)
-- 1TBスキャン = 約$5(オンデマンド)

Snowflake例(れい)

-- Snowflake:ウェアハウス作成と管理
CREATE WAREHOUSE etl_wh
    WITH WAREHOUSE_SIZE = 'MEDIUM'
    AUTO_SUSPEND = 300
    AUTO_RESUME = TRUE
    MIN_CLUSTER_COUNT = 1
    MAX_CLUSTER_COUNT = 3;

-- データロード
COPY INTO raw.orders
FROM @my_s3_stage/orders/
FILE_FORMAT = (TYPE = 'PARQUET')
PATTERN = '.*[.]parquet';

9. データレイク / レイクハウス

テーブルフォーマット比較(ひかく)

伝統的なデータレイクの問題点:
  - ACIDトランザクションなし
  - スキーマ強制なし
  - タイムトラベル不可
  - 小ファイル問題

レイクハウステーブルフォーマットが解決:
  Delta Lake     :Databricks主導、Spark統合最強
  Apache Iceberg :Netflix開発、ベンダー中立
  Apache Hudi    :Uber開発、増分処理に特化
機能Delta LakeApache IcebergApache Hudi
ACIDトランザクションOOO
スキーマ進化OOO
タイムトラベルOOO
パーティション進化制限ありO(隠れたパーティショニング)制限あり
エンジン互換性Spark中心Spark, Flink, TrinoSpark, Flink
主要プラットフォームDatabricks複数ベンダー採用AWS中心
# Delta Lake例(PySpark)
from delta.tables import DeltaTable

# Deltaテーブル作成
orders_df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("order_date") \
    .save("s3://data-lake/delta/orders")

# UPSERT(Merge)
delta_table = DeltaTable.forPath(spark, "s3://data-lake/delta/orders")

delta_table.alias("target").merge(
    new_orders_df.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

# タイムトラベル
old_data = spark.read \
    .format("delta") \
    .option("versionAsOf", 5) \
    .load("s3://data-lake/delta/orders")

10. データ品質(ひんしつ)

10.1 Great Expectations

import great_expectations as gx

context = gx.get_context()

# データソース接続
datasource = context.sources.add_pandas("my_datasource")
data_asset = datasource.add_csv_asset("orders", filepath_or_buffer="orders.csv")

# Expectation Suite定義
suite = context.add_expectation_suite("orders_validation")

# 期待値定義
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount", min_value=0, max_value=100000
    )
)

# 検証実行
results = context.run_checkpoint(
    checkpoint_name="orders_checkpoint"
)
print(f"成功: {results.success}")

10.2 データ契約(けいやく)(Data Contracts)

# data-contracts/orders-contract.yaml
dataContractSpecification: 0.9.3
id: orders-contract
info:
  title: Orders Data Contract
  version: 1.0.0
  owner: data-team
  contact:
    email: data-team@company.com

schema:
  type: object
  properties:
    order_id:
      type: string
      description: "一意な注文ID"
      required: true
      unique: true
    customer_id:
      type: string
      required: true
    amount:
      type: number
      minimum: 0
      maximum: 100000
    status:
      type: string
      enum: ["pending", "completed", "cancelled"]
    created_at:
      type: timestamp
      required: true

quality:
  completeness:
    - field: order_id
      threshold: 100
    - field: customer_id
      threshold: 99.9
  freshness:
    maxDelay: "PT1H"  # 1時間以内

11. オーケストレーション比較(ひかく)

Airflow vs Dagster vs Prefect vs Mage

項目AirflowDagsterPrefectMage
アプローチDAG中心アセット中心フロー中心ブロック中心
学習曲線高い中程度低い低い
ローカル開発複雑優秀優秀優秀
テスト困難内蔵サポート良好良好
UI機能的モダンモダンモダン
コミュニティ非常に大きい成長中成長中小規模
プロダクション実績非常に多い増加中増加中初期段階
クラウドMWAA, ComposerDagster CloudPrefect CloudMage Pro
選択ガイド:
├── 大企業、複雑なワークフロー → Airflow
├── データアセット中心の考え方 → Dagster
├── クイックスタート、Pythonネイティブ → Prefect
└── ノーコード/ローコード志向 → Mage

12. モダンデータスタックダイアグラム

モダンデータスタック(2025):

データソース          収集/統合           格納             変換            分析/BI
──────────         ──────────       ──────────      ──────────     ──────────
 SaaS APIs  ──┐
 Databases  ──┼──→ Fivetran/Airbyte ──→ Snowflake  ──→   dbt     ──→  Looker
 Event Logs ──┤                         BigQuery       Dataform      Metabase
 Files      ──┘                         Redshift                    Tableau

 Kafka/     ──────→ Flink/Spark   ──→ Delta Lake  ──→ Spark SQL ──→ リアルタイム
 Kinesis             Streaming        Iceberg                      ダッシュボード

                               オーケストレーション: Airflow / Dagster
                               品質: Great Expectations / dbt tests
                               カタログ: DataHub / Atlan / OpenMetadata
                               モニタリング: Monte Carlo / Datadog

13. クイズ

Q1: ETL vs ELT

ETLとELTの核心的な違いは何ですか?また、いつELTを選択すべきですか?

回答(かいとう):

核心的(かくしんてき)な違(ちが)いは、変換(Transform)が発生(はっせい)する場所(ばしょ)です。ETLは別(べつ)の変換サーバーで変換後(へんかんご)に格納(かくのう)し、ELTはデータをまずウェアハウスに格納(かくのう)してからウェアハウス内(ない)で変換(へんかん)します。

ELTを選択(せんたく)すべき場合(ばあい):

  • クラウドウェアハウス(BigQuery、Snowflake)を使用している場合
  • 元データの保存が重要な場合
  • 変換ロジックが頻繁に変わり、柔軟性が必要な場合
  • dbtのようなツールでSQLベースの変換を行いたい場合

Q2: Sparkパーティショニング

Sparkでrepartition()とcoalesce()の違いは何ですか?

回答(かいとう):

repartition()はフルシャッフルを実行(じっこう)し、指定(してい)された数(かず)のパーティションにデータを均等(きんとう)に再分配(さいぶんぱい)します。パーティション数(すう)を増(ふ)やす場合(ばあい)や特定(とくてい)カラムでパーティショニングする場合(ばあい)に使用(しよう)します。

coalesce()はシャッフルなしでパーティション数(すう)だけを減(へ)らします。既存(きそん)のパーティションを統合(とうごう)するため、パーティション数(すう)を減(へ)らす場合(ばあい)にのみ使用(しよう)でき、ネットワークコストが低(ひく)いです。

パーティション数を減らす場合はcoalesce()、増やす場合や均等分配が必要な場合はrepartition()を使用します。

Q3: Airflow XCom

AirflowにおけるXComの役割と制限事項は何ですか?

回答(かいとう):

XCom(Cross-Communication)はAirflowのタスク間(かん)で少量(しょうりょう)のデータを受(う)け渡(わた)すメカニズムです。メタデータDBに保存(ほぞん)されます。

制限事項(せいげんじこう):

  • 少量データのみ転送可能(デフォルト48KB、最大数MB)
  • 大容量データはS3/GCS等の外部ストレージのパスのみ転送
  • シリアライズ可能なデータのみ転送可能(JSON serializable)
  • メタデータDBに負荷がかかる可能性あり

代替策:大容量データは一時ファイルやクラウドストレージを使用し、XComではファイルパスのみ転送します。

Q4: Exactly-Onceセマンティクス

KafkaでExactly-Onceセマンティクスを実現する方法を説明してください。

回答(かいとう):

KafkaのExactly-Onceは3つの要素(ようそ)で実現(じつげん)されます。

  1. Idempotent Producer:enable.idempotence=trueを設定すると、ブローカーが重複メッセージを自動的に除去します。

  2. Transactional Producer:複数パーティション/トピックにまたがる原子的書き込みを保証します。initTransactions()、beginTransaction()、commitTransaction() APIを使用します。

  3. Consumer read_committed:isolation.level=read_committedを設定すると、コミット済みトランザクションのメッセージのみ読み取ります。

FlinkとKafkaを連携させる場合、FlinkのチェックポイントメカニズムとkafkaのトランザクショナルAPIを組み合わせてEnd-to-End Exactly-Onceを達成します。

Q5: dbt増分(ぞうぶん)モデル

dbtのincrementalモデルはどのように動作し、いつ使用しますか?

回答(かいとう):

dbt incrementalモデルは、最後(さいご)の実行(じっこう)以降(いこう)に新(あら)たに追加(ついか)または変更(へんこう)されたデータのみを処理(しょり)します。

動作(どうさ)の仕組(しく)み:

  1. 初回実行時に全データを処理(CREATE TABLE AS)
  2. 以降の実行時はis_incremental()条件で新データのみフィルタリング
  3. 新データを既存テーブルにMERGEまたはINSERT

使用(しよう)するタイミング:

  • 大容量ファクトテーブル(毎回の全再構築がコスト高の場合)
  • イベント/ログデータ(時系列append)
  • 段階的に増加するデータ

キーポイントはunique_keyの設定と適切な増分条件(WHERE句)の指定です。


14. 参考資料(さんこうしりょう)