Налаштування Exchange та Queue RabbitMQ (Direct, Fanout, Topic)

Наша компанія займається розробкою, підтримкою та обслуговуванням сайтів будь-якої складності. Від простих односторінкових сайтів до масштабних кластерних систем, побудованих на мікро сервісах. Досвід розробників підтверджено сертифікатами від вендорів.

Розробка та обслуговування будь-яких видів сайтів:

Інформаційні сайти або веб-програми
Сайти візитки, landing page, корпоративні сайти, онлайн каталоги, квіз, промо-сайти, блоги, ресурси новин, інформаційні портали, форуми, агрегатори
Сайти або веб-програми електронної комерції
Інтернет-магазини, B2B-портали, маркетплейси, онлайн-обмінники, кешбек-сайти, біржі, дропшиппінг-платформи, парсери товарів
Веб-програми для управління бізнес-процесами
CRM-системи, ERP-системи, корпоративні портали, системи управління виробництвом, парсери інформації
Сайти або веб-програми електронних послуг
Дошки оголошень, онлайн-школи, онлайн-кінотеатри, конструктори сайтів, портали надання електронних послуг, відеохостинги, тематичні портали

Це лише деякі з технічних типів сайтів, з якими ми працюємо, і кожен із них може мати свої специфічні особливості та функціональність, а також бути адаптованим під конкретні потреби та цілі клієнта.

Пропоновані послуги
Показано 1 з 1 послугУсі 2065 послуг
Налаштування Exchange та Queue RabbitMQ (Direct, Fanout, Topic)
Середня
~2-3 робочих дні
Часті питання

Наші компетенції:

Етапи розробки

Останні роботи

  • image_website-b2b-advance_0.png
    Розробка сайту компанії B2B ADVANCE
    1262
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1171
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    874
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1094
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    831
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Розробка веб-сайту для компанії ФІКСПЕР
    851

Налаштування RabbitMQ Exchange та Queue (Direct, Fanout, Topic)

RabbitMQ не доставляє повідомлення прямо в черги — між продюсером і чергою стоїть exchange. Тип exchange визначає, куди піде повідомлення. Правильний вибір типу скорочує складність маршрутизації в коді.

Чотири типи exchange

Direct — точна відповідність routing key. Повідомлення з ключем order.created йде в черги, прив'язані з тим же ключем. Один-до-одного або один-до-багатьох (кілька черг з одним ключем).

Fanout — routing key ігнорується, повідомлення розсилається у всі прив'язані черги. Broadcast: подія "користувач авторизувався" має бути отримана логгером, сервісом сесій та аналітикою.

Topic — routing key з підстановочними символами. * — одне слово, # — нуль або більше слів. order.*.created буде збігатися з order.express.created та order.regular.created. order.# буде збігатися з будь-яким ключем, що починається з order..

Headers — маршрутизація за заголовками AMQP, routing key ігнорується. Використовується рідко, коли маршрут визначається кількома атрибутами.

Створення через CLI

# Direct exchange
rabbitmqadmin declare exchange \
    name=orders \
    type=direct \
    durable=true \
    auto_delete=false

# Fanout exchange
rabbitmqadmin declare exchange \
    name=events-broadcast \
    type=fanout \
    durable=true

# Topic exchange
rabbitmqadmin declare exchange \
    name=app-events \
    type=topic \
    durable=true \
    arguments='{"alternate-exchange":"app-events-unrouted"}'

# Черги
rabbitmqadmin declare queue \
    name=order-processing \
    durable=true \
    arguments='{"x-queue-type":"quorum","x-dead-letter-exchange":"dlx","x-dead-letter-routing-key":"order-processing.failed","x-delivery-limit":5}'

rabbitmqadmin declare queue \
    name=order-notifications \
    durable=true \
    arguments='{"x-queue-type":"quorum"}'

# Bindings
rabbitmqadmin declare binding \
    source=orders \
    destination=order-processing \
    routing_key=order.created

rabbitmqadmin declare binding \
    source=app-events \
    destination=order-processing \
    routing_key="order.#"

rabbitmqadmin declare binding \
    source=app-events \
    destination=order-notifications \
    routing_key="order.*.created"

Створення через Management HTTP API

BASE="http://rabbit-1:15672/api"
AUTH="admin:password"

# Створюємо topic exchange
curl -u $AUTH -X PUT "$BASE/exchanges/%2F/app-events" \
  -H "Content-Type: application/json" \
  -d '{
    "type": "topic",
    "durable": true,
    "auto_delete": false,
    "arguments": {
      "alternate-exchange": "app-events-unrouted"
    }
  }'

# Dead Letter Exchange
curl -u $AUTH -X PUT "$BASE/exchanges/%2F/dlx" \
  -H "Content-Type: application/json" \
  -d '{"type": "direct", "durable": true}'

# DLQ черга
curl -u $AUTH -X PUT "$BASE/queues/%2F/failed-messages" \
  -H "Content-Type: application/json" \
  -d '{
    "durable": true,
    "arguments": {
      "x-queue-type": "quorum",
      "x-message-ttl": 2592000000
    }
  }'

# Прив'язуємо DLQ до DLX
curl -u $AUTH -X POST "$BASE/bindings/%2F/e/dlx/q/failed-messages" \
  -H "Content-Type: application/json" \
  -d '{"routing_key": "#"}'

PHP-продюсер через php-amqplib

use PhpAmqpLib\Connection\AMQPLazyConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class EventPublisher
{
    private AMQPLazyConnection $connection;
    private ?\AMQPChannel $channel = null;

    public function __construct(
        private readonly array $hosts, // [['host'=>'rabbit-1','port'=>5672,'user'=>'...','password'=>'...']]
    ) {}

    private function channel(): \AMQPChannel
    {
        if ($this->channel === null || !$this->channel->is_open()) {
            $this->connection = AMQPLazyConnection::create_connection($this->hosts, [
                'heartbeat' => 60,
                'connection_timeout' => 5,
                'read_write_timeout' => 10,
            ]);
            $this->channel = $this->connection->channel();
            // Підтвердження доставки
            $this->channel->confirm_select();
        }
        return $this->channel;
    }

    public function publish(string $exchange, string $routingKey, array $payload): void
    {
        $channel = $this->channel();

        $message = new AMQPMessage(
            json_encode($payload, JSON_THROW_ON_ERROR),
            [
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'content_type'  => 'application/json',
                'message_id'    => (string) Str::uuid(),
                'timestamp'     => time(),
                'app_id'        => 'webapp',
                'headers'       => new AMQPTable([
                    'x-retry-count' => 0,
                    'x-source'      => 'api',
                ]),
            ]
        );

        $channel->basic_publish($message, $exchange, $routingKey);

        // Очікуємо підтвердження від брокера
        $channel->wait_for_pending_acks(5.0);
    }
}

// Використання
$publisher->publish('app-events', 'order.express.created', [
    'order_id' => 12345,
    'user_id'  => 67890,
    'amount'   => 1999.99,
]);

PHP-консьюмер

class OrderConsumer
{
    public function consume(): void
    {
        $channel = $this->connection->channel();
        $channel->basic_qos(null, 20, null); // prefetch: не більше 20 повідомлень без ack

        $channel->basic_consume(
            'order-processing',
            '',     // consumer tag (auto-generated)
            false,  // no_local
            false,  // no_ack — ручне підтвердження
            false,  // exclusive
            false,  // nowait
            function (AMQPMessage $message) {
                try {
                    $payload = json_decode($message->body, true, 512, JSON_THROW_ON_ERROR);
                    $this->processOrder($payload);
                    $message->ack();
                } catch (\Throwable $e) {
                    // Requeue=false → повідомлення йде в DLX за налаштуванням черги
                    $message->nack(false);
                    Log::error('Failed to process order', ['error' => $e->getMessage()]);
                }
            }
        );

        while ($channel->is_consuming()) {
            $channel->wait(null, false, 5.0); // timeout 5s для graceful shutdown
            if ($this->shouldStop()) break;
        }

        $channel->close();
    }
}

Python-клієнт через pika

import pika
import json

credentials = pika.PlainCredentials('webapp', 'password')
parameters = pika.ConnectionParameters(
    host='rabbit-1',
    port=5672,
    credentials=credentials,
    heartbeat=60,
    blocked_connection_timeout=30,
)

connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# Публікуємо в topic exchange
channel.basic_publish(
    exchange='app-events',
    routing_key='user.premium.upgraded',
    body=json.dumps({'user_id': 42, 'plan': 'premium'}),
    properties=pika.BasicProperties(
        delivery_mode=pika.DeliveryMode.Persistent,
        content_type='application/json',
    )
)
connection.close()

Типові паттерни маршрутизації

Work Queue — кілька воркерів читають з однієї черги, RabbitMQ розподіляє round-robin. Direct exchange, один binding.

Pub/Sub — одна подія отримується всіма підписниками. Fanout exchange, кожен сервіс має свою чергу.

Routing — різні події йдуть в різні черги. Direct або topic exchange.

RPC — запит-відповідь через черги. Продюсер створює тимчасову чергу, зазначає її в reply_to, консьюмер відповідає туди.

Графік

День 1 — проектування схеми exchanges/queues/bindings для бізнес-логіки додатку. Створення через Management UI або CLI.

День 2 — інтеграція продюсерів, налаштування confirm mode, graceful reconnect при втраті з'єднання. Інтеграція консьюмерів з ручним ack.

День 3 — тестування маршрутизації, перевірка DLX, нагрузочне тестування з rabbitmq-perf-test.