- Authors

- Name
- Youngju Kim
- @fjvbn20031
概要
APIを運用していると、予期しないトラフィック急増、DDoS攻撃、特定ユーザーの過度な呼び出しなど、さまざまな状況に直面します。 Rate Limitingはこれらの問題に対する最も基本的かつ効果的な防御手段です。 この記事では、Rate Limitingの核心アルゴリズムからRedis、Nginx、Kongを活用した実践的な実装、 LLM APIコスト制御パターン、分散環境でのRate Limitingまで総整理します。
1. Rate Limitingとは
1.1 なぜ必要か
Rate Limitingは、単位時間内に許可されるリクエスト数を制限する技法です。
主な目的:
- DDoS防御: 悪意のある大量リクエストからサービスを保護
- コスト制御: 外部API呼び出しコスト管理(特にLLM API)
- 公正利用: 特定ユーザーがリソースを独占するのを防止
- サービス安定性: 過負荷によるサービス障害を予防
- SLA保証: すべてのユーザーに一定水準のサービス品質を提供
1.2 Rate Limitingの配置場所
Client --> [CDN/WAF] --> [Load Balancer] --> [API Gateway] --> [Application]
| | | |
L3/L4制限 Connection制限 リクエストRate Limit ビジネスロジック制限
Rate Limitingはインフラの複数レイヤーで適用でき、最も効果的な場所はAPI Gatewayレイヤーです。
2. Rate Limitingアルゴリズム
2.1 Token Bucket
最も広く使用されているアルゴリズムで、AWS API GatewayやNginxで採用されています。
原理:
- バケットに一定速度でトークンが充填される
- リクエストごとにトークン1個を消費
- トークンがなければリクエスト拒否
- バケット容量分のバーストトラフィックを許可
Bucket Capacity: 10 tokens
Refill Rate: 2 tokens/second
Time 0s: [##########] 10 tokens -> Request OK (9 left)
Time 0s: [#########.] 9 tokens -> Request OK (8 left)
...
Time 0s: [..........] 0 tokens -> Request REJECTED
Time 1s: [##........] 2 tokens -> Refilled
import time
import threading
class TokenBucket:
def __init__(self, capacity: int, refill_rate: float):
self.capacity = capacity
self.refill_rate = refill_rate # tokens per second
self.tokens = capacity
self.last_refill_time = time.monotonic()
self.lock = threading.Lock()
def _refill(self):
now = time.monotonic()
elapsed = now - self.last_refill_time
new_tokens = elapsed * self.refill_rate
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.last_refill_time = now
def consume(self, tokens: int = 1) -> bool:
with self.lock:
self._refill()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
def wait_time(self) -> float:
"""次のトークンまでの待機時間"""
with self.lock:
self._refill()
if self.tokens >= 1:
return 0.0
return (1 - self.tokens) / self.refill_rate
# 使用例
bucket = TokenBucket(capacity=10, refill_rate=2.0)
for i in range(15):
if bucket.consume():
print(f"Request {i+1}: Allowed")
else:
wait = bucket.wait_time()
print(f"Request {i+1}: Rejected (wait {wait:.2f}s)")
利点: バースト許可、実装簡単、メモリ効率的 欠点: 正確なリクエスト数の追跡が困難
2.2 Leaky Bucket
一定の速度でリクエストを処理するアルゴリズムです。
Incoming Requests --> [Bucket/Queue] --> Processing (fixed rate)
|
Overflow --> Rejected
処理速度: 2 requests/second(固定)
キューサイズ: 5
速く入ってきたリクエストはキューで待機、キューが満杯なら拒否
import time
import threading
from collections import deque
class LeakyBucket:
def __init__(self, capacity: int, leak_rate: float):
self.capacity = capacity
self.leak_rate = leak_rate # requests per second
self.queue = deque()
self.lock = threading.Lock()
self.last_leak_time = time.monotonic()
def _leak(self):
now = time.monotonic()
elapsed = now - self.last_leak_time
leaked = int(elapsed * self.leak_rate)
if leaked > 0:
for _ in range(min(leaked, len(self.queue))):
self.queue.popleft()
self.last_leak_time = now
def allow(self) -> bool:
with self.lock:
self._leak()
if len(self.queue) < self.capacity:
self.queue.append(time.monotonic())
return True
return False
利点: 一定の処理速度を保証 欠点: バーストを許可しないため柔軟性に欠ける
2.3 Fixed Window Counter
最も単純なアルゴリズムで、固定された時間ウィンドウ内のリクエスト数をカウントします。
Window: 1 minute, Limit: 100 requests
|------- Window 1 -------|------- Window 2 -------|
12:00:00 12:00:59 12:01:00 12:01:59
Count: 0...50...100 Count: 0...50...100
(allowed) (allowed)
問題: Boundary Burst
12:00:30~12:00:59に100件 + 12:01:00~12:01:30に100件
= 60秒間で200件処理(制限の2倍!)
import time
import threading
class FixedWindowCounter:
def __init__(self, limit: int, window_seconds: int):
self.limit = limit
self.window_seconds = window_seconds
self.counters = {} # key -> (window_start, count)
self.lock = threading.Lock()
def allow(self, key: str) -> bool:
with self.lock:
now = time.time()
window_start = int(now / self.window_seconds) * self.window_seconds
if key not in self.counters or self.counters[key][0] != window_start:
self.counters[key] = (window_start, 0)
if self.counters[key][1] < self.limit:
self.counters[key] = (window_start, self.counters[key][1] + 1)
return True
return False
利点: 実装が非常にシンプル、メモリ使用量最小 欠点: ウィンドウ境界(boundary)でのリクエスト集中問題
2.4 Sliding Window Log
各リクエストのタイムスタンプをログとして記録し、正確に追跡します。
Window: 60 seconds, Limit: 5 requests
Log: [12:00:10, 12:00:25, 12:00:40, 12:00:55, 12:01:05]
New request at 12:01:10:
1. 60秒より古いエントリを削除(12:00:10以前)
-> [12:00:25, 12:00:40, 12:00:55, 12:01:05]
2. Count: 4 < 5 -> Allowed
3. ログに追加: [12:00:25, 12:00:40, 12:00:55, 12:01:05, 12:01:10]
import time
import threading
from collections import defaultdict
class SlidingWindowLog:
def __init__(self, limit: int, window_seconds: int):
self.limit = limit
self.window_seconds = window_seconds
self.logs = defaultdict(list)
self.lock = threading.Lock()
def allow(self, key: str) -> bool:
with self.lock:
now = time.time()
cutoff = now - self.window_seconds
# 古いエントリを削除
self.logs[key] = [
ts for ts in self.logs[key] if ts > cutoff
]
if len(self.logs[key]) < self.limit:
self.logs[key].append(now)
return True
return False
利点: 最も正確なRate Limiting 欠点: メモリ使用量が高い(すべてのリクエストタイムスタンプを保存)
2.5 Sliding Window Counter
Fixed WindowとSliding Window Logの利点を組み合わせたアルゴリズムです。 実務で最も多く使用されています。
Window: 60 seconds, Limit: 100 requests
Previous Window Count: 80
Current Window Count: 30
Current Window Progress: 25%(15秒経過)
Weighted Count = 80 * (1 - 0.25) + 30 = 80 * 0.75 + 30 = 90
90 < 100 -> Allowed
import time
import threading
class SlidingWindowCounter:
def __init__(self, limit: int, window_seconds: int):
self.limit = limit
self.window_seconds = window_seconds
self.windows = {} # key -> (prev_count, curr_count, curr_window_start)
self.lock = threading.Lock()
def allow(self, key: str) -> bool:
with self.lock:
now = time.time()
curr_window = int(now / self.window_seconds) * self.window_seconds
window_progress = (now - curr_window) / self.window_seconds
if key not in self.windows:
self.windows[key] = (0, 0, curr_window)
prev_count, curr_count, stored_window = self.windows[key]
# ウィンドウが変わったら更新
if stored_window != curr_window:
if curr_window - stored_window >= self.window_seconds * 2:
prev_count = 0
else:
prev_count = curr_count
curr_count = 0
# 加重カウント計算
weighted = prev_count * (1 - window_progress) + curr_count
if weighted < self.limit:
curr_count += 1
self.windows[key] = (prev_count, curr_count, curr_window)
return True
return False
2.6 アルゴリズム比較表
| アルゴリズム | 精度 | メモリ | バースト許可 | 実装複雑度 | 適した場合 |
|---|---|---|---|---|---|
| Token Bucket | 中 | 低 | O | 低 | 一般API、バースト必要 |
| Leaky Bucket | 中 | 低 | X | 低 | 一定処理率必要 |
| Fixed Window | 低 | 非常に低 | 境界問題 | 非常に低 | 単純な制限 |
| Sliding Log | 高 | 高 | X | 中 | 正確な制限必要 |
| Sliding Counter | 高 | 低 | X | 中 | 実務汎用 |
3. 実装方法
3.1 Redis + Lua Script
Redisの原子的操作とLuaスクリプトを組み合わせて、分散環境でも安全なRate Limitingを実装します。
Sliding Window Counter(Redis + Lua)
import redis
import time
r = redis.Redis(host='localhost', port=6379, db=0)
# Luaスクリプト: Sliding Window Counter
SLIDING_WINDOW_SCRIPT = """
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
local clear_before = now - window
redis.call('ZREMRANGEBYSCORE', key, '-inf', clear_before)
local current_count = redis.call('ZCARD', key)
if current_count < limit then
redis.call('ZADD', key, now, now .. '-' .. math.random(1000000))
redis.call('EXPIRE', key, window + 1)
return 1
else
return 0
end
"""
sliding_window_sha = r.register_script(SLIDING_WINDOW_SCRIPT)
def is_rate_limited(key: str, limit: int = 100, window: int = 60) -> bool:
"""
レート制限された場合True、許可された場合Falseを返す。
"""
now = time.time()
result = sliding_window_sha(
keys=[f"ratelimit:{key}"],
args=[now, window, limit]
)
return result == 0 # 0 = rate limited, 1 = allowed
# 使用例
user_id = "user-123"
for i in range(110):
if is_rate_limited(user_id, limit=100, window=60):
print(f"Request {i+1}: Rate Limited!")
else:
print(f"Request {i+1}: Allowed")
Token Bucket(Redis + Lua)
TOKEN_BUCKET_SCRIPT = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(bucket[1])
local last_refill = tonumber(bucket[2])
if tokens == nil then
tokens = capacity
last_refill = now
end
local elapsed = now - last_refill
local new_tokens = elapsed * refill_rate
tokens = math.min(capacity, tokens + new_tokens)
local allowed = 0
if tokens >= requested then
tokens = tokens - requested
allowed = 1
end
redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 1)
return allowed
"""
token_bucket_sha = r.register_script(TOKEN_BUCKET_SCRIPT)
def token_bucket_check(
key: str,
capacity: int = 100,
refill_rate: float = 10.0,
tokens: int = 1
) -> bool:
now = time.time()
result = token_bucket_sha(
keys=[f"tokenbucket:{key}"],
args=[capacity, refill_rate, now, tokens]
)
return result == 1 # 1 = allowed
3.2 Nginx Rate Limiting
# nginx.conf
http {
# Zone定義:クライアントIP基準、10MB共有メモリ
limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;
# ユーザー別Rate Limiting(API Keyベース)
map $http_x_api_key $api_key_zone {
default $http_x_api_key;
}
limit_req_zone $api_key_zone zone=user_limit:10m rate=100r/m;
server {
listen 80;
# デフォルトAPIエンドポイント
location /api/ {
# burst=20: 20個まで超過リクエスト許可(キューで待機)
# nodelay: 待機なしで即時処理(burst内)
limit_req zone=api_limit burst=20 nodelay;
# Rate Limit超過時のレスポンスコード
limit_req_status 429;
proxy_pass http://backend;
}
# LLM APIエンドポイント(より厳格な制限)
location /api/v1/completions {
limit_req zone=user_limit burst=5 nodelay;
limit_req_status 429;
proxy_pass http://llm_backend;
}
# エラーページカスタマイズ
error_page 429 = @rate_limited;
location @rate_limited {
default_type application/json;
return 429 '{"error": "Too Many Requests", "retry_after": 60}';
}
}
}
3.3 Kong Rate Limiting Plugin
# kong.yml - Declarative Configuration
_format_version: '3.0'
services:
- name: llm-api
url: http://llm-backend:8000
routes:
- name: llm-route
paths:
- /api/v1/completions
plugins:
# グローバルRate Limiting
- name: rate-limiting
config:
minute: 100
hour: 1000
policy: redis
redis_host: redis
redis_port: 6379
redis_database: 0
fault_tolerant: true
hide_client_headers: false
error_code: 429
error_message: 'Rate limit exceeded'
# Consumer別Rate Limiting
- name: rate-limiting
consumer: premium-user
config:
minute: 500
hour: 10000
policy: redis
redis_host: redis
3.4 Express.js Rate Limiting
const rateLimit = require('express-rate-limit')
const RedisStore = require('rate-limit-redis')
const Redis = require('ioredis')
const redis = new Redis({
host: 'localhost',
port: 6379,
})
// デフォルトRate Limiter
const apiLimiter = rateLimit({
windowMs: 60 * 1000, // 1 minute
max: 100,
standardHeaders: true, // RateLimit-* headers
legacyHeaders: false, // X-RateLimit-* headers
store: new RedisStore({
sendCommand: (...args) => redis.call(...args),
}),
message: {
error: 'Too Many Requests',
retryAfter: 60,
},
keyGenerator: (req) => {
return req.headers['x-api-key'] || req.ip
},
})
// LLM API用厳格なLimiter
const llmLimiter = rateLimit({
windowMs: 60 * 1000,
max: 20,
store: new RedisStore({
sendCommand: (...args) => redis.call(...args),
prefix: 'rl:llm:',
}),
keyGenerator: (req) => req.headers['x-api-key'],
})
const app = require('express')()
app.use('/api/', apiLimiter)
app.use('/api/v1/completions', llmLimiter)
3.5 Spring Boot Rate Limiting
// Bucket4j + Redisベース Rate Limiting
@Configuration
public class RateLimitConfig {
@Bean
public ProxyManager<String> proxyManager(RedissonClient redissonClient) {
return Bucket4jRedisson.casBasedBuilder(redissonClient)
.build();
}
}
@Component
public class RateLimitInterceptor implements HandlerInterceptor {
private final ProxyManager<String> proxyManager;
@Override
public boolean preHandle(
HttpServletRequest request,
HttpServletResponse response,
Object handler
) throws Exception {
String apiKey = request.getHeader("X-API-Key");
if (apiKey == null) {
response.setStatus(401);
return false;
}
BucketConfiguration config = BucketConfiguration.builder()
.addLimit(
Bandwidth.builder()
.capacity(100)
.refillGreedy(100, Duration.ofMinutes(1))
.build()
)
.build();
Bucket bucket = proxyManager.builder()
.build(apiKey, () -> config);
ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
if (probe.isConsumed()) {
response.setHeader("X-RateLimit-Remaining",
String.valueOf(probe.getRemainingTokens()));
return true;
} else {
long waitSeconds = probe.getNanosToWaitForRefill() / 1_000_000_000;
response.setStatus(429);
response.setHeader("Retry-After", String.valueOf(waitSeconds));
response.getWriter().write(
"{\"error\":\"Rate limit exceeded\"}"
);
return false;
}
}
}
4. 分散環境Rate Limiting
4.1 中央集権型(Redis)
+--------+ +--------+ +--------+
| Node 1 | | Node 2 | | Node 3 |
+---+----+ +---+----+ +---+----+
| | |
+---------+----+---------+----+
| |
+----+----+ +---+---+
| Redis | | Redis |
| Primary | | Replica|
+---------+ +-------+
利点: すべてのノードで一貫したカウント 欠点: Redisへのネットワーク遅延追加、RedisがSPOFになる可能性
4.2 ローカルカウンター + 同期化
+--------+ +--------+ +--------+
| Node 1 | | Node 2 | | Node 3 |
| Local: | | Local: | | Local: |
| 30/100 | | 25/100 | | 20/100 |
+---+----+ +---+----+ +---+----+
| | |
+----Periodic Sync (every 5s)-----+
|
+----+----+
| Redis |
| Global: |
| 75/100 |
+---------+
import time
import threading
import redis
class DistributedRateLimiter:
def __init__(
self,
redis_client: redis.Redis,
key_prefix: str,
global_limit: int,
window_seconds: int,
sync_interval: float = 5.0,
local_threshold: float = 0.1,
):
self.redis = redis_client
self.key_prefix = key_prefix
self.global_limit = global_limit
self.window_seconds = window_seconds
self.sync_interval = sync_interval
# ローカル許容量 = 全体制限の10%
self.local_limit = int(global_limit * local_threshold)
self.local_count = 0
self.lock = threading.Lock()
# 定期同期スレッド開始
self._start_sync_thread()
def _start_sync_thread(self):
def sync_loop():
while True:
time.sleep(self.sync_interval)
self._sync_to_redis()
thread = threading.Thread(target=sync_loop, daemon=True)
thread.start()
def _sync_to_redis(self):
with self.lock:
if self.local_count > 0:
key = f"{self.key_prefix}:global"
self.redis.incrby(key, self.local_count)
self.redis.expire(key, self.window_seconds + 10)
self.local_count = 0
def allow(self, key: str) -> bool:
with self.lock:
# ローカルカウンターチェック
if self.local_count >= self.local_limit:
self._sync_to_redis()
# グローバルカウンターチェック
global_key = f"{self.key_prefix}:global"
global_count = int(self.redis.get(global_key) or 0)
if global_count >= self.global_limit:
return False
self.local_count += 1
return True
5. LLM APIコスト制御
5.1 OpenAI/Anthropic API Rate Limitsの理解
OpenAI(GPT-4o基準):
- RPM (Requests Per Minute): Tierにより異なる
- TPM (Tokens Per Minute): 入力+出力トークン合算
- RPD (Requests Per Day): 日次制限
Anthropic(Claude基準):
- RPM: Tierにより異なる
- Input TPM / Output TPM: 別々に管理
5.2 ユーザー別、モデル別Rate Limiting
import redis
import json
from dataclasses import dataclass
@dataclass
class RateLimitConfig:
rpm: int # Requests per minute
tpm: int # Tokens per minute
rpd: int # Requests per day
daily_budget: float # USD
# ユーザーTier別設定
TIER_LIMITS = {
"free": RateLimitConfig(rpm=10, tpm=10000, rpd=100, daily_budget=1.0),
"basic": RateLimitConfig(rpm=50, tpm=50000, rpd=1000, daily_budget=10.0),
"premium": RateLimitConfig(rpm=200, tpm=200000, rpd=10000, daily_budget=100.0),
"enterprise": RateLimitConfig(rpm=1000, tpm=1000000, rpd=100000, daily_budget=1000.0),
}
class LLMRateLimiter:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
def check_rate_limit(
self,
user_id: str,
tier: str,
model: str,
estimated_tokens: int,
) -> dict:
limits = TIER_LIMITS.get(tier, TIER_LIMITS["free"])
now_minute = int(time.time() / 60)
now_day = int(time.time() / 86400)
pipe = self.redis.pipeline()
# RPMチェック
rpm_key = f"rl:rpm:{user_id}:{now_minute}"
pipe.incr(rpm_key)
pipe.expire(rpm_key, 120)
# TPMチェック
tpm_key = f"rl:tpm:{user_id}:{now_minute}"
pipe.incrby(tpm_key, estimated_tokens)
pipe.expire(tpm_key, 120)
# RPDチェック
rpd_key = f"rl:rpd:{user_id}:{now_day}"
pipe.incr(rpd_key)
pipe.expire(rpd_key, 172800)
results = pipe.execute()
current_rpm = results[0]
current_tpm = results[2]
current_rpd = results[4]
if current_rpm > limits.rpm:
return {
"allowed": False,
"reason": "RPM limit exceeded",
"limit": limits.rpm,
"current": current_rpm,
"retry_after": 60,
}
if current_tpm > limits.tpm:
return {
"allowed": False,
"reason": "TPM limit exceeded",
"limit": limits.tpm,
"current": current_tpm,
"retry_after": 60,
}
if current_rpd > limits.rpd:
return {
"allowed": False,
"reason": "Daily request limit exceeded",
"limit": limits.rpd,
"current": current_rpd,
"retry_after": 3600,
}
return {"allowed": True}
5.3 トークンカウントとコスト予測
import tiktoken
# モデル別価格(USD per 1K tokens、参考用)
MODEL_PRICING = {
"gpt-4o": {"input": 0.0025, "output": 0.01},
"gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
"claude-sonnet-4-20250514": {"input": 0.003, "output": 0.015},
"claude-haiku-35": {"input": 0.0008, "output": 0.004},
}
def estimate_cost(
model: str,
input_text: str,
estimated_output_tokens: int = 500,
) -> dict:
try:
enc = tiktoken.encoding_for_model(model)
input_tokens = len(enc.encode(input_text))
except KeyError:
input_tokens = len(input_text) // 4
pricing = MODEL_PRICING.get(model, {"input": 0.01, "output": 0.03})
input_cost = (input_tokens / 1000) * pricing["input"]
output_cost = (estimated_output_tokens / 1000) * pricing["output"]
return {
"input_tokens": input_tokens,
"estimated_output_tokens": estimated_output_tokens,
"estimated_cost_usd": round(input_cost + output_cost, 6),
}
5.4 Budget Enforcement
class BudgetEnforcer:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
def check_budget(
self,
user_id: str,
estimated_cost: float,
daily_limit: float,
monthly_limit: float,
) -> dict:
today = time.strftime("%Y-%m-%d")
month = time.strftime("%Y-%m")
daily_key = f"budget:daily:{user_id}:{today}"
monthly_key = f"budget:monthly:{user_id}:{month}"
daily_spent = float(self.redis.get(daily_key) or 0)
monthly_spent = float(self.redis.get(monthly_key) or 0)
if daily_spent + estimated_cost > daily_limit:
return {
"allowed": False,
"reason": "Daily budget exceeded",
"spent": daily_spent,
"limit": daily_limit,
}
if monthly_spent + estimated_cost > monthly_limit:
return {
"allowed": False,
"reason": "Monthly budget exceeded",
"spent": monthly_spent,
"limit": monthly_limit,
}
return {"allowed": True}
def record_spend(self, user_id: str, cost: float):
today = time.strftime("%Y-%m-%d")
month = time.strftime("%Y-%m")
pipe = self.redis.pipeline()
pipe.incrbyfloat(f"budget:daily:{user_id}:{today}", cost)
pipe.expire(f"budget:daily:{user_id}:{today}", 172800)
pipe.incrbyfloat(f"budget:monthly:{user_id}:{month}", cost)
pipe.expire(f"budget:monthly:{user_id}:{month}", 2764800)
pipe.execute()
5.5 Queue + Rate Limiter組み合わせパターン
+--------+ +------------+ +---------------+ +--------+
| Client +--->| API Gateway|+-->| Rate Limiter +--->| Queue |
+--------+ | (check | | (Token Bucket | | (Redis/|
| budget) | | + Budget) | | RabbitMQ)
+------------+ +-------+-------+ +---+----+
|
+----------------+ |
| Worker Pool |<------+
| (rate-aware |
| LLM caller) |
+-------+--------+
|
+-------v--------+
| LLM Provider |
| (OpenAI, etc.) |
+----------------+
6. HTTPレスポンスヘッダー
6.1 標準レスポンス
HTTP/1.1 429 Too Many Requests
Content-Type: application/json
Retry-After: 30
RateLimit-Limit: 100
RateLimit-Remaining: 0
RateLimit-Reset: 1679900400
{
"error": {
"code": "rate_limit_exceeded",
"message": "You have exceeded the rate limit. Please retry after 30 seconds.",
"type": "rate_limit_error"
}
}
6.2 レスポンスヘッダー実装
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse
app = FastAPI()
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
api_key = request.headers.get("X-API-Key", request.client.host)
result = rate_limiter.check(api_key)
if not result["allowed"]:
return JSONResponse(
status_code=429,
content={
"error": {
"code": "rate_limit_exceeded",
"message": result["reason"],
}
},
headers={
"Retry-After": str(result.get("retry_after", 60)),
"RateLimit-Limit": str(result["limit"]),
"RateLimit-Remaining": "0",
"RateLimit-Reset": str(result.get("reset_at", "")),
}
)
response = await call_next(request)
# 成功レスポンスにもRate Limit情報を含める
response.headers["RateLimit-Limit"] = str(result.get("limit", ""))
response.headers["RateLimit-Remaining"] = str(result.get("remaining", ""))
response.headers["RateLimit-Reset"] = str(result.get("reset_at", ""))
return response
7. モニタリング
7.1 Prometheus Metrics
from prometheus_client import Counter, Histogram, Gauge
# Rate Limitメトリクス
rate_limit_total = Counter(
'rate_limit_requests_total',
'Total rate limited requests',
['user_tier', 'endpoint', 'result']
)
rate_limit_remaining = Gauge(
'rate_limit_remaining',
'Remaining rate limit tokens',
['user_id', 'limit_type']
)
request_cost = Histogram(
'llm_request_cost_usd',
'Cost of LLM requests in USD',
['model', 'user_tier'],
buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)
budget_utilization = Gauge(
'budget_utilization_ratio',
'Budget utilization ratio (0-1)',
['user_id', 'period']
)
8. ベストプラクティスとアンチパターン
8.1 ベストプラクティス
- 階層的Rate Limiting: グローバル、サービス別、ユーザー別の多重レイヤー適用
- グレースフルデグラデーション: Rate Limit超過時にキャッシュされた応答や簡略化された応答を提供
- ヘッダー公開: クライアントが残りの割り当てを確認できるようRateLimitヘッダーを提供
- 適応的制限: サーバー負荷に応じてRate Limitを動的に調整
- モニタリングとアラート: Rate Limit拒否率がしきい値を超えたらアラート
- ドキュメント化: APIドキュメントにRate Limitポリシーを明確に記述
8.2 アンチパターン
1. クライアントIPのみでRate Limiting
問題: NAT/プロキシ背後の多数のユーザーが1つのIPを共有
解決: API Key + IP組み合わせ、または認証ベースRate Limiting
2. 単一レイヤーRate Limiting
問題: グローバル制限のみでは正常ユーザーも影響を受ける
解決: ユーザー別、エンドポイント別、Tier別多重レイヤー
3. ハードコードされたRate Limit値
問題: 変更時にデプロイが必要
解決: 設定サーバーや環境変数で動的管理
4. 内部APIにRate Limitなし
問題: 内部サービス間の障害伝播
解決: 内部APIにもRate Limiting適用(サーキットブレーカーと併用)
5. リトライロジックなしのクライアント
問題: 429レスポンス時に即座に失敗
解決: Retry-Afterヘッダーを尊重するリトライロジック実装
9. まとめ
Rate Limitingは単純な防御技法ではなく、サービスの安定性、公正性、コスト効率性を保証する核心インフラコンポーネントです。
主要ポイント:
- アルゴリズム選択: Sliding Window Counterが実務で最も汎用的
- 実装ツール: Redis + Luaが分散環境で最も信頼性が高い
- LLMコスト制御: RPM/TPM Rate Limiting + Budget Enforcement組み合わせ必須
- 分散環境: 中央集権型(Redis)またはローカル+同期化方式を選択
- モニタリング: Rate Limit拒否率、コスト追跡、Budget消費率のモニタリング必須
適切なRate Limitingはサービスを保護するだけでなく、ユーザーに予測可能なサービス体験を提供します。
参考資料
- IETF RateLimit Headers Draft: https://datatracker.ietf.org/doc/draft-ietf-httpapi-ratelimit-headers/
- Redis Rate Limiting Patterns: https://redis.io/tutorials/howtos/ratelimiting/
- Nginx Rate Limiting: https://www.nginx.com/blog/rate-limiting-nginx/
- Kong Rate Limiting Plugin: https://docs.konghq.com/hub/kong-inc/rate-limiting/
- Bucket4j: https://github.com/bucket4j/bucket4j