PHP
Article

How to use RabbitMQ with PHP

By Miguel Ibarra Romero

Message Queues in PHP with RabbitMQ

AMQP (Advanced Message Queueing Protocol) is a network protocol that can deliver messages from one application endpoint to another application endpoint. It does not care about the platform or language of said applications, as long as they support AMQP.

Essentially, one of the application endpoints sends a message, thus being a Producer, through an AMQP broker. In turn the broker will deliver the message to another application endpoint, called the Consumer.

RabbitMQ is an AMQP broker that has support for several programming languages, such as PHP.

The advantage of having a message broker such as RabbitMQ, and AMQP being a network protocol, is that the producer, the broker, and the consumer can live on different physical/virtual servers on different geographic locations.

Also, since networks are unreliable, and applications might fail to process a message completely, AMQP supports message acknowledgements, either automatically or when an application endpoint decides to send them.

RabbitMQ implements the AMQP 0-9-1 protocol. At a glance, it follows the following model.

Hello world example routing

Glossary

Concept Definition Icon
Producer Application endpoint that sends messages Producer icon
Consumer Application endpoint that receives messages Consumer icon
Connection Handles protocol, errors, authentication, etc… The connection is done using TCP protocol.
Channel Connections are multiplexed through channels. Even though all channels share the same tcp connection, communication from one channel is completely independent of another.
Exchange Receives messages from producers and pushes them to queues. Depending on the situation, this can be transparent to the developer. Exchange representation
Queue Buffer that stores messages Queue icon
Message Piece of arbitrary information conforming to the AMQP format that is sent from the producer to a consumer through the broker. The broker cannot modify the information inside the message.
Acknowledgements Notice sent back from the consumer to tell the server that a message has been received and processed, so the server can delete it from the queue.

Another advantage of AMQP 0-9-1 is that the application defines the routing logic instead of a broker administrator. This gives the developer a lot of flexibility, without the need to learn a new programming/scripting/markup language.

You can learn more about AMQP and RabbitMQ at the “AMQP 0-9-1 Model Explained” guide. Although not necessary for this tutorial, I encourage you to read it completely.

Installing RabbitMQ

You can find detailed documentation for your platform here, but roughly for Ubuntu it goes as follows:

sudo vim /etc/apt/sources.list

Add the following line to the file: deb http://www.rabbitmq.com/debian/ testing main

sources.list

After this, just run the following commands

wget http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
sudo apt-key add rabbitmq-signing-key-public.asc
sudo apt-get update    
sudo apt-get install rabbitmq-server
sudo rabbitmqctl status

You should see something like the following:

sudo rabbitmqctl status

Status of node 'rabbit@localhost-sprabbitmq-926807' ...
[{pid,790},
 {running_applications,[{rabbit,"RabbitMQ","3.3.4"},
                        {os_mon,"CPO  CXC 138 46","2.2.14"},
                        {mnesia,"MNESIA  CXC 138 12","4.11"},
                        {xmerl,"XML parser","1.3.5"},
                        {sasl,"SASL  CXC 138 11","2.3.4"},
                        {stdlib,"ERTS  CXC 138 10","1.19.4"},
                        {kernel,"ERTS  CXC 138 10","2.16.4"}]},
 {os,{unix,linux}},
 {erlang_version,"Erlang R16B03 (erts-5.10.4) [source] [64-bit] [smp:8:8] [async-threads:30] [kernel-poll:true]\n"},
 {memory,[{total,53400096},
          {connection_procs,2704},
          {queue_procs,34432},
          {plugins,0},
          {other_proc,13983664},
          {mnesia,64504},
          {mgmt_db,0},
          {msg_index,26056},
          {other_ets,770880},
          {binary,16112},
          {code,16372077},
          {atom,594537},
          {other_system,21535130}]},
 {alarms,[]},
 {listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]},
 {vm_memory_high_watermark,0.4},
 {vm_memory_limit,12677506662},
 {disk_free_limit,50000000},
 {disk_free,899366912},
 {file_descriptors,[{total_limit,199900},
                    {total_used,5},
                    {sockets_limit,179908},
                    {sockets_used,1}]},
 {processes,[{limit,1048576},{used,133}]},
 {run_queue,0},
 {uptime,34}]
...done.

If you get an error message, the server probably needs to be started with the following command

sudo invoke-rc.d rabbitmq-server start

 * Starting message broker rabbitmq-server
   ...done.

RabbitMQ defines a default user:

  • Username: guest
  • Password: guest

Be aware that this user will only be able to be used if connecting to RabbitMQ from localhost. For a true distributed system, you will have to define users and roles. You can read more on Access Control in the documentation. For the following examples we will be using the above credentials.

In order to be able to integrate your php application with RabbitMQ, you’ll need the php-amqplib library. Getting it is easy with composer, just define the requirement inside your composer.json file:

{
    ...
    "require": {
        ...,
        "videlalvaro/php-amqplib": "2.2.*"
    }
    ...
}

After a composer update execution you’ll be all set.

Disclaimer

By no means should this code ever be used in production ready applications nor be executed on production servers. No security checks and/or validations were enforced. This code was written for educational purposes only, having the scope to showcase basic functionality only. Performance, efficiency, or reusability were not a priority.

Please note that even though the following examples uses the same host for the producer, broker, and consumer applications to ease development, deployment, and testing, in real life it would make no sense to have a “distributed system” in the same box.

For full source code listing, you can check this GitHub repository, containing the application used in the following examples.

Simple example: send request to process data asynchronously

Suppose we have a pizza company, and we receive online orders. Let’s also suppose we have an automated system that processes orders, but this system cannot be exposed directly to the public…

enter image description here

We will implement the simplest of patterns:

enter image description here

First of all, we have the following script to accept requests from the form:

<?php
chdir(dirname(__DIR__));
require_once('vendor/autoload.php');

use Acme\AmqpWrapper\SimpleSender;

$theName = filter_input(INPUT_POST, 'theName', FILTER_SANITIZE_STRING);
$simpleSender = new SimpleSender();
$simpleSender->execute($theName);
header("Location: orderReceived.html");

This code will simply check for a POST parameter named theName and send it to an object we created for processing it. Let’s take a look at the SimpleSender::execute() method:

<?php

// ... SOME CODE HERE ...

    /**
     * Sends a message to the pizzaTime queue.
     * 
     * @param string $message
     */
    public function execute($message)
    {
        /**
         * Create a connection to RabbitAMQP
         */
        $connection = new AMQPConnection(
            'localhost',    #host - host name where the RabbitMQ server is runing
            5672,           #port - port number of the service, 5672 is the default
            'guest',        #user - username to connect to server
            'guest'         #password
            );


        /** @var $channel AMQPChannel */
        $channel = $connection->channel();
        
        $channel->queue_declare(
            'pizzaTime',    #queue name - Queue names may be up to 255 bytes of UTF-8 characters
            false,          #passive - can use this to check whether an exchange exists without modifying the server state
            false,          #durable - make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
            false,          #exclusive - used by only one connection and the queue will be deleted when that connection closes
            false           #autodelete - queue is deleted when last consumer unsubscribes
            );
        
        $msg = new AMQPMessage($message);
        
        $channel->basic_publish(
            $msg,           #message 
            '',             #exchange
            'pizzaTime'     #routing key
            );
        
        $channel->close();
        $connection->close();
    }

A line-by-line breakdown is as follows:

<?php
/* ... MORE CODE HERE ... */

        $connection = new AMQPConnection(
            'localhost',    #host - host name where the RabbitMQ server is runing
            5672,           #port - port number of the service, 5672 is the default
            'guest',        #user - username to connect to server
            'guest'         #password
            );
            
/* ... MORE CODE HERE ... */

First, we create a connection object. Please be aware that the credentials guest:guest are the default for RabbitMQ. However, you will only be allowed to connect to the server using those if you connect from within the same host (localhost).

Since RabbitMQ listens and serves using a single port, we need to create a channel (think of it as a virtual port) with $channel = $connection->channel(); so other clients are able to connect to the server.

<?php
/* ... MORE CODE HERE ... */

        $channel->queue_declare(
            'pizzaTime',    #queue name - Queue names may be up to 255 bytes of UTF-8 characters
            false,          #passive - can use this to check whether an exchange exists without modifying the server state
            false,          #durable - make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
            false,          #exclusive - used by only one connection and the queue will be deleted when that connection closes
            false           #autodelete - queue is deleted when last consumer unsubscribes
            );
            
/* ... MORE CODE HERE ... */

Once we have our channel ready, let’s declare a queue to send the request to. The good thing about RabbitMQ is that we can create queues directly from the client, but we have to be careful how we create it. Let’s explain briefly the parameters used to create a queue with $channel->queue_declare()

  • Queue Name: this is an arbitrary name, will be used to identify the queue
  • Passive: if set to true, the server will only check if the queue can be created, false will actually attempt to create the queue.
  • Durable: Typically, if the server stops or crashes, all queues and messages are lost… unless we declare the queue durable, in which case the queue will persist if the server is restarted.
  • Exclusive: If true, the queue can only be used by the connection that created it.
  • Autodelete: if true, the queue will be deleted once it has no messages and there are no subscribers connected

In our example the queue will not persist if the server is restarted, can be used by other connections and will not be deleted if there are no more subscribers to it.

Next, we created a message object with $msg = new AMQPMessage($message);, $message being the POST parameter, theName that we received from the form. A message can be any string.

<?php
/* ... MORE CODE HERE ... */

        $channel->basic_publish(
            $msg,           #message 
            '',             #exchange
            'pizzaTime'     #routing key
            );
            
/* ... MORE CODE HERE ... */

Now we have to publish the message to the queue. However, we cannot publish messages directly to the queue if it is not through an exchange. We never declared an exchange, so how will this be possible? It turns out that when we create a queue without defining an exchange to bind the queue to, a default exchange will be used. We can publish the message to the queue through the default exchange with $channel->basic_publish(), the parameters it uses are:

  • Message: the message we want to send
  • Exchange: notice that we are using an empty string, because we will use the default exchange
  • Routing key: the queue name we want the message to be delivered to.
<?php
/* ... MORE CODE HERE ... */

        $channel->close();
        $connection->close();
            
/* ... MORE CODE HERE ... */

After we are done, we have to close the connection to the channel and the server.

Please notice that we did not receive any response back from the server. At best we can be sure that the message was queued but we totally ignore if the message reached the end recipient. This is part of the beauty of AMQP… we can quickly dispatch customers on our public site and asynchronously process the orders on a back-office application.

So the pizza order is in the queue, how do we retrieve those? First of all we have to be aware that a consumer has to establish a constant connection to the queue server (a.k.a. subscribe) in order for it to receive messages from the server. The server will not push those messages by itself to our application. Fortunately, creating that connection is super easy.

<?php
namespace Acme\AmqpWrapper;

use PhpAmqpLib\Connection\AMQPConnection;


class SimpleReceiver
{
    /* ... SOME CODE HERE ... */

    /**
     * Listens for incoming messages
     */
    public function listen()
    {
        $connection = new AMQPConnection(
            'localhost',    #host 
            5672,           #port
            'guest',        #user
            'guest'         #password
            );
            
        $channel = $connection->channel();
        
        $channel->queue_declare(
            'pizzaTime',    #queue name, the same as the sender
            false,          #passive
            false,          #durable
            false,          #exclusive
            false           #autodelete
            );
        
        $channel->basic_consume(
            'pizzaTime',                    #queue 
            '',                             #consumer tag - Identifier for the consumer, valid within the current channel. just string
            false,                          #no local - TRUE: the server will not send messages to the connection that published them
            true,                           #no ack - send a proper acknowledgment from the worker, once we're done with a task
            false,                          #exclusive - queues may only be accessed by the current connection
            false,                          #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method
            array($this, 'processOrder')    #callback - method that will receive the message
            );
            
        while(count($channel->callbacks)) {
            $channel->wait();
        }
        
        $channel->close();
        $connection->close();
    }

    /**
     * @param $msg
     */
    public function processOrder($msg)
    {
        /* ... CODE TO PROCESS ORDER HERE ... */
    }
}

Just as we connected, created a channel and declared a queue in the producer, we have to do exactly the same inside the consumer. However in the consumer, we have to subscribe to the channel with $channel->basic_consume(), and the parameters used are defined as follows:

  • Queue: has to be the same queue name we defined in the producer
  • Consumer tag: an arbitrary name given to the consumer. If this field is empty the server will generate a unique tag
  • No local: This is an obscure parameter, if activated, the server will not deliver its own messages
  • No Ack(nowledgement): will automatically acknowledge that the consumer received the message, so we do not have to manually do so.
  • No wait: If set, the server will not wait for the process in the consumer to complete
  • Callback: can be a function name, an array containing the object and the method name, or a closure that will receive the queued message. This callback has to accept a parameter, containing such a message. In our example, array($this, 'processOrder') is used to define the processOrder() method of the current object as the callback.
<?php

/* ... SOME CODE HERE ... */

        while(count($channel->callbacks)) {
            $channel->wait();
        }
        
/* ... SOME CODE HERE ... */

The ‘magic’ happens inside the while loop. If the subscriber has at least one defined callback, we will wait for any event in the channel. Every time a messaged is received, our defined callback processOrder() will be executed, so we can process the message as we need.

How do we fire this up? Simple create a script that will invoke the SimpleReceiver::listen() method like:

<?php
chdir(dirname(__DIR__));

require_once('vendor/autoload.php');

use Acme\AmqpWrapper\SimpleReceiver;

$receiver = new SimpleReceiver();
$receiver->listen();

Now launch the process in the console with php <script name> and let it do its job. If you want to kill the consumer, a simple Ctrl + C will interrupt the process.

One of the nice things of having a queue server is that if for whichever reason your consumer job crashes, it will not disrupt the service to the users of your public application. The messages will simply stack inside the queue and once you relaunch the consumer, the messages will be delivered one by one to it for processing.

Conclusion

In this part, we introduced the theory of AMQP and queueing systems and demonstrated their use on a simple example. In the followup to this post, we’ll deal with two more examples of increased complexity and advanced concepts. Stay tuned!

  • chinmoym2004

    awesome tutorial .

  • Bruno Seixas

    Simple, direct and effective ;) Good tutorial

  • David

    You hava type in “sudo apt-get install rabbitmq-servercd”
    should be “sudo apt-get install rabbitmq-server”

  • http://www.bitfalls.com/ Bruno Skvorc

    Thanks, fixed!

  • Anil

    (Y)

  • Richard

    Very good tutorial. Straight forward, very easy to follow. Thanks for sharing.

  • Nabeel

    Very good tutorial, but my question is how to keep the receiver listening ? Do we need to use Daemon ?

    • https://www.phpcontext.com/wordpress Mike Mx

      Yes. You can use Supervisord to keep the process running always.

  • Tary

    Need to understand…does new AMQPConnection always creates a new connection, whenever called. Say If I am publishing data/message one by one and call new AMQPConnection, then will it always creates a new connection or uses the existing one as the existing was not closed yet.

    • Tary

      To add: – Calling new AMQPConnection in a loop and publishing data..

Recommended

Learn Coding Online
Learn Web Development

Start learning web development and design for free with SitePoint Premium!

Get the latest in PHP, once a week, for free.