Настройка парсинга по розписанню (cron)
Більшість data pipeline'ів для крипто-проектів працюють по розписанню: кожні 5 хвилин — свіжі ціни, кожен час — on-chain метрики, щодня — сводка по протоколах. Cron — стандартний механізм, але наївна реалізація через crontab швидко перетворюється в неуправлюваний зоопарк: задачи падають мовчки, перекриваються при довгому виконанні, нема єдиного місця для мониторингу.
Варіанти реалізації: від простого до надійного
Системний cron (crontab)
Підходить для одного сервера та простих задач. Синтаксис:
# Кожні 5 хвилин — ціни
*/5 * * * * /usr/bin/python3 /app/scrapers/prices.py >> /var/log/prices.log 2>&1
# Кожен час у 0 хвилину — on-chain метрики
0 * * * * cd /app && /usr/bin/python3 -m scrapers.onchain >> /var/log/onchain.log 2>&1
# Кожен день у 03:00 UTC — daily report
0 3 * * * /app/scripts/daily_report.sh
Проблеми: якщо задача виконується довше інтервалу — наступний запуск почнеться поверх поточного. Ошибки видні лише у log файлах. Немає retry при ошибках. Немає залежностей між задачами.
Для захисту від паралельних запусків — flock:
*/5 * * * * flock -n /tmp/prices.lock /usr/bin/python3 /app/scrapers/prices.py
-n — пропустити запуск якщо lock уже занято (неідемпотентно). Або --wait 0 для явної помилки.
node-cron / APScheduler в процесі приложення
Якщо основне приложение вже на Node.js або Python — можна встроїти планувальник:
// Node.js: node-cron
import cron from "node-cron";
cron.schedule("*/5 * * * *", async () => {
try {
await scrapePrices();
} catch (err) {
logger.error("Price scraping failed", { err });
await alertSlack(err);
}
}, { timezone: "UTC" });
# Python: APScheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler(timezone="UTC")
scheduler.add_job(scrape_prices, "interval", minutes=5, misfire_grace_time=60)
scheduler.add_job(scrape_onchain, "cron", hour="*", misfire_grace_time=300)
scheduler.add_job(generate_report, "cron", hour=3, minute=0)
scheduler.start()
misfire_grace_time — скільки секунд після планового часу ще запускати задачу якщо сервер був недоступен. Важливо при рестартах сервісу.
Недостаток: задачи живуть всередину одного процесу. Упав процес — упали всі задачи.
Bull / BullMQ (Redis-backed queue)
Для production-систем з кількома воркерами:
import { Queue, Worker } from "bullmq";
import { Redis } from "ioredis";
const connection = new Redis({ host: "localhost", port: 6379 });
const priceQueue = new Queue("price-scraping", { connection });
// Додаємо повторюючуюся задачу
await priceQueue.add(
"scrape-binance",
{ symbols: ["BTCUSDT", "ETHUSDT"] },
{
repeat: { pattern: "*/5 * * * *", tz: "UTC" },
attempts: 3,
backoff: { type: "exponential", delay: 5000 },
}
);
// Воркер — може бути на іншому сервері
const worker = new Worker("price-scraping", async (job) => {
await scrapePrices(job.data.symbols);
}, { connection, concurrency: 2 });
worker.on("failed", (job, err) => {
logger.error(`Job ${job?.id} failed`, { err, attempts: job?.attemptsMade });
});
Переваги: retry з backoff, кілька воркерів для паралельної обробки, вбудований dashboard (Bull Board), persistent queue у Redis — задачи не теряються при рестарті.
Kubernetes CronJob
Для cloud-native інфраструктури:
apiVersion: batch/v1
kind: CronJob
metadata:
name: price-scraper
spec:
schedule: "*/5 * * * *"
timeZone: "UTC"
concurrencyPolicy: Forbid # Не запускати якщо попередній ще працює
successfulJobsHistoryLimit: 5
failedJobsHistoryLimit: 3
jobTemplate:
spec:
template:
spec:
restartPolicy: OnFailure
containers:
- name: scraper
image: your-registry/scraper:latest
env:
- name: SCRAPE_TYPE
value: "prices"
resources:
limits:
memory: "512Mi"
cpu: "500m"
concurrencyPolicy: Forbid — еквівалент flock, але на рівні Kubernetes. Allow — запускати паралельно. Replace — убити старий та запустити новий.
Мониторинг та алертинг
Мовчки падаючий крон — гірше ніж повна відсутність крона. Потрібно знати про кожен сбій.
Healthcheck ping (Deadman's switch pattern): кожна успішно виконана задача пінгує мониторинг-сервіс. Якщо ping не прийшов у очікуваний час — алерт.
Сервіси: Cronitor, Healthchecks.io (open source, можна self-host), Better Uptime.
import httpx
async def scrape_prices():
try:
# ... основна логіка ...
await httpx.get(f"https://hc-ping.com/{HEALTHCHECK_UUID}") # success ping
except Exception as e:
await httpx.get(f"https://hc-ping.com/{HEALTHCHECK_UUID}/fail")
raise
Prometheus метрики для більш детального мониторингу:
-
scraper_last_success_timestamp— коли останній раз успішно виконалось -
scraper_duration_seconds— час виконання (алерт якщо різко виросло) -
scraper_items_collected— кількість зібраних записів (алерт якщо 0)
Grafana dashboard з цими метриками дозволяє за секунди видити стан всіх задач та історичні тренди.
Обробка залежностей між задачами
Якщо задача B повинна виконуватися після задачи A — просто часові offset у cron недостатньо (A може завершитися за різне часу). Правильне рішення:
Event-driven: задача A при завершенні публікує подію в Kafka/Redis Streams, задача B підписана на цю подію.
DAG orchestration: Apache Airflow або Prefect для складних залежностей. Задачи — nodes у DAG, залежності — edges. Airflow сам управляє порядком запуску, retry, логами.
# Prefect flow з залежностями
from prefect import flow, task
@task(retries=3, retry_delay_seconds=60)
def collect_raw_data(): ...
@task
def transform_data(raw): ...
@task
def load_to_db(transformed): ...
@flow(name="daily-etl")
def daily_pipeline():
raw = collect_raw_data()
transformed = transform_data(raw)
load_to_db(transformed)
Для простих лінійних пайплайнів Prefect або Dagster — добре співвідношення можливостей та складності. Airflow потужніше, але потребує більше інфраструктури.







