Налаштування RabbitMQ Exchange та Queue (Direct, Fanout, Topic)
RabbitMQ не доставляє повідомлення прямо в черги — між продюсером і чергою стоїть exchange. Тип exchange визначає, куди піде повідомлення. Правильний вибір типу скорочує складність маршрутизації в коді.
Чотири типи exchange
Direct — точна відповідність routing key. Повідомлення з ключем order.created йде в черги, прив'язані з тим же ключем. Один-до-одного або один-до-багатьох (кілька черг з одним ключем).
Fanout — routing key ігнорується, повідомлення розсилається у всі прив'язані черги. Broadcast: подія "користувач авторизувався" має бути отримана логгером, сервісом сесій та аналітикою.
Topic — routing key з підстановочними символами. * — одне слово, # — нуль або більше слів. order.*.created буде збігатися з order.express.created та order.regular.created. order.# буде збігатися з будь-яким ключем, що починається з order..
Headers — маршрутизація за заголовками AMQP, routing key ігнорується. Використовується рідко, коли маршрут визначається кількома атрибутами.
Створення через CLI
# Direct exchange
rabbitmqadmin declare exchange \
name=orders \
type=direct \
durable=true \
auto_delete=false
# Fanout exchange
rabbitmqadmin declare exchange \
name=events-broadcast \
type=fanout \
durable=true
# Topic exchange
rabbitmqadmin declare exchange \
name=app-events \
type=topic \
durable=true \
arguments='{"alternate-exchange":"app-events-unrouted"}'
# Черги
rabbitmqadmin declare queue \
name=order-processing \
durable=true \
arguments='{"x-queue-type":"quorum","x-dead-letter-exchange":"dlx","x-dead-letter-routing-key":"order-processing.failed","x-delivery-limit":5}'
rabbitmqadmin declare queue \
name=order-notifications \
durable=true \
arguments='{"x-queue-type":"quorum"}'
# Bindings
rabbitmqadmin declare binding \
source=orders \
destination=order-processing \
routing_key=order.created
rabbitmqadmin declare binding \
source=app-events \
destination=order-processing \
routing_key="order.#"
rabbitmqadmin declare binding \
source=app-events \
destination=order-notifications \
routing_key="order.*.created"
Створення через Management HTTP API
BASE="http://rabbit-1:15672/api"
AUTH="admin:password"
# Створюємо topic exchange
curl -u $AUTH -X PUT "$BASE/exchanges/%2F/app-events" \
-H "Content-Type: application/json" \
-d '{
"type": "topic",
"durable": true,
"auto_delete": false,
"arguments": {
"alternate-exchange": "app-events-unrouted"
}
}'
# Dead Letter Exchange
curl -u $AUTH -X PUT "$BASE/exchanges/%2F/dlx" \
-H "Content-Type: application/json" \
-d '{"type": "direct", "durable": true}'
# DLQ черга
curl -u $AUTH -X PUT "$BASE/queues/%2F/failed-messages" \
-H "Content-Type: application/json" \
-d '{
"durable": true,
"arguments": {
"x-queue-type": "quorum",
"x-message-ttl": 2592000000
}
}'
# Прив'язуємо DLQ до DLX
curl -u $AUTH -X POST "$BASE/bindings/%2F/e/dlx/q/failed-messages" \
-H "Content-Type: application/json" \
-d '{"routing_key": "#"}'
PHP-продюсер через php-amqplib
use PhpAmqpLib\Connection\AMQPLazyConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class EventPublisher
{
private AMQPLazyConnection $connection;
private ?\AMQPChannel $channel = null;
public function __construct(
private readonly array $hosts, // [['host'=>'rabbit-1','port'=>5672,'user'=>'...','password'=>'...']]
) {}
private function channel(): \AMQPChannel
{
if ($this->channel === null || !$this->channel->is_open()) {
$this->connection = AMQPLazyConnection::create_connection($this->hosts, [
'heartbeat' => 60,
'connection_timeout' => 5,
'read_write_timeout' => 10,
]);
$this->channel = $this->connection->channel();
// Підтвердження доставки
$this->channel->confirm_select();
}
return $this->channel;
}
public function publish(string $exchange, string $routingKey, array $payload): void
{
$channel = $this->channel();
$message = new AMQPMessage(
json_encode($payload, JSON_THROW_ON_ERROR),
[
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'content_type' => 'application/json',
'message_id' => (string) Str::uuid(),
'timestamp' => time(),
'app_id' => 'webapp',
'headers' => new AMQPTable([
'x-retry-count' => 0,
'x-source' => 'api',
]),
]
);
$channel->basic_publish($message, $exchange, $routingKey);
// Очікуємо підтвердження від брокера
$channel->wait_for_pending_acks(5.0);
}
}
// Використання
$publisher->publish('app-events', 'order.express.created', [
'order_id' => 12345,
'user_id' => 67890,
'amount' => 1999.99,
]);
PHP-консьюмер
class OrderConsumer
{
public function consume(): void
{
$channel = $this->connection->channel();
$channel->basic_qos(null, 20, null); // prefetch: не більше 20 повідомлень без ack
$channel->basic_consume(
'order-processing',
'', // consumer tag (auto-generated)
false, // no_local
false, // no_ack — ручне підтвердження
false, // exclusive
false, // nowait
function (AMQPMessage $message) {
try {
$payload = json_decode($message->body, true, 512, JSON_THROW_ON_ERROR);
$this->processOrder($payload);
$message->ack();
} catch (\Throwable $e) {
// Requeue=false → повідомлення йде в DLX за налаштуванням черги
$message->nack(false);
Log::error('Failed to process order', ['error' => $e->getMessage()]);
}
}
);
while ($channel->is_consuming()) {
$channel->wait(null, false, 5.0); // timeout 5s для graceful shutdown
if ($this->shouldStop()) break;
}
$channel->close();
}
}
Python-клієнт через pika
import pika
import json
credentials = pika.PlainCredentials('webapp', 'password')
parameters = pika.ConnectionParameters(
host='rabbit-1',
port=5672,
credentials=credentials,
heartbeat=60,
blocked_connection_timeout=30,
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# Публікуємо в topic exchange
channel.basic_publish(
exchange='app-events',
routing_key='user.premium.upgraded',
body=json.dumps({'user_id': 42, 'plan': 'premium'}),
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent,
content_type='application/json',
)
)
connection.close()
Типові паттерни маршрутизації
Work Queue — кілька воркерів читають з однієї черги, RabbitMQ розподіляє round-robin. Direct exchange, один binding.
Pub/Sub — одна подія отримується всіма підписниками. Fanout exchange, кожен сервіс має свою чергу.
Routing — різні події йдуть в різні черги. Direct або topic exchange.
RPC — запит-відповідь через черги. Продюсер створює тимчасову чергу, зазначає її в reply_to, консьюмер відповідає туди.
Графік
День 1 — проектування схеми exchanges/queues/bindings для бізнес-логіки додатку. Створення через Management UI або CLI.
День 2 — інтеграція продюсерів, налаштування confirm mode, graceful reconnect при втраті з'єднання. Інтеграція консьюмерів з ручним ack.
День 3 — тестування маршрутизації, перевірка DLX, нагрузочне тестування з rabbitmq-perf-test.







