Розробка системи об'єднання балансів на кількох біржах
Система агрегації балансів дає трейдеру єдиний view на всі його активи розподілені за біржами, гаманцями та акаунтами. Це фундамент для портфельного обліку, оптимізації розміщення капіталу та податкової звітності.
Структура даних
from dataclasses import dataclass
from decimal import Decimal
from datetime import datetime
@dataclass
class AssetBalance:
asset: str
exchange: str
account_type: str # spot, margin, futures, earn
available: Decimal
locked: Decimal # заморожено в ордерах
total: Decimal
@dataclass
class PortfolioSnapshot:
timestamp: datetime
balances: list[AssetBalance]
total_usd: Decimal
by_exchange: dict[str, Decimal] # еквівалент у доларах США за біржами
by_asset: dict[str, Decimal] # еквівалент у доларах США за активами
Паралельний збір балансів
import asyncio
from decimal import Decimal
class BalanceAggregator:
def __init__(self, exchange_clients: dict, price_feed):
self.exchanges = exchange_clients
self.price_feed = price_feed
async def get_portfolio_snapshot(self) -> PortfolioSnapshot:
# Паралельно отримуємо балансу з усіх бірж
balance_tasks = {
name: asyncio.create_task(self._get_exchange_balances(name, client))
for name, client in self.exchanges.items()
}
results = await asyncio.gather(
*balance_tasks.values(),
return_exceptions=True
)
all_balances = []
for exchange_name, result in zip(balance_tasks.keys(), results):
if isinstance(result, Exception):
# Логуємо помилку, продовжуємо з іншими
logger.error(f"Failed to get balances from {exchange_name}: {result}")
continue
all_balances.extend(result)
# Рассчитуємо еквіваленти у доларах США
prices = await self.price_feed.get_prices(
{b.asset for b in all_balances} - {'USDT', 'USDC', 'BUSD'}
)
return self._build_snapshot(all_balances, prices)
def _build_snapshot(self, balances: list[AssetBalance], prices: dict) -> PortfolioSnapshot:
total_usd = Decimal(0)
by_exchange: dict[str, Decimal] = {}
by_asset: dict[str, Decimal] = {}
for b in balances:
price = prices.get(b.asset, Decimal(1)) # стейблкоіни = 1
usd_value = b.total * price
total_usd += usd_value
by_exchange[b.exchange] = by_exchange.get(b.exchange, Decimal(0)) + usd_value
by_asset[b.asset] = by_asset.get(b.asset, Decimal(0)) + usd_value
return PortfolioSnapshot(
timestamp=datetime.utcnow(),
balances=balances,
total_usd=total_usd,
by_exchange=by_exchange,
by_asset=by_asset,
)
Кешування та оновлення
Повне оновлення балансів робиться не частіше 1 разу на хвилину (REST API limits). Для оновлень у реальному часі використовуємо WebSocket User Data Stream (Binance) або аналоги:
async def subscribe_balance_updates(self, exchange: str):
"""Підписка на оновлення балансу через User Data Stream"""
listen_key = await self.get_listen_key(exchange)
async with websockets.connect(f"wss://stream.binance.com:9443/ws/{listen_key}") as ws:
async for message in ws:
data = json.loads(message)
if data.get("e") == "outboundAccountPosition":
# Оновлюємо кэш балансу
for balance in data["B"]:
await self.update_cached_balance(
exchange=exchange,
asset=balance["a"],
free=Decimal(balance["f"]),
locked=Decimal(balance["l"]),
)
Аллокація капіталу
Маючи агрегований view, можна будувати інструменти оптимізації розміщення:
def suggest_rebalancing(portfolio: PortfolioSnapshot, target_allocation: dict[str, float]) -> list[str]:
"""Пропонує переводи для наближення до цільової аллокації"""
suggestions = []
for asset, target_pct in target_allocation.items():
current_usd = portfolio.by_asset.get(asset, Decimal(0))
current_pct = float(current_usd / portfolio.total_usd * 100)
diff = target_pct - current_pct
if abs(diff) > 2: # поріг ребалансування
action = "BUY" if diff > 0 else "SELL"
usd_amount = abs(diff / 100) * float(portfolio.total_usd)
suggestions.append(f"{action} ${usd_amount:.0f} of {asset} (current {current_pct:.1f}% → target {target_pct:.1f}%)")
return suggestions
Історичні снимки портфеля зберігаються в TimescaleDB — це дозволяє будувати графік зростання портфеля у часі, рассчитувати дохідність за періодами, генерувати звіти для податкових цілей.







