Parsing Mempool Data
The mempool is a queue of unconfirmed transactions, visible to any node. It is the raw material for MEV bots, risk-monitoring systems, front-running detection, arbitrage strategies, and market analytics. The difference between looking at confirmed transactions and looking at the mempool is the difference between post-fact analysis and the ability to act before block inclusion.
What is Mempool From Technical Perspective
On Ethereum, each full node maintains its own txpool: an in-memory structure of unconfirmed transactions. eth_getTransactionByHash on a pending transaction returns its data before block inclusion, and txpool_content (a Geth-specific endpoint) returns the whole mempool in one request.
The mempool is not global: different nodes see different subsets depending on P2P topology. A transaction propagates via a gossip protocol, not instantly. If a transaction is sent through one RPC, another RPC may not see it for several seconds.
For MEV-sensitive applications it is important to understand that private mempools exist (Flashbots, MEV Blocker, block builders): transactions go directly to a builder, bypassing the public mempool. Such transactions are invisible to any mempool monitoring.
Methods to Connect to Mempool
eth_subscribe("newPendingTransactions")
The simplest method is a WebSocket subscription:
```typescript
import { createPublicClient, webSocket } from 'viem';

const client = createPublicClient({
  transport: webSocket('wss://eth-mainnet.g.alchemy.com/v2/YOUR_KEY'),
});

// Only transaction hashes are delivered
const unwatch = client.watchPendingTransactions({
  onTransactions: async (hashes) => {
    for (const hash of hashes) {
      // Each hash requires a separate request for the details
      const tx = await client.getTransaction({ hash });
      await processPendingTx(tx);
    }
  },
});
```
The problem: at high traffic (Ethereum mainnet sees 50-200 pending transactions per second) you cannot keep up requesting details for each transaction one by one, and the queue accumulates.
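One common mitigation is to fan the hashes out to a bounded pool of concurrent fetchers instead of awaiting them serially. A minimal sketch with asyncio; the function and parameter names here are mine, and `fetch_tx` / `handle_tx` stand in for your RPC call and business logic:

```python
import asyncio

async def run_hash_workers(hashes, fetch_tx, handle_tx,
                           concurrency=50, max_queue=10_000):
    """Process pending-tx hashes with N concurrent workers.

    A bounded queue drops new work instead of growing without limit:
    for mempool data it is better to skip a tx than to fall behind.
    """
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue)
    dropped = 0

    async def worker():
        while True:
            h = await queue.get()
            try:
                tx = await fetch_tx(h)
                await handle_tx(tx)
            finally:
                queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(concurrency)]
    for h in hashes:
        try:
            queue.put_nowait(h)
        except asyncio.QueueFull:
            dropped += 1  # count the loss instead of blocking the stream
    await queue.join()
    for w in workers:
        w.cancel()
    return dropped
```

In a real monitor the loop over `hashes` would be fed by the WebSocket subscription rather than a list.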
eth_subscribe("newPendingTransactions") with Full TX Body
Some nodes support an extended mode that includes the full transaction body:
```python
import asyncio
import json
import websockets

async def subscribe_mempool_full():
    async with websockets.connect("wss://your-private-node:8546") as ws:
        # Standard Geth subscription with includeTransactions=true
        await ws.send(json.dumps({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "eth_subscribe",
            "params": ["newPendingTransactions", True],  # True = include full tx
        }))
        ack = json.loads(await ws.recv())
        subscription_id = ack["result"]

        async for raw in ws:
            msg = json.loads(raw)
            if "params" in msg:
                tx = msg["params"]["result"]
                await process_transaction(tx)
```
Not all providers support the True flag. Alchemy and Infura do, as does QuickNode; public Infura endpoints without a key do not.
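Since support varies, it can help to probe at startup: send the subscription with the flag and classify what the first notification contains. A small sketch; the helper names are mine:

```python
import json

def subscribe_payload(full_tx: bool) -> str:
    """Build the eth_subscribe request, with or without the full-body flag."""
    params = ["newPendingTransactions", True] if full_tx else ["newPendingTransactions"]
    return json.dumps({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "eth_subscribe",
        "params": params,
    })

def notification_kind(msg: dict) -> str:
    """Classify a subscription notification: full tx object vs bare hash."""
    result = msg.get("params", {}).get("result")
    if isinstance(result, dict):
        return "full"   # provider honored the flag
    if isinstance(result, str):
        return "hash"   # fall back to per-hash getTransaction calls
    return "unknown"
```

If the first few notifications come back as `"hash"`, the monitor can switch to the hash-plus-fetch path from the previous section.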
txpool_content and txpool_inspect
For a snapshot of the entire mempool at a single moment:
```python
import asyncio
import httpx

async def snapshot_mempool(rpc_url: str) -> dict:
    """Complete mempool snapshot: only for your own node!"""
    async with httpx.AsyncClient() as client:
        resp = await client.post(rpc_url, json={
            "jsonrpc": "2.0",
            "method": "txpool_content",
            "params": [],
            "id": 1,
        })
    data = resp.json()["result"]

    pending_txs = []
    for sender, nonce_map in data["pending"].items():
        for nonce, tx in nonce_map.items():
            pending_txs.append({
                **tx,
                "status": "pending",
                "from": sender,
            })

    queued_txs = []
    for sender, nonce_map in data["queued"].items():
        for nonce, tx in nonce_map.items():
            queued_txs.append({**tx, "status": "queued", "from": sender})

    return {"pending": pending_txs, "queued": queued_txs}
```
txpool_content can return tens of thousands of transactions. It is a heavy request; do not call it more than about once per second.
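Before pulling the full content, it is often enough to poll the much cheaper txpool_status endpoint (also Geth-specific), which returns only the pending/queued counters as hex strings. A minimal sketch; the helper names are mine:

```python
def txpool_status_request() -> dict:
    """JSON-RPC body for txpool_status, the cheap counterpart of
    txpool_content: counters only, no transaction bodies."""
    return {"jsonrpc": "2.0", "method": "txpool_status", "params": [], "id": 1}

def parse_txpool_status(result: dict) -> dict:
    """Convert the hex-encoded counters, e.g. {"pending": "0x4a"}, to ints."""
    return {k: int(v, 16) for k, v in result.items()}
```

A reasonable pattern is to poll txpool_status every second and trigger the heavy txpool_content snapshot only when the counters jump abnormally.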
Decoding Calldata
Raw calldata is an ABI-encoded function call. The first 4 bytes (the function selector) are the first 4 bytes of keccak256 of the function signature; the rest is the ABI-encoded arguments.
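As a stdlib-only illustration of that layout, here is the anatomy of ERC-20 `transfer(address,uint256)` calldata (its selector, 0xa9059cbb, is well known); the recipient address is a made-up placeholder:

```python
# Selector 0xa9059cbb = first 4 bytes of keccak256("transfer(address,uint256)")
calldata = (
    "0x"
    "a9059cbb"                                                          # 4-byte selector
    "0000000000000000000000001111111111111111111111111111111111111111"  # arg 1: address, left-padded to 32 bytes
    "0000000000000000000000000000000000000000000000000000000000000064"  # arg 2: uint256 amount = 100
)

selector = calldata[2:10]   # skip the "0x" prefix
words = calldata[10:]       # arguments, one 32-byte word (64 hex chars) each
amount = int(words[64:128], 16)  # second word
```

Every static argument occupies exactly one 32-byte word, which is why slicing at fixed 64-character offsets works here; dynamic types (arrays, strings) add offset words and need a real ABI decoder, as below.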
```python
import json

from eth_abi import decode
from eth_utils import function_abi_to_4byte_selector

# Load the Uniswap V2 Router ABI
with open('uniswap_v2_router_abi.json') as f:
    abi = json.load(f)

# Build a selector → function mapping
selector_to_func = {}
for func in abi:
    if func['type'] == 'function':
        selector = function_abi_to_4byte_selector(func).hex()
        selector_to_func[selector] = func

def decode_calldata(calldata: str) -> dict | None:
    if len(calldata) < 10:
        return None
    selector = calldata[2:10]  # skip '0x', take the 4 selector bytes
    func = selector_to_func.get(selector)
    if not func:
        return None
    input_types = [inp['type'] for inp in func['inputs']]
    decoded = decode(input_types, bytes.fromhex(calldata[10:]))
    return {
        'function': func['name'],
        'args': dict(zip([i['name'] for i in func['inputs']], decoded)),
    }

# Example: decode Uniswap V2 swapExactTokensForTokens
tx_data = "0x38ed1739000000000000000000000000000000000000000000000000..."
result = decode_calldata(tx_data)
# {'function': 'swapExactTokensForTokens', 'args': {'amountIn': 1000, ...}}
```
For unknown selectors, the 4byte.directory database contains over a million signatures:
```python
import httpx

async def lookup_selector(selector: str) -> str | None:
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f"https://www.4byte.directory/api/v1/signatures/?hex_signature=0x{selector}"
        )
    results = resp.json().get("results", [])
    return results[0]["text_signature"] if results else None
```
High-Performance Mempool Monitor Architecture
Under serious load (the full Ethereum mempool is 50-300 tx/sec) the bottleneck is not the network but processing. The architecture:
```
[Multiple P2P Nodes] → [Raw Tx Stream]
                            ↓
             [Decoder Worker Pool] ← N processes
                            ↓
             [Kafka Topic: decoded_txs]
               /            |            \
    [MEV Detector]  [Volume Monitor]  [Alert Engine]
                            ↓
             [TimescaleDB / ClickHouse]
```
Kafka (or Redis Streams) as the data bus is critical: decoding and storage must be asynchronous from receipt, so that a slow database does not cause loss of incoming data.
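The decoupling idea can be sketched without Kafka at all: a bounded in-process buffer that never blocks the receive path and counts what it had to drop. This class is my stand-in for a Kafka/Redis Streams producer, not a real client:

```python
import asyncio

class TxBuffer:
    """Decouple ingestion from storage.

    publish() is synchronous and never blocks the WebSocket/P2P reader;
    if the DB-side consumer falls behind, the oldest pending item is
    evicted and counted instead.
    """
    def __init__(self, maxsize: int = 10_000):
        self._q: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self.dropped = 0

    def publish(self, tx: dict) -> None:
        try:
            self._q.put_nowait(tx)
        except asyncio.QueueFull:
            self._q.get_nowait()   # evict the oldest item
            self._q.put_nowait(tx)
            self.dropped += 1

    async def consume(self):
        while True:
            yield await self._q.get()
```

The `dropped` counter doubles as a monitoring metric: any sustained nonzero rate means the consumer side needs more capacity.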
A node of your own is mandatory for serious mempool monitoring. Public RPCs (Alchemy, Infura) throttle pending subscriptions and do not expose txpool_content. For Ethereum: a Geth or Reth node with 32 GB RAM and an NVMe SSD.
P2P Connection (devp2p)
The lowest level, with the most complete access, is connecting to the Ethereum P2P network directly as a non-mining peer:
```python
# devp2p libraries for Python: py-evm, pydevp2p
# In practice it is easier to use ready-made solutions:
# - Blocknative Mempool Explorer API
# - TxStreet
# - EigenPhi (for MEV analysis)
# Or Rust: reth with custom hooks for mempool events
```
At the P2P level you see transactions before they reach RPC providers. For MEV bots those first 100 ms are the competitive advantage.
Specifics of Other Networks
Solana. There is no mempool in the traditional sense: transactions go directly to the leader validator via QUIC, and there is no public mempool API. Monitoring via getSignaturesForAddress with polling is post-confirmation only. For pre-confirmation visibility you need to connect as a gRPC client to a validator via Jito's MEV infrastructure or similar.
Bitcoin. The getrawmempool RPC returns the txids of all pending transactions; getmempoolentry returns the details of a specific one. A ZMQ subscription (zmqpubrawtx) delivers new-transaction events in real time:
```python
import asyncio
import zmq
import zmq.asyncio

context = zmq.asyncio.Context()
sock = context.socket(zmq.SUB)
sock.connect("tcp://127.0.0.1:28333")
sock.subscribe(b"rawtx")

async def monitor_bitcoin_mempool():
    while True:
        # bitcoind publishes three frames: topic, payload, sequence number
        topic, raw_tx, seq = await sock.recv_multipart()
        tx = decode_bitcoin_tx(raw_tx)       # your raw-tx parser
        await process_bitcoin_pending_tx(tx)  # your business logic
```
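The polling counterpart mentioned above (getrawmempool / getmempoolentry) goes over Bitcoin Core's HTTP JSON-RPC interface. A sketch of the request bodies only; the endpoint, credentials, and helper name are deployment-specific assumptions:

```python
import json

def bitcoin_rpc_body(method: str, params: list) -> str:
    """JSON-RPC 1.0 body for Bitcoin Core; POST it to the node's RPC port
    with basic auth (rpcuser/rpcpassword from bitcoin.conf)."""
    return json.dumps({
        "jsonrpc": "1.0",
        "id": "mempool-monitor",
        "method": method,
        "params": params,
    })

# Plain call: just the list of txids
list_body = bitcoin_rpc_body("getrawmempool", [])
# verbose=True returns fee, size and ancestor info per txid in one call
verbose_body = bitcoin_rpc_body("getrawmempool", [True])
```

The verbose form is usually preferable for monitoring: one request replaces thousands of getmempoolentry calls.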
TON. The sharded architecture complicates mempool monitoring: transactions are processed in parallel in different shards. The TonCenter API provides an endpoint for the pending transactions of a specific account.
MEV Pattern Detection
A practical application of mempool data is detecting sandwich attacks and arbitrage setups:
```python
SANDWICH_THRESHOLD_USD = 50_000  # e.g. $50K+

def detect_sandwich_setup(pending_txs: list[dict]) -> list[dict]:
    """Find potential sandwich targets: large swaps sitting in the mempool."""
    candidates = []
    for tx in pending_txs:
        decoded = decode_calldata(tx['input'])
        if not decoded:
            continue
        # Uniswap V2 / V3 swaps
        if decoded['function'] in ['swapExactTokensForTokens', 'exactInputSingle']:
            amount_in_usd = get_usd_value(decoded['args'])
            if amount_in_usd > SANDWICH_THRESHOLD_USD:
                candidates.append({
                    'tx_hash': tx['hash'],
                    'from': tx['from'],
                    'function': decoded['function'],
                    'amount_usd': amount_in_usd,
                    # EIP-1559 txs carry maxFeePerGas, legacy ones gasPrice
                    'gas_price': int(tx.get('maxFeePerGas') or tx.get('gasPrice') or '0x0', 16),
                })
    return candidates
```
Storage and Retention Policy
The mempool generates huge data volumes: on Ethereum mainnet, several GB per day with full logging. A recommended policy:
| Data | Retention | Storage |
|---|---|---|
| Confirmed tx metadata | Forever | PostgreSQL |
| Pending tx (until confirmation) | 24 hours | Redis + periodic flush |
| Full calldata pending | 72 hours | ClickHouse (columnar) |
| Confirmed OHLCV aggregates | Forever | TimescaleDB |
| Dropped/replaced transactions | 7 days | PostgreSQL |
Transactions that are ultimately not included in a block (dropped, or replaced by a higher-gas version) are a separate category. Storing them is useful for analyzing gas wars and transaction replacement.
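Replacement detection itself is simple: two pending transactions from the same sender with the same nonce, where the later one pays more gas. A sketch over already-normalized records; the field names follow the detector example above and are otherwise my assumption:

```python
def detect_replacements(txs: list[dict]) -> list[tuple[dict, dict]]:
    """Pair up (replaced, replacement) transactions.

    Two txs occupy the same slot when (from, nonce) match; the one with
    the higher gas price wins. txs must be in observation order.
    """
    by_slot: dict[tuple[str, int], dict] = {}
    replacements = []
    for tx in txs:
        key = (tx["from"].lower(), tx["nonce"])
        prev = by_slot.get(key)
        if prev is not None and tx["gas_price"] > prev["gas_price"]:
            replacements.append((prev, tx))
        by_slot[key] = tx
    return replacements
```

Run over the 7-day dropped/replaced archive, chains of repeated replacements for one (from, nonce) slot are exactly the gas-war signature.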
System Monitoring
Key metrics for a mempool monitor:
- Mempool lag — delay between receiving tx and writing to DB. Alert at > 500ms
- Decoder throughput — transactions/sec. Should cover incoming flow
- Dropped messages — losses in Kafka/Redis Streams queue
- Node connectivity — status of WebSocket connection to each node
- Pending tx count — abnormal growth (>200K pending) = congestion event
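The first metric in the list, mempool lag, is easy to track in-process with a sliding window of receive-to-write latencies. A sketch under the assumptions stated in the comments; in production you would likely export this to Prometheus instead:

```python
from collections import deque

class LagTracker:
    """Sliding-window lag monitor (receive timestamp → DB-write timestamp).

    Alerts on the 95th percentile rather than the mean, so a few slow
    writes do not hide a systemic slowdown. Threshold matches the
    500 ms alert level suggested above.
    """
    def __init__(self, window: int = 1000, threshold_ms: float = 500.0):
        self.samples: deque = deque(maxlen=window)
        self.threshold_ms = threshold_ms

    def record(self, received_at: float, written_at: float) -> None:
        # both arguments are unix timestamps in seconds
        self.samples.append((written_at - received_at) * 1000.0)

    def p95_ms(self) -> float:
        ordered = sorted(self.samples)
        return ordered[int(len(ordered) * 0.95)] if ordered else 0.0

    def should_alert(self) -> bool:
        return self.p95_ms() > self.threshold_ms
```

Calling `record()` at the DB-write point and checking `should_alert()` on a timer gives the "alert at > 500 ms" behavior without any external dependency.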







