Developing Blockchain Data Stream Processing (Kafka/Flink)
An Ethereum node generates roughly 2–5 MB of data per second during periods of high network activity: Transfer events, contract calls, state changes. If your analytics system or trading engine receives this data by periodically polling an RPC node, you are working with stale data and missing events. For tasks where a latency of 1–2 blocks matters (arbitrage, liquidation monitoring, fraud detection), you need a streaming architecture with delivery guarantees.
Data Sources: From Node to Kafka
WebSocket Subscriptions vs Polling
A standard eth_subscribe("newHeads") over WebSocket delivers new-block notifications without polling latency. But WebSocket connections drop over long periods, so you need reconnect logic that catches up on missed blocks:
func (s *NodeSubscriber) subscribeWithRecovery(ctx context.Context) error {
	for {
		lastBlock, err := s.db.GetLastProcessedBlock()
		if err != nil {
			return err
		}
		// Catch up on blocks missed while disconnected
		if err := s.catchUpFromBlock(ctx, lastBlock+1); err != nil {
			return err
		}
		// Subscribe to new heads; s.headers is drained by a separate consumer goroutine
		sub, err := s.client.SubscribeNewHead(ctx, s.headers)
		if err != nil {
			time.Sleep(backoffDuration)
			continue
		}
		select {
		case err := <-sub.Err():
			log.Warnf("subscription error: %v, reconnecting", err)
		case <-ctx.Done():
			sub.Unsubscribe()
			return nil
		}
	}
}
Firehose Protocol (StreamingFast/Pinax)
For Ethereum and other EVM networks, the most efficient way to get raw data is Firehose, developed by StreamingFast (now a core developer team in The Graph ecosystem). The protocol instruments the node client itself and exports blocks in a binary format (protobuf) with minimal latency. Throughput is an order of magnitude higher than JSON-RPC.
# Connect to a Firehose endpoint (TLS on port 443; most providers also require an API token)
grpcurl -H "Authorization: Bearer $FIREHOSE_API_TOKEN" \
  mainnet.eth.streamingfast.io:443 \
  sf.firehose.v2.Stream/Blocks
For projects requiring full historical replay — Firehose + storing flat files in S3/GCS allows replaying any block range without re-syncing the node.
Kafka as Transport Layer
Kafka is a log-based message broker with configurable retention. Unlike RabbitMQ or Redis Streams, it retains all messages for the configured retention period (days or weeks), so consumers can replay data. This is critical for blockchain analytics: a new consumer group can read the entire event history without touching the node.
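The replay property can be sketched with a toy model in plain Java (illustrative only, not the Kafka client API): a partition is an append-only log, and each consumer group keeps its own offset into it, so a new group starts from offset 0 and replays everything.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a Kafka partition: an append-only log with
// per-consumer-group offsets. Not the real client API.
class PartitionLog {
    private final List<String> records = new ArrayList<>();
    private final Map<String, Integer> groupOffsets = new HashMap<>();

    void append(String record) {
        records.add(record);
    }

    // Return the next record for a group, advancing only that group's offset.
    String poll(String groupId) {
        int offset = groupOffsets.getOrDefault(groupId, 0);
        if (offset >= records.size()) return null; // caught up
        groupOffsets.put(groupId, offset + 1);
        return records.get(offset);
    }
}
```

A group that has consumed to the end gets null, while a freshly created group sees the history from the beginning, which is exactly what makes backfilling a new analytics consumer cheap.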
Topic topologies for blockchain pipeline:
raw.blocks → raw blocks (partitioned by block_number % N)
raw.transactions → all transactions
raw.logs → all event logs
decoded.transfers → decoded ERC-20 Transfer events
decoded.swaps → decoded Swap events (Uniswap, Curve, etc.)
alerts.large-txns → transactions > threshold
analytics.prices → aggregated price data
Partitioning strategy matters: for events of a specific contract, partition by contractAddress (guarantees per-contract ordering); for transactions, partition by from address or blockNumber.
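Why a fixed key preserves ordering can be shown with a minimal sketch of hash-based partition selection (Kafka's default partitioner actually uses murmur2; String.hashCode() stands in here):

```java
// Illustrative hash-based partition selection. Records with the same key
// always land on the same partition, so their relative order is preserved;
// Kafka's real default partitioner uses murmur2 rather than hashCode().
class KeyPartitioner {
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

The corollary is that changing the partition count reshuffles keys, so plan topic sizing before relying on per-key ordering.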
Apache Flink: Stateful Stream Processing
Flink is the right tool for tasks that require state: sliding aggregates, stream joins, pattern detection over time. Spark Streaming processes micro-batches, which is batching disguised as streaming; Flink does true event-at-a-time, event-time processing.
ABI Decoding On-The-Fly
Incoming logs are raw hex data. The Flink job should decode them into typed events:
public class LogDecoderFunction extends RichFlatMapFunction<RawLog, DecodedEvent> {
    private transient Map<String, ContractABI> abiRegistry;

    @Override
    public void open(Configuration parameters) {
        // Load the ABI registry once per task, e.g. from PostgreSQL/Redis
        // (loader name is illustrative)
        abiRegistry = AbiRegistryLoader.load();
    }

    @Override
    public void flatMap(RawLog log, Collector<DecodedEvent> out) {
        String contractAddress = log.getAddress().toLowerCase();
        ContractABI abi = abiRegistry.get(contractAddress);
        if (abi == null) return; // unknown contract
        String topic0 = log.getTopics().get(0);
        EventDefinition eventDef = abi.findEventBySignatureHash(topic0);
        if (eventDef != null) {
            out.collect(AbiDecoder.decode(eventDef, log));
        }
    }
}
The ABI registry is loaded from PostgreSQL/Redis at job start and updated via the Broadcast State pattern, so new contracts can be added without restarting the job.
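The registry lookup in the decoder can be sketched as a plain map from topic0 (the keccak256 hash of the canonical event signature) to an event definition; the Transfer hash below is the well-known ERC-20 constant, and the value type is simplified to a string for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of a topic0 -> event-signature registry.
// topic0 = keccak256("Transfer(address,address,uint256)") is the
// well-known ERC-20 Transfer constant used below.
class AbiRegistry {
    private final Map<String, String> eventsByTopic0 = new HashMap<>();

    AbiRegistry() {
        eventsByTopic0.put(
            "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
            "Transfer(address,address,uint256)");
    }

    // Normalize case before lookup: nodes and libraries differ here.
    String lookup(String topic0) {
        return eventsByTopic0.get(topic0.toLowerCase());
    }
}
```

An unknown topic0 returns null; in the pipeline above that record would be routed to the dead letter queue rather than silently dropped.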
Temporal Windows and Aggregations
Task: compute 5-minute VWAP (Volume Weighted Average Price) from Uniswap V3 swaps in real-time.
DataStream<SwapEvent> swaps = source
    .filter(e -> e.getType().equals("Swap"))
    .map(e -> (SwapEvent) e);

DataStream<VWAPResult> vwap = swaps
    .keyBy(SwapEvent::getPoolAddress)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new VWAPAggregator(), new VWAPWindowFunction());
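The VWAPAggregator itself is not shown above; a plain-Java sketch of its accumulator logic, mirroring Flink's AggregateFunction contract (add/merge/getResult) without the Flink dependency, might look like this:

```java
// Plain-Java sketch of a VWAP accumulator, mirroring the shape of
// Flink's AggregateFunction (createAccumulator/add/merge/getResult).
// VWAP = sum(price * volume) / sum(volume).
class VWAPAccumulator {
    double priceVolumeSum = 0.0;
    double volumeSum = 0.0;

    void add(double price, double volume) {
        priceVolumeSum += price * volume;
        volumeSum += volume;
    }

    // merge() lets Flink combine partial accumulators, e.g. for session windows
    VWAPAccumulator merge(VWAPAccumulator other) {
        priceVolumeSum += other.priceVolumeSum;
        volumeSum += other.volumeSum;
        return this;
    }

    double getResult() {
        return volumeSum == 0.0 ? 0.0 : priceVolumeSum / volumeSum;
    }
}
```

Keeping only two running sums per key is what makes this aggregation cheap: state size is constant regardless of how many swaps fall into the window.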
Event time vs processing time is a critical choice. Event time (block timestamp) gives deterministic results on replay; processing time is simpler but produces different results on every run.
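The determinism claim follows from window assignment being a pure function of the event timestamp, independent of arrival order; a minimal sketch:

```java
// Event-time tumbling-window assignment: the window an event falls into
// depends only on its timestamp, so replays are deterministic.
class EventTimeWindows {
    static final long WINDOW_MS = 5 * 60 * 1000L; // 5-minute tumbling windows

    // Start of the window containing the given event time (ms).
    static long windowStart(long eventTimeMs) {
        return eventTimeMs - Math.floorMod(eventTimeMs, WINDOW_MS);
    }
}
```

Two events with the same block timestamp land in the same window whether they arrive first or last, which is precisely what processing-time windows cannot guarantee.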
Watermarks handle late events; blockchain transactions may arrive in Kafka with a slight delay:
WatermarkStrategy
    .<RawLog>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((log, ts) -> log.getBlockTimestamp() * 1000L);
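What forBoundedOutOfOrderness does can be sketched in a few lines: the watermark trails the maximum observed event time by the configured bound, and events behind the watermark count as late:

```java
// Sketch of bounded-out-of-orderness watermarking: the watermark trails
// the max seen event time by a fixed bound; events older than the
// current watermark are considered late.
class BoundedWatermark {
    private final long boundMs;
    private long maxEventTimeMs = Long.MIN_VALUE;

    BoundedWatermark(long boundMs) {
        this.boundMs = boundMs;
    }

    // Observe an event and return the resulting watermark.
    long onEvent(long eventTimeMs) {
        maxEventTimeMs = Math.max(maxEventTimeMs, eventTimeMs);
        return maxEventTimeMs - boundMs;
    }

    boolean isLate(long eventTimeMs) {
        return eventTimeMs < maxEventTimeMs - boundMs;
    }
}
```

The bound is a latency/completeness trade-off: a larger bound admits more out-of-order events but delays every window result by that amount.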
Complex Patterns: CEP for Anomaly Detection
Flink CEP (Complex Event Processing) allows describing sequences of events. Task: detect a sandwich attack, i.e. a front-run transaction, a victim, and a back-run transaction in one block.
Pattern<DecodedEvent, ?> sandwichPattern = Pattern
    .<DecodedEvent>begin("frontrun")
    .where(SimpleCondition.of(e -> e.isSwap() && e.getGasPrice() > threshold))
    .next("victim")
    // conditions referencing an earlier match need an IterativeCondition
    .where(new IterativeCondition<DecodedEvent>() {
        @Override public boolean filter(DecodedEvent e, Context<DecodedEvent> ctx) throws Exception {
            DecodedEvent front = ctx.getEventsForPattern("frontrun").iterator().next();
            return e.isSwap() && e.getPool().equals(front.getPool());
        }
    })
    .next("backrun")
    .where(new IterativeCondition<DecodedEvent>() {
        @Override public boolean filter(DecodedEvent e, Context<DecodedEvent> ctx) throws Exception {
            DecodedEvent front = ctx.getEventsForPattern("frontrun").iterator().next();
            return e.isSwap() && e.getPool().equals(front.getPool())
                    && e.getSender().equals(front.getSender());
        }
    })
    .within(Time.seconds(12)); // within one block
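The same check, stripped of the CEP machinery, can be expressed as a plain scan over one block's ordered swaps (the Swap fields here are illustrative, not the pipeline's actual event schema):

```java
import java.util.List;

// Illustrative swap event: sender and pool only.
class Swap {
    final String sender;
    final String pool;

    Swap(String sender, String pool) {
        this.sender = sender;
        this.pool = pool;
    }
}

// Sandwich check over one block's swaps, in transaction order:
// same sender swaps the same pool immediately before and after a
// different sender's swap on that pool.
class SandwichDetector {
    static boolean isSandwich(List<Swap> blockSwaps) {
        for (int i = 0; i + 2 < blockSwaps.size(); i++) {
            Swap front = blockSwaps.get(i);
            Swap victim = blockSwaps.get(i + 1);
            Swap back = blockSwaps.get(i + 2);
            if (front.pool.equals(victim.pool)
                    && front.pool.equals(back.pool)
                    && front.sender.equals(back.sender)
                    && !victim.sender.equals(front.sender)) {
                return true;
            }
        }
        return false;
    }
}
```

The CEP version buys you the cross-block windowing, state management, and timeout handling that this naive scan lacks, but the matching logic is the same.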
State Backend and Fault Tolerance
A Flink checkpoint is a snapshot of all operators' state to S3/HDFS. On failure, the job recovers from the last checkpoint; Kafka consumer offsets are stored atomically with the state. This gives exactly-once state semantics (end-to-end exactly-once additionally requires a transactional sink).
The RocksDB state backend is required in production with large state (millions of keys); the in-memory backend doesn't scale.
# flink-conf.yaml
state.backend: rocksdb
state.checkpoints.dir: s3://your-bucket/flink-checkpoints
execution.checkpointing.interval: 60000 # every minute
execution.checkpointing.mode: EXACTLY_ONCE
Monitoring and Dead Letter Queues
Unprocessed events (unknown ABI, parse error, unexpected format) cannot simply be dropped. The standard pattern is a dead letter queue (DLQ): a separate Kafka topic carrying the original message plus the error stack trace.
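A dead-letter envelope can be as simple as the original payload plus error context; the field names below are illustrative, not a fixed schema:

```java
// Illustrative dead-letter envelope: the raw payload travels with the
// error details, so the record can be reprocessed after a fix is deployed.
class DeadLetterRecord {
    final String sourceTopic;
    final byte[] originalPayload;
    final String errorMessage;
    final String stackTrace;
    final long failedAtMs;

    DeadLetterRecord(String sourceTopic, byte[] originalPayload, Throwable error) {
        this.sourceTopic = sourceTopic;
        this.originalPayload = originalPayload;
        this.errorMessage = error.toString();
        StringBuilder sb = new StringBuilder();
        for (StackTraceElement el : error.getStackTrace()) {
            sb.append(el).append('\n');
        }
        this.stackTrace = sb.toString();
        this.failedAtMs = System.currentTimeMillis();
    }
}
```

Preserving the untouched original payload is the important part: once the decoder is fixed, a small replay job can feed the DLQ topic back through the pipeline.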
Flink metrics + Prometheus + Grafana: lag per topic, operator throughput, backpressure across the job graph. Backpressure is the first indicator that downstream can't keep up.
Typical Use Cases and Latencies
| Use case | Acceptable latency | Tool |
|---|---|---|
| MEV bot / arbitrage | < 100ms | WebSocket → in-process |
| Liquidation monitoring | < 1 sec | Kafka + Flink CEP |
| Real-time DeFi analytics | 1–5 sec | Kafka + Flink aggregations |
| On-chain analytics/BI | < 1 min | Kafka + Flink → ClickHouse |
| Historical analysis | no limits | Firehose → S3 → Spark/dbt |
Infrastructure and Stack
Minimal production cluster: 3 Kafka brokers (3 replicas for durability), Flink cluster with 1 JobManager + 3–5 TaskManager pods in Kubernetes. Results storage — ClickHouse for analytics queries (columnar, fast aggregations on large volumes) or PostgreSQL + TimescaleDB for time series metrics.
Managed services reduce operational burden: Confluent Cloud (Kafka), Amazon Kinesis (alternative for AWS-native stack). For on-premise or compliance requirements — own cluster.
Expect 4–6 weeks to develop the first production pipeline with event decoding, basic aggregations, and monitoring; complex CEP patterns and multi-chain support add another 3–4 weeks.