Разработка WebSocket-агрегатора рыночных данных
WebSocket-агрегатор — это сервис, который поддерживает постоянные соединения с биржами и транслирует нормализованные рыночные данные downstream-потребителям: торговым ботам, дашбордам, системам риск-менеджмента. Ключевой параметр качества — latency от биржи до потребителя и устойчивость к разрывам соединений.
Топология соединений
Каждая биржа имеет собственные ограничения на WebSocket:
| Биржа | Max streams / conn | Ping interval | Max connections |
|---|---|---|---|
| Binance | 1024 | 3 мин | Не ограничено |
| Bybit | 10 тем / conn | 20 сек | Не ограничено |
| OKX | 240 каналов / conn | 30 сек | Не ограничено |
| Kraken | Не задокументировано | Adaptive | Не ограничено |
Агрегатор создаёт столько соединений, сколько нужно для охвата всех подписок, распределяя символы по соединениям с учётом лимитов.
Connection Manager
class ConnectionManager:
def __init__(self, max_per_conn: int = 900):
self.connections: list[WSConnection] = []
self.max_per_conn = max_per_conn
self.subscriptions: dict[str, WSConnection] = {}
async def subscribe(self, channels: list[str]):
for channel in channels:
conn = self._find_or_create_connection()
await conn.subscribe(channel)
self.subscriptions[channel] = conn
def _find_or_create_connection(self) -> WSConnection:
for conn in self.connections:
if conn.subscription_count < self.max_per_conn:
return conn
new_conn = WSConnection(self.on_message, self.on_disconnect)
self.connections.append(new_conn)
return new_conn
async def on_disconnect(self, conn: WSConnection):
# Экспоненциальный backoff и переподписка
await asyncio.sleep(conn.backoff.next())
await conn.reconnect()
await conn.resubscribe()
Heartbeat и обнаружение stale-соединений
Биржи могут "замолчать" без явного разрыва — TCP-соединение живо, но данных нет. Watchdog-таймер для каждого соединения:
class HeartbeatMonitor:
STALE_THRESHOLD_SEC = 30
async def watch(self, conn: WSConnection):
while True:
await asyncio.sleep(5)
age = time.time() - conn.last_message_time
if age > self.STALE_THRESHOLD_SEC:
logger.warning(f"Stale connection detected, forcing reconnect")
await conn.force_reconnect()
Публикация данных потребителям
Агрегатор публикует нормализованные данные через несколько каналов в зависимости от потребностей:
Redis Pub/Sub — для real-time рассылки с минимальной latency. Подходит когда данные не нужно сохранять.
Redis Streams — для надёжной доставки с возможностью чтения пропущенных сообщений (consumer groups, persistent log).
Kafka — для высоконагруженных систем с требованием гарантированной доставки и партиционирования по символу.
gRPC streaming — для прямых соединений клиент-агрегатор с low latency.
Метрики производительности
Агрегатор должен экспортировать Prometheus-метрики для мониторинга:
-
ws_messages_received_total{exchange, channel}— счётчик входящих сообщений -
ws_message_latency_ms{exchange}— задержка от биржевого timestamp до получения -
ws_reconnects_total{exchange}— количество переподключений -
ws_active_connections{exchange}— текущее количество соединений -
ws_subscription_count{exchange}— количество активных подписок
При правильной реализации на Python (asyncio) агрегатор обрабатывает 50,000–100,000 сообщений в секунду на одном ядре. На Go или Rust — на порядок больше.







