PHP
Article

Drupal 8 Queue API – Powerful Manual and Cron Queueing

By Daniel Sipos

Drupal 8 logo

The Queue API in Drupal allows us to handle a number of tasks at a later stage. What this means is that we can place items into a queue which will run some time in the future and process each individual item at that point and at least once. Usually, this happens on CRON runs, and Drupal 8 allows for a quick set up for cronjob based queues. It doesn’t necessarily have to be CRON, however.

In this article, we will look at using the Queue API in Drupal 8 by exploring two simple examples. The first will see the queue triggered by Cron while the second will allow us to manually do so ourselves. However, the actual processing will be handled by a similar worker. If you want to follow along, clone this git repository where you can find the npq module we will write in this article.

The module we’ll work with is called Node Publisher Queue and it automatically adds newly created nodes that are saved unpublished to a queue to be published later on. We will see how later on can be the next CRON run or a manual action triggered by the site’s administrator. First, let’s understand some basic concepts about queues in Drupal 8.

The theory

There are a few components that make up the Queue API in Drupal 8.

The most important role in this API is played by the QueueInterface implementation which represents the queue. The default queue type Drupal 8 ships with is currently the DatabaseQueue which is a type of reliable queue that makes sure all its items are processed at least once and in their original order (FIFO). This is in contrast to unreliable queues which only do their best to achieve this (something for which valid use cases do exist).

The typical role of the queue object is to create items, later claim them from the queue and delete them when they have been processed. In addition, it can release items if processing is either not finished or another worker needs to process them again before deletion.

The QueueInterface implementation is instantiated with the help of a general QueueFactory. In the case of the DatabaseQueue, the former uses the DatabaseQueueFactory as well. Queues also need to be created before they can be used. However, the DatabaseQueue is already created when Drupal is first installed so no additional setup is required.

The Queue Workers are responsible for processing queue items as they receive them. In Drupal 8 these are QueueWorker plugins that implement the QueueWorkerInterface. Using the QueueWorkerManager, we create instances of these plugins and process the items whenever the queue needs to be run.

The Node Publish Queue module

Now that we’ve covered the basic concepts of the Queue API in Drupal 8, let’s get our hands dirty and create the functionality described in the introduction. Our npq.info.yml file can be simple:

name: Node Publish Queue
description: Demo module illustrating the Queue API in Drupal 8
core: 8.x
type: module

Queue item creation

Inside the npq.module file we take care of the logic for creating queue items whenever a node is saved and not published:

use Drupal\Core\Entity\EntityInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\QueueInterface;

/**
 * Implements hook_entity_insert().
 */
function npq_entity_insert(EntityInterface $entity) {
  if ($entity->getEntityTypeId() !== 'node') {
    return;
  }

  if ($entity->isPublished()) {
    return;
  }

  /** @var QueueFactory $queue_factory */
  $queue_factory = \Drupal::service('queue');
  /** @var QueueInterface $queue */
  $queue = $queue_factory->get('cron_node_publisher');
  $item = new \stdClass();
  $item->nid = $entity->id();
  $queue->createItem($item);
}

Inside this basic hook_entity_insert() implementation we do a very simple task. We first retrieve the QueueFactoryInterface object from the service container and use it to get a queue called cron_node_publisher. If we track things down, we notice that the get() method on the DatabaseQueueFactory simply creates a new DatabaseQueue instance with the name we pass to it.

Lastly, we create a small PHP object containing the node ID and create an item in the queue with that data. Simple.

The CRON queue worker

Next, let’s create a QueueWorker plugin that will process the queue items whenever Cron is run. However, because we know that we will also need one for manual processing that does the same thing, we will add most of the logic in a base abstract class. So inside the Plugin/QueueWorker namespace of our module we can have the NodePublishBase class:

/**
 * @file
 * Contains Drupal\npq\Plugin\QueueWorker\NodePublishBase.php
 */

namespace Drupal\npq\Plugin\QueueWorker;

use Drupal\Core\Entity\EntityStorageInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\node\NodeInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;


/**
 * Provides base functionality for the NodePublish Queue Workers.
 */
abstract class NodePublishBase extends QueueWorkerBase implements ContainerFactoryPluginInterface {

  /**
   * The node storage.
   *
   * @var \Drupal\Core\Entity\EntityStorageInterface
   */
  protected $nodeStorage;

  /**
   * Creates a new NodePublishBase object.
   *
   * @param \Drupal\Core\Entity\EntityStorageInterface $node_storage
   *   The node storage.
   */
  public function __construct(EntityStorageInterface $node_storage) {
    $this->nodeStorage = $node_storage;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $container->get('entity.manager')->getStorage('node')
    );
  }

  /**
   * Publishes a node.
   *
   * @param NodeInterface $node
   * @return int
   */
  protected function publishNode($node) {
    $node->setPublished(TRUE);
    return $node->save();
  }

  /**
   * {@inheritdoc}
   */
  public function processItem($data) {
    /** @var NodeInterface $node */
    $node = $this->nodeStorage->load($data->nid);
    if (!$node->isPublished() && $node instanceof NodeInterface) {
      return $this->publishNode($node);
    }
  }
}

Right off the bat we can see that we are using dependency injection to inject the NodeStorage into our class. For more information about dependency injection and the service container, feel free to check out my article on the topic.

In this base class we have two methods: publishNode() and the obligatory processItem(). The former publishes and saves a node that is passed to it. The latter loads the node using the node ID contained in the $data object and publishes it if it’s unpublished.

Now, let’s create a CronNodePublisher plugin that will use this logic on Cron runs:

namespace Drupal\npq\Plugin\QueueWorker;

/**
 * A Node Publisher that publishes nodes on CRON run.
 *
 * @QueueWorker(
 *   id = "cron_node_publisher",
 *   title = @Translation("Cron Node Publisher"),
 *   cron = {"time" = 10}
 * )
 */
class CronNodePublisher extends NodePublishBase {}

And that is all. We don’t need any other logic than what already is in our base class. Notice that, in the annotation, we are telling Drupal that this worker needs to be used by Cron to process as many items as it can within 10 seconds. How does this happen?

Whenever Cron runs, it uses the QueueWorkerManager to load all its plugin definitions. Then, if any of them have the cron key in their annotation, a Queue with the same name as the ID of the worker is loaded for processing. Lastly, each item in the queue is claimed and processed by the worker until the specified time has elapsed.

If we now save an unpublished node, it will most likely become published at the next Cron run.

The manual worker

Let’s create also the possibility for the Queue to be processed manually. First, let’s adapt the hook_entity_insert() implementation from before and change this line:

$queue = $queue_factory->get('cron_node_publisher');

to this:

$queue = $queue_factory->get('manual_node_publisher');

You can of course provide an admin screen for configuring which type of node publisher the application should use.

Second, let’s create our ManualNodePublisher plugin:

namespace Drupal\npq\Plugin\QueueWorker;

/**
 * A Node Publisher that publishes nodes via a manual action triggered by an admin.
 *
 * @QueueWorker(
 *   id = "manual_node_publisher",
 *   title = @Translation("Manual Node Publisher"),
 * )
 */
class ManualNodePublisher extends NodePublishBase {}

This is almost the same as with the CRON example but without the cron key.

Third, let’s create a form where we can see how many items are in the manual_node_publisher queue and process them all by the press of a button. Inside npq.routing.yml in the module root folder:

demo.form:
  path: '/npq'
  defaults:
    _form: '\Drupal\npq\Form\NodePublisherQueueForm'
    _title: 'Node Publisher'
  requirements:
    _permission: 'administer site configuration'

We define a path at /npq which should use the specified form that lives in that namespace and that we can define as such:

/**
 * @file
 * Contains \Drupal\npq\Form\NodePublisherQueueForm.
 */

namespace Drupal\npq\Form;

use Drupal\Core\Form\FormBase;
use Drupal\Core\Form\FormStateInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\QueueInterface;
use Drupal\Core\Queue\QueueWorkerInterface;
use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\Core\Queue\SuspendQueueException;
use Symfony\Component\DependencyInjection\ContainerInterface;

class NodePublisherQueueForm extends FormBase {

  /**
   * @var QueueFactory
   */
  protected $queueFactory;

  /**
   * @var QueueWorkerManagerInterface
   */
  protected $queueManager;


  /**
   * {@inheritdoc}
   */
  public function __construct(QueueFactory $queue, QueueWorkerManagerInterface $queue_manager) {
    $this->queueFactory = $queue;
    $this->queueManager = $queue_manager;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container) {
    return new static(
      $container->get('queue'),
      $container->get('plugin.manager.queue_worker')
    );
  }
  
  /**
   * {@inheritdoc}.
   */
  public function getFormId() {
    return 'demo_form';
  }
  
  /**
   * {@inheritdoc}.
   */
  public function buildForm(array $form, FormStateInterface $form_state) {
    /** @var QueueInterface $queue */
    $queue = $this->queueFactory->get('node_publisher');

    $form['help'] = array(
      '#type' => 'markup',
      '#markup' => $this->t('Submitting this form will process the Manual Queue which contains @number items.', array('@number' => $queue->numberOfItems())),
    );
    $form['actions']['#type'] = 'actions';
    $form['actions']['submit'] = array(
      '#type' => 'submit',
      '#value' => $this->t('Process queue'),
      '#button_type' => 'primary',
    );
    
    return $form;
  }
  
  /**
   * {@inheritdoc}
   */
  public function submitForm(array &$form, FormStateInterface $form_state) {
    /** @var QueueInterface $queue */
    $queue = $this->queueFactory->get('manual_node_publisher');
    /** @var QueueWorkerInterface $queue_worker */
    $queue_worker = $this->queueManager->createInstance('manual_node_publisher');

    while($item = $queue->claimItem()) {
      try {
        $queue_worker->processItem($item->data);
        $queue->deleteItem($item);
      }
      catch (SuspendQueueException $e) {
        $queue->releaseItem($item);
        break;
      }
      catch (\Exception $e) {
        watchdog_exception('npq', $e);
      }
    }
  }
}

We are again using dependency injection to inject the QueueFactory and the manager for QueueWorker plugins. Inside buildForm() we are creating a basic form structure and using the numberOfItems() method on the queue to tell the user how many items they are about to process. And finally, inside the submitForm() method we take care of the processing. But how do we do that?

First, we load the Queue and instantiate a Queue worker (in both cases we use the manual_node_publisher id). Then we run a while loop until all the items have been processed. The claimItem() method is responsible for blocking a queue item from being claimed by another queue and returning it for processing. After it gets processed by the worker, we delete it. In the next iteration, the next item is returned and on like this until no items are left.

Although we have not used it, the SuspendQueueException is meant to indicate that during the processing of the item, the worker found a problem that would most likely make all other items in the queue fail as well. And for this reason it is pointless to continue to the next item so we break out of the loop. However, we also release the item so that when we try again later, the item is available. Other exceptions are also caught and logged to the watchdog.

Now if we create a couple of nodes and don’t publish them, we’ll see their count inside the message if we navigate to /npq. By clicking the submit button we process (publish) them all one by one.

This has been a demonstration example only. It’s always important to take into account the potential load of processing a large number of items and either limit that so your request doesn’t time out or use the Batch API to split them into multiple requests.

Conclusion

In this article we’ve looked at the Queue API in Drupal 8. We’ve learned some basic concepts about how it is built and how it works, but we’ve also seen some examples of how we can work with it. Namely, we’ve played with two use cases by which we can publish unpublished nodes either during Cron runs or manually via an action executed by the user.

Have you tried out the Queue API in Drupal 8? Let us know how it went!

Free Guide:

7 Habits of Successful CTOs

"What makes a great CTO?" Engineering skills? Business savvy? An innate tendency to channel a mythical creature (ahem, unicorn)? All of the above? Discover the top traits of the most successful CTOs in this free guide.

  • http://cashpath20.com SherylJHaddock

    .❝my neighbor’s mother is making $98 HOURLY on the
    internet❞….

    A few days ago new McLaren F1 subsequent after earning 18,512$,,,this was my
    previous month’s paycheck ,and-a little over, $17k Last month ..3-5 h/r of work a day ..with extra
    open doors & weekly paychecks.. it’s realy the
    easiest work I have ever Do.. I Joined This 7 months ago and now making over
    $87, p/h.

    Learn More right Here….website on my PrroFile

    +rrrrrrrrrrrr

  • Matthew Oliveira

    Great article and much needed. I poked around in examples module and helped clean up the queue example, and it’s not very well documented. Kudos!

    In your last code example, inside the buildForm method, should this line:

    $queue = $this->queueFactory->get(‘node_publisher’);

    be:

    $queue = $this->queueFactory->get(‘manual_node_publisher’);

    • ARUN AK

      In D8 batch process replaced with any other new term(queue)? Can you provide any example link for batch process implementation in D8?

  • http://www.cygnet-infotech.com/ Hemang Rindani

    Drupal is a great enterprise content management system for developing sophisticated, flexible and robust websites and powerful applications. Drupal is developed and maintained by thousands of developers and contributors from across the globe making it a great CMS. Flexibility of the platform allows a CMS developer to modify modules and themes to match the business requirements. The latest version 8 has some significant changes mainly focused towards providing a better User Experience. One such feature is the inclusion of CKEditor that provides functionalities like word document to edit content. Another notable change is the addition of responsive image manager that scales an image to work well with any device. Cron Queue in Drupal 8 allows a developer to prioritize the processes. It helps in making the queue of the processes that will run sometime in future but are essential part of a business.

  • Regina

    Thank You very much for this exampel.
    I tried it with a few unpublished nodes and the Module did it’s job wunderfull.
    But no suddently it doesn’t goes in the function npq_entity_insert in the npq.module file.
    I dont know why. There are the unpublsihed nodes and i did not change anything in the module.
    Any Idea what goes wrong?

    • Regina

      Sorry, a stupid question. The Hook will save the Item in Queue when saving a new node.
      Ok…I have to find an other hook for my Module.

Recommended
Sponsors
Because We Like You
Free Ebooks!

Grab SitePoint's top 10 web dev and design ebooks, completely free!

Get the latest in PHP, once a week, for free.