Setting Up Parsing by Schedule (Cron)
Most data pipelines for crypto projects run on a schedule: fresh prices every 5 minutes, on-chain metrics every hour, a protocol summary once a day. Cron is the standard mechanism, but a naive crontab setup quickly turns into an unmanageable zoo: tasks fail silently, runs overlap when execution takes too long, and there is no single place to monitor everything.
Implementation Variants: From Simple to Reliable
System Cron (crontab)
Suitable for a single server and simple tasks. Syntax:
# Every 5 minutes — prices
*/5 * * * * /usr/bin/python3 /app/scrapers/prices.py >> /var/log/prices.log 2>&1
# Every hour at 0 minute — on-chain metrics
0 * * * * cd /app && /usr/bin/python3 -m scrapers.onchain >> /var/log/onchain.log 2>&1
# Every day at 03:00 UTC — daily report
0 3 * * * /app/scripts/daily_report.sh
Problems: if a task runs longer than its interval, the next run starts on top of the current one; errors are visible only in log files; there are no retries on failure and no dependencies between tasks.
To protect against parallel runs, use flock:
*/5 * * * * flock -n /tmp/prices.lock /usr/bin/python3 /app/scrapers/prices.py
-n skips the run if the lock is already taken (flock exits with a non-zero code instead of waiting). --wait 0 achieves the same with an explicit zero-second timeout.
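The same guard can also live inside the script itself, which helps where flock is unavailable (minimal containers, non-Linux cron). A minimal sketch using Python's stdlib fcntl (Unix-only; the lock path mirrors the crontab example above):

```python
import fcntl
import sys

def try_lock(path: str):
    """Open a lock file and take an exclusive non-blocking lock.

    Returns the open file object on success (keep it open for the
    lifetime of the run), or None if another run holds the lock.
    """
    f = open(path, "w")
    try:
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
        return f
    except BlockingIOError:
        f.close()
        return None

if __name__ == "__main__":
    lock = try_lock("/tmp/prices.lock")
    if lock is None:
        sys.exit(0)  # previous run still in progress: skip silently, like flock -n
    # ... scraping logic runs here while the lock is held ...
```

The lock is released automatically when the process exits, so a crashed run cannot leave the job permanently blocked, unlike ad-hoc "pid file" schemes.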
node-cron / APScheduler in the Application Process
If the main application already runs on Node.js or Python, a scheduler can be embedded in the process:
// Node.js: node-cron
import cron from "node-cron";

cron.schedule("*/5 * * * *", async () => {
  try {
    await scrapePrices();
  } catch (err) {
    logger.error("Price scraping failed", { err });
    await alertSlack(err);
  }
}, { timezone: "UTC" });
# Python: APScheduler
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler

scheduler = AsyncIOScheduler(timezone="UTC")
scheduler.add_job(scrape_prices, "interval", minutes=5, misfire_grace_time=60)
scheduler.add_job(scrape_onchain, "cron", hour="*", misfire_grace_time=300)
scheduler.add_job(generate_report, "cron", hour=3, minute=0)
scheduler.start()
asyncio.get_event_loop().run_forever()  # AsyncIOScheduler needs a running event loop
misfire_grace_time controls how many seconds after the scheduled time a task may still be started if the scheduler was unavailable. Important across service restarts.
Downside: all tasks live inside a single process. If the process crashes, every task goes down with it.
Bull / BullMQ (Redis-backed Queue)
For production systems with multiple workers:
import { Queue, Worker } from "bullmq";
import { Redis } from "ioredis";

// Recent BullMQ versions require maxRetriesPerRequest: null on the connection
const connection = new Redis({
  host: "localhost",
  port: 6379,
  maxRetriesPerRequest: null,
});
const priceQueue = new Queue("price-scraping", { connection });

// Add a repeating task
await priceQueue.add(
  "scrape-binance",
  { symbols: ["BTCUSDT", "ETHUSDT"] },
  {
    repeat: { pattern: "*/5 * * * *", tz: "UTC" },
    attempts: 3,
    backoff: { type: "exponential", delay: 5000 },
  }
);

// Worker: can run on a different server
const worker = new Worker("price-scraping", async (job) => {
  await scrapePrices(job.data.symbols);
}, { connection, concurrency: 2 });

worker.on("failed", (job, err) => {
  logger.error(`Job ${job?.id} failed`, { err, attempts: job?.attemptsMade });
});
Advantages: retries with backoff, multiple workers for parallel processing, a built-in dashboard (Bull Board), and a persistent queue in Redis, so tasks are not lost on restart.
Kubernetes CronJob
For cloud-native infrastructure:
apiVersion: batch/v1
kind: CronJob
metadata:
  name: price-scraper
spec:
  schedule: "*/5 * * * *"
  timeZone: "UTC"
  concurrencyPolicy: Forbid  # Don't run if the previous job is still working
  successfulJobsHistoryLimit: 5
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: OnFailure
          containers:
            - name: scraper
              image: your-registry/scraper:latest
              env:
                - name: SCRAPE_TYPE
                  value: "prices"
              resources:
                limits:
                  memory: "512Mi"
                  cpu: "500m"
concurrencyPolicy: Forbid is the equivalent of flock, but at the Kubernetes level. Allow runs jobs in parallel; Replace kills the old job and starts a new one.
Monitoring and Alerting
A silently failing cron job is worse than no cron at all: you need to know about every failure.
Healthcheck ping (dead man's switch pattern): every successfully completed task pings a monitoring service. If the ping doesn't arrive within the expected window, an alert fires.
Services: Cronitor, Healthchecks.io (open source, can be self-hosted), Better Uptime.
import httpx

async def scrape_prices():
    try:
        # ... main logic ...
        # success ping (httpx.get is synchronous; use AsyncClient in async code)
        async with httpx.AsyncClient() as client:
            await client.get(f"https://hc-ping.com/{HEALTHCHECK_UUID}")
    except Exception:
        async with httpx.AsyncClient() as client:
            await client.get(f"https://hc-ping.com/{HEALTHCHECK_UUID}/fail")
        raise
Prometheus metrics for more detailed monitoring:
- scraper_last_success_timestamp: when the task last completed successfully
- scraper_duration_seconds: execution time (alert if it spikes)
- scraper_items_collected: number of collected records (alert if it is 0)
A Grafana dashboard over these metrics shows the state of all tasks at a glance, along with historical trends.
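As a sketch of how these metrics can be recorded with the prometheus_client library: short-lived cron jobs cannot be scraped directly, so values are pushed to a Pushgateway after each run. The gateway address localhost:9091 and the job name price-scraper are assumptions.

```python
import time
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

registry = CollectorRegistry()
last_success = Gauge("scraper_last_success_timestamp",
                     "Unix time of the last successful run", registry=registry)
duration = Gauge("scraper_duration_seconds",
                 "Duration of the last run in seconds", registry=registry)
items = Gauge("scraper_items_collected",
              "Records collected in the last run", registry=registry)

def record_run(started_at: float, n_items: int) -> None:
    """Record metrics for one completed scrape."""
    duration.set(time.time() - started_at)
    items.set(n_items)
    last_success.set_to_current_time()

def push_metrics(gateway: str = "localhost:9091") -> None:
    """Push the registry to a Pushgateway so Prometheus can scrape it later."""
    push_to_gateway(gateway, job="price-scraper", registry=registry)
```

Call record_run at the end of a successful scrape and push_metrics right after; the alerting rules then fire on staleness of scraper_last_success_timestamp rather than on the job itself.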
Handling Dependencies Between Tasks
If task B must run after task A, simple time offsets in cron are not sufficient (A's duration varies from run to run). Better solutions:
Event-driven: on completion, task A publishes an event to Kafka or Redis Streams, and task B subscribes to that event.
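A hedged sketch of the event-driven variant with Redis Streams via the redis-py client; the stream name pipeline-events and the field layout are assumptions, and r stands for a connected redis.Redis instance:

```python
import time

STREAM = "pipeline-events"  # assumed stream name

def make_completion_event(task_name: str, items: int) -> dict:
    """Event payload task A publishes on completion (stream fields must be strings)."""
    return {
        "task": task_name,
        "items": str(items),
        "finished_at": str(int(time.time())),
    }

def publish_completion(r, task_name: str, items: int):
    """Task A: append a completion event to the stream (r is a redis.Redis client)."""
    return r.xadd(STREAM, make_completion_event(task_name, items))

def wait_for_completion(r, task_name: str, last_id: str = "$"):
    """Task B: block until task A reports completion, then return the event fields."""
    while True:
        for _stream, entries in r.xread({STREAM: last_id}, block=0):
            for entry_id, fields in entries:
                last_id = entry_id
                if fields[b"task"] == task_name.encode():
                    return fields
```

For more than one consumer, Redis consumer groups (XGROUP / XREADGROUP) give at-least-once delivery with acknowledgements instead of this simple blocking read.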
DAG orchestration: Apache Airflow or Prefect for complex dependencies. Tasks are nodes in a DAG, dependencies are edges. Airflow manages execution order, retries, and logs.
# Prefect flow with dependencies
from prefect import flow, task

@task(retries=3, retry_delay_seconds=60)
def collect_raw_data(): ...

@task
def transform_data(raw): ...

@task
def load_to_db(transformed): ...

@flow(name="daily-etl")
def daily_pipeline():
    raw = collect_raw_data()
    transformed = transform_data(raw)
    load_to_db(transformed)
For simple linear pipelines, Prefect or Dagster offer a good balance of capability and complexity. Airflow is more powerful but requires more infrastructure.