Setting Up Real-Time Scraping via WebSocket

Polling a REST API every N seconds is the wrong tool for tasks that require reacting to events: with a 1-second polling interval, the average event-detection latency is 0.5 seconds. A WebSocket subscription delivers an event the moment it occurs; latency is determined only by the network (10–50 ms to the nearest exchange server). For price, order book, and on-chain event monitoring, this difference is critical.

WebSocket Protocols of Exchanges

Each exchange has its own subscription protocol. The patterns are similar; the details differ.

Binance: subscribe either through the connection URL or a JSON message; stream names use the format symbol@streamType:

import asyncio
import json
import websockets

async def binance_stream(symbols: list[str]):
    # Combined stream: multiplex several symbol streams over one connection
    streams = '/'.join(f"{s.lower()}@trade" for s in symbols)
    url = f"wss://stream.binance.com:9443/stream?streams={streams}"
    
    async with websockets.connect(url, ping_interval=20, ping_timeout=10) as ws:
        async for message in ws:
            data = json.loads(message)
            # Combined streams wrap the payload in {'stream': ..., 'data': ...}
            stream_data = data.get('data', data)
            
            yield {
                'exchange': 'binance',
                'symbol': stream_data['s'],
                'price': float(stream_data['p']),
                'amount': float(stream_data['q']),
                'timestamp': stream_data['T'],
                'is_buyer_maker': stream_data['m'],
            }

Coinbase Advanced Trade: uses a subscribe message with a channel and product_ids:

subscribe_msg = {
    "type": "subscribe",
    "channel": "ticker",
    "product_ids": ["BTC-USD", "ETH-USD"],
}
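A minimal consumption sketch built around that message. The public endpoint URL and unauthenticated access to the ticker channel are assumptions; verify them against Coinbase's Advanced Trade WebSocket docs before relying on them:

```python
import json

def build_subscribe(product_ids: list[str], channel: str = "ticker") -> str:
    """Serialize an Advanced Trade subscribe message."""
    return json.dumps({
        "type": "subscribe",
        "channel": channel,
        "product_ids": product_ids,
    })

async def coinbase_stream(product_ids: list[str]):
    import websockets  # deferred: third-party, only needed at connect time
    # Public Advanced Trade feed endpoint (assumption -- verify in the docs)
    async with websockets.connect("wss://advanced-trade-ws.coinbase.com") as ws:
        await ws.send(build_subscribe(product_ids))
        async for message in ws:
            yield json.loads(message)
```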

Kraken: requires generating a subscription request, and its v1 public feed replies with positional arrays rather than named fields; the trading pair arrives as the last element of the message array.
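A sketch of handling that array format. The field order is taken from Kraken's v1 public trade feed; treat the exact positions as an assumption to verify:

```python
def parse_kraken_trades(message: list) -> list[dict]:
    """Parse a Kraken v1 public trade message.

    Shape (assumed from the v1 feed):
    [channel_id, [[price, volume, time, side, order_type, misc], ...],
     channel_name, pair]
    Note the pair is the last positional element, not a named field.
    """
    _channel_id, trades, _channel_name, pair = message
    return [
        {
            'exchange': 'kraken',
            'symbol': pair,
            'price': float(price),
            'amount': float(volume),
            'timestamp': float(ts),
            'side': 'buy' if side == 'b' else 'sell',
        }
        for price, volume, ts, side, *_rest in trades
    ]
```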

Ethereum/EVM: web3.py and ethers.js WebSocket

On-chain events arrive via WebSocket subscriptions to an Ethereum node (Alchemy, Infura, QuickNode, or a self-hosted node):

from web3 import AsyncWeb3, WebSocketProvider

async def subscribe_to_transfers(token_address: str):
    # web3.py v7+: the provider connects inside the async context manager
    async with AsyncWeb3(WebSocketProvider(
        "wss://eth-mainnet.g.alchemy.com/v2/YOUR_KEY"
    )) as w3:
        # ERC-20 Transfer event signature hash
        transfer_sig = w3.keccak(text="Transfer(address,address,uint256)").hex()
        
        subscription_id = await w3.eth.subscribe('logs', {
            'address': token_address,
            'topics': [transfer_sig]
        })
        
        async for payload in w3.socket.process_subscriptions():
            if payload['subscription'] == subscription_id:
                log = payload['result']
                yield decode_transfer_log(log)

The Ethereum JSON-RPC WebSocket interface offers three main subscription types: newHeads (new blocks), logs (contract events), and newPendingTransactions (mempool transactions).

Managing Connections: Reconnect and Heartbeat

WebSocket connections break for many reasons: server-side timeouts, network hiccups, service restarts. A production system should recover automatically:

import asyncio
import websockets
from datetime import datetime

class RobustWebSocketClient:
    def __init__(self, url: str, reconnect_delay: float = 1.0):
        self.url = url
        self.reconnect_delay = reconnect_delay
        self.max_reconnect_delay = 60.0
        self.last_message_at = None
        self.stale_threshold = 30  # seconds without messages = staleness
    
    async def connect_with_retry(self, on_message, on_subscribe):
        delay = self.reconnect_delay
        
        while True:
            try:
                async with websockets.connect(
                    self.url,
                    ping_interval=20,
                    ping_timeout=10,
                    close_timeout=5,
                ) as ws:
                    await on_subscribe(ws)
                    delay = self.reconnect_delay  # reset on success
                    
                    async for msg in ws:
                        self.last_message_at = datetime.utcnow()
                        await on_message(msg)
                        
            except (websockets.ConnectionClosed, 
                    websockets.InvalidHandshake,
                    OSError) as e:
                print(f"Connection error: {e}, reconnecting in {delay}s")
                await asyncio.sleep(delay)
                delay = min(delay * 2, self.max_reconnect_delay)
    
    async def staleness_watchdog(self):
        """Detects frozen connection without explicit break"""
        while True:
            await asyncio.sleep(10)
            if self.last_message_at:
                elapsed = (datetime.utcnow() - self.last_message_at).seconds
                if elapsed > self.stale_threshold:
                    raise RuntimeError(f"Connection stale: {elapsed}s without data")
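One refinement worth considering for the doubling backoff above: add jitter, so a fleet of clients does not reconnect in lockstep after a mass disconnect on the exchange side. A small sketch:

```python
import random

def next_backoff(current: float, max_delay: float = 60.0, jitter: float = 0.2) -> float:
    """Exponential backoff with +/-20% jitter to de-synchronize reconnects."""
    base = min(current * 2, max_delay)
    return base * (1 + random.uniform(-jitter, jitter))
```

In the client above this would replace `delay = min(delay * 2, self.max_reconnect_delay)` with `delay = next_backoff(delay, self.max_reconnect_delay)`.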

Order Book: Incremental Updates vs Snapshots

Most exchanges send the order book as incremental updates containing only the changed levels, so you must maintain an accurate local copy of the book:

from sortedcontainers import SortedDict

class LocalOrderBook:
    def __init__(self):
        self.bids = SortedDict(lambda k: -k)  # descending
        self.asks = SortedDict()               # ascending
        self.last_update_id = 0
    
    def apply_snapshot(self, snapshot: dict):
        self.bids.clear()
        self.asks.clear()
        for price, qty in snapshot['bids']:
            self.bids[float(price)] = float(qty)
        for price, qty in snapshot['asks']:
            self.asks[float(price)] = float(qty)
        self.last_update_id = snapshot['lastUpdateId']
    
    def apply_update(self, update: dict):
        if update['u'] <= self.last_update_id:
            return  # stale update, ignore
        
        for price, qty in update['b']:  # bids
            p, q = float(price), float(qty)
            if q == 0:
                self.bids.pop(p, None)
            else:
                self.bids[p] = q
        
        for price, qty in update['a']:  # asks
            p, q = float(price), float(qty)
            if q == 0:
                self.asks.pop(p, None)
            else:
                self.asks[p] = q
        
        self.last_update_id = update['u']
    
    def best_bid(self) -> tuple[float, float]:
        k = next(iter(self.bids))
        return k, self.bids[k]
    
    def best_ask(self) -> tuple[float, float]:
        k = next(iter(self.asks))
        return k, self.asks[k]

Important: on startup, fetch a snapshot via REST, then apply WebSocket updates whose lastUpdateId is greater than the snapshot's. Updates from before the snapshot are discarded; a gap in the U/u sequence means a fresh snapshot is required.
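The snapshot-then-replay step can be expressed as a small validator. The field names U and u (first and last update id covered by a diff) follow Binance's depth stream; other exchanges name these differently:

```python
def select_updates(snapshot_id: int, buffered: list[dict]) -> list[dict]:
    """Filter buffered depth diffs after fetching a REST snapshot.

    Each diff carries the first ('U') and last ('u') update id it covers,
    as in Binance's depth stream. Diffs fully before the snapshot are
    dropped; a hole in the sequence means the snapshot must be refetched.
    """
    result = []
    expected = snapshot_id + 1
    for upd in buffered:
        if upd['u'] < expected:
            continue  # entirely covered by the snapshot
        if upd['U'] > expected:
            raise RuntimeError(f"sequence gap: expected {expected}, got {upd['U']}")
        result.append(upd)
        expected = upd['u'] + 1
    return result
```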

Scaling: Many Pairs and Exchanges

A single async event loop in Python handles 50–200 simultaneous WebSocket connections. Beyond that, shard across multiple processes or use a Go service (goroutines are significantly lighter than asyncio tasks).

Fan out results: publish processed messages to Redis Pub/Sub or Kafka for downstream consumers. The WebSocket handler should do minimal work and publish quickly; heavy processing belongs in a separate consumer.

async def handle_message(raw: str, redis_client):
    data = json.loads(raw)
    normalized = normalize_trade(data)
    
    # Quick publish — don't block event loop
    await redis_client.publish(
        f"trades:{normalized['exchange']}:{normalized['symbol']}",
        json.dumps(normalized)
    )
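The normalize_trade call above is exchange-specific and not shown; a sketch for the Binance trade payload from earlier (field names as in that stream):

```python
def normalize_trade(data: dict) -> dict:
    """Map a Binance trade message into the common cross-exchange schema."""
    payload = data.get('data', data)  # combined streams wrap in {'stream', 'data'}
    return {
        'exchange': 'binance',
        'symbol': payload['s'],
        'price': float(payload['p']),
        'amount': float(payload['q']),
        'timestamp': payload['T'],
    }
```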

Health Monitoring

Track metrics for every WebSocket connection: messages per second, reconnect count, last-message timestamp, and the lag between the exchange timestamp and processing time. Alert via Grafana + Prometheus on stale connections (more than 60 seconds without messages on an active pair).

Setting up real-time scraping for 3–5 exchanges with 20–50 monitored pairs, including reconnect logic and Redis/Kafka publishing, takes 1–2 weeks.