Development of Blockchain Event Indexing Systems
The problem emerges when you need to answer "show me all transactions of this user in the last 30 days" or "what's the total trading volume through our DEX?". Direct node calls via eth_getLogs over block ranges aren't architecture, they're a workaround. On an archive node, scanning from block 0 takes minutes; on a public RPC, such a query likely times out or fails with a "block range too large" error.
A proper indexing system is a pipeline: node → event listener → parser → database → API. Each component with independent scaling and exactly-once or at-least-once semantics.
Event pipeline architecture
Event acquisition layer
Three approaches to fetching events from a node, each with its reliability cost:
Polling via eth_getLogs — simplest and most reliable when implemented correctly. A worker periodically requests events for a block range, saves lastIndexedBlock, and resumes from the last processed block on restart.
// Max getLogs window per poll and confirmations to wait before a block is considered safe
const BATCH_SIZE = 2000
const CONFIRMATIONS = 12

interface IndexerState {
  lastIndexedBlock: number
  lastIndexedBlockHash: string // for reorg detection
}

async function pollEvents(
  provider: ethers.JsonRpcProvider,
  contracts: ContractConfig[],
  state: IndexerState
): Promise<ProcessedEvent[]> {
  const currentBlock = await provider.getBlockNumber()
  const toBlock = Math.min(state.lastIndexedBlock + BATCH_SIZE, currentBlock - CONFIRMATIONS)
  if (toBlock <= state.lastIndexedBlock) return []

  // Verify chain continuity
  const lastBlock = await provider.getBlock(state.lastIndexedBlock)
  if (lastBlock?.hash !== state.lastIndexedBlockHash) {
    throw new ReorgDetectedError(state.lastIndexedBlock)
  }

  const logs = await provider.getLogs({
    address: contracts.map(c => c.address),
    topics: [contracts.flatMap(c => c.topics)], // OR-filter on topic0
    fromBlock: state.lastIndexedBlock + 1,
    toBlock,
  })
  return logs.map(parseLog)
}
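The batch-window arithmetic and the restart/reorg handling around pollEvents can be sketched as a pure range helper plus a driving loop. This is a sketch under assumptions: nextRange, runIndexer, and the poll/rollback callbacks are hypothetical names for this indexer, not library APIs.

```typescript
class ReorgDetectedError extends Error {
  constructor(public blockNumber: number) {
    super(`reorg detected at block ${blockNumber}`)
  }
}

// Pure helper: the next [fromBlock, toBlock] window to index,
// or null when no safe (confirmed) blocks are available yet.
function nextRange(
  lastIndexed: number,
  head: number,
  batchSize: number,
  confirmations: number
): { fromBlock: number; toBlock: number } | null {
  const toBlock = Math.min(lastIndexed + batchSize, head - confirmations)
  if (toBlock <= lastIndexed) return null
  return { fromBlock: lastIndexed + 1, toBlock }
}

// Driving loop sketch: on a detected reorg, roll back and resume polling.
async function runIndexer(
  poll: () => Promise<void>,
  rollback: (block: number) => Promise<void>
): Promise<never> {
  for (;;) {
    try {
      await poll()
    } catch (e) {
      if (e instanceof ReorgDetectedError) await rollback(e.blockNumber)
      else throw e
    }
    await new Promise(r => setTimeout(r, 5_000)) // poll interval
  }
}
```

Keeping the range computation pure makes the trickiest part (off-by-one errors around the confirmation window) trivially unit-testable.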
WebSocket subscriptions — low latency (new events arrive immediately), but WebSocket connections drop. You need automatic reconnect with backoff and a missed-block sync on reconnection:
async function subscribeWithFallback(provider: ethers.WebSocketProvider) {
  const filter = { address: CONTRACT, topics: [EVENT_TOPIC] }
  provider.on(filter, async (log) => {
    await processLog(log)
  })
  provider.websocket.on('close', async () => {
    console.log('WebSocket closed, syncing missed blocks...')
    await syncMissedBlocks(lastProcessedBlock)
    reconnect() // exponential backoff
  })
}
Firehose / StreamingFast — enterprise-grade. Binary stream of blocks with all contents, minimal latency, built-in reorg handling. Used as data source for The Graph protocol nodes. Significantly more complex to set up.
Handling reorganizations
Reorgs are the most common source of indexer bugs. On post-Merge (PoS) Ethereum, finality arrives after two epochs (~13 minutes). On PoW networks and younger EVM chains (BSC, Polygon), reorgs of 3-5 blocks are routine.
Strategy: index with a delay of N blocks (safe confirmations), store hash of each indexed block, on mismatch — rollback to last consistent state.
-- Indexer state table
CREATE TABLE indexer_blocks (
  block_number BIGINT PRIMARY KEY,
  block_hash   VARCHAR(66) NOT NULL,
  indexed_at   TIMESTAMPTZ DEFAULT NOW()
);

-- Event with block linkage for rollback capability
CREATE TABLE indexed_events (
  id            BIGSERIAL PRIMARY KEY,
  block_number  BIGINT NOT NULL REFERENCES indexer_blocks(block_number),
  log_index     INT NOT NULL,
  tx_hash       VARCHAR(66) NOT NULL,
  contract_addr VARCHAR(42) NOT NULL,
  event_name    VARCHAR(100) NOT NULL,
  decoded_data  JSONB NOT NULL,
  UNIQUE(tx_hash, log_index)
);
On reorg detection, run DELETE FROM indexed_events WHERE block_number >= reorg_block (the first block whose stored hash no longer matches the chain), then reindex from that point.
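A minimal sketch of that rollback against the two tables above, assuming a node-postgres style (sql, params) executor; rollbackPlan and rollbackToBlock are hypothetical names:

```typescript
// Ordered statements for rolling back to the first reorged block.
function rollbackPlan(reorgBlock: number): Array<{ sql: string; params: number[] }> {
  return [
    // Child rows first: indexed_events references indexer_blocks
    { sql: 'DELETE FROM indexed_events WHERE block_number >= $1', params: [reorgBlock] },
    { sql: 'DELETE FROM indexer_blocks WHERE block_number >= $1', params: [reorgBlock] },
  ]
}

// exec can be any (sql, params) => Promise executor, e.g. pg's client.query.
// Both deletes run in one transaction so a crash mid-rollback
// cannot leave events pointing at deleted blocks.
async function rollbackToBlock(
  exec: (sql: string, params?: number[]) => Promise<unknown>,
  reorgBlock: number
): Promise<void> {
  await exec('BEGIN')
  try {
    for (const { sql, params } of rollbackPlan(reorgBlock)) await exec(sql, params)
    await exec('COMMIT')
  } catch (e) {
    await exec('ROLLBACK')
    throw e
  }
}
```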
Event decoding
ABI-decoding events is trivial with ethers.js or viem, but there are nuances.
Indexed vs non-indexed parameters: indexed parameters land in topics, non-indexed in data. A non-anonymous event with 3 indexed parameters occupies 4 topics: the event signature hash plus one per indexed parameter. Indexed parameters of non-value types (strings, bytes, structs, dynamic arrays) cannot be decoded from topics: only their keccak256 hash is stored there, and the original data is lost.
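The topic accounting can be made concrete with a small helper (topicLayout is a hypothetical name for illustration, not part of viem or ethers):

```typescript
interface EventInput {
  name: string
  indexed: boolean
}

// For a non-anonymous event: topic0 is the event signature hash, followed by
// one topic per indexed parameter (max 3); every other parameter is
// ABI-encoded into the log's data field.
function topicLayout(inputs: EventInput[], anonymous = false) {
  const topics = inputs.filter(i => i.indexed).map(i => i.name)
  const data = inputs.filter(i => !i.indexed).map(i => i.name)
  return { topicCount: (anonymous ? 0 : 1) + topics.length, topics, data }
}
```

For ERC-20 Transfer(address indexed from, address indexed to, uint256 value), this yields 3 topics (signature, from, to) with value in data.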
import { decodeEventLog } from 'viem'

function parseSwapEvent(log: Log, abi: Abi): SwapEvent {
  const decoded = decodeEventLog({
    abi,
    eventName: 'Swap',
    data: log.data,
    topics: log.topics,
  })
  return {
    blockNumber: log.blockNumber,
    txHash: log.transactionHash,
    sender: decoded.args.sender,
    recipient: decoded.args.recipient,
    amount0: decoded.args.amount0,
    amount1: decoded.args.amount1,
    sqrtPriceX96: decoded.args.sqrtPriceX96,
    liquidity: decoded.args.liquidity,
    tick: decoded.args.tick,
  }
}
Anonymous events (without event selector in topic0) — rare, but found in old contracts. Decoding requires knowing data structure without ABI.
Proxy contracts: events emit from proxy address, but ABI is from implementation. Need to resolve implementation address via EIP-1967 storage slot: 0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc.
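A sketch of that resolution, assuming an ethers v6 provider (where the method is getStorage); slotToAddress and resolveImplementation are helper names defined here, not library calls:

```typescript
// EIP-1967 implementation slot: keccak256("eip1967.proxy.implementation") - 1
const EIP1967_IMPL_SLOT =
  '0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc'

// The slot holds a 32-byte word; the implementation address is its last 20 bytes
function slotToAddress(slotValue: string): string {
  return '0x' + slotValue.slice(-40)
}

async function resolveImplementation(
  provider: { getStorage(addr: string, slot: string): Promise<string> },
  proxy: string
): Promise<string> {
  const raw = await provider.getStorage(proxy, EIP1967_IMPL_SLOT)
  return slotToAddress(raw)
}
```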
Database and performance
PostgreSQL is the de-facto standard for storing events. Key schema decisions:
Partitioning by block_number: for high-volume contracts (Uniswap V3, major ERC-20s), tables quickly grow to hundreds of millions of rows. Range partitioning lets the planner prune to the relevant partitions instead of sequentially scanning the whole table:
CREATE TABLE swap_events (
  block_number BIGINT NOT NULL
  -- ...other fields
) PARTITION BY RANGE (block_number);

CREATE TABLE swap_events_0_5m   PARTITION OF swap_events FOR VALUES FROM (0) TO (5000000);
CREATE TABLE swap_events_5m_10m PARTITION OF swap_events FOR VALUES FROM (5000000) TO (10000000);
-- etc.
JSONB for decoded_data: storing decoded data in JSONB allows adding new event types without schema migrations. GIN indexes on frequently queried fields inside JSONB. For critical fields — extract to separate typed columns.
TimescaleDB — PostgreSQL extension providing automatic time-based partitioning (hypertables), compression of old data, and continuous aggregates that keep OHLCV/metrics views fresh via built-in refresh policies instead of hand-rolled aggregation jobs:
-- Continuous aggregate: volume per hour per pool
CREATE MATERIALIZED VIEW pool_hourly_volume
WITH (timescaledb.continuous) AS
SELECT
  time_bucket('1 hour', event_timestamp) AS hour,
  pool_address,
  SUM(amount_usd) AS volume_usd,
  COUNT(*) AS tx_count
FROM swap_events
GROUP BY 1, 2;

SELECT add_continuous_aggregate_policy('pool_hourly_volume',
  start_offset => INTERVAL '3 hours',
  end_offset => INTERVAL '1 hour',
  schedule_interval => INTERVAL '1 hour'
);
Monitoring and alerts
Indexers must expose metrics: indexer_lag_blocks (gap between head and last indexed block), events_per_second, reorg_count. Alert if indexer_lag_blocks > 50 — indexer is lagging or stuck.
Deploy via Docker Compose / Kubernetes with health check endpoint returning 503 if lag exceeds threshold. This lets load balancers exclude unhealthy instances.
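A minimal health endpoint sketch using Node's built-in http module. The 50-block threshold mirrors the alert above; healthCode, createHealthServer, and the getLagBlocks hook into indexer state are hypothetical names:

```typescript
import http from 'node:http'

// 200 while the indexer keeps up, 503 once lag exceeds the threshold
function healthCode(lagBlocks: number, threshold = 50): number {
  return lagBlocks > threshold ? 503 : 200
}

function createHealthServer(getLagBlocks: () => number): http.Server {
  return http.createServer((req, res) => {
    if (req.url === '/healthz') {
      const lag = getLagBlocks()
      res.writeHead(healthCode(lag), { 'Content-Type': 'application/json' })
      res.end(JSON.stringify({ lagBlocks: lag }))
    } else {
      res.writeHead(404).end()
    }
  })
}
```

Returning 503 (rather than crashing) keeps the process alive for debugging while the load balancer routes traffic away.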
Typical stack: Go or Rust for worker (performance), PostgreSQL / TimescaleDB for storage, Redis for state and queues, Grafana + Prometheus for monitoring.