Розробка продюсерів та консьюмерів Kafka для веб-застосунку
Kafka-клієнти у виробництві — це не просто «відправити повідомлення» та «отримати повідомлення». Це гарантії доставки, ідемпотентність, управління rebalance, управління offset'ами та ізоляція від збоїв брокера.
Продюсер: гарантії доставки
Три режими надійності (acks):
-
acks=0— fire and forget, втрата даних можлива -
acks=1— лідер підтвердив, але реплік можуть не встигнути -
acks=all(або-1) — всі ISR-реплік підтвердили, втрата даних неможлива приmin.insync.replicas=2
Для фінансових транзакцій та критичних подій — тільки acks=all з enable.idempotence=true.
// Java — ідемпотентний продюсер
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://schema-registry:8081");
// Надійність
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // макс з ідемпотентністю
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);
// Продуктивність
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); // 64KB батч
props.put(ProducerConfig.LINGER_MS_CONFIG, 5); // чекаємо до 5мс на наповнення батча
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67_108_864); // 64MB буфер
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
KafkaProducer<String, OrderEvent> producer = new KafkaProducer<>(props);
Відправка з обробкою помилок:
public CompletableFuture<RecordMetadata> sendOrderEvent(OrderEvent event) {
ProducerRecord<String, OrderEvent> record = new ProducerRecord<>(
"order-events",
event.getOrderId(), // ключ — всі события замовлення в одну партицію
event
);
CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
producer.send(record, (metadata, exception) -> {
if (exception != null) {
if (exception instanceof RetriableException) {
// Kafka сам ретраїть — цей блок не повинен спрацьовувати при правильній конфігурації
log.error("Retriable error, Kafka will retry: {}", exception.getMessage());
} else {
// Non-retriable: Authorization, RecordTooLarge, SerializationException
log.error("Fatal producer error for order {}: {}", event.getOrderId(), exception.getMessage());
future.completeExceptionally(exception);
}
} else {
log.debug("Sent to partition {} offset {}", metadata.partition(), metadata.offset());
future.complete(metadata);
}
});
return future;
}
Трансакційний продюсер
Коли потрібно атомарно записати у кілька топіків (напр., результат обробки + повідомлення):
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-instance-1");
// transactional.id має бути унікальним для кожного екземпляру продюсера
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("orders-processed", orderId, processedOrder));
producer.send(new ProducerRecord<>("order-notifications", userId, notification));
// Фіксуємо offset'и консьюмера в межах тієї ж транзакції (exactly-once)
producer.sendOffsetsToTransaction(currentOffsets, consumerGroupMetadata);
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
producer.close(); // Цей екземпляр вже невалідний
throw e;
} catch (KafkaException e) {
producer.abortTransaction();
throw e;
}
Консьюмер: правильне управління offset'ами
Auto-commit приховує помилки: повідомлення позначене як оброблене ще до того, як застосунок насправді його обробив. При збої — втрата даних.
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
consumerProps.put("schema.registry.url", "http://schema-registry:8081");
consumerProps.put("specific.avro.reader", true);
// Вимикаємо auto-commit
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Session timeout — якщо брокер не отримує heartbeat за цей час, вважає консьюмера мертвим
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30_000);
consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10_000);
// Скільки записів отримуємо за poll
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// Максимальний час між poll — якщо перевищено, брокер вважає консьюмера мертвим
consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);
Poll-цикл з ручною фіксацією:
KafkaConsumer<String, OrderEvent> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(List.of("order-events"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Перед rebalance — зберігаємо стан обробки
commitCurrentOffsets();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Після rebalance — ініціалізуємо стан для нових партицій
log.info("Assigned partitions: {}", partitions);
}
});
Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new HashMap<>();
try {
while (!shutdown.get()) {
ConsumerRecords<String, OrderEvent> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, OrderEvent> record : records) {
try {
processOrder(record.value());
pendingOffsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
} catch (NonRetriableException e) {
// Відправляємо у DLQ та фіксуємо
sendToDlq(record, e);
pendingOffsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
}
// RetriableException — не фіксуємо, poll повернемо це повідомлення знову
}
if (!pendingOffsets.isEmpty()) {
consumer.commitSync(pendingOffsets);
pendingOffsets.clear();
}
}
} finally {
consumer.close();
}
Паралельна обробка без втрати порядку
Один потік для poll + пул робітників за ключем:
// Зберігаємо порядок подій одного замовлення, паралелимо між замовленнями
Map<Integer, BlockingQueue<ConsumerRecord<String, OrderEvent>>> partitionQueues = new HashMap<>();
ExecutorService workers = Executors.newFixedThreadPool(12);
// При отриманні записів — маршрутизуємо в чергу за партицією
for (ConsumerRecord<String, OrderEvent> record : records) {
int partitionIndex = record.partition() % NUM_WORKERS;
workerQueues.get(partitionIndex).offer(record);
}
// Кожен робітник обробляє свою чергу послідовно
// → порядок в межах ключа зберігається
Python-клієнт (confluent-kafka-python)
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
import json
# Продюсер
producer = Producer({
'bootstrap.servers': 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
'acks': 'all',
'enable.idempotence': True,
'compression.type': 'lz4',
'batch.size': 65536,
'linger.ms': 5,
'retries': 2147483647,
'delivery.timeout.ms': 120000,
})
def delivery_report(err, msg):
if err is not None:
print(f'Delivery failed: {err}')
else:
print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')
producer.produce(
'user-events',
key=str(user_id),
value=json.dumps(event).encode('utf-8'),
callback=delivery_report
)
producer.flush() # чекаємо доставки всіх pending повідомлень
# Консьюмер
consumer = Consumer({
'bootstrap.servers': 'kafka-1:9092',
'group.id': 'web-app-consumer',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
'max.poll.interval.ms': 300000,
'session.timeout.ms': 30000,
})
consumer.subscribe(['user-events'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError.PARTITION_EOF:
continue
raise KafkaException(msg.error())
event = json.loads(msg.value().decode('utf-8'))
process_event(event)
consumer.commit(asynchronous=False)
finally:
consumer.close()
Таймлайн
День 1 — проектування: визначаємо топіки, ключі повідомлень (для гарантії порядку), формат повідомлень (Avro/JSON/Protobuf), групи консьюмерів.
День 2 — розробка продюсерів: серіалізація, налаштування acks/idempotence, інтеграція з бізнес-логікою застосунку.
День 3 — розробка консьюмерів: ручне управління offset'ами, обробка rebalance, Dead Letter Queue для ошибочных повідомлень.
Дні 4–5 — тестування: unit-тести з EmbeddedKafka або Testcontainers, інтеграційні тести, нагрузочний тест з kafka-producer-perf-test, перевірка exactly-once семантики.
День 6 — розгортання, моніторинг consumer lag, налаштування алертів.







