Scheduling with Zend Job Queue

Web applications usually follow a synchronous communication model. However, non-interactive and long-running tasks (such as report generation) are better suited for asynchronous execution. One way to off-load tasks to run at a later time, or even on a different server, is to use the Job Queue module available as part of Zend Server 5 (though not as part of the Community Edition).

Job Queue allows job scheduling based on time, priority, and even dependencies. Jobs can be deferred or executed periodically and – the best part – run in parallel! On top of that, Zend Server itself provides a management GUI to track the execution of jobs including their status, execution time, and output.

The main advantage of the Job Queue module is its ability to execute tasks in parallel. Unlike cron jobs, Job Queue allows:

  • Running tasks now without waiting for them to finish (asynchronous execution)
  • Running tasks once but not right now (deferred jobs)
  • Running tasks periodically (recurring jobs like cron, but with full control over them through the PHP API – start, stop, suspend, resume)
  • Querying job status, handling failures, and re-queuing via the API, as well as keeping track of past, current, and pending jobs from the GUI

Some examples of asynchronous tasks where Job Queue can be helpful are:

  • Preparing data for the next request (pre-calculating)
  • Pre-caching data
  • Generating periodical reports
  • Sending e-mails
  • Cleaning temporary data or files
  • Communicating with external systems
  • Background data synchronization with mobile devices

Job Queue Usage

Job Queue’s API is made available through the ZendJobQueue class. To perform most tasks you will connect to a Job Queue server by instantiating a ZendJobQueue object and create a job using the createHttpJob() method.

<?php
$queue = new ZendJobQueue();
$queue->createHttpJob("http://example.com/jobs/somejob.php");

Passing a path to createHttpJob() instead of a full URL will create a job with the value of $_SERVER["HTTP_HOST"] as the host name. Watch out for instances when $_SERVER["HTTP_HOST"] is not available, such as when the job is scheduled from a cron script.

<?php
// both calls are equivalent
$queue->createHttpJob("/jobs/somejob.php");
$queue->createHttpJob("http://" . $_SERVER["HTTP_HOST"] . "/jobs/somejob.php");
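
When jobs may also be scheduled from a cron or command-line script, one way to sidestep the missing $_SERVER["HTTP_HOST"] is to always build the full URL yourself. The following is only a sketch: buildJobUrl() and the fallback host are hypothetical names, not part of the Job Queue API.

```php
<?php
// Hypothetical helper (not part of the Job Queue API): builds an absolute job URL
// without relying on $_SERVER["HTTP_HOST"] being set.
function buildJobUrl($path, $fallbackHost, $scheme = "http")
{
    // prefer the host of the current request; fall back to a configured host (CLI, cron)
    $host = !empty($_SERVER["HTTP_HOST"]) ? $_SERVER["HTTP_HOST"] : $fallbackHost;

    return $scheme . "://" . $host . "/" . ltrim($path, "/");
}

// the resulting string can be passed to createHttpJob() as the job URL
$url = buildJobUrl("/jobs/somejob.php", "example.com");
```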

Job parameters can be passed either as a part of the query string or in an array as the second argument of createHttpJob(). If parameters are passed as the second argument, the array must be JSON compatible.

To access the parameters, the getCurrentJobParams() static method can be used inside the job code.

<?php
$params = ZendJobQueue::getCurrentJobParams();
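
Since parameters passed as an array must be JSON compatible, it can be useful to check them before scheduling. A minimal sketch follows, assuming that "JSON compatible" means the array survives a JSON round trip unchanged; isJsonCompatible() is a hypothetical helper, not a Job Queue method.

```php
<?php
// Hypothetical helper: returns true if $params survives a JSON round trip unchanged.
function isJsonCompatible(array $params)
{
    $json = json_encode($params);
    if ($json === false) {
        return false; // e.g. invalid UTF-8 strings or unsupported types
    }

    // objects come back as plain arrays, so they fail this comparison
    return json_decode($json, true) === $params;
}

$ok = isJsonCompatible(array("p1" => 10, "p2" => "somevalue"));
```

Scalars, strings, and nested arrays pass the check; objects do not, because they lose their class on the way back.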

Additional job options are available via a third argument to createHttpJob(). It is an associative array with the following keys:

  • name – an optional job name
  • priority – the job priority as defined by the corresponding constants PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH, and PRIORITY_URGENT
  • persistent – a Boolean value indicating whether to keep the job in history forever
  • predecessor – an integer predecessor job ID
  • http_headers – additional HTTP headers
  • schedule – cron-like scheduling command
  • schedule_time – time when the job should be executed (but it may actually run after that time depending on Job Queue’s load)
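
To illustrate the predecessor option, here is a rough sketch that schedules a series of jobs so that each one names the previous job as its predecessor. RecordingQueue is a hypothetical stand-in that only records calls, so the example runs without Zend Server; in a real application you would pass a ZendJobQueue instance instead. createJobChain() is likewise a made-up helper.

```php
<?php
// Hypothetical stand-in for ZendJobQueue so the sketch runs without Zend Server.
// It only records what it is asked to schedule and hands out increasing job IDs.
class RecordingQueue
{
    public $jobs = array();

    public function createHttpJob($url, $params = array(), $options = array())
    {
        $id = count($this->jobs) + 1;
        $this->jobs[$id] = array("url" => $url, "params" => $params, "options" => $options);
        return $id;
    }
}

// Schedules the given URLs so that each job names the previous one as its predecessor.
function createJobChain($queue, array $urls, array $params = array())
{
    $jobIDs = array();
    $previousID = null;

    foreach (array_values($urls) as $index => $url) {
        $options = array("name" => "chain-step-" . ($index + 1));
        if ($previousID !== null) {
            $options["predecessor"] = $previousID;
        }

        $jobID = $queue->createHttpJob($url, $params, $options);
        if ($jobID === false) {
            break; // stop here: later steps would have nothing to depend on
        }

        $jobIDs[] = $jobID;
        $previousID = $jobID;
    }

    return $jobIDs;
}

$queue = new RecordingQueue(); // with Zend Server: new ZendJobQueue()
$ids = createJobChain($queue, array(
    "http://example.com/jobs/export.php",
    "http://example.com/jobs/notify.php",
));
```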

For example, creating a deferred or recurring job would look like the examples below:

<?php
$params = array("p1" => 10, "p2" => "somevalue");

// process in one hour
$options = array("schedule_time" => date("Y-m-d H:i:s", strtotime("+1 hour")));
$queue->createHttpJob("http://example.com/jobs/somejob.php", $params, $options);

// process every other day at 1:05 am
$options = array("schedule" => "5 1 */2 * *");
$queue->createHttpJob("http://example.com/jobs/somejob.php", $params, $options);
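
If you build these option arrays in several places, small helpers can keep the date format and cron string consistent. The sketch below is one possible approach; deferredJobOptions() and recurringJobOptions() are hypothetical helpers, and the cron string assumes the usual field order (minute, hour, day of month, month, day of week).

```php
<?php
// Hypothetical helper: options for a job deferred by a relative delay such as "+1 hour".
// $now is an injectable Unix timestamp so the result can be checked deterministically.
function deferredJobOptions($delay, $now = null)
{
    $now = ($now === null) ? time() : $now;
    $timestamp = strtotime($delay, $now);
    if ($timestamp === false || $timestamp < $now) {
        throw new InvalidArgumentException("Delay must be a valid future offset: " . $delay);
    }

    // same date format as the schedule_time example above
    return array("schedule_time" => date("Y-m-d H:i:s", $timestamp));
}

// Hypothetical helper: options for a job recurring every $everyDays days at $hour:$minute.
function recurringJobOptions($minute, $hour, $everyDays)
{
    return array("schedule" => sprintf("%d %d */%d * *", $minute, $hour, $everyDays));
}

$deferred  = deferredJobOptions("+1 hour");
$recurring = recurringJobOptions(5, 1, 2); // "5 1 */2 * *", as in the example above
```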

Failures (and successes) can be handled in the following manner:

<?php
try {
    doSomething();
    ZendJobQueue::setCurrentJobStatus(ZendJobQueue::OK);
}
catch (Exception $e) {
    ZendJobQueue::setCurrentJobStatus(ZendJobQueue::STATUS_LOGICALLY_FAILED, $e->getMessage());
}
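
The same pattern can be wrapped in a reusable function so that every job script reports its outcome the same way. This is only a sketch: runAndReport() and the reporter callables are hypothetical, and the reporter that talks to Job Queue simply does nothing when the ZendJobQueue class is not available.

```php
<?php
// Runs $task and passes the outcome to $report as (bool $ok, string $message).
// Both callables are hypothetical; they keep the sketch runnable without Zend Server.
function runAndReport($task, $report)
{
    $ok = true;
    $message = "";

    try {
        call_user_func($task);
    } catch (Exception $e) {
        $ok = false;
        $message = $e->getMessage();
    }

    call_user_func($report, $ok, $message);
    return $ok;
}

// Reporter that forwards to Job Queue, using the same calls as the example above.
$zendReporter = function ($ok, $message) {
    if (!class_exists("ZendJobQueue")) {
        return; // not running under Zend Server: nothing to report to
    }
    if ($ok) {
        ZendJobQueue::setCurrentJobStatus(ZendJobQueue::OK);
    } else {
        ZendJobQueue::setCurrentJobStatus(ZendJobQueue::STATUS_LOGICALLY_FAILED, $message);
    }
};
```

Inside a job script, a call such as runAndReport("doSomething", $zendReporter) would then replace the try/catch block.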

An Extended Example

Let’s say your web application has to generate and e-mail a set of reports upon a user’s request.

Typically, because a PHP script handles a request in a single process and a synchronous communication model is used, the user has to wait until all of the requested reports are generated one by one and e-mailed.

Using Job Queue in this case not only lets the user perform other actions in the application (since the work is done asynchronously), it also lets the application process multiple reports at the same time (since jobs can be executed in parallel), so most of the reports (if not all) will finish at about the same time.

<?php
function scheduleReport($reportList, $recipient) {
    // list of scheduled jobs
    $jobList = array();

    $queue = new ZendJobQueue();

    // check that Job Queue is running
    if ($queue->isJobQueueDaemonRunning() && count($reportList) > 0) {
        foreach ($reportList as $report) {
            $params = array("type" => $report["type"],
                            "start" => $report["start"],
                            "length" => $report["length"],
                            "recipient" => $recipient);
            $options = array("priority" => $report["priority"]);

            // execute the job in two minutes unless the priority is urgent
            if ($report["priority"] != ZendJobQueue::PRIORITY_URGENT) {
                $options["schedule_time"] = date("Y-m-d H:i:s", strtotime("+2 minutes"));
            }

            $jobID = $queue->createHttpJob("http://example.com/jobs/report.php", $params, $options);

            // add job ID to the list of successfully scheduled jobs
            if ($jobID !== false) {
                $jobList[] = $jobID;
            }
        }
    }

    return $jobList;
}

The scheduleReport() function returns the list of job identifiers associated with each scheduled report. Within this function, the isJobQueueDaemonRunning() method of the ZendJobQueue class verifies that the appropriate service is running and that jobs can be scheduled.

Depending on the report’s priority, the job can be scheduled to run immediately or two minutes later (in an effort to reduce the load on the server if many reports are requested at the same time). Once a job is scheduled, its ID is saved to the list of all successfully created jobs. It’s important to know a job’s ID in order to be able to monitor the job or even cancel it.

Here’s what the call to the scheduleReport() function looks like:

<?php
// setup request for a daily sales report and monthly financial report
$reportList = array(
    array("type" => "sales",
          "start" => "2011-12-09 00:00:00",
          "length" => 1,
          "priority" => ZendJobQueue::PRIORITY_URGENT),
    array("type" => "finance",
          "start" => "2011-11-01 00:00:00",
          "length" => 30,
          "priority" => ZendJobQueue::PRIORITY_NORMAL));

// schedule reports
$jobList = scheduleReport($reportList, "user@example.com");

// verify that reports were scheduled
if (empty($jobList)) {
    // show error message
}

As mentioned earlier, it is also possible to cancel a scheduled job. Once the job is in progress, though, it will run to completion. Thus, if the priority of the request is not urgent, the user has two minutes to cancel the delivery of the scheduled report.

<?php
function cancelReport($jobID) {
    $queue = new ZendJobQueue();
    return $queue->removeJob($jobID);
}

if ($jobID !== false && cancelReport($jobID)) {
    // the job was successfully removed from the queue
}

The cancelReport() function simply removes the job from the queue of scheduled reports which have not yet started to run.

The job script then looks like this:

<?php
function runReport() {
    // read the parameters passed to createHttpJob()
    $params = ZendJobQueue::getCurrentJobParams();

    try {
        $report = prepareReport($params["type"], $params["start"], $params["length"]);
        sendReport($params["recipient"], $report);
        ZendJobQueue::setCurrentJobStatus(ZendJobQueue::OK);
    }
    catch (Exception $e) {
        ZendJobQueue::setCurrentJobStatus(ZendJobQueue::STATUS_LOGICALLY_FAILED, $e->getMessage());
    }
}

The runReport() function finally prepares and sends the report based on provided parameters. After completion, the job status is set as successful (or logically failed if there was an error).

Alternatives

Of course, there are alternatives to Job Queue. Solutions like cron, pcntl_fork, or even something Java based via PHP/Java Bridge may or may not be worth looking into depending on your needs. More interesting tools also exist, such as Gearman, node.js, and RabbitMQ.

Summary

While Zend Server’s Job Queue is not the only way to handle queues and parallel processing in PHP, it is an extremely straightforward solution backed by “The PHP Company” and is very easy to start using. And with the growing success of Zend’s PHPCloud, adoption of Job Queue should become even more widespread.

If you want to view the example code from this article in its entirety, you can find it on GitHub.

