Разработка системы автоматического переобучения моделей
ML модели для трейдинга деградируют со временем: рыночные режимы меняются, отношения между переменными сдвигаются. Система автоматического переобучения детектирует деградацию и запускает новое обучение без ручного вмешательства.
Триггеры переобучения
Performance-based trigger: директional accuracy модели упала ниже threshold за rolling window:
class RetrainingTrigger:
def __init__(self, performance_threshold=0.52, window_days=14,
min_predictions=100):
self.threshold = performance_threshold
self.window = window_days
self.min_predictions = min_predictions
def should_retrain(self, recent_predictions, recent_actuals):
if len(recent_predictions) < self.min_predictions:
return False, 'insufficient_data'
accuracy = np.mean(
np.sign(recent_predictions) == np.sign(recent_actuals)
)
if accuracy < self.threshold:
return True, f'accuracy_{accuracy:.3f}_below_{self.threshold}'
return False, 'performance_ok'
def check_data_drift(self, train_features, current_features):
"""Population Stability Index (PSI) для feature drift"""
psi_values = {}
for col in train_features.columns:
# Делим на 10 бинов по обучающим данным
bins = np.percentile(train_features[col].dropna(),
np.linspace(0, 100, 11))
bins[0] -= 1e-8
train_counts = np.histogram(train_features[col], bins=bins)[0]
current_counts = np.histogram(current_features[col], bins=bins)[0]
# PSI
train_pct = train_counts / train_counts.sum()
current_pct = current_counts / current_counts.sum()
# Avoid log(0)
train_pct = np.clip(train_pct, 1e-8, None)
current_pct = np.clip(current_pct, 1e-8, None)
psi = np.sum((current_pct - train_pct) * np.log(current_pct / train_pct))
psi_values[col] = psi
# PSI > 0.2 = значительный drift
max_psi = max(psi_values.values())
n_drifted = sum(1 for v in psi_values.values() if v > 0.2)
return {
'max_psi': max_psi,
'n_drifted_features': n_drifted,
'should_retrain': max_psi > 0.25 or n_drifted > 3,
'psi_by_feature': psi_values
}
def check_scheduled(self, last_training_date, retrain_frequency_days=7):
"""Плановое переобучение по расписанию"""
days_since_training = (datetime.utcnow() - last_training_date).days
return days_since_training >= retrain_frequency_days
Автоматический training pipeline
import mlflow
from prefect import flow, task
@task
def fetch_training_data(symbol, lookback_days=365):
"""Загружаем данные для переобучения"""
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=lookback_days)
# Загружаем из ClickHouse/PostgreSQL
return load_ohlcv_data(symbol, start_date, end_date)
@task
def prepare_features(raw_data):
"""Feature engineering"""
from feature_pipeline import FeatureEngineer
engineer = FeatureEngineer()
return engineer.create_all_features(raw_data)
@task
def train_and_evaluate(features_df, target_col, model_config):
"""Обучение модели с walk-forward validation"""
from training import WalkForwardTrainer
trainer = WalkForwardTrainer(
n_splits=5,
test_size=60, # 60 дней тестовой выборки
gap=24 # gap между train и test (часы)
)
with mlflow.start_run():
model, metrics = trainer.fit_evaluate(features_df, target_col, model_config)
# Логируем метрики в MLflow
mlflow.log_metrics(metrics)
mlflow.log_params(model_config)
mlflow.sklearn.log_model(model, 'model')
run_id = mlflow.active_run().info.run_id
return model, metrics, run_id
@task
def validate_and_promote(model, metrics, run_id, min_metrics):
"""Проверяем качество и решаем о деплое"""
passes_validation = (
metrics.get('directional_accuracy', 0) >= min_metrics['accuracy'] and
metrics.get('sharpe_ratio', 0) >= min_metrics['sharpe'] and
metrics.get('max_drawdown', 1) <= min_metrics['max_drawdown']
)
if passes_validation:
# Регистрируем как новую Production версию
client = mlflow.tracking.MlflowClient()
model_version = client.create_model_version(
name='crypto_predictor',
source=f'runs:/{run_id}/model',
run_id=run_id
)
client.transition_model_version_stage(
'crypto_predictor', model_version.version, 'Production'
)
return True, model_version.version
return False, None
@flow(name="model_retraining_pipeline")
def retrain_model_pipeline(symbol, model_config, min_metrics):
raw_data = fetch_training_data(symbol)
features_df = prepare_features(raw_data)
model, metrics, run_id = train_and_evaluate(features_df, 'target', model_config)
promoted, version = validate_and_promote(model, metrics, run_id, min_metrics)
return {'promoted': promoted, 'version': version, 'metrics': metrics}
Zero-downtime модель обновление
При успешном обучении новой модели нужно заменить старую без остановки торговли:
class ModelHotSwapper:
def __init__(self):
self.current_model = None
self.model_version = None
self._lock = asyncio.Lock()
async def swap_model(self, new_model, new_version):
"""Thread-safe замена модели"""
async with self._lock:
old_model = self.current_model
old_version = self.model_version
self.current_model = new_model
self.model_version = new_version
# Логируем смену модели
logger.info(f"Model swapped: {old_version} -> {new_version}")
# Старую модель можно выгрузить из памяти
del old_model
async def predict(self, features):
async with self._lock:
return self.current_model.predict(features)
Расписание переобучения
Prefect или Airflow для оркестрации:
Ежедневно в 00:00 UTC:
1. Проверка performance trigger
2. Проверка PSI drift trigger
3. Проверка schedule trigger (если > 7 дней с последнего обучения)
→ Если хотя бы один trigger сработал → запуск retraining pipeline
→ При успешном обучении → hot swap модели
→ Уведомление в Telegram: "Модель обновлена: v15 → v16, accuracy 0.567"
Разрабатываем систему авто-переобучения с PSI drift detection, performance monitoring trigger, Prefect/Airflow оркестрацией, MLflow tracking и zero-downtime hot swap.







