On-chain data processing pipeline for ML


On-chain data ML pipeline development

The task is typically formulated like this: "we want to predict whale activity" or "we need a model for on-chain credit risk assessment". Behind this lies an engineering problem that most teams underestimate: raw blockchain data is not directly usable by ML models. Block structures, raw hex-encoded calldata, addresses as bytes20 — these are not features, they are raw material. Between the RPC node and the training set lie several weeks of infrastructure work.

Data sources and why this is hard

Sources and their limitations

Public RPC (eth_getLogs, eth_getBlockByNumber) — the most accessible source but the least suitable for ML. Limitations: rate limits (Infura/Alchemy: 10-333 req/s on paid plans), no internal transactions without the trace_ namespace, no pre/post state without an archive node.
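Backfilling over public RPC therefore comes down to paging eth_getLogs over bounded block ranges and backing off on throttling. A minimal stdlib-only sketch — RPC_URL, the 2000-block chunk size, and the retry count are placeholder assumptions, not provider requirements:

```python
# Sketch: chunked eth_getLogs backfill with exponential backoff.
# RPC_URL is a placeholder; most providers cap the block range per call,
# so we split the scan into sub-ranges first.
import json
import time
import urllib.request

RPC_URL = "https://example-rpc.invalid"  # placeholder endpoint

def block_ranges(start: int, end: int, chunk: int = 2000):
    """Split [start, end] into inclusive sub-ranges a provider will accept."""
    lo = start
    while lo <= end:
        hi = min(lo + chunk - 1, end)
        yield lo, hi
        lo = hi + 1

def fetch_logs(address: str, start: int, end: int, chunk: int = 2000):
    """Page through eth_getLogs, backing off on 429s and timeouts."""
    for lo, hi in block_ranges(start, end, chunk):
        payload = json.dumps({
            "jsonrpc": "2.0", "id": 1, "method": "eth_getLogs",
            "params": [{"address": address,
                        "fromBlock": hex(lo), "toBlock": hex(hi)}],
        }).encode()
        req = urllib.request.Request(
            RPC_URL, data=payload,
            headers={"Content-Type": "application/json"})
        for attempt in range(5):
            try:
                with urllib.request.urlopen(req, timeout=30) as resp:
                    yield from json.load(resp)["result"]
                break
            except Exception:
                # backoff on rate limits; a real backfill would also
                # record the gap after the final failed attempt
                time.sleep(2 ** attempt)
```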

Archive node with trace API — complete history including internal calls, eth_getStorageAt on any historical block. Erigon is the standard self-hosted choice: occupies ~2.5 TB for Ethereum mainnet, synchronizes in 3-5 days. Important: Erigon and Geth/Besu have different trace_ response formats — the parser needs to be adapted.

Firehose (StreamingFast/The Graph) — binary streaming protocol, exports each block with full call tree, state diffs, receipts. Latency < 500ms from block. This is the most performant source for historical loading: 100k+ blocks per minute vs 1-5k through RPC. Substreams is built on top of it.

Specialized providers — Nansen, Dune Analytics, Flipside Crypto, Allium. Ready normalized tables, SQL interface. Drawbacks: 1-24 hour update lag, limited schema control, cost at high volume.

Why raw data isn't suitable for ML

A raw Transfer log is three bytes32 topics plus a data payload. To turn it into ML features you need to go through:

  1. Decoding — ABI-decode topics and data
  2. Address normalization — uint256 → checksummed hex, label mapping (exchanges, protocols, MEV-bots)
  3. Money normalization — value / 10^decimals, conversion to USD through a historical price feed
  4. Entity resolution — one EOA may have hundreds of transactions but be one economic agent; smart contracts — proxies, implementations, multisigs

Skipping any of these steps leads to garbage features.
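Steps 1-3 for a single ERC-20 Transfer log can be sketched as follows (entity resolution from step 4 needs external label data and is out of scope here; a production decoder would also apply EIP-55 checksumming, which needs keccak and is omitted):

```python
# Sketch: decoding one ERC-20 Transfer log into feature-ready fields.
from decimal import Decimal

# keccak256("Transfer(address,address,uint256)")
TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"

def topic_to_address(topic: str) -> str:
    """A topic is a 32-byte word; the address is its low 20 bytes
    (40 hex chars). EIP-55 checksumming is left out of this sketch."""
    return "0x" + topic[-40:]

def decode_transfer(log: dict, decimals: int) -> dict:
    """log: raw eth_getLogs entry; decimals: token's decimals() value."""
    assert log["topics"][0] == TRANSFER_TOPIC
    raw = int(log["data"], 16)          # uint256 amount in the data field
    return {
        "from": topic_to_address(log["topics"][1]),
        "to": topic_to_address(log["topics"][2]),
        "value": Decimal(raw) / Decimal(10) ** decimals,  # value / 10^decimals
    }
```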

Production pipeline architecture

Ingestion layer

Recommended architecture — event-driven with hot and cold path separation:

[Archive Node / Firehose]
         ↓
  [Kafka / Redpanda]          ← hot path: < 1s latency
         ↓
  [Stream Processor]          ← Flink or custom consumer
    /          \
[Raw Store]  [Feature Store]  ← cold: S3/Parquet, hot: Redis/Feast

One Kafka topic per chain, message key = block_number:log_index. The key makes processing idempotent and allows replay on processing errors (note that Kafka guarantees ordering only within a partition). Retention depends on the task: 7 days for real-time features, full archive in S3 for retraining.
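One detail worth getting right up front: if the same key scheme is reused in key-value stores or compacted topics, a plain f"{block}:{idx}" key breaks lexicographic ordering (block 100 sorts before block 99). A zero-padded variant — widths here are illustrative assumptions — avoids that:

```python
# Sketch: building the block_number:log_index message key so that the
# keys also sort correctly as strings (useful for downstream key-value
# scans and compacted topics). Pad widths are illustrative.
def event_key(block_number: int, log_index: int) -> str:
    # 12 digits covers block numbers far beyond any current chain;
    # 6 digits covers log indices within a block.
    return f"{block_number:012d}:{log_index:06d}"
```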

For Ethereum mainnet: ~150-200 transactions/block × ~7,200 blocks/day ≈ 1.1-1.4M transactions/day. An average transaction with its trace is ~2 KB, which is roughly 2-3 GB/day of raw data — and a full-history backfill runs into terabytes. Plan storage accordingly.

Transformation and feature engineering

This is the most labor-intensive part. Typical on-chain features for various ML tasks:

Wallet profiling (DeFi credit scoring, Sybil detection):

Feature | Source | Complexity
Address age (blocks since first TX) | eth_getTransactionCount history | low
Unique contracts interacted with | event logs | medium
Gas percentile (proxy for expertise) | TX history | low
Time between transactions (rhythmicity) | TX timestamps | medium
Nonce gaps (lost TX) | nonce vs tx count | medium
DeFi protocol diversity | contract label mapping | high
Liquidation history | protocol-specific events | high

MEV detection:

  • Sandwich attack pattern: three TX in one block, one address, surrounding target TX
  • Arbitrage: cyclic token transfers returning to sender within one TX
  • Flashloan: FlashLoan event + position delta = 0 by end of block
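The sandwich pattern above can be expressed as a block-level scan: same sender bracketing another sender's transaction in the same pool. A toy heuristic sketch, not production MEV detection (real detectors also check token directions and profit; the O(n³) scan is fine only per-block):

```python
# Sketch: flagging candidate sandwich attacks in one block's ordered
# transaction list. Toy heuristic: same attacker before and after a
# different sender's TX in the same pool.
def find_sandwiches(txs: list[dict]) -> list[tuple[int, int, int]]:
    """txs: ordered list of {'sender': str, 'pool': str} for one block.
    Returns (front_idx, victim_idx, back_idx) index triples."""
    hits = []
    for i in range(len(txs) - 2):
        for j in range(i + 1, len(txs) - 1):
            for k in range(j + 1, len(txs)):
                a, v, b = txs[i], txs[j], txs[k]
                if (a["sender"] == b["sender"]        # same attacker bracket
                        and a["sender"] != v["sender"]
                        and a["pool"] == v["pool"] == b["pool"]):
                    hits.append((i, j, k))
    return hits
```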

Whale activity prediction:

  • Large transfers from exchange deposit addresses → probability of sell pressure
  • Accumulation pattern: multiple small buys from different addresses → one recipient
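The accumulation pattern reduces to a fan-in count: many distinct senders, each below a size threshold, converging on one recipient within a window. A sketch with illustrative, uncalibrated thresholds:

```python
# Sketch: accumulation-pattern candidates — many small transfers from
# distinct senders to one recipient. Thresholds are illustrative.
from collections import defaultdict

def accumulation_candidates(transfers, min_senders: int = 5,
                            max_usd: float = 10_000) -> set:
    """transfers: iterable of (sender, recipient, value_usd) within
    one time window. Returns recipients with suspicious fan-in."""
    senders = defaultdict(set)
    for frm, to, usd in transfers:
        if usd <= max_usd:              # only "small" buys count
            senders[to].add(frm)
    return {to for to, s in senders.items() if len(s) >= min_senders}
```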
# Example feature engineering for wallet scoring
import polars as pl

def compute_wallet_features(txs: pl.DataFrame) -> pl.DataFrame:
    # sort first so that diff() over timestamps is chronological per group
    return (
        txs.sort("block_timestamp")
        .group_by("from_address")
        .agg([
            pl.col("block_number").min().alias("first_seen_block"),
            pl.col("block_number").max().alias("last_seen_block"),
            pl.col("hash").count().alias("tx_count"),
            pl.col("to_address").n_unique().alias("unique_contracts"),
            pl.col("gas_price").quantile(0.5).alias("gas_price_median"),
            pl.col("value_usd").sum().alias("total_volume_usd"),
            pl.col("block_timestamp").diff().dt.total_seconds()
                .mean().alias("avg_interval_seconds"),
        ])
    )

We use Polars instead of Pandas: on datasets of millions of rows the performance difference is typically 5-20x.

Temporal features and data leakage

This is the main methodological pitfall of on-chain ML. Features must be computed only from data available before the prediction moment. Typical mistake: using total_tx_count of an address instead of tx_count_at_time_T.

Pattern: point-in-time correct features. Each row in the feature store has entity_id, feature_timestamp, feature_value. When generating the training set, the join happens on entity_id with feature_timestamp <= label_timestamp.

-- Point-in-time join
SELECT 
    l.wallet_address,
    l.label,
    l.label_timestamp,
    f.tx_count,
    f.unique_contracts,
    f.volume_usd_30d
FROM labels l
ASOF JOIN wallet_features f
    ON l.wallet_address = f.wallet_address
    AND f.feature_timestamp <= l.label_timestamp

ASOF JOIN is a native operation in ClickHouse and DuckDB; in PostgreSQL it can be emulated with a LATERAL subquery.
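For pipelines without an ASOF-capable engine, the same point-in-time logic is small enough to sketch in plain Python (stdlib only; Polars users get the equivalent from join_asof):

```python
# Sketch: point-in-time (ASOF) join — for each label row, take the
# latest feature row at or before label_timestamp, per wallet.
import bisect
from collections import defaultdict

def asof_join(labels, features):
    """labels: [(wallet, label_ts)]; features: [(wallet, feat_ts, value)].
    Returns [(wallet, label_ts, value_or_None)]."""
    by_wallet = defaultdict(list)
    for w, ts, v in sorted(features, key=lambda r: r[1]):
        by_wallet[w].append((ts, v))
    out = []
    for w, lts in labels:
        rows = by_wallet.get(w, [])
        # rightmost feature with feat_ts <= label_ts (no future leakage)
        i = bisect.bisect_right([ts for ts, _ in rows], lts)
        out.append((w, lts, rows[i - 1][1] if i else None))
    return out
```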

Feature Store

For production ML on on-chain data you need a feature store with two layers:

Offline store — historical features for training. ClickHouse or Parquet on S3 with Hive-partitioning by date. Queries by date range.

Online store — current features for inference. Redis Hash structures: HGETALL wallet:{address}:features. Updated on each new block for active addresses.
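The write path for the online store is a per-wallet hash update on each new block. The sketch below uses an in-memory stand-in so it runs without a server; with redis-py the calls are client.hset / client.hgetall with the same arguments (redis-py returns bytes unless decode_responses=True). The key scheme follows the wallet:{address}:features convention above.

```python
# Sketch: online-store write path. FakeRedis is an in-memory stand-in
# for a Redis client, implementing just hset/hgetall for this example.
class FakeRedis:
    def __init__(self):
        self._hashes = {}
    def hset(self, key, mapping):
        self._hashes.setdefault(key, {}).update(mapping)
    def hgetall(self, key):
        return dict(self._hashes.get(key, {}))

def update_wallet_features(client, address: str, block: int, feats: dict):
    """Overwrite current features for one wallet after a new block;
    as_of_block lets readers detect staleness."""
    client.hset(f"wallet:{address}:features",
                mapping={"as_of_block": block, **feats})
```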

Feast is a popular open-source feature store that supports both layers. But for on-chain specifics a custom solution is often simpler: there are too many peculiarities (reorg handling, multi-chain entities, temporal correctness).

Handling reorganizations

Reorgs are a serious problem at the ML data level. If features are computed from a block that later becomes orphaned, the training set contains data that never existed on the canonical chain.

Solutions:

  • Confirmation lag — index only blocks older than N blocks (usually 12-32 for PoS Ethereum finality). Adds latency but eliminates the problem.
  • Versioned features — store (entity, block_hash, features), mark orphaned records on reorg. More complex but allows low latency.
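For the versioned-features option, reorg detection comes down to comparing each new block's parent hash against the hash stored for the previous height. A minimal sketch — a real indexer walks back until hashes match to handle deeper reorgs:

```python
# Sketch: reorg detection for versioned features. stored maps height to
# the block hash we indexed at that height. A mismatch between the new
# block's parent_hash and the stored hash means those heights must be
# marked orphaned. (Depth-1 check only; deeper reorgs need a walk-back.)
def detect_reorg(stored: dict, new_number: int,
                 new_parent_hash: str) -> list:
    """Returns heights whose feature rows must be marked orphaned."""
    prev = stored.get(new_number - 1)
    if prev is None or prev == new_parent_hash:
        return []                       # parent matches: canonical chain
    return sorted(h for h in stored if h >= new_number - 1)
```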

MLOps integration

The pipeline should integrate with the existing ML stack:

Feature generation → training: export to Parquet/CSV for DVC or as MLflow artifacts. Dataset versioning is critical — a model trained on data from a specific period must be reproducible.

Inference pipeline: new block → compute delta features → update online store → trigger inference. Latency budget usually 1-10 seconds from block to prediction.

Model drift monitoring: on-chain data changes structurally (the Merge, new protocols, shifting usage patterns). You need input feature distribution monitoring — Evidently AI or a custom solution.
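A common custom drift metric is the population stability index (PSI) per feature. A minimal sketch, with bin edges taken from the reference distribution; the 0.2 threshold mentioned in the comment is a conventional rule of thumb, not a hard standard:

```python
# Sketch: population stability index (PSI) between a reference feature
# distribution (training window) and the current one (live traffic).
# PSI = sum((p_ref - p_cur) * ln(p_ref / p_cur)) over bins;
# > 0.2 is a common "investigate drift" threshold.
import math

def psi(reference, current, bins: int = 10) -> float:
    lo, hi = min(reference), max(reference)
    edges = [lo + (hi - lo) * i / bins for i in range(1, bins)]
    def proportions(xs):
        counts = [0] * bins
        for x in xs:
            counts[sum(1 for e in edges if x >= e)] += 1
        # smooth empty bins to avoid log(0)
        return [(c + 0.5) / (len(xs) + 0.5 * bins) for c in counts]
    r, c = proportions(reference), proportions(current)
    return sum((ri - ci) * math.log(ri / ci) for ri, ci in zip(r, c))
```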

Technology stack

Layer | Technology | Alternative
Ingestion | Firehose + Substreams / custom Erigon reader | Alchemy Webhooks
Queue | Kafka / Redpanda | Redis Streams (for smaller volumes)
Stream processing | Apache Flink / Python consumer | Bytewax (Python-native)
Offline store | ClickHouse / Parquet+S3 | BigQuery, Snowflake
Online store | Redis 7 (Hash + sorted sets) | DragonflyDB
Feature engineering | Polars, dbt | Spark (for > 1 TB/day)
Feature store | Feast / custom | Hopsworks
Orchestration | Airflow / Prefect | Dagster

Typical project phases

Phase 1 — Data audit (1-2 weeks). Determine needed signals, their sources, historical data availability. Prototype ingester on small block range.

Phase 2 — Historical backfill (2-4 weeks). Load historical data, normalize it, build label mappings. The most labor-intensive phase.

Phase 3 — Feature pipeline (2-3 weeks). Implement feature engineering, point-in-time logic, storage.

Phase 4 — Real-time path (1-2 weeks). Node streaming, online store, inference integration.

Phase 5 — MLOps (1-2 weeks). Drift monitoring, dataset versioning, retraining automation.

Total: 7-13 weeks to a production-ready pipeline. The estimate depends heavily on the number of chains, the depth of historical data, and inference latency requirements.