On-chain event alerts system development
Real-time on-chain monitoring seems simple until you implement it. Subscribe to contract events over WebSocket, receive a log, send a notification. In practice: the WebSocket drops, nodes lag, the same event arrives twice, and an important message lands in spam. A proper alert system is not a "wrapper over getLogs" — it is a separate infrastructure project with queues, deduplication, and delivery mechanisms.
Event types and sources
Contract events (logs) — the most common: Transfer, Swap, Deposit, Liquidation, Mint. Decoded through an ABI from raw topics + data.
Large transactions — transfers above a USD threshold. Requires conversion via a price feed (Chainlink, CoinGecko API) at event time.
Address activity — any transaction to/from a monitored address (whale watching, portfolio tracking).
MEV events — sandwich attacks, arbitrage, flashloans. Detected through pattern analysis within a single block.
Protocol health — a health factor on Aave/Compound below threshold, utilization rate above 90%, a TVL drop > X%.
NFT events — mint, sale, transfer for specific collections.
Architecture
Ingestion layer
Two options for receiving events:
WebSocket subscriptions — minimal latency (< 1 s from the block). Problem: blocks produced during a reconnect are missed. A catch-up mechanism is needed:
```typescript
import { createPublicClient, webSocket, type PublicClient } from 'viem';

class BlockListener {
  private lastProcessedBlock: bigint;

  constructor(private rpc: PublicClient, startBlock: bigint) {
    this.lastProcessedBlock = startBlock;
  }

  async start(wsUrl: string) {
    // First process the blocks missed while we were offline
    const current = await this.rpc.getBlockNumber();
    for (let b = this.lastProcessedBlock + 1n; b <= current; b++) {
      await this.processBlock(b);
      this.lastProcessedBlock = b;
    }
    // Then subscribe to new blocks
    const wsClient = createPublicClient({
      transport: webSocket(wsUrl),
    });
    wsClient.watchBlockNumber({
      onBlockNumber: async (blockNum) => {
        await this.processBlock(blockNum);
        this.lastProcessedBlock = blockNum;
      },
    });
  }

  private async processBlock(blockNumber: bigint) {
    // fetch the block's logs and push them to the raw event queue
  }
}
```
Polling — every N seconds, run eth_getLogs over the recent block range. Less efficient, but more reliable. For systems with an SLA — a combination: WS for speed, polling as a fallback.
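A polling loop can be sketched as follows. The names `nextRange` and `pollLoop` are illustrative, and the client is assumed to expose viem-style `getBlockNumber`/`getLogs`; `maxRange` caps a single eth_getLogs call because many providers limit the block span per request:

```typescript
// Pure helper: which block range to fetch next, capped so a single
// eth_getLogs call never exceeds the provider's range limit.
function nextRange(
  lastProcessed: number,
  head: number,
  maxRange = 1000,
): { from: number; to: number } | null {
  if (head <= lastProcessed) return null; // nothing new
  const from = lastProcessed + 1;
  const to = Math.min(head, from + maxRange - 1);
  return { from, to };
}

// Polling loop sketch; the client shape matches what a viem
// PublicClient provides.
async function pollLoop(
  client: {
    getBlockNumber(): Promise<number>;
    getLogs(args: { fromBlock: number; toBlock: number }): Promise<unknown[]>;
  },
  intervalMs = 5000,
) {
  let lastProcessed = await client.getBlockNumber();
  for (;;) {
    const head = await client.getBlockNumber();
    let range = nextRange(lastProcessed, head);
    while (range) {
      const logs = await client.getLogs({ fromBlock: range.from, toBlock: range.to });
      // ...push logs to the raw event queue...
      lastProcessed = range.to;
      range = nextRange(lastProcessed, head);
    }
    await new Promise((r) => setTimeout(r, intervalMs));
  }
}
```

Because `nextRange` only advances `lastProcessed` after a successful fetch, a failed poll simply retries the same range on the next tick.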
Event processing pipeline
[WS / Polling] → [Raw Event Queue] → [Decoder] → [Enricher] → [Rule Engine] → [Alert Queue] → [Delivery]
Decoder — ABI-decodes raw logs. For unknown contracts, try to find the ABI via the Etherscan API or 4byte.directory.
Enricher — data enrichment: USD value via a price feed, labels (exchange? whale? known protocol?), entity resolution (linking multiple addresses of the same actor).
Rule Engine — checks alert conditions against the enriched event.
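To illustrate what the Decoder stage does, here is a hand-rolled decode of the ubiquitous ERC-20 Transfer log: indexed parameters live in topics, non-indexed ones in data. In practice you would use viem's decodeEventLog with the contract ABI; `decodeTransferLog` below is a didactic sketch:

```typescript
interface RawLog {
  topics: string[]; // topics[0] is the event signature hash
  data: string;     // ABI-encoded non-indexed params
}

// keccak256("Transfer(address,address,uint256)")
const TRANSFER_SIG =
  '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef';

function decodeTransferLog(log: RawLog) {
  if (log.topics[0] !== TRANSFER_SIG) return null;
  return {
    // indexed address params are left-padded to 32 bytes inside topics
    from: '0x' + log.topics[1].slice(26),
    to: '0x' + log.topics[2].slice(26),
    // the single non-indexed uint256 occupies the 32-byte data word
    value: BigInt(log.data),
  };
}
```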
Rule Engine
A flexible rule system is the key component. Rules should be configurable without a code deployment:
```typescript
interface AlertRule {
  id: string;
  name: string;
  enabled: boolean;

  // Source filters
  chains: string[]; // ['ethereum', 'arbitrum']
  contracts?: string[]; // contract addresses
  event_signatures?: string[]; // keccak256 signatures

  // Conditions
  conditions: Condition[];
  condition_logic: 'AND' | 'OR';

  // Delivery
  channels: DeliveryChannel[];
  cooldown_seconds: number; // anti-spam
}

interface Condition {
  field: string; // 'value_usd', 'from_label', 'health_factor'
  operator: 'gt' | 'lt' | 'eq' | 'contains' | 'in';
  value: any;
}
```
An example rule for a whale alert:
```json
{
  "name": "Large ETH Transfer",
  "chains": ["ethereum"],
  "event_signatures": ["0xddf252ad..."],
  "conditions": [
    { "field": "value_usd", "operator": "gt", "value": 1000000 },
    { "field": "token_symbol", "operator": "eq", "value": "ETH" }
  ],
  "condition_logic": "AND",
  "channels": ["telegram_main", "webhook_trading_desk"],
  "cooldown_seconds": 60
}
```
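A minimal evaluator for these rule shapes might look like the sketch below (the Condition shape is restated for self-containment, and the function names are illustrative; a production engine would add nested-field lookup and type coercion):

```typescript
type Operator = 'gt' | 'lt' | 'eq' | 'contains' | 'in';

interface Condition {
  field: string;
  operator: Operator;
  value: any;
}

// Check a single condition against a flat enriched-event object.
function checkCondition(event: Record<string, any>, c: Condition): boolean {
  const v = event[c.field];
  switch (c.operator) {
    case 'gt': return v > c.value;
    case 'lt': return v < c.value;
    case 'eq': return v === c.value;
    case 'contains': return typeof v === 'string' && v.includes(c.value);
    case 'in': return Array.isArray(c.value) && c.value.includes(v);
    default: return false;
  }
}

// Combine conditions with the rule's AND/OR logic.
function ruleMatches(
  event: Record<string, any>,
  conditions: Condition[],
  logic: 'AND' | 'OR',
): boolean {
  return logic === 'AND'
    ? conditions.every((c) => checkCondition(event, c))
    : conditions.some((c) => checkCondition(event, c));
}
```

With this, the whale-alert rule above matches any enriched event where `value_usd > 1000000` and `token_symbol === 'ETH'`.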
Deduplication
With multiple nodes, or during catch-up, the same event can arrive more than once. Deduplicate by (transaction_hash, log_index) in Redis:
```typescript
import Redis from 'ioredis';

const redis = new Redis(); // connects to localhost:6379 by default

async function processEvent(event: DecodedEvent): Promise<boolean> {
  const key = `processed:${event.txHash}:${event.logIndex}`;
  // NX = set only if the key does not exist, EX = 24-hour TTL
  const isNew = await redis.set(key, '1', 'EX', 86400, 'NX');
  return isNew !== null; // 'OK' for a new event, null for a duplicate
}
```
Delivery channels
Telegram — the most popular channel for crypto alerts. Telegram Bot API, formatting via MarkdownV2:
```typescript
async function sendTelegramAlert(
  botToken: string,
  chatId: string,
  event: EnrichedEvent,
) {
  // formatAlertMessage must escape MarkdownV2 special characters
  // (_ * [ ] ( ) ~ ` > # + - = | { } . !), or the API returns 400
  const text = formatAlertMessage(event);
  const res = await fetch(`https://api.telegram.org/bot${botToken}/sendMessage`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      chat_id: chatId,
      text,
      parse_mode: 'MarkdownV2',
      disable_web_page_preview: true,
    }),
  });
  if (!res.ok) {
    throw new Error(`Telegram API error: ${res.status}`);
  }
}
```
Important: Telegram enforces rate limits — about 30 messages/sec per bot and roughly 1 message/sec per chat. At high event frequency you need batching or aggregation.
Webhook — HTTP POST to an arbitrary endpoint. Used for integration with trading bots and internal systems. Delivery should retry with exponential backoff.
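A retry sketch with capped exponential backoff (function names are illustrative; non-retryable 4xx responses give up early, and alerts that exhaust their retries should go to a dead-letter queue):

```typescript
// Exponential backoff with a cap: 1s, 2s, 4s, ... up to maxMs.
function backoffMs(attempt: number, baseMs = 1000, maxMs = 60_000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

async function deliverWebhook(
  url: string,
  payload: unknown,
  maxAttempts = 5,
): Promise<boolean> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      const res = await fetch(url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(payload),
      });
      if (res.ok) return true;
      // 4xx other than 429 will not succeed on retry — give up early
      if (res.status >= 400 && res.status < 500 && res.status !== 429) return false;
    } catch {
      // network error — fall through to the next retry
    }
    await new Promise((r) => setTimeout(r, backoffMs(attempt)));
  }
  return false; // dead-letter the alert after exhausting retries
}
```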
Email — for important low-frequency events. SendGrid / AWS SES.
Discord — through webhook or Discord Bot API.
PagerDuty — for critical events requiring an immediate response (e.g., liquidation of a fund's position).
Anti-spam and aggregation
Problem: during a flash crash or other major market event, hundreds of alerts are generated per minute. Solutions:
- Cooldown per rule — don't resend an alert for the same rule within N seconds
- Digest mode — aggregate events over 5/60 minutes and send summary
- Threshold batching — "47 liquidations on Aave last 10 minutes, total $2.3M"
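The per-rule cooldown can be sketched with an in-memory gate; a multi-instance deployment would keep this state in Redis instead (e.g. SET with NX and a cooldown-length EX, the same pattern as deduplication above):

```typescript
// In-memory cooldown tracker (illustrative name). In production,
// shared state belongs in Redis so all workers see the same cooldowns.
class CooldownGate {
  private lastSent = new Map<string, number>();

  // Returns true if the alert may be sent now, and records the send.
  allow(ruleId: string, cooldownSeconds: number, nowMs = Date.now()): boolean {
    const prev = this.lastSent.get(ruleId);
    if (prev !== undefined && nowMs - prev < cooldownSeconds * 1000) {
      return false; // still cooling down — suppress, or add to a digest
    }
    this.lastSent.set(ruleId, nowMs);
    return true;
  }
}
```

Suppressed events need not be dropped: accumulating them behind the gate is exactly what feeds digest mode and threshold batching.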
System monitoring
The alert system should monitor itself:
- Block lag — how far behind the chain head; alert if > 10 blocks
- Processing queue depth — queue growth = bottleneck
- Delivery failures — failed send attempts per channel
- Rule match rate — abnormal growth = the rule may be too broad
```
# Prometheus metrics
alert_system_block_lag_gauge
alert_system_queue_depth_gauge
alert_system_delivery_total{channel, status}
alert_system_rule_matches_total{rule_id}
```
Stack and timeline
| Component | Technology |
|---|---|
| Ingestion | TypeScript + viem / ethers.js |
| Queue | Redis Streams / BullMQ |
| Enrichment | Chainlink price feeds, Etherscan Labels API |
| Rule engine | JSON-configurable, stored in PostgreSQL |
| Delivery | Telegram Bot, webhooks, Discord |
| Monitoring | Prometheus + Grafana |
A basic system (5-10 event types, Telegram + webhook, single network): 2-3 weeks. Multi-chain, complex composite rules, and a custom rule-management UI: 4-6 weeks.