WebSocket market data aggregator development

A WebSocket aggregator is a service that maintains persistent connections to exchanges and delivers normalized market data to downstream consumers: trading bots, dashboards, and risk management systems. Its key quality parameters are exchange-to-consumer latency and resilience to connection drops.
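
To make "normalized" concrete, here is a minimal sketch of mapping one exchange-specific message to a common record. The `Trade` dataclass and `normalize_binance_trade` are hypothetical names chosen for illustration. The input fields (`s`, `p`, `q`, `T`, `m`) follow the payload of Binance's public spot trade stream; every other exchange would need its own mapping function.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class Trade:
    """Exchange-independent trade record (hypothetical schema)."""
    exchange: str
    symbol: str
    price: Decimal
    quantity: Decimal
    side: str          # "buy" or "sell", from the taker's point of view
    timestamp_ms: int  # exchange-side trade time in milliseconds


def normalize_binance_trade(msg: dict) -> Trade:
    """Map a Binance spot `trade` stream payload to the common Trade record."""
    return Trade(
        exchange="binance",
        symbol=msg["s"],
        price=Decimal(msg["p"]),      # prices arrive as strings; Decimal keeps them exact
        quantity=Decimal(msg["q"]),
        # m == True means the buyer was the maker, so the taker sold
        side="sell" if msg["m"] else "buy",
        timestamp_ms=msg["T"],
    )


raw = {"e": "trade", "E": 1672515782136, "s": "BNBBTC", "t": 12345,
       "p": "0.001", "q": "100", "T": 1672515782136, "m": True}
trade = normalize_binance_trade(raw)
print(trade)
```

Consumers then depend only on the `Trade` shape and never on an exchange's wire format.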

Connection Topology

Each exchange imposes its own WebSocket limits. The values below change over time, so check them against each exchange's current API documentation:

Exchange   Max streams / conn   Ping interval   Max connections
Binance    1024                 3 min           Unlimited
Bybit      10 topics / conn     20 sec          Unlimited
OKX        240 channels / conn  30 sec          Unlimited
Kraken     Not documented       Adaptive        Unlimited

The aggregator creates as many connections as needed to cover all subscriptions, distributing symbols across connections according to limits.
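
The distribution step can be sketched as simple chunking: with N subscriptions and a per-connection limit L, the aggregator needs ceil(N / L) connections. The helper name `plan_connections` and the channel names are hypothetical; a real planner might also group related symbols together or balance by message rate.

```python
def plan_connections(channels: list[str], max_per_conn: int) -> list[list[str]]:
    """Split channels into consecutive groups of at most max_per_conn items."""
    if max_per_conn < 1:
        raise ValueError("max_per_conn must be at least 1")
    return [channels[i:i + max_per_conn]
            for i in range(0, len(channels), max_per_conn)]


# 2500 subscriptions against a 1024-streams-per-connection limit
channels = [f"sym{i}@trade" for i in range(2500)]
plan = plan_connections(channels, max_per_conn=1024)
print([len(group) for group in plan])  # [1024, 1024, 452]
```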

Connection Manager

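from __future__ import annotations  # annotations may name WSConnection before it is defined

import asyncio
from typing import Awaitable, Callable

# WSConnection is the exchange-specific connection wrapper, assumed to be defined elsewhere.


class ConnectionManager:
    def __init__(self, on_message: Callable[[dict], Awaitable[None]], max_per_conn: int = 900):
        self.on_message = on_message      # invoked for every inbound message (signature assumed)
        self.connections: list[WSConnection] = []
        self.max_per_conn = max_per_conn  # keep headroom below the exchange's per-connection limit
        self.subscriptions: dict[str, WSConnection] = {}

    async def subscribe(self, channels: list[str]):
        for channel in channels:
            if channel in self.subscriptions:
                continue  # already subscribed; avoid duplicate streams
            conn = self._find_or_create_connection()
            await conn.subscribe(channel)
            self.subscriptions[channel] = conn

    def _find_or_create_connection(self) -> WSConnection:
        # First fit: reuse the first connection that still has capacity
        for conn in self.connections:
            if conn.subscription_count < self.max_per_conn:
                return conn
        new_conn = WSConnection(self.on_message, self.on_disconnect)
        self.connections.append(new_conn)
        return new_conn

    async def on_disconnect(self, conn: WSConnection):
        # Retry with exponential backoff until the connection is restored,
        # then replay its subscriptions
        while True:
            await asyncio.sleep(conn.backoff.next())
            try:
                await conn.reconnect()
                await conn.resubscribe()
                return
            except OSError:
                continue  # network still down; wait longer and try again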
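
The `conn.backoff.next()` call above presupposes a backoff object that the source does not show. One possible shape, as a sketch only, is exponential backoff with full jitter: the delay ceiling doubles on each attempt up to a cap, and the actual delay is drawn uniformly below that ceiling so that many connections dropped at once do not reconnect in lockstep. The class name, its defaults, and the `reset()` method are assumptions.

```python
import random
from typing import Optional


class Backoff:
    """Exponential backoff with full jitter (hypothetical helper)."""

    def __init__(self, base: float = 1.0, cap: float = 60.0,
                 rng: Optional[random.Random] = None):
        self.base = base        # ceiling for the first delay, in seconds
        self.cap = cap          # upper bound for any delay, in seconds
        self.attempt = 0
        self._rng = rng or random.Random()

    def next(self) -> float:
        """Return the next delay in seconds and advance the attempt counter."""
        # Clamp the exponent so very long outages cannot overflow a float
        ceiling = min(self.cap, self.base * 2 ** min(self.attempt, 32))
        self.attempt += 1
        return self._rng.uniform(0, ceiling)

    def reset(self) -> None:
        """Call after a successful reconnect so the next outage starts small."""
        self.attempt = 0


backoff = Backoff(base=1.0, cap=8.0)
delays = [backoff.next() for _ in range(6)]  # ceilings: 1, 2, 4, 8, 8, 8 seconds
backoff.reset()
```

Without a reset after a successful reconnect, a connection that has failed several times in the past would keep waiting close to the cap on every later disconnect.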

Heartbeat and Stale Connection Detection

An exchange may go silent without an explicit disconnect: the TCP connection stays alive, but no data arrives. A watchdog timer on each connection detects this case:

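import asyncio
import logging
import time

logger = logging.getLogger(__name__)


class HeartbeatMonitor:
    STALE_THRESHOLD_SEC = 30  # tune per exchange and per channel activity
    CHECK_INTERVAL_SEC = 5

    async def watch(self, conn: "WSConnection"):
        while True:
            await asyncio.sleep(self.CHECK_INTERVAL_SEC)
            age = time.time() - conn.last_message_time
            if age > self.STALE_THRESHOLD_SEC:
                logger.warning("Stale connection: no data for %.1f s, forcing reconnect", age)
                await conn.force_reconnect()
                conn.last_message_time = time.time()  # restart the timer to avoid a reconnect loop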
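
The watchdog above measures age with `time.time()`, which can jump when the system clock is adjusted. A possible refinement, sketched here under the assumption that the connection records its own timestamps, is to track the last message on a monotonic clock. `StaleDetector` is a hypothetical helper; the injectable `clock` parameter exists only to make the behavior easy to demonstrate.

```python
import time
from typing import Callable


class StaleDetector:
    """Tracks the time of the last inbound frame on a monotonic clock."""

    def __init__(self, threshold_sec: float,
                 clock: Callable[[], float] = time.monotonic):
        self.threshold_sec = threshold_sec
        self._clock = clock
        self._last_seen = clock()

    def touch(self) -> None:
        """Call on every inbound frame, including pongs."""
        self._last_seen = self._clock()

    def is_stale(self) -> bool:
        """True when nothing has arrived for longer than the threshold."""
        return self._clock() - self._last_seen > self.threshold_sec


# Demonstration with a fake clock instead of real waiting
now = [0.0]
detector = StaleDetector(threshold_sec=30, clock=lambda: now[0])
now[0] = 29.0
fresh = detector.is_stale()        # still within the threshold
now[0] = 31.0
stale = detector.is_stale()        # threshold exceeded
detector.touch()
after_touch = detector.is_stale()  # timer restarted by the new frame
```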

Data Publishing to Consumers

The aggregator publishes normalized data over one or more transports, depending on requirements:

Redis Pub/Sub: real-time distribution with minimal latency. Delivery is fire-and-forget, so it suits cases where data doesn't need to be persisted and a disconnected consumer can afford to miss updates.

Redis Streams: reliable delivery with the ability to read missed messages (consumer groups, persistent log).

Kafka: high-load systems that need durable, replayable delivery. Partitioning by symbol keeps all updates for one symbol in order.

gRPC streaming: direct client-to-aggregator connections with low latency.
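
Partitioning by symbol, mentioned for Kafka above, means every message for a given symbol lands in the same partition, which preserves per-symbol ordering. The sketch below illustrates the idea with a stable CRC32 hash. This is not Kafka's own algorithm (the Java client's default partitioner hashes the message key with murmur2), and `partition_for` is a hypothetical name. With a real Kafka client it is normally enough to pass the symbol as the message key.

```python
import zlib


def partition_for(symbol: str, num_partitions: int) -> int:
    """Stable symbol -> partition mapping (illustrative; not Kafka's own hash)."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    # zlib.crc32 is deterministic across processes, unlike the built-in hash(),
    # which is randomized per process for strings
    return zlib.crc32(symbol.encode("utf-8")) % num_partitions


symbols = ["BTCUSDT", "ETHUSDT", "BTCUSDT", "SOLUSDT", "BTCUSDT"]
assignments = [(s, partition_for(s, 12)) for s in symbols]
print(assignments)
```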

Performance Metrics

The aggregator should export Prometheus metrics for monitoring:

  • ws_messages_received_total{exchange, channel} — counter of incoming messages
  • ws_message_latency_ms{exchange} — delay from exchange timestamp to receipt
  • ws_reconnects_total{exchange} — number of reconnections
  • ws_active_connections{exchange} — current number of connections
  • ws_subscription_count{exchange} — number of active subscriptions
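
To show what such a metric looks like when scraped, here is a dependency-free sketch that renders one labelled counter in the Prometheus text exposition format. The `Counter` class is a simplified stand-in written for this example; a production aggregator would more likely use an official Prometheus client library than hand-rolled rendering.

```python
from collections import defaultdict


class Counter:
    """Tiny labelled counter that renders Prometheus text format (stand-in)."""

    def __init__(self, name: str, help_text: str, label_names: tuple[str, ...]):
        self.name = name
        self.help_text = help_text
        self.label_names = label_names
        self._values: dict[tuple[str, ...], float] = defaultdict(float)

    def inc(self, *label_values: str, amount: float = 1.0) -> None:
        """Increase the counter for one combination of label values."""
        if len(label_values) != len(self.label_names):
            raise ValueError("label count mismatch")
        self._values[label_values] += amount

    def render(self) -> str:
        """Return HELP/TYPE headers plus one sample line per label combination."""
        lines = [f"# HELP {self.name} {self.help_text}",
                 f"# TYPE {self.name} counter"]
        for values, count in sorted(self._values.items()):
            labels = ",".join(f'{k}="{v}"' for k, v in zip(self.label_names, values))
            lines.append(f"{self.name}{{{labels}}} {count:g}")
        return "\n".join(lines) + "\n"


received = Counter("ws_messages_received_total",
                   "Incoming WebSocket messages", ("exchange", "channel"))
received.inc("binance", "trade")
received.inc("binance", "trade")
received.inc("okx", "books")
text = received.render()
print(text)
```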

A well-implemented Python (asyncio) aggregator processes roughly 50,000–100,000 messages per second on one core. Go or Rust implementations typically handle about an order of magnitude more.