Налаштування real-time граббінгу через WebSocket

Проєктуємо та розробляємо блокчейн-рішення повного циклу: від архітектури смарт-контрактів до запуску DeFi-протоколів, NFT-маркетплейсів та криптобірж. Аудит безпеки, токеноміка, інтеграція з наявною інфраструктурою.
Показано 1 з 1Усі 1306 послуг
Налаштування real-time граббінгу через WebSocket
Середній
~2-3 дні
Часті запитання

Напрямки блокчейн-розробки

Етапи блокчейн-розробки

Останні роботи

  • image_website-b2b-advance_0.webp
    Розробка сайту компанії B2B ADVANCE
    1286
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    902
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1122
  • image_logo-advance_0.webp
    Розробка логотипу компанії B2B Advance
    589
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    859

Настройка real-time парсингу через WebSocket

Polling REST API раз в N секунд — неправильний інструмент для задач, які вимагають реакції на подій. При polling з інтервалом 1 секунда середня затримка виявлення подій — 0.5 секунди. WebSocket-підписка дає подію в момент її виникнення, затримка визначається тільки мережею (10–50мс до найближчого сервера біржи). Для моніторингу цін, orderbook та on-chain подій різниця принципіальна.

WebSocket протоколи бірж

Кожна біржа має свій протокол підписки. Паттерни схожі, деталі різняться.

Binance: підписка через JSON повідомлення, stream names у форматі symbol@streamType:

import asyncio
import json
import websockets

async def binance_stream(symbols: list[str]):
    streams = '/'.join([f"{s.lower()}@trade" for s in symbols])
    url = f"wss://stream.binance.com:9443/stream?streams={streams}"
    
    async with websockets.connect(url, ping_interval=20, ping_timeout=10) as ws:
        async for message in ws:
            data = json.loads(message)
            stream_data = data.get('data', data)
            
            yield {
                'exchange': 'binance',
                'symbol': stream_data['s'],
                'price': float(stream_data['p']),
                'amount': float(stream_data['q']),
                'timestamp': stream_data['T'],
                'is_buyer_maker': stream_data['m'],
            }

Coinbase Advanced Trade: використовує subscribe message з channel та product_ids:

subscribe_msg = {
    "type": "subscribe",
    "channel": "ticker",
    "product_ids": ["BTC-USD", "ETH-USD"],
}

Kraken: вимагає генерацію subscription ID та має особливості формату відповіді з парою в масиву.

Ethereum/EVM: web3.py та ethers.js WebSocket

On-chain подій через WebSocket підписки до Ethereum ноди (Alchemy, Infura, QuickNode або власна нода):

from web3 import AsyncWeb3, WebSocketProvider

async def subscribe_to_transfers(token_address: str):
    w3 = AsyncWeb3(WebSocketProvider(
        "wss://eth-mainnet.g.alchemy.com/v2/YOUR_KEY"
    ))
    
    # ERC-20 Transfer event signature hash
    transfer_sig = w3.keccak(text="Transfer(address,address,uint256)").hex()
    
    subscription_id = await w3.eth.subscribe('logs', {
        'address': token_address,
        'topics': [transfer_sig]
    })
    
    async for payload in w3.socket.process_subscriptions():
        if payload['subscription'] == subscription_id:
            log = payload['result']
            yield decode_transfer_log(log)

Ethereum JSON-RPC WebSocket підтримує три типи підписок: newHeads (нові блоки), logs (подій контрактів), newPendingTransactions (mempool транзакції).

Управління з'єднаннями: reconnect та heartbeat

WebSocket з'єднання розриваються з різних причин: timeout сервера, network hiccup, перезапуск сервісу. Production система повинна автоматично відновлюватися:

import asyncio
import websockets
from datetime import datetime

class RobustWebSocketClient:
    def __init__(self, url: str, reconnect_delay: float = 1.0):
        self.url = url
        self.reconnect_delay = reconnect_delay
        self.max_reconnect_delay = 60.0
        self.last_message_at = None
        self.stale_threshold = 30  # секунд без повідомлень = staleness
    
    async def connect_with_retry(self, on_message, on_subscribe):
        delay = self.reconnect_delay
        
        while True:
            try:
                async with websockets.connect(
                    self.url,
                    ping_interval=20,
                    ping_timeout=10,
                    close_timeout=5,
                ) as ws:
                    await on_subscribe(ws)
                    delay = self.reconnect_delay  # сбрасуємо при успіху
                    
                    async for msg in ws:
                        self.last_message_at = datetime.utcnow()
                        await on_message(msg)
                        
            except (websockets.ConnectionClosed, 
                    websockets.InvalidHandshake,
                    OSError) as e:
                print(f"Connection error: {e}, reconnecting in {delay}s")
                await asyncio.sleep(delay)
                delay = min(delay * 2, self.max_reconnect_delay)
    
    async def staleness_watchdog(self):
        """Детектує зависше з'єднання без явного розриву"""
        while True:
            await asyncio.sleep(10)
            if self.last_message_at:
                elapsed = (datetime.utcnow() - self.last_message_at).seconds
                if elapsed > self.stale_threshold:
                    raise RuntimeError(f"Connection stale: {elapsed}s without data")

Order book: incremental updates vs снапшоти

Більшість бірж надають orderbook через incremental updates — тільки змінені рівні. Локальне утримання актуального стану orderbook:

from sortedcontainers import SortedDict

class LocalOrderBook:
    def __init__(self):
        self.bids = SortedDict(lambda k: -k)  # descending
        self.asks = SortedDict()               # ascending
        self.last_update_id = 0
    
    def apply_snapshot(self, snapshot: dict):
        self.bids.clear()
        self.asks.clear()
        for price, qty in snapshot['bids']:
            self.bids[float(price)] = float(qty)
        for price, qty in snapshot['asks']:
            self.asks[float(price)] = float(qty)
        self.last_update_id = snapshot['lastUpdateId']
    
    def apply_update(self, update: dict):
        if update['u'] <= self.last_update_id:
            return  # застарілий update, ігноруємо
        
        for price, qty in update['b']:  # bids
            p, q = float(price), float(qty)
            if q == 0:
                self.bids.pop(p, None)
            else:
                self.bids[p] = q
        
        for price, qty in update['a']:  # asks
            p, q = float(price), float(qty)
            if q == 0:
                self.asks.pop(p, None)
            else:
                self.asks[p] = q
        
        self.last_update_id = update['u']
    
    def best_bid(self) -> tuple[float, float]:
        k = next(iter(self.bids))
        return k, self.bids[k]
    
    def best_ask(self) -> tuple[float, float]:
        k = next(iter(self.asks))
        return k, self.asks[k]

Важливо: при старті отримати снапшот через REST, потім застосовувати WebSocket updates починаючи з lastUpdateId > snapshotId. Обновлення до снапшота відбрасуються, пропуск у послідовності Uu вимагає повторного снапшота.

Масштабування: множество пар та бірж

Одна async event loop в Python справляється з 50–200 одночасними WebSocket з'єднаннями. Для більшого — кілька процесів або Go-сервіс (goroutines значительно легші за asyncio tasks).

Fanout результатів: оброблені повідомлення → Redis Pub/Sub або Kafka для downstream consumers. WebSocket handler повинен мінімально обробляти дані та швидко публікувати — важку обробку робить окремий consumer.

async def handle_message(raw: str, redis_client):
    data = json.loads(raw)
    normalized = normalize_trade(data)
    
    # Швидка публікація — не блокуємо event loop
    await redis_client.publish(
        f"trades:{normalized['exchange']}:{normalized['symbol']}",
        json.dumps(normalized)
    )

Моніторинг здоров'я

Метрики для кожного WebSocket з'єднання: messages per second, reconnect count, last message timestamp, lag від біржевого timestamp до processing timestamp. Grafana + Prometheus alerting на stale connections (> 60 сек без повідомлень по активній парі).

Настройка real-time парсингу для 3–5 бірж з мониторингом 20–50 пар, reconnect логікою та публікацією в Redis/Kafka — 1–2 тижні.