Skip to content

필사 모드: Redis 캐싱 전략 완벽 가이드 — Cache-Aside부터 Write-Behind까지 실전 패턴 6가지

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

들어가며

캐싱은 애플리케이션 성능 최적화의 핵심 전략입니다. 하지만 "캐시를 쓰면 빨라진다" 정도의 이해만으로는 프로덕션에서 다양한 문제에 직면하게 됩니다. 캐시 일관성, 캐시 스탬피드, 메모리 관리 등 실전에서 마주하는 과제들을 해결하려면 **올바른 캐싱 전략 선택**이 필수입니다.

캐싱 전략 개요

┌─────────────────────────────────────────────────────┐

│ 캐싱 전략 분류 │

├──────────────────┬──────────────────────────────────┤

│ 읽기 전략 │ 쓰기 전략 │

├──────────────────┼──────────────────────────────────┤

│ Cache-Aside │ Write-Through │

│ Read-Through │ Write-Behind (Write-Back) │

│ Refresh-Ahead │ Write-Around │

└──────────────────┴──────────────────────────────────┘

1. Cache-Aside (Lazy Loading)

가장 널리 사용되는 패턴으로, 애플리케이션이 캐시를 직접 관리합니다.

읽기 플로우:

[App] → Cache hit? → [Redis] → 데이터 반환

↓ miss

[App] → [Database] → 데이터 반환

[App] → [Redis] 캐시 저장

from typing import Optional

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

class UserService:

def get_user(self, user_id: int) -> Optional[dict]:

cache_key = f"user:{user_id}"

1. 캐시 확인

cached = r.get(cache_key)

if cached:

return json.loads(cached)

2. Cache Miss → DB 조회

user = self.db.query("SELECT * FROM users WHERE id = %s", user_id)

if not user:

Negative caching: 없는 데이터도 캐시 (짧은 TTL)

r.setex(cache_key, 60, json.dumps(None))

return None

3. 캐시 저장

r.setex(cache_key, 3600, json.dumps(user))

return user

def update_user(self, user_id: int, data: dict):

DB 업데이트

self.db.execute("UPDATE users SET ... WHERE id = %s", user_id)

캐시 무효화 (삭제)

cache_key = f"user:{user_id}"

r.delete(cache_key)

주의: 캐시 업데이트가 아닌 삭제!

다음 읽기 시 최신 데이터로 캐시 갱신

**장점**: 구현 간단, 필요한 데이터만 캐시, 캐시 장애 시 DB로 폴백

**단점**: 최초 요청은 항상 느림 (Cold Start), 데이터 불일치 가능

2. Read-Through

캐시가 데이터 로딩을 담당합니다. 애플리케이션은 항상 캐시만 바라봅니다.

class ReadThroughCache:

"""

캐시가 DB 조회를 대행

애플리케이션은 캐시만 호출

"""

def __init__(self, redis_client, db, default_ttl=3600):

self.redis = redis_client

self.db = db

self.ttl = default_ttl

def get(self, key: str, loader_fn=None) -> Optional[dict]:

캐시 확인

cached = self.redis.get(key)

if cached:

return json.loads(cached)

Cache Miss → loader 함수로 데이터 로드

if loader_fn:

data = loader_fn()

if data is not None:

self.redis.setex(key, self.ttl, json.dumps(data))

return data

return None

사용 예시

cache = ReadThroughCache(r, db)

def get_product(product_id: int):

return cache.get(

f"product:{product_id}",

loader_fn=lambda: db.query(

"SELECT * FROM products WHERE id = %s", product_id

)

)

3. Write-Through

쓰기가 캐시를 거쳐 DB로 동기적으로 전파됩니다.

class WriteThroughCache:

"""

쓰기: App → Cache → DB (동기)

읽기: App → Cache (항상 최신)

"""

def write(self, key: str, data: dict, db_writer_fn=None):

1. 캐시에 먼저 쓰기

self.redis.setex(key, self.ttl, json.dumps(data))

2. DB에 동기적으로 쓰기

if db_writer_fn:

db_writer_fn(data)

return data

def get(self, key: str) -> Optional[dict]:

캐시가 항상 최신이므로 캐시에서만 읽기

cached = self.redis.get(key)

if cached:

return json.loads(cached)

return None

사용 예시

cache = WriteThroughCache(r, db)

def update_inventory(product_id: int, quantity: int):

data = {"product_id": product_id, "quantity": quantity}

cache.write(

f"inventory:{product_id}",

data,

db_writer_fn=lambda d: db.execute(

"UPDATE inventory SET quantity = %s WHERE product_id = %s",

d["quantity"], d["product_id"]

)

)

**장점**: 캐시와 DB 일관성 보장

**단점**: 쓰기 레이턴시 증가 (캐시 + DB 두 번), 사용하지 않는 데이터도 캐시

4. Write-Behind (Write-Back)

쓰기를 캐시에만 하고, DB 반영은 비동기로 지연합니다.

from collections import defaultdict

class WriteBehindCache:

"""

쓰기: App → Cache (즉시) → DB (비동기, 배치)

높은 쓰기 처리량이 필요한 경우

"""

def __init__(self, redis_client, db, flush_interval=5):

self.redis = redis_client

self.db = db

self.dirty_keys = set()

self.flush_interval = flush_interval

self._start_flusher()

def write(self, key: str, data: dict):

캐시에만 즉시 쓰기

self.redis.setex(key, 7200, json.dumps(data))

Dirty 표시 (DB 반영 필요)

self.redis.sadd("dirty_keys", key)

def _start_flusher(self):

"""주기적으로 Dirty 데이터를 DB에 배치 반영"""

def flush():

while True:

try:

Dirty 키 가져오기

dirty_keys = self.redis.smembers("dirty_keys")

if dirty_keys:

pipe = self.db.pipeline()

for key in dirty_keys:

data = self.redis.get(key)

if data:

pipe.add_to_batch(key, json.loads(data))

pipe.execute() # 배치 DB 쓰기

Dirty 표시 제거

self.redis.srem("dirty_keys", *dirty_keys)

except Exception as e:

print(f"Flush error: {e}")

threading.Event().wait(self.flush_interval)

thread = threading.Thread(target=flush, daemon=True)

thread.start()

**장점**: 매우 빠른 쓰기 성능, DB 부하 감소

**단점**: 데이터 유실 위험 (캐시 장애 시), 복잡한 구현

5. Refresh-Ahead

TTL 만료 전에 미리 캐시를 갱신합니다.

class RefreshAheadCache:

"""

TTL의 특정 비율 지점에서 백그라운드로 캐시 갱신

예: TTL 3600초, factor 0.8 → 2880초(80%) 지점에서 갱신 시작

"""

def __init__(self, redis_client, refresh_factor=0.8):

self.redis = redis_client

self.refresh_factor = refresh_factor

def get(self, key: str, ttl: int, loader_fn=None):

cached = self.redis.get(key)

if cached:

남은 TTL 확인

remaining_ttl = self.redis.ttl(key)

threshold = ttl * (1 - self.refresh_factor)

if remaining_ttl < threshold:

백그라운드에서 미리 갱신

self._async_refresh(key, ttl, loader_fn)

return json.loads(cached)

Cache Miss

if loader_fn:

data = loader_fn()

self.redis.setex(key, ttl, json.dumps(data))

return data

return None

def _async_refresh(self, key, ttl, loader_fn):

"""비동기로 캐시 갱신 (락으로 중복 방지)"""

lock_key = f"refresh_lock:{key}"

if self.redis.set(lock_key, "1", nx=True, ex=30):

락 획득 성공 → 갱신

threading.Thread(

target=self._refresh,

args=(key, ttl, loader_fn, lock_key),

daemon=True

).start()

def _refresh(self, key, ttl, loader_fn, lock_key):

try:

data = loader_fn()

self.redis.setex(key, ttl, json.dumps(data))

finally:

self.redis.delete(lock_key)

6. Cache Stampede 방지

TTL 만료 시 대량 요청이 동시에 DB를 때리는 **Cache Stampede** 문제 해결:

class StampedeProtectedCache:

def get_with_lock(self, key: str, ttl: int, loader_fn):

"""분산 락으로 하나의 요청만 DB 조회"""

cached = self.redis.get(key)

if cached:

return json.loads(cached)

lock_key = f"lock:{key}"

분산 락 시도

if self.redis.set(lock_key, "1", nx=True, ex=10):

try:

락 획득 → DB 조회 & 캐시 갱신

data = loader_fn()

self.redis.setex(key, ttl, json.dumps(data))

return data

finally:

self.redis.delete(lock_key)

else:

락 실패 → 잠시 대기 후 캐시 재확인

time.sleep(0.1)

cached = self.redis.get(key)

if cached:

return json.loads(cached)

여전히 없으면 직접 DB 조회

return loader_fn()

def get_with_probabilistic_refresh(self, key: str, ttl: int, loader_fn, beta=1.0):

"""확률적 조기 갱신 (XFetch 알고리즘)"""

cached = self.redis.get(key)

if cached:

data = json.loads(cached)

remaining_ttl = self.redis.ttl(key)

delta = ttl - remaining_ttl # 경과 시간

확률적으로 조기 갱신

TTL 만료에 가까울수록 갱신 확률 증가

if delta > 0:

prob = math.exp(-remaining_ttl / (beta * delta))

if random.random() < prob:

조기 갱신

new_data = loader_fn()

self.redis.setex(key, ttl, json.dumps(new_data))

return new_data

return data

Cache Miss

data = loader_fn()

self.redis.setex(key, ttl, json.dumps(data))

return data

TTL 전략

데이터 유형별 TTL 가이드

TTL_STRATEGIES = {

자주 변경되는 데이터

"session": 1800, # 30분

"rate_limit": 60, # 1분

"realtime_stats": 10, # 10초

가끔 변경되는 데이터

"user_profile": 3600, # 1시간

"product_detail": 1800, # 30분

"api_response": 300, # 5분

거의 변경되지 않는 데이터

"config": 86400, # 24시간

"country_list": 604800, # 7일

"static_content": 2592000, # 30일

Negative cache (존재하지 않는 데이터)

"not_found": 60, # 1분 (짧게!)

}

TTL에 약간의 랜덤성 추가 (Stampede 방지)

def ttl_with_jitter(base_ttl: int, jitter_pct: float = 0.1) -> int:

"""TTL에 ±10% 랜덤 편차 추가"""

jitter = int(base_ttl * jitter_pct)

return base_ttl + random.randint(-jitter, jitter)

사용: r.setex(key, ttl_with_jitter(3600), value)

Redis 데이터 구조 활용

1. String — 단순 캐시

r.setex("user:123", 3600, json.dumps(user_data))

2. Hash — 부분 업데이트가 필요한 경우

r.hset("user:123", mapping={

"name": "Youngju",

"email": "yj@example.com",

"login_count": 42

})

r.hincrby("user:123", "login_count", 1) # 부분 업데이트

3. Sorted Set — 순위/리더보드

r.zadd("leaderboard", {"player1": 100, "player2": 85, "player3": 92})

top_3 = r.zrevrange("leaderboard", 0, 2, withscores=True)

4. List — 최근 활동 피드

r.lpush("feed:user:123", json.dumps(activity))

r.ltrim("feed:user:123", 0, 99) # 최근 100개만 유지

5. Set — 중복 제거

r.sadd("online_users", "user:123", "user:456")

online_count = r.scard("online_users")

6. Stream — 이벤트 로그

r.xadd("events:orders", {"action": "created", "order_id": "ORD-123"})

캐시 모니터링

Redis INFO 명령으로 캐시 효율 확인

info = r.info("stats")

hits = info["keyspace_hits"]

misses = info["keyspace_misses"]

hit_rate = hits / (hits + misses) * 100

print(f"Cache Hit Rate: {hit_rate:.1f}%")

목표: 95% 이상

메모리 사용량 확인

memory_info = r.info("memory")

print(f"Used Memory: {memory_info['used_memory_human']}")

print(f"Peak Memory: {memory_info['used_memory_peak_human']}")

print(f"Fragmentation Ratio: {memory_info['mem_fragmentation_ratio']}")

Redis CLI로 모니터링

redis-cli info stats | grep -E "keyspace_hits|keyspace_misses"

redis-cli info memory | grep "used_memory_human"

느린 쿼리 확인

redis-cli slowlog get 10

실시간 명령 모니터링

redis-cli monitor

전략 선택 가이드

┌─────────────────────────────────────────────────┐

│ 어떤 캐싱 전략을 쓸까? │

├─────────────────────────────────────────────────┤

│ │

│ 읽기 중심? ──YES──> Cache-Aside │

│ │ (가장 범용적) │

│ NO │

│ │ │

│ 쓰기 중심? ──YES──> Write-Behind │

│ │ (높은 쓰기 처리량) │

│ NO │

│ │ │

│ 일관성 중요? ──YES──> Write-Through │

│ │ (캐시-DB 항상 동기) │

│ NO │

│ │ │

│ 지연 민감? ──YES──> Refresh-Ahead │

│ (Cache Miss 최소화) │

└─────────────────────────────────────────────────┘

퀴즈

Race Condition 방지를 위해서입니다. 두 요청이 동시에 업데이트하면 캐시에 오래된 데이터가 남을 수 있습니다. 삭제하면 다음 읽기 시 최신 데이터를 DB에서 가져와 캐시합니다.

인기 있는 캐시 키의 TTL이 만료될 때 대량의 요청이 동시에 DB를 조회하는 현상입니다. 분산 락, 확률적 조기 갱신(XFetch), Refresh-Ahead 패턴으로 방지합니다.

캐시(Redis) 장애 시 아직 DB에 반영되지 않은 데이터가 유실될 수 있습니다. AOF/RDB 지속성 설정과 Write-Ahead Log로 위험을 줄일 수 있습니다.

DB에 존재하지 않는 데이터도 캐시하는 것입니다. 동일한 존재하지 않는 키에 대한 반복 조회가 DB를 때리는 것을 방지합니다. 짧은 TTL(예: 60초)을 사용합니다.

같은 시간에 생성된 캐시들이 동시에 만료되어 Cache Stampede가 발생하는 것을 방지합니다. TTL에 ±10% 랜덤 편차를 추가합니다.

일반적으로 **95% 이상**을 목표로 합니다. 80% 미만이면 캐싱 전략을 재검토해야 합니다.

객체의 **일부 필드만 업데이트**하는 경우입니다. String은 전체 데이터를 직렬화/역직렬화해야 하지만, Hash는 개별 필드를 독립적으로 읽기/쓰기할 수 있습니다.

마무리

캐싱 전략은 "Redis에 넣으면 끝"이 아닙니다. 데이터의 특성(읽기/쓰기 비율, 일관성 요구사항, 갱신 빈도)에 따라 적절한 전략을 선택하고, TTL 관리, Stampede 방지, 모니터링까지 고려해야 프로덕션에서 안정적으로 운영할 수 있습니다.

참고 자료

- [AWS: Database Caching Strategies Using Redis](https://docs.aws.amazon.com/whitepapers/latest/database-caching-strategies-using-redis/caching-patterns.html)

- [Redis Official: Caching Patterns](https://redis.io/solutions/caching/)

- [Redis Blog: Advanced Caching Strategies](https://redis.io/blog/why-your-caching-strategies-might-be-holding-you-back-and-what-to-consider-next/)

현재 단락 (1/279)

캐싱은 애플리케이션 성능 최적화의 핵심 전략입니다. 하지만 "캐시를 쓰면 빨라진다" 정도의 이해만으로는 프로덕션에서 다양한 문제에 직면하게 됩니다. 캐시 일관성, 캐시 스탬피드, ...

작성 글자: 0원문 글자: 8,898작성 단락: 0/279