Налаштування черг повідомлень (Apache Kafka) для веб-застосунку

Наша компанія займається розробкою, підтримкою та обслуговуванням сайтів будь-якої складності. Від простих односторінкових сайтів до масштабних кластерних систем, побудованих на мікро сервісах. Досвід розробників підтверджено сертифікатами від вендорів.

Розробка та обслуговування будь-яких видів сайтів:

Інформаційні сайти або веб-програми
Сайти візитки, landing page, корпоративні сайти, онлайн каталоги, квіз, промо-сайти, блоги, ресурси новин, інформаційні портали, форуми, агрегатори
Сайти або веб-програми електронної комерції
Інтернет-магазини, B2B-портали, маркетплейси, онлайн-обмінники, кешбек-сайти, біржі, дропшиппінг-платформи, парсери товарів
Веб-програми для управління бізнес-процесами
CRM-системи, ERP-системи, корпоративні портали, системи управління виробництвом, парсери інформації
Сайти або веб-програми електронних послуг
Дошки оголошень, онлайн-школи, онлайн-кінотеатри, конструктори сайтів, портали надання електронних послуг, відеохостинги, тематичні портали

Це лише деякі з технічних типів сайтів, з якими ми працюємо, і кожен із них може мати свої специфічні особливості та функціональність, а також бути адаптованим під конкретні потреби та цілі клієнта.

Пропоновані послуги
Показано 1 з 1 послугУсі 2065 послуг
Налаштування черг повідомлень (Apache Kafka) для веб-застосунку
Складна
~5 робочих днів
Часті питання

Наші компетенції:

Етапи розробки

Останні роботи

  • image_website-b2b-advance_0.png
    Розробка сайту компанії B2B ADVANCE
    1262
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1171
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    874
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1094
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    831
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Розробка веб-сайту для компанії ФІКСПЕР
    851

Налаштування Apache Kafka як чергиці повідомлень

Kafka — розподілена платформа потокової обробки подій. На відміну від RabbitMQ, повідомлення в Kafka не видаляються після обробки — вони зберігаються в топіці за політикою утримання (наприклад, 7 днів або 100 ГБ). Кілька груп користувачів можуть незалежно читати один топік з різними зміщеннями.

Коли використовувати Kafka, коли RabbitMQ

Kafka краще для: event sourcing, аналітичних конвеєрів, логів аудиту, потокової обробки з повторним відтворенням. RabbitMQ — для task queues (електронні листи, SMS, генерація PDF) з гарантійною однократною доставкою.

Встановлення через Docker

# docker-compose.yml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - zookeeper_data:/var/lib/zookeeper/data
      - zookeeper_log:/var/lib/zookeeper/log

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on: [zookeeper]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_LOG_RETENTION_HOURS: 168        # 7 днів
      KAFKA_LOG_RETENTION_BYTES: 107374182400  # 100 ГБ
      KAFKA_NUM_PARTITIONS: 6
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
    volumes:
      - kafka_data:/var/lib/kafka/data
    ports:
      - "9092:9092"

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on: [kafka]
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    ports:
      - "8080:8080"

volumes:
  zookeeper_data:
  zookeeper_log:
  kafka_data:

Створення топіків

# Створити топік із 6 партиціями та 3 репліками (для кластера)
kafka-topics.sh --bootstrap-server kafka:9092 \
  --create \
  --topic user-events \
  --partitions 6 \
  --replication-factor 1 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete

# Перегляд
kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic user-events

PHP: rdkafka producer

use RdKafka\Producer;
use RdKafka\Conf;

class KafkaProducer
{
    private Producer $producer;

    public function __construct()
    {
        $conf = new Conf();
        $conf->set('bootstrap.servers', config('kafka.brokers'));
        $conf->set('security.protocol', 'PLAINTEXT');
        $conf->set('acks', 'all');              // підтвердження від всіх реплік
        $conf->set('retries', '3');
        $conf->set('enable.idempotence', 'true'); // точно один раз
        $conf->set('compression.type', 'snappy');

        $conf->setDrMsgCb(function ($kafka, $message) {
            if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
                Log::error('Kafka delivery failed', [
                    'error' => $message->errstr(),
                    'topic' => $message->topic_name,
                ]);
            }
        });

        $this->producer = new Producer($conf);
    }

    public function publish(string $topic, string $key, array $payload): void
    {
        $rdTopic = $this->producer->newTopic($topic);
        $rdTopic->produce(
            partition: RD_KAFKA_PARTITION_UA,  // автовибір партиції за ключем
            msgflags: 0,
            payload: json_encode($payload),
            key: $key,                          // один ключ → одна партиція → порядок подій
        );
        $this->producer->poll(0);
    }

    public function flush(): void
    {
        $result = $this->producer->flush(10000);  // 10 секунд таймаут
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException('Kafka flush failed: ' . rd_kafka_err2str($result));
        }
    }
}

// Використання
$producer->publish('user-events', (string) $user->id, [
    'event'     => 'user.registered',
    'user_id'   => $user->id,
    'email'     => $user->email,
    'timestamp' => now()->toIso8601String(),
]);
$producer->flush();

PHP: Consumer

use RdKafka\KafkaConsumer;
use RdKafka\Conf;

class UserEventConsumer
{
    public function run(): void
    {
        $conf = new Conf();
        $conf->set('group.id', 'user-events-analytics');
        $conf->set('bootstrap.servers', config('kafka.brokers'));
        $conf->set('enable.auto.commit', 'false');  // ручний commit
        $conf->set('auto.offset.reset', 'earliest');
        $conf->set('session.timeout.ms', '45000');
        $conf->set('max.poll.interval.ms', '300000');

        $conf->setRebalanceCb(function ($kafka, $err, $partitions) {
            if ($err === RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
                $kafka->assign($partitions);
                Log::info('Partitions assigned', ['count' => count($partitions)]);
            } else {
                $kafka->assign(null);
            }
        });

        $consumer = new KafkaConsumer($conf);
        $consumer->subscribe(['user-events', 'order-events']);

        while (true) {
            $message = $consumer->consume(timeout_ms: 1000);

            if ($message === null) continue;
            if ($message->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) continue;
            if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
                Log::error('Kafka consume error', ['error' => $message->errstr()]);
                continue;
            }

            try {
                $payload = json_decode($message->payload, true);
                $this->handle($message->topic_name, $payload);
                $consumer->commit($message);  // commit тільки після успішної обробки
            } catch (\Throwable $e) {
                Log::error('Processing failed', [
                    'topic'  => $message->topic_name,
                    'offset' => $message->offset,
                    'error'  => $e->getMessage(),
                ]);
                // Не робимо commit — повідомлення буде перечитано
            }
        }
    }

    private function handle(string $topic, array $payload): void
    {
        match ($payload['event']) {
            'user.registered' => $this->onUserRegistered($payload),
            'user.deleted'    => $this->onUserDeleted($payload),
            default           => Log::debug('Unknown event', ['event' => $payload['event']]),
        };
    }
}

Node.js: kafkajs

import { Kafka, CompressionTypes } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'myapp-api',
  brokers: [process.env.KAFKA_BROKERS!],
  retry: {
    retries: 5,
    initialRetryTime: 300,
    factor: 0.2,
  },
});

// Producer
const producer = kafka.producer({
  allowAutoTopicCreation: false,
  idempotent: true,
  maxInFlightRequests: 5,
});

await producer.connect();

await producer.send({
  topic: 'user-events',
  compression: CompressionTypes.Snappy,
  messages: [{
    key: String(userId),
    value: JSON.stringify({ event: 'user.login', userId, ip, timestamp: Date.now() }),
    headers: { 'content-type': 'application/json' },
  }],
});

// Consumer
const consumer = kafka.consumer({ groupId: 'audit-service' });
await consumer.connect();
await consumer.subscribe({ topic: 'user-events', fromBeginning: false });

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    const payload = JSON.parse(message.value!.toString());

    await AuditLog.create({
      event: payload.event,
      userId: payload.userId,
      metadata: payload,
    });
  },
});

Kafka Streams для агрегації

// Java: підрахунок подій за останні 5 хвилин
StreamsBuilder builder = new StreamsBuilder();

KStream<String, UserEvent> events = builder.stream("user-events");

events
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count()
    .toStream()
    .to("user-activity-counts");

Schema Registry для контракту даних

# Avro схема для user-events
{
  "type": "record",
  "name": "UserEvent",
  "namespace": "com.myapp.events",
  "fields": [
    {"name": "event", "type": "string"},
    {"name": "user_id", "type": "long"},
    {"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"}
  ]
}

Schema Registry запобігає несумісним змінам формату повідомлень між producer та consumer.

Моніторинг

# Затримка групи користувачів (зміщення від кінця топіку)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group user-events-analytics \
  --describe

# Grafana: kafka_consumer_group_lag > 10000 → сигнал тривоги
# JMX Exporter → Prometheus → Grafana дашборд

Терміни реалізації

Завдання Термін
Kafka + базовий producer/consumer PHP/Node.js 3–4 дні
Schema Registry + Avro +2 дні
Kafka Streams агрегація 3–5 днів
3-вузловий кластер Kafka в Kubernetes 4–5 днів