Настройка парсинга по расписанию (cron)
Большинство data pipeline'ов для крипто-проектов работают по расписанию: каждые 5 минут — свежие цены, каждый час — on-chain метрики, ежедневно — сводка по протоколам. Cron — стандартный механизм, но наивная реализация через crontab быстро превращается в неуправляемый зоопарк: задачи падают молча, перекрываются при длинном выполнении, нет единого места для мониторинга.
Варианты реализации: от простого к надёжному
Системный cron (crontab)
Подходит для одного сервера и простых задач. Синтаксис:
# Каждые 5 минут — цены
*/5 * * * * /usr/bin/python3 /app/scrapers/prices.py >> /var/log/prices.log 2>&1
# Каждый час в 0 минут — on-chain метрики
0 * * * * cd /app && /usr/bin/python3 -m scrapers.onchain >> /var/log/onchain.log 2>&1
# Каждый день в 03:00 UTC — daily report
0 3 * * * /app/scripts/daily_report.sh
Проблемы: если задача выполняется дольше интервала — следующий запуск начнётся поверх текущего. Ошибки видны только в log файлах. Нет retry при ошибках. Нет зависимостей между задачами.
Для защиты от параллельных запусков — flock:
*/5 * * * * flock -n /tmp/prices.lock /usr/bin/python3 /app/scrapers/prices.py
-n — пропустить запуск если lock уже занят (неидемпотентно). Или --wait 0 для явной ошибки.
node-cron / APScheduler в процессе приложения
Если основное приложение уже на Node.js или Python — можно встроить планировщик:
// Node.js: node-cron
import cron from "node-cron";
cron.schedule("*/5 * * * *", async () => {
try {
await scrapePrices();
} catch (err) {
logger.error("Price scraping failed", { err });
await alertSlack(err);
}
}, { timezone: "UTC" });
# Python: APScheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler(timezone="UTC")
scheduler.add_job(scrape_prices, "interval", minutes=5, misfire_grace_time=60)
scheduler.add_job(scrape_onchain, "cron", hour="*", misfire_grace_time=300)
scheduler.add_job(generate_report, "cron", hour=3, minute=0)
scheduler.start()
misfire_grace_time — сколько секунд после планового времени ещё запускать задачу если сервер был недоступен. Важно при рестартах сервиса.
Недостаток: задачи живут внутри одного процесса. Упал процесс — упали все задачи.
Bull / BullMQ (Redis-backed queue)
Для продакшн-систем с несколькими воркерами:
import { Queue, Worker } from "bullmq";
import { Redis } from "ioredis";
const connection = new Redis({ host: "localhost", port: 6379 });
const priceQueue = new Queue("price-scraping", { connection });
// Добавляем повторяющуюся задачу
await priceQueue.add(
"scrape-binance",
{ symbols: ["BTCUSDT", "ETHUSDT"] },
{
repeat: { pattern: "*/5 * * * *", tz: "UTC" },
attempts: 3,
backoff: { type: "exponential", delay: 5000 },
}
);
// Воркер — может быть на другом сервере
const worker = new Worker("price-scraping", async (job) => {
await scrapePrices(job.data.symbols);
}, { connection, concurrency: 2 });
worker.on("failed", (job, err) => {
logger.error(`Job ${job?.id} failed`, { err, attempts: job?.attemptsMade });
});
Преимущества: retry с backoff, несколько воркеров для параллельной обработки, встроенный dashboard (Bull Board), persistent queue в Redis — задачи не теряются при рестарте.
Kubernetes CronJob
Для cloud-native инфраструктуры:
apiVersion: batch/v1
kind: CronJob
metadata:
name: price-scraper
spec:
schedule: "*/5 * * * *"
timeZone: "UTC"
concurrencyPolicy: Forbid # Не запускать если предыдущий ещё работает
successfulJobsHistoryLimit: 5
failedJobsHistoryLimit: 3
jobTemplate:
spec:
template:
spec:
restartPolicy: OnFailure
containers:
- name: scraper
image: your-registry/scraper:latest
env:
- name: SCRAPE_TYPE
value: "prices"
resources:
limits:
memory: "512Mi"
cpu: "500m"
concurrencyPolicy: Forbid — эквивалент flock, но на уровне Kubernetes. Allow — запускать параллельно. Replace — убить старый и запустить новый.
Мониторинг и алертинг
Молча падающий крон — хуже чем полное отсутствие крона. Нужно знать о каждом сбое.
Healthcheck ping (Deadman's switch pattern): каждая успешно выполненная задача пингует мониторинг-сервис. Если ping не пришёл в ожидаемое время — алерт.
Сервисы: Cronitor, Healthchecks.io (open source, можно self-host), Better Uptime.
import httpx
async def scrape_prices():
try:
# ... основная логика ...
await httpx.get(f"https://hc-ping.com/{HEALTHCHECK_UUID}") # success ping
except Exception as e:
await httpx.get(f"https://hc-ping.com/{HEALTHCHECK_UUID}/fail")
raise
Prometheus метрики для более детального мониторинга:
-
scraper_last_success_timestamp— когда последний раз успешно выполнилось -
scraper_duration_seconds— время выполнения (алерт если резко выросло) -
scraper_items_collected— количество собранных записей (алерт если 0)
Grafana dashboard с этими метриками позволяет за секунды видеть состояние всех задач и исторические тренды.
Обработка зависимостей между задачами
Если задача B должна выполняться после задачи A — просто временные offset в cron не достаточно (A может завершиться за разное время). Правильное решение:
Event-driven: задача A при завершении публикует событие в Kafka/Redis Streams, задача B подписана на это событие.
DAG orchestration: Apache Airflow или Prefect для сложных зависимостей. Задачи — nodes в DAG, зависимости — edges. Airflow сам управляет порядком запуска, retry, логами.
# Prefect flow с зависимостями
from prefect import flow, task
@task(retries=3, retry_delay_seconds=60)
def collect_raw_data(): ...
@task
def transform_data(raw): ...
@task
def load_to_db(transformed): ...
@flow(name="daily-etl")
def daily_pipeline():
raw = collect_raw_data()
transformed = transform_data(raw)
load_to_db(transformed)
Для простых линейных пайплайнов Prefect или Dagster — хорошее соотношение возможностей и сложности. Airflow мощнее, но требует больше инфраструктуры.







