Drupal 8 Queue API – Powerful Manual and Cron Queueing

The Queue API in Drupal allows us to defer tasks to a later stage. We place items into a queue, and at some point in the future each item is processed at least once. Usually, this happens on Cron runs, and Drupal 8 makes it quick to set up Cron-based queues. It doesn’t necessarily have to be Cron, however.

In this article, we will look at using the Queue API in Drupal 8 by exploring two simple examples. In the first, the queue is processed by Cron, while in the second we trigger the processing manually. In both cases, the actual processing is handled by the same worker logic. If you want to follow along, clone this git repository where you can find the npq module we will write in this article.

The module we’ll work with is called Node Publisher Queue. It automatically adds newly created nodes that are saved unpublished to a queue so they can be published later on. We will see how “later on” can be either the next Cron run or a manual action triggered by the site’s administrator. First, let’s understand some basic concepts about queues in Drupal 8.

The theory

There are a few components that make up the Queue API in Drupal 8.

The most important role in this API is played by the QueueInterface implementation which represents the queue. The default queue type Drupal 8 ships with is currently the DatabaseQueue which is a type of reliable queue that makes sure all its items are processed at least once and in their original order (FIFO). This is in contrast to unreliable queues which only do their best to achieve this (something for which valid use cases do exist).

The typical role of the queue object is to create items, later claim them from the queue and delete them when they have been processed. In addition, it can release items if processing is either not finished or another worker needs to process them again before deletion.
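The create, claim, release and delete lifecycle can be sketched in plain PHP. This is a simplified in-memory model for illustration, not Drupal’s implementation: the SketchQueue class is hypothetical, it only borrows the method names of QueueInterface, and it ignores lease expiry.

```php
<?php

/**
 * Hypothetical in-memory queue, for illustration only.
 *
 * It mirrors the method names of Drupal's QueueInterface but is not Drupal
 * code, and a claimed item stays claimed until it is released or deleted.
 */
class SketchQueue {

  /** @var object[] Items keyed by item ID, in insertion order. */
  private $items = [];

  /** @var int The next item ID to hand out. */
  private $nextId = 1;

  public function createItem($data) {
    $item = new \stdClass();
    $item->item_id = $this->nextId++;
    $item->data = $data;
    // An expire value of 0 means "not claimed by anyone".
    $item->expire = 0;
    $this->items[$item->item_id] = $item;
    return $item->item_id;
  }

  public function claimItem($lease_time = 3600) {
    foreach ($this->items as $item) {
      if ($item->expire === 0) {
        // Mark the item as claimed so no other worker receives it.
        $item->expire = time() + $lease_time;
        return $item;
      }
    }
    return FALSE;
  }

  public function releaseItem($item) {
    if (isset($this->items[$item->item_id])) {
      $this->items[$item->item_id]->expire = 0;
      return TRUE;
    }
    return FALSE;
  }

  public function deleteItem($item) {
    unset($this->items[$item->item_id]);
  }

  public function numberOfItems() {
    return count($this->items);
  }

}

$queue = new SketchQueue();
$queue->createItem(['nid' => 1]);
$queue->createItem(['nid' => 2]);

// Claim the oldest item, then give it back because processing did not finish.
$first = $queue->claimItem();
$queue->releaseItem($first);

// The released item is handed out again; delete it once processing succeeds.
$again = $queue->claimItem();
$queue->deleteItem($again);
```

After this runs, only the item for node 2 is left in the sketch queue, and it is still unclaimed.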

The QueueInterface implementation is instantiated with the help of a general QueueFactory. In the case of the DatabaseQueue, this general factory in turn delegates to the DatabaseQueueFactory. Queues also need to be created before they can be used. However, the DatabaseQueue is already created when Drupal is first installed so no additional setup is required.

The Queue Workers are responsible for processing queue items as they receive them. In Drupal 8 these are QueueWorker plugins that implement the QueueWorkerInterface. Using the QueueWorkerManager, we create instances of these plugins and process the items whenever the queue needs to be run.

The Node Publish Queue module

Now that we’ve covered the basic concepts of the Queue API in Drupal 8, let’s get our hands dirty and create the functionality described in the introduction. Our npq.info.yml file can be simple:

name: Node Publish Queue
description: Demo module illustrating the Queue API in Drupal 8
core: 8.x
type: module

Queue item creation

Inside the npq.module file we take care of the logic for creating queue items whenever a node is saved and not published:

use Drupal\Core\Entity\EntityInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\QueueInterface;

/**
 * Implements hook_entity_insert().
 */
function npq_entity_insert(EntityInterface $entity) {
  if ($entity->getEntityTypeId() !== 'node') {
    return;
  }

  if ($entity->isPublished()) {
    return;
  }

  /** @var QueueFactory $queue_factory */
  $queue_factory = \Drupal::service('queue');
  /** @var QueueInterface $queue */
  $queue = $queue_factory->get('cron_node_publisher');
  $item = new \stdClass();
  $item->nid = $entity->id();
  $queue->createItem($item);
}

Inside this basic hook_entity_insert() implementation we do a very simple task. We first retrieve the QueueFactory object from the service container and use it to get a queue called cron_node_publisher. If we track things down, we notice that the get() method on the DatabaseQueueFactory simply creates a new DatabaseQueue instance with the name we pass to it.

Lastly, we create a small PHP object containing the node ID and create an item in the queue with that data. Simple.

The CRON queue worker

Next, let’s create a QueueWorker plugin that will process the queue items whenever Cron is run. However, because we know that we will also need one for manual processing that does the same thing, we will add most of the logic in a base abstract class. So inside the Plugin/QueueWorker namespace of our module we can have the NodePublishBase class:

/**
 * @file
 * Contains Drupal\npq\Plugin\QueueWorker\NodePublishBase.php
 */

namespace Drupal\npq\Plugin\QueueWorker;

use Drupal\Core\Entity\EntityStorageInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\node\NodeInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;


/**
 * Provides base functionality for the NodePublish Queue Workers.
 */
abstract class NodePublishBase extends QueueWorkerBase implements ContainerFactoryPluginInterface {

  /**
   * The node storage.
   *
   * @var \Drupal\Core\Entity\EntityStorageInterface
   */
  protected $nodeStorage;

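  /**
   * Creates a new NodePublishBase object.
   *
   * @param array $configuration
   *   The plugin configuration.
   * @param string $plugin_id
   *   The plugin ID.
   * @param mixed $plugin_definition
   *   The plugin definition.
   * @param \Drupal\Core\Entity\EntityStorageInterface $node_storage
   *   The node storage.
   */
  public function __construct(array $configuration, $plugin_id, $plugin_definition, EntityStorageInterface $node_storage) {
    // Let the plugin base class store the configuration, ID and definition.
    parent::__construct($configuration, $plugin_id, $plugin_definition);
    $this->nodeStorage = $node_storage;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $configuration,
      $plugin_id,
      $plugin_definition,
      // The entity_type.manager service replaces the deprecated entity.manager.
      $container->get('entity_type.manager')->getStorage('node')
    );
  }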

  /**
   * Publishes a node.
   *
   * @param NodeInterface $node
   * @return int
   */
  protected function publishNode($node) {
    $node->setPublished(TRUE);
    return $node->save();
  }

  /**
   * {@inheritdoc}
   */
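  public function processItem($data) {
    /** @var NodeInterface $node */
    $node = $this->nodeStorage->load($data->nid);
    // The node may have been deleted since it was queued, so check the type
    // before calling any method on it.
    if ($node instanceof NodeInterface && !$node->isPublished()) {
      return $this->publishNode($node);
    }
  }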
}

Right off the bat we can see that we are using dependency injection to inject the NodeStorage into our class. For more information about dependency injection and the service container, feel free to check out my article on the topic.

In this base class we have two methods: publishNode() and the obligatory processItem(). The former publishes and saves a node that is passed to it. The latter loads the node using the node ID contained in the $data object and publishes it if it’s unpublished.

Now, let’s create a CronNodePublisher plugin that will use this logic on Cron runs:

namespace Drupal\npq\Plugin\QueueWorker;

/**
 * A Node Publisher that publishes nodes on CRON run.
 *
 * @QueueWorker(
 *   id = "cron_node_publisher",
 *   title = @Translation("Cron Node Publisher"),
 *   cron = {"time" = 10}
 * )
 */
class CronNodePublisher extends NodePublishBase {}

And that is all. We don’t need any other logic than what already is in our base class. Notice that, in the annotation, we are telling Drupal that this worker needs to be used by Cron to process as many items as it can within 10 seconds. How does this happen?

Whenever Cron runs, it uses the QueueWorkerManager to load all its plugin definitions. Then, if any of them have the cron key in their annotation, a Queue with the same name as the ID of the worker is loaded for processing. Lastly, each item in the queue is claimed and processed by the worker until the specified time has elapsed.
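The time-boxed loop described above can be sketched in plain PHP. This is a simplified model and not Drupal’s Cron code: sketch_process_for() is a made-up helper, a plain array stands in for the queue, and a closure stands in for the worker.

```php
<?php

/**
 * Hypothetical helper: processes items until none are left or time runs out.
 *
 * @param array $items
 *   The pending items, oldest first. Processed items are removed.
 * @param callable $worker
 *   Called once per item, standing in for a worker's processItem().
 * @param float $seconds
 *   The time budget, comparable to the "time" value of the cron key.
 *
 * @return int
 *   The number of items processed.
 */
function sketch_process_for(array &$items, callable $worker, $seconds) {
  $end = microtime(TRUE) + $seconds;
  $processed = 0;
  // Stop as soon as the budget is used up, even if items remain.
  while (microtime(TRUE) < $end && $items) {
    $item = array_shift($items);
    $worker($item);
    $processed++;
  }
  return $processed;
}

$items = [10, 11, 12];
$published = [];
$count = sketch_process_for($items, function ($nid) use (&$published) {
  $published[] = $nid;
}, 10);
```

Items that do not fit into the budget simply stay in the array, much like unprocessed queue items wait for the next Cron run.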

If we now save an unpublished node, it will most likely become published at the next Cron run.

The manual worker

Let’s also make it possible for the Queue to be processed manually. First, let’s adapt the hook_entity_insert() implementation from before and change this line:

$queue = $queue_factory->get('cron_node_publisher');

to this:

$queue = $queue_factory->get('manual_node_publisher');

You can of course provide an admin screen for configuring which type of node publisher the application should use.
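As a rough sketch of that idea, the queue name could be derived from a stored setting. Everything here is an assumption made for illustration: the npq_sketch_queue_name() helper, the settings array and its publisher key are not part of the module.

```php
<?php

/**
 * Hypothetical helper: picks the queue name from a settings array.
 *
 * Falls back to the Cron queue when the setting is missing or unknown.
 */
function npq_sketch_queue_name(array $settings) {
  $allowed = ['cron_node_publisher', 'manual_node_publisher'];
  $name = isset($settings['publisher']) ? $settings['publisher'] : '';
  return in_array($name, $allowed, TRUE) ? $name : 'cron_node_publisher';
}

// The array would typically hold whatever the admin screen saved.
$queue_name = npq_sketch_queue_name(['publisher' => 'manual_node_publisher']);
```

Inside the hook, the returned name would then be passed to $queue_factory->get() in place of the hard-coded string.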

Second, let’s create our ManualNodePublisher plugin:

namespace Drupal\npq\Plugin\QueueWorker;

/**
 * A Node Publisher that publishes nodes via a manual action triggered by an admin.
 *
 * @QueueWorker(
 *   id = "manual_node_publisher",
 *   title = @Translation("Manual Node Publisher"),
 * )
 */
class ManualNodePublisher extends NodePublishBase {}

This is almost the same as with the CRON example but without the cron key.

Third, let’s create a form where we can see how many items are in the manual_node_publisher queue and process them all at the press of a button. Inside npq.routing.yml in the module root folder:

demo.form:
  path: '/npq'
  defaults:
    _form: '\Drupal\npq\Form\NodePublisherQueueForm'
    _title: 'Node Publisher'
  requirements:
    _permission: 'administer site configuration'

We define a route at /npq that renders the form class referenced by the _form key. We can define that class as follows:

/**
 * @file
 * Contains \Drupal\npq\Form\NodePublisherQueueForm.
 */

namespace Drupal\npq\Form;

use Drupal\Core\Form\FormBase;
use Drupal\Core\Form\FormStateInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\QueueInterface;
use Drupal\Core\Queue\QueueWorkerInterface;
use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\Core\Queue\SuspendQueueException;
use Symfony\Component\DependencyInjection\ContainerInterface;

class NodePublisherQueueForm extends FormBase {

  /**
   * @var QueueFactory
   */
  protected $queueFactory;

  /**
   * @var QueueWorkerManagerInterface
   */
  protected $queueManager;


  /**
   * {@inheritdoc}
   */
  public function __construct(QueueFactory $queue, QueueWorkerManagerInterface $queue_manager) {
    $this->queueFactory = $queue;
    $this->queueManager = $queue_manager;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container) {
    return new static(
      $container->get('queue'),
      $container->get('plugin.manager.queue_worker')
    );
  }
  
  /**
   * {@inheritdoc}.
   */
  public function getFormId() {
    return 'demo_form';
  }
  
  /**
   * {@inheritdoc}.
   */
  public function buildForm(array $form, FormStateInterface $form_state) {
    /** @var QueueInterface $queue */
    $queue = $this->queueFactory->get('manual_node_publisher');

    $form['help'] = array(
      '#type' => 'markup',
      '#markup' => $this->t('Submitting this form will process the Manual Queue which contains @number items.', array('@number' => $queue->numberOfItems())),
    );
    $form['actions']['#type'] = 'actions';
    $form['actions']['submit'] = array(
      '#type' => 'submit',
      '#value' => $this->t('Process queue'),
      '#button_type' => 'primary',
    );
    
    return $form;
  }
  
  /**
   * {@inheritdoc}
   */
  public function submitForm(array &$form, FormStateInterface $form_state) {
    /** @var QueueInterface $queue */
    $queue = $this->queueFactory->get('manual_node_publisher');
    /** @var QueueWorkerInterface $queue_worker */
    $queue_worker = $this->queueManager->createInstance('manual_node_publisher');

    while ($item = $queue->claimItem()) {
      try {
        $queue_worker->processItem($item->data);
        $queue->deleteItem($item);
      }
      catch (SuspendQueueException $e) {
        $queue->releaseItem($item);
        break;
      }
      catch (\Exception $e) {
        watchdog_exception('npq', $e);
      }
    }
  }
}

We are again using dependency injection to inject the QueueFactory and the manager for QueueWorker plugins. Inside buildForm() we are creating a basic form structure and using the numberOfItems() method on the queue to tell the user how many items they are about to process. And finally, inside the submitForm() method we take care of the processing. But how do we do that?

First, we load the Queue and instantiate a Queue worker (in both cases we use the manual_node_publisher id). Then we run a while loop until all the items have been processed. The claimItem() method is responsible for blocking a queue item from being claimed by another worker and returning it for processing. After it gets processed by the worker, we delete it. In the next iteration, the next item is returned, and so on until no items are left.

Although we have not used it, the SuspendQueueException is meant to indicate that during the processing of the item, the worker found a problem that would most likely make all other items in the queue fail as well. For this reason it is pointless to continue to the next item, so we break out of the loop. However, we also release the item so that when we try again later, the item is available. Other exceptions are also caught and logged to the watchdog. In that case the item is neither deleted nor released, so it stays claimed until its lease expires.

Now if we create a couple of nodes and don’t publish them, we’ll see their count inside the message if we navigate to /npq. By clicking the submit button we process (publish) them all one by one.

This has been a demonstration example only. It’s always important to take into account the potential load of processing a large number of items and either limit that so your request doesn’t time out or use the Batch API to split them into multiple requests.
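One way to cap the work done per request is to stop after a fixed number of items. The helper below is a minimal sketch under stated assumptions: npq_sketch_process_limited() is a hypothetical name, it relies only on the claimItem(), deleteItem() and processItem() methods used in the form above, and it leaves out the exception handling shown there.

```php
<?php

/**
 * Hypothetical helper: processes at most $limit items from a queue.
 *
 * @param object $queue
 *   Any object with claimItem() and deleteItem(), such as a Drupal queue.
 * @param object $worker
 *   Any object with processItem(), such as a Drupal queue worker.
 * @param int $limit
 *   The maximum number of items to process in this call.
 *
 * @return int
 *   The number of items processed.
 */
function npq_sketch_process_limited($queue, $worker, $limit) {
  $processed = 0;
  // Check the limit first so no item is claimed without being processed.
  while ($processed < $limit && ($item = $queue->claimItem())) {
    $worker->processItem($item->data);
    $queue->deleteItem($item);
    $processed++;
  }
  return $processed;
}
```

Inside submitForm() this could be called as npq_sketch_process_limited($queue, $queue_worker, 50), where 50 is an arbitrary example value. Items beyond the limit stay in the queue for the next submission.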

Conclusion

In this article we’ve looked at the Queue API in Drupal 8. We’ve learned some basic concepts about how it is built and how it works, but we’ve also seen some examples of how we can work with it. Namely, we’ve played with two use cases by which we can publish unpublished nodes either during Cron runs or manually via an action executed by the user.

Have you tried out the Queue API in Drupal 8? Let us know how it went!

Frequently Asked Questions (FAQs) about Drupal 8 Queue API

What is the Drupal 8 Queue API and why is it important?

The Drupal 8 Queue API is a powerful tool that allows developers to handle tasks that might take too long to execute during a single page request. It provides a way to defer these tasks and process them later, either manually or automatically using cron. This is particularly useful for tasks such as sending bulk emails, processing large amounts of data, or performing complex calculations. By using the Queue API, you can ensure that these tasks are completed efficiently and without impacting the user experience.

How do I create a queue in Drupal 8?

A queue in Drupal 8 is obtained by name from the queue factory service, for example with the ‘get’ method used earlier in this article. To process its items, you define a queue worker plugin whose ID matches the queue name. You can define a queue worker plugin by creating a new file in your module’s ‘src/Plugin/QueueWorker’ directory. The file should contain a class that extends ‘QueueWorkerBase’ and implements the ‘processItem’ method.

How can I add items to a queue?

You can add items to a queue using the ‘createItem’ method of the queue object returned by the queue factory service. This method takes a single argument, which is the data to be added to the queue. The data can be any PHP variable that can be serialized, such as an array or an object.

How do I process items in a queue?

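Items in a queue can be processed manually or automatically using cron. To process items manually, claim each item with the ‘claimItem’ method of the queue object, pass its data to the ‘processItem’ method of the queue worker, and remove it with ‘deleteItem’. To process items automatically, add the ‘cron’ key to the queue worker plugin’s annotation. Cron then claims and processes the queue’s items for up to the configured number of seconds on each run.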

What happens if an error occurs while processing a queue item?

If the worker throws an exception while processing a queue item, the item is not deleted, so it remains in the queue. Depending on how the processing code handles the exception, the item is either released right away or becomes available again once its lease expires. You can also catch exceptions in your queue worker’s ‘processItem’ method and handle them as needed.

Can I prioritize items in a queue?

The Drupal 8 Queue API does not support prioritizing items in a queue. However, you can create multiple queues and process them in the order of their priority.
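The multiple-queue approach can be sketched in plain PHP. This is an illustration only: PHP’s built-in SplQueue stands in for Drupal queues, and the queue names npq_high and npq_normal are made up.

```php
<?php

// Hypothetical queues, listed from highest to lowest priority.
$queues = [
  'npq_high' => new \SplQueue(),
  'npq_normal' => new \SplQueue(),
];

// Items can arrive in any order across the queues.
$queues['npq_normal']->enqueue(3);
$queues['npq_high']->enqueue(1);
$queues['npq_high']->enqueue(2);

// Drain each queue completely before moving on to the next one.
$order = [];
foreach ($queues as $name => $queue) {
  while (!$queue->isEmpty()) {
    $order[] = $queue->dequeue();
  }
}
```

Here $order ends up as 1, 2, 3: both high-priority items are handled before the normal one, even though the normal item was queued first.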

How can I monitor the status of a queue?

You can monitor the status of a queue using the ‘numberOfItems’ method of the queue object, which returns the number of items in the queue. Depending on the queue backend, this number may only be a best guess. The ‘createQueue’ method is not a monitoring tool. It only ensures that the queue exists.

Can I use the Queue API with Drupal 7?

Yes, the Queue API is available in Drupal 7. However, there are some differences in the API between Drupal 7 and Drupal 8. For example, in Drupal 7, you declare queues that cron should process by implementing hook_cron_queue_info(), whereas in Drupal 8, you define a queue worker plugin.

What are some use cases for the Queue API?

The Queue API can be used in a variety of scenarios where tasks need to be deferred and processed later. Some examples include sending bulk emails, processing large amounts of data, performing complex calculations, and integrating with third-party APIs.

Are there any modules that extend the functionality of the Queue API?

Yes, there are several modules that extend the functionality of the Queue API. For example, the Queue UI module provides a user interface for managing queues, and the Advanced Queue module provides additional features such as job tracking and failure handling.

Daniel Sipos

Daniel Sipos is a Drupal developer who lives in Brussels, Belgium. He works professionally with Drupal but likes to use other PHP frameworks and technologies as well. He runs webomelette.com, a Drupal blog where he writes articles and tutorials about Drupal development, theming and site building.
