1. What is Streaming Data
1.1 Batch Processing vs Stream Processing
Traditional data processing relied on batch methods: collect data over a period, then process it all at once.
Many modern applications, however, need to react to data within seconds or even milliseconds.
```text
Batch Processing
================
[Data Collection] --> [Storage] --> [Periodic Processing] --> [Result]
        |                                                        |
        +------------------- Minutes to Hours -------------------+

Stream Processing
=================
[Data Generation] --> [Stream] --> [Immediate Processing] --> [Result]
        |                                                        |
        +--------------- Milliseconds to Seconds ----------------+
```
1.2 Streaming Data Use Cases
- **Real-time log analysis**: Collect server logs as they are written and detect anomalies immediately
- **IoT sensor data**: Ingest telemetry from millions of devices
- **Clickstream analysis**: Track user behavior in real-time for personalization
- **Financial transaction monitoring**: Real-time transaction analysis for fraud detection
- **Social media feeds**: Real-time trend and sentiment analysis
2. AWS Kinesis Family Overview
AWS Kinesis is a collection of fully managed services for collecting, processing, and analyzing real-time streaming data.
```text
+----------------------------------------------------------------+
|                       AWS Kinesis Family                       |
+----------------------------------------------------------------+
|                                                                |
| +------------------+ +------------------+ +------------------+ |
| | Kinesis Data     | | Amazon Data      | | Managed Service  | |
| | Streams          | | Firehose         | | for Apache Flink | |
| |                  | |                  | |                  | |
| | Real-time data   | | Delivery         | | Stream analytics | |
| | streaming        | | pipeline         | | (SQL, Java,      | |
| |                  | | (S3, Redshift,   | | Python, Scala)   | |
| |                  | | OpenSearch etc.) | |                  | |
| +------------------+ +------------------+ +------------------+ |
|                                                                |
| +------------------+                                           |
| | Kinesis Video    |                                           |
| | Streams          |                                           |
| |                  |                                           |
| | Video streaming  |                                           |
| | ingestion &      |                                           |
| | analysis         |                                           |
| +------------------+                                           |
+----------------------------------------------------------------+
```
| Service | Primary Use | Data Type | Latency |
| ------------- | ------------------------------------ | --------------- | --------------------- |
| Data Streams | Real-time data collection/processing | Records (bytes) | Real-time (ms) |
| Data Firehose | Data delivery pipeline | Records (bytes) | Near real-time (60s+) |
| Managed Flink | Stream analytics/transformation | Stream data | Real-time (ms) |
| Video Streams | Video ingestion/playback | Media frames | Real-time (1-10s) |
3. Kinesis Data Streams Deep Dive
3.1 Core Architecture
Kinesis Data Streams is the core of the family: a durable, shard-based stream that many producers can write to and many consumers can read from in real time.
```text
                           Kinesis Data Stream
+------------------------------------------------------------------------+
|                                                                        |
|   Producers         Shard 1          Shard 2          Shard 3          |
|   ---------       +----------+     +----------+     +----------+       |
|   | App A |  -->  | Record 1 |     | Record 4 |     | Record 7 |       |
|   | App B |  -->  | Record 2 |     | Record 5 |     | Record 8 |       |
|   | App C |  -->  | Record 3 |     | Record 6 |     | Record 9 |       |
|   ---------       +----------+     +----------+     +----------+       |
|                        |                |                |             |
|                        v                v                v             |
|                    Consumer A       Consumer A       Consumer A        |
|                    Consumer B       Consumer B       Consumer B        |
+------------------------------------------------------------------------+
```
3.2 Core Components
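**Stream**
- A logical group of shards
- The resource you create and configure (capacity mode, retention, encryption); record ordering is guaranteed within each shard, not across the whole stream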
**Shard**
- The base throughput unit of a stream
- Write: 1MB/s or 1,000 records per second
- Read: 2MB/s (shared fan-out), 2MB/s per consumer (enhanced fan-out)
**Record**
- The basic unit of data
- Composed of a partition key, sequence number, and data blob (up to 1MB)
**Partition Key**
- Determines which shard receives a record
- Uses MD5 hashing to map to shards
- Records with the same partition key are stored in order on the same shard
**Sequence Number**
- A unique identifier Kinesis assigns to each record within its shard
- Sequence numbers generally increase over time, which is what lets consumers read a shard's records in order
```text
Partition Key Hashing Process
=============================
Partition Key: "user-123"
        |
        v
MD5("user-123") = 0x7A3B... (illustrative value)
        |
        v
128-bit integer in hash range 0 ~ 2^128 - 1
        |
        v
Shard 1: [0 ~ 2^127 - 1]       <-- 0x7A3B... falls in this range
Shard 2: [2^127 ~ 2^128 - 1]
```
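The mapping can be reproduced locally. The sketch below makes two assumptions. First, the MD5 digest is read as a big-endian unsigned integer. Second, the hash space is split evenly, which is how a freshly created provisioned stream starts out. After resharding, the real ranges come from each shard's `HashKeyRange` in `list_shards`. The helper names are hypothetical.

```python
import hashlib

HASH_SPACE = 2 ** 128  # MD5 digests are 128 bits


def partition_key_hash(partition_key: str) -> int:
    """Hash a partition key to a 128-bit integer (digest read as big-endian)."""
    return int(hashlib.md5(partition_key.encode('utf-8')).hexdigest(), 16)


def even_hash_ranges(shard_count: int) -> list:
    """Split the hash space into shard_count contiguous, inclusive (start, end) ranges."""
    size = HASH_SPACE // shard_count
    ranges = []
    for i in range(shard_count):
        start = i * size
        # The last shard absorbs any remainder so the whole space is covered
        end = HASH_SPACE - 1 if i == shard_count - 1 else start + size - 1
        ranges.append((start, end))
    return ranges


def shard_for_key(partition_key: str, ranges: list) -> int:
    """Return the index of the range that contains the key's hash."""
    h = partition_key_hash(partition_key)
    for index, (start, end) in enumerate(ranges):
        if start <= h <= end:
            return index
    raise ValueError('hash ranges do not cover the whole hash space')


ranges = even_hash_ranges(2)
print(ranges[0] == (0, 2**127 - 1), ranges[1] == (2**127, 2**128 - 1))  # True True
print(shard_for_key('user-123', ranges))  # same index on every run for this key
```

The hash depends only on the key, so every record for `user-123` lands on the same shard, and that is what preserves per-key ordering. `PutRecord` also accepts an `ExplicitHashKey` if you need to bypass the MD5 step.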
3.3 Producers
There are several ways to send data to a Kinesis stream.
**1) PutRecord / PutRecords API**
The most basic approach.
```python
import json

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Single record
response = kinesis.put_record(
    StreamName='my-data-stream',
    Data=json.dumps({
        'event_type': 'page_view',
        'user_id': 'user-123',
        'page': '/products/laptop',
        'timestamp': '2026-03-20T10:30:00Z'
    }).encode('utf-8'),
    PartitionKey='user-123'
)
print(f"Shard ID: {response['ShardId']}")
print(f"Sequence Number: {response['SequenceNumber']}")

# Batch records (PutRecords accepts up to 500 records per call)
records = []
for i in range(100):
    records.append({
        'Data': json.dumps({
            'event_type': 'click',
            'user_id': f'user-{i % 10}',
            'element': 'buy_button',
            'timestamp': '2026-03-20T10:30:00Z'
        }).encode('utf-8'),
        'PartitionKey': f'user-{i % 10}'
    })

response = kinesis.put_records(
    StreamName='my-data-stream',
    Records=records
)
# PutRecords is not atomic: individual records can fail even when the call succeeds
print(f"Failed records: {response['FailedRecordCount']}")
```
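`FailedRecordCount` only reports the failures. The caller still has to resend the failed entries. Below is a minimal sketch under two assumptions:

- The boto3 client's `put_records` method is passed in, which also makes the helper testable without AWS.
- Exponential backoff is an acceptable retry policy.

`put_records_with_retry` is a hypothetical helper, not part of boto3.

```python
import time
from typing import Callable


def put_records_with_retry(put_records: Callable[..., dict], stream_name: str,
                           records: list, max_attempts: int = 5,
                           base_delay: float = 0.1) -> list:
    """Send records and resend only the entries PutRecords reports as failed.

    Returns the records that still failed after max_attempts calls.
    """
    pending = list(records)
    attempt = 0
    while pending and attempt < max_attempts:
        if attempt > 0:
            # Exponential backoff; throttling is a common cause of per-record failures
            time.sleep(base_delay * 2 ** (attempt - 1))
        response = put_records(StreamName=stream_name, Records=pending)
        # response['Records'] lines up one-to-one with the request entries;
        # failed entries carry an 'ErrorCode' instead of a 'SequenceNumber'
        pending = [rec for rec, result in zip(pending, response['Records'])
                   if 'ErrorCode' in result]
        attempt += 1
    return pending
```

Usage: `leftover = put_records_with_retry(kinesis.put_records, 'my-data-stream', records)`. Anything left over still needs logging or a dead-letter destination.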
**2) KPL (Kinesis Producer Library)**
A high-performance producer library (a C++ core with a Java wrapper).
- **Record Aggregation**: Bundles multiple small records into a single Kinesis record
- **Record Collection**: Groups multiple Kinesis records into a single PutRecords call
- **Automatic Retries**: Automatically retransmits failed records
- **CloudWatch Metrics**: Automatically publishes performance metrics
```text
KPL Aggregation and Collection
==============================
User Record A (100 bytes) --+
User Record B (200 bytes) --+--> Kinesis Record 1 (~500 bytes) --+
User Record C (200 bytes) --+                                    |
                                                                 +--> PutRecords API
User Record D (300 bytes) --+                                    |    (single call)
User Record E (400 bytes) --+--> Kinesis Record 2 (~800 bytes) --+
User Record F (100 bytes) --+
```
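KPL's aggregation uses its own record format, so that part is best left to the library, but the collection step is easy to picture. This plain-Python sketch is a hypothetical helper, not KPL code. It groups PutRecords entries so that each call stays within 500 records and 5 MiB, and it counts the data blob plus the partition key toward the size, as the PutRecords limits do.

```python
def batch_for_put_records(records: list, max_count: int = 500,
                          max_bytes: int = 5 * 1024 * 1024) -> list:
    """Group PutRecords entries into batches that respect count and size limits.

    Each entry is a dict with 'Data' (bytes) and 'PartitionKey' (str).
    """
    batches, current, current_bytes = [], [], 0
    for rec in records:
        # Request size includes partition keys, not just the data blobs
        size = len(rec['Data']) + len(rec['PartitionKey'].encode('utf-8'))
        if current and (len(current) == max_count or current_bytes + size > max_bytes):
            batches.append(current)
            current, current_bytes = [], 0
        current.append(rec)
        current_bytes += size
    if current:
        batches.append(current)
    return batches
```

Each returned batch can then go to one `put_records` call.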
**3) AWS SDK Direct Usage**
You can call APIs directly through AWS SDKs in various languages.
**4) Kinesis Agent**
A standalone Java application installed on servers that monitors log files and sends new entries to the stream.
3.4 Consumers
**1) Shared Fan-Out - GetRecords**
The default, polling-based consumer method: all consumers of a shard share its 2MB/s read throughput.
```python
import json
import time

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Get a shard iterator
response = kinesis.get_shard_iterator(
    StreamName='my-data-stream',
    ShardId='shardId-000000000000',
    ShardIteratorType='LATEST'  # or TRIM_HORIZON, AT_SEQUENCE_NUMBER, AT_TIMESTAMP, etc.
)
shard_iterator = response['ShardIterator']

# Poll for records; NextShardIterator is None once a closed shard has been fully read
while shard_iterator:
    response = kinesis.get_records(
        ShardIterator=shard_iterator,
        Limit=100
    )
    for record in response['Records']:
        data = json.loads(record['Data'].decode('utf-8'))
        print(f"Partition Key: {record['PartitionKey']}")
        print(f"Sequence Number: {record['SequenceNumber']}")
        print(f"Data: {data}")

    shard_iterator = response.get('NextShardIterator')
    # GetRecords can be called up to 5 times per second per shard
    time.sleep(0.2)
```
**2) Enhanced Fan-Out - SubscribeToShard**
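A push-based method: each consumer is registered with the stream, and Kinesis pushes records to it over HTTP/2 with its own dedicated throughput.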
```text
Shared Fan-Out vs Enhanced Fan-Out
==================================

Shared Fan-Out:
+--------+     +--------+
| Shard  | --> | 2MB/s  | --> Consumer A (polling)
|        |     |(shared)| --> Consumer B (polling)
+--------+     +--------+ --> Consumer C (polling)
  3 consumers share 2MB/s
  Each consumer: ~0.67 MB/s

Enhanced Fan-Out:
+--------+     +--------+
| Shard  | --> | 2MB/s  | --> Consumer A (HTTP/2 push)
|        |     | 2MB/s  | --> Consumer B (HTTP/2 push)
|        |     | 2MB/s  | --> Consumer C (HTTP/2 push)
+--------+     +--------+
  Each consumer gets a dedicated 2MB/s
  Latency: ~70ms (shared: ~200ms+)
```
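With boto3, enhanced fan-out takes two steps: register a consumer once, then call `subscribe_to_shard` and iterate over the returned event stream. A subscription lasts up to 5 minutes, so the sketch resubscribes from the last `ContinuationSequenceNumber`. The client is passed in; in practice it is a boto3 Kinesis client. The helper names, `stream_arn`, and the `handle_records` callback are assumptions for illustration.

```python
import time
from typing import Callable, Optional


def register_consumer(client, stream_arn: str, consumer_name: str,
                      poll_seconds: float = 2.0) -> str:
    """Register an enhanced fan-out consumer and wait until it is ACTIVE."""
    consumer = client.register_stream_consumer(
        StreamARN=stream_arn, ConsumerName=consumer_name)['Consumer']
    arn = consumer['ConsumerARN']
    while client.describe_stream_consumer(
            ConsumerARN=arn)['ConsumerDescription']['ConsumerStatus'] != 'ACTIVE':
        time.sleep(poll_seconds)
    return arn


def read_shard_efo(client, consumer_arn: str, shard_id: str,
                   handle_records: Callable[[list], None],
                   subscriptions: int = 1) -> Optional[str]:
    """Read one shard via SubscribeToShard, starting each new subscription
    right after the last record delivered by the previous one.

    Returns the last ContinuationSequenceNumber seen.
    """
    position = {'Type': 'LATEST'}
    last_seq = None
    for _ in range(subscriptions):
        response = client.subscribe_to_shard(
            ConsumerARN=consumer_arn, ShardId=shard_id, StartingPosition=position)
        # Events are pushed for up to 5 minutes, then the subscription ends
        for event in response['EventStream']:
            shard_event = event.get('SubscribeToShardEvent')
            if shard_event is None:
                continue
            handle_records(shard_event['Records'])
            seq = shard_event.get('ContinuationSequenceNumber')
            if seq is None:
                return last_seq  # shard closed; continue with shard_event['ChildShards']
            last_seq = seq
        if last_seq is not None:
            position = {'Type': 'AFTER_SEQUENCE_NUMBER', 'SequenceNumber': last_seq}
    return last_seq
```

In a real consumer, `subscriptions` would be an open-ended loop running per shard (for example, one thread per shard), and `handle_records` would do the actual processing.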
3.5 KCL (Kinesis Client Library)
KCL simplifies the development of distributed consumer applications.
**Core Features:**
- **Lease Management**: Tracks per-shard ownership using a DynamoDB table
- **Checkpointing**: Stores processing progress in DynamoDB for failure recovery
- **Automatic Load Balancing**: Redistributes shards based on the number of workers
- **Resharding Handling**: Automatically responds to shard splits and merges
```text
KCL Architecture
================

       DynamoDB (Lease Table)
      +---------------------+
      | Shard ID | Worker   |
      |----------|----------|
      | shard-0  | worker-1 |
      | shard-1  | worker-2 |
      | shard-2  | worker-1 |
      | shard-3  | worker-2 |
      +---------------------+
          ^             ^
          |             |
      +---+             +---+
      |                     |
+-----------+         +-----------+
| Worker 1  |         | Worker 2  |
| (EC2/ECS) |         | (EC2/ECS) |
|           |         |           |
| shard-0   |         | shard-1   |
| shard-2   |         | shard-3   |
+-----------+         +-----------+
```
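KCL itself is a Java library; other languages attach to it through its MultiLangDaemon. Its real lease-taking logic is more involved than this. The sketch below is a simplified, single-process model of the load-balancing idea only. It keeps existing leases where it can. Leases that are unowned, held by a departed worker, or above a worker's fair share go to the least-loaded worker. `rebalance_leases` is hypothetical.

```python
import math
from typing import Dict, List, Optional


def rebalance_leases(leases: Dict[str, Optional[str]],
                     workers: List[str]) -> Dict[str, str]:
    """Return a shard -> worker map with at most ceil(shards / workers) per worker.

    Leases owned by None or by a worker not in `workers` are treated as free.
    """
    target = math.ceil(len(leases) / len(workers))
    owned = {w: [] for w in workers}
    free = []
    for shard in sorted(leases):
        owner = leases[shard]
        if owner in owned and len(owned[owner]) < target:
            owned[owner].append(shard)  # keep the existing lease to avoid needless moves
        else:
            free.append(shard)          # unowned, dead owner, or owner over its share
    for shard in free:
        # Hand each free lease to the currently least-loaded worker (ties by name)
        worker = min(workers, key=lambda w: (len(owned[w]), w))
        owned[worker].append(shard)
    return {shard: w for w, shards in owned.items() for shard in shards}
```

For example, suppose worker-2 in the diagram above fails and worker-3 joins. Then `rebalance_leases(leases, ['worker-1', 'worker-3'])` leaves worker-1's shards alone and gives shard-1 and shard-3 to worker-3.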
3.6 Data Retention
- **Default**: 24 hours
- **Maximum**: 365 days
- Longer retention increases cost
- Extend retention when data replay is needed
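Retention and replay map to two real Kinesis operations: `IncreaseStreamRetentionPeriod` and a shard iterator of type `AT_TIMESTAMP`. Below is a minimal sketch, assuming a boto3 Kinesis client is passed in. The helper names are hypothetical. Shortening retention is a separate call (`decrease_stream_retention_period`).

```python
from datetime import datetime, timedelta, timezone

MIN_RETENTION_HOURS = 24
MAX_RETENTION_HOURS = 365 * 24  # 8,760 hours


def extend_retention(client, stream_name: str, hours: int) -> None:
    """Raise a stream's retention period; client is a boto3 Kinesis client."""
    if not MIN_RETENTION_HOURS <= hours <= MAX_RETENTION_HOURS:
        raise ValueError(f'retention must be 24-8760 hours, got {hours}')
    client.increase_stream_retention_period(
        StreamName=stream_name, RetentionPeriodHours=hours)


def replay_iterator(client, stream_name: str, shard_id: str,
                    hours_back: float) -> str:
    """Shard iterator positioned hours_back hours ago (must be within retention)."""
    start = datetime.now(timezone.utc) - timedelta(hours=hours_back)
    response = client.get_shard_iterator(
        StreamName=stream_name, ShardId=shard_id,
        ShardIteratorType='AT_TIMESTAMP', Timestamp=start)
    return response['ShardIterator']
```

For example, `extend_retention(kinesis, 'my-data-stream', 168)` keeps seven days of data. `replay_iterator(kinesis, 'my-data-stream', 'shardId-000000000000', 48)` then returns an iterator you can feed into the same `get_records` loop as in section 3.4.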
3.7 Shard Splitting and Merging
```text
Shard Split
===========
      Shard 1 [0 ~ 100]
              |
    split at hash key 50
              |
        +-----+-----+
        |           |
     Shard 3     Shard 4
    [0 ~ 49]   [50 ~ 100]

Shard Merge
===========
Shard 3 [0 ~ 49]   --+
                     +--> Shard 5 [0 ~ 100]
Shard 4 [50 ~ 100] --+
```
- **Split**: Used to distribute load from a hot shard
- **Merge**: Used to combine two adjacent shards (contiguous hash ranges) whose traffic has dropped, reducing cost
- Handled automatically in on-demand mode
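Splitting a hot shard in half means choosing `NewStartingHashKey` at the midpoint of its range. That key and every key above it go to one child shard, and all lower keys go to the other. The sketch assumes a boto3 Kinesis client is passed in and that the stream is small enough for one `list_shards` page to cover it. The helper names are hypothetical. Merging is the `merge_shards` call with `ShardToMerge` and `AdjacentShardToMerge`.

```python
def midpoint_hash_key(starting_hash_key: str, ending_hash_key: str) -> str:
    """NewStartingHashKey that splits an inclusive hash range into equal halves."""
    start, end = int(starting_hash_key), int(ending_hash_key)
    return str((start + end + 1) // 2)


def split_shard_evenly(client, stream_name: str, shard_id: str) -> str:
    """Split one shard at the midpoint of its hash range; returns the key used."""
    shards = client.list_shards(StreamName=stream_name)['Shards']
    shard = next(s for s in shards if s['ShardId'] == shard_id)
    key_range = shard['HashKeyRange']  # values are decimal strings
    new_start = midpoint_hash_key(key_range['StartingHashKey'],
                                  key_range['EndingHashKey'])
    # Keys below new_start go to one child shard; new_start and above go to the other
    client.split_shard(StreamName=stream_name, ShardToSplit=shard_id,
                       NewStartingHashKey=new_start)
    return new_start
```

For the simplified range `[0 ~ 100]` in the diagram, the midpoint is `50`, which yields children `[0 ~ 49]` and `[50 ~ 100]`. For the full range `0 ~ 2^128 - 1`, it is `2^127`.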
3.8 Capacity Modes
**Provisioned Mode**
- You specify the number of shards
- Suitable for predictable workloads
- Billed per shard-hour
**On-Demand Mode**
- Automatically adjusts shard count
- Suitable for unpredictable workloads
- Billed per stream-hour plus the volume of data written and read
- New in 2025: On-Demand Advantage mode (60% cheaper data usage rates)
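In provisioned mode, the shard count follows from the per-shard limits in section 3.2. Below is a back-of-the-envelope sketch under two assumptions:

- Partition keys spread evenly across shards.
- Every shared fan-out consumer reads the full write volume.

Real streams also need headroom for hot keys. `required_shards` is a hypothetical helper.

```python
import math


def required_shards(write_mb_per_s: float, records_per_s: float,
                    shared_consumers: int = 1) -> int:
    """Estimate the provisioned shard count from the per-shard limits."""
    write_shards = math.ceil(write_mb_per_s / 1.0)    # 1 MB/s write per shard
    record_shards = math.ceil(records_per_s / 1000)   # 1,000 records/s per shard
    # Shared fan-out consumers split the 2 MB/s read capacity of each shard
    read_shards = math.ceil(write_mb_per_s * shared_consumers / 2.0)
    return max(1, write_shards, record_shards, read_shards)


print(required_shards(3.5, 2000, shared_consumers=2))  # 4
print(required_shards(0.5, 5000))                      # 5 (record count is the bottleneck)
```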
4. Amazon Data Firehose (formerly Kinesis Data Firehose)
4.1 Overview
Amazon Data Firehose is a fully managed service for reliably delivering streaming data to data lakes,
data stores, and analytics services.
```text
Amazon Data Firehose Architecture
=================================

+-----------+     +------------+     +------------+     +------------+
| Source    |     | Firehose   |     | Transform  |     | Dest.      |
|           | --> | Delivery   | --> | (optional) | --> |            |
| - Direct  |     | Stream     |     | - Lambda   |     | - S3       |
| - Kinesis |     |            |     | - Format   |     | - Redshift |
|   Data    |     | Buffering: |     | - Compress |     | - Open-    |
|   Streams |     | - Size     |     | - Encrypt  |     |   Search   |
| - MSK     |     | - Time     |     |            |     | - Splunk   |
|           |     |            |     |            |     | - HTTP     |
+-----------+     +------------+     +------------+     +------------+
                        |
                        v
                  +------------+
                  | Backup S3  |
                  | (source)   |
                  +------------+
```
4.2 Key Features
**No Shard Management**
- Unlike Data Streams, no need to manage shards directly
- Scales automatically
**Buffering Settings**
- **Size-based**: 1MB to 128MB
- **Time-based**: 60 seconds to 900 seconds
- Delivers when either condition is met
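The "whichever comes first" rule can be modeled in a few lines. This is a simplified model, not Firehose's implementation. It assumes the interval is measured from the first buffered record, and it takes the current time as an argument so it can be tested. In a real delivery stream, the two settings are the `SizeInMBs` and `IntervalInSeconds` fields of the destination's `BufferingHints`.

```python
from typing import List, Optional


class SizeOrTimeBuffer:
    """Simplified buffering model: deliver when size OR interval is reached."""

    def __init__(self, max_bytes: int, max_seconds: float):
        self.max_bytes = max_bytes
        self.max_seconds = max_seconds
        self.records: List[bytes] = []
        self.size = 0
        self.opened_at: Optional[float] = None  # arrival time of the first buffered record

    def _flush(self) -> List[bytes]:
        batch = self.records
        self.records, self.size, self.opened_at = [], 0, None
        return batch

    def add(self, record: bytes, now: float) -> Optional[List[bytes]]:
        """Buffer a record; return a batch if this record triggers delivery."""
        if self.opened_at is None:
            self.opened_at = now
        self.records.append(record)
        self.size += len(record)
        if self.size >= self.max_bytes:
            return self._flush()
        return self.poll(now)

    def poll(self, now: float) -> Optional[List[bytes]]:
        """Return a batch if the buffering interval has elapsed, else None."""
        if self.opened_at is not None and now - self.opened_at >= self.max_seconds:
            return self._flush()
        return None


buf = SizeOrTimeBuffer(max_bytes=10, max_seconds=60)
print(buf.add(b'12345', now=0))   # None
print(buf.add(b'67890', now=1))   # [b'12345', b'67890'] (size limit reached)
print(buf.add(b'abc', now=100))   # None
print(buf.poll(now=160))          # [b'abc'] (60 s elapsed)
```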
**Data Transformation**
- Custom transformations via Lambda functions
- Automatic conversion to columnar formats like Apache Parquet and ORC
- Gzip, Snappy, Zip compression support
- SSE-S3 or SSE-KMS encryption
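A transformation Lambda receives base64-encoded records. For each `recordId` it must return a `result` of `Ok`, `Dropped`, or `ProcessingFailed`, plus base64-encoded `data`. The sketch below follows that contract. The filtering rule (keep only `purchase` events like the one in section 4.4) and the added `amount_cents` field are hypothetical examples.

```python
import base64
import json


def lambda_handler(event, context):
    """Firehose transformation: keep purchase events, drop everything else."""
    output = []
    for record in event['records']:
        try:
            payload = json.loads(base64.b64decode(record['data']))
        except ValueError:
            # Invalid base64/JSON is routed to Firehose's processing-failed path
            output.append({'recordId': record['recordId'],
                           'result': 'ProcessingFailed', 'data': record['data']})
            continue
        if payload.get('event_type') != 'purchase':
            output.append({'recordId': record['recordId'],
                           'result': 'Dropped', 'data': record['data']})
            continue
        payload['amount_cents'] = round(payload.get('amount', 0) * 100)
        # Trailing newline: Firehose does not insert delimiters between records
        transformed = (json.dumps(payload) + '\n').encode('utf-8')
        output.append({'recordId': record['recordId'], 'result': 'Ok',
                       'data': base64.b64encode(transformed).decode('utf-8')})
    return {'records': output}
```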
4.3 Firehose Delivery Destinations
| Destination | Characteristics |
| ----------------- | --------------------------------------------- |
| Amazon S3 | Most common, Parquet/ORC conversion available |
| Amazon Redshift | Via S3, then loaded with COPY command |
| Amazon OpenSearch | Log analysis, search indexing |
| Splunk | Security monitoring, operational logs |
| HTTP Endpoints | Custom destinations, Datadog, etc. |
| Snowflake | Cloud data warehouse |
| Apache Iceberg | Open table format |
4.4 Code Example: Firehose Delivery
```python
import json

import boto3

firehose = boto3.client('firehose', region_name='us-east-1')

# Single record.
# Firehose does not add delimiters between records, so a trailing newline
# keeps the objects written to S3 in JSON Lines form.
response = firehose.put_record(
    DeliveryStreamName='my-firehose-stream',
    Record={
        'Data': (json.dumps({
            'event_type': 'purchase',
            'user_id': 'user-456',
            'product': 'laptop',
            'amount': 1299.99,
            'timestamp': '2026-03-20T10:30:00Z'
        }) + '\n').encode('utf-8')
    }
)

# Batch delivery (PutRecordBatch accepts up to 500 records per call)
records = []
for i in range(50):
    records.append({
        'Data': (json.dumps({
            'event_type': 'page_view',
            'user_id': f'user-{i}',
            'page': f'/product/{i}',
            'timestamp': '2026-03-20T10:30:00Z'
        }) + '\n').encode('utf-8')
    })

response = firehose.put_record_batch(
    DeliveryStreamName='my-firehose-stream',
    Records=records
)
# A batch can partially fail; failed entries carry an ErrorCode in RequestResponses
print(f"Failed records: {response['FailedPutCount']}")
```
5. Kinesis Video Streams
5.1 Overview
Kinesis Video Streams is a fully managed service for securely ingesting and playing back video and media data
from cameras, RADAR, LIDAR, drones, and more.
5.2 Key Features
```text
Kinesis Video Streams Architecture
==================================

+------------+     +--------------------+     +-----------------+
| Devices    |     | Kinesis Video      |     | Consumers       |
|            | --> | Streams            | --> |                 |
| - Camera   |     |                    |     | - HLS Playback  |
| - Drone    |     | - Auto scaling     |     | - DASH Playback |
| - LIDAR    |     | - Durable storage  |     | - GetMedia API  |
| - Smart    |     | - Encryption       |     | - Rekognition   |
|   phone    |     |                    |     | - SageMaker     |
+------------+     +--------------------+     +-----------------+
```
- **HLS (HTTP Live Streaming)**: Live and archive playback on web browsers and mobile
- **WebRTC**: Ultra-low latency bidirectional media streaming
- **Amazon Rekognition Integration**: Face recognition, object detection, and computer vision analytics
- **Storage Tiers**: Hot storage (real-time access) and warm storage (cost-effective retention)
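HLS playback starts from a session URL. The flow is two calls:

1. Ask the `kinesisvideo` service for the data endpoint of `GET_HLS_STREAMING_SESSION_URL`.
2. Call that endpoint through the `kinesis-video-archived-media` client.

The sketch takes both clients as parameters, with a factory for the second one because it needs the endpoint. The helper name and this wiring are assumptions.

```python
def get_live_hls_url(video_client, make_media_client, stream_name: str,
                     expires_seconds: int = 300) -> str:
    """Return an HLS playback URL for the live edge of a Kinesis Video stream.

    video_client: a boto3 'kinesisvideo' client.
    make_media_client: callable(endpoint_url) -> boto3 'kinesis-video-archived-media' client.
    """
    endpoint = video_client.get_data_endpoint(
        StreamName=stream_name,
        APIName='GET_HLS_STREAMING_SESSION_URL')['DataEndpoint']
    media_client = make_media_client(endpoint)
    response = media_client.get_hls_streaming_session_url(
        StreamName=stream_name,
        PlaybackMode='LIVE',
        Expires=expires_seconds)
    return response['HLSStreamingSessionURL']


# Example wiring (requires boto3 and AWS credentials):
# import boto3
# video = boto3.client('kinesisvideo', region_name='us-east-1')
# url = get_live_hls_url(
#     video,
#     lambda ep: boto3.client('kinesis-video-archived-media',
#                             endpoint_url=ep, region_name='us-east-1'),
#     'my-camera-stream')
```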
6. Pricing Model
6.1 Kinesis Data Streams Pricing
**Provisioned Mode:**
| Item | Cost (US East) |
| ------------------------------------ | ------------------------------ |
| Shard hour | ~0.015 USD/shard/hour |
| PUT payload unit (25KB) | ~0.014 USD/million units |
| Enhanced fan-out data retrieval | ~0.013 USD/GB |
| Enhanced fan-out consumer shard hour | ~0.015 USD/consumer/shard/hour |
| Extended retention (beyond 24h) | ~0.023 USD/shard/hour |
**On-Demand Mode:**
| Item | Cost (US East) |
| ---------------------- | -------------- |
| Stream hour (Standard) | ~0.04 USD/hour |
| Data write (Standard) | ~0.08 USD/GB |
| Data read (Standard) | ~0.04 USD/GB |
| Data write (Advantage) | ~0.032 USD/GB |
| Data read (Advantage) | ~0.016 USD/GB |
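The tables turn into a monthly estimate with simple arithmetic. The sketch below uses the approximate US East rates listed above as default parameters; check current pricing before relying on them. It ignores extended retention and enhanced fan-out charges. The helper names are hypothetical.

```python
import math

HOURS_PER_MONTH = 730  # common monthly approximation


def put_payload_units(record_kb: float) -> int:
    """Provisioned mode bills each record in 25 KB PUT payload units, rounded up."""
    return math.ceil(record_kb / 25)


def provisioned_monthly_usd(shards: int, records_per_s: float, record_kb: float,
                            shard_hour_usd: float = 0.015,
                            usd_per_million_units: float = 0.014) -> float:
    """Shard-hours plus PUT payload units for a steady write rate."""
    shard_cost = shards * HOURS_PER_MONTH * shard_hour_usd
    units = records_per_s * 3600 * HOURS_PER_MONTH * put_payload_units(record_kb)
    return shard_cost + units / 1_000_000 * usd_per_million_units


def on_demand_monthly_usd(gb_written: float, gb_read: float,
                          stream_hour_usd: float = 0.04,
                          write_usd_per_gb: float = 0.08,
                          read_usd_per_gb: float = 0.04) -> float:
    """On-demand Standard: per-stream hourly charge plus data written and read."""
    return (HOURS_PER_MONTH * stream_hour_usd
            + gb_written * write_usd_per_gb
            + gb_read * read_usd_per_gb)


# 2 shards, 1,000 records/s of 1 KB each, for a month
print(round(provisioned_monthly_usd(2, 1000, 1), 2))  # 58.69
```

The same traffic is about 2,628 GB (decimal) written and read once. In on-demand Standard, `on_demand_monthly_usd(2628, 2628)` gives roughly 344.56 USD. So in this example, steady and predictable traffic favors provisioned mode.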
6.2 Amazon Data Firehose Pricing
| Item | Cost (US East) |
| ---------------------------------- | -------------------------- |
| Data ingestion (first 500TB/month) | ~0.029 USD/GB |
| Format conversion | ~0.018 USD/GB |
| VPC delivery | ~0.01 USD/GB + hourly rate |
7. End-to-End Architecture Example: Real-Time Analytics Pipeline
```text
Real-Time Analytics Pipeline Architecture
=========================================

+----------+   +---------+   +-----------+   +----------+   +----------+
| Web/     |   | Kinesis |   | Lambda    |   | Firehose |   | S3       |
| Mobile   |-->| Data    |-->| (Transform|-->| Delivery |-->| (Data    |
| App      |   | Streams |   | /Filter)  |   | Stream   |   |  Lake)   |
+----------+   +---------+   +-----------+   +----------+   +----------+
                    |                                            |
                    v                                            v
              +-----------+                                 +----------+
              | Managed   |                                 | Athena   |
              | Flink     |                                 | (Ad-hoc  |
              | (Real-time|                                 | Queries) |
              | analysis) |                                 +----------+
              +-----------+                                      |
                    |                                            v
                    v                                       +----------+
              +-----------+                                 | Quick-   |
              | DynamoDB  |                                 | Sight    |
              | (Real-time|                                 | (BI      |
              | dashboard)|                                 |Dashboard)|
              +-----------+                                 +----------+
```
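The Lambda stage in this diagram could look like the sketch below. Kinesis delivers base64-encoded records under `event['Records'][i]['kinesis']['data']`, and the function forwards the surviving events with `put_record_batch`. The delivery stream name and the filtering rule are hypothetical. The filter drops `scroll` events from the clickstream generated in section 7.1.

```python
import base64
import json

DELIVERY_STREAM = 'clickstream-to-s3'  # hypothetical Firehose delivery stream name
_firehose = None  # created on first invocation, reused by warm Lambda containers


def to_firehose_records(event: dict) -> list:
    """Decode a Kinesis-triggered Lambda event; drop scroll events (example filter)."""
    records = []
    for item in event['Records']:
        payload = json.loads(base64.b64decode(item['kinesis']['data']))
        if payload.get('action') == 'scroll':
            continue
        # One JSON object per line keeps the S3 objects easy to query with Athena
        records.append({'Data': (json.dumps(payload) + '\n').encode('utf-8')})
    return records


def lambda_handler(event, context):
    global _firehose
    if _firehose is None:
        import boto3  # deferred so to_firehose_records() can be tested without boto3
        _firehose = boto3.client('firehose')
    records = to_firehose_records(event)
    # PutRecordBatch accepts at most 500 records per call
    for start in range(0, len(records), 500):
        response = _firehose.put_record_batch(
            DeliveryStreamName=DELIVERY_STREAM,
            Records=records[start:start + 500])
        if response['FailedPutCount']:
            # Raising makes Lambda retry the Kinesis batch (duplicates are possible)
            raise RuntimeError(f"{response['FailedPutCount']} records failed")
    return {'forwarded': len(records)}
```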
7.1 Complete Producer/Consumer Example
```python
# === producer.py ===
import json
import random
import time
from datetime import datetime, timezone

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')
STREAM_NAME = 'clickstream-data'


def generate_click_event():
    """Generate a random clickstream event."""
    pages = ['/home', '/products', '/cart', '/checkout', '/profile']
    actions = ['view', 'click', 'scroll', 'submit']
    user_id = f'user-{random.randint(1, 1000)}'
    return {
        'user_id': user_id,
        'page': random.choice(pages),
        'action': random.choice(actions),
        'session_id': f'sess-{random.randint(1, 100)}',
        'timestamp': datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
        'device': random.choice(['mobile', 'desktop', 'tablet']),
        'country': random.choice(['KR', 'US', 'JP', 'DE'])
    }


def send_events(batch_size=50, interval=1.0):
    """Send batches of events to the Kinesis stream until interrupted."""
    while True:
        records = []
        for _ in range(batch_size):
            event = generate_click_event()
            records.append({
                'Data': json.dumps(event).encode('utf-8'),
                # Same user -> same shard, so each user's events stay in order
                'PartitionKey': event['user_id']
            })

        try:
            response = kinesis.put_records(
                StreamName=STREAM_NAME,
                Records=records
            )
            failed = response['FailedRecordCount']
            if failed > 0:
                print(f"Warning: {failed} records failed")
                # Result entries are in the same order as the request entries
                for i, record_response in enumerate(response['Records']):
                    if 'ErrorCode' in record_response:
                        print(f"  Record {i} error: {record_response['ErrorCode']}")
            else:
                print(f"Sent {batch_size} records successfully")
        except Exception as e:
            print(f"Error: {e}")

        time.sleep(interval)


if __name__ == '__main__':
    send_events()
```
```python
# === consumer.py ===
import json
import time

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')
STREAM_NAME = 'clickstream-data'


def get_shard_ids():
    """Return the shard IDs in the stream.

    Fine for small streams; large streams need pagination (e.g. list_shards with NextToken).
    """
    response = kinesis.describe_stream(StreamName=STREAM_NAME)
    return [
        shard['ShardId']
        for shard in response['StreamDescription']['Shards']
    ]


def process_records(records):
    """Count page views in this batch and flag checkout submissions."""
    page_views = {}
    for record in records:
        data = json.loads(record['Data'].decode('utf-8'))
        page = data.get('page', 'unknown')
        page_views[page] = page_views.get(page, 0) + 1

        # Anomaly detection example
        if data.get('action') == 'submit' and data.get('page') == '/checkout':
            print(f"[ALERT] Checkout event: user={data['user_id']}")

    for page, count in page_views.items():
        print(f"  Page: {page}, Views: {count}")


def new_iterator(shard_id):
    """Get a LATEST iterator for a shard (only records written from now on)."""
    return kinesis.get_shard_iterator(
        StreamName=STREAM_NAME,
        ShardId=shard_id,
        ShardIteratorType='LATEST'
    )['ShardIterator']


def consume_stream():
    """Read and process data from every shard in the stream."""
    shard_ids = get_shard_ids()
    print(f"Found {len(shard_ids)} shards")

    shard_iterators = {shard_id: new_iterator(shard_id) for shard_id in shard_ids}

    while shard_iterators:
        for shard_id in list(shard_iterators):
            try:
                response = kinesis.get_records(
                    ShardIterator=shard_iterators[shard_id],
                    Limit=100
                )
                if response['Records']:
                    print(f"\n--- Shard: {shard_id} ---")
                    print(f"Records received: {len(response['Records'])}")
                    process_records(response['Records'])

                next_iterator = response.get('NextShardIterator')
                if next_iterator is None:
                    # The shard was closed by a split/merge and has been fully read
                    del shard_iterators[shard_id]
                else:
                    shard_iterators[shard_id] = next_iterator
            except kinesis.exceptions.ExpiredIteratorException:
                # Iterators expire after 5 minutes; LATEST skips records written meanwhile
                shard_iterators[shard_id] = new_iterator(shard_id)

        time.sleep(1)


if __name__ == '__main__':
    consume_stream()
```
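consumer.py has one weakness. After an `ExpiredIteratorException`, it requests a new `LATEST` iterator, which skips whatever arrived in between. Tracking the last processed sequence number per shard and resuming with `AFTER_SEQUENCE_NUMBER` closes that gap, as long as the data is still within retention. A sketch with a hypothetical helper; in practice the client is a boto3 Kinesis client.

```python
from typing import Optional


def resume_iterator(client, stream_name: str, shard_id: str,
                    last_sequence: Optional[str]) -> str:
    """Return an iterator just after last_sequence, or LATEST if nothing was read yet."""
    if last_sequence is None:
        position = {'ShardIteratorType': 'LATEST'}
    else:
        position = {'ShardIteratorType': 'AFTER_SEQUENCE_NUMBER',
                    'StartingSequenceNumber': last_sequence}
    response = client.get_shard_iterator(
        StreamName=stream_name, ShardId=shard_id, **position)
    return response['ShardIterator']


# In consume_stream(), remember the position after each non-empty batch:
#     last_seq[shard_id] = response['Records'][-1]['SequenceNumber']
# and on ExpiredIteratorException:
#     shard_iterators[shard_id] = resume_iterator(
#         kinesis, STREAM_NAME, shard_id, last_seq.get(shard_id))
```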
8. Key Limits and Notes
| Item | Limit |
| ----------------------------------------------- | ---------------------------------- |
| Maximum record size | 1 MB |
| Maximum records per PutRecords request | 500 |
| Maximum PutRecords request size | 5 MB |
| Write throughput per shard | 1 MB/s or 1,000 records/s |
| Read throughput per shard (shared) | 2 MB/s, GetRecords 5 calls/s |
| Read throughput per shard (enhanced fan-out) | 2 MB/s per consumer |
| Maximum registered consumers (enhanced fan-out) | 20 per stream |
| Data retention | 24 hours (default) to 365 days |
| Default shard quota (per account per Region)    | 500 in us-east-1, us-west-2, eu-west-1; 200 elsewhere (increase requestable) |
9. Summary
The AWS Kinesis family provides a comprehensive solution for real-time data streaming.
- **Kinesis Data Streams**: The core of real-time data collection and processing. Scalable shard-based
architecture with diverse consumer patterns through KCL and enhanced fan-out.
- **Amazon Data Firehose**: Automatic delivery pipeline to S3, Redshift, OpenSearch, and more.
Easy to use with no shard management.
- **Managed Flink**: A managed stream analytics service based on Apache Flink,
supporting both SQL and code-based analytics.
- **Video Streams**: A specialized service for video/media data ingestion, storage, and playback.
In the next post, we will cover practical Kinesis architecture patterns and comparisons with Apache Kafka and SQS.