Разработка AI-агента для автоматической торговли
AI Trading Agent — это не просто алгоритм с правилами, это автономная система, которая воспринимает рыночное состояние через множество сенсоров (технические данные, sentiment, on-chain, макро), принимает торговые решения с помощью ML/RL моделей и адаптируется к изменяющимся условиям без ручного вмешательства.
Архитектура AI Trading Agent
┌─────────────────────────────────────────────────────┐
│ AI Trading Agent │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌──────────┐ │
│ │ Perception │ │ Decision │ │ Execution │ │
│ │ Layer │──► │ Engine │──►│ Layer │ │
│ └─────────────┘ └─────────────┘ └──────────┘ │
│ │ │ │ │
│ ┌─────▼──────┐ ┌───────▼──────┐ ┌─────▼───┐ │
│ │ Market │ │ Signal │ │ Risk │ │
│ │ State │ │ Aggregator │ │ Guard │ │
│ └────────────┘ └──────────────┘ └─────────┘ │
└─────────────────────────────────────────────────────┘
Perception Layer — восприятие рынка
from dataclasses import dataclass
from typing import Dict, Optional
import numpy as np
@dataclass
class MarketState:
"""Полное состояние рынка в момент t"""
timestamp: datetime
symbol: str
# Price features
current_price: float
price_features: Dict[str, float] # returns, MA distances, BB position...
# Volatility
realized_vol_24h: float
predicted_vol_4h: float
# Trend
trend_direction: int # 1=up, 0=neutral, -1=down
trend_strength: float # 0-1
# Momentum signals
momentum_score: float # -1 to 1
# Sentiment
sentiment_short: float # Twitter/Telegram based
sentiment_medium: float # Reddit/News based
# On-chain (for BTC/ETH)
exchange_flow: Optional[float]
# Market regime
regime: str # 'trending_up', 'trending_down', 'ranging', 'volatile'
# Position context
current_position: float # -1=full short, 0=flat, 1=full long
unrealized_pnl: float
time_in_position: int # bars
class MarketStateBuilder:
def __init__(self, feature_pipeline, sentiment_analyzer, regime_detector):
self.features = feature_pipeline
self.sentiment = sentiment_analyzer
self.regime = regime_detector
def build(self, symbol, raw_data):
state = MarketState(
timestamp=datetime.utcnow(),
symbol=symbol,
current_price=raw_data['close'].iloc[-1],
price_features=self.features.get_features(raw_data),
realized_vol_24h=self._calc_realized_vol(raw_data, 24),
predicted_vol_4h=self._predict_volatility(raw_data),
trend_direction=self._get_trend_direction(raw_data),
trend_strength=self._get_trend_strength(raw_data),
momentum_score=self._calc_momentum(raw_data),
sentiment_short=self.sentiment.get_score(symbol, 'short'),
sentiment_medium=self.sentiment.get_score(symbol, 'medium'),
exchange_flow=self._get_exchange_flow(symbol),
regime=self.regime.detect(raw_data),
current_position=0, # будет заполнено из portfolio
unrealized_pnl=0,
time_in_position=0
)
return state
Decision Engine — принятие решений
import torch
from typing import Tuple
class AIDecisionEngine:
"""
Иерархическая система принятия решений:
1. Market Regime Classifier → определяет режим рынка
2. Strategy Selector → выбирает активную стратегию
3. Signal Generator → генерирует торговый сигнал
4. Position Sizer → рассчитывает размер позиции
"""
def __init__(self, models_config):
# Ensemble из нескольких моделей
self.models = {
'regime_classifier': load_model(models_config['regime']),
'lgbm_signal': load_model(models_config['lgbm']),
'lstm_signal': load_model(models_config['lstm']),
'rl_agent': load_model(models_config['rl']),
'vol_forecaster': load_model(models_config['vol'])
}
# Веса ансамбля по режимам рынка
self.regime_weights = {
'trending_up': {'lgbm': 0.3, 'lstm': 0.3, 'rl': 0.4},
'trending_down': {'lgbm': 0.3, 'lstm': 0.3, 'rl': 0.4},
'ranging': {'lgbm': 0.5, 'lstm': 0.3, 'rl': 0.2},
'volatile': {'lgbm': 0.6, 'lstm': 0.4, 'rl': 0.0} # RL не торгует при экстремальной волатильности
}
def decide(self, state: MarketState, portfolio_state) -> 'TradingDecision':
# 1. Классифицируем режим рынка
regime = self.classify_regime(state)
# 2. Получаем сигналы от всех моделей
signals = self._aggregate_signals(state, regime)
# 3. Фильтрация по минимальному confidence
if signals['confidence'] < 0.55:
return TradingDecision(action='hold', reason='low_confidence')
# 4. Расчёт размера позиции
target_position = self._calculate_target_position(signals, state, portfolio_state)
# 5. Определяем действие (long/short/close/hold)
action = self._determine_action(target_position, state.current_position)
return TradingDecision(
action=action,
target_position=target_position,
confidence=signals['confidence'],
regime=regime,
signal_breakdown=signals,
stop_loss=self._calculate_stop(state),
take_profit=self._calculate_target(state)
)
def _aggregate_signals(self, state, regime):
weights = self.regime_weights.get(regime, {})
signals_raw = {}
if 'lgbm' in weights and weights['lgbm'] > 0:
signals_raw['lgbm'] = self.models['lgbm_signal'].predict_proba(
state_to_features(state)
)[0]
if 'lstm' in weights and weights['lstm'] > 0:
signals_raw['lstm'] = self.models['lstm_signal'].predict(
state_to_sequence(state)
)[0]
if 'rl' in weights and weights['rl'] > 0:
rl_action, _ = self.models['rl_agent'].predict(
state_to_obs(state), deterministic=True
)
signals_raw['rl'] = rl_action_to_signal(rl_action)
# Взвешенная агрегация
final_signal = sum(
signals_raw[m] * weights.get(m, 0)
for m in signals_raw
) / max(sum(weights.get(m, 0) for m in signals_raw), 1e-8)
confidence = 0.5 + abs(final_signal) * 0.5
return {
'signal': final_signal,
'confidence': confidence,
'direction': 1 if final_signal > 0 else -1,
'breakdown': signals_raw
}
Risk Guard — защитный слой
class RiskGuard:
"""Последний рубеж перед исполнением. Блокирует опасные действия."""
def __init__(self, risk_config):
self.config = risk_config
self.portfolio_monitor = PortfolioRiskMonitor(risk_config)
def validate_decision(self, decision, portfolio_state, market_state) -> Tuple[bool, str]:
# 1. Portfolio-level checks
if portfolio_state.current_drawdown > self.config['max_drawdown']:
return False, f"Max drawdown exceeded: {portfolio_state.current_drawdown:.2%}"
if portfolio_state.daily_loss > self.config['max_daily_loss']:
return False, "Daily loss limit reached"
# 2. Position-level checks
if abs(decision.target_position) > self.config['max_single_position']:
decision.target_position = np.sign(decision.target_position) * self.config['max_single_position']
# 3. Market condition checks
if market_state.realized_vol_24h > self.config['max_vol_to_trade']:
return False, f"Market too volatile: {market_state.realized_vol_24h:.2%} annualized"
# 4. Liquidity check
if market_state.spread > self.config['max_spread_to_trade']:
return False, f"Spread too wide: {market_state.spread:.4%}"
return True, "OK"
Execution Layer
class ExecutionLayer:
def __init__(self, exchanges, execution_config):
self.exchanges = exchanges
self.config = execution_config
async def execute_decision(self, decision, current_position):
if decision.action == 'hold':
return
size_diff = decision.target_position - current_position
if abs(size_diff) < 0.01: # незначительное изменение
return
# Выбираем метод исполнения
if abs(size_diff) > self.config['large_order_threshold']:
# Крупный ордер — используем TWAP
await self.execute_twap(size_diff, duration_minutes=30)
else:
# Обычный ордер
await self.execute_limit_order(size_diff, decision)
# Выставляем защитные ордера
if decision.stop_loss:
await self.place_stop_loss(decision.stop_loss)
if decision.take_profit:
await self.place_take_profit(decision.take_profit)
Continuous Learning
AI агент должен обновляться с течением времени:
class ContinuousLearner:
def __init__(self, agent, retraining_config):
self.agent = agent
self.config = retraining_config
self.experience_buffer = []
def log_experience(self, state, decision, reward):
self.experience_buffer.append({
'state': state, 'decision': decision, 'reward': reward,
'timestamp': datetime.utcnow()
})
async def maybe_retrain(self):
"""Переобучаем если накопилось достаточно опыта и производительность упала"""
if len(self.experience_buffer) < self.config['min_experiences']:
return
recent_rewards = [e['reward'] for e in self.experience_buffer[-100:]]
avg_reward = np.mean(recent_rewards)
if avg_reward < self.config['performance_threshold']:
await self._trigger_retraining()
self.experience_buffer.clear()
Monitoring и observability
Realtime dashboard (Grafana):
- Decision timeline: когда и почему агент открыл/закрыл позицию
- Signal breakdown: вклад каждой модели в текущее решение
- P&L attribution: какая модель заработала/потеряла
- Regime detection history: как агент классифицировал рынок
- Risk metrics: drawdown, exposure, VaR
Telegram алерты: открытие/закрытие позиций с объяснением, срабатывание risk limits, обнаружение аномального рыночного состояния.
Backtesting comparison: сравнение production результатов с backtest — насколько реальность соответствует ожиданиям.
Технический стек
Python (PyTorch, stable-baselines3, lightgbm, transformers), FastAPI для internal API, PostgreSQL для трейдинговой истории, ClickHouse для market data, Redis для state management, Kafka для event streaming, Kubernetes для production deployment, MLflow для model registry, Grafana + Prometheus для мониторинга.
Разрабатываем полноценный AI Trading Agent: perception layer с multi-source data, ensemble decision engine (LGBM + LSTM + RL), risk guard, execution layer с smart order routing, continuous learning и полный observability stack.







