Система виявлення платіжного шахрайства
Платіжне шахрайство — одна з найдорожчих проблем для e-commerce: chargeback'и, штрафи платіжних систем, блокування мерчант-облікового запису. Автоматична система оцінки ризику перед кожною транзакцією знижує шахрайство без погіршення конверсії для чесних покупців.
Типи шахрайства та атрибути
Card Testing — перевірка вкрадених карток невеликими транзакціями.
Ознаки: багато спроб з одного IP/пристрою, суми $0,01–$1, різні номери карт, короткі інтервали.
Account Takeover (ATO) — компрометація облікового запису та крадіжка збережених карт.
Ознаки: зміна адреси для виставлення рахунку перед покупкою, вхід з нового пристрою, негайна велика покупка.
Friendly Fraud — покупець замовляє товар, потім подає chargeback.
Ознаки: історія chargeback, VPN/проксі, доставка до форвардера вантажу.
Модель Risk Score
from dataclasses import dataclass
from typing import Optional
import time
@dataclass
class PaymentContext:
user_id: Optional[int]
email: str
ip: str
card_bin: str # перші 6 цифр
card_last4: str
amount: float
currency: str
billing_country: str
shipping_country: Optional[str]
device_fingerprint: str
user_agent: str
session_age_seconds: int
class FraudScorer:
def __init__(self, redis, db, geoip, maxmind):
self.r = redis
self.db = db
self.geoip = geoip
self.maxmind = maxmind # MaxMind minFraud
def score(self, ctx: PaymentContext) -> dict:
signals = []
total_score = 0
# === Перевірки швидкості ===
v = self._velocity_checks(ctx)
signals.extend(v['signals'])
total_score += v['score']
# === Перевірки геолокації ===
g = self._geo_checks(ctx)
signals.extend(g['signals'])
total_score += g['score']
# === Перевірки карти ===
c = self._card_checks(ctx)
signals.extend(c['signals'])
total_score += c['score']
# === Перевірки облікового запису ===
if ctx.user_id:
a = self._account_checks(ctx)
signals.extend(a['signals'])
total_score += a['score']
# === Перевірки пристрою ===
d = self._device_checks(ctx)
signals.extend(d['signals'])
total_score += d['score']
final_score = min(total_score, 100)
return {
'score': final_score,
'signals': signals,
'decision': self._make_decision(final_score, ctx),
'timestamp': time.time()
}
def _velocity_checks(self, ctx: PaymentContext) -> dict:
score = 0
signals = []
# Кількість спроб оплати з IP за 1 годину
ip_key = f"payment_attempts:ip:{ctx.ip}"
ip_count = self.r.incr(ip_key)
self.r.expire(ip_key, 3600)
if ip_count > 20:
score += 40
signals.append('ip_velocity_critical')
elif ip_count > 10:
score += 20
signals.append('ip_velocity_high')
# Кількість унікальних карт, спроб з IP за 24 години
cards_key = f"cards_tried:ip:{ctx.ip}"
self.r.sadd(cards_key, ctx.card_last4)
self.r.expire(cards_key, 86400)
card_count = self.r.scard(cards_key)
if card_count > 3:
score += 35
signals.append(f'multiple_cards_from_ip:{card_count}')
# Невдалі спроби оплати за 1 годину
failures_key = f"payment_failures:ip:{ctx.ip}"
failures = int(self.r.get(failures_key) or 0)
if failures > 5:
score += 30
signals.append(f'payment_failures:{failures}')
return {'score': score, 'signals': signals}







