Розробка продюсерів та консьюмерів Kafka для веб-застосунку

Наша компанія займається розробкою, підтримкою та обслуговуванням сайтів будь-якої складності. Від простих односторінкових сайтів до масштабних кластерних систем, побудованих на мікро сервісах. Досвід розробників підтверджено сертифікатами від вендорів.

Розробка та обслуговування будь-яких видів сайтів:

Інформаційні сайти або веб-програми
Сайти візитки, landing page, корпоративні сайти, онлайн каталоги, квіз, промо-сайти, блоги, ресурси новин, інформаційні портали, форуми, агрегатори
Сайти або веб-програми електронної комерції
Інтернет-магазини, B2B-портали, маркетплейси, онлайн-обмінники, кешбек-сайти, біржі, дропшиппінг-платформи, парсери товарів
Веб-програми для управління бізнес-процесами
CRM-системи, ERP-системи, корпоративні портали, системи управління виробництвом, парсери інформації
Сайти або веб-програми електронних послуг
Дошки оголошень, онлайн-школи, онлайн-кінотеатри, конструктори сайтів, портали надання електронних послуг, відеохостинги, тематичні портали

Це лише деякі з технічних типів сайтів, з якими ми працюємо, і кожен із них може мати свої специфічні особливості та функціональність, а також бути адаптованим під конкретні потреби та цілі клієнта.

Пропоновані послуги
Показано 1 з 1 послугУсі 2065 послуг
Розробка продюсерів та консьюмерів Kafka для веб-застосунку
Складна
~3-5 робочих днів
Часті питання

Наші компетенції:

Етапи розробки

Останні роботи

  • image_website-b2b-advance_0.png
    Розробка сайту компанії B2B ADVANCE
    1262
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1171
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    874
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1094
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    831
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Розробка веб-сайту для компанії ФІКСПЕР
    851

Розробка продюсерів та консьюмерів Kafka для веб-застосунку

Kafka-клієнти у виробництві — це не просто «відправити повідомлення» та «отримати повідомлення». Це гарантії доставки, ідемпотентність, управління rebalance, управління offset'ами та ізоляція від збоїв брокера.

Продюсер: гарантії доставки

Три режими надійності (acks):

  • acks=0 — fire and forget, втрата даних можлива
  • acks=1 — лідер підтвердив, але реплік можуть не встигнути
  • acks=all (або -1) — всі ISR-реплік підтвердили, втрата даних неможлива при min.insync.replicas=2

Для фінансових транзакцій та критичних подій — тільки acks=all з enable.idempotence=true.

// Java — ідемпотентний продюсер
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://schema-registry:8081");

// Надійність
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // макс з ідемпотентністю
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);

// Продуктивність
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);          // 64KB батч
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);               // чекаємо до 5мс на наповнення батча
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67_108_864);  // 64MB буфер
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

KafkaProducer<String, OrderEvent> producer = new KafkaProducer<>(props);

Відправка з обробкою помилок:

public CompletableFuture<RecordMetadata> sendOrderEvent(OrderEvent event) {
    ProducerRecord<String, OrderEvent> record = new ProducerRecord<>(
        "order-events",
        event.getOrderId(),  // ключ — всі события замовлення в одну партицію
        event
    );

    CompletableFuture<RecordMetadata> future = new CompletableFuture<>();

    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            if (exception instanceof RetriableException) {
                // Kafka сам ретраїть — цей блок не повинен спрацьовувати при правильній конфігурації
                log.error("Retriable error, Kafka will retry: {}", exception.getMessage());
            } else {
                // Non-retriable: Authorization, RecordTooLarge, SerializationException
                log.error("Fatal producer error for order {}: {}", event.getOrderId(), exception.getMessage());
                future.completeExceptionally(exception);
            }
        } else {
            log.debug("Sent to partition {} offset {}", metadata.partition(), metadata.offset());
            future.complete(metadata);
        }
    });

    return future;
}

Трансакційний продюсер

Коли потрібно атомарно записати у кілька топіків (напр., результат обробки + повідомлення):

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-instance-1");
// transactional.id має бути унікальним для кожного екземпляру продюсера

producer.initTransactions();

try {
    producer.beginTransaction();

    producer.send(new ProducerRecord<>("orders-processed", orderId, processedOrder));
    producer.send(new ProducerRecord<>("order-notifications", userId, notification));

    // Фіксуємо offset'и консьюмера в межах тієї ж транзакції (exactly-once)
    producer.sendOffsetsToTransaction(currentOffsets, consumerGroupMetadata);

    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
    producer.close(); // Цей екземпляр вже невалідний
    throw e;
} catch (KafkaException e) {
    producer.abortTransaction();
    throw e;
}

Консьюмер: правильне управління offset'ами

Auto-commit приховує помилки: повідомлення позначене як оброблене ще до того, як застосунок насправді його обробив. При збої — втрата даних.

Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
consumerProps.put("schema.registry.url", "http://schema-registry:8081");
consumerProps.put("specific.avro.reader", true);

// Вимикаємо auto-commit
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Session timeout — якщо брокер не отримує heartbeat за цей час, вважає консьюмера мертвим
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30_000);
consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10_000);

// Скільки записів отримуємо за poll
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// Максимальний час між poll — якщо перевищено, брокер вважає консьюмера мертвим
consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);

Poll-цикл з ручною фіксацією:

KafkaConsumer<String, OrderEvent> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(List.of("order-events"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Перед rebalance — зберігаємо стан обробки
        commitCurrentOffsets();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Після rebalance — ініціалізуємо стан для нових партицій
        log.info("Assigned partitions: {}", partitions);
    }
});

Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new HashMap<>();

try {
    while (!shutdown.get()) {
        ConsumerRecords<String, OrderEvent> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, OrderEvent> record : records) {
            try {
                processOrder(record.value());

                pendingOffsets.put(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)
                );
            } catch (NonRetriableException e) {
                // Відправляємо у DLQ та фіксуємо
                sendToDlq(record, e);
                pendingOffsets.put(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)
                );
            }
            // RetriableException — не фіксуємо, poll повернемо це повідомлення знову
        }

        if (!pendingOffsets.isEmpty()) {
            consumer.commitSync(pendingOffsets);
            pendingOffsets.clear();
        }
    }
} finally {
    consumer.close();
}

Паралельна обробка без втрати порядку

Один потік для poll + пул робітників за ключем:

// Зберігаємо порядок подій одного замовлення, паралелимо між замовленнями
Map<Integer, BlockingQueue<ConsumerRecord<String, OrderEvent>>> partitionQueues = new HashMap<>();
ExecutorService workers = Executors.newFixedThreadPool(12);

// При отриманні записів — маршрутизуємо в чергу за партицією
for (ConsumerRecord<String, OrderEvent> record : records) {
    int partitionIndex = record.partition() % NUM_WORKERS;
    workerQueues.get(partitionIndex).offer(record);
}

// Кожен робітник обробляє свою чергу послідовно
// → порядок в межах ключа зберігається

Python-клієнт (confluent-kafka-python)

from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
import json

# Продюсер
producer = Producer({
    'bootstrap.servers': 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
    'acks': 'all',
    'enable.idempotence': True,
    'compression.type': 'lz4',
    'batch.size': 65536,
    'linger.ms': 5,
    'retries': 2147483647,
    'delivery.timeout.ms': 120000,
})

def delivery_report(err, msg):
    if err is not None:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

producer.produce(
    'user-events',
    key=str(user_id),
    value=json.dumps(event).encode('utf-8'),
    callback=delivery_report
)
producer.flush()  # чекаємо доставки всіх pending повідомлень

# Консьюмер
consumer = Consumer({
    'bootstrap.servers': 'kafka-1:9092',
    'group.id': 'web-app-consumer',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'max.poll.interval.ms': 300000,
    'session.timeout.ms': 30000,
})

consumer.subscribe(['user-events'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError.PARTITION_EOF:
                continue
            raise KafkaException(msg.error())

        event = json.loads(msg.value().decode('utf-8'))
        process_event(event)
        consumer.commit(asynchronous=False)
finally:
    consumer.close()

Таймлайн

День 1 — проектування: визначаємо топіки, ключі повідомлень (для гарантії порядку), формат повідомлень (Avro/JSON/Protobuf), групи консьюмерів.

День 2 — розробка продюсерів: серіалізація, налаштування acks/idempotence, інтеграція з бізнес-логікою застосунку.

День 3 — розробка консьюмерів: ручне управління offset'ами, обробка rebalance, Dead Letter Queue для ошибочных повідомлень.

Дні 4–5 — тестування: unit-тести з EmbeddedKafka або Testcontainers, інтеграційні тести, нагрузочний тест з kafka-producer-perf-test, перевірка exactly-once семантики.

День 6 — розгортання, моніторинг consumer lag, налаштування алертів.