- Authors

- Name
- Youngju Kim
- @fjvbn20031
1. Kinesis vs Apache Kafka 詳細比較
1.1 アーキテクチャの違い
Apache KafkaとAWS Kinesisはどちらもリアルタイムデータストリーミングのためのプラットフォームですが、 根本的なアーキテクチャの思想が異なります。
Apache Kafka アーキテクチャ
============================
+----------+ +------------------------------------------+
| Producer | --> | Kafka Cluster |
+----------+ | |
| Broker 1 Broker 2 Broker 3 |
| +-------+ +-------+ +-------+ |
| |Topic A| |Topic A| |Topic A| |
| |Part 0 | |Part 1 | |Part 2 | |
| | | | | | | |
| |Topic B| |Topic B| |Topic B| |
| |Part 0 | |Part 1 | |Part 2 | |
| +-------+ +-------+ +-------+ |
| |
| KRaft (Kafka 4.0+, ZooKeeper廃止) |
+------------------------------------------+
|
v
+----------+
| Consumer |
| Group |
+----------+
AWS Kinesis Data Streams アーキテクチャ
=========================================
+----------+ +------------------------------------------+
| Producer | --> | Kinesis Stream (完全マネージド) |
+----------+ | |
| Shard 1 Shard 2 Shard 3 |
| +-------+ +-------+ +-------+ |
| |Records| |Records| |Records| |
| |1MB/s W| |1MB/s W| |1MB/s W| |
| |2MB/s R| |2MB/s R| |2MB/s R| |
| +-------+ +-------+ +-------+ |
| |
| AWS完全マネージド(インフラ管理不要) |
+------------------------------------------+
|
v
+----------+
| Consumer |
| (KCL) |
+----------+
1.2 総合比較表
| 比較項目 | AWS Kinesis | Apache Kafka(自己運用) | Amazon MSK |
|---|---|---|---|
| 管理方式 | 完全マネージド | 自己運用 | マネージドKafka |
| スケール単位 | シャード | パーティション + ブローカー | ブローカー |
| シャード/パーティションあたり書き込み | 1 MB/s | 制限なし(ディスクI/O) | ディスクI/O依存 |
| シャード/パーティションあたり読み取り | 2 MB/s(共有) | 制限なし | ディスクI/O依存 |
| 最大保持期間 | 365日 | 無制限 | 無制限 |
| 順序保証 | シャード内 | パーティション内 | パーティション内 |
| レコード最大サイズ | 1 MB | デフォルト1 MB(設定変更可) | デフォルト1 MB |
| コンシューマーモデル | KCL、拡張ファンアウト | コンシューマーグループ | コンシューマーグループ |
| プロトコル | HTTPS/HTTP2 | 独自TCPプロトコル | 独自TCP |
| エコシステム | AWSサービス統合 | Kafka Connect、Schema Registry等 | Kafkaエコシステム |
| 運用複雑度 | 低 | 高 | 中 |
| 初期セットアップ | 数分 | 数時間〜数日 | 数十分 |
1.3 スループットとレイテンシ
スループット比較
=================
Kinesis(プロビジョニング):
書き込み: 1 MB/s x シャード数
読み取り: 2 MB/s x シャード数(共有)
2 MB/s x シャード数 x コンシューマー数(拡張ファンアウト)
例: 100シャード
書き込み: 100 MB/s
読み取り: 200 MB/s(共有)または 200 MB/s x N(拡張ファンアウト)
Kafka(自己運用):
ブローカー性能に依存
単一ブローカー: 数百 MB/s 可能
クラスター: 数 GB/s 処理可能
例: 6ブローカークラスター
書き込み: 600+ MB/s
読み取り: 数 GB/s
レイテンシ比較
===============
Kinesis:
- PutRecord: 数十 ms
- GetRecords(共有ファンアウト): ~200 ms
- Enhanced Fan-Out: ~70 ms
Kafka:
- プロデューサー -> コンシューマー: ~2-10 ms(ネットワーク依存)
- エンドツーエンド: ~10-50 ms
1.4 コスト比較
月間コスト推定(米国東部基準)
================================
シナリオ: 10 MB/s持続的データ取込、3つのコンシューマー
Kinesis(プロビジョニングモード):
- 10シャード必要(10 MB/s / 1 MB/s per shard)
- シャードコスト: 10 x 0.015 x 720時間 = ~108 USD
- PUTユニット: ~360 USD(約25.9B ユニット/月)
- 拡張ファンアウト(3コンシューマー): ~324 USD
合計: ~792 USD/月
Kinesis(オンデマンドAdvantage):
- データ書き込み: ~25.9 TB x 0.032 = ~829 USD
- データ読み取り: ~25.9 TB x 3 x 0.016 = ~1,243 USD
合計: ~2,072 USD/月
Kafka(EC2自己運用):
- 3ブローカー(m5.xlarge): 3 x 140 = ~420 USD
- EBSストレージ(1TB x 3): ~300 USD
- 運用人員コスト: 別途
合計: ~720 USD/月 + 運用コスト
Amazon MSK:
- 3ブローカー(kafka.m5.large): ~456 USD
- ストレージ: ~300 USD
合計: ~756 USD/月
1.5 いつ何を選ぶべきか
Kinesisを選ぶ場合:
- AWS環境に深く統合されたアーキテクチャ
- 運用負担を最小化したい時
- Lambda、Firehose等AWSサービスとの直接連携が必要な時
- 中小規模スループット(数十MB/s以下)
- 迅速なプロトタイピングが必要な時
Kafkaを選ぶ場合:
- 非常に高いスループットが必要な時(数GB/s)
- マルチクラウドまたはハイブリッド環境
- Kafka Connectエコシステムが必要な時
- 超低レイテンシが要求される時(数ms)
- 無制限のデータ保持が必要な時
2. Kinesis vs SQS:いつ何を使うべきか
| 比較項目 | Kinesis Data Streams | Amazon SQS |
|---|---|---|
| データ処理モデル | ストリーミング(連続処理) | メッセージキュー(個別処理) |
| コンシューマー数 | 複数コンシューマー同時処理 | 基本的に単一コンシューマー |
| 順序保証 | シャード内保証 | FIFOキューのみ保証 |
| データ保持 | 24時間〜365日 | 最大14日 |
| データリプレイ | 可能(シーケンス番号ベース) | 不可(処理後削除) |
| スループット | シャードあたり1 MB/s書き込み | ほぼ無制限 |
| メッセージサイズ | 最大1 MB | 最大256 KB |
| レイテンシ | ミリ秒 | ミリ秒 |
| コストモデル | シャード時間 + データ転送 | リクエスト数ベース |
| 主な用途 | リアルタイム分析、ログ収集 | マイクロサービスデカップリング |
使用シナリオ判定フローチャート
================================
データ処理要件分析
|
+----+----+
| |
同じデータを メッセージを
複数のコンシ 一回だけ処理
ューマーが すれば良いか?
読む必要が |
あるか? SQS
|
リアルタイムの
順序保証が
必要か?
|
+----+----+
| |
YES NO
| |
Kinesis SQS FIFO
Data または
Streams Kinesis
3. Amazon Managed Service for Apache Flink
3.1 概要
Amazon Managed Service for Apache Flink(旧Kinesis Data Analytics)は、Apache Flinkを 完全マネージドインフラストラクチャ上で実行できるサービスです。
注意: 旧Kinesis Data Analytics for SQLは2025年10月以降新規作成が停止されており、 Amazon Managed Service for Apache Flinkへの移行が推奨されています。
3.2 主要機能
Managed Flink アーキテクチャ
==============================
+-----------+ +----------------------------+ +-----------+
| ソース | | Managed Flink | | シンク |
| | --> | | --> | |
| - Kinesis | | +------------------------+ | | - Kinesis |
| - MSK | | | Flink Application | | | - S3 |
| - S3 | | | | | | - DynamoDB|
| | | | - SQLクエリ | | | - Open- |
| | | | - Java/Scalaアプリ | | | Search |
| | | | - Python (PyFlink) | | | - Redshift|
| | | | | | | |
| | | | ウィンドウ集約 | | | |
| | | | パターン検出 | | | |
| | | | CEP (複合イベント処理) | | | |
| | | +------------------------+ | | |
+-----------+ +----------------------------+ +-----------+
3.3 ウィンドウ処理タイプ
ストリームデータを時間ベースでグループ化して分析するコア機能です。
ウィンドウタイプ
=================
1) タンブリングウィンドウ (Tumbling Window)
- 固定サイズ、重複なし
|-------|-------|-------|-------|
0 5 10 15 20 (秒)
2) スライディングウィンドウ (Sliding/Hopping Window)
- 固定サイズ、一定間隔でスライド
|-----------|
|-----------|
|-----------|
0 2 4 6 8 10 (秒)
サイズ: 6秒、スライド: 2秒
3) セッションウィンドウ (Session Window)
- アクティビティベース、非アクティブ期間で区分
|---event-event---| gap |--event-event-event--| gap |
<-- Session 1 --> <------ Session 2 ---->
4) グローバルウィンドウ (Global Window)
- ストリーム全体に対する単一ウィンドウ
3.4 Flink SQL 例
-- Kinesisソーステーブル定義
CREATE TABLE clickstream (
user_id VARCHAR,
page VARCHAR,
action VARCHAR,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kinesis',
'stream' = 'clickstream-data',
'aws.region' = 'ap-northeast-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json'
);
-- 1分タンブリングウィンドウでページ別閲覧数を集約
SELECT
page,
COUNT(*) AS view_count,
COUNT(DISTINCT user_id) AS unique_users,
TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end
FROM clickstream
WHERE action = 'view'
GROUP BY
page,
TUMBLE(event_time, INTERVAL '1' MINUTE);
-- 異常行動検知: 1分以内に同一ユーザーが10回以上クリック
SELECT
user_id,
COUNT(*) AS click_count,
TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start
FROM clickstream
WHERE action = 'click'
GROUP BY
user_id,
TUMBLE(event_time, INTERVAL '1' MINUTE)
HAVING COUNT(*) >= 10;
4. 実践ストリーミングアーキテクチャパターン
4.1 ログ集約パイプライン
ログ集約アーキテクチャ
========================
+----------+ +----------+ +---------+ +----------+
| App | | Kinesis | | Firehose| | S3 |
| Server 1 | --> | Agent | --> | | --> | (Raw |
+----------+ +----------+ | | | Logs) |
| | +----------+
+----------+ +----------+ | | |
| App | | Kinesis | | | v
| Server 2 | --> | Agent | --> | | +----------+
+----------+ +----------+ | | | Athena |
| | | (Query) |
+----------+ +----------+ | | +----------+
| App | | Kinesis | | |
| Server N | --> | Agent | --> | |
+----------+ +----------+ +---------+
|
v
+----------+ +----------+
| Lambda | --> | Open- |
| (リアル | | Search |
| タイム | | (検索/ |
| アラート)| | ダッシュ |
+----------+ | ボード) |
+----------+
4.2 リアルタイム分析ダッシュボード
リアルタイムダッシュボードアーキテクチャ
==========================================
+----------+ +----------+ +-----------+ +----------+
| Web/ | | API | | Kinesis | | Managed |
| Mobile | --> | Gateway | --> | Data | --> | Flink |
| Client | | | | Streams | | (集約/ |
+----------+ +----------+ +-----------+ | 分析) |
+----------+
|
+---------+----+----+---------+
| | | |
v v v v
+--------+ +--------+ +--------+ +--------+
|DynamoDB| |Time- | | S3 | |Cloud- |
|(リアル | |stream | |(長期 | |Watch |
| タイム)| |(時系列)| | 保存) | |(メト |
| | | | | | |リック) |
+--------+ +--------+ +--------+ +--------+
| |
v v
+--------------------+
| ダッシュボードアプリ |
| (React/Vue + |
| WebSocket) |
+--------------------+
4.3 IoTデータ取込
# IoTデバイスシミュレーター
import boto3
import json
import time
import random
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='ap-northeast-1')
STREAM_NAME = 'iot-sensor-data'
def simulate_sensor(device_id):
"""IoTセンサーデータシミュレーション"""
return {
'device_id': device_id,
'temperature': round(random.uniform(15.0, 45.0), 2),
'humidity': round(random.uniform(20.0, 90.0), 2),
'pressure': round(random.uniform(990.0, 1030.0), 2),
'battery_level': round(random.uniform(0.0, 100.0), 1),
'location': {
'lat': round(random.uniform(33.0, 38.0), 6),
'lon': round(random.uniform(126.0, 130.0), 6)
},
'timestamp': datetime.utcnow().isoformat() + 'Z'
}
def ingest_iot_data(num_devices=100, interval=1.0):
"""IoTデータをKinesisに取込"""
device_ids = [f'sensor-{i:04d}' for i in range(num_devices)]
while True:
records = []
for device_id in device_ids:
sensor_data = simulate_sensor(device_id)
records.append({
'Data': json.dumps(sensor_data).encode('utf-8'),
'PartitionKey': device_id
})
# PutRecordsは最大500件まで
for batch_start in range(0, len(records), 500):
batch = records[batch_start:batch_start + 500]
response = kinesis.put_records(
StreamName=STREAM_NAME,
Records=batch
)
failed = response['FailedRecordCount']
if failed > 0:
print(f"Batch failed: {failed} records")
# 指数バックオフリトライロジック
retry_records = []
for i, result in enumerate(response['Records']):
if 'ErrorCode' in result:
retry_records.append(batch[i])
if retry_records:
time.sleep(0.5)
kinesis.put_records(
StreamName=STREAM_NAME,
Records=retry_records
)
print(f"Ingested {len(records)} sensor readings")
time.sleep(interval)
if __name__ == '__main__':
ingest_iot_data()
4.4 イベントソーシングパターン
イベントソーシングアーキテクチャ
==================================
+----------+ +----------+ +-----------+
| Command | | Kinesis | | Event |
| Handler | --> | Data | --> | Processor |
| | | Streams | | (Lambda/ |
| - 注文 | | (イベント| | ECS) |
| - 決済 | | ストア) | +-----------+
| - 配送 | +----------+ |
+----------+ | +-----+-----+
| | |
v v v
+----------+ +--------+ +--------+
| S3 | |DynamoDB| |SNS |
| (イベント| |(読み取り| |(通知) |
| アーカイブ)| | モデル)| | |
+----------+ +--------+ +--------+
イベントフロー例:
1. OrderCreated -> 注文作成イベント
2. PaymentProcessed -> 決済処理イベント
3. InventoryReserved -> 在庫予約イベント
4. ShipmentCreated -> 配送作成イベント
4.5 MLフィーチャーパイプライン
MLフィーチャーパイプライン
============================
+----------+ +----------+ +-----------+ +----------+
| イベント | | Kinesis | | Managed | | Feature |
| ソース | --> | Data | --> | Flink | --> | Store |
| | | Streams | | (フィー | | (Sage- |
| - クリック| | | | チャー計算)| | Maker) |
| - 購入 | | | | | +----------+
| - 検索 | | | | リアルタイム:| |
+----------+ +----------+ | - セッション数| v
| - 最近の | +----------+
| 購入数 | | MLモデル |
| - 平均 | | 推論 |
| 滞在時間 | +----------+
+-----------+
5. パフォーマンス最適化
5.1 パーティションキー設計
パーティションキー設計はKinesisパフォーマンスの最も重要な要素です。
良いパーティションキーの条件:
- 高いカーディナリティ(ユニークな値が多いこと)
- 均等な分布(特定のキーに偏らないこと)
- シャード数の最低10倍以上のユニークキーを確保
良いパーティションキー例
========================
1) ユーザーID(高カーディナリティ)
user-001 -> Shard 1
user-002 -> Shard 3
user-003 -> Shard 2
...
均等に分散
2) UUID(最高分散)
ランダムUUID -> 完璧な分散
欠点: 同じエンティティの順序保証不可
3) 複合キー
"region-userType-userId"
きめ細かい分散制御が可能
悪いパーティションキー例
========================
1) 日付 ("2026-03-20")
全レコードが同じシャードへ -> ホットシャード
2) 国コード ("KR", "US", "JP")
カーディナリティが低すぎる
特定の国にトラフィックが偏る
3) 固定値 ("default")
単一シャードに全負荷が集中
5.2 KPLを活用した集約
KPL集約最適化
==============
集約なし:
Record 1 (100B) -> PutRecord -> 1 API呼び出し
Record 2 (200B) -> PutRecord -> 1 API呼び出し
Record 3 (150B) -> PutRecord -> 1 API呼び出し
合計: 3 API呼び出し、450B送信
KPL集約使用:
Record 1 (100B) --+
Record 2 (200B) --+--> 集約レコード (450B) -> 1 API呼び出し
Record 3 (150B) --+
合計: 1 API呼び出し、450B送信
効果:
- API呼び出し数の大幅削減
- PUTユニットコスト削減
- スループットの大幅向上
5.3 拡張ファンアウト戦略
# 拡張ファンアウトコンシューマー登録
import boto3
kinesis = boto3.client('kinesis', region_name='ap-northeast-1')
# コンシューマー登録
response = kinesis.register_stream_consumer(
StreamARN='arn:aws:kinesis:ap-northeast-1:123456789012:stream/my-stream',
ConsumerName='analytics-consumer'
)
consumer_arn = response['Consumer']['ConsumerARN']
print(f"Consumer ARN: {consumer_arn}")
# コンシューマー状態確認
response = kinesis.describe_stream_consumer(
StreamARN='arn:aws:kinesis:ap-northeast-1:123456789012:stream/my-stream',
ConsumerName='analytics-consumer'
)
print(f"Status: {response['ConsumerDescription']['ConsumerStatus']}")
5.4 エラー処理とリトライ戦略
import time
import random
def put_records_with_retry(kinesis_client, stream_name, records, max_retries=3):
"""指数バックオフを使用したPutRecordsリトライ"""
for attempt in range(max_retries):
response = kinesis_client.put_records(
StreamName=stream_name,
Records=records
)
failed_count = response['FailedRecordCount']
if failed_count == 0:
return response
# 失敗したレコードのみ抽出
retry_records = []
for i, result in enumerate(response['Records']):
if 'ErrorCode' in result:
error_code = result['ErrorCode']
if error_code == 'ProvisionedThroughputExceededException':
retry_records.append(records[i])
else:
print(f"Non-retryable error: {error_code}")
if not retry_records:
return response
records = retry_records
# 指数バックオフ + ジッター
backoff = min(2 ** attempt * 0.1, 5.0)
jitter = random.uniform(0, backoff * 0.5)
wait_time = backoff + jitter
print(f"Retry {attempt + 1}: {len(retry_records)} records, waiting {wait_time:.2f}s")
time.sleep(wait_time)
print(f"Failed after {max_retries} retries: {len(records)} records")
return None
6. モニタリング:CloudWatchメトリクス
6.1 主要モニタリングメトリクス
| メトリクス | 説明 | アラート閾値 |
|---|---|---|
| IncomingBytes | ストリームに入るバイト数 | シャード容量の80% |
| IncomingRecords | ストリームに入るレコード数 | シャードあたり800 rec/s |
| GetRecords.IteratorAgeMilliseconds | コンシューマーの遅延 | 60,000 ms(1分) |
| WriteProvisionedThroughputExceeded | 書き込みスロットル回数 | 0超過でアラーム |
| ReadProvisionedThroughputExceeded | 読み取りスロットル回数 | 0超過でアラーム |
| GetRecords.Latency | GetRecords呼び出しレイテンシ | 1,000 ms |
| PutRecord.Latency | PutRecord呼び出しレイテンシ | 1,000 ms |
| GetRecords.Success | GetRecords成功率 | 99%未満でアラーム |
6.2 拡張モニタリング
拡張モニタリング有効化時の追加メトリクス
==========================================
シャードレベルメトリクス:
- IncomingBytes(シャード別)
- IncomingRecords(シャード別)
- IteratorAgeMilliseconds(シャード別)
- OutgoingBytes(シャード別)
- OutgoingRecords(シャード別)
- ReadProvisionedThroughputExceeded(シャード別)
- WriteProvisionedThroughputExceeded(シャード別)
ホットシャード検出:
Shard 1: IncomingBytes = 200 KB/s [正常]
Shard 2: IncomingBytes = 950 KB/s [警告! 限界付近]
Shard 3: IncomingBytes = 300 KB/s [正常]
-> Shard 2の分割を推奨
7. ベストプラクティスとアンチパターン
7.1 ベストプラクティス
1) パーティションキー設計
- 高カーディナリティキーの使用(ユーザーID、デバイスID)
- シャード数の10倍以上のユニークキーを確保
- 順序が必要な場合はエンティティIDをパーティションキーに使用
2) プロデューサー最適化
- PutRecords(バッチ)APIの使用でAPI呼び出しを最小化
- KPLの使用でレコード集約と収集を最適化
- 適切なリトライロジックの実装(指数バックオフ + ジッター)
3) コンシューマー最適化
- 複数コンシューマー時は拡張ファンアウトを使用
- KCLの使用で分散処理を自動化
- チェックポインティング頻度の最適化(頻繁すぎるとDynamoDBコスト増加)
4) キャパシティ管理
- 予測可能なワークロード: プロビジョニングモード
- 不規則なワークロード: オンデマンドモード
- CloudWatchアラームで自動スケーリングをトリガー
5) コスト最適化
- オンデマンドAdvantageモードの検討(2025年リリース)
- 保持期間を必要な分だけ設定
- 不要な拡張ファンアウトコンシューマーの登録解除
7.2 アンチパターン
1) 単一パーティションキーの使用
- 全データが1つのシャードに集中
- シャード制限に即座に到達
2) 過剰なシャード数
- コスト増加、管理の複雑化
- KCLのDynamoDBリーステーブルの負荷増加
3) チェックポイント未使用
- 障害時のデータ重複処理またはデータ損失
- 常に適切なチェックポインティング戦略を実装
4) エラー処理の欠如
- ProvisionedThroughputExceededExceptionの無視
- リトライなしで失敗データを損失
5) GetRecordsの過剰呼び出し
- シャードあたり毎秒5回の制限を遵守
- 適切なポーリング間隔を設定
8. 総合比較まとめ表
| 項目 | Kinesis Data Streams | Kinesis Firehose | Kafka | SQS | Managed Flink |
|---|---|---|---|---|---|
| タイプ | データストリーミング | データ配信 | データストリーミング | メッセージキュー | ストリーム処理 |
| 管理 | 完全マネージド | 完全マネージド | 自己/MSK | 完全マネージド | 完全マネージド |
| レイテンシ | ms | 60s+ | ms | ms | ms |
| 順序保証 | シャード内 | なし | パーティション内 | FIFOのみ | 入力依存 |
| リプレイ | 可能 | 不可 | 可能 | 不可 | 入力依存 |
| スケーリング | シャード追加 | 自動 | パーティション/ブローカー | 自動 | KPU追加 |
| 変換 | なし(コンシューマー) | Lambda | Kafka Streams | なし | Flinkアプリ |
| コストモデル | シャード+データ | データ量 | インスタンス | リクエスト数 | KPU時間 |
| AWS統合 | 高 | 非常に高 | 低/中 | 高 | 高 |
| 最適用途 | リアルタイム取込 | 自動配信 | 大量ストリーミング | タスクキュー | リアルタイム分析 |
9. まとめ
サービス選択ガイド
リアルタイムストリーミングアーキテクチャを設計する際は、要件に合ったサービスを選択することが重要です。
- シンプルなデータ配信が目的なら: Amazon Data Firehoseを使用してS3、Redshift等に直接配信
- リアルタイム処理と複数コンシューマーが必要なら: Kinesis Data Streams + KCLまたは拡張ファンアウト
- 複雑なストリーム分析が必要なら: Managed Service for Apache Flink
- 大容量処理とエコシステムが必要なら: Apache KafkaまたはAmazon MSK
- マイクロサービス間メッセージングが必要なら: Amazon SQS
各サービスの強みを理解し、複数のサービスを組み合わせて使用するのが実践での最適パターンです。 例えば、Kinesis Data Streamsで収集し、Managed Flinkでリアルタイム分析し、Data Firehoseで S3に長期保存する組み合わせは非常に一般的なアーキテクチャです。