Data Normalization System Development
When data arrives from five exchanges, three blockchains, and two social platforms, each source sends it in its own format. Binance returns timestamps in milliseconds, OKX in seconds, Telegram as UTC datetimes, and on-chain data as Unix seconds taken from the block. Amounts are just as inconsistent: wei, Gwei, strings with a decimal point. Normalization is not a boring chore; it is what makes the data usable instead of painful.
Problems of heterogeneous data
Real-world discrepancies:
Timestamps:
- Unix milliseconds (Binance, most CEX)
- Unix seconds (Ethereum blocks, Chainlink)
- ISO 8601 strings (some REST APIs)
- Relative ("2 hours ago") — social data scraping
- Timezone-aware vs naive datetimes
Amounts and prices:
- Wei (10^-18 ETH) — on-chain Ethereum
- Lamports (10^-9 SOL) — on-chain Solana
- String with decimals ("1234.567890") — Binance REST
- Integer with fixed decimals (100000000 = 1 BTC on some exchanges)
- Float64 — precision loss on big numbers
Asset identifiers:
- BTCUSDT (Binance), BTC-USDT (OKX), BTC/USDT (ccxt standard), tBTCUST (Bitfinex)
- ERC-20 address (0x2260fac...) vs ticker (WBTC)
- CoinGecko ID ("bitcoin") vs CMC ID (1)
Number formats:
- null vs "0" vs 0 vs a missing field, all meaning zero volume
- -0.0 is a valid Python/JS float with unintuitive comparison behavior
- NaN sometimes appears in third-party APIs
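The float64 pitfall from the last bullet is easy to demonstrate: a large on-chain amount silently loses its low-order digits in a float, while Decimal keeps every one (the value below is illustrative).

```python
from decimal import Decimal

# An illustrative large on-chain amount: one wei short of 100,000 ETH, in wei
raw_wei = 10**23 - 1

as_float = float(raw_wei)      # float64 keeps only ~15-17 significant digits
as_decimal = Decimal(raw_wei)  # Decimal keeps every digit
```

Round-tripping `as_float` back to an integer no longer equals `raw_wei`, which is exactly the kind of silent corruption that breaks balance reconciliation.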
Normalization Architecture
The pipeline has three layers:
Raw Data (from scrapers)
↓
[Validation Layer] — reject invalid records, log errors
↓
[Transformation Layer] — convert to unified format
↓
[Enrichment Layer] — add derived fields (USD value, normalized ticker)
↓
Normalized Storage
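The three layers amount to a single pass over each batch. A minimal sketch, where `validate`, `transform`, and `enrich` are hypothetical stand-ins for the layers described above:

```python
from dataclasses import dataclass

@dataclass
class PipelineResult:
    normalized: list
    rejected: list

def run_pipeline(raw_records, validate, transform, enrich) -> PipelineResult:
    """Push each record through validate -> transform -> enrich,
    collecting rejects instead of crashing on them."""
    normalized, rejected = [], []
    for record in raw_records:
        ok, payload = validate(record)
        if not ok:
            rejected.append({"raw": record, "error": payload})
            continue
        normalized.append(enrich(transform(payload)))
    return PipelineResult(normalized, rejected)

# Toy layers; real ones would be Pydantic validation, unit conversion, USD pricing
result = run_pipeline(
    [{"p": "50000"}, {"bad": True}],
    validate=lambda r: (("p" in r), r if "p" in r else "missing price"),
    transform=lambda r: {"price": float(r["p"])},
    enrich=lambda r: {**r, "source": "demo"},
)
```

The key design point is that rejection is a return value, not an exception: one malformed record never takes down the batch.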
Validation Layer
Before transformation comes explicit input validation, here with Pydantic v2 in Python:
from pydantic import BaseModel, ValidationError, field_validator
from decimal import Decimal
from datetime import datetime, timezone
class RawTradeEvent(BaseModel):
"""Schema for raw trade events from any exchange"""
exchange: str
raw_symbol: str
raw_price: str | float | int
raw_quantity: str | float | int
raw_timestamp: int | str | float
side: str # 'buy'/'sell' or 'BUY'/'SELL' or 1/2
raw_trade_id: str | int
@field_validator('raw_price', 'raw_quantity', mode='before')
@classmethod
def coerce_to_string(cls, v):
# Always convert to string before Decimal — avoid float precision loss
if isinstance(v, float):
return f"{v:.10f}"
return str(v)
@field_validator('side', mode='before')
@classmethod
def normalize_side(cls, v):
s = str(v).lower()
if s in ('buy', 'b', '1', 'true'):
return 'buy'
if s in ('sell', 's', '2', 'false'):
return 'sell'
raise ValueError(f"Unknown side value: {v}")
Invalid records don't crash the pipeline; they are logged to a validation_errors table for review:
async def process_batch(raw_records: list[dict]) -> tuple[list, list]:
valid, errors = [], []
for record in raw_records:
try:
validated = RawTradeEvent(**record)
valid.append(validated)
except ValidationError as e:
errors.append({
"raw": record,
"error": e.errors(),
"timestamp": datetime.utcnow(),
"source": record.get("exchange", "unknown"),
})
return valid, errors
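The same salvage-don't-crash pattern, shown Pydantic-free as a minimal self-contained sketch; the `validate_side` helper mirrors the `normalize_side` validator above:

```python
def validate_side(record: dict) -> dict:
    """Coerce exchange-specific side encodings ('BUY', 1, 'b', ...)
    into the canonical 'buy'/'sell'."""
    s = str(record.get("side", "")).lower()
    if s in ("buy", "b", "1", "true"):
        return {**record, "side": "buy"}
    if s in ("sell", "s", "2", "false"):
        return {**record, "side": "sell"}
    raise ValueError(f"Unknown side value: {record.get('side')}")

def process_batch_simple(raw_records: list[dict]) -> tuple[list, list]:
    valid, errors = [], []
    for record in raw_records:
        try:
            valid.append(validate_side(record))
        except ValueError as e:
            errors.append({"raw": record, "error": str(e)})
    return valid, errors

valid, errors = process_batch_simple([{"side": "BUY"}, {"side": "hold"}])
```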
Transformation Layer
Convert to canonical format:
@dataclass
class NormalizedTrade:
exchange: str
symbol: str # canonical: "BTC/USDT"
price: Decimal # always Decimal, no float
quantity: Decimal
quote_quantity: Decimal # price * quantity
side: str # 'buy' or 'sell'
timestamp: datetime # UTC timezone-aware
trade_id: str # unique per exchange
def normalize_timestamp(raw: int | str | float) -> datetime:
    """Convert any timestamp to a UTC timezone-aware datetime"""
    if isinstance(raw, str):
        dt = datetime.fromisoformat(raw.replace('Z', '+00:00'))
        if dt.tzinfo is None:
            # Treat naive ISO strings as UTC, never as local time
            dt = dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc)
    ts = float(raw)
    # Auto-detect ms vs seconds: ms epochs exceed 1e12,
    # while the seconds epoch stays near 2e9 until 2033
    if ts > 1e12:
        ts = ts / 1000
    return datetime.fromtimestamp(ts, tz=timezone.utc)
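A few spot checks of the auto-detection, with the function restated compactly so the snippet is self-contained: the same instant expressed in milliseconds, seconds, and ISO 8601 should normalize identically.

```python
from datetime import datetime, timezone

def normalize_timestamp(raw):
    """Convert Unix seconds/milliseconds or ISO 8601 to a UTC datetime."""
    if isinstance(raw, str):
        dt = datetime.fromisoformat(raw.replace('Z', '+00:00'))
        return dt.astimezone(timezone.utc)
    ts = float(raw)
    if ts > 1e12:  # millisecond epochs exceed 1e12; seconds stay ~2e9 until 2033
        ts = ts / 1000
    return datetime.fromtimestamp(ts, tz=timezone.utc)

# One instant, three source formats
ms = normalize_timestamp(1700000000000)
s = normalize_timestamp(1700000000)
iso = normalize_timestamp("2023-11-14T22:13:20Z")
```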
def parse_decimal(value: str) -> Decimal:
    """Safe conversion to Decimal; rejects NaN and Infinity"""
    try:
        d = Decimal(str(value))
    except Exception as e:
        raise ValueError(f"Cannot parse decimal from '{value}': {e}")
    # Decimal("NaN") parses without error, so check explicitly
    if d.is_nan() or d.is_infinite():
        raise ValueError(f"Non-finite decimal: {value}")
    return d
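The explicit is_nan/is_infinite guard exists because `Decimal("NaN")` parses silently, and NaN then misbehaves in comparisons:

```python
from decimal import Decimal

# Decimal accepts the string "NaN" without raising; only an explicit
# check keeps such values out of the pipeline
quiet = Decimal("NaN")
parsed_ok = not (quiet.is_nan() or quiet.is_infinite())

# NaN also breaks the usual expectation that a value equals itself
nan_equals_itself = quiet == quiet

good = Decimal("1234.567890")
```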
Symbol normalization
To map tickers between exchanges, use the ccxt-compatible BASE/QUOTE convention:
SYMBOL_MAPPINGS = {
"binance": {
"BTCUSDT": "BTC/USDT",
"ETHUSDT": "ETH/USDT",
"BTCBUSD": "BTC/BUSD",
},
"okx": {
"BTC-USDT": "BTC/USDT",
"BTC-USDT-SWAP": "BTC/USDT:USDT", # perpetual
},
}
def normalize_symbol(raw_symbol: str, exchange: str) -> str:
exchange_map = SYMBOL_MAPPINGS.get(exchange, {})
if raw_symbol in exchange_map:
return exchange_map[raw_symbol]
    # Fallback: strip a known quote-asset suffix; the length guard keeps
    # a bare ticker like "USDT" from producing an empty base
    for quote in ['USDT', 'USDC', 'BTC', 'ETH']:
        if raw_symbol.endswith(quote) and len(raw_symbol) > len(quote):
            base = raw_symbol[:-len(quote)]
            return f"{base}/{quote}"
raise ValueError(f"Cannot normalize symbol '{raw_symbol}' for '{exchange}'")
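A compact, self-contained version of the same mapping-plus-fallback logic; the mapping table is trimmed for illustration, and the length guard keeps a bare quote ticker from mapping to an empty base:

```python
SYMBOL_MAP = {"binance": {"BTCUSDT": "BTC/USDT"}}  # trimmed for illustration

def normalize_symbol(raw_symbol: str, exchange: str) -> str:
    mapped = SYMBOL_MAP.get(exchange, {}).get(raw_symbol)
    if mapped:
        return mapped
    # Fallback: strip a known quote-asset suffix
    for quote in ("USDT", "USDC", "BTC", "ETH"):
        if raw_symbol.endswith(quote) and len(raw_symbol) > len(quote):
            return f"{raw_symbol[:-len(quote)]}/{quote}"
    raise ValueError(f"Cannot normalize symbol '{raw_symbol}' for '{exchange}'")
```

Note the quote list is ordered: suffixes are tried in priority order, so ambiguous tickers resolve deterministically.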
On-chain specifics: Decimal normalization
Tokens declare different numbers of decimals (ETH and most ERC-20s: 18, USDC: 6), so raw integer amounts must be scaled:
async def normalize_token_amount(
raw_amount: int, # wei or equivalent
token_address: str,
network: str,
) -> Decimal:
decimals = await token_decimals_cache.get(token_address, network)
# Use Decimal for exact division
divisor = Decimal(10) ** decimals
return Decimal(raw_amount) / divisor
# Token decimals are immutable, so cache them aggressively
class TokenDecimalsCache:
    def __init__(self):
        self._cache: dict[tuple[str, str], int] = {}

    async def get(self, address: str, network: str) -> int:
        key = (address.lower(), network)
        if key not in self._cache:
            # _fetch_decimals: an on-chain decimals() call (implementation omitted)
            self._cache[key] = await self._fetch_decimals(address, network)
        return self._cache[key]
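The cache ultimately feeds a pure conversion, which can be exercised standalone (18 decimals for ETH and 6 for USDC are the standard values):

```python
from decimal import Decimal

def to_token_units(raw_amount: int, decimals: int) -> Decimal:
    """Scale an integer on-chain amount down to human-readable token units."""
    return Decimal(raw_amount) / (Decimal(10) ** decimals)

one_eth = to_token_units(10**18, 18)     # 10^18 wei -> 1 ETH
half_usdc = to_token_units(500_000, 6)   # USDC uses 6 decimals
```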
Schema Registry: Format versioning
Data sources change: a Binance API update adds a field or changes the timestamp format. Without schema versioning, the entire normalization layer breaks at once.
Solution: schema registry (like Confluent Schema Registry for Kafka):
SCHEMA_VERSIONS = {
"binance_trade": {
"v1": BinanceTradeV1Schema, # before 2023-01
"v2": BinanceTradeV2Schema, # after 2023-01: added quoteQty
}
}
# Each record stores source schema version
# raw_data: {"_schema": "binance_trade:v2", "T": 1234567890123, "p": "50000.00", ...}
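Dispatching on the stored schema tag can be sketched as follows; the parser names and the raw field layout (`T`, `p`, `q`) are illustrative assumptions, not Binance's actual v1/v2 contract:

```python
def parse_binance_trade_v1(raw: dict) -> dict:
    return {"price": raw["p"], "ts_ms": raw["T"]}

def parse_binance_trade_v2(raw: dict) -> dict:
    # v2 additionally carries the quote quantity
    return {"price": raw["p"], "ts_ms": raw["T"], "quote_qty": raw["q"]}

PARSERS = {
    "binance_trade:v1": parse_binance_trade_v1,
    "binance_trade:v2": parse_binance_trade_v2,
}

def parse_record(raw: dict) -> dict:
    schema = raw.get("_schema", "binance_trade:v1")  # assume v1 for untagged legacy rows
    parser = PARSERS.get(schema)
    if parser is None:
        raise ValueError(f"No parser registered for schema '{schema}'")
    return parser(raw)

row = parse_record({"_schema": "binance_trade:v2",
                    "T": 1234567890123, "p": "50000.00", "q": "0.5"})
```

An unknown tag fails loudly rather than being parsed with the wrong schema, which is the whole point of the registry.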
Data Quality Monitoring
Normalization without monitoring is an illusion of quality. Key metrics:
-- Error rate by source last hour
SELECT
source,
COUNT(*) FILTER (WHERE status = 'error') AS errors,
COUNT(*) AS total,
ROUND(100.0 * COUNT(*) FILTER (WHERE status = 'error') / COUNT(*), 2) AS error_rate_pct
FROM normalization_log
WHERE created_at > NOW() - INTERVAL '1 hour'
GROUP BY source
ORDER BY error_rate_pct DESC;
Alert when the error rate exceeds 5%: it almost always means the source format changed and the schema needs updating.
Cross-source consistency: the same BTC price at the same moment shouldn't diverge by more than 0.5% between exchanges. If it does, it is either a normalization error or a real arbitrage opportunity.
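The cross-source check can be computed per symbol and timestamp bucket. A minimal sketch with illustrative prices and the 0.5% threshold from above:

```python
from decimal import Decimal

def max_divergence_pct(prices: dict[str, Decimal]) -> Decimal:
    """Spread between the highest and lowest source price, as % of the lowest."""
    lo, hi = min(prices.values()), max(prices.values())
    return (hi - lo) / lo * Decimal(100)

# Illustrative snapshot for one symbol at one timestamp bucket
snapshot = {
    "binance": Decimal("50000"),
    "okx": Decimal("50100"),
    "onchain": Decimal("50050"),
}
spread = max_divergence_pct(snapshot)
alert = spread > Decimal("0.5")  # the 0.5% consistency threshold
```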
Tech Stack
| Component | Choice |
|---|---|
| Schema validation | Pydantic v2 (Python) or Zod (TypeScript) |
| Numeric handling | Python decimal.Decimal, PostgreSQL numeric |
| Queue | Redis Streams or Kafka |
| Storage | PostgreSQL (normalized) + S3 (raw backup) |
| Schema registry | Custom or Confluent |
| Quality monitoring | dbt tests + Prometheus |
Always save raw data to S3 before normalization. If a normalization error is discovered later, you can re-run against the source data without re-collecting it.
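The raw-first ingestion order can be sketched as below; `raw_store` stands in for S3 and `normalize` for the pipeline above, both hypothetical here:

```python
import hashlib
import json

def ingest(record: dict, raw_store: dict, normalized_store: list, normalize) -> str:
    """Persist the raw payload first, then attempt normalization.
    A failed normalize() leaves the raw copy intact for replay."""
    payload = json.dumps(record, sort_keys=True)
    key = hashlib.sha256(payload.encode()).hexdigest()
    raw_store[key] = payload          # raw backup written before any parsing
    try:
        normalized_store.append(normalize(record))
    except Exception:
        pass                          # raw data is safe; re-run after fixing the bug
    return key

raw, norm = {}, []
ingest({"p": "oops"}, raw, norm, normalize=lambda r: float(r["p"]))  # normalization fails
ingest({"p": "1.5"}, raw, norm, normalize=lambda r: float(r["p"]))
```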
Developing a normalization layer for 5-7 sources takes 1-2 weeks. Complexity grows non-linearly with source count: each new source brings its own edge cases.