Agent: Go-Like Concurrency in Ruby
Concurrency and parallelism are hard. With the traditional threading model, a developer has to constantly ensure threads are not in conflict, which is a very difficult task. The team behind Go implemented fantastic concurrency primitives within the language that make reasoning about concurrent code far easier.
Fortunately, all that goodness is no longer locked up inside Go; there’s now a library for Ruby! It’s called “Agent” and it implements the Golang-concurrency magic in Ruby.
Before we start using it, it is important to get some background out of the way.
The Go model for concurrency is based on the theories of pi-calculus and Communicating Sequential Processes (CSPs). Though the math underlying the conepts is quite interesting, a condensed version will get us up to speed much more quickly.
In the old days, we had threads which would communicate with each other by sharing bits of memory. This caused all sorts of problems. There would be times when two threads would try to write to the same thing at once or one thread would attempt to read from something that was being written to by another. Ingenious as they are, computer scientists came up with the concepts of locking, semaphores, etc. to solve these issues.
Go/CSP takes a different approach. Instead of sharing memory, concurrent “processes” (here, I do not mean processes as the *nix term) communicate with messages. Go implements these “processes” as “goroutines” which are like threads but much more “lightweight” in the resources they consume. Initially, the whole “messaging” concept might seem very limiting in comparison to just letting threads access the same set of variables. But it turns out that boxing ourselves in a little produces fantastic results in terms of “correctness” of concurrent code.
Agent takes a good portion of Go concurrency and implements it in Ruby. There is, however, one thing to be aware of when using Agent (or any Ruby-based concurrency framework): The standard ruby interpreter implements green threading. This means that threads are not actually run on separate processors.
Agent, underneath the abstractions that we are exposed to, uses green threading. As such, it can provide concurrency (two operations running over the same period of time, but not necessarily both executing an operation at the same instant) but not parallelism (two operations running at exactly the same instant). There are several benefits to this arrangement, but the downside is pretty clear: if you want to distribute a workload over multiple cores, Agent (and probably the standard Ruby interpreter) is not the best bet.
Now that we have the general ideas down, let’s move on to the code.
First of all, get yourself a copy of Agent:
gem install agent
You can find all of the code associated with this article here.
The basic unit of concurrency in Agent/Golang is a “goroutine”, which is a sort of lightweight thread. Agent makes it really easy to create and run one of these:
require 'agent' #go routine go! do puts 'hello, world!' end #wait around loop do end
Let’s quickly go through what we’re doing here. We pass a block to the
go! function (provided by Agent), thereby running a goroutine that executes that block. Notice that we have a forever loop at the end of that snippet. If it wasn’t for that bit of code, the goroutine would never have time to complete since the program would exit before finishing the goroutine! Obviously, this “waiting around forever” approach isn’t practical or elegant. But, to come up with something better, we need some kind of mechanism for goroutines to communicate amongst themselves and with the main thread of execution.
Channels serve as communication mechanisms between goroutines. You can add things
to a channel and take things out.
If the concept isn’t clear yet, some code will help:
require 'agent' chan = channel!(Integer) go! do #the program should only end #when this goroutine ends sleep 10 puts "Hello, world!" chan << 1 end puts chan.receive.first
This snippet waits until the message “Hello, world!” is printed and then exits. At the head of the program, we create
channel!. The first argument to
channel! is a class name which tells Agent that
chan should only take messages of type
Integer. Looking inside the goroutine block, we have the statement
chan . This means: "take the
Integer 1 and add it to the channel called
chan.” Then outside of the goroutine, we read a value from the channel.
A very important point to note here is that channel reads are generally blocking, which means that they do not return immediately if there’s nothing to read in a channel. Instead, they wait until there is something available to read. This is crucial to the execution of the above snippet. If
chan.receive.first returned immediately, the program would finish executing before finishing the goroutine.
As it happens, channel sends are also blocking on the type of channel we are using right now. What if, for some reason, you want to do a non-blocking send on a channel? You can probably think of one way of doing that:
go! do chan << 1 end
We’ll also look at buffered channels later on, which let you do roughly the same thing.
Now let’s put some of our Agent knowledge to work.
Echo server: Thread Pools vs. Goroutines
We often need to use concurrency when writing servers because they have to serve lots of clients at once. Using thread pools as a method of concurrency is a traditional solution to this problem. Essentially, a few threads are created when the server is initialized and each thread serves one client at a time. Then, once a thread is done dealing with a client, it is ready to serve another one.
But there all sorts of issues with this approach. First of all, it is difficult to implement this in a performant way because the algorithm that decides when to allocate more threads (in case the existing thread pool is insufficient) can cause massive overhead with either creating too many threads or unnecessarily destroying them.
You might be asking: “Why don’t we just create a thread for every single client?” Creating a thread has significant overhead attached to it. So, for servers that must deal with thousands of clients (fairly common these days), this approach is not practical. On the other hand, goroutines are less resource intensive than threads. The difference is significant enough that creating a goroutine for every client is quite reasonable. This can make implementation of fast, concurrent servers much quicker.
Let’s see how we can do it in Agent:
require 'agent' require 'socket' class EchoServer attr_accessor :host, :port def initialize(port) @port = port @server = TCPServer.new @port end def handle_client(client) loop do line = client.gets client.puts line end end def start loop do client = @server.accept puts 'accepted' go! do handle_client(client) #could do other stuff here end end end end server = EchoServer.new(1337) server.start()
Most of the code is just setting up structure. The central ideas are held in
EchoServer.start. In the latter, we have the main application loop which waits until a client can be accepted. Then, it calls
handle_client as a goroutine which sets up “echoing” (i.e. repeats whatever the client says) with the client. Notice that each client requires a new goroutine. If something similar is used in production, one would obviously need to check for errors in
handle_client and exit from the goroutine as needed.
Agent and Golang Goroutines Differences
As discussed, goroutines consume less resources than threads. In fact, a fairly run-of-the-mill server can execute up to 100,000 concurrent goroutines. However, Agent runs Ruby green threads underneath. The implementations of the two differ (stack allocation, context switching, etc.) which means that the performance benefits of Go may not translate perfectly into Agent. So, you have to be aware of (i.e. benchmark) performance of Agent-based applications (just as you would when trying any new technology). Regardless, Agent provides a fantastic way to reason about concurrency within Ruby.
Once you start writing larger pieces of code with Agent, you will realize that one channel is often not enough. Most of the time, you will be dealing with multiple channels. There is usually a portion of code that has to decide what to do based on which channels are ready to be read from or written to. That’s where the “select” call comes in.
select! in Agent, similar in spirit to the the POSIX call,
allows us to to act according to which channel is ready for reading/writing.
Let’s look at an example:
require 'agent' require 'socket' #this example involves selecting between #two channels. $string_chan = channel!(String, 1) $int_chan = channel!(Integer, 1) def process_chan loop do select! do |s| s.case($string_chan, :receive) do |value| puts "Received string: ", value end s.case($int_chan, :receive) do |value| puts "Received integer: ", value end end end end go! do process_chan end $string_chan << "hello" $int_chan << 18 $string_chan << "world" #in a typical design, this would not actually #be here; more or less specific to this example loop do end
Though the code may look a little bit complicated at first, it is actually fairly straightforward. The crux of it lies in the
process_chan method; in it resides the
select! statement. This “selects” between two channels,
s.case($string_chan, :receive), check if
$string_chan can be read (“received”) from. Using the block notation, store the value read from
value and subsequently print it. We do the same for
$int_chan. In some sense, this can be considered an event driven style of programming: we are waiting for a given event (one of the channels becoming readable).
So far, we have mostly used unbuffered channels, although I did sneak buffered channels into the last example. For these, both sending and receiving are blocking, implying that doing this:
unbuffered_chan << 1
waits around until
unbuffered_chan is actually read from.
On the other hand, buffered channels allow you to do this without waiting until the channel is “full”. A buffered channel is declared with a size and sends are non-blocking until the number of messages “waiting” (i.e. not read) on the channel reaches the size. If you are familiar with the queue data structure, the idea of a buffered channel is nearly the same: first in, first out. That means if you put in 50 messages into the channel, the one you put in first will be read first. And if the size of the channel is 50, only on attempting to send the 51st message into the channel (while 50 have not beed read yet) will the send call become blocking.
Let’s see a straightforward example:
require 'agent' buffered_chan = channel!(Integer, 2) #this goroutine makes Ruby think #there isn't a deadlock go! do loop do end end #should not block buffered_chan << 1 puts 'added one message to buffered_chan' #should not block buffered_chan << 1 puts 'added one message to buffered_chan' #will block buffered_chan << 1 puts 'this should never print'
The basic idea of the code is simple. We decleare a buffered channel with a buffer size of 2. Then, try to stuff a third message into the buffered channel without reading from it first, which will block forever. We do have to perform a bit of trickery to make sure Ruby doesn’t think there’s a deadlock (i.e. threads just waiting around for no reason at all).
So, what’s the point? Instead of one goroutine waiting around while another (probably performing a time-intensive operation) reads and operates on data from a channel, the process can move along more quickly if the messages in the channel are buffered, i.e. held until the time-intensive goroutine has had gotten around to them. This type of construct is used heavily in multicore processing (which is not possible on Agent/Ruby’s green threads, as mentioned) but it can also be applied to other areas which require concurrency.
Wrapping It Up
I hope you’ve enjoyed this tour of Agent, a library where Ruby and the best of Go collide. Over the past year, I’ve done a lot of Go development and I often find myself looking for Go’s concurrency features in Ruby: Agent is a very positive step to quench this thirst.