Парсинг даних з мемпула
Мемпул — це черга неподтвердженних транзакцій, видима будь-якому вузлу мережі. Це вихідне сирже для MEV-ботів, систем моніторингу ризиків, детектування front-running, арбітражних стратегій та аналітики ринку. Різниця між "дивитися на підтверджені транзакції" та "дивитися на мемпул" — це різниця між пост-фактум аналізом та можливістю діяти до включення в блок.
Що таке мемпул з технічної точки зору
На Ethereum кожна full node веде власний txpool — in-memory структуру неподтвердженних транзакцій. eth_getTransactionByHash на pending транзакцію повертає дані до включення в блок. txpool_content (Geth-специфічний endpoint) повертає весь мемпул вузла в один запрос.
Мемпул не глобальний — різні ноди бачать різні підмножини транзакцій залежно від P2P топології. Транзакція поширюється через gossip протокол, але не миттєво. Якщо транзакція відправлена через один RPC, інший RPC може її не бачити протягом кількох секунд.
Для MEV-чутливих застосунків важливо розуміти: існує private mempool (Flashbots, MEV Blocker, block builder'и) — транзакції йдуть напрямки до builder минаючи публічний мемпул. Такі транзакції не видні ніякому мемпул-мониторингу.
Методи підключення до мемпула
eth_subscribe("pendingTransactions")
Найпростіший метод — WebSocket підписка:
import { createPublicClient, webSocket } from 'viem';
const client = createPublicClient({
transport: webSocket('wss://eth-mainnet.g.alchemy.com/v2/YOUR_KEY'),
});
// Тільки хеші транзакцій
const unwatch = client.watchPendingTransactions({
onTransactions: async (hashes) => {
for (const hash of hashes) {
// Для кожного хеша потрібен окремий запрос за деталями
const tx = await client.getTransaction({ hash });
await processPendingTx(tx);
}
},
});
Проблема: при високому трафіку (Ethereum mainnet — 50-200 pending транзакцій на секунду) — не встигаємо запитувати деталі кожної транзакції, накопичується черга.
eth_subscribe("newPendingTransactions") з full tx body
Деякі ноди підтримують розширений режим з повним телом транзакції:
import asyncio
import json
import websockets
async def subscribe_mempool_full():
async with websockets.connect("wss://your-private-node:8546") as ws:
# Стандартна Geth підписка з includeTransactions=true
await ws.send(json.dumps({
"jsonrpc": "2.0",
"id": 1,
"method": "eth_subscribe",
"params": ["newPendingTransactions", True] # True = включити full tx
}))
ack = json.loads(await ws.recv())
subscription_id = ack["result"]
async for raw in ws:
msg = json.loads(raw)
if "params" in msg:
tx = msg["params"]["result"]
await process_transaction(tx)
Не всі провайдери підтримують True флаг. Alchemy та Infura — так. QuickNode — так. Публічний Infura без ключа — ні.
txpool_content та txpool_inspect
Для снимку всього мемпула в один момент часу:
import httpx
import asyncio
async def snapshot_mempool(rpc_url: str) -> dict:
"""Повний снимок мемпула — тільки для власної ноди!"""
async with httpx.AsyncClient() as client:
resp = await client.post(rpc_url, json={
"jsonrpc": "2.0",
"method": "txpool_content",
"params": [],
"id": 1
})
data = resp.json()["result"]
pending_txs = []
for sender, nonce_map in data["pending"].items():
for nonce, tx in nonce_map.items():
pending_txs.append({
**tx,
"status": "pending",
"from": sender,
})
queued_txs = []
for sender, nonce_map in data["queued"].items():
for nonce, tx in nonce_map.items():
queued_txs.append({**tx, "status": "queued", "from": sender})
return {"pending": pending_txs, "queued": queued_txs}
txpool_content може повертати десятки тисяч транзакцій — це тяжкий запрос, не робіть його частіше ніж раз на секунду.
Декодування calldata
Raw calldata — це ABI-encoded вихов функції. Перші 4 байти (function selector) = keccak256 перших 4 байт сигнатури функції. Решта — ABI-encoded аргументи.
from eth_abi import decode
from eth_utils import function_abi_to_4byte_selector
import json
# Завантажуємо ABI Uniswap V2 Router
with open('uniswap_v2_router_abi.json') as f:
abi = json.load(f)
# Будуємо маппінг selector → function
selector_to_func = {}
for func in abi:
if func['type'] == 'function':
selector = function_abi_to_4byte_selector(func).hex()
selector_to_func[selector] = func
def decode_calldata(calldata: str) -> dict | None:
if len(calldata) < 10:
return None
selector = calldata[2:10] # убираємо '0x'
func = selector_to_func.get(selector)
if not func:
return None
input_types = [inp['type'] for inp in func['inputs']]
decoded = decode(input_types, bytes.fromhex(calldata[10:]))
return {
'function': func['name'],
'args': dict(zip([i['name'] for i in func['inputs']], decoded))
}
# Приклад: декодування Uniswap V2 swapExactTokensForTokens
tx_data = "0x38ed1739000000000000000000000000000000000000000000000000..."
result = decode_calldata(tx_data)
# {'function': 'swapExactTokensForTokens', 'args': {'amountIn': 1000, ...}}
Для невідомих selectors — база даних 4byte.directory містить 1M+ сигнатур:
async def lookup_selector(selector: str) -> str | None:
async with httpx.AsyncClient() as client:
resp = await client.get(f"https://www.4byte.directory/api/v1/signatures/?hex_signature=0x{selector}")
results = resp.json().get("results", [])
return results[0]["text_signature"] if results else None
Архітектура висомопродуктивного мемпул-монітора
При серйозному навантаженні (full Ethereum mempool = 50-300 tx/sec) вузьке місце — не мережа, а обробка. Архітектура:
[Multiple P2P Nodes] → [Raw Tx Stream]
↓
[Decoder Worker Pool] ← N процесів
/ | \
[Kafka Topic: decoded_txs]
/ | \
[MEV Detector] [Volume Monitor] [Alert Engine]
↓
[TimescaleDB / ClickHouse]
Kafka (або Redis Streams) як шина даних критична: декодування та зберігання повинні бути асинхронні від отримання. Якщо БД тормозить — не втрачаємо вхідні дані.
Власна нода обов'язкова для серйозного мемпул-мониторингу. Публічні RPC (Alchemy, Infura) дросселюють pending subscriptions та не дають txpool_content. Для Ethereum: Geth або Reth нода з 32GB RAM + NVMe SSD.
P2P підключення (devp2p)
Найнижчорівневий та повний доступ — підключення до Ethereum P2P мережі напрямки як non-mining peer:
# libp2p / devp2p бібліотеки для Python: py-evm, pydevp2p
# Більш практично використовувати готові рішення:
# - Blocknative Mempool Explorer API
# - TxStreet
# - EigenPhi (для MEV аналізу)
# Або Rust: reth з кастомними hooks для мемпул подій
На рівні P2P видні транзакції до того як вони досягли RPC провайдерів. Для MEV-ботів перші 100мс — це конкурентна перевага.
Специфіка інших мереж
Solana. Немає мемпула в традиційному сенсі. Транзакції йдуть напрямки до лідера-валідатора через QUIC. Публічного мемпул API немає. Моніторинг через getSignaturesForAddress з polling — це post-confirmation. Для pre-confirmation потрібно підключатися як gRPC клієнт до validator через Jito MEV infra або аналоги.
Bitcoin. getrawmempool RPC повертає список txid всіх pending транзакцій. getmempoolentry — деталі конкретної. ZMQ subscription (zmqpubrawtx) — события про нові транзакції у реальному часі:
import zmq
import asyncio
context = zmq.asyncio.Context()
sock = context.socket(zmq.SUB)
sock.connect("tcp://127.0.0.1:28333")
sock.subscribe(b"rawtx")
async def monitor_bitcoin_mempool():
while True:
_, raw_tx = await sock.recv_multipart()
tx = decode_bitcoin_tx(raw_tx)
await process_bitcoin_pending_tx(tx)
TON. Шардована архітектура ускладнює мемпул-моніторинг — транзакції обробляються паралельно у різних шардах. TonCenter API надає endpoint для pending транзакцій конкретного аккаунта.
Детектування MEV паттернів
Практичне застосування мемпул-даних — детектування sandwich атак та arbitrage:
def detect_sandwich_setup(pending_txs: list[dict]) -> list[dict]:
"""Шукаємо потенційні sandwich атаки: крупний своп у мемпулі"""
candidates = []
for tx in pending_txs:
decoded = decode_calldata(tx['input'])
if not decoded:
continue
# Uniswap V2 / V3 своп
if decoded['function'] in ['swapExactTokensForTokens', 'exactInputSingle']:
amount_in_usd = get_usd_value(decoded['args'])
if amount_in_usd > SANDWICH_THRESHOLD_USD: # наприклад $50K+
candidates.append({
'tx_hash': tx['hash'],
'from': tx['from'],
'function': decoded['function'],
'amount_usd': amount_in_usd,
'gas_price': int(tx.get('maxFeePerGas', tx.get('gasPrice', 0)), 16),
})
return candidates
Зберігання та retention policy
Мемпул генерує величезні обсяги даних. На Ethereum mainnet — кілька GB на день при повному логуванні. Рекомендована політика:
| Дані | Retention | Хранилище |
|---|---|---|
| Confirmed tx метаданні | Бессрочно | PostgreSQL |
| Pending tx (до confirmation) | 24 години | Redis + periodic flush |
| Full calldata pending | 72 години | ClickHouse (колоночне) |
| Confirmed OHLCV агрегати | Бессрочно | TimescaleDB |
| Відброшені транзакції (dropped, replaced) | 7 днів | PostgreSQL |
Транзакції, які у кінці-кінців не потрапили в блок (dropped, replaced by higher gas) — окрема категорія. Їх зберігання корисно для аналізу gas wars та заміни транзакцій.
Моніторинг системи
Ключові метрики мемпул-монітора:
- Mempool lag — затримка між отриманням tx та записом у БД. Алерт при > 500мс
- Decoder throughput — транзакцій/сек. Повинен покривати вхідний потік
- Dropped messages — втрати у Kafka/Redis Streams черзі
- Node connectivity — статус WebSocket з'єднання до кожної ноди
- Pending tx count — аномальне зростання (>200K pending) = congestion event







