Розробка системи агрегації даних з кількох бірж
Торговельні системи, що працюють одночасно на кількох біржах, стикаються з фундаментальною проблемою: кожна біржа має свій WebSocket API, свій формат даних, свої обмеження з частоти запитів і свої особливості в поведінці. Агрегатор перетворює цей зоопарк у єдиний нормалізований потік.
Архітектура агрегатора
Система будується за принципом fan-in: множина джерел даних збирається в єдиний нормалізований потік.
Exchange Connectors — окремий модуль для кожної біржі. Відповідає за встановлення WebSocket-з'єднання, підписку на потрібні канали, обробку reconnects та помилок, парсинг raw-формату біржі в нормалізований.
Normalization Layer — перетворює форми специфічні для бірж в єдину схему. Binance називає поле best bid b, Kraken — b теж, але з іншою семантикою. OKX використовує наносекунди для timestamp, Bitfinex — мілісекунди.
Distribution Layer — публікує нормалізовані події в шину (Redis Streams, Kafka) для downstream-споживачів.
Нормалізований формат
Універсальна схема ticker-події:
{
"exchange": "binance",
"symbol": "BTC/USDT",
"timestamp": 1704067200000,
"received_at": 1704067200045,
"bid": 43250.50,
"ask": 43251.00,
"last": 43250.75,
"volume_24h": 28450.123,
"open_24h": 42800.00
}
Поле received_at — час отримання даних агрегатором, відмінне від біржевого timestamp. Різниця між ними — мережева затримка до біржі, корисна метрика для моніторингу.
Робота з rate limits
Кожна біржа обмежує кількість запитів. WebSocket-підключення зазвичай не обмежені за кількістю повідомлень, але є ліміти на кількість підписок в одному з'єднанні (Binance: 1024 потоків на з'єднання) і швидкість відправлення команд підписки.
Правильний коннектор управляє чергою підписок з урахуванням цих обмежень:
class ExchangeConnector:
MAX_SUBSCRIPTIONS_PER_CONN = 1000
SUBSCRIPTION_RATE_LIMIT = 10 # per second
async def subscribe_symbols(self, symbols: list[str]):
# Розбиваємо на чанки за розміром з'єднання
for chunk in chunks(symbols, self.MAX_SUBSCRIPTIONS_PER_CONN):
conn = await self.create_connection()
# Rate-limit підписи
async with self.rate_limiter:
await conn.subscribe(chunk)
Reconnection та data gaps
WebSocket-з'єднання розриваються. Біржі іноді посилають "ping" і очікують "pong" в межах строго визначеного часу (Binance: 10 хвилин без pong = disconnect). Правильний коннектор:
- Автоматично відповідає на ping-frames
- Відслідковує час останнього повідомлення (heartbeat check)
- При розриві — exponential backoff reconnect з jitter
- При відновленні — переписується на всі символи
- Публікує подію
GAP_DETECTEDз часовим діапазоном відсутніх даних
Downstream-споживачі повинні корректно обробляти GAP-події, особливо якщо використовують вальні агрегації.
Latency та синхронізація часу
При порівнянні цін на різних біржах критично важлива синхронізація часу. Системний час сервера повинен синхронізуватися через NTP з точністю до 1–5ms. Більшість cloud-провайдерів забезпечують точний NTP, але це потрібно перевірити.
Біржі мають різну мережеву затримку — від 1ms (co-location) до 50–100ms для звичайного сервера. Для арбітражних стратегій важливо враховувати цю затримку.
Моніторинг якості даних
| Метрика | Описання |
|---|---|
| Message rate | Повідомлення в секунду за біржу/символ |
| Latency (p50/p99) | Затримка від біржі до агрегатора |
| Gap rate | Кількість розривів у даних на годину |
| Reconnect count | Частота переподключень |
| Stale data alerts | Символи без оновлень > X секунд |
Prometheus + Grafana — стандартний стек для цього моніторингу.
Бібліотеки та готові рішення
CCXT Pro — WebSocket розширення CCXT з підтримкою 50+ бірж. Хороша відправна точка для прототипу, але для production часто потрібні кастомні коннектори через продуктивність і специфічні вимоги.
cryptofeed (Python) — спеціалізована бібліотека для крипто-feeds з підтримкою 30+ бірж, нормалізацією даних та бендами для Kafka, Redis, RabbitMQ, PostgreSQL.
Для high-performance систем (< 1ms latency) пишемо коннектори на Rust або Go з нуля.







