TL;DR
- **Reactive = 비동기 + 스트림 + 함수형**: 데이터의 흐름을 다루는 패러다임
- **Observable**: 0개~무한 개 값의 스트림
- **3대 라이브러리**: RxJS (JavaScript), Reactor (Java), Kotlin Flow
- **Backpressure**: Reactive의 가장 어려운 문제. 소비자가 따라가지 못할 때 대처
- **Hot vs Cold**: 데이터 생성 시점 차이. 매우 중요한 구분
1. Reactive Programming이란?
1.1 정의
> 비동기 데이터 스트림과 변화 전파(propagation of change)를 다루는 프로그래밍 패러다임.
**3가지 핵심**:
1. **비동기** — 결과를 기다리지 않음
2. **데이터 스트림** — 시간에 걸친 값들의 흐름
3. **변화 전파** — 한 값이 변하면 의존하는 값들도 자동 업데이트
1.2 왜 Reactive인가?
**전통적 명령형**:
const a = 1
const b = 2
const sum = a + b // 3
a = 5
console.log(sum) // 여전히 3
**Reactive**:
const a$ = new BehaviorSubject(1)
const b$ = new BehaviorSubject(2)
const sum$ = combineLatest([a$, b$]).pipe(map(([a, b]) => a + b))
sum$.subscribe(s => console.log(s)) // 3
a$.next(5)
// 자동으로 8 출력
→ **자동 전파** = 스프레드시트와 비슷.
1.3 적합한 시나리오
✅ **UI 이벤트** (클릭, 입력)
✅ **WebSocket 메시지**
✅ **비동기 API 호출 합성**
✅ **실시간 데이터** (차트, 알림)
✅ **검색 자동완성** (debounce, throttle)
❌ **단순 CRUD** (오버엔지니어링)
❌ **순수 동기 작업**
❌ **간단한 상태 관리**
2. Observable과 Observer
2.1 Observable
**Observable** = 데이터 스트림.
const observable$ = new Observable<number>(subscriber => {
subscriber.next(1)
subscriber.next(2)
subscriber.next(3)
setTimeout(() => {
subscriber.next(4)
subscriber.complete()
}, 1000)
})
**관례**: 변수명 끝에 `$` (예: `users$`).
2.2 Observer (Subscriber)
**Observer** = Observable을 구독하는 객체. 3개 콜백:
observable$.subscribe({
next: (value) => console.log('Got:', value),
error: (err) => console.error('Error:', err),
complete: () => console.log('Done')
})
**규칙**:
- `next`: 값마다 호출 (0~N번)
- `error`: 에러 시 1번 (그 후 종료)
- `complete`: 정상 종료 시 1번
**계약**: error 또는 complete 후에는 next 호출 X.
2.3 Cold vs Hot
**Cold Observable**: 구독 시 데이터 생성 시작.
const cold$ = new Observable(subscriber => {
console.log('Producing...')
subscriber.next(Math.random())
})
cold$.subscribe(v => console.log('A:', v)) // "Producing..." + 다른 값
cold$.subscribe(v => console.log('B:', v)) // "Producing..." + 다른 값
**Hot Observable**: 데이터가 이미 흐르고 있음. 구독자는 그 시점부터 받음.
const hot$ = new Subject<number>()
hot$.subscribe(v => console.log('A:', v))
hot$.next(1) // A: 1
hot$.subscribe(v => console.log('B:', v))
hot$.next(2) // A: 2, B: 2
// B는 1을 못 받음 (이미 지나감)
**예시**:
- **Cold**: HTTP 요청, 파일 읽기, DB 쿼리
- **Hot**: 마우스 이벤트, WebSocket, BehaviorSubject
2.4 Subject 종류
// Subject: 일반
const subject = new Subject<number>()
// BehaviorSubject: 마지막 값 기억, 새 구독자에게 즉시 전달
const behavior = new BehaviorSubject<number>(0)
// ReplaySubject: N개 과거 값 기억
const replay = new ReplaySubject<number>(3) // 마지막 3개
// AsyncSubject: complete 시 마지막 값만 전달
const async$ = new AsyncSubject<number>()
3. RxJS — JavaScript의 표준
3.1 기본 사용
const input = document.querySelector('input')
fromEvent(input, 'input').pipe(
map((e: any) => e.target.value),
filter(text => text.length >= 3),
debounceTime(300)
).subscribe(text => {
console.log('Search:', text)
})
**흐름**:
1. 입력 이벤트 발생
2. 값 추출
3. 3글자 이상 필터
4. 300ms 동안 입력 없으면 통과
5. 검색 실행
3.2 자주 쓰는 연산자
**Transformation**:
.pipe(
map(x => x * 2), // 변환
pluck('user', 'name'), // 중첩 객체 추출
scan((acc, v) => acc + v, 0) // 누적 (reduce)
)
**Filtering**:
.pipe(
filter(x => x > 10), // 조건 필터
take(5), // 처음 5개만
takeUntil(stop$), // stop$ 발행까지
distinctUntilChanged(), // 중복 제거 (연속)
debounceTime(300), // 300ms 동안 조용하면
throttleTime(1000) // 1초에 한 번
)
**Combination**:
merge(a$, b$) // 둘 중 어느 것이든 발행하면
combineLatest([a$, b$]) // 둘 다 최소 1번 발행 후, 둘 중 하나 발행 시마다
zip(a$, b$) // 같은 인덱스끼리 짝
forkJoin([a$, b$]) // 모두 complete 후 마지막 값
**Flattening**:
.pipe(
switchMap(id => fetch(`/api/users/${id}`)), // 새 값 오면 이전 취소
mergeMap(id => fetch(`/api/users/${id}`)), // 모두 병렬 처리
concatMap(id => fetch(`/api/users/${id}`)), // 순차 처리
exhaustMap(id => fetch(`/api/users/${id}`)) // 진행 중이면 새 값 무시
)
3.3 switchMap의 마법
**검색 자동완성**의 핵심:
fromEvent(input, 'input').pipe(
map((e: any) => e.target.value),
debounceTime(300),
switchMap(query => fetch(`/search?q=${query}`))
).subscribe(results => render(results))
**왜 switchMap?**:
- 사용자가 "react"를 입력 중
- "rea" → 검색 시작
- "reac" → 이전 검색 **자동 취소**, 새 검색 시작
- 마지막 검색 결과만 받음 → race condition 방지
**다른 옵션이라면**:
- `mergeMap`: 모든 검색 결과 (race condition)
- `concatMap`: 순차 처리 (느림)
- `exhaustMap`: 진행 중이면 새 입력 무시 (잘못된 동작)
3.4 Error Handling
.pipe(
map(x => {
if (x < 0) throw new Error('Negative')
return x
}),
catchError(err => {
console.error(err)
return of(0) // 기본값으로 대체
}),
retry(3), // 3번 재시도
retryWhen(errors => errors.pipe(
delay(1000),
take(3)
))
)
3.5 마블 다이어그램
input: --1--2--3--4--5---|
map(x => x * 2):
output: --2--4--6--8--10--|
a$: --1----2----3---|
b$: ---a----b---c--|
combineLatest:
output: ---[1,a]-[2,a][2,b]-[3,b][3,c]--|
마블 다이어그램은 RxJS 학습의 핵심. 모든 연산자가 마블로 설명됩니다.
4. Project Reactor — Java의 표준
4.1 Mono와 Flux
**Mono**: 0개 또는 1개 값.
**Flux**: 0개~N개 값.
// Mono - 단일 값 또는 없음
Mono<User> user = userRepository.findById(123);
// Flux - 여러 값
Flux<User> users = userRepository.findAll();
4.2 Spring WebFlux
전통적 Spring MVC (블로킹):
@GetMapping("/users/{id}")
public User getUser(@PathVariable Long id) {
return userRepository.findById(id).get();
}
WebFlux (논블로킹):
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return userRepository.findById(id);
}
**효과**: 적은 스레드로 더 많은 동시 요청 처리. Netty 기반.
4.3 자주 쓰는 연산자
Flux.range(1, 10)
.map(i -> i * 2)
.filter(i -> i > 5)
.take(3)
.collectList()
.subscribe(System.out::println);
4.4 비동기 합성
Mono<User> user = userRepository.findById(userId);
Mono<List<Order>> orders = orderRepository.findByUserId(userId);
Mono<UserDto> userDto = Mono.zip(user, orders)
.map(tuple -> new UserDto(tuple.getT1(), tuple.getT2()));
4.5 R2DBC
전통적 JDBC = 블로킹.
**R2DBC** = Reactive Relational Database Connectivity.
@Repository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
Flux<User> findByCountry(String country);
}
블로킹 없는 PostgreSQL/MySQL 접근.
5. Kotlin Flow
5.1 Coroutines + Reactive
Kotlin Flow = Coroutines 위에 reactive 스트림.
fun fetchUsers(): Flow<User> = flow {
val users = userRepository.findAll()
users.forEach { user ->
emit(user)
}
}
// 사용
fetchUsers()
.filter { it.age > 18 }
.map { it.name }
.collect { name -> println(name) }
5.2 Cold vs Hot Flow
**Cold Flow** (`flow { }`): 구독 시 시작.
**Hot Flow**:
- `StateFlow`: BehaviorSubject 같음, 마지막 값 기억
- `SharedFlow`: Subject 같음
// StateFlow
val count = MutableStateFlow(0)
count.value = 1
// 구독
count.collect { value -> println(value) }
5.3 Android에서
class UserViewModel : ViewModel() {
val users: StateFlow<List<User>> = userRepository
.getUsers()
.stateIn(viewModelScope, SharingStarted.Eagerly, emptyList())
}
// Compose
@Composable
fun UserList(viewModel: UserViewModel) {
val users by viewModel.users.collectAsState()
LazyColumn {
items(users) { user -> Text(user.name) }
}
}
5.4 RxJava와의 비교
| | RxJava | Kotlin Flow |
|---|---|---|
| **언어** | Java | Kotlin |
| **기반** | Threads | Coroutines |
| **메모리** | 보통 | 더 적음 |
| **연산자** | 200+ | ~60 (충분) |
| **학습 곡선** | 가파름 | 보통 |
| **Backpressure** | 명시적 | Suspending |
**Kotlin 프로젝트는 Flow 권장**.
6. Backpressure — 가장 어려운 문제
6.1 문제
**시나리오**: 서버가 초당 10,000 메시지 발행. 클라이언트는 초당 100 메시지만 처리 가능.
**결과**:
- 메모리 폭증 (큐 누적)
- 스레드 고갈
- OOM 크래시
6.2 해결 전략
**1. Buffer**: 메모리에 쌓아둠 (한계 있음)
.pipe(
bufferTime(1000), // 1초씩 버퍼
// 또는
buffer(stop$), // 신호로 flush
)
**2. Drop**: 초과분 버림
Flux.range(1, 1000)
.onBackpressureDrop(dropped -> log.warn("Dropped: " + dropped))
**3. Latest**: 최신 값만 유지
Flux.range(1, 1000)
.onBackpressureLatest()
**4. Sample**: 주기적 샘플링
.pipe(sampleTime(100)) // 100ms마다 최신 값
**5. Throttle**: 속도 제한
.pipe(throttleTime(100)) // 100ms에 한 번
**6. Pull-based**: 컨슈머가 요청한 만큼만
// Reactive Streams의 표준
subscription.request(10) // 10개만 더 줘
6.3 Reactive Streams 표준
**Reactive Streams** = JVM/JS의 backpressure 표준.
**4개 인터페이스**:
- `Publisher`: 데이터 발행
- `Subscriber`: 데이터 수신
- `Subscription`: 구독 관리
- `Processor`: 둘 다
**핵심**: `subscription.request(n)` — 컨슈머가 명시적으로 n개 요청.
**구현**: Project Reactor, RxJava 2+, Akka Streams, Vert.x.
7. 실전 패턴
7.1 자동완성 (Type-Ahead Search)
const input = document.querySelector('#search')
fromEvent(input, 'input').pipe(
map(e => e.target.value),
debounceTime(300), // 300ms 대기
distinctUntilChanged(), // 같은 값 중복 제거
filter(text => text.length >= 2), // 2글자 이상
switchMap(text => // 새 검색 시 이전 취소
fromFetch(`/api/search?q=${text}`).pipe(
switchMap(res => res.json()),
catchError(() => of([])) // 에러 시 빈 배열
)
)
).subscribe(results => render(results))
이 단순한 코드가:
- 사용자가 빠르게 타이핑할 때 모든 글자에 대해 검색 X
- race condition 방지 (이전 검색 결과가 새 검색 결과를 덮지 않음)
- 에러 처리
7.2 Polling
interval(5000).pipe(
switchMap(() => fetch('/api/status'))
).subscribe(status => updateUI(status))
5초마다 API 호출. 자동 cleanup.
7.3 WebSocket 합성
const messages$ = webSocket('wss://example.com/chat')
messages$.pipe(
filter(msg => msg.type === 'message'),
map(msg => msg.text)
).subscribe(text => addMessage(text))
7.4 Retry with Backoff
fetch$.pipe(
retry({
count: 3,
delay: (error, retryCount) => {
const delay = Math.pow(2, retryCount) * 1000
return timer(delay)
}
})
)
Exponential backoff: 1초 → 2초 → 4초.
7.5 Form Validation
const username$ = fromEvent(usernameInput, 'input').pipe(
map(e => e.target.value),
debounceTime(500),
switchMap(name =>
fromFetch(`/api/check-username?name=${name}`).pipe(
switchMap(res => res.json())
)
)
)
const password$ = fromEvent(passwordInput, 'input').pipe(
map(e => e.target.value),
map(pw => pw.length >= 8)
)
const formValid$ = combineLatest([
username$.pipe(map(res => res.available)),
password$
]).pipe(
map(([usernameOk, passwordOk]) => usernameOk && passwordOk)
)
formValid$.subscribe(valid => submitButton.disabled = !valid)
8. Schedulers와 동시성
8.1 Schedulers
스트림이 어떤 스레드에서 실행되는가?
**RxJS Schedulers**:
- `asyncScheduler`: setTimeout 기반
- `asapScheduler`: microtask
- `queueScheduler`: 동기 (즉시)
- `animationFrameScheduler`: requestAnimationFrame
.pipe(
observeOn(asyncScheduler), // 어디서 처리할지
subscribeOn(asyncScheduler) // 어디서 시작할지
)
8.2 RxJava Schedulers
.subscribeOn(Schedulers.io()) // I/O 작업
.observeOn(Schedulers.computation()) // CPU 작업
.observeOn(AndroidSchedulers.mainThread()) // UI
**일반 패턴**: I/O는 io 스케줄러, UI 업데이트는 main 스레드.
9. 함정과 베스트 프랙티스
9.1 메모리 누수 — Subscription 관리
**잘못**:
ngOnInit() {
this.users$.subscribe(users => this.users = users)
// 컴포넌트가 죽어도 subscription은 살아있음 → 메모리 누수
}
**해결 1**: takeUntil
private destroy$ = new Subject<void>()
ngOnInit() {
this.users$.pipe(
takeUntil(this.destroy$)
).subscribe(users => this.users = users)
}
ngOnDestroy() {
this.destroy$.next()
this.destroy$.complete()
}
**해결 2**: async pipe (Angular)
{{ user.name }}
**자동 cleanup**.
9.2 Cold/Hot 혼란
**잘못**:
const data$ = fetch$.pipe(map(transformData))
// 두 번 구독 → fetch가 두 번 실행
data$.subscribe(handler1)
data$.subscribe(handler2)
**해결**: `share()` 또는 `shareReplay()`
const data$ = fetch$.pipe(
map(transformData),
shareReplay(1) // 한 번 fetch, 결과 공유
)
9.3 너무 많은 nested
// Pyramid of doom
.pipe(
switchMap(a =>
fetch1(a).pipe(
switchMap(b =>
fetch2(b).pipe(
map(c => transform(c))
)
)
)
)
)
**해결**: 작은 함수로 분리, async/await 고려.
9.4 RxJS가 항상 답은 아니다
**오버엔지니어링 신호**:
- 단순 fetch에 RxJS
- 단일 이벤트에 Observable
- 학습 곡선이 비즈니스 가치 초과
**적합한 경우**:
- 복잡한 비동기 흐름
- 여러 이벤트 합성
- 실시간 데이터
- backpressure 필요
10. Reactive vs Async/Await
10.1 비교
// Async/Await
async function getUserOrders(userId) {
const user = await fetchUser(userId)
const orders = await fetchOrders(user.id)
return { user, orders }
}
// Reactive (RxJS)
function getUserOrders(userId) {
return fetchUser$(userId).pipe(
switchMap(user =>
fetchOrders$(user.id).pipe(
map(orders => ({ user, orders }))
)
)
)
}
10.2 언제 무엇?
**Async/Await**:
- 단순 순차 비동기
- 1회성 작업
- 학습 곡선 낮음
- 대부분의 경우
**Reactive**:
- 스트림 (여러 값)
- 복잡한 합성 (combineLatest, merge)
- 시간 기반 연산 (debounce, throttle)
- backpressure
- 취소 가능 (cancellation)
10.3 혼용
async function loadData() {
const cached$ = fromPromise(getCached())
const fresh$ = fromPromise(fetchFresh())
return await firstValueFrom(
merge(cached$, fresh$).pipe(
first(data => data !== null)
)
)
}
퀴즈
**답**: **Cold**: 구독 시 데이터 생성 시작. 각 구독자가 독립적인 데이터 받음. 예: HTTP 요청, 파일 읽기. **Hot**: 데이터가 이미 흐르고 있음. 새 구독자는 그 시점부터 받음 (이전 데이터 못 받음). 예: 마우스 이벤트, WebSocket, BehaviorSubject. **흔한 함정**: HTTP를 두 번 구독하면 두 번 호출됨 (Cold). `shareReplay()`로 hot으로 변환 가능. **Cold/Hot 혼란이 가장 흔한 RxJS 버그의 원인**.
**답**: 사용자가 "react"를 입력 중일 때 매 글자마다 API 호출이 시작됩니다. **switchMap**: 새 입력이 오면 **이전 검색을 자동 취소**하고 새 검색 시작. 결과: 마지막 검색 결과만 받음 → race condition 방지. **다른 옵션**: `mergeMap`은 모든 결과 (race condition), `concatMap`은 순차 (느림), `exhaustMap`은 진행 중이면 무시 (잘못된 동작). switchMap이 정확한 답.
**답**: **Backpressure**: 생산자가 소비자보다 빠를 때의 문제. 처리 안 하면 메모리 폭증, OOM. **해결 전략**: (1) **Buffer** — 메모리에 쌓음 (한계 있음), (2) **Drop** — 초과분 버림, (3) **Latest** — 최신 값만 유지, (4) **Sample** — 주기적 샘플링, (5) **Throttle** — 속도 제한, (6) **Pull-based** — 소비자가 명시적으로 요청 (Reactive Streams 표준). 선택은 데이터 특성에 따라.
**답**: 컴포넌트가 죽어도 subscription이 살아있으면 메모리 누수. **해결**: (1) **takeUntil(destroy$)** — `destroy$` Subject로 unsubscribe 신호, (2) **async pipe (Angular)** — 자동 unsubscribe, (3) **takeWhile** — 조건부 종료, (4) **수동 unsubscribe** — `subscription.unsubscribe()` (덜 권장). **가장 안전한 패턴**: takeUntil + ngOnDestroy 또는 async pipe. React에서는 useEffect cleanup 함수에서 unsubscribe.
**답**: **Async/Await**: 단순 순차 비동기, 1회성 작업, 학습 곡선 낮음, 대부분의 경우. **Reactive**: 스트림 (여러 값), 복잡한 합성 (combineLatest), 시간 기반 (debounce/throttle), backpressure 필요, 취소 가능. **흔한 실수**: 단순 fetch에 RxJS 사용 → 오버엔지니어링. **적절한 사용**: 검색 자동완성, 실시간 데이터, WebSocket, 복잡한 폼 검증. 두 가지를 혼용도 가능 (`firstValueFrom`, `from`).
참고 자료
- [RxJS](https://rxjs.dev/) — 공식 문서
- [Project Reactor](https://projectreactor.io/) — Java
- [Kotlin Flow](https://kotlinlang.org/docs/flow.html)
- [ReactiveX](http://reactivex.io/) — 모든 언어
- [Marble Diagrams](https://rxmarbles.com/) — 시각화
- [Reactive Streams](https://www.reactive-streams.org/) — JVM/JS 표준
- [RxJS in Action](https://www.manning.com/books/rxjs-in-action)
- [Functional Reactive Programming](https://www.manning.com/books/functional-reactive-programming)
- [Spring WebFlux](https://docs.spring.io/spring-framework/reference/web/webflux.html)
- [Akka Streams](https://doc.akka.io/docs/akka/current/stream/index.html)
- [Vert.x](https://vertx.io/)
현재 단락 (1/439)
- **Reactive = 비동기 + 스트림 + 함수형**: 데이터의 흐름을 다루는 패러다임