- Key Takeaways
- What Are Queue Data Structures?
- A Basic JavaScript Task Queue Data Structure
- Queuing Platforms
- Use MongoDB as Our Node Task Queue’s Message Broker
- Node Task Queue Project: Getting Started
- How the queue-mongodb Module Works
- queue-mongodb Module: Initialization
- queue-mongodb Module: Queue Constructor
- queue-mongodb Module: Queue.send() Method
- queue-mongodb Module: Queue.receive() Method
- queue-mongodb Module: Queue.remove() Method
- queue-mongodb Module: Queue.purge() Method
- queue-mongodb Module: Queue.count() Method
- queue-mongodb Module: Queue.close() Method
- A New Queue
- FAQs About Using the Queue Data Structure in Node
This tutorial explains queue data structures and demonstrates queuing systems. Queues are often used to process long-running tasks such as email newsletter delivery. Below, you’ll build a simple Node task queue.
It’s not always practical to execute a task the moment it’s requested.
Consider an email newsletter administration system. After writing, an administrator must hit a big red “SEND NOW” button. The application could send every email immediately and show a “completed” response. That would work for a dozen messages, but how long would it take for 1,000 subscribers or more? The browser request would time out before the process completed.
Another example: a user can upload any number of photographs to a gallery application. The system resizes and sharpens each image for alternative dimensions. This process could run on upload, but it would to incur a delay for every image.
It’s more effective to decouple tasks in these situations. The user receives an instant response but task processing occurs in the background. Other applications or servers handle tasks and schedule re-attempts on failure. The user can receive alerts or examine logs to determine progress.
Key Takeaways
- Utilize MongoDB as a message broker to manage queue data structures efficiently, allowing for sophisticated Node task queue systems in just a few hundred lines of code.
- Decouple task processing in applications to enhance user experience by providing instant responses while background processes handle the workload, reducing bottlenecks.
- Implement basic queue operations using JavaScript arrays with push() and shift() methods, or use ES6 classes for more structured queue management.
- Explore robust queuing platforms like Redis, RabbitMQ, and cloud services such as Amazon SQS for enterprise-level applications, offering features like data persistence, delayed messaging, and transaction-like support.
- Leverage the queue-mongodb module for Node.js to easily add, receive, and manage queued tasks using MongoDB, complete with methods for counting, removing, and purging tasks.
What Are Queue Data Structures?
A queue is a data structure which holds a collection of items:
- Any process can send (or enqueue) an item at any time — such as send newsletter X to recipient Y.
- Any process can receive (or dequeue) the item at the front of the queue — for example, the item that’s been in the queue for longest.
Queue data structures are a first-in-first-out (FIFO) structure. The first item added to the queue will be the first out.
A Basic JavaScript Task Queue Data Structure
You can create a task queue using a JavaScript array. The push()
method adds an item to the end of an Array while the shift()
method removes and returns an item from the start:
const queue = [];
queue.push( 'item 1' );
queue.push( 'item 2' );
console.log( queue.shift() ); // item 1
console.log( queue.shift() ); // item 2
console.log( queue.shift() ); // undefined
Your queue data structures can hold any data in individual array elements. You can push strings, numbers, Booleans, other arrays, or objects.
You can use an ES6 class to define any number of separate queues:
class Queue {
constructor() { this.q = []; }
send( item ) { this.q.push( item ); }
receive() { return this.q.shift(); }
}
// define two queues
const q1 = new Queue();
const q2 = new Queue();
q1.send('item 1');
q2.send('item 2');
console.log( q1.receive() ); // item 1
console.log( q1.receive() ); // undefined
console.log( q2.receive() ); // item 2
These simple queue data structures may be useful for less critical client-side code such as queuing UI updates so processing occurs in a single DOM update. localStorage or IndexedDB can offer a level of data persistence if necessary.
Queuing Platforms
In-memory queues are less practical for complex server applications:
- Two or more separate applications can’t (easily) access the same queue.
- Queue data disappears when the application terminates.
Purpose built message-broker software provides more robust queuing. Platforms vary, but offer features such as:
- data persistence in a choice of databases with replication, sharding, and clustering options
- a range of access protocols, often including HTTP and Web Sockets
- any number of separate queues
- delayed messaging, where message processing can occur at a later time
- transaction-like support, where a message is re-queued when processing isn’t confirmed
- publish-subscribe patterns, where applications receive an event when a new item appears on a queue
Message-broker software includes Redis, RabbitMQ, Apache ActiveMQ, and Gearman. Cloud messaging services include Amazon SQS, Azure Service Bus, and Google Pub/Sub.
These may be viable options for enterprise-level applications. Yet they could be overkill if you have simpler requirements and already use a database.
Use MongoDB as Our Node Task Queue’s Message Broker
It’s possible to develop a sophisticated Node task queue system to manage queue data structures in a couple of hundred lines of code.
The queue-mongodb
module described here uses MongoDB for data storage, but the same concepts could be adopted by any SQL or NoSQL database. The code is available on GitHub and npm.
Node Task Queue Project: Getting Started
Make sure you have Node.js 14 or above installed, then create a new project folder such as queue-test
. Add a new package.json
file:
{
"name": "queue-test",
"version": "1.0.0",
"description": "Queue test",
"type": "module",
"scripts": {
"send": "node ./send.js",
"receive": "node ./receive.js"
}
}
Note: "type": "module"
configures the project to use ES6 modules. The "scripts"
will send and receive queued items.
Install the queue-mongodb module:
npm install @craigbuckler/queue-mongodb
Then create a .env
file with your MongoDB database connection credentials. For example:
QUEUE_DB_HOST=localhost
QUEUE_DB_PORT=27017
QUEUE_DB_USER=root
QUEUE_DB_PASS=mysecret
QUEUE_DB_NAME=qdb
QUEUE_DB_COLL=queue
Note: this creates a queue
collection (QUEUE_DB_COLL
) in the qdb
database (QUEUE_DB_NAME
). You can use an existing database, but make sure the collection doesn’t clash with another.
Database read/write access must be granted to the user root
(QUEUE_DB_USER
) with password mysecret
(QUEUE_DB_PASS
). Set both values blank if no authentication is required.
Start a MongoDB database if it’s not already running. Those with Docker and Docker Compose can create a new docker-compose.yml
file:
version: '3'
services:
queuedb:
environment:
- MONGO_INITDB_ROOT_USERNAME=${QUEUE_DB_USER}
- MONGO_INITDB_ROOT_PASSWORD=${QUEUE_DB_PASS}
image: mongo:4.4-bionic
container_name: queuedb
volumes:
- queuedata:/data/db
ports:
- "${QUEUE_DB_PORT}:${QUEUE_DB_PORT}"
restart: always
volumes:
queuedata:
Then run docker-compose up
to download and start MongoDB with a persistent data volume.
Docker is available Linux, macOS, and Windows 10. See the Docker installation instructions.
Create a new send.js
file to add a randomly generated email messages to a queue named news
:
// Queue module
import { Queue } from '@craigbuckler/queue-mongodb';
// initialize queue named 'news'
const newsQ = new Queue('news');
// random name
const name = String.fromCharCode(65 + Math.random() * 26).repeat(1 + Math.random() * 10);
// add object to queue
const send = await newsQ.send({
name: name,
email: `${ name.toLowerCase() }@test.com`,
date: new Date(),
message: `Hey there, ${ name }!`
});
console.log('send', send);
// get number of items remaining in queue
console.log('items queued:', await newsQ.count());
// close connection and quit
await newsQ.close();
Execute it with npm run send
and you’ll see output such as this:
send {
_id: 607d692563bd6d05bb459931,
sent: 2021-04-19T11:27:33.000Z,
data: {
name: 'AAA',
email: 'aaa@test.com',
date: 2021-04-19T11:27:33.426Z,
message: 'Hey there, AAA!'
}
}
items queued: 1
The .send()
method returns an qItem
object containing:
- the MongoDB document
_id
- the date/time the item was originally queued, and
- a copy of the message
data
Run the script any number of times to add further items to the queue. The items queued
will increment on every run.
Now create a new receive.js
file to retrieve messages from the same Node task queue:
// Queue module
import { Queue } from '@craigbuckler/queue-mongodb';
// initialize queue named 'news'
const newsQ = new Queue('news');
let qItem;
do {
qItem = await newsQ.receive();
if (qItem) {
console.log('\nreceive', qItem);
// ... process qItem.data ...
// ... to send email ...
}
} while (qItem);
// number of items remaining in queue
console.log('items queued:', await newsQ.count());
await newsQ.close();
Run npm run receive
to fetch and process queued items:
receive {
_id: 607d692563bd6d05bb459931,
sent: 2021-04-19T11:27:33.000Z,
data: {
name: 'AAA',
email: 'aaa@test.com',
date: 2021-04-19T11:27:33.426Z,
message: 'Hey there, AAA!'
}
}
items queued: 0
No email is sent in this example, but that could be implemented using Nodemailer or another suitable module.
If processing fails — perhaps because the mail server is down — an item can be re-queued with this:
newsQ.send( qItem.data, 600 );
The second 600
argument is an optional number of seconds or future date. This command re-queues the item after 600 seconds (ten minutes) have elapsed.
This is a simple example, but any application can send data to any number of queues. Another process, perhaps started as a cron
job, can receive and process items when necessary.
How the queue-mongodb
Module Works
The type
string passed to the class constructor defines a queue name. The .send()
method creates a new MongoDB document when passed data to add to the queue. The MongoDB document contains:
- A MongoDB
_id
(the creation date/time is encoded within the value). - The queue
type
. - A processing date/time value named
proc
. It’s possible to set a future time but the current time is the default. - The item
data
. This can be anything: a Boolean, number, string, array, object, and so on.
The .receive()
method locates the oldest document that has a matching type
and a proc
date/time in the past. The document is formatted, returned to the calling code, and deleted from the database.
The following sections describe the module in further detail.
queue-mongodb
Module: Initialization
The dotenv
module reads the .env
environment variables if necessary. A database connection object is created using the official mongodb
driver module:
// modules
import dotenv from 'dotenv';
import mongoDB from 'mongodb';
// environment variables
if (!process.env.QUEUE_DB_HOST) {
dotenv.config();
}
// MongoDB database client
const
dbName = process.env.QUEUE_DB_NAME || 'qdb',
qCollectionName = process.env.QUEUE_DB_COLL || 'queue',
qAuth = process.env.QUEUE_DB_USER ? `${ process.env.QUEUE_DB_USER }:${ process.env.QUEUE_DB_PASS || '' }@` : '',
dbClient = new mongoDB.MongoClient(
`mongodb://${ qAuth }${ process.env.QUEUE_DB_HOST || 'localhost' }:${ process.env.QUEUE_DB_PORT || '27017' }/`,
{ useNewUrlParser: true, useUnifiedTopology: true }
);
The qCollection
variable holds a reference to the database’s queue collection (defined by QUEUE_DB_COLL
). It’s created and returned by the dbConnect()
function, which also defines the collection schema and indexes when necessary. All Queue
methods run const q = await dbConnect();
to get the collection reference:
let qCollection; // queue collection
// shared connection
async function dbConnect() {
// collection available
if (qCollection) return qCollection;
// connect to database
await dbClient.connect();
// collection defined?
const
db = dbClient.db( dbName ),
colList = await db.listCollections({ name: qCollectionName }, { nameOnly: true }).toArray();
if (!colList.length) {
// define collection schema
let $jsonSchema = {
bsonType: 'object',
required: [ 'type', 'proc', 'data' ],
properties: {
type: { bsonType: 'string', minLength: 1 },
proc: { bsonType: 'date' }
}
};
await db.createCollection(qCollectionName, { validator: { $jsonSchema } });
// define indexes
await db.collection( qCollectionName ).createIndexes([
{ key: { type: 1 } },
{ key: { proc: 1 } }
]);
}
// return queue collection
qCollection = db.collection( qCollectionName );
return qCollection;
}
The dbClose()
function closes the database connection:
// close MongoDB database connection
async function dbClose() {
if (qCollection) {
await dbClient.close();
qCollection = null;
}
}
queue-mongodb
Module: Queue
Constructor
The Queue
constructor sets the queue type
or name:
export class Queue {
constructor(type = 'DEFAULT') {
this.type = type;
}
queue-mongodb
Module: Queue.send()
Method
The .send()
method adds data to the queue with the appropriate type
. It has an optional delayUntil
parameter, which adds an item to the queue at a future time by specifying a number of seconds or a Date()
.
The method inserts a new document into the database and returns a qItem
object ( { _id
, sent
, data
} ) or null
if unsuccessful:
async send(data = null, delayUntil) {
try {
// calculate start date/time
let proc = new Date();
if (delayUntil instanceof Date) {
proc = delayUntil;
}
else if (!isNaN(delayUntil)) {
proc = new Date( +proc + delayUntil * 1000);
}
// add item to queue
const
q = await dbConnect(),
ins = await q.insertOne({
type: this.type, proc, data
});
// return qItem
return ins && ins.insertedCount && ins.insertedId ? { _id: ins.insertedId, sent: ins.insertedId.getTimestamp(), data } : null;
}
catch(err) {
console.log(`Queue.send error:\n${ err }`);
return null;
}
}
queue-mongodb
Module: Queue.receive()
Method
The .receive()
method retrieves and deletes the oldest queued item in the database with a specific type
and a proc
date/time in the past. It returns a qItem
object ( {_id
, sent
, data
} ) or null
if nothing is available or an error occurs:
async receive() {
try {
// find and delete next item on queue
const
now = new Date(),
q = await dbConnect(),
rec = await q.findOneAndDelete(
{
type: this.type,
proc: { $lt: now }
},
{
sort: { proc: 1 }
}
);
const v = rec && rec.value;
// return qItem
return v ? { _id: v._id, sent: v._id.getTimestamp(), data: v.data } : null;
}
catch(err) {
console.log(`Queue.receive error:\n${ err }`);
return null;
}
}
queue-mongodb
Module: Queue.remove()
Method
The .remove()
method deletes the queued item identified by a qItem
object ( {_id
, sent
, data
} ) returned by the .send()
method. It can be used to remove a queued item regardless of its position in the queue.
The method returns the number of deleted documents (normally 1) or null
when an error occurs:
async remove(qItem) {
// no item to remove
if (!qItem || !qItem._id) return null;
try {
const
q = await dbConnect(),
del = await q.deleteOne({ _id: qItem._id });
return del.deletedCount;
}
catch(err) {
console.log(`Queue.remove error:\n${ err }`);
return null;
}
}
queue-mongodb
Module: Queue.purge()
Method
The .purge()
method deletes all queued items of the same type
and returns the number of deletions:
async purge() {
try {
const
q = await dbConnect(),
del = await q.deleteMany({ type: this.type });
return del.deletedCount;
}
catch(err) {
console.log(`Queue.purge error:\n${ err }`);
return null;
}
}
queue-mongodb
Module: Queue.count()
Method
The .count()
method returns the number of queued items of the same type
:
async count() {
try {
const q = await dbConnect();
return await q.countDocuments({ type: this.type });
}
catch(err) {
console.log(`Queue.count error:\n${ err }`);
return null;
}
}
queue-mongodb
Module: Queue.close()
Method
The .close()
method runs the dbClose()
function to terminate the database connection so the Node.js event loop can end:
async close() {
try {
await dbClose();
}
catch(err) {
console.log(`Queue.close error:\n${ err }`);
return null;
}
}
// end of class
}
A New Queue
Queues are a consideration for any web application with computationally expensive functions that could cause a bottleneck. They can improve performance and maintenance by decoupling applications into smaller, faster, more robust processes. Dedicated message broker software is an option, but simple queuing systems like the Node task queue we built today are possible in a few dozen lines of code.
FAQs About Using the Queue Data Structure in Node
A queue is a linear data structure that follows the First-In-First-Out (FIFO) principle, where the first element added is the first to be removed. In a queue, elements are added to the rear and removed from the front.
Queues are useful for managing tasks or data in a sequential manner. They are commonly used for background processing, task scheduling, event handling, and managing resources like connections and requests.
You can implement a queue using arrays or linked lists. In Node.js, you can use the built-in array data structure or implement a custom queue using JavaScript objects and methods.
The key difference is the order of removal. In a queue, the first element added is the first to be removed (FIFO), while in a stack, the last element added is the first to be removed (LIFO).
Queues are used for task scheduling, managing concurrent requests, processing messages in a message queue, handling event listeners, and more. They are vital for scenarios where order and sequence matter.
Craig is a freelance UK web consultant who built his first page for IE2.0 in 1995. Since that time he's been advocating standards, accessibility, and best-practice HTML5 techniques. He's created enterprise specifications, websites and online applications for companies and organisations including the UK Parliament, the European Parliament, the Department of Energy & Climate Change, Microsoft, and more. He's written more than 1,000 articles for SitePoint and you can find him @craigbuckler.