Real-Time Order Book Data Scraping from Exchanges


Order book data is needed in three scenarios: building a trading bot, creating a liquidity aggregator, or monitoring markets. Each scenario has its own specifics, but the common problem is the same: stable, high-frequency data reception without losses and with minimal latency.

WebSocket vs REST polling

REST polling (GET /api/v3/depth?symbol=BTCUSDT) is the wrong choice for a real-time order book. On active markets the book updates 10–100 times per second; polling once per second returns stale data and quickly hits API rate limits. The correct approach is WebSocket streams with incremental updates.
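A rough back-of-the-envelope check, assuming ~50 updates per second on an active pair:

```python
# At ~50 book updates per second (an assumed rate for an active pair),
# polling once per second observes at most 1 of every 50 states.
updates_per_sec = 50
polls_per_sec = 1

seen = polls_per_sec / updates_per_sec
print(f"fraction of updates observed: {seen:.0%}")     # 2%
print(f"worst-case data staleness: {1 / polls_per_sec:.1f}s")  # 1.0s
```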

Most major CEXs (Binance, Bybit, OKX) follow the same scheme:

  1. Get a snapshot via REST (the full book at the current moment)
  2. Subscribe to the WebSocket update stream
  3. Apply updates to the snapshot, maintaining a local copy of the book

A minimal in-memory book on top of sortedcontainers:

import asyncio, json, aiohttp
from sortedcontainers import SortedDict

class OrderBook:
    def __init__(self):
        self.bids = SortedDict(lambda x: -x)  # descending order
        self.asks = SortedDict()
        self.last_update_id = 0

    def apply_update(self, bids: list, asks: list, update_id: int):
        if update_id <= self.last_update_id:
            return  # stale update, ignore

        for price, qty in bids:
            price, qty = float(price), float(qty)
            if qty == 0:
                self.bids.pop(price, None)  # remove level
            else:
                self.bids[price] = qty

        for price, qty in asks:
            price, qty = float(price), float(qty)
            if qty == 0:
                self.asks.pop(price, None)
            else:
                self.asks[price] = qty

        self.last_update_id = update_id

    @property
    def best_bid(self) -> tuple[float, float] | None:
        if self.bids:
            price = self.bids.keys()[0]
            return price, self.bids[price]
        return None

    @property
    def best_ask(self) -> tuple[float, float] | None:
        if self.asks:
            price = self.asks.keys()[0]
            return price, self.asks[price]
        return None
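The update semantics can be sanity-checked with plain dicts (SortedDict only adds ordered key iteration on top of this); the prices are hypothetical:

```python
bids, asks = {}, {}

def apply(levels: dict, updates: list) -> None:
    # Same semantics as OrderBook.apply_update: qty == 0 deletes the level
    for price, qty in updates:
        price, qty = float(price), float(qty)
        if qty == 0:
            levels.pop(price, None)
        else:
            levels[price] = qty

apply(bids, [("100.5", "2.0"), ("100.0", "1.5")])
apply(asks, [("101.0", "3.0")])
print(max(bids), min(asks))  # 100.5 101.0 (best bid / best ask)

apply(bids, [("100.5", "0")])  # zero quantity removes the level
print(max(bids))  # 100.0
```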

Binance depth stream: protocol details

Binance is the most common request. Two stream variants exist:

  • btcusdt@depth: incremental diff updates (every 1000ms by default, or every 100ms with the @depth@100ms suffix)
  • btcusdt@depth20: top-20 levels as full snapshots (no incremental updates; also supports the @100ms suffix)
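For reference, a diff event from the first stream looks like this (field meanings per the Binance docs: U/u are the first/last update IDs covered by the event, b/a the changed levels; the values here are illustrative):

```python
import json

# Illustrative diff-depth event:
#   U / u: first / last update ID covered by this event
#   b / a: changed bid / ask levels as [price, qty] strings; qty "0" deletes
raw = '''{"e": "depthUpdate", "E": 1672515782136, "s": "BTCUSDT",
          "U": 157, "u": 160,
          "b": [["0.0024", "10"]],
          "a": [["0.0026", "100"], ["0.0027", "0"]]}'''

data = json.loads(raw)
print(data["U"], data["u"])  # 157 160
print(data["a"][1])          # ['0.0027', '0'] -> this ask level is removed
```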

To maintain the full book by applying diffs:

async def maintain_binance_orderbook(symbol: str):
    ob = OrderBook()
    buffer = []  # buffer updates until we get snapshot

    async def handle_ws_message(msg):
        data = json.loads(msg)
        # Accumulate updates WHILE we haven't got snapshot
        if ob.last_update_id == 0:
            buffer.append(data)
            return

        # Binance: update valid if U <= lastUpdateId+1 <= u
        if data['U'] <= ob.last_update_id + 1 <= data['u']:
            ob.apply_update(data['b'], data['a'], data['u'])

    # Start WS (connect_ws is a reconnecting WS helper,
    # e.g. connect_ws_with_retry below)
    ws_task = asyncio.create_task(connect_ws(
        f"wss://stream.binance.com:9443/ws/{symbol.lower()}@depth@100ms",
        handle_ws_message
    ))

    # Get snapshot (wait a bit for buffer to accumulate)
    await asyncio.sleep(0.5)
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"https://api.binance.com/api/v3/depth",
            params={"symbol": symbol.upper(), "limit": 1000}
        ) as resp:
            snapshot = await resp.json()

    # Initialize book from snapshot
    for price, qty in snapshot['bids']:
        ob.bids[float(price)] = float(qty)
    for price, qty in snapshot['asks']:
        ob.asks[float(price)] = float(qty)
    ob.last_update_id = snapshot['lastUpdateId']

    # Apply buffered updates
    for update in buffer:
        if update['u'] > ob.last_update_id:
            ob.apply_update(update['b'], update['a'], update['u'])

    await ws_task

Critical point: if an update is missed (a gap in the U/u sequence), the local book is out of sync. You need resync logic: detect the gap and reinitialize from a fresh snapshot.
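A minimal sketch of the gap check; resync_from_snapshot is a hypothetical helper that would re-run the snapshot-plus-buffer initialization shown above:

```python
def has_gap(last_update_id: int, first_id_in_event: int) -> bool:
    # Once in sync, each Binance spot diff event should start at
    # exactly last_update_id + 1; anything later means missed events.
    return first_id_in_event > last_update_id + 1

# Inside the WS handler (sketch; resync_from_snapshot is hypothetical):
#   if has_gap(ob.last_update_id, data['U']):
#       await resync_from_snapshot(ob)   # refetch snapshot, replay buffer
#   else:
#       ob.apply_update(data['b'], data['a'], data['u'])

print(has_gap(160, 161))  # False: contiguous
print(has_gap(160, 175))  # True: events 161-174 were missed
```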

Aggregation of multiple exchanges

For a liquidity aggregator or cross-exchange arbitrage, maintain the books of several exchanges in parallel:

EXCHANGES = {
    # Exchange-specific wrappers (sketch): each maintains an OrderBook
    # over that exchange's snapshot + diff-stream protocol
    "binance": BinanceOrderBook,
    "bybit": BybitOrderBook,
    "okx": OKXOrderBook,
}

async def run_aggregator(symbol: str):
    books = {name: cls(symbol) for name, cls in EXCHANGES.items()}
    tasks = [book.run() for book in books.values()]
    await asyncio.gather(*tasks)

def get_best_price_across_exchanges(books: dict[str, OrderBook]) -> dict:
    best_bids = [(name, *ob.best_bid) for name, ob in books.items() if ob.best_bid]
    best_asks = [(name, *ob.best_ask) for name, ob in books.items() if ob.best_ask]

    best_bids.sort(key=lambda x: x[1], reverse=True)
    best_asks.sort(key=lambda x: x[1])

    return {
        "best_bid": {"exchange": best_bids[0][0], "price": best_bids[0][1], "qty": best_bids[0][2]},
        "best_ask": {"exchange": best_asks[0][0], "price": best_asks[0][1], "qty": best_asks[0][2]},
        "spread": best_asks[0][1] - best_bids[0][1]
    }
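The selection logic reduces to two sorts, which can be checked on stub quotes (the numbers are made up):

```python
# (exchange, price, qty) tuples, as built inside the function above;
# the quotes are hypothetical
best_bids = [("binance", 64999.5, 0.8), ("bybit", 65000.0, 0.3)]
best_asks = [("binance", 65001.0, 1.2), ("okx", 65000.5, 0.5)]

best_bids.sort(key=lambda x: x[1], reverse=True)  # highest bid first
best_asks.sort(key=lambda x: x[1])                # lowest ask first

print(best_bids[0])  # ('bybit', 65000.0, 0.3)
print(best_asks[0])  # ('okx', 65000.5, 0.5)
print(best_asks[0][1] - best_bids[0][1])  # cross-exchange spread: 0.5
```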

Storage and replay

For backtesting or audit, store the update stream, not just snapshots. L2 order book updates are high-volume: for BTC/USDT on Binance, roughly 100MB/hour uncompressed.
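Rough capacity math from that figure (the lz4 compression ratio below is an assumption, not a measurement):

```python
mb_per_hour = 100                      # figure from the text, uncompressed
per_day_mb = mb_per_hour * 24          # 2400 MB/day
per_month_gb = per_day_mb * 30 / 1024  # ~70.3 GB/month uncompressed

compression_ratio = 4                  # ASSUMED ratio for lz4 on L2 updates
print(round(per_month_gb, 1))                      # 70.3
print(round(per_month_gb / compression_ratio, 1))  # 17.6
```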

Efficient storage:

# Record in binary format via msgpack
import msgpack, lz4.frame

def serialize_update(update: dict) -> bytes:
    packed = msgpack.packb(update, use_bin_type=True)
    return lz4.frame.compress(packed)

-- TimescaleDB for time-series storage
-- A hypertable automatically partitions by time
CREATE TABLE ob_updates (
    time        TIMESTAMPTZ NOT NULL,
    exchange    TEXT NOT NULL,
    symbol      TEXT NOT NULL,
    side        CHAR(1) NOT NULL,  -- 'b' or 'a'
    price       NUMERIC NOT NULL,
    quantity    NUMERIC NOT NULL
);
SELECT create_hypertable('ob_updates', 'time');

Rate limits and connection management

Binance allows multiplexing up to 1024 streams over a single WebSocket connection and limits connection attempts per IP (around 300 per 5 minutes). On disconnect, use exponential backoff:

import asyncio, logging
import websockets

async def connect_ws_with_retry(url: str, handler, max_retries=10):
    for attempt in range(max_retries):
        try:
            async with websockets.connect(url, ping_interval=20) as ws:
                async for message in ws:
                    await handler(message)
        except Exception as e:  # ConnectionClosed, DNS errors, timeouts, ...
            wait = min(2 ** attempt, 60)  # exponential backoff, capped at 60s
            logging.warning(f"WS disconnected: {e}, retry in {wait}s")
            await asyncio.sleep(wait)
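The resulting backoff schedule for the first ten attempts:

```python
# Delays for attempts 0..9, as produced by min(2 ** attempt, 60)
schedule = [min(2 ** attempt, 60) for attempt in range(10)]
print(schedule)  # [1, 2, 4, 8, 16, 32, 60, 60, 60, 60]
```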

A complete system aggregating 3–5 exchanges, with TimescaleDB storage and a REST API for queries, takes about 3–4 weeks.