Граббінг даних з мемпулу

Проєктуємо та розробляємо блокчейн-рішення повного циклу: від архітектури смарт-контрактів до запуску DeFi-протоколів, NFT-маркетплейсів та криптобірж. Аудит безпеки, токеноміка, інтеграція з наявною інфраструктурою.
Показано 1 з 1Усі 1306 послуг
Граббінг даних з мемпулу
Складний
~3-5 днів
Часті запитання

Напрямки блокчейн-розробки

Етапи блокчейн-розробки

Останні роботи

  • image_website-b2b-advance_0.webp
    Розробка сайту компанії B2B ADVANCE
    1286
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    902
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1122
  • image_logo-advance_0.webp
    Розробка логотипу компанії B2B Advance
    589
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    859

Парсинг даних з мемпула

Мемпул — це черга неподтвердженних транзакцій, видима будь-якому вузлу мережі. Це вихідне сирже для MEV-ботів, систем моніторингу ризиків, детектування front-running, арбітражних стратегій та аналітики ринку. Різниця між "дивитися на підтверджені транзакції" та "дивитися на мемпул" — це різниця між пост-фактум аналізом та можливістю діяти до включення в блок.

Що таке мемпул з технічної точки зору

На Ethereum кожна full node веде власний txpool — in-memory структуру неподтвердженних транзакцій. eth_getTransactionByHash на pending транзакцію повертає дані до включення в блок. txpool_content (Geth-специфічний endpoint) повертає весь мемпул вузла в один запрос.

Мемпул не глобальний — різні ноди бачать різні підмножини транзакцій залежно від P2P топології. Транзакція поширюється через gossip протокол, але не миттєво. Якщо транзакція відправлена через один RPC, інший RPC може її не бачити протягом кількох секунд.

Для MEV-чутливих застосунків важливо розуміти: існує private mempool (Flashbots, MEV Blocker, block builder'и) — транзакції йдуть напрямки до builder минаючи публічний мемпул. Такі транзакції не видні ніякому мемпул-мониторингу.

Методи підключення до мемпула

eth_subscribe("pendingTransactions")

Найпростіший метод — WebSocket підписка:

import { createPublicClient, webSocket } from 'viem';

const client = createPublicClient({
  transport: webSocket('wss://eth-mainnet.g.alchemy.com/v2/YOUR_KEY'),
});

// Тільки хеші транзакцій
const unwatch = client.watchPendingTransactions({
  onTransactions: async (hashes) => {
    for (const hash of hashes) {
      // Для кожного хеша потрібен окремий запрос за деталями
      const tx = await client.getTransaction({ hash });
      await processPendingTx(tx);
    }
  },
});

Проблема: при високому трафіку (Ethereum mainnet — 50-200 pending транзакцій на секунду) — не встигаємо запитувати деталі кожної транзакції, накопичується черга.

eth_subscribe("newPendingTransactions") з full tx body

Деякі ноди підтримують розширений режим з повним телом транзакції:

import asyncio
import json
import websockets

async def subscribe_mempool_full():
    async with websockets.connect("wss://your-private-node:8546") as ws:
        # Стандартна Geth підписка з includeTransactions=true
        await ws.send(json.dumps({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "eth_subscribe",
            "params": ["newPendingTransactions", True]  # True = включити full tx
        }))
        
        ack = json.loads(await ws.recv())
        subscription_id = ack["result"]
        
        async for raw in ws:
            msg = json.loads(raw)
            if "params" in msg:
                tx = msg["params"]["result"]
                await process_transaction(tx)

Не всі провайдери підтримують True флаг. Alchemy та Infura — так. QuickNode — так. Публічний Infura без ключа — ні.

txpool_content та txpool_inspect

Для снимку всього мемпула в один момент часу:

import httpx
import asyncio

async def snapshot_mempool(rpc_url: str) -> dict:
    """Повний снимок мемпула — тільки для власної ноди!"""
    async with httpx.AsyncClient() as client:
        resp = await client.post(rpc_url, json={
            "jsonrpc": "2.0",
            "method": "txpool_content",
            "params": [],
            "id": 1
        })
    
    data = resp.json()["result"]
    
    pending_txs = []
    for sender, nonce_map in data["pending"].items():
        for nonce, tx in nonce_map.items():
            pending_txs.append({
                **tx,
                "status": "pending",
                "from": sender,
            })
    
    queued_txs = []
    for sender, nonce_map in data["queued"].items():
        for nonce, tx in nonce_map.items():
            queued_txs.append({**tx, "status": "queued", "from": sender})
    
    return {"pending": pending_txs, "queued": queued_txs}

txpool_content може повертати десятки тисяч транзакцій — це тяжкий запрос, не робіть його частіше ніж раз на секунду.

Декодування calldata

Raw calldata — це ABI-encoded вихов функції. Перші 4 байти (function selector) = keccak256 перших 4 байт сигнатури функції. Решта — ABI-encoded аргументи.

from eth_abi import decode
from eth_utils import function_abi_to_4byte_selector
import json

# Завантажуємо ABI Uniswap V2 Router
with open('uniswap_v2_router_abi.json') as f:
    abi = json.load(f)

# Будуємо маппінг selector → function
selector_to_func = {}
for func in abi:
    if func['type'] == 'function':
        selector = function_abi_to_4byte_selector(func).hex()
        selector_to_func[selector] = func

def decode_calldata(calldata: str) -> dict | None:
    if len(calldata) < 10:
        return None
    
    selector = calldata[2:10]  # убираємо '0x'
    func = selector_to_func.get(selector)
    if not func:
        return None
    
    input_types = [inp['type'] for inp in func['inputs']]
    decoded = decode(input_types, bytes.fromhex(calldata[10:]))
    
    return {
        'function': func['name'],
        'args': dict(zip([i['name'] for i in func['inputs']], decoded))
    }

# Приклад: декодування Uniswap V2 swapExactTokensForTokens
tx_data = "0x38ed1739000000000000000000000000000000000000000000000000..."
result = decode_calldata(tx_data)
# {'function': 'swapExactTokensForTokens', 'args': {'amountIn': 1000, ...}}

Для невідомих selectors — база даних 4byte.directory містить 1M+ сигнатур:

async def lookup_selector(selector: str) -> str | None:
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"https://www.4byte.directory/api/v1/signatures/?hex_signature=0x{selector}")
        results = resp.json().get("results", [])
        return results[0]["text_signature"] if results else None

Архітектура висомопродуктивного мемпул-монітора

При серйозному навантаженні (full Ethereum mempool = 50-300 tx/sec) вузьке місце — не мережа, а обробка. Архітектура:

[Multiple P2P Nodes] → [Raw Tx Stream]
                              ↓
                    [Decoder Worker Pool]  ← N процесів
                         /    |    \
              [Kafka Topic: decoded_txs]
               /         |         \
    [MEV Detector]  [Volume Monitor]  [Alert Engine]
                              ↓
                    [TimescaleDB / ClickHouse]

Kafka (або Redis Streams) як шина даних критична: декодування та зберігання повинні бути асинхронні від отримання. Якщо БД тормозить — не втрачаємо вхідні дані.

Власна нода обов'язкова для серйозного мемпул-мониторингу. Публічні RPC (Alchemy, Infura) дросселюють pending subscriptions та не дають txpool_content. Для Ethereum: Geth або Reth нода з 32GB RAM + NVMe SSD.

P2P підключення (devp2p)

Найнижчорівневий та повний доступ — підключення до Ethereum P2P мережі напрямки як non-mining peer:

# libp2p / devp2p бібліотеки для Python: py-evm, pydevp2p
# Більш практично використовувати готові рішення:
# - Blocknative Mempool Explorer API
# - TxStreet
# - EigenPhi (для MEV аналізу)
# Або Rust: reth з кастомними hooks для мемпул подій

На рівні P2P видні транзакції до того як вони досягли RPC провайдерів. Для MEV-ботів перші 100мс — це конкурентна перевага.

Специфіка інших мереж

Solana. Немає мемпула в традиційному сенсі. Транзакції йдуть напрямки до лідера-валідатора через QUIC. Публічного мемпул API немає. Моніторинг через getSignaturesForAddress з polling — це post-confirmation. Для pre-confirmation потрібно підключатися як gRPC клієнт до validator через Jito MEV infra або аналоги.

Bitcoin. getrawmempool RPC повертає список txid всіх pending транзакцій. getmempoolentry — деталі конкретної. ZMQ subscription (zmqpubrawtx) — события про нові транзакції у реальному часі:

import zmq
import asyncio

context = zmq.asyncio.Context()
sock = context.socket(zmq.SUB)
sock.connect("tcp://127.0.0.1:28333")
sock.subscribe(b"rawtx")

async def monitor_bitcoin_mempool():
    while True:
        _, raw_tx = await sock.recv_multipart()
        tx = decode_bitcoin_tx(raw_tx)
        await process_bitcoin_pending_tx(tx)

TON. Шардована архітектура ускладнює мемпул-моніторинг — транзакції обробляються паралельно у різних шардах. TonCenter API надає endpoint для pending транзакцій конкретного аккаунта.

Детектування MEV паттернів

Практичне застосування мемпул-даних — детектування sandwich атак та arbitrage:

def detect_sandwich_setup(pending_txs: list[dict]) -> list[dict]:
    """Шукаємо потенційні sandwich атаки: крупний своп у мемпулі"""
    candidates = []
    
    for tx in pending_txs:
        decoded = decode_calldata(tx['input'])
        if not decoded:
            continue
        
        # Uniswap V2 / V3 своп
        if decoded['function'] in ['swapExactTokensForTokens', 'exactInputSingle']:
            amount_in_usd = get_usd_value(decoded['args'])
            
            if amount_in_usd > SANDWICH_THRESHOLD_USD:  # наприклад $50K+
                candidates.append({
                    'tx_hash': tx['hash'],
                    'from': tx['from'],
                    'function': decoded['function'],
                    'amount_usd': amount_in_usd,
                    'gas_price': int(tx.get('maxFeePerGas', tx.get('gasPrice', 0)), 16),
                })
    
    return candidates

Зберігання та retention policy

Мемпул генерує величезні обсяги даних. На Ethereum mainnet — кілька GB на день при повному логуванні. Рекомендована політика:

Дані Retention Хранилище
Confirmed tx метаданні Бессрочно PostgreSQL
Pending tx (до confirmation) 24 години Redis + periodic flush
Full calldata pending 72 години ClickHouse (колоночне)
Confirmed OHLCV агрегати Бессрочно TimescaleDB
Відброшені транзакції (dropped, replaced) 7 днів PostgreSQL

Транзакції, які у кінці-кінців не потрапили в блок (dropped, replaced by higher gas) — окрема категорія. Їх зберігання корисно для аналізу gas wars та заміни транзакцій.

Моніторинг системи

Ключові метрики мемпул-монітора:

  • Mempool lag — затримка між отриманням tx та записом у БД. Алерт при > 500мс
  • Decoder throughput — транзакцій/сек. Повинен покривати вхідний потік
  • Dropped messages — втрати у Kafka/Redis Streams черзі
  • Node connectivity — статус WebSocket з'єднання до кожної ноди
  • Pending tx count — аномальне зростання (>200K pending) = congestion event