Message Brokering with RabbitMQ
RabbitMQ is open-source message broker software written in Erlang. The MQ in its name comes from message queuing, as in the Advanced Message Queuing Protocol (AMQP), the standard that RabbitMQ implements. For our purposes, and most others, it acts as a middleman between producer (sending) and consumer (receiving) programs: it simply accepts and forwards messages. A common analogy is that RabbitMQ works like a shipping service, such as a post office. You send a package to your friend in any part of the world, and your friend eventually receives it without really knowing or caring how it got there. In this analogy, RabbitMQ is the shipping service, the producer program is you (sending the package), the package is a generic blob of data, and the consuming program is your friend.
Both producers and consumers can be written in any language that has an available RabbitMQ or AMQP client library. In this article, I’ll demo a producer program written in PHP and a consuming program in Python.
Installing RabbitMQ and Client Libraries
Go to rabbitmq.com/download.html and pick the installation file suitable for your particular system. I happen to be using Ubuntu and have had problems installing the package provided by Ubuntu (it’s not the freshest package available). In this case, using the .deb package from rabbitmq.com provided a trouble-free installation as well as the latest version.
Once RabbitMQ is installed, a plugin management tool called rabbitmq-plugins is available. As an optional step, you can enable the rabbitmq_management plugin, which provides a web browser view of the server and makes various monitoring and management tasks easier. To enable the plugin on Ubuntu, I ran:
sudo rabbitmq-plugins enable rabbitmq_management
Restart the RabbitMQ server for the changes to take effect:
sudo /etc/init.d/rabbitmq-server restart
Point your browser to http://localhost:55672/mgmt/ for the web-based GUI. The default username and password are both “guest”. (On RabbitMQ 3.0 and later, the management UI is served at http://localhost:15672/ instead.)
If you prefer a command line tool, rabbitmqctl can be used as an alternative.
pip is a package manager for Python, similar to PHP’s PEAR or PECL, or Perl’s CPAN. It can be used to easily install Pika, a RabbitMQ client library for Python.
sudo pip install pika
The RabbitMQ homepage provides links to a few different AMQP libraries for PHP, but the only one I found to work without headaches is php-amqplib, available from GitHub. If you do not already have git installed, install it first, because the project’s Makefile uses it to clone some additional submodules.
A README can be found on the project’s GitHub page, but the short version is as follows:
git clone git://github.com/videlalvaro/php-amqplib.git
cd php-amqplib/
make
The config.php file contains the basic connection settings. This is the place to make changes if you have installed the RabbitMQ server anywhere other than localhost.
Queues and Exchanges
A quick glance at the first few lines of amqp_publisher.php reveals some basic RabbitMQ ideas.
A queue in RabbitMQ can be thought of as a mailbox, or an inbox, or more generally an endpoint at which messages can arrive. Queues are typically used by consumers to pluck off new messages and make interesting things happen, but both consumers and producers can declare or create queues that they will use.
$ch->queue_declare($queue, false, true, false, false);
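The extra Boolean parameters correspond, in order, to the passive, durable, exclusive, and auto_delete flags used by RabbitMQ, so this call declares a durable queue that is neither exclusive nor automatically deleted. They are not especially important right now, but they are fully documented online.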
In amqp_publisher.php, queue_declare() creates a queue named “msgs”. If a queue named “msgs” already exists and is declared again with the same settings, nothing extra happens; declaring it again with different flags is rejected with an error. Queue names are unique, so only one queue with a given name can exist at a time. Because of this, it is typical for both the consumer and the producer programs to call queue_declare(). This ensures the queue will always be ready when you are unsure which program (producer or consumer) will run first. It is also important to note that messages are not written to queues directly; they must go through an exchange.
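To make the “declare it everywhere” behaviour concrete, here is a small in-memory model written in Python. It is a sketch under stated assumptions, not RabbitMQ’s implementation: QueueRegistry, QueueSettings, and ChannelError are hypothetical names, the flags follow the positional order of the PHP call above (passive, durable, exclusive, auto_delete), and the error strings only imitate the NOT_FOUND and PRECONDITION_FAILED errors a real broker returns.

```python
from dataclasses import dataclass


class ChannelError(Exception):
    """Hypothetical stand-in for the channel error a real broker would return."""


@dataclass(frozen=True)
class QueueSettings:
    durable: bool
    exclusive: bool
    auto_delete: bool


class QueueRegistry:
    """Toy, in-memory model of queue declaration; not RabbitMQ's own code.

    Exclusive-queue ownership rules are not modelled; the flag is only compared.
    """

    def __init__(self):
        self._queues = {}  # queue name -> QueueSettings

    def queue_declare(self, name, passive=False, durable=False,
                      exclusive=False, auto_delete=False):
        if passive:
            # A passive declare never creates anything; it only checks existence.
            if name not in self._queues:
                raise ChannelError(f"NOT_FOUND: no queue named {name!r}")
            return name
        wanted = QueueSettings(durable, exclusive, auto_delete)
        existing = self._queues.get(name)
        if existing is None:
            self._queues[name] = wanted  # first declaration creates the queue
        elif existing != wanted:
            # Re-declaring with different flags is refused, not silently applied.
            raise ChannelError(f"PRECONDITION_FAILED: {name!r} exists with other flags")
        # An identical re-declaration falls through as a no-op.
        return name

    def __len__(self):
        return len(self._queues)


registry = QueueRegistry()
# Producer and consumer each declare "msgs" with the PHP example's flags.
registry.queue_declare("msgs", False, True, False, False)
registry.queue_declare("msgs", False, True, False, False)
print(len(registry))  # 1: the second call did not create a new queue
```

The point of the model is the idempotence: whichever program starts first creates the queue, and the other one’s identical declaration changes nothing.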
An exchange is “all the stuff in the middle” in the shipping service or post office analogy. We don’t put a package directly into a friend’s mailbox; rather, we drop it off at some acceptable pickup point, and it is then magically routed to its destination. In this demo source code, the $exchange variable is aptly set to ‘router’, which is another good way to think of an exchange.
$ch->exchange_declare($exchange, 'direct', false, true, false);
Again, there are some extra parameters here: passive, durable, and auto_delete, so this exchange is durable and not automatically deleted. See the documentation for more information.
The second parameter, ‘direct’, is the type of exchange. To make sense of the different exchange types, we need to know about binding keys and routing keys. A routing key is an identifier that a producer attaches when it publishes a message to an exchange. A binding key is an identifier that binds a particular queue to an exchange. Both keys are limited to 255 bytes in length.
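Because the limit is counted in bytes rather than characters, a key containing non-ASCII text runs out of room sooner than its character count suggests. A quick Python check, assuming keys are sent UTF-8 encoded (key_fits is a hypothetical helper, not part of any client library):

```python
MAX_KEY_BYTES = 255  # AMQP 0-9-1 "short string" limit, counted in bytes


def key_fits(key: str) -> bool:
    """Return True if a routing or binding key fits in 255 bytes of UTF-8."""
    return len(key.encode("utf-8")) <= MAX_KEY_BYTES


print(key_fits("error.production.database"))  # True
print(key_fits("é" * 200))  # False: 200 characters, but 400 bytes
```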
In a direct exchange, a routing key is sent with a message to the exchange. If a queue is bound to that exchange with a binding key that directly matches the routing key, then the message is routed to that queue.
In the diagram below, queue Q1 is bound to an exchange with binding_key="spades", and queue Q2 is bound to the same exchange with binding_key="clubs". If we send a message of the form (message="Ace", routing_key="spades"), then that message will end up in Q1. Sending a message of the form (message="King", routing_key="clubs") will route it to Q2. It is possible for Q1 or Q2 to have multiple bindings to the same exchange. Q1 could be bound with both “spades” and “hearts”, so that any message with the routing key “spades” or “hearts” will be routed to Q1.
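The Q1/Q2 example can be replayed with a tiny in-memory model of direct routing. This is an illustrative sketch only: DirectExchange is a hypothetical class, it does not talk to a broker, and it reflects the default behaviour that a message matching no binding is simply dropped.

```python
from collections import defaultdict


class DirectExchange:
    """Toy in-memory model of direct-exchange routing (exact key match)."""

    def __init__(self):
        self._bindings = defaultdict(set)  # binding key -> names of bound queues
        self.queues = defaultdict(list)    # queue name -> messages delivered so far

    def bind(self, queue, binding_key):
        # A queue may be bound several times with different keys.
        self._bindings[binding_key].add(queue)

    def publish(self, message, routing_key):
        """Copy message into every queue whose binding key equals routing_key."""
        targets = sorted(self._bindings.get(routing_key, ()))
        for queue in targets:
            self.queues[queue].append(message)
        return targets  # an empty list means nothing matched and the message is dropped


exchange = DirectExchange()
exchange.bind("Q1", "spades")
exchange.bind("Q1", "hearts")
exchange.bind("Q2", "clubs")

exchange.publish("Ace", "spades")    # ends up in Q1
exchange.publish("King", "clubs")    # ends up in Q2
exchange.publish("Queen", "hearts")  # also ends up in Q1
print(dict(exchange.queues))  # {'Q1': ['Ace', 'Queen'], 'Q2': ['King']}
```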
Another type of exchange is known as fanout. A fanout exchange ignores routing keys and broadcasts each message to every queue bound to it. This is probably the simplest exchange to work with.
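For contrast with the direct model, a fanout exchange can be sketched in a few lines: the routing key is accepted but never consulted. FanoutExchange and the queue names (“audit”, “email”, “metrics”) are hypothetical and chosen only for illustration.

```python
class FanoutExchange:
    """Toy in-memory model of a fanout exchange: every bound queue gets a copy."""

    def __init__(self):
        self.delivered = {}  # queue name -> messages, in binding order

    def bind(self, queue):
        self.delivered.setdefault(queue, [])

    def publish(self, message, routing_key=""):
        # routing_key is accepted for symmetry with other exchanges but ignored.
        for messages in self.delivered.values():
            messages.append(message)
        return list(self.delivered)


fanout = FanoutExchange()
for name in ("audit", "email", "metrics"):
    fanout.bind(name)
print(fanout.publish("disk almost full", routing_key="whatever"))
# ['audit', 'email', 'metrics']
```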
A more interesting type of exchange is called topic. A topic exchange uses routing keys made of dot-delimited words, which allow more flexible routing. Examples include “error.production.database” and “weather.ny.syracuse”.
When binding queues to a topic exchange, we can use two special characters to match routing keys in a limited regular expression fashion.
- * (asterisk) – matches exactly one word
- # (octothorpe) – matches zero or more words
If Q1 is bound with the binding key “*.production.*”, Q1 receives any production message of any severity level from any source. For example, a message with the routing key “error.production.database” will be routed to Q1.
Q2, bound with “info.#”, receives any info message, regardless of source: every routing key whose first word is “info” ends up in Q2.
Q3 could receive all database messages with binding “*.*.database”.
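Here is a small Python function that applies the two wildcard rules to the Q1, Q2, and Q3 bindings above. topic_matches is a hypothetical helper written for illustration (RabbitMQ’s own matching code is organized differently), and “info.production.web” is just a made-up key. Because # can match zero words, “info.#” also matches the bare key “info”.

```python
from functools import lru_cache


def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key matches binding_key under topic-exchange rules.

    '*' matches exactly one word and '#' matches zero or more words.
    """
    pattern = tuple(binding_key.split("."))
    words = tuple(routing_key.split("."))

    @lru_cache(maxsize=None)
    def match(p: int, w: int) -> bool:
        if p == len(pattern):            # pattern used up: words must be too
            return w == len(words)
        token = pattern[p]
        if token == "#":                 # try letting '#' absorb 0, 1, 2, ... words
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):              # words used up, but pattern still needs one
            return False
        if token == "*" or token == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


bindings = {"Q1": "*.production.*", "Q2": "info.#", "Q3": "*.*.database"}
for key in ("error.production.database", "info.production.web", "info"):
    print(key, [q for q, b in bindings.items() if topic_matches(b, key)])
# error.production.database ['Q1', 'Q3']
# info.production.web ['Q1', 'Q2']
# info ['Q2']
```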
A Demo Producer and Consumer
Let’s create a pair of simple demo programs, one in PHP which will produce messages and one in Python that will act as the consumer. All messages will be sent through a topic exchange.
<?php
include("config.php"); // connection settings: HOST, PORT, USER, PASS, VHOST

use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$exchange = "rabbitmq_demo";
$exchangeType = "topic";
$queue = "events";

if ($_SERVER["argc"] < 3) {
    exit("Usage: php topicProducer.php <message> <routing_key>\n");
}
$message = $_SERVER["argv"][1];    // first argument: the message body
$routingKey = $_SERVER["argv"][2]; // second argument: the routing key

$connection = new AMQPConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();

// declare/create the queue (passive, durable, exclusive, auto_delete)
$channel->queue_declare($queue, false, true, false, false);

// declare/create the exchange as a topic exchange (passive, durable, auto_delete)
$channel->exchange_declare($exchange, $exchangeType, false, false, false);

$msg = new AMQPMessage($message, array("content_type" => "text/plain"));
$channel->basic_publish($msg, $exchange, $routingKey);

print "Sent $message ($routingKey)\n";

$channel->close();
$connection->close();
import sys

import pika

# Written for current pika 1.x releases; older pika versions used
# type= in exchange_declare() and basic_consume(callback, no_ack=...) instead.

EXCHANGE = "rabbitmq_demo"
EXCHANGE_TYPE = "topic"
QUEUE = "events"


# consume callback function
def callback(ch, method, properties, body):
    print(" - Received '%s' on routing_key %s" % (body.decode(), method.routing_key))
    # Anything else could happen here:
    # send an email alert, send an XMPP message, trigger another process, etc.


if len(sys.argv) != 2:
    sys.exit("You must provide a binding key.")
key = sys.argv[1]

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

# Same exchange and queue settings as the PHP producer
channel.exchange_declare(exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE)
channel.queue_declare(queue=QUEUE, durable=True)
channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=key)

print(" Listening for messages...")
channel.basic_consume(queue=QUEUE, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
The producer PHP program takes two command line arguments: the message itself and the routing key. The consumer Python program takes a binding key as its only argument. Both programs declare a topic exchange called “rabbitmq_demo” and a durable queue called “events”, and the consumer binds that queue to the exchange using its binding key.
Run the Python program as follows:
python topic_consumer.py "errors.production.*"
This will listen for production errors from any source (a database, web server, etc.). The quotes keep the shell from treating * as a filename wildcard.
Now run the PHP program with an error message and routing key:
php topicProducer.php "the prod database has been deleted. call the authorities" errors.production.database
You should see the message picked up by the Python consumer.
Listening for messages...
- Received 'the prod database has been deleted. call the authorities' on routing_key errors.production.database
The following message, however, should be ignored by the consumer, since its routing key does not match the binding key errors.production.*:
php topicProducer.php "DNS server timeout" warnings.prod.dns
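You can press Control+C at any time to stop the Python consumer, then start it again with different binding keys to experiment with how messages are routed. Keep in mind that the “events” queue is durable and shared between runs, so bindings added on earlier runs stay in place; rabbitmqctl list_bindings shows which ones are currently active.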
This basic example should do two things. First, it gives you a way to try out different routing keys and see how topic exchanges route messages. Second, it should get you thinking about how relatively simple it is to provide a means of communication among any number of independent programs written in any combination of languages.
What if you worked in operations, maintaining a collection of production physical servers and software services? And what if you had everything from Perl to Ruby to Haskell to .NET programs monitoring different parts of your system, and you needed a way for them to report back through a central channel? RabbitMQ can make your life a lot easier, especially if your consumer program can tap into an email or XMPP library to alert real live human beings to production problems.
Or what if you had a website front end written in PHP that was responsible for uploading and then processing a large number of user photos or videos? You may want to shift the processing burden to a faster or more specialized language that can run in the background or on a different server instance. RabbitMQ can be configured as a worker queue for such a situation.
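When several consumers read from the same queue, RabbitMQ by default hands messages out to them in turn (round-robin), which is what lets a worker queue spread the load. The plain-Python sketch below simulates only that dispatch order; dispatch_round_robin and the worker and file names are hypothetical, and in a real deployment acknowledgements and prefetch settings (basic_qos) also affect how evenly work is shared.

```python
from collections import deque
from itertools import cycle


def dispatch_round_robin(jobs, workers):
    """Hand jobs from one shared queue to competing workers, one at a time in turn.

    Returns a dict mapping each worker name to the jobs it received, in order.
    """
    if not workers:
        raise ValueError("need at least one worker")
    assignments = {worker: [] for worker in workers}
    pending = deque(jobs)  # stands in for the shared work queue
    for worker in cycle(workers):
        if not pending:
            break
        assignments[worker].append(pending.popleft())
    return assignments


jobs = [f"video-{n}.mp4" for n in range(1, 6)]
print(dispatch_round_robin(jobs, ["worker-a", "worker-b"]))
# {'worker-a': ['video-1.mp4', 'video-3.mp4', 'video-5.mp4'], 'worker-b': ['video-2.mp4', 'video-4.mp4']}
```

Adding a third worker name to the list is all it takes to spread the same jobs more thinly, which mirrors starting another consumer process against the shared queue.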
Hopefully I’ve sparked an interest in RabbitMQ, and maybe it will turn out to be the right tool for a decoupling or scalability problem you are facing. It’s pretty easy to install and has a wide selection of client libraries for your language of choice. An operations monitoring system and a worker queue for executing large jobs are just two real-world examples where RabbitMQ can help; with a little imagination, you will probably find many more.