Parsing New Token Deployment Data
Task — detect new tokens on blockchain at or shortly after deployment. This is needed for: MEV opportunity monitoring systems, new token screeners (like DEX Screener, Dextools), analytics databases, or protection systems (scam scanning).
How to detect new token on-chain
Method 1: Monitoring factory contracts
Most tokens are deployed via factories: Uniswap V2/V3 factory when creating a pool, Token Factory contracts, or direct deploy with event. Uniswap V2 factory emits PairCreated on new pool creation — the most reliable signal of a new tradeable token:
from web3 import Web3
import asyncio
UNISWAP_V2_FACTORY = "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f"
PAIR_CREATED_TOPIC = "0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9"
FACTORY_ABI = [{
"name": "PairCreated",
"type": "event",
"inputs": [
{"name": "token0", "type": "address", "indexed": True},
{"name": "token1", "type": "address", "indexed": True},
{"name": "pair", "type": "address", "indexed": False},
{"name": "", "type": "uint256", "indexed": False}
]
}]
async def watch_new_pairs(w3: Web3, callback):
factory = w3.eth.contract(address=UNISWAP_V2_FACTORY, abi=FACTORY_ABI)
# Subscribe via WebSocket to new events
event_filter = await w3.eth.filter({
"address": UNISWAP_V2_FACTORY,
"topics": [PAIR_CREATED_TOPIC]
})
while True:
events = await event_filter.get_new_entries()
for event_log in events:
decoded = factory.events.PairCreated().process_log(event_log)
await callback({
"token0": decoded.args.token0,
"token1": decoded.args.token1,
"pair": decoded.args.pair,
"block": event_log.blockNumber,
"tx_hash": event_log.transactionHash.hex()
})
await asyncio.sleep(3)
For Uniswap V3 — similarly, listen to PoolCreated on 0x1F98431c8aD98523631AE4a59f267346ea31F984.
Method 2: Detecting ERC-20 contract deployment
Direct ERC-20 deploy doesn't emit standard events. Detect via analyzing transaction receipts — if contractAddress is non-empty, this is contract deploy:
async def scan_block_for_deployments(block_number: int, w3: Web3) -> list[dict]:
block = w3.eth.get_block(block_number, full_transactions=True)
deployments = []
for tx in block.transactions:
if tx.to is None: # tx without to = contract deploy
receipt = w3.eth.get_transaction_receipt(tx.hash)
if receipt.contractAddress:
# Check if it's ERC-20
token_info = await check_if_erc20(receipt.contractAddress, w3)
if token_info:
deployments.append({
"contract": receipt.contractAddress,
"deployer": tx["from"],
"block": block_number,
"tx_hash": tx.hash.hex(),
**token_info
})
return deployments
async def check_if_erc20(address: str, w3: Web3) -> dict | None:
"""Check for mandatory ERC-20 methods"""
minimal_abi = [
{"name": "totalSupply", "type": "function", "inputs": [], "outputs": [{"type": "uint256"}]},
{"name": "decimals", "type": "function", "inputs": [], "outputs": [{"type": "uint8"}]},
{"name": "symbol", "type": "function", "inputs": [], "outputs": [{"type": "string"}]},
{"name": "name", "type": "function", "inputs": [], "outputs": [{"type": "string"}]},
]
try:
contract = w3.eth.contract(address=address, abi=minimal_abi)
return {
"name": contract.functions.name().call(),
"symbol": contract.functions.symbol().call(),
"decimals": contract.functions.decimals().call(),
"total_supply": contract.functions.totalSupply().call()
}
except Exception:
return None # not ERC-20 or reverting contract
Scanning every block — high RPC load. On Ethereum mainnet ~15 blocks/minute, each may have hundreds of transactions. Need dedicated Alchemy/QuickNode plan or own node.
Enriching data after detection
Bare contract address is uninformative. Immediately after detection enrich:
async def enrich_new_token(contract_address: str, w3: Web3) -> dict:
tasks = await asyncio.gather(
get_contract_source_code(contract_address), # Etherscan API
get_lp_info(contract_address), # existing pools
get_social_links(contract_address), # from contract or Etherscan
run_honeypot_check(contract_address), # sell tax, tradeability
return_exceptions=True
)
source, lp_info, socials, honeypot = tasks
return {
"verified_source": bool(source and not isinstance(source, Exception)),
"has_liquidity": bool(lp_info and not isinstance(lp_info, Exception)),
"honeypot_risk": honeypot if not isinstance(honeypot, Exception) else "unknown",
**socials if not isinstance(socials, Exception) else {}
}
Detecting scam patterns
For screener with warnings — automatic bytecode and behavior analysis:
SCAM_PATTERNS = {
"mint_function": "0x40c10f19", # bytes4 selector for mint(address, uint256)
"ownership_transfer": "0xf2fde38b",
"blacklist_function": "0x44337ea1",
}
def check_bytecode_risks(bytecode: str) -> list[str]:
risks = []
if len(bytecode) < 100:
risks.append("minimal_bytecode") # proxy or dummy
for name, selector in SCAM_PATTERNS.items():
if selector[2:] in bytecode: # remove 0x
risks.append(name)
return risks
Real honeypot check requires simulating buy/sell transactions via eth_call — determines buy/sell tax and whether token can be sold at all. Services: honeypot.is API, GoPlus Security API.
Storage and indexing
CREATE TABLE new_tokens (
id BIGSERIAL PRIMARY KEY,
chain_id INTEGER NOT NULL,
contract TEXT NOT NULL,
name TEXT,
symbol TEXT,
decimals SMALLINT,
total_supply NUMERIC,
deployer TEXT NOT NULL,
deploy_block INTEGER NOT NULL,
deploy_tx TEXT NOT NULL,
deploy_time TIMESTAMPTZ NOT NULL,
verified BOOLEAN DEFAULT FALSE,
has_liquidity BOOLEAN DEFAULT FALSE,
risk_flags TEXT[] DEFAULT '{}',
enriched_at TIMESTAMPTZ,
UNIQUE(chain_id, contract)
);
CREATE INDEX ON new_tokens(deploy_time DESC);
CREATE INDEX ON new_tokens(chain_id, symbol);
CREATE INDEX ON new_tokens USING gin(risk_flags);
Monitoring multiple networks
EVM-compatible networks use same mechanism — one code with different RPC endpoints. For non-EVM (Solana, TON) — separate adapters:
Solana: new tokens via InitializeMint instruction Token Program. Raydium/Orca pool creation via InitializePool.
BSC/Polygon: identical to Ethereum, but with different factory addresses PancakeSwap/QuickSwap.
Complete new token screener for 3–4 EVM networks with enrichment, risk analysis and REST API: 3–5 weeks development.







