- Authors

- Name
- Youngju Kim
- @fjvbn20031
目次
- データエンジニアリングとは
- データアーキテクチャ
- ETL vs ELT
- バッチ処理
- ストリーム処理
- ワークフローオーケストレーション
- データウェアハウス
- データ品質
- データガバナンス
- 実践パイプライン例
1. データエンジニアリングとは
データエンジニアリングとは、生データを収集・変換・保存し、分析可能な状態にする一連のプロセスである。データサイエンティストがモデルを構築し、アナリストがインサイトを導出するためには、まずデータがクリーンでアクセス可能な形に整備されている必要がある。この準備作業を担当するのがデータエンジニアである。
役割の比較
| 役割 | 主な業務 | コアスキル |
|---|---|---|
| データエンジニア | パイプライン構築、インフラ管理、データ変換 | Python, SQL, Spark, Kafka, Airflow |
| データサイエンティスト | モデリング、予測、実験設計 | Python, R, TensorFlow, 統計学 |
| データアナリスト | レポーティング、ダッシュボード、ビジネスインサイト | SQL, Tableau, Excel, BIツール |
データエンジニアは信頼できるデータインフラを構築する人である。いくら優れたモデルでも、データパイプラインが不安定であれば意味がない。
データエンジニアのコアコンピタンス
- SQL習熟度: 複雑なJOIN、ウィンドウ関数、CTEなどを自在に使いこなす
- プログラミング: Pythonが事実上の標準。ScalaやJavaもSpark生態系で使用
- 分散システムの理解: パーティショニング、レプリケーション、CAP定理等の基本概念
- クラウドサービス: AWS、GCP、Azureのデータ関連サービスの活用能力
- データモデリング: 正規化、非正規化、ディメンショナルモデリングの理解
2. データアーキテクチャ
現代のデータアーキテクチャは大きく4つのパターンに分類される。
2.1 データレイク (Data Lake)
データレイクは、構造化・半構造化・非構造化データを生の形で保存する大規模ストレージである。スキーマは読み取り時に適用する(Schema-on-Read)。
生データストア (Data Lake)
├── 構造化データ (CSV, Parquet, ORC)
├── 半構造化データ (JSON, XML, Avro)
└── 非構造化データ (画像, ログ, テキスト)
利点: 安価なストレージ、柔軟なスキーマ、あらゆる形式のデータを格納可能 欠点: 管理が難しく、適切に管理しないと「データスワンプ(Data Swamp)」になりうる
代表技術: AWS S3, Azure Data Lake Storage, Google Cloud Storage
2.2 データウェアハウス (Data Warehouse)
データウェアハウスは、分析に最適化された構造化データストアである。スキーマは書き込み時に適用する(Schema-on-Write)。
分析用ストア (Data Warehouse)
├── ファクトテーブル (売上, 注文, クリック)
├── ディメンションテーブル (ユーザー, 商品, 日付)
└── 集約テーブル (日別/月別サマリー)
利点: 高速なクエリ性能、一貫したスキーマ、ACIDトランザクション対応 欠点: 非構造化データの取り扱いに限界、スキーマ変更が困難
代表技術: Snowflake, BigQuery, Amazon Redshift
2.3 レイクハウス (Lakehouse)
レイクハウスは、データレイクの柔軟性とデータウェアハウスの管理機能を融合したアーキテクチャである。
主な機能:
- ACIDトランザクション対応
- スキーマの強制と進化(Schema Evolution)
- データレイク上でのSQL分析
- ストリーミングとバッチ処理の統合
代表技術: Delta Lake, Apache Iceberg, Apache Hudi
2.4 メダリオンアーキテクチャ (Medallion Architecture)
メダリオンアーキテクチャは、データを3段階のレイヤーで管理する手法である。
Bronze(生データ)
→ ソースからそのまま取り込んだデータ
→ 最小限の変換のみ適用
Silver(精製データ)
→ 重複排除、型変換、バリデーション完了
→ ビジネスロジック適用前の状態
Gold(ビジネスデータ)
→ 集約、結合、ビジネスルール適用済み
→ ダッシュボードやMLモデルから直接利用
このパターンはDatabricksによって普及し、各段階でデータ品質を保証できるという利点がある。
3. ETL vs ELT
3.1 ETL (Extract, Transform, Load)
ETLは伝統的なデータ統合手法である。
ソース → [抽出] → [変換] → [ロード] → ウェアハウス
- Extract: ソースシステムからデータを抽出
- Transform: ステージングエリアでデータのクレンジング、変換、集約
- Load: 変換済みデータをターゲットシステムにロード
3.2 ELT (Extract, Load, Transform)
ELTはクラウドウェアハウスの強力なコンピューティングパワーを活用する現代的な手法である。
ソース → [抽出] → [ロード] → ウェアハウス → [変換]
- Extract: ソースからデータを抽出
- Load: 生データをそのままウェアハウスにロード
- Transform: ウェアハウス内部でSQLを使って変換
3.3 使い分けの基準
| 基準 | ETL | ELT |
|---|---|---|
| データ量 | 小〜中規模 | 大規模 |
| 変換の複雑さ | 複雑なビジネスロジック | SQLで表現可能な変換 |
| インフラ | オンプレミス | クラウド |
| データセキュリティ | 機密データの事前マスキング必要 | ウェアハウス内のアクセス制御で十分 |
| 代表ツール | Informatica, Talend | dbt, Fivetran, Airbyte |
3.4 主要ツール
dbt (data build tool): SQLベースの変換ツール。ELTのTの部分を担当する。
-- dbtモデル例: 日別売上集計
-- models/marts/daily_revenue.sql
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}
),
payments AS (
SELECT * FROM {{ ref('stg_payments') }}
)
SELECT
o.order_date,
COUNT(DISTINCT o.order_id) AS total_orders,
SUM(p.amount) AS total_revenue,
AVG(p.amount) AS avg_order_value
FROM orders o
JOIN payments p ON o.order_id = p.order_id
WHERE p.status = 'completed'
GROUP BY o.order_date
Airbyte: オープンソースのデータ統合プラットフォーム。300以上のコネクタを提供。
Fivetran: マネージドデータ統合サービス。設定が簡単で安定している。
4. バッチ処理
バッチ処理は、蓄積された大量のデータを一括で処理する方式である。リアルタイム性が不要な大半の分析ワークロードに適している。
4.1 Apache Spark
Apache Sparkは大規模データ処理のための統合分析エンジンである。
主な特徴:
- インメモリ処理: MapReduceと比較して最大100倍高速
- 統合API: バッチ、ストリーミング、ML、グラフ処理を1つのフレームワークで
- 多言語サポート: Python(PySpark), Scala, Java, R, SQL
# PySpark例: 日別売上集計
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, count
spark = SparkSession.builder \
.appName("DailyRevenue") \
.getOrCreate()
# データ読み込み
orders = spark.read.parquet("s3://data-lake/orders/")
payments = spark.read.parquet("s3://data-lake/payments/")
# 変換
daily_revenue = (
orders
.join(payments, "order_id")
.filter(col("status") == "completed")
.groupBy("order_date")
.agg(
count("order_id").alias("total_orders"),
spark_sum("amount").alias("total_revenue")
)
.orderBy("order_date")
)
# 結果の保存
daily_revenue.write \
.mode("overwrite") \
.parquet("s3://data-warehouse/daily_revenue/")
4.2 Spark SQLとDataFrame
Spark SQLを使えば、SQLに慣れたアナリストも大規模データを処理できる。
# DataFrameを登録してSQL使用
orders.createOrReplaceTempView("orders")
payments.createOrReplaceTempView("payments")
result = spark.sql("""
SELECT
o.order_date,
COUNT(DISTINCT o.order_id) AS total_orders,
SUM(p.amount) AS total_revenue
FROM orders o
JOIN payments p ON o.order_id = p.order_id
WHERE p.status = 'completed'
GROUP BY o.order_date
ORDER BY o.order_date
""")
4.3 MapReduceとの比較
| 基準 | MapReduce | Spark |
|---|---|---|
| 処理速度 | ディスクベース、低速 | インメモリベース、高速 |
| プログラミングモデル | MapとReduceの2段階のみ | 豊富な変換操作 |
| リアルタイム処理 | 不可 | Structured Streaming対応 |
| 学習コスト | 高い | 比較的低い |
| エコシステム | Hadoopエコシステム | 独立 + Hadoop互換 |
5. ストリーム処理
ストリーム処理は、データが生成された瞬間にリアルタイムで処理する方式である。
5.1 Apache Kafka
Kafkaは分散イベントストリーミングプラットフォームである。リアルタイムデータパイプラインとストリーミングアプリケーションの中核インフラとして使用される。
主要概念:
- Topic: メッセージが発行されるカテゴリ
- Producer: トピックにメッセージを発行する主体
- Consumer: トピックからメッセージを購読する主体
- Broker: メッセージを保存・配信するサーバー
- Partition: トピックを分割して並列処理をサポート
- Consumer Group: 複数のConsumerがトピックを分担して処理
# Kafka Producer例 (Python)
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# 注文イベントの発行
event = {
"order_id": "ORD-12345",
"user_id": "USR-678",
"amount": 45000,
"timestamp": "2026-04-13T10:30:00Z"
}
producer.send('order-events', value=event)
producer.flush()
# Kafka Consumer例 (Python)
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'order-events',
bootstrap_servers=['localhost:9092'],
group_id='order-processing-group',
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest'
)
for message in consumer:
order = message.value
print(f"注文処理: {order['order_id']}, 金額: {order['amount']}")
5.2 CDC (Change Data Capture)
CDCは、データベースの変更をリアルタイムでキャプチャし、他のシステムに伝播する技術である。
運用DB → [CDC] → Kafka → [Stream Processing] → ウェアハウス
→ 検索エンジン
→ キャッシュ
代表ツール: Debezium。MySQL、PostgreSQL、MongoDB等の様々なDBから変更イベントをキャプチャし、Kafkaに送信する。
{
"before": null,
"after": {
"id": 1001,
"name": "田中太郎",
"email": "tanaka@example.com"
},
"source": {
"connector": "postgresql",
"db": "users_db",
"table": "users"
},
"op": "c",
"ts_ms": 1681364400000
}
上記のJSONはDebeziumがキャプチャしたCDCイベントの例である。"op": "c"はINSERT操作を意味する。
5.3 Apache Flink
Flinkはステートフルなストリーム処理エンジンである。Exactly-once処理を保証し、イベント時間ベースのウィンドウ処理に強みがある。
// Flinkストリーム処理例 (Java)
DataStream<OrderEvent> orders = env
.addSource(new FlinkKafkaConsumer<>(
"order-events",
new OrderEventSchema(),
kafkaProps
));
// 5分間のタンブリングウィンドウ集計
DataStream<WindowedRevenue> revenue = orders
.keyBy(OrderEvent::getCategory)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new RevenueAggregator());
revenue.addSink(new JdbcSink<>(...));
5.4 バッチ vs ストリーミング比較
| 基準 | バッチ処理 | ストリーム処理 |
|---|---|---|
| レイテンシ | 分〜時間 | ミリ秒〜秒 |
| データの完全性 | 完全なデータセット | 段階的に到着 |
| 複雑さ | 比較的シンプル | 状態管理、順序保証等が複雑 |
| コスト | 比較的安価 | 常時稼働のため高コスト |
| 適するユースケース | 日次レポート、ML学習 | リアルタイムダッシュボード、異常検知 |
6. ワークフローオーケストレーション
6.1 Apache Airflow
Airflowは、ワークフローをプログラマティックに作成・スケジューリング・監視するプラットフォームである。複雑なデータパイプラインをDAG(有向非巡回グラフ)として定義する。
主要概念:
- DAG: タスクの実行順序と依存関係を定義する有向非巡回グラフ
- Operator: 実際の作業を実行するユニット(BashOperator, PythonOperator等)
- Task: DAG内の個別作業インスタンス
- Sensor: 特定の条件が満たされるまで待機する特殊なOperator
- XCom: Task間のデータ受け渡しメカニズム
# Airflow DAG例
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.sensors.s3_key_sensor import S3KeySensor
from datetime import datetime, timedelta
default_args = {
'owner': 'data-engineering',
'depends_on_past': False,
'email_on_failure': True,
'email': ['team@example.com'],
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='daily_revenue_pipeline',
default_args=default_args,
description='日別売上データ処理パイプライン',
schedule_interval='0 2 * * *', # 毎日午前2時
start_date=datetime(2026, 1, 1),
catchup=False,
tags=['revenue', 'daily'],
) as dag:
# 1. ソースデータの存在確認
check_source = S3KeySensor(
task_id='check_source_data',
bucket_name='raw-data-bucket',
bucket_key='orders/{{ ds }}/*.parquet',
timeout=3600,
poke_interval=300,
)
# 2. データ抽出
extract = PythonOperator(
task_id='extract_orders',
python_callable=extract_orders_from_source,
)
# 3. データ変換
transform = PythonOperator(
task_id='transform_orders',
python_callable=transform_and_aggregate,
)
# 4. ウェアハウスへのロード
load = PythonOperator(
task_id='load_to_warehouse',
python_callable=load_to_snowflake,
)
# 5. データ品質検証
validate = PythonOperator(
task_id='validate_data_quality',
python_callable=run_quality_checks,
)
# 依存関係の定義
check_source >> extract >> transform >> load >> validate
6.2 DAG設計のベストプラクティス
- 冪等性(Idempotency): 同じタスクを複数回実行しても結果が同じであること
- 原子性(Atomicity): 各Taskは1つの論理的な作業単位のみ実行
- リトライ戦略: 一時的なエラーに備えた適切なリトライ設定
- モニタリング: SLA設定、障害アラート、実行時間の追跡
- テスト: DAG構造テスト、Task単位のユニットテストが必須
6.3 他のオーケストレーションツール
| ツール | 特徴 | 適するケース |
|---|---|---|
| Apache Airflow | Pythonベース、豊富なエコシステム | 複雑なバッチパイプライン |
| Prefect | モダンAPI、動的ワークフロー | 柔軟なワークフローが必要 |
| Dagster | データアセット中心、強力な型システム | データ品質重視 |
| Mage | ノートブックスタイルのUI | 高速プロトタイピング |
| AWS Step Functions | サーバーレス、AWSネイティブ | AWS中心のアーキテクチャ |
7. データウェアハウス
7.1 主要クラウドウェアハウス
Snowflake
- コンピュートとストレージの分離アーキテクチャ
- マルチクラウド対応(AWS, Azure, GCP)
- Time Travel、Zero-Copy Cloneなどの独自機能
- 同時実行(Concurrency)処理に強み
Google BigQuery
- サーバーレスアーキテクチャ(インフラ管理不要)
- クエリ単位の課金モデル
- SQLでMLモデルを作成可能(BigQuery ML)
- 大規模データ分析に最適化
Amazon Redshift
- AWSエコシステムとの緊密な統合
- Redshift Serverlessでサーバーレスオプション提供
- Redshift SpectrumでS3データを直接クエリ可能
- 既存のPostgreSQLツールと互換
7.2 ディメンショナルモデリング
データウェアハウスで最も広く使われるモデリング手法は、スタースキーマとスノーフレークスキーマである。
スタースキーマ
中央にファクトテーブルがあり、周囲にディメンションテーブルが直接接続する構造である。
-- ファクトテーブル
CREATE TABLE fact_sales (
sale_id BIGINT PRIMARY KEY,
date_key INT REFERENCES dim_date(date_key),
product_key INT REFERENCES dim_product(product_key),
customer_key INT REFERENCES dim_customer(customer_key),
store_key INT REFERENCES dim_store(store_key),
quantity INT,
unit_price DECIMAL(10,2),
total_amount DECIMAL(12,2),
discount_amount DECIMAL(10,2)
);
-- ディメンションテーブル
CREATE TABLE dim_product (
product_key INT PRIMARY KEY,
product_id VARCHAR(50),
product_name VARCHAR(200),
category VARCHAR(100),
subcategory VARCHAR(100),
brand VARCHAR(100),
unit_cost DECIMAL(10,2)
);
CREATE TABLE dim_date (
date_key INT PRIMARY KEY,
full_date DATE,
year INT,
quarter INT,
month INT,
week INT,
day_of_week VARCHAR(20),
is_holiday BOOLEAN
);
スノーフレークスキーマ
ディメンションテーブルがさらに細分化されたサブテーブルに正規化された構造である。スタースキーマよりストレージを節約できるが、JOINが増えてクエリが複雑になる。
7.3 SCD (Slowly Changing Dimension)
ディメンションデータの変更履歴を管理する方法である。
| タイプ | 説明 | 例 |
|---|---|---|
| SCD Type 1 | 既存値を上書き | 電話番号変更時に旧番号を削除 |
| SCD Type 2 | 新しい行を追加して履歴を保持 | 住所変更時に有効期間付きで新行を追加 |
| SCD Type 3 | 現在値と旧値を別カラムで管理 | current_address, previous_address |
8. データ品質
データ品質は、パイプラインの信頼性を決定する重要な要素である。「ゴミを入れればゴミが出る(Garbage In, Garbage Out)」というのは、データ分野で古くから言われてきた格言である。
8.1 データ品質の6つの次元
- 正確性(Accuracy): データが実際の値を正しく反映しているか
- 完全性(Completeness): 欠損データがないか
- 一貫性(Consistency): 複数システム間でデータに矛盾がないか
- 適時性(Timeliness): データが必要な時点で利用可能か
- 一意性(Uniqueness): 重複データがないか
- 妥当性(Validity): データが定義されたルールと形式に従っているか
8.2 Great Expectations
Great Expectationsはデータの検証、ドキュメント化、プロファイリングのためのオープンソースライブラリである。
import great_expectations as gx
# データコンテキストの作成
context = gx.get_context()
# データソース接続
datasource = context.sources.add_pandas("my_datasource")
data_asset = datasource.add_dataframe_asset(name="orders")
# 期待値(Expectation)の定義
batch = data_asset.build_batch_request(dataframe=df)
validator = context.get_validator(batch_request=batch)
# データ品質ルールの定義
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_unique("order_id")
validator.expect_column_values_to_be_between(
"amount", min_value=0, max_value=1000000
)
validator.expect_column_values_to_be_in_set(
"status", ["pending", "completed", "cancelled", "refunded"]
)
validator.expect_column_values_to_match_regex(
"email", r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
)
# 検証の実行
results = validator.validate()
print(f"成功: {results.success}")
8.3 データオブザーバビリティ (Data Observability)
データオブザーバビリティとは、データシステムの状態を継続的に監視し、問題を早期に検出するプラクティスである。
主要指標:
- Freshness: データがどれだけ最新か
- Volume: 予想に対するデータ量の変化
- Schema: スキーマ変更の検知
- Distribution: データ分布の異常検出
- Lineage: データの出所と流れの追跡
代表ツール: Monte Carlo, Atlan, Soda, Elementary
9. データガバナンス
9.1 メタデータ管理
メタデータは「データに関するデータ」であり、大きく3種類に分類される。
- 技術メタデータ: スキーマ、データ型、パーティション情報、保存場所
- ビジネスメタデータ: データ定義、オーナー、SLA、ビジネスルール
- 運用メタデータ: 処理時間、行数、エラーログ、アクセス履歴
9.2 データカタログ
データカタログは、組織内の全データ資産を検索・探索・理解できるようにするツールである。
主な機能:
- メタデータの自動収集とインデックス作成
- データリネージ(Lineage)の可視化
- データディクショナリの管理
- タグと分類体系
- コラボレーション機能(コメント、評価、Wiki)
代表ツール: Apache Atlas, DataHub, Atlan, Alation
9.3 アクセス制御
データアクセス制御の核心原則:
- 最小権限の原則: 業務に必要な最小限の権限のみ付与
- ロールベースアクセス制御(RBAC): ロール単位で権限を管理
-- Snowflake RBAC例
CREATE ROLE data_analyst;
CREATE ROLE data_engineer;
CREATE ROLE data_admin;
-- アナリストロール: 読み取り専用
GRANT USAGE ON DATABASE analytics_db TO ROLE data_analyst;
GRANT SELECT ON ALL TABLES IN SCHEMA analytics_db.gold TO ROLE data_analyst;
-- エンジニアロール: 読み書き可能
GRANT ALL PRIVILEGES ON DATABASE analytics_db TO ROLE data_engineer;
GRANT ALL PRIVILEGES ON ALL SCHEMAS IN DATABASE analytics_db TO ROLE data_engineer;
-- ユーザーにロールを割り当て
GRANT ROLE data_analyst TO USER analyst_kim;
GRANT ROLE data_engineer TO USER engineer_park;
- 行レベルセキュリティ(RLS): ユーザーごとにアクセス可能な行を制限
- 列レベルセキュリティ(CLS): 機密カラムへのアクセス制限(マスキング含む)
- 監査ログ(Audit Log): 全データアクセス履歴の記録
10. 実践パイプライン例
10.1 全体アーキテクチャ
実環境でのエンドツーエンドデータパイプライン構成例である。
[ソースシステム]
├── 運用DB (PostgreSQL) ─── Debezium CDC ──┐
├── Webイベント (Clickstream) ─── SDK ───────┤
├── 外部API ─── Airbyte ────────────────────┤
└── ファイル (CSV/Excel) ─── Airflow ────────┤
│
[メッセージブローカー] │
└── Apache Kafka <────────────────────────────┘
├── リアルタイム経路: Flink → リアルタイムダッシュボード
└── バッチ経路: Spark → S3 (Data Lake)
│
[変換レイヤー] │
└── dbt (Silver/Gold) <───────────┘
│
[ウェアハウス]
└── Snowflake
├── Goldレイヤー → Looker/Tableau ダッシュボード
└── ML Feature Store → モデル学習
10.2 Airflowによるパイプライン全体のオーケストレーション
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=10),
'execution_timeout': timedelta(hours=2),
}
with DAG(
dag_id='e2e_data_pipeline',
default_args=default_args,
schedule_interval='0 3 * * *',
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
# ステージ1: Sparkで生データ処理
spark_process = SparkSubmitOperator(
task_id='spark_raw_processing',
application='s3://scripts/process_raw_data.py',
conn_id='spark_default',
conf={
'spark.executor.memory': '4g',
'spark.executor.cores': '2',
},
)
# ステージ2: dbtでデータ変換
dbt_transform = DbtCloudRunJobOperator(
task_id='dbt_transform',
job_id=12345,
check_interval=30,
timeout=3600,
)
# ステージ3: データ品質検証
quality_check = PythonOperator(
task_id='data_quality_check',
python_callable=run_great_expectations_suite,
)
# ステージ4: 完了通知
notify = SlackWebhookOperator(
task_id='slack_notification',
slack_webhook_conn_id='slack_webhook',
message='Daily pipeline completed successfully.',
)
spark_process >> dbt_transform >> quality_check >> notify
10.3 Kafka + Spark リアルタイム処理例
# Spark Structured Streaming + Kafka
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, sum as spark_sum
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
spark = SparkSession.builder \
.appName("RealTimeRevenue") \
.getOrCreate()
# Kafkaスキーマ定義
order_schema = StructType([
StructField("order_id", StringType()),
StructField("user_id", StringType()),
StructField("amount", DoubleType()),
StructField("category", StringType()),
StructField("timestamp", TimestampType()),
])
# Kafkaからストリーミングデータを読み込み
raw_stream = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "order-events")
.option("startingOffsets", "latest")
.load()
)
# JSONパース
orders = (
raw_stream
.select(from_json(
col("value").cast("string"),
order_schema
).alias("data"))
.select("data.*")
)
# 5分ウィンドウ集計
windowed_revenue = (
orders
.withWatermark("timestamp", "10 minutes")
.groupBy(
window("timestamp", "5 minutes"),
"category"
)
.agg(spark_sum("amount").alias("revenue"))
)
# 結果をコンソールに出力(本番ではDBやダッシュボードへ)
query = (
windowed_revenue.writeStream
.outputMode("update")
.format("console")
.option("truncate", False)
.trigger(processingTime="1 minute")
.start()
)
query.awaitTermination()
まとめ
データエンジニアリングは、幅広い技術と概念を含む分野である。本記事で取り上げた内容を要約すると以下の通りである。
| 領域 | コア技術 | 代表ツール |
|---|---|---|
| データストレージ | レイク、ウェアハウス、レイクハウス | S3, Snowflake, Delta Lake |
| データ統合 | ETL/ELT | dbt, Airbyte, Fivetran |
| バッチ処理 | 分散コンピューティング | Apache Spark |
| ストリーム処理 | イベントストリーミング、CDC | Apache Kafka, Flink, Debezium |
| オーケストレーション | ワークフロー管理 | Apache Airflow, Dagster |
| データ品質 | 検証、オブザーバビリティ | Great Expectations, Monte Carlo |
| ガバナンス | メタデータ、アクセス制御 | DataHub, Apache Atlas |
最初からすべての技術を一度に学ぶ必要はない。SQLとPythonを基盤として、1つのパイプラインを実際に構築してみることが最も効果的な学習方法である。小さなプロジェクトから始めて、段階的に拡張していこう。
クイズ: データエンジニアリングの基本概念
Q1. データレイクとデータウェアハウスの最大の違いは?
A: データレイクはSchema-on-Read方式で、生データをそのまま保存し読み取り時にスキーマを適用する。データウェアハウスはSchema-on-Write方式で、データ保存時に事前定義されたスキーマを強制する。
Q2. ETLとELTで変換が行われる場所の違いは?
A: ETLではステージングエリア(別サーバー)で変換が行われた後、ウェアハウスにロードされる。ELTでは生データをまずウェアハウスにロードし、その後ウェアハウスのコンピューティングパワーを活用して内部で変換する。
Q3. KafkaにおけるConsumer Groupの役割は?
A: Consumer Groupは複数のConsumerが1つのトピックを分担して処理できるようにする仕組みである。同じグループ内のConsumerはそれぞれ異なるパーティションを担当し、並列処理が可能になる。これにより水平スケーリングが容易になる。
Q4. メダリオンアーキテクチャの3つの層は?
A: Bronze(生データをそのまま保存)、Silver(クレンジングと検証が完了したデータ)、Gold(ビジネスロジックと集約が適用された分析用データ)である。
Q5. AirflowにおけるDAGの役割は?
A: DAG(有向非巡回グラフ)はタスクの実行順序と依存関係を定義する。どのタスクがどのタスクの後に実行されるべきかを宣言的に表現するものである。