Tokenomics Data Parsing
The task sounds simple, but the devil is in the details: tokenomics data is not in one place. One project's vesting schedule is in a smart contract, another's in a PDF whitepaper, and a third's is only in a Discord post. Circulating supply often diverges between CoinGecko, the project's own reporting, and what's actually on-chain. Building a reliable data source means building a normalization layer on top of heterogeneous sources.
What is included in "tokenomics" and where it lives
| Metric | Primary Source | Secondary Source |
|---|---|---|
| Total supply | ERC-20 totalSupply() on-chain | CoinGecko API |
| Circulating supply | Calculation (total - locked - burned) | CoinGecko (often inaccurate) |
| Holder distribution | On-chain (Transfer events) | Etherscan API |
| Vesting schedule | Vesting contract | Documentation |
| Unlock events | Vesting contract events | TokenUnlocks.app, Vestlab |
| Burn events | Transfer to 0x0...dead | On-chain |
| Inflation/emission | Contract (mint events) | Whitepaper |
On-chain data: direct calls
For ERC-20 tokens, basic metrics via RPC:
from web3 import Web3
from decimal import Decimal
# Minimal ERC-20 ABI: only the three read-only functions this module calls
# (totalSupply / decimals / balanceOf) — the full ABI is unnecessary here.
ERC20_ABI = [
{"name": "totalSupply", "type": "function", "inputs": [], "outputs": [{"type": "uint256"}]},
{"name": "decimals", "type": "function", "inputs": [], "outputs": [{"type": "uint8"}]},
{"name": "balanceOf", "inputs": [{"name": "account", "type": "address"}], "outputs": [{"type": "uint256"}], "type": "function"},
]
def get_token_supply_metrics(token_address: str, w3: Web3) -> dict:
    """Read supply metrics for an ERC-20 token directly from the chain.

    Fetches total supply, sums balances parked on well-known burn
    addresses, and derives an approximate circulating figure as
    total minus burned.

    Args:
        token_address: ERC-20 contract address (any casing).
        w3: connected Web3 instance.

    Returns:
        dict with "total_supply", "burned", "circulating_approx" (floats,
        already scaled by the token's decimals).
    """
    contract = w3.eth.contract(
        address=Web3.to_checksum_address(token_address), abi=ERC20_ABI
    )
    decimals = contract.functions.decimals().call()
    scale = Decimal(10) ** decimals  # raw uint256 -> human-readable units

    total = Decimal(contract.functions.totalSupply().call()) / scale

    # Tokens held by these sinks are treated as permanently burned.
    dead = (
        "0x000000000000000000000000000000000000dEaD",
        "0x0000000000000000000000000000000000000000",
    )
    burned_total = Decimal(0)
    for sink in dead:
        burned_total += Decimal(contract.functions.balanceOf(sink).call()) / scale

    return {
        "total_supply": float(total),
        "burned": float(burned_total),
        "circulating_approx": float(total - burned_total),
    }
Indexing Transfer events for holder distribution
Complete holder history — via Transfer event scanning:
def get_all_holders(token_address: str, w3: Web3, from_block: int = 0) -> dict[str, Decimal]:
    """Return {address: balance} for all current holders via Transfer replay.

    Scans every ERC-20 Transfer event from ``from_block`` up to and
    including the current head block, netting amounts per address.
    Mints/burns are handled implicitly: the zero address accumulates a
    negative balance and is dropped by the final positive-balance filter.

    Note: for tokens with years of history this issues thousands of RPC
    calls — prefer a subgraph or an indexer API when one exists.
    """
    TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
    balances: dict[str, Decimal] = {}
    decimals = get_decimals(token_address, w3)
    scale = Decimal(10) ** decimals  # hoisted: same divisor for every event
    # web3.py rejects non-checksummed addresses in filter params.
    address = Web3.to_checksum_address(token_address)
    current_block = w3.eth.block_number
    chunk_size = 2000  # many public RPCs cap eth_getLogs block ranges near this

    # current_block + 1: range() excludes its stop value, so without the +1
    # the head block is silently skipped whenever the span divides evenly
    # into chunks (e.g. from_block=0, head=2000 scanned only blocks 0-1999).
    for start in range(from_block, current_block + 1, chunk_size):
        end = min(start + chunk_size - 1, current_block)
        logs = w3.eth.get_logs({
            "address": address,
            "topics": [TRANSFER_TOPIC],
            "fromBlock": start,
            "toBlock": end,
        })
        for log in logs:
            topics = log["topics"]
            if len(topics) < 3:
                # Non-standard Transfer (unindexed params / ERC-20 lookalike) — skip.
                continue
            from_addr = "0x" + topics[1].hex()[-40:]
            to_addr = "0x" + topics[2].hex()[-40:]
            # log["data"] is HexBytes on web3 v6+, a hex string on older
            # versions; int(HexBytes, 16) raises TypeError, so normalize first.
            data = log["data"]
            raw = data.hex() if isinstance(data, (bytes, bytearray)) else str(data)
            raw = raw[2:] if raw.startswith("0x") else raw
            amount = (Decimal(int(raw, 16)) if raw else Decimal(0)) / scale
            balances[from_addr] = balances.get(from_addr, Decimal(0)) - amount
            balances[to_addr] = balances.get(to_addr, Decimal(0)) + amount

    # Drop zero/negative entries (the zero address and fully-exited holders).
    return {addr: bal for addr, bal in balances.items() if bal > 0}
For tokens with years of history this takes time and thousands of RPC requests. Better to use The Graph subgraph if it exists for the token, or Etherscan API with caching.
Vesting contract parsing: unlock schedule
Most serious projects deploy vesting contracts. Standard implementations — OpenZeppelin VestingWallet, Sablier, LlamaPay. Parsing schedule:
# OpenZeppelin VestingWallet ABI (simplified).
# Only the views needed to reconstruct the unlock schedule:
# start/duration define the linear vesting window, vestedAmount(token, ts)
# reports the cumulative vested total at a timestamp, released(token) the
# amount already withdrawn by the beneficiary.
VESTING_ABI = [
{"name": "beneficiary", "type": "function", "inputs": [], "outputs": [{"type": "address"}]},
{"name": "start", "type": "function", "inputs": [], "outputs": [{"type": "uint64"}]},
{"name": "duration", "type": "function", "inputs": [], "outputs": [{"type": "uint64"}]},
{"name": "vestedAmount", "inputs": [{"name": "token", "type": "address"}, {"name": "timestamp", "type": "uint64"}], "outputs": [{"type": "uint256"}], "type": "function"},
{"name": "released", "inputs": [{"name": "token", "type": "address"}], "outputs": [{"type": "uint256"}], "type": "function"},
]
def parse_vesting_contract(vesting_address: str, token_address: str, w3: Web3) -> dict:
    """Parse an OpenZeppelin VestingWallet into an unlock schedule.

    Args:
        vesting_address: VestingWallet contract address.
        token_address: address of the vested ERC-20 token.
        w3: connected Web3 instance.

    Returns:
        dict with beneficiary, start/end unix timestamps, amount already
        released, and a monthly schedule of cumulative vested totals.
    """
    contract = w3.eth.contract(
        address=Web3.to_checksum_address(vesting_address), abi=VESTING_ABI
    )
    # Checksum once: web3.py rejects raw lowercase addresses in call args.
    token = Web3.to_checksum_address(token_address)
    decimals = get_decimals(token_address, w3)
    scale = Decimal(10) ** decimals

    start = contract.functions.start().call()
    duration = contract.functions.duration().call()
    end = start + duration
    released = Decimal(contract.functions.released(token).call()) / scale

    # Sample the cumulative vested amount every ~30 days, ending exactly at
    # `end` (instead of overshooting past it) so the last point is the fully
    # vested total. One RPC call per sample point.
    step = 30 * 24 * 3600  # 30 days
    sample_points = list(range(start, end, step)) + [end]
    schedule = []
    for ts in sample_points:
        vested = Decimal(contract.functions.vestedAmount(token, ts).call()) / scale
        schedule.append({"timestamp": ts, "vested_total": float(vested)})

    return {
        "beneficiary": contract.functions.beneficiary().call(),
        "start": start,
        "end": end,
        "released": float(released),
        "schedule": schedule
    }
For Sablier (stream-based vesting) and LlamaPay — APIs are different, need to read stream parameters from their contracts.
CoinGecko API for aggregated data
For market cap, volume, price history — CoinGecko Pro API:
import httpx
from datetime import datetime
# Pro API host — requests authenticate via the x-cg-pro-api-key header.
COINGECKO_BASE = "https://pro-api.coingecko.com/api/v3"
async def get_token_market_data(coingecko_id: str) -> dict:
    """Fetch a price/market-cap/supply snapshot from CoinGecko Pro.

    Args:
        coingecko_id: CoinGecko coin id (e.g. "ethereum"), not a contract address.

    Returns:
        dict of market metrics; optional fields (FDV, supplies, 24h change)
        are None when CoinGecko has no data for them.

    Raises:
        httpx.HTTPStatusError: on non-2xx responses (rate limit, bad id, bad key).
    """
    async with httpx.AsyncClient(timeout=30.0) as client:
        resp = await client.get(
            f"{COINGECKO_BASE}/coins/{coingecko_id}",
            headers={"x-cg-pro-api-key": CG_API_KEY},
            params={"localization": "false", "tickers": "false", "community_data": "false"}
        )
        # Fail loudly on HTTP errors instead of a confusing KeyError below.
        resp.raise_for_status()
    mdata = resp.json()["market_data"]
    # fully_diluted_valuation comes back null/{} for tokens without a max supply.
    fdv = mdata.get("fully_diluted_valuation") or {}
    return {
        "price_usd": mdata["current_price"]["usd"],
        "market_cap_usd": mdata["market_cap"]["usd"],
        "fully_diluted_valuation": fdv.get("usd"),
        "total_supply": mdata.get("total_supply"),
        "circulating_supply": mdata.get("circulating_supply"),
        "max_supply": mdata.get("max_supply"),
        "volume_24h": mdata["total_volume"]["usd"],
        "price_change_24h_pct": mdata.get("price_change_percentage_24h"),
    }
Important: CoinGecko circulating supply is often inaccurate — projects report it themselves and not always correctly. For critical calculations — verify on-chain.
Normalization and storage
Data from different sources must be brought to a unified model:
-- Point-in-time supply/market snapshot per (token, chain, source).
-- The same token may get multiple rows per snapshot_time, one per source,
-- so on-chain vs CoinGecko vs computed figures can be compared later.
CREATE TABLE token_snapshots (
id BIGSERIAL PRIMARY KEY,
token_address TEXT NOT NULL,
chain_id INTEGER NOT NULL,  -- EVM chain id (1 = Ethereum mainnet, etc.)
snapshot_time TIMESTAMPTZ NOT NULL,
total_supply NUMERIC,       -- NUMERIC: exact, no float rounding on token amounts
circulating_supply NUMERIC,
burned_supply NUMERIC,
locked_supply NUMERIC,
price_usd NUMERIC,
market_cap_usd NUMERIC,
holder_count INTEGER,
source TEXT NOT NULL -- 'on-chain', 'coingecko', 'computed'
);
-- Future (and historical) vesting unlock events, flattened from vesting
-- contract schedules; one row per unlock tranche per beneficiary.
CREATE TABLE unlock_events (
id BIGSERIAL PRIMARY KEY,
token_address TEXT NOT NULL,
vesting_contract TEXT,  -- nullable: some unlocks are documented off-chain only
beneficiary TEXT,
unlock_time TIMESTAMPTZ NOT NULL,
amount NUMERIC NOT NULL,  -- token units (already scaled by decimals)
label TEXT -- 'team', 'investors', 'ecosystem'
);
Complete tokenomics monitoring system for 50–100 tokens with daily snapshots and alerts on major unlocks: 3–5 weeks development.







