Розробка потокової обробки блокчейн-даних (Kafka/Flink)
Нода Ethereum в режимі real-time генерує близько 2–5 МБ даних на секунду в період високої активності мережі. Це подій Transfer, виклики контрактів, змін стану. Якщо ваша аналітична система або торговий движок отримують ці дані через періодичний polling RPC-ноди — ви працюєте зі застарілими даними та пропускаєте подій. Для задач, де затримка в 1–2 блока критична (арбітраж, liquidation monitoring, fraud detection), потрібна потокова архітектура з гарантіями доставки.
Джерела даних: від ноди до Kafka
WebSocket підписки vs polling
Стандартна eth_subscribe("newHeads") через WebSocket дає сповіщення про новий блок без затримки polling. Але WebSocket-з'єднання нестабільні на довгих періодах — потрібна reconnect з логікою catchup:
func (s *NodeSubscriber) subscribeWithRecovery(ctx context.Context) error {
for {
lastBlock, _ := s.db.GetLastProcessedBlock()
// Наздогнати пропущені блоки при reconnect
if err := s.catchUpFromBlock(ctx, lastBlock+1); err != nil {
return err
}
// Підписатися на нові блоки
sub, err := s.client.SubscribeNewHead(ctx, s.headers)
if err != nil {
time.Sleep(backoffDuration)
continue
}
select {
case err := <-sub.Err():
log.Warnf("subscription error: %v, reconnecting", err)
case <-ctx.Done():
return nil
}
}
}
Firehose протокол (StreamingFast/Pinax)
Для Ethereum та інших EVM-мереж найефективніший спосіб отримання сирих даних — Firehose, розроблений StreamingFast (тепер частина The Graph Foundation). Протокол інструментує ноду на рівні бінарника та експортує блоки в бінарному форматі (protobuf) з мінімальною затримкою. Пропускна спроможність на порядок вища ніж через JSON-RPC.
# Підключення до Firehose endpoint
grpcurl -plaintext mainnet.eth.streamingfast.io:443 \
sf.ethereum.type.v2.Fetch/Block
Для проектів з вимогою повної історичної прокрутки — Firehose + зберігання flat files в S3/GCS дозволяє відтворити будь-який діапазон блоків без повторної синхронізації ноди.
Kafka як транспортний шар
Kafka — черга з збереженням (log-based). На відміну від RabbitMQ/Redis Streams, Kafka зберігає всі повідомлення на налаштований період збереження (дні, тижні), що дозволяє consumers перечитувати дані. Це критично для блокчейн-аналітики: новий consumer group може прочитати всю історію подій без звернення до ноди.
Топології топіків для blockchain pipeline:
raw.blocks → сирі блоки (partitioned by block_number % N)
raw.transactions → всі транзакції
raw.logs → всі event logs
decoded.transfers → декодовані ERC-20 Transfer подій
decoded.swaps → декодовані Swap подій (Uniswap, Curve, etc.)
alerts.large-txns → транзакції > threshold
analytics.prices → агреговані ціни
Стратегія partitioning важлива: для подій конкретного контракту — partition by contractAddress (гарантує ordering). Для транзакцій — partition by from адреса або blockNumber.
Apache Flink: Stateful Stream Processing
Flink — правильний інструмент для задач, які вимагають стану: скользящі агрегати, join потоків, виявлення паттернів у часі. Spark Streaming — батчинг під видом стрімінгу (micro-batches). Flink — справжня event-time обробка.
Декодування ABI on-the-fly
Входящі logs — сирі hex дані. Flink job повинен декодувати їх у типізовані подій:
public class LogDecoderFunction extends RichFlatMapFunction<RawLog, DecodedEvent> {
private Map<String, ContractABI> abiRegistry;
@Override
public void flatMap(RawLog log, Collector<DecodedEvent> out) {
String contractAddress = log.getAddress().toLowerCase();
ContractABI abi = abiRegistry.get(contractAddress);
if (abi == null) return; // невідомий контракт
String topic0 = log.getTopics().get(0);
EventDefinition eventDef = abi.findEventBySignatureHash(topic0);
if (eventDef != null) {
DecodedEvent decoded = AbiDecoder.decode(eventDef, log);
out.collect(decoded);
}
}
}
ABI реєстр завантажується з PostgreSQL/Redis при старті job та оновлюється через Broadcast State pattern — без перезапуску job при додаванні нових контрактів.
Часові вікна та агрегації
Задача: обчислювати 5-хвилинний VWAP (Volume Weighted Average Price) по свапам Uniswap V3 в режимі реального часу.
DataStream<SwapEvent> swaps = source
.filter(e -> e.getType().equals("Swap"))
.map(e -> (SwapEvent) e);
DataStream<VWAPResult> vwap = swaps
.keyBy(SwapEvent::getPoolAddress)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new VWAPAggregator(), new VWAPWindowFunction());
Event time vs processing time — принципіальний вибір. Event time (час блока) дає детерміновані результати при переигруванні історії. Processing time швидше, але дає різні результати при replay.
Watermarks для обробки late events — блокчейн транзакції можуть приходити в Kafka з невеликою затримкою:
WatermarkStrategy.<RawLog>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((log, ts) -> log.getBlockTimestamp() * 1000L)
Складні паттерни: CEP для виявлення аномалій
Flink CEP (Complex Event Processing) дозволяє описувати послідовності подій. Задача: детектувати sandwich attack — front-run транзакція, жертва, back-run транзакція в одному блоці.
Pattern<DecodedEvent, ?> sandwichPattern = Pattern
.<DecodedEvent>begin("frontrun")
.where(e -> e.isSwap() && e.getGasPrice() > threshold)
.next("victim")
.where(e -> e.isSwap() && samePool(e, "frontrun"))
.next("backrun")
.where(e -> e.isSwap() && samePool(e, "frontrun")
&& e.getSender().equals(frontrunSender(e)))
.within(Time.seconds(12)); // в межах одного блока
State backend та відмовостійкість
Flink checkpoint — снапшот стану всіх операторів в S3/HDFS. При збої — відновлення з останнього checkpoint, Kafka consumer offset зберігається атомарно зі state. Це гарантує exactly-once семантику для більшості операторів.
RocksDB state backend — обов'язковий для production при великому стані (мільйони ключів). In-memory backend не масштабується.
# flink-conf.yaml
state.backend: rocksdb
state.checkpoints.dir: s3://your-bucket/flink-checkpoints
execution.checkpointing.interval: 60000 # щохвилини
execution.checkpointing.mode: EXACTLY_ONCE
Моніторинг та Dead Letter Queues
Необроблені подій (невідомий ABI, помилка парсингу, неочікуваний формат) не можна просто відкидати. Dead letter queue (DLQ) в окремий Kafka топік з збереженням оригінального повідомлення та stack trace помилки — стандартний паттерн.
Метрики Flink + Prometheus + Grafana: lag по кожному топіку, пропускна спроможність операторів, backpressure по графу job. Backpressure — перший індикатор, що downstream не справляється.
Типові use cases та затримки
| Use case | Допустима затримка | Інструмент |
|---|---|---|
| MEV bot / арбітраж | < 100мс | WebSocket → in-process |
| Liquidation monitoring | < 1 сек | Kafka + Flink CEP |
| Real-time DeFi аналітика | 1–5 сек | Kafka + Flink aggregations |
| Ончейн аналітика/BI | < 1 хв | Kafka + Flink → ClickHouse |
| Історичний аналіз | без обмежень | Firehose → S3 → Spark/dbt |
Інфраструктура та стек
Мінімальний production кластер: 3 Kafka брокери (3 репліки для durability), Flink cluster з 1 JobManager + 3–5 TaskManager pods в Kubernetes. Зберігання результатів — ClickHouse для аналітичних запитів (колоночне, швидкі aggregations на великих обсягах) або PostgreSQL + TimescaleDB для метрик часових рядів.
Керовані сервіси скорочують операційну нагрузку: Confluent Cloud (Kafka), Amazon Kinesis (альтернатива для AWS-native стека). Для on-premise або compliance вимог — власний кластер.
Розробка першої production pipeline з декодуванням подій, базовими агрегаціями та мониторингом — 4–6 тижнів. Складні CEP паттерни та multi-chain підтримка добавляють ще 3–4 тижні.







