Skip to content
Published on

Reactive Programming 완전 가이드 2025: RxJS, Reactor, Kotlin Flow, Backpressure

Authors

TL;DR

  • Reactive = 비동기 + 스트림 + 함수형: 데이터의 흐름을 다루는 패러다임
  • Observable: 0개~무한 개 값의 스트림
  • 3대 라이브러리: RxJS (JavaScript), Reactor (Java), Kotlin Flow
  • Backpressure: Reactive의 가장 어려운 문제. 소비자가 따라가지 못할 때 대처
  • Hot vs Cold: 데이터 생성 시점 차이. 매우 중요한 구분

1. Reactive Programming이란?

1.1 정의

비동기 데이터 스트림과 변화 전파(propagation of change)를 다루는 프로그래밍 패러다임.

3가지 핵심:

  1. 비동기 — 결과를 기다리지 않음
  2. 데이터 스트림 — 시간에 걸친 값들의 흐름
  3. 변화 전파 — 한 값이 변하면 의존하는 값들도 자동 업데이트

1.2 왜 Reactive인가?

전통적 명령형:

const a = 1
const b = 2
const sum = a + b  // 3
a = 5
console.log(sum)  // 여전히 3

Reactive:

const a$ = new BehaviorSubject(1)
const b$ = new BehaviorSubject(2)
const sum$ = combineLatest([a$, b$]).pipe(map(([a, b]) => a + b))

sum$.subscribe(s => console.log(s))  // 3
a$.next(5)
// 자동으로 8 출력

자동 전파 = 스프레드시트와 비슷.

1.3 적합한 시나리오

UI 이벤트 (클릭, 입력) ✅ WebSocket 메시지비동기 API 호출 합성실시간 데이터 (차트, 알림) ✅ 검색 자동완성 (debounce, throttle)

단순 CRUD (오버엔지니어링) ❌ 순수 동기 작업간단한 상태 관리


2. Observable과 Observer

2.1 Observable

Observable = 데이터 스트림.

import { Observable } from 'rxjs'

const observable$ = new Observable<number>(subscriber => {
  subscriber.next(1)
  subscriber.next(2)
  subscriber.next(3)
  setTimeout(() => {
    subscriber.next(4)
    subscriber.complete()
  }, 1000)
})

관례: 변수명 끝에 $ (예: users$).

2.2 Observer (Subscriber)

Observer = Observable을 구독하는 객체. 3개 콜백:

observable$.subscribe({
  next: (value) => console.log('Got:', value),
  error: (err) => console.error('Error:', err),
  complete: () => console.log('Done')
})

규칙:

  • next: 값마다 호출 (0~N번)
  • error: 에러 시 1번 (그 후 종료)
  • complete: 정상 종료 시 1번

계약: error 또는 complete 후에는 next 호출 X.

2.3 Cold vs Hot

Cold Observable: 구독 시 데이터 생성 시작.

const cold$ = new Observable(subscriber => {
  console.log('Producing...')
  subscriber.next(Math.random())
})

cold$.subscribe(v => console.log('A:', v))  // "Producing..." + 다른 값
cold$.subscribe(v => console.log('B:', v))  // "Producing..." + 다른 값

Hot Observable: 데이터가 이미 흐르고 있음. 구독자는 그 시점부터 받음.

import { Subject } from 'rxjs'

const hot$ = new Subject<number>()

hot$.subscribe(v => console.log('A:', v))
hot$.next(1)  // A: 1

hot$.subscribe(v => console.log('B:', v))
hot$.next(2)  // A: 2, B: 2
// B는 1을 못 받음 (이미 지나감)

예시:

  • Cold: HTTP 요청, 파일 읽기, DB 쿼리
  • Hot: 마우스 이벤트, WebSocket, BehaviorSubject

2.4 Subject 종류

import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs'

// Subject: 일반
const subject = new Subject<number>()

// BehaviorSubject: 마지막 값 기억, 새 구독자에게 즉시 전달
const behavior = new BehaviorSubject<number>(0)

// ReplaySubject: N개 과거 값 기억
const replay = new ReplaySubject<number>(3)  // 마지막 3개

// AsyncSubject: complete 시 마지막 값만 전달
const async$ = new AsyncSubject<number>()

3. RxJS — JavaScript의 표준

3.1 기본 사용

import { fromEvent } from 'rxjs'
import { map, filter, debounceTime } from 'rxjs/operators'

const input = document.querySelector('input')

fromEvent(input, 'input').pipe(
  map((e: any) => e.target.value),
  filter(text => text.length >= 3),
  debounceTime(300)
).subscribe(text => {
  console.log('Search:', text)
})

흐름:

  1. 입력 이벤트 발생
  2. 값 추출
  3. 3글자 이상 필터
  4. 300ms 동안 입력 없으면 통과
  5. 검색 실행

3.2 자주 쓰는 연산자

Transformation:

.pipe(
  map(x => x * 2),                    // 변환
  pluck('user', 'name'),              // 중첩 객체 추출
  scan((acc, v) => acc + v, 0)        // 누적 (reduce)
)

Filtering:

.pipe(
  filter(x => x > 10),                // 조건 필터
  take(5),                            // 처음 5개만
  takeUntil(stop$),                   // stop$ 발행까지
  distinctUntilChanged(),             // 중복 제거 (연속)
  debounceTime(300),                  // 300ms 동안 조용하면
  throttleTime(1000)                  // 1초에 한 번
)

Combination:

import { merge, combineLatest, zip, forkJoin } from 'rxjs'

merge(a$, b$)              // 둘 중 어느 것이든 발행하면
combineLatest([a$, b$])    // 둘 다 최소 1번 발행 후, 둘 중 하나 발행 시마다
zip(a$, b$)                // 같은 인덱스끼리 짝
forkJoin([a$, b$])         // 모두 complete 후 마지막 값

Flattening:

.pipe(
  switchMap(id => fetch(`/api/users/${id}`)),  // 새 값 오면 이전 취소
  mergeMap(id => fetch(`/api/users/${id}`)),   // 모두 병렬 처리
  concatMap(id => fetch(`/api/users/${id}`)),  // 순차 처리
  exhaustMap(id => fetch(`/api/users/${id}`))  // 진행 중이면 새 값 무시
)

3.3 switchMap의 마법

검색 자동완성의 핵심:

fromEvent(input, 'input').pipe(
  map((e: any) => e.target.value),
  debounceTime(300),
  switchMap(query => fetch(`/search?q=${query}`))
).subscribe(results => render(results))

왜 switchMap?:

  • 사용자가 "react"를 입력 중
  • "rea" → 검색 시작
  • "reac" → 이전 검색 자동 취소, 새 검색 시작
  • 마지막 검색 결과만 받음 → race condition 방지

다른 옵션이라면:

  • mergeMap: 모든 검색 결과 (race condition)
  • concatMap: 순차 처리 (느림)
  • exhaustMap: 진행 중이면 새 입력 무시 (잘못된 동작)

3.4 Error Handling

.pipe(
  map(x => {
    if (x < 0) throw new Error('Negative')
    return x
  }),
  catchError(err => {
    console.error(err)
    return of(0)  // 기본값으로 대체
  }),
  retry(3),  // 3번 재시도
  retryWhen(errors => errors.pipe(
    delay(1000),
    take(3)
  ))
)

3.5 마블 다이어그램

input:  --1--2--3--4--5---|
map(x => x * 2):
output: --2--4--6--8--10--|
a$:    --1----2----3---|
b$:    ---a----b---c--|
combineLatest:
output: ---[1,a]-[2,a][2,b]-[3,b][3,c]--|

마블 다이어그램은 RxJS 학습의 핵심. 모든 연산자가 마블로 설명됩니다.


4. Project Reactor — Java의 표준

4.1 Mono와 Flux

Mono: 0개 또는 1개 값. Flux: 0개~N개 값.

// Mono - 단일 값 또는 없음
Mono<User> user = userRepository.findById(123);

// Flux - 여러 값
Flux<User> users = userRepository.findAll();

4.2 Spring WebFlux

전통적 Spring MVC (블로킹):

@GetMapping("/users/{id}")
public User getUser(@PathVariable Long id) {
    return userRepository.findById(id).get();
}

WebFlux (논블로킹):

@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable Long id) {
    return userRepository.findById(id);
}

효과: 적은 스레드로 더 많은 동시 요청 처리. Netty 기반.

4.3 자주 쓰는 연산자

Flux.range(1, 10)
    .map(i -> i * 2)
    .filter(i -> i > 5)
    .take(3)
    .collectList()
    .subscribe(System.out::println);

4.4 비동기 합성

Mono<User> user = userRepository.findById(userId);
Mono<List<Order>> orders = orderRepository.findByUserId(userId);

Mono<UserDto> userDto = Mono.zip(user, orders)
    .map(tuple -> new UserDto(tuple.getT1(), tuple.getT2()));

4.5 R2DBC

전통적 JDBC = 블로킹. R2DBC = Reactive Relational Database Connectivity.

@Repository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
    Flux<User> findByCountry(String country);
}

블로킹 없는 PostgreSQL/MySQL 접근.


5. Kotlin Flow

5.1 Coroutines + Reactive

Kotlin Flow = Coroutines 위에 reactive 스트림.

fun fetchUsers(): Flow<User> = flow {
    val users = userRepository.findAll()
    users.forEach { user ->
        emit(user)
    }
}

// 사용
fetchUsers()
    .filter { it.age > 18 }
    .map { it.name }
    .collect { name -> println(name) }

5.2 Cold vs Hot Flow

Cold Flow (flow { }): 구독 시 시작.

Hot Flow:

  • StateFlow: BehaviorSubject 같음, 마지막 값 기억
  • SharedFlow: Subject 같음
// StateFlow
val count = MutableStateFlow(0)
count.value = 1

// 구독
count.collect { value -> println(value) }

5.3 Android에서

class UserViewModel : ViewModel() {
    val users: StateFlow<List<User>> = userRepository
        .getUsers()
        .stateIn(viewModelScope, SharingStarted.Eagerly, emptyList())
}

// Compose
@Composable
fun UserList(viewModel: UserViewModel) {
    val users by viewModel.users.collectAsState()
    LazyColumn {
        items(users) { user -> Text(user.name) }
    }
}

5.4 RxJava와의 비교

RxJavaKotlin Flow
언어JavaKotlin
기반ThreadsCoroutines
메모리보통더 적음
연산자200+~60 (충분)
학습 곡선가파름보통
Backpressure명시적Suspending

Kotlin 프로젝트는 Flow 권장.


6. Backpressure — 가장 어려운 문제

6.1 문제

시나리오: 서버가 초당 10,000 메시지 발행. 클라이언트는 초당 100 메시지만 처리 가능.

결과:

  • 메모리 폭증 (큐 누적)
  • 스레드 고갈
  • OOM 크래시

6.2 해결 전략

1. Buffer: 메모리에 쌓아둠 (한계 있음)

.pipe(
  bufferTime(1000),  // 1초씩 버퍼
  // 또는
  buffer(stop$),     // 신호로 flush
)

2. Drop: 초과분 버림

Flux.range(1, 1000)
    .onBackpressureDrop(dropped -> log.warn("Dropped: " + dropped))

3. Latest: 최신 값만 유지

Flux.range(1, 1000)
    .onBackpressureLatest()

4. Sample: 주기적 샘플링

.pipe(sampleTime(100))  // 100ms마다 최신 값

5. Throttle: 속도 제한

.pipe(throttleTime(100))  // 100ms에 한 번

6. Pull-based: 컨슈머가 요청한 만큼만

// Reactive Streams의 표준
subscription.request(10)  // 10개만 더 줘

6.3 Reactive Streams 표준

Reactive Streams = JVM/JS의 backpressure 표준.

4개 인터페이스:

  • Publisher: 데이터 발행
  • Subscriber: 데이터 수신
  • Subscription: 구독 관리
  • Processor: 둘 다

핵심: subscription.request(n) — 컨슈머가 명시적으로 n개 요청.

구현: Project Reactor, RxJava 2+, Akka Streams, Vert.x.


7. 실전 패턴

const input = document.querySelector('#search')

fromEvent(input, 'input').pipe(
  map(e => e.target.value),
  debounceTime(300),                // 300ms 대기
  distinctUntilChanged(),           // 같은 값 중복 제거
  filter(text => text.length >= 2), // 2글자 이상
  switchMap(text =>                  // 새 검색 시 이전 취소
    fromFetch(`/api/search?q=${text}`).pipe(
      switchMap(res => res.json()),
      catchError(() => of([]))      // 에러 시 빈 배열
    )
  )
).subscribe(results => render(results))

이 단순한 코드가:

  • 사용자가 빠르게 타이핑할 때 모든 글자에 대해 검색 X
  • race condition 방지 (이전 검색 결과가 새 검색 결과를 덮지 않음)
  • 에러 처리

7.2 Polling

import { interval, switchMap } from 'rxjs'

interval(5000).pipe(
  switchMap(() => fetch('/api/status'))
).subscribe(status => updateUI(status))

5초마다 API 호출. 자동 cleanup.

7.3 WebSocket 합성

import { webSocket } from 'rxjs/webSocket'

const messages$ = webSocket('wss://example.com/chat')

messages$.pipe(
  filter(msg => msg.type === 'message'),
  map(msg => msg.text)
).subscribe(text => addMessage(text))

7.4 Retry with Backoff

import { retry, timer } from 'rxjs'

fetch$.pipe(
  retry({
    count: 3,
    delay: (error, retryCount) => {
      const delay = Math.pow(2, retryCount) * 1000
      return timer(delay)
    }
  })
)

Exponential backoff: 1초 → 2초 → 4초.

7.5 Form Validation

const username$ = fromEvent(usernameInput, 'input').pipe(
  map(e => e.target.value),
  debounceTime(500),
  switchMap(name => 
    fromFetch(`/api/check-username?name=${name}`).pipe(
      switchMap(res => res.json())
    )
  )
)

const password$ = fromEvent(passwordInput, 'input').pipe(
  map(e => e.target.value),
  map(pw => pw.length >= 8)
)

const formValid$ = combineLatest([
  username$.pipe(map(res => res.available)),
  password$
]).pipe(
  map(([usernameOk, passwordOk]) => usernameOk && passwordOk)
)

formValid$.subscribe(valid => submitButton.disabled = !valid)

8. Schedulers와 동시성

8.1 Schedulers

스트림이 어떤 스레드에서 실행되는가?

RxJS Schedulers:

  • asyncScheduler: setTimeout 기반
  • asapScheduler: microtask
  • queueScheduler: 동기 (즉시)
  • animationFrameScheduler: requestAnimationFrame
.pipe(
  observeOn(asyncScheduler),  // 어디서 처리할지
  subscribeOn(asyncScheduler) // 어디서 시작할지
)

8.2 RxJava Schedulers

.subscribeOn(Schedulers.io())          // I/O 작업
.observeOn(Schedulers.computation())   // CPU 작업
.observeOn(AndroidSchedulers.mainThread()) // UI

일반 패턴: I/O는 io 스케줄러, UI 업데이트는 main 스레드.


9. 함정과 베스트 프랙티스

9.1 메모리 누수 — Subscription 관리

잘못:

ngOnInit() {
  this.users$.subscribe(users => this.users = users)
  // 컴포넌트가 죽어도 subscription은 살아있음 → 메모리 누수
}

해결 1: takeUntil

private destroy$ = new Subject<void>()

ngOnInit() {
  this.users$.pipe(
    takeUntil(this.destroy$)
  ).subscribe(users => this.users = users)
}

ngOnDestroy() {
  this.destroy$.next()
  this.destroy$.complete()
}

해결 2: async pipe (Angular)

<div *ngFor="let user of users$ | async">
  {{ user.name }}
</div>

자동 cleanup.

9.2 Cold/Hot 혼란

잘못:

const data$ = fetch$.pipe(map(transformData))

// 두 번 구독 → fetch가 두 번 실행
data$.subscribe(handler1)
data$.subscribe(handler2)

해결: share() 또는 shareReplay()

const data$ = fetch$.pipe(
  map(transformData),
  shareReplay(1)  // 한 번 fetch, 결과 공유
)

9.3 너무 많은 nested

// Pyramid of doom
.pipe(
  switchMap(a => 
    fetch1(a).pipe(
      switchMap(b => 
        fetch2(b).pipe(
          map(c => transform(c))
        )
      )
    )
  )
)

해결: 작은 함수로 분리, async/await 고려.

9.4 RxJS가 항상 답은 아니다

오버엔지니어링 신호:

  • 단순 fetch에 RxJS
  • 단일 이벤트에 Observable
  • 학습 곡선이 비즈니스 가치 초과

적합한 경우:

  • 복잡한 비동기 흐름
  • 여러 이벤트 합성
  • 실시간 데이터
  • backpressure 필요

10. Reactive vs Async/Await

10.1 비교

// Async/Await
async function getUserOrders(userId) {
  const user = await fetchUser(userId)
  const orders = await fetchOrders(user.id)
  return { user, orders }
}

// Reactive (RxJS)
function getUserOrders(userId) {
  return fetchUser$(userId).pipe(
    switchMap(user => 
      fetchOrders$(user.id).pipe(
        map(orders => ({ user, orders }))
      )
    )
  )
}

10.2 언제 무엇?

Async/Await:

  • 단순 순차 비동기
  • 1회성 작업
  • 학습 곡선 낮음
  • 대부분의 경우

Reactive:

  • 스트림 (여러 값)
  • 복잡한 합성 (combineLatest, merge)
  • 시간 기반 연산 (debounce, throttle)
  • backpressure
  • 취소 가능 (cancellation)

10.3 혼용

async function loadData() {
  const cached$ = fromPromise(getCached())
  const fresh$ = fromPromise(fetchFresh())
  
  return await firstValueFrom(
    merge(cached$, fresh$).pipe(
      first(data => data !== null)
    )
  )
}

퀴즈

1. Cold vs Hot Observable의 차이는?

: Cold: 구독 시 데이터 생성 시작. 각 구독자가 독립적인 데이터 받음. 예: HTTP 요청, 파일 읽기. Hot: 데이터가 이미 흐르고 있음. 새 구독자는 그 시점부터 받음 (이전 데이터 못 받음). 예: 마우스 이벤트, WebSocket, BehaviorSubject. 흔한 함정: HTTP를 두 번 구독하면 두 번 호출됨 (Cold). shareReplay()로 hot으로 변환 가능. Cold/Hot 혼란이 가장 흔한 RxJS 버그의 원인.

2. switchMap이 자동완성에 적합한 이유는?

: 사용자가 "react"를 입력 중일 때 매 글자마다 API 호출이 시작됩니다. switchMap: 새 입력이 오면 이전 검색을 자동 취소하고 새 검색 시작. 결과: 마지막 검색 결과만 받음 → race condition 방지. 다른 옵션: mergeMap은 모든 결과 (race condition), concatMap은 순차 (느림), exhaustMap은 진행 중이면 무시 (잘못된 동작). switchMap이 정확한 답.

3. Backpressure가 무엇이고 어떻게 처리하나요?

: Backpressure: 생산자가 소비자보다 빠를 때의 문제. 처리 안 하면 메모리 폭증, OOM. 해결 전략: (1) Buffer — 메모리에 쌓음 (한계 있음), (2) Drop — 초과분 버림, (3) Latest — 최신 값만 유지, (4) Sample — 주기적 샘플링, (5) Throttle — 속도 제한, (6) Pull-based — 소비자가 명시적으로 요청 (Reactive Streams 표준). 선택은 데이터 특성에 따라.

4. RxJS Subscription 메모리 누수를 어떻게 방지하나요?

: 컴포넌트가 죽어도 subscription이 살아있으면 메모리 누수. 해결: (1) **takeUntil(destroy)destroy)** — `destroySubject로 unsubscribe 신호, (2) **async pipe (Angular)** — 자동 unsubscribe, (3) **takeWhile** — 조건부 종료, (4) **수동 unsubscribe** —subscription.unsubscribe()` (덜 권장). 가장 안전한 패턴: takeUntil + ngOnDestroy 또는 async pipe. React에서는 useEffect cleanup 함수에서 unsubscribe.

5. Reactive와 Async/Await를 언제 사용하나요?

: Async/Await: 단순 순차 비동기, 1회성 작업, 학습 곡선 낮음, 대부분의 경우. Reactive: 스트림 (여러 값), 복잡한 합성 (combineLatest), 시간 기반 (debounce/throttle), backpressure 필요, 취소 가능. 흔한 실수: 단순 fetch에 RxJS 사용 → 오버엔지니어링. 적절한 사용: 검색 자동완성, 실시간 데이터, WebSocket, 복잡한 폼 검증. 두 가지를 혼용도 가능 (firstValueFrom, from).


참고 자료