Development of Multi-Exchange Balance Aggregation System
A balance aggregation system gives a trader a unified view of all their assets distributed across exchanges, wallets, and accounts. This is the foundation for portfolio accounting, capital allocation optimization, and tax reporting.
Data Structure
from dataclasses import dataclass
from decimal import Decimal
from datetime import datetime
@dataclass
class AssetBalance:
asset: str
exchange: str
account_type: str # spot, margin, futures, earn
available: Decimal
locked: Decimal # frozen in orders
total: Decimal
@dataclass
class PortfolioSnapshot:
timestamp: datetime
balances: list[AssetBalance]
total_usd: Decimal
by_exchange: dict[str, Decimal] # USD equivalent by exchanges
by_asset: dict[str, Decimal] # USD equivalent by assets
Parallel Balance Collection
import asyncio
from decimal import Decimal
class BalanceAggregator:
def __init__(self, exchange_clients: dict, price_feed):
self.exchanges = exchange_clients
self.price_feed = price_feed
async def get_portfolio_snapshot(self) -> PortfolioSnapshot:
# Get balances from all exchanges in parallel
balance_tasks = {
name: asyncio.create_task(self._get_exchange_balances(name, client))
for name, client in self.exchanges.items()
}
results = await asyncio.gather(
*balance_tasks.values(),
return_exceptions=True
)
all_balances = []
for exchange_name, result in zip(balance_tasks.keys(), results):
if isinstance(result, Exception):
# Log error, continue with others
logger.error(f"Failed to get balances from {exchange_name}: {result}")
continue
all_balances.extend(result)
# Calculate USD equivalents
prices = await self.price_feed.get_prices(
{b.asset for b in all_balances} - {'USDT', 'USDC', 'BUSD'}
)
return self._build_snapshot(all_balances, prices)
def _build_snapshot(self, balances: list[AssetBalance], prices: dict) -> PortfolioSnapshot:
total_usd = Decimal(0)
by_exchange: dict[str, Decimal] = {}
by_asset: dict[str, Decimal] = {}
for b in balances:
price = prices.get(b.asset, Decimal(1)) # stablecoins = 1
usd_value = b.total * price
total_usd += usd_value
by_exchange[b.exchange] = by_exchange.get(b.exchange, Decimal(0)) + usd_value
by_asset[b.asset] = by_asset.get(b.asset, Decimal(0)) + usd_value
return PortfolioSnapshot(
timestamp=datetime.utcnow(),
balances=balances,
total_usd=total_usd,
by_exchange=by_exchange,
by_asset=by_asset,
)
Caching and Updates
Full balance refresh is done no more than once per minute (REST API limits). For real-time updates, use WebSocket User Data Stream (Binance) or equivalents:
async def subscribe_balance_updates(self, exchange: str):
"""Subscribe to balance updates via User Data Stream"""
listen_key = await self.get_listen_key(exchange)
async with websockets.connect(f"wss://stream.binance.com:9443/ws/{listen_key}") as ws:
async for message in ws:
data = json.loads(message)
if data.get("e") == "outboundAccountPosition":
# Update balance cache
for balance in data["B"]:
await self.update_cached_balance(
exchange=exchange,
asset=balance["a"],
free=Decimal(balance["f"]),
locked=Decimal(balance["l"]),
)
Capital Allocation
With an aggregated view, you can build allocation optimization tools:
def suggest_rebalancing(portfolio: PortfolioSnapshot, target_allocation: dict[str, float]) -> list[str]:
"""Suggests transfers to approach target allocation"""
suggestions = []
for asset, target_pct in target_allocation.items():
current_usd = portfolio.by_asset.get(asset, Decimal(0))
current_pct = float(current_usd / portfolio.total_usd * 100)
diff = target_pct - current_pct
if abs(diff) > 2: # rebalancing threshold
action = "BUY" if diff > 0 else "SELL"
usd_amount = abs(diff / 100) * float(portfolio.total_usd)
suggestions.append(f"{action} ${usd_amount:.0f} of {asset} (current {current_pct:.1f}% → target {target_pct:.1f}%)")
return suggestions
Historical portfolio snapshots are saved to TimescaleDB — this allows building a portfolio growth graph over time, calculating returns by period, generating reports for tax purposes.







