Multi-exchange balance aggregation system

We design and develop full-cycle blockchain solutions: from smart contract architecture to launching DeFi protocols, NFT marketplaces and crypto exchanges. Security audits, tokenomics, integration with existing infrastructure.
Showing 1 of 1 servicesAll 1306 services
Multi-exchange balance aggregation system
Medium
~1-2 weeks
FAQ
Blockchain Development Services
Blockchain Development Stages
Latest works
  • image_website-b2b-advance_0.png
    B2B ADVANCE company website development
    1218
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Website development for BELFINGROUP
    853
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1047
  • image_logo-advance_0.png
    B2B Advance company logo design
    561
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    823

Development of Multi-Exchange Balance Aggregation System

A balance aggregation system gives a trader a unified view of all their assets distributed across exchanges, wallets, and accounts. This is the foundation for portfolio accounting, capital allocation optimization, and tax reporting.

Data Structure

from dataclasses import dataclass
from decimal import Decimal
from datetime import datetime

@dataclass
class AssetBalance:
    asset: str
    exchange: str
    account_type: str  # spot, margin, futures, earn
    available: Decimal
    locked: Decimal    # frozen in orders
    total: Decimal

@dataclass
class PortfolioSnapshot:
    timestamp: datetime
    balances: list[AssetBalance]
    total_usd: Decimal
    by_exchange: dict[str, Decimal]  # USD equivalent by exchanges
    by_asset: dict[str, Decimal]     # USD equivalent by assets

Parallel Balance Collection

import asyncio
from decimal import Decimal

class BalanceAggregator:
    def __init__(self, exchange_clients: dict, price_feed):
        self.exchanges = exchange_clients
        self.price_feed = price_feed

    async def get_portfolio_snapshot(self) -> PortfolioSnapshot:
        # Get balances from all exchanges in parallel
        balance_tasks = {
            name: asyncio.create_task(self._get_exchange_balances(name, client))
            for name, client in self.exchanges.items()
        }

        results = await asyncio.gather(
            *balance_tasks.values(),
            return_exceptions=True
        )

        all_balances = []
        for exchange_name, result in zip(balance_tasks.keys(), results):
            if isinstance(result, Exception):
                # Log error, continue with others
                logger.error(f"Failed to get balances from {exchange_name}: {result}")
                continue
            all_balances.extend(result)

        # Calculate USD equivalents
        prices = await self.price_feed.get_prices(
            {b.asset for b in all_balances} - {'USDT', 'USDC', 'BUSD'}
        )

        return self._build_snapshot(all_balances, prices)

    def _build_snapshot(self, balances: list[AssetBalance], prices: dict) -> PortfolioSnapshot:
        total_usd = Decimal(0)
        by_exchange: dict[str, Decimal] = {}
        by_asset: dict[str, Decimal] = {}

        for b in balances:
            price = prices.get(b.asset, Decimal(1))  # stablecoins = 1
            usd_value = b.total * price

            total_usd += usd_value
            by_exchange[b.exchange] = by_exchange.get(b.exchange, Decimal(0)) + usd_value
            by_asset[b.asset] = by_asset.get(b.asset, Decimal(0)) + usd_value

        return PortfolioSnapshot(
            timestamp=datetime.utcnow(),
            balances=balances,
            total_usd=total_usd,
            by_exchange=by_exchange,
            by_asset=by_asset,
        )

Caching and Updates

Full balance refresh is done no more than once per minute (REST API limits). For real-time updates, use WebSocket User Data Stream (Binance) or equivalents:

async def subscribe_balance_updates(self, exchange: str):
    """Subscribe to balance updates via User Data Stream"""
    listen_key = await self.get_listen_key(exchange)

    async with websockets.connect(f"wss://stream.binance.com:9443/ws/{listen_key}") as ws:
        async for message in ws:
            data = json.loads(message)

            if data.get("e") == "outboundAccountPosition":
                # Update balance cache
                for balance in data["B"]:
                    await self.update_cached_balance(
                        exchange=exchange,
                        asset=balance["a"],
                        free=Decimal(balance["f"]),
                        locked=Decimal(balance["l"]),
                    )

Capital Allocation

With an aggregated view, you can build allocation optimization tools:

def suggest_rebalancing(portfolio: PortfolioSnapshot, target_allocation: dict[str, float]) -> list[str]:
    """Suggests transfers to approach target allocation"""
    suggestions = []

    for asset, target_pct in target_allocation.items():
        current_usd = portfolio.by_asset.get(asset, Decimal(0))
        current_pct = float(current_usd / portfolio.total_usd * 100)
        diff = target_pct - current_pct

        if abs(diff) > 2:  # rebalancing threshold
            action = "BUY" if diff > 0 else "SELL"
            usd_amount = abs(diff / 100) * float(portfolio.total_usd)
            suggestions.append(f"{action} ${usd_amount:.0f} of {asset} (current {current_pct:.1f}% → target {target_pct:.1f}%)")

    return suggestions

Historical portfolio snapshots are saved to TimescaleDB — this allows building a portfolio growth graph over time, calculating returns by period, generating reports for tax purposes.