- Introduction
- Overview of Caching Strategies
- 1. Cache-Aside (Lazy Loading)
- 2. Read-Through
- 3. Write-Through
- 4. Write-Behind (Write-Back)
- 5. Refresh-Ahead
- 6. Cache Stampede Prevention
- TTL Strategy
- Leveraging Redis Data Structures
- Cache Monitoring
- Strategy Selection Guide
- Quiz
- Conclusion
- References

Introduction
Caching is a core technique for optimizing application performance. But knowing only that "a cache makes things faster" is not enough: production systems run into cache consistency problems, cache stampedes, and memory pressure. Choosing the right caching strategy is how you address them.
Overview of Caching Strategies
┌─────────────────────────────────────────────────────┐
│             Caching Strategy Categories             │
├──────────────────┬──────────────────────────────────┤
│ Read Strategies  │ Write Strategies                 │
├──────────────────┼──────────────────────────────────┤
│ Cache-Aside      │ Write-Through                    │
│ Read-Through     │ Write-Behind (Write-Back)        │
│ Refresh-Ahead    │ Write-Around                     │
└──────────────────┴──────────────────────────────────┘
1. Cache-Aside (Lazy Loading)
The most widely used pattern where the application manages the cache directly.
Read flow:
[App] → Cache hit? → [Redis] → Return data
            ↓ miss
[App] → [Database] → Return data
            ↓
[App] → [Redis] Store in cache
import redis
import json
from typing import Optional

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

class UserService:
    def __init__(self, db):
        # db: the application's database client (its query/execute interface is assumed here)
        self.db = db

    def get_user(self, user_id: int) -> Optional[dict]:
        cache_key = f"user:{user_id}"
        # 1. Check cache
        cached = r.get(cache_key)
        if cached is not None:
            # A cached "null" decodes to None, which is a negative-cache hit
            return json.loads(cached)
        # 2. Cache Miss → Query DB
        user = self.db.query("SELECT * FROM users WHERE id = %s", user_id)
        if not user:
            # Negative caching: cache non-existent data too (short TTL)
            r.setex(cache_key, 60, json.dumps(None))
            return None
        # 3. Store in cache
        r.setex(cache_key, 3600, json.dumps(user))
        return user

    def update_user(self, user_id: int, data: dict):
        # Update DB
        self.db.execute("UPDATE users SET ... WHERE id = %s", user_id)
        # Invalidate cache (delete)
        cache_key = f"user:{user_id}"
        r.delete(cache_key)
        # Note: Delete, not update the cache!
        # Next read will fetch fresh data from DB and re-cache
Pros: Simple implementation, only caches needed data, falls back to DB on cache failure
Cons: First request is always slow (Cold Start), possible data inconsistency
2. Read-Through
The cache handles data loading. The application always looks only at the cache.
class ReadThroughCache:
    """
    Cache handles DB queries on behalf of the app
    Application only calls the cache
    """
    def __init__(self, redis_client, db, default_ttl=3600):
        self.redis = redis_client
        self.db = db
        self.ttl = default_ttl

    def get(self, key: str, loader_fn=None) -> Optional[dict]:
        # Check cache
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        # Cache Miss → Load data via loader function
        if loader_fn:
            data = loader_fn()
            if data is not None:
                self.redis.setex(key, self.ttl, json.dumps(data))
            return data
        return None

# Usage example
cache = ReadThroughCache(r, db)

def get_product(product_id: int):
    return cache.get(
        f"product:{product_id}",
        loader_fn=lambda: db.query(
            "SELECT * FROM products WHERE id = %s", product_id
        )
    )
3. Write-Through
Writes propagate synchronously through the cache to the DB.
class WriteThroughCache:
    """
    Write: App → Cache → DB (synchronous)
    Read: App → Cache (always up-to-date)
    """
    def __init__(self, redis_client, db, default_ttl=3600):
        self.redis = redis_client
        self.db = db
        self.ttl = default_ttl

    def write(self, key: str, data: dict, db_writer_fn=None):
        # 1. Write to cache first
        self.redis.setex(key, self.ttl, json.dumps(data))
        # 2. Write to DB synchronously
        if db_writer_fn:
            try:
                db_writer_fn(data)
            except Exception:
                # DB write failed: drop the entry so the cache cannot serve unpersisted data
                self.redis.delete(key)
                raise
        return data

    def get(self, key: str) -> Optional[dict]:
        # Read from cache only; returns None once the TTL expires or if the key was never written
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        return None

# Usage example
cache = WriteThroughCache(r, db)

def update_inventory(product_id: int, quantity: int):
    data = {"product_id": product_id, "quantity": quantity}
    cache.write(
        f"inventory:{product_id}",
        data,
        db_writer_fn=lambda d: db.execute(
            "UPDATE inventory SET quantity = %s WHERE product_id = %s",
            d["quantity"], d["product_id"]
        )
    )
Pros: Keeps cache and DB in sync on every successful write
Cons: Increased write latency (both cache + DB), caches unused data too, and the two writes are not atomic
4. Write-Behind (Write-Back)
Writes go only to the cache, and DB updates are deferred asynchronously.
import json
import threading
import time

class WriteBehindCache:
    """
    Write: App → Cache (immediate) → DB (async, batch)
    For cases requiring high write throughput
    """
    def __init__(self, redis_client, db, flush_interval=5):
        self.redis = redis_client
        # db is assumed to expose a batch interface: pipeline(), add_to_batch(), execute()
        self.db = db
        self.flush_interval = flush_interval
        self._start_flusher()

    def write(self, key: str, data: dict):
        # Write to cache only (immediate)
        self.redis.setex(key, 7200, json.dumps(data))
        # Mark as dirty (needs DB sync); the dirty set lives in Redis so all app instances share it
        self.redis.sadd("dirty_keys", key)

    def _start_flusher(self):
        """Periodically batch-flush dirty data to DB"""
        def flush():
            while True:
                try:
                    # Get dirty keys
                    dirty_keys = self.redis.smembers("dirty_keys")
                    if dirty_keys:
                        pipe = self.db.pipeline()
                        for key in dirty_keys:
                            data = self.redis.get(key)
                            if data:
                                pipe.add_to_batch(key, json.loads(data))
                        pipe.execute()  # Batch DB write
                        # Remove dirty marks
                        # Caveat: a key rewritten after it was read above loses its mark here
                        # and is not flushed again until its next write
                        self.redis.srem("dirty_keys", *dirty_keys)
                except Exception as e:
                    print(f"Flush error: {e}")
                time.sleep(self.flush_interval)

        thread = threading.Thread(target=flush, daemon=True)
        thread.start()
Pros: Very fast write performance, reduced DB load
Cons: Risk of data loss (on cache failure), complex implementation
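To make the "reduced DB load" point concrete, here is a minimal in-memory sketch of write coalescing. Plain dicts stand in for Redis and the database, and `InMemoryWriteBehind` with its `write` and `flush` methods is a hypothetical name chosen for illustration, not part of any library.

```python
class InMemoryWriteBehind:
    """Toy write-behind buffer: dicts stand in for Redis and the DB."""

    def __init__(self):
        self.cache = {}       # stand-in for Redis
        self.db = {}          # stand-in for the database
        self.dirty = set()    # keys written since the last flush
        self.db_writes = 0    # number of rows written to the DB so far

    def write(self, key, value):
        # A write touches only the cache and marks the key dirty.
        self.cache[key] = value
        self.dirty.add(key)

    def flush(self):
        # Swap in a fresh dirty set first, so keys written while
        # flushing are picked up by the next flush.
        pending, self.dirty = self.dirty, set()
        for key in pending:
            self.db[key] = self.cache[key]
            self.db_writes += 1
        return len(pending)


wb = InMemoryWriteBehind()
for qty in range(100):
    wb.write("inventory:1", {"quantity": qty})   # 100 writes to one key
wb.write("inventory:2", {"quantity": 7})
flushed = wb.flush()                             # one DB write per dirty key
```

In this toy run, 101 cache writes collapse into two DB writes, because only the latest value of each dirty key is flushed. The same property is why unflushed data is lost if the cache fails before the next flush.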
5. Refresh-Ahead
Proactively refreshes the cache before TTL expiration.
import json
import threading

class RefreshAheadCache:
    """
    Refreshes cache in background at a certain percentage of TTL
    Example: TTL 3600s, factor 0.8 → starts refresh at 2880s (80%) mark
    """
    def __init__(self, redis_client, refresh_factor=0.8):
        self.redis = redis_client
        self.refresh_factor = refresh_factor

    def get(self, key: str, ttl: int, loader_fn=None):
        cached = self.redis.get(key)
        if cached:
            # Check remaining TTL
            remaining_ttl = self.redis.ttl(key)
            threshold = ttl * (1 - self.refresh_factor)
            # Without a loader there is nothing to refresh with
            if loader_fn and remaining_ttl < threshold:
                # Proactively refresh in background
                self._async_refresh(key, ttl, loader_fn)
            return json.loads(cached)
        # Cache Miss
        if loader_fn:
            data = loader_fn()
            self.redis.setex(key, ttl, json.dumps(data))
            return data
        return None

    def _async_refresh(self, key, ttl, loader_fn):
        """Asynchronously refresh cache (with lock to prevent duplicates)"""
        lock_key = f"refresh_lock:{key}"
        if self.redis.set(lock_key, "1", nx=True, ex=30):
            # Lock acquired → refresh
            threading.Thread(
                target=self._refresh,
                args=(key, ttl, loader_fn, lock_key),
                daemon=True
            ).start()

    def _refresh(self, key, ttl, loader_fn, lock_key):
        try:
            data = loader_fn()
            self.redis.setex(key, ttl, json.dumps(data))
        finally:
            self.redis.delete(lock_key)
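The refresh decision itself is simple arithmetic and can be checked without Redis. The sketch below isolates it in a hypothetical helper, `needs_refresh`, and additionally treats the negative values that the Redis TTL command returns (-1 for a key with no expiry, -2 for a missing key) as "do not refresh", which is an assumption of this example rather than behavior of the class above.

```python
def needs_refresh(remaining_ttl: int, ttl: int, refresh_factor: float = 0.8) -> bool:
    """Return True once a key has used up `refresh_factor` of its TTL."""
    if remaining_ttl < 0:
        # Redis TTL returns -1 (no expiry) or -2 (key missing);
        # neither describes a key that is approaching expiry.
        return False
    # With ttl=3600 and factor=0.8 the threshold is about 720 seconds remaining,
    # i.e. roughly 2880 seconds after the value was cached.
    threshold = ttl * (1 - refresh_factor)
    return remaining_ttl < threshold


early = needs_refresh(remaining_ttl=1000, ttl=3600)   # still fresh
late = needs_refresh(remaining_ttl=719, ttl=3600)     # past the 80% mark
```

Keeping the check in a pure function like this makes the threshold easy to unit test and to tune per key type.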
6. Cache Stampede Prevention
A cache stampede occurs when the TTL of a hot key expires and a large number of requests hit the DB at the same time. Two common ways to prevent it:
import json
import math
import random
import time

class StampedeProtectedCache:
    def __init__(self, redis_client):
        self.redis = redis_client

    def get_with_lock(self, key: str, ttl: int, loader_fn):
        """Only one request queries DB via distributed lock"""
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        lock_key = f"lock:{key}"
        # Attempt distributed lock
        if self.redis.set(lock_key, "1", nx=True, ex=10):
            try:
                # Lock acquired → query DB and update cache
                data = loader_fn()
                self.redis.setex(key, ttl, json.dumps(data))
                return data
            finally:
                self.redis.delete(lock_key)
        else:
            # Lock failed → wait briefly and recheck cache
            time.sleep(0.1)
            cached = self.redis.get(key)
            if cached:
                return json.loads(cached)
            # Still not cached → query DB directly
            return loader_fn()

    def get_with_probabilistic_refresh(self, key: str, ttl: int, loader_fn, beta=1.0):
        """Probabilistic early refresh (XFetch-style; delta here is elapsed time, not recompute time)"""
        cached = self.redis.get(key)
        if cached:
            data = json.loads(cached)
            remaining_ttl = self.redis.ttl(key)
            delta = ttl - remaining_ttl  # Elapsed time
            # Probabilistically trigger early refresh
            # Probability increases as TTL approaches expiration
            # Note: at half the TTL it is already exp(-1/beta), about 37% for beta=1;
            # lower beta to refresh later
            if delta > 0:
                prob = math.exp(-remaining_ttl / (beta * delta))
                if random.random() < prob:
                    # Early refresh
                    new_data = loader_fn()
                    self.redis.setex(key, ttl, json.dumps(new_data))
                    return new_data
            return data
        # Cache Miss
        data = loader_fn()
        self.redis.setex(key, ttl, json.dumps(data))
        return data
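The probabilistic method above uses the elapsed time as `delta`. The commonly cited formulation of XFetch instead uses the time it took to recompute the value, and refreshes when `-delta * beta * ln(rand)` is at least the remaining TTL. The sketch below shows only that decision rule as a pure function; `should_refresh_early` and its parameter names are hypothetical, and storing the recompute time alongside the cached value is left out.

```python
import math
import random
from typing import Optional


def should_refresh_early(remaining_ttl: float, recompute_seconds: float,
                         beta: float = 1.0, rand: Optional[float] = None) -> bool:
    """XFetch-style decision: refresh when -delta * beta * ln(rand) >= remaining TTL.

    recompute_seconds is the measured cost of rebuilding the value (delta).
    rand can be injected for deterministic tests; otherwise it is drawn from (0, 1].
    """
    if rand is None:
        rand = 1.0 - random.random()   # random() is in [0, 1), so this avoids log(0)
    return -recompute_seconds * beta * math.log(rand) >= remaining_ttl


# With a 0.5 s recompute cost, a key with 100 s left is not refreshed for rand=0.5,
# while a key with 0.2 s left is.
far_from_expiry = should_refresh_early(100, 0.5, rand=0.5)
near_expiry = should_refresh_early(0.2, 0.5, rand=0.5)
```

Because the early-refresh window scales with the recompute cost, cheap values are refreshed only very close to expiry, while expensive ones get a wider head start. Values of `beta` above 1 widen that window further.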
TTL Strategy
# TTL guide by data type
TTL_STRATEGIES = {
    # Frequently changing data
    "session": 1800,           # 30 minutes
    "rate_limit": 60,          # 1 minute
    "realtime_stats": 10,      # 10 seconds
    # Occasionally changing data
    "user_profile": 3600,      # 1 hour
    "product_detail": 1800,    # 30 minutes
    "api_response": 300,       # 5 minutes
    # Rarely changing data
    "config": 86400,           # 24 hours
    "country_list": 604800,    # 7 days
    "static_content": 2592000, # 30 days
    # Negative cache (non-existent data)
    "not_found": 60,           # 1 minute (keep it short!)
}

# Add slight randomness to TTL (Stampede prevention)
import random

def ttl_with_jitter(base_ttl: int, jitter_pct: float = 0.1) -> int:
    """Add ±10% random deviation to TTL"""
    jitter = int(base_ttl * jitter_pct)
    return base_ttl + random.randint(-jitter, jitter)

# Usage: r.setex(key, ttl_with_jitter(3600), value)
Leveraging Redis Data Structures
# Placeholder values for illustration
user_data = {"name": "Youngju", "email": "yj@example.com"}
activity = {"type": "login"}

# 1. String: simple caching
r.setex("user:123", 3600, json.dumps(user_data))
# 2. Hash: when partial updates are needed
# (uses its own key: "user:123" already holds a string, so HSET on it would fail with WRONGTYPE)
r.hset("user:123:profile", mapping={
    "name": "Youngju",
    "email": "yj@example.com",
    "login_count": 42
})
r.hincrby("user:123:profile", "login_count", 1)  # Partial update
# 3. Sorted Set: rankings/leaderboards
r.zadd("leaderboard", {"player1": 100, "player2": 85, "player3": 92})
top_3 = r.zrevrange("leaderboard", 0, 2, withscores=True)
# 4. List: recent activity feed
r.lpush("feed:user:123", json.dumps(activity))
r.ltrim("feed:user:123", 0, 99)  # Keep only the latest 100
# 5. Set: deduplication
r.sadd("online_users", "user:123", "user:456")
online_count = r.scard("online_users")
# 6. Stream: event logs
r.xadd("events:orders", {"action": "created", "order_id": "ORD-123"})
Cache Monitoring
# Check cache efficiency with Redis INFO command
info = r.info("stats")
hits = info["keyspace_hits"]
misses = info["keyspace_misses"]
total = hits + misses
hit_rate = hits / total * 100 if total else 0.0  # avoid ZeroDivisionError on a fresh instance
print(f"Cache Hit Rate: {hit_rate:.1f}%")
# Target: 95% or higher
# Check memory usage
memory_info = r.info("memory")
print(f"Used Memory: {memory_info['used_memory_human']}")
print(f"Peak Memory: {memory_info['used_memory_peak_human']}")
print(f"Fragmentation Ratio: {memory_info['mem_fragmentation_ratio']}")
# Monitoring with Redis CLI
redis-cli info stats | grep -E "keyspace_hits|keyspace_misses"
redis-cli info memory | grep "used_memory_human"
# Check slow queries
redis-cli slowlog get 10
# Real-time command monitoring
redis-cli monitor
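For alerting, the hit-rate calculation can be wrapped in a small helper that works on any dict shaped like the result of `r.info("stats")`. This is a sketch under assumptions: the function names are hypothetical, and the thresholds reuse the 95% target and the 80% re-evaluation floor mentioned in this article, which you would tune for your own workload.

```python
from typing import Optional


def hit_rate_percent(stats: dict) -> Optional[float]:
    """Return the hit rate in percent, or None when there is no traffic yet."""
    hits = stats.get("keyspace_hits", 0)
    misses = stats.get("keyspace_misses", 0)
    total = hits + misses
    if total == 0:
        return None
    return hits * 100 / total


def classify_hit_rate(rate: Optional[float],
                      target: float = 95.0, floor: float = 80.0) -> str:
    """Map a hit rate to a coarse status label for dashboards or alerts."""
    if rate is None:
        return "no-data"
    if rate >= target:
        return "ok"
    if rate >= floor:
        return "watch"
    return "re-evaluate"


sample = {"keyspace_hits": 950, "keyspace_misses": 50}
status = classify_hit_rate(hit_rate_percent(sample))
```

Note that `keyspace_hits` and `keyspace_misses` are counters for the whole Redis instance, so the resulting rate mixes all key types that share it.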
Strategy Selection Guide
┌─────────────────────────────────────────────────┐
│         Which caching strategy to use?          │
├─────────────────────────────────────────────────┤
│                                                 │
│  Read-heavy?  ──YES──> Cache-Aside              │
│      │                 (Most versatile)         │
│      NO                                         │
│      │                                          │
│  Write-heavy? ──YES──> Write-Behind             │
│      │                 (High write throughput)  │
│      NO                                         │
│      │                                          │
│  Consistency  ──YES──> Write-Through            │
│  matters?              (Cache-DB always in sync)│
│      NO                                         │
│      │                                          │
│  Latency      ──YES──> Refresh-Ahead            │
│  sensitive?            (Minimize Cache Miss)    │
└─────────────────────────────────────────────────┘
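The decision flow above can also be written as a small function, which is handy for documenting the choice in code reviews or tests. The function name and its boolean parameters are hypothetical. The diagram does not say what to do when every answer is "no", so this sketch returns `None` in that case rather than guessing a default.

```python
from typing import Optional


def choose_strategy(*, read_heavy: bool = False, write_heavy: bool = False,
                    consistency_matters: bool = False,
                    latency_sensitive: bool = False) -> Optional[str]:
    """Walk the questions in the same order as the selection guide."""
    if read_heavy:
        return "Cache-Aside"
    if write_heavy:
        return "Write-Behind"
    if consistency_matters:
        return "Write-Through"
    if latency_sensitive:
        return "Refresh-Ahead"
    return None   # the guide gives no recommendation for this case


catalog = choose_strategy(read_heavy=True, latency_sensitive=True)
counters = choose_strategy(write_heavy=True)
```

Because the questions are checked in order, a workload that is both read-heavy and latency-sensitive resolves to Cache-Aside. In practice the strategies are often combined, for example Cache-Aside reads with TTL jitter and stampede protection.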
Quiz
Q1. In the Cache-Aside pattern, why do we "delete" the cache instead of "updating" it when data changes?
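To avoid a race between concurrent writers. If two requests update the same record at nearly the same time, their cache writes can land in a different order than their DB writes, leaving a stale value in the cache. Deleting the key instead means the next read fetches the current value from the DB and re-caches it.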
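The race can be replayed deterministically with two dicts standing in for the DB and the cache. This is an illustrative sketch: the interleaving is hard-coded rather than produced by real threads, and the function names are hypothetical.

```python
def read(cache: dict, db: dict, key: str):
    """Cache-aside read: return the cached value, else load from the DB and cache it."""
    if key in cache:
        return cache[key]
    value = db[key]
    cache[key] = value
    return value


def update_cache_on_write():
    db, cache = {}, {}
    db["user:1"] = "v1"       # writer A commits to the DB
    db["user:1"] = "v2"       # writer B commits to the DB (final DB state)
    cache["user:1"] = "v2"    # writer B updates the cache first
    cache["user:1"] = "v1"    # writer A's cache update arrives late
    return read(cache, db, "user:1"), db["user:1"]


def delete_cache_on_write():
    db, cache = {}, {}
    db["user:1"] = "v1"       # writer A commits to the DB
    db["user:1"] = "v2"       # writer B commits to the DB (final DB state)
    cache.pop("user:1", None) # writer B invalidates
    cache.pop("user:1", None) # writer A invalidates; the order no longer matters
    return read(cache, db, "user:1"), db["user:1"]


stale_read, db_value = update_cache_on_write()
fresh_read, _ = delete_cache_on_write()
```

With cache updates, the reader sees `v1` while the DB holds `v2`. With deletes, the next read reloads `v2`. Deleting narrows the window but does not remove every race: a reader that loaded the old value just before the write can still re-cache it afterwards, which is one reason to keep a TTL on every key.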
Q2. What is Cache Stampede and how do you prevent it?
It occurs when the TTL of a popular cache key expires and a large number of requests query the DB at the same time. It can be prevented using distributed locks, probabilistic early refresh (XFetch), or the Refresh-Ahead pattern.
Q3. What is the biggest risk of the Write-Behind pattern?
Data that has not yet been written to the DB can be lost if the cache (Redis) fails. The risk can be mitigated with AOF/RDB persistence settings and Write-Ahead Logs.
Q4. What is Negative Caching?
It means caching data that does not exist in the DB. This prevents repeated lookups for the same non-existent key from hitting the DB. A short TTL (e.g., 60 seconds) is used.
Q5. Why add Jitter to TTL?
To prevent Cache Stampede caused by caches created at the same time expiring simultaneously. A random deviation of plus or minus 10% is added to the TTL.
Q6. What is the recommended target for Cache Hit Rate?
As a rule of thumb, aim for 95% or higher, although the right target depends on the workload. If the rate falls below 80%, the caching strategy should be re-evaluated.
Q7. When is Redis Hash more advantageous than String?
When you need to update only some fields of an object. String requires serializing/deserializing the entire data, while Hash allows reading/writing individual fields independently.
Conclusion
Caching strategy is not just about "putting data in Redis and calling it a day." You need to select the appropriate strategy based on data characteristics (read/write ratio, consistency requirements, update frequency), and consider TTL management, Stampede prevention, and monitoring to operate reliably in production.