Parsing Trade Data from Exchanges
The task sounds simple until you hit API limitations: Binance bans IPs that keep exceeding its request-weight limit, Bybit drops WebSocket connections that go quiet, and DEX data is spread across on-chain events served by many independent nodes. Reliable trade data collection in production is not "call a REST endpoint". It is connection management, rate limiting, format normalization, and guaranteed delivery without losses.
CEX: REST vs WebSocket
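For real-time data, use WebSocket only. REST polling adds latency equal to the poll interval (500 ms to 2 s), misses trades under high load (a recent-trades endpoint returns only the last N trades, so anything beyond N between two polls is lost), and quickly hits rate limits.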
Binance WebSocket Streams:
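```python
import asyncio
import json
from decimal import Decimal

import websockets

async def subscribe_binance_trades(symbols: list[str]):
    """Async generator yielding normalized trades from Binance combined streams."""
    streams = "/".join(f"{s.lower()}@trade" for s in symbols)
    url = f"wss://stream.binance.com:9443/stream?streams={streams}"
    async with websockets.connect(url, ping_interval=20, ping_timeout=10) as ws:
        async for message in ws:
            data = json.loads(message)
            trade = data['data']  # combined streams wrap the payload: {"stream": ..., "data": ...}
            yield {
                'exchange': 'binance',
                'symbol': trade['s'],
                'trade_id': trade['t'],
                'price': Decimal(trade['p']),  # Decimal keeps the exchange's string precision
                'qty': Decimal(trade['q']),
                'timestamp_ms': trade['T'],
                # m=True: buyer was the maker, so the taker (aggressor) was the seller
                'side': 'sell' if trade['m'] else 'buy',
            }

async def main():
    async for trade in subscribe_binance_trades(["BTCUSDT", "ETHUSDT"]):
        print(trade)

if __name__ == "__main__":
    asyncio.run(main())
```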
The m field (is the buyer the market maker) inverts intuition: m=true means the buyer was the maker, so the aggressor (taker) was the seller and the trade is normalized as 'sell'.
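Symbols need normalizing as well: Binance reports 'BTCUSDT', while the storage schema below expects 'BTC/USDT'. The robust approach is a lookup built from the baseAsset and quoteAsset fields returned by GET /api/v3/exchangeInfo; until that is loaded, a suffix match against known quote assets works as a fallback. A minimal sketch of the fallback, where QUOTE_ASSETS is a hypothetical, deliberately short list you would extend:

```python
# Hypothetical list of quote assets; matched longest-first so longer tickers win.
QUOTE_ASSETS = ("FDUSD", "USDT", "USDC", "BTC", "ETH", "BNB", "EUR", "TRY")

def normalize_binance_symbol(symbol: str, quotes: tuple[str, ...] = QUOTE_ASSETS) -> str:
    """'BTCUSDT' -> 'BTC/USDT'. Raises ValueError if no known quote suffix matches."""
    s = symbol.upper()
    for quote in sorted(quotes, key=len, reverse=True):
        # Require a non-empty base so 'USDT' alone is not split into '/USDT'
        if s.endswith(quote) and len(s) > len(quote):
            return f"{s[:-len(quote)]}/{quote}"
    raise ValueError(f"unknown quote asset in {symbol!r}")
```

Suffix matching is ambiguous for some listings, which is why the exchangeInfo mapping should take precedence whenever it is available.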
Managing multiple exchanges via CCXT:
```python
import asyncio

import ccxt.pro as ccxtpro

async def collect_trades(exchange_id: str, symbols: list[str], queue: asyncio.Queue):
    exchange = getattr(ccxtpro, exchange_id)({
        'enableRateLimit': True,
        'options': {'tradesLimit': 1000},  # size of the per-symbol trade cache
    })
    try:
        while True:
            try:
                trades = await exchange.watch_trades_for_symbols(symbols)
                for trade in trades:
                    await queue.put({
                        'exchange': exchange_id,
                        'symbol': trade['symbol'],  # unified symbol, e.g. 'BTC/USDT'
                        'id': trade['id'],
                        'price': trade['price'],
                        'amount': trade['amount'],
                        'side': trade['side'],
                        'timestamp': trade['timestamp'],  # milliseconds
                    })
            except Exception as e:
                print(f"Error {exchange_id}: {e}, reconnecting...")
                await asyncio.sleep(1)
    finally:
        await exchange.close()
```
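CCXT Pro offers the same unified watch_trades interface across 30+ exchanges and manages the WebSocket connections, including reconnects, internally. Not every exchange implements the multi-symbol variant, so check exchange.has['watchTradesForSymbols'] (and fall back to one watch_trades call per symbol) before relying on it.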
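The fixed asyncio.sleep(1) in collect_trades retries once per second for as long as an exchange is down. A common alternative is capped exponential backoff with "full jitter", so that many collectors do not reconnect in lockstep. A minimal sketch; backoff_delay, base and cap are illustrative names and values, not part of CCXT:

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0,
                  jitter: bool = True) -> float:
    """Seconds to wait before reconnect attempt `attempt` (0-based)."""
    # Exponential growth clamped at `cap`; clamping the exponent avoids float overflow
    delay = min(cap, base * 2 ** min(attempt, 32))
    if jitter:
        # Full jitter: pick uniformly in [0, delay] to spread reconnects out
        return random.uniform(0.0, delay)
    return delay
```

In collect_trades, replace the fixed sleep with await asyncio.sleep(backoff_delay(attempt)), increment attempt on each failure, and reset it to 0 after a successful watch_trades_for_symbols call.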
DEX: On-chain Events
On a DEX, trades are smart contract events (for Uniswap V3, the pool's Swap event). Ways to get them:
The Graph subgraph: pre-indexed data via GraphQL. For Uniswap V3:
```graphql
{
  swaps(
    first: 100
    orderBy: timestamp
    orderDirection: desc
    where: { pool: "0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8" }
  ) {
    id
    timestamp
    amount0
    amount1
    sqrtPriceX96
    tick
    transaction { id }
  }
}
```
Latency: typically 1-5 minutes behind the on-chain event, since the subgraph has to index each block first.
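To store subgraph swaps next to CEX trades, each swap needs the same shape. A minimal sketch, assuming (as in the Uniswap V3 subgraph) that amount0 and amount1 arrive as decimal-adjusted strings signed from the pool's point of view: positive means tokens paid into the pool, negative means tokens paid out. normalize_swap and the 'USDC/WETH' label are hypothetical:

```python
from decimal import Decimal

def normalize_swap(swap: dict, pool_symbol: str = "USDC/WETH") -> dict:
    """Map one `swaps` entry from the query above to the normalized trade shape."""
    amount0 = Decimal(swap["amount0"])
    amount1 = Decimal(swap["amount1"])
    if amount0 == 0:
        raise ValueError(f"swap {swap['id']} has zero amount0")
    return {
        "exchange": "uniswap_v3",
        "symbol": pool_symbol,            # token0/token1, hypothetical label
        "trade_id": swap["id"],
        "price": abs(amount1 / amount0),  # effective price: token1 per token0
        "qty": abs(amount0),
        # The pool paid out token0 (amount0 < 0), so the trader bought token0
        "side": "buy" if amount0 < 0 else "sell",
        "timestamp_ms": int(swap["timestamp"]) * 1000,  # subgraph timestamp is seconds
    }
```

The price here is the swap's effective execution price; the sqrtPriceX96 field instead carries the pool price after the swap, which differs by the price impact.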
Direct RPC monitoring: subscribe to the Uniswap V3 pool's Swap event logs via eth_subscribe("logs", filter):
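```typescript
import { createPublicClient, webSocket, parseAbiItem } from 'viem';
import { mainnet } from 'viem/chains';

// Placeholders: your node's WebSocket endpoint, the pool, and its token decimals
const WS_RPC_URL = 'wss://YOUR-NODE-ENDPOINT';
const UNISWAP_V3_POOL = '0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8'; // USDC/WETH 0.3%
const token0Decimals = 6;  // USDC
const token1Decimals = 18; // WETH

const client = createPublicClient({ chain: mainnet, transport: webSocket(WS_RPC_URL) });

const SWAP_EVENT = parseAbiItem(
  'event Swap(address indexed sender, address indexed recipient, int256 amount0, int256 amount1, uint160 sqrtPriceX96, uint128 liquidity, int24 tick)'
);

// watchEvent takes a single ABI event; over a WebSocket transport it uses eth_subscribe
const unwatch = client.watchEvent({ // call unwatch() to stop
  address: UNISWAP_V3_POOL,
  event: SWAP_EVENT,
  onLogs: (logs) => {
    for (const log of logs) {
      const { amount0, amount1, sqrtPriceX96 } = log.args;
      if (sqrtPriceX96 === undefined) continue;
      const price = sqrtPriceX96ToPrice(sqrtPriceX96, token0Decimals, token1Decimals);
      processSwap({ price, amount0, amount1, txHash: log.transactionHash }); // your own handler
    }
  },
});
```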
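Uniswap V3 stores the price as sqrtPriceX96, the square root of the price in Q64.96 fixed point. Squaring it and dividing by 2^192 gives token1 base units per token0 base unit; multiplying by 10^(d0 - d1) converts that to token1 per whole token0 (for the USDC/WETH pool that is WETH per USDC, so invert it to get the USD price of ETH). Decoding:

```typescript
// Price of 1 token0 in token1 units. The squaring and scaling are exact BigInt math;
// numerator and denominator are converted to floats only for the final division.
function sqrtPriceX96ToPrice(sqrtPriceX96: bigint, d0: number, d1: number): number {
  const numerator = sqrtPriceX96 ** 2n * 10n ** BigInt(d0);
  const denominator = (1n << 192n) * 10n ** BigInt(d1);
  return Number(numerator) / Number(denominator);
}
```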
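A Python collector can do the same decoding with no floating-point step at all by using fractions.Fraction. This is a sketch with hypothetical names, checked against the identity that sqrtPriceX96 = 2^96 encodes a raw price of exactly 1:

```python
from fractions import Fraction

Q96 = 2 ** 96

def sqrt_price_x96_to_price(sqrt_price_x96: int, decimals0: int, decimals1: int) -> Fraction:
    """Exact price of 1 token0 in token1 units from a Uniswap V3 sqrtPriceX96 value."""
    raw = Fraction(sqrt_price_x96 ** 2, Q96 ** 2)         # token1 base units per token0 base unit
    return raw * Fraction(10) ** (decimals0 - decimals1)  # rescale to whole-token units
```

For the USDC/WETH pool (d0 = 6, d1 = 18) the result is WETH per USDC; float(1 / price) gives USDC per WETH.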
Rate Limits and Workarounds
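Binance: REST limits are weight-based and counted per IP (the long-standing figure was 1200 weight per minute; read the current value from rateLimits in GET /api/v3/exchangeInfo). On the WebSocket side, one connection can carry up to 1024 streams, each connection accepts at most 5 incoming messages per second (pings, pongs, subscribe requests), and one IP may open at most 300 connections per 5 minutes. For a large number of pairs, split the streams across several connections.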
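Splitting a large symbol list across connections can be sketched as below. chunk_symbols and fan_in are hypothetical helpers; the default of 200 streams per connection is an assumption chosen to stay well under Binance's per-connection stream cap and keep combined-stream URLs short, not an exchange requirement:

```python
import asyncio
from typing import AsyncIterator

def chunk_symbols(symbols: list[str], max_streams_per_conn: int = 200) -> list[list[str]]:
    """Split symbols into groups; each group gets its own WebSocket connection."""
    if max_streams_per_conn < 1:
        raise ValueError("max_streams_per_conn must be >= 1")
    return [symbols[i:i + max_streams_per_conn]
            for i in range(0, len(symbols), max_streams_per_conn)]

async def fan_in(sources: list[AsyncIterator[dict]], queue: asyncio.Queue) -> None:
    """Drain several async trade generators concurrently into one queue."""
    async def pump(source: AsyncIterator[dict]) -> None:
        async for item in source:
            await queue.put(item)

    # One pump per connection; an exception in any pump propagates to the caller
    await asyncio.gather(*(pump(s) for s in sources))
```

For example: await fan_in([subscribe_binance_trades(g) for g in chunk_symbols(all_symbols)], queue).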
Bybit and OKX have similar limits. Bybit drops a WebSocket connection after about 5 minutes without activity; its documentation recommends sending a ping every 20 seconds to keep the connection alive.
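A sketch of such an application-level heartbeat, assuming Bybit's JSON ping message {"op": "ping"}. Protocol-level ping frames (the websockets ping_interval used earlier) are handled separately by the library and may not count as activity for every exchange. heartbeat is a hypothetical helper that accepts any async send function, such as a websockets connection's send:

```python
import asyncio
import json
from typing import Awaitable, Callable

BYBIT_PING = json.dumps({"op": "ping"})  # assumed message format, per Bybit's docs

async def heartbeat(send: Callable[[str], Awaitable[None]],
                    interval: float = 20.0, message: str = BYBIT_PING) -> None:
    """Send an application-level ping every `interval` seconds until cancelled."""
    while True:
        await asyncio.sleep(interval)
        await send(message)
```

Start it with asyncio.create_task(heartbeat(ws.send)) right after connecting and cancel the task when the connection closes.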
IP rotation through proxies helps with REST limits but not with WebSocket, where connections are long-lived. For high-frequency collection from multiple exchanges, run collectors on several VPS instances in different datacenters.
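However requests are spread across IPs, each REST client still has to stay under its weight budget. A token bucket is a common way to pace requests. Below is a minimal asyncio sketch, where WeightLimiter is a hypothetical class and the capacity and per-request weights are values you would take from exchangeInfo and the endpoint documentation:

```python
import asyncio
import time

class WeightLimiter:
    """Token bucket: `capacity` weight units, refilled continuously over `period` seconds."""

    def __init__(self, capacity: float, period: float = 60.0):
        self.capacity = capacity
        self.rate = capacity / period   # units added per second
        self.tokens = capacity          # start with a full bucket
        self.updated = time.monotonic()
        self._lock = asyncio.Lock()     # one waiter refills/consumes at a time

    def _refill(self) -> None:
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now

    async def acquire(self, weight: float = 1.0) -> None:
        """Wait until `weight` units are available, then consume them."""
        if weight > self.capacity:
            raise ValueError("weight exceeds bucket capacity")
        async with self._lock:
            while True:
                self._refill()
                if self.tokens >= weight:
                    self.tokens -= weight
                    return
                # Sleep just long enough for the missing units to refill
                await asyncio.sleep((weight - self.tokens) / self.rate)
```

Call await limiter.acquire(weight) before each request; create the limiter inside running async code and share one instance per IP.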
Storage and Normalization
Normalized schema for cross-exchange trades:
```sql
CREATE TABLE trades (
    id          BIGSERIAL,
    exchange    VARCHAR(50)     NOT NULL,
    symbol      VARCHAR(30)     NOT NULL,  -- normalized: 'BTC/USDT'
    trade_id    VARCHAR(100)    NOT NULL,  -- original ID from the exchange
    price       NUMERIC(30, 10) NOT NULL,
    quantity    NUMERIC(30, 10) NOT NULL,
    side        VARCHAR(4)      NOT NULL CHECK (side IN ('buy', 'sell')),
    ts          TIMESTAMPTZ     NOT NULL,
    received_at TIMESTAMPTZ     DEFAULT NOW(),
    -- Unique constraints on a partitioned table must include the partition key (ts)
    PRIMARY KEY (id, ts),
    UNIQUE (exchange, symbol, trade_id, ts)
) PARTITION BY RANGE (ts);

-- Monthly partition; switch to daily partitions for high-volume feeds
CREATE TABLE trades_2024_11 PARTITION OF trades
    FOR VALUES FROM ('2024-11-01') TO ('2024-12-01');

CREATE INDEX ON trades (exchange, symbol, ts DESC);
CREATE INDEX ON trades (symbol, ts DESC);
```
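TimescaleDB does this automatically: a hypertable (created with create_hypertable) partitions data into time chunks, and the time_bucket() function turns OHLCV aggregation into a simple GROUP BY.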
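What a time_bucket() OHLCV query computes (open and close as the first and last trade in each bucket, as TimescaleDB's first()/last() aggregates return; high, low, and summed volume) can be sketched in plain Python, which is handy for checking query results or building bars inside the collector. ohlcv is a hypothetical helper that works on normalized trade dicts with timestamp_ms, price and qty:

```python
def ohlcv(trades: list[dict], bucket_ms: int) -> list[dict]:
    """Aggregate normalized trades into epoch-aligned OHLCV bars of width bucket_ms."""
    bars: dict[int, dict] = {}
    for t in sorted(trades, key=lambda t: t["timestamp_ms"]):  # open/close need time order
        start = t["timestamp_ms"] - t["timestamp_ms"] % bucket_ms  # bucket start
        price, qty = t["price"], t["qty"]
        bar = bars.get(start)
        if bar is None:
            bars[start] = {"start_ms": start, "open": price, "high": price,
                           "low": price, "close": price, "volume": qty}
        else:
            bar["high"] = max(bar["high"], price)
            bar["low"] = min(bar["low"], price)
            bar["close"] = price  # latest trade seen so far in this bucket
            bar["volume"] += qty
    return [bars[k] for k in sorted(bars)]
```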
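Deduplication. After a WebSocket reconnect, exchanges often resend the last N trades. A unique constraint on (exchange, symbol, trade_id), combined with INSERT ... ON CONFLICT DO NOTHING, prevents duplicates. Symbol belongs in the key because trade IDs on exchanges such as Binance are sequential per symbol, not unique across the exchange; on a partitioned table the constraint must also include the partition key ts.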
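The database constraint catches duplicates at write time; a collector can also catch them earlier and detect the opposite problem, missed trades. A minimal sketch that assumes trade IDs increase by exactly 1 per trade within a symbol (the case for Binance's per-symbol trade IDs; exchanges with UUID-style IDs need a different scheme). TradeSequenceTracker is a hypothetical name:

```python
from typing import Optional

class TradeSequenceTracker:
    """Per-(exchange, symbol) dedupe and gap detection for sequential integer trade IDs."""

    def __init__(self) -> None:
        self.last_id: dict[tuple[str, str], int] = {}

    def check(self, exchange: str, symbol: str, trade_id: int) -> tuple[bool, Optional[range]]:
        """Return (is_new, missing_ids); missing_ids is None when there is no gap."""
        key = (exchange, symbol)
        last = self.last_id.get(key)
        if last is not None and trade_id <= last:
            return False, None  # already seen, e.g. replayed after a reconnect
        self.last_id[key] = trade_id
        if last is not None and trade_id > last + 1:
            return True, range(last + 1, trade_id)  # IDs never received: backfill these
        return True, None
```

A reported range can be backfilled over REST (for Binance, for example, the historicalTrades endpoint with fromId), and the unique constraint absorbs any overlap. Trades that arrive out of order after a gap are treated as duplicates here; the backfill covers them.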