Skip to content
Published on

リアルタイム金融データパイプライン構築:Kafka、Flinkストリーミングアーキテクチャ実践ガイド

Authors
  • Name
    Twitter
リアルタイム金融データパイプライン

はじめに

金融市場ではデータの速度がそのまま競争力となる。株式気配値、FX為替レート、債券利回り、デリバティブ価格など、毎秒数十万件のイベントが発生し、これをミリ秒単位で収集・処理して意思決定に反映する必要がある。バッチ処理ベースの従来型ETLパイプラインでは、このようなリアルタイム要件を満たすことができない。

Apache KafkaとApache Flinkは、リアルタイム金融データパイプラインの中核を成すコンポーネントである。Kafkaは高スループット・耐久性のあるメッセージブローカーとして市場データの収集と配信を担い、Flinkはステートフルストリーム処理エンジンとして複合イベント処理(CEP)、ウィンドウ集計、異常検知を実行する。

本記事では、リアルタイム金融データパイプラインのアーキテクチャ設計からKafkaプロデューサー実装、Flinkストリーム処理、CDC連携、異常取引検知、低遅延最適化、障害復旧戦略まで、実践的なコードとともに解説する。

注意: 本記事のコード例は教育目的であり、本番環境ではセキュリティ、規制準拠、パフォーマンスチューニング等の追加考慮が必要です。

金融データパイプラインアーキテクチャ

全体アーキテクチャ概要

リアルタイム金融データパイプラインは以下のレイヤー構造を持つ。

レイヤー構成要素役割遅延目標
収集レイヤーKafka Producer、FIX Gateway市場データ・注文イベント収集1ms以下
メッセージングレイヤーApache Kafkaイベントバッファリング、ルーティング、耐久性保証2-5ms
処理レイヤーApache Flinkストリーム処理、ウィンドウ集計、CEP10-50ms
ストレージレイヤーTimescaleDB、Apache Iceberg時系列ストレージ、分析用データレイク100ms以下
サービングレイヤーRedis、gRPC APIリアルタイムダッシュボード、アラートサービス5ms以下

データフロー

取引所/データベンダー --> Kafka Producer --> Kafka Cluster --> Flink Job --> 分析/アラート
                                               |                             |
                                          CDC Connector                 TimescaleDB
                                               |                             |
                                          レガシーDB                    Iceberg Lake

コア設計原則は以下の通りである。

  1. イベントソーシング(Event Sourcing): すべての状態変更を不変のイベントとして記録
  2. バックプレッシャー伝播: 処理速度に合わせて収集速度を自動調整
  3. Exactly-Onceセマンティクス: 障害時でもデータの正確性を保証
  4. スキーマ進化(Schema Evolution): Avro/Protobufベースのスキーマレジストリを活用

Kafka市場データ収集

Kafkaプロデューサー実装

市場データを収集してKafkaトピックへ送信するプロデューサーを実装する。低遅延と耐久性を両立させるため、ackslinger.msbatch.size等を細かくチューニングする。

from confluent_kafka import Producer, KafkaError
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
import json
import time

# Avroスキーマ定義
MARKET_DATA_SCHEMA = """
{
  "type": "record",
  "name": "MarketData",
  "namespace": "com.finance.marketdata",
  "fields": [
    {"name": "symbol", "type": "string"},
    {"name": "price", "type": "double"},
    {"name": "volume", "type": "long"},
    {"name": "bid", "type": "double"},
    {"name": "ask", "type": "double"},
    {"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"},
    {"name": "exchange", "type": "string"},
    {"name": "event_type", "type": {"type": "enum", "name": "EventType",
      "symbols": ["TRADE", "QUOTE", "BBO"]}}
  ]
}
"""

class MarketDataProducer:
    def __init__(self, bootstrap_servers: str, schema_registry_url: str):
        self.schema_registry = SchemaRegistryClient(
            {"url": schema_registry_url}
        )
        self.avro_serializer = AvroSerializer(
            self.schema_registry, MARKET_DATA_SCHEMA
        )

        self.producer = Producer({
            "bootstrap.servers": bootstrap_servers,
            "acks": "all",                # 全ISRレプリカ確認(耐久性)
            "linger.ms": 1,               # 1msバッチ待機(遅延vsスループット)
            "batch.size": 16384,          # 16KBバッチサイズ
            "compression.type": "lz4",    # LZ4圧縮(速度優先)
            "max.in.flight.requests.per.connection": 5,
            "enable.idempotence": True,   # 冪等性保証
            "retries": 3,
            "retry.backoff.ms": 100,
            "partitioner": "murmur2",     # シンボルベースパーティショニング
        })

    def send_market_data(self, symbol: str, data: dict):
        """市場データをKafkaへ送信"""
        try:
            self.producer.produce(
                topic="market-data-raw",
                key=symbol.encode("utf-8"),
                value=self.avro_serializer(
                    data,
                    SerializationContext("market-data-raw", MessageField.VALUE)
                ),
                callback=self._delivery_callback,
                timestamp=int(time.time() * 1000),
            )
            self.producer.poll(0)  # 非同期コールバック処理
        except KafkaError as e:
            print(f"Kafka produce error: {e}")

    def _delivery_callback(self, err, msg):
        if err:
            print(f"Delivery failed for {msg.key()}: {err}")
        # 本番ではメトリクス収集(Prometheusカウンター等)

    def flush(self):
        self.producer.flush(timeout=5)

トピック設計戦略

金融データのトピック設計において、パーティショニング戦略はパフォーマンスに直接影響する。

戦略パーティションキーメリットデメリット
シンボルベース銘柄コード同一銘柄の順序保証ホットシンボル偏り
取引所ベース取引所ID取引所別独立処理シンボル順序非保証
ハッシュベース複合キーハッシュ均等分散順序保証困難
タイムスタンプベース時間バケット時間順序処理パーティション数予測困難

本番環境ではシンボルベースパーティショニングをデフォルトとし、ホットシンボル(例:AAPL、TSLA等の取引量上位銘柄)に対してはサブパーティショニングを適用する。

Flinkストリーム処理

ストリーム処理フレームワーク比較

特性Apache FlinkSpark Structured StreamingKafka StreamsApache Storm
処理モデルネイティブストリームマイクロバッチネイティブストリームネイティブストリーム
遅延ミリ秒レベル100ms-数秒ミリ秒レベルミリ秒レベル
状態管理RocksDBベース大規模状態Sparkメモリ/ディスクローカルRocksDB外部ストア必要
Exactly-OnceチェックポイントベースチェックポイントベーストランザクションベースAt-Least-Once
SQLサポートFlink SQL(完全なSQL)Spark SQL(豊富なエコシステム)KSQL(限定的)非対応
デプロイモデルスタンドアロンクラスタSparkクラスタライブラリ(JVM組み込み)スタンドアロンクラスタ
イベントタイム処理ウォーターマークベース高度対応ウォーターマーク対応限定的対応限定的
適用用途低遅延CEP、金融リアルタイム処理大規模バッチ+ストリーム統合Kafkaネイティブマイクロサービスレガシーストリームシステム

金融ドメインではミリ秒レベルの遅延、大規模状態管理、イベントタイム処理が必須であるため、Apache Flinkが最も適している。

Flink SQLを使用すると、複雑なストリーム処理ロジックを宣言的に表現できる。

-- Kafkaソーステーブル定義
CREATE TABLE market_data_raw (
    symbol        STRING,
    price         DOUBLE,
    volume        BIGINT,
    bid           DOUBLE,
    ask           DOUBLE,
    event_time    TIMESTAMP(3),
    exchange      STRING,
    event_type    STRING,
    -- ウォーターマーク:最大5秒の遅延を許容
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'market-data-raw',
    'properties.bootstrap.servers' = 'kafka-broker:9092',
    'properties.group.id' = 'flink-market-processor',
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://schema-registry:8081',
    'scan.startup.mode' = 'latest-offset'
);

-- 1分間VWAP(出来高加重平均価格)集計
CREATE TABLE vwap_1m (
    symbol        STRING,
    window_start  TIMESTAMP(3),
    window_end    TIMESTAMP(3),
    vwap          DOUBLE,
    total_volume  BIGINT,
    trade_count   BIGINT,
    high_price    DOUBLE,
    low_price     DOUBLE,
    PRIMARY KEY (symbol, window_start) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://timescaledb:5432/market',
    'table-name' = 'vwap_1m'
);

INSERT INTO vwap_1m
SELECT
    symbol,
    window_start,
    window_end,
    SUM(price * volume) / SUM(volume) AS vwap,
    SUM(volume) AS total_volume,
    COUNT(*) AS trade_count,
    MAX(price) AS high_price,
    MIN(price) AS low_price
FROM TABLE(
    TUMBLE(TABLE market_data_raw, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
)
WHERE event_type = 'TRADE'
GROUP BY symbol, window_start, window_end;

ウィンドウ集計と分析

ウィンドウタイプ別活用

金融データ分析において、ウィンドウ戦略は分析目的に応じて異なる選択が求められる。

ウィンドウタイプ説明金融活用例
タンブリングウィンドウ固定サイズ、非重複1分/5分/1時間OHLCVローソク足生成
スライディングウィンドウ固定サイズ、重複許容移動平均(MA)、ボラティリティ計算
セッションウィンドウアクティビティベース動的サイズトレーダーセッション別取引分析
グローバルウィンドウカスタムトリガー場開始/終了イベントベース集計
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class MovingAverageJob {

    public static DataStream<PriceAlert> computeMovingAverage(
            DataStream<MarketData> marketStream) {

        return marketStream
            .keyBy(MarketData::getSymbol)
            .window(SlidingEventTimeWindows.of(
                Time.minutes(5),   // ウィンドウサイズ:5分
                Time.seconds(30)   // スライド間隔:30秒
            ))
            .process(new ProcessWindowFunction<MarketData, PriceAlert,
                     String, TimeWindow>() {
                @Override
                public void process(String symbol,
                        Context context,
                        Iterable<MarketData> elements,
                        Collector<PriceAlert> out) {

                    double sum = 0;
                    long count = 0;
                    double maxPrice = Double.MIN_VALUE;
                    double minPrice = Double.MAX_VALUE;

                    for (MarketData data : elements) {
                        sum += data.getPrice();
                        count++;
                        maxPrice = Math.max(maxPrice, data.getPrice());
                        minPrice = Math.min(minPrice, data.getPrice());
                    }

                    double ma = sum / count;
                    double volatility = (maxPrice - minPrice) / ma;

                    // ボラティリティが閾値を超えた場合アラート生成
                    if (volatility > 0.02) {
                        out.collect(new PriceAlert(
                            symbol,
                            ma,
                            volatility,
                            context.window().getEnd(),
                            "HIGH_VOLATILITY"
                        ));
                    }
                }
            });
    }
}

CDC連携

Debezium CDCコネクタ設定

レガシー金融システムのデータベース変更をリアルタイムでキャプチャし、Kafkaへ送信する。Debeziumはデータベーストランザクションログを読み取り、行レベルの変更をイベントに変換する。

# debezium-postgres-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: finance-cdc-connector
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 3
  config:
    # データベース接続
    database.hostname: postgres-finance.internal
    database.port: '5432'
    database.user: cdc_reader
    database.password: 'VAULT_SECRET_REF'
    database.dbname: finance_core
    database.server.name: finance-cdc

    # CDC設定
    plugin.name: pgoutput
    slot.name: flink_cdc_slot
    publication.name: finance_pub

    # キャプチャ対象テーブル
    table.include.list: >
      public.orders,
      public.transactions,
      public.positions,
      public.account_balances

    # スキーマと変換
    key.converter: io.confluent.connect.avro.AvroConverter
    value.converter: io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url: http://schema-registry:8081
    value.converter.schema.registry.url: http://schema-registry:8081

    # スナップショットモード
    snapshot.mode: initial
    snapshot.locking.mode: none

    # ハートビート(WAL保持防止)
    heartbeat.interval.ms: '10000'

    # トピックルーティング
    transforms: route
    transforms.route.type: org.apache.kafka.connect.transforms.RegexRouter
    transforms.route.regex: 'finance-cdc.public.(.*)'
    transforms.route.replacement: 'cdc.$1'

    # 障害復旧
    errors.tolerance: all
    errors.deadletterqueue.topic.name: cdc-dlq
    errors.deadletterqueue.context.headers.enable: true

CDC適用時の注意事項

CDCを金融システムに適用する際に必ず考慮すべき事項がある。

  • トランザクション境界の保持: Debeziumのprovide.transaction.metadataオプションを有効にしてトランザクション単位でイベントをグループ化
  • スキーマ変更対応: DDL変更時にコネクタ再起動なしでスキーマ進化を処理できるよう、スキーマレジストリと互換性モードを設定
  • WALディスク使用量監視: コネクタ障害時にWALファイルが蓄積してディスク枯渇のリスクがあるため、ハートビートとスロット監視が必須
  • 順序保証: 同一レコードの変更順序が保証される必要があるため、テーブルPKをメッセージキーとして使用

異常取引検知

複合イベント処理(CEP)を使用すると、時系列イベントから特定のパターンを検知できる。以下は急激な価格変動と異常な取引量を検知するFlinkジョブである。

# PyFlinkを活用した異常取引検知
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.table.expressions import col, lit, call

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
env.enable_checkpointing(10000)  # 10秒チェックポイント

t_env = StreamTableEnvironment.create(env)

# 異常取引検知SQL
anomaly_detection_sql = """
WITH price_stats AS (
    SELECT
        symbol,
        price,
        volume,
        event_time,
        -- 過去5分間の移動平均/標準偏差
        AVG(price) OVER w AS price_ma,
        STDDEV_POP(price) OVER w AS price_std,
        AVG(volume) OVER w AS volume_ma
    FROM market_data_raw
    WINDOW w AS (
        PARTITION BY symbol
        ORDER BY event_time
        RANGE BETWEEN INTERVAL '5' MINUTE PRECEDING AND CURRENT ROW
    )
),
anomalies AS (
    SELECT
        symbol,
        price,
        volume,
        event_time,
        price_ma,
        price_std,
        volume_ma,
        -- Zスコアベースの異常スコア
        ABS(price - price_ma) / NULLIF(price_std, 0) AS price_zscore,
        CAST(volume AS DOUBLE) / NULLIF(volume_ma, 0) AS volume_ratio,
        CASE
            WHEN ABS(price - price_ma) / NULLIF(price_std, 0) > 3.0
                THEN 'PRICE_SPIKE'
            WHEN CAST(volume AS DOUBLE) / NULLIF(volume_ma, 0) > 5.0
                THEN 'VOLUME_SURGE'
            WHEN ABS(price - price_ma) / NULLIF(price_std, 0) > 2.0
                AND CAST(volume AS DOUBLE) / NULLIF(volume_ma, 0) > 3.0
                THEN 'COMBINED_ANOMALY'
            ELSE 'NORMAL'
        END AS anomaly_type
    FROM price_stats
)
SELECT * FROM anomalies
WHERE anomaly_type <> 'NORMAL'
"""

# アラートシンクテーブル(Kafkaトピックへ送信)
t_env.execute_sql("""
CREATE TABLE anomaly_alerts (
    symbol        STRING,
    price         DOUBLE,
    volume        BIGINT,
    event_time    TIMESTAMP(3),
    anomaly_type  STRING,
    price_zscore  DOUBLE,
    volume_ratio  DOUBLE
) WITH (
    'connector' = 'kafka',
    'topic' = 'anomaly-alerts',
    'properties.bootstrap.servers' = 'kafka-broker:9092',
    'format' = 'json'
)
""")

異常検知閾値基準

異常タイプ検知条件重大度対応措置
急激な価格変動(Price Spike)Zスコア3.0超過Critical即時アラート、自動取引停止
取引量急増(Volume Surge)平均比5倍超過Warning監視強化、担当者通知
複合異常(Combined)価格Z 2.0 + 取引量3倍Highポジションリスク再評価
スプレッド拡大(Spread Widening)Bid-Askスプレッド2倍超過Warning流動性モニタリング

低遅延最適化

メッセージブローカー比較(金融観点)

特性Apache KafkaSolace PubSub+TIBCO EMS29West (Informatica)
遅延2-10msマイクロ秒レベルミリ秒レベルマイクロ秒レベル
スループット毎秒数百万メッセージ毎秒数百万メッセージ毎秒数万メッセージ毎秒数百万メッセージ
耐久性ディスクベースログメモリ+ディスクディスクベースメモリベース
プロトコル独自プロトコル、RESTAMQP、MQTT、RESTJMS、AMQPマルチキャストUDP
コストオープンソース商用ライセンス商用ライセンス商用ライセンス
適用用途イベントソーシング、汎用ストリーミングハイブリッドクラウド金融レガシー金融統合超低遅延トレーディング

Kafka低遅延チューニングチェックリスト

  1. OSレベル: ページキャッシュ最適化、vm.swappiness=1、専用ディスク(NVMe SSD)
  2. JVMレベル: G1GCまたはZGC使用、ヒープサイズ6-8GB、GCログ監視
  3. Kafkaブローカー: num.io.threads増加、log.flush.interval.messages=1(耐久性優先時)
  4. ネットワーク: TCPソケットバッファサイズ調整(socket.send.buffer.bytessocket.receive.buffer.bytes
  5. プロデューサー: linger.ms=0-1batch.size最適化、compression.type=lz4
  6. コンシューマー: fetch.min.bytes=1fetch.max.wait.ms最小化

Flinkパフォーマンス最適化

# flink-conf.yaml - 金融ワークロード最適化
taskmanager.memory.process.size: 8192m
taskmanager.memory.managed.fraction: 0.4
taskmanager.numberOfTaskSlots: 4

# 状態バックエンド(大規模状態向けRocksDB)
state.backend: rocksdb
state.backend.rocksdb.memory.managed: true
state.backend.incremental: true

# チェックポイント設定
execution.checkpointing.interval: 10s
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.min-pause: 5s
execution.checkpointing.timeout: 60s
execution.checkpointing.max-concurrent-checkpoints: 1

# ネットワークバッファ
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 128mb
taskmanager.network.memory.max: 1gb

# ウォーターマーク設定
pipeline.auto-watermark-interval: 200ms

# 再起動戦略
restart-strategy: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 1s
restart-strategy.exponential-delay.max-backoff: 60s
restart-strategy.exponential-delay.backoff-multiplier: 2.0

障害復旧とデータ整合性

障害シナリオ別対応戦略

障害タイプ影響復旧戦略RTO/RPO
Kafkaブローカー障害パーティションリーダー変更ISR自動リーダー選出、min.insync.replicas=2RTO数秒、RPO 0
Flink TaskManager障害処理中断チェックポイントから自動復旧RTO 10-30秒、RPOチェックポイント間隔
ネットワーク断絶データ損失リスクプロデューサーリトライ、コンシューマーオフセットロールバックネットワーク復旧時間依存
スキーマレジストリ障害シリアライゼーション失敗ローカルキャッシュ活用、マルチインスタンスRTO数秒
CDCスロット喪失変更イベント欠落スナップショット再実行、イベント補正RTO数分-数時間

Exactly-Onceセマンティクスの現実

理論的にはExactly-Once処理を保証するが、実際には以下のようなエッジケースが存在する。

  • プロデューサータイムアウト: ブローカーでメッセージを受信したがACKがプロデューサーに到達しなかった場合、プロデューサーが再送信して重複が発生する可能性がある。enable.idempotence=trueの設定でこれを防止する。
  • Flinkチェックポイント失敗: チェックポイント完了前に障害が発生すると、最後に成功したチェックポイントへロールバックされる。チェックポイント間隔分のデータが再処理される。
  • バックプレッシャー蓄積: ダウンストリームの処理速度が遅いとKafkaコンシューマーのラグが増加し、極端な場合はメモリ不足でジョブが失敗する可能性がある。Flinkのクレジットベースフロー制御がこれを緩和するが、根本的には処理容量の確保が必要である。

データ損失防止パターン

[Producer]                [Kafka]               [Flink]
    |  -- produce(msg) -->   |                      |
    |  <-- ack (success) --  |                      |
    |                        |  -- consume(msg) -->  |
    |                        |                  [process]
    |                        |                  [checkpoint]
    |                        |  <-- commit offset -- |
    |                        |                      |
    |   === ブローカー障害 ===  |                      |
    |  -- produce(msg) -->   X (失敗)               |
    |  -- retry -->          | (新リーダー)          |
    |  <-- ack (success) --  |                      |

重要なのは、プロデューサーの冪等性 + KafkaのISRレプリケーション + Flinkのチェックポイントが三重でデータ整合性を保証することである。

運用チェックリスト

デプロイ前点検

  • Kafkaクラスタ最低3ブローカー、replication.factor=3min.insync.replicas=2
  • Flinkチェックポイントストレージを外部ストレージ(S3/HDFS)に設定
  • スキーマレジストリ互換性モード:BACKWARD(デフォルト)またはFULL
  • デッドレターキュー(DLQ)トピック作成と監視設定
  • ネットワーク遅延測定:プロデューサー-ブローカー間、ブローカー-コンシューマー間RTT測定

監視指標

指標説明警告閾値
Consumer Lagコンシューマーがプロデューサーより遅れているメッセージ数10,000以上
End-to-End Latencyプロデューサー送信からFlink処理完了まで100ms超過
Checkpoint DurationFlinkチェックポイント所要時間チェックポイント間隔の50%超過
Kafka ISR ShrinkISRから外れたレプリカ数1以上
GC Pause TimeJVM GC停止時間200ms超過
Backpressure RatioFlinkタスクのバックプレッシャー比率0.5超過

負荷テスト

本番デプロイ前に、想定ピークトラフィックの2-3倍の負荷でテストする必要がある。金融市場でブラックスワンイベント発生時には通常の10倍以上のトラフィックが発生する可能性があるため、十分な余裕容量の確保が不可欠である。

参考資料