Разработка системы хранения order book snapshots
Order book — самые информативные и самые сложные в хранении биржевые данные. Полный стакан BTC/USDT на Binance содержит 5000 уровней с обеих сторон, обновляется несколько раз в секунду и генерирует сотни мегабайт данных в час. Наивный подход к хранению приводит к катастрофическому росту объёмов; правильная система балансирует полноту данных с практическими ограничениями.
Типы order book данных
Прежде чем проектировать хранилище, важно понять, какие данные реально нужны:
Full snapshots — полный срез стакана в момент времени. Объёмные (несколько КБ на снапшот), но позволяют восстановить точное состояние рынка. При частоте 1 снапшот/сек по 100 символам — ~100 GB в сутки.
Depth snapshots — первые N уровней (обычно 5, 10, 20). Достаточно для большинства стратегий, занимают в 50–250 раз меньше места.
Order book diffs — только изменения (добавление/удаление/изменение уровней). Минимальный объём, но требуют полного снапшота для восстановления состояния.
Mid-price и spread — агрегированные производные. Микроскопический объём, подходит для долгосрочного анализа.
На практике системы хранят комбинацию: полные снапшоты раз в минуту для восстановления, diffs — для секундного разрешения между снапшотами.
Формат хранения: Delta Encoding
Дифференциальное хранение критично для уменьшения объёмов. Идея: хранить не полный стакан, а изменения относительно предыдущего состояния.
Snapshot @ T=0:
bids: [(43250.0, 1.5), (43249.5, 2.0), (43249.0, 0.8)]
asks: [(43251.0, 1.2), (43251.5, 3.0), (43252.0, 0.5)]
Diff @ T=1 (только изменения):
bids_updated: [(43250.0, 2.1)] # объём изменился
bids_removed: [(43249.5, 0)] # уровень исчез
bids_added: [(43248.5, 1.0)] # новый уровень
asks_updated: []
asks_removed: []
asks_added: [(43251.75, 0.3)]
Полный снапшот: ~10 KB. Diff: ~200 байт. При 5 обновлениях в секунду и снапшоте раз в минуту — 300 diffs + 1 snapshot = ~60 KB/мин вместо 3 MB/мин.
Схема базы данных
Используем ClickHouse с custom serialization:
-- Полные снапшоты (раз в минуту)
CREATE TABLE orderbook_snapshots (
exchange LowCardinality(String),
symbol LowCardinality(String),
snapshot_time DateTime64(3, 'UTC'),
depth UInt16,
bids Array(Tuple(Decimal(24,8), Decimal(24,8))), -- [(price, qty)]
asks Array(Tuple(Decimal(24,8), Decimal(24,8)))
)
ENGINE = MergeTree()
PARTITION BY (exchange, toYYYYMM(snapshot_time))
ORDER BY (exchange, symbol, snapshot_time);
-- Дельты между снапшотами
CREATE TABLE orderbook_diffs (
exchange LowCardinality(String),
symbol LowCardinality(String),
diff_time DateTime64(3, 'UTC'),
first_update_id UInt64,
last_update_id UInt64,
bids_changes Array(Tuple(Decimal(24,8), Decimal(24,8))), -- qty=0 означает удаление
asks_changes Array(Tuple(Decimal(24,8), Decimal(24,8)))
)
ENGINE = MergeTree()
PARTITION BY (exchange, toYYYYMM(diff_time))
ORDER BY (exchange, symbol, diff_time);
-- Агрегированные производные (mid-price, spread) для быстрого доступа
CREATE TABLE orderbook_metrics (
exchange LowCardinality(String),
symbol LowCardinality(String),
ts DateTime64(3, 'UTC'),
mid_price Decimal(24,8),
spread Decimal(24,8),
spread_bps Decimal(10,4),
bid_1 Decimal(24,8),
ask_1 Decimal(24,8),
bid_vol_10 Decimal(24,8), -- суммарный объём в топ-10 bids
ask_vol_10 Decimal(24,8),
imbalance Decimal(10,6) -- (bid_vol - ask_vol) / (bid_vol + ask_vol)
)
ENGINE = MergeTree()
PARTITION BY (exchange, toYYYYMM(ts))
ORDER BY (exchange, symbol, ts);
Восстановление состояния стакана
Ключевая операция — восстановление стакана на произвольный момент времени:
class OrderBookReplay:
def __init__(self, storage: OrderBookStorage):
self.storage = storage
async def reconstruct_at(self, exchange: str, symbol: str, target_ts: int) -> OrderBook:
# 1. Найти последний снапшот ДО target_ts
snapshot = await self.storage.get_last_snapshot_before(
exchange, symbol, target_ts
)
if not snapshot:
raise ValueError("No snapshot available before target timestamp")
# 2. Загрузить все дельты от снапшота до target_ts
diffs = await self.storage.get_diffs(
exchange, symbol,
from_ts=snapshot.timestamp,
to_ts=target_ts
)
# 3. Применить дельты последовательно
book = OrderBook.from_snapshot(snapshot)
for diff in diffs:
book.apply_diff(diff)
return book
class OrderBook:
def apply_diff(self, diff: OrderBookDiff):
for price, qty in diff.bids_changes:
if qty == 0:
self.bids.pop(price, None)
else:
self.bids[price] = qty
for price, qty in diff.asks_changes:
if qty == 0:
self.asks.pop(price, None)
else:
self.asks[price] = qty
Важен порядок применения дельт и проверка через update_id — у Binance каждый diff имеет lastUpdateId, следующий diff должен иметь firstUpdateId = lastUpdateId + 1. Разрыв в этой последовательности означает пропущенные данные.
Сжатие и оптимизация
Стакан содержит много похожих чисел (цены располагаются близко друг к другу). Перед записью в ClickHouse эффективно применить:
Delta encoding для цен — хранить не абсолютные цены, а разницу от лучшего bid/ask в базисных пунктах (bps). Целые числа сжимаются лучше дробных.
Binary serialization — вместо JSON использовать Protocol Buffers или MessagePack. Reduction 3–5x по размеру + значительно быстрее сериализация/десериализация.
ClickHouse compression — алгоритм ZSTD для данных типа Float/Decimal даёт лучшее сжатие, чем дефолтный LZ4:
CREATE TABLE orderbook_snapshots (...)
ENGINE = MergeTree()
...
SETTINGS
compress_block_size = 1048576,
default_codec = ZSTD(3);
Потоковая запись
Ingestion pipeline для снапшотов и дельт работает параллельно:
class OrderBookIngester:
SNAPSHOT_INTERVAL = 60 # секунды
DIFF_BATCH_SIZE = 100
def __init__(self, storage):
self.storage = storage
self.diff_buffer = []
self.last_snapshot_time = 0
async def on_orderbook_update(self, book: OrderBook, diff: OrderBookDiff):
now = time.time()
# Каждые SNAPSHOT_INTERVAL секунд сохраняем полный снапшот
if now - self.last_snapshot_time >= self.SNAPSHOT_INTERVAL:
await self.storage.save_snapshot(book.to_snapshot())
self.last_snapshot_time = now
# Дельты буферизируем и сохраняем батчами
self.diff_buffer.append(diff)
if len(self.diff_buffer) >= self.DIFF_BATCH_SIZE:
await self.storage.save_diffs(self.diff_buffer)
self.diff_buffer.clear()
Аналитические запросы
После накопления данных открываются интересные возможности для анализа:
-- Средний спред BTC/USDT по часам за месяц
SELECT
toStartOfHour(ts) AS hour,
avg(spread_bps) AS avg_spread_bps,
avg(imbalance) AS avg_imbalance
FROM orderbook_metrics
WHERE exchange = 'binance'
AND symbol = 'BTC/USDT'
AND ts BETWEEN '2024-01-01' AND '2024-02-01'
GROUP BY hour
ORDER BY hour;
-- Корреляция imbalance с последующим движением цены
WITH book AS (
SELECT ts, imbalance, mid_price
FROM orderbook_metrics
WHERE exchange = 'binance' AND symbol = 'BTC/USDT'
),
future AS (
SELECT
b.ts,
b.imbalance,
(f.mid_price - b.mid_price) / b.mid_price * 10000 AS fwd_return_bps
FROM book b
ASOF JOIN book f ON b.symbol = f.symbol
AND f.ts BETWEEN b.ts + INTERVAL 1 MINUTE AND b.ts + INTERVAL 2 MINUTE
)
SELECT
round(imbalance, 1) AS imbalance_bucket,
avg(fwd_return_bps) AS avg_1min_return_bps,
count() AS count
FROM future
GROUP BY imbalance_bucket
ORDER BY imbalance_bucket;
Мониторинг и качество данных
Критически важно отслеживать разрывы в последовательностях дельт. Система валидации сравнивает lastUpdateId каждого diff с firstUpdateId следующего и алертит при обнаружении пробелов. Gap в данных делает восстановление стакана между снапшотами невозможным.
Метрики для мониторинга: частота записи снапшотов на символ, задержка от биржевого timestamp до записи в ClickHouse, размер буфера дельт, процент пропущенных дельт.







