Разработка pipeline обработки tick-данных для ML
Tick-данные — это запись каждой отдельной сделки: цена, объём, сторона (buy/sell) и timestamp. Это самый гранулярный уровень рыночных данных: он содержит информацию, которая полностью теряется при агрегации в OHLCV-свечи.
Сбор tick-данных
import asyncio
import websockets
import json
from datetime import datetime
import asyncpg
class TickDataCollector:
    """Collect aggregate trades for one symbol from Binance and batch-insert them.

    Incoming ticks are accumulated in ``self.buffer`` and written to the
    ``trades`` table once ``self.buffer_size`` ticks are buffered, and once
    more when the WebSocket stream ends so the tail of the buffer is not lost.

    Args:
        symbol: Trading pair; it is lower-cased to build the stream name.
        db_pool: Pool whose ``acquire()`` is an async context manager yielding
            a connection with ``executemany`` (presumably an ``asyncpg`` pool,
            given the ``$1..$6`` placeholders -- confirm against the caller).
    """

    _INSERT_SQL = """INSERT INTO trades (symbol, timestamp, price, quantity, is_buyer_maker, trade_id)
        VALUES ($1, $2, $3, $4, $5, $6)"""

    def __init__(self, symbol, db_pool):
        self.symbol = symbol
        self.db_pool = db_pool
        self.buffer = []
        self.buffer_size = 1000  # flush every 1000 ticks

    def _parse_trade(self, trade):
        """Convert one decoded aggTrade message into a tick dict."""
        return {
            'symbol': self.symbol,
            # NOTE(review): this is a naive datetime in the host's local time
            # zone. Left unchanged because the column type is not visible
            # here; confirm whether UTC is expected.
            'timestamp': datetime.fromtimestamp(trade['T'] / 1000),
            'price': float(trade['p']),
            'quantity': float(trade['q']),
            # True = the buyer is the maker, i.e. the seller was the aggressor.
            'is_buyer_maker': trade['m'],
            'trade_id': trade['a'],
        }

    async def connect_binance_trades(self):
        """Consume the aggTrade stream until it closes, buffering every tick.

        Whatever is still buffered when the stream ends (normally or with an
        exception) is flushed, so up to ``buffer_size - 1`` ticks are no
        longer dropped on disconnect.
        """
        url = f"wss://stream.binance.com:9443/ws/{self.symbol.lower()}@aggTrade"
        try:
            async with websockets.connect(url, ping_interval=20) as ws:
                async for msg in ws:
                    self.buffer.append(self._parse_trade(json.loads(msg)))
                    if len(self.buffer) >= self.buffer_size:
                        await self.flush_to_db()
        finally:
            await self.flush_to_db()

    async def flush_to_db(self):
        """Insert all buffered ticks in one batch and drop them from the buffer.

        Does nothing (and does not touch the pool) when the buffer is empty.
        If the insert raises, the buffer is left intact.
        """
        if not self.buffer:
            return
        rows = [
            (t['symbol'], t['timestamp'], t['price'], t['quantity'],
             t['is_buyer_maker'], t['trade_id'])
            for t in self.buffer
        ]
        async with self.db_pool.acquire() as conn:
            await conn.executemany(self._INSERT_SQL, rows)
        # Remove only the rows that were written; anything appended while the
        # insert was awaited stays buffered for the next flush.
        del self.buffer[:len(rows)]
Хранение: ClickHouse для tick-данных
-- Tick storage: one row per trade.
CREATE TABLE trades (
    timestamp DateTime64(3),          -- millisecond precision
    symbol LowCardinality(String),
    price Float64,
    -- Widened from Float32: a 32-bit float keeps only ~7 significant digits,
    -- which truncates exchange quantities and skews summed volume.
    quantity Float64,
    is_buyer_maker UInt8,             -- 1 = buyer was the maker (seller-initiated trade)
    trade_id UInt64
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (symbol, timestamp)
-- toDateTime() makes the TTL expression a plain DateTime rather than
-- DateTime64, which older ClickHouse releases may reject in TTL (verify
-- against the deployed version).
TTL toDateTime(timestamp) + INTERVAL 365 DAY
SETTINGS index_granularity = 8192;
ClickHouse вставляет 500K+ строк в секунду и обеспечивает быстрые агрегации: запрос с GROUP BY по временным интервалам (например, toStartOfHour(timestamp)) за месяц данных выполняется за секунды.
Агрегация tick-данных
Custom OHLCV: агрегируем тики в свечи любого таймфрейма:
def ticks_to_ohlcv(ticks_df, freq='1h'):
    """Aggregate ticks into OHLCV candles of the given time frame.

    Args:
        ticks_df: DataFrame with ``timestamp``, ``price`` and ``quantity``
            columns.
        freq: Resampling frequency passed to ``DataFrame.resample``.

    Returns:
        DataFrame indexed by candle start time with columns ``open``,
        ``high``, ``low``, ``close`` and ``volume``.
    """
    # 'first'/'last' take rows in storage order, so ticks must be in time
    # order for open/close to be correct. mergesort is stable: ticks sharing
    # a timestamp keep their original relative order.
    ordered = ticks_df.sort_values('timestamp', kind='mergesort')
    ohlcv = ordered.resample(freq, on='timestamp').agg({
        'price': ['first', 'max', 'min', 'last'],
        'quantity': 'sum',
    })
    # Flatten the (column, aggregation) MultiIndex into plain candle names.
    ohlcv.columns = ['open', 'high', 'low', 'close', 'volume']
    return ohlcv
Volume bars: вместо временных интервалов — N единиц объёма. Адаптируется к активности рынка:
def create_volume_bars(ticks_df, bar_volume=10):
    """Build volume bars: each bar closes once it holds ``bar_volume`` units.

    A bar is closed by the tick that brings its accumulated quantity to at
    least ``bar_volume``; that whole tick belongs to the bar, so a bar's
    volume may exceed ``bar_volume``. A trailing bar that never reaches the
    threshold is not emitted.

    Args:
        ticks_df: DataFrame with ``price``, ``quantity`` and ``timestamp``
            columns, already in time order.
        bar_volume: Quantity of the asset per bar.

    Returns:
        DataFrame with columns ``open``, ``high``, ``low``, ``close``,
        ``volume`` and ``start_time``; the columns are present even when no
        bar was completed.
    """
    columns = ['open', 'high', 'low', 'close', 'volume', 'start_time']

    def new_bar():
        return {'open': None, 'high': -np.inf, 'low': np.inf,
                'close': None, 'volume': 0, 'start_time': None}

    bars = []
    current_bar = new_bar()
    # Iterating the three columns directly avoids building a Series per row,
    # which is what makes ``iterrows`` slow on tick-sized inputs.
    ticks = zip(ticks_df['price'], ticks_df['quantity'], ticks_df['timestamp'])
    for price, quantity, timestamp in ticks:
        if current_bar['open'] is None:
            current_bar['open'] = price
            current_bar['start_time'] = timestamp
        current_bar['high'] = max(current_bar['high'], price)
        current_bar['low'] = min(current_bar['low'], price)
        current_bar['close'] = price
        current_bar['volume'] += quantity
        if current_bar['volume'] >= bar_volume:
            bars.append(current_bar)
            current_bar = new_bar()
    return pd.DataFrame(bars, columns=columns)
Dollar bars: аналогично, но бар закрывается при накоплении N USD оборота (цена × объём). Imbalance bars: бар закрывается, когда накопленный дисбаланс между buy- и sell-объёмом достигает порога N.
Feature engineering из тиков
def create_tick_features(ticks_df, window_ticks=(50, 200, 1000)):
    """Compute trailing-window order-flow features for every tick.

    For tick ``i`` each window covers the ``window`` ticks before it
    (``i - window`` up to but excluding ``i``). Rows start at
    ``max(window_ticks)``, so the result has
    ``len(ticks_df) - max(window_ticks)`` rows with a fresh 0..n-1 index.

    Args:
        ticks_df: DataFrame with ``timestamp``, ``price``, ``quantity`` and
            ``is_buyer_maker`` columns, in time order. ``is_buyer_maker`` may
            be bool or 0/1 integers.
        window_ticks: Window lengths, in ticks.

    Returns:
        DataFrame with, per window ``w``: ``flow_imbalance_w``,
        ``trade_frequency_w``, ``avg_trade_size_w``, ``large_trade_ratio_w``
        and ``vwap_deviation_w``.
    """
    eps = 1e-8
    features = []
    for i in range(max(window_ticks), len(ticks_df)):
        current_price = ticks_df['price'].iloc[i]
        row_features = {}
        for window in window_ticks:
            window_data = ticks_df.iloc[i - window:i]
            quantity = window_data['quantity']
            # Cast first: ``~`` on an integer 0/1 flag is a bitwise NOT, not
            # a logical one, and would not produce a boolean mask.
            is_maker = window_data['is_buyer_maker'].astype(bool)

            # Buy/sell pressure: a maker-buyer trade is seller-initiated.
            buy_vol = quantity[~is_maker].sum()
            sell_vol = quantity[is_maker].sum()
            row_features[f'flow_imbalance_{window}'] = (
                (buy_vol - sell_vol) / (buy_vol + sell_vol + eps)
            )

            # Trade frequency in ticks per second. The epsilon belongs in
            # the denominator so a window whose ticks share one timestamp
            # does not divide by zero.
            duration = (window_data['timestamp'].max()
                        - window_data['timestamp'].min()).total_seconds()
            row_features[f'trade_frequency_{window}'] = window / (duration + eps)

            # Average trade size and share of trades above the 90th percentile.
            row_features[f'avg_trade_size_{window}'] = quantity.mean()
            row_features[f'large_trade_ratio_{window}'] = (
                (quantity > quantity.quantile(0.9)).mean()
            )

            # Relative deviation of the current price from the window VWAP.
            vwap = (window_data['price'] * quantity).sum() / quantity.sum()
            row_features[f'vwap_deviation_{window}'] = (current_price - vwap) / vwap
        features.append(row_features)
    return pd.DataFrame(features)
Анализ распределения размеров сделок (trade size distribution)
Крупные сделки могут сигнализировать о institutional activity:
def analyze_trade_size_distribution(ticks_df, percentile_threshold=99):
    """Compare the buy-side share of large trades with that of small trades.

    A trade is "large" when its quantity is strictly above the
    ``percentile_threshold`` percentile of all quantities in ``ticks_df``.

    Args:
        ticks_df: DataFrame with ``quantity`` and ``is_buyer_maker`` columns.
            ``is_buyer_maker`` may be bool or 0/1 integers.
        percentile_threshold: Percentile (0-100) separating large from small.

    Returns:
        Dict with ``large_trade_threshold``, ``large_buy_pct``,
        ``small_buy_pct`` and ``institutional_directional_bias``
        (``large_buy_pct - 0.5``; positive means a buying bias). A share is
        NaN when its group contains no trades.
    """
    quantity = ticks_df['quantity']
    large_threshold = quantity.quantile(percentile_threshold / 100)

    # Cast before negating: ``~`` on an unsigned 0/1 flag is a bitwise NOT
    # (255/254), which would silently corrupt the means below.
    is_buy = ~ticks_df['is_buyer_maker'].astype(bool)

    # Compare large vs small trades. A true price-impact measure (change in
    # mid-price after the trade) would need order book data, so only the
    # aggressor side is compared here.
    large_buy_pct = is_buy[quantity > large_threshold].mean()
    small_buy_pct = is_buy[quantity <= large_threshold].mean()

    return {
        'large_trade_threshold': large_threshold,
        'large_buy_pct': large_buy_pct,
        'small_buy_pct': small_buy_pct,
        'institutional_directional_bias': large_buy_pct - 0.5,  # > 0 = buying bias
    }
Realtime streaming pipeline
Binance WebSocket → asyncio consumer → buffer → ClickHouse batch insert
→ Redis sorted set (last 10k ticks)
→ Feature calculator (sliding window)
→ ML inference
→ Signal output
Latency: от получения тика до выходного сигнала — < 10ms для Python asyncio pipeline.
Разрабатываем полный tick-data pipeline: WebSocket collector, ClickHouse storage, custom bar types (volume/dollar/imbalance), feature engineering, realtime ML inference.







