Разработка системы реального времени для ML-предсказаний
Обученная ML-модель ценна, только если её предсказания доступны в нужный момент с минимальной задержкой. Система realtime ML predictions — это не просто «запустить модель», а полноценная инфраструктура с low-latency serving, мониторингом качества и автоматическим переключением моделей.
Архитектура realtime ML serving
Market Data Sources
│
▼
Feature Pipeline (sliding window calculation)
│
▼
Feature Store (Redis — hot features)
│
▼
ML Model Server (FastAPI + GPU/CPU inference)
│
▼
Prediction Cache (Redis — результаты)
│
├──► Trading Strategy (consume predictions)
├──► Dashboard (visualize)
└──► Monitoring (track accuracy)
Feature Pipeline для realtime
import asyncio
import numpy as np
from collections import deque
from datetime import datetime
class RealtimeFeaturePipeline:
    """Rolling-window feature calculator for a single symbol.

    Recent candles are kept in bounded deques; ``get_features`` derives
    return, volatility, volume-ratio, simplified-RSI and Bollinger-position
    values for every configured window size.
    """

    def __init__(self, symbol, window_sizes=(60, 120, 240)):
        """Create empty candle buffers sized for the largest window.

        Args:
            symbol: Instrument identifier; stored unchanged.
            window_sizes: Look-back lengths in candles. Any iterable of
                ints is accepted.

        Raises:
            ValueError: If ``window_sizes`` is empty.
        """
        self.symbol = symbol
        # Copy into a per-instance list: a shared (default) list would let
        # one pipeline's mutation leak into every other pipeline.
        self.window_sizes = list(window_sizes)
        self.max_window = max(self.window_sizes)

        # Bounded buffers holding the most recent candles; the oldest
        # entries are dropped automatically once the capacity is reached.
        capacity = self.max_window + 10
        self.price_buffer = deque(maxlen=capacity)
        self.volume_buffer = deque(maxlen=capacity)
        self.high_buffer = deque(maxlen=capacity)
        self.low_buffer = deque(maxlen=capacity)

    def update(self, ohlcv):
        """Append one candle.

        Args:
            ohlcv: Mapping providing the ``'close'``, ``'volume'``,
                ``'high'`` and ``'low'`` keys.
        """
        self.price_buffer.append(ohlcv['close'])
        self.volume_buffer.append(ohlcv['volume'])
        self.high_buffer.append(ohlcv['high'])
        self.low_buffer.append(ohlcv['low'])

    def get_features(self):
        """Compute the feature dict from the buffered candles.

        Returns:
            ``None`` while fewer than ``max_window`` candles are buffered;
            otherwise a dict with the keys ``return_{w}``,
            ``return_std_{w}``, ``vol_ratio_{w}``, ``rsi_{w}`` and
            ``bb_pos_{w}`` for every window size ``w``.
        """
        if len(self.price_buffer) < self.max_window:
            return None  # Not enough data yet

        prices = np.array(self.price_buffer)
        volumes = np.array(self.volume_buffer)

        features = {}
        for window in self.window_sizes:
            p = prices[-window:]
            v = volumes[-window:]

            # Returns: simple return over the window and the standard
            # deviation of the log returns inside it.
            features[f'return_{window}'] = (p[-1] - p[0]) / p[0]
            features[f'return_std_{window}'] = np.std(np.diff(np.log(p)))

            # Volume: latest volume relative to the window mean.
            features[f'vol_ratio_{window}'] = v[-1] / np.mean(v)

            # RSI (simplified): summed gains vs. summed losses; the small
            # epsilon avoids division by zero when there are no losses.
            diffs = np.diff(p)
            gains = diffs[diffs > 0].sum()
            losses = -diffs[diffs < 0].sum()
            rs = gains / (losses + 1e-8)
            features[f'rsi_{window}'] = 100 - 100 / (1 + rs)

            # Bollinger Band position: distance from the window mean in
            # units of two standard deviations.
            ma = np.mean(p)
            std = np.std(p)
            features[f'bb_pos_{window}'] = (p[-1] - ma) / (2 * std + 1e-8)

        return features
ML Model Serving с FastAPI
from fastapi import FastAPI
from pydantic import BaseModel
import joblib
import numpy as np
from typing import Optional
import time
# Application object on which the route handlers are registered.
app = FastAPI()

# Load the models at startup.
# NOTE(review): joblib.load unpickles the file contents, so these paths
# must only ever point at trusted artifacts.
# NOTE(review): load_torch_model is not defined in the visible code;
# presumably it is provided elsewhere - confirm.
models = dict(
    lgbm_1h=joblib.load('models/lgbm_1h_v3.pkl'),
    lgbm_4h=joblib.load('models/lgbm_4h_v2.pkl'),
    lstm_24h=load_torch_model('models/lstm_24h_v1.pt'),
)

# Feature scaler applied to every request vector before inference.
scaler = joblib.load('models/feature_scaler.pkl')
class PredictionRequest(BaseModel):
    # Request payload of the /predict endpoint.
    # (Kept as comments: a class docstring would alter the generated
    # schema description.)

    # Instrument the prediction is requested for; echoed in the response.
    symbol: str

    # Feature name -> value. The handler consumes the values in the
    # dict's insertion order.
    # NOTE(review): presumably that order must match the column order
    # the scaler was fitted with - confirm with the client side.
    features: dict

    # Key into the loaded-models mapping; may be null.
    model_id: Optional[str] = 'lgbm_1h'
class PredictionResponse(BaseModel):
    # Response payload of the /predict endpoint.
    # (Kept as comments: a class docstring would alter the generated
    # schema description.)

    # Instrument copied from the request.
    symbol: str

    # Identifier of the model reported for this prediction.
    model_id: str

    # Signed score: probability_up minus probability_down, in [-1, 1].
    prediction: float

    # Class probabilities taken from the model's predict_proba output.
    probability_up: float
    probability_down: float

    # Largest of the class probabilities.
    confidence: float

    # Time spent inside the handler, in milliseconds.
    latency_ms: float

    # ISO-8601 string; the handler fills it from datetime.utcnow().
    timestamp: str
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Score one feature set and return directional probabilities.

    The feature values are scaled with the module-level ``scaler`` and
    passed to the requested model's ``predict_proba``. An unknown or null
    ``model_id`` falls back to ``'lgbm_1h'``; the response always reports
    the id of the model that actually produced the prediction.

    Args:
        request: Symbol, feature mapping and optional model id.

    Returns:
        PredictionResponse with the signed score, both class
        probabilities, the confidence, the handler latency in
        milliseconds and a UTC timestamp.
    """
    # perf_counter is monotonic, so the measured latency cannot be
    # distorted by wall-clock adjustments.
    started = time.perf_counter()

    # Feature preparation.
    # NOTE(review): values are taken in the dict's insertion order;
    # presumably this has to match the column order the scaler and the
    # models were fitted with - confirm with the caller.
    feature_vector = np.array(list(request.features.values())).reshape(1, -1)
    feature_vector_scaled = scaler.transform(feature_vector)

    # Inference. Resolve the effective model id first so that the
    # response never claims a model that was not used (and never carries
    # a null id, which the response schema rejects).
    model_id = request.model_id if request.model_id in models else 'lgbm_1h'
    model = models[model_id]
    # NOTE(review): this call is synchronous and runs on the event loop;
    # it also assumes every registered model exposes predict_proba -
    # confirm for the torch model.
    proba = model.predict_proba(feature_vector_scaled)[0]

    latency = (time.perf_counter() - started) * 1000
    return PredictionResponse(
        symbol=request.symbol,
        model_id=model_id,
        prediction=float(proba[1] - proba[0]),  # [-1, 1]
        probability_up=float(proba[1]),
        probability_down=float(proba[0]),
        confidence=float(max(proba)),
        latency_ms=latency,
        timestamp=datetime.utcnow().isoformat()
    )
Batching для оптимизации throughput
При большом количестве одновременных запросов batching снижает overhead на каждый запрос:
class BatchedPredictor:
    """Group concurrent prediction requests into single model calls.

    Callers await ``predict``; a long-running ``batch_worker`` task drains
    the queue, runs one ``predict_proba`` per batch and resolves each
    caller's future with its own row of the result.
    """

    def __init__(self, model, batch_size=32, max_wait_ms=10):
        """Store the model and batching limits.

        Args:
            model: Object exposing ``predict_proba(batch)``.
            batch_size: Maximum number of requests per model call.
            max_wait_ms: How long the worker waits on an empty queue
                before re-checking, in milliseconds.
        """
        self.model = model
        self.batch_size = batch_size
        self.max_wait_ms = max_wait_ms
        self.queue = asyncio.Queue()

    async def predict(self, features):
        """Enqueue one feature vector and wait for its prediction.

        Returns:
            The row of the batch ``predict_proba`` output that belongs to
            ``features``.

        Raises:
            Exception: Whatever the model raised while scoring the batch
                this request was part of.
        """
        # create_future() binds the future to the running loop.
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((features, future))
        return await future

    async def batch_worker(self):
        """Run forever, scoring queued requests in batches.

        Intended to be started once as a background task; it stops only
        when cancelled.
        """
        while True:
            try:
                # Wait for the first request.
                first = await asyncio.wait_for(
                    self.queue.get(), timeout=self.max_wait_ms / 1000
                )
            except asyncio.TimeoutError:
                continue

            # Collect whatever else is already queued (no waiting).
            batch = [first]
            while len(batch) < self.batch_size and not self.queue.empty():
                batch.append(self.queue.get_nowait())

            # Inference for the whole batch. A failure is forwarded to
            # the waiting callers instead of killing the worker, which
            # would leave every pending and future request hanging.
            try:
                features_batch = np.array([item[0] for item in batch])
                predictions = self.model.predict_proba(features_batch)
            except Exception as exc:
                for _, future in batch:
                    if not future.done():
                        future.set_exception(exc)
                continue

            # Hand the results back. Futures whose caller was cancelled
            # are already done and must be skipped, because set_result
            # on them raises InvalidStateError.
            for i, (_, future) in enumerate(batch):
                if not future.done():
                    future.set_result(predictions[i])
Model registry и версионирование
import mlflow
from mlflow.tracking import MlflowClient
class ModelRegistry:
    """Thin wrapper around the MLflow model registry client."""

    def __init__(self, tracking_uri):
        """Point MLflow at ``tracking_uri`` and create a client."""
        mlflow.set_tracking_uri(tracking_uri)
        self.client = MlflowClient()

    def load_production_model(self, model_name):
        """Load the latest Production version of ``model_name``.

        Returns:
            Tuple ``(model, model_version)``: the model loaded through
            ``mlflow.sklearn`` and the version entry returned by the
            registry client.

        Raises:
            IndexError: If the registry has no Production version for
                ``model_name``.
        """
        versions = self.client.get_latest_versions(
            model_name, stages=['Production']
        )
        if not versions:
            # Same exception type as indexing the empty list, but with a
            # message that says what is actually missing.
            raise IndexError(
                f"No Production version registered for model {model_name!r}"
            )
        model_version = versions[0]
        model = mlflow.sklearn.load_model(
            f"models:/{model_name}/{model_version.version}"
        )
        return model, model_version

    def promote_to_production(self, model_name, version, metrics,
                              min_accuracy=0.54, min_sharpe=1.2):
        """Move a model version to Production if its metrics are good enough.

        Args:
            model_name: Registered model name.
            version: Version to promote.
            metrics: Mapping with the ``'test_accuracy'`` and ``'sharpe'``
                keys.
            min_accuracy: ``test_accuracy`` must be strictly greater than
                this value.
            min_sharpe: ``sharpe`` must be strictly greater than this
                value.

        Returns:
            ``True`` if the stage transition was requested, ``False`` if
            the metrics did not pass.

        Raises:
            KeyError: If ``metrics`` lacks one of the required keys.
        """
        passes = (
            metrics['test_accuracy'] > min_accuracy
            and metrics['sharpe'] > min_sharpe
        )
        if not passes:
            return False
        self.client.transition_model_version_stage(
            model_name, version, 'Production'
        )
        return True
Мониторинг качества предсказаний
Realtime отслеживание качества предсказаний:
class PredictionMonitor:
    """Track how well recent predictions matched the realized outcome.

    Predictions and outcomes are logged separately (the outcome is only
    known after the prediction horizon) and joined by ``pred_id`` when
    the metrics are calculated.
    """

    def __init__(self, window=1000):
        """Keep at most ``window`` predictions and ``window`` outcomes."""
        self.predictions = deque(maxlen=window)
        self.actuals = deque(maxlen=window)

    def log_prediction(self, pred_id, prediction, confidence, timestamp):
        """Record one prediction under ``pred_id``."""
        self.predictions.append({
            'id': pred_id, 'prediction': prediction,
            'confidence': confidence, 'timestamp': timestamp
        })

    def log_actual(self, pred_id, actual_return):
        """Record the realized result once the prediction horizon passed."""
        self.actuals.append({'id': pred_id, 'actual': actual_return})

    def calculate_metrics(self, high_confidence=0.7, trend_window=100):
        """Compute accuracy metrics over predictions that have an outcome.

        Args:
            high_confidence: Predictions with a confidence strictly above
                this value form the high-confidence subset.
            trend_window: Number of consecutive matched predictions per
                rolling-accuracy point.

        Returns:
            Dict with ``'directional_accuracy'`` (share of matched
            predictions whose sign equals the outcome's sign),
            ``'high_confidence_accuracy'`` (the same over the
            high-confidence subset) and ``'recent_accuracy_trend'`` (list
            of rolling accuracies, oldest first). An accuracy is ``None``
            when there is nothing to average.

        Raises:
            ValueError: If ``trend_window`` is smaller than 1.
        """
        if trend_window < 1:
            raise ValueError("trend_window must be at least 1")

        # Join predictions with outcomes by id. If an id was logged more
        # than once, the most recent outcome wins.
        actual_by_id = {entry['id']: entry['actual'] for entry in self.actuals}
        matched = [
            (entry['prediction'], entry['confidence'], actual_by_id[entry['id']])
            for entry in self.predictions
            if entry['id'] in actual_by_id
        ]
        if not matched:
            return {
                'directional_accuracy': None,
                'high_confidence_accuracy': None,
                'recent_accuracy_trend': [],
            }

        predicted, confidence, actual = (
            np.array(column, dtype=float) for column in zip(*matched)
        )
        hits = np.sign(predicted) == np.sign(actual)
        directional_accuracy = float(hits.mean())

        # Confidence calibration check: are confident predictions more
        # accurate? Guard the empty subset, whose mean would be NaN.
        confident = confidence > high_confidence
        high_conf_accuracy = (
            float(hits[confident].mean()) if confident.any() else None
        )

        # Rolling accuracy via prefix sums; the window shrinks to the
        # number of matched predictions when fewer are available.
        width = min(trend_window, hits.size)
        prefix = np.concatenate(([0.0], np.cumsum(hits, dtype=float)))
        rolling = (prefix[width:] - prefix[:-width]) / width

        return {
            'directional_accuracy': directional_accuracy,
            'high_confidence_accuracy': high_conf_accuracy,
            'recent_accuracy_trend': rolling.tolist(),
        }
Мониторинг latency
P50, P95, P99 latency отслеживаются через Prometheus + Grafana. SLA: P95 < 50ms для single prediction, P99 < 100ms.
Разрабатываем production-ready ML serving систему: FastAPI inference server, batching для throughput оптимизации, MLflow model registry, realtime monitoring качества предсказаний и автоматический rollback при деградации.







