
Learn Concurrency by Implementing Futures in Ruby

By Benjamin Tan Wei Hao

Futures are a concurrency abstraction that represents the result of an asynchronous computation. This means that when you give a future some computation to process, this is done in a separate thread. Therefore, the main thread is free to do other processing. The moment you need a result from the future, you can ask for it. If it is still processing the computation, then the main thread gets blocked until the result is ready. Otherwise, the result is returned immediately.

In this article, we will implement our very own Futures library in Ruby. Along the way, you will learn more about some of the concurrency libraries that Ruby provides and some fun Ruby tricks! Let’s dive right in.

How are Futures Useful?

Before we begin, it’ll help a little to see how Futures can be useful to us. Futures are a perfect candidate for making concurrent HTTP requests. Let’s start with a simple Ruby application that fetches random Chuck Norris jokes from The Internet Chuck Norris Database:

require 'open-uri'
require 'json'
require 'benchmark'

class Chucky
  URL = 'http://api.icndb.com/jokes/random'

  def sequential
    # URI.open comes from open-uri; plain Kernel#open no longer accepts URLs on Ruby 3.0+
    URI.open(URL) do |f|
      f.each_line { |line| puts JSON.parse(line)['value']['joke'] }
    end
  end

end

In order to run this application, save the above as chucky.rb and run the program like so:

% irb
> require "./chucky"
 => true
> chucky = Chucky.new
 => #<Chucky:0x007fe02c046d98>
> chucky.sequential
Contrary to popular belief, the Titanic didn't hit an iceberg. The ship was off course and ran into Chuck Norris while he was doing the backstroke across the Atlantic.

Each time you execute chucky.sequential, the program will fetch a random Chuck Norris joke. (Warning: This is highly addictive!) What if we wanted to fetch, say, ten jokes? A naive solution looks something like:

10.times { chucky.sequential }

Unfortunately, this is an extreme waste of your time. While each request is in flight, the main thread is blocked, sitting mostly idle while it waits on the network, and the next request cannot start until the current one completes. We are going to fix that by implementing our own Futures abstraction (as all good developers do).
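To make the cost of waiting concrete, here is a minimal sketch that swaps the HTTP call for a sleep. The slow_request helper is hypothetical and only stands in for a network round trip; the point is that three waits done one after another add up, while three waits done in threads overlap.

```ruby
require 'benchmark'

# Hypothetical stand-in for a slow HTTP request (not part of the article's code).
def slow_request(id)
  sleep(0.2) # pretend we are waiting on the network
  "response #{id}"
end

# One request after another: the waits add up.
sequential_time = Benchmark.realtime do
  3.times.map { |i| slow_request(i) }
end

# One thread per request: the waits overlap.
threaded_results = nil
threaded_time = Benchmark.realtime do
  threads = 3.times.map { |i| Thread.new { slow_request(i) } }
  threaded_results = threads.map(&:value) # Thread#value waits for each thread
end

puts format('sequential: %.2fs', sequential_time)
puts format('threaded:   %.2fs', threaded_time)
```

On most machines the threaded run should finish in roughly the time of a single request, which is the effect we want our Futures to deliver.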

Implementing Your Own Futures: Test First!

We are going to implement our Futures gem using Test-Driven Development (TDD). Let’s do this! We begin by creating a new Ruby gem using bundle gem <gem name>:

% bundle gem futurama
Creating gem 'futurama'...
MIT License enabled in config
      create  futurama/Gemfile
      create  futurama/.gitignore
      create  futurama/lib/futurama.rb
      create  futurama/lib/futurama/version.rb
      create  futurama/futurama.gemspec
      create  futurama/Rakefile
      create  futurama/README.md
      create  futurama/bin/console
      create  futurama/bin/setup
      create  futurama/LICENSE.txt
      create  futurama/.travis.yml
      create  futurama/.rspec
      create  futurama/spec/spec_helper.rb
      create  futurama/spec/futurama_spec.rb
Initializing git repo in futurama

Next, run bin/setup to bring in the dependencies:

% bin/setup
Resolving dependencies...
Using rake 10.4.2
Using bundler 1.10.6
Using diff-lcs 1.2.5
Using futurama 0.1.0 from source at .
Using rspec-support 3.3.0
Using rspec-core 3.3.2
Using rspec-expectations 3.3.1
Using rspec-mocks 3.3.2
Using rspec 3.3.0
Bundle complete! 4 Gemfile dependencies, 9 gems now installed.
Use `bundle show [gemname]` to see where a bundled gem is installed.

Note: If you don’t see the RSpec gems install, add spec.add_development_dependency "rspec" to the futurama.gemspec file.

First Steps

Our first test is deliberately simple:

require 'spec_helper'
require 'timeout'

module Futurama
  describe 'Future' do

    it 'returns a value' do
      future = Future.new { 1 + 2 }

      expect(future).to eq(3)
    end

  end

end

This test describes the interface for creating a Future. The Future object takes in a block of computation. Also, when the object is accessed, the return value is the result of the computation within that block. (This second condition is not as simple as it sounds.)

Next, create a file called future.rb in lib/futurama. Then make sure you require it in lib/futurama.rb like so:

require "futurama/version"
require "futurama/future"

module Futurama
end

In order to pass the test, the Future object must:

  • accept a block
  • return the value of the block when it is invoked

Satisfying the first condition is easy enough:

module Futurama
  class Future

    def initialize(&block)
      @block = block
    end

  end
end

The other condition is slightly trickier:

require 'delegate'

module Futurama
  class Future < Delegator

    # initialize was here

    def __getobj__
      @block.call
    end

  end
end

When we subclass the built-in Delegator class, we must implement the __getobj__ method, otherwise Delegator raises a NotImplementedError as soon as it tries to delegate. But then again, this is the whole point of using the Delegator class! Delegator forwards the method calls made on our object to whatever __getobj__ returns. In other words, we can override what it means when an object is accessed, which is exactly what we need! We invoke the block from the __getobj__ method and when we run the tests:

% rspec
Future
  returns a value

Finished in 0.00077 seconds (files took 0.06902 seconds to load)
1 example, 0 failures

Great success!
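To see the delegation on its own, here is a small sketch outside the gem. The Answer class is a hypothetical name used only for illustration; it mirrors the shape of our Future at this stage, with no threads involved yet.

```ruby
require 'delegate'

# Hypothetical example class: every call it does not define itself is
# forwarded to whatever __getobj__ returns.
class Answer < Delegator
  def initialize(&block)
    @block = block # we never store a target object, so super is not called
  end

  def __getobj__
    @block.call
  end
end

answer = Answer.new { 40 + 2 }

puts answer + 1     # forwarded to the Integer returned by the block
puts answer == 42   # Delegator#== compares against the delegated object
puts answer.even?   # forwarded as well
```

Note that, in this form, the block runs again every time a call is forwarded. That is fine for a first test, and it is one of the things the later sections change.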

Executing Computations in the Background

Back to our tests. A Future takes a computation and runs it in a thread. In order to test this out, we can create a Future and let it sleep for one second before returning a value. On the main thread, we also simulate some computation that takes a second:

module Futurama
  describe 'Future' do

    it 'executes the computation in the background' do
      future = Future.new { sleep(1); 42 }

      sleep(1) # do some computation

      Timeout::timeout(0.9) do
        expect(future).to eq(42)
      end
    end

  end
end

What we assert is interesting. Since the Future runs in the background while the main thread sleeps for a second, the Future should be finished (or very nearly so) by the time we ask for its value, so getting the result should take well under a second. We make use of the built-in Timeout library (we did a require 'timeout' at the top of spec/futurama/future_spec.rb) to make sure that the future returns within the time boundary we set.

module Futurama
  class Future < Delegator

    def initialize(&block)
      @block  = block
      @thread = Thread.new { run_future }
    end

    def run_future
      @block.call
    end

    def __getobj__
      @thread.value
    end

  end
end

We wrap the calling of the block in the run_future method and run_future in a thread. This thread runs the moment the Future is created. Once the thread completes, the return value is accessed using Thread#value, as seen in the modified implementation of __getobj__.

Run the tests and everything should be green.
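The two Thread behaviours we rely on here can be sketched in isolation: the block starts running as soon as Thread.new is called, and Thread#value waits for the thread to finish and then returns the block's last expression. The timings below are arbitrary values picked for the demonstration.

```ruby
started = Time.now

worker = Thread.new do
  sleep(0.3) # simulated slow computation, already running in the background
  42         # the block's last expression becomes the thread's value
end

sleep(0.3)   # meanwhile, the main thread does its own work

result  = worker.value        # joins the thread, then returns 42
elapsed = Time.now - started  # close to 0.3s rather than 0.6s

puts result
puts format('elapsed: %.2fs', elapsed)
```

Because both sleeps overlap, the total elapsed time should be close to the longer of the two rather than their sum, which is the same idea the Timeout assertion in our spec checks.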

Handling Exceptions

Next, we turn our attention to handling exceptions. Since Futures are asynchronous, we do not want any nasty surprises should a Future suddenly fail. Instead, it would be nice for the future to store the exception and only raise it when we poke it. Here is the test that expresses the intent:

module Futurama
  describe 'Future' do

    it 'captures exceptions and re-raises them' do
      error_msg = 'Good news, everyone!'

      future = Future.new { raise error_msg }

      expect { future.inspect }.to raise_error RuntimeError, error_msg
    end

  end
end
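Before running it, it helps to see how a plain thread behaves when its block raises. The sketch below is independent of our Future class and assumes Thread.abort_on_exception is left at its default of false: the thread dies quietly, and the exception is re-raised in the caller only when Thread#value is called.

```ruby
worker = Thread.new do
  Thread.current.report_on_exception = false # keep stderr quiet for this demo
  raise 'Good news, everyone!'
end

sleep(0.1) # the thread has died by now, yet nothing is raised in the main thread

message =
  begin
    worker.value # joining the dead thread re-raises its exception here
    nil
  rescue RuntimeError => e
    e.message
  end

puts message
```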

Happily, you don’t have to do anything to get the test to pass! There’s a caveat though. If Thread.abort_on_exception is set to true, unhandled exceptions in any thread will cause the interpreter to exit. Not fun. Let’s expose this problem in the test:

module Futurama
  describe 'Future' do

    it 'captures exceptions and re-raises them' do
      Thread.abort_on_exception = true

      error_msg = 'Good news, everyone!'

      future = Future.new { raise error_msg }

      expect { future.inspect }.to raise_error RuntimeError, error_msg
    end

  end
end

With this new line, the test no longer passes. What to do? Turns out, you do need to do more work. (Sorry!) Here’s where we come to the meat of our implementation.

Queues!

From the previous section, we now have to find a way to store exceptions when they happen, instead of relying on the interpreter. Also, we need to re-raise the exception when the future object is invoked.

Before we get into the implementation, let’s think about the Future again. In one sense, it is a data structure that stores either the resolved value or an exception. Another very important consideration is thread-safety. How can we represent this?

While Ruby doesn’t have many thread-safe collection classes, Queue is an exception. From the class’s description:

This class provides a way to synchronize communication between threads.

For our implementation, recall that we are either storing the resolved value or the exception. Therefore, we need a Queue that has a maximum size of one. SizedQueue to the rescue!

module Futurama
  class Future < Delegator

    def initialize(&block)
      @block  = block
      @queue  = SizedQueue.new(1)
      @thread = Thread.new { run_future }
    end

    def run_future
      # explicit braces: the hash is the queued element, not keyword arguments
      @queue.push({ value: @block.call })
    rescue Exception => ex
      @queue.push({ exception: ex })
    end

    def __getobj__
      # this will change in the next section
    end

  end
end

A new instance variable, @queue, is added, which is a SizedQueue with a capacity of one. We then modify run_future to push either the result of the block or, if one occurs, the exception. Because the SizedQueue has a capacity of one, it can never hold more than one element at a time: a second push would block until the first element has been popped.
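Here is a small stand-alone sketch of the two SizedQueue behaviours we lean on: pop blocks until an element arrives, and a full queue refuses a second element. The variable names are made up for the example, and the non-blocking push is used only so the demo can observe the "full" condition without hanging.

```ruby
slot = SizedQueue.new(1)            # room for exactly one element

# pop blocks while the queue is empty, so this thread waits for a producer.
consumer = Thread.new { slot.pop }

sleep(0.1)                          # simulate the computation still running
slot.push({ value: 42 })            # braces keep the hash a plain positional argument
result = consumer.value             # the hash pushed above

# With the slot occupied, a non-blocking push raises ThreadError instead of waiting.
slot.push({ value: 1 })
second_push_rejected =
  begin
    slot.push({ value: 2 }, true)   # second argument: non_block
    false
  rescue ThreadError
    true
  end

puts result[:value]
puts second_push_rejected
```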

Getting the Result or Exception from the Queue

Next, we need to tackle the issue of getting the result or exception from the SizedQueue. Another thing to keep in mind is that once a Future resolves a value, it is done. The next time you get a value out of the Future, the result is going to be immediate. In other words, the Future remembers the value/exception once it is resolved.

module Futurama
  class Future < Delegator

    def initialize(&block)
      @block  = block
      @queue  = SizedQueue.new(1)
      @thread = Thread.new { run_future }
      @mutex  = Mutex.new
    end

    def __getobj__
      resolved_future_or_raise[:value]
    end

    def resolved_future_or_raise
      @resolved_future || @mutex.synchronize do
        @resolved_future ||= @queue.pop
      end

      Kernel.raise @resolved_future[:exception] if @resolved_future[:exception]
      @resolved_future
    end

  end
end

Let’s concentrate on the resolved_future_or_raise method:

@resolved_future || @mutex.synchronize do
  @resolved_future ||= @queue.pop
end

Here, we first check if the Future has been resolved. That is just a fancy way of saying that the Future has completed computing a value or exception. Otherwise, we retrieve this value/exception from @queue. We need to make sure that popping the queue and assigning the result to @resolved_future are performed atomically. In other words, we must guarantee that two threads never interleave between these two steps. Therefore, we wrap the operations in @mutex.synchronize. The rest of the method deals with exceptions:

Kernel.raise @resolved_future[:exception] if @resolved_future[:exception]
@resolved_future

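Now, check if @resolved_future has an exception. If so, raise it. Note that we call Kernel.raise explicitly rather than a bare raise. Future inherits from Delegator, which is built on BasicObject and does not make Kernel's private methods such as raise directly available to its subclasses, so a bare raise here would not behave like an ordinary call to Kernel#raise.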

Finally, if there are no exceptions, the resolved value will be returned. Run the tests again, and everything should be a sweet-colored green!
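The pop-once-then-remember step can be tried out on its own. In this sketch, ResolvedOnce is a hypothetical class that contains only that step; several reader threads ask for the result before it exists, and the queue is still popped exactly once.

```ruby
# Hypothetical stand-alone class illustrating the pop-once-then-remember step.
class ResolvedOnce
  def initialize(queue)
    @queue    = queue
    @mutex    = Mutex.new
    @resolved = nil
  end

  def result
    # Fast path: already resolved, no locking needed.
    # Slow path: only one thread at a time may pop and assign. The inner ||=
    # stops later threads from popping a queue that is now empty.
    @resolved || @mutex.synchronize do
      @resolved ||= @queue.pop
    end
  end
end

queue = SizedQueue.new(1)
cell  = ResolvedOnce.new(queue)

readers = 5.times.map { Thread.new { cell.result } }
queue.push({ value: 42 })  # resolve while the readers are (most likely) waiting

results = readers.map(&:value)

puts results.all? { |r| r == { value: 42 } }
puts queue.empty?
```

Without the inner ||=, the second reader to acquire the mutex would call pop on an empty queue and wait forever.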

Saving on Keystrokes (or: Polluting the Kernel namespace)

Having to type Futurama::Future.new { } is no fun. What if we could just write future { } instead? In Ruby, this is trivial. Let’s write a test for this first:

module Futurama
  describe 'Future' do
    it 'pollutes the Kernel namespace' do

      msg    = 'Do the Bender!'
      future = future { msg }

      expect(future).to eq(msg)
    end

  end
end

Run the tests, and they will fail with:

NoMethodError: undefined method `future' for #<RSpec::ExampleGroups::Future:0x007fd274988390>
./spec/futurama/future_spec.rb:67:in `block (2 levels) in <module:Futurama>'
-e:1:in `load'
-e:1:in `<main>'

We can easily fix this by creating a file called kernel.rb in lib/futurama:

require 'futurama'

module Kernel
  def future(&block)
    Futurama::Future.new(&block)
  end
end

Add a require statement for this file to lib/futurama.rb and run the tests again. We are back to green!

Getting the Value or Exception from the Future

Currently, the value or exception can be accessed from __getobj__. Obviously, we do not expect the client code to know about __getobj__. We can instead alias this to something like value:

module Futurama
  describe 'Future' do

    it 'allows access to its value' do
      val    = 10
      future = Future.new { val }

      expect(future.value).to eq(val)
    end

  end
end

Unsurprisingly, the test will fail with:

NoMethodError: undefined method `value' for 10:Futurama::Future
./spec/futurama/future_spec.rb:76:in `block (2 levels) in <module:Futurama>'
-e:1:in `load'
-e:1:in `<main>'

The code to get the test to pass is a one-liner:


require 'thread'

module Futurama
  class Future < Delegator

    def __getobj__
      resolved_future_or_raise[:value]
    end

    # place this *below* __getobj__
    alias_method :value, :__getobj__

  end
end

Everything should be green now!
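For reference, here is how the snippets from the previous sections might fit together in a single lib/futurama/future.rb. Treat it as an assembled sketch rather than the gem's published source: creating the mutex before the thread and wrapping the queued hashes in braces are choices made for this listing, and the usage lines at the bottom are only there to exercise it.

```ruby
require 'delegate'

module Futurama
  class Future < Delegator
    def initialize(&block)
      @block  = block
      @queue  = SizedQueue.new(1)
      @mutex  = Mutex.new
      @thread = Thread.new { run_future } # starts computing right away
    end

    # Push exactly one element: the block's value or the exception it raised.
    def run_future
      @queue.push({ value: @block.call })
    rescue Exception => ex
      @queue.push({ exception: ex })
    end

    def __getobj__
      resolved_future_or_raise[:value]
    end
    alias_method :value, :__getobj__

    # Pop the queue once, remember the outcome, and re-raise a stored exception.
    def resolved_future_or_raise
      @resolved_future || @mutex.synchronize do
        @resolved_future ||= @queue.pop
      end

      Kernel.raise @resolved_future[:exception] if @resolved_future[:exception]
      @resolved_future
    end
  end
end

# Usage sketch
answer = Futurama::Future.new { sleep(0.1); 1 + 2 }
puts answer + 1   # blocks until the value is ready, then delegates to it
puts answer.value

failing = Futurama::Future.new { raise 'Good news, everyone!' }
error_message =
  begin
    failing.value
    nil
  rescue RuntimeError => e
    e.message
  end
puts error_message
```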

Let’s Have Some Concurrent Chuck Norris Jokes!

Let’s take a look at chucky.rb again. Note that I have placed this file in sample/chucky.rb of futurama.

require '../lib/futurama'
require 'open-uri'
require 'json'
require 'benchmark'

class Chucky
  URL = 'http://api.icndb.com/jokes/random'

  def sequential
    open(URL) do |f|
      f.each_line { |line| puts JSON.parse(line)['value']['joke'] }
    end
  end

  def concurrent
    future { sequential }
  end

end

It only needs one tiny change to perform concurrent Chuck Norris joke lookup:

def concurrent
  future { sequential }
end

That’s it! It might come across as a bit anticlimactic, but we’re pretty much done. In order to put this to the test, we can benchmark the concurrent version versus the sequential one:


chucky = Chucky.new

Benchmark.bm do |x|
  # Wait on every future's value; otherwise only thread creation would be timed
  x.report('concurrent') { 10.times.map { chucky.concurrent }.each(&:value) }
  x.report('sequential') { 10.times { chucky.sequential } }
end

Or, if you are too lazy, you can see the results on my machine:

Limitations, Acknowledgments, and Where to Learn More

Creating an unbounded number of Futures will lead to the creation of an unbounded number of threads. This is obviously a bad thing. Languages like Java have Thread Pool Executors which manage a pool of threads. It is not extremely difficult to implement one in Ruby.
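To give a feel for what such a pool involves, here is a minimal sketch. TinyPool and its methods are hypothetical names, not taken from Futuroscope or any other library: a fixed number of worker threads pull jobs from a shared Queue, and each submitted job hands back a one-slot SizedQueue for its result, much like our Future does internally.

```ruby
# Hypothetical minimal pool; the names are illustrative, not from any library.
class TinyPool
  def initialize(size)
    @jobs    = Queue.new
    @workers = size.times.map do
      Thread.new do
        # Each worker keeps taking jobs until it pops the :stop marker.
        while (job = @jobs.pop) != :stop
          job.call
        end
      end
    end
  end

  # Schedule a block; returns a one-slot queue the caller can pop for the outcome.
  def submit(&block)
    result = SizedQueue.new(1)
    @jobs.push(lambda {
      begin
        result.push({ value: block.call })
      rescue Exception => ex
        result.push({ exception: ex })
      end
    })
    result
  end

  # Send one :stop marker per worker, then wait for all of them to exit.
  def shutdown
    @workers.size.times { @jobs.push(:stop) }
    @workers.each(&:join)
  end
end

pool    = TinyPool.new(2) # never more than two threads, however many jobs arrive
pending = 6.times.map { |i| pool.submit { sleep(0.05); i * i } }
squares = pending.map { |slot| slot.pop[:value] }
pool.shutdown

puts squares.inspect # prints [0, 1, 4, 9, 16, 25]
```

A Future built on top of this would call something like pool.submit in its constructor instead of Thread.new, so the number of threads stays fixed.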

This article was inspired by studying the source code of the Futuroscope gem. In fact, its implementation has a thread pool that is passed into the constructor of the Future.

If you are interested in learning more about concurrency abstractions built in Ruby, look no further than the Concurrent Ruby gem. It consists of a plethora of concurrency tools like Agents, Thread Pools, Supervisors and, yes, Futures too! The documentation provided is highly readable as well.

Thanks for Reading!

I hope you had fun learning about Futures, and hopefully more about Ruby. While Ruby, by default, doesn’t come with a mature library for concurrency, gems such as Concurrent Ruby provide an excellent avenue to learn about these concurrency tools in your favorite language.
