Реалізація розподіленого скрейпінгу (кілька воркерів) для масштабування
Один воркер упирається в швидкість мережі, обмеження цільового сайту та пропускну спроможність проксі. Розподілена система дозволяє горизонтально масштабуватися: 5 воркерів дають не просто ×5 швидкість — вони працюють з різними проксі, різними IP, паралельно обходять різні розділи сайту та не мішають одне одному при записі результатів.
Загальна архітектура
Координатор (Scheduler)
│
▼
Черга завдань (Redis + BullMQ / Celery / Sidekiq)
┌────┴────┐
│ │
Worker-1 Worker-2 ... Worker-N
(proxy-1) (proxy-2) (proxy-N)
│ │
└────┬────┘
▼
Спільне сховище (PostgreSQL + S3)
│
▼
Дедупліцірор / Merger
Координатор не скрейпить сам — тільки генерує завдання та слідкує за прогресом. Воркери stateless: будь-який може взяти будь-яке завдання.
Структура завдання
{
"task_id": "uuid-v4",
"job_type": "scrape_product_list",
"url": "https://target.com/catalog?page=42",
"site_id": 7,
"depth": 2,
"retry_count": 0,
"max_retries": 3,
"proxy_pool": "residential",
"created_at": "2025-03-01T12:00:00Z"
}
Координатор: генерація завдань
Для сайту з пагінацією координатор спочатку визначає кількість сторінок, потім видає завдання на кожну:
class CatalogScheduler:
def __init__(self, queue: Queue, site_config: dict):
self.queue = queue
self.config = site_config
async def schedule_full_crawl(self):
total_pages = await self.detect_total_pages(self.config['catalog_url'])
tasks = [
ScrapeTask(
url=self.config['catalog_url'] + f"?page={i}",
job_type="scrape_listing",
site_id=self.config['id'],
)
for i in range(1, total_pages + 1)
]
await self.queue.bulk_add(tasks, priority=5)
logger.info(f"Scheduled {len(tasks)} tasks for site {self.config['id']}")
Завдання на сторінки листингу породжують вторинні завдання — на окремі карточки товарів. Це двохрівнева черга з різними пріоритетами.
Управління проксі
Кожен воркер привязан до пулу проксі. Ротація — по round-robin з «карантином» забаніних IP:
class ProxyRotator:
def __init__(self, proxies: list[str]):
self.proxies = proxies
self.banned: dict[str, datetime] = {}
self.idx = 0
def get_proxy(self) -> str:
for _ in range(len(self.proxies)):
proxy = self.proxies[self.idx % len(self.proxies)]
self.idx += 1
ban_until = self.banned.get(proxy)
if ban_until and ban_until > datetime.utcnow():
continue
return proxy
raise NoProxyAvailable("All proxies are in cooldown")
def report_banned(self, proxy: str, cooldown_minutes: int = 30):
self.banned[proxy] = datetime.utcnow() + timedelta(minutes=cooldown_minutes)
Резидентні проксі ротуються рідше, датацентрові — частіше (виявляються швидше).
Запобігання дублюванню роботи
Перед додаванням URL у чергу координатор перевіряє Bloom filter або Redis Set:
async def should_schedule(self, url: str) -> bool:
normalized = normalize_url(url) # прибрати UTM, trailing slash, тощо
key = f"visited:{self.site_id}"
return not await self.redis.sismember(key, normalized)
async def mark_visited(self, url: str):
normalized = normalize_url(url)
await self.redis.sadd(f"visited:{self.site_id}", normalized)
await self.redis.expire(f"visited:{self.site_id}", 86400 * 7)
Bloom filter переважніший для масштабу > 10M URL: пам'яті потрібно в 50–100 раз менше, ніж у Set, при допустимій похибці < 0.1%.
Узгоджена запис результатів
Кілька воркерів пишуть у одну БД одночасно. Конфлікти обробляються через INSERT ... ON CONFLICT DO UPDATE:
INSERT INTO scraped_products (site_id, external_id, url, data, scraped_at)
VALUES (%s, %s, %s, %s, NOW())
ON CONFLICT (site_id, external_id)
DO UPDATE SET
data = EXCLUDED.data,
scraped_at = EXCLUDED.scraped_at,
updated_at = NOW()
WHERE scraped_products.scraped_at < EXCLUDED.scraped_at - INTERVAL '1 hour';
Умова WHERE scraped_at < EXCLUDED.scraped_at - '1 hour' не дає двом воркерам, попавшим на один товар, бесконечно перезаписувати одне одного.
Масштабування воркерів
Воркери запускаються у Docker-контейнерах. Горизонтальне масштабування через docker-compose scale або Kubernetes HPA:
# docker-compose.yml
services:
scraper-worker:
image: scraper:latest
environment:
- REDIS_URL=redis://redis:6379
- DB_URL=postgresql://...
- PROXY_LIST=/run/secrets/proxies
deploy:
replicas: 5
restart: unless-stopped
При додаванні нових воркерів черга автоматично розподіляє навантаження — ніякої ручної конфігурації.
Моніторинг прогресу
Координатор веде лічильники в Redis:
scrape_job:{job_id}:total → 2400 (всього завдань)
scrape_job:{job_id}:done → 1837 (виконано)
scrape_job:{job_id}:failed → 12 (помилки)
scrape_job:{job_id}:started_at → timestamp
Розрахункове час закінчення = (total - done) / (done / elapsed_seconds).
Дашборд (BullMQ Board або власний UI) показує активні завдання, швидкість обробки, розподіл по воркерам та статистику помилок.
Типові конфігурації та продуктивність
| Воркерів | Проксі | Швидкість | Підходить для |
|---|---|---|---|
| 3 | 10 датацентрових | ~500 стр/хв | Каталоги до 100к товарів |
| 10 | 50 резидентних | ~1500 стр/хв | Великі маркетплейси |
| 20+ | 100+ резидентних | ~5000 стр/хв | Щоденний повний обхід |
Час реалізації
Базова розподілена система з 2–3 воркерами, Redis-чергою та PostgreSQL-сховищем — 8–10 робочих днів. Динамічна ротація проксі, Bloom filter для дедуплікації, автомасштабування через Docker/K8s, дашборд моніторингу — ще 5–7 днів.







