Skip to content
Published on

Redis キャッシュ戦略 完全ガイド — Cache-Aside から Write-Behind まで実践パターン6選

Authors
  • Name
    Twitter
Redis Caching Strategies

はじめに

キャッシュはアプリケーションパフォーマンス最適化の中核戦略です。しかし「キャッシュを使えば速くなる」程度の理解だけでは、本番環境でさまざまな問題に直面します。キャッシュの整合性、キャッシュスタンピード、メモリ管理など実務で遭遇する課題を解決するには、正しいキャッシュ戦略の選択が不可欠です。

キャッシュ戦略の概要

┌─────────────────────────────────────────────────────┐
│                キャッシュ戦略の分類                     │
├──────────────────┬──────────────────────────────────┤
│   読み取り戦略    │   書き込み戦略                     │
├──────────────────┼──────────────────────────────────┤
Cache-AsideWrite-ThroughRead-ThroughWrite-Behind (Write-Back)Refresh-AheadWrite-Around└──────────────────┴──────────────────────────────────┘

1. Cache-Aside(Lazy Loading)

最も広く使われるパターンで、アプリケーションがキャッシュを直接管理します。

読み取りフロー:
[App]Cache hit?[Redis] → データ返却
  ↓ miss
[App][Database] → データ返却
[App][Redis] キャッシュ保存
import redis
import json
from typing import Optional

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

class UserService:
    def get_user(self, user_id: int) -> Optional[dict]:
        cache_key = f"user:{user_id}"

        # 1. キャッシュ確認
        cached = r.get(cache_key)
        if cached:
            return json.loads(cached)

        # 2. Cache Miss → DB 問い合わせ
        user = self.db.query("SELECT * FROM users WHERE id = %s", user_id)
        if not user:
            # Negative caching: 存在しないデータもキャッシュ(短い TTL)
            r.setex(cache_key, 60, json.dumps(None))
            return None

        # 3. キャッシュ保存
        r.setex(cache_key, 3600, json.dumps(user))
        return user

    def update_user(self, user_id: int, data: dict):
        # DB 更新
        self.db.execute("UPDATE users SET ... WHERE id = %s", user_id)

        # キャッシュ無効化(削除)
        cache_key = f"user:{user_id}"
        r.delete(cache_key)
        # 注意: キャッシュの更新ではなく削除!
        # 次の読み取り時に最新データで再キャッシュ

メリット: 実装がシンプル、必要なデータのみキャッシュ、キャッシュ障害時はDBにフォールバック デメリット: 初回リクエストは常に遅い(Cold Start)、データ不整合の可能性

2. Read-Through

キャッシュがデータロードを担当します。アプリケーションは常にキャッシュだけを参照します。

class ReadThroughCache:
    """
    キャッシュがDBクエリを代行
    アプリケーションはキャッシュのみを呼び出す
    """

    def __init__(self, redis_client, db, default_ttl=3600):
        self.redis = redis_client
        self.db = db
        self.ttl = default_ttl

    def get(self, key: str, loader_fn=None) -> Optional[dict]:
        # キャッシュ確認
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)

        # Cache Miss → loader 関数でデータをロード
        if loader_fn:
            data = loader_fn()
            if data is not None:
                self.redis.setex(key, self.ttl, json.dumps(data))
            return data

        return None


# 使用例
cache = ReadThroughCache(r, db)

def get_product(product_id: int):
    return cache.get(
        f"product:{product_id}",
        loader_fn=lambda: db.query(
            "SELECT * FROM products WHERE id = %s", product_id
        )
    )

3. Write-Through

書き込みがキャッシュを経由してDBに同期的に伝播します。

class WriteThroughCache:
    """
    書き込み: App → Cache → DB(同期)
    読み取り: App → Cache(常に最新)
    """

    def write(self, key: str, data: dict, db_writer_fn=None):
        # 1. まずキャッシュに書き込み
        self.redis.setex(key, self.ttl, json.dumps(data))

        # 2. DBに同期的に書き込み
        if db_writer_fn:
            db_writer_fn(data)

        return data

    def get(self, key: str) -> Optional[dict]:
        # キャッシュは常に最新なのでキャッシュからのみ読み取り
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        return None


# 使用例
cache = WriteThroughCache(r, db)

def update_inventory(product_id: int, quantity: int):
    data = {"product_id": product_id, "quantity": quantity}
    cache.write(
        f"inventory:{product_id}",
        data,
        db_writer_fn=lambda d: db.execute(
            "UPDATE inventory SET quantity = %s WHERE product_id = %s",
            d["quantity"], d["product_id"]
        )
    )

メリット: キャッシュとDBの整合性を保証 デメリット: 書き込みレイテンシ増加(キャッシュ + DB の2回)、使わないデータもキャッシュ

4. Write-Behind(Write-Back)

書き込みをキャッシュにのみ行い、DB反映は非同期で遅延させます。

import threading
from collections import defaultdict

class WriteBehindCache:
    """
    書き込み: App → Cache(即時) → DB(非同期、バッチ)
    高い書き込みスループットが必要な場合
    """

    def __init__(self, redis_client, db, flush_interval=5):
        self.redis = redis_client
        self.db = db
        self.dirty_keys = set()
        self.flush_interval = flush_interval
        self._start_flusher()

    def write(self, key: str, data: dict):
        # キャッシュにのみ即時書き込み
        self.redis.setex(key, 7200, json.dumps(data))
        # Dirty マーク(DB反映が必要)
        self.redis.sadd("dirty_keys", key)

    def _start_flusher(self):
        """定期的に Dirty データをDBにバッチ反映"""
        def flush():
            while True:
                try:
                    # Dirty キーを取得
                    dirty_keys = self.redis.smembers("dirty_keys")
                    if dirty_keys:
                        pipe = self.db.pipeline()
                        for key in dirty_keys:
                            data = self.redis.get(key)
                            if data:
                                pipe.add_to_batch(key, json.loads(data))
                        pipe.execute()  # バッチ DB 書き込み

                        # Dirty マークを削除
                        self.redis.srem("dirty_keys", *dirty_keys)
                except Exception as e:
                    print(f"Flush error: {e}")

                threading.Event().wait(self.flush_interval)

        thread = threading.Thread(target=flush, daemon=True)
        thread.start()

メリット: 非常に高速な書き込みパフォーマンス、DB負荷の軽減 デメリット: データ損失のリスク(キャッシュ障害時)、実装が複雑

5. Refresh-Ahead

TTL 期限切れの前にキャッシュを事前に更新します。

import time

class RefreshAheadCache:
    """
    TTL の一定割合の時点でバックグラウンドでキャッシュを更新
    例: TTL 3600秒、factor 0.8 → 2880秒(80%)の時点で更新開始
    """

    def __init__(self, redis_client, refresh_factor=0.8):
        self.redis = redis_client
        self.refresh_factor = refresh_factor

    def get(self, key: str, ttl: int, loader_fn=None):
        cached = self.redis.get(key)

        if cached:
            # 残り TTL を確認
            remaining_ttl = self.redis.ttl(key)
            threshold = ttl * (1 - self.refresh_factor)

            if remaining_ttl < threshold:
                # バックグラウンドで事前に更新
                self._async_refresh(key, ttl, loader_fn)

            return json.loads(cached)

        # Cache Miss
        if loader_fn:
            data = loader_fn()
            self.redis.setex(key, ttl, json.dumps(data))
            return data

        return None

    def _async_refresh(self, key, ttl, loader_fn):
        """非同期でキャッシュを更新(ロックで重複防止)"""
        lock_key = f"refresh_lock:{key}"
        if self.redis.set(lock_key, "1", nx=True, ex=30):
            # ロック取得成功 → 更新
            threading.Thread(
                target=self._refresh,
                args=(key, ttl, loader_fn, lock_key),
                daemon=True
            ).start()

    def _refresh(self, key, ttl, loader_fn, lock_key):
        try:
            data = loader_fn()
            self.redis.setex(key, ttl, json.dumps(data))
        finally:
            self.redis.delete(lock_key)

6. Cache Stampede の防止

TTL 期限切れ時に大量のリクエストが同時にDBを叩く Cache Stampede 問題の解決:

class StampedeProtectedCache:

    def get_with_lock(self, key: str, ttl: int, loader_fn):
        """分散ロックで1つのリクエストのみDBに問い合わせ"""
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)

        lock_key = f"lock:{key}"

        # 分散ロックを試行
        if self.redis.set(lock_key, "1", nx=True, ex=10):
            try:
                # ロック取得 → DB問い合わせ & キャッシュ更新
                data = loader_fn()
                self.redis.setex(key, ttl, json.dumps(data))
                return data
            finally:
                self.redis.delete(lock_key)
        else:
            # ロック失敗 → 少し待ってからキャッシュを再確認
            time.sleep(0.1)
            cached = self.redis.get(key)
            if cached:
                return json.loads(cached)
            # まだなければ直接DB問い合わせ
            return loader_fn()

    def get_with_probabilistic_refresh(self, key: str, ttl: int, loader_fn, beta=1.0):
        """確率的早期更新(XFetch アルゴリズム)"""
        cached = self.redis.get(key)
        if cached:
            data = json.loads(cached)
            remaining_ttl = self.redis.ttl(key)
            delta = ttl - remaining_ttl  # 経過時間

            # 確率的に早期更新をトリガー
            # TTL 期限切れが近いほど更新確率が上昇
            import random, math
            if delta > 0:
                prob = math.exp(-remaining_ttl / (beta * delta))
                if random.random() < prob:
                    # 早期更新
                    new_data = loader_fn()
                    self.redis.setex(key, ttl, json.dumps(new_data))
                    return new_data

            return data

        # Cache Miss
        data = loader_fn()
        self.redis.setex(key, ttl, json.dumps(data))
        return data

TTL 戦略

# データ種別ごとの TTL ガイド
TTL_STRATEGIES = {
    # 頻繁に変更されるデータ
    "session": 1800,          # 30分
    "rate_limit": 60,         # 1分
    "realtime_stats": 10,     # 10秒

    # たまに変更されるデータ
    "user_profile": 3600,     # 1時間
    "product_detail": 1800,   # 30分
    "api_response": 300,      # 5分

    # ほとんど変更されないデータ
    "config": 86400,          # 24時間
    "country_list": 604800,   # 7日
    "static_content": 2592000, # 30日

    # Negative cache(存在しないデータ)
    "not_found": 60,          # 1分(短く!)
}

# TTL に若干のランダム性を追加(Stampede 防止)
import random

def ttl_with_jitter(base_ttl: int, jitter_pct: float = 0.1) -> int:
    """TTL に ±10% のランダム偏差を追加"""
    jitter = int(base_ttl * jitter_pct)
    return base_ttl + random.randint(-jitter, jitter)

# 使用: r.setex(key, ttl_with_jitter(3600), value)

Redis データ構造の活用

# 1. String — シンプルなキャッシュ
r.setex("user:123", 3600, json.dumps(user_data))

# 2. Hash — 部分更新が必要な場合
r.hset("user:123", mapping={
    "name": "Youngju",
    "email": "yj@example.com",
    "login_count": 42
})
r.hincrby("user:123", "login_count", 1)  # 部分更新

# 3. Sorted Set — ランキング/リーダーボード
r.zadd("leaderboard", {"player1": 100, "player2": 85, "player3": 92})
top_3 = r.zrevrange("leaderboard", 0, 2, withscores=True)

# 4. List — 最近のアクティビティフィード
r.lpush("feed:user:123", json.dumps(activity))
r.ltrim("feed:user:123", 0, 99)  # 最新100件のみ保持

# 5. Set — 重複排除
r.sadd("online_users", "user:123", "user:456")
online_count = r.scard("online_users")

# 6. Stream — イベントログ
r.xadd("events:orders", {"action": "created", "order_id": "ORD-123"})

キャッシュモニタリング

# Redis INFO コマンドでキャッシュ効率を確認
info = r.info("stats")
hits = info["keyspace_hits"]
misses = info["keyspace_misses"]
hit_rate = hits / (hits + misses) * 100

print(f"Cache Hit Rate: {hit_rate:.1f}%")
# 目標: 95% 以上

# メモリ使用量の確認
memory_info = r.info("memory")
print(f"Used Memory: {memory_info['used_memory_human']}")
print(f"Peak Memory: {memory_info['used_memory_peak_human']}")
print(f"Fragmentation Ratio: {memory_info['mem_fragmentation_ratio']}")
# Redis CLI でモニタリング
redis-cli info stats | grep -E "keyspace_hits|keyspace_misses"
redis-cli info memory | grep "used_memory_human"

# スロークエリの確認
redis-cli slowlog get 10

# リアルタイムコマンドモニタリング
redis-cli monitor

戦略選択ガイド

┌─────────────────────────────────────────────────┐
│        どのキャッシュ戦略を使うべきか?            │
├─────────────────────────────────────────────────┤
│                                                 │
│  読み取り中心? ──YES──> Cache-Aside│     │                   (最も汎用的)           │
NO│     │                                          │
│  書き込み中心? ──YES──> Write-Behind│     │                   (高い書き込みスループット)│
NO│     │                                          │
│  整合性重要?  ──YES──> Write-Through│     │                  (キャッシュ-DB常時同期)   │
NO│     │                                          │
│  レイテンシ敏感?──YES──> Refresh-Ahead│                        (Cache Miss 最小化)     │
└─────────────────────────────────────────────────┘

クイズ

Q1. Cache-Aside パターンでデータ更新時にキャッシュを「更新」せずに「削除」する理由は?

Race Condition の防止のためです。2つのリクエストが同時に更新すると、古いデータがキャッシュに残る可能性があります。削除すれば、次の読み取り時に最新データをDBから取得して再キャッシュします。

Q2. Cache Stampede とは何か、どう防ぐか?

人気のあるキャッシュキーの TTL が期限切れになった際に、大量のリクエストが同時にDBに問い合わせる現象です。分散ロック、確率的早期更新(XFetch)、Refresh-Ahead パターンで防止します。

Q3. Write-Behind パターンの最大のリスクは?

キャッシュ(Redis)障害時に、まだDBに反映されていないデータが失われる可能性があります。AOF/RDB 永続化設定と Write-Ahead Log でリスクを軽減できます。

Q4. Negative Caching とは?

DBに存在しないデータもキャッシュすることです。同じ存在しないキーへの繰り返しクエリがDBを叩くのを防ぎます。短い TTL(例: 60秒)を使用します。

Q5. TTL に Jitter を追加する理由は?

同時に作成されたキャッシュが一斉に期限切れとなり Cache Stampede が発生するのを防ぐためです。TTL に ±10% のランダム偏差を追加します。

Q6. Cache Hit Rate の推奨目標値は?

一般的に 95% 以上 を目標とします。80% 未満であればキャッシュ戦略を見直す必要があります。

Q7. Redis Hash が String より有利な状況は?

オブジェクトの一部のフィールドのみ更新する場合です。String はデータ全体をシリアライズ/デシリアライズする必要がありますが、Hash は個別フィールドを独立して読み書きできます。

まとめ

キャッシュ戦略は「Redis に入れれば終わり」ではありません。データの特性(読み取り/書き込み比率、整合性要件、更新頻度)に応じた適切な戦略を選択し、TTL 管理、Stampede 防止、モニタリングまで考慮してこそ、本番環境で安定的に運用できます。

参考資料