Налаштування Kafka Streams для обробки потоків даних

Наша компанія займається розробкою, підтримкою та обслуговуванням сайтів будь-якої складності. Від простих односторінкових сайтів до масштабних кластерних систем, побудованих на мікро сервісах. Досвід розробників підтверджено сертифікатами від вендорів.

Розробка та обслуговування будь-яких видів сайтів:

Інформаційні сайти або веб-програми
Сайти візитки, landing page, корпоративні сайти, онлайн каталоги, квіз, промо-сайти, блоги, ресурси новин, інформаційні портали, форуми, агрегатори
Сайти або веб-програми електронної комерції
Інтернет-магазини, B2B-портали, маркетплейси, онлайн-обмінники, кешбек-сайти, біржі, дропшиппінг-платформи, парсери товарів
Веб-програми для управління бізнес-процесами
CRM-системи, ERP-системи, корпоративні портали, системи управління виробництвом, парсери інформації
Сайти або веб-програми електронних послуг
Дошки оголошень, онлайн-школи, онлайн-кінотеатри, конструктори сайтів, портали надання електронних послуг, відеохостинги, тематичні портали

Це лише деякі з технічних типів сайтів, з якими ми працюємо, і кожен із них може мати свої специфічні особливості та функціональність, а також бути адаптованим під конкретні потреби та цілі клієнта.

Пропоновані послуги
Показано 1 з 1 послугУсі 2065 послуг
Налаштування Kafka Streams для обробки потоків даних
Складна
~5 робочих днів
Часті питання

Наші компетенції:

Етапи розробки

Останні роботи

  • image_website-b2b-advance_0.png
    Розробка сайту компанії B2B ADVANCE
    1262
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1171
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    874
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1094
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    831
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Розробка веб-сайту для компанії ФІКСПЕР
    851

Налаштування Kafka Streams для обробки потоків даних

Kafka Streams — це бібліотека для потокової обробки даних, яка працює як частина звичайного Java/Kotlin додатку. Жодного окремого кластеру обробки, жодного YARN або Mesos — лише залежність у pom.xml і звичайний JVM-процес. Це принципова відмінність від Flink і Spark Streaming, де потрібна окрема інфраструктура.

Архітектурний огляд

Kafka Streams читає топіки, трансформує дані, агрегує, джойнить і пише результат назад у Kafka або зовнішні системи через Kafka Connect. Стан зберігається локально в RocksDB і реплікується до changelog-топіків Kafka — це забезпечує відказостійкість без зовнішної бази.

Типові задачі на сайту: агрегація подій користувачів у реальному часі (DAU, воронки), збагачення потоку замовлень довідниковими даними, виявлення шахрайства за закономірностями поведінки, отримання матеріалізованих представлень з event-sourced даних.

Базова топологія

StreamsBuilder builder = new StreamsBuilder();

KStream<String, UserEvent> events = builder.stream(
    "user-events",
    Consumed.with(Serdes.String(), userEventSerde)
);

// Фільтрація + трансформація
KStream<String, PageView> pageViews = events
    .filter((userId, event) -> event.getType().equals("PAGE_VIEW"))
    .mapValues(event -> PageView.from(event));

// Ветвлення потоку
Map<String, KStream<String, UserEvent>> branches = events.split(Named.as("branch-"))
    .branch((k, v) -> v.getType().equals("PURCHASE"), Branched.as("purchases"))
    .branch((k, v) -> v.getType().equals("CLICK"), Branched.as("clicks"))
    .defaultBranch(Branched.as("other"));

branches.get("branch-purchases").to("purchase-events");

Агрегації з функціями вікон

Задача — рахувати кількість переглядів сторінок за користувачами у скользящому вікні 5 хвилин:

KTable<Windowed<String>, Long> pageViewCounts = pageViews
    .groupByKey(Grouped.with(Serdes.String(), pageViewSerde))
    .windowedBy(
        SlidingWindows.ofTimeDifferenceAndGrace(
            Duration.ofMinutes(5),
            Duration.ofSeconds(30)  // grace period для поздніх подій
        )
    )
    .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("page-view-counts")
        .withValueSerde(Serdes.Long())
    );

// Публікація результатів
pageViewCounts.toStream()
    .map((windowedKey, count) -> KeyValue.pair(
        windowedKey.key(),
        new PageViewStat(windowedKey.key(), windowedKey.window().start(), count)
    ))
    .to("page-view-stats", Produced.with(Serdes.String(), pageViewStatSerde));

Типи вікон:

  • TumblingWindows — фіксовані непересікаючіся інтервали (00:00–00:05, 00:05–00:10)
  • HoppingWindows — фіксований розмір, скользящий крок
  • SlidingWindows — рухаються за кожною подією
  • SessionWindows — групування за промежутками активності

KTable та матеріалізовані представлення

KTable — changelog-потік, у якому кожен новий запис з тим же ключем перезаписує попередній. Використовується для довідкових даних:

// Збагачення потоку подій даними про користувачів
KTable<String, UserProfile> userProfiles = builder.table(
    "user-profiles",
    Materialized.as("user-profiles-store")
);

KStream<String, EnrichedEvent> enriched = events.join(
    userProfiles,
    (event, profile) -> EnrichedEvent.builder()
        .event(event)
        .userName(profile.getName())
        .userSegment(profile.getSegment())
        .build()
);

KTable join працює синхронно — запис із потоку джойниться з поточним станом таблиці на момент обробки.

Серіалізація з Avro та Schema Registry

У production завжди потрібна схема. Avro + Confluent Schema Registry — стандарт:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-streams-avro-serde</artifactId>
    <version>7.5.0</version>
</dependency>
Map<String, Object> serdeConfig = Map.of(
    AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081",
    KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true
);

KafkaAvroSerde<UserEvent> userEventSerde = new KafkaAvroSerde<>();
userEventSerde.configure(serdeConfig, false);  // false = value serde

KStream<String, UserEvent> events = builder.stream(
    "user-events",
    Consumed.with(Serdes.String(), userEventSerde)
);

Schema Registry зберігає версії схем і забезпечує сумісність при еволюції. Змінення схеми без реєстрації ломить consumers.

Конфігурація додатку

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "site-analytics-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

// Продуктивність
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // 10MB
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

// Обробка помилок
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndContinueExceptionHandler.class);

// RocksDB state store
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Graceful shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

Interactive Queries — читання стану

Kafka Streams дозволяє читати state store без круглої поїздки через Kafka:

ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType(
        "page-view-counts",
        QueryableStoreTypes.keyValueStore()
    )
);

// GET /api/stats/user/:userId
Long count = store.get(userId);

// Для windowed store
ReadOnlyWindowStore<String, Long> windowStore = streams.store(
    StoreQueryParameters.fromNameAndType(
        "page-view-counts-windowed",
        QueryableStoreTypes.windowStore()
    )
);
WindowStoreIterator<Long> iterator = windowStore.fetch(
    userId,
    Instant.now().minus(Duration.ofMinutes(5)),
    Instant.now()
);

При горизонтальному масштабуванні стан розподілений між екземплярами. Kafka Streams надає streamsMetadataForKey для визначення, на якому хосту живе потрібний ключ — основа для реалізації REST-прокси до розподіленого стану.

Моніторинг та метрики

Kafka Streams експортує метрики через JMX. Для Prometheus — JMX Exporter:

# docker-compose.yml
services:
  streams-app:
    image: my-streams-app:latest
    environment:
      JAVA_OPTS: >
        -javaagent:/opt/jmx-exporter/jmx_prometheus_javaagent.jar=8080:/opt/jmx-exporter/config.yml
        -Xmx2g
        -XX:+UseG1GC

Ключові метрики:

  • kafka.streams:type=stream-metrics,client-id=* — метрики додатку
  • process-rate — записів за секунду
  • process-latency-avg — середня задержка обробки
  • commit-latency-avg — задержка коміту
  • rocksdb-block-cache-hit-ratio — ефективність кешу RocksDB

Dead Letter Queue

Помилки десеріалізації та обробки повинні маршрутизуватися, а не просто логуватися:

// Кастомний обробник помилок
public class DlqProductionExceptionHandler implements ProductionExceptionHandler {
    private final Producer<byte[], byte[]> dlqProducer;

    @Override
    public ProductionExceptionHandlerResponse handle(
            ProducerRecord<byte[], byte[]> record,
            Exception exception) {
        dlqProducer.send(new ProducerRecord<>("dead-letter-queue", record.key(), record.value()));
        return ProductionExceptionHandlerResponse.CONTINUE;
    }
}

Терміни

Налаштування Kafka + Schema Registry + базова топологія Streams — 3–4 дні. Агрегації з windowed операціями та Interactive Queries для REST API — ще 3–5 днів. Повноцінний пайплайн з моніторингом, DLQ, тестами (TopologyTestDriver) і CI/CD — 2–3 тижні. Переїзд існуючої аналітики з batch-обробки на Streams — оцінюється окремо після аудиту топіків та схем.