Реалізація черги завдань скрейпінгу (Redis/RabbitMQ/BullMQ)
Скрейпінг у циклі — наївне рішення, яке ломається при першій же помилці мережі. Черга завдань вирішує три проблеми разом: ізоляція збоїв, повторні спроби та горизонтальне масштабування.
Вибір брокера
| Брокер | Плюси | Коли використовувати |
|---|---|---|
| BullMQ (Redis) | Проста настройка, UI з коробки, пріоритети | Node.js-стек, до ~100k завдань/день |
| Celery (Redis/RabbitMQ) | Python-екосистема, ланцюжки завдань (chains, chords) | Python-стек, складні пайплайни |
| RabbitMQ | Надійна доставка, routing keys, dead-letter | Високонавантажені системи, multiple consumers |
| Sidekiq (Redis) | Ruby, мінімум конфігурації | Rails-застосунки |
Для більшості веб-проектів BullMQ або Celery — оптимальний вибір. RabbitMQ виправданий, коли потрібні гарантії доставки на рівні AMQP та складна маршрутизація між сервісами.
BullMQ: базова настройка
import { Queue, Worker, Job } from 'bullmq';
import { Redis } from 'ioredis';
const connection = new Redis({ host: 'localhost', port: 6379, maxRetriesPerRequest: null });
// Створення черги
export const scrapeQueue = new Queue('scraping', {
connection,
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 60_000 },
removeOnComplete: { count: 500 },
removeOnFail: { count: 200 },
},
});
// Додання завдання
await scrapeQueue.add('scrape-url', {
url: 'https://example.com/catalog?page=5',
siteId: 42,
depth: 1,
}, { priority: 1 });
// Воркер
const worker = new Worker('scraping', async (job: Job) => {
const { url, siteId } = job.data;
const html = await fetchWithProxy(url);
const products = parseProducts(html);
await saveProducts(products, siteId);
return { count: products.length };
}, { connection, concurrency: 5 });
worker.on('failed', (job, err) => {
logger.error(`Job ${job?.id} failed: ${err.message}`);
});
Celery: пайплайн з ланцюжками завдань
Celery дозволяє будувати ланцюжки: спочатку спарсити список, потім для кожного елемента запустити детальний скрейпінг.
from celery import Celery, chain, chord
import redis
app = Celery('scraper', broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1')
app.conf.task_routes = {
'scraper.tasks.fetch_listing': {'queue': 'listings'},
'scraper.tasks.fetch_product': {'queue': 'products'},
}
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def fetch_listing(self, url: str, site_id: int) -> list[str]:
try:
html = fetch_page(url)
return extract_product_urls(html)
except (NetworkError, RateLimitError) as exc:
raise self.retry(exc=exc, countdown=2 ** self.request.retries * 60)
@app.task(bind=True, max_retries=3)
def fetch_product(self, url: str, site_id: int) -> dict:
try:
html = fetch_page(url)
return parse_product(html)
except Exception as exc:
raise self.retry(exc=exc)
@app.task
def save_products(products: list[dict], site_id: int):
bulk_upsert(products, site_id)
# Запуск пайплайну
def start_site_crawl(site_id: int, catalog_url: str):
urls = fetch_listing.delay(catalog_url, site_id).get()
chord(
fetch_product.s(url, site_id) for url in urls
)(save_products.s(site_id))
Dead Letter Queue
Завдання, що вичерпали всі спроби, потрапляють у DLQ. Це не просто смітник — це черга для ручного аналізу та переробки.
# RabbitMQ: настройка DLQ через аргументи черги
channel.queue_declare(
queue='scraping.products',
durable=True,
arguments={
'x-dead-letter-exchange': 'scraping.dlx',
'x-dead-letter-routing-key': 'failed',
'x-message-ttl': 3600000, # 1 година
}
)
channel.exchange_declare(exchange='scraping.dlx', exchange_type='direct')
channel.queue_declare(queue='scraping.failed', durable=True)
channel.queue_bind(queue='scraping.failed', exchange='scraping.dlx', routing_key='failed')
Завдання з DLQ можна переотправити у основну чергу після усунення причини збою — через Admin UI або скрипт.
Моніторинг черги
BullMQ Board (UI для BullMQ) або Flower (для Celery) дають візуальне представлення про стан черг. Ключові метрики, які стоїть відслідковувати:
- Глибина черги (waiting jobs)
- Швидкість обробки (jobs/sec)
- Відсоток помилок по типам завдань
- Час виконання (p50, p95, p99)
Ці метрики експортуються в Prometheus через /metrics endpoint та візуалізуються у Grafana.
Час реалізації
Черга на BullMQ або Celery з повторними спробами та базовим моніторингом — 3–4 робочі дні. Інтеграція з RabbitMQ, DLQ, Prometheus-метрики, UI для керування завданнями — ще 2–3 дні.







