Skip to content
Published on

データエンジニアリング&AIパイプライン完全ガイド: SparkからKafkaまで

Authors

目次

  1. データエンジニアリング概要
  2. Apache Spark
  3. Apache Kafka
  4. Apache Airflow
  5. dbt (Data Build Tool)
  6. Delta Lake / Apache Iceberg
  7. フィーチャーストア
  8. データ品質とモニタリング
  9. クラウドデータプラットフォーム
  10. クイズ

データエンジニアリング概要

データエンジニアリングはAI/MLシステムの根幹をなす分野です。データパイプラインを設計・構築し、分析やモデル学習に適した形でデータを提供することが中心的な役割です。

役割比較: データエンジニア vs データサイエンティスト vs MLエンジニア

役割主な責任主要ツール
データエンジニアパイプライン構築、データストア管理Spark、Kafka、Airflow、dbt
データサイエンティストデータ分析、モデル開発Python、Jupyter、Pandas、Scikit-learn
MLエンジニアモデルデプロイ、サービングインフラMLflow、Kubeflow、TFX、Ray

モダンデータスタックアーキテクチャ

モダンデータスタックはクラウド中心に再編されています:

  • インジェスト: Fivetran、Airbyte、Kafka
  • ストレージ: Snowflake、BigQuery、Redshift、Delta Lake
  • 変換: dbt、Spark、Beam
  • オーケストレーション: Airflow、Prefect、Dagster
  • 可視化: Tableau、Looker、Metabase

バッチ処理 vs ストリーミング処理

バッチ処理は大量のデータを定期的に処理します。データウェアハウスのETLジョブ、日次レポート生成、モデル再学習などに使用します。

ストリーミング処理はデータが生成されたらすぐに処理します。リアルタイム不正検知、異常検知、レコメンデーションシステムの更新などに適しています。

Lambdaアーキテクチャ vs Kappaアーキテクチャ

Lambdaアーキテクチャはバッチレイヤーとスピードレイヤーを並列で運用します。正確なバッチ結果とリアルタイムストリーム結果をサービングレイヤーで統合します。精度は高いですが運用の複雑さが増します。

Kappaアーキテクチャはストリーミングレイヤーのみを使って単純化します。Kafka StreamsやApache Flinkの進化により実用的な選択肢となりました。一貫性のあるコードベースで保守しやすいのが特徴です。


Apache Spark

Apache Sparkは大規模データ処理のための統合分析エンジンです。インメモリ処理によりHadoop MapReduceと比較して最大100倍の性能を発揮します。

Sparkアーキテクチャ

Sparkクラスターは3つのコアコンポーネントで構成されます:

  • Driver: SparkContextを実行してジョブを調整するマスタープロセス
  • Executor: 実際のデータ処理を行うワーカープロセス
  • クラスターマネージャー: YARN、Kubernetes、Mesos、またはSpark Standalone

RDD、DataFrame、Dataset API

RDD (Resilient Distributed Dataset) はSparkの基本データ構造で、分散コレクションに対する不変な操作を提供します。

DataFrame はスキーマを持つ分散コレクションで、SQLライクなAPIを提供します。Catalystオプティマイザーによってクエリを自動最適化します。

Dataset はDataFrameの型安全バージョンで、ScalaとJavaでコンパイル時の型チェックをサポートします。

PySparkによるAIフィーチャーエンジニアリング

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, when
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

spark = SparkSession.builder \
    .appName("AI Feature Engineering") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# データロードと変換
df = spark.read.parquet("s3://data-lake/features/")
df_clean = df.dropna() \
    .withColumn("age_group", when(col("age") < 30, "young")
                              .when(col("age") < 50, "middle")
                              .otherwise("senior"))

# 集計統計
stats_df = df_clean.groupBy("category").agg(
    count("*").alias("cnt"),
    avg("score").alias("avg_score")
)

# フィーチャーベクター生成
assembler = VectorAssembler(
    inputCols=["age", "income", "score"],
    outputCol="features"
)
ml_df = assembler.transform(df_clean)

# モデル学習
rf = RandomForestClassifier(numTrees=100, featuresCol="features")
model = rf.fit(ml_df)

Spark SQL最適化のヒント

  • パーティショニング: 頻繁にフィルタリングするカラムでパーティションを分けます。
  • ブロードキャストジョイン: 小さいテーブルにはブロードキャストジョインを使います。
  • キャッシュ: 繰り返し使うDataFrameは cache() または persist() でメモリに保持します。
  • AQE (Adaptive Query Execution): Spark 3.0以降の動的クエリ最適化を活用します。

Apache Kafka

Apache Kafkaは高スループット、耐障害性を備えた分散イベントストリーミングプラットフォームです。LinkedInが開発してオープンソース化し、リアルタイムAIパイプラインの中核インフラとなっています。

Kafkaアーキテクチャ

  • Broker: メッセージを保存・配信するサーバー。クラスターは複数のBrokerで構成されます。
  • Topic: メッセージが発行されるカテゴリ/フィード名
  • Partition: Topicを分割して並列処理を可能にします。各パーティションは順序を保証します。
  • Consumer Group: 同じTopicを共有するコンシューマーの集合。負荷分散を提供します。
  • Offset: パーティション内のメッセージの一意な位置識別子

プロデューサー/コンシューマーパターン

from kafka import KafkaProducer, KafkaConsumer
import json
import numpy as np

# プロデューサー: リアルタイムフィーチャーデータ送信
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def send_feature(data):
    producer.send('ml-features', value={
        'timestamp': data['ts'],
        'features': data['features'].tolist(),
        'user_id': data['user_id']
    })

# コンシューマー: リアルタイムAI推論
consumer = KafkaConsumer(
    'ml-features',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

import onnxruntime as ort
session = ort.InferenceSession('model.onnx')

for msg in consumer:
    features = np.array(msg.value['features']).reshape(1, -1)
    prediction = session.run(None, {'input': features.astype(np.float32)})[0]
    print(f"User {msg.value['user_id']}: {prediction}")

Kafka StreamsとksqlDB

Kafka StreamsはJava/Scalaライブラリで、KafkaトピックからデータをRead・変換してKafkaに書き戻すストリーム処理をサポートします。ksqlDBはSQLを使ってストリーミングクエリを書けるイベントストリーミングデータベースです。

リアルタイムAIパイプラインにおけるKafkaの役割:

  1. イベントソース (ユーザー行動、センサーデータなど)
  2. フィーチャーエンジニアリング結果の配送
  3. モデル予測結果の発行
  4. A/Bテスト結果の収集

Apache Airflow

Apache Airflowはパイプラインをプログラム的に定義、スケジューリング、モニタリングするワークフロー管理プラットフォームです。

DAG (Directed Acyclic Graph) の概念

DAGは有向非循環グラフで、Airflowでワークフローを表現する方法です。各ノードはTaskで、エッジは依存関係(実行順序)を表します。

Operatorの種類

  • PythonOperator: Python関数を実行
  • BashOperator: シェルコマンドを実行
  • SQLExecuteQueryOperator: SQLクエリを実行
  • KubernetesPodOperator: Kubernetes Podを実行
  • SparkSubmitOperator: Sparkジョブを送信

TaskFlow APIを使ったMLパイプライン

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule='@daily', start_date=datetime(2026, 1, 1))
def ml_training_pipeline():

    @task
    def extract_data():
        # データ抽出
        return {'rows': 10000, 'path': '/tmp/data.parquet'}

    @task
    def preprocess(info: dict):
        # 前処理
        import pandas as pd
        df = pd.read_parquet(info['path'])
        df_clean = df.dropna()
        return df_clean.to_dict()

    @task
    def train_model(data: dict):
        # モデル学習
        from sklearn.ensemble import GradientBoostingClassifier
        import mlflow

        with mlflow.start_run():
            model = GradientBoostingClassifier()
            # training logic here
            mlflow.sklearn.log_model(model, "model")
        return "model_trained"

    @task
    def validate_model(status: str):
        # モデル検証
        if status == "model_trained":
            return {"accuracy": 0.92, "passed": True}
        return {"passed": False}

    raw = extract_data()
    processed = preprocess(raw)
    result = train_model(processed)
    validate_model(result)

dag_instance = ml_training_pipeline()

Airflowのベストプラクティス

  • 冪等性 (Idempotency): 同じDAGを複数回実行しても結果が常に同じであるべきです。
  • 原子性 (Atomicity): Taskは成功か失敗のみで、部分的な状態があってはなりません。
  • XComの最小化: 大容量データはXComではなく外部ストレージを使います。
  • SLA設定: 重要なパイプラインにはSLA (Service Level Agreement) を設定します。

dbt (Data Build Tool)

dbtはアナリティクスエンジニアがSQLでデータ変換ロジックを書き、バージョン管理、テスト、ドキュメント化を行えるツールです。

dbtの役割とELTパターン

従来のETL (Extract-Transform-Load) とは異なり、dbtはELT (Extract-Load-Transform) パターンを採用しています。生データをまずデータウェアハウスにロードし、ウェアハウス内でSQLを使って変換します。

モデルレイヤー構造

models/
├── staging/          # 生ソースデータのクリーニング
│   ├── stg_orders.sql
│   └── stg_customers.sql
├── intermediate/     # ビジネスロジックの中間変換
│   └── int_order_items.sql
└── marts/           # 最終的なビジネス向けテーブル
    ├── fct_orders.sql
    └── dim_customers.sql

Jinjaテンプレートとref()関数

dbtはJinjaテンプレートエンジンを使ってSQLに動的な機能を追加します:

-- models/marts/fct_orders.sql
SELECT
    o.order_id,
    o.customer_id,
    c.customer_name,
    o.total_amount,
    o.created_at
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('dim_customers') }} c
    ON o.customer_id = c.customer_id
WHERE o.status = 'completed'

ref() 関数はモデル間の依存関係を自動的に推論し、正しい順序で実行します。

データテストとドキュメント化

# schema.yml
models:
  - name: fct_orders
    description: '完了した注文のファクトテーブル'
    columns:
      - name: order_id
        description: '注文の一意なID'
        tests:
          - unique
          - not_null
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0

Delta Lake / Apache Iceberg

Delta LakeとApache IcebergはデータレイクにACIDトランザクションをもたらすオープンテーブルフォーマットです。

主要機能

  • ACIDトランザクション: 同時読み書き時のデータ整合性を保証
  • タイムトラベル: 任意の時点のデータスナップショットをクエリ可能
  • スキーマ進化: 既存データを壊さずにスキーマを変更可能
  • データバージョン管理: 変更履歴の追跡とロールバック

Delta Lake Merge/Upsert操作

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Upsert (Merge) 操作
delta_table = DeltaTable.forPath(spark, "/delta/users")
updates_df = spark.read.parquet("/tmp/updates/")

delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.user_id = source.user_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

# タイムトラベル: 昨日のデータを参照
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2026-03-16") \
    .load("/delta/users")

# バージョン指定で参照
df_v1 = spark.read.format("delta") \
    .option("versionAsOf", 1) \
    .load("/delta/users")

# 変更履歴を確認
delta_table.history().show()

Z-Orderクラスタリング

Z-Orderは関連データを同じファイルに配置してクエリパフォーマンスを向上させます:

spark.sql("""
    OPTIMIZE delta.`/delta/events`
    ZORDER BY (user_id, event_date)
""")

フィーチャーストア

フィーチャーストアはMLフィーチャーを一元管理するリポジトリです。フィーチャーの再利用、一貫性、ガバナンスを保証します。

オンラインストア vs オフラインストア

オフラインフィーチャーストアはモデル学習に使う大規模な履歴フィーチャーを保存します。データレイクにParquetやDelta Lake形式で格納されます。

オンラインフィーチャーストアはリアルタイム推論に使う最新フィーチャーを低レイテンシで提供します。Redis、DynamoDB、CassandraなどのインメモリまたはNoSQLデータベースに格納されます。

Feast (オープンソースフィーチャーストア)

from feast import FeatureStore, Entity, FeatureView, Field
from feast.types import Float32, Int64

# フィーチャービューの定義
user_stats = FeatureView(
    name="user_stats",
    entities=["user_id"],
    schema=[
        Field(name="avg_purchase_amount", dtype=Float32),
        Field(name="purchase_count_30d", dtype=Int64),
    ],
    ttl=timedelta(days=30),
)

# オンラインフィーチャー取得 (リアルタイム推論)
store = FeatureStore(repo_path=".")
features = store.get_online_features(
    features=["user_stats:avg_purchase_amount", "user_stats:purchase_count_30d"],
    entity_rows=[{"user_id": 1001}],
).to_dict()

マネージドフィーチャーストア

  • Tecton: エンタープライズ向けフィーチャープラットフォーム、リアルタイム変換対応
  • Vertex AI Feature Store: GCP統合マネージドサービス
  • SageMaker Feature Store: AWS統合マネージドサービス
  • Databricks Feature Store: Delta Lakeベースのフィーチャー管理

データ品質とモニタリング

データ品質はAIシステムの信頼性を左右します。MLにおける「Garbage in, garbage out」は特に重要です。

Great Expectationsによるデータ検証

Great Expectationsはデータパイプラインに自動化されたデータ検証を追加するツールです:

import great_expectations as gx

context = gx.get_context()
datasource = context.sources.add_pandas("my_datasource")

# 期待値 (Expectation) の定義
suite = context.add_expectation_suite("orders_suite")
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="orders_suite"
)

validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_between("total_amount", min_value=0, max_value=100000)
validator.expect_column_values_to_be_unique("order_id")
validator.save_expectation_suite()

# 検証の実行
checkpoint = context.add_or_update_checkpoint(
    name="orders_checkpoint",
    validations=[{"batch_request": batch_request, "expectation_suite_name": "orders_suite"}],
)
result = checkpoint.run()

データドリフト検知

データドリフトは本番データの統計的特性が学習データとずれていく現象です。Evidently AI、WhyLogs、Nannymlなどのツールで検知します:

from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=train_df, current_data=prod_df)
report.show(mode='inline')

データリネージ (Data Lineage) 追跡

データリネージはデータの出所から消費までの旅を追跡します。Apache Atlas、OpenLineage、Marquezなどのツールが広く使われています。


クラウドデータプラットフォーム

AWSデータスタック

  • Amazon S3: オブジェクトストレージ、データレイクの基盤
  • AWS Glue: サーバーレスETLサービス、データカタログ
  • Amazon Athena: S3データをSQLで分析
  • Amazon Redshift: クラウドデータウェアハウス
  • Amazon EMR: マネージドHadoop/Sparkクラスター

GCPデータスタック

  • Google BigQuery: サーバーレスデータウェアハウス、ML内蔵
  • Cloud Dataflow: Apache Beamベースのデータ処理
  • Cloud Pub/Sub: リアルタイムメッセージングサービス (Kafkaの代替)
  • Vertex AI: 統合MLプラットフォーム

Azureデータスタック

  • Azure Data Lake Storage (ADLS): 階層的名前空間ストレージ
  • Azure Synapse Analytics: 統合分析サービス
  • Azure Event Hub: 大規模イベントストリーミング (Kafka互換)
  • Azure Databricks: Sparkベースの分析プラットフォーム

Snowflakeデータプラットフォーム

Snowflakeはマルチクラウドデータプラットフォームで、ストレージとコンピューティングを分離したアーキテクチャが特徴です:

  • 仮想ウェアハウス: 独立してスケール可能なコンピューティングクラスター
  • データ共有: 組織間での安全なデータ共有
  • Snowpipe: 自動化された継続的データ取り込み
  • Cortex: 内蔵AI/ML機能

クイズ

Q1. LambdaアーキテクチャとKappaアーキテクチャの主な違いは何ですか?

回答: Lambdaアーキテクチャはバッチレイヤーとスピードレイヤーの両方を維持しますが、Kappaアーキテクチャはストリーミングレイヤーのみを使って単純化します。

解説: Lambdaアーキテクチャは精度(バッチ)とリアルタイム性(ストリーム)の両方を提供しますが、2つのレイヤーを別々に維持する必要があり運用複雑度が高くなります。Kafka StreamsやApache Flinkの進化によりストリーミングだけでも十分な精度を実現できるようになり、Kappaアーキテクチャが現実的な選択肢となりました。

Q2. Apache SparkにおけるDataFrameとRDDの主な違いは?

回答: DataFrameはスキーマを持つ構造化データを表現し、Catalystオプティマイザーによる自動クエリ最適化が行われます。RDDはスキーマのない低レベルAPIで細かい制御が可能ですが、最適化は開発者自身が行う必要があります。

解説: ほとんどの場合、DataFrameまたはDataset APIの使用が推奨されます。RDDはカスタムシリアライゼーションや低レベル制御が必要な場合にのみ使用します。Spark 3.0以降ではAdaptive Query Execution (AQE) によるランタイム最適化もサポートされています。

Q3. KafkaにおけるConsumer Groupの役割は何ですか?

回答: Consumer Groupは同じTopicを購読するコンシューマーの集合で、パーティションを各コンシューマーに分配して並列処理と負荷分散を提供します。

解説: 同じConsumer Group内では、各パーティションは1つのコンシューマーのみが読み取ります。コンシューマー数がパーティション数を超えると余剰コンシューマーはアイドル状態になります。異なるConsumer Groupは同じトピックを独立して読み取れるため、同じイベントストリームを複数の目的で使用できます。

Q4. dbtのref()関数の役割は何ですか?

回答: ref()関数は他のdbtモデルを参照する際に使用し、モデル間の依存関係を自動的に推論して正しい実行順序を決定します。

解説: ref()を使うことでdbtがDAG(依存グラフ)を構成し、環境(開発/本番)に応じて正しいデータベーススキーマとテーブル名を自動解決します。これによりコードの再利用性と保守性が大幅に向上します。

Q5. フィーチャーストアにおけるオンラインストアとオフラインストアの違いは何ですか?

回答: オフラインストアはモデル学習用の大規模な履歴データを保存し(Delta Lake、Parquet)、オンラインストアはリアルタイム推論用の最新フィーチャーを低レイテンシで提供します(Redis、DynamoDB)。

解説: 2つのストアを分ける理由は要件が異なるためです。学習は数十億行のデータをバッチで処理する高スループットが必要で、推論はミリ秒単位の低レイテンシが必要です。FeastのようなフィーチャーストアはÂ2つのストア間のフィーチャー一貫性も保証する役割を担います。


参考資料