Разработка системы реального времени для ML-предсказаний

Проектируем и разрабатываем блокчейн-решения полного цикла: от архитектуры смарт-контрактов до запуска DeFi-протоколов, NFT-маркетплейсов и криптобирж. Аудит безопасности, токеномика, интеграция с существующей инфраструктурой.
Показано 1 из 1Все 1306 услуг
Разработка системы реального времени для ML-предсказаний
Сложный
от 2 недель до 3 месяцев
Часто задаваемые вопросы

Направления блокчейн-разработки

Этапы блокчейн-разработки

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1288
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    902
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1122
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    589
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    859

Разработка системы реального времени для ML-предсказаний

Обученная ML-модель ценна только если её предсказания доступны в нужный момент с минимальной задержкой. Система realtime ML predictions — это не просто «запустить модель», это полноценная инфраструктура с low-latency serving, мониторингом качества и автоматическим переключением моделей.

Архитектура realtime ML serving

Market Data Sources
    │
    ▼
Feature Pipeline (sliding window calculation)
    │
    ▼
Feature Store (Redis — hot features)
    │
    ▼
ML Model Server (FastAPI + GPU/CPU inference)
    │
    ▼
Prediction Cache (Redis — результаты)
    │
    ├──► Trading Strategy (consume predictions)
    ├──► Dashboard (visualize)
    └──► Monitoring (track accuracy)

Feature Pipeline для realtime

import asyncio
import numpy as np
from collections import deque
from datetime import datetime

class RealtimeFeaturePipeline:
    def __init__(self, symbol, window_sizes=[60, 120, 240]):
        self.symbol = symbol
        self.window_sizes = window_sizes
        self.max_window = max(window_sizes)
        
        # Circular buffer для хранения последних N свечей
        self.price_buffer = deque(maxlen=self.max_window + 10)
        self.volume_buffer = deque(maxlen=self.max_window + 10)
        self.high_buffer = deque(maxlen=self.max_window + 10)
        self.low_buffer = deque(maxlen=self.max_window + 10)
    
    def update(self, ohlcv):
        self.price_buffer.append(ohlcv['close'])
        self.volume_buffer.append(ohlcv['volume'])
        self.high_buffer.append(ohlcv['high'])
        self.low_buffer.append(ohlcv['low'])
    
    def get_features(self):
        if len(self.price_buffer) < self.max_window:
            return None  # Недостаточно данных
        
        prices = np.array(self.price_buffer)
        volumes = np.array(self.volume_buffer)
        highs = np.array(self.high_buffer)
        lows = np.array(self.low_buffer)
        
        features = {}
        
        for window in self.window_sizes:
            p = prices[-window:]
            v = volumes[-window:]
            
            # Returns
            features[f'return_{window}'] = (p[-1] - p[0]) / p[0]
            features[f'return_std_{window}'] = np.std(np.diff(np.log(p)))
            
            # Volume
            features[f'vol_ratio_{window}'] = v[-1] / np.mean(v)
            
            # RSI (упрощённо)
            diffs = np.diff(p)
            gains = diffs[diffs > 0].sum()
            losses = -diffs[diffs < 0].sum()
            rs = gains / (losses + 1e-8)
            features[f'rsi_{window}'] = 100 - 100 / (1 + rs)
            
            # Bollinger Band position
            ma = np.mean(p)
            std = np.std(p)
            features[f'bb_pos_{window}'] = (p[-1] - ma) / (2 * std + 1e-8)
        
        return features

ML Model Serving с FastAPI

from fastapi import FastAPI
from pydantic import BaseModel
import joblib
import numpy as np
from typing import Optional
import time

app = FastAPI()

# Загружаем модели при старте
models = {
    'lgbm_1h': joblib.load('models/lgbm_1h_v3.pkl'),
    'lgbm_4h': joblib.load('models/lgbm_4h_v2.pkl'),
    'lstm_24h': load_torch_model('models/lstm_24h_v1.pt')
}
scaler = joblib.load('models/feature_scaler.pkl')

class PredictionRequest(BaseModel):
    symbol: str
    features: dict
    model_id: Optional[str] = 'lgbm_1h'

class PredictionResponse(BaseModel):
    symbol: str
    model_id: str
    prediction: float
    probability_up: float
    probability_down: float
    confidence: float
    latency_ms: float
    timestamp: str

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    start_time = time.time()
    
    # Подготовка features
    feature_vector = np.array(list(request.features.values())).reshape(1, -1)
    feature_vector_scaled = scaler.transform(feature_vector)
    
    # Inference
    model = models.get(request.model_id, models['lgbm_1h'])
    proba = model.predict_proba(feature_vector_scaled)[0]
    
    latency = (time.time() - start_time) * 1000
    
    return PredictionResponse(
        symbol=request.symbol,
        model_id=request.model_id,
        prediction=float(proba[1] - proba[0]),  # [-1, 1]
        probability_up=float(proba[1]),
        probability_down=float(proba[0]),
        confidence=float(max(proba)),
        latency_ms=latency,
        timestamp=datetime.utcnow().isoformat()
    )

Batching для оптимизации throughput

При большом количестве одновременных запросов — batching снижает overhead:

class BatchedPredictor:
    def __init__(self, model, batch_size=32, max_wait_ms=10):
        self.model = model
        self.batch_size = batch_size
        self.max_wait_ms = max_wait_ms
        self.queue = asyncio.Queue()
    
    async def predict(self, features):
        future = asyncio.Future()
        await self.queue.put((features, future))
        return await future
    
    async def batch_worker(self):
        while True:
            batch = []
            try:
                # Ждём первый запрос
                item = await asyncio.wait_for(
                    self.queue.get(), timeout=self.max_wait_ms/1000
                )
                batch.append(item)
                
                # Собираем оставшиеся запросы (без ожидания)
                while len(batch) < self.batch_size and not self.queue.empty():
                    batch.append(self.queue.get_nowait())
                
            except asyncio.TimeoutError:
                continue
            
            if batch:
                # Inference для всего батча
                features_batch = np.array([b[0] for b in batch])
                predictions = self.model.predict_proba(features_batch)
                
                # Возвращаем результаты
                for i, (_, future) in enumerate(batch):
                    future.set_result(predictions[i])

Model registry и версионирование

import mlflow
from mlflow.tracking import MlflowClient

class ModelRegistry:
    def __init__(self, tracking_uri):
        mlflow.set_tracking_uri(tracking_uri)
        self.client = MlflowClient()
    
    def load_production_model(self, model_name):
        """Загружаем последнюю Production модель"""
        model_version = self.client.get_latest_versions(
            model_name, stages=['Production']
        )[0]
        
        model = mlflow.sklearn.load_model(
            f"models:/{model_name}/{model_version.version}"
        )
        return model, model_version
    
    def promote_to_production(self, model_name, version, metrics):
        """Переводим модель в Production если метрики достаточно хороши"""
        if metrics['test_accuracy'] > 0.54 and metrics['sharpe'] > 1.2:
            self.client.transition_model_version_stage(
                model_name, version, 'Production'
            )
            return True
        return False

Мониторинг quality predictionов

Realtime отслеживание качества предсказаний:

class PredictionMonitor:
    def __init__(self, window=1000):
        self.predictions = deque(maxlen=window)
        self.actuals = deque(maxlen=window)
    
    def log_prediction(self, pred_id, prediction, confidence, timestamp):
        self.predictions.append({
            'id': pred_id, 'prediction': prediction,
            'confidence': confidence, 'timestamp': timestamp
        })
    
    def log_actual(self, pred_id, actual_return):
        """Логируем фактический результат после горизонта предсказания"""
        self.actuals.append({'id': pred_id, 'actual': actual_return})
    
    def calculate_metrics(self):
        # Матчим predictions с actuals
        matched = ...  # join по id
        
        directional_accuracy = np.mean(
            np.sign(matched['prediction']) == np.sign(matched['actual'])
        )
        
        # Распределение confidence: высокая уверенность → более точные предсказания?
        high_conf_accuracy = np.mean(
            np.sign(matched[matched['confidence'] > 0.7]['prediction']) == 
            np.sign(matched[matched['confidence'] > 0.7]['actual'])
        )
        
        return {
            'directional_accuracy': directional_accuracy,
            'high_confidence_accuracy': high_conf_accuracy,
            'recent_accuracy_trend': ...  # rolling accuracy
        }

Latency мон��торинг

P50, P95, P99 latency отслеживаются через Prometheus + Grafana. SLA: P95 < 50ms для single prediction, P99 < 100ms.

Разрабатываем production-ready ML serving систему: FastAPI inference server, batching для throughput оптимизации, MLflow model registry, realtime monitoring качества предсказаний и автоматический rollback при деградации.