Skip to content

필사 모드: [AWS] Kinesis完全ガイド:リアルタイムストリーミングデータ処理の全て

日本語
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

1. ストリーミングデータとは

1.1 バッチ処理 vs ストリーム処理

従来のデータ処理はバッチ方式でした。一定期間データを蓄積してから一括処理する方式です。

しかし、現代のアプリケーションではリアルタイムの応答が必須となっています。

バッチ処理 (Batch Processing)

============================

[データ収集] --> [ストレージ] --> [定期処理] --> [結果]

| |

+---- 数分 〜 数時間 -----------+

ストリーム処理 (Stream Processing)

==================================

[データ生成] --> [ストリーム] --> [即時処理] --> [結果]

| |

+---- ミリ秒 〜 数秒 ---------+

1.2 ストリーミングデータの活用事例

- **リアルタイムログ分析**: サーバーログを即時収集して異常検知

- **IoTセンサーデータ**: 数百万台のデバイスから発生するテレメトリデータ

- **クリックストリーム分析**: ユーザー行動をリアルタイムで追跡してパーソナライズ

- **金融取引モニタリング**: 不正検知のためのリアルタイムトランザクション分析

- **ソーシャルメディアフィード**: リアルタイムトレンドとセンチメント分析

2. AWS Kinesisファミリー概要

AWS Kinesisは、リアルタイムストリーミングデータを収集、処理、分析するための完全マネージドサービス群です。

+------------------------------------------------------------------+

| AWS Kinesisファミリー |

+------------------------------------------------------------------+

| |

| +------------------+ +------------------+ +------------------+ |

| | Kinesis Data | | Amazon Data | | Managed Service | |

| | Streams | | Firehose | | for Apache Flink | |

| | | | | | | |

| | リアルタイム | | 配信パイプ | | ストリーム分析 | |

| | データ | | ライン | | (SQL, Java, | |

| | ストリーミング | | (S3, Redshift | | Python, Scala) | |

| | | | OpenSearch等) | | | |

| +------------------+ +------------------+ +------------------+ |

| |

| +------------------+ |

| | Kinesis Video | |

| | Streams | |

| | | |

| | ビデオ | |

| | ストリーミング | |

| | 取込・分析 | |

| +------------------+ |

+------------------------------------------------------------------+

| サービス | 主な用途 | データタイプ | レイテンシ |

| ------------- | --------------------------- | ------------------ | ------------------------ |

| Data Streams | リアルタイムデータ収集/処理 | レコード(バイト) | リアルタイム(ミリ秒) |

| Data Firehose | データ配信パイプライン | レコード(バイト) | 準リアルタイム(60秒〜) |

| Managed Flink | ストリーム分析/変換 | ストリームデータ | リアルタイム(ミリ秒) |

| Video Streams | ビデオ取込/再生 | メディアフレーム | リアルタイム(1〜10秒) |

3. Kinesis Data Streams 深掘り

3.1 コアアーキテクチャ

Kinesis Data Streamsは大規模リアルタイムデータストリーミングサービスの中核です。

Kinesis Data Stream

+----------------------------------------------------------------------+

| |

| Producer Shard 1 Shard 2 Shard 3 |

| -------- +----------+ +----------+ +----------+ |

| |App A | --> |Record 1 | --> |Record 4 | --> |Record 7 | |

| |App B | --> |Record 2 | --> |Record 5 | --> |Record 8 | |

| |App C | --> |Record 3 | --> |Record 6 | --> |Record 9 | |

| -------- +----------+ +----------+ +----------+ |

| | | | |

| v v v |

| Consumer A Consumer A Consumer A |

| Consumer B Consumer B Consumer B |

+----------------------------------------------------------------------+

3.2 コア構成要素

**ストリーム (Stream)**

- シャードの論理的グループ

- データレコードの順序が保証される単位

**シャード (Shard)**

- ストリームの基本スループット単位

- 書き込み: 1MB/秒 または 1,000レコード/秒

- 読み取り: 2MB/秒(共有ファンアウト)、2MB/秒/コンシューマ(拡張ファンアウト)

**レコード (Record)**

- データの基本単位

- パーティションキー、シーケンス番号、データブロブ(最大1MB)で構成

**パーティションキー (Partition Key)**

- レコードがどのシャードに割り当てられるかを決定

- MD5ハッシュを使用してシャードにマッピング

- 同じパーティションキーのレコードは同じシャードに順序通り格納

**シーケンス番号 (Sequence Number)**

- 各レコードに自動割り当てされる一意の識別子

- シャード内でのレコードの順序を保証

パーティションキー ハッシュ過程

================================

Partition Key: "user-123"

|

v

MD5("user-123") = 0x7A3B...

|

v

Hash Range: 0 ~ 2^128 - 1

|

v

Shard 1: [0 ~ 2^127] <-- この範囲にマッピング

Shard 2: [2^127 ~ 2^128 - 1]

3.3 プロデューサー (Producers)

Kinesisストリームにデータを送信する方法は複数あります。

**1) PutRecord / PutRecords API**

最も基本的な方式です。

kinesis = boto3.client('kinesis', region_name='ap-northeast-1')

単一レコード送信

response = kinesis.put_record(

StreamName='my-data-stream',

Data=json.dumps({

'event_type': 'page_view',

'user_id': 'user-123',

'page': '/products/laptop',

'timestamp': '2026-03-20T10:30:00Z'

}).encode('utf-8'),

PartitionKey='user-123'

)

print(f"Shard ID: {response['ShardId']}")

print(f"Sequence Number: {response['SequenceNumber']}")

バッチレコード送信

records = []

for i in range(100):

records.append({

'Data': json.dumps({

'event_type': 'click',

'user_id': f'user-{i % 10}',

'element': 'buy_button',

'timestamp': '2026-03-20T10:30:00Z'

}).encode('utf-8'),

'PartitionKey': f'user-{i % 10}'

})

response = kinesis.put_records(

StreamName='my-data-stream',

Records=records

)

print(f"Failed records: {response['FailedRecordCount']}")

**2) KPL (Kinesis Producer Library)**

高性能プロデューシングのためのライブラリです。

- **レコード集約 (Aggregation)**: 複数の小さなレコードを1つのKinesisレコードにまとめる

- **レコード収集 (Collection)**: 複数のKinesisレコードを1つのPutRecords呼び出しにまとめる

- **自動リトライ**: 失敗したレコードを自動的に再送信

- **CloudWatchメトリクス**: 自動的にパフォーマンスメトリクスを発行

KPL 集約プロセス

=================

User Record A (100 bytes) --+

User Record B (200 bytes) --+--> Kinesis Record (500 bytes)

User Record C (200 bytes) --+ |

v

User Record D (300 bytes) --+ PutRecords API

User Record E (400 bytes) --+--> (単一呼び出し)

User Record F (100 bytes) --+

**3) AWS SDK直接使用**

各種言語のAWS SDKを通じて直接APIを呼び出すことができます。

**4) Kinesis Agent**

サーバーにインストールしてログファイルを自動的にストリームに送信するエージェントです。

3.4 コンシューマー (Consumers)

**1) 共有ファンアウト (Shared Fan-Out) - GetRecords**

デフォルトのコンシューマー方式で、シャードあたり2MB/秒の読み取りスループットを全コンシューマーで共有します。

kinesis = boto3.client('kinesis', region_name='ap-northeast-1')

シャードイテレーター取得

response = kinesis.get_shard_iterator(

StreamName='my-data-stream',

ShardId='shardId-000000000000',

ShardIteratorType='LATEST' # TRIM_HORIZON, AT_SEQUENCE_NUMBER, AT_TIMESTAMP等

)

shard_iterator = response['ShardIterator']

レコードポーリング

while True:

response = kinesis.get_records(

ShardIterator=shard_iterator,

Limit=100

)

for record in response['Records']:

data = json.loads(record['Data'].decode('utf-8'))

print(f"Partition Key: {record['PartitionKey']}")

print(f"Sequence Number: {record['SequenceNumber']}")

print(f"Data: {data}")

shard_iterator = response['NextShardIterator']

GetRecordsはシャードあたり毎秒5回まで呼び出し可能

time.sleep(0.2)

**2) 拡張ファンアウト (Enhanced Fan-Out) - SubscribeToShard**

HTTP/2を使用したプッシュベースの配信方式です。

共有ファンアウト vs 拡張ファンアウト

=======================================

共有ファンアウト (Shared Fan-Out):

+--------+ +--------+

| Shard | --> | 2MB/s | --> Consumer A (ポーリング)

| | | (共有) | --> Consumer B (ポーリング)

+--------+ +--------+ Consumer C (ポーリング)

3つのコンシューマーが2MB/sを共有

各コンシューマー: ~0.67 MB/s

拡張ファンアウト (Enhanced Fan-Out):

+--------+ +--------+

| Shard | --> | 2MB/s | --> Consumer A (HTTP/2 プッシュ)

| | | 2MB/s | --> Consumer B (HTTP/2 プッシュ)

| | | 2MB/s | --> Consumer C (HTTP/2 プッシュ)

+--------+ +--------+

各コンシューマーに専用の2MB/sを割当

レイテンシ: ~70ms (共有: ~200ms+)

3.5 KCL (Kinesis Client Library)

KCLは分散コンシューマーアプリケーション開発を簡素化するライブラリです。

**コア機能:**

- **リース管理**: DynamoDBテーブルを使用してシャードごとの所有権を追跡

- **チェックポインティング**: 処理進捗をDynamoDBに保存して障害復旧をサポート

- **自動ロードバランシング**: ワーカー数に応じてシャードを自動再分配

- **リシャーディング処理**: シャード分割・マージ時に自動対応

KCL アーキテクチャ

===================

DynamoDB (Lease Table)

+---------------------+

| Shard ID | Worker |

|----------|----------|

| shard-0 | worker-1 |

| shard-1 | worker-2 |

| shard-2 | worker-1 |

| shard-3 | worker-2 |

+---------------------+

^ ^

| |

+-------+ +-------+

| |

+-----------+ +-----------+

| Worker 1 | | Worker 2 |

| (EC2/ECS) | | (EC2/ECS) |

| | | |

| shard-0 | | shard-1 |

| shard-2 | | shard-3 |

+-----------+ +-----------+

3.6 データ保持期間

- **デフォルト**: 24時間

- **最大**: 365日

- 保持期間が長いほどコスト増加

- データリプレイが必要な場合は保持期間を延長

3.7 シャード分割とマージ

シャード分割 (Split)

=====================

Shard 1 [0 ~ 100]

|

split at 50

|

+----+----+

| |

Shard 3 Shard 4

[0 ~ 50] [51 ~ 100]

シャードマージ (Merge)

=======================

Shard 3 [0 ~ 50] --> Shard 5

Shard 4 [51 ~ 100] --> [0 ~ 100]

- **分割**: ホットシャードの負荷を分散する時に使用

- **マージ**: トラフィックが減少した2つの隣接シャードを統合する時に使用

- オンデマンドモードでは自動的に処理

3.8 キャパシティモード

**プロビジョニングモード (Provisioned Mode)**

- シャード数を直接指定

- 予測可能なワークロードに適合

- シャード時間あたりの課金

**オンデマンドモード (On-Demand Mode)**

- 自動的にシャード数を調整

- 予測不可能なワークロードに適合

- データ量ベースの課金

- 2025年新規: On-Demand Advantageモード(従来比60%安いデータ使用料)

4. Amazon Data Firehose(旧Kinesis Data Firehose)

4.1 概要

Amazon Data Firehoseは、ストリーミングデータをデータレイク、データストア、

分析サービスに安定的に配信する完全マネージドサービスです。

Amazon Data Firehose アーキテクチャ

======================================

+----------+ +-----------+ +------------+ +----------+

| ソース | | Firehose | | 変換 | | 配信先 |

| | --> | 配信 | --> | (任意) | --> | |

| - Direct | | ストリーム| | - Lambda | | - S3 |

| - Kinesis| | | | - フォーマ | | - Redshift|

| Data | | バッファ: | | ット変換 | | - Open- |

| Streams| | - サイズ | | - 圧縮 | | Search |

| - MSK | | - 時間 | | - 暗号化 | | - Splunk |

| | | | | | | - HTTP |

+----------+ +-----------+ +------------+ +----------+

|

+----------+

| バックアップ|

| S3 (原本) |

+----------+

4.2 主要特徴

**シャード管理不要**

- Data Streamsと異なりシャードを直接管理する必要なし

- 自動的にスケールアウト/イン

**バッファリング設定**

- **サイズベース**: 1MB〜128MB

- **時間ベース**: 60秒〜900秒

- どちらかの条件に達したら配信

**データ変換**

- Lambda関数によるカスタム変換

- Apache Parquet、ORC等のカラムナーフォーマットへの自動変換

- Gzip、Snappy、Zip圧縮サポート

- SSE-S3またはSSE-KMS暗号化

4.3 Firehose配信先

| 配信先 | 特徴 |

| ------------------ | ---------------------------------- |

| Amazon S3 | 最も一般的、Parquet/ORC変換可能 |

| Amazon Redshift | S3経由後COPYコマンドでロード |

| Amazon OpenSearch | ログ分析、検索インデキシング |

| Splunk | セキュリティモニタリング、運用ログ |

| HTTPエンドポイント | カスタム配信先、Datadog等 |

| Snowflake | クラウドデータウェアハウス |

| Apache Iceberg | オープンテーブルフォーマット |

4.4 コード例: Firehose送信

firehose = boto3.client('firehose', region_name='ap-northeast-1')

単一レコード送信

response = firehose.put_record(

DeliveryStreamName='my-firehose-stream',

Record={

'Data': json.dumps({

'event_type': 'purchase',

'user_id': 'user-456',

'product': 'laptop',

'amount': 1299.99,

'timestamp': '2026-03-20T10:30:00Z'

}).encode('utf-8')

}

)

バッチ送信

records = []

for i in range(50):

records.append({

'Data': json.dumps({

'event_type': 'page_view',

'user_id': f'user-{i}',

'page': f'/product/{i}',

'timestamp': '2026-03-20T10:30:00Z'

}).encode('utf-8')

})

response = firehose.put_record_batch(

DeliveryStreamName='my-firehose-stream',

Records=records

)

print(f"Failed records: {response['FailedPutCount']}")

5. Kinesis Video Streams

5.1 概要

Kinesis Video Streamsは、カメラ、RADAR、LIDAR、ドローン等から発生するビデオおよびメディアデータを

安全に取り込み、再生できる完全マネージドサービスです。

5.2 主要機能

Kinesis Video Streams アーキテクチャ

=======================================

+----------+ +------------------+ +------------------+

| デバイス | | Kinesis Video | | コンシューマー |

| | --> | Streams | --> | |

| - カメラ | | | | - HLS再生 |

| - ドローン| | - 自動スケール | | - DASH再生 |

| - LIDAR | | - 耐久性ストレージ| | - GetMedia API |

| - スマート| | - 暗号化 | | - Rekognition |

| フォン | | | | - SageMaker |

+----------+ +------------------+ +------------------+

- **HLS (HTTP Live Streaming)**: Webブラウザとモバイルでライブ/アーカイブ再生可能

- **WebRTC**: 超低遅延双方向メディアストリーミング

- **Amazon Rekognition連携**: 顔認識、オブジェクト検出等のコンピュータビジョン分析

- **ストレージ階層**: ホットストレージ(リアルタイムアクセス)とウォームストレージ(コスト効率的保管)

6. 料金モデル

6.1 Kinesis Data Streams料金

**プロビジョニングモード:**

| 項目 | コスト(米国東部基準) |

| ------------------------------------------- | --------------------------------------- |

| シャード時間 | ~0.015 USD/シャード/時間 |

| PUTペイロードユニット (25KB) | ~0.014 USD/百万ユニット |

| 拡張ファンアウト データ取得 | ~0.013 USD/GB |

| 拡張ファンアウト コンシューマーシャード時間 | ~0.015 USD/コンシューマー/シャード/時間 |

| 長期保持(24時間超過) | ~0.023 USD/シャード/時間 |

**オンデマンドモード:**

| 項目 | コスト(米国東部基準) |

| -------------------------- | ---------------------- |

| ストリーム時間 (Standard) | ~0.04 USD/時間 |

| データ書き込み (Standard) | ~0.08 USD/GB |

| データ読み取り (Standard) | ~0.04 USD/GB |

| データ書き込み (Advantage) | ~0.032 USD/GB |

| データ読み取り (Advantage) | ~0.016 USD/GB |

6.2 Amazon Data Firehose料金

| 項目 | コスト(米国東部基準) |

| ---------------------------- | ----------------------------- |

| データ取込(最初の500TB/月) | ~0.029 USD/GB |

| フォーマット変換 | ~0.018 USD/GB |

| VPC配信 | ~0.01 USD/GB + 時間あたり料金 |

7. エンドツーエンドアーキテクチャ例:リアルタイム分析パイプライン

リアルタイム分析パイプラインアーキテクチャ

=============================================

+----------+ +---------+ +----------+ +---------+ +----------+

| Web/ | | Kinesis | | Lambda | | Firehose| | S3 |

| Mobile |-->| Data |-->| (変換/ |-->| 配信 |-->| (Data |

| App | | Streams | | フィルタ)| | ストリーム| | Lake) |

+----------+ +---------+ +----------+ +---------+ +----------+

| |

v v

+-----------+ +----------+

| Managed | | Athena |

| Flink | | (Ad-hoc |

| (リアル | | クエリ) |

| タイム分析)| +----------+

+-----------+ |

| v

v +----------+

+-----------+ | Quick- |

| DynamoDB | | Sight |

| (リアル | | (BI |

| タイムDB)| |ダッシュ |

+-----------+ | ボード) |

+----------+

7.1 完全なプロデューサー/コンシューマー例

=== producer.py ===

from datetime import datetime

kinesis = boto3.client('kinesis', region_name='ap-northeast-1')

STREAM_NAME = 'clickstream-data'

def generate_click_event():

"""クリックストリームイベント生成"""

pages = ['/home', '/products', '/cart', '/checkout', '/profile']

actions = ['view', 'click', 'scroll', 'submit']

user_id = f'user-{random.randint(1, 1000)}'

return {

'user_id': user_id,

'page': random.choice(pages),

'action': random.choice(actions),

'session_id': f'sess-{random.randint(1, 100)}',

'timestamp': datetime.utcnow().isoformat() + 'Z',

'device': random.choice(['mobile', 'desktop', 'tablet']),

'country': random.choice(['KR', 'US', 'JP', 'DE'])

}

def send_events(batch_size=50, interval=1.0):

"""イベントをKinesisストリームに送信"""

while True:

records = []

for _ in range(batch_size):

event = generate_click_event()

records.append({

'Data': json.dumps(event).encode('utf-8'),

'PartitionKey': event['user_id']

})

try:

response = kinesis.put_records(

StreamName=STREAM_NAME,

Records=records

)

failed = response['FailedRecordCount']

if failed > 0:

print(f"Warning: {failed} records failed")

for i, record_response in enumerate(response['Records']):

if 'ErrorCode' in record_response:

print(f" Error: {record_response['ErrorCode']}")

else:

print(f"Sent {batch_size} records successfully")

except Exception as e:

print(f"Error: {e}")

time.sleep(interval)

if __name__ == '__main__':

send_events()

=== consumer.py ===

kinesis = boto3.client('kinesis', region_name='ap-northeast-1')

STREAM_NAME = 'clickstream-data'

def get_shard_ids():

"""ストリームの全シャードIDを返す"""

response = kinesis.describe_stream(StreamName=STREAM_NAME)

return [

shard['ShardId']

for shard in response['StreamDescription']['Shards']

]

def process_records(records):

"""レコード処理ロジック"""

page_views = {}

for record in records:

data = json.loads(record['Data'].decode('utf-8'))

page = data.get('page', 'unknown')

page_views[page] = page_views.get(page, 0) + 1

異常行動検知の例

if data.get('action') == 'submit' and data.get('page') == '/checkout':

print(f"[ALERT] Checkout event: user={data['user_id']}")

for page, count in page_views.items():

print(f" Page: {page}, Views: {count}")

def consume_stream():

"""ストリームからデータを読み取り処理"""

shard_ids = get_shard_ids()

print(f"Found {len(shard_ids)} shards")

shard_iterators = {}

for shard_id in shard_ids:

response = kinesis.get_shard_iterator(

StreamName=STREAM_NAME,

ShardId=shard_id,

ShardIteratorType='LATEST'

)

shard_iterators[shard_id] = response['ShardIterator']

while True:

for shard_id in shard_ids:

try:

response = kinesis.get_records(

ShardIterator=shard_iterators[shard_id],

Limit=100

)

if response['Records']:

print(f"\n--- Shard: {shard_id} ---")

print(f"Records received: {len(response['Records'])}")

process_records(response['Records'])

shard_iterators[shard_id] = response['NextShardIterator']

except kinesis.exceptions.ExpiredIteratorException:

response = kinesis.get_shard_iterator(

StreamName=STREAM_NAME,

ShardId=shard_id,

ShardIteratorType='LATEST'

)

shard_iterators[shard_id] = response['ShardIterator']

time.sleep(1)

if __name__ == '__main__':

consume_stream()

8. 主要制限事項と参考情報

| 項目 | 制限 |

| -------------------------------------------------------- | --------------------------------- |

| レコード最大サイズ | 1 MB |

| PutRecordsリクエストあたりの最大レコード数 | 500個 |

| PutRecordsリクエストあたりの最大サイズ | 5 MB |

| シャードあたりの書き込みスループット | 1 MB/秒 または 1,000レコード/秒 |

| シャードあたりの読み取りスループット(共有) | 2 MB/秒、GetRecords毎秒5回 |

| シャードあたりの読み取りスループット(拡張ファンアウト) | コンシューマーあたり2 MB/秒 |

| 最大登録コンシューマー(拡張ファンアウト) | ストリームあたり20個 |

| データ保持 | 24時間(デフォルト)〜365日 |

| ストリームあたりの最大シャード数 | デフォルト500(上限緩和申請可能) |

9. まとめ

AWS Kinesisファミリーは、リアルタイムデータストリーミングのための包括的なソリューションを提供します。

- **Kinesis Data Streams**: リアルタイムデータ収集と処理の中核。シャードベースのアーキテクチャで

スケーラブルであり、KCLと拡張ファンアウトを通じて多様なコンシューマーパターンをサポートします。

- **Amazon Data Firehose**: データをS3、Redshift、OpenSearch等に自動的に配信するパイプライン。

シャード管理なしで簡単に使用できます。

- **Managed Flink**: Apache Flinkベースのマネージドストリーム分析サービスで、

SQLとコードベースの分析の両方をサポートします。

- **Video Streams**: ビデオ/メディアデータの取込、保存、再生のための専門サービスです。

次の記事では、Kinesisの実践アーキテクチャパターンとApache Kafka、SQSとの比較を取り上げます。

현재 단락 (1/489)

従来のデータ処理はバッチ方式でした。一定期間データを蓄積してから一括処理する方式です。

작성 글자: 0원문 글자: 14,052작성 단락: 0/489