Real-time Order Book Data Parsing
Order book data is needed in three scenarios: building a trading bot, creating a liquidity aggregator, or monitoring markets. Each scenario has its own specifics, but the common problem is the same — stable high-frequency data reception without loss and with minimal latency.
WebSocket vs REST polling
REST polling (GET /api/v3/depth?symbol=BTCUSDT) is the wrong choice for a real-time order book. On active markets the book updates 10–100 times per second; polling once per second yields stale data and exhausts API rate limits. The correct approach is WebSocket streams with incremental updates.
Most major CEX (Binance, Bybit, OKX) follow one scheme:
- Get a snapshot via REST (the full book at the current moment)
- Subscribe to the WebSocket update stream
- Apply the updates to the snapshot, maintaining a local copy of the book
import asyncio, json, aiohttp
from sortedcontainers import SortedDict
class OrderBook:
    """Local L2 order book mirror maintained from exchange diff updates.

    Bids are stored in descending price order and asks in ascending order,
    so index 0 of each side is always the top-of-book level.
    """

    def __init__(self):
        # Negated key sorts bid prices descending; asks use natural ascending order.
        self.bids = SortedDict(lambda x: -x)
        self.asks = SortedDict()
        # Sequence id of the last applied update; 0 means "no snapshot yet".
        self.last_update_id = 0

    @staticmethod
    def _apply_side(side, levels: list) -> None:
        """Merge one side's price levels into `side`; a qty of 0 deletes the level."""
        for price, qty in levels:
            price, qty = float(price), float(qty)
            if qty == 0:
                side.pop(price, None)  # exchanges signal level removal with qty 0
            else:
                side[price] = qty

    def apply_update(self, bids: list, asks: list, update_id: int) -> None:
        """Apply one incremental depth update to both sides of the book.

        Updates whose id is at or below the last applied one are stale
        (e.g. replayed from a buffer) and are silently ignored.
        """
        if update_id <= self.last_update_id:
            return  # stale update, ignore
        self._apply_side(self.bids, bids)
        self._apply_side(self.asks, asks)
        self.last_update_id = update_id

    @property
    def best_bid(self) -> tuple[float, float] | None:
        """Highest bid as (price, qty), or None when the bid side is empty."""
        if self.bids:
            price = self.bids.keys()[0]
            return price, self.bids[price]
        return None

    @property
    def best_ask(self) -> tuple[float, float] | None:
        """Lowest ask as (price, qty), or None when the ask side is empty."""
        if self.asks:
            price = self.asks.keys()[0]
            return price, self.asks[price]
        return None
Binance depth stream: protocol details
Binance — the most common request. They have two stream variants:
- `btcusdt@depth` — incremental updates every 1000ms (or every 100ms with the `@depth@100ms` suffix)
- `btcusdt@depth20` — top-20 levels every 100ms (no incremental updates; always a full snapshot)
To maintain the full book with incremental patches applied:
async def maintain_binance_orderbook(symbol: str):
    """Maintain a local Binance order book from a snapshot plus diff stream.

    Order of operations (matters for correctness): open the WebSocket diff
    stream first and buffer its events, then fetch a REST depth snapshot,
    then replay the buffered events that are newer than the snapshot.
    """
    ob = OrderBook()
    buffer = []  # buffer updates until we get snapshot

    async def handle_ws_message(msg):
        data = json.loads(msg)
        # Accumulate updates WHILE we haven't got snapshot
        # (last_update_id == 0 means the snapshot has not been applied yet).
        if ob.last_update_id == 0:
            buffer.append(data)
            return
        # Binance: update valid if U <= lastUpdateId+1 <= u
        # NOTE(review): when a gap appears (U > lastUpdateId + 1) the event is
        # silently dropped and the book stays desynced — resync logic (detect
        # the gap, refetch a snapshot) is not implemented here.
        if data['U'] <= ob.last_update_id + 1 <= data['u']:
            ob.apply_update(data['b'], data['a'], data['u'])

    # Start WS before fetching the snapshot so no diff events are lost.
    ws_task = asyncio.create_task(connect_ws(
        f"wss://stream.binance.com:9443/ws/{symbol.lower()}@depth@100ms",
        handle_ws_message
    ))

    # Get snapshot (wait a bit for buffer to accumulate)
    await asyncio.sleep(0.5)
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"https://api.binance.com/api/v3/depth",
            params={"symbol": symbol.upper(), "limit": 1000}
        ) as resp:
            snapshot = await resp.json()

    # Initialize book from snapshot
    for price, qty in snapshot['bids']:
        ob.bids[float(price)] = float(qty)
    for price, qty in snapshot['asks']:
        ob.asks[float(price)] = float(qty)
    ob.last_update_id = snapshot['lastUpdateId']

    # Replay buffered updates newer than the snapshot; older ones are
    # already reflected in the snapshot and must be skipped.
    for update in buffer:
        if update['u'] > ob.last_update_id:
            ob.apply_update(update['b'], update['a'], update['u'])

    await ws_task
Critical point: if an update is missed (a gap in the U → u sequence), the local book is out of sync. You need resync logic: detect the gap and reinitialize from a fresh snapshot.
Aggregation of multiple exchanges
For a liquidity aggregator or cross-exchange arbitrage, maintain the books of multiple exchanges in parallel:
# Registry of per-exchange book maintainers. Based on run_aggregator below,
# each class takes the symbol in its constructor and exposes an async run().
# NOTE(review): the three classes are defined elsewhere in the project — confirm.
EXCHANGES = {
    "binance": BinanceOrderBook,
    "bybit": BybitOrderBook,
    "okx": OKXOrderBook,
}
async def run_aggregator(symbol: str):
    """Run one order-book maintainer per configured exchange concurrently."""
    books = {}
    for exchange_name, book_cls in EXCHANGES.items():
        books[exchange_name] = book_cls(symbol)
    # gather drives every book's run() coroutine on the same event loop
    await asyncio.gather(*(book.run() for book in books.values()))
def get_best_price_across_exchanges(books: "dict[str, OrderBook]") -> dict:
    """Return the best bid/ask across exchanges and the cross-exchange spread.

    Books with an empty side (best_bid / best_ask is None) are skipped.

    Args:
        books: mapping of exchange name -> order book exposing best_bid/best_ask
            as (price, qty) tuples or None.

    Returns:
        Dict with "best_bid" and "best_ask" ({exchange, price, qty}) and
        "spread" (best ask price minus best bid price; negative means a
        cross-exchange arbitrage opportunity).

    Raises:
        ValueError: if no book has a quote on one of the sides (the original
            version raised a bare IndexError here).
    """
    bids = [(name, *ob.best_bid) for name, ob in books.items() if ob.best_bid]
    asks = [(name, *ob.best_ask) for name, ob in books.items() if ob.best_ask]
    if not bids or not asks:
        raise ValueError("no order book has quotes on both sides")
    # Only the extremes are needed: max/min is O(n) vs sorting both lists.
    bid_exch, bid_price, bid_qty = max(bids, key=lambda t: t[1])
    ask_exch, ask_price, ask_qty = min(asks, key=lambda t: t[1])
    return {
        "best_bid": {"exchange": bid_exch, "price": bid_price, "qty": bid_qty},
        "best_ask": {"exchange": ask_exch, "price": ask_price, "qty": ask_qty},
        "spread": ask_price - bid_price,
    }
Storage and replay
For backtesting or audit — store update stream, not just snapshots. L2 order book updates — large volume: for BTC/USDT on Binance ~100MB/hour uncompressed.
Efficient storage:
# Record in binary format via msgpack
import msgpack, lz4.frame
def serialize_update(update: dict) -> bytes:
    """Serialize one order-book update: msgpack encoding, then LZ4 compression."""
    encoded = msgpack.packb(update, use_bin_type=True)
    return lz4.frame.compress(encoded)
# TimescaleDB for time-series storage
# Hypertable automatically partitions by time
-- Append-only log of L2 order-book updates; one row per price-level change.
CREATE TABLE ob_updates (
time TIMESTAMPTZ NOT NULL,    -- event timestamp (presumably exchange time — confirm with the writer)
exchange TEXT NOT NULL,
symbol TEXT NOT NULL,
side CHAR(1) NOT NULL, -- 'b' or 'a'
price NUMERIC NOT NULL,
quantity NUMERIC NOT NULL     -- in the parser above, qty 0 marks level removal — confirm same convention here
);
-- Convert to a TimescaleDB hypertable, auto-partitioned on the time column.
SELECT create_hypertable('ob_updates', 'time');
Rate limits and connection management
Binance WebSocket limits: up to 1024 streams per connection, 300 connection attempts per 5 minutes, and no more than 5 messages per second sent by the client on a connection (check the current limits in the official docs). On disconnect — exponential backoff:
async def connect_ws_with_retry(url: str, handler, max_retries=10):
    """Consume a WebSocket stream, reconnecting with exponential backoff.

    Args:
        url: WebSocket endpoint to connect to.
        handler: async callable invoked with each received message.
        max_retries: number of connection attempts before giving up.

    Raises:
        ConnectionError: when all reconnect attempts are exhausted
            (the original silently returned None here).
    """
    for attempt in range(max_retries):
        try:
            # ping_interval keeps the connection alive through idle periods.
            async with websockets.connect(url, ping_interval=20) as ws:
                async for message in ws:
                    await handler(message)
        except Exception as e:
            # The original caught (ConnectionClosed, Exception); the broad
            # Exception already subsumes ConnectionClosed, so catch it once.
            wait = min(2 ** attempt, 60)  # cap backoff at 60 seconds
            logging.warning(f"WS disconnected: {e}, retry in {wait}s")
            await asyncio.sleep(wait)
    # Fail loudly instead of returning None so callers can react/resync.
    raise ConnectionError(f"WebSocket {url}: gave up after {max_retries} attempts")
A complete system aggregating 3–5 exchanges, with TimescaleDB storage and a REST API for queries, typically takes 3–4 weeks to build.







