Development of Data Lake for Blockchain Data
A blockchain is a public database in which you can read everything yet conveniently query almost nothing. JSON-RPC nodes answer "what happened in block X," but not "show me all Uniswap V3 swaps over the last 30 days by addresses with volume above $10k." Analytics, risk systems, and trading strategies need a data lake: a layer where raw on-chain data is transformed into structured, indexed, quickly queryable tables.
Data Sources and the Scale Problem
Ethereum mainnet today holds about 20 million blocks, ~2 billion transactions, and terabytes of event logs; the full history in Parquet format takes ~3–4 TB. Each new block (every ~12 seconds) adds hundreds of transactions and thousands of event logs. BSC, Polygon, Arbitrum, and Base each have their own history and growth rate.
Three classes of data with different characteristics:
Blocks & transactions — structured, with a predictable schema. The main complexity is reorgs: the chain can temporarily fork, and the last N blocks can be rewritten, so the pipeline must be able to roll back already-written data.
Event logs — the most valuable data for analytics: Transfer, Swap, Liquidation, Mint — every EVM event. The problem is ABI decoding: without the contract's ABI a log is just bytes with topics, so you need an ABI registry or a service such as the Etherscan API or Sourcify to obtain them.
Traces (internal transactions) — calls between contracts that do not create a top-level transaction. Without traces a significant part of DeFi activity is invisible: flash loans inside a single tx, recursive liquidations, MEV bundles. Fetching traces via debug_traceTransaction or trace_block is a heavy operation available only on archive nodes.
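As a sketch of what such a trace request looks like on the wire — callTracer is the geth-style tracer that returns a nested call tree instead of raw opcode steps; only the request construction is shown here, the node endpoint is up to you:

```python
import json

def trace_tx_request(tx_hash: str, request_id: int = 1) -> str:
    """Build the JSON-RPC body for a geth-style debug_traceTransaction
    call; callTracer yields the contract-to-contract call tree."""
    payload = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "debug_traceTransaction",
        "params": [tx_hash, {"tracer": "callTracer"}],
    }
    return json.dumps(payload)

# The body would be POSTed to an archive node with urllib/aiohttp.
body = trace_tx_request("0x" + "ab" * 32)
```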
Data Lake Architecture
Ingestion Layer
Two approaches to data retrieval:
Node-based ingestion — a direct connection to an Ethereum/EVM node over WebSocket: subscribe to new blocks and backfill history via batched eth_getLogs calls. Requires an archive node (or paid archive access via Alchemy/Infura). Backfilling millions of blocks requires parallel processing:
import asyncio

async def fetch_block_range(start: int, end: int, batch_size: int = 100):
    # fetch_blocks(chunk) is the worker that pulls each block range from
    # the node (eth_getBlockByNumber / eth_getLogs); one batched RPC
    # request per chunk keeps round-trips low
    tasks = []
    for i in range(start, end, batch_size):
        chunk = range(i, min(i + batch_size, end))
        tasks.append(fetch_blocks(chunk))
    return await asyncio.gather(*tasks)
Third-party data providers — Goldsky, Envio, Substreams (StreamingFast/Pinax). They have already indexed the history; you pay for an API or a managed pipeline. Faster to start, but it means vendor lock-in and gets more expensive at scale.
Storage: Format and Engine Selection
For raw blockchain data, columnar storage is optimal:
- Apache Parquet on S3/GCS — the de facto standard for analytical data. zstd compression reduces event-log volume by 5–10x. Partition by block date + block number.
- Apache Iceberg on top of Parquet — adds ACID transactions, schema evolution, and time travel. Critical for reorg handling: you can roll back specific partitions.
- ClickHouse — an OLAP database for hot queries: hundreds of millions of event-log rows scanned in seconds. It stores data in columnar form on its own disk, so there is no need to read Parquet files through Spark for every query.
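A minimal sketch of the partition-key convention mentioned above; the exact layout `date=YYYY-MM-DD/blocks=<range>` and the 1000-block file size are assumptions, not a fixed standard:

```python
from datetime import datetime, timezone

def partition_path(block_timestamp: int, block_number: int,
                   blocks_per_file: int = 1000) -> str:
    """Map a block to its Hive-style partition directory: a date
    partition from the block timestamp plus a fixed-size block-number
    range, so backfill output files stay bounded in size."""
    day = datetime.fromtimestamp(block_timestamp, tz=timezone.utc).date()
    lo = (block_number // blocks_per_file) * blocks_per_file
    hi = lo + blocks_per_file - 1
    return f"date={day.isoformat()}/blocks={lo}-{hi}"
```

Keeping both keys in the path lets readers prune by time range and lets the ingester locate the exact files to rewrite on a reorg.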
Typical two-layer architecture:
Raw layer (S3 + Parquet/Iceberg)
↓ ETL (dbt / Spark / Flink)
Serving layer (ClickHouse / BigQuery)
↓ Query API
Analytics / Trading systems / Dashboards
ABI Decoding and Enrichment
Raw event logs contain topics (hashes of event signatures) and data (ABI-encoded parameters). To decode them you need an ABI registry:
from eth_abi import decode
from web3 import Web3

# keccak256("Transfer(address,address,uint256)"), normalized to an
# unprefixed lowercase hex string for the comparison below
TRANSFER_TOPIC = Web3.keccak(text="Transfer(address,address,uint256)").hex().removeprefix("0x")

def decode_transfer(log: dict) -> dict | None:
    # assumes a raw JSON-RPC log: topics/data are "0x…" hex strings
    if log["topics"][0].removeprefix("0x").lower() != TRANSFER_TOPIC:
        return None
    # indexed address params are right-aligned in 32-byte topics
    from_addr = "0x" + log["topics"][1][-40:]
    to_addr = "0x" + log["topics"][2][-40:]
    # the non-indexed uint256 amount lives in the data field
    amount = decode(["uint256"], bytes.fromhex(log["data"][2:]))[0]
    return {"from": from_addr, "to": to_addr, "amount": amount}
For bulk decoding you need an ABI registry — a table mapping contract_address → ABI. Sources: the Etherscan API (?module=contract&action=getabi), Sourcify, and 4byte.directory for function signatures. Unknown contracts are stored as raw bytes and enriched later as their ABIs become available.
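Fetching an ABI from Etherscan then reduces to one GET request per verified contract. A sketch showing only the request construction — the API key is a placeholder, and the response JSON carries the ABI as a string in its "result" field:

```python
from urllib.parse import urlencode

ETHERSCAN_API = "https://api.etherscan.io/api"

def abi_request_url(contract_address: str, api_key: str) -> str:
    """Build the Etherscan getabi URL for a verified contract."""
    query = urlencode({
        "module": "contract",
        "action": "getabi",
        "address": contract_address,
        "apikey": api_key,
    })
    return f"{ETHERSCAN_API}?{query}"

url = abi_request_url("0x" + "00" * 20, "YOUR_KEY")  # placeholder key
```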
Token metadata enrichment: ERC-20 transfers need decimals, symbol, and the price at the time of the event. Prices come from your own Uniswap V3 TWAP records or from external sources (CoinGecko historical API, DeFiLlama).
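Normalizing raw amounts by decimals is the step where float rounding bites on 18-decimal tokens; a small sketch using Decimal to keep full precision:

```python
from decimal import Decimal

def normalize_amount(raw_amount: int, decimals: int) -> Decimal:
    """Convert a raw on-chain integer amount into token units;
    Decimal avoids the rounding a float division would introduce."""
    return Decimal(raw_amount) / Decimal(10) ** decimals

value = normalize_amount(1_500_000_000_000_000_000, 18)  # 1.5 tokens
```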
Reorg Handling
A reorg is the main headache of any blockchain data pipeline. When a node reports a new block, that block is not yet final. Ethereum under Proof-of-Stake has probabilistic finality over the next several blocks and full finality after ~12.8 minutes (2 epochs). L2 networks have an even more complex model.
Standard approach:
- Record blocks with a confirmation lag: wait for N confirmations before writing to the "final" layer. For Ethereum, 32–64 blocks give practical finality.
- Keep a "staging" layer for the last M blocks — data is written immediately but marked as pending.
- Detect reorgs from the node feed: subscribe to newHeads over WebSocket and compare each block's parentHash with the hash previously recorded at that height. On a reorg, remove the affected blocks from staging and reapply the new chain.
In Iceberg this is solved elegantly via time travel and merge operations; in ClickHouse, via ReplacingMergeTree with a version column.
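The parentHash check can be sketched as a small in-memory tracker; field names follow raw JSON-RPC block headers, and the actual rollback of staged data is left to the caller:

```python
class ReorgDetector:
    """Track recent block hashes and flag a new head whose parentHash
    contradicts a block we already recorded."""

    def __init__(self, keep: int = 64) -> None:
        self.keep = keep                      # depth beyond which reorgs are ignored
        self.hashes: dict[int, str] = {}      # block number -> block hash

    def on_new_head(self, number: int, block_hash: str, parent_hash: str) -> bool:
        """Return True on a reorg; the caller should then roll back
        staging from `number - 1` and refetch the replacement blocks."""
        known_parent = self.hashes.get(number - 1)
        reorg = known_parent is not None and known_parent != parent_hash
        if reorg:
            # everything from the contradicted parent upward is stale
            for n in [k for k in self.hashes if k >= number - 1]:
                del self.hashes[n]
        self.hashes[number] = block_hash
        # bound memory: only the last `keep` blocks can realistically reorg
        for n in [k for k in self.hashes if k < number - self.keep]:
            del self.hashes[n]
        return reorg

detector = ReorgDetector()
detector.on_new_head(100, "0xaa", "0x99")            # normal extension
detector.on_new_head(101, "0xbb", "0xaa")            # normal extension
reorged = detector.on_new_head(101, "0xcc", "0xff")  # competing block at 101
```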
Data Schema and Key Tables
-- Normalized events with enrichment
CREATE TABLE decoded_events (
block_number UInt64,
block_timestamp DateTime,
tx_hash FixedString(66),
log_index UInt32,
contract FixedString(42),
event_name LowCardinality(String),
chain_id UInt32,
-- decoded fields as JSON or separate tables per event type
params String, -- JSON
INDEX idx_contract (contract) TYPE bloom_filter GRANULARITY 4,
INDEX idx_event (event_name) TYPE set(100) GRANULARITY 4
) ENGINE = ReplacingMergeTree(block_number)
PARTITION BY toYYYYMM(block_timestamp)
ORDER BY (chain_id, contract, block_number, log_index);
Separate tables serve high-frequency event types: erc20_transfers, uniswap_v3_swaps, aave_liquidations. Monthly partitioning makes time-range queries efficient.
Infrastructure and Operational Aspects
Compute: Apache Spark on EMR/Dataproc for batch ETL of historical data. Apache Flink or Kafka Streams for real-time processing of new blocks.
Orchestration: Apache Airflow or Dagster to manage DAG pipelines. Separate DAG for each network and data type.
Monitoring: the lag between the latest on-chain block and the latest recorded block is the key metric; alert if it exceeds 5 minutes. Also track the percentage of successfully decoded logs (ABI registry coverage) and the size of the staging layer (an indicator of accumulated reorgs).
Archive cost: the full Ethereum history in ClickHouse runs on the order of $500–1500/month on cloud infrastructure, depending on provider and configuration.
Development Stages
| Phase | Content | Duration |
|---|---|---|
| Design | Determining scope of networks/events, data schema, stack selection | 1–2 weeks |
| Core ingestion | WebSocket listener, backfill worker, reorg handler | 3–4 weeks |
| ABI registry | ABI accumulation, event decoding, enrichment | 2–3 weeks |
| Storage layer | Parquet/Iceberg on S3, ClickHouse setup, ETL pipeline | 3–4 weeks |
| Serving API | REST/GraphQL API on top of ClickHouse, rate limiting | 2–3 weeks |
| Monitoring & ops | Airflow DAGs, alerts, schema documentation | 1–2 weeks |