Code Safari: Forks, pipes, and the Parallel gem


A few weeks ago, I wrote an article about splitting out work to be done into threads to improve performance of a URL checker. A commenter noted that the parallel gem could be used to achieve the same result, which got me all curious. I investigated the gem and found that not only can it parallelize into threads, it also supports splitting out work into multiple processes.

I wonder how that works?

Breaking it open

As always, I start with a copy of the code, usually found on GitHub:

git clone https://github.com/grosser/parallel.git

The README points us towards Parallel.map as an entry point to the code. It is easy to find, since everything is in the one file: lib/parallel.rb. Tracing this through leads us to the work_in_processes and worker methods, which are the core of what we are trying to figure out. Let's start at the top of work_in_processes with the intent of figuring out the structure.

# lib/parallel.rb
def self.work_in_processes(items, options, &blk)
  workers = Array.new(options[:count]).map{ worker(items, options, &blk) }
  Parallel.kill_on_ctrl_c(workers.map{|worker| worker[:pid] })

This spawns a number of workers, then registers them to be correctly terminated if Ctrl-C is sent to the parent manager process. Without this, killing your script would leave abandoned worker processes running on your system! The worker method actually creates the new process, and this is where things start to get interesting.

def self.worker(items, options, &block)
  # use less memory on REE
  GC.copy_on_write_friendly = true if GC.respond_to?(:copy_on_write_friendly=)

  child_read, parent_write = IO.pipe
  parent_read, child_write = IO.pipe

  pid = Process.fork do
    begin
      parent_write.close
      parent_read.close

      process_incoming_jobs(child_read, child_write, items, options, &block)
    ensure
      child_read.close
      child_write.close
    end
  end

  child_read.close
  child_write.close

  {:read => parent_read, :write => parent_write, :pid => pid}
end

There are three important concepts here that I will cover in turn. The first is the call to Process.fork. This is a system level call (not available on Windows) that efficiently duplicates the current process to create a new one, called the “child”. For most intents and purposes, the two processes are exactly the same — same local variables, same stack trace, same everything. Ruby then directs the original process to skip over the block given to Process.fork, but the new one to enter it, allowing different behaviour to be executed in each process. In other words, the block given to Process.fork is only executed by the new child process.
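The control flow just described can be sketched in a few lines (Unix only, since Process.fork is unavailable on Windows):

```ruby
# The block runs only in the child; the parent receives the child's
# pid as fork's return value and skips the block entirely.
pid = Process.fork do
  puts "child: fork returned nil here, so this block is mine to run"
end

if pid
  puts "parent: fork returned pid #{pid} and skipped the block"
  Process.wait(pid)  # reap the child so it doesn't linger as a zombie
end
```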

Having two processes, we now need a way to communicate between them so that we can schedule work to be done. This is where the second important concept comes in: IO.pipe. Since the two processes are separate, they cannot communicate by changing variables: although variables initially share the same names and values in both processes, they are duplicates, so a change made in the child process will not be seen by the parent. A pipe is a method of communicating between processes (again, not available on Windows). It acts just like a file that can be written to by one process and read from by another. Parallel sets up two pipes to enable bi-directional communication. We will investigate this method further later in the article; for now, just recognize that this is setting up a communication channel.
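A minimal sketch of why shared variables won't do: after the fork, each process mutates its own copy.

```ruby
message = "original"

pid = Process.fork do
  # This write only affects the child's copy of the variable.
  message = "changed in child"
end

Process.wait(pid)
puts message  # => original -- the parent never sees the child's write
```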

The last important concept is the copy_on_write_friendly call, which requires a quick digression into memory management to explain. The reason fork is so efficient is that though it looks like it is making an exact duplicate, it doesn’t actually copy any memory initially — both processes will read the exact same memory locations. It is only when a process writes to memory that it is copied and duplicated. Not only does this mean a fast start up time for the new processes, but it also allows them to have very small memory footprints if they are only reading and processing, as we could often expect our parallel workers to be doing.

For example, say a typical process was 20mb. Running five instances individually would result in memory usage of 100mb (5 lots of 20), but running one instance and forking it would result in a memory usage of still just 20mb! (It's actually slightly higher due to some overhead in making a new process, but this is negligible.)

We have a problem though: Ruby's garbage collector (the part of the runtime that manages memory). It uses an algorithm known as "mark-and-sweep", which is a two-step process:

  1. Scan all objects in memory and write a flag to them indicating whether they are in use or not.
  2. Clean up all objects not marked as in use.

Did you see the problem in step 1? As soon as the Ruby garbage collector runs it executes a write to every object, triggering a copy of that memory to be made! Even with forking, five instances of our 20mb script will still end up using 100mb of memory.

As indicated by the comment in the code snippet above, some very smart people have solved this problem and released the result as Ruby Enterprise Edition. Their FAQ has plenty more detail if you're interested in reading further.

Communication

There is not much more to say about forking that is relevant, so I want to spend the rest of the article focusing on the communication channel: IO.pipe. On the child side of the parallel forks, the results of processing are sent back to the parent by writing suitably encoded Ruby objects — either the result or an exception — to the pipe (see the end of process_incoming_jobs).
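As a rough sketch of the encoding idea (parallel's actual wire format is its own; the Base64 framing here is just one simple way to keep a binary Marshal dump newline-free so it can be sent as a single line):

```ruby
require 'base64'

reader, writer = IO.pipe

pid = Process.fork do
  reader.close
  # Serialize an arbitrary Ruby object, then encode it so the dump
  # contains no newlines and can be framed with puts/gets.
  payload = Marshal.dump([:result, 42])
  writer.puts Base64.strict_encode64(payload)
  writer.close
end

writer.close
tag, value = Marshal.load(Base64.strict_decode64(reader.gets.chomp))
Process.wait(pid)
puts "#{tag}: #{value}"  # => result: 42
```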

The parent spawns a thread per sub-process that blocks waiting for data to appear on the pipe, then collates the result before sending more work to the process. This continues until there is no more work to schedule.

# lib/parallel.rb:124
workers.each do |worker|
  listener_threads << Thread.new do
    begin
      while output = worker[:read].gets
        # store output from worker
        result_index, output = decode(output.chomp)
        if ExceptionWrapper === output
          exception = output.exception
          break
        elsif exception # some other thread failed
          break
        end

        result[result_index] = output

        # give worker next item
        next_index = Thread.exclusive{ current_index += 1 }
        break if next_index >= items.size
        write_to_pipe(worker[:write], next_index)
      end
    ensure
      worker[:read].close
      worker[:write].close
    end
  end
end

Note that rather than using a Queue as we did in the last article, parallel uses Thread.exclusive to keep a thread-safe counter of the current index.
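Thread.exclusive has since been deprecated (and removed in recent Rubies); a Mutex gives the equivalent behaviour. Here is a sketch of the same hand-out-the-next-index pattern using a Mutex:

```ruby
items = (0...10).to_a
mutex = Mutex.new
current_index = -1
results = []

threads = 4.times.map do
  Thread.new do
    loop do
      # Grab the next index atomically, like parallel's Thread.exclusive block.
      next_index = mutex.synchronize { current_index += 1 }
      break if next_index >= items.size
      results[next_index] = items[next_index] * 2  # stand-in for real work
    end
  end
end

threads.each(&:join)
puts results.inspect  # every item processed exactly once, in order
```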

Wrapping it up

Now that we have a general idea of how to create and communicate between new processes, let's test our knowledge by building a toy app using the two methods we have learned about: fork and pipe.

reader, writer = IO.pipe

process_id = Process.fork do
  writer.write "Hello"
end

if process_id
  # fork returns nil inside the child process,
  # so only the parent process will execute this block
  puts "Message from child: #{reader.read}"
end

At first glance this looks fine, but if you run it you will find the process hangs. We have missed something important! read appears to block because it isn't receiving an end-of-file signal from the pipe. We can confirm that the communication is otherwise working by sending a newline and using gets instead (which receives input up to a newline character):

reader, writer = IO.pipe

process_id = Process.fork do
  writer.puts "Hello"
end
if process_id
  puts "Message from child: #{reader.gets}"
end

This script works as expected. So why isn’t our first script working? The answer is non-obvious if you are not used to working with concurrent code, but is neatly explained in the IO.pipe documentation:

The read end of a pipe will not generate an end of file condition if there are any writers with the pipe still open. In the case of the parent process, the read will never return if it does not first issue a writer.close.

Bingo! Though our child process closes its copy of the writer implicitly when it exits, our parent process still has the original reference to writer open! We can fix this by closing it before we try to read:

reader, writer = IO.pipe

process_id = Process.fork do
  writer.write "Hello"
end

if process_id
  writer.close
  puts "Message from child: #{reader.read}"
end

Here are some further exercises for you to work on:

  • Extend our sample script to allow the parent to also send messages to the child.
  • What advantages/disadvantages are there to using Thread.exclusive (as parallel does) rather than Queue (as our last article did) for scheduling?

Let us know how you go in the comments. Tune in next week for more exciting adventures in the code jungle.

Frequently Asked Questions (FAQs) about Forks, Pipes, and the Parallel Gem

What is the Parallel gem in Ruby and why is it important?

The Parallel gem is a powerful tool in Ruby that allows for the execution of multiple tasks simultaneously. This is particularly useful when dealing with large data sets or complex computations that can be broken down into smaller, independent tasks. By running these tasks in parallel, you can significantly reduce the overall execution time, leading to more efficient and faster programs.

How does the Parallel gem work?

The Parallel gem works by creating multiple processes or threads, each running a portion of the tasks. For process-based parallelism it uses the fork system call to create worker processes and pipes to communicate with them; for thread-based parallelism it uses Ruby threads. This allows tasks to be distributed across multiple cores, leading to parallel execution.

How do I install the Parallel gem?

You can install the Parallel gem by using the gem install command in your terminal. The command is as follows: gem install parallel. After running this command, the gem will be installed and ready for use in your Ruby programs.

How do I use the Parallel gem in my code?

To use the Parallel gem in your code, you first need to require it at the top of your Ruby file with require 'parallel'. You can then use the Parallel.map method to run a block of code in parallel. The method takes an enumerable and a block, and runs the block for each element in the enumerable in parallel.

What are the differences between processes and threads in the Parallel gem?

In the context of the Parallel gem, processes and threads are both ways to achieve parallel execution. However, they have some key differences. Processes are independent of each other and do not share memory, while threads are lighter weight and share memory with each other. This means that processes are more isolated and less likely to interfere with each other, but threads can be more efficient for tasks that require shared data.

How can I handle errors in the Parallel gem?

The Parallel gem re-raises exceptions from workers in the parent process, so you can handle errors by wrapping the Parallel call in a standard begin/rescue block. It also provides a finish option, which allows you to specify a block of code to run after each piece of work completes, where you can inspect each result.

Can I use the Parallel gem with Bundler?

Yes, you can use the Parallel gem with Bundler. To do this, you need to add the gem to your Gemfile with gem 'parallel', and then run bundle install. After this, you can require the gem in your Ruby files with require 'parallel'.

What are some common use cases for the Parallel gem?

The Parallel gem is commonly used for tasks that can be broken down into smaller, independent tasks and run simultaneously. This includes tasks like processing large data sets, performing complex computations, and handling multiple network requests.

Are there any limitations or drawbacks to using the Parallel gem?

While the Parallel gem is a powerful tool, it does have some limitations. For example, it can only be used with Ruby versions 1.9.3 and above. Additionally, because it uses multiple processes or threads, it can consume more system resources than a single-threaded program. It’s also important to note that not all tasks can be parallelized, and attempting to do so can sometimes lead to unexpected results or errors.

How can I optimize the performance of my code using the Parallel gem?

There are several ways to optimize the performance of your code using the Parallel gem. One way is to carefully choose between using processes and threads, depending on the nature of your tasks. Another way is to fine-tune the number of processes or threads used, which can be done using the in_processes or in_threads options. Additionally, handling errors properly can also help to improve performance.

Xavier Shay

Xavier Shay is a DataMapper committer who has contributed to Ruby on Rails. He wrote the Enki blog platform, as well as other widely used libraries, including TufteGraph for JavaScript graphing and Kronic for date formatting and parsing. In 2010, he traveled the world running Ruby on Rails workshops titled "Your Database Is Your Friend." He regularly organizes and speaks at the Melbourne Ruby on Rails Meetup, and also blogs on personal development at TwoShay, and on more esoteric coding topics at Robot Has No Heart.
