Інтелектуальне Rate Limiting для API
Базовий rate limit по IP — X запитів на хвилину без розбору. Інтелектуальний rate limiting враховує ідентифікатор користувача, тип endpoint, історичну поведінку та автоматично адаптує ліміти. Правильно налаштований rate limiting не заважає легітимним користувачам, але надійно блокує скрейперів і DDoS.
Алгоритми та їх застосування
Token Bucket — класика для API. Кожен користувач має відро токенів, поповнюване з фіксованою швидкістю. Дозволяє короткочасні сплески.
Sliding Window — точніше за Fixed Window. Лічить запити за останні N секунд відносно поточного моменту, не дозволяючи подвоєння ліміту на межі вікна.
Adaptive Rate Limiting — ліміти змінюються динамічно на основі навантаження сервера або оцінки ризику клієнта.
Redis-реалізація Sliding Window
import redis
import time
from functools import wraps
r = redis.Redis(host='localhost', decode_responses=True)
def sliding_window_rate_limit(key: str, limit: int, window: int) -> bool:
"""
key: унікальний ідентифікатор (user_id, ip, api_key)
limit: макс. запитів за window секунд
window: розмір вікна в секундах
Повертає True якщо запит дозволено
"""
now = time.time()
window_start = now - window
pipe = r.pipeline()
pipe.zremrangebyscore(key, 0, window_start) # видалити старі записи
pipe.zadd(key, {str(now): now}) # додати поточний запит
pipe.zcard(key) # підрахувати в вікні
pipe.expire(key, window) # TTL для cleanup
results = pipe.execute()
count = results[2]
return count <= limit
def rate_limit(limit=100, window=60, key_func=None):
"""Декоратор для Flask/FastAPI"""
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
if key_func:
key = f"rl:{key_func()}"
else:
key = f"rl:{request.remote_addr}"
if not sliding_window_rate_limit(key, limit, window):
# Повернути Retry-After
return jsonify({'error': 'Too Many Requests'}), 429, {
'Retry-After': str(window),
'X-RateLimit-Limit': str(limit),
'X-RateLimit-Remaining': '0'
}
return f(*args, **kwargs)
return wrapper
return decorator
Багаторівневі ліміти по Endpoint
# Різні ліміти для різних операцій
RATE_LIMITS = {
'default': {'limit': 1000, 'window': 3600}, # 1000/година
'auth.login': {'limit': 10, 'window': 900}, # 10 спроб за 15 хв
'auth.register': {'limit': 5, 'window': 3600}, # 5/година
'api.search': {'limit': 100, 'window': 60}, # 100/хв
'api.export': {'limit': 10, 'window': 3600}, # 10 експортів/година
'api.upload': {'limit': 50, 'window': 3600}, # 50 завантажень/година
'webhooks.send': {'limit': 500, 'window': 60}, # 500/хв
}
class MultiLevelRateLimiter:
def check(self, user_id: int, endpoint: str, ip: str) -> dict:
config = RATE_LIMITS.get(endpoint, RATE_LIMITS['default'])
# Рівень 1: по користувачу (автентифіковані)
if user_id:
user_key = f"rl:user:{user_id}:{endpoint}"
if not sliding_window_rate_limit(user_key, config['limit'], config['window']):
return {'allowed': False, 'reason': 'user_limit'}
# Рівень 2: по IP (захист від зловживання створенням акаунтів)
ip_key = f"rl:ip:{ip}:{endpoint}"
ip_limit = config['limit'] * 3 # IP-ліміт вищий за user-ліміт
if not sliding_window_rate_limit(ip_key, ip_limit, config['window']):
return {'allowed': False, 'reason': 'ip_limit'}
# Рівень 3: глобальний (захист від DDoS)
global_key = f"rl:global:{endpoint}"
global_limit = config['limit'] * 100
if not sliding_window_rate_limit(global_key, global_limit, config['window']):
return {'allowed': False, 'reason': 'global_limit'}
return {'allowed': True}
Адаптивний Rate Limit по ризику
class AdaptiveRateLimiter:
def get_risk_score(self, request) -> float:
"""Оцінити ризик запиту від 0.0 (низький) до 1.0 (високий)"""
score = 0.0
# Підозрілий User-Agent
ua = request.headers.get('User-Agent', '')
if not ua or 'python-requests' in ua.lower() or 'curl' in ua.lower():
score += 0.3
# Немає заголовків браузера
if not request.headers.get('Accept-Language'):
score += 0.2
# Недавня історія помилок (багато 404, 401)
error_count = r.get(f"errors:{request.remote_addr}") or 0
if int(error_count) > 10:
score += 0.3
# Запити з Tor/VPN IP (перевірка за списком)
if self.is_known_proxy(request.remote_addr):
score += 0.2
return min(score, 1.0)
def get_effective_limit(self, base_limit: int, risk_score: float) -> int:
"""Знизити ліміт для підозрілих клієнтів"""
multiplier = 1.0 - (risk_score * 0.8) # до 80% зниження
return max(int(base_limit * multiplier), 1)
Заголовки відповіді
RFC 6585 та стандарти API вимагають інформативних заголовків:
def add_rate_limit_headers(response, key, limit, window):
now = time.time()
current_count = r.zcard(key) or 0
remaining = max(0, limit - current_count)
# Час скидання = початок наступного вікна
reset_at = int(now) + window - (int(now) % window)







