Skip to content

필사 모드: メッセージキューと非同期入門:Queue・Kafka・RabbitMQ・Redis・asyncio

日本語
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

はじめに — なぜキューが必要なのか

サービスが大きくなると「今すぐ処理しなくてもよい仕事」が増えます。会員登録後のウェルカムメール送信、アップロードされた動画のエンコード、決済後の精算集計。こうした仕事をリクエスト処理中に同期的にやると、ユーザーはひたすら待たされます。だから私たちは仕事をキューに入れて、あとで裏でゆっくり処理します。

メッセージキューは、この考えをシステムにしたものです。プロデューサー(producer)がメッセージを入れ、コンシューマー(consumer)が取り出して処理します。あいだにキューが入ることで、生産と消費が分離(decouple)され、速度差を吸収(buffering)し、コンシューマーが死んでもメッセージが残って再処理(reliability)されます。

問題は「メッセージキュー」という一語の下に、性格のかなり異なるツールが並んでいることです。Kafka、RabbitMQ、Redisはどれもキューと呼ばれますが、設計思想が違います。この記事はこれらを一つずつ対比し、最後にキューとよく混同されるPython asyncioまで整理します。

これらの概念を目で確かめたいなら、このサイトに新しく作った[メッセージキュー・プレイグラウンド](/tools/message-queue-playground)でインタラクティブに可視化できます。

素朴なFIFOキューから

もっとも単純なキューは先入れ先出し(FIFO)のデータ構造です。先に入ったものが先に出ます。プログラミング言語の標準ライブラリにも入っている、あのキューです。

from collections import deque

q = deque()

q.append("job-1") # enqueue

q.append("job-2")

print(q.popleft()) # dequeue -> job-1

print(q.popleft()) # dequeue -> job-2

このキューの核心的な性質は三つです。順序が保たれ、一つのメッセージは一つのコンシューマーだけに渡り、取り出すと消えます。概念は明快ですが、限界も明快です。この`deque`は一つのプロセスのメモリの中にしか存在しません。プロセスが死ねばキューも消え、別サーバーのコンシューマーと共有することもできません。

分散メッセージキューシステムは、まさにこの限界を越えます。キューをネットワークの向こうへ移し、ディスクに保存し、複数のプロデューサーとコンシューマーが接続できるようにします。ただし、そのやり方はそれぞれ異なります。

Kafka — 消せないログ

Kafkaは実は伝統的な意味の「キュー」ではありません。Kafkaは**追記専用ログ(append-only log)**です。メッセージをキューから取り出して消すのではなく、ログファイルの末尾に追記し続け、コンシューマーは「自分がどこまで読んだか」だけを覚えます。

核心概念を整理すると次のとおりです。

- **トピック(topic)とパーティション(partition)**:トピックはメッセージの分類で、各トピックは複数のパーティションに分割されます。パーティションが並列性の単位です。

- **オフセット(offset)**:各パーティション内でのメッセージの順番です。コンシューマーはオフセットを保存しておき、望めば過去のオフセットに戻って**リプレイ(replay)**できます。

- **コンシューマーグループ(consumer group)**:同じグループに属するコンシューマーがパーティションを分け合います。一つのパーティションはグループ内で一つのコンシューマーにだけ割り当てられます。だからコンシューマーを増やすとスループットが上がりますが、コンシューマー数がパーティション数を超えると、余ったコンシューマーは遊びます。

- **パーティション単位の順序保証**:順序はパーティション内でのみ保証されます。トピック全体の全域順序は保証されません。

図にすると次のようになります。

topic: orders

┌─────────────────────────────────────────┐

│ partition 0: [o0][o1][o2][o3] ... <- consumer A

│ partition 1: [o0][o1][o2] ... <- consumer B

│ partition 2: [o0][o1][o2][o3][o4] <- consumer C

└─────────────────────────────────────────┘

各コンシューマーは自分のオフセットを覚える

Kafkaが強力なのはログが残るからです。メッセージが消費と同時に消えないので、新しいコンシューマーがあとから接続して最初から読み直せます。イベントソーシング、ストリーム処理、複数のシステムが同じイベントをそれぞれ消費する構成によく合います。その代わり、順序がパーティション単位であること、そして「同じキーは同じパーティションへ送らないと順序が保たれない」ことを常に意識する必要があります。

RabbitMQ — 賢いルーティング

RabbitMQはAMQPプロトコルを実装した伝統的なメッセージブローカーです。Kafkaがログなら、RabbitMQは**賢い郵便局**です。核心は、プロデューサーがキューに直接入れないことです。プロデューサーは**エクスチェンジ(exchange)**にメッセージを送り、エクスチェンジがルールに従ってキューへ**ルーティング**します。

ルーティングの方式はエクスチェンジのタイプで決まります。

- **direct**:ルーティングキーが正確に一致するキューへ送ります。

- **fanout**:ルーティングキーを無視し、接続されたすべてのキューへコピーして送ります。ブロードキャストです。

- **topic**:ルーティングキーをパターンでマッチします。たとえば`order.*`や`order.#`のようなパターンで部分一致させます。

プロデューサー、エクスチェンジ、バインディング、キュー、コンシューマーの関係はこうです。

producer --> [exchange] --binding (routing key)--> [queue] --> consumer

├── direct : キー正確一致

├── fanout : すべてへコピー

└── topic : パターン一致 (order.*, order.#)

RabbitMQのもう一つの核心は**確認応答(ack)**です。コンシューマーがメッセージを処理してackを送ってはじめて、キューはそのメッセージを削除します。もしコンシューマーが処理の途中で死んでackを送れなければ、ブローカーはそのメッセージを別のコンシューマーへ再配信します。おかげで「処理中の消失」を防げます。

Kafkaとの決定的な違いは、消費後にメッセージが消える点です。RabbitMQは過去のメッセージを巻き戻してリプレイするのには向きません。その代わり、複雑なルーティング、作業分配、優先度キュー、遅延キューといった、きめ細かいメッセージフロー制御に強いです。

Redis — 軽くて多才な三つの方式

Redisはもともとインメモリのデータストアですが、メッセージングにも広く使われます。興味深いのは、Redisが異なる三つのメッセージング方式を提供することです。

**1. リストベースのキュー(LPUSH / BRPOP)。** もっとも単純な作業キューです。一方でリストに押し込み、もう一方で取り出します。`BRPOP`はブロッキング方式なので、メッセージがなければコンシューマーは待機します。

producer

LPUSH tasks "job-1"

LPUSH tasks "job-2"

consumer (メッセージが来るまで最大5秒待つ)

BRPOP tasks 5

**2. Pub/Sub(発行/購読)。** チャンネルに発行すると、その瞬間に購読中のすべてのコンシューマーへ配信されます。ただしこれは**fire-and-forget**です。購読者がいなければメッセージはそのまま消え、保存されません。リアルタイム通知のように「今聞いている人にだけ」送るのに向きます。

subscriber

SUBSCRIBE news

publisher (購読中の全員へ配信、保存されない)

PUBLISH news "hello"

**3. Streams(ストリーム)。** リストの単純さと、Kafkaの永続性・コンシューマーグループを折衷したのがRedis Streamsです。メッセージがログのように蓄積され、コンシューマーグループで分けて消費し、処理確認(ack)もできます。Kafkaほど大規模ではない規模で似たパターンが欲しいときに便利です。

ストリームへ追加

XADD mystream * event "signup" user "alice"

コンシューマーグループを作成してから読む

XGROUP CREATE mystream g1 0

XREADGROUP GROUP g1 consumer1 COUNT 1 STREAMS mystream >

Redisの魅力は、「すでにキャッシュとして使っているそのRedis」にキューを載せられる軽さです。ただし三つの方式は性質が異なるので、消失を許すのか(Pub/Sub)そうでないのか(リスト、Streams)を明確にして選ぶ必要があります。

いつ何を使うか — 比較表

ここまでの内容を一つの表にまとめると次のとおりです。

| 項目 | FIFOキュー | Kafka | RabbitMQ | Redis |

| --- | --- | --- | --- | --- |

| モデル | インメモリのデータ構造 | パーティション追記専用ログ | AMQPブローカー | リスト/Pub-Sub/Streams |

| 消費後のメッセージ | 消える | 残る(リプレイ可能) | 消える(ack後) | 方式による |

| 順序保証 | 全体 | パーティション単位 | キュー単位 | リスト・Streamsは保持 |

| ルーティング | なし | キー → パーティション | エクスチェンジ(強力) | 単純 |

| 強み | 単純さ | 大容量ストリーム、リプレイ | 複雑なルーティング、作業分配 | 軽量、多目的 |

| 代表的な用途 | プロセス内バッファ | イベントソーシング、ログパイプライン | マイクロサービスの作業キュー | キャッシュ兼用キュー、リアルタイム通知 |

簡単な選択ガイドはこうです。

- 一つのプロセス内の一時バッファなら**標準ライブラリのキュー**で十分です。

- イベントを長く保管し、複数のコンシューマーがリプレイ・再処理する必要があれば**Kafka**です。

- 複雑なルーティングと作業分配、確実なack基盤の処理が必要なら**RabbitMQ**です。

- すでにRedisを使っていて軽いキューやリアルタイム通知なら**Redis**が実用的です。

非同期とキューは違う — Python asyncio

ここでよく生じる誤解を押さえましょう。「非同期処理」と「メッセージキュー」はしばしば一緒に登場しますが、同じものではありません。メッセージキューはプロセスとサーバーをまたぐインフラであり、asyncioは一つのプロセスの中で並行性を扱うプログラミングモデルです。

Python asyncioの核心は**単一スレッドのイベントループ**です。スレッドを増やさず、一つのスレッドが複数のタスクのあいだを行き来して処理します。秘訣はI/Oの待ち時間です。ネットワーク応答やディスクを待つあいだ、そのタスクをいったん止め(`await`)、その隙に別のタスクを進めます。

async def fetch(name, delay):

print(f"start {name}")

await asyncio.sleep(delay) # このあいだ別のコルーチンが動く

print(f"done {name}")

return name

async def main():

三つのタスクを同時に進める (合計待ち時間ではなく最大値に近い)

results = await asyncio.gather(

fetch("a", 2),

fetch("b", 1),

fetch("c", 3),

)

print(results)

asyncio.run(main())

いくつか用語を整理しましょう。

- **コルーチン(coroutine)**:`async def`で定義した関数。途中で止まって再開できます。

- **await**:「ここで待つが、そのあいだイベントループが別の仕事をできるよう譲れ」という地点です。

- **gather**:複数のコルーチンを同時にスケジューリングして一緒に進めます。

もっとも重要な区別は**並行性(concurrency)と並列性(parallelism)**です。asyncioは並行性を与えます。複数のタスクが重なって進みますが、実際に同じ瞬間に実行されているのは一つのスレッドだけです。一方、並列性は複数のCPUコアで本当に同時に実行されることです。

この違いが実務で意味することは明白です。asyncioは**I/Oバウンド**の作業(ネットワーク、ディスク、DB待ち)に強いです。待つ時間が多いほど得が大きくなります。しかし**CPUバウンド**の作業(重い計算)には役立ちません。計算中は譲る隙がなく、イベントループが詰まってしまうからです。CPUバウンドにはマルチプロセシングや別ワーカーが必要です。

キューとasyncioは一緒に使われる

二つが違うからといって無関係なわけではありません。むしろ一緒に使われることが多いです。典型的な構成はこうです。ウェブサーバーがリクエストを受けると、重い仕事を自分でやらずにメッセージキューへ入れます。そして別のワーカープロセスがキューから仕事を取り出して処理し、そのワーカーの中で複数のメッセージをasyncioで同時に扱います。

request --> web server --(message)--> [queue: Kafka/RabbitMQ/Redis]

|

v

worker process

└ asyncio で複数メッセージを同時処理

つまり、キューは「仕事をあとへ、別のプロセスへ渡す」層であり、asyncioは「一つのプロセスの中で待ち時間を効率的に使う」層です。層が違うので、二つは競合せず補い合います。

よくある落とし穴

最後に、実務でよく引っかかる点を押さえます。

- **正確に一度(exactly-once)という幻想**:ほとんどのキューはデフォルトで「少なくとも一度(at-least-once)」を保証します。つまり同じメッセージが二度来ることがあります。だからコンシューマーは**冪等(idempotent)**に、同じメッセージを何度処理しても結果が同じになるよう設計すべきです。

- **順序への過信**:Kafkaの順序はパーティション単位です。全域順序が必要ならパーティションを一つにするか、キー設計を慎重にする必要があり、これは並列性を犠牲にします。

- **Pub/Subに永続性を期待する**:Redis Pub/Subは保存しません。取り逃したメッセージは消えます。消失が問題ならリストやStreams、あるいは別のブローカーを使うべきです。

- **asyncioの中でのブロッキング呼び出し**:イベントループの中で同期ブロッキング関数(たとえば普通の`time.sleep`やブロッキングDBドライバ)を呼ぶと、ループ全体が止まります。async対応のライブラリを使うか、別スレッドへ渡す必要があります。

- **デッドレターの無視**:失敗し続けるメッセージを無限に再試行するとキューが詰まります。失敗したメッセージを別に集めるデッドレターキューを置くのが安全です。

おわりに

メッセージキューは一つの概念ではなくスペクトルです。プロセス内の単純なFIFOキューから始まり、消せないログ(Kafka)、賢いルーティング(RabbitMQ)、軽くて多才なツール(Redis)へと続きます。そしてこれらとは性質が違うがよく一緒に使われるasyncioは、一つのプロセスの中で待ち時間を効率的に扱う並行性モデルです。

核心は「何が正しいか」ではなく「何がこの問題に合うか」です。リプレイが必要ならKafka、ルーティングが必要ならRabbitMQ、軽さが必要ならRedis、プロセス内の並行性ならasyncio。この対応を頭に入れておけば、たいていの選択が楽になります。

概念を自分で動かしてみたいなら、[メッセージキュー・プレイグラウンド](/tools/message-queue-playground)で各方式がどう違って動くかを可視化してみてください。

参考資料

- Apache Kafka公式ドキュメント: https://kafka.apache.org/documentation/

- RabbitMQチュートリアル: https://www.rabbitmq.com/tutorials

- Redis Streams紹介: https://redis.io/docs/latest/develop/data-types/streams/

- Python asyncio公式ドキュメント: https://docs.python.org/3/library/asyncio.html

- AMQP 0-9-1の概念: https://www.rabbitmq.com/tutorials/amqp-concepts

현재 단락 (1/110)

サービスが大きくなると「今すぐ処理しなくてもよい仕事」が増えます。会員登録後のウェルカムメール送信、アップロードされた動画のエンコード、決済後の精算集計。こうした仕事をリクエスト処理中に同期的にやると...

작성 글자: 0원문 글자: 7,306작성 단락: 0/110