Настройка real-time парсингу через WebSocket
Polling REST API раз в N секунд — неправильний інструмент для задач, які вимагають реакції на подій. При polling з інтервалом 1 секунда середня затримка виявлення подій — 0.5 секунди. WebSocket-підписка дає подію в момент її виникнення, затримка визначається тільки мережею (10–50мс до найближчого сервера біржи). Для моніторингу цін, orderbook та on-chain подій різниця принципіальна.
WebSocket протоколи бірж
Кожна біржа має свій протокол підписки. Паттерни схожі, деталі різняться.
Binance: підписка через JSON повідомлення, stream names у форматі symbol@streamType:
import asyncio
import json
import websockets
async def binance_stream(symbols: list[str]):
streams = '/'.join([f"{s.lower()}@trade" for s in symbols])
url = f"wss://stream.binance.com:9443/stream?streams={streams}"
async with websockets.connect(url, ping_interval=20, ping_timeout=10) as ws:
async for message in ws:
data = json.loads(message)
stream_data = data.get('data', data)
yield {
'exchange': 'binance',
'symbol': stream_data['s'],
'price': float(stream_data['p']),
'amount': float(stream_data['q']),
'timestamp': stream_data['T'],
'is_buyer_maker': stream_data['m'],
}
Coinbase Advanced Trade: використовує subscribe message з channel та product_ids:
subscribe_msg = {
"type": "subscribe",
"channel": "ticker",
"product_ids": ["BTC-USD", "ETH-USD"],
}
Kraken: вимагає генерацію subscription ID та має особливості формату відповіді з парою в масиву.
Ethereum/EVM: web3.py та ethers.js WebSocket
On-chain подій через WebSocket підписки до Ethereum ноди (Alchemy, Infura, QuickNode або власна нода):
from web3 import AsyncWeb3, WebSocketProvider
async def subscribe_to_transfers(token_address: str):
w3 = AsyncWeb3(WebSocketProvider(
"wss://eth-mainnet.g.alchemy.com/v2/YOUR_KEY"
))
# ERC-20 Transfer event signature hash
transfer_sig = w3.keccak(text="Transfer(address,address,uint256)").hex()
subscription_id = await w3.eth.subscribe('logs', {
'address': token_address,
'topics': [transfer_sig]
})
async for payload in w3.socket.process_subscriptions():
if payload['subscription'] == subscription_id:
log = payload['result']
yield decode_transfer_log(log)
Ethereum JSON-RPC WebSocket підтримує три типи підписок: newHeads (нові блоки), logs (подій контрактів), newPendingTransactions (mempool транзакції).
Управління з'єднаннями: reconnect та heartbeat
WebSocket з'єднання розриваються з різних причин: timeout сервера, network hiccup, перезапуск сервісу. Production система повинна автоматично відновлюватися:
import asyncio
import websockets
from datetime import datetime
class RobustWebSocketClient:
def __init__(self, url: str, reconnect_delay: float = 1.0):
self.url = url
self.reconnect_delay = reconnect_delay
self.max_reconnect_delay = 60.0
self.last_message_at = None
self.stale_threshold = 30 # секунд без повідомлень = staleness
async def connect_with_retry(self, on_message, on_subscribe):
delay = self.reconnect_delay
while True:
try:
async with websockets.connect(
self.url,
ping_interval=20,
ping_timeout=10,
close_timeout=5,
) as ws:
await on_subscribe(ws)
delay = self.reconnect_delay # сбрасуємо при успіху
async for msg in ws:
self.last_message_at = datetime.utcnow()
await on_message(msg)
except (websockets.ConnectionClosed,
websockets.InvalidHandshake,
OSError) as e:
print(f"Connection error: {e}, reconnecting in {delay}s")
await asyncio.sleep(delay)
delay = min(delay * 2, self.max_reconnect_delay)
async def staleness_watchdog(self):
"""Детектує зависше з'єднання без явного розриву"""
while True:
await asyncio.sleep(10)
if self.last_message_at:
elapsed = (datetime.utcnow() - self.last_message_at).seconds
if elapsed > self.stale_threshold:
raise RuntimeError(f"Connection stale: {elapsed}s without data")
Order book: incremental updates vs снапшоти
Більшість бірж надають orderbook через incremental updates — тільки змінені рівні. Локальне утримання актуального стану orderbook:
from sortedcontainers import SortedDict
class LocalOrderBook:
def __init__(self):
self.bids = SortedDict(lambda k: -k) # descending
self.asks = SortedDict() # ascending
self.last_update_id = 0
def apply_snapshot(self, snapshot: dict):
self.bids.clear()
self.asks.clear()
for price, qty in snapshot['bids']:
self.bids[float(price)] = float(qty)
for price, qty in snapshot['asks']:
self.asks[float(price)] = float(qty)
self.last_update_id = snapshot['lastUpdateId']
def apply_update(self, update: dict):
if update['u'] <= self.last_update_id:
return # застарілий update, ігноруємо
for price, qty in update['b']: # bids
p, q = float(price), float(qty)
if q == 0:
self.bids.pop(p, None)
else:
self.bids[p] = q
for price, qty in update['a']: # asks
p, q = float(price), float(qty)
if q == 0:
self.asks.pop(p, None)
else:
self.asks[p] = q
self.last_update_id = update['u']
def best_bid(self) -> tuple[float, float]:
k = next(iter(self.bids))
return k, self.bids[k]
def best_ask(self) -> tuple[float, float]:
k = next(iter(self.asks))
return k, self.asks[k]
Важливо: при старті отримати снапшот через REST, потім застосовувати WebSocket updates починаючи з lastUpdateId > snapshotId. Обновлення до снапшота відбрасуються, пропуск у послідовності U → u вимагає повторного снапшота.
Масштабування: множество пар та бірж
Одна async event loop в Python справляється з 50–200 одночасними WebSocket з'єднаннями. Для більшого — кілька процесів або Go-сервіс (goroutines значительно легші за asyncio tasks).
Fanout результатів: оброблені повідомлення → Redis Pub/Sub або Kafka для downstream consumers. WebSocket handler повинен мінімально обробляти дані та швидко публікувати — важку обробку робить окремий consumer.
async def handle_message(raw: str, redis_client):
data = json.loads(raw)
normalized = normalize_trade(data)
# Швидка публікація — не блокуємо event loop
await redis_client.publish(
f"trades:{normalized['exchange']}:{normalized['symbol']}",
json.dumps(normalized)
)
Моніторинг здоров'я
Метрики для кожного WebSocket з'єднання: messages per second, reconnect count, last message timestamp, lag від біржевого timestamp до processing timestamp. Grafana + Prometheus alerting на stale connections (> 60 сек без повідомлень по активній парі).
Настройка real-time парсингу для 3–5 бірж з мониторингом 20–50 пар, reconnect логікою та публікацією в Redis/Kafka — 1–2 тижні.







