Розробка системи нормалізації граббінг-даних з різних джерел

Проєктуємо та розробляємо блокчейн-рішення повного циклу: від архітектури смарт-контрактів до запуску DeFi-протоколів, NFT-маркетплейсів та криптобірж. Аудит безпеки, токеноміка, інтеграція з наявною інфраструктурою.
Показано 1 з 1Усі 1306 послуг
Розробка системи нормалізації граббінг-даних з різних джерел
Середній
~3-5 днів
Часті запитання

Напрямки блокчейн-розробки

Етапи блокчейн-розробки

Останні роботи

  • image_website-b2b-advance_0.webp
    Розробка сайту компанії B2B ADVANCE
    1286
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    902
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1122
  • image_logo-advance_0.webp
    Розробка логотипу компанії B2B Advance
    589
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    859

Розробка системи нормалізації парсинг-даних у єдиний стандарт

Коли дані приходять із п'яти бірж, трьох блокчейн-мереж та двох соціальних платформ — кожне джерело присилає їх у своєму форматі. Binance повертає timestamps у мілісекундах, OKX — у секундах, Telegram — у UTC datetime, on-chain дані — у Unix секундах з блоку. Суми везде різні: де-то wei, де-то Gwei, де-то string з плавучою точкою. Нормалізація — це не скучна задача, це те, що робить дані використанням, а не болем.

Проблеми гетерогенних даних

Перелік конкретних розбіжностей, які зустрічаються в реальних проектах:

Часові мітки:

  • Unix milliseconds (Binance, більшість CEX)
  • Unix seconds (Ethereum blocks, Chainlink)
  • ISO 8601 strings (деякі REST API)
  • Relative ("2 hours ago") — у social data scraping
  • Timezone-aware vs naive datetimes

Суми та ціни:

  • Wei (10^-18 ETH) — on-chain Ethereum
  • Lamports (10^-9 SOL) — on-chain Solana
  • String з decimals ("1234.567890") — Binance REST
  • Integer з fixed decimals (100000000 = 1 BTC у деяких бірж)
  • Float64 — втрата точності на великих числах

Ідентифікатори активів:

  • BTCUSDT (Binance), BTC-USDT (OKX), BTC/USDT (ccxt standard), tBTCUST (Bitfinex)
  • ERC-20 address (0x2260fac...) vs ticker (WBTC)
  • CoinGecko ID ("bitcoin") vs CMC ID (1)

Числові формати:

  • null vs "0" vs 0 vs відсутність поля — для нульових обсягів
  • -0.0 — валідне значення в Python/JS float, неочевидне поведення при порівнянні
  • NaN — іноді зустрічається в JSON від сторонніх API

Архітектура нормалізаційного шару

Система складається із трьох частин:

Raw Data (from scrapers)
        ↓
[Validation Layer]   — відбираємо невалідні записи, логуємо помилки
        ↓
[Transformation Layer] — приводимо до єдиного формату
        ↓
[Enrichment Layer]   — додаємо derived поля (USD-вартість, нормалізований тикер)
        ↓
Normalized Storage

Validation Layer

Перед трансформацією — явна валідація вхідних даних. Pydantic v2 для Python:

from pydantic import BaseModel, field_validator, model_validator
from decimal import Decimal
from datetime import datetime

class RawTradeEvent(BaseModel):
    """Схема для сирих trade подій від будь-якої біржи"""
    exchange: str
    raw_symbol: str
    raw_price: str | float | int
    raw_quantity: str | float | int
    raw_timestamp: int | str | float
    side: str  # 'buy'/'sell' або 'BUY'/'SELL' або 1/2
    raw_trade_id: str | int

    @field_validator('raw_price', 'raw_quantity', mode='before')
    @classmethod
    def coerce_to_string(cls, v):
        # Завжди конвертуємо в string перед Decimal — уникаємо float precision loss
        if isinstance(v, float):
            return f"{v:.10f}"
        return str(v)

    @field_validator('side', mode='before')
    @classmethod
    def normalize_side(cls, v):
        s = str(v).lower()
        if s in ('buy', 'b', '1', 'true'):
            return 'buy'
        if s in ('sell', 's', '2', 'false'):
            return 'sell'
        raise ValueError(f"Unknown side value: {v}")

Невалідні записи не падають весь pipeline — вони логуються в окремі таблицю validation_errors для розбору:

async def process_batch(raw_records: list[dict]) -> tuple[list, list]:
    valid, errors = [], []
    for record in raw_records:
        try:
            validated = RawTradeEvent(**record)
            valid.append(validated)
        except ValidationError as e:
            errors.append({
                "raw": record,
                "error": e.errors(),
                "timestamp": datetime.utcnow(),
                "source": record.get("exchange", "unknown"),
            })
    return valid, errors

Transformation Layer

Приведення до канонічного формату:

@dataclass
class NormalizedTrade:
    exchange: str
    symbol: str           # canonical: "BTC/USDT"
    price: Decimal        # завжди Decimal, без float
    quantity: Decimal
    quote_quantity: Decimal  # price * quantity
    side: str             # 'buy' або 'sell'
    timestamp: datetime   # UTC timezone-aware
    trade_id: str         # унікальна в межах біржі

def normalize_timestamp(raw: int | str | float) -> datetime:
    """Приводить будь-який timestamp до UTC datetime"""
    if isinstance(raw, str):
        dt = datetime.fromisoformat(raw.replace('Z', '+00:00'))
        return dt.astimezone(timezone.utc)

    ts = float(raw)
    # Автооцінювання: ms vs seconds
    # timestamps у ms > 1e12 (секунди у 2033 = ~2e9)
    if ts > 1e12:
        ts = ts / 1000
    return datetime.fromtimestamp(ts, tz=timezone.utc)

def parse_decimal(value: str) -> Decimal:
    """Безпечна конвертація в Decimal"""
    try:
        d = Decimal(str(value))
        if d.is_nan() or d.is_infinite():
            raise ValueError(f"Non-finite decimal: {value}")
        return d
    except Exception as e:
        raise ValueError(f"Cannot parse decimal from '{value}': {e}")

Symbol normalization

Картування тикерів між біржами — використовуємо ccxt-сумісний формат BASE/QUOTE:

SYMBOL_MAPPINGS = {
    "binance": {
        "BTCUSDT": "BTC/USDT",
        "ETHUSDT": "ETH/USDT",
        "BTCBUSD": "BTC/BUSD",
    },
    "okx": {
        "BTC-USDT": "BTC/USDT",
        "BTC-USDT-SWAP": "BTC/USDT:USDT",  # perpetual
    },
}

def normalize_symbol(raw_symbol: str, exchange: str) -> str:
    exchange_map = SYMBOL_MAPPINGS.get(exchange, {})
    if raw_symbol in exchange_map:
        return exchange_map[raw_symbol]
    # Fallback: пробуємо split по загальним розділювачам
    for quote in ['USDT', 'USDC', 'BTC', 'ETH']:
        if raw_symbol.endswith(quote):
            base = raw_symbol[:-len(quote)]
            return f"{base}/{quote}"
    raise ValueError(f"Cannot normalize symbol '{raw_symbol}' for '{exchange}'")

On-chain специфіка: Decimal normalization

Для токенів з різною кількістю decimals:

async def normalize_token_amount(
    raw_amount: int,          # wei або аналог
    token_address: str,
    network: str,
) -> Decimal:
    decimals = await token_decimals_cache.get(token_address, network)
    # Використовуємо Decimal для точного ділення
    divisor = Decimal(10) ** decimals
    return Decimal(raw_amount) / divisor

# Кеш decimals — вони immutable, агресивно кешуємо
class TokenDecimalsCache:
    def __init__(self):
        self._cache: dict[tuple, int] = {}

    async def get(self, address: str, network: str) -> int:
        key = (address.lower(), network)
        if key not in self._cache:
            decimals = await self._fetch_decimals(address, network)
            self._cache[key] = decimals
        return self._cache[key]

Schema Registry: Версіонування форматів

Джерела даних змінюються. Binance оновив API — додалося поле, змінився формат timestamp. Без версіонування схем — сломається вся нормалізація.

Рішення: schema registry (аналог Confluent Schema Registry для Kafka):

SCHEMA_VERSIONS = {
    "binance_trade": {
        "v1": BinanceTradeV1Schema,   # до 2023-01
        "v2": BinanceTradeV2Schema,   # після 2023-01: додано quoteQty
    }
}

# Кожен запис зберігає версію схеми джерела
# raw_data: {"_schema": "binance_trade:v2", "T": 1234567890123, "p": "50000.00", ...}

Моніторинг якості даних

Нормалізація без мониторингу — ілюзія якості. Ключові метрики:

-- Процент помилок валідації по джерелу за останню годину
SELECT
    source,
    COUNT(*) FILTER (WHERE status = 'error') AS errors,
    COUNT(*) AS total,
    ROUND(100.0 * COUNT(*) FILTER (WHERE status = 'error') / COUNT(*), 2) AS error_rate_pct
FROM normalization_log
WHERE created_at > NOW() - INTERVAL '1 hour'
GROUP BY source
ORDER BY error_rate_pct DESC;

Алерт при error_rate > 5% для будь-якого джерела — значить змінився формат даних та потрібно оновити схему.

Cross-source consistency check: одна й та ж ціна BTC в один час не повинна розходитися між біржами більше ніж на 0.5%. Якщо розходится — або помилка нормалізації, або реальний арбітраж.

Технологічний стек

Компонент Вибір
Валідація схем Pydantic v2 (Python) або Zod (TypeScript)
Обробка числових значень Python decimal.Decimal, PostgreSQL numeric
Черга Redis Streams або Kafka
Зберігання PostgreSQL (normalized) + S3 (raw backup)
Schema registry Custom або Confluent
Моніторинг якості dbt tests + Prometheus

Сирі дані завжди зберігаємо в S3 до нормалізації. Якщо виявлена помилка в логіці нормалізації — можна перепрогнати по вихідним даних без повторного сбору.

Розробка нормалізаційного шару для 5-7 джерел — 1-2 тижні. Складність зростає нелінійно з кількістю джерел: кожне нове джерело додає свої edge cases.