A few weeks ago, I wrote an article about splitting work across threads to improve the performance of a URL checker. A commenter noted that the parallel gem could be used to achieve the same result, which got me all curious. I investigated the gem and found that not only can it parallelize work across threads, it also supports splitting work out into multiple processes.
I wonder how that works?
Breaking it open
As always, I start with a copy of the code, usually found on GitHub:
git clone https://github.com/grosser/parallel.git
The README points us towards Parallel.map as an entry point to the code. It is easy to find, since everything is in the one file: lib/parallel.rb. Tracing this through leads us to the work_in_processes and worker methods, which are the core of what we are trying to figure out. Let's start at the top of work_in_processes with the intent of figuring out the structure.
# lib/parallel.rb
def self.work_in_processes(items, options, &blk)
  workers = Array.new(options[:count]).map{ worker(items, options, &blk) }
  Parallel.kill_on_ctrl_c(workers.map{|worker| worker[:pid] })
This spawns a number of workers, then registers them to be terminated correctly if Ctrl-C is sent to the parent manager process. Without this, killing your script would leave orphaned worker processes running on your system!
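The underlying idea is straightforward, and here is a hedged sketch of it (not the gem's implementation): trap SIGINT in the parent and forward a signal to each child pid.

child_pids = 2.times.map do
  Process.fork { sleep } # stand-in worker that waits forever
end

trap(:INT) do
  child_pids.each { |pid| Process.kill(:TERM, pid) rescue nil }
  exit 1
end

Process.waitall # press Ctrl-C and the trap will clean up both children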
The worker method actually creates the new process, and this is where things start to get interesting.
def self.worker(items, options, &block)
  # use less memory on REE
  GC.copy_on_write_friendly = true if GC.respond_to?(:copy_on_write_friendly=)

  child_read, parent_write = IO.pipe
  parent_read, child_write = IO.pipe

  pid = Process.fork do
    begin
      parent_write.close
      parent_read.close

      process_incoming_jobs(child_read, child_write, items, options, &block)
    ensure
      child_read.close
      child_write.close
    end
  end

  child_read.close
  child_write.close

  {:read => parent_read, :write => parent_write, :pid => pid}
end
There are three important concepts here that I will cover in turn. The first is the call to Process.fork. This is a system-level call (not available on Windows) that efficiently duplicates the current process to create a new one, called the "child". For most intents and purposes, the two processes are exactly the same — same local variables, same stack trace, same everything. Ruby then directs the original process to skip over the block given to Process.fork, but the new one to enter it, allowing different behaviour to be executed in each process. In other words, the block given to Process.fork is only executed by the new child process.
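To make that concrete, here is a minimal standalone sketch of fork semantics (not from the gem): the block runs only in the child, while the parent receives the child's pid as the return value.

pid = Process.fork do
  puts "child:  pid=#{Process.pid}"
end

puts "parent: pid=#{Process.pid}, spawned child #{pid}"
Process.wait(pid) # reap the child so it is not left as a zombie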
Having two processes, we now need a way to communicate between them so that we can schedule work to be done. This is where the second important concept comes in: IO.pipe. Since the two processes are separate, they cannot communicate by changing variables: although variables initially share the same names and values in each process, they are duplicated, so a change made in the child process will not be seen by the parent. A pipe is a method of communicating between processes (again, not available on Windows). It acts just like a file that can be written to by one process and read from by another. Parallel sets up two pipes to enable bi-directional communication. We will investigate this method further later in the article; for now, just recognize that this is setting up a communication channel.
The last important concept is the copy_on_write_friendly call, which requires a quick digression into memory management to explain. The reason fork is so efficient is that, though it looks like it is making an exact duplicate, it doesn't actually copy any memory initially — both processes read the exact same memory locations. It is only when a process writes to memory that the affected memory is copied. Not only does this mean a fast start-up time for the new processes, but it also allows them to have very small memory footprints if they are only reading and processing, as we would often expect our parallel workers to be doing.
For example, say a typical process uses 20MB of memory. Running five instances individually would result in memory usage of 100MB (five lots of 20), but running one instance and forking it four times would still result in a memory usage of just 20MB! (It's actually slightly higher due to some overhead in creating the new processes, but this is negligible.)
We have a problem though, which is Ruby's garbage collector (the way it manages memory). It uses an algorithm known as "mark-and-sweep", which is a two-step process:
- Scan all objects in memory and write a flag to them indicating whether they are in use or not.
- Clean up all objects not marked as in use.
Did you see the problem in step 1? As soon as the Ruby garbage collector runs, it writes to every object, triggering a copy of all that memory to be made! Even with forking, five instances of our 20MB script will still end up using 100MB of memory.
As indicated by the comment in the code snippet above, some very smart people have solved this problem and released the solution as Ruby Enterprise Edition. Their FAQ has plenty more detail for you to continue reading if you're interested.
Communication
There is not much more to say about forking that is relevant, so I want to spend the rest of the article focussing on the communication channel: IO.pipe. On the child side of the parallel forks, the results of processing are sent back to the parent by writing suitably encoded Ruby objects — either the result or an exception — to the pipe (see the end of process_incoming_jobs).
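"Suitably encoded" here means each object has to be flattened into a single line of text. The gem's exact encoding isn't shown above, but a plausible sketch of the technique is Marshal plus Base64: Marshal serializes any Ruby object to bytes, and Base64 keeps the payload newline-free so the reader can use gets to pull back exactly one message. The encode and decode helpers below are hypothetical, for illustration only.

require 'base64'

# Hypothetical helpers; not the gem's actual encode/decode.
def encode(obj)
  Base64.strict_encode64(Marshal.dump(obj))
end

def decode(line)
  Marshal.load(Base64.strict_decode64(line))
end

reader, writer = IO.pipe
writer.puts encode({:index => 0, :result => "done"})
puts decode(reader.gets.chomp).inspect # => {:index=>0, :result=>"done"}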
The parent spawns a thread per sub-process that blocks waiting for data to appear on the pipe, then collates the result before sending more work to the process. This continues until there is no more work to schedule.
# lib/parallel.rb:124
workers.each do |worker|
  listener_threads << Thread.new do
    begin
      while output = worker[:read].gets
        # store output from worker
        result_index, output = decode(output.chomp)
        if ExceptionWrapper === output
          exception = output.exception
          break
        elsif exception # some other thread failed
          break
        end

        result[result_index] = output

        # give worker next item
        next_index = Thread.exclusive{ current_index += 1 }
        break if next_index >= items.size
        write_to_pipe(worker[:write], next_index)
      end
    ensure
      worker[:read].close
      worker[:write].close
    end
  end
end
Note that rather than using a Queue as we did in the last article, parallel uses Thread.exclusive to keep a thread-safe counter of the current index.
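Here is a minimal sketch of the same counter pattern. It uses an explicit Mutex rather than Thread.exclusive (which wraps a global mutex), but the idea is identical: only one thread at a time may increment and read the shared index.

lock = Mutex.new
current_index = -1
items = %w[a b c d e]

threads = 4.times.map do
  Thread.new do
    loop do
      next_index = lock.synchronize { current_index += 1 }
      break if next_index >= items.size
      # hand items[next_index] to a worker here
    end
  end
end
threads.each(&:join)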
Wrapping it up
We now have a general idea of how to create and communicate between new processes. Let's test our knowledge by building a toy app to verify the methods we have learned about: fork and pipe.
reader, writer = IO.pipe

process_id = Process.fork do
  writer.write "Hello"
end

if process_id
  # fork returns nil inside the child process,
  # so only the parent process will execute this block
  puts "Message from child: #{reader.read}"
end
At first glance this looks fine, but running it you will find the process hangs. We have missed something important! read appears to be blocking because it isn't receiving an end of file signal from the pipe. We can confirm that the communication is otherwise working by sending a newline and using gets instead (which receives input up to a newline character):
reader, writer = IO.pipe

process_id = Process.fork do
  writer.write "Hello\n"
end

if process_id
  puts "Message from child: #{reader.gets}"
end
This script works as expected. So why isn't our first script working? The answer is non-obvious if you are not used to working with concurrent code, but is neatly explained in the IO.pipe documentation:

The read end of a pipe will not generate an end of file condition if there are any writers with the pipe still open.

In the case of the parent process, the read will never return if it does not first issue a writer.close.
Bingo! Though our child process is closing its copy of writer implicitly when it exits, our parent process still has the original reference to writer open! We can fix this by closing it before we try to read:
reader, writer = IO.pipe

process_id = Process.fork do
  writer.write "Hello"
end

if process_id
  writer.close
  puts "Message from child: #{reader.read}"
end
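Run it again and the script prints the message and exits cleanly: with the parent's copy of writer closed, read sees an end of file as soon as the child exits.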
Here are some further exercises for you to work on:
- Extend our sample script to allow the parent to also send messages to the child.
- What advantages/disadvantages are there to using Thread.exclusive (as parallel does) rather than Queue (as our last article did) for scheduling?
Let us know how you go in the comments. Tune in next week for more exciting adventures in the code jungle.
Frequently Asked Questions (FAQs) about Forks, Pipes, and the Parallel Gem
What is the Parallel gem in Ruby and why is it important?
The Parallel gem is a powerful tool in Ruby that allows for the execution of multiple tasks simultaneously. This is particularly useful when dealing with large data sets or complex computations that can be broken down into smaller, independent tasks. By running these tasks in parallel, you can significantly reduce the overall execution time, leading to more efficient and faster programs.
How does the Parallel gem work?
The Parallel gem works by creating multiple worker processes or threads, each running a portion of the tasks. For process-based parallelism it uses the fork and pipe system calls to create workers and communicate with them; for thread-based parallelism it uses Ruby's own threads. This allows the work to be distributed across multiple cores, leading to parallel execution.
How do I install the Parallel gem?
You can install the Parallel gem by running the gem install command in your terminal: gem install parallel. After running this command, the gem will be installed and ready for use in your Ruby programs.
How do I use the Parallel gem in my code?
To use the Parallel gem in your code, first require it at the top of your Ruby file with require 'parallel'. You can then use the Parallel.map method to run a block of code in parallel. The method takes an enumerable and a block, and runs the block for each element of the enumerable in parallel.
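For example, this squares a list of numbers across worker processes (the :in_processes count here is an arbitrary choice):

require 'parallel'

results = Parallel.map([1, 2, 3, 4], :in_processes => 4) do |n|
  n * n
end
puts results.inspect # => [1, 4, 9, 16]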
What are the differences between processes and threads in the Parallel gem?
In the context of the Parallel gem, processes and threads are both ways to achieve parallel execution. However, they have some key differences. Processes are independent of each other and do not share memory, while threads are lighter weight and share memory with each other. This means that processes are more isolated and less likely to interfere with each other, but threads can be more efficient for tasks that require shared data.
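A small sketch makes the difference visible. Each forked process mutates its own copy of counter, so the parent's value is unchanged, while threads all mutate the one shared variable (the thread result can vary in principle, since the increment is not synchronized):

require 'parallel'

counter = 0

Parallel.each(1..4, :in_processes => 2) { counter += 1 }
puts counter # => 0: each child incremented its own copy

Parallel.each(1..4, :in_threads => 2) { counter += 1 }
puts counter # => most likely 4: threads share the parent's memory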
How can I handle errors in the Parallel gem?
Exceptions raised inside a worker are re-raised in the parent once the workers finish, so you can handle them with an ordinary begin/rescue around the Parallel call. The gem also provides a finish option, which lets you specify a block to run in the parent after each piece of work.
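A minimal sketch of that first approach:

require 'parallel'

begin
  Parallel.map([1, 2, 3], :in_processes => 2) do |n|
    raise "boom on #{n}" if n == 2
    n * n
  end
rescue => e
  puts "A worker failed: #{e.message}"
end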
Can I use the Parallel gem with Bundler?
Yes, you can use the Parallel gem with Bundler. To do this, add the gem to your Gemfile with gem 'parallel', then run bundle install. After this, you can require the gem in your Ruby files with require 'parallel'.
What are some common use cases for the Parallel gem?
The Parallel gem is commonly used for tasks that can be broken down into smaller, independent tasks and run simultaneously. This includes tasks like processing large data sets, performing complex computations, and handling multiple network requests.
Are there any limitations or drawbacks to using the Parallel gem?
While the Parallel gem is a powerful tool, it does have some limitations. For example, it can only be used with Ruby versions 1.9.3 and above. Additionally, because it uses multiple processes or threads, it can consume more system resources than a single-threaded program. It’s also important to note that not all tasks can be parallelized, and attempting to do so can sometimes lead to unexpected results or errors.
How can I optimize the performance of my code using the Parallel gem?
There are several ways to optimize the performance of your code using the Parallel gem. One way is to carefully choose between processes and threads, depending on the nature of your tasks. Another is to fine-tune the number of processes or threads used via the in_processes and in_threads options. Finally, handle errors properly so that a single failed task does not waste the work of the others.
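As a rough sketch of that tuning (the counts here are illustrative, not prescriptive): CPU-bound work usually suits one process per core, while I/O-bound work tolerates many more threads than cores.

require 'parallel'

# CPU-bound work: match worker processes to the cores available.
squares = Parallel.map(1..8, :in_processes => Parallel.processor_count) do |n|
  n * n
end

# I/O-bound-style work: threads are cheaper, and more threads than cores is fine.
lengths = Parallel.map(%w[alpha beta gamma], :in_threads => 3) do |word|
  word.length
end

puts squares.inspect
puts lengths.inspect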
Xavier Shay is a DataMapper committer who has contributed to Ruby on Rails. He wrote the Enki blog platform, as well as other widely used libraries, including TufteGraph for JavaScript graphing and Kronic for date formatting and parsing. In 2010, he traveled the world running Ruby on Rails workshops titled "Your Database Is Your Friend." He regularly organizes and speaks at the Melbourne Ruby on Rails Meetup, and also blogs on personal development at TwoShay, and on more esoteric coding topics at Robot Has No Heart.