By Thomas Punt

Parallel Programming with Pthreads in PHP – the Fundamentals

This article was peer reviewed by Christopher Pitt. Thanks to all of SitePoint’s peer reviewers for making SitePoint content the best it can be!


PHP developers rarely seem to make use of parallelism. The simplicity of synchronous, single-threaded programming is certainly appealing, but a little concurrency can sometimes bring worthwhile performance improvements.

In this article, we will be taking a look at how threading can be achieved in PHP with the pthreads extension. This requires a ZTS (Zend Thread Safety) build of PHP 7.x with pthreads v3 installed. (At the time of writing, PHP 7.1 users will need to install from the master branch of the pthreads repo – see this article’s section for details on building third-party extensions from source.)

Just as a quick clarification: pthreads v2 targets PHP 5.x and is no longer supported; pthreads v3 targets PHP 7.x and is being actively developed.


A big thank you to Joe Watkins (creator of the pthreads extension) for proofreading and helping to improve my article!

When not to use pthreads

Before we move on, I would first like to clarify when you should not (as well as cannot) use the pthreads extension.

In pthreads v2, the recommendation was that pthreads should not be used in a web server environment (e.g. in an FCGI process). As of pthreads v3, this recommendation has been enforced, so now you simply cannot use it in a web server environment. The two prominent reasons for this are:

  1. It is not safe to use multiple threads in such an environment (causing IO issues, amongst other problems).
  2. It does not scale well. For example, let’s say you have a PHP script that creates a new thread to handle some work, and that script is executed upon each request. This means that for each request, your application will create one new thread (this is a 1:1 threading model – one thread to one request). If your application is serving 1,000 requests per second, then it is creating 1,000 threads per second! Having this many threads running on a single machine will quickly inundate it, and the problem will only be exacerbated as the request rate increases.

That’s why threading is not a good solution in such an environment. If you’re looking for threading as a solution to IO-blocking tasks (such as performing HTTP requests), then let me point you in the direction of asynchronous programming, which can be achieved via frameworks such as Amp. SitePoint has released some excellent articles that cover this topic (such as writing asynchronous libraries and Modding Minecraft in PHP), in case you’re interested.
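Since these requirements are easy to get wrong, here is a minimal sketch of a pre-flight check we could run before relying on pthreads. The pthreadsProblems function is a hypothetical helper written for this illustration (it is not part of pthreads). It only uses PHP's built-in PHP_ZTS and PHP_SAPI constants and the extension_loaded function, and it assumes that the command line (CLI) is the only environment pthreads v3 will run in.

```php
<?php
// Hypothetical helper (not part of pthreads): lists the reasons why
// pthreads cannot be used in the current environment.
function pthreadsProblems(): array
{
    $problems = [];

    if (!PHP_ZTS) {
        $problems[] = 'PHP was not built with Zend Thread Safety (ZTS).';
    }

    if (!extension_loaded('pthreads')) {
        $problems[] = 'The pthreads extension is not loaded.';
    }

    // Assumption: pthreads v3 only runs from the command line.
    if (PHP_SAPI !== 'cli') {
        $problems[] = 'The current SAPI is "' . PHP_SAPI . '", not "cli".';
    }

    return $problems;
}

$problems = pthreadsProblems();

if ($problems) {
    echo "pthreads is not usable here:\n - " . implode("\n - ", $problems) . "\n";
} else {
    echo "pthreads looks usable.\n";
}
```

Running this from the command line on a correctly configured machine should report that pthreads looks usable. Anywhere else, it lists what is missing.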

With that out of the way, let’s jump straight into things!

Handling one-off tasks

Sometimes, you will want to handle one-off tasks in a multi-threaded way (such as performing some IO-bound task). In such instances, the Thread class may be used to create a new thread and run some unit of work in that separate thread.

For example:

$task = new class extends Thread {
    public $response; // public so that it can be read from outside the class

    public function run()
    {
        $content = file_get_contents("http://google.com");
        preg_match("~<title>(.+)</title>~", $content, $matches);
        $this->response = $matches[1];
    }
};

$task->start() && $task->join();

var_dump($task->response); // string(6) "Google"

In the above, the run method is our unit of work that will be executed inside of the new thread. When invoking Thread::start, the new thread is spawned and the run method is invoked. We then join the spawned thread back to the main thread (via Thread::join), which will block until the separate thread has finished executing. This ensures that the task has finished executing before we attempt to output the result (stored in $task->response).
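To make the relationship between Thread::start and Thread::join a little more concrete, here is a small sketch that runs two threads side by side and times them. The SleepTask class is a hypothetical stand-in for some slow, blocking work, and the half-second sleep is an arbitrary choice.

```php
<?php
// Hypothetical task: sleeps to stand in for some slow, blocking work.
class SleepTask extends Thread
{
    public $finished = false;

    public function run()
    {
        usleep(500000); // pretend to do half a second of blocking work
        $this->finished = true;
    }
}

$startedAt = microtime(true);

$tasks = [new SleepTask(), new SleepTask()];

foreach ($tasks as $task) {
    $task->start(); // returns straight away; run() executes in the new thread
}

// The main thread is free to do other work here while both tasks sleep.

foreach ($tasks as $task) {
    $task->join(); // blocks until that thread's run() method has returned
}

$elapsed = microtime(true) - $startedAt;

printf("Both tasks finished in %.2f seconds\n", $elapsed);
```

Because both threads sleep at the same time, the reported time should be much closer to half a second than to a full second, with the cost of creating the threads on top.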

It may not be desirable to pollute a class’s responsibility with thread-related logic (including having to define a run method). We are able to segregate such classes by having them extend the Threaded class instead, where they can then be run inside other threads:

class Task extends Threaded
{
    public $response;

    public function someWork()
    {
        $content = file_get_contents('http://google.com');
        preg_match('~<title>(.+)</title>~', $content, $matches);
        $this->response = $matches[1];
    }
}

$task = new Task;

$thread = new class($task) extends Thread {
    private $task;

    public function __construct(Threaded $task)
    {
        $this->task = $task;
    }

    public function run()
    {
        $this->task->someWork();
    }
};

$thread->start() && $thread->join();

var_dump($task->response);

Any class that needs to be run inside of a separate thread must extend the Threaded class in some way. This is because it provides the necessary abilities to run inside different threads, as well as providing implicit safety and useful interfaces (for things like resource synchronization).

Let’s take a quick look at the hierarchy of classes exposed by pthreads:

Threaded (implements Traversable, Collectable)
    Thread
        Worker
    Volatile
Pool

We’ve already seen and learnt the basics about the Thread and Threaded classes, so now let’s take a look at the remaining three (Worker, Volatile, and Pool).

Recycling threads

Spinning up a new thread for every task to be parallelised is expensive. This is because a shared-nothing architecture must be employed by pthreads in order to achieve threading inside PHP. What this means is that the entire execution context of the current instance of PHP’s interpreter (including every class, interface, trait, and function) must be copied for each thread created. Since this incurs a noticeable performance impact, a thread should always be reused when possible. Threads may be reused in two ways: with Workers or with Pools.

The Worker class is used to execute a series of tasks synchronously inside of another thread. This is done by creating a new Worker instance (which creates a new thread), and then stacking the tasks onto that separate thread (via Worker::stack).

Here’s a quick example:

class Task extends Threaded
{
    private $value;

    public function __construct(int $i)
    {
        $this->value = $i;
    }

    public function run()
    {
        usleep(250000);
        echo "Task: {$this->value}\n";
    }
}

$worker = new Worker();
$worker->start();

for ($i = 0; $i < 15; ++$i) {
    $worker->stack(new Task($i));
}

while ($worker->collect());

$worker->shutdown();

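Output (screenshot not reproduced): the tasks are printed one at a time, in the order they were stacked, from Task: 0 to Task: 14.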

The above stacks 15 tasks onto the new $worker object via Worker::stack, and then processes them in the stacked order. The Worker::collect method, as seen above, is used to clean up the tasks once they have finished executing. By using it inside of a while loop, we block the main thread until all stacked tasks have finished executing and have been cleaned up before we trigger Worker::shutdown. Shutting down the worker prematurely (i.e. whilst there are still tasks to be executed) will still block the main thread until all tasks have finished executing – the tasks will simply not be garbage collected (causing memory leaks).

The Worker class provides a few other methods pertaining to its task stack, including Worker::unstack to remove the oldest stacked item, and Worker::getStacked for the number of items on the execution stack. The worker’s stack only holds the tasks that are to be executed. Once a task in the stack has been executed, it is removed and then placed on a separate (internal) stack to be garbage collected (using Worker::collect).

Another way to reuse a thread when executing many tasks is to use a thread pool (via the Pool class). Thread pools are powered by a group of Workers so that tasks can be executed concurrently, where the concurrency factor (the number of threads the pool runs on) is specified upon pool creation.

Let’s adapt the above example to use a pool of workers instead:

class Task extends Threaded
{
    private $value;

    public function __construct(int $i)
    {
        $this->value = $i;
    }

    public function run()
    {
        usleep(250000);
        echo "Task: {$this->value}\n";
    }
}

$pool = new Pool(4);

for ($i = 0; $i < 15; ++$i) {
    $pool->submit(new Task($i));
}

while ($pool->collect());

$pool->shutdown();

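Output (screenshot not reproduced): the same 15 tasks are printed, but up to four of them now run at the same time, so the exact order may vary between runs.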

There are a few notable differences between using a pool as opposed to a worker. Firstly, pools do not need to be manually started; they begin executing tasks as soon as they become available. Secondly, we submit tasks to the pool, rather than stack them. Also, the Pool class does not extend Threaded, and so it may not be passed around to other threads (unlike Worker).
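To get a feel for how much time the pool saves, here is a back-of-the-envelope estimate in plain PHP (no threads involved). It assumes that the pool hands tasks to its four workers in turn and that every task takes exactly 250 ms. Both are simplifying assumptions for the sake of the arithmetic, not measurements.

```php
<?php
// A rough estimate, not a measurement. Assumes tasks are handed to the
// workers in turn (round-robin) and that each task takes 250 ms.
$taskCount = 15;
$poolSize  = 4;
$taskTime  = 0.25; // seconds, matching usleep(250000)

// Assign the task numbers to the workers in turn.
$assignments = array_fill(0, $poolSize, []);

for ($i = 0; $i < $taskCount; ++$i) {
    $assignments[$i % $poolSize][] = $i;
}

foreach ($assignments as $worker => $tasks) {
    printf("Worker %d: tasks %s\n", $worker, implode(', ', $tasks));
}

// The busiest worker determines how long the whole batch takes.
$busiest      = max(array_map('count', $assignments));
$pooledTime   = $busiest * $taskTime;
$singleWorker = $taskCount * $taskTime;

printf("Single worker: ~%.2fs, pool of %d: ~%.2fs\n", $singleWorker, $poolSize, $pooledTime);
```

Under these assumptions, three of the workers receive four tasks each and the last one receives three. The estimate comes out at roughly 1 second for the pool, against roughly 3.75 seconds for a single worker.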

As a matter of good practice, workers and pools should always have their tasks collected once finished, and be manually shut down. Threads created via the Thread class should also be joined back to the creator thread.
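The pool examples so far only echo from inside the tasks. Here is a sketch of one way to get results back to the main thread: keep a reference to each task, let the pool finish, and then read the results from the task objects. The SquareTask class and its property names are hypothetical, and this is only one of several possible approaches.

```php
<?php
// Hypothetical task that stores its result on itself.
class SquareTask extends Threaded
{
    public $input;
    public $result;

    public function __construct(int $input)
    {
        $this->input = $input;
    }

    public function run()
    {
        $this->result = $this->input ** 2;
    }
}

$pool  = new Pool(2);
$tasks = [];

for ($n = 1; $n <= 4; ++$n) {
    $tasks[$n] = new SquareTask($n); // keep a reference so we can read the result later
    $pool->submit($tasks[$n]);
}

while ($pool->collect()); // wait until the tasks have run and been cleaned up

$pool->shutdown(); // blocks until the workers have finished, then stops them

foreach ($tasks as $n => $task) {
    echo "{$n} squared is {$task->result}\n";
}
```

Since the results are only read after the pool has been shut down, every task has finished by then, and the loop prints the squares of 1 to 4 in order.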

pthreads and (im)mutability

The final class to cover is Volatile – a new addition to pthreads v3. Immutability has become an important concept in pthreads, since without it, performance is severely degraded. Therefore, by default, the properties of Threaded classes that are themselves Threaded objects are now immutable, and so they cannot be reassigned after initial assignment. Explicit mutability for such properties is now favoured, and can still be done by using the new Volatile class.

Let’s take a quick look at an example to demonstrate the new immutability constraints:

class Task extends Threaded // a Threaded class
{
    public function __construct()
    {
        $this->data = new Threaded();
        // $this->data is not overwritable, since it is a Threaded property of a Threaded class
    }
}

$task = new class(new Task()) extends Thread { // a Threaded class, since Thread extends Threaded
    public function __construct($tm)
    {
        $this->threadedMember = $tm;
        var_dump($this->threadedMember->data); // object(Threaded)#3 (0) {}
        $this->threadedMember = new StdClass(); // invalid, since the property is a Threaded member of a Threaded class
    }
};

Threaded properties of Volatile classes, on the other hand, are mutable:

class Task extends Volatile
{
    public function __construct()
    {
        $this->data = new Threaded();
        $this->data = new StdClass(); // valid, since we are in a volatile class
    }
}

$task = new class(new Task()) extends Thread {
    public function __construct($vm)
    {
        $this->volatileMember = $vm;

        var_dump($this->volatileMember->data); // object(stdClass)#4 (0) {}

        // still invalid, since Volatile extends Threaded, so the property is still a Threaded member of a Threaded class
        $this->volatileMember = new StdClass();
    }
};

We can see that the Volatile class overrides the immutability enforced by its parent Threaded class, allowing Threaded properties to be reassigned (as well as unset).

There’s just one last fundamental topic to cover with respect to mutability and the Volatile class – arrays. Arrays in pthreads are automatically coerced to Volatile objects when assigned to the property of a Threaded class. This is because it simply isn’t safe to manipulate an array from multiple contexts in PHP.

Let’s again take a quick look at an example to better understand things:

$array = [1,2,3];

$task = new class($array) extends Thread {
    private $data;

    public function __construct(array $array)
    {
        $this->data = $array;
    }

    public function run()
    {
        $this->data[3] = 4;
        $this->data[] = 5;

        print_r($this->data);
    }
};

$task->start() && $task->join();

/* Output:
Volatile Object
(
    [0] => 1
    [1] => 2
    [2] => 3
    [3] => 4
    [4] => 5
)
*/

We can see that Volatile objects can be treated as if they were arrays, since they support array-based operations (as shown above) with the subscript operator ([]). Volatile objects are not, however, supported by the common array-based functions, such as array_pop and array_shift. Instead, the Threaded class provides us with such operations as built-in methods.

As a demonstration:

$data = new class extends Volatile {
    public $a = 1;
    public $b = 2;
    public $c = 3;
};

var_dump($data);
var_dump($data->pop());
var_dump($data->shift());
var_dump($data);

/* Output:
object(class@anonymous)#1 (3) {
  ["a"]=> int(1)
  ["b"]=> int(2)
  ["c"]=> int(3)
}
int(3)
int(1)
object(class@anonymous)#1 (1) {
  ["b"]=> int(2)
}
*/

Other supported operations include Threaded::chunk and Threaded::merge.
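As a rough sketch of those two methods, the following copies a normal array into a Volatile object with Threaded::merge and then takes part of it back out as a plain array with Threaded::chunk. The calls are based on my reading of the pthreads documentation (merge takes the data to copy in, while chunk takes a size and a flag for preserving keys), so treat the exact behaviour as something to verify on your own setup.

```php
<?php
$data = new Volatile();

// Copy the contents of a normal array into the Threaded object.
$data->merge(['a' => 1, 'b' => 2, 'c' => 3]);
var_dump($data->count());

// Fetch the first two entries as a plain array, preserving their keys.
$firstTwo = $data->chunk(2, true);
var_dump($firstTwo);

// Check how many entries remain on the object afterwards.
var_dump($data->count());
```

Comparing the two counts shows whether the chunked entries were taken off the object or merely copied.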

Synchronization

The final topic we will be covering in this article is synchronization in pthreads. Synchronization is a technique for enabling controlled access to shared resources.

For example, let’s implement a naive counter:

$counter = new class extends Thread {
    public $i = 0;

    public function run()
    {
        for ($i = 0; $i < 10; ++$i) {
            ++$this->i;
        }
    }
};

$counter->start();

for ($i = 0; $i < 10; ++$i) {
    ++$counter->i;
}

$counter->join();

var_dump($counter->i); // outputs a number between 10 and 20

Without using synchronization, the output isn’t deterministic. Multiple threads writing to a single variable without controlled access has caused updates to be lost.
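To see how an update can get lost, it helps to remember that an increment is really three steps: read the value, add one, and write it back. The following plain PHP snippet (no threads involved) plays out one unlucky interleaving of two increments by hand. The variable names are made up for the illustration, with "A" and "B" standing in for the two threads.

```php
<?php
// A deterministic, single-threaded illustration of a lost update.
$shared = 0;

// Both "threads" read the value before either of them writes it back.
$readByA = $shared; // A reads 0
$readByB = $shared; // B reads 0

$shared = $readByA + 1; // A writes 1
$shared = $readByB + 1; // B also writes 1, overwriting A's update

var_dump($shared); // int(1), although two increments were attempted
```

With real threads, whether this interleaving happens depends on timing, which is why the counter above can end up anywhere between 10 and 20.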

Let’s rectify this by adding synchronization so that we receive the correct output of 20:

$counter = new class extends Thread {
    public $i = 0;

    public function run()
    {
        $this->synchronized(function () {
            for ($i = 0; $i < 10; ++$i) {
                ++$this->i;
            }
        });
    }
};

$counter->start();

$counter->synchronized(function ($counter) {
    for ($i = 0; $i < 10; ++$i) {
        ++$counter->i;
    }
}, $counter);

$counter->join();

var_dump($counter->i); // int(20)

Synchronized blocks of code can also cooperate with one another using Threaded::wait and Threaded::notify (along with Threaded::notifyOne).

Here’s a staggered increment from two synchronized for loops:

$counter = new class extends Thread {
    public $cond = 1;

    public function run()
    {
        $this->synchronized(function () {
            for ($i = 0; $i < 10; ++$i) {
                var_dump($i);
                $this->notify();

                if ($this->cond === 1) {
                    $this->cond = 2;
                    $this->wait();
                }
            }
        });
    }
};

$counter->start();

$counter->synchronized(function ($counter) {
    if ($counter->cond !== 2) {
        $counter->wait(); // wait for the other to start first
    }

    for ($i = 10; $i < 20; ++$i) {
        var_dump($i);
        $counter->notify();

        if ($counter->cond === 2) {
            $counter->cond = 1;
            $counter->wait();
        }
    }
}, $counter);

$counter->join();

/* Output:
int(0)
int(10)
int(1)
int(11)
int(2)
int(12)
int(3)
int(13)
int(4)
int(14)
int(5)
int(15)
int(6)
int(16)
int(7)
int(17)
int(8)
int(18)
int(9)
int(19)
*/

You may have noticed the additional conditions that have been placed around the invocations to Threaded::wait. These conditions are crucial because they only allow a synchronized callback to resume when it has received a notification and the specified condition is true. This is important because notifications may come from places other than calls to Threaded::notify. Thus, if the calls to Threaded::wait were not enclosed within conditions, we would be open to spurious wakeup calls, which will lead to unpredictable code.
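A common way of making such a condition robust is to re-check it in a while loop, so that a thread which wakes up early simply goes back to waiting. Here is a minimal sketch of that pattern, where the main thread waits for a flag that the other thread sets. The $ready property is a hypothetical name chosen for this example.

```php
<?php
$worker = new class extends Thread {
    public $ready = false;

    public function run()
    {
        usleep(100000); // pretend to prepare something for 0.1 seconds

        $this->synchronized(function () {
            $this->ready = true;
            $this->notify(); // wake up whoever is waiting on this object
        });
    }
};

$worker->start();

$worker->synchronized(function ($worker) {
    // Re-check the condition after every wakeup, whatever caused it.
    while (!$worker->ready) {
        $worker->wait();
    }
}, $worker);

$worker->join();

var_dump($worker->ready); // bool(true)
```

If the flag has already been set by the time the main thread enters its synchronized block, the loop body never runs, so there is no risk of waiting for a notification that has already been sent.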

Conclusion

We have seen the five classes that pthreads provides (Threaded, Thread, Worker, Volatile, and Pool), and covered when each of them is used. We have also looked at the new immutability concept in pthreads, and taken a quick tour of the synchronization features it supports. With these fundamentals covered, we can now begin to look into applying pthreads to some real-world use cases! That will be the topic of our next post.

In the meantime, if you have some application ideas regarding pthreads, don’t hesitate to drop them into the comments area below!
