Разработка системы нормализации данных с бирж
Каждая криптобиржа — отдельная вселенная с собственными соглашениями об именовании, форматах чисел, единицах времени и семантике полей. BTC/USDT на Binance называется BTCUSDT, на Kraken — XBT/USDT, на Bitfinex — tBTCUST. Нормализация — это слой, который прячет эту несовместимость за единым интерфейсом.
Что нужно нормализовать
Символы и пары. Каждая биржа имеет собственные соглашения. Нормализованный формат — BASE/QUOTE в верхнем регистре: BTC/USDT, ETH/BTC. Биржевые символы хранятся в маппинге с возможностью обратного преобразования.
Timestamps. Binance возвращает миллисекунды, некоторые биржи — секунды, OKX — наносекунды. Нормализованный формат — миллисекунды UTC, хранимые как int64.
Числа. REST API часто возвращает числа как строки ("43250.50"), некоторые биржи теряют trailing zeros. Нормализованный формат — Decimal с явной точностью, зависящей от инструмента.
Стороны ордера. BUY/SELL, buy/sell, b/s, 1/-1 — всё это встречается. Нормализованный формат — enum BUY | SELL.
Статусы ордеров. У каждой биржи свои статусы. Нормализованный маппинг:
| Биржа | Raw | Normalized |
|---|---|---|
| Binance | NEW, PARTIALLY_FILLED, FILLED, CANCELED |
OPEN, PARTIAL, FILLED, CANCELLED |
| Bybit | Created, New, PartiallyFilled, Filled |
OPEN, OPEN, PARTIAL, FILLED |
| OKX | live, partially_filled, filled, canceled |
OPEN, PARTIAL, FILLED, CANCELLED |
Архитектура нормализатора
Нормализатор реализуется как набор биржеспецифичных адаптеров с общим интерфейсом:
from abc import ABC, abstractmethod
from decimal import Decimal
class ExchangeNormalizer(ABC):
@abstractmethod
def normalize_symbol(self, raw_symbol: str) -> str:
"""Преобразует биржевой символ в нормализованный формат BASE/QUOTE"""
@abstractmethod
def normalize_ticker(self, raw_data: dict) -> NormalizedTicker:
"""Нормализует ticker данные"""
@abstractmethod
def normalize_order(self, raw_data: dict) -> NormalizedOrder:
"""Нормализует данные ордера"""
class BinanceNormalizer(ExchangeNormalizer):
SYMBOL_MAP = {
"BTCUSDT": "BTC/USDT",
"ETHUSDT": "ETH/USDT",
# ... из API /api/v3/exchangeInfo
}
def normalize_ticker(self, raw: dict) -> NormalizedTicker:
return NormalizedTicker(
exchange="binance",
symbol=self.normalize_symbol(raw["s"]),
timestamp=int(raw["T"]),
price=Decimal(raw["c"]),
volume_24h=Decimal(raw["v"]),
)
Динамическая загрузка маппинга символов
Жёсткий маппинг символов в коде — плохая идея: биржи добавляют новые пары ежедневно. Правильный подход — загружать маппинг из Exchange Info API при старте и периодически обновлять:
async def load_symbol_map(self):
exchange_info = await self.rest_client.get("/api/v3/exchangeInfo")
self.symbol_map = {
s["symbol"]: f"{s['baseAsset']}/{s['quoteAsset']}"
for s in exchange_info["symbols"]
if s["status"] == "TRADING"
}
# Инвертированный маппинг для обратного преобразования
self.reverse_map = {v: k for k, v in self.symbol_map.items()}
Валидация нормализованных данных
После нормализации важно валидировать результат. Отрицательные цены, нулевые объёмы, timestamp в будущем — всё это признаки проблем с источником данных:
def validate_ticker(ticker: NormalizedTicker) -> list[str]:
errors = []
if ticker.price <= 0:
errors.append(f"Invalid price: {ticker.price}")
if ticker.timestamp > now_ms() + 5000:
errors.append(f"Future timestamp: {ticker.timestamp}")
if ticker.bid and ticker.ask and ticker.bid >= ticker.ask:
errors.append(f"Crossed book: bid={ticker.bid} ask={ticker.ask}")
return errors
Невалидные данные логируются и отбрасываются, не попадая в downstream-системы.
Тестирование нормализатора
Unit-тесты с реальными примерами raw-данных от каждой биржи — обязательны. Биржи иногда меняют формат API без предупреждения. Набор фиксированных fixtures с ожидаемыми нормализованными результатами позволяет быстро обнаружить регрессию:
def test_binance_normalizer():
raw = {"s": "BTCUSDT", "c": "43250.50", "v": "28450.12", "T": 1704067200000}
result = BinanceNormalizer().normalize_ticker(raw)
assert result.symbol == "BTC/USDT"
assert result.price == Decimal("43250.50")
assert result.exchange == "binance"
Дополнительно — integration тесты с live API биржей в sandbox-режиме, запускаемые ежедневно в CI для раннего обнаружения изменений в API.







