Code Safari: Forks, pipes, and the Parallel gem
A few weeks ago, I wrote an article about splitting work out into threads to improve the performance of a URL checker. A commenter noted that the parallel gem could be used to achieve the same result, which got me all curious. I investigated the gem and found that not only can it parallelize work into threads, it also supports splitting work out into multiple processes.
I wonder how that works?
Breaking it open
As always, I start with a copy of the code, usually found on GitHub:
git clone https://github.com/grosser/parallel.git
The README points us towards Parallel.map as an entry point to the code. It is easy to find, since everything is in the one file: lib/parallel.rb. Tracing this through leads us to the work_in_processes and worker methods, which are the core of what we are trying to figure out. Let's start at the top of work_in_processes with the intent of figuring out the structure.
# lib/parallel.rb
def self.work_in_processes(items, options, &blk)
workers = Array.new(options[:count]).map{ worker(items, options, &blk) }
Parallel.kill_on_ctrl_c(workers.map{|worker| worker[:pid] })
This spawns a number of workers, then registers them to be correctly terminated if Ctrl-C is sent to the parent manager process. Without this, killing your script would leave extra processes running abandoned on your system! The worker method actually creates the new process, and this is where things start to get interesting.
def self.worker(items, options, &block)
# use less memory on REE
GC.copy_on_write_friendly = true if GC.respond_to?(:copy_on_write_friendly=)
child_read, parent_write = IO.pipe
parent_read, child_write = IO.pipe
pid = Process.fork do
begin
parent_write.close
parent_read.close
process_incoming_jobs(child_read, child_write, items, options, &block)
ensure
child_read.close
child_write.close
end
end
child_read.close
child_write.close
{:read => parent_read, :write => parent_write, :pid => pid}
end
There are three important concepts here that I will cover in turn. The first is the call to Process.fork. This is a system-level call (not available on Windows) that efficiently duplicates the current process to create a new one, called the "child". For most intents and purposes, the two processes are exactly the same: same local variables, same call stack, same everything. Ruby then directs the original process to skip over the block given to Process.fork, but the new one to enter it, allowing different behaviour to be executed in each process. In other words, the block given to Process.fork is only executed by the new child process.
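To see this behaviour in isolation, here is a minimal sketch of my own (not code from the gem): fork returns the child's pid in the parent and nil in the child, so we can branch on the return value.

```ruby
# fork returns the child's pid in the parent, and nil in the child.
pid = Process.fork do
  # Only the child executes this block.
  puts "child: my pid is #{Process.pid}"
end

if pid
  # Only the parent reaches here; pid is the child's process id.
  puts "parent: forked child #{pid}"
  Process.wait(pid) # reap the child so it does not linger as a zombie
end
```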
Having two processes, we now need a way to communicate between them so that we can schedule work to be done. This is where the second important concept comes in: IO.pipe. Since the two processes are separate, they cannot communicate by changing variables: although variables initially share the same names and values in each process, the memory is duplicated, so a change made in the child process will not be seen by the parent. A pipe is a mechanism for communicating between processes (again, not available on Windows). It acts just like a file that can be written to by one process and read from by another. Parallel sets up two pipes to enable bi-directional communication. We will investigate pipes further later in the article; for now, just recognize that this is setting up a communication channel.
The last important concept is the copy_on_write_friendly call, which requires a quick digression into memory management to explain. The reason fork is so efficient is that although it looks like it is making an exact duplicate, it doesn't actually copy any memory initially: both processes read the exact same memory locations. Only when a process writes to memory is that memory copied. Not only does this mean a fast start-up time for the new processes, it also gives them very small memory footprints if they are only reading and processing, as we would often expect our parallel workers to be doing.
For example, say a typical process uses 20 MB. Running five instances individually would result in memory usage of 100 MB (5 lots of 20 MB), but running one instance and forking it would still use just 20 MB! (Actual usage is slightly higher due to some overhead in making a new process, but this is negligible.)
We have a problem, though: Ruby's garbage collector (the mechanism by which it manages memory). It uses an algorithm known as "mark-and-sweep", which is a two-step process:
- Scan all objects in memory and write a flag to them indicating whether they are in use or not.
- Clean up all objects not marked as in use.
Did you see the problem in step 1? As soon as the Ruby garbage collector runs, it writes to every object, triggering a copy of that memory to be made! Even with forking, five instances of our 20 MB script will still end up using 100 MB of memory.
As indicated by the comment in the code snippet above, some very smart people have solved this problem and released the result as Ruby Enterprise Edition. Their FAQ has plenty more detail for you to continue reading if you're interested.
Communication
There is not much more to say about forking that is relevant here, so I want to spend the rest of the article focusing on the communication channel: IO.pipe. On the child side of the parallel forks, the results of processing are sent back to the parent by writing suitably encoded Ruby objects, either the result or an exception, to the pipe (see the end of process_incoming_jobs).
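The gem has its own encode and decode helpers for this; as an illustrative sketch (my own hypothetical helpers, not the gem's actual wire format), one way to ship a Ruby object down a pipe one message per line is to serialize it with Marshal and Base64-encode the result so the payload contains no newlines:

```ruby
require 'base64'

# Hypothetical helpers, not the gem's actual encode/decode: Marshal
# serializes the object, and Base64 keeps the payload newline-free so
# the reading side can frame each message with gets.
def encode(obj)
  Base64.strict_encode64(Marshal.dump(obj))
end

def decode(line)
  Marshal.load(Base64.strict_decode64(line.chomp))
end

reader, writer = IO.pipe
writer.puts encode([0, "result for item 0"])
index, result = decode(reader.gets)
puts index   # prints 0
puts result  # prints "result for item 0"
```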
The parent spawns a thread per sub-process that blocks waiting for data to appear on the pipe, then collates the result before sending more work to the process. This continues until there is no more work to schedule.
# lib/parallel.rb:124
workers.each do |worker|
listener_threads << Thread.new do
begin
while output = worker[:read].gets
# store output from worker
result_index, output = decode(output.chomp)
if ExceptionWrapper === output
exception = output.exception
break
elsif exception # some other thread failed
break
end
result[result_index] = output
# give worker next item
next_index = Thread.exclusive{ current_index += 1 }
break if next_index >= items.size
write_to_pipe(worker[:write], next_index)
end
ensure
worker[:read].close
worker[:write].close
end
end
end
Note that rather than using a Queue as we did in the last article, parallel uses Thread.exclusive to keep a thread-safe counter of the current index.
Wrapping it up
We now have a general idea of how to create new processes and communicate between them, so let's test our knowledge by building a toy app to verify the two methods we have learned about: fork and pipe.
reader, writer = IO.pipe
process_id = Process.fork do
writer.write "Hello"
end
if process_id
# fork will return nil inside the child process
# only the parent process will execute this block
puts "Message from child: #{reader.read}"
end
At first glance this looks fine, but if you run it you will find the process hangs. We have missed something important! read appears to be blocking because it isn't receiving an end-of-file signal from the pipe. We can confirm that the communication otherwise works by sending a newline and using gets instead (which receives input up to a newline character):
reader, writer = IO.pipe
process_id = Process.fork do
writer.write "Hello\n"
end
if process_id
puts "Message from child: #{reader.gets}"
end
This script works as expected. So why isn't our first script working? The answer is non-obvious if you are not used to working with concurrent code, but is neatly explained in the IO.pipe documentation:

The read end of a pipe will not generate an end of file condition if there are any writers with the pipe still open.

In the case of the parent process, the read will never return if it does not first issue a writer.close.
Bingo! Though our child process closes its copy of writer implicitly when it exits, our parent process still has the original reference to writer open! We can fix this by closing it before we try to read:
reader, writer = IO.pipe
process_id = Process.fork do
writer.write "Hello"
end
if process_id
writer.close
puts "Message from child: #{reader.read}"
end
Here are some further exercises for you to work on:
- Extend our sample script to allow the parent to also send messages to the child.
- What advantages/disadvantages are there to using Thread.exclusive (as parallel does) rather than Queue (as our last article did) for scheduling?
Let us know how you go in the comments. Tune in next week for more exciting adventures in the code jungle.