Налаштування Webhook-системи з підписом повідомлень (HMAC)
HMAC (Hash-based Message Authentication Code) підпис у webhook гарантує, що повідомлення дійсно прийшло від очікуваного відправника та не було змінено в транзиті. Без перевірки підпису будь-який зловмисник може відправити підроблений webhook на ваш endpoint.
Принцип роботи HMAC
- Відправник і отримувач домовляються про секретний ключ
- При відправці: відправник обчислює
HMAC-SHA256(payload, secret)і додає в заголовок - При отриманні: отримувач обчислює те саме значення та порівнює з заголовком
- Якщо збігаються — повідомлення автентичне
Генерація підпису при відправці webhook
import hmac
import hashlib
import json
import requests
def send_webhook(url: str, payload: dict, secret: str):
body = json.dumps(payload, separators=(',', ':'))
timestamp = int(time.time())
# Підпис включає timestamp для захисту від replay атак
message = f"{timestamp}.{body}"
signature = hmac.new(
secret.encode(),
message.encode(),
hashlib.sha256
).hexdigest()
response = requests.post(
url,
data=body,
headers={
'Content-Type': 'application/json',
'X-Webhook-Timestamp': str(timestamp),
'X-Webhook-Signature': f"sha256={signature}",
'X-Webhook-ID': str(uuid.uuid4()),
},
timeout=10
)
return response
Верифікація підпису на стороні отримувача
import hmac
import hashlib
import time
def verify_webhook_signature(request) -> bool:
secret = os.environ['WEBHOOK_SECRET']
# Витягти з заголовків
timestamp = request.headers.get('X-Webhook-Timestamp')
received_sig = request.headers.get('X-Webhook-Signature', '')
if not timestamp or not received_sig:
return False
# Захист від replay атаки: не приймати події старші за 5 хвилин
if abs(time.time() - int(timestamp)) > 300:
return False
# Обчислити очікуваний підпис
body = request.get_data() # raw bytes, перед парсингом!
message = f"{timestamp}.{body.decode()}".encode()
expected_sig = "sha256=" + hmac.new(
secret.encode(),
message,
hashlib.sha256
).hexdigest()
# Constant-time порівняння для захисту від timing атак
return hmac.compare_digest(expected_sig, received_sig)
@app.route('/webhooks/payments', methods=['POST'])
def payment_webhook():
if not verify_webhook_signature(request):
return jsonify({'error': 'Invalid signature'}), 401
# Безпечно обробити payload
event = request.get_json()
process_payment_event(event)
return jsonify({'status': 'ok'})
Stripe-сумісний формат
Stripe використовує t=timestamp,v1=signature у заголовку Stripe-Signature:
def verify_stripe_webhook(payload, sig_header, secret):
# Stripe формат: "t=1614556800,v1=abcdef..."
elements = dict(e.split('=') for e in sig_header.split(','))
timestamp = elements.get('t')
sig = elements.get('v1')
signed_payload = f"{timestamp}.{payload}"
expected = hmac.new(secret.encode(), signed_payload.encode(), hashlib.sha256).hexdigest()
return hmac.compare_digest(expected, sig)
Логіка повтору та Idempotency
class WebhookDelivery:
MAX_ATTEMPTS = 5
RETRY_DELAYS = [10, 30, 120, 600, 3600] # секунди між спробами
def deliver_with_retry(self, webhook_id: str, url: str, payload: dict, secret: str):
for attempt, delay in enumerate(self.RETRY_DELAYS):
try:
response = send_webhook(url, payload, secret)
if response.status_code < 300:
db.mark_delivered(webhook_id)
return True
db.log_attempt(webhook_id, attempt + 1, response.status_code)
except requests.exceptions.Timeout:
db.log_attempt(webhook_id, attempt + 1, error='timeout')
if attempt < len(self.RETRY_DELAYS) - 1:
time.sleep(delay)
db.mark_failed(webhook_id)
return False
Idempotency на стороні отримувача
def handle_webhook_idempotent(webhook_id: str, handler_fn):
"""Запобігти подвійній обробці при повторі"""
if db.is_processed(webhook_id):
return # Вже оброблено
with db.transaction():
db.mark_processing(webhook_id)
handler_fn()
db.mark_processed(webhook_id)







