Skip to content
Published on

[Architecture] Rate Limitingとapi呼び出し量制限の実装完全ガイド

Authors

概要

APIを運用していると、予期しないトラフィック急増、DDoS攻撃、特定ユーザーの過度な呼び出しなど、さまざまな状況に直面します。 Rate Limitingはこれらの問題に対する最も基本的かつ効果的な防御手段です。 この記事では、Rate Limitingの核心アルゴリズムからRedis、Nginx、Kongを活用した実践的な実装、 LLM APIコスト制御パターン、分散環境でのRate Limitingまで総整理します。


1. Rate Limitingとは

1.1 なぜ必要か

Rate Limitingは、単位時間内に許可されるリクエスト数を制限する技法です。

主な目的:

  • DDoS防御: 悪意のある大量リクエストからサービスを保護
  • コスト制御: 外部API呼び出しコスト管理(特にLLM API)
  • 公正利用: 特定ユーザーがリソースを独占するのを防止
  • サービス安定性: 過負荷によるサービス障害を予防
  • SLA保証: すべてのユーザーに一定水準のサービス品質を提供

1.2 Rate Limitingの配置場所

Client --> [CDN/WAF] --> [Load Balancer] --> [API Gateway] --> [Application]
              |                |                  |                 |
          L3/L4制限      Connection制限    リクエストRate Limit  ビジネスロジック制限

Rate Limitingはインフラの複数レイヤーで適用でき、最も効果的な場所はAPI Gatewayレイヤーです。


2. Rate Limitingアルゴリズム

2.1 Token Bucket

最も広く使用されているアルゴリズムで、AWS API GatewayやNginxで採用されています。

原理:

  • バケットに一定速度でトークンが充填される
  • リクエストごとにトークン1個を消費
  • トークンがなければリクエスト拒否
  • バケット容量分のバーストトラフィックを許可
Bucket Capacity: 10 tokens
Refill Rate: 2 tokens/second

Time 0s: [##########] 10 tokens -> Request OK (9 left)
Time 0s: [#########.] 9 tokens  -> Request OK (8 left)
...
Time 0s: [..........] 0 tokens  -> Request REJECTED
Time 1s: [##........] 2 tokens  -> Refilled
import time
import threading

class TokenBucket:
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate  # tokens per second
        self.tokens = capacity
        self.last_refill_time = time.monotonic()
        self.lock = threading.Lock()

    def _refill(self):
        now = time.monotonic()
        elapsed = now - self.last_refill_time
        new_tokens = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + new_tokens)
        self.last_refill_time = now

    def consume(self, tokens: int = 1) -> bool:
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait_time(self) -> float:
        """次のトークンまでの待機時間"""
        with self.lock:
            self._refill()
            if self.tokens >= 1:
                return 0.0
            return (1 - self.tokens) / self.refill_rate


# 使用例
bucket = TokenBucket(capacity=10, refill_rate=2.0)

for i in range(15):
    if bucket.consume():
        print(f"Request {i+1}: Allowed")
    else:
        wait = bucket.wait_time()
        print(f"Request {i+1}: Rejected (wait {wait:.2f}s)")

利点: バースト許可、実装簡単、メモリ効率的 欠点: 正確なリクエスト数の追跡が困難

2.2 Leaky Bucket

一定の速度でリクエストを処理するアルゴリズムです。

Incoming Requests --> [Bucket/Queue] --> Processing (fixed rate)
                         |
                      Overflow --> Rejected

処理速度: 2 requests/second(固定)
キューサイズ: 5

速く入ってきたリクエストはキューで待機、キューが満杯なら拒否
import time
import threading
from collections import deque

class LeakyBucket:
    def __init__(self, capacity: int, leak_rate: float):
        self.capacity = capacity
        self.leak_rate = leak_rate  # requests per second
        self.queue = deque()
        self.lock = threading.Lock()
        self.last_leak_time = time.monotonic()

    def _leak(self):
        now = time.monotonic()
        elapsed = now - self.last_leak_time
        leaked = int(elapsed * self.leak_rate)
        if leaked > 0:
            for _ in range(min(leaked, len(self.queue))):
                self.queue.popleft()
            self.last_leak_time = now

    def allow(self) -> bool:
        with self.lock:
            self._leak()
            if len(self.queue) < self.capacity:
                self.queue.append(time.monotonic())
                return True
            return False

利点: 一定の処理速度を保証 欠点: バーストを許可しないため柔軟性に欠ける

2.3 Fixed Window Counter

最も単純なアルゴリズムで、固定された時間ウィンドウ内のリクエスト数をカウントします。

Window: 1 minute, Limit: 100 requests

|------- Window 1 -------|------- Window 2 -------|
12:00:00     12:00:59    12:01:00     12:01:59

  Count: 0...50...100       Count: 0...50...100
         (allowed)                 (allowed)

問題: Boundary Burst
12:00:30~12:00:59に100件 + 12:01:00~12:01:30に100件
= 60秒間で200件処理(制限の2倍!)
import time
import threading

class FixedWindowCounter:
    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window_seconds = window_seconds
        self.counters = {}  # key -> (window_start, count)
        self.lock = threading.Lock()

    def allow(self, key: str) -> bool:
        with self.lock:
            now = time.time()
            window_start = int(now / self.window_seconds) * self.window_seconds

            if key not in self.counters or self.counters[key][0] != window_start:
                self.counters[key] = (window_start, 0)

            if self.counters[key][1] < self.limit:
                self.counters[key] = (window_start, self.counters[key][1] + 1)
                return True
            return False

利点: 実装が非常にシンプル、メモリ使用量最小 欠点: ウィンドウ境界(boundary)でのリクエスト集中問題

2.4 Sliding Window Log

各リクエストのタイムスタンプをログとして記録し、正確に追跡します。

Window: 60 seconds, Limit: 5 requests

Log: [12:00:10, 12:00:25, 12:00:40, 12:00:55, 12:01:05]

New request at 12:01:10:
  1. 60秒より古いエントリを削除(12:00:10以前)
     -> [12:00:25, 12:00:40, 12:00:55, 12:01:05]
  2. Count: 4 < 5 -> Allowed
  3. ログに追加: [12:00:25, 12:00:40, 12:00:55, 12:01:05, 12:01:10]
import time
import threading
from collections import defaultdict

class SlidingWindowLog:
    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window_seconds = window_seconds
        self.logs = defaultdict(list)
        self.lock = threading.Lock()

    def allow(self, key: str) -> bool:
        with self.lock:
            now = time.time()
            cutoff = now - self.window_seconds

            # 古いエントリを削除
            self.logs[key] = [
                ts for ts in self.logs[key] if ts > cutoff
            ]

            if len(self.logs[key]) < self.limit:
                self.logs[key].append(now)
                return True
            return False

利点: 最も正確なRate Limiting 欠点: メモリ使用量が高い(すべてのリクエストタイムスタンプを保存)

2.5 Sliding Window Counter

Fixed WindowとSliding Window Logの利点を組み合わせたアルゴリズムです。 実務で最も多く使用されています。

Window: 60 seconds, Limit: 100 requests

Previous Window Count: 80
Current Window Count: 30
Current Window Progress: 25%(15秒経過)

Weighted Count = 80 * (1 - 0.25) + 30 = 80 * 0.75 + 30 = 90

90 < 100 -> Allowed
import time
import threading

class SlidingWindowCounter:
    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window_seconds = window_seconds
        self.windows = {}  # key -> (prev_count, curr_count, curr_window_start)
        self.lock = threading.Lock()

    def allow(self, key: str) -> bool:
        with self.lock:
            now = time.time()
            curr_window = int(now / self.window_seconds) * self.window_seconds
            window_progress = (now - curr_window) / self.window_seconds

            if key not in self.windows:
                self.windows[key] = (0, 0, curr_window)

            prev_count, curr_count, stored_window = self.windows[key]

            # ウィンドウが変わったら更新
            if stored_window != curr_window:
                if curr_window - stored_window >= self.window_seconds * 2:
                    prev_count = 0
                else:
                    prev_count = curr_count
                curr_count = 0

            # 加重カウント計算
            weighted = prev_count * (1 - window_progress) + curr_count

            if weighted < self.limit:
                curr_count += 1
                self.windows[key] = (prev_count, curr_count, curr_window)
                return True
            return False

2.6 アルゴリズム比較表

アルゴリズム精度メモリバースト許可実装複雑度適した場合
Token BucketO一般API、バースト必要
Leaky BucketX一定処理率必要
Fixed Window非常に低境界問題非常に低単純な制限
Sliding LogX正確な制限必要
Sliding CounterX実務汎用

3. 実装方法

3.1 Redis + Lua Script

Redisの原子的操作とLuaスクリプトを組み合わせて、分散環境でも安全なRate Limitingを実装します。

Sliding Window Counter(Redis + Lua)

import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0)

# Luaスクリプト: Sliding Window Counter
SLIDING_WINDOW_SCRIPT = """
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])

local clear_before = now - window
redis.call('ZREMRANGEBYSCORE', key, '-inf', clear_before)

local current_count = redis.call('ZCARD', key)
if current_count < limit then
    redis.call('ZADD', key, now, now .. '-' .. math.random(1000000))
    redis.call('EXPIRE', key, window + 1)
    return 1
else
    return 0
end
"""

sliding_window_sha = r.register_script(SLIDING_WINDOW_SCRIPT)

def is_rate_limited(key: str, limit: int = 100, window: int = 60) -> bool:
    """
    レート制限された場合True、許可された場合Falseを返す。
    """
    now = time.time()
    result = sliding_window_sha(
        keys=[f"ratelimit:{key}"],
        args=[now, window, limit]
    )
    return result == 0  # 0 = rate limited, 1 = allowed


# 使用例
user_id = "user-123"
for i in range(110):
    if is_rate_limited(user_id, limit=100, window=60):
        print(f"Request {i+1}: Rate Limited!")
    else:
        print(f"Request {i+1}: Allowed")

Token Bucket(Redis + Lua)

TOKEN_BUCKET_SCRIPT = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(bucket[1])
local last_refill = tonumber(bucket[2])

if tokens == nil then
    tokens = capacity
    last_refill = now
end

local elapsed = now - last_refill
local new_tokens = elapsed * refill_rate
tokens = math.min(capacity, tokens + new_tokens)

local allowed = 0
if tokens >= requested then
    tokens = tokens - requested
    allowed = 1
end

redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 1)

return allowed
"""

token_bucket_sha = r.register_script(TOKEN_BUCKET_SCRIPT)

def token_bucket_check(
    key: str,
    capacity: int = 100,
    refill_rate: float = 10.0,
    tokens: int = 1
) -> bool:
    now = time.time()
    result = token_bucket_sha(
        keys=[f"tokenbucket:{key}"],
        args=[capacity, refill_rate, now, tokens]
    )
    return result == 1  # 1 = allowed

3.2 Nginx Rate Limiting

# nginx.conf

http {
    # Zone定義:クライアントIP基準、10MB共有メモリ
    limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;

    # ユーザー別Rate Limiting(API Keyベース)
    map $http_x_api_key $api_key_zone {
        default $http_x_api_key;
    }
    limit_req_zone $api_key_zone zone=user_limit:10m rate=100r/m;

    server {
        listen 80;

        # デフォルトAPIエンドポイント
        location /api/ {
            # burst=20: 20個まで超過リクエスト許可(キューで待機)
            # nodelay: 待機なしで即時処理(burst内)
            limit_req zone=api_limit burst=20 nodelay;

            # Rate Limit超過時のレスポンスコード
            limit_req_status 429;

            proxy_pass http://backend;
        }

        # LLM APIエンドポイント(より厳格な制限)
        location /api/v1/completions {
            limit_req zone=user_limit burst=5 nodelay;
            limit_req_status 429;

            proxy_pass http://llm_backend;
        }

        # エラーページカスタマイズ
        error_page 429 = @rate_limited;
        location @rate_limited {
            default_type application/json;
            return 429 '{"error": "Too Many Requests", "retry_after": 60}';
        }
    }
}

3.3 Kong Rate Limiting Plugin

# kong.yml - Declarative Configuration

_format_version: '3.0'

services:
  - name: llm-api
    url: http://llm-backend:8000
    routes:
      - name: llm-route
        paths:
          - /api/v1/completions

plugins:
  # グローバルRate Limiting
  - name: rate-limiting
    config:
      minute: 100
      hour: 1000
      policy: redis
      redis_host: redis
      redis_port: 6379
      redis_database: 0
      fault_tolerant: true
      hide_client_headers: false
      error_code: 429
      error_message: 'Rate limit exceeded'

  # Consumer別Rate Limiting
  - name: rate-limiting
    consumer: premium-user
    config:
      minute: 500
      hour: 10000
      policy: redis
      redis_host: redis

3.4 Express.js Rate Limiting

const rateLimit = require('express-rate-limit')
const RedisStore = require('rate-limit-redis')
const Redis = require('ioredis')

const redis = new Redis({
  host: 'localhost',
  port: 6379,
})

// デフォルトRate Limiter
const apiLimiter = rateLimit({
  windowMs: 60 * 1000, // 1 minute
  max: 100,
  standardHeaders: true, // RateLimit-* headers
  legacyHeaders: false, // X-RateLimit-* headers
  store: new RedisStore({
    sendCommand: (...args) => redis.call(...args),
  }),
  message: {
    error: 'Too Many Requests',
    retryAfter: 60,
  },
  keyGenerator: (req) => {
    return req.headers['x-api-key'] || req.ip
  },
})

// LLM API用厳格なLimiter
const llmLimiter = rateLimit({
  windowMs: 60 * 1000,
  max: 20,
  store: new RedisStore({
    sendCommand: (...args) => redis.call(...args),
    prefix: 'rl:llm:',
  }),
  keyGenerator: (req) => req.headers['x-api-key'],
})

const app = require('express')()
app.use('/api/', apiLimiter)
app.use('/api/v1/completions', llmLimiter)

3.5 Spring Boot Rate Limiting

// Bucket4j + Redisベース Rate Limiting

@Configuration
public class RateLimitConfig {

    @Bean
    public ProxyManager<String> proxyManager(RedissonClient redissonClient) {
        return Bucket4jRedisson.casBasedBuilder(redissonClient)
            .build();
    }
}

@Component
public class RateLimitInterceptor implements HandlerInterceptor {

    private final ProxyManager<String> proxyManager;

    @Override
    public boolean preHandle(
        HttpServletRequest request,
        HttpServletResponse response,
        Object handler
    ) throws Exception {
        String apiKey = request.getHeader("X-API-Key");
        if (apiKey == null) {
            response.setStatus(401);
            return false;
        }

        BucketConfiguration config = BucketConfiguration.builder()
            .addLimit(
                Bandwidth.builder()
                    .capacity(100)
                    .refillGreedy(100, Duration.ofMinutes(1))
                    .build()
            )
            .build();

        Bucket bucket = proxyManager.builder()
            .build(apiKey, () -> config);

        ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);

        if (probe.isConsumed()) {
            response.setHeader("X-RateLimit-Remaining",
                String.valueOf(probe.getRemainingTokens()));
            return true;
        } else {
            long waitSeconds = probe.getNanosToWaitForRefill() / 1_000_000_000;
            response.setStatus(429);
            response.setHeader("Retry-After", String.valueOf(waitSeconds));
            response.getWriter().write(
                "{\"error\":\"Rate limit exceeded\"}"
            );
            return false;
        }
    }
}

4. 分散環境Rate Limiting

4.1 中央集権型(Redis)

+--------+     +--------+     +--------+
| Node 1 |     | Node 2 |     | Node 3 |
+---+----+     +---+----+     +---+----+
    |              |              |
    +---------+----+---------+----+
              |              |
         +----+----+    +---+---+
         |  Redis  |    | Redis |
         | Primary |    | Replica|
         +---------+    +-------+

利点: すべてのノードで一貫したカウント 欠点: Redisへのネットワーク遅延追加、RedisがSPOFになる可能性

4.2 ローカルカウンター + 同期化

+--------+     +--------+     +--------+
| Node 1 |     | Node 2 |     | Node 3 |
| Local:  |     | Local:  |     | Local:  |
|  30/100 |     |  25/100 |     |  20/100 |
+---+----+     +---+----+     +---+----+
    |              |              |
    +----Periodic Sync (every 5s)-----+
              |
         +----+----+
         |  Redis  |
         | Global: |
         |  75/100 |
         +---------+
import time
import threading
import redis

class DistributedRateLimiter:
    def __init__(
        self,
        redis_client: redis.Redis,
        key_prefix: str,
        global_limit: int,
        window_seconds: int,
        sync_interval: float = 5.0,
        local_threshold: float = 0.1,
    ):
        self.redis = redis_client
        self.key_prefix = key_prefix
        self.global_limit = global_limit
        self.window_seconds = window_seconds
        self.sync_interval = sync_interval
        # ローカル許容量 = 全体制限の10%
        self.local_limit = int(global_limit * local_threshold)
        self.local_count = 0
        self.lock = threading.Lock()

        # 定期同期スレッド開始
        self._start_sync_thread()

    def _start_sync_thread(self):
        def sync_loop():
            while True:
                time.sleep(self.sync_interval)
                self._sync_to_redis()

        thread = threading.Thread(target=sync_loop, daemon=True)
        thread.start()

    def _sync_to_redis(self):
        with self.lock:
            if self.local_count > 0:
                key = f"{self.key_prefix}:global"
                self.redis.incrby(key, self.local_count)
                self.redis.expire(key, self.window_seconds + 10)
                self.local_count = 0

    def allow(self, key: str) -> bool:
        with self.lock:
            # ローカルカウンターチェック
            if self.local_count >= self.local_limit:
                self._sync_to_redis()
                # グローバルカウンターチェック
                global_key = f"{self.key_prefix}:global"
                global_count = int(self.redis.get(global_key) or 0)
                if global_count >= self.global_limit:
                    return False

            self.local_count += 1
            return True

5. LLM APIコスト制御

5.1 OpenAI/Anthropic API Rate Limitsの理解

OpenAI(GPT-4o基準):
  - RPM (Requests Per Minute): Tierにより異なる
  - TPM (Tokens Per Minute): 入力+出力トークン合算
  - RPD (Requests Per Day): 日次制限

Anthropic(Claude基準):
  - RPM: Tierにより異なる
  - Input TPM / Output TPM: 別々に管理

5.2 ユーザー別、モデル別Rate Limiting

import redis
import json
from dataclasses import dataclass

@dataclass
class RateLimitConfig:
    rpm: int          # Requests per minute
    tpm: int          # Tokens per minute
    rpd: int          # Requests per day
    daily_budget: float  # USD

# ユーザーTier別設定
TIER_LIMITS = {
    "free": RateLimitConfig(rpm=10, tpm=10000, rpd=100, daily_budget=1.0),
    "basic": RateLimitConfig(rpm=50, tpm=50000, rpd=1000, daily_budget=10.0),
    "premium": RateLimitConfig(rpm=200, tpm=200000, rpd=10000, daily_budget=100.0),
    "enterprise": RateLimitConfig(rpm=1000, tpm=1000000, rpd=100000, daily_budget=1000.0),
}

class LLMRateLimiter:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def check_rate_limit(
        self,
        user_id: str,
        tier: str,
        model: str,
        estimated_tokens: int,
    ) -> dict:
        limits = TIER_LIMITS.get(tier, TIER_LIMITS["free"])
        now_minute = int(time.time() / 60)
        now_day = int(time.time() / 86400)

        pipe = self.redis.pipeline()

        # RPMチェック
        rpm_key = f"rl:rpm:{user_id}:{now_minute}"
        pipe.incr(rpm_key)
        pipe.expire(rpm_key, 120)

        # TPMチェック
        tpm_key = f"rl:tpm:{user_id}:{now_minute}"
        pipe.incrby(tpm_key, estimated_tokens)
        pipe.expire(tpm_key, 120)

        # RPDチェック
        rpd_key = f"rl:rpd:{user_id}:{now_day}"
        pipe.incr(rpd_key)
        pipe.expire(rpd_key, 172800)

        results = pipe.execute()
        current_rpm = results[0]
        current_tpm = results[2]
        current_rpd = results[4]

        if current_rpm > limits.rpm:
            return {
                "allowed": False,
                "reason": "RPM limit exceeded",
                "limit": limits.rpm,
                "current": current_rpm,
                "retry_after": 60,
            }

        if current_tpm > limits.tpm:
            return {
                "allowed": False,
                "reason": "TPM limit exceeded",
                "limit": limits.tpm,
                "current": current_tpm,
                "retry_after": 60,
            }

        if current_rpd > limits.rpd:
            return {
                "allowed": False,
                "reason": "Daily request limit exceeded",
                "limit": limits.rpd,
                "current": current_rpd,
                "retry_after": 3600,
            }

        return {"allowed": True}

5.3 トークンカウントとコスト予測

import tiktoken

# モデル別価格(USD per 1K tokens、参考用)
MODEL_PRICING = {
    "gpt-4o": {"input": 0.0025, "output": 0.01},
    "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
    "claude-sonnet-4-20250514": {"input": 0.003, "output": 0.015},
    "claude-haiku-35": {"input": 0.0008, "output": 0.004},
}

def estimate_cost(
    model: str,
    input_text: str,
    estimated_output_tokens: int = 500,
) -> dict:
    try:
        enc = tiktoken.encoding_for_model(model)
        input_tokens = len(enc.encode(input_text))
    except KeyError:
        input_tokens = len(input_text) // 4

    pricing = MODEL_PRICING.get(model, {"input": 0.01, "output": 0.03})

    input_cost = (input_tokens / 1000) * pricing["input"]
    output_cost = (estimated_output_tokens / 1000) * pricing["output"]

    return {
        "input_tokens": input_tokens,
        "estimated_output_tokens": estimated_output_tokens,
        "estimated_cost_usd": round(input_cost + output_cost, 6),
    }

5.4 Budget Enforcement

class BudgetEnforcer:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def check_budget(
        self,
        user_id: str,
        estimated_cost: float,
        daily_limit: float,
        monthly_limit: float,
    ) -> dict:
        today = time.strftime("%Y-%m-%d")
        month = time.strftime("%Y-%m")

        daily_key = f"budget:daily:{user_id}:{today}"
        monthly_key = f"budget:monthly:{user_id}:{month}"

        daily_spent = float(self.redis.get(daily_key) or 0)
        monthly_spent = float(self.redis.get(monthly_key) or 0)

        if daily_spent + estimated_cost > daily_limit:
            return {
                "allowed": False,
                "reason": "Daily budget exceeded",
                "spent": daily_spent,
                "limit": daily_limit,
            }

        if monthly_spent + estimated_cost > monthly_limit:
            return {
                "allowed": False,
                "reason": "Monthly budget exceeded",
                "spent": monthly_spent,
                "limit": monthly_limit,
            }

        return {"allowed": True}

    def record_spend(self, user_id: str, cost: float):
        today = time.strftime("%Y-%m-%d")
        month = time.strftime("%Y-%m")

        pipe = self.redis.pipeline()
        pipe.incrbyfloat(f"budget:daily:{user_id}:{today}", cost)
        pipe.expire(f"budget:daily:{user_id}:{today}", 172800)
        pipe.incrbyfloat(f"budget:monthly:{user_id}:{month}", cost)
        pipe.expire(f"budget:monthly:{user_id}:{month}", 2764800)
        pipe.execute()

5.5 Queue + Rate Limiter組み合わせパターン

+--------+    +------------+    +---------------+    +--------+
| Client +--->| API Gateway|+-->| Rate Limiter  +--->| Queue  |
+--------+    | (check     |    | (Token Bucket |    | (Redis/|
              |  budget)   |    |  + Budget)    |    | RabbitMQ)
              +------------+    +-------+-------+    +---+----+
                                                         |
                                +----------------+       |
                                | Worker Pool    |<------+
                                | (rate-aware    |
                                |  LLM caller)   |
                                +-------+--------+
                                        |
                                +-------v--------+
                                | LLM Provider   |
                                | (OpenAI, etc.) |
                                +----------------+

6. HTTPレスポンスヘッダー

6.1 標準レスポンス

HTTP/1.1 429 Too Many Requests
Content-Type: application/json
Retry-After: 30
RateLimit-Limit: 100
RateLimit-Remaining: 0
RateLimit-Reset: 1679900400

{
  "error": {
    "code": "rate_limit_exceeded",
    "message": "You have exceeded the rate limit. Please retry after 30 seconds.",
    "type": "rate_limit_error"
  }
}

6.2 レスポンスヘッダー実装

from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse

app = FastAPI()

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    api_key = request.headers.get("X-API-Key", request.client.host)

    result = rate_limiter.check(api_key)

    if not result["allowed"]:
        return JSONResponse(
            status_code=429,
            content={
                "error": {
                    "code": "rate_limit_exceeded",
                    "message": result["reason"],
                }
            },
            headers={
                "Retry-After": str(result.get("retry_after", 60)),
                "RateLimit-Limit": str(result["limit"]),
                "RateLimit-Remaining": "0",
                "RateLimit-Reset": str(result.get("reset_at", "")),
            }
        )

    response = await call_next(request)

    # 成功レスポンスにもRate Limit情報を含める
    response.headers["RateLimit-Limit"] = str(result.get("limit", ""))
    response.headers["RateLimit-Remaining"] = str(result.get("remaining", ""))
    response.headers["RateLimit-Reset"] = str(result.get("reset_at", ""))

    return response

7. モニタリング

7.1 Prometheus Metrics

from prometheus_client import Counter, Histogram, Gauge

# Rate Limitメトリクス
rate_limit_total = Counter(
    'rate_limit_requests_total',
    'Total rate limited requests',
    ['user_tier', 'endpoint', 'result']
)

rate_limit_remaining = Gauge(
    'rate_limit_remaining',
    'Remaining rate limit tokens',
    ['user_id', 'limit_type']
)

request_cost = Histogram(
    'llm_request_cost_usd',
    'Cost of LLM requests in USD',
    ['model', 'user_tier'],
    buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)

budget_utilization = Gauge(
    'budget_utilization_ratio',
    'Budget utilization ratio (0-1)',
    ['user_id', 'period']
)

8. ベストプラクティスとアンチパターン

8.1 ベストプラクティス

  1. 階層的Rate Limiting: グローバル、サービス別、ユーザー別の多重レイヤー適用
  2. グレースフルデグラデーション: Rate Limit超過時にキャッシュされた応答や簡略化された応答を提供
  3. ヘッダー公開: クライアントが残りの割り当てを確認できるようRateLimitヘッダーを提供
  4. 適応的制限: サーバー負荷に応じてRate Limitを動的に調整
  5. モニタリングとアラート: Rate Limit拒否率がしきい値を超えたらアラート
  6. ドキュメント化: APIドキュメントにRate Limitポリシーを明確に記述

8.2 アンチパターン

1. クライアントIPのみでRate Limiting
   問題: NAT/プロキシ背後の多数のユーザーが1つのIPを共有
   解決: API Key + IP組み合わせ、または認証ベースRate Limiting

2. 単一レイヤーRate Limiting
   問題: グローバル制限のみでは正常ユーザーも影響を受ける
   解決: ユーザー別、エンドポイント別、Tier別多重レイヤー

3. ハードコードされたRate Limit値
   問題: 変更時にデプロイが必要
   解決: 設定サーバーや環境変数で動的管理

4. 内部APIにRate Limitなし
   問題: 内部サービス間の障害伝播
   解決: 内部APIにもRate Limiting適用(サーキットブレーカーと併用)

5. リトライロジックなしのクライアント
   問題: 429レスポンス時に即座に失敗
   解決: Retry-Afterヘッダーを尊重するリトライロジック実装

9. まとめ

Rate Limitingは単純な防御技法ではなく、サービスの安定性、公正性、コスト効率性を保証する核心インフラコンポーネントです。

主要ポイント:

  1. アルゴリズム選択: Sliding Window Counterが実務で最も汎用的
  2. 実装ツール: Redis + Luaが分散環境で最も信頼性が高い
  3. LLMコスト制御: RPM/TPM Rate Limiting + Budget Enforcement組み合わせ必須
  4. 分散環境: 中央集権型(Redis)またはローカル+同期化方式を選択
  5. モニタリング: Rate Limit拒否率、コスト追跡、Budget消費率のモニタリング必須

適切なRate Limitingはサービスを保護するだけでなく、ユーザーに予測可能なサービス体験を提供します。


参考資料