Skip to content
Published on

Hadoop アーキテクチャを見直す — HDFS、YARN、そしてその先へ

Authors

はじめに

しばらくの間、「ビッグデータ」という言葉はほとんど「Hadoop」と同義のように使われていました。2010 年代の前半から中盤にかけて、多くの企業がオンプレミスのデータセンターに数十から数百台のサーバーで構成された Hadoop クラスタを構築し、データエンジニアの求人にはほぼ例外なく Hadoop の経験が要件として書かれていました。

ところが 2020 年代に入ると、空気が変わりました。クラウドのオブジェクトストレージ(S3、GCS、Azure Blob)が事実上のデータ保存の標準になり、Spark が演算エンジンの主流となり、Iceberg / Delta / Hudi といったテーブルフォーマットを基盤とする「レイクハウス(Lakehouse)」アーキテクチャが登場しました。そのため「Hadoop は死んだ」と言う人もいます。

この記事は、その真偽を裁くためのものではありません。むしろ、Hadoop がどんな問題を解くために、どのように設計されたのかを最初から振り返り、それらの設計判断のうち、どこが今も有効で、どこが時代遅れになったのかを落ち着いて整理することを目的とします。Hadoop をきちんと理解すると、その上に築かれた現代のデータスタックのほぼすべての概念(分散ストレージ、データローカリティ、リソーススケジューリング、シャッフル)がより鮮明に見えてくるからです。

この記事で扱う内容は次のとおりです。

  • Hadoop が登場した背景と中核となる設計思想
  • HDFS の構造: NameNode、DataNode、ブロック、レプリケーション
  • HDFS の読み取り / 書き込み経路を図で追う
  • YARN の構造: ResourceManager、NodeManager、ApplicationMaster
  • MapReduce の仕組みとシャッフルの流れ
  • Hive、Spark、HBase などのエコシステム
  • オブジェクトストレージ / クラウド / レイクハウスの時代に変わった Hadoop の立ち位置
  • 実務での運用ポイントとよくある落とし穴

注: この記事の技術的な説明は Apache Hadoop の公式ドキュメント(hadoop.apache.org)に従っています。バージョンによって細かい動作が異なる場合があるので、実際の運用では使用するディストリビューションとバージョンのドキュメントを確認することをおすすめします。

Hadoop が解こうとした問題

Hadoop の起源は、Google が 2003 年と 2004 年に発表した 2 本の論文、すなわち GFS(Google File System)と MapReduce の論文です。当時 Google はウェブ全体をクロールしインデックス化するために膨大な量のデータを処理する必要があり、そのために高価な高性能サーバー数台ではなく、安価な汎用サーバー(commodity hardware)を数千台束ねて使う方向を選びました。

このアプローチには 2 つの大きな前提がありました。

  1. サーバーはいつでも故障する。数千台を運用すれば毎日数台が死ぬのが普通である。したがって障害は例外ではなく日常であり、システムは障害に耐える(fault-tolerant)ように設計しなければならない。
  2. データが大きすぎる場合、データを演算のある場所へ移すよりも、演算をデータのある場所へ移すほうがはるかに安い。これが「データローカリティ(data locality)」の概念です。

Hadoop はこの 2 つの思想をそのままオープンソースで実装したものです。HDFS は GFS に対応し、Hadoop MapReduce は Google の MapReduce に対応します。最初は Doug Cutting が Nutch という検索エンジンプロジェクトの一部として始め、のちに Apache のトップレベルプロジェクトとして独立しました。

中核となる設計目標を整理すると次のとおりです。

┌──────────────────────────────────────────────────────────────┐
│                 Hadoop の中核的な設計目標                     │
├──────────────────────────────────────────────────────────────┤
│ 1. スケーラビリティ(Scalability)                             │
│    水平スケール — ノードを追加して容量/スループットを増やす  │
│                                                              │
│ 2. 耐障害性(Fault Tolerance)                                 │
│    障害は正常 — レプリケーションと再試行でデータ/作業を守る  │
│                                                              │
│ 3. データローカリティ(Data Locality)                        │
│    演算をデータの近くへ — ネットワーク転送を最小化           │
│                                                              │
│ 4. 汎用ハードウェア(Commodity Hardware)                      │
│    安価なサーバー多数 — 高価な単一機材に依存しない           │
│                                                              │
│ 5. スループット優先(High Throughput, not Low Latency)        │
│    大容量バッチ処理に最適 — リアルタイム応答が目標ではない   │
└──────────────────────────────────────────────────────────────┘

最後の項目が特に重要です。Hadoop は本質的に「大きなデータを一度に順次まとめて読み、バッチで処理する」ことに最適化されています。小さなファイルを数百万個扱ったり、低レイテンシで単件照会をしたりする用途には、最初からあまり向いていませんでした。この特性は、後で落とし穴を扱う際に再び登場します。

Hadoop の全体構成

Hadoop は大きく 3 つの層に分けて理解すると分かりやすいです。

┌───────────────────────────────────────────────────────────────┐
│   処理レイヤ (Processing)                                     │
│   MapReduce · Spark · Tez · Hive · Pig · Flink ...           │
├───────────────────────────────────────────────────────────────┤
│   リソース管理レイヤ (Resource Management)                   │
│   YARN — ResourceManager / NodeManager / ApplicationMaster   │
├───────────────────────────────────────────────────────────────┤
│   ストレージレイヤ (Storage)                                 │
│   HDFS — NameNode / DataNode / Block / Replication           │
└───────────────────────────────────────────────────────────────┘
  • ストレージレイヤは HDFS が担当します。データを分散保存し、レプリケーションで保護します。
  • リソース管理レイヤは YARN が担当します。CPU / メモリといったリソースを複数のアプリケーションに分け与えます。
  • 処理レイヤにはさまざまなエンジンが載ります。初期は MapReduce だけでしたが、YARN の導入以降、Spark や Tez など複数のエンジンが同じクラスタを共有するようになりました。

この 3 層の分離は、Hadoop 2.x で YARN が導入されたことで明確になりました。Hadoop 1.x の時代は、MapReduce がストレージを除くすべて(スケジューリング + 実行)を担っていたため、MapReduce 以外の処理方式を使うのが難しかったのです。YARN は「リソース管理」を「処理エンジン」から切り離すことで、Hadoop を単なる MapReduce プラットフォームではなく、汎用の分散処理オペレーティングシステムのようなものにしました。

それでは、各レイヤを 1 つずつ深く見ていきます。

HDFS — 分散ファイルシステム

基本概念: ブロックとレプリケーション

HDFS の最も根本的なアイデアは「大きなファイルを固定サイズのブロック(block)に分割し、複数のノードに分散保存する」というものです。デフォルトのブロックサイズは長らく 64MB でしたが、最近のディストリビューションでは 128MB が一般的です(dfs.blocksize で設定)。

一般的なローカルファイルシステムのブロック(通常 4KB 程度)と比べると桁違いに大きいのですが、理由は明確です。ブロックが小さいと NameNode が管理すべきメタデータの数が爆発し、ディスクシーク(seek)コストが全体の転送時間に占める割合が大きくなります。ブロックを大きく取れば、一度シークしたあと長く順次読み取りができるのでスループットが良くなります。

ファイル: bigfile.log (380 MB)、ブロックサイズ 128 MB

   bigfile.log
   ┌──────────────┬──────────────┬──────────────┐
   │  Block A     │  Block B     │  Block C     │
   │  128 MB      │  128 MB      │  124 MB      │
   └──────────────┴──────────────┴──────────────┘

各ブロックはレプリケーション係数(replication factor、デフォルト3)
の分だけ複数の DataNode に複製されます。

   Block A ──▶ DataNode 1, DataNode 3, DataNode 5
   Block B ──▶ DataNode 2, DataNode 3, DataNode 6
   Block C ──▶ DataNode 1, DataNode 4, DataNode 6

レプリケーション係数が 3 ということは、同じブロックの複製が異なる 3 つの DataNode に存在するという意味です。したがって DataNode が 1 台や 2 台死んでもデータは失われません。この「レプリケーションによる耐障害性」が HDFS の核心です。

NameNode と DataNode

HDFS はマスター・スレーブ(master-slave)構造です。

┌─────────────────────────────────────────────────────────────┐
│                        NameNode (マスター)                   │
│  - ファイルシステムのネームスペース(ディレクトリ/ファイル木)を管理 │
│  - 各ファイルがどのブロックで構成されているかを記録          │
│  - 各ブロックがどの DataNode にあるかのマッピングを保持      │
│  - メタデータはメモリに常駐 (高速な照会のため)              │
└─────────────────────────────────────────────────────────────┘
        │            │            │            │
        ▼            ▼            ▼            ▼
   ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐
   │DataNode1│  │DataNode2│  │DataNode3│  │DataNode4│   (スレーブ)
   │ブロック保存│ │ブロック保存│ │ブロック保存│ │ブロック保存│
   └─────────┘  └─────────┘  └─────────┘  └─────────┘
   - 実際のブロックデータをローカルディスクに保存
   - 定期的に NameNode へハートビートとブロックレポートを送信
  • NameNode はメタデータのみを管理します。実際のデータのバイトは持っていません。代わりに「どのファイルがどのブロックで構成され、それらのブロックがどの DataNode にあるのか」をすべて把握しています。この情報は高速な照会のためにメモリに常駐します。
  • DataNode は実際のブロックデータを自身のローカルディスクに保存します。そして定期的に NameNode へハートビート(生存信号)とブロックレポート(自分が持つブロックの一覧)を送ります。

NameNode がメタデータをメモリに載せるという点は重要な含意を持ちます。ファイルやブロックが増えるほど NameNode のメモリ使用量が増えるため、「小さなファイルが数百万個」というワークロードは NameNode に大きな負担を与えます。これがまさに悪名高い「スモールファイル問題(small files problem)」です。

NameNode の永続化: FsImage と EditLog

NameNode のメタデータはメモリにありますが、再起動時に復元できる必要があります。そのために 2 つのディスク構造を使います。

┌──────────── NameNode メタデータの永続化 ──────────────┐
│                                                       │
│   FsImage : 特定時点のファイルシステム全体のスナップショット │
│   EditLog : FsImage 以降に発生したすべての変更のログ │
│                                                       │
│   再起動時の復元手順:                                 │
│     1) FsImage をロード (基準スナップショット)        │
│     2) EditLog を再生(replay) (それ以降の変更を反映)  │
│     3) メモリ上の最新メタデータを完成                 │
│                                                       │
└─────────────────────────────────────────────────────────┘

EditLog が際限なく大きくなると再起動が遅くなるので、定期的に FsImage と EditLog をマージして新しい FsImage を作る処理(checkpointing)が必要です。この処理は、以前は Secondary NameNode が担当し、HA(高可用性)構成では Standby NameNode が担当します。

ここでよくある誤解を 1 つ押さえておきます。Secondary NameNode は NameNode のバックアップ(代替)ではありません。名前とは裏腹にフェイルオーバー(failover)を実行できず、ただチェックポイントを手伝う補助役にすぎません。本当の高可用性は、以下で説明する HA 構成で達成します。

NameNode 高可用性(HA)

単一 NameNode 構成では、NameNode が単一障害点(SPOF、Single Point of Failure)になります。NameNode が死ぬとクラスタ全体のメタデータにアクセスできなくなるので、事実上すべての作業が止まります。これを解決するために HDFS は HA 構成を提供します。

        ┌──────────────┐   共有 EditLog     ┌──────────────┐
        │   Active     │◀────────────────▶│   Standby    │
        │   NameNode   │  (JournalNodes)   │   NameNode   │
        └──────┬───────┘                   └──────┬───────┘
               │                                  │
               │   DataNode は両方の NameNode に   │
               │   ハートビートとブロックレポートを送信 │
               ▼                                  ▼
        ┌───────────────────────────────────────────────┐
        │   DataNode 1 · DataNode 2 · DataNode 3 · ...   │
        └───────────────────────────────────────────────┘

   Active が死ぬ → ZKFC + ZooKeeper が検知 → Standby を Active へ昇格
  • Active NameNode がすべてのクライアント要求を処理します。
  • Standby NameNode は JournalNode を通じて共有される EditLog を再生し続け、Active と同期した状態を保ちます。
  • ZKFC(ZooKeeper Failover Controller)と ZooKeeper が Active の生存を監視し、障害発生時に Standby を新しい Active へ自動昇格(failover)します。

この過程で「スプリットブレイン(split-brain)」、つまり 2 つの NameNode が同時に自分が Active だと信じる状況を防ぐためのフェンシング(fencing)機構が重要です。誤った設定はデータ破損につながりうるので、運用時には注意が必要です。

HDFS の書き込み経路

それでは、実際にクライアントが HDFS にファイルを書き込むとき何が起きるのかを追ってみましょう。

[HDFS 書き込み経路]

  Client
    │ 1. create() 要求 — 「このパスにファイルを作りたい」
  NameNode
    │   - 権限/存在確認、ネームスペースにファイル項目を作成
    │ 2. 最初のブロックを置く DataNode の一覧を返す (複製3 → 3個)
    │      例: DN1 ──▶ DN2 ──▶ DN3 (パイプライン)
  Client
    │ 3. データをパケット単位で DN1 へ送信
  ┌─────────┐  複製パイプライン   ┌─────────┐        ┌─────────┐
  │  DN1    │ ── パケット転送 ──▶│  DN2    │ ──────▶ │  DN3    │
  │ディスク↓│                    │ディスク↓│        │ディスク↓│
  └─────────┘                    └─────────┘        └─────────┘
    │                                                    │
    │ 4. ack は逆方向:  DN3 ──▶ DN2 ──▶ DN1 ──▶ Client     │
  Client
    │ 5. ブロックが満杯になれば次のブロックのため 2〜4 を繰り返す
    │ 6. close() — NameNode に書き込み完了を通知
  NameNode (メタデータを最終確定)

核心は、レプリケーションが「複製パイプライン(replication pipeline)」方式で起きるという点です。クライアントは最初の DataNode にのみデータを送り、その DataNode が 2 番目へ、2 番目が 3 番目へとパケットを流します。こうすると、クライアントのアップロード帯域が一度だけ使われながらも 3 つの複製が作られます。

もう 1 つ重要なのが複製配置ポリシー(replica placement policy)です。デフォルトのポリシーはおおむね次のとおりです。

[デフォルトの複製配置 (replication = 3、ラック認識ベース)]

  複製1 : クライアントのいるノード(または同じラックの任意のノード)
  複製2 : 複製1 と異なるラックのノード
  複製3 : 複製2 と同じラックの別のノード

  目的:
   - 同じラックに2個 → ラック内ネットワークで高速書き込み
   - 別のラックに1個 → ラック全体の障害時にもデータを保全

この「ラック認識(rack awareness)」ポリシーは、書き込み性能(同じラック内ではネットワークが速い)と耐久性(ラック全体が死んでも生き残る)を折衷した結果です。

HDFS の読み取り経路

読み取りは比較的シンプルです。

[HDFS 読み取り経路]

  Client
    │ 1. open() — 「このファイルを読みたい」
  NameNode
    │ 2. ファイルのブロック一覧 + 各ブロックの DataNode 位置を返す
    │    (クライアントに近い順に並べて渡す)
  Client
    │ 3. 各ブロックごとに最も近い DataNode から直接読み取り
  ┌─────────┐  ┌─────────┐  ┌─────────┐
  │  DN (A) │  │  DN (B) │  │  DN (C) │   ← ブロックごとに近いノードを選ぶ
  └─────────┘  └─────────┘  └─────────┘
    │ 4. もしある DataNode が応答しなければ
    │    → 別の複製を持つ DataNode へ再試行
  Client (ブロックを繋ぎ合わせて完全なファイルに)

ここで NameNode は「どこから読め」という位置情報を渡すだけで、実際のデータはクライアントが DataNode と直接やり取りします。つまりデータトラフィックは NameNode を経由しません。これが、NameNode がメタデータに集中しながらもクラスタ全体のスループットを高く保てる秘訣です。

また NameNode が位置を並べる際に「クライアントに近い(同じノード → 同じラック → 別のラック)」順を優先するので、可能ならローカルまたは同じラックから読み取ります。これが読み取りにおけるデータローカリティです。

YARN — リソース管理レイヤ

なぜ YARN が必要だったのか

Hadoop 1.x では MapReduce が 2 つの仕事を同時にこなしていました。1 つはクラスタのリソース(スロット)を管理し作業をスケジューリングする仕事(JobTracker)、もう 1 つは実際のマップ / リデュースタスクを実行する仕事でした。この構造には問題がありました。

  • JobTracker にすべての負荷が集中し、スケーラビリティに限界がありました(おおむね 4,000 ノード規模でボトルネック)。
  • クラスタが MapReduce のジョブだけしか動かせませんでした。Spark のように別の方式でデータを処理したくても、同じクラスタを使うのが難しかったのです。
  • リソースが「マップスロット」と「リデューススロット」に静的に分けられ、非効率が大きかったのです。

YARN(Yet Another Resource Negotiator)はこの「リソース管理」と「作業実行」を分離しました。その結果、同じクラスタ上で MapReduce、Spark、Tez、Flink など複数の処理エンジンがリソースを共有して共存できるようになりました。

YARN の構成要素

┌──────────────────────────────────────────────────────────────┐
│                  ResourceManager (RM, マスター)              │
│   - Scheduler: クラスタのリソースをアプリに割り当てる        │
│   - ApplicationsManager: AM の起動/再起動を管理              │
└───────────────┬──────────────────────────────────────────────┘
                │ リソース割り当て / 状態報告
   ┌────────────┼────────────────────────┬───────────────┐
   ▼            ▼                        ▼               ▼
┌────────┐  ┌────────┐              ┌────────┐      ┌────────┐
│ Node-  │  │ Node-  │              │ Node-  │      │ Node-  │
│Manager │  │Manager │   ...        │Manager │      │Manager │
│ (NM)   │  │ (NM)   │              │ (NM)   │      │ (NM)   │
└───┬────┘  └───┬────┘              └───┬────┘      └───┬────┘
    │           │                       │               │
 コンテナ     コンテナ                コンテナ         コンテナ
 ┌──────┐   ┌──────┐               ┌──────┐        ┌──────┐
 │ App  │   │ Task │               │ Task │        │ Task │
 │Master│   │      │               │      │        │      │
 └──────┘   └──────┘               └──────┘        └──────┘
  • ResourceManager(RM)はクラスタ全体のマスターです。内部的に Scheduler(リソース割り当て担当)と ApplicationsManager(ApplicationMaster の寿命管理担当)に分かれます。
  • NodeManager(NM)は各ノードに 1 つずつ起動し、そのノードのリソース(CPU / メモリ)を管理してコンテナを実行・監視します。NameNode-DataNode の関係と同様に、RM-NM もマスター・スレーブ構造です。
  • ApplicationMaster(AM)はアプリケーション(ジョブ)ごとに 1 つずつ起動する「そのジョブ専用の管理者」です。ジョブに必要なリソースを RM に要求し、割り当てられたコンテナで実際のタスクを実行・監視します。
  • Container はリソース(例: メモリ 2GB + vCore 1 個)の束です。実際の作業(タスク)はコンテナの中で実行されます。

ここで ApplicationMaster の存在が YARN 設計の核心です。クラスタ全体を管理する RM は、ジョブ 1 つ 1 つの細部(タスクの進捗、再試行など)には関与しません。その仕事はジョブごとに別途起動する AM が担います。こうして責任を分散させることで、RM の負荷を減らしスケーラビリティを確保します。

YARN スケジューリングのシーケンス

クライアントがジョブを投入したとき、リソースがどのように交渉されるのかを順番に見ていきます。

[YARN ジョブ投入および実行シーケンス]

  Client                RM              NM(複数ノード)       AM
    │                    │                   │               │
    │ 1. submitApplication                   │               │
    │───────────────────▶│                   │               │
    │                    │ 2. AM コンテナ割り当て要求         │
    │                    │──────────────────▶│               │
    │                    │                   │ 3. AM コンテナ │
    │                    │                   │    起動        │
    │                    │                   │──────────────▶│
    │                    │ 4. AM 登録(register)               │
    │                    │◀──────────────────────────────────│
    │                    │ 5. タスク用コンテナ要求            │
    │                    │◀──────────────────────────────────│
    │                    │ 6. コンテナ割り当て応答            │
    │                    │──────────────────────────────────▶│
    │                    │                   │ 7. AM が NM に │
    │                    │                   │   タスク実行を │
    │                    │                   │   指示         │
    │                    │                   │◀──────────────│
    │                    │                   │ 8. タスク実行  │
    │                    │                   │  (コンテナ内)  │
    │ 9. 進捗を照会      │                   │               │
    │◀───────────────────┼───────────────────┼───────────────│
    │                    │ 10. 完了後 AM 登録解除、リソース返却 │
    │                    │◀──────────────────────────────────│
    ▼                    ▼                   ▼               ▼

流れを言葉にするとこうなります。

  1. クライアントが RM にアプリケーションを投入します。
  2. RM がどの NM で AM を起動するか決め、その NM に AM 用コンテナを起動するよう指示します。
  3. AM が起動すると RM に自身を登録し、ジョブ遂行に必要なタスク用コンテナを RM に要求します。
  4. RM の Scheduler がリソース状況を見てコンテナを割り当てます。
  5. AM は割り当てられたコンテナのある NM に実際のタスクを実行するよう指示します。
  6. タスクがコンテナの中で動き、AM が進捗を追跡します。
  7. ジョブが終わると AM が登録を解除し、リソースを返却します。

スケジューラの種類

YARN の Scheduler はポリシーに応じて複数の種類があります。代表的なのは次の 3 つです。

スケジューラ中核となる動作適した状況
FIFO Scheduler投入順に処理単純なテスト、単一ユーザー
Capacity Schedulerキューごとに容量を事前に保証複数チームが最低リソースを保証されるべきとき
Fair Scheduler実行中のジョブにリソースを公平に分配多様なジョブがリソースを均等に分け合いたいとき

大規模なマルチテナントクラスタでは通常 Capacity Scheduler か Fair Scheduler を使います。チーム別 / プロジェクト別にキューを分け、各キューに最低保証容量と最大使用上限を設定してリソース争奪を制御します。

MapReduce — 元祖の処理モデル

Map と Reduce

MapReduce は 2 段階の関数型演算でデータを処理します。

  • Map: 入力を受け取り、(キー、値)ペアの中間結果を作る。
  • Reduce: 同じキーを持つ値を集めて集計 / 要約する。

古典的なワードカウント(word count)の例を見てみましょう。

// Mapper: 1 行を受け取り、各単語について (word, 1) を放出
public class WordCountMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private final static IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    for (String token : line.split("\\s+")) {
      if (!token.isEmpty()) {
        word.set(token);
        context.write(word, ONE);
      }
    }
  }
}
// Reducer: 同じ単語に対する 1 をすべて足す
public class WordCountReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  private final IntWritable result = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) {
      sum += v.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}
// Driver: ジョブ設定と投入
public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCountDriver.class);
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);
    job.setCombinerClass(WordCountReducer.class); // マップ段階での部分集計
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

シャッフルとソート

MapReduce で最も重く、最も頻繁に性能ボトルネックになる段階が、まさに Map と Reduce の間のシャッフル(shuffle)です。Map の出力をキー基準で再分配し、同じキーが同じ Reducer に集まるようにする過程です。

[MapReduce 全体のデータフロー (シャッフルを含む)]

  入力 (HDFS ブロック群)
   ┌────────┐ ┌────────┐ ┌────────┐
   │ Split1 │ │ Split2 │ │ Split3 │
   └───┬────┘ └───┬────┘ └───┬────┘
       ▼          ▼          ▼
   ┌────────┐ ┌────────┐ ┌────────┐
   │ Map 1  │ │ Map 2  │ │ Map 3  │     ← 各 Map が (キー,値) を放出
   └───┬────┘ └───┬────┘ └───┬────┘
       │          │          │
       │  [パーティション + ソート + (任意)コンバイナ]
       │          │          │
       └────┬─────┴─────┬────┘
            │  S H U F F L E  │           ← ネットワークでキー基準に再分配
       ┌────┴─────┐ ┌────┴─────┐
       ▼          ▼ ▼          ▼
   ┌────────────────┐ ┌────────────────┐
   │  Reduce 1      │ │  Reduce 2      │  ← 同じキーは同じ Reducer へ
   │ (キー A,B 担当) │ │ (キー C,D 担当) │
   └───────┬────────┘ └───────┬────────┘
           ▼                  ▼
       ┌────────┐         ┌────────┐
       │ 出力 1 │         │ 出力 2 │       ← HDFS に結果を書き込み
       └────────┘         └────────┘

シャッフル段階で起きることをさらに細分化すると次のとおりです。

[Map 側 → Reduce 側のシャッフル詳細]

  Map 側:
   1. map() 出力がメモリバッファに溜まる
   2. バッファが満杯になればディスクへスピル(spill) — このときパーティション別にソート
   3. 複数のスピルファイルを 1 つにマージ(merge)
   4. (コンバイナ設定時) マップ側で部分集計してデータ量を縮小

  Reduce 側:
   5. 各 Map から自分のパーティションのデータを取得(fetch、ネットワーク)
   6. 取得したデータをキー基準でマージ + ソート
   7. reduce() 呼び出し — 同じキーの値を一度に処理

シャッフルはディスク I/O とネットワーク I/O の両方を発生させるためコストが大きいです。そのため、コンバイナ(combiner)でマップ段階に先んじて部分集計を行い、データ量を減らすことが重要です。ただし、コンバイナは交換法則と結合法則が成り立つ演算(例: 合計、カウント)にのみ安全に適用できます。平均のようにそうでない場合は誤った結果が出ることがあるので注意が必要です。

MapReduce の限界

MapReduce は堅牢ですが遅いです。最大の理由は、各段階の中間結果をディスク(HDFS)に書き込む点です。複数段階からなる複雑なパイプラインを MapReduce で組むと、段階ごとにディスクへ書いて再び読むコストが累積します。反復(iteration)が多い機械学習アルゴリズムでは、このコストが致命的でした。

この限界こそが、Spark が登場して急速に主流となった背景です。

Hadoop エコシステム

Hadoop は単一の製品ではなく、巨大なエコシステムです。HDFS と YARN の上にさまざまなツールが積み重なっています。

┌──────────────────────────────────────────────────────────────┐
│  SQL / クエリ:  Hive, Impala, Presto/Trino                   │
│  スクリプト:    Pig                                          │
│  処理エンジン:  MapReduce, Spark, Tez, Flink                 │
│  NoSQL:         HBase (HDFS 上の分散キー値/カラムストア)      │
│  収集:          Sqoop(RDB-HDFS), Flume/Kafka(ストリーム)     │
│  調整:          ZooKeeper (分散コーディネーション)           │
│  ワークフロー:  Oozie, Airflow                               │
├──────────────────────────────────────────────────────────────┤
│  YARN  (リソース管理)                                        │
├──────────────────────────────────────────────────────────────┤
│  HDFS  (分散ストレージ)                                      │
└──────────────────────────────────────────────────────────────┘

特によく言及される 3 つを取り上げます。

Hive — HDFS 上の SQL

Hive は HDFS に保存されたデータを SQL で照会できるようにするデータウェアハウスツールです。ユーザーが SQL(正確には HiveQL)を書くと、Hive がそれを内部的に MapReduce(または Tez / Spark)ジョブに変換して実行します。

-- 外部テーブル定義: HDFS パスのデータをテーブルのように扱う
CREATE EXTERNAL TABLE access_log (
  ip        STRING,
  ts        STRING,
  url       STRING,
  status    INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data/access_log/';

-- ステータスコード別に集計
SELECT status, COUNT(*) AS cnt
FROM access_log
GROUP BY status
ORDER BY cnt DESC;

Hive の登場には大きな意味がありました。Java で MapReduce を直接書けない多くのアナリストやデータ利用者が、慣れた SQL で Hadoop のデータにアクセスできるようになったからです。さらに Hive Metastore(テーブルのスキーマ / 位置のメタデータを管理する保存先)は、のちに Spark や Presto / Trino といった他のエンジンでも標準のように共有され、データカタログの事実上の標準となりました。

Spark — インメモリ処理エンジン

Spark は MapReduce のディスク中心の限界を克服するために登場しました。核心となるアイデアは、中間結果を可能な限りメモリに保持し、RDD(のちに DataFrame / Dataset)という抽象の上で DAG(有向非巡回グラフ)形式の実行計画を構成することです。

項目MapReduceSpark
中間データ各段階でディスクに書き込み可能ならメモリに保持
プログラミングモデルMap/Reduce 2 段階を強制DAG ベースの柔軟な演算
反復演算非常に非効率適している (ML など)
レイテンシ高い比較的低い
適した用途単純な大容量バッチ複雑なパイプライン、ML、インタラクティブ

重要な点は、Spark が Hadoop を丸ごと置き換えるのではなく、Hadoop の一部(MapReduce)を置き換えるということです。Spark は YARN 上で実行でき、HDFS をデータソースとして使えます。つまり Spark は処理レイヤの新しい強者であって、ストレージ / リソース管理レイヤまで丸ごと置き換えるものではありません(もっともクラウドではこの構図がまた変わります。後で扱います)。

HBase — HDFS 上の NoSQL

HDFS は大きなファイルを順次読み書きすることに最適化されているため、単件のランダム読み書きには不向きです。HBase はこの隙間を埋める分散カラム指向 NoSQL データベースで、Google の Bigtable 論文をもとに作られました。HDFS の上に載りますが、ランダムアクセスと低レイテンシの単件照会を可能にします。

[HBase の位置づけ]

   アプリケーション (低レイテンシの単件照会が必要)
   ┌──────────┐
   │  HBase   │   ← ランダム読み書き、行単位アクセス
   └────┬─────┘
   ┌──────────┐
   │  HDFS    │   ← 実際のデータファイル(HFile)を永続保存
   └──────────┘

オブジェクトストレージ・クラウド・レイクハウスの時代

ここまでが「古典的な Hadoop」の姿です。次は 2020 年代の変化について話す番です。結論から言うと、Hadoop の中核的なアイデアは生き残りましたが、その実装体(特に HDFS)の役割はクラウドで大きく縮みました。

最大の変化: ストレージとコンピュートの分離

古典的な Hadoop の核心的な前提は「データローカリティ」でした。つまり演算をデータのあるノードへ送り、ネットワークを節約することです。そのためにストレージ(HDFS)とコンピュート(MapReduce / YARN)が同じ物理ノードに同居している必要がありました(co-location)。

ところがクラウドでは話が変わります。

  • ネットワークが非常に速くなりました。データセンター内部の帯域が十分に大きく、「データを移すコスト」が以前ほど大きくありません。
  • オブジェクトストレージ(S3 など)が事実上無限に安く、耐久性の高い保存先を提供します。可用性 / 耐久性をクラウド事業者が担います。
  • ストレージとコンピュートを分離すれば、それぞれを独立してスケールできます。データはためておきつつ、演算が必要なときだけコンピュートをオンオフすればよいのです。
[古典的な Hadoop]                 [クラウド/レイクハウス]

 ┌──────────────┐                 コンピュート(必要なときだけ)
 │ノード=ストレージ│               ┌──────┐ ┌──────┐ ┌──────┐
 │     + コンピュート│ (co-location)│Spark │ │Trino │ │Flink │
 │ HDFS  +  YARN  │                └───┬──┘ └───┬──┘ └───┬──┘
 └──────────────┘                     │        │        │
   データローカリティ中心             └────────┼────────┘
   ストレージ・コンピュート結合                ▼
                                       ┌────────────────┐
                                       │ オブジェクトストレージ │
                                       │  S3 / GCS / ... │  (分離)
                                       └────────────────┘
                                  ストレージ/コンピュートを独立スケール

この分離がクラウドデータアーキテクチャの核心的な変化です。そしてこの構図において、HDFS の場所がオブジェクトストレージに置き換わるケースが増えました。

HDFS vs オブジェクトストレージ

項目HDFSオブジェクトストレージ (S3 など)
運用主体自分で運用(NameNode/DataNode)クラウド事業者のマネージド
スケールノード追加、NameNode メモリ制約事実上無制限
コストモデルサーバー/ディスクの固定費使用量ベース(保存+リクエスト)
一貫性強い一貫性強い一貫性(現代の S3 基準)
小さなファイルNameNode の負担(スモールファイル問題)比較的自由
データローカリティあり(同じノード/ラック)なし(ネットワーク経由)
rename/ディレクトリ高速なメタ演算高コストまたは非アトミックなことがある
適した環境オンプレミス、固定ワークロードクラウド、弾力的なワークロード

オブジェクトストレージは万能ではありません。最も有名な違いは、「ディレクトリの名前変更(rename)」が HDFS では高速なメタデータ演算である一方、オブジェクトストレージでは事実上コピー + 削除なので遅く、非アトミックになりうる点です。多くのデータパイプラインが「一時ディレクトリに書き、最後に rename でコミットする」パターンに依存しているため、この違いはテーブルフォーマット登場の背景になります。

テーブルフォーマットとレイクハウス

オブジェクトストレージはただの「ファイルの塊」にすぎず、テーブルという概念を知りません。トランザクション、スキーマ変更、タイムトラベル(time travel)、同時書き込みといった機能を与えてくれません。この隙間を埋めるために登場したのが、Apache Iceberg、Delta Lake、Apache Hudi といった「オープンテーブルフォーマット」です。

これらはオブジェクトストレージ上のファイル群を「テーブル」として抽象化し、メタデータレイヤを通じて ACID トランザクション、スキーマ進化、スナップショットベースのタイムトラベルなどを提供します。その結果、データレイクの柔軟性(どんなデータでも安く保存)とデータウェアハウスの信頼性(ACID、スキーマ)を合わせた「レイクハウス(Lakehouse)」アーキテクチャが可能になりました。

[レイクハウスの階層構造]

 ┌──────────────────────────────────────────────┐
 │  クエリ/処理: Spark, Trino, Flink, Dremio ... │
 ├──────────────────────────────────────────────┤
 │  テーブルフォーマット: Iceberg / Delta / Hudi │
 │   - ACID トランザクション - スキーマ進化 - タイムトラベル │
 ├──────────────────────────────────────────────┤
 │  ファイルフォーマット: Parquet / ORC (カラム指向) │
 ├──────────────────────────────────────────────┤
 │  ストレージ: オブジェクトストレージ(S3/GCS) または HDFS │
 └──────────────────────────────────────────────┘

興味深い点は、このスタックのいくつかの断片が Hadoop エコシステムに由来するという事実です。カラム指向のファイルフォーマットである Parquet と ORC はいずれも Hadoop 生態系で生まれ、テーブルフォーマットのメタデータ管理は Hive Metastore の限界を克服しようとする試みから出発しました。つまりレイクハウスは Hadoop と断絶したものではなく、Hadoop が解いていた問題をクラウド環境に合わせて再設計した結果に近いのです。

Hadoop の時代 vs レイクハウスの時代

観点Hadoop の時代 (2010 年代)レイクハウスの時代 (2020 年代)
ストレージHDFS (自前運用)オブジェクトストレージ (マネージド)
ストレージ-コンピュート結合(co-location)分離
主な演算エンジンMapReduceSpark, Trino, Flink
テーブル/トランザクションHive MetastoreIceberg/Delta/Hudi
リソース管理YARNKubernetes も台頭
スケール単位ノード(ストレージ+コンピュート同時)ストレージ/コンピュート独立
運用負担高い(自前でクラスタ運用)比較的低い(マネージド)

ここで YARN の場所も揺らいでいる点に注目できます。クラウドネイティブな環境では、リソーススケジューリングを Kubernetes が担うケースが増えています。Spark も YARN だけでなく Kubernetes 上で直接実行できます。

では Hadoop は死んだのか

「Hadoop は死んだ」という言葉は半分だけ正しいです。より正確に表現するとこうなります。

  • HDFS を自前で運用するオンプレミスクラスタの新規構築は明らかに減りました。クラウドを使うなら、オブジェクトストレージのほうが合理的な選択であることが多いです。
  • MapReduce は事実上、新規開発でほとんど使われません。Spark などに置き換わりました。
  • しかし Hadoop が確立した概念(分散ストレージ、レプリケーション、データローカリティ、リソーススケジューリング、シャッフル、カラムフォーマット、メタストア)は、現代のデータスタックの土台としてそのまま生きています。
  • また、データ主権 / 規制、コスト、セキュリティなどの理由で、いまだに大規模なオンプレミス Hadoop クラスタを運用する組織は少なくありません。こうした環境では Hadoop の知識は今も実務的に重要です。

要するに Hadoop は「特定の製品」としては頂点を過ぎましたが、「アイデアと語彙」としては現代のデータエンジニアリングの共通語として残っています。

実務・運用で知っておくべきこと

スモールファイル問題

先に触れた NameNode のメモリ制約のため、HDFS では小さなファイルが増えることを警戒すべきです。NameNode はファイル / ディレクトリ / ブロック 1 つあたり一定量のメモリを消費するので、1KB のファイル 1 億個は 1GB のファイル 1 億個と似たメタデータ負担を与えながら、処理効率ははるかに悪くなります。

代表的な緩和策は次のとおりです。

  • 小さなファイルを大きなファイルに統合するコンパクション(compaction)ジョブを定期的に回す
  • HAR(Hadoop Archive)や SequenceFile のようなコンテナフォーマットでまとめる
  • 収集段階でバッチサイズを大きくし、そもそも小さなファイルが生まれにくくする

データフォーマットの選択

同じデータでも、どのファイルフォーマットで保存するかによって性能とコストが大きく変わります。

フォーマット特徴適したケース
テキスト(CSV/JSON)人が読みやすい、非効率一時/デバッグ、外部連携
Avro行指向、スキーマ進化に優れる収集/ストリーミング、全行読み取り
Parquetカラム指向、圧縮/スキャン効率分析クエリ(特定カラムのみ読む)
ORCカラム指向、Hive と相性が良いHive ベースの分析

分析ワークロードでは、カラム指向フォーマット(Parquet / ORC)がほぼ常に有利です。クエリに必要なカラムだけを読み、カラム単位で圧縮がよく効き、述語プッシュダウン(predicate pushdown)で不要なデータスキャンを減らせるからです。

レプリケーション係数とコスト

レプリケーション係数 3 は安全ですが保存コストが 3 倍です。非常に大きなコールドデータ(ほとんど読まれない古いデータ)に対しては、HDFS のイレイジャーコーディング(Erasure Coding)を検討できます。イレイジャーコーディングはレプリケーションの代わりにパリティを使い、似たような耐久性を約 1.5 倍程度の保存オーバーヘッドで達成します。ただしエンコード / デコードの演算コストがあり、復旧が遅くなることがあるので、頻繁に読むホットデータよりコールドデータに適しています。

リソースチューニング

YARN 環境でジョブが遅かったり、しばしば死んだりするなら、次を点検します。

[よくある点検ポイント]

 - コンテナメモリ( yarn.scheduler.maximum-allocation-mb )が
   タスクに十分か? OutOfMemory でコンテナがキルされていないか
 - マップ/リデュースタスク数が適切か (少なすぎると並列性不足、
   多すぎるとスケジューリングのオーバーヘッド)
 - データスキュー(skew): 特定のキーに値が偏り、ある Reducer だけが長く動くか
 - シャッフル量が過剰でないか (コンバイナ/パーティショナで減らせるか)
 - キュー設定が適切か (特定のキューがリソースを独占していないか)

特にデータスキューは分散処理で最も多く、最も厄介な性能問題です。キーの分布が一方に偏ると、一部のタスクだけが過負荷になり、ジョブ全体がそのタスクを待つことになります。ソルト(salt)キーを加えたり、パーティショナに手を入れたりして分散を均等にするのが解決策です。

よくある誤解と落とし穴

最後に、Hadoop をめぐってよく見られる誤解を整理します。

  1. 「Hadoop = 速い」という誤解。Hadoop は速いのではなく、大きなデータをスループットよく扱うように設計されています。単件の低レイテンシ応答が目標なら、Hadoop は誤った道具です。

  2. 「Secondary NameNode はバックアップだ」という誤解。Secondary NameNode はチェックポイントの補助であって、フェイルオーバーをしません。本当の高可用性は HA(Active / Standby)構成で得られます。

  3. 「Spark が Hadoop を丸ごと置き換える」という誤解。Spark は処理エンジン(MapReduce)を置き換えるだけで、ストレージ(HDFS)やリソース管理(YARN)まで丸ごと置き換えるものではありません。ただしクラウドでは、オブジェクトストレージと Kubernetes の組み合わせがこの 2 つの役割を代わりに担うこともあります。

  4. 「オブジェクトストレージへ移せば無条件で良い」という誤解。rename コスト、一貫性モデル、メタデータ演算の特性などで HDFS と異なるので、テーブルフォーマット(Iceberg / Delta / Hudi)とともに設計しないと、かえって落とし穴にはまることがあります。

  5. 「小さなファイルは多くても平気」という誤解。NameNode のメモリと処理効率の両方に悪影響を与えます。コンパクションを運用ルーティンに組み込むべきです。

  6. 「レプリケーション 3 が常に正解」という誤解。ホットデータには合理的ですが、ほとんど読まない大容量のコールドデータにはイレイジャーコーディングのほうが経済的なことがあります。

おわりに

Hadoop は「障害が日常である安価なサーバー数千台で、巨大なデータを安全かつ効率的に処理する」という 1 つの大きな問いへの答えでした。その答えの具体的な実装体(HDFS、MapReduce)は、クラウドとオブジェクトストレージ、レイクハウスの登場によって、かなりの部分を別のツールに譲りました。

しかし、その答えが確立した概念は今も生きています。データを分割して分散し、レプリケーションで保護するという発想、演算をデータの近くへ送るという発想、リソース管理と実行を分離するという発想、シャッフルでキーを再分配するという発想は、今日われわれが使うほぼすべての分散データシステムの土台に横たわっています。

ですから「Hadoop を学ぶべきか」という問いに、私はこう答えたいと思います。特定の製品としての Hadoop を新たに導入する機会は減りましたが、Hadoop が教えてくれた考え方を理解することは、現代のデータエンジニアにとって今も大きな資産です。レイクハウスであれ、クラウドネイティブなスタックであれ、その根には Hadoop が先に直面し、解いてきた問題があるからです。

参考資料