[AWS] Complete Guide to Kinesis: Real-Time Streaming Data Processing

1. What is Streaming Data

1.1 Batch Processing vs Stream Processing

Traditional data processing relied on batch methods: collect data over a period, then process it all at once. Modern applications, however, demand real-time responsiveness.

Batch Processing
============================
[Data Collection] --> [Storage] --> [Periodic Processing] --> [Result]
     |                                    |
     +---- Minutes to Hours --------------+

Stream Processing
==================================
[Data Generation] --> [Stream] --> [Immediate Processing] --> [Result]
     |                                   |
     +---- Milliseconds to Seconds ------+

1.2 Streaming Data Use Cases

  • Real-time log analysis: Collect server logs instantly for anomaly detection
  • IoT sensor data: Telemetry data from millions of devices
  • Clickstream analysis: Track user behavior in real-time for personalization
  • Financial transaction monitoring: Real-time transaction analysis for fraud detection
  • Social media feeds: Real-time trend and sentiment analysis

2. AWS Kinesis Family Overview

AWS Kinesis is a collection of fully managed services for collecting, processing, and analyzing real-time streaming data.

+------------------------------------------------------------------+
|                    AWS Kinesis Family                              |
+------------------------------------------------------------------+
|                                                                    |
|  +------------------+  +------------------+  +------------------+  |
|  | Kinesis Data     |  | Amazon Data      |  | Managed Service  |  |
|  | Streams          |  | Firehose         |  | for Apache Flink |  |
|  |                  |  |                  |  |                  |  |
|  | Real-time data   |  | Delivery         |  | Stream analytics |  |
|  | streaming        |  | pipeline         |  | (SQL, Java,      |  |
|  |                  |  | (S3, Redshift,   |  |  Python, Scala)  |  |
|  |                  |  | OpenSearch etc.) |  |                  |  |
|  +------------------+  +------------------+  +------------------+  |
|                                                                    |
|  +------------------+                                              |
|  | Kinesis Video    |                                              |
|  | Streams          |                                              |
|  |                  |                                              |
|  | Video streaming  |                                              |
|  | ingestion &      |                                              |
|  | analysis         |                                              |
|  +------------------+                                              |
+------------------------------------------------------------------+
Service        | Primary Use                          | Data Type       | Latency
---------------|--------------------------------------|-----------------|----------------------
Data Streams   | Real-time data collection/processing | Records (bytes) | Real-time (ms)
Data Firehose  | Data delivery pipeline               | Records (bytes) | Near real-time (60s+)
Managed Flink  | Stream analytics/transformation      | Stream data     | Real-time (ms)
Video Streams  | Video ingestion/playback             | Media frames    | Real-time (1-10s)

3. Kinesis Data Streams Deep Dive

3.1 Core Architecture

Kinesis Data Streams is the backbone of large-scale real-time data streaming services.

                        Kinesis Data Stream
+----------------------------------------------------------------------+
|                                                                      |
|  Producer        Shard 1          Shard 2          Shard 3           |
|  --------     +----------+     +----------+     +----------+         |
|  |App A | --> |Record 1  | --> |Record 4  | --> |Record 7  |         |
|  |App B | --> |Record 2  | --> |Record 5  | --> |Record 8  |         |
|  |App C | --> |Record 3  | --> |Record 6  | --> |Record 9  |         |
|  --------     +----------+     +----------+     +----------+         |
|                    |                |                |                |
|                    v                v                v                |
|               Consumer A       Consumer A       Consumer A           |
|               Consumer B       Consumer B       Consumer B           |
+----------------------------------------------------------------------+

3.2 Core Components

Stream

  • A named, logical group of shards
  • The unit you write records to and read records from; ordering is guaranteed per shard, not across the whole stream

Shard

  • The base throughput unit of a stream
  • Write: 1MB/s or 1,000 records per second
  • Read: 2MB/s (shared fan-out), 2MB/s per consumer (enhanced fan-out)

Record

  • The basic unit of data
  • Composed of a partition key, sequence number, and data blob (up to 1MB)

Partition Key

  • Determines which shard receives a record
  • Uses MD5 hashing to map to shards
  • Records with the same partition key are stored in order on the same shard

Sequence Number

  • A unique identifier automatically assigned to each record
  • Guarantees record ordering within a shard
Partition Key Hashing Process
==============================

Partition Key: "user-123"
         |
         v
    MD5("user-123") = 0x7A3B...
         |
         v
    Hash Range: 0 ~ 2^128 - 1
         |
         v
    Shard 1: [0 ~ 2^127 - 1]      <-- Mapped to this range
    Shard 2: [2^127 ~ 2^128 - 1]
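The routing above can be sketched in Python. Kinesis performs this hashing server-side; the toy version below assumes the stream's hash key space is split evenly across shards, which is the default when a stream is created:

```python
import hashlib

HASH_SPACE = 2 ** 128  # MD5 digests cover 0 ~ 2^128 - 1

def shard_for_key(partition_key: str, num_shards: int) -> int:
    """Mimic Kinesis routing: MD5-hash the partition key and map the
    result onto evenly split hash key ranges, one per shard."""
    digest = hashlib.md5(partition_key.encode('utf-8')).digest()
    hash_value = int.from_bytes(digest, byteorder='big')
    range_size = HASH_SPACE // num_shards
    # Clamp so the last shard absorbs any remainder of the hash space
    return min(hash_value // range_size, num_shards - 1)

# The same key always maps to the same shard, which is what preserves
# per-key ordering
print(shard_for_key('user-123', 2))
```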

3.3 Producers

There are several ways to send data to a Kinesis stream.

1) PutRecord / PutRecords API

The most basic approach.

import boto3
import json

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Single record
response = kinesis.put_record(
    StreamName='my-data-stream',
    Data=json.dumps({
        'event_type': 'page_view',
        'user_id': 'user-123',
        'page': '/products/laptop',
        'timestamp': '2026-03-20T10:30:00Z'
    }).encode('utf-8'),
    PartitionKey='user-123'
)
print(f"Shard ID: {response['ShardId']}")
print(f"Sequence Number: {response['SequenceNumber']}")

# Batch records
records = []
for i in range(100):
    records.append({
        'Data': json.dumps({
            'event_type': 'click',
            'user_id': f'user-{i % 10}',
            'element': 'buy_button',
            'timestamp': '2026-03-20T10:30:00Z'
        }).encode('utf-8'),
        'PartitionKey': f'user-{i % 10}'
    })

response = kinesis.put_records(
    StreamName='my-data-stream',
    Records=records
)
print(f"Failed records: {response['FailedRecordCount']}")

2) KPL (Kinesis Producer Library)

A high-performance producing library.

  • Record Aggregation: Bundles multiple small records into a single Kinesis record
  • Record Collection: Groups multiple Kinesis records into a single PutRecords call
  • Automatic Retries: Automatically retransmits failed records
  • CloudWatch Metrics: Automatically publishes performance metrics
KPL Aggregation Process
========================

User Record A (100 bytes) --+
User Record B (200 bytes) --+--> Kinesis Record (500 bytes)
User Record C (200 bytes) --+         |
                                      v
User Record D (300 bytes) --+    PutRecords API
User Record E (400 bytes) --+--> (single call)
User Record F (100 bytes) --+
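The KPL itself is a Java library (usable from other languages through a native daemon), but the aggregation idea can be sketched in plain Python. This is an illustrative batcher only; the real KPL uses a protobuf-based aggregated record format, and the 500-byte threshold here is an arbitrary demo value:

```python
def aggregate(user_records, max_batch_bytes=500):
    """Greedily pack small user records into batches of at most
    max_batch_bytes, mirroring how KPL aggregation bundles many user
    records into one Kinesis record."""
    batches, current, current_size = [], [], 0
    for record in user_records:
        if current and current_size + len(record) > max_batch_bytes:
            batches.append(current)
            current, current_size = [], 0
        current.append(record)
        current_size += len(record)
    if current:
        batches.append(current)
    return batches

records = [b'A' * 100, b'B' * 200, b'C' * 200,
           b'D' * 300, b'E' * 400, b'F' * 100]
# 100+200+200 fills one 500-byte batch; D travels alone (300+400 > 500);
# then E+F together (400+100 = 500)
print([len(batch) for batch in aggregate(records)])  # [3, 1, 2]
```

Fewer, larger Kinesis records means fewer PUT payload units and API calls, which is exactly where the KPL's cost and throughput gains come from.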

3) AWS SDK Direct Usage

You can call APIs directly through AWS SDKs in various languages.

4) Kinesis Agent

An agent installed on servers that automatically sends log files to the stream.

3.4 Consumers

1) Shared Fan-Out - GetRecords

The default consumer method where all consumers share 2MB/s read throughput per shard.

import boto3
import json
import time

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Get shard iterator
response = kinesis.get_shard_iterator(
    StreamName='my-data-stream',
    ShardId='shardId-000000000000',
    ShardIteratorType='LATEST'  # TRIM_HORIZON, AT_SEQUENCE_NUMBER, AT_TIMESTAMP, etc.
)
shard_iterator = response['ShardIterator']

# Poll for records
while True:
    response = kinesis.get_records(
        ShardIterator=shard_iterator,
        Limit=100
    )

    for record in response['Records']:
        data = json.loads(record['Data'].decode('utf-8'))
        print(f"Partition Key: {record['PartitionKey']}")
        print(f"Sequence Number: {record['SequenceNumber']}")
        print(f"Data: {data}")

    shard_iterator = response['NextShardIterator']

    # GetRecords can be called up to 5 times per second per shard
    time.sleep(0.2)

2) Enhanced Fan-Out - SubscribeToShard

A push-based delivery method using HTTP/2.

Shared Fan-Out vs Enhanced Fan-Out
=====================================

Shared Fan-Out:
+--------+     +--------+
| Shard  | --> | 2MB/s  | --> Consumer A (polling)
|        |     |(shared)| --> Consumer B (polling)
+--------+     +--------+     Consumer C (polling)

3 consumers share 2MB/s
Each consumer: ~0.67 MB/s

Enhanced Fan-Out:
+--------+     +--------+
| Shard  | --> | 2MB/s  | --> Consumer A (HTTP/2 push)
|        |     | 2MB/s  | --> Consumer B (HTTP/2 push)
|        |     | 2MB/s  | --> Consumer C (HTTP/2 push)
+--------+     +--------+

Each consumer gets dedicated 2MB/s
Latency: ~70ms (shared: ~200ms+)

3.5 KCL (Kinesis Client Library)

KCL simplifies the development of distributed consumer applications.

Core Features:

  • Lease Management: Tracks per-shard ownership using a DynamoDB table
  • Checkpointing: Stores processing progress in DynamoDB for failure recovery
  • Automatic Load Balancing: Redistributes shards based on the number of workers
  • Resharding Handling: Automatically responds to shard splits and merges
KCL Architecture
=================

                DynamoDB (Lease Table)
                +---------------------+
                | Shard ID | Worker   |
                |----------|----------|
                | shard-0  | worker-1 |
                | shard-1  | worker-2 |
                | shard-2  | worker-1 |
                | shard-3  | worker-2 |
                +---------------------+
                      ^         ^
                      |         |
              +-------+         +-------+
              |                         |
        +-----------+           +-----------+
        | Worker 1  |           | Worker 2  |
        | (EC2/ECS) |           | (EC2/ECS) |
        |           |           |           |
        | shard-0   |           | shard-1   |
        | shard-2   |           | shard-3   |
        +-----------+           +-----------+
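The even shard distribution in the diagram can be illustrated with a toy assignment function. The real KCL converges to this state gradually through lease stealing backed by DynamoDB conditional writes, not a central round-robin:

```python
def rebalance(shard_ids, workers):
    """Toy even assignment of shards to workers; the KCL reaches a
    similar distribution via per-shard leases in DynamoDB."""
    return {shard: workers[i % len(workers)]
            for i, shard in enumerate(sorted(shard_ids))}

assignment = rebalance(['shard-0', 'shard-1', 'shard-2', 'shard-3'],
                       ['worker-1', 'worker-2'])
# shard-0/shard-2 -> worker-1, shard-1/shard-3 -> worker-2,
# matching the lease table above
print(assignment)
```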

3.6 Data Retention

  • Default: 24 hours
  • Maximum: 365 days
  • Longer retention increases cost
  • Extend retention when data replay is needed

3.7 Shard Splitting and Merging

Shard Split
============
Shard 1 [0 ~ 100]
         |
    split at 50
         |
    +----+----+
    |         |
Shard 3    Shard 4
[0 ~ 50]  [51 ~ 100]

Shard Merge
============
Shard 3 [0 ~ 50]     --> Shard 5
Shard 4 [51 ~ 100]   --> [0 ~ 100]
  • Split: Used to distribute load from a hot shard
  • Merge: Used to combine two adjacent shards with reduced traffic
  • Handled automatically in on-demand mode

3.8 Capacity Modes

Provisioned Mode

  • You specify the number of shards
  • Suitable for predictable workloads
  • Billed per shard-hour

On-Demand Mode

  • Automatically adjusts shard count
  • Suitable for unpredictable workloads
  • Billed based on data volume
  • New in 2025: On-Demand Advantage mode (60% cheaper data usage rates)

4. Amazon Data Firehose (formerly Kinesis Data Firehose)

4.1 Overview

Amazon Data Firehose is a fully managed service for reliably delivering streaming data to data lakes, data stores, and analytics services.

Amazon Data Firehose Architecture
====================================

+----------+     +-----------+     +------------+     +-----------+
| Source   |     | Firehose  |     | Transform  |     | Dest.     |
|          | --> | Delivery  | --> | (optional) | --> |           |
| - Direct |     | Stream    |     | - Lambda   |     | - S3      |
| - Kinesis|     |           |     | - Format   |     | - Redshift|
|   Data   |     | Buffering:|     | - Compress |     | - Open-   |
|   Streams|     | - Size    |     | - Encrypt  |     |   Search  |
| - MSK    |     | - Time    |     |            |     | - Splunk  |
|          |     |           |     |            |     | - HTTP    |
+----------+     +-----------+     +------------+     +-----------+
                                                           |
                                                           v
                                                      +-----------+
                                                      | Backup S3 |
                                                      | (source)  |
                                                      +-----------+

4.2 Key Features

No Shard Management

  • Unlike Data Streams, no need to manage shards directly
  • Scales automatically

Buffering Settings

  • Size-based: 1MB to 128MB
  • Time-based: 60 seconds to 900 seconds
  • Delivers when either condition is met

Data Transformation

  • Custom transformations via Lambda functions
  • Automatic conversion to columnar formats like Apache Parquet and ORC
  • Gzip, Snappy, Zip compression support
  • SSE-S3 or SSE-KMS encryption

4.3 Firehose Delivery Destinations

Destination       | Characteristics
------------------|----------------------------------------------
Amazon S3         | Most common; Parquet/ORC conversion available
Amazon Redshift   | Via S3, then loaded with COPY command
Amazon OpenSearch | Log analysis, search indexing
Splunk            | Security monitoring, operational logs
HTTP endpoints    | Custom destinations, Datadog, etc.
Snowflake         | Cloud data warehouse
Apache Iceberg    | Open table format

4.4 Code Example: Firehose Delivery

import boto3
import json

firehose = boto3.client('firehose', region_name='us-east-1')

# Single record
response = firehose.put_record(
    DeliveryStreamName='my-firehose-stream',
    Record={
        'Data': json.dumps({
            'event_type': 'purchase',
            'user_id': 'user-456',
            'product': 'laptop',
            'amount': 1299.99,
            'timestamp': '2026-03-20T10:30:00Z'
        }).encode('utf-8')
    }
)

# Batch delivery
records = []
for i in range(50):
    records.append({
        'Data': json.dumps({
            'event_type': 'page_view',
            'user_id': f'user-{i}',
            'page': f'/product/{i}',
            'timestamp': '2026-03-20T10:30:00Z'
        }).encode('utf-8')
    })

response = firehose.put_record_batch(
    DeliveryStreamName='my-firehose-stream',
    Records=records
)
print(f"Failed records: {response['FailedPutCount']}")

5. Kinesis Video Streams

5.1 Overview

Kinesis Video Streams is a fully managed service for securely ingesting and playing back video and media data from cameras, RADAR, LIDAR, drones, and more.

5.2 Key Features

Kinesis Video Streams Architecture
=====================================

+----------+     +------------------+     +------------------+
| Devices  |     | Kinesis Video    |     | Consumers        |
|          | --> | Streams          | --> |                  |
| - Camera |     |                  |     | - HLS Playback   |
| - Drone  |     | - Auto scaling   |     | - DASH Playback  |
| - LIDAR  |     | - Durable storage|     | - GetMedia API   |
| - Smart  |     | - Encryption     |     | - Rekognition    |
|   phone  |     |                  |     | - SageMaker      |
+----------+     +------------------+     +------------------+
  • HLS (HTTP Live Streaming): Live and archive playback on web browsers and mobile
  • WebRTC: Ultra-low latency bidirectional media streaming
  • Amazon Rekognition Integration: Face recognition, object detection, and computer vision analytics
  • Storage Tiers: Hot storage (real-time access) and warm storage (cost-effective retention)

6. Pricing Model

6.1 Kinesis Data Streams Pricing

Provisioned Mode:

Item                                 | Cost (US East)
-------------------------------------|-------------------------------
Shard hour                           | ~0.015 USD/shard/hour
PUT payload unit (25KB)              | ~0.014 USD/million units
Enhanced fan-out data retrieval      | ~0.013 USD/GB
Enhanced fan-out consumer shard hour | ~0.015 USD/consumer/shard/hour
Extended retention (beyond 24h)      | ~0.023 USD/shard/hour

On-Demand Mode:

Item                   | Cost (US East)
-----------------------|---------------
Stream hour (Standard) | ~0.04 USD/hour
Data write (Standard)  | ~0.08 USD/GB
Data read (Standard)   | ~0.04 USD/GB
Data write (Advantage) | ~0.032 USD/GB
Data read (Advantage)  | ~0.016 USD/GB
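A back-of-envelope comparison using the approximate rates above; real prices vary by region, and retrieval and retention charges are ignored here:

```python
# Approximate US East rates from the tables above
SHARD_HOUR = 0.015            # USD per shard-hour (provisioned)
PUT_UNIT = 0.014 / 1_000_000  # USD per 25KB PUT payload unit
OD_STREAM_HOUR = 0.04         # USD per stream-hour (on-demand Standard)
OD_WRITE_GB = 0.08            # USD per GB written (on-demand Standard)
HOURS_PER_MONTH = 730

def provisioned_monthly(shards, gb_written):
    put_units = gb_written * (1024 ** 3) / (25 * 1024)  # 25KB units
    return shards * SHARD_HOUR * HOURS_PER_MONTH + put_units * PUT_UNIT

def on_demand_monthly(gb_written):
    return OD_STREAM_HOUR * HOURS_PER_MONTH + gb_written * OD_WRITE_GB

# 4 provisioned shards vs on-demand, each writing 1 TB per month
print(round(provisioned_monthly(4, 1024), 2))
print(round(on_demand_monthly(1024), 2))
```

At steady, predictable volume the provisioned shards come out cheaper; on-demand earns its premium when traffic is spiky enough that you would otherwise over-provision shards.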

6.2 Amazon Data Firehose Pricing

Item                               | Cost (US East)
-----------------------------------|----------------------------
Data ingestion (first 500TB/month) | ~0.029 USD/GB
Format conversion                  | ~0.018 USD/GB
VPC delivery                       | ~0.01 USD/GB + hourly rate

7. End-to-End Architecture Example: Real-Time Analytics Pipeline

Real-Time Analytics Pipeline Architecture
============================================

+----------+   +---------+   +----------+   +---------+   +----------+
| Web/     |   | Kinesis |   | Lambda   |   | Firehose|   | S3       |
| Mobile   |-->| Data    |-->|(Transform|-->| Delivery|-->| (Data    |
| App      |   | Streams |   | /Filter) |   | Stream  |   |  Lake)   |
+----------+   +---------+   +----------+   +---------+   +----------+
                    |                                           |
                    v                                           v
              +-----------+                              +----------+
              | Managed   |                              | Athena   |
              | Flink     |                              | (Ad-hoc  |
              | (Real-time|                              |  Queries)|
              |  analysis)|                              +----------+
              +-----------+                                    |
                    |                                          v
                    v                                    +----------+
              +-----------+                              | Quick-   |
              | DynamoDB  |                              | Sight    |
              | (Real-time|                              | (BI      |
              | dashboard)|                              |Dashboard)|
              +-----------+                              +----------+

7.1 Complete Producer/Consumer Example

# === producer.py ===
import boto3
import json
import time
import random
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')
STREAM_NAME = 'clickstream-data'

def generate_click_event():
    """Generate a clickstream event"""
    pages = ['/home', '/products', '/cart', '/checkout', '/profile']
    actions = ['view', 'click', 'scroll', 'submit']
    user_id = f'user-{random.randint(1, 1000)}'

    return {
        'user_id': user_id,
        'page': random.choice(pages),
        'action': random.choice(actions),
        'session_id': f'sess-{random.randint(1, 100)}',
        'timestamp': datetime.utcnow().isoformat() + 'Z',
        'device': random.choice(['mobile', 'desktop', 'tablet']),
        'country': random.choice(['KR', 'US', 'JP', 'DE'])
    }

def send_events(batch_size=50, interval=1.0):
    """Send events to Kinesis stream"""
    while True:
        records = []
        for _ in range(batch_size):
            event = generate_click_event()
            records.append({
                'Data': json.dumps(event).encode('utf-8'),
                'PartitionKey': event['user_id']
            })

        try:
            response = kinesis.put_records(
                StreamName=STREAM_NAME,
                Records=records
            )

            failed = response['FailedRecordCount']
            if failed > 0:
                print(f"Warning: {failed} records failed")

                for i, record_response in enumerate(response['Records']):
                    if 'ErrorCode' in record_response:
                        print(f"  Error: {record_response['ErrorCode']}")
            else:
                print(f"Sent {batch_size} records successfully")

        except Exception as e:
            print(f"Error: {e}")

        time.sleep(interval)

if __name__ == '__main__':
    send_events()

# === consumer.py ===
import boto3
import json
import time

kinesis = boto3.client('kinesis', region_name='us-east-1')
STREAM_NAME = 'clickstream-data'

def get_shard_ids():
    """Return all shard IDs in the stream"""
    response = kinesis.describe_stream(StreamName=STREAM_NAME)
    return [
        shard['ShardId']
        for shard in response['StreamDescription']['Shards']
    ]

def process_records(records):
    """Record processing logic"""
    page_views = {}
    for record in records:
        data = json.loads(record['Data'].decode('utf-8'))
        page = data.get('page', 'unknown')
        page_views[page] = page_views.get(page, 0) + 1

        # Anomaly detection example
        if data.get('action') == 'submit' and data.get('page') == '/checkout':
            print(f"[ALERT] Checkout event: user={data['user_id']}")

    for page, count in page_views.items():
        print(f"  Page: {page}, Views: {count}")

def consume_stream():
    """Read and process data from the stream"""
    shard_ids = get_shard_ids()
    print(f"Found {len(shard_ids)} shards")

    shard_iterators = {}
    for shard_id in shard_ids:
        response = kinesis.get_shard_iterator(
            StreamName=STREAM_NAME,
            ShardId=shard_id,
            ShardIteratorType='LATEST'
        )
        shard_iterators[shard_id] = response['ShardIterator']

    while True:
        for shard_id in shard_ids:
            try:
                response = kinesis.get_records(
                    ShardIterator=shard_iterators[shard_id],
                    Limit=100
                )

                if response['Records']:
                    print(f"\n--- Shard: {shard_id} ---")
                    print(f"Records received: {len(response['Records'])}")
                    process_records(response['Records'])

                shard_iterators[shard_id] = response['NextShardIterator']

            except kinesis.exceptions.ExpiredIteratorException:
                response = kinesis.get_shard_iterator(
                    StreamName=STREAM_NAME,
                    ShardId=shard_id,
                    ShardIteratorType='LATEST'
                )
                shard_iterators[shard_id] = response['ShardIterator']

        time.sleep(1)

if __name__ == '__main__':
    consume_stream()

8. Key Limits and Notes

Item                                            | Limit
------------------------------------------------|--------------------------------------
Maximum record size                             | 1 MB
Maximum records per PutRecords request          | 500
Maximum PutRecords request size                 | 5 MB
Write throughput per shard                      | 1 MB/s or 1,000 records/s
Read throughput per shard (shared)              | 2 MB/s; GetRecords 5 calls/s
Read throughput per shard (enhanced fan-out)    | 2 MB/s per consumer
Maximum registered consumers (enhanced fan-out) | 20 per stream
Data retention                                  | 24 hours (default) to 365 days
Maximum shards per stream                       | 500 by default (increase on request)
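The PutRecords limits in particular are easy to trip over in batch producers. A small pure-Python helper that chunks a record list so every request respects both caps (record sizing here counts the data blob plus the UTF-8 partition key, an approximation of how Kinesis meters request size):

```python
MAX_RECORDS_PER_CALL = 500
MAX_BYTES_PER_CALL = 5 * 1024 * 1024  # 5 MB total request size

def chunk_for_put_records(records):
    """Split records into PutRecords-sized batches, respecting both
    the 500-record and 5 MB per-request limits."""
    batches, batch, batch_bytes = [], [], 0
    for record in records:
        size = len(record['Data']) + len(record['PartitionKey'].encode('utf-8'))
        if batch and (len(batch) >= MAX_RECORDS_PER_CALL
                      or batch_bytes + size > MAX_BYTES_PER_CALL):
            batches.append(batch)
            batch, batch_bytes = [], 0
        batch.append(record)
        batch_bytes += size
    if batch:
        batches.append(batch)
    return batches

records = [{'Data': b'x' * 1024, 'PartitionKey': f'pk-{i}'}
           for i in range(1200)]
print([len(b) for b in chunk_for_put_records(records)])  # [500, 500, 200]
```

Each resulting batch can be passed directly as the `Records` argument of `put_records`; remember that PutRecords is not atomic, so failed entries within a batch still need the retry handling shown in the producer example above.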

9. Summary

The AWS Kinesis family provides a comprehensive solution for real-time data streaming.

  • Kinesis Data Streams: The core of real-time data collection and processing. Scalable shard-based architecture with diverse consumer patterns through KCL and enhanced fan-out.
  • Amazon Data Firehose: Automatic delivery pipeline to S3, Redshift, OpenSearch, and more. Easy to use with no shard management.
  • Managed Flink: A managed stream analytics service based on Apache Flink, supporting both SQL and code-based analytics.
  • Video Streams: A specialized service for video/media data ingestion, storage, and playback.

In the next post, we will cover practical Kinesis architecture patterns and comparisons with Apache Kafka and SQS.