Skip to content
Published on

스트림 처리 완전 가이드 2025: Apache Flink, Watermark, Checkpointing, Exactly-Once, Event Time 심층 분석

Authors

들어가며: 시간과 순서의 미묘한 문제

한 가지 질문에서 시작하자

"지난 1시간 동안 일어난 이벤트의 합을 실시간으로 보여줘."

쉬워 보인다. 그러나 다음 사실들을 고려하면?

  • 이벤트가 네트워크 지연으로 늦게 도착할 수 있다.
  • 서버가 다른 시간대에 있다.
  • 모바일 앱이 오프라인이었다가 다시 연결되면서 한꺼번에 올림.
  • 같은 이벤트가 재전송되어 중복될 수 있다.
  • 처리 도중 서버가 크래시한다.
  • Kafka 파티션 간 순서가 다르다.

"1시간"은 발생 시간(event time) 인가 아니면 처리 시간(processing time) 인가? 늦게 도착한 이벤트는 어떻게 처리? 재시작 후 이미 처리한 이벤트를 또 처리하면?

이 모든 문제를 해결하는 것이 현대 스트림 처리 엔진이다. 대표주자가 Apache Flink, 그리고 Kafka Streams, Apache Spark Structured Streaming, Google Dataflow다.

이 글에서 다룰 것

  1. Stream processing의 본질: Batch vs Stream.
  2. Event Time vs Processing Time: 시간의 두 가지 개념.
  3. Watermark: 시간의 진행을 추적.
  4. Windowing: 무한 스트림에서 유한 구간 정의.
  5. Checkpointing: Chandy-Lamport 알고리즘.
  6. Exactly-once: 정확히 한 번 처리의 비밀.
  7. State backend: RocksDB 기반 상태 관리.
  8. Savepoint: 업그레이드와 디버깅.

왜 이걸 알아야 하는가: 실시간 분석, 사기 탐지, 추천 시스템, 모니터링 — 모든 "지금 무슨 일이 일어나는가"를 다루는 시스템이 스트림 처리 위에 있다. Flink만 이해해도 Kafka Streams, Spark Streaming, Dataflow 모두 공통 개념을 공유한다.


1. Batch vs Stream: 근본적 차이

Batch 처리

전통적 데이터 처리:

매일 자정에 실행:
1. 어제 데이터 전부 로드.
2. 처리 (집계, 변환, 분석).
3. 결과 저장.
4. 내일 다시.

장점:

  • 단순. 시작과 끝이 명확.
  • 데이터가 유한. 한 번에 다 볼 수 있음.
  • 실패 시 재실행 쉬움.
  • Map-Reduce, Spark, Hive가 강력.

단점:

  • 지연: 데이터 발생 후 몇 시간~하루 뒤 결과.
  • 실시간 의사결정 불가.

Stream 처리

데이터가 도착하는 순간마다:
1. 이벤트를 받음.
2. 즉시 처리.
3. 결과를 실시간으로 갱신.
4. 영원히 계속.

장점:

  • 저지연: 밀리초~초 단위 응답.
  • 실시간 의사결정.
  • 지속적 업데이트.

단점:

  • 복잡: 늦게 오는 데이터, 시계 문제.
  • 시작과 끝이 불분명.
  • 실패 시 복구 어려움.

실전의 선택

둘 다 필요한 경우가 많다:

  • Lambda Architecture: Batch + Stream 병행. 복잡.
  • Kappa Architecture: 모두 Stream으로 통합. 더 단순.

Flink는 Streaming-first로 설계되어 둘 다 처리한다. Batch는 "유한한 스트림"으로 본다.


2. Event Time vs Processing Time

두 가지 시간

스트림 처리에서 "시간"은 애매한 단어다. 최소 세 가지 시간이 존재한다:

  1. Event Time: 이벤트가 실제로 발생한 시간. 데이터 자체에 포함.
  2. Ingestion Time: 이벤트가 시스템에 들어온 시간.
  3. Processing Time: 이벤트가 실제 처리된 시간. 계산 중인 벽시계.

왜 중요한가

질문: "오후 3시에 일어난 이벤트들을 집계해라."

  • Processing time 기준: "오후 3시에 내가 처리한 것들". 이는 실제 이벤트 발생 시간과 무관.
  • Event time 기준: "실제로 오후 3시에 일어난 것들". 언제 처리되었는지는 상관없음.

예시: 네트워크 지연으로 이벤트 X가 오후 3시 5분에 처리됨. 하지만 X의 실제 발생은 오후 2시 58분이었다면?

  • Processing time: 3시대에 속함.
  • Event time: 2시대에 속함.

비즈니스 논리는 event time을 원한다. "오후 3시의 CPU 사용량" 같은 건 실제 3시의 측정값을 말한다.

Event Time의 도전

Event time을 쓰려면:

  1. 데이터에 timestamp 포함: 이벤트 생성 시 시간 기록.
  2. 순서 보장 안 됨: 늦게 오는 이벤트 대비.
  3. 진행 추적: "이제 3시 이전 데이터는 다 왔다"를 어떻게 아는가?
  4. 세션 경계: 유저 세션은 event time 기반이어야.

Flink는 event time을 기본으로 한다. 이것이 Flink의 가장 큰 강점 중 하나.

실전 예시

// Flink에서 event time timestamp 추출
stream
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, timestamp) -> event.getEventTime())
    )
  • forBoundedOutOfOrderness(5s): 5초 이내 out-of-order 허용.
  • withTimestampAssigner: 이벤트에서 timestamp 추출.

이 설정이 watermark 생성의 기반이다.


3. Watermark: 시간의 진행을 추적

문제

Event time으로 집계할 때:

SELECT COUNT(*) FROM events WHERE event_time BETWEEN '15:00' AND '16:00'

언제 이 집계를 출력해야 하는가? 영원히 기다릴 순 없다. 늦게 오는 이벤트가 있을지도 모르니까.

해결: Watermark

Watermark란

Watermark는 "이 시간 이전의 데이터는 거의 다 왔다"는 추정이다. 시스템이 "시간이 여기까지 흘렀다"고 선언하는 신호.

Event stream (out-of-order):
12:05, 12:01, 12:03, 12:08, 12:02, 12:10, 12:07, ...

Watermark: 12:05
"12:05 이전의 데이터는 대부분 도착했다"
12:00~12:05 구간 집계 확정 가능

Watermark는 시간의 최소 보장 지점이다. 이후 올 데이터는 대부분 watermark보다 큰 시간을 가질 것이다.

Perfect vs Heuristic Watermark

Perfect watermark: 이 시간 이전의 모든 데이터가 도착함을 보장. 실전에선 거의 불가능 (네트워크 장애, 무한 지연).

Heuristic watermark: 이 시간 이전의 데이터가 대부분 도착했을 것이라는 추정. 실전에서 씀.

Watermark 생성 전략

1. Periodic Watermark 주기적으로 watermark 방출:

WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))

"최근 본 event time - 10초"를 watermark로 발행. 10초 지연 허용.

2. Punctuated Watermark 특정 이벤트에 watermark 포함:

WatermarkStrategy
    .<Event>forGenerator(() -> new PunctuatedGenerator());

3. Monotonic Watermark 완벽하게 순서대로 오는 경우 (매우 드묾):

WatermarkStrategy.<Event>forMonotonousTimestamps();

Late Data 처리

Watermark가 t에 도달한 후 t 이전의 이벤트가 오면? Late data.

옵션:

  • Drop: 무시 (기본).
  • Side output: 별도 스트림으로 보내 따로 처리.
  • Allowed lateness: 일정 기간 내 늦은 데이터도 받아 기존 결과 갱신.
stream
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))
    .sideOutputLateData(lateOutputTag)
    .sum("value");

Watermark의 tradeoff

  • 너무 공격적 (작은 out-of-order 허용): 빠른 결과, 하지만 많은 late data → 부정확.
  • 너무 보수적 (큰 out-of-order 허용): 느린 결과, 하지만 정확.

실전 선택:

  • Real-time 대시보드: 공격적 (5~30초).
  • 정확한 과금: 보수적 (수 분~수 시간).
  • 사기 탐지: 중간 (1~5분).

4. Windowing: 무한을 유한으로

문제

스트림은 무한하다. "모든 이벤트의 합"은 절대 완료되지 않는다. 그래서 유한한 구간(window) 을 정의해서 집계한다.

Window 종류

Tumbling Window (고정 구간):

|--W1--|--W2--|--W3--|
  • 겹치지 않음.
  • 매 N분마다.
  • "매 5분의 평균".
stream.window(TumblingEventTimeWindows.of(Time.minutes(5)))

Sliding Window (슬라이딩):

|--W1--|
   |--W2--|
      |--W3--|
  • 겹침.
  • 예: 10분 창을 1분마다.
  • "최근 10분 동안의 평균을 1분마다 갱신".
stream.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))

Session Window (세션 기반):

|--S1--|     |--S2--|  |S3|
      gap         gap
  • 활동이 없는 gap로 구분.
  • 사용자 세션 분석에 적합.
stream.window(EventTimeSessionWindows.withGap(Time.minutes(10)))

Window의 작동

Flink는 각 window를 상태로 유지한다:

  1. 이벤트가 도착하면 해당 window에 추가.
  2. Window의 끝(경계)에서 집계 계산.
  3. Watermark가 window 끝을 지나면 출력.

Session Window의 특이성

Session window는 동적이다. 이벤트 도착에 따라 window가 merge되거나 확장된다:

이벤트 도착: 10:00
→ 새 window: [10:00, 10:10]  (10분 gap)

이벤트 도착: 10:05
→ 기존 window 확장: [10:00, 10:15]

이벤트 도착: 10:30
→ 새 window: [10:30, 10:40]

이벤트 도착: 10:20 (late)
→ 기존 두 window 사이. 어떻게?
→ 두 window를 merge: [10:00, 10:40]

복잡하지만 Flink가 처리한다.

Aggregation 함수

Window에 적용할 연산:

  • sum, min, max, count, avg
  • 사용자 정의 reduce 또는 aggregate
  • ProcessWindowFunction (가장 유연, 하지만 메모리 사용)
  • AggregateFunction (incremental, 효율적)

Incremental aggregation:

public class AverageAggregate implements AggregateFunction<Input, Accumulator, Double> {
    public Accumulator createAccumulator() { ... }
    public Accumulator add(Input value, Accumulator acc) { ... }
    public Double getResult(Accumulator acc) { ... }
    public Accumulator merge(Accumulator a, Accumulator b) { ... }
}

이벤트가 올 때마다 증분 업데이트. Window 끝에 결과만 방출. 메모리 효율적.


5. State: 스트림 처리의 심장

왜 State인가

순수 함수는 "입력 → 출력"이다. 그러나 스트림 처리는 상태를 기억해야 한다:

  • "사용자별 누적 카운트".
  • "최근 10분의 평균".
  • "이 이벤트를 이전에 본 적 있는가?"
  • "현재 세션의 상태".

Keyed State vs Operator State

Keyed State: 특정 key별 상태. 가장 흔함.

// 사용자별 카운트
stream.keyBy(e -> e.userId)
      .process(new CountPerUser());

Flink가 userId별로 상태를 자동 분리. 각 TaskManager가 자기 key 범위의 상태만 관리.

Operator State: 병렬 인스턴스별 상태. 드물게 사용 (Kafka source의 offset 등).

State Primitive

Flink의 키 상태 타입:

  • ValueState: 단일 값. value(), update().
  • ListState: 리스트. add(), get(), clear().
  • MapState: 키-값 맵. put(), get(), remove().
  • ReducingState: reduce 함수 자동 적용.
  • AggregatingState: aggregate 함수 자동 적용.

State Backend

상태가 어디에 저장되는가?

HashMapStateBackend (이전 MemoryStateBackend):

  • 메모리에 Java 객체로.
  • 빠름.
  • 메모리 제약.
  • 중소 규모 용도.

EmbeddedRocksDBStateBackend:

  • RocksDB (Facebook의 LSM-tree 기반 KV 스토어)에 저장.
  • 디스크 기반: 수 TB 가능.
  • 느리지만 확장성.
  • 대규모 용도.

RocksDB State Backend 상세

RocksDB는 Flink 대규모 상태 관리의 표준:

  • LSM-tree: 순차 쓰기로 빠른 인서트.
  • Compaction: 백그라운드 정리.
  • Bloom filter: 빠른 존재 확인.
  • 압축: Snappy/LZ4로 저장 공간 절약.

Flink는 RocksDB를 각 TaskManager 로컬에 두고, checkpoint 시 원격 저장소(S3, HDFS)에 복사한다.

State 크기 관리

상태가 무한히 증가하면 문제. 관리 방법:

  1. TTL (Time-To-Live):
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(1))
    .setUpdateType(OnCreateAndWrite)
    .build();

1시간 지난 상태는 자동 삭제.

  1. 명시적 clear: 이벤트 처리 중 필요 없으면 직접 clear().

  2. Window 상태: Flink가 window 끝나면 자동 정리.

State와 Keyed Partitioning

keyBy는 stream을 key 기반으로 재분배한다:

Input: [(a, 1), (b, 2), (a, 3), (c, 4), (b, 5)]

keyBy(key):
TaskManager 1: a → [(a,1), (a,3)]
TaskManager 2: b → [(b,2), (b,5)]
TaskManager 3: c → [(c,4)]

같은 key는 항상 같은 TaskManager에 간다. 이 덕분에:

  • 상태 관리가 로컬에서.
  • 네트워크 전송 최소.
  • 확장 가능 (key 수에 비례).

주의: Skew. 한 key가 너무 많으면 그 TaskManager가 병목.


6. Checkpointing: Chandy-Lamport 알고리즘

왜 Checkpoint가 필요한가

스트림 처리는 무한히 실행된다. 서버 장애는 반드시 일어난다. 장애 후:

  • 진행 상태는?
  • 어느 이벤트까지 처리했나?
  • 집계 중인 상태는?

Checkpoint = 특정 시점의 전체 상태 스냅샷. 장애 시 이 시점부터 재개.

단순 방법의 문제

"모든 TaskManager가 동시에 상태 저장" — 어떻게 동기화?

  • 분산 시스템에서 정확한 동시성 불가능.
  • 네트워크 지연, 시계 오차.
  • TaskManager마다 다른 시점을 저장하면 불일치.

Chandy-Lamport 알고리즘

1985년 Chandy와 Lamport가 제안한 distributed snapshot 알고리즘. Flink의 checkpoint 기반.

아이디어: barrier라는 특수 마커를 데이터 스트림에 삽입. Barrier가 operator에 도달하면 그 시점의 상태를 저장.

Flink의 Barrier 기반 Checkpoint

단계:

  1. Trigger: JobManager가 checkpoint 요청.
  2. Source barrier injection: Source operator가 현재 위치에 barrier를 삽입.
  3. Barrier propagation: Barrier가 이벤트 스트림과 함께 흐름.
  4. Operator snapshot: Barrier가 operator에 도달하면:
    • 해당 operator의 상태를 저장.
    • Barrier를 downstream으로 전달.
  5. Sink confirmation: 모든 sink에 barrier 도달 시 checkpoint 완료.
  6. Acknowledge: JobManager에게 확인.
Source ----B----event----event----
         operator1 ----B----event----
                     operator2 ----B----
                                  sink

B = barrier
각 operator가 B 도달 시 상태 저장

Alignment의 문제

Operator에 여러 입력 스트림이 들어오면 (예: join):

  • 모든 입력의 barrier가 도착할 때까지 기다림 (alignment).
  • 그 동안 먼저 도착한 스트림의 이벤트를 버퍼링.
  • 모든 barrier 도착 → 상태 저장 → barrier 전파.

문제: Alignment 시간 동안 처리 지연. 대량 데이터면 버퍼 폭발.

Unaligned Checkpoint

Flink 1.11+의 개선: unaligned checkpoint. Barrier가 도착하면 즉시 아래로 전달, in-flight 이벤트도 상태에 포함.

  • 장점: Backpressure 상황에서도 빠른 checkpoint.
  • 단점: Checkpoint 크기 증가.

Checkpoint 빈도

너무 자주 → 오버헤드. 너무 드물게 → 장애 시 재처리 많음.

실전 권장: 10초 ~ 5분. 워크로드에 따라.

env.enableCheckpointing(60000);  // 1분마다
env.getCheckpointConfig().setCheckpointTimeout(600000);  // 10분 timeout
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);

실패 복구

Checkpoint 실패 시:

  1. 마지막 성공한 checkpoint 로드.
  2. Source는 해당 offset부터 재시작 (Kafka의 경우).
  3. Operator 상태를 복원.
  4. 처리 재개.

이는 at-least-once를 제공한다. Exactly-once를 위해선 추가 필요.


7. Exactly-Once: 정확히 한 번

약속 체계

스트림 처리의 전달 보장:

  • At-most-once: 한 번 이하 (중복 없음, 손실 가능).
  • At-least-once: 한 번 이상 (손실 없음, 중복 가능).
  • Exactly-once: 정확히 한 번 (가장 강함).

단순 재전송은 at-least-once. 장애 복구 후 이전에 처리한 이벤트를 다시 처리하게 된다.

Flink의 Exactly-Once

Flink는 두 종류의 exactly-once를 구분한다:

1. Internal exactly-once: Flink 내부의 상태 변경. Checkpoint 복구 시 정확히 한 번 상태에 반영. Barrier 알고리즘이 이를 보장.

2. End-to-end exactly-once: 외부 sink (Kafka, DB 등)까지 포함한 정확히 한 번. 더 어려움.

내부 Exactly-Once의 작동

이벤트 X가 처리됨 → 상태 업데이트 (메모리)
Checkpoint barrier 도착 → 상태 스냅샷 저장
장애 발생 → checkpoint 복구 → 상태 복원

만약 이벤트 X가 checkpoint 이전에 처리됐다면:
→ 복구 시 상태에 이미 반영됨
Source가 X를 재전송해도 무시됨? No, 재처리됨
→ 그러나 상태가 이미 X를 반영했으므로 결과는 같음

만약 이벤트 X가 checkpoint 이후에 처리됐다면:
→ 복구 시 상태에 없음
Source가 X 재전송
→ 처리 후 상태에 반영

핵심: 상태와 source offset이 같은 checkpoint에 저장. 따라서 일관성.

End-to-End Exactly-Once

외부 시스템 (sink)까지 포함할 때의 어려움:

Scenario: Flink가 Kafka에 결과를 쓴다.

Flink가 "X=100" 쓰기 → Kafka 전송 완료 → checkpoint 저장 전 장애
복구 → checkpoint에서 "X 아직 처리 안 됨"Flink가 다시 "X=100" 쓰기
Kafka에 "X=100"이 두 번

해결: Two-Phase Commit Protocol

Flink의 TwoPhaseCommitSinkFunction:

  1. Pre-commit: Sink에 쓰기 시작. 트랜잭션 시작 또는 임시 파일.
  2. Checkpoint barrier 도착: Flink가 pre-commit 상태를 저장.
  3. Checkpoint 완료: Sink가 commit (트랜잭션 커밋 또는 파일 이동).
  4. 장애 시: Checkpoint 복구 → pre-commit 상태 확인 → 아직 commit 안 됐으면 commit.

Kafka Producer의 Transactional API

Kafka 0.11+의 transactional producer가 이를 가능하게 한다:

producer.initTransactions();
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
// checkpoint 직전
producer.commitTransaction();  // 또는 abortTransaction()

Flink의 Kafka sink는 이를 활용해 end-to-end exactly-once를 제공한다.

JDBC 싱크의 처리

DB에 쓰기도 idempotent 또는 transactional로 만들어야:

  • Upsert 기반: Primary key로 INSERT ... ON CONFLICT UPDATE. Idempotent.
  • Transaction 기반: 각 batch를 transaction으로.

실전에서

Flink + Kafka의 조합은 end-to-end exactly-once가 기본. 다른 sink는 구현에 따라 다름.

중요: Exactly-once는 성능 비용이 있다. At-least-once + idempotent sink가 더 단순하고 빠를 수 있다.


8. Backpressure: 과부하 대응

Backpressure란

Source가 너무 빠르게 이벤트를 생성하고 downstream이 처리를 못 따라갈 때 발생.

단순 대응

  • 큐가 가득 참.
  • Source를 느리게.
  • 연쇄적으로 upstream 모두 느려짐.

Flink의 Credit-Based Flow Control

Flink 1.5+ 사용:

  1. 각 operator가 downstream에게 credit 요청.
  2. Downstream이 "N개 버퍼가 빈다"고 알려줌.
  3. Upstream은 그만큼만 전송.
  4. 버퍼 참 → credit 0 → 전송 중단.

효과: Operator 간 부드러운 backpressure. 극단적 상황 없음.

Backpressure 모니터링

Flink Web UI에서 실시간 확인:

  • HIGH: 해당 operator가 downstream 때문에 지연됨.
  • LOW: 정상.
  • OK: 여유 있음.

모든 operator가 HIGH면 sink나 외부 시스템이 병목. 중간 operator만 HIGH면 그 operator가 느림.

대응

  1. 병목 operator의 병렬성 증가.
  2. Operator의 로직 최적화.
  3. 외부 시스템 확장 (sink DB 등).
  4. Backpressure threshold 조정.

9. Savepoint: 업그레이드의 열쇠

Checkpoint vs Savepoint

  • Checkpoint: Flink 자동. 장애 복구용.
  • Savepoint: 사용자 트리거. 버전 업그레이드, 설정 변경 용.

Savepoint 사용

# Savepoint 생성
flink savepoint <jobId> s3://my-bucket/savepoints/

# 다른 버전의 job으로 재시작
flink run -s s3://my-bucket/savepoints/savepoint-xxx new-version.jar

Operator UID

Savepoint는 operator의 상태를 저장한다. 재시작 시 UID로 매칭.

stream
    .map(new MyMapper()).uid("my-mapper-uid")  // 명시적 UID
    .keyBy(...)
    .process(new MyProcess()).uid("my-process-uid")

UID를 지정하지 않으면 Flink가 자동 생성. 코드 변경 시 자동 UID가 바뀔 수 있음 → 상태 복구 실패.

교훈: 모든 stateful operator에 명시적 UID 설정.

Schema Evolution

Savepoint 이후 상태의 schema가 바뀌면?

  • 옛 상태: {name: String, age: Int}
  • 새 상태: {name: String, age: Int, email: String}

Flink는 특정 타입 (POJO, Avro)에 대해 schema evolution을 지원. 새 필드는 default value.

복잡한 변경은 수동 마이그레이션 필요.


전형적 파이프라인

// 1. Kafka source
DataStream<Event> source = env.fromSource(
    KafkaSource.<Event>builder()
        .setBootstrapServers("kafka:9092")
        .setTopics("input-topic")
        .setValueOnlyDeserializer(new EventDeserializer())
        .setStartingOffsets(OffsetsInitializer.committedOffsets())
        .build(),
    WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((e, t) -> e.getEventTime()),
    "kafka-source"
);

// 2. 처리
DataStream<Result> result = source
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new MyAggregator())
    .uid("my-aggregator");

// 3. Kafka sink
result.sinkTo(
    KafkaSink.<Result>builder()
        .setBootstrapServers("kafka:9092")
        .setRecordSerializer(...)
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("flink-")
        .build()
).uid("kafka-sink");

// 4. Checkpoint
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

성능 튜닝

Parallelism:

env.setParallelism(10);  // 전역 기본
source.setParallelism(4);  // source는 Kafka partition 수와 맞춤

Network buffer:

env.setBufferTimeout(100);  // flush 간격 (ms)

State backend:

env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints");

모니터링

Flink metrics을 Prometheus로:

  • Throughput (records/sec).
  • Latency (처리 지연).
  • Backpressure.
  • Checkpoint 시간과 크기.
  • State 크기.

디버깅

Flink Web UI:

  • Job 토폴로지.
  • Operator별 metric.
  • Checkpoint 히스토리.
  • Exception 스택 트레이스.

Kafka Streams

Kafka의 공식 stream processing 라이브러리:

장점:

  • Kafka 내장: 별도 클러스터 불필요.
  • 라이브러리: 자체 애플리케이션으로 배포.
  • 단순.

단점:

  • Kafka에 강하게 결합.
  • 대규모 상태 처리 약함 (로컬 RocksDB).
  • Flink보다 기능 적음.

선택: Kafka 중심, 소규모 처리 → Kafka Streams.

Spark Structured Streaming

Spark의 stream 처리:

장점:

  • Spark 통합: batch와 같은 API.
  • 큰 생태계.
  • ML 통합.

단점:

  • Micro-batching: 진정한 실시간 아님 (수백 ms 지연).
  • Event time 지원이 Flink만큼 강력하지 않음.

선택: 이미 Spark 쓰고 있으면 → Structured Streaming.

Apache Beam + Dataflow

Google의 스트림 처리:

장점:

  • 통합 모델: Batch와 stream 같은 API.
  • 멀티 러너: Flink, Spark 등에서 실행.
  • Cloud Dataflow: 관리형 서비스.

선택: Google Cloud 환경 → Dataflow.

Flink의 포지션

Flink 강점:

  • 진짜 스트리밍: micro-batch 없음.
  • Event time 1급 시민: Watermark, window.
  • Exactly-once: End-to-end.
  • 대규모 상태: RocksDB 기반.
  • Savepoint: 업그레이드 우아.

실전 선택:

  • 낮은 지연 + 정확한 event time: Flink.
  • Kafka 중심 소규모: Kafka Streams.
  • Spark 환경 확장: Structured Streaming.
  • Google Cloud: Dataflow.

12. 흔한 함정과 교훈

함정 1: Processing Time 오용

// 잘못된 예
stream.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))

Processing time 사용 시:

  • 재실행이 다른 결과: 실행 시간에 따라 windowing.
  • Backpressure 시 부정확.
  • 디버깅 어려움.

교훈: 거의 항상 event time 사용.

함정 2: Watermark 설정 오류

너무 짧은 out-of-order allowance:

  • 실제 네트워크 지연보다 짧으면 많은 late data.

너무 긴 allowance:

  • 결과가 너무 느림.

교훈: 실제 데이터를 측정해서 결정. Late data 비율 모니터링.

함정 3: Key Skew

stream.keyBy(e -> e.userId)

특정 사용자(VIP, 봇)가 이벤트의 90%를 차지하면? 그 key의 operator가 병목.

해결:

  • Pre-aggregation: source에서 먼저 집계.
  • Key salting: userId + "_" + randomSalt % 10.
  • 이후 full key로 재집계.

함정 4: State 폭발

TTL 없이 state를 유지하면 무한 증가.

해결: TTL 설정 + 주기적 cleanup.

함정 5: Checkpoint 실패

Checkpoint가 timeout 또는 실패:

  • State가 너무 커서.
  • 네트워크 혼잡.
  • Alignment 시간이 너무 김.

해결:

  • Unaligned checkpoint.
  • Incremental checkpoint (RocksDB).
  • 병렬성 증가.
  • State 크기 감소.

함정 6: UID 누락

명시적 UID 없이 배포 → 코드 변경 시 상태 복구 실패.

교훈: 모든 stateful operator에 uid 명시. CI에서 검증.


퀴즈로 복습하기

Q1. Event time과 processing time의 차이와, 왜 event time이 일반적으로 선호되는가?

A.

Event Time: 이벤트가 실제로 발생한 시간. 이벤트 데이터에 포함 (timestamp field).

Processing Time: 이벤트가 시스템에 의해 처리되는 시간. 계산하는 CPU의 벽시계.

예시: 모바일 앱이 오프라인에서 10개 이벤트 생성 (event time: 10:00~10:05). 이후 온라인되면서 한꺼번에 서버로 전송 (processing time: 10:30).

Processing time 기반 "오후 10시의 이벤트": 10:30에 도착한 10개 모두. 사실과 다름.

Event time 기반 "오후 10시의 이벤트": 10:00~10:05의 원래 그 시간에 속한 10개. 올바른 비즈니스 의미.

Event Time이 선호되는 이유:

  1. 일관성: 재처리해도 같은 결과. Processing time은 재실행 시 타이밍에 따라 다른 결과.

  2. 네트워크 지연 무관: 늦게 온 이벤트도 원래 시간으로 집계.

  3. Backpressure 안전: 처리가 느려져도 논리적 결과는 동일.

  4. 재생산 가능: 과거 이벤트 재처리 시 동일한 결과.

  5. 비즈니스 의미: 사용자, 센서, 거래의 "시간"은 발생 시점. 처리 시점이 아님.

Processing Time이 유용한 경우:

  1. 저지연이 전부: 알림, 단순 tail-log 집계.
  2. Event time 없음: 소스에 timestamp가 없음.
  3. 테스트/개발: 단순화.

대가: Event time은 watermark 관리, late data 처리, out-of-order 허용 등 복잡도가 추가된다. Flink는 이 복잡도를 추상화해서 event time을 쉽게 쓸 수 있게 한다.

결론: 거의 모든 실전 스트림 처리는 event time을 쓴다. Processing time은 예외적 경우만. Flink의 "event time 1급 시민" 철학이 다른 엔진(특히 Spark Structured Streaming 초기 버전)보다 강한 이유이다.

Q2. Watermark가 어떻게 "시간의 진행"을 표현하며, 너무 공격적/보수적 설정의 트레이드오프는?

A.

Watermark의 정의: "시간 T 이전의 이벤트는 거의 다 도착했다"는 시스템의 추정(selection). 이는 결정이 아니라 추정이다.

어떻게 작동하는가:

  1. Flink가 주기적으로 watermark를 생성 (또는 이벤트에서 추출).
  2. 가장 흔한 전략: WatermarkStrategy.forBoundedOutOfOrderness(duration).
    • "최근에 본 event time 중 최댓값 - duration"을 watermark로.
    • duration허용하는 out-of-order 범위.

예시:

  • 최근 본 이벤트 시간들: 10:05, 10:03, 10:07, 10:02, 10:10.
  • 최댓값: 10:10.
  • Duration = 5초.
  • Watermark: 10:10 - 5초 = 10:09:55.
  • 의미: "10:09:55 이전의 이벤트는 거의 다 왔을 것".
  1. Watermark가 operator에 도달하면:
    • Event time window가 닫힘 (해당 window의 집계 출력).
    • Timer 실행.

공격적 설정 (작은 duration, 예: 1초):

장점:

  • 빠른 결과 (거의 실시간).
  • 메모리 사용 적음 (window 빨리 닫힘).
  • 실시간 대시보드에 좋음.

단점:

  • Late data 많음: 실제 네트워크 지연이 1초를 넘으면 많은 이벤트가 watermark 이후 도착.
  • 부정확한 결과: Late data가 drop되면 손실.
  • Allowed lateness 필요: 추가 복잡도.

보수적 설정 (큰 duration, 예: 1시간):

장점:

  • 정확: 거의 모든 데이터 포함.
  • 네트워크 장애 여유.
  • 정산, 청구 시스템에 좋음.

단점:

  • 지연: 1시간 창의 결과가 1시간 + 1시간 = 2시간 후.
  • 메모리 사용: Window가 오래 열림.
  • 인간이 보기엔 느림.

실전 권장:

용도Duration
실시간 모니터링5~30초
사기 탐지1~5분
사용자 세션 분석10~30분
결제/청구1~4시간
법적 감사24시간+

튜닝 방법:

  1. Late data 비율 측정: 처음엔 관대하게 설정하고 late data 모니터링.
  2. 점진적 축소: Late data 비율이 낮으면 duration 감소.
  3. Allowed lateness 병행: 중요한 경우 window를 좀 더 열어두기.
  4. Side output: Late data를 따로 수집해 분석.

깨달음:

Watermark는 확실성과 지연의 트레이드오프다. 완벽한 정확성은 무한 지연이 필요하고, 즉시 결과는 부정확을 감수해야 한다. 엔지니어의 역할은 비즈니스 요구에 맞는 점을 찾는 것이다.

이 트레이드오프는 분산 시스템의 보편적 법칙(FLP impossibility, CAP theorem의 변형)이다. Watermark는 그 법칙을 실용적으로 우회하는 방법이다.

Q3. Chandy-Lamport 알고리즘이 어떻게 분산 상태의 "일관된 스냅샷"을 만드는가?

A. 이 알고리즘은 1985년 Chandy와 Lamport가 발표한 고전이며, 현대 스트림 처리 시스템의 checkpoint 기반이다.

문제:

분산 시스템의 여러 노드가 각자 상태를 가진다. 모든 노드의 상태를 동시에 저장하고 싶다. 그러나:

  • 정확한 동시성 불가능 (분산 시스템 본질).
  • 노드마다 다른 시점에 저장하면 불일치 (중간 메시지 포함 여부가 다름).
  • 네트워크로 이동 중인 메시지는 어디에?

Chandy-Lamport의 해결:

핵심 아이디어: Marker라는 특수 메시지를 채널에 삽입. Marker가 지나가는 순간이 "이 노드의 스냅샷 시점".

알고리즘:

  1. Initiator 노드가 시작:

    • 자신의 상태를 로컬에 저장.
    • 모든 out-going 채널에 marker 전송.
  2. Marker를 처음 받은 노드:

    • 자신의 상태를 저장.
    • 들어온 채널의 상태는 빈 상태 (marker 이전 메시지 없음).
    • 자신의 모든 out-going 채널에 marker 전송.
  3. Marker를 두 번째 이후 받는 노드:

    • 해당 in-coming 채널에서 "상태 저장 이후 marker 도착까지" 사이에 온 메시지들을 채널 상태로 기록.
  4. 알고리즘 완료: 모든 채널이 marker를 전달받으면 전역 스냅샷 완성.

결과: 일관된(consistent) 전역 스냅샷. 이는 다음을 의미:

  • 이 스냅샷은 가능한 실행 상태다 (실제 발생했을 수 있는 상태).
  • 인과 관계(happens-before)를 유지.
  • 복구 시 이 상태에서 재개하면 정합성 유지.

"가능한 상태"의 의미:

스냅샷이 실제로 동시에 벌어진 순간을 포착한 건 아니다. 하지만 이 상태는 어떤 가상의 순간에 일어날 수 있었을 법한 상태다. 이것으로 충분하다.

Flink의 응용:

Chandy-Lamport를 스트림 처리에 적용한 것이 Flink의 barrier 기반 checkpoint:

  • Marker → Barrier: 스트림에 특수 이벤트 삽입.
  • 채널 → Flink operator 간 edge.
  • 노드 → Operator.
Source --B--e1--e2-->
             \
              Operator1 --B--e3-->
                             \
                              Operator2 --B-->
                                           \
                                            Sink

Barrier가 지나가는 순간 각 operator가 상태를 저장. 간단하고 우아.

Alignment 문제:

Flink operator가 여러 입력을 가지면 (예: two-stream join):

  • 각 입력 채널에서 barrier가 다른 시간에 도착.
  • 먼저 도착한 스트림의 이벤트를 buffer에 쌓고 대기.
  • 모든 입력에서 barrier 도착 → 상태 저장 → barrier 전파.

이유: "barrier 이전" vs "barrier 이후"를 명확히 구분해야 checkpoint 일관성 유지.

단점: Backpressure 시 alignment가 오래 걸려 checkpoint 지연.

Flink의 해결: Unaligned checkpoint (1.11+):

  • Barrier를 즉시 전파.
  • In-flight 이벤트도 체크포인트 상태에 포함.
  • 단, 체크포인트 크기 증가.

의미:

Chandy-Lamport는 1985년에 만들어진 알고리즘이 지금도 현역이라는 예다. 분산 시스템의 근본 문제는 시간이 지나도 변하지 않는다. 우아한 해결은 시대를 초월한다.

Flink뿐만 아니라 Spark Structured Streaming, Kafka Streams, Apache Beam 모두 유사한 메커니즘을 쓴다. Chandy-Lamport는 분산 상태 관리의 공통 언어다.

Q4. Flink의 end-to-end exactly-once는 어떻게 2-phase commit을 활용하는가?

A.

Exactly-once의 세 수준:

  1. Source exactly-once: Source에서 같은 이벤트를 두 번 읽지 않음. Kafka의 consumer offset 관리로 가능.

  2. Internal exactly-once: Flink 내부의 상태 변경이 정확히 한 번 반영됨. Checkpoint 복구로 달성.

  3. Sink exactly-once (end-to-end): 외부 sink (Kafka, DB, 파일 시스템)에 정확히 한 번 쓰기. 가장 어려움.

Sink의 어려움:

Flink가 결과를 Kafka에 쓴다.

시나리오:
1. Flink가 record XKafka에 쓴다.
2. Kafka는 성공 응답.
3. Flink가 checkpoint 저장을 시작.
4. Checkpoint 저장 직전 장애.
5. 복구 시 이전 checkpoint로 돌아감.
6. X는 이미 Kafka에 있는데, Flink 상태에선 없음.
7. Flink가 X를 다시 처리해 Kafka에 쓴다.
8. Kafka에 X가 두 번 존재. 중복!

해결책: 2-Phase Commit Protocol

Flink의 TwoPhaseCommitSinkFunction 추상 클래스가 이 패턴을 구현한다:

Phase 1: Pre-commit (일반 쓰기 시):

  • Sink에 데이터를 "임시로" 쓴다.
  • Kafka: transactional producer로 transaction 시작 + 메시지 쓰기.
  • 파일 시스템: 임시 파일에 쓰기.
  • DB: BEGIN TRANSACTION.

이 단계에서 데이터는 사실상 쓰여졌지만 외부에서 볼 수 없다.

Phase 2: Commit (checkpoint 완료 후):

Flink가 checkpoint barrier 처리:
1. Operator 상태 저장.
2. Sink의 pre-commit 상태를 checkpoint에 기록 (Kafka transaction ID).
3. Checkpoint 완료 응답.

Checkpoint가 성공적으로 완료되었다는 알림을 받으면:
4. Sink가 commit 실행.
   - Kafka: producer.commitTransaction()
   - 파일: rename 임시 → 최종
   - DB: COMMIT

장애 시나리오 복구:

시나리오 A: Pre-commit 후 장애

  • Checkpoint 이전이므로 이 pre-commit은 checkpoint에 없음.
  • 복구 시 이전 checkpoint로 돌아감.
  • 저장소에는 abandon된 transaction 또는 임시 파일만.
  • Flink는 이들을 abort (roll back).
  • 다시 처리. 결과: 정확히 한 번 쓰여짐.

시나리오 B: Checkpoint 완료 후, commit 전 장애

  • Checkpoint에는 pre-commit 상태가 저장됨.
  • 복구 시 checkpoint 로드.
  • Flink는 pre-commit을 인식.
  • Commit을 재시도.
  • Kafka transaction은 commit 또는 이미 abort.
  • 결과: 정확히 한 번.

시나리오 C: Commit 중 장애

  • Commit은 원자적이어야.
  • 대부분 외부 시스템이 atomic commit 지원.
  • 실패 시 재시도 (idempotent).

Kafka의 Transactional Producer:

2-phase commit의 "sink" 측 구현:

producer.initTransactions();  // 최초 1회
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
// ... 
producer.commitTransaction();  // 또는 abortTransaction()
  • Transaction ID를 producer에 할당.
  • 같은 ID의 이전 transaction은 abort 후 새로 시작.
  • Consumer는 isolation.level=read_committed로 commit된 메시지만 읽음.

Idempotent Producer:

Exactly-once의 또 다른 축. Kafka 0.11+의 idempotent producer는 같은 메시지의 중복 전송을 자동 무시. Sequence number로 중복 탐지.

2PC의 대가:

  • 지연: Commit이 checkpoint 이후. 결과 출력이 checkpoint 주기에 의존.
  • 복잡도: Sink 구현이 까다로움.
  • 제약: 모든 sink가 2PC 지원은 아님.

대안: Idempotent Sink

2PC 대신 idempotent 쓰기:

  • 같은 레코드를 여러 번 써도 최종 상태 동일.
  • 예: INSERT ... ON CONFLICT DO UPDATE (upsert).
  • 예: Key-value 스토어의 put (같은 key는 덮어쓰기).

이 경우 at-least-once + idempotent = exactly-once 효과. 단순하고 빠름. 많은 실전 시스템이 이 접근을 선호.

결론:

Flink의 end-to-end exactly-once는 거저 얻어지지 않는다. 2PC는 구체적 구현이 필요하고 지연이 있다. 많은 경우 idempotent sink가 더 실용적이다. 하지만 금융, 청구 같이 엄격한 시스템에선 2PC exactly-once의 보장이 필요하다.

교훈: "Exactly-once는 어려운 문제지만 Flink가 이를 실용적 수준에서 해결했다". 이는 10년 이상의 엔지니어링 노력의 결과이며, 스트림 처리의 진화에서 중요한 이정표다.

Q5. Flink의 RocksDB state backend가 거대 상태(TB급)를 어떻게 관리하는가?

A.

문제: 스트림 처리는 오래 동작하면서 상태가 누적된다:

  • 사용자별 세션 (수억 사용자).
  • 시계열 window (수백 개 동시).
  • Join 상태 (양쪽 스트림 버퍼링).
  • Machine learning 특성.

메모리에 올릴 수 없는 크기 (TB급)가 될 수 있다. 단순 JVM heap state backend는 불가능.

RocksDB State Backend의 해결:

Flink는 RocksDB를 embedded key-value 저장소로 통합. RocksDB는 Facebook이 개발한 LSM-tree 기반 KV 스토어.

아키텍처:

TaskManager
├── JVM Heap (작업 메모리)
└── RocksDB Instance
    ├── MemTable (메모리 버퍼)
    ├── Block Cache (자주 쓰는 데이터)
    ├── SST Files (디스크의 정렬된 파일들)
    └── WAL (write-ahead log)

작동:

  1. 쓰기:

    • 먼저 MemTable (메모리)에 추가.
    • WAL에도 기록 (내구성).
    • MemTable이 차면 SST 파일로 flush.
    • 순차 I/O (빠름).
  2. 읽기:

    • MemTable → Block Cache → SST 파일 순서 검색.
    • Bloom filter로 SST 파일 건너뛰기.
    • LRU 캐시로 hot data 빠르게.
  3. Compaction:

    • 백그라운드에서 SST 파일 병합.
    • Tombstone 제거.
    • 공간 회수.

Flink와의 통합:

1. Keyed state → RocksDB:

ValueState<Long> counter = getRuntimeContext().getState(...)
counter.update(100);  // 내부적으로 RocksDB put

Flink가 (key, namespace, state_name) 조합을 RocksDB 키로 사용. 값은 직렬화된 state value.

2. Incremental Checkpoint:

이것이 RocksDB state backend의 가장 큰 장점이다:

일반 checkpoint: 전체 상태 저장 → 매번 TB 전송
Incremental checkpoint: 바뀐 SST 파일만 저장 → 수 GB 전송
  • RocksDB는 SST 파일이 immutable.
  • Checkpoint마다 "이전에 없던 SST 파일"만 원격으로 복사.
  • 기존 파일은 재사용.
  • 체크포인트 크기 대폭 감소, 시간 대폭 감소.

3. Async Snapshot:

  • MemTable과 SST 파일의 스냅샷을 비동기로.
  • 메인 작업이 block되지 않음.
  • Flink 1.10+에서 효율 크게 향상.

성능 특성:

  • 쓰기: 매우 빠름 (sequential I/O).
  • 포인트 읽기: 빠름 (bloom filter + cache).
  • 범위 스캔: 상당히 빠름 (정렬된 SST).
  • 공간: 압축 (Snappy/LZ4), 효율적.
  • 지연: Heap보다 수 배 느림 (serialization + disk).

Heap state vs RocksDB state:

항목HashMap (Heap)RocksDB
속도매우 빠름느림 (~10배)
크기JVM heap 제한수 TB 가능
GC 영향큰 상태 시 GC pause없음
Incremental checkpoint불가능지원
Serialization불필요매번 필요
용도작고 빠른 상태큰 상태

실전 튜닝:

Memtable 크기 조정:

rocksDbOptions.setWriteBufferSize(128 * 1024 * 1024);  // 128 MB

Block cache 크기:

rocksDbOptions.setBlockCacheSize(1024 * 1024 * 1024);  // 1 GB

Compaction 전략:

  • Level compaction: 읽기 우선 (기본).
  • Universal compaction: 쓰기 우선.

SSD 권장:

  • HDD에선 너무 느림.
  • NVMe가 이상적.

단점:

  1. Serialization 오버헤드: 모든 상태 접근이 직렬화/역직렬화.
  2. 디스크 의존: 스토리지가 병목 가능.
  3. 튜닝 복잡: 많은 RocksDB 파라미터.
  4. Compaction stall: 백그라운드 compaction이 일시 병목 가능.

실전 사용:

Heap 선택:

  • 상태 < 1 GB.
  • 응답 지연이 극도로 중요.
  • GC tuning 가능.

RocksDB 선택:

  • 상태 > 1 GB.
  • TB급 확장 필요.
  • Incremental checkpoint 활용.
  • 일반적인 프로덕션.

사례:

  • Alibaba: 페타바이트 급 상태를 Flink + RocksDB로 관리.
  • Netflix: 대규모 추천 시스템 상태.
  • Uber: 실시간 요금 계산.

결론:

RocksDB state backend는 Flink를 진정한 대규모 스트림 처리 엔진으로 만든 결정적 요소다. 메모리 제한을 넘어 "컴퓨터 한 대의 한계를 넘는" 상태를 관리할 수 있게 했다.

이는 LSM-tree의 우아함이 Flink의 스트림 처리와 만나 탄생한 강력한 조합이다. 현대 스트림 처리가 수십억 이벤트와 페타바이트 상태를 다루는 것은 이 기술 없이는 불가능했을 것이다.

RocksDB 자체의 설계 (LSM-tree, compaction, immutable SST)와 Flink의 incremental checkpoint가 시너지를 이룬다. 각자 단독으로는 못 했을 성능을 함께 달성한다. 이것이 시스템 설계의 미학이다.


마치며: 시간과 상태의 예술

핵심 정리

  1. Event time vs processing time: Event time이 거의 항상 정답.
  2. Watermark: 시간의 진행을 추정하는 신호.
  3. Windowing: Tumbling, sliding, session.
  4. State: Flink의 심장. Heap 또는 RocksDB.
  5. Checkpointing: Chandy-Lamport barrier 기반.
  6. Exactly-once: 2PC 또는 idempotent.
  7. Savepoint: 업그레이드와 버전 관리.
  8. Backpressure: Credit-based 자연스러운 제어.

스트림 처리의 철학

Stream processing은 시간과 상태를 다루는 예술이다. 전통 batch는:

  • 완성된 데이터셋.
  • 정해진 순서.
  • 유한한 계산.

Stream은:

  • 영원히 흐르는 데이터.
  • 불확실한 순서.
  • 무한한 계산.

이 차이가 완전히 다른 문제와 해결책을 만든다. Watermark, checkpoint, exactly-once — 모두 이 본질적 어려움에 대한 답이다.

실전 권장

  • Flink 시작: Kafka → Flink → Kafka 파이프라인.
  • Event time + watermark 기본.
  • RocksDB state backend 프로덕션.
  • Checkpoint + savepoint: 1~5분 주기.
  • 모든 operator에 명시적 UID.
  • Late data 모니터링.
  • Backpressure alert.

마지막 교훈

Apache Flink는 학술적 엄밀함실용적 엔지니어링이 만난 결과다. Chandy-Lamport 같은 1985년 논문을 현대 클라우드 환경에 적용했다. 이는 컴퓨터 과학의 발전이 어떻게 축적되는지 보여주는 좋은 예다.

스트림 처리를 이해하면:

  • 실시간 시스템을 설계할 수 있다.
  • 분산 상태 관리의 진짜 어려움을 이해한다.
  • 시간의 의미에 대해 더 깊이 생각하게 된다.
  • batch와 stream의 통합 (Kappa architecture)을 평가할 수 있다.

당신이 다음에 "실시간으로 이 데이터를 처리해야 한다"는 요구를 받으면, 이 글의 지식이 도움이 될 것이다. 그냥 setInterval()을 쓰지 말자. Flink의 우아한 답이 기다리고 있다.


참고 자료