Разработка pipeline обработки order book данных для ML

Проектируем и разрабатываем блокчейн-решения полного цикла: от архитектуры смарт-контрактов до запуска DeFi-протоколов, NFT-маркетплейсов и криптобирж. Аудит безопасности, токеномика, интеграция с существующей инфраструктурой.
Показано 1 из 1Все 1306 услуг
Разработка pipeline обработки order book данных для ML
Сложный
~1-2 недели
Часто задаваемые вопросы

Направления блокчейн-разработки

Этапы блокчейн-разработки

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1288
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    902
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1122
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    589
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    859

Разработка pipeline обработки order book данных для ML

Order book данные — богатейший источник информации о рыночной структуре. Полный стакан заявок содержит информацию об ожидаемом спросе/предложении, которая недоступна из OHLCV данных. Однако объём и структура этих данных требуют специализированного pipeline.

Структура order book д��нных

Level 1 (Top of Book): лучший bid и ask с объёмами. Минимальный объём, максимальная актуальность.

Level 2 (Full Depth): все уровни стакана с объёмами. Binance предоставляет глубину 5000 уровней. Обновляется через WebSocket diff stream.

Level 3 (Full Order Feed): каждый отдельный ордер с ID. Доступен не на всех биржах, максимальная детальность.

Сбор и хранение

import asyncio
import websockets
import json
from collections import deque
import numpy as np

class OrderBookCollector:
    def __init__(self, symbol, max_depth=100):
        self.symbol = symbol
        self.bids = {}  # price -> quantity
        self.asks = {}
        self.max_depth = max_depth
        self.snapshots = deque(maxlen=10000)
    
    async def connect_binance(self):
        url = f"wss://stream.binance.com:9443/ws/{self.symbol.lower()}@depth@100ms"
        
        async with websockets.connect(url) as ws:
            # Сначала получаем snapshot через REST
            await self.fetch_snapshot()
            
            async for msg in ws:
                data = json.loads(msg)
                self.process_diff_update(data)
                
                # Сохраняем снэпшот каждые N updates
                if len(self.snapshots) % 10 == 0:
                    self.save_snapshot()
    
    def process_diff_update(self, data):
        for bid_level in data.get('b', []):
            price, qty = float(bid_level[0]), float(bid_level[1])
            if qty == 0:
                self.bids.pop(price, None)
            else:
                self.bids[price] = qty
        
        for ask_level in data.get('a', []):
            price, qty = float(ask_level[0]), float(ask_level[1])
            if qty == 0:
                self.asks.pop(price, None)
            else:
                self.asks[price] = qty
    
    def get_features(self, n_levels=20):
        """Извлекаем ML features из текущего состояния стакана"""
        sorted_bids = sorted(self.bids.items(), reverse=True)[:n_levels]
        sorted_asks = sorted(self.asks.items())[:n_levels]
        
        if not sorted_bids or not sorted_asks:
            return None
        
        mid_price = (sorted_bids[0][0] + sorted_asks[0][0]) / 2
        
        features = {}
        
        # Объёмы на разных уровнях
        for i, (price, qty) in enumerate(sorted_bids[:10]):
            features[f'bid_qty_{i}'] = qty
            features[f'bid_dist_{i}'] = (mid_price - price) / mid_price
        
        for i, (price, qty) in enumerate(sorted_asks[:10]):
            features[f'ask_qty_{i}'] = qty
            features[f'ask_dist_{i}'] = (price - mid_price) / mid_price
        
        # Order Book Imbalance (OBI)
        bid_vol_n = sum(qty for _, qty in sorted_bids[:5])
        ask_vol_n = sum(qty for _, qty in sorted_asks[:5])
        features['obi_5'] = (bid_vol_n - ask_vol_n) / (bid_vol_n + ask_vol_n + 1e-8)
        
        bid_vol_20 = sum(qty for _, qty in sorted_bids[:20])
        ask_vol_20 = sum(qty for _, qty in sorted_asks[:20])
        features['obi_20'] = (bid_vol_20 - ask_vol_20) / (bid_vol_20 + ask_vol_20 + 1e-8)
        
        # Weighted mid price
        features['wmid'] = (
            sorted_bids[0][0] * sorted_asks[0][1] + 
            sorted_asks[0][0] * sorted_bids[0][1]
        ) / (sorted_bids[0][1] + sorted_asks[0][1])
        
        # Spread
        features['spread'] = (sorted_asks[0][0] - sorted_bids[0][0]) / mid_price
        
        # Depth asymmetry at multiple levels
        for n in [5, 10, 20]:
            bid_depth = sum(qty for _, qty in sorted_bids[:n])
            ask_depth = sum(qty for _, qty in sorted_asks[:n])
            features[f'depth_ratio_{n}'] = bid_depth / max(ask_depth, 1e-8)
        
        return features

Order Book Imbalance (OBI) как торговый сигнал

OBI — наиболее исследованный признак из order book для краткосрочного прогнозирования:

def calculate_multi_level_obi(orderbook, levels=[1, 5, 10, 20]):
    """Расчёт OBI на разных глубинах"""
    obi_features = {}
    
    sorted_bids = sorted(orderbook['bids'], reverse=True)
    sorted_asks = sorted(orderbook['asks'])
    
    for n in levels:
        bid_vol = sum(qty for _, qty in sorted_bids[:n])
        ask_vol = sum(qty for _, qty in sorted_asks[:n])
        obi_features[f'obi_{n}'] = (bid_vol - ask_vol) / (bid_vol + ask_vol + 1e-8)
    
    # OBI slope (изменение OBI за последние K обновлений)
    return obi_features

Хранение order book данных

Полный L2 order book — огромный объём данных. Стратегии хранения:

Snapshots: полный стакан каждые N секунд (например, каждые 100ms). Для 20 уровней × 2 стороны × 2 значения = 80 float значений. При 10 обновлений/сек × 86400 сек = 69M записей/день.

ClickHouse — идеально для order book данных: высокая скорость записи, эффективное колоночное хранение, быстрые агрегации.

CREATE TABLE order_book_snapshots (
    timestamp DateTime64(3),  -- миллисекундная точность
    symbol LowCardinality(String),
    exchange LowCardinality(String),
    
    -- Bid levels (price + quantity для 20 уровней)
    bid_price_0 Float32, bid_qty_0 Float32,
    bid_price_1 Float32, bid_qty_1 Float32,
    -- ... до bid_price_19, bid_qty_19
    
    -- Ask levels аналогично
    ask_price_0 Float32, ask_qty_0 Float32,
    -- ...
    
    -- Pre-calculated features
    spread Float32,
    obi_5 Float32,
    obi_20 Float32
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (symbol, timestamp)
TTL timestamp + INTERVAL 90 DAY;  -- хранение 90 дней

Feature engineering из order book

def engineer_orderbook_features(snapshots_df, window_sizes=[10, 50, 100]):
    """
    snapshots_df: DataFrame с историей снэпшотов стакана
    """
    features = snapshots_df.copy()
    
    # Производные от OBI
    for window in window_sizes:
        # Скользящее среднее OBI
        features[f'obi_5_ma_{window}'] = features['obi_5'].rolling(window).mean()
        # Изменение OBI
        features[f'obi_5_delta_{window}'] = features['obi_5'].diff(window)
        # Волатильность OBI
        features[f'obi_5_std_{window}'] = features['obi_5'].rolling(window).std()
    
    # Volume imbalance accumulation (COF - Cumulative Order Flow)
    features['cof'] = features['obi_5'].cumsum()
    features['cof_ma'] = features['cof'].rolling(100).mean()
    features['cof_deviation'] = features['cof'] - features['cof_ma']
    
    # Spread dynamics
    features['spread_ma'] = features['spread'].rolling(50).mean()
    features['spread_ratio'] = features['spread'] / features['spread_ma']
    
    # Depth stability (изменение глубины стакана)
    features['depth_change'] = features['depth_ratio_10'].diff(10)
    
    return features

Краткосрочный прогноз mid-price из OB

Задача: предсказать изменение mid-price через N обновлений (например, через 10 обновлений стакана ≈ 1 секунда):

def create_training_data(snapshots_df, prediction_horizon=10):
    features = engineer_orderbook_features(snapshots_df)
    
    # Target: знак изменения mid-price через horizon обновлений
    future_mid = snapshots_df['mid_price'].shift(-prediction_horizon)
    current_mid = snapshots_df['mid_price']
    target = np.sign(future_mid - current_mid)  # -1, 0, 1
    
    # Убираем строки с NaN
    valid_mask = features.notna().all(axis=1) & target.notna()
    
    return features[valid_mask], target[valid_mask]

Разрабатываем полный order book ML pipeline: WebSocket коллектор с инкрементальным обновлением, ClickHouse для хранения снэпшотов, feature engineering из OBI и depth данных, обучение краткосрочной прогностической модели (LightGBM/XGBoost) и realtime inference.