Розробка WebSocket-агрегатора ринкових даних

Проєктуємо та розробляємо блокчейн-рішення повного циклу: від архітектури смарт-контрактів до запуску DeFi-протоколів, NFT-маркетплейсів та криптобірж. Аудит безпеки, токеноміка, інтеграція з наявною інфраструктурою.
Показано 1 з 1Усі 1306 послуг
Розробка WebSocket-агрегатора ринкових даних
Середній
~5 днів
Часті запитання

Напрямки блокчейн-розробки

Етапи блокчейн-розробки

Останні роботи

  • image_website-b2b-advance_0.webp
    Розробка сайту компанії B2B ADVANCE
    1288
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    902
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1122
  • image_logo-advance_0.webp
    Розробка логотипу компанії B2B Advance
    589
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    860

Розробка WebSocket-агрегатора ринкових даних

WebSocket-агрегатор — це сервіс, який підтримує постійні з'єднання з біржами та транслює нормалізовані ринкові дані downstream-споживачам: торговельним ботам, дашбордам, системам риск-менеджменту. Ключовий параметр якості — latency від біржі до споживача та стійкість до розривів з'єднань.

Топологія з'єднань

Кожна біржа має власні обмеження на WebSocket:

Біржа Max streams / conn Ping interval Max connections
Binance 1024 3 хв Не обмежено
Bybit 10 тем / conn 20 сек Не обмежено
OKX 240 каналів / conn 30 сек Не обмежено
Kraken Не задокументовано Adaptive Не обмежено

Агрегатор створює стільки з'єднань, скільки потрібно для охоплення всіх підписок, розподіляючи символи по з'єднанням з урахуванням лімітів.

Connection Manager

class ConnectionManager:
    def __init__(self, max_per_conn: int = 900):
        self.connections: list[WSConnection] = []
        self.max_per_conn = max_per_conn
        self.subscriptions: dict[str, WSConnection] = {}

    async def subscribe(self, channels: list[str]):
        for channel in channels:
            conn = self._find_or_create_connection()
            await conn.subscribe(channel)
            self.subscriptions[channel] = conn

    def _find_or_create_connection(self) -> WSConnection:
        for conn in self.connections:
            if conn.subscription_count < self.max_per_conn:
                return conn
        new_conn = WSConnection(self.on_message, self.on_disconnect)
        self.connections.append(new_conn)
        return new_conn

    async def on_disconnect(self, conn: WSConnection):
        # Exponential backoff та переписка
        await asyncio.sleep(conn.backoff.next())
        await conn.reconnect()
        await conn.resubscribe()

Heartbeat та виявлення stale-з'єднань

Біржи можуть "замовкнути" без явного розриву — TCP-з'єднання живо, але даних немає. Watchdog-таймер для кожного з'єднання:

class HeartbeatMonitor:
    STALE_THRESHOLD_SEC = 30

    async def watch(self, conn: WSConnection):
        while True:
            await asyncio.sleep(5)
            age = time.time() - conn.last_message_time
            if age > self.STALE_THRESHOLD_SEC:
                logger.warning(f"Stale connection detected, forcing reconnect")
                await conn.force_reconnect()

Публікація даних спожива чам

Агрегатор публікує нормалізовані дані через кілька каналів залежно від потреб:

Redis Pub/Sub — для real-time розсилки з мінімальною latency. Підходить коли дані не потрібно зберігати.

Redis Streams — для надійної доставки з можливістю читання пропущених повідомлень (consumer groups, persistent log).

Kafka — для високонавантажених систем з вимогою гарантованої доставки та партиціонування за символом.

gRPC streaming — для прямих з'єднань клієнт-агрегатор з низькою latency.

Метрики продуктивності

Агрегатор повинен експортувати Prometheus-метрики для моніторингу:

  • ws_messages_received_total{exchange, channel} — лічильник вхідних повідомлень
  • ws_message_latency_ms{exchange} — затримка від біржевого timestamp до отримання
  • ws_reconnects_total{exchange} — кількість переподключень
  • ws_active_connections{exchange} — поточна кількість з'єднань
  • ws_subscription_count{exchange} — кількість активних підписок

При правильній реалізації на Python (asyncio) агрегатор обробляє 50,000–100,000 повідомлень в секунду на одному ядрі. На Go або Rust — на порядок більше.