[AWS] Complete Guide to Kinesis: Real-Time Streaming Data Processing

1. What is Streaming Data

1.1 Batch Processing vs Stream Processing

Traditional data processing relied on batch methods -- collecting data over a period and processing it all at once.

But modern applications demand real-time responsiveness.

```
Batch Processing
============================
[Data Collection] --> [Storage] --> [Periodic Processing] --> [Result]
        |                                                        |
        +------------------- Minutes to Hours -------------------+

Stream Processing
==================================
[Data Generation] --> [Stream] --> [Immediate Processing] --> [Result]
        |                                                        |
        +--------------- Milliseconds to Seconds ----------------+
```

1.2 Streaming Data Use Cases

- **Real-time log analysis**: Collect server logs instantly for anomaly detection

- **IoT sensor data**: Telemetry data from millions of devices

- **Clickstream analysis**: Track user behavior in real-time for personalization

- **Financial transaction monitoring**: Real-time transaction analysis for fraud detection

- **Social media feeds**: Real-time trend and sentiment analysis

2. AWS Kinesis Family Overview

AWS Kinesis is a collection of fully managed services for collecting, processing, and analyzing real-time streaming data.

```
AWS Kinesis Family
==================

+------------------+  +------------------+  +------------------+
| Kinesis Data     |  | Amazon Data      |  | Managed Service  |
| Streams          |  | Firehose         |  | for Apache Flink |
|                  |  |                  |  |                  |
| Real-time data   |  | Delivery         |  | Stream analytics |
| streaming        |  | pipeline         |  | (SQL, Java,      |
|                  |  | (S3, Redshift,   |  | Python, Scala)   |
|                  |  | OpenSearch etc.) |  |                  |
+------------------+  +------------------+  +------------------+

+------------------+
| Kinesis Video    |
| Streams          |
|                  |
| Video streaming  |
| ingestion &      |
| analysis         |
+------------------+
```

| Service       | Primary Use                          | Data Type       | Latency                                 |
| ------------- | ------------------------------------ | --------------- | --------------------------------------- |
| Data Streams  | Real-time data collection/processing | Records (bytes) | Real-time (ms)                          |
| Data Firehose | Data delivery pipeline               | Records (bytes) | Near real-time (buffered: seconds to minutes) |
| Managed Flink | Stream analytics/transformation      | Stream data     | Real-time (ms)                          |
| Video Streams | Video ingestion/playback             | Media frames    | Real-time (1-10s)                       |

3. Kinesis Data Streams Deep Dive

3.1 Core Architecture

Kinesis Data Streams is the core of the Kinesis family: a scalable, shard-based stream that many producers write to and multiple consumers read from in parallel, in real time.

```
Kinesis Data Stream
====================

Producers                 Shard 1        Shard 2        Shard 3
---------               +----------+   +----------+   +----------+
| App A | --+           | Record 1 |   | Record 4 |   | Record 7 |
| App B | --+---------> | Record 2 |   | Record 5 |   | Record 8 |
| App C | --+           | Record 3 |   | Record 6 |   | Record 9 |
---------               +----------+   +----------+   +----------+
                             |              |              |
                             v              v              v
                         Consumer A     Consumer A     Consumer A
                         Consumer B     Consumer B     Consumer B

(Each record's partition key decides which shard it is written to.)
```

3.2 Core Components

**Stream**

- A logical group of shards

- Ordering is guaranteed within each shard, not across the stream as a whole

**Shard**

- The base throughput unit of a stream

- Write: 1MB/s or 1,000 records per second

- Read: 2MB/s (shared fan-out), 2MB/s per consumer (enhanced fan-out)

**Record**

- The basic unit of data

- Composed of a partition key, sequence number, and data blob (up to 1MB)

**Partition Key**

- Determines which shard receives a record

- Uses MD5 hashing to map to shards

- Records with the same partition key are stored in order on the same shard

**Sequence Number**

- A unique identifier automatically assigned to each record

- Guarantees record ordering within a shard

```
Partition Key Hashing Process
==============================
Partition Key: "user-123"
        |
        v
MD5("user-123") = 0x7A3B...   (illustrative value)
        |
        v
Hash Range: 0 ~ 2^128 - 1
        |
        v
Shard 1: [0 ~ 2^127 - 1]        <-- Mapped to this range (0x7A3B... < 2^127)
Shard 2: [2^127 ~ 2^128 - 1]
```
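To make the mapping concrete, the sketch below hashes a partition key with MD5, reads the digest as an unsigned 128-bit integer, and finds the shard whose hash key range contains it. The evenly split layout and the helper names (`hash_key_for`, `even_ranges`, `find_shard`) are illustrative assumptions; a real stream's ranges come from `ListShards` (each shard's `HashKeyRange.StartingHashKey` / `EndingHashKey`).

```python
import hashlib

MAX_HASH_KEY = 2**128 - 1  # top of the Kinesis hash key space


def hash_key_for(partition_key: str) -> int:
    """MD5 of the UTF-8 partition key, interpreted as an unsigned 128-bit integer."""
    return int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)


def even_ranges(shard_count: int) -> list[tuple[int, int]]:
    """Split [0, 2^128 - 1] into contiguous, non-overlapping ranges (hypothetical layout)."""
    size = (MAX_HASH_KEY + 1) // shard_count
    ranges = []
    for i in range(shard_count):
        start = i * size
        # The last shard absorbs any remainder so the whole space is covered
        end = MAX_HASH_KEY if i == shard_count - 1 else start + size - 1
        ranges.append((start, end))
    return ranges


def find_shard(partition_key: str, ranges: list[tuple[int, int]]) -> int:
    """Return the index of the range (shard) that contains the key's hash."""
    h = hash_key_for(partition_key)
    for index, (start, end) in enumerate(ranges):
        if start <= h <= end:
            return index
    raise ValueError("hash key not covered by any range")


if __name__ == "__main__":
    ranges = even_ranges(2)  # Shard 1: [0, 2^127 - 1], Shard 2: [2^127, 2^128 - 1]
    for key in ["user-123", "user-456", "user-123"]:
        print(key, "->", f"Shard {find_shard(key, ranges) + 1}")
```

Because the hash is deterministic, every record with the same key lands on the same shard, which is what provides per-key ordering. `PutRecord` and `PutRecords` also accept an `ExplicitHashKey` that overrides the MD5-derived value.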

3.3 Producers

There are several ways to send data to a Kinesis stream.

**1) PutRecord / PutRecords API**

The most basic approach.

```python
import json

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Single record
response = kinesis.put_record(
    StreamName='my-data-stream',
    Data=json.dumps({
        'event_type': 'page_view',
        'user_id': 'user-123',
        'page': '/products/laptop',
        'timestamp': '2026-03-20T10:30:00Z'
    }).encode('utf-8'),
    PartitionKey='user-123'  # determines the target shard
)
print(f"Shard ID: {response['ShardId']}")
print(f"Sequence Number: {response['SequenceNumber']}")

# Batch records (up to 500 per PutRecords call)
records = []
for i in range(100):
    records.append({
        'Data': json.dumps({
            'event_type': 'click',
            'user_id': f'user-{i % 10}',
            'element': 'buy_button',
            'timestamp': '2026-03-20T10:30:00Z'
        }).encode('utf-8'),
        'PartitionKey': f'user-{i % 10}'
    })

response = kinesis.put_records(
    StreamName='my-data-stream',
    Records=records
)
# PutRecords is not all-or-nothing: individual entries can fail
print(f"Failed records: {response['FailedRecordCount']}")
```
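Because a `PutRecords` call can succeed while some entries fail (for example with `ProvisionedThroughputExceededException` or `InternalFailure`), producers usually re-send only the failed subset. Each entry in `response['Records']` lines up by index with the request, and failed ones carry an `ErrorCode`. The following is a minimal sketch, assuming any client object with a boto3-style `put_records` method; the function name, attempt count, and backoff values are illustrative choices, not a library API.

```python
import random
import time


def put_records_with_retry(client, stream_name, records, max_attempts=5, base_delay=0.1):
    """Send records with PutRecords, re-sending only the entries that failed.

    Returns the list of records that still failed after max_attempts.
    """
    pending = list(records)
    for attempt in range(max_attempts):
        response = client.put_records(StreamName=stream_name, Records=pending)
        if response["FailedRecordCount"] == 0:
            return []
        # Results line up index-by-index with the request; failed ones have ErrorCode
        pending = [
            record
            for record, result in zip(pending, response["Records"])
            if "ErrorCode" in result
        ]
        if attempt < max_attempts - 1:
            # Exponential backoff with jitter before retrying the failed subset
            time.sleep(base_delay * (2 ** attempt) * random.uniform(0.5, 1.0))
    return pending
```

One design consequence to keep in mind: a retried record is written after records sent later with the same partition key, so retries can reorder a key's events.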

**2) KPL (Kinesis Producer Library)**

A high-performance producer library with the following features:

- **Record Aggregation**: Bundles multiple small records into a single Kinesis record

- **Record Collection**: Groups multiple Kinesis records into a single PutRecords call

- **Automatic Retries**: Automatically retransmits failed records

- **CloudWatch Metrics**: Automatically publishes performance metrics

```
KPL Aggregation Process
========================
User Record A (100 bytes) --+
User Record B (200 bytes) --+--> Kinesis Record 1 (~500 bytes) --+
User Record C (200 bytes) --+                                    |
                                                                 +--> PutRecords API
User Record D (300 bytes) --+                                    |    (single call)
User Record E (400 bytes) --+--> Kinesis Record 2 (~800 bytes) --+
User Record F (100 bytes) --+
```
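The idea behind aggregation can be sketched in a few lines. This is not the KPL wire format (the KPL uses a protobuf-based format that the KCL or AWS's deaggregation libraries unpack); it swaps in simple 4-byte length-prefix framing to show how several small user records share one Kinesis record and how a consumer splits them back apart. `aggregate`, `deaggregate`, and the size cap are hypothetical.

```python
import struct


def aggregate(user_records, max_bytes):
    """Greedily pack user records into framed payloads of at most max_bytes each."""
    payloads, current = [], b""
    for record in user_records:
        framed = struct.pack(">I", len(record)) + record  # 4-byte big-endian length prefix
        if len(framed) > max_bytes:
            raise ValueError("single record exceeds max_bytes")
        if current and len(current) + len(framed) > max_bytes:
            payloads.append(current)  # current payload is full: start a new one
            current = b""
        current += framed
    if current:
        payloads.append(current)
    return payloads


def deaggregate(payload):
    """Split one framed payload back into the original user records, in order."""
    records, offset = [], 0
    while offset < len(payload):
        (length,) = struct.unpack_from(">I", payload, offset)
        offset += 4
        records.append(payload[offset:offset + length])
        offset += length
    return records
```

Each payload would become the `Data` of one `PutRecords` entry, and several such entries can then be collected into a single call, mirroring the KPL's aggregation plus collection.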

**3) AWS SDK Direct Usage**

You can call APIs directly through AWS SDKs in various languages.

**4) Kinesis Agent**

An agent installed on servers that automatically sends log files to the stream.

3.4 Consumers

**1) Shared Fan-Out - GetRecords**

The default consumer method where all consumers share 2MB/s read throughput per shard.

```python
import json
import time

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Get a shard iterator
response = kinesis.get_shard_iterator(
    StreamName='my-data-stream',
    ShardId='shardId-000000000000',
    ShardIteratorType='LATEST'  # TRIM_HORIZON, AT_SEQUENCE_NUMBER, AT_TIMESTAMP, etc.
)
shard_iterator = response['ShardIterator']

# Poll for records; NextShardIterator becomes None once a closed shard is fully read
while shard_iterator:
    response = kinesis.get_records(
        ShardIterator=shard_iterator,
        Limit=100
    )
    for record in response['Records']:
        data = json.loads(record['Data'].decode('utf-8'))
        print(f"Partition Key: {record['PartitionKey']}")
        print(f"Sequence Number: {record['SequenceNumber']}")
        print(f"Data: {data}")

    shard_iterator = response.get('NextShardIterator')

    # GetRecords can be called up to 5 times per second per shard
    # (a budget shared by all shared fan-out consumers of that shard)
    time.sleep(0.2)
```

**2) Enhanced Fan-Out - SubscribeToShard**

A push-based delivery method using HTTP/2.

```
Shared Fan-Out vs Enhanced Fan-Out
=====================================

Shared Fan-Out:
+--------+     +--------+
| Shard  | --> | 2MB/s  | --> Consumer A (polling)
|        |     |(shared)| --> Consumer B (polling)
+--------+     +--------+ --> Consumer C (polling)
3 consumers share 2MB/s
Each consumer: ~0.67 MB/s

Enhanced Fan-Out:
+--------+     +--------+
| Shard  | --> | 2MB/s  | --> Consumer A (HTTP/2 push)
|        |     | 2MB/s  | --> Consumer B (HTTP/2 push)
|        |     | 2MB/s  | --> Consumer C (HTTP/2 push)
+--------+     +--------+
Each consumer gets dedicated 2MB/s
Latency: ~70ms (shared: ~200ms+)
```

3.5 KCL (Kinesis Client Library)

KCL simplifies the development of distributed consumer applications.

**Core Features:**

- **Lease Management**: Tracks per-shard ownership using a DynamoDB table

- **Checkpointing**: Stores processing progress in DynamoDB for failure recovery

- **Automatic Load Balancing**: Redistributes shards based on the number of workers

- **Resharding Handling**: Automatically responds to shard splits and merges

```
KCL Architecture
=================

     DynamoDB (Lease Table)
    +-----------------------+
    | Shard ID  | Worker    |
    |-----------|-----------|
    | shard-0   | worker-1  |
    | shard-1   | worker-2  |
    | shard-2   | worker-1  |
    | shard-3   | worker-2  |
    +-----------------------+
      ^                   ^
      |                   |
      |                   |
+-----------+       +-----------+
| Worker 1  |       | Worker 2  |
| (EC2/ECS) |       | (EC2/ECS) |
|           |       |           |
| shard-0   |       | shard-1   |
| shard-2   |       | shard-3   |
+-----------+       +-----------+
```
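The lease table above can be reproduced with a deliberately simplified model: shards are spread round-robin across workers, and a per-shard checkpoint records the last processed sequence number. Real KCL workers take and steal leases through conditional DynamoDB writes rather than a central assignment, so `assign_leases` and `Checkpointer` are hypothetical stand-ins that only show the resulting shape.

```python
def assign_leases(shard_ids, workers):
    """Round-robin shards over workers (simplified model of KCL's even lease spread)."""
    if not workers:
        raise ValueError("need at least one worker")
    return {shard: workers[i % len(workers)] for i, shard in enumerate(shard_ids)}


class Checkpointer:
    """In-memory stand-in for the checkpoint column of the lease table."""

    def __init__(self):
        self._checkpoints = {}

    def checkpoint(self, shard_id, sequence_number):
        self._checkpoints[shard_id] = sequence_number

    def resume_point(self, shard_id):
        # A new lease owner would resume AFTER this sequence number; None = no checkpoint yet
        return self._checkpoints.get(shard_id)


shards = ["shard-0", "shard-1", "shard-2", "shard-3"]
print(assign_leases(shards, ["worker-1", "worker-2"]))
# Adding a third worker spreads the same shards more thinly
print(assign_leases(shards, ["worker-1", "worker-2", "worker-3"]))
```

Storing the checkpoint alongside the lease is what lets a different worker pick up a shard after a failure without reprocessing everything from the start.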

3.6 Data Retention

- **Default**: 24 hours

- **Maximum**: 365 days

- Longer retention increases cost

- Extend retention when data replay is needed

3.7 Shard Splitting and Merging

```
Shard Split
============
      Shard 1 [0 ~ 100]
              |
         split at 50
              |
        +-----+-----+
        |           |
     Shard 3     Shard 4
    [0 ~ 49]   [50 ~ 100]

Shard Merge
============
Shard 3 [0 ~ 49]   --+
                     +--> Shard 5 [0 ~ 100]
Shard 4 [50 ~ 100] --+
```

- **Split**: Used to distribute load from a hot shard

- **Merge**: Used to combine two adjacent shards with reduced traffic

- Handled automatically in on-demand mode
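For an even split, the new starting hash key is usually the midpoint of the parent's range. The sketch below computes that value and the resulting child ranges, and checks the contiguity condition a merge requires. The helper names are illustrative; the computed value is what you would pass (as a decimal string) in `NewStartingHashKey` to the `SplitShard` API, while `MergeShards` takes `ShardToMerge` and `AdjacentShardToMerge`.

```python
def split_point(start: int, end: int) -> int:
    """Midpoint used as NewStartingHashKey for an even split of [start, end]."""
    if end <= start:
        raise ValueError("range too small to split")
    return (start + end + 1) // 2


def child_ranges(start: int, end: int, new_starting_hash_key: int):
    """Lower child gets [start, key - 1]; upper child gets [key, end]."""
    return (start, new_starting_hash_key - 1), (new_starting_hash_key, end)


def can_merge(range_a, range_b) -> bool:
    """Two shards can be merged only if their hash key ranges are contiguous."""
    low, high = sorted([range_a, range_b])
    return low[1] + 1 == high[0]


key = split_point(0, 100)                 # toy range from the diagram
print(key, child_ranges(0, 100, key))
print(str(split_point(0, 2**128 - 1)))    # decimal string for a full-range split
```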

3.8 Capacity Modes

**Provisioned Mode**

- You specify the number of shards

- Suitable for predictable workloads

- Billed per shard-hour

**On-Demand Mode**

- Automatically adjusts shard count

- Suitable for unpredictable workloads

- Billed per stream-hour plus per GB of data written and read

- New in 2025: On-Demand Advantage mode (60% cheaper data usage rates)
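For provisioned mode, a common back-of-the-envelope sizing takes the largest of three requirements: write bandwidth (1 MB/s per shard), write record rate (1,000 records/s per shard), and shared fan-out read bandwidth (2 MB/s per shard, multiplied by the number of consumers that each read the full stream). The function below is a sketch of that arithmetic using the per-shard limits from section 3.2; it ignores burst headroom, the 5 GetRecords calls/s budget, and hot partition keys, all of which usually justify extra shards.

```python
import math

WRITE_MB_PER_SHARD = 1.0
WRITE_RECORDS_PER_SHARD = 1000
READ_MB_PER_SHARD = 2.0  # shared fan-out; enhanced fan-out consumers do not share this


def shards_needed(write_mb_per_s: float, records_per_s: float, shared_consumers: int = 1) -> int:
    """Minimum provisioned shard count for a steady workload (no headroom)."""
    by_write_bytes = write_mb_per_s / WRITE_MB_PER_SHARD
    by_write_records = records_per_s / WRITE_RECORDS_PER_SHARD
    # Every shared fan-out consumer reads the full stream, so read load scales with consumers
    by_read_bytes = write_mb_per_s * shared_consumers / READ_MB_PER_SHARD
    return max(1, math.ceil(max(by_write_bytes, by_write_records, by_read_bytes)))


print(shards_needed(3.5, 1200, shared_consumers=2))  # bandwidth-bound
print(shards_needed(1.0, 5000, shared_consumers=3))  # record-count-bound
```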

4. Amazon Data Firehose (formerly Kinesis Data Firehose)

4.1 Overview

Amazon Data Firehose is a fully managed service for reliably delivering streaming data to data lakes, data stores, and analytics services.

```
Amazon Data Firehose Architecture
====================================

+------------+     +------------+     +------------+     +------------+
| Source     |     | Firehose   |     | Transform  |     | Dest.      |
|            | --> | Delivery   | --> | (optional) | --> |            |
| - Direct   |     | Stream     |     | - Lambda   |     | - S3       |
|   PUT      |     |            |     | - Format   |     | - Redshift |
| - Kinesis  |     | Buffering: |     | - Compress |     | - Open-    |
|   Data     |     | - Size     |     | - Encrypt  |     |   Search   |
|   Streams  |     | - Time     |     |            |     | - Splunk   |
| - MSK      |     |            |     |            |     | - HTTP     |
+------------+     +------------+     +------------+     +------------+
                         |
                   +------------+
                   | Backup S3  |
                   | (source)   |
                   +------------+
```

4.2 Key Features

**No Shard Management**

- Unlike Data Streams, no need to manage shards directly

- Scales automatically

**Buffering Settings**

- **Size-based**: 1MB to 128MB

- **Time-based**: 0 seconds (zero buffering) to 900 seconds

- Delivers when either condition is met
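The "whichever comes first" rule can be modeled as a small buffer that flushes when either the size or the time threshold is crossed. This sketch only models that decision, not Firehose's internals; the `SizeOrTimeBuffer` class and its injectable clock are hypothetical.

```python
import time


class SizeOrTimeBuffer:
    """Flush when buffered bytes reach size_limit_bytes or interval_s has elapsed."""

    def __init__(self, size_limit_bytes, interval_s, clock=time.monotonic):
        self.size_limit = size_limit_bytes
        self.interval = interval_s
        self.clock = clock
        self.items = []
        self.size = 0
        self.opened_at = None  # time the first record entered the current buffer

    def add(self, data: bytes):
        """Buffer a record; return the batch to deliver if a threshold is met, else None."""
        if not self.items:
            self.opened_at = self.clock()
        self.items.append(data)
        self.size += len(data)
        return self._flush_if_due()

    def poll(self):
        """Call periodically so the time threshold fires even when no new records arrive."""
        return self._flush_if_due()

    def _flush_if_due(self):
        if not self.items:
            return None
        size_due = self.size >= self.size_limit
        time_due = self.clock() - self.opened_at >= self.interval
        if size_due or time_due:
            batch, self.items, self.size = self.items, [], 0
            return batch
        return None
```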

**Data Transformation**

- Custom transformations via Lambda functions

- Automatic conversion to columnar formats like Apache Parquet and ORC

- Gzip, Snappy, Zip compression support

- SSE-S3 or SSE-KMS encryption
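A transformation Lambda receives a batch of base64-encoded records and returns every `recordId` with a `result` of `Ok`, `Dropped`, or `ProcessingFailed` plus base64-encoded `data`. The sketch below follows that contract; the filtering rule (dropping `scroll` events) and the added `processed` field are hypothetical examples, and the trailing newline keeps S3 output newline-delimited.

```python
import base64
import json


def lambda_handler(event, context):
    """Firehose data-transformation handler: filter and enrich JSON records."""
    output = []
    for record in event["records"]:
        try:
            payload = json.loads(base64.b64decode(record["data"]))
        except (ValueError, UnicodeDecodeError):
            # Malformed input: report it so Firehose handles it as a processing failure
            output.append({"recordId": record["recordId"],
                           "result": "ProcessingFailed",
                           "data": record["data"]})
            continue

        if payload.get("action") == "scroll":  # hypothetical filter rule
            output.append({"recordId": record["recordId"],
                           "result": "Dropped",
                           "data": record["data"]})
            continue

        payload["processed"] = True  # hypothetical enrichment
        encoded = base64.b64encode((json.dumps(payload) + "\n").encode("utf-8"))
        output.append({"recordId": record["recordId"],
                       "result": "Ok",
                       "data": encoded.decode("ascii")})
    return {"records": output}
```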

4.3 Firehose Delivery Destinations

| Destination       | Characteristics                               |
| ----------------- | --------------------------------------------- |
| Amazon S3         | Most common, Parquet/ORC conversion available |
| Amazon Redshift   | Via S3, then loaded with COPY command         |
| Amazon OpenSearch | Log analysis, search indexing                 |
| Splunk            | Security monitoring, operational logs         |
| HTTP Endpoints    | Custom destinations, Datadog, etc.            |
| Snowflake         | Cloud data warehouse                          |
| Apache Iceberg    | Open table format                             |

4.4 Code Example: Firehose Delivery

```python
import json

import boto3

firehose = boto3.client('firehose', region_name='us-east-1')

# Single record. Firehose does not add delimiters between records, so a
# trailing newline keeps the delivered S3 objects newline-delimited JSON.
response = firehose.put_record(
    DeliveryStreamName='my-firehose-stream',
    Record={
        'Data': (json.dumps({
            'event_type': 'purchase',
            'user_id': 'user-456',
            'product': 'laptop',
            'amount': 1299.99,
            'timestamp': '2026-03-20T10:30:00Z'
        }) + '\n').encode('utf-8')
    }
)

# Batch delivery (PutRecordBatch accepts up to 500 records per call)
records = []
for i in range(50):
    records.append({
        'Data': (json.dumps({
            'event_type': 'page_view',
            'user_id': f'user-{i}',
            'page': f'/product/{i}',
            'timestamp': '2026-03-20T10:30:00Z'
        }) + '\n').encode('utf-8')
    })

response = firehose.put_record_batch(
    DeliveryStreamName='my-firehose-stream',
    Records=records
)
# Failures are per record: entries in RequestResponses carry ErrorCode
print(f"Failed records: {response['FailedPutCount']}")
```

5. Kinesis Video Streams

5.1 Overview

Kinesis Video Streams is a fully managed service for securely ingesting and playing back video and media data from cameras, RADAR, LIDAR, drones, and more.

5.2 Key Features

```
Kinesis Video Streams Architecture
=====================================

+------------+     +--------------------+     +-----------------+
| Devices    |     | Kinesis Video      |     | Consumers       |
|            | --> | Streams            | --> |                 |
| - Camera   |     |                    |     | - HLS Playback  |
| - Drone    |     | - Auto scaling     |     | - DASH Playback |
| - LIDAR    |     | - Durable storage  |     | - GetMedia API  |
| - Smart    |     | - Encryption       |     | - Rekognition   |
|   phone    |     |                    |     | - SageMaker     |
+------------+     +--------------------+     +-----------------+
```

- **HLS (HTTP Live Streaming)**: Live and archive playback on web browsers and mobile

- **WebRTC**: Ultra-low latency bidirectional media streaming

- **Amazon Rekognition Integration**: Face recognition, object detection, and computer vision analytics

- **Storage Tiers**: Hot storage (real-time access) and warm storage (cost-effective retention)

6. Pricing Model

6.1 Kinesis Data Streams Pricing

**Provisioned Mode:**

| Item                                 | Cost (US East)                 |
| ------------------------------------ | ------------------------------ |
| Shard hour                           | ~0.015 USD/shard/hour          |
| PUT payload unit (25KB)              | ~0.014 USD/million units       |
| Enhanced fan-out data retrieval      | ~0.013 USD/GB                  |
| Enhanced fan-out consumer shard hour | ~0.015 USD/consumer/shard/hour |
| Extended retention (24h to 7 days)   | ~0.02 USD/shard/hour           |
| Long-term retention (beyond 7 days)  | ~0.023 USD/GB-month            |
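To see how the two main provisioned-mode line items combine, here is a rough monthly estimate: shard-hours plus PUT payload units, where each record consumes one unit per 25 KB of payload (rounded up). The rates are the approximate US East figures from the provisioned-mode table above, a month is taken as 730 hours, and `estimate_provisioned_monthly` is a hypothetical helper; it leaves out enhanced fan-out and retention charges.

```python
import math

SHARD_HOUR_USD = 0.015            # ~ per shard-hour (from the table above)
PUT_UNIT_USD_PER_MILLION = 0.014  # ~ per million 25 KB PUT payload units
HOURS_PER_MONTH = 730


def put_payload_units(record_size_kb: float) -> int:
    """Each record consumes ceil(size / 25 KB) payload units (minimum 1)."""
    return max(1, math.ceil(record_size_kb / 25))


def estimate_provisioned_monthly(shards: int, records_per_s: float, record_size_kb: float) -> float:
    """Shard-hour cost plus PUT payload unit cost for one month of steady traffic."""
    shard_cost = shards * HOURS_PER_MONTH * SHARD_HOUR_USD
    units = records_per_s * 3600 * HOURS_PER_MONTH * put_payload_units(record_size_kb)
    put_cost = units / 1_000_000 * PUT_UNIT_USD_PER_MILLION
    return shard_cost + put_cost


# 2 shards, 1,000 records/s of 3 KB each
print(round(estimate_provisioned_monthly(2, 1000, 3), 2))
```

In this scenario the PUT payload units cost more than the shards themselves, which is one reason record aggregation (section 3.3) can pay off for small records.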

**On-Demand Mode:**

| Item                   | Cost (US East)  |
| ---------------------- | --------------- |
| Stream hour (Standard) | ~0.04 USD/hour  |
| Data write (Standard)  | ~0.08 USD/GB    |
| Data read (Standard)   | ~0.04 USD/GB    |
| Data write (Advantage) | ~0.032 USD/GB   |
| Data read (Advantage)  | ~0.016 USD/GB   |

6.2 Amazon Data Firehose Pricing

| Item                               | Cost (US East)             |
| ---------------------------------- | -------------------------- |
| Data ingestion (first 500TB/month) | ~0.029 USD/GB              |
| Format conversion                  | ~0.018 USD/GB              |
| VPC delivery                       | ~0.01 USD/GB + hourly rate |

7. End-to-End Architecture Example: Real-Time Analytics Pipeline

```
Real-Time Analytics Pipeline Architecture
============================================

+----------+   +---------+   +-----------+   +----------+   +----------+
| Web/     |   | Kinesis |   | Lambda    |   | Firehose |   | S3       |
| Mobile   |-->| Data    |-->|(Transform |-->| Delivery |-->| (Data    |
| App      |   | Streams |   | /Filter)  |   | Stream   |   | Lake)    |
+----------+   +---------+   +-----------+   +----------+   +----------+
                    |                                            |
                    v                                            v
              +-----------+                                 +----------+
              | Managed   |                                 | Athena   |
              | Flink     |                                 | (Ad-hoc  |
              | (Real-time|                                 | Queries) |
              | analysis) |                                 +----------+
              +-----------+                                      |
                    |                                            v
                    v                                       +----------+
              +-----------+                                 | Quick-   |
              | DynamoDB  |                                 | Sight    |
              | (Real-time|                                 | (BI      |
              | dashboard)|                                 |Dashboard)|
              +-----------+                                 +----------+
```

7.1 Complete Producer/Consumer Example

```python
# === producer.py ===
import json
import random
import time
from datetime import datetime, timezone

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')
STREAM_NAME = 'clickstream-data'


def generate_click_event():
    """Generate a clickstream event"""
    pages = ['/home', '/products', '/cart', '/checkout', '/profile']
    actions = ['view', 'click', 'scroll', 'submit']
    user_id = f'user-{random.randint(1, 1000)}'
    return {
        'user_id': user_id,
        'page': random.choice(pages),
        'action': random.choice(actions),
        'session_id': f'sess-{random.randint(1, 100)}',
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
        'timestamp': datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
        'device': random.choice(['mobile', 'desktop', 'tablet']),
        'country': random.choice(['KR', 'US', 'JP', 'DE'])
    }


def send_events(batch_size=50, interval=1.0):
    """Send events to the Kinesis stream"""
    while True:
        records = []
        for _ in range(batch_size):
            event = generate_click_event()
            records.append({
                'Data': json.dumps(event).encode('utf-8'),
                # user_id as partition key keeps each user's events ordered on one shard
                'PartitionKey': event['user_id']
            })

        try:
            response = kinesis.put_records(
                StreamName=STREAM_NAME,
                Records=records
            )
            failed = response['FailedRecordCount']
            if failed > 0:
                print(f"Warning: {failed} records failed")
                # Results are returned in the same order as the request records
                for record_response in response['Records']:
                    if 'ErrorCode' in record_response:
                        print(f"  Error: {record_response['ErrorCode']}")
            else:
                print(f"Sent {batch_size} records successfully")
        except Exception as e:
            print(f"Error: {e}")

        time.sleep(interval)


if __name__ == '__main__':
    send_events()
```

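```python
# === consumer.py ===
import json
import time

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')
STREAM_NAME = 'clickstream-data'


def get_shard_ids():
    """Return all shard IDs in the stream, following ListShards pagination"""
    shard_ids = []
    kwargs = {'StreamName': STREAM_NAME}
    while True:
        response = kinesis.list_shards(**kwargs)
        shard_ids.extend(shard['ShardId'] for shard in response['Shards'])
        next_token = response.get('NextToken')
        if not next_token:
            return shard_ids
        # StreamName must not be passed together with NextToken
        kwargs = {'NextToken': next_token}


def process_records(records):
    """Record processing logic: count page views per page"""
    page_views = {}
    for record in records:
        data = json.loads(record['Data'].decode('utf-8'))
        page = data.get('page', 'unknown')
        page_views[page] = page_views.get(page, 0) + 1

        # Simple alerting rule (example): flag checkout submissions
        if data.get('action') == 'submit' and data.get('page') == '/checkout':
            print(f"[ALERT] Checkout event: user={data['user_id']}")

    for page, count in page_views.items():
        print(f"  Page: {page}, Views: {count}")


def get_iterator(shard_id, after_sequence_number=None):
    """Resume right after the last processed record if known, else start at LATEST"""
    kwargs = {'StreamName': STREAM_NAME, 'ShardId': shard_id}
    if after_sequence_number:
        kwargs['ShardIteratorType'] = 'AFTER_SEQUENCE_NUMBER'
        kwargs['StartingSequenceNumber'] = after_sequence_number
    else:
        kwargs['ShardIteratorType'] = 'LATEST'
    return kinesis.get_shard_iterator(**kwargs)['ShardIterator']


def consume_stream():
    """Read and process data from every shard in the stream"""
    shard_ids = get_shard_ids()
    print(f"Found {len(shard_ids)} shards")

    shard_iterators = {shard_id: get_iterator(shard_id) for shard_id in shard_ids}
    last_sequence = {}  # shard_id -> sequence number of the last processed record

    # Child shards created by later splits/merges are not discovered here;
    # the KCL handles that (and checkpointing) for production consumers.
    while shard_iterators:
        for shard_id in list(shard_iterators):
            try:
                response = kinesis.get_records(
                    ShardIterator=shard_iterators[shard_id],
                    Limit=100
                )
                records = response['Records']
                if records:
                    print(f"\n--- Shard: {shard_id} ---")
                    print(f"Records received: {len(records)}")
                    process_records(records)
                    last_sequence[shard_id] = records[-1]['SequenceNumber']

                next_iterator = response.get('NextShardIterator')
                if next_iterator:
                    shard_iterators[shard_id] = next_iterator
                else:
                    # A closed shard (after a split or merge) has been fully read
                    del shard_iterators[shard_id]
            except kinesis.exceptions.ExpiredIteratorException:
                # Iterators expire 5 minutes after they are issued; resume without
                # skipping records when a sequence number is known
                shard_iterators[shard_id] = get_iterator(
                    shard_id, last_sequence.get(shard_id))

        time.sleep(1)


if __name__ == '__main__':
    consume_stream()
```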

8. Key Limits and Notes

| Item                                            | Limit                                                                          |
| ----------------------------------------------- | ------------------------------------------------------------------------------ |
| Maximum record size                             | 1 MB                                                                           |
| Maximum records per PutRecords request          | 500                                                                            |
| Maximum PutRecords request size                 | 5 MB (including partition keys)                                                |
| Write throughput per shard                      | 1 MB/s or 1,000 records/s                                                      |
| Read throughput per shard (shared)              | 2 MB/s, GetRecords 5 calls/s                                                   |
| Read throughput per shard (enhanced fan-out)    | 2 MB/s per consumer                                                            |
| Maximum registered consumers (enhanced fan-out) | 20 per stream                                                                  |
| Data retention                                  | 24 hours (default) to 365 days                                                 |
| Shard quota (provisioned mode)                  | 500 per account per Region in us-east-1, us-west-2, and eu-west-1; 200 in other Regions (increase requestable) |
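The 500-record and 5 MB request limits mean large batches have to be split before calling `PutRecords`, and the request budget counts partition keys as well as data. A minimal chunking sketch under those limits; the helper names are hypothetical, MB is treated as MiB (1024 × 1024 bytes), and the per-record check counts data plus partition key as a conservative reading of the 1 MB record limit.

```python
MAX_RECORDS_PER_REQUEST = 500
MAX_REQUEST_BYTES = 5 * 1024 * 1024  # 5 MiB per request, partition keys included
MAX_RECORD_BYTES = 1024 * 1024       # 1 MiB per record (conservatively: data + key)


def record_size(record) -> int:
    """Bytes a PutRecords entry counts against the request limit."""
    return len(record["Data"]) + len(record["PartitionKey"].encode("utf-8"))


def chunk_for_put_records(records):
    """Yield lists of records that each fit in a single PutRecords request."""
    batch, batch_bytes = [], 0
    for record in records:
        size = record_size(record)
        if size > MAX_RECORD_BYTES:
            raise ValueError(f"record of {size} bytes exceeds the 1 MiB limit")
        # Close the current batch if adding this record would break either limit
        if batch and (len(batch) == MAX_RECORDS_PER_REQUEST
                      or batch_bytes + size > MAX_REQUEST_BYTES):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(record)
        batch_bytes += size
    if batch:
        yield batch
```

Each yielded batch can then be passed as `Records` to one `put_records` call.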

9. Summary

The AWS Kinesis family provides a comprehensive solution for real-time data streaming.

- **Kinesis Data Streams**: The core of real-time data collection and processing. Scalable shard-based architecture with diverse consumer patterns through KCL and enhanced fan-out.

- **Amazon Data Firehose**: Automatic delivery pipeline to S3, Redshift, OpenSearch, and more. Easy to use with no shard management.

- **Managed Flink**: A managed stream analytics service based on Apache Flink, supporting both SQL and code-based analytics.

- **Video Streams**: A specialized service for video/media data ingestion, storage, and playback.

In the next post, we will cover practical Kinesis architecture patterns and comparisons with Apache Kafka and SQS.
