Welcome to Chapter 8! In modern web applications, not all tasks can or should be handled synchronously within the main request-response cycle. Operations like sending emails, processing large image files, generating complex reports, or integrating with third-party APIs can be time-consuming. If these tasks block the main thread, they can lead to slow response times, poor user experience, and even timeouts, especially under heavy load. This is where background jobs and message queues become indispensable.
In this chapter, we will introduce the concept of asynchronous task processing using job queues. We’ll integrate BullMQ, a robust and performant queue system for Node.js, backed by Redis as our message broker. By the end of this chapter, you will have a clear understanding of how to offload long-running operations from your Fastify API, improving its responsiveness and scalability. We’ll build a simple example where an API endpoint dispatches a “job” to a queue, and a separate worker process picks up and executes that job asynchronously, simulating a resource-intensive task.
This chapter builds upon the existing Fastify application from previous chapters. You should have a working Fastify server, a PostgreSQL database, and a Docker Compose setup. We will extend our Docker Compose configuration to include a Redis service and then integrate BullMQ into our Fastify application and create a dedicated worker process. The expected outcome is a decoupled system where your API remains fast and responsive, while heavy tasks are handled efficiently in the background, making your application more resilient and scalable.
Planning & Design
To effectively handle background jobs, we need to design a system that separates the concerns of job creation (producer) and job execution (consumer/worker). Our Fastify application will act as the job producer, adding tasks to a Redis-backed queue. A separate Node.js process will function as the worker, continuously listening to the queue and processing jobs as they arrive.
Component Architecture
The flow of a background job through our system is as follows:
- Client Application: Initiates a request to the Fastify API.
- Fastify API Server (Producer): Receives the request. If the request involves a long-running task, it quickly creates a job with necessary data and adds it to the Redis Queue. It then responds to the client immediately, without waiting for the job to complete.
- Redis Queue: Acts as a central message broker, storing jobs in a reliable queue. BullMQ uses Redis to manage queues, jobs, and workers.
- Background Worker (Consumer): A separate Node.js process that continuously monitors the Redis Queue. When a job appears, the worker retrieves it, processes it, and handles its completion or failure.
- External Service / DB Update: Represents the actual heavy lifting (e.g., image processing, database operations, sending emails).
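To make the producer/queue/worker decoupling concrete before we wire up Redis, here is a toy in-memory analogue of the flow. Everything runs in one process here purely for illustration; in the real system the queue lives in Redis and the worker is a separate process.

```typescript
// A toy in-memory analogue of the producer/queue/worker flow.
type Job = { id: number; data: { imageUrl: string; userId: string } };

class ToyQueue {
  private jobs: Job[] = [];
  private nextId = 1;

  // Producer side: enqueue and return immediately (like the API does).
  add(data: Job['data']): Job {
    const job = { id: this.nextId++, data };
    this.jobs.push(job);
    return job;
  }

  // Consumer side: pull the next job, if any.
  take(): Job | undefined {
    return this.jobs.shift();
  }
}

const queue = new ToyQueue();

// API handler: adds a job and responds without doing the work.
const job = queue.add({ imageUrl: 'https://example.com/a.jpg', userId: 'user123' });
console.log(`API responded immediately with jobId ${job.id}`);

// Worker loop: drains the queue independently of the producer.
let processed = 0;
for (let j = queue.take(); j !== undefined; j = queue.take()) {
  processed++; // a real worker would resize the image, update the DB, etc.
}
console.log(`worker processed ${processed} job(s)`);
```

The key property to notice: the producer's `add` returns as soon as the job is stored, regardless of when (or whether) the consumer gets to it.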
API Endpoints Design
For this chapter, we’ll introduce a new API endpoint to trigger a background job:
- `POST /api/jobs/process-image`
  - Description: Simulates the initiation of an image processing task. It accepts an `imageUrl` and a `userId` in the request body, which are passed to the background job.
  - Request Body:

    ```json
    { "imageUrl": "https://example.com/image.jpg", "userId": "user123" }
    ```

  - Response:

    ```json
    { "message": "Image processing job initiated", "jobId": "unique-job-id" }
    ```
File Structure
We’ll organize our queue-related code within a new src/queue directory.
```
worker.ts                           # Entry point for the background worker (project root)
src/
├── config/
│   └── redis.ts                    # Redis connection configuration
├── plugins/
│   └── queue.ts                    # Fastify plugin for queue producers
├── queue/
│   ├── producers/
│   │   └── imageProcessor.ts       # Defines the image processing queue and producer
│   └── workers/
│       └── imageProcessorWorker.ts # Defines the worker that processes image jobs
├── routes/
│   └── jobRoutes.ts                # API routes to trigger jobs
├── app.ts                          # Main Fastify application
└── server.ts                       # Entry point for the Fastify server
```

Note that `worker.ts` sits at the project root (outside `src/`), which is why it imports from `./src/...` and gets its own TypeScript configuration later in this chapter.
Step-by-Step Implementation
a) Setup/Configuration
First, we need to add Redis to our Docker Compose setup and install the necessary Node.js packages.
1. Update docker-compose.yml for Redis:
Add a redis service to your docker-compose.yml. We’ll expose port 6379.
```yaml
# docker-compose.yml
version: '3.8'

services:
  # ... other services (e.g., postgres, api)

  redis:
    image: redis:7-alpine
    container_name: myapp_redis
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 5s
      retries: 5
    networks:
      - app-network

volumes:
  # ... other volumes
  redis-data:

networks:
  app-network:
    driver: bridge
```
Explanation:
- We use the `redis:7-alpine` image for a lightweight Redis instance.
- `container_name` makes the container easy to reference.
- Port `6379` is mapped for local access.
- `--appendonly yes` enables AOF persistence for data durability.
- `volumes` mounts a named volume to persist Redis data.
- `healthcheck` ensures Redis is ready before other services depend on it.
- `app-network` allows services to communicate internally by name (e.g., `redis`).
2. Install Dependencies:
We need `ioredis` for connecting to Redis and `bullmq` for queue management. Recent versions of `ioredis` ship their own TypeScript definitions, so no separate `@types` package is required.

```bash
npm install ioredis bullmq
```
3. Add Environment Variables for Redis:
Update your .env file with Redis connection details.
```bash
# .env
# ... existing variables
REDIS_HOST=redis
REDIS_PORT=6379
REDIS_DB=0
```
Explanation:
- `REDIS_HOST` is `redis` because that's the service name in `docker-compose.yml`, allowing our Node.js app inside Docker to resolve it. If running Node.js locally outside Docker, this would be `localhost`.
- `REDIS_PORT` is the default Redis port.
- `REDIS_DB` specifies the Redis database to use.
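This chapter assumes the `env` module from earlier chapters exposes these new variables. If yours does not yet, a minimal sketch of parsing them might look like the following. The function name and shape are hypothetical; adapt it to whatever env-validation approach your project already uses (e.g., a TypeBox or envalid schema). The important detail is that `REDIS_PORT` and `REDIS_DB` arrive as strings and must become numbers.

```typescript
// Hypothetical sketch: parsing Redis settings out of an env map with
// sane defaults. The key point: PORT and DB must be parsed to numbers.
interface RedisEnv {
  REDIS_HOST: string;
  REDIS_PORT: number;
  REDIS_DB: number;
}

function loadRedisEnv(source: Record<string, string | undefined>): RedisEnv {
  const port = Number(source.REDIS_PORT ?? 6379);
  const db = Number(source.REDIS_DB ?? 0);
  if (Number.isNaN(port) || Number.isNaN(db)) {
    throw new Error('REDIS_PORT and REDIS_DB must be numeric');
  }
  return {
    REDIS_HOST: source.REDIS_HOST ?? 'localhost',
    REDIS_PORT: port,
    REDIS_DB: db,
  };
}

// In the real app you would pass process.env here.
const redisEnv = loadRedisEnv({ REDIS_HOST: 'redis', REDIS_PORT: '6379', REDIS_DB: '0' });
console.log(redisEnv.REDIS_PORT); // 6379, as a number
```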
4. Create Redis Configuration (src/config/redis.ts):
This file will provide a centralized Redis client instance for BullMQ.
```typescript
// src/config/redis.ts
import { Redis } from 'ioredis';
import { env } from './env';
import { logger } from './logger';

/**
 * Returns Redis connection options.
 * BullMQ requires a dedicated Redis connection for the queue and another for the worker,
 * so we export a connection options object rather than a shared client instance:
 * BullMQ prefers to manage its own connections.
 *
 * @returns {RedisOptions} Redis connection options.
 */
export const getRedisConnectionOptions = () => {
  const redisOptions = {
    host: env.REDIS_HOST,
    port: env.REDIS_PORT,
    db: env.REDIS_DB,
    maxRetriesPerRequest: null, // Required by BullMQ for its blocking commands
    enableReadyCheck: false,    // Prevents ioredis from blocking if Redis is not immediately available
  };
  logger.info(`Connecting to Redis at ${redisOptions.host}:${redisOptions.port}/${redisOptions.db}`);
  return redisOptions;
};

/**
 * Creates a new ioredis client instance.
 * Useful for direct Redis operations outside of BullMQ's managed connections.
 * @returns {Redis} A new Redis client instance.
 */
export const createRedisClient = (): Redis => {
  const options = getRedisConnectionOptions();
  const client = new Redis(options);

  client.on('error', (err) => {
    logger.error(`Redis client error: ${err.message}`, { error: err });
  });
  client.on('connect', () => {
    logger.info('Redis client connected successfully.');
  });
  client.on('ready', () => {
    logger.info('Redis client is ready.');
  });
  client.on('end', () => {
    logger.warn('Redis client disconnected.');
  });

  return client;
};
```
Explanation:
- `getRedisConnectionOptions`: Provides a configuration object suitable for BullMQ. `maxRetriesPerRequest: null` and `enableReadyCheck: false` are common recommendations for `ioredis` when used with BullMQ to prevent connection issues.
- `createRedisClient`: A utility function to create a direct `ioredis` client, which can be useful for other Redis operations; BullMQ will create its own connections using the options we provide.
- Includes robust logging for connection status and errors, which is crucial for production environments.
b) Core Implementation
Now we’ll implement the producer (Fastify API) and the consumer (worker).
1. Define the Queue Producer (src/queue/producers/imageProcessor.ts):
This file will define our queue and a function to add jobs to it.
```typescript
// src/queue/producers/imageProcessor.ts
import { Queue, QueueEvents, Job } from 'bullmq';
import { getRedisConnectionOptions } from '../../config/redis';
import { logger } from '../../config/logger';

// Define the name of our queue
export const IMAGE_PROCESSING_QUEUE_NAME = 'image-processing-queue';

// Create a new BullMQ Queue instance.
// This queue will be used by the API server (producer) to add jobs.
export const imageProcessingQueue = new Queue(IMAGE_PROCESSING_QUEUE_NAME, {
  connection: getRedisConnectionOptions(),
  defaultJobOptions: {
    attempts: 3, // Retry failed jobs up to 3 times
    backoff: {
      type: 'exponential', // Exponential backoff for retries
      delay: 1000,         // Initial delay of 1 second
    },
    removeOnComplete: {
      count: 1000, // Keep last 1000 completed jobs
    },
    removeOnFail: {
      count: 5000, // Keep last 5000 failed jobs
    },
  },
});

// Log queue errors for monitoring. Job lifecycle events ('added', 'completed',
// etc.) are emitted via a QueueEvents instance, not the Queue itself.
imageProcessingQueue.on('error', (err) => {
  logger.error(`Image processing queue error: ${err.message}`, { error: err });
});

const imageProcessingQueueEvents = new QueueEvents(IMAGE_PROCESSING_QUEUE_NAME, {
  connection: getRedisConnectionOptions(),
});
imageProcessingQueueEvents.on('added', ({ jobId, name }) => {
  logger.info(`Job ${jobId} (name: ${name}) added to ${IMAGE_PROCESSING_QUEUE_NAME}`);
});

/**
 * Adds an image processing job to the queue.
 * @param {string} imageUrl - The URL of the image to process.
 * @param {string} userId - The ID of the user initiating the job.
 * @returns {Promise<Job>} The BullMQ Job instance.
 */
export async function addImageProcessingJob(imageUrl: string, userId: string): Promise<Job> {
  const jobName = `process-image-${userId}-${Date.now()}`;
  logger.info(`Attempting to add job '${jobName}' for user '${userId}' with image '${imageUrl}' to queue.`);
  try {
    const job = await imageProcessingQueue.add(jobName, { imageUrl, userId }, {
      // Specific job options can override defaultJobOptions
      delay: 0,    // Process immediately
      priority: 5, // Lower number means higher priority
    });
    logger.info(`Successfully added job ${job.id} to queue '${IMAGE_PROCESSING_QUEUE_NAME}'.`);
    return job;
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    logger.error(`Failed to add image processing job: ${message}`, { error, imageUrl, userId });
    throw new Error('Failed to add image processing job to queue.');
  }
}
```
Explanation:
- `IMAGE_PROCESSING_QUEUE_NAME`: A constant to ensure consistency across producer and worker.
- `imageProcessingQueue`: The BullMQ `Queue` instance. It connects to Redis using our `getRedisConnectionOptions`.
- `defaultJobOptions`: Crucial for production. `attempts` sets how many times a failed job should be retried; `backoff` sets the retry strategy (e.g., exponential delay); `removeOnComplete` and `removeOnFail` are important for managing the queue's memory footprint by automatically removing old job records.
- Event listeners: Log errors and job additions for monitoring.
- `addImageProcessingJob`: An async function that adds a job to the queue. It takes `imageUrl` and `userId` as data, with robust error handling and logging.
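It helps to see what the retry schedule above actually implies. With `attempts: 3` there is the initial run plus up to two retries, and BullMQ's built-in exponential strategy waits (to a close approximation) `delay * 2^retryIndex` milliseconds before each retry. A small sketch of that arithmetic:

```typescript
// Sketch of the retry schedule implied by
// { attempts: 3, backoff: { type: 'exponential', delay: 1000 } }.
// retryIndex 0 is the first retry, 1 the second, and so on.
function exponentialBackoffDelay(baseDelayMs: number, retryIndex: number): number {
  return baseDelayMs * 2 ** retryIndex;
}

// attempts: 3 means one initial run plus two retries.
const schedule = [0, 1].map((i) => exponentialBackoffDelay(1000, i));
console.log(schedule); // [1000, 2000]: wait 1 s before retry 1, 2 s before retry 2
```

So a persistently failing job consumes its last attempt roughly three seconds of backoff after the first failure, after which it lands in the failed set.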
2. Create a Fastify Plugin for Queue Producers (src/plugins/queue.ts):
To make our queue producer easily accessible throughout our Fastify application, we’ll register it as a plugin.
```typescript
// src/plugins/queue.ts
import fp from 'fastify-plugin';
import { FastifyInstance } from 'fastify';
import { imageProcessingQueue, addImageProcessingJob } from '../queue/producers/imageProcessor';
import { logger } from '../config/logger';

declare module 'fastify' {
  interface FastifyInstance {
    queues: {
      imageProcessing: {
        addJob: typeof addImageProcessingJob;
        queueInstance: typeof imageProcessingQueue;
      };
      // Add other queues here as your application grows
    };
  }
}

/**
 * Fastify plugin to register BullMQ queues and their producers.
 * This makes queue functionality available via `fastify.queues`.
 */
export default fp(async (fastify: FastifyInstance) => {
  fastify.decorate('queues', {
    imageProcessing: {
      addJob: addImageProcessingJob,
      queueInstance: imageProcessingQueue,
    },
  });

  // Ensure queues are gracefully closed on application shutdown
  fastify.addHook('onClose', async () => {
    logger.info('Closing BullMQ image processing queue...');
    try {
      await imageProcessingQueue.close();
      logger.info('BullMQ image processing queue closed.');
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      logger.error(`Error closing image processing queue: ${message}`, { error });
    }
  });

  logger.info('BullMQ queue plugin loaded.');
}, {
  name: 'queue-plugin',
  dependencies: [], // No specific plugin dependencies for now
});
```
Explanation:
- `fp` from `fastify-plugin`: Ensures our plugin is properly encapsulated and doesn't interfere with other plugins.
- `declare module 'fastify'`: Extends the `FastifyInstance` interface to add a `queues` property, providing type safety and auto-completion for our queue producers.
- `fastify.decorate('queues', ...)`: Attaches our `imageProcessingQueue` and `addImageProcessingJob` function to the Fastify instance, making them accessible via `fastify.queues.imageProcessing.addJob`.
- `fastify.addHook('onClose', ...)`: Crucial for graceful shutdown. When the Fastify server stops, we ensure the BullMQ queue connection is properly closed to prevent resource leaks.
3. Integrate the Queue Plugin into src/app.ts:
Register the newly created plugin in your main Fastify application.
```typescript
// src/app.ts
import { FastifyInstance, FastifyPluginAsync } from 'fastify';
import AutoLoad from '@fastify/autoload';
import { join } from 'path';
import { env } from './config/env';
import { logger } from './config/logger';
import appConfig from './config/app';
// ... other imports

// Import the new queue plugin
import queuePlugin from './plugins/queue';

const app: FastifyPluginAsync = async (fastify: FastifyInstance) => {
  // ... existing plugin registrations

  // Register the queue plugin.
  // Make sure it's registered before any routes that might use it.
  fastify.register(queuePlugin);
  logger.info('Queue plugin registered.');

  // This loads all plugins defined in your `plugins` directory
  fastify.register(AutoLoad, {
    dir: join(__dirname, 'plugins'),
    options: Object.assign({}, appConfig),
    ignoreFilter: (path: string) => path === join(__dirname, 'plugins', 'queue.ts'), // Ignore if already registered
  });

  // This loads all routes defined in your `routes` directory
  fastify.register(AutoLoad, {
    dir: join(__dirname, 'routes'),
    options: Object.assign({}, appConfig),
  });

  // ... existing error handling, root route, etc.
};

export default app;
```
Explanation:
- We explicitly `import` and `register` `queuePlugin` before `AutoLoad` runs for other plugins and routes. This ensures that `fastify.queues` is available when routes that depend on it are loaded.
- We add an `ignoreFilter` to the `AutoLoad` call for the `plugins` directory to prevent double registration, since `queuePlugin` also lives in `src/plugins`.
4. Create API Route to Dispatch Job (src/routes/jobRoutes.ts):
This route will receive requests and use our addImageProcessingJob function to add tasks to the queue.
```typescript
// src/routes/jobRoutes.ts
import { FastifyInstance, FastifyPluginAsync } from 'fastify';
import { Static, Type } from '@sinclair/typebox';
import { logger } from '../config/logger';

// Define the schema for the request body using TypeBox
const ImageProcessingJobBodySchema = Type.Object({
  imageUrl: Type.String({ format: 'uri', description: 'URL of the image to process' }),
  userId: Type.String({ minLength: 1, description: 'ID of the user initiating the job' }),
});

type ImageProcessingJobBody = Static<typeof ImageProcessingJobBodySchema>;

const jobRoutes: FastifyPluginAsync = async (fastify: FastifyInstance) => {
  fastify.post<{ Body: ImageProcessingJobBody }>(
    '/process-image',
    {
      schema: {
        body: ImageProcessingJobBodySchema,
        response: {
          202: Type.Object({
            message: Type.String(),
            jobId: Type.String(),
          }),
        },
      },
      preHandler: fastify.auth([fastify.verifyJWT]), // Assuming authentication is required
    },
    async (request, reply) => {
      const { imageUrl, userId } = request.body;
      logger.info(`Received request to process image for user ${userId}: ${imageUrl}`);
      try {
        // Use the decorated queue producer to add a job
        const job = await fastify.queues.imageProcessing.addJob(imageUrl, userId);
        reply.status(202).send({
          message: 'Image processing job initiated',
          jobId: job.id!, // job.id is present after a successful add
        });
      } catch (error) {
        const message = error instanceof Error ? error.message : String(error);
        logger.error(`Error initiating image processing job: ${message}`, { error, imageUrl, userId });
        // The centralized error handler will catch this, but we throw for clarity
        throw fastify.httpErrors.internalServerError('Failed to initiate image processing job.');
      }
    }
  );
};

export default jobRoutes;
```
Explanation:
- Uses TypeBox for request body validation, ensuring `imageUrl` is a valid URI and `userId` is present. This is a critical security and data integrity best practice.
- `preHandler: fastify.auth([fastify.verifyJWT])`: Assumes you have an authentication plugin (`verifyJWT`) from previous chapters to secure this endpoint.
- `fastify.queues.imageProcessing.addJob(imageUrl, userId)`: This is how we access our decorated queue producer.
- Responds with `202 Accepted` immediately, indicating that the request has been accepted for processing but not yet completed. This is the correct HTTP status for asynchronous operations.
- Includes comprehensive logging for request reception and error handling.
5. Create the Queue Worker (src/queue/workers/imageProcessorWorker.ts):
This separate Node.js script will run independently and consume jobs from the queue.
```typescript
// src/queue/workers/imageProcessorWorker.ts
import { Worker, Job } from 'bullmq';
import { getRedisConnectionOptions } from '../../config/redis';
import { logger } from '../../config/logger';
import { IMAGE_PROCESSING_QUEUE_NAME } from '../producers/imageProcessor';

/**
 * Defines the image processing worker.
 * This worker will listen to the 'image-processing-queue' and execute jobs.
 */
export const imageProcessingWorker = new Worker(
  IMAGE_PROCESSING_QUEUE_NAME,
  async (job: Job) => {
    const { imageUrl, userId } = job.data;
    logger.info(`Worker ${job.id} started processing image '${imageUrl}' for user '${userId}'.`);

    // Simulate a long-running task (e.g., image resizing, manipulation, API call)
    try {
      // Example: download image, resize, upload to S3, update DB
      await new Promise((resolve) => setTimeout(resolve, Math.random() * 5000 + 1000)); // Simulate a 1-6 second task

      // Simulate a random failure for demonstration purposes
      if (Math.random() < 0.1) { // 10% chance of failure
        throw new Error('Simulated image processing failure.');
      }

      logger.info(`Worker ${job.id} successfully processed image '${imageUrl}' for user '${userId}'.`);
      // You might update a database here, send a notification, etc.
      // job.updateProgress(100); // Update job progress if needed
      return { status: 'completed', processedAt: new Date(), originalUrl: imageUrl };
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      logger.error(`Worker ${job.id} failed to process image '${imageUrl}' for user '${userId}': ${message}`, { error, jobData: job.data });
      // BullMQ will automatically retry the job based on the 'attempts' option
      throw error; // Re-throw to mark the job as failed so BullMQ can handle retries
    }
  },
  {
    connection: getRedisConnectionOptions(),
    concurrency: 5, // Process up to 5 jobs concurrently
  }
);

// Log worker events for monitoring and debugging
imageProcessingWorker.on('completed', (job) => {
  logger.info(`Job ${job.id} completed successfully.`);
});
imageProcessingWorker.on('failed', (job, err) => {
  logger.error(`Job ${job?.id} failed with error: ${err.message}`, { error: err, jobData: job?.data });
});
imageProcessingWorker.on('active', (job) => {
  logger.info(`Job ${job.id} is active and being processed.`);
});
imageProcessingWorker.on('stalled', (jobId) => {
  logger.warn(`Job ${jobId} has stalled.`);
});
imageProcessingWorker.on('error', (err) => {
  logger.error(`Worker error: ${err.message}`, { error: err });
});

logger.info(`Image processing worker initialized, listening on queue '${IMAGE_PROCESSING_QUEUE_NAME}'.`);

// Export functions to start and stop the worker
export const startImageProcessingWorker = () => {
  logger.info('Starting image processing worker...');
  // The worker starts listening for jobs as soon as it's instantiated;
  // no explicit 'start' method is needed for a BullMQ Worker.
};

export const stopImageProcessingWorker = async () => {
  logger.info('Stopping image processing worker...');
  try {
    await imageProcessingWorker.close();
    logger.info('Image processing worker stopped gracefully.');
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    logger.error(`Error stopping image processing worker: ${message}`, { error });
  }
};
```
Explanation:
- `Worker(IMAGE_PROCESSING_QUEUE_NAME, async (job) => { ... }, { connection, concurrency })`: The core of our worker.
  - The first argument is the queue name, matching the producer.
  - The second argument is an async function that defines the job processing logic. It receives the `job` object, which contains `job.data` (our `imageUrl` and `userId`).
  - The third argument is options, including `connection` (the same Redis options) and `concurrency` (how many jobs the worker can process in parallel).
- Simulated Task: We use `setTimeout` to mimic a long-running, asynchronous operation. A random failure is also introduced to demonstrate BullMQ's retry mechanism.
- Error Handling: If the job processing logic throws an error, BullMQ will catch it and manage retries based on the `attempts` and `backoff` options defined in the queue producer.
- Event Listeners: Crucial for monitoring the worker's activity, job completions, failures, and stalls. This provides visibility into your background processing.
- `startImageProcessingWorker` and `stopImageProcessingWorker`: Provide methods to manage the worker's lifecycle, especially useful for graceful shutdowns.
6. Create a separate worker entry point (worker.ts):
This file will be the main entry point for running our background worker.
```typescript
// worker.ts
import { startImageProcessingWorker, stopImageProcessingWorker } from './src/queue/workers/imageProcessorWorker';
import { logger } from './src/config/logger';

async function main() {
  logger.info('Starting background worker application...');
  startImageProcessingWorker();

  // Handle graceful shutdown
  process.on('SIGTERM', async () => {
    logger.info('SIGTERM signal received. Shutting down worker gracefully...');
    await stopImageProcessingWorker();
    process.exit(0);
  });
  process.on('SIGINT', async () => {
    logger.info('SIGINT signal received. Shutting down worker gracefully...');
    await stopImageProcessingWorker();
    process.exit(0);
  });

  process.on('unhandledRejection', (reason, promise) => {
    logger.error('Unhandled Rejection at:', { promise, reason });
  });
  process.on('uncaughtException', (error) => {
    logger.error('Uncaught Exception:', { error });
    process.exit(1); // Exit process after an uncaught exception
  });
}

main().catch((error) => {
  logger.fatal(`Worker application failed to start: ${error.message}`, { error });
  process.exit(1);
});
```
Explanation:
- This is a standalone script that initializes and starts the `imageProcessingWorker`.
- Graceful Shutdown: It includes `SIGTERM` and `SIGINT` (Ctrl+C) handlers to ensure that the worker can shut down cleanly, closing its Redis connections before exiting. This is vital for preventing orphaned jobs or connection leaks in production.
- Unhandled Errors: Catches `unhandledRejection` and `uncaughtException` to log critical errors and prevent the worker from crashing silently.
7. Add npm scripts to run the worker:
Update your package.json to easily start the worker.
```json
// package.json
{
  "name": "your-app-name",
  "version": "1.0.0",
  "description": "",
  "main": "server.ts",
  "scripts": {
    "dev": "npm run build && concurrently \"npm run watch:server\" \"npm run watch:worker\"",
    "build": "rimraf dist && tsc",
    "start": "node dist/server.js",
    "start:worker": "node dist/worker.js",
    "watch:server": "tsc-watch --onSuccess \"node dist/server.js\"",
    "watch:worker": "tsc-watch --project tsconfig.worker.json --onSuccess \"node dist/worker.js\"",
    "test": "jest",
    "lint": "eslint . --ext .ts",
    "lint:fix": "eslint . --ext .ts --fix"
  }
  // ... rest of package.json
}
```
Explanation:
- `start:worker`: A new script to run the compiled worker.
- `watch:worker`: A development script that watches for changes in worker-related files and restarts the worker. We'll need a separate `tsconfig.worker.json` to compile `worker.ts` and its dependencies.
8. Create tsconfig.worker.json:
This separate TypeScript configuration ensures that worker.ts and its dependencies (like src/queue/workers) are compiled correctly without interfering with the main server’s compilation.
```json
// tsconfig.worker.json
{
  "extends": "./tsconfig.json",
  "compilerOptions": {
    "outDir": "./dist",
    "rootDir": "./",
    "composite": true,
    "tsBuildInfoFile": "./dist/.tsbuildinfo-worker"
  },
  "include": [
    "worker.ts",
    "src/config/**/*.ts",
    "src/queue/**/*.ts"
  ],
  "exclude": [
    "node_modules",
    "**/*.spec.ts",
    "**/*.test.ts",
    "src/server.ts",
    "src/app.ts",
    "src/plugins/**/*.ts",
    "src/routes/**/*.ts",
    "src/utils/**/*.ts"
  ]
}
```
Explanation:
- `extends: "./tsconfig.json"`: Inherits common settings from the main `tsconfig.json`.
- `include`: Explicitly lists `worker.ts` and its direct dependencies in `src/config` and `src/queue`.
- `exclude`: Prevents compilation of server-specific files, keeping the worker build lean.
- `composite: true`: Enables project references, which is good practice for larger monorepos or when separating build concerns.
- `tsBuildInfoFile`: Generates a separate build info file for incremental compilation.
c) Testing This Component
To test our background job system, you need three main components running: Redis, the Fastify API server, and the background worker.
1. Start Docker Compose services (including Redis):

   ```bash
   docker-compose up -d --build
   ```

   Verify Redis is running: `docker-compose ps` should show `myapp_redis` as `Up`.

2. Start the Fastify API server:

   ```bash
   npm run dev   # Or `npm start` for a production build
   ```

   This starts your API server, which acts as the job producer.

3. Start the background worker. In a separate terminal, run:

   ```bash
   npm run start:worker   # Or `npm run watch:worker` for dev
   ```

   You should see logs indicating the worker has started and is listening to the queue.

4. Send a request to the API to dispatch a job. Use `curl` or a tool like Postman/Insomnia. Ensure you provide a valid JWT token if `verifyJWT` is enabled.

   ```bash
   curl -X POST http://localhost:PORT/api/jobs/process-image \
     -H "Content-Type: application/json" \
     -H "Authorization: Bearer YOUR_JWT_TOKEN" \
     -d '{
       "imageUrl": "https://picsum.photos/200/300",
       "userId": "testuser-123"
     }'
   ```

   Replace `PORT` with your Fastify server's port (e.g., 3000) and `YOUR_JWT_TOKEN` with a valid token obtained from your authentication endpoint.
Expected Behavior:
- API Server Terminal: You should see logs like `Received request to process image...` and `Successfully added job ... to queue 'image-processing-queue'.`
- API Response: You should receive a `202 Accepted` response with a `jobId`:

  ```json
  { "message": "Image processing job initiated", "jobId": "bullmq-job-id" }
  ```

- Worker Terminal: Shortly after the API responds, you should see logs from the worker: `Job <jobId> is active and being processed.` and `Worker <jobId> started processing image ...`, followed after a few seconds (the simulated delay) by `Worker <jobId> successfully processed image ...` (or `Job <jobId> failed...` if it hit the simulated 10% failure rate).
- If a job fails, you should see it being retried according to your `attempts` configuration.
This confirms that your API successfully dispatches jobs and your worker successfully consumes and processes them asynchronously.
Production Considerations
Deploying a system with background jobs requires careful thought to ensure reliability, scalability, and maintainability.
Error Handling and Retries:
- Worker Resilience: The `attempts` and `backoff` options in BullMQ are critical; configure them wisely. For transient errors (e.g., network issues), retries are good. For persistent errors (e.g., invalid data), too many retries are wasteful.
- Dead-Letter Queues (DLQ): While BullMQ doesn't have a built-in DLQ concept like some message brokers, you can implement one. Jobs that exhaust all retries are marked as `failed`. You can then have a separate process or BullMQ listener that monitors failed jobs and moves them to a "dead-letter" queue for manual inspection or specific handling.
- Idempotency: Ensure your worker jobs are idempotent. If a job is retried, executing it multiple times with the same input should produce the same result and not cause unintended side effects (e.g., sending the same email twice, double-charging a user).
Performance Optimization and Scalability:
- Horizontal Scaling: Both the API (producers) and workers (consumers) can be scaled horizontally. Run multiple instances of your Fastify API and multiple instances of your worker process. Redis acts as the central coordination point.
- Concurrency: The `concurrency` option in the `Worker` constructor controls how many jobs a single worker instance can process simultaneously. Tune this based on your worker's resource usage (CPU, memory, I/O).
- Batching: For certain types of tasks (e.g., sending many notifications), it might be more efficient to batch them into a single job rather than creating many small jobs.
- Monitoring Queue Length: Keep an eye on the number of pending jobs in your queue. A consistently growing queue indicates that your workers are not keeping up with the job production rate, suggesting you need more worker instances or better optimized job processing.
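In BullMQ you can sample pending-job counts with the queue's `getJobCounts` method; the alerting decision itself is plain logic. Here is a hedged sketch of one possible "backlog is consistently growing" check operating on sampled waiting-counts (the threshold policy is an assumption, not anything BullMQ prescribes):

```typescript
// Sketch: deciding when a sampled queue backlog is "consistently growing".
// In the real system the samples would come from periodic calls like
// queue.getJobCounts('waiting'); here we just operate on the numbers.
function backlogIsGrowing(samples: number[], minSamples = 3): boolean {
  if (samples.length < minSamples) return false;
  const recent = samples.slice(-minSamples);
  // Growing means every consecutive sample is strictly larger than the last.
  return recent.every((value, i) => i === 0 || value > recent[i - 1]);
}

console.log(backlogIsGrowing([10, 12, 15])); // true: add workers or optimize jobs
console.log(backlogIsGrowing([10, 8, 9]));   // false: workers are keeping up
```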
Security Considerations:
- Redis Security:
  - Authentication: Always protect your Redis instance with a password (`requirepass` in `redis.conf`). Add a `REDIS_PASSWORD` environment variable and pass it in your `ioredis` options.
  - Network Access: Restrict network access to Redis to only your application servers and workers. Do not expose Redis to the public internet; use VPC security groups or firewalls.
  - TLS/SSL: For sensitive data, use TLS/SSL to encrypt communication between your application and Redis.
- Job Payload Validation: Always validate the data (`job.data`) received by your workers, just as you would validate API request bodies. Malicious or malformed data could lead to crashes or security vulnerabilities in your worker.
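For the payload-validation point, a minimal runtime type guard the worker could run before touching `job.data` might look like this. In this project you could equally reuse the TypeBox schema from the route; this hand-rolled guard just shows the principle, and the http(s)-only URL rule is an illustrative assumption:

```typescript
// Minimal runtime guard for the worker's job payload.
interface ImageJobData {
  imageUrl: string;
  userId: string;
}

function isImageJobData(data: unknown): data is ImageJobData {
  if (typeof data !== 'object' || data === null) return false;
  const d = data as Record<string, unknown>;
  return (
    typeof d.imageUrl === 'string' &&
    /^https?:\/\//.test(d.imageUrl) && // reject file://, data:, etc.
    typeof d.userId === 'string' &&
    d.userId.length > 0
  );
}

console.log(isImageJobData({ imageUrl: 'https://example.com/a.jpg', userId: 'u1' })); // true
console.log(isImageJobData({ imageUrl: 'file:///etc/passwd', userId: 'u1' }));        // false
```

Inside the worker's processor you would check `isImageJobData(job.data)` first and fail the job immediately (without retries, since the data will never become valid) when it returns false.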
Logging and Monitoring:
- Centralized Logging: Ensure both your API server and worker processes send logs to a centralized logging system (e.g., ELK stack, Datadog, CloudWatch Logs). This is crucial for debugging and understanding the system’s behavior.
- Metrics: Monitor key metrics:
- Queue size: Number of pending, active, completed, failed jobs.
- Job processing time: Average and percentile processing times.
- Worker health: CPU, memory usage of worker processes.
- Redis metrics: Latency, memory usage, hit/miss ratio.
- Bull Board (Optional): The third-party `@bull-board` packages (`@bull-board/api` plus a server adapter such as `@bull-board/fastify`) provide a UI that can be a valuable tool for visualizing queues, jobs, and worker status in real time during development and production. Consider integrating it for better observability.
Code Review Checkpoint
At this point, you have successfully implemented a robust background job processing system using BullMQ and Redis.
New Files Created:
- `src/config/redis.ts`: Centralized Redis connection configuration.
- `src/queue/producers/imageProcessor.ts`: Defines the `image-processing-queue` and the `addImageProcessingJob` function.
- `src/plugins/queue.ts`: Fastify plugin to decorate the Fastify instance with queue producers.
- `src/routes/jobRoutes.ts`: API route to trigger image processing jobs.
- `src/queue/workers/imageProcessorWorker.ts`: The worker logic for processing image jobs.
- `worker.ts`: The main entry point for running the background worker.
- `tsconfig.worker.json`: TypeScript configuration specifically for the worker.
Files Modified:
- `docker-compose.yml`: Added a `redis` service and `redis-data` volume.
- `.env`: Added `REDIS_HOST`, `REDIS_PORT`, `REDIS_DB`.
- `src/app.ts`: Registered the `queuePlugin`.
- `package.json`: Added `start:worker` and `watch:worker` scripts.
This setup provides a clear separation of concerns: your API remains lean and responsive, focusing on receiving requests and dispatching tasks, while dedicated workers handle the heavy lifting asynchronously. This architecture significantly improves the scalability and resilience of your application, making it production-ready for handling various long-running operations.
Common Issues & Solutions
Issue: Redis connection errors (e.g., `connect ECONNREFUSED`)
- Cause: The Node.js application (API or worker) cannot connect to the Redis server. This could be due to Redis not running, an incorrect host/port in `.env`, or firewall issues.
- Solution:
  - Ensure Redis is running: `docker-compose ps` (if using Docker) or check your Redis server status.
  - Verify `REDIS_HOST` and `REDIS_PORT` in your `.env` file are correct. Remember that if your Node.js app is running inside Docker, `REDIS_HOST` should be the service name (`redis`), not `localhost`. If your Node.js app is running locally outside Docker, `REDIS_HOST` should be `localhost`.
  - Check the Docker logs for the Redis container (`docker-compose logs redis`) for any startup errors.
  - Ensure no firewall is blocking port `6379`.
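The Docker-versus-local host rule is the most common tripwire here. As a minimal illustration (this helper is hypothetical, not part of the chapter's code), the decision reduces to:

```typescript
// Hypothetical helper showing the host-resolution rule: an explicit REDIS_HOST
// wins; otherwise use the Compose service name inside Docker, localhost outside.
function resolveRedisHost(appRunsInsideDocker: boolean, envHost?: string): string {
  if (envHost) return envHost;
  return appRunsInsideDocker ? "redis" : "localhost";
}

console.log(resolveRedisHost(true));              // → redis
console.log(resolveRedisHost(false));             // → localhost
console.log(resolveRedisHost(false, "10.0.0.5")); // → 10.0.0.5
```

If both the API and the worker load `REDIS_HOST` from the same `.env`, remember that a value valid for one environment (inside Docker) may be wrong for the other (local shell).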
Issue: Jobs are added to the queue, but the worker doesn’t process them.
- Cause: The worker process is not running, it’s connected to a different Redis instance/database, or the queue name doesn’t match.
- Solution:
  - Verify the worker process is running in a separate terminal: `npm run start:worker`. Check its logs for any startup errors or connection issues.
  - Double-check that `IMAGE_PROCESSING_QUEUE_NAME` is identical in both `src/queue/producers/imageProcessor.ts` and `src/queue/workers/imageProcessorWorker.ts`.
  - Ensure both the producer (API) and consumer (worker) are configured to connect to the same Redis instance and database (check `REDIS_HOST`, `REDIS_PORT`, `REDIS_DB` in `.env`).
  - Check for any `BullMQ` error events on the worker (e.g., `worker.on('error', ...)`) that might indicate underlying issues.
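A simple way to eliminate the name-mismatch cause entirely is to define the queue name once in a shared module and import it from both the producer and the worker. A sketch (the shared module path is a suggestion, not part of the chapter's file list):

```typescript
// e.g., src/queue/queueNames.ts -- the single source of truth for queue names.
export const IMAGE_PROCESSING_QUEUE_NAME = "image-processing-queue";

// Both sides would then import the same constant instead of re-typing the string:
//   producer: new Queue(IMAGE_PROCESSING_QUEUE_NAME, { connection });
//   worker:   new Worker(IMAGE_PROCESSING_QUEUE_NAME, processor, { connection });
```

With a shared constant, a typo becomes a compile-time import error rather than a silent queue that nobody consumes.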
Issue: Jobs are failing, but no retries are happening, or jobs are stuck.
- Cause: Incorrect `defaultJobOptions` or `job.options` for retries, or a worker process is crashing without proper error handling.
- Solution:
  - Review `defaultJobOptions` in `imageProcessingQueue` (`src/queue/producers/imageProcessor.ts`). Ensure `attempts` is set to a value greater than 1 and `backoff` is configured.
  - Make sure your worker’s job processing function (`async (job: Job) => { ... }`) correctly throws an error when a failure occurs. If it catches an error and doesn’t re-throw it, `BullMQ` won’t know the job failed and won’t retry.
  - Check worker logs for `uncaughtException` or `unhandledRejection` errors, which could indicate the worker is crashing unexpectedly. Implement robust error handling in `worker.ts` as shown.
  - If jobs are stuck in an `active` state indefinitely, the worker might have crashed mid-processing without marking the job as failed or completed. `BullMQ` has a “stalled” job detection mechanism, but it relies on worker health checks. Ensure Redis is stable and worker processes have enough resources.
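To make the retry options concrete, here is the shape of `defaultJobOptions` the checklist refers to (the values are examples), together with a small helper, not part of BullMQ, that computes the wait BullMQ's documented exponential strategy applies before each retry:

```typescript
// Example retry configuration: 1 initial try + 2 retries, exponential backoff.
const defaultJobOptions = {
  attempts: 3,
  backoff: { type: "exponential" as const, delay: 1000 }, // base delay in ms
};

// Hypothetical helper mirroring BullMQ's documented exponential backoff:
// wait = delay * 2^(attemptsMade - 1) before retry number `attemptsMade`.
function retryDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(baseDelayMs * 2 ** (attemptsMade - 1));
  }
  return delays;
}

console.log(retryDelays(defaultJobOptions.attempts, defaultJobOptions.backoff.delay));
// → [ 1000, 2000 ]
```

So with `attempts: 3` and a 1000 ms base delay, a failing job is retried after roughly 1 s and then 2 s before being moved to the failed set for good.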
Testing & Verification
To ensure everything is correctly implemented and working as expected, perform the following end-to-end test:
Clean Up Previous Runs (Optional but Recommended): Stop all Docker containers and remove volumes to start fresh:

```bash
docker-compose down -v
```

Start All Services (each command in its own terminal):

```bash
docker-compose up -d --build
npm run dev   # Or `npm start`
npm run start:worker
```

Ensure all three terminals (Docker, API, Worker) are actively logging.
Trigger Multiple Jobs: Send several `POST /api/jobs/process-image` requests (e.g., 5-10 requests, using different `userId` values to distinguish them in logs):

```bash
curl -X POST http://localhost:PORT/api/jobs/process-image \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer YOUR_JWT_TOKEN" \
  -d '{"imageUrl": "https://picsum.photos/200/300", "userId": "user-A"}'
# Repeat with user-B, user-C, etc.
```

Verify Job Processing:
- API Logs: Confirm that each request results in a `Successfully added job ... to queue` entry.
- Worker Logs: Observe the worker terminal. You should see jobs being picked up (`Job ... is active`), processed (`Worker ... started processing`), and eventually completed (`Job ... completed successfully`) or failed (`Job ... failed with error`).
- Retries: If you hit the simulated 10% failure rate, you should see jobs failing and then being retried automatically by the worker, eventually succeeding or exhausting their `attempts`.
- Concurrency: If you send many jobs quickly, you should see the worker processing multiple jobs concurrently (up to the `concurrency` limit you set, e.g., 5).
Graceful Shutdown Test:
- In the API server terminal, press `Ctrl+C`. You should see `Closing BullMQ image processing queue...` logs.
- In the worker terminal, press `Ctrl+C`. You should see `Shutting down worker gracefully...` and `Image processing worker stopped gracefully.` logs.
- This confirms that your application and worker can shut down cleanly without leaving open connections or orphaned processes.
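Those log lines come from shutdown handlers like the ones built earlier in the chapter: close every resource that holds a Redis connection, then exit. A simplified sketch of the pattern (the `Closable` interface is a stand-in for BullMQ's `Worker` and `Queue`, both of which expose an async `close()`):

```typescript
// Minimal sketch of graceful shutdown: drain and close each resource in turn,
// collecting the log lines the verification step above looks for.
interface Closable {
  name: string;
  close(): Promise<void>;
}

async function shutdownGracefully(resources: Closable[]): Promise<string[]> {
  const log: string[] = ["Shutting down worker gracefully..."];
  for (const resource of resources) {
    await resource.close(); // waits for in-flight work before disconnecting
    log.push(`${resource.name} stopped gracefully.`);
  }
  return log;
}

// Stand-in resource; a real worker.ts would pass its BullMQ Worker instance,
// whose close() finishes active jobs before releasing the Redis connection.
const imageWorker: Closable = {
  name: "Image processing worker",
  close: async () => { /* worker.close() would drain active jobs here */ },
};

// Wire it to Ctrl+C (SIGINT), mirroring the behavior verified above:
process.on("SIGINT", async () => {
  (await shutdownGracefully([imageWorker])).forEach((line) => console.log(line));
  process.exit(0);
});
```

The key design point is ordering: the worker stops accepting new jobs and finishes active ones before the process exits, so no job is left stuck in the `active` state.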
This comprehensive testing approach helps ensure that your background job system is robust, handles failures, and scales correctly.
Summary & Next Steps
Congratulations! In this chapter, you’ve successfully implemented a crucial architectural pattern for modern web applications: handling long-running tasks with background jobs using BullMQ and Redis. You learned how to:
- Integrate Redis into your Docker Compose setup.
- Configure `BullMQ` for robust queue management.
- Create job producers within your Fastify API to dispatch tasks asynchronously.
- Develop separate worker processes to consume and execute these tasks.
- Implement production-ready practices like graceful shutdowns, error handling, retries, and comprehensive logging.
By offloading compute-intensive or time-consuming operations to background workers, your Fastify API remains responsive, providing a better user experience and significantly improving the scalability and resilience of your entire application. This decoupling is a cornerstone of building microservices and distributed systems.
In the next chapter, Chapter 9: Implementing Caching for Performance, we will explore another critical performance optimization technique: caching. We’ll learn how to use Redis as a cache store to reduce database load and speed up frequently accessed data, further enhancing your application’s responsiveness and efficiency.