Forking and IPC in Ruby, Part II

Glenn Goodrich
Ruby Editor


In the first article, we examined why the fork() system call is useful and where it fits into the grand scheme of things. We saw that by passing a block to Kernel#fork or Process.fork it is possible to execute arbitrary code concurrently (or in parallel if there are multiple processors). In addition, we saw that although forking is relatively expensive, it can compete with threading if the Ruby implementation doesn't squander the copy-on-write optimization.

Unfortunately, if a method is called in a fork, any data that it produces will not be available to the parent process, due to process isolation. What we need is a conduit through which we can pass data between processes. Here, in Part II, we’re going to cover some interprocess communication mechanisms as well as more ways that you can utilize fork.
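You can see this isolation directly. In the minimal sketch below, the child appends to an array, but the change only happens in the child's copy of memory, so the parent's array is still empty afterwards.

```ruby
# A forked child gets its own copy of the parent's memory, so
# changes it makes are invisible to the parent.
results = []

pid = fork do
  results << "computed in child"   # modifies the child's copy only
end
Process.wait(pid)                   # wait for the child to finish

p results   # => [] in the parent
```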

The code in this article is available on GitHub.

Pipes

Pipes enable data to flow in one direction between a pair of file descriptors. Since forks inherit open file descriptors, pipes can be used to communicate data between parent and child processes. Creating a pipe in Ruby is easy with IO.pipe.

>> reader, writer = IO.pipe
=> [#<IO:fd 5>, #<IO:fd 6>]

Here, writer will only write, and reader will only read. Sounds simple, but unfortunately there are some nuances. If you want to use a pipe more than once, pairing IO#puts with IO#gets is the simplest way:

>> writer.puts("hello world")
>> reader.gets
=> "hello world\n"

>> writer.puts("hello world, again")
>> reader.gets
=> "hello world, again\n"

Pipes communicate data using byte streams, so the reader needs a delimiter to know where one message ends. IO#gets reads until it receives a “\n”. Unlike IO#puts, IO#write does not automatically append “\n” to the data, so calling IO#gets after an IO#write whose data lacks a trailing “\n” will block indefinitely.

>> writer.write("this string is terminated\n")
>> reader.gets
=> "this string is terminated\n"

>> writer.write("this string is not terminated")
>> reader.gets
*waits indefinitely for \n*

Unlike IO#gets, IO#read (called without a length) doesn't stop at a newline; it keeps reading until EOF. A pipe's reader only sees EOF once the write end has been closed, so IO#read is best suited to one-time writes.

>> reader, writer = IO.pipe
>> writer.write("hello world")
>> writer.close
>> reader.read
=> "hello world"
>> reader.read
=> ""
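The “\n” convention is only one way to mark where a message ends. Here's a minimal sketch of another common approach, length-prefix framing, in which each message is preceded by its size so the payload may contain any bytes, newlines included. send_frame and read_frame are hypothetical helpers of my own, not part of Ruby's IO API, and the 4-byte big-endian header is an arbitrary choice.

```ruby
# Hypothetical helpers: each frame is a 4-byte big-endian length
# followed by exactly that many bytes, so payloads may contain "\n".
def send_frame(io, payload)
  io.write([payload.bytesize].pack("N"))
  io.write(payload)
end

def read_frame(io)
  header = io.read(4)        # nil once the writer is closed and drained
  return nil if header.nil?
  io.read(header.unpack1("N"))
end

reader, writer = IO.pipe
send_frame(writer, "line one\nline two")
send_frame(writer, "second message")
writer.close

first  = read_frame(reader)
second = read_frame(reader)
p first                 # => "line one\nline two"
p second                # => "second message"
p read_frame(reader)    # => nil
```

Because the reader always knows how many bytes to expect, it never has to guess at a delimiter, which matters once you start sending binary data.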

Ok, so how would we connect a pipe between a parent process and one of its children? Because a child inherits its parent's variables and open file descriptors, we only need to create the pipe once, before forking. However, after the fork both processes hold both ends of the pipe (reader and writer), so each process should close the end it won't use. This matters because the reader only sees EOF once every copy of the write end has been closed. Here is how you would send data from a child to its parent.

child_parent_pipe.rb
# creates a fork and pipes a string from child to parent

reader, writer = IO.pipe

fork do
  reader.close
  writer.puts "sent from child process"
  writer.close
end

writer.close
from_child = reader.gets
puts from_child
Process.wait # reap the child process

We can also make it work the other way. Just make sure the reading process closes the writer and the writing process closes the reader.

parent_child_pipe.rb
# creates a fork and pipes a string from parent to child

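reader, writer = IO.pipe

fork do
  writer.close
  from_parent = reader.gets
  puts from_parent
end

reader.close
# puts appends the "\n" that the child's gets is waiting for
writer.puts "sent from parent process"
writer.close
Process.wait # keep the parent alive until the child has printed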
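To see why each process should close the end it doesn't use, here's a small sketch that probes the reader without blocking. probe is a hypothetical helper for this demonstration. After the child has written and exited, the parent can read the data, but as long as the parent still holds its own copy of the write end there is no EOF, so a blocking reader.read at that point would hang forever.

```ruby
# Hypothetical helper: try a non-blocking read and report the outcome
def probe(io)
  io.read_nonblock(100)
rescue IO::WaitReadable
  :would_block          # no data, but a write end is still open somewhere
rescue EOFError
  :eof                  # every write end has been closed
end

reader, writer = IO.pipe

pid = fork do
  reader.close
  writer.write "done"
  writer.close          # the child's copy of the write end is now closed
end
Process.wait(pid)

first  = probe(reader)  # => "done"
second = probe(reader)  # => :would_block, the parent's writer is still open
writer.close            # close the last remaining write end
third  = probe(reader)  # => :eof
p [first, second, third]
```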

UNIX Sockets

You can think of UNIX domain sockets as pipes with two big advantages:

  • UNIX sockets are bidirectional whereas pipes are unidirectional
  • Pipes can only use byte streams, but UNIX sockets can also use datagrams

Unlike network sockets, they can only connect endpoints on the same machine; named UNIX sockets use filesystem paths as their addresses (the pair created by UNIXSocket.pair below is unnamed). Because their traffic never passes through the network stack, they are lightweight, making them a good fit for interprocess communication.

unix_sockets.rb
# creates a pair of UNIX sockets that send and receive a string

require 'socket'
parent_socket, child_socket = UNIXSocket.pair

fork do
  parent_socket.close
  child_socket.send("sent from child (#{$$})", 0)
  from_parent = child_socket.recv(100)
  puts from_parent
end

child_socket.close
parent_socket.send("sent from parent (#{$$})", 0)
from_child = parent_socket.recv(100)
puts from_child
Process.wait # reap the child process
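The datagram advantage mentioned in the list above is easy to try: UNIXSocket.pair accepts a socket type, and :DGRAM gives you a pair that preserves message boundaries. With the default stream pair, two quick sends can arrive together in a single recv; with datagrams, each recv returns exactly one send. This is a minimal sketch; the 100-byte receive size is an arbitrary choice.

```ruby
require 'socket'

# :DGRAM asks for a datagram socket pair instead of the default stream pair
parent_socket, child_socket = UNIXSocket.pair(:DGRAM)

pid = fork do
  parent_socket.close
  child_socket.send("first", 0)    # each send is one datagram
  child_socket.send("second", 0)
  child_socket.close
end

child_socket.close

# Each recv returns exactly one datagram, so the messages never run together
messages = 2.times.map { parent_socket.recv(100) }
Process.wait(pid)
p messages   # => ["first", "second"]
```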

Distributed Ruby

Both pipes and UNIX sockets have a couple of downsides:

  1. They are low-level, byte-transmission mechanisms. For complex behavior you will need to implement existing protocols or define your own.
  2. They cannot communicate across the machine barrier, which limits them once your workload outgrows a single machine.

Distributed Ruby (DRb) lets you create and consume what might be called “distributed object services.” These services let you execute code remotely by sending messages to distributed objects.

We execute code on remote machines all the time, but we do it through quite a bit of indirection. For example, let's say you go to a web address like http://searchforallthestuffs.com/search?q=ponies. Your request hits a router, which sees the route you specified and passes the arguments to the corresponding controller, which then executes the code associated with that route (generating an HTML view, JSON, XML, etc.).

If your goal is to execute code remotely, this…kinda sucks. Every time you add a new feature, you need a route in place and perhaps a new controller. That’s a lot of overhead just to add a method to a class.

Distributed objects let you execute code remotely, but with an object receiving the message rather than an address.
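Here's a minimal sketch of that idea across two real processes, using only pieces of the drb library I'm confident about (DRb.start_service, DRb.uri, DRb.thread, DRbObject.new_with_uri). Counter is a hypothetical class for illustration. The child serves it on a port chosen by the OS (port 0) and reports the actual URI back over a pipe; the parent then just sends messages to an object that lives in the other process.

```ruby
require 'drb/drb'

# Hypothetical front object; its state lives in whichever process serves it
class Counter
  def initialize
    @count = 0
  end

  def increment
    @count += 1
  end
end

reader, writer = IO.pipe

pid = fork do
  reader.close
  # Port 0 lets the OS pick a free port; DRb.uri reports the real one
  DRb.start_service("druby://localhost:0", Counter.new)
  writer.puts DRb.uri     # tells the parent where (and that) we're listening
  writer.close
  DRb.thread.join         # serve requests until this process is killed
end

writer.close
uri = reader.gets.chomp   # blocks until the child is listening

counter = DRbObject.new_with_uri(uri)
3.times { counter.increment }
result = counter.increment
p result                  # => 4, counted inside the child process

Process.kill(:TERM, pid)
Process.wait(pid)
```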

distributed.rb
# Starts several DRb worker services (all in this one process, each on its
# own port), calls them concurrently and prints each result as it arrives

require 'drb'

NUM_WORKERS = 4

class Worker

  def calculate
    time_to_work = rand(1..7)
    sleep time_to_work
    return time_to_work
  end

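  # Stop the DRb server that received this message. DRb.stop_service would
  # stop only the primary (last started) server, so later stop calls would
  # fail to connect.
  def stop
    DRb.current_server.stop_service
  end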
end

# Start object services
NUM_WORKERS.times do |i|
  DRb.start_service("druby://:700#{i}", Worker.new)
  puts "Worker running at #{DRb.uri}"
end

# Create a local end-point for each service
workers = NUM_WORKERS.times.map { |i| DRbObject.new nil, "druby://:700#{i}" }

# Call every worker concurrently; the fastest answers print first
thread_pool = []
NUM_WORKERS.times do |i|
  thread_pool << Thread.new do
    answer = workers[i].calculate
    puts "Worker #{i} finished in #{answer} seconds"
  end
end

# Wait for every thread to get its answer
thread_pool.each(&:join)

# Shut down each worker
workers.each { |w| w.stop }

Moving Objects between Processes

Since low-level communication constructs like pipes and sockets transfer bytes, not objects, you will need to encode your objects into a byte format — i.e. serialize them — in order to move them across the process barrier.

Fortunately, Ruby ships with Marshal, which can serialize most Ruby objects.

From Ruby-Doc.org: “Some objects cannot be dumped: if the objects to be dumped include bindings, procedure or method objects, instances of class IO, or singleton objects, a TypeError will be raised.”
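Here's a quick sketch of what Marshal hands you. The round trip gives back an equal object, but the dump itself is binary: the small integer 5, for example, is encoded with a byte that happens to be “\n”. That is why a newline-delimited read like IO#gets is not a safe way to receive Marshal data. The last part shows the TypeError the quote above describes.

```ruby
Tire = Struct.new(:radius, :pressure)

# Round trip: dump to a byte string, load back an equal object
bytes = Marshal.dump(Tire.new(5, 28))
copy  = Marshal.load(bytes)
p copy == Tire.new(5, 28)      # => true

# The dump is binary; the integer 5 is encoded with a "\n" byte
p Marshal.dump(5).bytes        # => [4, 8, 105, 10]
p bytes.include?("\n")         # => true

# Objects like procs cannot be dumped at all
begin
  Marshal.dump(proc { 42 })
rescue TypeError => e
  puts "cannot dump a Proc: #{e.message}"
end
```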

serialization.rb
# Ruby objects can be serialized by Marshal to move across pipes
# or sockets

Tire = Struct.new(:radius, :pressure)

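reader, writer = IO.pipe

fork do
  reader.close
  tire = Tire.new(7, 28)
  tire_data = Marshal.dump(tire)
  writer.write tire_data
  writer.close
end

writer.close
# Marshal data is binary and may contain "\n", so read until EOF
# rather than using IO#gets
tire_data = reader.read
Process.wait
tire = Marshal.load(tire_data)
puts tire.inspect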

Creating a Module for Asynchronous Method Calls

If you can move objects between processes, then you can execute a method in another process and get the result back in the original. This effectively enables asynchronous method execution.

Here, I have a module called Forkable that gives a class the ability to execute methods in parallel. The only thing that is different from before is that the pipe is read from in a new thread, and what comes out of the pipe is yielded to the block.

forkable.rb
# A module that lets you fork a method and get its result in a block
# Notes: 
#   1. Forked methods can't take blocks in this implementation
#   2. Callback threads will be destroyed if the main process exits
#   3. Not production ready. For educational purposes only.
####################################################################

# If included in a class #fork_method will run a method in another process
# and yield the result to a block
module Forkable
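  def fork_method(method, *args)
    reader, writer = IO.pipe
    fork do
      reader.close
      result = self.send(method, *args)
      # Marshal output is binary and may contain "\n", so write it raw
      # and let the parent read until EOF instead of using puts/gets
      writer.write(Marshal.dump(result))
      writer.close
    end

    writer.close
    # Returns the callback thread so callers can join it
    Thread.new do
      data_from_child = reader.read
      reader.close
      yield Marshal.load(data_from_child) if block_given?
    end
  end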
end

# An object that takes a random amount of time to instantiate
class ExpensiveObject

  attr_reader :expense

  def initialize(max_expense)
    @expense = rand(1..max_expense)
    sleep @expense
  end
end

# A worker that forks, a...forker
class Forker
  include Forkable

  def calculate(max_expense)
    return ExpensiveObject.new(max_expense)
  end
end

# Create 3 ExpensiveObjects and print how long they took to create
f = Forker.new
callbacks = 3.times.map do
  f.fork_method(:calculate, 7) do |result|
    puts "result: #{result.inspect}"
  end
end

puts "waiting on results..."

callbacks.each(&:join)   # make sure every result has been printed
Process.waitall          # reap the finished child processes
puts "main process finished"

Cod

Cod is a library that aims to make IPC easier in Ruby. In the previous section, I demonstrated how to serialize objects for transfer across byte-transmission mechanisms like pipes or sockets. Cod takes care of this for us through higher-level IPC primitives it calls channels. Channels don't need to have one end closed in each process, and they can transmit Ruby objects directly. A channel is created in Cod (somewhat oddly) with Cod.pipe.

cod.rb
# The cod gem simplifies IPC. Ruby objects can be sent across channels.
# Installation: 
#   $ gem install cod

require 'cod'

Tire = Struct.new(:radius, :pressure)

channel = Cod.pipe

3.times do
  fork do
    radius = rand(8..14)
    pressure = rand(24..33)
    channel.put Tire.new(radius, pressure)
  end
end

tires = 3.times.map { channel.get }
puts tires.inspect
Process.waitall # reap the three children

The cod gem does more than I can cover here (for example, it has beanstalkd integration). The website has a lot of examples, so definitely check it out.

Pre-forking Servers

Let’s look at a common example of forking in practice: servers. Some servers that use forking to achieve parallelism include:

  • Apache (Prefork MultiProcessing Module)
  • Unicorn
  • Rainbows! (based on Unicorn)

Specifically, forking servers generally use pre-forking. The concept works like this: forking is cheaper than starting a new process from scratch, but it's not free. Since running a web server generally consists of opening and closing lots of short-lived connections, waiting until a connection is accepted before forking a child to handle it would add that cost to every request. Instead, we can fork a few times up front and let the forks wait for connections. You will often see this described as running a “pool of processes.”

So what might that look like?

preforking.rb
# Starts an echo server that can service 3 clients in parallel

require 'socket'

server = TCPServer.new 'localhost', 3000

trap("EXIT") { server.close }
trap("INT") { exit }

3.times do
  fork do
    # Each worker serves one client at a time, then goes back to accept
    loop do
      sock = server.accept
      sock.puts "You are connected to process #{$$}:"
      while recv = sock.gets
        sock.puts("ECHO:> #{recv}")
      end
      sock.close
    end
  end
end

Process.waitall

If you run this code, you should be able to connect from 3 separate terminals at the same time:

telnet localhost 3000
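If you'd rather not juggle terminals, here's a self-contained variation, a sketch rather than a production server. It binds to port 0 so the OS picks a free port, pre-forks two workers instead of three, and then acts as its own client, opening more connections than there are workers. Every connection still gets served, because each worker goes back to accept after a client disconnects.

```ruby
require 'socket'

# Port 0 asks the OS for any free port; addr[1] tells us which one we got
server = TCPServer.new('127.0.0.1', 0)
port = server.addr[1]

# Pre-fork a pool of two workers that share the listening socket
worker_pids = 2.times.map do
  fork do
    loop do
      client = server.accept
      client.puts "handled by #{Process.pid}"
      client.close
    end
  end
end
server.close   # the parent only connects as a client from here on

# Open more connections than there are workers
replies = 4.times.map do
  sock = TCPSocket.new('127.0.0.1', port)
  reply = sock.gets
  sock.close
  reply
end
puts replies

worker_pids.each { |pid| Process.kill(:TERM, pid) }
Process.waitall
```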

The Parallel Gem

The parallel gem provides the ability to achieve parallelism in CRuby (MRI) without getting into the nitty-gritty of forks and IPC. Here is a quick way to put all of your CPU cores to work with parallel.

parallel.rb
# Parallel iterates across collections with processes or threads
# Installation: 
#   $ gem install parallel

require 'parallel'

def calculate(magnitude)
  x = 0
  cycles = 10 ** magnitude
  cycles.times do
    x += 1.000001
  end
  return x
end

results = Parallel.map([6, 6, 6, 6, 7, 7, 7, 7]) do |mag|
  calculate(mag)
end

puts results

SitePoint already has a great article about the parallel gem that explores its inner workings (at least as of 2011).

Programmers often avoid looking at source code out of fear of it turning into a massive time sink. The parallel gem, however, is a fairly small, single-file library, which makes it a great opportunity to learn and contribute.
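To get a feel for the machinery such a library hides, here's a deliberately naive process-based map built only from pieces covered earlier: fork, IO.pipe and Marshal. fork_map is a hypothetical helper, not how the parallel gem is actually implemented. It forks one process per element, has no worker pool and no error handling (if the block raises in a child, Marshal.load in the parent will fail on empty data).

```ruby
# Hypothetical helper: run the block for each item in its own process
def fork_map(items)
  jobs = items.map do |item|
    reader, writer = IO.pipe
    pid = fork do
      reader.close
      writer.write(Marshal.dump(yield(item)))   # ship the result as bytes
      writer.close
    end
    writer.close
    [pid, reader]
  end

  # Reading the pipes in order keeps results aligned with the input
  jobs.map do |pid, reader|
    data = reader.read          # reads until the child closes its end
    reader.close
    Process.wait(pid)
    Marshal.load(data)
  end
end

squares = fork_map([1, 2, 3, 4]) { |n| n * n }
p squares   # => [1, 4, 9, 16]
```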

Conclusion

Even as other interpreters (JRuby, Rubinius) operate freely without a Global Interpreter Lock, a process-centric approach to concurrency is still useful. The more vertical an approach to scaling is, the more limited it will be. Running multiple threads on multiple cores might be more horizontal than increasing clock speed, but it is still more vertical than running multiple machines. Running 100 single-core instances is more granular than running 25 quad-core instances. Thinking in terms of processes sending messages enables you to embrace that kind of granularity. It won't be easy, but once it works with a cluster of 10 instances, why not 100? Or 1000? Why not have 500 on one provider and 500 on another? They're just processes sending messages.

Spending a bunch of money up front on resources you may or may not use represents the old way of thinking. Why buy more than exactly what you need?

