Setting Up Real-Time Scraping via WebSocket
Polling a REST API every N seconds is the wrong tool for tasks that require reacting to events. With a 1-second polling interval, the average event-detection latency is 0.5 seconds. A WebSocket subscription delivers the event the moment it occurs, with latency determined only by the network (10–50 ms to the nearest exchange server). For monitoring prices, order books, and on-chain events, the difference is critical.
WebSocket Protocols of Exchanges
Each exchange has its own subscription protocol. The patterns are similar; the details differ.
Binance: subscribe via a JSON message or a combined-stream URL; stream names use the format symbol@streamType:
import json
import websockets

async def binance_stream(symbols: list[str]):
    # Combined stream: one connection carries trade events for all symbols
    streams = '/'.join(f"{s.lower()}@trade" for s in symbols)
    url = f"wss://stream.binance.com:9443/stream?streams={streams}"
    async with websockets.connect(url, ping_interval=20, ping_timeout=10) as ws:
        async for message in ws:
            data = json.loads(message)
            stream_data = data.get('data', data)
            yield {
                'exchange': 'binance',
                'symbol': stream_data['s'],
                'price': float(stream_data['p']),
                'amount': float(stream_data['q']),
                'timestamp': stream_data['T'],
                'is_buyer_maker': stream_data['m'],
            }
Coinbase Advanced Trade uses a subscribe message with a channel and product_ids:
subscribe_msg = {
    "type": "subscribe",
    "channel": "ticker",
    "product_ids": ["BTC-USD", "ETH-USD"],
}
Kraken requires generating a client-side request ID and has response-format quirks, such as the pair being nested inside an array.
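A minimal subscribe-message builder, sketched against Kraken's WebSocket v2 conventions; the field names (method, params, req_id) are assumptions to verify against the current Kraken docs:

```python
import itertools
import json

_req_ids = itertools.count(1)  # client-side request IDs, unique per message

def kraken_subscribe(channel: str, symbols: list[str]) -> str:
    """Build a Kraken WebSocket v2 subscribe message with a generated req_id."""
    return json.dumps({
        "method": "subscribe",
        "params": {"channel": channel, "symbol": symbols},
        "req_id": next(_req_ids),
    })
```

The generated req_id lets you match the exchange's acknowledgement to the request that caused it.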
Ethereum/EVM: web3.py and ethers.js WebSocket
On-chain events arrive via WebSocket subscriptions to an Ethereum node (Alchemy, Infura, QuickNode, or your own node):
from web3 import AsyncWeb3, WebSocketProvider

async def subscribe_to_transfers(token_address: str):
    # The async context manager opens (and later closes) the persistent socket
    async with AsyncWeb3(WebSocketProvider(
        "wss://eth-mainnet.g.alchemy.com/v2/YOUR_KEY"
    )) as w3:
        # ERC-20 Transfer event signature hash
        transfer_sig = w3.keccak(text="Transfer(address,address,uint256)").hex()
        subscription_id = await w3.eth.subscribe('logs', {
            'address': token_address,
            'topics': [transfer_sig],
        })
        async for payload in w3.socket.process_subscriptions():
            if payload['subscription'] == subscription_id:
                log = payload['result']
                yield decode_transfer_log(log)  # your ABI-decoding helper
Ethereum JSON-RPC over WebSocket supports three subscription types: newHeads (new blocks), logs (contract events), and newPendingTransactions (mempool transactions).
Managing Connections: Reconnect and Heartbeat
WebSocket connections break for many reasons: server-side timeouts, network hiccups, service restarts. A production system should recover automatically:
import asyncio
import websockets
from datetime import datetime, timezone

class RobustWebSocketClient:
    def __init__(self, url: str, reconnect_delay: float = 1.0):
        self.url = url
        self.reconnect_delay = reconnect_delay
        self.max_reconnect_delay = 60.0
        self.last_message_at = None
        self.stale_threshold = 30  # seconds without messages = staleness

    async def connect_with_retry(self, on_message, on_subscribe):
        delay = self.reconnect_delay
        while True:
            try:
                async with websockets.connect(
                    self.url,
                    ping_interval=20,
                    ping_timeout=10,
                    close_timeout=5,
                ) as ws:
                    await on_subscribe(ws)
                    delay = self.reconnect_delay  # reset on success
                    async for msg in ws:
                        self.last_message_at = datetime.now(timezone.utc)
                        await on_message(msg)
            except (websockets.ConnectionClosed,
                    websockets.InvalidHandshake,
                    OSError) as e:
                print(f"Connection error: {e}, reconnecting in {delay}s")
                await asyncio.sleep(delay)
                delay = min(delay * 2, self.max_reconnect_delay)

    async def staleness_watchdog(self):
        """Detects a frozen connection that never closed explicitly"""
        while True:
            await asyncio.sleep(10)
            if self.last_message_at:
                elapsed = (datetime.now(timezone.utc) - self.last_message_at).total_seconds()
                if elapsed > self.stale_threshold:
                    raise RuntimeError(f"Connection stale: {elapsed:.0f}s without data")
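The backoff above doubles deterministically, so a fleet of clients reconnects in lockstep after an exchange-side outage. A common refinement is full-jitter backoff; a sketch assuming the same 1 s base and 60 s cap:

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Full-jitter exponential backoff: uniform in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0, min(cap, base * (2 ** attempt)))
```

Randomizing the full interval (rather than adding a small offset) spreads reconnect attempts most evenly and avoids hammering an exchange that is just coming back up.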
Order Book: Incremental Updates vs Snapshots
Most exchanges stream the order book as incremental updates containing only the changed levels, so you must maintain an accurate local copy of the book:
from sortedcontainers import SortedDict

class LocalOrderBook:
    def __init__(self):
        self.bids = SortedDict(lambda k: -k)  # descending by price
        self.asks = SortedDict()              # ascending by price
        self.last_update_id = 0

    def apply_snapshot(self, snapshot: dict):
        self.bids.clear()
        self.asks.clear()
        for price, qty in snapshot['bids']:
            self.bids[float(price)] = float(qty)
        for price, qty in snapshot['asks']:
            self.asks[float(price)] = float(qty)
        self.last_update_id = snapshot['lastUpdateId']

    def apply_update(self, update: dict):
        if update['u'] <= self.last_update_id:
            return  # stale update, ignore
        for price, qty in update['b']:  # bids
            p, q = float(price), float(qty)
            if q == 0:
                self.bids.pop(p, None)  # zero quantity deletes the level
            else:
                self.bids[p] = q
        for price, qty in update['a']:  # asks
            p, q = float(price), float(qty)
            if q == 0:
                self.asks.pop(p, None)
            else:
                self.asks[p] = q
        self.last_update_id = update['u']

    def best_bid(self) -> tuple[float, float]:
        k = next(iter(self.bids))
        return k, self.bids[k]

    def best_ask(self) -> tuple[float, float]:
        k = next(iter(self.asks))
        return k, self.asks[k]
Important: on startup, fetch a snapshot via REST, then apply only WebSocket updates whose lastUpdateId exceeds the snapshot's. Updates that predate the snapshot are discarded, and a gap in the U → u sequence requires a fresh snapshot.
Scaling: Many Pairs and Exchanges
A single async event loop in Python handles 50–200 simultaneous WebSocket connections. Beyond that, use multiple processes or a Go service (goroutines are significantly lighter than asyncio tasks).
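A simple way to split the workload across connections or worker processes is to shard the symbol list (the helper name is mine):

```python
def shard_symbols(symbols: list[str], n_shards: int) -> list[list[str]]:
    """Round-robin symbols into n shards, one per connection or worker."""
    shards: list[list[str]] = [[] for _ in range(n_shards)]
    for i, symbol in enumerate(symbols):
        shards[i % n_shards].append(symbol)
    return shards
```

Round-robin keeps shard sizes balanced; if some pairs are far more active than others, weight the assignment by expected message rate instead.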
Fan out the results: publish processed messages to Redis Pub/Sub or Kafka for downstream consumers. The WebSocket handler should do minimal work and publish quickly; heavy processing belongs in a separate consumer.
async def handle_message(raw: str, redis_client):
    data = json.loads(raw)
    normalized = normalize_trade(data)  # your exchange-specific normalizer
    # Publish quickly; don't block the event loop
    await redis_client.publish(
        f"trades:{normalized['exchange']}:{normalized['symbol']}",
        json.dumps(normalized)
    )
Health Monitoring
Track metrics for each WebSocket connection: messages per second, reconnect count, last-message timestamp, and the lag between the exchange timestamp and the processing timestamp. Alert via Grafana + Prometheus on stale connections (more than 60 seconds without messages on an active pair).
Setting up real-time parsing for 3–5 exchanges and 20–50 pairs, including reconnect logic and Redis/Kafka publishing, takes about 1–2 weeks.