Розробка WebSocket-агрегатора ринкових даних
WebSocket-агрегатор — це сервіс, який підтримує постійні з'єднання з біржами та транслює нормалізовані ринкові дані downstream-споживачам: торговельним ботам, дашбордам, системам риск-менеджменту. Ключовий параметр якості — latency від біржі до споживача та стійкість до розривів з'єднань.
Топологія з'єднань
Кожна біржа має власні обмеження на WebSocket:
| Біржа | Max streams / conn | Ping interval | Max connections |
|---|---|---|---|
| Binance | 1024 | 3 хв | Не обмежено |
| Bybit | 10 тем / conn | 20 сек | Не обмежено |
| OKX | 240 каналів / conn | 30 сек | Не обмежено |
| Kraken | Не задокументовано | Adaptive | Не обмежено |
Агрегатор створює стільки з'єднань, скільки потрібно для охоплення всіх підписок, розподіляючи символи по з'єднанням з урахуванням лімітів.
Connection Manager
class ConnectionManager:
def __init__(self, max_per_conn: int = 900):
self.connections: list[WSConnection] = []
self.max_per_conn = max_per_conn
self.subscriptions: dict[str, WSConnection] = {}
async def subscribe(self, channels: list[str]):
for channel in channels:
conn = self._find_or_create_connection()
await conn.subscribe(channel)
self.subscriptions[channel] = conn
def _find_or_create_connection(self) -> WSConnection:
for conn in self.connections:
if conn.subscription_count < self.max_per_conn:
return conn
new_conn = WSConnection(self.on_message, self.on_disconnect)
self.connections.append(new_conn)
return new_conn
async def on_disconnect(self, conn: WSConnection):
# Exponential backoff та переписка
await asyncio.sleep(conn.backoff.next())
await conn.reconnect()
await conn.resubscribe()
Heartbeat та виявлення stale-з'єднань
Біржи можуть "замовкнути" без явного розриву — TCP-з'єднання живо, але даних немає. Watchdog-таймер для кожного з'єднання:
class HeartbeatMonitor:
STALE_THRESHOLD_SEC = 30
async def watch(self, conn: WSConnection):
while True:
await asyncio.sleep(5)
age = time.time() - conn.last_message_time
if age > self.STALE_THRESHOLD_SEC:
logger.warning(f"Stale connection detected, forcing reconnect")
await conn.force_reconnect()
Публікація даних спожива чам
Агрегатор публікує нормалізовані дані через кілька каналів залежно від потреб:
Redis Pub/Sub — для real-time розсилки з мінімальною latency. Підходить коли дані не потрібно зберігати.
Redis Streams — для надійної доставки з можливістю читання пропущених повідомлень (consumer groups, persistent log).
Kafka — для високонавантажених систем з вимогою гарантованої доставки та партиціонування за символом.
gRPC streaming — для прямих з'єднань клієнт-агрегатор з низькою latency.
Метрики продуктивності
Агрегатор повинен експортувати Prometheus-метрики для моніторингу:
-
ws_messages_received_total{exchange, channel}— лічильник вхідних повідомлень -
ws_message_latency_ms{exchange}— затримка від біржевого timestamp до отримання -
ws_reconnects_total{exchange}— кількість переподключень -
ws_active_connections{exchange}— поточна кількість з'єднань -
ws_subscription_count{exchange}— кількість активних підписок
При правильній реалізації на Python (asyncio) агрегатор обробляє 50,000–100,000 повідомлень в секунду на одному ядрі. На Go або Rust — на порядок більше.







