Настройка real-time парсинга через WebSocket
Polling REST API раз в N секунд — неправильный инструмент для задач, требующих реакции на события. При polling с интервалом 1 секунда средняя задержка обнаружения события — 0.5 секунды. WebSocket-подписка даёт событие в момент его возникновения, задержка определяется только сетью (10–50мс до ближайшего сервера биржи). Для мониторинга цен, orderbook и on-chain событий разница принципиальна.
WebSocket протоколы бирж
Каждая биржа имеет свой протокол подписки. Паттерны схожи, детали различаются.
Binance: subscribe через JSON сообщение, stream names в формате symbol@streamType:
import asyncio
import json
import websockets
async def binance_stream(symbols: list[str]):
streams = '/'.join([f"{s.lower()}@trade" for s in symbols])
url = f"wss://stream.binance.com:9443/stream?streams={streams}"
async with websockets.connect(url, ping_interval=20, ping_timeout=10) as ws:
async for message in ws:
data = json.loads(message)
stream_data = data.get('data', data)
yield {
'exchange': 'binance',
'symbol': stream_data['s'],
'price': float(stream_data['p']),
'amount': float(stream_data['q']),
'timestamp': stream_data['T'],
'is_buyer_maker': stream_data['m'],
}
Coinbase Advanced Trade: использует subscribe message с channel и product_ids:
subscribe_msg = {
"type": "subscribe",
"channel": "ticker",
"product_ids": ["BTC-USD", "ETH-USD"],
}
Kraken: требует генерацию subscription ID и имеет особенности формата ответа с парой в массиве.
Ethereum/EVM: web3.py и ethers.js WebSocket
On-chain события через WebSocket subscriptions к Ethereum ноде (Alchemy, Infura, QuickNode или собственная нода):
from web3 import AsyncWeb3, WebSocketProvider
async def subscribe_to_transfers(token_address: str):
w3 = AsyncWeb3(WebSocketProvider(
"wss://eth-mainnet.g.alchemy.com/v2/YOUR_KEY"
))
# ERC-20 Transfer event signature hash
transfer_sig = w3.keccak(text="Transfer(address,address,uint256)").hex()
subscription_id = await w3.eth.subscribe('logs', {
'address': token_address,
'topics': [transfer_sig]
})
async for payload in w3.socket.process_subscriptions():
if payload['subscription'] == subscription_id:
log = payload['result']
yield decode_transfer_log(log)
Ethereum JSON-RPC WebSocket поддерживает три типа подписок: newHeads (новые блоки), logs (события контрактов), newPendingTransactions (mempool транзакции).
Управление соединениями: reconnect и heartbeat
WebSocket соединения разрываются по разным причинам: timeout сервера, network hiccup, перезапуск сервиса биржи. Production система должна автоматически восстанавливаться:
import asyncio
import websockets
from datetime import datetime
class RobustWebSocketClient:
def __init__(self, url: str, reconnect_delay: float = 1.0):
self.url = url
self.reconnect_delay = reconnect_delay
self.max_reconnect_delay = 60.0
self.last_message_at = None
self.stale_threshold = 30 # секунд без сообщений = staleness
async def connect_with_retry(self, on_message, on_subscribe):
delay = self.reconnect_delay
while True:
try:
async with websockets.connect(
self.url,
ping_interval=20,
ping_timeout=10,
close_timeout=5,
) as ws:
await on_subscribe(ws)
delay = self.reconnect_delay # сбрасываем при успехе
async for msg in ws:
self.last_message_at = datetime.utcnow()
await on_message(msg)
except (websockets.ConnectionClosed,
websockets.InvalidHandshake,
OSError) as e:
print(f"Connection error: {e}, reconnecting in {delay}s")
await asyncio.sleep(delay)
delay = min(delay * 2, self.max_reconnect_delay)
async def staleness_watchdog(self):
"""Детектирует зависшее соединение без явного разрыва"""
while True:
await asyncio.sleep(10)
if self.last_message_at:
elapsed = (datetime.utcnow() - self.last_message_at).seconds
if elapsed > self.stale_threshold:
raise RuntimeError(f"Connection stale: {elapsed}s without data")
Order book: incremental updates vs снапшоты
Большинство бирж отдают orderbook через incremental updates — только изменившиеся уровни. Локальное поддержание актуального состояния orderbook:
from sortedcontainers import SortedDict
class LocalOrderBook:
def __init__(self):
self.bids = SortedDict(lambda k: -k) # descending
self.asks = SortedDict() # ascending
self.last_update_id = 0
def apply_snapshot(self, snapshot: dict):
self.bids.clear()
self.asks.clear()
for price, qty in snapshot['bids']:
self.bids[float(price)] = float(qty)
for price, qty in snapshot['asks']:
self.asks[float(price)] = float(qty)
self.last_update_id = snapshot['lastUpdateId']
def apply_update(self, update: dict):
if update['u'] <= self.last_update_id:
return # устаревший update, игнорируем
for price, qty in update['b']: # bids
p, q = float(price), float(qty)
if q == 0:
self.bids.pop(p, None)
else:
self.bids[p] = q
for price, qty in update['a']: # asks
p, q = float(price), float(qty)
if q == 0:
self.asks.pop(p, None)
else:
self.asks[p] = q
self.last_update_id = update['u']
def best_bid(self) -> tuple[float, float]:
k = next(iter(self.bids))
return k, self.bids[k]
def best_ask(self) -> tuple[float, float]:
k = next(iter(self.asks))
return k, self.asks[k]
Важно: при старте нужно получить снапшот через REST, затем применять WebSocket updates начиная с lastUpdateId > snapshotId. Обновления до снапшота отбрасываются, пропуск в последовательности U → u требует повторного снапшота.
Масштабирование: множество пар и бирж
Одна async event loop в Python справляется с 50–200 одновременными WebSocket соединениями. Для большего числа — несколько процессов или Go-сервис (goroutines значительно легче asyncio tasks).
Fanout результатов: обработанные сообщения → Redis Pub/Sub или Kafka для downstream consumers. WebSocket handler должен минимально обрабатывать данные и быстро публиковать — тяжёлую обработку делает отдельный consumer.
async def handle_message(raw: str, redis_client):
data = json.loads(raw)
normalized = normalize_trade(data)
# Быстрая публикация — не блокируем event loop
await redis_client.publish(
f"trades:{normalized['exchange']}:{normalized['symbol']}",
json.dumps(normalized)
)
Мониторинг здоровья
Метрики для каждого WebSocket соединения: messages per second, reconnect count, last message timestamp, lag от биржевого timestamp до processing timestamp. Grafana + Prometheus alerting на stale connections (> 60 сек без сообщений по активной паре).
Настройка real-time парсинга для 3–5 бирж с мониторингом 20–50 пар, reconnect логикой и публикацией в Redis/Kafka — 1–2 недели.







