Welcome to Chapter 8! In modern web applications, not all tasks can or should be handled synchronously within the main request-response cycle. Operations like sending emails, processing large image files, generating complex reports, or integrating with third-party APIs can be time-consuming. If these tasks block the main thread, they can lead to slow response times, poor user experience, and even timeouts, especially under heavy load. This is where background jobs and message queues become indispensable.

In this chapter, we will introduce the concept of asynchronous task processing using job queues. We’ll integrate BullMQ, a robust and performant queue system for Node.js, backed by Redis as our message broker. By the end of this chapter, you will have a clear understanding of how to offload long-running operations from your Fastify API, improving its responsiveness and scalability. We’ll build a simple example where an API endpoint dispatches a “job” to a queue, and a separate worker process picks up and executes that job asynchronously, simulating a resource-intensive task.

This chapter builds upon the existing Fastify application from previous chapters. You should have a working Fastify server, a PostgreSQL database, and a Docker Compose setup. We will extend our Docker Compose configuration to include a Redis service and then integrate BullMQ into our Fastify application and create a dedicated worker process. The expected outcome is a decoupled system where your API remains fast and responsive, while heavy tasks are handled efficiently in the background, making your application more resilient and scalable.

Planning & Design

To effectively handle background jobs, we need to design a system that separates the concerns of job creation (producer) and job execution (consumer/worker). Our Fastify application will act as the job producer, adding tasks to a Redis-backed queue. A separate Node.js process will function as the worker, continuously listening to the queue and processing jobs as they arrive.

Component Architecture

The following diagram illustrates the flow of a background job in our system:

flowchart LR Client[Client Application] --->|HTTP Request| FastifyAPI[Fastify API Server] FastifyAPI --->|Add Job to Queue| RedisQueue[Redis Queue] RedisQueue --->|Job Available| Worker[Background Worker] Worker --->|Process Job| ExternalService[External Service] ExternalService --->|Result Status| RedisQueue Worker --> Optional --->|Complete or Fail| FastifyAPI

Explanation:

  • Client Application: Initiates a request to the Fastify API.
  • Fastify API Server (Producer): Receives the request. If the request involves a long-running task, it quickly creates a job with necessary data and adds it to the Redis Queue. It then responds to the client immediately, without waiting for the job to complete.
  • Redis Queue: Acts as a central message broker, storing jobs in a reliable queue. BullMQ uses Redis to manage queues, jobs, and workers.
  • Background Worker (Consumer): A separate Node.js process that continuously monitors the Redis Queue. When a job appears, the worker retrieves it, processes it, and handles its completion or failure.
  • External Service / DB Update: Represents the actual heavy lifting (e.g., image processing, database operations, sending emails).

API Endpoints Design

For this chapter, we’ll introduce a new API endpoint to trigger a background job:

  • POST /api/jobs/process-image
    • Description: Simulates the initiation of an image processing task. It will accept an imageUrl and userId in the request body, which will be passed to the background job.
    • Request Body:
      {
          "imageUrl": "https://example.com/image.jpg",
          "userId": "user123"
      }
      
    • Response:
      {
          "message": "Image processing job initiated",
          "jobId": "unique-job-id"
      }
      

File Structure

We’ll organize our queue-related code within a new src/queue directory.

src/
├── config/
│   └── redis.ts         # Redis connection configuration
├── plugins/
│   └── queue.ts         # Fastify plugin for queue producers
├── queue/
│   ├── producers/
│   │   └── imageProcessor.ts # Defines the image processing queue and producer
│   └── workers/
│       └── imageProcessorWorker.ts # Defines the worker that processes image jobs
├── routes/
│   └── jobRoutes.ts     # API routes to trigger jobs
├── app.ts               # Main Fastify application
├── server.ts            # Entry point for the Fastify server
└── worker.ts            # Entry point for the background worker

Step-by-Step Implementation

a) Setup/Configuration

First, we need to add Redis to our Docker Compose setup and install the necessary Node.js packages.

1. Update docker-compose.yml for Redis:

Add a redis service to your docker-compose.yml. We’ll expose port 6379.

# docker-compose.yml
version: '3.8'

services:
  # ... other services (e.g., postgres, api)

  redis:
    image: redis:7-alpine
    container_name: myapp_redis
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 5s
      retries: 5
    networks:
      - app-network

volumes:
  # ... other volumes
  redis-data:

networks:
  app-network:
    driver: bridge

Explanation:

  • We use the redis:7-alpine image for a lightweight Redis instance.
  • container_name makes it easy to reference.
  • Port 6379 is mapped for local access.
  • --appendonly yes enables AOF persistence for data durability.
  • volumes mounts a named volume to persist Redis data.
  • healthcheck ensures Redis is ready before other services depend on it.
  • app-network allows services to communicate internally by name (e.g., redis).

2. Install Dependencies:

We need ioredis for connecting to Redis and bullmq for queue management.

npm install ioredis bullmq
npm install --save-dev @types/ioredis

3. Add Environment Variables for Redis:

Update your .env file with Redis connection details.

# .env
# ... existing variables

REDIS_HOST=redis
REDIS_PORT=6379
REDIS_DB=0

Explanation:

  • REDIS_HOST is redis because that’s the service name in docker-compose.yml, allowing our Node.js app inside Docker to resolve it. If running Node.js locally outside Docker, this would be localhost.
  • REDIS_PORT is the default Redis port.
  • REDIS_DB specifies the Redis database to use.

4. Create Redis Configuration (src/config/redis.ts):

This file will provide a centralized Redis client instance for BullMQ.

// src/config/redis.ts
import { Redis } from 'ioredis';
import { env } from './env';
import { logger } from './logger';

/**
 * Creates and returns a Redis client instance.
 * BullMQ requires a dedicated Redis connection for the queue and another for the worker.
 * We'll export a connection options object rather than a direct client instance,
 * as BullMQ prefers to manage its own connections.
 *
 * @returns {RedisOptions} Redis connection options.
 */
export const getRedisConnectionOptions = () => {
  const redisOptions = {
    host: env.REDIS_HOST,
    port: env.REDIS_PORT,
    db: env.REDIS_DB,
    maxRetriesPerRequest: null, // Important for ioredis in BullMQ context
    enableReadyCheck: false, // Prevents ioredis from blocking if Redis is not immediately available
  };

  logger.info(`Connecting to Redis at ${redisOptions.host}:${redisOptions.port}/${redisOptions.db}`);
  return redisOptions;
};

/**
 * Creates a new ioredis client instance.
 * Useful for direct Redis operations outside of BullMQ's managed connections.
 * @returns {Redis} A new Redis client instance.
 */
export const createRedisClient = (): Redis => {
  const options = getRedisConnectionOptions();
  const client = new Redis(options);

  client.on('error', (err) => {
    logger.error(`Redis client error: ${err.message}`, { error: err });
  });

  client.on('connect', () => {
    logger.info('Redis client connected successfully.');
  });

  client.on('ready', () => {
    logger.info('Redis client is ready.');
  });

  client.on('end', () => {
    logger.warn('Redis client disconnected.');
  });

  return client;
};

Explanation:

  • getRedisConnectionOptions: Provides a configuration object suitable for BullMQ. maxRetriesPerRequest: null and enableReadyCheck: false are common recommendations for ioredis when used with BullMQ to prevent connection issues.
  • createRedisClient: A utility function to create a direct ioredis client, which can be useful for other Redis operations, but BullMQ will create its own connections using the options we provide.
  • Includes robust logging for connection status and errors, which is crucial for production environments.

b) Core Implementation

Now we’ll implement the producer (Fastify API) and the consumer (worker).

1. Define the Queue Producer (src/queue/producers/imageProcessor.ts):

This file will define our queue and a function to add jobs to it.

// src/queue/producers/imageProcessor.ts
import { Queue, Job } from 'bullmq';
import { getRedisConnectionOptions } from '../../config/redis';
import { logger } from '../../config/logger';

// Define the name of our queue
export const IMAGE_PROCESSING_QUEUE_NAME = 'image-processing-queue';

// Create a new BullMQ Queue instance
// This queue will be used by the API server (producer) to add jobs.
export const imageProcessingQueue = new Queue(IMAGE_PROCESSING_QUEUE_NAME, {
  connection: getRedisConnectionOptions(),
  defaultJobOptions: {
    attempts: 3, // Retry failed jobs up to 3 times
    backoff: {
      type: 'exponential', // Exponential backoff for retries
      delay: 1000, // Initial delay of 1 second
    },
    removeOnComplete: {
      count: 1000, // Keep last 1000 completed jobs
    },
    removeOnFail: {
      count: 5000, // Keep last 5000 failed jobs
    },
  },
});

// Log queue events for monitoring
imageProcessingQueue.on('error', (err) => {
  logger.error(`Image processing queue error: ${err.message}`, { error: err });
});

imageProcessingQueue.on('added', (jobId, name) => {
  logger.info(`Job ${jobId} (name: ${name}) added to ${IMAGE_PROCESSING_QUEUE_NAME}`);
});

/**
 * Adds an image processing job to the queue.
 * @param {string} imageUrl - The URL of the image to process.
 * @param {string} userId - The ID of the user initiating the job.
 * @returns {Promise<Job>} The BullMQ Job instance.
 */
export async function addImageProcessingJob(imageUrl: string, userId: string): Promise<Job> {
  const jobName = `process-image-${userId}-${Date.now()}`;
  logger.info(`Attempting to add job '${jobName}' for user '${userId}' with image '${imageUrl}' to queue.`);

  try {
    const job = await imageProcessingQueue.add(jobName, { imageUrl, userId }, {
      // Specific job options can override defaultJobOptions
      delay: 0, // Process immediately
      priority: 5, // Lower number means higher priority
    });
    logger.info(`Successfully added job ${job.id} to queue '${IMAGE_PROCESSING_QUEUE_NAME}'.`);
    return job;
  } catch (error) {
    logger.error(`Failed to add image processing job: ${error.message}`, { error, imageUrl, userId });
    throw new Error('Failed to add image processing job to queue.');
  }
}

Explanation:

  • IMAGE_PROCESSING_QUEUE_NAME: A constant to ensure consistency across producer and worker.
  • imageProcessingQueue: The BullMQ Queue instance. It connects to Redis using our getRedisConnectionOptions.
  • defaultJobOptions: Crucial for production.
    • attempts: How many times a failed job should be retried.
    • backoff: Strategy for retrying (e.g., exponential delay).
    • removeOnComplete, removeOnFail: Important for managing the queue’s memory footprint by automatically removing old job records.
  • Event listeners: Log errors and job additions for monitoring.
  • addImageProcessingJob: An async function that adds a job to the queue. It takes imageUrl and userId as data. Includes robust error handling and logging.

2. Create a Fastify Plugin for Queue Producers (src/plugins/queue.ts):

To make our queue producer easily accessible throughout our Fastify application, we’ll register it as a plugin.

// src/plugins/queue.ts
import fp from 'fastify-plugin';
import { FastifyInstance } from 'fastify';
import { imageProcessingQueue, addImageProcessingJob } from '../queue/producers/imageProcessor';
import { logger } from '../config/logger';

declare module 'fastify' {
  interface FastifyInstance {
    queues: {
      imageProcessing: {
        addJob: typeof addImageProcessingJob;
        queueInstance: typeof imageProcessingQueue;
      };
      // Add other queues here as your application grows
    };
  }
}

/**
 * Fastify plugin to register BullMQ queues and their producers.
 * This makes queue functionalities available via `fastify.queues`.
 */
export default fp(async (fastify: FastifyInstance) => {
  fastify.decorate('queues', {
    imageProcessing: {
      addJob: addImageProcessingJob,
      queueInstance: imageProcessingQueue,
    },
  });

  // Ensure queues are gracefully closed on application shutdown
  fastify.addHook('onClose', async () => {
    logger.info('Closing BullMQ image processing queue...');
    try {
      await imageProcessingQueue.close();
      logger.info('BullMQ image processing queue closed.');
    } catch (error) {
      logger.error(`Error closing image processing queue: ${error.message}`, { error });
    }
  });

  logger.info('BullMQ queue plugin loaded.');
}, {
  name: 'queue-plugin',
  dependencies: [], // No specific plugin dependencies for now
});

Explanation:

  • fp from 'fastify-plugin': Ensures our plugin is properly encapsulated and doesn’t interfere with other plugins.
  • declare module 'fastify': Extends the FastifyInstance interface to add a queues property, providing type safety and auto-completion for our queue producers.
  • fastify.decorate('queues', ...): Attaches our imageProcessingQueue and addImageProcessingJob function to the Fastify instance, making them accessible via fastify.queues.imageProcessing.addJob.
  • fastify.addHook('onClose', ...): Crucial for graceful shutdown. When the Fastify server stops, we ensure the BullMQ queue connection is properly closed to prevent resource leaks.

3. Integrate the Queue Plugin into src/app.ts:

Register the newly created plugin in your main Fastify application.

// src/app.ts
import { FastifyInstance, FastifyPluginAsync } from 'fastify';
import AutoLoad from '@fastify/autoload';
import { join } from 'path';
import { env } from './config/env';
import { logger } from './config/logger';
import appConfig from './config/app';
// ... other imports

// Import the new queue plugin
import queuePlugin from './plugins/queue';

const app: FastifyPluginAsync = async (fastify: FastifyInstance) => {
  // ... existing plugin registrations

  // Register the queue plugin
  // Make sure it's registered before any routes that might use it
  fastify.register(queuePlugin);
  logger.info('Queue plugin registered.');

  // This loads all plugins defined in your `plugins` directory
  // Pass an options object with `cwd` to ensure correct path resolution
  fastify.register(AutoLoad, {
    dir: join(__dirname, 'plugins'),
    options: Object.assign({}, appConfig),
    ignoreFilter: (path: string) => path === join(__dirname, 'plugins', 'queue.ts'), // Ignore if already registered
  });

  // This loads all plugins defined in your `routes` directory
  // Pass an options object with `cwd` to ensure correct path resolution
  fastify.register(AutoLoad, {
    dir: join(__dirname, 'routes'),
    options: Object.assign({}, appConfig),
  });

  // ... existing error handling, root route, etc.
};

export default app;

Explanation:

  • We explicitly import and register queuePlugin before AutoLoad for other plugins and routes. This ensures that fastify.queues is available when routes that depend on it are loaded.
  • Added ignoreFilter to AutoLoad for the plugins directory to prevent double-registration if queuePlugin is also in src/plugins.

4. Create API Route to Dispatch Job (src/routes/jobRoutes.ts):

This route will receive requests and use our addImageProcessingJob function to add tasks to the queue.

// src/routes/jobRoutes.ts
import { FastifyInstance, FastifyPluginAsync } from 'fastify';
import { Static, Type } from '@sinclair/typebox';
import { logger } from '../config/logger';

// Define the schema for the request body using TypeBox
const ImageProcessingJobBodySchema = Type.Object({
  imageUrl: Type.String({ format: 'uri', description: 'URL of the image to process' }),
  userId: Type.String({ minLength: 1, description: 'ID of the user initiating the job' }),
});

type ImageProcessingJobBody = Static<typeof ImageProcessingJobBodySchema>;

const jobRoutes: FastifyPluginAsync = async (fastify: FastifyInstance) => {
  fastify.post<{ Body: ImageProcessingJobBody }>(
    '/process-image',
    {
      schema: {
        body: ImageProcessingJobBodySchema,
        response: {
          202: Type.Object({
            message: Type.String(),
            jobId: Type.String(),
          }),
        },
      },
      preHandler: fastify.auth([fastify.verifyJWT]), // Assuming authentication is required
    },
    async (request, reply) => {
      const { imageUrl, userId } = request.body;
      logger.info(`Received request to process image for user ${userId}: ${imageUrl}`);

      try {
        // Use the decorated queue producer to add a job
        const job = await fastify.queues.imageProcessing.addJob(imageUrl, userId);

        reply.status(202).send({
          message: 'Image processing job initiated',
          jobId: job.id!, // job.id is guaranteed to be present after successful addition
        });
      } catch (error) {
        logger.error(`Error initiating image processing job: ${error.message}`, { error, imageUrl, userId });
        // Centralized error handler will catch this, but we throw for clarity
        throw fastify.httpErrors.internalServerError('Failed to initiate image processing job.');
      }
    }
  );
};

export default jobRoutes;

Explanation:

  • Uses TypeBox for request body validation, ensuring imageUrl is a valid URI and userId is present. This is a critical security and data integrity best practice.
  • preHandler: fastify.auth([fastify.verifyJWT]): Assumes you have an authentication plugin (verifyJWT) from previous chapters to secure this endpoint.
  • fastify.queues.imageProcessing.addJob(imageUrl, userId): This is how we access our decorated queue producer.
  • Responds with 202 Accepted immediately, indicating that the request has been accepted for processing but not yet completed. This is the correct HTTP status for asynchronous operations.
  • Includes comprehensive logging for request reception and error handling.

5. Create the Queue Worker (src/queue/workers/imageProcessorWorker.ts):

This separate Node.js script will run independently and consume jobs from the queue.

// src/queue/workers/imageProcessorWorker.ts
import { Worker, Job } from 'bullmq';
import { getRedisConnectionOptions } from '../../config/redis';
import { logger } from '../../config/logger';
import { IMAGE_PROCESSING_QUEUE_NAME } from '../producers/imageProcessor';

/**
 * Defines the image processing worker.
 * This worker will listen to the 'image-processing-queue' and execute jobs.
 */
export const imageProcessingWorker = new Worker(
  IMAGE_PROCESSING_QUEUE_NAME,
  async (job: Job) => {
    const { imageUrl, userId } = job.data;
    logger.info(`Worker ${job.id} started processing image '${imageUrl}' for user '${userId}'.`);

    // Simulate a long-running task (e.g., image resizing, manipulation, API call)
    try {
      // Example: Download image, resize, upload to S3, update DB
      await new Promise(resolve => setTimeout(resolve, Math.random() * 5000 + 1000)); // Simulate 1-6 second task

      // Simulate a random failure for demonstration purposes
      if (Math.random() < 0.1) { // 10% chance of failure
        throw new Error('Simulated image processing failure.');
      }

      logger.info(`Worker ${job.id} successfully processed image '${imageUrl}' for user '${userId}'.`);
      // You might update a database here, send a notification, etc.
      // job.updateProgress(100); // Update job progress if needed
      return { status: 'completed', processedAt: new Date(), originalUrl: imageUrl };
    } catch (error) {
      logger.error(`Worker ${job.id} failed to process image '${imageUrl}' for user '${userId}': ${error.message}`, { error, jobData: job.data });
      // BullMQ will automatically retry the job based on 'attempts' option
      throw error; // Re-throw to mark job as failed for BullMQ to handle retries
    }
  },
  {
    connection: getRedisConnectionOptions(),
    concurrency: 5, // Process up to 5 jobs concurrently
  }
);

// Log worker events for monitoring and debugging
imageProcessingWorker.on('completed', (job) => {
  logger.info(`Job ${job.id} completed successfully.`);
});

imageProcessingWorker.on('failed', (job, err) => {
  logger.error(`Job ${job?.id} failed with error: ${err.message}`, { error: err, jobData: job?.data });
});

imageProcessingWorker.on('active', (job) => {
  logger.info(`Job ${job.id} is active and being processed.`);
});

imageProcessingWorker.on('stalled', (jobId) => {
  logger.warn(`Job ${jobId} has stalled.`);
});

imageProcessingWorker.on('error', (err) => {
  logger.error(`Worker error: ${err.message}`, { error: err });
});

logger.info(`Image processing worker initialized, listening on queue '${IMAGE_PROCESSING_QUEUE_NAME}'.`);

// Export a function to start and stop the worker
export const startImageProcessingWorker = () => {
  logger.info('Starting image processing worker...');
  // The worker starts listening to jobs as soon as it's instantiated
  // No explicit 'start' method needed for BullMQ Worker
};

export const stopImageProcessingWorker = async () => {
  logger.info('Stopping image processing worker...');
  try {
    await imageProcessingWorker.close();
    logger.info('Image processing worker stopped gracefully.');
  } catch (error) {
    logger.error(`Error stopping image processing worker: ${error.message}`, { error });
  }
};

Explanation:

  • Worker(IMAGE_PROCESSING_QUEUE_NAME, async (job) => { ... }, { connection, concurrency }): The core of our worker.
    • The first argument is the queue name, matching the producer.
    • The second argument is an async function that defines the job processing logic. It receives the job object, which contains job.data (our imageUrl and userId).
    • The third argument is options, including connection (same Redis options) and concurrency (how many jobs the worker can process in parallel).
  • Simulated Task: We use setTimeout to mimic a long-running, asynchronous operation. A random failure is also introduced to demonstrate BullMQ’s retry mechanism.
  • Error Handling: If the job processing logic throws an error, BullMQ will catch it and manage retries based on the attempts and backoff options defined in the queue producer.
  • Event Listeners: Crucial for monitoring the worker’s activity, job completions, failures, and stalls. This provides visibility into your background processing.
  • startImageProcessingWorker and stopImageProcessingWorker: Provides methods to manage the worker’s lifecycle, especially useful for graceful shutdowns.

6. Create a separate worker entry point (worker.ts):

This file will be the main entry point for running our background worker.

// worker.ts
import { startImageProcessingWorker, stopImageProcessingWorker } from './src/queue/workers/imageProcessorWorker';
import { logger } from './src/config/logger';

async function main() {
  logger.info('Starting background worker application...');
  startImageProcessingWorker();

  // Handle graceful shutdown
  process.on('SIGTERM', async () => {
    logger.info('SIGTERM signal received. Shutting down worker gracefully...');
    await stopImageProcessingWorker();
    process.exit(0);
  });

  process.on('SIGINT', async () => {
    logger.info('SIGINT signal received. Shutting down worker gracefully...');
    await stopImageProcessingWorker();
    process.exit(0);
  });

  process.on('unhandledRejection', (reason, promise) => {
    logger.error('Unhandled Rejection at:', { promise, reason });
  });

  process.on('uncaughtException', (error) => {
    logger.error('Uncaught Exception:', { error });
    process.exit(1); // Exit process after uncaught exception
  });
}

main().catch((error) => {
  logger.fatal(`Worker application failed to start: ${error.message}`, { error });
  process.exit(1);
});

Explanation:

  • This is a standalone script that initializes and starts the imageProcessingWorker.
  • Graceful Shutdown: It includes SIGTERM and SIGINT (Ctrl+C) handlers to ensure that the worker can shut down cleanly, closing its Redis connections before exiting. This is vital for preventing orphaned jobs or connection leaks in production.
  • Unhandled Errors: Catches unhandledRejection and uncaughtException to log critical errors and prevent the worker from crashing silently.

7. Add npm scripts to run the worker:

Update your package.json to easily start the worker.

// package.json
{
  "name": "your-app-name",
  "version": "1.0.0",
  "description": "",
  "main": "server.ts",
  "scripts": {
    "dev": "npm run build && concurrently \"npm run watch:server\" \"npm run watch:worker\"",
    "build": "rimraf dist && tsc",
    "start": "node dist/server.js",
    "start:worker": "node dist/worker.js",
    "watch:server": "tsc-watch --onSuccess \"node dist/server.js\"",
    "watch:worker": "tsc-watch --project tsconfig.worker.json --onSuccess \"node dist/worker.js\"",
    "test": "jest",
    "lint": "eslint . --ext .ts",
    "lint:fix": "eslint . --ext .ts --fix"
  },
  // ... rest of package.json
}

Explanation:

  • start:worker: A new script to run the compiled worker.
  • watch:worker: A development script that watches for changes in worker-related files and restarts it. We’ll need a separate tsconfig.worker.json to compile worker.ts and its dependencies.

8. Create tsconfig.worker.json:

This separate TypeScript configuration ensures that worker.ts and its dependencies (like src/queue/workers) are compiled correctly without interfering with the main server’s compilation.

// tsconfig.worker.json
{
  "extends": "./tsconfig.json",
  "compilerOptions": {
    "outDir": "./dist",
    "rootDir": "./",
    "composite": true,
    "tsBuildInfoFile": "./dist/.tsbuildinfo-worker"
  },
  "include": [
    "worker.ts",
    "src/config/**/*.ts",
    "src/queue/**/*.ts"
  ],
  "exclude": [
    "node_modules",
    "**/*.spec.ts",
    "**/*.test.ts",
    "src/server.ts",
    "src/app.ts",
    "src/plugins/**/*.ts",
    "src/routes/**/*.ts",
    "src/utils/**/*.ts"
  ]
}

Explanation:

  • extends: "./tsconfig.json": Inherits common settings from the main tsconfig.json.
  • include: Explicitly lists worker.ts and its direct dependencies in src/config and src/queue.
  • exclude: Prevents compilation of server-specific files, keeping the worker build lean.
  • composite: true: Enables project references, which is good practice for larger monorepos or when separating build concerns.
  • tsBuildInfoFile: Generates a separate build info file for incremental compilation.

c) Testing This Component

To test our background job system, you need three main components running: Redis, the Fastify API server, and the background worker.

  1. Start Docker Compose services (including Redis):

    docker-compose up -d --build
    

    Verify Redis is running: docker-compose ps should show myapp_redis as Up.

  2. Start the Fastify API Server:

    npm run dev # Or `npm start` for production build
    

    This will start your API server, which acts as the job producer.

  3. Start the Background Worker: In a separate terminal, run the worker:

    npm run start:worker # Or `npm run watch:worker` for dev
    

    You should see logs indicating the worker has started and is listening to the queue.

  4. Send a request to the API to dispatch a job: Use curl or a tool like Postman/Insomnia. Ensure you provide a valid JWT token if verifyJWT is enabled.

    curl -X POST http://localhost:PORT/api/jobs/process-image \
         -H "Content-Type: application/json" \
         -H "Authorization: Bearer YOUR_JWT_TOKEN" \
         -d '{
               "imageUrl": "https://picsum.photos/200/300",
               "userId": "testuser-123"
             }'
    

    Replace PORT with your Fastify server’s port (e.g., 3000). Replace YOUR_JWT_TOKEN with a valid token obtained from your authentication endpoint.

Expected Behavior:

  • API Server Terminal: You should see logs like Received request to process image... and Successfully added job ... to queue 'image-processing-queue'.
  • API Response: You should receive a 202 Accepted response with a jobId.
    {
        "message": "Image processing job initiated",
        "jobId": "bullmq-job-id"
    }
    
  • Worker Terminal: Shortly after the API responds, you should see logs from the worker:
    • Job <jobId> is active and being processed.
    • Worker <jobId> started processing image ...
    • After a few seconds (simulated delay): Worker <jobId> successfully processed image ... (or Job <jobId> failed... if it hit the simulated 10% failure rate).
    • If a job fails, you might see it being retried based on your attempts configuration.

This confirms that your API successfully dispatches jobs and your worker successfully consumes and processes them asynchronously.

Production Considerations

Deploying a system with background jobs requires careful thought to ensure reliability, scalability, and maintainability.

  • Error Handling and Retries:

    • Worker Resilience: The attempts and backoff options in BullMQ are critical. Configure them wisely. For transient errors (e.g., network issues), retries are good. For persistent errors (e.g., invalid data), too many retries can be wasteful.
    • Dead-Letter Queues (DLQ): While BullMQ doesn’t have a built-in DLQ concept like some message brokers, you can implement one. Jobs that exhaust all retries are marked as failed. You can then have a separate process or BullMQ listener that monitors failed jobs and moves them to a “dead-letter” queue for manual inspection or specific handling.
    • Idempotency: Ensure your worker jobs are idempotent. If a job is retried, executing it multiple times with the same input should produce the same result and not cause unintended side effects (e.g., sending the same email twice, double-charging a user).
  • Performance Optimization and Scalability:

    • Horizontal Scaling: Both the API (producers) and workers (consumers) can be scaled horizontally. Run multiple instances of your Fastify API and multiple instances of your worker process. Redis acts as the central coordination point.
    • Concurrency: The concurrency option in the Worker constructor controls how many jobs a single worker instance can process simultaneously. Tune this based on your worker’s resource usage (CPU, memory, I/O).
    • Batching: For certain types of tasks (e.g., sending many notifications), it might be more efficient to batch them into a single job rather than creating many small jobs.
    • Monitoring Queue Length: Keep an eye on the number of pending jobs in your queue. A consistently growing queue indicates that your workers are not keeping up with the job production rate, suggesting you need more worker instances or better optimized job processing.
  • Security Considerations:

    • Redis Security:
      • Authentication: Always protect your Redis instance with a password (requirepass in redis.conf). Update your REDIS_PASSWORD environment variable and ioredis options.
      • Network Access: Restrict network access to Redis only from your application servers and workers. Do not expose Redis to the public internet. Use VPC security groups or firewalls.
      • TLS/SSL: For sensitive data, use TLS/SSL to encrypt communication between your application and Redis.
    • Job Payload Validation: Always validate the data (job.data) received by your workers, just as you would validate API request bodies. Malicious or malformed data could lead to crashes or security vulnerabilities in your worker.
  • Logging and Monitoring:

    • Centralized Logging: Ensure both your API server and worker processes send logs to a centralized logging system (e.g., ELK stack, Datadog, CloudWatch Logs). This is crucial for debugging and understanding the system’s behavior.
    • Metrics: Monitor key metrics:
      • Queue size: Number of pending, active, completed, failed jobs.
      • Job processing time: Average and percentile processing times.
      • Worker health: CPU, memory usage of worker processes.
      • Redis metrics: Latency, memory usage, hit/miss ratio.
    • BullMQ UI (Optional): BullMQ provides a UI (@bull-board/api and @bull-board/ui) that can be a valuable tool for visualizing queues, jobs, and worker status in real-time during development and production. Consider integrating it for better observability.

Code Review Checkpoint

At this point, you have successfully implemented a robust background job processing system using BullMQ and Redis.

New Files Created:

  • src/config/redis.ts: Centralized Redis connection configuration.
  • src/queue/producers/imageProcessor.ts: Defines the image-processing-queue and the addImageProcessingJob function.
  • src/plugins/queue.ts: Fastify plugin to decorate the Fastify instance with queue producers.
  • src/routes/jobRoutes.ts: API route to trigger image processing jobs.
  • src/queue/workers/imageProcessorWorker.ts: The worker logic for processing image jobs.
  • worker.ts: The main entry point for running the background worker.
  • tsconfig.worker.json: TypeScript configuration specifically for the worker.

Files Modified:

  • docker-compose.yml: Added a redis service and redis-data volume.
  • .env: Added REDIS_HOST, REDIS_PORT, REDIS_DB.
  • src/app.ts: Registered the queuePlugin.
  • package.json: Added start:worker and watch:worker scripts.

This setup provides a clear separation of concerns: your API remains lean and responsive, focusing on receiving requests and dispatching tasks, while dedicated workers handle the heavy lifting asynchronously. This architecture significantly improves the scalability and resilience of your application, making it production-ready for handling various long-running operations.

Common Issues & Solutions

  1. Issue: Redis connection errors (e.g., connect ECONNREFUSED)

    • Cause: The Node.js application (API or worker) cannot connect to the Redis server. This could be due to Redis not running, incorrect host/port in .env, or firewall issues.
    • Solution:
      • Ensure Redis is running: docker-compose ps (if using Docker) or check your Redis server status.
      • Verify REDIS_HOST and REDIS_PORT in your .env file are correct. Remember that if your Node.js app is running inside Docker, REDIS_HOST should be the service name (redis), not localhost. If your Node.js app is running locally outside Docker, REDIS_HOST should be localhost.
      • Check Docker logs for the Redis container (docker-compose logs redis) for any startup errors.
      • Ensure no firewall is blocking port 6379.
  2. Issue: Jobs are added to the queue, but the worker doesn’t process them.

    • Cause: The worker process is not running, or it’s connected to a different Redis instance/database, or the queue name doesn’t match.
    • Solution:
      • Verify the worker process is running in a separate terminal: npm run start:worker. Check its logs for any startup errors or connection issues.
      • Double-check that IMAGE_PROCESSING_QUEUE_NAME is identical in both src/queue/producers/imageProcessor.ts and src/queue/workers/imageProcessorWorker.ts.
      • Ensure both the producer (API) and consumer (worker) are configured to connect to the same Redis instance and database (check REDIS_HOST, REDIS_PORT, REDIS_DB in .env).
      • Check for any BullMQ error events on the worker (e.g., worker.on('error', ...)) that might indicate underlying issues.
  3. Issue: Jobs are failing, but no retries are happening, or jobs are stuck.

    • Cause: Incorrect defaultJobOptions or job.options for retries, or a worker process is crashing without proper error handling.
    • Solution:
      • Review defaultJobOptions in imageProcessingQueue (src/queue/producers/imageProcessor.ts). Ensure attempts is set to a value greater than 1 and backoff is configured.
      • Make sure your worker’s job processing function (async (job: Job) => { ... }) correctly throws an error when a failure occurs. If it catches an error and doesn’t re-throw it, BullMQ won’t know the job failed and won’t retry.
      • Check worker logs for uncaughtException or unhandledRejection errors, which could indicate the worker is crashing unexpectedly. Implement robust error handling in worker.ts as shown.
      • If jobs are stuck in an active state indefinitely, the worker might have crashed mid-processing without marking the job as failed or completed. BullMQ has a “stalled” job detection mechanism, but it relies on worker health checks. Ensure Redis is stable and worker processes have enough resources.

Testing & Verification

To ensure everything is correctly implemented and working as expected, perform the following end-to-end test:

  1. Clean Up Previous Runs (Optional but Recommended): Stop all Docker containers and remove volumes to start fresh:

    docker-compose down -v
    
  2. Start All Services:

    docker-compose up -d --build
    npm run dev # Or `npm start`
    npm run start:worker
    

    Ensure all three terminals (Docker, API, Worker) are actively logging.

  3. Trigger Multiple Jobs: Send several POST /api/jobs/process-image requests (e.g., 5-10 requests, using different userId values to distinguish them in logs).

    curl -X POST http://localhost:PORT/api/jobs/process-image -H "Content-Type: application/json" -H "Authorization: Bearer YOUR_JWT_TOKEN" -d '{"imageUrl": "https://picsum.photos/200/300", "userId": "user-A"}'
    # Repeat with user-B, user-C, etc.
    
  4. Verify Job Processing:

    • API Logs: Confirm that each request results in a Successfully added job ... to queue.
    • Worker Logs: Observe the worker terminal. You should see jobs being picked up (Job ... is active), processed (Worker ... started processing), and eventually completed (Job ... completed successfully) or failed (Job ... failed with error).
    • Retries: If you hit the simulated 10% failure rate, you should see jobs failing and then being retried automatically by the worker, eventually succeeding or exhausting their attempts.
    • Concurrency: If you send many jobs quickly, you should see the worker processing multiple jobs concurrently (up to the concurrency limit you set, e.g., 5).
  5. Graceful Shutdown Test:

    • In the API server terminal, press Ctrl+C. You should see Closing BullMQ image processing queue... logs.
    • In the worker terminal, press Ctrl+C. You should see Shutting down worker gracefully... and Image processing worker stopped gracefully. logs.
    • This confirms that your application and worker can shut down cleanly without leaving open connections or orphaned processes.

This comprehensive testing approach helps ensure that your background job system is robust, handles failures, and scales correctly.

Summary & Next Steps

Congratulations! In this chapter, you’ve successfully implemented a crucial architectural pattern for modern web applications: handling long-running tasks with background jobs using BullMQ and Redis. You learned how to:

  • Integrate Redis into your Docker Compose setup.
  • Configure BullMQ for robust queue management.
  • Create job producers within your Fastify API to dispatch tasks asynchronously.
  • Develop separate worker processes to consume and execute these tasks.
  • Implement production-ready practices like graceful shutdowns, error handling, retries, and comprehensive logging.

By offloading compute-intensive or time-consuming operations to background workers, your Fastify API remains responsive, providing a better user experience and significantly improving the scalability and resilience of your entire application. This decoupling is a cornerstone of building microservices and distributed systems.

In the next chapter, Chapter 9: Implementing Caching for Performance, we will explore another critical performance optimization technique: caching. We’ll learn how to use Redis as a cache store to reduce database load and speed up frequently accessed data, further enhancing your application’s responsiveness and efficiency.