Skip to content

필사 모드: Reactive Programming 완전 가이드 2025: RxJS, Reactor, Kotlin Flow, Backpressure

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

TL;DR

- **Reactive = 비동기 + 스트림 + 함수형**: 데이터의 흐름을 다루는 패러다임

- **Observable**: 0개~무한 개 값의 스트림

- **3대 라이브러리**: RxJS (JavaScript), Reactor (Java), Kotlin Flow

- **Backpressure**: Reactive의 가장 어려운 문제. 소비자가 따라가지 못할 때 대처

- **Hot vs Cold**: 데이터 생성 시점 차이. 매우 중요한 구분

1. Reactive Programming이란?

1.1 정의

> 비동기 데이터 스트림과 변화 전파(propagation of change)를 다루는 프로그래밍 패러다임.

**3가지 핵심**:

1. **비동기** — 결과를 기다리지 않음

2. **데이터 스트림** — 시간에 걸친 값들의 흐름

3. **변화 전파** — 한 값이 변하면 의존하는 값들도 자동 업데이트

1.2 왜 Reactive인가?

**전통적 명령형**:

const a = 1

const b = 2

const sum = a + b // 3

a = 5

console.log(sum) // 여전히 3

**Reactive**:

const a$ = new BehaviorSubject(1)

const b$ = new BehaviorSubject(2)

const sum$ = combineLatest([a$, b$]).pipe(map(([a, b]) => a + b))

sum$.subscribe(s => console.log(s)) // 3

a$.next(5)

// 자동으로 8 출력

→ **자동 전파** = 스프레드시트와 비슷.

1.3 적합한 시나리오

✅ **UI 이벤트** (클릭, 입력)

✅ **WebSocket 메시지**

✅ **비동기 API 호출 합성**

✅ **실시간 데이터** (차트, 알림)

✅ **검색 자동완성** (debounce, throttle)

❌ **단순 CRUD** (오버엔지니어링)

❌ **순수 동기 작업**

❌ **간단한 상태 관리**

2. Observable과 Observer

2.1 Observable

**Observable** = 데이터 스트림.

const observable$ = new Observable<number>(subscriber => {

subscriber.next(1)

subscriber.next(2)

subscriber.next(3)

setTimeout(() => {

subscriber.next(4)

subscriber.complete()

}, 1000)

})

**관례**: 변수명 끝에 `$` (예: `users$`).

2.2 Observer (Subscriber)

**Observer** = Observable을 구독하는 객체. 3개 콜백:

observable$.subscribe({

next: (value) => console.log('Got:', value),

error: (err) => console.error('Error:', err),

complete: () => console.log('Done')

})

**규칙**:

- `next`: 값마다 호출 (0~N번)

- `error`: 에러 시 1번 (그 후 종료)

- `complete`: 정상 종료 시 1번

**계약**: error 또는 complete 후에는 next 호출 X.

2.3 Cold vs Hot

**Cold Observable**: 구독 시 데이터 생성 시작.

const cold$ = new Observable(subscriber => {

console.log('Producing...')

subscriber.next(Math.random())

})

cold$.subscribe(v => console.log('A:', v)) // "Producing..." + 다른 값

cold$.subscribe(v => console.log('B:', v)) // "Producing..." + 다른 값

**Hot Observable**: 데이터가 이미 흐르고 있음. 구독자는 그 시점부터 받음.

const hot$ = new Subject<number>()

hot$.subscribe(v => console.log('A:', v))

hot$.next(1) // A: 1

hot$.subscribe(v => console.log('B:', v))

hot$.next(2) // A: 2, B: 2

// B는 1을 못 받음 (이미 지나감)

**예시**:

- **Cold**: HTTP 요청, 파일 읽기, DB 쿼리

- **Hot**: 마우스 이벤트, WebSocket, BehaviorSubject

2.4 Subject 종류

// Subject: 일반

const subject = new Subject<number>()

// BehaviorSubject: 마지막 값 기억, 새 구독자에게 즉시 전달

const behavior = new BehaviorSubject<number>(0)

// ReplaySubject: N개 과거 값 기억

const replay = new ReplaySubject<number>(3) // 마지막 3개

// AsyncSubject: complete 시 마지막 값만 전달

const async$ = new AsyncSubject<number>()

3. RxJS — JavaScript의 표준

3.1 기본 사용

const input = document.querySelector('input')

fromEvent(input, 'input').pipe(

map((e: any) => e.target.value),

filter(text => text.length >= 3),

debounceTime(300)

).subscribe(text => {

console.log('Search:', text)

})

**흐름**:

1. 입력 이벤트 발생

2. 값 추출

3. 3글자 이상 필터

4. 300ms 동안 입력 없으면 통과

5. 검색 실행

3.2 자주 쓰는 연산자

**Transformation**:

.pipe(

map(x => x * 2), // 변환

pluck('user', 'name'), // 중첩 객체 추출

scan((acc, v) => acc + v, 0) // 누적 (reduce)

)

**Filtering**:

.pipe(

filter(x => x > 10), // 조건 필터

take(5), // 처음 5개만

takeUntil(stop$), // stop$ 발행까지

distinctUntilChanged(), // 중복 제거 (연속)

debounceTime(300), // 300ms 동안 조용하면

throttleTime(1000) // 1초에 한 번

)

**Combination**:

merge(a$, b$) // 둘 중 어느 것이든 발행하면

combineLatest([a$, b$]) // 둘 다 최소 1번 발행 후, 둘 중 하나 발행 시마다

zip(a$, b$) // 같은 인덱스끼리 짝

forkJoin([a$, b$]) // 모두 complete 후 마지막 값

**Flattening**:

.pipe(

switchMap(id => fetch(`/api/users/${id}`)), // 새 값 오면 이전 취소

mergeMap(id => fetch(`/api/users/${id}`)), // 모두 병렬 처리

concatMap(id => fetch(`/api/users/${id}`)), // 순차 처리

exhaustMap(id => fetch(`/api/users/${id}`)) // 진행 중이면 새 값 무시

)

3.3 switchMap의 마법

**검색 자동완성**의 핵심:

fromEvent(input, 'input').pipe(

map((e: any) => e.target.value),

debounceTime(300),

switchMap(query => fetch(`/search?q=${query}`))

).subscribe(results => render(results))

**왜 switchMap?**:

- 사용자가 "react"를 입력 중

- "rea" → 검색 시작

- "reac" → 이전 검색 **자동 취소**, 새 검색 시작

- 마지막 검색 결과만 받음 → race condition 방지

**다른 옵션이라면**:

- `mergeMap`: 모든 검색 결과 (race condition)

- `concatMap`: 순차 처리 (느림)

- `exhaustMap`: 진행 중이면 새 입력 무시 (잘못된 동작)

3.4 Error Handling

.pipe(

map(x => {

if (x < 0) throw new Error('Negative')

return x

}),

catchError(err => {

console.error(err)

return of(0) // 기본값으로 대체

}),

retry(3), // 3번 재시도

retryWhen(errors => errors.pipe(

delay(1000),

take(3)

))

)

3.5 마블 다이어그램

input: --1--2--3--4--5---|

map(x => x * 2):

output: --2--4--6--8--10--|

a$: --1----2----3---|

b$: ---a----b---c--|

combineLatest:

output: ---[1,a]-[2,a][2,b]-[3,b][3,c]--|

마블 다이어그램은 RxJS 학습의 핵심. 모든 연산자가 마블로 설명됩니다.

4. Project Reactor — Java의 표준

4.1 Mono와 Flux

**Mono**: 0개 또는 1개 값.

**Flux**: 0개~N개 값.

// Mono - 단일 값 또는 없음

Mono<User> user = userRepository.findById(123);

// Flux - 여러 값

Flux<User> users = userRepository.findAll();

4.2 Spring WebFlux

전통적 Spring MVC (블로킹):

@GetMapping("/users/{id}")

public User getUser(@PathVariable Long id) {

return userRepository.findById(id).get();

}

WebFlux (논블로킹):

@GetMapping("/users/{id}")

public Mono<User> getUser(@PathVariable Long id) {

return userRepository.findById(id);

}

**효과**: 적은 스레드로 더 많은 동시 요청 처리. Netty 기반.

4.3 자주 쓰는 연산자

Flux.range(1, 10)

.map(i -> i * 2)

.filter(i -> i > 5)

.take(3)

.collectList()

.subscribe(System.out::println);

4.4 비동기 합성

Mono<User> user = userRepository.findById(userId);

Mono<List<Order>> orders = orderRepository.findByUserId(userId);

Mono<UserDto> userDto = Mono.zip(user, orders)

.map(tuple -> new UserDto(tuple.getT1(), tuple.getT2()));

4.5 R2DBC

전통적 JDBC = 블로킹.

**R2DBC** = Reactive Relational Database Connectivity.

@Repository

public interface UserRepository extends ReactiveCrudRepository<User, Long> {

Flux<User> findByCountry(String country);

}

블로킹 없는 PostgreSQL/MySQL 접근.

5. Kotlin Flow

5.1 Coroutines + Reactive

Kotlin Flow = Coroutines 위에 reactive 스트림.

fun fetchUsers(): Flow<User> = flow {

val users = userRepository.findAll()

users.forEach { user ->

emit(user)

}

}

// 사용

fetchUsers()

.filter { it.age > 18 }

.map { it.name }

.collect { name -> println(name) }

5.2 Cold vs Hot Flow

**Cold Flow** (`flow { }`): 구독 시 시작.

**Hot Flow**:

- `StateFlow`: BehaviorSubject 같음, 마지막 값 기억

- `SharedFlow`: Subject 같음

// StateFlow

val count = MutableStateFlow(0)

count.value = 1

// 구독

count.collect { value -> println(value) }

5.3 Android에서

class UserViewModel : ViewModel() {

val users: StateFlow<List<User>> = userRepository

.getUsers()

.stateIn(viewModelScope, SharingStarted.Eagerly, emptyList())

}

// Compose

@Composable

fun UserList(viewModel: UserViewModel) {

val users by viewModel.users.collectAsState()

LazyColumn {

items(users) { user -> Text(user.name) }

}

}

5.4 RxJava와의 비교

| | RxJava | Kotlin Flow |

|---|---|---|

| **언어** | Java | Kotlin |

| **기반** | Threads | Coroutines |

| **메모리** | 보통 | 더 적음 |

| **연산자** | 200+ | ~60 (충분) |

| **학습 곡선** | 가파름 | 보통 |

| **Backpressure** | 명시적 | Suspending |

**Kotlin 프로젝트는 Flow 권장**.

6. Backpressure — 가장 어려운 문제

6.1 문제

**시나리오**: 서버가 초당 10,000 메시지 발행. 클라이언트는 초당 100 메시지만 처리 가능.

**결과**:

- 메모리 폭증 (큐 누적)

- 스레드 고갈

- OOM 크래시

6.2 해결 전략

**1. Buffer**: 메모리에 쌓아둠 (한계 있음)

.pipe(

bufferTime(1000), // 1초씩 버퍼

// 또는

buffer(stop$), // 신호로 flush

)

**2. Drop**: 초과분 버림

Flux.range(1, 1000)

.onBackpressureDrop(dropped -> log.warn("Dropped: " + dropped))

**3. Latest**: 최신 값만 유지

Flux.range(1, 1000)

.onBackpressureLatest()

**4. Sample**: 주기적 샘플링

.pipe(sampleTime(100)) // 100ms마다 최신 값

**5. Throttle**: 속도 제한

.pipe(throttleTime(100)) // 100ms에 한 번

**6. Pull-based**: 컨슈머가 요청한 만큼만

// Reactive Streams의 표준

subscription.request(10) // 10개만 더 줘

6.3 Reactive Streams 표준

**Reactive Streams** = JVM/JS의 backpressure 표준.

**4개 인터페이스**:

- `Publisher`: 데이터 발행

- `Subscriber`: 데이터 수신

- `Subscription`: 구독 관리

- `Processor`: 둘 다

**핵심**: `subscription.request(n)` — 컨슈머가 명시적으로 n개 요청.

**구현**: Project Reactor, RxJava 2+, Akka Streams, Vert.x.

7. 실전 패턴

7.1 자동완성 (Type-Ahead Search)

const input = document.querySelector('#search')

fromEvent(input, 'input').pipe(

map(e => e.target.value),

debounceTime(300), // 300ms 대기

distinctUntilChanged(), // 같은 값 중복 제거

filter(text => text.length >= 2), // 2글자 이상

switchMap(text => // 새 검색 시 이전 취소

fromFetch(`/api/search?q=${text}`).pipe(

switchMap(res => res.json()),

catchError(() => of([])) // 에러 시 빈 배열

)

)

).subscribe(results => render(results))

이 단순한 코드가:

- 사용자가 빠르게 타이핑할 때 모든 글자에 대해 검색 X

- race condition 방지 (이전 검색 결과가 새 검색 결과를 덮지 않음)

- 에러 처리

7.2 Polling

interval(5000).pipe(

switchMap(() => fetch('/api/status'))

).subscribe(status => updateUI(status))

5초마다 API 호출. 자동 cleanup.

7.3 WebSocket 합성

const messages$ = webSocket('wss://example.com/chat')

messages$.pipe(

filter(msg => msg.type === 'message'),

map(msg => msg.text)

).subscribe(text => addMessage(text))

7.4 Retry with Backoff

fetch$.pipe(

retry({

count: 3,

delay: (error, retryCount) => {

const delay = Math.pow(2, retryCount) * 1000

return timer(delay)

}

})

)

Exponential backoff: 1초 → 2초 → 4초.

7.5 Form Validation

const username$ = fromEvent(usernameInput, 'input').pipe(

map(e => e.target.value),

debounceTime(500),

switchMap(name =>

fromFetch(`/api/check-username?name=${name}`).pipe(

switchMap(res => res.json())

)

)

)

const password$ = fromEvent(passwordInput, 'input').pipe(

map(e => e.target.value),

map(pw => pw.length >= 8)

)

const formValid$ = combineLatest([

username$.pipe(map(res => res.available)),

password$

]).pipe(

map(([usernameOk, passwordOk]) => usernameOk && passwordOk)

)

formValid$.subscribe(valid => submitButton.disabled = !valid)

8. Schedulers와 동시성

8.1 Schedulers

스트림이 어떤 스레드에서 실행되는가?

**RxJS Schedulers**:

- `asyncScheduler`: setTimeout 기반

- `asapScheduler`: microtask

- `queueScheduler`: 동기 (즉시)

- `animationFrameScheduler`: requestAnimationFrame

.pipe(

observeOn(asyncScheduler), // 어디서 처리할지

subscribeOn(asyncScheduler) // 어디서 시작할지

)

8.2 RxJava Schedulers

.subscribeOn(Schedulers.io()) // I/O 작업

.observeOn(Schedulers.computation()) // CPU 작업

.observeOn(AndroidSchedulers.mainThread()) // UI

**일반 패턴**: I/O는 io 스케줄러, UI 업데이트는 main 스레드.

9. 함정과 베스트 프랙티스

9.1 메모리 누수 — Subscription 관리

**잘못**:

ngOnInit() {

this.users$.subscribe(users => this.users = users)

// 컴포넌트가 죽어도 subscription은 살아있음 → 메모리 누수

}

**해결 1**: takeUntil

private destroy$ = new Subject<void>()

ngOnInit() {

this.users$.pipe(

takeUntil(this.destroy$)

).subscribe(users => this.users = users)

}

ngOnDestroy() {

this.destroy$.next()

this.destroy$.complete()

}

**해결 2**: async pipe (Angular)

{{ user.name }}

**자동 cleanup**.

9.2 Cold/Hot 혼란

**잘못**:

const data$ = fetch$.pipe(map(transformData))

// 두 번 구독 → fetch가 두 번 실행

data$.subscribe(handler1)

data$.subscribe(handler2)

**해결**: `share()` 또는 `shareReplay()`

const data$ = fetch$.pipe(

map(transformData),

shareReplay(1) // 한 번 fetch, 결과 공유

)

9.3 너무 많은 nested

// Pyramid of doom

.pipe(

switchMap(a =>

fetch1(a).pipe(

switchMap(b =>

fetch2(b).pipe(

map(c => transform(c))

)

)

)

)

)

**해결**: 작은 함수로 분리, async/await 고려.

9.4 RxJS가 항상 답은 아니다

**오버엔지니어링 신호**:

- 단순 fetch에 RxJS

- 단일 이벤트에 Observable

- 학습 곡선이 비즈니스 가치 초과

**적합한 경우**:

- 복잡한 비동기 흐름

- 여러 이벤트 합성

- 실시간 데이터

- backpressure 필요

10. Reactive vs Async/Await

10.1 비교

// Async/Await

async function getUserOrders(userId) {

const user = await fetchUser(userId)

const orders = await fetchOrders(user.id)

return { user, orders }

}

// Reactive (RxJS)

function getUserOrders(userId) {

return fetchUser$(userId).pipe(

switchMap(user =>

fetchOrders$(user.id).pipe(

map(orders => ({ user, orders }))

)

)

)

}

10.2 언제 무엇?

**Async/Await**:

- 단순 순차 비동기

- 1회성 작업

- 학습 곡선 낮음

- 대부분의 경우

**Reactive**:

- 스트림 (여러 값)

- 복잡한 합성 (combineLatest, merge)

- 시간 기반 연산 (debounce, throttle)

- backpressure

- 취소 가능 (cancellation)

10.3 혼용

async function loadData() {

const cached$ = fromPromise(getCached())

const fresh$ = fromPromise(fetchFresh())

return await firstValueFrom(

merge(cached$, fresh$).pipe(

first(data => data !== null)

)

)

}

퀴즈

**답**: **Cold**: 구독 시 데이터 생성 시작. 각 구독자가 독립적인 데이터 받음. 예: HTTP 요청, 파일 읽기. **Hot**: 데이터가 이미 흐르고 있음. 새 구독자는 그 시점부터 받음 (이전 데이터 못 받음). 예: 마우스 이벤트, WebSocket, BehaviorSubject. **흔한 함정**: HTTP를 두 번 구독하면 두 번 호출됨 (Cold). `shareReplay()`로 hot으로 변환 가능. **Cold/Hot 혼란이 가장 흔한 RxJS 버그의 원인**.

**답**: 사용자가 "react"를 입력 중일 때 매 글자마다 API 호출이 시작됩니다. **switchMap**: 새 입력이 오면 **이전 검색을 자동 취소**하고 새 검색 시작. 결과: 마지막 검색 결과만 받음 → race condition 방지. **다른 옵션**: `mergeMap`은 모든 결과 (race condition), `concatMap`은 순차 (느림), `exhaustMap`은 진행 중이면 무시 (잘못된 동작). switchMap이 정확한 답.

**답**: **Backpressure**: 생산자가 소비자보다 빠를 때의 문제. 처리 안 하면 메모리 폭증, OOM. **해결 전략**: (1) **Buffer** — 메모리에 쌓음 (한계 있음), (2) **Drop** — 초과분 버림, (3) **Latest** — 최신 값만 유지, (4) **Sample** — 주기적 샘플링, (5) **Throttle** — 속도 제한, (6) **Pull-based** — 소비자가 명시적으로 요청 (Reactive Streams 표준). 선택은 데이터 특성에 따라.

**답**: 컴포넌트가 죽어도 subscription이 살아있으면 메모리 누수. **해결**: (1) **takeUntil(destroy$)** — `destroy$` Subject로 unsubscribe 신호, (2) **async pipe (Angular)** — 자동 unsubscribe, (3) **takeWhile** — 조건부 종료, (4) **수동 unsubscribe** — `subscription.unsubscribe()` (덜 권장). **가장 안전한 패턴**: takeUntil + ngOnDestroy 또는 async pipe. React에서는 useEffect cleanup 함수에서 unsubscribe.

**답**: **Async/Await**: 단순 순차 비동기, 1회성 작업, 학습 곡선 낮음, 대부분의 경우. **Reactive**: 스트림 (여러 값), 복잡한 합성 (combineLatest), 시간 기반 (debounce/throttle), backpressure 필요, 취소 가능. **흔한 실수**: 단순 fetch에 RxJS 사용 → 오버엔지니어링. **적절한 사용**: 검색 자동완성, 실시간 데이터, WebSocket, 복잡한 폼 검증. 두 가지를 혼용도 가능 (`firstValueFrom`, `from`).

참고 자료

- [RxJS](https://rxjs.dev/) — 공식 문서

- [Project Reactor](https://projectreactor.io/) — Java

- [Kotlin Flow](https://kotlinlang.org/docs/flow.html)

- [ReactiveX](http://reactivex.io/) — 모든 언어

- [Marble Diagrams](https://rxmarbles.com/) — 시각화

- [Reactive Streams](https://www.reactive-streams.org/) — JVM/JS 표준

- [RxJS in Action](https://www.manning.com/books/rxjs-in-action)

- [Functional Reactive Programming](https://www.manning.com/books/functional-reactive-programming)

- [Spring WebFlux](https://docs.spring.io/spring-framework/reference/web/webflux.html)

- [Akka Streams](https://doc.akka.io/docs/akka/current/stream/index.html)

- [Vert.x](https://vertx.io/)

현재 단락 (1/439)

- **Reactive = 비동기 + 스트림 + 함수형**: 데이터의 흐름을 다루는 패러다임

작성 글자: 0원문 글자: 12,617작성 단락: 0/439