Розробка системи алертів для трейдерів (ціна, обсяг, ліквідація)
Система алертів — це вуха трейдера на ринку. Без неї потрібно постійно дивитися на екран; з нею — можна жити нормально та реагувати лише на значимі события. Гарна система алертів покриває три категорії: ціновні события, аномалії обсягу та дані ліквідації.
Типи алертів
Ціновні алерти:
- Ціна досягла X (above/below)
- Ціна змінилася на N% за період
- Ціна перетнула ковзну середню
- Новий ATH / ATL за період
Об'ємні алерти:
- Обсяг за свічку перевищив N × середній
- Крупна угода (whale) > $X в одній трансакції
- Різкий ріст open interest
Ліквідаційні алерти:
- Крупна ліквідація (> $1M за 1 хвилину)
- Кумулятивні ліквідації за період
- Liquidation heatmap — ціни з великою кількістю очікуваних ліквідацій
Архітектура
class AlertRule(BaseModel):
id: str
user_id: str
type: str # 'price_above', 'price_below', 'volume_spike', 'liquidation'
symbol: str
exchange: str
# Параметри залежно від типу
price_threshold: Optional[Decimal]
volume_multiplier: Optional[float] # N × avg volume
liquidation_usd: Optional[float]
# Доставка
channels: list[str] # ['telegram', 'email', 'push', 'webhook']
webhook_url: Optional[str]
# Поведінка
one_time: bool = True # деактивувати після спрацювання
cooldown_minutes: int = 60 # мінімум між повторними спрацюваннями
last_triggered: Optional[datetime] = None
is_active: bool = True
Alert Engine
class AlertEngine:
def __init__(self, rule_repo, notifier):
self.rules = {} # symbol → list[AlertRule]
self.rule_repo = rule_repo
self.notifier = notifier
async def on_ticker_update(self, ticker: NormalizedTicker):
rules = self.rules.get(f"{ticker.exchange}:{ticker.symbol}", [])
for rule in rules:
if not rule.is_active:
continue
if self.is_in_cooldown(rule):
continue
if await self.evaluate_rule(rule, ticker):
await self.trigger_alert(rule, ticker)
async def evaluate_rule(self, rule: AlertRule, ticker: NormalizedTicker) -> bool:
if rule.type == 'price_above':
return ticker.last >= rule.price_threshold
elif rule.type == 'price_below':
return ticker.last <= rule.price_threshold
elif rule.type == 'price_change_pct':
change = await self.compute_price_change(rule.symbol, rule.period_minutes)
return abs(change) >= rule.change_pct_threshold
return False
async def trigger_alert(self, rule: AlertRule, ticker: NormalizedTicker):
message = self.format_alert_message(rule, ticker)
# Доставляємо по всіх каналах
for channel in rule.channels:
await self.notifier.send(channel, rule.user_id, message)
# Оновлюємо стан правила
rule.last_triggered = datetime.utcnow()
if rule.one_time:
rule.is_active = False
await self.rule_repo.save(rule)
def is_in_cooldown(self, rule: AlertRule) -> bool:
if not rule.last_triggered:
return False
elapsed = (datetime.utcnow() - rule.last_triggered).total_seconds() / 60
return elapsed < rule.cooldown_minutes
Детектор об'ємної аномалії
class VolumeAnomalyDetector:
WINDOW_PERIODS = 20 # свічок для розрахунку середнього
async def check_volume_spike(self, symbol: str, current_volume: Decimal) -> float:
"""Повертає множник відносно середнього обсягу"""
recent_volumes = await self.candle_repo.get_recent_volumes(
symbol, count=self.WINDOW_PERIODS
)
if len(recent_volumes) < 5:
return 1.0
avg_volume = sum(recent_volumes) / len(recent_volumes)
if avg_volume == 0:
return 1.0
return float(current_volume / avg_volume)
Ліквідаційні алерти
Дані про ліквідації отримуємо з бірж (Binance forceOrder stream, Bybit liquidation) або агрегаторів (Coinalyze, CoinGlass API):
class LiquidationMonitor:
async def monitor_binance_liquidations(self):
async with websockets.connect("wss://fstream.binance.com/ws/!forceOrder@arr") as ws:
async for message in ws:
data = json.loads(message)
order = data["o"]
liquidation = Liquidation(
symbol=order["s"],
side=order["S"],
quantity=Decimal(order["q"]),
price=Decimal(order["p"]),
usd_value=Decimal(order["q"]) * Decimal(order["p"]),
timestamp=data["T"],
)
await self.process_liquidation(liquidation)
async def process_liquidation(self, liq: Liquidation):
# Оновлюємо rolling 1-minute total
await self.redis.incrbyfloat(
f"liq_total:{liq.symbol}:1m",
float(liq.usd_value)
)
await self.redis.expire(f"liq_total:{liq.symbol}:1m", 60)
# Перевіряємо алерти по крупним одиничним ліквідаціям
if liq.usd_value >= 1_000_000: # > $1M
await self.alert_engine.fire_liquidation_alert(liq)
Webhook доставка
class WebhookDelivery:
async def send(self, webhook_url: str, alert: AlertMessage):
payload = {
"type": alert.type,
"symbol": alert.symbol,
"message": alert.text,
"timestamp": alert.timestamp.isoformat(),
"data": alert.raw_data,
}
# Підписуємо payload для верифікації на стороні отримувача
signature = hmac.new(
alert.rule.webhook_secret.encode(),
json.dumps(payload).encode(),
hashlib.sha256
).hexdigest()
async with httpx.AsyncClient() as client:
await client.post(
webhook_url,
json=payload,
headers={"X-Alert-Signature": f"sha256={signature}"},
timeout=10.0,
)
Webhook-алерти дозволяють інтегрувати систему з зовнішніми ботами, торговельними системами, CRM. По webhook можна триггерити автоматичні дії — наприклад, відкриття ордера при цьовому алерті.







