Розробка системи моніторингу деградації ML-моделі

Проєктуємо та розробляємо блокчейн-рішення повного циклу: від архітектури смарт-контрактів до запуску DeFi-протоколів, NFT-маркетплейсів та криптобірж. Аудит безпеки, токеноміка, інтеграція з наявною інфраструктурою.
Показано 1 з 1Усі 1306 послуг
Розробка системи моніторингу деградації ML-моделі
Середній
~5 днів
Часті запитання

Напрямки блокчейн-розробки

Етапи блокчейн-розробки

Останні роботи

  • image_website-b2b-advance_0.webp
    Розробка сайту компанії B2B ADVANCE
    1288
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    902
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1122
  • image_logo-advance_0.webp
    Розробка логотипу компанії B2B Advance
    589
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    859

Розробка системи моніторингу деградації ML-моделей

ML моделі для торговлі деградують з кількох причин: ринкові режими змінюються, паттерни, які використовувала модель, арбітражуються, змінюються кореляції між активами. Моніторинг деградації дозволяє виявити проблему до того, як вона завдасть фінансової шкоди.

Типи деградації

Concept drift: змінилися відносини між ознаками та цільовою змінною. Модель передбачає правильно для "старого" ринку.

Data drift (feature drift): вхідні дані почали розподілятися інакше, ніж під час навчання. Модель отримує нетипові входи.

Label drift: сам розподіл цільової змінної змінився (наприклад, ринок змінився з трендового на бічний).

Performance degradation: якість прогнозування знизилася навіть без явного дрейфу.

Метрики моніторингу

import numpy as np
import pandas as pd
from scipy import stats
from collections import deque

class ModelDegradationMonitor:
    def __init__(self, model_id, baseline_metrics, alert_thresholds):
        self.model_id = model_id
        self.baseline = baseline_metrics
        self.thresholds = alert_thresholds
        
        # Кільцюючі вікна для метрик
        self.predictions_buffer = deque(maxlen=500)
        self.actuals_buffer = deque(maxlen=500)
        self.features_buffer = deque(maxlen=1000)
    
    def log_prediction(self, features, prediction, confidence):
        self.predictions_buffer.append({
            'prediction': prediction,
            'confidence': confidence,
            'timestamp': datetime.utcnow()
        })
        self.features_buffer.append(features)
    
    def log_actual(self, actual_return):
        self.actuals_buffer.append(actual_return)
    
    def calculate_performance_metrics(self, window=100):
        if len(self.predictions_buffer) < window:
            return None
        
        recent_preds = [p['prediction'] for p in list(self.predictions_buffer)[-window:]]
        recent_actuals = list(self.actuals_buffer)[-window:]
        
        if len(recent_actuals) < window:
            return None
        
        # Точність спрямованості
        dir_accuracy = np.mean(
            np.sign(recent_preds) == np.sign(recent_actuals)
        )
        
        # Калібрування впевненості: висока впевненість повинна дати високу точність
        high_conf_preds = [
            (p['prediction'], a) 
            for p, a in zip(list(self.predictions_buffer)[-window:], recent_actuals)
            if p['confidence'] > 0.65
        ]
        
        if high_conf_preds:
            high_conf_accuracy = np.mean([
                np.sign(pred) == np.sign(actual) 
                for pred, actual in high_conf_preds
            ])
        else:
            high_conf_accuracy = None
        
        return {
            'directional_accuracy': dir_accuracy,
            'high_conf_accuracy': high_conf_accuracy,
            'degradation': dir_accuracy - self.baseline.get('directional_accuracy', 0.55),
            'n_predictions': window
        }
    
    def calculate_psi(self, train_distribution, current_values, n_bins=10):
        """Індекс стабільності популяції для дрейфу ознак"""
        bins = np.percentile(train_distribution, np.linspace(0, 100, n_bins + 1))
        bins[0] -= 1e-8
        
        train_pct = np.ones(n_bins) / n_bins  # рівномірно по квантилям
        current_hist = np.histogram(current_values, bins=bins)[0]
        current_pct = np.clip(current_hist / current_hist.sum(), 1e-8, None)
        
        psi = np.sum((current_pct - train_pct) * np.log(current_pct / train_pct))
        return psi
    
    def detect_concept_drift(self, method='ks_test', alpha=0.05):
        """КС-тест для порівняння розподілів нещодавніх та історичних прогнозів"""
        if len(self.predictions_buffer) < 200:
            return False, 1.0
        
        preds = [p['prediction'] for p in self.predictions_buffer]
        old_preds = preds[:100]
        new_preds = preds[-100:]
        
        if method == 'ks_test':
            ks_stat, p_value = stats.ks_2samp(old_preds, new_preds)
            return p_value < alpha, p_value
        
        return False, 1.0
    
    def check_all_alerts(self):
        alerts = []
        
        # 1. Деградація продуктивності
        perf = self.calculate_performance_metrics()
        if perf and perf['degradation'] < -self.thresholds.get('max_accuracy_drop', 0.05):
            alerts.append({
                'type': 'performance_degradation',
                'severity': 'HIGH',
                'detail': f"Accuracy dropped {perf['degradation']:.3f} from baseline"
            })
        
        # 2. Дрейф ознак
        recent_features = list(self.features_buffer)[-100:]
        if recent_features and self.baseline.get('feature_distributions'):
            for feature_name in self.baseline['feature_distributions']:
                current_vals = [f.get(feature_name) for f in recent_features if f.get(feature_name) is not None]
                if current_vals:
                    psi = self.calculate_psi(
                        self.baseline['feature_distributions'][feature_name],
                        current_vals
                    )
                    if psi > 0.25:
                        alerts.append({
                            'type': 'feature_drift',
                            'severity': 'MEDIUM',
                            'feature': feature_name,
                            'psi': psi
                        })
        
        # 3. Concept drift
        drifted, p_val = self.detect_concept_drift()
        if drifted:
            alerts.append({
                'type': 'concept_drift',
                'severity': 'MEDIUM',
                'p_value': p_val
            })
        
        return alerts

Grafana Dashboard

Ключові панелі:

  • Rolling Accuracy Chart: 100-точковий кільцюючий середній спрямованості точності. Базова лінія для порівняння.

  • PSI Heatmap: матриця PSI за ознаками над часом. Колір: зелений (< 0.1), жовтий (0.1–0.2), червоний (> 0.2).

  • Confidence Distribution: гістограма оцінок впевненості. Зсув до 0.5 (невпевненість) є ознакою деградації.

  • Alert Timeline: стрічка сповіщень з важкістю та типом.

Система сповіщень

ALERT_CHANNELS = {
    'HIGH': ['telegram', 'email', 'pagerduty'],
    'MEDIUM': ['telegram', 'email'],
    'LOW': ['telegram']
}

async def send_degradation_alert(alert, model_id):
    message = f"""
⚠️ ML Model Degradation Alert
Model: {model_id}
Type: {alert['type']}
Severity: {alert['severity']}
Detail: {alert.get('detail', '')}
Time: {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}

Action recommended: Check model retraining system
"""
    channels = ALERT_CHANNELS.get(alert['severity'], ['telegram'])
    for channel in channels:
        await send_notification(channel, message)

Розробка системи моніторингу деградації з виявленням дрейфу PSI ознак, відстеженням продуктивності, тестуванням дрейфу концепції, Grafana dashboard та багаторівневою системою сповіщень.