Skip to content
Published on

[AWS] Kinesis完全ガイド:リアルタイムストリーミングデータ処理の全て

Authors

1. ストリーミングデータとは

1.1 バッチ処理 vs ストリーム処理

従来のデータ処理はバッチ方式でした。一定期間データを蓄積してから一括処理する方式です。 しかし、現代のアプリケーションではリアルタイムの応答が必須となっています。

バッチ処理 (Batch Processing)
============================
[データ収集] --> [ストレージ] --> [定期処理] --> [結果]
     |                               |
     +---- 数分 〜 数時間 -----------+

ストリーム処理 (Stream Processing)
==================================
[データ生成] --> [ストリーム] --> [即時処理] --> [結果]
     |                              |
     +---- ミリ秒 〜 数秒 ---------+

1.2 ストリーミングデータの活用事例

  • リアルタイムログ分析: サーバーログを即時収集して異常検知
  • IoTセンサーデータ: 数百万台のデバイスから発生するテレメトリデータ
  • クリックストリーム分析: ユーザー行動をリアルタイムで追跡してパーソナライズ
  • 金融取引モニタリング: 不正検知のためのリアルタイムトランザクション分析
  • ソーシャルメディアフィード: リアルタイムトレンドとセンチメント分析

2. AWS Kinesisファミリー概要

AWS Kinesisは、リアルタイムストリーミングデータを収集、処理、分析するための完全マネージドサービス群です。

+------------------------------------------------------------------+
|                    AWS Kinesisファミリー                            |
+------------------------------------------------------------------+
|                                                                    |
|  +------------------+  +------------------+  +------------------+  |
|  | Kinesis Data     |  | Amazon Data      |  | Managed Service  |  |
|  | Streams          |  | Firehose         |  | for Apache Flink |  |
|  |                  |  |                  |  |                  |  |
|  | リアルタイム     |  | 配信パイプ      |  | ストリーム分析   |  |
|  | データ           |  | ライン           |  | (SQL, Java,      |  |
|  | ストリーミング   |  | (S3, Redshift   |  |  Python, Scala)  |  |
|  |                  |  |  OpenSearch等)   |  |                  |  |
|  +------------------+  +------------------+  +------------------+  |
|                                                                    |
|  +------------------+                                              |
|  | Kinesis Video    |                                              |
|  | Streams          |                                              |
|  |                  |                                              |
|  | ビデオ           |                                              |
|  | ストリーミング   |                                              |
|  | 取込・分析       |                                              |
|  +------------------+                                              |
+------------------------------------------------------------------+
サービス主な用途データタイプレイテンシ
Data Streamsリアルタイムデータ収集/処理レコード(バイト)リアルタイム(ミリ秒)
Data Firehoseデータ配信パイプラインレコード(バイト)準リアルタイム(60秒〜)
Managed Flinkストリーム分析/変換ストリームデータリアルタイム(ミリ秒)
Video Streamsビデオ取込/再生メディアフレームリアルタイム(1〜10秒)

3. Kinesis Data Streams 深掘り

3.1 コアアーキテクチャ

Kinesis Data Streamsは大規模リアルタイムデータストリーミングサービスの中核です。

                        Kinesis Data Stream
+----------------------------------------------------------------------+
|                                                                      |
|  Producer        Shard 1          Shard 2          Shard 3           |
|  --------     +----------+     +----------+     +----------+         |
|  |App A | --> |Record 1  | --> |Record 4  | --> |Record 7  |         |
|  |App B | --> |Record 2  | --> |Record 5  | --> |Record 8  |         |
|  |App C | --> |Record 3  | --> |Record 6  | --> |Record 9  |         |
|  --------     +----------+     +----------+     +----------+         |
|                    |                |                |                |
|                    v                v                v                |
|               Consumer A       Consumer A       Consumer A           |
|               Consumer B       Consumer B       Consumer B           |
+----------------------------------------------------------------------+

3.2 コア構成要素

ストリーム (Stream)

  • シャードの論理的グループ
  • データレコードの順序が保証される単位

シャード (Shard)

  • ストリームの基本スループット単位
  • 書き込み: 1MB/秒 または 1,000レコード/秒
  • 読み取り: 2MB/秒(共有ファンアウト)、2MB/秒/コンシューマ(拡張ファンアウト)

レコード (Record)

  • データの基本単位
  • パーティションキー、シーケンス番号、データブロブ(最大1MB)で構成

パーティションキー (Partition Key)

  • レコードがどのシャードに割り当てられるかを決定
  • MD5ハッシュを使用してシャードにマッピング
  • 同じパーティションキーのレコードは同じシャードに順序通り格納

シーケンス番号 (Sequence Number)

  • 各レコードに自動割り当てされる一意の識別子
  • シャード内でのレコードの順序を保証
パーティションキー ハッシュ過程
================================

Partition Key: "user-123"
         |
         v
    MD5("user-123") = 0x7A3B...
         |
         v
    Hash Range: 0 ~ 2^128 - 1
         |
         v
    Shard 1: [0 ~ 2^127]          <-- この範囲にマッピング
    Shard 2: [2^127 ~ 2^128 - 1]

3.3 プロデューサー (Producers)

Kinesisストリームにデータを送信する方法は複数あります。

1) PutRecord / PutRecords API

最も基本的な方式です。

import boto3
import json

kinesis = boto3.client('kinesis', region_name='ap-northeast-1')

# 単一レコード送信
response = kinesis.put_record(
    StreamName='my-data-stream',
    Data=json.dumps({
        'event_type': 'page_view',
        'user_id': 'user-123',
        'page': '/products/laptop',
        'timestamp': '2026-03-20T10:30:00Z'
    }).encode('utf-8'),
    PartitionKey='user-123'
)
print(f"Shard ID: {response['ShardId']}")
print(f"Sequence Number: {response['SequenceNumber']}")

# バッチレコード送信
records = []
for i in range(100):
    records.append({
        'Data': json.dumps({
            'event_type': 'click',
            'user_id': f'user-{i % 10}',
            'element': 'buy_button',
            'timestamp': '2026-03-20T10:30:00Z'
        }).encode('utf-8'),
        'PartitionKey': f'user-{i % 10}'
    })

response = kinesis.put_records(
    StreamName='my-data-stream',
    Records=records
)
print(f"Failed records: {response['FailedRecordCount']}")

2) KPL (Kinesis Producer Library)

高性能プロデューシングのためのライブラリです。

  • レコード集約 (Aggregation): 複数の小さなレコードを1つのKinesisレコードにまとめる
  • レコード収集 (Collection): 複数のKinesisレコードを1つのPutRecords呼び出しにまとめる
  • 自動リトライ: 失敗したレコードを自動的に再送信
  • CloudWatchメトリクス: 自動的にパフォーマンスメトリクスを発行
KPL 集約プロセス
=================

User Record A (100 bytes) --+
User Record B (200 bytes) --+--> Kinesis Record (500 bytes)
User Record C (200 bytes) --+         |
                                      v
User Record D (300 bytes) --+    PutRecords API
User Record E (400 bytes) --+--> (単一呼び出し)
User Record F (100 bytes) --+

3) AWS SDK直接使用

各種言語のAWS SDKを通じて直接APIを呼び出すことができます。

4) Kinesis Agent

サーバーにインストールしてログファイルを自動的にストリームに送信するエージェントです。

3.4 コンシューマー (Consumers)

1) 共有ファンアウト (Shared Fan-Out) - GetRecords

デフォルトのコンシューマー方式で、シャードあたり2MB/秒の読み取りスループットを全コンシューマーで共有します。

import boto3
import json
import time

kinesis = boto3.client('kinesis', region_name='ap-northeast-1')

# シャードイテレーター取得
response = kinesis.get_shard_iterator(
    StreamName='my-data-stream',
    ShardId='shardId-000000000000',
    ShardIteratorType='LATEST'  # TRIM_HORIZON, AT_SEQUENCE_NUMBER, AT_TIMESTAMP等
)
shard_iterator = response['ShardIterator']

# レコードポーリング
while True:
    response = kinesis.get_records(
        ShardIterator=shard_iterator,
        Limit=100
    )

    for record in response['Records']:
        data = json.loads(record['Data'].decode('utf-8'))
        print(f"Partition Key: {record['PartitionKey']}")
        print(f"Sequence Number: {record['SequenceNumber']}")
        print(f"Data: {data}")

    shard_iterator = response['NextShardIterator']

    # GetRecordsはシャードあたり毎秒5回まで呼び出し可能
    time.sleep(0.2)

2) 拡張ファンアウト (Enhanced Fan-Out) - SubscribeToShard

HTTP/2を使用したプッシュベースの配信方式です。

共有ファンアウト vs 拡張ファンアウト
=======================================

共有ファンアウト (Shared Fan-Out):
+--------+     +--------+
| Shard  | --> | 2MB/s  | --> Consumer A (ポーリング)
|        |     | (共有) | --> Consumer B (ポーリング)
+--------+     +--------+     Consumer C (ポーリング)

3つのコンシューマーが2MB/sを共有
各コンシューマー: ~0.67 MB/s

拡張ファンアウト (Enhanced Fan-Out):
+--------+     +--------+
| Shard  | --> | 2MB/s  | --> Consumer A (HTTP/2 プッシュ)
|        |     | 2MB/s  | --> Consumer B (HTTP/2 プッシュ)
|        |     | 2MB/s  | --> Consumer C (HTTP/2 プッシュ)
+--------+     +--------+

各コンシューマーに専用の2MB/sを割当
レイテンシ: ~70ms (共有: ~200ms+)

3.5 KCL (Kinesis Client Library)

KCLは分散コンシューマーアプリケーション開発を簡素化するライブラリです。

コア機能:

  • リース管理: DynamoDBテーブルを使用してシャードごとの所有権を追跡
  • チェックポインティング: 処理進捗をDynamoDBに保存して障害復旧をサポート
  • 自動ロードバランシング: ワーカー数に応じてシャードを自動再分配
  • リシャーディング処理: シャード分割・マージ時に自動対応
KCL アーキテクチャ
===================

                DynamoDB (Lease Table)
                +---------------------+
                | Shard ID | Worker   |
                |----------|----------|
                | shard-0  | worker-1 |
                | shard-1  | worker-2 |
                | shard-2  | worker-1 |
                | shard-3  | worker-2 |
                +---------------------+
                      ^         ^
                      |         |
              +-------+         +-------+
              |                         |
        +-----------+           +-----------+
        | Worker 1  |           | Worker 2  |
        | (EC2/ECS) |           | (EC2/ECS) |
        |           |           |           |
        | shard-0   |           | shard-1   |
        | shard-2   |           | shard-3   |
        +-----------+           +-----------+

3.6 データ保持期間

  • デフォルト: 24時間
  • 最大: 365日
  • 保持期間が長いほどコスト増加
  • データリプレイが必要な場合は保持期間を延長

3.7 シャード分割とマージ

シャード分割 (Split)
=====================
Shard 1 [0 ~ 100]
         |
    split at 50
         |
    +----+----+
    |         |
Shard 3    Shard 4
[0 ~ 50]  [51 ~ 100]

シャードマージ (Merge)
=======================
Shard 3 [0 ~ 50]     --> Shard 5
Shard 4 [51 ~ 100]   --> [0 ~ 100]
  • 分割: ホットシャードの負荷を分散する時に使用
  • マージ: トラフィックが減少した2つの隣接シャードを統合する時に使用
  • オンデマンドモードでは自動的に処理

3.8 キャパシティモード

プロビジョニングモード (Provisioned Mode)

  • シャード数を直接指定
  • 予測可能なワークロードに適合
  • シャード時間あたりの課金

オンデマンドモード (On-Demand Mode)

  • 自動的にシャード数を調整
  • 予測不可能なワークロードに適合
  • データ量ベースの課金
  • 2025年新規: On-Demand Advantageモード(従来比60%安いデータ使用料)

4. Amazon Data Firehose(旧Kinesis Data Firehose)

4.1 概要

Amazon Data Firehoseは、ストリーミングデータをデータレイク、データストア、 分析サービスに安定的に配信する完全マネージドサービスです。

Amazon Data Firehose アーキテクチャ
======================================

+----------+     +-----------+     +------------+     +----------+
| ソース   |     | Firehose  |     | 変換       |     | 配信先   |
|          | --> | 配信      | --> | (任意)     | --> |          |
| - Direct |     | ストリーム|     | - Lambda   |     | - S3     |
| - Kinesis|     |           |     | - フォーマ |     | - Redshift|
|   Data   |     | バッファ: |     |   ット変換 |     | - Open-  |
|   Streams|     | - サイズ  |     | - 圧縮     |     |   Search |
| - MSK    |     | - 時間    |     | - 暗号化   |     | - Splunk |
|          |     |           |     |            |     | - HTTP   |
+----------+     +-----------+     +------------+     +----------+
                                                           |
                                                      +----------+
                                                      | バックアップ|
                                                      | S3 (原本) |
                                                      +----------+

4.2 主要特徴

シャード管理不要

  • Data Streamsと異なりシャードを直接管理する必要なし
  • 自動的にスケールアウト/イン

バッファリング設定

  • サイズベース: 1MB〜128MB
  • 時間ベース: 60秒〜900秒
  • どちらかの条件に達したら配信

データ変換

  • Lambda関数によるカスタム変換
  • Apache Parquet、ORC等のカラムナーフォーマットへの自動変換
  • Gzip、Snappy、Zip圧縮サポート
  • SSE-S3またはSSE-KMS暗号化

4.3 Firehose配信先

配信先特徴
Amazon S3最も一般的、Parquet/ORC変換可能
Amazon RedshiftS3経由後COPYコマンドでロード
Amazon OpenSearchログ分析、検索インデキシング
Splunkセキュリティモニタリング、運用ログ
HTTPエンドポイントカスタム配信先、Datadog等
Snowflakeクラウドデータウェアハウス
Apache Icebergオープンテーブルフォーマット

4.4 コード例: Firehose送信

import boto3
import json

firehose = boto3.client('firehose', region_name='ap-northeast-1')

# 単一レコード送信
response = firehose.put_record(
    DeliveryStreamName='my-firehose-stream',
    Record={
        'Data': json.dumps({
            'event_type': 'purchase',
            'user_id': 'user-456',
            'product': 'laptop',
            'amount': 1299.99,
            'timestamp': '2026-03-20T10:30:00Z'
        }).encode('utf-8')
    }
)

# バッチ送信
records = []
for i in range(50):
    records.append({
        'Data': json.dumps({
            'event_type': 'page_view',
            'user_id': f'user-{i}',
            'page': f'/product/{i}',
            'timestamp': '2026-03-20T10:30:00Z'
        }).encode('utf-8')
    })

response = firehose.put_record_batch(
    DeliveryStreamName='my-firehose-stream',
    Records=records
)
print(f"Failed records: {response['FailedPutCount']}")

5. Kinesis Video Streams

5.1 概要

Kinesis Video Streamsは、カメラ、RADAR、LIDAR、ドローン等から発生するビデオおよびメディアデータを 安全に取り込み、再生できる完全マネージドサービスです。

5.2 主要機能

Kinesis Video Streams アーキテクチャ
=======================================

+----------+     +------------------+     +------------------+
| デバイス |     | Kinesis Video    |     | コンシューマー   |
|          | --> | Streams          | --> |                  |
| - カメラ |     |                  |     | - HLS再生        |
| - ドローン|    | - 自動スケール   |     | - DASH再生       |
| - LIDAR  |     | - 耐久性ストレージ|    | - GetMedia API   |
| - スマート|    | - 暗号化         |     | - Rekognition    |
|   フォン |     |                  |     | - SageMaker      |
+----------+     +------------------+     +------------------+
  • HLS (HTTP Live Streaming): Webブラウザとモバイルでライブ/アーカイブ再生可能
  • WebRTC: 超低遅延双方向メディアストリーミング
  • Amazon Rekognition連携: 顔認識、オブジェクト検出等のコンピュータビジョン分析
  • ストレージ階層: ホットストレージ(リアルタイムアクセス)とウォームストレージ(コスト効率的保管)

6. 料金モデル

6.1 Kinesis Data Streams料金

プロビジョニングモード:

項目コスト(米国東部基準)
シャード時間~0.015 USD/シャード/時間
PUTペイロードユニット (25KB)~0.014 USD/百万ユニット
拡張ファンアウト データ取得~0.013 USD/GB
拡張ファンアウト コンシューマーシャード時間~0.015 USD/コンシューマー/シャード/時間
長期保持(24時間超過)~0.023 USD/シャード/時間

オンデマンドモード:

項目コスト(米国東部基準)
ストリーム時間 (Standard)~0.04 USD/時間
データ書き込み (Standard)~0.08 USD/GB
データ読み取り (Standard)~0.04 USD/GB
データ書き込み (Advantage)~0.032 USD/GB
データ読み取り (Advantage)~0.016 USD/GB

6.2 Amazon Data Firehose料金

項目コスト(米国東部基準)
データ取込(最初の500TB/月)~0.029 USD/GB
フォーマット変換~0.018 USD/GB
VPC配信~0.01 USD/GB + 時間あたり料金

7. エンドツーエンドアーキテクチャ例:リアルタイム分析パイプライン

リアルタイム分析パイプラインアーキテクチャ
=============================================

+----------+   +---------+   +----------+   +---------+   +----------+
| Web/     |   | Kinesis |   | Lambda   |   | Firehose|   | S3       |
| Mobile   |-->| Data    |-->| (変換/   |-->| 配信    |-->| (Data    |
| App      |   | Streams |   |  フィルタ)|  | ストリーム|  |  Lake)   |
+----------+   +---------+   +----------+   +---------+   +----------+
                    |                                           |
                    v                                           v
              +-----------+                              +----------+
              | Managed   |                              | Athena   |
              | Flink     |                              | (Ad-hoc  |
              | (リアル   |                              |  クエリ) |
              |  タイム分析)|                             +----------+
              +-----------+                                    |
                    |                                          v
                    v                                    +----------+
              +-----------+                              | Quick-   |
              | DynamoDB  |                              | Sight    |
              | (リアル   |                              | (BI      |
              |  タイムDB)|                              |ダッシュ  |
              +-----------+                              | ボード)  |
                                                         +----------+

7.1 完全なプロデューサー/コンシューマー例

# === producer.py ===
import boto3
import json
import time
import random
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='ap-northeast-1')
STREAM_NAME = 'clickstream-data'

def generate_click_event():
    """クリックストリームイベント生成"""
    pages = ['/home', '/products', '/cart', '/checkout', '/profile']
    actions = ['view', 'click', 'scroll', 'submit']
    user_id = f'user-{random.randint(1, 1000)}'

    return {
        'user_id': user_id,
        'page': random.choice(pages),
        'action': random.choice(actions),
        'session_id': f'sess-{random.randint(1, 100)}',
        'timestamp': datetime.utcnow().isoformat() + 'Z',
        'device': random.choice(['mobile', 'desktop', 'tablet']),
        'country': random.choice(['KR', 'US', 'JP', 'DE'])
    }

def send_events(batch_size=50, interval=1.0):
    """イベントをKinesisストリームに送信"""
    while True:
        records = []
        for _ in range(batch_size):
            event = generate_click_event()
            records.append({
                'Data': json.dumps(event).encode('utf-8'),
                'PartitionKey': event['user_id']
            })

        try:
            response = kinesis.put_records(
                StreamName=STREAM_NAME,
                Records=records
            )

            failed = response['FailedRecordCount']
            if failed > 0:
                print(f"Warning: {failed} records failed")

                for i, record_response in enumerate(response['Records']):
                    if 'ErrorCode' in record_response:
                        print(f"  Error: {record_response['ErrorCode']}")
            else:
                print(f"Sent {batch_size} records successfully")

        except Exception as e:
            print(f"Error: {e}")

        time.sleep(interval)

if __name__ == '__main__':
    send_events()
# === consumer.py ===
import boto3
import json
import time

kinesis = boto3.client('kinesis', region_name='ap-northeast-1')
STREAM_NAME = 'clickstream-data'

def get_shard_ids():
    """ストリームの全シャードIDを返す"""
    response = kinesis.describe_stream(StreamName=STREAM_NAME)
    return [
        shard['ShardId']
        for shard in response['StreamDescription']['Shards']
    ]

def process_records(records):
    """レコード処理ロジック"""
    page_views = {}
    for record in records:
        data = json.loads(record['Data'].decode('utf-8'))
        page = data.get('page', 'unknown')
        page_views[page] = page_views.get(page, 0) + 1

        # 異常行動検知の例
        if data.get('action') == 'submit' and data.get('page') == '/checkout':
            print(f"[ALERT] Checkout event: user={data['user_id']}")

    for page, count in page_views.items():
        print(f"  Page: {page}, Views: {count}")

def consume_stream():
    """ストリームからデータを読み取り処理"""
    shard_ids = get_shard_ids()
    print(f"Found {len(shard_ids)} shards")

    shard_iterators = {}
    for shard_id in shard_ids:
        response = kinesis.get_shard_iterator(
            StreamName=STREAM_NAME,
            ShardId=shard_id,
            ShardIteratorType='LATEST'
        )
        shard_iterators[shard_id] = response['ShardIterator']

    while True:
        for shard_id in shard_ids:
            try:
                response = kinesis.get_records(
                    ShardIterator=shard_iterators[shard_id],
                    Limit=100
                )

                if response['Records']:
                    print(f"\n--- Shard: {shard_id} ---")
                    print(f"Records received: {len(response['Records'])}")
                    process_records(response['Records'])

                shard_iterators[shard_id] = response['NextShardIterator']

            except kinesis.exceptions.ExpiredIteratorException:
                response = kinesis.get_shard_iterator(
                    StreamName=STREAM_NAME,
                    ShardId=shard_id,
                    ShardIteratorType='LATEST'
                )
                shard_iterators[shard_id] = response['ShardIterator']

        time.sleep(1)

if __name__ == '__main__':
    consume_stream()

8. 主要制限事項と参考情報

項目制限
レコード最大サイズ1 MB
PutRecordsリクエストあたりの最大レコード数500個
PutRecordsリクエストあたりの最大サイズ5 MB
シャードあたりの書き込みスループット1 MB/秒 または 1,000レコード/秒
シャードあたりの読み取りスループット(共有)2 MB/秒、GetRecords毎秒5回
シャードあたりの読み取りスループット(拡張ファンアウト)コンシューマーあたり2 MB/秒
最大登録コンシューマー(拡張ファンアウト)ストリームあたり20個
データ保持24時間(デフォルト)〜365日
ストリームあたりの最大シャード数デフォルト500(上限緩和申請可能)

9. まとめ

AWS Kinesisファミリーは、リアルタイムデータストリーミングのための包括的なソリューションを提供します。

  • Kinesis Data Streams: リアルタイムデータ収集と処理の中核。シャードベースのアーキテクチャで スケーラブルであり、KCLと拡張ファンアウトを通じて多様なコンシューマーパターンをサポートします。
  • Amazon Data Firehose: データをS3、Redshift、OpenSearch等に自動的に配信するパイプライン。 シャード管理なしで簡単に使用できます。
  • Managed Flink: Apache Flinkベースのマネージドストリーム分析サービスで、 SQLとコードベースの分析の両方をサポートします。
  • Video Streams: ビデオ/メディアデータの取込、保存、再生のための専門サービスです。

次の記事では、Kinesisの実践アーキテクチャパターンとApache Kafka、SQSとの比較を取り上げます。