Complete Guide to Redis Caching Strategies — 6 Practical Patterns from Cache-Aside to Write-Behind

Introduction

Caching is a core technique for optimizing application performance. However, knowing only that "a cache makes things faster" is not enough in production. Real-world problems such as cache consistency, cache stampede, and memory management require choosing the right caching strategy.

Overview of Caching Strategies

┌─────────────────────────────────────────────────────┐
│             Caching Strategy Categories             │
├──────────────────┬──────────────────────────────────┤
│ Read Strategies  │ Write Strategies                 │
├──────────────────┼──────────────────────────────────┤
│ Cache-Aside      │ Write-Through                    │
│ Read-Through     │ Write-Behind (Write-Back)        │
│ Refresh-Ahead    │ Write-Around                     │
└──────────────────┴──────────────────────────────────┘

1. Cache-Aside (Lazy Loading)

The most widely used pattern: the application manages the cache directly.

Read flow:
[App] → Cache hit? → [Redis] → Return data
  ↓ miss
[App] → [Database] → Return data
[App] → [Redis] Store in cache

import redis
import json
from typing import Optional

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

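class UserService:
    def __init__(self, db):
        self.db = db  # any client exposing query()/execute()

    def get_user(self, user_id: int) -> Optional[dict]:
        cache_key = f"user:{user_id}"

        # 1. Check cache (a cached "null" is a negative-cache hit)
        cached = r.get(cache_key)
        if cached is not None:
            return json.loads(cached)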

        # 2. Cache Miss → Query DB
        user = self.db.query("SELECT * FROM users WHERE id = %s", user_id)
        if not user:
            # Negative caching: cache non-existent data too (short TTL)
            r.setex(cache_key, 60, json.dumps(None))
            return None

        # 3. Store in cache
        r.setex(cache_key, 3600, json.dumps(user))
        return user

    def update_user(self, user_id: int, data: dict):
        # Update DB
        self.db.execute("UPDATE users SET ... WHERE id = %s", user_id)

        # Invalidate cache (delete)
        cache_key = f"user:{user_id}"
        r.delete(cache_key)
        # Note: Delete, not update the cache!
        # Next read will fetch fresh data from DB and re-cache

Pros: Simple implementation, only caches needed data, falls back to DB on cache failure
Cons: First request is always slow (Cold Start), possible data inconsistency
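
The read flow can be exercised without a running Redis server. The sketch below is a minimal illustration, not the implementation above: `FakeCache` and `FakeDB` are hypothetical in-memory stand-ins (TTLs are accepted but ignored), and `get_user` follows the same check-cache, query-DB, store-in-cache steps, including the short-TTL negative cache.

```python
import json
from typing import Optional

class FakeCache:
    """Hypothetical dict-backed stand-in for Redis (TTL is ignored)."""
    def __init__(self):
        self.store = {}
    def get(self, key):
        return self.store.get(key)
    def setex(self, key, ttl, value):
        self.store[key] = value

class FakeDB:
    """Hypothetical database that counts how many lookups reach it."""
    def __init__(self, rows):
        self.rows = rows
        self.queries = 0
    def find_user(self, user_id):
        self.queries += 1
        return self.rows.get(user_id)

def get_user(cache, db, user_id: int) -> Optional[dict]:
    key = f"user:{user_id}"
    cached = cache.get(key)
    if cached is not None:            # hit, including a cached "null"
        return json.loads(cached)
    user = db.find_user(user_id)      # miss: fall through to the DB
    cache.setex(key, 3600 if user else 60, json.dumps(user))
    return user

cache, db = FakeCache(), FakeDB({1: {"id": 1, "name": "Ada"}})
first = get_user(cache, db, 1)          # miss: reads the DB
second = get_user(cache, db, 1)         # hit: served from the cache
missing = get_user(cache, db, 2)        # miss: caches the negative result
missing_again = get_user(cache, db, 2)  # hit on the negative entry
print(db.queries)
```

Four reads reach the DB only twice: once for the existing user and once for the missing one.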

2. Read-Through

The cache layer itself loads data on a miss. The application only ever talks to the cache.

class ReadThroughCache:
    """
    Cache handles DB queries on behalf of the app
    Application only calls the cache
    """

    def __init__(self, redis_client, db, default_ttl=3600):
        self.redis = redis_client
        self.db = db
        self.ttl = default_ttl

    def get(self, key: str, loader_fn=None) -> Optional[dict]:
        # Check cache
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)

        # Cache Miss → Load data via loader function
        if loader_fn:
            data = loader_fn()
            if data is not None:
                self.redis.setex(key, self.ttl, json.dumps(data))
            return data

        return None


# Usage example
cache = ReadThroughCache(r, db)

def get_product(product_id: int):
    return cache.get(
        f"product:{product_id}",
        loader_fn=lambda: db.query(
            "SELECT * FROM products WHERE id = %s", product_id
        )
    )

3. Write-Through

Writes propagate synchronously through the cache to the DB.

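class WriteThroughCache:
    """
    Write: App → Cache → DB (synchronous)
    Read: App → Cache (always up-to-date)
    """

    def __init__(self, redis_client, db, default_ttl=3600):
        self.redis = redis_client
        self.db = db
        self.ttl = default_ttl

    def write(self, key: str, data: dict, db_writer_fn=None):
        # 1. Write to cache first
        self.redis.setex(key, self.ttl, json.dumps(data))

        # 2. Write to DB synchronously
        if db_writer_fn:
            try:
                db_writer_fn(data)
            except Exception:
                # The two writes are not atomic: drop the cache entry
                # so it cannot serve data the DB never stored
                self.redis.delete(key)
                raise

        return data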

    def get(self, key: str) -> Optional[dict]:
        # Cache is always up-to-date, so read from cache only
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        return None


# Usage example
cache = WriteThroughCache(r, db)

def update_inventory(product_id: int, quantity: int):
    data = {"product_id": product_id, "quantity": quantity}
    cache.write(
        f"inventory:{product_id}",
        data,
        db_writer_fn=lambda d: db.execute(
            "UPDATE inventory SET quantity = %s WHERE product_id = %s",
            d["quantity"], d["product_id"]
        )
    )

Pros: Keeps cache and DB in sync on every successful write
Cons: Increased write latency (cache + DB on every write), caches data that may never be read, the two writes are not atomic

4. Write-Behind (Write-Back)

Writes go only to the cache, and DB updates are deferred asynchronously.

import threading
import time

class WriteBehindCache:
    """
    Write: App → Cache (immediate) → DB (async, batch)
    For cases requiring high write throughput
    """

    def __init__(self, redis_client, db, flush_interval=5):
        self.redis = redis_client
        self.db = db
        self.flush_interval = flush_interval
        self._start_flusher()

    def write(self, key: str, data: dict):
        # Write to cache only (immediate)
        self.redis.setex(key, 7200, json.dumps(data))
        # Mark as dirty (needs DB sync)
        self.redis.sadd("dirty_keys", key)

    def _start_flusher(self):
        """Periodically batch-flush dirty data to DB"""
        def flush():
            while True:
                dirty_keys = []
                try:
                    # Atomically take a batch of dirty keys; a key written again
                    # during the flush is simply re-marked for the next round
                    dirty_keys = self.redis.spop("dirty_keys", 1000)
                    if dirty_keys:
                        # pipeline()/add_to_batch() stand for your DB's batch API
                        pipe = self.db.pipeline()
                        for key in dirty_keys:
                            data = self.redis.get(key)
                            if data:
                                pipe.add_to_batch(key, json.loads(data))
                        pipe.execute()  # Batch DB write
                except Exception as e:
                    print(f"Flush error: {e}")
                    if dirty_keys:
                        # Re-mark the batch so it is retried on the next flush
                        self.redis.sadd("dirty_keys", *dirty_keys)

                time.sleep(self.flush_interval)

        thread = threading.Thread(target=flush, daemon=True)
        thread.start()

Pros: Very fast write performance, reduced DB load
Cons: Risk of data loss (on cache failure), complex implementation
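
The reduced DB load comes from coalescing: several writes to the same key between two flushes result in a single DB write. The sketch below shows this under simplifying assumptions: `cache`, `dirty`, and `db_writes` are hypothetical plain Python objects (not Redis or a real database), and `flush()` is called by hand instead of from a background thread.

```python
cache = {}        # stand-in for Redis values
dirty = set()     # stand-in for the "dirty_keys" set
db_writes = []    # records every write that reaches the "database"

def write(key: str, data: dict) -> None:
    """Write to the cache only and mark the key as needing a DB sync."""
    cache[key] = data
    dirty.add(key)

def flush() -> int:
    """Write the latest value of each dirty key to the DB; return the count."""
    keys = sorted(dirty)
    dirty.clear()
    for key in keys:
        db_writes.append((key, cache[key]))
    return len(keys)

# Five updates to one key and one update to another
for views in range(1, 6):
    write("page:1", {"views": views})
write("page:2", {"views": 1})

flushed = flush()
print(flushed, db_writes)
```

Six cache writes become two DB writes, and only the latest value of each key is persisted. The same property is the source of the data-loss risk: anything still in `dirty` when the cache fails never reaches the DB.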

5. Refresh-Ahead

Proactively refreshes the cache before TTL expiration.

import time

class RefreshAheadCache:
    """
    Refreshes cache in background at a certain percentage of TTL
    Example: TTL 3600s, factor 0.8 → starts refresh at 2880s (80%) mark
    """

    def __init__(self, redis_client, refresh_factor=0.8):
        self.redis = redis_client
        self.refresh_factor = refresh_factor

    def get(self, key: str, ttl: int, loader_fn=None):
        cached = self.redis.get(key)

        if cached:
            # Check remaining TTL (-1 = no expiry, -2 = key just expired)
            remaining_ttl = self.redis.ttl(key)
            threshold = ttl * (1 - self.refresh_factor)

            if loader_fn and 0 <= remaining_ttl < threshold:
                # Proactively refresh in background
                self._async_refresh(key, ttl, loader_fn)

            return json.loads(cached)

        # Cache Miss
        if loader_fn:
            data = loader_fn()
            self.redis.setex(key, ttl, json.dumps(data))
            return data

        return None

    def _async_refresh(self, key, ttl, loader_fn):
        """Asynchronously refresh cache (with lock to prevent duplicates)"""
        lock_key = f"refresh_lock:{key}"
        if self.redis.set(lock_key, "1", nx=True, ex=30):
            # Lock acquired → refresh
            threading.Thread(
                target=self._refresh,
                args=(key, ttl, loader_fn, lock_key),
                daemon=True
            ).start()

    def _refresh(self, key, ttl, loader_fn, lock_key):
        try:
            data = loader_fn()
            self.redis.setex(key, ttl, json.dumps(data))
        finally:
            self.redis.delete(lock_key)
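
The threshold arithmetic from the docstring can be checked in isolation. The helper below is a hypothetical extraction of that one comparison (the name `should_refresh` is not part of the class above). It also treats the negative values that the Redis `TTL` command returns for keys without an expiry (-1) or missing keys (-2) as "do not refresh", which is an assumption about the desired behavior.

```python
def should_refresh(remaining_ttl: int, ttl: int, refresh_factor: float = 0.8) -> bool:
    """True once more than `refresh_factor` of the TTL has elapsed."""
    # With ttl=3600 and factor 0.8 the threshold is about 720 seconds remaining,
    # i.e. the refresh starts around the 2880-second mark
    threshold = ttl * (1 - refresh_factor)
    # Negative values mean "no expiry" or "key missing": nothing to refresh
    return 0 <= remaining_ttl < threshold

print(should_refresh(3000, 3600))  # early in the key's life
print(should_refresh(600, 3600))   # past the 80% mark
```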

6. Cache Stampede Prevention

A cache stampede occurs when the TTL of a hot key expires and a large number of requests hit the DB at the same time. Two ways to prevent it:

class StampedeProtectedCache:

    def __init__(self, redis_client):
        self.redis = redis_client

    def get_with_lock(self, key: str, ttl: int, loader_fn, retries: int = 20):
        """Only one request queries DB via distributed lock"""
        cached = self.redis.get(key)
        if cached is not None:
            return json.loads(cached)

        lock_key = f"lock:{key}"

        # Attempt distributed lock (expires after 10s in case the holder crashes)
        if self.redis.set(lock_key, "1", nx=True, ex=10):
            try:
                # Lock acquired → query DB and update cache
                data = loader_fn()
                self.redis.setex(key, ttl, json.dumps(data))
                return data
            finally:
                # Caveat: if loader_fn ran longer than 10s, this may delete
                # a lock that another request has since acquired
                self.redis.delete(lock_key)

        # Lock failed → poll the cache while the lock holder loads the data
        for _ in range(retries):
            time.sleep(0.1)
            cached = self.redis.get(key)
            if cached is not None:
                return json.loads(cached)
        # Still not cached → query DB directly as a last resort
        return loader_fn()

    def get_with_probabilistic_refresh(self, key: str, ttl: int, loader_fn, beta=1.0):
        """Probabilistic early refresh (XFetch algorithm)"""
        import math, random

        cached = self.redis.get(key)
        if cached is not None:
            entry = json.loads(cached)
            remaining_ttl = self.redis.ttl(key)

            # XFetch: refresh early when -delta * beta * ln(rand) >= remaining TTL,
            # where delta is how long the last load took (not the elapsed time).
            # Probability increases as TTL approaches expiration
            early = -entry["delta"] * beta * math.log(1.0 - random.random())
            if remaining_ttl < 0 or early < remaining_ttl:
                return entry["value"]

        # Cache miss or early refresh: reload and store the load time with the value
        start = time.monotonic()
        data = loader_fn()
        delta = time.monotonic() - start
        self.redis.setex(key, ttl, json.dumps({"value": data, "delta": delta}))
        return data
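
For reference, the XFetch rule as it is usually formulated triggers a refresh when `-delta * beta * ln(rand) >= remaining TTL`, where `delta` is how long the value takes to recompute and `rand` is uniform in (0, 1]. The sketch below isolates that rule and its closed-form probability. The function names are hypothetical, and `rand` can be passed explicitly so the decision is reproducible.

```python
import math
import random

def xfetch_should_refresh(remaining_ttl, delta, beta=1.0, rand=None):
    """XFetch rule: refresh when -delta * beta * ln(rand) >= remaining TTL."""
    if rand is None:
        rand = 1.0 - random.random()   # uniform in (0, 1], avoids log(0)
    return -delta * beta * math.log(rand) >= remaining_ttl

def refresh_probability(remaining_ttl, delta, beta=1.0):
    """Closed form of the same rule: exp(-remaining / (beta * delta))."""
    return math.exp(-remaining_ttl / (beta * delta))

# A value that takes 2 seconds to recompute
for remaining in (60, 10, 2, 0):
    print(remaining, refresh_probability(remaining, delta=2.0))
```

With a 2-second recomputation time the probability is negligible a minute before expiry and reaches 1 at expiry, so early refreshes cluster in the last few seconds. A larger `beta` shifts them earlier.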

TTL Strategy

# TTL guide by data type
TTL_STRATEGIES = {
    # Frequently changing data
    "session": 1800,          # 30 minutes
    "rate_limit": 60,         # 1 minute
    "realtime_stats": 10,     # 10 seconds

    # Occasionally changing data
    "user_profile": 3600,     # 1 hour
    "product_detail": 1800,   # 30 minutes
    "api_response": 300,      # 5 minutes

    # Rarely changing data
    "config": 86400,          # 24 hours
    "country_list": 604800,   # 7 days
    "static_content": 2592000, # 30 days

    # Negative cache (non-existent data)
    "not_found": 60,          # 1 minute (keep it short!)
}

# Add slight randomness to TTL (Stampede prevention)
import random

def ttl_with_jitter(base_ttl: int, jitter_pct: float = 0.1) -> int:
    """Add ±10% random deviation to TTL"""
    jitter = int(base_ttl * jitter_pct)
    return base_ttl + random.randint(-jitter, jitter)

# Usage: r.setex(key, ttl_with_jitter(3600), value)
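
To see what the jitter does, the sketch below repeats the helper so it runs standalone and generates TTLs for 1,000 keys that would otherwise all expire at exactly 3,600 seconds. The fixed seed is only there to make the run repeatable.

```python
import random

def ttl_with_jitter(base_ttl: int, jitter_pct: float = 0.1) -> int:
    """Add a random deviation of up to ±jitter_pct to the TTL."""
    jitter = int(base_ttl * jitter_pct)
    return base_ttl + random.randint(-jitter, jitter)

random.seed(42)  # fixed seed so the run is repeatable
ttls = [ttl_with_jitter(3600) for _ in range(1000)]

# All values stay within ±10% of the base, spread over many distinct seconds
print(min(ttls), max(ttls), len(set(ttls)))
```

Instead of 1,000 keys expiring in the same second, expirations are spread over a window of up to 12 minutes (3,240 to 3,960 seconds).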

Leveraging Redis Data Structures

# 1. String — Simple caching
r.setex("user:123", 3600, json.dumps(user_data))

# 2. Hash — When partial updates are needed
r.hset("user:123", mapping={
    "name": "Youngju",
    "email": "yj@example.com",
    "login_count": 42
})
r.hincrby("user:123", "login_count", 1)  # Partial update

# 3. Sorted Set — Rankings/Leaderboards
r.zadd("leaderboard", {"player1": 100, "player2": 85, "player3": 92})
top_3 = r.zrevrange("leaderboard", 0, 2, withscores=True)

# 4. List — Recent activity feed
r.lpush("feed:user:123", json.dumps(activity))
r.ltrim("feed:user:123", 0, 99)  # Keep only the latest 100

# 5. Set — Deduplication
r.sadd("online_users", "user:123", "user:456")
online_count = r.scard("online_users")

# 6. Stream — Event logs
r.xadd("events:orders", {"action": "created", "order_id": "ORD-123"})

Cache Monitoring

# Check cache efficiency with Redis INFO command
info = r.info("stats")
hits = info["keyspace_hits"]
misses = info["keyspace_misses"]
total = hits + misses
hit_rate = hits / total * 100 if total else 0.0  # avoid ZeroDivisionError on a fresh server

print(f"Cache Hit Rate: {hit_rate:.1f}%")
# Target: 95% or higher

# Check memory usage
memory_info = r.info("memory")
print(f"Used Memory: {memory_info['used_memory_human']}")
print(f"Peak Memory: {memory_info['used_memory_peak_human']}")
print(f"Fragmentation Ratio: {memory_info['mem_fragmentation_ratio']}")

# Monitoring with Redis CLI
redis-cli info stats | grep -E "keyspace_hits|keyspace_misses"
redis-cli info memory | grep "used_memory_human"

# Check slow queries
redis-cli slowlog get 10

# Real-time command monitoring (debugging only: MONITOR noticeably reduces throughput)
redis-cli monitor
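
The `keyspace_hits` and `keyspace_misses` counters are cumulative since the server started (or since `CONFIG RESETSTAT`), so a hit rate computed from a single snapshot reflects the whole uptime rather than current behavior. A minimal sketch for computing the rate over an interval from two snapshots follows. `interval_hit_rate` is a hypothetical helper and the snapshot values are made up.

```python
from typing import Optional

def interval_hit_rate(prev: dict, curr: dict) -> Optional[float]:
    """Hit rate (%) between two INFO stats snapshots, or None if it is undefined."""
    hits = curr["keyspace_hits"] - prev["keyspace_hits"]
    misses = curr["keyspace_misses"] - prev["keyspace_misses"]
    total = hits + misses
    if total <= 0:   # no lookups in the interval (or the counters were reset)
        return None
    return hits / total * 100

# Hypothetical snapshots taken one minute apart
before = {"keyspace_hits": 10_000, "keyspace_misses": 2_000}
after = {"keyspace_hits": 10_950, "keyspace_misses": 2_050}
rate = interval_hit_rate(before, after)
print(rate)
```

In practice each snapshot would come from `r.info("stats")`. The lifetime rate for the `after` snapshot is about 84%, while the rate over the last interval is about 95%.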

Strategy Selection Guide

┌─────────────────────────────────────────────────┐
│ Which caching strategy to use?                  │
├─────────────────────────────────────────────────┤
│                                                 │
│ Read-heavy?  ──YES──> Cache-Aside               │
│     │                 (Most versatile)          │
│     NO                                          │
│     │                                           │
│ Write-heavy? ──YES──> Write-Behind              │
│     │                 (High write throughput)   │
│     NO                                          │
│     │                                           │
│ Consistency  ──YES──> Write-Through             │
│ matters?              (Cache-DB always in sync) │
│     │                                           │
│     NO                                          │
│     │                                           │
│ Latency      ──YES──> Refresh-Ahead             │
│ sensitive?            (Minimize Cache Miss)     │
└─────────────────────────────────────────────────┘
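
The flowchart reads as a chain of checks evaluated top to bottom. The function below is a hypothetical encoding of it. The chart does not say what to pick when every answer is NO, so the final fallback to Cache-Aside is an assumption.

```python
def choose_strategy(read_heavy: bool, write_heavy: bool,
                    consistency_matters: bool, latency_sensitive: bool) -> str:
    """Walk the flowchart top to bottom and return the first match."""
    if read_heavy:
        return "Cache-Aside"
    if write_heavy:
        return "Write-Behind"
    if consistency_matters:
        return "Write-Through"
    if latency_sensitive:
        return "Refresh-Ahead"
    # Assumption: not covered by the chart; fall back to the most versatile option
    return "Cache-Aside"

print(choose_strategy(read_heavy=False, write_heavy=True,
                      consistency_matters=True, latency_sensitive=False))
```

Because the checks are ordered, a workload that is both write-heavy and consistency-sensitive resolves to Write-Behind here. In practice the strategies are often combined per data type rather than chosen once for the whole system.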

Quiz

Q1. In the Cache-Aside pattern, why do we "delete" the cache instead of "updating" it when data changes?

To prevent race conditions. If two requests update the value concurrently, their cache writes can be applied in a different order than their DB writes, leaving stale data in the cache. Deleting ensures the next read fetches fresh data from the DB and re-caches it.
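
The race can be replayed deterministically. The sketch below uses plain dicts as hypothetical stand-ins for the DB and the cache and spells out one problematic interleaving of two writers, A and B, by hand. No threads are involved.

```python
db = {}
cache = {}

def read(key):
    """Cache-aside read: load from the DB on a miss and re-cache."""
    if key not in cache:
        cache[key] = db[key]
    return cache[key]

# Strategy 1: update the cache after each DB write.
db["stock"] = 10          # writer A commits
db["stock"] = 20          # writer B commits (final DB state)
cache["stock"] = 20       # writer B updates the cache
cache["stock"] = 10       # writer A's delayed cache update overwrites it
stale_read = read("stock")

# Strategy 2: delete the cache entry after each DB write, same interleaving.
db["stock"] = 10
db["stock"] = 20
cache.pop("stock", None)  # writer B invalidates
cache.pop("stock", None)  # writer A invalidates late, which is harmless
fresh_read = read("stock")

print(stale_read, fresh_read)
```

With updates, the cache ends up holding A's older value until the TTL expires. With deletes, the order of the two invalidations does not matter. Deleting narrows the window but does not close it entirely: a read that loads the old value just before a write commits can still re-cache it afterwards, which is one reason to keep a TTL on every entry.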

Q2. What is Cache Stampede and how do you prevent it?

It occurs when the TTL of a popular cache key expires and massive requests simultaneously query the DB. It can be prevented using distributed locks, probabilistic early refresh (XFetch), or the Refresh-Ahead pattern.
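
The effect of the lock can be shown inside a single process. This is a sketch under simplifying assumptions: a `threading.Lock` stands in for the distributed Redis lock, a dict stands in for the cache, and waiting threads block on the lock instead of polling. All names are hypothetical.

```python
import threading
import time

cache = {}
cache_lock = threading.Lock()   # stand-in for the Redis lock key
loader_calls = 0

def slow_loader():
    """Simulates an expensive DB query and counts how often it runs."""
    global loader_calls
    loader_calls += 1
    time.sleep(0.05)
    return {"id": 1}

def get_with_lock(key):
    if key in cache:
        return cache[key]
    with cache_lock:            # only one thread loads at a time
        if key not in cache:    # re-check: another thread may have filled it
            cache[key] = slow_loader()
    return cache[key]

results = []

def worker():
    results.append(get_with_lock("user:1"))

threads = [threading.Thread(target=worker) for _ in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(loader_calls, len(results))
```

Twenty concurrent requests for a missing key trigger a single load. The re-check inside the lock is what keeps the waiting threads from each querying the DB once the lock is released.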

Q3. What is the biggest risk of the Write-Behind pattern?

Data that has not yet been written to the DB can be lost if the cache (Redis) fails. The risk can be mitigated with AOF/RDB persistence settings and Write-Ahead Logs.

Q4. What is Negative Caching?

It means caching data that does not exist in the DB. This prevents repeated lookups for the same non-existent key from hitting the DB. A short TTL (e.g., 60 seconds) is used.

Q5. Why add Jitter to TTL?

To prevent Cache Stampede caused by caches created at the same time expiring simultaneously. A random deviation of plus or minus 10% is added to the TTL.

Q6. What is the recommended target for Cache Hit Rate?

Generally, the target is 95% or higher. If it falls below 80%, the caching strategy should be re-evaluated.

Q7. When is Redis Hash more advantageous than String?

When you need to update only some fields of an object. String requires serializing/deserializing the entire data, while Hash allows reading/writing individual fields independently.

Conclusion

Caching strategy is not just about "putting data in Redis and calling it a day." You need to select the appropriate strategy based on data characteristics (read/write ratio, consistency requirements, update frequency), and consider TTL management, Stampede prevention, and monitoring to operate reliably in production.
