Multi-Source Scraping Data Normalization System

Data Normalization System Development

When data arrives from five exchanges, three blockchains, and two social platforms, each source sends it in its own format. Binance returns timestamps in milliseconds, OKX in seconds, Telegram as UTC datetimes, and on-chain data as Unix seconds from the block. Amounts differ everywhere too: wei, gwei, strings with a floating point. Normalization is not a boring chore; it is what makes the data usable instead of painful.

Problems of heterogeneous data

Real-world discrepancies:

Timestamps:

  • Unix milliseconds (Binance, most CEX)
  • Unix seconds (Ethereum blocks, Chainlink)
  • ISO 8601 strings (some REST APIs)
  • Relative ("2 hours ago") — social data scraping
  • Timezone-aware vs naive datetimes

Amounts and prices:

  • Wei (10^-18 ETH) — on-chain Ethereum
  • Lamports (10^-9 SOL) — on-chain Solana
  • String with decimals ("1234.567890") — Binance REST
  • Integer with fixed decimals (100000000 = 1 BTC on some exchanges)
  • Float64 — precision loss on big numbers
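
The float64 pitfall is easy to reproduce with stdlib Python:

```python
from decimal import Decimal

# Float64 has a 53-bit mantissa: above 2**53 it cannot represent
# every integer, so satoshi/wei-scale amounts silently lose precision.
raw = 2**53 + 1
assert int(float(raw)) != raw

# Round-tripping a string amount through float corrupts the tail digits;
# Decimal preserves the value exactly as the exchange sent it.
s = "1234.567890123456789"
assert Decimal(s) != Decimal(float(s))
assert str(Decimal(s)) == s
```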

Asset identifiers:

  • BTCUSDT (Binance), BTC-USDT (OKX), BTC/USDT (ccxt standard), tBTCUST (Bitfinex)
  • ERC-20 address (0x2260fac...) vs ticker (WBTC)
  • CoinGecko ID ("bitcoin") vs CMC ID (1)

Number formats:

  • null vs "0" vs 0 vs missing field — for zero volumes
  • -0.0 — a valid float in Python/JS, with unintuitive comparison behavior
  • NaN — sometimes in third-party APIs
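
These edge cases are worth collapsing into one canonical value at the boundary. A sketch, assuming null and a missing field both mean zero volume (verify that per source):

```python
import math
from decimal import Decimal

def coerce_volume(value) -> Decimal:
    """Collapse null / '0' / 0 / -0.0 into a canonical Decimal zero; reject NaN."""
    if value is None:
        return Decimal(0)                # assumption: null means zero volume
    if isinstance(value, float):
        if math.isnan(value) or math.isinf(value):
            raise ValueError(f"Non-finite volume from upstream: {value}")
        value = repr(value)              # repr round-trips the float exactly
    d = Decimal(str(value))
    return Decimal(0) if d == 0 else d   # normalize -0.0 and "0" to plain 0
```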

Normalization Architecture

The system has three layers:

Raw Data (from scrapers)
        ↓
[Validation Layer]   — reject invalid records, log errors
        ↓
[Transformation Layer] — convert to unified format
        ↓
[Enrichment Layer]   — add derived fields (USD value, normalized ticker)
        ↓
Normalized Storage
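
The enrichment step at the bottom of the diagram can be sketched as a pure function over already-normalized fields. The USD price map here is an assumption; in production it would come from a price feed:

```python
from decimal import Decimal

# Assumed reference prices for quote assets; a real system would pull
# these from a price oracle rather than a hardcoded map.
QUOTE_USD = {"USDT": Decimal("1.00"), "USDC": Decimal("1.00")}

def enrich(symbol: str, price: Decimal, quantity: Decimal) -> dict:
    """Add derived fields: quote amount and approximate USD value."""
    base, quote = symbol.split("/")
    quote_quantity = price * quantity
    return {
        "base": base,
        "quote_quantity": quote_quantity,
        "usd_value": quote_quantity * QUOTE_USD.get(quote, Decimal(0)),
    }
```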

Validation Layer

Before transformation, validate inputs explicitly. With Pydantic v2 in Python:

from pydantic import BaseModel, field_validator
from decimal import Decimal
from datetime import datetime

class RawTradeEvent(BaseModel):
    """Schema for raw trade events from any exchange"""
    exchange: str
    raw_symbol: str
    raw_price: str | float | int
    raw_quantity: str | float | int
    raw_timestamp: int | str | float
    side: str  # 'buy'/'sell' or 'BUY'/'SELL' or 1/2
    raw_trade_id: str | int

    @field_validator('raw_price', 'raw_quantity', mode='before')
    @classmethod
    def coerce_to_string(cls, v):
        # Convert to string before Decimal construction; repr() round-trips
        # a float exactly, unlike fixed-width formatting
        if isinstance(v, float):
            return repr(v)
        return str(v)

    @field_validator('side', mode='before')
    @classmethod
    def normalize_side(cls, v):
        s = str(v).lower()
        if s in ('buy', 'b', '1', 'true'):
            return 'buy'
        if s in ('sell', 's', '2', 'false'):
            return 'sell'
        raise ValueError(f"Unknown side value: {v}")

Invalid records don't crash the pipeline; they are logged to a validation_errors table for review:

from datetime import datetime, timezone
from pydantic import ValidationError

async def process_batch(raw_records: list[dict]) -> tuple[list, list]:
    valid, errors = [], []
    for record in raw_records:
        try:
            validated = RawTradeEvent(**record)
            valid.append(validated)
        except ValidationError as e:
            errors.append({
                "raw": record,
                "error": e.errors(),
                "timestamp": datetime.now(timezone.utc),  # utcnow() is deprecated
                "source": record.get("exchange", "unknown"),
            })
    return valid, errors

Transformation Layer

Convert to canonical format:

from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal

@dataclass
class NormalizedTrade:
    exchange: str
    symbol: str           # canonical: "BTC/USDT"
    price: Decimal        # always Decimal, no float
    quantity: Decimal
    quote_quantity: Decimal  # price * quantity
    side: str             # 'buy' or 'sell'
    timestamp: datetime   # UTC timezone-aware
    trade_id: str         # unique per exchange

def normalize_timestamp(raw: int | str | float) -> datetime:
    """Convert any timestamp to UTC datetime"""
    if isinstance(raw, str):
        dt = datetime.fromisoformat(raw.replace('Z', '+00:00'))
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)  # treat naive strings as UTC
        return dt.astimezone(timezone.utc)

    ts = float(raw)
    # Auto-detect ms vs seconds: Unix seconds stay around 2e9 until 2033,
    # millisecond timestamps are already ~2e12, so 1e12 is a safe cutoff
    if ts > 1e12:
        ts = ts / 1000
    return datetime.fromtimestamp(ts, tz=timezone.utc)

def parse_decimal(value: str) -> Decimal:
    """Safe conversion to Decimal"""
    try:
        d = Decimal(str(value))
        if d.is_nan() or d.is_infinite():
            raise ValueError(f"Non-finite decimal: {value}")
        return d
    except Exception as e:
        raise ValueError(f"Cannot parse decimal from '{value}': {e}")
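
A quick sanity check that the millisecond, second, and ISO variants all collapse to the same instant (the function is restated in minimal form so the snippet runs standalone):

```python
from datetime import datetime, timezone

def normalize_timestamp(raw):
    # Same heuristic as above: strings are ISO 8601, numbers are Unix s or ms
    if isinstance(raw, str):
        return datetime.fromisoformat(raw.replace("Z", "+00:00")).astimezone(timezone.utc)
    ts = float(raw)
    if ts > 1e12:
        ts /= 1000
    return datetime.fromtimestamp(ts, tz=timezone.utc)

# All three source formats collapse to the same instant
assert (normalize_timestamp(1700000000123)
        == normalize_timestamp(1700000000.123)
        == normalize_timestamp("2023-11-14T22:13:20.123Z"))
```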

Symbol normalization

To map tickers between exchanges, use the ccxt-compatible BASE/QUOTE convention:

SYMBOL_MAPPINGS = {
    "binance": {
        "BTCUSDT": "BTC/USDT",
        "ETHUSDT": "ETH/USDT",
        "BTCBUSD": "BTC/BUSD",
    },
    "okx": {
        "BTC-USDT": "BTC/USDT",
        "BTC-USDT-SWAP": "BTC/USDT:USDT",  # perpetual
    },
}

def normalize_symbol(raw_symbol: str, exchange: str) -> str:
    exchange_map = SYMBOL_MAPPINGS.get(exchange, {})
    if raw_symbol in exchange_map:
        return exchange_map[raw_symbol]
    # Fallback: try split by common separators
    for quote in ['USDT', 'USDC', 'BTC', 'ETH']:
        if raw_symbol.endswith(quote):
            base = raw_symbol[:-len(quote)]
            return f"{base}/{quote}"
    raise ValueError(f"Cannot normalize symbol '{raw_symbol}' for '{exchange}'")
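
A check on both paths, mapping table and suffix fallback (restated in minimal form so the example runs standalone):

```python
SYMBOL_MAPPINGS = {"okx": {"BTC-USDT": "BTC/USDT"}}

def normalize_symbol(raw_symbol: str, exchange: str) -> str:
    mapped = SYMBOL_MAPPINGS.get(exchange, {}).get(raw_symbol)
    if mapped:
        return mapped
    # Fallback: strip a known quote suffix; check order matters, so
    # stablecoins come before BTC/ETH and 'ETHBTC' still parses correctly
    for quote in ("USDT", "USDC", "BTC", "ETH"):
        if raw_symbol.endswith(quote) and len(raw_symbol) > len(quote):
            return f"{raw_symbol[:-len(quote)]}/{quote}"
    raise ValueError(f"Cannot normalize symbol {raw_symbol!r} for {exchange!r}")
```

The explicit mapping table should always win; the suffix fallback is a best-effort guess that belongs behind a warning log.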

On-chain specifics: Decimal normalization

For tokens with different decimals:

async def normalize_token_amount(
    raw_amount: int,          # wei or equivalent
    token_address: str,
    network: str,
) -> Decimal:
    decimals = await token_decimals_cache.get(token_address, network)
    # Use Decimal for exact division
    divisor = Decimal(10) ** decimals
    return Decimal(raw_amount) / divisor

# A token's decimals value is immutable, so cache it aggressively
class TokenDecimalsCache:
    def __init__(self):
        self._cache: dict[tuple, int] = {}

    async def get(self, address: str, network: str) -> int:
        key = (address.lower(), network)
        if key not in self._cache:
            decimals = await self._fetch_decimals(address, network)
            self._cache[key] = decimals
        return self._cache[key]

    async def _fetch_decimals(self, address: str, network: str) -> int:
        # Call the token contract's decimals() via the node RPC (omitted here)
        ...
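
Stripped of the async cache, the arithmetic itself is just exact power-of-ten division. A sketch with hardcoded decimals:

```python
from decimal import Decimal

def scale_token_amount(raw_amount: int, decimals: int) -> Decimal:
    """Convert an integer on-chain amount to a human-readable Decimal."""
    # Exact: the divisor is a power of ten, so Decimal introduces no error
    return Decimal(raw_amount) / (Decimal(10) ** decimals)

# 1.5 ETH in wei (18 decimals) vs 1.5 USDC in its base units (6 decimals)
assert scale_token_amount(1_500_000_000_000_000_000, 18) == Decimal("1.5")
assert scale_token_amount(1_500_000, 6) == Decimal("1.5")
```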

Schema Registry: Format versioning

Data sources change. A Binance API update adds a field or alters the timestamp format, and without schema versioning the whole normalization layer breaks.

Solution: a schema registry (similar in spirit to Confluent Schema Registry for Kafka):

SCHEMA_VERSIONS = {
    "binance_trade": {
        "v1": BinanceTradeV1Schema,   # before 2023-01
        "v2": BinanceTradeV2Schema,   # after 2023-01: added quoteQty
    }
}

# Each record stores source schema version
# raw_data: {"_schema": "binance_trade:v2", "T": 1234567890123, "p": "50000.00", ...}
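
Dispatch then keys off the stored tag. The parsers below are toy stand-ins for the versioned schemas (the abbreviated field names mimic Binance's stream keys but should be treated as illustrative):

```python
# Toy per-version parsers; real ones would be the Pydantic schemas above.
def parse_v1(r: dict) -> dict:
    return {"price": r["p"], "qty": r["q"], "quote_qty": None}  # no quoteQty yet

def parse_v2(r: dict) -> dict:
    return {"price": r["p"], "qty": r["q"], "quote_qty": r["Q"]}

PARSERS = {"binance_trade:v1": parse_v1, "binance_trade:v2": parse_v2}

def dispatch(raw: dict) -> dict:
    """Route a raw record to the parser matching its embedded schema tag."""
    schema = raw.get("_schema")
    if schema not in PARSERS:
        raise ValueError(f"No parser registered for schema {schema!r}")
    return PARSERS[schema](raw)
```

An unknown tag fails loudly instead of mis-parsing: that is exactly the alert you want when a source ships a new format.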

Data Quality Monitoring

Normalization without monitoring is an illusion of quality. Key metrics:

-- Error rate by source last hour
SELECT
    source,
    COUNT(*) FILTER (WHERE status = 'error') AS errors,
    COUNT(*) AS total,
    ROUND(100.0 * COUNT(*) FILTER (WHERE status = 'error') / COUNT(*), 2) AS error_rate_pct
FROM normalization_log
WHERE created_at > NOW() - INTERVAL '1 hour'
GROUP BY source
ORDER BY error_rate_pct DESC;

Alert when the error rate exceeds 5%: the data format has likely changed and the schema needs updating.

Cross-source consistency: the same BTC price at the same moment shouldn't diverge by more than 0.5% between exchanges. If it does, it's either a normalization error or real arbitrage.
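
The consistency check reduces to comparing the spread between sources against the threshold. A minimal sketch (the 0.5% cutoff is the figure above; tune it per asset):

```python
from decimal import Decimal

DIVERGENCE_THRESHOLD_PCT = Decimal("0.5")  # assumption: tune per asset

def max_divergence_pct(prices: dict[str, Decimal]) -> Decimal:
    """Spread between the highest and lowest quote, relative to the lowest."""
    lo, hi = min(prices.values()), max(prices.values())
    return (hi - lo) / lo * 100

quotes = {"binance": Decimal("50000"), "okx": Decimal("50100"),
          "bybit": Decimal("50050")}
assert max_divergence_pct(quotes) == Decimal("0.2")
assert max_divergence_pct(quotes) < DIVERGENCE_THRESHOLD_PCT
```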

Tech Stack

Component            Choice
-------------------  ------------------------------------------------
Schema validation    Pydantic v2 (Python) or Zod (TypeScript)
Numeric handling     Python decimal.Decimal, PostgreSQL numeric
Queue                Redis Streams or Kafka
Storage              PostgreSQL (normalized) + S3 (raw backup)
Schema registry      Custom or Confluent
Quality monitoring   dbt tests + Prometheus

Always save raw data to S3 before normalization. If a normalization error is discovered later, you can re-run against the source data without re-collecting it.

Developing a normalization layer for 5-7 sources takes 1-2 weeks. Complexity grows non-linearly with the source count: each new source adds its own edge cases.