BullMQ Task Queue Setup for Node.js
BullMQ is a Redis-backed queue library for Node.js. It supports priorities, delayed jobs, retries with exponential backoff, cron scheduling, and worker threads. It is suitable for most background tasks: emails, notifications, media processing, PDF generation.
Installation
npm install bullmq ioredis
Connection configuration
// lib/redis.ts
import { Redis } from 'ioredis';
// Connection settings come from the environment. `||` (not `??`) is deliberate:
// an empty string or a non-numeric port also falls back to the default.
const redisHost = process.env.REDIS_HOST || 'localhost';
const redisPort = Number(process.env.REDIS_PORT) || 6379;

/** Shared ioredis client passed as `connection` to the queues and workers. */
export const redisConnection = new Redis({
  host: redisHost,
  port: redisPort,
  password: process.env.REDIS_PASSWORD,
  // required for BullMQ
  maxRetriesPerRequest: null,
  enableReadyCheck: false,
});
Queue definition
// queues/index.ts
import { Queue } from 'bullmq';
import { redisConnection } from '../lib/redis';
/**
 * Job options shared by the queues defined below.
 *
 * `attempts` counts the first run as well, so 3 attempts means at most
 * 2 retries. The retention settings cap how many finished jobs are kept,
 * both by count and by age.
 */
const defaultJobOptions = {
  attempts: 3,
  backoff: {
    type: 'exponential' as const,
    // Base delay in ms, doubled on each retry: 5s, then 10s.
    // (A 20s wait would only occur with a 4th attempt.)
    delay: 5000,
  },
  // `age` is in seconds per BullMQ's KeepJobs option: 86400 s = 1 day.
  removeOnComplete: { count: 1000, age: 86400 },
  // 604800 s = 7 days.
  removeOnFail: { count: 5000, age: 604800 },
};
/**
 * Creates a queue on the shared Redis connection.
 *
 * @param name - queue name; a worker attaches to the queue by the same string.
 * @param jobOptions - defaults applied to every job added to the queue.
 */
const createQueue = (
  name: string,
  jobOptions: typeof defaultJobOptions = defaultJobOptions,
) => new Queue(name, { connection: redisConnection, defaultJobOptions: jobOptions });

export const emailQueue = createQueue('emails');

export const notificationQueue = createQueue('notifications');

// don't retry reports: everything else is inherited, but only one attempt is made
export const reportQueue = createQueue('reports', {
  ...defaultJobOptions,
  attempts: 1,
});
Workers
// workers/emailWorker.ts
import { Worker, Job } from 'bullmq';
import { redisConnection } from '../lib/redis';
import { sendEmail } from '../services/email';
/**
 * Payload carried by a job on the `emails` queue.
 * The worker passes all four fields on to `sendEmail` as-is.
 */
interface EmailJobData {
  to: string;
  subject: string;
  template: string;
  /** Values supplied alongside the template; their shape is not constrained here. */
  variables: Record<string, unknown>;
}
/**
 * Processes one job from the `emails` queue: reports progress before and
 * after the send, then returns a receipt object as the job's result.
 */
async function processEmailJob(job: Job<EmailJobData>) {
  const { to: recipient, subject, template, variables } = job.data;

  await job.updateProgress(10);
  await sendEmail({ to: recipient, subject, template, variables });
  await job.updateProgress(100);

  return { sent: true, to: recipient, timestamp: new Date().toISOString() };
}

const worker = new Worker<EmailJobData>('emails', processEmailJob, {
  connection: redisConnection,
  // 10 parallel tasks
  concurrency: 10,
  // rate limiting: not more than 100 tasks per minute
  limiter: { max: 100, duration: 60_000 },
});
// Log the outcome of each job, plus errors raised by the worker itself.
worker
  .on('completed', (_job, result) => {
    console.log(`Email sent to ${result.to}`);
  })
  .on('failed', (job, err) => {
    // Optional chaining: this handler does not assume `job` is defined.
    console.error(`Email failed: job ${job?.id}:`, err.message);
  })
  .on('error', (err) => {
    console.error('Worker error:', err);
  });

export default worker;
Adding tasks
// Simple task
const welcomeEmail = {
  to: user.email,
  subject: 'Welcome!',
  template: 'welcome',
  variables: { name: user.name },
};
await emailQueue.add('welcome-email', welcomeEmail);

// With delay (5 minutes, expressed in milliseconds)
const FOLLOW_UP_DELAY_MS = 5 * 60 * 1000;
const followUpEmail = {
  to: user.email,
  subject: 'How are you?',
  template: 'follow-up',
  variables: { name: user.name },
};
await emailQueue.add('follow-up-email', followUpEmail, { delay: FOLLOW_UP_DELAY_MS });

// With priority (1 = highest)
const urgentNotification = {
  userId: user.id,
  message: 'Urgent notification',
};
await notificationQueue.add('push-notification', urgentNotification, { priority: 1 });
// Bulk send: one newsletter job per user, enqueued with a single addBulk call.
const NEWSLETTER_SPREAD_MS = 60_000;
const jobs = users.map((user) => ({
  name: 'newsletter',
  // Supply every field the email worker reads from `job.data`; without
  // `subject` and `variables` the worker would pass `undefined` to sendEmail.
  data: {
    to: user.email,
    subject: 'Newsletter',
    template: 'newsletter',
    variables: { name: user.name },
  },
  // Random whole-millisecond delay so the sends are spread over one minute.
  opts: { delay: Math.floor(Math.random() * NEWSLETTER_SPREAD_MS) },
}));
await emailQueue.addBulk(jobs);
Recurring tasks (Cron)
// Daily report at 9:00 UTC
await reportQueue.add(
  'daily-report',
  // Placeholder recipient: the previous literal was an e-mail-obfuscation
  // artifact, not a deliverable address. Replace with the real one.
  { type: 'daily', recipients: ['reports@example.com'] },
  {
    // `tz` pins the schedule to UTC; without it the cron pattern is evaluated
    // in the process's local time zone.
    repeat: { pattern: '0 9 * * *', tz: 'UTC' },
    jobId: 'daily-report-unique', // prevent duplicates
  }
);
// Every 5 minutes: health check
// NOTE(review): this job is added to the `emails` queue, whose worker handles
// every job as an email and would receive this empty payload — confirm that is
// intended, or move the health check to a dedicated queue.
const HEALTH_CHECK_INTERVAL_MS = 5 * 60 * 1000;
await emailQueue.add('health-check', {}, {
  repeat: { every: HEALTH_CHECK_INTERVAL_MS },
});
// Get all recurring tasks
const repeatableJobs = await reportQueue.getRepeatableJobs();
// Remove the first one, if there is one. Reading `.key` off an empty list
// would throw a TypeError.
const [firstRepeatable] = repeatableJobs;
if (firstRepeatable) {
  await reportQueue.removeRepeatableByKey(firstRepeatable.key);
}
Bull Board (monitoring)
// app.ts
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { emailQueue, notificationQueue, reportQueue } from './queues';
// The dashboard's base path and the Express mount point must be the same string.
const BOARD_PATH = '/admin/queues';

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath(BOARD_PATH);

// Register every queue with the board through its BullMQ adapter.
createBullBoard({
  queues: [emailQueue, notificationQueue, reportQueue].map(
    (queue) => new BullMQAdapter(queue),
  ),
  serverAdapter,
});

// `authenticate` runs ahead of the board's router on this path.
app.use(BOARD_PATH, authenticate, serverAdapter.getRouter());
Flow (dependent tasks)
import { FlowProducer } from 'bullmq';
const flow = new FlowProducer({ connection: redisConnection });

// Tasks with dependencies: resize, then upload, then notify.
// Steps are declared innermost-first; each step lists the step it waits on
// under `children`.
const resizeStep = {
  name: 'resize-image',
  queueName: 'images',
  data: { originalPath, sizes: [200, 400, 800] },
};

const uploadStep = {
  name: 'upload-to-s3',
  queueName: 'uploads',
  data: { tempPath },
  children: [resizeStep],
};

await flow.add({
  name: 'notify-user',
  queueName: 'notifications',
  data: { userId },
  children: [uploadStep],
});
Implementation timeline
Setting up BullMQ for a typical Node.js project (emails, notifications, cron) takes 2–3 days. With Bull Board monitoring and Flows, allow 3–4 days.







