Налаштування черг повідомлень (RabbitMQ) для веб-застосунку

Наша компанія займається розробкою, підтримкою та обслуговуванням сайтів будь-якої складності. Від простих односторінкових сайтів до масштабних кластерних систем, побудованих на мікро сервісах. Досвід розробників підтверджено сертифікатами від вендорів.

Розробка та обслуговування будь-яких видів сайтів:

Інформаційні сайти або веб-програми
Сайти візитки, landing page, корпоративні сайти, онлайн каталоги, квіз, промо-сайти, блоги, ресурси новин, інформаційні портали, форуми, агрегатори
Сайти або веб-програми електронної комерції
Інтернет-магазини, B2B-портали, маркетплейси, онлайн-обмінники, кешбек-сайти, біржі, дропшиппінг-платформи, парсери товарів
Веб-програми для управління бізнес-процесами
CRM-системи, ERP-системи, корпоративні портали, системи управління виробництвом, парсери інформації
Сайти або веб-програми електронних послуг
Дошки оголошень, онлайн-школи, онлайн-кінотеатри, конструктори сайтів, портали надання електронних послуг, відеохостинги, тематичні портали

Це лише деякі з технічних типів сайтів, з якими ми працюємо, і кожен із них може мати свої специфічні особливості та функціональність, а також бути адаптованим під конкретні потреби та цілі клієнта.

Пропоновані послуги
Показано 1 з 1 послугУсі 2065 послуг
Налаштування черг повідомлень (RabbitMQ) для веб-застосунку
Складна
~3-5 робочих днів
Часті питання

Наші компетенції:

Етапи розробки

Останні роботи

  • image_website-b2b-advance_0.png
    Розробка сайту компанії B2B ADVANCE
    1262
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1171
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    874
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1094
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    831
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Розробка веб-сайту для компанії ФІКСПЕР
    851

Налаштування RabbitMQ як чергиці повідомлень

RabbitMQ — брокер повідомлень за протоколом AMQP. Розподіляє завдання між компонентами системи: веб-додаток публікує завдання (електронні листи, генерація PDF, повідомлення), воркери обробляють їх асинхронно. Якщо воркер впаде, повідомлення залишаються в черзі й обробляються після перезавантаження.

Встановлення через Docker

# docker-compose.yml
services:
  rabbitmq:
    image: rabbitmq:3.13-management-alpine
    environment:
      RABBITMQ_DEFAULT_USER: myapp
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
      RABBITMQ_DEFAULT_VHOST: myapp
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    ports:
      - "5672:5672"    # AMQP
      - "15672:15672"  # Management UI
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

volumes:
  rabbitmq_data:

Ключові концепції

Exchange — точка входу для публікації. Розподіляє повідомлення по черзях за правилами:

  • direct — точне збігання routing key
  • topic — паттерн з * та #
  • fanout — всім підписаним чергам
  • headers — за заголовками повідомлення

Queue — буфер зберігання повідомлень до обробки воркером.

Binding — зв'язок між exchange і queue з routing key.

Топологія для веб-додатку

[App] → [myapp.exchange (topic)] → myapp.emails → [Email Worker]
                                 → myapp.notifications → [Push Worker]
                                 → myapp.reports → [Report Worker]

PHP: phpamqplib

// Публікація повідомлення
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class RabbitMQPublisher
{
    private AMQPStreamConnection $connection;
    private \PhpAmqpLib\Channel\AMQPChannel $channel;

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            host: config('rabbitmq.host'),
            port: config('rabbitmq.port', 5672),
            user: config('rabbitmq.user'),
            password: config('rabbitmq.password'),
            vhost: config('rabbitmq.vhost', '/'),
        );
        $this->channel = $this->connection->channel();
        $this->setup();
    }

    private function setup(): void
    {
        // Durable exchange — переживає перезавантаження брокера
        $this->channel->exchange_declare(
            exchange: 'myapp.exchange',
            type: 'topic',
            durable: true,
            auto_delete: false,
        );

        // Dead Letter Queue для невдалих повідомлень
        $this->channel->queue_declare(
            queue: 'myapp.dlq',
            durable: true,
            arguments: new AMQPTable(['x-queue-type' => 'classic'])
        );

        // Основна черга зі зв'язком до DLQ
        $this->channel->queue_declare(
            queue: 'myapp.emails',
            durable: true,
            arguments: new AMQPTable([
                'x-dead-letter-exchange' => '',
                'x-dead-letter-routing-key' => 'myapp.dlq',
                'x-message-ttl' => 86400000,  // 24 години в мс
            ])
        );

        $this->channel->queue_bind('myapp.emails', 'myapp.exchange', 'emails.*');
        $this->channel->queue_bind('myapp.notifications', 'myapp.exchange', 'notifications.*');
    }

    public function publish(string $routingKey, array $payload): void
    {
        $message = new AMQPMessage(
            json_encode($payload),
            [
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'content_type'  => 'application/json',
                'message_id'    => Str::uuid()->toString(),
                'timestamp'     => time(),
            ]
        );

        $this->channel->basic_publish($message, 'myapp.exchange', $routingKey);
    }

    public function __destruct()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

// Використання
$publisher->publish('emails.welcome', [
    'user_id' => $user->id,
    'email'   => $user->email,
    'name'    => $user->name,
]);

PHP: Consumer воркер

class EmailWorker
{
    public function run(): void
    {
        $channel = $this->getChannel();

        // Prefetch: не брати більше 1 повідомлення без підтвердження
        $channel->basic_qos(prefetch_size: 0, prefetch_count: 1, global: false);

        $channel->basic_consume(
            queue: 'myapp.emails',
            consumer_tag: gethostname() . '.email',
            no_ack: false,  // ручне підтвердження обов'язково
            callback: [$this, 'handleEmail'],
        );

        while ($channel->is_consuming()) {
            $channel->wait(timeout: 60);
        }
    }

    public function handleEmail(AMQPMessage $message): void
    {
        try {
            $payload = json_decode($message->getBody(), true);

            Mail::to($payload['email'])->send(new WelcomeMail($payload));

            // Підтвердження успішної обробки
            $message->ack();

            Log::info('Email sent', ['user_id' => $payload['user_id']]);

        } catch (\Throwable $e) {
            Log::error('Email failed', ['error' => $e->getMessage()]);

            // Requeue тільки якщо це перша спроба
            $requeue = !$message->has('application_headers') ||
                       ($message->get('application_headers')->getNativeData()['x-death'] ?? null) === null;

            $message->nack(requeue: $requeue);
        }
    }
}

Node.js: amqplib

import amqp, { Connection, Channel } from 'amqplib';

class MessageBus {
  private connection!: Connection;
  private channel!: Channel;

  async connect(): Promise<void> {
    this.connection = await amqp.connect({
      hostname: process.env.RABBITMQ_HOST,
      port: 5672,
      username: process.env.RABBITMQ_USER,
      password: process.env.RABBITMQ_PASS,
      vhost: process.env.RABBITMQ_VHOST,
      heartbeat: 60,
    });

    this.channel = await this.connection.createChannel();
    await this.channel.prefetch(5);

    await this.channel.assertExchange('myapp.exchange', 'topic', { durable: true });
    await this.channel.assertQueue('myapp.notifications', {
      durable: true,
      arguments: {
        'x-dead-letter-exchange': '',
        'x-dead-letter-routing-key': 'myapp.dlq',
      },
    });
    await this.channel.bindQueue('myapp.notifications', 'myapp.exchange', 'notifications.*');
  }

  async publish(routingKey: string, payload: object): Promise<void> {
    const content = Buffer.from(JSON.stringify(payload));
    this.channel.publish('myapp.exchange', routingKey, content, {
      persistent: true,
      contentType: 'application/json',
      messageId: crypto.randomUUID(),
      timestamp: Math.floor(Date.now() / 1000),
    });
  }

  async consume(queue: string, handler: (payload: unknown) => Promise<void>): Promise<void> {
    await this.channel.consume(queue, async (msg) => {
      if (!msg) return;

      try {
        const payload = JSON.parse(msg.content.toString());
        await handler(payload);
        this.channel.ack(msg);
      } catch (err) {
        console.error('Message processing failed:', err);
        this.channel.nack(msg, false, false);  // надіслати в DLQ
      }
    });
  }
}

Laravel Queue з RabbitMQ

За допомогою пакета vladimir-yuldashev/laravel-queue-rabbitmq:

QUEUE_CONNECTION=rabbitmq
RABBITMQ_HOST=rabbitmq
RABBITMQ_PORT=5672
RABBITMQ_VHOST=myapp
RABBITMQ_LOGIN=myapp
RABBITMQ_PASSWORD=secret
RABBITMQ_QUEUE=myapp.jobs
// config/queue.php
'rabbitmq' => [
    'driver'   => 'rabbitmq',
    'queue'    => env('RABBITMQ_QUEUE', 'default'),
    'hosts'    => [[
        'host'     => env('RABBITMQ_HOST', '127.0.0.1'),
        'port'     => env('RABBITMQ_PORT', 5672),
        'user'     => env('RABBITMQ_LOGIN', 'guest'),
        'password' => env('RABBITMQ_PASSWORD', 'guest'),
        'vhost'    => env('RABBITMQ_VHOST', '/'),
    ]],
    'options' => [
        'queue' => [
            'exchange'      => 'myapp.exchange',
            'exchange_type' => 'topic',
            'exchange_routing_key' => 'jobs.*',
        ],
    ],
],
// Стандартна Laravel Job
class SendWelcomeEmail implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public int $tries = 3;
    public int $backoff = 60;

    public function __construct(private User $user) {}

    public function handle(): void
    {
        Mail::to($this->user)->send(new WelcomeMail($this->user));
    }
}

// Публікація
SendWelcomeEmail::dispatch($user)->onQueue('myapp.emails');

Моніторинг через Management API

# Кількість повідомлень у черзі
curl -s -u myapp:password \
  "http://rabbitmq:15672/api/queues/myapp/myapp.emails" | \
  jq '.messages, .consumers'

# Сигнал тривоги: черга зростає
# Grafana: rabbitmq_queue_messages > 1000 → Slack notification

Терміни реалізації

Завдання Термін
RabbitMQ + базовий producer/consumer 2–3 дні
Laravel Queue інтеграція 1–2 дні
Dead Letter Queue + моніторинг +1–2 дні
HA кластер RabbitMQ (3 ноди) 3–4 дні