Разработка системы нормализации граббинг-данных с разных источников

Проектируем и разрабатываем блокчейн-решения полного цикла: от архитектуры смарт-контрактов до запуска DeFi-протоколов, NFT-маркетплейсов и криптобирж. Аудит безопасности, токеномика, интеграция с существующей инфраструктурой.
Показано 1 из 1Все 1306 услуг
Разработка системы нормализации граббинг-данных с разных источников
Средний
~3-5 дней
Часто задаваемые вопросы

Направления блокчейн-разработки

Этапы блокчейн-разработки

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1286
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    902
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1122
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    589
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    859

Разработка системы нормализации парсинг-данных в единый стандарт

Когда данные приходят из пяти бирж, трёх блокчейн-сетей и двух социальных платформ — каждый источник присылает их в своём формате. Binance возвращает timestamps в миллисекундах, OKX — в секундах, Telegram — в UTC datetime, on-chain данные — в Unix секундах из блока. Суммы везде разные: где-то wei, где-то Gwei, где-то string с плавающей точкой. Нормализация — это не скучная задача, это то, что делает данные использованием, а не болью.

Проблемы гетерогенных данных

Перечислим конкретные расхождения, которые встречаются в реальных проектах:

Временные метки:

  • Unix milliseconds (Binance, most CEX)
  • Unix seconds (Ethereum blocks, Chainlink)
  • ISO 8601 strings (некоторые REST API)
  • Relative ("2 hours ago") — в social data scraping
  • Timezone-aware vs naive datetimes

Суммы и цены:

  • Wei (10^-18 ETH) — on-chain Ethereum
  • Lamports (10^-9 SOL) — on-chain Solana
  • String с decimals ("1234.567890") — Binance REST
  • Integer с fixed decimals (100000000 = 1 BTC у некоторых бирж)
  • Float64 — потеря точности на больших числах

Идентификаторы активов:

  • BTCUSDT (Binance), BTC-USDT (OKX), BTC/USDT (ccxt standard), tBTCUST (Bitfinex)
  • ERC-20 address (0x2260fac...) vs ticker (WBTC)
  • CoinGecko ID ("bitcoin") vs CMC ID (1)

Числовые форматы:

  • null vs "0" vs 0 vs отсутствие поля — для нулевых объёмов
  • -0.0 — валидное значение в Python/JS float, неочевидное поведение при сравнении
  • NaN — иногда встречается в JSON от сторонних API

Архитектура нормализационного слоя

Система состоит из трёх частей:

Raw Data (from scrapers)
        ↓
[Validation Layer]   — отбрасываем невалидные записи, логируем ошибки
        ↓
[Transformation Layer] — приводим к единому формату
        ↓
[Enrichment Layer]   — добавляем derived поля (USD-стоимость, нормализованный тикер)
        ↓
Normalized Storage

Validation Layer

Перед трансформацией — явная валидация входных данных. Pydantic v2 для Python:

from pydantic import BaseModel, field_validator, model_validator
from decimal import Decimal
from datetime import datetime
from typing import Optional

class RawTradeEvent(BaseModel):
    """Схема для сырых trade событий от любой биржи"""
    exchange: str
    raw_symbol: str
    raw_price: str | float | int
    raw_quantity: str | float | int
    raw_timestamp: int | str | float
    side: str  # 'buy'/'sell' или 'BUY'/'SELL' или 1/2
    raw_trade_id: str | int

    @field_validator('raw_price', 'raw_quantity', mode='before')
    @classmethod
    def coerce_to_string(cls, v):
        # Всегда конвертируем в string перед Decimal — избегаем float precision loss
        if isinstance(v, float):
            return f"{v:.10f}"
        return str(v)

    @field_validator('side', mode='before')
    @classmethod
    def normalize_side(cls, v):
        s = str(v).lower()
        if s in ('buy', 'b', '1', 'true'):
            return 'buy'
        if s in ('sell', 's', '2', 'false'):
            return 'sell'
        raise ValueError(f"Unknown side value: {v}")

Невалидные записи не падают весь pipeline — они логируются в отдельную таблицу validation_errors для разбора:

async def process_batch(raw_records: list[dict]) -> tuple[list, list]:
    valid, errors = [], []
    for record in raw_records:
        try:
            validated = RawTradeEvent(**record)
            valid.append(validated)
        except ValidationError as e:
            errors.append({
                "raw": record,
                "error": e.errors(),
                "timestamp": datetime.utcnow(),
                "source": record.get("exchange", "unknown"),
            })
    return valid, errors

Transformation Layer

Приведение к каноническому формату:

from dataclasses import dataclass
from decimal import Decimal, ROUND_DOWN
from datetime import datetime, timezone

@dataclass
class NormalizedTrade:
    exchange: str
    symbol: str           # canonical: "BTC/USDT"
    price: Decimal        # всегда Decimal, никаких float
    quantity: Decimal
    quote_quantity: Decimal  # price * quantity
    side: str             # 'buy' или 'sell'
    timestamp: datetime   # UTC timezone-aware
    trade_id: str         # строка, уникальна в рамках биржи

def normalize_trade(raw: RawTradeEvent) -> NormalizedTrade:
    return NormalizedTrade(
        exchange=raw.exchange,
        symbol=normalize_symbol(raw.raw_symbol, raw.exchange),
        price=parse_decimal(raw.raw_price),
        quantity=parse_decimal(raw.raw_quantity),
        quote_quantity=parse_decimal(raw.raw_price) * parse_decimal(raw.raw_quantity),
        side=raw.side,
        timestamp=normalize_timestamp(raw.raw_timestamp),
        trade_id=str(raw.raw_trade_id),
    )

def normalize_timestamp(raw: int | str | float) -> datetime:
    """Приводит любой timestamp к UTC datetime"""
    if isinstance(raw, str):
        # ISO 8601 string
        dt = datetime.fromisoformat(raw.replace('Z', '+00:00'))
        return dt.astimezone(timezone.utc)

    ts = float(raw)
    # Автоопределение: ms vs seconds
    # timestamps в ms > 1e12 (секунды в 2033 году = ~2e9)
    if ts > 1e12:
        ts = ts / 1000
    return datetime.fromtimestamp(ts, tz=timezone.utc)

def parse_decimal(value: str) -> Decimal:
    """Безопасная конвертация в Decimal"""
    try:
        d = Decimal(str(value))
        if d.is_nan() or d.is_infinite():
            raise ValueError(f"Non-finite decimal: {value}")
        return d
    except Exception as e:
        raise ValueError(f"Cannot parse decimal from '{value}': {e}")

Symbol normalization

Маппинг тикеров между биржами — отдельная задача. Используем ccxt-совместимый формат BASE/QUOTE:

SYMBOL_MAPPINGS = {
    "binance": {
        "BTCUSDT": "BTC/USDT",
        "ETHUSDT": "ETH/USDT",
        "BTCBUSD": "BTC/BUSD",
        # ...
    },
    "okx": {
        "BTC-USDT": "BTC/USDT",
        "BTC-USDT-SWAP": "BTC/USDT:USDT",  # perpetual
    },
    "bybit": {
        "BTCUSDT": "BTC/USDT",
        "BTCPERP": "BTC/USDT:USDT",
    },
}

def normalize_symbol(raw_symbol: str, exchange: str) -> str:
    exchange_map = SYMBOL_MAPPINGS.get(exchange, {})
    if raw_symbol in exchange_map:
        return exchange_map[raw_symbol]
    # Fallback: пробуем split по общим разделителям
    for sep in ['-', '_', '']:
        if sep in raw_symbol or sep == '':
            # Эвристика: последние 4 символа = quote (USDT, USDC, BTC)
            for quote in ['USDT', 'USDC', 'BTC', 'ETH', 'BNB']:
                if raw_symbol.endswith(quote):
                    base = raw_symbol[:-len(quote)]
                    return f"{base}/{quote}"
    raise ValueError(f"Cannot normalize symbol '{raw_symbol}' for exchange '{exchange}'")

On-chain специфика: decimal normalization

Для токенов с разным количеством decimals:

async def normalize_token_amount(
    raw_amount: int,          # wei или аналог
    token_address: str,
    network: str,
) -> Decimal:
    decimals = await token_decimals_cache.get(token_address, network)
    # Используем Decimal для точного деления
    divisor = Decimal(10) ** decimals
    return Decimal(raw_amount) / divisor

# Кэш decimals — они immutable, агрессивно кэшируем
class TokenDecimalsCache:
    def __init__(self):
        self._cache: dict[tuple, int] = {}

    async def get(self, address: str, network: str) -> int:
        key = (address.lower(), network)
        if key not in self._cache:
            decimals = await self._fetch_decimals(address, network)
            self._cache[key] = decimals
        return self._cache[key]

Schema Registry: версионирование форматов

Источники данных меняются. Binance обновил API — добавилось поле, изменился формат timestamp. Без версионирования схем — сломается вся нормализация.

Решение: schema registry (аналог Confluent Schema Registry для Kafka, или кастомный):

SCHEMA_VERSIONS = {
    "binance_trade": {
        "v1": BinanceTradeV1Schema,   # до 2023-01
        "v2": BinanceTradeV2Schema,   # после 2023-01: добавлен quoteQty
    }
}

def get_schema(source: str, version: str):
    return SCHEMA_VERSIONS[source][version]

# Каждая запись хранит версию схемы источника
# raw_data: {"_schema": "binance_trade:v2", "T": 1234567890123, "p": "50000.00", ...}

Мониторинг качества данных

Нормализация без мониторинга — это иллюзия качества. Ключевые метрики:

-- Процент ошибок валидации по источнику за последний час
SELECT
    source,
    COUNT(*) FILTER (WHERE status = 'error') AS errors,
    COUNT(*) AS total,
    ROUND(100.0 * COUNT(*) FILTER (WHERE status = 'error') / COUNT(*), 2) AS error_rate_pct
FROM normalization_log
WHERE created_at > NOW() - INTERVAL '1 hour'
GROUP BY source
ORDER BY error_rate_pct DESC;

Алерт при error_rate > 5% для любого источника — значит изменился формат данных и нужно обновить схему.

Cross-source consistency check: одна и та же цена BTC в одно время не должна расходиться между биржами более чем на 0.5%. Если расходится — либо ошибка нормализации, либо реальный арбитраж.

Технологический стек

Компонент Выбор
Валидация схем Pydantic v2 (Python) или Zod (TypeScript)
Обработка числовых значений Python decimal.Decimal, PostgreSQL numeric
Очередь Redis Streams или Kafka
Хранение PostgreSQL (normalized) + raw backup в S3
Schema registry Custom или Confluent Schema Registry
Мониторинг качества dbt tests + Prometheus метрики

Сырые данные всегда сохраняем в S3 до нормализации. Если обнаружена ошибка в логике нормализации — можно перепрогнать по исходным данным без повторного сбора.

Разработка нормализационного слоя для 5-7 источников — 1-2 недели. Сложность растёт нелинейно с числом источников: каждый новый источник добавляет свои edge cases.