- Published on
リアルタイム金融データパイプライン構築:Kafka、Flinkストリーミングアーキテクチャ実践ガイド
- Authors
- Name

はじめに
金融市場ではデータの速度がそのまま競争力となる。株式気配値、FX為替レート、債券利回り、デリバティブ価格など、毎秒数十万件のイベントが発生し、これをミリ秒単位で収集・処理して意思決定に反映する必要がある。バッチ処理ベースの従来型ETLパイプラインでは、このようなリアルタイム要件を満たすことができない。
Apache KafkaとApache Flinkは、リアルタイム金融データパイプラインの中核を成すコンポーネントである。Kafkaは高スループット・耐久性のあるメッセージブローカーとして市場データの収集と配信を担い、Flinkはステートフルストリーム処理エンジンとして複合イベント処理(CEP)、ウィンドウ集計、異常検知を実行する。
本記事では、リアルタイム金融データパイプラインのアーキテクチャ設計からKafkaプロデューサー実装、Flinkストリーム処理、CDC連携、異常取引検知、低遅延最適化、障害復旧戦略まで、実践的なコードとともに解説する。
注意: 本記事のコード例は教育目的であり、本番環境ではセキュリティ、規制準拠、パフォーマンスチューニング等の追加考慮が必要です。
金融データパイプラインアーキテクチャ
全体アーキテクチャ概要
リアルタイム金融データパイプラインは以下のレイヤー構造を持つ。
| レイヤー | 構成要素 | 役割 | 遅延目標 |
|---|---|---|---|
| 収集レイヤー | Kafka Producer、FIX Gateway | 市場データ・注文イベント収集 | 1ms以下 |
| メッセージングレイヤー | Apache Kafka | イベントバッファリング、ルーティング、耐久性保証 | 2-5ms |
| 処理レイヤー | Apache Flink | ストリーム処理、ウィンドウ集計、CEP | 10-50ms |
| ストレージレイヤー | TimescaleDB、Apache Iceberg | 時系列ストレージ、分析用データレイク | 100ms以下 |
| サービングレイヤー | Redis、gRPC API | リアルタイムダッシュボード、アラートサービス | 5ms以下 |
データフロー
取引所/データベンダー --> Kafka Producer --> Kafka Cluster --> Flink Job --> 分析/アラート
| |
CDC Connector TimescaleDB
| |
レガシーDB Iceberg Lake
コア設計原則は以下の通りである。
- イベントソーシング(Event Sourcing): すべての状態変更を不変のイベントとして記録
- バックプレッシャー伝播: 処理速度に合わせて収集速度を自動調整
- Exactly-Onceセマンティクス: 障害時でもデータの正確性を保証
- スキーマ進化(Schema Evolution): Avro/Protobufベースのスキーマレジストリを活用
Kafka市場データ収集
Kafkaプロデューサー実装
市場データを収集してKafkaトピックへ送信するプロデューサーを実装する。低遅延と耐久性を両立させるため、acks、linger.ms、batch.size等を細かくチューニングする。
from confluent_kafka import Producer, KafkaError
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
import json
import time
# Avroスキーマ定義
MARKET_DATA_SCHEMA = """
{
"type": "record",
"name": "MarketData",
"namespace": "com.finance.marketdata",
"fields": [
{"name": "symbol", "type": "string"},
{"name": "price", "type": "double"},
{"name": "volume", "type": "long"},
{"name": "bid", "type": "double"},
{"name": "ask", "type": "double"},
{"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"},
{"name": "exchange", "type": "string"},
{"name": "event_type", "type": {"type": "enum", "name": "EventType",
"symbols": ["TRADE", "QUOTE", "BBO"]}}
]
}
"""
class MarketDataProducer:
def __init__(self, bootstrap_servers: str, schema_registry_url: str):
self.schema_registry = SchemaRegistryClient(
{"url": schema_registry_url}
)
self.avro_serializer = AvroSerializer(
self.schema_registry, MARKET_DATA_SCHEMA
)
self.producer = Producer({
"bootstrap.servers": bootstrap_servers,
"acks": "all", # 全ISRレプリカ確認(耐久性)
"linger.ms": 1, # 1msバッチ待機(遅延vsスループット)
"batch.size": 16384, # 16KBバッチサイズ
"compression.type": "lz4", # LZ4圧縮(速度優先)
"max.in.flight.requests.per.connection": 5,
"enable.idempotence": True, # 冪等性保証
"retries": 3,
"retry.backoff.ms": 100,
"partitioner": "murmur2", # シンボルベースパーティショニング
})
def send_market_data(self, symbol: str, data: dict):
"""市場データをKafkaへ送信"""
try:
self.producer.produce(
topic="market-data-raw",
key=symbol.encode("utf-8"),
value=self.avro_serializer(
data,
SerializationContext("market-data-raw", MessageField.VALUE)
),
callback=self._delivery_callback,
timestamp=int(time.time() * 1000),
)
self.producer.poll(0) # 非同期コールバック処理
except KafkaError as e:
print(f"Kafka produce error: {e}")
def _delivery_callback(self, err, msg):
if err:
print(f"Delivery failed for {msg.key()}: {err}")
# 本番ではメトリクス収集(Prometheusカウンター等)
def flush(self):
self.producer.flush(timeout=5)
トピック設計戦略
金融データのトピック設計において、パーティショニング戦略はパフォーマンスに直接影響する。
| 戦略 | パーティションキー | メリット | デメリット |
|---|---|---|---|
| シンボルベース | 銘柄コード | 同一銘柄の順序保証 | ホットシンボル偏り |
| 取引所ベース | 取引所ID | 取引所別独立処理 | シンボル順序非保証 |
| ハッシュベース | 複合キーハッシュ | 均等分散 | 順序保証困難 |
| タイムスタンプベース | 時間バケット | 時間順序処理 | パーティション数予測困難 |
本番環境ではシンボルベースパーティショニングをデフォルトとし、ホットシンボル(例:AAPL、TSLA等の取引量上位銘柄)に対してはサブパーティショニングを適用する。
Flinkストリーム処理
ストリーム処理フレームワーク比較
| 特性 | Apache Flink | Spark Structured Streaming | Kafka Streams | Apache Storm |
|---|---|---|---|---|
| 処理モデル | ネイティブストリーム | マイクロバッチ | ネイティブストリーム | ネイティブストリーム |
| 遅延 | ミリ秒レベル | 100ms-数秒 | ミリ秒レベル | ミリ秒レベル |
| 状態管理 | RocksDBベース大規模状態 | Sparkメモリ/ディスク | ローカルRocksDB | 外部ストア必要 |
| Exactly-Once | チェックポイントベース | チェックポイントベース | トランザクションベース | At-Least-Once |
| SQLサポート | Flink SQL(完全なSQL) | Spark SQL(豊富なエコシステム) | KSQL(限定的) | 非対応 |
| デプロイモデル | スタンドアロンクラスタ | Sparkクラスタ | ライブラリ(JVM組み込み) | スタンドアロンクラスタ |
| イベントタイム処理 | ウォーターマークベース高度対応 | ウォーターマーク対応 | 限定的対応 | 限定的 |
| 適用用途 | 低遅延CEP、金融リアルタイム処理 | 大規模バッチ+ストリーム統合 | Kafkaネイティブマイクロサービス | レガシーストリームシステム |
金融ドメインではミリ秒レベルの遅延、大規模状態管理、イベントタイム処理が必須であるため、Apache Flinkが最も適している。
Flink SQLによるリアルタイム市場データ処理
Flink SQLを使用すると、複雑なストリーム処理ロジックを宣言的に表現できる。
-- Kafkaソーステーブル定義
CREATE TABLE market_data_raw (
symbol STRING,
price DOUBLE,
volume BIGINT,
bid DOUBLE,
ask DOUBLE,
event_time TIMESTAMP(3),
exchange STRING,
event_type STRING,
-- ウォーターマーク:最大5秒の遅延を許容
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'market-data-raw',
'properties.bootstrap.servers' = 'kafka-broker:9092',
'properties.group.id' = 'flink-market-processor',
'format' = 'avro-confluent',
'avro-confluent.url' = 'http://schema-registry:8081',
'scan.startup.mode' = 'latest-offset'
);
-- 1分間VWAP(出来高加重平均価格)集計
CREATE TABLE vwap_1m (
symbol STRING,
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
vwap DOUBLE,
total_volume BIGINT,
trade_count BIGINT,
high_price DOUBLE,
low_price DOUBLE,
PRIMARY KEY (symbol, window_start) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://timescaledb:5432/market',
'table-name' = 'vwap_1m'
);
INSERT INTO vwap_1m
SELECT
symbol,
window_start,
window_end,
SUM(price * volume) / SUM(volume) AS vwap,
SUM(volume) AS total_volume,
COUNT(*) AS trade_count,
MAX(price) AS high_price,
MIN(price) AS low_price
FROM TABLE(
TUMBLE(TABLE market_data_raw, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
)
WHERE event_type = 'TRADE'
GROUP BY symbol, window_start, window_end;
ウィンドウ集計と分析
ウィンドウタイプ別活用
金融データ分析において、ウィンドウ戦略は分析目的に応じて異なる選択が求められる。
| ウィンドウタイプ | 説明 | 金融活用例 |
|---|---|---|
| タンブリングウィンドウ | 固定サイズ、非重複 | 1分/5分/1時間OHLCVローソク足生成 |
| スライディングウィンドウ | 固定サイズ、重複許容 | 移動平均(MA)、ボラティリティ計算 |
| セッションウィンドウ | アクティビティベース動的サイズ | トレーダーセッション別取引分析 |
| グローバルウィンドウ | カスタムトリガー | 場開始/終了イベントベース集計 |
Flink Javaによるスライディングウィンドウ移動平均実装
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class MovingAverageJob {
public static DataStream<PriceAlert> computeMovingAverage(
DataStream<MarketData> marketStream) {
return marketStream
.keyBy(MarketData::getSymbol)
.window(SlidingEventTimeWindows.of(
Time.minutes(5), // ウィンドウサイズ:5分
Time.seconds(30) // スライド間隔:30秒
))
.process(new ProcessWindowFunction<MarketData, PriceAlert,
String, TimeWindow>() {
@Override
public void process(String symbol,
Context context,
Iterable<MarketData> elements,
Collector<PriceAlert> out) {
double sum = 0;
long count = 0;
double maxPrice = Double.MIN_VALUE;
double minPrice = Double.MAX_VALUE;
for (MarketData data : elements) {
sum += data.getPrice();
count++;
maxPrice = Math.max(maxPrice, data.getPrice());
minPrice = Math.min(minPrice, data.getPrice());
}
double ma = sum / count;
double volatility = (maxPrice - minPrice) / ma;
// ボラティリティが閾値を超えた場合アラート生成
if (volatility > 0.02) {
out.collect(new PriceAlert(
symbol,
ma,
volatility,
context.window().getEnd(),
"HIGH_VOLATILITY"
));
}
}
});
}
}
CDC連携
Debezium CDCコネクタ設定
レガシー金融システムのデータベース変更をリアルタイムでキャプチャし、Kafkaへ送信する。Debeziumはデータベーストランザクションログを読み取り、行レベルの変更をイベントに変換する。
# debezium-postgres-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: finance-cdc-connector
labels:
strimzi.io/cluster: kafka-connect-cluster
spec:
class: io.debezium.connector.postgresql.PostgresConnector
tasksMax: 3
config:
# データベース接続
database.hostname: postgres-finance.internal
database.port: '5432'
database.user: cdc_reader
database.password: 'VAULT_SECRET_REF'
database.dbname: finance_core
database.server.name: finance-cdc
# CDC設定
plugin.name: pgoutput
slot.name: flink_cdc_slot
publication.name: finance_pub
# キャプチャ対象テーブル
table.include.list: >
public.orders,
public.transactions,
public.positions,
public.account_balances
# スキーマと変換
key.converter: io.confluent.connect.avro.AvroConverter
value.converter: io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url: http://schema-registry:8081
value.converter.schema.registry.url: http://schema-registry:8081
# スナップショットモード
snapshot.mode: initial
snapshot.locking.mode: none
# ハートビート(WAL保持防止)
heartbeat.interval.ms: '10000'
# トピックルーティング
transforms: route
transforms.route.type: org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex: 'finance-cdc.public.(.*)'
transforms.route.replacement: 'cdc.$1'
# 障害復旧
errors.tolerance: all
errors.deadletterqueue.topic.name: cdc-dlq
errors.deadletterqueue.context.headers.enable: true
CDC適用時の注意事項
CDCを金融システムに適用する際に必ず考慮すべき事項がある。
- トランザクション境界の保持: Debeziumの
provide.transaction.metadataオプションを有効にしてトランザクション単位でイベントをグループ化 - スキーマ変更対応: DDL変更時にコネクタ再起動なしでスキーマ進化を処理できるよう、スキーマレジストリと互換性モードを設定
- WALディスク使用量監視: コネクタ障害時にWALファイルが蓄積してディスク枯渇のリスクがあるため、ハートビートとスロット監視が必須
- 順序保証: 同一レコードの変更順序が保証される必要があるため、テーブルPKをメッセージキーとして使用
異常取引検知
Flink CEPによる異常パターン検知
複合イベント処理(CEP)を使用すると、時系列イベントから特定のパターンを検知できる。以下は急激な価格変動と異常な取引量を検知するFlinkジョブである。
# PyFlinkを活用した異常取引検知
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.table.expressions import col, lit, call
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
env.enable_checkpointing(10000) # 10秒チェックポイント
t_env = StreamTableEnvironment.create(env)
# 異常取引検知SQL
anomaly_detection_sql = """
WITH price_stats AS (
SELECT
symbol,
price,
volume,
event_time,
-- 過去5分間の移動平均/標準偏差
AVG(price) OVER w AS price_ma,
STDDEV_POP(price) OVER w AS price_std,
AVG(volume) OVER w AS volume_ma
FROM market_data_raw
WINDOW w AS (
PARTITION BY symbol
ORDER BY event_time
RANGE BETWEEN INTERVAL '5' MINUTE PRECEDING AND CURRENT ROW
)
),
anomalies AS (
SELECT
symbol,
price,
volume,
event_time,
price_ma,
price_std,
volume_ma,
-- Zスコアベースの異常スコア
ABS(price - price_ma) / NULLIF(price_std, 0) AS price_zscore,
CAST(volume AS DOUBLE) / NULLIF(volume_ma, 0) AS volume_ratio,
CASE
WHEN ABS(price - price_ma) / NULLIF(price_std, 0) > 3.0
THEN 'PRICE_SPIKE'
WHEN CAST(volume AS DOUBLE) / NULLIF(volume_ma, 0) > 5.0
THEN 'VOLUME_SURGE'
WHEN ABS(price - price_ma) / NULLIF(price_std, 0) > 2.0
AND CAST(volume AS DOUBLE) / NULLIF(volume_ma, 0) > 3.0
THEN 'COMBINED_ANOMALY'
ELSE 'NORMAL'
END AS anomaly_type
FROM price_stats
)
SELECT * FROM anomalies
WHERE anomaly_type <> 'NORMAL'
"""
# アラートシンクテーブル(Kafkaトピックへ送信)
t_env.execute_sql("""
CREATE TABLE anomaly_alerts (
symbol STRING,
price DOUBLE,
volume BIGINT,
event_time TIMESTAMP(3),
anomaly_type STRING,
price_zscore DOUBLE,
volume_ratio DOUBLE
) WITH (
'connector' = 'kafka',
'topic' = 'anomaly-alerts',
'properties.bootstrap.servers' = 'kafka-broker:9092',
'format' = 'json'
)
""")
異常検知閾値基準
| 異常タイプ | 検知条件 | 重大度 | 対応措置 |
|---|---|---|---|
| 急激な価格変動(Price Spike) | Zスコア3.0超過 | Critical | 即時アラート、自動取引停止 |
| 取引量急増(Volume Surge) | 平均比5倍超過 | Warning | 監視強化、担当者通知 |
| 複合異常(Combined) | 価格Z 2.0 + 取引量3倍 | High | ポジションリスク再評価 |
| スプレッド拡大(Spread Widening) | Bid-Askスプレッド2倍超過 | Warning | 流動性モニタリング |
低遅延最適化
メッセージブローカー比較(金融観点)
| 特性 | Apache Kafka | Solace PubSub+ | TIBCO EMS | 29West (Informatica) |
|---|---|---|---|---|
| 遅延 | 2-10ms | マイクロ秒レベル | ミリ秒レベル | マイクロ秒レベル |
| スループット | 毎秒数百万メッセージ | 毎秒数百万メッセージ | 毎秒数万メッセージ | 毎秒数百万メッセージ |
| 耐久性 | ディスクベースログ | メモリ+ディスク | ディスクベース | メモリベース |
| プロトコル | 独自プロトコル、REST | AMQP、MQTT、REST | JMS、AMQP | マルチキャストUDP |
| コスト | オープンソース | 商用ライセンス | 商用ライセンス | 商用ライセンス |
| 適用用途 | イベントソーシング、汎用ストリーミング | ハイブリッドクラウド金融 | レガシー金融統合 | 超低遅延トレーディング |
Kafka低遅延チューニングチェックリスト
- OSレベル: ページキャッシュ最適化、
vm.swappiness=1、専用ディスク(NVMe SSD) - JVMレベル: G1GCまたはZGC使用、ヒープサイズ6-8GB、GCログ監視
- Kafkaブローカー:
num.io.threads増加、log.flush.interval.messages=1(耐久性優先時) - ネットワーク: TCPソケットバッファサイズ調整(
socket.send.buffer.bytes、socket.receive.buffer.bytes) - プロデューサー:
linger.ms=0-1、batch.size最適化、compression.type=lz4 - コンシューマー:
fetch.min.bytes=1、fetch.max.wait.ms最小化
Flinkパフォーマンス最適化
# flink-conf.yaml - 金融ワークロード最適化
taskmanager.memory.process.size: 8192m
taskmanager.memory.managed.fraction: 0.4
taskmanager.numberOfTaskSlots: 4
# 状態バックエンド(大規模状態向けRocksDB)
state.backend: rocksdb
state.backend.rocksdb.memory.managed: true
state.backend.incremental: true
# チェックポイント設定
execution.checkpointing.interval: 10s
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.min-pause: 5s
execution.checkpointing.timeout: 60s
execution.checkpointing.max-concurrent-checkpoints: 1
# ネットワークバッファ
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 128mb
taskmanager.network.memory.max: 1gb
# ウォーターマーク設定
pipeline.auto-watermark-interval: 200ms
# 再起動戦略
restart-strategy: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 1s
restart-strategy.exponential-delay.max-backoff: 60s
restart-strategy.exponential-delay.backoff-multiplier: 2.0
障害復旧とデータ整合性
障害シナリオ別対応戦略
| 障害タイプ | 影響 | 復旧戦略 | RTO/RPO |
|---|---|---|---|
| Kafkaブローカー障害 | パーティションリーダー変更 | ISR自動リーダー選出、min.insync.replicas=2 | RTO数秒、RPO 0 |
| Flink TaskManager障害 | 処理中断 | チェックポイントから自動復旧 | RTO 10-30秒、RPOチェックポイント間隔 |
| ネットワーク断絶 | データ損失リスク | プロデューサーリトライ、コンシューマーオフセットロールバック | ネットワーク復旧時間依存 |
| スキーマレジストリ障害 | シリアライゼーション失敗 | ローカルキャッシュ活用、マルチインスタンス | RTO数秒 |
| CDCスロット喪失 | 変更イベント欠落 | スナップショット再実行、イベント補正 | RTO数分-数時間 |
Exactly-Onceセマンティクスの現実
理論的にはExactly-Once処理を保証するが、実際には以下のようなエッジケースが存在する。
- プロデューサータイムアウト: ブローカーでメッセージを受信したがACKがプロデューサーに到達しなかった場合、プロデューサーが再送信して重複が発生する可能性がある。
enable.idempotence=trueの設定でこれを防止する。 - Flinkチェックポイント失敗: チェックポイント完了前に障害が発生すると、最後に成功したチェックポイントへロールバックされる。チェックポイント間隔分のデータが再処理される。
- バックプレッシャー蓄積: ダウンストリームの処理速度が遅いとKafkaコンシューマーのラグが増加し、極端な場合はメモリ不足でジョブが失敗する可能性がある。Flinkのクレジットベースフロー制御がこれを緩和するが、根本的には処理容量の確保が必要である。
データ損失防止パターン
[Producer] [Kafka] [Flink]
| -- produce(msg) --> | |
| <-- ack (success) -- | |
| | -- consume(msg) --> |
| | [process]
| | [checkpoint]
| | <-- commit offset -- |
| | |
| === ブローカー障害 === | |
| -- produce(msg) --> X (失敗) |
| -- retry --> | (新リーダー) |
| <-- ack (success) -- | |
重要なのは、プロデューサーの冪等性 + KafkaのISRレプリケーション + Flinkのチェックポイントが三重でデータ整合性を保証することである。
運用チェックリスト
デプロイ前点検
- Kafkaクラスタ最低3ブローカー、
replication.factor=3、min.insync.replicas=2 - Flinkチェックポイントストレージを外部ストレージ(S3/HDFS)に設定
- スキーマレジストリ互換性モード:
BACKWARD(デフォルト)またはFULL - デッドレターキュー(DLQ)トピック作成と監視設定
- ネットワーク遅延測定:プロデューサー-ブローカー間、ブローカー-コンシューマー間RTT測定
監視指標
| 指標 | 説明 | 警告閾値 |
|---|---|---|
| Consumer Lag | コンシューマーがプロデューサーより遅れているメッセージ数 | 10,000以上 |
| End-to-End Latency | プロデューサー送信からFlink処理完了まで | 100ms超過 |
| Checkpoint Duration | Flinkチェックポイント所要時間 | チェックポイント間隔の50%超過 |
| Kafka ISR Shrink | ISRから外れたレプリカ数 | 1以上 |
| GC Pause Time | JVM GC停止時間 | 200ms超過 |
| Backpressure Ratio | Flinkタスクのバックプレッシャー比率 | 0.5超過 |
負荷テスト
本番デプロイ前に、想定ピークトラフィックの2-3倍の負荷でテストする必要がある。金融市場でブラックスワンイベント発生時には通常の10倍以上のトラフィックが発生する可能性があるため、十分な余裕容量の確保が不可欠である。
参考資料
- Apache Kafka Documentation - Design
- Apache Flink - Use Cases
- Confluent - How a Tier-1 Bank Tuned Apache Kafka for p99 Latency for Trading
- Debezium Documentation - PostgreSQL Connector
- Kai Waehner - Top Trends for Data Streaming with Apache Kafka and Flink in 2026
- DZone - Designing Low-Latency Market Data Systems
- Confluent - Apache Flink Stream Processing Use Cases
- Onehouse - Flink vs Kafka Streams vs Spark Structured Streaming Comparison