Розробка системи нормалізації парсинг-даних у єдиний стандарт
Коли дані приходять із п'яти бірж, трьох блокчейн-мереж та двох соціальних платформ — кожне джерело присилає їх у своєму форматі. Binance повертає timestamps у мілісекундах, OKX — у секундах, Telegram — у UTC datetime, on-chain дані — у Unix секундах з блоку. Суми везде різні: де-то wei, де-то Gwei, де-то string з плавучою точкою. Нормалізація — це не скучна задача, це те, що робить дані використанням, а не болем.
Проблеми гетерогенних даних
Перелік конкретних розбіжностей, які зустрічаються в реальних проектах:
Часові мітки:
- Unix milliseconds (Binance, більшість CEX)
- Unix seconds (Ethereum blocks, Chainlink)
- ISO 8601 strings (деякі REST API)
- Relative ("2 hours ago") — у social data scraping
- Timezone-aware vs naive datetimes
Суми та ціни:
- Wei (10^-18 ETH) — on-chain Ethereum
- Lamports (10^-9 SOL) — on-chain Solana
- String з decimals ("1234.567890") — Binance REST
- Integer з fixed decimals (100000000 = 1 BTC у деяких бірж)
- Float64 — втрата точності на великих числах
Ідентифікатори активів:
-
BTCUSDT(Binance),BTC-USDT(OKX),BTC/USDT(ccxt standard),tBTCUST(Bitfinex) - ERC-20 address (0x2260fac...) vs ticker (WBTC)
- CoinGecko ID ("bitcoin") vs CMC ID (1)
Числові формати:
-
nullvs"0"vs0vs відсутність поля — для нульових обсягів -
-0.0— валідне значення в Python/JS float, неочевидне поведення при порівнянні - NaN — іноді зустрічається в JSON від сторонніх API
Архітектура нормалізаційного шару
Система складається із трьох частин:
Raw Data (from scrapers)
↓
[Validation Layer] — відбираємо невалідні записи, логуємо помилки
↓
[Transformation Layer] — приводимо до єдиного формату
↓
[Enrichment Layer] — додаємо derived поля (USD-вартість, нормалізований тикер)
↓
Normalized Storage
Validation Layer
Перед трансформацією — явна валідація вхідних даних. Pydantic v2 для Python:
from pydantic import BaseModel, field_validator, model_validator
from decimal import Decimal
from datetime import datetime
class RawTradeEvent(BaseModel):
"""Схема для сирих trade подій від будь-якої біржи"""
exchange: str
raw_symbol: str
raw_price: str | float | int
raw_quantity: str | float | int
raw_timestamp: int | str | float
side: str # 'buy'/'sell' або 'BUY'/'SELL' або 1/2
raw_trade_id: str | int
@field_validator('raw_price', 'raw_quantity', mode='before')
@classmethod
def coerce_to_string(cls, v):
# Завжди конвертуємо в string перед Decimal — уникаємо float precision loss
if isinstance(v, float):
return f"{v:.10f}"
return str(v)
@field_validator('side', mode='before')
@classmethod
def normalize_side(cls, v):
s = str(v).lower()
if s in ('buy', 'b', '1', 'true'):
return 'buy'
if s in ('sell', 's', '2', 'false'):
return 'sell'
raise ValueError(f"Unknown side value: {v}")
Невалідні записи не падають весь pipeline — вони логуються в окремі таблицю validation_errors для розбору:
async def process_batch(raw_records: list[dict]) -> tuple[list, list]:
valid, errors = [], []
for record in raw_records:
try:
validated = RawTradeEvent(**record)
valid.append(validated)
except ValidationError as e:
errors.append({
"raw": record,
"error": e.errors(),
"timestamp": datetime.utcnow(),
"source": record.get("exchange", "unknown"),
})
return valid, errors
Transformation Layer
Приведення до канонічного формату:
@dataclass
class NormalizedTrade:
exchange: str
symbol: str # canonical: "BTC/USDT"
price: Decimal # завжди Decimal, без float
quantity: Decimal
quote_quantity: Decimal # price * quantity
side: str # 'buy' або 'sell'
timestamp: datetime # UTC timezone-aware
trade_id: str # унікальна в межах біржі
def normalize_timestamp(raw: int | str | float) -> datetime:
"""Приводить будь-який timestamp до UTC datetime"""
if isinstance(raw, str):
dt = datetime.fromisoformat(raw.replace('Z', '+00:00'))
return dt.astimezone(timezone.utc)
ts = float(raw)
# Автооцінювання: ms vs seconds
# timestamps у ms > 1e12 (секунди у 2033 = ~2e9)
if ts > 1e12:
ts = ts / 1000
return datetime.fromtimestamp(ts, tz=timezone.utc)
def parse_decimal(value: str) -> Decimal:
"""Безпечна конвертація в Decimal"""
try:
d = Decimal(str(value))
if d.is_nan() or d.is_infinite():
raise ValueError(f"Non-finite decimal: {value}")
return d
except Exception as e:
raise ValueError(f"Cannot parse decimal from '{value}': {e}")
Symbol normalization
Картування тикерів між біржами — використовуємо ccxt-сумісний формат BASE/QUOTE:
SYMBOL_MAPPINGS = {
"binance": {
"BTCUSDT": "BTC/USDT",
"ETHUSDT": "ETH/USDT",
"BTCBUSD": "BTC/BUSD",
},
"okx": {
"BTC-USDT": "BTC/USDT",
"BTC-USDT-SWAP": "BTC/USDT:USDT", # perpetual
},
}
def normalize_symbol(raw_symbol: str, exchange: str) -> str:
exchange_map = SYMBOL_MAPPINGS.get(exchange, {})
if raw_symbol in exchange_map:
return exchange_map[raw_symbol]
# Fallback: пробуємо split по загальним розділювачам
for quote in ['USDT', 'USDC', 'BTC', 'ETH']:
if raw_symbol.endswith(quote):
base = raw_symbol[:-len(quote)]
return f"{base}/{quote}"
raise ValueError(f"Cannot normalize symbol '{raw_symbol}' for '{exchange}'")
On-chain специфіка: Decimal normalization
Для токенів з різною кількістю decimals:
async def normalize_token_amount(
raw_amount: int, # wei або аналог
token_address: str,
network: str,
) -> Decimal:
decimals = await token_decimals_cache.get(token_address, network)
# Використовуємо Decimal для точного ділення
divisor = Decimal(10) ** decimals
return Decimal(raw_amount) / divisor
# Кеш decimals — вони immutable, агресивно кешуємо
class TokenDecimalsCache:
def __init__(self):
self._cache: dict[tuple, int] = {}
async def get(self, address: str, network: str) -> int:
key = (address.lower(), network)
if key not in self._cache:
decimals = await self._fetch_decimals(address, network)
self._cache[key] = decimals
return self._cache[key]
Schema Registry: Версіонування форматів
Джерела даних змінюються. Binance оновив API — додалося поле, змінився формат timestamp. Без версіонування схем — сломається вся нормалізація.
Рішення: schema registry (аналог Confluent Schema Registry для Kafka):
SCHEMA_VERSIONS = {
"binance_trade": {
"v1": BinanceTradeV1Schema, # до 2023-01
"v2": BinanceTradeV2Schema, # після 2023-01: додано quoteQty
}
}
# Кожен запис зберігає версію схеми джерела
# raw_data: {"_schema": "binance_trade:v2", "T": 1234567890123, "p": "50000.00", ...}
Моніторинг якості даних
Нормалізація без мониторингу — ілюзія якості. Ключові метрики:
-- Процент помилок валідації по джерелу за останню годину
SELECT
source,
COUNT(*) FILTER (WHERE status = 'error') AS errors,
COUNT(*) AS total,
ROUND(100.0 * COUNT(*) FILTER (WHERE status = 'error') / COUNT(*), 2) AS error_rate_pct
FROM normalization_log
WHERE created_at > NOW() - INTERVAL '1 hour'
GROUP BY source
ORDER BY error_rate_pct DESC;
Алерт при error_rate > 5% для будь-якого джерела — значить змінився формат даних та потрібно оновити схему.
Cross-source consistency check: одна й та ж ціна BTC в один час не повинна розходитися між біржами більше ніж на 0.5%. Якщо розходится — або помилка нормалізації, або реальний арбітраж.
Технологічний стек
| Компонент | Вибір |
|---|---|
| Валідація схем | Pydantic v2 (Python) або Zod (TypeScript) |
| Обробка числових значень | Python decimal.Decimal, PostgreSQL numeric |
| Черга | Redis Streams або Kafka |
| Зберігання | PostgreSQL (normalized) + S3 (raw backup) |
| Schema registry | Custom або Confluent |
| Моніторинг якості | dbt tests + Prometheus |
Сирі дані завжди зберігаємо в S3 до нормалізації. Якщо виявлена помилка в логіці нормалізації — можна перепрогнати по вихідним даних без повторного сбору.
Розробка нормалізаційного шару для 5-7 джерел — 1-2 тижні. Складність зростає нелінійно з кількістю джерел: кожне нове джерело додає свої edge cases.







