Skip to content
Published on

[AWS] Kinesis 완전 가이드: 실시간 스트리밍 데이터 처리의 모든 것

Authors

1. 스트리밍 데이터란 무엇인가

1.1 배치 처리 vs 스트리밍 처리

전통적인 데이터 처리는 배치(batch) 방식이었습니다. 일정 기간 데이터를 모아서 한꺼번에 처리하는 방식입니다. 하지만 현대 애플리케이션에서는 실시간 반응이 필수적입니다.

배치 처리 (Batch Processing)
============================
[데이터 수집] --> [저장소] --> [주기적 처리] --> [결과]
     |                            |
     +---- 수 분 ~ 수 시간 -------+

스트리밍 처리 (Stream Processing)
==================================
[데이터 생성] --> [스트림] --> [즉시 처리] --> [결과]
     |                           |
     +---- 밀리초 ~ 수 초 -------+

1.2 스트리밍 데이터의 활용 사례

  • 실시간 로그 분석: 서버 로그를 즉시 수집하여 이상 탐지
  • IoT 센서 데이터: 수백만 개 디바이스에서 발생하는 텔레메트리 데이터
  • 클릭스트림 분석: 사용자 행동을 실시간으로 추적하여 개인화
  • 금융 거래 모니터링: 사기 탐지를 위한 실시간 트랜잭션 분석
  • 소셜 미디어 피드: 실시간 트렌드 분석과 감성 분석

2. AWS Kinesis 패밀리 개요

AWS Kinesis는 실시간 스트리밍 데이터를 수집, 처리, 분석하기 위한 완전관리형 서비스 집합입니다.

+------------------------------------------------------------------+
|                    AWS Kinesis 패밀리                               |
+------------------------------------------------------------------+
|                                                                    |
|  +------------------+  +------------------+  +------------------+  |
|  | Kinesis Data     |  | Amazon Data      |  | Managed Service  |  |
|  | Streams          |  | Firehose         |  | for Apache Flink |  |
|  |                  |  |                  |  |                  |  |
|  | 실시간 데이터    |  | 전달 파이프라인  |  | 스트림 분석      |  |
|  | 스트리밍         |  | (S3, Redshift    |  | (SQL, Java,      |  |
|  |                  |  |  OpenSearch)  |  |  Python, Scala)  |  |
|  +------------------+  +------------------+  +------------------+  |
|                                                                    |
|  +------------------+                                              |
|  | Kinesis Video    |                                              |
|  | Streams          |                                              |
|  |                  |                                              |
|  | 비디오 스트리밍  |                                              |
|  | 수집 및 분석     |                                              |
|  +------------------+                                              |
+------------------------------------------------------------------+
서비스주요 용도데이터 타입지연 시간
Data Streams실시간 데이터 수집/처리레코드(바이트)실시간 (밀리초)
Data Firehose데이터 전달 파이프라인레코드(바이트)근실시간 (60초~)
Managed Flink스트림 분석/변환스트림 데이터실시간 (밀리초)
Video Streams비디오 수집/재생미디어 프레임실시간 (1~10초)

3. Kinesis Data Streams 심층 분석

3.1 핵심 아키텍처

Kinesis Data Streams는 대규모 실시간 데이터 스트리밍 서비스의 핵심입니다.

                        Kinesis Data Stream
+----------------------------------------------------------------------+
|                                                                      |
|  Producer        Shard 1          Shard 2          Shard 3           |
|  --------     +----------+     +----------+     +----------+         |
|  |App A | --> |Record 1  | --> |Record 4  | --> |Record 7  |         |
|  |App B | --> |Record 2  | --> |Record 5  | --> |Record 8  |         |
|  |App C | --> |Record 3  | --> |Record 6  | --> |Record 9  |         |
|  --------     +----------+     +----------+     +----------+         |
|                    |                |                |                |
|                    v                v                v                |
|               Consumer A       Consumer A       Consumer A           |
|               Consumer B       Consumer B       Consumer B           |
+----------------------------------------------------------------------+

3.2 핵심 구성 요소

스트림 (Stream)

  • 샤드의 논리적 그룹
  • 데이터 레코드의 순서가 보장되는 단위

샤드 (Shard)

  • 스트림의 기본 처리량 단위
  • 쓰기: 초당 1MB 또는 1,000 레코드
  • 읽기: 초당 2MB (공유 팬아웃), 초당 2MB/컨슈머 (향상된 팬아웃)

레코드 (Record)

  • 데이터의 기본 단위
  • 파티션 키, 시퀀스 번호, 데이터 블롭(최대 1MB)으로 구성

파티션 키 (Partition Key)

  • 레코드가 어떤 샤드에 할당될지 결정
  • MD5 해시를 사용하여 샤드에 매핑
  • 동일한 파티션 키를 가진 레코드는 같은 샤드에 순서대로 저장

시퀀스 번호 (Sequence Number)

  • 각 레코드에 자동 할당되는 고유 식별자
  • 샤드 내에서 레코드의 순서를 보장
파티션 키 해싱 과정
====================

Partition Key: "user-123"
         |
         v
    MD5("user-123") = 0x7A3B...
         |
         v
    Hash Range: 0 ~ 2^128 - 1
         |
         v
    Shard 1: [0 ~ 2^127]          <-- 이 범위에 매핑됨
    Shard 2: [2^127 ~ 2^128 - 1]

3.3 프로듀서 (Producers)

데이터를 Kinesis 스트림에 전송하는 방법은 여러 가지가 있습니다.

1) PutRecord / PutRecords API

가장 기본적인 방식입니다.

import boto3
import json

kinesis = boto3.client('kinesis', region_name='ap-northeast-2')

# 단일 레코드 전송
response = kinesis.put_record(
    StreamName='my-data-stream',
    Data=json.dumps({
        'event_type': 'page_view',
        'user_id': 'user-123',
        'page': '/products/laptop',
        'timestamp': '2026-03-20T10:30:00Z'
    }).encode('utf-8'),
    PartitionKey='user-123'
)
print(f"Shard ID: {response['ShardId']}")
print(f"Sequence Number: {response['SequenceNumber']}")

# 다중 레코드 전송 (배치)
records = []
for i in range(100):
    records.append({
        'Data': json.dumps({
            'event_type': 'click',
            'user_id': f'user-{i % 10}',
            'element': 'buy_button',
            'timestamp': '2026-03-20T10:30:00Z'
        }).encode('utf-8'),
        'PartitionKey': f'user-{i % 10}'
    })

response = kinesis.put_records(
    StreamName='my-data-stream',
    Records=records
)
print(f"Failed records: {response['FailedRecordCount']}")

2) KPL (Kinesis Producer Library)

고성능 프로듀싱을 위한 라이브러리입니다.

  • 레코드 집계 (Aggregation): 여러 작은 레코드를 하나의 Kinesis 레코드로 묶음
  • 레코드 수집 (Collection): 여러 Kinesis 레코드를 하나의 PutRecords 호출로 묶음
  • 자동 재시도: 실패한 레코드를 자동으로 재전송
  • CloudWatch 메트릭: 자동으로 성능 메트릭을 발행
KPL 집계 과정
==============

User Record A (100 bytes) --+
User Record B (200 bytes) --+--> Kinesis Record (500 bytes)
User Record C (200 bytes) --+         |
                                      v
User Record D (300 bytes) --+    PutRecords API
User Record E (400 bytes) --+--> (단일 호출)
User Record F (100 bytes) --+

3) AWS SDK 직접 사용

다양한 언어의 AWS SDK를 통해 직접 API를 호출할 수 있습니다.

4) Kinesis Agent

서버에 설치하여 로그 파일을 자동으로 스트림에 전송하는 에이전트입니다.

3.4 컨슈머 (Consumers)

1) 공유 팬아웃 (Shared Fan-Out) - GetRecords

기본 컨슈머 방식으로, 샤드당 2MB/초의 읽기 처리량을 모든 컨슈머가 공유합니다.

import boto3
import json
import time

kinesis = boto3.client('kinesis', region_name='ap-northeast-2')

# 샤드 이터레이터 획득
response = kinesis.get_shard_iterator(
    StreamName='my-data-stream',
    ShardId='shardId-000000000000',
    ShardIteratorType='LATEST'  # TRIM_HORIZON, AT_SEQUENCE_NUMBER, AT_TIMESTAMP 등
)
shard_iterator = response['ShardIterator']

# 레코드 폴링
while True:
    response = kinesis.get_records(
        ShardIterator=shard_iterator,
        Limit=100
    )

    for record in response['Records']:
        data = json.loads(record['Data'].decode('utf-8'))
        print(f"Partition Key: {record['PartitionKey']}")
        print(f"Sequence Number: {record['SequenceNumber']}")
        print(f"Data: {data}")

    shard_iterator = response['NextShardIterator']

    # GetRecords는 샤드당 초당 5회까지 호출 가능
    time.sleep(0.2)

2) 향상된 팬아웃 (Enhanced Fan-Out) - SubscribeToShard

HTTP/2를 사용한 푸시 기반 전달 방식입니다.

공유 팬아웃 vs 향상된 팬아웃
===============================

공유 팬아웃 (Shared Fan-Out):
+--------+     +--------+
| Shard  | --> | 2MB/s  | --> Consumer A (폴링)
|        |     | (공유) | --> Consumer B (폴링)
+--------+     +--------+     Consumer C (폴링)

3개 컨슈머가 2MB/s를 나눠 사용
각 컨슈머: ~0.67 MB/s

향상된 팬아웃 (Enhanced Fan-Out):
+--------+     +--------+
| Shard  | --> | 2MB/s  | --> Consumer A (HTTP/2 푸시)
|        |     | 2MB/s  | --> Consumer B (HTTP/2 푸시)
|        |     | 2MB/s  | --> Consumer C (HTTP/2 푸시)
+--------+     +--------+

각 컨슈머가 전용 2MB/s 할당
지연 시간: ~70ms (공유: ~200ms+)

3.5 KCL (Kinesis Client Library)

KCL은 분산 컨슈머 애플리케이션 개발을 단순화하는 라이브러리입니다.

핵심 기능:

  • 리스 관리: DynamoDB 테이블을 사용하여 샤드별 소유권 추적
  • 체크포인팅: 처리 진행 상태를 DynamoDB에 저장하여 장애 복구 지원
  • 자동 로드 밸런싱: 워커 수에 따라 샤드 자동 재분배
  • 리샤딩 처리: 샤드 분할/병합 시 자동 대응
KCL 아키텍처
=============

                DynamoDB (Lease Table)
                +---------------------+
                | Shard ID | Worker   |
                |----------|----------|
                | shard-0  | worker-1 |
                | shard-1  | worker-2 |
                | shard-2  | worker-1 |
                | shard-3  | worker-2 |
                +---------------------+
                      ^         ^
                      |         |
              +-------+         +-------+
              |                         |
        +-----------+           +-----------+
        | Worker 1  |           | Worker 2  |
        | (EC2/ECS) |           | (EC2/ECS) |
        |           |           |           |
        | shard-0   |           | shard-1   |
        | shard-2   |           | shard-3   |
        +-----------+           +-----------+

3.6 데이터 보존 기간

  • 기본값: 24시간
  • 최대값: 365일
  • 보존 기간이 길수록 비용 증가
  • 데이터 리플레이가 필요한 경우 보존 기간 연장 활용

3.7 샤드 분할과 병합

샤드 분할 (Split)
==================
Shard 1 [0 ~ 100]
         |
    split at 50
         |
    +----+----+
    |         |
Shard 3    Shard 4
[0 ~ 50]  [51 ~ 100]

샤드 병합 (Merge)
==================
Shard 3 [0 ~ 50]     --> Shard 5
Shard 4 [51 ~ 100]   --> [0 ~ 100]
  • 분할: 핫 샤드의 부하를 분산할 때 사용
  • 병합: 트래픽이 줄어든 두 인접 샤드를 합칠 때 사용
  • 온디맨드 모드에서는 자동으로 처리

3.8 용량 모드

프로비저닝 모드 (Provisioned Mode)

  • 샤드 수를 직접 지정
  • 예측 가능한 워크로드에 적합
  • 샤드 시간당 과금

온디맨드 모드 (On-Demand Mode)

  • 자동으로 샤드 수 조정
  • 예측 불가능한 워크로드에 적합
  • 데이터 양 기반 과금
  • 2025년 신규: On-Demand Advantage 모드 (기존 대비 60% 저렴한 데이터 사용료)

4. Amazon Data Firehose (구 Kinesis Data Firehose)

4.1 개요

Amazon Data Firehose는 스트리밍 데이터를 데이터 레이크, 데이터 스토어, 분석 서비스에 안정적으로 전달하는 완전관리형 서비스입니다.

Amazon Data Firehose 아키텍처
================================

+----------+     +-----------+     +------------+     +----------+
| 소스     |     | Firehose  |     | 변환       |     | 대상     |
|          | --> | 전달      | --> | (선택)     | --> |          |
| - Direct |     | 스트림    |     | - Lambda   |     | - S3     |
| - Kinesis|     |           |     | - 포맷변환 |     | - Redshift|
|   Data   |     | 버퍼링:   |     | - 압축     |     | - Open-  |
|   Streams|     | - 크기    |     | - 암호화   |     |   Search |
| - MSK    |     | - 시간    |     |            |     | - Splunk |
|          |     |           |     |            |     | - HTTP   |
+----------+     +-----------+     +------------+     +----------+
                                                           |
                                                      +----------+
                                                      | 백업 S3  |
                                                      | (원본)   |
                                                      +----------+

4.2 핵심 특징

샤드 관리 불필요

  • Data Streams와 달리 샤드를 직접 관리할 필요 없음
  • 자동으로 확장/축소

버퍼링 설정

  • 크기 기반: 1MB ~ 128MB
  • 시간 기반: 60초 ~ 900초
  • 둘 중 하나라도 조건에 도달하면 전달

데이터 변환

  • Lambda 함수를 통한 사용자 정의 변환
  • Apache Parquet, ORC 등 열 지향 포맷으로 자동 변환
  • Gzip, Snappy, Zip 압축 지원
  • SSE-S3 또는 SSE-KMS 암호화

4.3 Firehose 전달 대상

대상특징
Amazon S3가장 일반적, Parquet/ORC 변환 가능
Amazon RedshiftS3 경유 후 COPY 명령으로 적재
Amazon OpenSearch로그 분석, 검색 인덱싱
Splunk보안 모니터링, 운영 로그
HTTP 엔드포인트사용자 정의 대상, Datadog 등
Snowflake클라우드 데이터 웨어하우스
Apache Iceberg오픈 테이블 포맷

4.4 코드 예제: Firehose 전송

import boto3
import json

firehose = boto3.client('firehose', region_name='ap-northeast-2')

# 단일 레코드 전송
response = firehose.put_record(
    DeliveryStreamName='my-firehose-stream',
    Record={
        'Data': json.dumps({
            'event_type': 'purchase',
            'user_id': 'user-456',
            'product': 'laptop',
            'amount': 1299.99,
            'timestamp': '2026-03-20T10:30:00Z'
        }).encode('utf-8')
    }
)

# 배치 전송
records = []
for i in range(50):
    records.append({
        'Data': json.dumps({
            'event_type': 'page_view',
            'user_id': f'user-{i}',
            'page': f'/product/{i}',
            'timestamp': '2026-03-20T10:30:00Z'
        }).encode('utf-8')
    })

response = firehose.put_record_batch(
    DeliveryStreamName='my-firehose-stream',
    Records=records
)
print(f"Failed records: {response['FailedPutCount']}")

5. Kinesis Video Streams

5.1 개요

Kinesis Video Streams는 카메라, RADAR, LIDAR, 드론 등에서 발생하는 비디오 및 미디어 데이터를 안전하게 수집하고 재생할 수 있는 완전관리형 서비스입니다.

5.2 주요 기능

Kinesis Video Streams 아키텍처
==================================

+----------+     +------------------+     +------------------+
| 디바이스 |     | Kinesis Video    |     | 소비자           |
|          | --> | Streams          | --> |                  |
| - 카메라 |     |                  |     | - HLS 재생       |
| - 드론   |     | - 자동 확장      |     | - DASH 재생      |
| - LIDAR  |     | - 내구성 저장    |     | - GetMedia API   |
| - 스마트 |     | - 암호화         |     | - Rekognition    |
||     |                  |     | - SageMaker      |
+----------+     +------------------+     +------------------+
  • HLS (HTTP Live Streaming): 웹 브라우저와 모바일에서 실시간 및 아카이브 재생 가능
  • WebRTC: 초저지연 양방향 미디어 스트리밍
  • Amazon Rekognition 연동: 얼굴 인식, 객체 탐지 등 컴퓨터 비전 분석
  • 스토리지 계층: 핫 스토리지(실시간 액세스)와 웜 스토리지(비용 효율적 보관)

6. 요금 모델

6.1 Kinesis Data Streams 요금

프로비저닝 모드:

항목비용 (미국 동부 기준)
샤드 시간~0.015 USD/샤드/시간
PUT 페이로드 유닛 (25KB)~0.014 USD/백만 유닛
향상된 팬아웃 데이터 검색~0.013 USD/GB
향상된 팬아웃 컨슈머 샤드 시간~0.015 USD/컨슈머/샤드/시간
장기 보존 (24시간 초과)~0.023 USD/샤드/시간

온디맨드 모드:

항목비용 (미국 동부 기준)
스트림 시간 (Standard)~0.04 USD/시간
데이터 쓰기 (Standard)~0.08 USD/GB
데이터 읽기 (Standard)~0.04 USD/GB
데이터 쓰기 (Advantage)~0.032 USD/GB
데이터 읽기 (Advantage)~0.016 USD/GB

6.2 Amazon Data Firehose 요금

항목비용 (미국 동부 기준)
데이터 수집 (처음 500TB/월)~0.029 USD/GB
포맷 변환~0.018 USD/GB
VPC 전달~0.01 USD/GB + 시간당 요금

7. 전체 아키텍처 예제: 실시간 분석 파이프라인

실시간 분석 파이프라인 아키텍처
=====================================

+----------+   +---------+   +----------+   +---------+   +----------+
| Web/     |   | Kinesis |   | Lambda   |   | Firehose|   | S3       |
| Mobile   |-->| Data    |-->| (변환/   |-->| 전달    |-->| (Data    |
| App      |   | Streams |   |  필터)   |   | 스트림  |   |  Lake)   |
+----------+   +---------+   +----------+   +---------+   +----------+
                    |                                           |
                    v                                           v
              +-----------+                              +----------+
              | Managed   |                              | Athena   |
              | Flink     |                              | (Ad-hoc  |
              | (실시간   |                              |  쿼리)   |
              |  분석)    |                              +----------+
              +-----------+                                    |
                    |                                          v
                    v                                    +----------+
              +-----------+                              | Quick-   |
              | DynamoDB  |                              | Sight    |
              | (실시간   |                              | (BI      |
              |  대시보드)|                              |  대시보드)|
              +-----------+                              +----------+

7.1 완전한 프로듀서/컨슈머 예제

# === producer.py ===
import boto3
import json
import time
import random
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='ap-northeast-2')
STREAM_NAME = 'clickstream-data'

def generate_click_event():
    """클릭스트림 이벤트 생성"""
    pages = ['/home', '/products', '/cart', '/checkout', '/profile']
    actions = ['view', 'click', 'scroll', 'submit']
    user_id = f'user-{random.randint(1, 1000)}'

    return {
        'user_id': user_id,
        'page': random.choice(pages),
        'action': random.choice(actions),
        'session_id': f'sess-{random.randint(1, 100)}',
        'timestamp': datetime.utcnow().isoformat() + 'Z',
        'device': random.choice(['mobile', 'desktop', 'tablet']),
        'country': random.choice(['KR', 'US', 'JP', 'DE'])
    }

def send_events(batch_size=50, interval=1.0):
    """이벤트를 Kinesis 스트림에 전송"""
    while True:
        records = []
        for _ in range(batch_size):
            event = generate_click_event()
            records.append({
                'Data': json.dumps(event).encode('utf-8'),
                'PartitionKey': event['user_id']
            })

        try:
            response = kinesis.put_records(
                StreamName=STREAM_NAME,
                Records=records
            )

            failed = response['FailedRecordCount']
            if failed > 0:
                print(f"Warning: {failed} records failed")

                # 실패한 레코드 재시도
                for i, record_response in enumerate(response['Records']):
                    if 'ErrorCode' in record_response:
                        print(f"  Error: {record_response['ErrorCode']}")
                        # 재시도 로직 구현
            else:
                print(f"Sent {batch_size} records successfully")

        except Exception as e:
            print(f"Error: {e}")

        time.sleep(interval)

if __name__ == '__main__':
    send_events()
# === consumer.py ===
import boto3
import json
import time

kinesis = boto3.client('kinesis', region_name='ap-northeast-2')
STREAM_NAME = 'clickstream-data'

def get_shard_ids():
    """스트림의 모든 샤드 ID를 반환"""
    response = kinesis.describe_stream(StreamName=STREAM_NAME)
    return [
        shard['ShardId']
        for shard in response['StreamDescription']['Shards']
    ]

def process_records(records):
    """레코드 처리 로직"""
    page_views = {}
    for record in records:
        data = json.loads(record['Data'].decode('utf-8'))
        page = data.get('page', 'unknown')
        page_views[page] = page_views.get(page, 0) + 1

        # 이상 행동 탐지 예시
        if data.get('action') == 'submit' and data.get('page') == '/checkout':
            print(f"[ALERT] Checkout event: user={data['user_id']}")

    for page, count in page_views.items():
        print(f"  Page: {page}, Views: {count}")

def consume_stream():
    """스트림에서 데이터를 읽어 처리"""
    shard_ids = get_shard_ids()
    print(f"Found {len(shard_ids)} shards")

    shard_iterators = {}
    for shard_id in shard_ids:
        response = kinesis.get_shard_iterator(
            StreamName=STREAM_NAME,
            ShardId=shard_id,
            ShardIteratorType='LATEST'
        )
        shard_iterators[shard_id] = response['ShardIterator']

    while True:
        for shard_id in shard_ids:
            try:
                response = kinesis.get_records(
                    ShardIterator=shard_iterators[shard_id],
                    Limit=100
                )

                if response['Records']:
                    print(f"\n--- Shard: {shard_id} ---")
                    print(f"Records received: {len(response['Records'])}")
                    process_records(response['Records'])

                shard_iterators[shard_id] = response['NextShardIterator']

            except kinesis.exceptions.ExpiredIteratorException:
                # 이터레이터 만료 시 재생성
                response = kinesis.get_shard_iterator(
                    StreamName=STREAM_NAME,
                    ShardId=shard_id,
                    ShardIteratorType='LATEST'
                )
                shard_iterators[shard_id] = response['ShardIterator']

        time.sleep(1)

if __name__ == '__main__':
    consume_stream()

8. 주요 제한 사항 및 참고사항

항목제한
레코드 최대 크기1 MB
PutRecords 요청당 최대 레코드 수500개
PutRecords 요청당 최대 크기5 MB
샤드당 쓰기 처리량1 MB/초 또는 1,000 레코드/초
샤드당 읽기 처리량 (공유)2 MB/초, GetRecords 초당 5회
샤드당 읽기 처리량 (향상된 팬아웃)컨슈머당 2 MB/초
최대 등록 컨슈머 (향상된 팬아웃)스트림당 20개
데이터 보존24시간(기본) ~ 365일
스트림당 최대 샤드 수기본 500 (증가 요청 가능)

9. 정리

AWS Kinesis 패밀리는 실시간 데이터 스트리밍을 위한 포괄적인 솔루션을 제공합니다.

  • Kinesis Data Streams: 실시간 데이터 수집과 처리의 핵심. 샤드 기반 아키텍처로 확장 가능하며, KCL과 향상된 팬아웃을 통해 다양한 컨슈머 패턴을 지원합니다.
  • Amazon Data Firehose: 데이터를 S3, Redshift, OpenSearch 등에 자동으로 전달하는 파이프라인. 샤드 관리 없이 간편하게 사용할 수 있습니다.
  • Managed Flink: Apache Flink 기반의 관리형 스트림 분석 서비스로, SQL과 코드 기반 분석을 모두 지원합니다.
  • Video Streams: 비디오/미디어 데이터의 수집, 저장, 재생을 위한 전문 서비스입니다.

다음 포스트에서는 Kinesis의 실전 아키텍처 패턴과 Apache Kafka, SQS와의 비교를 다루겠습니다.