AMQP (Advanced Message Queuing Protocol) is a network protocol that delivers messages from one application endpoint to another. The platform and language of those applications do not matter, as long as they support AMQP.
Essentially, one of the application endpoints sends a message, thus being a Producer, through an AMQP broker. In turn the broker will deliver the message to another application endpoint, called the Consumer.
RabbitMQ is an AMQP broker with client libraries available for many programming languages, including PHP.
The advantage of having a message broker such as RabbitMQ, and AMQP being a network protocol, is that the producer, the broker, and the consumer can live on different physical/virtual servers on different geographic locations.
Also, since networks are unreliable, and applications might fail to process a message completely, AMQP supports message acknowledgements, either automatically or when an application endpoint decides to send them.
RabbitMQ implements the AMQP 0-9-1 protocol. At a glance, it uses the following model.
Glossary
Concept | Definition
---|---
Producer | Application endpoint that sends messages
Consumer | Application endpoint that receives messages
Connection | Handles protocol negotiation, errors, authentication, etc. Connections are made over TCP.
Channel | Connections are multiplexed through channels. Even though all channels share the same TCP connection, communication on one channel is completely independent of the others.
Exchange | Receives messages from producers and pushes them to queues. Depending on the situation, this can be transparent to the developer.
Queue | Buffer that stores messages
Message | Piece of arbitrary information conforming to the AMQP format that is sent from the producer to a consumer through the broker. The broker cannot modify the information inside the message.
Acknowledgement | Notice sent back from the consumer to tell the server that a message has been received and processed, so the server can delete it from the queue.
Another advantage of AMQP 0-9-1 is that the application defines the routing logic instead of a broker administrator. This gives the developer a lot of flexibility, without the need to learn a new programming/scripting/markup language.
You can learn more about AMQP and RabbitMQ at the “AMQP 0-9-1 Model Explained” guide. Although not necessary for this tutorial, I encourage you to read it completely.
Installing RabbitMQ
You can find detailed documentation for your platform here, but roughly for Ubuntu it goes as follows:
sudo vim /etc/apt/sources.list
Add the following line to the file: deb https://www.rabbitmq.com/debian/ testing main
After this, just run the following commands
wget https://www.rabbitmq.com/rabbitmq-signing-key-public.asc
sudo apt-key add rabbitmq-signing-key-public.asc
sudo apt-get update
sudo apt-get install rabbitmq-server
sudo rabbitmqctl status
You should see something like the following:
sudo rabbitmqctl status
Status of node 'rabbit@localhost-sprabbitmq-926807' ...
[{pid,790},
{running_applications,[{rabbit,"RabbitMQ","3.3.4"},
{os_mon,"CPO CXC 138 46","2.2.14"},
{mnesia,"MNESIA CXC 138 12","4.11"},
{xmerl,"XML parser","1.3.5"},
{sasl,"SASL CXC 138 11","2.3.4"},
{stdlib,"ERTS CXC 138 10","1.19.4"},
{kernel,"ERTS CXC 138 10","2.16.4"}]},
{os,{unix,linux}},
{erlang_version,"Erlang R16B03 (erts-5.10.4) [source] [64-bit] [smp:8:8] [async-threads:30] [kernel-poll:true]\n"},
{memory,[{total,53400096},
{connection_procs,2704},
{queue_procs,34432},
{plugins,0},
{other_proc,13983664},
{mnesia,64504},
{mgmt_db,0},
{msg_index,26056},
{other_ets,770880},
{binary,16112},
{code,16372077},
{atom,594537},
{other_system,21535130}]},
{alarms,[]},
{listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]},
{vm_memory_high_watermark,0.4},
{vm_memory_limit,12677506662},
{disk_free_limit,50000000},
{disk_free,899366912},
{file_descriptors,[{total_limit,199900},
{total_used,5},
{sockets_limit,179908},
{sockets_used,1}]},
{processes,[{limit,1048576},{used,133}]},
{run_queue,0},
{uptime,34}]
...done.
If you get an error message, the server probably needs to be started with the following command
sudo invoke-rc.d rabbitmq-server start
* Starting message broker rabbitmq-server
...done.
RabbitMQ defines a default user:
- Username: guest
- Password: guest
Be aware that this user can only connect to RabbitMQ from localhost. For a truly distributed system, you will have to define users and roles. You can read more on Access Control in the documentation. For the following examples we will be using the above credentials.
In order to integrate your PHP application with RabbitMQ, you’ll need the php-amqplib library. Getting it is easy with Composer: just define the requirement inside your composer.json file:
{
...
"require": {
...,
"videlalvaro/php-amqplib": "2.2.*"
}
...
}
After running composer update you’ll be all set.
Disclaimer
By no means should this code ever be used in production ready applications nor be executed on production servers. No security checks and/or validations were enforced. This code was written for educational purposes only, having the scope to showcase basic functionality only. Performance, efficiency, or reusability were not a priority.
Please note that even though the following examples use the same host for the producer, broker, and consumer applications to ease development, deployment, and testing, in real life it would make no sense to have a “distributed system” on the same box.
For full source code listing, you can check this GitHub repository, containing the application used in the following examples.
Simple example: send request to process data asynchronously
Suppose we have a pizza company, and we receive online orders. Let’s also suppose we have an automated system that processes orders, but this system cannot be exposed directly to the public…
We will implement the simplest of patterns:
First of all, we have the following script to accept requests from the form:
<?php
chdir(dirname(__DIR__));
require_once('vendor/autoload.php');
use Acme\AmqpWrapper\SimpleSender;
$theName = filter_input(INPUT_POST, 'theName', FILTER_SANITIZE_STRING);
$simpleSender = new SimpleSender();
$simpleSender->execute($theName);
header("Location: orderReceived.html");
This code simply reads a POST parameter named theName and sends it to an object we created for processing it. Let’s take a look at the SimpleSender::execute() method:
<?php
// ... SOME CODE HERE ...
/**
* Sends a message to the pizzaTime queue.
*
* @param string $message
*/
public function execute($message)
{
/**
* Create a connection to RabbitMQ
*/
$connection = new AMQPConnection(
'localhost', #host - host name where the RabbitMQ server is running
5672, #port - port number of the service, 5672 is the default
'guest', #user - username to connect to server
'guest' #password
);
/** @var $channel AMQPChannel */
$channel = $connection->channel();
$channel->queue_declare(
'pizzaTime', #queue name - Queue names may be up to 255 bytes of UTF-8 characters
false, #passive - can use this to check whether a queue exists without modifying the server state
false, #durable - make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
false, #exclusive - used by only one connection and the queue will be deleted when that connection closes
false #autodelete - queue is deleted when last consumer unsubscribes
);
$msg = new AMQPMessage($message);
$channel->basic_publish(
$msg, #message
'', #exchange
'pizzaTime' #routing key
);
$channel->close();
$connection->close();
}
A line-by-line breakdown is as follows:
<?php
/* ... MORE CODE HERE ... */
$connection = new AMQPConnection(
'localhost', #host - host name where the RabbitMQ server is running
5672, #port - port number of the service, 5672 is the default
'guest', #user - username to connect to server
'guest' #password
);
/* ... MORE CODE HERE ... */
First, we create a connection object. Please be aware that the credentials guest:guest are the default for RabbitMQ. However, you will only be allowed to connect to the server using those if you connect from within the same host (localhost).
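RabbitMQ serves each client over a single TCP connection, so AMQP operations are not issued on the connection itself but on a channel: a lightweight virtual connection multiplexed over it. We create one with $channel = $connection->channel(); and several channels can share the same connection without interfering with each other.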
<?php
/* ... MORE CODE HERE ... */
$channel->queue_declare(
'pizzaTime', #queue name - Queue names may be up to 255 bytes of UTF-8 characters
false, #passive - can use this to check whether a queue exists without modifying the server state
false, #durable - make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
false, #exclusive - used by only one connection and the queue will be deleted when that connection closes
false #autodelete - queue is deleted when last consumer unsubscribes
);
/* ... MORE CODE HERE ... */
Once we have our channel ready, let’s declare a queue to send the request to. The good thing about RabbitMQ is that we can create queues directly from the client, but we have to be careful how we create it. Let’s explain briefly the parameters used to create a queue with $channel->queue_declare()
- Queue Name: this is an arbitrary name, will be used to identify the queue
- Passive: if set to true, the server will not create the queue; it only checks whether a queue with that name already exists and raises an error if it does not. With false, the queue is created if it is missing.
- Durable: Typically, if the server stops or crashes, all queues and messages are lost… unless we declare the queue durable, in which case the queue will persist if the server is restarted. Messages in it survive only if they were also published as persistent.
- Exclusive: If true, the queue can only be used by the connection that created it, and it is deleted when that connection closes.
- Autodelete: if true, the queue will be deleted once its last consumer unsubscribes
In our example the queue will not persist if the server is restarted, can be used by other connections and will not be deleted if there are no more subscribers to it.
Next, we created a message object with $msg = new AMQPMessage($message);, $message being the POST parameter theName that we received from the form. A message can be any string.
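Since a message body is just a string, a structured order has to be serialized before it is published. The following is a minimal sketch of one way to do that with JSON; the helper names encodeOrder() and decodeOrder() and the shape of the order array are assumptions made for illustration and are not part of the example application.

```php
<?php
// Hypothetical helpers: encode an order as a JSON string so it can travel
// as an AMQP message body, and decode it again on the consumer side.

/**
 * @param string $customerName
 * @param array  $items list of pizza names (assumed shape, for illustration)
 * @return string JSON text that could be passed to new AMQPMessage($body)
 */
function encodeOrder($customerName, array $items)
{
    return json_encode(array(
        'customer' => $customerName,
        'items'    => array_values($items),
    ));
}

/**
 * @param string $body the message body received by the consumer
 * @return array the decoded order
 * @throws InvalidArgumentException when the body is not a valid JSON order
 */
function decodeOrder($body)
{
    $order = json_decode($body, true);
    if (!is_array($order) || !isset($order['customer'], $order['items'])) {
        throw new InvalidArgumentException('Malformed order message');
    }
    return $order;
}

// Round trip: what the producer would send and what the consumer would read.
$body  = encodeOrder('Alice', array('margherita', 'pepperoni'));
$order = decodeOrder($body);
echo $order['customer'], ' ordered ', count($order['items']), " pizzas\n";
```

Validating the decoded body on the consumer side matters because the broker does not inspect or check the payload; anything a producer sends will be delivered as-is.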
<?php
/* ... MORE CODE HERE ... */
$channel->basic_publish(
$msg, #message
'', #exchange
'pizzaTime' #routing key
);
/* ... MORE CODE HERE ... */
Now we have to publish the message to the queue. However, messages are never published directly to a queue; they always go through an exchange. We never declared an exchange, so how will this be possible? It turns out that every queue is automatically bound to a default exchange, using the queue name as the routing key. We can publish the message to the queue through the default exchange with $channel->basic_publish(). The parameters it uses are:
- Message: the message we want to send
- Exchange: notice that we are using an empty string, because we will use the default exchange
- Routing key: the queue name we want the message to be delivered to.
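To make the role of the routing key concrete, here is a toy in-memory model of the default exchange. It is only a sketch of the routing rule (routing key equals queue name) and does not talk to RabbitMQ; the ToyBroker class and all of its methods are invented for this illustration and are not part of php-amqplib.

```php
<?php
// Toy in-memory model of the default exchange. NOT the php-amqplib API:
// ToyBroker and its methods are made up for illustration only.
class ToyBroker
{
    /** @var array queue name => list of message bodies */
    private $queues = array();

    /**
     * Declaring a queue implicitly binds it to the default exchange
     * under a routing key equal to the queue name.
     */
    public function declareQueue($name)
    {
        if (!isset($this->queues[$name])) {
            $this->queues[$name] = array();
        }
    }

    /**
     * Publish through the default exchange: the routing key is looked up
     * as a queue name. Returns false when no such queue exists, modelling
     * an unroutable message that never reaches any queue.
     */
    public function publishToDefaultExchange($routingKey, $body)
    {
        if (!isset($this->queues[$routingKey])) {
            return false;
        }
        $this->queues[$routingKey][] = $body;
        return true;
    }

    /** Returns the bodies currently stored in the named queue. */
    public function messagesIn($name)
    {
        return isset($this->queues[$name]) ? $this->queues[$name] : array();
    }
}

$broker = new ToyBroker();
$broker->declareQueue('pizzaTime');
$routed  = $broker->publishToDefaultExchange('pizzaTime', 'Alice');  // queue exists
$dropped = $broker->publishToDefaultExchange('burgerTime', 'Bob');   // no such queue
var_dump($routed, $dropped, $broker->messagesIn('pizzaTime'));
```

The second publish shows why the routing key has to match the declared queue name exactly: a typo does not produce an error in this model, the message simply never lands in a queue.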
<?php
/* ... MORE CODE HERE ... */
$channel->close();
$connection->close();
/* ... MORE CODE HERE ... */
After we are done, we have to close the channel and the connection to the server.
Please notice that we did not receive any response back from the consumer. At best we know the message was handed to the broker, but we have no idea whether it reached the end recipient. This is part of the beauty of AMQP… we can quickly dispatch customers on our public site and asynchronously process the orders on a back-office application.
So the pizza orders are in the queue; how do we retrieve them? First of all, we have to be aware that a consumer has to keep an open connection to the queue server (a.k.a. subscribe) in order to receive messages. The server cannot deliver messages to an application that is not connected. Fortunately, creating that connection is super easy.
<?php
namespace Acme\AmqpWrapper;
use PhpAmqpLib\Connection\AMQPConnection;
class SimpleReceiver
{
/* ... SOME CODE HERE ... */
/**
* Listens for incoming messages
*/
public function listen()
{
$connection = new AMQPConnection(
'localhost', #host
5672, #port
'guest', #user
'guest' #password
);
$channel = $connection->channel();
$channel->queue_declare(
'pizzaTime', #queue name, the same as the sender
false, #passive
false, #durable
false, #exclusive
false #autodelete
);
$channel->basic_consume(
'pizzaTime', #queue
'', #consumer tag - Identifier for the consumer, valid within the current channel. just string
false, #no local - TRUE: the server will not send messages to the connection that published them
true, #no ack - TRUE: the server treats a message as acknowledged as soon as it is delivered, so the consumer sends no ack
false, #exclusive - queues may only be accessed by the current connection
false, #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method
array($this, 'processOrder') #callback - method that will receive the message
);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
}
/**
* @param $msg
*/
public function processOrder($msg)
{
/* ... CODE TO PROCESS ORDER HERE ... */
}
}
Just as we connected, created a channel and declared a queue in the producer, we have to do exactly the same inside the consumer. However, in the consumer we also have to subscribe to the queue with $channel->basic_consume(). The parameters used are defined as follows:
- Queue: has to be the same queue name we defined in the producer
- Consumer tag: an arbitrary name given to the consumer. If this field is empty the server will generate a unique tag
- No local: This is an obscure parameter; if activated, the server will not deliver messages to the connection that published them
- No Ack(nowledgement): if true, messages are considered acknowledged as soon as they are delivered, so we do not have to acknowledge them manually.
- Exclusive: if true, only this consumer may access the queue
- No wait: If set, the server will not send a reply to the consume request, so the client should not wait for one
- Callback: can be a function name, an array containing the object and the method name, or a closure that will receive the queued message. This callback has to accept a parameter, containing such a message. In our example, array($this, 'processOrder') is used to define the processOrder() method of the current object as the callback.
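The three callback forms listed above are ordinary PHP callables, so they can be tried out without a broker. The sketch below invokes each form with a stand-in message object; the names processOrderFunction and OrderProcessor are hypothetical, and the stdClass object only mimics the public body property of a real message.

```php
<?php
// Stand-in for the message object: only the public "body" property is used.
$msg = new stdClass();
$msg->body = 'Alice';

// 1. A function name given as a string.
function processOrderFunction($msg)
{
    return 'function got ' . $msg->body;
}

// 2. An array holding an object and a method name.
class OrderProcessor
{
    public function processOrder($msg)
    {
        return 'method got ' . $msg->body;
    }
}

// 3. A closure.
$closure = function ($msg) {
    return 'closure got ' . $msg->body;
};

$callbacks = array(
    'processOrderFunction',
    array(new OrderProcessor(), 'processOrder'),
    $closure,
);

$results = array();
foreach ($callbacks as $callback) {
    // call_user_func() accepts every callable form shown above.
    $results[] = call_user_func($callback, $msg);
}
print_r($results);
```

Which form to use is mostly a matter of where the processing logic lives: the array form suits a class such as SimpleReceiver, while a closure is convenient for short scripts.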
<?php
/* ... SOME CODE HERE ... */
while(count($channel->callbacks)) {
$channel->wait();
}
/* ... SOME CODE HERE ... */
The ‘magic’ happens inside the while loop. If the subscriber has at least one defined callback, we will wait for any event in the channel. Every time a message is received, our defined callback processOrder() will be executed, so we can process the message as we need.
How do we fire this up? Simply create a script that will invoke the SimpleReceiver::listen() method like:
<?php
chdir(dirname(__DIR__));
require_once('vendor/autoload.php');
use Acme\AmqpWrapper\SimpleReceiver;
$receiver = new SimpleReceiver();
$receiver->listen();
Now launch the process in the console with php <script name> and let it do its job. If you want to kill the consumer, a simple Ctrl + C will interrupt the process.
One of the nice things about having a queue server is that if, for whatever reason, your consumer job crashes, it will not disrupt the service to the users of your public application. The messages will simply pile up inside the queue and, once you relaunch the consumer, they will be delivered one by one to it for processing.
Conclusion
In this part, we introduced the theory of AMQP and queueing systems and demonstrated their use on a simple example. In the followup to this post, we’ll deal with two more examples of increased complexity and advanced concepts. Stay tuned!
Frequently Asked Questions (FAQs) about Using RabbitMQ with PHP
What is the basic setup required to use RabbitMQ with PHP?
To use RabbitMQ with PHP, you need PHP installed on your system, a running RabbitMQ server, and an AMQP client for PHP: either the php-amqplib library used in this article (installed through Composer) or the PECL AMQP extension. Both communicate with the RabbitMQ server using the AMQP protocol. Once these are installed, you can start using RabbitMQ with PHP by creating a connection, a channel, and then sending and receiving messages.
How can I install the PHP AMQP extension?
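If you prefer the PECL AMQP extension over php-amqplib, it can be installed using the PECL installer. You can run the command ‘pecl install amqp’ in your terminal. After the installation, you need to add ‘extension=amqp.so’ to your php.ini file. Then, restart your web server to load the new extension. Note that the extension exposes a different API from the php-amqplib classes used in this article.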
How can I create a connection to the RabbitMQ server in PHP?
You can create a connection to the RabbitMQ server using the AMQPStreamConnection class from php-amqplib (the examples in this article use AMQPConnection, as shipped with php-amqplib 2.2). You need to provide the host, port, username, and password as parameters to the constructor of this class. For a default local installation these are ‘localhost’, 5672, ‘guest’, and ‘guest’ respectively.
How can I send a message to a queue in RabbitMQ using PHP?
You can send a message to a queue in RabbitMQ using the ‘basic_publish’ method of the AMQPChannel class. You need to create a message using the AMQPMessage class and then publish it to a queue.
How can I receive a message from a queue in RabbitMQ using PHP?
You can receive a message from a queue in RabbitMQ using the ‘basic_consume’ method of the AMQPChannel class. You need to provide the queue name and a callback function as parameters to this method. The callback function will be called whenever a message is received from the queue.
How can I handle errors in RabbitMQ with PHP?
You can handle errors in RabbitMQ with PHP using try-catch blocks. php-amqplib throws exceptions when errors occur, such as a failed connection. You can catch these exceptions and handle them appropriately.
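As a rough sketch of the try-catch approach, the helper below retries an operation that throws, which is one possible way to cope with a broker that is briefly unreachable. The function withRetries() is hypothetical, and the closure only simulates a failing connection; in a real application it would wrap the code that opens the connection.

```php
<?php
/**
 * Runs $operation and retries it when it throws. Hypothetical helper,
 * not part of php-amqplib.
 *
 * @param callable $operation   receives the attempt number (starting at 1)
 * @param int      $maxAttempts how many times to try before giving up
 * @return mixed whatever $operation returns on its first success
 * @throws RuntimeException when every attempt failed
 */
function withRetries($operation, $maxAttempts)
{
    $lastError = null;
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        try {
            return call_user_func($operation, $attempt);
        } catch (Exception $e) {
            // Remember the failure and try again.
            $lastError = $e;
        }
    }
    throw new RuntimeException("Gave up after $maxAttempts attempts", 0, $lastError);
}

// Simulated connection that fails twice before succeeding.
$result = withRetries(function ($attempt) {
    if ($attempt < 3) {
        throw new Exception("broker unreachable (attempt $attempt)");
    }
    return "connected on attempt $attempt";
}, 5);

echo $result, "\n";
```

A production version would normally pause between attempts and catch only the exception types that indicate a temporary failure; both are left out here to keep the sketch short.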
How can I close a connection to the RabbitMQ server in PHP?
You can close a connection to the RabbitMQ server using the ‘close’ method of the AMQPStreamConnection class. It’s a good practice to close the connection when you’re done with it to free up resources.
How can I use exchanges in RabbitMQ with PHP?
You can use exchanges in RabbitMQ with PHP using the ‘exchange_declare’ method of the AMQPChannel class. You need to provide the exchange name, type, and other options as parameters to this method. Then, you can publish messages to the exchange instead of a queue.
How can I use routing keys in RabbitMQ with PHP?
You can use routing keys in RabbitMQ with PHP by providing them as a parameter to the ‘basic_publish’ method. The routing key determines which queues the message will be sent to, based on the bindings between the queues and the exchange.
How can I use acknowledgments in RabbitMQ with PHP?
You can use acknowledgments in RabbitMQ with PHP by calling the ‘basic_ack’ method of the AMQPChannel class in your callback function. This tells the RabbitMQ server that the message has been processed successfully and it can be removed from the queue.
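The callback below sketches what manual acknowledgement could look like. It assumes the php-amqplib 2.x message shape, where $msg->delivery_info carries the channel and the delivery tag, and it assumes the consumer was registered with the no ack argument of basic_consume() set to false. The function name processOrderWithAck is made up for this example.

```php
<?php
/**
 * Consumer callback that acknowledges a message only after handling it.
 * Assumes the php-amqplib 2.x message shape: $msg->delivery_info holds
 * the channel the message arrived on and its delivery tag.
 */
function processOrderWithAck($msg)
{
    // Placeholder for the real work (assumption: the body is the customer name).
    echo 'Preparing pizza for ', $msg->body, "\n";

    $channel     = $msg->delivery_info['channel'];
    $deliveryTag = $msg->delivery_info['delivery_tag'];

    // Tell the broker the message was processed so it can be removed from the queue.
    $channel->basic_ack($deliveryTag);
}
```

Acknowledging after the work is done means that a message whose consumer crashes mid-task stays unacknowledged, so the broker can deliver it again instead of losing it.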