Налаштування Dead Letter Queue для обробки помилок
Коли консьюмер не може обробити повідомлення — воно не повинно просто зникати. Dead Letter Queue (DLQ) — це черга, куди автоматично потрапляють повідомлення, які не вдалося доставити: відхилені, закінчені за TTL або що перевищили ліміт доставки.
Без DLQ втрачені повідомлення неслідкови. З DLQ — є можливість розібратися з помилкою, виправити та повернути повідомлення на обробку.
Механізм DLQ у RabbitMQ
Повідомлення переміщується в Dead Letter Exchange за трьох умов:
- Консьюмер викликав
basic.nackабоbasic.rejectзrequeue=false - Закінчився TTL повідомлення (
x-message-ttlна черзі абоexpirationу властивостях) - Черга переповнена (
x-max-lengthабоx-max-length-bytes)
# 1. Створюємо Dead Letter Exchange
rabbitmqadmin declare exchange \
name=dlx \
type=direct \
durable=true
# 2. Створюємо DLQ
rabbitmqadmin declare queue \
name=order-processing-dlq \
durable=true \
arguments='{"x-queue-type":"quorum","x-message-ttl":2592000000}'
# 30 днів утримання для аналізу
# 3. Прив'язуємо DLQ до DLX
rabbitmqadmin declare binding \
source=dlx \
destination=order-processing-dlq \
routing_key=order-processing.failed
# 4. Основна черга з посиланням на DLX
rabbitmqadmin declare queue \
name=order-processing \
durable=true \
arguments='{
"x-queue-type": "quorum",
"x-dead-letter-exchange": "dlx",
"x-dead-letter-routing-key": "order-processing.failed",
"x-delivery-limit": 3
}'
# x-delivery-limit: після 3 спроб — в DLQ (тільки для quorum queues)
Retry з експоненціальною затримкою
Простий nack одразу повертає повідомлення в чергу — воркер знову його бере і знову падає. Правильний підхід — затримана спроба через ланцюг черг.
# Черга з затримкою 1 хвилина
rabbitmqadmin declare queue \
name=order-processing-retry-1m \
durable=true \
arguments='{
"x-message-ttl": 60000,
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": "order-processing",
"x-queue-type": "classic"
}'
# Повідомлення закінчується через 1 хвилину → автоматично йде в основну чергу
# Черга з затримкою 10 хвилин
rabbitmqadmin declare queue \
name=order-processing-retry-10m \
durable=true \
arguments='{
"x-message-ttl": 600000,
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": "order-processing",
"x-queue-type": "classic"
}'
# Черга з затримкою 1 година
rabbitmqadmin declare queue \
name=order-processing-retry-1h \
durable=true \
arguments='{
"x-message-ttl": 3600000,
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": "order-processing",
"x-queue-type": "classic"
}'
Логіка консьюмера:
function handleMessage(AMQPMessage $message): void
{
$headers = $message->get('application_headers');
$retryCount = $headers ? (int)($headers->getNativeData()['x-retry-count'] ?? 0) : 0;
try {
processOrder(json_decode($message->body, true));
$message->ack();
} catch (TemporaryException $e) {
// Тимчасова помилка — повторна спроба
$retryCount++;
if ($retryCount >= 3) {
// Вичерпали спроби — в DLQ
$message->nack(false);
return;
}
// Відправляємо в retry-чергу з затримкою
$retryQueue = match($retryCount) {
1 => 'order-processing-retry-1m',
2 => 'order-processing-retry-10m',
default => 'order-processing-retry-1h',
};
$retryMessage = new AMQPMessage(
$message->body,
[
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'headers' => new AMQPTable(array_merge(
$headers ? $headers->getNativeData() : [],
[
'x-retry-count' => $retryCount,
'x-original-queue' => 'order-processing',
'x-last-error' => $e->getMessage(),
'x-retry-at' => date('Y-m-d H:i:s'),
]
)),
]
);
$channel->basic_publish($retryMessage, '', $retryQueue);
$message->ack(); // ack оригіналу, щоб не було дублів
} catch (PermanentException $e) {
// Постійна помилка — одразу в DLQ
$message->nack(false);
Log::error('Permanent failure, message sent to DLQ', [
'order_id' => $payload['order_id'],
'error' => $e->getMessage(),
]);
}
}
Kafka DLQ
У Kafka немає вбудованого механізму DLQ — він реалізується в коді консьюмера:
@Component
public class OrderEventConsumer {
private final KafkaTemplate<String, String> kafkaTemplate;
private static final String DLQ_TOPIC = "order-events-dlq";
@KafkaListener(topics = "order-events", groupId = "order-processor")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
try {
processOrder(record.value());
ack.acknowledge();
} catch (RetriableException e) {
// Spring Kafka автоматично повторює з затримкою
throw e; // не ack — SeekToCurrentErrorHandler візьме управління
} catch (Exception e) {
// Non-retriable — відправляємо в DLQ
sendToDlq(record, e);
ack.acknowledge(); // ack оригіналу, щоб не застрягти
}
}
private void sendToDlq(ConsumerRecord<String, String> original, Exception error) {
Headers headers = new RecordHeaders(original.headers().toArray());
headers.add("x-original-topic", original.topic().getBytes());
headers.add("x-original-partition", String.valueOf(original.partition()).getBytes());
headers.add("x-original-offset", String.valueOf(original.offset()).getBytes());
headers.add("x-error-message", error.getMessage().getBytes());
headers.add("x-failed-at", Instant.now().toString().getBytes());
ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
DLQ_TOPIC,
null,
original.key(),
original.value(),
headers
);
kafkaTemplate.send(dlqRecord);
log.error("Sent to DLQ: topic={} partition={} offset={} error={}",
original.topic(), original.partition(), original.offset(), error.getMessage());
}
}
Конфігурація Spring Kafka з автоматичною повторною спробою:
@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<?, ?> template) {
// Експоненціальна затримка: 1s, 2s, 4s, 8s, 16s
ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(5);
backOff.setInitialInterval(1000L);
backOff.setMultiplier(2.0);
backOff.setMaxInterval(16000L);
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(record, ex) -> new TopicPartition(record.topic() + "-dlq", record.partition() % 3)
);
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, backOff);
handler.addNotRetryableExceptions(
JsonProcessingException.class,
IllegalArgumentException.class
);
return handler;
}
Моніторинг DLQ
Сигнал про зростання DLQ — перший ознак системної проблеми:
# Правило Prometheus alert
- alert: DLQGrowth
expr: rabbitmq_queue_messages{queue=~".*dlq.*"} > 100
for: 5m
labels:
severity: warning
annotations:
summary: "DLQ {{ $labels.queue }} has {{ $value }} messages"
- alert: DLQSpike
expr: rate(rabbitmq_queue_messages_published_total{queue=~".*dlq.*"}[5m]) > 10
for: 2m
labels:
severity: critical
Інструмент для повторної обробки повідомлень з DLQ
#!/bin/bash
# reprocess-dlq.sh — переміщуємо повідомлення з DLQ назад у основну чергу
DLQ="order-processing-dlq"
TARGET_QUEUE="order-processing"
BATCH=100
for i in $(seq 1 $BATCH); do
MESSAGE=$(rabbitmqadmin get queue=$DLQ ackmode=ack_requeue_false count=1 2>/dev/null)
if [ -z "$MESSAGE" ]; then
echo "DLQ is empty"
break
fi
# Публікуємо назад у основну чергу
BODY=$(echo "$MESSAGE" | python3 -c "import sys,json; msgs=json.load(sys.stdin); print(msgs[0]['payload'] if msgs else '')" 2>/dev/null)
rabbitmqadmin publish exchange='' routing_key="$TARGET_QUEUE" payload="$BODY"
done
Графік
День 1 — проектування схеми DLQ: для кожної робочої черги — DLX, DLQ, очереди retry з TTL. Створення через CLI або Management API.
День 2 — інтеграція retry-логіки в консьюмерів, збереження метаданих помилки у заголовках (original topic, timestamp, error message).
День 3 — сигнали про зростання DLQ, інструмент для reprocess, документування процедури обробки DLQ-повідомлень для команди підтримки.







