On-chain ETL Pipeline Development

Three days after launching Uniswap v3 analytics, you discover that eth_getLogs with broad filters starts timing out, aggregates diverge because of missed reorganizations, and your PostgreSQL bloats with unpartitioned multi-gigabyte tables. On-chain ETL isn't just "read logs and write to a database". It's a system with consistency guarantees, reorg handling, data transformation, and backlog management. Let's build it right from the start.

Architecture: Three ETL layers

Classical ETL (Extract — Transform — Load) gains specifics in blockchain context: the data source is immutable but not final (reorgs), volumes are measured in hundreds of millions of events, and latency can range from seconds to hours depending on the task.

Extract: Ingestion from node

Choosing a data source determines everything else. Three levels with increasing complexity:

  • Logs/Events — what the contract explicitly emits. Cheap, fast, structured through ABI. Limitation: only what the developer decided to log.
  • Traces (internal transactions) — all calls within a transaction, including ETH transfers without events. Requires debug_traceTransaction or trace_block (Parity-style). Not all nodes support it; Erigon is the best choice for trace-heavy tasks.
  • State diffs — changes to storage slots per block. Maximum completeness, but enormous data volume and interpretation complexity without ABI.

For most DeFi tasks, logs + traces suffice. State diffs are needed for MEV analytics and monitoring contracts without events (e.g., legacy WETH).

Data fetching patterns:

# Range fetch with recursive bisection when the node rejects an oversized response
from web3 import AsyncWeb3
from web3.types import LogReceipt

async def fetch_logs_range(
    rpc: AsyncWeb3,
    from_block: int,
    to_block: int,
    addresses: list[str],
    topics: list[str],
) -> list[LogReceipt]:
    try:
        return await rpc.eth.get_logs({
            "fromBlock": from_block,
            "toBlock": to_block,
            "address": addresses,
            # nested list: the position-0 topic matches ANY of `topics`
            "topics": [topics],
        })
    except ValueError as e:
        # "Log response size exceeded" — split the range in half and retry
        if "exceeded" in str(e) and from_block < to_block:
            mid = (from_block + to_block) // 2
            left = await fetch_logs_range(rpc, from_block, mid, addresses, topics)
            right = await fetch_logs_range(rpc, mid + 1, to_block, addresses, topics)
            return left + right
        raise

This recursive bisection pattern is mandatory: public RPCs (and even Alchemy/Infura) cap responses by size, and without it your pipeline will crash on busy block ranges.

WebSocket subscriptions for real-time: eth_subscribe("newHeads") gives new blocks, eth_subscribe("logs", filter) streams events. Critical: always do catch-up via polling from the last processed block when reconnecting.
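The catch-up after a reconnect reduces to computing which block ranges to re-poll. A minimal stdlib sketch (the chunk size is an assumption; tune it to your RPC's eth_getLogs limits):

```python
# Hypothetical helper: after a WebSocket reconnect, compute the inclusive
# (from_block, to_block) chunks covering the gap between the last block we
# processed and the current head, capped at `max_span` blocks per request.
def catchup_ranges(last_processed: int, head: int, max_span: int = 2000) -> list[tuple[int, int]]:
    ranges = []
    start = last_processed + 1
    while start <= head:
        end = min(start + max_span - 1, head)
        ranges.append((start, end))
        start = end + 1
    return ranges
```

Each returned chunk is then fed to the same polling fetcher used for historical sync, so the real-time and backfill paths share one code path.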

Firehose (StreamingFast/Pinax) — binary protocol over gRPC, specifically for high-throughput indexing. Ingestion speed is an order of magnitude higher than JSON-RPC. Used in Substreams. If you need to process 15M+ Ethereum blocks — consider this first.

Transform: Transformation and enrichment

This is the most logic-heavy layer. Tasks:

ABI decoding. Raw log is topics[] (bytes32) and data (bytes). Decode through viem/ethers/web3.py. Subtlety with proxy contracts: take ABI from implementation, not proxy. EIP-1967 defines the standard slot 0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc for the implementation address.

import { decodeEventLog, parseAbiItem } from 'viem'

// For proxy: resolve implementation
const implSlot = '0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc'
const implAddr = await client.getStorageAt({ address: proxy, slot: implSlot })

const event = parseAbiItem('event Swap(address indexed sender, address indexed recipient, int256 amount0, int256 amount1, uint160 sqrtPriceX96, uint128 liquidity, int24 tick)')
const decoded = decodeEventLog({ abi: [event], data: log.data, topics: log.topics })

Data enrichment. Raw events rarely contain everything needed. Typical enrichments:

  • USD value: pull token price at block moment from Chainlink or custom price oracle
  • Token metadata: symbol(), decimals() — cache aggressively, they're immutable
  • Identity resolution: map addresses to known protocols (Uniswap Router, Aave Pool)
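The metadata caching can be as simple as an unbounded lru_cache, since symbol/decimals never change. A sketch where `fetch_erc20_metadata` is a stand-in for the real symbol()/decimals() eth_calls:

```python
from functools import lru_cache

CALLS = {"count": 0}

def fetch_erc20_metadata(address: str) -> tuple[str, int]:
    # Stand-in for two eth_call lookups: symbol() and decimals().
    CALLS["count"] += 1
    return ("USDC", 6)

@lru_cache(maxsize=None)
def token_metadata(address: str) -> tuple[str, int]:
    # symbol/decimals are immutable, so an unbounded cache is safe
    return fetch_erc20_metadata(address)
```

In a multi-process pipeline, back the cache with a shared store (e.g. a small DB table) so workers do not each re-fetch the same tokens.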

Normalization. Bring token amounts to the correct decimal scale. uint256 from contract → Python Decimal or PostgreSQL numeric — never float64, you'll lose precision on large values.
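A quick illustration of the precision point, stdlib only:

```python
from decimal import Decimal

def scale_amount(raw: int, decimals: int) -> Decimal:
    # Exact decimal scaling of an on-chain integer amount.
    return Decimal(raw) / Decimal(10) ** decimals

# float64 has a 53-bit mantissa, so adjacent wei-scale values collapse:
# float(10**18 + 1) == float(10**18), while Decimal keeps them distinct.
```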

State transformations — the hardest part: computing running totals, current balances, LP positions. They require strict event ordering within a block (sort by logIndex).
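A minimal sketch of the ordering requirement, assuming simple transfer-like events with hypothetical field names:

```python
# Deterministic state transitions: always sort by (block_number, log_index)
# before applying events, sketched here as a running balance computation.
def apply_transfers(events: list[dict]) -> dict[str, int]:
    balances: dict[str, int] = {}
    for ev in sorted(events, key=lambda e: (e["block_number"], e["log_index"])):
        balances[ev["to"]] = balances.get(ev["to"], 0) + ev["value"]
        balances[ev["from"]] = balances.get(ev["from"], 0) - ev["value"]
    return balances
```

For balances the final sums commute, but anything path-dependent (running totals, position open/close) silently corrupts without this sort.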

Load: Writing to storage

Batch writing — mandatory. Don't INSERT one record at a time. PostgreSQL COPY or bulk INSERT via executemany:

# 10-50x faster than individual INSERTs
await conn.executemany(
    """
    INSERT INTO swaps (block_number, tx_hash, log_index, pool, sender, amount0, amount1, price_usd, ts)
    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
    ON CONFLICT (tx_hash, log_index) DO NOTHING
    """,
    [(s.block, s.tx_hash, s.log_index, s.pool, s.sender, s.amount0, s.amount1, s.price, s.ts)
     for s in batch]
)

ON CONFLICT DO NOTHING — insurance against duplicates when retrying after error. Always add UNIQUE(tx_hash, log_index).

Handling Reorgs: Mandatory

Reorg on Ethereum is not an exception. On PoS Ethereum, 1-2 block reorgs happen several times a day. Ignoring them means having "dirty" data in your database.

Strategy: tombstone + replay. Each record contains block_hash. When receiving a new block, check if block_hash changed for already-processed block_number:

-- Detecting reorg
SELECT block_number, block_hash
FROM processed_blocks
WHERE block_number >= $1
  AND block_hash != ANY($2::bytea[])
ORDER BY block_number;

Upon detecting divergence — rollback and reprocess in one transaction:

BEGIN;
  -- Delete data from "orphaned" blocks
  DELETE FROM swaps WHERE block_hash = ANY($orphaned_hashes);
  DELETE FROM processed_blocks WHERE block_hash = ANY($orphaned_hashes);

  -- Write new canonical data
  INSERT INTO processed_blocks ...;
  INSERT INTO swaps ...;
COMMIT;

For financial data — wait for real finality before considering data authoritative: on PoS Ethereum that means the node's finalized block tag (two justified epochs, roughly 13 minutes), not a fixed confirmation count. For analytics — latest suffices with a "preliminary" flag.
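A tiny sketch of the preliminary/final flag using a confirmation-depth heuristic (the threshold is an assumption; for hard guarantees compare the row's block number against the node's finalized tag instead):

```python
# Depth-based settlement check used to flag rows as preliminary vs final.
# The 12-block default is a heuristic, not consensus finality.
def is_settled(block_number: int, head: int, depth: int = 12) -> bool:
    return head - block_number >= depth
```

Rows that fail the check keep their "preliminary" flag and are re-evaluated on each new head.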

Queue and Orchestration

For non-trivial pipelines, you need a queue between Extract and Transform/Load — buffer during peak load and failure isolation.

  • Redis Streams — < 10k events/sec, simple topology, need dev speed
  • Apache Kafka — > 10k events/sec, multiple consumer groups, retention for replay
  • RabbitMQ — complex routing, fanout to multiple downstream consumers
  • Celery + Redis — one-off tasks, no throughput requirements

For most DeFi projects, Redis Streams suffices. Kafka adds operational complexity but enables replay — re-read history when adding new transformations.

Orchestration with Airflow or Prefect is needed when the pipeline has dependencies: first load prices, then calculate swap USD values. DAG describes these dependencies explicitly.

Database Schema

Critical schema decisions:

Time-based partitioning — mandatory for event tables. PostgreSQL native partitioning or TimescaleDB hypertables. Without partitioning, VACUUM on 500M-row tables takes hours and competes with inserts for I/O.

-- TimescaleDB: automatic time partitioning
SELECT create_hypertable('swaps', 'block_time', chunk_time_interval => INTERVAL '1 day');
-- Compress old chunks
SELECT add_compression_policy('swaps', INTERVAL '7 days');

Only necessary indexes. Each index is INSERT overhead. Typical set:

  • (pool_address, block_time) — queries for specific pool over period
  • (sender, block_time) — user transaction history
  • (tx_hash, log_index) — UNIQUE constraint for idempotence

Materialized views for aggregates. Don't compute sums on 100M rows on-the-fly. Materialized view with daily/hourly aggregates + REFRESH MATERIALIZED VIEW CONCURRENTLY on schedule.
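The aggregate view pattern could look like this (column names follow the swaps schema above; the aggregate choices are assumptions):

```sql
-- Hourly aggregate; a unique index is REQUIRED for REFRESH ... CONCURRENTLY
CREATE MATERIALIZED VIEW swaps_hourly AS
SELECT pool,
       date_trunc('hour', ts) AS hour,
       count(*)               AS swap_count,
       sum(price_usd)         AS volume_usd
FROM swaps
GROUP BY pool, date_trunc('hour', ts);

CREATE UNIQUE INDEX ON swaps_hourly (pool, hour);

-- Run on a schedule; CONCURRENTLY avoids blocking readers during refresh
REFRESH MATERIALIZED VIEW CONCURRENTLY swaps_hourly;
```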

Performance: Real Numbers

For reference: Python + asyncio + PostgreSQL pipeline on 8 CPU / 32 GB RAM processes ~2000-5000 events/sec on write. For Ethereum historical sync (2M+ blocks), this means days of work.

Optimizations in impact order:

  1. Parallel ingestion — multiple workers on different block ranges. Linear speedup to CPU count and RPC limits.
  2. Disable indexes during bulk load — load raw data, then CREATE INDEX CONCURRENTLY. 3-10x insert speedup.
  3. Switch to Rust/Go for critical components. ABI parsing and block deserialization in Rust (alloy crate) is 10-20x faster than Python.
  4. Firehose instead of JSON-RPC — if available for your network, gives 5-10x ingestion speedup.
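Parallel ingestion (item 1) can be sketched with asyncio alone; `fetch_range` here is a stand-in for the real RPC fetcher:

```python
import asyncio

async def fetch_range(from_block: int, to_block: int) -> int:
    # Stand-in for real network I/O (eth_getLogs over the range).
    await asyncio.sleep(0)
    return to_block - from_block + 1  # pretend: number of blocks fetched

async def parallel_ingest(start: int, end: int, workers: int = 4) -> int:
    # Split the inclusive block span into `workers` contiguous chunks
    # and fetch them concurrently.
    span = (end - start + workers) // workers
    tasks = [
        fetch_range(s, min(s + span - 1, end))
        for s in range(start, end + 1, span)
    ]
    return sum(await asyncio.gather(*tasks))
```

In practice each worker writes its own batches, and a coordinator advances the "processed up to" watermark only when every chunk below it has landed.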

Pipeline Monitoring

Metrics that must be there from day one:

  • Pipeline lag — current_block - processed_block. Alert at > 20 blocks. Lag growth signals a bottleneck.
  • Reorg rate — reorg count per hour. Sharp increase = unstable node or RPC.
  • Throughput — events/sec at each stage. Finds the bottleneck.
  • Error rate — decode errors. > 0 means unknown ABI or changed contract.

# Grafana/Prometheus alert
- alert: ETLPipelineLagHigh
  expr: blockchain_latest_block - etl_processed_block > 50
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "ETL pipeline lag: {{ $value }} blocks"

Technology Stack

  • Language — Python (asyncio + web3.py); alternatives: TypeScript/Node.js (viem), Rust (alloy)
  • High-throughput ingestion — Substreams + Firehose; alternative: custom Rust ingester
  • Queue — Redis Streams; alternative: Apache Kafka
  • Database — PostgreSQL 16 + TimescaleDB; alternative: ClickHouse (analytics only)
  • Orchestration — Prefect / Airflow; alternative: Temporal (complex workflows)
  • Monitoring — Prometheus + Grafana; alternative: Datadog

Development Process

Phase 1 (3-5 days): Design. Determine data sources, contracts and events, DB schema, latency and volume requirements. Prototype ingester on test data.

Phase 2 (7-14 days): Core pipeline. Extract + Transform + Load with reorg handling. Testing on mainnet data, correctness verification through on-chain state comparison.

Phase 3 (3-5 days): Performance. Profiling, bottleneck optimization, DB tuning (indexes, partitioning, vacuum).

Phase 4 (2-3 days): Deploy and monitor. Docker Compose or Kubernetes, alert setup, runbook.

Total: 2-4 weeks for single protocol pipeline. Multi-chain with cross-chain aggregation — 4-8 weeks.