
Queue Data Structures: How to Build a Node Task Queue


This tutorial explains queue data structures and demonstrates queuing systems. Queues are often used to process long-running tasks such as email newsletter delivery. Below, you’ll build a simple Node task queue.

It’s not always practical to execute a task the moment it’s requested.

Consider an email newsletter administration system. After writing a newsletter, an administrator must hit a big red “SEND NOW” button. The application could send every email immediately and show a “completed” response. That would work for a dozen messages, but how long would it take for 1,000 subscribers or more? The browser request would time out before the process completed.

Another example: a user can upload any number of photographs to a gallery application. The system resizes and sharpens each image to produce versions at alternative dimensions. This process could run on upload, but it would incur a delay for every image.

It’s more effective to decouple tasks in these situations. The user receives an instant response, but task processing occurs in the background. Other applications or servers handle tasks and schedule re-attempts on failure. The user can receive alerts or examine logs to determine progress.

What Are Queue Data Structures?

A queue is a data structure which holds a collection of items:

  • Any process can send (or enqueue) an item at any time — such as send newsletter X to recipient Y.
  • Any process can receive (or dequeue) the item at the front of the queue, that is, the item that’s been in the queue the longest.

A queue is a first-in-first-out (FIFO) structure: the first item added to the queue is the first item out.

A Basic JavaScript Task Queue Data Structure

You can create a task queue using a JavaScript array. The push() method adds an item to the end of an array, while the shift() method removes and returns the item at the start:

const queue = [];

queue.push( 'item 1' );
queue.push( 'item 2' );

console.log( queue.shift() ); // item 1
console.log( queue.shift() ); // item 2
console.log( queue.shift() ); // undefined
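Array.shift() must move every remaining element down one position, so each dequeue can take time proportional to the queue’s length (JavaScript engines may optimize some cases, but that isn’t guaranteed). For an in-memory queue that could grow large, a minimal sketch of an alternative tracks the front with an index. The IndexedQueue name and its compaction rule are illustrative choices, not a standard API:

```javascript
// Array-backed FIFO queue that avoids calling Array.shift() on every dequeue.
// `head` marks the front; consumed slots are discarded in bulk.
class IndexedQueue {

  constructor() {
    this.items = [];
    this.head = 0; // index of the next item to receive
  }

  send(item) {
    this.items.push(item);
  }

  receive() {
    if (this.head >= this.items.length) return undefined; // queue is empty

    const item = this.items[this.head];
    this.items[this.head] = undefined; // drop the reference so it can be garbage collected
    this.head++;

    // once at least half the array is consumed slots, copy the live items down
    if (this.head * 2 >= this.items.length) {
      this.items = this.items.slice(this.head);
      this.head = 0;
    }

    return item;
  }

  get size() {
    return this.items.length - this.head;
  }

}

const q = new IndexedQueue();
q.send('item 1');
q.send('item 2');

console.log( q.receive() ); // item 1
console.log( q.size );      // 1
```

The copy only runs after at least as many receives as there are items left to copy, so its cost is spread across those receives and each operation is amortized constant time.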

Your queue data structures can hold any data in individual array elements. You can push strings, numbers, Booleans, other arrays, or objects.

You can use an ES6 class to define any number of separate queues:

class Queue {

  constructor() { this.q = []; }
  send( item )  { this.q.push( item ); }
  receive()     { return this.q.shift(); }

}

// define two queues
const q1 = new Queue();
const q2 = new Queue();

q1.send('item 1');
q2.send('item 2');

console.log( q1.receive() ); // item 1
console.log( q1.receive() ); // undefined
console.log( q2.receive() ); // item 2

These simple queue data structures may be useful for less critical client-side code such as queuing UI updates so processing occurs in a single DOM update. localStorage or IndexedDB can offer a level of data persistence if necessary.
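As a rough illustration of that persistence, the following sketch saves the whole queue as JSON under a single key whenever it changes. It assumes a storage object with getItem() and setItem() methods, the shape of the Web Storage API that localStorage implements, and includes a small in-memory MemoryStorage stand-in so it can run outside a browser. StoredQueue, MemoryStorage, and the queue: key prefix are hypothetical names:

```javascript
// Queue persisted through a Web Storage-style object (hypothetical sketch).
// `storage` must provide getItem(key) and setItem(key, value), as localStorage does.
class StoredQueue {

  constructor(name, storage) {
    this.key = `queue:${ name }`; // illustrative key naming scheme
    this.storage = storage;
  }

  // read the stored array, or start with an empty one
  load() {
    const json = this.storage.getItem(this.key);
    return json ? JSON.parse(json) : [];
  }

  save(items) {
    this.storage.setItem(this.key, JSON.stringify(items));
  }

  send(item) {
    const items = this.load();
    items.push(item);
    this.save(items);
  }

  receive() {
    const items = this.load();
    const item = items.shift(); // undefined when the queue is empty
    this.save(items);
    return item;
  }

}

// Minimal in-memory stand-in with the same getItem/setItem shape,
// so the sketch can run outside a browser.
class MemoryStorage {
  constructor() { this.map = new Map(); }
  getItem(key) { return this.map.has(key) ? this.map.get(key) : null; }
  setItem(key, value) { this.map.set(key, String(value)); }
}

// in a browser: const uiQ = new StoredQueue('ui', window.localStorage);
const storage = new MemoryStorage();
new StoredQueue('ui', storage).send({ id: 1 });

// a new instance reads the same stored data
console.log( new StoredQueue('ui', storage).receive() ); // { id: 1 }
```

Queued items must survive JSON.stringify(): a Date, for example, comes back as a string. localStorage is also synchronous and has no locking, so two tabs updating the same key can overwrite each other’s changes; IndexedDB transactions may suit that case better.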

Queuing Platforms

In-memory queues are less practical for complex server applications:

  1. Two or more separate applications can’t (easily) access the same queue.
  2. Queue data disappears when the application terminates.

Purpose-built message-broker software provides more robust queuing. Platforms vary but typically offer features such as:

  • data persistence in a choice of databases with replication, sharding, and clustering options
  • a range of access protocols, often including HTTP and WebSockets
  • any number of separate queues
  • delayed messaging, where message processing can occur at a later time
  • transaction-like support, where a message is re-queued when processing isn’t confirmed
  • publish-subscribe patterns, where applications receive an event when a new item appears on a queue
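The transaction-like behavior can be pictured with an in-memory sketch of a visibility timeout, the approach Amazon SQS takes: a received message is hidden rather than deleted, and it reappears unless the consumer acknowledges it in time. AckQueue and its methods are hypothetical, and the current time is passed in explicitly so the behavior is easy to follow:

```javascript
// Hypothetical in-memory queue with acknowledgements and a visibility timeout.
class AckQueue {

  constructor(visibilityMs = 30000) {
    this.visibilityMs = visibilityMs; // how long a received message stays hidden
    this.messages = [];               // { id, data, visibleAt } in send order
    this.nextId = 1;
  }

  send(data, now = Date.now()) {
    const id = this.nextId++;
    this.messages.push({ id, data, visibleAt: now });
    return id;
  }

  // hand out the oldest visible message and hide it rather than delete it
  receive(now = Date.now()) {
    const msg = this.messages.find(m => m.visibleAt <= now);
    if (!msg) return null;
    msg.visibleAt = now + this.visibilityMs;
    return { id: msg.id, data: msg.data };
  }

  // confirm processing: only now is the message removed
  ack(id) {
    const i = this.messages.findIndex(m => m.id === id);
    if (i === -1) return false;
    this.messages.splice(i, 1);
    return true;
  }

}

const aq = new AckQueue(1000);
aq.send('email 1', 0);

const msg = aq.receive(0);        // { id: 1, data: 'email 1' }
console.log( aq.receive(500) );   // null: still hidden
console.log( aq.receive(1000) );  // { id: 1, data: 'email 1' }: not acknowledged, so redelivered
aq.ack(msg.id);
```

A production broker must also cope with a consumer that acknowledges after its timeout expired and another consumer already received the message; SQS, for example, returns a separate receipt handle with each delivery.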

Message-broker software includes Redis, RabbitMQ, Apache ActiveMQ, and Gearman. Cloud messaging services include Amazon SQS, Azure Service Bus, and Google Pub/Sub.

These may be viable options for enterprise-level applications. Yet they could be overkill if you have simpler requirements and already use a database.

Use MongoDB as Our Node Task Queue’s Message Broker

It’s possible to develop a sophisticated Node task queue system to manage queue data structures in a couple of hundred lines of code.

The queue-mongodb module described here uses MongoDB for data storage, but the same concepts could be adapted for any SQL or NoSQL database. The code is available on GitHub and npm.

Node Task Queue Project: Getting Started

Make sure you have Node.js 14.8 or above installed (the example scripts use top-level await, which requires it), then create a new project folder such as queue-test. Add a new package.json file:

{
  "name": "queue-test",
  "version": "1.0.0",
  "description": "Queue test",
  "type": "module",
  "scripts": {
    "send": "node ./send.js",
    "receive": "node ./receive.js"
  }
}

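Note: "type": "module" tells Node.js to treat .js files as ES modules, which the scripts need for import statements and top-level await. The "scripts" entries run send.js and receive.js to send and receive queued items.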

Install the queue-mongodb module:

npm install @craigbuckler/queue-mongodb

Then create a .env file with your MongoDB database connection credentials. For example:

QUEUE_DB_HOST=localhost
QUEUE_DB_PORT=27017
QUEUE_DB_USER=root
QUEUE_DB_PASS=mysecret
QUEUE_DB_NAME=qdb
QUEUE_DB_COLL=queue

Note: this creates a queue collection (QUEUE_DB_COLL) in the qdb database (QUEUE_DB_NAME). You can use an existing database, but make sure the collection doesn’t clash with another.

Database read/write access must be granted to the user root (QUEUE_DB_USER) with password mysecret (QUEUE_DB_PASS). Set both values blank if no authentication is required.

Start a MongoDB database if it’s not already running. Those with Docker and Docker Compose can create a new docker-compose.yml file:

version: '3'

services:

  queuedb:
    environment:
      - MONGO_INITDB_ROOT_USERNAME=${QUEUE_DB_USER}
      - MONGO_INITDB_ROOT_PASSWORD=${QUEUE_DB_PASS}
    image: mongo:4.4-bionic
    container_name: queuedb
    volumes:
      - queuedata:/data/db
    ports:
      - "${QUEUE_DB_PORT}:27017"   # host port : MongoDB's default port inside the container
    restart: always

volumes:
  queuedata:

Then run docker-compose up to download and start MongoDB with a persistent data volume.

Docker is available for Linux, macOS, and Windows 10. See the Docker installation instructions.

Create a new send.js file to add a randomly generated email message to a queue named news:

// Queue module
import { Queue } from '@craigbuckler/queue-mongodb';

// initialize queue named 'news'
const newsQ = new Queue('news');

// random name
const name = String.fromCharCode(65 + Math.random() * 26).repeat(1 + Math.random() * 10);

// add object to queue
const send = await newsQ.send({
  name:     name,
  email:    `${ name.toLowerCase() }@test.com`,
  date:     new Date(),
  message:  `Hey there, ${ name }!`
});

console.log('send', send);

// get number of items remaining in queue
console.log('items queued:', await newsQ.count());

// close connection and quit
await newsQ.close();

Execute it with npm run send and you’ll see output such as this:

send {
  _id: 607d692563bd6d05bb459931,
  sent: 2021-04-19T11:27:33.000Z,
  data: {
    name: 'AAA',
    email: 'aaa@test.com',
    date: 2021-04-19T11:27:33.426Z,
    message: 'Hey there, AAA!'
  }
}
items queued: 1

The .send() method returns a qItem object containing:

  1. the MongoDB document _id
  2. the date/time the item was originally queued, and
  3. a copy of the message data

Run the script any number of times to add further items to the queue. The items queued count increases by one on every run.

Now create a new receive.js file to retrieve messages from the same Node task queue:

// Queue module
import { Queue } from '@craigbuckler/queue-mongodb';

// initialize queue named 'news'
const newsQ = new Queue('news');

let qItem;

do {

  qItem = await newsQ.receive();

  if (qItem) {

    console.log('\nreceive', qItem);

    // ... process qItem.data ...
    // ... to send email ...

  }

} while (qItem);

// number of items remaining in queue
console.log('items queued:', await newsQ.count());

await newsQ.close();

Run npm run receive to fetch and process queued items:

receive {
  _id: 607d692563bd6d05bb459931,
  sent: 2021-04-19T11:27:33.000Z,
  data: {
    name: 'AAA',
    email: 'aaa@test.com',
    date: 2021-04-19T11:27:33.426Z,
    message: 'Hey there, AAA!'
  }
}
items queued: 0

No email is sent in this example, but that could be implemented using Nodemailer or another suitable module.

If processing fails — perhaps because the mail server is down — an item can be re-queued with this:

await newsQ.send( qItem.data, 600 );

The second argument, 600, is optional and can be a number of seconds or a future Date. This command re-queues the item so it can’t be received until 600 seconds (ten minutes) have elapsed.
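A fixed ten-minute delay works, but a progressively longer delay (exponential backoff) avoids retrying a struggling mail server too often. This sketch assumes qItem.data is an object and stores a hypothetical attempts counter inside it; nextRetry() is not part of queue-mongodb, and the default values are arbitrary:

```javascript
// Hypothetical helper (not part of queue-mongodb): decide whether and when
// to re-queue a failed item whose data is an object.
// Returns { data, delay } for a call such as newsQ.send(data, delay),
// or null when the item has already been re-queued maxAttempts times.
function nextRetry(data, { baseSeconds = 60, maxAttempts = 5 } = {}) {

  const attempts = (data.attempts || 0) + 1; // retries scheduled, including this one
  if (attempts > maxAttempts) return null;   // give up: log it or move it to a "failed" queue

  return {
    data: { ...data, attempts },             // copy of the data with the updated counter
    delay: baseSeconds * 2 ** (attempts - 1) // 60, 120, 240, 480, 960 seconds by default
  };

}

// usage in receive.js when processing qItem fails:
// const retry = nextRetry(qItem.data);
// if (retry) await newsQ.send(retry.data, retry.delay);
```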

This is a simple example, but any application can send data to any number of queues. Another process, perhaps started as a cron job, can receive and process items when necessary.
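A cron job suits periodic batches. Alternatively, a long-running worker can poll the queue and pause whenever it’s empty. This minimal sketch assumes only that the queue has an async receive() method that returns null when nothing is due, as queue-mongodb’s does; pollWorker() and its options are hypothetical:

```javascript
// Hypothetical polling worker: receive and process items, sleeping idleMs
// between empty polls. It stops after maxIdlePolls consecutive empty polls
// (the default, Infinity, keeps it running).
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

async function pollWorker(queue, handler, { idleMs = 5000, maxIdlePolls = Infinity } = {}) {

  let idle = 0, processed = 0;

  while (idle < maxIdlePolls) {

    const qItem = await queue.receive();

    if (qItem) {
      idle = 0;
      await handler(qItem); // e.g. send the email described by qItem.data
      processed++;
    }
    else {
      idle++;
      if (idle < maxIdlePolls) await sleep(idleMs); // nothing due: wait before polling again
    }

  }

  return processed; // number of items handled
}

// usage with queue-mongodb, closing the connection when the worker stops:
// const newsQ = new Queue('news');
// await pollWorker(newsQ, async qItem => { /* process qItem.data */ }, { idleMs: 10000 });
// await newsQ.close();
```

If handler() throws, the loop stops and the error propagates; a real worker would more likely catch it and re-queue the item.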

How the queue-mongodb Module Works

The type string passed to the class constructor defines a queue name. The .send() method creates a new MongoDB document when passed data to add to the queue. The MongoDB document contains:

  1. A MongoDB _id (the creation date/time is encoded within the value).
  2. The queue type.
  3. A processing date/time value named proc. It’s possible to set a future time but the current time is the default.
  4. The item data. This can be anything: a Boolean, number, string, array, object, and so on.
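A MongoDB ObjectId’s first four bytes store its creation time as seconds since the Unix epoch, which is the value the driver’s getTimestamp() method reads. To illustrate, this sketch decodes it by hand from the 24-character hex form of the _id in the earlier send output; objectIdTime() is a hypothetical helper, not a driver function:

```javascript
// Decode the creation time embedded in a 24-character hex ObjectId string.
// The first 8 hex digits (4 bytes) are seconds since 1970-01-01T00:00:00Z.
function objectIdTime(hex) {
  if (!/^[0-9a-f]{24}$/i.test(hex)) throw new TypeError('not an ObjectId hex string');
  const seconds = parseInt(hex.slice(0, 8), 16);
  return new Date(seconds * 1000);
}

console.log( objectIdTime('607d692563bd6d05bb459931').toISOString() );
// 2021-04-19T11:27:33.000Z
```

Because the value has one-second resolution, sent shows .000Z milliseconds, while data.date, created with new Date(), shows .426Z.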

The .receive() method locates the document with a matching type and the earliest proc date/time in the past. It deletes that document from the database and returns a formatted copy to the calling code.
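To make the selection rule concrete, it can be modelled in memory. This sketch mirrors the logic only, not the module’s code: an array of { type, proc, data } objects stands in for the collection, and receiveFrom() is a hypothetical name:

```javascript
// In-memory model of the queue-mongodb selection rule (illustration only).
function receiveFrom(docs, type, now = new Date()) {

  let best = -1;

  docs.forEach((doc, i) => {
    const due = doc.type === type && doc.proc < now;                   // like { type, proc: { $lt: now } }
    if (due && (best === -1 || doc.proc < docs[best].proc)) best = i;  // like sort: { proc: 1 }
  });

  if (best === -1) return null;     // nothing due
  return docs.splice(best, 1)[0];   // remove and return, like findOneAndDelete

}

const at = new Date('2021-04-19T12:00:00Z');
const sample = [
  { type: 'news', proc: new Date('2021-04-19T11:00:00Z'), data: 'second' },
  { type: 'news', proc: new Date('2021-04-19T10:00:00Z'), data: 'first' },
  { type: 'news', proc: new Date('2021-04-19T13:00:00Z'), data: 'future' }
];

console.log( receiveFrom(sample, 'news', at).data ); // first
console.log( receiveFrom(sample, 'news', at).data ); // second
console.log( receiveFrom(sample, 'news', at) );      // null
```

One difference: when two documents share the same proc value, MongoDB doesn’t guarantee which one a sort returns first, whereas this model always picks the earlier array element.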

The following sections describe the module in further detail.

queue-mongodb Module: Initialization

The dotenv module loads the variables in the .env file when QUEUE_DB_HOST isn’t already set in the environment. A database connection object is created using the official mongodb driver module:

// modules
import dotenv from 'dotenv';
import mongoDB from 'mongodb';

// environment variables
if (!process.env.QUEUE_DB_HOST) {
  dotenv.config();
}

// MongoDB database client
const
  dbName = process.env.QUEUE_DB_NAME || 'qdb',
  qCollectionName = process.env.QUEUE_DB_COLL || 'queue',
  qAuth = process.env.QUEUE_DB_USER ? `${ process.env.QUEUE_DB_USER }:${ process.env.QUEUE_DB_PASS || '' }@` : '',

  dbClient = new mongoDB.MongoClient(
    `mongodb://${ qAuth }${ process.env.QUEUE_DB_HOST || 'localhost' }:${ process.env.QUEUE_DB_PORT || '27017' }/`,
    { useNewUrlParser: true, useUnifiedTopology: true }
  );
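The connection string above inserts QUEUE_DB_USER and QUEUE_DB_PASS unchanged. MongoDB’s connection string documentation requires characters such as @, :, /, ?, and # in credentials to be percent-encoded, so a password like p@ss would produce an invalid URI. A sketch of building the same string with encodeURIComponent(); buildMongoURI() is a hypothetical helper, not part of queue-mongodb:

```javascript
// Hypothetical helper: build a MongoDB URI from the same environment
// variables, percent-encoding the credentials.
function buildMongoURI(env = process.env) {

  const
    user = env.QUEUE_DB_USER,
    pass = env.QUEUE_DB_PASS || '',
    auth = user ? `${ encodeURIComponent(user) }:${ encodeURIComponent(pass) }@` : '',
    host = env.QUEUE_DB_HOST || 'localhost',
    port = env.QUEUE_DB_PORT || '27017';

  return `mongodb://${ auth }${ host }:${ port }/`;

}

console.log( buildMongoURI({ QUEUE_DB_USER: 'root', QUEUE_DB_PASS: 'p@ss:word' }) );
// mongodb://root:p%40ss%3Aword@localhost:27017/
```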

The qCollection variable holds a reference to the database’s queue collection (defined by QUEUE_DB_COLL). The dbConnect() function creates and returns it, defining the collection schema and indexes when the collection doesn’t yet exist. Every Queue method except .close() runs const q = await dbConnect(); to get the collection reference:

let qCollection; // queue collection


// shared connection
async function dbConnect() {

  // collection available
  if (qCollection) return qCollection;

  // connect to database
  await dbClient.connect();

  // collection defined?
  const
    db = dbClient.db( dbName ),
    colList = await db.listCollections({ name: qCollectionName }, { nameOnly: true }).toArray();

  if (!colList.length) {

    // define collection schema
    let $jsonSchema = {
      bsonType: 'object',
      required: [ 'type', 'proc', 'data' ],
      properties: {
        type: { bsonType: 'string', minLength: 1 },
        proc: { bsonType: 'date' }
      }
    };
    await db.createCollection(qCollectionName, { validator: { $jsonSchema } });

    // define indexes
    await db.collection( qCollectionName ).createIndexes([
      { key: { type: 1 } },
      { key: { proc: 1 } }
    ]);

  }

  // return queue collection
  qCollection = db.collection( qCollectionName );
  return qCollection;

}
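dbConnect() only caches qCollection after its awaits complete, so two Queue methods called at the same moment (with Promise.all(), for example) could both run dbClient.connect() and both attempt to create the collection. One common remedy is to cache the pending promise rather than the result. This sketch shows the pattern with a generic, hypothetical sharedAsync() wrapper instead of the real driver calls:

```javascript
// Wrap an async initializer so concurrent callers share a single run.
// The in-flight promise is cached immediately, before it settles.
function sharedAsync(init) {

  let pending = null;

  return function () {
    if (!pending) {
      pending = init().catch(err => {
        pending = null; // forget a failed attempt so the next call can retry
        throw err;
      });
    }
    return pending;
  };

}

// hypothetical use in the module:
// const getCollection = sharedAsync(async () => { /* connect, create schema, return collection */ });
// const q = await getCollection();
```

With this approach, dbClose() would also need to reset the cached promise, which this minimal wrapper doesn’t expose.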

The dbClose() function closes the database connection:

// close MongoDB database connection
async function dbClose() {

  if (qCollection) {
    await dbClient.close();
    qCollection = null;
  }

}

queue-mongodb Module: Queue Constructor

The Queue constructor sets the queue type or name:

export class Queue {

  constructor(type = 'DEFAULT') {

    this.type = type;

  }

queue-mongodb Module: Queue.send() Method

The .send() method adds data to the queue with the appropriate type. Its optional delayUntil parameter postpones processing until a future time, specified as either a number of seconds or a Date object.

The method inserts a new document into the database and returns a qItem object ({ _id, sent, data }), or null if unsuccessful:

  async send(data = null, delayUntil) {

    try {

      // calculate start date/time
      let proc = new Date();
      if (delayUntil instanceof Date) {
        proc = delayUntil;
      }
      else if (!isNaN(delayUntil)) {
        proc = new Date( +proc + delayUntil * 1000);
      }

      // add item to queue
      const
        q     = await dbConnect(),
        ins   = await q.insertOne({
          type: this.type, proc, data
        });

      // return qItem
      return ins && ins.insertedCount && ins.insertedId ? { _id: ins.insertedId, sent: ins.insertedId.getTimestamp(), data } : null;

    }
    catch(err) {

      console.log(`Queue.send error:\n${ err }`);
      return null;

    }

  }
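The date calculation at the start of send() can be pulled into a pure function to see how each kind of delayUntil value is treated. This sketch mirrors the logic above; processDate() and its explicit now parameter are hypothetical additions for testing:

```javascript
// Mirrors the proc calculation in Queue.send():
// - a Date is used as-is
// - anything the global isNaN() treats as numeric is seconds from now
// - anything else (including undefined) means "now"
function processDate(delayUntil, now = new Date()) {
  if (delayUntil instanceof Date) return delayUntil;
  if (!isNaN(delayUntil)) return new Date(+now + delayUntil * 1000);
  return now;
}

const now = new Date('2021-04-19T11:27:33.000Z');
console.log( processDate(600, now).toISOString() ); // 2021-04-19T11:37:33.000Z
```

Because the global isNaN() coerces its argument, null, '' and '600' are accepted too (as 0, 0 and 600 seconds). Number.isFinite() would be a stricter check if only genuine numbers should count.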

queue-mongodb Module: Queue.receive() Method

The .receive() method retrieves and deletes the queued item with a matching type and the earliest proc date/time in the past. It returns a qItem object ({ _id, sent, data }), or null if nothing is available or an error occurs:

  async receive() {

    try {

      // find and delete next item on queue
      const
        now = new Date(),
        q   = await dbConnect(),
        rec = await q.findOneAndDelete(
          {
            type: this.type,
            proc: { $lt: now }
          },
          {
            sort: { proc: 1 }
          }
        );

      const v = rec && rec.value;

      // return qItem
      return v ? { _id: v._id, sent: v._id.getTimestamp(), data: v.data } : null;

    }
    catch(err) {

      console.log(`Queue.receive error:\n${ err }`);
      return null;

    }

  }

queue-mongodb Module: Queue.remove() Method

The .remove() method deletes the queued item identified by a qItem object ({ _id, sent, data }) returned by the .send() method. It can remove a queued item regardless of its position in the queue.

The method returns the number of deleted documents (normally 1) or null when an error occurs:

  async remove(qItem) {

    // no item to remove
    if (!qItem || !qItem._id) return null;

    try {

      const
        q   = await dbConnect(),
        del = await q.deleteOne({ _id: qItem._id });

      return del.deletedCount;

    }
    catch(err) {

      console.log(`Queue.remove error:\n${ err }`);
      return null;

    }

  }

queue-mongodb Module: Queue.purge() Method

The .purge() method deletes all queued items of the same type and returns the number of deletions:

  async purge() {

    try {

      const
        q   = await dbConnect(),
        del = await q.deleteMany({ type: this.type });

      return del.deletedCount;

    }
    catch(err) {

      console.log(`Queue.purge error:\n${ err }`);
      return null;

    }

  }

queue-mongodb Module: Queue.count() Method

The .count() method returns the number of queued items of the same type:

  async count() {

    try {

      const q = await dbConnect();
      return await q.countDocuments({ type: this.type });

    }
    catch(err) {

      console.log(`Queue.count error:\n${ err }`);
      return null;

    }

  }

queue-mongodb Module: Queue.close() Method

The .close() method runs the dbClose() function to terminate the database connection so the Node.js event loop can end:

  async close() {

    try {

      await dbClose();

    }
    catch(err) {

      console.log(`Queue.close error:\n${ err }`);
      return null;

    }

  }

// end of class
}
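Scripts such as receive.js call .close() at the end, which is skipped if the processing code throws first. To make sure the connection is released however the work finishes, it can be wrapped in try...finally. A minimal sketch, where withQueue() is a hypothetical helper and queue is any object with an async close() method, such as a Queue instance:

```javascript
// Hypothetical helper: run `work` with a queue, then always close the queue,
// whether `work` resolves or throws.
async function withQueue(queue, work) {
  try {
    return await work(queue);
  }
  finally {
    await queue.close();
  }
}

// usage with queue-mongodb:
// const remaining = await withQueue(new Queue('news'), q => q.count());
```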

A New Queue

Queues are worth considering for any web application with computationally expensive functions that could cause a bottleneck. They can improve performance and maintainability by decoupling applications into smaller, faster, more robust processes. Dedicated message-broker software is an option, but simple queuing systems like the Node task queue we built today are possible in a couple of hundred lines of code.

Craig is a freelance UK web consultant who built his first page for IE2.0 in 1995. Since that time he's been advocating standards, accessibility, and best-practice HTML5 techniques. He's created enterprise specifications, websites and online applications for companies and organisations including the UK Parliament, the European Parliament, the Department of Energy & Climate Change, Microsoft, and more. He's written more than 1,000 articles for SitePoint and you can find him @craigbuckler.
