Настройка Webhook-системы с подписью сообщений (HMAC)
HMAC (Hash-based Message Authentication Code) подпись в webhook гарантирует, что сообщение действительно пришло от ожидаемого отправителя и не было изменено в transit. Без проверки подписи любой злоумышленник может отправить фейковый webhook на ваш endpoint.
Принцип работы HMAC
- Отправитель и получатель договариваются о секретном ключе
- При отправке: отправитель вычисляет
HMAC-SHA256(payload, secret)и добавляет в заголовок - При получении: получатель вычисляет то же значение и сравнивает с заголовком
- Если совпадает — сообщение подлинное
Генерация подписи при отправке webhook
import hmac
import hashlib
import json
import requests
def send_webhook(url: str, payload: dict, secret: str):
body = json.dumps(payload, separators=(',', ':'))
timestamp = int(time.time())
# Подпись включает timestamp для защиты от replay attacks
message = f"{timestamp}.{body}"
signature = hmac.new(
secret.encode(),
message.encode(),
hashlib.sha256
).hexdigest()
response = requests.post(
url,
data=body,
headers={
'Content-Type': 'application/json',
'X-Webhook-Timestamp': str(timestamp),
'X-Webhook-Signature': f"sha256={signature}",
'X-Webhook-ID': str(uuid.uuid4()),
},
timeout=10
)
return response
Верификация подписи на стороне получателя
import hmac
import hashlib
import time
def verify_webhook_signature(request) -> bool:
secret = os.environ['WEBHOOK_SECRET']
# Извлечь из заголовков
timestamp = request.headers.get('X-Webhook-Timestamp')
received_sig = request.headers.get('X-Webhook-Signature', '')
if not timestamp or not received_sig:
return False
# Защита от replay attack: не принимать события старше 5 минут
if abs(time.time() - int(timestamp)) > 300:
return False
# Вычислить ожидаемую подпись
body = request.get_data() # raw bytes, до парсинга!
message = f"{timestamp}.{body.decode()}".encode()
expected_sig = "sha256=" + hmac.new(
secret.encode(),
message,
hashlib.sha256
).hexdigest()
# Constant-time comparison для защиты от timing attack
return hmac.compare_digest(expected_sig, received_sig)
@app.route('/webhooks/payments', methods=['POST'])
def payment_webhook():
if not verify_webhook_signature(request):
return jsonify({'error': 'Invalid signature'}), 401
# Безопасно обрабатывать payload
event = request.get_json()
process_payment_event(event)
return jsonify({'status': 'ok'})
Stripe-совместимый формат
Stripe использует t=timestamp,v1=signature в заголовке Stripe-Signature:
def verify_stripe_webhook(payload, sig_header, secret):
# Stripe format: "t=1614556800,v1=abcdef..."
elements = dict(e.split('=') for e in sig_header.split(','))
timestamp = elements.get('t')
sig = elements.get('v1')
signed_payload = f"{timestamp}.{payload}"
expected = hmac.new(secret.encode(), signed_payload.encode(), hashlib.sha256).hexdigest()
return hmac.compare_digest(expected, sig)
Retry логика и idempotency
class WebhookDelivery:
MAX_ATTEMPTS = 5
RETRY_DELAYS = [10, 30, 120, 600, 3600] # секунды между попытками
def deliver_with_retry(self, webhook_id: str, url: str, payload: dict, secret: str):
for attempt, delay in enumerate(self.RETRY_DELAYS):
try:
response = send_webhook(url, payload, secret)
if response.status_code < 300:
db.mark_delivered(webhook_id)
return True
db.log_attempt(webhook_id, attempt + 1, response.status_code)
except requests.exceptions.Timeout:
db.log_attempt(webhook_id, attempt + 1, error='timeout')
if attempt < len(self.RETRY_DELAYS) - 1:
time.sleep(delay)
db.mark_failed(webhook_id)
return False
Идемпотентность на стороне получателя
def handle_webhook_idempotent(webhook_id: str, handler_fn):
"""Предотвратить двойную обработку при retry"""
if db.is_processed(webhook_id):
return # Уже обработано
with db.transaction():
db.mark_processing(webhook_id)
handler_fn()
db.mark_processed(webhook_id)
Срок выполнения
Реализация HMAC-подписи для webhook системы с retry и idempotency — 1–2 рабочих дня.







