Network communication is fundamental to tons of applications, but it is very difficult to get right. Dealing with traditional sockets (TCP, UDP, etc.) is frustrating because the concepts associated with them are too low level for a lot of the work that we do. For example, TCP sockets are operated as streams, meaning that you have to come up with your own way to distinguish between packets of information.
This is one of the gaps that messaging systems fill. They make writing networked applications much easier by providing message-based, rather than stream-based, communication. That means instead of having to read from a seemingly never-ending stream of information, you get your information nicely packaged as messages. In addition, messaging systems allow us to easily design complex network topologies with “queues” in the middle (more on that later).
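To see what "coming up with your own way to distinguish between packets" involves, here is a minimal sketch of length-prefixed framing over a byte stream. It is only an illustration: write_frame and read_frame are hypothetical helper names, and a StringIO stands in for a real socket.

```ruby
require 'stringio'

# Write one message as a 4-byte big-endian length followed by the payload.
def write_frame(io, payload)
  bytes = payload.b
  io.write([bytes.bytesize].pack('N'))
  io.write(bytes)
end

# Read one message back, or return nil when the stream is exhausted.
def read_frame(io)
  header = io.read(4)
  return nil if header.nil? || header.bytesize < 4
  length = header.unpack1('N')
  io.read(length)
end

stream = StringIO.new
write_frame(stream, 'first message')
write_frame(stream, 'second')
stream.rewind

messages = []
while (msg = read_frame(stream))
  messages << msg
end
puts messages.inspect
```

A messaging system does this kind of bookkeeping for you, so the application only ever sees whole messages.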
In this article, we’ll explore RabbitMQ. We’ll take a look at its underlying concepts and how it compares to other message queuing systems. Also, the article will cover how to use “Bunny”, the gem that lets Ruby developers interact with RabbitMQ.
Let’s get to it.
Concepts
The basic concept of RabbitMQ is pretty clear. It lets two computers communicate by using messages (a concept similar, at a very high level, to that of a packet). In addition, RabbitMQ also provides queues. This means that if computer A sends a message to computer B and computer B doesn’t read it immediately, RabbitMQ holds onto the message in a queue (which is a [type of data structure](http://en.wikipedia.org/wiki/Queue_(abstract_data_type))). Generally, the node that adds to the queue is called a producer and the node that dequeues from the queue is called a consumer.
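The producer/consumer idea can be sketched without a broker at all. In the snippet below, Ruby's built-in thread-safe Queue stands in for a RabbitMQ queue (an assumption made purely for illustration; a real broker keeps the queue in a separate process).

```ruby
# An in-process Queue plays the role of the broker's queue here.
mailbox = Queue.new

# Producer: enqueue three messages and finish before anyone is reading.
producer = Thread.new do
  3.times { |i| mailbox << "message #{i}" }
end
producer.join

# Consumer: starts later and still gets every message, in order.
received = []
received << mailbox.pop until mailbox.empty?
puts received.inspect
```

The point is that the producer never waits for the consumer: the queue holds the messages until someone is ready to read them.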
Unlike ZeroMQ, RabbitMQ is a message broker, meaning that there is a “middle man” between computers that “brokers” the messages. This middle man does increase latency (i.e. waiting time) and can decrease throughput, but it makes load-balancing, persistent queuing (i.e. saving the queue of outstanding messages to disk), etc. much easier. So, it’s a tradeoff. In addition, RabbitMQ implements an open protocol named AMQP instead of just rolling its own like ZeroMQ.
The best way to learn about RabbitMQ is to start using it, so let’s dive in!
Bunny
Since RabbitMQ is a brokered messaging system, we need to install a RabbitMQ server that serves as the middle man. If you’re on OS X and have Homebrew, the process is really simple:
brew install rabbitmq
PATH=$PATH:/usr/local/sbin
rabbitmq-server
Notice that we had to update the path. /usr/local/sbin/ is generally not on the path, but that is where rabbitmq-server is installed. If you are on Linux or Windows, RabbitMQ has standalone installers and debs for you.
To get Ruby talking with RabbitMQ, we need a gem called bunny:
gem install bunny
We can start out by writing a simple message sender:
require 'bunny'
conn = Bunny.new
conn.start
ch = conn.create_channel
q = ch.queue("hello_world")
ch.default_exchange.publish("Hello, world!", :routing_key => q.name)
puts "Sent!"
conn.close
Let’s break down the code.
conn = Bunny.new
conn.start
ch = conn.create_channel
Here, we connect to the RabbitMQ server and create a channel of communication.
q = ch.queue("hello_world")
This is actually a fairly important bit of code. It creates a queue called hello_world inside the RabbitMQ broker. If this queue already exists, it just gives us a handle to the existing queue. Queue names are quite important, since they allow us to distinguish between messages (e.g. queues “http_tasks” and “news_messages” likely contain two completely different types of information).
ch.default_exchange.publish("Hello, world!", :routing_key => q.name)
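Here, we’re publishing the message “Hello, world!”. The default exchange delivers a message to the queue whose name matches the routing key, so passing q.name sends the message straight to our hello_world queue. Routing keys can also be used to build more elaborate routes within your network, which is functionality we do not need at the moment.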
Now that we have a sender, we need a receiver:
require 'bunny'
conn = Bunny.new
conn.start
ch = conn.create_channel
q = ch.queue('hello_world')
puts " Waiting for messages in #{q.name}. To exit press CTRL+C"
q.subscribe(:block => true) do |delivery_info, properties, body|
  puts "Received #{body}"
end
Most of the code is the same as the sender except for the subscribing block:
q.subscribe(:block => true) do |delivery_info, properties, body|
  puts "Received #{body}"
end
This subscribes to new messages in the queue. Passing in :block => true tells it that this call can block, i.e. wait around until a message is actually available. Then, once it is received, the code block we’ve passed in (not to be confused with the “waiting around” kind of block) prints out the contents of the message received.
The flow of data is pretty simple. The sender sends data to the broker. The receiver (often called the consumer) “consumes” it from the queue. Let’s do some more interesting things with the same ideas.
HTTP Master/Worker
It turns out that the same concepts can be applied to make a parallel/distributed HTTP link downloader. We’ll have a master who has a list of URLs which are then added to the queue. Workers will consume and download the contents of these URLs. Here’s the producer/master code:
require 'bunny'
class HttpMaster
  #takes a list of urls
  #to submit to workers
  def initialize(urls)
    @conn = Bunny.new
    @urls = urls
  end
  def start
    @conn.start
    ch = @conn.create_channel
    q = ch.queue("http_tasks")
    @urls.each do |url|
      ch.default_exchange.publish(url, :routing_key => q.name)
      puts "Sent url: #{url}"
    end
  end
end
http_master = HttpMaster.new(['http://www.google.com/', 'http://yahoo.com/'])
http_master.start
At first glance, the code seems quite a bit more involved. But, the only thing we’ve really changed is the type of message (along with the name of the queue, but that’s just to keep ourselves organized):
ch.default_exchange.publish(url, :routing_key => q.name)
Now, instead of just publishing “Hello, world!”, the master publishes the URL it wants downloaded. Let’s take a look at the worker/consumer which makes use of this URL:
require 'bunny'
require 'open-uri'
class HttpWorker
  def initialize(folder)
    @conn = Bunny.new
    @folder = folder
    @counter = 0
  end
  #downloads the url into the next numbered file in the folder
  def work(url)
    File.open("#{@folder}/#{@counter}.backup", 'w+') do |file|
      #URI.open comes from open-uri; a bare open(url)
      #no longer fetches URLs on Ruby 3.0 and later
      file << URI.open(url).read
    end
    @counter += 1
  end
  def start
    @conn.start
    ch = @conn.create_channel
    q = ch.queue "http_tasks"
    puts "Waiting for tasks on queue #{q.name}"
    q.subscribe(:block => true) do |delivery_info, properties, body|
      puts "Received task/URL: #{body}"
      work body
    end
  end
end
if ARGV.length != 1
  abort 'Usage: http_worker.rb FOLDER'
end
worker = HttpWorker.new(ARGV[0])
worker.start
Let’s focus in on the important bit:
q.subscribe(:block => true) do |delivery_info, properties, body|
  puts "Received task/URL: #{body}"
  work body
end
Instead of just printing the message it receives like the “Hello, world” example, the worker calls work body, where body should be the URL received from the master. Then, work downloads the URL to a specified folder.
Notice the power that RabbitMQ has given us. Instead of worrying about whether or not our data made it across the network, checking frame sizes, etc. as we would with sockets, we’re just dealing with messages. Also, if we run the master without running a worker, then the URLs will be held in a queue instead of just being thrown away. So, the master sends the URLs to the broker, the broker puts them in a queue and the consumer/worker downloads them to a folder. But, what if the worker encounters an error while doing that?
ACK’ing
As it is arranged right now, if a worker encountered an error (e.g. out of memory) in downloading a URL, the URL would be dequeued anyway, meaning that we would not have the downloaded file! Obviously, this can be quite problematic because we would not have a full copy of the data. Fortunately, RabbitMQ provides a way to solve this issue in the form of “ACKs”.
We can configure the queue so that it only dequeues items once it receives a message from the consumer that says “OK, I’m done with this item”, i.e. an ACK message. It is important to note that this is similar in spirit to, but not the same as, a TCP ACK; it is a feature implemented by RabbitMQ, not by the underlying transport protocol. RabbitMQ makes ACK’ing incredibly easy:
q.subscribe(:manual_ack => true, :block => true) do |delivery_info, properties, body|
  puts "Received task/URL: #{body}"
  work body
  ch.ack(delivery_info.delivery_tag)
end
Notice the argument :manual_ack => true in the first line (recent versions of Bunny use this name; older releases called the option :ack). That tells RabbitMQ that it should not dequeue a message until it receives an ACK for it. Then, after calling work body, we send back an ACK over the channel with ch.ack(delivery_info.delivery_tag). To implement the ACK properly, we’d also need to rescue the exceptions that we know have some chance of occurring (e.g. no network connection), so that a failed download is never acknowledged.
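As a rough sketch of that error handling, the helper below acknowledges a delivery only when the work succeeds and otherwise asks the broker to requeue it. It assumes a channel object that responds to ack and reject the way Bunny::Channel does; handle_delivery and RecordingChannel are hypothetical names, and the recording channel exists only so the sketch runs without a broker.

```ruby
# Process one delivery and tell the broker how it went.
# `handler` is any callable that does the real work (e.g. the download).
def handle_delivery(channel, delivery_tag, body, handler)
  handler.call(body)
  channel.ack(delivery_tag)
  :acked
rescue StandardError => e
  warn "Failed to process #{body}: #{e.message}"
  # Passing true as the second argument asks the broker to requeue the message.
  channel.reject(delivery_tag, true)
  :requeued
end

# Stand-in for a real channel (hypothetical); it just records the calls.
class RecordingChannel
  attr_reader :calls

  def initialize
    @calls = []
  end

  def ack(tag)
    @calls << [:ack, tag]
  end

  def reject(tag, requeue = false)
    @calls << [:reject, tag, requeue]
  end
end

channel = RecordingChannel.new
ok = handle_delivery(channel, 1, 'http://example.com/', ->(url) { url.length })
failed = handle_delivery(channel, 2, 'bad', ->(_url) { raise IOError, 'no network' })
puts channel.calls.inspect
```

Inside a real subscribe block, the call would look something like handle_delivery(ch, delivery_info.delivery_tag, body, method(:work)). Keep in mind that requeueing a message that can never succeed will make it loop forever, so a production worker would probably cap the number of retries.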
Exchanges
So far, I’ve been presenting a simplified version of the RabbitMQ messaging model. The producer doesn’t actually send anything to the queue directly; instead, it sends everything to a middle man called an exchange. The exchange decides which queue each message is sent to. In all the code examples, we’ve been doing this:
ch.default_exchange.publish(message, :routing_key => q.name)
We’re using the “default exchange” which routes messages according to the routing key we give it, which, in this case, is q.name. That means that consumers/workers will only get the messages if they are operating on the same q.name.
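The routing rule of the default exchange can be pictured with a toy in-memory model. This is only a sketch and does not use Bunny: the queues hash and the publish lambda are made-up stand-ins for the broker.

```ruby
# Toy model: the routing key is treated as the name of the target queue.
queues = { 'hello_world' => [], 'http_tasks' => [] }

publish = lambda do |message, routing_key|
  queue = queues[routing_key]
  # In this model, a message whose routing key matches no queue is dropped.
  queue << message if queue
end

publish.call('Hello, world!', 'hello_world')
publish.call('http://www.google.com/', 'http_tasks')
publish.call('lost', 'no_such_queue')
puts queues.inspect
```

Each message ends up in exactly one queue, which is why only the consumers reading from that queue ever see it.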
Right now, if we ran two workers, then each worker would, on average, get half of the total tasks. What if we wanted to arrange it so that every worker gets every message? The use case isn’t hard to imagine: assume we had one worker on every computer and every computer needed a copy of all the URLs that the master was going to provide. What we want is a “publish/subscribe” model.
It is pretty easy to think of a way to accomplish this: give every worker its own queue and make sure that all messages that are sent from the master are queued onto every worker queue. To do that, we’ll use a “fanout” exchange which sends messages to all the queues associated with it rather than sending to just one.
This results in a new start method on the master:
def start
  @conn.start
  ch = @conn.create_channel
  exchange = ch.fanout("http-fanout")
  @urls.each do |url|
    exchange.publish(url)
    puts "Sent url: #{url}"
  end
end
The important line is exchange = ch.fanout("http-fanout"), where we tell RabbitMQ what kind of exchange to create. Then, we publish our message with exchange.publish(url). Notice we no longer need a routing key since the fanout exchange sends to all queues associated with it.
Now, we’ll tackle the next part of the problem, which is to create a queue for every worker and associate those queues with the exchange. We have the following in the worker code:
def start
  @conn.start
  ch = @conn.create_channel
  #instead of using a queue with a name,
  #we tell RabbitMQ that we want a temporary
  #queue (i.e. can get rid of it after this
  #worker dies) that only one worker
  #gets to use.
  q = ch.queue("", :exclusive => true)
  #get the exchange
  exchange = ch.fanout('http-fanout')
  #we need to tell RabbitMQ that this
  #queue is connected ("bound") to the
  #"http-fanout" exchange - see http_master_fanout.rb
  q.bind(exchange)
  puts "Waiting for tasks on queue #{q.name}"
  q.subscribe(:block => true) do |delivery_info, properties, body|
    puts "Received task/URL: #{body}"
    work body
  end
end
When we create the queue with q = ch.queue("", :exclusive => true), by providing an empty name, we let RabbitMQ choose a random name. With :exclusive => true, we tell it that this queue is meant only for this worker. Then, with q.bind(exchange), we bind (or associate) the queue with the exchange that the master is using. And that’s that! When we run the master with a few workers, we can see that the URLs are sent to every worker and not just distributed amongst them.
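To make the fanout behaviour concrete, here is a toy in-memory model. It is only a sketch and does not use Bunny: ToyFanoutExchange is a made-up class, and plain arrays stand in for the workers' queues.

```ruby
# Toy model: a fanout exchange copies each message to every bound queue.
class ToyFanoutExchange
  def initialize
    @queues = []
  end

  # Associate a queue with this exchange; returns self so calls can be chained.
  def bind(queue)
    @queues << queue
    self
  end

  # No routing key needed: every bound queue gets the message.
  def publish(message)
    @queues.each { |queue| queue << message }
  end
end

exchange = ToyFanoutExchange.new
worker_a = []
worker_b = []
exchange.bind(worker_a).bind(worker_b)

urls = ['http://www.google.com/', 'http://yahoo.com/']
urls.each { |url| exchange.publish(url) }
puts worker_a.inspect
puts worker_b.inspect
```

Both arrays end up with the full list of URLs, mirroring what each worker's exclusive queue receives from the real exchange.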
Wrapping It Up
I began with networked code in C and TCP sockets. Overall, it was a miserable and tedious experience. You have to mentally take care of a million things and debugging is a nightmare. RabbitMQ and Ruby come together to solve these problems and make communication simple and elegant.
So far, we’ve only covered a few parts of RabbitMQ and there’s tons of other stuff you can do with it. But, the basics in this article should be enough to allow you to write some pretty awesome distributed code.
Frequently Asked Questions (FAQs) about Network Communication with RabbitMQ
What is the role of RabbitMQ in network communication?
RabbitMQ is a message broker that facilitates network communication by allowing applications to connect and talk to each other. It uses the Advanced Message Queuing Protocol (AMQP) to ensure reliable and secure data transfer. RabbitMQ acts as a middleman for different services, enabling them to send and receive messages to and from each other in a decoupled manner. This means that the sender and receiver do not need to interact with each other directly, which enhances the scalability and reliability of the system.
How does RabbitMQ ensure reliable message delivery?
RabbitMQ uses a combination of acknowledgments and persistence to ensure reliable message delivery. A consumer acknowledges each message once it has processed it; if the consumer’s channel or connection closes before the acknowledgment arrives, the broker requeues the message and delivers it again. On the publishing side, publisher confirms let the sender learn that the broker has accepted a message. Additionally, RabbitMQ can persist messages on disk, ensuring that they are not lost even if the server crashes or restarts.
How does RabbitMQ handle two-way communication?
RabbitMQ supports two-way communication through the use of reply queues and correlation IDs. When a sender sends a request, it includes a reply queue and a unique correlation ID in the message. The receiver processes the request and sends the response back to the reply queue specified by the sender, including the same correlation ID. The sender then matches the correlation ID of the response with that of the request to correlate the two.
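The request/reply pattern described above can be sketched in memory. This is a toy model, not Bunny code: plain arrays stand in for the request and reply queues, and the hash keys mirror the reply_to and correlation_id message properties.

```ruby
require 'securerandom'

request_queue = []
reply_queue = []
pending = {} # correlation_id => original request body

# Client: tag each request with a fresh correlation ID and a reply queue.
%w[2 5].each do |n|
  id = SecureRandom.uuid
  pending[id] = n
  request_queue << { body: n, correlation_id: id, reply_to: reply_queue }
end

# Server: square each number and echo the correlation ID back.
until request_queue.empty?
  request = request_queue.shift
  result = (Integer(request[:body])**2).to_s
  request[:reply_to] << { body: result, correlation_id: request[:correlation_id] }
end

# Client: match replies to requests by correlation ID.
answers = {}
reply_queue.each do |reply|
  original = pending.delete(reply[:correlation_id])
  answers[original] = reply[:body] if original
end
puts answers.inspect
```

Because replies are matched by ID rather than by arrival order, the client can have several requests in flight at once.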
What is the structure of an AMQP frame in RabbitMQ?
An AMQP frame in RabbitMQ consists of a frame header, a frame body, and a frame end. The frame header contains information such as the frame type and channel number. The frame body contains the actual data or control information. The frame end marks the end of the frame.
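As an illustration of that layout, the sketch below builds and parses a frame following the AMQP 0-9-1 general frame format: a 1-byte type, a 2-byte channel number, a 4-byte payload size, the payload, and a frame-end byte of 0xCE. The helper names encode_frame and decode_frame are hypothetical; a client library such as Bunny does this for you.

```ruby
FRAME_END = 0xCE

# Header (type, channel, payload size) + payload + frame-end byte.
def encode_frame(type, channel, payload)
  body = payload.b
  [type, channel, body.bytesize].pack('CnN') + body + [FRAME_END].pack('C')
end

# Split a frame back into its parts and check the frame-end marker.
def decode_frame(bytes)
  type, channel, size = bytes.unpack('CnN')
  payload = bytes.byteslice(7, size)
  frame_end = bytes.getbyte(7 + size)
  raise ArgumentError, 'missing frame-end byte' unless frame_end == FRAME_END
  { type: type, channel: channel, payload: payload }
end

# Type 3 is a content body frame; channel 1 is an arbitrary choice here.
frame = encode_frame(3, 1, 'Hello, world!')
decoded = decode_frame(frame)
puts decoded.inspect
```

The explicit size field and end marker are what let the receiver carve whole frames out of the underlying TCP stream.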
How does RabbitMQ fit into the Internet of Things (IoT)?
RabbitMQ is a versatile tool that can be used in IoT applications for reliable and scalable data transfer. It can handle high volumes of messages from various devices and ensure their reliable delivery to the appropriate services. Furthermore, RabbitMQ’s support for multiple messaging protocols, including MQTT, which is widely used in IoT, makes it a suitable choice for IoT applications.
How can I handle multiple consumers in RabbitMQ?
RabbitMQ supports multiple consumers through its queueing mechanism. When multiple consumers are connected to the same queue, RabbitMQ distributes the messages among them in a round-robin manner. This allows for load balancing and increased throughput.
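A toy model of round-robin dispatch shows how the work gets split. This is a sketch in plain Ruby, with round_robin as a made-up helper name; the real broker's behaviour also depends on settings such as prefetch.

```ruby
# Hand out messages to consumers in turn: 0, 1, ..., n-1, 0, 1, ...
def round_robin(messages, consumer_count)
  assignments = Array.new(consumer_count) { [] }
  messages.each_with_index do |message, index|
    assignments[index % consumer_count] << message
  end
  assignments
end

tasks = %w[url1 url2 url3 url4 url5]
per_worker = round_robin(tasks, 2)
puts per_worker.inspect
```

With two workers and five tasks, one worker gets three tasks and the other gets two, and no task is delivered twice.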
How can I ensure message order in RabbitMQ?
RabbitMQ guarantees that messages sent from a single producer will be received in the same order by a single consumer. However, if there are multiple consumers, the order may not be preserved due to the round-robin distribution. To ensure order across multiple consumers, you can use a single queue and a single consumer, or use message grouping and consistent hashing.
How can I monitor RabbitMQ performance?
RabbitMQ provides various tools for monitoring its performance, including the RabbitMQ Management Plugin, which provides a web-based user interface for monitoring and controlling RabbitMQ servers. It displays various metrics such as message rates, queue lengths, and resource usage.
How can I secure my RabbitMQ server?
RabbitMQ provides various security features, including SSL/TLS for secure network communication, SASL for authentication, and access control lists for authorization. You can also configure firewall rules to restrict access to your RabbitMQ server.
How can I handle large messages in RabbitMQ?
RabbitMQ does not split large messages for you at the application level, so very large payloads are usually handled by chunking them in the publisher. The publisher splits a large message into smaller chunks, each with a sequence number, and sends them separately. The consumer then reassembles the chunks based on their sequence numbers. This can help to avoid memory issues and improve performance.
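The split-and-reassemble step might look something like the sketch below. The helper names and the seq/total/data fields are assumptions chosen for illustration; in practice each chunk would be published as its own message.

```ruby
# Split a payload into numbered chunks of at most chunk_size bytes.
def split_into_chunks(payload, chunk_size)
  bytes = payload.b
  total = (bytes.bytesize + chunk_size - 1) / chunk_size
  (0...total).map do |seq|
    { seq: seq, total: total, data: bytes.byteslice(seq * chunk_size, chunk_size) }
  end
end

# Reassemble chunks that may have arrived in any order.
def reassemble(chunks)
  expected = chunks.first[:total]
  raise ArgumentError, 'missing chunks' unless chunks.size == expected
  chunks.sort_by { |chunk| chunk[:seq] }.map { |chunk| chunk[:data] }.join
end

original = 'x' * 10 + 'y' * 10 + 'z' * 5
chunks = split_into_chunks(original, 10)
restored = reassemble(chunks.shuffle)
puts restored == original
```

Carrying the total count in every chunk lets the consumer detect a missing piece before it tries to rebuild the payload.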