# Elegant Network Communication with RabbitMQ
Network communication is fundamental to many applications, but it is very difficult to get right. Dealing with traditional sockets (TCP, UDP, etc.) is frustrating because the concepts associated with them are too low-level for much of the work we do. For example, sockets operate as streams, meaning that you have to come up with your own way to distinguish between packets of information.
This is one of the gaps that messaging systems fill. They make writing networked applications much easier by providing message-based, rather than stream-based, communication. That means instead of having to read from a seemingly never-ending stream of information, you get your information nicely packaged as messages. In addition, messaging systems allow us to easily design complex network topologies with “queues” in the middle (more on that later).
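To see what messaging saves you from, here is a sketch of the length-prefix framing you would otherwise have to write yourself on top of a stream (plain Ruby, no real sockets; the helper names are made up for illustration):

```ruby
# Over a raw TCP stream, messages arrive as one continuous byte
# sequence, so we must frame them ourselves. A common approach is
# to prefix each message with its length (here, a 4-byte integer).

def frame(message)
  [message.bytesize].pack("N") + message    # 4-byte big-endian length prefix
end

def unframe(stream)
  messages = []
  until stream.empty?
    length = stream.slice!(0, 4).unpack1("N") # read the length prefix
    messages << stream.slice!(0, length)      # then exactly that many bytes
  end
  messages
end

# Two messages concatenated into one "stream", as TCP might deliver them:
stream = frame("Hello") + frame("world!")
p unframe(stream)  # => ["Hello", "world!"]
```

With a messaging system, this bookkeeping (and its many edge cases, like partial reads) is handled for you.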
In this article, we’ll explore RabbitMQ. We’ll take a look at its underlying concepts and how it compares to other message queuing systems. Also, the article will cover how to use “Bunny”, the gem that lets Ruby developers interact with RabbitMQ.
Let’s get to it.
## Concepts
The basic concept of RabbitMQ is pretty clear. It lets two computers communicate by using messages (a concept similar, at a very high level, to that of a packet). In addition, RabbitMQ also provides queues. This means that if computer A sends a message to computer B and computer B doesn’t read it immediately, RabbitMQ holds onto the message in a queue (which is a [type of data structure](http://en.wikipedia.org/wiki/Queue_(abstract_data_type))). Generally, the node that adds to the queue is called a producer, and the node that dequeues from it is called a consumer.
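To make the producer/consumer vocabulary concrete, here is a toy sketch of the queue idea in plain Ruby (this is just the data structure, not RabbitMQ itself):

```ruby
# A toy "broker": the producer enqueues messages, the consumer
# dequeues them in the same order (first in, first out).
queue = []

# Producer side: enqueue messages.
queue.push("message 1")
queue.push("message 2")

# Consumer side: dequeue in arrival order. Even if the consumer
# only shows up later, the queue has held onto the messages.
p queue.shift  # => "message 1"
p queue.shift  # => "message 2"
```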
Unlike ZeroMQ, RabbitMQ is a message broker, meaning that there is a “middle man” between computers that “brokers” the messages. This middle man does increase latency (i.e. waiting time) and can decrease throughput, but it makes load balancing, persistent queuing (i.e. saving the queue of outstanding messages to disk), etc. much easier. So, it’s a tradeoff. In addition, RabbitMQ implements an open protocol named AMQP instead of rolling its own like ZeroMQ does.
The best way to learn about RabbitMQ is to start using it, so let’s dive in!
## Bunny
Since RabbitMQ is a brokered messaging system, we need to install a RabbitMQ server that serves as the middle man. If you’re on OS X and have Homebrew, the process is really simple:
```shell
brew install rabbitmq
PATH=$PATH:/usr/local/sbin
rabbitmq-server
```
Notice that we had to update the path: `/usr/local/sbin` is generally not on the `PATH`, but that is where `rabbitmq-server` is installed. If you are on Linux or Windows, RabbitMQ has standalone installers and debs for you.
To get Ruby talking with RabbitMQ, we need a gem called `bunny`:

```shell
gem install bunny
```
We can start out by writing a simple message sender:
```ruby
require 'bunny'

conn = Bunny.new
conn.start

ch = conn.create_channel
q = ch.queue("hello_world")

ch.default_exchange.publish("Hello, world!", :routing_key => q.name)
puts "Sent!"

conn.close
```
Let’s break down the code.
```ruby
conn = Bunny.new
conn.start

ch = conn.create_channel
```
Here, we connect to the RabbitMQ server and create a channel of communication.
```ruby
q = ch.queue("hello_world")
```
This is actually a fairly important bit of code. It creates a queue called `hello_world` inside the RabbitMQ broker. If this queue already exists, it just gives us a handle to the existing queue. Queue names are quite important, since they allow us to distinguish between messages (e.g. queues `http_tasks` and `news_messages` likely contain two completely different types of information).
```ruby
ch.default_exchange.publish("Hello, world!", :routing_key => q.name)
```
Here, we’re publishing the message “Hello, world!”. The routing key can be used to create routes within your network, but we don’t need that functionality at the moment.
Now that we have a sender, we need a receiver:
```ruby
require 'bunny'

conn = Bunny.new
conn.start

ch = conn.create_channel
q = ch.queue('hello_world')

puts " Waiting for messages in #{q.name}. To exit press CTRL+C"
q.subscribe(:block => true) do |delivery_info, properties, body|
  puts "Received #{body}"
end
```
Most of the code is the same as the sender except for the subscribing block:
```ruby
q.subscribe(:block => true) do |delivery_info, properties, body|
  puts "Received #{body}"
end
```
This subscribes to new messages in the queue. Passing in `:block => true` tells Bunny that this call can block, i.e. wait around until a message is actually available. Then, once a message is received, the code block we’ve passed in (not to be confused with the “waiting around” kind of blocking) prints out its contents.
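The blocking behaviour is analogous to `Thread::Queue#pop` from Ruby’s own standard library, which also waits until a message is available. A broker-free sketch of the idea:

```ruby
require 'thread'

queue = Thread::Queue.new

consumer = Thread.new do
  # pop blocks, much like q.subscribe(:block => true) does:
  # this thread sleeps here until a message arrives.
  message = queue.pop
  puts "Received #{message}"
end

sleep 0.1                    # the consumer is now parked, waiting
queue.push("Hello, world!")  # "publish" a message
consumer.join                # prints "Received Hello, world!"
```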
The flow of data is pretty simple. The sender sends data to the broker. The receiver (often called the consumer) “consumes” it from the queue. Let’s do some more interesting things with the same ideas.
## HTTP Master/Worker
It turns out that the same concepts can be applied to make a parallel/distributed HTTP link downloader. We’ll have a master who has a list of URLs which are then added to the queue. Workers will consume and download the contents of these URLs. Here’s the producer/master code:
```ruby
require 'bunny'

class HttpMaster
  # Takes a list of URLs to submit to workers.
  def initialize(urls)
    @conn = Bunny.new
    @urls = urls
  end

  def start
    @conn.start
    ch = @conn.create_channel
    q = ch.queue("http_tasks")

    @urls.each do |url|
      ch.default_exchange.publish(url, :routing_key => q.name)
      puts "Sent url: #{url}"
    end
  end
end

http_master = HttpMaster.new(['http://www.google.com/', 'http://yahoo.com/'])
http_master.start
```
At first glance, the code seems quite a bit more involved. But, the only thing we’ve really changed is the type of message (along with the name of the queue, but that’s just to keep ourselves organized):
```ruby
ch.default_exchange.publish(url, :routing_key => q.name)
```
Now, instead of just publishing “Hello, world!”, the master publishes the URL it wants downloaded. Let’s take a look at the worker/consumer which makes use of this URL:
```ruby
require 'bunny'
require 'open-uri'

class HttpWorker
  def initialize(folder)
    @conn = Bunny.new
    @folder = folder
    @counter = 0
  end

  # Downloads the given URL into the backup folder.
  def work(url)
    File.open("#{@folder}/#{@counter}.backup", 'w+') do |file|
      file << URI.open(url).read
    end
    @counter += 1
  end

  def start
    @conn.start
    ch = @conn.create_channel
    q = ch.queue("http_tasks")

    puts "Waiting for tasks on queue #{q.name}"
    q.subscribe(:block => true) do |delivery_info, properties, body|
      puts "Received task/URL: #{body}"
      work body
    end
  end
end

if ARGV.length != 1
  abort 'Usage: http_worker.rb FOLDER'
end

worker = HttpWorker.new(ARGV[0])
worker.start
```
Let’s focus in on the important bit:
```ruby
q.subscribe(:block => true) do |delivery_info, properties, body|
  puts "Received task/URL: #{body}"
  work body
end
```
Instead of just discarding the message it receives like the “Hello, world” example did, the worker calls `work body`, where `body` is the URL received from the master. Then, `work` downloads the URL to a specified folder.
Notice the power that RabbitMQ has given us. Instead of worrying about whether or not our data made it across the network, checking frame sizes, etc. as we would with sockets, we’re just dealing with messages. Also, if we run the master without running a worker, then the URLs will be held in a queue instead of just being thrown away. So, the master sends the URLs to the broker, the broker puts them in a queue and the consumer/worker downloads them to a folder. But, what if the worker encounters an error while doing that?
## ACK’ing
As things are arranged right now, if a worker encounters an error (e.g. running out of memory) while downloading a URL, the URL is dequeued anyway, meaning that we end up without the downloaded file and without a full copy of the data. Fortunately, RabbitMQ provides a way to solve this issue in the form of “ACKs”.
We can configure the queue so that it only removes items once it receives a message from the consumer that says “OK, I’m done with this item”, i.e. an ACK message. It is important to note that this is similar in spirit to, but not the same as, a TCP ACK; it is a feature implemented by RabbitMQ, not by the underlying transport protocol. RabbitMQ makes ACK’ing incredibly easy:
```ruby
q.subscribe(:ack => true, :block => true) do |delivery_info, properties, body|
  puts "Received task/URL: #{body}"
  work body
  ch.ack(delivery_info.delivery_tag)
end
```
Notice the argument `:ack => true` in the first line. That tells RabbitMQ that it should not remove an item from the queue until it receives an ACK for it. Then, after calling `work body`, we send back an ACK over the channel with `ch.ack(delivery_info.delivery_tag)`. To implement the ACK properly, we’d simply need to catch the exceptions that we know have some chance of occurring (e.g. no network connection).
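One way that pattern could look, sketched with a stubbed channel object so it runs without a broker (the stub and the `handle` helper are made up for illustration; in real Bunny code, `ch.nack(delivery_tag, false, true)` rejects a message and asks the broker to requeue it):

```ruby
# A stub standing in for a Bunny channel, so the ack/nack pattern
# can be shown without a running broker (illustrative only).
class StubChannel
  attr_reader :acked, :nacked

  def initialize
    @acked, @nacked = [], []
  end

  def ack(delivery_tag)
    @acked << delivery_tag
  end

  def nack(delivery_tag, _multiple = false, _requeue = true)
    @nacked << delivery_tag
  end
end

# The pattern itself: only ack once the work succeeds; on failure,
# nack so the broker requeues the message for another worker.
def handle(ch, delivery_tag, body)
  yield body                        # do the actual work (e.g. download the URL)
  ch.ack(delivery_tag)
rescue StandardError
  ch.nack(delivery_tag, false, true)
end

ch = StubChannel.new
handle(ch, 1, "http://example.com/") { |url| url }          # work succeeds
handle(ch, 2, "http://example.com/") { raise "no network" } # work fails
p ch.acked   # => [1]
p ch.nacked  # => [2]
```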
## Exchanges
So far, I’ve been presenting a simplified version of the RabbitMQ messaging model. The producer doesn’t actually send anything to the queue; instead, it sends everything to a middle man called an exchange, which decides which queue(s) to send messages to. In all the code examples, we’ve been doing this:
```ruby
ch.default_exchange.publish(message, :routing_key => q.name)
```
We’re using the “default exchange”, which routes messages according to the routing key we give it; in this case, `q.name`. That means that consumers/workers will only get the messages if they are operating on the same `q.name`.
Right now, if we ran two workers, each worker would, on average, get half of the total tasks. What if we wanted to arrange it so that every worker gets every message? The use case isn’t hard to imagine: assume we had one worker on every computer and every computer needed a copy of all the URLs that the master was going to provide. What we want is a “publish/subscribe” model.
It is pretty easy to think of a way to accomplish this: give every worker its own queue and make sure that all messages that are sent from the master are queued onto every worker queue. To do that, we’ll use a “fanout” exchange which sends messages to all the queues associated with it rather than sending to just one.
This results in a new `start` method on the master:
```ruby
def start
  @conn.start
  ch = @conn.create_channel
  exchange = ch.fanout("http-fanout")

  @urls.each do |url|
    exchange.publish(url)
    puts "Sent url: #{url}"
  end
end
```
The important line is `exchange = ch.fanout("http-fanout")`, where we tell RabbitMQ what kind of exchange to create. Then, we publish our message with `exchange.publish(url)`. Notice we no longer need a routing key, since the fanout exchange sends to all queues associated with it.
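The difference between the two routing behaviours can be sketched in a few lines of plain Ruby (a gross simplification of what the broker does internally, with made-up helper names):

```ruby
# Each worker has a queue, keyed by name.
queues = { "worker_1" => [], "worker_2" => [] }

# Default exchange: the routing key selects exactly one queue.
def default_route(queues, message, routing_key)
  queues[routing_key] << message
end

# Fanout exchange: every bound queue gets a copy of the message.
def fanout_route(queues, message)
  queues.each_value { |q| q << message }
end

default_route(queues, "only for worker_1", "worker_1")
fanout_route(queues, "for everyone")

p queues["worker_1"]  # => ["only for worker_1", "for everyone"]
p queues["worker_2"]  # => ["for everyone"]
```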
Now, we’ll tackle the next part of the problem, which is to create a queue for every worker and associate those queues with the exchange. We have the following in the worker code:
```ruby
def start
  @conn.start
  ch = @conn.create_channel

  # Instead of using a queue with a name, we tell RabbitMQ that we
  # want a temporary queue (i.e. one it can get rid of after this
  # worker dies) that only one worker gets to use.
  q = ch.queue("", :exclusive => true)

  # Get the exchange.
  exchange = ch.fanout('http-fanout')

  # We need to tell RabbitMQ that this queue is connected ("bound")
  # to the "http-fanout" exchange - see http_master_fanout.rb
  q.bind(exchange)

  puts "Waiting for tasks on queue #{q.name}"
  q.subscribe(:block => true) do |delivery_info, properties, body|
    puts "Received task/URL: #{body}"
    work body
  end
end
```
When we create the queue with `q = ch.queue("", :exclusive => true)`, by providing an empty name, we let RabbitMQ choose a random name for us. With `:exclusive => true`, we tell it that this queue is meant only for this worker. Then, with `q.bind(exchange)`, we bind (or associate) the queue with the exchange that the master is using. And that’s that! When we run the master with a few workers, we can see that the URLs are sent to every worker, not just distributed amongst them.
## Wrapping It Up
I began writing networked code with C and TCP sockets. Overall, it was a miserable and tedious experience: you have to keep track of a million things, and debugging is a nightmare. RabbitMQ and Ruby come together to solve these problems and make communication simple and elegant.
So far, we’ve only covered a few parts of RabbitMQ; there’s plenty more you can do with it. But the basics in this article should be enough to allow you to write some pretty awesome distributed code.