Парсинг данных из мемпула
Мемпул — это очередь неподтверждённых транзакций, видимая любому узлу сети. Это исходное сырьё для MEV-ботов, систем мониторинга рисков, front-running detection, арбитражных стратегий и аналитики рынка. Разница между "смотреть на подтверждённые транзакции" и "смотреть на мемпул" — это разница между пост-фактум анализом и возможностью действовать до включения в блок.
Что такое мемпул с технической точки зрения
В Ethereum каждый full node ведёт собственный txpool — in-memory структуру неподтверждённых транзакций. eth_getTransactionByHash на pending транзакцию возвращает данные до включения в блок. txpool_content (Geth-специфичный endpoint) возвращает весь мемпул узла в один запрос.
Мемпул не глобальный — разные ноды видят разные подмножества транзакций в зависимости от P2P топологии. Транзакция распространяется через gossip протокол, но не мгновенно. Если транзакция отправлена через один RPC, другой RPC может её не видеть в течение нескольких секунд.
Для MEV-чувствительных приложений важно понимать: существует private mempool (Flashbots, MEV Blocker, блочные builder'ы) — транзакции идут напрямую к builder минуя публичный мемпул. Такие транзакции не видны никаким мемпул-мониторингом.
Методы подключения к мемпулу
eth_subscribe("pendingTransactions")
Самый простой метод — WebSocket подписка:
import { createPublicClient, webSocket } from 'viem';
const client = createPublicClient({
transport: webSocket('wss://eth-mainnet.g.alchemy.com/v2/YOUR_KEY'),
});
// Только хеши транзакций
const unwatch = client.watchPendingTransactions({
onTransactions: async (hashes) => {
for (const hash of hashes) {
// Для каждого хеша нужен отдельный запрос за деталями
const tx = await client.getTransaction({ hash });
await processPendingTx(tx);
}
},
});
Проблема: при высоком трафике (Ethereum mainnet — 50-200 pending транзакций в секунду) — не успеваем запрашивать детали каждой транзакции, накапливается очередь.
eth_subscribe("newPendingTransactions") с full tx body
Некоторые ноды поддерживают расширенный режим с полным телом транзакции:
import asyncio
import json
import websockets
async def subscribe_mempool_full():
async with websockets.connect("wss://your-private-node:8546") as ws:
# Стандартная Geth подписка с includeTransactions=true
await ws.send(json.dumps({
"jsonrpc": "2.0",
"id": 1,
"method": "eth_subscribe",
"params": ["newPendingTransactions", True] # True = include full tx
}))
ack = json.loads(await ws.recv())
subscription_id = ack["result"]
async for raw in ws:
msg = json.loads(raw)
if "params" in msg:
tx = msg["params"]["result"]
await process_transaction(tx)
Не все провайдеры поддерживают True флаг. Alchemy и Infura — да. QuickNode — да. Публичный Infura без ключа — нет.
txpool_content и txpool_inspect
Для снимка всего мемпула в один момент времени:
import httpx
import asyncio
async def snapshot_mempool(rpc_url: str) -> dict:
"""Полный снимок мемпула — только для собственной ноды!"""
async with httpx.AsyncClient() as client:
resp = await client.post(rpc_url, json={
"jsonrpc": "2.0",
"method": "txpool_content",
"params": [],
"id": 1
})
data = resp.json()["result"]
pending_txs = []
for sender, nonce_map in data["pending"].items():
for nonce, tx in nonce_map.items():
pending_txs.append({
**tx,
"status": "pending",
"from": sender,
})
queued_txs = []
for sender, nonce_map in data["queued"].items():
for nonce, tx in nonce_map.items():
queued_txs.append({**tx, "status": "queued", "from": sender})
return {"pending": pending_txs, "queued": queued_txs}
txpool_content может возвращать десятки тысяч транзакций — это тяжёлый запрос, не делайте его чаще раза в секунду.
Декодирование calldata
Raw calldata — это ABI-encoded вызов функции. Первые 4 байта (function selector) = keccak256 первых 4 байт сигнатуры функции. Остальное — ABI-encoded аргументы.
from eth_abi import decode
from eth_utils import function_abi_to_4byte_selector
import json
# Загрузка ABI Uniswap V2 Router
with open('uniswap_v2_router_abi.json') as f:
abi = json.load(f)
# Строим маппинг selector → function
selector_to_func = {}
for func in abi:
if func['type'] == 'function':
selector = function_abi_to_4byte_selector(func).hex()
selector_to_func[selector] = func
def decode_calldata(calldata: str) -> dict | None:
if len(calldata) < 10:
return None
selector = calldata[2:10] # убираем '0x'
func = selector_to_func.get(selector)
if not func:
return None
input_types = [inp['type'] for inp in func['inputs']]
decoded = decode(input_types, bytes.fromhex(calldata[10:]))
return {
'function': func['name'],
'args': dict(zip([i['name'] for i in func['inputs']], decoded))
}
# Пример: декодирование Uniswap V2 swapExactTokensForTokens
tx_data = "0x38ed1739000000000000000000000000000000000000000000000000..."
result = decode_calldata(tx_data)
# {'function': 'swapExactTokensForTokens', 'args': {'amountIn': 1000000, ...}}
Для неизвестных selectors — база данных 4byte.directory содержит 1M+ сигнатур:
async def lookup_selector(selector: str) -> str | None:
async with httpx.AsyncClient() as client:
resp = await client.get(f"https://www.4byte.directory/api/v1/signatures/?hex_signature=0x{selector}")
results = resp.json().get("results", [])
return results[0]["text_signature"] if results else None
Архитектура высокопроизводительного мемпул-монитора
При серьёзной нагрузке (full Ethereum mempool = 50-300 tx/sec) узкое место — это не сеть, а обработка. Архитектура:
[Multiple P2P Nodes] → [Raw Tx Stream]
↓
[Decoder Worker Pool] ← N процессов
/ | \
[Kafka Topic: decoded_txs]
/ | \
[MEV Detector] [Volume Monitor] [Alert Engine]
↓
[TimescaleDB / ClickHouse]
Kafka (или Redis Streams) как шина данных критична: декодирование и хранение должны быть асинхронны от получения. Если БД тормозит — не теряем входящие данные.
Собственная нода обязательна для серьёзного мемпул-мониторинга. Публичные RPC (Alchemy, Infura) дросселируют pending subscriptions и не дают txpool_content. Для Ethereum: Geth или Reth нода с 32GB RAM + NVMe SSD.
P2P подключение (devp2p)
Самый низкоуровневый и полный доступ — подключение к Ethereum P2P сети напрямую как non-mining peer:
# libp2p / devp2p библиотеки для Python: py-evm, pydevp2p
# Более практично использовать готовые решения:
# - Blocknative Mempool Explorer API
# - TxStreet
# - EigenPhi (для MEV анализа)
# Или Rust: reth с custom hooks для мемпул событий
На уровне P2P видны транзакции до того как они достигли RPC провайдеров. Для MEV-ботов первых 100мс — это конкурентное преимущество.
Специфика других сетей
Solana. Нет мемпула в традиционном смысле. Транзакции идут напрямую к лидеру-валидатору через QUIC. Публичного мемпул API нет. Мониторинг через getSignaturesForAddress с polling — это post-confirmation. Для pre-confirmation нужно подключаться как gRPC клиент к validator через Jito MEV infra или аналоги.
Bitcoin. getrawmempool RPC возвращает список txid всех pending транзакций. getmempoolentry — детали конкретной. ZMQ subscription (zmqpubrawtx) — события о новых транзакциях в реальном времени:
import zmq
import asyncio
context = zmq.asyncio.Context()
sock = context.socket(zmq.SUB)
sock.connect("tcp://127.0.0.1:28333")
sock.subscribe(b"rawtx")
async def monitor_bitcoin_mempool():
while True:
_, raw_tx = await sock.recv_multipart()
tx = decode_bitcoin_tx(raw_tx)
await process_bitcoin_pending_tx(tx)
TON. Шардированная архитектура усложняет мемпул-мониторинг — транзакции обрабатываются параллельно в разных шардах. TonCenter API предоставляет endpoint для pending транзакций конкретного аккаунта.
Детектирование MEV паттернов
Практическое применение мемпул-данных — детектирование sandwich атак и arbitrage:
def detect_sandwich_setup(pending_txs: list[dict]) -> list[dict]:
"""Ищем потенциальные sandwich атаки: крупный своп в мемпуле"""
candidates = []
for tx in pending_txs:
decoded = decode_calldata(tx['input'])
if not decoded:
continue
# Uniswap V2 / V3 свопы
if decoded['function'] in ['swapExactTokensForTokens', 'exactInputSingle']:
amount_in_usd = get_usd_value(decoded['args'])
if amount_in_usd > SANDWICH_THRESHOLD_USD: # например $50K+
candidates.append({
'tx_hash': tx['hash'],
'from': tx['from'],
'function': decoded['function'],
'amount_usd': amount_in_usd,
'gas_price': int(tx.get('maxFeePerGas', tx.get('gasPrice', 0)), 16),
})
return candidates
Хранение и retention policy
Мемпул генерирует огромные объёмы данных. На Ethereum mainnet — несколько GB в сутки при полном логировании. Рекомендованная политика:
| Данные | Retention | Хранилище |
|---|---|---|
| Confirmed tx метаданные | Бессрочно | PostgreSQL |
| Pending tx (до confirmation) | 24 часа | Redis + periodic flush |
| Full calldata pending | 72 часа | ClickHouse (колоночное) |
| Confirmed OHLCV агрегаты | Бессрочно | TimescaleDB |
| Отброшенные транзакции (dropped) | 7 дней | PostgreSQL |
Транзакции, которые в итоге не попали в блок (dropped, replaced by higher gas) — отдельная категория. Их хранение полезно для анализа gas wars и замены транзакций.
Мониторинг системы
Ключевые метрики мемпул-монитора:
- Mempool lag — задержка между получением tx и записью в БД. Алерт при > 500мс
- Decoder throughput — транзакций/сек. Должен покрывать входящий поток
- Dropped messages — потери в Kafka/Redis Streams очереди
- Node connectivity — статус WebSocket соединения к каждой ноде
- Pending tx count — аномальный рост (>200K pending) = congestion event







