Розробка системи автоматичного перенавчання моделей

Проєктуємо та розробляємо блокчейн-рішення повного циклу: від архітектури смарт-контрактів до запуску DeFi-протоколів, NFT-маркетплейсів та криптобірж. Аудит безпеки, токеноміка, інтеграція з наявною інфраструктурою.
Показано 1 з 1Усі 1306 послуг
Розробка системи автоматичного перенавчання моделей
Складний
~1-2 тижні
Часті запитання

Напрямки блокчейн-розробки

Етапи блокчейн-розробки

Останні роботи

  • image_website-b2b-advance_0.webp
    Розробка сайту компанії B2B ADVANCE
    1288
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    902
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1122
  • image_logo-advance_0.webp
    Розробка логотипу компанії B2B Advance
    589
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    859

Розробка системи автоматичного перенавчання моделей

ML моделі для трейдингу деградують з часом: ринкові режими змінюються, відносини між змінними зміщуються. Система автоматичного перенавчання виявляє деградацію та запускає нове навчання без ручного втручання.

Тригери перенавчання

Тригер на основі продуктивності: точність спрямованості моделі падає нижче порогу на кільцюючому вікні:

class RetrainingTrigger:
    def __init__(self, performance_threshold=0.52, window_days=14, 
                 min_predictions=100):
        self.threshold = performance_threshold
        self.window = window_days
        self.min_predictions = min_predictions
    
    def should_retrain(self, recent_predictions, recent_actuals):
        if len(recent_predictions) < self.min_predictions:
            return False, 'insufficient_data'
        
        accuracy = np.mean(
            np.sign(recent_predictions) == np.sign(recent_actuals)
        )
        
        if accuracy < self.threshold:
            return True, f'accuracy_{accuracy:.3f}_below_{self.threshold}'
        
        return False, 'performance_ok'
    
    def check_data_drift(self, train_features, current_features):
        """Індекс стабільності популяції (PSI) для дрейфу ознак"""
        psi_values = {}
        
        for col in train_features.columns:
            # Розділяємо на 10 бінів на основі даних навчання
            bins = np.percentile(train_features[col].dropna(), 
                                np.linspace(0, 100, 11))
            bins[0] -= 1e-8
            
            train_counts = np.histogram(train_features[col], bins=bins)[0]
            current_counts = np.histogram(current_features[col], bins=bins)[0]
            
            # PSI
            train_pct = train_counts / train_counts.sum()
            current_pct = current_counts / current_counts.sum()
            
            # Уникаємо log(0)
            train_pct = np.clip(train_pct, 1e-8, None)
            current_pct = np.clip(current_pct, 1e-8, None)
            
            psi = np.sum((current_pct - train_pct) * np.log(current_pct / train_pct))
            psi_values[col] = psi
        
        # PSI > 0.2 = істотний дрейф
        max_psi = max(psi_values.values())
        n_drifted = sum(1 for v in psi_values.values() if v > 0.2)
        
        return {
            'max_psi': max_psi,
            'n_drifted_features': n_drifted,
            'should_retrain': max_psi > 0.25 or n_drifted > 3,
            'psi_by_feature': psi_values
        }
    
    def check_scheduled(self, last_training_date, retrain_frequency_days=7):
        """Запланований перенавчання за розкладом"""
        days_since_training = (datetime.utcnow() - last_training_date).days
        return days_since_training >= retrain_frequency_days

Автоматичний конвеєр навчання

import mlflow
from prefect import flow, task

@task
def fetch_training_data(symbol, lookback_days=365):
    """Завантажити дані для перенавчання"""
    end_date = datetime.utcnow()
    start_date = end_date - timedelta(days=lookback_days)
    # Завантажити з ClickHouse/PostgreSQL
    return load_ohlcv_data(symbol, start_date, end_date)

@task
def prepare_features(raw_data):
    """Feature engineering"""
    from feature_pipeline import FeatureEngineer
    engineer = FeatureEngineer()
    return engineer.create_all_features(raw_data)

@task
def train_and_evaluate(features_df, target_col, model_config):
    """Навчити модель з walk-forward валідацією"""
    from training import WalkForwardTrainer
    
    trainer = WalkForwardTrainer(
        n_splits=5,
        test_size=60,  # 60 днів тестового набору
        gap=24  # зазор між навчанням та тестом (години)
    )
    
    with mlflow.start_run():
        model, metrics = trainer.fit_evaluate(features_df, target_col, model_config)
        
        # Логування метрик до MLflow
        mlflow.log_metrics(metrics)
        mlflow.log_params(model_config)
        mlflow.sklearn.log_model(model, 'model')
        
        run_id = mlflow.active_run().info.run_id
    
    return model, metrics, run_id

@task
def validate_and_promote(model, metrics, run_id, min_metrics):
    """Перевірити якість та прийняти рішення про розгортання"""
    passes_validation = (
        metrics.get('directional_accuracy', 0) >= min_metrics['accuracy'] and
        metrics.get('sharpe_ratio', 0) >= min_metrics['sharpe'] and
        metrics.get('max_drawdown', 1) <= min_metrics['max_drawdown']
    )
    
    if passes_validation:
        # Зареєструвати як нову версію Production
        client = mlflow.tracking.MlflowClient()
        model_version = client.create_model_version(
            name='crypto_predictor',
            source=f'runs:/{run_id}/model',
            run_id=run_id
        )
        client.transition_model_version_stage(
            'crypto_predictor', model_version.version, 'Production'
        )
        return True, model_version.version
    
    return False, None

@flow(name="model_retraining_pipeline")
def retrain_model_pipeline(symbol, model_config, min_metrics):
    raw_data = fetch_training_data(symbol)
    features_df = prepare_features(raw_data)
    model, metrics, run_id = train_and_evaluate(features_df, 'target', model_config)
    promoted, version = validate_and_promote(model, metrics, run_id, min_metrics)
    
    return {'promoted': promoted, 'version': version, 'metrics': metrics}

Оновлення моделі без простоїв

При успішному навчанні нової моделі замініть стару без зупинки торговлі:

class ModelHotSwapper:
    def __init__(self):
        self.current_model = None
        self.model_version = None
        self._lock = asyncio.Lock()
    
    async def swap_model(self, new_model, new_version):
        """Потокобезпечна заміна моделі"""
        async with self._lock:
            old_model = self.current_model
            old_version = self.model_version
            
            self.current_model = new_model
            self.model_version = new_version
            
            # Логування заміни моделі
            logger.info(f"Model swapped: {old_version} -> {new_version}")
            
            # Стару модель можна виписати з пам'яті
            del old_model
    
    async def predict(self, features):
        async with self._lock:
            return self.current_model.predict(features)

Розклад перенавчання

Prefect або Airflow для оркестрування:

Щодня в 00:00 UTC:
    1. Перевіра тригера продуктивності
    2. Перевіра тригера PSI дрейфу
    3. Перевіра тригера розкладу (якщо > 7 днів з останнього навчання)
    → Якщо будь-який тригер спрацює → запустити конвеєр перенавчання
    → При успішному навчанні → гарячу заміну моделі
    → Повідомлення в Telegram: "Модель оновлена: v15 → v16, точність 0.567"

Розробка системи автоматичного перенавчання з виявленням дрейфу PSI, моніторингом продуктивності, оркеструванням Prefect/Airflow, відстеженням MLflow та гарячою заміною без простоїв.