- Authors

- Name
- Youngju Kim
- @fjvbn20031
1. ストリーミングデータとは
1.1 バッチ処理 vs ストリーム処理
従来のデータ処理はバッチ方式でした。一定期間データを蓄積してから一括処理する方式です。 しかし、現代のアプリケーションではリアルタイムの応答が必須となっています。
バッチ処理 (Batch Processing)
============================
[データ収集] --> [ストレージ] --> [定期処理] --> [結果]
| |
+---- 数分 〜 数時間 -----------+
ストリーム処理 (Stream Processing)
==================================
[データ生成] --> [ストリーム] --> [即時処理] --> [結果]
| |
+---- ミリ秒 〜 数秒 ---------+
1.2 ストリーミングデータの活用事例
- リアルタイムログ分析: サーバーログを即時収集して異常検知
- IoTセンサーデータ: 数百万台のデバイスから発生するテレメトリデータ
- クリックストリーム分析: ユーザー行動をリアルタイムで追跡してパーソナライズ
- 金融取引モニタリング: 不正検知のためのリアルタイムトランザクション分析
- ソーシャルメディアフィード: リアルタイムトレンドとセンチメント分析
2. AWS Kinesisファミリー概要
AWS Kinesisは、リアルタイムストリーミングデータを収集、処理、分析するための完全マネージドサービス群です。
+------------------------------------------------------------------+
| AWS Kinesisファミリー |
+------------------------------------------------------------------+
| |
| +------------------+ +------------------+ +------------------+ |
| | Kinesis Data | | Amazon Data | | Managed Service | |
| | Streams | | Firehose | | for Apache Flink | |
| | | | | | | |
| | リアルタイム | | 配信パイプ | | ストリーム分析 | |
| | データ | | ライン | | (SQL, Java, | |
| | ストリーミング | | (S3, Redshift | | Python, Scala) | |
| | | | OpenSearch等) | | | |
| +------------------+ +------------------+ +------------------+ |
| |
| +------------------+ |
| | Kinesis Video | |
| | Streams | |
| | | |
| | ビデオ | |
| | ストリーミング | |
| | 取込・分析 | |
| +------------------+ |
+------------------------------------------------------------------+
| サービス | 主な用途 | データタイプ | レイテンシ |
|---|---|---|---|
| Data Streams | リアルタイムデータ収集/処理 | レコード(バイト) | リアルタイム(ミリ秒) |
| Data Firehose | データ配信パイプライン | レコード(バイト) | 準リアルタイム(60秒〜) |
| Managed Flink | ストリーム分析/変換 | ストリームデータ | リアルタイム(ミリ秒) |
| Video Streams | ビデオ取込/再生 | メディアフレーム | リアルタイム(1〜10秒) |
3. Kinesis Data Streams 深掘り
3.1 コアアーキテクチャ
Kinesis Data Streamsは大規模リアルタイムデータストリーミングサービスの中核です。
Kinesis Data Stream
+----------------------------------------------------------------------+
| |
| Producer Shard 1 Shard 2 Shard 3 |
| -------- +----------+ +----------+ +----------+ |
| |App A | --> |Record 1 | --> |Record 4 | --> |Record 7 | |
| |App B | --> |Record 2 | --> |Record 5 | --> |Record 8 | |
| |App C | --> |Record 3 | --> |Record 6 | --> |Record 9 | |
| -------- +----------+ +----------+ +----------+ |
| | | | |
| v v v |
| Consumer A Consumer A Consumer A |
| Consumer B Consumer B Consumer B |
+----------------------------------------------------------------------+
3.2 コア構成要素
ストリーム (Stream)
- シャードの論理的グループ
- データレコードの順序が保証される単位
シャード (Shard)
- ストリームの基本スループット単位
- 書き込み: 1MB/秒 または 1,000レコード/秒
- 読み取り: 2MB/秒(共有ファンアウト)、2MB/秒/コンシューマ(拡張ファンアウト)
レコード (Record)
- データの基本単位
- パーティションキー、シーケンス番号、データブロブ(最大1MB)で構成
パーティションキー (Partition Key)
- レコードがどのシャードに割り当てられるかを決定
- MD5ハッシュを使用してシャードにマッピング
- 同じパーティションキーのレコードは同じシャードに順序通り格納
シーケンス番号 (Sequence Number)
- 各レコードに自動割り当てされる一意の識別子
- シャード内でのレコードの順序を保証
パーティションキー ハッシュ過程
================================
Partition Key: "user-123"
|
v
MD5("user-123") = 0x7A3B...
|
v
Hash Range: 0 ~ 2^128 - 1
|
v
Shard 1: [0 ~ 2^127] <-- この範囲にマッピング
Shard 2: [2^127 ~ 2^128 - 1]
3.3 プロデューサー (Producers)
Kinesisストリームにデータを送信する方法は複数あります。
1) PutRecord / PutRecords API
最も基本的な方式です。
import boto3
import json
kinesis = boto3.client('kinesis', region_name='ap-northeast-1')
# 単一レコード送信
response = kinesis.put_record(
StreamName='my-data-stream',
Data=json.dumps({
'event_type': 'page_view',
'user_id': 'user-123',
'page': '/products/laptop',
'timestamp': '2026-03-20T10:30:00Z'
}).encode('utf-8'),
PartitionKey='user-123'
)
print(f"Shard ID: {response['ShardId']}")
print(f"Sequence Number: {response['SequenceNumber']}")
# バッチレコード送信
records = []
for i in range(100):
records.append({
'Data': json.dumps({
'event_type': 'click',
'user_id': f'user-{i % 10}',
'element': 'buy_button',
'timestamp': '2026-03-20T10:30:00Z'
}).encode('utf-8'),
'PartitionKey': f'user-{i % 10}'
})
response = kinesis.put_records(
StreamName='my-data-stream',
Records=records
)
print(f"Failed records: {response['FailedRecordCount']}")
2) KPL (Kinesis Producer Library)
高性能プロデューシングのためのライブラリです。
- レコード集約 (Aggregation): 複数の小さなレコードを1つのKinesisレコードにまとめる
- レコード収集 (Collection): 複数のKinesisレコードを1つのPutRecords呼び出しにまとめる
- 自動リトライ: 失敗したレコードを自動的に再送信
- CloudWatchメトリクス: 自動的にパフォーマンスメトリクスを発行
KPL 集約プロセス
=================
User Record A (100 bytes) --+
User Record B (200 bytes) --+--> Kinesis Record (500 bytes)
User Record C (200 bytes) --+ |
v
User Record D (300 bytes) --+ PutRecords API
User Record E (400 bytes) --+--> (単一呼び出し)
User Record F (100 bytes) --+
3) AWS SDK直接使用
各種言語のAWS SDKを通じて直接APIを呼び出すことができます。
4) Kinesis Agent
サーバーにインストールしてログファイルを自動的にストリームに送信するエージェントです。
3.4 コンシューマー (Consumers)
1) 共有ファンアウト (Shared Fan-Out) - GetRecords
デフォルトのコンシューマー方式で、シャードあたり2MB/秒の読み取りスループットを全コンシューマーで共有します。
import boto3
import json
import time
kinesis = boto3.client('kinesis', region_name='ap-northeast-1')
# シャードイテレーター取得
response = kinesis.get_shard_iterator(
StreamName='my-data-stream',
ShardId='shardId-000000000000',
ShardIteratorType='LATEST' # TRIM_HORIZON, AT_SEQUENCE_NUMBER, AT_TIMESTAMP等
)
shard_iterator = response['ShardIterator']
# レコードポーリング
while True:
response = kinesis.get_records(
ShardIterator=shard_iterator,
Limit=100
)
for record in response['Records']:
data = json.loads(record['Data'].decode('utf-8'))
print(f"Partition Key: {record['PartitionKey']}")
print(f"Sequence Number: {record['SequenceNumber']}")
print(f"Data: {data}")
shard_iterator = response['NextShardIterator']
# GetRecordsはシャードあたり毎秒5回まで呼び出し可能
time.sleep(0.2)
2) 拡張ファンアウト (Enhanced Fan-Out) - SubscribeToShard
HTTP/2を使用したプッシュベースの配信方式です。
共有ファンアウト vs 拡張ファンアウト
=======================================
共有ファンアウト (Shared Fan-Out):
+--------+ +--------+
| Shard | --> | 2MB/s | --> Consumer A (ポーリング)
| | | (共有) | --> Consumer B (ポーリング)
+--------+ +--------+ Consumer C (ポーリング)
3つのコンシューマーが2MB/sを共有
各コンシューマー: ~0.67 MB/s
拡張ファンアウト (Enhanced Fan-Out):
+--------+ +--------+
| Shard | --> | 2MB/s | --> Consumer A (HTTP/2 プッシュ)
| | | 2MB/s | --> Consumer B (HTTP/2 プッシュ)
| | | 2MB/s | --> Consumer C (HTTP/2 プッシュ)
+--------+ +--------+
各コンシューマーに専用の2MB/sを割当
レイテンシ: ~70ms (共有: ~200ms+)
3.5 KCL (Kinesis Client Library)
KCLは分散コンシューマーアプリケーション開発を簡素化するライブラリです。
コア機能:
- リース管理: DynamoDBテーブルを使用してシャードごとの所有権を追跡
- チェックポインティング: 処理進捗をDynamoDBに保存して障害復旧をサポート
- 自動ロードバランシング: ワーカー数に応じてシャードを自動再分配
- リシャーディング処理: シャード分割・マージ時に自動対応
KCL アーキテクチャ
===================
DynamoDB (Lease Table)
+---------------------+
| Shard ID | Worker |
|----------|----------|
| shard-0 | worker-1 |
| shard-1 | worker-2 |
| shard-2 | worker-1 |
| shard-3 | worker-2 |
+---------------------+
^ ^
| |
+-------+ +-------+
| |
+-----------+ +-----------+
| Worker 1 | | Worker 2 |
| (EC2/ECS) | | (EC2/ECS) |
| | | |
| shard-0 | | shard-1 |
| shard-2 | | shard-3 |
+-----------+ +-----------+
3.6 データ保持期間
- デフォルト: 24時間
- 最大: 365日
- 保持期間が長いほどコスト増加
- データリプレイが必要な場合は保持期間を延長
3.7 シャード分割とマージ
シャード分割 (Split)
=====================
Shard 1 [0 ~ 100]
|
split at 50
|
+----+----+
| |
Shard 3 Shard 4
[0 ~ 50] [51 ~ 100]
シャードマージ (Merge)
=======================
Shard 3 [0 ~ 50] --> Shard 5
Shard 4 [51 ~ 100] --> [0 ~ 100]
- 分割: ホットシャードの負荷を分散する時に使用
- マージ: トラフィックが減少した2つの隣接シャードを統合する時に使用
- オンデマンドモードでは自動的に処理
3.8 キャパシティモード
プロビジョニングモード (Provisioned Mode)
- シャード数を直接指定
- 予測可能なワークロードに適合
- シャード時間あたりの課金
オンデマンドモード (On-Demand Mode)
- 自動的にシャード数を調整
- 予測不可能なワークロードに適合
- データ量ベースの課金
- 2025年新規: On-Demand Advantageモード(従来比60%安いデータ使用料)
4. Amazon Data Firehose(旧Kinesis Data Firehose)
4.1 概要
Amazon Data Firehoseは、ストリーミングデータをデータレイク、データストア、 分析サービスに安定的に配信する完全マネージドサービスです。
Amazon Data Firehose アーキテクチャ
======================================
+----------+ +-----------+ +------------+ +----------+
| ソース | | Firehose | | 変換 | | 配信先 |
| | --> | 配信 | --> | (任意) | --> | |
| - Direct | | ストリーム| | - Lambda | | - S3 |
| - Kinesis| | | | - フォーマ | | - Redshift|
| Data | | バッファ: | | ット変換 | | - Open- |
| Streams| | - サイズ | | - 圧縮 | | Search |
| - MSK | | - 時間 | | - 暗号化 | | - Splunk |
| | | | | | | - HTTP |
+----------+ +-----------+ +------------+ +----------+
|
+----------+
| バックアップ|
| S3 (原本) |
+----------+
4.2 主要特徴
シャード管理不要
- Data Streamsと異なりシャードを直接管理する必要なし
- 自動的にスケールアウト/イン
バッファリング設定
- サイズベース: 1MB〜128MB
- 時間ベース: 60秒〜900秒
- どちらかの条件に達したら配信
データ変換
- Lambda関数によるカスタム変換
- Apache Parquet、ORC等のカラムナーフォーマットへの自動変換
- Gzip、Snappy、Zip圧縮サポート
- SSE-S3またはSSE-KMS暗号化
4.3 Firehose配信先
| 配信先 | 特徴 |
|---|---|
| Amazon S3 | 最も一般的、Parquet/ORC変換可能 |
| Amazon Redshift | S3経由後COPYコマンドでロード |
| Amazon OpenSearch | ログ分析、検索インデキシング |
| Splunk | セキュリティモニタリング、運用ログ |
| HTTPエンドポイント | カスタム配信先、Datadog等 |
| Snowflake | クラウドデータウェアハウス |
| Apache Iceberg | オープンテーブルフォーマット |
4.4 コード例: Firehose送信
import boto3
import json
firehose = boto3.client('firehose', region_name='ap-northeast-1')
# 単一レコード送信
response = firehose.put_record(
DeliveryStreamName='my-firehose-stream',
Record={
'Data': json.dumps({
'event_type': 'purchase',
'user_id': 'user-456',
'product': 'laptop',
'amount': 1299.99,
'timestamp': '2026-03-20T10:30:00Z'
}).encode('utf-8')
}
)
# バッチ送信
records = []
for i in range(50):
records.append({
'Data': json.dumps({
'event_type': 'page_view',
'user_id': f'user-{i}',
'page': f'/product/{i}',
'timestamp': '2026-03-20T10:30:00Z'
}).encode('utf-8')
})
response = firehose.put_record_batch(
DeliveryStreamName='my-firehose-stream',
Records=records
)
print(f"Failed records: {response['FailedPutCount']}")
5. Kinesis Video Streams
5.1 概要
Kinesis Video Streamsは、カメラ、RADAR、LIDAR、ドローン等から発生するビデオおよびメディアデータを 安全に取り込み、再生できる完全マネージドサービスです。
5.2 主要機能
Kinesis Video Streams アーキテクチャ
=======================================
+----------+ +------------------+ +------------------+
| デバイス | | Kinesis Video | | コンシューマー |
| | --> | Streams | --> | |
| - カメラ | | | | - HLS再生 |
| - ドローン| | - 自動スケール | | - DASH再生 |
| - LIDAR | | - 耐久性ストレージ| | - GetMedia API |
| - スマート| | - 暗号化 | | - Rekognition |
| フォン | | | | - SageMaker |
+----------+ +------------------+ +------------------+
- HLS (HTTP Live Streaming): Webブラウザとモバイルでライブ/アーカイブ再生可能
- WebRTC: 超低遅延双方向メディアストリーミング
- Amazon Rekognition連携: 顔認識、オブジェクト検出等のコンピュータビジョン分析
- ストレージ階層: ホットストレージ(リアルタイムアクセス)とウォームストレージ(コスト効率的保管)
6. 料金モデル
6.1 Kinesis Data Streams料金
プロビジョニングモード:
| 項目 | コスト(米国東部基準) |
|---|---|
| シャード時間 | ~0.015 USD/シャード/時間 |
| PUTペイロードユニット (25KB) | ~0.014 USD/百万ユニット |
| 拡張ファンアウト データ取得 | ~0.013 USD/GB |
| 拡張ファンアウト コンシューマーシャード時間 | ~0.015 USD/コンシューマー/シャード/時間 |
| 長期保持(24時間超過) | ~0.023 USD/シャード/時間 |
オンデマンドモード:
| 項目 | コスト(米国東部基準) |
|---|---|
| ストリーム時間 (Standard) | ~0.04 USD/時間 |
| データ書き込み (Standard) | ~0.08 USD/GB |
| データ読み取り (Standard) | ~0.04 USD/GB |
| データ書き込み (Advantage) | ~0.032 USD/GB |
| データ読み取り (Advantage) | ~0.016 USD/GB |
6.2 Amazon Data Firehose料金
| 項目 | コスト(米国東部基準) |
|---|---|
| データ取込(最初の500TB/月) | ~0.029 USD/GB |
| フォーマット変換 | ~0.018 USD/GB |
| VPC配信 | ~0.01 USD/GB + 時間あたり料金 |
7. エンドツーエンドアーキテクチャ例:リアルタイム分析パイプライン
リアルタイム分析パイプラインアーキテクチャ
=============================================
+----------+ +---------+ +----------+ +---------+ +----------+
| Web/ | | Kinesis | | Lambda | | Firehose| | S3 |
| Mobile |-->| Data |-->| (変換/ |-->| 配信 |-->| (Data |
| App | | Streams | | フィルタ)| | ストリーム| | Lake) |
+----------+ +---------+ +----------+ +---------+ +----------+
| |
v v
+-----------+ +----------+
| Managed | | Athena |
| Flink | | (Ad-hoc |
| (リアル | | クエリ) |
| タイム分析)| +----------+
+-----------+ |
| v
v +----------+
+-----------+ | Quick- |
| DynamoDB | | Sight |
| (リアル | | (BI |
| タイムDB)| |ダッシュ |
+-----------+ | ボード) |
+----------+
7.1 完全なプロデューサー/コンシューマー例
# === producer.py ===
import boto3
import json
import time
import random
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='ap-northeast-1')
STREAM_NAME = 'clickstream-data'
def generate_click_event():
"""クリックストリームイベント生成"""
pages = ['/home', '/products', '/cart', '/checkout', '/profile']
actions = ['view', 'click', 'scroll', 'submit']
user_id = f'user-{random.randint(1, 1000)}'
return {
'user_id': user_id,
'page': random.choice(pages),
'action': random.choice(actions),
'session_id': f'sess-{random.randint(1, 100)}',
'timestamp': datetime.utcnow().isoformat() + 'Z',
'device': random.choice(['mobile', 'desktop', 'tablet']),
'country': random.choice(['KR', 'US', 'JP', 'DE'])
}
def send_events(batch_size=50, interval=1.0):
"""イベントをKinesisストリームに送信"""
while True:
records = []
for _ in range(batch_size):
event = generate_click_event()
records.append({
'Data': json.dumps(event).encode('utf-8'),
'PartitionKey': event['user_id']
})
try:
response = kinesis.put_records(
StreamName=STREAM_NAME,
Records=records
)
failed = response['FailedRecordCount']
if failed > 0:
print(f"Warning: {failed} records failed")
for i, record_response in enumerate(response['Records']):
if 'ErrorCode' in record_response:
print(f" Error: {record_response['ErrorCode']}")
else:
print(f"Sent {batch_size} records successfully")
except Exception as e:
print(f"Error: {e}")
time.sleep(interval)
if __name__ == '__main__':
send_events()
# === consumer.py ===
import boto3
import json
import time
kinesis = boto3.client('kinesis', region_name='ap-northeast-1')
STREAM_NAME = 'clickstream-data'
def get_shard_ids():
"""ストリームの全シャードIDを返す"""
response = kinesis.describe_stream(StreamName=STREAM_NAME)
return [
shard['ShardId']
for shard in response['StreamDescription']['Shards']
]
def process_records(records):
"""レコード処理ロジック"""
page_views = {}
for record in records:
data = json.loads(record['Data'].decode('utf-8'))
page = data.get('page', 'unknown')
page_views[page] = page_views.get(page, 0) + 1
# 異常行動検知の例
if data.get('action') == 'submit' and data.get('page') == '/checkout':
print(f"[ALERT] Checkout event: user={data['user_id']}")
for page, count in page_views.items():
print(f" Page: {page}, Views: {count}")
def consume_stream():
"""ストリームからデータを読み取り処理"""
shard_ids = get_shard_ids()
print(f"Found {len(shard_ids)} shards")
shard_iterators = {}
for shard_id in shard_ids:
response = kinesis.get_shard_iterator(
StreamName=STREAM_NAME,
ShardId=shard_id,
ShardIteratorType='LATEST'
)
shard_iterators[shard_id] = response['ShardIterator']
while True:
for shard_id in shard_ids:
try:
response = kinesis.get_records(
ShardIterator=shard_iterators[shard_id],
Limit=100
)
if response['Records']:
print(f"\n--- Shard: {shard_id} ---")
print(f"Records received: {len(response['Records'])}")
process_records(response['Records'])
shard_iterators[shard_id] = response['NextShardIterator']
except kinesis.exceptions.ExpiredIteratorException:
response = kinesis.get_shard_iterator(
StreamName=STREAM_NAME,
ShardId=shard_id,
ShardIteratorType='LATEST'
)
shard_iterators[shard_id] = response['ShardIterator']
time.sleep(1)
if __name__ == '__main__':
consume_stream()
8. 主要制限事項と参考情報
| 項目 | 制限 |
|---|---|
| レコード最大サイズ | 1 MB |
| PutRecordsリクエストあたりの最大レコード数 | 500個 |
| PutRecordsリクエストあたりの最大サイズ | 5 MB |
| シャードあたりの書き込みスループット | 1 MB/秒 または 1,000レコード/秒 |
| シャードあたりの読み取りスループット(共有) | 2 MB/秒、GetRecords毎秒5回 |
| シャードあたりの読み取りスループット(拡張ファンアウト) | コンシューマーあたり2 MB/秒 |
| 最大登録コンシューマー(拡張ファンアウト) | ストリームあたり20個 |
| データ保持 | 24時間(デフォルト)〜365日 |
| ストリームあたりの最大シャード数 | デフォルト500(上限緩和申請可能) |
9. まとめ
AWS Kinesisファミリーは、リアルタイムデータストリーミングのための包括的なソリューションを提供します。
- Kinesis Data Streams: リアルタイムデータ収集と処理の中核。シャードベースのアーキテクチャで スケーラブルであり、KCLと拡張ファンアウトを通じて多様なコンシューマーパターンをサポートします。
- Amazon Data Firehose: データをS3、Redshift、OpenSearch等に自動的に配信するパイプライン。 シャード管理なしで簡単に使用できます。
- Managed Flink: Apache Flinkベースのマネージドストリーム分析サービスで、 SQLとコードベースの分析の両方をサポートします。
- Video Streams: ビデオ/メディアデータの取込、保存、再生のための専門サービスです。
次の記事では、Kinesisの実践アーキテクチャパターンとApache Kafka、SQSとの比較を取り上げます。