Разработка pipeline обработки tick-данных для ML

Проектируем и разрабатываем блокчейн-решения полного цикла: от архитектуры смарт-контрактов до запуска DeFi-протоколов, NFT-маркетплейсов и криптобирж. Аудит безопасности, токеномика, интеграция с существующей инфраструктурой.
Показано 1 из 1Все 1306 услуг
Разработка pipeline обработки tick-данных для ML
Сложный
~1-2 недели
Часто задаваемые вопросы

Направления блокчейн-разработки

Этапы блокчейн-разработки

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1288
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    902
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1122
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    589
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    859

Разработка pipeline обработки tick-данных для ML

Tick-данные — запись каждой отдельной сделки: цена, объём, сторона (buy/sell), timestamp. Это самый гранулярный уровень рыночных данных, содержащий информацию, которая полностью теряется при агрегации в OHLCV свечи.

Сбор tick-данных

import asyncio
import websockets
import json
from datetime import datetime
import asyncpg

class TickDataCollector:
    def __init__(self, symbol, db_pool):
        self.symbol = symbol
        self.db_pool = db_pool
        self.buffer = []
        self.buffer_size = 1000  # Flush каждые 1000 тиков
    
    async def connect_binance_trades(self):
        url = f"wss://stream.binance.com:9443/ws/{self.symbol.lower()}@aggTrade"
        
        async with websockets.connect(url, ping_interval=20) as ws:
            async for msg in ws:
                trade = json.loads(msg)
                tick = {
                    'symbol': self.symbol,
                    'timestamp': datetime.fromtimestamp(trade['T'] / 1000),
                    'price': float(trade['p']),
                    'quantity': float(trade['q']),
                    'is_buyer_maker': trade['m'],  # True = продавец агрессор
                    'trade_id': trade['a']
                }
                
                self.buffer.append(tick)
                
                if len(self.buffer) >= self.buffer_size:
                    await self.flush_to_db()
    
    async def flush_to_db(self):
        async with self.db_pool.acquire() as conn:
            await conn.executemany(
                """INSERT INTO trades (symbol, timestamp, price, quantity, is_buyer_maker, trade_id)
                   VALUES ($1, $2, $3, $4, $5, $6)""",
                [(t['symbol'], t['timestamp'], t['price'], t['quantity'],
                  t['is_buyer_maker'], t['trade_id']) for t in self.buffer]
            )
        self.buffer.clear()

Хранение: ClickHouse для tick-данных

CREATE TABLE trades (
    timestamp DateTime64(3),
    symbol LowCardinality(String),
    price Float64,
    quantity Float32,
    is_buyer_maker UInt8,
    trade_id UInt64
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (symbol, timestamp)
TTL timestamp + INTERVAL 365 DAY
SETTINGS index_granularity = 8192;

ClickHouse вставляет 500K+ строк/секунду и обеспечивает быстрые агрегации (GROUP BY timestamp за месяц данных — секунды).

Агрегация tick-данных

Custom OHLCV: агрегируем тики в свечи любого таймфрейма:

def ticks_to_ohlcv(ticks_df, freq='1h'):
    ohlcv = ticks_df.resample(freq, on='timestamp').agg({
        'price': ['first', 'max', 'min', 'last'],
        'quantity': 'sum'
    })
    ohlcv.columns = ['open', 'high', 'low', 'close', 'volume']
    return ohlcv

Volume bars: вместо временных интервалов — N единиц объёма. Адаптируется к активности рынка:

def create_volume_bars(ticks_df, bar_volume=10):
    """Каждый бар = bar_volume единиц актива"""
    bars = []
    current_bar = {'open': None, 'high': -np.inf, 'low': np.inf,
                   'close': None, 'volume': 0, 'start_time': None}
    
    for _, tick in ticks_df.iterrows():
        if current_bar['open'] is None:
            current_bar['open'] = tick['price']
            current_bar['start_time'] = tick['timestamp']
        
        current_bar['high'] = max(current_bar['high'], tick['price'])
        current_bar['low'] = min(current_bar['low'], tick['price'])
        current_bar['close'] = tick['price']
        current_bar['volume'] += tick['quantity']
        
        if current_bar['volume'] >= bar_volume:
            bars.append(current_bar.copy())
            current_bar = {'open': None, 'high': -np.inf, 'low': np.inf,
                          'close': None, 'volume': 0, 'start_time': None}
    
    return pd.DataFrame(bars)

Dollar bars: аналогично, но N USD объёма. Imbalance bars: бар закрывается при накоплении N дисбаланса buy/sell.

Feature engineering из тиков

def create_tick_features(ticks_df, window_ticks=[50, 200, 1000]):
    features = []
    
    for i in range(max(window_ticks), len(ticks_df)):
        row_features = {}
        
        for window in window_ticks:
            window_data = ticks_df.iloc[i-window:i]
            
            # Buy/Sell pressure
            buy_vol = window_data[~window_data['is_buyer_maker']]['quantity'].sum()
            sell_vol = window_data[window_data['is_buyer_maker']]['quantity'].sum()
            row_features[f'flow_imbalance_{window}'] = (
                (buy_vol - sell_vol) / (buy_vol + sell_vol + 1e-8)
            )
            
            # Trade frequency
            row_features[f'trade_frequency_{window}'] = (
                window / (window_data['timestamp'].max() - 
                         window_data['timestamp'].min()).total_seconds() + 1e-8
            )
            
            # Average trade size
            row_features[f'avg_trade_size_{window}'] = window_data['quantity'].mean()
            row_features[f'large_trade_ratio_{window}'] = (
                (window_data['quantity'] > window_data['quantity'].quantile(0.9)).mean()
            )
            
            # VWAP deviation
            vwap = (window_data['price'] * window_data['quantity']).sum() / window_data['quantity'].sum()
            row_features[f'vwap_deviation_{window}'] = (
                ticks_df.iloc[i]['price'] - vwap
            ) / vwap
        
        features.append(row_features)
    
    return pd.DataFrame(features)

Trade size distribution анализ

Крупные сделки могут сигнализировать о institutional activity:

def analyze_trade_size_distribution(ticks_df, percentile_threshold=99):
    large_threshold = ticks_df['quantity'].quantile(percentile_threshold / 100)
    
    # Сравниваем price impact крупных vs мелких сделок
    large_trades = ticks_df[ticks_df['quantity'] > large_threshold]
    small_trades = ticks_df[ticks_df['quantity'] <= large_threshold]
    
    # Вычисляем price impact: изменение mid-price после сделки
    # (требует order book данных для точности)
    
    large_buy_pct = (~large_trades['is_buyer_maker']).mean()
    small_buy_pct = (~small_trades['is_buyer_maker']).mean()
    
    return {
        'large_trade_threshold': large_threshold,
        'large_buy_pct': large_buy_pct,
        'small_buy_pct': small_buy_pct,
        'institutional_directional_bias': large_buy_pct - 0.5  # > 0 = buying bias
    }

Realtime streaming pipeline

Binance WebSocket → asyncio consumer → buffer → ClickHouse batch insert
                                     → Redis sorted set (last 10k ticks)
                                     → Feature calculator (sliding window)
                                     → ML inference
                                     → Signal output

Latency: от получения тика до выходного сигнала — < 10ms для Python asyncio pipeline.

Разрабатываем полный tick-data pipeline: WebSocket collector, ClickHouse storage, custom bar types (volume/dollar/imbalance), feature engineering, realtime ML inference.