Налаштування Kafka Streams для обробки потоків даних
Kafka Streams — це бібліотека для потокової обробки даних, яка працює як частина звичайного Java/Kotlin додатку. Жодного окремого кластеру обробки, жодного YARN або Mesos — лише залежність у pom.xml і звичайний JVM-процес. Це принципова відмінність від Flink і Spark Streaming, де потрібна окрема інфраструктура.
Архітектурний огляд
Kafka Streams читає топіки, трансформує дані, агрегує, джойнить і пише результат назад у Kafka або зовнішні системи через Kafka Connect. Стан зберігається локально в RocksDB і реплікується до changelog-топіків Kafka — це забезпечує відказостійкість без зовнішної бази.
Типові задачі на сайту: агрегація подій користувачів у реальному часі (DAU, воронки), збагачення потоку замовлень довідниковими даними, виявлення шахрайства за закономірностями поведінки, отримання матеріалізованих представлень з event-sourced даних.
Базова топологія
StreamsBuilder builder = new StreamsBuilder();
KStream<String, UserEvent> events = builder.stream(
"user-events",
Consumed.with(Serdes.String(), userEventSerde)
);
// Фільтрація + трансформація
KStream<String, PageView> pageViews = events
.filter((userId, event) -> event.getType().equals("PAGE_VIEW"))
.mapValues(event -> PageView.from(event));
// Ветвлення потоку
Map<String, KStream<String, UserEvent>> branches = events.split(Named.as("branch-"))
.branch((k, v) -> v.getType().equals("PURCHASE"), Branched.as("purchases"))
.branch((k, v) -> v.getType().equals("CLICK"), Branched.as("clicks"))
.defaultBranch(Branched.as("other"));
branches.get("branch-purchases").to("purchase-events");
Агрегації з функціями вікон
Задача — рахувати кількість переглядів сторінок за користувачами у скользящому вікні 5 хвилин:
KTable<Windowed<String>, Long> pageViewCounts = pageViews
.groupByKey(Grouped.with(Serdes.String(), pageViewSerde))
.windowedBy(
SlidingWindows.ofTimeDifferenceAndGrace(
Duration.ofMinutes(5),
Duration.ofSeconds(30) // grace period для поздніх подій
)
)
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("page-view-counts")
.withValueSerde(Serdes.Long())
);
// Публікація результатів
pageViewCounts.toStream()
.map((windowedKey, count) -> KeyValue.pair(
windowedKey.key(),
new PageViewStat(windowedKey.key(), windowedKey.window().start(), count)
))
.to("page-view-stats", Produced.with(Serdes.String(), pageViewStatSerde));
Типи вікон:
-
TumblingWindows— фіксовані непересікаючіся інтервали (00:00–00:05, 00:05–00:10) -
HoppingWindows— фіксований розмір, скользящий крок -
SlidingWindows— рухаються за кожною подією -
SessionWindows— групування за промежутками активності
KTable та матеріалізовані представлення
KTable — changelog-потік, у якому кожен новий запис з тим же ключем перезаписує попередній. Використовується для довідкових даних:
// Збагачення потоку подій даними про користувачів
KTable<String, UserProfile> userProfiles = builder.table(
"user-profiles",
Materialized.as("user-profiles-store")
);
KStream<String, EnrichedEvent> enriched = events.join(
userProfiles,
(event, profile) -> EnrichedEvent.builder()
.event(event)
.userName(profile.getName())
.userSegment(profile.getSegment())
.build()
);
KTable join працює синхронно — запис із потоку джойниться з поточним станом таблиці на момент обробки.
Серіалізація з Avro та Schema Registry
У production завжди потрібна схема. Avro + Confluent Schema Registry — стандарт:
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
<version>7.5.0</version>
</dependency>
Map<String, Object> serdeConfig = Map.of(
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081",
KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true
);
KafkaAvroSerde<UserEvent> userEventSerde = new KafkaAvroSerde<>();
userEventSerde.configure(serdeConfig, false); // false = value serde
KStream<String, UserEvent> events = builder.stream(
"user-events",
Consumed.with(Serdes.String(), userEventSerde)
);
Schema Registry зберігає версії схем і забезпечує сумісність при еволюції. Змінення схеми без реєстрації ломить consumers.
Конфігурація додатку
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "site-analytics-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
// Продуктивність
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // 10MB
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
// Обробка помилок
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
// RocksDB state store
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Graceful shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
Interactive Queries — читання стану
Kafka Streams дозволяє читати state store без круглої поїздки через Kafka:
ReadOnlyKeyValueStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameAndType(
"page-view-counts",
QueryableStoreTypes.keyValueStore()
)
);
// GET /api/stats/user/:userId
Long count = store.get(userId);
// Для windowed store
ReadOnlyWindowStore<String, Long> windowStore = streams.store(
StoreQueryParameters.fromNameAndType(
"page-view-counts-windowed",
QueryableStoreTypes.windowStore()
)
);
WindowStoreIterator<Long> iterator = windowStore.fetch(
userId,
Instant.now().minus(Duration.ofMinutes(5)),
Instant.now()
);
При горизонтальному масштабуванні стан розподілений між екземплярами. Kafka Streams надає streamsMetadataForKey для визначення, на якому хосту живе потрібний ключ — основа для реалізації REST-прокси до розподіленого стану.
Моніторинг та метрики
Kafka Streams експортує метрики через JMX. Для Prometheus — JMX Exporter:
# docker-compose.yml
services:
streams-app:
image: my-streams-app:latest
environment:
JAVA_OPTS: >
-javaagent:/opt/jmx-exporter/jmx_prometheus_javaagent.jar=8080:/opt/jmx-exporter/config.yml
-Xmx2g
-XX:+UseG1GC
Ключові метрики:
-
kafka.streams:type=stream-metrics,client-id=*— метрики додатку -
process-rate— записів за секунду -
process-latency-avg— середня задержка обробки -
commit-latency-avg— задержка коміту -
rocksdb-block-cache-hit-ratio— ефективність кешу RocksDB
Dead Letter Queue
Помилки десеріалізації та обробки повинні маршрутизуватися, а не просто логуватися:
// Кастомний обробник помилок
public class DlqProductionExceptionHandler implements ProductionExceptionHandler {
private final Producer<byte[], byte[]> dlqProducer;
@Override
public ProductionExceptionHandlerResponse handle(
ProducerRecord<byte[], byte[]> record,
Exception exception) {
dlqProducer.send(new ProducerRecord<>("dead-letter-queue", record.key(), record.value()));
return ProductionExceptionHandlerResponse.CONTINUE;
}
}
Терміни
Налаштування Kafka + Schema Registry + базова топологія Streams — 3–4 дні. Агрегації з windowed операціями та Interactive Queries для REST API — ще 3–5 днів. Повноцінний пайплайн з моніторингом, DLQ, тестами (TopologyTestDriver) і CI/CD — 2–3 тижні. Переїзд існуючої аналітики з batch-обробки на Streams — оцінюється окремо після аудиту топіків та схем.







