TL;DR
- Reactive = 비동기 + 스트림 + 함수형: 데이터의 흐름을 다루는 패러다임
- Observable: 0개~무한 개 값의 스트림
- 3대 라이브러리: RxJS (JavaScript), Reactor (Java), Kotlin Flow
- Backpressure: Reactive의 가장 어려운 문제. 소비자가 따라가지 못할 때 대처
- Hot vs Cold: 데이터 생성 시점 차이. 매우 중요한 구분
1. Reactive Programming이란?
1.1 정의
비동기 데이터 스트림과 변화 전파(propagation of change)를 다루는 프로그래밍 패러다임.
3가지 핵심:
- 비동기 — 결과를 기다리지 않음
- 데이터 스트림 — 시간에 걸친 값들의 흐름
- 변화 전파 — 한 값이 변하면 의존하는 값들도 자동 업데이트
1.2 왜 Reactive인가?
전통적 명령형:
const a = 1
const b = 2
const sum = a + b // 3
a = 5
console.log(sum) // 여전히 3
Reactive:
const a$ = new BehaviorSubject(1)
const b$ = new BehaviorSubject(2)
const sum$ = combineLatest([a$, b$]).pipe(map(([a, b]) => a + b))
sum$.subscribe(s => console.log(s)) // 3
a$.next(5)
// 자동으로 8 출력
→ 자동 전파 = 스프레드시트와 비슷.
1.3 적합한 시나리오
✅ UI 이벤트 (클릭, 입력) ✅ WebSocket 메시지 ✅ 비동기 API 호출 합성 ✅ 실시간 데이터 (차트, 알림) ✅ 검색 자동완성 (debounce, throttle)
❌ 단순 CRUD (오버엔지니어링) ❌ 순수 동기 작업 ❌ 간단한 상태 관리
2. Observable과 Observer
2.1 Observable
Observable = 데이터 스트림.
import { Observable } from 'rxjs'
const observable$ = new Observable<number>(subscriber => {
subscriber.next(1)
subscriber.next(2)
subscriber.next(3)
setTimeout(() => {
subscriber.next(4)
subscriber.complete()
}, 1000)
})
관례: 변수명 끝에 $ (예: users$).
2.2 Observer (Subscriber)
Observer = Observable을 구독하는 객체. 3개 콜백:
observable$.subscribe({
next: (value) => console.log('Got:', value),
error: (err) => console.error('Error:', err),
complete: () => console.log('Done')
})
규칙:
next: 값마다 호출 (0~N번)error: 에러 시 1번 (그 후 종료)complete: 정상 종료 시 1번
계약: error 또는 complete 후에는 next 호출 X.
2.3 Cold vs Hot
Cold Observable: 구독 시 데이터 생성 시작.
const cold$ = new Observable(subscriber => {
console.log('Producing...')
subscriber.next(Math.random())
})
cold$.subscribe(v => console.log('A:', v)) // "Producing..." + 다른 값
cold$.subscribe(v => console.log('B:', v)) // "Producing..." + 다른 값
Hot Observable: 데이터가 이미 흐르고 있음. 구독자는 그 시점부터 받음.
import { Subject } from 'rxjs'
const hot$ = new Subject<number>()
hot$.subscribe(v => console.log('A:', v))
hot$.next(1) // A: 1
hot$.subscribe(v => console.log('B:', v))
hot$.next(2) // A: 2, B: 2
// B는 1을 못 받음 (이미 지나감)
예시:
- Cold: HTTP 요청, 파일 읽기, DB 쿼리
- Hot: 마우스 이벤트, WebSocket, BehaviorSubject
2.4 Subject 종류
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs'
// Subject: 일반
const subject = new Subject<number>()
// BehaviorSubject: 마지막 값 기억, 새 구독자에게 즉시 전달
const behavior = new BehaviorSubject<number>(0)
// ReplaySubject: N개 과거 값 기억
const replay = new ReplaySubject<number>(3) // 마지막 3개
// AsyncSubject: complete 시 마지막 값만 전달
const async$ = new AsyncSubject<number>()
3. RxJS — JavaScript의 표준
3.1 기본 사용
import { fromEvent } from 'rxjs'
import { map, filter, debounceTime } from 'rxjs/operators'
const input = document.querySelector('input')
fromEvent(input, 'input').pipe(
map((e: any) => e.target.value),
filter(text => text.length >= 3),
debounceTime(300)
).subscribe(text => {
console.log('Search:', text)
})
흐름:
- 입력 이벤트 발생
- 값 추출
- 3글자 이상 필터
- 300ms 동안 입력 없으면 통과
- 검색 실행
3.2 자주 쓰는 연산자
Transformation:
.pipe(
map(x => x * 2), // 변환
pluck('user', 'name'), // 중첩 객체 추출
scan((acc, v) => acc + v, 0) // 누적 (reduce)
)
Filtering:
.pipe(
filter(x => x > 10), // 조건 필터
take(5), // 처음 5개만
takeUntil(stop$), // stop$ 발행까지
distinctUntilChanged(), // 중복 제거 (연속)
debounceTime(300), // 300ms 동안 조용하면
throttleTime(1000) // 1초에 한 번
)
Combination:
import { merge, combineLatest, zip, forkJoin } from 'rxjs'
merge(a$, b$) // 둘 중 어느 것이든 발행하면
combineLatest([a$, b$]) // 둘 다 최소 1번 발행 후, 둘 중 하나 발행 시마다
zip(a$, b$) // 같은 인덱스끼리 짝
forkJoin([a$, b$]) // 모두 complete 후 마지막 값
Flattening:
.pipe(
switchMap(id => fetch(`/api/users/${id}`)), // 새 값 오면 이전 취소
mergeMap(id => fetch(`/api/users/${id}`)), // 모두 병렬 처리
concatMap(id => fetch(`/api/users/${id}`)), // 순차 처리
exhaustMap(id => fetch(`/api/users/${id}`)) // 진행 중이면 새 값 무시
)
3.3 switchMap의 마법
검색 자동완성의 핵심:
fromEvent(input, 'input').pipe(
map((e: any) => e.target.value),
debounceTime(300),
switchMap(query => fetch(`/search?q=${query}`))
).subscribe(results => render(results))
왜 switchMap?:
- 사용자가 "react"를 입력 중
- "rea" → 검색 시작
- "reac" → 이전 검색 자동 취소, 새 검색 시작
- 마지막 검색 결과만 받음 → race condition 방지
다른 옵션이라면:
mergeMap: 모든 검색 결과 (race condition)concatMap: 순차 처리 (느림)exhaustMap: 진행 중이면 새 입력 무시 (잘못된 동작)
3.4 Error Handling
.pipe(
map(x => {
if (x < 0) throw new Error('Negative')
return x
}),
catchError(err => {
console.error(err)
return of(0) // 기본값으로 대체
}),
retry(3), // 3번 재시도
retryWhen(errors => errors.pipe(
delay(1000),
take(3)
))
)
3.5 마블 다이어그램
input: --1--2--3--4--5---|
map(x => x * 2):
output: --2--4--6--8--10--|
a$: --1----2----3---|
b$: ---a----b---c--|
combineLatest:
output: ---[1,a]-[2,a][2,b]-[3,b][3,c]--|
마블 다이어그램은 RxJS 학습의 핵심. 모든 연산자가 마블로 설명됩니다.
4. Project Reactor — Java의 표준
4.1 Mono와 Flux
Mono: 0개 또는 1개 값. Flux: 0개~N개 값.
// Mono - 단일 값 또는 없음
Mono<User> user = userRepository.findById(123);
// Flux - 여러 값
Flux<User> users = userRepository.findAll();
4.2 Spring WebFlux
전통적 Spring MVC (블로킹):
@GetMapping("/users/{id}")
public User getUser(@PathVariable Long id) {
return userRepository.findById(id).get();
}
WebFlux (논블로킹):
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return userRepository.findById(id);
}
효과: 적은 스레드로 더 많은 동시 요청 처리. Netty 기반.
4.3 자주 쓰는 연산자
Flux.range(1, 10)
.map(i -> i * 2)
.filter(i -> i > 5)
.take(3)
.collectList()
.subscribe(System.out::println);
4.4 비동기 합성
Mono<User> user = userRepository.findById(userId);
Mono<List<Order>> orders = orderRepository.findByUserId(userId);
Mono<UserDto> userDto = Mono.zip(user, orders)
.map(tuple -> new UserDto(tuple.getT1(), tuple.getT2()));
4.5 R2DBC
전통적 JDBC = 블로킹. R2DBC = Reactive Relational Database Connectivity.
@Repository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
Flux<User> findByCountry(String country);
}
블로킹 없는 PostgreSQL/MySQL 접근.
5. Kotlin Flow
5.1 Coroutines + Reactive
Kotlin Flow = Coroutines 위에 reactive 스트림.
fun fetchUsers(): Flow<User> = flow {
val users = userRepository.findAll()
users.forEach { user ->
emit(user)
}
}
// 사용
fetchUsers()
.filter { it.age > 18 }
.map { it.name }
.collect { name -> println(name) }
5.2 Cold vs Hot Flow
Cold Flow (flow { }): 구독 시 시작.
Hot Flow:
StateFlow: BehaviorSubject 같음, 마지막 값 기억SharedFlow: Subject 같음
// StateFlow
val count = MutableStateFlow(0)
count.value = 1
// 구독
count.collect { value -> println(value) }
5.3 Android에서
class UserViewModel : ViewModel() {
val users: StateFlow<List<User>> = userRepository
.getUsers()
.stateIn(viewModelScope, SharingStarted.Eagerly, emptyList())
}
// Compose
@Composable
fun UserList(viewModel: UserViewModel) {
val users by viewModel.users.collectAsState()
LazyColumn {
items(users) { user -> Text(user.name) }
}
}
5.4 RxJava와의 비교
| RxJava | Kotlin Flow | |
|---|---|---|
| 언어 | Java | Kotlin |
| 기반 | Threads | Coroutines |
| 메모리 | 보통 | 더 적음 |
| 연산자 | 200+ | ~60 (충분) |
| 학습 곡선 | 가파름 | 보통 |
| Backpressure | 명시적 | Suspending |
Kotlin 프로젝트는 Flow 권장.
6. Backpressure — 가장 어려운 문제
6.1 문제
시나리오: 서버가 초당 10,000 메시지 발행. 클라이언트는 초당 100 메시지만 처리 가능.
결과:
- 메모리 폭증 (큐 누적)
- 스레드 고갈
- OOM 크래시
6.2 해결 전략
1. Buffer: 메모리에 쌓아둠 (한계 있음)
.pipe(
bufferTime(1000), // 1초씩 버퍼
// 또는
buffer(stop$), // 신호로 flush
)
2. Drop: 초과분 버림
Flux.range(1, 1000)
.onBackpressureDrop(dropped -> log.warn("Dropped: " + dropped))
3. Latest: 최신 값만 유지
Flux.range(1, 1000)
.onBackpressureLatest()
4. Sample: 주기적 샘플링
.pipe(sampleTime(100)) // 100ms마다 최신 값
5. Throttle: 속도 제한
.pipe(throttleTime(100)) // 100ms에 한 번
6. Pull-based: 컨슈머가 요청한 만큼만
// Reactive Streams의 표준
subscription.request(10) // 10개만 더 줘
6.3 Reactive Streams 표준
Reactive Streams = JVM/JS의 backpressure 표준.
4개 인터페이스:
Publisher: 데이터 발행Subscriber: 데이터 수신Subscription: 구독 관리Processor: 둘 다
핵심: subscription.request(n) — 컨슈머가 명시적으로 n개 요청.
구현: Project Reactor, RxJava 2+, Akka Streams, Vert.x.
7. 실전 패턴
7.1 자동완성 (Type-Ahead Search)
const input = document.querySelector('#search')
fromEvent(input, 'input').pipe(
map(e => e.target.value),
debounceTime(300), // 300ms 대기
distinctUntilChanged(), // 같은 값 중복 제거
filter(text => text.length >= 2), // 2글자 이상
switchMap(text => // 새 검색 시 이전 취소
fromFetch(`/api/search?q=${text}`).pipe(
switchMap(res => res.json()),
catchError(() => of([])) // 에러 시 빈 배열
)
)
).subscribe(results => render(results))
이 단순한 코드가:
- 사용자가 빠르게 타이핑할 때 모든 글자에 대해 검색 X
- race condition 방지 (이전 검색 결과가 새 검색 결과를 덮지 않음)
- 에러 처리
7.2 Polling
import { interval, switchMap } from 'rxjs'
interval(5000).pipe(
switchMap(() => fetch('/api/status'))
).subscribe(status => updateUI(status))
5초마다 API 호출. 자동 cleanup.
7.3 WebSocket 합성
import { webSocket } from 'rxjs/webSocket'
const messages$ = webSocket('wss://example.com/chat')
messages$.pipe(
filter(msg => msg.type === 'message'),
map(msg => msg.text)
).subscribe(text => addMessage(text))
7.4 Retry with Backoff
import { retry, timer } from 'rxjs'
fetch$.pipe(
retry({
count: 3,
delay: (error, retryCount) => {
const delay = Math.pow(2, retryCount) * 1000
return timer(delay)
}
})
)
Exponential backoff: 1초 → 2초 → 4초.
7.5 Form Validation
const username$ = fromEvent(usernameInput, 'input').pipe(
map(e => e.target.value),
debounceTime(500),
switchMap(name =>
fromFetch(`/api/check-username?name=${name}`).pipe(
switchMap(res => res.json())
)
)
)
const password$ = fromEvent(passwordInput, 'input').pipe(
map(e => e.target.value),
map(pw => pw.length >= 8)
)
const formValid$ = combineLatest([
username$.pipe(map(res => res.available)),
password$
]).pipe(
map(([usernameOk, passwordOk]) => usernameOk && passwordOk)
)
formValid$.subscribe(valid => submitButton.disabled = !valid)
8. Schedulers와 동시성
8.1 Schedulers
스트림이 어떤 스레드에서 실행되는가?
RxJS Schedulers:
asyncScheduler: setTimeout 기반asapScheduler: microtaskqueueScheduler: 동기 (즉시)animationFrameScheduler: requestAnimationFrame
.pipe(
observeOn(asyncScheduler), // 어디서 처리할지
subscribeOn(asyncScheduler) // 어디서 시작할지
)
8.2 RxJava Schedulers
.subscribeOn(Schedulers.io()) // I/O 작업
.observeOn(Schedulers.computation()) // CPU 작업
.observeOn(AndroidSchedulers.mainThread()) // UI
일반 패턴: I/O는 io 스케줄러, UI 업데이트는 main 스레드.
9. 함정과 베스트 프랙티스
9.1 메모리 누수 — Subscription 관리
잘못:
ngOnInit() {
this.users$.subscribe(users => this.users = users)
// 컴포넌트가 죽어도 subscription은 살아있음 → 메모리 누수
}
해결 1: takeUntil
private destroy$ = new Subject<void>()
ngOnInit() {
this.users$.pipe(
takeUntil(this.destroy$)
).subscribe(users => this.users = users)
}
ngOnDestroy() {
this.destroy$.next()
this.destroy$.complete()
}
해결 2: async pipe (Angular)
<div *ngFor="let user of users$ | async">
{{ user.name }}
</div>
자동 cleanup.
9.2 Cold/Hot 혼란
잘못:
const data$ = fetch$.pipe(map(transformData))
// 두 번 구독 → fetch가 두 번 실행
data$.subscribe(handler1)
data$.subscribe(handler2)
해결: share() 또는 shareReplay()
const data$ = fetch$.pipe(
map(transformData),
shareReplay(1) // 한 번 fetch, 결과 공유
)
9.3 너무 많은 nested
// Pyramid of doom
.pipe(
switchMap(a =>
fetch1(a).pipe(
switchMap(b =>
fetch2(b).pipe(
map(c => transform(c))
)
)
)
)
)
해결: 작은 함수로 분리, async/await 고려.
9.4 RxJS가 항상 답은 아니다
오버엔지니어링 신호:
- 단순 fetch에 RxJS
- 단일 이벤트에 Observable
- 학습 곡선이 비즈니스 가치 초과
적합한 경우:
- 복잡한 비동기 흐름
- 여러 이벤트 합성
- 실시간 데이터
- backpressure 필요
10. Reactive vs Async/Await
10.1 비교
// Async/Await
async function getUserOrders(userId) {
const user = await fetchUser(userId)
const orders = await fetchOrders(user.id)
return { user, orders }
}
// Reactive (RxJS)
function getUserOrders(userId) {
return fetchUser$(userId).pipe(
switchMap(user =>
fetchOrders$(user.id).pipe(
map(orders => ({ user, orders }))
)
)
)
}
10.2 언제 무엇?
Async/Await:
- 단순 순차 비동기
- 1회성 작업
- 학습 곡선 낮음
- 대부분의 경우
Reactive:
- 스트림 (여러 값)
- 복잡한 합성 (combineLatest, merge)
- 시간 기반 연산 (debounce, throttle)
- backpressure
- 취소 가능 (cancellation)
10.3 혼용
async function loadData() {
const cached$ = fromPromise(getCached())
const fresh$ = fromPromise(fetchFresh())
return await firstValueFrom(
merge(cached$, fresh$).pipe(
first(data => data !== null)
)
)
}
퀴즈
1. Cold vs Hot Observable의 차이는?
답: Cold: 구독 시 데이터 생성 시작. 각 구독자가 독립적인 데이터 받음. 예: HTTP 요청, 파일 읽기. Hot: 데이터가 이미 흐르고 있음. 새 구독자는 그 시점부터 받음 (이전 데이터 못 받음). 예: 마우스 이벤트, WebSocket, BehaviorSubject. 흔한 함정: HTTP를 두 번 구독하면 두 번 호출됨 (Cold). shareReplay()로 hot으로 변환 가능. Cold/Hot 혼란이 가장 흔한 RxJS 버그의 원인.
2. switchMap이 자동완성에 적합한 이유는?
답: 사용자가 "react"를 입력 중일 때 매 글자마다 API 호출이 시작됩니다. switchMap: 새 입력이 오면 이전 검색을 자동 취소하고 새 검색 시작. 결과: 마지막 검색 결과만 받음 → race condition 방지. 다른 옵션: mergeMap은 모든 결과 (race condition), concatMap은 순차 (느림), exhaustMap은 진행 중이면 무시 (잘못된 동작). switchMap이 정확한 답.
3. Backpressure가 무엇이고 어떻게 처리하나요?
답: Backpressure: 생산자가 소비자보다 빠를 때의 문제. 처리 안 하면 메모리 폭증, OOM. 해결 전략: (1) Buffer — 메모리에 쌓음 (한계 있음), (2) Drop — 초과분 버림, (3) Latest — 최신 값만 유지, (4) Sample — 주기적 샘플링, (5) Throttle — 속도 제한, (6) Pull-based — 소비자가 명시적으로 요청 (Reactive Streams 표준). 선택은 데이터 특성에 따라.
4. RxJS Subscription 메모리 누수를 어떻게 방지하나요?
답: 컴포넌트가 죽어도 subscription이 살아있으면 메모리 누수. 해결: (1) **takeUntil(destroySubject로 unsubscribe 신호, (2) **async pipe (Angular)** — 자동 unsubscribe, (3) **takeWhile** — 조건부 종료, (4) **수동 unsubscribe** —subscription.unsubscribe()` (덜 권장). 가장 안전한 패턴: takeUntil + ngOnDestroy 또는 async pipe. React에서는 useEffect cleanup 함수에서 unsubscribe.
5. Reactive와 Async/Await를 언제 사용하나요?
답: Async/Await: 단순 순차 비동기, 1회성 작업, 학습 곡선 낮음, 대부분의 경우. Reactive: 스트림 (여러 값), 복잡한 합성 (combineLatest), 시간 기반 (debounce/throttle), backpressure 필요, 취소 가능. 흔한 실수: 단순 fetch에 RxJS 사용 → 오버엔지니어링. 적절한 사용: 검색 자동완성, 실시간 데이터, WebSocket, 복잡한 폼 검증. 두 가지를 혼용도 가능 (firstValueFrom, from).
참고 자료
- RxJS — 공식 문서
- Project Reactor — Java
- Kotlin Flow
- ReactiveX — 모든 언어
- Marble Diagrams — 시각화
- Reactive Streams — JVM/JS 표준
- RxJS in Action
- Functional Reactive Programming
- Spring WebFlux
- Akka Streams
- Vert.x
현재 단락 (1/456)
- **Reactive = 비동기 + 스트림 + 함수형**: 데이터의 흐름을 다루는 패러다임