Skip to content

필사 모드: CDC (Change Data Capture) 완전 가이드 2025: Debezium, Kafka, 데이터 동기화, Outbox 패턴

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

TL;DR

- **CDC = 데이터 동기화의 게임 체인저**: 폴링 대신 변경 이벤트 스트림. 실시간, 효율적

- **Debezium이 사실상 표준**: PostgreSQL, MySQL, MongoDB, SQL Server 등 지원

- **WAL/binlog 기반**: DB 내부 로그를 읽어 변경 캡처. 비침습적

- **Outbox 패턴**: dual-write 문제 해결. 마이크로서비스 이벤트의 표준

- **사용 사례**: DW 동기화, 검색 인덱스 갱신, 캐시 무효화, 이벤트 드리븐 아키텍처

1. CDC가 필요한 이유

1.1 전통적 데이터 동기화

**폴링 (Polling)**:

while True:

new_records = db.query("SELECT * FROM users WHERE updated_at > ?", last_sync)

sync_to_warehouse(new_records)

last_sync = now()

sleep(60)

**문제점**:

1. **지연**: 1분 단위 → 실시간 X

2. **부하**: DB 매번 쿼리

3. **DELETE 못 잡음**: `WHERE updated_at > ?`로는 삭제 감지 X

4. **놓침**: 트랜잭션 시간차로 누락 가능

5. **확장성 X**: 데이터 늘어나면 쿼리 느려짐

1.2 CDC의 약속

[DB] → [WAL/binlog] → [CDC tool] → [Kafka] → [Consumers]

실시간, 비침습적

**장점**:

1. **실시간**: 밀리초 단위 latency

2. **DB 부하 없음**: 로그만 읽음

3. **모든 변경**: INSERT, UPDATE, DELETE 다 캡처

4. **순서 보장**: 트랜잭션 순서대로

5. **확장성**: 데이터 양과 무관

1.3 사용 사례

| 사용 사례 | 설명 |

|---|---|

| **데이터 웨어하우스 동기화** | OLTP → Snowflake/BigQuery 실시간 |

| **검색 인덱스 갱신** | DB 변경 → Elasticsearch 즉시 반영 |

| **캐시 무효화** | DB 업데이트 → Redis 캐시 무효화 |

| **이벤트 드리븐** | DB 변경을 도메인 이벤트로 |

| **마이크로서비스 동기화** | 한 서비스의 데이터를 다른 서비스가 읽음 |

| **DB 마이그레이션** | 무중단 마이그레이션 (구 → 신 DB) |

| **감사 로그** | 모든 변경 이력 보관 |

2. CDC의 작동 원리

2.1 PostgreSQL의 WAL

**WAL (Write-Ahead Log)**: 모든 변경을 디스크에 기록하는 로그. **장애 복구의 기반**.

INSERT INTO users (name) VALUES ('Alice');

[WAL]: tx_id=123, op=INSERT, table=users, data={name:'Alice'}

[Disk page write]

CDC는 이 WAL을 읽어 변경을 캡처합니다.

2.2 Logical vs Physical Replication

**Physical Replication** (Streaming):

- 디스크 페이지 수준 복제

- Standby가 Primary와 동일한 byte

- 장점: 빠름, 정확

- 단점: 다른 버전/스키마 불가능

**Logical Replication**:

- 논리적 변경(INSERT/UPDATE/DELETE) 복제

- 다른 스키마, 다른 버전 가능

- 장점: 유연함, CDC에 적합

- 단점: 약간의 오버헤드

2.3 PostgreSQL Logical Replication 설정

-- postgresql.conf

wal_level = logical

max_wal_senders = 10

max_replication_slots = 10

-- 사용자 생성

CREATE USER cdc_user REPLICATION LOGIN PASSWORD 'secret';

GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;

-- Publication 생성 (어떤 테이블을 복제할지)

CREATE PUBLICATION my_pub FOR TABLE users, orders;

-- 또는 모든 테이블

CREATE PUBLICATION my_pub FOR ALL TABLES;

-- Replication slot 생성 (변경을 보존하는 곳)

SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput');

2.4 MySQL의 binlog

MySQL은 **binlog**를 사용:

my.cnf

log_bin = mysql-bin

binlog_format = ROW # CDC에는 ROW 필수

binlog_row_image = FULL

server_id = 1

expire_logs_days = 7

**binlog_format**:

- `STATEMENT`: SQL 문 자체 (재생 시 비결정적 가능)

- `ROW`: 행 변경 기록 (CDC에 적합)

- `MIXED`: 둘 다

2.5 MongoDB의 Change Streams

const changeStream = db.collection('users').watch()

changeStream.on('change', (change) => {

console.log(change)

// { operationType: 'insert', fullDocument: {...}, ... }

})

MongoDB는 **oplog** 기반.

3. Debezium — CDC의 표준

3.1 Debezium이란?

**Debezium** = 오픈소스 CDC 플랫폼. Red Hat이 시작.

**아키텍처**:

[PostgreSQL] → [Debezium connector] → [Kafka Connect] → [Kafka topics]

[Consumers]

**핵심 기능**:

- 다양한 DB 지원 (PG, MySQL, MongoDB, Oracle, SQL Server, Cassandra)

- Kafka Connect 위에 구축

- 정확히 한 번 전달 (Kafka 보장)

- 스키마 변경 감지

- 필터링, 변환 가능

3.2 Debezium 설치 (Docker Compose)

version: '3.7'

services:

zookeeper:

image: confluentinc/cp-zookeeper:7.5.0

environment:

ZOOKEEPER_CLIENT_PORT: 2181

kafka:

image: confluentinc/cp-kafka:7.5.0

depends_on: [zookeeper]

environment:

KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092

connect:

image: debezium/connect:2.5

depends_on: [kafka, postgres]

environment:

BOOTSTRAP_SERVERS: kafka:9092

GROUP_ID: 1

CONFIG_STORAGE_TOPIC: connect_configs

OFFSET_STORAGE_TOPIC: connect_offsets

STATUS_STORAGE_TOPIC: connect_statuses

postgres:

image: debezium/example-postgres:2.5

environment:

POSTGRES_USER: postgres

POSTGRES_PASSWORD: postgres

3.3 PostgreSQL Connector 등록

curl -X POST http://connect:8083/connectors \

-H "Content-Type: application/json" \

-d '{

"name": "postgres-connector",

"config": {

"connector.class": "io.debezium.connector.postgresql.PostgresConnector",

"database.hostname": "postgres",

"database.port": "5432",

"database.user": "cdc_user",

"database.password": "secret",

"database.dbname": "mydb",

"database.server.name": "mydb",

"table.include.list": "public.users,public.orders",

"plugin.name": "pgoutput",

"publication.name": "my_pub",

"slot.name": "my_slot"

}

}'

이제 Debezium이 PostgreSQL의 변경을 Kafka에 자동 발행합니다.

3.4 Kafka 토픽 구조

Debezium은 **테이블당 토픽** 생성:

mydb.public.users ← users 테이블의 변경

mydb.public.orders ← orders 테이블의 변경

**메시지 형식**:

{

"before": null,

"after": {

"id": 123,

"name": "Alice",

"email": "alice@example.com"

},

"source": {

"version": "2.5",

"connector": "postgresql",

"name": "mydb",

"ts_ms": 1681545600000,

"snapshot": "false",

"db": "mydb",

"schema": "public",

"table": "users",

"txId": 12345,

"lsn": 23456789

},

"op": "c", // c=create, u=update, d=delete, r=read(snapshot)

"ts_ms": 1681545600000

}

3.5 Kafka Consumer

from kafka import KafkaConsumer

consumer = KafkaConsumer(

'mydb.public.users',

bootstrap_servers=['kafka:9092'],

value_deserializer=lambda m: json.loads(m.decode('utf-8'))

)

for message in consumer:

event = message.value

op = event['op']

if op == 'c':

print(f"새 사용자: {event['after']}")

elif op == 'u':

print(f"업데이트: {event['before']} → {event['after']}")

elif op == 'd':

print(f"삭제: {event['before']}")

4. Outbox 패턴 — Dual-Write 문제 해결

4.1 Dual-Write 문제

**시나리오**: 주문 생성 시 DB 저장 + Kafka 이벤트 발행.

def create_order(order):

db.save(order) # 1

kafka.send("orders", order) # 2

**문제**: 1과 2 사이에 크래시 발생하면?

- 1만 성공: DB에 있지만 이벤트 안 감 → 다른 서비스 불일치

- 2만 성공: 이벤트는 갔지만 DB에 없음 → 고스트 이벤트

**dual-write 문제**: 두 시스템에 원자적 쓰기는 불가능.

4.2 Outbox 패턴

**핵심**: DB 트랜잭션 안에서 outbox 테이블에 이벤트 저장.

CREATE TABLE outbox (

id UUID PRIMARY KEY,

aggregate_type VARCHAR(255),

aggregate_id VARCHAR(255),

event_type VARCHAR(255),

payload JSONB,

created_at TIMESTAMP DEFAULT NOW()

);

def create_order(order):

with db.transaction():

db.save(order)

db.save(OutboxEvent(

aggregate_type='Order',

aggregate_id=order.id,

event_type='OrderCreated',

payload=order.to_json()

))

**원자성 보장**: 두 INSERT가 같은 트랜잭션 → 함께 성공 또는 실패.

4.3 Outbox → Kafka

**방법 1: Polling Publisher** (단순)

while True:

events = db.query("""

SELECT * FROM outbox

WHERE published = false

ORDER BY created_at

LIMIT 100

""")

for event in events:

kafka.send(event.aggregate_type, event.payload)

db.execute("UPDATE outbox SET published = true WHERE id = ?", event.id)

sleep(1)

**단점**: 폴링 오버헤드, 약간의 지연.

**방법 2: CDC** (권장)

Debezium이 outbox 테이블을 모니터링 → 자동 Kafka 발행.

curl -X POST http://connect:8083/connectors \

-d '{

"name": "outbox-connector",

"config": {

"connector.class": "io.debezium.connector.postgresql.PostgresConnector",

"table.include.list": "public.outbox",

"transforms": "outbox",

"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",

"transforms.outbox.route.by.field": "aggregate_type",

"transforms.outbox.table.field.event.id": "id",

"transforms.outbox.table.field.event.payload": "payload"

}

}'

**효과**: 별도 폴링 코드 없이 자동. 거의 실시간.

5. 데이터 웨어하우스 동기화

5.1 시나리오

**OLTP** (PostgreSQL) → **DW** (Snowflake/BigQuery) 실시간 동기화.

5.2 전통적 ETL의 한계

[Extract] → [Transform] → [Load]

(배치, 일 1회) (느림) (대용량)

**문제**:

- 24시간 지연

- DB에 큰 부하

- 모든 데이터 재처리

5.3 CDC 기반 ELT

[PostgreSQL] → [Debezium] → [Kafka] → [Stream Processor] → [Snowflake]

(필요 시 transform)

**장점**:

- **실시간** (초 단위 latency)

- **DB 부하 최소** (WAL만 읽음)

- **변경만 처리** (전체 재처리 X)

5.4 도구

| 도구 | 특징 |

|---|---|

| **Debezium + Kafka Connect** | 오픈소스, 자체 호스팅 |

| **Fivetran** | 매니지드, 비쌈, 쉬움 |

| **Airbyte** | 오픈소스 + 매니지드 |

| **Stitch** | 단순, 매니지드 |

| **AWS DMS** | AWS 통합 |

| **Striim** | 엔터프라이즈 |

| **Hevo** | 노코드 |

5.5 Schema Evolution

**문제**: 소스 DB의 컬럼 추가/삭제 시 어떻게?

**Debezium의 처리**:

- 새 컬럼: 자동 감지, 이벤트에 포함

- 컬럼 삭제: 이벤트 발행 (DELETE 이벤트로)

- 타입 변경: 새 스키마로 발행

**Schema Registry** (Confluent):

- 모든 스키마 버전 저장

- 호환성 검증

- 컨슈머가 안전하게 읽기

6. 검색 인덱스 동기화

6.1 시나리오

PostgreSQL → Elasticsearch 실시간 인덱싱.

6.2 아키텍처

[PostgreSQL] → [Debezium] → [Kafka] → [Elasticsearch Sink Connector] → [Elasticsearch]

6.3 Elasticsearch Sink Connector

curl -X POST http://connect:8083/connectors \

-d '{

"name": "es-sink",

"config": {

"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",

"topics": "mydb.public.products",

"connection.url": "http://elasticsearch:9200",

"type.name": "_doc",

"key.ignore": "false",

"schema.ignore": "true",

"behavior.on.malformed.documents": "warn"

}

}'

6.4 효과

- 새 product 추가 → 즉시 검색 가능 (1-2초 내)

- 가격 업데이트 → 검색 결과 자동 갱신

- 삭제 → 인덱스에서 즉시 제거

기존 방식 (배치 reindex)는 시간당 1회 + 수 시간 소요. CDC는 실시간.

7. 캐시 무효화

7.1 문제

def get_user(user_id):

user = cache.get(f"user:{user_id}")

if user is None:

user = db.query(...)

cache.set(f"user:{user_id}", user, ttl=3600)

return user

def update_user(user_id, data):

db.execute("UPDATE users SET ... WHERE id=?", user_id)

cache.delete(f"user:{user_id}") # 누가 잊으면? 다른 서비스에서 업데이트하면?

**해결**: CDC로 자동 무효화.

7.2 CDC 기반

Kafka consumer

consumer = KafkaConsumer('mydb.public.users')

for message in consumer:

event = message.value

user_id = event['after']['id'] if event['op'] != 'd' else event['before']['id']

cache.delete(f"user:{user_id}")

**장점**:

- 코드 변경 불필요

- 모든 변경 자동 캡처

- 중앙 관리

8. 무중단 마이그레이션

8.1 시나리오

레거시 MySQL → 새 PostgreSQL 마이그레이션.

8.2 단계

**1단계: CDC 설정**

[MySQL (현재)] → [Debezium] → [Kafka]

[PostgreSQL (새)]

**2단계: 초기 스냅샷**

- Debezium이 MySQL의 모든 데이터를 한 번에 PG로 복사

**3단계: 실시간 동기화**

- 그 후 변경만 PG에 반영

- 두 DB가 거의 실시간으로 동기화됨

**4단계: 읽기 트래픽 전환**

- 일부 읽기를 PG로 (검증)

- 점진적으로 100% PG로

**5단계: 쓰기 트래픽 전환**

- CDC 방향 반전: PG → MySQL (롤백 대비)

- 모든 쓰기를 PG로

- 일정 기간 후 MySQL 폐기

8.3 장점

- **무중단**: 사용자 영향 X

- **롤백 가능**: 문제 시 즉시 복귀

- **점진적**: 위험 분산

- **검증**: 두 DB 비교로 데이터 정합성 확인

9. 흔한 함정과 해결책

9.1 Replication slot 누적

PostgreSQL replication slot이 사용되지 않으면 **WAL이 무한 누적** → 디스크 폭증.

-- 활성 슬롯 확인

SELECT slot_name, active, restart_lsn

FROM pg_replication_slots;

-- 사용 안 하는 슬롯 삭제

SELECT pg_drop_replication_slot('unused_slot');

**모니터링 필수**: WAL 크기 알림.

9.2 큰 트랜잭션

수백만 행을 한 트랜잭션에 처리하면 → Debezium 메모리 폭증.

**해결**:

- 트랜잭션 분할

- `max.queue.size` 튜닝

- Heartbeat 활용

9.3 Schema 변경

`ALTER TABLE`이 CDC를 망가뜨릴 수 있음:

**예방**:

- 추가만 (additive changes)

- DDL을 작은 단위로

- Debezium의 schema history 사용

9.4 Exactly-once delivery

Debezium은 **at-least-once** (중복 가능).

**해결**:

- 컨슈머에서 멱등성 (idempotency) 보장

- Kafka의 transaction API 사용

- 메시지 ID 추적

9.5 Monitoring

Connector 상태

curl http://connect:8083/connectors/postgres-connector/status

메트릭 (JMX)

debezium_metrics_QueueRemainingCapacity

debezium_metrics_NumberOfEventsFiltered

핵심 지표:

- **Lag**: Source DB와의 차이

- **Queue size**: Connector 내부 큐

- **Error rate**: 처리 실패율

10. CDC vs 대안

10.1 CDC vs Polling

| | CDC | Polling |

|---|---|---|

| **지연** | 실시간 (ms) | 분/시간 |

| **DB 부하** | 매우 낮음 | 높음 |

| **DELETE 감지** | ✅ | ❌ |

| **복잡도** | 높음 | 낮음 |

| **인프라** | Kafka 등 필요 | 없음 |

10.2 CDC vs Triggers

**Triggers**: DB 트리거가 다른 테이블에 쓰기.

- 장점: 단순

- 단점: DB 성능 영향, 트랜잭션 부담

**CDC**: 외부 도구가 WAL 읽음.

- 장점: 비침습적

- 단점: 별도 인프라

10.3 CDC vs Application 이벤트

**App-level**: 코드에서 직접 이벤트 발행.

- 장점: 비즈니스 로직과 통합

- 단점: dual-write 문제 (Outbox로 해결)

**CDC**: DB에서 직접.

- 장점: 안전, 완전

- 단점: 데이터 모델 수준만

**조합**: Outbox + CDC = 최선의 두 세계.

퀴즈

**답**: **Physical Replication**: 디스크 페이지 수준 복제. Standby가 Primary와 byte-identical. 빠르지만 동일 버전/스키마 필수. 주로 HA에 사용. **Logical Replication**: 논리적 변경(INSERT/UPDATE/DELETE) 복제. 다른 스키마, 다른 버전 가능. CDC에 적합. PostgreSQL 10+ 지원. CDC는 logical replication 기반이며, `wal_level = logical`로 활성화합니다.

**답**: DB 트랜잭션 안에서 outbox 테이블에 이벤트를 함께 저장합니다. 두 INSERT가 같은 트랜잭션 → 원자적으로 함께 성공 또는 실패. 그 후 별도 프로세스(Polling 또는 CDC)가 outbox에서 Kafka로 이벤트 전달. 결과: **DB와 메시지 브로커 사이에 원자성 보장**. Debezium + Outbox Event Router transformation이 표준 구현. Microservices에서 거의 필수 패턴입니다.

**답**: (1) **다양한 DB 지원** — PostgreSQL, MySQL, MongoDB, Oracle, SQL Server, Cassandra, (2) **Kafka Connect 위에 구축** — 검증된 인프라, (3) **오픈소스** — 무료, (4) **풍부한 기능** — 스키마 변경 감지, 필터링, transformation, (5) **Red Hat 후원** — 안정적 개발. 대안(Maxwell, Canal)은 특정 DB만 지원하거나 기능 부족. 클라우드 매니지드 (Fivetran, Airbyte)는 비싸지만 운영 부담 적음.

**답**: (1) **자동** — 코드에서 cache.delete()를 잊을 일 없음, (2) **완전** — DB의 모든 변경(다른 서비스 포함) 캡처, (3) **중앙 관리** — 캐시 무효화 로직이 한 곳에, (4) **신뢰성** — DB 트랜잭션 commit 시점에 발생. 단점: 약간의 지연 (보통 1초 이내), 별도 인프라 필요. **여러 마이크로서비스가 같은 DB를 공유**할 때 특히 유용합니다.

**답**: 1) **초기 스냅샷**: Debezium이 모든 데이터를 한 번에 새 DB로 복사. 2) **실시간 동기화**: WAL을 읽어 변경을 새 DB에 즉시 반영. 두 DB가 거의 실시간으로 일치. 3) **점진적 트래픽 전환**: 읽기를 점진적으로 새 DB로, 그 다음 쓰기. 4) **롤백 가능**: 문제 시 CDC 방향 반전. **다운타임 없이** 수개월에 걸쳐 안전한 마이그레이션 가능. Stripe, Airbnb가 이 방식으로 PostgreSQL 마이그레이션 수행.

참고 자료

- [Debezium](https://debezium.io/) — 공식 문서

- [PostgreSQL Logical Replication](https://www.postgresql.org/docs/current/logical-replication.html)

- [Kafka Connect](https://docs.confluent.io/platform/current/connect/index.html)

- [Debezium Outbox Pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/)

- [Designing Data-Intensive Applications](https://dataintensive.net/) — Martin Kleppmann (CDC 챕터)

- [Confluent CDC](https://www.confluent.io/learn/change-data-capture/)

- [Stripe's Live Migration](https://stripe.com/blog/online-migrations) — 무중단 마이그레이션 사례

- [GitHub Engineering: MySQL Schema Changes](https://github.blog/2016-08-01-gh-ost-github-s-online-migration-tool-for-mysql/)

- [Airbyte](https://airbyte.com/) — Debezium 대안 (오픈소스)

- [Fivetran](https://www.fivetran.com/) — 매니지드 CDC

- [Maxwell's Daemon](https://maxwells-daemon.io/) — MySQL 전용 CDC

현재 단락 (1/390)

- **CDC = 데이터 동기화의 게임 체인저**: 폴링 대신 변경 이벤트 스트림. 실시간, 효율적

작성 글자: 0원문 글자: 11,532작성 단락: 0/390