Code Safari: Forks, pipes, and the Parallel gem

A few weeks ago, I wrote an article about splitting out work to be done into threads to improve performance of a URL checker. A commenter noted that the parallel gem could be used to achieve the same result, which got me all curious. I investigated the gem and found that not only can it parallelize into threads, it also supports splitting out work into multiple processes.

I wonder how that works?

Breaking it open

As always, I start with a copy of the code, usually found on GitHub:

git clone https://github.com/grosser/parallel.git

The README points us towards Parallel.map as an entry point to the code. It is easy to find, since everything is in one file: lib/parallel.rb. Tracing this through leads us to the work_in_processes and worker methods, which are the core of what we are trying to figure out. Let’s start at the top of work_in_processes with the intent of figuring out the structure.

# lib/parallel.rb
def self.work_in_processes(items, options, &blk)
  workers = Array.new(options[:count]).map{ worker(items, options, &blk) }
  Parallel.kill_on_ctrl_c(workers.map{|worker| worker[:pid] })

This spawns a number of workers, then registers them to be correctly terminated if Ctrl-C is sent to the parent manager process. Without this, killing your script would leave orphaned worker processes running on your system! The worker method actually creates the new process, and this is where things start to get interesting.
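The kill_on_ctrl_c helper isn’t shown above, but its job can be sketched with Signal.trap: trap SIGINT in the parent and forward a TERM signal to each child pid before exiting. Note this is a guess at its shape for illustration, not the gem’s actual code:

```ruby
# A sketch of what a kill_on_ctrl_c-style helper might look like.
# The method body here is an assumption for illustration; see
# lib/parallel.rb for the gem's real implementation.
def kill_on_ctrl_c(pids)
  Signal.trap(:INT) do
    pids.each do |pid|
      begin
        Process.kill(:TERM, pid) # ask each child to terminate
      rescue Errno::ESRCH
        # child has already exited; nothing to do
      end
    end
    exit 1
  end
end
```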

def self.worker(items, options, &block)
  # use less memory on REE
  GC.copy_on_write_friendly = true if GC.respond_to?(:copy_on_write_friendly=)

  child_read, parent_write = IO.pipe
  parent_read, child_write = IO.pipe

  pid = Process.fork do
    begin
      parent_write.close
      parent_read.close

      process_incoming_jobs(child_read, child_write, items, options, &block)
    ensure
      child_read.close
      child_write.close
    end
  end

  child_read.close
  child_write.close

  {:read => parent_read, :write => parent_write, :pid => pid}
end

There are three important concepts here that I will cover in turn. The first is the call to Process.fork. This is a system level call (not available on Windows) that efficiently duplicates the current process to create a new one, called the “child”. For most intents and purposes, the two processes are exactly the same — same local variables, same stack trace, same everything. Ruby then directs the original process to skip over the block given to Process.fork, but the new one to enter it, allowing different behaviour to be executed in each process. In other words, the block given to Process.fork is only executed by the new child process.
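The return value of Process.fork makes this split easy to observe: in the parent, it returns the child’s pid, while the block body runs only in the child (which exits when the block finishes). A minimal sketch:

```ruby
# The block runs only in the child; the parent gets the child's pid back.
pid = Process.fork do
  puts "child: running in process #{Process.pid}"
end

puts "parent: process #{Process.pid} forked child #{pid}"
Process.wait(pid) # reap the child so it doesn't linger as a zombie
```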

Having two processes, we now need a way to communicate between them so that we can schedule work to be done. This is where the second important concept comes in: IO.pipe. Since the two processes are separate, they cannot communicate by changing variables: although variables initially have the same names and values in both processes, the memory behind them is duplicated, so a change made in the child will not be seen by the parent. A pipe is a method of communicating between processes (again, not available on Windows). It acts just like a file that can be written to by one process and read from by another. Parallel sets up two pipes to enable bi-directional communication. We will investigate this method further later in the article; for now, just recognize that this is setting up a communication channel.

The last important concept is the copy_on_write_friendly call, which requires a quick digression into memory management to explain. The reason fork is so efficient is that though it looks like it is making an exact duplicate, it doesn’t actually copy any memory initially: both processes read the exact same memory locations. It is only when a process writes to memory that the affected pages are copied. Not only does this mean a fast start-up time for the new processes, but it also allows them to keep very small memory footprints if they are only reading and processing, as we could often expect our parallel workers to be doing.

For example, say a typical process uses 20mb of memory. Running five instances individually would result in memory usage of 100mb (5 lots of 20), but running one instance and forking it would result in a memory usage of still just 20mb! (It’s actually slightly higher due to some overhead in making a new process, but this is negligible.)

There is a problem, though: Ruby’s garbage collector (the part of Ruby that manages memory). It uses an algorithm known as “mark-and-sweep”, which is a two-step process:

  1. Scan all objects in memory and write a flag to them indicating whether they are in use or not.
  2. Clean up all objects not marked as in use.

Did you see the problem in step 1? As soon as the Ruby garbage collector runs it executes a write to every object, triggering a copy of that memory to be made! Even with forking, five instances of our 20mb script will still end up using 100mb of memory.

As indicated by the comment in the code snippet above, some very smart people have solved this problem and released the result as Ruby Enterprise Edition. Their FAQ has plenty more detail for you to continue reading if you’re interested.

Communication

There is not much more to say about forking that is relevant, so I want to spend the rest of the article focussing on the communication channel: IO.pipe. On the child side of the parallel forks, the results of processing are sent back to the parent by writing suitably encoded Ruby objects — either the result or an exception — to the pipe (see the end of process_incoming_jobs).
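The encode and decode helpers referenced there aren’t shown, but the technique can be sketched with Marshal plus Base64: Marshal serializes an arbitrary Ruby object to a binary string, and Base64 flattens it onto a single newline-free line so the reader can frame each message with gets. (The gem’s exact encoding is an implementation detail; the helper names and bodies below are assumptions.)

```ruby
require "base64"

# Newline-framed object serialization over a pipe, sketching the idea
# behind parallel's encode/decode helpers (names and bodies assumed).
def encode(object)
  Base64.strict_encode64(Marshal.dump(object)) + "\n"
end

def decode(line)
  Marshal.load(Base64.strict_decode64(line.chomp))
end

reader, writer = IO.pipe
writer.write(encode([0, "result for item 0"]))
index, result = decode(reader.gets)
```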

The parent spawns a thread per sub-process that blocks waiting for data to appear on the pipe, then collates the result before sending more work to the process. This continues until there is no more work to schedule.

# lib/parallel.rb:124
workers.each do |worker|
  listener_threads << Thread.new do
    begin
      while output = worker[:read].gets
        # store output from worker
        result_index, output = decode(output.chomp)
        if ExceptionWrapper === output
          exception = output.exception
          break
        elsif exception # some other thread failed
          break
        end

        result[result_index] = output

        # give worker next item
        next_index = Thread.exclusive{ current_index += 1 }
        break if next_index >= items.size
        write_to_pipe(worker[:write], next_index)
      end
    ensure
      worker[:read].close
      worker[:write].close
    end
  end
end

Note that rather than using a Queue as we did in the last article, parallel uses Thread.exclusive to keep a thread-safe counter of the current index.
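Thread.exclusive is effectively a global mutex wrapped around the block (and has been deprecated in later Ruby versions); the same thread-safe counter can be sketched with an explicit Mutex:

```ruby
# A thread-safe "next index" counter using an explicit Mutex, which is
# equivalent in spirit to the Thread.exclusive call above.
mutex = Mutex.new
current_index = -1

next_index = lambda do
  mutex.synchronize { current_index += 1 }
end

# Four threads each claim ten indexes; no index is handed out twice.
threads = 4.times.map do
  Thread.new { 10.times.map { next_index.call } }
end

claimed = threads.flat_map(&:value).sort
```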

Wrapping it up

We now have a general idea of how to create new processes and communicate between them, so let’s test our knowledge by building a toy app to verify the methods we have learned about: fork and pipe.

reader, writer = IO.pipe

process_id = Process.fork do
  writer.write "Hello"
end

if process_id
  # Process.fork returns the child's pid in the parent and nil in the
  # child, so only the parent executes this block (with the block form
  # used here, the child never reaches this code anyway)
  puts "Message from child: #{reader.read}"
end

At first glance this looks fine, but if you run it you will find the process hangs. We have missed something important! read appears to block because it never receives an end-of-file signal from the pipe. We can confirm that the communication is otherwise working by sending a newline and using gets instead (which receives input up to a newline character):

reader, writer = IO.pipe

process_id = Process.fork do
  writer.write "Hello"
end
if process_id
  puts "Message from child: #{reader.gets}"
end

This script works as expected. So why isn’t our first script working? The answer is non-obvious if you are not used to working with concurrent code, but is neatly explained in the IO.pipe documentation:

The read end of a pipe will not generate an end of file condition if there are any writers with the pipe still open. In the case of the parent process, the read will never return if it does not first issue a writer.close.

Bingo! Though our child process implicitly closes its copy of the writer when it exits, our parent process still holds the original reference to writer open! We can fix this by closing it before we try to read:

reader, writer = IO.pipe

process_id = Process.fork do
  writer.write "Hello"
end

if process_id
  writer.close
  puts "Message from child: #{reader.read}"
end

Here are some further exercises for you to work on:

  • Extend our sample script to allow the parent to also send messages to the child.
  • What advantages/disadvantages are there to using Thread.exclusive (as parallel does) rather than Queue (as our last article did) for scheduling?

Let us know how you go in the comments. Tune in next week for more exciting adventures in the code jungle.

