Skip to content

Split View: 실시간 금융 데이터 파이프라인 구축: Kafka, Flink 스트리밍 아키텍처 실전 가이드

✨ Learn with Quiz
|

실시간 금융 데이터 파이프라인 구축: Kafka, Flink 스트리밍 아키텍처 실전 가이드

실시간 금융 데이터 파이프라인

들어가며

금융 시장에서 데이터의 속도는 곧 경쟁력이다. 주식 호가 데이터, FX 환율, 채권 수익률, 파생상품 가격 등 초당 수십만 건의 이벤트가 발생하며, 이를 밀리초 단위로 수집하고 처리하여 의사결정에 반영해야 한다. 배치 처리 기반의 전통적인 ETL 파이프라인으로는 이러한 실시간 요구사항을 충족할 수 없다.

Apache Kafka와 Apache Flink는 실시간 금융 데이터 파이프라인의 핵심 구성 요소다. Kafka는 고처리량, 내구성 있는 메시지 브로커로서 시장 데이터의 수집과 전달을 담당하고, Flink는 상태 기반 스트림 처리 엔진으로서 복잡한 이벤트 처리(CEP), 윈도우 집계, 이상 탐지를 수행한다.

이 글에서는 실시간 금융 데이터 파이프라인의 아키텍처 설계부터 Kafka 프로듀서 구현, Flink 스트림 처리, CDC 연동, 이상 거래 탐지, 저지연 최적화, 장애 복구 전략까지 실전 코드와 함께 다룬다.

주의: 이 글의 코드 예제는 교육 목적이며, 프로덕션 환경에서는 보안, 규정 준수, 성능 튜닝 등 추가 고려가 필요합니다.

금융 데이터 파이프라인 아키텍처

전체 아키텍처 개요

실시간 금융 데이터 파이프라인은 다음과 같은 계층 구조를 가진다.

계층구성 요소역할지연 시간 목표
수집 계층Kafka Producer, FIX Gateway시장 데이터, 주문 이벤트 수집1ms 이하
메시징 계층Apache Kafka이벤트 버퍼링, 라우팅, 내구성 보장2-5ms
처리 계층Apache Flink스트림 처리, 윈도우 집계, CEP10-50ms
저장 계층TimescaleDB, Apache Iceberg시계열 저장, 분석용 데이터 레이크100ms 이하
서빙 계층Redis, gRPC API실시간 대시보드, 알림 서비스5ms 이하

데이터 흐름

거래소/데이터 벤더 --> Kafka Producer --> Kafka Cluster --> Flink Job --> 분석/알림
                                              |                            |
                                         CDC Connector              TimescaleDB
                                              |                            |
                                         Legacy DB                   Iceberg Lake

핵심 설계 원칙은 다음과 같다.

  1. 이벤트 소싱(Event Sourcing): 모든 상태 변경을 불변 이벤트로 기록
  2. 역압(Backpressure) 전파: 처리 속도에 맞춰 수집 속도를 자동 조절
  3. Exactly-Once 시맨틱스: 장애 시에도 데이터 정확성 보장
  4. 스키마 진화(Schema Evolution): Avro/Protobuf 기반 스키마 레지스트리 활용

Kafka 시장 데이터 수집

Kafka 프로듀서 구현

시장 데이터를 수집하여 Kafka 토픽으로 전송하는 프로듀서를 구현한다. 저지연과 내구성을 동시에 만족시키기 위해 acks, linger.ms, batch.size 등을 세밀하게 튜닝한다.

from confluent_kafka import Producer, KafkaError
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
import json
import time

# Avro 스키마 정의
MARKET_DATA_SCHEMA = """
{
  "type": "record",
  "name": "MarketData",
  "namespace": "com.finance.marketdata",
  "fields": [
    {"name": "symbol", "type": "string"},
    {"name": "price", "type": "double"},
    {"name": "volume", "type": "long"},
    {"name": "bid", "type": "double"},
    {"name": "ask", "type": "double"},
    {"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"},
    {"name": "exchange", "type": "string"},
    {"name": "event_type", "type": {"type": "enum", "name": "EventType",
      "symbols": ["TRADE", "QUOTE", "BBO"]}}
  ]
}
"""

class MarketDataProducer:
    def __init__(self, bootstrap_servers: str, schema_registry_url: str):
        self.schema_registry = SchemaRegistryClient(
            {"url": schema_registry_url}
        )
        self.avro_serializer = AvroSerializer(
            self.schema_registry, MARKET_DATA_SCHEMA
        )

        self.producer = Producer({
            "bootstrap.servers": bootstrap_servers,
            "acks": "all",                # 모든 ISR 복제본 확인 (내구성)
            "linger.ms": 1,               # 1ms 배치 대기 (지연 vs 처리량 균형)
            "batch.size": 16384,          # 16KB 배치 크기
            "compression.type": "lz4",    # LZ4 압축 (속도 우선)
            "max.in.flight.requests.per.connection": 5,
            "enable.idempotence": True,   # 멱등성 보장
            "retries": 3,
            "retry.backoff.ms": 100,
            "partitioner": "murmur2",     # 심볼 기반 파티셔닝
        })

    def send_market_data(self, symbol: str, data: dict):
        """시장 데이터를 Kafka로 전송"""
        try:
            self.producer.produce(
                topic="market-data-raw",
                key=symbol.encode("utf-8"),
                value=self.avro_serializer(
                    data,
                    SerializationContext("market-data-raw", MessageField.VALUE)
                ),
                callback=self._delivery_callback,
                timestamp=int(time.time() * 1000),
            )
            self.producer.poll(0)  # 비동기 콜백 처리
        except KafkaError as e:
            print(f"Kafka produce error: {e}")

    def _delivery_callback(self, err, msg):
        if err:
            print(f"Delivery failed for {msg.key()}: {err}")
        # 프로덕션에서는 메트릭 수집 (Prometheus counter 등)

    def flush(self):
        self.producer.flush(timeout=5)

토픽 설계 전략

금융 데이터의 토픽 설계에서 파티셔닝 전략은 성능에 직접적인 영향을 미친다.

전략파티션 키장점단점
심볼 기반종목 코드동일 종목 순서 보장핫 심볼 편중
거래소 기반거래소 ID거래소별 독립 처리심볼 순서 미보장
해시 기반복합 키 해시균등 분산순서 보장 어려움
타임스탬프 기반시간 버킷시간 순서 처리파티션 수 예측 어려움

프로덕션에서는 심볼 기반 파티셔닝을 기본으로 사용하되, 핫 심볼(예: AAPL, TSLA 등 거래량 상위 종목)에 대해서는 서브 파티셔닝을 적용한다.

스트림 처리 프레임워크 비교

특성Apache FlinkSpark Structured StreamingKafka StreamsApache Storm
처리 모델네이티브 스트림마이크로 배치네이티브 스트림네이티브 스트림
지연 시간밀리초 수준100ms-수초밀리초 수준밀리초 수준
상태 관리RocksDB 기반 대규모 상태Spark 메모리/디스크로컬 RocksDB외부 저장소 필요
Exactly-Once체크포인트 기반 지원체크포인트 기반 지원트랜잭션 기반At-Least-Once
SQL 지원Flink SQL (완전한 SQL)Spark SQL (풍부한 생태계)KSQL (제한적)미지원
배포 모델독립 클러스터Spark 클러스터라이브러리 (JVM 내장)독립 클러스터
이벤트 타임 처리워터마크 기반 고급 지원워터마크 지원제한적 지원제한적
적합 용도저지연 CEP, 금융 실시간 처리대규모 배치+스트림 통합Kafka 네이티브 마이크로서비스레거시 스트림 시스템

금융 도메인에서는 밀리초 수준 지연, 대규모 상태 관리, 이벤트 타임 처리가 필수적이므로 Apache Flink가 가장 적합하다.

Flink SQL을 사용하면 복잡한 스트림 처리 로직을 선언적으로 표현할 수 있다.

-- Kafka 소스 테이블 정의
CREATE TABLE market_data_raw (
    symbol        STRING,
    price         DOUBLE,
    volume        BIGINT,
    bid           DOUBLE,
    ask           DOUBLE,
    event_time    TIMESTAMP(3),
    exchange      STRING,
    event_type    STRING,
    -- 워터마크: 최대 5초 지연 허용
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'market-data-raw',
    'properties.bootstrap.servers' = 'kafka-broker:9092',
    'properties.group.id' = 'flink-market-processor',
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://schema-registry:8081',
    'scan.startup.mode' = 'latest-offset'
);

-- 1분 VWAP (거래량 가중 평균 가격) 집계
CREATE TABLE vwap_1m (
    symbol        STRING,
    window_start  TIMESTAMP(3),
    window_end    TIMESTAMP(3),
    vwap          DOUBLE,
    total_volume  BIGINT,
    trade_count   BIGINT,
    high_price    DOUBLE,
    low_price     DOUBLE,
    PRIMARY KEY (symbol, window_start) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://timescaledb:5432/market',
    'table-name' = 'vwap_1m'
);

INSERT INTO vwap_1m
SELECT
    symbol,
    window_start,
    window_end,
    SUM(price * volume) / SUM(volume) AS vwap,
    SUM(volume) AS total_volume,
    COUNT(*) AS trade_count,
    MAX(price) AS high_price,
    MIN(price) AS low_price
FROM TABLE(
    TUMBLE(TABLE market_data_raw, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
)
WHERE event_type = 'TRADE'
GROUP BY symbol, window_start, window_end;

윈도우 집계와 분석

윈도우 유형별 활용

금융 데이터 분석에서 윈도우 전략은 분석 목적에 따라 다르게 선택한다.

윈도우 유형설명금융 활용 예시
Tumbling Window고정 크기, 비중첩1분/5분/1시간 OHLCV 봉 생성
Sliding Window고정 크기, 중첩 허용이동 평균(MA), 변동성 계산
Session Window활동 기반 동적 크기트레이더 세션별 거래 분석
Global Window커스텀 트리거장 시작/종료 이벤트 기반 집계
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class MovingAverageJob {

    public static DataStream<PriceAlert> computeMovingAverage(
            DataStream<MarketData> marketStream) {

        return marketStream
            .keyBy(MarketData::getSymbol)
            .window(SlidingEventTimeWindows.of(
                Time.minutes(5),   // 윈도우 크기: 5분
                Time.seconds(30)   // 슬라이드 간격: 30초
            ))
            .process(new ProcessWindowFunction<MarketData, PriceAlert,
                     String, TimeWindow>() {
                @Override
                public void process(String symbol,
                        Context context,
                        Iterable<MarketData> elements,
                        Collector<PriceAlert> out) {

                    double sum = 0;
                    long count = 0;
                    double maxPrice = Double.MIN_VALUE;
                    double minPrice = Double.MAX_VALUE;

                    for (MarketData data : elements) {
                        sum += data.getPrice();
                        count++;
                        maxPrice = Math.max(maxPrice, data.getPrice());
                        minPrice = Math.min(minPrice, data.getPrice());
                    }

                    double ma = sum / count;
                    double volatility = (maxPrice - minPrice) / ma;

                    // 변동성이 임계값을 초과하면 알림 생성
                    if (volatility > 0.02) {
                        out.collect(new PriceAlert(
                            symbol,
                            ma,
                            volatility,
                            context.window().getEnd(),
                            "HIGH_VOLATILITY"
                        ));
                    }
                }
            });
    }
}

CDC 연동

Debezium CDC 커넥터 설정

레거시 금융 시스템의 데이터베이스 변경 사항을 실시간으로 캡처하여 Kafka로 전송한다. Debezium은 데이터베이스 트랜잭션 로그를 읽어 행 수준 변경을 이벤트로 변환한다.

# debezium-postgres-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: finance-cdc-connector
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 3
  config:
    # 데이터베이스 연결
    database.hostname: postgres-finance.internal
    database.port: '5432'
    database.user: cdc_reader
    database.password: 'VAULT_SECRET_REF'
    database.dbname: finance_core
    database.server.name: finance-cdc

    # CDC 설정
    plugin.name: pgoutput
    slot.name: flink_cdc_slot
    publication.name: finance_pub

    # 캡처 대상 테이블
    table.include.list: >
      public.orders,
      public.transactions,
      public.positions,
      public.account_balances

    # 스키마 및 변환
    key.converter: io.confluent.connect.avro.AvroConverter
    value.converter: io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url: http://schema-registry:8081
    value.converter.schema.registry.url: http://schema-registry:8081

    # 스냅샷 모드
    snapshot.mode: initial
    snapshot.locking.mode: none

    # 하트비트 (WAL 보존 방지)
    heartbeat.interval.ms: '10000'

    # 토픽 라우팅
    transforms: route
    transforms.route.type: org.apache.kafka.connect.transforms.RegexRouter
    transforms.route.regex: 'finance-cdc.public.(.*)'
    transforms.route.replacement: 'cdc.$1'

    # 장애 복구
    errors.tolerance: all
    errors.deadletterqueue.topic.name: cdc-dlq
    errors.deadletterqueue.context.headers.enable: true

CDC 데이터 처리 주의사항

CDC를 금융 시스템에 적용할 때 반드시 고려해야 할 사항이 있다.

  • 트랜잭션 경계 보존: Debezium의 provide.transaction.metadata 옵션을 활성화하여 트랜잭션 단위로 이벤트를 그룹화
  • 스키마 변경 대응: DDL 변경 시 커넥터 재시작 없이 스키마 진화를 처리하도록 스키마 레지스트리와 호환성 모드 설정
  • WAL 디스크 사용량 모니터링: 커넥터 장애 시 WAL 파일이 누적되어 디스크 고갈 위험이 있으므로, 하트비트와 슬롯 모니터링 필수
  • 순서 보장: 동일 레코드의 변경 순서가 보장되어야 하므로, 테이블 PK를 메시지 키로 사용

이상 거래 탐지

Complex Event Processing(CEP)를 사용하면 시계열 이벤트에서 특정 패턴을 탐지할 수 있다. 다음은 급격한 가격 변동과 비정상적인 거래량을 탐지하는 Flink 잡이다.

# Flink PyFlink를 활용한 이상 거래 탐지
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.table.expressions import col, lit, call

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
env.enable_checkpointing(10000)  # 10초 체크포인트

t_env = StreamTableEnvironment.create(env)

# 이상 거래 탐지 SQL
anomaly_detection_sql = """
WITH price_stats AS (
    SELECT
        symbol,
        price,
        volume,
        event_time,
        -- 지난 5분 이동 평균/표준편차
        AVG(price) OVER w AS price_ma,
        STDDEV_POP(price) OVER w AS price_std,
        AVG(volume) OVER w AS volume_ma
    FROM market_data_raw
    WINDOW w AS (
        PARTITION BY symbol
        ORDER BY event_time
        RANGE BETWEEN INTERVAL '5' MINUTE PRECEDING AND CURRENT ROW
    )
),
anomalies AS (
    SELECT
        symbol,
        price,
        volume,
        event_time,
        price_ma,
        price_std,
        volume_ma,
        -- Z-Score 기반 이상 점수
        ABS(price - price_ma) / NULLIF(price_std, 0) AS price_zscore,
        CAST(volume AS DOUBLE) / NULLIF(volume_ma, 0) AS volume_ratio,
        CASE
            WHEN ABS(price - price_ma) / NULLIF(price_std, 0) > 3.0
                THEN 'PRICE_SPIKE'
            WHEN CAST(volume AS DOUBLE) / NULLIF(volume_ma, 0) > 5.0
                THEN 'VOLUME_SURGE'
            WHEN ABS(price - price_ma) / NULLIF(price_std, 0) > 2.0
                AND CAST(volume AS DOUBLE) / NULLIF(volume_ma, 0) > 3.0
                THEN 'COMBINED_ANOMALY'
            ELSE 'NORMAL'
        END AS anomaly_type
    FROM price_stats
)
SELECT * FROM anomalies
WHERE anomaly_type <> 'NORMAL'
"""

# 알림 싱크 테이블 (Kafka 토픽으로 전송)
t_env.execute_sql("""
CREATE TABLE anomaly_alerts (
    symbol        STRING,
    price         DOUBLE,
    volume        BIGINT,
    event_time    TIMESTAMP(3),
    anomaly_type  STRING,
    price_zscore  DOUBLE,
    volume_ratio  DOUBLE
) WITH (
    'connector' = 'kafka',
    'topic' = 'anomaly-alerts',
    'properties.bootstrap.servers' = 'kafka-broker:9092',
    'format' = 'json'
)
""")

이상 탐지 임계값 기준

이상 유형탐지 조건심각도대응 조치
급격한 가격 변동(Price Spike)Z-Score 3.0 초과Critical즉시 알림, 자동 거래 중단
거래량 급증(Volume Surge)평균 대비 5배 초과Warning모니터링 강화, 담당자 알림
복합 이상(Combined)가격 Z 2.0 + 거래량 3배High포지션 리스크 재평가
스프레드 확대(Spread Widening)Bid-Ask 스프레드 2배 초과Warning유동성 모니터링

저지연 최적화

메시지 브로커 비교 (금융 관점)

특성Apache KafkaSolace PubSub+TIBCO EMS29West (Informatica)
지연 시간2-10ms마이크로초 수준밀리초 수준마이크로초 수준
처리량초당 수백만 메시지초당 수백만 메시지초당 수만 메시지초당 수백만 메시지
내구성디스크 기반 로그메모리+디스크디스크 기반메모리 기반
프로토콜자체 프로토콜, RESTAMQP, MQTT, RESTJMS, AMQP멀티캐스트 UDP
비용오픈소스상용 라이선스상용 라이선스상용 라이선스
적합 용도이벤트 소싱, 범용 스트리밍하이브리드 클라우드 금융레거시 금융 통합초저지연 트레이딩

Kafka 저지연 튜닝 체크리스트

  1. OS 레벨: 페이지 캐시 최적화, vm.swappiness=1, 전용 디스크 (NVMe SSD)
  2. JVM 레벨: G1GC 또는 ZGC 사용, 힙 사이즈 6-8GB, GC 로그 모니터링
  3. Kafka 브로커: num.io.threads 증가, log.flush.interval.messages=1 (내구성 우선 시)
  4. 네트워크: TCP 소켓 버퍼 크기 조정 (socket.send.buffer.bytes, socket.receive.buffer.bytes)
  5. 프로듀서: linger.ms=0-1, batch.size 적정화, compression.type=lz4
  6. 컨슈머: fetch.min.bytes=1, fetch.max.wait.ms 최소화
# flink-conf.yaml - 금융 워크로드 최적화
taskmanager.memory.process.size: 8192m
taskmanager.memory.managed.fraction: 0.4
taskmanager.numberOfTaskSlots: 4

# 상태 백엔드 (대규모 상태를 위한 RocksDB)
state.backend: rocksdb
state.backend.rocksdb.memory.managed: true
state.backend.incremental: true

# 체크포인트 설정
execution.checkpointing.interval: 10s
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.min-pause: 5s
execution.checkpointing.timeout: 60s
execution.checkpointing.max-concurrent-checkpoints: 1

# 네트워크 버퍼
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 128mb
taskmanager.network.memory.max: 1gb

# 워터마크 설정
pipeline.auto-watermark-interval: 200ms

# 재시작 전략
restart-strategy: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 1s
restart-strategy.exponential-delay.max-backoff: 60s
restart-strategy.exponential-delay.backoff-multiplier: 2.0

장애 복구와 데이터 정합성

장애 시나리오별 대응 전략

장애 유형영향복구 전략RTO/RPO
Kafka 브로커 장애파티션 리더 변경ISR 자동 리더 선출, min.insync.replicas=2RTO 수초, RPO 0
Flink TaskManager 장애처리 중단체크포인트에서 자동 복구RTO 10-30초, RPO 체크포인트 간격
네트워크 단절데이터 유실 위험프로듀서 재시도, 컨슈머 오프셋 롤백네트워크 복구 시간 의존
스키마 레지스트리 장애직렬화 실패로컬 캐시 활용, 다중 인스턴스RTO 수초
CDC 슬롯 유실변경 이벤트 누락스냅샷 재실행, 이벤트 보정RTO 수분-수시간

Exactly-Once 시맨틱스의 현실

이론적으로 Exactly-Once 처리를 보장하지만, 실제로는 다음과 같은 엣지 케이스가 존재한다.

  • 프로듀서 타임아웃: 브로커에서 메시지를 받았지만 ACK가 프로듀서에 도달하지 못하면, 프로듀서가 재전송하여 중복이 발생할 수 있다. 이를 방지하기 위해 enable.idempotence=true를 설정한다.
  • Flink 체크포인트 실패: 체크포인트가 완료되기 전 장애가 발생하면, 마지막 성공한 체크포인트로 롤백된다. 이 경우 체크포인트 간격만큼의 데이터가 재처리된다.
  • 백프레셔 누적: 다운스트림 처리 속도가 느리면 Kafka 컨슈머의 lag이 증가하고, 극단적인 경우 메모리 부족으로 잡이 실패할 수 있다. Flink의 크레딧 기반 흐름 제어가 이를 완화하지만, 근본적으로는 처리 용량을 확보해야 한다.

데이터 손실 방지 패턴

[Producer]                [Kafka]               [Flink]
    |  -- produce(msg) -->   |                      |
    |  <-- ack (success) --  |                      |
    |                        |  -- consume(msg) -->  |
    |                        |                  [process]
    |                        |                  [checkpoint]
    |                        |  <-- commit offset -- |
    |                        |                      |
    |   === 브로커 장애 ===   |                      |
    |  -- produce(msg) -->   X (실패)               |
    |  -- retry -->          | (새 리더)             |
    |  <-- ack (success) --  |                      |

핵심은 프로듀서의 멱등성 + Kafka의 ISR 복제 + Flink의 체크포인트가 삼중으로 데이터 정합성을 보장하는 것이다.

운영 체크리스트

배포 전 점검

  • Kafka 클러스터 최소 3 브로커, replication.factor=3, min.insync.replicas=2
  • Flink 체크포인트 저장소를 S3/HDFS 등 외부 스토리지로 설정
  • 스키마 레지스트리 호환성 모드: BACKWARD (기본) 또는 FULL
  • 데드 레터 큐(DLQ) 토픽 생성 및 모니터링 설정
  • 네트워크 지연 측정: 프로듀서-브로커, 브로커-컨슈머 간 RTT 측정

모니터링 지표

지표설명경고 임계값
Consumer Lag컨슈머가 프로듀서 대비 뒤처진 메시지 수10,000 이상
End-to-End Latency프로듀서 전송부터 Flink 처리 완료까지100ms 초과
Checkpoint DurationFlink 체크포인트 소요 시간체크포인트 간격의 50% 초과
Kafka ISR ShrinkISR에서 빠진 복제본 수1 이상
GC Pause TimeJVM GC 멈춤 시간200ms 초과
Backpressure RatioFlink 태스크의 역압 비율0.5 초과

부하 테스트

프로덕션 배포 전, 예상 피크 트래픽의 2-3배 부하로 테스트해야 한다. 금융 시장에서 블랙 스완 이벤트 발생 시 평소 대비 10배 이상의 트래픽이 발생할 수 있으므로, 충분한 여유 용량을 확보해야 한다.

참고자료

Building Real-time Financial Data Pipelines: A Practical Guide to Kafka and Flink Streaming Architecture

Real-time Financial Data Pipeline

Introduction

In financial markets, speed of data is a competitive advantage. Hundreds of thousands of events per second -- stock quotes, FX rates, bond yields, derivative prices -- must be ingested and processed within milliseconds to inform trading decisions. Traditional batch-oriented ETL pipelines simply cannot meet these real-time requirements.

Apache Kafka and Apache Flink form the backbone of modern real-time financial data pipelines. Kafka serves as a high-throughput, durable message broker handling market data ingestion and delivery, while Flink acts as a stateful stream processing engine performing Complex Event Processing (CEP), windowed aggregation, and anomaly detection.

This guide covers the architecture design, Kafka producer implementation, Flink stream processing, CDC integration, anomaly detection, low-latency optimization, and failure recovery strategies for real-time financial data pipelines, complete with production-ready code examples.

Disclaimer: The code examples in this article are for educational purposes. Production deployments require additional considerations for security, regulatory compliance, and performance tuning.

Financial Data Pipeline Architecture

Architecture Overview

A real-time financial data pipeline follows a layered architecture.

LayerComponentsRoleLatency Target
IngestionKafka Producer, FIX GatewayMarket data and order event collectionSub-1ms
MessagingApache KafkaEvent buffering, routing, durability2-5ms
ProcessingApache FlinkStream processing, windowed aggregation, CEP10-50ms
StorageTimescaleDB, Apache IcebergTime-series storage, analytics data lakeSub-100ms
ServingRedis, gRPC APIReal-time dashboards, alert servicesSub-5ms

Data Flow

Exchange/Data Vendor --> Kafka Producer --> Kafka Cluster --> Flink Job --> Analytics/Alerts
                                                |                              |
                                           CDC Connector                  TimescaleDB
                                                |                              |
                                           Legacy DB                     Iceberg Lake

The core design principles are as follows.

  1. Event Sourcing: Record all state changes as immutable events
  2. Backpressure Propagation: Automatically throttle ingestion rate to match processing speed
  3. Exactly-Once Semantics: Guarantee data correctness even during failures
  4. Schema Evolution: Leverage Avro/Protobuf with a schema registry

Kafka Market Data Ingestion

Kafka Producer Implementation

The following producer ingests market data and publishes it to Kafka topics. Key configuration parameters like acks, linger.ms, and batch.size are tuned to balance low latency with durability.

from confluent_kafka import Producer, KafkaError
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
import json
import time

# Avro schema definition
MARKET_DATA_SCHEMA = """
{
  "type": "record",
  "name": "MarketData",
  "namespace": "com.finance.marketdata",
  "fields": [
    {"name": "symbol", "type": "string"},
    {"name": "price", "type": "double"},
    {"name": "volume", "type": "long"},
    {"name": "bid", "type": "double"},
    {"name": "ask", "type": "double"},
    {"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"},
    {"name": "exchange", "type": "string"},
    {"name": "event_type", "type": {"type": "enum", "name": "EventType",
      "symbols": ["TRADE", "QUOTE", "BBO"]}}
  ]
}
"""

class MarketDataProducer:
    def __init__(self, bootstrap_servers: str, schema_registry_url: str):
        self.schema_registry = SchemaRegistryClient(
            {"url": schema_registry_url}
        )
        self.avro_serializer = AvroSerializer(
            self.schema_registry, MARKET_DATA_SCHEMA
        )

        self.producer = Producer({
            "bootstrap.servers": bootstrap_servers,
            "acks": "all",                # Wait for all ISR replicas (durability)
            "linger.ms": 1,               # 1ms batch wait (latency vs throughput)
            "batch.size": 16384,          # 16KB batch size
            "compression.type": "lz4",    # LZ4 compression (speed priority)
            "max.in.flight.requests.per.connection": 5,
            "enable.idempotence": True,   # Idempotent producer
            "retries": 3,
            "retry.backoff.ms": 100,
            "partitioner": "murmur2",     # Symbol-based partitioning
        })

    def send_market_data(self, symbol: str, data: dict):
        """Send market data to Kafka"""
        try:
            self.producer.produce(
                topic="market-data-raw",
                key=symbol.encode("utf-8"),
                value=self.avro_serializer(
                    data,
                    SerializationContext("market-data-raw", MessageField.VALUE)
                ),
                callback=self._delivery_callback,
                timestamp=int(time.time() * 1000),
            )
            self.producer.poll(0)  # Async callback processing
        except KafkaError as e:
            print(f"Kafka produce error: {e}")

    def _delivery_callback(self, err, msg):
        if err:
            print(f"Delivery failed for {msg.key()}: {err}")
        # In production: collect metrics (Prometheus counters, etc.)

    def flush(self):
        self.producer.flush(timeout=5)

Topic Design Strategies

Partitioning strategy has a direct impact on performance for financial data topics.

StrategyPartition KeyAdvantagesDisadvantages
Symbol-basedTicker symbolOrdering guarantee per symbolHot symbol skew
Exchange-basedExchange IDIndependent processing per exchangeNo symbol ordering
Hash-basedComposite key hashEven distributionOrdering not guaranteed
Timestamp-basedTime bucketTemporal orderingUnpredictable partition count

In production, symbol-based partitioning is the default, with sub-partitioning applied to hot symbols (e.g., high-volume tickers like AAPL and TSLA).

Stream Processing Framework Comparison

FeatureApache FlinkSpark Structured StreamingKafka StreamsApache Storm
Processing ModelNative streamingMicro-batchNative streamingNative streaming
LatencyMillisecond-level100ms to secondsMillisecond-levelMillisecond-level
State ManagementRocksDB-based large stateSpark memory/diskLocal RocksDBExternal store required
Exactly-OnceCheckpoint-basedCheckpoint-basedTransaction-basedAt-Least-Once
SQL SupportFlink SQL (full SQL)Spark SQL (rich ecosystem)KSQL (limited)Not supported
Deployment ModelStandalone clusterSpark clusterLibrary (JVM-embedded)Standalone cluster
Event Time ProcessingAdvanced watermark supportWatermark supportLimited supportLimited
Best ForLow-latency CEP, financial real-timeLarge-scale batch+stream unifiedKafka-native microservicesLegacy stream systems

For financial workloads requiring millisecond latency, large-scale state management, and event time processing, Apache Flink is the strongest choice.

Flink SQL enables declarative expression of complex stream processing logic.

-- Kafka source table definition
CREATE TABLE market_data_raw (
    symbol        STRING,
    price         DOUBLE,
    volume        BIGINT,
    bid           DOUBLE,
    ask           DOUBLE,
    event_time    TIMESTAMP(3),
    exchange      STRING,
    event_type    STRING,
    -- Watermark: allow up to 5 seconds of late data
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'market-data-raw',
    'properties.bootstrap.servers' = 'kafka-broker:9092',
    'properties.group.id' = 'flink-market-processor',
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://schema-registry:8081',
    'scan.startup.mode' = 'latest-offset'
);

-- 1-minute VWAP (Volume Weighted Average Price) aggregation
CREATE TABLE vwap_1m (
    symbol        STRING,
    window_start  TIMESTAMP(3),
    window_end    TIMESTAMP(3),
    vwap          DOUBLE,
    total_volume  BIGINT,
    trade_count   BIGINT,
    high_price    DOUBLE,
    low_price     DOUBLE,
    PRIMARY KEY (symbol, window_start) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://timescaledb:5432/market',
    'table-name' = 'vwap_1m'
);

INSERT INTO vwap_1m
SELECT
    symbol,
    window_start,
    window_end,
    SUM(price * volume) / SUM(volume) AS vwap,
    SUM(volume) AS total_volume,
    COUNT(*) AS trade_count,
    MAX(price) AS high_price,
    MIN(price) AS low_price
FROM TABLE(
    TUMBLE(TABLE market_data_raw, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
)
WHERE event_type = 'TRADE'
GROUP BY symbol, window_start, window_end;

Windowed Aggregation and Analytics

Window Types and Their Use Cases

Different window strategies serve different analytical purposes in financial data processing.

Window TypeDescriptionFinancial Use Case
Tumbling WindowFixed-size, non-overlapping1m/5m/1h OHLCV candlestick generation
Sliding WindowFixed-size, overlappingMoving averages (MA), volatility calculation
Session WindowActivity-based, dynamic sizePer-trader session analysis
Global WindowCustom triggerMarket open/close event-based aggregation
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class MovingAverageJob {

    public static DataStream<PriceAlert> computeMovingAverage(
            DataStream<MarketData> marketStream) {

        return marketStream
            .keyBy(MarketData::getSymbol)
            .window(SlidingEventTimeWindows.of(
                Time.minutes(5),   // Window size: 5 minutes
                Time.seconds(30)   // Slide interval: 30 seconds
            ))
            .process(new ProcessWindowFunction<MarketData, PriceAlert,
                     String, TimeWindow>() {
                @Override
                public void process(String symbol,
                        Context context,
                        Iterable<MarketData> elements,
                        Collector<PriceAlert> out) {

                    double sum = 0;
                    long count = 0;
                    double maxPrice = Double.MIN_VALUE;
                    double minPrice = Double.MAX_VALUE;

                    for (MarketData data : elements) {
                        sum += data.getPrice();
                        count++;
                        maxPrice = Math.max(maxPrice, data.getPrice());
                        minPrice = Math.min(minPrice, data.getPrice());
                    }

                    double ma = sum / count;
                    double volatility = (maxPrice - minPrice) / ma;

                    // Generate alert when volatility exceeds threshold
                    if (volatility > 0.02) {
                        out.collect(new PriceAlert(
                            symbol,
                            ma,
                            volatility,
                            context.window().getEnd(),
                            "HIGH_VOLATILITY"
                        ));
                    }
                }
            });
    }
}

CDC Integration

Debezium CDC Connector Configuration

To capture real-time changes from legacy financial databases and stream them to Kafka, Debezium reads database transaction logs and converts row-level changes into events.

# debezium-postgres-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: finance-cdc-connector
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 3
  config:
    # Database connection
    database.hostname: postgres-finance.internal
    database.port: '5432'
    database.user: cdc_reader
    database.password: 'VAULT_SECRET_REF'
    database.dbname: finance_core
    database.server.name: finance-cdc

    # CDC configuration
    plugin.name: pgoutput
    slot.name: flink_cdc_slot
    publication.name: finance_pub

    # Target tables
    table.include.list: >
      public.orders,
      public.transactions,
      public.positions,
      public.account_balances

    # Schema and transforms
    key.converter: io.confluent.connect.avro.AvroConverter
    value.converter: io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url: http://schema-registry:8081
    value.converter.schema.registry.url: http://schema-registry:8081

    # Snapshot mode
    snapshot.mode: initial
    snapshot.locking.mode: none

    # Heartbeat (prevent WAL retention)
    heartbeat.interval.ms: '10000'

    # Topic routing
    transforms: route
    transforms.route.type: org.apache.kafka.connect.transforms.RegexRouter
    transforms.route.regex: 'finance-cdc.public.(.*)'
    transforms.route.replacement: 'cdc.$1'

    # Error handling
    errors.tolerance: all
    errors.deadletterqueue.topic.name: cdc-dlq
    errors.deadletterqueue.context.headers.enable: true

CDC Considerations for Financial Systems

Critical factors when applying CDC to financial systems include the following.

  • Transaction boundary preservation: Enable Debezium's provide.transaction.metadata to group events by transaction
  • Schema change handling: Configure schema registry compatibility modes to handle DDL changes without connector restarts
  • WAL disk usage monitoring: Connector failures cause WAL file accumulation that can exhaust disk space; heartbeat and slot monitoring are essential
  • Ordering guarantees: Use the table primary key as the message key to ensure change ordering for the same record

Anomaly Detection

Complex Event Processing (CEP) enables detection of specific patterns in time-series events. The following Flink job detects sudden price movements and abnormal trading volumes.

# Anomaly detection using PyFlink
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.table.expressions import col, lit, call

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
env.enable_checkpointing(10000)  # 10-second checkpoint interval

t_env = StreamTableEnvironment.create(env)

# Anomaly detection SQL
anomaly_detection_sql = """
WITH price_stats AS (
    SELECT
        symbol,
        price,
        volume,
        event_time,
        -- 5-minute moving average and standard deviation
        AVG(price) OVER w AS price_ma,
        STDDEV_POP(price) OVER w AS price_std,
        AVG(volume) OVER w AS volume_ma
    FROM market_data_raw
    WINDOW w AS (
        PARTITION BY symbol
        ORDER BY event_time
        RANGE BETWEEN INTERVAL '5' MINUTE PRECEDING AND CURRENT ROW
    )
),
anomalies AS (
    SELECT
        symbol,
        price,
        volume,
        event_time,
        price_ma,
        price_std,
        volume_ma,
        -- Z-Score based anomaly scoring
        ABS(price - price_ma) / NULLIF(price_std, 0) AS price_zscore,
        CAST(volume AS DOUBLE) / NULLIF(volume_ma, 0) AS volume_ratio,
        CASE
            WHEN ABS(price - price_ma) / NULLIF(price_std, 0) > 3.0
                THEN 'PRICE_SPIKE'
            WHEN CAST(volume AS DOUBLE) / NULLIF(volume_ma, 0) > 5.0
                THEN 'VOLUME_SURGE'
            WHEN ABS(price - price_ma) / NULLIF(price_std, 0) > 2.0
                AND CAST(volume AS DOUBLE) / NULLIF(volume_ma, 0) > 3.0
                THEN 'COMBINED_ANOMALY'
            ELSE 'NORMAL'
        END AS anomaly_type
    FROM price_stats
)
SELECT * FROM anomalies
WHERE anomaly_type <> 'NORMAL'
"""

# Alert sink table (published to Kafka topic)
t_env.execute_sql("""
CREATE TABLE anomaly_alerts (
    symbol        STRING,
    price         DOUBLE,
    volume        BIGINT,
    event_time    TIMESTAMP(3),
    anomaly_type  STRING,
    price_zscore  DOUBLE,
    volume_ratio  DOUBLE
) WITH (
    'connector' = 'kafka',
    'topic' = 'anomaly-alerts',
    'properties.bootstrap.servers' = 'kafka-broker:9092',
    'format' = 'json'
)
""")

Anomaly Detection Thresholds

Anomaly TypeDetection ConditionSeverityResponse Action
Price SpikeZ-Score exceeds 3.0CriticalImmediate alert, automatic trade halt
Volume SurgeOver 5x average volumeWarningEnhanced monitoring, notify trader
Combined AnomalyPrice Z 2.0 + Volume 3xHighPosition risk reassessment
Spread WideningBid-Ask spread exceeds 2xWarningLiquidity monitoring

Low-Latency Optimization

Message Broker Comparison for Finance

FeatureApache KafkaSolace PubSub+TIBCO EMS29West (Informatica)
Latency2-10msMicrosecond-levelMillisecond-levelMicrosecond-level
ThroughputMillions msg/secMillions msg/secTens of thousands msg/secMillions msg/sec
DurabilityDisk-based logMemory + DiskDisk-basedMemory-based
ProtocolNative, RESTAMQP, MQTT, RESTJMS, AMQPMulticast UDP
CostOpen sourceCommercial licenseCommercial licenseCommercial license
Best ForEvent sourcing, general streamingHybrid cloud financeLegacy financial integrationUltra-low-latency trading

Kafka Low-Latency Tuning Checklist

  1. OS Level: Page cache optimization, vm.swappiness=1, dedicated disks (NVMe SSD)
  2. JVM Level: Use G1GC or ZGC, heap size 6-8GB, GC log monitoring
  3. Kafka Broker: Increase num.io.threads, set log.flush.interval.messages=1 for durability
  4. Network: Tune TCP socket buffer sizes (socket.send.buffer.bytes, socket.receive.buffer.bytes)
  5. Producer: linger.ms=0-1, optimize batch.size, compression.type=lz4
  6. Consumer: fetch.min.bytes=1, minimize fetch.max.wait.ms
# flink-conf.yaml - Financial workload optimization
taskmanager.memory.process.size: 8192m
taskmanager.memory.managed.fraction: 0.4
taskmanager.numberOfTaskSlots: 4

# State backend (RocksDB for large state)
state.backend: rocksdb
state.backend.rocksdb.memory.managed: true
state.backend.incremental: true

# Checkpoint configuration
execution.checkpointing.interval: 10s
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.min-pause: 5s
execution.checkpointing.timeout: 60s
execution.checkpointing.max-concurrent-checkpoints: 1

# Network buffers
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 128mb
taskmanager.network.memory.max: 1gb

# Watermark configuration
pipeline.auto-watermark-interval: 200ms

# Restart strategy
restart-strategy: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 1s
restart-strategy.exponential-delay.max-backoff: 60s
restart-strategy.exponential-delay.backoff-multiplier: 2.0

Failure Recovery and Data Consistency

Recovery Strategies by Failure Scenario

Failure TypeImpactRecovery StrategyRTO/RPO
Kafka Broker FailurePartition leader changeAutomatic ISR leader election, min.insync.replicas=2RTO: seconds, RPO: 0
Flink TaskManager FailureProcessing interruptionAutomatic recovery from checkpointRTO: 10-30s, RPO: checkpoint interval
Network PartitionData loss riskProducer retries, consumer offset rollbackDepends on network recovery
Schema Registry FailureSerialization failureLocal cache, multiple instancesRTO: seconds
CDC Slot LossMissing change eventsSnapshot re-execution, event reconciliationRTO: minutes to hours

The Reality of Exactly-Once Semantics

While exactly-once processing is theoretically guaranteed, the following edge cases exist in practice.

  • Producer timeout: If the broker receives a message but the ACK fails to reach the producer, the producer retries and may cause duplicates. Setting enable.idempotence=true prevents this.
  • Flink checkpoint failure: If a failure occurs before a checkpoint completes, the system rolls back to the last successful checkpoint. Data within the checkpoint interval gets reprocessed.
  • Backpressure accumulation: When downstream processing is slow, Kafka consumer lag increases. In extreme cases, memory exhaustion can crash the job. Flink's credit-based flow control mitigates this, but fundamentally, sufficient processing capacity must be provisioned.

Data Loss Prevention Pattern

[Producer]                [Kafka]               [Flink]
    |  -- produce(msg) -->   |                      |
    |  <-- ack (success) --  |                      |
    |                        |  -- consume(msg) -->  |
    |                        |                  [process]
    |                        |                  [checkpoint]
    |                        |  <-- commit offset -- |
    |                        |                      |
    |   === Broker Failure ===                      |
    |  -- produce(msg) -->   X (fail)               |
    |  -- retry -->          | (new leader)         |
    |  <-- ack (success) --  |                      |

The key insight is that producer idempotency + Kafka ISR replication + Flink checkpointing provide triple-layered data consistency guarantees.

Operations Checklist

Pre-deployment Verification

  • Kafka cluster: minimum 3 brokers, replication.factor=3, min.insync.replicas=2
  • Flink checkpoint storage on external storage (S3/HDFS)
  • Schema registry compatibility mode: BACKWARD (default) or FULL
  • Dead Letter Queue (DLQ) topic created with monitoring configured
  • Network latency measurement: producer-to-broker and broker-to-consumer RTT

Monitoring Metrics

MetricDescriptionAlert Threshold
Consumer LagMessages behind producerOver 10,000
End-to-End LatencyProducer send to Flink processing completeOver 100ms
Checkpoint DurationTime for Flink checkpoint completionOver 50% of checkpoint interval
Kafka ISR ShrinkReplicas dropped from ISR1 or more
GC Pause TimeJVM garbage collection pauseOver 200ms
Backpressure RatioFlink task backpressure ratioOver 0.5

Load Testing

Before production deployment, test with 2-3x expected peak traffic. During black swan events in financial markets, traffic can spike 10x or more above normal, so sufficient capacity headroom is essential.

References